M05C08: ADT Serialization Contracts – Stable, Versioned, Beyond Pydantic

Progression Note

By the end of Module 5, you will model every domain concept as immutable algebraic data types (products and tagged sums), eliminating whole classes of runtime errors through exhaustive pattern matching, mypy-checked totality, and pure serialization contracts.

Module | Focus | Key Outcomes
4 | Safe Recursion & Error Handling | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries
5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types
6 | Monadic Flows as Composable Pipelines | bind/and_then, Reader/State-like patterns, error-typed flows

Core question
How do you define stable, versioned, round-trippable serialization contracts for your core ADTs — using only plain dataclasses and lightweight codecs — so that persistence never silently breaks when you evolve the schema?

Every production system eventually discovers the same painful truth:

“Our ‘quick’ JSON dump broke six months later when we added a field, and now half the stored chunks are unreadable with no migration path.”

The naïve pattern everyone writes first:

# BEFORE – unstable, unversioned, fragile
serialized = json.dumps(asdict(chunk))        # field order changes, no tag for sums, no version
chunk = Chunk(**json.loads(serialized))       # missing fields → None, wrong types → crash later

Schema drift, silent corruption, no forward compatibility.

The production pattern: every persisted ADT is wrapped in a stable Envelope {tag: str, ver: int, payload: dict} and serialized with explicit encoder/decoder factories + migrators → guaranteed round-trip, explicit versioning, zero surprises forever.

# AFTER – stable, versioned, migratable
enc = enc_chunk()                               # factory → Encoder[Chunk]
serialized = to_json(core_chunk, enc)           # {"tag":"chunk","ver":1,"payload":{...}}
core_chunk = from_json(serialized, dec_chunk()) # migrates automatically

Schema changes are explicit, migratable, and never silent.

Audience: Engineers who have ever lost production data to “just add a field” and want mathematically stable persistence contracts.

Outcome
  1. Every core ADT gains explicit encoder/decoder factories + Envelope.
  2. All serialization proven round-trippable + migratable.
  3. Near-zero-dependency (stdlib json, msgpack optional), near-linear, streaming serde.

Tiny Non-Domain Example – Versioned Option[T]

# v1
opt_v1 = Some(value=42)
serialized_v1 = to_json(opt_v1, enc_option())
# {"tag":"option","ver":1,"payload":{"kind":"some","value":42}}

# a v2 decoder adds an optional metadata field; the old v1 payload still decodes
opt_v2 = from_json(serialized_v1, dec_option())
# Some(value=42, metadata={})

Adding a field never breaks old data.
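
A minimal sketch of the migrator such a v1 → v2 change implies (hypothetical: the metadata field and a ver=2-aware dec_option are not part of this chapter's code, so this shows the shape of the change rather than something to register against the ver=1 decoder):

# Hypothetical option v1 → v2 migrator: default the new field on old payloads
def migrate_option_v1_to_v2(env: Envelope) -> Envelope:
    payload = dict(env.payload)
    if payload.get("kind") == "some":
        payload.setdefault("metadata", {})   # absent in v1 data, defaulted here
    return Envelope(tag="option", ver=2, payload=payload)

# MIGRATORS[("option", 1)] = migrate_option_v1_to_v2   # register once a v2-aware dec_option exists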

Why Explicit Serialization Contracts? (Three bullets every engineer should internalise)

  • Stability: Tagged envelope + explicit payload → field order, missing fields, and sum variants never surprise.
  • Versioning + migration: ver field + MIGRATORS → schema evolution is explicit and automated.
  • Zero magic: Plain functions, no reflection → predictable, fast, no heavyweight dependencies (stdlib json; msgpack optional).

Pydantic is for edges only (C06). Core persistence uses explicit contracts.
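
A minimal sketch of that split, assuming a hypothetical ChunkIn edge model plus the Chunk / enc_chunk names used in section 4 (Chunk field names follow the test payload in section 5):

from pydantic import BaseModel

class ChunkIn(BaseModel):                      # edge only: validate untrusted input (C06)
    text: str
    embedding: list[float]

raw = {"text": "hello", "embedding": [0.1, 0.2]}             # untrusted boundary input
inbound = ChunkIn.model_validate(raw)                        # Pydantic stops at the edge
core_chunk = Chunk(text=inbound.text, embedding=tuple(inbound.embedding))  # assumed Chunk fields
stored = to_json(core_chunk, enc_chunk())                    # core persistence: explicit contract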

1. Laws & Invariants (machine-checked)

Invariant | Description | Enforcement
Round-Trip | from_x(to_x(x)) == x | Hypothesis on all ADTs
Migration Round-Trip | Old data → new decoder yields correct migrated value | Property tests with old payloads
Envelope Shape | Every serialized value has tag/ver/payload | _check_env guard
Non-empty VFailure | Validation failures always have ≥1 error | Decoder enforcement

2. Decision Table – When to Use What Serde

Need | Validation? | Binary? | Versioning? | Recommended
Simple JSON, no validation | No | No | Yes | Plain envelope + json
Schema + validation | Yes | No | Yes | Marshmallow OneOfSchema
Compact / fast | No | Yes | Yes | MessagePack + envelope
Streaming large datasets | No | Yes/No | Yes | iter_ndjson / iter_msgpack
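
A quick usage sketch of the first and third rows (both share the same encoder factory, so the format decision stays local to the call site):

readable: str = to_json(Some(value=42), enc_option())       # plain envelope + json
compact: bytes = to_msgpack(Some(value=42), enc_option())   # MessagePack + envelope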

3. Public API (boundaries/serde.py – mypy --strict clean)

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, BinaryIO, Callable, Iterator, Mapping, Protocol, TextIO, TypeVar
import json
import msgpack
from typing_extensions import assert_never

from funcpipe_rag.fp.core import Option, Some, NoneVal, Result, Ok, Err, Validation, VSuccess, VFailure, ErrInfo

__all__ = [
    "Envelope",
    "Encoder", "Decoder",
    "enc_option", "dec_option",
    "enc_result", "dec_result",
    "enc_validation", "dec_validation",
    "to_json", "from_json",
    "to_msgpack", "from_msgpack",
    "from_json_safe",
    "MIGRATORS", "migrate",
    "iter_ndjson", "iter_msgpack",
]

T = TypeVar("T")
E = TypeVar("E")

JSON = str | int | float | bool | None | list["JSON"] | dict[str, "JSON"]

@dataclass(frozen=True, slots=True)
class Envelope:
    tag: str                     # e.g. "option", "result", "chunk"
    ver: int
    payload: dict[str, JSON]

class Encoder(Protocol[T]):
    def __call__(self, x: T) -> Envelope: ...

class Decoder(Protocol[T]):
    def __call__(self, env: Envelope) -> T: ...

# Default JSON value codecs (identity for JSON-serializable values)
def json_encoder(x: JSON) -> JSON:
    return x

def json_decoder(j: JSON) -> JSON:
    return j

# Option factories
def enc_option(enc_val: Callable[[T], JSON] | None = None) -> Encoder[Option[T]]:
    ev = enc_val or json_encoder  # type: ignore[arg-type]  # assumes T is JSON-compatible if no codec supplied
    def _enc(x: Option[T]) -> Envelope:
        match x:
            case Some(value=v):
                return Envelope(tag="option", ver=1, payload={"kind": "some", "value": ev(v)})
            case NoneVal():
                return Envelope(tag="option", ver=1, payload={"kind": "none"})
            case other:
                assert_never(other)
    return _enc

def dec_option(dec_val: Callable[[JSON], T] | None = None) -> Decoder[Option[T]]:
    dv = dec_val or json_decoder  # type: ignore[arg-type]
    def _dec(env: Envelope) -> Option[T]:
        if env.tag != "option":
            raise ValueError(f"expected tag 'option', got {env.tag}")
        if env.ver != 1:
            raise ValueError(f"unknown version {env.ver}")
        kind = env.payload.get("kind")
        if kind == "some":
            return Some(dv(env.payload["value"]))
        if kind == "none":
            return NoneVal()
        raise ValueError(f"invalid kind {kind!r}")
    return _dec

# Result factories (explicit ErrInfo codecs required – no default assumption)
def enc_result(enc_val: Callable[[T], JSON] | None = None,
               enc_err: Callable[[ErrInfo], JSON] | None = None) -> Encoder[Result[T, ErrInfo]]:
    ev = enc_val or json_encoder  # type: ignore[arg-type]
    ee = enc_err or (lambda e: {"code": e.code, "msg": e.msg})  # safe default for ErrInfo
    def _enc(x: Result[T, ErrInfo]) -> Envelope:
        match x:
            case Ok(value=v):
                return Envelope(tag="result", ver=1, payload={"kind": "ok", "value": ev(v)})
            case Err(error=e):
                return Envelope(tag="result", ver=1, payload={"kind": "err", "error": ee(e)})
            case other:
                assert_never(other)
    return _enc

def dec_result(dec_val: Callable[[JSON], T] | None = None,
               dec_err: Callable[[JSON], ErrInfo] | None = None) -> Decoder[Result[T, ErrInfo]]:
    dv = dec_val or json_decoder  # type: ignore[arg-type]
    de = dec_err or (lambda j: ErrInfo(code=j["code"], msg=j["msg"]))
    def _dec(env: Envelope) -> Result[T, ErrInfo]:
        if env.tag != "result":
            raise ValueError(f"expected tag 'result', got {env.tag}")
        if env.ver != 1:
            raise ValueError(f"unknown version {env.ver}")
        kind = env.payload.get("kind")
        if kind == "ok":
            return Ok(dv(env.payload["value"]))
        if kind == "err":
            return Err(de(env.payload["error"]))
        raise ValueError(f"invalid kind {kind!r}")
    return _dec

# Validation factories (similar pattern)
def enc_validation(enc_val: Callable[[T], JSON] | None = None,
                   enc_err: Callable[[ErrInfo], JSON] | None = None) -> Encoder[Validation[T, ErrInfo]]:
    ev = enc_val or json_encoder
    ee = enc_err or (lambda e: {"code": e.code, "msg": e.msg})
    def _enc(x: Validation[T, ErrInfo]) -> Envelope:
        match x:
            case VSuccess(value=v):
                return Envelope(tag="validation", ver=1, payload={"kind": "v_success", "value": ev(v)})
            case VFailure(errors=es):
                return Envelope(tag="validation", ver=1, payload={"kind": "v_failure", "errors": [ee(e) for e in es]})
            case other:
                assert_never(other)
    return _enc

def dec_validation(dec_val: Callable[[JSON], T] | None = None,
                   dec_err: Callable[[JSON], ErrInfo] | None = None) -> Decoder[Validation[T, ErrInfo]]:
    dv = dec_val or json_decoder
    de = dec_err or (lambda j: ErrInfo(code=j["code"], msg=j["msg"]))
    def _dec(env: Envelope) -> Validation[T, ErrInfo]:
        if env.tag != "validation":
            raise ValueError(f"expected tag 'validation', got {env.tag}")
        if env.ver != 1:
            raise ValueError(f"unknown version {env.ver}")
        kind = env.payload.get("kind")
        if kind == "v_success":
            return VSuccess(dv(env.payload["value"]))
        if kind == "v_failure":
            errs = [de(e) for e in env.payload["errors"]]
            if not errs:
                raise ValueError("VFailure requires non-empty errors")
            return VFailure(errors=tuple(errs))
        raise ValueError(f"invalid kind {kind!r}")
    return _dec

# JSON / MessagePack
_JSON_KW = dict(ensure_ascii=False, allow_nan=False, separators=(",", ":"))
_MP_PACK = dict(use_bin_type=True)
_MP_UNPACK = dict(raw=False)

def _check_env(obj: Mapping[str, Any]) -> None:
    if not isinstance(obj, dict):
        raise ValueError("invalid envelope: not a dict")
    required = {"tag", "ver", "payload"}
    missing = required - set(obj)
    if missing:
        raise ValueError(f"invalid envelope: missing {missing}")
    if not isinstance(obj["tag"], str):
        raise ValueError("tag must be str")
    if not isinstance(obj["ver"], int):
        raise ValueError("ver must be int")
    if not isinstance(obj["payload"], dict):
        raise ValueError("payload must be dict")

def to_json(x: T, enc: Encoder[T]) -> str:
    env = enc(x)
    return json.dumps({"tag": env.tag, "ver": env.ver, "payload": env.payload}, **_JSON_KW)

def from_json(s: str, dec: Decoder[T]) -> T:
    obj = json.loads(s)
    _check_env(obj)
    env = Envelope(tag=obj["tag"], ver=obj["ver"], payload=obj["payload"])
    return dec(migrate(env))

def to_msgpack(x: T, enc: Encoder[T]) -> bytes:
    env = enc(x)
    return msgpack.packb({"tag": env.tag, "ver": env.ver, "payload": env.payload}, **_MP_PACK)

def from_msgpack(b: bytes, dec: Decoder[T]) -> T:
    obj = msgpack.unpackb(b, **_MP_UNPACK)
    _check_env(obj)
    env = Envelope(tag=obj["tag"], ver=obj["ver"], payload=obj["payload"])
    return dec(migrate(env))

# Safe decode + versioning
@dataclass(frozen=True, slots=True)
class DecodeErr:
    path: tuple[str, ...] = ()
    msg: str = ""

def from_json_safe(s: str, dec: Decoder[T]) -> Validation[T, DecodeErr]:
    try:
        return VSuccess(from_json(s, dec))
    except Exception as exc:
        return VFailure((DecodeErr(msg=str(exc)),))

MIGRATORS: dict[tuple[str, int], Callable[[Envelope], Envelope]] = {}

MAX_MIGRATION_STEPS = 32

def migrate(env: Envelope) -> Envelope:
    key = (env.tag, env.ver)
    steps = 0
    seen: set[tuple[str, int]] = set()
    while key in MIGRATORS:
        if key in seen:
            raise RuntimeError(f"migration cycle detected at {key}")
        seen.add(key)
        steps += 1
        if steps > MAX_MIGRATION_STEPS:
            raise RuntimeError("migration step limit exceeded")
        env = MIGRATORS[key](env)
        key = (env.tag, env.ver)
    return env

# Streaming
def iter_ndjson(fp: TextIO, dec: Decoder[T]) -> Iterator[T]:
    for line in fp:
        line = line.strip()
        if line:
            yield from_json(line, dec)

def iter_msgpack(fp: BinaryIO, dec: Decoder[T]) -> Iterator[T]:
    unpacker = msgpack.Unpacker(fp, **_MP_UNPACK)
    for obj in unpacker:
        _check_env(obj)
        yield dec(migrate(Envelope(obj["tag"], obj["ver"], obj["payload"])))
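
A quick usage sketch of the safe-decode path (return shapes follow the Validation/DecodeErr types above; the error message comes from _check_env):

ok = from_json_safe('{"tag":"option","ver":1,"payload":{"kind":"none"}}', dec_option())
# VSuccess(value=NoneVal())

bad = from_json_safe('{"tag":"option","ver":1}', dec_option())
# VFailure(errors=(DecodeErr(path=(), msg="invalid envelope: missing {'payload'}"),))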

4. Reference Implementations (continued)

4.1 Before vs After – Chunk Persistence

# BEFORE – unstable dict → JSON
serialized = json.dumps(asdict(core_chunk))      # field order changes, no version
core_chunk = Chunk(**json.loads(serialized))     # missing fields → default, crash later

# AFTER – stable envelope
enc = enc_chunk()                               # factory
serialized = to_json(core_chunk, enc)           # {"tag":"chunk","ver":1,"payload":{...}}
core_chunk = from_json(serialized, dec_chunk()) # migrates if needed, raises clear error if invalid

4.2 RAG Integration – Persistent Chunk Store

# Write stream
with open("chunks.ndjson", "w") as f:
    enc = enc_chunk()
    for core_chunk in pipeline_chunks:
        f.write(to_json(core_chunk, enc) + "\n")

# Read stream with migration (yield needs an enclosing generator function)
def load_chunks(path: str = "chunks.ndjson"):   # illustrative wrapper name
    with open(path) as f:
        dec = dec_chunk()
        for core_chunk in iter_ndjson(f, dec):
            yield core_chunk
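
The same flow over MessagePack is a one-line swap at each end (a sketch reusing pipeline_chunks, enc_chunk, and dec_chunk from above; files must be opened in binary mode):

# Write packed envelopes back-to-back; msgpack.Unpacker streams them on read
with open("chunks.msgpack", "wb") as f:
    enc = enc_chunk()
    for core_chunk in pipeline_chunks:
        f.write(to_msgpack(core_chunk, enc))

# Read back with migration
with open("chunks.msgpack", "rb") as f:
    for core_chunk in iter_msgpack(f, dec_chunk()):
        ...  # consume each migrated chunk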

4.3 Versioning & Migration Example (Chunk v1 → v2)

# Suppose v1 Chunk had no metadata field
def migrate_chunk_v1_to_v2(env: Envelope) -> Envelope:
    if env.ver != 1 or env.tag != "chunk":
        return env
    payload = env.payload.copy()
    payload.setdefault("metadata", {})
    return Envelope(tag="chunk", ver=2, payload=payload)

MIGRATORS[("chunk", 1)] = migrate_chunk_v1_to_v2
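
With that migrator registered, an old v1 record loads transparently through the current decoder (a sketch; the payload fields mirror the test in section 5, and dec_chunk is assumed to accept ver=2 envelopes):

old_record = '{"tag":"chunk","ver":1,"payload":{"text":"hello","embedding":[0.1,0.2]}}'
core_chunk = from_json(old_record, dec_chunk())   # migrate() bumps to ver=2, metadata defaults to {}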

5. Property-Based Proofs (tests/test_serde.py)

from hypothesis import given, strategies as st

msgpack_int = st.integers(min_value=-(2**63), max_value=2**64 - 1)

@given(opt=st.one_of(st.builds(Some, value=st.integers()), st.just(NoneVal())))
def test_option_json_roundtrip(opt):
    enc = enc_option()
    dec = dec_option()
    s = to_json(opt, enc)
    back = from_json(s, dec)
    assert back == opt

@given(res=st.one_of(st.builds(Ok, value=msgpack_int),
                    st.builds(Err, error=st.builds(ErrInfo, code=st.text(), msg=st.text()))))
def test_result_msgpack_roundtrip(res):
    enc = enc_result()
    dec = dec_result()
    b = to_msgpack(res, enc)
    back = from_msgpack(b, dec)
    assert back == res

# Migration example test (uses the ("chunk", 1) migrator registered in 4.3)
def test_chunk_v1_migration():
    v1_payload = {"text": "hello", "embedding": [0.1, 0.2]}
    env_v1 = Envelope(tag="chunk", ver=1, payload=v1_payload)
    env_v2 = migrate(env_v1)
    assert env_v2.ver == 2
    assert env_v2.payload["metadata"] == {}
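
A further property worth adding (a sketch: from_json_safe must never raise on malformed input, and any failure must carry at least one error; names imported as in the tests above):

@given(garbage=st.text())
def test_from_json_safe_never_raises(garbage):
    out = from_json_safe(garbage, dec_option())
    assert isinstance(out, (VSuccess, VFailure))
    if isinstance(out, VFailure):
        assert len(out.errors) >= 1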

6. Big-O & Allocation Guarantees

Operation | Time | Heap | Notes
encode/decode | O(#fields) | O(#fields) | Single allocation per envelope
JSON/MessagePack | O(N) | O(N) | Linear in payload size
Streaming iter | O(1) per item | O(1) per item | Zero-copy when possible

7. Anti-Patterns & Immediate Fixes

Anti-Pattern | Symptom | Fix
Raw asdict → json | Unstable order, no tags | Explicit Envelope + enc/dec
No version field | Schema drift breaks old data | ver + MIGRATORS
Ad-hoc JSON handling | Silent corruption | _check_env + safe decode
Pydantic for core persistence | Heavy deps, slow | Plain codecs for core
No migration path | Production data loss | migrate + versioned envelope

8. Pre-Core Quiz

  1. Envelope contains…? → tag, ver, payload
  2. Round-trip law → from(to(x)) == x
  3. For versioning? → MIGRATORS dict
  4. Streaming with…? → iter_ndjson / iter_msgpack
  5. Beyond Pydantic for…? → Core persistence

9. Post-Core Exercise

  1. Implement Encoder/Decoder for one core ADT → test round-trip.
  2. Add a v2 migrator for an existing type → test old payload loads.
  3. Replace one json.dumps(asdict(...)) with envelope serde.
  4. Add streaming persistence for a pipeline stage.

Next: M05C09 – Compositional Domain Models – Splitting ADTs Across Subsystems.

You now serialize every core ADT with stable, versioned, migratable contracts — schema evolution is explicit and safe forever. The rest of Module 5 adds compositional domain models and performance guidance for heavy ADTs.