
M08C01: async/await as Descriptions of Steps, Not Hidden Magic

Module 08 – Main Track Core

Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every production FuncPipe async system treats async def as a pure description of steps — never hidden magic.

Progression Note

Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.

| Module | Focus | Key Outcomes |
|---|---|---|
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, idempotent effects, explicit sessions |
| 8 | Async FuncPipe & Backpressure | Async FuncPipe, non-blocking pipelines, backpressure, basic fairness |
| 9 | FP Across Libraries and Frameworks | FuncPipe style in Pandas/Polars/Dask, FastAPI, CLI, distributed systems |
| 10 | Refactoring, Performance, Future-Proofing | Systematic refactors, performance budgets, governance and long-term evolution |

Core question
How do you model asynchronous operations as pure, composable descriptions using coroutines — so that concurrency is explicit data, testable, and never hidden magic?

Symmetry with Module 7

IOPlan[T]    = () -> Result[T, ErrInfo]
AsyncPlan[T] = () -> Awaitable[Result[T, ErrInfo]]
AsyncPlan is exactly the same idea as IOPlan, only non-blocking.
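
To make the symmetry concrete, here is a minimal sketch that builds one plan of each kind and drives both. The `Ok` class is a simplified stand-in for the course's `Result` type, and `io_plan`, `async_plan`, and `_answer` are hypothetical names used only for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    """Simplified stand-in for the Ok branch of Result[T, ErrInfo]."""
    value: Any


def io_plan() -> Ok:
    """IOPlan shape: a zero-argument callable that returns a Result."""
    return Ok(21 * 2)


async def _answer() -> Ok:
    await asyncio.sleep(0)  # stand-in for a non-blocking I/O step
    return Ok(21 * 2)


def async_plan():
    """AsyncPlan shape: a zero-argument callable that returns an awaitable Result."""
    return _answer()


# Driving happens at the edge: a plain call for the sync plan,
# an event loop for the async plan.
sync_result = io_plan()
async_result = asyncio.run(async_plan())
```

Both plans are ordinary values until something calls them; the only difference is that the async variant needs an event loop to produce its Result.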

What you now have after M07 + this core

- Pure domain core
- All effects behind ports
- Effectful operations described as pure data (IOPlan for sync, AsyncPlan for async)
- Typed capability protocols
- Reliable resource cleanup
- Pure logging via Writer
- Idempotent effects
- Explicit sessions & transactions
- Async operations as pure descriptions

Calling an async def function returns a coroutine object (a value); it does not start execution. await inside coroutine bodies is required and allowed. Driving the plan (perform_async, asyncio.run, create_task, top-level await) happens only in shells.
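
The claim that a coroutine object is a value, not execution, can be observed directly. The sketch below uses a hypothetical `fetch_greeting` function with a visible side effect (appending to a list) and checks that nothing happens until the coroutine is driven.

```python
import asyncio
import inspect

calls: list[str] = []


async def fetch_greeting(name: str) -> str:
    calls.append(name)      # observable side effect inside the body
    await asyncio.sleep(0)  # stand-in for real non-blocking I/O
    return f"hello {name}"


# Calling the async function only builds a coroutine object.
coro = fetch_greeting("ada")
is_coroutine = inspect.iscoroutine(coro)
calls_before = list(calls)   # still empty: the body has not started

# The shell drives the coroutine; only now does the body run.
greeting = asyncio.run(coro)
calls_after = list(calls)
```

A bare coroutine object can be driven only once, which is why AsyncPlan wraps coroutine creation in a zero-argument callable.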

What the rest of Module 8 adds

- Async generators & streaming (M08C02)
- Bounded concurrency, backpressure, fairness (M08C03–C05)
- Retry & timeout policies as data
- Deterministic async testing
- Full async RAG pipeline

You are now one step away from production-grade async/concurrency.

1. Algebraic Laws & Architectural Invariants (machine-checked where possible)

| Category | Name | Description | Enforcement |
|---|---|---|---|
| Algebraic Laws | Left Identity | async_bind(async_pure(x), f) == f(x) | Hypothesis |
| Algebraic Laws | Right Identity | async_bind(plan, async_pure) == plan | Hypothesis |
| Algebraic Laws | Associativity | async_bind(async_bind(m, f), g) == async_bind(m, lambda x: async_bind(f(x), g)) | Hypothesis |
| Architectural Invariants | Description Purity | Constructing an AsyncPlan performs zero side effects. Effects happen only when a plan is driven in the shell. | Mock tests + static analysis |
| Architectural Invariants | Replayability | The same AsyncPlan value can be called multiple times → each call produces a fresh coroutine. | Property tests |
| Architectural Invariants | Interpreter Isolation | Core may await inside coroutine bodies, but must never drive a plan (perform_async, asyncio.run, create_task, top-level await). Only shells drive plans. | mypy --strict + grep-CI + review |

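The algebraic laws are the monad laws for AsyncPlan over Result. Here == means that both plans yield equal Result values when driven; plans are functions, so they cannot be compared directly. The architectural invariants are enforced by tooling and review.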
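
The Description Purity and Replayability invariants can be checked with a recording test double. This is a minimal sketch, not the project's actual test: `Ok`, `Err`, `async_pure`, and `async_bind` are simplified re-implementations of the API in section 3, and `RecordingHttp` and `check_description_purity` are hypothetical names.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


Result = Union[Ok, Err]                      # stand-in for Result[T, ErrInfo]
AsyncPlan = Callable[[], Awaitable[Result]]  # stand-in for AsyncPlan[T]


def async_pure(value: Any) -> AsyncPlan:
    async def _coro() -> Result:
        return Ok(value)
    return lambda: _coro()


def async_bind(plan: AsyncPlan, f: Callable[[Any], AsyncPlan]) -> AsyncPlan:
    async def _coro() -> Result:
        res = await plan()
        if isinstance(res, Ok):
            return await f(res.value)()
        return res  # an Err short-circuits the rest of the chain
    return lambda: _coro()


class RecordingHttp:
    """Hypothetical test double that records every call it receives."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def get_json(self, url: str) -> Result:
        self.calls.append(url)
        return Ok({"url": url})


def check_description_purity():
    http = RecordingHttp()
    plan = async_bind(
        lambda: http.get_json("/users/1"),
        lambda raw: async_pure(raw["url"]),
    )
    calls_after_construction = list(http.calls)  # purity: expect no calls yet
    first = asyncio.run(plan())                  # drive once
    second = asyncio.run(plan())                 # replay: a fresh coroutine
    return calls_after_construction, first, second, list(http.calls)
```

Under these assumptions the recorded call list is empty after construction and contains one entry per run, which is the observable meaning of "effects happen only when a plan is driven".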

2. Decision Table – Sync vs Async Description

| Scenario | Blocking OK? | Needs Concurrency / Non-blocking I/O? | Recommended Pattern |
|---|---|---|---|
| CPU-bound pure logic | Yes | No | Sync functions |
| I/O-bound single step | Yes | No | IOPlan (Module 7) |
| I/O-bound with dependent steps | No | Yes | AsyncPlan + async_bind |
| Independent parallel I/O | No | Yes | async_gather (M08C03) |
| Streaming over network / DB | No | Yes | Async generator + backpressure (M08C02) |

Rule: Use AsyncPlan exactly when you need non-blocking, dependent I/O steps. Never drive a plan in core.

3. Public API – AsyncPlan (src/funcpipe_rag/domain/effects/async_/plan.py – mypy --strict clean)

# funcpipe_rag/domain/effects/async_/plan.py
from __future__ import annotations

from collections.abc import Awaitable, Callable
from typing import TypeAlias, TypeVar

from funcpipe_rag import Result, Ok, Err, ErrInfo, assert_never

A = TypeVar("A")
B = TypeVar("B")

# AsyncPlan[T] = a pure, replayable description of async work that eventually yields Result[T, ErrInfo]
AsyncPlan: TypeAlias = Callable[[], Awaitable[Result[A, ErrInfo]]]

def async_pure(value: A) -> AsyncPlan[A]:
    """Wrap a plain value in a plan that yields Ok(value) without any I/O."""
    async def _coro() -> Result[A, ErrInfo]:
        return Ok(value)
    return lambda: _coro()

def async_bind(
    plan: AsyncPlan[A],
    f: Callable[[A], AsyncPlan[B]],
) -> AsyncPlan[B]:
    """Sequence dependent steps: run plan, then f(value) on Ok; an Err short-circuits."""
    async def _coro() -> Result[B, ErrInfo]:
        res = await plan()
        match res:
            case Ok(value=v):
                return await f(v)()
            case Err(error=e):
                return Err(e)
        assert_never(res)
    return lambda: _coro()

def async_map(
    plan: AsyncPlan[A],
    f: Callable[[A], B],
) -> AsyncPlan[B]:
    """Apply a pure function to the Ok value of a plan."""
    return async_bind(plan, lambda x: async_pure(f(x)))

# Lift a parameterless async capability call into a replayable AsyncPlan
# Precondition: make_coro MUST return a fresh coroutine each call.
# Correct:   lambda: capability.method(...)
# Incorrect: coro = capability.method(...); async_lift(lambda: coro)
def async_lift(
    make_coro: Callable[[], Awaitable[Result[A, ErrInfo]]],
) -> AsyncPlan[A]:
    return make_coro

Shell-only interpreter (src/funcpipe_rag/infra/adapters/async_runtime.py – never imported into domain)

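# funcpipe_rag/infra/adapters/async_runtime.py
from funcpipe_rag import Result, ErrInfo
from funcpipe_rag.domain.effects.async_.plan import A, AsyncPlan

async def perform_async(plan: AsyncPlan[A]) -> Result[A, ErrInfo]:
    """The one interpreter – runs the async description."""
    return await plan()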
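
The short-circuit behaviour of async_bind is easiest to see in a runnable form. The sketch below re-implements the API with simplified stand-ins (`Ok`, `Err`, and `isinstance` checks instead of the project's `Result` and pattern matching); `parse_int`, `double`, and `steps_run` are hypothetical and exist only to make the executed steps visible.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


Result = Union[Ok, Err]
AsyncPlan = Callable[[], Awaitable[Result]]


def async_bind(plan: AsyncPlan, f: Callable[[Any], AsyncPlan]) -> AsyncPlan:
    async def _coro() -> Result:
        res = await plan()
        if isinstance(res, Ok):
            return await f(res.value)()
        return res  # Err: the next step is never built or run
    return lambda: _coro()


async def perform_async(plan: AsyncPlan) -> Result:
    return await plan()


steps_run: list[str] = []  # records which step bodies actually executed


def parse_int(text: str) -> AsyncPlan:
    async def _coro() -> Result:
        steps_run.append(f"parse {text!r}")
        try:
            return Ok(int(text))
        except ValueError:
            return Err(f"not an integer: {text!r}")
    return lambda: _coro()


def double(n: int) -> AsyncPlan:
    async def _coro() -> Result:
        steps_run.append(f"double {n}")
        return Ok(n * 2)
    return lambda: _coro()


# Shell: the only place plans are driven.
good = asyncio.run(perform_async(async_bind(parse_int("21"), double)))
bad = asyncio.run(perform_async(async_bind(parse_int("x"), double)))
```

In this sketch the failing run records only the parse step, so the dependent step is skipped rather than executed with bad input.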

4. Reference Implementations (pure domain code)

4.1 Capability protocols (domain/ports.py)

from typing import Protocol

from funcpipe_rag import Result, ErrInfo

class HttpClientCap(Protocol):
    async def get_json(self, url: str) -> Result[dict, ErrInfo]: ...

class DbCap(Protocol):
    async def save_user(self, user: UserProfile) -> Result[None, ErrInfo]: ...

4.2 Pure description – dependent async steps (domain/user.py)

# Factored version – the canonical, readable style
def _fetch(user_id: str, http: HttpClientCap) -> AsyncPlan[dict]:
    return async_lift(lambda: http.get_json(f"https://api.example.com/users/{user_id}"))

def _validate(raw: dict) -> AsyncPlan[UserProfile]:
    return async_pure(validate_user(raw))

def _save(user: UserProfile, db: DbCap) -> AsyncPlan[UserProfile]:
    return async_bind(
        async_lift(lambda: db.save_user(user)),
        lambda _: async_pure(user),
    )

def async_user_pipeline(
    user_id: str,
    http: HttpClientCap,
    db: DbCap,
) -> AsyncPlan[UserProfile]:
    return async_bind(
        _fetch(user_id, http),
        lambda raw: async_bind(
            _validate(raw),
            lambda user: _save(user, db),
        ),
    )
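
Because the pipeline takes its capabilities as arguments, it can be exercised with in-memory fakes and no network. The following is a sketch under stated assumptions: `Ok`, `Err`, `async_pure`, and `async_bind` are simplified stand-ins, and `UserProfile`, `validate_user`, `FakeHttp`, and `FakeDb` are hypothetical shapes invented for the example (the real definitions are not shown in this core).

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


Result = Union[Ok, Err]
AsyncPlan = Callable[[], Awaitable[Result]]


def async_pure(value: Any) -> AsyncPlan:
    async def _coro() -> Result:
        return Ok(value)
    return lambda: _coro()


def async_bind(plan: AsyncPlan, f: Callable[[Any], AsyncPlan]) -> AsyncPlan:
    async def _coro() -> Result:
        res = await plan()
        return await f(res.value)() if isinstance(res, Ok) else res
    return lambda: _coro()


@dataclass(frozen=True)
class UserProfile:  # hypothetical shape
    user_id: str
    name: str


def validate_user(raw: dict) -> UserProfile:  # hypothetical validator
    return UserProfile(user_id=str(raw["id"]), name=str(raw["name"]))


class FakeHttp:
    """In-memory stand-in for HttpClientCap with a canned response."""
    def __init__(self, response: Result) -> None:
        self.response = response

    async def get_json(self, url: str) -> Result:
        return self.response


class FakeDb:
    """In-memory stand-in for DbCap that records saved users."""
    def __init__(self) -> None:
        self.saved: list[UserProfile] = []

    async def save_user(self, user: UserProfile) -> Result:
        self.saved.append(user)
        return Ok(None)


def async_user_pipeline(user_id: str, http: FakeHttp, db: FakeDb) -> AsyncPlan:
    fetch = lambda: http.get_json(f"https://api.example.com/users/{user_id}")
    save = lambda user: async_bind(lambda: db.save_user(user), lambda _: async_pure(user))
    return async_bind(fetch, lambda raw: async_bind(async_pure(validate_user(raw)), save))


def run_with(response: Result):
    db = FakeDb()
    plan = async_user_pipeline("123", FakeHttp(response), db)
    return asyncio.run(plan()), db.saved
```

Calling `run_with(Ok({"id": "123", "name": "Ada"}))` exercises the happy path, while `run_with(Err("timeout"))` shows that a failed fetch never reaches the database fake.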

4.3 Before → After (the real transformation)

# BEFORE – hidden magic, immediate execution, untestable
async def old_process(user_id: str):
    raw = await http.get_json(f"/users/{user_id}")   # immediate effect
    user = validate_user(raw)
    await db.save_user(user)
    return user

# AFTER – pure description, no execution until shell calls perform_async
def async_user_pipeline(
    user_id: str,
    http: HttpClientCap,
    db: DbCap,
) -> AsyncPlan[UserProfile]:
    return async_bind(
        _fetch(user_id, http),
        lambda raw: async_bind(
            _validate(raw),
            lambda user: _save(user, db),
        ),
    )

Shell usage (only place driving happens):

plan = async_user_pipeline("123", real_http, real_db)   # pure: nothing has run yet
result = await perform_async(plan)   # inside an async shell function
# From a synchronous entry point: result = asyncio.run(perform_async(plan))

4.4 Real-world RAG example (domain/rag_async.py)

from dataclasses import replace
from funcpipe_rag.domain.effects.async_ import async_gather

def _embed_chunk(
    chunk: ChunkWithoutEmbedding,
    embedder: EmbeddingCap,
) -> AsyncPlan[EmbeddedChunk]:
    return async_bind(
        async_lift(lambda: embedder.embed(chunk.text.content)),
        lambda vector: async_pure(
            replace(chunk, embedding=Embedding(vector, embedder.model_name))
        ),
    )

def async_rag_pipeline(
    docs: Iterator[RawDoc],
    env: RagEnv,
    embedder: EmbeddingCap,
) -> AsyncPlan[list[EmbeddedChunk]]:
    return async_bind(
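        # Chunking is pure and runs eagerly when the plan is built; materializing
        # the iterator once into a list keeps the resulting plan replayable.
        async_pure(list(gen_clean_and_chunk(docs, env))),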
        lambda chunks: async_map(
            async_gather(  # M08C03 – bounded parallel embedding
                [_embed_chunk(c, embedder) for c in chunks],
                concurrency=env.embedding_concurrency,
            ),
            structural_dedup_chunks,
        ),
    )
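
async_gather itself is defined in M08C03 and is not shown in this core. As an illustration only, the sketch below shows one way a bounded gather could be written with `asyncio.Semaphore`. Everything here is an assumption: the signature, the rule that the first Err in input order wins, and the helper names `Tracker`, `tracked`, and `run_demo`. `Ok` and `Err` are simplified stand-ins for the project's Result type.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable, Iterable
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


Result = Union[Ok, Err]
AsyncPlan = Callable[[], Awaitable[Result]]


def async_gather(plans: Iterable[AsyncPlan], concurrency: int) -> AsyncPlan:
    plan_list = list(plans)  # materialize once so the description can be replayed

    async def _coro() -> Result:
        semaphore = asyncio.Semaphore(concurrency)  # created per run, at drive time

        async def run_one(plan: AsyncPlan) -> Result:
            async with semaphore:  # at most `concurrency` plans in flight
                return await plan()

        results = await asyncio.gather(*(run_one(p) for p in plan_list))
        values = []
        for res in results:
            if isinstance(res, Err):
                return res  # assumption: first Err in input order wins
            values.append(res.value)
        return Ok(values)

    return lambda: _coro()


class Tracker:
    """Counts how many plans are running at once."""
    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0


def tracked(value: Any, tracker: Tracker) -> AsyncPlan:
    async def _coro() -> Result:
        tracker.in_flight += 1
        tracker.peak = max(tracker.peak, tracker.in_flight)
        await asyncio.sleep(0)  # yield so other plans get a chance to start
        tracker.in_flight -= 1
        return Ok(value)
    return lambda: _coro()


def run_demo():
    tracker = Tracker()
    plan = async_gather([tracked(i, tracker) for i in range(5)], concurrency=2)
    return asyncio.run(plan()), tracker.peak
```

The point of the sketch is that the concurrency limit is an ordinary argument of the description, so it can come from configuration such as env.embedding_concurrency rather than being buried in task-spawning code.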

5. Property-Based Proofs (all pass in CI)

import pytest
from hypothesis import given, strategies as st

@given(x=st.integers())
@pytest.mark.asyncio
async def test_async_bind_laws(x: int):
    f = lambda y: async_pure(y + 1)
    g = lambda z: async_pure(z * 2)

    a = async_pure(x)

    # Associativity
    assert await perform_async(async_bind(async_bind(a, f), g)) == \
           await perform_async(async_bind(a, lambda y: async_bind(f(y), g)))

    # Right identity
    assert await perform_async(async_bind(a, async_pure)) == \
           await perform_async(a)

    # Left identity
    assert await perform_async(async_bind(async_pure(x), f)) == \
           await perform_async(f(x))

@pytest.mark.asyncio
async def test_replayability():
    # http_mock is a deterministic test double (fixture not shown here)
    plan = async_lift(lambda: http_mock.get_json("/test"))
    r1 = await perform_async(plan)
    r2 = await perform_async(plan)  # fresh coroutine each call
    assert r1 == r2
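
For readers without Hypothesis or pytest-asyncio installed, the same three laws can be spot-checked with the standard library alone. This sketch uses simplified stand-ins for `Ok`, `async_pure`, and `async_bind`, and compares plans by the Result they yield when driven; `run` and `check_laws` are hypothetical helper names. A loop over a handful of integers is a weaker check than a property test.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


AsyncPlan = Callable[[], Awaitable[Any]]


def async_pure(value: Any) -> AsyncPlan:
    async def _coro() -> Ok:
        return Ok(value)
    return lambda: _coro()


def async_bind(plan: AsyncPlan, f: Callable[[Any], AsyncPlan]) -> AsyncPlan:
    async def _coro() -> Any:
        res = await plan()
        return await f(res.value)() if isinstance(res, Ok) else res
    return lambda: _coro()


def run(plan: AsyncPlan) -> Any:
    """Drive a plan to completion; plans are compared by what they yield."""
    return asyncio.run(plan())


def check_laws(x: int) -> bool:
    f = lambda y: async_pure(y + 1)
    g = lambda z: async_pure(z * 2)
    m = async_pure(x)

    left_identity = run(async_bind(async_pure(x), f)) == run(f(x))
    right_identity = run(async_bind(m, async_pure)) == run(m)
    associativity = (
        run(async_bind(async_bind(m, f), g))
        == run(async_bind(m, lambda y: async_bind(f(y), g)))
    )
    return left_identity and right_identity and associativity


all_hold = all(check_laws(x) for x in (-3, 0, 1, 7, 10**6))
```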

6. Big-O & Allocation Guarantees

| Operation | Time | Python call-stack | Heap allocation until interpretation |
|---|---|---|---|
| async_pure | O(1) | O(1) | O(1) |
| async_bind | O(1) | O(1) | O(1) closure |
| async_lift | O(1) | O(1) | O(1) |
| perform_async | O(chain) | O(chain) | O(chain) closures |

Composition is O(1) per layer — no work until interpretation.
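
The "no work until interpretation" claim can be observed by stacking many binds and counting executed steps. The sketch below uses simplified stand-ins for `Ok`, `async_pure`, and `async_bind`; `step` and `executed` are hypothetical. The chain is kept at 100 layers because a left-nested chain awaits one coroutine inside another, so its depth counts against Python's recursion limit.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


AsyncPlan = Callable[[], Awaitable[Any]]


def async_pure(value: Any) -> AsyncPlan:
    async def _coro() -> Ok:
        return Ok(value)
    return lambda: _coro()


def async_bind(plan: AsyncPlan, f: Callable[[Any], AsyncPlan]) -> AsyncPlan:
    async def _coro() -> Any:
        res = await plan()
        return await f(res.value)() if isinstance(res, Ok) else res
    return lambda: _coro()


executed: list[int] = []  # records which step bodies have run


def step(i: int) -> Callable[[int], AsyncPlan]:
    """Build a continuation that adds i to the running total when driven."""
    def f(total: int) -> AsyncPlan:
        async def _coro() -> Ok:
            executed.append(i)
            return Ok(total + i)
        return lambda: _coro()
    return f


# Composition: each async_bind only allocates a closure.
plan = async_pure(0)
for i in range(1, 101):
    plan = async_bind(plan, step(i))

executed_before = len(executed)  # nothing has run during composition

# Interpretation: all 100 steps run, in order.
result = asyncio.run(plan())
executed_after = len(executed)
```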

7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Top-level await / perform_async / create_task in core | Immediate effects, untestable | Return AsyncPlan, drive only in shell |
| Re-using a single coroutine object with async_lift | Replayability broken | Always pass a lambda that creates a fresh coroutine |
| asyncio.create_task in core | Hidden concurrency, leaks | Use explicit async_gather / bounded queues (later cores) |
| Direct httpx.AsyncClient() in core | Concrete dependency | Inject capability protocol |
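
The coroutine-reuse anti-pattern fails in a very concrete way: Python raises RuntimeError when a coroutine object is awaited a second time. The sketch below contrasts the correct and incorrect uses of async_lift; `get_value` and `drive_twice` are hypothetical, and `Ok` and `Err` are simplified stand-ins for the project's Result type.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


def async_lift(make_coro):
    """Same shape as the API above: the factory itself is the plan."""
    return make_coro


async def get_value() -> Ok:
    return Ok(1)


# Correct: the lambda creates a fresh coroutine on every call.
good_plan = async_lift(lambda: get_value())

# Incorrect: one coroutine object is captured and handed out repeatedly.
shared = get_value()
bad_plan = async_lift(lambda: shared)


async def drive_twice(plan):
    first = await plan()
    try:
        second = await plan()
    except RuntimeError as exc:  # raised when a finished coroutine is awaited again
        second = Err(str(exc))
    return first, second


good = asyncio.run(drive_twice(good_plan))
bad = asyncio.run(drive_twice(bad_plan))
```

The first drive of the bad plan succeeds, which is what makes this bug easy to miss without a replay test.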

8. Pre-Core Quiz

  1. async def returns…? → A coroutine object – a pure description
  2. await inside coroutine bodies is…? → Required and allowed – it is part of the description
  3. Driving a plan (perform_async, asyncio.run, create_task) is…? → Only in shells – never in core
  4. Compose async descriptions with…? → async_bind / async_map / async_lift
  5. Real power comes from…? → Concurrency is explicit data, not magic – exactly like IOPlan

9. Post-Core Exercise

  1. Convert one real I/O-bound pipeline in your codebase to return an AsyncPlan.
  2. Chain ≥3 dependent async steps (preferably factored into small private helpers).
  3. Write a Hypothesis property proving the bind laws for your pipeline.
  4. Run the same AsyncPlan twice in a test and assert identical results (for idempotent cases).
  5. Celebrate – you now have async without magic.

Next → M08C02: Async Generators & Streams – Building Async FuncPipe

You now model async operations as pure, composable descriptions — exactly symmetrical to IOPlan. Your core remains pure, your shell controls execution, and concurrency is finally explicit data, not hidden magic.