Source code for tensorbay.opendataset.nuScenes.loader

#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#
# pylint: disable=invalid-name
# pylint: disable=missing-module-docstring


import json
import os
from typing import Any, Dict, Iterator, List

from ...dataset import Data, Frame, FusionDataset, FusionSegment
from ...geometry import Transform3D
from ...label import LabeledBox3D
from ...sensor import Camera, Lidar, Radar

DATASET_NAME = "nuScenes"

_SENSOR_TYPE_CLASS = {
    "lidar": Lidar,
    "radar": Radar,
    "camera": Camera,
}
_ATTRIBUTE_KEYS = {
    "vehicle": "vehicle_motion",
    "cycle": "cycle_rider",
    "pedestrian": "pedestrian_motion",
}
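# Illustrative note (not in the original module): nuScenes attribute names
# such as "vehicle.moving" or "cycle.with_rider" are split on the final "."
# in _get_labeled_box below; the prefix selects the TensorBay attribute key
# above and the suffix becomes its value, e.g.
#     "vehicle.moving"   -> {"vehicle_motion": "moving"}
#     "cycle.with_rider" -> {"cycle_rider": "with_rider"}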


def nuScenes(path: str) -> FusionDataset:
    """Dataloader of the `nuScenes`_ dataset.

    .. _nuScenes: https://www.nuscenes.org/

    The file structure should be like::

        <path>
            v1.0-mini/
                maps/
                    36092f0b03a857c6a3403e25b4b7aab3.png
                    ...
                samples/
                    CAM_BACK/
                    CAM_BACK_LEFT/
                    CAM_BACK_RIGHT/
                    CAM_FRONT/
                    CAM_FRONT_LEFT/
                    CAM_FRONT_RIGHT/
                    LIDAR_TOP/
                    RADAR_BACK_LEFT/
                    RADAR_BACK_RIGHT/
                    RADAR_FRONT/
                    RADAR_FRONT_LEFT/
                    RADAR_FRONT_RIGHT/
                sweeps/
                    CAM_BACK/
                    CAM_BACK_LEFT/
                    CAM_BACK_RIGHT/
                    CAM_FRONT/
                    CAM_FRONT_LEFT/
                    CAM_FRONT_RIGHT/
                    LIDAR_TOP/
                    RADAR_BACK_LEFT/
                    RADAR_BACK_RIGHT/
                    RADAR_FRONT/
                    RADAR_FRONT_LEFT/
                    RADAR_FRONT_RIGHT/
                v1.0-mini/
                    attribute.json
                    calibrated_sensor.json
                    category.json
                    ego_pose.json
                    instance.json
                    log.json
                    map.json
                    sample_annotation.json
                    sample_data.json
                    sample.json
                    scene.json
                    sensor.json
                    visibility.json
            v1.0-test/
                maps/
                samples/
                sweeps/
                v1.0-test/
            v1.0-trainval/
                maps/
                samples/
                sweeps/
                v1.0-trainval/

    Arguments:
        path: The root directory of the dataset.

    Returns:
        Loaded :class:`~tensorbay.dataset.dataset.FusionDataset` instance.

    """
    root_path = os.path.abspath(os.path.expanduser(path))
    dataset = FusionDataset(DATASET_NAME)
    dataset.notes.is_continuous = True
    dataset.notes.bin_point_cloud_fields = ["X", "Y", "Z", "Intensity", "Ring"]
    dataset.load_catalog(os.path.join(os.path.dirname(__file__), "catalog.json"))
    for subset in os.listdir(root_path):
        for segment in _generate_segments(root_path, subset):
            dataset.add_segment(segment)

    return dataset
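

# --- Illustrative usage (not part of the original loader) -------------------
# A minimal sketch of how the loader is consumed, assuming the dataset was
# extracted to a hypothetical "./nuScenes" directory matching the documented
# file structure.  The helper name and path are for illustration only.
def _example_usage() -> None:
    dataset = nuScenes("./nuScenes")  # assumed local path
    segment = dataset[0]  # one FusionSegment per scene, named "<subset>-<scene>"
    frame = segment[0]  # one Frame per key-frame sample
    lidar_data = frame["LIDAR_TOP"]  # Data objects are keyed by sensor channel
    for box in lidar_data.label.box3d:  # boxes are in the lidar coordinate system
        print(box.category, box.size, box.attributes)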


def _generate_segments(root_path: str, subset: str) -> Iterator[FusionSegment]:
    subset_path = os.path.join(root_path, subset)
    is_test = subset.endswith("test")
    annotation_info = _get_annotation_info(os.path.join(subset_path, subset), is_test)
    for scene in annotation_info["scenes"]:
        segment = FusionSegment(f"{subset}-{scene['name']}")
        segment.description = scene["description"]
        current_frame_token = scene["first_sample_token"]
        # Samples within a scene form a linked list; an empty "next" token ends it.
        while current_frame_token:
            _load_frame_and_sensor(
                segment,
                current_frame_token,
                annotation_info,
                subset_path,
                is_test,
            )
            current_frame_token = annotation_info["samples"][current_frame_token]["next"]

        yield segment


def _get_annotation_info(info_path: str, is_test: bool = False) -> Dict[str, Any]:
    annotation_info = {
        "samples": _get_info_with_token(info_path, "sample"),
        "frame_data": _get_info_with_sample_token(info_path, "sample_data"),
        "calibrated_sensors": _get_info_with_token(info_path, "calibrated_sensor"),
        "ego_poses": _get_info_with_token(info_path, "ego_pose"),
        "sensor": _get_info_with_token(info_path, "sensor"),
    }
    with open(os.path.join(info_path, "scene.json"), "r") as file:
        annotation_info["scenes"] = json.load(file)
    # The test subset ships without annotations, so skip the label related files.
    if not is_test:
        annotation_info["sample_annotations"] = _get_info_with_sample_token(
            info_path, "sample_annotation", no_key_frame=True
        )
        annotation_info["instance"] = _get_info_with_token(info_path, "instance")
        annotation_info["category"] = _get_info_with_token(info_path, "category")
        annotation_info["attribute"] = _get_info_with_token(info_path, "attribute")
        annotation_info["visibility"] = _get_info_with_token(info_path, "visibility")
    return annotation_info


def _get_info_with_token(info_path: str, annotation_part: str) -> Dict[str, Any]:
    filepath = os.path.join(info_path, f"{annotation_part}.json")
    with open(filepath, "r") as fp:
        info = json.load(fp)
    # Index the record list by its unique "token" for O(1) foreign key lookups.
    return {item.pop("token"): item for item in info}


def _get_info_with_sample_token(
    info_path: str, annotation_part: str, no_key_frame: bool = False
) -> Dict[str, List[Any]]:
    filepath = os.path.join(info_path, f"{annotation_part}.json")
    with open(filepath, "r") as fp:
        info = json.load(fp)
    info_with_keys: Dict[str, List[Any]] = {}
    for item in info:
        # sample_annotation.json has no "is_key_frame" field, so callers pass
        # no_key_frame=True to skip the key frame filter.
        if no_key_frame or item["is_key_frame"]:
            info_with_keys.setdefault(item.pop("sample_token"), []).append(item)
    return info_with_keys


def _load_frame_and_sensor(
    segment: FusionSegment,
    current_frame_token: str,
    annotation_info: Dict[str, Any],
    subset_path: str,
    is_test: bool,
) -> None:
    frame = Frame()
    for sensor_frame in annotation_info["frame_data"][current_frame_token]:
        calibrated_sensor_info = annotation_info["calibrated_sensors"][
            sensor_frame["calibrated_sensor_token"]
        ]
        common_sensor = annotation_info["sensor"][calibrated_sensor_info["sensor_token"]]
        sensor_name = common_sensor["channel"]
        sensor_type = common_sensor["modality"]
        if sensor_name not in segment.sensors:
            _load_sensor(segment, sensor_type, sensor_name, calibrated_sensor_info)

        data = Data(
            os.path.join(subset_path, sensor_frame["filename"]),
            timestamp=sensor_frame["timestamp"] / 10 ** 6,  # microseconds to seconds
        )
        # The 3D box annotations are attached to the lidar data only.
        if not is_test and sensor_type == "lidar":
            _load_labels(
                data,
                current_frame_token,
                annotation_info["ego_poses"][sensor_frame["ego_pose_token"]],
                segment.sensors[sensor_name].extrinsics,
                annotation_info,
            )
        frame[sensor_name] = data
    segment.append(frame)
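

# --- Illustrative note (not part of the original loader) --------------------
# The helpers above index every nuScenes JSON list by "token" so that foreign
# keys can be chased in O(1), and _generate_segments walks each scene's linked
# list of samples via the "next" token.  A toy sketch with made-up tokens:
def _example_token_walk() -> None:
    samples = [
        {"token": "s1", "next": "s2"},
        {"token": "s2", "next": "s3"},
        {"token": "s3", "next": ""},  # an empty "next" terminates the scene
    ]
    indexed = {item.pop("token"): item for item in samples}  # as _get_info_with_token
    token, visited = "s1", []
    while token:  # same loop shape as _generate_segments
        visited.append(token)
        token = indexed[token]["next"]
    assert visited == ["s1", "s2", "s3"]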


def _load_sensor(
    segment: FusionSegment,
    sensor_type: str,
    sensor_name: str,
    calibrated_sensor_info: Dict[str, Any],
) -> None:
    sensor = _SENSOR_TYPE_CLASS[sensor_type](sensor_name)
    sensor.set_extrinsics(
        translation=calibrated_sensor_info["translation"],
        rotation=calibrated_sensor_info["rotation"],
    )
    # "camera_intrinsic" is an empty list for lidar and radar sensors.
    intrinsics = calibrated_sensor_info["camera_intrinsic"]
    if intrinsics:
        sensor.set_camera_matrix(matrix=intrinsics)  # type: ignore[attr-defined]
    segment.sensors.add(sensor)  # type: ignore[arg-type]


def _load_labels(
    data: Data,
    current_frame_token: str,
    lidar_ego_pose_info: Dict[str, Any],
    lidar_to_ego: Transform3D,
    annotation_info: Dict[str, Dict[str, Any]],
) -> None:
    labels = []
    sample_annotations = annotation_info["sample_annotations"]
    if current_frame_token in sample_annotations:
        lidar_ego_pose = Transform3D(
            translation=lidar_ego_pose_info["translation"],
            rotation=lidar_ego_pose_info["rotation"],
        )
        # Annotations are stored in world coordinates; compose ego->world with
        # lidar->ego and invert the product to map boxes into the lidar frame.
        world_to_lidar = (lidar_ego_pose * lidar_to_ego).inverse()
        for instance_annotation in sample_annotations[current_frame_token]:
            labeled_box = _get_labeled_box(instance_annotation, annotation_info)
            labels.append(world_to_lidar * labeled_box)
    data.label.box3d = labels


def _get_labeled_box(
    instance_annotation: Dict[str, Any],
    annotation_info: Dict[str, Any],
) -> LabeledBox3D:
    instance = instance_annotation["instance_token"]
    category = annotation_info["category"][annotation_info["instance"][instance]["category_token"]][
        "name"
    ]
    attributes = {}
    for attribute_token in instance_annotation["attribute_tokens"]:
        attribute_full_name = annotation_info["attribute"][attribute_token]["name"]
        key, value = attribute_full_name.rsplit(".", 1)
        attributes[_ATTRIBUTE_KEYS[key]] = value
    attributes["visibility"] = annotation_info["visibility"][
        instance_annotation["visibility_token"]
    ]["level"]
    # nuScenes stores size as (width, length, height); TensorBay expects
    # (length, width, height).
    width, length, height = instance_annotation["size"]
    return LabeledBox3D(
        translation=instance_annotation["translation"],
        rotation=instance_annotation["rotation"],
        size=(length, width, height),
        category=category,
        instance=instance,
        attributes=attributes,
    )
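

# --- Illustrative note (not part of the original loader) --------------------
# A toy sketch of the coordinate chain used in _load_labels above.  All poses
# and the box below are made up; the identity quaternion (1, 0, 0, 0) follows
# the nuScenes (w, x, y, z) convention passed through by the loader.
def _example_world_to_lidar() -> None:
    ego_to_world = Transform3D(translation=(100.0, 50.0, 0.0), rotation=(1, 0, 0, 0))
    lidar_to_ego = Transform3D(translation=(1.0, 0.0, 1.8), rotation=(1, 0, 0, 0))
    world_to_lidar = (ego_to_world * lidar_to_ego).inverse()
    box_in_world = LabeledBox3D(
        translation=(101.0, 50.0, 1.8),  # a box centered on the lidar origin
        rotation=(1, 0, 0, 0),
        size=(4.0, 2.0, 1.5),
        category="vehicle.car",
    )
    box_in_lidar = world_to_lidar * box_in_world
    print(box_in_lidar.translation)  # approximately (0, 0, 0)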