
Core 2: FP Helper Libraries – toolz / returns in Real Pipelines

Module 09

Core question:
How do you optionally integrate external FP libraries like toolz (ergonomic combinators) and returns (typed monadic containers) into FuncPipe pipelines, using toolz as syntax sugar over stdlib iterators and returns only at the outermost boundaries, with immediate conversion to our custom ADTs?

In this core, we introduce external FP helpers as optional enhancements in the FuncPipe RAG Builder (now at funcpipe-rag-09). toolz provides ergonomic combinators (e.g., pipe, compose, curried.map/filter, memoize, reduceby, partition_all, sliding_window, groupby, merge_sorted); some are direct sugar over the stdlib (pipe/compose/curried variants), while others add new semantics (e.g., reduceby for keyed folds). returns offers typed monads (Result, Maybe, IO, Future) with map/bind/rescue/fix, used strictly at boundaries (e.g., wrapping impure calls) and converted immediately to our ADTs (FuncPipe Result/Option) to avoid mixing container types.

We refactor select RAG stages with runtime-checked fallbacks, verifying equivalence and laws such as the functor/monad properties. Dependencies stay optional via guarded imports and a compat layer; installation: pip install toolz returns (mypy plugin: pip install returns[mypy]).

Sharp edges: toolz.groupby materializes an O(n) dict (no sort required), whereas the streaming itertools.groupby requires sorted/consecutive input; choose explicitly. returns requires its plugin for typing. Skip the helpers if you need minimal dependencies; adopt them only if the ergonomics/typing justify the load.

Motivating bug: custom combinators/monads risk subtle bugs; toolz/returns offer well-tested implementations, but mixing their containers with ours splits the type system, hence the boundary-only policy with converters.

Delta from Core 1 (Module 09): Core 1 grounds the basics in the stdlib; this core adds optional ergonomics/typing via helpers, with fallbacks and boundary converters.

Helper Lib Protocol (Contract, Entry/Exit Criteria):

- Interop Policy: toolz is syntax over the stdlib (e.g., curried.map == lambda f: lambda it: map(f, it)); returns is boundaries only (wrap impure calls, convert to FPResult immediately); no mixing within stages: core pipelines use FuncPipe ADTs exclusively.
- Composability: toolz.pipe chains eagerly; returns.Result composes via bind/map (monadic).
- Purity: pure functions only; memoize preserves purity; IO/Future describe effects (IO holds values eagerly unless thunked; Future defers coroutines until awaited, yielding IO).
- Laziness/Eagerness: toolz.itertoolz defers (e.g., curried.map yields a generator); toolz.pipe applies eagerly. returns containers are eager (compute on creation) except Future. See the laziness sketch after this list.
- Semantics: laws such as functor (map id == id; map (f . g) == map f . map g) and monad (left/right identity, associativity), verified via property tests. **Warning:** toolz.groupby builds an O(n) dict while itertools.groupby streams; the mismatch requires an explicit choice.
- Integration: runtime-checked imports with stdlib fallbacks that preserve types; converters at boundaries. The RAG refactor is optional.
- Mypy Config: --strict; the returns[mypy] plugin is required for typing; ignore toolz if untyped.
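
A minimal laziness check for the Laziness/Eagerness item above (assumes toolz is installed; trace is a hypothetical instrumented function):

from itertools import islice
from toolz import curried

calls: list[int] = []

def trace(x: int) -> int:
    calls.append(x)
    return x

lazy = curried.map(trace)(range(10))  # curried.map defers: nothing consumed yet
assert calls == []
assert list(islice(lazy, 2)) == [0, 1]
assert calls == [0, 1]  # Only the elements islice pulled were traced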

Audience: Engineers seeking ergonomics/typed monads beyond stdlib, with safe boundaries to preserve pipeline coherence.

Outcome:

  1. Use toolz for curried/pipe syntax; use returns for boundary monads with converters.
  2. Refactor the RAG pipeline with runtime fallbacks that preserve types.
  3. Prove equivalence and laws with Hypothesis.


1. Laws & Invariants

| Law | Description | Enforcement |
| --- | --- | --- |
| Functor law | map(lambda x: x) == id; map(lambda x: f(g(x))) == map(g).map(f) | Property tests |
| Monad law | bind(return(x), f) == f(x) (left identity); bind(m, return) == m (right identity); bind(bind(m, f), g) == bind(m, lambda x: bind(f(x), g)) (associativity) | Property tests (returns only) |
| Purity law | Combinators depend only on their inputs; no mutation | Code reviews |
| Laziness invariant | Iterator functions defer; pipe applies eagerly | Tests with islice |
| Equivalence law | Library versions == stdlib/custom versions (post-conversion) | Hypothesis equivalence tests |

These laws align helpers with our ADTs.


2. Decision Table

| Scenario | Ergonomics gain | Typed monads needed | Cognitive load | Recommended |
| --- | --- | --- | --- | --- |
| Chain iterators | High | No | Low | toolz.pipe / curried |
| Wrap impure call | Low | High | Medium | returns.safe + converter |
| Curry functions | High | No | Low | toolz.curried |
| Memoize pure function | Medium | No | Low | functools.lru_cache (bounded) |
| Minimal dependencies | None | None | None | Stdlib fallback |

Tradeoff: helpers add cognitive load; adopt them only where the benefits outweigh the ecosystem split.


3. Public API (Interop Wrappers/Converters)

Repo alignment note (end-of-Module-09):

- toolz/returns are optional and not installed by default.
- Runnable compat lives in:
  - src/funcpipe_rag/interop/toolz_compat.py
  - src/funcpipe_rag/interop/returns_compat.py
- Core code continues to use funcpipe_rag.result.types.Result / Option.

from typing import Callable, TypeVar, Iterator, Iterable, Optional
from functools import reduce, partial, lru_cache
from funcpipe_rag import Result as FPResult, Ok, Err, ErrInfo  # Our ADTs

T = TypeVar('T')
U = TypeVar('U')
E = TypeVar('E')

TOOLZ_AVAILABLE = False
RETURNS_AVAILABLE = False

try:
    import toolz

    TOOLZ_AVAILABLE = True
except ImportError:
    pass

try:
    from returns.result import Result, Success, Failure
    from returns.maybe import Maybe, Some, Nothing

    RETURNS_AVAILABLE = True
except ImportError:
    pass


def to_funcpipe_result(r: 'Result[T, E]') -> FPResult[T, E]:
    # Preserves the error type: callers map failures (e.g., via .alt(ErrInfo.from_exc)) before converting
    if isinstance(r, Success):
        return Ok(r.unwrap())  # .unwrap() is the public accessor; Success has no .value attribute
    assert isinstance(r, Failure)
    return Err(r.failure())


# Compat layer (type-preserving fallbacks)
class CompatCurried:
    @staticmethod
    def map(f: Callable[[T], U]) -> Callable[[Iterable[T]], Iterator[U]]:
        return partial(map, f)

    @staticmethod
    def filter(p: Callable[[T], bool]) -> Callable[[Iterable[T]], Iterator[T]]:
        return partial(filter, p)
    # Add more as used


curried = toolz.curried if TOOLZ_AVAILABLE else CompatCurried()


def compat_pipe(x, *fs):
    return reduce(lambda v, f: f(v), fs, x)


pipe = toolz.pipe if TOOLZ_AVAILABLE else compat_pipe

memoize = lru_cache(maxsize=128)  # Stdlib bounded for both paths
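
A quick smoke check of the compat layer; a minimal sketch that behaves identically whether or not toolz is installed:

# Same result on both paths: toolz pipe/curried, or the stdlib fallbacks above
doubled_evens = pipe(
    range(5),
    curried.filter(lambda x: x % 2 == 0),
    curried.map(lambda x: x * 2),
)
assert list(doubled_evens) == [0, 4, 8]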

4. Reference Implementations

4.1 Toolz for Ergonomic Combinators

from operator import attrgetter
from itertools import chain as std_chain
from toolz import compose, concat, curry, merge_sorted, partition_all, reduceby
from toolz import groupby as toolz_groupby  # This section demonstrates toolz directly

# Pipe with curried (lazy where possible)
def process_docs(docs: Iterator[RawDoc]) -> Iterator[CleanDoc]:
    return pipe(docs, curried.map(clean_doc), curried.filter(lambda d: 'cs.AI' in d.categories))

# Compose applies right-to-left: chunk each doc, flatten (arity change), then embed each chunk.
# Without toolz, std_chain.from_iterable substitutes for concat.
flatten = concat if TOOLZ_AVAILABLE else std_chain.from_iterable
rag_stage = compose(curried.map(embed_chunk), flatten, curried.map(chunk_doc_curried(env)))

# Curried configurator
@curry
def chunk_doc_curried(env: RagEnv, cd: CleanDoc) -> Iterator[ChunkWithoutEmbedding]:
    text = cd.abstract
    for i in range(0, len(text), env.chunk_size):
        chunk = text[i:i + env.chunk_size]  # Shorter tail allowed
        yield ChunkWithoutEmbedding(cd.doc_id, chunk, i, i + len(chunk))

# Memoize bounded
@memoize
def cached_embed_key(doc_id: str, abstract_hash: int) -> tuple[float, ...]:
    return embed_chunk_by_key(doc_id, abstract_hash)  # Assume helper; hashable key

def cached_embed(cd: CleanDoc) -> tuple[float, ...]:
    return cached_embed_key(cd.doc_id, hash(cd.abstract))

# Reduceby for stats
def category_chunk_counts(chunks: Iterator[Chunk]) -> dict[str, int]:
    return reduceby(lambda c: c.categories.split(',')[0] if c.categories else 'none', lambda acc, _: acc + 1, chunks, 0)  # Hashable key
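
# Worked example of reduceby's keyed fold (hypothetical data; no sorting required):
#   reduceby(lambda x: x % 2, lambda acc, x: acc + x, range(10), 0)
#   -> {0: 0 + 2 + 4 + 6 + 8, 1: 1 + 3 + 5 + 7 + 9} == {0: 20, 1: 25}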

# Partition_all for batching (yields tuples, not lists)
def batched_embed(chunks: Iterator[Chunk], batch_size: int) -> Iterator[tuple[Chunk, ...]]:
    return partition_all(batch_size, chunks)

# Overlapping chunks (allow shorter tail)
def overlapping_chunks(text: str, size: int, overlap: int) -> Iterator[str]:
    step = size - overlap
    if step <= 0:
        raise ValueError('overlap must be smaller than size')  # A non-positive step silently yields nothing
    for i in range(0, len(text), step):
        chunk = text[i:i + size]
        if chunk:  # Drop empty
            yield chunk

# Groupby (materializes; small n)
def group_by_category(docs: list[RawDoc]) -> dict[str, list[RawDoc]]:
    return toolz_groupby(lambda d: d.categories.split(',')[0] if d.categories else 'none', docs)  # Hashable

# Merge_sorted fan-in
def merge_docs(*doc_streams: Iterator[RawDoc]) -> Iterator[RawDoc]:
    return merge_sorted(*doc_streams, key=attrgetter('doc_id'))
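
For step-1 windows, toolz also ships sliding_window; a minimal sketch contrasting it with overlapping_chunks above (token_windows is a hypothetical helper; assumes toolz):

from toolz import sliding_window

def token_windows(tokens: list[str], size: int) -> Iterator[tuple[str, ...]]:
    # Fixed stride of 1; windows shorter than `size` are dropped,
    # unlike overlapping_chunks, which takes an arbitrary step and keeps the tail
    return sliding_window(size, tokens)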

4.2 Returns for Boundary Monads

from pathlib import Path
from returns.result import safe
from returns.io import IO, impure
from returns.unsafe import unsafe_perform_io
from returns.future import future_safe  # Wraps an async def into one returning FutureResult
# Safe at boundary + convert
def safe_clean(doc: RawDoc) -> FPResult[CleanDoc, ErrInfo]:
    r = safe(clean_doc)(doc)  # Canonical
    return to_funcpipe_result(r.alt(lambda ex: ErrInfo.from_exc(ex)))  # Alt for failure map

# Maybe at boundary (define helper)
def get_category_maybe(doc: RawDoc) -> Maybe[str]:
    cats = doc.categories.split(',') if doc.categories else []
    return Maybe.from_optional(cats[0] if cats else None)

def get_category(doc: RawDoc) -> Optional[str]:  # To our Option
    m = get_category_maybe(doc)
    return m.value_or(None)
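
# Hedged sketch: per the boundary policy, a Maybe could likewise convert to a
# FuncPipe Option (Option/Some_/None_ are assumed names, not defined in this module):
#     def to_funcpipe_option(m: Maybe[T]) -> Option[T]:
#         return m.map(Some_).value_or(None_())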

# IO thunked
def read_path(path: str) -> IO[str]:
    @impure
    def thunk(p: str) -> str:
        return Path(p).read_text()
    return thunk(path)  # Effect happens here, labeled as IO; unwrapping is unsafe boundary

# FutureResult from coro (full boundary)
async def safe_async_embed(text: str) -> FPResult[tuple[float, ...], ErrInfo]:
    fut = future_safe(embed_async)(text)  # Decorator-style wrap: coro -> FutureResult
    io_r = await fut  # Await at boundary, yielding IOResult
    r = unsafe_perform_io(io_r)  # Unwrap IO at boundary, yielding Result
    return to_funcpipe_result(r.alt(ErrInfo.from_exc))  # Map failure, then convert

4.3 Integration in RAG

# RAG with optional helpers (types preserved: FPResult out)
def rag_optional(docs: Iterator[RawDoc]) -> Iterator[FPResult[Chunk, ErrInfo]]:
    def wrap_clean(d: RawDoc) -> FPResult[CleanDoc, ErrInfo]:
        if RETURNS_AVAILABLE:
            return to_funcpipe_result(safe(clean_doc)(d).alt(ErrInfo.from_exc))
        try:
            return Ok(clean_doc(d))
        except Exception as ex:
            return Err(ErrInfo.from_exc(ex))  # Fallback preserves FPResult
    cleaned = pipe(docs, curried.map(wrap_clean)) if TOOLZ_AVAILABLE else map(wrap_clean, docs)
    return map(chunk_and_embed, cleaned)  # Core in FPResult
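
Call-site usage; a minimal sketch that partitions results using the Ok ADT imported in section 3 (raw_docs is a hypothetical input):

oks, errs = [], []
for r in rag_optional(iter(raw_docs)):
    (oks if isinstance(r, Ok) else errs).append(r)
# Core stays in FPResult: downstream stages consume oks, diagnostics consume errs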

4.4 Before/After Refactor

from toolz import reduceby  # The "after" version assumes toolz
# Before: Stdlib dict aggregation
def category_counts_std(chunks: Iterator[Chunk]) -> dict[str, int]:
    counts = {}
    for c in chunks:
        key = c.categories.split(',')[0] if c.categories else 'none'
        counts[key] = counts.get(key, 0) + 1
    return counts
# After: toolz reduceby
def category_counts_toolz(chunks: Iterator[Chunk]) -> dict[str, int]:
    return reduceby(lambda c: c.categories.split(',')[0] if c.categories else 'none', lambda acc, _: acc + 1, chunks, 0)
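
A quick equivalence check with a stubbed chunk type (StubChunk is a hypothetical stand-in; only the .categories attribute is accessed):

from dataclasses import dataclass

@dataclass(frozen=True)
class StubChunk:
    categories: str

data = [StubChunk('cs.AI,cs.LG'), StubChunk(''), StubChunk('cs.AI')]
assert category_counts_std(iter(data)) == category_counts_toolz(iter(data)) == {'cs.AI': 2, 'none': 1}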

5. Property-Based Proofs (repo tests)

Runnable properties live in tests/unit/interop/test_toolz_compat.py.

from hypothesis import given, strategies as st
from itertools import groupby
from toolz import groupby as toolz_groupby
from returns.result import Result, Success, Failure
def ret(y): return Success(y)  # Monad return
@given(st.integers())
def test_result_functor_monad(x):
    r: Result[int, str] = Success(x)
    assert r.map(lambda y: y) == r  # Functor id
    f = lambda y: y + 1
    g = lambda y: y * 2
    assert r.map(lambda y: g(f(y))) == r.map(f).map(g)  # Functor comp
    assert r.bind(ret) == r  # Right id
    assert ret(x).bind(lambda y: ret(y + 1)) == ret(x + 1)  # Left id
    assert r.bind(lambda y: ret(y + 1).bind(lambda z: ret(z * 2))) == r.bind(lambda y: ret((y + 1) * 2))  # Associativity
@given(st.lists(st.integers()))
def test_toolz_groupby_equiv_sorted(xs):
    key = lambda z: z % 3
    toolz_g = toolz_groupby(key, xs)
    it_g = {k: list(g) for k, g in groupby(sorted(xs, key=key), key=key)}
    assert toolz_g == it_g  # Equiv under sorted
def test_toolz_groupby_mismatch_unsorted():
    xs = [1, 0, 2, 1]  # Non-consecutive
    key = lambda z: z % 3
    toolz_g = toolz_groupby(key, xs)
    it_g = {k: list(g) for k, g in groupby(xs, key=key)}
    assert toolz_g[1] == [1, 1] and it_g[1] == [1]  # Mismatch demo


6. Runtime Preservation Guarantee

Helpers match stdlib asymptotics; toolz.groupby uses O(n) memory (it materializes a dict); memoize stays bounded via lru_cache(maxsize=128).


7. Anti-Patterns & Immediate Fixes

| Anti-pattern | Symptom | Fix |
| --- | --- | --- |
| Mixing container types within a stage | Type errors | Convert at the boundary |
| Uncurried functions inside toolz.pipe | Runtime failure | Always use curried variants |
| toolz.groupby on large streams | Memory blowup | itertools.groupby + sort |
| Unbounded memoize | Memory leak | Use lru_cache with a maxsize |

8. Pre-Core Quiz
1. toolz for…? → Syntax sugar + ops
2. returns for…? → Boundary monads
3. curried for…? → Pipe safety
4. groupby diff…? → Materialize vs stream
5. Optional how? → Compat layer

9. Post-Core Exercise

  1. Implement flag-gated RAG.
  2. Prove laws with Hypothesis.

Pipeline Usage (Idiomatic)

result = pipe(data, curried.map(f), curried.filter(g))

Next Core: 3. Data Processing – Pandas/Polars/Dask in FP Style (Pure Transforms, Method Chains vs Pipelines)