M07C10: Incremental Migration – Pushing Effects Outward in Existing Codebases

Module 07 – Final Main Track Core

Main track: Cores 1, 3–10 (Ports & Adapters + Capability Protocols → Production).
This is the capstone core – the practical payoff of everything in Module 7.

Progression Note

Module 7 takes the lawful containers and pipelines from Module 6 and puts all effects behind explicit boundaries.

Module | Focus | Key Outcomes
------ | ----- | ------------
6 | Monadic Flows as Composable Pipelines | Lawful and_then, Reader/State/Writer patterns, error-typed flows
7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, resource-safe IO, idempotency
8 | Async / Concurrent Pipelines | Backpressure, timeouts, resumability, fairness (built on 6–7)

Core question
How do you incrementally migrate an existing imperative codebase full of scattered side effects to the pure FP architecture of Module 7 – step by step, without big rewrites, while mechanically proving behavioural equivalence at every stage?

1. Laws & Invariants (machine-checked in CI)

Law / Invariant | Description | Enforcement
--------------- | ----------- | -----------
Behavioural Equivalence | After every migration step, the new implementation produces identical observable outputs for the same inputs. In practice, equivalence is defined on the stable boundary surface – file contents, HTTP responses, messages – typically excluding diagnostic logs. Where logs are part of the contract, they are compared via a normalised view (e.g. structured entries with timestamps stripped or injected deterministically in tests). | Hypothesis properties (golden + random)
Purity Progress | Each step reduces the number of direct effect sites in core modules. | CI metric + review
No Regression | All equivalence tests from previous steps continue to pass after the current step. | Full suite in CI
Incremental Safety | Every intermediate version is runnable and type-checks under mypy --strict. | CI pipeline

These laws guarantee that migration is safe, verifiable, and never a leap of faith.
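
Where logs are part of the contract, the normalised view can be as small as a projection that drops timestamps. A minimal sketch (the LogEntry field names here are an assumption, chosen to match how LogEntry is used later in this core):

import datetime
from dataclasses import dataclass

@dataclass(frozen=True)
class LogEntry:
    level: str
    message: str
    at: datetime.datetime | None = None  # timestamps are not part of the stable surface

def normalised(entries: list[LogEntry]) -> list[tuple[str, str]]:
    # Project onto the stable surface: level + message, timestamps stripped.
    return [(e.level, e.message) for e in entries]

# Equivalence on logs is then asserted on the normalised view:
#   assert normalised(old_entries) == normalised(new_entries)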

2. Decision Table – Which Migration Step for Which Symptom?

Symptom | Example Code Pattern | Recommended Step
------- | -------------------- | -----------------
Direct open() / requests.get() in core | with open(path) as f: ... in domain | Step 4 – Introduce port
datetime.now() / random in core | start = datetime.now() | Step 4 – Clock/RNG capability
Global config / singletons | GLOBAL_CONFIG = load() | Step 4 – Env capability (Reader[Env] injection)
print debugging scattered | print("processing", doc.id) | Step 4 – Logger capability (Writer-style logging)
Manual try/except + early return | try: ... except: return None | Step 3 – Pure extraction + Result
Transaction handling with globals | global_session.begin() | Use the explicit Session/Tx pattern from M07C09

Rule: Fix one symptom at a time. Never do more than one step in a single PR.

3. The 6-Step Migration Protocol (memorise this)

  1. Characterise – Write black-box equivalence tests around the current behaviour (golden inputs → expected outputs + side effects).
    Exit criteria: Tests pass; you have a safety net.

  2. Effect Census – Tag every line that performs I/O, mutation, time, randomness, or logging. Produce a table.
    Exit criteria: Every effect is listed and tagged.

Example census rows for the god script (line numbers refer to the god_rag.py listing below):

Location | Effect kind | Description | Migration step
-------- | ----------- | ----------- | --------------
line 7 | time | datetime.datetime.now() | Step 4
line 8 | logging | print(f"[{start}] Starting...") | Step 4
lines 10–12 | file read | open(...) + csv.DictReader | Step 4
lines 24–27 | file write | open(..., "w") + json.dump | Step 4

  3. Pure Extraction – Move logic into pure functions, parameterising all dependencies. Return Result / Writer where appropriate.
    Exit criteria: Core runs with dummy capabilities; new unit tests possible.

  4. Boundary Introduction – Define minimal capability protocols for each effect group. Replace tagged lines with protocol calls.
    Exit criteria: Core depends only on protocols; adapters implement them.

  5. Description Lifting – Replace direct protocol calls with io_delay(...) descriptions. Core now returns IOPlan values.
    Exit criteria: No perform / direct effects in core; shell executes.

  6. Equivalence Proof – For every step, add a Hypothesis property asserting that the old and new versions produce identical observable results.
    Exit criteria: All equivalence tests pass in CI.

Do these steps in order, one function at a time. The whole codebase migrates gradually without ever breaking.

4. Reference Implementations – Full Stepwise Migration of a Real God Script

Starting Point – Typical Imperative RAG Script (god_rag.py)

# god_rag.py – the kind of script everyone has (RawDoc, gen_chunks, chunk_to_dict are pre-existing domain helpers)
import csv
import json
import datetime

def rag_god(input_path: str, output_path: str):
    start = datetime.datetime.now()
    print(f"[{start}] Starting RAG on {input_path}")

    with open(input_path) as f:
        reader = csv.DictReader(f)
        docs = [RawDoc(**row) for row in reader]

    print(f"Loaded {len(docs)} docs")

    chunks = []
    for doc in docs:
        print(f"Processing doc {doc.id}")
        chunks.extend(gen_chunks(doc))

    duration = datetime.datetime.now() - start
    print(f"Processed {len(chunks)} chunks in {duration}")

    with open(output_path, "w") as f:
        for c in chunks:
            json.dump(chunk_to_dict(c), f)
            f.write("\n")

    print("Done")

Step 1–2: Characterise + Effect Census (add tests + tags)

# tests/test_god_equivalence.py
import os
import tempfile

from hypothesis import given

# write_csv / read_jsonl are small test helpers shared across the migration tests
@given(...)  # or golden files
def test_god_rag_preserves_behavior(input_csv, expected_jsonl):
    with tempfile.TemporaryDirectory() as tmp:
        in_path = os.path.join(tmp, "in.csv")
        out_path = os.path.join(tmp, "out.jsonl")
        write_csv(input_csv, in_path)
        rag_god(in_path, out_path)
        assert read_jsonl(out_path) == expected_jsonl

Tag effects in god_rag.py with comments:

# EFFECT: time
# EFFECT: print
# EFFECT: file read
# EFFECT: file write
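
Applied inline, the first effectful lines of rag_god carry their tags directly, so the census table and the code stay in sync:

def rag_god(input_path: str, output_path: str):
    start = datetime.datetime.now()                    # EFFECT: time
    print(f"[{start}] Starting RAG on {input_path}")   # EFFECT: print

    with open(input_path) as f:                        # EFFECT: file read
        reader = csv.DictReader(f)
        docs = [RawDoc(**row) for row in reader]
    ...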

Step 3: Pure Extraction

def pure_process_docs(docs: list[RawDoc]) -> list[Chunk]:
    chunks = []
    for doc in docs:
        chunks.extend(gen_chunks(doc))
    return chunks

def rag_pure(docs: list[RawDoc]) -> list[Chunk]:
    return pure_process_docs(docs)
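
With the logic extracted, the core is unit-testable without any files, clock, or prints. A sketch of such a test (doc_list_strategy is the same document strategy used in the equivalence tests below, and Chunk values are assumed to be comparable):

from hypothesis import given

@given(docs=doc_list_strategy())
def test_pure_process_docs_is_just_the_loop(docs):
    # Pure core: no capabilities, no I/O, nothing to mock.
    expected = []
    for doc in docs:
        expected.extend(gen_chunks(doc))
    assert pure_process_docs(docs) == expected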

Step 4: Boundary Introduction (ports + capabilities)

import datetime
from typing import Iterator, Protocol

# Result / Ok / Err, ErrInfo and LogEntry come from the earlier Module 7 cores.

class StorageCap(Protocol):
    def read_docs(self, path: str) -> Iterator[Result[RawDoc, ErrInfo]]: ...
    def write_chunks(self, path: str, chunks: Iterator[Chunk]) -> Result[None, ErrInfo]: ...

class ClockCap(Protocol):
    def now(self) -> datetime.datetime: ...

class LoggerCap(Protocol):
    def log(self, entry: LogEntry) -> None: ...

def rag_with_caps(
    storage: StorageCap,
    clock: ClockCap,
    logger: LoggerCap,
    input_path: str,
    output_path: str,
) -> Result[list[Chunk], ErrInfo]:
    start = clock.now()
    logger.log(LogEntry("INFO", f"Starting RAG on {input_path}"))

    doc_results = list(storage.read_docs(input_path))
    errs = [r.error for r in doc_results if isinstance(r, Err)]
    if errs:
        logger.log(LogEntry("ERROR", f"Read failed: {errs[0].message}"))
        return Err(errs[0])  # domain-specific error aggregation strategy

    docs = [r.value for r in doc_results if isinstance(r, Ok)]
    logger.log(LogEntry("INFO", f"Loaded {len(docs)} docs"))

    chunks = rag_pure(docs)

    duration = clock.now() - start
    logger.log(LogEntry("INFO", f"Processed {len(chunks)} chunks in {duration}"))

    write_res = storage.write_chunks(output_path, iter(chunks))
    if isinstance(write_res, Err):
        logger.log(LogEntry("ERROR", f"Write failed: {write_res.error.message}"))
        return write_res

    logger.log(LogEntry("INFO", "Done"))
    return Ok(chunks)
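
The core now depends only on the three protocols; the effects live in adapters at the edge. A sketch of minimal adapters, assuming the Ok/Err/ErrInfo and LogEntry shapes used above and reusing the god script's CSV/JSONL formats (FileStorageCap's root argument is kept only to match the test code below):

import csv
import datetime
import json

class FileStorageCap:
    # Adapter: real filesystem I/O behind the StorageCap protocol.
    def __init__(self, root: str) -> None:
        self.root = root  # kept for adapters that resolve relative paths; paths below are used as given

    def read_docs(self, path: str) -> Iterator[Result[RawDoc, ErrInfo]]:
        # Materialised for simplicity; resource-safe streaming is covered in the earlier cores.
        try:
            with open(path) as f:
                rows = list(csv.DictReader(f))
        except OSError as exc:
            return iter([Err(ErrInfo(message=str(exc)))])
        return iter([Ok(RawDoc(**row)) for row in rows])

    def write_chunks(self, path: str, chunks: Iterator[Chunk]) -> Result[None, ErrInfo]:
        try:
            with open(path, "w") as f:
                for c in chunks:
                    json.dump(chunk_to_dict(c), f)
                    f.write("\n")
            return Ok(None)
        except OSError as exc:
            return Err(ErrInfo(message=str(exc)))

class SystemClock:
    def now(self) -> datetime.datetime:
        return datetime.datetime.now()

class CollectingLogger:
    # Test adapter: collects entries instead of printing, so logs stay inspectable.
    def __init__(self) -> None:
        self.entries: list[LogEntry] = []

    def log(self, entry: LogEntry) -> None:
        self.entries.append(entry)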

Step 5: Description Lifting (IOPlan)

# IOPlan, io_delay, io_bind, io_pure: description combinators from the earlier Module 7 cores.
def rag_described(
    storage: StorageCap,
    clock: ClockCap,
    logger: LoggerCap,
    input_path: str,
    output_path: str,
) -> IOPlan[Result[list[Chunk], ErrInfo]]:
    def step_start() -> IOPlan[datetime.datetime]:
        return io_delay(clock.now)

    def step_log_start(start: datetime.datetime) -> IOPlan[None]:
        return io_delay(lambda: logger.log(LogEntry("INFO", f"[{start}] Starting {input_path}")))

    def step_read_docs() -> IOPlan[list[Result[RawDoc, ErrInfo]]]:
        return io_delay(lambda: list(storage.read_docs(input_path)))

    def step_process_docs(doc_results: list[Result[RawDoc, ErrInfo]]) -> IOPlan[Result[list[Chunk], ErrInfo]]:
        def compute() -> Result[list[Chunk], ErrInfo]:
            errs = [r.error for r in doc_results if isinstance(r, Err)]
            if errs:
                return Err(errs[0])
            docs = [r.value for r in doc_results if isinstance(r, Ok)]
            return Ok(rag_pure(docs))
        return io_delay(compute)

    def step_write_and_log(start: datetime.datetime, chunks: list[Chunk]) -> IOPlan[Result[list[Chunk], ErrInfo]]:
        def act() -> Result[list[Chunk], ErrInfo]:
            duration = clock.now() - start
            logger.log(LogEntry("INFO", f"Processed {len(chunks)} chunks in {duration}"))
            write_res = storage.write_chunks(output_path, iter(chunks))
            if isinstance(write_res, Err):
                logger.log(LogEntry("ERROR", f"Write failed: {write_res.error.message}"))
                return write_res
            logger.log(LogEntry("INFO", "Done"))
            return Ok(chunks)
        return io_delay(act)

    return io_bind(
        step_start(),
        lambda start: io_bind(
            step_log_start(start),
            lambda _: io_bind(
                step_read_docs(),
                lambda doc_results: io_bind(
                    step_process_docs(doc_results),
                    lambda res: io_pure(res) if isinstance(res, Err)
                    else step_write_and_log(start, res.value)
                ),
            ),
        ),
    )
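
Nothing in rag_described runs when it is called; it only builds a plan. For orientation, a minimal sketch of the shape these combinators take (the real, law-checked definitions come from the earlier Module 7 cores):

from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

A = TypeVar("A")
B = TypeVar("B")

@dataclass(frozen=True)
class IOPlan(Generic[A]):
    # A suspended effect: a thunk that produces an A only when performed.
    run: Callable[[], A]

def io_pure(value: A) -> IOPlan[A]:
    return IOPlan(lambda: value)

def io_delay(thunk: Callable[[], A]) -> IOPlan[A]:
    return IOPlan(thunk)

def io_bind(plan: IOPlan[A], f: Callable[[A], IOPlan[B]]) -> IOPlan[B]:
    return IOPlan(lambda: f(plan.run()).run())

def perform(plan: IOPlan[A]) -> A:
    # The only place effects actually happen: the imperative shell.
    return plan.run()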

Step 6: Equivalence Proof (final)

import os
import tempfile

from hypothesis import given

# FullCapabilities bundles the production/test adapters from the repo.
@given(docs=doc_list_strategy())
def test_full_migration_equivalence(docs):
    with tempfile.TemporaryDirectory() as tmp:
        in_path = os.path.join(tmp, "in.csv")
        out_path = os.path.join(tmp, "out.jsonl")
        write_csv(docs, in_path)

        # Old god version
        rag_god(in_path, out_path)
        old_output = read_jsonl(out_path)

        # New FP version
        cap = FullCapabilities(
            storage=FileStorageCap(tmp),
            clock=SystemClock(),
            logger=CollectingLogger(),
        )
        plan = rag_described(cap.storage, cap.clock, cap.logger, in_path, out_path)
        perform(plan)

        new_output = read_jsonl(out_path)
        assert old_output == new_output

5. Property-Based Proofs (per-step examples in repo)

Every migration step adds its own equivalence property. The final test above is the cumulative proof.
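
The per-step properties all have the same shape: run the old and new versions on the same inputs and compare the stable boundary surface. A sketch for the Step 4 version (write_csv, read_jsonl, and doc_list_strategy are the same test helpers used above):

import os
import tempfile

from hypothesis import given

@given(docs=doc_list_strategy())
def test_step4_equivalence(docs):
    with tempfile.TemporaryDirectory() as tmp:
        in_path = os.path.join(tmp, "in.csv")
        old_out = os.path.join(tmp, "old.jsonl")
        new_out = os.path.join(tmp, "new.jsonl")
        write_csv(docs, in_path)

        rag_god(in_path, old_out)          # old imperative version
        rag_with_caps(                     # Step 4 version behind capabilities
            storage=FileStorageCap(tmp),
            clock=SystemClock(),
            logger=CollectingLogger(),
            input_path=in_path,
            output_path=new_out,
        )

        # Logs are excluded from the surface; file contents must match exactly.
        assert read_jsonl(old_out) == read_jsonl(new_out)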

6. Runtime Preservation Guarantee

Each step preserves asymptotic complexity. Some steps introduce constant-factor overhead or memory buffering (e.g. materialisation for idempotency/resource safety) – these are explicitly called out and measured in benchmarks.
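
A lightweight way to keep the constant-factor claim honest is to time the old and new versions on the same input and assert a budget. A sketch using only the standard library (the input file, the benchmark directory, and the 1.5x budget are placeholders, not project policy):

import os
import time

def best_of(fn, repeats: int = 5) -> float:
    # Best-of-N wall-clock timing; crude but enough to catch large regressions.
    best = float("inf")
    for _ in range(repeats):
        t0 = time.perf_counter()
        fn()
        best = min(best, time.perf_counter() - t0)
    return best

def bench(tmp: str) -> None:
    old = best_of(lambda: rag_god(os.path.join(tmp, "in.csv"), os.path.join(tmp, "old.jsonl")))
    new = best_of(lambda: perform(rag_described(
        FileStorageCap(tmp), SystemClock(), CollectingLogger(),
        os.path.join(tmp, "in.csv"), os.path.join(tmp, "new.jsonl"),
    )))
    assert new <= old * 1.5, f"constant-factor budget exceeded: {new:.3f}s vs {old:.3f}s"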

7. Anti-Patterns & Immediate Fixes

Anti-Pattern | Symptom | Fix
------------ | ------- | ---
Big-bang rewrite | High risk, blocked PRs | One step per PR
Skipping equivalence tests | Silent regressions | Add a property per step
Migrating without a census | Missed effects | Always start with the effect audit
Keeping dead old code around | Bloat | Delete the old version once equivalence passes

8. Pre-Core Quiz

  1. First migration step? → Characterise with black-box tests
  2. Verify each step with…? → Equivalence properties
  3. Never do…? → More than one step in a single change
  4. Golden rule? → One effect type per migration PR
  5. Final state? → Pure core + thin shell

9. Post-Core Exercise

  1. Take the ugliest function in your real codebase and apply steps 1–3.
  2. Add an equivalence property comparing old vs new.
  3. Open a PR with just that change – watch it pass CI while the rest of the system remains unchanged.
  4. Celebrate: you now know how to migrate anything to FuncPipe without fear.

End of Module 7

You have completed the full production-grade functional architecture:

  • Lawful monadic pipelines (Module 6)
  • Explicit effect boundaries and resource safety (Module 7)
  • Incremental migration path from imperative spaghetti to pure FP (this core)

Your system is now correct, fast, testable, retry-safe, resource-safe, and evolvable – without ever requiring a big-bang rewrite.

Module 8 begins the distributed / concurrent layer (async streams, backpressure, sharding) built directly on this foundation.