
M07C08: Idempotent Effect Design – Safe Retries and Replays

Module 07 – Main Track Core

Main track: Cores 1, 3–10 (Ports & Adapters + Capability Protocols → Production).
This is a required core. Every production FuncPipe system designs its retryable effects to be idempotent.

Progression Note

Module 7 takes the lawful containers and pipelines from Module 6 and puts all effects behind explicit boundaries.

| Module | Focus | Key Outcomes |
|---|---|---|
| 6 | Monadic Flows as Composable Pipelines | Lawful and_then, Reader/State/Writer patterns, error-typed flows |
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, resource-safe IO, idempotency |
| 8 | Async / Concurrent Pipelines | Backpressure, timeouts, resumability, fairness (built on 6–7) |

Core question
How do you design effectful operations to be idempotent – using stable keys and atomic check-act patterns – so that retries and replays are safe, with no duplication or unexpected side effects?

What you now have after M07C01–M07C07 + this core

- Pure domain core
- Zero direct I/O in domain code
- All I/O behind swappable ports
- Effectful operations described as pure data (IOPlan)
- Typed capability protocols for every common effect
- Reliable resource cleanup
- Pure, composable logging via Writer
- Statically verified capability isolation
- Composable behaviours from small protocols
- Idempotent effects – safe to retry or replay without duplication

What the rest of Module 7 adds

- Transaction/session patterns
- Incremental migration playbook
- Production story: CI, golden tests, shadow traffic

You are now seven steps away from a complete production-grade functional architecture.

1. Laws & Invariants (machine-checked where possible)

| Law / Invariant | Description | Enforcement |
|---|---|---|
| Idempotence Law | For any idempotent plan: perform(plan) applied twice has the same observable outcome as applied once (after the first success). | Hypothesis (state equivalence) |
| Retry Safety | Retries only act when prior execution failed or was absent; successful acts are skipped. | Property tests |
| Key Stability | The key computed for a given logical input is identical across runs (same config/release). In this core we key only on chunk text; if config changes (e.g. chunker version) should produce new artefacts, include those fields in the key. | Deterministic hash tests |
| Atomicity Contract | Check-and-act is exposed as a single operation; actual atomicity is provided by the adapter. | Narrow capability + adapter tests |
| Minimalism Preservation | Idempotent wrappers use only the minimal capabilities they need. | mypy --strict + review |

These laws make retries and replays safe by construction.
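
The Idempotence Law can be read as a state-equivalence check: running the same keyed write once or twice must leave the store in the same state. The sketch below illustrates this with a hypothetical in-memory fake (`InMemoryStore` and `run_n_times` are illustrative names, not part of the repo), and it returns a plain `bool` instead of the repo's `Result` type to stay self-contained.

```python
from __future__ import annotations


class InMemoryStore:
    """Hypothetical fake store exposing a single check-and-act operation."""

    def __init__(self) -> None:
        self.data: dict[str, tuple[str, ...]] = {}
        self.writes = 0  # number of calls that actually changed state

    def write_if_absent(self, key: str, texts: list[str]) -> bool:
        if key in self.data:
            return False  # already present: skip the act
        self.data[key] = tuple(texts)
        self.writes += 1
        return True


def run_n_times(n: int) -> InMemoryStore:
    """Apply the same logical write n times against a fresh store."""
    store = InMemoryStore()
    for _ in range(n):
        store.write_if_absent("k1", ["alpha", "beta"])
    return store


once = run_n_times(1)
twice = run_n_times(2)

# State equivalence: the second application is a no-op.
print(once.data == twice.data, twice.writes)
```

A property-based version would generate the key and payload instead of hard-coding them, but the assertion stays the same: equal final state and exactly one effective write.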

2. Decision Table – When and How to Make an Effect Idempotent?

| Scenario | Retry Frequency | Cost of Check | Recommended Pattern |
|---|---|---|---|
| Write processed chunks to storage | High | Low | Stable content hash + atomic write-if-absent |
| Upload embeddings to vector DB | Medium | Medium | Idempotent upsert with natural key (chunk_id) |
| Publish metrics / events | Low | Low | Idempotent PUT or request-ID deduplication |
| Transient network call | High | N/A | Pure retry (only if naturally idempotent, e.g. GET) |

Golden rule: Every effect that can be retried must be idempotent. Never retry a non-idempotent effect.
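
To make the "upsert with natural key" row concrete, the following sketch contrasts an append with a keyed upsert under three deliveries of the same logical operation. The function names and the in-memory `log` / `index` containers are assumptions for illustration only; a real vector DB adapter would sit behind a capability protocol.

```python
from __future__ import annotations


def append_embedding(
    log: list[tuple[str, list[float]]], chunk_id: str, vec: list[float]
) -> None:
    """Non-idempotent: every call adds another row."""
    log.append((chunk_id, vec))


def upsert_embedding(
    index: dict[str, list[float]], chunk_id: str, vec: list[float]
) -> None:
    """Idempotent: the natural key (chunk_id) decides the slot."""
    index[chunk_id] = vec


log: list[tuple[str, list[float]]] = []
index: dict[str, list[float]] = {}

# Simulate the original attempt plus two retries of the same operation.
for _ in range(3):
    append_embedding(log, "chunk-42", [0.1, 0.2])
    upsert_embedding(index, "chunk-42", [0.1, 0.2])

print(len(log), len(index))  # append grows with retries, upsert does not
```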

3. Public API – Idempotent Wrappers (src/funcpipe_rag/domain/idempotent.py)

# src/funcpipe_rag/domain/idempotent.py – mypy --strict clean
from __future__ import annotations
from typing import Callable, Iterator, Protocol
from hashlib import sha256

from funcpipe_rag.core.rag_types import Chunk
from funcpipe_rag.domain.effects.io_plan import IOPlan, io_delay
from funcpipe_rag.result.types import Err, ErrInfo, Ok, Result

class AtomicWriteCap(Protocol):
    def write_if_absent(
        self, key: str, chunks: Iterator[Chunk]
    ) -> Result[bool, ErrInfo]:
        """True if written, False if already present."""

def content_key(chunks: Iterator[Chunk]) -> str:
    """Stable, length-prefixed hash – collision-resistant for our domain.

    In this core we key only on chunk text. If config changes (e.g. chunker version,
    embedding model) should produce new artefacts, include those fields here.
    """
    h = sha256()
    for c in chunks:
        h.update(str(len(c.text)).encode("utf-8"))
        h.update(b"\0")
        h.update(c.text.encode("utf-8"))
    return h.hexdigest()

def idempotent_write(
    atomic: AtomicWriteCap,
) -> Callable[[Iterator[Chunk]], IOPlan[Result[None, ErrInfo]]]:
    """Reusable idempotent write behaviour.

    Returns Result[None, ErrInfo]: it intentionally discards the underlying
    `written` flag from the adapter. If you need to distinguish "wrote vs
    skipped", change the return type to Result[bool, ErrInfo] and propagate it.
    """
    def behaviour(chunks: Iterator[Chunk]) -> IOPlan[Result[None, ErrInfo]]:
        # Pure part: materialize and hash once per logical call
        cs = list(chunks)
        key = content_key(iter(cs))

        def act() -> Result[Result[None, ErrInfo], ErrInfo]:
            # Important type pattern (used by `retry_idempotent`):
            # - The IOPlan outer layer uses `ErrInfo` for *infrastructure* failures.
            # - Domain failures are represented as an *inner* Result so callers
            #   can branch/retry without IOPlan short-circuiting.
            wrote = atomic.write_if_absent(key, iter(cs))
            return Ok(Err(wrote.error)) if isinstance(wrote, Err) else Ok(Ok(None))

        # Subsequent retries/replays are O(1) w.r.t. data size
        return io_delay(act)
    return behaviour
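
The length prefix in `content_key` matters because chunk boundaries are part of the logical input. The sketch below re-declares `content_key` with a stand-in `Chunk` dataclass (an assumption; the real type lives in `funcpipe_rag.core.rag_types`) and compares it with a hypothetical `naive_key` that hashes the concatenated text without prefixes.

```python
from __future__ import annotations

from dataclasses import dataclass
from hashlib import sha256
from typing import Iterable


@dataclass(frozen=True)
class Chunk:
    """Stand-in for the repo's Chunk; only `text` is needed here."""

    text: str


def content_key(chunks: Iterable[Chunk]) -> str:
    """Length-prefixed hash, mirroring the function above."""
    h = sha256()
    for c in chunks:
        h.update(str(len(c.text)).encode("utf-8"))
        h.update(b"\0")
        h.update(c.text.encode("utf-8"))
    return h.hexdigest()


def naive_key(chunks: Iterable[Chunk]) -> str:
    """Hypothetical anti-example: no length prefix, boundaries are lost."""
    h = sha256()
    for c in chunks:
        h.update(c.text.encode("utf-8"))
    return h.hexdigest()


a = [Chunk("ab"), Chunk("c")]
b = [Chunk("a"), Chunk("bc")]

# Both inputs concatenate to "abc", so the naive key cannot tell them apart.
print(naive_key(a) == naive_key(b))      # True
print(content_key(a) == content_key(b))  # False
```

Without the prefix, two different chunkings of the same text would share a key, and the second write would be skipped as "already present".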

4. Reference Implementations

4.1 Atomic Write Adapter (real + mock)

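# src/funcpipe_rag/infra/adapters/atomic_storage.py
from __future__ import annotations
from typing import Iterator

from funcpipe_rag.core.rag_types import Chunk
from funcpipe_rag.domain.idempotent import AtomicWriteCap
from funcpipe_rag.result.types import ErrInfo, Result

class AtomicFileStorage(AtomicWriteCap):
    # Bodies elided: the adapter is responsible for making check-and-act atomic.
    def __init__(self, *, root: str) -> None: ...
    def write_if_absent(self, key: str, chunks: Iterator[Chunk]) -> Result[bool, ErrInfo]: ...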
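
The adapter bodies are elided above. One possible way to get an atomic write-if-absent on a local filesystem is to write to a temporary file and then hard-link it to the final name, since `os.link` fails with `FileExistsError` when the target already exists. The sketch below is an assumption about how such an adapter could work, not the repo's implementation: the free function `write_if_absent`, the `.txt` suffix, and the line-per-chunk layout are all illustrative, it returns a plain `bool` rather than `Result[bool, ErrInfo]`, and it presumes a filesystem that supports hard links.

```python
from __future__ import annotations

import os
import tempfile
from pathlib import Path


def write_if_absent(root: Path, key: str, lines: list[str]) -> bool:
    """Return True if this call created the artefact, False if it already existed."""
    root.mkdir(parents=True, exist_ok=True)
    final = root / f"{key}.txt"
    if final.exists():
        return False  # cheap fast path; the link below remains the real arbiter

    fd, tmp_name = tempfile.mkstemp(dir=root, prefix=".tmp-")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            for line in lines:
                f.write(line + "\n")
            f.flush()
            os.fsync(f.fileno())  # make the content durable before publishing
        try:
            os.link(tmp_name, final)  # fails if `final` already exists
        except FileExistsError:
            return False  # a concurrent writer won the race
        return True
    finally:
        os.unlink(tmp_name)  # the temp name is never needed afterwards


with tempfile.TemporaryDirectory() as d:
    root = Path(d)
    first = write_if_absent(root, "abc123", ["one", "two"])
    second = write_if_absent(root, "abc123", ["DIFFERENT"])
    stored = (root / "abc123.txt").read_text(encoding="utf-8")

print(first, second)
```

Writing to a temporary name first means a crash mid-write never leaves a partial file under the final key, which would otherwise be mistaken for a completed artefact on retry.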

4.2 Retry Wrapper (higher-order, composes with behaviours)

# src/funcpipe_rag/fp/effects/io_retry.py – mypy --strict clean
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, TypeVar

from funcpipe_rag.domain.effects.io_plan import IOPlan, io_bind, io_delay, io_pure
from funcpipe_rag.result.types import Err, ErrInfo, Ok, Result

A = TypeVar("A")
T = TypeVar("T")

def is_transient(err: ErrInfo) -> bool:
    """Domain-specific transient error detection."""
    return err.code in {"NETWORK_TIMEOUT", "RATE_LIMIT", "SERVICE_UNAVAILABLE"}

@dataclass(frozen=True)
class RetryPolicy:
    max_attempts: int
    backoff_ms: Callable[[int], int]  # attempt → delay

def retry_idempotent(
    policy: RetryPolicy,
) -> Callable[[Callable[[A], IOPlan[Result[T, ErrInfo]]]], Callable[[A], IOPlan[Result[T, ErrInfo]]]]:
    """Retry wrapper for idempotent behaviours only (synchronous/backoff variant).

    This is the sync version: backoff is implemented with time.sleep inside
    io_delay. Async retry policies are introduced in Module 8 for event-loop
    driven shells.
    """
    def lift(behaviour: Callable[[A], IOPlan[Result[T, ErrInfo]]]) -> Callable[[A], IOPlan[Result[T, ErrInfo]]]:
        def run(a: A) -> IOPlan[Result[T, ErrInfo]]:
            # Build the plan once per logical call. An IOPlan is a pure
            # description, so every attempt re-interprets the same plan.
            # Calling behaviour(a) again per attempt would re-consume `a`,
            # which is already exhausted when it is an iterator, and would
            # repeat the O(n) materialize-and-hash step.
            plan = behaviour(a)

            def step(attempt: int) -> IOPlan[Result[T, ErrInfo]]:
                if attempt >= policy.max_attempts:
                    return io_pure(Err(ErrInfo("MAX_RETRY", f"Failed after {attempt} attempts")))

                def handle(r: Result[T, ErrInfo]) -> IOPlan[Result[T, ErrInfo]]:
                    if isinstance(r, Ok):
                        return io_pure(r)
                    if not is_transient(r.error):
                        return io_pure(r)
                    delay_ms = policy.backoff_ms(attempt)
                    return io_bind(io_delay(lambda: _sleep(delay_ms)), lambda _: step(attempt + 1))

                return io_bind(plan, handle)

            return step(0)

        return run
    return lift

def _sleep(delay_ms: int) -> Result[None, ErrInfo]:
    import time
    time.sleep(delay_ms / 1000.0)
    return Ok(None)
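
`RetryPolicy.backoff_ms` is any function from attempt number to a delay in milliseconds. A capped exponential schedule is a common choice; the helper below is a hypothetical example (`exponential_backoff`, `base_ms`, and `cap_ms` are illustrative names, not part of the repo).

```python
from typing import Callable


def exponential_backoff(base_ms: int, cap_ms: int) -> Callable[[int], int]:
    """Return attempt -> delay, doubling from base_ms and never exceeding cap_ms."""

    def delay(attempt: int) -> int:
        return min(cap_ms, base_ms * (2 ** attempt))

    return delay


backoff = exponential_backoff(base_ms=100, cap_ms=1000)
delays = [backoff(n) for n in range(5)]
print(delays)  # [100, 200, 400, 800, 1000]
```

The resulting function could be passed as `RetryPolicy(max_attempts=5, backoff_ms=backoff)`. Adding random jitter is a frequent refinement when many clients retry at once, at the cost of making the schedule non-deterministic in tests.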

4.3 Full RAG with Idempotent Write + Retry (capstone sketch)

This repo provides the building blocks:

- src/funcpipe_rag/domain/idempotent.py (idempotent_write)
- src/funcpipe_rag/fp/effects/io_retry.py (retry_idempotent)

It does not yet ship a fully IOPlan-based RAG entry point; treat the full composition as an exercise.

4.4 Before → After

# Before – non-idempotent write (duplicates on retry)
def old_write(chunks):
    storage.write_chunks("out.jsonl", chunks)  # duplicates if retried

# After – idempotent + safe retry
behaviour = retry_idempotent(policy)(idempotent_write(cap.storage))
plan = behaviour(chunks)
result = perform(plan)  # returns Ok(Ok(None)) on success in this repo’s IOPlan convention

5. Tests (selected)

Idempotence is verified with a deterministic fake capability in tests/unit/domain/test_idempotent.py.
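
The case worth testing most carefully is the "lost acknowledgement": the write succeeds, but the caller sees a transient error and retries. The sketch below is not the repo's test; it uses a hypothetical fake (`LostAckStore`) and a plain loop (`write_with_retry`) with string outcomes in place of `Result` and `IOPlan`, purely to show the shape of such a test.

```python
from __future__ import annotations


class LostAckStore:
    """Hypothetical fake: the first call writes, then reports a timeout."""

    def __init__(self) -> None:
        self.data: dict[str, list[str]] = {}
        self.calls = 0

    def write_if_absent(self, key: str, texts: list[str]) -> str:
        self.calls += 1
        written = key not in self.data
        if written:
            self.data[key] = list(texts)
        if self.calls == 1:
            return "NETWORK_TIMEOUT"  # the write happened, but the ack was lost
        return "WRITTEN" if written else "SKIPPED"


def write_with_retry(
    store: LostAckStore, key: str, texts: list[str], max_attempts: int
) -> str:
    """Retry only on the transient outcome; give up after max_attempts."""
    for _ in range(max_attempts):
        outcome = store.write_if_absent(key, texts)
        if outcome != "NETWORK_TIMEOUT":
            return outcome
    return "MAX_RETRY"


store = LostAckStore()
outcome = write_with_retry(store, "k1", ["alpha"], max_attempts=3)

# The retry finds the key already present and becomes a no-op.
print(outcome, store.calls, store.data)
```

With an append-style write in place of `write_if_absent`, the same scenario would store the payload twice.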

6. Big-O & Allocation Guarantees

| Operation | Time | Call-stack | Heap | Allocation |
|---|---|---|---|---|
| Key computation | O(n) | O(1) | O(n) | O(n) |
| Atomic check-act | O(1) | O(1) | O(1) | O(1) |

Materialization + hashing is O(n) once per logical call; subsequent retries/replays are O(1) w.r.t. data size.

7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Non-idempotent write | Duplicates on retry | Stable key + atomic write-if-absent |
| Unstable keys | False misses on retry | Length-prefixed, deterministic hashing |
| Broad capability for atomic op | Over-permission | Narrow AtomicWriteCap protocol |
| Append-only storage | Ever-growing files on retry | Idempotent upsert or keyed storage |
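
One easy way to end up with unstable keys in Python is the built-in `hash()`: for `str` and `bytes` it is salted per process unless `PYTHONHASHSEED` is fixed, so a key computed before a restart may not match the one computed after. The sketch below contrasts it with a SHA-256 digest; `unstable_key` and `stable_key` are illustrative names.

```python
from hashlib import sha256


def unstable_key(text: str) -> str:
    """Anti-pattern: hash() of a str can differ between interpreter runs."""
    return str(hash(text))


def stable_key(text: str) -> str:
    """Deterministic across processes and machines."""
    return sha256(text.encode("utf-8")).hexdigest()


key = stable_key("hello")

# Within one process both look stable; only stable_key survives a restart.
print(unstable_key("hello") == unstable_key("hello"))
print(key == stable_key("hello"))
```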

8. Pre-Core Quiz

  1. Idempotent means…? → Repeat has same effect as single execution
  2. Safe retry requires…? → Idempotent effect + transient error detection
  3. Stable keys use…? → Length-prefixed, deterministic hashing
  4. Atomic ops need…? → Narrow capability (check+act)
  5. Real power comes from…? → Retries without duplication or corruption

9. Post-Core Exercise

  1. Make your chunk write idempotent using a content hash key.
  2. Wrap it with retry_idempotent and test that retries after success are no-ops.
  3. Add a property test proving that two identical inputs produce the same key and only one write.
  4. Implement an idempotent “publish to Kafka” behaviour using message keys.

Next → M07C09: Sessions & Transactions as Explicit Data (Not Hidden Globals)

You now design idempotent effects that can be safely retried or replayed – the final piece that makes your pipelines truly resilient. Combined with everything before, your system can now survive failures, network partitions, and process restarts without corruption or duplication. The remaining cores are specialisations and production patterns.