Source code for tensorbay.healthcheck.pipeline

#!/usr/bin/env python3
#
# Copyright 2021 Graviti. Licensed under MIT License.
#

"""Pipeline and PipelineForIterable.

:class:`Pipeline` runs registered checker functions while healthchecking.
:class:`PipelineForIterable` is a healthcheck pipeline to processes iterable objects.

Checker functions can be registered to the healthcheck pipeline by :meth:`Pipeline.register` or
:meth:`PipelineForIterable.register <Pipeline.register>`.

"""

from typing import Callable, Generic, Iterable, Iterator, List, TypeVar

_A = TypeVar("_A")
_R = TypeVar("_R")
_Checker = Callable[[_A], Iterator[_R]]


[docs]class Pipeline(Generic[_A, _R]): """:class:`Pipeline` is a healthcheck pipeline to run registered checker functions.""" _S = TypeVar("_S", bound="Pipeline[_A, _R]") def __init__(self) -> None: self._pipeline: List[_Checker[_A, _R]] = [] def __call__(self, args: _A) -> Iterator[_R]: """Call the :class:`Pipeline` as a function. Arguments: args: The information needs to be checked. Yields: The checker function. """ for checker in self._pipeline: yield from checker(args)
[docs] def register(self, checker: _Checker[_A, _R]) -> _Checker[_A, _R]: """Decorator function to register checkers into pipeline. Arguments: checker: The checker function needs to be registered. Returns: The checker function unchanged. """ self._pipeline.append(checker) return checker
[docs] def copy(self: _S) -> _S: """Copy method to get a shallow copy of pipeline. Returns: A shallow copy of the pipeline. """ pipeline = self.__class__() pipeline._pipeline = self._pipeline.copy() # pylint: disable=protected-access return pipeline
[docs]class PipelineForIterable(Pipeline[_A, _R]): """Healthcheck pipeline for processing iterable objects.""" def __call__(self, args: Iterable[_A]) -> Iterator[_R]: # type: ignore[override] """Call the :class:`Pipeline` as a function. Arguments: args: The information needs to be checked. Yields: The checker function. """ for arg in args: yield from super().__call__(arg)