Skip to content

M06C08: Writer Pattern – Accumulating Logs/Metrics as Pure Data

Progression Note

Module 6 shifts from pure data modelling to effect-aware composition.
We now treat failure, absence, and observability as first-class effects that propagate automatically through pipelines — eliminating nested conditionals forever.

Module Focus Key Outcomes
5 Algebraic Data Modelling ADTs, exhaustive pattern matching, total functions, refined types
6 Monadic Flows as Composable Pipelines bind/and_then, Reader/State-like patterns, error-typed flows
7 Effect Boundaries & Resource Safety Dependency injection, boundaries, testing, evolution

Core question
How do you add rich, ordered logging/tracing/metrics to monadic pipelines in a completely pure, composable, testable way — treating logs as accumulated data instead of side effects?

This is the core that finally gives you production-grade observability without sacrificing purity. After this core, every pipeline you write can emit detailed, deterministic logs/metrics that are fully testable — with zero print, zero global loggers, and zero hidden side effects.

Audience: Engineers who need observability in production pipelines but refuse to compromise on purity and testability.

Outcome 1. You will add logging/tracing with tell and listen. 2. You will have mechanical proof that your Writer compositions satisfy the monad laws — meaning logs always appear in exactly the order you expect. 3. You will stack Writer with Result/State/Reader for fully observable multi-effect pipelines.

Why Writer Is the Final Effect

  • tell: Append a log entry — pure because logs are just data.
  • listen / censor: Inspect or filter the accumulated logs — pure transformations.
  • and_then: Automatically appends logs from both sides — exactly like State, but the "state" is immutable and append-only.

Use Writer when you need observability.
For config → Reader. For failure → Result. For local state → State.
The trade-off is a bit more combinator noise; in return you get perfectly deterministic, testable logs.
A very common logging-heavy shape is Reader[Config, Writer[Result[T, ErrInfo]]] when you want both config and logs even on error paths.

1. Laws & Invariants (machine-checked in CI)

Law Formal Statement Why it matters
Left Identity pure(x).and_then(f) == f(x) Safe to lift plain values
Right Identity w.and_then(pure) == w Safe to extract sub-pipelines
Associativity w.and_then(f).and_then(g) == w.and_then(lambda x: f(x).and_then(g)) Log order never changes with grouping
Tell Append run_writer(tell(e1).and_then(lambda _: tell(e2))) == (None, (e1, e2)) Logs concatenate in execution order
Listen Roundtrip listen(w).map(lambda pair: pair[0]) == w Listening does not change values or logs

All laws verified with Hypothesis. A single counterexample breaks CI.

2. Public API – Writer is a one-field dataclass (mypy --strict clean)

# src/funcpipe_rag/fp/effects/writer.py – end-of-Module-06 (mypy --strict clean target)

from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Generic, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")

LogEntry = str
Log = Tuple[LogEntry, ...]   # fixed monoid: identity = (), append = +
# In general, Writer can use any log type that forms a monoid
# (has an identity element and an associative combine operation);
# we fix it to a tuple of strings here to keep the Python implementation simple.

@dataclass(frozen=True)
class Writer(Generic[T]):
    run: Callable[[], Tuple[T, Log]]

    def map(self, f: Callable[[T], U]) -> "Writer[U]":
        def run() -> Tuple[U, Log]:
            value, log = self.run()
            return f(value), log
        return Writer(run)

    def and_then(self, f: Callable[[T], "Writer[U]"]) -> "Writer[U]":
        def run() -> Tuple[U, Log]:
            value, log1 = self.run()
            next_writer = f(value)
            next_value, log2 = next_writer.run()
            return next_value, log1 + log2
        return Writer(run)

# Primitives
def pure(x: T) -> Writer[T]:
    return Writer(lambda: (x, ()))

def tell(entry: LogEntry) -> Writer[None]:
    return Writer(lambda: (None, (entry,)))

def tell_many(entries: Log) -> Writer[None]:
    return Writer(lambda: (None, entries))

def listen(p: Writer[T]) -> Writer[Tuple[T, Log]]:
    def run() -> Tuple[Tuple[T, Log], Log]:
        value, log = p.run()
        return (value, log), log
    return Writer(run)

def censor(f: Callable[[Log], Log], p: Writer[T]) -> Writer[T]:
    def run() -> Tuple[T, Log]:
        value, log = p.run()
        return value, f(log)
    return Writer(run)

def run_writer(p: Writer[T]) -> Tuple[T, Log]:
    return p.run()

3. Writer + Result – Observable Error Handling

E = TypeVar("E")

def wr_pure(x: T) -> Writer[Result[T, E]]:
    return Writer(lambda: (Ok(x), ()))

def wr_map(p: Writer[Result[T, E]], f: Callable[[T], U]) -> Writer[Result[U, E]]:
    def run() -> Tuple[Result[U, E], Log]:
        r, log = p.run()
        return r.map(f), log
    return Writer(run)

def wr_and_then(p: Writer[Result[T, E]], k: Callable[[T], Writer[Result[U, E]]]) -> Writer[Result[U, E]]:
    def run() -> Tuple[Result[U, E], Log]:
        r, log1 = p.run()
        if isinstance(r, Err):
            return Err(r.error), log1                          # logs before Err preserved
        next_writer = k(r.value)
        next_r, log2 = next_writer.run()
        return next_r, log1 + log2
    return Writer(run)

4. Real-World Example – Logging in RAG Pipeline

def embed_chunk(chunk: Chunk) -> Writer[Result[EmbeddedChunk, ErrInfo]]:
    return (
        tell(f"start embed chunk_id={chunk.id}")
        .and_then(lambda _: pure(tokenize(chunk.text.content)))
        .and_then(lambda tokens: tell(f"tokenized count={len(tokens)}").map(lambda _: tokens))
        .and_then(lambda tokens: pure(model.encode(tokens)))
        .and_then(lambda vec: tell(f"encoded dim={len(vec)}").map(lambda _: vec))
        .map(lambda vec: replace(chunk, embedding=Embedding(vec, current_model)))
        .map(Ok)
    )

(result, logs) = run_writer(embed_chunk(some_chunk))
print(logs)
# ("start embed chunk_id=...", "tokenized count=123", "encoded dim=768")

5. Before → After – Print Debugging vs Pure Logging

# BEFORE – print side effects
def embed_chunk(chunk: Chunk) -> Result[EmbeddedChunk, ErrInfo]:
    print(f"start embed chunk_id={chunk.id}")
    tokens = tokenize(chunk.text.content)
    print(f"tokenized count={len(tokens)}")
    vec = model.encode(tokens)
    print(f"encoded dim={len(vec)}")
    return Ok(replace(chunk, embedding=Embedding(vec, current_model)))

# AFTER – pure, testable logs
def embed_chunk(chunk: Chunk) -> Writer[Result[EmbeddedChunk, ErrInfo]]:
    return (
        tell(f"start embed chunk_id={chunk.id}")
        .and_then(lambda _: pure(tokenize(chunk.text.content)))
        .and_then(lambda tokens: tell(f"tokenized count={len(tokens)}").map(lambda _: tokens))
        .and_then(lambda tokens: pure(model.encode(tokens)))
        .and_then(lambda vec: tell(f"encoded dim={len(vec)}").map(lambda _: vec))
        .map(lambda vec: replace(chunk, embedding=Embedding(vec, current_model)))
        .map(Ok)
    )

Zero prints. Full testability. Logs exactly match execution order.

6. Property-Based Proofs & Key Examples (selected)

from hypothesis import given, strategies as st

@given(x=st.integers())
def test_writer_left_identity(x):
    f = lambda n: Writer(lambda: (n + 1, ("inc",)))
    assert run_writer(pure(x).and_then(f)) == run_writer(f(x))

@given(entries=st.lists(st.text()))
def test_writer_right_identity(entries):
    w = Writer(lambda: (42, tuple(entries)))
    assert run_writer(w.and_then(pure)) == run_writer(w)

@given(entries=st.lists(st.text()))
def test_writer_associativity(entries):
    w = Writer(lambda: (42, tuple(entries)))
    f = lambda a: Writer(lambda: (a + 1, ("f",)))
    g = lambda b: Writer(lambda: (b * 2, ("g",)))
    assert run_writer(w.and_then(f).and_then(g)) == run_writer(w.and_then(lambda x: f(x).and_then(g)))

@given(e1=st.text(), e2=st.text())
def test_writer_tell_append(e1, e2):
    assert run_writer(tell(e1).and_then(lambda _: tell(e2))) == (None, (e1, e2))

@given(entries=st.lists(st.text()))
def test_writer_listen_roundtrip(entries):
    w = Writer(lambda: (42, tuple(entries)))
    assert run_writer(listen(w).map(lambda pair: pair[0])) == run_writer(w)

7. Anti-Patterns & Immediate Fixes

Anti-Pattern Symptom Fix
print() / logging.info() Impure, untestable Use tell / Writer
Global logger Hidden side effects Accumulate logs as data
Manual log concatenation Verbose, error-prone Writer composes automatically

8. Pre-Core Quiz

  1. Writer replaces…? → print() and global loggers
  2. You add a log with…? → tell("message")
  3. You inspect logs with…? → listen(p)
  4. Logs are appended with…? → and_then
  5. The golden rule? → Never print() in a pipeline again

9. Post-Core Exercise

  1. Take your most print-heavy pipeline and rewrite it with Writer + tell.
  2. Add structured metrics (e.g. token counts) to an embedding pipeline.
  3. Use listen + censor to filter sensitive data from logs in a test.

Next: M06C09 – Refactoring try/except Spaghetti into Pure Monadic Pipelines

You have now added pure, testable observability to every pipeline. Logs and metrics are now first-class data — composable, deterministic, and proven correct by Hypothesis. The remaining cores are full-pipeline refactors and architectural patterns.