Core 4: Web and Services – FastAPI Endpoints with Pure Core & Thin Adapters
Module 09
Core question:
How do you build web services using FastAPI in a functional style, with thin adapters delegating to pure core logic, dependency injection for capabilities, and explicit mappings for errors/responses to maintain composability and testability in FuncPipe pipelines?
In this core, we build FP-style web services with FastAPI in the FuncPipe RAG Builder (now at funcpipe-rag-09). FastAPI provides type-safe, async-capable APIs with automatic OpenAPI docs; it uses Pydantic for validation and its dependency-injection system to supply capabilities to pure cores. We emphasize thin adapters (routers/endpoints that delegate to pure funcs), pure domain logic (no I/O or mutation in the core), and composable routes that integrate with the boundaries from Module 7 and the async flows from Module 8. We refactor RAG into API endpoints (e.g., /rag/process for the pipeline, /rag/query for search) and verify equivalence to the non-web versions, plus laws like purity (same inputs yield same outputs).
- Dependencies: pip install fastapi uvicorn (the TestClient used in tests also needs httpx).
- Tradeoffs: FastAPI's speed and built-in validation vs. Flask's simplicity.
- Sharp edges: sync vs. async routes (prefer async for I/O), dependency overrides for tests, and Pydantic v2 changes (prefer .model_dump() for serialization).
Motivation Bug: Imperative web handlers mixing logic with I/O lead to untestable, non-composable services; FP style separates pure cores from thin adapters for reliable APIs.
Delta from Core 3 (Module 09): Data libs handle batch processing; this exposes pipelines as services with HTTP boundaries.
Web Protocol (Contract, Entry/Exit Criteria):
- Purity: Core funcs are pure (they depend only on injected deps/inputs); effects live in adapters/deps (e.g., DB via Depends). Purity is conditional on deps being referentially transparent and version-pinned.
- Composability: Routes as functions; deps for capabilities; routers for modular APIs.
- Async/Sync: Prefer async for I/O-bound work and sync for compute. For CPU-bound cores in async routes, offload to threadpools (e.g., via concurrent.futures) or task queues to avoid blocking the event loop; async def alone does not give parallelism.
- Semantics: Laws like purity (handler(fixed deps, input) is deterministic) and equivalence to non-web (API call == direct func, up to serialization/HTTP mapping), verified via properties.
- Integration: Wrap the RAG core in FastAPI endpoints; use Pydantic for ADTs (Module 5); test with overrides; map FPResult to HTTP (Ok→200, Err(domain)→4xx, Err(unexpected)→5xx).
- Mypy Config: --strict; fastapi/pydantic typing.
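The Async/Sync rule above can be illustrated with a minimal, stdlib-only sketch. The names `cpu_bound_core`, `handler`, and `main` are hypothetical stand-ins, not part of FuncPipe or FastAPI. The async adapter hands the blocking core to a worker thread via `asyncio.to_thread` (Python 3.9+), so the event loop stays free to serve other requests.

```python
import asyncio
import hashlib


def cpu_bound_core(text: str, rounds: int = 1000) -> str:
    """Pure, CPU-bound stand-in for a core function: repeated hashing."""
    digest = text.encode()
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()


async def handler(text: str) -> str:
    """Async adapter: run the blocking core in a worker thread."""
    # asyncio.to_thread uses the event loop's default thread pool executor.
    return await asyncio.to_thread(cpu_bound_core, text)


async def main() -> list[str]:
    # Several "requests" are awaited together; none blocks the event loop.
    return await asyncio.gather(*(handler(t) for t in ["a", "b", "a"]))
```

A thread keeps the loop responsive, but pure-Python CPU work still contends for the GIL, so a process pool or a task queue is likely the better fit for heavy compute.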
Audience: Web developers building services around FP pipelines, needing pure cores with testable adapters.
Outcome:
1. Build FastAPI services with thin adapters/pure cores.
2. Expose RAG as API endpoints.
3. Prove equivalence/laws with Hypothesis.
1. Laws & Invariants
| Law | Description | Enforcement |
|---|---|---|
| Purity Law | Core funcs depend only on inputs/deps; no globals/I/O (conditional on deps transparency). | Code reviews/mypy |
| Equivalence Law | API response == direct core call for same input (up to serialization/HTTP mapping). | Hypothesis equivalence |
| Idempotence Inv | Repeated calls to a pure core with the same inputs return the same output; an endpoint inherits this only when its underlying logic is idempotent. | Property tests |
| Resource Inv | Adapters manage lifecycles (e.g., yield deps for cleanup). | FastAPI tests |
These laws ensure web layers don't break FP properties.
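As a rough illustration of the Purity and Equivalence laws, here is a stdlib-only sketch that uses seeded random inputs in place of Hypothesis. The `Ok`/`Err` dataclasses, `core`, `adapter`, and `check_equivalence` are all hypothetical stand-ins for the real FuncPipe types and a FastAPI route. The adapter only decodes, delegates, and maps the result to a status and body.

```python
import json
import random
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Ok:
    value: list[str]


@dataclass(frozen=True)
class Err:
    code: str


def core(docs: list[str], chunk_size: int) -> Union[Ok, Err]:
    """Pure stand-in core: split each doc into fixed-size chunks."""
    if chunk_size <= 0:
        return Err(code="invalid_input")
    return Ok([d[i:i + chunk_size] for d in docs for i in range(0, len(d), chunk_size)])


def adapter(body: str, chunk_size: int) -> tuple[int, str]:
    """Thin adapter: decode JSON, call the core, map the result to (status, body)."""
    result = core(json.loads(body), chunk_size)
    if isinstance(result, Ok):
        return 200, json.dumps(result.value)
    return 400, json.dumps({"error": {"code": result.code}})


def check_equivalence(trials: int = 200, seed: int = 0) -> bool:
    """Compare adapter output with a direct core call on random inputs."""
    rng = random.Random(seed)
    for _ in range(trials):
        docs = ["".join(rng.choices("abc ", k=rng.randint(0, 12)))
                for _ in range(rng.randint(0, 4))]
        size = rng.randint(-1, 5)
        status, body = adapter(json.dumps(docs), size)
        direct = core(docs, size)
        if direct != core(docs, size):  # Purity: same inputs, same output
            return False
        if isinstance(direct, Ok):      # Equivalence up to serialization
            expected = (200, direct.value)
        else:                           # Equivalence up to HTTP mapping
            expected = (400, {"error": {"code": direct.code}})
        if (status, json.loads(body)) != expected:
            return False
    return True
```

The same shape carries over to a Hypothesis test against a real route: generate inputs, call both paths, and compare after normalizing serialization.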
2. Decision Table
| Scenario | Async Needed | Streaming Response | Background Tasks | Recommended |
|---|---|---|---|---|
| Sync compute API | No | No | No | Sync routes |
| Async I/O API | Yes | No | No | Async deps |
| Streaming output | Yes | Yes | No | StreamingResponse |
| Fire-forget | No | No | Yes | BackgroundTasks |
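Use FastAPI for type-safe, async services, and its dependency system to inject capabilities into pure cores.
3. Public API (Adapters for Core; Dep Injection)
Adapters wrap core funcs and stay thin. Guard the FastAPI imports so the rest of the package works without FastAPI installed.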
Repo alignment note (end-of-Module-09):
- FastAPI is treated as optional in this repo.
- A minimal guarded adapter exists at src/funcpipe_rag/boundaries/web/fastapi_app.py (create_app() raises ImportError if FastAPI isn't installed).
Error-to-HTTP mapping table (reference only):
| Err Kind | Default HTTP | Example Code | Status Override | Retryable | Example |
|---|---|---|---|---|---|
| domain | 400 | invalid_input | 422 for validation | No | Invalid doc format |
| infra | 503 | db_timeout | - | Yes | DB timeout |
| unexpected | 500 | runtime_error | - | No | Unexpected error |
from __future__ import annotations  # For forward refs

from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator
from contextlib import asynccontextmanager
from functools import lru_cache
from typing import Annotated, Any

from fastapi import FastAPI, APIRouter, Body, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field

from funcpipe_rag import FPResult, Ok, Err, RawDoc, Chunk, ErrInfo
from funcpipe_rag import rag_core  # Pure core
from funcpipe_rag import RagEnv  # Environment/capability holder

# Assume ErrInfo from adts is immutable with attrs:
# kind ('domain'|'infra'|'unexpected'), msg, retryable, code (machine-readable)

# Extend for web mapping (without shadowing)
def map_err_to_http(err: ErrInfo) -> tuple[int, dict[str, Any]]:
    STATUS_MAP = {
        'domain': 400,
        'infra': 503,
        'unexpected': 500,
    }
    # Validation failures override the kind-based default with 422
    status = 422 if err.code == 'validation_error' else STATUS_MAP.get(err.kind, 500)
    envelope = {
        "error": {
            "kind": err.kind,
            "code": err.code,
            "message": err.msg,
            "retryable": err.retryable,
        }
    }
    return status, envelope

# Boundary DTOs (Pydantic v2)
class RawDocDTO(BaseModel):
    content: str
    metadata: dict[str, str] = Field(default_factory=dict)

class ChunkDTO(BaseModel):
    text: str
    embedding: list[float]

# Converters
def to_internal_docs(dtos: list[RawDocDTO]) -> list[RawDoc]:
    return [RawDoc(content=dto.content, metadata=dto.metadata) for dto in dtos]

def from_internal_chunks(chunks: list[Chunk]) -> list[ChunkDTO]:
    return [ChunkDTO(text=c.text, embedding=c.embedding) for c in chunks]

# Centralized mapping (Ok/Err are simple dataclasses; use isinstance for robustness)
def fpresult_to_http(result: FPResult[list[Chunk], ErrInfo]) -> list[ChunkDTO]:
    if isinstance(result, Ok):
        return from_internal_chunks(result.value)
    elif isinstance(result, Err):
        status, envelope = map_err_to_http(result.error)
        raise HTTPException(status_code=status, detail=envelope)
    else:
        raise ValueError("Invalid FPResult")

# Lifespan for startup/shutdown (FastAPI expects an async context manager)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # Startup: preload embeddings, etc.
    yield
    # Shutdown: close connections

app = FastAPI(lifespan=lifespan)

# Cached dep; RagEnv must be immutable or hold only pure config
@lru_cache(maxsize=1)
def get_cached_env() -> RagEnv:
    return RagEnv.from_config()

rag_router = APIRouter(prefix="/rag")

# Parameters use Annotated so that no non-default argument follows a default one
@rag_router.post("/process", response_model=list[ChunkDTO])
async def process_docs(
    docs: Annotated[list[RawDocDTO], Body()],
    env: Annotated[RagEnv, Depends(get_cached_env)],
) -> list[ChunkDTO]:
    internal_docs = to_internal_docs(docs)
    result = rag_core(internal_docs, env)  # Sync call; offload if CPU-heavy
    return fpresult_to_http(result)

# Include the router only after all of its routes are registered
app.include_router(rag_router)
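To exercise the error mapping without the repo installed, the following stdlib-only sketch uses a frozen dataclass as a hypothetical stand-in for `ErrInfo`. The real type lives in `funcpipe_rag` and may differ. `STATUS_BY_KIND` and `CASES` are illustrative names, and the cases mirror the rows of the reference table.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ErrInfo:
    """Hypothetical stand-in for funcpipe_rag.ErrInfo (immutable)."""
    kind: str            # 'domain' | 'infra' | 'unexpected'
    code: str            # machine-readable
    msg: str
    retryable: bool = False


STATUS_BY_KIND = {"domain": 400, "infra": 503, "unexpected": 500}


def map_err_to_http(err: ErrInfo) -> tuple[int, dict[str, Any]]:
    """Map an error value to (status, envelope); unknown kinds fall back to 500."""
    status = 422 if err.code == "validation_error" else STATUS_BY_KIND.get(err.kind, 500)
    envelope = {"error": {"kind": err.kind, "code": err.code,
                          "message": err.msg, "retryable": err.retryable}}
    return status, envelope


# One case per table row, plus the 422 override and an unknown kind.
CASES = [
    (ErrInfo("domain", "invalid_input", "Invalid doc format"), 400),
    (ErrInfo("domain", "validation_error", "Missing field"), 422),
    (ErrInfo("infra", "db_timeout", "DB timeout", retryable=True), 503),
    (ErrInfo("unexpected", "runtime_error", "Unexpected error"), 500),
    (ErrInfo("mystery", "unknown", "Unmapped kind"), 500),
]
```

Because the mapping is a pure function of the error value, it can be unit-tested without starting a server or a test client.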
4. Reference Implementations
4.1 FastAPI Basics in FP
# Pure handler
def pure_handler(docs: list[RawDoc], env: RagEnv) -> FPResult[list[Chunk], ErrInfo]:
    return rag_core(docs, env)

# Thin adapter (same shape as process_docs; register only one handler per path)
@rag_router.post("/process")
async def rag_process(
    docs: Annotated[list[RawDocDTO], Body()],
    env: Annotated[RagEnv, Depends(get_cached_env)],
) -> list[ChunkDTO]:
    result = pure_handler(to_internal_docs(docs), env)
    return fpresult_to_http(result)
4.2 Dep Injection for Effects
# DB, async_db_connect, and query_with_db_capability are placeholders for your driver
async def get_db_dep() -> AsyncGenerator[DB, None]:
    db = await async_db_connect()
    try:
        yield db
    finally:
        await db.close()  # Cleanup runs even if the handler raises

@rag_router.get("/db-test")
async def db_test(db: Annotated[DB, Depends(get_db_dep)]):
    return await query_with_db_capability(db)  # Pure relative to capability
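The Resource invariant behind yield-style dependencies can be checked with a stdlib-only sketch. `FakeDB`, `db_capability`, `lookup`, and `run` are hypothetical names, and `contextlib.asynccontextmanager` stands in for the open/yield/close lifecycle that a yield dependency follows. The point is that cleanup runs on both the success and the failure path.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeDB:
    """Hypothetical in-memory stand-in for a real connection."""

    def __init__(self) -> None:
        self.closed = False

    async def fetch(self, key: str) -> str:
        if self.closed:
            raise RuntimeError("connection closed")
        return f"value-for-{key}"

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def db_capability(log: list[str]) -> AsyncIterator[FakeDB]:
    """Open the capability, hand it out, and always clean up."""
    db = FakeDB()
    log.append("open")
    try:
        yield db
    finally:
        await db.close()
        log.append("close")


async def lookup(db: FakeDB, key: str) -> str:
    """Logic that is pure relative to the injected capability."""
    if not key:
        raise ValueError("empty key")
    return await db.fetch(key)


async def run(key: str) -> tuple[str, list[str]]:
    """Drive one 'request' and report the outcome plus the lifecycle log."""
    log: list[str] = []
    try:
        async with db_capability(log) as db:
            outcome = await lookup(db, key)
    except ValueError as exc:
        outcome = f"error: {exc}"
    return outcome, log
```

Passing the capability in as an argument is also what makes the test trivial: no patching is needed, only a different object.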
4.3 Async Endpoints
# async_process is a placeholder for the async pipeline from Module 8
async def async_rag_core(docs: list[RawDoc], env: RagEnv) -> FPResult[list[Chunk], ErrInfo]:
    return await async_process(docs, env)

@rag_router.post("/async-process")
async def rag_async_process(
    docs: Annotated[list[RawDocDTO], Body()],
    env: Annotated[RagEnv, Depends(get_cached_env)],
) -> list[ChunkDTO]:
    result = await async_rag_core(to_internal_docs(docs), env)
    return fpresult_to_http(result)
4.4 Streaming Response
from asyncio import CancelledError
from collections.abc import AsyncGenerator, AsyncIterator
from typing import Protocol

from fastapi.responses import StreamingResponse

class Streamer(Protocol):
    # Plain def: calling it returns an async iterator directly (no await needed)
    def stream_query(self, query: str) -> AsyncIterator[str]: ...
    async def close(self) -> None: ...
    async def cancel(self) -> None: ...

# get_streamer is a placeholder for however the streamer is constructed
async def get_streamer_dep() -> AsyncGenerator[Streamer, None]:
    streamer = await get_streamer()
    try:
        yield streamer
    finally:
        # Check on your FastAPI version that this runs after the body is streamed
        await streamer.close()

async def stream_rag_core(query: str, streamer: Streamer) -> AsyncIterator[bytes]:
    try:
        async for chunk in streamer.stream_query(query):
            yield chunk.encode()
    except CancelledError:
        await streamer.cancel()
        raise  # Re-raise so cancellation still propagates

@rag_router.get("/stream-query")
async def rag_stream_query(
    query: str,
    streamer: Annotated[Streamer, Depends(get_streamer_dep)],
) -> StreamingResponse:
    # text/event-stream clients expect "data: ...\n\n" framing; use text/plain for raw chunks
    return StreamingResponse(stream_rag_core(query, streamer), media_type="text/event-stream")
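The cancellation path of the streaming core can be exercised without a server. This stdlib-only sketch uses a hypothetical `FakeStreamer` and models a client disconnect as cancellation of the consuming task, which is an assumption about how the server surfaces disconnects. It shows the generator notifying the streamer and then re-raising.

```python
import asyncio
from typing import AsyncIterator


class FakeStreamer:
    """Hypothetical stand-in for the Streamer capability."""

    def __init__(self, tokens: list[str], delay: float) -> None:
        self.tokens = tokens
        self.delay = delay
        self.cancelled = False

    async def stream_query(self, query: str) -> AsyncIterator[str]:
        for token in self.tokens:
            await asyncio.sleep(self.delay)  # simulated upstream latency
            yield f"{query}:{token}"

    async def cancel(self) -> None:
        self.cancelled = True


async def stream_core(query: str, streamer: FakeStreamer) -> AsyncIterator[bytes]:
    """Encode chunks; on cancellation, notify the streamer and re-raise."""
    try:
        async for chunk in streamer.stream_query(query):
            yield chunk.encode()
    except asyncio.CancelledError:
        await streamer.cancel()
        raise


async def run_to_completion() -> tuple[list[bytes], bool]:
    streamer = FakeStreamer(["a", "b"], delay=0.0)
    chunks = [c async for c in stream_core("q", streamer)]
    return chunks, streamer.cancelled


async def run_with_disconnect() -> bool:
    streamer = FakeStreamer(["a", "b"], delay=10.0)  # slow upstream

    async def consume() -> None:
        async for _ in stream_core("q", streamer):
            pass

    task = asyncio.create_task(consume())
    await asyncio.sleep(0.01)  # let the consumer block on the first token
    task.cancel()              # simulated client disconnect
    try:
        await task
    except asyncio.CancelledError:
        pass
    return streamer.cancelled
```

Re-raising matters: swallowing `CancelledError` would leave the surrounding task believing the stream finished normally.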
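4.5 Background Tasks
# Model and Dep are placeholders for your request model and capability class
class WorkDesc(BaseModel):
    tasks: list[str]

async def execute_work(work: WorkDesc, dep: Dep) -> None:
    ...  # Effectful: runs after the response has been sent

def describe_background_work(input: Model) -> WorkDesc:
    return WorkDesc(tasks=[...])  # Pure: describes the work, performs none of it

@rag_router.post("/background")
async def background_handler(
    input: Annotated[Model, Body()],
    bg_tasks: BackgroundTasks,
    dep: Annotated[Dep, Depends()],  # Depends() with no argument uses the annotated class
) -> dict[str, str]:
    work = describe_background_work(input)
    bg_tasks.add_task(execute_work, work, dep)
    return {"status": "scheduled"}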
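The split between describing work and executing it can be shown in a stdlib-only sketch. The "reindex" task naming and the list used as a sink are hypothetical; in the handler above, the executor would receive a real capability instead.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkDesc:
    """Immutable description of work to run later."""
    tasks: tuple[str, ...]


def describe_background_work(doc_ids: list[str]) -> WorkDesc:
    """Pure: decide what should happen, without doing any of it."""
    return WorkDesc(tasks=tuple(f"reindex:{d}" for d in sorted(set(doc_ids))))


def execute_work(work: WorkDesc, sink: list[str]) -> None:
    """Effectful: perform each task against an injected capability (a list here)."""
    for task in work.tasks:
        sink.append(task)
```

Because the description is plain data, a test can assert on it directly and check the executor separately with a fake sink.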
4.6 Integration in RAG
4.7 Before/After Refactor
# Before: Imperative (parse and process are placeholders)
@rag_router.post("/")
def handler(body: dict):
    docs = parse(body)
    result = process(docs)  # Parsing, I/O, and logic mixed in one place
    return result

# After: FP adapter (validation by the DTO, logic in the pure core, mapping centralized)
@rag_router.post("/")
async def handler(docs: list[RawDocDTO] = Body(...)):
    return fpresult_to_http(pure_process(to_internal_docs(docs)))
5. Property-Based Proofs (repo tests)
This repo does not run FastAPI property tests by default because FastAPI is an optional dependency here. The FP laws are enforced by keeping the web layer thin and heavily unit-testing the pure core. The test below sketches the equivalence check for environments where FastAPI and Hypothesis are installed.
from hypothesis import given
import hypothesis.strategies as st
from fastapi.testclient import TestClient
from funcpipe_rag import rag_core, default_env, RawDoc, Ok, Err
# app, get_cached_env, RawDocDTO, from_internal_chunks, and map_err_to_http
# come from the adapter module shown in Section 3

def raw_doc_strat() -> st.SearchStrategy[RawDoc]:
    # Assumes RawDoc accepts content and metadata keywords, as in to_internal_docs
    return st.builds(RawDoc, content=st.text(), metadata=st.dictionaries(st.text(), st.text()))

def normalize_json(data):
    # Sort by text so that ordering differences do not fail the comparison
    if isinstance(data, list):
        return sorted([{"text": i["text"], "embedding": i["embedding"]} for i in data], key=lambda x: x["text"])
    return data

@given(docs=st.lists(raw_doc_strat()))
def test_api_equiv(docs):
    dto_docs = [RawDocDTO(content=d.content, metadata=d.metadata) for d in docs]
    client = TestClient(app)
    try:
        # Pin the environment so both paths see the same deps
        app.dependency_overrides[get_cached_env] = lambda: default_env
        response = client.post("/rag/process", json=[d.model_dump() for d in dto_docs])
    finally:
        app.dependency_overrides.clear()
    expected_result = rag_core(docs, default_env)
    if isinstance(expected_result, Ok):
        assert response.status_code == 200
        expected_dto = from_internal_chunks(expected_result.value)
        assert normalize_json(response.json()) == normalize_json([d.model_dump() for d in expected_dto])
    elif isinstance(expected_result, Err):
        status, envelope = map_err_to_http(expected_result.error)
        assert response.status_code == status
        assert response.json() == {"detail": envelope}
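6. Runtime Preservation Guarantee
FastAPI's async routes add concurrency at the HTTP boundary. The adapters only convert DTOs and map results, so the pure core runs the same code as in the non-web pipeline and its performance should be preserved; the added per-request cost is validation and serialization.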
7. Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Mixed I/O in core | Untestable | Deps for effects |
| No overrides | Hard mocks | Use dep overrides |
| Sync I/O blocks | Slow | Async routes |
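The first row of the table (mixed I/O in the core, fixed by passing effects in as deps) can be illustrated with a small stdlib-only sketch. The function names are hypothetical, and reading the clock stands in for any effect: the impure version reads it directly, the fixed version receives it as a capability.

```python
import time
from typing import Callable


def is_stale_impure(indexed_at: float, max_age_s: float) -> bool:
    """Anti-pattern: reads the clock itself, so the result varies between calls."""
    return time.time() - indexed_at > max_age_s


def is_stale(indexed_at: float, max_age_s: float, now: Callable[[], float]) -> bool:
    """Fix: the clock is an injected capability; fixed deps give fixed output."""
    return now() - indexed_at > max_age_s
```

In production the adapter would pass `time.time`, while tests pass a fixed function such as `lambda: 200.0`, which is the same move that `dependency_overrides` makes at the route level.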
8. Pre-Core Quiz
- FastAPI for…? → Typed async APIs
- Deps for…? → FP injection
- Adapters for…? → HTTP boundaries
- Law? → Purity of handlers
- Benefit? → Testable services
9. Post-Core Exercise
- Expose RAG as FastAPI.
- Prove equiv with Hypothesis.
Pipeline Usage (Idiomatic)
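# Model, Dep, and pure_core are placeholders; Depends() uses the annotated class
@rag_router.post("/")
async def handler(
    input: Annotated[Model, Body()],
    dep: Annotated[Dep, Depends()],
):
    # Map the FPResult at the boundary instead of returning it raw
    return fpresult_to_http(pure_core(input, dep))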
Next Core: 5. Data/ML Pipelines – Stateless Transforms, Config-Driven FuncPipe Steps