Source code for tensorbay.opendataset.nuImages.loader

#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#
# pylint: disable=invalid-name

"""Dataloader of nuImages dataset."""

import base64
import os
from typing import Any, Dict, Iterator, List, Optional, Tuple

from tensorbay.dataset import Data, Frame, FusionDataset, FusionSegment
from tensorbay.label import LabeledBox2D, LabeledRLE
from tensorbay.opendataset._utility.nuScenes import (
    get_info_with_determined_token,
    get_info_with_token,
    get_sensor,
    transpose_rle,
    uncompress_rle,
)

DATASET_NAME = "nuImages"


[docs]def nuImages(path: str) -> FusionDataset: """`nuImages <https://www.nuscenes.org/nuimages>`_ dataset. The file structure should be like:: <path> nuimages-v1.0-all-metadata/ v1.0-mini/ attribute.json calibrated_sensor.json category.json ego_pose.json instance.json log.json object_ann.json sample_data.json sample.json sensor.json surface_ann.json v1.0-test/ ... v1.0-train/ ... v1.0-val/ ... samples/ CAM_BACK/ CAM_BACK_LEFT/ CAM_BACK_RIGHT/ CAM_FRONT/ CAM_FRONT_LEFT/ CAM_FRONT_RIGHT/ sweeps/ CAM_BACK/ CAM_BACK_LEFT/ CAM_BACK_RIGHT/ CAM_FRONT/ CAM_FRONT_LEFT/ CAM_FRONT_RIGHT/ nuimages-v1.0-mini/ samples/ ... sweeps/ ... v1.0-mini/ ... Arguments: path: The root directory of the dataset. Returns: Loaded :class:`~tensorbay.dataset.dataset.Dataset` instance. """ root_path = os.path.abspath(os.path.expanduser(path)) dataset = FusionDataset(DATASET_NAME) dataset.load_catalog(os.path.join(os.path.dirname(__file__), "catalog.json")) metadata_path = os.path.join(root_path, "nuimages-v1.0-all-metadata") for subset in os.listdir(metadata_path): dataset.add_segment(_get_segment(root_path, subset, metadata_path)) return dataset
def _get_segment(root_path: str, subset: str, metadata_path: str) -> FusionSegment: is_test = subset.endswith("test") annotation_info = _get_annotation_info(os.path.join(metadata_path, subset), is_test) segment = FusionSegment(subset) _load_frame_and_sensor(segment, annotation_info, root_path, is_test) return segment def _get_annotation_info(info_path: str, is_test: bool = False) -> Dict[str, Any]: annotation_info = { "samples": get_info_with_token(info_path, "sample"), "frame_data": get_info_with_determined_token(info_path, "sample_data"), "calibrated_sensors": get_info_with_token(info_path, "calibrated_sensor"), "ego_poses": get_info_with_token(info_path, "ego_pose"), "sensor": get_info_with_token(info_path, "sensor"), } if not is_test: annotation_info["object_annotations"] = get_info_with_determined_token( info_path, "object_ann", no_key_frame=True, determined_token="sample_data_token" ) annotation_info["surface_annotations"] = get_info_with_determined_token( info_path, "surface_ann", no_key_frame=True, determined_token="sample_data_token" ) annotation_info["category"] = get_info_with_token(info_path, "category") attributes_info = get_info_with_token(info_path, "attribute") for attribute_info in attributes_info.values(): name, value = attribute_info["name"].rsplit(".", 1) attribute_info["name"] = name.replace(".", "-") attribute_info["value"] = value annotation_info["attribute"] = attributes_info return annotation_info def _load_frame_and_sensor( segment: FusionSegment, annotation_info: Dict[str, Any], subset_path: str, is_test: bool, ) -> None: for data_frames in annotation_info["frame_data"].values(): frame = Frame() for data_frame in data_frames: calibrated_sensor_info = annotation_info["calibrated_sensors"][ data_frame["calibrated_sensor_token"] ] common_sensor = annotation_info["sensor"][calibrated_sensor_info["sensor_token"]] sensor_name = common_sensor["channel"] if sensor_name not in segment.sensors: segment.sensors.add(get_sensor("camera", sensor_name, calibrated_sensor_info)) data = Data( os.path.join(subset_path, data_frame["filename"]), timestamp=data_frame["timestamp"] / 10**6, ) if not is_test: data.label.box2d, data.label.rle = _get_labels(data_frame["token"], annotation_info) frame[sensor_name] = data segment.append(frame) def _get_labels( sample_data_token: str, annotation_info: Dict[str, Dict[str, Any]], ) -> Tuple[List[LabeledBox2D], List[LabeledRLE]]: object_annotations = annotation_info["object_annotations"] surface_annotations = annotation_info["surface_annotations"] categories = annotation_info["category"] attributes = annotation_info["attribute"] label_box2d: List[LabeledBox2D] = [] label_rle: List[LabeledRLE] = [] if sample_data_token in object_annotations: for box2d, rle in _get_object_annotations( object_annotations[sample_data_token], categories, attributes ): label_box2d.append(box2d) if rle: label_rle.append(rle) if sample_data_token in surface_annotations: for rle in _get_surface_annotations(surface_annotations[sample_data_token], categories): label_rle.append(rle) return label_box2d, label_rle def _get_object_annotations( object_annotations: List[Dict[str, Any]], token_to_categories: Dict[str, Any], token_to_attributes: Dict[str, Any], ) -> Iterator[Tuple[LabeledBox2D, Optional[LabeledRLE]]]: for object_annotation in object_annotations: category = token_to_categories[object_annotation["category_token"]]["name"] attributes = {} for attribute_token in object_annotation["attribute_tokens"]: attribute_info = token_to_attributes[attribute_token] attributes[attribute_info["name"]] = attribute_info["value"] box2d = LabeledBox2D(*object_annotation["bbox"], category=category, attributes=attributes) mask = object_annotation["mask"] rle = _get_labeled_rle(mask, category, attributes) if mask else None yield box2d, rle def _get_surface_annotations( surface_annotations: List[Dict[str, Any]], categories: Dict[str, Any] ) -> Iterator[LabeledRLE]: for surface_annotation in surface_annotations: mask = surface_annotation["mask"] yield _get_labeled_rle(mask, categories[surface_annotation["category_token"]]["name"]) def _get_labeled_rle( mask: Dict[str, str], category: Optional[str] = None, attributes: Optional[Dict[str, str]] = None, ) -> LabeledRLE: return LabeledRLE( transpose_rle( uncompress_rle(bytes.decode(base64.b64decode(mask["counts"]))), *mask["size"] ), category=category, attributes=attributes, )