
Core 6: Advanced Patterns & Scaling – Pattern Matching, Typing, Parallel Execution

Module 10

Core question:
How do Python's structural pattern matching and advanced typing features enable richer, safer functional pipelines, and how do you scale FuncPipe to parallel execution with tools like concurrent.futures or Dask while preserving purity, composability, and laws?

In this core, we apply advanced Python features and scaling techniques to the FuncPipe RAG Builder (now at funcpipe-rag-10). We use structural pattern matching for expressive ADT handling, ParamSpec for type-safe higher-order functions, and parallel execution via concurrent.futures (CPU-bound) or Dask (distributed) with explicit contracts for purity, ordering, and error handling. These enable concise, verified, scalable pipelines. Builds on prior cores.

Motivation Bug: Complex ADTs require verbose dispatch, higher-order functions lose type precision, and pipelines bottleneck on a single core. Advanced language features and parallel backends address all three without sacrificing FP guarantees.

Delta from Core 5: Core 5 used property-based testing (PBT) to verify laws; this core applies language features and scaling techniques under law-preserving contracts.

Protocol (Contract, Entry/Exit Criteria):
- Properties: Match exhaustiveness (mypy with sealed ADTs), typing preservation, parallel equivalent to sequential (stable order or multiset).
- Semantics: Match for ADTs; typed HOFs; parallel via pure, picklable mappers.
- Purity: Immutable data; no shared state.
- Error Model: Per-item Results in parallel; aggregate or propagate.
- Resource Safety: Bounded workers; timeouts.
- Integration: Apply to RAG (match on Results, parallel chunking).
- Mypy Config: --strict.
- Exit: Features pass mypy, parallel passes PBT equivalence, and throughput scales near-linearly, less process start-up and serialization overhead (the GIL only limits thread-based executors, not process pools).

Audience: Engineers building expressive, scalable FP pipelines.

Outcome:
  1. Use match for ADTs.
  2. Type-safe HOFs with ParamSpec.
  3. Parallel backends with contracts.
  4. Prototype in RAG, verify laws.


1. Laws & Invariants

| Invariant | Description | Enforcement |
|---|---|---|
| Match Inv | Match covers all variants where possible (sealed ADTs via Literal/Enum + assert_never). | Mypy + runtime |
| Typing Inv | HOFs preserve full signatures/return types. | Mypy --strict |
| Parallel Inv | Parallel == sequential (same order for the order-preserving futures-based executors; multiset equality for unordered backends such as Dask bags). | PBT equivalence |
| Idempotence Inv | Parallel ops are idempotent if the sequential op is. | PBT idempotence |

These extend prior laws with explicit contracts.
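
The two equivalence notions in the Parallel Inv row can be sketched as a pair of small comparison helpers. This is a minimal illustration, assuming items can be reduced to a hashable key; `eq_ordered` and `eq_multiset` are hypothetical names introduced here, not part of FuncPipe.

```python
from collections import Counter
from typing import Callable, Hashable, Iterable, TypeVar

T = TypeVar("T")


def eq_ordered(xs: Iterable[T], ys: Iterable[T], key: Callable[[T], Hashable]) -> bool:
    """Ordered contract: the same keys appear in the same positions."""
    return [key(x) for x in xs] == [key(y) for y in ys]


def eq_multiset(xs: Iterable[T], ys: Iterable[T], key: Callable[[T], Hashable]) -> bool:
    """Multiset contract: the same keys with the same multiplicities, in any order."""
    return Counter(key(x) for x in xs) == Counter(key(y) for y in ys)


sequential = ["a", "b", "b", "c"]
shuffled = ["b", "c", "a", "b"]  # what an unordered backend might return
lossy = ["a", "b", "c"]          # one duplicate silently dropped

print(eq_ordered(sequential, shuffled, key=str))   # False: order differs
print(eq_multiset(sequential, shuffled, key=str))  # True: same items, same counts
print(eq_multiset(sequential, lossy, key=str))     # False: a duplicate is missing
```

Comparing multisets rather than sets matters because a backend that drops or duplicates an item would still pass a set comparison.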


2. Decision Table

| Scenario | Typing Needed | Parallel Needed | Recommended |
|---|---|---|---|
| ADT handling | Medium | No | Match/case |
| HOF safety | High | No | ParamSpec |
| Scale CPU-bound | Low | Yes | ProcessPoolExecutor |
| Scale distributed | Low | Yes | Dask |

Choose match for expressiveness; advanced typing for HOFs; parallel for scale.


3. Public API (Advanced Patterns)

Type-safe HOFs; match on ADTs.

from typing import Callable, ParamSpec, TypeVar, Generic
from functools import wraps
from funcpipe_rag import Result, Ok, Err, ErrInfo, RawDoc, Chunk

P = ParamSpec("P")
R = TypeVar("R")
T = TypeVar("T")


def result_hof(f: Callable[P, R]) -> Callable[P, Result[R, ErrInfo]]:
    @wraps(f)
    def wrapped(*args: P.args, **kwargs: P.kwargs) -> Result[R, ErrInfo]:
        try:
            return Ok(f(*args, **kwargs))
        except Exception as exc:
            return Err(ErrInfo(code=type(exc).__name__, message=str(exc)))

    return wrapped


def to_option(res: Result[T, ErrInfo]) -> T | None:
    match res:
        case Ok(value):
            return value
        case Err(_):
            return None


def unwrap_or(res: Result[T, ErrInfo], default: T) -> T:
    match res:
        case Ok(value):
            return value
        case Err(_):
            return default

4. Reference Implementations

4.1 Pattern Matching for ADTs

Expressive dispatch with exhaustiveness.

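from typing_extensions import assert_never  # also available as typing.assert_never on Python 3.11+

# RetryLater and PermanentError are assumed to be exception classes defined elsewhere in the project.
def process_res(res: Result[Chunk, ErrInfo]) -> Chunk:
    match res:
        case Ok(chunk):
            return chunk
        case Err(err) if err.code == "transient":  # guard: only transient errors are retried
            raise RetryLater(err)
        case Err(err):
            raise PermanentError(err)
        case _:
            assert_never(res)  # mypy exhaustiveness aid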

4.2 Typing Advances for HOFs

Preserve arbitrary signatures.

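import logging

logger = logging.getLogger(__name__)

def logged_hof(f: Callable[P, R]) -> Callable[P, R]:
    @wraps(f)
    def wrapped(*args: P.args, **kwargs: P.kwargs) -> R:
        # Lazy %-formatting: the message is only built if INFO is enabled.
        logger.info("Calling %s with args=%r...", f.__name__, args[:2])
        return f(*args, **kwargs)
    return wrapped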

4.3 Parallel FuncPipe with Futures

CPU-bound; stable order; per-item errors.

from concurrent.futures import ProcessPoolExecutor
from typing import List

def _safe_apply(f: Callable[[T], R], x: T) -> Result[R, ErrInfo]:
    # Runs inside the worker, so exceptions come back as picklable Err values.
    try:
        return Ok(f(x))
    except Exception as exc:
        return Err(ErrInfo(code=type(exc).__name__, message=str(exc)))

def parallel_map_result(f: Callable[[T], R], items: List[T], max_workers: int = 4) -> List[Result[R, ErrInfo]]:
    # f must be a top-level (picklable) function; _safe_apply turns its exceptions into per-item Errs.
    # The name `executor` avoids shadowing the built-in exec.
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(_safe_apply, f, item) for item in items]
        return [fut.result() for fut in futures]  # Stable order: results are read in submission order
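
The protocol also calls for timeouts, which the function above does not apply. One possible shape is sketched below: the executor is passed in (so tests can use a thread pool and production a process pool), and a per-item timeout becomes an `Err`. `map_result_with_timeout`, `square_slowly` and the simplified `Ok`/`Err` types are hypothetical names for this illustration.

```python
import time
from concurrent.futures import Executor, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from dataclasses import dataclass
from typing import Callable, Generic, Sequence, TypeVar, Union

T = TypeVar("T")
R = TypeVar("R")


@dataclass(frozen=True)
class Ok(Generic[R]):
    value: R


@dataclass(frozen=True)
class Err:
    code: str
    message: str


Result = Union[Ok[R], Err]


def _safe_apply(f: Callable[[T], R], x: T) -> Result[R]:
    try:
        return Ok(f(x))
    except Exception as exc:
        return Err(type(exc).__name__, str(exc))


def map_result_with_timeout(
    executor: Executor,
    f: Callable[[T], R],
    items: Sequence[T],
    timeout_s: float,
) -> list[Result[R]]:
    futures = [executor.submit(_safe_apply, f, item) for item in items]
    results: list[Result[R]] = []
    for fut in futures:  # read in submission order, so output order is stable
        try:
            # The timeout is measured from this call, not from submission.
            results.append(fut.result(timeout=timeout_s))
        except FutureTimeout:
            fut.cancel()  # only effective if the task has not started yet
            results.append(Err("Timeout", f"no result within {timeout_s}s"))
    return results


def square_slowly(n: int) -> int:
    if n < 0:
        raise ValueError("negative input")
    if n == 99:
        time.sleep(1.0)  # simulate one slow item
    return n * n


with ThreadPoolExecutor(max_workers=4) as pool:
    results = map_result_with_timeout(pool, square_slowly, [2, -1, 99, 3], timeout_s=0.2)

print(results)
```

Swapping in `ProcessPoolExecutor` requires `f` to be a picklable top-level function and, on platforms that spawn workers, a `if __name__ == "__main__":` guard around the pool. A task that is already running cannot be cancelled, so the timeout bounds how long the caller waits per item, not how long the worker runs.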

4.4 Integrating Parallel in RAG

Parallel chunking (stable order).

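from itertools import repeat

def parallel_chunk(docs: List[CleanDoc], size: int) -> List[Chunk]:
    # chunk_doc must be a top-level (picklable) function; CleanDoc comes from the earlier cores.
    with ProcessPoolExecutor() as executor:
        # executor.map yields results in input order; materialise it inside the
        # with-block so results and any worker exception are collected before shutdown.
        per_doc = list(executor.map(chunk_doc, docs, repeat(size)))
    return [c for cs in per_doc for c in cs]  # Stable order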

4.5 Dask for Distributed

Bag semantics (unordered); pure functions.

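from functools import partial
import dask.bag as db

def dask_chunk(docs: db.Bag, size: int) -> db.Bag:
    # Takes a Bag of CleanDoc and returns a Bag of Chunk. dask.bag.Bag is not declared
    # generic, so the element types are documented here rather than subscripted.
    chunk_size = partial(chunk_doc, size=size)  # Picklable as long as chunk_doc is top-level
    return docs.map(chunk_size).flatten()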

RAG Integration

Apply match/parallel in RAG; verify laws.


5. Property-Based Proofs (tests/test_module_10_core6.py)

PBT for parallel equiv (stable order).

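from hypothesis import given, settings

# doc_list_strategy, clean_doc, chunk_doc and eq_pure come from the earlier cores.
@settings(max_examples=25, deadline=None)  # each example starts a process pool, so lift the per-example deadline
@given(docs=doc_list_strategy)
def test_parallel_equiv(docs):
    cleaned = [clean_doc(d) for d in docs]
    seq = [c for d in cleaned for c in chunk_doc(d, 256)]
    par = parallel_chunk(cleaned, 256)
    assert eq_pure(seq, par, key=lambda c: (c.doc_id, c.start, c.end, c.text), ordered=True)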

6. Runtime Preservation Guarantee

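Type annotations are not checked at call time, so typing adds no runtime cost beyond the wrapper's extra call frame. A match statement tries its cases top to bottom, which is effectively constant for a handful of variants. Parallel execution targets near-linear speedup, reduced by process start-up and serialization (pickling) overhead; treat this as a benchmark target, not a guarantee.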
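
One way to set a realistic benchmark target is Amdahl's law, which bounds speedup by the fraction of work that actually runs in parallel. The sketch below is a back-of-the-envelope aid, not a measurement; the 0.95 parallel fraction is an assumed figure for illustration, and real runs also pay pickling and start-up costs that the formula does not model.

```python
def amdahl_speedup(parallel_fraction: float, workers: int) -> float:
    """Upper bound on speedup when `parallel_fraction` of the work is split across `workers`."""
    if not 0.0 <= parallel_fraction <= 1.0:
        raise ValueError("parallel_fraction must be between 0 and 1")
    if workers < 1:
        raise ValueError("workers must be at least 1")
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / workers)


# Assumed example: 95% of pipeline time is spent in the parallelizable chunking step.
for workers in (1, 2, 4, 8):
    print(workers, round(amdahl_speedup(0.95, workers), 2))
```

If measured speedup falls well below this bound, serialization of inputs and outputs is a likely cause, and batching more items per task is one common mitigation.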


7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Verbose dispatch | Hard to read | Match for ADTs |
| Type loss in HOFs | Mypy errors | ParamSpec |
| Sequential scale | Slow on multicore | Parallel backends |
| Mutable parallel | Races | Immutable data only |
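
The "immutable data only" fix can be illustrated with a frozen dataclass: workers return new values instead of editing shared ones. The `Chunk` shape below is a hypothetical stand-in that borrows the field names used in the equivalence test key, and `normalise` is an example transformation.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class Chunk:
    doc_id: str
    start: int
    end: int
    text: str


def normalise(chunk: Chunk) -> Chunk:
    """Pure transformation: builds a new Chunk and leaves the input untouched."""
    return replace(chunk, text=chunk.text.strip().lower())


original = Chunk(doc_id="d1", start=0, end=8, text="  Hello ")
cleaned = normalise(original)

print(original.text == "  Hello ")  # the input is unchanged
print(cleaned.text)

try:
    original.text = "mutated"  # type: ignore[misc]
except FrozenInstanceError:
    print("in-place mutation is rejected")
```

Frozen dataclasses defined at module level are also picklable, which is what process-based executors need to move them between workers.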

8. Pre-Core Quiz

  1. Match for…? → ADTs
  2. ParamSpec for…? → HOFs
  3. Parallel with…? → Futures/Dask
  4. Typing advances…? → Safer HOFs
  5. Scaling for…? → Multicore/distributed

9. Post-Core Exercise

  1. Add match to RAG error handling.
  2. Parallelize embedding; PBT equiv.
  3. Prototype Dask pipeline.

Pipeline Usage (Idiomatic)

match res:
    case Ok(v): ...
par = parallel_map_result(f, items)

Next: core 7. Domain-Driven Design Meets FP – Aligning Bounded Contexts with Pipelines