
Core 5: Property-Based Testing for Async and Streaming Pipelines (Hypothesis Strategies and Faked I/O)

Module 10

Core question:
How do you use property-based testing with Hypothesis to verify laws in async and streaming pipelines, using strategies for iterators/generators and faked I/O/time/executors to ensure determinism and avoid flakiness?

In this core, we extend property-based testing (PBT) to async and streaming pipelines in the FuncPipe RAG Builder (now at funcpipe-rag-10). We use Hypothesis strategies for stream-like data (e.g., lists as finite proxies for iterators), concrete fakes for I/O (injected deps), injected fake clocks for time, and deterministic executors to make tests reproducible. This verifies deterministic laws such as idempotence, no duplication, and at-most-once delivery in async contexts. Builds on Core 4's sync PBT.

Motivation Bug: Async tests flake on timing and race conditions, and streaming tests hang on unbounded data. PBT with fakes enables deterministic verification of async laws, catching bugs in compositions such as retries and backpressure.

Delta from Core 4: Core 4 covered PBT for pure sync code; this core adds async/streaming with fakes for determinism.

PBT Protocol (Contract, Entry/Exit Criteria):

- Properties: As Core 4, plus stream-specific ones (e.g., "partial consumption preserves laziness"); derive from laws with explicit preconditions (e.g., "no duplication on retry" assumes idempotent adapters).
- Semantics: Split by layer: pure laws on sync models; adapter laws with deterministic schedulers; concurrency laws with interleavings and fake clocks.
- Purity: Fake all effects for determinism; use async def test_ with @given.
- Error Model: Include async failures (e.g., timeouts as Err).
- Resource Safety: Bound generation (e.g., max_size); no infinite generators; deadline per test; anyio.fail_after to guard against hangs.
- Integration: Add to RAG (test async chunking); CI runs the async suite.
- Mypy Config: --strict; async typing.
- Exit: Laws pass, diversity targets hit, no flakiness, CI green.

Audience: Engineers testing async FP pipelines reliably.

Outcome:

  1. Write PBT for invariants in async/stream layers.
  2. Fake I/O/time/exec for determinism.
  3. Test RAG async properties.


1. Laws & Invariants

| Invariant | Description | Enforcement |
| --- | --- | --- |
| Property Inv | Properties hold for generated streams; failures shrink. | Hypothesis runs |
| Determinism Inv | Tests pass under fakes; same seed gives same output (modulo scheduler nondeterminism). | Seeded PBT |
| Laziness Inv | Partial consumption (e.g., islice) doesn't evaluate the full stream. | PBT with counters |
| No Dup Inv | Streams emit no new duplicates relative to the input multiset. | PBT collection |
| Propagation Inv | Errors/timeouts propagate without corruption. | PBT error injection |

These extend Core 4 for async/stream. Formalize with preconditions (e.g., "no dup on retry" assumes idempotent adapters).
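The No Dup invariant can be stated as an executable multiset check on plain lists (the sync model). A small helper sketch; no_new_duplicates is a hypothetical name, not part of funcpipe_rag:

```python
from collections import Counter


def no_new_duplicates(inputs, outputs, key=lambda x: x):
    """True iff outputs contain no key more often than it appears in inputs.

    This is the multiset form of the No Dup invariant: retries or stream
    restarts may drop items but must never introduce extra copies.
    """
    in_counts = Counter(key(x) for x in inputs)
    out_counts = Counter(key(x) for x in outputs)
    return all(out_counts[k] <= in_counts[k] for k in out_counts)
```

For RAG docs the key would be lambda d: d.doc_id; the same helper then serves both sync-model tests and the async collection checks below.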


2. Decision Table

| Scenario | Async Needed | Faking Needed | Recommended |
| --- | --- | --- | --- |
| Pure laws | No | No | Sync PBT (Core 4) |
| Adapter laws | Yes | Yes | Async PBT with fakes |
| Concurrency laws | Yes | Yes | Interleavings/fake clocks |

Choose async PBT for coros; sync for data.


3. Public API (Strategies/Fakes for Async/Stream)

Strategies for streams; use lists as finite proxies, with async iterables for interleavings.

from typing import TypeVar, List, Callable, Generic
from collections.abc import AsyncIterator as AsyncIteratorType
from hypothesis import strategies as st
from funcpipe_rag import Result, Ok, Err, ErrInfo, RawDoc
import anyio  # For cooperative yields (sleep(0)) and fail_after timeouts

T = TypeVar("T")

stream_strategy = st.lists(raw_doc_strategy(), max_size=50, unique_by=lambda d: d.doc_id)  # Finite proxy; unique IDs; raw_doc_strategy carries over from Core 4


@st.composite
def async_stream_strategy(draw) -> Callable[[], AsyncIteratorType[RawDoc]]:
    items = draw(stream_strategy)
    pauses = draw(st.lists(st.booleans(), min_size=len(items), max_size=len(items)))

    async def gen():
        for x, p in zip(items, pauses):
            if p: await anyio.sleep(0)
            yield x

    return gen


class FakeStorage(Generic[T]):
    def __init__(self, items: List[Result[T, ErrInfo]]):
        self._items = items

    async def read_docs(self, path: str) -> AsyncIteratorType[Result[T, ErrInfo]]:
        for d in self._items:
            yield d
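For self-containment, a minimal sketch of raw_doc_strategy as it might look from Core 4, assuming RawDoc is a frozen dataclass with doc_id, title, abstract, and categories fields (the same key tuple used in 4.1); the field types and category vocabulary here are illustrative, not the funcpipe_rag definitions:

```python
from dataclasses import dataclass
from hypothesis import strategies as st


@dataclass(frozen=True)
class RawDoc:  # stand-in for funcpipe_rag.RawDoc; fields assumed
    doc_id: str
    title: str
    abstract: str
    categories: tuple[str, ...]


def raw_doc_strategy() -> st.SearchStrategy:
    # Builds RawDoc instances with unique-ish IDs and bounded text sizes,
    # so generation stays cheap and shrinking stays readable.
    return st.builds(
        RawDoc,
        doc_id=st.uuids().map(str),
        title=st.text(max_size=80),
        abstract=st.text(max_size=400),
        categories=st.tuples(st.sampled_from(["cs.AI", "cs.LG", "stat.ML"])),
    )
```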

4. Reference Implementations

4.1 Pure Layer PBT (Idempotence - Sync Model)

Test pure core sync.

from hypothesis import given, settings, Phase
from funcpipe_rag import clean_doc, eq_pure


@given(raw=raw_doc_strategy())
@settings(max_examples=200, deadline=1000, phases=(Phase.explicit, Phase.generate, Phase.target, Phase.shrink))  # Keep shrink so failures minimize
def test_clean_idempotent(raw):
    once = clean_doc(raw)
    twice = clean_doc(once)
    assert eq_pure([once], [twice], key=lambda d: (d.doc_id, d.title, d.abstract, d.categories))

4.2 Adapter Layer PBT (Equivalence - Det Async)

Inject fakes.

import pytest
from random import Random
from hypothesis import given, settings, strategies as st
from funcpipe_rag import eq_pure


@given(docs=doc_list_strategy, seed=st.integers(0, 2 ** 32 - 1))
@settings(max_examples=200, deadline=None)
@pytest.mark.anyio
async def test_async_adapter_equiv(docs, seed):
    rng_imp = Random(seed)
    rng_fp = Random(seed)
    storage_imp = FakeStorage[RawDoc]([Ok(d) for d in docs])
    storage_fp = FakeStorage[RawDoc]([Ok(d) for d in docs])
    imp = await imperative_async_rag(rng=rng_imp, storage=storage_imp, path="fake_path")
    fp = await fp_async_rag(rng=rng_fp, storage=storage_fp, path="fake_path")
    assert eq_pure(imp, fp, key=lambda c: (c.doc_id, c.start, c.end, c.text))

4.3 Concurrency Layer PBT (No Dup - Interleavings)

Bound wall time with anyio.fail_after; interleaving variation comes from the cooperative-yield pauses drawn by the strategy.

import anyio
import pytest
from hypothesis import given, settings
from collections import Counter

@given(gen=async_stream_strategy())
@settings(max_examples=200, deadline=None)
@pytest.mark.anyio
async def test_stream_no_dup(gen):
    stream = gen()
    with anyio.fail_after(2):  # Timeout to prevent hangs
        processed = [p async for p in async_process_stream(stream)]
    # No new duplicates relative to input
    input_ids = Counter([d.doc_id async for d in gen()])
    output_ids = Counter(p.doc_id for p in processed)
    assert all(output_ids[k] <= input_ids[k] for k in output_ids)
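The tests in this section exercise async_process_stream without defining it. A minimal lazy pass-through sketch that satisfies the No Dup and Laziness invariants (the real pipeline stage would transform items, e.g., chunking):

```python
from collections.abc import AsyncIterator
from typing import TypeVar

T = TypeVar("T")


async def async_process_stream(stream: AsyncIterator[T]) -> AsyncIterator[T]:
    # Lazy: pulls exactly one input item per output item, and emits each
    # input at most once, so both invariants hold by construction.
    async for item in stream:
        yield item
```

Any real stage must preserve this shape: never buffer the whole stream, and never re-emit an item it has already yielded.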

4.4 Faking I/O/Time/Exec

Use concrete fakes (injected); virtual clock for time.

@given(docs=st.lists(raw_doc_strategy(), max_size=10))
@settings(max_examples=200)
@pytest.mark.anyio
async def test_faked_io(docs):
    # Even-indexed docs succeed; odd-indexed docs fail with a transient error.
    fake = FakeStorage[RawDoc]([Ok(d) if i % 2 == 0 else Err(ErrInfo("transient")) for i, d in enumerate(docs)])
    expected = sum(1 for i in range(len(docs)) if i % 2 == 0)
    result = await tested_func(storage=fake)  # adapter under test: keeps Ok values, drops Err
    assert len(result) == expected
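Time can be faked the same way as I/O: inject a clock dependency and substitute a fake whose sleeps return immediately while advancing virtual time. A minimal sketch (FakeClock is a hypothetical name, not part of anyio or funcpipe_rag):

```python
import asyncio


class FakeClock:
    """Injectable fake clock: sleeps return immediately, advancing virtual time."""

    def __init__(self) -> None:
        self._now = 0.0

    def now(self) -> float:
        return self._now

    async def sleep(self, seconds: float) -> None:
        # No real waiting: deterministic and fast. Yield once so other
        # tasks on the event loop still get a chance to run.
        self._now += seconds
        await asyncio.sleep(0)
```

Code under test takes the clock as an injected dep and calls clock.sleep(...) instead of anyio.sleep(...); tests can then assert on elapsed virtual time without ever blocking.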

Backend selection via the anyio_backend fixture (this chooses the event loop, not a virtual clock; determinism comes from the fakes and fixed schedules):

# conftest.py
@pytest.fixture
def anyio_backend():
    return "asyncio", {"use_uvloop": True}  # Faster loop; determinism from fakes + fixed schedules

Fake exec: Inject sync executor or fake concurrent.futures.
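The fake-executor idea can be sketched as an executor that runs each submission synchronously on the calling thread, removing scheduling nondeterminism entirely (InlineExecutor is a hypothetical name):

```python
from concurrent.futures import Executor, Future


class InlineExecutor(Executor):
    """Deterministic executor: runs each task synchronously on submit.

    Drop-in for ThreadPoolExecutor in tests; results and exceptions are
    already resolved by the time submit() returns.
    """

    def submit(self, fn, /, *args, **kwargs):
        fut: Future = Future()
        try:
            fut.set_result(fn(*args, **kwargs))
        except BaseException as exc:
            fut.set_exception(exc)
        return fut
```

Inject this wherever the pipeline takes an Executor; the Future interface is unchanged, so production code needs no test-only branches.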

RAG Integration

Add async PBT to tests/test_rag_async.py; CI runs.


5. Property-Based Proofs (tests/test_module_10_core5.py)

Diversity with events.

from hypothesis import event, target

@given(gen=async_stream_strategy())
@pytest.mark.anyio
async def test_stream_diversity(gen):
    stream = gen()
    count = 0
    async for _ in stream:
        count += 1
    event(f"n_items={count}")
    target(count)
    # ... property ...

5.1 Partial Pull / Cancellation Law

Test laziness with counters.

@st.composite
def async_stream_strategy_min5(draw) -> Callable[[], AsyncIteratorType[RawDoc]]:
    items = draw(st.lists(raw_doc_strategy(), min_size=5, max_size=50, unique_by=lambda d: d.doc_id))
    pauses = draw(st.lists(st.booleans(), min_size=len(items), max_size=len(items)))
    async def gen():
        for x, p in zip(items, pauses):
            if p: await anyio.sleep(0)
            yield x
    return gen

@given(gen=async_stream_strategy_min5())
@pytest.mark.anyio
async def test_partial_laziness(gen):
    stream = gen()
    counter = 0
    async def counted_gen():
        nonlocal counter
        async for d in stream:
            counter += 1
            yield d
    processed = []
    async for p in async_process_stream(counted_gen()):
        processed.append(p)
        if len(processed) == 5: break  # Cancel
    assert len(processed) == 5
    assert counter == 5  # No overpull

5.2 Retry Law with Failure Schedule

Model failures as exceptions (assuming the retry combinator restarts the stream from a factory and de-duplicates already-emitted items, so retried items are delivered at most once).

class TransientError(Exception):
    pass

@st.composite
def failure_schedule_strategy(draw):
    return draw(st.lists(st.booleans(), max_size=10))  # Fail on True

@given(gen=async_stream_strategy(), schedule=failure_schedule_strategy())
@pytest.mark.anyio
async def test_retry_no_dup(gen, schedule):
    failures = iter(schedule)
    async def flaky_gen():
        async for d in gen():
            if next(failures, False): raise TransientError("retryable")
            yield d
    processed = [p async for p in async_retry_gen(flaky_gen())]
    input_docs = [d async for d in gen()]
    assert Counter(p.doc_id for p in processed) == Counter(d.doc_id for d in input_docs)  # Exact match, no dup/loss
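async_retry_gen is assumed above. One possible shape, under the assumption that the combinator receives a zero-argument stream factory (a closed async generator cannot be resumed after raising) and de-duplicates by key so restarts cannot double-emit; this is a sketch, not the funcpipe_rag implementation, and for RAG docs the key would be lambda d: d.doc_id:

```python
from collections.abc import AsyncIterator
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Repeated from the text so this sketch is self-contained."""


async def async_retry_gen(
    make_stream: Callable[[], AsyncIterator[T]],
    key: Callable[[T], object] = lambda item: item,
    max_restarts: int = 10,
) -> AsyncIterator[T]:
    """Restart the source on TransientError; never re-emit a seen key."""
    seen: set = set()
    for _ in range(max_restarts + 1):
        try:
            async for item in make_stream():
                k = key(item)
                if k not in seen:  # de-dup across restarts: at-most-once per key
                    seen.add(k)
                    yield item
            return  # stream completed without a transient failure
        except TransientError:
            continue  # restart a fresh stream; already-seen keys are skipped
    raise TransientError("retry budget exhausted")
```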

6. Runtime Preservation Guarantee

Bound max_examples and generated sizes; disable per-test deadlines where async scheduling makes them flaky and rely on CI-level timeouts instead.
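These bounds can be centralized in Hypothesis settings profiles selected per environment; a sketch for conftest.py (profile names and limits are illustrative):

```python
# conftest.py (sketch): register bounded profiles and pick one via the
# HYPOTHESIS_PROFILE environment variable, e.g. HYPOTHESIS_PROFILE=ci.
import os
from hypothesis import settings

settings.register_profile("dev", max_examples=50, deadline=None)
settings.register_profile("ci", max_examples=200, deadline=None)
settings.load_profile(os.environ.get("HYPOTHESIS_PROFILE", "dev"))
```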


7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
| --- | --- | --- |
| No fakes | Flaky passes/fails | Inject concrete fakes |
| Unbounded streams | Hang/slow | Bound sizes |
| Sync PBT for async | Miss races | Use async def test_ |
| Real I/O | External deps | Fake all |

8. Pre-Core Quiz

  1. PBT async for…? → Laws in coros/streams
  2. Fake I/O with…? → Concrete fakes
  3. Determinism by…? → Seed/fake time/exec
  4. Stateful for…? → Sequence ops
  5. CI for async…? → anyio's pytest plugin (@pytest.mark.anyio)

9. Post-Core Exercise

  1. Write async PBT for RAG stream.
  2. Fake I/O; test.
  3. Add to CI.

Pipeline Usage (Idiomatic)

@given(gen=async_stream_strategy())
@pytest.mark.anyio
async def test_stream_law(gen):
    ...  # assert the stream law under test

Next: Core 6. Future Directions – Pattern Matching, Typing Advances, Next-Step Parallel FuncPipe