#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#
"""Basic structures of asynchronous jobs."""
from time import sleep
from typing import Any, Callable, Dict, Optional, Tuple, Type, TypeVar, Union
from tensorbay.client.requests import Client
from tensorbay.client.search import FusionSearchResult, SearchResult
from tensorbay.client.struct import Draft
from tensorbay.utility import AttrsMixin, ReprMixin, ReprType, attr, camel, common_loads
_JOB_UPDATE_INTERVAL = 5
_JOB_NOT_COMPLETE_STATUS = {"QUEUING", "PROCESSING"}
[docs]class Job(AttrsMixin, ReprMixin): # pylint: disable=too-many-instance-attributes
"""This class defines :class:`Job`.
Arguments:
client: The :class:`~tensorbay.client.requests.Client`.
dataset_id: Dataset ID.
job_updater: The function to update the information of the Job instance.
title: Title of the Job.
job_id: ID of the Job.
job_type: Type of the Job.
arguments: Arguments of the Job.
created_at: The time when the Job is created.
started_at: The time when the Job is started.
finished_at: The time when the Job is finished.
status: The status of the Job.
error_message: The error message of the Job.
result: The result of the Job.
description: The description of the Job.
"""
_T = TypeVar("_T", bound="Job")
_repr_type = ReprType.INSTANCE
_repr_maxlevel = 2
_repr_attrs: Tuple[str, ...] = (
"title",
"arguments",
"created_at",
"started_at",
"finished_at",
"status",
"error_message",
)
title: str = attr()
job_id: str = attr(key=camel)
_job_type: str = attr(key=camel)
arguments: Dict[str, Any] = attr()
created_at: int = attr(key=camel)
started_at: Optional[int] = attr(key=camel, default=None)
finished_at: Optional[int] = attr(key=camel, default=None)
status: str = attr()
error_message: str = attr(key=camel, default="")
_result: Optional[Dict[str, Any]] = attr(key="result", default=None)
description: Optional[str] = attr(default="")
def __init__( # pylint: disable=too-many-arguments
self,
client: Client,
dataset_id: str,
job_updater: Callable[[str], Dict[str, Any]],
title: str,
job_id: str,
job_type: str,
arguments: Dict[str, Any],
created_at: int,
started_at: Optional[int],
finished_at: Optional[int],
status: str,
error_message: str,
result: Optional[Dict[str, Any]],
description: Optional[str] = "",
) -> None:
self._client = client
self._dataset_id = dataset_id
self._job_updater = job_updater
self.title = title
self.job_id = job_id
self._job_type = job_type
self.arguments = arguments
self.created_at = created_at
self.started_at = started_at
self.finished_at = finished_at
self.status = status
self.error_message = error_message
self._result = result
self.description = description
def _repr_head(self) -> str:
return f'{self.__class__.__name__}("{self.job_id}")'
[docs] @classmethod
def from_response_body(
cls: Type[_T],
body: Dict[str, Any],
*,
client: Client,
dataset_id: str,
job_updater: Callable[[str], Dict[str, Any]], # noqa: DAR101
) -> _T:
"""Loads a :class:`Job` object from a response body.
Arguments:
body: The response body which contains the information of a job,
whose format should be like::
{
"title": <str>
"jobId": <str>
"jobType"" <str>
"arguments": <object>
"createdAt": <int>
"startedAt": <int>
"finishedAt": <int>
"status": <str>
"errorMessage": <str>
"result": <object>
"description": <str>
}
client: The :class:`~tensorbay.client.requests.Client`.
dataset_id: Dataset ID.
job_updater: The function to update the information of the Job instance.
Returns:
The loaded :class:`Job` object.
"""
job = common_loads(cls, body)
job._client = client
job._dataset_id = dataset_id
job._job_updater = job_updater
return job
[docs] def update(self, until_complete: bool = False) -> None:
"""Update attrs of the Job instance.
Arguments:
until_complete: Whether to update job information until it is complete.
"""
job_info = self._job_updater(self.job_id)
if until_complete:
while job_info["status"] in _JOB_NOT_COMPLETE_STATUS:
sleep(_JOB_UPDATE_INTERVAL)
job_info = self._job_updater(self.job_id)
self.started_at = job_info.get("startedAt")
self.finished_at = job_info.get("finishedAt")
self.status = job_info["status"]
self.error_message = job_info["errorMessage"]
self._result = job_info.get("result")
[docs] def abort(self) -> None:
"""Abort a :class:`Job`."""
self._client.open_api_do(
"POST", f"jobs/{self.job_id}/abort?jobType={self._job_type}", self._dataset_id
)
[docs]class SquashAndMergeJob(Job):
"""This class defines :class:`SquashAndMergeJob`.
Arguments:
client: The :class:`~tensorbay.client.requests.Client`.
dataset_id: Dataset ID.
job_updater: The function to update the information of the Job instance.
draft_getter: The function to get draft by draft_number.
title: Title of the Job.
job_id: ID of the Job.
job_type: Type of the Job.
arguments: Arguments of the Job.
created_at: The time when the Job is created.
started_at: The time when the Job is started.
finished_at: The time when the Job is finished.
status: The status of the Job.
error_message: The error message of the Job.
result: The result of the Job.
description: The description of the Job.
"""
_T = TypeVar("_T", bound="SquashAndMergeJob")
def __init__( # pylint: disable=too-many-locals
self,
client: Client,
*,
dataset_id: str,
job_updater: Callable[[str], Dict[str, Any]],
draft_getter: Callable[[int], Draft],
title: str,
job_id: str,
job_type: str,
arguments: Dict[str, Any],
created_at: int,
started_at: Optional[int],
finished_at: Optional[int],
status: str,
error_message: str,
result: Optional[Dict[str, Any]],
description: Optional[str] = "",
) -> None:
super().__init__(
client,
dataset_id,
job_updater,
title,
job_id,
job_type,
arguments,
created_at,
started_at,
finished_at,
status,
error_message,
result,
description,
)
self._draft_getter = draft_getter
@property
def result(self) -> Optional[Draft]:
"""Get the result of the SquashAndMergeJob.
Returns:
The draft obtained from SquashAndMergeJob.
"""
if self._result:
draft_number: int = self._result["draftNumber"]
return self._draft_getter(draft_number)
return None
[docs] @classmethod
def from_response_body( # type: ignore[override] # pylint: disable=arguments-differ
cls: Type[_T],
body: Dict[str, Any],
*,
client: Client,
dataset_id: str,
job_updater: Callable[[str], Dict[str, Any]], # noqa: DAR101
draft_getter: Callable[[int], Draft],
) -> _T:
"""Loads a :class:`SquashAndMergeJob` object from a response body.
Arguments:
body: The response body which contains the information of a SquashAndMergeJob,
whose format should be like::
{
"title": <str>
"jobId": <str>
"jobType"" <str>
"arguments": <object>
"createdAt": <int>
"startedAt": <int>
"finishedAt": <int>
"status": <str>
"errorMessage": <str>
"result": <object>
"description": <str>
}
client: The :class:`~tensorbay.client.requests.Client`.
dataset_id: Dataset ID.
job_updater: The function to update the information of the SquashAndMergeJob instance.
draft_getter: The function to get draft by draft_number.
Returns:
The loaded :class:`SquashAndMergeJob` object.
"""
job = super().from_response_body(
body, client=client, dataset_id=dataset_id, job_updater=job_updater
)
job._draft_getter = draft_getter # pylint: disable=protected-access
return job
[docs] def retry(self) -> None:
"""Retry a :class:`SquashAndMergeJob`."""
self._client.open_api_do(
"POST", f"jobs/{self.job_id}/retry?jobType={self._job_type}", self._dataset_id
)
[docs]class BasicSearchJob(Job):
"""This class defines :class:`BasicSearchJob`."""
_T = TypeVar("_T", bound="BasicSearchJob")
def __init__( # pylint: disable=too-many-locals
self,
client: Client,
*,
dataset_id: str,
job_updater: Callable[[str], Dict[str, Any]],
is_fusion: bool,
title: str,
job_id: str,
job_type: str,
arguments: Dict[str, Any],
created_at: int,
started_at: Optional[int],
finished_at: Optional[int],
status: str,
error_message: str,
result: Optional[Dict[str, Any]],
description: Optional[str] = "",
) -> None:
super().__init__(
client,
dataset_id,
job_updater,
title,
job_id,
job_type,
arguments,
created_at,
started_at,
finished_at,
status,
error_message,
result,
description,
)
self._is_fusion = is_fusion
@property
def result(self) -> Union[SearchResult, FusionSearchResult, None]:
"""Get the result of the BasicSearchJob.
Returns:
The search result of the BasicSearchJob.
"""
if self.status == "SUCCESS" and self._result:
search_result_id: str = self._result["searchResultId"]
search_result_commit_id: str = self._result["searchResultCommitId"]
if self._is_fusion:
return FusionSearchResult(
self.job_id, search_result_id, search_result_commit_id, self._client
)
return SearchResult(
self.job_id, search_result_id, search_result_commit_id, self._client
)
return None
[docs] @classmethod
def from_response_body( # type: ignore[override] # pylint: disable=arguments-differ
cls: Type[_T],
body: Dict[str, Any],
*,
client: Client,
dataset_id: str,
job_updater: Callable[[str], Dict[str, Any]], # noqa: DAR101
is_fusion: bool,
) -> _T:
"""Loads a :class:`BasicSearchJob` object from a response body.
Arguments:
body: The response body which contains the information of a BasicSearchJob,
whose format should be like::
{
"title": <str>
"jobId": <str>
"jobType"" <str>
"arguments": <object>
"createdAt": <int>
"startedAt": <int>
"finishedAt": <int>
"status": <str>
"errorMessage": <str>
"result": <object>
"description": <str>
}
client: The :class:`~tensorbay.client.requests.Client`.
dataset_id: Dataset ID.
job_updater: The function to update the information of the BasicSearchJob instance.
if_fusion: Whether it is from fusion dataset.
Returns:
The loaded :class:`BasicSearchJob` object.
"""
job = super().from_response_body(
body, client=client, dataset_id=dataset_id, job_updater=job_updater
)
job._is_fusion = is_fusion # pylint: disable=protected-access
return job
[docs] def create_dataset(self, name: str, alias: str = "", is_public: bool = False) -> None:
"""Create a TensorBay dataset based on the search job.
Arguments:
name: Name of the dataset, unique for a user.
alias: Alias of the dataset, default is "".
is_public: Whether the dataset is a public dataset.
"""
post_data = {
"datasetName": name,
"alias": alias,
"isPublic": is_public,
}
self._client.open_api_do(
"POST", f"searchResults/{self.job_id}/datasets", "", json=post_data
)