M04C07: Short-Circuiting and Circuit-Breaker Behaviour via Pure Result Types
Progression Note
By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |
Core question:
How do you implement short-circuiting and circuit-breaker patterns in streaming pipelines using pure Result types, ensuring early termination on thresholds or failures while maintaining purity, composability, and resource safety?
We now take the Iterator[Result[Chunk, ErrInfo]] stream from M04C05–C06 and face the final real-world question:
“Processing 100 000 chunks is expensive. If the error rate climbs above 20 % after the first 500 chunks, the whole run is doomed — why waste hours finishing it?”
The naïve solution is a manual flag inside a loop:
error_rate = 0.0
n_err = 0
seen = 0
for r in embedded:
    seen += 1
    if isinstance(r, Err):
        n_err += 1
    if seen >= 500:
        error_rate = n_err / seen
        if error_rate > 0.2:
            logger.critical("Aborting run – error rate too high")
            break
    process(r)
This works once — but it’s duplicated everywhere, easy to get wrong, and breaks when you later add parallelism or recovery.
The production solution uses pure, composable circuit-breakers — lazy iterator transducers over Result streams that give you early termination with mathematical guarantees and automatic resource cleanup.
Audience: Engineers who run long-running batch pipelines and cannot afford to process doomed data for hours.
Outcome:
1. You will short-circuit on first error, error count, error rate, or an arbitrary predicate, all with O(k) work, where k is the position of the trigger in the stream.
2. You will choose between observable breakers (emit BreakInfo) and silent truncate breakers.
3. You will ship a RAG pipeline that aborts gracefully the moment it becomes hopeless, with full provenance on why.
We formalise exactly what we want from correct, production-ready breakers: short-circuiting, ordering, bounded work, resource cleanup, and equivalence to reference implementations.
Concrete Motivating Example
Same 100 000 chunk tree from previous cores:
- Normal success rate ≈ 99 %.
- One malformed section causes 5 000 consecutive failures → error rate spikes to 30 % after 10 000 chunks processed.
Desired behaviour:
embedded = circuit_breaker_rate_emit(
    embedded,
    max_rate=0.2,
    min_samples=500,
)
for r in embedded:
    if isinstance(r, Err) and isinstance(r.error, BreakInfo):
        report_circuit_break(r.error)
        break
    if isinstance(r, Ok):
        index_chunk(r.value)
    else:
        log_err_info(r.error)
Total work: ~10 500 chunk attempts instead of 100 000; the breaker stops as soon as the cumulative error rate exceeds the threshold.
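Where the breaker trips depends on where the failure run begins, which the scenario leaves open. As a rough sanity check, the sketch below models a stream of `n_clean` items at a background error rate, followed by an unbroken run of failures, and counts items until the cumulative rate exceeds the threshold. `trip_position` and the start position of 8 500 are illustrative assumptions, not part of funcpipe_rag or of the scenario's data.

```python
def trip_position(
    n_clean: int,
    clean_err_rate: float,
    max_rate: float,
    min_samples: int,
) -> int:
    """Total items consumed when a cumulative-rate breaker trips.

    Hypothetical model: n_clean items at a background error rate,
    then consecutive failures until the breaker condition holds.
    """
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0,1)")
    n_err = round(n_clean * clean_err_rate)  # background errors so far
    total = n_clean
    # Same trigger condition as the rate breaker: enough samples AND rate above threshold.
    while not (total >= min_samples and n_err / total > max_rate):
        total += 1  # one more item arrives...
        n_err += 1  # ...and it is a failure
    return total


# Assumed start of the malformed section: chunk 8 500, with 1 % background errors.
print(trip_position(n_clean=8_500, clean_err_rate=0.01, max_rate=0.2, min_samples=500))  # 10519
```

Under these assumptions the breaker needs about 2 000 consecutive failures to drag a 1 % cumulative rate above 20 %, so a later start position means a later trip.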
1. Laws & Invariants (machine-checked)
| Law | Formal Statement | Enforcement |
|---|---|---|
| Short-Circuit | Emit/truncate breakers stop after trigger, performing O(k) work where k ≤ position of trigger. | test_emit_breakers_short_circuit, test_truncate_breakers_stop_silently. |
| Ordering | Items (including terminal BreakInfo) appear in original stream order. | test_breaker_ordering. |
| Resource Cleanup | Upstream generators are closed via their close() method on early termination. | test_upstream_closed_on_break. |
| Purity | Breakers are pure functions — deterministic, no side effects beyond iteration. | Reproducibility tests. |
| Equivalence | For finite streams, breaker output equals the full stream truncated at the first trigger position, optionally followed by a terminal BreakInfo. | test_breaker_equivalence_to_full_scan. |
These laws guarantee breakers are safe, predictable, and resource-correct.
2. Decision Table – Which Breaker Do You Actually Use?
| Trigger | Need Observable Break? | Need Terminal Value? | Recommended Variant |
|---|---|---|---|
| First error | Yes | Yes | short_circuit_on_err_emit |
| First error (silent) | No | No | short_circuit_on_err_truncate |
| Error rate threshold | Yes | Yes | circuit_breaker_rate_emit |
| Error rate threshold (silent) | No | No | circuit_breaker_rate_truncate |
| Error count threshold | Yes | Yes | circuit_breaker_count_emit |
| Custom predicate | Yes | Yes | circuit_breaker_pred_emit |
Prefer emit variants — they give you a terminal BreakInfo for reporting without breaking composability.
3. Public API Surface (end-of-Module-04 refactor note)
Refactor note: breakers live in funcpipe_rag.policies.breakers (src/funcpipe_rag/policies/breakers.py) and are re-exported from funcpipe_rag.api.core.
All snippets assume the Result / Ok / Err ADT from earlier Module 4 cores is already imported.
from funcpipe_rag.api.core import (
    BreakInfo,
    circuit_breaker_count_emit,
    circuit_breaker_count_truncate,
    circuit_breaker_pred_emit,
    circuit_breaker_pred_truncate,
    circuit_breaker_rate_emit,
    circuit_breaker_rate_truncate,
    short_circuit_on_err_emit,
    short_circuit_on_err_truncate,
)
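The definition of `BreakInfo` is not shown in this core. Judging only from the keyword arguments used in the reference implementations of section 4, one plausible shape is a frozen, generic dataclass. The sketch below is an assumption for illustration; the real class in funcpipe_rag may differ in field types, defaults, or base class.

```python
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Generic, Mapping, Optional, TypeVar

E = TypeVar("E")


@dataclass(frozen=True)
class BreakInfo(Generic[E]):
    """Hypothetical terminal record describing why a breaker tripped."""

    code: str                  # e.g. "BREAK/ERR_RATE"
    reason: str                # human-readable explanation
    last_error: Optional[E]    # most recent upstream error, if any
    n_ok: int                  # Ok items seen before the trip
    n_err: int                 # Err items seen before the trip
    total: int                 # n_ok + n_err
    # Read-only mapping of the thresholds that were in force (assumed default: empty).
    threshold: Mapping[str, object] = field(
        default_factory=lambda: MappingProxyType({})
    )


bi = BreakInfo(
    code="BREAK/ERR_COUNT",
    reason="errors 2 > 1",
    last_error="E",
    n_ok=0,
    n_err=2,
    total=2,
    threshold=MappingProxyType({"max_errs": 1}),
)
```

Freezing the dataclass and wrapping the thresholds in `MappingProxyType` keeps the record immutable, which fits the purity law: a `BreakInfo` can be passed downstream without any stage being able to alter it.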
4. Reference Implementations
All breakers are pure generators and guarantee upstream cleanup via try/finally.
4.1 Emit Breakers (Observable Termination)
from collections.abc import Callable, Iterable, Iterator
from types import MappingProxyType
from typing import TypeVar

T = TypeVar("T")
E = TypeVar("E")


def short_circuit_on_err_emit(
    xs: Iterable[Result[T, E]],
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield items until first Err (which is yielded), then emit terminal BreakInfo."""
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
                bi = BreakInfo(
                    code="BREAK/FIRST_ERR",
                    reason="first error encountered",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=n_ok + n_err,
                    threshold=MappingProxyType({}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        # On early exit, close the upstream iterator if it supports close().
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()

def circuit_breaker_rate_emit(
    xs: Iterable[Result[T, E]],
    *,
    max_rate: float,
    min_samples: int = 100,
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until error rate > max_rate after min_samples, then emit terminal BreakInfo."""
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0,1)")
    if min_samples < 1:
        raise ValueError("min_samples >= 1")
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
            total = n_ok + n_err
            if total >= min_samples and n_err / total > max_rate:
                bi = BreakInfo(
                    code="BREAK/ERR_RATE",
                    reason=f"error rate {n_err/total:.3f} > {max_rate}",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=total,
                    threshold=MappingProxyType({"max_rate": max_rate, "min_samples": min_samples}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()

def circuit_breaker_count_emit(
    xs: Iterable[Result[T, E]],
    *,
    max_errs: int,
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until error count > max_errs, then emit terminal BreakInfo."""
    if max_errs < 0:
        raise ValueError("max_errs >= 0")
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                n_err += 1
                last_err = r.error
                if n_err > max_errs:
                    bi = BreakInfo(
                        code="BREAK/ERR_COUNT",
                        reason=f"errors {n_err} > {max_errs}",
                        last_error=last_err,
                        n_ok=n_ok,
                        n_err=n_err,
                        total=n_ok + n_err,
                        threshold=MappingProxyType({"max_errs": max_errs}),
                    )
                    yield Err(bi)
                    return
            else:
                n_ok += 1
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()

def circuit_breaker_pred_emit(
    xs: Iterable[Result[T, E]],
    pred: Callable[[Result[T, E]], bool],
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until pred(r) is True, then emit terminal BreakInfo."""
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
            if pred(r):
                bi = BreakInfo(
                    code="BREAK/PRED",
                    reason="predicate triggered",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=n_ok + n_err,
                    threshold=MappingProxyType({}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()
4.2 Truncate Breakers (Silent Termination)
def short_circuit_on_err_truncate(xs: Iterable[Result[T, E]]) -> Iterator[Result[T, E]]:
    """Yield until first Err, then stop silently. No terminal value."""
    it = iter(xs)
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()
(The other truncate variants follow the same pattern — return instead of yielding BreakInfo.)
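As an illustration of that pattern, here is a minimal sketch of what `circuit_breaker_rate_truncate` might look like, derived by taking the rate trigger from `circuit_breaker_rate_emit` and replacing the terminal `BreakInfo` with a plain `return`. The `Ok`/`Err` classes are simplified stand-ins so the block runs on its own; the shipped implementation may differ.

```python
from dataclasses import dataclass
from typing import Generic, Iterable, Iterator, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):  # stand-in for the module's Ok
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):  # stand-in for the module's Err
    error: E


Result = Union[Ok[T], Err[E]]


def circuit_breaker_rate_truncate(
    xs: Iterable[Result[T, E]],
    *,
    max_rate: float,
    min_samples: int = 100,
) -> Iterator[Result[T, E]]:
    """Yield until error rate > max_rate after min_samples, then stop silently."""
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0,1)")
    if min_samples < 1:
        raise ValueError("min_samples >= 1")
    it = iter(xs)
    n_ok = n_err = 0
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
            total = n_ok + n_err
            if total >= min_samples and n_err / total > max_rate:
                return  # silent stop: no BreakInfo is emitted
        exhausted = True
    finally:
        # Same cleanup contract as the emit variants.
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


stream = [Ok(1), Err("e"), Err("e"), Ok(2)]
kept = list(circuit_breaker_rate_truncate(stream, max_rate=0.5, min_samples=2))
# kept holds the first three items; the rate 2/3 exceeds 0.5, so Ok(2) is never pulled.
```

The comparison is strict, so a rate exactly equal to `max_rate` (1/2 after the second item here) does not trip the breaker.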
4.3 Idiomatic RAG Usage
embedded = circuit_breaker_rate_emit(
    embedded,
    max_rate=0.2,
    min_samples=500,
)
for r in embedded:
    if isinstance(r, Err) and isinstance(r.error, BreakInfo):
        report_circuit_break(r.error)
        break
    if isinstance(r, Ok):
        index_chunk(r.value)
    else:
        log_err_info(r.error)
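One caveat about the consumer side: a breaker's `finally` block runs when the breaker itself stops early or is closed. If the consumer abandons the loop for some other reason (for example, an exception raised while indexing), cleanup happens only once the breaker generator is closed or garbage-collected. A hedged sketch of making that deterministic with the standard library's `contextlib.closing` follows; `numbers` and `passthrough` are hypothetical stand-ins for an upstream source and a breaker with the same cleanup code.

```python
from contextlib import closing


def numbers(log):
    """Hypothetical upstream that records when it is closed."""
    try:
        for i in range(10):
            yield i
    finally:
        log.append("upstream closed")


def passthrough(xs):
    """Stand-in for a breaker: forwards items, closes upstream on early exit."""
    it = iter(xs)
    exhausted = False
    try:
        for x in it:
            yield x
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


log = []
# closing() calls stream.close() when the block exits, however it exits.
with closing(passthrough(numbers(log))) as stream:
    for x in stream:
        if x == 2:
            break  # consumer-side early exit, not a breaker trip

print(log)  # ['upstream closed']
```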
5. Property-Based Proofs (tests/test_breakers.py)
from hypothesis import given, strategies as st


@given(items=st.lists(st.integers()).filter(lambda v: 0 in v))
def test_emit_breakers_short_circuit(items):
    first_err_pos = items.index(0)

    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")

    results = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    assert len(results) == first_err_pos + 2  # items + Err + BreakInfo
    assert isinstance(results[-1], Err) and isinstance(results[-1].error, BreakInfo)
    bi = results[-1].error
    assert bi.n_ok == first_err_pos
    assert bi.n_err == 1
    assert bi.total == bi.n_ok + bi.n_err
    assert bi.last_error == "ZERO"

@given(items=st.lists(st.integers()))
def test_truncate_breakers_stop_silently(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")

    results = list(short_circuit_on_err_truncate(map_result_iter(f, items)))
    if 0 in items:
        assert len(results) == items.index(0) + 1
    else:
        assert len(results) == len(items)

@given(items=st.lists(st.integers()))
def test_upstream_closed_on_break(items):
    closed = False
    sentinel_seen = False

    def src():
        nonlocal closed, sentinel_seen
        try:
            for x in items:
                yield Ok(x) if x != 0 else Err("ZERO")
                if x == 0:
                    yield Ok("should not be reached")
                    sentinel_seen = True
        finally:
            closed = True

    results = list(short_circuit_on_err_truncate(src()))
    assert not sentinel_seen
    assert closed

@given(items=st.lists(st.integers()))
def test_breaker_ordering(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")

    full = list(map_result_iter(f, items))
    broken = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    # Strip terminal BreakInfo if present
    if broken and isinstance(broken[-1], Err) and isinstance(broken[-1].error, BreakInfo):
        prefix = broken[:-1]
    else:
        prefix = broken
    assert prefix == full[:len(prefix)]

@given(items=st.lists(st.integers()))
def test_breaker_equivalence_to_full_scan(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")

    full = list(map_result_iter(f, items))
    broken = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    if 0 in items:
        # Reference: full stream cut just after the first Err, plus terminal BreakInfo
        cut = items.index(0) + 1
        assert broken[:-1] == full[:cut]
        assert isinstance(broken[-1], Err) and isinstance(broken[-1].error, BreakInfo)
    else:
        # No trigger: the breaker leaves the stream unchanged
        assert broken == full

def test_count_breaker_off_by_one():
    xs: list[Result[int, str]] = [Err("E"), Err("E"), Err("E")]
    results = list(circuit_breaker_count_emit(xs, max_errs=1))
    # two Errs (the second exceeds max_errs=1) + terminal BreakInfo; the third Err is never pulled
    assert len(results) == 3
    assert isinstance(results[0], Err) and not isinstance(results[0].error, BreakInfo)
    assert isinstance(results[1], Err) and not isinstance(results[1].error, BreakInfo)
    assert isinstance(results[2], Err) and isinstance(results[2].error, BreakInfo)
    bi = results[2].error
    assert bi.n_err == 2
    assert bi.threshold["max_errs"] == 1
6. Big-O & Allocation Guarantees
| Variant | Time | Heap | Laziness |
|---|---|---|---|
| *_emit breakers | O(k) if triggered at position k, otherwise O(N) | O(1) | Yes |
| *_truncate breakers | O(k) if triggered at position k, otherwise O(N) | O(1) | Yes |
All breakers are truly lazy generators with O(1) auxiliary memory.
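The O(k) bound can be observed directly by counting how many items the breaker pulls from an unbounded source. In the sketch below, `endless_source` and the `stats` counter are hypothetical helpers, `Ok`/`Err` are minimal stand-ins for the module's ADT, and the breaker is a condensed copy of `short_circuit_on_err_truncate` from section 4.2.

```python
from dataclasses import dataclass
from itertools import count


@dataclass(frozen=True)
class Ok:  # stand-in for the module's Ok
    value: object


@dataclass(frozen=True)
class Err:  # stand-in for the module's Err
    error: object


def short_circuit_on_err_truncate(xs):
    """Condensed copy of the 4.2 breaker: stop silently after the first Err."""
    it = iter(xs)
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


def endless_source(fail_at, stats):
    """Infinite stream that fails once at index fail_at and counts every pull."""
    for i in count():
        stats["pulled"] += 1
        yield Err("boom") if i == fail_at else Ok(i)


stats = {"pulled": 0}
results = list(short_circuit_on_err_truncate(endless_source(fail_at=3, stats=stats)))
print(len(results), stats["pulled"])  # 4 4
```

An eager implementation would never return on this source; the lazy breaker pulls exactly the four items up to and including the trigger.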
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Manual flag-based early exit | Duplicated buggy code | Use *_emit or *_truncate breakers |
| Continuing after fatal error rate | Wasted hours of compute | Use circuit_breaker_rate_* |
| Resource leaks on early break | Open files/connections | All breakers close upstream on termination |
8. Pre-Core Quiz
- Emit breaker for…? → Observable termination with BreakInfo
- Truncate breaker for…? → Silent early stop
- Rate breaker for…? → Abort on error rate threshold
- Resource cleanup on break…? → Automatic via try/finally
- Never do manually what…? → A breaker can do safely
9. Post-Core Exercise
- Add circuit_breaker_rate_emit to the embedding stream → test early abort on injected failures.
- Use short_circuit_on_err_truncate for quick debug runs → verify no resource leaks.
- Implement a custom predicate breaker for "stop on first OOM" → test.
- Combine with par_try_map_iter → verify parallel work stops on trigger.
Next: M04C08 – Resource-Aware Streaming – Ensuring Generators Close and Clean Up Correctly.
You now have the complete toolkit to abort doomed runs instantly while keeping every line of code pure, composable, and resource-safe. The rest of Module 4 is about retries and final reporting.