
Circuit Breakers

M04C07: Short-Circuiting and Circuit-Breaker Behaviour via Pure Result Types

Progression Note

By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.

Here's a snippet from the progression map:

| Module | Focus | Key Outcomes |
|---|---|---|
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |

Core question:
How do you implement short-circuiting and circuit-breaker patterns in streaming pipelines using pure Result types, ensuring early termination on thresholds or failures while maintaining purity, composability, and resource safety?

We now take the Iterator[Result[Chunk, ErrInfo]] stream from M04C05–C06 and face the final real-world question:

“Processing 100 000 chunks is expensive. If the error rate climbs above 20 % after the first 500 chunks, the whole run is doomed — why waste hours finishing it?”

The naïve solution is a manual flag inside a loop:

import logging

logger = logging.getLogger(__name__)

n_err = 0
seen = 0
for r in embedded:
    seen += 1
    if isinstance(r, Err):
        n_err += 1
    if seen >= 500:
        error_rate = n_err / seen
        if error_rate > 0.2:
            logger.critical("Aborting run – error rate too high")
            break
    process(r)

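This works once, but the same bookkeeping ends up copied into every pipeline, small slips (an uninitialised counter, an off-by-one around min_samples) are easy to make, and the loop breaks when you later add parallelism or recovery.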

The production solution uses pure, composable circuit breakers: lazy iterator transducers over Result streams that stop early, satisfy laws you can check with property-based tests, and close their upstream automatically.

Audience: Engineers who run long-running batch pipelines and cannot afford to process doomed data for hours.

Outcome:
1. You will short-circuit on the first error, an error count, an error rate, or an arbitrary predicate, doing only O(k) work when a breaker trips at position k.
2. You will choose between observable breakers (emit BreakInfo) and silent truncate breakers.
3. You will ship a RAG pipeline that aborts gracefully the moment it becomes hopeless, with full provenance on why.

We formalise exactly what we want from correct, production-ready breakers: short-circuiting, ordering, bounded work, resource cleanup, and equivalence to reference implementations.


Concrete Motivating Example

Same 100 000 chunk tree from previous cores:

  • Normal success rate ≈ 99 %.
  • One malformed section causes 5 000 consecutive failures → error rate spikes to 30 % after 10 000 chunks processed.

Desired behaviour:

embedded = circuit_breaker_rate_emit(
    embedded,
    max_rate=0.2,
    min_samples=500,
)

for r in embedded:
    if isinstance(r, Err) and isinstance(r.error, BreakInfo):
        report_circuit_break(r.error)
        break
    if isinstance(r, Ok):
        index_chunk(r.value)
    else:
        log_err_info(r.error)

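Total work: the breaker trips on the first chunk (at or after min_samples = 500) where the error rate exceeds 20 %. Because the rate is already 30 % by the 10 000-chunk mark, that happens before chunk 10 000, so at most about 10 000 of the 100 000 chunk attempts are made.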
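The exact trip point depends on where the malformed section starts, which the scenario does not pin down. A small simulation makes the arithmetic concrete. It uses plain booleans instead of Result values and an invented stream shape (every 100th chunk failing, plus chunks 7 000 to 11 999 failing outright); `rate_trip_point` is a hypothetical helper that applies the same condition as circuit_breaker_rate_emit.

```python
from __future__ import annotations

from collections.abc import Iterable


def rate_trip_point(is_err: Iterable[bool], *, max_rate: float, min_samples: int) -> int | None:
    """Return the 1-based item count at which the rate rule first trips, or None.

    Uses the same condition as circuit_breaker_rate_emit:
    total >= min_samples and n_err / total > max_rate.
    """
    n_err = 0
    for total, failed in enumerate(is_err, start=1):
        n_err += failed  # True counts as 1, False as 0
        if total >= min_samples and n_err / total > max_rate:
            return total
    return None


# Hypothetical stream of 100 000 chunks: every 100th chunk fails (about 1 %),
# and one malformed section makes the 5 000 chunks 7 000..11 999 fail outright.
flags = [i % 100 == 0 or 7_000 <= i < 12_000 for i in range(100_000)]

print(rate_trip_point(flags, max_rate=0.2, min_samples=500))  # 8663
```

With these assumed numbers the error rate is about 31 % at the 10 000-chunk mark, roughly matching the scenario, and the rule trips after 8 663 attempts.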


1. Laws & Invariants (machine-checked)

| Law | Formal Statement | Enforcement |
|---|---|---|
| Short-Circuit | Emit/truncate breakers stop at the trigger, pulling exactly k upstream items, where k is the 1-based position of the triggering item (O(k) work). | test_emit_breakers_short_circuit, test_truncate_breakers_stop_silently |
| Ordering | Items (including the terminal BreakInfo) appear in original stream order. | test_breaker_ordering |
| Resource Cleanup | Upstream iterators that expose close() (e.g. generators) are closed on early termination. | test_upstream_closed_on_break |
| Purity | Given a pure predicate, breakers are deterministic, with no side effects beyond iterating and closing upstream. | Reproducibility tests |
| Equivalence | For finite streams, breaker output equals the full stream truncated at the first trigger position, followed by a terminal BreakInfo for emit variants and by nothing for truncate variants. | test_breaker_equivalence_to_full_scan |

These laws guarantee breakers are safe, predictable, and resource-correct.


2. Decision Table – Which Breaker Do You Actually Use?

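| Trigger | Need Observable Break? | Need Terminal Value? | Recommended Variant |
|---|---|---|---|
| First error | Yes | Yes | short_circuit_on_err_emit |
| First error (silent) | No | No | short_circuit_on_err_truncate |
| Error rate threshold | Yes | Yes | circuit_breaker_rate_emit |
| Error rate threshold (silent) | No | No | circuit_breaker_rate_truncate |
| Error count threshold | Yes | Yes | circuit_breaker_count_emit |
| Error count threshold (silent) | No | No | circuit_breaker_count_truncate |
| Custom predicate | Yes | Yes | circuit_breaker_pred_emit |
| Custom predicate (silent) | No | No | circuit_breaker_pred_truncate |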

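Prefer the emit variants: the terminal BreakInfo records why and where the stream stopped, and because it travels as an ordinary Err value, the output is still a Result stream that composes with every downstream stage.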


3. Public API Surface (end-of-Module-04 refactor note)

Refactor note: breakers live in funcpipe_rag.policies.breakers (src/funcpipe_rag/policies/breakers.py) and are re-exported from funcpipe_rag.api.core.

All snippets assume the Result / Ok / Err ADT from earlier Module 4 cores is already imported.

from funcpipe_rag.api.core import (
    BreakInfo,
    circuit_breaker_count_emit,
    circuit_breaker_count_truncate,
    circuit_breaker_pred_emit,
    circuit_breaker_pred_truncate,
    circuit_breaker_rate_emit,
    circuit_breaker_rate_truncate,
    short_circuit_on_err_emit,
    short_circuit_on_err_truncate,
)
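The package defines the real Result ADT and BreakInfo. To run the snippets in this core outside funcpipe_rag, a minimal stand-in along these lines should be enough. The BreakInfo field names are taken from the constructor calls in the reference implementations below; everything else (frozen dataclasses, `Generic`, the `Result` alias) is an assumption, not the package's actual definition.

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass
from types import MappingProxyType
from typing import Generic, Optional, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E


Result = Union[Ok[T], Err[E]]


@dataclass(frozen=True)
class BreakInfo(Generic[E]):
    """Why a breaker stopped (hypothetical stand-in mirroring the fields used below)."""

    code: str  # machine-readable tag, e.g. "BREAK/ERR_RATE"
    reason: str  # human-readable explanation
    last_error: Optional[E]  # payload of the last Err seen, or None if there was none
    n_ok: int
    n_err: int
    total: int
    threshold: Mapping[str, object]  # read-only view of the parameters that tripped


bi = BreakInfo(
    code="BREAK/ERR_COUNT",
    reason="errors 2 > 1",
    last_error="E",
    n_ok=0,
    n_err=2,
    total=2,
    threshold=MappingProxyType({"max_errs": 1}),
)
terminal: Result[int, BreakInfo[str]] = Err(bi)
```

Frozen dataclasses give value equality, which the property tests in section 5 rely on when they compare lists of results.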

4. Reference Implementations

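All breakers are pure generators and guarantee upstream cleanup via try/finally: when a breaker trips, or is closed early by its consumer, it calls close() on the upstream iterator if one exists.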

4.1 Emit Breakers (Observable Termination)

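from collections.abc import Callable, Iterable, Iterator
from types import MappingProxyType

# Result / Ok / Err, BreakInfo and the type variables T, E come from earlier cores.


def short_circuit_on_err_emit(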
    xs: Iterable[Result[T, E]],
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield items until first Err (which is yielded), then emit terminal BreakInfo."""
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
                bi = BreakInfo(
                    code="BREAK/FIRST_ERR",
                    reason="first error encountered",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=n_ok + n_err,
                    threshold=MappingProxyType({}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


def circuit_breaker_rate_emit(
    xs: Iterable[Result[T, E]],
    *,
    max_rate: float,
    min_samples: int = 100,
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until error rate > max_rate after min_samples, then emit terminal BreakInfo."""
    # Generator function: these checks run on the first next(), not at call time.
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0, 1)")
    if min_samples < 1:
        raise ValueError("min_samples must be >= 1")
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
            total = n_ok + n_err
            if total >= min_samples and n_err / total > max_rate:
                bi = BreakInfo(
                    code="BREAK/ERR_RATE",
                    reason=f"error rate {n_err/total:.3f} > {max_rate}",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=total,
                    threshold=MappingProxyType({"max_rate": max_rate, "min_samples": min_samples}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


def circuit_breaker_count_emit(
    xs: Iterable[Result[T, E]],
    *,
    max_errs: int,
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until error count > max_errs, then emit terminal BreakInfo."""
    if max_errs < 0:
        raise ValueError("max_errs must be >= 0")
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                n_err += 1
                last_err = r.error
                if n_err > max_errs:
                    bi = BreakInfo(
                        code="BREAK/ERR_COUNT",
                        reason=f"errors {n_err} > {max_errs}",
                        last_error=last_err,
                        n_ok=n_ok,
                        n_err=n_err,
                        total=n_ok + n_err,
                        threshold=MappingProxyType({"max_errs": max_errs}),
                    )
                    yield Err(bi)
                    return
            else:
                n_ok += 1
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


def circuit_breaker_pred_emit(
    xs: Iterable[Result[T, E]],
    pred: Callable[[Result[T, E]], bool],
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until pred(r) is True, then emit terminal BreakInfo."""
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
            if pred(r):
                bi = BreakInfo(
                    code="BREAK/PRED",
                    reason="predicate triggered",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=n_ok + n_err,
                    threshold=MappingProxyType({}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()
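The four emit breakers differ only in the trip condition and the BreakInfo they build. One possible refactoring, sketched here as an illustration and not as how funcpipe_rag is actually organised, factors the shared loop and cleanup into a hypothetical `emit_breaker` helper and expresses each breaker as a small rule over the running counters. A plain dict stands in for BreakInfo, and local Ok/Err stand-ins keep the block runnable on its own.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


# A rule sees (n_ok, n_err, latest item) and returns a break reason, or None to continue.
Rule = Callable[[int, int, Any], Optional[str]]


def emit_breaker(xs: Iterable[Any], rule: Rule) -> Iterator[Any]:
    """Shared loop: yield each item, update counters, stop when rule() fires."""
    it = iter(xs)
    n_ok = n_err = 0
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
            reason = rule(n_ok, n_err, r)
            if reason is not None:
                # A plain dict stands in for BreakInfo to keep the sketch short.
                yield Err({"reason": reason, "n_ok": n_ok, "n_err": n_err})
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


def rate_rule(max_rate: float, min_samples: int) -> Rule:
    def rule(n_ok: int, n_err: int, r: Any) -> Optional[str]:
        total = n_ok + n_err
        if total >= min_samples and n_err / total > max_rate:
            return f"error rate {n_err / total:.3f} > {max_rate}"
        return None
    return rule


def count_rule(max_errs: int) -> Rule:
    def rule(n_ok: int, n_err: int, r: Any) -> Optional[str]:
        return f"errors {n_err} > {max_errs}" if n_err > max_errs else None
    return rule


xs = [Ok(1), Err("a"), Err("b"), Ok(2)]
print(list(emit_breaker(xs, count_rule(max_errs=1))))
```

Here the count rule trips on the second Err, so Ok(2) is never pulled. In the same style, the first-error breaker is a rule that fires on any Err, and the predicate breaker is a rule that ignores the counters and calls pred(r).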

4.2 Truncate Breakers (Silent Termination)

def short_circuit_on_err_truncate(xs: Iterable[Result[T, E]]) -> Iterator[Result[T, E]]:
    """Yield until first Err, then stop silently. No terminal value."""
    it = iter(xs)
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()

(The other truncate variants follow the same pattern: they return at the trigger point instead of yielding a terminal BreakInfo.)
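Following that pattern, a truncate version of the rate breaker might look like the sketch below. It is rebuilt from circuit_breaker_rate_emit above, with local Ok/Err stand-ins so it runs on its own; the real circuit_breaker_rate_truncate in funcpipe_rag may differ in details.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


def circuit_breaker_rate_truncate(
    xs: Iterable[Any], *, max_rate: float, min_samples: int = 100
) -> Iterator[Any]:
    """Yield until error rate > max_rate after min_samples, then stop silently."""
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0, 1)")
    if min_samples < 1:
        raise ValueError("min_samples must be >= 1")
    it = iter(xs)
    n_ok = n_err = 0
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
            total = n_ok + n_err
            if total >= min_samples and n_err / total > max_rate:
                return  # same trigger as the emit variant, but no BreakInfo
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


xs = [Ok(1), Err("a"), Err("b"), Ok(2)]
print(list(circuit_breaker_rate_truncate(xs, max_rate=0.5, min_samples=2)))
```

With max_rate=0.5 the second item gives exactly 50 %, which does not exceed the threshold; the third item pushes the rate to 2/3, so the stream ends after Err("b").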

4.3 Idiomatic RAG Usage

embedded = circuit_breaker_rate_emit(
    embedded,
    max_rate=0.2,
    min_samples=500,
)

for r in embedded:
    if isinstance(r, Err) and isinstance(r.error, BreakInfo):
        report_circuit_break(r.error)
        break
    if isinstance(r, Ok):
        index_chunk(r.value)
    else:
        log_err_info(r.error)
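One subtlety in the loop above: `break` leaves the breaker suspended at its last yield, so its finally block (and with it the upstream close()) runs only when the breaker generator is closed or garbage-collected. CPython usually does that as soon as the last reference disappears, but if the stream object lives on, contextlib.closing makes cleanup happen at a predictable point. The sketch below uses a simplified stand-in breaker and a hypothetical recording source to show the timing.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from contextlib import closing
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


events: list[str] = []


def source() -> Iterator[Any]:
    """Upstream stand-in that records when its cleanup runs."""
    try:
        yield Ok(1)
        yield Err("boom")
        yield Ok(3)
    finally:
        events.append("source closed")


def stop_after_first_err(xs: Iterable[Any]) -> Iterator[Any]:
    """Simplified breaker: closes upstream in finally, like the reference ones."""
    it = iter(xs)
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                return
    finally:
        close = getattr(it, "close", None)
        if callable(close):
            close()


with closing(stop_after_first_err(source())) as stream:
    for r in stream:
        if isinstance(r, Err):
            break  # the breaker is now suspended at its `yield r`
    inside = list(events)  # snapshot: no cleanup has run yet

# Leaving the with block called stream.close(): GeneratorExit ran the breaker's
# finally, which closed the source and ran its finally too.
print(inside, events)
```

In the RAG loop, the equivalent is to wrap the breaker in `with closing(circuit_breaker_rate_emit(...)) as embedded:` and keep the for loop inside the with block.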

5. Property-Based Proofs (tests/test_breakers.py)

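from hypothesis import given, strategies as st
# Result / Ok / Err, map_result_iter and the breakers come from earlier cores / funcpipe_rag.api.core.

@given(items=st.lists(st.integers()).filter(lambda v: 0 in v))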
def test_emit_breakers_short_circuit(items):
    first_err_pos = items.index(0)
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")
    results = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    assert len(results) == first_err_pos + 2  # Oks before the error + the Err + terminal BreakInfo
    assert isinstance(results[-1], Err) and isinstance(results[-1].error, BreakInfo)
    bi = results[-1].error
    assert bi.n_ok == first_err_pos
    assert bi.n_err == 1
    assert bi.total == bi.n_ok + bi.n_err
    assert bi.last_error == "ZERO"

@given(items=st.lists(st.integers()))
def test_truncate_breakers_stop_silently(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")
    results = list(short_circuit_on_err_truncate(map_result_iter(f, items)))
    if 0 in items:
        assert len(results) == items.index(0) + 1
    else:
        assert len(results) == len(items)

@given(items=st.lists(st.integers()))
def test_upstream_closed_on_break(items):
    closed = False
    sentinel_seen = False
    def src():
        nonlocal closed, sentinel_seen
        try:
            for x in items:
                yield Ok(x) if x != 0 else Err("ZERO")
                if x == 0:
                    yield Ok("should not be reached")
                    sentinel_seen = True
        finally:
            closed = True
    gen = src()  # keep a reference so garbage collection cannot close it for us
    list(short_circuit_on_err_truncate(gen))
    assert not sentinel_seen
    assert closed

@given(items=st.lists(st.integers()))
def test_breaker_ordering(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")
    full = list(map_result_iter(f, items))
    broken = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    # Strip terminal BreakInfo if present
    if broken and isinstance(broken[-1], Err) and isinstance(broken[-1].error, BreakInfo):
        prefix = broken[:-1]
    else:
        prefix = broken
    assert prefix == full[:len(prefix)]

@given(items=st.lists(st.integers()))
def test_breaker_equivalence_to_full_scan(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")
    full = list(map_result_iter(f, items))
    broken = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    if 0 in items:
        # Reference: the full scan truncated just after the first Err, then a terminal BreakInfo
        k = items.index(0) + 1
        assert broken[:-1] == full[:k]
        assert isinstance(broken[-1], Err) and isinstance(broken[-1].error, BreakInfo)
    else:
        # No trigger: the breaker is the identity on the stream
        assert broken == full

def test_count_breaker_off_by_one():
    xs: list[Result[int, str]] = [Err("E"), Err("E"), Err("E")]
    results = list(circuit_breaker_count_emit(xs, max_errs=1))
    # Both Errs are yielded (the second one trips the breaker), then the terminal
    # BreakInfo; the third Err is never pulled.
    assert len(results) == 3
    assert results[:2] == [Err("E"), Err("E")]
    assert isinstance(results[2], Err) and isinstance(results[2].error, BreakInfo)
    bi = results[2].error
    assert bi.n_err == 2
    assert bi.threshold["max_errs"] == 1
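The strip-the-terminal-BreakInfo step appears verbatim in two of the tests above. A small helper would remove the duplication and also hand back the BreakInfo for further assertions. `split_break` is hypothetical, not part of the documented API, and the two-field BreakInfo here is a stand-in to keep the block short.

```python
from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


@dataclass(frozen=True)
class BreakInfo:
    code: str
    reason: str


def split_break(results: Sequence[Any]) -> tuple[Sequence[Any], BreakInfo | None]:
    """Split an emit-breaker output into (items before the break, BreakInfo or None)."""
    if results and isinstance(results[-1], Err) and isinstance(results[-1].error, BreakInfo):
        return results[:-1], results[-1].error
    return results, None


prefix, bi = split_break(
    [Ok(1), Err("E"), Err(BreakInfo("BREAK/FIRST_ERR", "first error encountered"))]
)
print(len(prefix), bi.code)  # 2 BREAK/FIRST_ERR
```

With such a helper, both tests reduce to `prefix, _ = split_break(broken)` followed by their assertions.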

6. Big-O & Allocation Guarantees

| Variant | Time | Auxiliary Heap | Lazy? |
|---|---|---|---|
| *_emit breakers | O(k) if triggered at position k, else O(N) | O(1) | Yes |
| *_truncate breakers | O(k) if triggered at position k, else O(N) | O(1) | Yes |

All breakers are truly lazy generators with O(1) auxiliary memory.
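Because nothing is buffered, a breaker can sit on an infinite upstream and still terminate, pulling exactly k items. The sketch below checks this with a hypothetical `PullCounter` wrapper and a simplified stand-in for short_circuit_on_err_truncate (local Ok/Err included so it runs alone); with funcpipe_rag installed you could wrap the real breaker's input the same way.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from itertools import count
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


class PullCounter:
    """Iterator wrapper that counts how many items were pulled from upstream."""

    def __init__(self, xs: Iterable[Any]) -> None:
        self._it = iter(xs)
        self.pulled = 0

    def __iter__(self) -> PullCounter:
        return self

    def __next__(self) -> Any:
        x = next(self._it)  # StopIteration propagates before we count
        self.pulled += 1
        return x


def stop_after_first_err(xs: Iterable[Any]) -> Iterator[Any]:
    """Simplified stand-in for short_circuit_on_err_truncate (cleanup omitted)."""
    for r in xs:
        yield r
        if isinstance(r, Err):
            return


def infinite_results() -> Iterator[Any]:
    """Endless upstream: Ok(0), Ok(1), ... with the first failure at index 4."""
    for i in count():
        yield Err(f"bad chunk {i}") if i == 4 else Ok(i)


src = PullCounter(infinite_results())
out = list(stop_after_first_err(src))
print(len(out), src.pulled)  # 5 5
```

The trigger sits at position k = 5, and the breaker pulled exactly five items from a source that would otherwise never end.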


7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Manual flag-based early exit | Duplicated buggy code | Use *_emit or *_truncate breakers |
| Continuing after fatal error rate | Wasted hours of compute | Use circuit_breaker_rate_* |
| Resource leaks on early break | Open files/connections | All breakers close upstream on termination |

8. Pre-Core Quiz

  1. Emit breaker for…? → Observable termination with BreakInfo
  2. Truncate breaker for…? → Silent early stop
  3. Rate breaker for…? → Abort on error rate threshold
  4. Resource cleanup on break…? → Automatic via try/finally
  5. Never do manually what…? → A breaker can do safely

9. Post-Core Exercise

  1. Add circuit_breaker_rate_emit to embedding stream → test early abort on injected failures.
  2. Use short_circuit_on_err_truncate for quick debug runs → verify no resource leaks.
  3. Implement custom predicate breaker for "stop on first OOM" → test.
  4. Combine with par_try_map_iter → verify parallel work stops on trigger.

Next: M04C08 – Resource-Aware Streaming – Ensuring Generators Close and Clean Up Correctly.

You now have the complete toolkit to abort doomed runs instantly while keeping every line of code pure, composable, and resource-safe. The rest of Module 4 is about retries and final reporting.