#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#
"""The segment of remote dataset on TensorBay."""
import os
import time
from copy import deepcopy
from itertools import zip_longest
from typing import TYPE_CHECKING, Any, Dict, Generator, Iterable, List, Optional, Tuple, Union

import filetype
from requests_toolbelt import MultipartEncoder
from ulid import ULID, from_timestamp

from tensorbay.client.lazy import LazyPage, PagingList
from tensorbay.client.status import Status
from tensorbay.dataset import AuthData, Data, Frame, RemoteData
from tensorbay.dataset.data import DataBase
from tensorbay.exception import FrameError, InvalidParamsError, ResourceNotExistError, ResponseError
from tensorbay.label import Label
from tensorbay.sensor.sensor import Sensor, Sensors
from tensorbay.utility import URL, FileMixin, chunked, config, locked

if TYPE_CHECKING:
    from tensorbay.client.dataset import DatasetClient, FusionDatasetClient
_STRATEGIES = {"abort", "override", "skip"}
_MASK_KEYS = ("semantic_mask", "instance_mask", "panoptic_mask")
class SegmentClientBase:
    """This class defines the basic concept of :class:`SegmentClient`.

    A :class:`SegmentClientBase` contains the information needed for determining
    a unique segment in a dataset on TensorBay.

    Arguments:
        name: Segment name.
        dataset_client: The dataset client.

    Attributes:
        name: Segment name.
        status: The status of the dataset client.

    """

    # Validity period (in seconds) requested for every upload permission.
    _EXPIRED_IN_SECOND = 240

    def __init__(
        self, name: str, dataset_client: "Union[DatasetClient, FusionDatasetClient]"
    ) -> None:
        self._name = name
        self._dataset_id = dataset_client.dataset_id
        self._dataset_client = dataset_client
        self._status = dataset_client.status
        self._client = dataset_client._client
        # An "expireAt" of 0 forces a permission request before the first upload.
        self._permission: Dict[str, Any] = {"expireAt": 0}

        if dataset_client.cache_enabled:
            self._cache_path: str = os.path.join(
                dataset_client._cache_path,
                dataset_client.status.commit_id,  # type: ignore[arg-type]
                name,
            )
        else:
            # An empty string marks the cache as disabled for this segment.
            self._cache_path = ""

    def _build_params(self, *, internal: bool = False, **extra: Any) -> Dict[str, Any]:
        """Build the query parameters shared by the segment-level open API requests.

        Arguments:
            internal: Whether to add ``isInternal`` when ``config.is_internal`` is set.
            extra: Additional endpoint-specific parameters.

        Returns:
            A new dict with the segment name, the extra parameters and the status info.

        """
        params: Dict[str, Any] = {"segmentName": self._name, **extra}
        params.update(self._status.get_status_info())
        if internal and config.is_internal:
            params["isInternal"] = True
        return params

    def _get_url(self, remote_path: str) -> str:
        """Get URL of a specific remote path.

        Arguments:
            remote_path: The remote path of the file.

        Returns:
            The URL of the remote file.

        Raises:
            ResourceNotExistError: When no URL is returned for the remote path.

        """
        params = self._build_params(internal=True, remotePath=remote_path)
        response = self._client.open_api_do("GET", "data/urls", self._dataset_id, params=params)
        try:
            return response.json()["urls"][0]["url"]  # type: ignore[no-any-return]
        except IndexError as error:
            raise ResourceNotExistError(resource="data", identification=remote_path) from error

    def _list_urls(self, offset: int = 0, limit: int = 128) -> Dict[str, Any]:
        """Request one page of data URLs of this segment.

        Arguments:
            offset: The offset of the page.
            limit: The maximum number of items in the page.

        Returns:
            The decoded JSON response.

        """
        params = self._build_params(internal=True, offset=offset, limit=limit)
        response = self._client.open_api_do("GET", "data/urls", self._dataset_id, params=params)
        return response.json()  # type: ignore[no-any-return]

    def _get_data_details(self, remote_path: str) -> Dict[str, Any]:
        """Get the details of the data with the given remote path.

        Arguments:
            remote_path: The remote path of the data.

        Returns:
            The data details from the response.

        Raises:
            ResourceNotExistError: When the data does not exist.

        """
        params = self._build_params(internal=True, remotePath=remote_path)
        response = self._client.open_api_do("GET", "data/details", self._dataset_id, params=params)
        try:
            data_details = response.json()["dataDetails"][0]
        except IndexError as error:
            raise ResourceNotExistError(resource="data", identification=remote_path) from error
        return data_details  # type: ignore[no-any-return]

    def _list_data_details(self, offset: int = 0, limit: int = 128) -> Dict[str, Any]:
        """Request one page of data details of this segment.

        Arguments:
            offset: The offset of the page.
            limit: The maximum number of items in the page.

        Returns:
            The decoded JSON response.

        """
        params = self._build_params(internal=True, offset=offset, limit=limit)
        response = self._client.open_api_do("GET", "data/details", self._dataset_id, params=params)
        return response.json()  # type: ignore[no-any-return]

    def _get_mask_url(self, mask_type: str, remote_path: str) -> str:
        """Get the mask URL of the data with the given remote path.

        Arguments:
            mask_type: The mask type, e.g. ``SEMANTIC_MASK``.
            remote_path: The remote path of the data.

        Returns:
            The URL of the mask file.

        Raises:
            ResourceNotExistError: When no mask URL is returned for the data.

        """
        params = self._build_params(internal=True, maskType=mask_type, remotePath=remote_path)
        response = self._client.open_api_do("GET", "masks/urls", self._dataset_id, params=params)
        try:
            mask_url = response.json()["urls"][0]["url"]
        except IndexError as error:
            raise ResourceNotExistError(
                resource=f"{mask_type} of data", identification=remote_path
            ) from error
        return mask_url  # type: ignore[no-any-return]

    def _list_mask_urls(self, mask_type: str, offset: int = 0, limit: int = 128) -> Dict[str, Any]:
        """Request one page of mask URLs of the given mask type.

        Arguments:
            mask_type: The mask type, e.g. ``SEMANTIC_MASK``.
            offset: The offset of the page.
            limit: The maximum number of items in the page.

        Returns:
            The decoded JSON response.

        """
        params = self._build_params(
            internal=True, maskType=mask_type, offset=offset, limit=limit
        )
        response = self._client.open_api_do("GET", "masks/urls", self._dataset_id, params=params)
        return response.json()  # type: ignore[no-any-return]

    def _list_labels(self, offset: int = 0, limit: int = 128) -> Dict[str, Any]:
        """Request one page of labels of this segment.

        Arguments:
            offset: The offset of the page.
            limit: The maximum number of items in the page.

        Returns:
            The decoded JSON response.

        """
        params = self._build_params(offset=offset, limit=limit)
        response = self._client.open_api_do("GET", "labels", self._dataset_id, params=params)
        return response.json()  # type: ignore[no-any-return]

    @locked
    def _request_upload_permission(self) -> None:
        """Request a new upload permission and store it on the client."""
        params = self._build_params(internal=True, expired=self._EXPIRED_IN_SECOND)
        permission = self._client.open_api_do(
            "GET", "policies", self._dataset_id, params=params
        ).json()

        # ``result`` is later posted as the upload form data, so drop the limit entry
        # (tolerating its absence) and every entry whose value is None.
        result = permission["result"]
        result.pop("multipleUploadLimit", None)
        permission["result"] = {key: value for key, value in result.items() if value is not None}

        # Publish the new permission only once it is complete.
        self._permission = permission

    def _get_upload_permission(self) -> Dict[str, Any]:
        """Return a deep copy of the upload permission, refreshing it when expired.

        A copy is returned because callers add fields to its ``result`` section.

        Returns:
            The upload permission.

        """
        if int(time.time()) >= self._permission["expireAt"]:
            self._request_upload_permission()

        return deepcopy(self._permission)

    def _upload_file(self, data: FileMixin) -> None:
        """Upload the file in the data to the draft.

        Arguments:
            data: The data instance needs to be uploaded.

        """
        permission = self._get_upload_permission()
        post_data = permission["result"]
        extra = permission["extra"]

        local_path = data.path
        checksum = data.get_checksum()
        # The object key is the object prefix followed by the file checksum.
        post_data["key"] = extra["objectPrefix"] + checksum

        host = extra["host"]
        backend_type = extra["backendType"]
        if backend_type == "azure":
            url = f'{host}{extra["objectPrefix"]}{checksum}?{post_data["token"]}'
            self._put_binary_file_to_azure(url, local_path, post_data)
        elif backend_type == "fps":
            self._post_multipart_formdata(host, local_path, post_data, checksum)
        else:
            self._post_multipart_formdata(host, local_path, post_data)

    def _upload_mask_files(self, label: Label) -> None:
        """Upload every mask file attached to the label.

        Arguments:
            label: The label whose mask attributes are uploaded.

        """
        for key in _MASK_KEYS:
            mask = getattr(label, key, None)
            if mask:
                self._upload_file(mask)

    def _post_multipart_formdata(
        self,
        url: str,
        local_path: str,
        data: Dict[str, Any],
        filename: str = "",
    ) -> None:
        """Post a local file together with form fields as multipart form data.

        Arguments:
            url: The URL to post to.
            local_path: The local path of the file.
            data: The form fields; ``file`` (and maybe ``Content-Type``) are added in place.
            filename: The file name sent with the file field.

        Raises:
            ResponseError: When the post fails for any reason other than
                ``MalformedPOSTRequest``, or when the retry fails.

        """
        with open(local_path, "rb") as fp:
            file_type = filetype.guess_mime(local_path)
            if "x-amz-date" in data:
                data["Content-Type"] = file_type

            data["file"] = (filename, fp, file_type)
            try:
                self._post_formdata(url, data)
            except ResponseError as error:
                if b"MalformedPOSTRequest" not in error.response.content:
                    raise
                # The encoder streamed from ``fp`` during the first attempt, so rewind it;
                # otherwise the retry may send a truncated or empty file.
                fp.seek(0)
                data["file"] = ("workaroundForMalformedPostRequest", fp, file_type)
                self._post_formdata(url, data)

    def _post_formdata(self, url: str, data: Dict[str, Any]) -> None:
        """Post the fields as a multipart-encoded body.

        Arguments:
            url: The URL to post to.
            data: The form fields.

        """
        multipart = MultipartEncoder(data)
        self._client.do(
            "POST",
            url,
            data=multipart,
            headers={"Content-Type": multipart.content_type},
        )

    def _put_binary_file_to_azure(
        self,
        url: str,
        local_path: str,
        data: Dict[str, Any],
    ) -> None:
        """PUT a local file as the request body to Azure.

        Arguments:
            url: The target URL, including the token query string.
            local_path: The local path of the file.
            data: The permission result; only ``x-ms-blob-type`` is read.

        """
        with open(local_path, "rb") as fp:
            file_type = filetype.guess_mime(local_path)
            request_headers = {
                "x-ms-blob-content-type": file_type,
                "x-ms-blob-type": data["x-ms-blob-type"],
            }
            self._client.do("PUT", url, data=fp, headers=request_headers)

    def _synchronize_import_info(self, callback_bodies: Tuple[Dict[str, Any], ...]) -> None:
        """Send the callback bodies of imported (cloud) data to TensorBay.

        Arguments:
            callback_bodies: The callback bodies to send.

        """
        put_data: Dict[str, Any] = {
            "segmentName": self.name,
            "objects": callback_bodies,
            "deleteSource": False,
        }
        put_data.update(self._status.get_status_info())

        self._client.open_api_do("PUT", "multi/cloud-callback", self._dataset_id, json=put_data)

    def _synchronize_upload_info(
        self,
        callback_bodies: Tuple[Dict[str, Any], ...],
    ) -> None:
        """Send the callback bodies of uploaded files to TensorBay.

        Arguments:
            callback_bodies: The callback bodies to send.

        """
        put_data: Dict[str, Any] = {
            "segmentName": self.name,
            "objects": callback_bodies,
        }
        put_data.update(self._status.get_status_info())

        self._client.open_api_do("PUT", "multi/callback", self._dataset_id, json=put_data)

    def _upload_label(self, data: Union[AuthData, Data]) -> None:
        """Upload the label of a single data; does nothing when the label is empty.

        Arguments:
            data: The data whose label is uploaded.

        """
        label = data.label.dumps()
        if not label:
            return

        post_data: Dict[str, Any] = {
            "segmentName": self.name,
            "remotePath": data.target_remote_path,
            "label": label,
        }
        post_data.update(self._status.get_status_info())

        self._client.open_api_do("PUT", "labels", self._dataset_id, json=post_data)

    def _upload_multi_label(self, data: Iterable[DataBase._Type]) -> None:
        """Upload the non-empty labels of several data in one request.

        Arguments:
            data: The data whose labels are uploaded.

        """
        objects = []
        for single_data in data:
            label = single_data.label.dumps()
            if not label:
                continue

            # RemoteData is addressed by its own path; other data by its target remote path.
            remote_path = (
                single_data.path
                if isinstance(single_data, RemoteData)
                else single_data.target_remote_path
            )
            objects.append({"remotePath": remote_path, "label": label})

        if not objects:
            # Every label was empty: there is nothing to send.
            return

        post_data: Dict[str, Any] = {"segmentName": self.name, "objects": objects}
        post_data.update(self._status.get_status_info())
        self._client.open_api_do("PUT", "multi/data/labels", self._dataset_id, json=post_data)

    def upload_label(self, data: Union[DataBase._Type, Iterable[DataBase._Type]]) -> None:
        """Upload label with Data object to the draft.

        Arguments:
            data: The data object (or an iterable of data objects) whose labels,
                including mask files, are uploaded.

        """
        self._status.check_authority_for_draft()

        if not isinstance(data, Iterable):
            data = [data]

        for chunked_data in chunked(data, 128):
            for single_data in chunked_data:
                self._upload_mask_files(single_data.label)
            self._upload_multi_label(chunked_data)

    @property
    def name(self) -> str:
        """Return the segment name.

        Returns:
            The segment name.

        """
        return self._name

    @property
    def status(self) -> Status:
        """Return the status of the dataset client.

        Returns:
            The status of the dataset client.

        """
        return self._status
class SegmentClient(SegmentClientBase):
    """This class defines :class:`SegmentClient`.

    :class:`SegmentClient` inherits from SegmentClientBase and provides methods within a
    segment scope, such as `upload_label()`, `upload_data()`, `list_data()` and so on.
    In contrast to FusionSegmentClient, :class:`SegmentClient` has only one sensor.

    """

    _dataset_client: "DatasetClient"

    def __init__(self, name: str, data_client: "DatasetClient") -> None:
        super().__init__(name, data_client)

    def _generate_data_paths(self, offset: int = 0, limit: int = 128) -> Generator[str, None, int]:
        """Yield the remote paths of one page of data.

        Arguments:
            offset: The offset of the page.
            limit: The maximum number of items in the page.

        Returns:
            The total count of data in the segment (as the generator return value).

        """
        params: Dict[str, Any] = {
            "segmentName": self._name,
            "offset": offset,
            "limit": limit,
        }
        params.update(self._status.get_status_info())

        response = self._client.open_api_do("GET", "data", self._dataset_id, params=params).json()

        for item in response["data"]:
            yield item["remotePath"]

        return response["totalCount"]  # type: ignore[no-any-return]

    def _generate_data(self, offset: int = 0, limit: int = 128) -> Generator[RemoteData, None, int]:
        """Yield one page of RemoteData, with lazily refreshable data and mask URLs.

        Arguments:
            offset: The offset of the page.
            limit: The maximum number of items in the page.

        Returns:
            The total count of data in the segment (as the generator return value).

        """
        response = self._list_data_details(offset, limit)
        details = response["dataDetails"]

        urls = LazyPage.from_items(
            offset,
            limit,
            self._generate_urls,
            [item["url"] for item in details],
        )

        mask_urls = {}
        for key in _MASK_KEYS:
            mask_type = key.upper()
            mask_urls[key] = LazyPage.from_items(
                offset,
                limit,
                # Bind the mask type as a default argument: a plain closure would only see
                # the last key of the loop.
                lambda offset, limit, k=mask_type: (  # type: ignore[misc]
                    self._generate_mask_urls(k, offset, limit)
                ),
                # Built eagerly so the items cannot observe a later value of ``mask_type``.
                [item["label"].get(mask_type, {}).get("url") for item in details],
            )

        for index, item in enumerate(details):
            data = RemoteData.from_response_body(
                item,
                url=URL.from_getter(urls.items[index].get, urls.pull),
                cache_path=self._cache_path,
            )
            label = data.label
            for key in _MASK_KEYS:
                mask = getattr(label, key, None)
                if not mask:
                    continue
                mask_page = mask_urls[key]
                mask.url = URL.from_getter(mask_page.items[index].get, mask_page.pull)
                # An empty cache path means caching is disabled; joining onto it would
                # produce a relative path under the working directory.
                if self._cache_path:
                    mask.cache_path = os.path.join(self._cache_path, key, mask.path)

            yield data

        return response["totalCount"]  # type: ignore[no-any-return]

    def _generate_urls(self, offset: int = 0, limit: int = 128) -> Generator[str, None, int]:
        """Yield one page of data URLs.

        Arguments:
            offset: The offset of the page.
            limit: The maximum number of items in the page.

        Returns:
            The total count of data in the segment (as the generator return value).

        """
        response = self._list_urls(offset, limit)

        for item in response["urls"]:
            yield item["url"]

        return response["totalCount"]  # type: ignore[no-any-return]

    def _generate_mask_urls(
        self, mask_type: str, offset: int = 0, limit: int = 128
    ) -> Generator[Optional[str], None, int]:
        """Yield one page of mask URLs; None for data without that mask.

        Arguments:
            mask_type: The mask type, e.g. ``SEMANTIC_MASK``.
            offset: The offset of the page.
            limit: The maximum number of items in the page.

        Returns:
            The total count of data in the segment (as the generator return value).

        """
        response = self._list_mask_urls(mask_type, offset, limit)

        for item in response["urls"]:
            yield item["url"] if item else None

        return response["totalCount"]  # type: ignore[no-any-return]

    def _upload_or_import_data(self, data: Union[Data, AuthData]) -> Optional[Dict[str, Any]]:
        """Upload local data, or import auth data.

        Arguments:
            data: The data to upload or import.

        Returns:
            The callback body for uploaded :class:`Data`, still to be synchronized by
            the caller; None for :class:`AuthData`, which is synchronized here.

        """
        if isinstance(data, Data):
            self._upload_file(data)
            self._upload_mask_files(data.label)
            return data.get_callback_body()

        self._synchronize_import_info((data.get_callback_body(),))
        return None

    @staticmethod
    def _normalize_remote_paths(
        source_remote_paths: Union[str, Iterable[str]],
        target_remote_paths: Union[None, str, Iterable[str]],
        action: str,
    ) -> Tuple[List[str], List[str]]:
        """Turn the copy/move path arguments into two lists.

        Arguments:
            source_remote_paths: A source remote path or an iterable of them.
            target_remote_paths: None (or any falsy value), a target remote path,
                or an iterable of them.
            action: The verb ("copy" or "move") used in the error messages.

        Returns:
            The target remote paths (empty when no targets are given) and the
            source remote paths.

        Raises:
            ValueError: When the types or the lengths of the two arguments do not match.

        """
        if not target_remote_paths:
            sources = (
                [source_remote_paths]
                if isinstance(source_remote_paths, str)
                else list(source_remote_paths)
            )
            return [], sources

        if isinstance(source_remote_paths, str) and isinstance(target_remote_paths, str):
            return [target_remote_paths], [source_remote_paths]

        if not isinstance(source_remote_paths, str) and not isinstance(target_remote_paths, str):
            targets = list(target_remote_paths)
            sources = list(source_remote_paths)
            if len(targets) != len(sources):
                raise ValueError(
                    f"To {action} the data, the length of target_remote_paths "
                    "must be equal with source_remote_paths"
                )
            return targets, sources

        raise ValueError(
            f"To {action} the data, the type of target_remote_paths "
            "must be equal with source_remote_paths"
        )

    def _post_remote_paths_in_chunks(
        self,
        section: str,
        post_data: Dict[str, Any],
        all_target_remote_paths: List[str],
        all_source_remote_paths: List[str],
    ) -> None:
        """Send a copy/move request per chunk of at most 128 remote paths.

        Arguments:
            section: The open API section, e.g. ``data?multipleCopy``.
            post_data: The request body; its path fields are overwritten per chunk.
            all_target_remote_paths: The target paths; empty keeps the source paths.
            all_source_remote_paths: The source paths.

        """
        for targets, sources in zip_longest(
            chunked(all_target_remote_paths, 128), chunked(all_source_remote_paths, 128)
        ):
            # ``targets`` is None when no target paths were given.
            if targets:
                post_data["remotePaths"] = targets
            post_data["source"]["remotePaths"] = sources

            self._client.open_api_do("POST", section, self._dataset_id, json=post_data)

    def upload_file(self, local_path: str, target_remote_path: str = "") -> None:
        """Upload data with local path to the draft.

        Arguments:
            local_path: The local path of the data to upload.
            target_remote_path: The path to save the data in segment client.

        """
        self._status.check_authority_for_draft()

        data = Data(local_path, target_remote_path=target_remote_path)
        self._upload_file(data)
        self._synchronize_upload_info((data.get_callback_body(),))

    def upload_data(self, data: Data) -> None:
        """Upload Data object to the draft.

        Arguments:
            data: The :class:`~tensorbay.dataset.data.Data`.

        """
        self._status.check_authority_for_draft()
        self._upload_file(data)
        self._upload_mask_files(data.label)
        self._synchronize_upload_info((data.get_callback_body(),))

    def import_auth_data(self, data: AuthData) -> None:
        """Import AuthData object to the draft.

        Arguments:
            data: The :class:`~tensorbay.dataset.data.AuthData`.

        """
        self._status.check_authority_for_draft()
        self._synchronize_import_info((data.get_callback_body(),))

    def copy_data(
        self,
        source_remote_paths: Union[str, Iterable[str]],
        target_remote_paths: Union[None, str, Iterable[str]] = None,
        *,
        source_client: Optional["SegmentClient"] = None,
        strategy: str = "abort",
    ) -> None:
        """Copy data to this segment.

        Arguments:
            source_remote_paths: The source remote paths of the copied data.
            target_remote_paths: The target remote paths of the copied data.
                This argument is used to specify new remote paths of the copied data.
                If None, the remote path of the copied data will not be changed after copy.
            source_client: The source segment client of the copied data.
                This argument is used to specify where the copied data comes from when the
                copied data is from another commit, draft, segment or even another dataset.
                If None, the copied data comes from this segment.
            strategy: The strategy of handling the name conflict. There are three options:

                1. "abort": stop copying and raise exception;
                2. "override": the source data will override the origin data;
                3. "skip": keep the origin data.

        Raises:
            InvalidParamsError: When strategy is invalid.
            ValueError: When the type or the length of target_remote_paths is not equal
                with source_remote_paths.

        """
        self._status.check_authority_for_draft()

        if strategy not in _STRATEGIES:
            raise InvalidParamsError(param_name="strategy", param_value=strategy)

        all_target_remote_paths, all_source_remote_paths = self._normalize_remote_paths(
            source_remote_paths, target_remote_paths, "copy"
        )

        source: Dict[str, Any] = {}
        if source_client is not None:
            source["segmentName"] = source_client.name
            source["id"] = source_client._dataset_id  # pylint: disable=protected-access
            source.update(source_client.status.get_status_info())
        else:
            source["segmentName"] = self.name

        post_data: Dict[str, Any] = {
            "strategy": strategy,
            "source": source,
            "segmentName": self.name,
        }
        post_data.update(self._status.get_status_info())

        self._post_remote_paths_in_chunks(
            "data?multipleCopy", post_data, all_target_remote_paths, all_source_remote_paths
        )

    def move_data(
        self,
        source_remote_paths: Union[str, Iterable[str]],
        target_remote_paths: Union[None, str, Iterable[str]] = None,
        *,
        source_client: Optional["SegmentClient"] = None,
        strategy: str = "abort",
    ) -> None:
        """Move data to this segment, also used to rename data.

        Arguments:
            source_remote_paths: The source remote paths of the moved data.
            target_remote_paths: The target remote paths of the moved data.
                This argument is used to specify new remote paths of the moved data.
                If None, the remote path of the moved data will not be changed after move.
            source_client: The source segment client of the moved data.
                This argument is used to specify where the moved data comes from when the
                moved data is from another segment.
                If None, the moved data comes from this segment.
            strategy: The strategy of handling the name conflict. There are three options:

                1. "abort": stop moving and raise exception;
                2. "override": the source data will override the origin data;
                3. "skip": keep the origin data.

        Raises:
            InvalidParamsError: When strategy is invalid.
            ValueError: When the type or the length of target_remote_paths is not equal
                with source_remote_paths.
                Or when the dataset_id and draft_number of source_client
                is not equal with the current segment client.

        """
        self._status.check_authority_for_draft()

        if strategy not in _STRATEGIES:
            raise InvalidParamsError(param_name="strategy", param_value=strategy)

        all_target_remote_paths, all_source_remote_paths = self._normalize_remote_paths(
            source_remote_paths, target_remote_paths, "move"
        )

        source: Dict[str, Any] = {}
        if source_client is not None:
            # Moving is only possible inside the same draft of the same dataset.
            if (
                source_client.status.draft_number != self.status.draft_number
                or source_client._dataset_id  # pylint: disable=protected-access
                != self._dataset_id
            ):
                raise ValueError(
                    "To move the data, the dataset_id and draft_number of source_client "
                    "must be equal with the current segment client"
                )
            source["segmentName"] = source_client.name
        else:
            source["segmentName"] = self.name

        post_data: Dict[str, Any] = {
            "strategy": strategy,
            "source": source,
            "segmentName": self.name,
        }
        post_data.update(self._status.get_status_info())

        self._post_remote_paths_in_chunks(
            "data?multipleMove", post_data, all_target_remote_paths, all_source_remote_paths
        )

    def list_data_paths(self) -> PagingList[str]:
        """List required data path in a segment in a certain commit.

        Returns:
            The PagingList of data paths.

        """
        return PagingList(self._generate_data_paths, 128)

    def get_data(self, remote_path: str) -> RemoteData:
        """Get required Data object from a dataset segment.

        Arguments:
            remote_path: The remote paths of the required data.

        Returns:
            :class:`~tensorbay.dataset.data.RemoteData`.

        Raises:
            ResourceNotExistError: When the required data does not exist.

        """
        if not remote_path:
            raise ResourceNotExistError(resource="data", identification=remote_path)

        data_details = self._get_data_details(remote_path)
        data = RemoteData.from_response_body(
            data_details,
            url=URL(data_details["url"], lambda: self._get_url(remote_path)),
            cache_path=self._cache_path,
        )

        label = data.label
        for key in _MASK_KEYS:
            mask = getattr(label, key, None)
            if not mask:
                continue
            mask_type = key.upper()
            mask.url = URL(
                data_details["label"][mask_type]["url"],
                # Default arguments bind the current mask type and path to this getter.
                lambda k=mask_type, r=remote_path: (  # type: ignore[misc, arg-type]
                    self._get_mask_url(k, r)
                ),
            )
            # An empty cache path means caching is disabled; joining onto it would
            # produce a relative path under the working directory.
            if self._cache_path:
                mask.cache_path = os.path.join(self._cache_path, key, mask.path)

        return data

    def list_data(self) -> PagingList[RemoteData]:
        """List required Data object in a dataset segment.

        Returns:
            The PagingList of :class:`~tensorbay.dataset.data.RemoteData`.

        """
        return PagingList(self._generate_data, 128)

    def delete_data(self, remote_path: str) -> None:
        """Delete data of a segment in a certain commit with the given remote path.

        Arguments:
            remote_path: The remote path of data in a segment.

        """
        self._status.check_authority_for_draft()

        delete_data: Dict[str, Any] = {
            "segmentName": self.name,
            "remotePath": remote_path,
        }
        delete_data.update(self._status.get_status_info())

        self._client.open_api_do("DELETE", "data", self._dataset_id, json=delete_data)

    def list_urls(self) -> PagingList[str]:
        """List the data urls in this segment.

        Returns:
            The PagingList of urls.

        """
        return PagingList(self._generate_urls, 128)

    def list_mask_urls(self, mask_type: str) -> PagingList[Optional[str]]:
        """List the mask urls in this segment.

        Arguments:
            mask_type: The required mask type, the supported types are
                ``SEMANTIC_MASK``, ``INSTANCE_MASK`` and ``PANOPTIC_MASK``

        Returns:
            The PagingList of mask urls.

        """
        return PagingList(
            lambda offset, limit: self._generate_mask_urls(mask_type, offset, limit), 128
        )
class FusionSegmentClient(SegmentClientBase):
    """This class defines :class:`FusionSegmentClient`.

    :class:`FusionSegmentClient` inherits from :class:`SegmentClientBase` and provides
    methods within a fusion segment scope, such as
    :meth:`FusionSegmentClient.upload_sensor`,
    :meth:`FusionSegmentClient.upload_frame`
    and :meth:`FusionSegmentClient.list_frames`.

    In contrast to :class:`SegmentClient`, :class:`FusionSegmentClient` has multiple sensors.

    """

    _dataset_client: "FusionDatasetClient"

    def __init__(self, name: str, data_client: "FusionDatasetClient") -> None:
        super().__init__(name, data_client)

    def _generate_frames(self, offset: int = 0, limit: int = 128) -> Generator[Frame, None, int]:
        """Yield one page of frames, with lazily refreshable per-sensor URLs.

        Arguments:
            offset: The offset of the page.
            limit: The maximum number of items in the page.

        Returns:
            The total count of frames in the segment (as the generator return value).

        """
        response = self._list_data_details(offset, limit)

        url_page = LazyPage.from_items(
            offset,
            limit,
            self._generate_urls,
            (
                {frame["sensorName"]: frame["url"] for frame in item["frame"]}
                for item in response["dataDetails"]
            ),
        )

        for index, item in enumerate(response["dataDetails"]):
            yield Frame.from_response_body(item, index, url_page, cache_path=self._cache_path)

        return response["totalCount"]  # type: ignore[no-any-return]

    def _generate_urls(
        self, offset: int = 0, limit: int = 128
    ) -> Generator[Dict[str, str], None, int]:
        """Yield one page of URL dicts mapping sensor names to URLs.

        Arguments:
            offset: The offset of the page.
            limit: The maximum number of items in the page.

        Returns:
            The total count of frames in the segment (as the generator return value).

        """
        response = self._list_urls(offset, limit)

        for frame in response["urls"]:
            yield {item["sensorName"]: item["url"] for item in frame["urls"]}

        return response["totalCount"]  # type: ignore[no-any-return]

    def _upload_or_import_data(
        self,
        data: Union[Data, AuthData],
        sensor_name: str,
        frame_id: str,
    ) -> Optional[Dict[str, Any]]:
        """Upload local data, or import auth data, as part of a frame.

        Arguments:
            data: The data to upload or import.
            sensor_name: The name of the sensor the data belongs to.
            frame_id: The id of the frame the data belongs to.

        Returns:
            The callback body for uploaded :class:`Data`, still to be synchronized by
            the caller; None for :class:`AuthData`, which is synchronized here.

        """
        callback_body = data.get_callback_body()
        callback_body["frameId"] = frame_id
        callback_body["sensorName"] = sensor_name

        if isinstance(data, Data):
            self._upload_file(data)
            self._upload_mask_files(data.label)
            return callback_body

        self._synchronize_import_info((callback_body,))
        return None

    def get_sensors(self) -> Sensors:
        """Return the sensors in a fusion segment client.

        Returns:
            The :class:`sensors<~tensorbay.sensor.sensor.Sensors>` in the fusion segment client.

        """
        params: Dict[str, Any] = {"segmentName": self._name}
        params.update(self._status.get_status_info())

        response = self._client.open_api_do(
            "GET", "sensors", self._dataset_id, params=params
        ).json()

        return Sensors.loads(response["sensors"])

    def upload_sensor(self, sensor: Sensor) -> None:
        """Upload sensor to the draft.

        Arguments:
            sensor: The sensor to upload.

        """
        self._status.check_authority_for_draft()

        post_data = sensor.dumps()
        post_data.update(self._status.get_status_info())
        post_data["segmentName"] = self._name

        self._client.open_api_do("POST", "sensors", self._dataset_id, json=post_data)

    def delete_sensor(self, sensor_name: str) -> None:
        """Delete a TensorBay sensor of the draft with the given sensor name.

        Arguments:
            sensor_name: The TensorBay sensor to delete.

        """
        self._status.check_authority_for_draft()

        delete_data: Dict[str, Any] = {"segmentName": self._name, "sensorName": sensor_name}
        delete_data.update(self._status.get_status_info())

        self._client.open_api_do("DELETE", "sensors", self._dataset_id, json=delete_data)

    def upload_frame(self, frame: Frame, timestamp: Optional[float] = None) -> None:
        """Upload frame to the draft.

        Arguments:
            frame: The :class:`~tensorbay.dataset.frame.Frame` to upload.
            timestamp: The mark to sort frames, supporting timestamp and float.

        Raises:
            FrameError: When lacking frame id or frame id conflicts.

        """
        self._status.check_authority_for_draft()

        if timestamp is None:
            try:
                frame_id = frame.frame_id
            except AttributeError as error:
                raise FrameError(
                    "Lack frame id, please add frame id in frame or "
                    "give timestamp to the function!"
                ) from error
        elif not hasattr(frame, "frame_id"):
            frame_id = from_timestamp(timestamp)
        else:
            raise FrameError("Frame id conflicts, please do not give timestamp to the function!")

        callback_bodies = []
        for sensor_name, data in frame.items():
            # Items without ``get_callback_body`` are skipped. The lookup is kept apart from
            # the call so AttributeErrors raised *inside* get_callback_body() are not
            # mistaken for a missing method and silently swallowed.
            get_callback_body = getattr(data, "get_callback_body", None)
            if get_callback_body is None:
                continue

            callback_body = get_callback_body()
            callback_body["frameId"] = frame_id.str
            callback_body["sensorName"] = sensor_name
            if isinstance(data, Data):
                self._upload_file(data)
                self._upload_mask_files(data.label)
                callback_bodies.append(callback_body)
            elif isinstance(data, AuthData):
                self._synchronize_import_info((callback_body,))

        # Callbacks of uploaded local files are synchronized in batches of 50.
        for chunked_callback_bodies in chunked(callback_bodies, 50):
            self._synchronize_upload_info(chunked_callback_bodies)

    def list_frames(self) -> PagingList[Frame]:
        """List required frames in the segment in a certain commit.

        Returns:
            The PagingList of :class:`~tensorbay.dataset.frame.Frame`.

        """
        return PagingList(self._generate_frames, 128)

    def delete_frame(self, frame_id: Union[str, ULID]) -> None:
        """Delete a frame of a segment in a certain commit with the given frame id.

        Arguments:
            frame_id: The id of a frame in a segment.

        """
        self._status.check_authority_for_draft()

        delete_data: Dict[str, Any] = {
            "segmentName": self.name,
            "frameId": str(frame_id),
        }
        delete_data.update(self._status.get_status_info())

        self._client.open_api_do("DELETE", "frames", self._dataset_id, json=delete_data)

    def list_urls(self) -> PagingList[Dict[str, str]]:
        """List the data urls in this segment.

        Returns:
            The PagingList of url dict, which key is the sensor name, value is the url.

        """
        urls = PagingList(self._generate_urls, 128)
        # URL dicts are nested one level deeper than plain strings in the repr.
        urls._repr_maxlevel = 2  # pylint: disable=protected-access
        return urls