Source code for tensorbay.opendataset.nuScenes.loader

#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#
# pylint: disable=invalid-name

"""Dataloader of nuScenes dataset."""


import json
import os
from typing import Any, Dict, Iterator, List

from tensorbay.dataset import Data, Frame, FusionDataset, FusionSegment
from tensorbay.geometry import Transform3D
from tensorbay.label import LabeledBox3D
from tensorbay.opendataset._utility.nuScenes import (
    get_info_with_determined_token,
    get_info_with_token,
    get_sensor,
)

DATASET_NAME = "nuScenes"

_ATTRIBUTE_KEYS = {
    "vehicle": "vehicle_motion",
    "cycle": "cycle_rider",
    "pedestrian": "pedestrian_motion",
}
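
# For example, the raw nuScenes attribute "vehicle.moving" is split on its
# last "." by _get_labeled_box below, so it is stored as
# {"vehicle_motion": "moving"}.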


def nuScenes(path: str) -> FusionDataset:
    """`nuScenes <https://www.nuscenes.org/>`_ dataset.

    The file structure should be like::

        <path>
            v1.0-mini/
                maps/
                    36092f0b03a857c6a3403e25b4b7aab3.png
                    ...
                samples/
                    CAM_BACK/
                    CAM_BACK_LEFT/
                    CAM_BACK_RIGHT/
                    CAM_FRONT/
                    CAM_FRONT_LEFT/
                    CAM_FRONT_RIGHT/
                    LIDAR_TOP/
                    RADAR_BACK_LEFT/
                    RADAR_BACK_RIGHT/
                    RADAR_FRONT/
                    RADAR_FRONT_LEFT/
                    RADAR_FRONT_RIGHT/
                sweeps/
                    CAM_BACK/
                    CAM_BACK_LEFT/
                    CAM_BACK_RIGHT/
                    CAM_FRONT/
                    CAM_FRONT_LEFT/
                    CAM_FRONT_RIGHT/
                    LIDAR_TOP/
                    RADAR_BACK_LEFT/
                    RADAR_BACK_RIGHT/
                    RADAR_FRONT/
                    RADAR_FRONT_LEFT/
                    RADAR_FRONT_RIGHT/
                v1.0-mini/
                    attribute.json
                    calibrated_sensor.json
                    category.json
                    ego_pose.json
                    instance.json
                    log.json
                    map.json
                    sample_annotation.json
                    sample_data.json
                    sample.json
                    scene.json
                    sensor.json
                    visibility.json
            v1.0-test/
                maps/
                samples/
                sweeps/
                v1.0-test/
            v1.0-trainval/
                maps/
                samples/
                sweeps/
                v1.0-trainval/

    Arguments:
        path: The root directory of the dataset.

    Returns:
        Loaded :class:`~tensorbay.dataset.dataset.FusionDataset` instance.

    """
    root_path = os.path.abspath(os.path.expanduser(path))
    dataset = FusionDataset(DATASET_NAME)
    dataset.notes.is_continuous = True
    dataset.notes.bin_point_cloud_fields = ["X", "Y", "Z", "Intensity", "Ring"]
    dataset.load_catalog(os.path.join(os.path.dirname(__file__), "catalog.json"))
    for subset in os.listdir(root_path):
        for segment in _generate_segments(root_path, subset):
            dataset.add_segment(segment)
    return dataset
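
# A minimal usage sketch (illustrative only, not part of the original module;
# "<path>" is a placeholder for the extracted dataset root described above):
#
#     dataset = nuScenes("<path>")
#     segment = dataset[0]        # one FusionSegment per scene, named "<subset>-<scene name>"
#     frame = segment[0]          # one Frame per nuScenes sample
#     lidar = frame["LIDAR_TOP"]  # the Data carrying the 3D box labels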
def _generate_segments(root_path: str, subset: str) -> Iterator[FusionSegment]:
    subset_path = os.path.join(root_path, subset)
    is_test = subset.endswith("test")
    annotation_info = _get_annotation_info(os.path.join(subset_path, subset), is_test)
    for scene in annotation_info["scenes"]:
        segment = FusionSegment(f"{subset}-{scene['name']}")
        segment.description = scene["description"]
        # Samples form a linked list: follow the "next" tokens until the scene ends.
        current_frame_token = scene["first_sample_token"]
        while current_frame_token:
            _load_frame_and_sensor(
                segment,
                current_frame_token,
                annotation_info,
                subset_path,
                is_test,
            )
            current_frame_token = annotation_info["samples"][current_frame_token]["next"]
        yield segment


def _get_annotation_info(info_path: str, is_test: bool = False) -> Dict[str, Any]:
    annotation_info = {
        "samples": get_info_with_token(info_path, "sample"),
        "frame_data": get_info_with_determined_token(info_path, "sample_data"),
        "calibrated_sensors": get_info_with_token(info_path, "calibrated_sensor"),
        "ego_poses": get_info_with_token(info_path, "ego_pose"),
        "sensor": get_info_with_token(info_path, "sensor"),
    }
    with open(os.path.join(info_path, "scene.json"), encoding="utf-8") as file:
        annotation_info["scenes"] = json.load(file)
    # The test subset ships without labels, so the annotation tables are skipped.
    if not is_test:
        annotation_info["sample_annotations"] = get_info_with_determined_token(
            info_path, "sample_annotation", no_key_frame=True
        )
        annotation_info["instance"] = get_info_with_token(info_path, "instance")
        annotation_info["category"] = get_info_with_token(info_path, "category")
        annotation_info["attribute"] = get_info_with_token(info_path, "attribute")
        annotation_info["visibility"] = get_info_with_token(info_path, "visibility")
    return annotation_info


def _load_frame_and_sensor(
    segment: FusionSegment,
    current_frame_token: str,
    annotation_info: Dict[str, Any],
    subset_path: str,
    is_test: bool,
) -> None:
    frame = Frame()
    for sensor_frame in annotation_info["frame_data"][current_frame_token]:
        calibrated_sensor_info = annotation_info["calibrated_sensors"][
            sensor_frame["calibrated_sensor_token"]
        ]
        common_sensor = annotation_info["sensor"][calibrated_sensor_info["sensor_token"]]
        sensor_name = common_sensor["channel"]
        sensor_type = common_sensor["modality"]
        if sensor_name not in segment.sensors:
            segment.sensors.add(get_sensor(sensor_type, sensor_name, calibrated_sensor_info))
        data = Data(
            os.path.join(subset_path, sensor_frame["filename"]),
            timestamp=sensor_frame["timestamp"] / 10**6,  # microseconds to seconds
        )
        # Box annotations are shared by the whole sample; attach them to the lidar data only.
        if not is_test and sensor_type == "lidar":
            data.label.box3d = _get_labels(
                current_frame_token,
                annotation_info["ego_poses"][sensor_frame["ego_pose_token"]],
                segment.sensors[sensor_name].extrinsics,
                annotation_info,
            )
        frame[sensor_name] = data
    segment.append(frame)


def _get_labels(
    current_frame_token: str,
    lidar_ego_pose_info: Dict[str, Any],
    lidar_to_ego: Transform3D,
    annotation_info: Dict[str, Dict[str, Any]],
) -> List[LabeledBox3D]:
    labels = []
    sample_annotations = annotation_info["sample_annotations"]
    if current_frame_token in sample_annotations:
        lidar_ego_pose = Transform3D(
            translation=lidar_ego_pose_info["translation"],
            rotation=lidar_ego_pose_info["rotation"],
        )
        # Boxes are annotated in world coordinates: chain lidar->ego->world,
        # then invert to transform each box into the lidar frame.
        world_to_lidar = (lidar_ego_pose * lidar_to_ego).inverse()
        for instance_annotation in sample_annotations[current_frame_token]:
            labeled_box = _get_labeled_box(instance_annotation, annotation_info)
            labels.append(world_to_lidar * labeled_box)
    return labels


def _get_labeled_box(
    instance_annotation: Dict[str, Any],
    annotation_info: Dict[str, Any],
) -> LabeledBox3D:
    instance = instance_annotation["instance_token"]
    category = annotation_info["category"][annotation_info["instance"][instance]["category_token"]][
        "name"
    ]
    attributes = {}
    for attribute_token in instance_annotation["attribute_tokens"]:
        attribute_full_name = annotation_info["attribute"][attribute_token]["name"]
        key, value = attribute_full_name.rsplit(".", 1)
        attributes[_ATTRIBUTE_KEYS[key]] = value
    attributes["visibility"] = annotation_info["visibility"][
        instance_annotation["visibility_token"]
    ]["level"]
    # nuScenes stores box size as (width, length, height); LabeledBox3D takes
    # (length, width, height).
    width, length, height = instance_annotation["size"]
    return LabeledBox3D(
        translation=instance_annotation["translation"],
        rotation=instance_annotation["rotation"],
        size=(length, width, height),
        category=category,
        instance=instance,
        attributes=attributes,
    )