|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import torch |
|
import numpy as np |
|
import time |
|
import cv2 |
|
|
|
def ap_per_class(tp, conf, pred_cls, target_cls): |
|
""" Compute the average precision, given the recall and precision curves. |
|
Method originally from https://github.com/rafaelpadilla/Object-Detection-Metrics. |
|
# Arguments |
|
tp: True positives (list). |
|
conf: Objectness value from 0-1 (list). |
|
pred_cls: Predicted object classes (list). |
|
target_cls: True object classes (list). |
|
# Returns |
|
The average precision as computed in py-faster-rcnn. |
|
""" |
|
|
|
|
|
tp, conf, pred_cls, target_cls = np.array(tp), np.array(conf), np.array(pred_cls), np.array(target_cls) |
|
|
|
|
|
i = np.argsort(-conf) |
|
tp, conf, pred_cls = tp[i], conf[i], pred_cls[i] |
|
|
|
|
|
unique_classes = np.unique(np.concatenate((pred_cls, target_cls), 0)) |
|
|
|
|
|
ap, p, r = [], [], [] |
|
for c in unique_classes: |
|
i = pred_cls == c |
|
n_gt = sum(target_cls == c) |
|
n_p = sum(i) |
|
|
|
if (n_p == 0) and (n_gt == 0): |
|
continue |
|
elif (n_p == 0) or (n_gt == 0): |
|
ap.append(0) |
|
r.append(0) |
|
p.append(0) |
|
else: |
|
|
|
fpc = np.cumsum(1 - tp[i]) |
|
tpc = np.cumsum(tp[i]) |
|
|
|
|
|
recall_curve = tpc / (n_gt + 1e-16) |
|
r.append(tpc[-1] / (n_gt + 1e-16)) |
|
|
|
|
|
precision_curve = tpc / (tpc + fpc) |
|
p.append(tpc[-1] / (tpc[-1] + fpc[-1])) |
|
|
|
|
|
ap.append(compute_ap(recall_curve, precision_curve)) |
|
|
|
return np.array(ap), unique_classes.astype('int32'), np.array(r), np.array(p) |
|
|
|
def compute_ap(recall, precision): |
|
""" Compute the average precision, given the recall and precision curves. |
|
Code originally from https://github.com/rbgirshick/py-faster-rcnn. |
|
# Arguments |
|
recall: The recall curve (list). |
|
precision: The precision curve (list). |
|
# Returns |
|
The average precision as computed in py-faster-rcnn. |
|
""" |
|
|
|
|
|
|
|
mrec = np.concatenate(([0.], recall, [1.])) |
|
mpre = np.concatenate(([0.], precision, [0.])) |
|
|
|
|
|
for i in range(mpre.size - 1, 0, -1): |
|
mpre[i - 1] = np.maximum(mpre[i - 1], mpre[i]) |
|
|
|
|
|
|
|
i = np.where(mrec[1:] != mrec[:-1])[0] |
|
|
|
|
|
ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1]) |
|
return ap |
|
|
|
|
|
def bbox_iou(box1, box2, x1y1x2y2=False): |
|
""" |
|
Returns the IoU of two bounding boxes |
|
""" |
|
N, M = len(box1), len(box2) |
|
if x1y1x2y2: |
|
|
|
b1_x1, b1_y1, b1_x2, b1_y2 = box1[:, 0], box1[:, 1], box1[:, 2], box1[:, 3] |
|
b2_x1, b2_y1, b2_x2, b2_y2 = box2[:, 0], box2[:, 1], box2[:, 2], box2[:, 3] |
|
else: |
|
|
|
b1_x1, b1_x2 = box1[:, 0] - box1[:, 2] / 2, box1[:, 0] + box1[:, 2] / 2 |
|
b1_y1, b1_y2 = box1[:, 1] - box1[:, 3] / 2, box1[:, 1] + box1[:, 3] / 2 |
|
b2_x1, b2_x2 = box2[:, 0] - box2[:, 2] / 2, box2[:, 0] + box2[:, 2] / 2 |
|
b2_y1, b2_y2 = box2[:, 1] - box2[:, 3] / 2, box2[:, 1] + box2[:, 3] / 2 |
|
|
|
|
|
inter_rect_x1 = torch.max(b1_x1.unsqueeze(1), b2_x1) |
|
inter_rect_y1 = torch.max(b1_y1.unsqueeze(1), b2_y1) |
|
inter_rect_x2 = torch.min(b1_x2.unsqueeze(1), b2_x2) |
|
inter_rect_y2 = torch.min(b1_y2.unsqueeze(1), b2_y2) |
|
|
|
inter_area = torch.clamp(inter_rect_x2 - inter_rect_x1, 0) * torch.clamp(inter_rect_y2 - inter_rect_y1, 0) |
|
|
|
b1_area = ((b1_x2 - b1_x1) * (b1_y2 - b1_y1)).view(-1,1).expand(N,M) |
|
b2_area = ((b2_x2 - b2_x1) * (b2_y2 - b2_y1)).view(1,-1).expand(N,M) |
|
|
|
return inter_area / (b1_area + b2_area - inter_area + 1e-16) |
|
|
|
def xyxy2xywh(x): |
|
|
|
y = torch.zeros(x.shape) if x.dtype is torch.float32 else np.zeros(x.shape) |
|
y[:, 0] = (x[:, 0] + x[:, 2]) / 2 |
|
y[:, 1] = (x[:, 1] + x[:, 3]) / 2 |
|
y[:, 2] = x[:, 2] - x[:, 0] |
|
y[:, 3] = x[:, 3] - x[:, 1] |
|
return y |
|
|
|
|
|
def xywh2xyxy(x): |
|
|
|
y = torch.zeros(x.shape) if x.dtype is torch.float32 else np.zeros(x.shape) |
|
y[:, 0] = (x[:, 0] - x[:, 2] / 2) |
|
y[:, 1] = (x[:, 1] - x[:, 3] / 2) |
|
y[:, 2] = (x[:, 0] + x[:, 2] / 2) |
|
y[:, 3] = (x[:, 1] + x[:, 3] / 2) |
|
return y |
|
|
|
|
|
@torch.no_grad() |
|
def motdet_evaluate(model, data_loader, iou_thres=0.5, print_interval=10): |
|
model.eval() |
|
mean_mAP, mean_R, mean_P, seen = 0.0, 0.0, 0.0, 0 |
|
print('%11s' * 5 % ('Image', 'Total', 'P', 'R', 'mAP')) |
|
outputs, mAPs, mR, mP, TP, confidence, pred_class, target_class, jdict = \ |
|
[], [], [], [], [], [], [], [], [] |
|
AP_accum, AP_accum_count = np.zeros(1), np.zeros(1) |
|
for batch_i, data in enumerate(data_loader): |
|
seen += 1 |
|
if(batch_i > 300): |
|
break |
|
|
|
imgs, _ = data[0].decompose() |
|
|
|
|
|
targets = data[1][0] |
|
|
|
height, width = targets['orig_size'].cpu().numpy().tolist() |
|
t = time.time() |
|
output = model(imgs.cuda()) |
|
outputs_class = output['pred_logits'].squeeze() |
|
if outputs_class.ndim == 1: |
|
|
|
outputs_class = outputs_class.unsqueeze(-1) |
|
outputs_boxes = output['pred_boxes'].squeeze() |
|
target_boxes = targets['boxes'] |
|
|
|
|
|
if target_boxes is None: |
|
|
|
if target_boxes.size(0) != 0: |
|
mAPs.append(0), mR.append(0), mP.append(0) |
|
continue |
|
|
|
|
|
correct = [] |
|
if target_boxes.size(0) == 0: |
|
|
|
mAPs.append(0), mR.append(0), mP.append(0) |
|
continue |
|
else: |
|
target_cls = targets['labels'] |
|
|
|
target_boxes = xywh2xyxy(target_boxes) |
|
target_boxes[:, 0] *= width |
|
target_boxes[:, 2] *= width |
|
target_boxes[:, 1] *= height |
|
target_boxes[:, 3] *= height |
|
|
|
outputs_boxes = xywh2xyxy(outputs_boxes) |
|
outputs_boxes[:, 0] *= width |
|
outputs_boxes[:, 2] *= width |
|
outputs_boxes[:, 1] *= height |
|
outputs_boxes[:, 3] *= height |
|
|
|
detected = set() |
|
print("output_boxes.shape={} class.shape={}".format(outputs_boxes.shape, outputs_class.shape)) |
|
print((outputs_class.sigmoid() > 0.5).sum()) |
|
num_dt = 0 |
|
num_tp = 0 |
|
for *pred_bbox, conf in zip(outputs_boxes, outputs_class): |
|
obj_pred = 0 |
|
pred_bbox = torch.FloatTensor(pred_bbox[0]).view(1, -1) |
|
if conf.sigmoid() > 0.5: |
|
num_dt += 1 |
|
|
|
|
|
iou = bbox_iou(pred_bbox, target_boxes, x1y1x2y2=True)[0] |
|
|
|
best_i = np.argmax(iou) |
|
|
|
if iou[best_i] > iou_thres and obj_pred == int(target_cls[best_i]) and best_i.item() not in detected: |
|
correct.append(1) |
|
if conf.sigmoid() > 0.5: |
|
num_tp += 1 |
|
detected.add(best_i.item()) |
|
else: |
|
correct.append(0) |
|
print("precision={} recall={}".format(num_tp / max(1.0, num_dt), num_tp / max(1.0, len(target_boxes)))) |
|
|
|
AP, AP_class, R, P = ap_per_class(tp=correct, |
|
conf=outputs_class[:, 0].cpu(), |
|
pred_cls=np.zeros_like(outputs_class[:, 0].cpu()), |
|
target_cls=target_cls) |
|
|
|
|
|
AP_accum_count += np.bincount(AP_class, minlength=1) |
|
AP_accum += np.bincount(AP_class, minlength=1, weights=AP) |
|
|
|
|
|
mAPs.append(AP.mean()) |
|
mR.append(R.mean()) |
|
mP.append(P.mean()) |
|
|
|
|
|
mean_mAP = np.sum(mAPs) / (AP_accum_count + 1E-16) |
|
mean_R = np.sum(mR) / (AP_accum_count + 1E-16) |
|
mean_P = np.sum(mP) / (AP_accum_count + 1E-16) |
|
|
|
if batch_i % print_interval == 0: |
|
|
|
print(('%11s%11s' + '%11.3g' * 4 + 's') % |
|
(seen, 100, mean_P, mean_R, mean_mAP, time.time() - t)) |
|
|
|
print('%11s' * 5 % ('Image', 'Total', 'P', 'R', 'mAP')) |
|
|
|
print('AP: %-.4f\n\n' % (AP_accum[0] / (AP_accum_count[0] + 1E-16))) |
|
|
|
|
|
return mean_mAP, mean_R, mean_P |
|
|
|
|
|
def init_metrics(): |
|
mean_mAP, mean_R, mean_P, seen = 0.0, 0.0, 0.0, 0 |
|
outputs, mAPs, mR, mP, TP, confidence, pred_class, target_class, jdict = [], [], [], [], [], [], [], [], [] |
|
AP_accum, AP_accum_count = np.zeros(1), np.zeros(1) |
|
return {'mean_mAP': mean_mAP, |
|
'mean_R': mean_R, |
|
'mean_P': mean_P, |
|
'seen': seen, |
|
'outputs': outputs, |
|
'mAPs': mAPs, |
|
'mR': mR, |
|
'mP': mP, |
|
'TP': TP, |
|
'confidence': confidence, |
|
'pred_class': pred_class, |
|
'target_class': target_class, |
|
'jdict': jdict, |
|
'AP_accum': AP_accum, |
|
'AP_accum_count': AP_accum_count, |
|
} |
|
|
|
|
|
@torch.no_grad() |
|
def detmotdet_evaluate(model, data_loader, device, iou_thres=0.5, print_interval=10): |
|
model.eval() |
|
print('%11s' * 5 % ('Cur Image', 'Total', 'P', 'R', 'mAP')) |
|
|
|
metrics_list = [init_metrics() for i in range(10)] |
|
for batch_i, data in enumerate(data_loader): |
|
if(batch_i > 100): |
|
break |
|
|
|
for key in list(data.keys()): |
|
if isinstance(data[key], list): |
|
data[key] = [img_info.to(device) for img_info in data[key]] |
|
else: |
|
data[key] = data[key].to(device) |
|
output = model(data) |
|
num_frames = len(data['gt_instances']) |
|
for i in range(num_frames): |
|
metrics_i = metrics_list[i] |
|
metrics_i['seen'] += 1 |
|
gt_instances = data['gt_instances'][i].to(torch.device('cpu')) |
|
|
|
height, width = gt_instances.image_size |
|
t = time.time() |
|
outputs_class = output['pred_logits'][i].squeeze() |
|
outputs_boxes = output['pred_boxes'][i].squeeze() |
|
|
|
if outputs_class.ndim == 1: |
|
|
|
outputs_class = outputs_class.unsqueeze(-1) |
|
|
|
target_boxes = gt_instances.boxes |
|
|
|
|
|
if target_boxes is None: |
|
|
|
if target_boxes.size(0) != 0: |
|
metrics_i['mAPs'].append(0) |
|
metrics_i['mR'].append(0) |
|
metrics_i['mP'].append(0) |
|
print('cur_target_boxes is None') |
|
continue |
|
|
|
|
|
|
|
correct = [] |
|
if target_boxes.size(0) == 0: |
|
|
|
metrics_i['mAP'].append(0) |
|
metrics_i['mR'].append(0) |
|
metrics_i['mP'].apppend(0) |
|
print('cur_target_boxes.size(0) == 0') |
|
continue |
|
else: |
|
target_cls = gt_instances.labels |
|
|
|
target_boxes = xywh2xyxy(target_boxes) |
|
target_boxes[:, 0] *= width |
|
target_boxes[:, 2] *= width |
|
target_boxes[:, 1] *= height |
|
target_boxes[:, 3] *= height |
|
|
|
outputs_boxes = xywh2xyxy(outputs_boxes) |
|
outputs_boxes[:, 0] *= width |
|
outputs_boxes[:, 2] *= width |
|
outputs_boxes[:, 1] *= height |
|
outputs_boxes[:, 3] *= height |
|
|
|
detected = [] |
|
for *pred_bbox, conf in zip(outputs_boxes, outputs_class): |
|
obj_pred = 0 |
|
pred_bbox = torch.FloatTensor(pred_bbox[0]).view(1, -1) |
|
|
|
iou = bbox_iou(pred_bbox, target_boxes, x1y1x2y2=True)[0] |
|
|
|
best_i = np.argmax(iou) |
|
|
|
if iou[best_i] > iou_thres and obj_pred == int(target_cls[best_i]) and best_i not in detected: |
|
correct.append(1) |
|
detected.append(best_i) |
|
else: |
|
correct.append(0) |
|
|
|
|
|
AP, AP_class, R, P = ap_per_class(tp=correct, |
|
conf=outputs_class[:, 0].cpu(), |
|
pred_cls=np.zeros_like(outputs_class[:, 0].cpu()), |
|
target_cls=target_cls) |
|
|
|
|
|
metrics_i['AP_accum_count'] += np.bincount(AP_class, minlength=1) |
|
metrics_i['AP_accum'] += np.bincount(AP_class, minlength=1, weights=AP) |
|
|
|
|
|
metrics_i['mAPs'].append(AP.mean()) |
|
metrics_i['mR'].append(R.mean()) |
|
metrics_i['mP'].append(P.mean()) |
|
|
|
|
|
metrics_i['mean_mAP'] = np.sum(metrics_i['mAPs']) / (metrics_i['AP_accum_count'] + 1E-16) |
|
metrics_i['mean_R'] = np.sum(metrics_i['mR']) / (metrics_i['AP_accum_count'] + 1E-16) |
|
metrics_i['mean_P'] = np.sum(metrics_i['mP']) / (metrics_i['AP_accum_count'] + 1E-16) |
|
|
|
if batch_i % print_interval == 0: |
|
|
|
seen = metrics_i['seen'] |
|
mean_P = metrics_i['mean_P'] |
|
mean_R = metrics_i['mean_R'] |
|
mean_mAP = metrics_i['mean_mAP'] |
|
print("res_frame_{}".format(i)) |
|
print(('%11s%11s' + '%11.3g' * 4 + 's') % (seen, 100, mean_P, mean_R, mean_mAP, time.time() - t)) |
|
|
|
|
|
ret = [] |
|
for i in range(2): |
|
mean_mAP = metrics_list[i]['mean_mAP'] |
|
mean_R = metrics_list[i]['mean_R'] |
|
mean_P = metrics_list[i]['mean_P'] |
|
ret.append(mean_mAP) |
|
ret.append(mean_R) |
|
ret.append(mean_P) |
|
return ret |
|
|