
Core 9: Cross-Process Composition – Serializing Configs/Specifications, Not Live Objects

Module 09

Core question: How do you compose functional pipelines across processes by serializing configurations and specifications (not live objects or code), ensuring reproducible, decoupled execution in distributed or multi-stage workflows while maintaining purity, security, and version control in FuncPipe?

In this core, we extend FuncPipe to cross-process composition in the RAG Builder (now at funcpipe-rag-09). Live objects (e.g., closures, partials, or instances) cannot be serialized safely across processes; doing so leads to brittleness, security risks (e.g., pickle attacks), and version mismatches. Instead, we serialize pure data specifications (YAML/JSON configs defining pipeline graphs, operator names, params, monoids, and error policies) and reconstruct pipelines deterministically at runtime using registries and factories. This enables workflows such as serializing a RAG config on one machine and shipping it to a cluster for execution, or chaining multi-stage jobs (ingest → process → serve) without code coupling. We refactor RAG to serialize specs, verifying laws like reproducibility (same spec + registry yields the same pipeline) and security (no code execution during deserialization). Specs are linear sequences of ops for simplicity; DAGs are out of scope but extendable via edges.

Dependencies: pip install pyyaml. Tradeoffs: specs add reconstruction overhead but enable decoupling and versioning. Sharp edges: use schemas (Pydantic) for validation; avoid over-specifying (keep specs data-only).
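
As a feel for the shape of such a spec, here is a minimal YAML sketch. Field names follow the spec models defined in Section 3; the op name, ids, hash strings, and error_policy value are illustrative, not repo identifiers.

backend: dask
ops:
  - type: Map
    name: clean_text
    error_policy: skip        # illustrative policy value
    in_type: Document
    out_type: Document
    func_id: clean_text_v1    # resolved against a process-local registry
    func_version: "1.0.0"
    version: "3f9a…"          # canonical sha256 of this op spec (truncated)
monoids: {}
resources: {}
materialization_points: []
version: "c41d…"              # canonical sha256 of the whole spec (truncated)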

Motivation Bug: Serializing live objects (e.g., via pickle) exposes code-execution risks, ties pipelines to specific runtime environments, and breaks versioning; pure specs decouple definition from execution for safe, reproducible composition.

Delta from Core 8 (Module 09): Core 8's facades normalize third-party libraries into pure descriptions; this core enables cross-process composition by serializing specs (data that defines pipelines) rather than objects, allowing distributed reconstruction.

Spec Protocol (Contract, Entry/Exit Criteria):
- Purity: Specs are pure data (YAML/JSON); no code or objects.
- Reconstruction: Factories/registries build pipelines from specs deterministically.
- Validation: Pydantic schemas enforce structure and security (extra="forbid", no arbitrary fields).
- Semantics: Laws like reproducibility (same spec + registry yields the same pipeline), equivalence (reconstructed pipeline == original up to env), and versioning (specs include content-addressed hashes for ops/monoids, computed canonically from sorted JSON); verified via round-trip tests.
- Integration: Extend Module 6 configs; serialize graphs from Core 7. Op params are resolved via the registry (e.g., partials/Reader) rather than stored in the spec, keeping specs data-only; see the sketch after this list.
- Mypy Config: --strict; Pydantic for schemas.
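
Because params stay out of the spec, any configuration an op needs is baked into its registry entry on each process. A minimal sketch of this; the op name chunk_text and id chunk_text_512 are hypothetical, not repo identifiers:

from functools import partial

def chunk_text(doc: str, *, max_tokens: int) -> list[str]:
    # Pure op: split a document into token-bounded chunks (simplified).
    words = doc.split()
    return [" ".join(words[i:i + max_tokens]) for i in range(0, len(words), max_tokens)]

# The spec only carries the id "chunk_text_512"; the parameter lives in the
# registry entry, so the spec itself stays pure data.
registry = {"chunk_text_512": partial(chunk_text, max_tokens=512)}
registry_meta = {"chunk_text_512": {"version": "1.0.0"}}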

Audience: Engineers building distributed/multi-stage FP systems needing safe, versioned composition.

Outcome:
1. Design serializable specs for pipelines.
2. Serialize/reconstruct RAG workflows cross-process.
3. Prove reproducibility with round-trip tests.


## 1. Laws & Invariants

| Law | Description | Enforcement |
| --- | --- | --- |
| Reproducibility Law | Same spec + registry yields an equivalent pipeline. | Round-trip Hypothesis tests |
| Equivalence Law | Reconstructed pipeline behaves as the original (up to env). | Property tests on outputs |
| Security Inv | Deserialization executes no code; data only. | Schema validation / no eval |
| Version Inv | Specs include canonical hashes (sha256 of sorted JSON) for ops/monoids. | Pydantic + tests |
These laws ensure specs enable safe composition.
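
The Reproducibility and Version invariants both rest on canonical serialization: hashing the sorted-keys, compact JSON form makes the hash a pure function of the spec's content, independent of dict insertion order. A standalone sketch of the idea (it mirrors canonical_hash from Section 3, but over plain dicts):

import hashlib
import json

def content_hash(payload: dict) -> str:
    # Canonical form: sorted keys, no whitespace, so equal content => equal bytes => equal hash.
    data = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(data.encode()).hexdigest()

a = {"name": "sum", "type": "built_in", "params": None}
b = {"params": None, "type": "built_in", "name": "sum"}  # same content, different insertion order
assert content_hash(a) == content_hash(b)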
---
## 2. Decision Table
| Scenario | Decoupling Needed | Security Risk |
| --- | --- | --- |
| Multi-stage jobs | High | Medium |
| Cluster shipping | High | High |
| Versioned workflows | High | Low |
Specs for decoupling; schemas for security.
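
The security side is enforced structurally rather than procedurally: with extra="forbid", any field not declared on a spec model is rejected at validation time, and no code path evaluates spec contents. A minimal sketch, assuming the MonoidSpec model defined in Section 3 below:

from pydantic import ValidationError

malicious = '{"type": "custom", "name": "sum", "__reduce__": "os.system"}'
try:
    MonoidSpec.model_validate_json(malicious)  # MonoidSpec defined in Section 3
    assert False, "should have been rejected"
except ValidationError:
    pass  # extra="forbid" rejects the undeclared "__reduce__" field; nothing is executed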
---
## 3. Public API (src/funcpipe_rag/pipelines/specs.py – Serializable Specs)
Repo alignment note (end-of-Module-09):
- This repo uses frozen dataclasses (stdlib) for specs to keep dependencies light.
- Canonical hashing + allow-listed reconstruction are implemented in src/funcpipe_rag/pipelines/specs.py.

from typing import List, Dict, Literal, Optional, Any, Callable
from pydantic import BaseModel, Field, ConfigDict
from funcpipe_rag import ErrorPolicy, ErrInfo, Result, Ok, Err
from funcpipe_rag import Pipeline, build_pipeline  # Assume from prior
from funcpipe_rag import Monoid  # Canonical
import hashlib
import json


class MonoidSpec(BaseModel):
    model_config = ConfigDict(extra="forbid")
    type: Literal["built_in", "custom"]
    name: str  # Built-in ("sum", "max") or custom id
    params: Optional[Dict[str, Any]] = None  # e.g., {"identity": 0}
    version: str = ""  # sha256 of canonical JSON (excluding version)


class SinkContractSpec(BaseModel):
    model_config = ConfigDict(extra="forbid")
    idempotent: bool
    # Add more as needed


class OpSpec(BaseModel):
    model_config = ConfigDict(extra="forbid")
    type: Literal["Map", "FlatMap", "BatchMap", "Combine", "Sink"]
    name: str
    error_policy: ErrorPolicy
    in_type: str
    out_type: str
    version: str = ""  # sha256 of canonical JSON (excluding version)
    func_id: Optional[str] = None  # For Map/FlatMap/BatchMap
    func_version: Optional[str] = None  # Registry version for func
    max_batch_size: Optional[int] = None  # For BatchMap
    lift_id: Optional[str] = None  # For Combine
    lift_version: Optional[str] = None  # Registry version for lift
    monoid_id: Optional[str] = None  # For Combine
    sink_id: Optional[str] = None  # For Sink
    sink_version: Optional[str] = None  # Registry version for sink
    sink_contract: Optional[SinkContractSpec] = None  # For Sink


class PipelineSpec(BaseModel):
    model_config = ConfigDict(extra="forbid")
    ops: List[OpSpec]
    monoids: Dict[str, MonoidSpec]  # Serialized monoid data
    backend: Literal["dask", "beam"]
    resources: Dict[str, Any] = Field(default_factory=dict)
    materialization_points: List[str] = Field(default_factory=list)
    version: str = ""  # Pipeline hash (excluding version)


def canonical_hash(model: BaseModel) -> str:
    payload = model.model_dump(exclude={"version"}, mode="json")
    data = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(data.encode()).hexdigest()


def serialize_pipeline(pipeline: Pipeline, registry_meta: Dict[str, Dict[str, str]]) -> str:
    # Project IR to spec parts, compute hashes after building
    monoid_specs = {}
    for mid, m in pipeline.monoids.items():
        mspec = MonoidSpec(
            type="built_in" if m.is_builtin else "custom",
            name=m.name,
            params=m.params
        )
        mspec.version = canonical_hash(mspec)
        monoid_specs[mid] = mspec

    op_specs = []
    for op in pipeline.ops:  # Assume pipeline.ops is IR list
        ospec = OpSpec(
            type=op.type,
            name=op.name,
            error_policy=op.error_policy,
            in_type=op.in_type,
            out_type=op.out_type,
            func_id=op.func_id if op.type in ["Map", "FlatMap", "BatchMap"] else None,
            func_version=registry_meta.get(op.func_id, {}).get('version') if op.func_id else None,
            max_batch_size=op.max_batch_size if op.type == "BatchMap" else None,
            lift_id=op.lift_id if op.type == "Combine" else None,
            lift_version=registry_meta.get(op.lift_id, {}).get('version') if op.lift_id else None,
            monoid_id=op.monoid_id if op.type == "Combine" else None,
            sink_id=op.sink_id if op.type == "Sink" else None,
            sink_version=registry_meta.get(op.sink_id, {}).get('version') if op.sink_id else None,
            sink_contract=SinkContractSpec(idempotent=op.contract.idempotent) if op.type == "Sink" else None
        )
        ospec.version = canonical_hash(ospec)
        op_specs.append(ospec)

    spec = PipelineSpec(
        ops=op_specs,
        monoids=monoid_specs,
        backend=pipeline.backend,
        resources=pipeline.resources,
        materialization_points=pipeline.materialization_points,
    )
    spec.version = canonical_hash(spec)  # Compute after build, excluding version
    return spec.model_dump_json()


def reconstruct_pipeline(spec_json: str, registry: Dict[str, Callable], registry_meta: Dict[str, Dict[str, str]],
                         monoid_factory: Dict[str, Callable[[MonoidSpec], Monoid]],
                         sink_registry: Dict[str, Callable], sink_meta: Dict[str, Dict[str, str]],
                         allow_list: set[str]) -> Result[Pipeline, ErrInfo]:
    try:
        spec = PipelineSpec.model_validate_json(spec_json)
        # Verify pipeline version against recomputed hash
        recomputed = canonical_hash(spec)
        if recomputed != spec.version:
            return Err(ErrInfo(code="HASH_MISMATCH", msg="Pipeline hash mismatch"))
        # Verify op/monoid hashes
        for op in spec.ops:
            if canonical_hash(op) != op.version:
                return Err(ErrInfo(code="HASH_MISMATCH", msg=f"Op {op.name} hash mismatch"))
            if op.func_id and op.func_id not in allow_list:
                return Err(ErrInfo(code="UNALLOWED_OP", msg=f"Op {op.func_id} not allowed"))
            if op.lift_id and op.lift_id not in allow_list:
                return Err(ErrInfo(code="UNALLOWED_OP", msg=f"Lift {op.lift_id} not allowed"))
            if op.sink_id and op.sink_id not in allow_list:
                return Err(ErrInfo(code="UNALLOWED_SINK", msg=f"Sink {op.sink_id} not allowed"))
            # Check component versions
            if op.func_id and registry_meta.get(op.func_id, {}).get('version') != op.func_version:
                return Err(ErrInfo(code="VERSION_MISMATCH", msg=f"Func {op.func_id} version changed"))
            if op.lift_id and registry_meta.get(op.lift_id, {}).get('version') != op.lift_version:
                return Err(ErrInfo(code="VERSION_MISMATCH", msg=f"Lift {op.lift_id} version changed"))
            if op.sink_id and sink_meta.get(op.sink_id, {}).get('version') != op.sink_version:
                return Err(ErrInfo(code="VERSION_MISMATCH", msg=f"Sink {op.sink_id} version changed"))
        for mid, mspec in spec.monoids.items():
            if canonical_hash(mspec) != mspec.version:
                return Err(ErrInfo(code="HASH_MISMATCH", msg=f"Monoid {mid} hash mismatch"))
        # Rebuild monoids (derive commutativity at runtime, ignore spec if present)
        monoids = {}
        for mid, mspec in spec.monoids.items():
            factory = monoid_factory.get(mspec.name)
            if not factory:
                return Err(ErrInfo(code="UNKNOWN_MONOID", msg=f"Monoid {mspec.name} not found"))
            monoid = factory(mspec)
            # Derive/verify properties (e.g., test commutativity if needed)
            monoids[mid] = monoid
        # Rebuild IR from spec, then pipeline
        ir = []
        for o in spec.ops:
            if o.type in ["Map", "FlatMap", "BatchMap"]:
                if not o.func_id:
                    return Err(ErrInfo(code="MISSING_FIELD", msg=f"{o.type} missing func_id"))
                func = registry.get(o.func_id)
                if not func:
                    return Err(ErrInfo(code="UNKNOWN_OP", msg=f"Op {o.func_id} not found"))
                ir.append(rebuild_op(o.type, func, o))  # Assume rebuild_op helper
            elif o.type == "Combine":
                if not o.lift_id or not o.monoid_id:
                    return Err(ErrInfo(code="MISSING_FIELD", msg="Combine missing lift_id or monoid_id"))
                lift = registry.get(o.lift_id)
                if not lift:
                    return Err(ErrInfo(code="UNKNOWN_OP", msg=f"Lift {o.lift_id} not found"))
                monoid = monoids.get(o.monoid_id)
                if not monoid:
                    return Err(ErrInfo(code="UNKNOWN_MONOID", msg=f"Monoid {o.monoid_id} not found"))
                ir.append(rebuild_combine_op(lift, monoid, o))
            elif o.type == "Sink":
                if not o.sink_id or not o.sink_contract:
                    return Err(ErrInfo(code="MISSING_FIELD", msg="Sink missing sink_id or contract"))
                sink = sink_registry.get(o.sink_id)
                if not sink:
                    return Err(ErrInfo(code="UNKNOWN_SINK", msg=f"Sink {o.sink_id} not found"))
                ir.append(rebuild_sink_op(sink, o.sink_contract, o))
        return Ok(build_pipeline(ir, registry, monoids, spec.backend, spec.resources, spec.materialization_points))
    except Exception as e:
        return Err(ErrInfo(code="RECONSTRUCT_FAIL", msg=str(e)))
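
reconstruct_pipeline assumes a handful of process-local tables that never travel with the spec: a registry of pure callables keyed by id, per-id version metadata, monoid factories, a sink registry with its metadata, and an allow-list. A minimal sketch of their shapes; the ids, versions, and the Monoid constructor signature are assumptions for illustration, not the repo's actual API:

# Process-local tables; only the spec JSON travels between processes.
registry: Dict[str, Callable] = {
    "clean_text_v1": lambda doc: doc.strip().lower(),  # pure Map func
    "score_lift": lambda doc: 1,                       # Combine lift
}
registry_meta: Dict[str, Dict[str, str]] = {
    "clean_text_v1": {"version": "1.0.0"},
    "score_lift": {"version": "1.0.0"},
}
monoid_factory: Dict[str, Callable[[MonoidSpec], Monoid]] = {
    # Monoid constructor signature assumed for illustration.
    "sum": lambda mspec: Monoid(identity=0, combine=lambda a, b: a + b),
}
sink_registry: Dict[str, Callable] = {"stdout_sink": print}
sink_meta: Dict[str, Dict[str, str]] = {"stdout_sink": {"version": "1.0.0"}}
allow_list: set[str] = set(registry) | set(sink_registry)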

## 4. Reference Implementations

### 4.1 Spec Serialization (YAML)

import yaml
def serialize_to_yaml(spec: PipelineSpec) -> str:
    return yaml.safe_dump(spec.model_dump())
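
The load side is symmetric: parse with yaml.safe_load (plain data only), then validate against the schema. A sketch; the repo may expose its own loader:

def load_from_yaml(text: str) -> PipelineSpec:
    # safe_load parses plain data only (no object construction);
    # model_validate then enforces the schema on the result.
    return PipelineSpec.model_validate(yaml.safe_load(text))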

### 4.2 Cross-Process RAG

# Process 1: Serialize
spec_json = serialize_pipeline(pipeline, registry_meta)
# Ship json
# Process 2: Reconstruct
result = reconstruct_pipeline(spec_json, registry, registry_meta, monoid_factory, sink_registry, sink_meta, allow_list=set(registry.keys()))
if isinstance(result, Err):
    raise ValueError(result.error.msg)
pipeline = result.value
result = pipeline.run(data)
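
"Ship json" above is just moving bytes: write the spec to a file, object store, or message queue in process 1 and read it back in process 2. A minimal file-based sketch (the path is illustrative):

from pathlib import Path

# Process 1: persist the spec (pure data; safe to commit, diff, or ship).
Path("rag_pipeline.spec.json").write_text(spec_json, encoding="utf-8")

# Process 2 (possibly another machine): read it back and reconstruct.
shipped = Path("rag_pipeline.spec.json").read_text(encoding="utf-8")
recon = reconstruct_pipeline(shipped, registry, registry_meta, monoid_factory,
                             sink_registry, sink_meta, allow_list=set(registry.keys()))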

### 4.3 Before/After Refactor

# Before: Coupled
send_live_pipeline(pipeline) # Pickle risks
# After: Spec
spec = serialize_pipeline(pipeline, registry_meta)
reconstructed = reconstruct_pipeline(spec, registry, registry_meta, monoid_factory, sink_registry, sink_meta, allow_list)

## 5. Property-Based Proofs (repo tests)

Runnable properties live in tests/unit/pipelines/test_specs_roundtrip.py.

from hypothesis import given  # pipeline_strat, registries, generate_data, and multiset_eq come from the repo's test fixtures

@given(pipeline_strat())
def test_round_trip_equiv(pipeline):
    spec = serialize_pipeline(pipeline, registry_meta)
    recon_res = reconstruct_pipeline(spec, registry, registry_meta, monoid_factory, sink_registry, sink_meta, allow_list=set(registry.keys()))
    assert isinstance(recon_res, Ok)
    recon = recon_res.value
    data = generate_data()
    # Use multiset + tolerance from Core 7 for equivalence
    assert multiset_eq(pipeline.run(data), recon.run(data), tol=1e-6)  # Handles order/float drift
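
A companion property covers the Security/Version invariants: tampering with a serialized spec must surface as an Err rather than a silently different pipeline. A sketch under the same imports and fixtures as the round-trip test (plus json):

@given(pipeline_strat())
def test_tampered_spec_rejected(pipeline):
    spec_json = serialize_pipeline(pipeline, registry_meta)
    doc = json.loads(spec_json)
    # Mutate a field without recomputing the canonical hash.
    doc["backend"] = "beam" if doc["backend"] == "dask" else "dask"
    res = reconstruct_pipeline(json.dumps(doc), registry, registry_meta, monoid_factory,
                               sink_registry, sink_meta, allow_list=set(registry.keys()))
    assert isinstance(res, Err) and res.error.code == "HASH_MISMATCH"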


## 6. Runtime Preservation Guarantee

Serialization adds only I/O and reconstruction overhead; once rebuilt, the reconstructed pipeline matches the original's runtime performance, since it is the same IR executed on the same backend.

## 7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
| --- | --- | --- |
| Pickle objects | Security risks | Serialize data specs |
| No versioning | Mismatch bugs | Include canonical hashes in specs |
| Loose schemas | Invalid specs | Pydantic with extra="forbid" |
---
## 8. Pre-Core Quiz
1. Serialize what…? → Configs/specs
2. Avoid serializing…? → Live objects
3. Benefit? → Decoupled execution
4. Law? → Reproducibility
5. Use for…? → Multi-process
## 9. Post-Core Exercise
1. Serialize a RAG pipeline spec.
2. Prove round-trip with Hypothesis.
Pipeline Usage (Idiomatic)

spec = serialize_pipeline(pipeline, registry_meta)
recon_res = reconstruct_pipeline(spec, registry, registry_meta, monoid_factory, sink_registry, sink_meta, allow_list)
if isinstance(recon_res, Err):
    logger.error(recon_res.error.msg)  # logger: any configured logging.Logger
else:
    result = recon_res.value.run(data)
Next Core: 10. Team Adoption – Coding Standards, Patterns, and Review Guidelines for FP-Style Python