Core 9: Cross-Process Composition – Serializing Configs/Specifications, Not Live Objects¶
Module 09
Core question: How do you compose functional pipelines across processes by serializing configurations and specifications (not live objects or code), so that execution stays reproducible, decoupled, pure, secure, and version-controlled in distributed or multi-stage FuncPipe workflows?
In this core, we extend FuncPipe to cross-process composition in the RAG Builder (now at funcpipe-rag-09). Live objects (closures, partials, instances) can't be serialized safely across processes: the result is brittleness, security risks (e.g., pickle attacks), and version mismatches. Instead, we serialize pure data specifications (YAML/JSON configs defining pipeline graphs, operator names, params, monoids, error policies) and reconstruct pipelines deterministically at runtime using registries and factories. This enables workflows such as serializing a RAG config on one machine and shipping it to a cluster for execution, or chaining multi-stage jobs (ingest → process → serve) without code coupling. We refactor RAG to serialize specs, verifying laws like reproducibility (same spec + registry yields same pipeline) and security (no code execution during deserialization). Specs are linear sequences of ops for simplicity; DAGs are out of scope but extendable via edges. Dependencies: pip install pyyaml. Tradeoffs: specs add reconstruction overhead but enable decoupling and versioning. Sharp edges: use schemas (Pydantic) for validation; avoid over-specifying (keep specs data-only).
Motivation Bug: Serializing live objects (e.g., via pickle) exposes code-execution risks, ties pipelines to specific runtime environments, and breaks versioning; pure specs decouple definition from execution for safe, reproducible composition.
Delta from Core 8 (Module 09): Facades normalize libs to pure descriptions; this core enables cross-process by serializing specs (data defining pipelines), not objects, allowing distributed reconstruction.
Spec Protocol (Contract, Entry/Exit Criteria):
- Purity: Specs are pure data (YAML/JSON); no code/objects.
- Reconstruction: Factories/registries build pipelines from specs deterministically.
- Validation: Pydantic schemas enforce structure/security (extra="forbid", no arbitrary fields).
- Semantics: Laws like reproducibility (same spec + registry yields same pipeline), equivalence (reconstructed pipeline == original up to env), and versioning (specs include content-addressed hashes for ops/monoids, computed canonically from sorted JSON); verified via round-trip tests.
- Integration: Extend Module 6 configs; serialize graphs from Core 7. Op params are resolved via registry (e.g., partials/Reader); keeping explicit params out of specs keeps them data-only.
- Mypy Config: --strict; Pydantic for schemas.
An illustrative YAML spec is sketched below.
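To make the contract concrete, here is an illustrative YAML spec of the shape this protocol implies. All ids, versions, and the error_policy value are placeholders for this sketch, not names from the repo:
# Illustrative only; ids/versions/error_policy values are placeholders.
backend: dask
version: "<sha256 of sorted JSON, excluding this field>"
monoids:
  score_sum:
    type: built_in
    name: sum
    params: {identity: 0}
    version: "<sha256>"
ops:
  - type: Map
    name: embed
    func_id: embed_chunk
    func_version: v1
    error_policy: skip
    in_type: Chunk
    out_type: Embedding
    version: "<sha256>"
resources: {}
materialization_points: []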
Audience: Engineers building distributed/multi-stage FP systems needing safe, versioned composition.
Outcome: 1. Design serializable specs for pipelines. 2. Serialize/reconstruct RAG workflows cross-process. 3. Prove reproducibility with round-trip tests.
1. Laws & Invariants¶
| Law | Description | Enforcement |
|---|---|---|
| Reproducibility Law | Same spec + registry yields equivalent pipeline. | Round-trip Hypothesis tests |
| Equivalence Law | Reconstructed pipeline behaves as original (up to env). | Property tests on outputs |
| Security Inv | Deserialization executes no code; data only. | Schema validation/no eval |
| Version Inv | Specs include canonical hashes (sha256 of sorted JSON) for ops/monoids. | Pydantic + tests |
These laws ensure specs enable safe composition.
2. Decision Table¶
| Scenario | Decoupling Needed | Security Risk |
|---|---|---|
| Multi-stage jobs | High | Medium |
| Cluster shipping | High | High |
| Versioned workflows | High | Low |
Specs for decoupling; schemas for security.
3. Public API (src/funcpipe_rag/pipelines/specs.py – Serializable Specs)¶
Repo alignment note (end-of-Module-09):
- This repo uses frozen dataclasses (stdlib) for specs to keep dependencies light.
- Canonical hashing + allow-listed reconstruction are implemented in src/funcpipe_rag/pipelines/specs.py.
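For contrast with the Pydantic models shown below, a minimal sketch of the frozen-dataclass shape might look like this (assumed for illustration; the authoritative definitions live in src/funcpipe_rag/pipelines/specs.py):
# Assumed sketch only, not the repo's actual definition.
from dataclasses import dataclass
from typing import Optional, Dict, Any

@dataclass(frozen=True)
class MonoidSpecDC:
    type: str                                  # "built_in" | "custom"
    name: str
    params: Optional[Dict[str, Any]] = None
    version: str = ""                          # sha256 of canonical JSON (excluding this field)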
from typing import List, Dict, Literal, Optional, Any, Callable
from pydantic import BaseModel, Field, ConfigDict
from funcpipe_rag import ErrorPolicy, ErrInfo, Result, Ok, Err
from funcpipe_rag import Pipeline, build_pipeline # Assume from prior
from funcpipe_rag import Monoid # Canonical
import hashlib
import json
class MonoidSpec(BaseModel):
model_config = ConfigDict(extra="forbid")
type: Literal["built_in", "custom"]
    name: str  # "sum"/"max" for built-ins, or a custom id
params: Optional[Dict[str, Any]] = None # e.g., {"identity": 0}
version: str = "" # sha256 of canonical JSON (excluding version)
class SinkContractSpec(BaseModel):
model_config = ConfigDict(extra="forbid")
idempotent: bool
# Add more as needed
class OpSpec(BaseModel):
model_config = ConfigDict(extra="forbid")
type: Literal["Map", "FlatMap", "BatchMap", "Combine", "Sink"]
name: str
error_policy: ErrorPolicy
in_type: str
out_type: str
version: str = "" # sha256 of canonical JSON (excluding version)
func_id: Optional[str] = None # For Map/FlatMap/BatchMap
func_version: Optional[str] = None # Registry version for func
max_batch_size: Optional[int] = None # For BatchMap
lift_id: Optional[str] = None # For Combine
lift_version: Optional[str] = None # Registry version for lift
monoid_id: Optional[str] = None # For Combine
sink_id: Optional[str] = None # For Sink
sink_version: Optional[str] = None # Registry version for sink
sink_contract: Optional[SinkContractSpec] = None # For Sink
class PipelineSpec(BaseModel):
model_config = ConfigDict(extra="forbid")
ops: List[OpSpec]
monoids: Dict[str, MonoidSpec] # Serialized monoid data
backend: Literal["dask", "beam"]
resources: Dict[str, Any] = Field(default_factory=dict)
materialization_points: List[str] = Field(default_factory=list)
version: str = "" # Pipeline hash (excluding version)
def canonical_hash(model: BaseModel) -> str:
payload = model.model_dump(exclude={"version"}, mode="json")
data = json.dumps(payload, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(data.encode()).hexdigest()
def serialize_pipeline(pipeline: Pipeline, registry_meta: Dict[str, Dict[str, str]]) -> str:
# Project IR to spec parts, compute hashes after building
monoid_specs = {}
    for mid, m in pipeline.monoids.items():  # Assume Monoid exposes is_builtin/name/params
mspec = MonoidSpec(
type="built_in" if m.is_builtin else "custom",
name=m.name,
params=m.params
)
mspec.version = canonical_hash(mspec)
monoid_specs[mid] = mspec
op_specs = []
for op in pipeline.ops: # Assume pipeline.ops is IR list
ospec = OpSpec(
type=op.type,
name=op.name,
error_policy=op.error_policy,
in_type=op.in_type,
out_type=op.out_type,
func_id=op.func_id if op.type in ["Map", "FlatMap", "BatchMap"] else None,
func_version=registry_meta.get(op.func_id, {}).get('version') if op.func_id else None,
max_batch_size=op.max_batch_size if op.type == "BatchMap" else None,
lift_id=op.lift_id if op.type == "Combine" else None,
lift_version=registry_meta.get(op.lift_id, {}).get('version') if op.lift_id else None,
monoid_id=op.monoid_id if op.type == "Combine" else None,
sink_id=op.sink_id if op.type == "Sink" else None,
sink_version=registry_meta.get(op.sink_id, {}).get('version') if op.sink_id else None,
sink_contract=SinkContractSpec(idempotent=op.contract.idempotent) if op.type == "Sink" else None
)
ospec.version = canonical_hash(ospec)
op_specs.append(ospec)
spec = PipelineSpec(
ops=op_specs,
monoids=monoid_specs,
backend=pipeline.backend,
resources=pipeline.resources,
materialization_points=pipeline.materialization_points,
)
spec.version = canonical_hash(spec) # Compute after build, excluding version
return spec.model_dump_json()
def reconstruct_pipeline(spec_json: str, registry: Dict[str, Callable], registry_meta: Dict[str, Dict[str, str]],
monoid_factory: Dict[str, Callable[[MonoidSpec], Monoid]],
sink_registry: Dict[str, Callable], sink_meta: Dict[str, Dict[str, str]],
allow_list: set[str]) -> Result[Pipeline, ErrInfo]:
try:
spec = PipelineSpec.model_validate_json(spec_json)
# Verify pipeline version against recomputed hash
recomputed = canonical_hash(spec)
if recomputed != spec.version:
return Err(ErrInfo(code="HASH_MISMATCH", msg="Pipeline hash mismatch"))
# Verify op/monoid hashes
for op in spec.ops:
if canonical_hash(op) != op.version:
return Err(ErrInfo(code="HASH_MISMATCH", msg=f"Op {op.name} hash mismatch"))
if op.func_id and op.func_id not in allow_list:
return Err(ErrInfo(code="UNALLOWED_OP", msg=f"Op {op.func_id} not allowed"))
if op.lift_id and op.lift_id not in allow_list:
return Err(ErrInfo(code="UNALLOWED_OP", msg=f"Lift {op.lift_id} not allowed"))
if op.sink_id and op.sink_id not in allow_list:
return Err(ErrInfo(code="UNALLOWED_SINK", msg=f"Sink {op.sink_id} not allowed"))
# Check component versions
if op.func_id and registry_meta.get(op.func_id, {}).get('version') != op.func_version:
return Err(ErrInfo(code="VERSION_MISMATCH", msg=f"Func {op.func_id} version changed"))
if op.lift_id and registry_meta.get(op.lift_id, {}).get('version') != op.lift_version:
return Err(ErrInfo(code="VERSION_MISMATCH", msg=f"Lift {op.lift_id} version changed"))
if op.sink_id and sink_meta.get(op.sink_id, {}).get('version') != op.sink_version:
return Err(ErrInfo(code="VERSION_MISMATCH", msg=f"Sink {op.sink_id} version changed"))
for mid, mspec in spec.monoids.items():
if canonical_hash(mspec) != mspec.version:
return Err(ErrInfo(code="HASH_MISMATCH", msg=f"Monoid {mid} hash mismatch"))
# Rebuild monoids (derive commutativity at runtime, ignore spec if present)
monoids = {}
for mid, mspec in spec.monoids.items():
factory = monoid_factory.get(mspec.name)
if not factory:
return Err(ErrInfo(code="UNKNOWN_MONOID", msg=f"Monoid {mspec.name} not found"))
monoid = factory(mspec)
# Derive/verify properties (e.g., test commutativity if needed)
monoids[mid] = monoid
# Rebuild IR from spec, then pipeline
ir = []
for o in spec.ops:
if o.type in ["Map", "FlatMap", "BatchMap"]:
if not o.func_id:
return Err(ErrInfo(code="MISSING_FIELD", msg=f"{o.type} missing func_id"))
func = registry.get(o.func_id)
if not func:
return Err(ErrInfo(code="UNKNOWN_OP", msg=f"Op {o.func_id} not found"))
ir.append(rebuild_op(o.type, func, o)) # Assume rebuild_op helper
elif o.type == "Combine":
if not o.lift_id or not o.monoid_id:
return Err(ErrInfo(code="MISSING_FIELD", msg="Combine missing lift_id or monoid_id"))
lift = registry.get(o.lift_id)
if not lift:
return Err(ErrInfo(code="UNKNOWN_OP", msg=f"Lift {o.lift_id} not found"))
monoid = monoids.get(o.monoid_id)
if not monoid:
return Err(ErrInfo(code="UNKNOWN_MONOID", msg=f"Monoid {o.monoid_id} not found"))
ir.append(rebuild_combine_op(lift, monoid, o))
elif o.type == "Sink":
if not o.sink_id or not o.sink_contract:
return Err(ErrInfo(code="MISSING_FIELD", msg="Sink missing sink_id or contract"))
sink = sink_registry.get(o.sink_id)
if not sink:
return Err(ErrInfo(code="UNKNOWN_SINK", msg=f"Sink {o.sink_id} not found"))
ir.append(rebuild_sink_op(sink, o.sink_contract, o))
return Ok(build_pipeline(ir, registry, monoids, spec.backend, spec.resources, spec.materialization_points))
except Exception as e:
return Err(ErrInfo(code="RECONSTRUCT_FAIL", msg=str(e)))
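A quick sanity check on the canonical-hash design: because canonical_hash serializes with sort_keys=True, dict key order cannot change a spec's content address. A minimal sketch, assuming the MonoidSpec and canonical_hash defined above (the monoid name here is a placeholder):
# Key order inside params must not affect the content-addressed version.
m1 = MonoidSpec(type="custom", name="capped_sum", params={"cap": 10, "identity": 0})
m2 = MonoidSpec(type="custom", name="capped_sum", params={"identity": 0, "cap": 10})
assert canonical_hash(m1) == canonical_hash(m2)  # sorted-JSON canonicalization
m1.version = canonical_hash(m1)                  # pin after construction, as serialize_pipeline does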
4. Reference Implementations¶
4.1 Spec Serialization (YAML)¶
import yaml
def serialize_to_yaml(spec: PipelineSpec) -> str:
return yaml.safe_dump(spec.model_dump())
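The inverse direction is symmetric: parse with yaml.safe_load (never yaml.load, which can construct arbitrary Python objects) and push the result through the schema. A sketch, with the helper name chosen here for illustration:
def deserialize_from_yaml(text: str) -> PipelineSpec:
    # safe_load yields plain data only; model_validate enforces extra="forbid"
    return PipelineSpec.model_validate(yaml.safe_load(text))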
4.2 Cross-Process RAG¶
# Process 1: serialize the pipeline to a spec
spec_json = serialize_pipeline(pipeline, registry_meta)
# Ship the JSON to the target process (file, queue, RPC, ...)
# Process 2: reconstruct and run
result = reconstruct_pipeline(spec_json, registry, registry_meta, monoid_factory, sink_registry, sink_meta,
                              allow_list=set(registry) | set(sink_registry))
if isinstance(result, Err):
    raise ValueError(result.error.msg)
pipeline = result.value
output = pipeline.run(data)
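reconstruct_pipeline presumes both processes agree on registries out of band; the spec never carries code. A hedged sketch of the wiring Process 2 might hold — embed_chunk, lift_score, write_to_index, and the version strings are placeholders:
# Hypothetical wiring; real registries come from the repo's modules.
registry = {"embed_chunk": embed_chunk, "lift_score": lift_score}
registry_meta = {fid: {"version": "v1"} for fid in registry}
sink_registry = {"write_index": write_to_index}
sink_meta = {sid: {"version": "v1"} for sid in sink_registry}
allow_list = set(registry) | set(sink_registry)  # sinks must be allow-listed too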
4.3 Before/After Refactor¶
# Before: Coupled
send_live_pipeline(pipeline) # Pickle risks
# After: Spec
spec = serialize_pipeline(pipeline, registry_meta)
reconstructed = reconstruct_pipeline(spec, registry, registry_meta, monoid_factory, sink_registry, sink_meta, allow_list)
5. Property-Based Proofs (repo tests)¶
Runnable properties live in tests/unit/pipelines/test_specs_roundtrip.py.
from hypothesis import given

@given(pipeline_strat())
def test_round_trip_equiv(pipeline):
    spec = serialize_pipeline(pipeline, registry_meta)
    recon_res = reconstruct_pipeline(spec, registry, registry_meta, monoid_factory, sink_registry, sink_meta,
                                     allow_list=set(registry) | set(sink_registry))
    assert isinstance(recon_res, Ok)
    recon = recon_res.value
    data = generate_data()
    # Use multiset + tolerance from Core 7 for equivalence
    assert multiset_eq(pipeline.run(data), recon.run(data), tol=1e-6)  # handles order/float drift
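The Security and Reproducibility invariants deserve properties of their own. Hedged sketches, assuming pytest and the imports above:
import pytest
from pydantic import ValidationError

def test_extra_fields_rejected():
    # Security Inv: extra="forbid" turns unexpected keys into hard validation errors
    bad = '{"ops": [], "monoids": {}, "backend": "dask", "eval": "os.system(...)"}'
    with pytest.raises(ValidationError):
        PipelineSpec.model_validate_json(bad)

@given(pipeline_strat())
def test_serialization_deterministic(pipeline):
    # Reproducibility Law: serializing the same pipeline twice is byte-identical
    assert serialize_pipeline(pipeline, registry_meta) == serialize_pipeline(pipeline, registry_meta)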
6. Runtime Preservation Guarantee¶
Serialization adds I/O and validation overhead at process boundaries; the reconstructed pipeline matches the original's runtime performance.
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Pickle objects | Security risks | Serialize data specs |
| No versioning | Mismatch bugs | Include canonical hashes in specs |
| Loose schemas | Invalid specs | Pydantic with extra="forbid" |
8. Pre-Core Quiz¶
1. Serialize what…? → Configs/specs
2. Avoid serializing…? → Live objects
3. Benefit? → Decoupled execution
4. Law? → Reproducibility
5. Use for…? → Multi-process
9. Post-Core Exercise¶
1. Serialize a RAG pipeline spec.
2. Prove round-trip with Hypothesis.
Pipeline Usage (Idiomatic)¶
Next Core: 10. Team Adoption – Coding Standards, Patterns, and Review Guidelines for FP-Style Python