File size: 5,182 Bytes
1d9c414
 
 
 
 
 
 
 
 
cde5d08
 
 
 
 
 
 
 
 
1d9c414
 
 
 
 
713635a
 
 
 
 
 
 
5e76035
1d9c414
 
 
713635a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cd3b30d
713635a
 
 
 
 
 
 
5e76035
713635a
 
 
 
 
 
297c2d0
713635a
 
 
 
5e76035
713635a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1d9c414
 
 
5e76035
1d9c414
28476c6
2253ff6
565037e
2253ff6
 
1d9c414
 
5e76035
cde5d08
 
713635a
2253ff6
713635a
2253ff6
28476c6
713635a
1d9c414
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import glob
import os
import shutil
import subprocess

import gradio as gr
from ultralytics import YOLO

# Initialize YOLO model
# The weights are read from ./best.pt (a path relative to the current working
# directory) once, at import time; process_video reuses this single instance.
model = YOLO('./best.pt')

# Lookup table from the integer class index reported by the model to the
# colour name used as a key in the returned counts. The position of each
# name in the tuple is its class index (0 through 4).
ClassToColorMapping = dict(
    enumerate(("black", "gray", "green", "purple", "red"))
)

def check_ffmpeg():
    """Verify that an ``ffmpeg`` executable can be located on the system PATH.

    Returns:
        None when ``shutil.which`` finds ``ffmpeg``.

    Raises:
        EnvironmentError: If ``shutil.which`` cannot find ``ffmpeg``.
    """
    ffmpeg_location = shutil.which("ffmpeg")
    if ffmpeg_location is not None:
        return
    raise EnvironmentError("ffmpeg is not installed or not found in the system PATH. Please install ffmpeg to proceed.")

def convert_to_mp4(file):
    """Convert a video file (normally a .avi) to an H.264 .mp4 beside it.

    The source file is deleted only after ffmpeg has exited successfully, so a
    failed conversion never destroys the original.

    Args:
        file: Path to the source video file.

    Returns:
        Path of the created .mp4 file: ``file`` with its extension replaced
        by ``.mp4``.

    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable cannot be started.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    # splitext handles extensions of any length, unlike slicing off 4 chars.
    mp4_file = f"{os.path.splitext(file)[0]}.mp4"

    # Arguments are passed as a list (no shell), so quotes, spaces or shell
    # metacharacters in the file name cannot break or inject into the command.
    subprocess.run(
        ["ffmpeg", "-y", "-i", file, "-vcodec", "libx264", mp4_file],
        check=True,
    )

    os.remove(file)  # Remove the original only after a successful conversion
    return mp4_file

def _count_unique_ids_by_color(results):
    """Return {color: number of distinct tracker IDs} across all results."""
    chip_ids = {color: set() for color in ClassToColorMapping.values()}

    for result in results:
        boxes = result.boxes
        # Skip results that carry no boxes, no tracker IDs or no classes.
        if boxes is None or boxes.id is None or boxes.cls is None:
            continue
        if boxes.id.numel() == 0 or boxes.cls.numel() == 0:
            continue

        for cls, id_ in zip(boxes.cls, boxes.id):
            color = ClassToColorMapping.get(int(cls.item()))
            # Classes missing from the mapping are ignored.
            if color is not None:
                chip_ids[color].add(int(id_.item()))

    return {color: len(ids) for color, ids in chip_ids.items()}


def process_video(video_path, output_option):
    """Track objects in a video and report unique counts per color and/or the video.

    Args:
        video_path: Path to the input video file.
        output_option: "Count", "Video" or "Both". Any other value is treated
            like "Count".

    Returns:
        A ``(counts, video)`` tuple:
          * "Count" (or an unrecognised option): ``(counts_by_color, None)``
          * "Video": ``(None, mp4_path)``
          * "Both": ``(counts_by_color, mp4_path)``
        ``counts_by_color`` maps every color name in ``ClassToColorMapping``
        to the number of distinct tracker IDs seen for it.

    Raises:
        ValueError: If ``video_path`` is empty or None, or (from ``max``) if
            ``./runs/detect/`` contains no entries.
        EnvironmentError: If ffmpeg is not available (see ``check_ffmpeg``).
        Exception: If tracking returns no results, or the saved .avi output
            cannot be found.
    """
    # Fail early with a clear message; otherwise a missing path only surfaces
    # later as a TypeError in os.path.basename.
    # NOTE(review): Ultralytics appears to fall back to bundled sample assets
    # when source is None, which would yield misleading counts — confirm.
    if not video_path:
        raise ValueError("No video was provided. Please upload a video before submitting.")

    # Check if ffmpeg is installed
    check_ffmpeg()

    # Determine if the video needs to be saved
    save_video = output_option in ("Video", "Both")

    # Run the YOLO model on the video, specifying the tracker configuration and save option
    results = model.track(
        video_path,
        save=save_video,
        tracker="bytetrack.yaml",
        half=False,
        vid_stride=1,
        iou=0.75,
        conf=0.25
    )

    # Check if results are None or if there are no results
    if results is None or len(results) == 0:
        raise Exception("No detections were found in the video.")

    chip_counts_by_color = _count_unique_ids_by_color(results)

    # "Count" and any unrecognised option only need the counts.
    if not save_video:
        return chip_counts_by_color, None

    video_name = os.path.splitext(os.path.basename(video_path))[0]

    # Find the latest directory created in 'runs/detect/'
    output_dir = max(glob.glob('./runs/detect/*'), key=os.path.getmtime)

    # Escape the literal part of the pattern so that glob metacharacters in
    # the uploaded file name (e.g. "clip[1].mp4") still match the saved file.
    pattern = glob.escape(os.path.join(output_dir, video_name)) + '*.avi'
    video_files = glob.glob(pattern)

    if not video_files:
        raise Exception(f"No .avi video files found in directory {output_dir} for {video_name}")

    # Convert each .avi video to .mp4
    mp4_files = []
    for avi_file in video_files:
        mp4_files.append(convert_to_mp4(avi_file))

    # Only the first converted file is returned.
    converted_video = mp4_files[0]

    if output_option == "Both":
        return chip_counts_by_color, converted_video
    return None, converted_video

# Define Gradio inputs
# These two components map, in order, to process_video's
# (video_path, output_option) parameters.
video_input = gr.Video()
output_option_input = gr.Radio(choices=["Count", "Video", "Both"], label="Select output type")

# Define a single example for the interface
# The example path is resolved against the current working directory.
examples = [
    [os.path.abspath("video_1.mp4"), "Both"]
]

# Build the interface: process_video returns a (counts, video) pair, which is
# why two output components are declared below.
video_interface = gr.Interface(
    fn=process_video,
    inputs=[video_input, output_option_input],
    outputs=[gr.JSON(label="Color Counts"), "video"],  # Ensure two outputs are defined, using JSON for color counts
    title="YOLO Video Tracking Application with Color Counting",
    description="A simple application to track objects in a video using YOLO model and count unique objects by color. Upload your own video, or click one of the examples to load them.",
    article="""<div>
                <p style="text-align: center">Upload a video file and select the type of output you want: unique object counts by color, processed video, or both. Then, hit submit to process the video.</p>
               </div>""",
    examples=examples,
    cache_examples=True  # Example caching is ENABLED here. NOTE(review): this comment previously read "Disable caching to speed up launch", which contradicts the value True — confirm which is intended.
)

# Deploy the interface with share enabled
# The interface is wrapped in a TabbedInterface with a single tab named
# "Track Video"; launch() runs at import time (there is no __main__ guard).
gr.TabbedInterface([video_interface], ["Track Video"]).launch(share=True)