Source code for tensorbay.client.job

#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#

"""Basic structures of asynchronous jobs."""

from time import sleep
from typing import Any, Callable, Dict, Optional, Tuple, Type, TypeVar, Union

from tensorbay.client.requests import Client
from tensorbay.client.search import FusionSearchResult, SearchResult
from tensorbay.client.struct import Draft
from tensorbay.utility import AttrsMixin, ReprMixin, ReprType, attr, camel, common_loads

_JOB_UPDATE_INTERVAL = 5
_JOB_NOT_COMPLETE_STATUS = {"QUEUING", "PROCESSING"}


[docs]class Job(AttrsMixin, ReprMixin): # pylint: disable=too-many-instance-attributes """This class defines :class:`Job`. Arguments: client: The :class:`~tensorbay.client.requests.Client`. dataset_id: Dataset ID. job_updater: The function to update the information of the Job instance. title: Title of the Job. job_id: ID of the Job. job_type: Type of the Job. arguments: Arguments of the Job. created_at: The time when the Job is created. started_at: The time when the Job is started. finished_at: The time when the Job is finished. status: The status of the Job. error_message: The error message of the Job. result: The result of the Job. description: The description of the Job. """ _T = TypeVar("_T", bound="Job") _repr_type = ReprType.INSTANCE _repr_maxlevel = 2 _repr_attrs: Tuple[str, ...] = ( "title", "arguments", "created_at", "started_at", "finished_at", "status", "error_message", ) title: str = attr() job_id: str = attr(key=camel) _job_type: str = attr(key=camel) arguments: Dict[str, Any] = attr() created_at: int = attr(key=camel) started_at: Optional[int] = attr(key=camel, default=None) finished_at: Optional[int] = attr(key=camel, default=None) status: str = attr() error_message: str = attr(key=camel, default="") _result: Optional[Dict[str, Any]] = attr(key="result", default=None) description: Optional[str] = attr(default="") def __init__( # pylint: disable=too-many-arguments self, client: Client, dataset_id: str, job_updater: Callable[[str], Dict[str, Any]], title: str, job_id: str, job_type: str, arguments: Dict[str, Any], created_at: int, started_at: Optional[int], finished_at: Optional[int], status: str, error_message: str, result: Optional[Dict[str, Any]], description: Optional[str] = "", ) -> None: self._client = client self._dataset_id = dataset_id self._job_updater = job_updater self.title = title self.job_id = job_id self._job_type = job_type self.arguments = arguments self.created_at = created_at self.started_at = started_at self.finished_at = finished_at self.status = status self.error_message = error_message self._result = result self.description = description def _repr_head(self) -> str: return f'{self.__class__.__name__}("{self.job_id}")'
[docs] @classmethod def from_response_body( cls: Type[_T], body: Dict[str, Any], *, client: Client, dataset_id: str, job_updater: Callable[[str], Dict[str, Any]], # noqa: DAR101 ) -> _T: """Loads a :class:`Job` object from a response body. Arguments: body: The response body which contains the information of a job, whose format should be like:: { "title": <str> "jobId": <str> "jobType"" <str> "arguments": <object> "createdAt": <int> "startedAt": <int> "finishedAt": <int> "status": <str> "errorMessage": <str> "result": <object> "description": <str> } client: The :class:`~tensorbay.client.requests.Client`. dataset_id: Dataset ID. job_updater: The function to update the information of the Job instance. Returns: The loaded :class:`Job` object. """ job = common_loads(cls, body) job._client = client job._dataset_id = dataset_id job._job_updater = job_updater return job
[docs] def update(self, until_complete: bool = False) -> None: """Update attrs of the Job instance. Arguments: until_complete: Whether to update job information until it is complete. """ job_info = self._job_updater(self.job_id) if until_complete: while job_info["status"] in _JOB_NOT_COMPLETE_STATUS: sleep(_JOB_UPDATE_INTERVAL) job_info = self._job_updater(self.job_id) self.started_at = job_info.get("startedAt") self.finished_at = job_info.get("finishedAt") self.status = job_info["status"] self.error_message = job_info["errorMessage"] self._result = job_info.get("result")
[docs] def abort(self) -> None: """Abort a :class:`Job`.""" self._client.open_api_do( "POST", f"jobs/{self.job_id}/abort?jobType={self._job_type}", self._dataset_id )
[docs]class SquashAndMergeJob(Job): """This class defines :class:`SquashAndMergeJob`. Arguments: client: The :class:`~tensorbay.client.requests.Client`. dataset_id: Dataset ID. job_updater: The function to update the information of the Job instance. draft_getter: The function to get draft by draft_number. title: Title of the Job. job_id: ID of the Job. job_type: Type of the Job. arguments: Arguments of the Job. created_at: The time when the Job is created. started_at: The time when the Job is started. finished_at: The time when the Job is finished. status: The status of the Job. error_message: The error message of the Job. result: The result of the Job. description: The description of the Job. """ _T = TypeVar("_T", bound="SquashAndMergeJob") def __init__( # pylint: disable=too-many-locals self, client: Client, *, dataset_id: str, job_updater: Callable[[str], Dict[str, Any]], draft_getter: Callable[[int], Draft], title: str, job_id: str, job_type: str, arguments: Dict[str, Any], created_at: int, started_at: Optional[int], finished_at: Optional[int], status: str, error_message: str, result: Optional[Dict[str, Any]], description: Optional[str] = "", ) -> None: super().__init__( client, dataset_id, job_updater, title, job_id, job_type, arguments, created_at, started_at, finished_at, status, error_message, result, description, ) self._draft_getter = draft_getter @property def result(self) -> Optional[Draft]: """Get the result of the SquashAndMergeJob. Returns: The draft obtained from SquashAndMergeJob. """ if self._result: draft_number: int = self._result["draftNumber"] return self._draft_getter(draft_number) return None
[docs] @classmethod def from_response_body( # type: ignore[override] # pylint: disable=arguments-differ cls: Type[_T], body: Dict[str, Any], *, client: Client, dataset_id: str, job_updater: Callable[[str], Dict[str, Any]], # noqa: DAR101 draft_getter: Callable[[int], Draft], ) -> _T: """Loads a :class:`SquashAndMergeJob` object from a response body. Arguments: body: The response body which contains the information of a SquashAndMergeJob, whose format should be like:: { "title": <str> "jobId": <str> "jobType"" <str> "arguments": <object> "createdAt": <int> "startedAt": <int> "finishedAt": <int> "status": <str> "errorMessage": <str> "result": <object> "description": <str> } client: The :class:`~tensorbay.client.requests.Client`. dataset_id: Dataset ID. job_updater: The function to update the information of the SquashAndMergeJob instance. draft_getter: The function to get draft by draft_number. Returns: The loaded :class:`SquashAndMergeJob` object. """ job = super().from_response_body( body, client=client, dataset_id=dataset_id, job_updater=job_updater ) job._draft_getter = draft_getter # pylint: disable=protected-access return job
[docs] def retry(self) -> None: """Retry a :class:`SquashAndMergeJob`.""" self._client.open_api_do( "POST", f"jobs/{self.job_id}/retry?jobType={self._job_type}", self._dataset_id )
[docs]class BasicSearchJob(Job): """This class defines :class:`BasicSearchJob`.""" _T = TypeVar("_T", bound="BasicSearchJob") def __init__( # pylint: disable=too-many-locals self, client: Client, *, dataset_id: str, job_updater: Callable[[str], Dict[str, Any]], is_fusion: bool, title: str, job_id: str, job_type: str, arguments: Dict[str, Any], created_at: int, started_at: Optional[int], finished_at: Optional[int], status: str, error_message: str, result: Optional[Dict[str, Any]], description: Optional[str] = "", ) -> None: super().__init__( client, dataset_id, job_updater, title, job_id, job_type, arguments, created_at, started_at, finished_at, status, error_message, result, description, ) self._is_fusion = is_fusion @property def result(self) -> Union[SearchResult, FusionSearchResult, None]: """Get the result of the BasicSearchJob. Returns: The search result of the BasicSearchJob. """ if self.status == "SUCCESS" and self._result: search_result_id: str = self._result["searchResultId"] search_result_commit_id: str = self._result["searchResultCommitId"] if self._is_fusion: return FusionSearchResult( self.job_id, search_result_id, search_result_commit_id, self._client ) return SearchResult( self.job_id, search_result_id, search_result_commit_id, self._client ) return None
[docs] @classmethod def from_response_body( # type: ignore[override] # pylint: disable=arguments-differ cls: Type[_T], body: Dict[str, Any], *, client: Client, dataset_id: str, job_updater: Callable[[str], Dict[str, Any]], # noqa: DAR101 is_fusion: bool, ) -> _T: """Loads a :class:`BasicSearchJob` object from a response body. Arguments: body: The response body which contains the information of a BasicSearchJob, whose format should be like:: { "title": <str> "jobId": <str> "jobType"" <str> "arguments": <object> "createdAt": <int> "startedAt": <int> "finishedAt": <int> "status": <str> "errorMessage": <str> "result": <object> "description": <str> } client: The :class:`~tensorbay.client.requests.Client`. dataset_id: Dataset ID. job_updater: The function to update the information of the BasicSearchJob instance. if_fusion: Whether it is from fusion dataset. Returns: The loaded :class:`BasicSearchJob` object. """ job = super().from_response_body( body, client=client, dataset_id=dataset_id, job_updater=job_updater ) job._is_fusion = is_fusion # pylint: disable=protected-access return job
[docs] def create_dataset(self, name: str, alias: str = "", is_public: bool = False) -> None: """Create a TensorBay dataset based on the search job. Arguments: name: Name of the dataset, unique for a user. alias: Alias of the dataset, default is "". is_public: Whether the dataset is a public dataset. """ post_data = { "datasetName": name, "alias": alias, "isPublic": is_public, } self._client.open_api_do( "POST", f"searchResults/{self.job_id}/datasets", "", json=post_data )