# NOTE(review): stray page text captured with the source ("Spaces: Sleeping / Sleeping"); not part of the program.
import glob
import os
import shutil
import subprocess

import gradio as gr
from ultralytics import YOLO
# Initialize YOLO model
# Built once at import time from the local weights file; process_video()
# calls model.track() on this shared instance for every request.
model = YOLO('./best_train2.pt')
# Color name reported for each class index predicted by the model.
# The position of a name in the tuple is its class index (0 -> "black", ...).
ClassToColorMapping = dict(
    enumerate(("black", "gray", "green", "purple", "red"))
)
def check_ffmpeg():
    """Verify that an ``ffmpeg`` executable can be found on the system PATH.

    Returns:
        None when ffmpeg is available.

    Raises:
        EnvironmentError: If no ``ffmpeg`` executable is found on PATH.
    """
    ffmpeg_path = shutil.which("ffmpeg")
    if ffmpeg_path is not None:
        return
    raise EnvironmentError("ffmpeg is not installed or not found in the system PATH. Please install ffmpeg to proceed.")
def convert_to_mp4(file):
    """Convert a video file (an .avi in this app) to an H.264 .mp4 beside it.

    The source file is deleted only after ffmpeg reports success, so a failed
    conversion never loses the original.

    Args:
        file: Path of the video to convert.

    Returns:
        Path of the converted file: ``file`` with its extension replaced by
        ``.mp4``.

    Raises:
        RuntimeError: If ffmpeg cannot be started or exits with a non-zero
            status. The source file is left in place in that case.
    """
    # splitext handles any extension length, unlike slicing off 4 characters.
    stem, _ext = os.path.splitext(file)
    mp4_file = f"{stem}.mp4"

    # Pass an argument list (no shell): the path derives from the uploaded
    # file's name, so quotes or shell metacharacters must not be interpreted.
    command = ["ffmpeg", "-y", "-i", file, "-vcodec", "libx264", mp4_file]
    try:
        # stdin is detached so ffmpeg cannot consume the parent's input.
        subprocess.run(command, check=True, stdin=subprocess.DEVNULL)
    except (OSError, subprocess.CalledProcessError) as err:
        raise RuntimeError(f"ffmpeg failed to convert {file!r} to mp4") from err

    os.remove(file)  # Remove the original only after a successful conversion
    return mp4_file
def _count_unique_ids_by_color(results):
    """Return ``{color: number of distinct track IDs}`` accumulated over all results."""
    chip_ids = {color: set() for color in ClassToColorMapping.values()}
    for result in results:
        boxes = result.boxes
        # Results without boxes, track IDs or classes contribute nothing.
        # Empty tensors need no special case: zip() simply yields no pairs.
        if boxes is None or boxes.id is None or boxes.cls is None:
            continue
        for cls, id_ in zip(boxes.cls, boxes.id):
            color = ClassToColorMapping.get(int(cls.item()))
            if color:
                chip_ids[color].add(int(id_.item()))
    return {color: len(ids) for color, ids in chip_ids.items()}


def _find_saved_avi_files(video_name):
    """Return the .avi files named after ``video_name`` in the newest ./runs/detect/ entry."""
    run_dirs = glob.glob('./runs/detect/*')
    if not run_dirs:
        # max() on an empty list would raise an opaque ValueError.
        raise Exception("No output directory found under ./runs/detect after tracking.")
    output_dir = max(run_dirs, key=os.path.getmtime)
    # Escape glob metacharacters so names such as "clip[1]" match literally.
    pattern = os.path.join(glob.escape(output_dir), f'{glob.escape(video_name)}*.avi')
    video_files = glob.glob(pattern)
    if not video_files:
        raise Exception(f"No .avi video files found in directory {output_dir} for {video_name}")
    return video_files


def process_video(video_path, output_option):
    """Track objects in a video and report unique counts per color and/or the saved video.

    Args:
        video_path: Path of the input video handed to ``model.track``.
        output_option: ``"Count"``, ``"Video"`` or ``"Both"``. Any other value
            (including ``None``) is treated like ``"Count"``.

    Returns:
        A ``(counts, video)`` tuple. ``counts`` maps each color name to the
        number of distinct track IDs seen for it, or is ``None`` for
        ``"Video"``. ``video`` is the path of the converted .mp4, or ``None``
        when no video was requested.

    Raises:
        EnvironmentError: If a video is requested and ffmpeg is not available.
        Exception: If tracking yields no results, or the saved video cannot be
            located under ``./runs/detect/``.
    """
    # Determine if the video needs to be saved
    save_video = output_option in ["Video", "Both"]

    # ffmpeg is only used to convert the saved video, so count-only requests
    # must not fail when it is missing. Check before the expensive tracking run.
    if save_video:
        check_ffmpeg()

    # Run the YOLO model on the video, specifying the tracker configuration and save option
    results = model.track(
        video_path,
        save=save_video,
        tracker="bytetrack.yaml",
        half=False,
        vid_stride=1,
        iou=0.75,
        conf=0.25,
    )

    if results is None or len(results) == 0:
        raise Exception("No detections were found in the video.")

    chip_counts_by_color = _count_unique_ids_by_color(results)

    if not save_video:
        return chip_counts_by_color, None

    video_name = os.path.splitext(os.path.basename(video_path))[0]
    # The newest entry under ./runs/detect/ is taken to be this run's output.
    # NOTE(review): this can pick another request's directory if runs overlap.
    video_files = _find_saved_avi_files(video_name)

    # Convert each .avi video to .mp4. video_files is non-empty here, so at
    # least one converted path exists.
    mp4_files = [convert_to_mp4(file) for file in video_files]

    if output_option == "Both":
        return chip_counts_by_color, mp4_files[0]
    return None, mp4_files[0]
# One bundled example: the sample video processed with the "Both" option.
examples = [[os.path.abspath("video_1.mp4"), "Both"]]

# Input components, in the order of process_video's parameters.
video_input = gr.Video()
output_option_input = gr.Radio(
    label="Select output type",
    choices=["Count", "Video", "Both"],
)

# Two outputs, matching the (counts, video) pair returned by process_video:
# a JSON view of the color counts and a video player.
video_interface = gr.Interface(
    fn=process_video,
    inputs=[video_input, output_option_input],
    outputs=[gr.JSON(label="Color Counts"), "video"],
    examples=examples,
    # NOTE(review): the previous comment here said caching was disabled to
    # speed up launch, but the value passed is True - confirm which is intended.
    cache_examples=True,
    title="YOLO Video Tracking Application with Color Counting",
    description="A simple application to track objects in a video using YOLO model and count unique objects by color. Upload your own video, or click one of the examples to load them.",
    article="""<div>
<p style="text-align: center">Upload a video file and select the type of output you want: unique object counts by color, processed video, or both. Then, hit submit to process the video.</p>
</div>""",
)

# Wrap the interface in a single "Track Video" tab and launch with share enabled.
gr.TabbedInterface([video_interface], ["Track Video"]).launch(share=True)