Source code for tensorbay.opendataset.COCO2017.loader

#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#
# pylint: disable=invalid-name, missing-module-docstring

import json
import os
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Tuple

import numpy as np

from tensorbay.dataset import Data, Dataset
from tensorbay.label import (
    Label,
    LabeledBox2D,
    LabeledKeypoints2D,
    LabeledMultiPolygon,
    LabeledRLE,
    PanopticMask,
)
from tensorbay.opendataset._utility.glob import glob
from tensorbay.utility.itertools import chunked

try:
    from PIL import Image
except ModuleNotFoundError:
    from tensorbay.opendataset._utility.mocker import Image  # pylint:disable=ungrouped-imports

DATASET_NAME = "COCO2017"
_LABELED_SEGMENT_NAMES = ("train", "val")
_UNLABELED_SEGMENT_NAMES = ("test", "unlabeled")


def COCO2017(path: str) -> Dataset:
    """Build the `COCO2017 <https://cocodataset.org/#home>`_ dataset from a local copy.

    The directory is expected to be organized like::

        <path>
            annotations/
                panoptic_train2017/
                    000000116037.png
                    000000116040.png
                    ...
                panoptic_val2017/
                instances_train2017.json
                instances_val2017.json
                panoptic_train2017.json
                panoptic_val2017.json
                person_keypoints_train2017.json
                person_keypoints_val2017.json
            train2017/
                000000116026.jpg
                000000116031.jpg
                ...
            test2017/
            val2017/
            unlabeled2017/

    Images of the labeled segments get box2d, multi-polygon, RLE, keypoints2d and
    panoptic-mask labels; images of the unlabeled segments are added without labels.

    Arguments:
        path: The root directory of the dataset.

    Returns:
        Loaded :class:`~tensorbay.dataset.dataset.Dataset` instance.

    """
    root_path = os.path.abspath(os.path.expanduser(path))
    # The annotation directory is shared by every labeled segment.
    annotation_path = os.path.join(root_path, "annotations")

    dataset = Dataset(DATASET_NAME)
    dataset.load_catalog(os.path.join(os.path.dirname(__file__), "catalog.json"))

    for segment_name in _LABELED_SEGMENT_NAMES:
        segment = dataset.create_segment(segment_name)

        information = _get_information(annotation_path, segment_name)
        categories = information["categories"]
        instances = information["instances_annotations"]
        person_keypoints = information["person_keypoints_annotations"]
        panoptic = information["panoptic_annotations"]

        image_pattern = os.path.join(root_path, f"{segment_name}2017", "*.jpg")
        for image_path in glob(image_pattern):
            data = Data(image_path)
            # The image file stem doubles as the numeric image id used in the annotations.
            image_stem, _ = os.path.splitext(os.path.basename(image_path))
            image_id = int(image_stem)

            label = _get_instance_label(instances, image_id, categories)
            label.keypoints2d = _get_keypoints2d(person_keypoints, image_id, categories)
            label.box2d.extend(_get_panoptic_box2d(panoptic, image_id, categories))
            label.panoptic_mask = _get_panoptic_mask(
                annotation_path, segment_name, image_stem, panoptic, image_id
            )

            data.label = label
            segment.append(data)

    for segment_name in _UNLABELED_SEGMENT_NAMES:
        segment = dataset.create_segment(segment_name)
        image_pattern = os.path.join(root_path, f"{segment_name}2017", "*.jpg")
        for image_path in glob(image_pattern):
            segment.append(Data(image_path))

    return dataset
def _get_information(annotation_path: str, segment_name: str) -> Dict[str, Any]:
    """Load the annotation files of one segment and group the annotations by image id.

    Arguments:
        annotation_path: The directory containing the ``<task>_<segment_name>2017.json`` files.
        segment_name: The name of the segment whose annotation files are read.

    Returns:
        A dict with one ``"<task>_annotations"`` entry for each of the ``instances``,
        ``person_keypoints`` and ``panoptic`` tasks, each mapping an image id to the list
        of its annotations, and a ``"categories"`` entry mapping every category id of the
        panoptic file to ``"<supercategory>.<name>"``.

    """
    task_information: Dict[str, Any] = {}
    for task in ("instances", "person_keypoints", "panoptic"):
        with open(
            os.path.join(annotation_path, f"{task}_{segment_name}2017.json"), "r", encoding="utf-8"
        ) as fp:
            file_json = json.load(fp)

        task_annotation: DefaultDict[int, Any] = defaultdict(list)
        for annotation in file_json["annotations"]:
            task_annotation[annotation["image_id"]].append(annotation)
        task_information[f"{task}_annotations"] = task_annotation

        # The category names are taken from the panoptic file only.
        if task == "panoptic":
            task_information["categories"] = {
                category["id"]: f"{category['supercategory']}.{category['name']}"
                for category in file_json["categories"]
            }

    return task_information


def _get_instance_label(
    instances_annotations: Dict[int, Any], image_id: int, categories: Dict[int, str]
) -> Label:
    """Create the label of one image from its ``instances`` annotations.

    Arguments:
        instances_annotations: The ``instances`` annotations grouped by image id.
        image_id: The id of the image to create the label for.
        categories: The mapping from category id to category name.

    Returns:
        A label whose ``box2d``, ``multi_polygon`` and ``rle`` lists are always set,
        and are empty when the image has no ``instances`` annotation.

    """
    label: Label = Label()
    label.box2d = []
    label.multi_polygon = []
    label.rle = []

    # Membership test instead of indexing: the mapping built by "_get_information" is a
    # defaultdict, which would grow an empty entry for every unknown image id.
    if image_id not in instances_annotations:
        return label

    for annotation in instances_annotations[image_id]:
        category = categories[annotation["category_id"]]
        label.box2d.append(LabeledBox2D.from_xywh(*annotation["bbox"], category=category))

        if annotation["iscrowd"] == 0:
            # The segmentation is a list of flat coordinate lists, one per polygon.
            points = [chunked(coordinates, 2) for coordinates in annotation["segmentation"]]
            label.multi_polygon.append(LabeledMultiPolygon(points, category=category))
        else:
            # Crowd annotations carry their segmentation as RLE counts.
            label.rle.append(LabeledRLE(annotation["segmentation"]["counts"], category=category))

    return label


def _get_keypoints2d(
    person_keypoints_annotations: Dict[int, Any], image_id: int, categories: Dict[int, str]
) -> List[LabeledKeypoints2D]:
    """Create the keypoints labels of one image from its ``person_keypoints`` annotations.

    Arguments:
        person_keypoints_annotations: The ``person_keypoints`` annotations grouped by image id.
        image_id: The id of the image to create the labels for.
        categories: The mapping from category id to category name.

    Returns:
        One keypoints label per annotation, or an empty list when the image has none.

    """
    if image_id not in person_keypoints_annotations:
        return []

    # The flat "keypoints" list is consumed three values at a time.
    return [
        LabeledKeypoints2D(
            chunked(annotation["keypoints"], 3),
            category=categories[annotation["category_id"]],
        )
        for annotation in person_keypoints_annotations[image_id]
    ]


def _get_panoptic_box2d(
    panoptic_annotations: Dict[int, Any], image_id: int, categories: Dict[int, str]
) -> List[LabeledBox2D]:
    """Create the box labels of the stuff segments of one image.

    Arguments:
        panoptic_annotations: The ``panoptic`` annotations grouped by image id.
        image_id: The id of the image to create the labels for.
        categories: The mapping from category id to category name.

    Returns:
        One box per segment whose category id is greater than 91, or an empty list
        when the image has no ``panoptic`` annotation.

    """
    if image_id not in panoptic_annotations:
        return []

    box2d: List[LabeledBox2D] = []
    for annotation in panoptic_annotations[image_id]:
        for segment_info in annotation["segments_info"]:
            # category_id 1-91 are thing categories from the detection task
            # category_id 92-200 are stuff categories from the stuff task
            # NOTE(review): thing boxes are presumably skipped here because the caller
            # already gets them from the instances annotations -- confirm.
            if segment_info["category_id"] > 91:
                category = categories[segment_info["category_id"]]
                box2d.append(LabeledBox2D.from_xywh(*segment_info["bbox"], category=category))
    return box2d


def _get_panoptic_mask(
    annotation_path: str,
    segment_name: str,
    image_stem: str,
    panoptic_annotations: Dict[int, Any],
    image_id: int,
) -> PanopticMask:
    """Create the panoptic mask label of one image.

    The original mask is first rewritten by :func:`_save_mask`, and the category ids
    are keyed by the compressed segment ids used in the rewritten mask.

    Arguments:
        annotation_path: The directory containing the panoptic mask directories.
        segment_name: The name of the segment the image belongs to.
        image_stem: The file name of the image without extension.
        panoptic_annotations: The ``panoptic`` annotations grouped by image id.
        image_id: The id of the image to create the label for.

    Returns:
        A panoptic mask pointing at the rewritten mask file, with ``all_category_ids``
        mapping each compressed segment id to its category id.

    """
    compress_pixel, new_mask_path = _save_mask(annotation_path, segment_name, image_stem)

    # Only the first panoptic annotation of the image is used.
    annotation = panoptic_annotations[image_id][0]
    category_ids = {
        compress_pixel[info["id"]]: info["category_id"] for info in annotation["segments_info"]
    }

    panoptic_mask = PanopticMask(local_path=new_mask_path)
    panoptic_mask.all_category_ids = category_ids
    return panoptic_mask


def _save_mask(
    annotation_path: str, segment_name: str, image_stem: str
) -> Tuple[Dict[int, int], str]:
    """Rewrite one panoptic mask as a single-channel image with compressed segment ids.

    The segment id of every pixel is ``R + 256 * G + 256 * 256 * B``. The non-zero ids
    of the image are renumbered ``1, 2, ...`` in ascending order and id 0 is kept as 0,
    then the result is saved under ``new_panoptic_<segment_name>2017``.

    Arguments:
        annotation_path: The directory containing the panoptic mask directories.
        segment_name: The name of the segment the mask belongs to.
        image_stem: The file name of the mask without extension.

    Returns:
        The mapping from original segment id to compressed segment id (it always
        contains ``0: 0``), and the path of the rewritten mask file.

    Raises:
        ValueError: When the mask has more segments than an 8-bit image can encode.

    """
    new_file_path = os.path.join(annotation_path, f"new_panoptic_{segment_name}2017")
    os.makedirs(new_file_path, exist_ok=True)
    mask_name = f"{image_stem}.png"

    array = np.array(
        Image.open(os.path.join(annotation_path, f"panoptic_{segment_name}2017", mask_name)),
        dtype=int,
    )
    mask_array = array[:, :, 0] + array[:, :, 1] * 256 + array[:, :, 2] * 256 * 256

    # "np.unique" returns the ids in ascending order together with, for every pixel, the
    # index of its id in that order, which replaces a per-pixel Python dict lookup.
    segment_ids, inverse = np.unique(mask_array, return_inverse=True)

    # The ids are non-negative, so id 0 can only be the first one. When it is present
    # the indices already are the compressed ids (0 -> 0, then 1, 2, ...); otherwise
    # they have to be shifted by one because 0 stays reserved.
    first_id = 0 if segment_ids.size and segment_ids[0] == 0 else 1
    map_of_pixel = {
        int(segment_id): index for index, segment_id in enumerate(segment_ids, first_id)
    }
    map_of_pixel[0] = 0

    max_id = len(map_of_pixel) - 1
    if max_id > np.iinfo(np.uint8).max:
        # Casting to uint8 would silently wrap around and corrupt the saved mask.
        raise ValueError(
            f"Panoptic mask '{mask_name}' has {max_id} segments, "
            "which cannot be stored in an 8-bit mask"
        )

    # The inverse is flat on some NumPy versions, so the image shape is restored explicitly.
    compressed_array = inverse.reshape(mask_array.shape) + first_id

    new_mask_path = os.path.join(new_file_path, mask_name)
    Image.fromarray(compressed_array.astype(np.uint8)).save(new_mask_path)
    return map_of_pixel, new_mask_path