Core 5: Data/ML Pipelines – Stateless Transforms, Config-Driven FuncPipe Steps¶
Module 09
Core question:
How do you construct data and ML pipelines in FuncPipe from stateless transforms driven by configuration data, yielding composable, reproducible, and testable chains that integrate pure functions with ML-specific steps such as embedding and feature extraction?
In this core, we construct data and ML pipelines in FP style within the FuncPipe RAG Builder (now at funcpipe-rag-09). We emphasize stateless transforms (pure functions with no internal state or side effects), configured via typed data structures (Pydantic models for steps, params, and ordering) and composed via functional pipe/composition with explicit map/flat_map for fan-out.

For ML, we integrate steps such as tokenization, embedding (via HuggingFace sentence-transformers), feature extraction, and model inference as stateless functions: fit/train at the boundary to produce frozen artifacts (e.g., fitted scalers or loaded models in eval mode), then use only pure transform/predict inside pipelines. Pipelines are defined declaratively in config (e.g., JSON loaded into Pydantic), validated for step compatibility and params at build time, and executed lazily where possible (e.g., generator-based for streaming data). We refactor RAG processing (doc ingestion through embeddings) into config-driven pipelines, verifying equivalence and laws such as idempotence for preprocessing.

Dependencies: pip install pydantic sentence-transformers. Tradeoffs: custom chains for flexibility vs adapters for third-party code (e.g., sklearn FunctionTransformer for purity). Sharp edges: statelessness requires explicit artifact injection (e.g., models via boundary factories); config validation prevents runtime errors; ML effects (loading/fitting) belong at boundaries (use cached deps/lifespan); determinism is limited by ML libs (pin versions, set seeds, use eval mode).
Motivation Bug: Hard-coded, stateful ML code leads to brittle, non-reproducible pipelines; config-driven stateless transforms enable versioning, testing, and reuse without hidden dependencies.
Delta from Core 4 (Module 09): Web services expose pipelines; this focuses on building internal data/ML flows with config-driven composability.
Data/ML Protocol (Contract, Entry/Exit Criteria):

- Statelessness: transforms take inputs/artifacts and return outputs; no mutable internal state (fitted models injected as frozen capabilities in eval mode).
- Config-driven: pipelines defined as typed data (Pydantic-validated per step); built to a Callable via a factory with step-compatibility checks.
- Composability: chains via pipe/compose; explicit kind (MAP/FLAT_MAP) for fan-out steps.
- Laziness: prefer generator transforms for streaming; eager for ML batches (e.g., embed in batches).
- Semantics: laws like idempotence (preprocess(preprocess(x)) == preprocess(x)); equivalence (pipelined == sequential up to batching/ordering); config invariance (same config + artifacts yield same behavior, up to ML non-determinism); step compatibility (declared out_type matches the next in_type); verified via properties with preconditions (e.g., seed pinning for determinism).
- Integration: wrap RAG stages (clean, chunk, embed) in a config-driven pipeline; convert to/from iterators/ADTs; handle ML effects at boundaries (fit/load outside the core); effectful steps last (lift to an FPResult boundary).
- Mypy config: --strict; pydantic typing.
Audience: ML engineers seeking reproducible, FP-style pipelines for data processing and model workflows.
Outcome:

1. Build stateless, config-driven data/ML pipelines with compatibility/param validation.
2. Integrate into RAG with dynamic building and artifact injection.
3. Prove equivalence/laws with Hypothesis.
1. Laws & Invariants¶
| Law | Description | Enforcement |
|---|---|---|
| Idempotence Law | For preprocessing: f(f(x)) == f(x). | Property tests |
| Equivalence Law | Pipelined execution == sequential funcs (up to batching/ordering). | Hypothesis equivalence |
| Config Invariance | Same config + artifacts yield same outputs for same inputs (up to ML non-determinism; precondition: seed/eval pinning). | Property tests with preconditions |
| Step Compatibility Inv | Pipeline build fails if step out_type doesn't match next in_type. | Build-time validation |
| Stateless Inv | Repeat calls with same inputs/artifacts yield same outputs; no hidden state. | Code reviews/tests |
These laws ensure pipelines are reproducible and testable.
2. Decision Table¶
| Scenario | Data Volume | ML Heavy | Config Complexity | Recommended |
|---|---|---|---|---|
| Simple preprocessing | Small | No | Low | Custom pipe chain |
| Model inference | Medium | Yes | Medium | Custom with artifacts |
| Streaming data | Large | No | Low | Generator funcs |
| Dynamic configs | Any | Yes | High | Config factory + validation |
Choose based on needs; inject artifacts for ML reproducibility.
3. Public API (Config Factory & Builders)¶
Repo implementation lives in src/funcpipe_rag/pipelines/configured.py (stdlib-only, iterator-first, Result at the boundary).
Config as typed Pydantic; build to pipeline Callable with compatibility checks. Guard ML imports.
Step Kinds (for composition):

| Kind | Description |
|---|---|
| MAP | 1:1 transform; compose with `map` |
| FLAT_MAP | 1:many; compose with `chain.from_iterable(map(f, it))` |
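To make the two kinds concrete, here is a minimal, standalone illustration on plain iterables (not part of the FuncPipe API):

from itertools import chain

# MAP: 1:1 — each input yields exactly one output
doubled = map(lambda x: x * 2, [1, 2, 3])                        # -> 2, 4, 6
# FLAT_MAP: 1:many — each input yields an iterable, flattened into one stream
signed = chain.from_iterable(map(lambda x: (x, -x), [1, 2, 3]))  # -> 1, -1, 2, -2, 3, -3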
from typing import Callable, TypeVar, Iterator, Dict, Any
from functools import partial
from itertools import chain
from pydantic import BaseModel, model_validator, ConfigDict, Field
from funcpipe_rag import FPResult, Ok, Err, ErrInfo, RawDoc, CleanDoc, ChunkWithoutEmbedding, Chunk
import json # Stdlib for config load
T = TypeVar('T')
U = TypeVar('U')
# Per-step param models (forbid extras)
class CleanParams(BaseModel):
model_config = ConfigDict(extra='forbid')
strip: bool = True
lower: bool = True
class ChunkParams(BaseModel):
model_config = ConfigDict(extra='forbid')
size: int = 512
overlap: int = 0
@model_validator(mode='after')
def check_overlap(self) -> 'ChunkParams':
if self.size <= 0:
raise ValueError("size must be > 0")
if not (0 <= self.overlap < self.size):
raise ValueError("overlap must satisfy 0 <= overlap < size")
return self
class EmbedParams(BaseModel):
model_config = ConfigDict(extra='forbid')
# No batch_size; use batch combinator if needed
# Step config: name selects the transform; params are validated against the per-step model
class StepConfig(BaseModel):
name: str
params: Dict[str, Any] = Field(default_factory=dict) # Normalized below
@model_validator(mode='after')
def validate_params(self) -> 'StepConfig':
param_models = {
'clean': CleanParams,
'chunk': ChunkParams,
'embed': EmbedParams,
}
model = param_models.get(self.name)
if model:
parsed = model(**self.params) # Validates; rejects unknown/extras
self.params = parsed.model_dump() # Normalize/coerce
else:
raise ValueError(f"Unknown step: {self.name}")
return self
class PipelineConfig(BaseModel):
steps: list[StepConfig]
# Stateless transforms (define before registry)
def clean_doc(doc: RawDoc, strip: bool = True, lower: bool = True) -> CleanDoc:
text = doc.content.strip() if strip else doc.content
text = text.lower() if lower else text
return CleanDoc(doc_id=doc.metadata.get('id', ''), abstract=text)
def chunk_doc(cd: CleanDoc, size: int = 512, overlap: int = 0) -> Iterator[ChunkWithoutEmbedding]:
text = cd.abstract
step = size - overlap
for i in range(0, len(text), step):
chunk = text[i:i + size]
if chunk:
yield ChunkWithoutEmbedding(cd.doc_id, chunk, i, i + len(chunk))
def embed_chunk(cwe: ChunkWithoutEmbedding, embedder: Callable[[str], list[float]]) -> FPResult[Chunk, ErrInfo]:
try:
emb = embedder(cwe.text)
return Ok(Chunk(cwe.doc_id, cwe.text, emb, cwe.start, cwe.end))
except Exception as ex:
return Err(ErrInfo.from_exc(ex))
# Registry with metadata (callables, runtime types)
STEP_REGISTRY = {
'clean': {'kind': 'MAP', 'func': clean_doc, 'in_type': RawDoc, 'out_type': CleanDoc, 'param_model': CleanParams},
'chunk': {'kind': 'FLAT_MAP', 'func': chunk_doc, 'in_type': CleanDoc, 'out_type': ChunkWithoutEmbedding,
'param_model': ChunkParams},
'embed': {'kind': 'MAP', 'func': embed_chunk, 'in_type': ChunkWithoutEmbedding, 'out_type': FPResult,
'param_model': EmbedParams}, # Use class for runtime
}
SENTENCE_TRANSFORMERS_AVAILABLE = False
try:
import sentence_transformers
SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
pass
def load_config(path: str) -> PipelineConfig:
with open(path, 'r') as f:
data = json.load(f)
return PipelineConfig(**data) # Validates
def build_pipeline(config: PipelineConfig, artifacts: dict[str, Any]) -> Callable[
[Iterator[RawDoc]], Iterator[FPResult[Chunk, ErrInfo]]]:
    funcs: list[tuple[str, Callable[..., Any]]] = []
    prev_type: type = RawDoc  # Pipeline input type
    for step in config.steps:
        reg = STEP_REGISTRY[step.name]
        # Check declared type compatibility before accepting the step
        if not issubclass(prev_type, reg['in_type']):
            raise TypeError(f"Incompatible: {prev_type} not subclass of {reg['in_type']}")
        overlap = set(step.params) & set(artifacts.get(step.name, {}))
        if overlap:
            raise ValueError(f"Artifact overlaps config param: {overlap}")
        partial_func = partial(reg['func'], **step.params, **artifacts.get(step.name, {}))  # Inject params + artifacts
        funcs.append((reg['kind'], partial_func))
        prev_type = reg['out_type']
    # Enforce effectful step last: the pipeline must end by producing FPResult
    if prev_type != FPResult:
        raise ValueError("Effectful steps must be last")
def pipeline(data: Iterator[RawDoc]) -> Iterator[FPResult[Chunk, ErrInfo]]:
it: Iterator[Any] = data
for kind, f in funcs:
if kind == 'FLAT_MAP':
it = chain.from_iterable(map(f, it))
else:
it = map(f, it)
return it
return pipeline
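For illustration, a config matching the schema above might look like this ('pipeline.json' is a hypothetical path); load_config validates it into PipelineConfig on load:

# pipeline.json (hypothetical):
# {
#   "steps": [
#     {"name": "clean", "params": {"lower": true}},
#     {"name": "chunk", "params": {"size": 512, "overlap": 64}},
#     {"name": "embed"}
#   ]
# }
config = load_config('pipeline.json')  # Raises pydantic.ValidationError on unknown steps or bad params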
4. Reference Implementations¶
4.1 Stateless Transforms¶
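The transforms themselves (clean_doc, chunk_doc, embed_chunk) are defined in the Public API above. Statelessness means repeated calls with the same inputs are interchangeable, which can be sanity-checked directly (a minimal sketch, assuming the doc ADTs are dataclass-like with value equality):

doc = RawDoc(content='  Hello World  ', metadata={'id': 'd1'})
assert clean_doc(doc) == clean_doc(doc)  # Same inputs, same outputs; no hidden state
first = list(chunk_doc(clean_doc(doc), size=4))
second = list(chunk_doc(clean_doc(doc), size=4))
assert first == second  # Generators restart from scratch; nothing carried over between calls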
4.2 Config-Driven Building¶
# Artifact factories (effects at boundary)
def embed_artifact(model_name: str = 'all-MiniLM-L6-v2') -> Callable[[str], list[float]]:
    if not SENTENCE_TRANSFORMERS_AVAILABLE:
        raise ImportError("Install sentence-transformers")
    model = sentence_transformers.SentenceTransformer(model_name)
    model.eval()  # Freeze; pin seed/version/device for determinism
    return lambda text: model.encode(text).tolist()
# Artifacts from boundary (e.g., lifespan/DI)
artifacts = {'embed': {'embedder': embed_artifact()}} # Example; inject from outside in prod
4.3 ML-Specific Steps¶
# Adapted sklearn (pure transform only; fit at boundary)
from sklearn.preprocessing import StandardScaler
# Fit at boundary
def fit_scaler(data: list[list[float]], params: dict[str, Any] | None = None) -> StandardScaler:
    scaler = StandardScaler(**(params or {}))  # No mutable default argument
    scaler.fit(data)
    return scaler  # Treat as frozen from here on
# Stateless transform (inject fitted; batch for efficiency)
def scale_features(features: list[list[float]], scaler: StandardScaler) -> list[list[float]]:
return scaler.transform(features).tolist() # Vectorized batch
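Usage follows the fit-at-boundary pattern: fit once outside the pipeline, then inject the frozen scaler into the pure transform (values here are illustrative):

train = [[0.0, 10.0], [2.0, 20.0], [4.0, 30.0]]
scaler = fit_scaler(train)                      # Effect: fit once, at the boundary
scale = partial(scale_features, scaler=scaler)  # Pure transform with frozen artifact injected
assert scale(train) == scale(train)             # Stateless: repeat calls agree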
4.4 Integration in RAG¶
# RAG with config pipeline
def rag_ml_pipeline(config_path: str, docs: Iterator[RawDoc], artifacts: dict[str, Any]) -> Iterator[FPResult[Chunk, ErrInfo]]:
config = load_config(config_path)
pipeline = build_pipeline(config, artifacts)
return pipeline(docs)
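Because the pipeline yields FPResult values, the boundary decides how failures are handled. A minimal consumer might partition successes from errors; this sketch assumes Ok/Err are match-compatible dataclass variants — adapt to the actual FPResult API:

def partition_results(results: Iterator[FPResult[Chunk, ErrInfo]]) -> tuple[list[Chunk], list[ErrInfo]]:
    oks: list[Chunk] = []
    errs: list[ErrInfo] = []
    for r in results:
        match r:  # Assumes Ok/Err expose positional match args (dataclass-like)
            case Ok(chunk):
                oks.append(chunk)
            case Err(err):
                errs.append(err)
    return oks, errs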
4.5 Before/After Refactor¶
# Before: Hard-coded
cleaned = map(partial(clean_doc, lower=True), docs)
chunks = chain.from_iterable(map(partial(chunk_doc, size=512), cleaned))
embedded = map(partial(embed_chunk, embedder=embed_artifact()), chunks)
# After: Config-driven
config = PipelineConfig(steps=[StepConfig(name='clean', params={'lower': True}), StepConfig(name='chunk', params={'size': 512}), StepConfig(name='embed')])
pipeline = build_pipeline(config, {'embed': {'embedder': embed_artifact()}})
result = pipeline(docs)
5. Property-Based Proofs (repo tests)¶
Runnable tests live in tests/unit/pipelines/test_configured_pipeline.py.
from hypothesis import given, assume
import hypothesis.strategies as st
# Structured strategy
raw_doc_strat = st.builds(RawDoc, content=st.text(), metadata=st.dictionaries(st.text(), st.text()))
@given(st.lists(raw_doc_strat))
def test_idempotence_clean(docs):
    # clean_doc returns CleanDoc, so the idempotence law is stated on the underlying
    # text normalization: normalizing already-normalized text is a no-op
    def normalize_text(s: str, strip: bool = True, lower: bool = True) -> str:
s = s.strip() if strip else s
return s.lower() if lower else s
cleaned = [normalize_text(d.content) for d in docs]
twice = [normalize_text(s) for s in cleaned]
assert cleaned == twice
@given(st.text(min_size=1), st.integers(min_value=1, max_value=10), st.integers(min_value=0, max_value=9))
def test_chunk_reconstruction(text, size, overlap):
assume(overlap < size)
cd = CleanDoc('id', text)
chunks = list(chunk_doc(cd, size=size, overlap=overlap))
recon = chunks[0].text
for c in chunks[1:]:
recon += c.text[overlap:] # Property: precise reconstruction by dropping overlap
assert recon == text
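The build-time invariants can be exercised the same way. A sketch (assuming pytest and the section-3 factory are importable in the test module) showing that a misordered config fails at build time, not at run time:

import pytest

def test_incompatible_step_order_rejected():
    # 'chunk' declares in_type=CleanDoc, but the pipeline input is RawDoc: build must fail fast
    config = PipelineConfig(steps=[StepConfig(name='chunk'), StepConfig(name='embed')])
    with pytest.raises(TypeError):
        build_pipeline(config, {'embed': {'embedder': lambda text: [0.0]}})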
6. Runtime Preservation Guarantee¶
Statelessness permits safe parallelism (no shared mutable state); config plus pinned artifacts enables reproducible A/B testing; determinism still requires eval mode and seed pinning at the boundary.
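Where determinism matters, pin what the stack allows at the boundary before loading artifacts (a minimal sketch; numpy and torch are assumed present as transitive dependencies of sentence-transformers):

import random
import numpy as np
import torch

def pin_determinism(seed: int = 0) -> None:
    # Best-effort: pins seeds across the Python/NumPy/PyTorch stack; full
    # reproducibility still depends on library versions, hardware, and kernels
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)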
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Stateful steps | Non-reproducible | Fit at boundary; inject frozen |
| Hard-coded params | Brittle changes | Use config data |
| No validation | Runtime fails | Validate config + compatibility |
| Eager full load | Memory issues | Use generators for streaming (sketch below) |
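For the eager-load row, the fix is mechanical; a sketch contrasting the two styles (read_docs_* are hypothetical loaders):

from pathlib import Path

def read_docs_eager(paths: list[str]) -> list[RawDoc]:
    # Anti-pattern: materializes the whole corpus in memory at once
    return [RawDoc(content=Path(p).read_text(), metadata={'id': p}) for p in paths]

def read_docs_lazy(paths: list[str]) -> Iterator[RawDoc]:
    # Fix: yield one doc at a time; downstream map/chain stays streaming
    for p in paths:
        yield RawDoc(content=Path(p).read_text(), metadata={'id': p})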
8. Pre-Core Quiz¶
- Stateless for…? → Reproducibility
- Config-driven…? → Versioning/reuse
- Chains for…? → Composability
- ML integration? → Boundary fit + inject
- Benefit? → Testable pipelines
9. Post-Core Exercise¶
- Build config-driven RAG pipeline.
- Prove idempotence with Hypothesis.
Pipeline Usage (Idiomatic)¶
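A compact end-to-end sketch reusing the pieces above ('pipeline.json' is the hypothetical config from section 3):

artifacts = {'embed': {'embedder': embed_artifact()}}  # Effects (model load) at the boundary
docs = iter([RawDoc(content='Hello world', metadata={'id': 'd1'})])
for result in rag_ml_pipeline('pipeline.json', docs, artifacts):
    print(result)  # Ok(Chunk(...)) on success, Err(ErrInfo(...)) on embedding failure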
Next Core: 6. Command-Line and Config-Driven FuncPipe – Integrating with Click/Typer