M08C06: Rate Limiting & Fairness – Cooperative Scheduling¶
Module 08 – Main Track Core
Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every production FuncPipe pipeline that shares an external metered resource (embedding API, third-party LLM, database, GPU fleet) uses explicit token-bucket rate limiting and weighted-fair merging as pure data — turning transient 429s and tenant starvation into predictable, configurable, mathematically well-behaved outcomes.
Progression Note¶
Module 8 is Async FuncPipe & Backpressure — the final concurrency layer that makes the pure pipelines from Modules 1–7 scale to real-world production without ever compromising the laws we fought for.
| Module | Focus | Key Outcomes |
|---|---|---|
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability interfaces, resource-safe effect isolation |
| 8 | Async FuncPipe & Backpressure | Async streams, bounded queues, timeouts/retries, fairness & rate limiting |
| 9 | FP Across Libraries and Frameworks | Stdlib FP, data/ML stacks, web/CLI/distributed integration |
| 10 | Refactoring, Performance, and Future-Proofing | Systematic refactors, performance budgets, governance & evolution |
Core question
How do you turn external rate limits and multi-tenant fairness requirements into pure, immutable data that is interpreted only when the async stream is driven — giving you hard throughput caps and starvation-free multiplexing without a single line of imperative scheduling logic in the core?
We take the resilient, bounded, backpressure-safe embedding stream from C03–C05 and ask the question every team faces at scale:
“Why did we get globally rate-limited after adding two more tenants, and why is the biggest tenant getting 98 % of the capacity while the small one is completely starved?”
The naïve pattern everyone writes first:
# BEFORE – magic numbers, unfair, untestable
async def embed_with_hack(chunk: Chunk) -> EmbeddedChunk:
await asyncio.sleep(0.125) # "rate limit"
return await model.aencode(chunk.text.content)
tasks = [embed_with_hack(c) for c in chunks] # largest tenant wins
results = await asyncio.gather(*tasks)
Magic numbers, no burst handling, no fairness, impossible to test deterministically, easy to forget the sleep.
The production pattern: policies are pure data → combinators return ordinary AsyncGen descriptions → enforcement (token bucket, WFQ selection, cooperative yields) happens only on iteration.
# AFTER – pure policies, composable, testable, fair
rate_policy = RateLimitPolicy(tokens_per_second=8.0, burst_tokens=20)
fair_policy = FairnessPolicy(weights={0: 3, 1: 1, 2: 1}, max_buffer_per_stream=16)
def production_multi_tenant_stream(
tenant_chunks: Mapping[int, AsyncGen[ChunkWithoutEmbedding]],
) -> AsyncGen[EmbeddedChunk]:
per_tenant = [
async_gen_bounded_map( # C03
chunks,
lambda c: async_with_resilience(async_embed_chunk(c, embedder), retry_policy, timeout_policy), # C04
backpressure_policy,
)
for chunks in tenant_chunks.values()
]
merged = async_gen_fair_merge(per_tenant, fair_policy) # ← this core
return async_gen_rate_limited(merged, rate_policy) # ← this core
A one-line policy change → completely different scheduling behaviour. Zero duplication. Fully deterministic testing via a fake clock.
Audience: Engineers who have been globally banned from an API at 3 a.m. or had to explain to a customer why their job never progresses.
Outcome
1. Every hard-coded await asyncio.sleep(...) replaced with RateLimitPolicy.
2. Every unfair merge / starving tenant replaced with FairnessPolicy.
3. All scheduling proven to satisfy token-bucket and weighted-fair-queue laws (with explicit, documented assumptions) via Hypothesis + fake clock.
4. Linear, readable, refactor-safe flows that respect external constraints automatically.
Tiny Non-Domain Example – Rate-limited & Fair Counter Merge¶
def infinite_counter(tag: str) -> AsyncGen[str]:
async def _gen():
i = 0
while True:
yield Ok(f"{tag}-{i}")
i += 1
await asyncio.sleep(0) # explicit cooperative yield
return lambda: _gen()
streams = [infinite_counter("A"), infinite_counter("B")] # A weight 1, B weight 4
fair = async_gen_fair_merge(streams, FairnessPolicy(weights={0:1, 1:4}))
limited = async_gen_rate_limited(fair, RateLimitPolicy(tokens_per_second=100.0))
async for item in limited(): # B gets ~80 % share, total ≤ 100/s + burst
print(item.value)
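Both counters are infinite, so the loop above never terminates on its own; when experimenting, bound consumption explicitly. A minimal sketch with a hypothetical take helper (the name and the 50-item cut-off are illustrative):
async def take(stream: AsyncGen[str], n: int) -> list[str]:
    # Drain at most n Ok values from the stream, then stop iterating.
    out: list[str] = []
    async for item in stream():
        if isinstance(item, Ok):
            out.append(item.value)
        if len(out) >= n:
            break
    return out

first_50 = await take(limited, 50)  # ≈ 10 "A-…" items and ≈ 40 "B-…" items, per the 1:4 weights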
Why Rate Limiting & Fairness as Pure Data? (Three bullets every engineer should internalise)¶
- Mathematically hard rate cap: the token bucket is proven never to exceed tokens_per_second long-term or burst_tokens instantaneously.
- No starvation + proportional share among ready streams: WFQ-style selection means every stream with buffered items is eventually selected; the long-run ratio among contending streams matches the weights.
- Configuration = data: dev/staging/prod/tenant-specific behaviour is a one-line policy change, fully testable with a fake clock.
1. Laws & Invariants (machine-checked where possible)¶
| Law | Statement & Assumptions | Enforcement |
|---|---|---|
| Token Bucket | Long-term rate ≤ tokens_per_second; instantaneous burst ≤ burst_tokens | Fake clock + Hypothesis (global + sliding window) |
| No Starvation | Assuming every live producer eventually yields a new item when selected repeatedly, every live stream with capacity to produce will emit infinitely often | Property tests |
| Proportional Share | Among streams that always have items buffered, long-run emittedᵢ / weightᵢ is equal across streams (empirical ε < 0.002 on large prefixes) | Deterministic property tests |
| Description Purity | Combinator construction performs zero side effects | Mock + static analysis |
| Bounded Memory | Memory = O(#streams × max_buffer_per_stream) | Property tests |
Local laws (one-liner per combinator)
- async_gen_rate_limited: each yielded item consumes exactly one token; if <1 token available, sleep exactly the minimum duration needed for one token to refill.
- async_gen_fair_merge: always selects the stream with the current minimum emitted[i] / weight[i] among streams with buffered items (lowest index on tie). When no stream has buffered items, yields control with asyncio.sleep(0).
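To make the selection rule concrete, here is a pure simulation of min emitted[i] / weight[i] for two always-ready streams with weights A=1, B=4 (a standalone sketch, not the combinator itself):
# Pure simulation of the WFQ selection rule; both streams always ready.
weights = {"A": 1, "B": 4}
emitted = {"A": 0, "B": 0}
order = []
for _ in range(10):
    pick = min(sorted(weights), key=lambda s: emitted[s] / weights[s])  # lowest key wins ties
    order.append(pick)
    emitted[pick] += 1
print("".join(order))  # ABBBBABBBB: B receives 4 of every 5 slots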
2. Decision Table – When to Apply Which Policy¶
| Scenario | Need Hard Rate Cap | Multiple Competing Streams | Policy Order |
|---|---|---|---|
| Single embedding API | Yes | No | RateLimit only |
| Multi-tenant batch processing | Yes | Yes | Fairness → RateLimit |
| Internal unbounded work | No | Yes | Fairness only |
| Shared LLM with per-tenant limits | Yes | Yes | Per-tenant RateLimit + global Fairness |
Rule: Apply fairness first (it balances load), then rate limiting on the merged stream (it caps total outbound).
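For the last row of the table, per-tenant caps go on each stream before the merge, and a global cap (per the rule above) goes on the merged stream. A sketch using the combinators from this core; the function and parameter names are illustrative:
def shared_llm_stream(
    tenant_streams: Mapping[int, AsyncGen[EmbeddedChunk]],
    tenant_rate_policies: Mapping[int, RateLimitPolicy],
    global_fair_policy: FairnessPolicy,
    global_rate_policy: RateLimitPolicy,
) -> AsyncGen[EmbeddedChunk]:
    # Cap each tenant individually before they compete for the shared resource.
    per_tenant = [
        async_gen_rate_limited(stream, tenant_rate_policies[tenant_id])
        for tenant_id, stream in tenant_streams.items()
    ]
    # Weighted-fair merge, then one cap on total outbound traffic.
    merged = async_gen_fair_merge(per_tenant, global_fair_policy)
    return async_gen_rate_limited(merged, global_rate_policy)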
3. Public API – Pure Scheduling Combinators¶
import asyncio
from collections import deque
from collections.abc import AsyncIterator, Sequence, Mapping
from dataclasses import dataclass, field
from typing import TypeVar
from funcpipe_rag.domain.effects.async_ import AsyncGen
from funcpipe_rag.domain.effects.async_.resilience import ResilienceEnv
from funcpipe_rag.result.types import ErrInfo, Ok, Result
T = TypeVar("T")
@dataclass(frozen=True)
class RateLimitPolicy:
tokens_per_second: float = 10.0
burst_tokens: int = 10
@dataclass(frozen=True)
class FairnessPolicy:
weights: Mapping[int, int] = field(default_factory=dict) # stream index → weight, default 1
max_buffer_per_stream: int = 16
def async_gen_rate_limited(
stream: AsyncGen[T],
policy: RateLimitPolicy,
*,
env: ResilienceEnv | None = None,
) -> AsyncGen[T]:
local_env = env or ResilienceEnv.default()
async def _limited() -> AsyncIterator[Result[T, ErrInfo]]:
tokens = float(policy.burst_tokens)
last_refill_s = local_env.clock.now_s()
async for item in stream():
now = local_env.clock.now_s()
elapsed = now - last_refill_s
tokens = min(policy.burst_tokens, tokens + elapsed * policy.tokens_per_second)
last_refill_s = now
if tokens < 1.0:
await local_env.sleep((1.0 - tokens) / policy.tokens_per_second)
# refill again after sleep
now = local_env.clock.now_s()
elapsed = now - last_refill_s
tokens = min(policy.burst_tokens, tokens + elapsed * policy.tokens_per_second)
last_refill_s = now
tokens -= 1.0
yield item
return lambda: _limited()
def async_gen_fair_merge(
streams: Sequence[AsyncGen[T]],
policy: FairnessPolicy | None = None,
) -> AsyncGen[T]:
policy = policy or FairnessPolicy()
weights = [policy.weights.get(i, 1) for i in range(len(streams))]
async def _fair() -> AsyncIterator[Result[T, ErrInfo]]:
iterators = [s() for s in streams]
buffers: list[deque[Result[T, ErrInfo]]] = [deque(maxlen=policy.max_buffer_per_stream) for _ in streams]
active = [True] * len(streams)
emitted = [0] * len(streams)
# initial fill
for i, it in enumerate(iterators):
if active[i]:
try:
buffers[i].append(await it.__anext__())
except StopAsyncIteration:
active[i] = False
        while any(active) or any(buffers):  # keep draining buffered items even after a source is exhausted
            selected = -1
            best_ratio = float('inf')
            for i in range(len(streams)):
                if buffers[i]:  # select among streams with buffered items (matches the local law)
                    ratio = emitted[i] / weights[i]
                    if ratio < best_ratio:  # iterating in index order, so the lowest index wins ties
                        best_ratio = ratio
                        selected = i
if selected != -1:
yield buffers[selected].popleft()
emitted[selected] += 1
# greedily refill selected stream
while len(buffers[selected]) < policy.max_buffer_per_stream and active[selected]:
try:
buffers[selected].append(await iterators[selected].__anext__())
except StopAsyncIteration:
active[selected] = False
break
else:
# no progress possible → cooperative yield
await asyncio.sleep(0)
return lambda: _fair()
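A minimal end-to-end usage sketch with small finite sources (from_list, letters, and words are illustrative names; the policy values are arbitrary examples):
def from_list(items: Sequence[T]) -> AsyncGen[T]:
    async def _gen() -> AsyncIterator[Result[T, ErrInfo]]:
        for item in items:
            yield Ok(item)
    return lambda: _gen()

letters = from_list(["a", "b", "c"])
words = from_list(["x", "y", "z", "w", "v", "u"])

merged = async_gen_fair_merge([letters, words], FairnessPolicy(weights={0: 1, 1: 2}))
capped = async_gen_rate_limited(merged, RateLimitPolicy(tokens_per_second=5.0, burst_tokens=2))

async def drain() -> None:
    async for item in capped():
        print(item)  # Ok(...) values, at most ~5 per second once the 2-token burst is spent

# asyncio.run(drain())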
4. Before → After – Multi-tenant Embedding Pipeline¶
# AFTER – one pipeline, many policies
def multi_tenant_embedding_stream(
tenant_chunks: Mapping[int, AsyncGen[ChunkWithoutEmbedding]],
rate_policy: RateLimitPolicy,
fair_policy: FairnessPolicy,
) -> AsyncGen[EmbeddedChunk]:
per_tenant = [
async_gen_bounded_map(
chunks,
lambda c: async_with_resilience(async_embed_chunk(c, embedder), retry, timeout),
backpressure,
)
for chunks in tenant_chunks.values()
]
fair = async_gen_fair_merge(per_tenant, fair_policy)
return async_gen_rate_limited(fair, rate_policy)
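Because the policies are plain frozen dataclasses, environment-specific behaviour really is a one-line change. An illustrative sketch (the numbers are examples, not recommendations; the PROD values mirror the policies shown at the top of this core):
DEV_RATE = RateLimitPolicy(tokens_per_second=2.0, burst_tokens=2)        # gentle, easy to observe
STAGING_RATE = RateLimitPolicy(tokens_per_second=20.0, burst_tokens=40)  # load-test headroom
PROD_RATE = RateLimitPolicy(tokens_per_second=8.0, burst_tokens=20)      # vendor contract

PROD_FAIRNESS = FairnessPolicy(weights={0: 3, 1: 1, 2: 1}, max_buffer_per_stream=16)

stream = multi_tenant_embedding_stream(tenant_chunks, PROD_RATE, PROD_FAIRNESS)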
5. Key Tests & Empirical Proofs (all pass in CI)¶
# FakeClock, ResilienceEnv, async_gen_from_list are defined in the testing helpers (C03–C05);
# given/st come from hypothesis, Random from random, and pytest provides the asyncio marker
@given(tps=st.floats(0.5, 50.0), burst=st.integers(1, 50), n=st.integers(100, 5000))
@pytest.mark.asyncio
async def test_token_bucket_never_exceeds_policy(tps, burst, n):
policy = RateLimitPolicy(tokens_per_second=tps, burst_tokens=burst)
clock = FakeClock()
async def fake_sleep(s: float) -> None:
clock.advance_s(s)
env = ResilienceEnv(clock=clock, sleep=fake_sleep, rng=Random(0))
emission_times: list[float] = []
def make_source() -> AsyncGen[None]:
async def _gen():
async for _ in async_gen_from_list(list(range(n)))():
yield Ok(None)
return lambda: _gen()
stream = async_gen_rate_limited(make_source(), policy, env=env)
# Record timestamps when the *rate-limited* stream actually yields items (after token checks & sleeps)
async for _ in stream():
emission_times.append(clock.now_s())
# Global bound
elapsed = clock.now_s()
assert n <= (elapsed * tps) + burst + 1
# Sliding 1-second window bound (O(n) two-pointer scan)
right = 0
n_times = len(emission_times)
for i, start in enumerate(emission_times):
end = start + 1.0
while right < n_times and emission_times[right] <= end:
right += 1
items_in_window = right - i
assert items_in_window <= tps * 1.0 + burst + 1
@given(total=st.integers(5000, 20000), weight_a=st.integers(1, 10), weight_b=st.integers(1, 10))
@pytest.mark.asyncio
async def test_fair_merge_exact_proportional_when_all_ready(total, weight_a, weight_b):
policy = FairnessPolicy(weights={0: weight_a, 1: weight_b}, max_buffer_per_stream=32)
def instant_producer(tag: str, count: int) -> AsyncGen[str]:
async def _gen():
for i in range(count):
yield Ok(f"{tag}{i}")
return lambda: _gen()
merged = async_gen_fair_merge([
instant_producer("A", total * 10),
instant_producer("B", total * 10),
], policy)
counts = {"A": 0, "B": 0}
async for item in merged():
if isinstance(item, Ok):
counts[item.value[0]] += 1
if sum(counts.values()) >= total:
break
expected_ratio_b = weight_b / (weight_a + weight_b)
actual_ratio_b = counts["B"] / sum(counts.values())
assert abs(actual_ratio_b - expected_ratio_b) <= 0.002
assert min(counts.values()) >= total // (weight_a + weight_b) - 10 # never starved
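The No Starvation law from §1 deserves its own check in addition to proportional share; a minimal deterministic sketch (weights, prefix length, and tolerance are illustrative):
@pytest.mark.asyncio
async def test_fair_merge_light_stream_not_starved():
    # A weight-100 stream competing with a weight-1 stream: the light one must still emit.
    policy = FairnessPolicy(weights={0: 100, 1: 1}, max_buffer_per_stream=8)
    def producer(tag: str, count: int) -> AsyncGen[str]:
        async def _gen():
            for i in range(count):
                yield Ok(f"{tag}{i}")
        return lambda: _gen()
    merged = async_gen_fair_merge([producer("H", 100_000), producer("L", 100_000)], policy)
    emitted = 0
    seen_light = 0
    async for item in merged():
        emitted += 1
        if isinstance(item, Ok) and item.value.startswith("L"):
            seen_light += 1
        if emitted >= 10_000:
            break
    # Weighted-fair selection gives the light stream roughly 1/101 of the slots, and never zero.
    assert seen_light >= 10_000 // 101 - 5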
6. Runtime Guarantees¶
| Policy | Long-term Rate | Memory | Fairness (when all streams ready) |
|---|---|---|---|
| RateLimitPolicy | ≤ tokens_per_second | O(1) | — |
| FairnessPolicy | — | O(#streams × max_buffer_per_stream) | Empirical weight ratio within ε ≈ 0.002 on large prefixes |
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Hard-coded await sleep(...) | Untestable, forgotten, inflexible | RateLimitPolicy + async_gen_rate_limited |
| Naive merge of tenant streams | Starvation, angry customers | async_gen_fair_merge with weights |
| Rate limiting after the call | Still hits 429 | Limit the metered step itself |
| Unbounded per-stream buffers | OOM under fast producers | max_buffer_per_stream (enforced) |
8. Pre-Core Quiz¶
- Rate limiting is a…? → Token bucket interpreted on yield
- Fairness selects…? → Stream with minimum emitted/weight among ready streams
- Proportional share holds exactly when…? → All streams always have buffered items
- Memory is bounded by…? → #streams × max_buffer_per_stream
- The golden rule? → One pipeline, many scheduling policies — zero duplication
9. Post-Core Exercise¶
- Add RateLimitPolicy + FairnessPolicy to your real multi-tenant pipeline.
- Replace any hard-coded sleeps or unfair merges.
- Run the sliding-window token-bucket test against your actual policy values.
- Run the proportional-share test with your tenant weights.
- Sleep well — your pipeline now respects the real world without selling your soul.
Next → M08C07: Integrating Pure FuncPipe Core with Async Edges Cleanly
You have completed the entire production-grade Async FuncPipe toolbox: pure descriptions, backpressure, resilience, deterministic testing, rate limiting, and fairness — all as composable, mathematically lawful combinators.
Module 9 begins integration with the wider Python ecosystem using exactly these primitives.
M08C06 is now frozen.