import numpy as np
from scipy.spatial import distance as dist
from utils.labels import pose_id_part, pose_id_part_openpose, rev_pose_id_part_openpose, rev_pose_id_part
import cv2
import os
import json
import logging
logger = logging.getLogger(__name__)
def rescale_bb(boxes, pad, im_width, im_height):
"""
Modify in place the bounding box coordinates (percentage) to the new image width and height
Args:
:boxes (numpy.ndarray): Array of bounding box coordinates expressed in percentage [y_min, x_min, y_max, x_max]
:pad (tuple): The first element represents the right padding (applied by resize_preserving_ar() function);
the second element represents the bottom padding (applied by resize_preserving_ar() function) and
the third element is a tuple that is the shape of the image after resizing without the padding (this is useful for
the coordinates changes)
:im_width (int): The new image width
:im_height (int): The new image height
Returns:
"""
right_padding = pad[0]
bottom_padding = pad[1]
if bottom_padding != 0:
for box in boxes:
y_min, y_max = box[0] * im_height, box[2] * im_height # to pixels
box[0], box[2] = y_min / (im_height - pad[1]), y_max / (im_height - pad[1]) # back to percentage
if right_padding != 0:
for box in boxes:
x_min, x_max = box[1] * im_width, box[3] * im_width # to pixels
box[1], box[3] = x_min / (im_width - pad[0]), x_max / (im_width - pad[0]) # back to percentage
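# Example usage (illustrative sketch; assumes pad = (right_padding, bottom_padding, unpadded_shape) as documented above,
# here a 480-px-high image with 80 px of bottom padding):
#   boxes = np.array([[0.25, 0.1, 0.5, 0.3]])
#   rescale_bb(boxes, (0, 80, (400, 640)), 640, 480)
#   # boxes is now [[0.3, 0.1, 0.6, 0.3]] because 0.25 * 480 / 400 = 0.3 and 0.5 * 480 / 400 = 0.6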
def rescale_key_points(key_points, pad, im_width, im_height):
"""
    Modify in place the key point coordinates (percentage) to the new image width and height
    Args:
        :key_points (numpy.ndarray): Array of key point coordinates expressed in percentage [y, x] for each person
:pad (tuple): The first element represents the right padding (applied by resize_preserving_ar() function);
the second element represents the bottom padding (applied by resize_preserving_ar() function) and
the third element is a tuple that is the shape of the image after resizing without the padding (this is useful for
the coordinates changes)
:im_width (int): The new image width
:im_height (int): The new image height
Returns:
"""
right_padding = pad[0]
bottom_padding = pad[1]
if bottom_padding != 0:
for aux in key_points:
for point in aux: # x 1 y 0
y = point[0] * im_height
point[0] = y / (im_height - pad[1])
if right_padding != 0:
for aux in key_points:
for point in aux:
x = point[1] * im_width
point[1] = x / (im_width - pad[0])
def change_coordinates_aspect_ratio(aux_key_points_array, img_person, img_person_resized):
"""
Args:
:
Returns:
:
"""
aux_key_points_array_ratio = []
ratio_h, ratio_w = img_person.shape[0] / (img_person_resized.shape[1]), img_person.shape[1] / (img_person_resized.shape[2]) # shape 0 batch 1
for elem in aux_key_points_array:
aux = np.zeros(3)
aux[0] = int((elem[0]) * ratio_h)
        aux[1] = int(elem[1] * ratio_w)  # width ratio for the second coordinate
aux[2] = int(elem[2])
aux_key_points_array_ratio.append(aux)
aux_key_points_array_ratio = np.array(aux_key_points_array_ratio, dtype=int)
return aux_key_points_array_ratio
def parse_output_pose(heatmaps, offsets, threshold):
"""
Parse the output pose (auxiliary function for tflite models)
Args:
:
Returns:
:
"""
#
# heatmaps: 9x9x17 probability of appearance of each keypoint in the particular part of the image (9,9) -> used to locate position of the joints
# offsets: 9x9x34 used for calculation of the keypoint's position (first 17 x coords, the second 17 y coords)
#
joint_num = heatmaps.shape[-1]
pose_kps = np.zeros((joint_num, 3), np.uint32)
for i in range(heatmaps.shape[-1]):
joint_heatmap = heatmaps[..., i]
max_val_pos = np.squeeze(np.argwhere(joint_heatmap == np.max(joint_heatmap)))
remap_pos = np.array(max_val_pos / 8 * 257, dtype=np.int32)
pose_kps[i, 0] = int(remap_pos[0] + offsets[max_val_pos[0], max_val_pos[1], i])
pose_kps[i, 1] = int(remap_pos[1] + offsets[max_val_pos[0], max_val_pos[1], i + joint_num])
max_prob = np.max(joint_heatmap)
if max_prob > threshold:
if pose_kps[i, 0] < 257 and pose_kps[i, 1] < 257:
pose_kps[i, 2] = 1
return pose_kps
def retrieve_xyz_from_detection(points_list, point_cloud_img):
"""
Retrieve the xyz of the list of points passed as input (if we have the point cloud of the image)
Args:
:points_list (list): list of points for which we want to retrieve xyz information
:point_cloud_img (numpy.ndarray): numpy array containing XYZRGBA information of the image
Returns:
:xyz (list): list of lists of 3D points with XYZ information (left camera origin (0,0,0))
"""
xyz = [[point_cloud_img[:, :, 0][point[1], point[0]], point_cloud_img[:, :, 1][point[1], point[0]], point_cloud_img[:, :, 2][point[1], point[0]]]
for point in points_list]
return xyz
def retrieve_xyz_pose_points(point_cloud_image, key_points_score, key_points):
"""Retrieve the key points from the point cloud to get the XYZ position in the 3D space
Args:
        :point_cloud_image (numpy.ndarray): array containing the XYZ information of each pixel of the image
        :key_points_score (list): list of confidence scores of the key points of each person
        :key_points (list): list of key points of each person, expressed in percentage of the image dimensions
    Returns:
        :xyz_pose: a list of lists with the XYZ 3D coordinates and the score of each key point of each person
"""
xyz_pose = []
for i in range(len(key_points_score)):
xyz_pose_aux = []
for j in range(len(key_points_score[i])):
# if key_points_score[i][j] > threshold:# and j < 5:
x, y = int(key_points[i][j][0] * point_cloud_image.shape[0]) - 1, int(key_points[i][j][1] * point_cloud_image.shape[1]) - 1
xyz_pose_aux.append([point_cloud_image[x, y, 0], point_cloud_image[x, y, 1], point_cloud_image[x, y, 2], key_points_score[i][j]])
xyz_pose.append(xyz_pose_aux)
return xyz_pose
def compute_distance(points_list, min_distance=1.5):
"""
    Compute the distance between each pair of points and find the points that are closer to each other than a certain distance
    expressed in meters.
Args:
:points_list (list): list of points expressed in xyz 3D coordinates (meters)
:min_distance (float): minimum threshold for distances (if the l2 distance between two objects is lower than this value it is considered a violation)
(default is 1.5)
Returns:
        :distance_matrix: matrix containing the distance between each pair of points (diagonal is 0)
        :violate: set of indices of the points that violate the minimum distance threshold
        :couple_points: list of pairs of indices that violate the min_distance threshold (to keep track of each couple)
"""
if points_list is None or len(points_list) == 1 or len(points_list) == 0:
return None, None, None
else: # if there are more than two points
violate = set()
couple_points = []
aux = np.array(points_list)
distance_matrix = dist.cdist(aux, aux, 'euclidean')
for i in range(0, distance_matrix.shape[0]): # loop over the upper triangular of the distance matrix
for j in range(i + 1, distance_matrix.shape[1]):
if distance_matrix[i, j] < min_distance:
# print("Distance between {} and {} is {:.2f} meters".format(i, j, distance_matrix[i, j]))
violate.add(i)
violate.add(j)
couple_points.append((i, j))
return distance_matrix, violate, couple_points
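# Example usage (illustrative sketch; the 3D positions below, in metres, are made up):
#   pts = [[0.0, 0.0, 2.0], [0.5, 0.1, 2.1], [3.0, 0.0, 5.0]]
#   dist_matrix, violate, couples = compute_distance(pts, min_distance=1.5)
#   # violate == {0, 1} and couples == [(0, 1)] because persons 0 and 1 are ~0.52 m apart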
def initialize_video_recorder(output_path, output_depth_path, fps, shape):
"""Initialize OpenCV video recorders that will be used to write each image/frame to a single video
Args:
        :output_path (str): The file location where the recorded video will be saved
        :output_depth_path (str): The file location where the recorded video with depth information will be saved
        :fps (int): The frames per second of the output videos
:shape (tuple): The dimension of the output video (width, height)
Returns:
:writer (cv2.VideoWriter): The video writer used to save the video
:writer_depth (cv2.VideoWriter): The video writer used to save the video with depth information
"""
if not os.path.isdir(os.path.split(output_path)[0]):
logger.error("Invalid path for the video writer; folder does not exist")
exit(1)
fourcc = cv2.VideoWriter_fourcc(*"MJPG")
writer = cv2.VideoWriter(output_path, fourcc, fps, shape, True)
writer_depth = None
if output_depth_path:
if not os.path.isdir(os.path.split(output_depth_path)[0]):
logger.error("Invalid path for the depth video writer; folder does not exist")
exit(1)
writer_depth = cv2.VideoWriter(output_depth_path, fourcc, fps, shape, True)
return writer, writer_depth
def delete_items_from_array_aux(arr, i):
"""
    Auxiliary function that deletes the item at a certain index from a numpy array
    Args:
        :arr (numpy.ndarray): Array of arrays where each element corresponds to the four coordinates of a bounding box expressed in percentage
:i (int): Index of the element to be deleted
Returns:
:arr_ret: the array without the element at index i
"""
aux = arr.tolist()
aux.pop(i)
arr_ret = np.array(aux)
return arr_ret
def fit_plane_least_square(xyz):
# find a plane that best fit xyz points using least squares
(rows, cols) = xyz.shape
g = np.ones((rows, 3))
g[:, 0] = xyz[:, 0] # X
g[:, 1] = xyz[:, 1] # Y
z = xyz[:, 2]
(a, b, c), _, rank, s = np.linalg.lstsq(g, z, rcond=None)
normal = (a, b, -1)
nn = np.linalg.norm(normal)
normal = normal / nn
point = np.array([0.0, 0.0, c])
d = -point.dot(normal)
return d, normal, point
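# Example usage (illustrative sketch; points chosen to lie exactly on the plane z = 2x + 3y + 1):
#   xyz = np.array([[0., 0., 1.], [1., 0., 3.], [0., 1., 4.], [1., 1., 6.]])
#   d, normal, point = fit_plane_least_square(xyz)
#   # normal is parallel to (2, 3, -1) (normalized), point is (0, 0, 1)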
#
# def plot_plane(data, normal, d):
# from mpl_toolkits.mplot3d import Axes3D
# import matplotlib.pyplot as plt
#
# fig = plt.figure()
# ax = fig.gca(projection='3d')
#
# # plot fitted plane
# maxx = np.max(data[:, 0])
# maxy = np.max(data[:, 1])
# minx = np.min(data[:, 0])
# miny = np.min(data[:, 1])
#
# # compute needed points for plane plotting
# xx, yy = np.meshgrid([minx - 10, maxx + 10], [miny - 10, maxy + 10])
# z = (-normal[0] * xx - normal[1] * yy - d) * 1. / normal[2]
#
# # plot plane
# ax.plot_surface(xx, yy, z, alpha=0.2)
#
# ax.set_xlabel('x')
# ax.set_ylabel('y')
# ax.set_zlabel('z')
# plt.show()
#
# return
def shape_to_np(shape, dtype="int"):
"""
    Function used for the dlib facial detector; it converts the facial landmarks detected for the face region into a NumPy array of
    (x, y)-coordinates
    Args:
        :shape (dlib.full_object_detection): the facial landmarks returned by the dlib shape predictor
        :dtype (str): the dtype of the output array
        (Default is "int")
    Returns:
        :coordinates (numpy.ndarray): array of 68 (x, y)-coordinates
"""
# initialize the list of (x, y)-coordinates
coordinates = np.zeros((68, 2), dtype=dtype)
# loop over the 68 facial landmarks and convert them to a 2-tuple of (x, y)-coordinates
for i in range(0, 68):
coordinates[i] = (shape.part(i).x, shape.part(i).y)
# return the list of (x, y)-coordinates
return coordinates
def rect_to_bb(rect):
"""
Function used for the dlib facial detector; it converts dlib's rectangle to a tuple (x, y, w, h) where x and y represent xmin and ymin
coordinates while w and h represent the width and the height
Args:
:rect (dlib.rectangle): dlib rectangle object that represents the region of the image where a face is detected
Returns:
:res (tuple): tuple that represents the region of the image where a face is detected in the form x, y, w, h
"""
    # take a bounding box predicted by dlib and convert it to the format (x, y, w, h) as we would normally do with OpenCV
x = rect.left()
y = rect.top()
w = rect.right() - x
h = rect.bottom() - y
# return a tuple of (x, y, w, h)
res = x, y, w, h
return res
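# Example usage (illustrative sketch, assuming dlib is installed):
#   import dlib
#   rect_to_bb(dlib.rectangle(left=10, top=20, right=110, bottom=220))  # -> (10, 20, 100, 200)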
def enlarge_bb(y_min, x_min, y_max, x_max, im_width, im_height):
"""
Enlarge the bounding box to include more background margin (used for face detection)
Args:
:y_min (int): the top y coordinate of the bounding box
:x_min (int): the left x coordinate of the bounding box
:y_max (int): the bottom y coordinate of the bounding box
:x_max (int): the right x coordinate of the bounding box
:im_width (int): The width of the image
:im_height (int): The height of the image
Returns:
:y_min (int): the top y coordinate of the bounding box after enlarging
:x_min (int): the left x coordinate of the bounding box after enlarging
:y_max (int): the bottom y coordinate of the bounding box after enlarging
:x_max (int): the right x coordinate of the bounding box after enlarging
"""
y_min = int(max(0, y_min - abs(y_min - y_max) / 10))
y_max = int(min(im_height, y_max + abs(y_min - y_max) / 10))
x_min = int(max(0, x_min - abs(x_min - x_max) / 5))
x_max = int(min(im_width, x_max + abs(x_min - x_max) / 4)) # 5
x_max = int(min(x_max, im_width))
return y_min, x_min, y_max, x_max
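# Example usage (illustrative sketch; the box grows by ~10% of its height vertically and ~20-25% of its width
# horizontally, clipped to the image borders; note that the updated y_min/x_min are reused for y_max/x_max):
#   enlarge_bb(100, 100, 200, 150, 640, 480)  # -> (90, 90, 211, 165)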
def linear_assignment(cost_matrix):
try:
import lap
_, x, y = lap.lapjv(cost_matrix, extend_cost=True)
return np.array([[y[i], i] for i in x if i >= 0])
except ImportError:
from scipy.optimize import linear_sum_assignment
x, y = linear_sum_assignment(cost_matrix)
return np.array(list(zip(x, y)))
def iou_batch(bb_test, bb_gt):
"""
    From SORT: Computes IoU between two sets of bboxes in the form [x1,y1,x2,y2]
    Args:
        :bb_test (numpy.ndarray): array of detection bounding boxes
        :bb_gt (numpy.ndarray): array of tracked bounding boxes
    Returns:
        :o (numpy.ndarray): matrix of pairwise IoU values
"""
# print(bb_test, bb_gt)
bb_gt = np.expand_dims(bb_gt, 0)
bb_test = np.expand_dims(bb_test, 1)
xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
w = np.maximum(0., xx2 - xx1)
h = np.maximum(0., yy2 - yy1)
wh = w * h
o = wh / ((bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1]) + (bb_gt[..., 2] - bb_gt[..., 0]) * (
bb_gt[..., 3] - bb_gt[..., 1]) - wh)
return o
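# Example usage (illustrative sketch; two 10x10 boxes overlapping on a 5x5 patch):
#   bb_test = np.array([[0., 0., 10., 10.]])
#   bb_gt = np.array([[5., 5., 15., 15.]])
#   iou_batch(bb_test, bb_gt)  # -> array([[0.14285714]]) (25 px overlap / 175 px union)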
def convert_bbox_to_z(bbox):
"""
Takes a bounding box in the form [x1,y1,x2,y2] and returns z in the form [x,y,s,r] where x,y is the centre of the box and s is the scale/area and r is
the aspect ratio
Args:
:bbox ():
Returns:
"""
w = bbox[2] - bbox[0]
h = bbox[3] - bbox[1]
x = bbox[0] + w / 2.
y = bbox[1] + h / 2.
s = w * h # scale is just area
r = w / float(h) if float(h) != 0 else w
return np.array([x, y, s, r]).reshape((4, 1))
def convert_x_to_bbox(x, score=None):
"""
Takes a bounding box in the centre form [x,y,s,r] and returns it in the form
[x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right
Args:
:x ():
:score ():
(Default is None)
Returns:
"""
w = np.sqrt(x[2] * x[3])
h = x[2] / w
if score is None:
return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2.]).reshape((1, 4))
else:
return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2., score]).reshape((1, 5))
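# Example usage (illustrative sketch; the two conversions are inverses of each other):
#   z = convert_bbox_to_z([0., 0., 10., 20.])  # centre (5, 10), area 200, aspect ratio 0.5
#   convert_x_to_bbox(z.flatten())             # -> array([[ 0.,  0., 10., 20.]])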
def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
"""
Assigns detections to tracked object (both represented as bounding boxes)
Returns 3 lists of matches, unmatched_detections and unmatched_trackers
Args:
:detections ():
:trackers ():
:iou_threshold ():
(Default is 0.3)
Returns:
"""
if len(trackers) == 0:
return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)
iou_matrix = iou_batch(detections, trackers)
# print("IOU MATRIX: ", iou_matrix)
if min(iou_matrix.shape) > 0:
a = (iou_matrix > iou_threshold).astype(np.int32)
if a.sum(1).max() == 1 and a.sum(0).max() == 1:
matched_indices = np.stack(np.where(a), axis=1)
else:
matched_indices = linear_assignment(-iou_matrix)
else:
matched_indices = np.empty(shape=(0, 2))
unmatched_detections = []
for d, det in enumerate(detections):
if d not in matched_indices[:, 0]:
unmatched_detections.append(d)
unmatched_trackers = []
for t, trk in enumerate(trackers):
if t not in matched_indices[:, 1]:
unmatched_trackers.append(t)
# filter out matched with low IOU
matches = []
for m in matched_indices:
if iou_matrix[m[0], m[1]] < iou_threshold:
unmatched_detections.append(m[0])
unmatched_trackers.append(m[1])
else:
matches.append(m.reshape(1, 2))
if len(matches) == 0:
matches = np.empty((0, 2), dtype=int)
else:
matches = np.concatenate(matches, axis=0)
return matches, np.array(unmatched_detections), np.array(unmatched_trackers)
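# Example usage (illustrative sketch; one detection overlaps the only tracker with IoU ~0.68, the other does not):
#   dets = np.array([[0., 0., 10., 10.], [20., 20., 30., 30.]])
#   trks = np.array([[1., 1., 11., 11.]])
#   matches, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets, trks)
#   # matches == [[0, 0]], unmatched_dets == [1], unmatched_trks == []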
def find_face_from_key_points(key_points, bboxes, image, person=None, openpose=False, gazefollow=True):
"""
    Crop the face of a person from the image using the facial key points
    Args:
        key_points: the key points of the person
        bboxes: the bounding box of the person
        image: the image containing the person
        person: if not None, the tracked person object that is updated in place with the face crop and its coordinates
        openpose: True if the key points follow the OpenPose notation
        gazefollow: True to compute the face box in the way expected by the gaze-following pipeline
    Returns:
        :face_image, [y_min, x_min, y_max, x_max]: the face crop and its coordinates (None, [] if no valid face points are found)
"""
im_width, im_height = image.shape[1], image.shape[0]
# key_points, bboxes = person.get_key_points()[-1], person.get_bboxes()[-1]
# print("PERSON ID:", person.get_id())
# 0 nose, 1/2 left/right eye, 3/4 left/right ear
# 5/6 leftShoulder/rightShoulder
# 7/8 leftElbow/rightElbow
# 9/10 leftWrist/rightWrist
# 11/12 leftHip/rightHip
# 13/14 leftKnee/rightKnee
# 15/16 leftAnkle/rightAnkle
# print(key_points)
face_points = key_points[:7]
if openpose:
face_points = []
for point in key_points[:7]:
# print(point[2], type(point[2]))
if point[2] > 0.0:
face_points.append(point)
# print("face1", face_points)
if len(face_points) == 0:
return None, []
# print("bboxe", bboxes, face_points)
if not gazefollow:
ct = compute_centroid(face_points)
x_min, y_min = ct[0] - 10, ct[1] - 15
x_max, y_max = ct[0] + 10, ct[1] + 10
y_min_bbox = y_min
elif gazefollow:
# [l_shoulder, r_shoulder] = key_points[5:]
# print(l_shoulder, r_shoulder)
print("FACE", face_points)
if len(face_points) == 1:
return None, []
x_min, y_min, _ = np.amin(face_points, axis=0)
x_max, y_max, _ = np.amax(face_points, axis=0)
# aux_diff =
# print("X: ", aux_diff)
# if aux_diff < 20:
# x_max += 20
# x_min -= 20
        aux_diff = y_max - y_min
        # print("y: ", aux_diff)
        if aux_diff < 50:  # could also use the ratio to (x_max - x_min), or something else
y_max += (x_max - x_min) / 1.4
y_min -= (x_max - x_min) / 1.2
# x_min -= 10
# x_max += 10
y_min_bbox = int(y_min) # int(bboxes[1]) if bboxes is not None else y_min - (x_max-x_min)
# if bboxes is None:
# y_max = y_max + (x_max-x_min)
y_min, x_min, y_max, x_max = enlarge_bb(y_min_bbox, x_min, y_max, x_max, im_width, im_height)
# print(y_min, x_min, y_max, x_max, y_max - y_min, x_max - x_min)
    # if -1 < y_max - y_min < 5 and -1 < x_max - x_min < 5:  # two identical points
# # print("AAAAA")
# return None, []
face_image = image[y_min:y_max, x_min:x_max]
if person is not None:
# person.print_()
person.update_faces(face_image)
person.update_faces_coordinates([y_min, x_min, y_max, x_max])
# person.update_faces_key_points(face_points)
# person.print_()
return None
else:
return face_image, [y_min, x_min, y_max, x_max]
def compute_interaction_cosine(head_position, target_position, gaze_direction):
"""
Computes the interaction between two people using the angle of view.
    The interaction is measured as the cosine of the angle formed by the line from person A to B and the gaze direction of person A.
Args:
:head_position (list): list of pixel coordinates [x, y] that represents the position of the head of person A
:target_position (list): list of pixel coordinates [x, y] that represents the position of head of person B
:gaze_direction (list): list that represents the gaze direction of the head of person A in the form [gx, gy]
Returns:
:val (float): value that describe the quantity of interaction
"""
if head_position == target_position:
return 0 # or -1
else:
# direction from observer to target
direction = np.arctan2((target_position[1] - head_position[1]), (target_position[0] - head_position[0]))
direction_gaze = np.arctan2(gaze_direction[1], gaze_direction[0])
difference = direction - direction_gaze
# difference of the line joining observer -> target with the gazing direction,
val = np.cos(difference)
if val < 0:
return 0
else:
return val
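# Example usage (illustrative sketch; person A at (0, 0) looking along the positive x axis):
#   compute_interaction_cosine([0, 0], [10, 0], [1, 0])   # -> 1.0 (target straight ahead)
#   compute_interaction_cosine([0, 0], [10, 0], [0, 1])   # -> ~0.0 (target 90 degrees off the gaze)
#   compute_interaction_cosine([0, 0], [10, 0], [-1, 0])  # -> 0 (negative cosine values are clamped to 0)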
def compute_attention_from_vectors(list_objects):
"""
    Compute the pairwise attention between people from their gaze direction vectors and head positions
    Args:
        :list_objects (list): list of tracked person objects exposing key points and gaze direction vectors
    Returns:
        :attention_matrix, id_list: matrix where element (i, j) measures how much person i looks at person j, and the list of person ids
"""
dict_person = dict()
id_list = []
for obj in list_objects:
if len(obj.get_key_points()) > 0:
# print("Object ID: ", obj.get_id(), "x: ", obj.get_poses_vector_norm()[-1][0], "y: ", obj.get_poses_vector_norm()[-1][1])
id_list.append(obj.get_id())
# print("kpts: ", obj.get_key_points()[-1])
aux = [obj.get_key_points()[-1][j][:2] for j in [0, 2, 1, 4, 3]]
dict_person[obj.get_id()] = [obj.get_poses_vector_norm()[-1], np.mean(aux, axis=0).tolist()]
attention_matrix = np.zeros((len(dict_person), len(dict_person)), dtype=np.float32)
for i in range(attention_matrix.shape[0]):
for j in range(attention_matrix.shape[1]):
if i == j:
continue
attention_matrix[i][j] = compute_interaction_cosine(dict_person[i][1], dict_person[j][1], dict_person[i][0])
return attention_matrix.tolist(), id_list
def compute_attention_ypr(list_objects):
"""
    Print the head orientation (yaw, pitch, roll) of each tracked person
    Args:
        :list_objects (list): list of tracked person objects exposing yaw, pitch and roll estimates
    Returns:
"""
for obj in list_objects:
if len(obj.get_key_points()) > 0:
print("Object ID: ", obj.get_id(), "yaw: ", obj.get_poses_ypr()[-1][0], "pitch: ", obj.get_poses_ypr()[-1][1], "roll: ",
obj.get_poses_ypr()[-1][2])
def save_key_points_to_json(ids, kpts, path_json, openpose=False):
"""
Save key points to .json format according to Openpose output format
    Args:
        :ids (): the ids of the people
        :kpts (): the key points of each person
        :path_json (): path of the output .json file
        :openpose (): True if the key points are already in the OpenPose notation
        (Default is False)
Returns:
"""
# print(path_json)
dict_file = {"version": 1.3}
list_dict_person = []
for j in range(len(kpts)):
dict_person = {"person_id": [int(ids[j])],
"face_keypoints_2d": [],
"hand_left_keypoints_2d": [],
"hand_right_keypoints_2d": [],
"pose_keypoints_3d": [],
"face_keypoints_3d": [],
"hand_left_keypoints_3d": [],
"hand_right_keypoints_3d": []}
kpts_openpose = np.zeros((25, 3))
for i, point in enumerate(kpts[j]):
if openpose:
idx_op = rev_pose_id_part_openpose[pose_id_part_openpose[i]]
else:
idx_op = rev_pose_id_part_openpose[pose_id_part[i]]
# print(idx_op, point[1], point[0], point[2])
kpts_openpose[idx_op] = [point[1], point[0], point[2]] # x, y, conf
list_kpts_openpose = list(np.concatenate(kpts_openpose).ravel())
dict_person["pose_keypoints_2d"] = list_kpts_openpose
# print(dict_person)
list_dict_person.append(dict_person)
dict_file["people"] = list_dict_person
# Serializing json
json_object = json.dumps(dict_file, indent=4)
# Writing to sample.json
with open(path_json, "w") as outfile:
outfile.write(json_object)
def json_to_poses(json_data):
"""
    Parse an OpenPose-style json dictionary into poses, confidences and person ids
    Args:
        :json_data (dict): the loaded json data
    Returns:
        :poses, confidences, ids: the key points, confidence values and person ids
"""
poses = []
confidences = []
ids = []
for arr in json_data["people"]:
ids.append(arr["person_id"])
confidences.append(arr["pose_keypoints_2d"][2::3])
aux = arr["pose_keypoints_2d"][2::3]
arr = np.delete(arr["pose_keypoints_2d"], slice(2, None, 3))
# print("B", list(zip(arr[::2], arr[1::2])))
poses.append(list(zip(arr[::2], arr[1::2], aux)))
return poses, confidences, ids
def parse_json1(aux):
    """
    Parse an OpenPose-style json dictionary into a list of key points ([y, x, confidence] per joint) and the list of person ids
    """
    # print(aux['people'])
    list_kpts = []
    id_list = []
    for person in aux['people']:
        # print(len(person['pose_keypoints_2d']))
        flat_kpts = person['pose_keypoints_2d']  # flat [x, y, confidence, ...] list
        aux_kpts = [[flat_kpts[i + 1], flat_kpts[i], flat_kpts[i + 2]] for i in range(0, 75, 3)]
# print(len(aux_kpts))
list_kpts.append(aux_kpts)
id_list.append(person['person_id'])
# print(list_kpts)
return list_kpts, id_list
def load_poses_from_json1(json_filename):
"""
Args:
:json_filename ():
Returns:
:poses, conf:
"""
with open(json_filename) as data_file:
loaded = json.load(data_file)
zz = parse_json1(loaded)
return zz
def load_poses_from_json(json_filename):
"""
Args:
:json_filename ():
Returns:
:poses, conf:
"""
with open(json_filename) as data_file:
loaded = json.load(data_file)
poses, conf, ids = json_to_poses(loaded)
if len(poses) < 1: # != 1:
return None, None, None
else:
return poses, conf, ids
def compute_head_features(img, pose, conf, open_pose=True):
"""
    Compute the head features: the head key points normalized with respect to their centroid and maximum distance
    Args:
        img: the input image
        pose: the key points of the person
        conf: the confidence values of the key points
        open_pose: True if the key points follow the OpenPose notation
    Returns:
        :flat_list, conf_list, centroid: the normalized head key points, their confidence values and the head centroid
"""
joints = [0, 15, 16, 17, 18] if open_pose else [0, 2, 1, 4, 3]
n_joints_set = [pose[joint] for joint in joints if joint_set(pose[joint])] # if open_pose else pose
if len(n_joints_set) < 1:
        return None, None, None  # same arity as the successful return (flat_list, conf_list, centroid)
centroid = compute_centroid(n_joints_set)
# for j in n_joints_set:
# print(j, centroid)
max_dist = max([dist_2D([j[0], j[1]], centroid) for j in n_joints_set])
new_repr = [(np.array([pose[joint][0], pose[joint][1]]) - np.array(centroid)) for joint in joints] if open_pose else [
(np.array(pose[i]) - np.array(centroid)) for i in range(len(n_joints_set))]
result = []
for i in range(0, 5):
if joint_set(pose[joints[i]]):
if max_dist != 0.0:
result.append([new_repr[i][0] / max_dist, new_repr[i][1] / max_dist])
else:
result.append([new_repr[i][0], new_repr[i][1]])
else:
result.append([0, 0])
flat_list = [item for sublist in result for item in sublist]
conf_list = []
for j in joints:
conf_list.append(conf[j])
return flat_list, conf_list, centroid
def compute_body_features(pose, conf):
"""
Args:
pose:
conf:
Returns:
"""
joints = [0, 15, 16, 17, 18]
alljoints = range(0, 25)
n_joints_set = [pose[joint] for joint in joints if joint_set(pose[joint])]
if len(n_joints_set) < 1:
return None, None
centroid = compute_centroid(n_joints_set)
n_joints_set = [pose[joint] for joint in alljoints if joint_set(pose[joint])]
max_dist = max([dist_2D(j, centroid) for j in n_joints_set])
new_repr = [(np.array(pose[joint]) - np.array(centroid)) for joint in alljoints]
result = []
for i in range(0, 25):
if joint_set(pose[i]):
result.append([new_repr[i][0] / max_dist, new_repr[i][1] / max_dist])
else:
result.append([0, 0])
flat_list = [item for sublist in result for item in sublist]
for j in alljoints:
flat_list.append(conf[j])
return flat_list, centroid
def compute_centroid(points):
"""
    Compute the centroid (mean x, mean y) of a list of points, ignoring points whose confidence is 0 when a confidence value is present
    Args:
        points: list of points [x, y] or [x, y, confidence]
    Returns:
        :[mean_x, mean_y]: the centroid of the valid points ([None, None] if no point is valid)
"""
x, y = [], []
for point in points:
if len(point) == 3:
if point[2] > 0.0:
x.append(point[0])
y.append(point[1])
else:
x.append(point[0])
y.append(point[1])
# print(x, y)
if x == [] or y == []:
return [None, None]
mean_x = np.mean(x)
mean_y = np.mean(y)
return [mean_x, mean_y]
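# Example usage (illustrative sketch; the third point has confidence 0 and is ignored):
#   compute_centroid([[10, 20, 0.9], [30, 40, 0.8], [50, 60, 0.0]])  # -> [20.0, 30.0]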
def joint_set(p):
"""
    Check whether a joint has been detected (i.e. its x and y coordinates are not both 0)
    Args:
        p: a key point [x, y, ...]
    Returns:
        :True if the joint is valid, False otherwise
"""
return p[0] != 0.0 or p[1] != 0.0
def dist_2D(p1, p2):
"""
Args:
p1:
p2:
Returns:
"""
# print(p1)
# print(p2)
p1 = np.array(p1)
p2 = np.array(p2)
squared_dist = np.sum((p1 - p2) ** 2, axis=0)
return np.sqrt(squared_dist)
def compute_head_centroid(pose):
"""
Args:
pose:
Returns:
"""
joints = [0, 15, 16, 17, 18]
n_joints_set = [pose[joint] for joint in joints if joint_set(pose[joint])]
# if len(n_joints_set) < 2:
# return None
centroid = compute_centroid(n_joints_set)
return centroid
def head_direction_to_json(path_json, norm_list, unc_list, ids_list, file_name):
dict_file = {}
list_dict_person = []
for k, i in enumerate(norm_list):
dict_person = {"id_person": [ids_list[k]],
"norm_xy": [i[0][0].item(), i[0][1].item()], # from numpy to native python type for json serilization
"center_xy": [int(i[1][0]), int(i[1][1])],
"uncertainty": [unc_list[k].item()]}
list_dict_person.append(dict_person)
dict_file["people"] = list_dict_person
json_object = json.dumps(dict_file, indent=4)
with open(path_json, "w") as outfile:
outfile.write(json_object)
def ypr_to_json(path_json, yaw_list, pitch_list, roll_list, yaw_u_list, pitch_u_list, roll_u_list, ids_list, center_xy):
dict_file = {}
list_dict_person = []
for k in range(len(yaw_list)):
dict_person = {"id_person": [ids_list[k]],
"yaw": [yaw_list[k].item()],
"yaw_u": [yaw_u_list[k].item()],
"pitch": [pitch_list[k].item()],
"pitch_u": [pitch_u_list[k].item()],
"roll": [roll_list[k].item()],
"roll_u": [roll_u_list[k].item()],
"center_xy": [int(center_xy[k][0]), int(center_xy[k][1])]}
list_dict_person.append(dict_person)
dict_file["people"] = list_dict_person
json_object = json.dumps(dict_file, indent=4)
with open(path_json, "w") as outfile:
outfile.write(json_object)
# exit()
def save_keypoints_image(img, poses, suffix_, path_save=''):
"""
Save the image with the key points drawn on it
Args:
img:
poses:
suffix_:
Returns:
"""
aux = img.copy()
for point in poses:
for i, p in enumerate(point):
if i in [0, 15, 16, 17, 18]:
cv2.circle(aux, (int(p[0]), int(p[1])), 2, (0, 255, 0), 2)
cv2.imwrite(os.path.join(path_save, suffix_ + '.jpg'), aux)
def unit_vector(vector):
"""
Returns the unit vector of the vector.
Args:
vector:
Returns:
"""
return vector / np.linalg.norm(vector)
def angle_between(v1, v2):
"""
    Returns the angle in radians between vectors 'v1' and 'v2'; angles of 1.80 rad or more are reduced by 1.80 before being returned::
            angle_between((1, 0, 0), (0, 1, 0))
            1.5707963267948966
            angle_between((1, 0, 0), (1, 0, 0))
            0.0
            angle_between((1, 0, 0), (-1, 0, 0))
            1.3416  (approximately: pi reduced by 1.80)
"""
# if not unit vector
v1_u = unit_vector(tuple(v1))
v2_u = unit_vector(tuple(v2))
angle = np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))
return angle if angle < 1.80 else angle - 1.80
def centroid_constraint(centroid, centroid_det, gazefollow=False): # x y
"""
    Check whether the detected centroid is valid and close enough to the reference (ground truth) centroid
    Args:
        centroid: the reference centroid [x, y]
        centroid_det: the detected centroid [x, y]
        gazefollow: if True, use a wider tolerance (30 px instead of 3 px) and skip the video-specific exclusion zones
    Returns:
        :True if the detected centroid is accepted, False otherwise
"""
if centroid_det == [None, None]:
return False
if gazefollow == False:
if 0 < centroid_det[0] < 143 and 0 < centroid_det[1] < 24: # centroid in the overprinted text of hour in the video
return False
if 0 < centroid_det[1] < 4:
return False
if centroid[0] - 3 < centroid_det[0] < centroid[0] + 3 and centroid[1] - 3 < centroid_det[1] < centroid[
1] + 3: # detected centroid near the gt centroid
return True
else:
return False
else:
if int(centroid[0] - 30) < int(centroid_det[0]) < int(centroid[0] + 30) and int(centroid[1] - 30) < int(centroid_det[1]) < int(
centroid[1] + 30): # detected centroid near the gt centroid
return True
else:
return False
def initialize_video_reader(path_video):
"""
    Open the video at the given path with OpenCV
    Args:
        path_video: path of the input video file
    Returns:
        :cap (cv2.VideoCapture): the video capture object
"""
cap = cv2.VideoCapture(path_video)
if cap is None or not cap.isOpened():
print('Warning: unable to open video source: ', path_video)
exit(-1)
return cap
def distance_skeletons(kpts1, kpts2, dst_type):
"""
    Function to compute the distance between two skeletons (sets of key points)
    #TO DO
    Args:
        kpts1: key points of the first skeleton
        kpts2: key points of the second skeleton
        dst_type: type of distance to compute ('all_points', 'head_centroid' or 'three_centroids')
    Returns:
        :res_dist: the computed distance
"""
if len(kpts1) != len(kpts2):
print("Error: Different notation used for keypoints")
exit(-1)
print(len(kpts1), len(kpts2))
# to openpose notations
if len(kpts1) == len(kpts2) == 17:
kpts1, kpts2 = kpt_centernet_to_openpose(kpts1), kpt_centernet_to_openpose(kpts2)
print(len(kpts1), len(kpts2))
if len(kpts1) != 25 or len(kpts2) != 25:
print("Error")
exit(-1)
res_dist = 0
if dst_type == 'all_points':
for i, _ in enumerate(kpts1):
res_dist += dist_2D(kpts1[i][:2], kpts2[i][:2])
res_dist /= 25
return res_dist
elif dst_type == 'head_centroid':
top1_c, top2_c = compute_head_centroid(kpts1), compute_head_centroid(kpts2)
if top1_c == [None, None] or top2_c == [None, None]:
res_dist = 900
else:
res_dist = dist_2D(top1_c[:2], top2_c[:2])
return res_dist
elif dst_type == 'three_centroids':
#TO DO
# top1_c, top2_c = compute_centroid(kpts1[0, 15, 16, 17, 18]), compute_centroid(kpts2[0, 15, 16, 17, 18])
# mid1_c, mid2_c = compute_centroid(kpts1[2, 5, 9, 12]), compute_centroid(kpts2[2, 5, 9, 12])
# btm1_c, btm2_c = compute_centroid(kpts1[9, 12, 10, 13]), compute_centroid(kpts2[9, 12, 10, 13])
# res_dist = dist_2D(top1_c[:2], top2_c[:2]) + dist_2D(mid1_c[:2], mid2_c[:2]) + dist_2D(btm1_c[:2], btm2_c[:2])
# res_dist /= 3
# return res_dist
return None
    else:
        print("Error: dst_type not valid")
        exit(-1)
def kpt_openpose_to_centernet(kpts):
"""
Args:
kpts:
Returns:
"""
#TO TEST
kpts_openpose = np.zeros((16, 3))
for i, point in enumerate(kpts):
idx_op = rev_pose_id_part[pose_id_part_openpose[i]]
kpts_openpose[idx_op] = [point[0], point[1], point[2]]
return kpts_openpose
def kpt_centernet_to_openpose(kpts):
"""
Args:
kpts:
Returns:
"""
#TO TEST
kpts_openpose = np.zeros((25, 3))
for i, point in enumerate(kpts):
idx_op = rev_pose_id_part_openpose[pose_id_part[i]]
kpts_openpose[idx_op] = [point[1], point[0], point[2]]
return kpts_openpose
def non_maxima_aux(det, kpt, threshold=15):  # threshold in pixels
# print("A", kpt, "\n", len(kpt))
indexes_to_delete = []
if len(kpt) == 0 or len(det) == 0:
return [], []
if len(kpt) == 1 or len(det) == 1:
return det, kpt
kpt_res = kpt.copy()
det_res_aux = det.copy()
for i in range(0, len(kpt)):
for j in range(i, len(kpt)):
if i == j:
continue
dist = distance_skeletons(kpt[i], kpt[j], 'head_centroid')
# print("DIST", i, j, dist)
if dist < threshold:
if j not in indexes_to_delete:
indexes_to_delete.append(j)
# kpt_res.pop(j)
det_res = []
# print(indexes_to_delete)
indexes_to_delete = sorted(indexes_to_delete, reverse=True)
# print(len(kpt_res))
for index in indexes_to_delete:
kpt_res.pop(index)
det_res_aux = list(np.delete(det_res_aux, indexes_to_delete, axis=0))
det_res = np.array(det_res_aux)
return det_res, kpt_res
def compute_centroid_list(points):
"""
    Compute the centroid (mean x, mean y) of a flat list of key points, ignoring points whose confidence is 0
    Args:
        points: flat list of key points [x1, y1, c1, x2, y2, c2, ...]
    Returns:
        :[mean_x, mean_y]: the centroid of the valid points ([None, None] if no point is valid)
"""
x, y = [], []
for i in range(0, len(points), 3):
if points[i + 2] > 0.0: # confidence openpose
x.append(points[i])
y.append(points[i + 1])
if x == [] or y == []:
return [None, None]
mean_x = np.mean(x)
mean_y = np.mean(y)
return [mean_x, mean_y]
def normalize_wrt_maximum_distance_point(points, file_name=''):
centroid = compute_centroid_list(points)
# centroid = [points[0], points[1]]
# print(centroid)
# exit()
max_dist_x, max_dist_y = 0, 0
for i in range(0, len(points), 3):
        if points[i + 2] > 0.0:  # openpose confidence: take only valid keypoints (a keypoint that is not detected is (0, 0, 0))
distance_x = abs(points[i] - centroid[0])
distance_y = abs(points[i+1] - centroid[1])
# dist_aux.append(distance)
if distance_x > max_dist_x:
max_dist_x = distance_x
if distance_y > max_dist_y:
max_dist_y = distance_y
elif points[i + 2] == 0.0: # check for centernet people on borders with confidence 0
points[i] = 0
points[i+1] = 0
for i in range(0, len(points), 3):
if points[i + 2] > 0.0:
if max_dist_x != 0.0:
points[i] = (points[i] - centroid[0]) / max_dist_x
if max_dist_y != 0.0:
points[i + 1] = (points[i + 1] - centroid[1]) / max_dist_y
            if max_dist_x == 0.0:  # only one valid point with some confidence value, so it becomes (0, 0, confidence)
points[i] = 0.0
if max_dist_y == 0.0:
points[i + 1] = 0.0
return points
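# Example usage (illustrative sketch; a flat [x, y, confidence, ...] list with three valid keypoints).
# Each coordinate is centred on the centroid and divided by the largest per-axis distance, so the farthest
# keypoint maps to +/-1 on each axis:
#   normalize_wrt_maximum_distance_point([0, 0, 1.0, 10, 0, 1.0, 0, 20, 1.0])
#   # -> approximately [-0.5, -0.5, 1.0, 1.0, -0.5, 1.0, -0.5, 1.0, 1.0]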
def retrieve_interest_points(kpts, detector):
"""
    Retrieve only the key points of interest (the 5 face points) from the full set of key points
    Args:
        :kpts: the key points of a person
        :detector: the detector used ('centernet', 'zedcam', otherwise an OpenPose-style detector is assumed)
    Returns:
        :res_kpts: flat list with the 5 face key points as [x, y, confidence, ...] values
"""
res_kpts = []
if detector == 'centernet':
face_points = [0, 1, 2, 3, 4]
for index in face_points:
res_kpts.append(kpts[index][1])
res_kpts.append(kpts[index][0])
res_kpts.append(kpts[index][2])
    elif detector == 'zedcam':
face_points = [0, 14, 15, 16, 17]
for index in face_points:
res_kpts.append(kpts[index][0])
res_kpts.append(kpts[index][1])
res_kpts.append(kpts[index][2])
else:
# take only interest points (5 points of face)
face_points = [0, 16, 15, 18, 17]
for index in face_points:
res_kpts.append(kpts[index][0])
res_kpts.append(kpts[index][1])
res_kpts.append(kpts[index][2])
return res_kpts
def create_bbox_from_openpose_keypoints(data):
# from labels import pose_id_part_openpose
bbox = list()
ids = list()
kpt = list()
kpt_scores = list()
for person in data['people']:
ids.append(person['person_id'][0])
kpt_temp = list()
kpt_score_temp = list()
# create bbox with min max each dimension
x, y = [], []
for i in pose_id_part_openpose:
if i < 25:
# kpt and kpts scores
kpt_temp.append([int(person['pose_keypoints_2d'][i * 3]), int(person['pose_keypoints_2d'][(i * 3) + 1]),
person['pose_keypoints_2d'][(i * 3) + 2]])
kpt_score_temp.append(person['pose_keypoints_2d'][(i * 3) + 2])
# check confidence != 0
if person['pose_keypoints_2d'][(3 * i) + 2]!=0:
x.append(int(person['pose_keypoints_2d'][3 * i]))
y.append(int(person['pose_keypoints_2d'][(3 * i) + 1]))
kpt_scores.append(kpt_score_temp)
kpt.append(kpt_temp)
xmax = max(x)
xmin = min(x)
ymax = max(y)
ymin = min(y)
bbox.append([xmin, ymin, xmax, ymax, 1]) # last value is for compatibility of centernet
return bbox, kpt, kpt_scores # not to use scores
def atoi(text):
return int(text) if text.isdigit() else text
def natural_keys(text):
"""
alist.sort(key=natural_keys) sorts in human order
http://nedbatchelder.com/blog/200712/human_sorting.html
(See Toothy's implementation in the comments)
"""
import re
return [atoi(c) for c in re.split(r'(\d+)', text)]
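# Example usage (illustrative sketch; sorting frame file names in human order):
#   sorted(['frame10.jpg', 'frame2.jpg', 'frame1.jpg'], key=natural_keys)
#   # -> ['frame1.jpg', 'frame2.jpg', 'frame10.jpg']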