M04C10: Structured Error Reports from Streaming Pipelines (Errors as First-Class Data)
Module 04 Recap – What You Can Now Do
You have completed the full error-handling stack:
- M04C01–C02: Safe recursion over trees — constant stack, full laziness, machine-checked termination.
- M04C03: Memoization — pure, persistent, observationally invisible caching.
- M04C04: Result/Option — turn every per-record failure into a value; streams never halt.
- M04C05: Streaming combinators — one-pass routing, recovery, parallelism over mixed good/bad data.
- M04C06: Aggregation strategies — fail-fast or accumulate all errors with mathematical guarantees.
- M04C07: Circuit-breakers — abort the moment a run becomes hopeless.
- M04C08: Resource safety — zero leaks, even on early termination or exceptions.
- M04C09: Pure retries — bounded, fair, composable, with full provenance.
- M04C10: Structured reports — turn every error into beautiful, serialisable, grouped diagnostics.
You can now process real-world messy data at scale with:
- Zero recursion blowups
- Zero cache misses on duplicates
- Zero lost records on failures
- Zero resource leaks
- Zero wasted work on doomed runs
- Perfect structured error reports
This is production-grade functional data processing in Python — the kind that ships to millions of documents and survives anything the real world throws at it.
Core question:
How do you turn every error in a streaming pipeline into structured, serialisable, grouped reports — complete with counts, ordered samples, and retry metadata — while keeping the pipeline pure and memory-bounded?
We now take the fully-featured Iterator[Result[Chunk, ErrInfo | BreakInfo[ErrInfo]]] stream from M04C05–C09 and answer the final question:
“I have captured every possible failure with full provenance. Now how do I turn that into actionable diagnostics without ad-hoc logging or unbounded memory?”
The naïve solution is scattered logging:
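For example, a minimal sketch of that pattern, using the standard `logging` module and a stand-in `Err` class (all names here are illustrative, not the library's types):

```python
# Anti-pattern sketch: log each failure inline as the stream is consumed.
import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("pipeline")

@dataclass
class Err:  # stand-in for the library's Err case
    error: str

results = [1, Err("INVALID_CONTENT @ 2.4"), 2, Err("INVALID_CONTENT @ 2.7"), 3]

ok_values = []
for r in results:
    if isinstance(r, Err):
        logger.warning("embed failed: %s", r.error)  # one log line per error, forever
    else:
        ok_values.append(r)
```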
This loses grouping, ordering, counts, and retry context — and blows up logs on high error rates.
The production solution folds the Result stream into an immutable ErrReport — pure data with groups by code/stage/path, bounded samples, and aggregated retry statistics.
Audience: Engineers who need complete, structured failure diagnostics from batch RAG runs.
Outcome:
1. You will aggregate every error into a rich, bounded, serialisable report with one fold.
2. You will group by code/stage/path_prefix and extract retry metadata automatically.
3. You will ship perfect JSON error reports that survive 1 % or 50 % failure rates.
We formalise exactly what we want from correct, production-ready error reports: completeness, bounded memory, ordering, and purity.
Concrete Motivating Example
Same 100 000 chunk tree from previous cores:
- 95 000 embed successfully.
- 4 800 transient failures → retried and succeeded.
- 200 genuine failures (invalid content).
Desired final report (JSON):
```json
{
  "total_errs": 200,
  "total_items": 100000,
  "error_rate": 0.002,
  "avg_attempts": 2.41,
  "by_code": {
    "INVALID_CONTENT": {"count": 200, "samples": [...]}
  },
  "by_stage": {
    "embed": {"count": 200, "samples": [...]}
  },
  "by_path_prefix": {
    "2.4": {"count": 180, "samples": [...]}
  }
}
```
One fold, bounded memory, perfect provenance.
1. Laws & Invariants (machine-checked)

| Law | Formal Statement | Enforcement |
|---|---|---|
| Completeness | Every Err in a finite stream contributes exactly once to each relevant group's count; samples are a prefix of encounter order up to max_samples per group. | test_report_completeness, test_report_sample_ordering |
| Purity | Same input stream → identical report (deterministic, no side effects). | Reproducibility test |
| Bounded Memory | Memory ≤ O(#groups × max_samples). | test_report_bounded_memory |
| Ordering | Samples within each group appear in original encounter order. | test_report_sample_ordering |
These laws guarantee your reports are complete, reproducible, and memory-safe.
2. Decision Table – Which Report Do You Actually Need?
| Need | Bounded Memory? | Need Samples? | Recommended Fold |
|---|---|---|---|
| Just counts by code | Yes | No | fold_error_counts |
| Full grouped report with samples | Yes | Yes | fold_error_report(max_samples=20) |
| Everything (including successes) | No | Yes | collect_both (Core 6) |
Cap samples aggressively in production — 10–50 per group is plenty for debugging.
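The decision table names `fold_error_counts`, whose implementation is not reproduced in this core. A plausible counts-only sketch of the idea (not the library's actual code), with minimal `Ok`/`Err` stand-ins:

```python
# Sketch of a counts-only fold: O(#codes) memory, no samples retained.
from collections import Counter
from typing import Any, Iterable

class Ok:  # minimal Result stand-ins for illustration
    def __init__(self, value: Any): self.value = value

class Err:
    def __init__(self, error: Any): self.error = error

def fold_error_counts_sketch(stream: Iterable[Any]) -> Counter:
    counts: Counter = Counter()
    for r in stream:
        if isinstance(r, Err):
            # Same getattr fallback the report fold uses for unfamiliar errors.
            counts[getattr(r.error, "code", "UNKNOWN")] += 1
    return counts

class E:
    def __init__(self, code: str): self.code = code

stream = [Ok(1), Err(E("INVALID_CONTENT")), Ok(2),
          Err(E("INVALID_CONTENT")), Err(E("TIMEOUT"))]
counts = fold_error_counts_sketch(stream)
```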
3. Public API Surface (end-of-Module-04 refactor note)
Refactor note: error reports live in funcpipe_rag.policies.reports (src/funcpipe_rag/policies/reports.py) and are re-exported from funcpipe_rag.api.core.

```python
from funcpipe_rag.api.core import (
    ErrGroup, ErrReport, fold_error_counts, fold_error_report, report_to_jsonable,
)
```
4. Reference Implementations
4.1 Error Normalisation (for grouping)

```python
from typing import Any

# BreakInfo comes from the circuit-breaker core (M04C07).

def _normalize_err(e: Any) -> tuple[str, str, tuple[int, ...]]:
    code = getattr(e, "code", "UNKNOWN")
    stage = getattr(e, "stage", "UNKNOWN")
    path = getattr(e, "path", ())
    # Treat breaker errors as a distinct code/stage for reporting.
    if isinstance(e, BreakInfo):
        code = e.code if e.code.startswith("BREAK/") else f"BREAK/{code}"
        stage = "BREAK"
    return code, stage, path
```
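The `getattr` fallbacks mean even a bare string error still groups somewhere instead of crashing the fold. A quick self-contained check of that behaviour (stub types are hypothetical; the `BreakInfo` branch is omitted):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ErrInfoStub:  # hypothetical minimal ErrInfo shape
    code: str
    stage: str
    path: tuple

def normalize(e):
    # Same getattr-based fallbacks as _normalize_err, minus the BreakInfo branch.
    return (getattr(e, "code", "UNKNOWN"),
            getattr(e, "stage", "UNKNOWN"),
            getattr(e, "path", ()))

well_formed = normalize(ErrInfoStub("INVALID_CONTENT", "embed", (2, 4)))
bare = normalize("boom")  # grouped under UNKNOWN rather than raising
```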
4.2 Group Builder (internal)

```python
from dataclasses import dataclass, field
from typing import Generic, TypeVar

E = TypeVar("E")  # error type, shared with ErrGroup/ErrReport

@dataclass(slots=True)
class _GroupBuilder(Generic[E]):
    count: int = 0
    samples: list[E] = field(default_factory=list)
    cap: int = 10

    def add(self, e: E) -> None:
        self.count += 1
        if len(self.samples) < self.cap:
            self.samples.append(e)

    def freeze(self) -> ErrGroup[E]:
        return ErrGroup(self.count, tuple(self.samples))
```
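To see the cap in action, here is a self-contained restatement of the builder (simplified, non-generic stand-ins for the real types) fed four errors with a cap of two:

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class ErrGroup:  # simplified stand-in for the real generic ErrGroup
    count: int
    samples: tuple

@dataclass
class GroupBuilder:
    cap: int = 10
    count: int = 0
    samples: list = field(default_factory=list)

    def add(self, e) -> None:
        self.count += 1                    # every error is counted...
        if len(self.samples) < self.cap:
            self.samples.append(e)         # ...but only the first `cap` are kept

    def freeze(self) -> ErrGroup:
        return ErrGroup(self.count, tuple(self.samples))

gb = GroupBuilder(cap=2)
for e in ["e1", "e2", "e3", "e4"]:
    gb.add(e)
group = gb.freeze()
```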
4.3 Full Structured Report

```python
from collections.abc import Iterable, Mapping
from types import MappingProxyType
from typing import Any

def fold_error_report(
    stream: Iterable[Result[Any, E]],
    *,
    max_samples: int = 10,
    path_depth: int = 3,
) -> ErrReport[E]:
    total_errs = total_items = 0
    by_code: dict[str, _GroupBuilder[E]] = {}
    by_stage: dict[str, _GroupBuilder[E]] = {}
    by_path: dict[tuple[int, ...], _GroupBuilder[E]] = {}
    sum_attempts = sum_delay = 0.0
    cnt_attempts = cnt_delay = 0
    for r in stream:
        total_items += 1
        if isinstance(r, Err):
            total_errs += 1
            e = r.error
            code, stage, path = _normalize_err(e)
            prefix = path[:path_depth]
            # `cap` is the third field of _GroupBuilder, so pass it by keyword.
            by_code.setdefault(code, _GroupBuilder(cap=max_samples)).add(e)
            by_stage.setdefault(stage, _GroupBuilder(cap=max_samples)).add(e)
            by_path.setdefault(prefix, _GroupBuilder(cap=max_samples)).add(e)
            ctx = getattr(e, "ctx", None)
            if isinstance(ctx, Mapping):
                a = ctx.get("attempt")
                d = ctx.get("next_delay_ms")
                if isinstance(a, (int, float)):
                    sum_attempts += a
                    cnt_attempts += 1
                if isinstance(d, (int, float)):
                    sum_delay += d
                    cnt_delay += 1
    return ErrReport(
        total_errs=total_errs,
        total_items=total_items,
        by_code=MappingProxyType({k: v.freeze() for k, v in by_code.items()}),
        by_stage=MappingProxyType({k: v.freeze() for k, v in by_stage.items()}),
        by_path_prefix=MappingProxyType({k: v.freeze() for k, v in by_path.items()}),
        ctx_summary=MappingProxyType({
            "avg_attempts": sum_attempts / cnt_attempts if cnt_attempts else 0.0,
            "avg_next_delay_ms": sum_delay / cnt_delay if cnt_delay else 0.0,
            "error_rate": total_errs / total_items if total_items else 0.0,
        }),
    )
```
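To make the ctx aggregation concrete, here is a tiny self-contained emulation of the fold's counting and averaging on three errors, two of which carry retry metadata (all names are stand-ins, not the library's types):

```python
from types import MappingProxyType

class Err:  # minimal stand-ins for illustration
    def __init__(self, error): self.error = error

class E:
    def __init__(self, code, ctx=None):
        self.code = code
        self.ctx = ctx or {}

stream = [
    "ok-1",
    Err(E("TIMEOUT", {"attempt": 3})),
    "ok-2",
    Err(E("TIMEOUT", {"attempt": 2})),
    Err(E("INVALID_CONTENT")),  # no retry metadata: excluded from the average
]

total_items = total_errs = 0
sum_attempts = cnt_attempts = 0
by_code: dict = {}
for r in stream:
    total_items += 1
    if isinstance(r, Err):
        total_errs += 1
        by_code[r.error.code] = by_code.get(r.error.code, 0) + 1
        a = r.error.ctx.get("attempt")
        if isinstance(a, (int, float)):
            sum_attempts += a
            cnt_attempts += 1

summary = MappingProxyType({
    "avg_attempts": sum_attempts / cnt_attempts if cnt_attempts else 0.0,
    "error_rate": total_errs / total_items,
})
```

Note that `avg_attempts` averages only over errors that carry the `attempt` key, mirroring the reference fold.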
4.4 JSON Serialisation

```python
from dataclasses import asdict, is_dataclass
from typing import Any

def report_to_jsonable(report: ErrReport[E]) -> dict[str, Any]:
    def group_to_dict(g: ErrGroup[E]) -> dict[str, Any]:
        return {
            "count": g.count,
            "samples": [asdict(s) if is_dataclass(s) else {"value": s} for s in g.samples],
        }
    return {
        "total_errs": report.total_errs,
        "total_items": report.total_items,
        "error_rate": report.ctx_summary["error_rate"],
        "avg_attempts": report.ctx_summary["avg_attempts"],
        "avg_next_delay_ms": report.ctx_summary["avg_next_delay_ms"],
        "by_code": {k: group_to_dict(v) for k, v in report.by_code.items()},
        "by_stage": {k: group_to_dict(v) for k, v in report.by_stage.items()},
        "by_path_prefix": {".".join(map(str, k)): group_to_dict(v)
                           for k, v in report.by_path_prefix.items()},
    }
```
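The `asdict`/`is_dataclass` branch is what lets samples of any shape serialise safely. A small self-contained check of that pattern (`ErrInfoStub` is a hypothetical dataclass-shaped sample):

```python
import json
from dataclasses import dataclass, asdict, is_dataclass

@dataclass(frozen=True)
class ErrInfoStub:  # hypothetical dataclass-shaped error sample
    code: str
    msg: str

samples = [ErrInfoStub("INVALID_CONTENT", "bad utf-8"), "raw-string-error"]
# Dataclasses become field dicts; anything else is wrapped as {"value": ...}.
jsonable = [asdict(s) if is_dataclass(s) else {"value": s} for s in samples]
payload = json.dumps(jsonable)  # both sample shapes survive serialisation
```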
4.5 Idiomatic RAG Usage

```python
import json
from itertools import tee

# The stream is single-pass; split with tee for reporting + indexing.
# Caveat: fold_error_report drains its branch first, so tee buffers the
# whole stream until index_stream catches up — fine for moderate runs,
# but materialise to a list (or fold in a single pass) for huge ones.
report_stream, index_stream = tee(embedded)
report = fold_error_report(report_stream, max_samples=20)

if report.total_errs > 0:
    logger.error("RAG embedding failures:\n%s",
                 json.dumps(report_to_jsonable(report), indent=2))
    send_to_monitoring(report_to_jsonable(report))
else:
    logger.info("Embedding succeeded for all %d chunks", report.total_items)

index_chunks(filter_ok(index_stream))
```
5. Property-Based Proofs (tests/test_reports.py)

```python
from hypothesis import given, strategies as st

# Result, Ok, Err, ErrInfo, make_errinfo, map_result_iter come from earlier cores.

@given(items=st.lists(st.integers()))
def test_report_completeness(items):
    def f(x: int) -> Result[int, ErrInfo]:
        return Err(make_errinfo(f"C{x}", f"msg{x}", "stage", (x,))) if x % 2 else Ok(x)

    report = fold_error_report(map_result_iter(f, items))
    assert report.total_errs == sum(1 for x in items if x % 2)
    assert report.total_items == len(items)

@given(items=st.lists(st.integers(), unique=True))
def test_report_sample_ordering(items):
    def f(x: int) -> Result[int, ErrInfo]:
        # Mark odd values as errors.
        return Err(make_errinfo("ERR", f"msg{x}", "s", (x,))) if x % 2 != 0 else Ok(x)

    report = fold_error_report(map_result_iter(f, items))
    group = report.by_code.get("ERR")
    # Vacuous truth: if there are no ERR samples, ordering is trivially satisfied.
    if group is None or not group.samples:
        return
    sample_xs = [int(s.msg[3:]) for s in group.samples]  # "msg{X}" -> X
    positions = [
        next(i for i, x in enumerate(items) if x == sx and x % 2 != 0)
        for sx in sample_xs
    ]
    # Encounter order: positions must be strictly increasing.
    assert positions == sorted(positions)

@given(items=st.lists(st.integers()))
def test_report_bounded_memory(items):
    report = fold_error_report(map_result_iter(lambda x: Err("E"), items), max_samples=10)
    assert all(len(g.samples) <= 10 for g in report.by_code.values())
```
6. Big-O & Allocation Guarantees
| Variant | Time | Space | Notes |
|---|---|---|---|
| fold_error_report | O(N) | O(#groups × max_samples) | Bounded by max_samples |
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Ad-hoc logging | Scattered diagnostics | Use fold_error_report |
| Unbounded sample collection | Memory blowup | Cap with max_samples |
| No grouping | Hard analysis | Group by code/stage/path_prefix |
| Ignoring retry metadata | Lost insights | Aggregate from ErrInfo.ctx |
8. Pre-Core Quiz
- fold_error_report for…? → Rich grouped diagnostics
- max_samples for…? → Bound memory per group
- Report includes…? → Counts, samples, ctx_summary
- Law for reports? → Completeness (every Err counted)
- Use after…? → Breakers/retries for final outcomes
9. Post-Core Exercise
- Apply fold_error_report to a real embedding run → inspect JSON.
- Add custom grouping (e.g. by ctx["attempt"]) → test.
- Replace all ad-hoc error logging with structured reports.
- Send report to monitoring on non-zero errors.
You have completed Module 04.
On to Module 05: Advanced Type-Driven Design.