M08C02: Async Generators & Streams – Building Async FuncPipe

Module 08 – Main Track Core

Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every production FuncPipe streaming pipeline treats async def + yield as a pure description of a lazy async stream — never eager materialisation, never hidden resource leaks.

Progression Note

Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.

| Module | Focus | Key Outcomes |
|--------|-------|--------------|
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, idempotent effects, explicit sessions |
| 8 | Async FuncPipe & Backpressure | Async FuncPipe, non-blocking pipelines, backpressure, basic fairness |
| 9 | FP Across Libraries and Frameworks | FuncPipe style in Pandas/Polars/Dask, FastAPI, CLI, distributed systems |
| 10 | Refactoring, Performance, Future-Proofing | Systematic refactors, performance budgets, governance and long-term evolution |

Core question
How do you extend the pure async descriptions from Core 1 to lazy, composable async streams — preserving purity, resource safety, per-item Result-based error propagation, and backpressure readiness?

Symmetry with Module 7 & M08C01

IOGen[T]      = () -> Iterator[Result[T, ErrInfo]]
AsyncGen[T]   = () -> AsyncIterator[Result[T, ErrInfo]]
AsyncGen is the same idea as a lazy sync generator (IOGen) or a single-value AsyncPlan: a thunk that describes work without performing it. The only difference is that it streams and is non-blocking.
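
The symmetry can be made concrete with a minimal sketch. `Ok` below is a simplified stand-in for the funcpipe_rag type (assumed to carry a single `value` field), and `io_gen_numbers`, `async_gen_numbers` and `collect_async` are hypothetical names used only for illustration.

```python
import asyncio
from collections.abc import AsyncIterator, Callable, Iterator
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    """Simplified stand-in for funcpipe_rag.Ok (assumption: one `value` field)."""
    value: int


def io_gen_numbers(n: int) -> Callable[[], Iterator[Ok]]:
    """Sync description: calling the thunk returns a fresh lazy iterator."""
    def _gen() -> Iterator[Ok]:
        for i in range(n):
            yield Ok(i)
    return _gen


def async_gen_numbers(n: int) -> Callable[[], AsyncIterator[Ok]]:
    """Async description: same shape, but driven with `async for`."""
    async def _gen() -> AsyncIterator[Ok]:
        for i in range(n):
            await asyncio.sleep(0)  # placeholder for real non-blocking I/O
            yield Ok(i)
    return _gen


async def collect_async(gen: Callable[[], AsyncIterator[Ok]]) -> list[Ok]:
    # Driving the stream is the only step that needs an event loop.
    return [item async for item in gen()]


sync_items = list(io_gen_numbers(3)())
async_items = asyncio.run(collect_async(async_gen_numbers(3)))
```

Both thunks produce the same items; only the way they are driven differs.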

What you now have after M08C01 + this core:
- Single-value async descriptions (AsyncPlan)
- Streaming async descriptions (AsyncGen)
- Full monadic composition over streams (async_gen_and_then, async_gen_map)
- Resource-safe streaming via async_gen_using
- Pure, replayable, lazy async pipelines — ready for backpressure (Core 3)

What the rest of Module 8 adds:
- Bounded concurrency & backpressure (M08C03–C05)
- Retry & timeout policies as data
- Rate limiting & fairness
- Full production async RAG pipeline

1. Algebraic Laws & Architectural Invariants

| Category | Name | Description | Enforcement |
|----------|------|-------------|-------------|
| Algebraic Laws | Left Identity | async_gen_and_then(async_gen_return(x), f) == f(x) | Hypothesis |
| Algebraic Laws | Right Identity | async_gen_and_then(g, async_gen_return) == g | Hypothesis |
| Algebraic Laws | Associativity | async_gen_and_then(async_gen_and_then(g, f), h) == async_gen_and_then(g, lambda x: async_gen_and_then(f(x), h)) | Hypothesis |
| Architectural Invariants | Description Purity | Constructing an AsyncGen performs zero side effects. Effects happen only when the stream is driven in the shell. | Mock tests + static analysis |
| Architectural Invariants | Laziness | Items are produced on demand; after an early break no further items are produced. | Property tests |
| Architectural Invariants | Per-item Error Propagation | An Err in the outer stream is yielded as-is and processing continues (no short-circuit). Inner streams are not started on Err. | Property tests |
| Architectural Invariants | Resource Safety | With async_gen_using, resources are released on normal exit, on exception, and when the iterator is closed (aclose) after an early break. | Contextlib + aclose tests |
| Architectural Invariants | Replayability | The same AsyncGen thunk can be called multiple times; each call produces a fresh async iterator. | Property tests |

The algebraic laws treat AsyncGen as a monad on the success path (T inside Ok). Errors are per-item and do not short-circuit the outer stream.
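
The purity and replayability invariants can be observed directly. The sketch below is illustrative only: `Ok` is a simplified stand-in for the funcpipe_rag type, and `describe_fetch`, `effects` and `demo` are hypothetical names. An append to a list stands in for a real side effect.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    """Simplified stand-in for funcpipe_rag.Ok (assumption)."""
    value: str


effects: list[str] = []  # records each time the simulated effect runs


def describe_fetch(name: str) -> Callable[[], AsyncIterator[Ok]]:
    """Hypothetical one-item stream whose body has a visible effect."""
    async def _gen() -> AsyncIterator[Ok]:
        effects.append(f"fetch:{name}")  # runs only when the stream is driven
        yield Ok(name.upper())
    return _gen


async def demo() -> tuple[list[str], list[Ok], list[str]]:
    gen = describe_fetch("doc")   # building the description: no effect
    iterator = gen()              # creating the iterator: still no effect
    before = list(effects)
    first = [item async for item in iterator]  # driving: the effect runs now
    second = [item async for item in gen()]    # replay: fresh iterator, effect runs again
    return before, first + second, list(effects)


before, items, after = asyncio.run(demo())
```

Nothing is recorded until the first `async for`, and each call to the thunk replays the effect from scratch.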

2. Decision Table

| Scenario | Single Value | Streaming | Recommended Pattern |
|----------|--------------|-----------|---------------------|
| Known-size async batch | Yes | No | AsyncPlan (Core 1) |
| Unbounded or large async input | No | Yes | AsyncGen + async_gen_and_then |
| Needs explicit resource cleanup | No | Yes | async_gen_using |
| Parallel independent streams | No | Yes | async_gen_gather (M08C03) |

Rule: Use AsyncGen exactly when you need lazy, potentially unbounded async streams.

async_gen_gather lives in src/funcpipe_rag/domain/effects/async_/stream.py and is re-exported from funcpipe_rag.domain.effects.async_.

3. Public API – AsyncGen (src/funcpipe_rag/domain/effects/async_/stream.py – mypy --strict clean)

# funcpipe_rag/domain/effects/async_/stream.py
from __future__ import annotations

from collections.abc import AsyncIterator, Awaitable, Callable
# AsyncContextManager is not part of collections.abc; it is available from typing.
from typing import AsyncContextManager, TypeVar

from funcpipe_rag import Result, Ok, Err, ErrInfo, assert_never

T = TypeVar("T")
U = TypeVar("U")
R = TypeVar("R")

# AsyncGen[T] = a pure, replayable description of a lazy async stream yielding Result[T, ErrInfo]
AsyncGen = Callable[[], AsyncIterator[Result[T, ErrInfo]]]

def async_gen_return(value: T) -> AsyncGen[T]:
    async def _gen() -> AsyncIterator[Result[T, ErrInfo]]:
        yield Ok(value)
    return lambda: _gen()

def async_gen_map(
    gen: AsyncGen[T],
    f: Callable[[T], U],
) -> AsyncGen[U]:
    async def _mapped() -> AsyncIterator[Result[U, ErrInfo]]:
        async for item in gen():
            yield item.map(f)
    return lambda: _mapped()

def async_gen_and_then(
    gen: AsyncGen[T],
    f: Callable[[T], AsyncGen[U]],
) -> AsyncGen[U]:
    async def _bind() -> AsyncIterator[Result[U, ErrInfo]]:
        async for item in gen():
            match item:
                case Ok(value=v):
                    async for inner in f(v)():
                        yield inner
                case Err(error=e):
                    yield Err(e)
                case _:
                    assert_never(item)
    return lambda: _bind()

# Lift a per-item async function into a streaming transformer
def lift_async_item(
    fn: Callable[[T], Awaitable[Result[U, ErrInfo]]]
) -> Callable[[T], AsyncGen[U]]:
    async def _stream_item(x: T) -> AsyncIterator[Result[U, ErrInfo]]:
        yield await fn(x)
    return lambda x: (lambda: _stream_item(x))

# Resource-safe streaming
def async_gen_using(
    cm_thunk: Callable[[], AsyncContextManager[R]],
    make_stream: Callable[[R], AsyncGen[T]],
) -> AsyncGen[T]:
    async def _using() -> AsyncIterator[Result[T, ErrInfo]]:
        async with cm_thunk() as resource:
            async for item in make_stream(resource)():
                yield item
    return lambda: _using()
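
As a usage illustration, the following self-contained sketch re-implements `async_gen_map` and `async_gen_and_then` over simplified stand-in types so it can run without the funcpipe_rag package. `Ok`, `Err` (carrying a plain string instead of an ErrInfo), `from_iterable`, `split_words` and `collect` are assumptions made for the example, not part of the API above.

```python
import asyncio
from collections.abc import AsyncIterator, Callable, Sequence
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: str  # the real code carries an ErrInfo here


Result = Union[Ok, Err]
AsyncGen = Callable[[], AsyncIterator[Result]]


def from_iterable(items: Sequence[Result]) -> AsyncGen:
    """Hypothetical helper: describe a stream over an in-memory sequence."""
    async def _gen() -> AsyncIterator[Result]:
        for item in items:
            yield item
    return _gen


def async_gen_map(gen: AsyncGen, f: Callable[[Any], Any]) -> AsyncGen:
    async def _mapped() -> AsyncIterator[Result]:
        async for item in gen():
            # Ok values are transformed; Err values pass through untouched.
            yield Ok(f(item.value)) if isinstance(item, Ok) else item
    return _mapped


def async_gen_and_then(gen: AsyncGen, f: Callable[[Any], AsyncGen]) -> AsyncGen:
    async def _bind() -> AsyncIterator[Result]:
        async for item in gen():
            if isinstance(item, Ok):
                async for inner in f(item.value)():  # inner stream started on demand
                    yield inner
            else:
                yield item  # Err is forwarded; the outer stream continues
    return _bind


def split_words(line: str) -> AsyncGen:
    """One input line fans out into a stream of words."""
    return from_iterable([Ok(word) for word in line.split()])


pipeline = async_gen_map(
    async_gen_and_then(
        from_iterable([Ok("a b"), Err("bad line"), Ok("c")]),
        split_words,
    ),
    str.upper,
)


async def collect(gen: AsyncGen) -> list[Result]:
    return [item async for item in gen()]


result = asyncio.run(collect(pipeline))
# result == [Ok("A"), Ok("B"), Err("bad line"), Ok("C")]
```

The Err sits in the output at the position where it occurred, and the item after it is still processed.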

4. Reference Implementations (pure domain code)

4.1 Resource-safe HTTP response streaming

def async_stream_http_lines(url: str, client: HttpClientCap) -> AsyncGen[str]:
    # HttpClientCap.stream returns AsyncContextManager[AsyncResponse]
    def make_stream(resp: AsyncResponse) -> AsyncGen[str]:
        async def _lines() -> AsyncIterator[Result[str, ErrInfo]]:
            async for raw_line in resp.aiter_lines():
                yield Ok(raw_line.decode())
        return lambda: _lines()

    return async_gen_map(
        async_gen_using(lambda: client.stream("GET", url), make_stream),
        lambda line: line.rstrip("\n")
    )

4.2 Composing streaming pipeline (canonical factored style)

# Per-item async steps
_clean_item = lift_async_item(clean_item_async)
_embed_item = lift_async_item(embed_item_async)

def async_rag_stream(
    docs: AsyncGen[RawDoc],
    env: RagEnv,
    embedder: EmbeddingCap,
) -> AsyncGen[EmbeddedChunk]:
    return async_gen_and_then(
        async_gen_and_then(docs, _clean_item),
        _embed_item,
    )
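
To see the factored style run end to end, here is a toy version under stated assumptions: `Ok` and `Err` are simplified stand-ins, and `toy_clean` and `toy_embed` are hypothetical placeholders for the real per-item steps (whose signatures are not shown in this core). `lift_async_item` and `async_gen_and_then` mirror the definitions from section 3.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: str  # stand-in for ErrInfo


Result = Union[Ok, Err]
AsyncGen = Callable[[], AsyncIterator[Result]]


def lift_async_item(fn: Callable[[Any], Awaitable[Result]]) -> Callable[[Any], AsyncGen]:
    def _lifted(x: Any) -> AsyncGen:
        async def _stream_item() -> AsyncIterator[Result]:
            yield await fn(x)  # a one-item stream per input
        return _stream_item
    return _lifted


def async_gen_and_then(gen: AsyncGen, f: Callable[[Any], AsyncGen]) -> AsyncGen:
    async def _bind() -> AsyncIterator[Result]:
        async for item in gen():
            if isinstance(item, Ok):
                async for inner in f(item.value)():
                    yield inner
            else:
                yield item
    return _bind


async def toy_clean(text: str) -> Result:
    stripped = text.strip()
    return Ok(stripped) if stripped else Err("empty document")


async def toy_embed(text: str) -> Result:
    return Ok((text, len(text)))  # fake "embedding": the text and its length


def docs_from(texts: Sequence[str]) -> AsyncGen:
    async def _gen() -> AsyncIterator[Result]:
        for text in texts:
            yield Ok(text)
    return _gen


stream = async_gen_and_then(
    async_gen_and_then(docs_from([" hello ", "   ", "hi"]), lift_async_item(toy_clean)),
    lift_async_item(toy_embed),
)


async def collect(gen: AsyncGen) -> list[Result]:
    return [item async for item in gen()]


embedded = asyncio.run(collect(stream))
# embedded == [Ok(("hello", 5)), Err("empty document"), Ok(("hi", 2))]
```

The blank document fails in the cleaning step, skips embedding, and does not stop the documents after it.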

4.3 Full RAG async stream (domain/rag_async.py)

def async_rag_pipeline_stream(
    storage: StoragePort,
    input_path: str,
    env: RagEnv,
    embedder: EmbeddingCap,
) -> AsyncGen[EmbeddedChunk]:
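    # async_gen_using already yields AsyncGen[RawDoc] (via parse_docs_stream), and
    # async_rag_stream takes that whole stream. It is therefore called directly, not
    # through async_gen_and_then, whose callback receives one RawDoc at a time.
    docs = async_gen_using(
        lambda: storage.open_input(input_path),  # returns AsyncContextManager[AsyncIterator[str]]
        parse_docs_stream,
    )
    return async_rag_stream(docs, env, embedder)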

Shell consumption (only place driving happens):

async def run_rag_stream():
    stream = async_rag_pipeline_stream(real_storage, "data.csv", env, embedder)
    processed = 0
    async for result in stream():
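        match result:
            case Ok(value=chunk):  # keyword patterns, as in async_gen_and_then
                await index_chunk(chunk)
                processed += 1
            case Err(error=e):
                logger.error("Failed chunk: %s", e)
                break  # stop at the first error; use `continue` to skip it instead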
    logger.info("Processed %d chunks", processed)

5. Property-Based Proofs (all pass in CI)

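from collections.abc import AsyncIterator

import pytest
from hypothesis import given, strategies as st

from funcpipe_rag import Result, Ok, Err, ErrInfo
from funcpipe_rag.domain.effects.async_.stream import (
    AsyncGen,
    async_gen_and_then,
    async_gen_return,
    async_gen_using,
)

@given(x=st.integers())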
@pytest.mark.asyncio
async def test_async_gen_left_identity(x: int):
    f = lambda y: async_gen_return(y + 1)

    left = [item async for item in async_gen_and_then(async_gen_return(x), f)()]
    right = [item async for item in f(x)()]

    assert left == right

@given(xs=st.lists(st.integers(), max_size=100))
@pytest.mark.asyncio
async def test_async_gen_right_identity(xs):
    # Laws are defined on the Ok-path only; we generate only Ok here on purpose.
    def src() -> AsyncGen[int]:
        async def _src() -> AsyncIterator[Result[int, ErrInfo]]:
            for v in xs:
                yield Ok(v)
        return lambda: _src()

    left = [item async for item in async_gen_and_then(src(), async_gen_return)()]
    right = [item async for item in src()()]

    assert left == right

@given(xs=st.lists(st.integers(), max_size=100))
@pytest.mark.asyncio
async def test_async_gen_associativity(xs):
    # Laws are defined on the Ok-path only; we generate only Ok here on purpose.
    def src() -> AsyncGen[int]:
        async def _src() -> AsyncIterator[Result[int, ErrInfo]]:
            for v in xs:
                yield Ok(v)
        return lambda: _src()

    f = lambda y: async_gen_return(y + 1)
    g = lambda z: async_gen_return(z * 2)

    a = async_gen_and_then(async_gen_and_then(src(), f), g)
    b = async_gen_and_then(src(), lambda y: async_gen_and_then(f(y), g))

    a_items = [item.value async for item in a() if isinstance(item, Ok)]
    b_items = [item.value async for item in b() if isinstance(item, Ok)]

    assert a_items == b_items

@pytest.mark.asyncio
async def test_async_gen_and_then_per_item_error_propagation():
    async def _src() -> AsyncIterator[Result[int, ErrInfo]]:
        yield Ok(1)
        yield Err(ErrInfo(code="BOOM", msg="test"))
        yield Ok(2)

    def src() -> AsyncGen[int]:
        return lambda: _src()

    f = lambda x: async_gen_return(x * 10)

    out = [item async for item in async_gen_and_then(src(), f)()]

    assert isinstance(out[0], Ok) and out[0].value == 10
    assert isinstance(out[1], Err) and out[1].error.code == "BOOM"
    assert isinstance(out[2], Ok) and out[2].value == 20

@pytest.mark.asyncio
async def test_async_gen_replayable():
    gen = async_gen_return(1)

    first = [item async for item in gen()]
    second = [item async for item in gen()]

    assert first == second

from contextlib import asynccontextmanager

@pytest.mark.asyncio
async def test_resource_cleanup_on_early_break():
    called = False

    @asynccontextmanager
    async def fake_resource():
        nonlocal called
        try:
            yield None
        finally:
            called = True

    stream = async_gen_using(fake_resource, lambda _: async_gen_return(42))

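    it = stream()
    async for _ in it:  # consume one item then break
        break
    # `break` alone does not close an async generator; close it so the
    # `async with` inside async_gen_using exits before the assertion.
    await it.aclose()

    assert called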

6. Runtime Guarantees

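| Operation | Memory per item | Total memory (full consumption) | Notes |
|-----------|-----------------|---------------------------------|-------|
| async_gen_map | O(1) | O(1) | Pure transformation |
| async_gen_and_then | O(1) | O(1) | Streams inner only on demand |
| async_gen_using | O(1) | O(1) | Resource released on break/error |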

Full consumption has the same time asymptotics as the batch version; partial consumption only pays for the items actually pulled, and memory stays O(1) in both cases.

7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|--------------|---------|-----|
| Materialising streams into lists | OOM on large inputs | Keep as AsyncGen, consume lazily |
| Manual try/finally with aclose | Leaks on early break | Use async_gen_using |
| Unbounded async for without backpressure | Downstream overload | Add bounded queue (M08C03) |
| Yielding raw values instead of Result | Exceptions kill the stream | Always yield Result[T, ErrInfo] |
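
The last row is the easiest to get wrong in practice: an exception raised inside an async generator ends the whole stream. One possible remedy, sketched here under assumptions, is to convert exceptions into Err values at the per-item boundary. `catching`, `parse_int` and `parsed_stream` are hypothetical names, and `Err` carries a plain string rather than an ErrInfo.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: str  # stand-in for ErrInfo


Result = Union[Ok, Err]


def catching(
    fn: Callable[[Any], Awaitable[Any]], code: str
) -> Callable[[Any], Awaitable[Result]]:
    """Hypothetical adapter: turn exceptions raised by `fn` into Err values."""
    async def _wrapped(x: Any) -> Result:
        try:
            return Ok(await fn(x))
        except Exception as exc:  # deliberately broad for this sketch
            return Err(f"{code}: {exc}")
    return _wrapped


async def parse_int(raw: str) -> int:
    return int(raw)  # raises ValueError on bad input


def parsed_stream(raws: Sequence[str]) -> Callable[[], AsyncIterator[Result]]:
    safe_parse = catching(parse_int, "PARSE")

    async def _gen() -> AsyncIterator[Result]:
        for raw in raws:
            yield await safe_parse(raw)  # never raises, always yields a Result
    return _gen


async def collect(gen: Callable[[], AsyncIterator[Result]]) -> list[Result]:
    return [item async for item in gen()]


results = asyncio.run(collect(parsed_stream(["1", "x", "3"])))
```

The bad input becomes a single Err in the middle of the output, and the item after it is still parsed.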

8. Pre-Core Quiz

  1. Async generator returns…? → AsyncIterator – a lazy stream
  2. When is work performed…? → Only when iterated with async for in the shell
  3. Compose streaming pipelines with…? → async_gen_and_then / async_gen_map
  4. Resource safety is guaranteed by…? → async_gen_using
  5. Error propagation is…? → Per-item Result; Err yields and continues outer stream

9. Post-Core Exercise

  1. Convert one real I/O stream in your codebase to an AsyncGen description.
  2. Compose a ≥3-step streaming pipeline using async_gen_and_then.
  3. Wrap a resource (file, HTTP connection) with async_gen_using.
  4. Write a Hypothesis property proving one of the bind laws on a small synthetic stream.
  5. Celebrate – you now have true async streaming without magic or leaks.

Next → M08C03: Bounded Queues & Backpressure – Preventing Memory Blowups

You now have lazy, composable, resource-safe async streams — the foundation for production Async FuncPipe. The remaining cores add backpressure, fairness, retries, and full async RAG — all while keeping the core pure and the shell in full control.

M08C02 is now frozen.