
Python API Reference

The core entry points are documented via mkdocstrings. Key modules:

bijux_rag.rag.app

Application services for the 'real RAG' path.

This module wires

clean -> chunk -> index -> retrieve -> (optional rerank) -> generate.

Both the CLI and the FastAPI boundary call into this layer to avoid drift.

RagApp dataclass

RagApp(
    generator=ExtractiveGenerator(),
    reranker=LexicalOverlapReranker(),
    profile="default",
)

RagBuildConfig dataclass

RagBuildConfig(
    chunk_env,
    backend="bm25",
    embedder="hash16",
    sbert_model="all-MiniLM-L6-v2",
    bm25_buckets=2048,
)

RAG build configuration.

RagIndex dataclass

RagIndex(backend, index, fingerprint, schema_version=1)

In-memory index wrapper for the deterministic CI profile.

ask

ask(
    *,
    index_path,
    query,
    top_k=5,
    filters=None,
    embedder=None,
    rerank=True,
)

Retrieve and answer with citations.

Source code in src/bijux_rag/rag/app.py
def ask(
    *,
    index_path: Path,
    query: str,
    top_k: int = 5,
    filters: Mapping[str, str] | None = None,
    embedder: Embedder | None = None,
    rerank: bool = True,
) -> Answer:
    """Retrieve and answer with citations."""

    cands = retrieve(
        index_path=index_path,
        query=query,
        top_k=max(20, int(top_k)),
        filters=filters,
        embedder=embedder,
    )
    if rerank:
        cands = LexicalOverlapReranker().rerank(query=query, candidates=cands, top_k=int(top_k))
    else:
        cands = cands[: int(top_k)]
    return ExtractiveGenerator().generate(query=query, candidates=cands)
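
A minimal usage sketch (the index path is a placeholder; ask assumes an index previously persisted with build_index_from_csv):

from pathlib import Path

from bijux_rag.rag.app import ask

# Hypothetical path to a previously built index.
answer = ask(
    index_path=Path("out/index.msgpack"),
    query="what is retrieval augmented generation?",
    top_k=3,
    filters={"category": "cs.AI"},
    rerank=True,
)
print(answer.text)
for c in answer.citations:
    print(c.doc_id, c.chunk_id, c.start, c.end)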

build_index_from_csv

build_index_from_csv(*, csv_path, out_path, cfg)

Build and persist an index.

Returns:

| Type | Description |
| --- | --- |
| str | The index fingerprint. |

Source code in src/bijux_rag/rag/app.py
def build_index_from_csv(*, csv_path: Path, out_path: Path, cfg: RagBuildConfig) -> str:
    """Build and persist an index.

    Returns:
        The index fingerprint.
    """

    chunks = ingest_csv_to_chunks(csv_path=csv_path, env=cfg.chunk_env)
    if cfg.backend == "bm25":
        idx = build_bm25_index(chunks=chunks, buckets=cfg.bm25_buckets)
        idx.save(str(out_path))
        return idx.fingerprint

    if cfg.backend == "numpy-cosine":
        emb = _make_embedder(cfg)
        idx = build_numpy_cosine_index(chunks=chunks, embedder=emb)
        idx.save(str(out_path))
        return idx.fingerprint

    raise ValueError(f"unknown index backend: {cfg.backend}")
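
A sketch of building and persisting a BM25 index. The paths are placeholders, and env is an assumed RagEnv chunking configuration constructed elsewhere:

from pathlib import Path

from bijux_rag.rag.app import RagBuildConfig, build_index_from_csv

# `env` is an assumed RagEnv chunking configuration (see ingest_csv_to_chunks).
cfg = RagBuildConfig(chunk_env=env, backend="bm25", bm25_buckets=2048)
fingerprint = build_index_from_csv(
    csv_path=Path("data/docs.csv"),
    out_path=Path("out/index.msgpack"),
    cfg=cfg,
)
print(fingerprint)  # stable fingerprint of the persisted index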

ingest_csv_to_chunks

ingest_csv_to_chunks(*, csv_path, env)

Ingest a CSV and return chunks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| csv_path | Path | CSV path with columns: doc_id,title,abstract,categories. | required |
| env | RagEnv | Chunking configuration. | required |

Returns:

| Type | Description |
| --- | --- |
| list[Chunk] | A list of chunks (without embeddings for lexical backends). |

Source code in src/bijux_rag/rag/app.py
def ingest_csv_to_chunks(*, csv_path: Path, env: RagEnv) -> list[Chunk]:
    """Ingest a CSV and return chunks.

    Args:
        csv_path: CSV path with columns: doc_id,title,abstract,categories.
        env: Chunking configuration.

    Returns:
        A list of chunks (without embeddings for lexical backends).
    """

    storage = FileStorage()
    docs: list[RawDoc] = []
    errors: list[str] = []
    for res in storage.read_docs(str(csv_path)):
        if is_ok(res):
            docs.append(res.value)
        elif is_err(res):
            errors.append(f"{res.error.code}: {res.error.msg}")
        else:  # pragma: no cover
            errors.append("unknown error")

    if errors:
        # Fail fast: ingestion is a boundary operation.
        raise ValueError("CSV parse failures: " + "; ".join(errors[:3]))

    cleaned = list(_iter_clean_docs(docs))
    raw_chunks = list(_iter_chunks(cleaned, env))
    return [
        Chunk(
            doc_id=c.doc_id,
            text=c.text,
            start=c.start,
            end=c.end,
            metadata=c.metadata,
            embedding=(),
        )
        for c in raw_chunks
    ]

ingest_docs_to_chunks

ingest_docs_to_chunks(*, docs, env)

Ingest in-memory docs and return chunks.

Source code in src/bijux_rag/rag/app.py
def ingest_docs_to_chunks(*, docs: Iterable[RawDoc], env: RagEnv) -> list[Chunk]:
    """Ingest in-memory docs and return chunks."""

    cleaned = list(_iter_clean_docs(docs))
    raw_chunks = list(_iter_chunks(cleaned, env))
    return [
        Chunk(
            doc_id=c.doc_id,
            text=c.text,
            start=c.start,
            end=c.end,
            metadata=c.metadata,
            embedding=(),
        )
        for c in raw_chunks
    ]
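
A sketch of the in-memory path. The RawDoc import and the RagEnv value env are assumptions; RawDoc's field names follow their use in the FastAPI adapter below:

from bijux_rag.rag.app import ingest_docs_to_chunks

# RawDoc(doc_id, title, abstract, categories) mirrors the FastAPI adapter
# below; its import path is an assumption, as is the RagEnv value `env`.
docs = [RawDoc(doc_id="d1", title="T", abstract="Some abstract text.", categories="cs.AI")]
chunks = ingest_docs_to_chunks(docs=docs, env=env)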

parse_filters

parse_filters(filters)

Parse CLI/API filters.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| filters | list[str] \| None | list like ["category=cs.AI", "doc_id=foo"]. | required |

Source code in src/bijux_rag/rag/app.py
def parse_filters(filters: list[str] | None) -> dict[str, str]:
    """Parse CLI/API filters.

    Args:
        filters: list like ["category=cs.AI", "doc_id=foo"].
    """

    out: dict[str, str] = {}
    for f in filters or []:
        if "=" not in f:
            raise ValueError(f"invalid filter: {f}")
        k, v = f.split("=", 1)
        k = k.strip()
        v = v.strip()
        if not k or not v:
            raise ValueError(f"invalid filter: {f}")
        out[k] = v
    return out
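
Behaviour follows directly from the source above:

from bijux_rag.rag.app import parse_filters

parse_filters(["category=cs.AI", "doc_id=foo"])  # {"category": "cs.AI", "doc_id": "foo"}
parse_filters(None)                              # {}
parse_filters(["nokey"])                         # raises ValueError("invalid filter: nokey")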

retrieve

retrieve(
    *,
    index_path,
    query,
    top_k=5,
    filters=None,
    embedder=None,
)

Retrieve candidates from a persisted index.

Source code in src/bijux_rag/rag/app.py
def retrieve(
    *,
    index_path: Path,
    query: str,
    top_k: int = 5,
    filters: Mapping[str, str] | None = None,
    embedder: Embedder | None = None,
) -> list[Candidate]:
    """Retrieve candidates from a persisted index."""

    idx = load_index(str(index_path))

    if isinstance(idx, NumpyCosineIndex) and embedder is None:
        # Default embedder based on index spec.
        if idx.spec.model.startswith("sbert:"):
            embedder = SentenceTransformersEmbedder(model_name=idx.spec.model.split(":", 1)[1])
        else:
            embedder = HashEmbedder()

    return idx.retrieve(query=query, top_k=int(top_k), filters=filters, embedder=embedder)
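
A usage sketch against a previously persisted index (the path is a placeholder; for a numpy-cosine index built with an sbert model, the matching embedder is selected automatically as shown above):

from pathlib import Path

from bijux_rag.rag.app import retrieve

cands = retrieve(index_path=Path("out/index.msgpack"), query="vector search", top_k=5)
for c in cands:
    print(round(c.score, 3), c.chunk.doc_id, c.chunk.text[:60])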

bijux_rag.rag.indexes

Reference indexes.

Two backends are provided out of the box:

- NumpyCosineIndex: small/medium corpora, deterministic, dependency-free.
- BM25Index: CI-friendly lexical retrieval without model downloads.

Persistence format: msgpack (schema-versioned).

BM25Index dataclass

BM25Index(
    chunks,
    buckets,
    df,
    tfs,
    doc_len,
    avg_dl,
    k1=1.2,
    b=0.75,
)

Hashed-token BM25 index.

This is a practical, CI-friendly retrieval baseline:

- deterministic
- no large model downloads
- supports metadata filters

NumpyCosineIndex dataclass

NumpyCosineIndex(chunks, vectors, spec)

Dense vector index using cosine similarity.

build_bm25_index

build_bm25_index(*, chunks, buckets=2048, k1=1.2, b=0.75)

Build a hashed-token BM25 index.

Source code in src/bijux_rag/rag/indexes.py
def build_bm25_index(
    *, chunks: Sequence[Chunk], buckets: int = 2048, k1: float = 1.2, b: float = 0.75
) -> BM25Index:
    """Build a hashed-token BM25 index."""

    if not chunks:
        raise ValueError("cannot build index from empty chunk list")
    n = len(chunks)
    df = np.zeros((buckets,), dtype=np.int32)
    tfs: list[tuple[tuple[int, int], ...]] = []
    doc_len = np.zeros((n,), dtype=np.int32)

    ordered_chunks = sorted(chunks, key=lambda c: c.chunk_id)

    # Compute per-chunk term counts and bucket doc-frequencies.
    for i, c in enumerate(ordered_chunks):
        toks = _tokenize(c.text)
        doc_len[i] = np.int32(len(toks))
        counts: dict[int, int] = {}
        seen: set[int] = set()
        for t in toks:
            bucket = _stable_token_bucket(t, buckets=buckets)
            counts[bucket] = counts.get(bucket, 0) + 1
            seen.add(bucket)
        for bkt in seen:
            df[bkt] += 1
        tfs.append(tuple(sorted(counts.items())))

    avg_dl = float(doc_len.mean()) if n else 0.0
    return BM25Index(
        chunks=tuple(ordered_chunks),
        buckets=buckets,
        df=df,
        tfs=tuple(tfs),
        doc_len=doc_len,
        avg_dl=avg_dl,
        k1=float(k1),
        b=float(b),
    )
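
A sketch chaining ingestion into the lexical index (chunks as produced by ingest_csv_to_chunks or ingest_docs_to_chunks above; the output path is a placeholder):

from bijux_rag.rag.indexes import build_bm25_index

idx = build_bm25_index(chunks=chunks, buckets=2048, k1=1.2, b=0.75)
idx.save("out/index.msgpack")
print(idx.fingerprint)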

build_numpy_cosine_index

build_numpy_cosine_index(*, chunks, embedder)

Build a dense index from chunk texts.

Source code in src/bijux_rag/rag/indexes.py
def build_numpy_cosine_index(*, chunks: Sequence[Chunk], embedder: Embedder) -> NumpyCosineIndex:
    """Build a dense index from chunk texts."""

    if not chunks:
        raise ValueError("cannot build index from empty chunk list")
    ordered_chunks = sorted(chunks, key=lambda c: c.chunk_id)
    spec = embedder.spec
    texts = [c.text for c in ordered_chunks]
    vecs = embedder.embed_texts(texts)
    if vecs.ndim != 2:
        raise ValueError("embedder must return a 2D array")
    if vecs.shape[0] != len(ordered_chunks):
        raise ValueError("embedder output size mismatch")
    # Spec dim is enforced at the boundary (this is the point of EmbeddingSpec).
    if vecs.shape[1] != spec.dim:
        # Allow embedders to report placeholder dims; in that case, take the real dim.
        spec = EmbeddingSpec(
            model=spec.model, dim=int(vecs.shape[1]), metric=spec.metric, normalized=spec.normalized
        )
    arr = np.asarray(vecs, dtype=np.float32)
    if spec.normalized:
        arr = _l2_normalize(arr)
    out_chunks = tuple(
        Chunk(
            doc_id=c.doc_id,
            text=c.text,
            start=c.start,
            end=c.end,
            metadata=c.metadata,
            embedding=tuple(float(x) for x in arr[i].tolist()),
            embedding_spec=spec,
        )
        for i, c in enumerate(ordered_chunks)
    )
    return NumpyCosineIndex(chunks=out_chunks, vectors=arr, spec=spec)

load_index

load_index(path)

Load an index from disk.

Source code in src/bijux_rag/rag/indexes.py
def load_index(path: str) -> NumpyCosineIndex | BM25Index:
    """Load an index from disk."""

    with open(path, "rb") as f:
        payload = msgpack.unpackb(f.read(), raw=False)
    backend = payload.get("backend")
    if backend == "bm25":
        return BM25Index.load(path)
    if backend == "numpy-cosine":
        return NumpyCosineIndex.load(path)
    raise ValueError(f"unknown index backend: {backend}")
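
The returned type depends on the persisted backend, so callers can branch on it:

from bijux_rag.rag.indexes import BM25Index, NumpyCosineIndex, load_index

idx = load_index("out/index.msgpack")  # placeholder path
if isinstance(idx, BM25Index):
    pass  # lexical retrieval; no embedder needed
elif isinstance(idx, NumpyCosineIndex):
    pass  # dense retrieval; a query-time embedder may be required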

bijux_rag.rag.ports

RAG primitives: ports for embedders, indexes, retrieval, and generation.

This module is deliberately dependency-light. Concrete backends live in sibling modules.

The goal is to make bijux-rag actually RAG: ingest -> index -> retrieve (+ optional rerank) -> answer with citations.

Answer dataclass

Answer(text, citations=(), candidates=())

A grounded answer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| text | str | Answer text. | required |
| citations | tuple[Citation, ...] | Evidence citations. | () |

Candidate dataclass

Candidate(chunk, score, metadata=dict())

A retrieved chunk plus score and non-sensitive metadata.

Citation dataclass

Citation(doc_id, chunk_id, start, end, text=None)

A citation referencing an evidence chunk.

Embedder

Bases: Protocol

Embedder port.

Implementations must be deterministic given the same inputs and configuration.
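
A minimal conforming sketch. The spec attribute and the embed_texts contract follow their use in build_numpy_cosine_index above; the EmbeddingSpec import path and field values here are assumptions:

import numpy as np

from bijux_rag.rag.ports import EmbeddingSpec  # import path assumed

class LengthEmbedder:
    """Toy deterministic embedder: each text maps to its character count."""

    @property
    def spec(self) -> EmbeddingSpec:
        # dim must match the second axis of the embed_texts output.
        return EmbeddingSpec(model="toy:length", dim=1, metric="cosine", normalized=False)

    def embed_texts(self, texts: list[str]) -> np.ndarray:
        # One row per input text; 2D, as build_numpy_cosine_index requires.
        return np.array([[float(len(t))] for t in texts], dtype=np.float32)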

Generator

Bases: Protocol

Generator port.

Index

Bases: Protocol

Index port.

Indexes are responsible for persistence (save/load) and schema versioning.

Indexer

Bases: Protocol

Indexer port.

Reranker

Bases: Protocol

Reranker port.

bijux_rag.boundaries.web.fastapi_app

FastAPI adapter exposing chunking and RAG endpoints.

pyright: reportUnusedFunction=false

create_app

create_app()

Construct a FastAPI app with chunking and RAG endpoints.

Source code in src/bijux_rag/boundaries/web/fastapi_app.py
def create_app() -> FastAPI:
    """Construct a FastAPI app with chunking and RAG endpoints."""

    app = FastAPI(title="bijux-rag", openapi_version="3.1.0")
    router = APIRouter(prefix="/v1")

    _APP = RagApp()
    _INDEX_STORE: Dict[str, RagIndex] = {}

    @router.get("/healthz")
    async def healthz() -> dict[str, bool]:
        return {"ok": True}

    @router.post("/chunks", response_model=PChunkResponse)
    async def chunks(req: PChunkRequest) -> PChunkResponse:
        # Boundary validation ensures we do not 500 on invalid inputs.
        try:
            docs = [(d.doc_id, d.text, d.title, d.category) for d in req.docs]
            cfg = ChunkAndEmbedConfig(
                chunk_size=req.chunk_size,
                overlap=req.overlap,
                include_embeddings=req.include_embeddings,
            )
            res = chunk_and_embed_docs(docs, cfg)
        except ValueError as e:
            # Defensive: should be unreachable if request validation is correct.
            raise HTTPException(status_code=422, detail=str(e)) from e

        if isinstance(res, Err):
            raise HTTPException(status_code=400, detail=res.error)

        return PChunkResponse(
            chunks=[
                ChunkOut(
                    doc_id=c.doc_id,
                    text=c.text,
                    start=c.start,
                    end=c.end,
                    metadata=dict(c.metadata),
                    embedding=c.embedding if c.embedding else None,
                    chunk_id=c.chunk_id,
                )
                for c in res.value
            ]
        )

    @router.post("/index/build", response_model=IndexBuildResponse)
    async def index_build(req: IndexBuildRequest) -> IndexBuildResponse:
        docs = [
            RawDoc(
                doc_id=d.doc_id,
                title=d.title or "",
                abstract=d.text,
                categories=d.category or "",
            )
            for d in req.docs
        ]
        res = _APP.build_index(
            docs=docs,
            backend=_backend_from_str(req.backend),
            chunk_size=req.chunk_size,
            overlap=req.overlap,
        )
        if isinstance(res, Err):
            raise HTTPException(status_code=400, detail=res.error)

        idx = res.value
        index_id = f"idx_{idx.fingerprint}"
        _INDEX_STORE[index_id] = idx

        return IndexBuildResponse(
            index_id=index_id,
            fingerprint=idx.fingerprint,
            schema_version=idx.schema_version,
        )

    @router.post("/retrieve", response_model=RetrieveResponse)
    async def retrieve(req: RetrieveRequest) -> RetrieveResponse:
        idx = _INDEX_STORE.get(req.index_id)
        if idx is None:
            raise HTTPException(status_code=404, detail="Unknown index_id")

        res = _APP.retrieve(index=idx, query=req.query, top_k=req.top_k, filters=req.filters)
        if isinstance(res, Err):
            raise HTTPException(status_code=400, detail=res.error)

        candidates: list[Candidate] = res.value
        return RetrieveResponse(
            candidates=[
                PCandidate(
                    score=c.score,
                    chunk={
                        "doc_id": c.chunk.doc_id,
                        "chunk_id": c.chunk.chunk_id,
                        "text": c.chunk.text,
                        "start": c.chunk.start,
                        "end": c.chunk.end,
                        "metadata": dict(c.chunk.metadata),
                    },
                    metadata=dict(c.metadata),
                )
                for c in candidates
            ]
        )

    @router.post("/ask", response_model=AskResponse)
    async def ask(req: AskRequest) -> AskResponse:
        idx = _INDEX_STORE.get(req.index_id)
        if idx is None:
            raise HTTPException(status_code=404, detail="Unknown index_id")

        res = _APP.ask(
            index=idx,
            query=req.query,
            top_k=req.top_k,
            filters=req.filters,
            rerank=req.rerank,
        )
        if isinstance(res, Err):
            raise HTTPException(status_code=400, detail=res.error)

        ans = cast(Answer, res.value)
        return AskResponse(
            answer=ans.text,
            citations=[
                PCitation(
                    doc_id=c.doc_id,
                    chunk_id=c.chunk_id,
                    start=c.start,
                    end=c.end,
                    text=c.text,
                )
                for c in ans.citations
            ],
            candidates=[
                PCandidate(
                    score=c.score,
                    chunk={
                        "doc_id": c.chunk.doc_id,
                        "chunk_id": c.chunk.chunk_id,
                        "text": c.chunk.text,
                        "start": c.chunk.start,
                        "end": c.chunk.end,
                        "metadata": dict(c.chunk.metadata),
                    },
                    metadata=dict(c.metadata),
                )
                for c in ans.candidates
            ],
        )

    app.include_router(router)

    def _custom_openapi() -> dict[str, Any]:
        if app.openapi_schema:
            return app.openapi_schema
        app.openapi_schema = get_openapi(
            title=app.title,
            version="0.1.0",
            routes=app.routes,
            openapi_version="3.1.0",
            description=app.description,
        )
        return app.openapi_schema

    app.openapi = _custom_openapi  # type: ignore[method-assign]
    return app
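
A quick smoke test with FastAPI's TestClient; the request field shapes mirror the handlers above, and the document contents are illustrative:

from fastapi.testclient import TestClient

from bijux_rag.boundaries.web.fastapi_app import create_app

client = TestClient(create_app())
assert client.get("/v1/healthz").json() == {"ok": True}

built = client.post(
    "/v1/index/build",
    json={
        "docs": [{"doc_id": "d1", "text": "hello world", "title": "T", "category": "cs.AI"}],
        "backend": "bm25",
        "chunk_size": 128,
        "overlap": 16,
    },
).json()

answer = client.post(
    "/v1/ask",
    json={"index_id": built["index_id"], "query": "hello", "top_k": 3, "rerank": True},
).json()
print(answer["answer"], answer["citations"])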