Core 4: Web and Services – FastAPI Endpoints with Pure Core & Thin Adapters

Module 09

Core question:
How do you build web services using FastAPI in a functional style, with thin adapters delegating to pure core logic, dependency injection for capabilities, and explicit mappings for errors/responses to maintain composability and testability in FuncPipe pipelines?

In this core, we build FP-style web services with FastAPI in the FuncPipe RAG Builder (now at funcpipe-rag-09). FastAPI provides type-safe, async-capable APIs with automatic OpenAPI docs, using Pydantic for validation and dependency injection to supply capabilities to pure cores. We emphasize thin adapters (routers and endpoints that delegate to pure funcs), pure domain logic (no I/O or mutation in the core), and composable routes that integrate with prior boundaries (Module 7) and async flows (Module 8). We refactor RAG into API endpoints (e.g., /rag/process for the pipeline, /rag/query for search) and verify equivalence to the non-web versions, along with laws such as purity (same inputs yield same outputs).

Dependencies: pip install fastapi uvicorn (fastapi.testclient additionally requires httpx). Tradeoffs: FastAPI's speed and validation vs Flask's simplicity. Sharp edges: sync vs async routes (prefer async for I/O), dep overrides for tests, and Pydantic v2 changes (prefer .model_dump() for serialization).

Motivation Bug: Imperative web handlers mixing logic with I/O lead to untestable, non-composable services; FP style separates pure cores from thin adapters for reliable APIs.

Delta from Core 3 (Module 09): Data libs handle batch processing; this exposes pipelines as services with HTTP boundaries.

Web Protocol (Contract, Entry/Exit Criteria):

- Purity: Core funcs pure (depend on injected deps/inputs); effects in adapters/deps (e.g., DB via Depends); purity conditional on deps being referentially transparent/version-pinned.
- Composability: Routes as functions; deps for capabilities; routers for modular APIs.
- Async/Sync: Prefer async for I/O-bound; sync for compute. For CPU-bound cores in async routes, offload to threadpools (e.g., via concurrent.futures) or task queues to avoid blocking the event loop; don't fake parallelism with async def alone.
- Semantics: Laws like purity (handler(fixed deps, input) deterministic); equivalence to non-web (API call == direct func, up to serialization/HTTP mapping); verified via properties.
- Integration: Wrap RAG core in FastAPI endpoints; use Pydantic for ADTs (Module 5); test with overrides; map FPResult to HTTP (Ok→200, Err(domain)→4xx, Err(unexpected)→5xx).
- Mypy Config: --strict; fastapi/pydantic typing.
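
The Async/Sync advice on CPU-bound cores can be sketched with the standard library alone. This is a minimal illustration, not the repo's implementation: `cpu_bound_core` and `handler` are hypothetical stand-ins for a pure, compute-heavy core and a thin async adapter. The adapter hands the blocking call to a `concurrent.futures.ThreadPoolExecutor` via `loop.run_in_executor`, so the event loop stays free for other requests.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def cpu_bound_core(values: list[int]) -> int:
    """Hypothetical pure core: deterministic, no I/O, potentially slow."""
    return sum(v * v for v in values)


async def handler(values: list[int], pool: ThreadPoolExecutor) -> int:
    """Thin async adapter: offload the blocking call instead of running it inline."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, cpu_bound_core, values)


async def main() -> list[int]:
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Both requests are awaited concurrently; neither blocks the event loop.
        return await asyncio.gather(
            handler([1, 2, 3], pool),
            handler([4, 5], pool),
        )


results = asyncio.run(main())
print(results)  # [14, 41]
```

On standard (GIL) CPython builds, a thread pool keeps the loop responsive but does not by itself run pure-Python code in parallel. For real CPU parallelism, a process pool or a task queue is the usual next step.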

Audience: Web developers building services around FP pipelines, needing pure cores with testable adapters.

Outcome:

  1. Build FastAPI services with thin adapters/pure cores.
  2. Expose RAG as API endpoints.
  3. Prove equivalence/laws with Hypothesis.


1. Laws & Invariants

| Law | Description | Enforcement |
|-----|-------------|-------------|
| Purity Law | Core funcs depend only on inputs/deps; no globals/I/O (conditional on deps transparency). | Code reviews/mypy |
| Equivalence Law | API response == direct core call for same input (up to serialization/HTTP mapping). | Hypothesis equivalence |
| Idempotence Inv | Pure cores: repeat calls same output (if idempotent logic). | Property tests |
| Resource Inv | Adapters manage lifecycles (e.g., yield deps for cleanup). | FastAPI tests |

These laws ensure web layers don't break FP properties.
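
The purity and equivalence laws can be checked on a toy core without any web framework. The sketch below makes some assumptions: `Env`, `toy_core`, and `toy_api` are hypothetical names (none of them is part of funcpipe_rag), and a JSON round trip stands in for "up to serialization".

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class Env:
    """Hypothetical immutable capability holder (stand-in for RagEnv)."""
    chunk_size: int


def toy_core(docs: list[str], env: Env) -> list[dict[str, object]]:
    """Hypothetical pure core: split each doc into fixed-size chunks."""
    return [
        {"text": doc[i:i + env.chunk_size], "offset": i}
        for doc in docs
        for i in range(0, len(doc), env.chunk_size)
    ]


def toy_api(body: str, env: Env) -> str:
    """Thin adapter: decode JSON, call the core, encode JSON. No logic of its own."""
    return json.dumps(toy_core(json.loads(body), env))


env = Env(chunk_size=4)
docs = ["functional", "core"]

# Purity law: fixed deps and input give the same output on every call.
assert toy_core(docs, env) == toy_core(docs, env)

# Equivalence law: the adapter's response equals the direct call, up to serialization.
direct = toy_core(docs, env)
via_api = json.loads(toy_api(json.dumps(docs), env))
assert via_api == direct
```

A property-based test generalizes the two assertions from one hand-picked input to generated inputs.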


2. Decision Table

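| Scenario | Async Needed | Streaming Response | Background Tasks | Recommended |
|----------|--------------|--------------------|------------------|-------------|
| Sync compute API | No | No | No | Sync routes |
| Async I/O API | Yes | No | No | Async deps |
| Streaming output | Yes | Yes | No | StreamingResponse |
| Fire-forget | No | No | Yes | BackgroundTasks |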

Use FastAPI for type-safe, async-capable services, and its dependency system to inject capabilities into pure cores.


3. Public API (Adapters for Core; Dep Injection)

Adapters wrap core funcs. Guard imports.

Repo alignment note (end-of-Module-09):

- FastAPI is treated as optional in this repo.
- A minimal guarded adapter exists at src/funcpipe_rag/boundaries/web/fastapi_app.py (create_app() raises ImportError if FastAPI isn't installed).

Error-to-HTTP mapping table (reference only):

| Err Kind | Default HTTP | Example Code | Status Override | Retryable | Example |
|----------|--------------|--------------|-----------------|-----------|---------|
| domain | 400 | invalid_input | 422 for validation | No | Invalid doc format |
| infra | 503 | db_timeout | - | Yes | DB timeout |
| unexpected | 500 | runtime_error | - | No | Unexpected error |

from __future__ import annotations  # For forward refs

from typing import Annotated
from collections.abc import AsyncIterable, AsyncGenerator
from functools import lru_cache
from fastapi import FastAPI, APIRouter, Body, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel
from funcpipe_rag import FPResult, Ok, Err, RawDoc, Chunk, ErrInfo
from funcpipe_rag import rag_core  # Pure core
from funcpipe_rag import RagEnv  # Environment/capability holder


# Assume ErrInfo from adts is immutable with attrs: kind ('domain'|'infra'|'unexpected'), msg, retryable, code (machine-readable)

# Extend for web mapping (without shadowing)
def map_err_to_http(err: ErrInfo) -> tuple[int, dict]:
    STATUS_MAP = {
        'domain': 400,
        'infra': 503,
        'unexpected': 500,
    }
    status = 422 if err.code == 'validation_error' else STATUS_MAP.get(err.kind, 500)
    envelope = {
        "error": {
            "kind": err.kind,
            "code": err.code,
            "message": err.msg,
            "retryable": err.retryable,
        }
    }
    return status, envelope


# Boundary DTOs (Pydantic v2)
class RawDocDTO(BaseModel):
    content: str
    metadata: dict[str, str] = {}


class ChunkDTO(BaseModel):
    text: str
    embedding: list[float]


# Converters
def to_internal_docs(dtos: list[RawDocDTO]) -> list[RawDoc]:
    return [RawDoc(content=dto.content, metadata=dto.metadata) for dto in dtos]


def from_internal_chunks(chunks: list[Chunk]) -> list[ChunkDTO]:
    return [ChunkDTO(text=c.text, embedding=c.embedding) for c in chunks]


# Centralized mapping (Ok/Err are simple dataclasses; use isinstance for robustness)
def fpresult_to_http(result: FPResult[list[Chunk], ErrInfo]) -> list[ChunkDTO]:
    if isinstance(result, Ok):
        return from_internal_chunks(result.value)
    elif isinstance(result, Err):
        status, envelope = map_err_to_http(result.error)
        raise HTTPException(status_code=status, detail=envelope)
    else:
        raise ValueError("Invalid FPResult")


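# Lifespan for startup/shutdown (FastAPI expects an async context manager)
from contextlib import asynccontextmanager


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    # Startup: preload embeddings, etc.
    yield
    # Shutdown: close connections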


app = FastAPI(lifespan=lifespan)


# Cached dep; RagEnv must be immutable or hold only pure config
@lru_cache(maxsize=1)
def get_cached_env() -> RagEnv:
    return RagEnv.from_config()


rag_router = APIRouter(prefix="/rag")


@rag_router.post("/process", response_model=list[ChunkDTO])
async def process_docs(
        # Annotated form: no default values, so parameter order is unconstrained
        docs: Annotated[list[RawDocDTO], Body()],
        env: Annotated[RagEnv, Depends(get_cached_env)],
):
    internal_docs = to_internal_docs(docs)
    result = rag_core(internal_docs, env)
    return fpresult_to_http(result)


app.include_router(rag_router)
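
The mapping logic above can be exercised without FastAPI or funcpipe_rag. The following sketch is self-contained and rests on assumptions: `ErrInfo`, `Ok`, and `Err` are minimal stand-ins that mirror the attributes assumed in the comment above, and `status_for` and `to_response` are hypothetical helper names. It returns a `(status, body)` pair instead of raising, which keeps the mapping pure and leaves the `HTTPException` to the adapter.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar, Union

T = TypeVar("T")


@dataclass(frozen=True)
class ErrInfo:
    """Stand-in mirroring the assumed attributes: kind, code, msg, retryable."""
    kind: str
    code: str
    msg: str
    retryable: bool = False


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err:
    error: ErrInfo


STATUS_BY_KIND = {"domain": 400, "infra": 503, "unexpected": 500}


def status_for(err: ErrInfo) -> int:
    """Validation failures override the domain default; unknown kinds fall back to 500."""
    if err.code == "validation_error":
        return 422
    return STATUS_BY_KIND.get(err.kind, 500)


def to_response(result: Union[Ok[list[str]], Err]) -> tuple[int, object]:
    """Total mapping from a result to (status, body); raising is left to the adapter."""
    if isinstance(result, Ok):
        return 200, result.value
    err = result.error
    return status_for(err), {"error": {"kind": err.kind, "code": err.code}}


print(to_response(Ok(["chunk"])))                                               # (200, ['chunk'])
print(to_response(Err(ErrInfo("infra", "db_timeout", "DB timeout", True)))[0])  # 503
print(to_response(Err(ErrInfo("domain", "validation_error", "bad doc")))[0])    # 422
```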

4. Reference Implementations

4.1 FastAPI Basics in FP

# Pure handler
def pure_handler(docs: list[RawDoc], env: RagEnv) -> FPResult[list[Chunk], ErrInfo]:
    return rag_core(docs, env)

# Thin adapter (same path as the route in section 3; register only one of the two)
@rag_router.post("/process")
async def rag_process(
    docs: Annotated[list[RawDocDTO], Body()],
    env: Annotated[RagEnv, Depends(get_cached_env)],
):
    result = pure_handler(to_internal_docs(docs), env)
    return fpresult_to_http(result)

4.2 Dep Injection for Effects

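# DB, async_db_connect, and query_with_db_capability are placeholders for your driver and capability function
async def get_db_dep() -> AsyncGenerator[DB, None]: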
    db = await async_db_connect()
    try:
        yield db
    finally:
        await db.close()

@rag_router.get("/db-test")
async def db_test(db: Annotated[DB, Depends(get_db_dep)]):
    return await query_with_db_capability(db)  # Pure relative to capability
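
The yield-based dependency above has the same acquire/yield/release shape as a standard-library async context manager, so the lifecycle can be checked in isolation. A minimal sketch, assuming a hypothetical in-memory `FakeDB` and an `events` list that records the order of operations:

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

events: list[str] = []


class FakeDB:
    """Hypothetical in-memory capability used only for this illustration."""

    async def fetch(self, key: str) -> str:
        events.append(f"fetch:{key}")
        return key.upper()

    async def close(self) -> None:
        events.append("close")


@asynccontextmanager
async def db_capability() -> AsyncIterator[FakeDB]:
    """Acquire, yield, and always release: the same shape as a yield dependency."""
    events.append("open")
    db = FakeDB()
    try:
        yield db
    finally:
        await db.close()


async def lookup(db: FakeDB, key: str) -> str:
    """Logic that is pure relative to the injected capability."""
    return await db.fetch(key)


async def main() -> str:
    async with db_capability() as db:
        return await lookup(db, "doc-1")


result = asyncio.run(main())
print(result, events)  # DOC-1 ['open', 'fetch:doc-1', 'close']
```

Because `lookup` only sees the capability it is handed, a test can pass any object with a compatible `fetch` method and never touch a real connection.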

4.3 Async Endpoints

async def async_rag_core(docs: list[RawDoc], env: RagEnv) -> FPResult[list[Chunk], ErrInfo]:
    return await async_process(docs, env)

@rag_router.post("/async-process")
async def rag_async_process(
    docs: Annotated[list[RawDocDTO], Body()],
    env: Annotated[RagEnv, Depends(get_cached_env)],
):
    result = await async_rag_core(to_internal_docs(docs), env)
    return fpresult_to_http(result)

4.4 Streaming Response

from typing import Protocol
from fastapi.responses import StreamingResponse
from asyncio import CancelledError

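class Streamer(Protocol):
    # Plain `def`: calling it returns the async iterable directly
    # (an `async def` here would type the call as a coroutine, which `async for` rejects).
    def stream_query(self, query: str) -> AsyncIterable[str]: ...
    async def close(self) -> None: ...
    async def cancel(self) -> None: ...

# get_streamer() is a placeholder for however the streamer is constructed
async def get_streamer_dep() -> AsyncGenerator[Streamer, None]:
    streamer = await get_streamer()
    try:
        yield streamer
    finally:
        await streamer.close()

async def stream_rag_core(query: str, streamer: Streamer) -> AsyncIterable[bytes]:
    try:
        async for chunk in streamer.stream_query(query):
            yield chunk.encode()
    except CancelledError:
        await streamer.cancel()
        raise  # Re-raise so the cancellation propagates instead of being swallowed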

@rag_router.get("/stream-query")
async def rag_stream_query(
    query: str,
    streamer: Annotated[Streamer, Depends(get_streamer_dep)]
):
    return StreamingResponse(stream_rag_core(query, streamer), media_type="text/event-stream")
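
The endpoint above declares `text/event-stream`, a media type whose clients expect each message framed as one or more `data:` lines followed by a blank line. The sketch below shows one way to add that framing as a pure stream transformation. `to_sse`, `sse_stream`, and `fake_chunks` are hypothetical names introduced for illustration; they are not part of FastAPI or funcpipe_rag.

```python
import asyncio
from collections.abc import AsyncIterator


def to_sse(message: str) -> bytes:
    """Frame one message as a server-sent event: a 'data:' line per text line, then a blank line."""
    lines = message.split("\n")
    return ("".join(f"data: {line}\n" for line in lines) + "\n").encode("utf-8")


async def fake_chunks() -> AsyncIterator[str]:
    """Hypothetical source standing in for streamer.stream_query(query)."""
    for chunk in ["hello", "two\nlines"]:
        yield chunk


async def sse_stream(source: AsyncIterator[str]) -> AsyncIterator[bytes]:
    """Pure transformation of the chunk stream; no I/O of its own."""
    async for chunk in source:
        yield to_sse(chunk)


async def collect() -> list[bytes]:
    return [frame async for frame in sse_stream(fake_chunks())]


frames = asyncio.run(collect())
print(frames)  # [b'data: hello\n\n', b'data: two\ndata: lines\n\n']
```

If the consumer is not an event-stream client, a plain media type such as `text/plain` with unframed chunks may be the simpler choice.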

4.5 Background Tasks

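# Model, Dep, and get_dep are placeholders for the request model, the capability type, and its provider
class WorkDesc(BaseModel):
    tasks: list[str]

# Effectful executor: scheduled by the adapter, runs after the response is sent
async def execute_work(work: WorkDesc, dep: Dep) -> None:
    ...

# Pure: describes the work as data without performing it
def describe_background_work(payload: Model) -> WorkDesc:
    return WorkDesc(tasks=[...])  # derive the task names from payload

@rag_router.post("/background")
async def background_handler(
    payload: Annotated[Model, Body()],
    bg_tasks: BackgroundTasks,
    dep: Annotated[Dep, Depends(get_dep)],
):
    work = describe_background_work(payload)
    bg_tasks.add_task(execute_work, work, dep)
    return {"status": "scheduled"}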
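
The split between describing work and executing it can be tested without a web framework. A minimal sketch under stated assumptions: `WorkItem`, `describe_work`, and this `execute_work` are hypothetical illustrations, and a plain list stands in for whatever side effect the real executor performs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkItem:
    """Hypothetical description of one unit of deferred work."""
    action: str
    target: str


def describe_work(doc_ids: list[str]) -> list[WorkItem]:
    """Pure: decides what should happen, performs nothing."""
    return [WorkItem(action="reindex", target=doc_id) for doc_id in sorted(set(doc_ids))]


def execute_work(items: list[WorkItem], log: list[str]) -> None:
    """Effectful: the only place where side effects occur (here, appending to a log)."""
    for item in items:
        log.append(f"{item.action}:{item.target}")


plan = describe_work(["b", "a", "b"])
# The plan is plain data, so it can be asserted on before anything runs.
assert plan == [WorkItem("reindex", "a"), WorkItem("reindex", "b")]

log: list[str] = []
execute_work(plan, log)  # in a web adapter this call would be scheduled, not run inline
print(log)  # ['reindex:a', 'reindex:b']
```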

4.6 Integration in RAG

# pure_query_handler is assumed to be defined elsewhere with a FastAPI-compatible signature
rag_router.post("/query")(pure_query_handler)

4.7 Before/After Refactor

# Before: Imperative
@rag_router.post("/")
def handler(body: dict):
    docs = parse(body)
    result = process(docs)  # Mixed
    return result

# After: FP adapter
@rag_router.post("/")
async def handler(docs: list[RawDocDTO] = Body(...)):
    return fpresult_to_http(pure_process(to_internal_docs(docs)))

5. Property-Based Proofs (repo tests)

This repo does not run FastAPI property tests by default because FastAPI is an optional dependency here. The FP laws are enforced by keeping the web layer thin and heavily unit-testing the pure core.

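from hypothesis import given
import hypothesis.strategies as st
from fastapi.testclient import TestClient
from funcpipe_rag import rag_core, default_env, Ok, Err
# app, get_cached_env, RawDocDTO, from_internal_chunks, and map_err_to_http come from the adapter module in section 3.
# raw_doc_strat() is assumed to be a Hypothesis strategy that generates RawDoc values.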


def normalize_json(data):
    if isinstance(data, list):
        return sorted([{"text": i["text"], "embedding": i["embedding"]} for i in data], key=lambda x: x["text"])
    return data


@given(docs=st.lists(raw_doc_strat()))
def test_api_equiv(docs):
    dto_docs = [RawDocDTO(content=d.content, metadata=d.metadata) for d in docs]
    client = TestClient(app)
    try:
        app.dependency_overrides[get_cached_env] = lambda: default_env
        response = client.post("/rag/process", json=[d.model_dump() for d in dto_docs])
    finally:
        app.dependency_overrides.clear()

    expected_result = rag_core(docs, default_env)
    if isinstance(expected_result, Ok):
        assert response.status_code == 200
        expected_dto = from_internal_chunks(expected_result.value)
        assert normalize_json(response.json()) == normalize_json([d.model_dump() for d in expected_dto])
    elif isinstance(expected_result, Err):
        status, envelope = map_err_to_http(expected_result.error)
        assert response.status_code == status
        assert response.json() == {"detail": envelope}

6. Runtime Preservation Guarantee

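The adapters add only DTO conversion and FPResult-to-HTTP mapping around the core, so the core's performance characteristics carry over unchanged. FastAPI's async routes supply concurrency for I/O-bound work; CPU-bound cores still need offloading to avoid blocking the event loop.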


7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|--------------|---------|-----|
| Mixed I/O in core | Untestable | Deps for effects |
| No overrides | Hard mocks | Use dep overrides |
| Sync I/O blocks | Slow | Async routes |
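
The first two rows can be illustrated with a tiny before/after. This is a sketch with hypothetical names (`stamp_impure`, `stamp`, `fixed_clock`), using the system clock as the effect. The fix passes the effect in as a capability, and the test double plays the role that a dependency override plays in a web test.

```python
from collections.abc import Callable
from datetime import datetime, timezone


# Anti-pattern: the core reaches for the clock itself, so its output changes between calls.
def stamp_impure(text: str) -> str:
    return f"{datetime.now(timezone.utc).isoformat()} {text}"


# Fix: the effect arrives as an injected capability; the core only combines values.
def stamp(text: str, now: Callable[[], datetime]) -> str:
    return f"{now().isoformat()} {text}"


def fixed_clock() -> datetime:
    """Test double that always returns the same instant."""
    return datetime(2024, 1, 1, tzinfo=timezone.utc)


print(stamp("indexed", fixed_clock))  # 2024-01-01T00:00:00+00:00 indexed
```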

8. Pre-Core Quiz

  1. FastAPI for…? → Typed async APIs
  2. Deps for…? → FP injection
  3. Adapters for…? → HTTP boundaries
  4. Law? → Purity of handlers
  5. Benefit? → Testable services

9. Post-Core Exercise

  1. Expose RAG as FastAPI.
  2. Prove equiv with Hypothesis.

Pipeline Usage (Idiomatic)

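@rag_router.post("/")
async def handler(
    payload: Annotated[Model, Body()],
    dep: Annotated[Dep, Depends(get_dep)],  # Model, Dep, get_dep, pure_core are placeholders
):
    return fpresult_to_http(pure_core(payload, dep))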

Next Core: 5. Data/ML Pipelines – Stateless Transforms, Config-Driven FuncPipe Steps