# Copyright (c) OpenMMLab. All rights reserved.
import os.path as osp
import warnings

import numpy as np
import torch

from mmdet.core import bbox2result
from mmdet.models import BaseDetector


class DeployBaseDetector(BaseDetector):
    """Base class for detectors that run on a deployment backend.

    Subclasses implement ``forward_test`` to run the backend and return the
    raw batched outputs; ``forward`` then converts those outputs into
    per-image results. The training and testing hooks are not supported and
    raise ``NotImplementedError``.

    Args:
        class_names: Class names, stored as ``self.CLASSES``. Only its
            length is used in this class (number of classes).
        device_id: Device index, stored as ``self.device_id`` for use by
            subclasses.
    """

    def __init__(self, class_names, device_id):
        super().__init__()
        self.CLASSES = class_names
        self.device_id = device_id

    def simple_test(self, img, img_metas, **kwargs):
        raise NotImplementedError('This method is not implemented.')

    def aug_test(self, imgs, img_metas, **kwargs):
        raise NotImplementedError('This method is not implemented.')

    def extract_feat(self, imgs):
        raise NotImplementedError('This method is not implemented.')

    def forward_train(self, imgs, img_metas, **kwargs):
        raise NotImplementedError('This method is not implemented.')

    def val_step(self, data, optimizer):
        raise NotImplementedError('This method is not implemented.')

    def train_step(self, data, optimizer):
        raise NotImplementedError('This method is not implemented.')

    # NOTE(review): this placeholder is keyword-only, while ``forward`` calls
    # ``forward_test`` positionally and the subclasses in this file override
    # it with a positional ``imgs`` parameter. Left unchanged to keep the
    # interface stable.
    def forward_test(self, *, img, img_metas, **kwargs):
        raise NotImplementedError('This method is not implemented.')

    def async_simple_test(self, img, img_metas, **kwargs):
        raise NotImplementedError('This method is not implemented.')

    def forward(self, img, img_metas, return_loss=True, **kwargs):
        """Run the backend and convert its outputs to per-image results.

        Args:
            img: Indexable whose first element exposes ``shape``;
                ``img[0].shape[0]`` is used as the batch size.
            img_metas: Indexable whose first element is a per-image sequence
                of meta dicts. The keys read here are ``'scale_factor'``
                (when rescaling), the optional ``'border'``, and
                ``'img_shape'`` / ``'ori_shape'`` (when masks are present).
            return_loss: Unused; accepted for interface compatibility.
            **kwargs: Forwarded to ``forward_test``. ``rescale`` (default
                ``True``) controls whether boxes and masks are mapped back
                using ``scale_factor`` / ``ori_shape``.

        Returns:
            list: One entry per image. Each entry is the ``bbox2result``
            output, or a ``(bbox_results, segm_results)`` tuple when the
            backend returned exactly three outputs (the third being masks).
            ``segm_results`` is a list with one list of masks per class.
        """
        outputs = self.forward_test(img, img_metas, **kwargs)
        batch_dets, batch_labels = outputs[:2]
        # A third output, when present, carries the instance masks.
        batch_masks = outputs[2] if len(outputs) == 3 else None
        batch_size = img[0].shape[0]
        img_metas = img_metas[0]
        results = []
        rescale = kwargs.get('rescale', True)
        num_classes = len(self.CLASSES)

        for i in range(batch_size):
            dets, labels = batch_dets[i], batch_labels[i]
            meta = img_metas[i]

            if rescale:
                scale_factor = meta['scale_factor']
                if isinstance(scale_factor, (list, tuple, np.ndarray)):
                    assert len(scale_factor) == 4
                    # [1, 4] so it broadcasts over the box rows.
                    scale_factor = np.array(scale_factor)[None, :]
                dets[:, :4] /= scale_factor

            if 'border' in meta:
                # offset pixel of the top-left corners between original image
                # and padded/enlarged image, 'border' is used when exporting
                # CornerNet and CentripetalNet to onnx
                x_off = meta['border'][2]
                y_off = meta['border'][0]
                dets[:, [0, 2]] -= x_off
                dets[:, [1, 3]] -= y_off
                # Zero out coordinates that are not strictly positive.
                dets[:, :4] *= (dets[:, :4] > 0).astype(dets.dtype)

            dets_results = bbox2result(dets, labels, num_classes)

            if batch_masks is None:
                results.append(dets_results)
                continue

            masks = batch_masks[i]
            img_h, img_w = meta['img_shape'][:2]
            ori_h, ori_w = meta['ori_shape'][:2]
            # Crop the masks to the ``img_shape`` extent.
            masks = masks[:, :img_h, :img_w]
            if rescale:
                # Resize the masks to the original image size.
                masks = masks.astype(np.float32)
                masks = torch.from_numpy(masks)
                masks = torch.nn.functional.interpolate(
                    masks.unsqueeze(0), size=(ori_h, ori_w))
                masks = masks.squeeze(0).detach().numpy()
            if masks.dtype != bool:
                # Binarize non-boolean masks at 0.5.
                masks = masks >= 0.5

            # Group the masks by predicted class label.
            segms_results = [[] for _ in range(num_classes)]
            for j in range(len(dets)):
                segms_results[labels[j]].append(masks[j])
            results.append((dets_results, segms_results))

        return results


class ONNXRuntimeDetector(DeployBaseDetector):
    """Wrapper for detector's inference with ONNXRuntime.

    Args:
        onnx_file: Model passed to ``onnxruntime.InferenceSession``.
        class_names: Class names, forwarded to the base class.
        device_id: Device index used for the CUDA execution provider and for
            the input binding.
    """

    def __init__(self, onnx_file, class_names, device_id):
        super().__init__(class_names, device_id)
        import onnxruntime as ort

        # get the custom op path
        ort_custom_op_path = ''
        try:
            from mmcv.ops import get_onnxruntime_op_path
            ort_custom_op_path = get_onnxruntime_op_path()
        except ImportError:
            # Adjacent literals keep stray continuation characters out of
            # the emitted message.
            warnings.warn(
                'If input model has custom op from mmcv, '
                'you may have to build mmcv with ONNXRuntime from source.')

        session_options = ort.SessionOptions()
        # register custom op for onnxruntime
        if osp.exists(ort_custom_op_path):
            session_options.register_custom_ops_library(ort_custom_op_path)
        sess = ort.InferenceSession(onnx_file, session_options)

        # CPU is always registered; CUDA is put first when onnxruntime
        # reports a GPU device.
        providers = ['CPUExecutionProvider']
        options = [{}]
        is_cuda_available = ort.get_device() == 'GPU'
        if is_cuda_available:
            providers.insert(0, 'CUDAExecutionProvider')
            options.insert(0, {'device_id': device_id})
        sess.set_providers(providers, options)

        self.sess = sess
        self.io_binding = sess.io_binding()
        self.output_names = [_.name for _ in sess.get_outputs()]
        self.is_cuda_available = is_cuda_available

    def forward_test(self, imgs, img_metas, **kwargs):
        """Run the ONNXRuntime session on the first element of ``imgs``.

        Args:
            imgs: Indexable whose first element is a torch tensor; it is
                bound as float32 to the session input named ``'input'``.
            img_metas: Unused here.
            **kwargs: Unused here.

        Returns:
            The session outputs copied to CPU, in ``self.output_names``
            order.
        """
        input_data = imgs[0]
        # set io binding for inputs/outputs
        device_type = 'cuda' if self.is_cuda_available else 'cpu'
        if not self.is_cuda_available:
            input_data = input_data.cpu()
        # The binding passes a raw pointer plus the logical shape, so the
        # memory must be contiguous (matches TensorRTDetector.forward_test).
        input_data = input_data.contiguous()
        self.io_binding.bind_input(
            name='input',
            device_type=device_type,
            device_id=self.device_id,
            element_type=np.float32,
            shape=input_data.shape,
            buffer_ptr=input_data.data_ptr())

        for name in self.output_names:
            self.io_binding.bind_output(name)
        # run session to get outputs
        self.sess.run_with_iobinding(self.io_binding)
        ort_outputs = self.io_binding.copy_outputs_to_cpu()
        return ort_outputs


class TensorRTDetector(DeployBaseDetector):
    """Wrapper for detector's inference with TensorRT.

    Args:
        engine_file: Engine passed to ``mmcv.tensorrt.TRTWraper``.
        class_names: Class names, forwarded to the base class.
        device_id: CUDA device index used during inference.
        output_names: Deprecated and ignored; the output names are always
            ``['dets', 'labels']``, plus ``'masks'`` when the engine has four
            bindings.
    """

    def __init__(self, engine_file, class_names, device_id, output_names=None):
        super().__init__(class_names, device_id)
        # Only warn callers that actually pass the deprecated argument.
        if output_names is not None:
            warnings.warn('`output_names` is deprecated and will be removed '
                          'in future releases.')
        from mmcv.tensorrt import TRTWraper, load_tensorrt_plugin
        try:
            load_tensorrt_plugin()
        except ImportError:
            # Adjacent literals keep stray continuation characters out of
            # the emitted message.
            warnings.warn(
                'If input model has custom op from mmcv, '
                'you may have to build mmcv with TensorRT from source.')

        output_names = ['dets', 'labels']
        model = TRTWraper(engine_file, ['input'], output_names)
        with_masks = False
        # if TensorRT has totally 4 inputs/outputs, then
        # the detector should have `mask` output.
        if len(model.engine) == 4:
            model.output_names = output_names + ['masks']
            with_masks = True
        self.model = model
        self.with_masks = with_masks

    def forward_test(self, imgs, img_metas, **kwargs):
        """Run the TensorRT engine on the first element of ``imgs``.

        Args:
            imgs: Indexable whose first element is a torch tensor; it is made
                contiguous and fed to the engine as ``'input'``.
            img_metas: Unused here.
            **kwargs: Unused here.

        Returns:
            list: One numpy array per entry of ``self.model.output_names``,
            in that order.
        """
        input_data = imgs[0].contiguous()
        with torch.cuda.device(self.device_id), torch.no_grad():
            outputs = self.model({'input': input_data})
            outputs = [outputs[name] for name in self.model.output_names]
        outputs = [out.detach().cpu().numpy() for out in outputs]
        return outputs