Module 3: Lazy Iteration and Streaming
Progression Note
By the end of Module 3, you will have mastered lazy generators, itertools, and streaming pipelines that never materialize unnecessary data. This prepares you for safe recursion and error handling in streams (Module 4). See the series progression map in the repo root for full details.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 2 | First-Class Functions & Expressive Python | Configurable pure pipelines without globals |
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting |
| 4 | Recursion & Error Handling in Streams | Safe recursion, Result/Option, streaming errors |
M03C01: Iterator Protocol & Generator Functions – Lazy Dataflow Foundations
Core question:
How do you use yield and the iterator protocol (__iter__/__next__) to turn eager list-based transforms into lazy, memory-efficient generators that compute only on demand, and why does this unlock everything else in Module 3?
This core introduces the iterator protocol and generator functions as the foundation for lazy dataflow in Python:
- Treat data pipelines as iterable sequences that produce values on-the-fly via yield.
- Replace eager list comprehensions with generators to avoid materializing full collections in memory.
- Isolate laziness to individual stages while preserving equivalence to Module 2’s eager versions, with explicit, tight laws for invariants (index correctness, coverage, bounded traversal, etc.).
We use the running project from m03-rag.md — refactoring the FuncPipe RAG Builder — to ground every concept. This project evolves across all 10 cores: start with an eager, list-based version; end with a fully lazy, streaming pipeline.
Audience: Python developers who have mastered Module 2’s pure configurators and expression-oriented code but still hit OOM errors on large datasets or waste time computing chunks that are never used.
Outcome:
1. Spot eagerness in code and explain exactly how much memory it wastes.
2. Refactor a 10–20 line eager function to a lazy generator using yield in < 5 minutes.
3. Write a Hypothesis property proving equivalence (including shrinking) and exact bounded traversal.
Text Slicing Policy: Chunks are code-point slices; grapheme clusters may be split (same as Python’s native slicing).
1. Conceptual Foundation
1.1 The One-Sentence Rule
Write generator functions by default for any pipeline stage that produces a sequence: use yield to produce values lazily, only when pulled, and never build the full list unless you explicitly materialise at a boundary.
1.2 Iterator Protocol in One Precise Sentence
An iterator is any object that implements __iter__ (returns itself) and __next__ (returns the next value or raises StopIteration); generator functions are the idiomatic way to create them because yield automatically handles the protocol and resumable state.
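To make the protocol concrete, here is a minimal sketch that writes the same counter twice: once as a hand-written iterator class and once as a generator function. The names CountUp and count_up are illustrative and not part of the running project.

```python
from collections.abc import Iterator


class CountUp:
    """Hand-written iterator (illustrative name): yields 0, 1, ..., n - 1."""

    def __init__(self, n: int) -> None:
        self._n = n
        self._i = 0

    def __iter__(self) -> "CountUp":
        return self  # an iterator returns itself from __iter__

    def __next__(self) -> int:
        if self._i >= self._n:
            raise StopIteration  # signals exhaustion to the caller
        value = self._i
        self._i += 1
        return value


def count_up(n: int) -> Iterator[int]:
    """Generator equivalent: yield keeps the state and raises StopIteration for us."""
    i = 0
    while i < n:
        yield i
        i += 1


manual = list(CountUp(3))
generated = list(count_up(3))
print(manual, generated)  # [0, 1, 2] [0, 1, 2]

gen = count_up(3)
print(iter(gen) is gen)  # True: a generator object is its own iterator
```

The class needs explicit state and an explicit StopIteration; the generator expresses the same behaviour as a plain loop.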
1.3 Why This Matters Now
Module 2 gave you pure, configurable pipelines — but they were still eager.
A single 100 MB abstract becomes hundreds of thousands of chunks → hundreds of MB of dataclass instances before you even embed the first one.
Lazy generators let you process terabytes of text with constant memory and short-circuit the moment you have enough chunks (e.g., top-k retrieval).
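As a small illustration of short-circuiting, the sketch below pulls three matches out of a stream that never ends. The squares stream is a made-up stand-in for a large corpus; only itertools.count and itertools.islice from the standard library are used.

```python
from collections.abc import Iterator
from itertools import count, islice


def squares() -> Iterator[int]:
    """Unbounded stream: 1, 4, 9, 16, ... (never terminates on its own)."""
    for n in count(1):
        yield n * n


# Keep only even squares, then stop after the first three matches.
even_squares = (s for s in squares() if s % 2 == 0)
first_three = list(islice(even_squares, 3))
print(first_three)  # [4, 16, 36]
```

An eager list comprehension over the same stream would never return, because it would try to materialise every square before filtering.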
1.4 Generators as Lazy Values in 5 Lines
from collections.abc import Iterator

def lazy_range(n: int) -> Iterator[int]:
    i = 0
    while i < n:
        yield i
        i += 1

gen = lazy_range(10**9)  # ← No values computed yet; only a small generator object exists.
print(next(gen))         # Only now does it compute the first value → 0
print(next(gen))         # → 1 (state was preserved)
Because the function is resumable, the generator is effectively a lazy, single-use stream — exactly what we need for RAG pipelines.
2. Mental Model: Eager Lists vs Lazy Generators
2.1 One Picture
Eager Lists (Materialize All)              Lazy Generators (On-Demand)
+-------------------------------------+    +-------------------------------------+
| docs → [clean(doc) for doc in docs] |    | docs → (clean(doc) for doc in docs) |
|   ↓                                 |    |   ↓                                 |
| full list in memory                 |    | yield only when next() is called    |
| always computes everything          |    | short-circuit with break/islice     |
+-------------------------------------+    +-------------------------------------+
  ↑ OOM on 100k+ docs                        ↑ Constant memory, stream-safe
2.2 Contract Table
| Aspect | Eager Lists | Lazy Generators |
|---|---|---|
| Memory | O(n) immediately | O(1): one item in flight at a time |
| Computation | All items upfront | Only what is pulled |
| Short-circuit | No savings: all work is already done | Native (break, islice, next) |
| Re-iterability | Yes | No (single-use); use tee if needed |
| Equivalence | Trivial | Proven with Hypothesis (exact laws) |
Note on Eager Choice: Choose eager lists only for tiny, repeatedly accessed data (≤ 10 k items). Everything else → generator.
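The "Computation" row can be checked directly by counting calls. In this sketch, clean, the calls counter, and the sample docs are hypothetical stand-ins, not the project's real cleaning stage.

```python
from itertools import islice

calls = {"eager": 0, "lazy": 0}  # how many times each pipeline ran clean()


def clean(doc: str, key: str) -> str:
    """Hypothetical cleaning step that records every invocation."""
    calls[key] += 1
    return doc.strip().lower()


docs = [f"  Doc {i} " for i in range(1000)]  # synthetic input

eager = [clean(d, "eager") for d in docs]  # all 1000 calls happen right here
lazy = (clean(d, "lazy") for d in docs)    # no calls yet
first_three = list(islice(lazy, 3))        # exactly 3 calls

print(calls)        # {'eager': 1000, 'lazy': 3}
print(first_three)  # ['doc 0', 'doc 1', 'doc 2']
```

Both pipelines produce the same values for the items they compute; the lazy one simply stops doing work once the consumer stops pulling.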
3. Running Project: Lazy Chunking in RAG
- Dataset: 10k arXiv CS abstracts (arxiv_cs_abstracts_10k.csv).
- Goal: Lazily clean → chunk → embed → dedup (dedup added later).
- Start: Eager list-based version.
- End (this core): Single-stage lazy chunking with tight, verified laws.
3.1 Types (Canonical, Used Throughout)
from dataclasses import dataclass
from collections.abc import Iterator

@dataclass(frozen=True)
class CleanDoc:
    doc_id: str
    title: str
    abstract: str
    categories: str

@dataclass(frozen=True)
class ChunkWithoutEmbedding:
    doc_id: str
    text: str
    start: int
    end: int

@dataclass(frozen=True)
class RagEnv:
    chunk_size: int
3.2 Eager Start (Anti-Pattern + Reference)
def chunk_doc(cd: CleanDoc, env: RagEnv) -> list[ChunkWithoutEmbedding]:
    """Eager reference implementation – used only for equivalence proofs."""
    text = cd.abstract
    step = env.chunk_size
    n = len(text)
    chunks: list[ChunkWithoutEmbedding] = []
    i = 0
    while i < n:
        j = min(i + step, n)
        segment = text[i:j]
        if segment:
            chunks.append(ChunkWithoutEmbedding(cd.doc_id, segment, i, j))
        i = j
    return chunks
Smells: Allocates the entire chunk list even if the caller only needs the first 10 chunks.
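To put a rough number on that smell, the sketch below counts how many chunks the eager version would build for a large text and gives a shallow per-chunk size estimate. The Chunk class is a stand-in for ChunkWithoutEmbedding, the 100 MiB ASCII text and chunk size of 512 are assumed inputs, and sys.getsizeof figures vary by Python version and ignore shared objects, so treat the byte total as an order-of-magnitude guide only.

```python
import sys
from dataclasses import dataclass


@dataclass(frozen=True)
class Chunk:  # stand-in for ChunkWithoutEmbedding
    doc_id: str
    text: str
    start: int
    end: int


def chunk_count(n_chars: int, chunk_size: int) -> int:
    """Number of chunks the eager list would hold: ceil(n_chars / chunk_size)."""
    return (n_chars + chunk_size - 1) // chunk_size


def approx_chunk_bytes(chunk_size: int) -> int:
    """Shallow estimate: the instance plus its text payload (ASCII assumed)."""
    sample = Chunk("doc-0", "x" * chunk_size, 0, chunk_size)
    return sys.getsizeof(sample) + sys.getsizeof(sample.text)


n_chars = 100 * 1024 * 1024  # assumed: 100 MiB of one-byte code points
n_chunks = chunk_count(n_chars, 512)
total_bytes = n_chunks * approx_chunk_bytes(512)

print(n_chunks)  # 204800
print(f"roughly {total_bytes / 2**20:.0f} MiB held at once by the eager list")
```

A caller that needs only the first 10 chunks still pays for all of them in the eager version.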
4. Refactor to Lazy: Generator Functions in RAG
4.1 Lazy Core – The One True Implementation
from collections.abc import Iterator

def gen_chunk_doc(cd: CleanDoc, env: RagEnv) -> Iterator[ChunkWithoutEmbedding]:
    """
    Tight laws (enforced exactly by Hypothesis):
    L1: list(gen_chunk_doc(cd, env)) == chunk_doc(cd, env)                   # equivalence
    L2: ∀c → c.end - c.start == len(c.text)                                  # length invariant
    L3: ''.join(c.text for c in gen_chunk_doc(cd, env)) == cd.abstract       # perfect coverage
    L4: chunk.starts are strictly increasing                                 # order invariant
    L5: cd.abstract.__getitem__ called exactly
        (len(cd.abstract) + step - 1) // step times                          # exact bounded traversal
    """
    step = env.chunk_size
    if not isinstance(step, int) or step < 1:
        raise ValueError(f"chunk_size must be positive integer (got {step!r})")
    text = cd.abstract
    n = len(text)
    i = 0
    while i < n:
        j = min(i + step, n)
        segment = text[i:j]
        if segment:
            yield ChunkWithoutEmbedding(cd.doc_id, segment, i, j)
        i = j
# Preferred zero-copy variant (downstream decides when to materialise text)
def gen_chunk_spans(cd: CleanDoc, env: RagEnv) -> Iterator[tuple[int, int]]:
    """
    Same preconditions as gen_chunk_doc; laws L2–L4 carry over to the offsets, except:
      - Yields (start, end) offsets only; the text is never sliced here
      - Law L1 becomes: [cd.abstract[i:j] for i, j in gen_chunk_spans(...)] == [c.text for c in gen_chunk_doc(...)]
      - Law L5 becomes: cd.abstract.__getitem__ is called zero times
    """
    step = env.chunk_size
    if not isinstance(step, int) or step < 1:
        raise ValueError(f"chunk_size must be positive integer (got {step!r})")
    text = cd.abstract
    n = len(text)
    i = 0
    while i < n:
        j = min(i + step, n)
        if i < j:  # only yield non-empty spans
            yield (i, j)
        i = j
Wins:
- Constant memory for the chunk stream regardless of document size (the source text itself is still held once).
- Short-circuitable: next(gen) computes only one chunk.
- Proven equivalent to eager version via Hypothesis with exact bounds.
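The sketch below exercises the short-circuit win and shows one consequence of laziness that is easy to miss: the chunk_size check lives inside the generator body, so it runs on the first next() call, not when the generator is created. The types and gen_chunk_doc are condensed copies of the definitions above so the block runs on its own; the sample document is made up.

```python
from collections.abc import Iterator
from dataclasses import dataclass
from itertools import islice


@dataclass(frozen=True)
class CleanDoc:
    doc_id: str
    title: str
    abstract: str
    categories: str


@dataclass(frozen=True)
class ChunkWithoutEmbedding:
    doc_id: str
    text: str
    start: int
    end: int


@dataclass(frozen=True)
class RagEnv:
    chunk_size: int


def gen_chunk_doc(cd: CleanDoc, env: RagEnv) -> Iterator[ChunkWithoutEmbedding]:
    step = env.chunk_size
    if not isinstance(step, int) or step < 1:
        raise ValueError(f"chunk_size must be positive integer (got {step!r})")
    text = cd.abstract
    i = 0
    while i < len(text):
        j = min(i + step, len(text))
        yield ChunkWithoutEmbedding(cd.doc_id, text[i:j], i, j)
        i = j


doc = CleanDoc("d1", "Title", "abcdefghij", "cs.IR")  # made-up sample document

# Short-circuit: only the first two chunks are ever built.
first_two = [c.text for c in islice(gen_chunk_doc(doc, RagEnv(4)), 2)]
print(first_two)  # ['abcd', 'efgh']

# Validation is lazy too: creating the generator does not run its body.
pending = gen_chunk_doc(doc, RagEnv(0))  # no exception here
try:
    next(pending)  # the body starts now, so the check fires here
    raised = False
except ValueError:
    raised = True
print(raised)  # True
```

If a configuration error must surface at call time, one option is to validate in a plain wrapper function that then returns the generator.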
Re-iterability Warning (once per module):
Generators are single-use: a second pass over the same generator yields nothing. Safe patterns:
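# Bounded prefix tap
from itertools import islice, chain

def tap_prefix(it, n, hook):
    head = tuple(islice(it, n))  # materialise only the first n items
    hook(head)                   # let the caller inspect the prefix
    return chain(head, it)       # replay the prefix, then continue lazily

# Unbounded parallel (use sparingly)
from itertools import tee

# `it` is any iterator; do not advance it directly after calling tee
a, b = tee(it, 2)  # memory = lag between consumers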
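A short usage sketch of both patterns, using small ranges as stand-in streams. tap_prefix is repeated from above so the block is self-contained; the seen list is an illustrative hook target.

```python
from itertools import chain, islice, tee


def tap_prefix(it, n, hook):
    head = tuple(islice(it, n))  # materialise only the first n items
    hook(head)                   # let the caller inspect the prefix
    return chain(head, it)       # replay the prefix, then continue lazily


# Peek at the first two items without losing them.
seen = []
stream = iter(range(6))
tapped = tap_prefix(stream, 2, seen.append)
result = list(tapped)
print(seen)    # [(0, 1)]
print(result)  # [0, 1, 2, 3, 4, 5]

# tee: two independent consumers of one underlying iterator.
a, b = tee(iter(range(4)), 2)
total = sum(a)   # a runs to the end, so tee buffers every item for b
items = list(b)  # b replays the buffered items
print(total, items)  # 6 [0, 1, 2, 3]
```

In the tee example the first consumer finishes before the second starts, so the buffer grows to the full stream; that is the "lag between consumers" cost noted in the comment above.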
5. Equational Reasoning: Substitution Exercise
gen_chunk_doc(cd, env)
≡ iterator that yields ChunkWithoutEmbedding(...) in exact chunk_doc order
≡ list(gen_chunk_doc(cd, env)) == chunk_doc(cd, env) # L1, proven exactly
Because the generator is pure and deterministic, list(gen_chunk_doc(cd, env)) is substitutable for the old eager list — equational reasoning holds perfectly.
6. Property-Based Testing: Proving Equivalence & Laws
6.1 Custom Strategy + Helpers
import hypothesis.strategies as st
from dataclasses import replace

class CountingStr(str):
    """str subclass that counts __getitem__ calls (index or slice)."""

    def __new__(cls, value):
        obj = super().__new__(cls, value)
        obj.calls = 0
        return obj

    def __getitem__(self, key):
        self.calls += 1
        return super().__getitem__(key)

clean_doc_st = st.builds(
    CleanDoc,
    doc_id=st.text(min_size=1, max_size=20),
    title=st.text(max_size=100),
    abstract=st.text(max_size=10_000),  # stress real slicing
    categories=st.text(max_size=50),
)
env_st = st.builds(RagEnv, chunk_size=st.integers(1, 2048))
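Before wiring CountingStr into Hypothesis, it can help to see the counter at work on one fixed input. This sketch repeats the helper and runs a bare chunking loop over a made-up ten-character string; no Hypothesis import is needed.

```python
class CountingStr(str):
    """str subclass that counts __getitem__ calls (index or slice)."""

    def __new__(cls, value):
        obj = super().__new__(cls, value)
        obj.calls = 0
        return obj

    def __getitem__(self, key):
        self.calls += 1
        return super().__getitem__(key)


text = CountingStr("abcdefghij")  # made-up sample, 10 code points
step = 4

pieces = []
i = 0
while i < len(text):  # len() does not go through __getitem__
    pieces.append(text[i:i + step])  # one counted slice per chunk
    i += step

expected = (len(text) + step - 1) // step  # ceil(10 / 4)
print(pieces)                # ['abcd', 'efgh', 'ij']
print(text.calls, expected)  # 3 3
```

The slices come back as plain str objects, so only accesses on the original CountingStr instance are counted, which is what law L5 measures.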
6.2 Full Suite (All Laws Enforced Exactly)
from hypothesis import given

@given(clean_doc_st, env_st)
def test_equivalence(cd, env):
    assert list(gen_chunk_doc(cd, env)) == chunk_doc(cd, env)

@given(clean_doc_st, env_st)
def test_laws_l2_l3_l4(cd, env):
    chunks = list(gen_chunk_doc(cd, env))
    assert "".join(c.text for c in chunks) == cd.abstract  # L3
    for c in chunks:
        assert c.end - c.start == len(c.text)  # L2
    starts = [c.start for c in chunks]
    assert starts == sorted(starts) and len(starts) == len(set(starts))  # L4: strictly increasing

@given(clean_doc_st, env_st)
def test_exact_bounded_traversal(cd, env):
    text = CountingStr(cd.abstract)
    cd = replace(cd, abstract=text)
    list(gen_chunk_doc(cd, env))  # consume
    expected = (len(text) + env.chunk_size - 1) // env.chunk_size
    assert text.calls == expected  # L5 exact

@given(clean_doc_st, env_st)
def test_spans_equivalence_and_l2(cd, env):
    spans = list(gen_chunk_spans(cd, env))
    chunks = list(gen_chunk_doc(cd, env))
    assert len(spans) == len(chunks)
    for (i, j), c in zip(spans, chunks):
        assert c.text == cd.abstract[i:j]
        assert j - i == len(c.text)
6.3 Shrinking Demo: Catching a Real Bug
Bad version (yields a trailing empty chunk when length % step == 0, which includes the empty abstract):
def bad_gen_chunk_doc(cd: CleanDoc, env: RagEnv):
    i = 0
    text = cd.abstract
    while i <= len(text):  # wrong condition: should be i < len(text)
        segment = text[i:i + env.chunk_size]
        yield ChunkWithoutEmbedding(cd.doc_id, segment, i, i + len(segment))
        i += env.chunk_size
The equivalence property fails. An illustrative falsifying example (the exact shrunk output depends on the Hypothesis version, and the shrinker may reduce it further because an empty abstract also triggers the bug):
Falsifying example:
    cd=CleanDoc(doc_id='a', title='', abstract='aaaa', categories=''),
    env=RagEnv(chunk_size=4)
→ Bad version yields final empty chunk; Hypothesis finds it in < 0.1 s.
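The failure can also be reproduced by hand on fixed inputs. The sketch below uses simplified tuple-based stand-ins (bad_chunks and good_chunks are illustrative names that yield (text, start, end) instead of ChunkWithoutEmbedding) so it runs without Hypothesis or the project types.

```python
from collections.abc import Iterator


def good_chunks(text: str, step: int) -> Iterator[tuple[str, int, int]]:
    """Correct loop: stops as soon as i reaches len(text)."""
    i = 0
    while i < len(text):
        segment = text[i:i + step]
        yield (segment, i, i + len(segment))
        i += step


def bad_chunks(text: str, step: int) -> Iterator[tuple[str, int, int]]:
    """Buggy loop: <= allows one extra pass when i lands exactly on len(text)."""
    i = 0
    while i <= len(text):
        segment = text[i:i + step]
        yield (segment, i, i + len(segment))
        i += step


print(list(good_chunks("aaaa", 4)))  # [('aaaa', 0, 4)]
print(list(bad_chunks("aaaa", 4)))   # [('aaaa', 0, 4), ('', 4, 4)]
print(list(bad_chunks("", 1)))       # [('', 0, 0)]

# When the length is not a multiple of the step, the bug stays hidden.
print(list(bad_chunks("aaaaa", 4)) == list(good_chunks("aaaaa", 4)))  # True
```

The last line shows why example-based tests can miss this bug: it only appears on lengths that are exact multiples of the step, which is the kind of boundary a property-based search is good at finding.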
7. When Laziness Isn’t Worth It
Stay eager only for:
- Tiny sequences you will definitely iterate multiple times.
- Cases where you need random access (use a list).
Everything else → generator.
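When one of those cases applies, a reasonable pattern is to keep the producer lazy and materialise once at the boundary where multiple passes or indexing are needed. A minimal sketch, with gen_squares as a made-up producer:

```python
from collections.abc import Iterator


def gen_squares(n: int) -> Iterator[int]:
    """Made-up lazy producer: 0, 1, 4, ..., (n - 1) ** 2."""
    for i in range(n):
        yield i * i


stream = gen_squares(5)
table = tuple(stream)  # materialise once, at the boundary

print(table[3])                # 9   (random access)
print(sum(table), max(table))  # 30 16   (two separate passes)
print(list(stream))            # []  (the generator itself is now exhausted)
```

The tuple is safe to index and re-iterate; the generator that fed it is spent, which is why the materialised value, not the generator, should be passed on.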
8. Pre-Core Quiz
- list(gen()) == eager() always? → Yes, by exact law L1.
- next(gen) computes everything? → No, only one item.
- Can you iterate a generator twice? → No, single-use.
- How to safely peek at first 10 items? → tap_prefix or islice + chain.
- Global inside generator? → Breaks purity; isolate or inject explicitly.
9. Post-Core Reflection & Exercise
Reflect: Find one list comprehension in your codebase that builds > 10 k items. Refactor to generator. Add the exact equivalence + bounded-traversal properties.
Project Exercise: Run the lazy chunker on the full 10k arXiv dataset. Measure peak memory (should be ~10–20 MB instead of 500+ MB eager).
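One way to take that measurement is the standard library's tracemalloc. The sketch below is a scaled-down stand-in for the exercise: it uses a synthetic 200,000-character string and simplified chunkers (chunk_eager, chunk_lazy, and peak_bytes are illustrative names) instead of the arXiv dataset, so the absolute numbers will not match the figures quoted above.

```python
import tracemalloc
from collections.abc import Callable, Iterator


def chunk_eager(text: str, step: int) -> list[str]:
    return [text[i:i + step] for i in range(0, len(text), step)]


def chunk_lazy(text: str, step: int) -> Iterator[str]:
    for i in range(0, len(text), step):
        yield text[i:i + step]


def peak_bytes(consume: Callable[[], object]) -> int:
    """Peak traced allocation (in bytes) while consume() runs."""
    tracemalloc.start()
    try:
        consume()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak


text = "x" * 200_000  # synthetic input, allocated before tracing starts

# Same consumer for both: total length of all chunks.
eager_peak = peak_bytes(lambda: sum(len(c) for c in chunk_eager(text, 20)))
lazy_peak = peak_bytes(lambda: sum(len(c) for c in chunk_lazy(text, 20)))

print(f"eager peak: {eager_peak:,} bytes")
print(f"lazy peak:  {lazy_peak:,} bytes")
print(lazy_peak < eager_peak)  # True
```

The eager run holds all 10,000 chunk strings plus the list at its peak, while the lazy run holds roughly one chunk at a time. For the real exercise, swap in the dataset loader and gen_chunk_doc and keep the consumer identical for both runs.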
Next: M03C02 – Generator Expressions & itertools Foundations.
You now own the single most important primitive in all of Module 3. Everything else is just composing these lazy streams.