
Core 3: Observability – Tracing Through Pure Pipelines, Debuggable Composition

Module 10

Core question:
How do you add observability to pure functional pipelines, enabling side-effect-free tracing of intermediate states, metrics aggregation, and debuggable compositions without breaking purity or laziness?

In this core, we add observability patterns to the FuncPipe RAG Builder (now at funcpipe-rag-10). Observability covers tracing (taps that log or sample intermediates), metrics (monoidal accumulation of counts and errors), and debugging (naming and probing compositions). All three stay pure: traces are data (e.g., the Writer monad from Module 6), metrics are folds (Module 4), and probes are higher-order wrappers (Module 2). We refactor RAG into traceable stages and verify neutrality (traced and untraced pipelines produce equivalent results).

Motivation Bug: Opaque FP compositions hide failures (e.g., silent error propagation), and impure logging breaks purity. Pure observability makes pipelines debuggable without introducing effects.

Delta from Core 2: Core 2 made pipelines performant within a budget; this core makes them observable.

Observability Protocol (Contract, Entry/Exit Criteria):
- Traces/Metrics: Pure data (e.g., Writer[Result[T, E], LogThunk]), aggregated monoidally.
- Semantics: Neutral: the traced pipeline is equivalent to the untraced one (equivalence per Core 1); tracing adds no effects and does not break laziness.
- Purity: Preserved; observability is added via higher-order wrappers (e.g., tap/map).
- Error Model: Errors are traced explicitly (e.g., tap_err).
- Resource Safety: Unchanged; traces are bounded to prevent unbounded memory growth.
- Integration: Add to RAG (trace chunking); verify with properties.
- Mypy Config: --strict; probes are typed with Callable.
- Exit: Traces complete, equivalence holds, overhead stays within budget.

Audience: Engineers debugging/maintaining FP pipelines.

Outcome:
  1. Add pure traces/metrics to compositions.
  2. Debug via naming/probing.
  3. Make RAG traceable and verify neutrality.


1. Laws & Invariants

| Invariant | Description | Enforcement |
|---|---|---|
| Neutrality | Traced and untraced outputs are equal (eq_pure/eq_error). | Property tests |
| Purity | No effects in traces; logs and metrics are data. | Reviews, mypy |
| Bounded | Traces and metrics are bounded (e.g., max_samples); no unbounded growth. | Hypothesis (varied input sizes) |
| Monoid | Metrics combine associatively with an identity; commutative when order is irrelevant. | Property tests |
| Laziness | Probes preserve laziness, assuming the wrapped stages are lazy; no eager materialization. | Tests with islice |

Together, these invariants keep observability safe and pure.

Writer Laws (Normative for This Core)

Writer[T, LogThunk] pairs a value: T with logs: LogThunk, where:
- bind: combines logs by thunk concatenation, λ(): logs1() + logs2(), so no log is materialized until the combined thunk is forced.
- identity: the empty thunk, λ(): ().
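To make these laws concrete, here is a minimal sketch of a Writer specialized to LogThunk. The class is a hypothetical stand-in, not funcpipe_rag's Writer (whose constructor and method names may differ), and `pure`, `forced`, and `step` are illustrative names. Because logs are thunks, the laws are compared on forced values.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

A = TypeVar("A")
B = TypeVar("B")
Log = str
LogThunk = Callable[[], tuple[Log, ...]]


def no_logs() -> tuple[Log, ...]:
    return ()  # Identity element for thunk concatenation


@dataclass(frozen=True)
class Writer(Generic[A]):
    """Hypothetical stand-in for funcpipe_rag.Writer, specialized to LogThunk."""

    value: A
    logs: LogThunk

    @staticmethod
    def pure(value: B) -> "Writer[B]":
        return Writer(value, no_logs)

    def bind(self, f: Callable[[A], "Writer[B]"]) -> "Writer[B]":
        nxt = f(self.value)
        first, second = self.logs, nxt.logs
        # Neither thunk is forced here; logs materialize only when the result is forced.
        return Writer(nxt.value, lambda: first() + second())


def forced(w: Writer[A]) -> tuple[A, tuple[Log, ...]]:
    return w.value, w.logs()


def step(name: str) -> Callable[[int], Writer[int]]:
    def run(x: int) -> Writer[int]:
        return Writer(x + 1, lambda: (f"{name}: {x} -> {x + 1}",))
    return run


m = Writer(0, lambda: ("start",))
f, g = step("f"), step("g")
assert forced(Writer.pure(0).bind(f)) == forced(f(0))  # left identity
assert forced(m.bind(Writer.pure)) == forced(m)  # right identity
assert forced(m.bind(f).bind(g)) == forced(m.bind(lambda x: f(x).bind(g)))  # associativity
print(forced(m.bind(f).bind(g)))  # -> (2, ('start', 'f: 0 -> 1', 'g: 1 -> 2'))
```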


2. Decision Table

| Need | Purity requirement | Acceptable overhead | Recommended |
|---|---|---|---|
| Log intermediates | High | Low | Tap (Mod 3) |
| Aggregate metrics | High | Medium | Writer/fold (Mod 6/4) |
| Debug compositions | High | High | Name/probe (Mod 2) |
| Error traces | High | Low | Tap_err (Mod 4) |

Choose by need, then verify that the overhead stays within the Core 2 budget.


3. Public API (Wrappers for Observability)

Wrappers that attach traces to stages; e.g., traceable turns a stream stage into a Writer-producing stage.

from typing import Callable, Iterator, TypeVar, TypeAlias
from funcpipe_rag import Writer, run_writer

T = TypeVar("T")
U = TypeVar("U")
Log: TypeAlias = str  # Or structured ADT
LogThunk: TypeAlias = Callable[[], tuple[Log, ...]]


def traceable(stage: Callable[[Iterator[T]], Iterator[U]], max_samples: int = 100,
              show: Callable[[U], Log] = lambda x: repr(x)[:100]) -> Callable[
    [Iterator[T]], Writer[Iterator[U], LogThunk]]:
    def wrapped(it: Iterator[T]) -> Writer[Iterator[U], LogThunk]:
        logs: list[Log] = []  # Private buffer; exposed only as an immutable tuple snapshot
        out = stage(it)  # Builds the lazy output stream; nothing is pulled yet

        def gen() -> Iterator[U]:
            for x in out:
                if len(logs) < max_samples:  # Bounded: at most max_samples entries
                    logs.append(show(x))
                yield x

        # Logs are lazily coupled to consumption: calling the thunk before the stream is
        # exhausted yields a prefix, and calling it again later yields a longer prefix.
        return Writer(gen(), lambda: tuple(logs))

    return wrapped
# Usage: traced_chunks, logs_thunk = run_writer(traceable(gen_chunks)(docs))
#        consume traced_chunks first, then logs = logs_thunk()
# Default show is best-effort; for strict purity/perf, pass an explicit pure serializer.
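The lazy coupling between the stream and its logs is easy to misread, so here is a self-contained sketch of it. To run without funcpipe_rag, `traceable_sketch` (a hypothetical name) keeps the sampling logic of traceable above but returns a plain (stream, logs_thunk) pair instead of a Writer; `double` is an illustrative stage.

```python
from itertools import islice
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")
Log = str
LogThunk = Callable[[], tuple[Log, ...]]


def traceable_sketch(
    stage: Callable[[Iterator[T]], Iterator[U]],
    max_samples: int = 100,
    show: Callable[[U], Log] = repr,
) -> Callable[[Iterator[T]], tuple[Iterator[U], LogThunk]]:
    """Hypothetical stand-in for traceable: returns (stream, logs_thunk) instead of a Writer."""

    def wrapped(it: Iterator[T]) -> tuple[Iterator[U], LogThunk]:
        logs: list[Log] = []
        out = stage(it)  # Lazy: nothing is pulled from `it` yet

        def gen() -> Iterator[U]:
            for x in out:
                if len(logs) < max_samples:  # Bounded sampling
                    logs.append(show(x))
                yield x

        return gen(), lambda: tuple(logs)  # Snapshot of what has been seen so far

    return wrapped


def double(xs: Iterator[int]) -> Iterator[int]:
    return (2 * x for x in xs)


stream, logs_thunk = traceable_sketch(double, max_samples=3)(iter(range(10)))
print(logs_thunk())             # -> ()
print(list(islice(stream, 2)))  # -> [0, 2]
print(logs_thunk())             # -> ('0', '2')
print(list(stream))             # -> [4, 6, 8, 10, 12, 14, 16, 18]
print(logs_thunk())             # -> ('0', '2', '4')
```

The last call shows both properties at once: the stream itself is complete, while the logs stop at max_samples.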

4. Reference Implementations

4.1 Tracing Intermediates (Tap)

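Pure logs as data: tap is traceable applied to the identity stage, so it samples items as they flow past and changes nothing.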

def tap(max_samples: int = 100, show: Callable[[T], Log] = lambda x: repr(x)[:100]) -> Callable[[Iterator[T]], Writer[Iterator[T], LogThunk]]:
    return traceable(lambda it: it, max_samples, show)

4.2 Metrics Aggregation (Writer/Fold)

Monoidal metrics (e.g., error counts). Per-record Writer allocations can be expensive; for large streams prefer a fused fold stage for metrics.

from dataclasses import dataclass
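from funcpipe_rag import Writer, run_writer, Result, Err, ErrInfo  # ErrInfo assumed exported alongside Result/Err


@dataclass(frozen=True)  # Immutable values: metrics combine only via +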
class Metrics:
    count: int = 0
    errs: int = 0

    def __add__(self, other: "Metrics") -> "Metrics":
        return Metrics(self.count + other.count, self.errs + other.errs)

    @classmethod
    def zero(cls) -> "Metrics":
        return cls()


def metriced(fn: Callable[[T], Result[U, ErrInfo]]) -> Callable[[T], Writer[Result[U, ErrInfo], Metrics]]:
    def wrapped(x: T) -> Writer[Result[U, ErrInfo], Metrics]:
        res = fn(x)
        metrics = Metrics(count=1, errs=1 if isinstance(res, Err) else 0)
        return Writer(res, metrics)

    return wrapped
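The fused fold mentioned above can be sketched as a single pass with plain integer accumulators, checked against the per-record path that sums one Metrics per item. `Ok`, `Err`, and `fold_metrics` here are hypothetical stand-ins (Err is assumed to carry `.error`, as in tap_err), not funcpipe_rag's definitions.

```python
import operator
from dataclasses import dataclass
from functools import reduce
from typing import Iterable


@dataclass(frozen=True)
class Ok:
    value: object  # Hypothetical stand-in for funcpipe_rag's Ok


@dataclass(frozen=True)
class Err:
    error: object  # Hypothetical stand-in for funcpipe_rag's Err


@dataclass(frozen=True)
class Metrics:
    count: int = 0
    errs: int = 0

    def __add__(self, other: "Metrics") -> "Metrics":
        return Metrics(self.count + other.count, self.errs + other.errs)


def fold_metrics(results: Iterable[object]) -> Metrics:
    """Fused fold: one pass, two int accumulators, no per-record Writer or Metrics."""
    count = errs = 0
    for r in results:
        count += 1
        if isinstance(r, Err):
            errs += 1
    return Metrics(count, errs)


results = [Ok(1), Err("boom"), Ok(3), Err("bad")]
# Per-record path: one Metrics per item, combined with the monoid's +
per_record = reduce(operator.add, (Metrics(1, int(isinstance(r, Err))) for r in results), Metrics())
assert fold_metrics(results) == per_record == Metrics(count=4, errs=2)
```

Because Metrics is a monoid, both paths must agree; the fused version simply avoids allocating an object per record.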

4.3 Debuggable Compositions (Naming/Probe)

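Name each stage and record a labeled snapshot after it runs. The pipe returns the final value paired with those snapshots, so intermediates can be inspected like breakpoints without any I/O.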

from typing import Any

S = TypeVar("S")  # U is reused from the Public API block; redefining it would fail mypy

def probe_pipe(names: list[str], *stages: Callable[[Any], Any], show: Callable[[Any], Log] = lambda x: repr(x)[:100]) -> Callable[[S], tuple[U, tuple[Log, ...]]]:
    if len(names) != len(stages):  # Fail fast; unlike assert, not stripped under python -O
        raise ValueError("names must match stages one-to-one")
    def pipe(inp: S) -> tuple[U, tuple[Log, ...]]:
        val: Any = inp
        logs: list[Log] = []
        for name, stage in zip(names, stages):
            val = stage(val)
            logs.append(f"{name}: {show(val)}")  # Labeled snapshot after each stage
        return val, tuple(logs)
    return pipe
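A variant worth considering derives labels from each stage's `__name__`, so names cannot drift out of sync with stages. `probe_named`, `normalize`, and `tokenize` are hypothetical illustrations, not part of the project API.

```python
from typing import Any, Callable

Log = str


def probe_named(*stages: Callable[[Any], Any],
                show: Callable[[Any], Log] = repr) -> Callable[[Any], tuple[Any, tuple[Log, ...]]]:
    """Variant of probe_pipe: labels come from each stage's __name__ (hypothetical helper)."""
    def pipe(inp: Any) -> tuple[Any, tuple[Log, ...]]:
        val = inp
        logs: list[Log] = []
        for stage in stages:
            val = stage(val)
            name = getattr(stage, "__name__", repr(stage))  # Fallback for unnamed callables
            logs.append(f"{name}: {show(val)}")
        return val, tuple(logs)
    return pipe


def normalize(s: str) -> str:
    return " ".join(s.split()).lower()


def tokenize(s: str) -> list[str]:
    return s.split(" ")


out, trace = probe_named(normalize, tokenize, len)("  Hello   Pure  World ")
print(out)    # -> 3
print(trace)  # -> ("normalize: 'hello pure world'", "tokenize: ['hello', 'pure', 'world']", 'len: 3')
```

If the final value is wrong, the first trace entry that diverges from expectations points at the faulty stage.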

4.4 Error Traces (Tap_err)

Trace errors purely: sample only Err values while passing every result through unchanged.

def tap_err(max_samples: int = 100, show: Callable[[ErrInfo], Log] = lambda e: repr(e)[:100]) -> Callable[[Iterator[Result[T, ErrInfo]]], Writer[Iterator[Result[T, ErrInfo]], LogThunk]]:
    def wrapped(it: Iterator[Result[T, ErrInfo]]) -> Writer[Iterator[Result[T, ErrInfo]], LogThunk]:
        logs: list[Log] = []
        def gen() -> Iterator[Result[T, ErrInfo]]:
            for r in it:
                if isinstance(r, Err) and len(logs) < max_samples:
                    logs.append(show(r.error))  # Only errors are sampled
                yield r  # Every result passes through unchanged
        return Writer(gen(), lambda: tuple(logs))
    return wrapped
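A self-contained sketch of the same error-sampling rule, with hypothetical `Ok`/`Err` stand-ins and a plain (stream, logs_thunk) pair in place of Writer, so it runs without funcpipe_rag. `parse` is an illustrative stage that fails on non-digit input.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, Union

Log = str
LogThunk = Callable[[], tuple[Log, ...]]


@dataclass(frozen=True)
class Ok:
    value: int


@dataclass(frozen=True)
class Err:
    error: str


Res = Union[Ok, Err]  # Hypothetical stand-ins for funcpipe_rag's Result/Err


def tap_err_sketch(it: Iterator[Res], max_samples: int = 100,
                   show: Callable[[str], Log] = str) -> tuple[Iterator[Res], LogThunk]:
    """Same sampling rule as tap_err, returning (stream, logs_thunk) instead of a Writer."""
    logs: list[Log] = []

    def gen() -> Iterator[Res]:
        for r in it:
            if isinstance(r, Err) and len(logs) < max_samples:
                logs.append(show(r.error))
            yield r  # Ok and Err both pass through unchanged

    return gen(), lambda: tuple(logs)


def parse(s: str) -> Res:
    return Ok(int(s)) if s.isdigit() else Err(f"not a number: {s!r}")


stream, err_logs = tap_err_sketch(map(parse, ["1", "x", "3", "?"]))
values = list(stream)  # Consume first; err_logs() is only a prefix until then
print(err_logs())  # -> ("not a number: 'x'", "not a number: '?'")
```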

RAG Integration

Add traces to chunking.

# Before: Opaque
def rag_chunk(docs: Iterator[CleanDoc]) -> Iterator[Chunk]:
    return gen_chunks(docs)

# After: Traceable
def rag_chunk_traceable(docs: Iterator[CleanDoc]) -> Writer[Iterator[Chunk], LogThunk]:
    return traceable(gen_chunks)(docs)

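chunks, logs_thunk = run_writer(rag_chunk_traceable(docs))
chunk_list = list(chunks)  # Consume first: logs fill only as chunks are pulled
logs = logs_thunk()
# Verify: logs complete up to max_samples; neutrality holds against an untraced run on the same input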

Composing Traced Stages

Chain Writers across stages.

def compose_traced(stage1: Callable[[Iterator[S]], Writer[Iterator[T], LogThunk]], stage2: Callable[[Iterator[T]], Writer[Iterator[U], LogThunk]]) -> Callable[[Iterator[S]], Writer[Iterator[U], LogThunk]]:
    return lambda it: stage1(it).bind(stage2)
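To see what bind does to logs across two traced stages, here is a self-contained sketch with a hypothetical Writer stand-in (specialized to LogThunk) and an illustrative `traced` stage factory; neither is funcpipe_rag's API.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterator, TypeVar

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")
Log = str
LogThunk = Callable[[], tuple[Log, ...]]


@dataclass(frozen=True)
class Writer(Generic[A]):
    """Hypothetical stand-in for funcpipe_rag.Writer, specialized to LogThunk."""

    value: A
    logs: LogThunk

    def bind(self, f: Callable[[A], "Writer[B]"]) -> "Writer[B]":
        nxt = f(self.value)
        first, second = self.logs, nxt.logs
        return Writer(nxt.value, lambda: first() + second())  # Lazy thunk concatenation


def traced(name: str, fn: Callable[[int], int]) -> Callable[[Iterator[int]], Writer[Iterator[int]]]:
    """Hypothetical traced stage: maps fn lazily and logs each output it produces."""

    def stage(it: Iterator[int]) -> Writer[Iterator[int]]:
        logs: list[Log] = []  # Unbounded for brevity; real stages cap at max_samples

        def gen() -> Iterator[int]:
            for x in it:
                y = fn(x)
                logs.append(f"{name}: {y}")
                yield y

        return Writer(gen(), lambda: tuple(logs))

    return stage


def compose_traced(
    s1: Callable[[Iterator[A]], Writer[Iterator[B]]],
    s2: Callable[[Iterator[B]], Writer[Iterator[C]]],
) -> Callable[[Iterator[A]], Writer[Iterator[C]]]:
    return lambda it: s1(it).bind(s2)


pipeline = compose_traced(traced("inc", lambda x: x + 1), traced("sq", lambda x: x * x))
w = pipeline(iter([1, 2]))
print(w.logs())       # -> ()
print(list(w.value))  # -> [4, 9]
print(w.logs())       # -> ('inc: 2', 'inc: 3', 'sq: 4', 'sq: 9')
```

Note that the combined thunk groups entries by stage (all of stage 1, then all of stage 2) rather than in consumption order; if interleaved order matters, one option is to tag entries with a sequence number.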


5. Property-Based Proofs (tests/test_module_10_core3.py)

Hypothesis properties for neutrality, boundedness, and laziness.

from itertools import islice
from hypothesis import given
# item_list_strategy and gen_process are project test helpers; gen_process is assumed to be a lazy 1:1 map

@given(items=item_list_strategy())
def test_trace_neutral(items):
    traced, logs_thunk = run_writer(traceable(gen_process)(iter(items)))
    traced_list = list(traced)  # Consume before reading logs
    logs = logs_thunk()
    untraced = list(gen_process(iter(items)))
    assert traced_list == untraced  # Neutrality
    assert len(logs) <= 100  # Bounded
    assert len(logs) == min(len(items), 100)  # Complete up to bound (1:1 stage)

@given(items=item_list_strategy())
def test_trace_lazy(items):
    counter = [0]
    def counted_iter():
        for x in items:
            counter[0] += 1
            yield x
    traced, _ = run_writer(traceable(gen_process)(counted_iter()))
    k = min(len(items), 5)  # Inputs may be shorter than the requested prefix
    partial = list(islice(traced, 5))
    assert len(partial) == k  # Laziness preserved; only the requested prefix is produced
    assert counter[0] == k  # No overpull from the source
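The Monoid invariant is also enforced by properties, and the tests above do not cover it. A dependency-free stand-in for a Hypothesis test checks the laws exhaustively over a small grid; this Metrics copy mirrors Section 4.2, and `check_metrics_monoid` is a hypothetical helper name.

```python
from dataclasses import dataclass
from itertools import product


@dataclass(frozen=True)
class Metrics:
    count: int = 0
    errs: int = 0

    def __add__(self, other: "Metrics") -> "Metrics":
        return Metrics(self.count + other.count, self.errs + other.errs)

    @classmethod
    def zero(cls) -> "Metrics":
        return cls()


def check_metrics_monoid(max_n: int = 3) -> int:
    """Check identity, associativity, and commutativity on all small triples; return case count."""
    samples = [Metrics(c, e) for c, e in product(range(max_n), repeat=2)]
    checked = 0
    for a, b, c in product(samples, repeat=3):
        assert a + Metrics.zero() == a == Metrics.zero() + a  # Identity
        assert (a + b) + c == a + (b + c)  # Associativity
        assert a + b == b + a  # Commutativity (order-insensitive counts)
        checked += 1
    return checked


print(check_metrics_monoid())  # -> 729 (9 samples, all ordered triples)
```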


6. Runtime Preservation Guarantee

Tracing must stay within the Core 2 overhead budget (e.g., under 5% added latency); because traces are pure data, observability introduces no effects.
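One way to check the budget is to time the same pipeline with and without the tracing wrapper. This sketch uses the standard `timeit.repeat`; `stage`, `traced_stage`, and `overhead_ratio` are hypothetical stand-ins. With such a trivially cheap stage the per-item wrapper cost will likely dominate, so the ratio here can exceed 5%; measure on realistic stages before drawing conclusions.

```python
import timeit
from typing import Callable, Iterator


def stage(xs: Iterator[int]) -> Iterator[int]:
    return (x * 2 for x in xs)  # Stand-in for a real pipeline stage


def traced_stage(xs: Iterator[int], max_samples: int = 100) -> tuple[Iterator[int], Callable[[], tuple[str, ...]]]:
    logs: list[str] = []

    def gen() -> Iterator[int]:
        for x in stage(xs):
            if len(logs) < max_samples:
                logs.append(repr(x))
            yield x

    return gen(), lambda: tuple(logs)


def best_time(run: Callable[[], object], repeat: int = 5) -> float:
    # Min over repeats reduces scheduler noise; number=1 runs the pipeline once per sample.
    return min(timeit.repeat(run, number=1, repeat=repeat))


def overhead_ratio(n: int = 100_000) -> float:
    """Relative extra time of the traced pipeline, e.g. 0.05 means 5% slower."""
    base = best_time(lambda: sum(stage(iter(range(n)))))
    traced = best_time(lambda: sum(traced_stage(iter(range(n)))[0]))
    return traced / base - 1.0


print(f"tracing overhead: {overhead_ratio():.1%}")
```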


7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Impure logging | Breaks purity | Use Writer/tap |
| Unbounded traces | Memory blowup | Bound samples (max_samples) |
| Opaque compositions | Hard to debug | Name/probe stages |
| Ignoring errors | Silent failures | Trace errors explicitly with tap_err |

8. Pre-Core Quiz

  1. Tracing via…? → Tap/Writer
  2. Metrics as…? → Monoids/folds
  3. Debug with…? → Naming/probing
  4. Neutrality means…? → Traced == untraced
  5. Bound traces for…? → Memory safety

9. Post-Core Exercise

  1. Add a trace to a RAG stage.
  2. Aggregate metrics and verify neutrality.
  3. Debug a composition with naming/probing.

Pipeline Usage (Idiomatic)

chunks, logs_thunk = run_writer(traceable(gen_chunks)(docs))
chunk_list = list(chunks)  # Consume the stream before reading logs
logs = logs_thunk()

Next: Core 4: Property-Based Regression & Invariant Testing for Pure Pipelines (Hypothesis in CI)