Core 3: Data Processing – Pandas/Polars/Dask in FP Style (Pure Transforms, Method Chains vs Pipelines)¶
Module 09
Core question:
How do you adapt data processing libraries like Pandas, Polars, and Dask to functional programming principles, emphasizing pure transforms, method chaining for readability, and composable pipelines, while respecting each library's execution model and costs in FuncPipe?
In this core, we adapt data processing libraries to FP principles in the FuncPipe RAG Builder (now at funcpipe-rag-09):
- Pandas provides DataFrame analysis with method chaining (.assign, .loc, .groupby), but chains are eager, single-threaded, and can incur copy costs; use it for small data or as a baseline.
- Polars offers faster, Rust-backed DataFrames with lazy expressions (pl.col, .with_columns, .lazy) for optimized, deferred computation, enabling query planning.
- Dask extends Pandas to distributed/out-of-memory work with task graphs (dd.read_csv, .map_partitions), but adds overhead on small data and requires meta for schema inference.
We focus on pure transforms (no in-place mutations), method chains for readability, and functional pipelines, integrating with the lazy streams and monadic flows from prior cores. We refactor RAG data loading/processing (e.g., CSV to chunks) into FP style, verifying equivalence and laws such as idempotence for normalizers.
Dependencies: pip install pandas polars "dask[complete]". Tradeoffs: Pandas is ubiquitous (eager, copy-prone); Polars is fast (lazy, optimized); Dask scales (graph-based, shuffle-prone). Sharp edges: Pandas is copy-heavy; Polars has stricter typing/schemas (strip_chars strips only whitespace unless chars are specified); Dask has meta/inference issues and shuffles on global ops like groupby/join.
Motivating bug: Imperative code with mutations and side effects in these libraries leads to non-reproducible pipelines. FP style enforces purity and composability, but must respect each library's execution model to avoid performance pitfalls like unnecessary copies or shuffles.
Delta from Core 2 (Module 09): Core 2 added general FP helper tools; this core adapts dataframe libraries to FP patterns.
Data Processing Protocol (Contract, Entry/Exit Criteria):
- Purity: Immutable ops only (e.g., Pandas .assign over df['col'] = ...; Polars .with_columns; Dask .map_partitions with pure funcs); no inplace=True.
- Composability: Method chains for linear flows; functional pipelines via pipe/curried composition (see the sketch after this list).
- Laziness/Eagerness: Pandas chains are eager (copies possible); Polars/Dask support lazy graphs (.lazy/.compute); defer to avoid early materialization.
- Semantics: Laws like idempotence for normalizers (f(f(df)) == f(df)); commutativity under preconditions (e.g., independent filter/map commute if the map preserves the predicate and row count); schema invariance under preconditions (no dtype-changing ops); null stability (ops handle NaN/null consistently); lazy/eager equivalence (computed results match up to sorting/index/null handling); all verified via property tests with preconditions.
- Integration: Wrap RAG CSV loading/processing in FP chains; convert to/from FuncPipe iterators/ADTs at boundaries; preserve laziness until the sink. Note: Dask paths compute the entire dataset into memory at the boundary, so convert only at the terminal sink.
- Mypy Config: --strict; pandas-stubs/polars typing; dask optional.
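The pipe/curried point above can be made concrete with a tiny composition helper. A minimal sketch, assuming Pandas is installed; pipe_all is illustrative, not part of the funcpipe_rag API:
# Compose pure DataFrame -> DataFrame steps left-to-right (illustrative helper)
from functools import reduce
from typing import Callable
import pandas as pd
Transform = Callable[[pd.DataFrame], pd.DataFrame]
def pipe_all(*steps: Transform) -> Transform:
    return lambda df: reduce(lambda acc, step: step(acc), steps, df)
# Each step is pure (returns a new frame), so the composed pipeline is too
pipeline = pipe_all(
    lambda d: d.assign(clean=d['abstract'].str.strip().str.lower()),
    lambda d: d.loc[d['categories'].str.contains('cs.AI', na=False)],
)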
Audience: Data engineers integrating Pandas/Polars/Dask into FP pipelines for scalable, pure transforms.
Outcome:
1. Adapt these libraries to FP for pure chains/pipelines.
2. Refactor RAG data processing in FP style.
3. Prove equivalence and laws with Hypothesis.
1. Laws & Invariants¶
| Law | Description | Enforcement |
|---|---|---|
| Idempotence Law | For normalizing transforms: f(f(df)) == f(df) (e.g., strip/lower). | Property tests |
| Schema Invariance | Ops preserve expected columns/dtypes under preconditions (no dtype-changing ops). | Property tests with preconditions |
| Null Stability | Ops handle NaN/null consistently without unexpected propagation. | Property tests |
| Commutativity Inv | Independent ops commute under preconditions (e.g., filter then map == map then filter if map doesn't affect predicate or row count). | Property tests with preconditions |
| Equivalence Law | FP refactors yield same outputs as imperative (up to sorting/index/null handling). | Hypothesis equivalence |
| Lazy Equivalence | Lazy/eager versions equivalent when computed (Polars/Dask vs Pandas, up to sorting/index/null). | Property tests with preconditions |
These laws ensure FP-style processing is reliable, with explicit preconditions; the sketch below illustrates the commutativity invariant.
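A minimal Pandas sketch (toy column names): the filter and the map commute because the map neither reads the predicate column nor changes the row count.
# Sketch: independent filter/map commute under the stated precondition
import pandas as pd
def filter_ai(d: pd.DataFrame) -> pd.DataFrame:
    return d.loc[d['categories'].str.contains('cs.AI', na=False)]
def map_clean(d: pd.DataFrame) -> pd.DataFrame:
    return d.assign(clean=d['abstract'].str.strip().str.lower())
df = pd.DataFrame({'abstract': ['  A  ', 'B'], 'categories': ['cs.AI', 'cs.LG']})
# map_clean neither reads 'categories' nor drops rows, so the two orders agree
assert map_clean(filter_ai(df)).equals(filter_ai(map_clean(df)))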
2. Decision Table¶
| Scenario | Data Size | Parallel Needed | Execution Model | Recommended |
|---|---|---|---|---|
| Small/in-memory | Small | No | Eager | Pandas chains |
| Fast in-memory | Medium | No | Lazy optimized | Polars expressions |
| Out-of-memory | Large | Yes | Distributed graph | Dask partitions |
| FP purity focus | Any | No | Varies | All with pure funcs |
Choose based on scale; Polars for speed, Dask for big data.
3. Public API (Wrappers for Interop; Optional Imports)¶
Repo implementation lives in src/funcpipe_rag/interop/dataframes.py (all third-party imports are optional).
Minimal wrappers for conversion to/from FuncPipe types. Guard imports.
from typing import Iterator
from funcpipe_rag import FPResult, Ok, Err, ErrInfo  # Our ADTs; Ok is used by load_csv_optional below
PANDAS_AVAILABLE = False
POLARS_AVAILABLE = False
DASK_AVAILABLE = False
try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except ImportError:
    pass
try:
    import polars as pl
    POLARS_AVAILABLE = True
except ImportError:
    pass
try:
    import dask.dataframe as dd
    DASK_AVAILABLE = True
except ImportError:
    pass
def df_to_iterator(df: 'pd.DataFrame | pl.DataFrame | dd.DataFrame') -> Iterator[dict]:
    if PANDAS_AVAILABLE and isinstance(df, pd.DataFrame):
        return iter(df.to_dict(orient='records'))
    if POLARS_AVAILABLE and isinstance(df, pl.DataFrame):
        return df.iter_rows(named=True)
    if DASK_AVAILABLE and isinstance(df, dd.DataFrame):
        pdf = df.compute()  # Materialize entire dataset at boundary; use only at terminal sink
        return iter(pdf.to_dict(orient='records'))
    raise TypeError("Unsupported DataFrame type")
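A usage sketch, assuming Pandas is installed; rows stream out as plain dicts ready for FuncPipe iterators:
# Usage sketch: any supported frame becomes a row-dict iterator
if PANDAS_AVAILABLE:
    df = pd.DataFrame({'doc_id': [1, 2], 'abstract': ['a', 'b']})
    for row in df_to_iterator(df):
        print(row)  # {'doc_id': 1, 'abstract': 'a'}, then {'doc_id': 2, 'abstract': 'b'}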
4. Reference Implementations¶
4.1 Pandas in FP Style¶
# Method chain (immutable)
def process_pandas(df: pd.DataFrame) -> pd.DataFrame:
    return (df
            .assign(clean_abstract=lambda d: d['abstract'].str.strip().str.lower())
            .loc[lambda d: d['categories'].str.contains('cs.AI', na=False)]
            .groupby('doc_id', as_index=False)
            .agg(clean_abstract=('clean_abstract', 'first')))
# Pipeline with pipe
def clean_abstract(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(clean_abstract=lambda d: d['abstract'].str.strip().str.lower())
def filter_ai(df: pd.DataFrame) -> pd.DataFrame:
    return df.loc[lambda d: d['categories'].str.contains('cs.AI', na=False)]
def rag_pandas_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    return df.pipe(clean_abstract).pipe(filter_ai)
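A usage sketch on a toy frame; the piped pipeline performs the same normalize-then-filter steps as the chain above:
# Usage sketch: toy frame through the piped pipeline
df = pd.DataFrame({
    'doc_id': [1, 1, 2],
    'abstract': ['  Foo  ', '  Foo  ', 'Bar'],
    'categories': ['cs.AI', 'cs.AI', 'cs.LG'],
})
out = rag_pandas_pipeline(df)
assert out['clean_abstract'].tolist() == ['foo', 'foo']  # cs.LG row filtered out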
4.2 Polars in FP Style¶
# Expression-based (lazy)
def process_polars(lf: pl.LazyFrame) -> pl.DataFrame:
    return (lf
            .with_columns(pl.col('abstract').str.strip_chars().str.to_lowercase().alias('clean_abstract'))
            .filter(pl.col('categories').str.contains('cs.AI'))
            .group_by('doc_id')
            .agg(pl.col('clean_abstract').first())
            .collect())
# Pipeline as expressions (in contexts)
clean_abstract_expr = pl.col('abstract').str.strip_chars().str.to_lowercase().alias('clean_abstract')
filter_ai_expr = pl.col('categories').str.contains('cs.AI')
def rag_polars_pipeline(lf: pl.LazyFrame) -> pl.LazyFrame:
    return lf.with_columns(clean_abstract_expr).filter(filter_ai_expr)
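Because rag_polars_pipeline returns a LazyFrame, Polars can optimize the whole plan before anything runs; a usage sketch:
# Usage sketch: nothing executes until .collect() at the sink
lf = pl.LazyFrame({'doc_id': [1, 2], 'abstract': ['  Foo  ', 'Bar'], 'categories': ['cs.AI', 'cs.LG']})
result = rag_polars_pipeline(lf).collect()
assert result['clean_abstract'].to_list() == ['foo']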
4.3 Dask in FP Style¶
# Distributed chains (avoid global groupby if possible)
def process_dask(ddf: dd.DataFrame) -> pd.DataFrame:
    ddf = ddf.assign(clean_abstract=ddf['abstract'].str.strip().str.lower())
    ddf = ddf[ddf['categories'].str.contains('cs.AI', na=False)]
    # Warning: global groupby may shuffle; reset_index turns the Series into a DataFrame
    return ddf.groupby('doc_id')['clean_abstract'].first().compute().reset_index()
# Pure map_partitions (with meta)
def chunk_dask(ddf: dd.DataFrame, env: RagEnv) -> dd.DataFrame:
    def chunk_part(pdf: pd.DataFrame) -> pd.DataFrame:
        return pdf.assign(chunks=pdf['abstract'].apply(lambda t: [t[i:i+env.chunk_size] for i in range(0, len(t), env.chunk_size)]))
    meta = ddf._meta.assign(chunks=pd.Series(dtype='object'))  # Explicit meta: new 'chunks' column of objects
    return ddf.map_partitions(chunk_part, meta=meta)
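A usage sketch, assuming RagEnv carries a chunk_size field as in earlier cores:
# Usage sketch: two partitions, chunk_size=3 (RagEnv field assumed from earlier cores)
pdf = pd.DataFrame({'abstract': ['abcdef', 'xy']})
ddf2 = dd.from_pandas(pdf, npartitions=2)
chunked = chunk_dask(ddf2, RagEnv(chunk_size=3))
print(chunked.compute()['chunks'].tolist())  # [['abc', 'def'], ['xy']]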
4.4 Integration in RAG¶
# Optional FP data load (defer compute/collect)
def load_csv_optional(path: str) -> FPResult[Iterator[RawDoc], ErrInfo]:
    if POLARS_AVAILABLE:
        lf = pl.scan_csv(path)
        # Defer op; alias keeps 'abstract' intact, matching the Dask/Pandas branches
        lf = lf.with_columns(pl.col('abstract').str.strip_chars().str.to_lowercase().alias('clean_abstract'))
        df = lf.collect()  # Collect at boundary
        return Ok(RawDoc(**row) for row in df.iter_rows(named=True))
    elif DASK_AVAILABLE:
        ddf = dd.read_csv(path)
        ddf = ddf.assign(clean_abstract=ddf['abstract'].str.strip().str.lower())
        computed = ddf.compute()  # Compute at boundary
        return Ok(RawDoc(**row) for row in computed.to_dict(orient='records'))
    elif PANDAS_AVAILABLE:
        df = pd.read_csv(path)
        df = df.assign(clean_abstract=df['abstract'].str.strip().str.lower())
        return Ok(RawDoc(**row) for row in df.to_dict(orient='records'))
    return Err(ErrInfo("No data lib available"))
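At the call site the FPResult forces explicit error handling; a sketch (the exact unwrapping depends on how Ok/Err are defined in earlier cores):
# Usage sketch: handle both Ok and Err branches at the boundary
match load_csv_optional('data/arxiv.csv'):
    case Ok(docs):
        for doc in docs:
            ...  # feed the RawDoc stream into chunking
    case Err(info):
        print(f"Load failed: {info}")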
4.5 Before/After Refactor¶
# Before: Imperative Pandas
df['clean'] = df['abstract'].str.strip()
df = df[df['categories'].str.contains('cs.AI')]
# After: FP chain
df = df.assign(clean=lambda d: d['abstract'].str.strip()).loc[lambda d: d['categories'].str.contains('cs.AI')]
5. Property-Based Proofs (repo tests)¶
This repo only tests the stdlib-only pieces by default; dataframe adapters are optional and are exercised in downstream integration tests when the libraries are installed.
from hypothesis import given, assume
import hypothesis.strategies as st
import pandas as pd  # These example tests exercise the Pandas path
@given(st.lists(st.dictionaries(keys=st.sampled_from(['doc_id', 'abstract', 'categories']), values=st.text(min_size=0))))
def test_chain_idempotence(data):
    df = pd.DataFrame(data)
    assume('abstract' in df.columns)
    # Normalize in place so the second application re-runs strip/lower on its own output
    f = lambda d: d.assign(abstract=d['abstract'].str.strip().str.lower())
    assert f(f(df)).equals(f(df))  # strip/lower is idempotent
# Similar for Polars/Dask with strategies
@given(st.lists(st.dictionaries(keys=st.sampled_from(['a', 'b']), values=st.integers())))
def test_schema_invariance(data):
    assume(data and all('a' in row and 'b' in row for row in data))  # Non-empty, fully keyed rows
    df = pd.DataFrame(data)
    f = lambda d: d.assign(c=d['a'] + d['b'])
    assert set(f(df).columns) == set(df.columns) | {'c'}  # Preserves existing columns, adds 'c'
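The lazy-equivalence law can be checked the same way when both Pandas and Polars are installed; a sketch that restricts the alphabet so both libraries agree on whitespace and case rules:
# Sketch: lazy Polars and eager Pandas agree on a normalizing transform
import polars as pl  # assumes polars is installed
@given(st.lists(st.text(alphabet=' abcXYZ'), max_size=20))
def test_lazy_eager_equivalence(abstracts):
    eager = pd.DataFrame({'abstract': abstracts})['abstract'].str.strip().str.lower().tolist()
    lazy = (pl.LazyFrame({'abstract': abstracts}, schema={'abstract': pl.Utf8})
            .with_columns(pl.col('abstract').str.strip_chars().str.to_lowercase())
            .collect()['abstract'].to_list())
    assert eager == lazy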
6. Runtime Preservation Guarantee¶
FP-style refactors preserve each library's runtime profile: Polars stays fastest for in-memory work (lazy plans optimize before execution), Dask scales out across partitions, and eager Pandas remains the baseline.
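These relative claims are workload-dependent; a minimal timing harness (sketch) to verify them on your own data:
# Sketch: time the normalize+filter pipeline on each installed backend
import timeit
def bench_pandas(df: pd.DataFrame) -> float:
    return timeit.timeit(lambda: rag_pandas_pipeline(df), number=10)
def bench_polars(lf: pl.LazyFrame) -> float:
    return timeit.timeit(lambda: rag_polars_pipeline(lf).collect(), number=10)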
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| In-place ops | Mutations | Use .assign/.with_columns |
| Eager full compute | Memory blowup | Use lazy/partitioned |
| Imperative loops | Non-composable | Chain methods |
| Global groupby in Dask | Shuffle costs | Prefer partition-local then combine (see sketch below) |
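For the table's last row, a sketch of partition-local-then-combine: deduplicate within each partition first, so the global (shuffle-prone) pass sees far less data.
# Sketch: shrink per partition before the global, shuffle-prone step
def dedup_dask(ddf: dd.DataFrame) -> dd.DataFrame:
    local = ddf.map_partitions(lambda pdf: pdf.drop_duplicates(subset='doc_id'))
    return local.drop_duplicates(subset='doc_id')  # global pass over reduced data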
8. Pre-Core Quiz¶
- Pandas for…? → Ubiquitous chains
- Polars for…? → Fast lazy expr
- Dask for…? → Distributed scale
- FP style? → Pure transforms
- Benefit? → Composable data
9. Post-Core Exercise¶
- Refactor RAG load FP-style.
- Prove idempotence with Hypothesis.
Next Core: 4. Web and Services – FastAPI Endpoints with Pure Core & Thin Adapters