# M06C08: Writer Pattern – Accumulating Logs/Metrics as Pure Data
## Progression Note
Module 6 shifts from pure data modelling to effect-aware composition.
We now treat failure, absence, and observability as first-class effects that propagate automatically through pipelines — eliminating nested conditionals forever.
| Module | Focus | Key Outcomes |
|---|---|---|
| 5 | Algebraic Data Modelling | ADTs, exhaustive pattern matching, total functions, refined types |
| 6 | Monadic Flows as Composable Pipelines | bind/and_then, Reader/State-like patterns, error-typed flows |
| 7 | Effect Boundaries & Resource Safety | Dependency injection, boundaries, testing, evolution |
Core question
How do you add rich, ordered logging/tracing/metrics to monadic pipelines in a completely pure, composable, testable way — treating logs as accumulated data instead of side effects?
This is the core that finally gives you production-grade observability without sacrificing purity. After this core, every pipeline you write can emit detailed, deterministic logs/metrics that are fully testable — with zero print, zero global loggers, and zero hidden side effects.
Audience: Engineers who need observability in production pipelines but refuse to compromise on purity and testability.
Outcome
1. You will add logging/tracing with tell and listen.
2. You will have mechanical proof that your Writer compositions satisfy the monad laws — meaning logs always appear in exactly the order you expect.
3. You will stack Writer with Result/State/Reader for fully observable multi-effect pipelines.
## Why Writer Is the Final Effect
- tell: Append a log entry — pure because logs are just data.
- listen / censor: Inspect or filter the accumulated logs — pure transformations.
- and_then: Automatically appends logs from both sides — exactly like State, but the "state" is immutable and append-only.
Use Writer when you need observability.
For config → Reader. For failure → Result. For local state → State.
The trade-off is a bit more combinator noise; in return you get perfectly deterministic, testable logs.
A very common logging-heavy shape is Reader[Config, Writer[Result[T, ErrInfo]]] when you want both config and logs even on error paths.
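To make that stacked shape concrete, here is a minimal standalone sketch in which the Reader layer is flattened to a plain `Config -> (Result, Log)` function. `Config`, its `min_len` field, and the toy `Ok`/`Err` types are hypothetical stand-ins for the real types from earlier cores:

```python
from dataclasses import dataclass
from typing import Callable, Generic, Tuple, TypeVar, Union

T = TypeVar("T")
Log = Tuple[str, ...]

@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T

@dataclass(frozen=True)
class Err:
    error: str

@dataclass(frozen=True)
class Config:
    min_len: int  # hypothetical config field

# Reader[Config, Writer[Result[str, str]]], flattened to Config -> (Result, Log)
def validate(text: str) -> Callable[[Config], Tuple[Union[Ok[str], Err], Log]]:
    def run(cfg: Config) -> Tuple[Union[Ok[str], Err], Log]:
        log: Log = (f"validate len={len(text)} min={cfg.min_len}",)
        if len(text) < cfg.min_len:
            return Err("too short"), log  # the log survives the error path
        return Ok(text), log
    return run

result, logs = validate("hi")(Config(min_len=3))
# result is an Err, yet logs still records the validation step
```

The point of the shape: even when the `Result` layer fails, the `Writer` layer still hands back every log entry produced before the failure.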
## 1. Laws & Invariants (machine-checked in CI)
| Law | Formal Statement | Why it matters |
|---|---|---|
| Left Identity | pure(x).and_then(f) == f(x) | Safe to lift plain values |
| Right Identity | w.and_then(pure) == w | Safe to extract sub-pipelines |
| Associativity | w.and_then(f).and_then(g) == w.and_then(lambda x: f(x).and_then(g)) | Log order never changes with grouping |
| Tell Append | run_writer(tell(e1).and_then(lambda _: tell(e2))) == (None, (e1, e2)) | Logs concatenate in execution order |
| Listen Roundtrip | listen(w).map(lambda pair: pair[0]) == w | Listening does not change values or logs |
All laws verified with Hypothesis. A single counterexample breaks CI.
## 2. Public API – Writer is a one-field dataclass (mypy --strict clean)
```python
# src/funcpipe_rag/fp/effects/writer.py – end-of-Module-06 (mypy --strict clean target)
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Generic, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")

LogEntry = str
Log = Tuple[LogEntry, ...]  # fixed monoid: identity = (), append = +
# In general, Writer can use any log type that forms a monoid
# (has an identity element and an associative combine operation);
# we fix it to a tuple of strings here to keep the Python implementation simple.


@dataclass(frozen=True)
class Writer(Generic[T]):
    run: Callable[[], Tuple[T, Log]]

    def map(self, f: Callable[[T], U]) -> "Writer[U]":
        def run() -> Tuple[U, Log]:
            value, log = self.run()
            return f(value), log
        return Writer(run)

    def and_then(self, f: Callable[[T], "Writer[U]"]) -> "Writer[U]":
        def run() -> Tuple[U, Log]:
            value, log1 = self.run()
            next_writer = f(value)
            next_value, log2 = next_writer.run()
            return next_value, log1 + log2
        return Writer(run)


# Primitives
def pure(x: T) -> Writer[T]:
    return Writer(lambda: (x, ()))


def tell(entry: LogEntry) -> Writer[None]:
    return Writer(lambda: (None, (entry,)))


def tell_many(entries: Log) -> Writer[None]:
    return Writer(lambda: (None, entries))


def listen(p: Writer[T]) -> Writer[Tuple[T, Log]]:
    def run() -> Tuple[Tuple[T, Log], Log]:
        value, log = p.run()
        return (value, log), log
    return Writer(run)


def censor(f: Callable[[Log], Log], p: Writer[T]) -> Writer[T]:
    def run() -> Tuple[T, Log]:
        value, log = p.run()
        return value, f(log)
    return Writer(run)


def run_writer(p: Writer[T]) -> Tuple[T, Log]:
    return p.run()
```
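The primitives compose exactly as the laws promise. This standalone smoke test repeats the minimal definitions (and_then, pure, tell, censor) inline so the snippet runs on its own:

```python
from dataclasses import dataclass
from typing import Callable, Generic, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")
Log = Tuple[str, ...]

@dataclass(frozen=True)
class Writer(Generic[T]):
    run: Callable[[], Tuple[T, Log]]

    def and_then(self, f: Callable[[T], "Writer[U]"]) -> "Writer[U]":
        def run() -> Tuple[U, Log]:
            value, log1 = self.run()
            next_value, log2 = f(value).run()
            return next_value, log1 + log2
        return Writer(run)

def pure(x: T) -> Writer[T]:
    return Writer(lambda: (x, ()))

def tell(entry: str) -> Writer[None]:
    return Writer(lambda: (None, (entry,)))

def censor(f: Callable[[Log], Log], p: Writer[T]) -> Writer[T]:
    def run() -> Tuple[T, Log]:
        value, log = p.run()
        return value, f(log)
    return Writer(run)

# Logs concatenate in execution order:
pipeline = tell("step 1").and_then(lambda _: tell("step 2")).and_then(lambda _: pure(42))
value, logs = pipeline.run()
# value == 42, logs == ("step 1", "step 2")

# censor filters the accumulated log without touching the value:
redacted = censor(
    lambda log: tuple(e for e in log if "secret" not in e),
    tell("loaded secret credentials").and_then(lambda _: pure("ok")),
)
# redacted.run() == ("ok", ())
```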
## 3. Writer + Result – Observable Error Handling
```python
# Assumes Result / Ok / Err from the earlier Result core are in scope,
# alongside the Writer definitions above.
E = TypeVar("E")


def wr_pure(x: T) -> Writer[Result[T, E]]:
    return Writer(lambda: (Ok(x), ()))


def wr_map(p: Writer[Result[T, E]], f: Callable[[T], U]) -> Writer[Result[U, E]]:
    def run() -> Tuple[Result[U, E], Log]:
        r, log = p.run()
        return r.map(f), log
    return Writer(run)


def wr_and_then(
    p: Writer[Result[T, E]], k: Callable[[T], Writer[Result[U, E]]]
) -> Writer[Result[U, E]]:
    def run() -> Tuple[Result[U, E], Log]:
        r, log1 = p.run()
        if isinstance(r, Err):
            return Err(r.error), log1  # logs before Err preserved
        next_writer = k(r.value)
        next_r, log2 = next_writer.run()
        return next_r, log1 + log2
    return Writer(run)
```
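The crucial behaviour is the error path: a failing step short-circuits the continuation, but every log written before the failure is kept. A minimal standalone sketch (toy `Ok`/`Err` stand-ins and type annotations elided for brevity; the real types come from the earlier cores):

```python
from dataclasses import dataclass
from typing import Callable, Generic, Tuple, TypeVar

T = TypeVar("T")
Log = Tuple[str, ...]

@dataclass(frozen=True)
class Writer(Generic[T]):
    run: Callable[[], Tuple[T, Log]]

@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T

@dataclass(frozen=True)
class Err:
    error: str

def wr_and_then(p, k):
    def run():
        r, log1 = p.run()
        if isinstance(r, Err):
            return Err(r.error), log1  # short-circuit, logs kept
        next_r, log2 = k(r.value).run()
        return next_r, log1 + log2
    return Writer(run)

# A step that logs, then fails:
failing = Writer(lambda: (Err("parse failed"), ("start parse",)))

def never_run(x):
    return Writer(lambda: (Ok(x + 1), ("inc",)))

result, logs = wr_and_then(failing, never_run).run()
# result == Err("parse failed"); logs == ("start parse",) – "inc" never appears
```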
## 4. Real-World Example – Logging in a RAG Pipeline
```python
# Assumes the domain types (Chunk, EmbeddedChunk, Embedding, ErrInfo),
# tokenize, model, and current_model from earlier cores are in scope.
def embed_chunk(chunk: Chunk) -> Writer[Result[EmbeddedChunk, ErrInfo]]:
    return (
        tell(f"start embed chunk_id={chunk.id}")
        .and_then(lambda _: pure(tokenize(chunk.text.content)))
        .and_then(lambda tokens: tell(f"tokenized count={len(tokens)}").map(lambda _: tokens))
        .and_then(lambda tokens: pure(model.encode(tokens)))
        .and_then(lambda vec: tell(f"encoded dim={len(vec)}").map(lambda _: vec))
        .map(lambda vec: replace(chunk, embedding=Embedding(vec, current_model)))
        .map(Ok)
    )


result, logs = run_writer(embed_chunk(some_chunk))
print(logs)
# ("start embed chunk_id=...", "tokenized count=123", "encoded dim=768")
```
## 5. Before → After – Print Debugging vs Pure Logging
```python
# BEFORE – print side effects
def embed_chunk(chunk: Chunk) -> Result[EmbeddedChunk, ErrInfo]:
    print(f"start embed chunk_id={chunk.id}")
    tokens = tokenize(chunk.text.content)
    print(f"tokenized count={len(tokens)}")
    vec = model.encode(tokens)
    print(f"encoded dim={len(vec)}")
    return Ok(replace(chunk, embedding=Embedding(vec, current_model)))
```

```python
# AFTER – pure, testable logs
def embed_chunk(chunk: Chunk) -> Writer[Result[EmbeddedChunk, ErrInfo]]:
    return (
        tell(f"start embed chunk_id={chunk.id}")
        .and_then(lambda _: pure(tokenize(chunk.text.content)))
        .and_then(lambda tokens: tell(f"tokenized count={len(tokens)}").map(lambda _: tokens))
        .and_then(lambda tokens: pure(model.encode(tokens)))
        .and_then(lambda vec: tell(f"encoded dim={len(vec)}").map(lambda _: vec))
        .map(lambda vec: replace(chunk, embedding=Embedding(vec, current_model)))
        .map(Ok)
    )
```
Zero prints. Full testability. Logs exactly match execution order.
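Because logs are plain data, log order can be asserted directly in a unit test — no capsys, no mock logger. A toy standalone sketch (the pipeline and the stubbed encoder are hypothetical stand-ins; the Writer/tell definitions are repeated inline so it runs on its own):

```python
from dataclasses import dataclass
from typing import Callable, Generic, List, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")
Log = Tuple[str, ...]

@dataclass(frozen=True)
class Writer(Generic[T]):
    run: Callable[[], Tuple[T, Log]]

    def map(self, f: Callable[[T], U]) -> "Writer[U]":
        def run() -> Tuple[U, Log]:
            value, log = self.run()
            return f(value), log
        return Writer(run)

    def and_then(self, f: Callable[[T], "Writer[U]"]) -> "Writer[U]":
        def run() -> Tuple[U, Log]:
            value, log1 = self.run()
            next_value, log2 = f(value).run()
            return next_value, log1 + log2
        return Writer(run)

def tell(entry: str) -> Writer[None]:
    return Writer(lambda: (None, (entry,)))

def encode_stub(tokens: List[str]) -> List[float]:
    return [0.0] * 3  # stand-in for model.encode

def toy_embed(text: str) -> Writer[List[float]]:
    tokens = text.split()
    return tell(f"tokenized count={len(tokens)}").and_then(
        lambda _: tell("encoded dim=3").map(lambda _: encode_stub(tokens))
    )

def test_toy_embed_logs_in_order() -> None:
    vec, logs = toy_embed("hello world").run()
    assert logs == ("tokenized count=2", "encoded dim=3")
    assert len(vec) == 3
```

The test is deterministic: the same input always produces the same value and the same log tuple.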
## 6. Property-Based Proofs & Key Examples (selected)
```python
from hypothesis import given, strategies as st


@given(x=st.integers())
def test_writer_left_identity(x):
    f = lambda n: Writer(lambda: (n + 1, ("inc",)))
    assert run_writer(pure(x).and_then(f)) == run_writer(f(x))


@given(entries=st.lists(st.text()))
def test_writer_right_identity(entries):
    w = Writer(lambda: (42, tuple(entries)))
    assert run_writer(w.and_then(pure)) == run_writer(w)


@given(entries=st.lists(st.text()))
def test_writer_associativity(entries):
    w = Writer(lambda: (42, tuple(entries)))
    f = lambda a: Writer(lambda: (a + 1, ("f",)))
    g = lambda b: Writer(lambda: (b * 2, ("g",)))
    assert run_writer(w.and_then(f).and_then(g)) == run_writer(
        w.and_then(lambda x: f(x).and_then(g))
    )


@given(e1=st.text(), e2=st.text())
def test_writer_tell_append(e1, e2):
    assert run_writer(tell(e1).and_then(lambda _: tell(e2))) == (None, (e1, e2))


@given(entries=st.lists(st.text()))
def test_writer_listen_roundtrip(entries):
    w = Writer(lambda: (42, tuple(entries)))
    assert run_writer(listen(w).map(lambda pair: pair[0])) == run_writer(w)
```
## 7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| print() / logging.info() | Impure, untestable | Use tell / Writer |
| Global logger | Hidden side effects | Accumulate logs as data |
| Manual log concatenation | Verbose, error-prone | Writer composes automatically |
## 8. Pre-Core Quiz
- Writer replaces…? → print() and global loggers
- You add a log with…? → tell("message")
- You inspect logs with…? → listen(p)
- Logs are appended with…? → and_then
- The golden rule? → Never print() in a pipeline again
## 9. Post-Core Exercise
- Take your most print-heavy pipeline and rewrite it with Writer + tell.
- Add structured metrics (e.g. token counts) to an embedding pipeline.
- Use listen + censor to filter sensitive data from logs in a test.
Next: M06C09 – Refactoring try/except Spaghetti into Pure Monadic Pipelines
You have now added pure, testable observability to every pipeline. Logs and metrics are now first-class data — composable, deterministic, and proven correct by Hypothesis. The remaining cores are full-pipeline refactors and architectural patterns.