M07C07: Composing Effects – Building Larger Behaviour from Small Capability Interfaces

Module 07 – Main Track Core

Main track: Cores 1, 3–10 (Ports & Adapters + Capability Protocols → Production).
This is a required core. Every production FuncPipe system composes small capabilities into reusable behaviours.

Progression Note

Module 7 takes the lawful containers and pipelines from Module 6 and puts all effects behind explicit boundaries.

| Module | Focus | Key Outcomes |
|---|---|---|
| 6 | Monadic Flows as Composable Pipelines | Lawful and_then, Reader/State/Writer patterns, error-typed flows |
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, resource-safe IO, idempotency |
| 8 | Async / Concurrent Pipelines | Backpressure, timeouts, resumability, fairness (built on 6–7) |

Core question
How do you compose small capability protocols into larger, reusable behaviours using higher-order functions and monadic chaining – without ever bloating interfaces or violating minimality?

What you now have after M07C01–M07C06 + this core
- Pure domain core
- Zero direct I/O in domain code
- All I/O behind swappable ports
- Effectful operations described as pure data (IOPlan)
- Typed capability protocols for every common effect
- Reliable resource cleanup
- Pure, composable logging via Writer
- Statically verified capability isolation
- Composable behaviours – small capabilities wrapped and chained into reusable, higher-order effect descriptions

What the rest of Module 7 adds
- Idempotent effect design
- Transaction/session patterns
- Incremental migration playbook
- Production story: CI, golden tests, shadow traffic

You are now six steps away from a complete production-grade functional architecture.

1. Laws & Invariants (machine-checked where possible)

| Law / Invariant | Description | Enforcement |
|---|---|---|
| Behaviour Neutrality | For deterministic capability implementations and purely observational side-channels (logging/metrics), wrapping in a higher-order behaviour does not change the primary result. | Equivalence tests |
| Bind Right Identity | For any plan: io_bind(plan, io_pure) == plan | Hypothesis |
| Bind Left Identity | For any x: io_bind(io_pure(x), f) == f(x) | Hypothesis |
| Bind Associativity | io_bind(io_bind(b, f), g) == io_bind(b, lambda x: io_bind(f(x), g)) | Hypothesis |
| Minimalism Preservation | Composed behaviours use only the capabilities they declare; no hidden extras. | mypy --strict + review |

These laws guarantee that composition is predictable, refactor-safe, and never silently widens capability usage.
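
To make the three bind laws concrete, here is a minimal sketch that checks them on a toy plan type. The `IOPlan`, `io_pure`, `io_bind` and `perform` definitions below are hypothetical stand-ins written for this example, not the repo's implementation (in particular they carry no `Result` or error channel). Because plans are closures and cannot be compared directly, the sketch compares what each side yields when performed. The repo checks these laws with Hypothesis; a fixed list of samples is used here to keep the example dependency-free.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

A = TypeVar("A")
B = TypeVar("B")


@dataclass(frozen=True)
class IOPlan(Generic[A]):
    """Hypothetical stand-in: a plan is only a deferred computation."""
    thunk: Callable[[], A]


def io_pure(x: A) -> IOPlan[A]:
    return IOPlan(lambda: x)


def io_bind(plan: IOPlan[A], f: Callable[[A], IOPlan[B]]) -> IOPlan[B]:
    # Nothing runs here: the returned plan runs `plan`, feeds the value
    # to `f`, and runs the plan that `f` produces, all at perform time.
    return IOPlan(lambda: f(plan.thunk()).thunk())


def perform(plan: IOPlan[A]) -> A:
    return plan.thunk()


def f(x: int) -> IOPlan[int]:
    return io_pure(x + 1)


def g(x: int) -> IOPlan[str]:
    return io_pure(f"value={x}")


def check_laws(samples: list[int]) -> bool:
    for x in samples:
        plan = io_pure(x)
        # Left identity: io_bind(io_pure(x), f) behaves like f(x)
        assert perform(io_bind(io_pure(x), f)) == perform(f(x))
        # Right identity: io_bind(plan, io_pure) behaves like plan
        assert perform(io_bind(plan, io_pure)) == perform(plan)
        # Associativity: both nestings yield the same value
        left = io_bind(io_bind(plan, f), g)
        right = io_bind(plan, lambda y: io_bind(f(y), g))
        assert perform(left) == perform(right)
    return True
```

Calling `check_laws([-3, 0, 1, 42])` returns `True` for this stand-in; a property-based version would replace the fixed list with generated inputs.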

2. Decision Table – How to Combine Capabilities?

| Need | Scale | Purity Required | Recommended Pattern |
|---|---|---|---|
| One-off orchestration | Small | Yes | Direct arguments (storage: StorageRead, logger: Logger) |
| Reusable behaviour | Medium | Yes | Higher-order wrapper (logged_read(storage, logger)) |
| Many capabilities | Large | Yes | Aggregated protocol in domain + concrete dataclass in shell + Reader injection |
| Full pipeline | Very large | No (shell) | IOPlan chaining + perform in shell |

Golden rule: Never create a function that takes more than ~4 capability arguments. Past that point, readability degrades and refactors become painful – use an aggregated protocol (domain) + concrete dataclass (shell-only).

3. Public API – Composition Helpers (src/funcpipe_rag/domain/composition.py)

# src/funcpipe_rag/domain/composition.py – mypy --strict clean
from __future__ import annotations
from collections.abc import Callable, Iterator
from typing import TypeVar

from funcpipe_rag.core.rag_types import RawDoc
from funcpipe_rag.domain.capabilities import Logger, StorageRead
from funcpipe_rag.domain.logging import LogEntry
from funcpipe_rag.domain.effects.io_plan import IOPlan, io_bind, io_delay
from funcpipe_rag.result.types import ErrInfo, Ok, Result

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")

# Higher-order wrapper: logged read
def logged_read(
    storage: StorageRead,
    logger: Logger,
) -> Callable[[str], IOPlan[Iterator[Result[RawDoc, ErrInfo]]]]:
    """Reusable behaviour: logging + storage.read_docs, but still just an IOPlan."""
    def behaviour(path: str) -> IOPlan[Iterator[Result[RawDoc, ErrInfo]]]:
        def act() -> Result[Iterator[Result[RawDoc, ErrInfo]], ErrInfo]:
            logger.log(LogEntry("INFO", f"read_docs path={path}"))
            return Ok(storage.read_docs(path))

        return io_delay(act)
    return behaviour

# Binary chaining helper (associativity makes nesting safe)
def chain_io(
    f: Callable[[A], IOPlan[B]],
    g: Callable[[B], IOPlan[C]],
) -> Callable[[A], IOPlan[C]]:
    return lambda a: io_bind(f(a), g)
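
The helpers above can be exercised end to end with in-memory fakes. The following is a simplified sketch, not the repo code: `IOPlan`, `io_delay`, `io_bind` and `perform` are hypothetical stand-ins without the `Result` error channel, `ListLogger` and `DictStorage` are made-up test doubles, and log entries and documents are plain strings rather than `LogEntry` and `RawDoc`. `count_docs` is likewise an invented second stage, included only to give `chain_io` something to chain.

```python
from collections.abc import Callable, Iterator
from dataclasses import dataclass, field
from typing import Generic, TypeVar

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")


@dataclass(frozen=True)
class IOPlan(Generic[A]):
    thunk: Callable[[], A]  # hypothetical stand-in: runs only in perform()


def io_delay(act: Callable[[], A]) -> IOPlan[A]:
    return IOPlan(act)


def io_bind(plan: IOPlan[A], f: Callable[[A], IOPlan[B]]) -> IOPlan[B]:
    return IOPlan(lambda: f(plan.thunk()).thunk())


def perform(plan: IOPlan[A]) -> A:
    return plan.thunk()


def chain_io(
    f: Callable[[A], IOPlan[B]], g: Callable[[B], IOPlan[C]]
) -> Callable[[A], IOPlan[C]]:
    return lambda a: io_bind(f(a), g)


@dataclass
class ListLogger:  # hypothetical in-memory Logger
    lines: list[str] = field(default_factory=list)

    def log(self, message: str) -> None:
        self.lines.append(message)


@dataclass(frozen=True)
class DictStorage:  # hypothetical in-memory StorageRead
    files: dict[str, list[str]]

    def read_docs(self, path: str) -> Iterator[str]:
        return iter(self.files[path])


def logged_read(
    storage: DictStorage, logger: ListLogger
) -> Callable[[str], IOPlan[Iterator[str]]]:
    def behaviour(path: str) -> IOPlan[Iterator[str]]:
        def act() -> Iterator[str]:
            logger.log(f"read_docs path={path}")
            return storage.read_docs(path)
        return io_delay(act)
    return behaviour


def count_docs(docs: Iterator[str]) -> IOPlan[int]:
    # Invented follow-up stage: consumes the iterator when performed.
    return io_delay(lambda: sum(1 for _ in docs))


logger = ListLogger()
storage = DictStorage({"in.csv": ["a", "b", "c"]})
read_then_count = chain_io(logged_read(storage, logger), count_docs)

plan = read_then_count("in.csv")  # builds a description; logger.lines is still []
result = perform(plan)            # 3; the log line is written only at this point
```

Note that the wrapper and the chain are built once and can be applied to any path, which is what makes the behaviour reusable rather than a one-off script.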

4. Reference Implementations

4.1 Two Sanctioned Patterns for Multi-Capability Needs

Pattern 1 – Direct arguments (small, local composition)

# Illustrative example (not a repo file): direct-argument capability injection.
def rag_stage_needing_read_and_clock(
    storage: StorageRead,
    clock: Clock,
    path: str,
) -> Writer[Iterator[Chunk], Logs]:
    start = clock.now()
    docs = storage.read_docs(path)
    # ... pure processing
    return trace_stage(f"stage took {clock.now() - start}")

Pattern 2 – Aggregated protocol in domain + concrete dataclass in shell

# Illustrative example (not a repo file): bundling many capabilities behind one Protocol.
# Storage, Logger and Clock are the capability protocols defined elsewhere in the domain.
from typing import Protocol

class RagCapabilities(Protocol):
    storage: Storage
    logger: Logger
    clock: Clock

# shell/capabilities.py – shell-only concrete bundle
from dataclasses import dataclass

@dataclass(frozen=True)
class ProdRagCapabilities:
    storage: Storage
    logger: Logger
    clock: Clock
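
As a small runnable sketch of Pattern 2, the example below shows a domain function that depends only on an aggregated protocol while the concrete frozen dataclass lives outside it. All names here (`TimingCapabilities`, `ProdTimingCapabilities`, `ListLogger`, `FixedClock`, `stamp_later`) are hypothetical and chosen for illustration. One design detail is an assumption worth verifying in your own setup: the protocol members are declared as read-only properties, because a type checker may treat a frozen dataclass field as read-only and therefore refuse to match it against a plain (settable) protocol attribute.

```python
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Protocol


class Logger(Protocol):
    def log(self, message: str) -> None: ...


class Clock(Protocol):
    def now(self) -> float: ...


class TimingCapabilities(Protocol):
    # Read-only members, so an immutable bundle can satisfy the protocol.
    @property
    def logger(self) -> Logger: ...

    @property
    def clock(self) -> Clock: ...


@dataclass(frozen=True)
class ProdTimingCapabilities:  # shell-side concrete bundle
    logger: Logger
    clock: Clock


@dataclass
class ListLogger:  # in-memory fake for tests
    lines: list[str] = field(default_factory=list)

    def log(self, message: str) -> None:
        self.lines.append(message)


@dataclass(frozen=True)
class FixedClock:  # deterministic fake clock
    t: float

    def now(self) -> float:
        return self.t


def stamp_later(cap: TimingCapabilities, message: str) -> Callable[[], str]:
    """Domain-side function: sees only the protocol and returns a deferred action."""
    def act() -> str:
        line = f"[{cap.clock.now():.1f}] {message}"
        cap.logger.log(line)
        return line
    return act
```

The domain function never names `ProdTimingCapabilities`, so swapping in a different bundle (for tests or another environment) needs no change to domain code.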

4.2 Full RAG Composition Example (domain code – pure)

# Illustrative example (not a repo file): composing behaviours from smaller IOPlan programs.
from funcpipe_rag import Reader  # in the real file, import Reader from its actual module

def rag_composed() -> Reader[RagCapabilities, IOPlan[Result[None, ErrInfo]]]:
    def run(cap: RagCapabilities) -> IOPlan[Result[None, ErrInfo]]:
        read = logged_read(cap.storage, cap.logger)
        process = timed_process(cap.clock)      # sketch – follows logged_read pattern
        write = atomic_write(cap.storage)       # sketch – follows logged_read pattern

        return chain_io(
            chain_io(read, process),
            write,
        )("in.csv")
    return Reader(run)

The shell simply injects the real adapters:

cap = ProdRagCapabilities(
    storage=FileStorage(),
    logger=ConsoleLogger(),
    clock=SystemClock(),
)
plan = rag_composed().run(cap)
result = perform(plan)
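
The injection step can be sketched in isolation with a toy `Reader`. Everything below is a hypothetical stand-in rather than the library's actual `Reader` or `IOPlan`: a `Reader` is modelled as a wrapper around a function from an environment to a value, and the value it produces is a deferred plan. The bundle and adapters (`FakeCapabilities`, `ListLogger`, `FixedClock`) and the `stamped_greeting` program are invented for this example.

```python
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Generic, TypeVar

E = TypeVar("E")
A = TypeVar("A")


@dataclass(frozen=True)
class IOPlan(Generic[A]):
    thunk: Callable[[], A]  # hypothetical stand-in: runs only in perform()


def perform(plan: IOPlan[A]) -> A:
    return plan.thunk()


@dataclass(frozen=True)
class Reader(Generic[E, A]):
    """Hypothetical stand-in: a computation awaiting its environment."""
    run: Callable[[E], A]


@dataclass
class ListLogger:
    lines: list[str] = field(default_factory=list)

    def log(self, message: str) -> None:
        self.lines.append(message)


@dataclass(frozen=True)
class FixedClock:
    t: float

    def now(self) -> float:
        return self.t


@dataclass(frozen=True)
class FakeCapabilities:  # test-side bundle; a shell would supply real adapters
    logger: ListLogger
    clock: FixedClock


def stamped_greeting(name: str) -> Reader[FakeCapabilities, IOPlan[str]]:
    def run(cap: FakeCapabilities) -> IOPlan[str]:
        def act() -> str:
            line = f"t={cap.clock.now()} hello {name}"
            cap.logger.log(line)
            return line
        return IOPlan(act)  # injection builds the plan but performs nothing
    return Reader(run)
```

There are three distinct moments here: `stamped_greeting("rag")` builds the program, `.run(cap)` injects capabilities and yields a plan, and only `perform(plan)` touches the logger and clock.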

4.3 Before → After

# Before – inline, duplicated, eager
def old_rag(storage, logger, clock, path, output_path):
    logger.log("reading")
    docs = storage.read_docs(path)
    start = clock.now()
    chunks = process(docs)
    logger.log(f"processed in {clock.now() - start}")
    storage.write_chunks(output_path, chunks)

# After – composed, reusable, delayed
behaviour = chain_io(
    chain_io(
        logged_read(storage, logger),
        timed_process(clock),
    ),
    atomic_write(storage),
)
plan = behaviour(path)
result = perform(plan)

5. Property-Based Proofs (selected)

# In this repo, IOPlan monad laws are checked in:
# - tests/unit/domain/test_io_plan_laws.py
#
# The `logged_read` helper is a thin wrapper in:
# - src/funcpipe_rag/domain/composition.py
#
# An additional "neutrality" property (logged vs unlogged read yields identical values)
# is a good optional exercise when you introduce richer in-memory adapter fixtures.
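
One possible shape for that neutrality exercise is sketched below. It is an assumption-laden illustration, not a repo test: `IOPlan` and `perform` are hypothetical stand-ins without the `Result` channel, `DictStorage` and `ListLogger` are invented in-memory fixtures, documents are plain strings, and a fixed set of inputs stands in for Hypothesis-generated ones.

```python
from collections.abc import Callable, Iterator
from dataclasses import dataclass, field
from typing import Generic, TypeVar

A = TypeVar("A")


@dataclass(frozen=True)
class IOPlan(Generic[A]):
    thunk: Callable[[], A]  # hypothetical stand-in: runs only in perform()


def perform(plan: IOPlan[A]) -> A:
    return plan.thunk()


@dataclass
class ListLogger:
    lines: list[str] = field(default_factory=list)

    def log(self, message: str) -> None:
        self.lines.append(message)


@dataclass(frozen=True)
class DictStorage:
    files: dict[str, tuple[str, ...]]

    def read_docs(self, path: str) -> Iterator[str]:
        return iter(self.files[path])


def plain_read(storage: DictStorage) -> Callable[[str], IOPlan[Iterator[str]]]:
    return lambda path: IOPlan(lambda: storage.read_docs(path))


def logged_read(
    storage: DictStorage, logger: ListLogger
) -> Callable[[str], IOPlan[Iterator[str]]]:
    def behaviour(path: str) -> IOPlan[Iterator[str]]:
        def act() -> Iterator[str]:
            logger.log(f"read_docs path={path}")
            return storage.read_docs(path)
        return IOPlan(act)
    return behaviour


def neutrality_holds(files: dict[str, tuple[str, ...]]) -> bool:
    """Logged and unlogged reads must yield the same documents for every path."""
    storage = DictStorage(files)
    logger = ListLogger()
    for path in files:
        logged = list(perform(logged_read(storage, logger)(path)))
        plain = list(perform(plain_read(storage)(path)))
        if logged != plain:
            return False
    # The only observable difference is the side-channel: one log line per read.
    return len(logger.lines) == len(files)
```

The check relies on the storage fake being deterministic, which matches the precondition stated in the Behaviour Neutrality law.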

6. Big-O & Allocation Guarantees

| Operation | Time | Call-stack | Heap | Allocation |
|---|---|---|---|---|
| Higher-order wrap | O(1) | O(1) | O(1) | O(1) |
| io_bind chain | O(1) per step | O(depth) | O(1) per step | O(depth) closures |

Composition adds only constant overhead per layer – no work is performed until perform.
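
The "no work until perform" claim can be observed directly with a counter. This is a sketch against a hypothetical closure-based `IOPlan` stand-in, not the repo's implementation; `counting_step` and `build_chain` are invented helpers. In this naive stand-in, `perform` recurses once per bind layer, which is one way to picture the O(depth) call-stack entry in the table.

```python
from collections.abc import Callable
from dataclasses import dataclass
from typing import Generic, TypeVar

A = TypeVar("A")
B = TypeVar("B")


@dataclass(frozen=True)
class IOPlan(Generic[A]):
    thunk: Callable[[], A]  # hypothetical stand-in: runs only in perform()


def io_pure(x: A) -> IOPlan[A]:
    return IOPlan(lambda: x)


def io_bind(plan: IOPlan[A], f: Callable[[A], IOPlan[B]]) -> IOPlan[B]:
    # One new closure per bind; no effect is executed at construction time.
    return IOPlan(lambda: f(plan.thunk()).thunk())


def perform(plan: IOPlan[A]) -> A:
    return plan.thunk()


def counting_step(counter: list[int]) -> Callable[[int], IOPlan[int]]:
    """Each step records its input (the 'effect') and passes on input + 1."""
    def step(x: int) -> IOPlan[int]:
        def act() -> int:
            counter.append(x)
            return x + 1
        return IOPlan(act)
    return step


def build_chain(depth: int, counter: list[int]) -> IOPlan[int]:
    plan: IOPlan[int] = io_pure(0)
    step = counting_step(counter)
    for _ in range(depth):
        plan = io_bind(plan, step)  # constant work per layer
    return plan
```

After `plan = build_chain(50, counter)` the counter is still empty; `perform(plan)` then runs all fifty steps in order. Very deep chains in a stand-in like this would eventually hit Python's recursion limit, so the depth is kept small here.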

7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Inline orchestration | Duplicated sequencing logic | Higher-order reusable behaviours |
| Concrete capability bundle in core | Domain code depending on concrete dataclass | Aggregated protocol in domain + concrete dataclass in shell |
| Eager effects in composition | Premature I/O | Always return IOPlan descriptions |
| Mega-port | One protocol with 40 methods | One capability → one protocol |

8. Pre-Core Quiz

  1. Compose small capabilities with…? → Higher-order functions
  2. Chain delayed effects with…? → io_bind
  3. Inject many capabilities via…? → Aggregated protocol in domain + concrete dataclass in shell + Reader
  4. Where do you create concrete bundles? → Shell only
  5. Real power comes from…? → Reusable behaviours without interface bloat

9. Post-Core Exercise

  1. Write a timed_read behaviour that logs duration using Clock.
  2. Chain it with logged_write into a reusable copy_with_timing.
  3. Refactor your real RAG pipeline to use a shell-only concrete ProdRagCapabilities dataclass injected via Reader.
  4. Write a property test proving two different Clock implementations yield identical logs (modulo timestamps).

Next → M07C08: Idempotent Effect Design for Safe Retries and Replays

You now compose tiny capability protocols into powerful, reusable behaviours – all while preserving minimality, purity, and static safety. Your system scales without interface bloat or duplicated orchestration logic. The remaining cores are specialisations and production patterns.