M04C06: Error Aggregation Strategies – Fail-Fast vs Error-List Accumulation for Streams
Progression Note
By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |
Core question:
How do you aggregate errors from a Result stream, choosing between fail-fast (stop on the first error for quick feedback) and full accumulation (collect every error for complete diagnostics), while preserving laziness where possible and keeping the pipeline pure and composable?
We now take the Iterator[Result[Chunk, ErrInfo]] stream from M04C05 and face the final real-world question:
“After safely wrapping every possible per-chunk failure, what do we actually do with the mixed good/bad stream?”
Two equally valid answers exist:
- Fail-fast: stop on the first error. Ideal for interactive debugging, or when one failure invalidates everything downstream.
- Accumulate all errors: run to completion and collect every failure. Essential for batch jobs where you must know exactly which record failed and why.
The naïve solution is a manual loop with an ad-hoc error cap:
```python
errors = []
for r in embedded:
    if isinstance(r, Err):
        errors.append(r.error)
        if len(errors) >= 100:  # arbitrary cap
            break
    else:
        index_chunk(r.value)
```
This is verbose, easy to get wrong, and duplicates logic across codebases.
The production solution uses tiny, composable folds over Result streams that give you both strategies with mathematical guarantees.
Audience: Engineers who run batch RAG pipelines over millions of chunks and need either quick failure feedback or complete error reports — without ever writing another manual error-collecting loop.
Outcome:
1. You will choose fail-fast or accumulation per pipeline and implement it in 3–5 lines using pure folds.
2. You will cap error collection, aggregate monoidally, and prove short-circuiting works.
3. You will ship a RAG pipeline that either fails instantly on the first bad chunk or delivers a perfect error report covering every failure.
We formalise exactly what we want from correct, production-ready error aggregation: short-circuiting, full collection, bounded memory, ordering, and equivalence to reference implementations.
Concrete Motivating Example
The same 100 000-chunk tree from previous cores:
- 99 000 embed successfully.
- 800 Unicode errors.
- 200 OOM errors.
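How much work can fail-fast save on this tree? That depends on where the first bad chunk sits. Under a simplifying assumption made here purely for illustration, that the K = 800 + 200 = 1 000 failures are scattered uniformly at random among the N = 100 000 chunks, the first error position k is the minimum of K distinct uniform positions, whose expectation is:

```latex
\mathbb{E}[k] = \frac{N+1}{K+1} = \frac{100\,001}{1\,001} \approx 99.9
```

So fail-fast would typically stop after roughly 100 chunk attempts. Clustered failures (say, one corrupt source file) can put k anywhere from 1 to N - K + 1 = 99 001.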
Two equally valid desired behaviours:
Fail-fast (interactive/debug mode):
```python
def append_chunk(chunks: list[Chunk], chunk: Chunk) -> list[Chunk]:
    # Pure version: returns a new list; good for clarity and law reasoning.
    return [*chunks, chunk]

result = fold_results_fail_fast(
    embedded,
    init=[],
    fn=append_chunk,
)
# → Err(the first ErrInfo encountered), or Ok(all chunks) if there are no errors
# Total work: k chunk attempts, where k is the position of the first error (stops immediately)
```
Full accumulation (batch/reporting mode):
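```python
result = fold_results_collect_errs_capped(
    embedded,
    init=[],
    fn=append_chunk,
    max_errs=1000,
)
# → Err((errs, capped)): here errs holds all 1 000 ErrInfo and capped=False;
#   capped becomes True only when more than max_errs errors occur
# Total work: all 100k chunks; the error list is bounded by max_errs
#   (the accumulator still grows with the successes)
```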
Both are pure, lazy where possible, and composable.
In hot paths you may switch to a locally mutating accumulator (e.g. chunks.append(chunk); return chunks) provided the accumulator is not aliased outside the fold. That preserves observational equivalence while re-using the same underlying list; see the hybrid batch optimisation in Module 5.
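The hot-path swap described above can be sketched as follows. Ok/Err are minimal stand-ins (an assumption about their shape: `.value` and `.error`), and `append_chunk_pure` / `append_chunk_inplace` are illustrative names, not project APIs. The sketch shows that both accumulators give the same result when each fold gets a fresh `init` list, and what goes wrong when `init` is aliased.

```python
from dataclasses import dataclass


# Minimal stand-ins for the Ok/Err types from earlier cores (assumed shape).
@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


def fold_results_fail_fast(xs, init, fn):
    # Same control flow as the reference fail-fast fold (section 4.1), untyped.
    acc = init
    for r in xs:
        if isinstance(r, Err):
            return Err(r.error)
        acc = fn(acc, r.value)
    return Ok(acc)


def append_chunk_pure(chunks: list, chunk) -> list:
    # Builds a new list on every call: O(len(chunks)) copying per step.
    return [*chunks, chunk]


def append_chunk_inplace(chunks: list, chunk) -> list:
    # Mutates and returns the same list: amortised O(1) per step.
    # Only safe when no one outside the fold holds a reference to `chunks`.
    chunks.append(chunk)
    return chunks


stream = [Ok("a"), Ok("b"), Ok("c")]

# With a fresh `init` per call, the two accumulators are indistinguishable.
pure = fold_results_fail_fast(stream, [], append_chunk_pure)
inplace = fold_results_fail_fast(stream, [], append_chunk_inplace)
assert pure == inplace == Ok(["a", "b", "c"])

# Aliasing hazard: pass a list the caller still holds as `init`.
shared: list = []
fold_results_fail_fast(stream, shared, append_chunk_pure)
assert shared == []  # pure variant never touches the caller's list
fold_results_fail_fast(stream, shared, append_chunk_inplace)
assert shared == ["a", "b", "c"]  # in-place variant leaked its mutations
```

The same leak happens on the Err path: with an aliased `init`, the in-place variant leaves the chunks seen before the first error in the caller's list, even though the fold itself returns Err.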
1. Laws & Invariants (machine-checked)
Accumulation variants require finite streams; fail-fast and circuit-breaker work on infinite streams.
| Law | Formal Statement | Enforcement |
|---|---|---|
| Short-Circuit | fold_results_fail_fast stops after the first Err, performing O(k) work where k is the position of the first error. | test_fold_fail_fast_short_circuit |
| Full Accumulation | fold_results_collect_errs / fold_results_collect_errs_capped process the entire finite stream, collecting every Err in order (up to max_errs for the capped variant). All-or-nothing: any Err → Err(...), accumulator discarded. | test_fold_collect_errs_full_scan, test_fold_collect_errs_capped |
| Circuit-Breaker | fold_until_error_rate stops when the error rate exceeds max_rate after min_samples results. Returns the last error (always present when it trips). | test_fold_until_error_rate |
| Ordering | Collected errors and accumulated values preserve original stream order. | test_fold_ordering_preservation |
| Equivalence | fold_results_fail_fast and fold_results_collect_errs produce the same results as explicit reference loops. | test_fold_equivalence_to_reference |
These laws guarantee you get exactly the aggregation strategy you asked for.
2. Decision Table – Which Aggregation Strategy Do You Actually Use?
| Scenario | Need Speed? | Need Full Diagnostics? | Need Bounded Memory? | Recommended Fold |
|---|---|---|---|---|
| Interactive/debug — stop on first error | Yes | No | Yes | fold_results_fail_fast |
| Batch job — report every error | No | Yes | No | fold_results_collect_errs |
| Batch job — report but cap memory | No | Yes | Yes | fold_results_collect_errs_capped |
| Circuit-breaker on error rate | Yes | Medium | Yes | fold_until_error_rate |
| Collect successes only, fail fast | Yes | No | Yes | all_ok_fail_fast |
| Collect everything (terminal only) | No | Yes | No | collect_both |
Never write manual error-collecting loops — use these folds instead.
Warning: fold_results_collect_errs, fold_results_collect_errs_capped, and collect_both must only be used on finite streams: they always consume the entire iterable by design. For potentially unbounded streams, stick to fold_results_fail_fast, all_ok_fail_fast, or fold_until_error_rate.
3. Public API Surface (end-of-Module-04 refactor note)
Refactor note: Result folds live in funcpipe_rag.result.folds (src/funcpipe_rag/result/folds.py) and are re-exported from funcpipe_rag.result and funcpipe_rag.api.core.
```python
from funcpipe_rag.api.core import (
    ResultsBoth,
    all_ok_fail_fast,
    collect_both,
    fold_results_collect_errs,
    fold_results_collect_errs_capped,
    fold_results_fail_fast,
    fold_until_error_rate,
)
```
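4. Reference Implementations
4.1 Fail-Fast Aggregation (Short-Circuit on First Error)
```python
from collections.abc import Callable, Iterable
from typing import TypeVar

# Ok, Err and Result are the Result types from earlier cores.
# These imports and type variables are shared by every definition in section 4.
T = TypeVar("T")  # success payload
E = TypeVar("E")  # error payload
A = TypeVar("A")  # accumulator

def fold_results_fail_fast(
    xs: Iterable[Result[T, E]],
    init: A,
    fn: Callable[[A, T], A],
) -> Result[A, E]:
    """Aggregate over Ok values with fn; short-circuit on first Err. Laziness preserved."""
    acc = init
    for r in xs:
        if isinstance(r, Err):
            return Err(r.error)  # stop pulling from xs immediately
        acc = fn(acc, r.value)
    return Ok(acc)
```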
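The "Laziness preserved" claim can be checked directly: feed the fold an infinite generator that records every element it produces. A minimal sketch, assuming the same stand-in Ok/Err shape (`.value`, `.error`); the every-7th-element failure pattern is purely illustrative.

```python
import itertools
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


def fold_results_fail_fast(xs, init, fn):
    # Same control flow as the reference fold above, untyped.
    acc = init
    for r in xs:
        if isinstance(r, Err):
            return Err(r.error)
        acc = fn(acc, r.value)
    return Ok(acc)


pulled: list[int] = []


def results():
    # Infinite stream: every 7th element (1-based) fails, the rest succeed.
    for i in itertools.count(1):
        pulled.append(i)  # record each element the consumer actually requests
        yield Err(f"bad chunk {i}") if i % 7 == 0 else Ok(i)


res = fold_results_fail_fast(results(), 0, lambda acc, v: acc + v)
print(res)     # Err(error='bad chunk 7')
print(pulled)  # [1, 2, 3, 4, 5, 6, 7]
```

Swapping in fold_results_collect_errs here would never return, which is why the accumulation folds are restricted to finite streams.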
4.2 Full Error Accumulation (All-or-Nothing)
```python
def fold_results_collect_errs(
    xs: Iterable[Result[T, E]],
    init: A,
    fn: Callable[[A, T], A],
) -> Result[A, list[E]]:
    """Aggregate over Ok values with fn; collect all Err in list. Consumes entire finite stream.
    All-or-nothing: any Err → Err(all errors), accumulator discarded."""
    acc = init
    errs: list[E] = []
    for r in xs:
        if isinstance(r, Err):
            errs.append(r.error)
        else:
            acc = fn(acc, r.value)
    return Err(errs) if errs else Ok(acc)
```
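A small sketch of the all-or-nothing behaviour, using stand-in Ok/Err types (assumed shape): a clean stream yields the folded value, while a mixed stream yields every error in stream order and drops the partial sum built from its successes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


def fold_results_collect_errs(xs, init, fn):
    # Same control flow as the reference fold above, untyped.
    acc = init
    errs = []
    for r in xs:
        if isinstance(r, Err):
            errs.append(r.error)
        else:
            acc = fn(acc, r.value)
    return Err(errs) if errs else Ok(acc)


def add(acc, v):
    return acc + v


clean = [Ok(1), Ok(2), Ok(3)]
mixed = [Ok(1), Err("unicode"), Ok(2), Err("oom"), Ok(3)]

print(fold_results_collect_errs(clean, 0, add))  # Ok(value=6)
print(fold_results_collect_errs(mixed, 0, add))  # Err(error=['unicode', 'oom'])
```

The fold still computes 1 + 2 + 3 for `mixed`, but that value never reaches the caller; if you need the successes too, collect_both is the terminal alternative.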
4.3 Capped Error Accumulation
```python
def fold_results_collect_errs_capped(
    xs: Iterable[Result[T, E]],
    init: A,
    fn: Callable[[A, T], A],
    *,
    max_errs: int,
) -> Result[A, tuple[list[E], bool]]:
    """Aggregate over Ok values with fn; collect up to max_errs errors + overflow flag.
    All-or-nothing: any Err or overflow → Err((errs, capped)), accumulator discarded."""
    acc = init
    errs: list[E] = []
    capped = False
    for r in xs:
        if isinstance(r, Err):
            if len(errs) < max_errs:
                errs.append(r.error)
            else:
                capped = True
        else:
            acc = fn(acc, r.value)
    return Err((errs, capped)) if errs or capped else Ok(acc)
```
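The capped flag only flips when an error arrives after the list is already full. A scaled-down sketch of the motivating example (10 errors instead of 1 000), with stand-in Ok/Err types and a hypothetical `stream` helper: exactly max_errs errors gives capped=False, one more gives capped=True.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


def fold_results_collect_errs_capped(xs, init, fn, *, max_errs):
    # Same control flow as the reference fold above, untyped.
    acc = init
    errs = []
    capped = False
    for r in xs:
        if isinstance(r, Err):
            if len(errs) < max_errs:
                errs.append(r.error)
            else:
                capped = True
        else:
            acc = fn(acc, r.value)
    return Err((errs, capped)) if errs or capped else Ok(acc)


def add(acc, v):
    return acc + v


def stream(n_ok, n_err):
    # Hypothetical helper: n_ok successes followed by n_err numbered failures.
    return [Ok(i) for i in range(n_ok)] + [Err(f"e{i}") for i in range(n_err)]


exact_errs, exact_capped = fold_results_collect_errs_capped(
    stream(5, 10), 0, add, max_errs=10
).error
over_errs, over_capped = fold_results_collect_errs_capped(
    stream(5, 11), 0, add, max_errs=10
).error

print(len(exact_errs), exact_capped)  # 10 False
print(len(over_errs), over_capped)    # 10 True
print(over_errs[-1])                  # e9  (e10 was dropped; only the flag records it)
```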
4.4 Circuit-Breaker on Error Rate
```python
def fold_until_error_rate(
    xs: Iterable[Result[T, E]],
    init: A,
    fn: Callable[[A, T], A],
    *,
    max_rate: float,
    min_samples: int = 100,
) -> Result[A, tuple[E, float, int]]:
    """Aggregate until error rate exceeds max_rate after min_samples.
    Returns last error (always present when tripping).
    Precondition: 0.0 < max_rate < 1.0. Violations are treated as programmer bugs
    and raise ValueError rather than being represented in the Result type.
    """
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0,1)")
    acc = init
    n_ok = n_err = 0
    last_err: E | None = None
    for r in xs:
        if isinstance(r, Ok):
            acc = fn(acc, r.value)
            n_ok += 1
        else:
            last_err = r.error
            n_err += 1
        total = n_ok + n_err
        if total >= min_samples and n_err / total > max_rate:
            assert last_err is not None
            return Err((last_err, n_err / total, total))
    return Ok(acc)
```
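To see exactly when the breaker trips, here is a trace-sized sketch with min_samples=5 and max_rate=0.5 (values chosen for illustration; the stand-in Ok/Err types and the `make` helper are assumptions). An error burst inside the warm-up window is tolerated because the rate is first checked once five results have been seen; a stream whose rate is already 3/5 at that point trips immediately.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


def fold_until_error_rate(xs, init, fn, *, max_rate, min_samples=100):
    # Same control flow as the reference fold above, untyped.
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0,1)")
    acc = init
    n_ok = n_err = 0
    last_err = None
    for r in xs:
        if isinstance(r, Ok):
            acc = fn(acc, r.value)
            n_ok += 1
        else:
            last_err = r.error
            n_err += 1
        total = n_ok + n_err
        if total >= min_samples and n_err / total > max_rate:
            return Err((last_err, n_err / total, total))
    return Ok(acc)


def make(pattern):
    # Hypothetical helper: "O" -> Ok(index), "E" -> Err("err@index").
    return [Ok(i) if c == "O" else Err(f"err@{i}") for i, c in enumerate(pattern)]


def add(acc, v):
    return acc + v


# Rates at the checks: 2/5, 2/6, 2/7, 2/8. None exceeds 0.5, so it completes.
warm = fold_until_error_rate(make("EEOOOOOO"), 0, add, max_rate=0.5, min_samples=5)
# At the first check the rate is 3/5 = 0.6 > 0.5: trips at element 5.
trip = fold_until_error_rate(make("OEOEEOOOOO"), 0, add, max_rate=0.5, min_samples=5)

print(warm)  # Ok(value=27)
print(trip)  # Err(error=('err@4', 0.6, 5))
```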
4.5 Simple Helpers
```python
def all_ok_fail_fast(xs: Iterable[Result[T, E]]) -> Result[list[T], E]:
    """Collect all Ok values; short-circuit on first Err. O(#oks) result size, O(1) extra overhead."""
    acc: list[T] = []
    for r in xs:
        if isinstance(r, Err):
            return Err(r.error)
        acc.append(r.value)
    return Ok(acc)

def collect_both(xs: Iterable[Result[T, E]]) -> ResultsBoth[T, E]:
    """Collect all Ok values and all Err errors into ResultsBoth. Consumes the entire finite stream. O(N) memory, terminal aggregation only.
    Use when you want partial successes and full error list."""
    oks: list[T] = []
    errs: list[E] = []
    for r in xs:
        if isinstance(r, Ok):
            oks.append(r.value)
        else:
            errs.append(r.error)
    return ResultsBoth(oks, errs)
```
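ResultsBoth is used above but not defined in this section. A minimal sketch, assuming it is a frozen, generic two-field container (the field names `oks` and `errs` are a guess, not necessarily the funcpipe_rag definition), together with collect_both applied to a four-element stream. Unlike fold_results_collect_errs, the successes survive alongside the errors.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


@dataclass(frozen=True)
class ResultsBoth(Generic[T, E]):
    # Hypothetical shape: positional (oks, errs), matching ResultsBoth(oks, errs) above.
    oks: list[T]
    errs: list[E]


def collect_both(xs):
    # Same control flow as the reference helper above, untyped.
    oks, errs = [], []
    for r in xs:
        if isinstance(r, Ok):
            oks.append(r.value)
        else:
            errs.append(r.error)
    return ResultsBoth(oks, errs)


both = collect_both([Ok("c1"), Err("unicode"), Ok("c2"), Err("oom")])
print(both)  # ResultsBoth(oks=['c1', 'c2'], errs=['unicode', 'oom'])
```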
4.6 Idiomatic RAG Usage
```python
def append_chunk(chunks: list[Chunk], chunk: Chunk) -> list[Chunk]:
    # Pure accumulator for teaching; swap to a local-mutation variant only in profiled hot paths.
    return [*chunks, chunk]

# `embedded` is a one-shot iterator: run ONE of the modes below per stream,
# or rebuild the stream before running the other.

# Debug/interactive mode: stop on first error
result = fold_results_fail_fast(
    embedded,
    init=[],
    fn=append_chunk,
)
if isinstance(result, Err):
    report_first_error(result.error)

# Batch/reporting mode: collect everything (capped)
result = fold_results_collect_errs_capped(
    embedded,
    init=[],
    fn=append_chunk,
    max_errs=1000,
)
if isinstance(result, Ok):
    oks = result.value
    errs: list[ErrInfo] = []
    capped = False
else:
    oks = []  # partial successes discarded (all-or-nothing)
    errs, capped = result.error
index_chunks(oks)
report_errors(errs, capped=capped)
```
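Running both modes back to back on the same `embedded` object, as the two snippets above would if pasted into one script, is a trap: a lazy stream can be consumed only once. A minimal sketch with a plain generator (stand-in Ok/Err types; `make_embedded` is a hypothetical factory) showing that the second fold silently misses the error the first one consumed, and that rebuilding the stream fixes it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


def fold_results_fail_fast(xs, init, fn):
    acc = init
    for r in xs:
        if isinstance(r, Err):
            return Err(r.error)
        acc = fn(acc, r.value)
    return Ok(acc)


def fold_results_collect_errs(xs, init, fn):
    acc, errs = init, []
    for r in xs:
        if isinstance(r, Err):
            errs.append(r.error)
        else:
            acc = fn(acc, r.value)
    return Err(errs) if errs else Ok(acc)


def add(acc, v):
    return acc + v


def make_embedded():
    # Hypothetical factory: rebuilds the lazy Result stream from its source.
    return (Ok(i) if i != 2 else Err("bad@2") for i in range(5))


embedded = make_embedded()
first = fold_results_fail_fast(embedded, 0, add)    # pulls items 0, 1, 2
rest = fold_results_collect_errs(embedded, 0, add)  # sees only items 3, 4
print(first)  # Err(error='bad@2')
print(rest)   # Ok(value=7): the error at position 2 was never seen

fresh = fold_results_collect_errs(make_embedded(), 0, add)
print(fresh)  # Err(error=['bad@2'])
```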
5. Property-Based Proofs (tests/test_result_folds.py)
```python
from hypothesis import given, strategies as st

# Ok, Err, Result, map_result_iter and the folds are imported from the
# project modules introduced in earlier cores.

@given(items=st.lists(st.integers()))
def test_fold_fail_fast_short_circuit(items):
    seen = 0

    def f(x: int) -> Result[int, str]:
        nonlocal seen
        seen += 1
        return Ok(x) if x != 0 else Err("ZERO")

    res = fold_results_fail_fast(map_result_iter(f, items), 0, lambda a, v: a + v)
    if 0 in items:
        assert isinstance(res, Err)
        assert seen == items.index(0) + 1
    else:
        assert res.value == sum(items)
        assert seen == len(items)
```
```python
@given(items=st.lists(st.integers()))
def test_fold_collect_errs_full_scan(items):
    seen = 0

    def f(x: int) -> Result[int, str]:
        nonlocal seen
        seen += 1
        return Ok(x) if x != 0 else Err("ZERO")

    res = fold_results_collect_errs(map_result_iter(f, items), 0, lambda a, v: a + v)
    assert seen == len(items)
    if isinstance(res, Err):
        assert len(res.error) == items.count(0)
    else:
        assert res.value == sum(items)
```
```python
@given(items=st.lists(st.integers()))
def test_fold_collect_errs_capped(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")

    res = fold_results_collect_errs_capped(map_result_iter(f, items), 0, lambda a, v: a + v, max_errs=5)
    if isinstance(res, Err):
        errs, capped = res.error
        assert len(errs) == min(items.count(0), 5)
        assert capped == (items.count(0) > 5)
    else:
        assert items.count(0) == 0
        assert res.value == sum(items)
```
```python
@given(items=st.lists(st.integers()))
def test_fold_until_error_rate(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x % 3 != 0 else Err("MOD3")

    res = fold_until_error_rate(map_result_iter(f, items), 0, lambda a, v: a + v, max_rate=0.3, min_samples=20)
    if isinstance(res, Err):
        last_err, rate, total = res.error
        assert rate > 0.3
        assert total >= 20
        assert last_err == "MOD3"
    else:
        # If we didn't trip the circuit-breaker and we had enough samples,
        # the final observed error rate must not exceed the threshold.
        if len(items) >= 20:
            total = len(items)
            n_err = sum(1 for x in items if x % 3 == 0)
            assert n_err / total <= 0.3
```
```python
@given(items=st.lists(st.integers()))
def test_fold_equivalence_to_reference(items):
    # reference manual loop (fail-fast)
    def ref_fail_fast(xs):
        acc = 0
        for x in xs:
            if x == 0:
                return Err("ZERO")
            acc += x
        return Ok(acc)

    # reference manual loop (collect errs, all-or-nothing)
    def ref_collect_errs(xs):
        acc = 0
        errs = []
        for x in xs:
            if x == 0:
                errs.append("ZERO")
            else:
                acc += x
        return Err(errs) if errs else Ok(acc)

    def f(x):
        return Ok(x) if x != 0 else Err("ZERO")

    assert fold_results_fail_fast(map_result_iter(f, items), 0, lambda a, v: a + v) == ref_fail_fast(items)
    assert fold_results_collect_errs(map_result_iter(f, items), 0, lambda a, v: a + v) == ref_collect_errs(items)
```
```python
@given(items=st.lists(st.integers()))
def test_fold_ordering_preservation(items):
    tagged = list(enumerate(items))

    def f(iv):
        i, v = iv
        return Ok(iv) if v % 2 else Err(f"err{i}")

    res = fold_results_collect_errs(map_result_iter(f, tagged), [], lambda acc, v: acc + [v])
    if isinstance(res, Err):
        err_positions = [int(e[3:]) for e in res.error]  # "err12" -> 12
        assert err_positions == [i for i, v in enumerate(items) if v % 2 == 0]
    else:
        # No even values: every element is Ok, so the accumulator must equal the input in order.
        assert res.value == tagged
```
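all_ok_fail_fast appears in the decision table but has no property of its own in the suite above. Since only the Ok/Err shape of the input matters, a small exhaustive check is a cheap complement to Hypothesis. A sketch, assuming stand-in Ok/Err types, that compares it with fold_results_fail_fast (using a pure list-append accumulator) over every Ok/Err pattern up to length 6; `check_all_ok_matches_fold` is an illustrative name.

```python
import itertools
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


def fold_results_fail_fast(xs, init, fn):
    acc = init
    for r in xs:
        if isinstance(r, Err):
            return Err(r.error)
        acc = fn(acc, r.value)
    return Ok(acc)


def all_ok_fail_fast(xs):
    # Same control flow as the reference helper in section 4.5, untyped.
    acc = []
    for r in xs:
        if isinstance(r, Err):
            return Err(r.error)
        acc.append(r.value)
    return Ok(acc)


def check_all_ok_matches_fold(max_len=6):
    # Enumerate every Ok/Err pattern of length 0..max_len and compare the helper
    # with the general fail-fast fold.
    n_cases = 0
    for n in range(max_len + 1):
        for pattern in itertools.product((True, False), repeat=n):
            xs = [Ok(i) if ok else Err(f"e{i}") for i, ok in enumerate(pattern)]
            expected = fold_results_fail_fast(xs, [], lambda acc, v: [*acc, v])
            assert all_ok_fail_fast(xs) == expected, pattern
            n_cases += 1
    return n_cases


print(check_all_ok_matches_fold())  # 127 patterns checked
```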
6. Big-O & Allocation Guarantees
| Variant | Time | Heap | Laziness |
|---|---|---|---|
| fold_results_fail_fast | O(k), k = position of first Err (O(N) if none) | O(acc) | Yes |
| fold_results_collect_errs | O(N) | O(#errors + acc) | No (full scan) |
| fold_results_collect_errs_capped | O(N) | O(min(#errors, max_errs) + acc) | No (full scan) |
| fold_until_error_rate | O(k) if it trips at element k, else O(N) | O(acc) | Yes |
| all_ok_fail_fast | O(k) or O(N), as for fail-fast | O(#oks) result + O(1) extra | Yes |
| collect_both | O(N) | O(N) | No (full scan) |
Fail-fast strategies are lazy: they stop pulling from the stream at the first Err (or when the breaker trips). Accumulation strategies require finite streams.
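The Time column counts elements pulled and calls to fn, treating fn as O(1). The pure append_chunk used in the examples is not O(1): `[*chunks, chunk]` copies the i elements already accumulated on the (i+1)-th call, so folding n successes copies

```latex
\sum_{i=0}^{n-1} i = \frac{n(n-1)}{2},
\qquad
n = 99\,000 \;\Rightarrow\; \frac{99\,000 \times 98\,999}{2} = 4\,900\,450\,500 \approx 4.9 \times 10^{9}
```

element references in total for the 99 000 successful chunks of the motivating example. A locally mutating accumulator does amortised O(1) work per call instead, which is why it is the hot-path choice.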
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Manual error-collecting loops | Duplicated buggy code | Use fold_results_* |
| Always full-scan on errors | Wasted work | Use fold_results_fail_fast |
| Unbounded error collection | Memory blowup | Use fold_results_collect_errs_capped |
| High error rates unchecked | Degraded performance | Use fold_until_error_rate |
8. Pre-Core Quiz
- Fail-fast for…? → Early termination on first error
- Full accumulation for…? → Complete error report
- Capped accumulation for…? → Bounded memory diagnostics
- Circuit-breaker for…? → Stop on error rate threshold
- Never do manually what…? → A fold can do correctly
9. Post-Core Exercise
- Replace a manual error loop with fold_results_fail_fast → test short-circuit.
- Add capped error collection → verify memory stays bounded.
- Implement a circuit-breaker on embedding error rate → test early stop.
- Use collect_both at the pipeline end → generate a JSON error report.
Next: M04C07 – Retries with Backoff Policies for Transient Failures.
You now have the complete toolkit to choose exactly how strict or forgiving your pipeline should be — all with mathematical guarantees and zero manual loops. The rest of Module 4 is about adding retries and final reporting.