
M05C02: Modelling Domain States (Pending/Running/Done/Failed) as ADTs

Progression Note

By the end of Module 5, you will model every domain concept as immutable algebraic data types (products and tagged sums), eliminating whole classes of runtime errors through exhaustive pattern matching, mypy-checked totality, and pure serialization contracts.

| Module | Focus | Key Outcomes |
| --- | --- | --- |
| 4 | Safe Recursion & Error Handling | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |
| 6 | Monadic Flows as Composable Pipelines | bind/and_then, Reader/State-like patterns, error-typed flows |

Core question
How do you model domain states like Pending/Running/Done/Failed as algebraic data types in Python — ensuring exhaustive handling, type-safe transitions, and total functions in every FuncPipe pipeline stage?

We extend Core 1’s product/sum types to real domain state machines. The question every seasoned team eventually asks:

“Why do we still ship pipelines that mysteriously get stuck in half-finished states, process events out-of-order, or silently ignore terminal failures?”

The answer is always the same: you’re using strings, booleans, or mutable status objects instead of proper state ADTs with payloads and validated transitions.

The naïve pattern everyone writes first:

class Job:
    def __init__(self):
        self.status = "pending"
        self.progress = 0
        self.error = None

    def advance(self, event):
        if self.status == "running" and event == "succeed":
            self.status = "done"     # someone forgets to copy artifact_id

Silent invalid transitions, mutable corruption, missed cases everywhere.

The production pattern: every state is a tagged variant with its own payload; every transition is a pure function forced to handle all cases.

ProcessingState = Pending | Running | Done | Failed

def transition(state: ProcessingState, event: Event) -> ProcessingState:
    match state, event:
        case Pending(), EvStart(started_at=s):
            return running(started_at=s, progress_permille=0)
        case Running(), EvAdvance(delta_permille=d):
            return running(started_at=state.started_at,
                           progress_permille=min(1000, state.progress_permille + d))
        case Running(), EvSucceed(completed_at=c, artifact_id=a, dim=d, sha256=h):
            return done(
                completed_at=c,
                artifact_id=a,
                dim=d,
                sha256=h,
            )
        case Running(), EvFail(failed_at=f, code=code, msg=msg, attempt=attempt):
            return failed(
                failed_at=f,
                code=code,
                msg=msg,
                attempt=attempt,
            )
        case (Done() | Failed()), _:
            return state
    raise ValueError("invalid transition")

Invalid transitions fail fast with a ValueError instead of silently corrupting state, terminal states are idempotent, and adding a new state breaks every assert_never-guarded match until it is updated.

Audience: Engineers tired of “stuck in running forever” bugs who want mathematically provable state machines.

Outcome

  1. Every flag/field soup replaced with exhaustive, payload-carrying state ADTs.
  2. Pure, validated transitions: no mutation, no invalid states.
  3. Immutable, serialisable states that survive crashes/refactors without corruption.

Tiny Non-Domain Example – Traffic Light State Machine

from dataclasses import dataclass
from typing import Literal

try:  # Python ≥3.11
    from typing import assert_never
except ImportError:  # Python ≤3.10
    from typing_extensions import assert_never

@dataclass(frozen=True, slots=True)
class Red:
    kind: Literal["red"] = "red"

@dataclass(frozen=True, slots=True)
class Green:
    kind: Literal["green"] = "green"

@dataclass(frozen=True, slots=True)
class Yellow:
    kind: Literal["yellow"] = "yellow"

TrafficLight = Red | Green | Yellow

def next_light(light: TrafficLight) -> TrafficLight:
    match light:
        case Red():    return Green()
        case Green():  return Yellow()
        case Yellow(): return Red()
    assert_never(light)

Adding Blinking forces every call site to update — no silent infinite green.

Why State ADTs? (Three bullets every engineer should internalise)

  • Exhaustiveness: Adding a new state/event breaks every handler/transition until you update it — no silent missing cases.
  • Immutability + Value semantics: Frozen + structural eq/hash → safe as dict keys, cache keys, pure function results.
  • Validated transitions + payload safety: Smart constructors + pure transition functions make illegal states unrepresentable.

1. Laws & Invariants (machine-checked)

| Law | Formal Statement | Enforcement |
| --- | --- | --- |
| Exhaustiveness | Every match/transition handles all state and event variants | mypy --strict + assert_never + tests |
| Immutability | No field can be mutated post-construction | frozen=True, slots=True + tests |
| Structural Equality | x == y iff x and y are the same variant and all fields are equal | Hypothesis equality tests |
| Transition Validity | Only legal transitions allowed; invalid ones raise early | Property tests on invalid event sequences |
| Progress Monotonic | Progress never decreases; always 0 ≤ progress_permille ≤ 1000 | Property tests + smart constructors |
| Terminal Idempotence | Applying any event to Done or Failed yields a value-equal state | test_terminal_idempotent |
| JSON Round-Trip | from_dict(to_dict(x)) == x for all instances | test_processing_state_roundtrip |

2. Decision Table – Enum vs Tagged Union

| Need | Payload? | Recommended Construction |
| --- | --- | --- |
| Simple status flags | No | Enum |
| States with data | Yes | Union of tagged dataclasses (what we use here) |

Never use strings or mutable status objects for domain state.

3. Public API (fp/core.py – mypy --strict clean)

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Generic, TypeVar, Literal, Mapping, Sequence, TypeAlias
from datetime import datetime, timezone

try:  # Python ≥3.11
    from typing import assert_never
except ImportError:  # Python ≤3.10
    from typing_extensions import assert_never

JSONPrimitive: TypeAlias = str | int | float | bool | None
JSON: TypeAlias = JSONPrimitive | Mapping[str, "JSON"] | Sequence["JSON"]

UTC = timezone.utc
T = TypeVar("T")
E = TypeVar("E")

class ErrorCode(str, Enum):
    RATE_LIMIT = "RATE_LIMIT"
    TIMEOUT = "TIMEOUT"
    EMBED_FAIL = "EMBED_FAIL"
    INTERNAL = "INTERNAL"

# Events — pure data + smart constructors
@dataclass(frozen=True, slots=True, kw_only=True)
class EvStart:
    started_at: datetime

@dataclass(frozen=True, slots=True, kw_only=True)
class EvAdvance:
    delta_permille: int   # ≥0

@dataclass(frozen=True, slots=True, kw_only=True)
class EvSucceed:
    completed_at: datetime
    artifact_id: str
    dim: int
    sha256: str

@dataclass(frozen=True, slots=True, kw_only=True)
class EvFail:
    failed_at: datetime
    code: ErrorCode
    msg: str
    attempt: int

Event = EvStart | EvAdvance | EvSucceed | EvFail

def start_event(*, started_at: datetime) -> EvStart:
    if started_at.tzinfo is None:
        raise ValueError("started_at must be timezone-aware")
    return EvStart(started_at=started_at)

def advance_event(*, delta_permille: int) -> EvAdvance:
    if delta_permille < 0:
        raise ValueError("delta_permille must be ≥0 — progress is monotonic")
    return EvAdvance(delta_permille=delta_permille)

def succeed_event(*, completed_at: datetime, artifact_id: str, dim: int, sha256: str) -> EvSucceed:
    if completed_at.tzinfo is None:
        raise ValueError("completed_at must be timezone-aware")
    return EvSucceed(completed_at=completed_at, artifact_id=artifact_id, dim=dim, sha256=sha256)

def fail_event(*, failed_at: datetime, code: ErrorCode, msg: str, attempt: int) -> EvFail:
    if failed_at.tzinfo is None:
        raise ValueError("failed_at must be timezone-aware")
    if attempt < 1:
        raise ValueError("attempt must be ≥1")
    return EvFail(failed_at=failed_at, code=code, msg=msg, attempt=attempt)

# Processing state variants + smart constructors
@dataclass(frozen=True, slots=True, kw_only=True)
class Pending:
    kind: Literal["pending"] = "pending"
    queued_at: datetime
    version: Literal[1] = 1

@dataclass(frozen=True, slots=True, kw_only=True)
class Running:
    kind: Literal["running"] = "running"
    started_at: datetime
    progress_permille: int   # 0–1000
    version: Literal[1] = 1

@dataclass(frozen=True, slots=True, kw_only=True)
class Done:
    kind: Literal["done"] = "done"
    completed_at: datetime
    artifact_id: str
    dim: int
    sha256: str
    version: Literal[1] = 1

@dataclass(frozen=True, slots=True, kw_only=True)
class Failed:
    kind: Literal["failed"] = "failed"
    failed_at: datetime
    code: ErrorCode
    msg: str
    attempt: int
    version: Literal[1] = 1

ProcessingState = Pending | Running | Done | Failed

from funcpipe_rag.result.types import Result, Ok, Err

def pending(*, queued_at: datetime) -> Pending:
    if queued_at.tzinfo is None:
        raise ValueError("queued_at must be timezone-aware")
    return Pending(queued_at=queued_at)

def running(*, started_at: datetime, progress_permille: int) -> Running:
    if started_at.tzinfo is None:
        raise ValueError("started_at must be timezone-aware")
    if not 0 <= progress_permille <= 1000:
        raise ValueError("progress_permille must be 0–1000")
    return Running(started_at=started_at, progress_permille=progress_permille)

def done(*, completed_at: datetime, artifact_id: str, dim: int, sha256: str) -> Done:
    if completed_at.tzinfo is None:
        raise ValueError("completed_at must be timezone-aware")
    if dim <= 0 or len(sha256) != 64 or not all(c in "0123456789abcdefABCDEF" for c in sha256):
        raise ValueError("invalid Done payload")
    return Done(completed_at=completed_at, artifact_id=artifact_id, dim=dim, sha256=sha256)

def failed(*, failed_at: datetime, code: ErrorCode, msg: str, attempt: int) -> Failed:
    if failed_at.tzinfo is None:
        raise ValueError("failed_at must be timezone-aware")
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return Failed(failed_at=failed_at, code=code, msg=msg, attempt=attempt)

4. Reference Implementations (continued)

4.1 Pure State Transition (match-based, exhaustive)

def transition(state: ProcessingState, event: Event) -> ProcessingState:
    match state, event:
        case Pending(), EvStart(started_at=s):
            if s < state.queued_at:
                raise ValueError("started_at cannot be earlier than queued_at")
            return running(started_at=s, progress_permille=0)

        case Running(), EvAdvance(delta_permille=d):
            new_p = min(1000, state.progress_permille + d)
            return running(started_at=state.started_at, progress_permille=new_p)

        case Running(), EvSucceed(completed_at=c, artifact_id=a, dim=d, sha256=h):
            if c < state.started_at:
                raise ValueError("completed_at cannot be earlier than started_at")
            return done(completed_at=c, artifact_id=a, dim=d, sha256=h)

        case Running(), EvFail(failed_at=f, code=c, msg=m, attempt=a):
            if f < state.started_at:
                raise ValueError("failed_at cannot be earlier than started_at")
            return failed(failed_at=f, code=c, msg=m, attempt=a)

        case (Done() | Failed()), _:
            return state

    raise ValueError(f"invalid transition {state.kind} -> {event.__class__.__name__}")

4.2 Exhaustive Handling Example

def describe(state: ProcessingState) -> str:
    match state:
        case Pending(queued_at=q):
            return f"Pending – queued at {q.isoformat()}"
        case Running(started_at=s, progress_permille=p):
            return f"Running – {p/10:.1f}% (started {s.isoformat()})"
        case Done(completed_at=c, artifact_id=a, dim=d):
            return f"Done – artifact {a} ({d} dims) at {c.isoformat()}"
        case Failed(failed_at=f, code=c, msg=m, attempt=a):
            return f"Failed – {c.value} (attempt {a}): {m}"
    assert_never(state)

4.3 JSON Round-Trip (stable, versioned)

def processing_state_to_dict(s: ProcessingState) -> dict[str, JSON]:
    base: dict[str, JSON] = {"kind": s.kind, "version": s.version}
    match s:
        case Pending(queued_at=q):
            return base | {"queued_at": q.isoformat()}
        case Running(started_at=t, progress_permille=p):  # `t`, not `s`: do not rebind the parameter
            return base | {"started_at": t.isoformat(), "progress_permille": p}
        case Done(completed_at=c, artifact_id=a, dim=d, sha256=h):
            return base | {"completed_at": c.isoformat(), "artifact_id": a, "dim": d, "sha256": h}
        case Failed(failed_at=f, code=c, msg=m, attempt=a):
            return base | {"failed_at": f.isoformat(), "code": c.value, "msg": m, "attempt": a}
    assert_never(s)

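def _str_field(d: Mapping[str, JSON], key: str) -> str:
    # Narrow a JSON value to str without resorting to `# type: ignore`.
    v = d[key]
    if not isinstance(v, str):
        raise ValueError(f"{key} must be a string")
    return v

def _int_field(d: Mapping[str, JSON], key: str) -> int:
    # bool is a subclass of int, so reject it explicitly.
    v = d[key]
    if isinstance(v, bool) or not isinstance(v, int):
        raise ValueError(f"{key} must be an integer")
    return v

def processing_state_from_dict(d: Mapping[str, JSON]) -> ProcessingState:
    if d.get("version") != 1:
        raise ValueError("unsupported version")
    kind = d["kind"]
    match kind:
        case "pending":
            return pending(queued_at=datetime.fromisoformat(_str_field(d, "queued_at")))
        case "running":
            return running(
                started_at=datetime.fromisoformat(_str_field(d, "started_at")),
                progress_permille=_int_field(d, "progress_permille"),
            )
        case "done":
            return done(
                completed_at=datetime.fromisoformat(_str_field(d, "completed_at")),
                artifact_id=_str_field(d, "artifact_id"),
                dim=_int_field(d, "dim"),
                sha256=_str_field(d, "sha256"),
            )
        case "failed":
            return failed(
                failed_at=datetime.fromisoformat(_str_field(d, "failed_at")),
                code=ErrorCode(_str_field(d, "code")),
                msg=_str_field(d, "msg"),
                attempt=_int_field(d, "attempt"),
            )
    raise ValueError(f"unknown kind {kind}")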
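
The round-trip law depends on isoformat() and fromisoformat() preserving the UTC offset. Here is a minimal sketch for a single simplified variant, going through an actual JSON string; to_json and from_json are hypothetical helpers used only for this illustration.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Pending:  # simplified stand-in for the Pending variant
    queued_at: datetime


def to_json(p: Pending) -> str:
    # isoformat() keeps the UTC offset, so the value stays timezone-aware.
    payload = {"kind": "pending", "version": 1, "queued_at": p.queued_at.isoformat()}
    return json.dumps(payload)


def from_json(text: str) -> Pending:
    d = json.loads(text)
    if d.get("version") != 1 or d.get("kind") != "pending":
        raise ValueError("unsupported payload")
    return Pending(queued_at=datetime.fromisoformat(d["queued_at"]))


original = Pending(queued_at=datetime(2024, 1, 1, 12, 30, tzinfo=timezone.utc))
wire = to_json(original)
restored = from_json(wire)
```

Here restored compares equal to original and is still timezone-aware, which is what lets a persisted state be fed straight back into the state machine after a crash.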

4.4 Pipeline Integration (state machine is single source of truth)

def process_chunk(chunk: Chunk) -> Result[Done, Failed]:
    state: ProcessingState = pending(queued_at=datetime.now(UTC))
    try:
        now = datetime.now(UTC)
        state = transition(state, start_event(started_at=now))
        # ... actual embedding work ...
        completed_at = datetime.now(UTC)
        state = transition(
            state,
            succeed_event(
                completed_at=completed_at,
                artifact_id="emb-123",
                dim=1536,
                sha256="0" * 64,
            ),
        )
        assert isinstance(state, Done)
        return Ok(state)  # Ok/Err as imported from funcpipe_rag.result.types (positional call assumed)
    except Exception as exc:
        state = transition(
            state,
            fail_event(
                failed_at=datetime.now(UTC),
                code=ErrorCode.EMBED_FAIL,
                msg=str(exc),
                attempt=1,
            ),
        )
        assert isinstance(state, Failed)
        return Err(state)

5. Property-Based Proofs (tests/test_module_05_c02.py)

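import dataclasses
import pytest
from hypothesis import given, strategies as st
from datetime import datetime, timezone

# Import path assumed from the "fp/core.py" heading in section 3.
from fp.core import (
    ErrorCode, advance_event, done, fail_event, failed,
    pending, running, start_event, succeed_event, transition,
)

UTC = timezone.utc

# Valid aware datetimes only
aware_dt = st.datetimes(timezones=st.just(UTC))

# Strategies for the states used below, built through the smart constructors
running_st = st.builds(
    running,
    started_at=aware_dt,
    progress_permille=st.integers(min_value=0, max_value=1000),
)
done_st = st.builds(
    done,
    completed_at=aware_dt,
    artifact_id=st.text(min_size=1),
    dim=st.integers(min_value=1, max_value=4096),
    sha256=st.text(alphabet="0123456789abcdef", min_size=64, max_size=64),
)
failed_st = st.builds(
    failed,
    failed_at=aware_dt,
    code=st.sampled_from(ErrorCode),
    msg=st.text(),
    attempt=st.integers(min_value=1, max_value=10),
)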

@given(queued_at=aware_dt)
def test_pending_naive_rejected(queued_at):
    naive = queued_at.replace(tzinfo=None)
    with pytest.raises(ValueError):
        pending(queued_at=naive)

@given(delta=st.integers(max_value=-1))
def test_advance_negative_rejected(delta):
    with pytest.raises(ValueError):
        advance_event(delta_permille=delta)

@given(state=running_st, delta=st.integers(min_value=0, max_value=2000))
def test_progress_monotonic_and_clamped(state, delta):
    ev = advance_event(delta_permille=delta)
    new_state = transition(state, ev)
    assert new_state.progress_permille >= state.progress_permille
    assert new_state.progress_permille <= 1000

@given(state=st.one_of(done_st, failed_st))
def test_terminal_idempotent(state):
    # any event leaves terminal state unchanged
    for ev in [
        start_event(started_at=datetime.now(UTC)),
        advance_event(delta_permille=100),
        succeed_event(completed_at=datetime.now(UTC), artifact_id="x", dim=1, sha256="0"*64),
        fail_event(failed_at=datetime.now(UTC), code=ErrorCode.INTERNAL, msg="", attempt=1),
    ]:
        assert transition(state, ev) == state

6. Big-O & Allocation Guarantees

| Operation | Time | Heap | Notes |
| --- | --- | --- | --- |
| State / event creation | O(1) | O(1) | slots=True → no dict |
| Transition / match | O(1) | O(1) | Fixed number of variants |
| JSON round-trip | O(1) | O(1) | Fixed fields (O(L) in string length) |

7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
| --- | --- | --- |
| String/enum status flags | Invalid states possible | Tagged union with payloads |
| Mutable state objects | Corruption, races | frozen=True + pure transition functions |
| Negative progress | Confusing UI, bugs | delta_permille ≥0 enforced at event creation |
| Naive datetimes | TypeError on comparison | Smart constructors require tz-aware |
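
The last row is easy to reproduce with the standard library alone. In the sketch below, ordering_fails is a hypothetical helper: ordering a naive datetime against an aware one raises TypeError, while an equality check between them simply evaluates to False, which is arguably the more dangerous of the two.

```python
from datetime import datetime, timezone

aware = datetime(2024, 1, 1, tzinfo=timezone.utc)
naive = datetime(2024, 1, 1)


def ordering_fails(a: datetime, b: datetime) -> bool:
    """Return True if `a < b` raises TypeError (naive vs aware mix)."""
    try:
        a < b
    except TypeError:
        return True
    return False


# Equality does not raise; a naive and an aware datetime just compare unequal.
mixed_equal = naive == aware
```

Rejecting naive values in the smart constructors means checks such as `s < state.queued_at` inside transition never hit this error.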

8. Pre-Core Quiz

  1. Simple flags → Enum
  2. States with data → Tagged union of dataclasses
  3. Pure transitions → No mutation, return new state
  4. assert_never → Proves exhaustiveness
  5. Progress rule → Monotonic non-decreasing, clamped 0–1000

9. Post-Core Exercise

  1. Model your current chunk processing state as ProcessingState → implement transition and test invalid paths.
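  2. Add a new event (e.g. EvPause) and new state Paused → watch mypy flag every assert_never-guarded match (describe, processing_state_to_dict), then extend transition, whose trailing raise is only exercised at runtime.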
  3. Make a pipeline step return Result[Done, Failed] wrapping the terminal state produced by the state machine.

Next: M05C03 – Functors in Python (“Things You Can Map Over” – Option, Result, List).

You now model every domain state as pure, immutable, exhaustively-handled ADTs — illegal states are unrepresentable, invalid transitions impossible, and the type checker enforces totality. The rest of Module 5 composes these ADTs into functors, applicatives, and monoids for bulletproof pipelines.