

M04C10: Structured Error Reports from Streaming Pipelines (Errors as First-Class Data)

Module 04 Recap – What You Can Now Do

You have completed the full error-handling stack:

  • M04C01–C02: Safe recursion over trees — constant stack, full laziness, machine-checked termination.
  • M04C03: Memoization — pure, persistent, observationally invisible caching.
  • M04C04: Result/Option — turn every per-record failure into a value; streams never halt.
  • M04C05: Streaming combinators — one-pass routing, recovery, parallelism over mixed good/bad data.
  • M04C06: Aggregation strategies — fail-fast or accumulate all errors with mathematical guarantees.
  • M04C07: Circuit-breakers — abort the moment a run becomes hopeless.
  • M04C08: Resource safety — zero leaks, even on early termination or exceptions.
  • M04C09: Pure retries — bounded, fair, composable, with full provenance.
  • M04C10: Structured reports — turn every error into beautiful, serialisable, grouped diagnostics.

You can now process real-world messy data at scale with:

  • Zero recursion blowups
  • Zero cache misses on duplicates
  • Zero lost records on failures
  • Zero resource leaks
  • Zero wasted work on doomed runs
  • Perfect structured error reports

This is production-grade functional data processing in Python — the kind that ships to millions of documents and survives anything the real world throws at it.


Core question:
How do you turn every error in a streaming pipeline into structured, serialisable, grouped reports — complete with counts, ordered samples, and retry metadata — while keeping the pipeline pure and memory-bounded?

We now take the fully-featured Iterator[Result[Chunk, ErrInfo | BreakInfo[ErrInfo]]] stream from M04C05–C09 and answer the final question:

“I have captured every possible failure with full provenance. Now how do I turn that into actionable diagnostics without ad-hoc logging or unbounded memory?”

The naïve solution is scattered logging:

for r in embedded:
    if isinstance(r, Err):
        logger.error("Embedding failed: %s", r.error)

This loses grouping, ordering, counts, and retry context — and blows up logs on high error rates.

The production solution folds the Result stream into an immutable ErrReport — pure data with groups by code/stage/path, bounded samples, and aggregated retry statistics.

Audience: Engineers who need complete, structured failure diagnostics from batch RAG runs.

Outcome:
1. You will aggregate every error into a rich, bounded, serialisable report with one fold.
2. You will group by code/stage/path_prefix and extract retry metadata automatically.
3. You will ship perfect JSON error reports that survive 1 % or 50 % failure rates.

We formalise exactly what we want from correct, production-ready error reports: completeness, bounded memory, ordering, and purity.


Concrete Motivating Example

Same 100 000 chunk tree from previous cores:

  • 95 000 embed successfully.
  • 4 800 transient failures → retried and succeeded.
  • 200 genuine failures (invalid content).

Desired final report (JSON):

{
  "total_errs": 200,
  "total_items": 100000,
  "error_rate": 0.002,
  "avg_attempts": 2.41,
  "by_code": {
    "INVALID_CONTENT": {"count": 200, "samples": [...]}
  },
  "by_stage": {
    "embed": {"count": 200, "samples": [...]}
  },
  "by_path_prefix": {
    "2.4": {"count": 180, "samples": [...]}
  }
}

One fold, bounded memory, perfect provenance.


1. Laws & Invariants (machine-checked)

  • Completeness: every Err in a finite stream contributes exactly once to each relevant group’s count, and samples are a prefix of encounter order up to max_samples per group. Enforced by test_report_completeness and test_report_sample_ordering.
  • Purity: the same input stream yields an identical report (deterministic, no side effects). Enforced by a reproducibility test.
  • Bounded Memory: memory ≤ O(#groups × max_samples). Enforced by test_report_bounded_memory.
  • Ordering: samples within each group appear in original encounter order. Enforced by test_report_sample_ordering.

These laws guarantee your reports are complete, reproducible, and memory-safe.


2. Decision Table – Which Report Do You Actually Need?

  • Just counts by code (bounded memory, no samples needed) → fold_error_counts
  • Full grouped report with samples (bounded memory) → fold_error_report(max_samples=20)
  • Everything, including successes (unbounded memory) → collect_both (Core 6)

Cap samples aggressively in production — 10–50 per group is plenty for debugging.
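The counts-only fold in the table above is not among the reference implementations in §4. A minimal sketch of what fold_error_counts might look like, with stand-in Ok/Err/ErrInfo types (the real ones come from Core 4):

```python
from dataclasses import dataclass
from typing import Any, Iterable

# Stand-in Result types; the real Ok/Err come from M04C04.
@dataclass(frozen=True)
class Ok:
    value: Any

@dataclass(frozen=True)
class Err:
    error: Any

@dataclass(frozen=True)
class ErrInfo:
    code: str

def fold_error_counts(stream: Iterable[Any]) -> dict[str, int]:
    """One bounded-memory pass: count errors per code, keeping no samples."""
    counts: dict[str, int] = {}
    for r in stream:
        if isinstance(r, Err):
            code = getattr(r.error, "code", "UNKNOWN")
            counts[code] = counts.get(code, 0) + 1
    return counts

print(fold_error_counts([Ok(1), Err(ErrInfo("E1")), Err(ErrInfo("E1")), Err(ErrInfo("E2"))]))
# {'E1': 2, 'E2': 1}
```

Memory here is O(#distinct codes), independent of stream length and of max_samples.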


3. Public API Surface (end-of-Module-04 refactor note)

Refactor note: error reports live in funcpipe_rag.policies.reports (src/funcpipe_rag/policies/reports.py) and are re-exported from funcpipe_rag.api.core.

from funcpipe_rag.api.core import ErrGroup, ErrReport, fold_error_counts, fold_error_report, report_to_jsonable
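The import above names ErrGroup and ErrReport without showing them. A plausible sketch of their shapes, inferred from the fold and serialiser below — the real definitions live in funcpipe_rag.policies.reports:

```python
from dataclasses import dataclass
from typing import Generic, Mapping, TypeVar

E = TypeVar("E")

@dataclass(frozen=True)
class ErrGroup(Generic[E]):
    count: int
    samples: tuple[E, ...]  # first-encountered errors, capped at max_samples

@dataclass(frozen=True)
class ErrReport(Generic[E]):
    total_errs: int
    total_items: int
    by_code: Mapping[str, ErrGroup[E]]
    by_stage: Mapping[str, ErrGroup[E]]
    by_path_prefix: Mapping[tuple[int, ...], ErrGroup[E]]
    ctx_summary: Mapping[str, float]

g = ErrGroup(count=2, samples=("e1", "e2"))
print(g.count, g.samples)  # 2 ('e1', 'e2')
```

Frozen dataclasses with immutable field types keep the report pure data: safe to share, hash-friendly, and trivially serialisable.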

4. Reference Implementations

4.1 Error Normalisation (for grouping)

def _normalize_err(e: Any) -> tuple[str, str, tuple[int, ...]]:
    code = getattr(e, "code", "UNKNOWN")
    stage = getattr(e, "stage", "UNKNOWN")
    path = getattr(e, "path", ())
    # Treat breaker errors as a distinct code/stage for reporting
    if isinstance(e, BreakInfo):
        code = code if code.startswith("BREAK/") else f"BREAK/{code}"
        stage = "BREAK"
    return code, stage, path

4.2 Group Builder (internal)

@dataclass(slots=True)
class _GroupBuilder(Generic[E]):
    count: int = 0
    samples: list[E] = field(default_factory=list)
    cap: int = 10

    def add(self, e: E) -> None:
        self.count += 1
        if len(self.samples) < self.cap:
            self.samples.append(e)

    def freeze(self) -> ErrGroup[E]:
        return ErrGroup(self.count, tuple(self.samples))

4.3 Full Structured Report

def fold_error_report(
    stream: Iterable[Result[Any, E]],
    *,
    max_samples: int = 10,
    path_depth: int = 3,
) -> ErrReport[E]:
    total_errs = total_items = 0
    by_code: dict[str, _GroupBuilder[E]] = {}
    by_stage: dict[str, _GroupBuilder[E]] = {}
    by_path: dict[tuple[int, ...], _GroupBuilder[E]] = {}
    sum_attempts = sum_delay = 0.0
    cnt_attempts = cnt_delay = 0

    for r in stream:
        total_items += 1
        if isinstance(r, Err):
            total_errs += 1
            e = r.error
            code, stage, path = _normalize_err(e)
            prefix = path[:path_depth]

            # cap= must be a keyword: passed positionally, max_samples would land in count
            by_code.setdefault(code, _GroupBuilder(cap=max_samples)).add(e)
            by_stage.setdefault(stage, _GroupBuilder(cap=max_samples)).add(e)
            by_path.setdefault(prefix, _GroupBuilder(cap=max_samples)).add(e)

            ctx = getattr(e, "ctx", None)
            if isinstance(ctx, Mapping):
                a = ctx.get("attempt")
                d = ctx.get("next_delay_ms")
                if isinstance(a, (int, float)):
                    sum_attempts += a
                    cnt_attempts += 1
                if isinstance(d, (int, float)):
                    sum_delay += d
                    cnt_delay += 1

    return ErrReport(
        total_errs=total_errs,
        total_items=total_items,
        by_code=MappingProxyType({k: v.freeze() for k, v in by_code.items()}),
        by_stage=MappingProxyType({k: v.freeze() for k, v in by_stage.items()}),
        by_path_prefix=MappingProxyType({k: v.freeze() for k, v in by_path.items()}),
        ctx_summary=MappingProxyType({
            "avg_attempts": sum_attempts / cnt_attempts if cnt_attempts else 0.0,
            "avg_next_delay_ms": sum_delay / cnt_delay if cnt_delay else 0.0,
            "error_rate": total_errs / total_items if total_items else 0.0,
        }),
    )
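A worked check of the ctx_summary arithmetic above: the averages run over only those errors that carry the relevant key, not over all errors, so a missing "attempt" does not drag the mean down.

```python
# Hypothetical ctx mappings as attached by the retry layer (Core 9).
ctxs = [{"attempt": 3}, {"attempt": 2}, {}, {"attempt": 4, "next_delay_ms": 250.0}]

# Same accumulation shape as in fold_error_report.
sum_attempts = cnt_attempts = 0
sum_delay = cnt_delay = 0
for ctx in ctxs:
    a = ctx.get("attempt")
    d = ctx.get("next_delay_ms")
    if isinstance(a, (int, float)):
        sum_attempts += a
        cnt_attempts += 1
    if isinstance(d, (int, float)):
        sum_delay += d
        cnt_delay += 1

print(sum_attempts / cnt_attempts if cnt_attempts else 0.0)  # 3.0  (= (3+2+4)/3)
print(sum_delay / cnt_delay if cnt_delay else 0.0)           # 250.0 (one sample)
```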

4.4 JSON Serialisation

def report_to_jsonable(report: ErrReport[E]) -> dict[str, Any]:
    def group_to_dict(g: ErrGroup[E]) -> dict[str, Any]:
        return {
            "count": g.count,
            "samples": [asdict(s) if is_dataclass(s) else {"value": s} for s in g.samples],
        }

    return {
        "total_errs": report.total_errs,
        "total_items": report.total_items,
        "error_rate": report.ctx_summary["error_rate"],
        "avg_attempts": report.ctx_summary["avg_attempts"],
        "avg_next_delay_ms": report.ctx_summary["avg_next_delay_ms"],
        "by_code": {k: group_to_dict(v) for k, v in report.by_code.items()},
        "by_stage": {k: group_to_dict(v) for k, v in report.by_stage.items()},
        "by_path_prefix": {".".join(map(str, k)): group_to_dict(v) for k, v in report.by_path_prefix.items()},
    }
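A small round-trip check of the serialiser's two conventions: tuple path prefixes become dotted string keys (JSON objects cannot key on tuples), and dataclass samples flatten via asdict. Stand-in ErrInfo:

```python
import json
from dataclasses import asdict, dataclass

# Stand-in for the real ErrInfo from Core 4.
@dataclass(frozen=True)
class ErrInfo:
    code: str
    msg: str

prefix = (2, 4)
key = ".".join(map(str, prefix))  # tuple key -> "2.4", as in the target JSON
sample = ErrInfo("INVALID_CONTENT", "bad chunk")
payload = {key: {"count": 1, "samples": [asdict(sample)]}}
print(json.dumps(payload))
# {"2.4": {"count": 1, "samples": [{"code": "INVALID_CONTENT", "msg": "bad chunk"}]}}
```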

4.5 Idiomatic RAG Usage

# The stream is single-pass; split with tee for reporting + indexing
from itertools import tee

report_stream, index_stream = tee(embedded)

report = fold_error_report(report_stream, max_samples=20)

if report.total_errs > 0:
    logger.error("RAG embedding failures:\n%s",
                 json.dumps(report_to_jsonable(report), indent=2))
    send_to_monitoring(report_to_jsonable(report))
else:
    logger.info("Embedding succeeded for all %d chunks", report.total_items)

index_chunks(filter_ok(index_stream))
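One caveat: because fold_error_report drains report_stream before index_stream is touched, tee must buffer the entire stream, which defeats bounded memory on large runs. When that matters, a single pass that routes successes onward while recording errors avoids the buffer. A sketch with stand-in Ok/Err types (the real ones come from Core 4):

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator

# Stand-in Result types; the real Ok/Err come from M04C04.
@dataclass(frozen=True)
class Ok:
    value: Any

@dataclass(frozen=True)
class Err:
    error: Any

def route_ok(stream: Iterable[Any], errs: list) -> Iterator[Any]:
    """Yield Ok payloads lazily; collect Err payloads into errs as they pass by."""
    for r in stream:
        if isinstance(r, Err):
            errs.append(r.error)
        else:
            yield r.value

errs: list = []
indexed = list(route_ok([Ok(1), Err("bad"), Ok(2)], errs))
print(indexed, errs)  # [1, 2] ['bad']
```

This trades a little purity (the errs accumulator is mutated during iteration) for genuinely bounded memory; fold the collected errors into a report afterwards.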

5. Property-Based Proofs (tests/test_reports.py)

@given(items=st.lists(st.integers()))
def test_report_completeness(items):
    def f(x: int) -> Result[int, ErrInfo]:
        return Err(make_errinfo(f"C{x}", f"msg{x}", "stage", (x,))) if x % 2 else Ok(x)
    report = fold_error_report(map_result_iter(f, items))
    assert report.total_errs == sum(1 for x in items if x % 2)
    assert report.total_items == len(items)

@given(items=st.lists(st.integers(), unique=True))
def test_report_sample_ordering(items):
    def f(x: int) -> Result[int, ErrInfo]:
        # mark odd values as errors
        return Err(make_errinfo("ERR", f"msg{x}", "s", (x,))) if x % 2 != 0 else Ok(x)

    report = fold_error_report(map_result_iter(f, items))
    group = report.by_code.get("ERR")
    # Vacuous truth: if there are no ERR samples, ordering is trivially satisfied
    if group is None or not group.samples:
        return

    samples = group.samples
    sample_xs = [int(s.msg[3:]) for s in samples]  # "msg{X}" -> X
    positions = [
        next(i for i, x in enumerate(items) if x == sx and x % 2 != 0)
        for sx in sample_xs
    ]
    # Encounter order: positions must be strictly increasing
    assert positions == sorted(positions)

@given(items=st.lists(st.integers()))
def test_report_bounded_memory(items):
    report = fold_error_report(map_result_iter(lambda x: Err("E"), items), max_samples=10)
    assert all(len(g.samples) <= 10 for g in report.by_code.values())

6. Big-O & Allocation Guarantees

  • fold_error_report: O(N) time, O(#groups × max_samples) space; sample lists are capped at max_samples.

7. Anti-Patterns & Immediate Fixes

  • Ad-hoc logging → scattered diagnostics. Fix: use fold_error_report.
  • Unbounded sample collection → memory blowup. Fix: cap with max_samples.
  • No grouping → hard-to-analyse failures. Fix: group by code/stage/path_prefix.
  • Ignoring retry metadata → lost insights. Fix: aggregate from ErrInfo.ctx.

8. Pre-Core Quiz

  1. fold_error_report for…? → Rich grouped diagnostics
  2. max_samples for…? → Bound memory per group
  3. Report includes…? → Counts, samples, ctx_summary
  4. Law for reports? → Completeness (every Err counted)
  5. Use after…? → Breakers/retries for final outcomes

9. Post-Core Exercise

  1. Apply fold_error_report to a real embedding run → inspect JSON.
  2. Add custom grouping (e.g. by ctx["attempt"]) → test.
  3. Replace all ad-hoc error logging with structured reports.
  4. Send report to monitoring on non-zero errors.
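For exercise 2, one possible starting point, assuming errors carry a ctx mapping with an integer "attempt" key as in Core 9 (stand-in Err type, illustrative only):

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Iterable

# Stand-in for the real Err from M04C04.
@dataclass(frozen=True)
class Err:
    error: Any

def fold_by_attempt(stream: Iterable[Any]) -> dict[int, int]:
    """Count errors grouped by how many attempts they consumed."""
    by_attempt: dict[int, int] = {}
    for r in stream:
        if isinstance(r, Err):
            ctx = getattr(r.error, "ctx", None)
            a = ctx.get("attempt") if isinstance(ctx, dict) else None
            if isinstance(a, int):
                by_attempt[a] = by_attempt.get(a, 0) + 1
    return by_attempt

errs = [Err(SimpleNamespace(ctx={"attempt": 3})),
        Err(SimpleNamespace(ctx={"attempt": 3})),
        Err(SimpleNamespace(ctx={}))]
print(fold_by_attempt(errs))  # {3: 2}
```

The same single-pass, bounded-memory shape as fold_error_report; only the grouping key changes.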

You have completed Module 04.


On to Module 05: Advanced Type-Driven Design.