Source code for tensorbay.client.segment

#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#

"""The segment of remote dataset on TensorBay."""

import os
import time
from copy import deepcopy
from itertools import zip_longest
from typing import TYPE_CHECKING, Any, Dict, Generator, Iterable, Optional, Tuple, Union

import filetype
from requests_toolbelt import MultipartEncoder
from ulid import ULID, from_timestamp

from tensorbay.client.lazy import LazyPage, PagingList
from tensorbay.client.status import Status
from tensorbay.dataset import AuthData, Data, Frame, RemoteData
from tensorbay.dataset.data import DataBase
from tensorbay.exception import FrameError, InvalidParamsError, ResourceNotExistError, ResponseError
from tensorbay.label import Label
from tensorbay.sensor.sensor import Sensor, Sensors
from tensorbay.utility import URL, FileMixin, chunked, config, locked

if TYPE_CHECKING:
    from tensorbay.client.dataset import DatasetClient, FusionDatasetClient

_STRATEGIES = {"abort", "override", "skip"}
_MASK_KEYS = ("semantic_mask", "instance_mask", "panoptic_mask")


class SegmentClientBase:
    """This class defines the basic concept of :class:`SegmentClient`.

    A :class:`SegmentClientBase` contains the information needed for determining
    a unique segment in a dataset on TensorBay.

    Arguments:
        name: Segment name.
        dataset_client: The dataset client.

    Attributes:
        name: Segment name.
        status: The status of the dataset client.

    """

    _EXPIRED_IN_SECOND = 240

    def __init__(
        self, name: str, dataset_client: "Union[DatasetClient, FusionDatasetClient]"
    ) -> None:
        self._name = name
        self._dataset_id = dataset_client.dataset_id
        self._dataset_client = dataset_client
        self._status = dataset_client.status
        self._client = dataset_client._client
        self._permission: Dict[str, Any] = {"expireAt": 0}
        if dataset_client.cache_enabled:
            self._cache_path: str = os.path.join(
                dataset_client._cache_path,
                dataset_client.status.commit_id,  # type: ignore[arg-type]
                name,
            )
        else:
            self._cache_path = ""

    def _get_url(self, remote_path: str) -> str:
        """Get URL of a specific remote path.

        Arguments:
            remote_path: The remote path of the file.

        Returns:
            The URL of the remote file.

        """
        params: Dict[str, Any] = {
            "segmentName": self._name,
            "remotePath": remote_path,
        }
        params.update(self._status.get_status_info())

        if config.is_internal:
            params["isInternal"] = True

        response = self._client.open_api_do("GET", "data/urls", self._dataset_id, params=params)
        return response.json()["urls"][0]["url"]  # type: ignore[no-any-return]

    def _list_urls(self, offset: int = 0, limit: int = 128) -> Dict[str, Any]:
        params: Dict[str, Any] = {
            "segmentName": self._name,
            "offset": offset,
            "limit": limit,
        }
        params.update(self._status.get_status_info())

        if config.is_internal:
            params["isInternal"] = True

        response = self._client.open_api_do("GET", "data/urls", self._dataset_id, params=params)
        return response.json()  # type: ignore[no-any-return]

    def _get_data_details(self, remote_path: str) -> Dict[str, Any]:
        params: Dict[str, Any] = {
            "segmentName": self._name,
            "remotePath": remote_path,
        }
        params.update(self._status.get_status_info())

        if config.is_internal:
            params["isInternal"] = True

        response = self._client.open_api_do("GET", "data/details", self._dataset_id, params=params)

        try:
            data_details = response.json()["dataDetails"][0]
        except IndexError as error:
            raise ResourceNotExistError(resource="data", identification=remote_path) from error

        return data_details  # type: ignore[no-any-return]

    def _list_data_details(self, offset: int = 0, limit: int = 128) -> Dict[str, Any]:
        params: Dict[str, Any] = {
            "segmentName": self._name,
            "offset": offset,
            "limit": limit,
        }
        params.update(self._status.get_status_info())

        if config.is_internal:
            params["isInternal"] = True

        response = self._client.open_api_do("GET", "data/details", self._dataset_id, params=params)
        return response.json()  # type: ignore[no-any-return]

    def _get_mask_url(self, mask_type: str, remote_path: str) -> str:
        params: Dict[str, Any] = {
            "segmentName": self._name,
            "maskType": mask_type,
            "remotePath": remote_path,
        }
        params.update(self._status.get_status_info())

        if config.is_internal:
            params["isInternal"] = True

        response = self._client.open_api_do("GET", "masks/urls", self._dataset_id, params=params)

        try:
            mask_url = response.json()["urls"][0]["url"]
        except IndexError as error:
            raise ResourceNotExistError(
                resource=f"{mask_type} of data", identification=remote_path
            ) from error

        return mask_url  # type: ignore[no-any-return]

    def _list_mask_urls(self, mask_type: str, offset: int = 0, limit: int = 128) -> Dict[str, Any]:
        params: Dict[str, Any] = {
            "segmentName": self._name,
            "maskType": mask_type,
            "offset": offset,
            "limit": limit,
        }
        params.update(self._status.get_status_info())

        if config.is_internal:
            params["isInternal"] = True

        response = self._client.open_api_do("GET", "masks/urls", self._dataset_id, params=params)
        return response.json()  # type: ignore[no-any-return]

    def _list_labels(self, offset: int = 0, limit: int = 128) -> Dict[str, Any]:
        params: Dict[str, Any] = {
            "segmentName": self._name,
            "offset": offset,
            "limit": limit,
        }
        params.update(self._status.get_status_info())

        response = self._client.open_api_do("GET", "labels", self._dataset_id, params=params)
        return response.json()  # type: ignore[no-any-return]

    @locked
    def _request_upload_permission(self) -> None:
        params: Dict[str, Any] = {"expired": self._EXPIRED_IN_SECOND, "segmentName": self._name}
        params.update(self._status.get_status_info())

        if config.is_internal:
            params["isInternal"] = True

        _permission = self._client.open_api_do(
            "GET", "policies", self._dataset_id, params=params
        ).json()

        result = _permission["result"]
        del result["multipleUploadLimit"]
        keys = [key for key, value in result.items() if value is None]
        for key in keys:
            del result[key]

        self._permission = _permission

    def _get_upload_permission(self) -> Dict[str, Any]:
        if int(time.time()) >= self._permission["expireAt"]:
            self._request_upload_permission()

        return deepcopy(self._permission)

    def _upload_file(self, data: FileMixin) -> None:
        """Upload the file in the data to the draft.

        Arguments:
            data: The data instance to be uploaded.

        """
        permission = self._get_upload_permission()
        post_data = permission["result"]

        local_path = data.path
        checksum = data.get_checksum()

        post_data["key"] = permission["extra"]["objectPrefix"] + checksum

        host = permission["extra"]["host"]
        backend_type = permission["extra"]["backendType"]
        if backend_type == "azure":
            url = (
                f'{permission["extra"]["host"]}{permission["extra"]["objectPrefix"]}'
                f'{checksum}?{permission["result"]["token"]}'
            )
            self._put_binary_file_to_azure(url, local_path, post_data)
        elif backend_type == "fps":
            self._post_multipart_formdata(
                host,
                local_path,
                post_data,
                checksum,
            )
        else:
            self._post_multipart_formdata(
                host,
                local_path,
                post_data,
            )

    def _upload_mask_files(self, label: Label) -> None:
        for key in _MASK_KEYS:
            mask = getattr(label, key, None)
            if mask:
                self._upload_file(mask)

    def _post_multipart_formdata(
        self,
        url: str,
        local_path: str,
        data: Dict[str, Any],
        filename: str = "",
    ) -> None:
        with open(local_path, "rb") as fp:
            file_type = filetype.guess_mime(local_path)
            if "x-amz-date" in data:
                data["Content-Type"] = file_type
            try:
                data["file"] = (filename, fp, file_type)
                self._post_formdata(url, data)
            except ResponseError as error:
                if b"MalformedPOSTRequest" in error.response.content:
                    data["file"] = ("workaroundForMalformedPostRequest", fp, file_type)
                    self._post_formdata(url, data)
                else:
                    raise

    def _post_formdata(self, url: str, data: Dict[str, Any]) -> None:
        multipart = MultipartEncoder(data)
        self._client.do(
            "POST",
            url,
            data=multipart,
            headers={"Content-Type": multipart.content_type},
        )

    def _put_binary_file_to_azure(
        self,
        url: str,
        local_path: str,
        data: Dict[str, Any],
    ) -> None:
        with open(local_path, "rb") as fp:
            file_type = filetype.guess_mime(local_path)
            request_headers = {
                "x-ms-blob-content-type": file_type,
                "x-ms-blob-type": data["x-ms-blob-type"],
            }
            self._client.do("PUT", url, data=fp, headers=request_headers)

    def _synchronize_import_info(self, callback_bodies: Tuple[Dict[str, Any], ...]) -> None:
        put_data: Dict[str, Any] = {
            "segmentName": self.name,
            "objects": callback_bodies,
            "deleteSource": False,
        }
        put_data.update(self._status.get_status_info())

        self._client.open_api_do("PUT", "multi/cloud-callback", self._dataset_id, json=put_data)

    def _synchronize_upload_info(
        self,
        callback_bodies: Tuple[Dict[str, Any], ...],
    ) -> None:
        put_data: Dict[str, Any] = {
            "segmentName": self.name,
            "objects": callback_bodies,
        }
        put_data.update(self._status.get_status_info())

        self._client.open_api_do("PUT", "multi/callback", self._dataset_id, json=put_data)

    def _upload_label(self, data: Union[AuthData, Data]) -> None:
        label = data.label.dumps()
        if not label:
            return

        post_data: Dict[str, Any] = {
            "segmentName": self.name,
            "remotePath": data.target_remote_path,
            "label": label,
        }
        post_data.update(self._status.get_status_info())

        self._client.open_api_do("PUT", "labels", self._dataset_id, json=post_data)

    def _upload_multi_label(self, data: Iterable[DataBase._Type]) -> None:
        post_data: Dict[str, Any] = {"segmentName": self.name}
        objects = []
        for single_data in data:
            label = single_data.label.dumps()
            if not label:
                continue

            remote_path = (
                single_data.path
                if isinstance(single_data, RemoteData)
                else single_data.target_remote_path
            )
            objects.append({"remotePath": remote_path, "label": label})

        post_data["objects"] = objects
        post_data.update(self._status.get_status_info())
        self._client.open_api_do("PUT", "multi/data/labels", self._dataset_id, json=post_data)
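    # Note: ``_get_upload_permission`` re-requests the upload policy only once it
    # has expired (``_EXPIRED_IN_SECOND``), so repeated uploads do not hit the
    # policies endpoint on every file. A minimal sketch of the same
    # expiring-cache pattern (a generic illustration, not SDK code):
    #
    #     import time
    #
    #     class ExpiringCache:
    #         def __init__(self, fetch, ttl=240):
    #             self._fetch, self._ttl = fetch, ttl
    #             self._value, self._expire_at = None, 0.0
    #
    #         def get(self):
    #             # Refresh the cached value only after the TTL has elapsed.
    #             if time.time() >= self._expire_at:
    #                 self._value = self._fetch()
    #                 self._expire_at = time.time() + self._ttl
    #             return self._value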
    def upload_label(self, data: Union[DataBase._Type, Iterable[DataBase._Type]]) -> None:
        """Upload label of the data object to the draft.

        Arguments:
            data: The data object, or an iterable of data objects, whose labels
                will be uploaded.

        """
        self._status.check_authority_for_draft()

        if not isinstance(data, Iterable):
            data = [data]
        for chunked_data in chunked(data, 128):
            for single_data in chunked_data:
                self._upload_mask_files(single_data.label)
            self._upload_multi_label(chunked_data)
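    # A minimal usage sketch for ``upload_label`` (assumes an authenticated GAS
    # client, an open draft, and that the file already exists in the segment;
    # "MyDataset", "train" and the paths are placeholder names):
    #
    #     from tensorbay import GAS
    #     from tensorbay.dataset import Data
    #     from tensorbay.label import Classification
    #
    #     gas = GAS("<YOUR_ACCESS_KEY>")
    #     dataset_client = gas.get_dataset("MyDataset")
    #     dataset_client.create_draft("label upload example")
    #     segment_client = dataset_client.get_or_create_segment("train")
    #
    #     data = Data("local/0001.png")
    #     data.label.classification = Classification("cat")
    #     segment_client.upload_label(data)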
    @property
    def name(self) -> str:
        """Return the segment name.

        Returns:
            The segment name.

        """
        return self._name

    @property
    def status(self) -> Status:
        """Return the status of the dataset client.

        Returns:
            The status of the dataset client.

        """
        return self._status

class SegmentClient(SegmentClientBase):
    """This class defines :class:`SegmentClient`.

    :class:`SegmentClient` inherits from :class:`SegmentClientBase` and provides methods
    within a segment scope, such as :meth:`SegmentClient.upload_label`,
    :meth:`SegmentClient.upload_data`, :meth:`SegmentClient.list_data` and so on.
    In contrast to :class:`FusionSegmentClient`, :class:`SegmentClient` has only one sensor.

    """

    _dataset_client: "DatasetClient"

    def __init__(self, name: str, data_client: "DatasetClient") -> None:
        super().__init__(name, data_client)

    def _generate_data_paths(
        self, offset: int = 0, limit: int = 128
    ) -> Generator[str, None, int]:
        params: Dict[str, Any] = {
            "segmentName": self._name,
            "offset": offset,
            "limit": limit,
        }
        params.update(self._status.get_status_info())

        response = self._client.open_api_do("GET", "data", self._dataset_id, params=params).json()

        for item in response["data"]:
            yield item["remotePath"]

        return response["totalCount"]  # type: ignore[no-any-return]

    def _generate_data(
        self, offset: int = 0, limit: int = 128
    ) -> Generator[RemoteData, None, int]:
        response = self._list_data_details(offset, limit)

        urls = LazyPage.from_items(
            offset,
            limit,
            self._generate_urls,
            (item["url"] for item in response["dataDetails"]),
        )
        mask_urls = {}
        for key in _MASK_KEYS:
            mask_urls[key] = LazyPage.from_items(
                offset,
                limit,
                lambda offset, limit, k=key.upper(): (  # type: ignore[misc]
                    self._generate_mask_urls(k, offset, limit)
                ),
                (
                    item["label"].get(key.upper(), {}).get("url")
                    for item in response["dataDetails"]
                ),
            )

        for i, item in enumerate(response["dataDetails"]):
            data = RemoteData.from_response_body(
                item,
                url=URL.from_getter(urls.items[i].get, urls.pull),
                cache_path=self._cache_path,
            )
            label = data.label
            for key in _MASK_KEYS:
                mask = getattr(label, key, None)
                if mask:
                    mask.url = URL.from_getter(mask_urls[key].items[i].get, mask_urls[key].pull)
                    mask.cache_path = os.path.join(self._cache_path, key, mask.path)
            yield data

        return response["totalCount"]  # type: ignore[no-any-return]

    def _generate_urls(self, offset: int = 0, limit: int = 128) -> Generator[str, None, int]:
        response = self._list_urls(offset, limit)

        for item in response["urls"]:
            yield item["url"]

        return response["totalCount"]  # type: ignore[no-any-return]

    def _generate_mask_urls(
        self, mask_type: str, offset: int = 0, limit: int = 128
    ) -> Generator[Optional[str], None, int]:
        response = self._list_mask_urls(mask_type, offset, limit)

        for item in response["urls"]:
            yield item["url"] if item else None

        return response["totalCount"]  # type: ignore[no-any-return]

    def _upload_or_import_data(self, data: Union[Data, AuthData]) -> Optional[Dict[str, Any]]:
        if isinstance(data, Data):
            self._upload_file(data)
            self._upload_mask_files(data.label)
            return data.get_callback_body()

        self._synchronize_import_info((data.get_callback_body(),))
        return None
    def upload_file(self, local_path: str, target_remote_path: str = "") -> None:
        """Upload data with local path to the draft.

        Arguments:
            local_path: The local path of the data to upload.
            target_remote_path: The path to save the data in segment client.

        """
        self._status.check_authority_for_draft()

        data = Data(local_path, target_remote_path=target_remote_path)
        self._upload_file(data)

        self._synchronize_upload_info((data.get_callback_body(),))
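    # A minimal usage sketch for ``upload_file`` (assumes an open draft and the
    # ``segment_client``/``dataset_client`` objects from the sketch above; the
    # paths are placeholders):
    #
    #     segment_client.upload_file("local/images/0001.png", "0001.png")
    #     dataset_client.commit("add 0001.png")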
    def upload_data(self, data: Data) -> None:
        """Upload Data object to the draft.

        Arguments:
            data: The :class:`~tensorbay.dataset.data.Data`.

        """
        self._status.check_authority_for_draft()

        self._upload_file(data)
        self._upload_mask_files(data.label)

        self._synchronize_upload_info((data.get_callback_body(),))
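    # A minimal sketch for ``upload_data``, uploading a file together with its
    # label in one call (assumes an open draft; the path and category are
    # placeholders):
    #
    #     from tensorbay.dataset import Data
    #     from tensorbay.label import Classification
    #
    #     data = Data("local/images/0002.png")
    #     data.label.classification = Classification("dog")
    #     segment_client.upload_data(data)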
    def import_auth_data(self, data: AuthData) -> None:
        """Import AuthData object to the draft.

        Arguments:
            data: The :class:`~tensorbay.dataset.data.AuthData`.

        """
        self._status.check_authority_for_draft()

        self._synchronize_import_info((data.get_callback_body(),))
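    # A hedged sketch for ``import_auth_data``: AuthData objects typically come
    # from an authorized cloud storage config, assuming the cloud client API is
    # available in this SDK version. "my-storage-config" and the prefix are
    # placeholders:
    #
    #     cloud_client = gas.get_cloud_client("my-storage-config")
    #     for auth_data in cloud_client.list_auth_data("images/"):
    #         segment_client.import_auth_data(auth_data)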
    def copy_data(
        self,
        source_remote_paths: Union[str, Iterable[str]],
        target_remote_paths: Union[None, str, Iterable[str]] = None,
        *,
        source_client: Optional["SegmentClient"] = None,
        strategy: str = "abort",
    ) -> None:
        """Copy data to this segment.

        Arguments:
            source_remote_paths: The source remote paths of the copied data.
            target_remote_paths: The target remote paths of the copied data.
                This argument is used to specify new remote paths of the copied data.
                If None, the remote path of the copied data will not be changed after copy.
            source_client: The source segment client of the copied data.
                This argument is used to specify where the copied data comes from when the
                copied data is from another commit, draft, segment or even another dataset.
                If None, the copied data comes from this segment.
            strategy: The strategy of handling the name conflict. There are three options:

                1. "abort": stop copying and raise exception;
                2. "override": the source data will override the origin data;
                3. "skip": keep the origin data.

        Raises:
            InvalidParamsError: When strategy is invalid.
            ValueError: When the type or length of target_remote_paths does not match
                source_remote_paths.

        """
        self._status.check_authority_for_draft()

        if strategy not in _STRATEGIES:
            raise InvalidParamsError(param_name="strategy", param_value=strategy)

        if not target_remote_paths:
            all_target_remote_paths = []
            all_source_remote_paths = (
                [source_remote_paths]
                if isinstance(source_remote_paths, str)
                else list(source_remote_paths)
            )

        elif isinstance(source_remote_paths, str) and isinstance(target_remote_paths, str):
            all_target_remote_paths = [target_remote_paths]
            all_source_remote_paths = [source_remote_paths]

        elif not isinstance(source_remote_paths, str) and not isinstance(target_remote_paths, str):
            all_target_remote_paths = list(target_remote_paths)
            all_source_remote_paths = list(source_remote_paths)
            if len(all_target_remote_paths) != len(all_source_remote_paths):
                raise ValueError(
                    "To copy the data, the length of target_remote_paths "
                    "must be equal to source_remote_paths"
                )

        else:
            raise ValueError(
                "To copy the data, the type of target_remote_paths "
                "must be equal to source_remote_paths"
            )

        source = {}
        if source_client:
            source["segmentName"] = source_client.name
            source["id"] = source_client._dataset_id  # pylint: disable=protected-access
            source.update(source_client.status.get_status_info())
        else:
            source["segmentName"] = self.name

        post_data: Dict[str, Any] = {
            "strategy": strategy,
            "source": source,
            "segmentName": self.name,
        }
        post_data.update(self._status.get_status_info())

        for targets, sources in zip_longest(
            chunked(all_target_remote_paths, 128), chunked(all_source_remote_paths, 128)
        ):
            if targets:
                post_data["remotePaths"] = targets
            post_data["source"]["remotePaths"] = sources

            self._client.open_api_do("POST", "data?multipleCopy", self._dataset_id, json=post_data)
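    # A minimal sketch for ``copy_data`` (placeholder names): copy one file
    # within this segment under a new remote path, then copy a batch from
    # another segment of the same dataset:
    #
    #     segment_client.copy_data("0001.png", "copies/0001.png")
    #
    #     source_segment = dataset_client.get_segment("validation")
    #     segment_client.copy_data(
    #         ["0002.png", "0003.png"],
    #         source_client=source_segment,
    #         strategy="skip",
    #     )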
    def move_data(
        self,
        source_remote_paths: Union[str, Iterable[str]],
        target_remote_paths: Union[None, str, Iterable[str]] = None,
        *,
        source_client: Optional["SegmentClient"] = None,
        strategy: str = "abort",
    ) -> None:
        """Move data to this segment, also used to rename data.

        Arguments:
            source_remote_paths: The source remote paths of the moved data.
            target_remote_paths: The target remote paths of the moved data.
                This argument is used to specify new remote paths of the moved data.
                If None, the remote path of the moved data will not be changed after move.
            source_client: The source segment client of the moved data.
                This argument is used to specify where the moved data comes from when the
                moved data is from another segment.
                If None, the moved data comes from this segment.
            strategy: The strategy of handling the name conflict. There are three options:

                1. "abort": stop moving and raise exception;
                2. "override": the source data will override the origin data;
                3. "skip": keep the origin data.

        Raises:
            InvalidParamsError: When strategy is invalid.
            ValueError: When the type or length of target_remote_paths does not match
                source_remote_paths, or when the dataset_id and draft_number of
                source_client do not match the current segment client.

        """
        self._status.check_authority_for_draft()

        if strategy not in _STRATEGIES:
            raise InvalidParamsError(param_name="strategy", param_value=strategy)

        if not target_remote_paths:
            all_target_remote_paths = []
            all_source_remote_paths = (
                [source_remote_paths]
                if isinstance(source_remote_paths, str)
                else list(source_remote_paths)
            )

        elif isinstance(source_remote_paths, str) and isinstance(target_remote_paths, str):
            all_target_remote_paths = [target_remote_paths]
            all_source_remote_paths = [source_remote_paths]

        elif not isinstance(source_remote_paths, str) and not isinstance(target_remote_paths, str):
            all_target_remote_paths = list(target_remote_paths)
            all_source_remote_paths = list(source_remote_paths)
            if len(all_target_remote_paths) != len(all_source_remote_paths):
                raise ValueError(
                    "To move the data, the length of target_remote_paths "
                    "must be equal to source_remote_paths"
                )

        else:
            raise ValueError(
                "To move the data, the type of target_remote_paths "
                "must be equal to source_remote_paths"
            )

        source = {}
        if source_client:
            if (
                source_client.status.draft_number == self.status.draft_number
                and source_client._dataset_id  # pylint: disable=protected-access
                == self._dataset_id
            ):
                source["segmentName"] = source_client.name
            else:
                raise ValueError(
                    "To move the data, the dataset_id and draft_number of source_client "
                    "must be equal to those of the current segment client"
                )
        else:
            source["segmentName"] = self.name

        post_data: Dict[str, Any] = {
            "strategy": strategy,
            "source": source,
            "segmentName": self.name,
        }
        post_data.update(self._status.get_status_info())

        for targets, sources in zip_longest(
            chunked(all_target_remote_paths, 128), chunked(all_source_remote_paths, 128)
        ):
            if targets:
                post_data["remotePaths"] = targets
            post_data["source"]["remotePaths"] = sources

            self._client.open_api_do("POST", "data?multipleMove", self._dataset_id, json=post_data)
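    # A minimal sketch for ``move_data``, here used to rename a file inside the
    # segment (placeholder paths):
    #
    #     segment_client.move_data("old_name.png", "new_name.png", strategy="override")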
    def list_data_paths(self) -> PagingList[str]:
        """List required data paths in a segment in a certain commit.

        Returns:
            The PagingList of data paths.

        """
        return PagingList(self._generate_data_paths, 128)
    def get_data(self, remote_path: str) -> RemoteData:
        """Get required Data object from a dataset segment.

        Arguments:
            remote_path: The remote path of the required data.

        Returns:
            :class:`~tensorbay.dataset.data.RemoteData`.

        Raises:
            ResourceNotExistError: When the required data does not exist.

        """
        if not remote_path:
            raise ResourceNotExistError(resource="data", identification=remote_path)

        data_details = self._get_data_details(remote_path)

        data = RemoteData.from_response_body(
            data_details,
            url=URL(data_details["url"], lambda: self._get_url(remote_path)),
            cache_path=self._cache_path,
        )
        label = data.label
        for key in _MASK_KEYS:
            mask = getattr(label, key, None)
            if mask:
                mask.url = URL(
                    data_details["label"][key.upper()]["url"],
                    lambda k=key.upper(), r=remote_path: (  # type: ignore[misc, arg-type]
                        self._get_mask_url(k, r)
                    ),
                )
                mask.cache_path = os.path.join(self._cache_path, key, mask.path)

        return data
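    # A minimal sketch for ``get_data`` (placeholder path): the returned
    # RemoteData can be opened like a file, and its file content is fetched
    # through the lazily resolved URL:
    #
    #     remote_data = segment_client.get_data("0001.png")
    #     with remote_data.open() as fp:
    #         image_bytes = fp.read()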
    def list_data(self) -> PagingList[RemoteData]:
        """List required Data object in a dataset segment.

        Returns:
            The PagingList of :class:`~tensorbay.dataset.data.RemoteData`.

        """
        return PagingList(self._generate_data, 128)
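    # A minimal sketch for ``list_data``: the PagingList fetches pages of 128
    # items lazily, so iterating does not load the whole segment at once:
    #
    #     for remote_data in segment_client.list_data():
    #         print(remote_data.path, remote_data.label)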
    def delete_data(self, remote_path: str) -> None:
        """Delete data of a segment in a certain commit with the given remote path.

        Arguments:
            remote_path: The remote path of data in a segment.

        """
        self._status.check_authority_for_draft()

        delete_data: Dict[str, Any] = {
            "segmentName": self.name,
            "remotePath": remote_path,
        }
        delete_data.update(self._status.get_status_info())

        self._client.open_api_do("DELETE", "data", self._dataset_id, json=delete_data)
    def list_urls(self) -> PagingList[str]:
        """List the data urls in this segment.

        Returns:
            The PagingList of urls.

        """
        return PagingList(self._generate_urls, 128)
    def list_mask_urls(self, mask_type: str) -> PagingList[Optional[str]]:
        """List the mask urls in this segment.

        Arguments:
            mask_type: The required mask type, the supported types are
                ``SEMANTIC_MASK``, ``INSTANCE_MASK`` and ``PANOPTIC_MASK``.

        Returns:
            The PagingList of mask urls.

        """
        return PagingList(
            lambda offset, limit: self._generate_mask_urls(mask_type, offset, limit), 128
        )
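    # A minimal sketch for ``list_mask_urls`` (assumes the segment actually
    # contains semantic masks; entries are None for data without a mask):
    #
    #     for url in segment_client.list_mask_urls("SEMANTIC_MASK"):
    #         if url:
    #             print(url)
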
class FusionSegmentClient(SegmentClientBase):
    """This class defines :class:`FusionSegmentClient`.

    :class:`FusionSegmentClient` inherits from :class:`SegmentClientBase` and provides
    methods within a fusion segment scope, such as :meth:`FusionSegmentClient.upload_sensor`,
    :meth:`FusionSegmentClient.upload_frame` and :meth:`FusionSegmentClient.list_frames`.

    In contrast to :class:`SegmentClient`, :class:`FusionSegmentClient` has multiple sensors.

    """

    _dataset_client: "FusionDatasetClient"

    def __init__(self, name: str, data_client: "FusionDatasetClient") -> None:
        super().__init__(name, data_client)

    def _generate_frames(self, offset: int = 0, limit: int = 128) -> Generator[Frame, None, int]:
        response = self._list_data_details(offset, limit)

        url_page = LazyPage.from_items(
            offset,
            limit,
            self._generate_urls,
            (
                {frame["sensorName"]: frame["url"] for frame in item["frame"]}
                for item in response["dataDetails"]
            ),
        )

        for index, item in enumerate(response["dataDetails"]):
            yield Frame.from_response_body(item, index, url_page, cache_path=self._cache_path)

        return response["totalCount"]  # type: ignore[no-any-return]

    def _generate_urls(
        self, offset: int = 0, limit: int = 128
    ) -> Generator[Dict[str, str], None, int]:
        response = self._list_urls(offset, limit)

        for frame in response["urls"]:
            yield {item["sensorName"]: item["url"] for item in frame["urls"]}

        return response["totalCount"]  # type: ignore[no-any-return]

    def _upload_or_import_data(
        self,
        data: Union[Data, AuthData],
        sensor_name: str,
        frame_id: str,
    ) -> Optional[Dict[str, Any]]:
        callback_body = data.get_callback_body()
        callback_body["frameId"] = frame_id
        callback_body["sensorName"] = sensor_name
        if isinstance(data, Data):
            self._upload_file(data)
            self._upload_mask_files(data.label)
            return callback_body

        self._synchronize_import_info((callback_body,))
        return None
    def get_sensors(self) -> Sensors:
        """Return the sensors in a fusion segment client.

        Returns:
            The :class:`sensors <tensorbay.sensor.sensor.Sensors>` in the fusion segment client.

        """
        params: Dict[str, Any] = {"segmentName": self._name}
        params.update(self._status.get_status_info())

        response = self._client.open_api_do(
            "GET", "sensors", self._dataset_id, params=params
        ).json()

        return Sensors.loads(response["sensors"])
    def upload_sensor(self, sensor: Sensor) -> None:
        """Upload sensor to the draft.

        Arguments:
            sensor: The sensor to upload.

        """
        self._status.check_authority_for_draft()

        post_data = sensor.dumps()
        post_data.update(self._status.get_status_info())

        post_data["segmentName"] = self._name
        self._client.open_api_do("POST", "sensors", self._dataset_id, json=post_data)
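    # A minimal sketch for ``upload_sensor`` (assumes an open draft on a fusion
    # dataset; the sensor name and translation values are placeholders):
    #
    #     from tensorbay.sensor import Camera
    #
    #     camera = Camera("front_camera")
    #     camera.set_translation(x=1.1, y=0.0, z=1.5)
    #     fusion_segment_client.upload_sensor(camera)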
    def delete_sensor(self, sensor_name: str) -> None:
        """Delete a TensorBay sensor of the draft with the given sensor name.

        Arguments:
            sensor_name: The name of the TensorBay sensor to delete.

        """
        self._status.check_authority_for_draft()

        delete_data: Dict[str, Any] = {"segmentName": self._name, "sensorName": sensor_name}
        delete_data.update(self._status.get_status_info())

        self._client.open_api_do("DELETE", "sensors", self._dataset_id, json=delete_data)
    def upload_frame(self, frame: Frame, timestamp: Optional[float] = None) -> None:
        """Upload frame to the draft.

        Arguments:
            frame: The :class:`~tensorbay.dataset.frame.Frame` to upload.
            timestamp: The timestamp used to generate the frame id for sorting frames.

        Raises:
            FrameError: When the frame id is missing, or when both a frame id and a
                timestamp are given.

        """
        self._status.check_authority_for_draft()

        if timestamp is None:
            try:
                frame_id = frame.frame_id
            except AttributeError as error:
                raise FrameError(
                    "Lack frame id, please add frame id in frame or "
                    "give timestamp to the function!"
                ) from error
        elif not hasattr(frame, "frame_id"):
            frame_id = from_timestamp(timestamp)
        else:
            raise FrameError("Frame id conflicts, please do not give timestamp to the function!")

        callback_bodies = []
        for sensor_name, data in frame.items():
            try:
                callback_body = data.get_callback_body()
            except AttributeError:
                continue
            callback_body["frameId"] = frame_id.str
            callback_body["sensorName"] = sensor_name
            if isinstance(data, Data):
                self._upload_file(data)
                self._upload_mask_files(data.label)
                callback_bodies.append(callback_body)
            elif isinstance(data, AuthData):
                self._synchronize_import_info((callback_body,))

        for chunked_callback_bodies in chunked(callback_bodies, 50):
            self._synchronize_upload_info(chunked_callback_bodies)
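    # A minimal sketch for ``upload_frame`` (sensor names and paths are
    # placeholders; the timestamp is used to generate the frame id):
    #
    #     from tensorbay.dataset import Data, Frame
    #
    #     frame = Frame()
    #     frame["front_camera"] = Data("local/cam/0001.png")
    #     frame["lidar"] = Data("local/lidar/0001.bin")
    #     fusion_segment_client.upload_frame(frame, timestamp=1632800400.0)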
    def list_frames(self) -> PagingList[Frame]:
        """List required frames in the segment in a certain commit.

        Returns:
            The PagingList of :class:`~tensorbay.dataset.frame.Frame`.

        """
        return PagingList(self._generate_frames, 128)
    def delete_frame(self, frame_id: Union[str, ULID]) -> None:
        """Delete a frame of a segment in a certain commit with the given frame id.

        Arguments:
            frame_id: The id of a frame in a segment.

        """
        self._status.check_authority_for_draft()

        delete_data: Dict[str, Any] = {
            "segmentName": self.name,
            "frameId": str(frame_id),
        }
        delete_data.update(self._status.get_status_info())

        self._client.open_api_do("DELETE", "frames", self._dataset_id, json=delete_data)
    def list_urls(self) -> PagingList[Dict[str, str]]:
        """List the data urls in this segment.

        Returns:
            The PagingList of url dicts, whose keys are sensor names and values are urls.

        """
        urls = PagingList(self._generate_urls, 128)
        urls._repr_maxlevel = 2  # pylint: disable=protected-access
        return urls