Source code for tensorbay.opendataset.CADC.loader

#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#
# pylint: disable=invalid-name

"""Dataloader of CADC dataset."""

import json
import os
from datetime import datetime
from typing import Any, Dict, List

import quaternion

from tensorbay.dataset import Data, Frame, FusionDataset
from tensorbay.exception import ModuleImportError
from tensorbay.label import LabeledBox3D
from tensorbay.opendataset._utility import glob
from tensorbay.sensor import Camera, Lidar, Sensors

DATASET_NAME = "CADC"


[docs]def CADC(path: str) -> FusionDataset: """`CADC <http://cadcd.uwaterloo.ca/index.html>`_ dataset. The file structure should be like:: <path> 2018_03_06/ 0001/ 3d_ann.json labeled/ image_00/ data/ 0000000000.png 0000000001.png ... timestamps.txt ... image_07/ data/ timestamps.txt lidar_points/ data/ timestamps.txt novatel/ data/ dataformat.txt timestamps.txt ... 0018/ calib/ 00.yaml 01.yaml 02.yaml 03.yaml 04.yaml 05.yaml 06.yaml 07.yaml extrinsics.yaml README.txt 2018_03_07/ 2019_02_27/ Arguments: path: The root directory of the dataset. Returns: Loaded `~tensorbay.dataset.dataset.FusionDataset` instance. """ root_path = os.path.abspath(os.path.expanduser(path)) dataset = FusionDataset(DATASET_NAME) dataset.notes.is_continuous = True dataset.load_catalog(os.path.join(os.path.dirname(__file__), "catalog.json")) for date in os.listdir(root_path): date_path = os.path.join(root_path, date) sensors = _load_sensors(os.path.join(date_path, "calib")) for index in os.listdir(date_path): if index == "calib": continue segment = dataset.create_segment(f"{date}-{index}") segment.sensors = sensors segment_path = os.path.join(root_path, date, index) data_path = os.path.join(segment_path, "labeled") with open(os.path.join(segment_path, "3d_ann.json"), encoding="utf-8") as fp: # The first line of the json file is the json body. annotations = json.loads(fp.readline()) timestamps = _load_timestamps(sensors, data_path) for frame_index, annotation in enumerate(annotations): segment.append(_load_frame(sensors, data_path, frame_index, annotation, timestamps)) return dataset
def _load_timestamps(sensors: Sensors, data_path: str) -> Dict[str, List[str]]: timestamps = {} for sensor_name in sensors.keys(): data_folder = f"image_{sensor_name[-2:]}" if sensor_name != "LIDAR" else "lidar_points" timestamp_file = os.path.join(data_path, data_folder, "timestamps.txt") with open(timestamp_file, encoding="utf-8") as fp: timestamps[sensor_name] = fp.readlines() return timestamps def _load_frame( sensors: Sensors, data_path: str, frame_index: int, annotation: Dict[str, Any], timestamps: Dict[str, List[str]], ) -> Frame: frame = Frame() for sensor_name in sensors.keys(): # The data file name is a string of length 10 with each digit being a number: # 0000000000.jpg # 0000000001.bin stem = f"{frame_index:010}" # Each line of the timestamps file looks like: # 2018-03-06 15:02:33.000000000 timestamp = datetime.strptime( timestamps[sensor_name][frame_index][:23], "%Y-%m-%d %H:%M:%S.%f" ).timestamp() if sensor_name != "LIDAR": # The image folder corresponds to different cameras, whose name is likes "CAM00". # The image folder looks like "image_00". camera_folder = f"image_{sensor_name[-2:]}" image_file = f"{stem}.png" data = Data( os.path.join(data_path, camera_folder, "data", image_file), target_remote_path=f"{camera_folder}-{image_file}", timestamp=timestamp, ) else: data = Data( os.path.join(data_path, "lidar_points", "data", f"{stem}.bin"), timestamp=timestamp, ) data.label.box3d = _load_labels(annotation["cuboids"]) frame[sensor_name] = data return frame def _load_labels(boxes: List[Dict[str, Any]]) -> List[LabeledBox3D]: labels = [] for box in boxes: dimension = box["dimensions"] position = box["position"] attributes = box["attributes"] attributes["stationary"] = box["stationary"] attributes["camera_used"] = box["camera_used"] attributes["points_count"] = box["points_count"] label = LabeledBox3D( size=( dimension["y"], # The "y" dimension is the width from front to back. dimension["x"], # The "x" dimension is the width from left to right. dimension["z"], ), translation=( position["x"], # "x" axis points to the forward facing direction of the object. position["y"], # "y" axis points to the left direction of the object. position["z"], ), rotation=quaternion.from_rotation_vector((0, 0, box["yaw"])), category=box["label"], attributes=attributes, instance=box["uuid"], ) labels.append(label) return labels def _load_sensors(calib_path: str) -> Sensors: try: import yaml # pylint: disable=import-outside-toplevel except ModuleNotFoundError as error: raise ModuleImportError(module_name=error.name, package_name="pyyaml") from error sensors = Sensors() lidar = Lidar("LIDAR") lidar.set_extrinsics() sensors.add(lidar) with open(os.path.join(calib_path, "extrinsics.yaml"), encoding="utf-8") as fp: extrinsics = yaml.load(fp, Loader=yaml.FullLoader) for camera_calibration_file in glob(os.path.join(calib_path, "[0-9]*.yaml")): with open(camera_calibration_file, encoding="utf-8") as fp: camera_calibration = yaml.load(fp, Loader=yaml.FullLoader) # camera_calibration_file looks like: # /path-to-CADC/2018_03_06/calib/00.yaml camera_name = f"CAM{os.path.splitext(os.path.basename(camera_calibration_file))[0]}" camera = Camera(camera_name) camera.description = camera_calibration["camera_name"] camera.set_extrinsics(matrix=extrinsics[f"T_LIDAR_{camera_name}"]) camera_matrix = camera_calibration["camera_matrix"]["data"] camera.set_camera_matrix(matrix=[camera_matrix[:3], camera_matrix[3:6], camera_matrix[6:9]]) distortion = camera_calibration["distortion_coefficients"]["data"] camera.set_distortion_coefficients(**dict(zip(("k1", "k2", "p1", "p2", "k3"), distortion))) sensors.add(camera) return sensors