
Streaming Error Handling

M04C05: Streaming Error Handling – Mixed Good/Bad Data in a Single FuncPipe

Progression Note

By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.

Here's a snippet from the progression map:

| Module | Focus | Key Outcomes |
| --- | --- | --- |
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |

Core question:
How do you keep a lazy streaming pipeline flowing when individual records fail, while faithfully collecting every error with full provenance and enabling one-pass routing, recovery, or parallel processing — all without materialising the stream?

We now take the TreeDoc → Chunk → Result[Chunk, ErrInfo] stream from M04C04 and face the real-world production reality:

99 % of chunks embed successfully, but 1 % fail for different reasons (Unicode, OOM, model rejection, network timeout).

A naïve pipeline would:

for chunk in chunks:
    embedded.append(embed_chunk(chunk))   # raises → entire pipeline dies

You lose everything after the first bad chunk.

Even a careful try/except loop either:

  • halts on the first error,
  • silently drops bad chunks, or
  • materialises everything just to separate good from bad (a typical "drop and count" version is sketched below).
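
For instance (illustrative only; same embed_chunk and chunks as above):

# Survives bad chunks, but every error's cause and tree position are lost,
# and the entire output is materialised into a list.
embedded, dropped = [], 0
for chunk in chunks:
    try:
        embedded.append(embed_chunk(chunk))
    except Exception:
        dropped += 1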

The production solution uses tiny, composable streaming combinators that treat Result as a normal value:

embedded = par_try_map_iter(embed_chunk, chunks_with_path, stage="embed")
# → Iterator[Result[Chunk, ErrInfo]] that never raises and preserves order

The stream never halts: good chunks flow through immediately, and every failure is captured with full provenance so it can be logged, retried, or routed without stopping anything.

Audience: Engineers who run RAG (or any data-processing) pipelines over real-world messy data and refuse to lose 99 % of their work because of 1 % bad records.

Outcome:
1. You will process mixed good/bad streams with zero halting and O(1) memory per item.
2. You will route, log, recover, or aggregate errors in one pass using lazy combinators.
3. You will ship a RAG pipeline that survives any per-chunk catastrophe and delivers rich, structured error reports.

We formalise exactly what we want from correct, production-ready streaming error handling: continuation, ordering, separation, bounded work, one-pass routing, and containment.


Concrete Motivating Example

Same deep TreeDoc from previous cores:

  • 100 000 chunks total.
  • 99 000 embed successfully.
  • 800 contain truncated UTF-8 → UnicodeDecodeError.
  • 200 are >10 MB → MemoryError in embedder.

Desired behaviour:

embedded: Iterator[Result[Chunk, ErrInfo]] = par_try_map_iter(
    embed_chunk,
    chunks_with_path,
    stage="embed",
    key_path=lambda cp: cp[1],
)

# Never raises, processes all 100k items, yields 99k Ok + 1k Err instantly
oks, errs = partition_results(embedded)   # materialise only at the very end

  • Total time ≈ time for 99k successful embeddings (the 1k failures are near-instant).
  • Full provenance on every error (tree path, stage, cause).

1. Laws & Invariants (machine-checked)

| Law | Formal Statement | Enforcement |
| --- | --- | --- |
| Continuation | The pipeline never halts on Err; every input item produces exactly one output item. | test_continuation_full_output |
| Ordering | All combinators preserve input order in their outputs (successes and failures appear in the original sequence). | test_ordering_preservation |
| Separation (lazy) | filter_ok / filter_err partition the stream into good/bad subsequences, each preserving relative order, without materialisation. | test_lazy_separation_equivalence |
| Bounded-Work | Processing the first k items (good or bad) performs exactly k applications of the wrapped function (the parallel variant may prefetch at most max_in_flight extra). | test_bounded_work |
| Containment | No unhandled exception escapes any combinator; every failure becomes an Err. | test_containment_no_leak |
| One-Pass Routing | split_results_to_sinks visits each item exactly once. | test_single_pass_split |

These laws guarantee the stream is robust, predictable, and truly lazy.


2. Decision Table – Which Combinator Do You Actually Use?

| Goal | Need Parallelism? | Need Recovery? | Need One-Pass Routing? | Recommended Combinator |
| --- | --- | --- | --- | --- |
| Simple exception containment | No | No | No | try_map_iter |
| Order-preserving parallelism | Yes | No | No | par_try_map_iter |
| Stream only successes | No | No | No | filter_ok |
| Stream/log only failures | No | No | No | filter_err / tap_err |
| Recover failures to values | No | Yes | No | recover_iter |
| Recover failures to Result | No | Yes | No | recover_result_iter |
| Route to two sinks in one pass | No | No | Yes | split_results_to_sinks |
| Route + contain sink exceptions | No | No | Yes | split_results_to_sinks_guarded |
| Materialise good/bad at end | No | No | No | partition_results (endpoint only) |
Never materialise early for separation — use one-pass routing instead.
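
Concretely (a sketch, where results is any Result stream and index_chunk / error_warehouse are the placeholder sinks used again in 4.4):

# Anti-pattern: builds two full lists in memory just to route items to two sinks.
oks, errs = partition_results(results)
for chunk in oks:
    index_chunk(chunk)
for err in errs:
    error_warehouse(err)

# One pass, O(1) extra memory, identical routing:
split_results_to_sinks(results, on_ok=index_chunk, on_err=error_warehouse)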


3. Public API Surface (end-of-Module-04 refactor note)

Refactor note: streaming Result combinators live in funcpipe_rag.result.stream (src/funcpipe_rag/result/stream.py) and are re-exported from funcpipe_rag.result and funcpipe_rag.api.core.

from funcpipe_rag.api.core import (
    filter_err,
    filter_ok,
    par_try_map_iter,
    partition_results,
    recover_iter,
    recover_result_iter,
    split_results_to_sinks,
    split_results_to_sinks_guarded,
    tap_err,
    tap_ok,
    try_map_iter,
)

4. Reference Implementations
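
The implementations below assume the Result, ErrInfo, and make_errinfo definitions from M04C04 (funcpipe_rag.result), plus the usual typing imports. A minimal sketch, consistent with how those names are used in this core (field and type-variable names are inferred here, not the canonical definitions):

# Minimal sketch (assumption, for readability only): the real Result / ErrInfo
# definitions ship with M04C04; field names below are inferred from usage.
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Iterator, TypeVar, Union

T = TypeVar("T")
U = TypeVar("U")
E = TypeVar("E")

@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T

@dataclass(frozen=True)
class Err(Generic[E]):
    error: E

Result = Union[Ok[T], Err[E]]

@dataclass(frozen=True)
class ErrInfo:
    code: str
    msg: str
    stage: str
    path: tuple[int, ...]
    cause: BaseException | None = None

def make_errinfo(
    code: str,
    msg: str,
    stage: str,
    path: tuple[int, ...],
    cause: BaseException | None = None,
) -> ErrInfo:
    return ErrInfo(code, msg, stage, path, cause)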

4.1 try_map_iter – Exception-Safe Lazy Wrapping

def try_map_iter(
    fn: Callable[[T], U],
    xs: Iterable[T],
    *,
    stage: str,
    key_path: Callable[[T], tuple[int, ...]] | None = None,
    code: str = "PIPE/EXC",
) -> Iterator[Result[U, ErrInfo]]:
    for x in xs:
        try:
            yield Ok(fn(x))
        except Exception as exc:
            p = key_path(x) if key_path is not None else ()
            yield Err(make_errinfo(code, str(exc), stage, p, exc))
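
A quick sanity check of the continuation and containment behaviour on a toy stream (toy values; int stands in for any fallible per-item function):

raw = ["1", "2", "oops", "4"]
results = list(try_map_iter(int, raw, stage="parse"))

assert [isinstance(r, Ok) for r in results] == [True, True, False, True]
assert [r.value for r in results if isinstance(r, Ok)] == [1, 2, 4]
assert all(r.error.code == "PIPE/EXC" for r in results if isinstance(r, Err))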

4.2 par_try_map_iter – Order-Preserving Parallel Mapping

from collections import deque
from concurrent.futures import ThreadPoolExecutor, Future

def par_try_map_iter(
    fn: Callable[[T], U],
    xs: Iterable[T],
    *,
    stage: str,
    key_path: Callable[[T], tuple[int, ...]] | None = None,
    code: str = "PIPE/EXC",
    max_workers: int = 8,
    max_in_flight: int = 32,
) -> Iterator[Result[U, ErrInfo]]:
    it = iter(xs)
    # Oldest submission sits at the left; popping from the left preserves input order.
    inflight: deque[tuple[T, Future[U]]] = deque()

    with ThreadPoolExecutor(max_workers=max_workers) as ex:

        def refill() -> None:
            # Keep up to max_in_flight calls submitted so the workers stay busy.
            while len(inflight) < max_in_flight:
                try:
                    x = next(it)
                except StopIteration:
                    return
                inflight.append((x, ex.submit(fn, x)))

        refill()  # prime the window
        while inflight:
            x, fut = inflight.popleft()
            refill()  # top the window back up *before* blocking on the oldest future
            try:
                yield Ok(fut.result())
            except Exception as exc:
                p = key_path(x) if key_path is not None else ()
                yield Err(make_errinfo(code, str(exc), stage, p, exc))
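
The sliding window is the whole trick: we always block on the oldest outstanding future, so output order equals input order, while up to max_in_flight newer calls run concurrently behind it. A quick (hypothetical) way to see the ordering guarantee, using artificial delays so later items often finish first:

import time

def slow_square(x: int) -> int:
    # Later items sleep less, so they usually finish before earlier ones.
    time.sleep(0.01 * (9 - x % 10))
    return x * x

out = list(par_try_map_iter(slow_square, range(20), stage="demo", max_workers=4))
assert [r.value for r in out] == [x * x for x in range(20)]   # still input order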

4.3 The Rest (concise, correct, lazy)

def filter_ok(xs: Iterable[Result[T, E]]) -> Iterator[T]:
    for r in xs:
        if isinstance(r, Ok):
            yield r.value

def filter_err(xs: Iterable[Result[T, E]]) -> Iterator[E]:
    for r in xs:
        if isinstance(r, Err):
            yield r.error

def tap_ok(xs: Iterable[Result[T, E]], fn: Callable[[T], None]) -> Iterator[Result[T, E]]:
    """Observational tap only – fn may log or increment metrics but must not mutate values."""
    for r in xs:
        if isinstance(r, Ok):
            fn(r.value)
        yield r

def tap_err(xs: Iterable[Result[T, E]], fn: Callable[[E], None]) -> Iterator[Result[T, E]]:
    """Observational tap only – fn may log or increment metrics but must not mutate values."""
    for r in xs:
        if isinstance(r, Err):
            fn(r.error)
        yield r

def recover_iter(xs: Iterable[Result[T, E]], fn: Callable[[E], T]) -> Iterator[T]:
    for r in xs:
        yield r.value if isinstance(r, Ok) else fn(r.error)

def recover_result_iter(xs: Iterable[Result[T, E]], fn: Callable[[E], Result[T, E]]) -> Iterator[Result[T, E]]:
    for r in xs:
        yield r if isinstance(r, Ok) else fn(r.error)

def split_results_to_sinks(
    xs: Iterable[Result[T, E]],
    on_ok: Callable[[T], None],
    on_err: Callable[[E], None],
) -> None:
    for r in xs:
        if isinstance(r, Ok):
            on_ok(r.value)
        else:
            on_err(r.error)

def split_results_to_sinks_guarded(
    xs: Iterable[Result[T, E]],
    on_ok: Callable[[T], None],
    on_err: Callable[[E], None],
    *,
    stage: str = "sink",
) -> Iterator[Result[None, ErrInfo]]:
    """Contain sink exceptions; original Err is processed before sink may fail."""
    for r in xs:
        try:
            if isinstance(r, Ok):
                on_ok(r.value)
            else:
                on_err(r.error)
            yield Ok(None)
        except Exception as exc:
            yield Err(make_errinfo("SINK/EXC", str(exc), stage, (), exc))

def partition_results(xs: Iterable[Result[T, E]]) -> tuple[list[T], list[E]]:
    oks: list[T] = []
    errs: list[E] = []
    for r in xs:
        if isinstance(r, Ok):
            oks.append(r.value)
        else:
            errs.append(r.error)
    return oks, errs
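
A small illustration of the lazy helpers above on the same kind of toy parse stream (illustrative values only):

raw = ["1", "2", "oops", "4", "?"]

good = list(filter_ok(try_map_iter(int, raw, stage="parse")))
# → [1, 2, 4]

patched = list(recover_iter(try_map_iter(int, raw, stage="parse"), lambda err: -1))
# → [1, 2, -1, 4, -1]  (one output per input; failures replaced by a sentinel)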

4.4 Idiomatic RAG Usage

embedded = par_try_map_iter(
    embed_chunk,
    chunks_with_path,
    stage="embed",
    key_path=lambda cp: cp[1],
)

split_results_to_sinks(
    tap_err(embedded, log_err_info),
    on_ok=index_chunk,
    on_err=error_warehouse,
)
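
If the sinks themselves can fail (a flaky index writer, say), swap in the guarded variant in place of the call above; note that it yields its own lazy Result stream, which must be drained (a sketch using the same names as above):

sink_outcomes = split_results_to_sinks_guarded(
    tap_err(embedded, log_err_info),
    on_ok=index_chunk,
    on_err=error_warehouse,
    stage="index",
)

# Drain the stream; keep only the sink-side failures for reporting.
sink_failures = list(filter_err(sink_outcomes))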

5. Property-Based Proofs (tests/test_result_stream.py)
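
The listing assumes the usual test-module header; a plausible version (import paths follow the refactor note in section 3, and are an assumption, not verbatim from the source):

from itertools import islice

from hypothesis import given, strategies as st

from funcpipe_rag.api.core import (
    filter_err,
    filter_ok,
    par_try_map_iter,
    split_results_to_sinks,
    try_map_iter,
)
from funcpipe_rag.result import Err, ErrInfo, Ok, Result, make_errinfo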

@given(items=st.lists(st.integers()))
def test_continuation_full_output(items):
    def f(x: int) -> int:
        if x == -1:
            raise ValueError("boom")
        return x
    results = list(try_map_iter(f, items, stage="test"))
    assert len(results) == len(items)

@given(items=st.lists(st.integers()))
def test_ordering_preservation(items):
    tagged = list(enumerate(items))
    def f(iv):
        i, v = iv
        if v % 2 == 0:
            raise ValueError("even")
        return iv
    results = list(try_map_iter(f, tagged, stage="test", key_path=lambda iv: (iv[0],)))
    # Reconstruct each output's original index, in output order; ordering is
    # preserved iff this sequence equals the input order exactly.
    out_indices = [r.value[0] if isinstance(r, Ok) else r.error.path[0] for r in results]
    assert out_indices == list(range(len(items)))

@given(items=st.lists(st.integers()))
def test_lazy_separation_equivalence(items):
    def f(x: int) -> int:
        if x % 2 == 0:
            raise ValueError("even")
        return x
    stream = try_map_iter(f, items, stage="test")
    oks = list(filter_ok(stream))
    stream2 = try_map_iter(f, items, stage="test")
    errs = list(filter_err(stream2))
    assert len(oks) + len(errs) == len(items)

@given(items=st.lists(st.integers()))
def test_single_pass_split(items):
    seen = 0

    def on_ok(_):
        nonlocal seen
        seen += 1

    def on_err(_):
        nonlocal seen
        seen += 1

    def f(x: int) -> int:
        if x % 2 == 0:
            raise ValueError("even")
        return x

    split_results_to_sinks(try_map_iter(f, items, stage="test"), on_ok, on_err)
    assert seen == len(items)

@given(items=st.lists(st.integers()))
def test_bounded_work(items):
    seen = 0
    def f(x):
        nonlocal seen
        seen += 1
        if x == 0:
            raise ValueError("zero")
        return x
    stream = try_map_iter(f, items, stage="test")
    list(islice(stream, 25))
    assert seen == min(25, len(items))

@given(items=st.lists(st.integers()))
def test_try_map_iter_matches_try_except(items):
    def f(x: int) -> int:
        if x == 0:
            raise ValueError("boom")
        return 100 // x

    # reference try/except
    ref: list[Result[int, ErrInfo]] = []
    for x in items:
        try:
            ref.append(Ok(100 // x))
        except Exception as exc:
            # match production arity: include `exc` as cause
            ref.append(Err(make_errinfo("PIPE/EXC", str(exc), "test", (), exc)))

    got = list(try_map_iter(f, items, stage="test"))

    # Compare shapes, Ok values, and error codes (messages, cause, and path may differ)
    assert [isinstance(r, Ok) for r in got] == [isinstance(r, Ok) for r in ref]
    assert [r.value for r in got if isinstance(r, Ok)] == \
           [r.value for r in ref if isinstance(r, Ok)]
    assert [r.error.code for r in got if isinstance(r, Err)] == \
           [r.error.code for r in ref if isinstance(r, Err)]

@given(items=st.lists(st.integers(), min_size=1, max_size=200))
def test_par_try_map_iter_matches_try_map_iter(items):
    def f(x: int) -> int:
        if x == 0:
            raise ValueError("boom")
        return 100 // x

    seq = list(try_map_iter(f, items, stage="test"))
    par = list(par_try_map_iter(f, items, stage="test", max_workers=4, max_in_flight=8))

    assert [isinstance(r, Ok) for r in par] == [isinstance(r, Ok) for r in seq]
    assert [r.value if isinstance(r, Ok) else r.error.msg for r in par] == \
           [r.value if isinstance(r, Ok) else r.error.msg for r in seq]

@given(items=st.lists(st.integers()))
def test_containment_no_leak(items):
    def f(x: int) -> int:
        raise ValueError("always fail")
    # This should never raise
    list(try_map_iter(f, items, stage="test"))
    list(par_try_map_iter(f, items, stage="test"))

6. Big-O & Allocation Guarantees

| Variant | Time per item | Heap per item | Laziness |
| --- | --- | --- | --- |
| try_map_iter | O(1) | O(1) | Yes |
| par_try_map_iter | O(1) amortised | O(max_in_flight) total | Yes |
| filter_ok / filter_err / tap_* | O(1) | O(1) | Yes |
| recover_iter / recover_result_iter | O(1) | O(1) | Yes |
| split_results_to_sinks | O(1) | O(1) | Yes |
| partition_results | O(1) | O(N) total | No |

All streaming operations are truly lazy.
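
A short check that laziness really holds (a sketch; the huge generator is never exhausted):

from itertools import islice

# A practically infinite, partly-bad input stream.
raw = (str(i) if i % 7 else "bad" for i in range(10**12))

# Only the first 5 records are ever pulled and parsed; nothing else is touched.
first_five = list(islice(try_map_iter(int, raw, stage="parse"), 5))
assert len(first_five) == 5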


7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
| --- | --- | --- |
| Halting on first exception | Lost data after first failure | Use try_map_iter / par_try_map_iter |
| Materialising early for separation | Memory blowup | Use split_results_to_sinks (one-pass) |
| Silent drop of bad records | Incomplete results | Use filter_err or tap_err to capture |
| Sink exceptions crashing pipeline | Partial processing | Use split_results_to_sinks_guarded |

8. Pre-Core Quiz

  1. try_map_iter for…? → Exception-safe lazy mapping
  2. par_try_map_iter for…? → Order-preserving parallel mapping
  3. filter_ok for…? → Stream only successes
  4. split_results_to_sinks for…? → One-pass routing to two sinks
  5. recover_iter for…? → Recover Err to values lazily

9. Post-Core Exercise

  1. Replace a try/except loop with try_map_iter → verify full output on mixed data.
  2. Add tap_err for logging → test laziness with islice.
  3. Use par_try_map_iter for embedding → measure speedup on real dataset.
  4. Implement split_results_to_sinks_guarded for safe indexing + error warehouse.
  5. Add ordering test to your own pipeline using tagged inputs.

Next: M04C06 – Validation Aggregators & Monoidal Error Collection.

You now have the complete toolkit to process real-world messy data without ever losing a single record or halting on one bad apple. The rest of Module 4 is about aggregating and reporting those errors beautifully.