Core 8: Wrapping Imperative Libraries in Functional Facades (Normalizing Side-Effectful APIs)
Module 09
Core question: How do you wrap imperative, side-effectful libraries in pure functional facades that return effect descriptions (not executed effects), expose algebraic ports with enforced contracts (e.g., keyed elements for idempotence), and normalize errors/resources into rich domain ADTs, enabling seamless integration into FuncPipe while preserving purity, composability, and testability?
In this core, we address the disciplined integration of imperative third-party libraries into the FuncPipe RAG Builder (now at funcpipe-rag-09). Libraries like openai, requests, boto3, or psycopg2 introduce effects (I/O, mutable state, unstructured exceptions) that must be confined to interpreters at boundaries. We design facades as pure ports (protocols returning IoAction/AsyncAction descriptions with algebraic laws), adapters as delayed imperative implementations (thin translation only), and interpreters as sinks that execute effects (handling runtime policies like retries, caching, backoff). Errors normalize to rich ErrInfo (with retryable, status, provenance, budgets, context); resources use context managers in descriptions; idempotence is enforced at types (Keyed[K, T] elements for non-idempotent ops). We refactor RAG embedding to use an EmbedderPort returning IoAction[Result[List[Keyed[K, EmbeddedChunk]], ErrInfo]], and handle a "hard" case: wrapping a streaming Kafka consumer into AsyncAction[Stream[Result[Keyed[Optional[K], bytes], ErrInfo]]] with backpressure, commit semantics, restart behavior, ordering guarantees, and deterministic shutdown. Laws are verified with fake interpreters; equivalence uses quarantined integration tests.
Dependencies: varies by adapter (e.g., pip install openai requests psycopg2 boto3 confluent-kafka). Tradeoffs: descriptions enable mocks/retries but add indirection. Sharp edges: enforce single-shot with runtime guards; test with fakes to avoid live I/O.
Motivating Bug: Eager wrappers leak effects/mutability into cores; pure facades return descriptions, confining execution to sinks for testable, retry-safe pipelines.
Delta from Core 7 (Module 09): Distributed systems compile pure operators; this core treats imperative libs as interpreters/sinks, wrapping them in pure ports that return descriptions, completing the effect boundary story.
Facade Protocol (Contract, Entry/Exit Criteria):
- Purity: Facades return descriptions; no effects executed in port methods.
- Normalization: Lib errors → rich ErrInfo; resources via context managers in descriptions.
- Algebraic Ports: Protocols return descriptions obeying monad laws (map/bind associativity, identity); single-shot (re-interpretation raises); cancellation closes resources; observational equality (same interpreter yields same result).
- Idempotence: Enforced at types (Keyed[K, T] elements for non-idempotent ops).
- Semantics: Laws like purity (facade on fixed inputs returns an equivalent description), equivalence (interpreted facade observationally refines the direct lib up to normalization), and idempotence (keyed ops equivalent on repeats), all verified via fake interpreters.
- Integration: Use Module 07 IoAction/AsyncAction; compose with bind/map; interpret at sinks.
- Mypy Config: --strict; protocols for ports.
Audience: Engineers integrating real-world libs into FP pipelines without compromising purity.
Outcome:
1. Design pure facades returning algebraic descriptions.
2. Wrap RAG dependencies (e.g., OpenAI, Kafka) into ports.
3. Prove laws with fake interpreters.
1. Laws & Invariants
| Law | Description | Enforcement |
|---|---|---|
| Purity Law | Facade returns description only; no effects executed. | Code reviews/no I/O in port methods |
| Equivalence Law | Interpreted facade observationally refines direct lib (up to normalization). | Hypothesis with fake + integration tests |
| Idempotence Inv | Keyed ops yield same results on repeat. | Property tests with keys/repeats |
| Resource Inv | Descriptions clean up on errors/cancellation. | Contextlib + Hypothesis |
IoAction/AsyncAction Algebra: Monad laws (bind associativity, left/right identity); single-shot (re-interpretation raises RuntimeError via internal flag); cancellation closes resources; observational equality (same interpreter yields same result).
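To make the single-shot guard and lazy composition concrete, here is a minimal sketch of a synchronous description type. The real IoAction lives in Module 07 and may differ in detail; names like _unsafe_run are illustrative assumptions, not the canonical API.

from typing import Callable, Generic, TypeVar

A = TypeVar("A")
B = TypeVar("B")

class IoAction(Generic[A]):
    """Sketch only: a delayed computation with a runtime single-shot guard."""
    def __init__(self, thunk: Callable[[], A]) -> None:
        self._thunk = thunk
        self._consumed = False  # Flips on first interpretation

    @staticmethod
    def delay(thunk: Callable[[], B]) -> "IoAction[B]":
        return IoAction(thunk)

    def map(self, f: Callable[[A], B]) -> "IoAction[B]":
        # Builds a new description; nothing executes here
        return IoAction(lambda: f(self._unsafe_run()))

    def bind(self, f: Callable[[A], "IoAction[B]"]) -> "IoAction[B]":
        return IoAction(lambda: f(self._unsafe_run())._unsafe_run())

    def _unsafe_run(self) -> A:
        # Called only by interpreters at sinks; re-interpretation raises
        if self._consumed:
            raise RuntimeError("IoAction already interpreted (single-shot)")
        self._consumed = True
        return self._thunk()

Under this encoding the monad laws hold up to observational equality (interpreting both sides under the same interpreter yields the same result), while the consumed flag enforces single-shot at runtime.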
2. Decision Table
| Lib Hazard | Duplication Risk | Mutation | Ordering | Partial Success | Recommended |
|---|---|---|---|---|---|
| HTTP/DB | Low | Low | No | No | IoAction + Port |
| API batches | Medium | Low | No | Yes | IoAction + Keyed |
| Streaming/callback | High | High | Yes | Yes | AsyncAction[Stream] + Keyed |
Choose based on semantic risks; types enforce contracts.
3. Public API (src/funcpipe_rag/domain/facades.py – Pure Ports)
Ports return descriptions; adapters delay effects.
Assumption: ErrInfo, Ok, Err, and Result are the canonical ADTs from funcpipe_rag.result.types (or the root re-exports in funcpipe_rag).
Do not re-define them here—extend at the ADT layer if needed.
from typing import TYPE_CHECKING, Any, Callable, Generic, List, Protocol, TypeVar
from dataclasses import dataclass

from funcpipe_rag import Result, ErrInfo, Ok, Err, Chunk, EmbeddedChunk
from funcpipe_rag import IoAction  # Sync effect description (Module 07)

if TYPE_CHECKING:
    import openai  # Type-only import; the runtime import stays inside run()

K = TypeVar("K")
T = TypeVar("T")

@dataclass(frozen=True)
class Keyed(Generic[K, T]):
    """Domain-level idempotence key wrapper. Should live in adts if used across cores."""
    key: K
    value: T

class EmbedderPort(Protocol):
    def embed_batch(self, items: List[Keyed[K, Chunk]]) -> IoAction[Result[List[Keyed[K, EmbeddedChunk]], ErrInfo]]: ...

def openai_embedder_adapter(
    client_factory: Callable[[], "openai.OpenAI"],
    model: str = "text-embedding-3-large",
) -> EmbedderPort:
    def map_response(items: List[Keyed[K, Chunk]], resp: Any) -> List[Keyed[K, EmbeddedChunk]]:
        # Pair each input key with its embedding by position
        return [
            Keyed(key=item.key, value=EmbeddedChunk(chunk=item.value, embedding=resp.data[i].embedding))
            for i, item in enumerate(items)
        ]

    class OpenAIEmbedder:
        def embed_batch(self, items: List[Keyed[K, Chunk]]) -> IoAction[Result[List[Keyed[K, EmbeddedChunk]], ErrInfo]]:
            def run() -> Result[List[Keyed[K, EmbeddedChunk]], ErrInfo]:
                import openai  # Deferred import: the module stays importable without the SDK
                client = client_factory()
                texts = [item.value.text for item in items]
                keys = [item.key for item in items]
                try:
                    resp = client.embeddings.create(input=texts, model=model)
                    return Ok(map_response(items, resp))
                except openai.RateLimitError as e:
                    return Err(ErrInfo(
                        code="RATE_LIMIT", msg=str(e), retryable=True, http_status=429,
                        op="embed_batch", context={"keys": keys, "model": model},
                        cause=repr(e), budgets={"retries_left": 3},
                    ))
                except openai.AuthenticationError as e:
                    return Err(ErrInfo(
                        code="AUTH_ERR", msg=str(e), retryable=False, op="embed_batch",
                        context={"keys": keys}, cause=repr(e),
                    ))
                except Exception as e:
                    return Err(ErrInfo(
                        code="EMBED_ERR", msg=str(e), retryable=False, op="embed_batch",
                        context={"keys": keys}, cause=repr(e),
                    ))
            return IoAction.delay(run)  # Description only; nothing runs until a sink interprets it

    return OpenAIEmbedder()
4. Reference Implementations
4.1 HTTP Facade (requests)
from typing import TypeVar

import requests

from funcpipe_rag import Result, ErrInfo, Ok, Err, Keyed
from funcpipe_rag import IoAction

K = TypeVar("K")

def http_get(item: Keyed[K, str]) -> IoAction[Result[Keyed[K, str], ErrInfo]]:
    def run() -> Result[Keyed[K, str], ErrInfo]:
        try:
            r = requests.get(item.value, timeout=10)
            r.raise_for_status()
            return Ok(Keyed(key=item.key, value=r.text))
        except requests.Timeout as e:
            return Err(ErrInfo(code="TIMEOUT", msg=str(e), retryable=True, op="http_get", context={"key": item.key}))
        except requests.HTTPError as e:
            return Err(ErrInfo(code="HTTP_ERR", msg=str(e), http_status=e.response.status_code, op="http_get",
                               context={"key": item.key}))
        except Exception as e:
            return Err(ErrInfo(code="GET_ERR", msg=str(e), op="http_get", context={"key": item.key}, cause=repr(e)))
    return IoAction.delay(run)
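Because http_get returns a description, downstream transformations compose before any I/O occurs. A brief usage sketch; it assumes the funcpipe_rag Result also exposes a map method (adjust to the actual ADT API):

# Build a composed description; no network call happens yet
fetch = http_get(Keyed(key="doc-1", value="https://example.com/a.txt"))
sized = fetch.map(lambda res: res.map(lambda kv: Keyed(key=kv.key, value=len(kv.value))))
# Only the sink executes the effect:
# result = perform_io(sized)  # Ok(Keyed('doc-1', <byte length>)) or Err(ErrInfo(...))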
4.2 DB Facade (psycopg2)
from typing import Any, Dict, List, Tuple, TypeVar

import psycopg2
from psycopg2.extras import RealDictCursor

from funcpipe_rag import Result, ErrInfo, Ok, Err, Keyed
from funcpipe_rag import IoAction

K = TypeVar("K")

def db_query(item: Keyed[K, Tuple[str, str]]) -> IoAction[Result[Keyed[K, List[Dict[str, Any]]], ErrInfo]]:
    conn_str, sql = item.value
    def run() -> Result[Keyed[K, List[Dict[str, Any]]], ErrInfo]:
        try:
            # `with` scopes the transaction (commit on success, rollback on error) and closes the cursor
            with psycopg2.connect(conn_str) as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(sql)
                rows = cur.fetchall()
            return Ok(Keyed(key=item.key, value=rows))
        except Exception as e:
            return Err(ErrInfo(code="DB_ERR", msg=str(e), op="db_query", context={"key": item.key}, cause=repr(e)))
    return IoAction.delay(run)
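A brief usage sketch (conn_str is a placeholder; as everywhere, perform_io is the sink-side interpreter):

action = db_query(Keyed(key="q-users", value=(conn_str, "SELECT id, name FROM users")))
# result = perform_io(action)
# -> Ok(Keyed(key='q-users', value=[{...}, ...])) or Err(ErrInfo(code='DB_ERR', ...))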
4.3 Hard Case: Streaming Kafka (confluent-kafka)
import asyncio
from typing import AsyncGenerator, Callable, Dict, Literal, Optional, Protocol

from confluent_kafka import Consumer  # msg.error() yields a KafkaError exposing .retriable()

from funcpipe_rag import Result, ErrInfo, Ok, Err, Keyed
from funcpipe_rag import AsyncAction, Stream

class StreamConsumerPort(Protocol):
    # Constrain key type to Optional[str] for this adapter; don't pretend it's fully generic.
    def consume_stream(
        self, topic: str, group_id: str,
        commit_strategy: Literal["per_msg", "batch"],
        batch_size: int = 100,
    ) -> AsyncAction[Stream[Result[Keyed[Optional[str], bytes], ErrInfo]]]: ...

def kafka_consumer_adapter(
    config_factory: Callable[[], Dict[str, str]],
) -> StreamConsumerPort:
    class KafkaConsumer:
        def consume_stream(self, topic: str, group_id: str, commit_strategy: Literal["per_msg", "batch"],
                           batch_size: int = 100) -> AsyncAction[Stream[Result[Keyed[Optional[str], bytes], ErrInfo]]]:
            async def run() -> AsyncGenerator[Result[Keyed[Optional[str], bytes], ErrInfo], None]:
                conf = config_factory()
                conf['group.id'] = group_id
                consumer = Consumer(conf)
                consumer.subscribe([topic])
                try:
                    pending_commits = 0
                    while True:
                        msg = await asyncio.to_thread(consumer.poll, 1.0)  # Backpressure: yield per msg
                        if msg is None:
                            await asyncio.sleep(0.1)  # Yield control
                            continue
                        if msg.error():
                            # Do NOT commit errored messages; preserve at-least-once.
                            yield Err(ErrInfo(
                                code="KAFKA_ERR", msg=msg.error().str(),
                                op="consume_stream", retryable=msg.error().retriable(),
                            ))
                        else:
                            key = msg.key().decode() if msg.key() else None
                            value = msg.value()
                            yield Ok(Keyed(key=key, value=value))
                            pending_commits += 1
                            if commit_strategy == "per_msg":
                                await asyncio.to_thread(consumer.commit, asynchronous=False)  # At-least-once, ordered
                                pending_commits = 0
                            elif commit_strategy == "batch" and pending_commits >= batch_size:
                                await asyncio.to_thread(consumer.commit, asynchronous=False)
                                pending_commits = 0
                finally:
                    await asyncio.to_thread(consumer.close)  # Deterministic shutdown
            return AsyncAction.delay_gen(run)  # Backpressure on yield; commit policy explicit; restart via group_id
    return KafkaConsumer()
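Consumption stays at the sink. A sketch of an interpreting sink, assuming perform_async resolves an AsyncAction to its underlying async iterator (interpreter name assumed from Module 07), an Err type that exposes its ErrInfo as .error, and a hypothetical handle() downstream:

from contextlib import aclosing  # Python 3.10+

async def drain(port: StreamConsumerPort) -> None:
    action = port.consume_stream("docs.raw", group_id="rag-ingest", commit_strategy="batch")
    stream = await perform_async(action)  # Effects (connect/subscribe) begin only here
    async with aclosing(stream):          # Guarantees the adapter's finally/close runs on exit
        async for item in stream:
            if isinstance(item, Ok):
                handle(item.value)        # Hypothetical downstream handler
            elif not item.error.retryable:
                break                     # Non-retryable error: stop; aclosing triggers shutdown

The aclosing wrapper matters: breaking out of an async for does not by itself close the generator, so without it the consumer's deterministic-shutdown guarantee would depend on garbage collection.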
4.4 Integration in RAG
import os
import openai

embedder = openai_embedder_adapter(client_factory=lambda: openai.OpenAI(api_key=os.getenv("OPENAI_KEY")))
items = [Keyed(key=str(i), value=c) for i, c in enumerate(chunks)]
action = embedder.embed_batch(items)
result = perform_io(action)  # Interpreter at sink; effects execute only here
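Retry policy belongs in the interpreter, not the facade. Because descriptions are single-shot, a retrying sink must rebuild the description for each attempt rather than re-run a consumed action. A minimal sketch, reusing perform_io from above and assuming Err exposes its ErrInfo as .error:

import time
from typing import Callable, TypeVar

from funcpipe_rag import IoAction, Result, ErrInfo, Ok

A = TypeVar("A")

def perform_io_with_retry(
    make_action: Callable[[], IoAction[Result[A, ErrInfo]]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> Result[A, ErrInfo]:
    result = perform_io(make_action())  # Fresh description per attempt: actions are single-shot
    for attempt in range(1, max_attempts):
        if isinstance(result, Ok) or not result.error.retryable:
            break
        time.sleep(base_delay * 2 ** (attempt - 1))  # Exponential backoff between attempts
        result = perform_io(make_action())
    return result

# Usage: the factory rebuilds the description; a consumed action is never re-interpreted
# result = perform_io_with_retry(lambda: embedder.embed_batch(items))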
4.5 Before/After Refactor
# Before: Imperative
client = openai.OpenAI(api_key="...")
resp = client.embeddings.create(...)
# After: Pure facade + interpreter
embedder = openai_embedder_adapter(client_factory=lambda: openai.OpenAI(api_key="..."))
items = [Keyed(key=str(i), value=c) for i, c in enumerate(chunks)]
action = embedder.embed_batch(items)
result = perform_io(action) # Effects only here
5. Property-Based Proofs (repo tests)
Runnable tests live in tests/unit/domain/test_facades.py.
from hypothesis import given, strategies as st
# chunk_strat and the Fake* helpers are defined in the test module (sketched below)

@given(st.lists(chunk_strat(), max_size=10))
def test_openai_facade_laws_fake(chunks):
    fake_resp = FakeOpenAIResponse(data=[FakeEmbedding(embedding=[0.1] * 1536) for _ in chunks])
    def fake_factory(): return FakeOpenAI(fake_resp)
    facade = openai_embedder_adapter(fake_factory)
    keys = [str(i) for i in range(len(chunks))]
    items = [Keyed(key=k, value=c) for k, c in zip(keys, chunks)]
    action1 = facade.embed_batch(items)
    action2 = facade.embed_batch(items)
    # Observational equality: interpret two independently built descriptions under the same fake
    actual1 = perform_fake_io(action1)
    actual2 = perform_fake_io(action2)
    assert actual1 == actual2
    expected = Ok([
        Keyed(key=keys[i], value=EmbeddedChunk(chunk=chunks[i], embedding=fake_resp.data[i].embedding))
        for i in range(len(chunks))
    ])
    assert actual1 == expected
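The fakes are plain in-process stand-ins. A sketch of their shape (the repo versions in tests/ are authoritative; perform_fake_io here simply delegates to the ordinary interpreter, which is safe because the description touches no live service):

from dataclasses import dataclass
from typing import List

@dataclass(frozen=True)
class FakeEmbedding:
    embedding: List[float]

@dataclass(frozen=True)
class FakeOpenAIResponse:
    data: List[FakeEmbedding]

class FakeOpenAI:
    """Stands in for openai.OpenAI: client.embeddings.create(...) returns a canned response."""
    def __init__(self, resp: FakeOpenAIResponse) -> None:
        self._resp = resp
        self.embeddings = self  # Lets the adapter call client.embeddings.create(...)

    def create(self, input: List[str], model: str) -> FakeOpenAIResponse:
        return self._resp  # Pure: same inputs, same response, no network

def perform_fake_io(action):
    return perform_io(action)  # Interpretation is safe: no live I/O behind the description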
6. Runtime Preservation Guarantee
Facades add only the negligible overhead of allocating descriptions; interpretation at sinks preserves runtime performance.
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Eager facades | Effects in core | Return descriptions |
| Poor error mapping | Lost retry info | Rich ErrInfo |
| Thick facades | Logic in wrapper | Thin translation only |
| Global clients | Races | Inject factory |
8. Pre-Core Quiz
1. Facades return…? → Descriptions
2. Effects run in…? → Interpreters/sinks
3. Errors become…? → Rich ErrInfo
4. Benefit? → Pure + testable
5. Law? → Purity + equivalence
9. Post-Core Exercise
1. Wrap a lib (e.g., boto3) returning IoAction.
2. Prove laws with fake interpreter.
Pipeline Usage (Idiomatic)
Next Core: 9. Cross-Process Composition – Serializing Configs/Specifications, Not Live Objects