Skip to content

Core 7: FP Style with Distributed/Dataflow Systems (Dask/Beam-Like Transform + Sink Thinking)

Module 09

Core question: How do you adapt distributed/dataflow systems like Dask or Apache Beam to functional programming principles in FuncPipe, using pure transforms, lazy graphs, explicit sinks, and stateful operators with algebraic contracts to maintain composability, purity, and testability for large-scale pipelines?

In this core, we delve into adapting distributed and dataflow systems to a functional programming (FP) style within the FuncPipe RAG Builder (now at funcpipe-rag-09). Distributed systems are essential for handling data volumes that exceed single-machine capabilities, but they introduce complexities like work duplication from retries, shuffles for joins, nondeterminism from scheduling, and floating-point drift in aggregations that can undermine FP's emphasis on purity and predictability. Dask excels in Python-native lazy computation through task graphs, allowing you to parallelize code with familiar APIs like dask.delayed for custom tasks or dask.bag for unstructured data collections—ideal for quick prototyping and integration with existing Python ecosystems. Apache Beam, in contrast, provides a unified model for portable pipelines using PTransforms and PCollections, enabling the same code to run on various backends (e.g., local DirectRunner, Google Dataflow, Apache Flink, or Apache Spark), which is invaluable for cross-platform deployments.

To align these systems with FP, we model pipelines as lazy graphs of operators, where each operator has an explicit algebraic contract that guarantees correct behavior under distribution (e.g., associativity for reductions to allow parallel aggregation without order-dependence). We focus on pure transforms (stateless functions operating on individual elements or partitions without side effects), explicit sinks (final steps confined to I/O, such as writing results to files or databases, with idempotence guarantees), and simple stateful operators (like combines with monoids for aggregations). Pipelines are configured via data structures (e.g., YAML specifying operator sequence, contracts like monoids, runner/backend settings, resources, serialization rules, and materialization points), seamlessly integrating with FuncPipe's ADTs (e.g., mapping FPResult over partitions to handle per-element failures). We refactor the RAG ingestion process (e.g., processing massive document sets into embeddings) into these distributed FP pipelines, verifying equivalence to local versions and laws like purity (fixed inputs yield identical outputs under a fixed scheduler) and associativity for reducers.

Dependencies: pip install dask[distributed] apache-beam; tradeoffs: Dask's simplicity for Python-centric workflows vs Beam's portability across languages and runners; sharp edges: Dask's partition locality favors low-shuffle ops while Beam's shuffles enable joins but incur costs—test with local schedulers to catch issues early, and ensure purity by avoiding mutable globals or time-dependent logic in transforms.

Motivation Bug: Imperative distributed code often embeds effects (e.g., I/O or state mutation) directly in computation steps, leading to non-reproducible runs, hard-to-test graphs, and failures under retries or shuffles. FP style enforces a clear boundary: pure, composable transforms build the graph lazily, while effects are confined to idempotent sinks, enabling reliable scaling without sacrificing testability or debuggability.

Delta from Core 6 (Module 09): The CLI provides user-facing entry points for running pipelines; this core extends FuncPipe to distributed execution, where graphs are built as composable operators with algebraic guarantees, allowing seamless local-to-cluster transitions.

Distributed Protocol (Contract, Entry/Exit Criteria): - Purity: Transforms must be referentially transparent (outputs depend only on inputs, injected artifacts, and config); no mutable globals or network calls inside operators; idempotent under retries/duplication (e.g., no incrementing counters); worker-local caches are allowed if read-only and deterministic (e.g., model loading in setup). - Determinism: Under fixed scheduler and no floating-point ops, fixed inputs yield fixed outputs; floating-point ops require tolerance in equivalence checks; nondeterminism from scheduling/reordering requires contracts (e.g., commutative monoids for reductions). - Composability: Graphs are built as sequences of typed operators (e.g., Map, FlatMap, Combine); each operator declares its contract (e.g., monoid for reductions) to ensure associativity/commutativity under reordering. - Laziness: Construct the graph without execution; trigger compute/submit only at the end (Dask.compute, Beam.run) or explicit materialization points. - Semantics: Laws like purity (a transform on fixed inputs is deterministic under a fixed scheduler); equivalence (distributed graph == local execution up to partitioning, ordering, and batching, given operator contracts); idempotence for sinks (repeat writes produce the same result or are transactional); verified via property tests with local runners and keyed/tolerant comparisons. - Integration: Map RAG stages (clean, chunk, embed) to operators; propagate FPResult for per-element failures; use Beam coders or Dask serialization for ADTs; test graphs locally before scaling. - Mypy Config: --strict; dask/beam typing optional.

Audience: Engineers scaling FP pipelines to distributed systems, needing pure transforms in dataflows while handling state, shuffles, and failures gracefully.

Outcome: 1. Define operator IR with algebraic contracts for distributed FP. 2. Build and run RAG pipelines on Dask/Beam with pure operators and sinks. 3. Prove equivalence/laws with keyed, tolerant tests.


1. Laws & Invariants

Law Description Enforcement
Purity Law Transforms depend only on inputs; no globals or side effects. Type checks/runtime validation in IR builder
Equivalence Law Distributed result == local for same inputs (up to partitioning/ordering/batching). Hypothesis with keyed multisets + tolerance
Idempotence Inv Repeat runs same output if idempotent logic. Property tests with retry simulation
Partition Inv Elementwise transforms preserve element independence; stateful stages declare monoid/window. IR build-time contract checks

These laws ensure distributed layers don't break FP properties. Enforcement uses type-level contracts (e.g., requiring Monoid typeclass for Combine) and runtime assertions in the graph builder. Preconditions: For commutativity-dependent ops, monoid must be commutative (tested via properties).


2. Decision Table

Scenario Portability Needed Data Type Recommended
Python-only scale No Arrays/DF Dask DataFrame
Python-only scale No Unstructured Dask Bag
Custom tasks No Any Dask Delayed
Multi-lang/runners Yes General Beam
Lazy graphs Any Any Both
FP purity focus Any Any Both with pure DoFn

Dask for ease, Beam for portability.

3. Public API (Graph Builders & Transforms)

Builders as funcs. Guard imports. We define an intermediate representation (IR) for operators with algebraic contracts to ensure composability and validation before backend mapping.

Repo alignment note (end-of-Module-09): - Dask/Beam are optional and not part of this repo’s default dependency set. - This repo includes a placeholder location for compilers at src/funcpipe_rag/pipelines/distributed.py (import-guarded).

from typing import Callable, TypeVar, Any, List, Dict, Optional, Literal, Union
from funcpipe_rag import FPResult, RawDoc, Chunk, CleanDoc, EmbeddedChunk, Ok, Err, is_ok, raise_on_err
from funcpipe_rag import Monoid  # Reuse canonical Monoid from Module 05
from dataclasses import dataclass
import yaml
from pydantic import BaseModel, Field, validator
from typing_extensions import TypedDict

DASK_AVAILABLE = False
BEAM_AVAILABLE = False
try:
    import dask
    import dask.bag as db
    from dask.distributed import Client, LocalCluster

    DASK_AVAILABLE = True
except ImportError:
    DASK_AVAILABLE = False
try:
    import apache_beam as beam
    from apache_beam.transforms import PTransform
    from apache_beam.coders import Coder
    from apache_beam.transforms.combiners import CombineFn

    BEAM_AVAILABLE = True
except ImportError:
    BEAM_AVAILABLE = False

T = TypeVar('T')
U = TypeVar('U')


class TypeId(str):
    """String tag for type identity, e.g. 'FPResult[CleanDoc]'"""
    pass


# ErrorPolicy for FPResult handling
ErrorPolicy = Literal["drop", "collect", "fail_fast"]


# Monoid with optional commutative flag
@dataclass
class SumMonoid(Monoid[int]):
    commutative: bool = True

    def empty(self) -> int: return 0

    def combine(self, a: int, b: int) -> int: return a + b


# Func registry to avoid globals
class FuncRegistry(TypedDict):
    funcs: Dict[str, Callable]
    lifts: Dict[str, Callable[[T], FPResult[U]]]
    monoids: Dict[str, Monoid]


DEFAULT_REGISTRY: FuncRegistry = {
    'funcs': {
        'clean_doc': clean_doc,
        'chunk_doc': chunk_doc,
        'embed_batch': embed_batch,
    },
    'lifts': {
        'some_lift_func': some_lift_func,
    },
    'monoids': {
        'sum': SumMonoid(),
    }
}


@dataclass
class SinkContract:
    idempotent: bool


# Operator IR ADTs
@dataclass
class MapOp:
    name: str
    func: Callable[[T], FPResult[U]]
    in_type: TypeId
    out_type: TypeId
    error_policy: ErrorPolicy


@dataclass
class FlatMapOp:
    name: str
    func: Callable[[T], List[FPResult[U]]]
    in_type: TypeId
    out_type: TypeId
    error_policy: ErrorPolicy


@dataclass
class BatchMapOp:
    name: str
    func: Callable[[List[T]], List[FPResult[U]]]
    in_type: TypeId
    out_type: TypeId
    error_policy: ErrorPolicy
    max_batch_size: int = 128


@dataclass
class CombineOp:
    name: str
    lift: Callable[[T], FPResult[U]]
    monoid: Monoid[U]
    in_type: TypeId
    out_type: TypeId
    error_policy: ErrorPolicy


@dataclass
class SinkOp:
    name: str
    func: Callable[[U], None]
    in_type: TypeId
    contract: SinkContract


Operator = MapOp | FlatMapOp | BatchMapOp | CombineOp | SinkOp


# YAML Config Schema Example (first op consumes FPResult[RawDoc])
# operators:
#   - type: Map
#     name: clean
#     func: clean_doc
#     error_policy: drop
#     in_type: FPResult[RawDoc]
#     out_type: FPResult[CleanDoc]
#   - type: FlatMap
#     name: chunk
#     func: chunk_doc
#     error_policy: collect
#     in_type: FPResult[CleanDoc]
#     out_type: FPResult[Chunk]
#   - type: BatchMap
#     name: embed
#     func: embed_batch
#     max_batch_size: 128
#     error_policy: drop
#     in_type: FPResult[Chunk]
#     out_type: FPResult[EmbeddedChunk]
#   - type: Sink
#     name: write
#     func: write_chunk
#     contract: {idempotent: true}
#     in_type: FPResult[EmbeddedChunk]

class OpConfig(BaseModel):
    type: Literal["Map", "FlatMap", "BatchMap", "Combine", "Sink"]
    name: str
    func: str
    error_policy: ErrorPolicy = "collect"
    monoid: Optional[str] = None
    lift: Optional[str] = None
    max_batch_size: Optional[int] = None
    contract: Optional[Dict[str, Any]] = None
    in_type: str
    out_type: str


class PipelineConfig(BaseModel):
    operators: List[OpConfig]
    backend: Literal["dask", "beam"]
    resources: Dict[str, Any] = Field(default_factory=dict)
    materialization_points: List[str] = Field(default_factory=list)


def load_config(yaml_path: str) -> PipelineConfig:
    with open(yaml_path, 'r') as f:
        data = yaml.safe_load(f)
    return PipelineConfig.model_validate(data)


def build_ir_from_config(
        config: PipelineConfig,
        registry: FuncRegistry | None = None
) -> List[Operator]:
    reg = registry or DEFAULT_REGISTRY
    ir = []
    prev_out_type = TypeId(config.operators[0].in_type)
    if prev_out_type != TypeId("FPResult[RawDoc]"):
        raise ValueError("First operator must consume FPResult[RawDoc]")
    for op_cfg in config.operators:
        func = reg['funcs'].get(op_cfg.func)
        if func is None:
            raise ValueError(f"Func {op_cfg.func} not in registry")
        name = op_cfg.name
        error_policy = op_cfg.error_policy
        in_type = TypeId(op_cfg.in_type)
        out_type = TypeId(op_cfg.out_type)
        if in_type != prev_out_type:
            raise ValueError(f"Type mismatch at {name}")
        if op_cfg.type == 'Map':
            ir.append(MapOp(name, func, in_type, out_type, error_policy))
        elif op_cfg.type == 'FlatMap':
            ir.append(FlatMapOp(name, func, in_type, out_type, error_policy))
        elif op_cfg.type == 'BatchMap':
            ir.append(BatchMapOp(name, func, in_type, out_type, error_policy, op_cfg.max_batch_size or 128))
        elif op_cfg.type == 'Combine':
            if op_cfg.monoid is None:
                raise ValueError("Monoid required")
            monoid = reg['monoids'].get(op_cfg.monoid)
            if monoid is None:
                raise ValueError(f"Monoid {op_cfg.monoid} not in registry")
            lift = reg['lifts'].get(op_cfg.lift)
            if lift is None:
                raise ValueError(f"Lift {op_cfg.lift} not in registry")
            if not getattr(monoid, 'commutative', False):
                raise ValueError(f"CombineOp {name} requires commutative monoid")
            ir.append(CombineOp(name, lift, monoid, in_type, out_type, error_policy))
        elif op_cfg.type == 'Sink':
            if not op_cfg.contract or not op_cfg.contract.get('idempotent'):
                raise ValueError("Sink must be idempotent")
            ir.append(SinkOp(name, func, in_type, contract=SinkContract(idempotent=True)))
        prev_out_type = out_type
    if any(isinstance(op, SinkOp) for op in ir[:-1]):
        raise ValueError("Sinks must be terminal")
    if any(isinstance(op, CombineOp) and i != len(ir) - 1 for i, op in enumerate(ir)):
        raise ValueError("CombineOp must be terminal")
    return ir


# FPResult lifting helpers
def map_r(f: Callable[[T], FPResult[U]]) -> Callable[[FPResult[T]], FPResult[U]]:
    def g(r: FPResult[T]) -> FPResult[U]:
        if r.is_err: return r
        return f(r.value)

    return g


def flatmap_r(f: Callable[[T], List[FPResult[U]]]) -> Callable[[FPResult[T]], List[FPResult[U]]]:
    def g(r: FPResult[T]) -> List[FPResult[U]]:
        if r.is_err: return [r]
        return f(r.value)

    return g


def apply_policy_dask(bag, policy: ErrorPolicy):
    if policy == "drop":
        return bag.filter(is_ok)
    if policy == "fail_fast":
        return bag.map(raise_on_err)
    return bag  # collect: pass through


def apply_policy_beam(pcoll, policy: ErrorPolicy):
    if policy == "drop":
        return pcoll | beam.Filter(is_ok)
    if policy == "fail_fast":
        return pcoll | beam.Map(raise_on_err)
    return pcoll


# Backend compilers
def compile_to_dask(ir: List[Operator], data: List[RawDoc], config: PipelineConfig) -> Union[db.Bag, Delayed]:
    if not DASK_AVAILABLE:
        raise ImportError("Dask not available")
    bag = db.from_sequence(data, npartitions=config.resources.get('workers', 4)).map(Ok)  # Normalize source
    for op in ir:
        if isinstance(op, MapOp):
            bag = bag.map(map_r(op.func))
            bag = apply_policy_dask(bag, op.error_policy)
        elif isinstance(op, FlatMapOp):
            bag = bag.flatmap(flatmap_r(op.func))
            bag = apply_policy_dask(bag, op.error_policy)
        elif isinstance(op, BatchMapOp):
            def batch_part(part):
                buf = []
                for r in part:
                    if r.is_err:
                        yield r
                        continue
                    buf.append(r.value)
                    if len(buf) >= op.max_batch_size:
                        yield from op.func(buf)
                        buf = []
                if buf:
                    yield from op.func(buf)

            bag = bag.map_partitions(batch_part)
            bag = apply_policy_dask(bag, op.error_policy)
        elif isinstance(op, CombineOp):
            if op.error_policy == "collect":
                raise NotImplementedError("Collect policy in Combine")
            bag = bag.map(map_r(op.lift))
            bag = apply_policy_dask(bag, op.error_policy)
            bag = bag.filter(is_ok).map(lambda r: r.value)
            bag = bag.fold(op.monoid.combine, initial=op.monoid.empty(), combine=op.monoid.combine)
        if op.name in config.materialization_points:
            bag = bag.persist()
    return bag


def compile_to_beam(ir: List[Operator], data: List[RawDoc], config: PipelineConfig) -> PTransform:
    if not BEAM_AVAILABLE:
        raise ImportError("Beam not available")

    class CustomCoder(Coder):
        def encode(self, x): import pickle; return pickle.dumps(x)

        def decode(self, b): import pickle; return pickle.loads(b)

    beam.coders.registry.register_coder(FPResult, CustomCoder)

    def build_composite_transform() -> PTransform:
        class Composite(PTransform):
            def expand(self, pcoll):
                pcoll = pcoll | beam.Map(Ok)  # Normalize source
                for op in ir:
                    if isinstance(op, MapOp):
                        pcoll = pcoll | beam.Map(map_r(op.func))
                        pcoll = apply_policy_beam(pcoll, op.error_policy)
                    elif isinstance(op, FlatMapOp):
                        pcoll = pcoll | beam.FlatMap(flatmap_r(op.func))
                        pcoll = apply_policy_beam(pcoll, op.error_policy)
                    elif isinstance(op, BatchMapOp):
                        current_op = op

                        def batch_dofn():
                            class BatchDoFn(beam.DoFn):
                                def process(self, batch):
                                    buf = []
                                    for r in batch:
                                        if r.is_err:
                                            yield r
                                            continue
                                        buf.append(r.value)
                                        if len(buf) >= current_op.max_batch_size:
                                            for res in current_op.func(buf):
                                                yield res
                                            buf = []
                                    if buf:
                                        for res in current_op.func(buf):
                                            yield res

                            return BatchDoFn()

                        pcoll = pcoll | beam.BatchElements(max_batch_size=op.max_batch_size) | beam.ParDo(batch_dofn())
                        pcoll = apply_policy_beam(pcoll, op.error_policy)
                    elif isinstance(op, CombineOp):
                        if op.error_policy == "collect":
                            raise NotImplementedError("Collect policy in Combine")

                        def make_combine_fn(op: CombineOp):
                            class _Fn(CombineFn):
                                def create_accumulator(self):
                                    return op.monoid.empty()

                                def add_input(self, acc, input):
                                    r = map_r(op.lift)(input)
                                    if r.is_err:
                                        if op.error_policy == "drop":
                                            return acc
                                        if op.error_policy == "fail_fast":
                                            raise ValueError("Fail-fast")
                                    return op.monoid.combine(acc, r.value)

                                def merge_accumulators(self, accs):
                                    if not accs: return self.create_accumulator()
                                    acc = accs[0]
                                    for a in accs[1:]:
                                        acc = op.monoid.combine(acc, a)
                                    return acc

                            return _Fn()

                        pcoll = pcoll | beam.CombineGlobally(make_combine_fn(op))
                return pcoll

        return Composite()

    return build_composite_transform()

4. Reference Implementations

4.1 Dask in FP Style

def clean_doc(doc: RawDoc) -> FPResult[CleanDoc]: ...
def chunk_doc(doc: CleanDoc) -> List[FPResult[Chunk]]: ...
def embed_batch(chunks: List[Chunk]) -> List[FPResult[EmbeddedChunk]]: ...

config = load_config('config.yaml')
ir = build_ir_from_config(config)
bag = compile_to_dask(ir, docs, config)

4.2 Beam in FP Style

p = beam.Pipeline()
pcoll = p | beam.Create(docs)
transform = compile_to_beam(ir, docs, config)
pcoll = pcoll | transform
p.run()

4.3 Explicit Sinks

# Elementwise, idempotent
def write_chunk(chunk: EmbeddedChunk) -> None: ...

4.4 Integration in RAG

config = load_config('rag_config.yaml')
ir = build_ir_from_config(config)
sink_ops = [op for op in ir if isinstance(op, SinkOp)]
transform_ops = [op for op in ir if not isinstance(op, SinkOp)]
if config.backend == 'dask':
    with LocalCluster() as cluster, Client(cluster):
        bag = compile_to_dask(transform_ops, docs, config)
        results = bag.compute()
        for sink in sink_ops:
            for r in results:
                if r.is_ok:
                    sink.func(r.value)
elif config.backend == 'beam':
    p = beam.Pipeline()
    pcoll = p | beam.Create(docs) | compile_to_beam(transform_ops, docs, config)
    for sink in sink_ops:
        pcoll | beam.Map(lambda r: sink.func(r.value) if r.is_ok else None)
    p.run()

4.5 Before/After Refactor

# Before: Local
local = [
    embed_r
    for d in docs
    for cd in [clean_doc(d)] if cd.is_ok
    for cs in [chunk_doc(cd.value)]
    for c in cs if c.is_ok
    for embed_r in embed_batch([c.value for c in cs if c.is_ok]) if embed_r.is_ok
]

# After: Dask
bag = db.from_sequence(docs).map(Ok).map(map_r(clean_doc)).flatmap(flatmap_r(chunk_doc)).map_partitions(...embed_batch...)
distributed = bag.compute()

5. Property-Based Proofs (repo tests)

This repo focuses on the spec/reconstruction and stdlib-first properties by default; distributed backends are exercised only when optional dependencies are installed.

from hypothesis import given, settings
import hypothesis.strategies as st
import numpy as np
from collections import Counter

@given(st.lists(raw_doc_strat(), max_size=50))
@settings(max_examples=50)
def test_dask_equiv(docs):
    local = [
        embed_r
        for d in docs
        for cd in [clean_doc(d)] if cd.is_ok
        for cs in [chunk_doc(cd.value)]
        for c in cs if c.is_ok
        for embed_r in embed_batch([c.value for c in cs if c.is_ok]) if embed_r.is_ok
    ]
    bag = db.from_sequence(docs).map(Ok).map(map_r(clean_doc)).flatmap(flatmap_r(chunk_doc)).map_partitions(...embed_batch...)
    distributed = bag.compute()
    def stable_key(r: FPResult[EmbeddedChunk]) -> tuple:
        if r.is_err: return ("err", repr(r.error))
        c = r.value
        return (c.doc_id, c.chunk_id, tuple(np.round(c.embedding.vector, decimals=5)))
    assert Counter(stable_key(r) for r in local) == Counter(stable_key(r) for r in distributed)


6. Runtime Preservation Guarantee

Distributed scales horizontally; local for tests via single-worker configs. Use local runners (Dask LocalCluster, Beam DirectRunner) to verify before cluster deploy.

7. Anti-Patterns & Immediate Fixes

Anti-Pattern Symptom Fix
Effects in transforms Non-pure Confine to sinks; validate in IR
Eager compute Memory blowup Lazy IR build + materialization
Mutable DoFn Non-reproducible Stateless + worker-local RO cache
No batching Slow embeddings Batch-enabled BatchMapOp
---
## 8. Pre-Core Quiz
1. Dask for…? → Python scale
2. Beam for…? → Portable pipelines
3. Transforms for…? → Pure ops
4. Sinks for…? → Effects
5. Benefit? → FP distributed
## 9. Post-Core Exercise
1. Scale RAG to Dask/Beam via YAML config.
2. Add GroupByKeyOp to IR for keyed dedup; prove equiv with Hypothesis, including tolerance/retries.
Pipeline Usage (Idiomatic)
config = load_config('path.yaml')
ir = build_ir_from_config(config)
bag = compile_to_dask(ir, data, config)
result = bag.compute()
Next Core: 8. Wrapping Imperative Libraries in Functional Facades (Normalizing Side-Effectful APIs)