# Module 3: Lazy Iteration and Streaming
## Progression Note
By the end of Module 3, you will master lazy generators, the itertools toolkit, and streaming pipelines that never materialize unnecessary data. This prepares you for safe recursion and error handling in streams (Module 4). See the series progression map in the repo root for full details.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 2 | First-Class Functions & Expressive Python | Configurable pure pipelines without globals |
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting |
| 4 | Recursion & Error Handling in Streams | Safe recursion, Result/Option, streaming errors |
## M03C04: Chunking, Windowing, Grouping – Streaming Aggregations
Core question:
How do you implement lazy chunking, sliding windows, and contiguous grouping in streaming pipelines to perform aggregations without materializing full collections, while preserving order and coverage?
This core builds on Core 3's itertools composition by introducing the final classic patterns for streaming aggregations:

- Fixed-size chunking with configurable overlap and tail policy.
- Sliding windows with bounded deque auxiliary space.
- Contiguous grouping with streaming per-group processing.

All purely lazy, with mathematically tight coverage and demand laws.
We continue the running project from m03-rag.md, now adding overlapping chunks (for better retrieval recall) and per-doc grouping (for future dedup/embedding stages).
Audience: Developers who have lazy pipelines but still materialize lists for chunking, windowing, or grouping, and pay the memory price.
Outcome:
1. Spot any `for i in range(0, len(text), step): text[i:i+k]` loop and instantly know you can make it lazy with overlap and tail handling.
2. Refactor it to a streaming generator in < 10 lines.
3. Write a Hypothesis property proving perfect reconstruction/coverage + exact demand bounds.
Laws (frozen, used across this core):

- E1 — Equivalence: For any finite re-iterable sequence xs: `list(pipe(xs)) == eager_equiv(list(xs))` (toy check below).
- O1 — Global order: Output preserves input order (chunks/windows in appearance order).
- O2 — Monotone per-key: Within each key/doc_id, `chunk.start` values are non-decreasing.
- C1 — Coverage (chunking):
    - `tail_policy="emit_short"`: perfect reconstruction possible (concatenate, dropping the overlap).
    - `tail_policy="pad"`: perfect reconstruction possible after stripping `"\0"`.
    - `tail_policy="drop"`: reconstruction covers the maximal prefix reachable by a first chunk of length k followed by zero or more steps of size (k−o); whenever chunks are emitted, len(rec) = k + m·(k−o) and len(text) − len(rec) < (k−o).
- D1 — Demand-boundedness:
    - Sliding windows (deque): producing n windows pulls ≤ n + (w−1) upstream elements.
    - Chunking over indexable text: no upstream iterator pull; exactly one slice per emitted chunk.
- G1 — Groupby: Only valid if equal keys occur contiguously (by construction, not sorting).
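To make E1 concrete, here is a toy check. The names `pipe` and `eager_equiv` are placeholders standing in for any lazy/eager pair under test, not part of this core's code:

```python
# Toy illustration of law E1 (names are placeholders, not this core's API):
def pipe(xs):
    return (x * 2 for x in xs)      # lazy pipeline

def eager_equiv(xs):
    return [x * 2 for x in xs]      # eager reference implementation

xs = [1, 2, 3]
assert list(pipe(iter(xs))) == eager_equiv(list(xs))  # E1 holds
```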
### 1. Conceptual Foundation
#### 1.1 The One-Sentence Rule
Implement chunking with fixed-step indexing, sliding windows with `deque(maxlen=w)`, and grouping with `groupby` only on contiguous keys; never materialize the full collection for aggregation.
#### 1.2 Streaming Aggregations in One Precise Sentence
Chunking divides indexable sequences into overlapping blocks, sliding windows emit bounded overlapping tuples via deque, and contiguous grouping streams (key, sub-iterator) pairs for per-group reductions — all with O(max(window, group)) auxiliary memory.
#### 1.3 Why This Matters Now
Core 3 gave you composition primitives (`chain`, `islice`, `groupby`, `tee`).
In real pipelines you still need to break text into chunks, create context windows, or aggregate per document — and doing it eagerly kills the laziness you just won.
These three patterns are the final building blocks that make truly unbounded, memory-constant pipelines possible.
#### 1.4 Streaming Aggregations in 5 Lines
Sliding window with deque:
```python
from collections import deque

def sliding(xs, w: int):
    buf = deque(maxlen=w)
    for x in xs:
        buf.append(x)
        if len(buf) == w:
            yield tuple(buf)
```
```
Input: a b c d e f g

Window size 3:
[a b c]
  [b c d]
    [c d e]
      [d e f]
        [e f g]
```
Bounded aux O(w); lazy; produces exactly max(0, len(xs) − w + 1) windows.
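A quick check at the REPL (values illustrative):

```python
list(sliding("abcdefg", 3))
# [('a', 'b', 'c'), ('b', 'c', 'd'), ('c', 'd', 'e'), ('d', 'e', 'f'), ('e', 'f', 'g')]

list(sliding("ab", 3))  # fewer than w items: no windows
# []
```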
### 2. Mental Model: Eager vs Streaming Aggregations
#### 2.1 One Picture
```
Eager Aggregations (Full Materialize)      Streaming Aggregations (Lazy/Bounded)
+-------------------------------+          +------------------------------+
| list(xs) → windows/chunks     |          | xs → yield windows/chunks    |
| → dict-of-lists               |          | → groupby (contig)           |
| everything held in memory     |          |            ↓                 |
| O(n) allocation               |          | O(max window/group) aux      |
+-------------------------------+          +------------------------------+
        ↑ OOM risk                               ↑ Unbounded-safe default
```
#### 2.2 Behavioral Contract
| Aspect | Eager (Lists/Dicts) | Streaming (Generators) |
|---|---|---|
| Memory | O(n) full collections | O(max window/group) bounded |
| Computation | All upfront | Per-item or per-group on demand |
| Purity | Mutation possible | Pure yields |
| Short-circuit | No | Yes (islice fence) |
| Equivalence | Trivial | Proven via reconstruction (C1) |
When Eager Wins: Tiny data you will random-access many times.
Known Pitfalls:

- `groupby` on non-contiguous keys silently splits groups (demonstrated below).
- Sliding windows with deque: must consume fully or use `tuple(buf)` snapshots.
- Chunking may split grapheme clusters (code-point only).
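The first pitfall is easy to reproduce. `groupby` never reorders, so repeated keys that are not adjacent come back as separate groups:

```python
from itertools import groupby

# Non-contiguous keys: "a" and "b" each appear twice, yielding four groups.
[(k, list(g)) for k, g in groupby("abab")]
# [('a', ['a']), ('b', ['b']), ('a', ['a']), ('b', ['b'])]
```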
Unicode Policy: Pure code-point slicing (Python native). For grapheme-aware chunking use third-party libraries (not in stdlib).
Forbidden Patterns (CI-greppable):

- `list(`, `sorted(`, or `[*` in core pipeline files.
- `dict[` grouping without bounded groups.
### 3. Running Project: Windowed & Grouped Chunks in RAG
#### 3.1 Types (Canonical, Used Throughout)
Extend RagEnv for overlap and tail policy (as defined in rag_types.py):
```python
from dataclasses import dataclass

@dataclass(frozen=True)
class RagEnv:
    chunk_size: int
    overlap: int = 0                  # New: 0 ≤ overlap < chunk_size
    tail_policy: str = "emit_short"   # "emit_short" | "drop" | "pad"
```
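If you want that invariant enforced at construction time, one option is a `__post_init__` guard. This is a sketch, not necessarily what rag_types.py does:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class RagEnv:
    chunk_size: int
    overlap: int = 0
    tail_policy: str = "emit_short"

    def __post_init__(self) -> None:
        # Hypothetical guard: reject invalid configurations early.
        if self.chunk_size <= 0 or not 0 <= self.overlap < self.chunk_size:
            raise ValueError("require chunk_size > 0 and 0 <= overlap < chunk_size")
        if self.tail_policy not in ("emit_short", "drop", "pad"):
            raise ValueError(f"unknown tail_policy: {self.tail_policy!r}")
```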
#### 3.2 Eager Start (Anti-Pattern)
```python
# Eager overlapping chunks + grouping (anti-pattern)
chunks = []
for cd in list(gen_clean_docs(docs)):
    text = cd.abstract
    for i in range(0, len(text), env.chunk_size - env.overlap):
        j = i + env.chunk_size
        s = text[i:j]
        if s:
            chunks.append(ChunkWithoutEmbedding(cd.doc_id, s, i, j))

grouped = {}
for c in chunks:
    grouped.setdefault(c.doc_id, []).append(c)  # Full dict-of-lists
```
Smells: Full materialization of all chunks and all groups.
### 4. Refactor to Lazy: Aggregations in RAG
#### 4.1 Lazy Core – The Final RAG Patterns
```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from collections import deque
from itertools import groupby, chain
from operator import attrgetter

from rag_types import RawDoc, CleanDoc, RagEnv, ChunkWithoutEmbedding
from core2 import gen_clean_docs      # M03C02
from core3 import ensure_contiguous   # M03C03


def gen_overlapping_chunks(
    doc_id: str,
    text: str,
    *,
    k: int,
    o: int = 0,
    tail_policy: str = "emit_short",
) -> Iterator[ChunkWithoutEmbedding]:
    """
    Laws:
        C1 (coverage): with emit_short/pad, perfect reconstruction is possible;
            with drop, covers the maximal floor-division prefix.
        D1: exactly one slice per emitted chunk.
    Note: for tail_policy="pad", the logical end j may exceed len(text).
    """
    if k <= 0 or not 0 <= o < k:
        raise ValueError("invalid chunk/overlap")
    step = k - o
    n = len(text)
    i = 0
    while i < n:
        j = i + k
        short_tail = j > n
        if short_tail and tail_policy == "drop":
            break
        segment = text[i:j]
        if short_tail and tail_policy == "pad":
            segment += "\0" * (k - len(segment))
            j = i + k  # logical end includes padding
        if segment:
            yield ChunkWithoutEmbedding(doc_id, segment, i, j)
        i += step


def sliding_windows(xs: Iterable, w: int) -> Iterator[tuple]:
    """D1: producing n windows pulls ≤ n + (w-1) upstream elements."""
    if w <= 0:
        raise ValueError("window size must be positive")
    buf = deque(maxlen=w)
    it = iter(xs)
    # Prime the buffer with the first w-1 elements.
    for _ in range(w - 1):
        try:
            buf.append(next(it))
        except StopIteration:
            return
    for x in it:
        buf.append(x)
        yield tuple(buf)


def gen_grouped_chunks(
    chunks: Iterable[ChunkWithoutEmbedding],
) -> Iterator[tuple[str, Iterator[ChunkWithoutEmbedding]]]:
    """G1: requires contiguity by construction."""
    chunks = ensure_contiguous(chunks, key=attrgetter("doc_id"))
    yield from groupby(chunks, key=attrgetter("doc_id"))


# Example streaming aggregation
def gen_per_doc_chunk_counts(
    chunks: Iterable[ChunkWithoutEmbedding],
) -> Iterator[tuple[str, int]]:
    for doc_id, grp in gen_grouped_chunks(chunks):
        yield doc_id, sum(1 for _ in grp)  # consumes group immediately


# Composed pipeline
def gen_rag_chunks(docs: Iterable[RawDoc], env: RagEnv) -> Iterator[ChunkWithoutEmbedding]:
    cleaned = gen_clean_docs(docs)
    yield from chain.from_iterable(
        gen_overlapping_chunks(
            cd.doc_id,
            cd.abstract,
            k=env.chunk_size,
            o=env.overlap,
            tail_policy=env.tail_policy,
        )
        for cd in cleaned
    )
```
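A small illustrative run of the chunker (sample values; field names `text` and `start` as used elsewhere in this core):

```python
chunks = list(gen_overlapping_chunks("d1", "abcdefgh", k=4, o=2))
[(c.text, c.start) for c in chunks]
# [('abcd', 0), ('cdef', 2), ('efgh', 4), ('gh', 6)]

# C1 reconstruction: first chunk, then drop the o-char overlap from the rest.
"".join([chunks[0].text] + [c.text[2:] for c in chunks[1:]])
# 'abcdefgh'
```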
Wins: Perfect coverage with overlap; bounded memory; streaming per-doc ops.
Complexity:

| Pattern | Time | Aux Space | Notes |
|---|---|---|---|
| Overlap chunking | Θ(len(text)) | O(1) | One slice per chunk; end may exceed len(text) with pad |
| Sliding windows | Θ(n) | O(w) | Exactly max(0, n − w + 1) windows |
| Contiguous group | Θ(n) | O(current run) | Must fully consume each group |
Note: Always fully consume a groupby sub-iterator before requesting the next group.
Re-iterability Warning: Single-pass; consume groups immediately.
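The warning is worth seeing once. Stashing `groupby` sub-iterators for later silently loses their contents:

```python
from itertools import groupby

groups = list(groupby("aabb"))   # advancing past a group invalidates it
[list(g) for _, g in groups]
# [[], []]  (the sub-iterators are now empty)

[(k, list(g)) for k, g in groupby("aabb")]   # consume each group immediately
# [('a', ['a', 'a']), ('b', ['b', 'b'])]
```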
#### 4.2 Lazy Shell (Edge Only)
```python
chunked = gen_rag_chunks(read_docs(path), env)
grouped = gen_grouped_chunks(chunked)
for doc_id, grp in grouped:
    write_doc_chunks(doc_id, grp)  # atomic per doc if needed
```
Note: Fully streaming; atomic per group possible.
### 5. Equational Reasoning: Substitution Exercise
Hand Exercise: Inline the overlapping chunker by hand, substituting each yield, and confirm perfect reconstruction under emit_short/pad (a worked trace follows below).
Bug Hunt: The eager version materializes all chunks up front; the streaming version emits on demand with bounded work per chunk.
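A worked trace for the hand exercise (illustrative input):

```python
# gen_overlapping_chunks("d", "abcde", k=3, o=1, tail_policy="emit_short")
# step = 2, n = 5
# i=0: j=3, text[0:3] = "abc"            -> yield ("abc", 0, 3)
# i=2: j=5, text[2:5] = "cde"            -> yield ("cde", 2, 5)
# i=4: j=7 > 5, short tail, emit_short   -> yield ("e",   4, 5)
# i=6: 6 >= 5, stop.
#
# C1 reconstruction: "abc" + "cde"[1:] + "e"[1:] = "abc" + "de" + "" = "abcde"
```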
### 6. Property-Based Testing: Proving Equivalence (Advanced, Optional)
#### 6.1 Custom Strategy
```python
from hypothesis import strategies as st

text_st = st.text(min_size=0, max_size=1000)
k_st = st.integers(min_value=1, max_value=128)
o_st = st.integers(min_value=0, max_value=127)
```
#### 6.2 Full Suite (All Laws Enforced)
```python
from hypothesis import given, assume


@given(text_st, k_st, o_st)
def test_chunking_coverage_emit_short(text, k, o):
    assume(o < k)
    chunks = list(gen_overlapping_chunks("id", text, k=k, o=o, tail_policy="emit_short"))
    rec = chunks[0].text if chunks else ""
    rec += "".join(c.text[o:] for c in chunks[1:])
    assert rec == text


@given(text_st, k_st, o_st)
def test_chunking_coverage_pad(text, k, o):
    assume(o < k)
    # st.text() can generate "\0"; a trailing NUL in the input would be
    # stripped along with the padding, so exclude that corner case.
    assume(not text.endswith("\0"))
    chunks = list(gen_overlapping_chunks("id", text, k=k, o=o, tail_policy="pad"))
    rec = chunks[0].text if chunks else ""
    rec += "".join(c.text[o:] for c in chunks[1:])
    assert rec.rstrip("\0") == text


@given(text_st, k_st, o_st)
def test_chunking_coverage_drop(text, k, o):
    assume(o < k)
    chunks = list(gen_overlapping_chunks("id", text, k=k, o=o, tail_policy="drop"))
    rec = chunks[0].text if chunks else ""
    rec += "".join(c.text[o:] for c in chunks[1:])
    assert text.startswith(rec)
    if chunks and len(text) >= k:
        step = k - o
        assert (len(rec) - k) % step == 0
        assert len(text) - len(rec) < step


@given(st.lists(st.integers(), min_size=0, max_size=200), st.integers(1, 20))
def test_sliding_window_demand_and_coverage(data, w):
    pulls = 0

    def counted():
        nonlocal pulls
        for x in data:
            pulls += 1
            yield x

    windows = list(sliding_windows(counted(), w))
    expected_windows = max(0, len(data) - w + 1)
    assert len(windows) == expected_windows
    assert pulls <= len(windows) + (w - 1)
    # Perfect reconstruction when len(data) >= w
    if len(data) >= w:
        recon = list(windows[0])
        recon.extend(win[-1] for win in windows[1:])
        assert recon == data
```
#### 6.3 Shrinking Demo: Catching a Real Bug
Bad version (no overlap handling in reconstruction): fails the reconstruction law on any overlap > 0. A sketch of the buggy property follows.
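A hypothetical buggy property for the demo, with the kind of minimal counterexample Hypothesis shrinks to:

```python
@given(text_st, k_st, o_st)
def test_chunking_coverage_naive(text, k, o):
    assume(o < k)
    chunks = list(gen_overlapping_chunks("id", text, k=k, o=o, tail_policy="emit_short"))
    rec = "".join(c.text for c in chunks)  # BUG: ignores the o-char overlap
    assert rec == text                     # fails whenever o > 0 and there are 2+ chunks

# Hypothesis typically shrinks this to something like:
#   text='00', k=2, o=1  ->  chunks '00', '0'  ->  rec='000' != '00'
```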
### 7. When Aggregations Aren't Worth It
- Non-contiguous keys: sort first, paying O(n log n) time and O(n) memory (sketch below).
- Truly random access: materialize once and index.
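For the non-contiguous case, the standard fallback is sort-then-group. A sketch; laziness is lost because `sorted` materializes everything:

```python
from itertools import groupby
from operator import attrgetter

def grouped_after_sort(chunks):
    # O(n log n) time, O(n) memory: only acceptable when the data fits in RAM.
    key = attrgetter("doc_id")
    return groupby(sorted(chunks, key=key), key=key)
```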
### 8. Pre-Core Quiz
- Window aux space? → O(w).
- Groupby needs contiguous keys? → Yes — guard or sort.
- Chunking slices per chunk? → Exactly one.
- Coverage with overlap? → Perfect under emit_short/pad.
- Demand for n windows? → ≤ n + (w-1).
### 9. Post-Core Reflection & Exercise
Reflect: Find any remaining list-building aggregation in your codebase; you now have the tools to replace it with a bounded-memory stream.
Project Exercise: Add overlap=128 and tail_policy="emit_short" to your RAG env. Verify perfect reconstruction on a sample abstract and that memory stays flat on the 10k dataset.
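One way to configure it (the chunk_size value is illustrative):

```python
env = RagEnv(chunk_size=512, overlap=128, tail_policy="emit_short")
```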
Next: M03C05 – Infinite & Unbounded Sequences. (Builds on this.)
## Repository Alignment
- Implementation: `module-03/funcpipe-rag-03/src/funcpipe_rag/api/core.py::gen_overlapping_chunks`, `::sliding_windows`, `::gen_grouped_chunks`
- Tests: `module-03/funcpipe-rag-03/tests/test_module_03.py::test_overlapping_coverage`, `::test_sliding_demand`
## itertools Decision Table – Use This
| Tool | Use When | Memory | Pitfall | Safe? |
|---|---|---|---|---|
| `chain` | Concat many iterables | O(1) | None | Yes |
| `groupby` | Group contiguous equal items | O(1) | Silently splits non-contiguous keys; sort first if needed | Yes |
| `tee` | Multiple consumers of same iterator | O(skew) | Unbounded skew → memory explosion | Careful |
| `islice` | Lazy skip/take | O(1) | Skipped items are still consumed from the source | Yes |
| `accumulate` | Running totals/reductions | O(1) | Default op is + | Yes |
| `compress` | Filter by boolean mask | O(1) | None | Yes |
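The one row marked Careful deserves a demonstration. `tee` buffers everything between its fastest and slowest consumer:

```python
from itertools import tee, islice

a, b = tee(range(10**6))
sum(islice(a, 100_000))  # a is now 100_000 items ahead of b
# tee must buffer those ~100_000 items until b catches up:
# bounded skew is fine; unbounded skew is a memory leak.
```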
Further Reading: For the deepest itertools mastery, see the official docs and Dan Bader’s ‘Python Tricks’ chapter on iterators.
You now own the most powerful lazy streaming toolkit in Python. Module 4 will show you how to make even failure and resource cleanup lazy and pure.