Core 2: FP Helper Libraries – toolz / returns in Real Pipelines¶
Module 09
Core question:
How do you optionally integrate external FP libraries like toolz (for ergonomic combinators) and returns (for typed monadic containers) into FuncPipe pipelines, using toolz as syntax sugar over stdlib iterators and returns only at the outermost boundaries, with immediate conversion to our custom ADTs?
In this core, we introduce external FP helpers as optional enhancements in the FuncPipe RAG Builder (now at funcpipe-rag-09). toolz provides ergonomic combinators (e.g., pipe, compose, curried.map/filter, memoize, reduceby, partition_all, sliding_window, groupby, merge_sorted), with some as direct sugar over stdlib (pipe/compose/curried variants) and others adding new semantics (e.g., reduceby for keyed folds). returns offers typed monads (Result, Maybe, IO, Future) with map/bind/rescue/fix, used strictly at boundaries (e.g., wrapping impure calls) and converted immediately to our ADTs (FuncPipe Result/Option) to avoid mixing types. We refactor select RAG stages with runtime-checked fallbacks, verifying equivalence and laws like functor/monad properties. Dependencies are optional via imports and a compat layer; installation: pip install toolz returns (mypy plugin: pip install returns[mypy]). Sharp edges: toolz.groupby materializes O(n) dict (no sort) vs streaming itertools.groupby (requires sorted/consecutive)—choose explicitly; returns requires plugin for typing. Avoid helpers for minimal deps; adopt only if ergonomics/typing justify load.
Motivating bug: hand-rolled combinators and monads risk subtle bugs; toolz/returns offer well-tested implementations, but mixing their containers with ours splits the type system, hence the boundary-only policy with converters.
Delta from Core 1 (Module 09): Stdlib grounds basics; helpers add optional ergonomics/typing with fallbacks and boundaries.
Helper Lib Protocol (Contract, Entry/Exit Criteria):
- Interop Policy: toolz is syntax over stdlib (e.g., curried.map == lambda f: lambda it: map(f, it); see the sketch below); returns is boundaries only (wrap impure calls, convert to FPResult immediately); no mixing within stages: core pipelines use FuncPipe ADTs exclusively.
- Composability: toolz.pipe chains eagerly; returns.Result composes via bind/map (monadic).
- Purity: pure functions only; memoize preserves purity; IO/Future describe effects (IO holds values eagerly unless thunked; Future defers coroutines until awaited, yielding IO).
- Laziness/Eagerness: toolz.itertoolz defers (e.g., curried.map yields a generator); toolz.pipe applies eagerly; returns containers are eager (compute on creation) except Future.
- Semantics: functor laws (map id == id; map (f . g) == map f . map g) and monad laws (left/right identity, associativity), verified via property tests. **toolz.groupby materializes an O(n) dict (no sort needed) vs streaming itertools.groupby (requires sorted/consecutive input); the mismatch requires an explicit choice.**
- Integration: runtime-checked imports with stdlib fallbacks that preserve types; converters at boundaries; RAG stages refactored optionally.
- Mypy Config: --strict; the returns[mypy] plugin is required for typing; ignore toolz if untyped.
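The "sugar over stdlib" claim in the Interop Policy can be checked directly; a minimal sketch, assuming toolz is installed:

from itertools import islice
from toolz import curried

inc = lambda x: x + 1
# curried.map(inc) is exactly the stdlib spelling lambda it: map(inc, it)
assert list(curried.map(inc)(range(5))) == list(map(inc, range(5)))
# Both defer: only three elements of a huge range are ever touched.
assert list(islice(curried.map(inc)(range(10**9)), 3)) == [1, 2, 3]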
Audience: Engineers seeking ergonomics/typed monads beyond stdlib, with safe boundaries to preserve pipeline coherence.
Outcome:
1. Use toolz for curried/pipe syntax; returns for boundary monads with converters.
2. Refactor RAG with runtime fallbacks preserving types.
3. Prove equivalence and laws with Hypothesis.
1. Laws & Invariants¶
| Law | Description | Enforcement |
|---|---|---|
| Functor Law | map(lambda x: x) == id; map(lambda x: f(g(x))) == map(g).map(f). | Property tests |
| Monad Law | bind(return(x), f) == f(x); bind(m, return) == m; bind(bind(m, f), g) == bind(m, lambda x: bind(f(x), g)). | Property tests (returns only) |
| Purity Law | Combinators depend only on inputs; no mutation. | Code reviews |
| Laziness Inv | Iterator funcs defer; pipe eager. | Tests with islice |
| Equivalence Law | Lib versions == stdlib/custom (post-conversion). | Hypothesis equivalence |
These laws align helpers with our ADTs.
2. Decision Table¶
| Scenario | Ergonomics Gain | Typed Monads Needed | Cognitive Load | Recommended |
|---|---|---|---|---|
| Chain iterators | High | No | Low | toolz.pipe/curried |
| Wrap impure | Low | High | Medium | returns.safe + converter |
| Curry funcs | High | No | Low | toolz.curried |
| Memo pure | Medium | No | Low | functools.lru_cache (bounded) |
| Minimal deps | None | None | None | Stdlib fallback |
Tradeoff: Helpers add load; use if benefits > ecosystem split.
3. Public API (Interop Wrappers/Converters)¶
Repo alignment note (end-of-Module-09):
- Toolz/returns are optional and not installed by default.
- Runnable compat lives in:
- src/funcpipe_rag/interop/toolz_compat.py
- src/funcpipe_rag/interop/returns_compat.py
- Core code continues to use funcpipe_rag.result.types.Result / Option.
from typing import Callable, TypeVar, Iterator, Iterable, Optional
from functools import reduce, partial, lru_cache
from funcpipe_rag import Result as FPResult, Ok, Err, ErrInfo # Our ADTs
T = TypeVar('T')
U = TypeVar('U')
E = TypeVar('E')
TOOLZ_AVAILABLE = False
RETURNS_AVAILABLE = False
try:
import toolz
TOOLZ_AVAILABLE = True
except ImportError:
pass
try:
from returns.result import Result, Success, Failure
from returns.maybe import Maybe, Some, Nothing
RETURNS_AVAILABLE = True
except ImportError:
pass
def to_funcpipe_result(r: 'Result[T, E]') -> FPResult[T, E]:
    if isinstance(r, Success):
        return Ok(r.unwrap())  # Success exposes its value via unwrap()
    assert isinstance(r, Failure)
    return Err(r.failure())  # Callers map the failure to ErrInfo (via .alt) before converting
# Compat layer (type-preserving fallbacks)
class CompatCurried:
@staticmethod
def map(f: Callable[[T], U]) -> Callable[[Iterable[T]], Iterator[U]]:
return partial(map, f)
@staticmethod
def filter(p: Callable[[T], bool]) -> Callable[[Iterable[T]], Iterator[T]]:
return partial(filter, p)
# Add more as used
curried = toolz.curried if TOOLZ_AVAILABLE else CompatCurried()
def compat_pipe(x, *fs):
return reduce(lambda v, f: f(v), fs, x)
pipe = toolz.pipe if TOOLZ_AVAILABLE else compat_pipe
memoize = lru_cache(maxsize=128) # Stdlib bounded for both paths
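A quick usage sketch of the compat layer above. parse_int is a hypothetical boundary function for illustration; the point is that the pipeline reads identically whether pipe/curried come from toolz or the fallback:

def parse_int(s: str) -> FPResult[int, ErrInfo]:  # Hypothetical boundary fn for the sketch
    try:
        return Ok(int(s))
    except ValueError as ex:
        return Err(ErrInfo.from_exc(ex))

parsed = pipe(["1", "2", "oops"], curried.map(parse_int), list)
# [Ok(1), Ok(2), Err(...)] on either path; the fallback preserves FPResult types.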
4. Reference Implementations¶
4.1 Toolz for Ergonomic Combinators¶
from operator import attrgetter
from itertools import chain as std_chain
from toolz import compose, curry, reduceby, partition_all, merge_sorted
from toolz import groupby as toolz_groupby  # Eager, dict-building groupby
# Pipe with curried (lazy where possible)
def process_docs(docs: Iterator[RawDoc]) -> Iterator[CleanDoc]:
return pipe(docs, curried.map(clean_doc), curried.filter(lambda d: 'cs.AI' in d.categories))
# Compose for pipelines (add concat for arity change)
# Compose applies right-to-left: chunk first, then flatten, then embed
rag_stage = compose(
    curried.map(embed_chunk),
    curried.concat if TOOLZ_AVAILABLE else std_chain.from_iterable,  # Flatten nested chunk iterators
    curried.map(chunk_doc_curried(env)),
)
# Curried configurator
@curry  # toolz curry: chunk_doc_curried(env) returns a one-arg function awaiting cd
def chunk_doc_curried(env: RagEnv, cd: CleanDoc) -> Iterator[ChunkWithoutEmbedding]:
    text = cd.abstract
    return (ChunkWithoutEmbedding(cd.doc_id, text[i:i + env.chunk_size], i, min(i + env.chunk_size, len(text)))
            for i in range(0, len(text), env.chunk_size))  # Tail chunk may be shorter
# Memoize bounded
@memoize
def cached_embed_key(doc_id: str, abstract_hash: int) -> tuple[float, ...]:
return embed_chunk_by_key(doc_id, abstract_hash) # Assume helper; hashable key
def cached_embed(cd: CleanDoc) -> tuple[float, ...]:
return cached_embed_key(cd.doc_id, hash(cd.abstract))
# Reduceby for stats
def category_chunk_counts(chunks: Iterator[Chunk]) -> dict[str, int]:
return reduceby(lambda c: c.categories.split(',')[0] if c.categories else 'none', lambda acc, _: acc + 1, chunks, 0) # Hashable key
# Partition_all for batching (yields tuples, not lists)
def batched_embed(chunks: Iterator[Chunk], batch_size: int) -> Iterator[tuple[Chunk, ...]]:
    return partition_all(batch_size, chunks)
# Overlapping chunks (allow shorter tail)
def overlapping_chunks(text: str, size: int, overlap: int) -> Iterator[str]:
    if overlap >= size:
        raise ValueError("overlap must be smaller than size")  # Guard against non-positive step
    step = size - overlap
    for i in range(0, len(text), step):
        chunk = text[i:i + size]
        if chunk:  # Drop empty
            yield chunk
# Groupby (materializes; small n)
def group_by_category(docs: list[RawDoc]) -> dict[str, list[RawDoc]]:
return toolz_groupby(lambda d: d.categories.split(',')[0] if d.categories else 'none', docs) # Hashable
# Merge_sorted fan-in
def merge_docs(*doc_streams: Iterator[RawDoc]) -> Iterator[RawDoc]:
return merge_sorted(*doc_streams, key=attrgetter('doc_id'))
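These helpers stay lazy until consumed, per the Laziness invariant; a small check with islice:

from itertools import islice

# Only the first two windows are materialized; the generator defers the rest.
assert list(islice(overlapping_chunks("abcdefghij", size=4, overlap=2), 2)) == ["abcd", "cdef"]
# partition_all yields tuples on demand, including a shorter tail batch.
assert list(partition_all(3, range(7))) == [(0, 1, 2), (3, 4, 5), (6,)]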
4.2 Returns for Boundary Monads¶
from pathlib import Path
from returns.result import safe
from returns.io import IO, impure, unsafe_perform_io
from returns.future import future_safe as future_result_safe  # Wraps coroutine fns into FutureResult
# Safe at boundary + convert
def safe_clean(doc: RawDoc) -> FPResult[CleanDoc, ErrInfo]:
r = safe(clean_doc)(doc) # Canonical
return to_funcpipe_result(r.alt(lambda ex: ErrInfo.from_exc(ex))) # Alt for failure map
# Maybe at boundary (define helper)
def get_category_maybe(doc: RawDoc) -> Maybe[str]:
cats = doc.categories.split(',') if doc.categories else []
return Maybe.from_optional(cats[0] if cats else None)
def get_category(doc: RawDoc) -> Optional[str]:  # Unwrap Maybe at the boundary (convert to FuncPipe Option similarly)
m = get_category_maybe(doc)
return m.value_or(None)
# IO thunked
def read_path(path: str) -> IO[str]:
@impure
def thunk(p: str) -> str:
return Path(p).read_text()
return thunk(path) # Effect happens here, labeled as IO; unwrapping is unsafe boundary
# FutureResult from coro (full boundary)
async def safe_async_embed(text: str) -> FPResult[tuple[float, ...], ErrInfo]:
    fut = future_result_safe(embed_async)(text)  # Decorate the coroutine fn, then call: yields FutureResult
    io_r = await fut  # Awaiting a FutureResult yields an IOResult
    r = unsafe_perform_io(io_r)  # Unwrap IO at the boundary to a plain Result
    return to_funcpipe_result(r.alt(ErrInfo.from_exc))  # Convert to FuncPipe ADT
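The boundary discipline end-to-end, on a self-contained toy (div is an assumption for illustration, not a pipeline stage):

@safe
def div(a: int, b: int) -> float:
    return a / b  # Raises ZeroDivisionError on b == 0

ok = to_funcpipe_result(div(6, 3).alt(ErrInfo.from_exc))   # Ok(2.0)
err = to_funcpipe_result(div(1, 0).alt(ErrInfo.from_exc))  # Err(ErrInfo(...))
# No returns container escapes the boundary: downstream sees only FPResult.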
4.3 Integration in RAG¶
# RAG with optional helpers (types preserved: FPResult out)
def rag_optional(docs: Iterator[RawDoc]) -> Iterator[FPResult[Chunk, ErrInfo]]:
    def wrap_clean(d: RawDoc) -> FPResult[CleanDoc, ErrInfo]:
        if RETURNS_AVAILABLE:
            return to_funcpipe_result(safe(clean_doc)(d).alt(ErrInfo.from_exc))  # Map failure before converting
        try:
            return Ok(clean_doc(d))
        except Exception as ex:
            return Err(ErrInfo.from_exc(ex))  # Fallback preserves FPResult
    cleaned = pipe(docs, curried.map(wrap_clean)) if TOOLZ_AVAILABLE else map(wrap_clean, docs)
    return map(chunk_and_embed, cleaned)  # Core stages stay in FPResult
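Consumers then stay entirely in FPResult; a hedged sketch splitting successes from failures, assuming Ok/Err are plain classes checkable with isinstance (and sample_docs is a hypothetical fixture):

def split_results(results: Iterator[FPResult[Chunk, ErrInfo]]) -> tuple[list, list]:
    oks, errs = [], []
    for r in results:
        (oks if isinstance(r, Ok) else errs).append(r)  # Assumes Ok/Err are isinstance-checkable
    return oks, errs

oks, errs = split_results(rag_optional(iter(sample_docs)))  # sample_docs: hypothetical fixture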
4.4 Before/After Refactor¶
from toolz import reduceby
# Before: Stdlib dict aggregation
def category_counts_std(chunks: Iterator[Chunk]) -> dict[str, int]:
counts = {}
for c in chunks:
key = c.categories.split(',')[0] if c.categories else 'none'
counts[key] = counts.get(key, 0) + 1
return counts
# After: toolz reduceby
def category_counts_toolz(chunks: Iterator[Chunk]) -> dict[str, int]:
return reduceby(lambda c: c.categories.split(',')[0] if c.categories else 'none', lambda acc, _: acc + 1, chunks, 0)
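A quick equivalence check on toy data; FakeChunk is a hypothetical stand-in exposing only the .categories attribute the counters touch:

from typing import NamedTuple

class FakeChunk(NamedTuple):
    categories: str  # Only attribute the counters read

data = [FakeChunk("cs.AI,cs.LG"), FakeChunk("cs.AI"), FakeChunk("")]
expected = {"cs.AI": 2, "none": 1}
assert category_counts_std(iter(data)) == category_counts_toolz(iter(data)) == expected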
5. Property-Based Proofs (repo tests)¶
Runnable properties live in tests/unit/interop/test_toolz_compat.py.
from hypothesis import given, strategies as st
from itertools import groupby
from toolz import groupby as toolz_groupby
from returns.result import Result, Success, Failure
def ret(y): return Success(y) # Monad return
@given(st.integers())
def test_result_functor_monad(x):
r: Result[int, str] = Success(x)
assert r.map(lambda y: y) == r # Functor id
f = lambda y: y + 1
g = lambda y: y * 2
assert r.map(lambda y: g(f(y))) == r.map(f).map(g) # Functor comp
assert r.bind(ret) == r # Right id
assert ret(x).bind(lambda y: ret(y + 1)) == ret(x + 1) # Left id
assert r.bind(lambda y: ret(y + 1).bind(lambda z: ret(z * 2))) == r.bind(lambda y: ret((y + 1) * 2)) # Associativity
@given(st.lists(st.integers()))
def test_toolz_groupby_equiv_sorted(xs):
key = lambda z: z % 3
toolz_g = toolz_groupby(key, xs)
it_g = {k: list(g) for k, g in groupby(sorted(xs, key=key), key=key)}
assert toolz_g == it_g # Equiv under sorted
def test_toolz_groupby_mismatch_unsorted():
xs = [1, 0, 2, 1] # Non-consecutive
key = lambda z: z % 3
toolz_g = toolz_groupby(key, xs)
it_g = {k: list(g) for k, g in groupby(xs, key=key)}
assert toolz_g[1] == [1, 1] and it_g[1] == [1] # Mismatch demo
6. Runtime Preservation Guarantee¶
Helpers perform comparably to their stdlib counterparts; toolz.groupby holds O(n) entries in memory; memoization stays bounded via lru_cache(maxsize=128), as the sketch below checks.
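The bounded-memoization claim is directly observable via lru_cache introspection (stdlib only):

from functools import lru_cache

@lru_cache(maxsize=128)
def square(x: int) -> int:
    return x * x

for i in range(200):
    square(i)
assert square.cache_info().currsize <= 128  # Oldest entries evicted; memory stays bounded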
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Mix types in stage | Type errors | Convert at boundary |
| Uncurried multi-arg func in pipe | TypeError at runtime | Use curried variants (see sketch below) |
| toolz.groupby large | Memory blowup | itertools + sort |
| Unbounded memoize | Leak | Use lru_cache |
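The pipe/curried anti-pattern, concretely (assumes toolz installed):

from toolz import pipe, curried

pipe(range(3), list)  # Fine: list is unary
# pipe(range(3), map(lambda x: x + 1))  # TypeError: stdlib map also needs the iterable
pipe(range(3), curried.map(lambda x: x + 1), list)  # Curried variant is unary, so pipe can thread it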
8. Pre-Core Quiz¶
1. toolz for…? → Syntax sugar + ops
2. returns for…? → Boundary monads
3. curried for…? → Pipe safety
4. groupby diff…? → Materialize vs stream
5. Optional how? → Compat layer
9. Post-Core Exercise¶
- Implement flag-gated RAG.
- Prove laws with Hypothesis.
Next Core: 3. Data Processing – Pandas/Polars/Dask in FP Style (Pure Transforms, Method Chains vs Pipelines)