Spaces:
Sleeping
Sleeping
File size: 6,714 Bytes
53ad959 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# Ultralytics YOLO 🚀, AGPL-3.0 license
from collections import defaultdict
from time import time
import cv2
import numpy as np
from ultralytics.utils.checks import check_imshow
from ultralytics.utils.plotting import Annotator, colors
class SpeedEstimator:
    """Estimate the speed of tracked objects in a video stream from their track history.

    The estimator keeps a short history of bounding-box centres per track id and,
    when a track's latest centre lies close to the configured speed line, derives
    a speed from the vertical displacement since the previous observation.

    Note:
        The value stored in ``dist_data`` is the vertical displacement of the box
        centre divided by elapsed wall-clock seconds. No calibration to real-world
        units is applied in this class, even though the on-frame label reads
        ``km/ph``.
    """

    def __init__(self):
        """Initialize visual, region, tracking and speed-estimation state with defaults."""
        # Visual & im0 information
        self.im0 = None
        self.annotator = None
        self.view_img = False

        # Region information
        self.reg_pts = [(20, 400), (1260, 400)]
        self.region_thickness = 3

        # Predict/track information
        self.clss = None
        self.names = None
        self.boxes = None
        self.trk_ids = None
        self.trk_pts = None
        self.line_thickness = 2
        self.trk_history = defaultdict(list)

        # Speed estimator information
        self.current_time = 0
        self.dist_data = {}  # track id -> estimated speed
        self.trk_idslist = []  # track ids whose speed has already been evaluated
        self.spdl_dist_thresh = 10
        self.trk_previous_times = {}  # track id -> timestamp of previous observation (0 = none yet)
        self.trk_previous_points = {}  # track id -> previous box centre

        # Check if environment support imshow
        self.env_check = check_imshow(warn=True)

    def set_args(
        self,
        reg_pts,
        names,
        view_img=False,
        line_thickness=2,
        region_thickness=5,
        spdl_dist_thresh=10,
    ):
        """
        Configure the speed estimation and display parameters.

        Args:
            reg_pts (list | None): Two points defining the speed line. When None, the
                current (default) points are kept and a notice is printed.
            names (dict): Mapping from class index to class name.
            view_img (bool): Whether frames should be displayed.
            line_thickness (int): Line width used for bounding-box annotation.
            region_thickness (int): Thickness of the drawn speed region.
            spdl_dist_thresh (int): Maximum vertical distance from the speed line at
                which a track point counts as being on the line.
        """
        if reg_pts is None:
            print("Region points not provided, using default values")
        else:
            self.reg_pts = reg_pts
        self.names = names
        self.view_img = view_img
        self.line_thickness = line_thickness
        self.region_thickness = region_thickness
        self.spdl_dist_thresh = spdl_dist_thresh

    def extract_tracks(self, tracks):
        """
        Pull boxes, class indices and track ids out of the first tracking result.

        Args:
            tracks (list): Results of the object tracking process; only ``tracks[0]`` is
                read. ``tracks[0].boxes.id`` must not be None.
        """
        result_boxes = tracks[0].boxes
        self.boxes = result_boxes.xyxy.cpu()
        self.clss = result_boxes.cls.cpu().tolist()
        self.trk_ids = result_boxes.id.int().cpu().tolist()

    def store_track_info(self, track_id, box):
        """
        Append the centre of ``box`` to the history of ``track_id``.

        The history is capped at the 30 most recent points. As a side effect,
        ``self.trk_pts`` is set to the history as an int32 array of shape (-1, 1, 2),
        the layout passed to ``cv2.polylines`` when drawing the path.

        Args:
            track_id (int): Object track id.
            box (list): Bounding box as (x1, y1, x2, y2).

        Returns:
            (list): The (mutable) list of centre points stored for this track.
        """
        track = self.trk_history[track_id]
        bbox_center = (float((box[0] + box[2]) / 2), float((box[1] + box[3]) / 2))
        track.append(bbox_center)
        if len(track) > 30:
            track.pop(0)

        self.trk_pts = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
        return track

    def plot_box_and_track(self, track_id, box, cls, track):
        """
        Draw the bounding box, its label and the track path on the current frame.

        The label is the estimated speed when one is available for the track,
        otherwise the class name.

        Args:
            track_id (int): Object track id.
            box (list): Bounding box data.
            cls (int | float): Class index, used to look up the name in ``self.names``.
            track (list): Centre-point history of the track.
        """
        has_speed = track_id in self.dist_data
        speed_label = f"{int(self.dist_data[track_id])}km/ph" if has_speed else self.names[int(cls)]
        bbox_color = colors(int(track_id)) if has_speed else (255, 0, 255)

        self.annotator.box_label(box, speed_label, bbox_color)
        # self.trk_pts was refreshed by store_track_info for this same track.
        cv2.polylines(self.im0, [self.trk_pts], isClosed=False, color=(0, 255, 0), thickness=1)
        cv2.circle(self.im0, (int(track[-1][0]), int(track[-1][1])), 5, bbox_color, -1)

    def calculate_speed(self, trk_id, track):
        """
        Update the speed estimate for one track from its latest centre point.

        Nothing is recorded while the latest point lies outside the horizontal extent
        of the speed line. Otherwise the latest time and point are stored, and, the
        first time the point is within ``spdl_dist_thresh`` (vertically) of either
        line endpoint and a previous observation exists, the speed is written to
        ``self.dist_data[trk_id]``.

        Args:
            trk_id (int): Object track id.
            track (list): Centre-point history of the track; must be non-empty.
        """
        last_x, last_y = track[-1][0], track[-1][1]
        if not self.reg_pts[0][0] < last_x < self.reg_pts[1][0]:
            return

        thresh = self.spdl_dist_thresh
        on_speed_line = any(
            line_y - thresh < last_y < line_y + thresh for line_y in (self.reg_pts[1][1], self.reg_pts[0][1])
        )

        # One timestamp per update so the stored time matches the one used for the speed.
        now = time()
        # .get() keeps this method usable even if the caller did not pre-seed the entry.
        previous_time = self.trk_previous_times.get(trk_id, 0)

        if previous_time != 0 and on_speed_line and trk_id not in self.trk_idslist:
            # Each track is evaluated at most once.
            self.trk_idslist.append(trk_id)

            time_difference = now - previous_time
            if time_difference > 0:
                dist_difference = np.abs(last_y - self.trk_previous_points[trk_id][1])
                self.dist_data[trk_id] = dist_difference / time_difference

        self.trk_previous_times[trk_id] = now
        self.trk_previous_points[trk_id] = track[-1]

    def estimate_speed(self, im0, tracks, region_color=(255, 0, 0)):
        """
        Annotate a frame and update speed estimates from tracking results.

        Args:
            im0 (nd array): Image; it is drawn on in place.
            tracks (list): Results of the object tracking process.
            region_color (tuple): Color used when drawing the speed region.

        Returns:
            (nd array): The ``im0`` that was passed in.
        """
        self.im0 = im0
        if tracks[0].boxes.id is None:
            # No confirmed tracks in this frame: just show it if requested.
            if self.view_img and self.env_check:
                self.display_frames()
            return im0

        self.extract_tracks(tracks)

        # Honour the thickness configured through set_args instead of a fixed width.
        self.annotator = Annotator(self.im0, line_width=self.line_thickness)
        self.annotator.draw_region(reg_pts=self.reg_pts, color=region_color, thickness=self.region_thickness)

        for box, trk_id, cls in zip(self.boxes, self.trk_ids, self.clss):
            track = self.store_track_info(trk_id, box)

            # 0 marks "no previous observation yet" for calculate_speed.
            self.trk_previous_times.setdefault(trk_id, 0)

            self.plot_box_and_track(trk_id, box, cls, track)
            self.calculate_speed(trk_id, track)

        if self.view_img and self.env_check:
            self.display_frames()

        return im0

    def display_frames(self):
        """Show the current frame in an OpenCV window and poll the keyboard once."""
        cv2.imshow("Ultralytics Speed Estimation", self.im0)
        # NOTE(review): a "q" key press currently only returns from this method;
        # it does not stop the caller's processing loop.
        if cv2.waitKey(1) & 0xFF == ord("q"):
            return
# Script entry point: constructing the estimator runs its __init__, which
# includes the imshow environment check.
if __name__ == "__main__":
    SpeedEstimator()
|