pesi
/

rtmo / rtmo_gpu.py
Luigi's picture
Add option --yolo_nas_pose, used to read YOLO NAS Pose model instead of RTMO
1f0f5d8
raw
history blame
17.8 kB
import os
import numpy as np
from typing import List, Tuple
import onnxruntime as ort
import cv2
# dictionary from https://github.com/Tau-J/rtmlib/blob/4b29101d54b611048ef165277cebfffff3030074/rtmlib/visualization/skeleton/coco17.py
coco17 = dict(name='coco17',
keypoint_info={
0:
dict(name='nose', id=0, color=[51, 153, 255], swap=''),
1:
dict(name='left_eye',
id=1,
color=[51, 153, 255],
swap='right_eye'),
2:
dict(name='right_eye',
id=2,
color=[51, 153, 255],
swap='left_eye'),
3:
dict(name='left_ear',
id=3,
color=[51, 153, 255],
swap='right_ear'),
4:
dict(name='right_ear',
id=4,
color=[51, 153, 255],
swap='left_ear'),
5:
dict(name='left_shoulder',
id=5,
color=[0, 255, 0],
swap='right_shoulder'),
6:
dict(name='right_shoulder',
id=6,
color=[255, 128, 0],
swap='left_shoulder'),
7:
dict(name='left_elbow',
id=7,
color=[0, 255, 0],
swap='right_elbow'),
8:
dict(name='right_elbow',
id=8,
color=[255, 128, 0],
swap='left_elbow'),
9:
dict(name='left_wrist',
id=9,
color=[0, 255, 0],
swap='right_wrist'),
10:
dict(name='right_wrist',
id=10,
color=[255, 128, 0],
swap='left_wrist'),
11:
dict(name='left_hip',
id=11,
color=[0, 255, 0],
swap='right_hip'),
12:
dict(name='right_hip',
id=12,
color=[255, 128, 0],
swap='left_hip'),
13:
dict(name='left_knee',
id=13,
color=[0, 255, 0],
swap='right_knee'),
14:
dict(name='right_knee',
id=14,
color=[255, 128, 0],
swap='left_knee'),
15:
dict(name='left_ankle',
id=15,
color=[0, 255, 0],
swap='right_ankle'),
16:
dict(name='right_ankle',
id=16,
color=[255, 128, 0],
swap='left_ankle')
},
skeleton_info={
0:
dict(link=('left_ankle', 'left_knee'),
id=0,
color=[0, 255, 0]),
1:
dict(link=('left_knee', 'left_hip'), id=1, color=[0, 255,
0]),
2:
dict(link=('right_ankle', 'right_knee'),
id=2,
color=[255, 128, 0]),
3:
dict(link=('right_knee', 'right_hip'),
id=3,
color=[255, 128, 0]),
4:
dict(link=('left_hip', 'right_hip'),
id=4,
color=[51, 153, 255]),
5:
dict(link=('left_shoulder', 'left_hip'),
id=5,
color=[51, 153, 255]),
6:
dict(link=('right_shoulder', 'right_hip'),
id=6,
color=[51, 153, 255]),
7:
dict(link=('left_shoulder', 'right_shoulder'),
id=7,
color=[51, 153, 255]),
8:
dict(link=('left_shoulder', 'left_elbow'),
id=8,
color=[0, 255, 0]),
9:
dict(link=('right_shoulder', 'right_elbow'),
id=9,
color=[255, 128, 0]),
10:
dict(link=('left_elbow', 'left_wrist'),
id=10,
color=[0, 255, 0]),
11:
dict(link=('right_elbow', 'right_wrist'),
id=11,
color=[255, 128, 0]),
12:
dict(link=('left_eye', 'right_eye'),
id=12,
color=[51, 153, 255]),
13:
dict(link=('nose', 'left_eye'), id=13, color=[51, 153, 255]),
14:
dict(link=('nose', 'right_eye'), id=14, color=[51, 153,
255]),
15:
dict(link=('left_eye', 'left_ear'),
id=15,
color=[51, 153, 255]),
16:
dict(link=('right_eye', 'right_ear'),
id=16,
color=[51, 153, 255]),
17:
dict(link=('left_ear', 'left_shoulder'),
id=17,
color=[51, 153, 255]),
18:
dict(link=('right_ear', 'right_shoulder'),
id=18,
color=[51, 153, 255])
})
# functions from https://github.com/Tau-J/rtmlib/blob/4b29101d54b611048ef165277cebfffff3030074/rtmlib/visualization/draw.py#L71
def draw_mmpose(img,
keypoints,
scores,
keypoint_info,
skeleton_info,
kpt_thr=0.5,
radius=2,
line_width=2):
assert len(keypoints.shape) == 2
vis_kpt = [s >= kpt_thr for s in scores]
link_dict = {}
for i, kpt_info in keypoint_info.items():
kpt_color = tuple(kpt_info['color'])
link_dict[kpt_info['name']] = kpt_info['id']
kpt = keypoints[i]
if vis_kpt[i]:
img = cv2.circle(img, (int(kpt[0]), int(kpt[1])), int(radius),
kpt_color, -1)
for i, ske_info in skeleton_info.items():
link = ske_info['link']
pt0, pt1 = link_dict[link[0]], link_dict[link[1]]
if vis_kpt[pt0] and vis_kpt[pt1]:
link_color = ske_info['color']
kpt0 = keypoints[pt0]
kpt1 = keypoints[pt1]
img = cv2.line(img, (int(kpt0[0]), int(kpt0[1])),
(int(kpt1[0]), int(kpt1[1])),
link_color,
thickness=line_width)
return img
# with simplification to use onnxruntime only
def draw_skeleton(img,
keypoints,
scores,
kpt_thr=0.5,
radius=2,
line_width=2):
num_keypoints = keypoints.shape[1]
if num_keypoints == 17:
skeleton = 'coco17'
else:
raise NotImplementedError
skeleton_dict = eval(f'{skeleton}')
keypoint_info = skeleton_dict['keypoint_info']
skeleton_info = skeleton_dict['skeleton_info']
if len(keypoints.shape) == 2:
keypoints = keypoints[None, :, :]
scores = scores[None, :, :]
num_instance = keypoints.shape[0]
if skeleton in ['coco17']:
for i in range(num_instance):
img = draw_mmpose(img, keypoints[i], scores[i], keypoint_info,
skeleton_info, kpt_thr, radius, line_width)
else:
raise NotImplementedError
return img
class RTMO_GPU(object):
def preprocess(self, img: np.ndarray):
"""Do preprocessing for RTMPose model inference.
Args:
img (np.ndarray): Input image in shape.
Returns:
tuple:
- resized_img (np.ndarray): Preprocessed image.
- center (np.ndarray): Center of image.
- scale (np.ndarray): Scale of image.
"""
if len(img.shape) == 3:
padded_img = np.ones(
(self.model_input_size[0], self.model_input_size[1], 3),
dtype=np.uint8) * 114
else:
padded_img = np.ones(self.model_input_size, dtype=np.uint8) * 114
ratio = min(self.model_input_size[0] / img.shape[0],
self.model_input_size[1] / img.shape[1])
resized_img = cv2.resize(
img,
(int(img.shape[1] * ratio), int(img.shape[0] * ratio)),
interpolation=cv2.INTER_LINEAR,
).astype(np.uint8)
padded_shape = (int(img.shape[0] * ratio), int(img.shape[1] * ratio))
padded_img[:padded_shape[0], :padded_shape[1]] = resized_img
# normalize image
if self.mean is not None:
self.mean = np.array(self.mean)
self.std = np.array(self.std)
padded_img = (padded_img - self.mean) / self.std
return padded_img, ratio
def postprocess(
self,
outputs: List[np.ndarray],
ratio: float = 1.,
) -> Tuple[np.ndarray, np.ndarray]:
"""Do postprocessing for RTMO model inference.
Args:
outputs (List[np.ndarray]): Outputs of RTMO model.
ratio (float): Ratio of preprocessing.
Returns:
tuple:
- final_boxes (np.ndarray): Final bounding boxes.
- final_scores (np.ndarray): Final scores.
"""
if not self.is_yolo_nas_pose:
# RTMO
det_outputs, pose_outputs = outputs
# onnx contains nms module
pack_dets = (det_outputs[0, :, :4], det_outputs[0, :, 4])
final_boxes, final_scores = pack_dets
final_boxes /= ratio
isscore = final_scores > 0.3
isbbox = [i for i in isscore]
# final_boxes = final_boxes[isbbox]
# decode pose outputs
keypoints, scores = pose_outputs[0, :, :, :2], pose_outputs[0, :, :, 2]
keypoints = keypoints / ratio
keypoints = keypoints[isbbox]
scores = scores[isbbox]
else:
# NAS Pose
flat_predictions = outputs[0]
if flat_predictions.shape[0] > 0: # at least one person found
mask = flat_predictions[:, 0] == 0
pred_bboxes = flat_predictions[mask, 1:5]
pred_joints = flat_predictions[mask, 6:].reshape((len(pred_bboxes), -1, 3))
keypoints, scores = pred_joints[:,:,:2], pred_joints[:,:,-1]
keypoints = keypoints / ratio
else: # no detection
keypoints, scores = np.zeros((0, 17, 2)), np.zeros((0, 17))
return keypoints, scores
def inference(self, img: np.ndarray):
"""Inference model.
Args:
img (np.ndarray): Input image in shape.
Returns:
outputs (np.ndarray): Output of RTMPose model.
"""
# build input to (1, 3, H, W)
img = img.transpose(2, 0, 1)
img = np.ascontiguousarray(img, dtype=np.float32 if not self.is_yolo_nas_pose else np.uint8)
input = img[None, :, :, :]
# Create an IO Binding object
io_binding = self.session.io_binding()
if not self.is_yolo_nas_pose:
# RTMO
io_binding.bind_input(name='input', device_type='cpu', device_id=0, element_type=np.float32, shape=input.shape, buffer_ptr=input.ctypes.data)
io_binding.bind_output(name='dets')
io_binding.bind_output(name='keypoints')
else:
# NAS Pose, flat format
io_binding.bind_input(name='input', device_type='cpu', device_id=0, element_type=np.uint8, shape=input.shape, buffer_ptr=input.ctypes.data)
io_binding.bind_output(name='graph2_flat_predictions')
# Run inference with IO Binding
self.session.run_with_iobinding(io_binding)
# Retrieve the outputs from the IO Binding object
outputs = [output.numpy() for output in io_binding.get_outputs()]
return outputs
def __call__(self, image: np.ndarray):
image, ratio = self.preprocess(image)
outputs = self.inference(image)
keypoints, scores = self.postprocess(outputs, ratio)
return keypoints, scores
def __init__(self,
onnx_model: str = None,
model_input_size: tuple = (640, 640),
mean: tuple = None,
std: tuple = None,
device: str = 'cuda',
is_yolo_nas_pose = False):
if not os.path.exists(onnx_model):
# If the file does not exist, raise FileNotFoundError
raise FileNotFoundError(f"The specified ONNX model file was not found: {onnx_model}")
providers = {'cpu': 'CPUExecutionProvider',
'cuda': [
('TensorrtExecutionProvider', {
'trt_fp16_enable':True,
'trt_engine_cache_enable':True,
'trt_engine_cache_path':'cache'}),
('CUDAExecutionProvider', {
'cudnn_conv_algo_search': 'DEFAULT',
'cudnn_conv_use_max_workspace': True
}),
'CPUExecutionProvider']}
self.session = ort.InferenceSession(path_or_bytes=onnx_model,
providers=providers[device])
self.onnx_model = onnx_model
self.model_input_size = model_input_size
self.mean = mean
self.std = std
self.device = device
self.is_yolo_nas_pose = is_yolo_nas_pose
class RTMO_GPU_Batch(RTMO_GPU):
def preprocess_batch(self, imgs: List[np.ndarray]) -> Tuple[np.ndarray, List[float]]:
"""Process a batch of images for RTMPose model inference.
Args:
imgs (List[np.ndarray]): List of input images.
Returns:
tuple:
- batch_img (np.ndarray): Batch of preprocessed images.
- ratios (List[float]): Ratios used for preprocessing each image.
"""
batch_img = []
ratios = []
for img in imgs:
preprocessed_img, ratio = super().preprocess(img)
batch_img.append(preprocessed_img)
ratios.append(ratio)
# Stack along the first dimension to create a batch
batch_img = np.stack(batch_img, axis=0)
return batch_img, ratios
def inference(self, batch_img: np.ndarray):
"""Override to handle batch inference.
Args:
batch_img (np.ndarray): Batch of preprocessed images.
Returns:
outputs (List[np.ndarray]): Outputs of RTMPose model for each image.
"""
batch_img = batch_img.transpose(0, 3, 1, 2) # NCHW format
batch_img = np.ascontiguousarray(batch_img, dtype=np.float32)
input = batch_img
# Create an IO Binding object
io_binding = self.session.io_binding()
# Bind the model inputs and outputs to the IO Binding object
io_binding.bind_input(name='input', device_type='cpu', device_id=0, element_type=np.float32, shape=input.shape, buffer_ptr=input.ctypes.data)
io_binding.bind_output(name='dets')
io_binding.bind_output(name='keypoints')
# Run inference with IO Binding
self.session.run_with_iobinding(io_binding)
# Retrieve the outputs from the IO Binding object
outputs = [output.numpy() for output in io_binding.get_outputs()]
return outputs
def postprocess_batch(
self,
outputs: List[np.ndarray],
ratios: List[float]
) -> List[Tuple[np.ndarray, np.ndarray]]:
"""Process outputs for a batch of images.
Args:
outputs (List[np.ndarray]): Outputs from the model for each image.
ratios (List[float]): Ratios used for preprocessing each image.
Returns:
List[Tuple[np.ndarray, np.ndarray]]: keypoints and scores for each image.
"""
batch_keypoints = []
batch_scores = []
for i, ratio in enumerate(ratios):
keypoints, scores = super().postprocess(outputs, ratio)
batch_keypoints.append(keypoints)
batch_scores.append(scores)
return batch_keypoints, batch_scores
def __call__(self, images: List[np.ndarray]):
batch_img, ratios = self.preprocess_batch(images)
outputs = self.inference(batch_img)
keypoints, scores = self.postprocess_batch(outputs, ratios)
return keypoints, scores