Core 2: Performance Budgeting – When FP Wins, When to Vectorize, When to Drop to C/NumPy¶
Module 10
Core question:
How do you set and enforce performance budgets in functional pipelines, identifying when pure FP patterns suffice or outperform imperative code, when to vectorize with libraries like NumPy/Pandas/Polars, and when to extend with C/Cython/Numba/Rust for hotspots, all while preserving purity and composability?
In this core, we introduce performance budgeting for the FuncPipe RAG Builder (now at funcpipe-rag-10). Budgeting involves setting system-specific thresholds for latency, throughput, and memory, profiling pipelines to enforce them, and choosing optimizations pragmatically. Pure FP often wins in composability and parallelism (e.g., immutable data enables safe caching/multithreading), but may lag in tight loops; vectorization accelerates array ops (Polars can be several times faster on columnar/lazy workloads, but expect little gain for Python UDFs); extensions unlock constant-factor speedups for bottlenecks. We refactor RAG stages (e.g., embedding/chunking) with budgets, verifying equivalence and laws. Prefer fused/lazy stages; materialize only at boundaries or when budgeted. Treat any list()/collect()/to_numpy() between stages as a fusion break; profile around these boundaries first.
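The fusion-break warning can be made concrete with a minimal sketch (hypothetical `clean`/`chunk` stage names, not the FuncPipe API): generator-based stages stay lazy and fused, while an interposed `list()` materializes a full intermediate.

```python
from typing import Iterable, Iterator

def clean(docs: Iterable[str]) -> Iterator[str]:
    # Lazy stage: yields one cleaned doc at a time, no intermediate list.
    return (d.strip().lower() for d in docs)

def chunk(docs: Iterable[str], size: int = 4) -> Iterator[str]:
    # Lazy stage: consumes upstream items as they arrive (fused with clean).
    return (d[i:i + size] for d in docs for i in range(0, len(d), size))

docs = ["  Alpha Beta  ", "Gamma"]
fused = list(chunk(clean(docs)))         # One pass; only the final list is materialized
broken = list(chunk(list(clean(docs))))  # Inner list() = fusion break: full intermediate in memory
assert fused == broken                   # Same result; different peak memory
```

Profiling around the `list()` boundary, per the guidance above, shows exactly where the extra peak memory comes from.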
Motivation Bug: Unbudgeted FP can regress performance (e.g., recursive folds vs. loops), while blind optimization sacrifices purity and composability; budgeting ensures targeted optimizations without losing FP wins.
Delta from Core 1: Refactoring yields pure pipelines; this enforces perf budgets on them.
Budgeting Protocol (Contract, Entry/Exit Criteria):
- Budgets: Multi-dimensional, percentile-based thresholds (p50/p95/p99 latency <1.1x baseline at bottlenecks, throughput >90% of target, peak/working-set RSS <1.5x); set per system/subsystem/stage. Baseline = last green mainline on a fixed dataset + fixed seeds + warmup protocol; stored as a CI artifact.
- Semantics: Optimizations preserve equivalence (Core 1 predicates); pure FP first, vectorize if array-heavy, extend if loop-bound.
- Purity: Preserved; extensions sit behind pure facades (e.g., Rust via PyO3 exposed as a plain callable).
- Error Model: No change; budgets include failure paths.
- Resource Safety: Profile memory; vectorization often trades memory for speed; measure RSS explicitly.
- Integration: Apply to RAG (profile embedding); verify with profiled properties.
- Mypy Config: --strict; type extension boundaries (e.g., Numba signatures).
- Exit: Budgets met, equivalence holds.
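As a sketch, the thresholds above can be captured in a frozen budget record checked at stage boundaries; the field names (`p50_latency_s`, `max_rss_bytes`, …) are illustrative, not a fixed schema.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Budget:
    p50_latency_s: float   # Median latency ceiling, seconds
    p95_latency_s: float   # Tail latency ceiling, seconds
    min_throughput: float  # Items/s floor
    max_rss_bytes: int     # Peak working-set ceiling, bytes

    def check(self, p50: float, p95: float, throughput: float, rss: int) -> list[str]:
        # Returns the violated dimensions; an empty list means the budget holds.
        violations = []
        if p50 > self.p50_latency_s:
            violations.append("p50 latency")
        if p95 > self.p95_latency_s:
            violations.append("p95 latency")
        if throughput < self.min_throughput:
            violations.append("throughput")
        if rss > self.max_rss_bytes:
            violations.append("rss")
        return violations

# Per-stage budget for a hypothetical embedding stage.
embed_budget = Budget(p50_latency_s=0.05, p95_latency_s=0.10,
                      min_throughput=900.0, max_rss_bytes=512 * 2**20)
```

Keeping the budget as immutable data makes it easy to store per stage and diff against the CI baseline artifact.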
Audience: Engineers optimizing FP pipelines without sacrificing design.
Outcome: 1. Set/enforce budgets via profiling. 2. Choose FP/vectorize/extension based on hotspots. 3. Optimize RAG, verifying equivalence.
1. Laws & Invariants¶
| Invariant | Description | Enforcement |
|---|---|---|
| Budget Inv | Pipeline meets thresholds (latency/throughput/memory); regressions fixed before merge. | Profiled CI checks |
| Equivalence Inv | Optimizations preserve Core 1 predicates. | Properties per opt |
| Purity Inv | Extensions wrapped in pure APIs; no leaked mutations. | Reviews/mypy |
| Scalability Inv | Scaling no worse than baseline across size sweep; verify with size-based benchmarks. | Hypothesis sizes |
| Idempotence Inv | If transform intended idempotent, preserve; otherwise preserve extensional equality only. | Properties |
These ensure optimizations are safe and budgeted.
2. Decision Table¶
| Hotspot Type | FP Wins? | Vectorize? | Extend? | Recommended |
|---|---|---|---|---|
| Composable logic | Yes (caching) | No | No | Pure FP |
| Array ops (e.g., emb) | Sometimes | Yes (Polars columnar / Torch batching) | If needed | Vectorize first |
| Tight loops | Rarely | If array | Yes (Rust) | Extend with facade |
| Parallel-safe | Yes | If SIMD | If threaded | FP + extensions |
| I/O-bound | No | No | Rarely | Async edges (Mod 08) |
| CPU-bound UDF | No | If columnar | Yes | Vectorize/extend |
| Memory-bound | Sometimes | Yes | Yes | Reduce materialization / stream; vectorize only if native lazy plan lowers RSS (verify) |
| GIL-blocked | No | No | Yes (release) | Extensions |
| Latency-critical | Varies | If fast | If native | Profile p99 |
| Throughput-critical | Varies | Yes | If parallel | Vectorize + parallel |
Choose by profile: FP default; escalate if budget missed.
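The escalation rule can be sketched as a small routing function over the table; the hotspot labels and strategy names here are illustrative, not a fixed taxonomy.

```python
# Hypothetical encoding of the decision table; labels are illustrative.
STRATEGY = {
    "composable_logic": "pure_fp",
    "array_ops": "vectorize",
    "tight_loop": "extend_with_facade",
    "io_bound": "async_edges",
    "gil_blocked": "extension_release_gil",
}

def choose(hotspot: str, budget_met: bool) -> str:
    # FP is the default; escalate via the table only when the budget is missed.
    if budget_met:
        return "pure_fp"
    return STRATEGY.get(hotspot, "profile_deeper")
```

Keeping the mapping data-driven makes the escalation policy itself reviewable and testable.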
3. Public API (Wrappers for Optimizations)¶
Wrappers for common opts; e.g., budgeted stage.
from typing import Callable
from funcpipe_rag import Chunk, Embedding
def budgeted_stage(fn: Callable[[list[Chunk]], list[Embedding]], budget: dict) -> Callable[[list[Chunk]], list[Embedding]]:
    def wrapped(chunks: list[Chunk]) -> list[Embedding]:
        with profile(budget):  # Enforce; raises PerfBudgetError if over (see 4.4)
            return fn(chunks)
    return wrapped
# Usage: embed_stage = budgeted_stage(batch_embed, {"latency_s": 0.1, "rss_bytes": 512 * 2**20})
4. Reference Implementations¶
4.1 FP Wins: Caching/Parallel in Pure Pipelines¶
Immutable data enables lru_cache; e.g., pure chunking. Cache only if (a) key stable + versioned, (b) hit-rate above threshold, (c) cache size fits RSS budget, (d) contention measured under load. Prefer caching by stable small keys (doc_id, version) with explicit memo tables over hashing full payload objects.
from functools import lru_cache
from dataclasses import dataclass
import hashlib
@dataclass(frozen=True)
class CleanDoc:
    doc_id: str
    abstract: str
    # ... (remaining fields immutable)
@lru_cache(maxsize=1024)
def pure_chunk(doc_id: str, abstract_hash: str, size: int) -> tuple[Chunk, ...]:  # Small, stable cache key
    abstract = ABSTRACT_STORE[(doc_id, abstract_hash)]  # Assumed store; e.g., dict or DB
    return tuple(Chunk(doc_id, abstract[i:i + size], i, min(i + size, len(abstract))) for i in range(0, len(abstract), size))  # Immutable tuple
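To see why small, stable cache keys pay off, here is a self-contained sketch (stand-alone `chunk_spans` and `STORE`, not the `pure_chunk`/`ABSTRACT_STORE` above): caching on `(doc_id, content_hash, size)` yields hits across repeated calls without hashing full payload objects.

```python
from functools import lru_cache
import hashlib

STORE: dict[tuple[str, str], str] = {}  # (doc_id, content_hash) -> text; stand-in store

def put(doc_id: str, text: str) -> str:
    # Version the entry by content hash so stale cache entries never collide.
    h = hashlib.sha256(text.encode()).hexdigest()
    STORE[(doc_id, h)] = text
    return h

@lru_cache(maxsize=1024)
def chunk_spans(doc_id: str, content_hash: str, size: int) -> tuple[tuple[int, int], ...]:
    # Cache key is small and stable; the payload is looked up, not hashed.
    text = STORE[(doc_id, content_hash)]
    return tuple((i, min(i + size, len(text))) for i in range(0, len(text), size))

h = put("d1", "abcdefgh")
chunk_spans("d1", h, 3)
chunk_spans("d1", h, 3)  # Second call is served from the cache
assert chunk_spans.cache_info().hits == 1
```

`cache_info()` also gives the hit-rate evidence the caching criteria above ask for.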
4.2 Vectorize: Batch Embed with NumPy/Torch¶
For embeddings, batch into model; Polars for columnar (several times faster on lazy workloads).
import torch # Or NumPy/ONNX
def batch_embed(texts: list[str]) -> list[Embedding]:
    inputs = tokenizer(texts, return_tensors="pt", padding=True)  # Batch; tokenizer/model assumed loaded at module scope
    with torch.no_grad():
        embeds = model(**inputs).last_hidden_state.mean(dim=1)  # Vectorized mean pooling
    return [Embedding(v.tolist()) for v in embeds]
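The mean-pooling step can be sanity-checked in plain NumPy, with random arrays standing in for model hidden states (no tokenizer/model assumed): the vectorized `mean(axis=1)` matches an explicit per-row loop within float tolerance.

```python
import numpy as np

rng = np.random.default_rng(0)
hidden = rng.normal(size=(8, 16, 32))  # (batch, tokens, dim); stand-in for last_hidden_state

# Vectorized: one call over the whole batch.
pooled_vec = hidden.mean(axis=1)

# Scalar reference: explicit Python loop over rows (token vectors summed elementwise).
pooled_loop = np.stack([sum(row) / len(row) for row in hidden])

assert np.allclose(pooled_vec, pooled_loop, atol=1e-8)
```

This is the same equivalence discipline as Section 5, applied at the smallest useful granularity before profiling the full stage.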
# Polars for columnar (non-UDF)
import polars as pl
def polars_process(df: pl.DataFrame) -> pl.DataFrame:
    return df.lazy().select(  # Stay native; no Python UDF
        pl.col("text").str.to_lowercase(),  # Vectorized str ops
        # ... integrate batch_embed via plugin if needed
    ).collect()
4.3 Extend: Rust for Hot Loops¶
PyO3 for Rust extensions; constant-factor gains when work/transfer ratio high.
// rag_ext.rs (PyO3)
use pyo3::prelude::*;

#[pyfunction]
fn rust_chunk(py: Python, text: &str, size: usize) -> PyResult<Vec<(usize, usize)>> {
    py.allow_threads(|| { // Release the GIL for the hot loop
        let mut res = Vec::new();
        let mut start_byte = 0;
        let mut char_count = 0;
        for (byte_idx, _ch) in text.char_indices() { // Unicode-safe iteration
            if char_count == size {
                res.push((start_byte, byte_idx));
                start_byte = byte_idx;
                char_count = 0;
            }
            char_count += 1;
        }
        if char_count > 0 {
            res.push((start_byte, text.len()));
        }
        Ok(res) // Byte offsets; for graphemes, use the unicode-segmentation crate
    })
}
from rag_ext import rust_chunk # Pure callable
def extended_chunk(cleaned: CleanDoc, size: int) -> list[Chunk]:
    data = cleaned.abstract.encode("utf-8")
    indices = rust_chunk(cleaned.abstract, size)  # Byte offsets (see rust_chunk)
    return [Chunk(cleaned.doc_id, data[start:end].decode("utf-8"), start, end) for start, end in indices]  # Slice bytes, not str; if the domain uses char indices, convert
4.4 Budget Enforcement Pattern¶
Use tool stack: py-spy/perf (wall/native), line_profiler (hotspots), tracemalloc/memray (RSS/allocations).
- Benchmarks: warmup runs, fixed seeds, pinned threads/affinity, scaled datasets, separate micro from end-to-end, multiple runs.
- Percentiles: measured via N repeated runs (after warmup) under pinned threads; CI uses median-of-medians to reduce noise.
- PR gate: lightweight microbench on fixed seeds; fail on >X% regression.
- Nightly: full end-to-end + percentiles + memray. Store history and alert on trends.
from contextlib import contextmanager
import tracemalloc, psutil, time
class PerfBudgetError(RuntimeError): ...

@contextmanager
def profile(budget: dict):
    start = time.perf_counter()  # Monotonic; time.time() can jump with clock adjustments
    tracemalloc.start()
    try:
        yield
    finally:
        _, peak_alloc = tracemalloc.get_traced_memory()  # Python-level peak allocations
        tracemalloc.stop()
    rss_at_exit = psutil.Process().memory_info().rss  # Approximate peak; use memray for true peak
    elapsed = time.perf_counter() - start
    if elapsed > budget["latency_s"]:
        raise PerfBudgetError(f"Over latency: {elapsed:.3f}s > {budget['latency_s']}s")
    if rss_at_exit > budget["rss_bytes"]:
        raise PerfBudgetError(f"Over RSS: {rss_at_exit} > {budget['rss_bytes']} bytes")
    # For percentiles/throughput: external harness with repeats
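The repeats/warmup/percentile protocol can be sketched as a small harness (repeat counts are illustrative; a real CI harness would also pin threads and fix seeds):

```python
import time
from statistics import quantiles
from typing import Callable

def bench(fn: Callable[[], object], repeats: int = 50, warmup: int = 5) -> dict[str, float]:
    # Warmup runs absorb cache/allocator/JIT effects before measurement.
    for _ in range(warmup):
        fn()
    samples = []
    for _ in range(repeats):
        t0 = time.perf_counter()
        fn()
        samples.append(time.perf_counter() - t0)
    # quantiles(n=100) yields 99 cut points; index 49 -> p50, index 94 -> p95.
    q = quantiles(samples, n=100)
    return {"p50": q[49], "p95": q[94], "mean": sum(samples) / len(samples)}

stats = bench(lambda: sum(range(10_000)))
assert stats["p50"] <= stats["p95"]
```

Reporting percentiles rather than means is what lets the CI gate catch tail-latency regressions the mean hides.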
RAG Integration¶
Profile the embedding stage first; batch/vectorize if over budget; extend only if still over.
Optimization Selection Rule¶
- Numba: numeric loops on NumPy arrays, minimal refactor.
- Cython/C: stable kernels with tight Python interop.
- Rust: complex logic + safe concurrency + long-term maintenance.
5. Property-Based Proofs (tests/test_module_10_core2.py)¶
Hypothesis for equivalence under opts.
import hashlib
import numpy as np
from hypothesis import given, strategies as st

@given(texts=text_list_strategy())
def test_batch_equiv(texts):
    pure = [embed(t) for t in texts]  # Per-item reference path
    bat = batch_embed(texts)          # Vectorized path
    assert all(np.allclose(p.vector, b.vector, atol=1e-5)  # Float tolerance, order-aligned
               for p, b in zip(pure, bat, strict=True))
@given(cleaned=clean_doc_strategy(), size=st.integers(128, 1024))
def test_extend_equiv(cleaned, size):
    pure_c = list(pure_chunk(cleaned.doc_id, hashlib.sha256(cleaned.abstract.encode()).hexdigest(), size))
    ext_c = extended_chunk(cleaned, size)
    # Note: pure_chunk uses char offsets, rust_chunk byte offsets; these agree on ASCII.
    # For non-ASCII corpora, normalize to one offset unit before comparing.
    assert eq_pure(pure_c, ext_c, key=lambda c: (c.start, c.end))
6. Runtime Preservation Guarantee¶
Enforce budgets: optimizations must meet thresholds or revert; FP often improves via caching/parallel, vectorize reduces peaks, extensions for constant-factor gains.
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Premature opt | Wasted time | Profile first |
| Purity sacrifice | Lost composability | Facades for extensions |
| Ignore memory | OOM errors | Include RSS in budget |
| No equivalence check | Silent regressions | Properties post-opt |
8. Pre-Core Quiz¶
- Budget for…? → Latency/throughput/RSS
- FP wins when…? → Caching/parallel
- Vectorize with…? → Polars for speed
- Extend via…? → Rust/PyO3 facade
- Enforce how? → Profiled checks
9. Post-Core Exercise¶
- Profile RAG stage; set budget.
- Optimize hotspot (vector/extend); add properties.
- Verify budget met.
Next: Core 3 – Observability: Tracing Through Pure Pipelines, Debuggable Composition