Module 2: First-Class Functions and Expressive Python

Progression Note

By the end of Module 2, you'll master first-class functions for configurability, expression-oriented code, and debugging taps. This prepares you for lazy iteration in Module 3. See the series progression map in the repo root for full details.

Here's a snippet from the progression map:

| Module | Focus | Key Outcomes |
| --- | --- | --- |
| 1: Foundational FP Concepts | Purity, contracts, refactoring | Spot impurities, write pure functions, prove equivalence with Hypothesis |
| 2: First-Class Functions & Expressive Python | Closures, partials, composable configurators | Configure pure pipelines without globals |
| 3: Lazy Iteration & Generators | Streaming/lazy pipelines | Efficient data processing without materializing everything |

M02C07 – Callback Hell to Combinators (Replacing Callbacks with Clear Functional Pipelines)

Core question:
How do you replace nested callbacks and imperative chains with combinators (flow, fmap, ffilter, flatmap) that compose lazy, configured, boundary-sealed functions—so pipelines from M02C01–M02C06 are efficient, readable, and testable?

This core introduces combinators for pipelines in Python:
- Use flow to build a 0-arg pipeline from a producer plus iterable→iterable stages; you run it with pipeline().
- Apply fmap, ffilter, flatmap for lazy mapping/filtering/flattening with bound pure functions.
- Build on M02C06 config-as-data to bind settings, M02C03 laziness for streams, and M02C05 boundaries for effects.

We extend the running project from m02-rag.md—the FuncPipe RAG Builder—evolving from callback-heavy chains to clear combinator pipelines that preserve Module 1 equivalence for the chunk sequence.

Audience: Developers from M02C06 with config-as-data but still using nested callbacks or imperative loops that break readability and laziness.
Outcome:
1. Identify callback smells (nested functions, imperative chains) and explain their impact on composability.
2. Refactor a callback chain into combinators with bound pure functions.
3. Write Hypothesis properties proving pipeline equivalence, with a shrinking example.


1. Conceptual Foundation

1.1 Callback Hell to Combinators in One Precise Sentence

Combinators replace callback hell with higher-order functions (flow, fmap, ffilter, flatmap) that compose lazy streams of bound pure functions—ensuring pipelines are readable, efficient, and configurable without nesting or boilerplate.

1.2 The One-Sentence Rule

Replace nested callbacks and loops with flow for pipeline orchestration, fmap/ffilter/flatmap for transformations—bind config to pure functions via partial or by passing configuration-bearing callables, keeping effects sealed and streams lazy.
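
As a minimal sketch of the two binding styles this rule mentions, the block below binds a chunk size to a pure function once with functools.partial and once with a frozen, callable dataclass. The names chunk_text and Chunker are hypothetical stand-ins for illustration; they are not part of funcpipe_rag.

```python
from dataclasses import dataclass
from functools import partial


def chunk_text(text: str, *, size: int) -> list[str]:
    """Pure helper (hypothetical): split text into pieces of at most `size` chars."""
    return [text[i:i + size] for i in range(0, len(text), size)]


# Style 1: bind configuration with functools.partial.
chunk_4 = partial(chunk_text, size=4)


# Style 2: a configuration-bearing callable. frozen=True means the bound
# setting cannot be reassigned after construction.
@dataclass(frozen=True)
class Chunker:
    size: int

    def __call__(self, text: str) -> list[str]:
        return chunk_text(text, size=self.size)


chunker_4 = Chunker(size=4)
```

Both chunk_4 and chunker_4 are one-argument callables, so either can be handed to a stage such as fmap or flatmap. The dataclass form also gives value equality and a readable repr, which can help when configs are compared in tests.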

1.3 Why This Matters Now

M02C06 gave immutable config data, but nested callbacks or loops obscure pipelines. Combinators make them declarative chains, leveraging M02C03 laziness, M02C05 boundaries, and M02C06 binding for scalable code.

1.4 Combinators as Values in 5 Lines

Because combinators are first-class values, pipelines can be assembled dynamically:

from funcpipe_rag import CleanDoc, ChunkWithoutEmbedding, RagEnv
from funcpipe_rag import gen_chunk_doc
from funcpipe_rag import flatmap
from functools import partial


# Before: Imperative, eager loop
def before_chunk(cd: CleanDoc, env: RagEnv) -> list[ChunkWithoutEmbedding]:
    chunks = []
    text = cd.abstract
    for start in range(0, len(text), env.chunk_size):
        chunk_text = text[start: start + env.chunk_size]
        if chunk_text:
            chunks.append(ChunkWithoutEmbedding(cd.doc_id, chunk_text, start, start + len(chunk_text)))
    return chunks  # Eager list


# After: Lazy combinator with bound config
bound_chunk = partial(gen_chunk_doc, env=RagEnv(512))
lazy_chunk = flatmap(bound_chunk)  # flatmap defined in §4.1

# Usage: lazy_chunk(cleaned_docs_iter) → Iterator[ChunkWithoutEmbedding]

Because combinators are ordinary values bound to config via partial, they can be stored in dicts, composed with the M02C01 configurators, and applied lazily, which keeps pipelines readable and efficient.
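
To illustrate the "storage in dicts" point, here is a small sketch in which configured stages live in a dict and are selected at runtime. The local fmap is a hypothetical stand-in for the course's combinator, and scale, STAGES, and run_variant are names invented for this example.

```python
from collections.abc import Iterable, Iterator
from functools import partial


def fmap(f):
    """Hypothetical stand-in for the course's fmap: lazily apply f to each item."""
    def stage(xs):
        return map(f, xs)
    return stage


def scale(x: int, *, factor: int) -> int:
    """Pure function whose configuration (factor) is bound later."""
    return x * factor


# Stages are ordinary values, so they can be stored under a variant name.
STAGES = {
    "double": fmap(partial(scale, factor=2)),
    "triple": fmap(partial(scale, factor=3)),
}


def run_variant(name: str, xs: Iterable[int]) -> Iterator[int]:
    """Pick a stage at runtime; no work happens until the result is consumed."""
    return STAGES[name](xs)
```

For example, list(run_variant("double", [1, 2, 3])) evaluates the chosen stage only when list() pulls the items.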

Note: Raw dicts from env/CLI live only at the boundary; inside, configuration is always represented as frozen dataclasses (possibly stored in dict lookups).
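
The boundary rule in this note could look roughly like the sketch below: a raw dict is validated once and converted to a frozen dataclass, and only the frozen value travels inward. EnvSketch and parse_env are hypothetical names, and the default of 512 is an assumption made for the example rather than the project's actual default.

```python
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class EnvSketch:
    """Hypothetical stand-in for a frozen config such as RagEnv."""
    chunk_size: int


def parse_env(raw: Mapping[str, Any]) -> EnvSketch:
    """Boundary function: validate the raw dict once, then hand out frozen data."""
    size = int(raw.get("chunk_size", 512))  # 512 is an assumed default
    if size <= 0:
        raise ValueError("chunk_size must be positive")
    return EnvSketch(chunk_size=size)


# Values from a CLI or environment typically arrive as strings.
env = parse_env({"chunk_size": "256"})
```

After this point, code inside the pipeline can rely on env.chunk_size being a positive int that nobody can reassign.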


2. Mental Model: Callback Hell vs Combinator Chains

2.1 One Picture

Callback Hell (Nested)                       Combinator Chains (Linear)
+---------------------------+                +------------------------------+
| def rag(docs, on_done):   |                | flow(                        |
|     cleaned = clean(docs, |                |   lambda: docs,              |
|     lambda c:             |                |   ffilter(bound_keep),       |
|         chunks = chunk(c, |                |   fmap(bound_clean),         |
|         lambda ch:        |                |   flatmap(bound_chunk),      |
|             on_done(ch)   |                |   fmap(bound_embed)          |
|         ))                |                | )()                          |
+---------------------------+                +------------------------------+
   ↑ Nested, Eager, Opaque                    ↑ Linear, Lazy, Config-Bound

2.2 Contract Table

| Aspect | Callback Hell | Combinator Chains |
| --- | --- | --- |
| Readability | Nested indentation | Linear flow |
| Laziness | Often eager | Iterator-based |
| Configurability | Hardcoded | Bound via partial |
| Composability | Manual nesting | Higher-order (flow) |
| Testing | Mock callbacks | Property-based streams |
| Mutable Defaults in Partials | Breaks Determinism | Use frozen dataclasses or immutable types for configs |

Note on Callback Choice: Use callbacks only for legacy APIs; always prefer combinators for pipelines.
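
The last row of the contract table (mutable values inside partials) can be demonstrated with a short sketch. All names here (keep_long_dict, KeepConfig, keep_long) are hypothetical and exist only for this illustration.

```python
from dataclasses import dataclass
from functools import partial


def keep_long_dict(word: str, cfg: dict) -> bool:
    return len(word) >= cfg["min_len"]


# Risky: the partial holds a reference to a dict that anyone can mutate later.
shared_cfg = {"min_len": 3}
leaky_keep = partial(keep_long_dict, cfg=shared_cfg)
before = leaky_keep("abcd")
shared_cfg["min_len"] = 10   # mutation far away from the call site
after = leaky_keep("abcd")   # same call, different answer


@dataclass(frozen=True)
class KeepConfig:
    """Hypothetical frozen config; attribute assignment raises after creation."""
    min_len: int


def keep_long(word: str, cfg: KeepConfig) -> bool:
    return len(word) >= cfg.min_len


# Safer: the bound config is immutable, so the predicate stays fixed.
stable_keep = partial(keep_long, cfg=KeepConfig(min_len=3))
```

Here before and after differ even though the call is identical, which is the determinism break the table warns about; stable_keep cannot drift in that way.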


3. Running Project: FuncPipe RAG Builder

We extend the FuncPipe RAG Builder from m02-rag.md:
- Dataset: 10k arXiv CS abstracts (arxiv_cs_abstracts_10k.csv).
- Goal: Replace callback chains with combinator pipelines.
- Start: Callback-heavy version (core7_start.py).
- End: Linear combinator chain, preserving equivalence for chunk sequence.

3.1 Types (Canonical, Used Throughout)

From previous cores.

3.2 Callback Hell Start (Anti-Pattern)

# core7_start.py (anti-pattern): nested callbacks obscure the pipeline
from collections.abc import Callable

from funcpipe_rag import (
    Chunk,
    Ok,
    Observations,
    RagBoundaryDeps,
    RagConfig,
    eval_pred,
    gen_chunk_doc,
    structural_dedup_chunks,
)


def callback_full_rag_api(
        path: str,
        config: RagConfig,
        deps: RagBoundaryDeps,
        on_done: Callable[[tuple[list[Chunk], Observations]], None]
) -> None:
    def on_docs(docs):
        def on_cleaned(cleaned):
            def on_chunks(chunks):
                obs = Observations(
                    total_docs=len(docs),
                    total_chunks=len(chunks),
                    kept_docs=len(kept),  # count after the keep filter (kept comes from on_docs)
                    cleaned_docs=len(cleaned),
                )
                on_done((chunks, obs))

            chunks = [deps.core.embedder(c) for cd in cleaned for c in gen_chunk_doc(cd, config.env)]
            chunks = structural_dedup_chunks(chunks)
            on_chunks(chunks)

        kept = [d for d in docs if eval_pred(d, config.keep.keep_pred)]
        cleaned = [deps.core.cleaner(d) for d in kept]
        on_cleaned(cleaned)

    docs_res = deps.reader.read_docs(path)
    if isinstance(docs_res, Ok):
        on_docs(docs_res.value)

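Smells:
- Nested callbacks (on_docs, on_cleaned, on_chunks).
- Eager lists mid-chain (kept, cleaned, and chunks are each fully materialized).
- Hard to compose/test: each step is reachable only through its enclosing callback.
- A failed read (a non-Ok result) returns silently; on_done is never called.
Problem: The nesting obscures the data flow, and the intermediate lists break laziness.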
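
To make the cost of eager lists concrete, the sketch below counts how often a stand-in function runs when only the first two results are needed. The function expensive is hypothetical, and it deliberately records its calls (a side effect) purely as instrumentation for this demonstration.

```python
from itertools import islice

calls: list[int] = []


def expensive(x: int) -> int:
    """Records each call so we can see how much work was actually done."""
    calls.append(x)
    return x * x


def eager_stage(xs):
    return [expensive(x) for x in xs]   # materializes everything up front


def lazy_stage(xs):
    return (expensive(x) for x in xs)   # does work only when items are pulled


calls.clear()
first_two_eager = eager_stage(range(1000))[:2]
eager_calls = len(calls)

calls.clear()
first_two_lazy = list(islice(lazy_stage(range(1000)), 2))
lazy_calls = len(calls)
```

Both variants return the same two values, but eager_calls and lazy_calls show how much of the input each one had to process to get there.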


4. Refactor to Combinators: Linear Chains with Bound Functions

4.1 Combinators (Lazy, Generic)

Core combinators:

from funcpipe_rag import ffilter, flatmap, fmap, flow

# `flow` builds a 0-arg pipeline from a producer + iterable→iterable stages.
pipeline = flow(
    lambda: range(5),
    ffilter(lambda x: x % 2 == 0),
    fmap(lambda x: x + 1),
    flatmap(lambda x: (x, x)),
)

assert list(pipeline()) == [1, 1, 3, 3, 5, 5]

Properties:
- Lazy: Iterator-based.
- Generic: Work on any iterable.
- Pure: No effects.
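
The library's own definitions are not reproduced in this core, so the following is only one plausible sketch of how such combinators could be written on top of map, filter, and itertools.chain. It assumes the calling convention shown above (a 0-arg producer followed by iterable→iterable stages) and may differ from the actual funcpipe_rag implementation.

```python
from functools import reduce
from itertools import chain, count, islice


def fmap(f):
    """Stage that lazily applies f to every item."""
    def stage(xs):
        return map(f, xs)
    return stage


def ffilter(pred):
    """Stage that lazily keeps the items for which pred is true."""
    def stage(xs):
        return filter(pred, xs)
    return stage


def flatmap(f):
    """Stage that lazily maps f (item -> iterable) and flattens one level."""
    def stage(xs):
        return chain.from_iterable(map(f, xs))
    return stage


def flow(producer, *stages):
    """Build a 0-arg pipeline: call producer, then thread its output through stages."""
    def pipeline():
        return reduce(lambda acc, stage: stage(acc), stages, producer())
    return pipeline


pipeline = flow(
    lambda: range(5),
    ffilter(lambda x: x % 2 == 0),
    fmap(lambda x: x + 1),
    flatmap(lambda x: (x, x)),
)
result = list(pipeline())

# Laziness check: an infinite producer is fine if only a prefix is pulled.
evens = flow(count, ffilter(lambda x: x % 2 == 0))
first_three = list(islice(evens(), 3))
```

Because the producer is called inside pipeline(), each call starts from a fresh iterable, so the pipeline can be run more than once.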

Note: While combinators promote expression-oriented code, prioritize readability: If a combinator chain becomes nested or complex (e.g., 3+ layers), refactor to named helper functions or consider a simple loop inside a trivial pure wrapper. Purity matters, but so does maintainability.
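
As a sketch of the "simple loop inside a trivial pure wrapper" option, the generator below keeps a local accumulator but still behaves as a lazy iterable→iterator stage. The name running_total is hypothetical and chosen only for illustration.

```python
from collections.abc import Iterable, Iterator


def running_total(xs: Iterable[int]) -> Iterator[int]:
    """A plain loop wrapped as a lazy stage.

    The mutable accumulator is local to each call, so callers still see
    a function whose output depends only on its input.
    """
    total = 0
    for x in xs:
        total += x
        yield total
```

A stage like this can sit in a chain next to fmap and ffilter stages because it has the same shape. For this particular case itertools.accumulate already exists; the point of the sketch is the shape of the wrapper, not the specific computation.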

4.2 Refactored Pipeline (Combinator Chain in Internal Logic)

Bound pure functions:

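# Names below are assumed to be exported by funcpipe_rag, as in the other snippets.
from funcpipe_rag import (
    Chunk,
    Observations,
    Ok,
    RagConfig,
    RagCoreDeps,
    RawDoc,
    Result,
    eval_pred,
    ffilter,
    flatmap,
    flow,
    fmap,
    gen_chunk_doc,
    structural_dedup_chunks,
)


def _run_core_on_docs(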
    docs: list[RawDoc],
    config: RagConfig,
    deps: RagCoreDeps
) -> Result[tuple[list[Chunk], Observations]]:
    keep_rule = lambda d: eval_pred(d, config.keep.keep_pred)
    bound_keep = ffilter(keep_rule)
    bound_clean = fmap(deps.cleaner)
    bound_chunk = flatmap(lambda cd: gen_chunk_doc(cd, config.env))
    bound_embed = fmap(deps.embedder)

    # Metrics pass (pedagogical; duplicates pure work)
    kept_docs = list(bound_keep(docs))
    cleaned = list(bound_clean(kept_docs))

    # Main pipeline
    pipeline = flow(lambda: docs, bound_keep, bound_clean, bound_chunk, bound_embed)
    chunks_iter = pipeline()
    chunks = structural_dedup_chunks(chunks_iter)
    obs = Observations(
        total_docs=len(docs),
        total_chunks=len(chunks),
        kept_docs=len(kept_docs),
        cleaned_docs=len(cleaned),
        sample_doc_ids=tuple(d.doc_id for d in kept_docs[: config.env.sample_size]),
        sample_chunk_starts=tuple(c.start for c in chunks[: config.env.sample_size]),
    )
    return Ok((chunks, obs))

Properties:
- Linear: Clear flow.
- Lazy: Streams until structural_dedup_chunks.
- Config-bound: Via closures over config and deps (partial works equally well); same semantics as M02C05–M02C06 (only wiring changes).

Note: For Observations we recompute the keep/clean steps on docs; in real code you’d thread the intermediate results or refactor Observations to avoid duplicate work.
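
One way to thread the intermediate results, sketched here on plain strings instead of the RAG types, is to materialize the keep and clean steps once and reuse those lists for both the metrics and the downstream stages. The function names and the metrics dict are hypothetical simplifications of Observations.

```python
from itertools import chain


def split_words(line: str) -> list[str]:
    return line.split()


def run_once(lines: list[str]) -> tuple[list[str], dict[str, int]]:
    """Run keep and clean a single time, then reuse the results for metrics."""
    kept = [ln for ln in lines if ln.strip()]        # keep step, run once
    cleaned = [ln.strip().lower() for ln in kept]    # clean step, run once
    # The flattening stage stays lazy until the final list() call.
    words = list(chain.from_iterable(map(split_words, cleaned)))
    metrics = {
        "total": len(lines),
        "kept": len(kept),
        "cleaned": len(cleaned),
        "words": len(words),
    }
    return words, metrics
```

The trade-off is that the keep and clean steps are no longer streamed. That is usually acceptable when the input is already an in-memory list, as docs is here, but it would not suit an unbounded source.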

4.3 Public API (Unchanged from M02C05–M02C06)

from funcpipe_rag import full_rag_api_path

res = full_rag_api_path("arxiv_cs_abstracts_10k.csv", config, deps)

Properties:
- Keeps Result; boundaries unchanged.

4.4 Configurator Tie-In (M02C01)

from funcpipe_rag import make_rag_fn

rag_fn = make_rag_fn(chunk_size=512)  # docs -> (chunks, obs)

Wins: Combinators compose with M02C01 partial for variants.


5. Equational Reasoning: Substitution Exercise

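Hand Exercise: Substitute definitions into fmap/ffilter.
1. Inline bound_keep = ffilter(keep_rule), where keep_rule closes over config.keep.keep_pred → the predicate is fixed once config is fixed.
2. Substitute keep_rule into the body of ffilter → a stage that depends only on its input iterable.
3. Result: For fixed (immutable) config and deps, the whole pipeline is a fixed function of docs.
Bug Hunt: In the callback version, nesting obscures substitution.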

Example:
- Callback: Nested lambdas → hard to substitute.
- Combinator: Linear stages → substitutable.

rag_pipeline = flow(...)  # rag_pipeline now fully determined by (config, deps)
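
A small worked instance of this kind of substitution is map fusion: two consecutive fmap stages can be replaced by one fmap over the composed function. The sketch below checks that on a concrete input; fmap and compose are local, hypothetical definitions rather than the library's.

```python
def fmap(f):
    """Hypothetical stand-in: lazily apply f to each item."""
    def stage(xs):
        return map(f, xs)
    return stage


def compose(g, f):
    """compose(g, f)(x) == g(f(x))"""
    return lambda x: g(f(x))


def inc(x: int) -> int:
    return x + 1


def double(x: int) -> int:
    return x * 2


# Two linear stages...
two_stages = list(fmap(double)(fmap(inc)(range(5))))
# ...equal one stage over the composed function, by substitution.
one_stage = list(fmap(compose(double, inc))(range(5)))
```

The equality holds because inc and double are pure; if either had side effects, reordering or fusing the stages could change observable behaviour.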

6. Property-Based Testing: Proving Pipeline Behaviour

Use Hypothesis to check, over many generated inputs, that the refactor preserves laziness and config-driven behaviour.

6.1 Custom Strategy

From tests/conftest.py (as in Module 1).

6.2 Pipeline Equivalence Property

# tests/test_rag_api.py (equivalence via combinators)
from hypothesis import given
import hypothesis.strategies as st

from funcpipe_rag import (
    RagConfig,
    RagEnv,
    eval_pred,
    ffilter,
    flatmap,
    flow,
    fmap,
    full_rag_api_docs,
    gen_chunk_doc,
    get_deps,
    structural_dedup_chunks,
)
from tests.conftest import doc_list_strategy


@given(docs=doc_list_strategy(), chunk_size=st.integers(128, 1024))
def test_pipeline_equivalence(docs, chunk_size):
    config = RagConfig(env=RagEnv(chunk_size))
    deps = get_deps(config)

    keep_rule = lambda d: eval_pred(d, config.keep.keep_pred)
    pipeline = flow(
        lambda: docs,
        ffilter(keep_rule),
        fmap(deps.cleaner),
        flatmap(lambda cd: gen_chunk_doc(cd, config.env)),
        fmap(deps.embedder),
    )
    chunks = structural_dedup_chunks(pipeline())
    expected, _ = full_rag_api_docs(docs, config, deps)
    assert chunks == expected

Note: Tests that the combinator pipeline matches the Module 1 reference (chunk-sequence equivalence; Observations are simplified for pedagogy).

6.3 Lazy Prefix Equivalence

from itertools import islice

@given(docs=doc_list_strategy(), chunk_size=st.integers(128, 1024), k=st.integers(0, 50))
def test_lazy_prefix(docs, chunk_size, k):
    config = RagConfig(env=RagEnv(chunk_size))
    deps = get_deps(config)

    keep_rule = lambda d: eval_pred(d, config.keep.keep_pred)
    pipeline = flow(
        lambda: docs,
        ffilter(keep_rule),
        fmap(deps.cleaner),
        flatmap(lambda cd: gen_chunk_doc(cd, config.env)),
        fmap(deps.embedder),
    )
    chunks_prefix = list(islice(pipeline(), k))
    assert chunks_prefix == list(pipeline())[:k]

Note: Verifies that lazily taking the first k chunks yields the same prefix as fully materializing the pipeline.

6.4 Idempotence Property

from funcpipe_rag import Ok, RagBoundaryDeps, full_rag_api_path


class FakeReader:
    def __init__(self, docs):
        self._docs = docs

    def read_docs(self, path):
        _ = path
        return Ok(self._docs)


@given(docs=doc_list_strategy(), chunk_size=st.integers(128, 1024))
def test_pipeline_idempotence(docs, chunk_size):
    config = RagConfig(env=RagEnv(chunk_size))
    deps = RagBoundaryDeps(core=get_deps(config), reader=FakeReader(docs))
    res1 = full_rag_api_path("fake.csv", config, deps)
    res2 = full_rag_api_path("fake.csv", config, deps)
    assert res1 == res2

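Note: Ensures no hidden state in the combinator pipeline. Strictly speaking, the test checks determinism (the same inputs give the same result on repeated runs) rather than idempotence in the f(f(x)) == f(x) sense.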
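
To keep determinism and idempotence apart, the sketch below checks both on an order-preserving dedup. The function dedup is a hypothetical stand-in; it is not claimed to match structural_dedup_chunks.

```python
def dedup(xs):
    """Order-preserving dedup (hypothetical stand-in for a structural dedup)."""
    return list(dict.fromkeys(xs))


data = [3, 1, 3, 2, 1]
once = dedup(data)

# Determinism: the same input gives the same output on every run.
deterministic = dedup(data) == dedup(data)

# Idempotence: applying the function to its own output changes nothing.
idempotent = dedup(once) == once
```

A function can be deterministic without being idempotent (for example, one that appends an item), so the two properties are worth testing separately when both matter.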

6.5 Shrinking Demo: Catching a Leaky Bug

Bad pipeline with missing filter:

from funcpipe_rag import RulesConfig, StartsWith


@given(docs=doc_list_strategy(), chunk_size=st.integers(128, 1024))
def test_bad_pipeline(docs, chunk_size):
    config = RagConfig(
        env=RagEnv(chunk_size),
        keep=RulesConfig(keep_pred=StartsWith("categories", "cs.")),
    )
    deps = get_deps(config)

    # Missing ffilter(keep_rule)!
    bad_pipeline = flow(
        lambda: docs,
        fmap(deps.cleaner),
        flatmap(lambda cd: gen_chunk_doc(cd, config.env)),
        fmap(deps.embedder),
    )
    chunks = structural_dedup_chunks(bad_pipeline())
    expected, _ = full_rag_api_docs(docs, config, deps)
    assert chunks == expected

Failure Trace (Example):

Falsifying example: test_bad_pipeline(
    docs=[RawDoc(doc_id='cs-123', title='Title', abstract='Abstract', categories='invalid')],
    chunk_size=128,
)
AssertionError

Analysis: Hypothesis shrinks the input to a minimal document that fails the keep predicate (e.g., an invalid category), which exposes the missing filter.


7. When Combinators Aren't Worth It

Use callbacks/loops only in:
- Trivial one-step operations.
- Legacy integrations wrapping combinators.
Guardrails: Isolate to <10 lines; prefer combinators for pipelines.

Example:

# Trivial
for x in xs: print(x)  # OK for one-off

8. Pre-Core Quiz

  1. Nested callbacks? → Use flow.
  2. Eager list(gen)? → Laziness with fmap.
  3. Unbound predicate? → Partial with config.
  4. Effect in mapper? → Seal in stage.
  5. Prove pipeline? → Hypothesis over outputs/prefixes.

9. Post-Core Reflection & Exercise

Reflect: Find a callback chain or loop. Refactor to combinators with bound functions; add Hypothesis for equivalence/idempotence.
Project Exercise: Apply to RAG (e.g., pipeline with fmap/ffilter); run properties.
- Did linearity improve readability?
- Did laziness reduce memory?
- Did binding clarify config?

Next: Core 8 – Tiny Function DSLs.

Verify all patterns with Hypothesis; the examples provided show how to detect impurities such as globals or non-determinism.

Further Reading: For more on closures in Python, see 'Fluent Python' by Luciano Ramalho. Explore toolz for advanced partials once comfortable.