
Core 5: Data/ML Pipelines – Stateless Transforms, Config-Driven FuncPipe Steps

Module 09

Core question:
How do you construct data and ML pipelines in FuncPipe from stateless transforms driven by configuration data, so that the resulting chains are composable, reproducible, and testable while integrating pure functions with ML-specific steps like embedding or feature extraction?

In this core, we construct data and ML pipelines in FP style within the FuncPipe RAG Builder (now at funcpipe-rag-09). We emphasize stateless transforms (pure functions without internal state or side effects), configured via typed data structures (Pydantic models for steps, params, and ordering) and composed via functional pipe/composition with explicit map/flat_map for fan-out. For ML, we integrate steps like tokenization, embedding (using HuggingFace sentence-transformers), feature extraction, or model inference as stateless functions: fit/train at boundaries to produce frozen artifacts (e.g., fitted scalers or loaded models in eval mode), then use only pure transform/predict inside pipelines. Pipelines are defined declaratively in config (e.g., JSON loaded into Pydantic), validated for step compatibility and params at build time, and executed lazily where possible (e.g., generator-based for streaming data). We refactor RAG processing (e.g., doc ingestion to embeddings) into config-driven pipelines, verifying equivalence and laws like idempotence for preprocessing.

Dependencies: pip install pydantic sentence-transformers. Tradeoffs: custom chains for flexibility vs adapters for third-party code (e.g., sklearn's FunctionTransformer for purity). Sharp edges: statelessness requires explicit artifact injection (e.g., models via boundary factories); config validation prevents runtime errors; ML effects (loading/fitting) belong at boundaries (use cached deps/lifespan); determinism is limited by ML libs (pin versions, set seeds, use eval mode).
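Concretely, the clean → chunk → embed pipeline built later in this core could be declared in a plain JSON file (a sketch; field names match the Pydantic models in section 3, and the overlap value is illustrative):

{
  "steps": [
    {"name": "clean", "params": {"strip": true, "lower": true}},
    {"name": "chunk", "params": {"size": 512, "overlap": 64}},
    {"name": "embed"}
  ]
}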

Motivation Bug: Hard-coded, stateful ML code leads to brittle, non-reproducible pipelines; config-driven stateless transforms enable versioning, testing, and reuse without hidden dependencies.

Delta from Core 4 (Module 09): Web services expose pipelines; this focuses on building internal data/ML flows with config-driven composability.

Data/ML Protocol (Contract, Entry/Exit Criteria):

- Statelessness: Transforms take inputs/artifacts and return outputs; no mutable internal state (fitted models injected as frozen capabilities in eval mode).
- Config-Driven: Pipelines defined as typed data (Pydantic-validated per step); built to a Callable via a factory with step compatibility checks.
- Composability: Chains via pipe/compose (see the sketch after this list); explicit kind (MAP/FLAT_MAP) for fan-out steps.
- Laziness: Prefer generator transforms for streaming; eager for ML batches (e.g., embed in batches).
- Semantics: Laws like idempotence (preprocess(preprocess(x)) == preprocess(x)); equivalence (pipelined == sequential up to batching/ordering); config invariance (same config + artifacts yield same behavior, up to ML non-determinism); step compatibility (declared out_type matches next in_type); verified via properties with preconditions (e.g., seed pinning for determinism).
- Integration: Wrap RAG stages (clean, chunk, embed) in a config-driven pipeline; convert to/from iterators/ADTs; handle ML effects at boundaries (fit/load outside the core); effectful steps last (lift to the FPResult boundary).
- Mypy Config: --strict; pydantic typing.
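As a minimal sketch of the pipe idea referenced above (illustrative; not the repo's funcpipe_rag API):

from functools import reduce
from typing import Any, Callable

def pipe(*funcs: Callable[[Any], Any]) -> Callable[[Any], Any]:
    # Left-to-right composition: pipe(f, g)(x) == g(f(x))
    return lambda x: reduce(lambda acc, f: f(acc), funcs, x)

assert pipe(str.strip, str.lower)('  FuncPipe  ') == 'funcpipe'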

Audience: ML engineers seeking reproducible, FP-style pipelines for data processing and model workflows.

Outcome:

  1. Build stateless, config-driven data/ML pipelines with compatibility/param validation.
  2. Integrate into RAG with dynamic building and artifact injection.
  3. Prove equivalence/laws with Hypothesis.


1. Laws & Invariants

| Law | Description | Enforcement |
|-----|-------------|-------------|
| Idempotence Law | For preprocessing: f(f(x)) == f(x). | Property tests |
| Equivalence Law | Pipelined execution == sequential funcs (up to batching/ordering). | Hypothesis equivalence |
| Config Invariance | Same config + artifacts yield same outputs for same inputs (up to ML non-determinism; precondition: seed/eval pinning). | Property tests with preconditions |
| Step Compatibility Inv | Pipeline build fails if step out_type doesn't match next in_type. | Build-time validation |
| Stateless Inv | Repeat calls with same inputs/artifacts yield same outputs; no hidden state. | Code reviews/tests |

These laws ensure pipelines are reproducible and testable.


2. Decision Table

| Scenario | Data Volume | ML Heavy | Config Complexity | Recommended |
|----------|-------------|----------|-------------------|-------------|
| Simple preprocessing | Small | No | Low | Custom pipe chain |
| Model inference | Medium | Yes | Medium | Custom with artifacts |
| Streaming data | Large | No | Low | Generator funcs |
| Dynamic configs | Any | Yes | High | Config factory + validation |

Choose based on needs; inject artifacts for ML reproducibility.
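For the streaming row, a generator transform keeps memory flat regardless of input size; a sketch using clean_doc from section 3:

def stream_clean(docs: Iterator[RawDoc]) -> Iterator[CleanDoc]:
    for doc in docs:  # one document resident at a time
        yield clean_doc(doc)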


3. Public API (Config Factory & Builders)

Repo implementation lives in src/funcpipe_rag/pipelines/configured.py (stdlib-only, iterator-first, Result at the boundary). Config as typed Pydantic; build to pipeline Callable with compatibility checks. Guard ML imports.

Step Kinds (for composition):

| Kind | Description |
|------|-------------|
| MAP | 1:1 transform; use map |
| FLAT_MAP | 1:many; use chain.from_iterable ∘ map |

from typing import Callable, TypeVar, Iterator, Dict, Any
from functools import partial
from itertools import chain
from pydantic import BaseModel, model_validator, ConfigDict, Field
from funcpipe_rag import FPResult, Ok, Err, ErrInfo, RawDoc, CleanDoc, ChunkWithoutEmbedding, Chunk
import json  # Stdlib for config load

T = TypeVar('T')
U = TypeVar('U')


# Per-step param models (forbid extras)
class CleanParams(BaseModel):
    model_config = ConfigDict(extra='forbid')
    strip: bool = True
    lower: bool = True


class ChunkParams(BaseModel):
    model_config = ConfigDict(extra='forbid')
    size: int = 512
    overlap: int = 0

    @model_validator(mode='after')
    def check_overlap(self) -> 'ChunkParams':
        if self.size <= 0:
            raise ValueError("size must be > 0")
        if not (0 <= self.overlap < self.size):
            raise ValueError("overlap must satisfy 0 <= overlap < size")
        return self


class EmbedParams(BaseModel):
    model_config = ConfigDict(extra='forbid')
    # No batch_size; use batch combinator if needed


# Union for params
class StepConfig(BaseModel):
    name: str
    params: Dict[str, Any] = Field(default_factory=dict)  # Normalized below

    @model_validator(mode='after')
    def validate_params(self) -> 'StepConfig':
        param_models = {
            'clean': CleanParams,
            'chunk': ChunkParams,
            'embed': EmbedParams,
        }
        model = param_models.get(self.name)
        if model:
            parsed = model(**self.params)  # Validates; rejects unknown/extras
            self.params = parsed.model_dump()  # Normalize/coerce
        else:
            raise ValueError(f"Unknown step: {self.name}")
        return self


class PipelineConfig(BaseModel):
    steps: list[StepConfig]


# Stateless transforms (define before registry)
def clean_doc(doc: RawDoc, strip: bool = True, lower: bool = True) -> CleanDoc:
    text = doc.content.strip() if strip else doc.content
    text = text.lower() if lower else text
    return CleanDoc(doc_id=doc.metadata.get('id', ''), abstract=text)


def chunk_doc(cd: CleanDoc, size: int = 512, overlap: int = 0) -> Iterator[ChunkWithoutEmbedding]:
    text = cd.abstract
    step = size - overlap
    for i in range(0, len(text), step):
        chunk = text[i:i + size]
        if chunk:
            yield ChunkWithoutEmbedding(cd.doc_id, chunk, i, i + len(chunk))


def embed_chunk(cwe: ChunkWithoutEmbedding, embedder: Callable[[str], list[float]]) -> FPResult[Chunk, ErrInfo]:
    try:
        emb = embedder(cwe.text)
        return Ok(Chunk(cwe.doc_id, cwe.text, emb, cwe.start, cwe.end))
    except Exception as ex:
        return Err(ErrInfo.from_exc(ex))


# Registry with metadata (callables, runtime types)
STEP_REGISTRY = {
    'clean': {'kind': 'MAP', 'func': clean_doc, 'in_type': RawDoc, 'out_type': CleanDoc, 'param_model': CleanParams},
    'chunk': {'kind': 'FLAT_MAP', 'func': chunk_doc, 'in_type': CleanDoc, 'out_type': ChunkWithoutEmbedding,
              'param_model': ChunkParams},
    'embed': {'kind': 'MAP', 'func': embed_chunk, 'in_type': ChunkWithoutEmbedding, 'out_type': FPResult,
              'param_model': EmbedParams},  # Use class for runtime
}

SENTENCE_TRANSFORMERS_AVAILABLE = False
try:
    import sentence_transformers

    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    pass


def load_config(path: str) -> PipelineConfig:
    with open(path, 'r') as f:
        data = json.load(f)
    return PipelineConfig(**data)  # Validates


def build_pipeline(config: PipelineConfig, artifacts: dict[str, Any]) -> Callable[
    [Iterator[RawDoc]], Iterator[FPResult[Chunk, ErrInfo]]]:
    funcs: list[tuple[str, Callable[..., Any]]] = []
    prev_type: type = RawDoc  # Type flowing into the first step
    for step in config.steps:
        reg = STEP_REGISTRY[step.name]
        func: Callable = reg['func']
        # Build-time compatibility check: previous out_type must feed this in_type
        if not issubclass(prev_type, reg['in_type']):
            raise TypeError(f"Incompatible: {prev_type} not subclass of {reg['in_type']}")
        overlap = set(step.params) & set(artifacts.get(step.name, {}))
        if overlap:
            raise ValueError(f"Artifact overlaps config param: {overlap}")
        partial_func = partial(func, **step.params, **artifacts.get(step.name, {}))  # Inject params + artifacts
        funcs.append((reg['kind'], partial_func))
        prev_type = reg['out_type']
    # Enforce that the effectful (Result-producing) step comes last
    if prev_type is not FPResult:
        raise ValueError("Effectful steps must be last")

    def pipeline(data: Iterator[RawDoc]) -> Iterator[FPResult[Chunk, ErrInfo]]:
        it: Iterator[Any] = data
        for kind, f in funcs:
            if kind == 'FLAT_MAP':
                it = chain.from_iterable(map(f, it))
            else:
                it = map(f, it)
        return it

    return pipeline
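A quick smoke run of the factory (a sketch; RawDoc takes content/metadata per the Hypothesis strategy in section 5, and the fake embedder is a deterministic stand-in so no model is loaded):

docs = iter([RawDoc(content='  Hello World  ', metadata={'id': 'd1'})])
cfg = PipelineConfig(steps=[
    StepConfig(name='clean'),
    StepConfig(name='chunk', params={'size': 8, 'overlap': 2}),
    StepConfig(name='embed'),
])
fake_embedder = lambda text: [float(len(text))]  # deterministic stand-in
run = build_pipeline(cfg, {'embed': {'embedder': fake_embedder}})
for res in run(docs):  # lazily yields FPResult[Chunk, ErrInfo] values
    print(res)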

4. Reference Implementations

4.1 Stateless Transforms

# See Public API; defined there

4.2 Config-Driven Building

# Artifact factories (effects at boundary)
def embed_artifact(model_name: str = 'all-MiniLM-L6-v2') -> Callable[[str], list[float]]:
    if not SENTENCE_TRANSFORMERS_AVAILABLE:
        raise ImportError("Install sentence-transformers")
    model = sentence_transformers.SentenceTransformer(model_name)
    model.eval()  # Freeze; pin seed/version/device for determinism
    return lambda text: model.encode(text).tolist()

# Artifacts from boundary (e.g., lifespan/DI)
artifacts = {'embed': {'embedder': embed_artifact()}}  # Example; inject from outside in prod
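Determinism hinges on pinning randomness at the same boundary; a minimal sketch (assumes torch, which sentence-transformers depends on):

def pin_determinism(seed: int = 0) -> None:
    # Boundary-only effect: pin RNGs before loading/encoding
    import random
    random.seed(seed)
    try:
        import numpy as np
        import torch
        np.random.seed(seed)
        torch.manual_seed(seed)
    except ImportError:
        pass  # Guarded, like the sentence-transformers import above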

4.3 ML-Specific Steps

# Adapted sklearn (pure transform only; fit at boundary)
from sklearn.preprocessing import StandardScaler

# Fit at boundary
def fit_scaler(data: list[list[float]], params: dict[str, Any] | None = None) -> StandardScaler:
    scaler = StandardScaler(**(params or {}))  # avoid a mutable default argument
    scaler.fit(data)
    return scaler  # Frozen from here on; only transform() is used downstream

# Stateless transform (inject fitted; batch for efficiency)
def scale_features(features: list[list[float]], scaler: StandardScaler) -> list[list[float]]:
    return scaler.transform(features).tolist()  # Vectorized batch
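Usage, showing the fit-at-boundary / pure-transform split (values are illustrative):

train = [[0.0, 1.0], [2.0, 3.0], [4.0, 5.0]]
scaler = fit_scaler(train)               # effect: fitting happens once, at the boundary
scaled = scale_features(train, scaler)   # pure given the frozen scaler
assert scale_features(train, scaler) == scaled  # stateless: repeated calls agree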

4.4 Integration in RAG

# RAG with config pipeline
def rag_ml_pipeline(config_path: str, docs: Iterator[RawDoc], artifacts: dict[str, Any]) -> Iterator[FPResult[Chunk, ErrInfo]]:
    config = load_config(config_path)
    pipeline = build_pipeline(config, artifacts)
    return pipeline(docs)

4.5 Before/After Refactor

# Before: Hard-coded
cleaned = map(partial(clean_doc, lower=True), docs)
chunks = chain.from_iterable(map(partial(chunk_doc, size=512), cleaned))
embedded = map(partial(embed_chunk, embedder=embed_artifact()), chunks)
# After: Config-driven
config = PipelineConfig(steps=[
    StepConfig(name='clean', params={'lower': True}),
    StepConfig(name='chunk', params={'size': 512}),
    StepConfig(name='embed'),
])
pipeline = build_pipeline(config, {'embed': {'embedder': embed_artifact()}})
result = pipeline(docs)

5. Property-Based Proofs (repo tests)

Runnable tests live in tests/unit/pipelines/test_configured_pipeline.py.

from hypothesis import given, assume
import hypothesis.strategies as st

# Structured strategy
raw_doc_strat = st.builds(RawDoc, content=st.text(), metadata=st.dictionaries(st.text(), st.text()))

@given(st.lists(raw_doc_strat))
def test_idempotence_clean(docs):
    # Mirrors clean_doc's text normalization; law: f(f(x)) == f(x)
    def normalize_text(s: str, strip: bool = True, lower: bool = True) -> str:
        s = s.strip() if strip else s
        return s.lower() if lower else s
    cleaned = [normalize_text(d.content) for d in docs]
    twice = [normalize_text(s) for s in cleaned]
    assert cleaned == twice

@given(st.text(min_size=1), st.integers(min_value=1, max_value=10), st.integers(min_value=0, max_value=9))
def test_chunk_reconstruction(text, size, overlap):
    assume(overlap < size)
    cd = CleanDoc('id', text)
    chunks = list(chunk_doc(cd, size=size, overlap=overlap))
    recon = chunks[0].text
    for c in chunks[1:]:
        recon += c.text[overlap:]  # Property: precise reconstruction by dropping overlap
    assert recon == text
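The config-invariance law can be checked the same way; a sketch (assumes Ok/Chunk support structural equality, and uses a deterministic stand-in embedder so the seed-pinning precondition is moot):

@given(st.lists(raw_doc_strat, max_size=5))
def test_config_invariance(docs):
    cfg = PipelineConfig(steps=[
        StepConfig(name='clean'),
        StepConfig(name='chunk', params={'size': 4}),
        StepConfig(name='embed'),
    ])
    arts = {'embed': {'embedder': lambda t: [float(len(t))]}}  # deterministic stand-in
    run = lambda: list(build_pipeline(cfg, arts)(iter(docs)))
    assert run() == run()  # same config + artifacts => same outputs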


6. Runtime Preservation Guarantee

Statelessness allows safe parallelism; config plus pinned artifacts enables A/B testing; determinism requires eval mode and seed pinning.
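Because transforms are stateless, per-item work can fan out safely; a stdlib sketch (illustrative, not the repo API):

from concurrent.futures import ThreadPoolExecutor

def parallel_map(f: Callable[[T], U], items: Iterator[T], max_workers: int = 4) -> list[U]:
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        return list(ex.map(f, items))  # safe only because f holds no hidden state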


7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|--------------|---------|-----|
| Stateful steps | Non-reproducible | Fit at boundary; inject frozen |
| Hard-coded params | Brittle changes | Use config data |
| No validation | Runtime fails | Validate config + compatibility |
| Eager full load | Memory issues | Use generators for streaming |
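
For the first row, the shape of the fix (load_model is hypothetical; embed_artifact/embed_chunk come from sections 3 and 4):

# Anti-pattern: effect and state hidden inside the step
class StatefulEmbedder:
    def __init__(self) -> None:
        self.model = load_model()  # hypothetical loader; effect hidden in the pipeline

# Fix: perform the effect once at the boundary, inject the frozen artifact
embedder = embed_artifact()                      # boundary effect, runs once
step = partial(embed_chunk, embedder=embedder)   # pure given the injected artifact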

8. Pre-Core Quiz

  1. Stateless for…? → Reproducibility
  2. Config-driven…? → Versioning/reuse
  3. Chains for…? → Composability
  4. ML integration? → Boundary fit + inject
  5. Benefit? → Testable pipelines

9. Post-Core Exercise

  1. Build config-driven RAG pipeline.
  2. Prove idempotence with Hypothesis.

Pipeline Usage (Idiomatic)

pipeline = build_pipeline(config, artifacts)
result = pipeline(data)
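At the boundary, the Result stream can be partitioned without unwrapping (a sketch relying only on the Ok constructor imported in section 3):

oks, errs = [], []
for res in result:
    (oks if isinstance(res, Ok) else errs).append(res)  # Err values carry ErrInfo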

Next Core: 6. Command-Line and Config-Driven FuncPipe – Integrating with Click/Typer