M05C02: Modelling Domain States (Pending/Running/Done/Failed) as ADTs
Progression Note
By the end of Module 5, you will model every domain concept as immutable algebraic data types (products and tagged sums), eliminating whole classes of runtime errors through exhaustive pattern matching, mypy-checked totality, and pure serialization contracts.
| Module | Focus | Key Outcomes |
|---|---|---|
| 4 | Safe Recursion & Error Handling | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |
| 6 | Monadic Flows as Composable Pipelines | bind/and_then, Reader/State-like patterns, error-typed flows |
Core question
How do you model domain states like Pending/Running/Done/Failed as algebraic data types in Python — ensuring exhaustive handling, type-safe transitions, and total functions in every FuncPipe pipeline stage?
We extend Core 1’s product/sum types to real domain state machines. The question every seasoned team eventually asks:
“Why do we still ship pipelines that mysteriously get stuck in half-finished states, process events out-of-order, or silently ignore terminal failures?”
The answer is almost always the same: you’re using strings, booleans, or mutable status objects instead of proper state ADTs with payloads and validated transitions.
The naïve pattern everyone writes first:
class Job:
    def __init__(self):
        self.status = "pending"
        self.progress = 0
        self.error = None
    def advance(self, event):
        if self.status == "running" and event == "succeed":
            self.status = "done"  # someone forgets to copy artifact_id
Silent invalid transitions, mutable corruption, missed cases everywhere.
The production pattern: every state is a tagged variant with its own payload; every transition is a pure function forced to handle all cases.
ProcessingState = Pending | Running | Done | Failed
def transition(state: ProcessingState, event: Event) -> ProcessingState:
    match state, event:
        case Pending(), EvStart(started_at=s):
            return running(started_at=s, progress_permille=0)
        case Running(), EvAdvance(delta_permille=d):
            return running(started_at=state.started_at,
                           progress_permille=min(1000, state.progress_permille + d))
        case Running(), EvSucceed(completed_at=c, artifact_id=a, dim=d, sha256=h):
            return done(
                completed_at=c,
                artifact_id=a,
                dim=d,
                sha256=h,
            )
        case Running(), EvFail(failed_at=f, code=code, msg=msg, attempt=attempt):
            return failed(
                failed_at=f,
                code=code,
                msg=msg,
                attempt=attempt,
            )
        case (Done() | Failed()), _:
            return state
    raise ValueError("invalid transition")
Invalid transitions raise immediately instead of silently corrupting state, terminal states are idempotent, and every match that ends in assert_never stops type-checking until a newly added state is handled.
Audience: Engineers tired of “stuck in running forever” bugs who want state machines whose invariants are checked by the type checker and by property tests.
Outcome
1. Every flag/field soup replaced with exhaustive, payload-carrying state ADTs.
2. Pure, validated transitions: no mutation, no invalid states.
3. Immutable, serialisable states that survive crashes/refactors without corruption.
Tiny Non-Domain Example – Traffic Light State Machine
from dataclasses import dataclass
from typing import Literal
try:  # Python ≥3.11
    from typing import assert_never
except ImportError:  # Python ≤3.10
    from typing_extensions import assert_never
@dataclass(frozen=True, slots=True)
class Red:
    kind: Literal["red"] = "red"
@dataclass(frozen=True, slots=True)
class Green:
    kind: Literal["green"] = "green"
@dataclass(frozen=True, slots=True)
class Yellow:
    kind: Literal["yellow"] = "yellow"
TrafficLight = Red | Green | Yellow
def next_light(light: TrafficLight) -> TrafficLight:
    match light:
        case Red(): return Green()
        case Green(): return Yellow()
        case Yellow(): return Red()
    assert_never(light)
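Adding a Blinking variant to TrafficLight makes mypy reject next_light (and every other match that ends in assert_never) until the new case is handled: no silent infinite green.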
Why State ADTs? (Three bullets every engineer should internalise)
- Exhaustiveness: Adding a new state breaks every handler that ends in assert_never until you update it, so missing cases cannot slip through silently.
- Immutability + Value semantics: Frozen + structural eq/hash → safe as dict keys, cache keys, pure function results.
- Validated transitions + payload safety: Smart constructors + pure transition functions make illegal states unrepresentable.
1. Laws & Invariants (machine-checked)
| Law | Formal Statement | Enforcement |
|---|---|---|
| Exhaustiveness | Every match/transition handles all state and event variants | mypy --strict + assert_never + tests |
| Immutability | No field can be mutated post-construction | frozen=True, slots=True + tests |
| Structural Equality | x == y iff all fields equal | Hypothesis equality tests |
| Transition Validity | Only legal transitions allowed; invalid ones raise early | Property tests on invalid event sequences |
| Progress Monotonic | Progress never decreases; always 0 ≤ progress_permille ≤ 1000 | Property tests + smart constructors |
| Terminal Idempotence | Applying any event to Done or Failed yields a value-equal state | Property test (test_terminal_idempotent) |
| JSON Round-Trip | from_dict(to_dict(x)) == x for all instances | test_processing_state_roundtrip |
2. Decision Table – Enum vs Tagged Union
| Need | Payload? | Recommended Construction |
|---|---|---|
| Simple status flags | No | Enum |
| States with data | Yes | Union of tagged dataclasses (what we use here) |
Never use strings or mutable status objects for domain state.
3. Public API (fp/core.py – mypy --strict clean)
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Generic, TypeVar, Literal, Mapping, Sequence, TypeAlias
from datetime import datetime, timezone
try:  # Python ≥3.11
    from typing import assert_never
except ImportError:  # Python ≤3.10
    from typing_extensions import assert_never
JSONPrimitive: TypeAlias = str | int | float | bool | None
JSON: TypeAlias = JSONPrimitive | Mapping[str, "JSON"] | Sequence["JSON"]
UTC = timezone.utc
T = TypeVar("T")
E = TypeVar("E")
class ErrorCode(str, Enum):
    RATE_LIMIT = "RATE_LIMIT"
    TIMEOUT = "TIMEOUT"
    EMBED_FAIL = "EMBED_FAIL"
    INTERNAL = "INTERNAL"
# Events — pure data + smart constructors
@dataclass(frozen=True, slots=True, kw_only=True)
class EvStart:
    started_at: datetime
@dataclass(frozen=True, slots=True, kw_only=True)
class EvAdvance:
    delta_permille: int  # ≥0
@dataclass(frozen=True, slots=True, kw_only=True)
class EvSucceed:
    completed_at: datetime
    artifact_id: str
    dim: int
    sha256: str
@dataclass(frozen=True, slots=True, kw_only=True)
class EvFail:
    failed_at: datetime
    code: ErrorCode
    msg: str
    attempt: int
Event = EvStart | EvAdvance | EvSucceed | EvFail
def start_event(*, started_at: datetime) -> EvStart:
    if started_at.tzinfo is None:
        raise ValueError("started_at must be timezone-aware")
    return EvStart(started_at=started_at)
def advance_event(*, delta_permille: int) -> EvAdvance:
    if delta_permille < 0:
        raise ValueError("delta_permille must be ≥0 — progress is monotonic")
    return EvAdvance(delta_permille=delta_permille)
def succeed_event(*, completed_at: datetime, artifact_id: str, dim: int, sha256: str) -> EvSucceed:
    if completed_at.tzinfo is None:
        raise ValueError("completed_at must be timezone-aware")
    return EvSucceed(completed_at=completed_at, artifact_id=artifact_id, dim=dim, sha256=sha256)
def fail_event(*, failed_at: datetime, code: ErrorCode, msg: str, attempt: int) -> EvFail:
    if failed_at.tzinfo is None:
        raise ValueError("failed_at must be timezone-aware")
    if attempt < 1:
        raise ValueError("attempt must be ≥1")
    return EvFail(failed_at=failed_at, code=code, msg=msg, attempt=attempt)
# Processing state variants + smart constructors
@dataclass(frozen=True, slots=True, kw_only=True)
class Pending:
    kind: Literal["pending"] = "pending"
    queued_at: datetime
    version: Literal[1] = 1
@dataclass(frozen=True, slots=True, kw_only=True)
class Running:
    kind: Literal["running"] = "running"
    started_at: datetime
    progress_permille: int  # 0–1000
    version: Literal[1] = 1
@dataclass(frozen=True, slots=True, kw_only=True)
class Done:
    kind: Literal["done"] = "done"
    completed_at: datetime
    artifact_id: str
    dim: int
    sha256: str
    version: Literal[1] = 1
@dataclass(frozen=True, slots=True, kw_only=True)
class Failed:
    kind: Literal["failed"] = "failed"
    failed_at: datetime
    code: ErrorCode
    msg: str
    attempt: int
    version: Literal[1] = 1
ProcessingState = Pending | Running | Done | Failed
from funcpipe_rag.result.types import Result, Ok, Err
def pending(*, queued_at: datetime) -> Pending:
    if queued_at.tzinfo is None:
        raise ValueError("queued_at must be timezone-aware")
    return Pending(queued_at=queued_at)
def running(*, started_at: datetime, progress_permille: int) -> Running:
    if started_at.tzinfo is None:
        raise ValueError("started_at must be timezone-aware")
    if not 0 <= progress_permille <= 1000:
        raise ValueError("progress_permille must be 0–1000")
    return Running(started_at=started_at, progress_permille=progress_permille)
def done(*, completed_at: datetime, artifact_id: str, dim: int, sha256: str) -> Done:
    if completed_at.tzinfo is None:
        raise ValueError("completed_at must be timezone-aware")
    if dim <= 0 or len(sha256) != 64 or not all(c in "0123456789abcdefABCDEF" for c in sha256):
        raise ValueError("invalid Done payload")
    return Done(completed_at=completed_at, artifact_id=artifact_id, dim=dim, sha256=sha256)
def failed(*, failed_at: datetime, code: ErrorCode, msg: str, attempt: int) -> Failed:
    if failed_at.tzinfo is None:
        raise ValueError("failed_at must be timezone-aware")
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return Failed(failed_at=failed_at, code=code, msg=msg, attempt=attempt)
4. Reference Implementations (continued)
4.1 Pure State Transition (match-based, exhaustive)
def transition(state: ProcessingState, event: Event) -> ProcessingState:
    match state, event:
        case Pending(), EvStart(started_at=s):
            if s < state.queued_at:
                raise ValueError("started_at cannot be earlier than queued_at")
            return running(started_at=s, progress_permille=0)
        case Running(), EvAdvance(delta_permille=d):
            new_p = min(1000, state.progress_permille + d)
            return running(started_at=state.started_at, progress_permille=new_p)
        case Running(), EvSucceed(completed_at=c, artifact_id=a, dim=d, sha256=h):
            if c < state.started_at:
                raise ValueError("completed_at cannot be earlier than started_at")
            return done(completed_at=c, artifact_id=a, dim=d, sha256=h)
        case Running(), EvFail(failed_at=f, code=c, msg=m, attempt=a):
            if f < state.started_at:
                raise ValueError("failed_at cannot be earlier than started_at")
            return failed(failed_at=f, code=c, msg=m, attempt=a)
        case (Done() | Failed()), _:
            return state
    raise ValueError(f"invalid transition {state.kind} ← {event.__class__.__name__}")
4.2 Exhaustive Handling Example
def describe(state: ProcessingState) -> str:
    match state:
        case Pending(queued_at=q):
            return f"Pending – queued at {q.isoformat()}"
        case Running(started_at=s, progress_permille=p):
            return f"Running – {p/10:.1f}% (started {s.isoformat()})"
        case Done(completed_at=c, artifact_id=a, dim=d):
            return f"Done – artifact {a} ({d} dims) at {c.isoformat()}"
        case Failed(failed_at=f, code=c, msg=m, attempt=a):
            return f"Failed – {c.value} (attempt {a}): {m}"
    assert_never(state)
4.3 JSON Round-Trip (stable, versioned)
def processing_state_to_dict(s: ProcessingState) -> dict[str, JSON]:
    base: dict[str, JSON] = {"kind": s.kind, "version": s.version}
    match s:
        case Pending(queued_at=q):
            return base | {"queued_at": q.isoformat()}
        case Running(started_at=started, progress_permille=p):
            # capture name must not shadow the parameter s (mypy rejects the rebinding)
            return base | {"started_at": started.isoformat(), "progress_permille": p}
        case Done(completed_at=c, artifact_id=a, dim=d, sha256=h):
            return base | {"completed_at": c.isoformat(), "artifact_id": a, "dim": d, "sha256": h}
        case Failed(failed_at=f, code=c, msg=m, attempt=a):
            return base | {"failed_at": f.isoformat(), "code": c.value, "msg": m, "attempt": a}
    assert_never(s)
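def _req_str(d: Mapping[str, JSON], key: str) -> str:
    # Narrow a JSON field to str so mypy --strict needs no "type: ignore"
    value = d[key]
    if not isinstance(value, str):
        raise ValueError(f"{key} must be a string")
    return value
def _req_int(d: Mapping[str, JSON], key: str) -> int:
    value = d[key]
    # bool is a subclass of int, so reject it explicitly
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError(f"{key} must be an integer")
    return value
def processing_state_from_dict(d: Mapping[str, JSON]) -> ProcessingState:
    if d.get("version") != 1:
        raise ValueError("unsupported version")
    kind = d.get("kind")
    match kind:
        case "pending":
            return pending(queued_at=datetime.fromisoformat(_req_str(d, "queued_at")))
        case "running":
            return running(
                started_at=datetime.fromisoformat(_req_str(d, "started_at")),
                progress_permille=_req_int(d, "progress_permille"),
            )
        case "done":
            return done(
                completed_at=datetime.fromisoformat(_req_str(d, "completed_at")),
                artifact_id=_req_str(d, "artifact_id"),
                dim=_req_int(d, "dim"),
                sha256=_req_str(d, "sha256"),
            )
        case "failed":
            return failed(
                failed_at=datetime.fromisoformat(_req_str(d, "failed_at")),
                code=ErrorCode(_req_str(d, "code")),
                msg=_req_str(d, "msg"),
                attempt=_req_int(d, "attempt"),
            )
    raise ValueError(f"unknown kind {kind!r}")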
4.4 Pipeline Integration (state machine is single source of truth)
def process_chunk(chunk: Chunk) -> Result[Done, Failed]:
    state: ProcessingState = pending(queued_at=datetime.now(UTC))
    # Start before the try: transition only allows Running -> Failed,
    # so the failure path must never be handed a Pending state.
    state = transition(state, start_event(started_at=datetime.now(UTC)))
    try:
        # ... actual embedding work ...
        completed_at = datetime.now(UTC)
        state = transition(
            state,
            succeed_event(
                completed_at=completed_at,
                artifact_id="emb-123",
                dim=1536,
                sha256="0" * 64,
            ),
        )
        assert isinstance(state, Done)
        return Ok(state)  # Ok/Err as imported from funcpipe_rag.result.types
    except Exception as exc:
        state = transition(
            state,
            fail_event(
                failed_at=datetime.now(UTC),
                code=ErrorCode.EMBED_FAIL,
                msg=str(exc),
                attempt=1,
            ),
        )
        assert isinstance(state, Failed)
        return Err(state)
5. Property-Based Proofs (tests/test_module_05_c02.py)
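import pytest
from hypothesis import given, strategies as st
from datetime import datetime, timezone
from fp.core import (
    ErrorCode, Running, advance_event, done, fail_event, failed,
    pending, running, start_event, succeed_event, transition,
)
UTC = timezone.utc
# Valid aware datetimes only
aware_dt = st.datetimes(timezones=st.just(UTC))
# Valid states, built through the smart constructors
running_st = st.builds(
    running,
    started_at=aware_dt,
    progress_permille=st.integers(min_value=0, max_value=1000),
)
done_st = st.builds(
    done,
    completed_at=aware_dt,
    artifact_id=st.text(min_size=1),
    dim=st.integers(min_value=1, max_value=4096),
    sha256=st.text(alphabet="0123456789abcdef", min_size=64, max_size=64),
)
failed_st = st.builds(
    failed,
    failed_at=aware_dt,
    code=st.sampled_from(ErrorCode),
    msg=st.text(),
    attempt=st.integers(min_value=1, max_value=10),
)
@given(queued_at=aware_dt)
def test_pending_naive_rejected(queued_at):
    naive = queued_at.replace(tzinfo=None)
    with pytest.raises(ValueError):
        pending(queued_at=naive)
@given(delta=st.integers(max_value=-1))
def test_advance_negative_rejected(delta):
    with pytest.raises(ValueError):
        advance_event(delta_permille=delta)
@given(state=running_st, delta=st.integers(min_value=0, max_value=2000))
def test_progress_monotonic_and_clamped(state, delta):
    ev = advance_event(delta_permille=delta)
    new_state = transition(state, ev)
    assert isinstance(new_state, Running)
    assert new_state.progress_permille >= state.progress_permille
    assert new_state.progress_permille <= 1000
@given(state=st.one_of(done_st, failed_st))
def test_terminal_idempotent(state):
    # any event leaves terminal state unchanged
    now = datetime.now(UTC)
    for ev in [
        start_event(started_at=now),
        advance_event(delta_permille=100),
        succeed_event(completed_at=now, artifact_id="x", dim=1, sha256="0" * 64),
        fail_event(failed_at=now, code=ErrorCode.INTERNAL, msg="", attempt=1),
    ]:
        assert transition(state, ev) == state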
6. Big-O & Allocation Guarantees
| Operation | Time | Heap | Notes |
|---|---|---|---|
| State / event creation | O(1) | O(1) | slots=True → no per-instance attribute dict |
| Transition / match | O(1) | O(1) | Fixed number of variants |
| JSON round-trip | O(1) | O(1) | Fixed fields (O(L) in string length) |
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| String flags, or an Enum plus loose optional fields, for states that carry data | Invalid states possible (e.g. done with no artifact_id) | Tagged union with payloads |
| Mutable state objects | Corruption, races | frozen=True + pure transition functions |
| Negative progress | Confusing UI, bugs | delta_permille ≥0 enforced at event creation |
| Naive datetimes | TypeError on comparison | Smart constructors require tz-aware |
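The naive-datetime row can be reproduced with the standard library alone: ordering comparisons between naive and aware datetimes raise TypeError, while == silently returns False, which is arguably the worse failure. require_aware is a hypothetical helper that mirrors the tzinfo check in the smart constructors.

```python
from datetime import datetime, timezone

aware = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
naive = datetime(2024, 1, 1, 12, 0)

try:
    _ = naive < aware
    ordering_error = False
except TypeError:
    ordering_error = True  # can't compare offset-naive and offset-aware datetimes

silently_unequal = naive == aware  # no exception, just False


def require_aware(name: str, value: datetime) -> datetime:
    """Hypothetical guard mirroring the smart constructors' tzinfo check."""
    if value.tzinfo is None:
        raise ValueError(f"{name} must be timezone-aware")
    return value
```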
8. Pre-Core Quiz
- Simple flags → Enum
- States with data → Tagged union of dataclasses
- Pure transitions → No mutation, return new state
- assert_never → Proves exhaustiveness
- Progress rule → Monotonic non-decreasing, clamped 0–1000
9. Post-Core Exercise
- Model your current chunk processing state as ProcessingState → implement transition and test invalid paths.
- Add a new event (e.g. EvPause) and a new state Paused → watch mypy flag every match that ends in assert_never (describe, processing_state_to_dict), then extend transition by hand, since its raise fallback is not checked for exhaustiveness.
- Make a pipeline step return Result[Done, Failed] wrapping the terminal state produced by the state machine.
Next: M05C03 – Functors in Python (“Things You Can Map Over” – Option, Result, List).
You now model every domain state as pure, immutable, exhaustively handled ADTs: illegal states cannot be built through the smart constructors, invalid transitions are rejected the moment they are attempted, and the type checker enforces totality wherever a match ends in assert_never. The rest of Module 5 composes these ADTs into functors, applicatives, and monoids for bulletproof pipelines.