Skip to content

Core 6: Command-Line and Config-Driven FuncPipe – Integrating with Click/Typer

Module 09

Core question:
How do you integrate command-line interfaces with FuncPipe using Click or Typer to create config-driven entry points that delegate to pure pipelines, support overrides, and maintain testability while separating CLI concerns from core logic?

In this core, we integrate CLI tools with FuncPipe in the FuncPipe RAG Builder (now at funcpipe-rag-09). Click and Typer enable type-safe, composable CLIs with subcommands, options, and arguments; Typer leverages Python type hints for automatic validation and help docs. Emphasize thin CLI adapters (commands that parse to RunSpec, delegate to pure-ish run_from_spec, handle output/exit centrally), config-driven execution (e.g., JSON/YAML loaded as Pydantic, with overrides: CLI > env > file > defaults via deep merge and re-validation), and testable commands (invoke as funcs with mocks, pure runner for core). Refactor RAG entry points (e.g., rag-process command) into CLI-driven scripts, verifying equivalence and laws like determinism (same spec yields same outputs under pinned env). Dependencies: pip install typer pyyaml; optional click (Typer for type-driven simplicity; Click for advanced); tradeoffs: Click flexibility (callbacks, groups) vs Typer simplicity (type-driven); sharp edges: config overriding (deep merge to avoid partial overrides), error handling in CLI (map FPResult to exit codes/table), config discovery (--config, env var, XDG paths), dry-run/print-config for usability.

Motivation Bug: Hard-coded scripts mix I/O with logic, leading to untestable entry points; CLI integration with config-driving separates concerns for reusable, testable FuncPipe.

Delta from Core 5 (Module 09): Data/ML pipelines are internal; this adds CLI entry points with config-loading for user-driven execution.

CLI Protocol (Contract, Entry/Exit Criteria): - Thin Adapters: Commands parse/build RunSpec, call run_from_spec (pure except boundaries), handle render/exit centrally; no policy in CLI. - Config-Driven: Load from file/env with CLI overrides (precedence: CLI > env > file > defaults); deep merge + re-validate; discovery via --config/env/XDG. - Composability: Subcommands for modular pipelines; groups for related funcs. - Purity: Core funcs pure; effects in CLI (e.g., print, file I/O); runtime coercion for primitives. - Semantics: Laws like determinism (fixed spec deterministic under pinned env/artifacts, no timestamps/random); equivalence (CLI call == direct func up to I/O/formatting); verified via properties/invoke tests. - Integration: Wrap RAG pipelines in commands; use Pydantic for config/args; test with Typer/Click runners; entrypoints in pyproject.toml. - Mypy Config: --strict; typer/click typing.

Audience: Developers exposing FuncPipe as tools/scripts, needing config-driven CLIs in FP style.

Outcome: 1. Build CLI with Typer/Click delegating to pure FuncPipe. 2. Make RAG config-driven via CLI with overrides/merge. 3. Prove equivalence/laws with tests.


1. Laws & Invariants

Law Description Enforcement
Determinism Law Same spec yields same outputs (conditional on pure core, pinned env/artifacts, no timestamps/random). Invoke tests with preconditions
Equivalence Law CLI response == direct core call (up to I/O/formatting). Hypothesis/invoke
Idempotence Inv Repeat calls same if idempotent logic. Property tests
Config Inv Overrides applied consistently per precedence; re-validated post-merge. Config tests

These laws ensure CLI doesn't break FuncPipe properties.


2. Decision Table

Scenario Type Safety Subcommands Needed Recommended
Simple script Low No Click basics
Type-driven High No Typer
Complex groups Medium Yes Click groups
Config overrides Any Any Both with options

Typer for FP-friendly types; Click for advanced.


3. Public API (CLI Commands & Config Loaders)

Commands as thin adapters. Guard imports.

Repo alignment note (end-of-Module-09): - This repo ships a stdlib argparse CLI at src/funcpipe_rag/boundaries/shells/cli.py. - Override parsing/merge lives in src/funcpipe_rag/pipelines/cli.py. - A minimal optional Typer shell exists at src/funcpipe_rag/boundaries/shells/typer_cli.py (import-guarded).

Exit Code Mapping (from FPResult): | Result | Code | Example | |--------|------|---------| | Ok | 0 | Success | | Err(domain) | 2 | Invalid input | | Err(infra) | 3 | File not found | | Err(unexpected) | 1 | Runtime error |

from typing import Optional, Dict, Any, TypeVar, List
from pathlib import Path
from functools import reduce
import os
import json
import yaml
import typer
from pydantic import BaseModel
from funcpipe_rag import rag_pipeline  # Pure core; placeholder
from funcpipe_rag import PipelineConfig, FPResult, Ok, ErrInfo, StepConfig  # From prior

Out = TypeVar('Out')

app = typer.Typer()


class RunSpec(BaseModel):
    input_path: Path
    config: PipelineConfig
    artifacts: Dict[str, Any]  # Model handles, etc.
    seed: Optional[int] = None  # For determinism
    output_format: str = 'json'  # json/text/etc.


def deep_merge(base: Dict[str, Any], over: Dict[str, Any]) -> Dict[str, Any]:
    merged = base.copy()  # Non-mutating
    for k, v in over.items():
        if isinstance(v, dict) and isinstance(merged.get(k), dict):
            merged[k] = deep_merge(merged[k], v)
        else:
            merged[k] = v
    return merged


def load_and_override(cli_config_overrides: Dict[str, Any], env_config_overrides: Dict[str, Any],
                      file_path: Optional[Path] = None) -> PipelineConfig:
    # Precedence: CLI > env > file > defaults for config-level
    if file_path is None:
        file_path = find_config()
    if file_path and file_path.exists():
        with open(file_path, 'r') as f:
            if file_path.suffix in ('.yaml', '.yml'):
                base_dict = yaml.safe_load(f)
            else:
                base_dict = json.load(f)
        base_dict = PipelineConfig.model_validate(base_dict).model_dump()
    else:
        base_dict = PipelineConfig().model_dump()  # Defaults
    merged = reduce(deep_merge, [base_dict, env_config_overrides, cli_config_overrides])
    return PipelineConfig.model_validate(merged)  # Re-validate post-merge


def find_config() -> Optional[Path]:
    if env_path := os.getenv('FUNCPipe_CONFIG'):
        return Path(env_path)
    xdg = Path(os.getenv('XDG_CONFIG_HOME', '~/.config')).expanduser() / 'funcpipe'
    for ext in ['.json', '.yaml', '.yml']:
        candidate = xdg / ('config' + ext)
        if candidate.exists():
            return candidate
    return None


def parse_override(path_value: str) -> Dict[str, Any]:
    path, value = path_value.split('=', 1)
    keys = path.split('.')
    d = {keys[-1]: coerce(value)}
    for k in reversed(keys[:-1]):
        d = {k: d}
    return d


def coerce(v: str) -> Any:
    try:
        return json.loads(v)
    except:
        return v


def apply_overrides(cfg: PipelineConfig, overrides: Dict[str, Any]) -> PipelineConfig:
    cfg2 = cfg.model_copy(deep=True)
    new_steps: List[StepConfig] = []
    seen: set[str] = set()
    for step in cfg2.steps:
        if step.name in overrides:
            seen.add(step.name)
            over = overrides[step.name]
            new_params = deep_merge(step.params, over.get('params', {}))
            step = step.model_copy(update={"params": new_params})
        new_steps.append(step)
    unknown = set(overrides.keys()) - seen
    if unknown:
        raise ValueError(f"Unknown step override(s): {sorted(unknown)}")
    return cfg2.model_copy(update={"steps": new_steps})


def run_from_spec(spec: RunSpec) -> FPResult[Out, ErrInfo]:
    if spec.seed is not None:
        set_seed(spec.seed)  # Placeholder; pin for determinism
    docs = load_docs(spec.input_path)  # Boundary I/O; placeholder
    return rag_pipeline(spec.config, docs, spec.artifacts)  # Delegate pure; adjust sig


def handle_result(result: FPResult[Out, ErrInfo], output_format: str) -> int:
    if isinstance(result, Ok):
        out = format_result(result.value, output_format)  # Placeholder; json.dumps for json
        typer.echo(out)
        return 0
    else:
        if output_format == 'json':
            err_out = json.dumps({
                "error": {"kind": result.error.kind, "code": result.error.code, "message": result.error.msg}
            }, ensure_ascii=False)
        else:
            err_out = f"Error [{result.error.kind}]: {result.error.msg} (code: {result.error.code})"
        typer.echo(err_out, err=True)
        code_map = {'domain': 2, 'infra': 3, 'unexpected': 1}
        return code_map.get(result.error.kind, 1)


def build_spec_from_cli(
        input_path: Path,
        config_path: Optional[Path],
        override: Optional[list[str]],
        seed: Optional[int],
        output_format: str,
) -> RunSpec:
    cli_config_overrides = {}  # Config-level
    cli_step_overrides = {}  # Step-level
    if override:
        for ov in override:
            parsed = parse_override(ov)
            if 'steps' in parsed:
                cli_config_overrides = deep_merge(cli_config_overrides, parsed)
            else:
                cli_step_overrides = deep_merge(cli_step_overrides, parsed)
    env_config_overrides = parse_override(os.getenv('FUNCPipe_OVERRIDE', '')) if os.getenv(
        'FUNCPipe_OVERRIDE') else {}  # Config-level example
    config = load_and_override(cli_config_overrides, env_config_overrides, config_path)
    config = apply_overrides(config, cli_step_overrides)
    config = PipelineConfig.model_validate(config.model_dump())  # Re-validate after apply
    return RunSpec(input_path=input_path, config=config, artifacts=load_artifacts(), seed=seed,
                   output_format=output_format)  # load_artifacts placeholder


@app.command()
def rag_process(
        input_path: Path,
        config_path: Optional[Path] = typer.Option(None, '--config'),
        override: Optional[list[str]] = typer.Option(None, '--set', help="Override step.params.key=value"),
        seed: Optional[int] = typer.Option(None, '--seed'),
        output_format: str = typer.Option('json', '--format'),
        dry_run: bool = typer.Option(False, '--dry-run'),
        print_config: bool = typer.Option(False, '--print-config'),
):
    if print_config:
        config = load_and_override({}, {}, config_path)
        typer.echo(config.model_dump_json(indent=2))
        raise typer.Exit(0)
    spec = build_spec_from_cli(input_path, config_path, override, seed, output_format)
    if dry_run:
        typer.echo("Dry run: would process with spec above")
        raise typer.Exit(0)
    result = run_from_spec(spec)
    exit_code = handle_result(result, spec.output_format)
    raise typer.Exit(exit_code)

4. Reference Implementations

4.1 Click Basics in FP

import click

@click.group()
def cli():
    pass

@cli.command()
@click.argument('input_path', type=click.Path(exists=True, path_type=Path))
@click.option('--config-path', type=click.Path(exists=True, path_type=Path))
@click.option('--set', 'overrides', multiple=True, help="Override step.params.key=value")
@click.option('--seed', type=int)
@click.option('--format', 'output_format', default='json')
@click.option('--dry-run', is_flag=True)
@click.option('--print-config', is_flag=True)
def rag_process(input_path, config_path, overrides, seed, output_format, dry_run, print_config):
    if print_config:
        config = load_and_override({}, {}, config_path)
        click.echo(config.model_dump_json(indent=2))
        raise click.exceptions.Exit(0)
    spec = build_spec_from_cli(input_path, config_path, overrides, seed, output_format)
    if dry_run:
        click.echo("Dry run: would process with spec above")
        raise click.exceptions.Exit(0)
    result = run_from_spec(spec)
    exit_code = handle_result(result, spec.output_format)  # Use click.echo in handle
    raise click.exceptions.Exit(exit_code)

4.2 Typer for Type-Driven

# See Public API; similar

4.3 Config Loading/Overrides

# See load_and_override; expand parse_override for nested
# For nested list, extend to support [index]; placeholder for prod

4.4 Integration in RAG

# pyproject.toml for entrypoints (this repo uses a stdlib argparse shell)
[project.scripts]
funcpipe-rag = "funcpipe_rag.boundaries.shells.cli:main"

4.5 Before/After Refactor

# Before: Script
config = load_config('default.json')
result = rag_pipeline(config, docs)
# After: CLI
$ rag rag-process input.txt --set chunk.params.size=256

5. Property-Based Proofs (repo tests)

Runnable tests live in tests/unit/pipelines/test_cli_overrides.py.

from typer.testing import CliRunner
from hypothesis import given
import hypothesis.strategies as st
import os
import pytest
from funcpipe_rag import StepConfig  # Import for tests
from unittest.mock import patch  # Import for tests


# Property tests on pure layers
@given(st.dictionaries(st.text(), st.text()))
def test_deep_merge_precedence(base):
    env = {'a': 'env'}
    cli = {'a': 'cli', 'b': 'cli'}
    merged = deep_merge(deep_merge(base, env), cli)
    assert merged['a'] == 'cli'
    assert 'b' in merged


@given(st.integers(min_value=100, max_value=1000))
def test_runner_equiv(chunk_size):
    config = PipelineConfig(steps=[StepConfig(name='chunk', params={'size': chunk_size})])
    spec = RunSpec(input_path=Path('test.txt'), config=config, artifacts={}, seed=42, output_format='json')
    # In this repo, the CLI shell lives in `funcpipe_rag.boundaries.shells.cli`.
    # Patch boundary I/O (storage) rather than core pipeline building.
    with patch('funcpipe_rag.infra.adapters.file_storage.FileStorage.read_docs') as mock_load:
        mock_load.return_value = []  # Mock boundary
        result = run_from_spec(spec)
    assert result.is_ok()  # Or check value


# CLI example-based
def test_cli_dry_run():
    runner = CliRunner()
    with runner.isolated_filesystem():
        Path('test.txt').write_text('data')
        os.environ['FUNCPipe_CONFIG'] = ''  # Pin
        result = runner.invoke(app, ['rag-process', 'test.txt', '--dry-run'])
        assert result.exit_code == 0
        assert "Dry run" in result.output

6. Runtime Preservation Guarantee

CLI adds no overhead; core perf preserved.


7. Anti-Patterns & Immediate Fixes

Anti-Pattern Symptom Fix
Logic in CLI Untestable Delegate to pure runner
No overrides Rigid Add options/env/merge
No validation Bad args Use typer/click types + Pydantic

8. Pre-Core Quiz

  1. Click for…? → Flexible CLIs
  2. Typer for…? → Type-driven
  3. Config for…? → Reproducibility
  4. Thin adapters? → Testability
  5. Benefit? → User-friendly entry

9. Post-Core Exercise

  1. Add CLI to RAG pipeline.
  2. Test equivalence with invoke.

Pipeline Usage (Idiomatic)

@app.command()
def handler(arg: Type):
    spec = build_spec_from_cli(arg)
    result = run_from_spec(spec)
    raise typer.Exit(handle_result(result, spec.output_format))

Next Core: 7. FP Style with Distributed/Dataflow Systems (Dask/Beam-Like Transform + Sink Thinking)