Core 3: Observability – Tracing Through Pure Pipelines, Debuggable Composition
Module 10
Core question:
How do you add observability to pure functional pipelines, enabling side-effect-free tracing of intermediate states, metrics aggregation, and debuggable compositions without breaking purity or laziness?
In this core, we add observability patterns to the FuncPipe RAG Builder (now at funcpipe-rag-10). Observability here covers tracing (taps that log or sample intermediates), metrics (monoidal accumulation of counts and errors), and debugging (naming and probing compositions). All three stay pure: traces are data (e.g., the Writer monad from Module 6), metrics are folds (Module 4), and probes are higher-order wrappers (Module 2). We refactor RAG into traceable stages and verify neutrality (traced and untraced pipelines are equivalent).
Motivation Bug: Opaque FP compositions hide failures (e.g., silent error propagation), and impure logging breaks purity. Pure observability makes pipelines debuggable without adding effects.
Delta from Core 2: Core 2 kept pipelines within performance budgets; this core makes them observable.
Observability Protocol (Contract, Entry/Exit Criteria):
- Traces/Metrics: Pure data (e.g., Writer[Result[T, E], LogThunk]), aggregated monoidally.
- Semantics: Neutral; the traced pipeline equals the untraced one (equivalence per Core 1), with no added effects and no broken laziness.
- Purity: Preserved; observability is added through higher-order wrappers (e.g., tap/map).
- Error Model: Errors are traced explicitly (e.g., tap_err).
- Resource Safety: Unchanged; traces are bounded so memory cannot grow without limit.
- Integration: Add to RAG (trace chunks); verify with properties.
- Mypy Config: --strict; probes typed with Callable.
- Exit: Traces complete (up to their bound), equivalence holds, overhead within budget.
Audience: Engineers debugging/maintaining FP pipelines.
Outcome:
1. Add pure traces/metrics to compositions.
2. Debug via naming/probing.
3. Make RAG traceable and verify neutrality.
1. Laws & Invariants
| Invariant | Description | Enforcement |
|---|---|---|
| Neutrality Inv | Traced == untraced outputs (eq_pure/eq_error). | Properties |
| Purity Inv | No effects in traces; logs/metrics as data. | Reviews/mypy |
| Bounded Inv | Traces/metrics bounded (e.g., max_samples); no unbounded growth. | Hypothesis sizes |
| Monoid Inv | Metrics associative + identity; commutative if order irrelevant. | Properties |
| Laziness Inv | Probes preserve laziness assuming stages are lazy; no eager materialization. | Tests with islice |
These ensure observability is safe/pure.
Writer Laws (Normative for This Core)
Writer[T, LogThunk] pairs value: T with logs: LogThunk, where:
- bind combines logs by thunk concatenation: λ(): logs1() + logs2().
- identity (unit) carries the empty log: λ(): ().
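A minimal sketch of these laws, using a self-contained stand-in Writer (an assumption: funcpipe_rag's actual Writer may be shaped differently). bind builds a new thunk without forcing either side, and unit carries the empty log. Thunks cannot be compared directly, so the monad laws are checked on the value plus the forced logs; inc and dbl are hypothetical stages.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

A = TypeVar("A")
B = TypeVar("B")

Log = str
LogThunk = Callable[[], tuple[Log, ...]]


@dataclass(frozen=True)
class Writer(Generic[A]):
    """Stand-in Writer: a value paired with a deferred (thunked) log."""
    value: A
    logs: LogThunk

    def bind(self, f: "Callable[[A], Writer[B]]") -> "Writer[B]":
        nxt = f(self.value)
        first, second = self.logs, nxt.logs
        # Thunk-concat: neither side is forced until the combined thunk is called
        return Writer(nxt.value, lambda: first() + second())


def unit(x: A) -> Writer[A]:
    return Writer(x, lambda: ())  # identity: empty log


def forced(w: Writer[A]) -> tuple[A, tuple[Log, ...]]:
    return w.value, w.logs()  # force the log thunk so results can be compared


def inc(n: int) -> Writer[int]:
    return Writer(n + 1, lambda: (f"inc {n}",))


def dbl(n: int) -> Writer[int]:
    return Writer(n * 2, lambda: (f"dbl {n}",))


m = Writer(3, lambda: ("start",))
# Left identity: unit(x).bind(f) == f(x)
assert forced(unit(3).bind(inc)) == forced(inc(3))
# Right identity: m.bind(unit) == m
assert forced(m.bind(unit)) == forced(m)
# Associativity: (m >>= f) >>= g == m >>= (x -> f(x) >>= g)
assert forced(m.bind(inc).bind(dbl)) == forced(m.bind(lambda x: inc(x).bind(dbl)))
print(forced(m.bind(inc).bind(dbl)))  # (8, ('start', 'inc 3', 'dbl 4'))
```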
2. Decision Table
| Need | Purity Req | Overhead OK | Recommended |
|---|---|---|---|
| Log intermediates | High | Low | Tap (Mod 3) |
| Aggregate metrics | High | Medium | Writer/fold (Mod 6/4) |
| Debug compositions | High | High | Name/probe (Mod 2) |
| Error traces | High | Low | Tap_err (Mod 4) |
Choose by need; verify overhead in budget.
3. Public API (Wrappers for Observability)
traceable wraps an iterator stage so that it returns a Writer: the unchanged output stream paired with a thunk of bounded, sampled logs.
```python
from typing import Callable, Iterator, TypeAlias, TypeVar

from funcpipe_rag import Writer, run_writer

T = TypeVar("T")
U = TypeVar("U")

Log: TypeAlias = str  # Or a structured log ADT
LogThunk: TypeAlias = Callable[[], tuple[Log, ...]]


def traceable(
    stage: Callable[[Iterator[T]], Iterator[U]],
    max_samples: int = 100,
    show: Callable[[U], Log] = lambda x: repr(x)[:100],
) -> Callable[[Iterator[T]], Writer[Iterator[U], LogThunk]]:
    def wrapped(it: Iterator[T]) -> Writer[Iterator[U], LogThunk]:
        logs: list[Log] = []  # private buffer, filled only as the output is consumed
        out = stage(it)

        def gen() -> Iterator[U]:
            for x in out:
                if len(logs) < max_samples:  # bound the trace
                    logs.append(show(x))
                yield x  # values pass through unchanged

        # Logs are coupled to consumption: calling the thunk before the stream is
        # exhausted returns only the prefix sampled so far, and a later call may
        # return a longer prefix. Exhaust the stream first for a complete trace.
        return Writer(gen(), lambda: tuple(logs))

    return wrapped

# Usage:
#   traced_chunks, logs_thunk = run_writer(traceable(gen_chunks)(docs))
#   chunk_list = list(traced_chunks)  # consume first
#   logs = logs_thunk()               # then read the bounded trace
# The default show is best-effort; for strict purity/perf, pass an explicit pure serializer.
```
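The consumption coupling described above can be observed directly. The sketch below re-declares a minimal stand-in Writer (an assumption; funcpipe_rag's version may differ) and a traceable equivalent to the one above, then reads the log thunk at three points: before consumption, after a partial read, and after exhaustion. squares is a hypothetical lazy stage.

```python
from dataclasses import dataclass
from itertools import islice
from typing import Callable, Generic, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")
Log = str
LogThunk = Callable[[], tuple[Log, ...]]


@dataclass(frozen=True)
class Writer(Generic[T]):
    value: T        # stand-in for funcpipe_rag.Writer (assumed shape)
    logs: LogThunk


def traceable(stage: Callable[[Iterator[T]], Iterator[U]], max_samples: int = 100,
              show: Callable[[U], Log] = lambda x: repr(x)[:100]
              ) -> Callable[[Iterator[T]], Writer[Iterator[U]]]:
    def wrapped(it: Iterator[T]) -> Writer[Iterator[U]]:
        logs: list[Log] = []
        out = stage(it)

        def gen() -> Iterator[U]:
            for x in out:
                if len(logs) < max_samples:  # bounded sampling
                    logs.append(show(x))
                yield x

        return Writer(gen(), lambda: tuple(logs))
    return wrapped


def squares(it: Iterator[int]) -> Iterator[int]:
    return (n * n for n in it)  # hypothetical lazy stage


w = traceable(squares, max_samples=3)(iter(range(6)))
stream, logs_thunk = w.value, w.logs
before = logs_thunk()                 # nothing consumed yet
first_two = list(islice(stream, 2))   # consume a prefix
mid = logs_thunk()
rest = list(stream)                   # exhaust the stream
after = logs_thunk()
print(before, mid, after)  # () ('0', '1') ('0', '1', '4')
print(first_two + rest)    # [0, 1, 4, 9, 16, 25]
```

The practical rule: exhaust (or deliberately truncate) the stream before calling the thunk, and treat any earlier read as a prefix.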
4. Reference Implementations
4.1 Tracing Intermediates (Tap)
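tap is traceable applied to the identity stage: it samples values as they pass and records them as pure log data, leaving the stream unchanged.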
```python
def tap(max_samples: int = 100, show: Callable[[T], Log] = lambda x: repr(x)[:100]
        ) -> Callable[[Iterator[T]], Writer[Iterator[T], LogThunk]]:
    return traceable(lambda it: it, max_samples, show)  # identity stage: values pass through, samples are logged
```
4.2 Metrics Aggregation (Writer/Fold)
Monoidal metrics (e.g., error counts). Per-record Writer allocations can be expensive; for large streams prefer a fused fold stage for metrics.
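```python
from dataclasses import dataclass
from typing import Callable, TypeVar

# ErrInfo is the project's error payload type; importing it from funcpipe_rag is assumed.
from funcpipe_rag import Err, ErrInfo, Result, Writer

T = TypeVar("T")
U = TypeVar("U")


@dataclass(frozen=True)  # immutable value: metrics are data, not mutable counters
class Metrics:
    count: int = 0
    errs: int = 0

    def __add__(self, other: "Metrics") -> "Metrics":
        # Associative and commutative, with Metrics.zero() as identity
        return Metrics(self.count + other.count, self.errs + other.errs)

    @classmethod
    def zero(cls) -> "Metrics":
        return cls()


def metriced(fn: Callable[[T], Result[U, ErrInfo]]) -> Callable[[T], Writer[Result[U, ErrInfo], Metrics]]:
    def wrapped(x: T) -> Writer[Result[U, ErrInfo], Metrics]:
        res = fn(x)
        # One record seen; one error if the result is an Err (log type here is Metrics, not LogThunk)
        metrics = Metrics(count=1, errs=1 if isinstance(res, Err) else 0)
        return Writer(res, metrics)

    return wrapped
```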
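Because Metrics is a monoid, per-record metrics can be folded with Metrics.zero() as the seed, and the "fused fold" recommended above amounts to computing that fold in one pass without building a Writer per record. A self-contained sketch, assuming minimal Ok/Err stand-ins for the project's Result type and a hypothetical parse_int stage:

```python
from dataclasses import dataclass
from functools import reduce
from typing import Generic, Iterable, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):      # stand-in for the project's Ok
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):     # stand-in for the project's Err
    error: E


@dataclass(frozen=True)
class Metrics:
    count: int = 0
    errs: int = 0

    def __add__(self, other: "Metrics") -> "Metrics":
        return Metrics(self.count + other.count, self.errs + other.errs)

    @classmethod
    def zero(cls) -> "Metrics":
        return cls()


def parse_int(s: str) -> Union[Ok[int], Err[str]]:
    # Hypothetical per-record stage that fails on non-digit input
    return Ok(int(s)) if s.isdigit() else Err(f"not a number: {s!r}")


def record_metrics(res: object) -> Metrics:
    return Metrics(count=1, errs=1 if isinstance(res, Err) else 0)


def fold_metrics(results: Iterable[object]) -> Metrics:
    # Fused fold: a single pass with one running Metrics, no per-record Writer
    return reduce(lambda acc, r: acc + record_metrics(r), results, Metrics.zero())


results = [parse_int(s) for s in ["1", "x", "3", "y", "5"]]
total = fold_metrics(results)
print(total)  # Metrics(count=5, errs=2)

# Monoid laws on sample values
a, b, c = Metrics(1, 0), Metrics(2, 1), Metrics(4, 2)
assert (a + b) + c == a + (b + c)                      # associativity
assert Metrics.zero() + a == a == a + Metrics.zero()   # identity
# Folding two slices separately and combining agrees with a single fold
assert fold_metrics(results[:2]) + fold_metrics(results[2:]) == total
```

The last assertion is what associativity buys in practice: metrics from batches or shards can be combined afterwards without changing the total.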
4.3 Debuggable Compositions (Naming/Probe)
Name intermediates; probe by pairing the final value with a labeled snapshot of each stage's output (pure data, no breakpoints or side effects).
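```python
from typing import Any, Callable, TypeVar

S = TypeVar("S")


# Output type is Any: the stages are heterogeneous, so no single result type can be inferred.
def probe_pipe(names: list[str], *stages: Callable[[Any], Any],
               show: Callable[[Any], Log] = lambda x: repr(x)[:100]
               ) -> Callable[[S], tuple[Any, tuple[Log, ...]]]:
    # Explicit check: assert statements are stripped under `python -O`
    if len(names) != len(stages):
        raise ValueError("names must match stages one-to-one")

    def pipe(inp: S) -> tuple[Any, tuple[Log, ...]]:
        val: Any = inp
        logs: list[Log] = []
        for name, stage in zip(names, stages):
            val = stage(val)
            logs.append(f"{name}: {show(val)}")  # labeled snapshot after each stage
        return val, tuple(logs)

    return pipe
```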
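A usage sketch with plain string methods as stages, chosen only for illustration. The definition is repeated so the block runs on its own, with Log assumed to be str as in section 3.

```python
from typing import Any, Callable, TypeVar

S = TypeVar("S")
Log = str


def probe_pipe(names: list[str], *stages: Callable[[Any], Any],
               show: Callable[[Any], Log] = lambda x: repr(x)[:100]
               ) -> Callable[[S], tuple[Any, tuple[Log, ...]]]:
    if len(names) != len(stages):
        raise ValueError("names must match stages one-to-one")

    def pipe(inp: S) -> tuple[Any, tuple[Log, ...]]:
        val: Any = inp
        logs: list[Log] = []
        for name, stage in zip(names, stages):
            val = stage(val)
            logs.append(f"{name}: {show(val)}")  # snapshot after each named stage
        return val, tuple(logs)

    return pipe


normalize = probe_pipe(["strip", "upper", "split"], str.strip, str.upper, str.split)
result, trace = normalize("  a b ")
print(result)  # ['A', 'B']
for line in trace:
    print(line)
# strip: 'a b'
# upper: 'A B'
# split: ['A', 'B']
```

One caveat: if a stage returns a generator, show(val) reprs the generator object rather than its contents. That keeps the probe lazy, but the snapshot says little; for lazy stages, traceable or tap is the better fit.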
4.4 Error Traces (Tap_err)
Trace errors purely: every Result passes through unchanged, and only Err payloads are logged (up to max_samples).
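```python
from typing import Callable, Iterator

from funcpipe_rag import Err, ErrInfo, Result, Writer  # ErrInfo import path assumed


def tap_err(
    max_samples: int = 100,
    show: Callable[[ErrInfo], Log] = lambda e: repr(e)[:100],
) -> Callable[[Iterator[Result[T, ErrInfo]]], Writer[Iterator[Result[T, ErrInfo]], LogThunk]]:
    def wrapped(it: Iterator[Result[T, ErrInfo]]) -> Writer[Iterator[Result[T, ErrInfo]], LogThunk]:
        logs: list[Log] = []

        def gen() -> Iterator[Result[T, ErrInfo]]:
            for r in it:
                if isinstance(r, Err) and len(logs) < max_samples:
                    logs.append(show(r.error))  # record only error payloads
                yield r  # Ok and Err values pass through unchanged

        return Writer(gen(), lambda: tuple(logs))  # same consumption coupling as traceable

    return wrapped
```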
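A self-contained sketch of tap_err on a mixed stream, with minimal Ok/Err and Writer stand-ins (assumptions; the project's types may differ) and the error payload simplified to str:

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterator, TypeVar, Union

T = TypeVar("T")
Log = str
LogThunk = Callable[[], tuple[Log, ...]]


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err:
    error: str  # simplified error payload


Result = Union[Ok[T], Err]


@dataclass(frozen=True)
class Writer(Generic[T]):
    value: T
    logs: LogThunk


def tap_err(max_samples: int = 100, show: Callable[[str], Log] = lambda e: repr(e)[:100]
            ) -> Callable[[Iterator[Result[T]]], Writer[Iterator[Result[T]]]]:
    def wrapped(it: Iterator[Result[T]]) -> Writer[Iterator[Result[T]]]:
        logs: list[Log] = []

        def gen() -> Iterator[Result[T]]:
            for r in it:
                if isinstance(r, Err) and len(logs) < max_samples:
                    logs.append(show(r.error))  # only Err payloads are logged
                yield r                         # every value passes through

        return Writer(gen(), lambda: tuple(logs))
    return wrapped


stream = iter([Ok(1), Err("bad row 2"), Ok(3), Err("bad row 4"), Err("bad row 5")])
w = tap_err(max_samples=2, show=str)(stream)
values = list(w.value)   # exhaust first
print(len(values))       # 5
print(w.logs())          # ('bad row 2', 'bad row 4')
```

The third error still flows downstream but is not logged, because the bound of two was reached: neutrality and boundedness hold together.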
RAG Integration¶
Add traces to chunking.
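```python
# Before: opaque
def rag_chunk(docs: Iterator[CleanDoc]) -> Iterator[Chunk]:
    return gen_chunks(docs)

# After: traceable
def rag_chunk_traceable(docs: Iterator[CleanDoc]) -> Writer[Iterator[Chunk], LogThunk]:
    return traceable(gen_chunks)(docs)

chunks, logs_thunk = run_writer(rag_chunk_traceable(docs))
chunk_list = list(chunks)  # consume the stream first...
logs = logs_thunk()        # ...then read the trace (an earlier call sees only a prefix)
# Verify: len(logs) == min(len(chunk_list), 100), and chunk_list equals
# list(rag_chunk(...)) on a fresh iterator over the same docs (neutrality).
```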
Composing Traced Stages¶
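Chain Writers across stages with bind: stage2 consumes stage1's lazy output, and the two log thunks are concatenated in stage order.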
```python
def compose_traced(stage1: Callable[[Iterator[S]], Writer[Iterator[T], LogThunk]],
                   stage2: Callable[[Iterator[T]], Writer[Iterator[U], LogThunk]]
                   ) -> Callable[[Iterator[S]], Writer[Iterator[U], LogThunk]]:
    return lambda it: stage1(it).bind(stage2)  # stage2 reads stage1's lazy stream; logs concatenate
```
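To see how bind threads both the stream and the logs, the sketch below composes two traced stages. The Writer, the traced helper, and the stage functions are stand-ins defined here for illustration (assumptions, not the project's API). stage2 receives stage1's lazy output directly, so entries from both stages are recorded only as the final stream is consumed.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterator, TypeVar

A = TypeVar("A")
B = TypeVar("B")
Log = str
LogThunk = Callable[[], tuple[Log, ...]]


@dataclass(frozen=True)
class Writer(Generic[A]):
    value: A
    logs: LogThunk

    def bind(self, f: "Callable[[A], Writer[B]]") -> "Writer[B]":
        nxt = f(self.value)
        first, second = self.logs, nxt.logs
        return Writer(nxt.value, lambda: first() + second())  # thunk-concat


def traced(name: str, fn: Callable[[int], int]) -> Callable[[Iterator[int]], Writer[Iterator[int]]]:
    # Hypothetical traced map stage: logs every output, tagged with the stage name
    def stage(it: Iterator[int]) -> Writer[Iterator[int]]:
        logs: list[Log] = []

        def gen() -> Iterator[int]:
            for x in it:
                y = fn(x)
                logs.append(f"{name}: {y}")
                yield y

        return Writer(gen(), lambda: tuple(logs))
    return stage


def compose_traced(stage1: Callable[[Iterator[int]], Writer[Iterator[int]]],
                   stage2: Callable[[Iterator[int]], Writer[Iterator[int]]]
                   ) -> Callable[[Iterator[int]], Writer[Iterator[int]]]:
    return lambda it: stage1(it).bind(stage2)


pipeline = compose_traced(traced("inc", lambda x: x + 1), traced("dbl", lambda x: x * 2))
w = pipeline(iter([1, 2]))
out = list(w.value)
print(out)       # [4, 6]
print(w.logs())  # ('inc: 2', 'inc: 3', 'dbl: 4', 'dbl: 6')
```

Note that the combined trace is grouped by stage rather than interleaved in execution order, because bind concatenates each stage's whole buffer.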
5. Property-Based Proofs (tests/test_module_10_core3.py)
Hypothesis properties for neutrality, boundedness, and laziness.
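```python
from itertools import islice

from hypothesis import given

# item_list_strategy and gen_process come from the project's test helpers; gen_process
# is assumed to be a lazy 1:1 map (exactly one output per input, no read-ahead).


@given(items=item_list_strategy())
def test_trace_neutral(items):
    traced, logs_thunk = run_writer(traceable(gen_process)(iter(items)))
    traced_list = list(traced)  # exhaust the stream before reading the trace
    logs = logs_thunk()
    untraced = list(gen_process(iter(items)))
    assert traced_list == untraced  # Neutrality
    assert len(logs) <= 100  # Bounded
    assert len(logs) == min(len(traced_list), 100)  # Complete up to bound (one log per output)


@given(items=item_list_strategy())
def test_trace_lazy(items):
    counter = [0]

    def counted_iter():
        for x in items:
            counter[0] += 1
            yield x

    traced, _ = run_writer(traceable(gen_process)(counted_iter()))
    partial = list(islice(traced, 5))
    expected = min(5, len(items))  # generated inputs may be shorter than 5
    assert len(partial) == expected  # prefix available without full evaluation
    assert counter[0] == expected  # no overpull from the source
```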
6. Runtime Preservation Guarantee
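Observability overhead must stay within the performance budget (e.g., tracing adds <5% latency). Because traces are pure data, enabling them adds no effects and leaves outputs unchanged.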
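One way to check the budget is to time the same consumption with and without the trace wrapper. A rough sketch using timeit, with a stand-in sampled generator that mirrors traceable's per-item work (an assumption, not the project's code) and a hypothetical cheap stage. Absolute timings depend on the machine, so the ratio is what to compare against the budget.

```python
import timeit
from typing import Iterator

Log = str
N = 50_000


def stage(it: Iterator[int]) -> Iterator[int]:
    # Hypothetical, deliberately cheap stage: the worst case for relative overhead
    return (x * x for x in it)


def sampled(it: Iterator[int], logs: list[Log], max_samples: int = 100) -> Iterator[int]:
    # The same per-item work traceable adds: a bounded sample, then pass-through
    for x in it:
        if len(logs) < max_samples:
            logs.append(repr(x)[:100])
        yield x


def run_plain() -> int:
    return sum(stage(iter(range(N))))


def run_traced() -> int:
    logs: list[Log] = []
    return sum(sampled(stage(iter(range(N))), logs))


assert run_plain() == run_traced()  # check neutrality before measuring cost
plain = timeit.timeit(run_plain, number=10)
traced = timeit.timeit(run_traced, number=10)
print(f"tracing overhead: {(traced / plain - 1) * 100:.1f}%")
```

With a stage this cheap, the extra generator layer dominates and the percentage is likely to look far worse than for a realistic stage such as chunking, whose per-item cost amortizes the wrapper; measure on the real stages against the real budget.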
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Impure logging | Breaks purity | Use Writer/tap |
| Unbounded traces | Memory blowup | Bound samples |
| Opaque compositions | Hard debug | Name/probe stages |
| Ignore errors | Silent failures | Tap_err explicitly |
8. Pre-Core Quiz
- Tracing via…? → Tap/Writer
- Metrics as…? → Monoids/folds
- Debug with…? → Naming/probing
- Neutrality means…? → Traced == untraced
- Bound traces for…? → Memory safety
9. Post-Core Exercise
- Add trace to RAG stage.
- Aggregate metrics; verify neutrality.
- Debug a composition.
Pipeline Usage (Idiomatic)
Next: core 4. Property-Based Regression & Invariant Testing for Pure Pipelines (Hypothesis in CI)