#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#
# pylint: disable=invalid-name
# pylint: disable=missing-module-docstring
import base64
import os
from typing import Any, Dict, Iterator, List, Optional, Tuple
from tensorbay.dataset import Data, Frame, FusionDataset, FusionSegment
from tensorbay.label import LabeledBox2D, LabeledRLE
from tensorbay.opendataset._utility.nuScenes import (
get_info_with_determined_token,
get_info_with_token,
get_sensor,
transpose_rle,
uncompress_rle,
)
DATASET_NAME = "nuImages"
def nuImages(path: str) -> FusionDataset:
    """`nuImages <https://www.nuscenes.org/nuimages>`_ dataset.

    The file structure should be like::

        <path>
            nuimages-v1.0-all-metadata/
                v1.0-mini/
                    attribute.json
                    calibrated_sensor.json
                    category.json
                    ego_pose.json
                    instance.json
                    log.json
                    object_ann.json
                    sample_data.json
                    sample.json
                    sensor.json
                    surface_ann.json
                v1.0-test/
                    ...
                v1.0-train/
                    ...
                v1.0-val/
                    ...
            samples/
                CAM_BACK/
                CAM_BACK_LEFT/
                CAM_BACK_RIGHT/
                CAM_FRONT/
                CAM_FRONT_LEFT/
                CAM_FRONT_RIGHT/
            sweeps/
                CAM_BACK/
                CAM_BACK_LEFT/
                CAM_BACK_RIGHT/
                CAM_FRONT/
                CAM_FRONT_LEFT/
                CAM_FRONT_RIGHT/
            nuimages-v1.0-mini/
                samples/
                    ...
                sweeps/
                    ...
                v1.0-mini/
                    ...

    Arguments:
        path: The root directory of the dataset.

    Returns:
        Loaded :class:`~tensorbay.dataset.dataset.FusionDataset` instance.

    """
    root_path = os.path.abspath(os.path.expanduser(path))
    dataset = FusionDataset(DATASET_NAME)
    dataset.load_catalog(os.path.join(os.path.dirname(__file__), "catalog.json"))
    metadata_path = os.path.join(root_path, "nuimages-v1.0-all-metadata")
    # Each metadata sub-directory (v1.0-mini / v1.0-test / v1.0-train / v1.0-val)
    # becomes one fusion segment.
    for subset in os.listdir(metadata_path):
        dataset.add_segment(_get_segment(root_path, subset, metadata_path))
    return dataset
def _get_segment(root_path: str, subset: str, metadata_path: str) -> FusionSegment:
    """Build one fusion segment for a single metadata subset (e.g. "v1.0-train").

    Test subsets (names ending in "test") carry no annotations, so label
    loading is skipped for them.
    """
    test_subset = subset.endswith("test")
    info = _get_annotation_info(os.path.join(metadata_path, subset), test_subset)
    new_segment = FusionSegment(subset)
    _load_frame_and_sensor(new_segment, info, root_path, test_subset)
    return new_segment
def _get_annotation_info(info_path: str, is_test: bool = False) -> Dict[str, Any]:
    """Load the per-subset json tables into a single lookup dictionary.

    Arguments:
        info_path: Directory containing the subset's json metadata files.
        is_test: Whether this subset is a test split; test splits have no
            annotation tables, so only the base tables are loaded.

    Returns:
        Mapping from table name to its token-indexed contents.
    """
    info: Dict[str, Any] = {
        "samples": get_info_with_token(info_path, "sample"),
        "frame_data": get_info_with_determined_token(info_path, "sample_data"),
        "calibrated_sensors": get_info_with_token(info_path, "calibrated_sensor"),
        "ego_poses": get_info_with_token(info_path, "ego_pose"),
        "sensor": get_info_with_token(info_path, "sensor"),
    }
    if is_test:
        return info
    # Annotation tables are grouped by the sample_data they belong to.
    for key, table in (
        ("object_annotations", "object_ann"),
        ("surface_annotations", "surface_ann"),
    ):
        info[key] = get_info_with_determined_token(
            info_path, table, no_key_frame=True, determined_token="sample_data_token"
        )
    info["category"] = get_info_with_token(info_path, "category")
    info["attribute"] = get_info_with_token(info_path, "attribute")
    return info
def _load_frame_and_sensor(
    segment: FusionSegment,
    annotation_info: Dict[str, Any],
    subset_path: str,
    is_test: bool,
) -> None:
    """Fill *segment* with frames, data and sensors from *annotation_info*.

    Arguments:
        segment: The fusion segment to populate in place.
        annotation_info: Lookup dict produced by :func:`_get_annotation_info`.
        subset_path: Root path the relative "filename" entries are joined to.
        is_test: Whether this is a test split (no labels attached).
    """
    for data_frames in annotation_info["frame_data"].values():
        # BUGFIX: create a fresh Frame per sample group. Previously a single
        # Frame instance was created once before the loop, so every group
        # appended the same ever-growing object to the segment.
        frame = Frame()
        for data_frame in data_frames:
            calibrated_sensor_info = annotation_info["calibrated_sensors"][
                data_frame["calibrated_sensor_token"]
            ]
            common_sensor = annotation_info["sensor"][calibrated_sensor_info["sensor_token"]]
            sensor_name = common_sensor["channel"]
            # Register each camera only once per segment.
            if sensor_name not in segment.sensors:
                segment.sensors.add(get_sensor("camera", sensor_name, calibrated_sensor_info))
            data = Data(
                os.path.join(subset_path, data_frame["filename"]),
                # Raw timestamps are in microseconds; convert to seconds.
                timestamp=data_frame["timestamp"] / 10 ** 6,
            )
            if not is_test:
                data.label.box2d, data.label.rle = _get_labels(
                    data_frame["token"], annotation_info
                )
            frame[sensor_name] = data
        segment.append(frame)
def _get_labels(
    sample_data_token: str,
    annotation_info: Dict[str, Dict[str, Any]],
) -> Tuple[List[LabeledBox2D], List[LabeledRLE]]:
    """Collect all 2D box and RLE mask labels for one sample_data entry.

    Arguments:
        sample_data_token: Token identifying the sample_data record.
        annotation_info: Lookup dict produced by :func:`_get_annotation_info`.

    Returns:
        A tuple of (box2d labels, rle labels) for the given token.
    """
    categories = annotation_info["category"]
    boxes: List[LabeledBox2D] = []
    rles: List[LabeledRLE] = []
    object_anns = annotation_info["object_annotations"].get(sample_data_token)
    if object_anns is not None:
        for box2d, maybe_rle in _get_object_annotations(
            object_anns, categories, annotation_info["attribute"]
        ):
            boxes.append(box2d)
            # Object annotations may lack a mask, in which case rle is None.
            if maybe_rle:
                rles.append(maybe_rle)
    surface_anns = annotation_info["surface_annotations"].get(sample_data_token)
    if surface_anns is not None:
        rles.extend(_get_surface_annotations(surface_anns, categories))
    return boxes, rles
def _get_object_annotations(
    object_annotations: List[Dict[str, Any]],
    token_to_categories: Dict[str, Any],
    token_to_attributes: Dict[str, Any],
) -> Iterator[Tuple[LabeledBox2D, Optional[LabeledRLE]]]:
    """Yield a (box2d, optional rle) pair for every object annotation.

    Attribute names like "vehicle.moving" are split on the last dot into a
    {"vehicle": "moving"} style mapping shared by the box and the mask.
    """
    for annotation in object_annotations:
        category_name = token_to_categories[annotation["category_token"]]["name"]
        attribute_map: Dict[str, str] = dict(
            token_to_attributes[token]["name"].rsplit(".", 1)
            for token in annotation["attribute_tokens"]
        )
        mask = annotation["mask"]
        yield (
            LabeledBox2D(*annotation["bbox"], category=category_name, attributes=attribute_map),
            _get_labeled_rle(mask, category_name, attribute_map) if mask else None,
        )
def _get_surface_annotations(
    surface_annotations: List[Dict[str, Any]], categories: Dict[str, Any]
) -> Iterator[LabeledRLE]:
    """Yield an RLE label for every surface annotation.

    Surface annotations always carry a mask and have no attributes.
    """
    return (
        _get_labeled_rle(
            annotation["mask"], categories[annotation["category_token"]]["name"]
        )
        for annotation in surface_annotations
    )
def _get_labeled_rle(
    mask: Dict[str, str],
    category: Optional[str] = None,
    attributes: Optional[Dict[str, str]] = None,
) -> LabeledRLE:
    """Decode a mask record into a :class:`LabeledRLE`.

    The "counts" field is base64-encoded compressed RLE; it is decoded,
    uncompressed, then transposed using the mask's "size" before being
    wrapped in a labeled RLE with the given category and attributes.
    """
    decoded_counts = base64.b64decode(mask["counts"]).decode()
    rle_values = transpose_rle(uncompress_rle(decoded_counts), *mask["size"])
    return LabeledRLE(rle_values, category=category, attributes=attributes)