M08C04: Retry & Timeout Policies as Pure Data Fed into Async Steps¶
Module 08 – Main Track Core
Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every production FuncPipe pipeline that touches external services wraps them in retry/timeout policies — turning transient failures and stalls into typed `ErrInfo` values without ever breaking purity or composability.
Progression Note¶
Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.
| Module | Focus | Key Outcomes |
|---|---|---|
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, idempotent effects, explicit sessions |
| 8 | Async FuncPipe & Backpressure | Async FuncPipe, non-blocking pipelines, backpressure, basic fairness |
| 9 | FP Across Libraries and Frameworks | FuncPipe style in Pandas/Polars/Dask, FastAPI, CLI, distributed systems |
| 10 | Refactoring, Performance, Future-Proofing | Systematic refactors, performance budgets, governance and long-term evolution |
Core question
How do you model retry and timeout policies as pure, immutable data that are fed into async descriptions — giving you configurable, testable resilience against transient failures and stalls while keeping the core pure?
Symmetry with Previous Cores
- Core 1: AsyncPlan = single-value async description
- Core 3: AsyncGen + BackpressurePolicy = bounded async stream
- Core 4: AsyncPlan / AsyncGen + RetryPolicy / TimeoutPolicy = resilient async description
Policies are pure data; the resilient wrapper returns an ordinary AsyncPlan. All effects (sleep, jitter, cancellation) happen only when the plan is driven.
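For readers joining at this core, a minimal sketch of the `AsyncPlan` shape assumed from Core 1 may help — the real alias lives in `funcpipe_rag.domain.effects.async_.plan`, so the simplified `ErrInfo`/`Result` definitions here are illustrative stand-ins, not the project's actual types:

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Generic, TypeVar, Union

T = TypeVar("T")


@dataclass(frozen=True)
class ErrInfo:  # trimmed stand-in for the real error type
    code: str
    msg: str


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err:
    error: ErrInfo


# Result of a single async step: success payload or typed error.
Result = Union[Ok[T], Err]

# An AsyncPlan is a zero-argument callable returning an awaitable Result.
# Constructing one performs no effects; only calling and awaiting it does.
AsyncPlan = Callable[[], Awaitable[Result[T]]]


async def _embed() -> Result[str]:
    return Ok("vector")  # stand-in for a real async effect


plan: AsyncPlan[str] = lambda: _embed()  # pure description — nothing has run yet
print(asyncio.run(plan()))  # driving the plan executes the effect
```

This is exactly why `async_with_resilience` can return "an ordinary AsyncPlan": wrapping a callable in another callable stays pure until someone drives it.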
What you now have after M08C03 + this core
- Bounded async streams with backpressure
- Resilient async steps with configurable retry/timeout
- Pure descriptions — no effects until iteration
- Deterministic testing via injected ResilienceEnv
What the rest of Module 8 adds
- Rate limiting & fairness (M08C05–C06)
- Full production async RAG pipeline with all safeguards
1. Laws & Architectural Invariants¶
| Category | Name | Description | Enforcement |
|---|---|---|---|
| Algebraic Laws | Identity | `async_with_resilience(step, max_attempts=1, timeout=None)` is `step` (exact identity when no custom env is passed) | Hypothesis |
| Algebraic Laws | Bounded Attempts | Number of executions ≤ `policy.max_attempts` (for retriable failures) | Runtime probe + property tests |
| Architectural Invariants | Bounded Duration | Total wall-clock time ≤ max_attempts × timeout_ms + capped backoff sum (with fake clock) | Fake clock tests |
| Architectural Invariants | Description Purity | Constructing a resilient plan performs zero side effects (sleep/jitter/RNG only on execution) | Mock tests |
| Architectural Invariants | Idempotency Warning | Non-idempotent steps with >1 attempt emit `RuntimeWarning` on first retry | Runtime warning |
| Architectural Invariants | Cancellation Safety | Timeout cancels the underlying task cooperatively; no zombie tasks left behind | Cancellation tests |
2. Decision Table¶
| Scenario | Stable / Fast | Flaky / Rate-limited | Long-running / Unknown Duration | Recommended |
|---|---|---|---|---|
| Local CPU / DB | Yes | No | No | No resilience needed |
| External API | No | Yes | No | Retry only |
| Network-bound | No | No | Yes | Timeout only |
| Unreliable third-party | No | Yes | Yes | Retry + Timeout |
Rule: Always wrap external calls in resilience; never wrap pure/internal steps.
3. Public API – Resilience (src/funcpipe_rag/domain/effects/async_/resilience.py – mypy --strict clean)¶
```python
# funcpipe_rag/domain/effects/async_/resilience.py
from __future__ import annotations

import warnings
from asyncio import CancelledError, TimeoutError, sleep
from asyncio import timeout as async_timeout
from dataclasses import dataclass, field
from random import Random
from typing import Awaitable, Callable, TypeVar

from funcpipe_rag.domain.effects.async_.plan import AsyncPlan
from funcpipe_rag.result.types import Err, ErrInfo, Ok, Result

T = TypeVar("T")
U = TypeVar("U")


@dataclass(frozen=True)
class RetryPolicy:
    max_attempts: int = 3
    backoff_base_ms: int = 100
    max_backoff_ms: int = 60_000
    jitter_factor: float = 0.5
    retriable_codes: frozenset[str] = field(
        default_factory=lambda: frozenset({"TRANSIENT", "RATE_LIMIT", "TIMEOUT"})
    )
    idempotent: bool = True

    def __post_init__(self) -> None:
        if self.max_attempts < 1:
            raise ValueError("max_attempts must be >= 1")


@dataclass(frozen=True)
class TimeoutPolicy:
    timeout_ms: int = 10_000

    def __post_init__(self) -> None:
        if self.timeout_ms <= 0:
            raise ValueError("timeout_ms must be > 0")


@dataclass(frozen=True)
class ResilienceEnv:
    rng: Random
    sleep: Callable[[float], Awaitable[None]]

    @staticmethod
    def default() -> "ResilienceEnv":
        return ResilienceEnv(Random(), sleep)


def async_with_resilience(
    step: AsyncPlan[T],
    retry: RetryPolicy,
    timeout: TimeoutPolicy | None = None,
    env: ResilienceEnv | None = None,
) -> AsyncPlan[T]:
    """
    Returns an AsyncPlan that applies retry/timeout policies to an existing AsyncPlan.

    Pure until executed — all effects (sleep, jitter, cancellation) happen only
    when the plan is driven.
    """
    # Exact identity fast-path — no wrapper overhead.
    # Only applies when no custom ResilienceEnv is provided; if an env is given we
    # always wrap so that all uses of env (e.g. future metrics or tracing) are
    # centralized in the wrapper.
    if retry.max_attempts == 1 and timeout is None and env is None:
        return step

    async def _resilient() -> Result[T, ErrInfo]:
        local_env = env or ResilienceEnv.default()
        rng = local_env.rng
        last_err: ErrInfo | None = None
        warned_non_idempotent = False
        for attempt in range(1, retry.max_attempts + 1):
            try:
                if timeout is not None:
                    async with async_timeout(timeout.timeout_ms / 1000.0):
                        res = await step()
                else:
                    res = await step()
                if isinstance(res, Ok):
                    return res
                last_err = res.error
                if last_err.code not in retry.retriable_codes:
                    return res
            except TimeoutError:
                assert timeout is not None
                last_err = ErrInfo(code="TIMEOUT", msg=f"Timeout after {timeout.timeout_ms}ms")
                if "TIMEOUT" not in retry.retriable_codes:
                    return Err(last_err)
            except CancelledError:
                raise  # Propagate cancellation immediately
            except Exception as exc:
                last_err = ErrInfo(code="UNEXPECTED", msg=str(exc), cause=exc)
                if last_err.code not in retry.retriable_codes:
                    return Err(last_err)
            # Warn once if retrying a non-idempotent step
            if attempt > 1 and not retry.idempotent and not warned_non_idempotent:
                warnings.warn(
                    "Retrying a step marked non-idempotent — potential duplicate side-effects",
                    category=RuntimeWarning,
                    stacklevel=2,
                )
                warned_non_idempotent = True
            # Backoff before next attempt
            if attempt < retry.max_attempts:
                backoff = min(
                    retry.backoff_base_ms * (2 ** (attempt - 1)), retry.max_backoff_ms
                ) / 1000.0
                jitter = backoff * retry.jitter_factor * (2.0 * rng.random() - 1.0)
                delay = max(0.0, backoff + jitter)
                await local_env.sleep(delay)
        # Exhausted attempts
        meta: dict[str, object] = {"attempts": retry.max_attempts}
        if not retry.idempotent:
            meta["warning"] = "retried non-idempotent step"
        return Err(
            ErrInfo(
                code="MAX_RETRIES",
                msg=f"Failed after {retry.max_attempts} attempts",
                cause=last_err,
                meta=meta,
            )
        )

    return lambda: _resilient()


# Mapper for streams (use in async_gen_bounded_map etc.)
def resilient_mapper(
    f: Callable[[T], AsyncPlan[U]],
    retry: RetryPolicy,
    timeout: TimeoutPolicy | None = None,
    env: ResilienceEnv | None = None,
) -> Callable[[T], AsyncPlan[U]]:
    def wrapped(item: T) -> AsyncPlan[U]:
        return async_with_resilience(f(item), retry, timeout, env)

    return wrapped
```
4. Reference Implementations (pure domain code)¶
4.1 Resilient embedding step¶
```python
retry_policy = RetryPolicy(max_attempts=4)
timeout_policy = TimeoutPolicy(timeout_ms=8000)


def resilient_embed_chunk(
    chunk: ChunkWithoutEmbedding,
    embedder: EmbeddingCap,
) -> AsyncPlan[EmbeddedChunk]:
    return async_with_resilience(
        async_embed_chunk(chunk, embedder),  # Core 1 AsyncPlan
        retry_policy,
        timeout_policy,
    )
```
4.2 Resilient bounded RAG pipeline¶
```python
def async_rag_pipeline_resilient_bounded(
    storage: StoragePort,
    input_path: str,
    env: RagEnv,
    embedder: EmbeddingCap,
    bp_policy: BackpressurePolicy,
    retry: RetryPolicy,
    timeout: TimeoutPolicy | None,
) -> AsyncGen[EmbeddedChunk]:
    unbounded = async_rag_pipeline_stream(storage, input_path, env, embedder)
    mapper = resilient_mapper(lambda c: async_embed_chunk(c, embedder), retry, timeout)
    return async_gen_bounded_map(unbounded, mapper, bp_policy)
```
Shell consumption (unchanged — resilience happens automatically inside workers):
```python
async def run_rag_resilient():
    bp_policy = BackpressurePolicy(max_concurrent=8)
    retry = RetryPolicy(max_attempts=4)
    timeout = TimeoutPolicy(timeout_ms=8000)
    stream = async_rag_pipeline_resilient_bounded(
        real_storage, "data.csv", env, embedder, bp_policy, retry, timeout
    )
    async for result in stream():
        # same as Core 3
        ...
```
4.3 Before → After (the real transformation)¶
```python
# BEFORE – fragile embedding with ad-hoc retry
async def embed_with_adhoc_retry(chunk):
    for attempt in range(4):
        try:
            return await asyncio.wait_for(embedder.embed(chunk.text.content), timeout=8)
        except:  # noqa: E722 — bare except even swallows cancellation
            await asyncio.sleep(0.1 * (2 ** attempt))
    raise RuntimeError("embed failed")


# AFTER – pure, configurable resilience
def embed_resilient(
    chunk: ChunkWithoutEmbedding,
    embedder: EmbeddingCap,
    retry: RetryPolicy = RetryPolicy(max_attempts=4),
    timeout: TimeoutPolicy | None = TimeoutPolicy(timeout_ms=8000),
) -> AsyncPlan[EmbeddedChunk]:
    return async_with_resilience(
        async_embed_chunk(chunk, embedder),
        retry,
        timeout,
    )
```
5. Property-Based Proofs (all pass in CI)¶
```python
import asyncio
import warnings
from random import Random

import pytest
from hypothesis import given
from hypothesis import strategies as st

from funcpipe_rag.domain.effects.async_.plan import AsyncPlan
from funcpipe_rag.domain.effects.async_.resilience import (
    ResilienceEnv,
    RetryPolicy,
    TimeoutPolicy,
    async_with_resilience,
)
from funcpipe_rag.result.types import Err, ErrInfo, Ok


@pytest.mark.asyncio
async def test_identity_when_no_retry_no_timeout():
    async def step():
        return Ok(42)

    base: AsyncPlan[int] = lambda: step()
    plan = async_with_resilience(base, RetryPolicy(max_attempts=1), None)
    assert plan is base  # exact identity (fast-path)
    res = await plan()
    assert isinstance(res, Ok) and res.value == 42


@given(attempts=st.integers(2, 10))  # attempts >= 2 for real retry cases
@pytest.mark.asyncio
async def test_retry_bounded_attempts(attempts):
    policy = RetryPolicy(max_attempts=attempts, retriable_codes=frozenset({"TRANSIENT"}))
    call_count = 0

    async def always_fail():
        nonlocal call_count
        call_count += 1
        return Err(ErrInfo(code="TRANSIENT", msg="fail"))

    plan = async_with_resilience(lambda: always_fail(), policy, None)
    res = await plan()
    assert isinstance(res, Err) and res.error.code == "MAX_RETRIES"
    assert call_count == attempts


@given(base_ms=st.integers(50, 500))
@pytest.mark.asyncio
async def test_backoff_cap_respected(base_ms):
    policy = RetryPolicy(
        max_attempts=10,
        backoff_base_ms=base_ms,
        max_backoff_ms=1000,
        jitter_factor=0.0,  # deterministic for this test
    )
    delays: list[float] = []

    async def fake_sleep(d: float) -> None:
        delays.append(d)

    env = ResilienceEnv(rng=Random(42), sleep=fake_sleep)

    async def force_retry():
        return Err(ErrInfo(code="TRANSIENT", msg="retry"))

    plan = async_with_resilience(lambda: force_retry(), policy, None, env)
    await plan()  # triggers retries
    for attempt, delay in enumerate(delays, start=1):
        expected = min(base_ms * (2 ** (attempt - 1)), policy.max_backoff_ms) / 1000.0
        assert abs(delay - expected) < 1e-9


@pytest.mark.asyncio
async def test_timeout_cancels_and_retries_if_retriable():
    policy = RetryPolicy(max_attempts=2, retriable_codes=frozenset({"TIMEOUT"}))
    timeout_policy = TimeoutPolicy(timeout_ms=10)
    call_count = 0

    async def hanging():
        nonlocal call_count
        call_count += 1
        await asyncio.sleep(10)  # will time out
        return Ok("never")

    plan = async_with_resilience(lambda: hanging(), policy, timeout_policy)
    res = await plan()
    assert call_count == 2  # retried once after timeout
    assert isinstance(res, Err) and res.error.code == "MAX_RETRIES"


@pytest.mark.asyncio
async def test_non_retriable_error_no_retry():
    policy = RetryPolicy(max_attempts=5)

    async def permanent_fail():
        return Err(ErrInfo(code="PERMANENT", msg="bad"))

    plan = async_with_resilience(lambda: permanent_fail(), policy, None)
    res = await plan()
    assert isinstance(res, Err) and res.error.code == "PERMANENT"


@pytest.mark.asyncio
async def test_non_idempotent_warning():
    policy = RetryPolicy(max_attempts=3, idempotent=False)
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")

        async def side_effect():
            return Err(ErrInfo(code="TRANSIENT", msg="retry me"))

        plan = async_with_resilience(lambda: side_effect(), policy, None)
        await plan()
    assert len(w) == 1
    assert "non-idempotent" in str(w[0].message)
```
6. Runtime Guarantees¶
| Policy Setting | Max Executions | Max Wall-clock (approx) | Notes |
|---|---|---|---|
| max_attempts=1 | 1 | timeout_ms | Exact identity fast-path when timeout is None and no env is passed; otherwise a single timed attempt |
| max_attempts=N | N | N × timeout_ms + capped exponential backoff | Bounded by policy |
| timeout=None | max_attempts | capped backoff sum | No per-attempt timeout |
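The bound in the table above can be sanity-checked directly from policy values. A minimal sketch (plain arithmetic, no async needed), assuming section 3's backoff formula with zero jitter and every attempt timing out — `worst_case_wall_clock_ms` is a hypothetical helper, not part of the public API:

```python
def worst_case_wall_clock_ms(
    max_attempts: int,
    timeout_ms: int,
    backoff_base_ms: int = 100,
    max_backoff_ms: int = 60_000,
) -> int:
    """Upper bound: every attempt times out and each backoff is exponential but capped."""
    attempt_time = max_attempts * timeout_ms
    # Backoff happens between attempts, so there are max_attempts - 1 sleeps.
    backoff_sum = sum(
        min(backoff_base_ms * (2 ** (a - 1)), max_backoff_ms)
        for a in range(1, max_attempts)
    )
    return attempt_time + backoff_sum


# 4 attempts, 8 s timeout, 100 ms base backoff:
# attempts contribute 4 × 8000 = 32000 ms; backoffs 100 + 200 + 400 = 700 ms.
print(worst_case_wall_clock_ms(4, 8000))  # → 32700
```

With positive jitter the actual sum can exceed this by at most `jitter_factor` per sleep, which is why the table hedges the bound as approximate.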
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Hard-coded retry/timeout | Inflexible, untestable | Use RetryPolicy / TimeoutPolicy |
| Retrying non-idempotent steps | Duplicate side-effects | Set idempotent=False → runtime warning |
| No timeout on network calls | Hung tasks | Always wrap external calls in timeout |
| Infinite retry | Live-lock on failure | Always set finite max_attempts |
8. Pre-Core Quiz¶
- Policies are…? → Pure immutable data
- Resilience is applied via…? → `async_with_resilience` returning an `AsyncPlan`
- Where do sleep/jitter/cancellation happen…? → Only when the plan is driven
- How to test deterministically…? → Inject `ResilienceEnv` with fake RNG/sleep
- Real power comes from…? → Configurable resilience without impurity
9. Post-Core Exercise¶
- Define a `RetryPolicy` and `TimeoutPolicy` for your flakiest external call.
- Wrap an `AsyncPlan` step with `async_with_resilience`.
- Add it to a bounded stream via `resilient_mapper`.
- Write a test with a fake `ResilienceEnv` asserting exact attempt count and backoff delays.
- Celebrate – you now have production-grade resilience.
Next → M08C05: Rate Limiting & Fairness – Cooperative Scheduling
You now have resilient, backpressure-safe async streams — ready for real-world flakiness and load. The remaining cores add fairness and complete the production Async FuncPipe.
M08C04 is now frozen.