M07C08: Idempotent Effect Design – Safe Retries and Replays
Module 07 – Main Track Core
Main track: Cores 1, 3–10 (Ports & Adapters + Capability Protocols → Production).
This is a required core. Every production FuncPipe system designs its retryable effects to be idempotent.
Progression Note
Module 7 takes the lawful containers and pipelines from Module 6 and puts all effects behind explicit boundaries.
| Module | Focus | Key Outcomes |
|---|---|---|
| 6 | Monadic Flows as Composable Pipelines | Lawful and_then, Reader/State/Writer patterns, error-typed flows |
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, resource-safe IO, idempotency |
| 8 | Async / Concurrent Pipelines | Backpressure, timeouts, resumability, fairness (built on 6–7) |
Core question
How do you design effectful operations to be idempotent – using stable keys and atomic check-act patterns – so that retries and replays are safe, with no duplication or unexpected side effects?
What you now have after M07C01–M07C07 + this core
- Pure domain core
- Zero direct I/O in domain code
- All I/O behind swappable ports
- Effectful operations described as pure data (IOPlan)
- Typed capability protocols for every common effect
- Reliable resource cleanup
- Pure, composable logging via Writer
- Statically verified capability isolation
- Composable behaviours from small protocols
- Idempotent effects – safe to retry or replay without duplication
What the rest of Module 7 adds
- Transaction/session patterns
- Incremental migration playbook
- Production story: CI, golden tests, shadow traffic
Only the remaining Module 7 cores stand between you and a complete production-grade functional architecture.
1. Laws & Invariants (machine-checked where possible)
| Law / Invariant | Description | Enforcement |
|---|---|---|
| Idempotence Law | For any idempotent plan, performing it again after a successful perform(plan) leaves the same observable outcome as performing it once. | Hypothesis (state equivalence) |
| Retry Safety | A retry performs the act only if no earlier attempt succeeded; once the act has succeeded, further retries are no-ops. | Property tests |
| Key Stability | The key computed for a given logical input is identical across runs (same config/release). In this core we key only on chunk text; if config changes (e.g. chunker version) should produce new artefacts, include those fields in the key. | Deterministic hash tests |
| Atomicity Contract | Check-and-act is exposed as a single operation; actual atomicity is provided by the adapter. | Narrow capability + adapter tests |
| Minimalism Preservation | Idempotent wrappers use only the minimal capabilities they need. | mypy --strict + review |
These laws make retries and replays safe by construction.
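The Idempotence Law can be checked mechanically: run the act, snapshot the observable state, run it again, and compare. A minimal sketch, assuming a plain dict or list stands in for storage; write_if_absent, append and satisfies_idempotence_law here are hypothetical helpers, not the repo's API.

```python
from typing import Callable, Dict, List


def write_if_absent(store: Dict[str, str], key: str, payload: str) -> bool:
    """Idempotent act: stores payload only if the key is absent."""
    if key in store:
        return False
    store[key] = payload
    return True


def append(log: List[str], payload: str) -> None:
    """Non-idempotent act: every call adds another entry."""
    log.append(payload)


def satisfies_idempotence_law(act: Callable[[], object], snapshot: Callable[[], object]) -> bool:
    """Run act once, snapshot state, run it again, and compare the snapshots."""
    act()
    once = snapshot()
    act()
    return snapshot() == once


store: Dict[str, str] = {}
print(satisfies_idempotence_law(lambda: write_if_absent(store, "k1", "chunk"), lambda: dict(store)))  # True
log: List[str] = []
print(satisfies_idempotence_law(lambda: append(log, "chunk"), lambda: list(log)))  # False
```

Snapshots must be copies (dict(store), list(log)); comparing the live objects would always report equality.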
2. Decision Table – When and How to Make an Effect Idempotent?
| Scenario | Retry Frequency | Cost of Check | Recommended Pattern |
|---|---|---|---|
| Write processed chunks to storage | High | Low | Stable content hash + atomic write-if-absent |
| Upload embeddings to vector DB | Medium | Medium | Idempotent upsert with natural key (chunk_id) |
| Publish metrics / events | Low | Low | Idempotent PUT or request-ID deduplication |
| Transient network call | High | N/A | Pure retry (only if naturally idempotent, e.g. GET) |
Golden rule: Every effect that can be retried must be idempotent. Never retry a non-idempotent effect.
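Request-ID deduplication (the "publish metrics / events" row) only works if the ID is fixed once per logical event and reused on every retry; a fresh ID per attempt defeats it. A minimal sketch, assuming a hypothetical receiver DedupSink that remembers the IDs it has processed, and lost acknowledgements that make the caller resend every attempt.

```python
import uuid
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class DedupSink:
    """Hypothetical receiver that ignores request IDs it has already processed."""
    seen: Set[str] = field(default_factory=set)
    delivered: List[str] = field(default_factory=list)

    def publish(self, request_id: str, event: str) -> bool:
        if request_id in self.seen:
            return False  # duplicate from a retry: acknowledge without re-applying
        self.seen.add(request_id)
        self.delivered.append(event)
        return True


def publish_with_retries(sink: DedupSink, event: str, attempts: int, *, stable_id: bool) -> None:
    """Simulates `attempts` deliveries of one logical event (e.g. acks were lost)."""
    request_id = str(uuid.uuid4())  # assigned once per logical event
    for _ in range(attempts):
        rid = request_id if stable_id else str(uuid.uuid4())  # fresh ID per attempt is the bug
        sink.publish(rid, event)


good = DedupSink()
publish_with_retries(good, "latency_ms=12", attempts=3, stable_id=True)
bad = DedupSink()
publish_with_retries(bad, "latency_ms=12", attempts=3, stable_id=False)
print(len(good.delivered), len(bad.delivered))  # 1 3
```

A real receiver would persist the seen-ID set (usually with a retention window) so deduplication survives its own restarts.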
3. Public API – Idempotent Wrappers (src/funcpipe_rag/domain/idempotent.py)
# src/funcpipe_rag/domain/idempotent.py – mypy --strict clean
from __future__ import annotations

from hashlib import sha256
from typing import Callable, Iterator, Protocol

from funcpipe_rag.core.rag_types import Chunk
from funcpipe_rag.domain.effects.io_plan import IOPlan, io_delay
from funcpipe_rag.result.types import Err, ErrInfo, Ok, Result


class AtomicWriteCap(Protocol):
    def write_if_absent(
        self, key: str, chunks: Iterator[Chunk]
    ) -> Result[bool, ErrInfo]:
        """True if written, False if already present."""
        ...


def content_key(chunks: Iterator[Chunk]) -> str:
    """Stable, length-prefixed hash – collision-resistant for our domain.

    In this core we key only on chunk text. If config changes (e.g. chunker version,
    embedding model) should produce new artefacts, include those fields here.
    """
    h = sha256()
    for c in chunks:
        h.update(str(len(c.text)).encode("utf-8"))
        h.update(b"\0")
        h.update(c.text.encode("utf-8"))
    return h.hexdigest()


def idempotent_write(
    atomic: AtomicWriteCap,
) -> Callable[[Iterator[Chunk]], IOPlan[Result[None, ErrInfo]]]:
    """Reusable idempotent write behaviour.

    Returns Result[None, ErrInfo]: it intentionally discards the underlying
    `written` flag from the adapter. If you need to distinguish "wrote vs
    skipped", change the return type to Result[bool, ErrInfo] and propagate it.
    """

    def behaviour(chunks: Iterator[Chunk]) -> IOPlan[Result[None, ErrInfo]]:
        # Pure part: materialize and hash once per behaviour(chunks) call.
        cs = list(chunks)
        key = content_key(iter(cs))

        def act() -> Result[Result[None, ErrInfo], ErrInfo]:
            # Important type pattern (used by `retry_idempotent`):
            # - The IOPlan outer layer uses `ErrInfo` for *infrastructure* failures.
            # - Domain failures are represented as an *inner* Result so callers
            #   can branch/retry without IOPlan short-circuiting.
            wrote = atomic.write_if_absent(key, iter(cs))
            return Ok(Err(wrote.error)) if isinstance(wrote, Err) else Ok(Ok(None))

        # Re-running act after a successful write costs one existence check,
        # O(1) w.r.t. data size; a retry after a failed write writes again (O(n)).
        return io_delay(act)

    return behaviour
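Why the length prefix matters: without it, different chunk boundaries over the same concatenated text hash identically, so a re-chunked input could be wrongly skipped as "already written". A self-contained sketch; Chunk is a minimal stand-in for funcpipe_rag.core.rag_types.Chunk, and naive_key and versioned_key are hypothetical helpers (versioned_key shows one way to fold a config field such as a chunker version into the key, as the content_key docstring suggests).

```python
from dataclasses import dataclass
from hashlib import sha256
from typing import Any, Iterable


@dataclass(frozen=True)
class Chunk:
    # Stand-in for funcpipe_rag.core.rag_types.Chunk; only `text` is used.
    text: str


def _feed(h: Any, s: str) -> None:
    """Length-prefixed update: length, NUL separator, then the UTF-8 bytes."""
    h.update(str(len(s)).encode("utf-8"))
    h.update(b"\0")
    h.update(s.encode("utf-8"))


def content_key(chunks: Iterable[Chunk]) -> str:
    """Same scheme as content_key above: length-prefixed SHA-256 over chunk texts."""
    h = sha256()
    for c in chunks:
        _feed(h, c.text)
    return h.hexdigest()


def naive_key(chunks: Iterable[Chunk]) -> str:
    """Anti-pattern: plain concatenation loses chunk boundaries."""
    h = sha256()
    for c in chunks:
        h.update(c.text.encode("utf-8"))
    return h.hexdigest()


def versioned_key(chunks: Iterable[Chunk], *, chunker_version: str) -> str:
    """Hypothetical: a new chunker version yields a new key, hence a new artefact."""
    # Use one key scheme per store; mixing schemes in one store could make keys collide.
    h = sha256()
    _feed(h, "chunker=" + chunker_version)
    for c in chunks:
        _feed(h, c.text)
    return h.hexdigest()


split_a = [Chunk("ab"), Chunk("c")]
split_b = [Chunk("a"), Chunk("bc")]
print(naive_key(split_a) == naive_key(split_b))  # True: collision
print(content_key(split_a) == content_key(split_b))  # False
print(versioned_key(split_a, chunker_version="1") == versioned_key(split_a, chunker_version="2"))  # False
```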
4. Reference Implementations
4.1 Atomic Write Adapter (real + mock)
# src/funcpipe_rag/infra/adapters/atomic_storage.py
from __future__ import annotations

from typing import Iterator

from funcpipe_rag.core.rag_types import Chunk
from funcpipe_rag.domain.idempotent import AtomicWriteCap
from funcpipe_rag.result.types import ErrInfo, Result


class AtomicFileStorage(AtomicWriteCap):
    def __init__(self, *, root: str) -> None: ...

    def write_if_absent(
        self, key: str, chunks: Iterator[Chunk]
    ) -> Result[bool, ErrInfo]: ...
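The adapter body is elided above. One way to get real check-and-act atomicity on a local POSIX filesystem is to write to a temporary file and publish it with os.link, which raises FileExistsError if the target already exists, so readers never see a half-written artefact. A hedged, self-contained sketch: Ok, Err, ErrInfo and Chunk are minimal stand-ins for the repo's types, the one-JSONL-file-per-key layout and the STORAGE_IO error code are assumptions, and the filesystem must support hard links.

```python
import json
import os
import tempfile
from dataclasses import dataclass
from typing import Iterable, Union


# Minimal stand-ins for the repo's Chunk / Result / ErrInfo types (assumptions).
@dataclass(frozen=True)
class Chunk:
    text: str


@dataclass(frozen=True)
class ErrInfo:
    code: str
    msg: str


@dataclass(frozen=True)
class Ok:
    value: bool


@dataclass(frozen=True)
class Err:
    error: ErrInfo


Result = Union[Ok, Err]


class AtomicFileStorage:
    """Sketch: one JSONL file per key under `root`, published atomically."""

    def __init__(self, *, root: str) -> None:
        self._root = root
        os.makedirs(root, exist_ok=True)

    def write_if_absent(self, key: str, chunks: Iterable[Chunk]) -> Result:
        final = os.path.join(self._root, f"{key}.jsonl")
        if os.path.exists(final):
            return Ok(False)  # fast path: an earlier attempt already published it
        try:
            fd, tmp = tempfile.mkstemp(dir=self._root, suffix=".tmp")
        except OSError as e:
            return Err(ErrInfo("STORAGE_IO", str(e)))
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                for c in chunks:
                    f.write(json.dumps({"text": c.text}) + "\n")
                f.flush()
                os.fsync(f.fileno())  # flush file contents to disk before publishing
            try:
                os.link(tmp, final)  # atomic check-and-act: fails if `final` exists
            except FileExistsError:
                return Ok(False)  # a concurrent writer won the race
            return Ok(True)
        except OSError as e:
            return Err(ErrInfo("STORAGE_IO", str(e)))
        finally:
            try:
                os.remove(tmp)  # the published hard link keeps the data
            except FileNotFoundError:
                pass


with tempfile.TemporaryDirectory() as root:
    storage = AtomicFileStorage(root=root)
    chunks = [Chunk("a"), Chunk("b")]
    print(storage.write_if_absent("k1", chunks))  # Ok(value=True)
    print(storage.write_if_absent("k1", chunks))  # Ok(value=False)
    print(os.listdir(root))  # ['k1.jsonl']
```

The os.path.exists check is only an optimisation for replays; correctness under concurrent writers comes from os.link refusing to overwrite.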
4.2 Retry Wrapper (higher-order, composes with behaviours)
# src/funcpipe_rag/fp/effects/io_retry.py – mypy --strict clean
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Callable, TypeVar

from funcpipe_rag.domain.effects.io_plan import IOPlan, io_bind, io_delay, io_pure
from funcpipe_rag.result.types import Err, ErrInfo, Ok, Result

A = TypeVar("A")
T = TypeVar("T")


def is_transient(err: ErrInfo) -> bool:
    """Domain-specific transient error detection."""
    return err.code in {"NETWORK_TIMEOUT", "RATE_LIMIT", "SERVICE_UNAVAILABLE"}


@dataclass(frozen=True)
class RetryPolicy:
    max_attempts: int
    backoff_ms: Callable[[int], int]  # attempt index (0-based) -> delay in ms


def retry_idempotent(
    policy: RetryPolicy,
) -> Callable[[Callable[[A], IOPlan[Result[T, ErrInfo]]]], Callable[[A], IOPlan[Result[T, ErrInfo]]]]:
    """Retry wrapper for idempotent behaviours only (synchronous/backoff variant).

    This is the sync version: backoff is implemented with time.sleep inside
    io_delay. Async retry policies are introduced in Module 8 for event-loop
    driven shells. Only inner (domain) errors accepted by is_transient are
    retried; outer IOPlan (infrastructure) failures short-circuit io_bind.
    """

    def lift(
        behaviour: Callable[[A], IOPlan[Result[T, ErrInfo]]],
    ) -> Callable[[A], IOPlan[Result[T, ErrInfo]]]:
        def run(a: A) -> IOPlan[Result[T, ErrInfo]]:
            # Build the plan once and re-interpret it on every attempt. Calling
            # behaviour(a) per attempt would re-consume `a`; if `a` is a one-shot
            # Iterator[Chunk], the retry would hash and write an empty input.
            plan = behaviour(a)

            def step(attempt: int) -> IOPlan[Result[T, ErrInfo]]:
                if attempt >= policy.max_attempts:
                    return io_pure(Err(ErrInfo("MAX_RETRY", f"Failed after {attempt} attempts")))

                def handle(r: Result[T, ErrInfo]) -> IOPlan[Result[T, ErrInfo]]:
                    if isinstance(r, Ok):
                        return io_pure(r)
                    if not is_transient(r.error):
                        return io_pure(r)
                    if attempt + 1 >= policy.max_attempts:
                        return step(attempt + 1)  # give up without a pointless final sleep
                    delay_ms = policy.backoff_ms(attempt)
                    return io_bind(io_delay(lambda: _sleep(delay_ms)), lambda _: step(attempt + 1))

                # Re-interpreting the same plan is safe only because it is idempotent.
                return io_bind(plan, handle)

            return step(0)

        return run

    return lift


def _sleep(delay_ms: int) -> Result[None, ErrInfo]:
    time.sleep(delay_ms / 1000.0)
    return Ok(None)
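Stripped of IOPlan, the retry control flow is: run the act, stop on success or on a non-transient error, otherwise back off and try again until max_attempts is reached. A self-contained model for building intuition and writing fast tests, assuming plain thunks in place of IOPlan and an injected sleep function; retry and exponential are hypothetical names, and ErrInfo, Ok and Err are minimal stand-ins for the repo's types.

```python
from dataclasses import dataclass
from typing import Callable, List, Union


# Minimal stand-ins for the repo's result types (assumptions).
@dataclass(frozen=True)
class ErrInfo:
    code: str
    msg: str


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: ErrInfo


Result = Union[Ok, Err]
TRANSIENT = frozenset({"NETWORK_TIMEOUT", "RATE_LIMIT", "SERVICE_UNAVAILABLE"})


@dataclass(frozen=True)
class RetryPolicy:
    max_attempts: int
    backoff_ms: Callable[[int], int]  # 0-based attempt index -> delay in ms


def exponential(base_ms: int, cap_ms: int) -> Callable[[int], int]:
    """Hypothetical helper: base, 2*base, 4*base, ... capped at cap_ms."""
    return lambda attempt: min(cap_ms, base_ms * 2**attempt)


def retry(policy: RetryPolicy, act: Callable[[], Result], sleep: Callable[[int], None]) -> Result:
    """Re-run `act` (assumed idempotent) on transient errors, backing off between attempts."""
    for attempt in range(policy.max_attempts):
        r = act()
        if isinstance(r, Ok) or r.error.code not in TRANSIENT:
            return r  # success, or a permanent error that retrying cannot fix
        if attempt + 1 < policy.max_attempts:
            sleep(policy.backoff_ms(attempt))  # no sleep after the final attempt
    return Err(ErrInfo("MAX_RETRY", f"Failed after {policy.max_attempts} attempts"))


outcomes = iter([Err(ErrInfo("NETWORK_TIMEOUT", "t")), Err(ErrInfo("NETWORK_TIMEOUT", "t")), Ok("done")])
slept: List[int] = []
result = retry(RetryPolicy(5, exponential(100, 1_000)), lambda: next(outcomes), slept.append)
print(result, slept)  # Ok(value='done') [100, 200]
```

Injecting sleep keeps the model deterministic: tests assert on the recorded delays instead of waiting for them.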
4.3 Full RAG with Idempotent Write + Retry (capstone sketch)
This repo provides the building blocks:
- src/funcpipe_rag/domain/idempotent.py (idempotent_write)
- src/funcpipe_rag/fp/effects/io_retry.py (retry_idempotent)
It does not yet ship a fully IOPlan-based RAG entry point; treat the full composition as an exercise.
4.4 Before → After
# Before – non-idempotent write (duplicates on retry)
def old_write(chunks):
    storage.write_chunks("out.jsonl", chunks)  # duplicates if retried

# After – idempotent + safe retry
policy = RetryPolicy(max_attempts=3, backoff_ms=lambda attempt: 100 * 2**attempt)  # illustrative values
behaviour = retry_idempotent(policy)(idempotent_write(cap.storage))
plan = behaviour(chunks)  # builds the plan; no I/O happens yet
result = perform(plan)  # returns Ok(Ok(None)) on success in this repo’s IOPlan convention
5. Tests (selected)
Idempotence is verified with a deterministic fake capability in tests/unit/domain/test_idempotent.py.
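The contents of that test file are not reproduced here. A hypothetical sketch of what such tests can look like, assuming a synchronous simplification of idempotent_write (no IOPlan, plain bool instead of Result) over an in-memory FakeAtomicStore; a seeded random.Random loop stands in for a Hypothesis property test so the example has no third-party dependency.

```python
import random
from dataclasses import dataclass, field
from hashlib import sha256
from typing import Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class Chunk:
    text: str  # stand-in for the repo's Chunk


def content_key(chunks: Iterable[Chunk]) -> str:
    h = sha256()
    for c in chunks:
        h.update(str(len(c.text)).encode("utf-8"))
        h.update(b"\0")
        h.update(c.text.encode("utf-8"))
    return h.hexdigest()


@dataclass
class FakeAtomicStore:
    """Deterministic in-memory stand-in for AtomicWriteCap."""
    data: Dict[str, Tuple[str, ...]] = field(default_factory=dict)
    writes: int = 0

    def write_if_absent(self, key: str, chunks: Iterable[Chunk]) -> bool:
        if key in self.data:
            return False
        self.data[key] = tuple(c.text for c in chunks)
        self.writes += 1
        return True


def idempotent_write(store: FakeAtomicStore, chunks: Iterable[Chunk]) -> bool:
    """Synchronous simplification: materialize once, key by content, write if absent."""
    cs = list(chunks)
    return store.write_if_absent(content_key(cs), cs)


def test_replay_after_success_is_noop() -> None:
    store = FakeAtomicStore()
    chunks = [Chunk("alpha"), Chunk("beta")]
    assert idempotent_write(store, iter(chunks)) is True
    before = dict(store.data)
    assert idempotent_write(store, iter(chunks)) is False
    assert store.data == before and store.writes == 1


def test_identical_inputs_same_key_single_write() -> None:
    rng = random.Random(0)  # seeded, so the test is deterministic
    for _ in range(200):
        texts: List[str] = [
            "".join(rng.choice("ab\0é") for _ in range(rng.randint(0, 4)))
            for _ in range(rng.randint(0, 3))
        ]
        first = [Chunk(t) for t in texts]
        second = [Chunk(t) for t in texts]  # equal content, distinct objects
        assert content_key(first) == content_key(second)
        store = FakeAtomicStore()
        idempotent_write(store, first)
        idempotent_write(store, second)
        assert store.writes == 1
```

Both functions follow pytest's naming convention, so pytest collects them; they can also be called directly.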
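6. Big-O & Allocation Guarantees
| Operation | Time | Call-stack | Heap | Allocation |
|---|---|---|---|---|
| Key computation (materialize + hash) | O(n) | O(1) | O(n) | O(n) |
| Atomic check-act (key already present) | O(1) | O(1) | O(1) | O(1) |
Materialization and hashing cost O(n) once per behaviour(chunks) call. A replay after a successful write costs a single O(1) existence check, independent of data size; a retry after a failed write must still redo the O(n) write.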
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Non-idempotent write | Duplicates on retry | Stable key + atomic write-if-absent |
| Unstable keys | False misses on retry | Length-prefixed, deterministic hashing |
| Broad capability for atomic op | Over-permission | Narrow AtomicWriteCap protocol |
| Append-only storage | Ever-growing files on retry | Idempotent upsert or keyed storage |
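The append-only fix (and the "idempotent upsert with natural key" row of the decision table) can be illustrated with SQLite from the Python standard library. A sketch, assuming SQLite 3.24 or newer for INSERT ... ON CONFLICT ... DO UPDATE; the table names are hypothetical and vectors are stored as JSON text only to keep the example small.

```python
import json
import sqlite3
from typing import List, Sequence, Tuple

Row = Tuple[str, str]  # (chunk_id, vector serialized as JSON text)


def open_store() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE appended (chunk_id TEXT, vector TEXT)")  # no natural key
    conn.execute("CREATE TABLE upserted (chunk_id TEXT PRIMARY KEY, vector TEXT NOT NULL)")
    return conn


def append_batch(conn: sqlite3.Connection, rows: Sequence[Row]) -> None:
    """Anti-pattern: every retry inserts the whole batch again."""
    with conn:  # commits on success, rolls back on error
        conn.executemany("INSERT INTO appended (chunk_id, vector) VALUES (?, ?)", rows)


def upsert_batch(conn: sqlite3.Connection, rows: Sequence[Row]) -> None:
    """Idempotent: re-running the batch converges to the same rows."""
    with conn:  # the whole batch commits or none of it does
        conn.executemany(
            "INSERT INTO upserted (chunk_id, vector) VALUES (?, ?) "
            "ON CONFLICT(chunk_id) DO UPDATE SET vector = excluded.vector",
            rows,
        )


def count(conn: sqlite3.Connection, table: str) -> int:
    # `table` is a trusted constant here, never user input.
    return int(conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0])


rows: List[Row] = [("c1", json.dumps([0.1, 0.2])), ("c2", json.dumps([0.3, 0.4]))]
conn = open_store()
for _ in range(2):  # simulate a retry of the same batch
    append_batch(conn, rows)
    upsert_batch(conn, rows)
print(count(conn, "appended"), count(conn, "upserted"))  # 4 2
```

Wrapping each batch in one transaction also means a retry after a mid-batch failure never starts from a half-applied state.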
8. Pre-Core Quiz
- Idempotent means…? → Repeat has same effect as single execution
- Safe retry requires…? → Idempotent effect + transient error detection
- Stable keys use…? → Length-prefixed, deterministic hashing
- Atomic ops need…? → Narrow capability (check+act)
- Real power comes from…? → Retries without duplication or corruption
9. Post-Core Exercise
- Make your chunk write idempotent using a content hash key.
- Wrap it with retry_idempotent and test that retries after success are no-ops.
- Add a property test proving that two identical inputs produce the same key and only one write.
- Implement an idempotent “publish to Kafka” behaviour using message keys.
Next → M07C09: Sessions & Transactions as Explicit Data (Not Hidden Globals)
You now design idempotent effects that can be safely retried or replayed. Combined with the earlier cores, this lets your pipelines recover from failures, network partitions and process restarts without duplicating or corrupting data, provided every retried effect uses a stable key and an adapter that makes check-and-act atomic. The remaining cores are specialisations and production patterns.