
M08C05: Deterministic Testing of Async Functional Pipelines (Fake Time/Executors)

Module 08 – Main Track Core

Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every async FuncPipe pipeline that involves timing (retries, timeouts, backoff jitter) is tested deterministically using injected fake clocks, sleepers, RNGs, and timeout contexts — eliminating flakes and making CI fast and reliable.

Progression Note

Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.

| Module | Focus | Key Outcomes |
| --- | --- | --- |
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, idempotent effects, explicit sessions |
| 8 | Async FuncPipe & Backpressure | Async FuncPipe, non-blocking pipelines, backpressure, basic fairness |
| 9 | FP Across Libraries and Frameworks | FuncPipe style in Pandas/Polars/Dask, FastAPI, CLI, distributed systems |
| 10 | Refactoring, Performance, Future-Proofing | Systematic refactors, performance budgets, governance and long-term evolution |

Core question
How do you test timing-dependent async pipelines deterministically — injecting fake time, sleep, RNG, and timeout contexts so that retries, timeouts, and jitter become reproducible, fast, and flake-free while preserving logical equivalence to production?

Symmetry with Previous Cores
All timing-dependent behavior added in M08C04 (retry, timeout, jitter) is deliberately designed for injection via ResilienceEnv + TimeoutCtx. This core is the payoff: the same pure AsyncPlan that runs in production can be executed under total control in tests.
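
To make the injection point concrete, here is a minimal sketch. `ResilienceEnvSketch` and `retry_sketch` are hypothetical stand-ins written for this example; they are not the M08C04 `ResilienceEnv` or `async_with_resilience`. They only share the shape this core relies on: an RNG plus a sleep function, both passed in rather than imported.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from random import Random
from typing import Awaitable, Callable


@dataclass(frozen=True)
class ResilienceEnvSketch:
    """Hypothetical stand-in: the two capabilities timing code needs."""
    rng: Random
    sleep: Callable[[float], Awaitable[None]]


async def retry_sketch(
    step: Callable[[], Awaitable[bool]],
    env: ResilienceEnvSketch,
    max_attempts: int = 3,
    base_s: float = 0.1,
) -> int:
    """Hypothetical retry loop: returns the number of attempts used."""
    for attempt in range(1, max_attempts + 1):
        if await step():
            return attempt
        if attempt < max_attempts:
            # All waiting goes through the injected sleeper, never asyncio.sleep directly.
            await env.sleep(base_s * 2 ** (attempt - 1))
    return max_attempts


# Production wiring: real entropy, real sleeping.
prod_env = ResilienceEnvSketch(rng=Random(), sleep=asyncio.sleep)


async def run_under_test_env() -> tuple[int, list[float]]:
    recorded: list[float] = []

    async def fake_sleep(seconds: float) -> None:
        recorded.append(seconds)  # record the requested delay instead of waiting

    test_env = ResilienceEnvSketch(rng=Random(42), sleep=fake_sleep)
    outcomes = iter([False, False, True])  # fail twice, then succeed

    async def step() -> bool:
        return next(outcomes)

    attempts = await retry_sketch(step, test_env)
    return attempts, recorded
```

The retry loop is identical under `prod_env` and the test environment; only the wiring differs, which is what lets the same plan run in production and under full control in tests.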

What you now have after M08C04 + this core
- Resilient, backpressure-safe async streams
- Fully deterministic testing of all timing-dependent behavior (including logical timeouts)
- Zero-flake CI for retries, timeouts, backoff jitter
- Fast test execution (no real sleeps)

What the rest of Module 8 adds
- Rate limiting & fairness (M08C06)
- Full production async RAG pipeline with all safeguards

1. Laws & Architectural Invariants

| Category | Name | Description | Enforcement |
| --- | --- | --- | --- |
| Algebraic Laws | Logical Equivalence | For the same inputs and injected failure patterns, the observable Result sequence and attempt counts match real execution | Dual-run tests (real + fake) |
| Architectural Invariants | Determinism | Same seed + same injections → identical results (including jitter, backoff order) | Seeded Hypothesis |
| Architectural Invariants | No Real Effects | Test execution performs no real I/O, no real non-zero sleep, and uses only seeded RNG (no uncontrolled entropy) | Banned imports + mock checks |
| Architectural Invariants | Speed | Full pipeline tests run in < 3 s (no real delays) | CI timeout |
| Architectural Invariants | Coverage | All timing branches (success, retry, timeout, max-retries) exercised in properties | Hypothesis shrinking |
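
The "dual-run" enforcement of the Logical Equivalence law can be sketched as follows. `run_plan` is a hypothetical miniature plan invented for this example (it is not `async_with_resilience`); the point is only that the same code is executed once with a real sleeper and once with a fake one, and the observable outcome and attempt count are compared.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable

Sleep = Callable[[float], Awaitable[None]]


async def run_plan(
    sleep: Sleep, failures_before_success: int, max_attempts: int
) -> tuple[str, int]:
    """Hypothetical mini-plan: returns (outcome, attempts_used)."""
    for attempt in range(1, max_attempts + 1):
        if attempt > failures_before_success:
            return "ok", attempt
        if attempt < max_attempts:
            # Millisecond-scale backoff keeps the real run short.
            await sleep(0.001 * 2 ** (attempt - 1))
    return "err", max_attempts


async def fake_sleep(_: float) -> None:
    return None  # no waiting at all


async def dual_run(
    failures_before_success: int, max_attempts: int
) -> tuple[tuple[str, int], tuple[str, int]]:
    real = await run_plan(asyncio.sleep, failures_before_success, max_attempts)
    fake = await run_plan(fake_sleep, failures_before_success, max_attempts)
    return real, fake
```

A dual-run test asserts that both tuples are equal for every injected failure pattern; only wall-clock duration is allowed to differ.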

Note on equivalence: We guarantee logical equivalence of Result sequences, attempt counts, backoff delays, and timeout triggering for steps that are pure with respect to external effects (or cooperatively cancellable). Real cancellation side-effects of timeouts are not simulated — fake timeouts check the deadline only at context exit.
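
The exit-only deadline check has a visible consequence, sketched below. `FakeClock` is a condensed copy of the class from section 3, and `ExitCheckedTimeout` is a simplified restatement of `FakeTimeout` for illustration only: the body runs to completion, including any side effects, and the timeout is raised afterwards.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class FakeClock:
    current_s: float = 0.0

    def now_s(self) -> float:
        return self.current_s

    def advance_s(self, seconds: float) -> None:
        self.current_s += seconds


class ExitCheckedTimeout:
    """Logical timeout: compares the clock to the deadline only in __aexit__."""

    def __init__(self, clock: FakeClock, seconds: float) -> None:
        self.clock = clock
        self.deadline = clock.now_s() + seconds

    async def __aenter__(self) -> None:
        return None

    async def __aexit__(self, exc_type: object, exc: object, tb: object) -> bool:
        if exc_type is None and self.clock.now_s() >= self.deadline:
            raise asyncio.TimeoutError
        return False  # never swallow exceptions raised by the body


async def demo() -> tuple[list[str], bool]:
    clock = FakeClock()
    log: list[str] = []
    timed_out = False
    try:
        async with ExitCheckedTimeout(clock, 0.05):
            clock.advance_s(0.1)           # logically past the 50 ms deadline
            log.append("side effect ran")  # still executes: nothing cancels the body
    except asyncio.TimeoutError:
        timed_out = True
    return log, timed_out
```

A real `asyncio` timeout would cancel the body at its next await point after the deadline, so steps with external side effects after that point are exactly where fake and real runs can diverge.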

2. Decision Table

| Scenario | Real Execution | Needs Determinism | Recommended |
| --- | --- | --- | --- |
| Pure logic / no timing | Yes | No | Normal sync tests |
| Retry / backoff / jitter | Flaky/slow | Yes | make_test_resilience_env with fake sleep + fixed RNG |
| Timeout logic | Flaky/slow | Yes | Injected TimeoutCtx + fake clock |
| Full pipeline | Slow/real I/O | Yes | Hybrid: fake time + real I/O (or full fake) |

Rule:
- Every test that touches async_with_resilience must inject a make_test_resilience_env().
- Every test that uses a TimeoutPolicy must also inject a fake TimeoutCtx.

3. Public API – Deterministic Testing (src/funcpipe_rag/domain/effects/async_/resilience.py – mypy --strict clean)

# funcpipe_rag/domain/effects/async_/resilience.py
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Awaitable
from random import Random
import asyncio
from contextlib import AbstractAsyncContextManager

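# ResilienceEnv is defined earlier in this module (M08C04), so no import is needed here;
# importing a module from itself fails while it is still initializing.
# (If these helpers are moved to a separate test-support module, import ResilienceEnv there.)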

@dataclass
class FakeClock:
    current_s: float = 0.0

    def now_s(self) -> float:
        return self.current_s

    def advance_s(self, seconds: float) -> None:
        self.current_s += seconds


class FakeTimeout(AbstractAsyncContextManager[None]):
    """Logical timeout that checks against a FakeClock — no real waiting."""
    def __init__(self, clock: FakeClock, seconds: float):
        self.clock = clock
        self.deadline = clock.now_s() + seconds

    async def __aenter__(self) -> None:
        pass

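    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: object,
    ) -> bool:
        # Fully annotated so the module stays clean under mypy --strict.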
        if exc_type is not None:
            return False
        if self.clock.now_s() >= self.deadline:  # >= mirrors typical timeout semantics
            raise asyncio.TimeoutError
        return False


def make_fake_timeout_ctx(clock: FakeClock) -> Callable[[float], FakeTimeout]:
    return lambda seconds: FakeTimeout(clock, seconds)


def make_test_resilience_env(
    *,
    seed: int = 42,
    sleep: Callable[[float], Awaitable[None]] | None = None,
) -> ResilienceEnv:
    async def _noop_sleep(_: float) -> None:
        return None

    return ResilienceEnv(
        rng=Random(seed),
        sleep=sleep or _noop_sleep,
    )

4. Reference Implementations (pure domain code)

4.1 Deterministic retry with fake sleep (no real delay)

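# Imports shared by the tests in sections 4 and 5. Ok, Err, ErrInfo, RetryPolicy, TimeoutPolicy,
# BackpressurePolicy and async_with_resilience come from the FuncPipe modules of earlier cores.
import asyncio

import pytest


@pytest.mark.asyncio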
async def test_retry_backoff_deterministic():
    delays: list[float] = []

    async def recording_sleep(seconds: float) -> None:
        delays.append(seconds)
        await asyncio.sleep(0)

    test_env = make_test_resilience_env(sleep=recording_sleep)

    policy = RetryPolicy(max_attempts=4, backoff_base_ms=100, jitter_factor=0.0)  # deterministic for doc

    call_count = 0
    async def flaky_step():
        nonlocal call_count
        call_count += 1
        if call_count < 4:
            return Err(ErrInfo(code="TRANSIENT", msg="fail"))
        return Ok("success")

    plan = async_with_resilience(lambda: flaky_step(), policy, None, test_env)
    res = await plan()

    assert isinstance(res, Ok)
    assert call_count == 4
    # Exact exponential backoff (no jitter)
    # approx absorbs floating-point rounding in the ms → s conversion
    assert delays == pytest.approx([0.1, 0.2, 0.4])
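
The expected delays in that assertion follow from plain exponential backoff. The helper below is a hypothetical sketch of the arithmetic, inferred from the asserted values and from the property test in section 5; it is not the implementation inside `async_with_resilience`.

```python
from __future__ import annotations


def backoff_schedule_s(
    base_ms: int, retries: int, max_backoff_ms: int | None = None
) -> list[float]:
    """Delay in seconds before retry n: base_ms * 2**(n - 1), optionally capped."""
    delays: list[float] = []
    for attempt in range(1, retries + 1):
        delay_ms = base_ms * 2 ** (attempt - 1)
        if max_backoff_ms is not None:
            delay_ms = min(delay_ms, max_backoff_ms)
        delays.append(delay_ms / 1000.0)
    return delays
```

With `backoff_base_ms=100` and three failed attempts before the fourth succeeds, this gives 100 ms, 200 ms, and 400 ms, matching the three recorded sleeps. No sleep follows the successful fourth attempt.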

4.2 Deterministic logical timeout

@pytest.mark.asyncio
async def test_timeout_triggers_correctly():
    clock = FakeClock()

    async def advancing_sleep(seconds: float) -> None:
        clock.advance_s(seconds)
        await asyncio.sleep(0)

    test_env = make_test_resilience_env(sleep=advancing_sleep)
    timeout_ctx = make_fake_timeout_ctx(clock)

    timeout_policy = TimeoutPolicy(timeout_ms=50)

    async def hanging():
        await test_env.sleep(0.1)  # "takes" 100ms in fake time
        return Ok("never")

    plan = async_with_resilience(
        lambda: hanging(),
        RetryPolicy(max_attempts=1),
        timeout_policy,
        test_env,
        timeout_ctx=timeout_ctx,
    )
    res = await plan()

    assert isinstance(res, Err) and res.error.code == "TIMEOUT"
    # The fake clock advanced past the 50 ms deadline; FakeTimeout detected this at context exit.
    assert clock.current_s >= 0.05

4.3 Full RAG pipeline test with injections

@pytest.mark.asyncio
async def test_rag_pipeline_resilient_deterministic():
    clock = FakeClock()

    async def advancing_sleep(seconds: float) -> None:
        clock.advance_s(seconds)
        await asyncio.sleep(0)

    test_env = make_test_resilience_env(sleep=advancing_sleep)
    timeout_ctx = make_fake_timeout_ctx(clock)

    policy = BackpressurePolicy(max_concurrent=8)
    retry = RetryPolicy(max_attempts=3)
    timeout = TimeoutPolicy(timeout_ms=5000)

    stream = async_rag_pipeline_resilient_bounded(
        real_storage,
        "test.csv",
        fake_flaky_embedder,
        policy,
        retry,
        timeout,
        env=test_env,
        timeout_ctx=timeout_ctx,
    )

    results = [r async for r in stream()]

    # With fixed RNG and fake sleep, the exact sequence of retries/success is reproducible
    assert len(results) == expected_count
    assert all(isinstance(r, Ok) for r in results)  # all eventually succeed under test embedder

5. Property-Based Proofs (all pass in CI)

from hypothesis import given, strategies as st


@given(attempts=st.integers(2, 10), base_ms=st.integers(50, 500))
@pytest.mark.asyncio
async def test_backoff_cap_and_jitter_deterministic(attempts, base_ms):
    policy = RetryPolicy(
        max_attempts=attempts,
        backoff_base_ms=base_ms,
        max_backoff_ms=1000,
        jitter_factor=0.5,
    )

    delays: list[float] = []
    async def recording_sleep(seconds: float) -> None:
        delays.append(seconds)

    test_env = make_test_resilience_env(sleep=recording_sleep)

    async def force_retry():
        return Err(ErrInfo(code="TRANSIENT", msg="retry"))

    plan = async_with_resilience(lambda: force_retry(), policy, None, test_env)
    await plan()  # triggers retries

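    # Jitter is drawn from the seeded RNG, so every recorded delay is reproducible.
    # Here we check only the bound: each delay stays within ±50% (jitter_factor=0.5)
    # of the capped exponential base for that attempt.
    for attempt, delay in enumerate(delays, start=1):
        expected_base = min(base_ms * (2 ** (attempt - 1)), policy.max_backoff_ms) / 1000.0
        assert expected_base * 0.5 <= delay <= expected_base * 1.5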
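
The bound checked by this property is consistent with symmetric multiplicative jitter applied after the cap. The sketch below assumes that formula, `delay = capped_base * (1 + jitter_factor * u)` with `u` uniform in [-1, 1]; the function name and the formula are assumptions for illustration, and the M08C04 implementation may compute jitter differently.

```python
from __future__ import annotations

from random import Random


def jittered_delays_s(
    rng: Random,
    base_ms: int,
    max_backoff_ms: int,
    jitter_factor: float,
    retries: int,
) -> list[float]:
    """Hypothetical capped exponential backoff with symmetric jitter, in seconds."""
    delays: list[float] = []
    for attempt in range(1, retries + 1):
        capped_ms = min(base_ms * 2 ** (attempt - 1), max_backoff_ms)
        u = rng.uniform(-1.0, 1.0)  # the only source of randomness is the injected RNG
        delays.append(capped_ms * (1.0 + jitter_factor * u) / 1000.0)
    return delays


# Same seed, same injections: identical delay sequences.
first = jittered_delays_s(Random(42), 200, 1000, 0.5, 5)
second = jittered_delays_s(Random(42), 200, 1000, 0.5, 5)
```

Because the RNG is passed in, a determinism check reduces to comparing two runs built from the same seed, which is a stronger assertion than the range check alone.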

6. Runtime Guarantees

| Test Type | Real Wall-clock | Deterministic | Speed | Notes |
| --- | --- | --- | --- | --- |
| Pure logic | n/a | Yes | Instant | Sync tests |
| Timing-dependent | Flaky/slow | Yes | < 50 ms | Fake sleep + fixed RNG |
| Full pipeline | Minutes | Yes | < 3 s | Fake embedder + injected resilience |

7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
| --- | --- | --- |
| Real asyncio.sleep in tests | Slow/flaky CI | Inject fake sleeper via make_test_resilience_env |
| Unseeded Random() | Jitter flakes | Fixed seed in make_test_resilience_env |
| Monkey-patching globals | Brittle, non-local | Explicit ResilienceEnv injection |
| Testing only happy path | Flaky in production | Hypothesis + injected failures |

8. Pre-Core Quiz

  1. Why inject fake time…? → To make timing deterministic and fast
  2. ResilienceEnv contains…? → RNG and sleep function
  3. Fake clock is used for…? → Backoff verification and logical timeouts
  4. Tests must…? → Inject make_test_resilience_env(); inject fake TimeoutCtx when using timeouts
  5. Real power comes from…? → Zero-flake, instant verification of resilience logic

9. Post-Core Exercise

  1. Add make_test_resilience_env and make_fake_timeout_ctx to your project.
  2. Convert one flaky async test to use injections.
  3. Add a property test asserting exact backoff delays with fake sleep.
  4. Run your full test suite — confirm it completes in < 3 s with zero flakes.
  5. Celebrate – your async pipeline is now bulletproof in CI.

Next → M08C06: Rate Limiting & Fairness – Cooperative Scheduling

You now have fully deterministic testing of all timing-dependent async behavior — the final piece that makes Async FuncPipe truly production-ready and CI-friendly.

M08C05 is now frozen.