Module 3: Lazy Iteration and Streaming¶
Progression Note¶
By the end of Module 3, you will master lazy generators, itertools mastery, and streaming pipelines that never materialize unnecessary data. This prepares you for safe recursion and error handling in streams (Module 4). See the series progression map in the repo root for full details.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 2 | First-Class Functions & Expressive Python | Configurable pure pipelines without globals |
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting |
| 4 | Recursion & Error Handling in Streams | Safe recursion, Result/Option, streaming errors |
M03C08: Time-Aware Streaming Patterns – Throttling and Simple Rate-Limiting in Pure Style¶
Core question:
How do you incorporate time awareness into iterator pipelines for throttling and rate-limiting, using pure, functional patterns without side-effects or global clocks, while preserving laziness and equivalence?
This core builds on Core 7's fan-in/out by introducing time-aware stages: - Use injected clocks and sleepers for pure throttling. - Implement token-bucket rate-limiting. - Handle monotonicity and purity via explicit time sources. - Preserve laziness, determinism, and freshness.
We extend the running project from Core 7 (FuncPipe RAG Builder from m03-rag.md) and add cross-domain examples like timed log tailing, rate-limited API calls, and throttled telemetry to prove scalability.
Audience: Developers with real-time streams needing time controls without impurity.
Outcome: 1. Spot untimed smells like bursty I/O. 2. Add throttling in < 10 lines. 3. Prove time laws with Hypothesis.
Laws (frozen, used across this core): - E1 — Equivalence: timed_pipe(S) == untimed_equiv(S) (ignoring delays). - P1 — Purity: No global clocks; explicit time sources. - R1 — Reusability: Factories yield fresh timed iters. - T1 — Throttle monotonic: Outputs non-decreasing in logical time. - RL1 — Rate-limit: Items/sec ≤ limit; burst handled (effective rate respects token bucket under monotonic clock). - DTR — Determinism: Equal inputs/time-src → equal outputs/delays. - FR — Freshness: Factory calls independent.
1. Conceptual Foundation¶
1.1 The One-Sentence Rule¶
Use timestamped items and delta-based delays with injected monotonic clocks/sleepers for pure throttling/rate-limiting in streams, avoiding globals for testable time.
1.2 Time-Aware in One Precise Sentence¶
Embed time in items or via injectable monotonic clocks; throttle via sleep on deltas.
In this series, preserves purity; mock time for tests.
1.3 Why This Matters Now¶
Untimed pipes burst; time-awareness enables controlled, polite streaming.
1.4 Time-Aware in 5 Lines¶
Throttling example:
def make_throttle(min_delta: float, clock, sleeper):
def stage(xs):
last = None
for x in xs:
now = clock()
if last is not None:
wait = max(0, (last + min_delta) - now)
if wait > 0: sleeper(wait)
last = clock()
yield x
return stage
Pure, testable.
1.5 Minimal Time Harness (Extends Core 7)¶
Build on Core 7 harness; add time helpers:
from typing import Callable, Iterable, Iterator, TypeVar
T = TypeVar("T")
def make_throttle(min_delta: float,
clock: Callable[[], float],
sleeper: Callable[[float], None]) -> Callable[[Iterable[T]], Iterator[T]]:
"""Enforce ≥ min_delta between yielded items.
Requires: clock is monotonic non-decreasing.
Deterministic given (clock, sleeper)."""
assert min_delta >= 0
def stage(xs: Iterable[T]) -> Iterator[T]:
last_emit: float | None = None
for x in xs:
now = clock()
if last_emit is not None:
wait = max(0.0, (last_emit + min_delta) - now)
if wait > 0:
sleeper(wait)
now = clock() # sleeper may advance time
last_emit = now
yield x
return stage
def make_rate_limit(rate: float, burst: int,
clock: Callable[[], float],
sleeper: Callable[[float], None]) -> Callable[[Iterable[T]], Iterator[T]]:
"""Token bucket: rate tokens/sec, capacity=burst.
Requires: clock is monotonic non-decreasing."""
assert rate > 0 and burst >= 1
def stage(xs: Iterable[T]) -> Iterator[T]:
tokens = float(burst)
last = clock()
for x in xs:
now = clock()
tokens = min(burst, tokens + (now - last) * rate)
if tokens < 1.0:
wait = max(0.0, (1.0 - tokens) / rate)
if wait > 0:
sleeper(wait)
now = clock()
tokens = min(burst, tokens + wait * rate)
tokens -= 1.0
last = now
yield x
return stage
def make_timestamp(clock: Callable[[], float]) -> Callable[[Iterable[T]], Iterator[tuple[float, T]]]:
def stage(xs: Iterable[T]) -> Iterator[tuple[float, T]]:
for x in xs:
yield clock(), x
return stage
def make_call_gate(min_delta: float, clock, sleeper):
"""Stateful boundary helper for pacing calls (not pure; use only at I/O edges)."""
last: float | None = None
def gate(fn, *args, **kwargs):
nonlocal last
now = clock()
if last is not None:
wait = max(0.0, (last + min_delta) - now)
if wait > 0: sleeper(wait)
result = fn(*args, **kwargs)
last = clock()
return result
return gate
Use with compose; e.g., compose(..., make_throttle(0.5, time.monotonic, time.sleep), ...). Inject mock clocks/sleepers for tests. Note: sync versions block on sleep; unsuitable for event loops—use async variants in production.
2. Mental Model: Untimed vs Time-Aware¶
2.1 One Picture¶
Untimed Streams (Bursty) Time-Aware Streams (Controlled)
+-----------------------+ +------------------------------+
| fast, uncontrolled | | delta/sleep for pace |
| ↓ | | ↓ |
| overload sinks | | throttle/limit, pure |
| test = slow | | injectable time, testable |
+-----------------------+ +------------------------------+
↑ Chaotic / Global ↑ Pure / Mockable
2.2 Behavioral Contract¶
| Aspect | Untimed | Time-Aware |
|---|---|---|
| Pace | As-fast-as-possible | Throttled/limited |
| Time Source | N/A | Injectable monotonic clock |
| Purity | Pure | Pure (no globals) |
| Testability | Simple | Mock time for fast tests |
Note on Untimed Choice: For batch; else time-aware.
When Not to Time: No rate needs; use Core 7.
Known Pitfalls: - Global time leaks nondeterminism. - Sleep blocks thread.
Forbidden Patterns: - time.time() in core; inject. - Enforce with grep for time.time.
Building Blocks Sidebar: - monotonic for deltas. - sleep for throttle. - Token bucket for limits.
Resource Semantics: Time stages delay but don't consume.
Error Model: Propagate; time errors raise.
Backpressure: Throttle after sources, before sinks.
3. Cross-Domain Examples: Proving Scalability¶
Production-grade examples using the harness. Each pure, timed.
3.1 Example 1: Throttled CSV Read (Time-Aware)¶
def make_timed_csv_pipeline(path: str, max_rows: int, delta: float, clock, sleeper) -> Transform[None, Dict[str, Any]]:
return compose(
source_to_transform(make_csv_source(path)),
ffilter(lambda r: r.get("status") == "active"),
make_throttle(delta, clock, sleeper),
make_project({"id": "user_id", "amount": "total"}),
make_cast({"amount": float}),
fence_k(max_rows),
)
Why it's good: Paced before heavy compute.
3.2 Example 2: Rate-Limited Log Tail¶
def make_timed_log_pipeline(path: str, pattern: str, k: int, rate: float, clock, sleeper) -> Transform[None, str]:
return compose(
source_to_transform(make_log_source(path)),
make_regex_filter(pattern),
make_rate_limit(rate, 1, clock, sleeper),
fence_k(k),
)
Why it's good: Limited after filter.
3.3 Example 3: Throttled API Pagination¶
def pager_timed(fetch_page, *, attempts=3, backoff=0.5, delta: float, clock, sleeper):
gate = make_call_gate(delta, clock, sleeper)
token = None
while True:
tries = 0
while True:
try:
page = gate(fetch_page, token)
break
except Exception:
tries += 1
if tries >= attempts: raise
sleeper(backoff * tries)
for item in page["items"]: yield item
token = page.get("next")
if not token: return
def make_timed_api_pipeline(fetch_page, pred: Callable, k: int, delta: float, clock, sleeper) -> Transform[None, Dict]:
src = lambda: pager_timed(fetch_page, delta=delta, clock=clock, sleeper=sleeper)
return compose(
source_to_transform(src),
ffilter(pred),
fence_k(k),
)
Why it's good: Paced calls; limiter in pager.
3.4 Example 4: Timestamped Telemetry with Throttle¶
def make_timed_telemetry_pipeline(src: Source[Dict], w: int, delta: float, clock, sleeper) -> Transform[None, Dict]:
return compose(
source_to_transform(src),
make_throttle(delta, clock, sleeper),
make_rolling_avg_by_device(w),
)
Why it's good: Wall-clock paced input to aggs.
3.5 Example 5: Rate-Limited FS Hash¶
def make_timed_fs_pipeline(root: str, rate: float, clock, sleeper) -> Transform[None, tuple[str, str, int]]:
return compose(
source_to_transform(make_walk_source(root)),
make_ext_filter({'.py'}),
make_rate_limit(rate, 1, clock, sleeper),
make_sha256_with_size(),
)
Why it's good: Limited before IO.
3.6 Example 6: Throttled N-Gram¶
def make_timed_ngram_pipeline(n: int, k: int, delta: float, clock, sleeper) -> Transform[str, tuple[str,...]]:
return compose(
make_tokenize(),
make_throttle(delta, clock, sleeper),
make_ngrams(n),
fence_k(k),
)
Why it's good: Paced before amplification.
3.7 Running Project: Timed RAG (Time-Aware)¶
Extend RAG with throttling:
def make_timed_rag_fn(env: RagEnv, max_chunks: int, delta: float, clock, sleeper) -> Callable[[Iterable[RawDoc]], Iterator[ChunkWithoutEmbedding]]:
throttle = make_throttle(delta, clock, sleeper)
def pipe(docs: Iterable[RawDoc]) -> Iterator[ChunkWithoutEmbedding]:
cleaned = gen_clean_docs(docs)
throttled = throttle(cleaned)
yield from gen_bounded_chunks(throttled, env, max_chunks)
return pipe
Wins: Paced chunking.
4. Anti-Patterns and Fixes¶
- Global Time: time.time() nondeterministic. Fix: Inject clock.
- Busy Wait: Poll loops waste CPU. Fix: Sleep on deltas.
- No Mock: Real time in tests slow. Fix: Fake clock.
5. Equational Reasoning: Substitution Exercise¶
Hand Exercise: Inline time → equiv untimed.
Bug Hunt: Global time; inject explicit.
6. Property-Based Testing: Proving Equivalence (Advanced, Optional)¶
6.1 Custom Strategy¶
As previous.
6.2 Properties¶
from hypothesis import given, strategies as st
class FakeTime:
def __init__(self) -> None:
self.t: float = 0.0
self.sleeps: list[float] = []
def clock(self) -> float:
return self.t
def sleep(self, dt: float) -> None:
assert dt >= 0.0
self.sleeps.append(dt)
self.t += dt
@given(
st.lists(st.anything(), max_size=50),
st.floats(min_value=0.1, max_value=1.0, allow_nan=False, allow_infinity=False),
)
def test_throttle_equiv(xs, delta):
ft = FakeTime()
throttle = make_throttle(delta, ft.clock, ft.sleep)
out = list(throttle(iter(xs)))
assert out == xs # equiv ignoring time
# Under FakeTime, we advance exactly `delta` between emits.
assert abs(sum(ft.sleeps) - delta * max(0, len(xs) - 1)) < 1e-9
@given(
st.lists(st.anything(), max_size=50),
st.floats(min_value=0.1, max_value=1.0, allow_nan=False, allow_infinity=False),
)
def test_throttle_monotonic(xs, delta):
ft = FakeTime()
throttle = make_throttle(delta, ft.clock, ft.sleep)
_ = list(throttle(iter(xs)))
# T1: all waits are non-negative ⇒ logical time never goes backwards.
assert all(dt >= 0.0 for dt in ft.sleeps)
@given(
st.lists(st.anything(), max_size=50),
st.floats(min_value=0.5, max_value=10.0, allow_nan=False, allow_infinity=False),
st.integers(1, 5),
)
def test_rate_limit_respects_rate(xs, rate, burst):
ft = FakeTime()
limit = make_rate_limit(rate, burst, ft.clock, ft.sleep)
out = list(limit(iter(xs)))
assert out == xs
n = len(xs)
free = min(n, burst) # initial token budget
# Tokens from time (`rate * ft.t`) plus initial free tokens must cover tokens spent (`n`).
assert ft.t * rate + free >= n - 1e-9
@given(
st.lists(st.anything(), max_size=50),
st.floats(min_value=0.5, max_value=10.0, allow_nan=False, allow_infinity=False),
)
def test_rate_limit_determinism(xs, rate):
ft1 = FakeTime()
limit1 = make_rate_limit(rate, 3, ft1.clock, ft1.sleep)
out1 = list(limit1(iter(xs)))
ft2 = FakeTime()
limit2 = make_rate_limit(rate, 3, ft2.clock, ft2.sleep)
out2 = list(limit2(iter(xs)))
assert out1 == out2 == xs
6.3 Additional for Examples¶
Similar; e.g., timed-CSV == untimed equiv.
6.4 Shrinking Demo¶
Bad (global time): Fails determinism.
7. When Time Isn't Worth It¶
Batch processing; else time-aware.
8. Pre-Core Quiz¶
- Throttle for? → Pace deltas.
- Clock? → Inject mock.
- Global time? → Avoid.
- Equiv? → Preserved.
- Test? → Fast with mock.
9. Post-Core Reflection & Exercise¶
Reflect: Find bursty; add throttle.
Project Exercise: Add time to RAG; test with mock.
Final Notes: - Time pure; inject clocks. - Document monotonicity per stage. - Mock for deterministic tests. - For async time, see future cores.
Next: M03C09 – Designing Custom Iterator Types. (Builds on this.)
Repository Alignment¶
- Implementation:
module-03/funcpipe-rag-03/src/funcpipe_rag/api/core.py::throttleplusmodule-03/funcpipe-rag-03/src/funcpipe_rag/fp.py::FakeTime. - Tests:
module-03/funcpipe-rag-03/tests/test_module_03.py::test_throttle_uses_injected_clock.
itertools Decision Table – Use This¶
| Tool | Use When | Memory | Pitfall | Safe? |
|---|---|---|---|---|
| chain | Concat many iterables | O(1) | None | Yes |
| groupby | Group contiguous equal items | O(1) | Must sort first if not contiguous | Yes |
| tee | Multiple consumers of same iterator | O(skew) | Unbounded skew → memory explosion | Careful |
| islice | Skip/take without consuming | O(1) | None | Yes |
| accumulate | Running totals/reductions | O(n) | Default op is + | Yes |
| compress | Filter by boolean mask | O(1) | None | Yes |
Further Reading: For the deepest itertools mastery, see the official docs and Dan Bader’s ‘Python Tricks’ chapter on iterators.
You now own the most powerful lazy streaming toolkit in Python. Module 4 will show you how to make even failure and resource cleanup lazy and pure.