
Module 2: First-Class Functions and Expressive Python

Progression Note

By the end of Module 2, you'll master first-class functions for configurability, expression-oriented code, and debugging taps. This prepares you for lazy iteration in Module 3. See the series progression map in the repo root for full details.

Here's a snippet from the progression map:

| Module | Focus | Key Outcomes |
| --- | --- | --- |
| 1: Foundational FP Concepts | Purity, contracts, refactoring | Spot impurities, write pure functions, prove equivalence with Hypothesis |
| 2: First-Class Functions & Expressive Python | Closures, partials, composable configurators | Configure pure pipelines without globals |
| 3: Lazy Iteration & Generators | Streaming/lazy pipelines | Efficient data processing without materializing everything |

M02C03 – Intro to Laziness with Generators (“Don’t Build the List”)

Core question:
How do you replace eager, memory-hungry list comprehensions with lazy generators—so pipelines stay efficient, composable, and only compute what’s needed?

This core introduces laziness with generators in Python:
- Treat data as on-demand streams rather than materialized lists.
- Default to generator expressions for lazy computation, no upfront allocation.
- Build on Core 1/2 for streaming pipelines.

We continue the running project from m02-rag.md—extending the FuncPipe RAG Builder—to ground every concept. This project evolves across all 10 cores: start with an eager, memory-bound version; end with lazy, scalable streams.

Audience: Developers from Core 2 using expression-oriented pipelines but still materializing large lists with [...] comprehensions, risking out-of-memory (OOM) failures.
Outcome:
1. Spot eager materialization in code and explain why it wastes memory.
2. Refactor an eager list comprehension to a lazy generator.
3. Write a Hypothesis property providing strong evidence of equivalence on finite data, including a shrinking example.


Runnability Note (Module 01 Snapshot vs Module 02 End-State)

Some “before” snippets in this core are hypothetical pre-refactor examples used for contrast. They are labeled accordingly and are not meant to exactly match a real snapshot. We refactor these shapes into the real Module 02 API as the module progresses.

For a real, runnable Module 01 codebase, use the module-01 tag worktree:

  • make worktrees
  • Module 01 path: history/worktrees/module-01/
  • Import path for Module 01: history/worktrees/module-01/src/

1. Conceptual Foundation

1.1 Laziness with Generators in One Precise Sentence

Laziness with generators defers computation until values are needed, using yield to produce sequences on-demand—avoiding memory allocation for large or infinite data.

1.2 The One-Sentence Rule

Default to generators for large or unbounded data; materialize only at well-defined edges.

1.3 Why This Matters Now

With Core 2 expressions, your pipelines are declarative but eager (materializing huge lists mid-flow risks OOM on big data). Laziness makes them streaming, enabling infinite datasets and constant memory while composing with Core 1 configurators.

1.4 Laziness as Values in 5 Lines

Because generator functions are first-class values, you can store them in a registry and pick a stream dynamically:

from collections.abc import Callable, Generator
from funcpipe_rag import CleanDoc, ChunkWithoutEmbedding, RagEnv
from itertools import islice


def chunk_stream(doc: CleanDoc, env: RagEnv) -> Generator[ChunkWithoutEmbedding, None, None]:
    text = doc.abstract
    for i in range(0, len(text), env.chunk_size):
        chunk_text = text[i:i + env.chunk_size]
        if not chunk_text:
            break
        yield ChunkWithoutEmbedding(doc.doc_id, chunk_text, i, i + len(chunk_text))


StreamFactory = Callable[[CleanDoc, RagEnv], Generator[ChunkWithoutEmbedding, None, None]]

streams: dict[str, StreamFactory] = {
    "chunks": chunk_stream,
    # Add more streams
}


def consume_stream(key: str, doc: CleanDoc, env: RagEnv, n: int) -> list[ChunkWithoutEmbedding]:
    it = streams[key](doc, env)
    return list(islice(it, n))

Because generators are lazy (compute on next), we can safely store and compose them with Core 1 partials—just like data. Note: generators are one-shot; use factories for reuse.
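The one-shot caveat can be sketched with a toy stream. The names `numbers` and `make_numbers` are hypothetical and not part of funcpipe_rag; the sketch only assumes standard generator behavior: a generator object is exhausted after one pass, while calling a factory yields a fresh stream each time.

```python
from collections.abc import Callable, Iterator
from functools import partial


def numbers(limit: int) -> Iterator[int]:
    """Hypothetical toy stream: yields 0..limit-1 on demand."""
    for i in range(limit):
        yield i


# A generator object is one-shot: the second pass sees nothing.
gen = numbers(3)
first_pass = list(gen)
second_pass = list(gen)

# A factory (here a partial) can be called again for a fresh stream.
make_numbers: Callable[[], Iterator[int]] = partial(numbers, 3)
fresh_a = list(make_numbers())
fresh_b = list(make_numbers())

print(first_pass, second_pass)  # [0, 1, 2] []
print(fresh_a == fresh_b)       # True
```

This is why the registry above stores generator functions rather than generator objects: each lookup produces a new, unconsumed stream.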


2. Mental Model: Eager Lists vs Lazy Generators

2.1 One Picture

Eager Lists (Memory-Bound)              Lazy Generators (Streaming)
+-----------------------+               +------------------------------+
| huge = [x for x in N] |               |   huge = (x for x in N)      |
| # OOM on large N      |               |   # O(1) memory              |
| print(huge[0])        |               |   print(next(huge))          |
+-----------------------+               +------------------------------+
   ↑ Allocates All Now                     ↑ Computes On-Demand

2.2 Contract Table

| Aspect | Eager Lists | Lazy Generators |
| --- | --- | --- |
| Memory | O(n) allocation | O(1) constant |
| Computation | Upfront all | On-demand per item |
| Infinite Data | Impossible | Safe with islice |
| Composability | Lists chain eagerly | Generators chain lazily |
| Testing | Finite only | Finite prefixes via islice |

Note on Eager Choice: Occasionally, when profiling shows the data is small and reused (e.g., a cached lookup), materialize it once, but keep a lazy API in front of it.
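The memory and infinite-data rows of the contract can be checked with a small sketch. It uses only the standard library; the sizes are illustrative, and `sys.getsizeof` measures the container alone, not the integers it references.

```python
import sys
from itertools import count, islice

n = 100_000

eager = [x * x for x in range(n)]   # allocates all n results now
lazy = (x * x for x in range(n))    # allocates only the generator object

# The list grows with n; the generator object stays the same small size.
eager_bytes = sys.getsizeof(eager)
lazy_bytes = sys.getsizeof(lazy)
print(eager_bytes > lazy_bytes)  # True

# An unbounded stream is safe as long as only a finite prefix is consumed.
evens = (x for x in count() if x % 2 == 0)
prefix = list(islice(evens, 5))
print(prefix)  # [0, 2, 4, 6, 8]
```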


3. Running Project: FuncPipe RAG Builder

Our running project (from m02-rag.md) is extending the pure RAG pipeline from Module 1 with laziness.
- Dataset: 10k arXiv CS abstracts (arxiv_cs_abstracts_10k.csv).
- Goal: Make the internal pipeline lazy by replacing eager lists with generators; we still materialize at the edges for now (e.g., for deduplication). Module 3 will fully generalize lazy streaming.
- Start: Hypothetical pre-refactor eager version (core3_start.py, illustration only).
- End (this core): Lazy core, preserving equivalence to Module 1.

3.1 Types (Canonical, Used Throughout)

From src/funcpipe_rag/rag_types.py and src/funcpipe_rag/api/types.py (as in Core 1/2).

3.2 Eager Start (Anti-Pattern)

This is a hypothetical pre-refactor example used for contrast. It is not intended to be run as-is in the end-of-Module-02 checkout.

# core3_start.py (hypothetical pre-refactor; illustration only)
from funcpipe_rag import RawDoc, CleanDoc, ChunkWithoutEmbedding, Chunk, RagEnv
from funcpipe_rag import DocRule, Observations, RagTaps
from funcpipe_rag import any_doc
from funcpipe_rag import clean_doc, embed_chunk, structural_dedup_chunks
from typing import Callable


def eager_full_rag_api(docs: list[RawDoc], env: RagEnv, cleaner: Callable[[RawDoc], CleanDoc], *,
                       keep: DocRule | None = None, taps: RagTaps | None = None) -> tuple[list[Chunk], Observations]:
    rule = keep if keep is not None else any_doc
    kept_docs = [d for d in docs if rule(d)]  # Materializes full list
    if taps and taps.docs:
        taps.docs(tuple(kept_docs))
    cleaned = [cleaner(d) for d in kept_docs]  # Another full list
    if taps and taps.cleaned:
        taps.cleaned(tuple(cleaned))
    # gen_chunk_doc is the per-document chunker (defined in section 4.1)
    chunk_we = [c for cd in cleaned for c in gen_chunk_doc(cd, env)]  # Materializes huge chunks
    embedded = [embed_chunk(c) for c in chunk_we]  # Full embed list
    chunks = structural_dedup_chunks(embedded)
    if taps and taps.chunks:
        taps.chunks(tuple(chunks))
    obs = Observations(
        total_docs=len(docs),
        total_chunks=len(chunks),
        kept_docs=len(kept_docs),
        cleaned_docs=len(cleaned),
        sample_doc_ids=tuple(d.doc_id for d in kept_docs[:5]),
        sample_chunk_starts=tuple(c.start for c in chunks[:5]),
    )
    return chunks, obs


# Usage: Risks OOM on large docs
docs: list[RawDoc] = [RawDoc("cs-123", "Title", "Abstract text...", "cs.AI")]
chunks1, obs1 = eager_full_rag_api(docs, RagEnv(512), clean_doc)
chunks2, obs2 = eager_full_rag_api(docs, RagEnv(512), clean_doc)
assert chunks1 == chunks2

Smells: Eager lists (kept_docs, cleaned, chunk_we), upfront allocation.
Problem: Materializes intermediates; OOM on big data.


4. Refactor to Lazy: Generators and Yield

4.1 Lazy Core

First, the basic transformation from eager list to lazy generator:

from itertools import islice

# Eager
squares = [x**2 for x in range(1000000)]  # Allocates full list in memory

# Lazy
squares_gen = (x**2 for x in range(1000000))  # O(1) memory
first_few = list(islice(squares_gen, 10))  # Materialize only what's needed

Now apply to RAG: Use generators; defer computation. Define gen_chunk_doc as a generator.

from collections.abc import Generator, Iterable, Iterator, Callable
from funcpipe_rag import RawDoc, CleanDoc, ChunkWithoutEmbedding, Chunk, RagEnv
from funcpipe_rag import DocRule
from funcpipe_rag import any_doc
from funcpipe_rag import clean_doc, embed_chunk, structural_dedup_chunks


def gen_chunk_doc(cd: CleanDoc, env: RagEnv) -> Generator[ChunkWithoutEmbedding, None, None]:
    text = cd.abstract
    for start in range(0, len(text), env.chunk_size):
        chunk_text = text[start: start + env.chunk_size]
        if chunk_text:
            yield ChunkWithoutEmbedding(cd.doc_id, chunk_text, start, start + len(chunk_text))


def iter_rag(
        docs: Iterable[RawDoc],
        env: RagEnv,
        cleaner: Callable[[RawDoc], CleanDoc],
        *,
        keep: DocRule | None = None,
) -> Iterator[Chunk]:
    rule = keep if keep is not None else any_doc
    kept_docs_gen = (d for d in docs if rule(d))  # Lazy filter
    cleaned_gen = (cleaner(d) for d in kept_docs_gen)  # Lazy map
    chunk_we_gen = (c for cd in cleaned_gen for c in gen_chunk_doc(cd, env))  # Lazy flatMap
    embedded_gen = (embed_chunk(c) for c in chunk_we_gen)  # Lazy map
    yield from embedded_gen  # Stream undeduped embedded chunks


# Lazy pipeline internally; materialize at edges (e.g., for dedup)
# Boundary provides a finite, re-iterable input (e.g., a list of RawDoc).
docs: list[RawDoc] = [RawDoc("cs-123", "Title", "Abstract text...", "cs.AI")]
stream = iter_rag(docs, RagEnv(512), clean_doc)
# For deduplication (requires full view), materialize:
chunks1 = structural_dedup_chunks(list(stream))
# Here docs is the boundary: we assume a finite dataset. Module 3 covers streaming from disk/network lazily as well.
chunks2 = structural_dedup_chunks(list(iter_rag(docs, RagEnv(512), clean_doc)))
assert chunks1 == chunks2

Wins: Lazy generators chain with O(1) memory until materialization; matches Module 1/Core 1/2 semantics.
Note: Deduplication requires the full list (global view), so we materialize there; upstream remains lazy. Consume at edges; compose with Module 3 for advanced streaming. Module 3 will generalise iter_rag into reusable iterator stages and bring in itertools (chain, groupby, islice, etc.) — here we only care about the basic [...] → (...) refactor.
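To see that a chained generator pipeline only does the work that is actually consumed, here is a minimal self-contained sketch with the same filter, map, and flat-map shape as iter_rag. The functions `clean` and `iter_words` are hypothetical stand-ins, not the funcpipe_rag API; the `calls` list exists only to record which items were processed.

```python
from collections.abc import Iterable, Iterator
from itertools import islice

calls: list[str] = []  # records which lines were actually cleaned


def clean(s: str) -> str:
    """Hypothetical stand-in for a cleaner; logs each call."""
    calls.append(s)
    return s.strip().lower()


def iter_words(lines: Iterable[str]) -> Iterator[str]:
    kept = (ln for ln in lines if ln.strip())           # lazy filter
    cleaned = (clean(ln) for ln in kept)                # lazy map
    words = (w for ln in cleaned for w in ln.split())   # lazy flat map
    yield from words


lines = ["  Alpha Beta ", "", "Gamma", "Delta Epsilon"]
stream = iter_words(lines)
assert calls == []  # building the pipeline runs nothing

first_two = list(islice(stream, 2))
print(first_two)  # ['alpha', 'beta']
print(calls)      # ['  Alpha Beta ']
```

Only the first line was cleaned, because the consumer asked for two words and the first line supplied both.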


5. Equational Reasoning: Substitution Exercise

Hand Exercise: Replace expressions in iter_rag.
1. Inline kept_docs_gen = (d for d in docs if rule(d)) → lazy filter.
2. Substitute into cleaned_gen → lazy map.
3. Result: Entire stream computes on-demand until edge materialization.
Bug Hunt: In the eager version, the same substitution still allocates each resulting list in full as soon as it is evaluated.
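The substitution steps can be tried on a toy pipeline. This is a sketch under the assumption that the filter and map functions are pure; `staged`, `inlined`, `is_even`, and `square` are hypothetical names used only for illustration.

```python
from collections.abc import Callable, Iterable, Iterator


def staged(xs: Iterable[int], keep: Callable[[int], bool],
           f: Callable[[int], int]) -> Iterator[int]:
    kept = (x for x in xs if keep(x))   # step 1: lazy filter
    mapped = (f(x) for x in kept)       # step 2: lazy map over the filter
    return mapped


def inlined(xs: Iterable[int], keep: Callable[[int], bool],
            f: Callable[[int], int]) -> Iterator[int]:
    # Substituting `kept` into `mapped` gives a single expression.
    return (f(x) for x in xs if keep(x))


def is_even(x: int) -> bool:
    return x % 2 == 0


def square(x: int) -> int:
    return x * x


data = range(10)
staged_result = list(staged(data, is_even, square))
inlined_result = list(inlined(data, is_even, square))
print(staged_result)                    # [0, 4, 16, 36, 64]
print(staged_result == inlined_result)  # True
```

Neither form computes anything until `list(...)` consumes it, so the substitution changes the shape of the code but not when the work happens.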


6. Property-Based Testing: Providing Strong Evidence of Equivalence (Advanced, Optional)

Use Hypothesis to provide strong evidence that the refactor preserves Module 1 behavior.

6.1 Custom Strategy (RAG Domain)

From tests/conftest.py (as in Module 1).

6.2 Equivalence Property

# tests/test_rag_api.py
from hypothesis import given
import hypothesis.strategies as st
from funcpipe_rag import (
    RagEnv,
    clean_doc,
    embed_chunk,
    iter_chunk_doc,
    iter_rag,
    structural_dedup_chunks,
)
from tests.conftest import doc_list_strategy, env_strategy

def baseline_full_rag(docs, env):
    embedded = [embed_chunk(c) for d in docs for c in iter_chunk_doc(clean_doc(d), env)]
    return structural_dedup_chunks(embedded)

@given(docs=doc_list_strategy(), env=env_strategy())
def test_m02c03_iter_rag_equivalence(docs, env):
    # Lazy: materializes only for dedup (necessary for global view)
    lazy_stream = iter_rag(docs, env, clean_doc)
    deduped_lazy = structural_dedup_chunks(list(lazy_stream))
    # Equivalence to Module 1 (full pipeline)
    assert deduped_lazy == baseline_full_rag(docs, env)

Note: On small finite data (per strategy), full materialization in the test is fine; passing runs give strong evidence of semantic preservation, not a proof.

6.3 Shrinking Demo: Catching a Bug

Bad refactor (reusing consumed iterator):

def bad_iter_rag(docs: Iterable[RawDoc], env: RagEnv, cleaner: Callable[[RawDoc], CleanDoc], *, keep: DocRule | None = None) -> Iterator[Chunk]:
    rule = keep if keep is not None else any_doc
    kept_docs_gen = (d for d in docs if rule(d))  # Lazy
    list(kept_docs_gen)  # Accidentally consumes (e.g., for debug print/logging)
    cleaned_gen = (cleaner(d) for d in kept_docs_gen)  # Now exhausted!
    chunk_we_gen = (c for cd in cleaned_gen for c in gen_chunk_doc(cd, env))  # Empty
    embedded_gen = (embed_chunk(c) for c in chunk_we_gen)  # Empty
    yield from embedded_gen  # Empty stream

Property (swapped to bad_iter_rag):

@given(docs=doc_list_strategy(), env=env_strategy())
def test_bad_rag(docs, env):
    module1_chunks = baseline_full_rag(docs, env)
    bad_stream = bad_iter_rag(docs, env, clean_doc)
    deduped_bad = structural_dedup_chunks(list(bad_stream))
    assert deduped_bad == module1_chunks  # Fails: empty vs non-empty

Hypothesis failure trace (run to verify; example):

Falsifying example: test_bad_rag(
    docs=[RawDoc(doc_id='a', title='', abstract='a', categories='')], 
    env=RagEnv(chunk_size=128),
)
AssertionError
  • Shrinks to minimal non-empty docs; catches exhaustion bug (empty output despite valid input).
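One way to avoid this exhaustion bug when you want to log or inspect items is to observe them as they flow past instead of calling list() mid-pipeline. The `tap` helper below is a hypothetical sketch, not the RagTaps API from funcpipe_rag; it assumes the observer is a simple callback.

```python
from collections.abc import Callable, Iterable, Iterator


def tap(items: Iterable[str], observe: Callable[[str], None]) -> Iterator[str]:
    """Hypothetical helper: observe each item as it passes, then yield it on."""
    for item in items:
        observe(item)
        yield item


seen: list[str] = []
docs = ["a", "bb", "ccc"]

kept = (d for d in docs if d)           # lazy filter
observed = tap(kept, seen.append)       # logs without exhausting `kept`
upper = (d.upper() for d in observed)   # downstream still receives every item

result = list(upper)
print(result)  # ['A', 'BB', 'CCC']
print(seen)    # ['a', 'bb', 'ccc']
```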

7. When Laziness Isn't Worth It

Rarely, for small/hot paths where full materialization is cheaper, use lists behind a lazy API.


8. Pre-Core Quiz

  1. Eager [...] on huge data → violates? → No eager materialization
  2. Double list build → violates? → One-pass only
  3. sorted(infinite) → what’s the correct approach?

    Don’t sort the whole infinite stream. Work on finite windows or partial orderings:
    - sorted(islice(infinite_stream, n)) to sort a finite prefix.
    - heapq.nsmallest / nlargest on a finite window for top-k without a full sort.
    - heapq.merge only to merge multiple already-sorted (possibly infinite) streams.

  4. Mid-pipeline list() → fix with? → Consume at edge
  5. Tool to check lazy ≡ eager (finite)? → Hypothesis equivalence
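The three options for the sorted(infinite) question can be sketched with the standard library. The stream `infinite_stream` below is a hypothetical example; note that heapq.nsmallest must be given a finite window, since it consumes its whole input.

```python
import heapq
from itertools import count, islice

# Hypothetical unbounded stream: 0, 7, 4, 1, 8, 5, 2, 9, 6, 3, 0, 7, ...
infinite_stream = ((i * 7) % 10 for i in count())

# Sort only a finite prefix.
prefix_sorted = sorted(islice(infinite_stream, 5))
print(prefix_sorted)  # [0, 1, 4, 7, 8]

# Top-k of a finite window without a full sort.
window = islice(infinite_stream, 5)  # continues where the prefix stopped
smallest_two = heapq.nsmallest(2, window)
print(smallest_two)  # [2, 3]

# Merge already-sorted, unbounded streams lazily; take a finite prefix.
evens = count(0, 2)
odds = count(1, 2)
merged = list(islice(heapq.merge(evens, odds), 6))
print(merged)  # [0, 1, 2, 3, 4, 5]
```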

9. Post-Core Reflection & Exercise

Reflect: In your code, find one eager list on big/streaming data.
Apply the recipe:
1. Replace with generator expression / yield.
2. Chain lazy operations.
3. Materialize only at edge (e.g., for global ops like dedup).
4. Prove equivalence with Hypothesis.
5. Compose with Core 1/2 patterns.

Did memory usage drop?
Did the pipeline handle larger data?

Project Exercise: Apply to RAG; run properties on sample data.

Next: Core 4 – Designing FP-Friendly APIs (Small Arity, Explicit Parameters, No Hidden Globals).

Verify all patterns with Hypothesis—examples provided show how to detect impurities like globals or non-determinism.

Further Reading: For more on generators in Python, see 'Fluent Python' by Luciano Ramalho. Explore itertools for advanced lazy tools once comfortable.