M08C05: Deterministic Testing of Async Functional Pipelines (Fake Time/Executors)
Module 08 – Main Track Core
Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every async FuncPipe pipeline that involves timing (retries, timeouts, backoff jitter) is tested deterministically using injected fake clocks, sleepers, RNGs, and timeout contexts — eliminating flakes and making CI fast and reliable.
Progression Note
Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.
| Module | Focus | Key Outcomes |
|---|---|---|
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, idempotent effects, explicit sessions |
| 8 | Async FuncPipe & Backpressure | Async FuncPipe, non-blocking pipelines, backpressure, basic fairness |
| 9 | FP Across Libraries and Frameworks | FuncPipe style in Pandas/Polars/Dask, FastAPI, CLI, distributed systems |
| 10 | Refactoring, Performance, Future-Proofing | Systematic refactors, performance budgets, governance and long-term evolution |
Core question
How do you test timing-dependent async pipelines deterministically — injecting fake time, sleep, RNG, and timeout contexts so that retries, timeouts, and jitter become reproducible, fast, and flake-free while preserving logical equivalence to production?
Symmetry with Previous Cores
All timing-dependent behavior added in M08C04 (retry, timeout, jitter) is deliberately designed for injection via ResilienceEnv + TimeoutCtx. This core is the payoff: the same pure AsyncPlan that runs in production can be executed under total control in tests.
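A minimal sketch of what injection means in practice, assuming ResilienceEnv is a small record holding an RNG and an async sleep function (as the quiz in section 8 summarizes). `EnvSketch`, `prod_env`, `fake_env`, and `wait_with_jitter` are hypothetical stand-ins for illustration, not the course's actual definitions.

```python
import asyncio
from dataclasses import dataclass
from random import Random
from typing import Awaitable, Callable


@dataclass(frozen=True)
class EnvSketch:
    """Hypothetical stand-in for ResilienceEnv: the only sources of time and entropy."""
    rng: Random
    sleep: Callable[[float], Awaitable[None]]


def prod_env() -> EnvSketch:
    # Production: unseeded entropy and real waiting.
    return EnvSketch(rng=Random(), sleep=asyncio.sleep)


def fake_env(seed: int = 42) -> EnvSketch:
    # Tests: seeded RNG and a sleep that returns immediately.
    async def noop_sleep(_: float) -> None:
        return None

    return EnvSketch(rng=Random(seed), sleep=noop_sleep)


async def wait_with_jitter(env: EnvSketch, base_s: float) -> float:
    """Code under test reaches time and randomness only through env."""
    delay = base_s * (1.0 + env.rng.random())  # illustrative jitter rule (assumption)
    await env.sleep(delay)
    return delay
```

Because `wait_with_jitter` never calls `asyncio.sleep` or `random` directly, swapping `prod_env()` for `fake_env(seed)` changes only how long the call takes and where the randomness comes from, not the logic being exercised.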
What you now have after M08C04 + this core
- Resilient, backpressure-safe async streams
- Fully deterministic testing of all timing-dependent behavior (including logical timeouts)
- Zero-flake CI for retries, timeouts, backoff jitter
- Fast test execution (no real sleeps)
What the rest of Module 8 adds
- Rate limiting & fairness (M08C06)
- Full production async RAG pipeline with all safeguards
1. Laws & Architectural Invariants
| Category | Name | Description | Enforcement |
|---|---|---|---|
| Algebraic Laws | Logical Equivalence | For the same inputs and injected failure patterns, the observable Result sequence and attempt counts match real execution | Dual-run tests (real + fake) |
| Architectural Invariants | Determinism | Same seed + same injections → identical results (including jitter, backoff order) | Seeded Hypothesis |
| Architectural Invariants | No Real Effects | Test execution performs no real I/O, no real non-zero sleep, and uses only seeded RNG (no uncontrolled entropy) | Banned imports + mock checks |
| Architectural Invariants | Speed | Full pipeline tests run in < 3 s (no real delays) | CI timeout |
| Architectural Invariants | Coverage | All timing branches (success, retry, timeout, max-retries) exercised in properties | Hypothesis properties + injected failure patterns |
Note on equivalence: We guarantee logical equivalence of Result sequences, attempt counts, backoff delays, and timeout triggering for steps that are pure with respect to external effects (or cooperatively cancellable). Real cancellation side-effects of timeouts are not simulated — fake timeouts check the deadline only at context exit.
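The dual-run enforcement of Logical Equivalence might look roughly like the sketch below. `retry_sketch` is a hypothetical, simplified retry loop (it is not async_with_resilience), and `fails_n_times` is an illustrative injected failure pattern. The same loop runs once with real `asyncio.sleep` on millisecond-scale delays and once with a no-op sleep, and the outcome and attempt count are compared.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable

Sleep = Callable[[float], Awaitable[None]]


async def retry_sketch(
    step: Callable[[], Awaitable[bool]],
    max_attempts: int,
    base_s: float,
    sleep: Sleep,
) -> tuple[bool, int]:
    """Hypothetical retry loop: returns (succeeded, attempts_used)."""
    for attempt in range(1, max_attempts + 1):
        if await step():
            return True, attempt
        if attempt < max_attempts:
            await sleep(base_s * 2 ** (attempt - 1))  # exponential backoff
    return False, max_attempts


def fails_n_times(n: int) -> Callable[[], Awaitable[bool]]:
    """Injected failure pattern: fail the first n calls, then succeed."""
    calls = 0

    async def step() -> bool:
        nonlocal calls
        calls += 1
        return calls > n

    return step


async def noop_sleep(_: float) -> None:
    return None


async def dual_run(n_failures: int) -> tuple[tuple[bool, int], tuple[bool, int]]:
    # Same failure pattern, same policy; only the sleeper differs.
    real = await retry_sketch(fails_n_times(n_failures), 4, 0.001, asyncio.sleep)
    fake = await retry_sketch(fails_n_times(n_failures), 4, 0.001, noop_sleep)
    return real, fake
```

A dual-run test would then assert that both halves of `dual_run(n)` are equal for each injected failure pattern. Keeping the real run's base delay tiny limits its cost while still exercising the real sleeper.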
2. Decision Table
| Scenario | With Real Execution | Needs Injected Determinism | Recommended |
|---|---|---|---|
| Pure logic / no timing | Fine | No | Normal sync tests |
| Retry / backoff / jitter | Flaky/slow | Yes | make_test_resilience_env with fake sleep + fixed RNG |
| Timeout logic | Flaky/slow | Yes | Injected TimeoutCtx + fake clock |
| Full pipeline | Slow/real I/O | Yes | Hybrid: fake time + real I/O (or full fake) |
Rule:
- Every test that touches async_with_resilience must inject a make_test_resilience_env().
- Every test that uses a TimeoutPolicy must also inject a fake TimeoutCtx.
3. Public API – Deterministic Testing (src/funcpipe_rag/domain/effects/async_/resilience.py – mypy --strict clean)
# funcpipe_rag/domain/effects/async_/resilience.py
from __future__ import annotations

import asyncio
from contextlib import AbstractAsyncContextManager
from dataclasses import dataclass
from random import Random
from types import TracebackType
from typing import Awaitable, Callable

# ResilienceEnv comes from M08C04; drop this import if these helpers live in the same module.
from funcpipe_rag.domain.effects.async_.resilience import ResilienceEnv


@dataclass
class FakeClock:
    """Manually advanced clock: time moves only when advance_s is called."""

    current_s: float = 0.0

    def now_s(self) -> float:
        return self.current_s

    def advance_s(self, seconds: float) -> None:
        self.current_s += seconds


class FakeTimeout(AbstractAsyncContextManager[None]):
    """Logical timeout that checks against a FakeClock; no real waiting."""

    def __init__(self, clock: FakeClock, seconds: float) -> None:
        self.clock = clock
        self.deadline = clock.now_s() + seconds

    async def __aenter__(self) -> None:
        return None

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> bool:
        if exc_type is not None:
            return False  # let the body's own exception propagate
        if self.clock.now_s() >= self.deadline:  # >= mirrors typical timeout semantics
            raise asyncio.TimeoutError
        return False


def make_fake_timeout_ctx(clock: FakeClock) -> Callable[[float], FakeTimeout]:
    return lambda seconds: FakeTimeout(clock, seconds)


def make_test_resilience_env(
    *,
    seed: int = 42,
    sleep: Callable[[float], Awaitable[None]] | None = None,
) -> ResilienceEnv:
    async def _noop_sleep(_: float) -> None:
        return None

    return ResilienceEnv(
        rng=Random(seed),
        sleep=sleep or _noop_sleep,
    )
4. Reference Implementations (pure domain code)
4.1 Deterministic retry with fake sleep (no real delay)
import asyncio

import pytest
from pytest import approx

# RetryPolicy, Ok, Err, ErrInfo and async_with_resilience are the M08C04 definitions.


@pytest.mark.asyncio
async def test_retry_backoff_deterministic():
    delays: list[float] = []

    async def recording_sleep(seconds: float) -> None:
        delays.append(seconds)  # record the requested backoff instead of waiting
        await asyncio.sleep(0)  # yield to the event loop without a real delay

    test_env = make_test_resilience_env(sleep=recording_sleep)
    policy = RetryPolicy(max_attempts=4, backoff_base_ms=100, jitter_factor=0.0)  # no jitter: exact delays

    call_count = 0

    async def flaky_step():
        nonlocal call_count
        call_count += 1
        if call_count < 4:
            return Err(ErrInfo(code="TRANSIENT", msg="fail"))
        return Ok("success")

    plan = async_with_resilience(lambda: flaky_step(), policy, None, test_env)
    res = await plan()

    assert isinstance(res, Ok)
    assert call_count == 4
    # Exact exponential backoff (no jitter): 100 ms, 200 ms, 400 ms
    assert delays == approx([0.1, 0.2, 0.4])
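The expected delay list can be worked out by hand. The sketch below assumes the doubling rule that the property test in section 5 checks, `min(base_ms * 2 ** (attempt - 1), max_backoff_ms) / 1000`, and that no sleep follows the final attempt (which matches the three delays recorded for four attempts above). `expected_backoff_s` is a hypothetical helper, not part of the course API.

```python
from __future__ import annotations


def expected_backoff_s(
    max_attempts: int,
    base_ms: int,
    max_backoff_ms: int | None = None,
) -> list[float]:
    """Delays in seconds slept between attempts, assuming doubling and no jitter."""
    delays: list[float] = []
    for attempt in range(1, max_attempts):  # no sleep after the final attempt
        delay_ms = base_ms * 2 ** (attempt - 1)
        if max_backoff_ms is not None:
            delay_ms = min(delay_ms, max_backoff_ms)  # apply the cap
        delays.append(delay_ms / 1000.0)
    return delays


print(expected_backoff_s(4, 100))        # [0.1, 0.2, 0.4]
print(expected_backoff_s(6, 100, 1000))  # [0.1, 0.2, 0.4, 0.8, 1.0]
```

Computing the expectation from the policy parameters, rather than hard-coding it, keeps the assertion readable when the base or cap changes.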
4.2 Deterministic logical timeout
@pytest.mark.asyncio
async def test_timeout_triggers_correctly():
    clock = FakeClock()

    async def advancing_sleep(seconds: float) -> None:
        clock.advance_s(seconds)  # move fake time forward instead of waiting
        await asyncio.sleep(0)

    test_env = make_test_resilience_env(sleep=advancing_sleep)
    timeout_ctx = make_fake_timeout_ctx(clock)
    timeout_policy = TimeoutPolicy(timeout_ms=50)

    async def hanging():
        await test_env.sleep(0.1)  # "takes" 100 ms in fake time
        return Ok("never")  # discarded: the deadline check at context exit fails first

    plan = async_with_resilience(
        lambda: hanging(),
        RetryPolicy(max_attempts=1),
        timeout_policy,
        test_env,
        timeout_ctx=timeout_ctx,
    )
    res = await plan()

    assert isinstance(res, Err) and res.error.code == "TIMEOUT"
    assert clock.current_s >= 0.05  # fake time has passed the 50 ms deadline
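The exit-only deadline check has a visible consequence: the step body runs to completion before the timeout is reported. The self-contained sketch below uses compact stand-ins (`ClockSketch`, `TimeoutSketch`, both hypothetical and shaped like the section 3 helpers) to show a side effect that a real, cancelling timeout would have prevented.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class ClockSketch:
    current_s: float = 0.0


class TimeoutSketch:
    """Stand-in for FakeTimeout: compares clock and deadline at exit only."""

    def __init__(self, clock: ClockSketch, seconds: float) -> None:
        self.clock = clock
        self.deadline = clock.current_s + seconds

    async def __aenter__(self) -> None:
        return None

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None and self.clock.current_s >= self.deadline:
            raise asyncio.TimeoutError
        return False


async def run_demo() -> tuple[list[str], bool, float]:
    clock = ClockSketch()
    log: list[str] = []
    timed_out = False
    try:
        async with TimeoutSketch(clock, 0.05):
            clock.current_s += 0.1         # the step "takes" 100 ms of fake time
            log.append("side effect ran")  # still executes: nothing cancels the body
    except asyncio.TimeoutError:
        timed_out = True
    return log, timed_out, clock.current_s
```

Here `run_demo()` reports a timeout, yet the log entry is present and the clock reads 0.1 s, not 0.05 s. This is why equivalence is only claimed for steps that are pure with respect to external effects or cooperatively cancellable.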
4.3 Full RAG pipeline test with injections
# real_storage, fake_flaky_embedder and expected_count are assumed to be provided by the test module.
@pytest.mark.asyncio
async def test_rag_pipeline_resilient_deterministic():
    clock = FakeClock()

    async def advancing_sleep(seconds: float) -> None:
        clock.advance_s(seconds)  # move fake time forward instead of waiting
        await asyncio.sleep(0)

    test_env = make_test_resilience_env(sleep=advancing_sleep)
    timeout_ctx = make_fake_timeout_ctx(clock)

    policy = BackpressurePolicy(max_concurrent=8)
    retry = RetryPolicy(max_attempts=3)
    timeout = TimeoutPolicy(timeout_ms=5000)

    stream = async_rag_pipeline_resilient_bounded(
        real_storage,
        "test.csv",
        fake_flaky_embedder,
        policy,
        retry,
        timeout,
        env=test_env,
        timeout_ctx=timeout_ctx,
    )
    results = [r async for r in stream()]

    # With fixed RNG and fake sleep, the exact sequence of retries/successes is reproducible
    assert len(results) == expected_count
    assert all(isinstance(r, Ok) for r in results)  # all eventually succeed under the test embedder
5. Property-Based Proofs (all pass in CI)
import pytest
from hypothesis import given, strategies as st


@given(attempts=st.integers(2, 10), base_ms=st.integers(50, 500))
@pytest.mark.asyncio
async def test_backoff_cap_and_jitter_deterministic(attempts, base_ms):
    policy = RetryPolicy(
        max_attempts=attempts,
        backoff_base_ms=base_ms,
        max_backoff_ms=1000,
        jitter_factor=0.5,
    )
    delays: list[float] = []

    async def recording_sleep(seconds: float) -> None:
        delays.append(seconds)

    test_env = make_test_resilience_env(sleep=recording_sleep)

    async def force_retry():
        return Err(ErrInfo(code="TRANSIENT", msg="retry"))

    plan = async_with_resilience(lambda: force_retry(), policy, None, test_env)
    await plan()  # every attempt fails, so all retries are exercised

    # With a fixed seed the jitter is deterministic; each delay must stay
    # within 50% of the capped exponential base for its attempt.
    for attempt, delay in enumerate(delays, start=1):
        expected_base = min(base_ms * (2 ** (attempt - 1)), policy.max_backoff_ms) / 1000.0
        assert expected_base * 0.5 <= delay <= expected_base * 1.5
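The Determinism invariant (same seed, same jitter) can be illustrated in isolation. The jitter rule below, which scales each doubled delay by a factor drawn from [1 - j, 1 + j), is an assumption chosen to be consistent with the bounds asserted above; the real RetryPolicy may compute jitter differently. `jittered_delays` is a hypothetical helper.

```python
from random import Random


def jittered_delays(seed: int, base_s: float, jitter_factor: float, n: int) -> list[float]:
    """Hypothetical jitter rule: doubled delay times a factor in [1 - j, 1 + j)."""
    rng = Random(seed)  # seeded: the only source of entropy
    out: list[float] = []
    for attempt in range(n):
        base = base_s * 2 ** attempt
        factor = 1.0 + jitter_factor * (2.0 * rng.random() - 1.0)
        out.append(base * factor)
    return out


first = jittered_delays(seed=42, base_s=0.1, jitter_factor=0.5, n=5)
second = jittered_delays(seed=42, base_s=0.1, jitter_factor=0.5, n=5)
assert first == second  # identical seed gives an identical delay sequence
```

With a seeded `Random`, a property test can assert the exact delay sequence across two runs in addition to the range check, which also catches accidental use of an unseeded RNG.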
6. Runtime Guarantees
| Test Type | Real Wall-clock | Deterministic | Speed | Notes |
|---|---|---|---|---|
| Pure logic | n/a | Yes | Instant | Sync tests |
| Timing-dependent | Flaky/slow | Yes | < 50 ms | Fake sleep + fixed RNG |
| Full pipeline | Minutes | Yes | < 3 s | Fake embedder + injected resilience |
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Real asyncio.sleep in tests | Slow/flaky CI | Inject fake sleeper via make_test_resilience_env |
| Unseeded Random() | Jitter flakes | Fixed seed in make_test_resilience_env |
| Monkey-patching globals | Brittle, non-local | Explicit ResilienceEnv injection |
| Testing only happy path | Retry/timeout bugs surface only in production | Hypothesis properties + injected failures |
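One way the first anti-pattern could be caught mechanically is a small source scan run in CI. The sketch below is an assumption about how such a check might look, not the course's actual tooling: the hypothetical `find_real_sleeps` flags `asyncio.sleep(x)` and `time.sleep(x)` calls whose argument is not the literal `0`, since `asyncio.sleep(0)` is only a yield to the event loop.

```python
import ast


def find_real_sleeps(source: str) -> list[int]:
    """Line numbers of asyncio.sleep/time.sleep calls whose argument is not literal 0."""
    hits: list[int] = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        is_sleep = (
            isinstance(func, ast.Attribute)
            and func.attr == "sleep"
            and isinstance(func.value, ast.Name)
            and func.value.id in {"asyncio", "time"}
        )
        if not is_sleep:
            continue
        arg = node.args[0] if node.args else None
        is_zero = isinstance(arg, ast.Constant) and arg.value == 0
        if not is_zero:
            hits.append(node.lineno)
    return sorted(hits)


sample = (
    "import asyncio\n"
    "async def t():\n"
    "    await asyncio.sleep(0)\n"    # allowed: cooperative yield
    "    await asyncio.sleep(0.5)\n"  # flagged: real delay
)
print(find_real_sleeps(sample))  # [4]
```

The scan is deliberately narrow: it does not see `from asyncio import sleep` or aliased modules, so it complements explicit ResilienceEnv injection and does not replace it.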
8. Pre-Core Quiz
- Why inject fake time…? → To make timing deterministic and fast
- ResilienceEnv contains…? → RNG and sleep function
- Fake clock is used for…? → Backoff verification and logical timeouts
- Tests must…? → Inject make_test_resilience_env(); inject fake TimeoutCtx when using timeouts
- Real power comes from…? → Zero-flake, instant verification of resilience logic
9. Post-Core Exercise
- Add make_test_resilience_env and make_fake_timeout_ctx to your project.
- Convert one flaky async test to use injections.
- Add a property test asserting exact backoff delays with fake sleep.
- Run your full test suite — confirm it completes in < 3 s with zero flakes.
- Celebrate – your async pipeline is now bulletproof in CI.
Next → M08C06: Rate Limiting & Fairness – Cooperative Scheduling
You now have fully deterministic testing of all timing-dependent async behavior — the final piece that makes Async FuncPipe truly production-ready and CI-friendly.
M08C05 is now frozen.