M08C02: Async Generators & Streams – Building Async FuncPipe
Module 08 – Main Track Core
Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every production FuncPipe streaming pipeline treats async def + yield as a pure description of a lazy async stream: never eager materialisation, never hidden resource leaks.
Progression Note
Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.
| Module | Focus | Key Outcomes |
|---|---|---|
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, idempotent effects, explicit sessions |
| 8 | Async FuncPipe & Backpressure | Async FuncPipe, non-blocking pipelines, backpressure, basic fairness |
| 9 | FP Across Libraries and Frameworks | FuncPipe style in Pandas/Polars/Dask, FastAPI, CLI, distributed systems |
| 10 | Refactoring, Performance, Future-Proofing | Systematic refactors, performance budgets, governance and long-term evolution |
Core question
How do you extend the pure async descriptions from Core 1 to lazy, composable async streams — preserving purity, resource safety, per-item Result-based error propagation, and backpressure readiness?
Symmetry with Module 7 & M08C01
AsyncGen is the streaming counterpart of AsyncPlan: the same kind of pure description, only streaming and non-blocking.
What you now have after M08C01 + this core
- Single-value async descriptions (AsyncPlan)
- Streaming async descriptions (AsyncGen)
- Full monadic composition over streams (async_gen_and_then, async_gen_map)
- Resource-safe streaming via async_gen_using
- Pure, replayable, lazy async pipelines — ready for backpressure (Core 3)
What the rest of Module 8 adds
- Bounded concurrency & backpressure (M08C03–C05)
- Retry & timeout policies as data
- Rate limiting & fairness
- Full production async RAG pipeline
1. Algebraic Laws & Architectural Invariants
| Category | Name | Description | Enforcement |
|---|---|---|---|
| Algebraic Laws | Left Identity | async_gen_and_then(async_gen_return(x), f) == f(x) | Hypothesis |
| Algebraic Laws | Right Identity | async_gen_and_then(g, async_gen_return) == g | Hypothesis |
| Algebraic Laws | Associativity | async_gen_and_then(async_gen_and_then(g, f), h) == async_gen_and_then(g, lambda x: async_gen_and_then(f(x), h)) | Hypothesis |
| Architectural Invariants | Description Purity | Constructing an AsyncGen performs zero side effects. Effects happen only when the stream is driven in the shell. | Mock tests + static analysis |
| Architectural Invariants | Laziness | Items are yielded on demand; early break yields nothing further. | Property tests |
| Architectural Invariants | Per-item Error Propagation | An Err in the outer stream is yielded as-is and processing continues (no short-circuit). Inner streams are not started on Err. | Property tests |
| Architectural Invariants | Resource Safety | Resources are released on normal exit, early break, or exception via async_gen_using. | Contextlib + aclose tests |
| Architectural Invariants | Replayability | The same AsyncGen thunk can be called multiple times → each call produces a fresh async iterator. | Property tests |
The algebraic laws treat AsyncGen as a monad on the success path (T inside Ok). Errors are per-item and do not short-circuit the outer stream.
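The purity and laziness invariants can be checked with a counter that records each item at the moment the source produces it. The sketch below is a minimal, self-contained illustration: Ok is a stand-in dataclass (assumed to expose .value like the funcpipe_rag type), and counting_source is a hypothetical helper, not part of the FuncPipe API.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    """Stand-in for funcpipe_rag.Ok (assumption: the real type exposes .value)."""
    value: int


AsyncGen = Callable[[], AsyncIterator[Ok]]


def counting_source(n: int, produced: list[int]) -> AsyncGen:
    """Hypothetical source: records each value at the moment it is produced."""
    async def _gen() -> AsyncIterator[Ok]:
        for i in range(n):
            produced.append(i)  # the only side effect; runs only when driven
            yield Ok(i)
    return lambda: _gen()


async def demo() -> tuple[int, int, int]:
    produced: list[int] = []
    gen = counting_source(1_000_000, produced)
    after_construct = len(produced)  # building the description does nothing

    it = gen()
    after_call = len(produced)       # creating the iterator does nothing either

    seen = 0
    async for _ in it:
        seen += 1
        if seen == 3:
            break                    # stop early: no further items are produced
    await it.aclose()                # close explicitly rather than rely on GC
    return after_construct, after_call, len(produced)


print(asyncio.run(demo()))  # → (0, 0, 3)
```

Only three of the million potential items are ever produced, and nothing runs until the first item is requested.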
2. Decision Table
| Scenario | Single Value | Streaming | Recommended Pattern |
|---|---|---|---|
| Known-size async batch | Yes | No | AsyncPlan (Core 1) |
| Unbounded or large async input | No | Yes | AsyncGen + async_gen_and_then |
| Needs explicit resource cleanup | No | Yes | async_gen_using |
| Parallel independent streams | No | Yes | async_gen_gather (M08C03) |
Rule: Use AsyncGen exactly when you need lazy, potentially unbounded async streams.
async_gen_gather lives in src/funcpipe_rag/domain/effects/async_/stream.py and is re-exported from funcpipe_rag.domain.effects.async_.
3. Public API – AsyncGen (src/funcpipe_rag/domain/effects/async_/stream.py – mypy --strict clean)
# funcpipe_rag/domain/effects/async_/stream.py
from __future__ import annotations

from collections.abc import AsyncIterator, Awaitable, Callable
# collections.abc has no AsyncContextManager; contextlib provides the ABC.
from contextlib import AbstractAsyncContextManager as AsyncContextManager
from typing import TypeVar

from funcpipe_rag import Result, Ok, Err, ErrInfo, assert_never

T = TypeVar("T")
U = TypeVar("U")
R = TypeVar("R")

# AsyncGen[T] = a pure, replayable description of a lazy async stream yielding Result[T, ErrInfo]
AsyncGen = Callable[[], AsyncIterator[Result[T, ErrInfo]]]


def async_gen_return(value: T) -> AsyncGen[T]:
    async def _gen() -> AsyncIterator[Result[T, ErrInfo]]:
        yield Ok(value)
    return lambda: _gen()


def async_gen_map(
    gen: AsyncGen[T],
    f: Callable[[T], U],
) -> AsyncGen[U]:
    async def _mapped() -> AsyncIterator[Result[U, ErrInfo]]:
        async for item in gen():
            yield item.map(f)
    return lambda: _mapped()


def async_gen_and_then(
    gen: AsyncGen[T],
    f: Callable[[T], AsyncGen[U]],
) -> AsyncGen[U]:
    async def _bind() -> AsyncIterator[Result[U, ErrInfo]]:
        async for item in gen():
            match item:
                case Ok(value=v):
                    # Start the inner stream only for successful items
                    async for inner in f(v)():
                        yield inner
                case Err(error=e):
                    # Pass the error through and keep consuming the outer stream
                    yield Err(e)
                case _:
                    assert_never(item)
    return lambda: _bind()


# Lift a per-item async function into a streaming transformer
def lift_async_item(
    fn: Callable[[T], Awaitable[Result[U, ErrInfo]]]
) -> Callable[[T], AsyncGen[U]]:
    async def _stream_item(x: T) -> AsyncIterator[Result[U, ErrInfo]]:
        yield await fn(x)
    return lambda x: (lambda: _stream_item(x))


# Resource-safe streaming
def async_gen_using(
    cm_thunk: Callable[[], AsyncContextManager[R]],
    make_stream: Callable[[R], AsyncGen[T]],
) -> AsyncGen[T]:
    async def _using() -> AsyncIterator[Result[T, ErrInfo]]:
        # The resource is acquired on first iteration, not when the description is built
        async with cm_thunk() as resource:
            async for item in make_stream(resource)():
                yield item
    return lambda: _using()
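A short usage sketch may help show how these combinators fit together. It restates async_gen_map, async_gen_and_then and lift_async_item in compact, untyped form with stand-in Ok/Err dataclasses so that it runs without funcpipe_rag; from_results and parse_int are hypothetical names introduced only for this example.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any  # stand-in for funcpipe_rag.Ok


@dataclass(frozen=True)
class Err:
    error: Any  # stand-in for funcpipe_rag.Err


AsyncGen = Callable[[], AsyncIterator[Any]]


def async_gen_map(gen: AsyncGen, f: Callable[[Any], Any]) -> AsyncGen:
    async def _mapped() -> AsyncIterator[Any]:
        async for item in gen():
            if isinstance(item, Ok):
                yield Ok(f(item.value))
            else:
                yield item  # Err is passed through untouched
    return lambda: _mapped()


def async_gen_and_then(gen: AsyncGen, f: Callable[[Any], AsyncGen]) -> AsyncGen:
    async def _bind() -> AsyncIterator[Any]:
        async for item in gen():
            if isinstance(item, Ok):
                async for inner in f(item.value)():
                    yield inner
            else:
                yield item  # the inner stream is never started for an Err
    return lambda: _bind()


def lift_async_item(fn: Callable[[Any], Any]) -> Callable[[Any], AsyncGen]:
    async def _one(x: Any) -> AsyncIterator[Any]:
        yield await fn(x)
    return lambda x: (lambda: _one(x))


def from_results(items: list[Any]) -> AsyncGen:
    """Hypothetical helper: a replayable stream over a fixed list of results."""
    async def _gen() -> AsyncIterator[Any]:
        for item in items:
            yield item
    return lambda: _gen()


async def parse_int(text: str) -> Any:
    """Hypothetical per-item step: reports failure as data instead of raising."""
    await asyncio.sleep(0)  # stands in for real async work
    try:
        return Ok(int(text))
    except ValueError:
        return Err(f"not an int: {text!r}")


async def demo() -> list[Any]:
    source = from_results([Ok("1"), Ok("x"), Err("upstream"), Ok("3")])
    parsed = async_gen_and_then(source, lift_async_item(parse_int))
    doubled = async_gen_map(parsed, lambda n: n * 2)
    return [item async for item in doubled()]
```

Running demo() gives Ok(2), an Err for the unparsable "x", the untouched upstream Err, and Ok(6), in that order: both kinds of failure travel as items while the rest of the stream continues.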
4. Reference Implementations (pure domain code)
4.1 Resource-safe HTTP response streaming
def async_stream_http_lines(url: str, client: HttpClientCap) -> AsyncGen[str]:
    # HttpClientCap.stream returns AsyncContextManager[AsyncResponse]
    def make_stream(resp: AsyncResponse) -> AsyncGen[str]:
        async def _lines() -> AsyncIterator[Result[str, ErrInfo]]:
            async for raw_line in resp.aiter_lines():
                yield Ok(raw_line.decode())
        return lambda: _lines()

    return async_gen_map(
        async_gen_using(lambda: client.stream("GET", url), make_stream),
        lambda line: line.rstrip("\n"),
    )
4.2 Composing streaming pipeline (canonical factored style)
# Per-item async steps
_clean_item = lift_async_item(clean_item_async)
_embed_item = lift_async_item(embed_item_async)


def async_rag_stream(
    docs: AsyncGen[RawDoc],
    env: RagEnv,
    embedder: EmbeddingCap,
) -> AsyncGen[EmbeddedChunk]:
    return async_gen_and_then(
        async_gen_and_then(docs, _clean_item),
        _embed_item,
    )
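The nested calls above gain one level per step. One way to keep longer pipelines flat is a left fold over the steps; the sketch below uses a hypothetical helper, async_gen_pipe, which is not part of the API shown in section 3. Ok, Err, from_items and the three step functions are likewise stand-ins chosen for illustration.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from dataclasses import dataclass
from functools import reduce
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any  # stand-in for funcpipe_rag.Ok


@dataclass(frozen=True)
class Err:
    error: Any  # stand-in for funcpipe_rag.Err


AsyncGen = Callable[[], AsyncIterator[Any]]
Step = Callable[[Any], AsyncGen]


def from_items(items: list[Any]) -> AsyncGen:
    async def _gen() -> AsyncIterator[Any]:
        for item in items:
            yield item
    return lambda: _gen()


def async_gen_and_then(gen: AsyncGen, f: Step) -> AsyncGen:
    """Compact restatement of the bind combinator."""
    async def _bind() -> AsyncIterator[Any]:
        async for item in gen():
            if isinstance(item, Ok):
                async for inner in f(item.value)():
                    yield inner
            else:
                yield item
    return lambda: _bind()


def async_gen_pipe(gen: AsyncGen, *steps: Step) -> AsyncGen:
    """Hypothetical helper: fold steps left to right with async_gen_and_then."""
    return reduce(async_gen_and_then, steps, gen)


# Three illustrative steps; `words` is one-to-many, the others one-to-one.
def strip(s: str) -> AsyncGen:
    return from_items([Ok(s.strip())])


def words(s: str) -> AsyncGen:
    return from_items([Ok(w) for w in s.split()])


def upper(s: str) -> AsyncGen:
    return from_items([Ok(s.upper())])


async def collect(gen: AsyncGen) -> list[Any]:
    return [item async for item in gen()]


async def demo() -> tuple[list[Any], list[Any]]:
    source = from_items([Ok("  a b "), Err("bad row"), Ok("c")])
    piped = async_gen_pipe(source, strip, words, upper)
    nested = async_gen_and_then(
        async_gen_and_then(async_gen_and_then(source, strip), words), upper
    )
    return await collect(piped), await collect(nested)
```

Both forms yield Ok("A"), Ok("B"), Err("bad row"), Ok("C"). The fold builds exactly the left-nested structure, so it changes readability only, not behaviour.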
4.3 Full RAG async stream (domain/rag_async.py)
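def async_rag_pipeline_stream(
    storage: StoragePort,
    input_path: str,
    env: RagEnv,
    embedder: EmbeddingCap,
) -> AsyncGen[EmbeddedChunk]:
    # async_gen_using already produces an AsyncGen[RawDoc] (assuming parse_docs_stream
    # returns one), which is what async_rag_stream expects. Binding it with
    # async_gen_and_then would hand a single RawDoc to a function that needs a stream.
    docs = async_gen_using(
        lambda: storage.open_input(input_path),  # returns AsyncContextManager[AsyncIterator[str]]
        parse_docs_stream,
    )
    return async_rag_stream(docs, env, embedder)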
Shell consumption (only place driving happens):
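from contextlib import aclosing  # Python 3.10+


async def run_rag_stream():
    stream = async_rag_pipeline_stream(real_storage, "data.csv", env, embedder)
    processed = 0
    # aclosing awaits aclose() on exit, so an early break releases resources promptly
    async with aclosing(stream()) as results:
        async for result in results:
            match result:
                case Ok(value=chunk):
                    await index_chunk(chunk)
                    processed += 1
                case Err(error=e):
                    logger.error("Failed chunk: %s", e)
                    break  # or continue
    logger.info("Processed %d chunks", processed)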
5. Property-Based Proofs (all pass in CI)
from collections.abc import AsyncIterator

import pytest
from hypothesis import given, strategies as st

from funcpipe_rag import Result, Ok, Err, ErrInfo
from funcpipe_rag.domain.effects.async_.stream import (
    AsyncGen,
    async_gen_and_then,
    async_gen_return,
    async_gen_using,
)


@given(x=st.integers())
@pytest.mark.asyncio
async def test_async_gen_left_identity(x: int):
    f = lambda y: async_gen_return(y + 1)
    left = [item async for item in async_gen_and_then(async_gen_return(x), f)()]
    right = [item async for item in f(x)()]
    assert left == right


@given(xs=st.lists(st.integers(), max_size=100))
@pytest.mark.asyncio
async def test_async_gen_right_identity(xs):
    # Laws are defined on the Ok-path only; we generate only Ok here on purpose.
    def src() -> AsyncGen[int]:
        async def _src() -> AsyncIterator[Result[int, ErrInfo]]:
            for v in xs:
                yield Ok(v)
        return lambda: _src()

    left = [item async for item in async_gen_and_then(src(), async_gen_return)()]
    right = [item async for item in src()()]
    assert left == right


@given(xs=st.lists(st.integers(), max_size=100))
@pytest.mark.asyncio
async def test_async_gen_associativity(xs):
    # Laws are defined on the Ok-path only; we generate only Ok here on purpose.
    def src() -> AsyncGen[int]:
        async def _src() -> AsyncIterator[Result[int, ErrInfo]]:
            for v in xs:
                yield Ok(v)
        return lambda: _src()

    f = lambda y: async_gen_return(y + 1)
    g = lambda z: async_gen_return(z * 2)
    a = async_gen_and_then(async_gen_and_then(src(), f), g)
    b = async_gen_and_then(src(), lambda y: async_gen_and_then(f(y), g))
    a_items = [item.value async for item in a() if isinstance(item, Ok)]
    b_items = [item.value async for item in b() if isinstance(item, Ok)]
    assert a_items == b_items


@pytest.mark.asyncio
async def test_async_gen_and_then_per_item_error_propagation():
    async def _src() -> AsyncIterator[Result[int, ErrInfo]]:
        yield Ok(1)
        yield Err(ErrInfo(code="BOOM", msg="test"))
        yield Ok(2)

    def src() -> AsyncGen[int]:
        return lambda: _src()

    f = lambda x: async_gen_return(x * 10)
    out = [item async for item in async_gen_and_then(src(), f)()]
    assert isinstance(out[0], Ok) and out[0].value == 10
    assert isinstance(out[1], Err) and out[1].error.code == "BOOM"
    assert isinstance(out[2], Ok) and out[2].value == 20


@pytest.mark.asyncio
async def test_async_gen_replayable():
    gen = async_gen_return(1)
    first = [item async for item in gen()]
    second = [item async for item in gen()]
    assert first == second
from contextlib import asynccontextmanager


@pytest.mark.asyncio
async def test_resource_cleanup_on_early_break():
    called = False

    @asynccontextmanager
    async def fake_resource():
        nonlocal called
        try:
            yield None
        finally:
            called = True

    stream = async_gen_using(fake_resource, lambda _: async_gen_return(42))
    it = stream()
    async for _ in it:  # consume one item then break
        break
    # break alone does not close an async generator; close it explicitly so the
    # async with inside async_gen_using exits before the assertion runs
    await it.aclose()
    assert called
6. Runtime Guarantees
| Operation | Memory per item | Total memory (full consumption) | Notes |
|---|---|---|---|
| async_gen_map | O(1) | O(1) | Pure transformation |
| async_gen_and_then | O(1) | O(1) | Streams inner only on demand |
| async_gen_using | O(1) | O(1) | Resource released on break/error |
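Full consumption has the same time asymptotics as the batch version, but memory stays O(1) because only the item in flight is held; partial consumption costs only the items actually pulled.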
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Materialising streams into lists | OOM on large inputs | Keep as AsyncGen, consume lazily |
| Manual try/finally with aclose | Leaks on early break | Use async_gen_using |
| Unbounded async for without backpressure | Downstream overload | Add bounded queue (M08C03) |
| Yielding raw values instead of Result | Exceptions kill the stream | Always yield Result[T, ErrInfo] |
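The last row can be illustrated with a boundary wrapper that turns an exception raised by a source into a final Err item. This is a sketch under assumptions: async_gen_catch and flaky_source are hypothetical names, not part of the FuncPipe API shown above, and Ok/Err are stand-in dataclasses.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any  # stand-in for funcpipe_rag.Ok


@dataclass(frozen=True)
class Err:
    error: Any  # stand-in for funcpipe_rag.Err


AsyncGen = Callable[[], AsyncIterator[Any]]


def async_gen_catch(gen: AsyncGen, to_err: Callable[[Exception], Any]) -> AsyncGen:
    """Hypothetical wrapper: report an exception from the source as a final Err."""
    async def _guarded() -> AsyncIterator[Any]:
        try:
            async for item in gen():
                yield item
        except Exception as exc:  # CancelledError and GeneratorExit are not caught
            yield Err(to_err(exc))
    return lambda: _guarded()


def flaky_source() -> AsyncGen:
    """Illustrative source that raises after two items."""
    async def _gen() -> AsyncIterator[Any]:
        yield Ok(1)
        yield Ok(2)
        raise ConnectionError("reset by peer")
    return lambda: _gen()


async def demo() -> list[Any]:
    guarded = async_gen_catch(
        flaky_source(), lambda exc: f"{type(exc).__name__}: {exc}"
    )
    return [item async for item in guarded()]
```

Here demo() returns Ok(1), Ok(2) and then Err("ConnectionError: reset by peer"). A generator that has raised is finished, so the wrapper can only end the stream with an Err; errors that should not end the stream need to be produced as Err items by the per-item step itself.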
8. Pre-Core Quiz
- Async generator returns…? → AsyncIterator – a lazy stream
- When is work performed…? → Only when iterated with async for in the shell
- Compose streaming pipelines with…? → async_gen_and_then / async_gen_map
- Resource safety is guaranteed by…? → async_gen_using
- Error propagation is…? → Per-item Result; Err yields and continues outer stream
9. Post-Core Exercise
- Convert one real I/O stream in your codebase to an AsyncGen description.
- Compose a ≥3-step streaming pipeline using async_gen_and_then.
- Wrap a resource (file, HTTP connection) with async_gen_using.
- Write a Hypothesis property proving one of the bind laws on a small synthetic stream.
- Celebrate – you now have true async streaming without magic or leaks.
Next → M08C03: Bounded Queues & Backpressure – Preventing Memory Blowups
You now have lazy, composable, resource-safe async streams — the foundation for production Async FuncPipe. The remaining cores add backpressure, fairness, retries, and full async RAG — all while keeping the core pure and the shell in full control.
M08C02 is now frozen.