M08C01: async/await as Descriptions of Steps, Not Hidden Magic
Module 08 – Main Track Core
Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every production FuncPipe async system treats async def as a pure description of steps, never as hidden magic.
Progression Note
Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.
| Module | Focus | Key Outcomes |
|---|---|---|
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, idempotent effects, explicit sessions |
| 8 | Async FuncPipe & Backpressure | Async FuncPipe, non-blocking pipelines, backpressure, basic fairness |
| 9 | FP Across Libraries and Frameworks | FuncPipe style in Pandas/Polars/Dask, FastAPI, CLI, distributed systems |
| 10 | Refactoring, Performance, Future-Proofing | Systematic refactors, performance budgets, governance and long-term evolution |
Core question
How do you model asynchronous operations as pure, composable descriptions using coroutines — so that concurrency is explicit data, testable, and never hidden magic?
Symmetry with Module 7
What you now have after M07 + this core
- Pure domain core
- All effects behind ports
- Effectful operations described as pure data (IOPlan for sync, AsyncPlan for async)
- Typed capability protocols
- Reliable resource cleanup
- Pure logging via Writer
- Idempotent effects
- Explicit sessions & transactions
- Async operations as pure descriptions – async def returns a coroutine object (a value), not execution. await inside coroutine bodies is required and allowed. Driving the plan (perform_async, asyncio.run, create_task, top-level await) happens only in shells.
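
As a minimal illustration of the last point, calling an async def function only builds a coroutine object; the body does not start until something drives it. The names fetch_answer and calls are hypothetical and not part of FuncPipe.

```python
import asyncio
import inspect

calls: list[str] = []  # records when the coroutine body actually runs


async def fetch_answer() -> int:
    calls.append("ran")  # observable side effect
    return 42


coro = fetch_answer()                     # builds a coroutine object; body not started
is_coroutine = inspect.iscoroutine(coro)
calls_before_drive = list(calls)          # still empty at this point

answer = asyncio.run(coro)                # driving: in FuncPipe this belongs in a shell
calls_after_drive = list(calls)
```

The side effect is recorded only after asyncio.run drives the coroutine, which is why driving is kept out of the core.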
What the rest of Module 8 adds
- Async generators & streaming (M08C02)
- Bounded concurrency, backpressure, fairness (M08C03–C05)
- Retry & timeout policies as data
- Deterministic async testing
- Full async RAG pipeline
You are now one step away from production-grade async/concurrency.
1. Algebraic Laws & Architectural Invariants (machine-checked where possible)
| Category | Name | Description | Enforcement |
|---|---|---|---|
| Algebraic Laws | Left Identity | async_bind(async_pure(x), f) == f(x) | Hypothesis |
| Algebraic Laws | Right Identity | async_bind(plan, async_pure) == plan | Hypothesis |
| Algebraic Laws | Associativity | async_bind(async_bind(m, f), g) == async_bind(m, lambda x: async_bind(f(x), g)) | Hypothesis |
| Architectural Invariants | Description Purity | Constructing an AsyncPlan performs zero side effects. Effects happen only when a plan is driven in the shell. | Mock tests + static analysis |
| Architectural Invariants | Replayability | The same AsyncPlan value can be called multiple times → each call produces a fresh coroutine. | Property tests |
| Architectural Invariants | Interpreter Isolation | Core may await inside coroutine bodies, but must never drive a plan (perform_async, asyncio.run, create_task, top-level await). Only shells drive plans. | mypy --strict + grep-CI + review |
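The algebraic laws are the monad laws for AsyncPlan, with Result carrying the success or error outcome. Plans are functions, so == in the table means observational equality: driving both sides yields equal Result values. The architectural invariants are enforced by tooling and review.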
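
The Description Purity and Replayability invariants can be checked with a recording effect. The following is a minimal sketch, assuming simplified stand-ins for Ok, Err, and async_bind (the real ones live in funcpipe_rag); record and effects are hypothetical helpers introduced only for this illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


# Stand-ins for funcpipe_rag's Ok / Err (assumed shape, for illustration only).
@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


Plan = Callable[[], Awaitable[Any]]  # simplified, untyped AsyncPlan


def async_bind(plan: Plan, f: Callable[[Any], Plan]) -> Plan:
    async def _coro() -> Any:
        res = await plan()
        if isinstance(res, Ok):
            return await f(res.value)()
        return res  # Err short-circuits

    return _coro


effects: list[str] = []  # every performed "effect" is recorded here


def record(tag: str) -> Plan:
    """Hypothetical effectful step: appends to effects only when driven."""

    async def _coro() -> Any:
        effects.append(tag)
        return Ok(tag)

    return _coro


plan = async_bind(record("a"), lambda _: record("b"))
effects_after_construction = list(effects)  # nothing has run yet

first = asyncio.run(plan())   # driving belongs in a shell
second = asyncio.run(plan())  # same plan value, fresh coroutine
```

Building the plan records nothing, and each drive replays both steps from scratch, so the same plan value can be run any number of times.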
2. Decision Table – Sync vs Async Description
| Scenario | Blocking OK? | Needs Concurrency / Non-blocking I/O? | Recommended Pattern |
|---|---|---|---|
| CPU-bound pure logic | Yes | No | Sync functions |
| I/O-bound single step | Yes | No | IOPlan (Module 7) |
| I/O-bound with dependent steps | No | Yes | AsyncPlan + async_bind |
| Independent parallel I/O | No | Yes | async_gather (M08C03) |
| Streaming over network / DB | No | Yes | Async generator + backpressure (M08C02) |
Rule: Use AsyncPlan exactly when you need non-blocking, dependent I/O steps. Never drive a plan in core.
3. Public API – AsyncPlan (src/funcpipe_rag/domain/effects/async_/plan.py – mypy --strict clean)
# funcpipe_rag/domain/effects/async_/plan.py
from __future__ import annotations

from collections.abc import Awaitable, Callable
from typing import TypeAlias, TypeVar

from funcpipe_rag import Result, Ok, Err, ErrInfo, assert_never

A = TypeVar("A")
B = TypeVar("B")

# AsyncPlan[T] = a pure, replayable description of async work that eventually yields Result[T, ErrInfo]
AsyncPlan: TypeAlias = Callable[[], Awaitable[Result[A, ErrInfo]]]


def async_pure(value: A) -> AsyncPlan[A]:
    async def _coro() -> Result[A, ErrInfo]:
        return Ok(value)

    return lambda: _coro()


def async_bind(
    plan: AsyncPlan[A],
    f: Callable[[A], AsyncPlan[B]],
) -> AsyncPlan[B]:
    async def _coro() -> Result[B, ErrInfo]:
        res = await plan()
        match res:
            case Ok(value=v):
                return await f(v)()
            case Err(error=e):
                return Err(e)
        assert_never(res)

    return lambda: _coro()


def async_map(
    plan: AsyncPlan[A],
    f: Callable[[A], B],
) -> AsyncPlan[B]:
    return async_bind(plan, lambda x: async_pure(f(x)))


# Lift a parameterless async capability call into a replayable AsyncPlan
# Precondition: make_coro MUST return a fresh coroutine each call.
# Correct: lambda: capability.method(...)
# Incorrect: coro = capability.method(...); async_lift(lambda: coro)
def async_lift(
    make_coro: Callable[[], Awaitable[Result[A, ErrInfo]]],
) -> AsyncPlan[A]:
    return make_coro
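
To see how these combinators behave on failure, here is a small sketch of Err short-circuiting in async_bind and of async_map on a successful value. It assumes simplified stand-ins for Ok and Err and uses isinstance checks instead of match; step and ran are hypothetical helpers for the illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


# Stand-ins for funcpipe_rag's Ok / Err (assumed shape, for illustration only).
@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


Plan = Callable[[], Awaitable[Any]]  # simplified, untyped AsyncPlan


def async_pure(value: Any) -> Plan:
    async def _coro() -> Any:
        return Ok(value)

    return _coro


def async_bind(plan: Plan, f: Callable[[Any], Plan]) -> Plan:
    async def _coro() -> Any:
        res = await plan()
        if isinstance(res, Ok):
            return await f(res.value)()
        return res  # an Err skips every later step

    return _coro


def async_map(plan: Plan, f: Callable[[Any], Any]) -> Plan:
    return async_bind(plan, lambda x: async_pure(f(x)))


ran: list[str] = []  # names of the steps that were actually executed


def step(name: str, result: Any) -> Plan:
    """Hypothetical step that records its name when driven."""

    async def _coro() -> Any:
        ran.append(name)
        return result

    return _coro


failing = async_bind(step("fetch", Err("timeout")), lambda v: step("save", Ok(v)))
failed = asyncio.run(failing())       # the Err from "fetch" is returned unchanged
ran_on_failure = list(ran)            # "save" never ran

mapped = asyncio.run(async_map(async_pure(20), lambda n: n + 1)())
```

The dependent step is never constructed, let alone run, once an earlier step yields Err, so error handling needs no try/except in the pipeline itself.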
Shell-only interpreter (src/funcpipe_rag/infra/adapters/async_runtime.py – never imported into domain)
# funcpipe_rag/infra/adapters/async_runtime.py
from funcpipe_rag import ErrInfo, Result
from funcpipe_rag.domain.effects.async_.plan import A, AsyncPlan


async def perform_async(plan: AsyncPlan[A]) -> Result[A, ErrInfo]:
    """The one interpreter – runs the async description."""
    return await plan()
4. Reference Implementations (pure domain code)
4.1 Capability protocols (domain/ports.py)
from typing import Protocol


class HttpClientCap(Protocol):
    async def get_json(self, url: str) -> Result[dict, ErrInfo]: ...


class DbCap(Protocol):
    async def save_user(self, user: UserProfile) -> Result[None, ErrInfo]: ...
4.2 Pure description – dependent async steps (domain/user.py)
# Factored version – the canonical, readable style
def _fetch(user_id: str, http: HttpClientCap) -> AsyncPlan[dict]:
    return async_lift(lambda: http.get_json(f"https://api.example.com/users/{user_id}"))


def _validate(raw: dict) -> AsyncPlan[UserProfile]:
    return async_pure(validate_user(raw))


def _save(user: UserProfile, db: DbCap) -> AsyncPlan[UserProfile]:
    return async_bind(
        async_lift(lambda: db.save_user(user)),
        lambda _: async_pure(user),
    )


def async_user_pipeline(
    user_id: str,
    http: HttpClientCap,
    db: DbCap,
) -> AsyncPlan[UserProfile]:
    return async_bind(
        _fetch(user_id, http),
        lambda raw: async_bind(
            _validate(raw),
            lambda user: _save(user, db),
        ),
    )
4.3 Before → After (the real transformation)
# BEFORE – hidden magic, immediate execution, untestable
async def old_process(user_id: str):
    raw = await http.get_json(f"/users/{user_id}")  # immediate effect
    user = validate_user(raw)
    await db.save_user(user)
    return user


# AFTER – pure description, no execution until shell calls perform_async
def async_user_pipeline(
    user_id: str,
    http: HttpClientCap,
    db: DbCap,
) -> AsyncPlan[UserProfile]:
    return async_bind(
        _fetch(user_id, http),
        lambda raw: async_bind(
            _validate(raw),
            lambda user: _save(user, db),
        ),
    )
Shell usage (only place driving happens):
plan = async_user_pipeline("123", real_http, real_db)
result = await perform_async(plan) # or asyncio.run(perform_async(plan))
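
Because the pipeline only depends on capability protocols, it can be exercised with in-memory fakes and no network or database. The sketch below assumes simplified stand-ins for Ok, Err, async_pure, and async_bind, omits the validation step for brevity, and uses hypothetical FakeHttp and FakeDb classes that are not part of FuncPipe.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


# Stand-ins for funcpipe_rag's Ok / Err (assumed shape, for illustration only).
@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


Plan = Callable[[], Awaitable[Any]]  # simplified, untyped AsyncPlan


def async_pure(value: Any) -> Plan:
    async def _coro() -> Any:
        return Ok(value)

    return _coro


def async_bind(plan: Plan, f: Callable[[Any], Plan]) -> Plan:
    async def _coro() -> Any:
        res = await plan()
        if isinstance(res, Ok):
            return await f(res.value)()
        return res

    return _coro


@dataclass
class FakeHttp:
    response: Any  # canned Result returned for every URL

    async def get_json(self, url: str) -> Any:
        return self.response


@dataclass
class FakeDb:
    saved: list = field(default_factory=list)  # users "persisted" so far

    async def save_user(self, user: Any) -> Any:
        self.saved.append(user)
        return Ok(None)


def user_pipeline(user_id: str, http: FakeHttp, db: FakeDb) -> Plan:
    # Fetch, then save, then hand the user back (validation omitted here).
    fetch: Plan = lambda: http.get_json(f"https://api.example.com/users/{user_id}")
    return async_bind(
        fetch,
        lambda user: async_bind(lambda: db.save_user(user), lambda _: async_pure(user)),
    )


ok_db = FakeDb()
ok_result = asyncio.run(user_pipeline("123", FakeHttp(Ok({"id": "123"})), ok_db)())

err_db = FakeDb()
err_result = asyncio.run(user_pipeline("123", FakeHttp(Err("503")), err_db)())
```

On the failure path the fake database is never touched, which is the kind of assertion that is awkward to make against the BEFORE version with its module-level clients.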
4.4 Real-world RAG example (domain/rag_async.py)
from collections.abc import Iterator
from dataclasses import replace

from funcpipe_rag.domain.effects.async_ import async_gather


def _embed_chunk(
    chunk: ChunkWithoutEmbedding,
    embedder: EmbeddingCap,
) -> AsyncPlan[EmbeddedChunk]:
    return async_bind(
        async_lift(lambda: embedder.embed(chunk.text.content)),
        lambda vector: async_pure(
            replace(chunk, embedding=Embedding(vector, embedder.model_name))
        ),
    )


def async_rag_pipeline(
    docs: Iterator[RawDoc],
    env: RagEnv,
    embedder: EmbeddingCap,
) -> AsyncPlan[list[EmbeddedChunk]]:
    return async_bind(
        async_pure(list(gen_clean_and_chunk(docs, env))),
        lambda chunks: async_map(
            async_gather(  # M08C03 – bounded parallel embedding
                [_embed_chunk(c, embedder) for c in chunks],
                concurrency=env.embedding_concurrency,
            ),
            structural_dedup_chunks,
        ),
    )
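
async_gather itself is defined in M08C03 and is not shown in this core. To make the example above easier to follow, here is one way a bounded gather could look, using asyncio.Semaphore. This is an assumption-laden sketch, not the FuncPipe implementation: the name gather_bounded, the choice to return the first Err in input order, and the fake_embed helper are all hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Sequence


# Stand-ins for funcpipe_rag's Ok / Err (assumed shape, for illustration only).
@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


Plan = Callable[[], Awaitable[Any]]  # simplified, untyped AsyncPlan


def gather_bounded(plans: Sequence[Plan], concurrency: int) -> Plan:
    """Hypothetical bounded gather: at most `concurrency` plans in flight."""

    async def _coro() -> Any:
        sem = asyncio.Semaphore(concurrency)  # created per run, so the plan is replayable

        async def run_one(plan: Plan) -> Any:
            async with sem:
                return await plan()

        results = await asyncio.gather(*(run_one(p) for p in plans))
        values = []
        for res in results:
            if isinstance(res, Err):
                return res  # assumed policy: first Err in input order wins
            values.append(res.value)
        return Ok(values)

    return _coro


stats = {"in_flight": 0, "peak": 0}  # tracks observed concurrency


def fake_embed(i: int) -> Plan:
    async def _coro() -> Any:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0)  # yield so other tasks get a chance to start
        stats["in_flight"] -= 1
        return Ok(i * 10)

    return _coro


gathered = asyncio.run(gather_bounded([fake_embed(i) for i in range(6)], concurrency=2)())
```

The result is still a plan: nothing is scheduled until the shell drives it, and the concurrency limit is an ordinary argument rather than something hidden in the runtime.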
5. Property-Based Proofs (all pass in CI)
import pytest
from hypothesis import given, strategies as st


@given(x=st.integers())
@pytest.mark.asyncio
async def test_async_bind_laws(x: int):
    f = lambda y: async_pure(y + 1)
    g = lambda z: async_pure(z * 2)
    a = async_pure(x)

    # Associativity
    assert await perform_async(async_bind(async_bind(a, f), g)) == \
        await perform_async(async_bind(a, lambda y: async_bind(f(y), g)))
    # Right identity
    assert await perform_async(async_bind(a, async_pure)) == \
        await perform_async(a)
    # Left identity
    assert await perform_async(async_bind(async_pure(x), f)) == \
        await perform_async(f(x))


@pytest.mark.asyncio
async def test_replayability():
    plan = async_lift(lambda: http_mock.get_json("/test"))  # mock is deterministic
    r1 = await perform_async(plan)
    r2 = await perform_async(plan)  # fresh coroutine each call
    assert r1 == r2
6. Big-O & Allocation Guarantees
| Operation | Time | Python call-stack | Heap allocation until interpretation |
|---|---|---|---|
| async_pure | O(1) | O(1) | O(1) |
| async_bind | O(1) | O(1) | O(1) closure |
| async_lift | O(1) | O(1) | O(1) |
| perform_async | O(chain) | O(chain) | O(chain) closures |
Composition is O(1) per layer — no work until interpretation.
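
The laziness claim can be observed directly by building a long chain and counting how often the step function runs. This is a minimal sketch, assuming simplified stand-ins for Ok, async_pure, and async_bind; add_one and calls are hypothetical names.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


# Stand-in for funcpipe_rag's Ok (assumed shape, for illustration only).
@dataclass(frozen=True)
class Ok:
    value: Any


Plan = Callable[[], Awaitable[Any]]  # simplified, untyped AsyncPlan


def async_pure(value: Any) -> Plan:
    async def _coro() -> Any:
        return Ok(value)

    return _coro


def async_bind(plan: Plan, f: Callable[[Any], Plan]) -> Plan:
    async def _coro() -> Any:
        res = await plan()
        if isinstance(res, Ok):
            return await f(res.value)()
        return res

    return _coro


calls = {"n": 0}  # how many times the step function has run


def add_one(x: int) -> Plan:
    calls["n"] += 1
    return async_pure(x + 1)


plan = async_pure(0)
for _ in range(200):  # 200 layers, each adding one closure
    plan = async_bind(plan, add_one)

calls_before_drive = calls["n"]  # composition alone ran no steps

total = asyncio.run(plan())      # interpretation walks the whole chain
calls_after_drive = calls["n"]
```

A left-nested chain like this awaits one coroutine inside another for every layer, which appears to be where the O(chain) call-stack entry for perform_async comes from; very long chains may need a different shape to stay under Python's recursion limit.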
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Top-level await / perform_async / create_task in core | Immediate effects, untestable | Return AsyncPlan, drive only in shell |
| Re-using a single coroutine object with async_lift | Replayability broken | Always pass a lambda that creates a fresh coroutine |
| asyncio.create_task in core | Hidden concurrency, leaks | Use explicit async_gather / bounded queues (later cores) |
| Direct httpx.AsyncClient() in core | Concrete dependency | Inject capability protocol |
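
The second anti-pattern is easy to reproduce with plain asyncio: a coroutine object can be awaited only once, so a plan that hands back the same object fails on its second run. In this sketch get_json, broken_plan, fresh_plan, and drive_twice are hypothetical names used only for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def get_json() -> dict:
    return {"ok": True}


shared = get_json()              # one coroutine object, created eagerly
broken_plan = lambda: shared     # anti-pattern: every call returns the same object
fresh_plan = lambda: get_json()  # fix: every call creates a new coroutine


async def drive_twice(plan: Callable[[], Awaitable[Any]]) -> tuple:
    first = await plan()
    try:
        second = await plan()
    except RuntimeError as exc:  # raised when a finished coroutine is awaited again
        second = f"RuntimeError: {exc}"
    return first, second


broken_outcome = asyncio.run(drive_twice(broken_plan))
fresh_outcome = asyncio.run(drive_twice(fresh_plan))
```

The first run of broken_plan succeeds, which is what makes this bug easy to miss until a retry or a second test drives the same plan.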
8. Pre-Core Quiz
- async def returns…? → A coroutine object – a pure description
- await inside coroutine bodies is…? → Required and allowed – it is part of the description
- Driving a plan (perform_async, asyncio.run, create_task) is…? → Only in shells – never in core
- Compose async descriptions with…? → async_bind / async_map / async_lift
- Real power comes from…? → Concurrency is explicit data, not magic – exactly like IOPlan
9. Post-Core Exercise
- Convert one real I/O-bound pipeline in your codebase to return an AsyncPlan.
- Chain ≥3 dependent async steps (preferably factored into small private helpers).
- Write a Hypothesis property proving the bind laws for your pipeline.
- Run the same AsyncPlan twice in a test and assert identical results (for idempotent cases).
- Celebrate – you now have async without magic.
Next → M08C02: Async Generators & Streams – Building Async FuncPipe
You now model async operations as pure, composable descriptions — exactly symmetrical to IOPlan. Your core remains pure, your shell controls execution, and concurrency is finally explicit data, not hidden magic.