Core 5: Property-Based Testing for Async and Streaming Pipelines (Hypothesis Strategies and Faked I/O)¶
Module 10
Core question:
How do you use property-based testing with Hypothesis to verify laws in async and streaming pipelines, using strategies for iterators/generators and faked I/O, time, and executors to ensure determinism and avoid flakiness?
In this core, we extend property-based testing (PBT) to async and streaming pipelines in the FuncPipe RAG Builder (now at funcpipe-rag-10). We use Hypothesis strategies for stream-like data (e.g., lists as finite proxies for iterators), concrete fakes for I/O (injected deps), virtual clocks for time (via anyio), and deterministic executors to make tests reproducible. This verifies deterministic laws like idempotence, no duplication, and at-most-once in async contexts. Builds on Core 4's sync PBT.
Motivation Bug: Async tests flake on timing and race conditions, and streaming tests hang on unbounded data. PBT with fakes enables deterministic verification of async laws, catching bugs in compositions such as retries and backpressure.
Delta from Core 4: Core 4 covered PBT for pure synchronous code; this core adds async and streaming coverage, with fakes for determinism.
PBT Protocol (Contract, Entry/Exit Criteria):
- Properties: As in Core 4, plus stream-specific properties (e.g., "partial consumption preserves laziness"); derive them from laws with explicit preconditions (e.g., "no duplication on retry" assumes idempotent adapters).
- Semantics: Split by layer: pure laws on sync models; adapter laws with deterministic schedulers; concurrency laws with interleavings and virtual clocks.
- Purity: Fake effects for determinism; write async def test_* functions under @given.
- Error Model: Include async failures (e.g., timeouts surfaced as Err), as sketched after this list.
- Resource Safety: Bound generation (e.g., max_size); no infinite generators; a deadline per test; anyio.fail_after to catch hangs.
- Integration: Add to the RAG pipeline (test async chunking); CI runs the async suite.
- Mypy Config: --strict, with async typing.
- Exit: Laws pass, diversity targets hit, no flakiness, CI green.
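As a minimal sketch of the error model, assuming timeouts should surface as Err values rather than raised exceptions; the with_timeout helper name and the "timeout" ErrInfo tag are illustrative, not part of funcpipe_rag:

import anyio
from funcpipe_rag import Ok, Err, ErrInfo

async def with_timeout(coro, seconds: float):
    # Returns Ok(value) on success, Err(ErrInfo("timeout")) if the deadline expires.
    try:
        with anyio.fail_after(seconds):
            return Ok(await coro)
    except TimeoutError:
        return Err(ErrInfo("timeout"))

Usage: result = await with_timeout(storage.fetch(path), 0.5), asserted against the usual Result laws.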
Audience: Engineers testing async FP pipelines reliably.
Outcome:
1. Write PBT for invariants in async/stream layers.
2. Fake I/O, time, and executors for determinism.
3. Test RAG async properties.
1. Laws & Invariants¶
| Invariant | Description | Enforcement |
|---|---|---|
| Property Inv | Properties hold for generated streams; failures shrink. | Hypothesis runs |
| Determinism Inv | Tests pass under fakes; same seed == same output (mod scheduler nondet). | Seeded PBT |
| Laziness Inv | Partial consumption (e.g., islice) doesn't evaluate full stream. | PBT with counters |
| No Dup Inv | Streams emit no new duplicates relative to input multiset. | PBT collection |
| Propagation Inv | Errors/timeouts propagate without corruption. | PBT error injection |
These extend Core 4 for async/stream. Formalize with preconditions (e.g., "no dup on retry" assumes idempotent adapters).
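A small sketch of what "explicit precondition" means in test code, assuming Hypothesis's assume() is used to discard inputs that violate it; the test name is illustrative and raw_doc_strategy comes from Core 4 (sketched in section 3):

from hypothesis import assume, given, strategies as st

@given(docs=st.lists(raw_doc_strategy(), max_size=50))
def test_law_with_explicit_precondition(docs):
    # Precondition made explicit in code: this law is stated only for inputs
    # without duplicate doc_ids (modelling the idempotent-adapter assumption).
    # assume() discards generated inputs that violate it instead of failing.
    assume(len({d.doc_id for d in docs}) == len(docs))
    # ... property body, e.g. the multiset check from section 5.2 ...

The strategy-level alternative is unique_by, as used in stream_strategy in section 3.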
2. Decision Table¶
| Scenario | Async Needed | Faking Needed | Recommended |
|---|---|---|---|
| Pure laws | No | No | Sync PBT (Core 4) |
| Adapter laws | Yes | Yes | Async PBT with fakes |
| Concurrency laws | Yes | Yes | Interleavings/clocks |
Choose async PBT for coroutines and streams; keep sync PBT for pure data transformations.
3. Public API (Strategies/Fakes for Async/Stream)¶
Strategies for streams: lists serve as finite proxies, and async-iterable factories add optional suspension points to vary interleavings.
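The code below refers to raw_doc_strategy, defined in Core 4. For a self-contained read, here is a hedged sketch of what it might look like; the field names follow the RawDoc usage later in this core (doc_id, title, abstract, categories), and the concrete generators are illustrative:

from hypothesis import strategies as st
from funcpipe_rag import RawDoc

def raw_doc_strategy() -> st.SearchStrategy[RawDoc]:
    # Illustrative generators; adjust to the real RawDoc constructor.
    return st.builds(
        RawDoc,
        doc_id=st.uuids().map(str),
        title=st.text(min_size=1, max_size=80),
        abstract=st.text(max_size=400),
        categories=st.lists(st.sampled_from(["cs.AI", "cs.CL", "stat.ML"]), max_size=3),
    )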
from typing import TypeVar, List, Callable, Generic
from collections.abc import AsyncIterator as AsyncIteratorType
from hypothesis import strategies as st
from funcpipe_rag import Result, Ok, Err, ErrInfo, RawDoc
import anyio  # Used for cooperative yield points (anyio.sleep(0)) to vary interleavings

T = TypeVar("T")

# Finite proxy for a stream; unique IDs model idempotent re-emission.
# raw_doc_strategy: Core 4 document strategy (sketched above).
stream_strategy = st.lists(raw_doc_strategy(), max_size=50, unique_by=lambda d: d.doc_id)

@st.composite
def async_stream_strategy(draw) -> Callable[[], AsyncIteratorType[RawDoc]]:
    items = draw(stream_strategy)
    # One boolean per item: whether to insert a cooperative yield before emitting it.
    pauses = draw(st.lists(st.booleans(), min_size=len(items), max_size=len(items)))

    async def gen():
        for x, p in zip(items, pauses):
            if p:
                await anyio.sleep(0)  # Yield to the scheduler to vary interleavings
            yield x

    return gen  # Return a factory so tests can materialise the stream more than once

class FakeStorage(Generic[T]):
    """In-memory stand-in for the storage adapter; yields pre-seeded Results."""

    def __init__(self, items: List[Result[T, ErrInfo]]):
        self._items = items

    async def read_docs(self, path: str) -> AsyncIteratorType[Result[T, ErrInfo]]:
        for d in self._items:
            yield d
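A quick usage note, assuming pytest and the anyio plugin as in the tests below: each value drawn from async_stream_strategy is a factory, so a test can materialise the same stream more than once (section 4.3 relies on this). The test name here is illustrative:

import pytest
from hypothesis import given

@given(gen=async_stream_strategy())
@pytest.mark.anyio
async def test_stream_factory_replays(gen):
    first = [d.doc_id async for d in gen()]
    second = [d.doc_id async for d in gen()]
    assert first == second  # The factory replays identical items deterministically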
4. Reference Implementations¶
4.1 Pure Layer PBT (Idempotence - Sync Model)¶
Test the pure core synchronously (no fakes needed).
from hypothesis import given, settings
from funcpipe_rag import eq_pure
# clean_doc and raw_doc_strategy come from the pure core (Core 4).

@given(raw=raw_doc_strategy())
@settings(max_examples=200, deadline=1000)  # Default phases keep shrinking enabled
def test_clean_idempotent(raw):
    once = clean_doc(raw)
    twice = clean_doc(once)
    assert eq_pure([once], [twice], key=lambda d: (d.doc_id, d.title, d.abstract, d.categories))
4.2 Adapter Layer PBT (Equivalence, Deterministic Async)¶
Inject fakes for storage and randomness; seed both sides identically so any divergence is real.
import pytest
from random import Random
from hypothesis import given, settings, strategies as st
from funcpipe_rag import eq_pure

@given(docs=doc_list_strategy, seed=st.integers(0, 2 ** 32 - 1))  # doc_list_strategy: Core 4 list-of-RawDoc strategy
@settings(max_examples=200, deadline=None)
@pytest.mark.anyio
async def test_async_adapter_equiv(docs, seed):
    rng_imp = Random(seed)
    rng_fp = Random(seed)
    storage_imp = FakeStorage[RawDoc]([Ok(d) for d in docs])
    storage_fp = FakeStorage[RawDoc]([Ok(d) for d in docs])
    imp = await imperative_async_rag(rng=rng_imp, storage=storage_imp, path="fake_path")
    fp = await fp_async_rag(rng=rng_fp, storage=storage_fp, path="fake_path")
    assert eq_pure(imp, fp, key=lambda c: (c.doc_id, c.start, c.end, c.text))
4.3 Concurrency Layer PBT (No Dup - Interleavings)¶
Vary interleavings via the strategy's cooperative yield points; bound wall-clock time with anyio.fail_after so a hung stream fails fast.
from collections import Counter
from hypothesis import given, settings

@given(gen=async_stream_strategy())
@settings(max_examples=200, deadline=None)
@pytest.mark.anyio
async def test_stream_no_dup(gen):
    stream = gen()
    with anyio.fail_after(2):  # Timeout to prevent hangs
        processed = [p async for p in async_process_stream(stream)]
    # No new duplicates relative to the input multiset
    input_ids = Counter([d.doc_id async for d in gen()])  # Re-materialise from the factory
    output_ids = Counter(p.doc_id for p in processed)
    assert all(output_ids[k] <= input_ids[k] for k in output_ids)
4.4 Faking I/O/Time/Exec¶
Use concrete, injected fakes for I/O; model time with an injected virtual clock (sketched below) rather than real sleeps.
@given(docs=st.lists(raw_doc_strategy(), max_size=10))
@settings(max_examples=200)
@pytest.mark.anyio
async def test_faked_io(docs):
    # Alternate Ok/Err results; the stage under test should keep only the Oks.
    expected = sum(1 for i, _ in enumerate(docs) if i % 2 == 0)
    fake = FakeStorage[RawDoc]([Ok(d) if i % 2 == 0 else Err(ErrInfo("transient")) for i, d in enumerate(docs)])
    result = await tested_func(storage=fake)  # tested_func: placeholder for the adapter under test
    assert len(result) == expected  # Errs dropped, Oks kept
Backend selection (via the anyio_backend fixture); determinism comes from the fakes and fixed schedules, not from the loop implementation:
# conftest.py
import pytest

@pytest.fixture
def anyio_backend():
    return "asyncio", {"use_uvloop": True}  # Optional faster loop; determinism from fakes + fixed schedules
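For time itself, the same injection pattern applies. A minimal sketch, assuming time-dependent adapters take a clock dependency instead of calling anyio.sleep directly; the FakeClock name and interface are illustrative:

class FakeClock:
    """Virtual clock: sleeping advances a counter instead of waiting."""

    def __init__(self) -> None:
        self.now: float = 0.0

    async def sleep(self, seconds: float) -> None:
        self.now += seconds  # No real waiting, so backoff/retry tests run instantly

Adapters that would sleep for backoff call clock.sleep instead, and tests assert the schedule against clock.now.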
Fake executors: inject a synchronous executor, or a fake with the concurrent.futures interface, as sketched below.
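A minimal sketch, assuming adapters accept anything exposing the concurrent.futures.Executor submit()/shutdown() interface; InlineExecutor is an illustrative name:

from concurrent.futures import Future

class InlineExecutor:
    """Deterministic executor fake: runs submitted callables inline, in submission order."""

    def submit(self, fn, /, *args, **kwargs) -> Future:
        fut: Future = Future()
        try:
            fut.set_result(fn(*args, **kwargs))
        except Exception as exc:
            fut.set_exception(exc)
        return fut

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        pass  # Nothing to clean up; everything already ran inline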
RAG Integration¶
Add async PBT to tests/test_rag_async.py; CI runs it with the rest of the suite.
5. Property-Based Proofs (tests/test_module_10_core5.py)¶
Diversity with events: event() buckets runs by observed stream length in the statistics report, and target() steers generation toward longer streams.
from hypothesis import event, given, target

@given(gen=async_stream_strategy())
@pytest.mark.anyio
async def test_stream_diversity(gen):
    stream = gen()
    count = 0
    async for _ in stream:
        count += 1
    event(f"n_items={count}")
    target(count)  # Steer generation toward longer streams
    # ... property ...
5.1 Partial Pull / Cancellation Law¶
Test laziness with counters.
@st.composite
def async_stream_strategy_min5(draw) -> Callable[[], AsyncIteratorType[RawDoc]]:
    # Same as async_stream_strategy, but guarantees at least 5 items so we can cancel mid-stream.
    items = draw(st.lists(raw_doc_strategy(), min_size=5, max_size=50, unique_by=lambda d: d.doc_id))
    pauses = draw(st.lists(st.booleans(), min_size=len(items), max_size=len(items)))

    async def gen():
        for x, p in zip(items, pauses):
            if p:
                await anyio.sleep(0)
            yield x

    return gen

@given(gen=async_stream_strategy_min5())
@pytest.mark.anyio
async def test_partial_laziness(gen):
    stream = gen()
    counter = 0

    async def counted_gen():
        nonlocal counter
        async for d in stream:
            counter += 1  # Count how many items the pipeline actually pulls upstream
            yield d

    processed = []
    async for p in async_process_stream(counted_gen()):
        processed.append(p)
        if len(processed) == 5:
            break  # Cancel: stop consuming after 5 outputs
    assert len(processed) == 5
    assert counter == 5  # No overpull: exactly one upstream pull per output
5.2 Retry Law with Failure Schedule¶
Model failures as exceptions; the retry combinator is assumed to take the stream factory so it can restart after a failure, retrying per item and de-duplicating items already emitted.
class TransientError(Exception):
    pass

@st.composite
def failure_schedule_strategy(draw):
    return draw(st.lists(st.booleans(), max_size=10))  # Fail on True, in order

@given(gen=async_stream_strategy(), schedule=failure_schedule_strategy())
@pytest.mark.anyio
async def test_retry_no_dup(gen, schedule):
    failures = iter(schedule)  # Shared across restarts, so failures eventually run out

    async def flaky_gen():
        async for d in gen():
            if next(failures, False):
                raise TransientError("retryable")
            yield d

    # Pass the factory: an async generator that raised is finished, so the
    # retry combinator must restart the stream and de-duplicate emitted items.
    processed = [p async for p in async_retry_gen(flaky_gen)]
    input_docs = [d async for d in gen()]
    assert Counter(p.doc_id for p in processed) == Counter(d.doc_id for d in input_docs)  # Exact match, no dup/loss
6. Runtime Preservation Guarantee¶
Bound example counts and generated sizes; disable the Hypothesis deadline (deadline=None) for async tests whose wall-clock time legitimately varies; rely on CI-level timeouts and anyio.fail_after to catch hangs.
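One way to keep these bounds in one place is a Hypothesis settings profile registered in conftest.py; the profile names and values below are illustrative:

# conftest.py
import os
from hypothesis import HealthCheck, settings

settings.register_profile(
    "ci",
    max_examples=200,
    deadline=None,  # Async tests: rely on CI and anyio timeouts instead
    derandomize=True,  # Reproducible runs in CI
    suppress_health_check=[HealthCheck.too_slow],
)
settings.register_profile("dev", max_examples=25)
settings.load_profile(os.getenv("HYPOTHESIS_PROFILE", "dev"))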
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| No fakes | Flaky passes/fails | Inject concrete fakes |
| Unbounded streams | Hang/slow | Bound sizes |
| Sync PBT for async | Miss races | Use async def test_ |
| Real I/O | External deps | Fake all |
8. Pre-Core Quiz¶
- PBT async for…? → Laws in coros/streams
- Fake I/O with…? → Concrete fakes
- Determinism by…? → Seed/fake time/exec
- Stateful for…? → Sequence ops
- CI for async…? → anyio's pytest plugin (@pytest.mark.anyio)
9. Post-Core Exercise¶
- Write async PBT for RAG stream.
- Fake I/O; test.
- Add to CI.
Next: Core 6. Future Directions – Pattern Matching, Typing Advances, Next-Step Parallel FuncPipe