Skip to content

M08C04: Retry & Timeout Policies as Pure Data Fed into Async Steps

Module 08 – Main Track Core

Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every production FuncPipe pipeline that touches external services wraps them in retry/timeout policies — turning transient failures and stalls into typed ErrInfo without ever breaking purity or composability.

Progression Note

Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.

Module Focus Key Outcomes
7 Effect Boundaries & Resource Safety Ports & adapters, capability protocols, idempotent effects, explicit sessions
8 Async FuncPipe & Backpressure Async FuncPipe, non-blocking pipelines, backpressure, basic fairness
9 FP Across Libraries and Frameworks FuncPipe style in Pandas/Polars/Dask, FastAPI, CLI, distributed systems
10 Refactoring, Performance, Future-Proofing Systematic refactors, performance budgets, governance and long-term evolution

Core question
How do you model retry and timeout policies as pure, immutable data that are fed into async descriptions — giving you configurable, testable resilience against transient failures and stalls while keeping the core pure?

Symmetry with Previous Cores
- Core 1: AsyncPlan = single-value async description
- Core 3: AsyncGen + BackpressurePolicy = bounded async stream
- Core 4: AsyncPlan / AsyncGen + RetryPolicy / TimeoutPolicy = resilient async description

Policies are pure data; the resilient wrapper returns an ordinary AsyncPlan. All effects (sleep, jitter, cancellation) happen only when the plan is driven.

What you now have after M08C03 + this core - Bounded async streams with backpressure
- Resilient async steps with configurable retry/timeout
- Pure descriptions — no effects until iteration
- Deterministic testing via injected ResilienceEnv

What the rest of Module 8 adds - Rate limiting & fairness (M08C05–C06)
- Full production async RAG pipeline with all safeguards

1. Laws & Architectural Invariants

Category Name Description Enforcement
Algebraic Laws Identity async_with_resilience(step, max_attempts=1, timeout=None) is step (exact identity when no custom env is passed) Hypothesis
Algebraic Laws Bounded Attempts Number of executions ≤ policy.max_attempts (for retriable failures) Runtime probe + property tests
Architectural Invariants Bounded Duration Total wall-clock time ≤ max_attempts × timeout_ms + capped backoff sum (with fake clock) Fake clock tests
Architectural Invariants Description Purity Constructing a resilient plan performs zero side effects (sleep/jitter/RNG only on execution) Mock tests
Architectural Invariants Idempotency Warning Non-idempotent steps with >1 attempt emit RuntimeWarning on first retry Runtime warning
Architectural Invariants Cancellation Safety Timeout cancels the underlying task cooperatively; no zombie tasks left behind Cancellation tests

2. Decision Table

Scenario Stable / Fast Flaky / Rate-limited Long-running / Unknown Duration Recommended
Local CPU / DB Yes No No No resilience needed
External API No Yes No Retry only
Network-bound No No Yes Timeout only
Unreliable third-party No Yes Yes Retry + Timeout

Rule: Always wrap external calls in resilience; never wrap pure/internal steps.

3. Public API – Resilience (src/funcpipe_rag/domain/effects/async_/resilience.py – mypy --strict clean)

# funcpipe_rag/domain/effects/async_/resilience.py
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, TypeVar, Awaitable
from random import Random
from asyncio import timeout as async_timeout, sleep, CancelledError, TimeoutError
import warnings

from funcpipe_rag.domain.effects.async_.plan import AsyncPlan
from funcpipe_rag.result.types import Err, ErrInfo, Ok, Result

T = TypeVar("T")
U = TypeVar("U")

@dataclass(frozen=True)
class RetryPolicy:
    max_attempts: int = 3
    backoff_base_ms: int = 100
    max_backoff_ms: int = 60_000
    jitter_factor: float = 0.5
    retriable_codes: frozenset[str] = field(default_factory=lambda: frozenset({"TRANSIENT", "RATE_LIMIT", "TIMEOUT"}))
    idempotent: bool = True

    def __post_init__(self) -> None:
        if self.max_attempts < 1:
            raise ValueError("max_attempts must be >= 1")

@dataclass(frozen=True)
class TimeoutPolicy:
    timeout_ms: int = 10_000

    def __post_init__(self) -> None:
        if self.timeout_ms <= 0:
            raise ValueError("timeout_ms must be > 0")

@dataclass(frozen=True)
class ResilienceEnv:
    rng: Random
    sleep: Callable[[float], Awaitable[None]]

    @staticmethod
    def default() -> "ResilienceEnv":
        return ResilienceEnv(Random(), sleep)


def async_with_resilience(
    step: AsyncPlan[T],
    retry: RetryPolicy,
    timeout: TimeoutPolicy | None = None,
    env: ResilienceEnv | None = None,
) -> AsyncPlan[T]:
    """
    Returns an AsyncPlan that applies retry/timeout policies to an existing AsyncPlan.
    Pure until executed — all effects (sleep, jitter, cancellation) happen only when driven.
    """
    # Exact identity fast-path — no wrapper overhead.
    # Only applies when no custom ResilienceEnv is provided; if an env is given we
    # always wrap so that all uses of env (e.g. future metrics or tracing) are
    # centralized in the wrapper.
    if retry.max_attempts == 1 and timeout is None and env is None:
        return step

    async def _resilient() -> Result[T, ErrInfo]:
        local_env = env or ResilienceEnv.default()
        rng = local_env.rng
        last_err: ErrInfo | None = None
        warned_non_idempotent = False

        for attempt in range(1, retry.max_attempts + 1):
            try:
                if timeout is not None:
                    async with async_timeout(timeout.timeout_ms / 1000.0):
                        res = await step()
                else:
                    res = await step()

                if isinstance(res, Ok):
                    return res

                last_err = res.error
                if last_err.code not in retry.retriable_codes:
                    return res

            except TimeoutError:
                assert timeout is not None
                last_err = ErrInfo(code="TIMEOUT", msg=f"Timeout after {timeout.timeout_ms}ms")
                if "TIMEOUT" not in retry.retriable_codes:
                    return Err(last_err)

            except CancelledError:
                raise  # Propagate cancellation immediately

            except Exception as exc:
                last_err = ErrInfo(code="UNEXPECTED", msg=str(exc), cause=exc)
                if last_err.code not in retry.retriable_codes:
                    return Err(last_err)

            # Warn once if retrying non-idempotent step
            if attempt > 1 and not retry.idempotent and not warned_non_idempotent:
                warnings.warn(
                    "Retrying a step marked non-idempotent — potential duplicate side-effects",
                    category=RuntimeWarning,
                    stacklevel=2,
                )
                warned_non_idempotent = True

            # Backoff before next attempt
            if attempt < retry.max_attempts:
                backoff = min(retry.backoff_base_ms * (2 ** (attempt - 1)), retry.max_backoff_ms) / 1000.0
                jitter = backoff * retry.jitter_factor * (2.0 * rng.random() - 1.0)
                delay = max(0.0, backoff + jitter)
                await local_env.sleep(delay)

        # Exhausted attempts
        meta = {"attempts": retry.max_attempts}
        if not retry.idempotent:
            meta["warning"] = "retried non-idempotent step"
        return Err(
            ErrInfo(
                code="MAX_RETRIES",
                msg=f"Failed after {retry.max_attempts} attempts",
                cause=last_err,
                meta=meta,
            )
        )

    return lambda: _resilient()


# Mapper for streams (use in async_gen_bounded_map etc.)
def resilient_mapper(
    f: Callable[[T], AsyncPlan[U]],
    retry: RetryPolicy,
    timeout: TimeoutPolicy | None = None,
    env: ResilienceEnv | None = None,
) -> Callable[[T], AsyncPlan[U]]:
    def wrapped(item: T) -> AsyncPlan[U]:
        return async_with_resilience(f(item), retry, timeout, env)
    return wrapped

4. Reference Implementations (pure domain code)

4.1 Resilient embedding step

retry_policy = RetryPolicy(max_attempts=4)
timeout_policy = TimeoutPolicy(timeout_ms=8000)

def resilient_embed_chunk(
    chunk: ChunkWithoutEmbedding,
    embedder: EmbeddingCap,
) -> AsyncPlan[EmbeddedChunk]:
    return async_with_resilience(
        async_embed_chunk(chunk, embedder),  # Core 1 AsyncPlan
        retry_policy,
        timeout_policy,
    )

4.2 Resilient bounded RAG pipeline

def async_rag_pipeline_resilient_bounded(
    storage: StoragePort,
    input_path: str,
    env: RagEnv,
    embedder: EmbeddingCap,
    bp_policy: BackpressurePolicy,
    retry: RetryPolicy,
    timeout: TimeoutPolicy | None,
) -> AsyncGen[EmbeddedChunk]:
    unbounded = async_rag_pipeline_stream(storage, input_path, env, embedder)
    mapper = resilient_mapper(lambda c: async_embed_chunk(c, embedder), retry, timeout)
    return async_gen_bounded_map(unbounded, mapper, bp_policy)

Shell consumption (unchanged — resilience happens automatically inside workers):

async def run_rag_resilient():
    bp_policy = BackpressurePolicy(max_concurrent=8)
    retry = RetryPolicy(max_attempts=4)
    timeout = TimeoutPolicy(timeout_ms=8000)

    stream = async_rag_pipeline_resilient_bounded(real_storage, "data.csv", env, embedder, bp_policy, retry, timeout)
    async for result in stream():
        # same as Core 3
        ...

4.3 Before → After (the real transformation)

# BEFORE – fragile embedding with ad-hoc retry
async def embed_with_adhoc_retry(chunk):
    for attempt in range(4):
        try:
            return await asyncio.wait_for(embedder.embed(chunk.text.content), timeout=8)
        except:
            await asyncio.sleep(0.1 * (2 ** attempt))
    raise RuntimeError("embed failed")

# AFTER – pure, configurable resilience
def embed_resilient(
    chunk: ChunkWithoutEmbedding,
    embedder: EmbeddingCap,
    retry: RetryPolicy = RetryPolicy(max_attempts=4),
    timeout: TimeoutPolicy | None = TimeoutPolicy(timeout_ms=8000),
) -> AsyncPlan[EmbeddedChunk]:
    return async_with_resilience(
        async_embed_chunk(chunk, embedder),
        retry,
        timeout,
    )

5. Property-Based Proofs (all pass in CI)

@pytest.mark.asyncio
async def test_identity_when_no_retry_no_timeout():
    async def step():
        return Ok(42)

    base: AsyncPlan[int] = lambda: step()
    plan = async_with_resilience(base, RetryPolicy(max_attempts=1), None)

    assert plan is base  # exact identity (fast-path)

    res = await plan()
    assert isinstance(res, Ok) and res.value == 42

@given(attempts=st.integers(2, 10))  # attempts >= 2 for real retry cases
@pytest.mark.asyncio
async def test_retry_bounded_attempts(attempts):
    policy = RetryPolicy(max_attempts=attempts, retriable_codes=frozenset({"TRANSIENT"}))

    call_count = 0
    async def always_fail():
        nonlocal call_count
        call_count += 1
        return Err(ErrInfo(code="TRANSIENT", msg="fail"))

    plan = async_with_resilience(lambda: always_fail(), policy, None)
    res = await plan()

    assert isinstance(res, Err) and res.error.code == "MAX_RETRIES"
    assert call_count == attempts

@given(base_ms=st.integers(50, 500))
@pytest.mark.asyncio
async def test_backoff_cap_respected(base_ms):
    policy = RetryPolicy(max_attempts=10, backoff_base_ms=base_ms, max_backoff_ms=1000, jitter_factor=0.0)  # deterministic for test

    delays: list[float] = []
    async def fake_sleep(d: float) -> None:
        delays.append(d)

    env = ResilienceEnv(rng=Random(42), sleep=fake_sleep)

    async def force_retry():
        return Err(ErrInfo(code="TRANSIENT", msg="retry"))

    plan = async_with_resilience(lambda: force_retry(), policy, None, env)
    await plan()  # triggers retries

    for attempt, delay in enumerate(delays, start=1):
        expected = min(base_ms * (2 ** (attempt - 1)), policy.max_backoff_ms) / 1000.0
        assert abs(delay - expected) < 1e-9
@pytest.mark.asyncio
async def test_timeout_cancels_and_retries_if_retriable():
    policy = RetryPolicy(max_attempts=2, retriable_codes=frozenset({"TIMEOUT"}))
    timeout_policy = TimeoutPolicy(timeout_ms=10)

    call_count = 0
    async def hanging():
        nonlocal call_count
        call_count += 1
        await asyncio.sleep(10)  # will timeout
        return Ok("never")

    plan = async_with_resilience(lambda: hanging(), policy, timeout_policy)
    res = await plan()

    assert call_count == 2  # retried once after timeout
    assert isinstance(res, Err) and res.error.code == "MAX_RETRIES"
@pytest.mark.asyncio
async def test_non_retriable_error_no_retry():
    policy = RetryPolicy(max_attempts=5)

    async def permanent_fail():
        return Err(ErrInfo(code="PERMANENT", msg="bad"))

    plan = async_with_resilience(lambda: permanent_fail(), policy, None)
    res = await plan()

    assert isinstance(res, Err) and res.error.code == "PERMANENT"
@pytest.mark.asyncio
async def test_non_idempotent_warning():
    policy = RetryPolicy(max_attempts=3, idempotent=False)

    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")

        async def side_effect():
            return Err(ErrInfo(code="TRANSIENT", msg="retry me"))

        plan = async_with_resilience(lambda: side_effect(), policy, None)
        await plan()

        assert len(w) == 1
        assert "non-idempotent" in str(w[0].message)

6. Runtime Guarantees

Policy Setting Max Executions Max Wall-clock (approx) Notes
max_attempts=1 1 timeout_ms Exact identity wrapper
max_attempts=N N N × timeout_ms + capped exponential backoff Bounded by policy
timeout=None max_attempts capped backoff sum No per-attempt timeout

7. Anti-Patterns & Immediate Fixes

Anti-Pattern Symptom Fix
Hard-coded retry/timeout Inflexible, untestable Use RetryPolicy / TimeoutPolicy
Retrying non-idempotent steps Duplicate side-effects Set idempotent=False → runtime warning
No timeout on network calls Hung tasks Always wrap external calls in timeout
Infinite retry Live-lock on failure Always set finite max_attempts

8. Pre-Core Quiz

  1. Policies are…? → Pure immutable data
  2. Resilience is applied via…? → async_with_resilience returning an AsyncPlan
  3. Where do sleep/jitter/cancellation happen…? → Only when the plan is driven
  4. How to test deterministically…? → Inject ResilienceEnv with fake RNG/sleep
  5. Real power comes from…? → Configurable resilience without impurity

9. Post-Core Exercise

  1. Define a RetryPolicy and TimeoutPolicy for your flakiest external call.
  2. Wrap an AsyncPlan step with async_with_resilience.
  3. Add to a bounded stream via resilient_mapper.
  4. Write a test with fake ResilienceEnv asserting exact attempt count and backoff delays.
  5. Celebrate – you now have production-grade resilience.

Next → M08C05: Rate Limiting & Fairness – Cooperative Scheduling

You now have resilient, backpressure-safe async streams — ready for real-world flakiness and load. The remaining cores add fairness and complete the production Async FuncPipe.

M08C04 is now frozen.