import logging
import tempfile
import os
import torch
import numpy as np
import json

from collections import OrderedDict
from tqdm import tqdm

from maskrcnn_benchmark.modeling.roi_heads.mask_head.inference import Masker
from maskrcnn_benchmark.structures.bounding_box import BoxList
from maskrcnn_benchmark.structures.boxlist_ops import boxlist_iou


def do_coco_evaluation(
    dataset,
    predictions,
    box_only,
    output_folder,
    iou_types,
    expected_results,
    expected_results_sigma_tol,
):
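    """Evaluate `predictions` against `dataset`.

    When `box_only` is True only box-proposal recall (AR) is computed; otherwise
    predictions are converted to the COCO results format for every IoU type in
    `iou_types` ("bbox", "segm", "keypoints") and, when ground-truth annotations
    are available, scored with the COCO API. Results are additionally written to
    `output_folder` when it is provided.
    """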
    logger = logging.getLogger("maskrcnn_benchmark.inference")

    if box_only:
        logger.info("Evaluating bbox proposals")
        if dataset.coco is None and output_folder:
            json_results = prepare_for_tsv_detection(predictions, dataset)
            with open(os.path.join(output_folder, "box_proposals.json"), "w") as f:
                json.dump(json_results, f)
            return None
        areas = {"all": "", "small": "s", "medium": "m", "large": "l"}
        res = COCOResults("box_proposal")
        for limit in [100, 1000]:
            for area, suffix in areas.items():
                stats = evaluate_box_proposals(
                    predictions, dataset, area=area, limit=limit
                )
                key = "AR{}@{:d}".format(suffix, limit)
                res.results["box_proposal"][key] = stats["ar"].item()
        logger.info(res)
        check_expected_results(res, expected_results, expected_results_sigma_tol)
        if output_folder:
            torch.save(res, os.path.join(output_folder, "box_proposals.pth"))
        return res, predictions

    logger.info("Preparing results for COCO format")
    coco_results = {}
    if "bbox" in iou_types:
        logger.info("Preparing bbox results")
        if dataset.coco is None:
            coco_results["bbox"] = prepare_for_tsv_detection(predictions, dataset)
        else:
            coco_results["bbox"] = prepare_for_coco_detection(predictions, dataset)
    if "segm" in iou_types:
        logger.info("Preparing segm results")
        coco_results["segm"] = prepare_for_coco_segmentation(predictions, dataset)
    if "keypoints" in iou_types:
        logger.info("Preparing keypoints results")
        coco_results["keypoints"] = prepare_for_coco_keypoint(predictions, dataset)

    results = COCOResults(*iou_types)
    logger.info("Evaluating predictions")
    for iou_type in iou_types:
        with tempfile.NamedTemporaryFile() as f:
            file_path = f.name
            if output_folder:
                file_path = os.path.join(output_folder, iou_type + ".json")
            if dataset.coco:
                res = evaluate_predictions_on_coco(
                    dataset.coco, coco_results[iou_type], file_path, iou_type
                )
                results.update(res)
            elif output_folder:
                with open(file_path, "w") as f:
                    json.dump(coco_results[iou_type], f)
    logger.info(results)
    check_expected_results(results, expected_results, expected_results_sigma_tol)
    if output_folder:
        torch.save(results, os.path.join(output_folder, "coco_results.pth"))
    return results, coco_results


def prepare_for_tsv_detection(predictions, dataset):
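    """Convert predictions into a COCO-like dict of `images` and `annotations`
    for datasets without a COCO ground-truth object (e.g. TSV datasets), so the
    result can be dumped directly to JSON."""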
    # assert isinstance(dataset, COCODataset)
    proposal_results = []
    image_list = []
    for im_id, prediction in enumerate(predictions):
        image_info = dataset.get_img_info(im_id)
        if len(prediction) == 0:
            continue

        # TODO replace with get_img_info?
        image_id = image_info["id"]
        image_width = image_info["width"]
        image_height = image_info["height"]
        prediction = prediction.resize((image_width, image_height))
        prediction = prediction.convert("xywh")

        boxes = prediction.bbox.tolist()
        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()
        if prediction.has_field("centers"):
            centers = prediction.get_field("centers")
        else:
            centers = None

        for k, box in enumerate(boxes):
            proposal = {
                "image_id": image_id,
                "category_id": labels[k],
                "bbox": box,
                "score": scores[k],
                "area": image_width * image_height,
                "iscrowd": 0,
            }
            if centers is not None:
                proposal.update(center=centers[k].tolist())
            proposal_results.append(proposal)

        image_list.append(image_info)

    # categories = [{'supercategory': 'proposal', 'id': 0, 'name': 'proposal'}]
    return dict(images=image_list, annotations=proposal_results)


def prepare_for_coco_detection(predictions, dataset):
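    """Convert BoxList predictions into the COCO detection results format:
    one dict per box with image_id, category_id, bbox (xywh) and score."""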
    # assert isinstance(dataset, COCODataset)
    coco_results = []
    for image_id, prediction in enumerate(predictions):
        original_id = dataset.id_to_img_map[image_id]
        if len(prediction) == 0:
            continue

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))
        prediction = prediction.convert("xywh")

        boxes = prediction.bbox.tolist()
        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()

        for k, box in enumerate(boxes):
            if labels[k] in dataset.contiguous_category_id_to_json_id:
                coco_results.append(
                    {
                        "image_id": original_id,
                        "category_id": dataset.contiguous_category_id_to_json_id[labels[k]],
                        "bbox": box,
                        "score": scores[k],
                    }
                )
    return coco_results


def prepare_for_coco_segmentation(predictions, dataset):
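    """Convert mask predictions into the COCO segmentation results format,
    encoding each binary mask as a compressed RLE string."""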
    import pycocotools.mask as mask_util
    import numpy as np

    masker = Masker(threshold=0.5, padding=1)
    # assert isinstance(dataset, COCODataset)
    coco_results = []
    for image_id, prediction in tqdm(enumerate(predictions)):
        original_id = dataset.id_to_img_map[image_id]
        if len(prediction) == 0:
            continue

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))
        masks = prediction.get_field("mask")
        # t = time.time()
        # Masker is necessary only if masks haven't been already resized.
        if list(masks.shape[-2:]) != [image_height, image_width]:
            masks = masker(masks.expand(1, -1, -1, -1, -1), prediction)
            masks = masks[0]
        # logger.info('Time mask: {}'.format(time.time() - t))
        # prediction = prediction.convert('xywh')

        # boxes = prediction.bbox.tolist()
        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()

        # rles = prediction.get_field('mask')
        rles = [
            mask_util.encode(np.array(mask[0, :, :, np.newaxis], order="F"))[0]
            for mask in masks
        ]
        for rle in rles:
            rle["counts"] = rle["counts"].decode("utf-8")

        mapped_labels = [dataset.contiguous_category_id_to_json_id[i] for i in labels]

        coco_results.extend(
            [
                {
                    "image_id": original_id,
                    "category_id": mapped_labels[k],
                    "segmentation": rle,
                    "score": scores[k],
                }
                for k, rle in enumerate(rles)
            ]
        )
    return coco_results


def prepare_for_coco_keypoint(predictions, dataset):
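    """Convert keypoint predictions into the COCO keypoint results format:
    one dict per detection with image_id, category_id, keypoints and score."""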
    # assert isinstance(dataset, COCODataset)
    coco_results = []
    for image_id, prediction in enumerate(predictions):
        original_id = dataset.id_to_img_map[image_id]
        if len(prediction.bbox) == 0:
            continue

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))
        prediction = prediction.convert("xywh")

        boxes = prediction.bbox.tolist()
        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()
        keypoints = prediction.get_field("keypoints")
        keypoints = keypoints.resize((image_width, image_height))
        keypoints = keypoints.to_coco_format()

        mapped_labels = [dataset.contiguous_category_id_to_json_id[i] for i in labels]

        coco_results.extend(
            [
                {
                    "image_id": original_id,
                    "category_id": mapped_labels[k],
                    "keypoints": keypoint,
                    "score": scores[k],
                }
                for k, keypoint in enumerate(keypoints)
            ]
        )
    return coco_results


# Inspired by Detectron
def evaluate_box_proposals(
    predictions, dataset, thresholds=None, area="all", limit=None
):
    """Evaluate detection proposal recall metrics. This function is a much
    faster alternative to the official COCO API recall evaluation code. However,
    it produces slightly different results.
    """
    # Record max overlap value for each gt box
    # Return vector of overlap values
    areas = {
        "all": 0,
        "small": 1,
        "medium": 2,
        "large": 3,
        "96-128": 4,
        "128-256": 5,
        "256-512": 6,
        "512-inf": 7,
    }
    area_ranges = [
        [0 ** 2, 1e5 ** 2],    # all
        [0 ** 2, 32 ** 2],     # small
        [32 ** 2, 96 ** 2],    # medium
        [96 ** 2, 1e5 ** 2],   # large
        [96 ** 2, 128 ** 2],   # 96-128
        [128 ** 2, 256 ** 2],  # 128-256
        [256 ** 2, 512 ** 2],  # 256-512
        [512 ** 2, 1e5 ** 2],  # 512-inf
    ]
    assert area in areas, "Unknown area range: {}".format(area)
    area_range = area_ranges[areas[area]]
    gt_overlaps = []
    num_pos = 0

    for image_id, prediction in enumerate(predictions):
        original_id = dataset.id_to_img_map[image_id]

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))

        # sort predictions in descending order
        # TODO maybe remove this and make it explicit in the documentation
        if prediction.has_field("objectness"):
            inds = prediction.get_field("objectness").sort(descending=True)[1]
        else:
            inds = prediction.get_field("scores").sort(descending=True)[1]
        prediction = prediction[inds]

        ann_ids = dataset.coco.getAnnIds(imgIds=original_id)
        anno = dataset.coco.loadAnns(ann_ids)
        gt_boxes = [obj["bbox"] for obj in anno if obj["iscrowd"] == 0]
        gt_boxes = torch.as_tensor(gt_boxes).reshape(-1, 4)  # guard against no boxes
        gt_boxes = BoxList(gt_boxes, (image_width, image_height), mode="xywh").convert(
            "xyxy"
        )
        gt_areas = torch.as_tensor([obj["area"] for obj in anno if obj["iscrowd"] == 0])

        if len(gt_boxes) == 0:
            continue

        valid_gt_inds = (gt_areas >= area_range[0]) & (gt_areas <= area_range[1])
        gt_boxes = gt_boxes[valid_gt_inds]

        num_pos += len(gt_boxes)

        if len(gt_boxes) == 0:
            continue

        if len(prediction) == 0:
            continue

        if limit is not None and len(prediction) > limit:
            prediction = prediction[:limit]

        overlaps = boxlist_iou(prediction, gt_boxes)

        _gt_overlaps = torch.zeros(len(gt_boxes))
        for j in range(min(len(prediction), len(gt_boxes))):
            # find which proposal box maximally covers each gt box
            # and get the iou amount of coverage for each gt box
            max_overlaps, argmax_overlaps = overlaps.max(dim=0)

            # find which gt box is 'best' covered (i.e. 'best' = most iou)
            gt_ovr, gt_ind = max_overlaps.max(dim=0)
            assert gt_ovr >= 0
            # find the proposal box that covers the best covered gt box
            box_ind = argmax_overlaps[gt_ind]
            # record the iou coverage of this gt box
            _gt_overlaps[j] = overlaps[box_ind, gt_ind]
            assert _gt_overlaps[j] == gt_ovr
            # mark the proposal box and the gt box as used
            overlaps[box_ind, :] = -1
            overlaps[:, gt_ind] = -1

        # append recorded iou coverage level
        gt_overlaps.append(_gt_overlaps)

    if len(gt_overlaps) == 0:
        return {
            "ar": torch.zeros(1),
            "recalls": torch.zeros(1),
            "thresholds": thresholds,
            "gt_overlaps": gt_overlaps,
            "num_pos": num_pos,
        }

    gt_overlaps = torch.cat(gt_overlaps, dim=0)
    gt_overlaps, _ = torch.sort(gt_overlaps)

    if thresholds is None:
        step = 0.05
        thresholds = torch.arange(0.5, 0.95 + 1e-5, step, dtype=torch.float32)
    recalls = torch.zeros_like(thresholds)
    # compute recall for each iou threshold
    for i, t in enumerate(thresholds):
        recalls[i] = (gt_overlaps >= t).float().sum() / float(num_pos)
    # ar = 2 * np.trapz(recalls, thresholds)
    ar = recalls.mean()
    return {
        "ar": ar,
        "recalls": recalls,
        "thresholds": thresholds,
        "gt_overlaps": gt_overlaps,
        "num_pos": num_pos,
    }


def evaluate_predictions_on_coco(
    coco_gt, coco_results, json_result_file, iou_type="bbox"
):
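    """Dump `coco_results` to `json_result_file`, run COCOeval for `iou_type`
    against `coco_gt`, and return the evaluated COCOeval object. For "bbox"
    evaluation a per-category AP summary is also written as a CSV next to the
    JSON file."""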
    with open(json_result_file, "w") as f:
        json.dump(coco_results, f)

    from pycocotools.coco import COCO
    from pycocotools.cocoeval import COCOeval

    coco_dt = coco_gt.loadRes(str(json_result_file)) if coco_results else COCO()
    # coco_dt = coco_gt.loadRes(coco_results)
    if iou_type == "keypoints":
        coco_gt = filter_valid_keypoints(coco_gt, coco_dt)
    coco_eval = COCOeval(coco_gt, coco_dt, iou_type)
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()
    if iou_type == "bbox":
        summarize_per_category(coco_eval, json_result_file.replace(".json", ".csv"))
    return coco_eval


def summarize_per_category(coco_eval, csv_output=None):
    """
    Compute and display summary metrics for evaluation results, broken down
    per category. Note this function can *only* be applied with the default
    COCOeval parameter settings.
    """
    def _summarize(iouThr=None, areaRng="all", maxDets=100):
        p = coco_eval.params
        titleStr = "Average Precision"
        typeStr = "(AP)"
        iouStr = (
            "{:0.2f}:{:0.2f}".format(p.iouThrs[0], p.iouThrs[-1])
            if iouThr is None
            else "{:0.2f}".format(iouThr)
        )
        result_str = " {:<18} {} @[ IoU={:<9} | area={:>6s} | maxDets={:>3d} ], ".format(
            titleStr, typeStr, iouStr, areaRng, maxDets
        )
        aind = [i for i, aRng in enumerate(p.areaRngLbl) if aRng == areaRng]
        mind = [i for i, mDet in enumerate(p.maxDets) if mDet == maxDets]
        # dimension of precision: [TxRxKxAxM]
        s = coco_eval.eval["precision"]
        # IoU
        if iouThr is not None:
            t = np.where(iouThr == p.iouThrs)[0]
            s = s[t]
        s = s[:, :, :, aind, mind]
        if len(s[s > -1]) == 0:
            mean_s = -1
        else:
            mean_s = np.mean(s[s > -1])
        # calculate AP (average precision) for each category
        num_classes = len(p.catIds)
        avg_ap = 0.0
        for i in range(0, num_classes):
            result_str += "{}, ".format(np.mean(s[:, :, i, :]))
            avg_ap += np.mean(s[:, :, i, :])
        result_str += "{} \n".format(avg_ap / num_classes)
        return result_str
    id2name = {}
    for _, cat in coco_eval.cocoGt.cats.items():
        id2name[cat["id"]] = cat["name"]

    title_str = "metric, "
    for cid in coco_eval.params.catIds:
        title_str += "{}, ".format(id2name[cid])
    title_str += "avg \n"

    results = [title_str]
    results.append(_summarize())
    results.append(_summarize(iouThr=.5, maxDets=coco_eval.params.maxDets[2]))
    results.append(_summarize(areaRng="small", maxDets=coco_eval.params.maxDets[2]))
    results.append(_summarize(areaRng="medium", maxDets=coco_eval.params.maxDets[2]))
    results.append(_summarize(areaRng="large", maxDets=coco_eval.params.maxDets[2]))

    # Only write the CSV when an output path is given (csv_output defaults to None).
    if csv_output:
        with open(csv_output, "w") as f:
            for result in results:
                f.writelines(result)


def filter_valid_keypoints(coco_gt, coco_dt):
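    """Mask out ground-truth keypoints that the model does not predict: each
    ground-truth visibility flag is multiplied by the corresponding visibility
    flag of the first detection result, and num_keypoints is recomputed."""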
    kps = coco_dt.anns[1]["keypoints"]
    for id, ann in coco_gt.anns.items():
        ann["keypoints"][2::3] = [a * b for a, b in zip(ann["keypoints"][2::3], kps[2::3])]
        ann["num_keypoints"] = sum(ann["keypoints"][2::3])
    return coco_gt


class COCOResults(object):
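    """Accumulator that maps each IoU type to an ordered dict of its summary
    metrics, initialised to -1 and filled in from COCOeval.stats via update()."""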
    METRICS = {
        "bbox": ["AP", "AP50", "AP75", "APs", "APm", "APl"],
        "segm": ["AP", "AP50", "AP75", "APs", "APm", "APl"],
        "box_proposal": [
            "AR@100",
            "ARs@100",
            "ARm@100",
            "ARl@100",
            "AR@1000",
            "ARs@1000",
            "ARm@1000",
            "ARl@1000",
        ],
        "keypoints": ["AP", "AP50", "AP75", "APm", "APl"],
    }

    def __init__(self, *iou_types):
        allowed_types = ("box_proposal", "bbox", "segm", "keypoints")
        assert all(iou_type in allowed_types for iou_type in iou_types)
        results = OrderedDict()
        for iou_type in iou_types:
            results[iou_type] = OrderedDict(
                [(metric, -1) for metric in COCOResults.METRICS[iou_type]]
            )
        self.results = results

    def update(self, coco_eval):
        if coco_eval is None:
            return
        from pycocotools.cocoeval import COCOeval

        assert isinstance(coco_eval, COCOeval)
        s = coco_eval.stats
        iou_type = coco_eval.params.iouType
        res = self.results[iou_type]
        metrics = COCOResults.METRICS[iou_type]
        for idx, metric in enumerate(metrics):
            res[metric] = s[idx]

    def __repr__(self):
        # TODO make it pretty
        return repr(self.results)


def check_expected_results(results, expected_results, sigma_tol):
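    """Check selected metrics in `results` against expected (mean, std) pairs,
    logging PASS/FAIL for each metric depending on whether the actual value lies
    within `sigma_tol` standard deviations of the expected mean."""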
    if not expected_results:
        return

    logger = logging.getLogger("maskrcnn_benchmark.inference")
    for task, metric, (mean, std) in expected_results:
        actual_val = results.results[task][metric]
        lo = mean - sigma_tol * std
        hi = mean + sigma_tol * std
        ok = (lo < actual_val) and (actual_val < hi)
        msg = (
            "{} > {} sanity check (actual vs. expected): "
            "{:.3f} vs. mean={:.4f}, std={:.4}, range=({:.4f}, {:.4f})"
        ).format(task, metric, actual_val, mean, std, lo, hi)
        if not ok:
            msg = "FAIL: " + msg
            logger.error(msg)
        else:
            msg = "PASS: " + msg
            logger.info(msg)
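

# Illustrative usage sketch (not part of the original module): how the
# evaluation entry point above is typically driven after inference. The
# `dataset` and `predictions` names below are placeholders assumed to come
# from a COCO-style test loader and the maskrcnn_benchmark inference loop.
#
#     results, coco_results = do_coco_evaluation(
#         dataset=dataset,
#         predictions=predictions,
#         box_only=False,
#         output_folder="output/eval",
#         iou_types=("bbox", "segm"),
#         expected_results=[],
#         expected_results_sigma_tol=4,
#     )
#     print(results)  # COCOResults with AP/AP50/... per IoU type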