M04C05: Streaming Error Handling – Mixed Good/Bad Data in a Single FuncPipe¶
Progression Note¶
By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |
Core question:
How do you keep a lazy streaming pipeline flowing when individual records fail, while faithfully collecting every error with full provenance and enabling one-pass routing, recovery, or parallel processing — all without materialising the stream?
We now take the TreeDoc → Chunk → Result[Chunk, ErrInfo] stream from M04C04 and face the real-world production reality:
99 % of chunks embed successfully, but 1 % fail for different reasons (Unicode, OOM, model rejection, network timeout).
A naïve pipeline would raise on the first bad chunk and halt the entire stream: you lose everything after the first bad chunk.
Even a careful try/except loop either:
- halts on the first error,
- silently drops bad chunks,
- or materialises everything just to separate good from bad.
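For concreteness, here is a sketch of the second failure mode (silent dropping plus materialisation), using the embed_chunk and chunks_with_path names from the rest of this core:

# Sketch of the hand-rolled loop this core replaces: it swallows failures
# silently and builds a full list, so the stream is no longer lazy.
embedded = []
for chunk in chunks_with_path:
    try:
        embedded.append(embed_chunk(chunk))
    except Exception:
        pass  # bad chunk vanishes with no provenance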
The production solution uses tiny, composable streaming combinators that treat Result as a normal value:
embedded = par_try_map_iter(embed_chunk, chunks_with_path, stage="embed")
# → Iterator[Result[Chunk, ErrInfo]] that never raises and preserves order
The stream continues forever; good chunks flow through immediately; every failure is captured with full provenance and can be logged, retried, or routed without stopping anything.
Audience: Engineers who run RAG (or any data-processing) pipelines over real-world messy data and refuse to lose 99 % of their work because of 1 % bad records.
Outcome:
1. You will process mixed good/bad streams with zero halting and O(1) memory per item.
2. You will route, log, recover, or aggregate errors in one pass using lazy combinators.
3. You will ship a RAG pipeline that survives any per-chunk catastrophe and delivers rich, structured error reports.
We formalise exactly what we want from correct, production-ready streaming error handling: continuation, ordering, separation, bounded work, and perfect containment.
Concrete Motivating Example¶
Same deep TreeDoc from previous cores:
- 100 000 chunks total.
- 99 000 embed successfully.
- 800 contain truncated UTF-8 → UnicodeDecodeError.
- 200 are >10 MB → MemoryError in the embedder.
Desired behaviour:
embedded: Iterator[Result[Chunk, ErrInfo]] = par_try_map_iter(
embed_chunk,
chunks_with_path,
stage="embed",
key_path=lambda cp: cp[1],
)
# Never raises, processes all 100k items, yields 99k Ok + 1k Err instantly
oks, errs = partition_results(embedded) # materialise only at the very end
- Total time ≈ time for 99k successful embeddings (the 1k failures are near-instant).
- Full provenance on every error (tree path, stage, cause).
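Everything below assumes the Result / ErrInfo vocabulary carried over from M04C04. The following is a minimal sketch of the assumed shape; field names mirror how they are used in this core (r.value, r.error, err.code/.msg/.stage/.path/.cause, make_errinfo), and your actual definitions may differ.

# Minimal sketch of the assumed Result/ErrInfo types from M04C04 (illustrative).
from __future__ import annotations
from collections.abc import Callable, Iterable, Iterator  # used by the combinators below
from dataclasses import dataclass
from typing import Generic, TypeVar, Union

T = TypeVar("T")
U = TypeVar("U")
E = TypeVar("E")

@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T

@dataclass(frozen=True)
class Err(Generic[E]):
    error: E

Result = Union[Ok[T], Err[E]]  # a value is either Ok(value) or Err(error)

@dataclass(frozen=True)
class ErrInfo:
    code: str                    # machine-readable code, e.g. "PIPE/EXC"
    msg: str                     # human-readable message, usually str(exc)
    stage: str                   # pipeline stage, e.g. "embed"
    path: tuple[int, ...]        # provenance: position in the TreeDoc
    cause: BaseException | None  # the original exception, if any

def make_errinfo(
    code: str,
    msg: str,
    stage: str,
    path: tuple[int, ...],
    cause: BaseException | None = None,
) -> ErrInfo:
    return ErrInfo(code, msg, stage, path, cause)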
1. Laws & Invariants (machine-checked)¶
| Law | Formal Statement | Enforcement |
|---|---|---|
| Continuation | Pipeline never halts on Err; every input item produces exactly one output item. | test_continuation_full_output |
| Ordering | All combinators preserve input order in their outputs (successes and failures appear in the original sequence). | test_ordering_preservation |
| Separation (lazy) | filter_ok / filter_err partition the stream into good/bad subsequences, each preserving relative order, without materialisation. | test_lazy_separation_equivalence |
| Bounded-Work | Processing the first k items (good or bad) performs exactly k applications of the wrapped function. | test_bounded_work |
| Containment | No unhandled exception escapes any combinator; every failure becomes an Err. | test_containment_no_leak |
| One-Pass Routing | split_results_to_sinks visits each item exactly once. | test_single_pass_split |
These laws guarantee the stream is robust, predictable, and truly lazy.
2. Decision Table – Which Combinator Do You Actually Use?¶
| Goal | Need Parallelism? | Need Recovery? | Need One-Pass Routing? | Recommended Combinator |
|---|---|---|---|---|
| Simple exception containment | No | No | No | try_map_iter |
| Order-preserving parallelism | Yes | No | No | par_try_map_iter |
| Stream only successes | – | No | No | filter_ok |
| Stream/log only failures | – | No | No | filter_err / tap_err |
| Recover failures to values | – | Yes | No | recover_iter |
| Recover failures to Result | – | Yes | No | recover_result_iter |
| Route to two sinks in one pass | – | No | Yes | split_results_to_sinks |
| Route + contain sink exceptions | – | No | Yes | split_results_to_sinks_guarded |
| Materialise good/bad at end | – | No | No | partition_results (endpoint only) |
Never materialise early for separation — use one-pass routing instead.
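For contrast, a sketch using the names from the motivating example (embedded, index_chunk, error_warehouse are the same illustrative names used in section 4.4):

# Anti-pattern (do not do this): materialises the whole stream just to separate it.
#   oks, errs = partition_results(embedded)   # O(N) memory, laziness lost
#   for chunk in oks: index_chunk(chunk)
#   for err in errs: error_warehouse(err)

# One-pass routing: every Result is visited exactly once, O(1) memory per item.
split_results_to_sinks(embedded, on_ok=index_chunk, on_err=error_warehouse)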
3. Public API Surface (end-of-Module-04 refactor note)¶
Refactor note: streaming Result combinators live in funcpipe_rag.result.stream (src/funcpipe_rag/result/stream.py) and are re-exported from
funcpipe_rag.result and funcpipe_rag.api.core.
from funcpipe_rag.api.core import (
filter_err,
filter_ok,
par_try_map_iter,
partition_results,
recover_iter,
recover_result_iter,
split_results_to_sinks,
split_results_to_sinks_guarded,
tap_err,
tap_ok,
try_map_iter,
)
4. Reference Implementations¶
4.1 try_map_iter – Exception-Safe Lazy Wrapping¶
def try_map_iter(
fn: Callable[[T], U],
xs: Iterable[T],
*,
stage: str,
key_path: Callable[[T], tuple[int, ...]] | None = None,
code: str = "PIPE/EXC",
) -> Iterator[Result[U, ErrInfo]]:
for x in xs:
try:
yield Ok(fn(x))
except Exception as exc:
p = key_path(x) if key_path is not None else ()
yield Err(make_errinfo(code, str(exc), stage, p, exc))
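A toy illustration of the containment and continuation laws; the parse function and the printed output are illustrative:

def parse(s: str) -> int:
    return int(s)

for r in try_map_iter(parse, ["1", "2", "oops", "4"], stage="parse"):
    print(r)
# Every input yields exactly one Result; "oops" becomes an Err instead of
# killing the stream.  Output is roughly:
#   Ok(value=1)
#   Ok(value=2)
#   Err(error=ErrInfo(code='PIPE/EXC', msg="invalid literal for int() ...", stage='parse', ...))
#   Ok(value=4)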
4.2 par_try_map_iter – Order-Preserving Parallel Mapping¶
from collections import deque
from concurrent.futures import ThreadPoolExecutor, Future
def par_try_map_iter(
fn: Callable[[T], U],
xs: Iterable[T],
*,
stage: str,
key_path: Callable[[T], tuple[int, ...]] | None = None,
code: str = "PIPE/EXC",
max_workers: int = 8,
max_in_flight: int = 32,
) -> Iterator[Result[U, ErrInfo]]:
    it = iter(xs)
    inflight: deque[tuple[T, Future[U]]] = deque()
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        def submit_next() -> bool:
            """Submit one more input item, if any remain."""
            try:
                x = next(it)
            except StopIteration:
                return False
            inflight.append((x, ex.submit(fn, x)))
            return True

        # Prime the window with up to max_in_flight submissions.
        while len(inflight) < max_in_flight and submit_next():
            pass

        # Drain the oldest future first (FIFO order of the deque preserves
        # input order), refilling before blocking so the window stays full.
        while inflight:
            x, fut = inflight.popleft()
            submit_next()
            try:
                yield Ok(fut.result())
            except Exception as exc:
                p = key_path(x) if key_path is not None else ()
                yield Err(make_errinfo(code, str(exc), stage, p, exc))
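A rough way to sanity-check the parallel speedup on an I/O-bound function; the sleeping stand-in and the timings are illustrative, not part of the library:

import time

def slow_id(x: int) -> int:
    time.sleep(0.05)  # stand-in for a network-bound embedder call
    return x

t0 = time.perf_counter()
list(try_map_iter(slow_id, range(64), stage="bench"))
t1 = time.perf_counter()
list(par_try_map_iter(slow_id, range(64), stage="bench", max_workers=8, max_in_flight=16))
t2 = time.perf_counter()
print(f"sequential: {t1 - t0:.2f}s   parallel: {t2 - t1:.2f}s")  # expect roughly a max_workers-fold gap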
4.3 The Rest (concise, correct, lazy)¶
def filter_ok(xs: Iterable[Result[T, E]]) -> Iterator[T]:
for r in xs:
if isinstance(r, Ok):
yield r.value
def filter_err(xs: Iterable[Result[T, E]]) -> Iterator[E]:
for r in xs:
if isinstance(r, Err):
yield r.error
def tap_ok(xs: Iterable[Result[T, E]], fn: Callable[[T], None]) -> Iterator[Result[T, E]]:
"""Observational tap only – fn may log or increment metrics but must not mutate values."""
for r in xs:
if isinstance(r, Ok):
fn(r.value)
yield r
def tap_err(xs: Iterable[Result[T, E]], fn: Callable[[E], None]) -> Iterator[Result[T, E]]:
"""Observational tap only – fn may log or increment metrics but must not mutate values."""
for r in xs:
if isinstance(r, Err):
fn(r.error)
yield r
def recover_iter(xs: Iterable[Result[T, E]], fn: Callable[[E], T]) -> Iterator[T]:
for r in xs:
yield r.value if isinstance(r, Ok) else fn(r.error)
def recover_result_iter(xs: Iterable[Result[T, E]], fn: Callable[[E], Result[T, E]]) -> Iterator[Result[T, E]]:
for r in xs:
yield r if isinstance(r, Ok) else fn(r.error)
def split_results_to_sinks(
xs: Iterable[Result[T, E]],
on_ok: Callable[[T], None],
on_err: Callable[[E], None],
) -> None:
for r in xs:
if isinstance(r, Ok):
on_ok(r.value)
else:
on_err(r.error)
def split_results_to_sinks_guarded(
xs: Iterable[Result[T, E]],
on_ok: Callable[[T], None],
on_err: Callable[[E], None],
*,
stage: str = "sink",
) -> Iterator[Result[None, ErrInfo]]:
"""Contain sink exceptions; original Err is processed before sink may fail."""
for r in xs:
try:
if isinstance(r, Ok):
on_ok(r.value)
else:
on_err(r.error)
yield Ok(None)
except Exception as exc:
yield Err(make_errinfo("SINK/EXC", str(exc), stage, (), exc))
def partition_results(xs: Iterable[Result[T, E]]) -> tuple[list[T], list[E]]:
oks: list[T] = []
errs: list[E] = []
for r in xs:
if isinstance(r, Ok):
oks.append(r.value)
else:
errs.append(r.error)
return oks, errs
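A toy contrast of the two recovery flavours (toy integers, illustrative only):

# recover_iter: every Err is replaced by a plain fallback value.
rs = try_map_iter(lambda x: 10 // x, [5, 0, 2], stage="demo")
assert list(recover_iter(rs, lambda err: -1)) == [2, -1, 5]

# recover_result_iter: the handler decides per error whether to heal or keep the Err.
rs = try_map_iter(lambda x: 10 // x, [5, 0, 2], stage="demo")
healed = list(recover_result_iter(rs, lambda err: Ok(-1)))
assert all(isinstance(r, Ok) for r in healed)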
4.4 Idiomatic RAG Usage¶
embedded = par_try_map_iter(
embed_chunk,
chunks_with_path,
stage="embed",
key_path=lambda cp: cp[1],
)
split_results_to_sinks(
tap_err(embedded, log_err_info),
on_ok=index_chunk,
on_err=error_warehouse,
)
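If the sinks themselves can throw (a flaky index, a full error warehouse), the guarded variant contains those failures as well; a sketch using the same names:

# Sink failures become SINK/EXC errors instead of crashing the pipeline;
# the loop drains the guarded stream lazily.
for sink_result in split_results_to_sinks_guarded(
    tap_err(embedded, log_err_info),
    on_ok=index_chunk,
    on_err=error_warehouse,
    stage="index",
):
    if isinstance(sink_result, Err):
        log_err_info(sink_result.error)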
5. Property-Based Proofs (tests/test_result_stream.py)¶
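The tests below assume the following imports; the exact re-export path for Ok, Err, ErrInfo, Result, and make_errinfo is an assumption based on the refactor note in section 3:

from itertools import islice

from hypothesis import given, strategies as st

from funcpipe_rag.api.core import (
    filter_err,
    filter_ok,
    par_try_map_iter,
    split_results_to_sinks,
    try_map_iter,
)
from funcpipe_rag.result import Err, ErrInfo, Ok, Result, make_errinfo  # assumed re-export path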
@given(items=st.lists(st.integers()))
def test_continuation_full_output(items):
def f(x: int) -> int:
if x == -1:
raise ValueError("boom")
return x
results = list(try_map_iter(f, items, stage="test"))
assert len(results) == len(items)
@given(items=st.lists(st.integers()))
def test_ordering_preservation(items):
tagged = list(enumerate(items))
def f(iv):
i, v = iv
if v % 2 == 0:
raise ValueError("even")
return iv
results = list(try_map_iter(f, tagged, stage="test", key_path=lambda iv: (iv[0],)))
    out_indices = [r.value[0] if isinstance(r, Ok) else r.error.path[0] for r in results]
    assert out_indices == list(range(len(items)))  # original order preserved exactly
@given(items=st.lists(st.integers()))
def test_lazy_separation_equivalence(items):
def f(x: int) -> int:
if x % 2 == 0:
raise ValueError("even")
return x
stream = try_map_iter(f, items, stage="test")
oks = list(filter_ok(stream))
stream2 = try_map_iter(f, items, stage="test")
errs = list(filter_err(stream2))
assert len(oks) + len(errs) == len(items)
@given(items=st.lists(st.integers()))
def test_single_pass_split(items):
seen = 0
    def on_ok(_):
        nonlocal seen
        seen += 1

    def on_err(_):
        nonlocal seen
        seen += 1
def f(x: int) -> int:
if x % 2 == 0:
raise ValueError("even")
return x
split_results_to_sinks(try_map_iter(f, items, stage="test"), on_ok, on_err)
assert seen == len(items)
@given(items=st.lists(st.integers()))
def test_bounded_work(items):
seen = 0
def f(x):
nonlocal seen
seen += 1
if x == 0:
raise ValueError("zero")
return x
stream = try_map_iter(f, items, stage="test")
list(islice(stream, 25))
assert seen == min(25, len(items))
@given(items=st.lists(st.integers()))
def test_try_map_iter_matches_try_except(items):
def f(x: int) -> int:
if x == 0:
raise ValueError("boom")
return 100 // x
# reference try/except
ref: list[Result[int, ErrInfo]] = []
for x in items:
try:
            ref.append(Ok(f(x)))
except Exception as exc:
# match production arity: include `exc` as cause
ref.append(Err(make_errinfo("PIPE/EXC", str(exc), "test", (), exc)))
got = list(try_map_iter(f, items, stage="test"))
# Compare shapes, values, and error codes/messages (cause/path may differ)
assert [isinstance(r, Ok) for r in got] == [isinstance(r, Ok) for r in ref]
assert [r.value for r in got if isinstance(r, Ok)] == \
[r.value for r in ref if isinstance(r, Ok)]
assert [r.error.code for r in got if isinstance(r, Err)] == \
[r.error.code for r in ref if isinstance(r, Err)]
@given(items=st.lists(st.integers(), min_size=1, max_size=200))
def test_par_try_map_iter_matches_try_map_iter(items):
def f(x: int) -> int:
if x == 0:
raise ValueError("boom")
return 100 // x
seq = list(try_map_iter(f, items, stage="test"))
par = list(par_try_map_iter(f, items, stage="test", max_workers=4, max_in_flight=8))
assert [isinstance(r, Ok) for r in par] == [isinstance(r, Ok) for r in seq]
assert [r.value if isinstance(r, Ok) else r.error.msg for r in par] == \
[r.value if isinstance(r, Ok) else r.error.msg for r in seq]
@given(items=st.lists(st.integers()))
def test_containment_no_leak(items):
def f(x: int) -> int:
raise ValueError("always fail")
# This should never raise
list(try_map_iter(f, items, stage="test"))
list(par_try_map_iter(f, items, stage="test"))
6. Big-O & Allocation Guarantees¶
| Variant | Time per item | Heap per item | Laziness |
|---|---|---|---|
| try_map_iter | O(1) | O(1) | Yes |
| par_try_map_iter | O(1) amortised | O(max_in_flight) total | Yes |
| filter_ok / filter_err / tap_* | O(1) | O(1) | Yes |
| recover_iter / recover_result_iter | O(1) | O(1) | Yes |
| split_results_to_sinks | O(1) | O(1) | Yes |
| partition_results | O(1) | O(N) total | No |
All streaming operations are truly lazy.
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Halting on first exception | Lost data after first failure | Use try_map_iter / par_try_map_iter |
| Materialising early for separation | Memory blowup | Use split_results_to_sinks (one-pass) |
| Silent drop of bad records | Incomplete results | Use filter_err or tap_err to capture |
| Sink exceptions crashing pipeline | Partial processing | Use split_results_to_sinks_guarded |
8. Pre-Core Quiz¶
- try_map_iter for…? → Exception-safe lazy mapping
- par_try_map_iter for…? → Order-preserving parallel mapping
- filter_ok for…? → Stream only successes
- split_results_to_sinks for…? → One-pass routing to two sinks
- recover_iter for…? → Recover Err to values lazily
9. Post-Core Exercise¶
- Replace a try/except loop with try_map_iter → verify full output on mixed data.
- Add tap_err for logging → test laziness with islice.
- Use par_try_map_iter for embedding → measure speedup on a real dataset.
- Implement split_results_to_sinks_guarded for safe indexing + an error warehouse.
- Add an ordering test to your own pipeline using tagged inputs.
Next: M04C06 – Validation Aggregators & Monoidal Error Collection.
You now have the complete toolkit to process real-world messy data without ever losing a single record or halting on one bad apple. The rest of Module 4 is about aggregating and reporting those errors beautifully.