Module 2: First-Class Functions and Expressive Python¶
Progression Note¶
By the end of Module 2, you'll master first-class functions for configurability, expression-oriented code, and debugging taps. This prepares for lazy iteration in Module 3. See the series progression map in the repo root for full details.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 1: Foundational FP Concepts | Purity, contracts, refactoring | Spot impurities, write pure functions, prove equivalence with Hypothesis |
| 2: First-Class Functions & Expressive Python | Closures, partials, composable configurators | Configure pure pipelines without globals |
| 3: Lazy Iteration & Generators | Streaming/lazy pipelines | Efficient data processing without materializing everything |
M02C03 – Intro to Laziness with Generators (“Don’t Build the List”)¶
Core question:
How do you replace eager, memory-hungry list comprehensions with lazy generators—so pipelines stay efficient, composable, and only compute what’s needed?
This core introduces laziness with generators in Python:
- Treat data as on-demand streams rather than materialized lists.
- Default to generator expressions for lazy computation, no upfront allocation.
- Build on Core 1/2 for streaming pipelines.
We continue the running project from m02-rag.md—extending the FuncPipe RAG Builder—to ground every concept. This project evolves across all 10 cores: start with an eager, memory-bound version; end with lazy, scalable streams.
Audience: Developers from Core 2 using expression-oriented pipelines but still materializing large lists with [...] comprehensions, risking OOM.
Outcome:
1. Spot eager materialization in code and explain why it wastes memory.
2. Refactor an eager list comprehension to a lazy generator.
3. Write a Hypothesis property providing strong evidence of equivalence on finite data, including a shrinking example.
Runnability Note (Module 01 Snapshot vs Module 02 End-State)¶
Some “before” snippets in this core are hypothetical pre-refactor examples used for contrast. They are labeled accordingly and are not meant to exactly match a real snapshot. We refactor these shapes into the real Module 02 API as the module progresses.
For a real, runnable Module 01 codebase, use the module-01 tag worktree:
- `make worktrees`
- Module 01 path: `history/worktrees/module-01/`
- Import path for Module 01: `history/worktrees/module-01/src/`
1. Conceptual Foundation¶
1.1 Laziness with Generators in One Precise Sentence¶
Laziness with generators defers computation until values are needed, using `yield` to produce sequences on demand, avoiding memory allocation for large or infinite data.
1.2 The One-Sentence Rule¶
Default to generators for large or unbounded data; materialize only at well-defined edges.
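To see "defers computation" concretely, here is a minimal, self-contained sketch in plain Python (no project imports; the print is only there to make evaluation visible):

```python
def noisy_square(x: int) -> int:
    print(f"computing {x}")  # side effect only so we can watch evaluation happen
    return x * x

lazy = (noisy_square(x) for x in range(3))  # nothing printed yet: no work has been done
print(next(lazy))  # prints "computing 0", then 0
print(next(lazy))  # prints "computing 1", then 1
# The remaining value is never computed unless something asks for it.
```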
1.3 Why This Matters Now¶
With Core 2 expressions, your pipelines are declarative but eager (materializing huge lists mid-flow risks OOM on big data). Laziness makes them streaming, enabling infinite datasets and constant memory while composing with Core 1 configurators.
1.4 Laziness as Values in 5 Lines¶
Treating generators as first-class values enables dynamic streams:
```python
from collections.abc import Callable, Generator
from itertools import islice

from funcpipe_rag import CleanDoc, ChunkWithoutEmbedding, RagEnv

def chunk_stream(doc: CleanDoc, env: RagEnv) -> Generator[ChunkWithoutEmbedding, None, None]:
    text = doc.abstract
    for i in range(0, len(text), env.chunk_size):
        chunk_text = text[i:i + env.chunk_size]
        if not chunk_text:
            break
        yield ChunkWithoutEmbedding(doc.doc_id, chunk_text, i, i + len(chunk_text))

StreamFactory = Callable[[CleanDoc, RagEnv], Generator[ChunkWithoutEmbedding, None, None]]

streams: dict[str, StreamFactory] = {
    "chunks": chunk_stream,
    # Add more streams
}

def consume_stream(key: str, doc: CleanDoc, env: RagEnv, n: int) -> list[ChunkWithoutEmbedding]:
    it = streams[key](doc, env)
    return list(islice(it, n))
```
Because generators are lazy (compute on next), we can safely store and compose them with Core 1 partials—just like data. Note: generators are one-shot; use factories for reuse.
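A minimal illustration of the one-shot caveat and the factory workaround (plain Python, independent of the RAG types):

```python
gen = (x * 2 for x in range(3))
assert list(gen) == [0, 2, 4]
assert list(gen) == []  # already exhausted: generators are one-shot

def doubled():
    # A factory: each call builds a fresh generator, so the stream is reusable.
    return (x * 2 for x in range(3))

assert list(doubled()) == [0, 2, 4]
assert list(doubled()) == [0, 2, 4]
```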
2. Mental Model: Eager Lists vs Lazy Generators¶
2.1 One Picture¶
```
Eager Lists (Memory-Bound)           Lazy Generators (Streaming)
+-----------------------+            +------------------------------+
| huge = [x for x in N] |            | huge = (x for x in N)        |
| # OOM on large N      |            | # O(1) memory                |
| print(huge[0])        |            | print(next(huge))            |
+-----------------------+            +------------------------------+
  ↑ Allocates All Now                   ↑ Computes On-Demand
```
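A quick way to observe the contrast in the picture, using sys.getsizeof (sizes are CPython-specific and approximate; exact numbers will vary by version):

```python
import sys

n = 1_000_000
eager = [x for x in range(n)]  # the whole list exists in memory right now
lazy = (x for x in range(n))   # only a small generator object exists

print(sys.getsizeof(eager))  # several MB of list storage, growing with n
print(sys.getsizeof(lazy))   # a tiny constant, independent of n
print(next(lazy))            # 0: values are produced only on request
```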
2.2 Contract Table¶
| Aspect | Eager Lists | Lazy Generators |
|---|---|---|
| Memory | O(n) allocation | O(1) constant |
| Computation | Upfront all | On-demand per item |
| Infinite Data | Impossible | Safe with islice |
| Composability | Lists chain eagerly | Generators chain lazily |
| Testing | Finite only | Finite prefixes via islice |
Note on Eager Choice: Rarely, for small profiled data (e.g., cache reuse), materialize behind a lazy API.
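The "Infinite Data" row is worth seeing in code. A minimal sketch using only the standard library, taking a finite prefix of an unbounded stream:

```python
from itertools import count, islice

evens = (2 * n for n in count())     # conceptually infinite stream of even numbers
first_five = list(islice(evens, 5))  # materialize only a finite prefix
assert first_five == [0, 2, 4, 6, 8]
# list(evens) would never terminate; islice keeps the edge finite.
```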
3. Running Project: FuncPipe RAG Builder¶
Our running project (from m02-rag.md) is extending the pure RAG pipeline from Module 1 with laziness.
- Dataset: 10k arXiv CS abstracts (arxiv_cs_abstracts_10k.csv).
- Goal: Make the internal pipeline lazy by replacing eager lists with generators; we still materialize at the edges for now (e.g., for deduplication). Module 3 will fully generalize lazy streaming.
- Start: Hypothetical pre-refactor eager version (core3_start.py, illustration only).
- End (this core): Lazy core, preserving equivalence to Module 1.
3.1 Types (Canonical, Used Throughout)¶
From src/funcpipe_rag/rag_types.py and src/funcpipe_rag/api/types.py (as in Core 1/2).
3.2 Eager Start (Anti-Pattern)¶
This is a hypothetical pre-refactor example used for contrast. It is not meant to run as-is in the end-of-Module-02 checkout.
```python
# core3_start.py (hypothetical pre-refactor; illustration only)
from collections.abc import Callable

from funcpipe_rag import RawDoc, CleanDoc, ChunkWithoutEmbedding, Chunk, RagEnv
from funcpipe_rag import DocRule, Observations, RagTaps
from funcpipe_rag import any_doc
from funcpipe_rag import clean_doc, embed_chunk, structural_dedup_chunks
# gen_chunk_doc (the per-document chunker) is defined in Section 4.1 below.

def eager_full_rag_api(docs: list[RawDoc], env: RagEnv, cleaner: Callable[[RawDoc], CleanDoc], *,
                       keep: DocRule | None = None, taps: RagTaps | None = None) -> tuple[list[Chunk], Observations]:
    rule = keep if keep is not None else any_doc
    kept_docs = [d for d in docs if rule(d)]  # Materializes full list
    if taps and taps.docs:
        taps.docs(tuple(kept_docs))
    cleaned = [cleaner(d) for d in kept_docs]  # Another full list
    if taps and taps.cleaned:
        taps.cleaned(tuple(cleaned))
    chunk_we = [c for cd in cleaned for c in gen_chunk_doc(cd, env)]  # Materializes huge chunk list
    embedded = [embed_chunk(c) for c in chunk_we]  # Full embed list
    chunks = structural_dedup_chunks(embedded)
    if taps and taps.chunks:
        taps.chunks(tuple(chunks))
    obs = Observations(
        total_docs=len(docs),
        total_chunks=len(chunks),
        kept_docs=len(kept_docs),
        cleaned_docs=len(cleaned),
        sample_doc_ids=tuple(d.doc_id for d in kept_docs[:5]),
        sample_chunk_starts=tuple(c.start for c in chunks[:5]),
    )
    return chunks, obs

# Usage: risks OOM on large doc collections
docs: list[RawDoc] = [RawDoc("cs-123", "Title", "Abstract text...", "cs.AI")]
chunks1, obs1 = eager_full_rag_api(docs, RagEnv(512), clean_doc)
chunks2, obs2 = eager_full_rag_api(docs, RagEnv(512), clean_doc)
assert chunks1 == chunks2
```
Smells: Eager lists (kept_docs, cleaned, chunk_we), upfront allocation.
Problem: Materializes intermediates; OOM on big data.
4. Refactor to Lazy: Generators and Yield¶
4.1 Lazy Core¶
First, the basic transformation from eager list to lazy generator:
```python
from itertools import islice

# Eager
squares = [x**2 for x in range(1000000)]  # Allocates full list in memory

# Lazy
squares_gen = (x**2 for x in range(1000000))  # O(1) memory
first_few = list(islice(squares_gen, 10))  # Materialize only what's needed
```
Now apply to RAG: Use generators; defer computation. Define gen_chunk_doc as a generator.
```python
from collections.abc import Callable, Generator, Iterable, Iterator

from funcpipe_rag import RawDoc, CleanDoc, ChunkWithoutEmbedding, Chunk, RagEnv
from funcpipe_rag import DocRule
from funcpipe_rag import any_doc
from funcpipe_rag import clean_doc, embed_chunk, structural_dedup_chunks

def gen_chunk_doc(cd: CleanDoc, env: RagEnv) -> Generator[ChunkWithoutEmbedding, None, None]:
    text = cd.abstract
    for start in range(0, len(text), env.chunk_size):
        chunk_text = text[start: start + env.chunk_size]
        if chunk_text:
            yield ChunkWithoutEmbedding(cd.doc_id, chunk_text, start, start + len(chunk_text))

def iter_rag(
    docs: Iterable[RawDoc],
    env: RagEnv,
    cleaner: Callable[[RawDoc], CleanDoc],
    *,
    keep: DocRule | None = None,
) -> Iterator[Chunk]:
    rule = keep if keep is not None else any_doc
    kept_docs_gen = (d for d in docs if rule(d))  # Lazy filter
    cleaned_gen = (cleaner(d) for d in kept_docs_gen)  # Lazy map
    chunk_we_gen = (c for cd in cleaned_gen for c in gen_chunk_doc(cd, env))  # Lazy flatMap
    embedded_gen = (embed_chunk(c) for c in chunk_we_gen)  # Lazy map
    yield from embedded_gen  # Stream undeduped embedded chunks

# Lazy pipeline internally; materialize at the edges (e.g., for dedup).
# The boundary provides a finite, re-iterable input (e.g., a list of RawDoc).
docs: list[RawDoc] = [RawDoc("cs-123", "Title", "Abstract text...", "cs.AI")]
stream = iter_rag(docs, RagEnv(512), clean_doc)

# Deduplication needs a full view, so materialize here:
chunks1 = structural_dedup_chunks(list(stream))

# docs is the boundary: we assume a finite dataset. Module 3 covers streaming
# from disk/network lazily as well.
chunks2 = structural_dedup_chunks(list(iter_rag(docs, RagEnv(512), clean_doc)))
assert chunks1 == chunks2
```
Wins: Lazy generators chain with O(1) memory until materialization; matches Module 1/Core 1/2 semantics.
Note: Deduplication requires a global view of the full stream, so we materialize there; everything upstream remains lazy. Consume at the edges. Module 3 will generalize iter_rag into reusable iterator stages and bring in itertools (chain, groupby, islice, etc.); here we only care about the basic `[...]` → `(...)` refactor.
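As a small usage sketch of the streaming win (reusing iter_rag, docs, RagEnv, and clean_doc from the block above), you can inspect a prefix of the stream without running the whole pipeline:

```python
from itertools import islice

# Peek at the first three embedded chunks. Because iter_rag is lazy end-to-end,
# the remaining documents are never cleaned, chunked, or embedded for this preview.
preview = list(islice(iter_rag(docs, RagEnv(512), clean_doc), 3))
print([c.start for c in preview])
```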
5. Equational Reasoning: Substitution Exercise¶
Hand Exercise: Replace expressions in iter_rag by hand (a worked version of the substitution follows the steps).
1. Inline kept_docs_gen = (d for d in docs if rule(d)) → lazy filter.
2. Substitute into cleaned_gen → lazy map.
3. Result: Entire stream computes on-demand until edge materialization.
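Written out, steps 1 and 2 collapse iter_rag's intermediate names into one nested generator expression (same names as inside iter_rag above; behavior is unchanged):

```python
# Inside iter_rag, after inlining kept_docs_gen and cleaned_gen:
embedded_gen = (
    embed_chunk(c)
    for cd in (cleaner(d) for d in docs if rule(d))  # inlined lazy filter + lazy map
    for c in gen_chunk_doc(cd, env)                  # lazy flatMap
)
# Still no computation: each item is filtered, cleaned, chunked, and embedded
# only when a consumer pulls it from the stream.
```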
Bug Hunt: In eager version, substitution allocates eagerly.
6. Property-Based Testing: Providing Strong Evidence of Equivalence (Advanced, Optional)¶
Use Hypothesis to provide strong evidence that the refactor preserves the Module 1 behavior.
6.1 Custom Strategy (RAG Domain)¶
From tests/conftest.py (as in Module 1).
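The real strategies live in tests/conftest.py; for orientation only, here is a plausible sketch of what doc_list_strategy and env_strategy might look like (the field bounds and category choices below are illustrative assumptions, not the project's actual definitions):

```python
# Illustrative sketch only; the project's real conftest.py may differ.
import hypothesis.strategies as st
from funcpipe_rag import RawDoc, RagEnv

def doc_strategy():
    return st.builds(
        RawDoc,
        doc_id=st.text(min_size=1, max_size=8),
        title=st.text(max_size=20),
        abstract=st.text(max_size=200),
        categories=st.sampled_from(["cs.AI", "cs.LG", "cs.CL"]),
    )

def doc_list_strategy():
    return st.lists(doc_strategy(), max_size=10)

def env_strategy():
    return st.builds(RagEnv, chunk_size=st.integers(min_value=1, max_value=256))
```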
6.2 Equivalence Property¶
```python
# tests/test_rag_api.py
from hypothesis import given
import hypothesis.strategies as st

from funcpipe_rag import (
    RagEnv,
    clean_doc,
    embed_chunk,
    iter_chunk_doc,
    iter_rag,
    structural_dedup_chunks,
)
from tests.conftest import doc_list_strategy, env_strategy

def baseline_full_rag(docs, env):
    embedded = [embed_chunk(c) for d in docs for c in iter_chunk_doc(clean_doc(d), env)]
    return structural_dedup_chunks(embedded)

@given(docs=doc_list_strategy(), env=env_strategy())
def test_m02c03_iter_rag_equivalence(docs, env):
    # Lazy: materializes only for dedup (necessary for global view)
    lazy_stream = iter_rag(docs, env, clean_doc)
    deduped_lazy = structural_dedup_chunks(list(lazy_stream))
    # Equivalence to Module 1 (full pipeline)
    assert deduped_lazy == baseline_full_rag(docs, env)
```
Note: On the small, finite data the strategy generates, full materialization inside the test is fine; it provides strong evidence of semantic preservation.
6.3 Shrinking Demo: Catching a Bug¶
Bad refactor (reusing a consumed iterator):
```python
# Uses the imports and the gen_chunk_doc definition from Section 4.1.
def bad_iter_rag(docs: Iterable[RawDoc], env: RagEnv, cleaner: Callable[[RawDoc], CleanDoc], *, keep: DocRule | None = None) -> Iterator[Chunk]:
    rule = keep if keep is not None else any_doc
    kept_docs_gen = (d for d in docs if rule(d))  # Lazy
    list(kept_docs_gen)  # Accidentally consumes the stream (e.g., for a debug print/log)
    cleaned_gen = (cleaner(d) for d in kept_docs_gen)  # Now exhausted!
    chunk_we_gen = (c for cd in cleaned_gen for c in gen_chunk_doc(cd, env))  # Empty
    embedded_gen = (embed_chunk(c) for c in chunk_we_gen)  # Empty
    yield from embedded_gen  # Empty stream
```
Property (swapped to bad_iter_rag):
```python
@given(docs=doc_list_strategy(), env=env_strategy())
def test_bad_rag(docs, env):
    module1_chunks = baseline_full_rag(docs, env)
    bad_stream = bad_iter_rag(docs, env, clean_doc)
    deduped_bad = structural_dedup_chunks(list(bad_stream))
    assert deduped_bad == module1_chunks  # Fails: empty vs non-empty
```
Hypothesis failure trace (run to verify; example):
```
Falsifying example: test_bad_rag(
    docs=[RawDoc(doc_id='a', title='', abstract='a', categories='')],
    env=RagEnv(chunk_size=128),
)
AssertionError
```
- Shrinks to minimal non-empty docs; catches exhaustion bug (empty output despite valid input).
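One possible repair keeps the stream intact by peeking per item instead of draining the generator up front (fixed_iter_rag is a name for this sketch, not a project function; it reuses the names from Section 4.1):

```python
def fixed_iter_rag(docs: Iterable[RawDoc], env: RagEnv, cleaner: Callable[[RawDoc], CleanDoc], *, keep: DocRule | None = None) -> Iterator[Chunk]:
    rule = keep if keep is not None else any_doc

    def kept_with_peek():
        for d in docs:
            if rule(d):
                print(f"keeping {d.doc_id}")  # per-item debug tap; the stream stays lazy and unconsumed
                yield d

    cleaned_gen = (cleaner(d) for d in kept_with_peek())
    chunk_we_gen = (c for cd in cleaned_gen for c in gen_chunk_doc(cd, env))
    yield from (embed_chunk(c) for c in chunk_we_gen)
```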
7. When Laziness Isn't Worth It¶
Rarely, for small/hot paths where full materialization is cheaper, use lists behind a lazy API.
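A sketch of that pattern (the names here are illustrative): compute a small result eagerly once, but expose it through an iterator-returning function so callers consume it like any other lazy stage:

```python
from collections.abc import Iterator

# Tiny, profiled-as-cheap data: compute it once, eagerly.
_SMALL_LOOKUP: tuple[str, ...] = tuple(f"cat-{i}" for i in range(100))

def iter_categories() -> Iterator[str]:
    # Eager behind the scenes (cheap, cached, re-iterable); lazy-looking to callers.
    return iter(_SMALL_LOOKUP)

assert next(iter_categories()) == "cat-0"
```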
8. Pre-Core Quiz¶
1. Eager `[...]` on huge data → violates? → No eager materialization
2. Double list build → violates? → One-pass only
3. `sorted(infinite)` → what's the correct approach? → Don't sort the whole infinite stream; work on finite windows or partial orderings:
    - `sorted(islice(infinite_stream, n))` to sort a finite prefix
    - `heapq.nsmallest` / `heapq.nlargest` for top-k without a full sort
    - `heapq.merge` only to merge multiple already-sorted (possibly infinite) streams
4. Mid-pipeline `list()` → fix with? → Consume at edge
5. Tool to prove lazy ≡ eager (finite)? → Hypothesis equivalence
9. Post-Core Reflection & Exercise¶
Reflect: In your code, find one eager list on big/streaming data.
Apply the recipe:
1. Replace with generator expression / yield.
2. Chain lazy operations.
3. Materialize only at edge (e.g., for global ops like dedup).
4. Prove equivalence with Hypothesis.
5. Compose with Core 1/2 patterns.
Did memory usage drop?
Did the pipeline handle larger data?
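One way to answer the memory question is the standard-library tracemalloc module. A sketch comparing peak memory of an eager and a lazy run (the exact numbers depend on your data and Python build):

```python
import tracemalloc

def peak_mb(run) -> float:
    tracemalloc.start()
    run()
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return peak / 1_000_000

eager_peak = peak_mb(lambda: sum([x * x for x in range(1_000_000)]))  # whole list held at once
lazy_peak = peak_mb(lambda: sum(x * x for x in range(1_000_000)))     # one item at a time
print(f"eager peak: {eager_peak:.1f} MB, lazy peak: {lazy_peak:.1f} MB")
```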
Project Exercise: Apply to RAG; run properties on sample data.
Next: Core 4 – Designing FP-Friendly APIs (Small Arity, Explicit Parameters, No Hidden Globals).
Verify all patterns with Hypothesis—examples provided show how to detect impurities like globals or non-determinism.
Further Reading: For more on generators in Python, see 'Fluent Python' by Luciano Ramalho. Explore itertools for advanced lazy tools once comfortable.