Source code for tensorbay.dataset.frame

#!/usr/bin/env python3
# Copyright 2021 Graviti. Licensed under MIT License.

"""The implementation of the TensorBay frame."""

import logging
from typing import Any, Dict, Optional, Type, TypeVar
from uuid import UUID

from ulid import ULID, from_str, from_uuid

from tensorbay.client.lazy import LazyPage
from import DataBase, RemoteData
from tensorbay.utility import URL, UserMutableMapping

logger = logging.getLogger(__name__)

[docs]class Frame(UserMutableMapping[str, "DataBase._Type"]): """This class defines the concept of frame. Frame is a concept in :class:`~tensorbay.dataset.dataset.FusionDataset`. It is the structure that composes :class:`~tensorbay.dataset.segment.FusionSegment`, and consists of multiple :class:`` collected at the same time corresponding to different sensors. Since :class:`Frame` extends :class:`~tensorbay.utility.user.UserMutableMapping`, its basic operations are the same as a dictionary's. To initialize a Frame and add a :class:`` to it: .. code:: python frame = Frame() frame[sensor_name] = Data() """ _T = TypeVar("_T", bound="Frame") _logger_flag = True def __init__(self, frame_id: Optional[ULID] = None) -> None: self._data: Dict[str, DataBase._Type] = {} # self._pose: Optional[Transform3D] = None if frame_id: self.frame_id = frame_id def _repr_head(self) -> str: if hasattr(self, "frame_id"): return f'{self.__class__.__name__}("{self.frame_id}")' return self.__class__.__name__
[docs] @classmethod def from_response_body( cls: Type[_T], body: Dict[str, Any], url_index: int, urls: LazyPage[Dict[str, str]], *, cache_path: str = "", ) -> _T: """Loads a :class:`Frame` object from a response body. Arguments: body: The response body which contains the information of a frame, whose format should be like:: { "frameId": <str>, "frame": [ { "sensorName": <str>, "remotePath": <str>, "timestamp": <float>, "url": <str>, "label": {...} }, ... ... ] } url_index: The index of the url. urls: A sequence of mappings which key is the sensor name and value is the url. cache_path: The path to store the cache. Returns: The loaded :class:`Frame` object. """ # noqa: DAR101 # try: frame_id = from_str(body["frameId"]) except ValueError: # Legacy fusion dataset use uuid as frame ID # Keep this code here to make SDK compatible with uuid frame_id = from_uuid(UUID(body["frameId"])) if cls._logger_flag: cls._logger_flag = False logger.warning( "WARNING: This is a legacy fusion dataset which use uuid as frame ID, " "it should be updated to ulid." ) frame = cls(frame_id) for data_contents in body["frame"]: sensor_name = data_contents["sensorName"] url = URL.from_getter(lambda s=sensor_name: urls.items[url_index].get()[s], urls.pull) frame[sensor_name] = RemoteData.from_response_body( data_contents, url=url, cache_path=cache_path, ) return frame
# @property # def pose(self) -> Optional[Transform3D]: # """Return the pose of the frame. # Returns: # A :class:`~tensorbay.geometry.transform.Transform3D` object # representing the pose of the frame. # """ # return self._pose # def set_pose( # self, # translation: Optional[Iterable[float]] = (0, 0, 0), # rotation: Transform3D.RotationType = (1, 0, 0, 0), # *, # matrix: Optional[MatrixType] = None, # ) -> None: # """Set the pose of the current frame. # Arguments: # translation: Translation of the frame pose in a sequence of [x, y, z]. # rotation: Rotation of the frame pose in a sequence of [w, x, y, z] # or a numpy quaternion object. # matrix: The transform representing the frame pose in # a 4x4 or 3x4 transform matrix. # """ # self._pose = Transform3D(translation, rotation, matrix=matrix)