
Core 2: Performance Budgeting – When FP Wins, When to Vectorize, When to Drop to C/NumPy

Module 10

Core question:
How do you set and enforce performance budgets in functional pipelines, identifying when pure FP patterns suffice or outperform imperative code, when to vectorize with libraries like NumPy/Pandas/Polars, and when to extend with C/Cython/Numba/Rust for hotspots, all while preserving purity and composability?

In this core, we introduce performance budgeting for the FuncPipe RAG Builder (now at funcpipe-rag-10). Budgeting involves setting system-specific thresholds for latency, throughput, and memory, profiling pipelines to enforce them, and choosing optimizations pragmatically. Pure FP often wins in composability and parallelism (e.g., immutable data enables safe caching/multithreading), but may lag in tight loops; vectorization accelerates array ops (Polars can be several times faster on columnar/lazy workloads, but expect little gain for Python UDFs); extensions unlock constant-factor speedups for bottlenecks. We refactor RAG stages (e.g., embedding/chunking) with budgets, verifying equivalence and laws. Prefer fused/lazy stages; materialize only at boundaries or when budgeted. Treat any list()/collect()/to_numpy() between stages as a fusion break; profile around these boundaries first.

Motivation Bug: Unbudgeted FP can regress performance (e.g., recursive folds vs. loops), while blind optimization sacrifices purity and composability—budgeting ensures targeted optimizations without losing FP wins.

Delta from Core 1: Refactoring yields pure pipelines; this enforces perf budgets on them.

Budgeting Protocol (Contract, Entry/Exit Criteria):

  • Budgets: multi-dimensional, percentile-based thresholds (p50/p95/p99 latency <1.1x baseline at bottlenecks, throughput >90% of target, peak/working-set RSS <1.5x baseline); set per system/subsystem/stage. Baseline = last green mainline on a fixed dataset with fixed seeds and a warmup protocol, stored as a CI artifact.
  • Semantics: optimizations preserve equivalence (Core 1 predicates); pure FP first, vectorize if array-heavy, extend if loop-bound.
  • Purity: preserved; extensions sit behind pure facades (e.g., Rust via PyO3 exposed as a plain callable).
  • Error Model: no change; budgets include failure paths.
  • Resource Safety: profile memory; vectorization often trades memory for speed, so measure RSS explicitly.
  • Integration: apply to RAG (profile embedding); verify with profiled properties.
  • Mypy Config: --strict; type extension boundaries (e.g., Numba types).
  • Exit: budgets met, equivalence holds.
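The threshold contract above can be sketched as a typed, frozen record with an explicit check; the field and method names here are illustrative assumptions, not part of the FuncPipe codebase:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Budget:
    """Illustrative per-stage budget record (names are assumptions)."""
    p50_latency_s: float      # median latency threshold
    p95_latency_s: float      # tail latency threshold
    throughput_per_s: float   # minimum items per second
    rss_bytes: int            # peak working-set ceiling

    def check(self, p50: float, p95: float, tput: float, rss: int) -> list[str]:
        """Return the list of violated dimensions (empty = budget met)."""
        violations = []
        if p50 > self.p50_latency_s:
            violations.append("p50 latency")
        if p95 > self.p95_latency_s:
            violations.append("p95 latency")
        if tput < self.throughput_per_s:
            violations.append("throughput")
        if rss > self.rss_bytes:
            violations.append("rss")
        return violations
```

Returning all violated dimensions rather than raising on the first keeps the record pure and makes CI reports more informative.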

Audience: Engineers optimizing FP pipelines without sacrificing design.

Outcome:

  1. Set/enforce budgets via profiling.
  2. Choose FP/vectorize/extension based on hotspots.
  3. Optimize RAG, verifying equivalence.


1. Laws & Invariants

| Invariant | Description | Enforcement |
| --- | --- | --- |
| Budget Inv | Pipeline meets thresholds (latency/throughput/memory); regressions fixed before merge. | Profiled CI checks |
| Equivalence Inv | Optimizations preserve Core 1 predicates. | Properties per opt |
| Purity Inv | Extensions wrapped in pure APIs; no leaked mutations. | Reviews/mypy |
| Scalability Inv | Scaling no worse than baseline across a size sweep; verify with size-based benchmarks. | Hypothesis sizes |
| Idempotence Inv | If a transform is intended to be idempotent, preserve that; otherwise preserve extensional equality only. | Properties |

These ensure optimizations are safe and budgeted.
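The Scalability Inv can be spot-checked with a size sweep that estimates an empirical scaling exponent from a log-log slope; a minimal, noise-prone sketch (warmup run included, per the benchmark protocol; the function name is illustrative):

```python
import math
import time


def scaling_exponent(fn, sizes=(1_000, 10_000, 100_000)) -> float:
    """Estimate k in t ~ n^k from the log-log slope between the smallest
    and largest input size. Single-run timings are noisy; real CI should
    repeat runs and pin threads."""
    fn(list(range(sizes[0])))  # warmup to exclude first-call effects
    times = []
    for n in sizes:
        data = list(range(n))
        start = time.perf_counter()
        fn(data)
        times.append(time.perf_counter() - start)
    return (math.log(times[-1]) - math.log(times[0])) / (
        math.log(sizes[-1]) - math.log(sizes[0]))
```

A budget check might then assert that the exponent of an optimized stage does not exceed the baseline's exponent by more than a tolerance.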


2. Decision Table

| Hotspot Type | FP Wins? | Vectorize? | Extend? | Recommended |
| --- | --- | --- | --- | --- |
| Composable logic | Yes (caching) | No | No | Pure FP |
| Array ops (e.g., embeddings) | Sometimes | Yes (Polars columnar / Torch batching) | If needed | Vectorize first |
| Tight loops | Rarely | If array | Yes (Rust) | Extend with facade |
| Parallel-safe | Yes | If SIMD | If threaded | FP + extensions |
| I/O-bound | No | No | Rarely | Async edges (Mod 08) |
| CPU-bound UDF | No | If columnar | Yes | Vectorize/extend |
| Memory-bound | Sometimes | Yes | Yes | Reduce materialization / stream; vectorize only if a native lazy plan lowers RSS (verify) |
| GIL-blocked | No | No | Yes (release) | Extensions |
| Latency-critical | Varies | If fast | If native | Profile p99 |
| Throughput-critical | Varies | Yes | If parallel | Vectorize + parallel |

Choose by profile: FP default; escalate if budget missed.
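The escalation rule can be written down as a small pure helper that mirrors the table; the profile flags here are illustrative assumptions about what a profiler report would surface:

```python
def recommend(array_heavy: bool, loop_bound: bool, io_bound: bool,
              budget_met: bool) -> str:
    """Mirror the decision table: pure FP by default, escalate only
    when the profiled budget is missed."""
    if budget_met:
        return "pure FP"          # never escalate when within budget
    if io_bound:
        return "async edges"      # CPU tricks will not help I/O waits
    if array_heavy:
        return "vectorize"        # try library-native batching first
    if loop_bound:
        return "extend with facade"  # constant-factor native speedup
    return "re-profile"           # unclear hotspot: measure again
```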


3. Public API (Wrappers for Optimizations)

Wrappers for common optimizations; e.g., a budgeted stage.

from typing import Callable
from funcpipe_rag import Chunk, Embedding


def budgeted_stage(
    fn: Callable[[list[Chunk]], list[Embedding]], budget: dict
) -> Callable[[list[Chunk]], list[Embedding]]:
    def wrapped(chunks: list[Chunk]) -> list[Embedding]:
        with profile(budget):  # profile() is defined in section 4.4; raises if over budget
            return fn(chunks)

    return wrapped
# Usage: embed_stage = budgeted_stage(batch_embed, {"latency_s": 0.1, "rss_bytes": 512 * 2**20})

4. Reference Implementations

4.1 FP Wins: Caching/Parallel in Pure Pipelines

Immutable data enables lru_cache; e.g., pure chunking. Cache only if (a) key stable + versioned, (b) hit-rate above threshold, (c) cache size fits RSS budget, (d) contention measured under load. Prefer caching by stable small keys (doc_id, version) with explicit memo tables over hashing full payload objects.

from functools import lru_cache
from dataclasses import dataclass
import hashlib

from funcpipe_rag import Chunk

@dataclass(frozen=True)
class CleanDoc:
    doc_id: str
    abstract: str
    # ... (remaining fields, all immutable)

@lru_cache(maxsize=1024)
def pure_chunk(doc_id: str, abstract_hash: str, size: int) -> tuple[Chunk, ...]:  # small, stable key
    abstract = ABSTRACT_STORE[(doc_id, abstract_hash)]  # assume a store, e.g., dict or DB
    return tuple(
        Chunk(doc_id, abstract[i:i + size], i, min(i + size, len(abstract)))
        for i in range(0, len(abstract), size)
    )  # immutable tuple: safe to cache and share
Wins: repeated inputs hit the cache, so the pure version amortizes chunking cost that an imperative, mutation-based version would pay on every call.
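Condition (b) from the caching checklist, hit rate above a threshold, can be read straight off `lru_cache` statistics; a sketch using a stand-in cached function (names are illustrative):

```python
from functools import lru_cache


@lru_cache(maxsize=1024)
def chunk_spans(length: int, size: int) -> tuple[tuple[int, int], ...]:
    """Stand-in for pure_chunk, keyed on small stable values."""
    return tuple((i, min(i + size, length)) for i in range(0, length, size))


def cache_hit_rate(cached_fn) -> float:
    """Hit rate from functools cache statistics; 0.0 before any call."""
    info = cached_fn.cache_info()
    total = info.hits + info.misses
    return info.hits / total if total else 0.0


# Warm the cache with a repeated workload, then decide if caching pays off:
for length in [100, 200, 100, 100, 200]:
    chunk_spans(length, 50)
# 2 misses (100, 200) and 3 hits -> hit rate 0.6
```

If the measured hit rate stays below the threshold in realistic workloads, drop the cache: it only adds memory pressure against the RSS budget.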

4.2 Vectorize: Batch Embed with NumPy/Torch

For embeddings, batch into model; Polars for columnar (several times faster on lazy workloads).

import torch  # or NumPy/ONNX

# Assumes `tokenizer` and `model` (e.g., a Hugging Face encoder) are in scope.
def batch_embed(texts: list[str]) -> list[Embedding]:
    inputs = tokenizer(texts, return_tensors="pt", padding=True)  # one batched call
    with torch.no_grad():
        embeds = model(**inputs).last_hidden_state.mean(dim=1)  # vectorized mean pooling
    return [Embedding(v.tolist()) for v in embeds]

# Polars for columnar (non-UDF)
import polars as pl
def polars_process(df: pl.DataFrame) -> pl.DataFrame:
    return df.lazy().select(  # Stay native; no Python UDF
        pl.col("text").str.to_lowercase(),  # Vectorized str ops
        # ... integrate batch_embed via plugin if needed
    ).collect()
Integrate: replace per-item list comprehensions with a single batched call for array-heavy stages.

4.3 Extend: Rust for Hot Loops

PyO3 exposes Rust extensions to Python; expect constant-factor gains only when the work-to-transfer ratio is high, since crossing the FFI boundary has a fixed per-call cost.

// rag_ext.rs (PyO3)
use pyo3::prelude::*;
#[pyfunction]
fn rust_chunk(py: Python, text: &str, size: usize) -> PyResult<Vec<(usize, usize)>> {
    py.allow_threads(|| {  // Release GIL
        let mut res = Vec::new();
        let mut start_byte = 0;
        let mut char_count = 0;
        for (byte_idx, ch) in text.char_indices() {  // Unicode-safe
            if char_count == size {
                res.push((start_byte, byte_idx));
                start_byte = byte_idx;
                char_count = 0;
            }
            char_count += 1;
        }
        if char_count > 0 {
            res.push((start_byte, text.len()));
        }
        Ok(res)  // Byte offsets; for graphemes, use unicode-segmentation crate
    })
}
Python facade:
from rag_ext import rust_chunk  # Pure callable
def extended_chunk(cleaned: CleanDoc, size: int) -> list[Chunk]:
    indices = rust_chunk(cleaned.abstract, size)
    return [Chunk(cleaned.doc_id, cleaned.abstract[start:end], start, end) for start, end in indices]  # Byte offsets; if domain uses char indices, convert
Chunk start/end are byte offsets in this pipeline; if your domain uses char/grapheme indices, convert before constructing Chunk.
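The byte-to-character conversion mentioned above can be done in a single pass over the text; a sketch assuming UTF-8 byte offsets (as the Rust side returns) that fall on character boundaries:

```python
def byte_to_char_offsets(
    text: str, byte_spans: list[tuple[int, int]]
) -> list[tuple[int, int]]:
    """Map UTF-8 byte offsets to character offsets in one pass.
    Assumes each span boundary lands on a character boundary."""
    mapping = {}
    byte_pos = 0
    for char_pos, ch in enumerate(text):
        mapping[byte_pos] = char_pos          # start byte of this character
        byte_pos += len(ch.encode("utf-8"))   # advance by encoded width
    mapping[byte_pos] = len(text)             # end-of-string boundary
    return [(mapping[s], mapping[e]) for s, e in byte_spans]
```

For ASCII-only text byte and character offsets coincide and the conversion is the identity; for grapheme-aware indices a segmentation library would be needed on the Python side as well.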

4.4 Budget Enforcement Pattern

Use the tool stack: py-spy/perf (wall/native), line_profiler (hotspots), tracemalloc/memray (RSS/allocations). Benchmark protocol:

  • Warmup runs, fixed seeds, pinned threads/affinity.
  • Scale datasets; separate microbenchmarks from end-to-end runs; take multiple runs per measurement.
  • Percentiles come from N repeated runs (after warmup) under pinned threads; CI uses median-of-medians to reduce noise.
  • PR gate: lightweight microbench on fixed seeds; fail on >X% regression.
  • Nightly: full end-to-end + percentiles + memray.
  • Store history and alert on trends.

from contextlib import contextmanager
import time
import tracemalloc

import psutil


class PerfBudgetError(RuntimeError):
    """Raised when a stage exceeds its budget."""


@contextmanager
def profile(budget: dict):
    start = time.perf_counter()  # monotonic clock, unaffected by wall-clock jumps
    tracemalloc.start()
    try:
        yield
    finally:
        _, peak_alloc = tracemalloc.get_traced_memory()  # peak traced allocations; optionally budget this too
        tracemalloc.stop()
    rss_at_exit = psutil.Process().memory_info().rss  # approximate peak; use memray for true peak
    elapsed = time.perf_counter() - start
    if elapsed > budget["latency_s"]:
        raise PerfBudgetError(f"Over latency: {elapsed:.3f}s > {budget['latency_s']}s")
    if rss_at_exit > budget["rss_bytes"]:
        raise PerfBudgetError(f"Over RSS: {rss_at_exit} > {budget['rss_bytes']}")
    # For percentiles/throughput: external harness with repeats
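The external harness referred to above can be sketched as repeated timed runs with warmup, reporting percentile wall times; a single-process sketch (function and key names are illustrative; pin threads and fix seeds externally, per the protocol):

```python
import statistics
import time


def measure_percentiles(fn, args=(), runs=50, warmup=5) -> dict[str, float]:
    """Time `fn` over repeated runs after warmup; report p50/p95/p99
    wall time. Uses statistics.quantiles over the raw samples."""
    for _ in range(warmup):
        fn(*args)  # discard warmup iterations (JIT, caches, allocator)
    samples = []
    for _ in range(runs):
        start = time.perf_counter()
        fn(*args)
        samples.append(time.perf_counter() - start)
    qs = statistics.quantiles(samples, n=100)  # 99 cut points
    return {"p50": qs[49], "p95": qs[94], "p99": qs[98]}
```

A CI gate would then compare the returned percentiles against the stored baseline artifact and fail on regressions beyond the budgeted multiplier.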

RAG Integration

Profile the embedding stage first; batch/vectorize if it is over budget; extend only if it still misses after vectorizing.

Optimization Selection Rule

  • Numba: numeric loops on NumPy arrays, minimal refactor.
  • Cython/C: stable kernels with tight Python interop.
  • Rust: complex logic + safe concurrency + long-term maintenance.
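For the Numba row, a minimal sketch of the "numeric loops, minimal refactor" case; the try/except fallback (an assumption for portability, not part of the pipeline) keeps the function runnable with identical semantics when Numba is not installed:

```python
try:
    from numba import njit  # JIT-compiles numeric loops over NumPy arrays
except ImportError:
    def njit(fn):  # fallback: same semantics, no speedup
        return fn


@njit
def dot_norm(xs, ys):
    """Tight numeric loop: dot product normalized by length.
    The loop body stays plain Python, so the only refactor is the decorator."""
    acc = 0.0
    for i in range(len(xs)):
        acc += xs[i] * ys[i]
    return acc / len(xs)
```

The facade rule still applies: callers see a pure function of its inputs, whether or not the JIT kicked in.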

5. Property-Based Proofs (tests/test_module_10_core2.py)

Hypothesis properties verify equivalence under each optimization.

import hashlib

import numpy as np
from hypothesis import given, strategies as st

# Strategies (text_list_strategy, clean_doc_strategy) and eq_pure come from
# the project's shared test helpers.

@given(texts=text_list_strategy())
def test_batch_equiv(texts):
    pure = [embed(t) for t in texts]
    pure_emb = {e.id: e.vector for e in pure}
    bat_emb = {e.id: e.vector for e in batch_embed(texts)}
    assert all(np.allclose(pure_emb[k], bat_emb[k], atol=1e-5) for k in pure_emb)  # float tolerance

@given(cleaned=clean_doc_strategy(), size=st.integers(128, 1024))
def test_extend_equiv(cleaned, size):
    pure_c = list(pure_chunk(cleaned.doc_id, hashlib.sha256(cleaned.abstract.encode()).hexdigest(), size))
    ext_c = extended_chunk(cleaned, size)
    # offsets match only if abstracts are ASCII or extended_chunk converts byte->char offsets
    assert eq_pure(pure_c, ext_c, key=lambda c: (c.start, c.end))


6. Runtime Preservation Guarantee

Enforce budgets: an optimization must meet its thresholds or be reverted. Pure FP often improves performance via caching and safe parallelism, vectorization cuts per-item interpreter overhead, and extensions deliver constant-factor gains.


7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
| --- | --- | --- |
| Premature optimization | Wasted time | Profile first |
| Purity sacrifice | Lost composability | Facades for extensions |
| Ignoring memory | OOM errors | Include RSS in the budget |
| No equivalence check | Silent regressions | Properties after each optimization |

8. Pre-Core Quiz

  1. Budget for…? → Latency/throughput/RSS
  2. FP wins when…? → Caching/parallel
  3. Vectorize with…? → Polars for speed
  4. Extend via…? → Rust/PyO3 facade
  5. Enforce how? → Profiled checks

9. Post-Core Exercise

  1. Profile RAG stage; set budget.
  2. Optimize hotspot (vector/extend); add properties.
  3. Verify budget met.

Pipeline Usage (Idiomatic)

embeds = budgeted_stage(batch_embed, budget)(texts)

Next: core 3. Observability – Tracing Through Pure Pipelines, Debuggable Composition