Skip to content

M07C03: Capability Protocols – Typed Effect Interfaces (Storage, Clock, Logger, Cache)

Module 07 – Main Track Core

Main track: Cores 1, 3–10 (Ports & Adapters + Capability Protocols → Production).
This is a required core. Every production FuncPipe system uses capability protocols.

Progression Note

Module 7 takes the lawful containers and pipelines from Module 6 and puts all effects behind explicit boundaries.

Module Focus Key Outcomes
6 Monadic Flows as Composable Pipelines Lawful and_then, Reader/State/Writer patterns, error-typed flows
7 Effect Boundaries & Resource Safety Ports & adapters, capability protocols, resource-safe IO, idempotency
8 Async / Concurrent Pipelines Backpressure, timeouts, resumability, fairness (built on 6–7)

Core question
How do you design effect interfaces as capability protocols for common dependencies (storage, clock, logger, cache, etc.), ensuring static checking, composability, and effect isolation in FuncPipe pipelines?

What you now have after M07C01 + M07C02 + this core - Pure domain core
- Zero direct I/O in domain code
- All I/O behind swappable adapters implementing capability protocols (Storage, Clock, Logger)
- Effectful operations described as pure data (IOPlan in funcpipe_rag.domain.effects.io_plan)
- A core set of typed capability protocols that let you inject time, logging, caching, etc. with full mypy --strict safety and zero accidental coupling

What the rest of Module 7 adds - Resource safety guarantees (even on partial consumption)
- Idempotent effect design
- Transaction/session patterns
- Incremental migration playbook
- Production story: CI, golden tests, shadow traffic

You are now two steps away from a complete production-grade functional architecture.

1. Laws & Invariants (machine-checked where possible)

Law / Invariant Description Enforcement
Minimalism Each protocol contains only the methods required by its capability. No extras. Code review
Capability Isolation Core code types its dependencies as protocols (so mypy prevents calling undeclared methods); import layering (no infra in domain) is enforced by review/CI. mypy --strict + CI
Determinism (mock) Fixed mock implementations yield identical outputs for the same inputs. Hypothesis
Monotonicity (logical clock) Logical/test clocks are non-decreasing across calls. Property tests
Error Contract Fallible protocol methods return typed Result; no raw exceptions cross protocol boundaries into the core. Type checks + runtime contract
Adapter Equivalence Real vs mock/fake implementations of the same protocol produce identical Result values for the same logical inputs. Hypothesis (swappability)

These laws turn dependency injection from a runtime pattern into a type-checked + property-tested + review-enforced discipline.

2. Decision Table – Which Capability Needs Which Protocol?

Capability Fallible? Needs State? Recommended Protocol Pattern
Storage Read Yes No StorageRead (Iterator[Result[RawDoc, ErrInfo]])
Storage Write Yes No StorageWrite (Result[None, ErrInfo])
Clock No Yes Clock (now() → datetime)
Logger No Yes Logger (log(entry: LogEntry) → None)
Cache Yes Yes Cache (Result[Option[Chunk], ErrInfo])

Rule: One capability → one protocol. Never a “GodPort”.

Important: Logging and clock are deliberately best-effort and infallible in the model. Concrete implementations should swallow or separately report their own failures (e.g. broken stdout) – they must never crash the core pipeline.

3. Public API – Capability Protocols (src/funcpipe_rag/domain/capabilities.py)

# src/funcpipe_rag/domain/capabilities.py – mypy --strict clean
from __future__ import annotations
from datetime import datetime
from collections.abc import Iterator
from typing import Protocol

from funcpipe_rag.core.rag_types import Chunk, RawDoc
from funcpipe_rag.domain.logging import LogEntry
from funcpipe_rag.result.types import ErrInfo, Option, Result

__all__ = [
    "StorageRead",
    "StorageWrite",
    "Storage",
    "Clock",
    "Logger",
    "Cache",
]

class StorageRead(Protocol):
    def read_docs(self, path: str) -> Iterator[Result[RawDoc, ErrInfo]]: ...

class StorageWrite(Protocol):
    def write_chunks(self, path: str, chunks: Iterator[Chunk]) -> Result[None, ErrInfo]: ...

class Storage(StorageRead, StorageWrite, Protocol):
    """Composed capability: full read/write access."""

class Clock(Protocol):
    def now(self) -> datetime: ...

class Logger(Protocol):
    def log(self, entry: LogEntry) -> None: ...

class Cache(Protocol):
    def get(self, key: str) -> Result[Option[Chunk], ErrInfo]: ...
    def set(self, key: str, chunk: Chunk) -> Result[None, ErrInfo]: ...

Zero concrete code. Zero imports from infra. These are pure capability interfaces.

Cache uses proper Option[Chunk] – cache miss ≠ legitimate None payload. We never reintroduce the Optional[T] lie.

4. Reference Implementations – Real & Mock Adapters

4.1 FileStorage (real I/O – satisfies both read and write)

# src/funcpipe_rag/infra/adapters/file_storage.py
class FileStorage(Storage):
    ...

4.2 SystemClock & MonotonicTestClock

# src/funcpipe_rag/infra/adapters/clock.py
from datetime import datetime, timezone, timedelta

class SystemClock(Clock):
    def now(self) -> datetime:
        return datetime.now(timezone.utc)

class MonotonicTestClock(Clock):
    def __init__(self, start: datetime | None = None) -> None:
        self._now = (start or datetime.now(timezone.utc)).replace(tzinfo=timezone.utc)
    def now(self) -> datetime:
        self._now += timedelta(microseconds=1)
        return self._now

4.3 ConsoleLogger & CollectingLogger

# src/funcpipe_rag/infra/adapters/logger.py
class ConsoleLogger(Logger):
    def log(self, entry: LogEntry) -> None:
        try:
            print(f"[{entry.level}] {entry.msg}")
        except OSError:
            pass  # best-effort – never crash the pipeline

class CollectingLogger(Logger):
    def __init__(self) -> None:
        self.entries: list[LogEntry] = []
    def log(self, entry: LogEntry) -> None:
        self.entries.append(entry)

4.4 Integration – Core Uses Only Protocols + Writer for Logging

# Illustrative example (not a repo file): core depends only on protocols.
def timed_embed_chunk(
    chunk: Chunk,
    clock: Clock,
) -> Writer[Result[EmbeddedChunk, ErrInfo], LogEntry]:
    start = clock.now()
    result = embed_chunk_pure(chunk)
    duration = clock.now() - start
    return tell(LogEntry("INFO", f"embedded doc_id={chunk.doc_id} in {duration}")).map(lambda _: result)

Shell drains Writer and sends to real logger:

# Illustrative shell sketch (not a repo file): drain Writer logs to a real logger adapter.
result, logs = run_writer(writer_program)
for entry in logs:
    logger.log(entry)

Never pass Logger into the core – logging stays pure via Writer.

5. Property-Based Proofs (selected)

# Repo references:
# - Clocks: `src/funcpipe_rag/infra/adapters/clock.py`
# - File storage: `src/funcpipe_rag/infra/adapters/file_storage.py`
# - In-memory storage: `src/funcpipe_rag/infra/adapters/memory_storage.py`
#
# This repo currently tests file-storage resource safety and parse errors in:
# - tests/unit/infra/adapters/test_file_storage.py

6. Big-O & Allocation Guarantees

Operation Time Call-stack Heap Allocation
Protocol dispatch O(1) O(1) O(1) O(1)

Protocols introduce no additional runtime overhead beyond normal method dispatch.

7. Anti-Patterns & Immediate Fixes

Anti-Pattern Symptom Fix
Concrete class in core Import from infra → coupling Depend only on protocol
God protocol One interface with 40 methods One capability → one protocol
Optional[T] for absence Cache miss indistinguishable Use proper Option[T]
Logger in core Impure logging in pure code Log via Writer; drain in shell

8. Pre-Core Quiz

  1. Capability protocols are…? → Typed interfaces for exactly one responsibility
  2. Core depends on…? → Only the protocol, never the concrete adapter
  3. Cache miss is modeled as…? → Option[Chunk] (never Chunk | None)
  4. Logging in core uses…? → Writer[_, LogEntry] – never Logger
  5. Real power comes from…? → Static checking + runtime swappability

9. Post-Core Exercise

  1. Define a MetricsProtocol and implement both no-op and Prometheus versions.
  2. Refactor one pipeline stage to use MonotonicTestClock in tests and prove timing logic via collected now() calls.
  3. Write a property test that proves CollectingLogger receives exactly the expected entries.
  4. Add a AppCapabilities dataclass bundling all your protocols and refactor one shell to inject a single object.

Next → M07C04: Resource Safety – Guaranteeing Cleanup on Partial Consumption

You now have statically checked, composable capability protocols for every common effect. Your core is pure, your shell injects real implementations, and mypy + Hypothesis guarantee you never accidentally reach into concrete infrastructure or conflate absence with null. This is the foundation every production FuncPipe system is built on.