M07C07: Composing Effects – Building Larger Behaviour from Small Capability Interfaces
Module 07 – Main Track Core
Main track: Cores 1, 3–10 (Ports & Adapters + Capability Protocols → Production).
This is a required core. Every production FuncPipe system composes small capabilities into reusable behaviours.
Progression Note
Module 7 takes the lawful containers and pipelines from Module 6 and puts all effects behind explicit boundaries.
| Module | Focus | Key Outcomes |
|---|---|---|
| 6 | Monadic Flows as Composable Pipelines | Lawful and_then, Reader/State/Writer patterns, error-typed flows |
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, resource-safe IO, idempotency |
| 8 | Async / Concurrent Pipelines | Backpressure, timeouts, resumability, fairness (built on 6–7) |
Core question
How do you compose small capability protocols into larger, reusable behaviours using higher-order functions and monadic chaining – without ever bloating interfaces or violating minimality?
What you now have after M07C01–M07C06 + this core
- Pure domain core
- Zero direct I/O in domain code
- All I/O behind swappable ports
- Effectful operations described as pure data (IOPlan)
- Typed capability protocols for every common effect
- Reliable resource cleanup
- Pure, composable logging via Writer
- Statically verified capability isolation
- Composable behaviours – small capabilities wrapped and chained into reusable, higher-order effect descriptions
What the rest of Module 7 adds
- Idempotent effect design
- Transaction/session patterns
- Incremental migration playbook
- Production story: CI, golden tests, shadow traffic
You are now six steps away from a complete production-grade functional architecture.
1. Laws & Invariants (machine-checked where possible)
| Law / Invariant | Description | Enforcement |
|---|---|---|
| Behaviour Neutrality | For deterministic capability implementations and purely observational side-channels (logging/metrics), wrapping in a higher-order behaviour does not change the primary result. | Equivalence tests |
| Bind Right Identity | For any plan: `io_bind(plan, io_pure) == plan` | Hypothesis |
| Bind Left Identity | For any x: `io_bind(io_pure(x), f) == f(x)` | Hypothesis |
| Bind Associativity | `io_bind(io_bind(b, f), g) == io_bind(b, lambda x: io_bind(f(x), g))` | Hypothesis |
| Minimalism Preservation | Composed behaviours use only the capabilities they declare; no hidden extras. | mypy --strict + review |
These laws guarantee that composition is predictable, refactor-safe, and never silently widens capability usage.
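The three bind laws can be tried out on a toy model. The sketch below is an assumption-laden stand-in, not the repo's `IOPlan`: `ToyPlan`, `toy_pure`, `toy_bind` and `toy_perform` are hypothetical names, the error channel is left out, and `==` is read as "performing both sides gives the same value", since delayed plans built from closures cannot be compared structurally.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

A = TypeVar("A")
B = TypeVar("B")


@dataclass(frozen=True)
class ToyPlan(Generic[A]):
    """Hypothetical stand-in for IOPlan: a delayed computation, no error channel."""

    thunk: Callable[[], A]


def toy_pure(x: A) -> ToyPlan[A]:
    return ToyPlan(lambda: x)


def toy_bind(plan: ToyPlan[A], f: Callable[[A], ToyPlan[B]]) -> ToyPlan[B]:
    # Nothing runs here; the thunks fire only inside toy_perform.
    return ToyPlan(lambda: f(plan.thunk()).thunk())


def toy_perform(plan: ToyPlan[A]) -> A:
    return plan.thunk()


def f(x: int) -> ToyPlan[int]:
    return toy_pure(x + 1)


def g(x: int) -> ToyPlan[str]:
    return toy_pure(f"v={x}")


def laws_hold(x: int) -> bool:
    """Check the three laws observationally for one input."""
    base = toy_pure(x)
    right_identity = toy_perform(toy_bind(base, toy_pure)) == toy_perform(base)
    left_identity = toy_perform(toy_bind(toy_pure(x), f)) == toy_perform(f(x))
    associativity = toy_perform(toy_bind(toy_bind(base, f), g)) == toy_perform(
        toy_bind(base, lambda y: toy_bind(f(y), g))
    )
    return right_identity and left_identity and associativity
```

A property-based version would draw `x` (and ideally `f` and `g`) from generators instead of using the fixed functions above.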
2. Decision Table – How to Combine Capabilities?
| Need | Scale | Purity Required | Recommended Pattern |
|---|---|---|---|
| One-off orchestration | Small | Yes | Direct arguments (storage: StorageRead, logger: Logger) |
| Reusable behaviour | Medium | Yes | Higher-order wrapper (logged_read(storage, logger)) |
| Many capabilities | Large | Yes | Aggregated protocol in domain + concrete dataclass in shell + Reader injection |
| Full pipeline | Very large | No (shell) | IOPlan chaining + perform in shell |
Golden rule: Never create a function that takes more than ~4 capability arguments. Past that point, readability degrades and refactors become painful – use an aggregated protocol (domain) + concrete dataclass (shell-only).
3. Public API – Composition Helpers (src/funcpipe_rag/domain/composition.py)
# src/funcpipe_rag/domain/composition.py – mypy --strict clean
from __future__ import annotations

from collections.abc import Callable, Iterator
from typing import TypeVar

from funcpipe_rag.core.rag_types import RawDoc
from funcpipe_rag.domain.capabilities import Logger, StorageRead
from funcpipe_rag.domain.logging import LogEntry
from funcpipe_rag.domain.effects.io_plan import IOPlan, io_bind, io_delay
from funcpipe_rag.result.types import ErrInfo, Ok, Result

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")


# Higher-order wrapper: logged read
def logged_read(
    storage: StorageRead,
    logger: Logger,
) -> Callable[[str], IOPlan[Iterator[Result[RawDoc, ErrInfo]]]]:
    """Reusable behaviour: logging + storage.read_docs, but still just an IOPlan."""

    def behaviour(path: str) -> IOPlan[Iterator[Result[RawDoc, ErrInfo]]]:
        def act() -> Result[Iterator[Result[RawDoc, ErrInfo]], ErrInfo]:
            logger.log(LogEntry("INFO", f"read_docs path={path}"))
            return Ok(storage.read_docs(path))

        return io_delay(act)

    return behaviour


# Binary chaining helper (associativity makes nesting safe)
def chain_io(
    f: Callable[[A], IOPlan[B]],
    g: Callable[[B], IOPlan[C]],
) -> Callable[[A], IOPlan[C]]:
    return lambda a: io_bind(f(a), g)
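Because bind is associative, either nesting of `chain_io` describes the same program, so a run of steps can also be folded. The sketch below illustrates this on a toy plan type. `ToyPlan` and `chain_all` are hypothetical and not part of the repo. The variadic helper also gives up static typing of the intermediate values (everything is `Any`), which is one reason to keep the binary, fully typed form as the public API.

```python
from __future__ import annotations

from dataclasses import dataclass
from functools import reduce
from typing import Any, Callable


@dataclass(frozen=True)
class ToyPlan:
    """Hypothetical stand-in for IOPlan (delayed computation, no error channel)."""

    thunk: Callable[[], Any]


def toy_pure(x: Any) -> ToyPlan:
    return ToyPlan(lambda: x)


def toy_bind(plan: ToyPlan, f: Callable[[Any], ToyPlan]) -> ToyPlan:
    return ToyPlan(lambda: f(plan.thunk()).thunk())


def toy_perform(plan: ToyPlan) -> Any:
    return plan.thunk()


Step = Callable[[Any], ToyPlan]


def chain_io(f: Step, g: Step) -> Step:
    # Same shape as chain_io above, but over the toy plan type.
    return lambda a: toy_bind(f(a), g)


def chain_all(first: Step, *rest: Step) -> Step:
    """Hypothetical variadic helper: a left fold of chain_io over the steps."""
    return reduce(chain_io, rest, first)


def parse(s: str) -> ToyPlan:
    return toy_pure(int(s))


def double(n: int) -> ToyPlan:
    return toy_pure(n * 2)


def show(n: int) -> ToyPlan:
    return toy_pure(f"n={n}")


left_nested = chain_io(chain_io(parse, double), show)
right_nested = chain_io(parse, chain_io(double, show))
folded = chain_all(parse, double, show)
```

All three composed behaviours yield the same value once performed, which is what the associativity law promises.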
4. Reference Implementations
4.1 Two Sanctioned Patterns for Multi-Capability Needs
Pattern 1 – Direct arguments (small, local composition)
# Illustrative example (not a repo file): direct-argument capability injection.
def rag_stage_needing_read_and_clock(
    storage: StorageRead,
    clock: Clock,
    path: str,
) -> Writer[Iterator[Chunk], Logs]:
    start = clock.now()
    docs = storage.read_docs(path)
    # ... pure processing
    return trace_stage(f"stage took {clock.now() - start}")
Pattern 2 – Aggregated protocol in domain + concrete dataclass in shell
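# Illustrative example (not a repo file): bundling many capabilities behind one Protocol.
from typing import Protocol

class RagCapabilities(Protocol):
    # Read-only members: mypy treats frozen-dataclass fields as read-only,
    # so plain mutable attributes here would reject the frozen bundle below.
    @property
    def storage(self) -> Storage: ...
    @property
    def logger(self) -> Logger: ...
    @property
    def clock(self) -> Clock: ...

# shell/capabilities.py – shell-only concrete bundle
from dataclasses import dataclass

@dataclass(frozen=True)
class ProdRagCapabilities:
    storage: Storage
    logger: Logger
    clock: Clock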
4.2 Full RAG Composition Example (domain code – pure)
# Illustrative example (not a repo file): composing behaviours from smaller IOPlan programs.
from funcpipe_rag import Reader  # in the real file, import Reader from its actual module

def rag_composed() -> Reader[RagCapabilities, IOPlan[Result[None, ErrInfo]]]:
    def run(cap: RagCapabilities) -> IOPlan[Result[None, ErrInfo]]:
        read = logged_read(cap.storage, cap.logger)
        process = timed_process(cap.clock)   # sketch – follows logged_read pattern
        write = atomic_write(cap.storage)    # sketch – follows logged_read pattern
        return chain_io(
            chain_io(read, process),
            write,
        )("in.csv")

    return Reader(run)
The shell simply injects the real adapters and performs the resulting plan:
cap = ProdRagCapabilities(
    storage=FileStorage(),
    logger=ConsoleLogger(),
    clock=SystemClock(),
)
plan = rag_composed().run(cap)
result = perform(plan)
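The same injection point is what makes the composed behaviour testable: a test can hand the Reader a bundle of fakes instead of production adapters. The sketch below is self-contained and built on assumptions. The toy `Reader`, the `Caps` protocol, `FixedClock`, `ListLogger`, `FakeCaps` and `stamp` are all hypothetical names, and a plain thunk stands in for an `IOPlan`.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Generic, Protocol, TypeVar

Env = TypeVar("Env")
Out = TypeVar("Out")


@dataclass(frozen=True)
class Reader(Generic[Env, Out]):
    """Toy Reader: a computation that is still waiting for its environment."""

    run: Callable[[Env], Out]


class Clock(Protocol):
    def now(self) -> float: ...


class Logger(Protocol):
    def log(self, msg: str) -> None: ...


class Caps(Protocol):
    # Read-only members so that a frozen dataclass can satisfy the protocol.
    @property
    def clock(self) -> Clock: ...
    @property
    def logger(self) -> Logger: ...


@dataclass(frozen=True)
class FixedClock:
    t: float

    def now(self) -> float:
        return self.t


@dataclass
class ListLogger:
    lines: list[str] = field(default_factory=list)

    def log(self, msg: str) -> None:
        self.lines.append(msg)


@dataclass(frozen=True)
class FakeCaps:
    clock: Clock
    logger: Logger


def stamp(label: str) -> Reader[Caps, Callable[[], str]]:
    """Domain-side behaviour: describes a timestamped log line, runs nothing."""

    def run(cap: Caps) -> Callable[[], str]:
        def act() -> str:
            line = f"{label}@{cap.clock.now()}"
            cap.logger.log(line)
            return line

        return act  # a thunk standing in for an IOPlan

    return Reader(run)


logger = ListLogger()
plan = stamp("start").run(FakeCaps(clock=FixedClock(12.5), logger=logger))
```

Running the Reader only builds the delayed action; the fake logger stays empty until the thunk is called, mirroring the `run(cap)` then `perform(plan)` split above.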
4.3 Before → After
# Before – inline, duplicated, eager
def old_rag(storage, logger, clock, path, output_path):
    logger.log("reading")
    docs = storage.read_docs(path)
    start = clock.now()
    chunks = process(docs)
    logger.log(f"processed in {clock.now() - start}")
    storage.write_chunks(output_path, chunks)

# After – composed, reusable, delayed
behaviour = chain_io(
    chain_io(
        logged_read(storage, logger),
        timed_process(clock),
    ),
    atomic_write(storage),
)
plan = behaviour(path)    # builds a description only; no I/O yet
result = perform(plan)    # effects run here, in the shell
5. Property-Based Proofs (selected)
# In this repo, IOPlan monad laws are checked in:
# - tests/unit/domain/test_io_plan_laws.py
#
# The `logged_read` helper is a thin wrapper in:
# - src/funcpipe_rag/domain/composition.py
#
# An additional "neutrality" property (logged vs unlogged read yields identical values)
# is a good optional exercise when you introduce richer in-memory adapter fixtures.
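One possible shape for that neutrality exercise is sketched below with in-memory fixtures. Everything here is a simplification under stated assumptions: documents are plain strings rather than `Result[RawDoc, ErrInfo]`, plans are bare thunks rather than `IOPlan` values, and `InMemoryStorage`, `ListLogger`, `plain_read` and `neutral_for` are hypothetical names. The check runs over a few hand-picked cases, and a Hypothesis strategy could generate `files` and `path` instead.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Iterator, Protocol


class StorageRead(Protocol):
    def read_docs(self, path: str) -> Iterator[str]: ...


class Logger(Protocol):
    def log(self, msg: str) -> None: ...


@dataclass(frozen=True)
class InMemoryStorage:
    """Deterministic fake: the same path always yields the same documents."""

    files: dict[str, tuple[str, ...]]

    def read_docs(self, path: str) -> Iterator[str]:
        return iter(self.files.get(path, ()))


@dataclass
class ListLogger:
    lines: list[str] = field(default_factory=list)

    def log(self, msg: str) -> None:
        self.lines.append(msg)


Thunk = Callable[[], Iterator[str]]


def plain_read(storage: StorageRead) -> Callable[[str], Thunk]:
    return lambda path: lambda: storage.read_docs(path)


def logged_read(storage: StorageRead, logger: Logger) -> Callable[[str], Thunk]:
    def behaviour(path: str) -> Thunk:
        def act() -> Iterator[str]:
            logger.log(f"read_docs path={path}")
            return storage.read_docs(path)

        return act

    return behaviour


def neutral_for(files: dict[str, tuple[str, ...]], path: str) -> bool:
    """Logged and unlogged reads agree; logging shows up only in the side-channel."""
    storage = InMemoryStorage(files)
    logger = ListLogger()
    plain = list(plain_read(storage)(path)())
    logged = list(logged_read(storage, logger)(path)())
    return plain == logged and logger.lines == [f"read_docs path={path}"]


CASES = [
    ({}, "missing.csv"),
    ({"a.csv": ("d1", "d2")}, "a.csv"),
    ({"a.csv": ("d1",), "b.csv": ()}, "b.csv"),
]
```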
6. Big-O & Allocation Guarantees
| Operation | Time | Call-stack | Heap | Allocation |
|---|---|---|---|---|
| Higher-order wrap | O(1) | O(1) | O(1) | O(1) |
| `io_bind` chain | O(1) per step | O(depth) | O(1) per step | O(depth) closures |
Composition adds only constant overhead per layer – no work is performed until perform.
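The "no work until perform" claim can be observed directly on a toy model. The sketch below assumes a closure-based plan type (`ToyPlan`, a hypothetical stand-in that may differ from how the repo's `IOPlan` is represented) and records which steps have actually run. Building a 50-step chain touches nothing, and a single perform then runs the steps in order, nesting one level of calls per bind.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ToyPlan:
    """Hypothetical stand-in for IOPlan: a delayed int-producing computation."""

    thunk: Callable[[], int]


def toy_pure(x: int) -> ToyPlan:
    return ToyPlan(lambda: x)


def toy_bind(plan: ToyPlan, f: Callable[[int], ToyPlan]) -> ToyPlan:
    # Allocates one closure per bind; runs nothing.
    return ToyPlan(lambda: f(plan.thunk()).thunk())


def toy_perform(plan: ToyPlan) -> int:
    return plan.thunk()


calls: list[int] = []  # observable side-channel: which steps have run


def step(i: int) -> Callable[[int], ToyPlan]:
    def k(acc: int) -> ToyPlan:
        def act() -> int:
            calls.append(i)
            return acc + i

        return ToyPlan(act)

    return k


plan = toy_pure(0)
for i in range(1, 51):
    plan = toy_bind(plan, step(i))  # O(1) work per layer

built_without_effects = calls == []  # still nothing executed
total = toy_perform(plan)  # all 50 steps run here, innermost first
```

In this toy model, performing a very deep chain would eventually hit Python's recursion limit, which is one concrete reading of the O(depth) call-stack column.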
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Inline orchestration | Duplicated sequencing logic | Higher-order reusable behaviours |
| Concrete capability bundle in core | Domain code depending on concrete dataclass | Aggregated protocol in domain + concrete dataclass in shell |
| Eager effects in composition | Premature I/O | Always return IOPlan descriptions |
| Mega-port | One protocol with 40 methods | One capability → one protocol |
8. Pre-Core Quiz
- Compose small capabilities with…? → Higher-order functions
- Chain delayed effects with…? → `io_bind`
- Inject many capabilities via…? → Aggregated protocol in domain + concrete dataclass in shell + Reader
- Where do you create concrete bundles? → Shell only
- Real power comes from…? → Reusable behaviours without interface bloat
9. Post-Core Exercise
- Write a `timed_read` behaviour that logs duration using `Clock`.
- Chain it with `logged_write` into a reusable `copy_with_timing`.
- Refactor your real RAG pipeline to use a shell-only concrete `ProdRagCapabilities` dataclass injected via Reader.
- Write a property test proving two different `Clock` implementations yield identical logs (modulo timestamps).
Next → M07C08: Idempotent Effect Design for Safe Retries and Replays
You now compose tiny capability protocols into powerful, reusable behaviours – all while preserving minimality, purity, and static safety. Your system scales without interface bloat or duplicated orchestration logic. The remaining cores are specialisations and production patterns.