Source code for tensorbay.opendataset.COCO2017.loader

#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#
# pylint: disable=invalid-name

"""Dataloader of COCO2017 dataset."""

import json
import os
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Tuple

import numpy as np

from tensorbay.dataset import Data, Dataset
from tensorbay.label import (
    Label,
    LabeledBox2D,
    LabeledKeypoints2D,
    LabeledMultiPolygon,
    LabeledRLE,
    PanopticMask,
)
from tensorbay.opendataset._utility.glob import glob
from tensorbay.utility.itertools import chunked

try:
    from PIL import Image
except ModuleNotFoundError:
    from tensorbay.opendataset._utility.mocker import Image  # pylint:disable=ungrouped-imports

DATASET_NAME = "COCO2017"
_LABELED_SEGMENT_NAMES = ("train", "val")
_UNLABELED_SEGMENT_NAMES = ("test", "unlabeled")


def COCO2017(path: str) -> Dataset:
    """Load the `COCO2017 <https://cocodataset.org/#home>`_ dataset.

    The file structure should be like::

        <path>
            annotations/
                panoptic_train2017/
                    000000116037.png
                    000000116040.png
                    ...
                panoptic_val2017/
                instances_train2017.json
                instances_val2017.json
                panoptic_train2017.json
                panoptic_val2017.json
                person_keypoints_train2017.json
                person_keypoints_val2017.json
            train2017/
                000000116026.jpg
                000000116031.jpg
                ...
            test2017/
            val2017/
            unlabeled2017/

    Labeled segments ("train", "val") get instance, keypoint and panoptic
    labels attached to every image; unlabeled segments ("test", "unlabeled")
    only contain the images.

    Arguments:
        path: The root directory of the dataset.

    Returns:
        Loaded :class:`~tensorbay.dataset.dataset.Dataset` instance.

    """
    root_path = os.path.abspath(os.path.expanduser(path))
    # The annotation directory does not depend on the segment, so build it once.
    annotation_path = os.path.join(root_path, "annotations")

    dataset = Dataset(DATASET_NAME)
    catalog_path = os.path.join(os.path.dirname(__file__), "catalog.json")
    dataset.load_catalog(catalog_path)

    for segment_name in _LABELED_SEGMENT_NAMES:
        segment = dataset.create_segment(segment_name)

        task_information = _get_information(annotation_path, segment_name)
        categories = task_information["categories"]
        instances = task_information["instances_annotations"]
        person_keypoints = task_information["person_keypoints_annotations"]
        panoptic = task_information["panoptic_annotations"]

        image_pattern = os.path.join(root_path, f"{segment_name}2017", "*.jpg")
        for image_path in glob(image_pattern):
            data = Data(image_path)

            # The numeric image id is the file name without its extension.
            image_stem, _ = os.path.splitext(os.path.basename(image_path))
            image_id = int(image_stem)

            label = _get_instance_label(instances, image_id, categories)
            label.keypoints2d = _get_keypoints2d(person_keypoints, image_id, categories)
            # Boxes from the panoptic task are appended to the instance boxes.
            label.box2d.extend(_get_panoptic_box2d(panoptic, image_id, categories))
            label.panoptic_mask = _get_panoptic_mask(
                annotation_path, segment_name, image_stem, panoptic, image_id
            )

            data.label = label
            segment.append(data)

    for segment_name in _UNLABELED_SEGMENT_NAMES:
        segment = dataset.create_segment(segment_name)
        image_pattern = os.path.join(root_path, f"{segment_name}2017", "*.jpg")
        for image_path in glob(image_pattern):
            segment.append(Data(image_path))

    return dataset
def _get_information(annotation_path: str, segment_name: str) -> Dict[str, Any]:
    """Read the annotation files of one segment and group them by image id.

    Arguments:
        annotation_path: The directory holding the annotation json files.
        segment_name: The segment name, used to build ``<task>_<segment>2017.json``.

    Returns:
        A dict with one ``"<task>_annotations"`` entry per task ("instances",
        "person_keypoints", "panoptic"), each mapping an image id to the list
        of its annotations, plus a ``"categories"`` entry mapping a category id
        to ``"<supercategory>.<name>"`` as read from the panoptic file.

    """
    task_information: Dict[str, Any] = {}
    for task in ("instances", "person_keypoints", "panoptic"):
        json_path = os.path.join(annotation_path, f"{task}_{segment_name}2017.json")
        with open(json_path, encoding="utf-8") as fp:
            content = json.load(fp)

        annotations_by_image: DefaultDict[int, Any] = defaultdict(list)
        for annotation in content["annotations"]:
            annotations_by_image[annotation["image_id"]].append(annotation)
        task_information[f"{task}_annotations"] = annotations_by_image

        # Only the panoptic file is used as the source of category names.
        if task == "panoptic":
            category_names: Dict[int, str] = {}
            for category in content["categories"]:
                category_names[
                    category["id"]
                ] = f"{category['supercategory']}.{category['name']}"
            task_information["categories"] = category_names

    return task_information


def _get_instance_label(
    instances_annotations: Dict[int, Any], image_id: int, categories: Dict[int, str]
) -> Label:
    """Build the box2d, multi_polygon and rle labels of one image.

    Arguments:
        instances_annotations: Instance annotations grouped by image id.
        image_id: The id of the image to build the label for.
        categories: The mapping from category id to category name.

    Returns:
        A label whose ``box2d``, ``multi_polygon`` and ``rle`` lists are always
        set; they stay empty when the image has no instance annotation.

    """
    label: Label = Label()
    label.box2d = []
    label.multi_polygon = []
    label.rle = []

    for annotation in instances_annotations.get(image_id, ()):
        category = categories[annotation["category_id"]]
        label.box2d.append(LabeledBox2D.from_xywh(*annotation["bbox"], category=category))

        if annotation["iscrowd"] != 0:
            # "iscrowd" annotations carry their segmentation as RLE counts.
            counts = annotation["segmentation"]["counts"]
            label.rle.append(LabeledRLE(counts, category=category))
        else:
            # Otherwise each segmentation entry is a flat coordinate list,
            # which is split into 2-element chunks (one per point).
            polygons = [chunked(flat_points, 2) for flat_points in annotation["segmentation"]]
            label.multi_polygon.append(LabeledMultiPolygon(polygons, category=category))

    return label


def _get_keypoints2d(
    person_keypoints_annotations: Dict[int, Any], image_id: int, categories: Dict[int, str]
) -> List[LabeledKeypoints2D]:
    """Build the keypoints2d labels of one image.

    Arguments:
        person_keypoints_annotations: Keypoint annotations grouped by image id.
        image_id: The id of the image to build the labels for.
        categories: The mapping from category id to category name.

    Returns:
        One labeled keypoints2d per annotation of the image, or an empty list
        when the image has no keypoint annotation.

    """
    # The flat "keypoints" list is split into 3-element chunks per keypoint.
    return [
        LabeledKeypoints2D(
            chunked(annotation["keypoints"], 3),
            category=categories[annotation["category_id"]],
        )
        for annotation in person_keypoints_annotations.get(image_id, ())
    ]


def _get_panoptic_box2d(
    panoptic_annotations: Dict[int, Any], image_id: int, categories: Dict[int, str]
) -> List[LabeledBox2D]:
    """Build the box2d labels of the stuff segments of one image.

    Arguments:
        panoptic_annotations: Panoptic annotations grouped by image id.
        image_id: The id of the image to build the labels for.
        categories: The mapping from category id to category name.

    Returns:
        One labeled box2d per segment whose category id is greater than 91, or
        an empty list when the image has no panoptic annotation.

    """
    if image_id not in panoptic_annotations:
        return []

    boxes: List[LabeledBox2D] = []
    for annotation in panoptic_annotations[image_id]:
        for segment_info in annotation["segments_info"]:
            # category_id 1-91 are thing categories from the detection task
            # category_id 92-200 are stuff categories from the stuff task
            if segment_info["category_id"] <= 91:
                continue
            category = categories[segment_info["category_id"]]
            boxes.append(LabeledBox2D.from_xywh(*segment_info["bbox"], category=category))
    return boxes


def _get_panoptic_mask(
    annotation_path: str,
    segment_name: str,
    image_stem: str,
    panoptic_annotations: Dict[int, Any],
    image_id: int,
) -> PanopticMask:
    """Build the panoptic mask label of one image.

    The compressed mask file is written first, then the category id of every
    segment is recorded under the segment's compressed pixel value.

    Arguments:
        annotation_path: The directory holding the annotation files.
        segment_name: The segment name the image belongs to.
        image_stem: The image file name without extension.
        panoptic_annotations: Panoptic annotations grouped by image id.
        image_id: The id of the image to build the mask for.

    Returns:
        A panoptic mask pointing at the compressed mask file, with
        ``all_category_ids`` mapping compressed pixel values to category ids.

    """
    pixel_map, mask_path = _save_mask(annotation_path, segment_name, image_stem)

    # Only the first panoptic annotation of the image is used.
    segments_info = panoptic_annotations[image_id][0]["segments_info"]

    category_ids: Dict[int, int] = {}
    for segment_info in segments_info:
        category_id = segment_info["category_id"]
        category_ids[pixel_map[segment_info["id"]]] = category_id

    mask = PanopticMask(local_path=mask_path)
    mask.all_category_ids = category_ids
    return mask


def _save_mask(
    annotation_path: str, segment_name: str, image_stem: str
) -> Tuple[Dict[int, int], str]:
    """Compress a panoptic PNG into a 2-D uint8 mask and save it.

    The first three channels of the source image are combined into one id per
    pixel (``c0 + c1 * 256 + c2 * 256 * 256``). Every distinct non-zero id is
    then renumbered 1, 2, 3, ... in ascending order, while 0 stays 0. The
    result is written to ``new_panoptic_<segment_name>2017/<image_stem>.png``
    under ``annotation_path``; that directory is created when missing.

    Arguments:
        annotation_path: The directory holding the annotation files.
        segment_name: The segment name the image belongs to.
        image_stem: The image file name without extension.

    Returns:
        The mapping from original id to compressed pixel value, and the path
        of the saved mask file.

    """
    output_directory = os.path.join(annotation_path, f"new_panoptic_{segment_name}2017")
    os.makedirs(output_directory, exist_ok=True)

    mask_name = f"{image_stem}.png"
    source_path = os.path.join(annotation_path, f"panoptic_{segment_name}2017", mask_name)
    channels = np.array(Image.open(source_path), dtype=int)

    first, second, third = channels[:, :, 0], channels[:, :, 1], channels[:, :, 2]
    segment_ids = first + second * 256 + third * 256 * 256

    # np.unique returns the ids sorted; 0 is dropped because it keeps value 0.
    unique_ids = np.unique(segment_ids)
    nonzero_ids = unique_ids[unique_ids != 0]

    map_of_pixel = {
        original: compressed for compressed, original in enumerate(nonzero_ids, start=1)
    }
    map_of_pixel[0] = 0

    compressed_mask = np.vectorize(map_of_pixel.get)(segment_ids)

    new_mask_path = os.path.join(output_directory, mask_name)
    # NOTE(review): the uint8 cast only represents values 0-255, so an image
    # with more than 255 distinct segments would wrap around - confirm that
    # cannot happen for this dataset.
    Image.fromarray(np.uint8(compressed_mask)).save(new_mask_path)

    return map_of_pixel, new_mask_path