Module 3: Lazy Iteration and Streaming¶
Progression Note¶
By the end of Module 3, you will master lazy generators, itertools, and streaming pipelines that never materialize unnecessary data. This prepares you for safe recursion and error handling in streams (Module 4). See the series progression map in the repo root for full details.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 2 | First-Class Functions & Expressive Python | Configurable pure pipelines without globals |
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting |
| 4 | Recursion & Error Handling in Streams | Safe recursion, Result/Option, streaming errors |
M03C07: Fan-In and Fan-Out for Streams – Merging, Splitting, and Multicasting Iterators Safely¶
Core question:
How do you safely merge multiple input streams (fan-in) and split or multicast a single stream to multiple consumers (fan-out) in lazy, pure iterator pipelines without breaking equivalence or introducing side-effects?
This core builds on Core 6's reusable stages by introducing fan-in/fan-out patterns:
- Use chain/chain.from_iterable for sequential fan-in, roundrobin for fair interleaving, heapq.merge for sorted fan-in.
- Use fork2_lockstep for strict 1:1 fan-out; bounded multicast for variable-cardinality or independent consumption.
- Handle independence, buffering, and fairness.
- Preserve laziness, purity, and freshness.
We extend the running project from Core 6 (FuncPipe RAG Builder from m03-rag.md) and add cross-domain examples like multi-log merging, stream splitting for analytics, and API fan-in to prove scalability.
Audience: Developers building scalable, multi-source pipelines needing merge/split without materialization.
Outcome:
1. Identify single-stream limitations.
2. Add fan-in/out in < 10 lines.
3. Prove merge/fork laws with Hypothesis.
Laws (frozen, used across this core):
- E1 — Equivalence: merged/fanned == eager_equiv (no reordering unless specified).
- P1 — Purity: No side-effects; explicit configs.
- R1 — Reusability: Factories yield fresh multis.
- M1 — Merge order:
- chain: sequential (preserves per-source order).
- roundrobin: interleaving without starvation (in practice, no source is delayed indefinitely).
- heapq.merge: sorted by key, stable per-source for ties.
- T1 — Tee/multicast cost: Memory overhead is Θ(Δ), where Δ is the maximum difference in items consumed between the fastest and slowest branch; lockstep fork keeps Δ ≤ 1; bounded multicast raises BufferError once Δ would exceed maxlen.
- DTR — Determinism: Equal inputs/config → equal outputs.
- FR — Freshness (see the sketch after this list):
- For Source factories: src() and src() are independent iterators.
- For Transform factories: mk = factory(config); mk(xs) and mk(xs) are independent iterators.
- Fencing after fan-out: After any fan-out, fence each branch before any amplifying transform (n-grams, chunking, network I/O).
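A two-line sketch of law FR, using a trivial source factory (toy data):

```python
src = lambda: iter([1, 2, 3])            # Source factory: each call yields a fresh iterator
i1, i2 = src(), src()
assert next(i1) == 1 and next(i2) == 1   # independent traversals
```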
1. Conceptual Foundation¶
1.1 The One-Sentence Rule¶
Use chain/roundrobin/merge for fan-in and fork2_lockstep/bounded multicast for fan-out to handle multi-streams lazily, ensuring independence without full materialization.
1.2 Fan-In/Out in One Precise Sentence¶
Fan-in merges sources (chain seq, roundrobin interleaving, merge sorted); fan-out forks/multicasts for independent consumers with explicit buffering discipline.
1.3 Why This Matters Now¶
Single-stream pipes limit scalability; fan-in/out enables multi-source/consumer without eagerness.
1.4 Fan-In/Out in 5 Lines¶
Fan-in example:
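A minimal sketch with toy data:

```python
from itertools import chain

merged = chain(iter([1, 2]), iter([3, 4]))   # sequential fan-in, fully lazy
assert list(merged) == [1, 2, 3, 4]
```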
Fan-out:
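Again a minimal sketch:

```python
from itertools import tee

a, b = tee(iter([1, 2, 3]), 2)   # two independent views; memory grows with consumer skew
assert list(a) == list(b) == [1, 2, 3]
```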
Safe, lazy.
1.5 Minimal Fan Harness (Extends Core 6)¶
Build on Core 6 harness; add fan helpers. We reuse the functional pipeline types and combinators from M03C06:
Transform, compose, ffilter, fence_k, and source_to_transform.
from typing import TypeVar, Iterable, Iterator, Callable, Any
import heapq
from itertools import tee
from collections import deque
A = TypeVar("A"); B = TypeVar("B"); C = TypeVar("C"); T = TypeVar("T")
Source = Callable[[], Iterator[T]]
def make_chain(*srcs: Source[T]) -> Source[T]:
def merged() -> Iterator[T]:
for s in srcs:
yield from s()
return merged
def make_roundrobin(*srcs: Source[T]) -> Source[T]:
    def merged() -> Iterator[T]:
        # Yield one item from each live source per round; exhausted sources drop out.
        active = [s() for s in srcs]
while active:
nxt = []
for it in active:
try:
yield next(it)
nxt.append(it)
except StopIteration:
pass
active = nxt
return merged
def make_merge(*srcs: Source[T], key: Callable[[T], Any] | None = None, reverse: bool = False) -> Source[T]:
def merged() -> Iterator[T]:
iters = [s() for s in srcs]
yield from heapq.merge(*iters, key=key, reverse=reverse)
return merged
def fork2_lockstep(t: Transform[A,B], u: Transform[A,C]) -> Transform[A, tuple[B,C]]:
"""
Strict 1:1 lockstep fan-out.
Raises ValueError on cardinality mismatch (one branch shorter/longer).
"""
def stage(xs: Iterable[A]) -> Iterator[tuple[B,C]]:
a, b = tee(xs, 2)
it1, it2 = t(a), u(b)
while True:
try:
v1 = next(it1)
except StopIteration:
# ensure second branch also exhausted
try:
next(it2)
raise ValueError("fork2_lockstep: second branch produced extra items")
except StopIteration:
return
try:
v2 = next(it2)
except StopIteration:
raise ValueError("fork2_lockstep: second branch shorter than first")
yield (v1, v2)
return stage
_SENTINEL = object()
def multicast_bounded(xs: Iterable[T], n: int, maxlen: int = 1024) -> tuple[Iterator[T], ...]:
"""
Bounded multicast.
If any consumer lags by more than maxlen items, raises BufferError.
If one branch is never drained, the producer will eventually hit BufferError when skew reaches maxlen.
"""
upstream = iter(xs)
qs = [deque() for _ in range(n)]
done = False
def pump_once():
nonlocal done
if done: return
try:
x = next(upstream)
except StopIteration:
done = True
for q in qs: q.append(_SENTINEL)
return
        # Check every buffer first, then append to all, so a BufferError
        # doesn't leave some consumers with the item and others without it.
        for q in qs:
            if len(q) >= maxlen:
                raise BufferError(f"multicast buffer exceeded (maxlen={maxlen})")
        for q in qs:
            q.append(x)
def sub(i: int) -> Iterator[T]:
while True:
if not qs[i]:
pump_once()
y = qs[i].popleft()
if y is _SENTINEL:
return
yield y
return tuple(sub(i) for i in range(n))
Use with compose; e.g., compose(source_to_transform(make_chain(s1, s2)), ...). Use fork2_lockstep only for strict 1:1 stages; otherwise use bounded multicast or separate sinks. Always fence branches before amplification.
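A quick smoke test of the harness above (plain generator functions stand in for Core 6 Transforms; toy data):

```python
s1 = lambda: iter("ab")
s2 = lambda: iter("cd")
print(list(make_chain(s1, s2)()))        # ['a', 'b', 'c', 'd']
print(list(make_roundrobin(s1, s2)()))   # ['a', 'c', 'b', 'd']

inc = lambda xs: (x + 1 for x in xs)
dbl = lambda xs: (x * 2 for x in xs)
print(list(fork2_lockstep(inc, dbl)(iter([1, 2]))))   # [(2, 2), (3, 4)]
```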
2. Mental Model: Single vs Multi-Stream¶
2.1 One Picture¶
Single-Stream (Limited) Multi-Stream (Scalable)
+-----------------------+ +------------------------------+
| one src → one sink | | multi-src → merge → split |
| ↓ | | ↓ |
| rigid, no parallel | | fan-in/out, independent |
| reuse = rebuild | | lazy, pure, testable |
+-----------------------+ +------------------------------+
↑ Bottlenecked ↑ Composable / Multi
2.2 Behavioral Contract¶
| Aspect | Single-Stream | Multi-Stream (Fan-In/Out) |
|---|---|---|
| Sources | One | Many (merged) |
| Consumers | One | Many (forked/multicast) |
| Order | As-is | Seq (chain), interleaving (roundrobin), or sorted (merge) |
| Independence | N/A | Fork/multicast ensures |
Note on Single Choice: Prefer a single stream for simple cases; go multi-stream only when you have multiple sources or consumers.
When Not to Fan: If nothing is multi-source or multi-consumer, stay with the Core 6 single-stream pipelines.
Known Pitfalls: - fork2_lockstep requires strict 1:1 cardinality; a mismatch raises ValueError. - heapq.merge requires each input to be individually sorted.
Forbidden Patterns: - Consuming the same iterator from multiple places without fork/multicast. - Enforce this by never calling iter() twice on the same underlying stream.
Building Blocks Sidebar: - chain for sequential fan-in. - roundrobin for fair interleaving. - heapq.merge for sorted fan-in. - fork2_lockstep for strict 1:1 fan-out.
Resource Semantics: Fans close underlying resources on exhaustion.
Error Model: Errors propagate; no auto-retry (use the Core 6 wrappers).
Backpressure: Fence after merge and before split; bounded multicast raises BufferError when a buffer is exceeded.
3. Cross-Domain Examples: Proving Scalability¶
Production-grade examples using the harness. Each is pure and lazy.
3.1 Example 1: Merging Multiple CSV Sources (Fan-In)¶
def make_multi_csv_pipeline(paths: list[str], max_rows: int) -> Transform[None, dict[str, Any]]:
srcs = [make_csv_source(p) for p in paths]
merged_src = make_chain(*srcs)
return compose(
source_to_transform(merged_src),
ffilter(lambda r: r.get("status") == "active"),
make_project({"id": "user_id", "amount": "total"}),
make_cast({"amount": float}),
fence_k(max_rows),
)
Why it's good: Lazy merge across files; nothing is loaded wholesale.
Contiguity/Boundedness: Finite sources; fence caps.
3.2 Example 2: Log Merge with Time Sort (Sorted Fan-In)¶
def make_multi_log_pipeline(paths: list[str], pattern: str, k: int) -> Transform[None, str]:
srcs = [make_log_source(p) for p in paths]
merged_src = make_merge(*srcs, key=parse_ts) # parse_ts: str -> comparable
return compose(
source_to_transform(merged_src),
make_regex_filter(pattern),
fence_k(k),
)
Precondition: each input is individually non-decreasing under parse_ts; otherwise use roundrobin or a bounded-lateness merge.
Why it's good: Globally sorted output without sorting everything; assumes each log is individually sorted (see the precondition above).
Contiguity/Boundedness: Sources may be unbounded; the fence ensures termination.
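A toy illustration of the precondition (timestamps as sortable strings, purely illustrative):

```python
import heapq

# Each log is individually non-decreasing, so the merge is globally sorted:
log_a = ["2024-01-01T00:00 boot", "2024-01-03T09:15 error"]
log_b = ["2024-01-02T12:30 login", "2024-01-04T08:00 logout"]
print(list(heapq.merge(log_a, log_b)))
# ['2024-01-01T00:00 boot', '2024-01-02T12:30 login',
#  '2024-01-03T09:15 error', '2024-01-04T08:00 logout']
```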
3.3 Example 3: API Fan-In from Multiple Endpoints¶
def make_multi_api_pipeline(fetch_pages: list[Callable], pred: Callable, k: int) -> Transform[None, dict]:
srcs = [lambda f=f: pager(f, attempts=2) for f in fetch_pages]
merged_src = make_roundrobin(*srcs) # interleaving to avoid starvation
return compose(
source_to_transform(merged_src),
ffilter(pred),
fence_k(k),
)
Why it's good: Fair interleaving across endpoints; no single API starves the others.
3.4 Example 4: Telemetry Split for Multi-Agg (Fan-Out)¶
def values_after_w(w: int):
    """Emit d['value'] only after a full window of w items has been seen,
    so output cardinality matches a w-windowed rolling-average stage."""
    from collections import deque
    def stage(xs):
        buf = deque(maxlen=w)
        for d in xs:
            buf.append(d)
            if len(buf) == w:
                yield d['value']
    return stage
def make_telemetry_pipeline(src: Source[dict], w: int) -> Transform[None, tuple[dict, float]]:
avg_stage = make_rolling_avg_by_device(w)
total_stage = values_after_w(w)
fork = fork2_lockstep(avg_stage, total_stage)
return compose(source_to_transform(src), fork)
Why it's good: Fork for parallel aggs; strict lockstep enforces 1:1 cardinality.
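A toy demonstration of the lockstep guarantee (stages are plain generator functions, purely illustrative):

```python
take_all = lambda xs: iter(xs)
drop_one = lambda xs: (x for i, x in enumerate(xs) if i > 0)
try:
    list(fork2_lockstep(take_all, drop_one)(iter([1, 2, 3])))
except ValueError as e:
    print(e)   # fork2_lockstep: second branch shorter than first
```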
3.5 Example 5: FS Hash with Split for Checksum/Size¶
import hashlib
def make_sha256_with_size() -> Transform[str, tuple[str, str, int]]:
def stage(paths: Iterable[str]) -> Iterator[tuple[str, str, int]]:
for p in paths:
h = hashlib.sha256(); size = 0
with open(p, "rb") as f:
for chunk in iter(lambda: f.read(1024*1024), b""):
size += len(chunk); h.update(chunk)
yield (p, h.hexdigest(), size)
return stage
def make_fs_pipeline(root: str) -> Transform[None, tuple[str, str, int]]:
return compose(
source_to_transform(make_walk_source(root)),
make_ext_filter({'.py'}),
make_sha256_with_size(),
)
Why it's good: Single-pass compute; no split needed.
3.6 Example 6: Text N-Grams with Bounded Multicast (Fan-Out for Variable Cardinality)¶
For variable-cardinality (e.g., n-grams amplify), use multicast and separate sinks or keyed join.
import heapq
from collections import Counter
from itertools import islice
def topk(k: int):
def stage(xs):
counts = Counter()
for x in xs:
counts[x] += 1
if len(counts) > 10_000: # safety cap; document policy
counts = Counter(dict(counts.most_common(5_000)))
yield from heapq.nlargest(k, counts.items(), key=lambda kv: kv[1])
return stage
def make_ngram_multi(n: int, k: int) -> Transform[str, tuple[tuple[str,...], int]]:
    base = compose(make_tokenize(), make_ngrams(n))
    def pipe(lines: Iterable[str]) -> Iterator[tuple[tuple[str,...], int]]:
        a, b = multicast_bounded(base(lines), 2, maxlen=1000)
        distinct: set = set()
        def a_side() -> Iterator:
            # Drain both branches in lockstep (skew <= 1). Fully draining one
            # branch first would raise BufferError once skew reached maxlen.
            for x, y in zip(a, b):
                if len(distinct) < 1000:   # bounded sample instead of an unbounded set
                    distinct.add(y)
                yield x
        # `distinct` is available for post-processing or a keyed join downstream.
        yield from topk(k)(a_side())
    return pipe
Why it's good: Bounded multicast for derived stats with an explicit buffer policy; the lockstep drain keeps skew at one item.
3.7 Running Project: Multi-Source RAG (Fan-In)¶
Extend RAG with multi-doc sources:
def make_multi_rag_fn(env: RagEnv, max_chunks: int, sources: list[Source[RawDoc]]) -> Callable[[], Iterator[ChunkWithoutEmbedding]]:
merged_src = make_chain(*sources)
def pipe() -> Iterator[ChunkWithoutEmbedding]:
cleaned = gen_clean_docs(merged_src())
yield from gen_bounded_chunks(cleaned, env, max_chunks)
return pipe
Wins: Merge docs lazily.
4. Anti-Patterns and Fixes¶
- Multiple consumers, no fan-out: consuming the same iterator from two places exhausts it for the second. Fix: always fork or multicast (see the sketch below).
- Eager merge: materializing every source into a list before processing. Fix: chain lazily.
- Unbounded buffer: tee with unbounded consumer skew can exhaust memory. Fix: lockstep fork or bounded multicast.
- Unfair merge: chain starves later sources when an earlier one is long or infinite. Fix: roundrobin for fair interleaving.
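A minimal sketch of the first anti-pattern and its fix (toy data):

```python
from itertools import tee

xs = iter(range(3))
evens = [x for x in xs if x % 2 == 0]    # consumes everything
leftover = list(xs)                      # [] — broken: already exhausted

a, b = tee(iter(range(3)), 2)            # fixed: fan out first
evens = [x for x in a if x % 2 == 0]     # [0, 2]
odds = [x for x in b if x % 2 == 1]      # [1]
```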
5. Equational Reasoning: Substitution Exercise¶
Hand Exercise: Inline a fan-in by hand and show it is equivalent to a single stream over the concatenated input (law E1); a sketch follows.
Bug Hunt: Find code that iterates one stream twice without fork/multicast, then make the fan explicit.
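One substitution step, as a minimal sketch:

```python
# Fan form:
merged = make_chain(lambda: iter([1]), lambda: iter([2]))()
# Inline make_chain's body (`yield from s()` per source) and the fan
# collapses to the equivalent single stream (law E1):
assert list(merged) == [1, 2]
```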
6. Property-Based Testing: Proving Equivalence (Advanced, Optional)¶
6.1 Custom Strategy¶
Reuse the strategies from Core 6; plain lists of integers are enough for these laws.
6.2 Properties¶
from hypothesis import given, strategies as st
from itertools import tee
import itertools as it
@given(st.lists(st.integers(), min_size=0, max_size=200))
def test_chain_equiv(xs):
s1 = lambda: iter(xs)
s2 = lambda: iter(xs)
merged = list(make_chain(s1, s2)())
assert merged == xs + xs
@given(st.lists(st.integers(), min_size=0, max_size=200))
def test_tee_independence(xs):
a, b = tee(iter(xs), 2)
_ = list(it.islice(a, 3))
    assert list(b) == xs  # b still sees everything; tee buffered the 3-item skew
@given(st.lists(st.integers(), min_size=0, max_size=200))
def test_merge_stability_ties(xs):
xs_sorted = sorted(xs)
s1 = lambda: ((0, x) for x in xs_sorted) # tag source 0
s2 = lambda: ((1, x) for x in xs_sorted) # tag source 1
out = list(make_merge(s1, s2, key=lambda p: p[1])())
seen0 = [x for src, x in out if src == 0]
seen1 = [x for src, x in out if src == 1]
assert seen0 == xs_sorted
assert seen1 == xs_sorted # stable per-source for ties
@given(st.lists(st.lists(st.integers()), min_size=1, max_size=4))
def test_roundrobin_interleaving(ls):
srcs = [lambda l=l: iter(l) for l in ls]
out = list(make_roundrobin(*srcs)())
# multiset equality only (basic sanity; non-starving behaviour follows from the implementation)
assert sorted(out) == sorted([y for x in ls for y in x])
@given(st.lists(st.integers(), min_size=0, max_size=50))
def test_fork_lockstep(xs):
inc = lambda xs: (x+1 for x in xs)
dec = lambda xs: (x-1 for x in xs)
fork = fork2_lockstep(inc, dec)
out = list(fork(iter(xs)))
assert out == [(x+1, x-1) for x in xs]
@given(st.lists(st.integers(), min_size=0, max_size=50))
def test_multicast_independence(xs):
a,b = multicast_bounded(iter(xs), 2, maxlen=100)
assert list(a) == xs
assert list(b) == xs # independent
6.3 Additional for Examples¶
Write a similar property for each example; e.g., the multi-CSV pipeline equals concatenating the single-source pipelines (law E1).
6.4 Shrinking Demo¶
Remove the fork/multicast from any fan-out and Hypothesis shrinks to a minimal input where the second consumer sees an already-exhausted iterator, failing the independence property.
7. When Fan Isn't Worth It¶
If you have one source and one consumer, a single stream is simpler and sufficient; reach for fans only when the topology demands it.
8. Pre-Core Quiz¶
- Fan-in is for? → Merging multiple sources into one stream.
- Fork/multicast is for? → Splitting one stream across independent consumers.
- chain? → Sequential merge, preserving per-source order.
- Equivalence? → Law E1: fanned pipelines equal their eager single-stream counterparts.
- Independence? → Fork/multicast guarantee each consumer sees the full stream.
9. Post-Core Reflection & Exercise¶
Reflect: Find a single-stream pipeline in your codebase and add a fan where it helps.
Project Exercise: Add multi-source ingestion to the RAG builder; test the merge with the E1 equivalence property.
Final Notes:
- Fans are pure; order and independence are explicit.
- Document the order guarantee of each fan.
- Use fork/multicast for multiple consumers, and keep their consumption balanced.
- For async fan-in/out, see future cores.
Next: M03C08 – Time-Aware Streaming Patterns. (Builds on this.)
Repository Alignment¶
- Implementation: module-03/funcpipe-rag-03/src/funcpipe_rag/api/core.py::multicast
- Tests: module-03/funcpipe-rag-03/tests/test_module_03.py::test_multicast_independence
itertools Decision Table – Use This¶
| Tool | Use When | Memory | Pitfall | Safe? |
|---|---|---|---|---|
| chain | Concat many iterables | O(1) | None | Yes |
| groupby | Group contiguous equal items | O(1) | Must sort first if not contiguous | Yes |
| tee | Multiple consumers of same iterator | O(skew) | Unbounded skew → memory explosion | Careful |
| islice | Skip/take without materializing | O(1) | Skipping still consumes the underlying iterator | Yes |
| accumulate | Running totals/reductions | O(1) | Default op is + | Yes |
| compress | Filter by boolean mask | O(1) | None | Yes |
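Two rows from the table, illustrated with toy sizes:

```python
from itertools import tee, accumulate

a, b = tee(iter(range(5)), 2)
_ = list(a)                          # draining a first buffers all 5 items for b (the skew)
assert list(b) == [0, 1, 2, 3, 4]
assert list(accumulate([1, 2, 3])) == [1, 3, 6]   # lazy running totals, O(1) memory
```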
Further Reading: For the deepest itertools mastery, see the official docs and Dan Bader’s ‘Python Tricks’ chapter on iterators.
You now own the most powerful lazy streaming toolkit in Python. Module 4 will show you how to make even failure and resource cleanup lazy and pure.