Module 3: Lazy Iteration and Streaming¶
Progression Note¶
By the end of Module 3, you will have mastered lazy generators, the itertools toolkit, and streaming pipelines that never materialize unnecessary data. This prepares you for safe recursion and error handling in streams (Module 4). See the series progression map in the repo root for full details.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 2 | First-Class Functions & Expressive Python | Configurable pure pipelines without globals |
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting |
| 4 | Recursion & Error Handling in Streams | Safe recursion, Result/Option, streaming errors |
M03C06: Building Reusable Iterator-Based FuncPipe Stages – Reusable Pipelines, Not One-Off Scripts¶
Core question:
How do you build reusable, composable iterator stages using closures and higher-order functions to create configurable pipelines instead of rigid one-off scripts, while preserving purity and equivalence?
This core builds on Core 5's fencing by introducing closures for truly reusable stages:
- Use closures to configure generators.
- Avoid globals and hardcoding; make config explicit and frozen.
- Enable pipeline factories for tests, configs, and variants.
- Preserve laziness and equivalence.
We expand beyond the running project from Core 5 (FuncPipe RAG Builder from m03-rag.md) by adding cross-domain examples like log ETL, CSV streaming, API pagination, filesystem walks, and telemetry to prove scalability.
Audience: Developers with ad-hoc scripts needing reusable, testable pipelines.
Outcome:
1. Spot hardcoding smells like globals.
2. Refactor to a closure factory in < 10 lines.
3. Prove reuse laws with Hypothesis.
Laws (frozen, used across this core):
- E1 — Equivalence: `pipe(factory(S)) == eager_equiv(S)`.
- P1 — Purity: no globals; all config explicit (captured immutably).
- R1 — Reusability: `factory()` yields a fresh iterator each call.
- C1 — Closure parity: `partial(fn, a=1)(x) == fn(x, a=1)`.
- DTR — Determinism: for pure stages, given equal inputs/config, outputs are equal bit-for-bit.
- FR — Freshness:
  - For Source factories: `src()` and `src()` are independent iterators.
  - For Transform factories: given `mk = factory(config)`, `mk(xs)` and `mk(xs)` are independent iterators.

A minimal sketch of C1, DTR, and FR follows.
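Here is a minimal sketch of C1, DTR, and FR, using throwaway helpers (`scale`, `make_doubler`) invented purely for illustration:

```python
from functools import partial

def scale(x: int, a: int = 1) -> int:
    return x * a

# C1 — Closure parity: partial application equals the keyword call.
assert partial(scale, a=2)(10) == scale(10, a=2)

def make_doubler():
    def stage(xs):
        for x in xs:
            yield x * 2
    return stage

mk = make_doubler()
# FR — Freshness: each call to the factory's stage is an independent, fresh iterator.
assert list(mk([1, 2])) == [2, 4] and list(mk([1, 2])) == [2, 4]
# DTR — Determinism: equal inputs give bit-for-bit equal outputs.
assert list(mk([3])) == list(mk([3]))
```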
1. Conceptual Foundation¶
1.1 The One-Sentence Rule¶
Use closures to make generator factories that take frozen config and return fresh iterators, eliminating globals for reusable pipelines.
1.2 Reusable Stages in One Precise Sentence¶
Closures capture immutable config in generator factories for composable, pure stages that are deterministic and fresh on every call.
1.3 Why This Matters Now¶
Hardcoded scripts are brittle; factories make reusable, configurable pipelines across domains like ETL, logging, and APIs.
1.4 Reusable Stages in 5 Lines¶
Closure factory:
```python
def make_gen(env):
    def gen(docs):
        for d in docs:
            yield process(d, env)
    return gen

rag_fn = make_gen(RagEnv(512))
chunks = list(rag_fn(docs))  # configurable
```
Reusable.
1.5 Minimal Stage Harness (Foundation for All Examples)¶
To ensure consistent, type-safe composition, use this protocol and helpers for all stages. We distinguish:
- Source[T]: Callable[[], Iterator[T]] – Produces data from nothing (e.g., file readers, pagers). Sources may be effectful (I/O, retry, sleep).
- Transform[A, B]: Takes input, transforms to output — must be pure.
- Sink[B]: Consumes for side-effects (not covered here; fence before sinks).
```python
from typing import TypeVar, Iterable, Iterator, Protocol, Generic, Callable
from itertools import islice
from functools import reduce

A = TypeVar("A"); B = TypeVar("B"); C = TypeVar("C")

Source = Callable[[], Iterator[A]]

class Transform(Protocol, Generic[A, B]):
    def __call__(self, xs: Iterable[A]) -> Iterator[B]: ...

def compose2(s1: Transform[A, B], s2: Transform[B, C]) -> Transform[A, C]:
    def pipe(xs: Iterable[A]) -> Iterator[C]:
        return s2(s1(xs))
    return pipe

def compose(*stages: Transform) -> Transform:
    # Note: intentionally loose typing for simplicity; in production use overloads or fixed-arity versions
    if not stages:
        raise ValueError("compose needs ≥1 stage")
    return reduce(compose2, stages)

def fmap(fn: Callable[[A], B]) -> Transform[A, B]:
    def stage(xs: Iterable[A]) -> Iterator[B]:
        for x in xs:
            yield fn(x)
    return stage

def ffilter(pred: Callable[[A], bool]) -> Transform[A, A]:
    def stage(xs: Iterable[A]) -> Iterator[A]:
        for x in xs:
            if pred(x):
                yield x
    return stage

def fence_k(k: int) -> Transform[A, A]:
    return lambda xs: islice(xs, k)

def source_to_transform(src: Source[A]) -> Transform[None, A]:
    def adapter(_: Iterable[None]) -> Iterator[A]:
        yield from src()
    return adapter
```
This harness promotes explicit composition, type safety, and purity. Use frozen dataclasses for configs to prevent mutation.
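For instance, a minimal frozen config might look like this (a hypothetical `CsvConfig`, not part of the harness):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class CsvConfig:
    delimiter: str = ","
    max_rows: int = 1000

cfg = CsvConfig(max_rows=100)
# cfg.max_rows = 5  # raises dataclasses.FrozenInstanceError — config cannot drift after capture
```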
2. Mental Model: Hardcoded vs Reusable¶
2.1 One Picture¶
```
Hardcoded Scripts (Rigid)          Reusable Factories (Flexible)
+-----------------------+          +------------------------------+
| globals/env in fn     |          | closure(frozen_config)       |
|          ↓            |          |          ↓                   |
| one-off, untestable   |          | factory() → fresh iter       |
| reuse = copy-paste    |          | composable, testable         |
+-----------------------+          +------------------------------+
  ↑ Brittle / Globals                ↑ Pure / Configurable
```
2.2 Behavioral Contract¶
| Aspect | Hardcoded (Globals) | Reusable (Closures) |
|---|---|---|
| Config | Globals/hardcode | Explicit frozen params |
| Reuse | Copy-paste | Factory call |
| Purity | Hidden state | Pure functions |
| Equivalence | Fragile | Via properties |
Default to factories; hardcoded scripts are only defensible for tiny, throw-away one-offs.
When Not to Use Closures: for complex state, prefer classes (covered in later cores).
Known Pitfalls:
- Late binding in loops → capture with default args (a worked fix appears in §4 below).
- Mutable captured config → use frozen dataclasses.

Forbidden Patterns:
- Globals in core code.
- Enforce with a CI `grep` for `global`.

Building Blocks Sidebar:
- Closures for currying and config capture.
- `lambda` for simple stages.
- `def` inner functions for complex ones.
Resource Semantics: Stages must handle cleanup (e.g., files close on early stop).
Error Model: Fail-fast; no swallowing. Expose retry as explicit wrappers for sources only.
Backpressure: Filter/map → fence → amplify. Enforce with CI guards.
Taxonomy: Sources (no input, may be effectful), transforms (pure in→out), sinks (side-effects). Retries only on sources; transforms idempotent.
3. Cross-Domain Examples: Proving Scalability¶
To demonstrate reuse beyond RAG, here are production-grade examples using the harness. Each is pure (transforms) or appropriately effectful (sources), configurable, and follows the laws.
Each make_*_pipeline function returns a Transform[...] that you can either call directly (e.g. list(pipeline(xs))) or plug into larger chains via compose(...).
3.1 Example 1: Streaming CSV ETL (Schema Map → Filter → Fence)¶
```python
import csv
from typing import Any, Callable, Iterable, Iterator

def make_csv_source(path: str, *, dialect: str = "excel") -> Source[dict[str, str]]:
    def src() -> Iterator[dict[str, str]]:
        f = open(path, newline="")
        try:
            rdr = csv.DictReader(f, dialect=dialect)
            for row in rdr:
                yield row
        finally:
            f.close()  # runs even if the consumer stops early
    return src

def make_project(cols: dict[str, str]) -> Transform[dict[str, str], dict[str, str]]:
    def stage(rows: Iterable[dict[str, str]]) -> Iterator[dict[str, str]]:
        for r in rows:
            yield {o: r[i] for o, i in cols.items()}
    return stage

def make_cast(spec: dict[str, Callable[[str], Any]], *, strict: bool = True) -> Transform[dict[str, str], dict[str, Any]]:
    def stage(rows: Iterable[dict[str, str]]) -> Iterator[dict[str, Any]]:
        for r in rows:
            out = dict(r)
            try:
                for k, caster in spec.items():
                    out[k] = caster(r[k])
            except Exception:
                if strict:
                    raise
                continue  # non-strict: skip the bad row (or log it)
            yield out
    return stage

def make_csv_pipeline(path: str, max_rows: int) -> Transform[None, dict[str, Any]]:
    src = make_csv_source(path)
    return compose(
        source_to_transform(src),
        ffilter(lambda r: r.get("status") == "active"),
        make_project({"id": "user_id", "amount": "total"}),
        make_cast({"amount": float}),
        fence_k(max_rows),
    )
```
Why it's good: Single-pass, file closes on early stop, fence at the sink, no globals.
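A usage sketch (assumes a local `users.csv` with `status`, `user_id`, and `total` columns; the empty-list argument exists only to satisfy the `Transform[None, ...]` signature):

```python
pipeline = make_csv_pipeline("users.csv", max_rows=100)
for row in pipeline([]):  # the source adapter ignores its input
    print(row["id"], row["amount"])
```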
3.2 Example 2: Log Tail with Regex Filter and Rotation-Safe Reopen¶
```python
import io, os, re, time

def follow(path: str, poll: float = 0.2) -> Iterator[str]:
    f = open(path, "r", encoding="utf8", errors="replace")
    try:
        f.seek(0, io.SEEK_END)
        ino = os.fstat(f.fileno()).st_ino
        while True:
            line = f.readline()
            if line:
                yield line.rstrip("\n")
            else:
                time.sleep(poll)
                try:
                    if os.stat(path).st_ino != ino:  # file was rotated
                        f.close()
                        f = open(path, "r", encoding="utf8", errors="replace")
                        ino = os.fstat(f.fileno()).st_ino
                except FileNotFoundError:
                    time.sleep(poll)
    finally:
        f.close()  # guaranteed cleanup even on early stop

def make_log_source(path: str) -> Source[str]:
    def src() -> Iterator[str]:
        yield from follow(path)
    return src

def make_regex_filter(pattern: str) -> Transform[str, str]:
    rx = re.compile(pattern)
    return ffilter(rx.search)  # a truthy Match passes the filter

def make_log_pipeline(path: str, pattern: str, k: int) -> Transform[None, str]:
    src = make_log_source(path)
    return compose(
        source_to_transform(src),
        make_regex_filter(pattern),
        fence_k(k),
    )
```
Why it's good: Cleanup guaranteed, rotation handled, bounded output.
3.3 Example 3: API Pagination (Generator Source + Explicit Retry)¶
```python
from typing import Any, Dict, Callable, Iterator
from time import sleep

def pager(fetch_page: Callable[[str | None], Dict[str, Any]], *, attempts: int = 3) -> Iterator[Dict[str, Any]]:
    token = None
    while True:
        tries = 0
        while tries < attempts:
            try:
                page = fetch_page(token)
                break
            except Exception:
                tries += 1
                sleep(0.5 * tries)  # linear backoff, local to the source
        else:
            raise RuntimeError(f"page fetch failed after {attempts} attempts")
        for item in page["items"]:
            yield item
        token = page.get("next")
        if not token:
            return

def make_api_pipeline(
    fetch_page: Callable[[str | None], Dict[str, Any]],
    pred: Callable[[Dict[str, Any]], bool],
    k: int,
) -> Transform[None, Dict[str, Any]]:
    raw_src: Source[Dict[str, Any]] = lambda: pager(fetch_page, attempts=3)
    return compose(
        source_to_transform(raw_src),
        ffilter(pred),
        fence_k(k),
    )
```
Why it's good: Retries explicit and local to page; no hidden loops or duplicates.
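Because retry lives right next to `fetch_page`, the pager is easy to exercise with a stub (hypothetical in-memory pages, no network):

```python
def make_stub(pages):
    # pages: token -> page dict; simulates a paginated API
    def fetch_page(token):
        return pages[token]
    return fetch_page

pages = {
    None: {"items": [1, 2], "next": "p2"},
    "p2": {"items": [3], "next": None},
}
assert list(pager(make_stub(pages))) == [1, 2, 3]
```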
3.4 Example 4: Telemetry – Sliding Windows per Device (Contiguity Contract)¶
```python
from collections import deque
from itertools import groupby
from operator import itemgetter
from collections.abc import Hashable
from typing import Dict, Callable, Iterable, Iterator

def sliding(w: int) -> Transform[Dict, tuple[Dict, ...]]:
    def stage(xs: Iterable[Dict]) -> Iterator[tuple[Dict, ...]]:
        buf = deque(maxlen=w)
        for x in xs:
            buf.append(x)
            if len(buf) == w:
                yield tuple(buf)
    return stage

def ensure_contiguous(key: Callable[[Dict], Hashable]) -> Transform[Dict, Dict]:
    def stage(xs: Iterable[Dict]) -> Iterator[Dict]:
        seen, prev = set(), object()
        for i, x in enumerate(xs):
            k = key(x)
            if k != prev and k in seen:
                raise ValueError(f"Non-contiguous key {k!r} at index {i}")
            seen.add(k); prev = k
            yield x
    return stage

def make_rolling_avg_by_device(w: int) -> Transform[Dict, Dict]:
    def stage(xs: Iterable[Dict]) -> Iterator[Dict]:
        key = itemgetter("device_id")
        xs = ensure_contiguous(key)(xs)
        for did, grp in groupby(xs, key=key):
            for window in sliding(w)(grp):
                avg = sum(pt["value"] for pt in window) / w
                yield {"device_id": did, "avg": avg, "end_ts": window[-1]["ts"]}
    return stage
```
Why it's good: Bounded memory O(w), explicit contiguity guard, single pass.
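A quick in-memory check (hypothetical readings; the fields match the stage's `device_id`/`value`/`ts` contract):

```python
readings = [
    {"device_id": "a", "ts": 1, "value": 1.0},
    {"device_id": "a", "ts": 2, "value": 3.0},
    {"device_id": "a", "ts": 3, "value": 5.0},
    {"device_id": "b", "ts": 1, "value": 2.0},
    {"device_id": "b", "ts": 2, "value": 4.0},
]
avg = make_rolling_avg_by_device(2)
assert list(avg(readings)) == [
    {"device_id": "a", "avg": 2.0, "end_ts": 2},
    {"device_id": "a", "avg": 4.0, "end_ts": 3},
    {"device_id": "b", "avg": 3.0, "end_ts": 2},
]
```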
3.5 Example 5: Filesystem Stream (Walk → Filter → Hash) Without Materialization¶
```python
import os, hashlib

def make_walk_source(root: str) -> Source[str]:
    def src() -> Iterator[str]:
        for dirpath, _, files in os.walk(root):
            for fn in files:
                yield os.path.join(dirpath, fn)
    return src

def make_ext_filter(exts: set[str]) -> Transform[str, str]:
    return ffilter(lambda p: os.path.splitext(p)[1].lower() in exts)

def make_sha256() -> Transform[str, tuple[str, str]]:
    def stage(paths: Iterable[str]) -> Iterator[tuple[str, str]]:
        for p in paths:
            h = hashlib.sha256()
            with open(p, "rb") as f:
                for chunk in iter(lambda: f.read(1024 * 1024), b""):
                    h.update(chunk)
            yield (p, h.hexdigest())
    return stage
```
Why it's good: No path lists; file handles close; chunked IO.
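Composing the three stages (the `./data` root and the extension set are placeholders):

```python
hash_pipeline = compose(
    source_to_transform(make_walk_source("./data")),
    make_ext_filter({".csv", ".json"}),
    make_sha256(),
    fence_k(100),  # bound the scan
)
for path, digest in hash_pipeline([]):  # the source adapter ignores its input
    print(path, digest)
```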
3.6 Example 6: Text N-Grams (Closure Config, Fence, Determinism)¶
```python
import re

def make_tokenize(rx: str = r"\w+") -> Transform[str, list[str]]:
    pat = re.compile(rx)
    return fmap(lambda s: pat.findall(s.lower()))

def make_ngrams(n: int) -> Transform[list[str], tuple[str, ...]]:
    def stage(tokens_iterables: Iterable[list[str]]) -> Iterator[tuple[str, ...]]:
        for toks in tokens_iterables:
            for i in range(len(toks) - n + 1):
                yield tuple(toks[i:i + n])
    return stage

def make_ngram_pipeline(n: int, k: int) -> Transform[str, tuple[str, ...]]:
    return compose(
        make_tokenize(),
        make_ngrams(n),
        fence_k(k),
    )
```
Why it's good: Configurable amplification; fence prevents explosion.
3.7 Running Project: Reusable Stages in RAG (One Among Many)¶
For continuity, apply to RAG (from Core 5):
```python
from collections.abc import Iterable, Iterator, Callable
from rag_types import RawDoc, RagEnv, ChunkWithoutEmbedding
from core2 import gen_clean_docs
from core5 import gen_bounded_chunks

def make_gen_rag_fn(env: RagEnv, max_chunks: int) -> Callable[[Iterable[RawDoc]], Iterator[ChunkWithoutEmbedding]]:
    """Config → (docs -> chunks). Pure, reusable, single-pass."""
    def pipe(docs: Iterable[RawDoc]) -> Iterator[ChunkWithoutEmbedding]:
        cleaned = gen_clean_docs(docs)
        yield from gen_bounded_chunks(cleaned, env, max_chunks=max_chunks)
    return pipe

# Reusable: variants from one factory (docs is an Iterable[RawDoc] from your loader)
rag_512 = make_gen_rag_fn(RagEnv(512), 1000)
chunks_512 = list(rag_512(docs))
rag_256 = make_gen_rag_fn(RagEnv(256), 500)
chunks_256 = list(rag_256(docs))

# Integration with the generic harness (optional but recommended)
rag_stage: Transform[RawDoc, ChunkWithoutEmbedding] = make_gen_rag_fn(env, max_chunks)
pipeline = compose(rag_stage, some_downstream_stage)
```
Wins: Configurable; testable. Integrates with harness via compose if needed.
4. Anti-Patterns and Fixes¶
- Hidden Materialization: `list(...)` in a stage breaks laziness. Fix: stream, and fence at the caller.
- Late Binding in Loops: `stages.append(lambda x: f(x, k))` inside a loop uses the last `k`. Fix: capture `k` via a default arg or an inner factory (see the sketch below).
- Implicit Retries: retrying inside arbitrary stages surprises callers. Fix: provide page-level retry for sources only.
- Shared Mutable Env: two factories capturing the same mutable config cause cross-talk. Fix: frozen dataclasses or copy-on-capture.
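The late-binding fix in miniature — a self-contained sketch (toy `add` helper, not from the repo):

```python
def add(x: int, k: int) -> int:
    return x + k

broken = [lambda x: add(x, k) for x in [0] for k in range(3)] if False else [lambda x: add(x, k) for k in range(3)]
fixed = [lambda x, k=k: add(x, k) for k in range(3)]

assert [f(0) for f in broken] == [2, 2, 2]  # every lambda sees the final k
assert [f(0) for f in fixed] == [0, 1, 2]   # each lambda captured its own k
```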
5. Equational Reasoning: Substitution Exercise¶
Hand Exercise: take an inline, hardcoded stage (from any example, e.g., CSV) and substitute it step by step into a configurable factory; a worked before/after follows below.
Bug Hunt: find the hardcoded global; make the factory take that config explicitly.
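A worked before/after to start from (hypothetical `amount` rows; the global `THRESHOLD` is the smell):

```python
# Before: hardcoded, untestable
THRESHOLD = 10.0  # global config smell

def big_amounts(rows):
    for r in rows:
        if float(r["amount"]) > THRESHOLD:
            yield r

# After: factory with explicit config — same behavior, reusable
def make_big_amounts(threshold: float):
    def stage(rows):
        for r in rows:
            if float(r["amount"]) > threshold:
                yield r
    return stage
```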
6. Property-Based Testing: Proving Equivalence (Advanced, Optional)¶
6.1 Custom Strategy¶
As in Core 5: reuse the `raw_doc_st` and `env_st` strategies. A hypothetical sketch follows if you need a starting shape.
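The constructor shapes below are assumptions for illustration only; the real `RawDoc`/`RagEnv` fields live in `rag_types`, so adjust the `st.builds` arguments to match:

```python
from hypothesis import strategies as st
from rag_types import RawDoc, RagEnv

# Assumed constructor shapes — adapt to the actual dataclasses.
raw_doc_st = st.builds(RawDoc, st.text(min_size=1))
env_st = st.builds(RagEnv, st.sampled_from([128, 256, 512]))
```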
6.2 Equivalence Property¶
```python
from hypothesis import given, strategies as st

@given(st.lists(raw_doc_st, max_size=40), env_st, st.integers(100, 1000))
def test_factory_parity(docs, env, max_chunks):
    rag_fn = make_gen_rag_fn(env, max_chunks)
    assert list(rag_fn(docs)) == list(gen_bounded_chunks(gen_clean_docs(docs), env, max_chunks=max_chunks))

@given(st.lists(raw_doc_st, max_size=50), env_st, st.integers(1, 500))
def test_freshness(docs, env, cap):
    mk = make_gen_rag_fn(env, cap)
    a, b = mk(docs), mk(docs)
    # Interleaved consumption of two fresh iterators must show no cross-talk:
    # each must yield exactly what a standalone run yields.
    got_a, got_b = [], []
    for xa, xb in zip(a, b):
        got_a.append(xa)
        got_b.append(xb)
    expect = list(mk(docs))
    assert got_a == expect and got_b == expect

@given(st.lists(raw_doc_st, max_size=50), env_st, st.integers(1, 500))
def test_determinism(docs, env, cap):
    mk = make_gen_rag_fn(env, cap)
    assert list(mk(docs)) == list(mk(docs))
```
Note: all laws are phrased in terms of new iterator instances from the factory or source; once an iterator is consumed, Python’s single-pass semantics still apply.
6.3 Additional Tests for Examples¶
```python
from hypothesis import given, strategies as st
from itertools import islice  # already imported earlier in the module; repeated here for test-file completeness

@given(st.lists(st.text(), max_size=50))
def test_ngram_determinism(lines):
    mk = make_ngram_pipeline(3, 1000)
    assert list(mk(lines)) == list(mk(lines))

@given(st.lists(st.text(), max_size=50))
def test_ngram_prefix_equiv(lines):
    fenced = make_ngram_pipeline(3, 10)
    unfenced = compose(make_tokenize(), make_ngrams(3))
    assert list(fenced(lines)) == list(islice(unfenced(lines), 10))
```
Note: Properties prove laws across domains, including prefix equivalence, determinism, and early-stop cleanup.
6.4 Shrinking Demo¶
Bad (globals): a globals-based stage passes in isolation but fails across config variants, breaking purity (P1); Hypothesis shrinks the failure to a minimal counterexample.
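A self-contained stand-in for the RAG version (toy `scaled_global` stage; `K` is the offending global):

```python
from hypothesis import given, strategies as st

K = 2  # global config — violates P1

def scaled_global(xs):
    for x in xs:
        yield x * K

@given(st.lists(st.integers(), max_size=20), st.integers(1, 5))
def test_variant_parity(xs, k):
    # Fails whenever k != K; Hypothesis shrinks to something like xs=[1], k=1.
    assert list(scaled_global(xs)) == [x * k for x in xs]
```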
7. When Reusability Isn't Worth It¶
For tiny, throw-away one-offs; everywhere else, use factories.
8. Pre-Core Quiz¶
- Factory for? → Reusable.
- Closures? → Capture config.
- Globals? → Avoid.
- Equiv? → Preserved.
- Reuse? → Fresh iter.
9. Post-Core Reflection & Exercise¶
Reflect: find a hardcoded pipeline in your own code; refactor it into a factory.
Project Exercise: Build a pipeline from an example (e.g., CSV or logs); test variants.
Final Notes:
- Stages are pure adapters; retries, resource lifetimes, and fences are explicit wrappers.
- Always document contiguity and boundedness per stage.
- Prefer explicit composition (`compose`) over opaque chains.
- For concurrency, capture immutable configs; never share open handles across factory instances.
Next: M03C07 – Fan-In and Fan-Out for Streams. (Builds on this.)
Repository Alignment¶
- Implementation: `module-03/funcpipe-rag-03/src/funcpipe_rag/api/config.py::make_gen_rag_fn`.
- Tests: `module-03/funcpipe-rag-03/tests/test_module_03.py::test_make_gen_rag_fn_equivalence` / `::test_make_gen_rag_fn_curries_arguments`.
itertools Decision Table – Use This¶
| Tool | Use When | Memory | Pitfall | Safe? |
|---|---|---|---|---|
| chain | Concat many iterables | O(1) | None | Yes |
| groupby | Group contiguous equal items | O(1) | Must sort first if not contiguous | Yes |
| tee | Multiple consumers of same iterator | O(skew) | Unbounded skew → memory explosion | Careful |
| islice | Take/skip a bounded slice lazily | O(1) | Consumes skipped items from the source | Yes |
| accumulate | Running totals/reductions | O(1) | Default op is + | Yes |
| compress | Filter by boolean mask | O(1) | None | Yes |
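Two of the table's pitfalls, demonstrated with the standard library:

```python
from itertools import groupby, islice

# groupby only groups contiguous runs — sort first if input is unsorted.
data = ["a", "b", "a"]
assert [k for k, _ in groupby(data)] == ["a", "b", "a"]
assert [k for k, _ in groupby(sorted(data))] == ["a", "b"]

# islice consumes what it takes: the source iterator has advanced.
it = iter(range(10))
assert list(islice(it, 3)) == [0, 1, 2]
assert next(it) == 3  # items 0-2 are gone from the source
```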
Further Reading: For the deepest itertools mastery, see the official docs and Dan Bader’s ‘Python Tricks’ chapter on iterators.
You now own the most powerful lazy streaming toolkit in Python. Module 4 will show you how to make even failure and resource cleanup lazy and pure.