M04C09: Functional Retries with Policies (Pure, Composable Retry Loops)¶
Progression Note¶
By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |
Core question:
How do you implement pure, bounded, fair retries over a Result stream using policies as ordinary data — guaranteeing termination, no side effects, and full composability with breakers and resource managers?
We now take the Iterator[Result[Chunk, ErrInfo]] stream from M04C05–C08 and face the final reliability question:
“My embedding calls are flaky — network timeouts, rate limits, transient GPU OOM. I want to retry each failing chunk a few times with exponential backoff, but I refuse to block the whole pipeline or leak resources on abort.”
The naïve solution is a manual retry loop:
for chunk in chunks:
    r = None
    for attempt in range(5):
        r = safe_embed(chunk)  # returns Result
        if isinstance(r, Ok):
            embedded.append(r.value)
            break
        time.sleep(2 ** attempt)  # blocks everything
    if isinstance(r, Err):
        embedded.append(fallback_chunk(chunk))
This blocks the entire stream on one slow chunk, leaks resources on early breaker termination, and is duplicated everywhere.
The production solution uses a pure, lazy retry combinator that treats retry policy as ordinary data and executes as a fair, bounded loop over the Result stream.
Audience: Engineers who call flaky external services (embedding APIs, vector DBs, OCR) inside RAG pipelines and need per-chunk resilience without sacrificing throughput or resource safety.
Outcome:
1. You will define retry policies as pure data and apply them with a single combinator that works on any Result stream.
2. You will get bounded, fair retries with full provenance on final errors.
3. You will ship a RAG pipeline that automatically retries transient failures while respecting breakers and resource cleanup.
We now formalise exactly what we want from correct, production-ready retries: bounded execution, purity, fairness, guaranteed completion, and error provenance — all composing seamlessly with the rest of the pipeline.
Concrete Motivating Example¶
Same 100 000-chunk tree from previous cores:
- 95 000 embed successfully on first try.
- 4 800 hit transient network timeout → succeed on retry 2–3.
- 200 are genuine failures (invalid content).
Desired behaviour:
embedded = retry_map_iter(
    safe_embed,                 # returns Result[Chunk, ErrInfo]
    chunks_with_path,
    classifier=is_retriable_errinfo,
    policy=exp_policy(total_attempts=5, base_ms=100, cap_ms=5000),
    stage="embed",
    inflight_cap=128,
)
# → Iterator[Result[Chunk, ErrInfo]]
# Retries happen fairly; total work ≈ 100k calls + ~10k retries
# Final Errs annotated with attempt count, next_delay_ms, etc.
1. Laws & Invariants (machine-checked)¶
| Law | Formal Statement | Enforcement |
|---|---|---|
| Bounded Execution | At most max_attempts calls per input item (engine cap overrides policy). | test_bounded_attempts, test_engine_cap_overrides_policy |
| Purity | Deterministic on (inputs, policy); no side effects, no sleeps, no mutation. | Reproducibility + no global state |
| Fairness | No item is starved; progress guaranteed within inflight_cap window (round-robin priming). | test_fairness_interleaving |
| Completion | All items eventually complete under bounded retries; completion order is implementation-defined (fair within window). | test_retry_completion |
| Provenance | Final Err annotated with attempt, max_attempts, policy, next_delay_ms when E supports it. | test_final_err_annotation |
These laws guarantee retries are safe, observable, and composable.
2. Decision Table – Which Policy Do You Actually Use?¶
| Failure Pattern | Need Backoff? | Need Jitter? | Recommended Policy |
|---|---|---|---|
| Simple transient (network blip) | No | No | fixed_policy(3) |
| Rate-limited API | Yes | Optional | exp_policy(7, base_ms=200, cap_ms=30000) |
| Very flaky service | Yes | Yes | Custom policy with jitter |
| Custom logic (e.g. retry only 5xx) | – | – | User-defined Policy |
Always combine with engine max_attempts cap and Core 7 breakers for global safety.
3. Public API Surface (end-of-Module-04 refactor note)¶
Refactor note: retries live in funcpipe_rag.policies.retries (src/funcpipe_rag/policies/retries.py) and are re-exported from funcpipe_rag.api.core.
from funcpipe_rag.api.core import (
    RetryCtx,
    RetryDecision,
    exp_policy,
    fixed_policy,
    is_retriable_errinfo,
    restore_input_order,
    retry_map_iter,
)
4. Reference Implementations¶
4.1 Core Retry Engine (fair, bounded, pure)¶
from collections import deque
from types import MappingProxyType

def _annotate_err(
    e: E,
    *,
    attempt: int,
    max_attempts: int,
    policy: str,
    next_delay_ms: int | None = None,
) -> E:
    """Annotate error if it supports _replace and ctx (ErrInfo does)."""
    if hasattr(e, "_replace") and hasattr(e, "ctx"):
        ctx = dict(e.ctx) if e.ctx else {}
        ctx.update({
            "attempt": attempt,
            "max_attempts": max_attempts,
            "policy": policy,
        })
        if next_delay_ms is not None:
            ctx["next_delay_ms"] = next_delay_ms
        e = e._replace(ctx=MappingProxyType(ctx))  # type: ignore
    return e
def retry_map_iter(
    fn: Callable[[X], Result[Y, E]],
    xs: Iterable[X],
    *,
    classifier: Classifier,
    policy: Policy,
    stage: str,
    key_path: Callable[[X], tuple[int, ...]] | None = None,
    max_attempts: int = 10,
    policy_name: str | None = None,
    inflight_cap: int = 64,
) -> Iterator[Result[Y, E]]:
    """Pure, fair, bounded retry over a Result-returning fn."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    if inflight_cap < 1:
        raise ValueError("inflight_cap must be >= 1")
    name = policy_name or getattr(policy, "__name__", "anonymous")
    it = iter(xs)
    work: deque[tuple[X, int]] = deque()  # (item, attempt)

    def prime() -> None:
        # Top up the in-flight window from the source iterator.
        while len(work) < inflight_cap:
            try:
                work.append((next(it), 1))
            except StopIteration:
                break

    prime()
    while work:
        x, attempt = work.popleft()
        r = fn(x)
        if isinstance(r, Ok):
            yield r
            prime()
            continue
        e = r.error
        if not classifier(e):
            # Permanent failure: annotate and emit immediately.
            yield Err(_annotate_err(e, attempt=attempt, max_attempts=max_attempts, policy=name))
            prime()
            continue
        p = key_path(x) if key_path is not None else ()
        ctx = RetryCtx(item=x, attempt=attempt, error=e, stage=stage, path=p, policy_name=name)
        try:
            dec = policy(ctx)
        except Exception:
            # A crashing policy must not crash the stream: treat as "give up".
            dec = RetryDecision(retry=False, next_delay_ms=None)
        if dec.retry and attempt < max_attempts:
            # Re-queue at the back of the window: round-robin fairness.
            work.append((x, attempt + 1))
        else:
            yield Err(_annotate_err(
                e,
                attempt=attempt,
                max_attempts=max_attempts,
                policy=name,
                next_delay_ms=dec.next_delay_ms,
            ))
            prime()
4.2 Policies (pure data → decision)¶
def fixed_policy(total_attempts: int) -> Policy:
    def p(ctx: RetryCtx[Any, Any]) -> RetryDecision:
        return RetryDecision(retry=ctx.attempt < total_attempts, next_delay_ms=None)
    p.__name__ = f"fixed_policy[{total_attempts}]"
    return p

def exp_policy(total_attempts: int, base_ms: int, cap_ms: int) -> Policy:
    def p(ctx: RetryCtx[Any, Any]) -> RetryDecision:
        delay = min(cap_ms, base_ms * (2 ** (ctx.attempt - 1)))
        return RetryDecision(retry=ctx.attempt < total_attempts, next_delay_ms=delay)
    p.__name__ = f"exp_policy[{total_attempts},{base_ms},{cap_ms}]"
    return p
4.3 Default Classifier¶
def is_retriable_errinfo(e: Any) -> bool:
    code = getattr(e, "code", None)
    return code in {"RATE_LIMIT", "TIMEOUT", "CONN_RESET", "EMBED/UNAVAILABLE", "TRANSIENT"}
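The decision table's "retry only 5xx" row would be exactly such a user-defined classifier. A hedged sketch, assuming a hypothetical error shape that carries an HTTP status attribute:

```python
from typing import Any, NamedTuple

class HttpErr(NamedTuple):  # hypothetical error shape with an HTTP status code
    status: int
    msg: str

def is_retriable_5xx(e: Any) -> bool:
    """Retry only server-side failures; 4xx are treated as permanent."""
    status = getattr(e, "status", None)
    return status is not None and 500 <= status < 600
```

Like is_retriable_errinfo above, it uses getattr with a default so errors of any shape (including plain strings) are safely classified as non-retriable.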
4.4 Resequencer (restore input order when needed)¶
def restore_input_order(
    tagged: Iterable[tuple[int, Result[Y, E]]],
) -> Iterator[Result[Y, E]]:
    """Restore input order from (idx, result) pairs. Assumes indices are 0-based consecutive integers."""
    buffer: dict[int, Result[Y, E]] = {}
    expect = 0
    for idx, r in tagged:
        buffer[idx] = r
        while expect in buffer:
            yield buffer.pop(expect)
            expect += 1
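A quick standalone check of the resequencer, with plain strings standing in for Results (the buffering logic is payload-agnostic):

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def restore_input_order(tagged: Iterable[tuple[int, T]]) -> Iterator[T]:
    """Same buffering logic as above, over arbitrary payloads."""
    buffer: dict[int, T] = {}
    expect = 0
    for idx, r in tagged:
        buffer[idx] = r
        while expect in buffer:
            # Flush every consecutive index we are now able to emit.
            yield buffer.pop(expect)
            expect += 1

# Results arrive out of order, e.g. because retried items complete late.
out_of_order = [(1, "b"), (0, "a"), (3, "d"), (2, "c")]
```

Peak buffer size equals the maximum reordering distance, which stays bounded when the upstream engine's inflight_cap is bounded.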
4.5 Idiomatic RAG Usage¶
embedded = retry_map_iter(
    safe_embed,                 # returns Result[Chunk, ErrInfo]
    chunks_with_path,
    classifier=is_retriable_errinfo,
    policy=exp_policy(total_attempts=5, base_ms=100, cap_ms=10000),
    stage="embed",
    key_path=lambda cp: cp[1],
    inflight_cap=128,
    max_attempts=10,            # hard engine cap
)

# Optional: restore input order if downstream requires it
embedded = restore_input_order(enumerate(embedded))

for r in circuit_breaker_rate_emit(embedded, max_rate=0.2):
    if isinstance(r, Err) and isinstance(r.error, BreakInfo):
        report_circuit_break(r.error)
        break
    process(r)
5. Property-Based Proofs (tests/test_retries.py)¶
from collections import defaultdict

from hypothesis import given, strategies as st

from funcpipe_rag.api.core import (
    Err, Ok, Result, RetryDecision, fixed_policy, retry_map_iter,
)

@given(items=st.lists(st.integers(), unique=True))
def test_bounded_attempts(items):
    # unique=True: attempts are keyed by value, so duplicates would share a counter.
    attempts = defaultdict(int)
    def fn(x: int) -> Result[int, str]:
        attempts[x] += 1
        return Ok(x) if attempts[x] >= 3 else Err("TRANSIENT")
    out = list(retry_map_iter(
        fn, items,
        classifier=lambda e: e == "TRANSIENT",
        policy=fixed_policy(5),
        stage="test",
        max_attempts=10,
    ))
    assert all(a <= 5 for a in attempts.values())

def test_engine_cap_overrides_policy():
    # Plain example-based test: no @given needed (and @given() with no
    # strategies is rejected by hypothesis).
    attempts = [0]
    def fn(_):
        attempts[0] += 1
        return Err("TRANSIENT")
    def always_retry(_):
        return RetryDecision(True, None)
    out = list(retry_map_iter(
        fn, [0],
        classifier=lambda _: True,
        policy=always_retry,
        stage="test",
        max_attempts=4,
    ))
    assert attempts[0] == 4

@given(items=st.lists(st.integers(), min_size=10, unique=True))
def test_fairness_interleaving(items):
    attempts = defaultdict(int)
    def fn(x: int):
        attempts[x] += 1
        return Ok(x) if attempts[x] >= 2 else Err("TRANSIENT")
    out = list(retry_map_iter(
        fn, items,
        classifier=lambda _: True,
        policy=fixed_policy(3),
        stage="test",
        inflight_cap=4,
    ))
    # Round-robin within the window keeps attempt counts balanced:
    # every item gets its first chance before any item gets a third
    assert max(attempts.values()) <= min(attempts.values()) + 1

@given(items=st.lists(st.integers()))
def test_retry_completion(items):
    tagged = list(enumerate(items))  # key by index, so duplicate values are fine
    attempts = defaultdict(int)
    def fn(iv: tuple[int, int]):
        i, v = iv
        attempts[i] += 1
        needed = (v % 5) + 1
        return Ok(iv) if attempts[i] >= needed else Err("TRANSIENT")
    results = list(retry_map_iter(
        fn, tagged,
        classifier=lambda _: True,
        policy=fixed_policy(10),
        stage="test",
        inflight_cap=32,
    ))
    # All items eventually complete
    assert len(results) == len(items)

@given(items=st.lists(st.integers()))
def test_final_err_annotation(items):
    def fn(x: int) -> Result[int, str]:
        return Err("TRANSIENT")
    out = list(retry_map_iter(
        fn, items,
        classifier=lambda _: True,
        policy=fixed_policy(3),
        stage="test",
        max_attempts=5,
    ))
    for r in out:
        assert isinstance(r, Err)
        e = r.error
        assert e == "TRANSIENT"  # annotation skipped for str errors (no _replace/ctx)
6. Big-O & Allocation Guarantees¶
| Variant | Time | Heap | Laziness |
|---|---|---|---|
| retry_map_iter | O(N × max_attempts) worst-case | O(inflight_cap) | Yes |
Bounded by max_attempts and inflight_cap; pure and lazy.
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Infinite retries | Non-termination | Always bound via policy + engine cap |
| Blocking sleep in retry | Pipeline stalls | Policy returns delay only; schedule async |
| Head-of-line blocking | One slow item delays all | Bounded inflight_cap + fair priming |
| Mutable retry state | Nondeterminism | Policy as pure data |
8. Pre-Core Quiz¶
- retry_map_iter for…? → Fair bounded retries on a Result stream
- fixed_policy for…? → Fixed attempt count
- inflight_cap for…? → Fairness / prevent starvation
- Final Err contains…? → Retry metadata when the error type supports it (e.g. ErrInfo.ctx)
- Order semantics…? → Implementation-defined completion order; resequence if input order is required
9. Post-Core Exercise¶
- Apply retry_map_iter to a flaky embedder → verify bounded attempts + fairness.
- Write a jitter policy → test next_delay_ms variance.
- Refactor an imperative retry loop → retry_map_iter.
- Combine with breaker → confirm rate reflects final outcomes after retries.
Next: M04C10 – Structured Error Reports from Streaming Pipelines.
You now have the complete toolkit to make any flaky operation resilient — pure, bounded, fair, and composable with every previous core. The final core is about turning all those errors into beautiful reports.