Coverage for  / home / runner / work / bijux-cli / bijux-cli / src / bijux_cli / infra / retry.py: 96%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-26 17:59 +0000

1# SPDX-License-Identifier: Apache-2.0 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Retry adapters for transient failures.""" 

5 

6from __future__ import annotations 

7 

8from collections.abc import Callable 

9import random 

10import time 

11from typing import Any, TypeVar 

12 

13T = TypeVar("T") 

14 

15 

16class NoopRetryPolicy: 

17 """No-op retry policy that calls the function once.""" 

18 

19 def run(self, func: Callable[[], T], seconds: float | None = None) -> T: 

20 """Execute the callable once.""" 

21 return func() 

22 

23 

24class TimeoutRetryPolicy: 

25 """Retries a callable until a timeout is reached.""" 

26 

27 def __init__(self, telemetry: Any, timeout: float = 5.0) -> None: 

28 """Initialize with telemetry and timeout.""" 

29 self._telemetry = telemetry 

30 self._timeout = timeout 

31 

32 def run(self, func: Callable[[], T], seconds: float | None = None) -> T: 

33 """Retry the callable until timeout expires.""" 

34 timeout = seconds if seconds is not None else self._timeout 

35 start = time.time() 

36 last_error: Exception | None = None 

37 while time.time() - start < timeout: 

38 try: 

39 return func() 

40 except Exception as exc: 

41 last_error = exc 

42 self._telemetry.event("retry_attempt_failed", {"error": str(exc)}) 

43 time.sleep(0.05) 

44 raise RuntimeError(f"Retry timeout after {timeout}s: {last_error}") 

45 

46 

47class ExponentialBackoffRetryPolicy: 

48 """Retries with exponential backoff and jitter.""" 

49 

50 def __init__( 

51 self, 

52 telemetry: Any, 

53 *, 

54 base_delay: float = 0.1, 

55 max_delay: float = 2.0, 

56 max_attempts: int = 5, 

57 ) -> None: 

58 """Initialize backoff settings.""" 

59 self._telemetry = telemetry 

60 self._base_delay = base_delay 

61 self._max_delay = max_delay 

62 self._max_attempts = max_attempts 

63 

64 def run(self, func: Callable[[], T], seconds: float | None = None) -> T: 

65 """Retry with exponential backoff.""" 

66 attempt = 0 

67 last_error: Exception | None = None 

68 while attempt < self._max_attempts: 68 ↛ 81line 68 didn't jump to line 81 because the condition on line 68 was always true

69 try: 

70 return func() 

71 except Exception as exc: 

72 last_error = exc 

73 attempt += 1 

74 delay = min(self._base_delay * (2**attempt), self._max_delay) 

75 delay = delay + random.uniform(0.0, delay / 4) # noqa: S311 # nosec B311 - non-crypto jitter 

76 self._telemetry.event( 

77 "retry_backoff", 

78 {"attempt": attempt, "delay": delay, "error": str(exc)}, 

79 ) 

80 time.sleep(delay) 

81 raise RuntimeError(f"Retry attempts exhausted: {last_error}") 

82 

83 

84__all__ = ["NoopRetryPolicy", "TimeoutRetryPolicy", "ExponentialBackoffRetryPolicy"]