v0.1.3

Retry Module API Reference

This section documents the internals of the retry module in Bijux CLI.

bijux_cli.infra.retry

Provides concrete asynchronous retry policy implementations.

This module defines classes that implement the RetryPolicyProtocol to handle transient errors in asynchronous operations. It offers two main strategies:

* `TimeoutRetryPolicy`: A simple policy that applies a single timeout to an
    operation.
* `ExponentialBackoffRetryPolicy`: A more advanced policy that retries an
    operation multiple times with an exponentially increasing delay and
    random jitter between attempts.

These components are designed to be used by services to build resilience against temporary failures, such as network issues.
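As an illustration of how a service might depend on such a policy, here is a minimal sketch. It does not import Bijux CLI: `RetryPolicyLike`, `WaitForPolicy`, and `StatusService` are hypothetical names, and the protocol shape is only inferred from the `run` and `reset` methods documented on this page.

```python
import asyncio
from typing import Awaitable, Callable, Protocol, TypeVar

T = TypeVar("T")


class RetryPolicyLike(Protocol):
    """Hypothetical stand-in for the shape shared by both policies."""

    async def run(
        self, supplier: Callable[[], Awaitable[T]], seconds: float = 1.0
    ) -> T: ...

    def reset(self) -> None: ...


class WaitForPolicy:
    """Toy policy for this sketch only: one attempt under asyncio.wait_for."""

    async def run(
        self, supplier: Callable[[], Awaitable[T]], seconds: float = 1.0
    ) -> T:
        return await asyncio.wait_for(supplier(), timeout=seconds)

    def reset(self) -> None:
        pass  # nothing to reset in this toy policy


class StatusService:
    """Hypothetical service that routes its I/O through an injected policy."""

    def __init__(self, policy: RetryPolicyLike) -> None:
        self._policy = policy

    async def _fetch(self) -> str:
        await asyncio.sleep(0)  # placeholder for a real network call
        return "healthy"

    async def status(self) -> str:
        # The service passes a zero-argument coroutine function, not a coroutine.
        return await self._policy.run(self._fetch, seconds=0.5)


result = asyncio.run(StatusService(WaitForPolicy()).status())
```

Because the service only relies on the `run`/`reset` shape, either policy described below could in principle be swapped in without changing the service code.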

ExponentialBackoffRetryPolicy

ExponentialBackoffRetryPolicy(telemetry: TelemetryProtocol)

Bases: RetryPolicyProtocol

A retry policy with exponential backoff, jitter, and per-attempt timeouts.

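Attributes:

  • _telemetry (TelemetryProtocol) –

    The service for emitting events.

Initializes the ExponentialBackoffRetryPolicy.

Parameters:

  • telemetry (TelemetryProtocol) –

    The service for emitting events.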

Source code in src/bijux_cli/infra/retry.py
@inject
def __init__(self, telemetry: TelemetryProtocol) -> None:
    """Initializes the `ExponentialBackoffRetryPolicy`.

    Args:
        telemetry (TelemetryProtocol): The service for emitting events.
    """
    self._telemetry = telemetry

reset

reset() -> None

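Resets the retry policy state. This policy keeps no state between calls, so the method only emits a retry_reset telemetry event.

Source code in src/bijux_cli/infra/retry.py
def reset(self) -> None:
    """Resets the retry policy state.

    The policy keeps no state, so this only emits a `retry_reset` event.
    """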
    self._telemetry.event("retry_reset", {})

run async

run(
    supplier: Callable[[], Awaitable[T]],
    seconds: float = 1.0,
    retries: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    jitter: float = 0.3,
    retry_on: tuple[type[BaseException], ...] = (
        Exception,
    ),
) -> T

Executes a supplier with a timeout and exponential-backoff retries.

Parameters:

  • supplier (Callable[[], Awaitable[T]]) –

    The async operation to run.

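  • seconds (float, default: 1.0) –

    The timeout in seconds. Must be > 0. In the source below, the timeout wraps the entire retry loop when _try_asyncio_timeout returns a context manager, and applies to each attempt only on the asyncio.wait_for fallback path.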

  • retries (int, default: 3) –

    The maximum number of retry attempts.

  • delay (float, default: 1.0) –

    The initial delay in seconds before the first retry.

  • backoff (float, default: 2.0) –

    The multiplier for the delay after each failure.

  • jitter (float, default: 0.3) –

    The random fractional jitter to apply to each delay.

  • retry_on (tuple[type[BaseException], ...], default: (Exception,)) –

    A tuple of exception types that will trigger a retry.

Returns:

  • T –

    The result of the supplier if one of the attempts succeeds.

Raises:

  • ValueError

    If seconds is less than or equal to 0.

  • BaseException

    The last exception raised by supplier if all attempts fail.
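To make the interaction of `delay`, `backoff`, and `jitter` concrete, the sketch below computes the waits between attempts. It is not the library's `_backoff_loop`, whose source is not listed on this page: `backoff_delays` is a hypothetical helper, and it assumes jitter is applied as a uniform random fraction of plus or minus `jitter` around each nominal delay.

```python
import random
from typing import List, Optional


def backoff_delays(
    retries: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    jitter: float = 0.3,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait before each retry (hypothetical helper, not library code)."""
    rng = rng or random.Random()
    waits = []
    current = delay
    for _ in range(retries):
        # Assumption: jitter scales the nominal delay by a factor
        # drawn uniformly from [1 - jitter, 1 + jitter].
        factor = 1.0 + rng.uniform(-jitter, jitter)
        waits.append(max(0.0, current * factor))
        current *= backoff  # grow the nominal delay for the next retry
    return waits


# With jitter disabled, the defaults give the nominal schedule.
nominal = backoff_delays(jitter=0.0)
```

Under these assumptions the nominal waits for the default arguments are 1, 2, and 4 seconds, and each jittered wait stays within 30% of its nominal value.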

Source code in src/bijux_cli/infra/retry.py
async def run(
    self,
    supplier: Callable[[], Awaitable[T]],
    seconds: float = 1.0,
    retries: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    jitter: float = 0.3,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
) -> T:
    """Executes a supplier with a timeout and exponential-backoff retries.

    Args:
        supplier (Callable[[], Awaitable[T]]): The async operation to run.
        seconds (float): The timeout for each attempt in seconds. Must be > 0.
        retries (int): The maximum number of retry attempts.
        delay (float): The initial delay in seconds before the first retry.
        backoff (float): The multiplier for the delay after each failure.
        jitter (float): The random fractional jitter to apply to each delay.
        retry_on (tuple[type[BaseException], ...]): A tuple of exception
            types that will trigger a retry.

    Returns:
        T: The result of the `supplier` if one of the attempts succeeds.

    Raises:
        ValueError: If `seconds` is less than or equal to 0.
        BaseException: The last exception raised by `supplier` if all
            attempts fail.
    """
    if seconds <= 0:
        raise ValueError("seconds must be > 0")

    ctx = _try_asyncio_timeout(seconds)

    if ctx is not None:
        async with ctx:
            return await _backoff_loop(
                supplier,
                retries=retries,
                delay=delay,
                backoff=backoff,
                jitter=jitter,
                retry_on=retry_on,
                telemetry=self._telemetry,
            )
    else:

        async def timed_supplier() -> T:
            """Wraps the supplier in an `asyncio.wait_for` timeout.

            Returns:
                T: The result of the `supplier` if it completes in time.
            """
            return await asyncio.wait_for(supplier(), timeout=seconds)

        return await _backoff_loop(
            timed_supplier,
            retries=retries,
            delay=delay,
            backoff=backoff,
            jitter=jitter,
            retry_on=retry_on,
            telemetry=self._telemetry,
        )
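`run` delegates the retry logic to `_backoff_loop`, whose source is not listed in this section. The following is a minimal sketch of what such a loop could look like, not the library's implementation: `retry_with_backoff` and `make_flaky` are hypothetical names, telemetry is omitted, the jitter formula is an assumption, and `retries` is assumed to count retries after the initial attempt.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    supplier: Callable[[], Awaitable[T]],
    retries: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    jitter: float = 0.3,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Call `supplier` once, then retry up to `retries` times on failure."""
    if retries < 0:
        raise ValueError("retries must be >= 0")
    current = delay
    attempt = 0
    while True:
        try:
            return await supplier()
        except retry_on:
            if attempt == retries:
                raise  # out of retries: surface the last exception
            # Assumption: jitter is a uniform fraction of the nominal delay.
            wait = current * (1.0 + random.uniform(-jitter, jitter))
            await asyncio.sleep(max(0.0, wait))
            current *= backoff
            attempt += 1


def make_flaky(failures: int) -> Callable[[], Awaitable[str]]:
    """Build a supplier that raises `failures` times, then returns "ok"."""
    remaining = {"n": failures}

    async def supplier() -> str:
        if remaining["n"] > 0:
            remaining["n"] -= 1
            raise ConnectionError("transient failure")
        return "ok"

    return supplier


# Two transient failures, then success on the third attempt.
outcome = asyncio.run(retry_with_backoff(make_flaky(2), delay=0.01))
```

Exceptions whose types are not listed in `retry_on` propagate immediately in this sketch, without any retry.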

TimeoutRetryPolicy

TimeoutRetryPolicy(telemetry: TelemetryProtocol)

Bases: RetryPolicyProtocol

A retry policy that applies a single, one-time timeout to an operation.

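Attributes:

  • _telemetry (TelemetryProtocol) –

    The service for emitting events.

Initializes the TimeoutRetryPolicy.

Parameters:

  • telemetry (TelemetryProtocol) –

    The service for emitting events.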

Source code in src/bijux_cli/infra/retry.py
@inject
def __init__(self, telemetry: TelemetryProtocol) -> None:
    """Initializes the `TimeoutRetryPolicy`.

    Args:
        telemetry (TelemetryProtocol): The service for emitting events.
    """
    self._telemetry = telemetry

reset

reset() -> None

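Resets the retry policy state. This policy keeps no state between calls, so the method only emits a retry_reset telemetry event.

Source code in src/bijux_cli/infra/retry.py
def reset(self) -> None:
    """Resets the retry policy state.

    The policy keeps no state, so this only emits a `retry_reset` event.
    """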
    self._telemetry.event("retry_reset", {})

run async

run(
    supplier: Callable[[], Awaitable[T]],
    seconds: float = 1.0,
) -> T

Executes an awaitable supplier with a single timeout.

This method uses the asyncio.timeout context manager when it is available (Python 3.11 and later); otherwise it falls back to asyncio.wait_for.

Parameters:

  • supplier (Callable[[], Awaitable[T]]) –

    The async operation to run.

  • seconds (float, default: 1.0) –

    The timeout duration in seconds. Must be positive.

Returns:

  • T –

    The result of the supplier if it completes in time.

Raises:

  • ValueError

    If seconds is less than or equal to 0.

  • BijuxError

    If the operation times out.

Source code in src/bijux_cli/infra/retry.py
async def run(
    self,
    supplier: Callable[[], Awaitable[T]],
    seconds: float = 1.0,
) -> T:
    """Executes an awaitable `supplier` with a single timeout.

    This method uses the modern `asyncio.timeout` context manager if
    available, otherwise it falls back to `asyncio.wait_for`.

    Args:
        supplier (Callable[[], Awaitable[T]]): The async operation to run.
        seconds (float): The timeout duration in seconds. Must be positive.

    Returns:
        T: The result of the `supplier` if it completes in time.

    Raises:
        ValueError: If `seconds` is less than or equal to 0.
        BijuxError: If the operation times out.
    """
    if seconds <= 0:
        raise ValueError("seconds must be > 0")

    ctx = _try_asyncio_timeout(seconds)

    try:
        if ctx is not None:
            async with ctx:
                result = await supplier()
        else:
            result = await asyncio.wait_for(supplier(), timeout=seconds)

        self._telemetry.event("retry_timeout_success", {"seconds": seconds})
        return result

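    # Before Python 3.11, asyncio.TimeoutError is distinct from the builtin
    # TimeoutError, so both are caught to cover the asyncio.wait_for fallback.
    except (TimeoutError, asyncio.TimeoutError) as exc: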
        self._telemetry.event(
            "retry_timeout_failed", {"seconds": seconds, "error": str(exc)}
        )
        raise BijuxError(
            f"Operation timed out after {seconds}s", http_status=504
        ) from exc
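The listing relies on `_try_asyncio_timeout`, which is not shown in this section. A minimal sketch of the same feature-detection pattern follows, under the assumption that the helper returns a timeout context manager when `asyncio.timeout` exists and `None` otherwise. The names `maybe_timeout`, `run_with_timeout`, `fast`, and `slow` are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


def maybe_timeout(seconds: float) -> Optional[Any]:
    """Return asyncio.timeout(seconds) if that API exists, else None.

    Call this from inside a running event loop.
    """
    factory = getattr(asyncio, "timeout", None)  # added in Python 3.11
    return factory(seconds) if factory is not None else None


async def run_with_timeout(
    supplier: Callable[[], Awaitable[T]], seconds: float
) -> T:
    """Await supplier() under a single timeout, using whichever API exists."""
    ctx = maybe_timeout(seconds)
    if ctx is not None:
        async with ctx:
            return await supplier()
    # Fallback for interpreters without asyncio.timeout.
    return await asyncio.wait_for(supplier(), timeout=seconds)


async def fast() -> str:
    return "done"


async def slow() -> str:
    await asyncio.sleep(1.0)  # longer than the timeout used below
    return "done"


async def demo() -> str:
    try:
        return await run_with_timeout(slow, 0.01)
    except (TimeoutError, asyncio.TimeoutError):
        # Both names are caught so the sketch behaves the same before 3.11.
        return "timed out"
```

Running `asyncio.run(demo())` exercises the timeout path, while `run_with_timeout(fast, 0.5)` completes normally on either code path.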