Module 3: Lazy Iteration and Streaming
Progression Note
By the end of Module 3, you will master lazy generators, itertools, and streaming pipelines that never materialize unnecessary data. This prepares you for safe recursion and error handling in streams (Module 4). See the series progression map in the repo root for full details.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 2 | First-Class Functions & Expressive Python | Configurable pure pipelines without globals |
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting |
| 4 | Recursion & Error Handling in Streams | Safe recursion, Result/Option, streaming errors |
M03C09: Designing Custom Iterator Types – Classes Implementing __iter__ / __next__
Core question:
How do you design custom iterator classes that implement __iter__ and __next__ for complex stateful logic, ensuring purity, laziness, and equivalence while enabling reuse beyond simple generators?
This core builds on Core 8's time-aware patterns by introducing custom iterator classes:
- Use separate Iterable factories and Iterator cursors for reusability.
- Implement __iter__ on factories (returning a fresh cursor) and __next__ on cursors (whose own __iter__ returns self).
- Handle resources with context managers and .close().
- Preserve laziness, purity, and freshness.
We extend the running project from Core 8 (FuncPipe RAG Builder from m03-rag.md) and add cross-domain examples like stateful CSV readers, log followers with state, and API pagers to prove scalability.
Audience: Developers needing complex, stateful streams beyond generators.
Outcomes:
1. Spot generator limits, such as lack of reuse.
2. Build a class iterator in under 15 lines.
3. Prove the iterator laws with Hypothesis.
Laws (frozen, used across this core):
- E1 (Equivalence): list(iter(class_factory(S))) == list(gen_equiv(S)).
- P1 (Purity): no globals; all state is explicit.
- R1 (Reusability): for any iterable X, iter(X) is not iter(X), and both iterators produce identical sequences.
- I1a (Iterator parity): iter(it) is it, and after exhaustion next(it) raises StopIteration immediately.
- I1b (Iterable parity): iter(X) is not iter(X) and list(iter(X)) == list(iter(X)).
- CL1 (Cleanup): resources are released on .close() or __exit__.
- DTR (Determinism): equal init/state → equal outputs.
- FR (Freshness): separate factory calls produce independent cursors.
Iterator vs Iterable in Python (memorise):
- Iterable: Has __iter__ returning an iterator (may be self or fresh cursor). Supports for x in obj: and iter(obj).
- Iterator: Has __next__ (raise StopIteration at end) and __iter__ returning self. Single-pass; exhausted after consumption.
Factories are Iterable; cursors are Iterator.
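A quick way to internalise the distinction is to poke at a built-in list (an Iterable) and the list iterator it hands out (an Iterator). A minimal sketch using only built-ins:

```python
# A list is an Iterable: every iter() call returns a fresh cursor.
xs = [1, 2, 3]
a, b = iter(xs), iter(xs)
assert a is not b                   # fresh cursors (I1b)
assert list(a) == list(b) == xs     # identical sequences

# A list iterator is an Iterator: iter() hands back the same object.
it = iter(xs)
assert iter(it) is it               # iterator parity (I1a)
assert next(it) == 1
assert list(it) == [2, 3]           # single-pass: list() continues where next() stopped
assert next(it, "done") == "done"   # exhausted: the default replaces StopIteration
```

Custom factories and cursors in this core mirror exactly this pair.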
1. Conceptual Foundation
1.1 The One-Sentence Rule
Use separate Iterable factories and Iterator cursors when you need stateful, reusable iteration with explicit cleanup that a plain generator does not give you.
1.2 Custom Iter in One Precise Sentence
Iterable factories return fresh iterators; iterators implement the __next__ logic and have __iter__ return self.
In this series, the pattern gives explicit control over resources while preserving laziness.
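1.3 Why This Matters Now
A generator object is one-shot: once exhausted, it stays exhausted. A class-based Iterable hands out a fresh cursor on every iter() call, and each cursor keeps its state in explicit attributes that can be inspected, tested, and cleaned up.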
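The one-shot behaviour is easy to see in a few lines. A sketch with a hypothetical countdown generator function:

```python
def countdown(n: int):
    # Generator function: each *call* creates a new, independent generator object.
    while n > 0:
        yield n
        n -= 1

gen = countdown(3)
assert list(gen) == [3, 2, 1]
assert list(gen) == []                  # the same generator object is now exhausted
assert iter(gen) is gen                 # a generator object is an Iterator, not a factory
assert list(countdown(3)) == [3, 2, 1]  # calling the function again restarts it
```

Re-calling the function works, but every consumer must know to do it; handing `gen` to two consumers silently gives the second one nothing. An Iterable class moves that "re-call" into __iter__.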
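1.4 Custom Iter in a Few Lines
Class example:
class MyIterable:
    def __init__(self, data):
        self.data = data
    def __iter__(self):
        return MyIter(self.data)  # fresh cursor on every iter() call
class MyIter:
    def __init__(self, data):
        self.data = data
        self.i = 0  # cursor position: the only mutable state
    def __iter__(self):
        return self
    def __next__(self):
        if self.i >= len(self.data):
            raise StopIteration
        val = self.data[self.i]
        self.i += 1
        return val
Reusable: every iter() call on a MyIterable returns a new MyIter cursor, so the same object can be looped over any number of times.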
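Under the hood, a for loop calls iter() once and then next() until StopIteration. A sketch of that desugaring, where `manual_for` is a hypothetical helper name (any Iterable works as input, including MyIterable above):

```python
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")

def manual_for(iterable: Iterable[T], body: Callable[[T], None]) -> None:
    """Roughly what `for x in iterable: body(x)` does."""
    cursor = iter(iterable)   # calls iterable.__iter__()
    while True:
        try:
            x = next(cursor)  # calls cursor.__next__()
        except StopIteration:
            break             # normal end of iteration
        body(x)               # outside the try: a StopIteration from body is not "end of loop"

seen: list[int] = []
manual_for(range(3), seen.append)
assert seen == [0, 1, 2]
```

This is why a factory only needs __iter__ and a cursor only needs __next__ (plus __iter__ returning self): the loop never asks for anything else.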
1.5 Minimal Iter Harness (Extends Core 8)
Build on Core 8; add class patterns:
from typing import Iterator, Iterable, TypeVar
T = TypeVar("T")
class BaseIterable(Iterable[T]):
    """Factory: __iter__ must return a fresh cursor."""
    def __iter__(self) -> Iterator[T]:
        raise NotImplementedError
class BaseIter(Iterator[T]):
    """Cursor: single-pass; __iter__ returns self."""
    def __iter__(self) -> "BaseIter[T]":
        return self
    def __next__(self) -> T:
        raise NotImplementedError
    def close(self) -> None:
        """Release resources; override in resource-owning cursors."""
        pass
Use as base; e.g., class MyIterable(BaseIterable[T]): ...
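A minimal sketch of using the harness, assuming a hypothetical Countdown source whose cursor records a `closed` flag in place of a real file or socket. contextlib.closing (standard library) calls .close() when the with block exits, which suits cursors that define close() but not __enter__/__exit__:

```python
from contextlib import closing
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

# Harness from above, repeated so this block runs on its own.
class BaseIterable(Iterable[T]):
    def __iter__(self) -> Iterator[T]:
        raise NotImplementedError

class BaseIter(Iterator[T]):
    def __iter__(self) -> "BaseIter[T]":
        return self
    def __next__(self) -> T:
        raise NotImplementedError
    def close(self) -> None:
        pass

class Countdown(BaseIterable[int]):
    """Hypothetical example source: yields n, n-1, ..., 1."""
    def __init__(self, n: int):
        self._n = n
    def __iter__(self) -> "CountdownIter":
        return CountdownIter(self._n)  # fresh cursor per call (R1)

class CountdownIter(BaseIter[int]):
    def __init__(self, n: int):
        self._n = n
        self.closed = False
    def __next__(self) -> int:
        if self.closed or self._n <= 0:
            raise StopIteration
        self._n -= 1
        return self._n + 1
    def close(self) -> None:
        self.closed = True  # a real cursor would release its handle here

src = Countdown(3)
assert list(src) == list(src) == [3, 2, 1]  # reusable factory
with closing(iter(src)) as it:
    assert next(it) == 3
assert it.closed                            # close() ran when the block exited
```

The factory holds only configuration (n); all mutable state lives on the cursor, which is what keeps R1 and FR cheap to satisfy.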
2. Mental Model: Generator vs Class Iter
2.1 One Picture
Generators (Simple) Class Iters (Powerful)
+-----------------------+ +------------------------------+
| one-shot, no reuse | | stateful, reusable |
| ↓ | | ↓ |
| no cleanup control | | .close() resources, errors |
| lightweight | | testable, composable |
+-----------------------+ +------------------------------+
↑ Limited / Stateless ↑ Flexible / Stateful
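2.2 Behavioral Contract
| Aspect | Generators | Class Iters |
|---|---|---|
| Reuse | No (a generator object is exhausted after one pass) | Yes (fresh cursor per iter()) |
| Cleanup | try/finally, run on .close() or garbage collection | Explicit .close() / __exit__ |
| State | Suspended frame (locals) | Explicit attributes |
| Equivalence | Usually the reference (gen_equiv) | Checked against the reference with property tests (E1) |
Note on Generator Choice: use a generator when the logic is simple; otherwise use a class.
When Not to Use a Class: if there is no state to manage, use a generator.
Known Pitfalls:
- Forgetting to have __iter__ return self on an iterator.
- Mutable state leaking between cursors (e.g., two cursors sharing one mutable buffer).
Forbidden Patterns:
- For iterators: __iter__ not returning self.
- For iterables: __iter__ returning self (violates R1/I1b).
- Enforce both with the parity tests in Section 6; static type checks cannot verify object identity.
Building Blocks Sidebar:
- For iterators: __iter__ returns self.
- For iterables: __iter__ returns a fresh cursor.
- __next__ computes the next value or raises StopIteration.
- .close() releases resources.
Resource Semantics: cursors that own resources release them in .close(), which should be safe to call more than once.
Error Model: exceptions raised in __next__ propagate to the consumer; cleanup must still run (via .close() or __exit__).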
Purity Note: Sources (files/APIs/logs) are effectful; purity claims apply to transforms. Cleanup is explicit via .close()/context managers.
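Taken together, the resource semantics and the error model mean that an exception raised by the consumer or by __next__ must not skip cleanup, and close() must tolerate repeated calls. A minimal sketch; TrackedCursor is a hypothetical stand-in for a cursor that owns a file or socket, with a counter in place of a real handle:

```python
class TrackedCursor:
    """Hypothetical cursor over a list that records how often it was closed."""
    def __init__(self, items: list[int]):
        self._items = iter(items)
        self.closed = False
        self.close_calls = 0
    def __iter__(self) -> "TrackedCursor":
        return self
    def __next__(self) -> int:
        if self.closed:
            raise StopIteration        # a closed cursor stays exhausted (I1a)
        return next(self._items)
    def close(self) -> None:
        if not self.closed:            # idempotent: a second call does nothing
            self.closed = True
            self.close_calls += 1
    def __enter__(self) -> "TrackedCursor":
        return self
    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()                   # runs on normal exit and on exceptions

cur = TrackedCursor([1, 2, 3])
try:
    with cur:
        for x in cur:
            if x == 2:
                raise ValueError("consumer failed mid-stream")
except ValueError:
    pass                               # __exit__ returned None, so the error propagated
assert cur.closed and cur.close_calls == 1
cur.close()
assert cur.close_calls == 1
assert next(cur, None) is None
```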
3. Cross-Domain Examples: Proving Scalability
Production-oriented examples that follow the harness's factory/cursor split. Each keeps its state explicit, and cursors that own resources clean them up.
3.1 Example 1: Stateful CSV Reader (Class Iter)
from __future__ import annotations
from typing import Iterator, Iterable, Dict
import csv
from io import TextIOBase
class CsvRows(Iterable[Dict[str, str]]):
    """Factory over a CSV file; DictReader turns the first row into header keys."""
    def __init__(self, path: str, *, dialect: str = "excel"):
        self._path = path
        self._dialect = dialect
    def __iter__(self) -> Iterator[Dict[str, str]]:
        return _CsvRowsIter(self._path, self._dialect)  # fresh cursor, fresh file handle
class _CsvRowsIter(Iterator[Dict[str, str]]):
    def __init__(self, path: str, dialect: str):
        self._path = path
        self._dialect = dialect
        self._f: TextIOBase | None = None
        self._rdr: csv.DictReader | None = None
        self._done = False  # set once exhausted or closed; never reopen afterwards (I1a)
    def __iter__(self) -> "_CsvRowsIter":
        return self
    def __enter__(self) -> "_CsvRowsIter":
        self._open()
        return self
    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()
    def _open(self) -> None:
        if self._f is None and not self._done:
            self._f = open(self._path, newline="")
            self._rdr = csv.DictReader(self._f, dialect=self._dialect)
    def __next__(self) -> Dict[str, str]:
        if self._done:
            raise StopIteration
        if self._rdr is None:
            self._open()  # lazy open: plain for-loops work without a with block
        try:
            return next(self._rdr)  # type: ignore[arg-type]
        except StopIteration:
            self.close()  # release the handle as soon as the data runs out
            raise
    def close(self) -> None:
        self._done = True
        if self._f is not None:
            self._f.close()
            self._f = None
        self._rdr = None
Why it's good: the file opens lazily on the first next(), so plain iteration works; it closes on natural exhaustion or on close(), and the _done flag keeps an exhausted cursor exhausted instead of silently reopening the file. A plain for loop that breaks early leaves the handle open until garbage collection, so use the with form when you may stop early.
Usage (process and done are placeholders for your own logic):
# Plain iteration (closes on natural exhaustion)
for row in CsvRows("data.csv"):
    process(row)
# Early stop with guaranteed cleanup: __exit__ calls close()
with iter(CsvRows("data.csv")) as rows:
    for row in rows:
        process(row)
        if done:
            break
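Why wrap csv.DictReader at all? Because DictReader is itself a one-shot Iterator over an already-open file. A quick check, with io.StringIO standing in for the file:

```python
import csv
import io

buf = io.StringIO("name,qty\napple,3\npear,5\n")
rdr = csv.DictReader(buf)
assert iter(rdr) is rdr                             # DictReader is a one-shot Iterator
assert next(rdr) == {"name": "apple", "qty": "3"}   # first row became the keys
assert [r["name"] for r in rdr] == ["pear"]
assert next(rdr, None) is None                      # exhausted: a second pass needs a new reader
```

CsvRows turns that one-shot object into a factory by opening the file and building a new DictReader inside each cursor.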
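3.2 Example 2: Stateful Log Follower (Class Iter)
from __future__ import annotations
import io, os, time
from typing import Iterable, Iterator
class LogFollower(Iterable[str]):
    """Factory for tail -F style followers; each cursor starts at the current end of the file."""
    def __init__(self, path: str, poll: float = 0.2):
        self.path = path
        self.poll = poll
    def __iter__(self) -> Iterator[str]:
        return _LogFollowerIter(self.path, self.poll)
class _LogFollowerIter(Iterator[str]):
    def __init__(self, path: str, poll: float):
        self.path = path
        self.poll = poll
        self._f: io.TextIOBase | None = None
        self._ino: int | None = None
        self._closed = False
    def __iter__(self) -> "_LogFollowerIter":
        return self
    def __enter__(self) -> "_LogFollowerIter":
        self._open(at_end=True)
        return self
    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()
    def _open(self, *, at_end: bool) -> None:
        # Open the new handle first, so a failed open leaves the old one usable.
        f = open(self.path, "r", encoding="utf8", errors="replace")
        if at_end:
            f.seek(0, io.SEEK_END)
        ino = os.fstat(f.fileno()).st_ino
        if self._f is not None:
            self._f.close()
        self._f, self._ino = f, ino
    def __next__(self) -> str:
        if self._closed:
            raise StopIteration
        if self._f is None:
            self._open(at_end=True)  # only follow lines written from now on
        while True:
            line = self._f.readline()
            if line:
                return line.rstrip("\n")
            time.sleep(self.poll)
            try:
                if os.stat(self.path).st_ino != self._ino:
                    # Rotated: the path now names a new file; read it from the start.
                    self._open(at_end=False)
            except FileNotFoundError:
                pass  # mid-rotation the path can be briefly missing; keep polling
    def close(self) -> None:
        self._closed = True
        if self._f is not None:
            self._f.close()
            self._f = None
Why it's good: the cursor owns the file handle and the inode it is following, so it survives log rotation and releases the handle in close(). The stream never ends on its own; bound it (for example with itertools.islice) or consume it inside a with block.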
3.3 Example 3: Stateful API Pager
from typing import Any, Callable, Iterable, Iterator, Optional
class ApiPager(Iterable[dict[str, Any]]):
    """fetch_page(token) returns {"items": [...], "next": token or None}; token None requests the first page."""
    def __init__(self, fetch_page: Callable[[Optional[str]], dict[str, Any]]):
        self._fetch_page = fetch_page
    def __iter__(self) -> Iterator[dict[str, Any]]:
        return _ApiPagerIter(self._fetch_page)
class _ApiPagerIter(Iterator[dict[str, Any]]):
    def __init__(self, fetch_page: Callable[[Optional[str]], dict[str, Any]]):
        self._fetch_page = fetch_page
        self._token: Optional[str] = None  # token of the next page to fetch
        self._current_items: list[dict[str, Any]] = []
        self._idx: int = 0  # position within the current page
        self._done: bool = False  # True once the last page has been fetched
    def __iter__(self) -> "_ApiPagerIter":
        return self
    def __next__(self) -> dict[str, Any]:
        # Fetch pages until one has unread items; empty intermediate pages are skipped.
        while self._idx >= len(self._current_items):
            if self._done:
                raise StopIteration
            page = self._fetch_page(self._token)
            self._current_items = page.get("items", [])
            self._idx = 0
            self._token = page.get("next")
            if not self._token:
                self._done = True
        item = self._current_items[self._idx]
        self._idx += 1
        return item
Why it's good: Stateful token + intra-page cursor; no item loss or duplicate pages.
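Law E1 needs a generator reference to compare against. A minimal sketch, assuming the same page contract as ApiPager; gen_pages, PAGES and fake_fetch are hypothetical names, and the call log doubles as a laziness check:

```python
from itertools import islice
from typing import Any, Callable, Iterator, Optional

def gen_pages(fetch_page: Callable[[Optional[str]], dict[str, Any]]) -> Iterator[dict[str, Any]]:
    """Generator reference for ApiPager (E1)."""
    token: Optional[str] = None
    while True:
        page = fetch_page(token)
        yield from page.get("items", [])
        token = page.get("next")
        if not token:
            return

# Hypothetical in-memory API: token -> page.
PAGES: dict[Optional[str], dict[str, Any]] = {
    None: {"items": [{"id": 1}, {"id": 2}], "next": "p2"},
    "p2": {"items": [], "next": "p3"},  # empty page that still has a next token
    "p3": {"items": [{"id": 3}]},       # no "next": last page
}
calls: list[Optional[str]] = []

def fake_fetch(token: Optional[str]) -> dict[str, Any]:
    calls.append(token)
    return PAGES[token]

assert [d["id"] for d in islice(gen_pages(fake_fetch), 2)] == [1, 2]
assert calls == [None]                  # lazy: two items needed only the first page
calls.clear()
assert [d["id"] for d in gen_pages(fake_fetch)] == [1, 2, 3]
assert calls == [None, "p2", "p3"]
```

With the class above in scope, E1 for this input is `list(ApiPager(fake_fetch)) == list(gen_pages(fake_fetch))`.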
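3.4 Example 4: Stateful Telemetry Window
from collections import deque
from typing import Iterable, Iterator
class RollingAvgSource(Iterable[dict]):
    def __init__(self, src: Iterable[dict], w: int):
        if w < 1:
            raise ValueError("window size w must be >= 1")
        self._src = src
        self._w = w
    def __iter__(self) -> Iterator[dict]:
        return RollingAvgIter(self._src, self._w)
class RollingAvgIter(Iterator[dict]):
    def __init__(self, src: Iterable[dict], w: int):
        self._src = iter(src)
        self._w = w
        self._buf: deque[dict] = deque(maxlen=w)  # the oldest record drops off automatically
    def __iter__(self) -> "RollingAvgIter":
        return self
    def __next__(self) -> dict:
        if len(self._buf) < self._w:
            # First window: fill the buffer (StopIteration propagates if src is too short).
            while len(self._buf) < self._w:
                self._buf.append(next(self._src))
        else:
            # Slide the window by one record.
            self._buf.append(next(self._src))
        avg = sum(d["value"] for d in self._buf) / self._w
        return {"avg": avg, "end_ts": self._buf[-1]["ts"]}
Why it's good: the window lives in an explicit buffer, and each iter(RollingAvgSource(...)) builds a fresh one. Freshness also requires src itself to be re-iterable (a list or another factory); a one-shot generator upstream is drained by the first cursor.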
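A generator reference for E1, assuming records carry "value" and "ts" keys as in the class; rolling_avg_gen is a hypothetical name. The last lines show the upstream-freshness caveat in action:

```python
from collections import deque
from itertools import islice
from typing import Iterable, Iterator

def rolling_avg_gen(src: Iterable[dict], w: int) -> Iterator[dict]:
    """Generator reference for RollingAvgIter (E1)."""
    if w < 1:  # a generator raises this on the first next(), not at call time
        raise ValueError("window size w must be >= 1")
    it = iter(src)
    buf: deque[dict] = deque(islice(it, w), maxlen=w)  # first window
    if len(buf) < w:
        return  # fewer than w records: emit nothing, like the class
    yield {"avg": sum(r["value"] for r in buf) / w, "end_ts": buf[-1]["ts"]}
    for rec in it:
        buf.append(rec)  # maxlen evicts the oldest record
        yield {"avg": sum(r["value"] for r in buf) / w, "end_ts": buf[-1]["ts"]}

records = [{"ts": t, "value": v} for t, v in enumerate([1.0, 2.0, 3.0, 4.0])]
out = list(rolling_avg_gen(records, 2))
assert out == [{"avg": 1.5, "end_ts": 1}, {"avg": 2.5, "end_ts": 2}, {"avg": 3.5, "end_ts": 3}]

# A list upstream can be re-read; a one-shot generator cannot.
assert list(rolling_avg_gen(records, 2)) == out
once = (r for r in records)
assert len(list(rolling_avg_gen(once, 2))) == 3
assert list(rolling_avg_gen(once, 2)) == []
```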
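3.5 Example 5: Stateful FS Walker
import os
from collections import deque
from typing import Iterable, Iterator
class FsWalker(Iterable[str]):
    def __init__(self, root: str):
        self.root = root
    def __iter__(self) -> Iterator[str]:
        return _FsWalkerIter(self.root)
class _FsWalkerIter(Iterator[str]):
    def __init__(self, root: str):
        self._walk = os.walk(root)  # os.walk is lazy: no directory is read yet
        self._dirpath = ""
        self._files: deque[str] = deque()  # deque gives O(1) popleft, unlike list.pop(0)
    def __iter__(self) -> "_FsWalkerIter":
        return self
    def __next__(self) -> str:
        # Advance to the next directory that has files; os.walk's StopIteration ends the stream.
        while not self._files:
            self._dirpath, _, files = next(self._walk)
            self._files = deque(files)
        return os.path.join(self._dirpath, self._files.popleft())
Why it's good: the walk position and the pending file list are explicit state; each iter(FsWalker(...)) starts a fresh walk.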
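The generator reference for E1 is a nested loop over os.walk. A sketch, where walk_files is a hypothetical name and a temporary directory supplies the test data:

```python
import os
import tempfile
from typing import Iterator

def walk_files(root: str) -> Iterator[str]:
    """Generator reference for FsWalker (E1): every file path under root, top-down."""
    for dirpath, _dirnames, filenames in os.walk(root):
        for fn in filenames:
            yield os.path.join(dirpath, fn)

with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "sub"))
    for rel in ("a.txt", os.path.join("sub", "b.txt")):
        with open(os.path.join(root, rel), "w") as f:
            f.write("x")
    # Directory listing order is OS-dependent, hence the sort.
    found = sorted(os.path.relpath(p, root) for p in walk_files(root))
    assert found == sorted(["a.txt", os.path.join("sub", "b.txt")])
```

Because listing order is not guaranteed, E1 for walkers is best stated on sorted outputs.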
3.6 Example 6: Stateful N-Gram
from typing import Iterable, Iterator
class NGramSource(Iterable[tuple[str, ...]]):
    def __init__(self, toks_iterables: Iterable[list[str]], n: int):
        if n < 1:
            raise ValueError("n must be >= 1")  # windows must contain at least one token
        self._toks_iterables = toks_iterables
        self._n = n
    def __iter__(self) -> Iterator[tuple[str, ...]]:
        return NGramIter(self._toks_iterables, self._n)
class NGramIter(Iterator[tuple[str, ...]]):
    def __init__(self, toks_iterables: Iterable[list[str]], n: int):
        self._outer = iter(toks_iterables)
        self._n = n
        self._buf: list[str] = []
        self._i = 0  # start of the current window within _buf
    def __iter__(self) -> "NGramIter":
        return self
    def __next__(self) -> tuple[str, ...]:
        # Pull token lists until a full window is available; n-grams span list boundaries.
        while self._i + self._n > len(self._buf):
            self._buf.extend(next(self._outer))
        gram = tuple(self._buf[self._i:self._i + self._n])
        self._i += 1
        if self._i > 1024:
            # Compact: drop consumed tokens so memory stays bounded.
            self._buf = self._buf[self._i:]
            self._i = 0
        return gram
Why it's good: the overlap state (buffer plus window start) is explicit, and each iter(NGramSource(...)) starts fresh, provided toks_iterables is itself re-iterable.
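A generator reference for E1, sketched with a bounded deque over the concatenated token stream; ngrams_gen is a hypothetical name:

```python
from collections import deque
from itertools import chain
from typing import Iterable, Iterator

def ngrams_gen(toks_iterables: Iterable[list[str]], n: int) -> Iterator[tuple[str, ...]]:
    """Generator reference for NGramIter (E1): n-grams over the concatenated token lists."""
    if n < 1:  # raised on the first next(), since this is a generator
        raise ValueError("n must be >= 1")
    window: deque[str] = deque(maxlen=n)
    for tok in chain.from_iterable(toks_iterables):
        window.append(tok)  # maxlen slides the window by one token
        if len(window) == n:
            yield tuple(window)

assert list(ngrams_gen([["a", "b"], ["c"]], 2)) == [("a", "b"), ("b", "c")]
assert list(ngrams_gen([["a", "b", "c", "d"]], 3)) == [("a", "b", "c"), ("b", "c", "d")]
assert list(ngrams_gen([["a"]], 2)) == []
```

The first assertion is the case that matters most: the bigram ("b", "c") crosses a list boundary, exactly as in the class.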
3.7 Running Project: Stateful RAG Chunker
Extend RAG with a class chunker (ChunkWithoutEmbedding, RawDoc, RagEnv and gen_overlapping_chunks are the running project's types and chunking function from earlier cores):
from typing import Iterable, Iterator
class RagChunks(Iterable[ChunkWithoutEmbedding]):
    def __init__(self, docs: Iterable[RawDoc], env: RagEnv, max_chunks: int):
        self._docs = docs
        self._env = env
        self._max = max_chunks
    def __iter__(self) -> Iterator[ChunkWithoutEmbedding]:
        return RagChunker(self._docs, self._env, self._max)
class RagChunker(Iterator[ChunkWithoutEmbedding]):
    def __init__(self, docs: Iterable[RawDoc], env: RagEnv, max_chunks: int):
        self._docs = iter(docs)
        self._env = env
        self._max = max_chunks
        self._emitted = 0  # chunks returned so far (enforces max_chunks)
        self._cur: Iterator[ChunkWithoutEmbedding] | None = None  # chunks of the current doc
    def __iter__(self) -> "RagChunker":
        return self
    def __next__(self) -> ChunkWithoutEmbedding:
        if self._emitted >= self._max:
            raise StopIteration  # cap reached: no further documents are pulled
        while True:
            if self._cur is None:
                d = next(self._docs)  # StopIteration here ends the stream
                self._cur = gen_overlapping_chunks(
                    d.doc_id,
                    d.abstract,
                    k=self._env.chunk_size,
                    o=self._env.overlap,
                    tail_policy=self._env.tail_policy,
                )
            try:
                ch = next(self._cur)
            except StopIteration:
                self._cur = None  # document exhausted; move to the next one
                continue
            self._emitted += 1
            return ch
Wins: explicit emitted-count and current-document state; fresh on each iter(RagChunks(...)) as long as docs is re-iterable.
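The chunker's control flow (flatten per-document chunks, stop at a cap) has a one-line generator reference built from itertools. A sketch in which Doc and toy_chunks are hypothetical stand-ins for RawDoc and gen_overlapping_chunks, so only the shape of the pipeline carries over, not the project's chunking rules:

```python
from itertools import chain, islice
from typing import Iterable, Iterator, NamedTuple

class Doc(NamedTuple):  # hypothetical stand-in for RawDoc
    doc_id: str
    abstract: str

def toy_chunks(doc_id: str, text: str, k: int) -> Iterator[tuple[str, str]]:
    """Hypothetical stand-in for gen_overlapping_chunks: fixed-size, non-overlapping."""
    for start in range(0, len(text), k):
        yield (doc_id, text[start:start + k])

def rag_chunks_gen(docs: Iterable[Doc], k: int, max_chunks: int) -> Iterator[tuple[str, str]]:
    """Reference for RagChunker's control flow (E1): flatten per-doc chunks, cap the count."""
    per_doc = (toy_chunks(d.doc_id, d.abstract, k) for d in docs)
    return islice(chain.from_iterable(per_doc), max_chunks)

docs = [Doc("d1", "abcdef"), Doc("d2", "xyz")]
assert list(rag_chunks_gen(docs, k=4, max_chunks=10)) == [("d1", "abcd"), ("d1", "ef"), ("d2", "xyz")]
assert list(rag_chunks_gen(docs, k=4, max_chunks=2)) == [("d1", "abcd"), ("d1", "ef")]
```

Like the class, islice stops asking for items once the cap is reached, so with max_chunks=2 the second document is never chunked.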
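4. Anti-Patterns and Fixes
- Iterator whose __iter__ does not return self: breaks iterator parity (I1a). Fix: def __iter__(self): return self.
- Iterable whose __iter__ returns self: violates reusability (R1/I1b); the second loop sees an exhausted object. Fix: return a fresh cursor.
- No cleanup: leaks file handles or connections. Fix: release resources in .close() and support the with statement via __enter__/__exit__.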
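The second anti-pattern is the easiest to ship by accident, because the first loop works. A sketch contrasting a hypothetical BadNumbers (iterable returning self) with a GoodNumbers factory:

```python
class BadNumbers:
    """Anti-pattern: __iter__ returns self, so this 'iterable' is really a one-shot iterator."""
    def __init__(self, n: int):
        self._i, self._n = 0, n
    def __iter__(self) -> "BadNumbers":
        return self
    def __next__(self) -> int:
        if self._i >= self._n:
            raise StopIteration
        self._i += 1
        return self._i - 1

class GoodNumbers:
    """Fix: the factory hands out a fresh cursor on every iter() call."""
    def __init__(self, n: int):
        self._n = n
    def __iter__(self):
        return iter(range(self._n))  # any fresh Iterator works as the cursor

bad, good = BadNumbers(3), GoodNumbers(3)
assert list(bad) == [0, 1, 2]
assert list(bad) == []                  # second pass silently empty: violates R1/I1b
assert list(good) == list(good) == [0, 1, 2]
assert iter(good) is not iter(good)
```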
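5. Equational Reasoning: Substitution Exercise
Hand Exercise: rewrite a class iterator as a generator function by substitution and check that both produce the same sequence (E1).
Bug Hunt: spot a cursor with no cleanup path; the fix is an explicit .close(), not relying on del and garbage collection.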
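Worked once on the 1.4 example (classes repeated so the block runs alone; my_gen is a hypothetical name): the cursor attribute self.i becomes a local variable, and "raise StopIteration" becomes returning from the function. Inside a generator you must end with return; an explicit raise StopIteration there is converted to RuntimeError (PEP 479).

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")

class MyIterable:
    def __init__(self, data):
        self.data = data
    def __iter__(self):
        return MyIter(self.data)

class MyIter:
    def __init__(self, data):
        self.data = data
        self.i = 0
    def __iter__(self):
        return self
    def __next__(self):
        if self.i >= len(self.data):
            raise StopIteration
        val = self.data[self.i]
        self.i += 1
        return val

def my_gen(data: Sequence[T]) -> Iterator[T]:
    # Substitution: self.i -> local i; "raise StopIteration" -> falling off the end.
    i = 0
    while i < len(data):
        yield data[i]
        i += 1

data = ["a", "b", "c"]
assert list(MyIterable(data)) == list(my_gen(data)) == data  # E1 on one input
```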
6. Property-Based Testing: Proving Equivalence (Advanced, Optional)
6.1 Custom Strategy
As in previous cores.
6.2 Properties
import string
import pytest
from hypothesis import HealthCheck, given, settings, strategies as st
# Assumes MyIterable, MyIter and CsvRows are importable from the module under test.
@given(st.lists(st.integers(), max_size=50))
def test_iter_self(xs):
    it = MyIter(xs)
    assert iter(it) is it
@given(st.lists(st.text(), max_size=50))
def test_iterable_freshness(lines):
    src = MyIterable(lines)
    it1 = iter(src)
    it2 = iter(src)
    assert it1 is not it2  # R1/I1b
    assert list(it1) == list(it2) == lines
@given(st.lists(st.integers(), max_size=50))
def test_iterator_parity(xs):
    it = MyIter(xs)
    assert iter(it) is it
    assert list(it) == xs
    with pytest.raises(StopIteration):
        next(it)
# Only characters that need no CSV quoting; cells are non-empty because DictReader skips blank rows.
SAFE = st.text(alphabet=string.ascii_letters + string.digits, min_size=1, max_size=10)
# tmp_path is function-scoped and shared by all examples; each example overwrites the file.
@settings(suppress_health_check=[HealthCheck.function_scoped_fixture])
@given(header=SAFE, cells=st.lists(SAFE, max_size=20))
def test_csv_equiv(tmp_path, header, cells):
    p = tmp_path / "test.csv"
    with p.open("w", newline="") as f:
        f.write(header + "\n")
        for c in cells:
            f.write(c + "\n")
    rows = list(CsvRows(str(p)))
    assert rows == [{header: c} for c in cells]
def test_cleanup(tmp_path):
    p = tmp_path / "test.csv"
    p.write_text("a,b\n1,2\n")
    with iter(CsvRows(str(p))) as it:
        assert next(it) == {"a": "1", "b": "2"}
    # __exit__ called close(); checking the private handle is acceptable in a test.
    assert it._f is None
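6.3 Additional Properties for the Examples
The same pattern applies to every example in Section 3: compare each class against a generator reference (E1), e.g., CsvRows against a generator over csv.DictReader.
6.4 Shrinking Demo
Bad (no .close()): the cursor leaks its file handle, so a cleanup property such as test_cleanup fails.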
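7. When Class Isn't Worth It
If the logic is simple and stateless, a generator is enough; use a class only when you need explicit state, reuse, or cleanup.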
8. Pre-Core Quiz
- Class for? → Stateful iteration that needs reuse or cleanup.
- __iter__ for iterators? → Return self.
- .close()? → Cleanup.
- Equivalence? → Preserved (E1).
- Reuse? → Fresh cursor per iter().
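9. Post-Core Reflection & Exercise
Reflect: find where a generator hits its limits in your code and refactor it to a class.
Project Exercise: make the RAG chunker a class and test its cleanup.
Final Notes:
- Use classes for control; use generators for simple logic.
- Document the state each class holds.
- Test cleanup (.close(), __exit__, and behaviour on del).
- For async iterators (__aiter__/__anext__), see a future core.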
Next: Core 10 – Observability for Streams. (Builds on this.)
Repository Alignment
- Implementation: module-03/funcpipe-rag-03/src/funcpipe_rag/core/structural_dedup.py::DedupIterator
- Tests: module-03/funcpipe-rag-03/tests/test_module_03.py::test_dedup_iterator_preserves_order
itertools Decision Table – Use This¶
| Tool | Use When | Memory | Pitfall | Safe? |
|---|---|---|---|---|
| chain | Concatenate many iterables | O(1) | None | Yes |
| groupby | Group contiguous equal items | O(1) | Sort first if equal items are not contiguous; advancing invalidates the previous group | Yes |
| tee | Multiple consumers of the same iterator | O(skew) | Unbounded skew → memory explosion | Careful |
| islice | Skip/take a lazy slice | O(1) | Consumes the skipped items from the source; no negative indices | Yes |
| accumulate | Running totals/reductions | O(1) | Default op is + | Yes |
| compress | Filter by boolean mask | O(1) | None | Yes |
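A few one-line checks that make the table concrete (standard library only; the sample data is illustrative):

```python
import operator
from itertools import accumulate, chain, compress, groupby, islice, tee

assert list(chain([1, 2], (3,), range(4, 6))) == [1, 2, 3, 4, 5]

# groupby only groups *adjacent* equal keys; sort first for global groups.
words = ["apple", "avocado", "banana", "apricot"]
assert [(k, len(list(g))) for k, g in groupby(words, key=lambda w: w[0])] == [("a", 2), ("b", 1), ("a", 1)]
assert [(k, len(list(g))) for k, g in groupby(sorted(words), key=lambda w: w[0])] == [("a", 3), ("b", 1)]

# tee buffers whatever one branch has consumed and the other has not (the skew).
a, b = tee(iter(range(5)))
assert list(a) == [0, 1, 2, 3, 4]  # all 5 items are now buffered for b
assert list(b) == [0, 1, 2, 3, 4]

# islice pulls the skipped items out of the underlying iterator.
it = iter(range(10))
assert list(islice(it, 2, 4)) == [2, 3]
assert next(it) == 4

assert list(accumulate([1, 2, 3, 4])) == [1, 3, 6, 10]
assert list(accumulate([1, 2, 3, 4], operator.mul)) == [1, 2, 6, 24]
assert list(compress("abcde", [1, 0, 1, 0, 1])) == ["a", "c", "e"]
```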
Further Reading: For the deepest itertools mastery, see the official docs and Dan Bader’s ‘Python Tricks’ chapter on iterators.
You now own the most powerful lazy streaming toolkit in Python. Module 4 will show you how to make even failure and resource cleanup lazy and pure.