M05C08: ADT Serialization Contracts – Stable, Versioned, Beyond Pydantic¶
Progression Note¶
By the end of Module 5, you will model every domain concept as immutable algebraic data types (products and tagged sums), eliminating whole classes of runtime errors through exhaustive pattern matching, mypy-checked totality, and pure serialization contracts.
| Module | Focus | Key Outcomes |
|---|---|---|
| 4 | Safe Recursion & Error Handling | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |
| 6 | Monadic Flows as Composable Pipelines | bind/and_then, Reader/State-like patterns, error-typed flows |
Core question
How do you define stable, versioned, round-trippable serialization contracts for your core ADTs — using only plain dataclasses and lightweight codecs — so that persistence never silently breaks when you evolve the schema?
Every production system eventually discovers the same painful truth:
“Our ‘quick’ JSON dump broke six months later when we added a field, and now half the stored chunks are unreadable with no migration path.”
The naïve pattern everyone writes first:
# BEFORE – unstable, unversioned, fragile
serialized = json.dumps(asdict(chunk)) # field order changes, no tag for sums, no version
chunk = Chunk(**json.loads(serialized)) # missing fields → None, wrong types → crash later
Schema drift, silent corruption, no forward compatibility.
The production pattern: every persisted ADT is wrapped in a stable Envelope {tag: str, ver: int, payload: dict} and serialized with explicit encoder/decoder factories + migrators → guaranteed round-trip, explicit versioning, zero surprises forever.
# AFTER – stable, versioned, migratable
enc = enc_chunk() # factory → Encoder[Chunk]
serialized = to_json(core_chunk, enc) # {"tag":"chunk","ver":1,"payload":{...}}
core_chunk = from_json(serialized, dec_chunk()) # migrates automatically
Schema changes are explicit, migratable, and never silent.
Audience: Engineers who have ever lost production data to “just add a field” and want mathematically stable persistence contracts.
Outcome
1. Every core ADT gains explicit encoder/decoder factories + Envelope.
2. All serialization proven round-trippable + migratable.
3. Zero-dependency, near-linear, streaming serde forever.
Tiny Non-Domain Example – Versioned Option[T]¶
# v1
opt_v1 = Some(value=42)
serialized_v1 = to_json(opt_v1, enc_option())
# {"tag":"option","ver":1,"payload":{"kind":"some","value":42}}
# hypothetical v2 decoder in which Some gained a metadata field (old payloads are upgraded by a registered migrator)
opt_v2 = from_json(serialized_v1, dec_option())
# Some(value=42, metadata={})  – the v1 payload still loads, with the new field defaulted
Adding a field never breaks old data.
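The upgrade above is not decoder magic; it is driven by the MIGRATORS registry introduced in section 3. A purely illustrative sketch of such a migrator (hypothetical, since the chapter's real Option codec stays at version 1 and nothing is registered for it in the shipped module):
# Hypothetical: upgrade Option envelopes from v1 to v2 by defaulting the new
# metadata field. Shown only to make the mechanism concrete.
def migrate_option_v1_to_v2(env: Envelope) -> Envelope:
    if env.tag != "option" or env.ver != 1:
        return env
    payload = dict(env.payload)
    if payload.get("kind") == "some":
        payload.setdefault("metadata", {})
    return Envelope(tag="option", ver=2, payload=payload)
MIGRATORS[("option", 1)] = migrate_option_v1_to_v2  # illustrative registration only
A matching v2 dec_option would then read the defaulted field; old v1 strings keep loading through from_json unchanged.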
Why Explicit Serialization Contracts? (Three bullets every engineer should internalise)¶
- Stability: Tagged envelope + explicit payload → field order, missing fields, and sum variants never surprise.
- Versioning + migration: the ver field + MIGRATORS → schema evolution is explicit and automated.
- Zero magic: Plain functions, no reflection → predictable, fast, dependency-free.
Pydantic is for edges only (C06). Core persistence uses explicit contracts.
1. Laws & Invariants (machine-checked)¶
| Invariant | Description | Enforcement |
|---|---|---|
| Round-Trip | from_x(to_x(x)) == x | Hypothesis on all ADTs |
| Migration Round-Trip | Old data → new decoder yields correct migrated value | Property tests with old payloads |
| Envelope Shape | Every serialized value has tag/ver/payload | _check_env guard |
| Non-empty VFailure | Validation failures always have ≥1 error | Decoder enforcement |
2. Decision Table – When to Use What Serde¶
| Need | Validation? | Binary? | Versioning? | Recommended |
|---|---|---|---|---|
| Simple JSON, no validation | No | No | Yes | Plain envelope + json |
| Schema + validation | Yes | No | Yes | Marshmallow OneOfSchema |
| Compact / fast | No | Yes | Yes | MessagePack + envelope |
| Streaming large datasets | No | Yes/No | Yes | iter_ndjson / iter_msgpack |
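For the compact/fast row, MessagePack rides on exactly the same envelope and codec factories as JSON; a quick sketch using the Option codecs and the to_msgpack / from_msgpack helpers defined in section 3 below:
b = to_msgpack(Some(42), enc_option())   # bytes carrying the same tag/ver/payload envelope
opt = from_msgpack(b, dec_option())      # Some(value=42) – binary round-trip, same contract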
3. Public API (boundaries/serde.py – mypy --strict clean)¶
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, BinaryIO, Callable, Iterable, Iterator, Mapping, Protocol, Tuple, TypeVar
import json
import msgpack
from typing_extensions import assert_never
from funcpipe_rag.fp.core import Option, Some, NoneVal, Result, Ok, Err, Validation, VSuccess, VFailure, ErrInfo
__all__ = [
"Envelope",
"Encoder", "Decoder",
"enc_option", "dec_option",
"enc_result", "dec_result",
"enc_validation", "dec_validation",
"to_json", "from_json",
"to_msgpack", "from_msgpack",
"from_json_safe",
"MIGRATORS", "migrate",
"iter_ndjson", "iter_msgpack",
]
T = TypeVar("T")
E = TypeVar("E")
JSON = str | int | float | bool | None | list["JSON"] | dict[str, "JSON"]
@dataclass(frozen=True, slots=True)
class Envelope:
tag: str # e.g. "option", "result", "chunk"
ver: int
payload: dict[str, JSON]
class Encoder(Protocol[T]):
def __call__(self, x: T) -> Envelope: ...
class Decoder(Protocol[T]):
def __call__(self, env: Envelope) -> T: ...
# Default JSON value codecs (identity for JSON-serializable values)
def json_encoder(x: JSON) -> JSON:
return x
def json_decoder(j: JSON) -> JSON:
return j
# Option factories
def enc_option(enc_val: Callable[[T], JSON] | None = None) -> Encoder[Option[T]]:
ev = enc_val or json_encoder # type: ignore[arg-type] # assumes T is JSON-compatible if no codec supplied
def _enc(x: Option[T]) -> Envelope:
match x:
case Some(value=v):
return Envelope(tag="option", ver=1, payload={"kind": "some", "value": ev(v)})
case NoneVal():
return Envelope(tag="option", ver=1, payload={"kind": "none"})
case other:
assert_never(other)
return _enc
def dec_option(dec_val: Callable[[JSON], T] | None = None) -> Decoder[Option[T]]:
dv = dec_val or json_decoder # type: ignore[arg-type]
def _dec(env: Envelope) -> Option[T]:
if env.tag != "option":
raise ValueError(f"expected tag 'option', got {env.tag}")
if env.ver != 1:
raise ValueError(f"unknown version {env.ver}")
kind = env.payload.get("kind")
if kind == "some":
return Some(dv(env.payload["value"]))
if kind == "none":
return NoneVal()
raise ValueError(f"invalid kind {kind!r}")
return _dec
# Result factories (ErrInfo codecs default to a plain {"code", "msg"} mapping; pass explicit codecs for custom error types)
def enc_result(enc_val: Callable[[T], JSON] | None = None,
enc_err: Callable[[ErrInfo], JSON] | None = None) -> Encoder[Result[T, ErrInfo]]:
ev = enc_val or json_encoder # type: ignore[arg-type]
ee = enc_err or (lambda e: {"code": e.code, "msg": e.msg}) # safe default for ErrInfo
def _enc(x: Result[T, ErrInfo]) -> Envelope:
match x:
case Ok(value=v):
return Envelope(tag="result", ver=1, payload={"kind": "ok", "value": ev(v)})
case Err(error=e):
return Envelope(tag="result", ver=1, payload={"kind": "err", "error": ee(e)})
case other:
assert_never(other)
return _enc
def dec_result(dec_val: Callable[[JSON], T] | None = None,
dec_err: Callable[[JSON], ErrInfo] | None = None) -> Decoder[Result[T, ErrInfo]]:
dv = dec_val or json_decoder # type: ignore[arg-type]
de = dec_err or (lambda j: ErrInfo(code=j["code"], msg=j["msg"]))
def _dec(env: Envelope) -> Result[T, ErrInfo]:
if env.tag != "result":
raise ValueError(f"expected tag 'result', got {env.tag}")
if env.ver != 1:
raise ValueError(f"unknown version {env.ver}")
kind = env.payload.get("kind")
if kind == "ok":
return Ok(dv(env.payload["value"]))
if kind == "err":
return Err(de(env.payload["error"]))
raise ValueError(f"invalid kind {kind!r}")
return _dec
# Validation factories (similar pattern)
def enc_validation(enc_val: Callable[[T], JSON] | None = None,
enc_err: Callable[[ErrInfo], JSON] | None = None) -> Encoder[Validation[T, ErrInfo]]:
ev = enc_val or json_encoder
ee = enc_err or (lambda e: {"code": e.code, "msg": e.msg})
def _enc(x: Validation[T, ErrInfo]) -> Envelope:
match x:
case VSuccess(value=v):
return Envelope(tag="validation", ver=1, payload={"kind": "v_success", "value": ev(v)})
case VFailure(errors=es):
return Envelope(tag="validation", ver=1, payload={"kind": "v_failure", "errors": [ee(e) for e in es]})
case other:
assert_never(other)
return _enc
def dec_validation(dec_val: Callable[[JSON], T] | None = None,
dec_err: Callable[[JSON], ErrInfo] | None = None) -> Decoder[Validation[T, ErrInfo]]:
dv = dec_val or json_decoder
de = dec_err or (lambda j: ErrInfo(code=j["code"], msg=j["msg"]))
def _dec(env: Envelope) -> Validation[T, ErrInfo]:
if env.tag != "validation":
raise ValueError(f"expected tag 'validation', got {env.tag}")
if env.ver != 1:
raise ValueError(f"unknown version {env.ver}")
kind = env.payload.get("kind")
if kind == "v_success":
return VSuccess(dv(env.payload["value"]))
if kind == "v_failure":
errs = [de(e) for e in env.payload["errors"]]
if not errs:
raise ValueError("VFailure requires non-empty errors")
return VFailure(errors=tuple(errs))
raise ValueError(f"invalid kind {kind!r}")
return _dec
# JSON / MessagePack
_JSON_KW = dict(ensure_ascii=False, allow_nan=False, separators=(",", ":"))
_MP_PACK = dict(use_bin_type=True)
_MP_UNPACK = dict(raw=False)
def _check_env(obj: Mapping[str, Any]) -> None:
if not isinstance(obj, dict):
raise ValueError("invalid envelope: not a dict")
required = {"tag", "ver", "payload"}
missing = required - set(obj)
if missing:
raise ValueError(f"invalid envelope: missing {missing}")
if not isinstance(obj["tag"], str):
raise ValueError("tag must be str")
if not isinstance(obj["ver"], int):
raise ValueError("ver must be int")
if not isinstance(obj["payload"], dict):
raise ValueError("payload must be dict")
def to_json(x: T, enc: Encoder[T]) -> str:
env = enc(x)
return json.dumps({"tag": env.tag, "ver": env.ver, "payload": env.payload}, **_JSON_KW)
def from_json(s: str, dec: Decoder[T]) -> T:
obj = json.loads(s)
_check_env(obj)
env = Envelope(tag=obj["tag"], ver=obj["ver"], payload=obj["payload"])
return dec(migrate(env))
def to_msgpack(x: T, enc: Encoder[T]) -> bytes:
env = enc(x)
return msgpack.packb({"tag": env.tag, "ver": env.ver, "payload": env.payload}, **_MP_PACK)
def from_msgpack(b: bytes, dec: Decoder[T]) -> T:
obj = msgpack.unpackb(b, **_MP_UNPACK)
_check_env(obj)
env = Envelope(tag=obj["tag"], ver=obj["ver"], payload=obj["payload"])
return dec(migrate(env))
# Safe decode + versioning
@dataclass(frozen=True, slots=True)
class DecodeErr:
path: Tuple[str, ...] = ()
msg: str = ""
def from_json_safe(s: str, dec: Decoder[T]) -> Validation[T, DecodeErr]:
try:
return VSuccess(from_json(s, dec))
except Exception as exc:
return VFailure((DecodeErr(msg=str(exc)),))
MIGRATORS: dict[tuple[str, int], Callable[[Envelope], Envelope]] = {}
MAX_MIGRATION_STEPS = 32
def migrate(env: Envelope) -> Envelope:
key = (env.tag, env.ver)
steps = 0
seen: set[tuple[str, int]] = set()
while key in MIGRATORS:
if key in seen:
raise RuntimeError(f"migration cycle detected at {key}")
seen.add(key)
steps += 1
if steps > MAX_MIGRATION_STEPS:
raise RuntimeError("migration step limit exceeded")
env = MIGRATORS[key](env)
key = (env.tag, env.ver)
return env
# Streaming
def iter_ndjson(fp: Iterable[str], dec: Decoder[T]) -> Iterator[T]:
    for line in fp:
        line = line.strip()
        if line:
            yield from_json(line, dec)
def iter_msgpack(fp: BinaryIO, dec: Decoder[T]) -> Iterator[T]:
    unpacker = msgpack.Unpacker(fp, **_MP_UNPACK)
    for obj in unpacker:
        _check_env(obj)
        yield dec(migrate(Envelope(tag=obj["tag"], ver=obj["ver"], payload=obj["payload"])))
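The safe decode path is exported above but not demonstrated later in the chapter; a quick sketch with the Option codecs:
# Well-formed input decodes to VSuccess(NoneVal())
ok = from_json_safe('{"tag":"option","ver":1,"payload":{"kind":"none"}}', dec_option())
# Malformed input becomes a non-empty VFailure instead of raising
bad = from_json_safe('{"tag":"option"}', dec_option())
# -> VFailure with a single DecodeErr naming the missing envelope keys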
4. Reference Implementations (continued)¶
4.1 Before vs After – Chunk Persistence¶
# BEFORE – unstable dict → JSON
serialized = json.dumps(asdict(core_chunk)) # field order changes, no version
core_chunk = Chunk(**json.loads(serialized)) # missing fields → default, crash later
# AFTER – stable envelope
enc = enc_chunk() # factory
serialized = to_json(core_chunk, enc) # {"tag":"chunk","ver":1,"payload":{...}}
core_chunk = from_json(serialized, dec_chunk()) # migrates if needed, raises clear error if invalid
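The enc_chunk / dec_chunk factories used here are not reproduced in this chapter; a minimal sketch, assuming a frozen Chunk dataclass whose v1 schema carries text and embedding (matching the v1 payload used in 4.3 and section 5):
# Sketch only: the real Chunk ADT lives elsewhere in the course codebase.
@dataclass(frozen=True, slots=True)
class Chunk:
    text: str
    embedding: tuple[float, ...]
def enc_chunk() -> Encoder[Chunk]:
    def _enc(c: Chunk) -> Envelope:
        return Envelope(tag="chunk", ver=1,
                        payload={"text": c.text, "embedding": list(c.embedding)})
    return _enc
def dec_chunk() -> Decoder[Chunk]:
    def _dec(env: Envelope) -> Chunk:
        if env.tag != "chunk":
            raise ValueError(f"expected tag 'chunk', got {env.tag}")
        if env.ver != 1:
            raise ValueError(f"unknown version {env.ver}")
        return Chunk(text=env.payload["text"],
                     embedding=tuple(env.payload["embedding"]))
    return _dec
The factory shape mirrors enc_option / dec_option: close over field codecs, return a plain callable, validate tag and ver before touching the payload. When the v1 → v2 migration of 4.3 lands, the dataclass gains metadata and the decoder's expected ver bumps to 2 in the same change.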
4.2 RAG Integration – Persistent Chunk Store¶
# Write stream
with open("chunks.ndjson", "w") as f:
enc = enc_chunk()
for core_chunk in pipeline_chunks:
f.write(to_json(core_chunk, enc) + "\n")
# Read stream with migration (this loop lives inside a generator function in real code)
with open("chunks.ndjson") as f:
dec = dec_chunk()
for core_chunk in iter_ndjson(f, dec):
yield core_chunk
4.3 Versioning & Migration Example (Chunk v1 → v2)¶
# Suppose v1 Chunk had no metadata field
def migrate_chunk_v1_to_v2(env: Envelope) -> Envelope:
if env.ver != 1 or env.tag != "chunk":
return env
payload = env.payload.copy()
payload.setdefault("metadata", {})
return Envelope(tag="chunk", ver=2, payload=payload)
MIGRATORS[("chunk", 1)] = migrate_chunk_v1_to_v2
5. Property-Based Proofs (tests/test_serde.py)¶
from hypothesis import given, strategies as st
msgpack_int = st.integers(min_value=-(2**63), max_value=2**64 - 1)
@given(opt=st.one_of(st.builds(Some, value=st.integers()), st.just(NoneVal())))
def test_option_json_roundtrip(opt):
enc = enc_option()
dec = dec_option()
s = to_json(opt, enc)
back = from_json(s, dec)
assert back == opt
@given(res=st.one_of(st.builds(Ok, value=msgpack_int),
st.builds(Err, error=st.builds(ErrInfo, code=st.text(), msg=st.text()))))
def test_result_msgpack_roundtrip(res):
enc = enc_result()
dec = dec_result()
b = to_msgpack(res, enc)
back = from_msgpack(b, dec)
assert back == res
# Migration example test
def test_chunk_v1_migration():
v1_payload = {"text": "hello", "embedding": [0.1, 0.2]}
env_v1 = Envelope(tag="chunk", ver=1, payload=v1_payload)
env_v2 = migrate(env_v1)
assert env_v2.ver == 2
assert env_v2.payload["metadata"] == {}
6. Big-O & Allocation Guarantees¶
| Operation | Time | Heap | Notes |
|---|---|---|---|
| encode/decode | O(#fields) | O(#fields) | Single allocation per envelope |
| JSON/MessagePack | O(N) | O(N) | Linear in payload size |
| Streaming iter | O(1) per item | O(1) per item | Zero-copy when possible |
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Raw asdict → json | Unstable order, no tags | Explicit Envelope + enc/dec |
| No version field | Schema drift breaks old data | ver + MIGRATORS |
| Ad-hoc JSON handling | Silent corruption | _check_env + safe decode |
| Pydantic for core persistence | Heavy deps, slow | Plain codecs for core |
| No migration path | Production data loss | migrate + versioned envelope |
8. Pre-Core Quiz¶
- Envelope contains…? → tag, ver, payload
- Round-trip law → from(to(x)) == x
- For versioning? → MIGRATORS dict
- Streaming with…? → iter_ndjson / iter_msgpack
- Beyond Pydantic for…? → Core persistence
9. Post-Core Exercise¶
- Implement Encoder/Decoder for one core ADT → test round-trip.
- Add a v2 migrator for an existing type → test old payload loads.
- Replace one json.dumps(asdict(...)) with envelope serde.
- Add streaming persistence for a pipeline stage.
Next: M05C09 – Compositional Domain Models – Splitting ADTs Across Subsystems.
You now serialize every core ADT with stable, versioned, migratable contracts — schema evolution is explicit and safe forever. The rest of Module 5 adds compositional domain models and performance guidance for heavy ADTs.