M07C03: Capability Protocols – Typed Effect Interfaces (Storage, Clock, Logger, Cache)¶
Module 07 – Main Track Core
Main track: Cores 1, 3–10 (Ports & Adapters + Capability Protocols → Production).
This is a required core. Every production FuncPipe system uses capability protocols.
Progression Note¶
Module 7 takes the lawful containers and pipelines from Module 6 and puts all effects behind explicit boundaries.
| Module | Focus | Key Outcomes |
|---|---|---|
| 6 | Monadic Flows as Composable Pipelines | Lawful and_then, Reader/State/Writer patterns, error-typed flows |
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, resource-safe IO, idempotency |
| 8 | Async / Concurrent Pipelines | Backpressure, timeouts, resumability, fairness (built on 6–7) |
Core question
How do you design effect interfaces as capability protocols for common dependencies (storage, clock, logger, cache, etc.), ensuring static checking, composability, and effect isolation in FuncPipe pipelines?
What you now have after M07C01 + M07C02 + this core
- Pure domain core
- Zero direct I/O in domain code
- All I/O behind swappable adapters implementing capability protocols (Storage, Clock, Logger)
- Effectful operations described as pure data (IOPlan in funcpipe_rag.domain.effects.io_plan)
- A core set of typed capability protocols that let you inject time, logging, caching, etc. with full mypy --strict safety and zero accidental coupling
What the rest of Module 7 adds
- Resource safety guarantees (even on partial consumption)
- Idempotent effect design
- Transaction/session patterns
- Incremental migration playbook
- Production story: CI, golden tests, shadow traffic
You are now two steps away from a complete production-grade functional architecture.
1. Laws & Invariants (machine-checked where possible)¶
| Law / Invariant | Description | Enforcement |
|---|---|---|
| Minimalism | Each protocol contains only the methods required by its capability. No extras. | Code review |
| Capability Isolation | Core code types its dependencies as protocols (so mypy prevents calling undeclared methods); import layering (no infra in domain) is enforced by review/CI. | mypy --strict + CI |
| Determinism (mock) | Fixed mock implementations yield identical outputs for the same inputs. | Hypothesis |
| Monotonicity (logical clock) | Logical/test clocks are non-decreasing across calls. | Property tests |
| Error Contract | Fallible protocol methods return typed Result; no raw exceptions cross protocol boundaries into the core. |
Type checks + runtime contract |
| Adapter Equivalence | Real vs mock/fake implementations of the same protocol produce identical Result values for the same logical inputs. |
Hypothesis (swappability) |
These laws turn dependency injection from a runtime pattern into a type-checked + property-tested + review-enforced discipline.
2. Decision Table – Which Capability Needs Which Protocol?¶
| Capability | Fallible? | Needs State? | Recommended Protocol Pattern |
|---|---|---|---|
| Storage Read | Yes | No | StorageRead (Iterator[Result[RawDoc, ErrInfo]]) |
| Storage Write | Yes | No | StorageWrite (Result[None, ErrInfo]) |
| Clock | No | Yes | Clock (now() → datetime) |
| Logger | No | Yes | Logger (log(entry: LogEntry) → None) |
| Cache | Yes | Yes | Cache (Result[Option[Chunk], ErrInfo]) |
Rule: One capability → one protocol. Never a “GodPort”.
Important: Logging and clock are deliberately best-effort and infallible in the model. Concrete implementations should swallow or separately report their own failures (e.g. broken stdout) – they must never crash the core pipeline.
3. Public API – Capability Protocols (src/funcpipe_rag/domain/capabilities.py)¶
# src/funcpipe_rag/domain/capabilities.py – mypy --strict clean
from __future__ import annotations
from datetime import datetime
from collections.abc import Iterator
from typing import Protocol
from funcpipe_rag.core.rag_types import Chunk, RawDoc
from funcpipe_rag.domain.logging import LogEntry
from funcpipe_rag.result.types import ErrInfo, Option, Result
__all__ = [
"StorageRead",
"StorageWrite",
"Storage",
"Clock",
"Logger",
"Cache",
]
class StorageRead(Protocol):
def read_docs(self, path: str) -> Iterator[Result[RawDoc, ErrInfo]]: ...
class StorageWrite(Protocol):
def write_chunks(self, path: str, chunks: Iterator[Chunk]) -> Result[None, ErrInfo]: ...
class Storage(StorageRead, StorageWrite, Protocol):
"""Composed capability: full read/write access."""
class Clock(Protocol):
def now(self) -> datetime: ...
class Logger(Protocol):
def log(self, entry: LogEntry) -> None: ...
class Cache(Protocol):
def get(self, key: str) -> Result[Option[Chunk], ErrInfo]: ...
def set(self, key: str, chunk: Chunk) -> Result[None, ErrInfo]: ...
Zero concrete code. Zero imports from infra. These are pure capability interfaces.
Cache uses proper Option[Chunk] – cache miss ≠ legitimate None payload. We never reintroduce the Optional[T] lie.
4. Reference Implementations – Real & Mock Adapters¶
4.1 FileStorage (real I/O – satisfies both read and write)¶
4.2 SystemClock & MonotonicTestClock¶
# src/funcpipe_rag/infra/adapters/clock.py
from datetime import datetime, timezone, timedelta
class SystemClock(Clock):
def now(self) -> datetime:
return datetime.now(timezone.utc)
class MonotonicTestClock(Clock):
def __init__(self, start: datetime | None = None) -> None:
self._now = (start or datetime.now(timezone.utc)).replace(tzinfo=timezone.utc)
def now(self) -> datetime:
self._now += timedelta(microseconds=1)
return self._now
4.3 ConsoleLogger & CollectingLogger¶
# src/funcpipe_rag/infra/adapters/logger.py
class ConsoleLogger(Logger):
def log(self, entry: LogEntry) -> None:
try:
print(f"[{entry.level}] {entry.msg}")
except OSError:
pass # best-effort – never crash the pipeline
class CollectingLogger(Logger):
def __init__(self) -> None:
self.entries: list[LogEntry] = []
def log(self, entry: LogEntry) -> None:
self.entries.append(entry)
4.4 Integration – Core Uses Only Protocols + Writer for Logging¶
# Illustrative example (not a repo file): core depends only on protocols.
def timed_embed_chunk(
chunk: Chunk,
clock: Clock,
) -> Writer[Result[EmbeddedChunk, ErrInfo], LogEntry]:
start = clock.now()
result = embed_chunk_pure(chunk)
duration = clock.now() - start
return tell(LogEntry("INFO", f"embedded doc_id={chunk.doc_id} in {duration}")).map(lambda _: result)
Shell drains Writer and sends to real logger:
# Illustrative shell sketch (not a repo file): drain Writer logs to a real logger adapter.
result, logs = run_writer(writer_program)
for entry in logs:
logger.log(entry)
Never pass Logger into the core – logging stays pure via Writer.
5. Property-Based Proofs (selected)¶
# Repo references:
# - Clocks: `src/funcpipe_rag/infra/adapters/clock.py`
# - File storage: `src/funcpipe_rag/infra/adapters/file_storage.py`
# - In-memory storage: `src/funcpipe_rag/infra/adapters/memory_storage.py`
#
# This repo currently tests file-storage resource safety and parse errors in:
# - tests/unit/infra/adapters/test_file_storage.py
6. Big-O & Allocation Guarantees¶
| Operation | Time | Call-stack | Heap | Allocation |
|---|---|---|---|---|
| Protocol dispatch | O(1) | O(1) | O(1) | O(1) |
Protocols introduce no additional runtime overhead beyond normal method dispatch.
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Concrete class in core | Import from infra → coupling | Depend only on protocol |
| God protocol | One interface with 40 methods | One capability → one protocol |
| Optional[T] for absence | Cache miss indistinguishable | Use proper Option[T] |
| Logger in core | Impure logging in pure code | Log via Writer; drain in shell |
8. Pre-Core Quiz¶
- Capability protocols are…? → Typed interfaces for exactly one responsibility
- Core depends on…? → Only the protocol, never the concrete adapter
- Cache miss is modeled as…? →
Option[Chunk](neverChunk | None) - Logging in core uses…? →
Writer[_, LogEntry]– neverLogger - Real power comes from…? → Static checking + runtime swappability
9. Post-Core Exercise¶
- Define a
MetricsProtocoland implement both no-op and Prometheus versions. - Refactor one pipeline stage to use
MonotonicTestClockin tests and prove timing logic via collectednow()calls. - Write a property test that proves
CollectingLoggerreceives exactly the expected entries. - Add a
AppCapabilitiesdataclass bundling all your protocols and refactor one shell to inject a single object.
Next → M07C04: Resource Safety – Guaranteeing Cleanup on Partial Consumption
You now have statically checked, composable capability protocols for every common effect. Your core is pure, your shell injects real implementations, and mypy + Hypothesis guarantee you never accidentally reach into concrete infrastructure or conflate absence with null. This is the foundation every production FuncPipe system is built on.