M08C10: Law-Like Properties for Async Pipelines (Idempotence, At-Most-Once, No Duplication)¶
Module 08 – Main Track Core
Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is the final core of Module 08. We add no new combinators — we obtain overwhelming evidence that the pipeline built from C01–C09 satisfies strong, law-like properties under retries, failures, partial consumption, cancellations, and arbitrary cooperative interleavings — with zero runtime enforcement overhead.
Progression Note¶
Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.
You have now completed the entire Async FuncPipe toolbox.
Module 9 begins integration with the wider Python ecosystem using exactly these primitives.
| Module | Focus | Key Outcomes |
|---|---|---|
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability interfaces, resource-safe effect isolation |
| 8 | Async FuncPipe & Backpressure | Async streams, bounded queues, timeouts/retries, fairness & rate limiting |
| 9 | FP Across Libraries and Frameworks | Stdlib FP, data/ML stacks, web/CLI/distributed integration |
| 10 | Refactoring, Performance, and Future-Proofing | Systematic refactors, performance budgets, governance & evolution |
Core question
How do you obtain overwhelming evidence that your async FuncPipe pipeline satisfies idempotence, at-most-once success, and no duplication under retries, failures, partial runs, cancellations, and arbitrary interleavings — using only property-based testing with deterministic fake effects and schedules, without any runtime enforcement?
We take the complete RAG pipeline from C09 and obtain overwhelming evidence (via Hypothesis over huge families of randomised failure schedules and interleavings) that it satisfies these laws — using fake stores, fake clocks, and a deterministic interpreter.
The naïve pattern everyone lives with:
# BEFORE – "it works in happy path" only
async def rag_risky(...):
# retries, merges, upserts scattered
# no one knows what happens on retry / cancellation / interleave
...
Works locally, explodes in production with duplicates, missing chunks, or inconsistent state.
The production pattern: design the description so that it satisfies the laws by construction (stable deterministic keys, UPSERT, retry-on-Err-only, serialized merge with disjoint-key precondition) → obtain overwhelming evidence that the laws hold for huge randomised families of failure schedules and interleavings via property-based testing with fake everything.
No runtime guards. No locks. No distributed transactions. Just pure descriptions + overwhelming evidence in CI.
Audience: Engineers who refuse to ship async pipelines without overwhelming evidence they won’t duplicate or lose data under failure.
Outcome
1. Every pipeline that satisfies the preconditions in the decision table (stable keys, idempotent ops, UPSERT, key-disjoint merges) satisfies idempotence, at-most-once success, and no duplication — with overwhelming evidence in CI.
2. Laws hold under retries, cancellations, partial consumption, and arbitrary cooperative interleavings.
3. Zero runtime overhead — all checks are design + test-time verification.
4. Confidence to run the same pipeline in dev, staging, and production without "it works on my machine" surprises.
Tiny Non-Domain Example – Idempotent Counter with Retries (intuition only)¶
import random  # illustration only; real tests inject deterministic schedules

policy = RetryPolicy(max_attempts=3)
def risky_increment(key: str, store: FakeStore) -> AsyncAction[Result[int, ErrInfo]]:
async def _act() -> Result[int, ErrInfo]:
current = store.get(key, 0)
        if random.random() < 0.7:  # 70% transient failure
return Err(ErrInfo(code="TRANSIENT"))
store[key] = current + 1
return Ok(store[key])
    return _act  # the thunk itself is the AsyncAction
def resilient_increment(key: str, store: FakeStore) -> AsyncAction[Result[int, ErrInfo]]:
return async_with_retry(risky_increment(key, store), policy)
# In real tests we inject deterministic failure schedules instead of random
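To make the "deterministic failure schedules" remark concrete, here is a minimal, self-contained sketch. `with_retry`, the schedule-driven increment, and the plain-dict store are simplified stand-ins for the toolbox's `async_with_retry`, `RetryPolicy`, and `FakeStore`, not their real signatures:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Ok:
    value: int

@dataclass
class Err:
    code: str

def risky_increment(key, store, schedule):
    """Fails according to a deterministic schedule: True = succeed."""
    attempts = iter(schedule)
    async def _act():
        if not next(attempts, True):        # injected transient failure
            return Err(code="TRANSIENT")
        store[key] = store.get(key, 0) + 1
        return Ok(store[key])
    return _act

async def with_retry(action, max_attempts=3):
    """Retry on typed Err only, never on exceptions."""
    result = await action()
    for _ in range(max_attempts - 1):
        if isinstance(result, Ok):
            return result
        result = await action()
    return result

store = {}
# Schedule: fail, fail, succeed -> exactly one increment despite 3 attempts.
result = asyncio.run(with_retry(risky_increment("k", store, [False, False, True])))
assert result == Ok(1) and store == {"k": 1}
```

Because the schedule fully determines every outcome, the same run replays bit-for-bit in CI, which is exactly what the idempotence checks below rely on.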
Why Law-Like Properties via Property-Based Testing? (Three bullets every engineer should internalise)¶
- Zero runtime cost: No locks, no coordination — pure descriptions, overwhelming evidence offline.
- Covers the impossible: Test huge families of failure schedules, interleavings, and cancellations that would never appear in manual testing.
- Future-proof: Add a new combinator → add its law-preserving evidence → sleep well forever.
1. Laws & Invariants (overwhelming evidence in CI)¶
| Law | Statement | Enforcement |
|---|---|---|
| Idempotence | Repeated interpretation with the same failure schedule from the same state yields the same final state and a sub-multiset of attempts/emissions (no additional attempts/emissions on repeat) | Hypothesis + deterministic interpreter |
| At-Most-Once Success | Each key succeeds (emits Ok) at most once per full run, regardless of retries/cancellations | Hypothesis + fake store |
| No Duplication | Each key is emitted (Ok or Err) at most once per full run, regardless of interleaving | Hypothesis + interleave simulator |
| Partial Run Safety | Early cancellation or error short-circuits cleanly — no partial state, no dangling resources | Hypothesis + explicit cancellation (random prefix) |
| Composition Preservation | If upstreams satisfy the laws and are key-disjoint, merge preserves them | Executable precondition + evidence |
These are the async analogues of the laws for which Module 10's property-based CI runs the full Hypothesis suites.
We obtain overwhelming evidence of at-most-once and no duplication — not formal proof of exactly-once (which would require distributed consensus). Under the assumptions of stable deterministic keys and UPSERT semantics in adapters, the laws hold for huge randomised families of failure schedules and interleavings.
Key definition for laws
For successes (Ok), key = the originating Chunk.doc_id (stable content hash).
For errors (Err), key = ErrInfo.path[0] (the originating Chunk.doc_id).
This ensures Ok and Err for the same logical item share a key.
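The shared-key rule can be captured in a tiny helper. This is a hypothetical `key_of` matching the definition above; the `Chunk`, `ErrInfo`, `Ok`, and `Err` classes here are minimal stand-ins, not the toolbox's real ones:

```python
from dataclasses import dataclass, field

@dataclass
class Chunk:
    doc_id: str          # stable content hash
    text: str = ""

@dataclass
class ErrInfo:
    code: str
    path: list = field(default_factory=list)

@dataclass
class Ok:
    value: Chunk

@dataclass
class Err:
    error: ErrInfo

def key_of(result) -> str:
    """Ok and Err for the same logical item share a key."""
    if isinstance(result, Ok):
        return result.value.doc_id
    return result.error.path[0]

# Success and failure for the same chunk resolve to the same key:
assert key_of(Ok(Chunk(doc_id="h1"))) == key_of(Err(ErrInfo("TRANSIENT", ["h1"])))
```

All the at-most-once and no-duplication assertions later in this core count occurrences of exactly this key.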
2. Decision Table – When Laws Hold by Construction¶
| Component | Requires Stable Key? | Idempotent Op? | Laws Hold If… (preconditions) |
|---|---|---|---|
| Pure map / filter | No | Yes | Always (pure function, upstream satisfies laws) |
| Retry wrapper | Yes | Yes | Retry on typed Err only |
| Chunking + batch embed | Yes | Yes | Batch API tolerates duplicates (re-embedding a chunk is observationally a no-op) |
| Vector DB upsert | Yes | Yes | UPSERT + stable ID |
| Merge of multiple sources | Yes | Yes | Upstream key-disjoint (checked via helper) |
If every step has stable keys and is idempotent → whole pipeline satisfies the laws by construction (with overwhelming evidence in CI).
3. Public API – No New Combinators, Only Proof Helpers¶
No new runtime code. Laws are satisfied by design using existing combinators. Tests use these helpers (in tests/helpers.py):
from collections import Counter
def attempt_norm(trace: list[tuple[str, str]]) -> Counter[tuple[str, str]]:
# trace element: (key, outcome) where outcome ∈ {"ok", "err", "retry"}
return Counter(trace)
def emission_norm(trace: list[tuple[str, str]]) -> Counter[tuple[str, str]]:
# trace element: (key, outcome) where outcome ∈ {"ok", "err"}
return Counter(trace)
def is_key_disjoint(stream1: AsyncGen[T], stream2: AsyncGen[T]) -> bool:
# Sample-based check: pull first N items from each and verify no overlapping keys
# In real pipelines we often prove disjointness statically via types or config
...
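One plausible way to fill in the elided `is_key_disjoint` body is the sample-based check the comment describes. This sketch assumes plain async generators and a caller-supplied `key_of`; the real helper's signature may differ:

```python
import asyncio

async def take(agen, n):
    """Pull at most n items from an async generator, always closing it."""
    items = []
    try:
        async for item in agen:
            items.append(item)
            if len(items) >= n:
                break
    finally:
        await agen.aclose()
    return items

async def is_key_disjoint(stream1, stream2, key_of=lambda x: x, n=100):
    """Sample-based precondition check: no overlapping keys in the first n items."""
    keys1 = {key_of(i) for i in await take(stream1, n)}
    keys2 = {key_of(i) for i in await take(stream2, n)}
    return keys1.isdisjoint(keys2)

async def gen(items):
    for i in items:
        yield i

assert asyncio.run(is_key_disjoint(gen(["a", "b"]), gen(["c", "d"])))
assert not asyncio.run(is_key_disjoint(gen(["a"]), gen(["a"])))
```

As the original comment notes, this is a sanity check for tests only; in real pipelines disjointness is better established statically via types or configuration.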
4. Before → After – Lawful RAG Pipeline¶
# BEFORE – no laws, pray
async def rag_risky(...):
    # retries, merges, upserts scattered
    # duplicates on retry, inconsistent on cancel
    ...
# AFTER – designed for laws, overwhelming evidence in CI
def rag_lawful_desc(
storage: StoragePort,
path: str,
embed_port: EmbedPort,
vector_port: VectorStorePort,
policy: ChunkPolicy[Chunk],
) -> AsyncGen[Result[None, ErrInfo]]:
    # executor, env, sleeper are assumed in scope (wired up in C09)
raw_lines = async_fetch_raw_lines(storage, path)
parsed = async_gen_map(raw_lines, lift_sync(parse_line_sync))
cleaned = async_gen_map(parsed, lambda doc: lift_sync_with_executor(clean_doc_sync, executor)(doc, env))
chunked = async_gen_flat_map(cleaned, lambda doc: lift_sync_gen_with_executor(chunk_doc_sync, executor)(doc, env))
chunked_b = async_gen_chunk(chunked, policy)(sleeper)
embedded = async_gen_map_action(chunked_b, lambda batch: embed_port.embed_batch([c.text for c in batch]))
    flat_emb = async_gen_flat_map(embedded, lambda batch: batch)  # flatten each batch into per-chunk Results
stored = async_gen_map_action(flat_emb, lambda e: vector_port.upsert(e.value) if isinstance(e, Ok) else async_pure(Err(e.error)))
return stored
All operations use stable deterministic keys (Chunk.id = content hash) and idempotent UPSERT → laws hold by construction + overwhelming evidence.
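The two load-bearing ingredients in that sentence, content-hash keys and UPSERT, are easy to see in isolation. This is a minimal sketch with a hypothetical `FakeVectorStore` and `stable_id`, not the toolbox's real adapters:

```python
import hashlib

class FakeVectorStore:
    """UPSERT semantics: writing the same key twice leaves exactly one row."""
    def __init__(self):
        self.data = {}

    def upsert(self, doc_id, vector):
        # Same key -> overwrite in place; a replayed write is a no-op in effect.
        self.data[doc_id] = vector

def stable_id(text: str) -> str:
    """Stable deterministic key: content hash, never a per-run UUID."""
    return hashlib.sha256(text.encode()).hexdigest()[:12]

store = FakeVectorStore()
key = stable_id("the same chunk text")
store.upsert(key, [0.1, 0.2])
store.upsert(key, [0.1, 0.2])   # a retry replays the write
assert len(store.data) == 1     # no duplicate rows
```

Swap either ingredient out (random IDs, or INSERT instead of UPSERT) and the idempotence law fails on the very first retried write.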
5. Property-Based Proofs (all pass in CI – fully deterministic)¶
import pytest
from hypothesis import given, strategies as st

@given(
keys=st.lists(st.integers(min_value=0, max_value=1000), min_size=0, max_size=50, unique=True),
schedule=st.lists(st.booleans(), min_size=0, max_size=200),
)
@pytest.mark.asyncio
async def test_async_pipeline_idempotence_and_no_duplication(keys, schedule):
# Pad/truncate schedule (3 attempts per key, max)
schedule = schedule[:len(keys)*3] + [True] * max(0, (len(keys)*3 - len(schedule)))
embedder = FakeEmbedder(schedule=schedule)
store = FakeVectorStore()
retry = RetryPolicy(max_attempts=3, retriable_codes=frozenset({"TRANSIENT"}))
desc = lawful_desc(keys, embedder, store, retry)
emitted1 = [r async for r in desc()]
state1 = store.snapshot()
store.reset()
embedder.reset_schedule(schedule)
emitted2 = [r async for r in desc()]
state2 = store.snapshot()
assert emitted2 == emitted1
assert state2 == state1
ok_keys = [r.value for r in emitted1 if isinstance(r, Ok)]
all_keys = [r.value if isinstance(r, Ok) else r.error.path[0] for r in emitted1]
assert len(ok_keys) == len(set(ok_keys))
assert len(all_keys) == len(set(all_keys))
@given(chunks=st.lists(st.builds(Chunk, doc_id=st.text(min_size=1)), max_size=30))
@pytest.mark.asyncio
async def test_rag_partial_cancellation_safety(chunks):
store = FakeVectorStore()
desc = rag_lawful_desc(
storage=FakeStorage(chunks),
path="dummy",
embed_port=FakeEmbedPort(always_succeed=True),
vector_port=store,
policy=ChunkPolicy(max_units=5),
)
ait = desc()
consumed = 0
try:
async for _ in ait:
consumed += 1
if consumed >= len(chunks) // 2:
break # Simulate cancellation mid-stream (Hypothesis varies the prefix length)
finally:
await ait.aclose()
# No leaks, partial state consistent with consumed prefix
assert len(store.data) == consumed
assert store.open_connections == 0
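The tests above lean on a `FakeEmbedder` that consumes a boolean schedule instead of calling `random`. A plausible minimal sketch of such a fake (its real interface in the toolbox may differ) looks like this:

```python
class FakeEmbedder:
    """Deterministic fake: the schedule (True = succeed) drives every outcome."""
    def __init__(self, schedule):
        self.reset_schedule(schedule)

    def reset_schedule(self, schedule):
        self._schedule = list(schedule)
        self._pos = 0
        self.attempts = []              # (key, outcome) trace for the norm helpers

    def embed(self, key):
        """Return ('ok', vector) or ('err', 'TRANSIENT') per the schedule."""
        ok = self._schedule[self._pos] if self._pos < len(self._schedule) else True
        self._pos += 1
        self.attempts.append((key, "ok" if ok else "err"))
        if ok:
            return ("ok", f"vec-{key}")
        return ("err", "TRANSIENT")

# Determinism: same schedule -> identical outcomes and identical traces.
e1 = FakeEmbedder([False, True])
r1 = [e1.embed("k") for _ in range(2)]
e1.reset_schedule([False, True])
r2 = [e1.embed("k") for _ in range(2)]
assert r1 == r2
assert e1.attempts == [("k", "err"), ("k", "ok")]
```

Because replaying a schedule replays the run exactly, `reset_schedule` is what lets the idempotence test interpret the same description twice and demand identical emissions and state.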
6. Runtime Guarantees¶
| Law | Runtime Cost | Guarantee Under |
|---|---|---|
| Idempotence | Zero | Same schedule, retries, cancellations |
| At-Most-Once Success | Zero | Retries on Err only, stable keys |
| No Duplication | Zero | Key-disjoint merge + serialized processing |
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Retry on exceptions | Duplicates on cancellation | Retry on typed Err only |
| Non-stable keys | Inconsistent idempotence | Derive from content hash |
| Parallel merge without disjoint keys | Duplicates on interleave | Serialized merge or explicit coordination |
| No cancellation cleanup | Resource leaks on partial run | Always await ait.aclose() in finally |
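The last row's fix is worth seeing end to end: closing the async generator in `finally` is what makes the upstream `finally` blocks run on early exit. A minimal self-contained sketch (the resource log is an illustration, not a toolbox API):

```python
import asyncio

async def stream_with_resource(log):
    log.append("open")
    try:
        for i in range(10):
            yield i
    finally:
        log.append("close")            # runs even when the consumer bails early

async def consume_prefix(n):
    log = []
    ait = stream_with_resource(log)
    try:
        async for item in ait:
            if item >= n:
                break                  # simulate mid-stream cancellation
    finally:
        await ait.aclose()             # without this, cleanup may be deferred
    return log

assert asyncio.run(consume_prefix(3)) == ["open", "close"]
```

Omitting the explicit `aclose()` leaves cleanup to garbage collection, which is exactly the kind of nondeterminism the partial-run-safety test is designed to rule out.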
8. Pre-Core Quiz¶
- Idempotence means…? → Same final state + no additional attempts/emissions on repeat with same schedule
- At-most-once success means…? → Each key emits Ok ≤1 time per run
- No duplication means…? → Each key is emitted ≤1 time per run (Ok or Err)
- Laws are obtained via…? → Design + overwhelming evidence from property-based testing
- The golden rule? → Obtain overwhelming evidence in CI, then ship without runtime guards
9. Post-Core Exercise¶
- Add stable key derivation (content hash → Chunk.id) to your real pipeline.
- Replace every raw retry/upsert with the lawful versions.
- Add the full idempotence/at-most-once/no-duplication property test with injected failure schedules.
- Run with cancellations and interleavings — watch all tests pass.
- Sleep well — your async pipeline now carries overwhelming evidence for its laws.
End of Module 08.
You have completed the full production-grade Async FuncPipe: pure descriptions, backpressure, resilience, chunking, fairness, rate-limiting, and law-like properties with overwhelming evidence — all composable, zero runtime overhead, fully deterministic in CI.
Module 9 begins applying this exact machinery across the Python ecosystem (Pandas, FastAPI, Dask, etc.).
M08C10 is now frozen.