M07C10: Incremental Migration – Pushing Effects Outward in Existing Codebases¶
Module 07 – Final Main Track Core
Main track: Cores 1, 3–10 (Ports & Adapters + Capability Protocols → Production).
This is the capstone core – the practical payoff of everything in Module 7.
Progression Note¶
Module 7 takes the lawful containers and pipelines from Module 6 and puts all effects behind explicit boundaries.
| Module | Focus | Key Outcomes |
|---|---|---|
| 6 | Monadic Flows as Composable Pipelines | Lawful and_then, Reader/State/Writer patterns, error-typed flows |
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, resource-safe IO, idempotency |
| 8 | Async / Concurrent Pipelines | Backpressure, timeouts, resumability, fairness (built on 6–7) |
Core question
How do you incrementally migrate an existing imperative codebase full of scattered side effects to the pure FP architecture of Module 7 – step by step, without big rewrites, while mechanically proving behavioural equivalence at every stage?
1. Laws & Invariants (machine-checked in CI)¶
| Law / Invariant | Description | Enforcement |
|---|---|---|
| Behavioural Equivalence | After every migration step, the new implementation produces identical observable outputs for the same inputs. In practice, equivalence is defined on the stable boundary surface – file contents, HTTP responses, messages – typically excluding diagnostic logs. Where logs are part of the contract, they are compared via a normalised view (e.g. structured entries with timestamps stripped or injected deterministically in tests). | Hypothesis properties (golden + random) |
| Purity Progress | Each step reduces the number of direct effect sites in core modules. | CI metric + review |
| No Regression | All equivalence tests from previous steps continue to pass after the current step. | Full suite in CI |
| Incremental Safety | Every intermediate version is runnable and type-checks under mypy --strict. | CI pipeline |
These laws guarantee that migration is safe, verifiable, and never a leap of faith.
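A minimal sketch of the normalised log view mentioned in the Behavioural Equivalence law, assuming LogEntry exposes .level and .message (the attribute names are an assumption): timestamps are stripped before comparison so equivalence ignores wall-clock noise.

```python
import re

TIMESTAMP = re.compile(r"\[[^\]]*\]\s*")  # e.g. strips "[2024-01-01 12:00:00] "

def normalise_logs(entries: list[LogEntry]) -> list[tuple[str, str]]:
    # Compare (level, message-without-timestamp) pairs, not raw strings.
    return [(e.level, TIMESTAMP.sub("", e.message)) for e in entries]
```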
2. Decision Table – Which Migration Step for Which Symptom?¶
| Symptom | Example Code Pattern | Recommended Step |
|---|---|---|
| Direct open() / requests.get() in core | with open(path) as f: ... in domain | Step 4 – Introduce port |
| datetime.now() / random in core | start = datetime.now() | Step 4 – Clock/RNG capability |
| Global config / singletons | GLOBAL_CONFIG = load() | Step 5 – Reader[Env] injection |
| print debugging scattered | print("processing", doc.id) | Step 6 – Writer logging |
| Manual try/except + early return | try: ... except: return None | Step 3 – Pure extraction + Result |
| Transaction handling with globals | global_session.begin() | Use explicit Session/Tx pattern from M07C09 |
Rule: Fix one symptom at a time. Never do more than one step in a single PR.
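For the global-config row, the Step 5 fix replaces a module-level singleton with an explicit environment parameter. A minimal sketch, assuming a frozen Env dataclass (the chunk_size field and split_doc helper are illustrative, not from the original script):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Env:
    chunk_size: int  # illustrative field; a real Env mirrors GLOBAL_CONFIG keys

# Before: n = GLOBAL_CONFIG["chunk_size"] hidden inside the function body.
# After: the dependency is explicit and test-injectable; Module 6's Reader
# pattern would wrap this as Reader[Env, list[Chunk]].
def split_doc(doc: RawDoc, env: Env) -> list[Chunk]:
    return gen_chunks(doc)[: env.chunk_size]
```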
3. The 6-Step Migration Protocol (memorise this)¶
1. Characterise – Write black-box equivalence tests around the current behaviour (golden inputs → expected outputs + side effects).
Exit criteria: Tests pass; you have a safety net.
2. Effect Census – Tag every line that performs I/O, mutation, time, randomness, or logging. Produce a table.
Exit criteria: Every effect is listed and tagged.
Example census rows for the god script:
| Location | Effect kind | Description | Migration step |
|---|---|---|---|
| line 6 | time | datetime.datetime.now() | Step 4 |
| line 7 | logging | print(f"[{start}] Starting...") | Step 6 |
| lines 10–12 | file read | open(...) + csv.DictReader | Step 4 |
| lines 25–29 | file write | open(..., "w") + json.dump | Step 4 |
3. Pure Extraction – Move logic into pure functions, parameterising all dependencies. Return Result/Writer where appropriate.
Exit criteria: Core runs with dummy capabilities; new unit tests possible.
4. Boundary Introduction – Define minimal capability protocols for each effect group. Replace tagged lines with protocol calls.
Exit criteria: Core depends only on protocols; adapters implement them.
5. Description Lifting – Replace direct protocol calls with IOPlan.delay(...). Core now returns descriptions.
Exit criteria: No perform / direct effects in core; the shell executes them.
6. Equivalence Proof – For every step, add a Hypothesis property that old vs new produce identical observable results.
Exit criteria: All equivalence tests pass in CI.
Do these steps in order, one function at a time. The whole codebase migrates gradually without ever breaking.
4. Reference Implementations – Full Stepwise Migration of a Real God Script¶
Starting Point – Typical Imperative RAG Script (god_rag.py)¶
# god_rag.py – the kind of script everyone has
import csv
import json
import datetime
# RawDoc, gen_chunks, chunk_to_dict are pre-existing domain helpers (elided).
def rag_god(input_path: str, output_path: str):
start = datetime.datetime.now()
print(f"[{start}] Starting RAG on {input_path}")
with open(input_path) as f:
reader = csv.DictReader(f)
docs = [RawDoc(**row) for row in reader]
print(f"Loaded {len(docs)} docs")
chunks = []
for doc in docs:
print(f"Processing doc {doc.id}")
chunks.extend(gen_chunks(doc))
duration = datetime.datetime.now() - start
print(f"Processed {len(chunks)} chunks in {duration}")
with open(output_path, "w") as f:
for c in chunks:
json.dump(chunk_to_dict(c), f)
f.write("\n")
print("Done")
Step 1–2: Characterise + Effect Census (add tests + tags)¶
# tests/test_god_equivalence.py
import os
import tempfile
from hypothesis import given

@given(...)  # or golden files
def test_god_rag_preserves_behavior(input_csv, expected_jsonl):
with tempfile.TemporaryDirectory() as tmp:
in_path = os.path.join(tmp, "in.csv")
out_path = os.path.join(tmp, "out.jsonl")
write_csv(input_csv, in_path)
rag_god(in_path, out_path)
assert read_jsonl(out_path) == expected_jsonl
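The write_csv and read_jsonl helpers are not shown in the repo excerpt; a plausible sketch, matching the CSV input and JSONL output formats the god script uses:

```python
import csv
import json

def write_csv(rows: list[dict], path: str) -> None:
    # Header comes from the first row; an "id" column is assumed when empty.
    fieldnames = list(rows[0]) if rows else ["id"]
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

def read_jsonl(path: str) -> list[dict]:
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]
```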
Tag the effects in god_rag.py with comments matching their census rows, for example:
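One possible tagging style (the EFFECT comment format is illustrative, not a course convention):

```python
# god_rag.py (excerpt, tags added)
start = datetime.datetime.now()                     # EFFECT: time      -> Step 4
print(f"[{start}] Starting RAG on {input_path}")    # EFFECT: logging   -> Step 6
with open(input_path) as f:                         # EFFECT: file read -> Step 4
    reader = csv.DictReader(f)
    docs = [RawDoc(**row) for row in reader]
```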
Step 3: Pure Extraction¶
def pure_process_docs(docs: list[RawDoc]) -> list[Chunk]:
chunks = []
for doc in docs:
chunks.extend(gen_chunks(doc))
return chunks
def rag_pure(docs: list[RawDoc]) -> list[Chunk]:
return pure_process_docs(docs)
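The exit criterion "new unit tests possible" is now met: the core can be tested with plain data, no temp files, clock, or logger. A sketch (RawDoc's constructor fields are assumed):

```python
def test_pure_process_docs_flattens_all_chunks() -> None:
    docs = [RawDoc(id="d1", text="hello"), RawDoc(id="d2", text="world")]
    chunks = pure_process_docs(docs)
    # Every chunk must come from gen_chunks of some input doc, in order.
    expected = [c for d in docs for c in gen_chunks(d)]
    assert chunks == expected
```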
Step 4: Boundary Introduction (ports + capabilities)¶
from typing import Iterator, Protocol
import datetime
# Result, Ok, Err, ErrInfo, LogEntry come from the Module 7 core library.

class StorageCap(Protocol):
def read_docs(self, path: str) -> Iterator[Result[RawDoc, ErrInfo]]: ...
def write_chunks(self, path: str, chunks: Iterator[Chunk]) -> Result[None, ErrInfo]: ...
class ClockCap(Protocol):
def now(self) -> datetime.datetime: ...
class LoggerCap(Protocol):
def log(self, entry: LogEntry) -> None: ...
def rag_with_caps(
storage: StorageCap,
clock: ClockCap,
logger: LoggerCap,
input_path: str,
output_path: str,
) -> Result[list[Chunk], ErrInfo]:
start = clock.now()
logger.log(LogEntry("INFO", f"Starting RAG on {input_path}"))
doc_results = list(storage.read_docs(input_path))
errs = [r.error for r in doc_results if isinstance(r, Err)]
if errs:
logger.log(LogEntry("ERROR", f"Read failed: {errs[0].message}"))
return Err(errs[0]) # domain-specific error aggregation strategy
docs = [r.value for r in doc_results if isinstance(r, Ok)]
logger.log(LogEntry("INFO", f"Loaded {len(docs)} docs"))
chunks = rag_pure(docs)
duration = clock.now() - start
logger.log(LogEntry("INFO", f"Processed {len(chunks)} chunks in {duration}"))
write_res = storage.write_chunks(output_path, iter(chunks))
if isinstance(write_res, Err):
logger.log(LogEntry("ERROR", f"Write failed: {write_res.error.message}"))
return write_res
logger.log(LogEntry("INFO", "Done"))
return Ok(chunks)
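Production adapters then implement these protocols outside the core. A plausible sketch of the three used by the Step 6 test below (constructor details and ErrInfo's shape are assumptions); FullCapabilities can be a simple dataclass bundling them:

```python
import csv
import datetime
import json

class FileStorageCap:
    """File-system adapter for StorageCap."""
    def __init__(self, root: str) -> None:
        self.root = root  # kept for path resolution/sandboxing if needed

    def read_docs(self, path: str) -> Iterator[Result[RawDoc, ErrInfo]]:
        try:
            with open(path) as f:
                for row in csv.DictReader(f):
                    yield Ok(RawDoc(**row))
        except OSError as exc:
            yield Err(ErrInfo(message=str(exc)))  # ErrInfo shape assumed

    def write_chunks(self, path: str, chunks: Iterator[Chunk]) -> Result[None, ErrInfo]:
        try:
            with open(path, "w") as f:
                for c in chunks:
                    json.dump(chunk_to_dict(c), f)
                    f.write("\n")
            return Ok(None)
        except OSError as exc:
            return Err(ErrInfo(message=str(exc)))

class SystemClock:
    def now(self) -> datetime.datetime:
        return datetime.datetime.now()

class CollectingLogger:
    """Keeps entries in memory so tests can assert on the normalised view."""
    def __init__(self) -> None:
        self.entries: list[LogEntry] = []

    def log(self, entry: LogEntry) -> None:
        self.entries.append(entry)
```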
Step 5: Description Lifting (IOPlan)¶
def rag_described(
storage: StorageCap,
clock: ClockCap,
logger: LoggerCap,
input_path: str,
output_path: str,
) -> IOPlan[Result[list[Chunk], ErrInfo]]:
def step_start() -> IOPlan[datetime.datetime]:
return io_delay(clock.now)
def step_log_start(start: datetime.datetime) -> IOPlan[None]:
return io_delay(lambda: logger.log(LogEntry("INFO", f"[{start}] Starting {input_path}")))
def step_read_docs() -> IOPlan[list[Result[RawDoc, ErrInfo]]]:
return io_delay(lambda: list(storage.read_docs(input_path)))
def step_process_docs(doc_results: list[Result[RawDoc, ErrInfo]]) -> IOPlan[Result[list[Chunk], ErrInfo]]:
def compute() -> Result[list[Chunk], ErrInfo]:
errs = [r.error for r in doc_results if isinstance(r, Err)]
if errs:
return Err(errs[0])
docs = [r.value for r in doc_results if isinstance(r, Ok)]
return Ok(rag_pure(docs))
return io_delay(compute)
def step_write_and_log(start: datetime.datetime, chunks: list[Chunk]) -> IOPlan[Result[list[Chunk], ErrInfo]]:
def act() -> Result[list[Chunk], ErrInfo]:
duration = clock.now() - start
logger.log(LogEntry("INFO", f"Processed {len(chunks)} chunks in {duration}"))
write_res = storage.write_chunks(output_path, iter(chunks))
if isinstance(write_res, Err):
logger.log(LogEntry("ERROR", f"Write failed: {write_res.error.message}"))
return write_res
logger.log(LogEntry("INFO", "Done"))
return Ok(chunks)
return io_delay(act)
return io_bind(
step_start(),
lambda start: io_bind(
step_log_start(start),
lambda _: io_bind(
step_read_docs(),
lambda doc_results: io_bind(
step_process_docs(doc_results),
lambda res: io_pure(res) if isinstance(res, Err)
else step_write_and_log(start, res.value)
),
),
),
)
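For readers without the earlier cores at hand, a minimal self-contained sketch of the combinators assumed above; the real Module 7 IOPlan may carry more structure (stack safety, an error channel), but the semantics are the same: nothing runs until perform.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

A = TypeVar("A")
B = TypeVar("B")

@dataclass(frozen=True)
class IOPlan(Generic[A]):
    _thunk: Callable[[], A]  # deferred computation; never invoked at build time

def io_delay(thunk: Callable[[], A]) -> IOPlan[A]:
    return IOPlan(thunk)

def io_pure(value: A) -> IOPlan[A]:
    return IOPlan(lambda: value)

def io_bind(plan: IOPlan[A], f: Callable[[A], IOPlan[B]]) -> IOPlan[B]:
    return IOPlan(lambda: f(plan._thunk())._thunk())

def perform(plan: IOPlan[A]) -> A:
    return plan._thunk()  # the single place effects actually execute
```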
Step 6: Equivalence Proof (final)¶
@given(docs=doc_list_strategy())
def test_full_migration_equivalence(docs):
with tempfile.TemporaryDirectory() as tmp:
in_path = os.path.join(tmp, "in.csv")
out_path = os.path.join(tmp, "out.jsonl")
write_csv(docs, in_path)
# Old god version
rag_god(in_path, out_path)
old_output = read_jsonl(out_path)
# New FP version
cap = FullCapabilities(
storage=FileStorageCap(tmp),
clock=SystemClock(),
logger=CollectingLogger(),
)
plan = rag_described(cap.storage, cap.clock, cap.logger, in_path, out_path)
perform(plan)
new_output = read_jsonl(out_path)
assert old_output == new_output
5. Property-Based Proofs (per-step examples in repo)¶
Every migration step adds its own equivalence property; the final test above is the cumulative proof. For instance, the Step 4 → Step 5 transition gets a property of its own, sketched below.
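A sketch of that per-step property, reusing the adapters and test helpers assumed earlier (rag_with_caps vs rag_described on identical inputs):

```python
@given(docs=doc_list_strategy())
def test_step5_matches_step4(docs):
    with tempfile.TemporaryDirectory() as tmp:
        in_path = os.path.join(tmp, "in.csv")
        out4 = os.path.join(tmp, "out4.jsonl")
        out5 = os.path.join(tmp, "out5.jsonl")
        write_csv(docs, in_path)
        storage, clock, logger = FileStorageCap(tmp), SystemClock(), CollectingLogger()
        # Direct-call version (Step 4) vs description version (Step 5).
        rag_with_caps(storage, clock, logger, in_path, out4)
        perform(rag_described(storage, clock, logger, in_path, out5))
        assert read_jsonl(out4) == read_jsonl(out5)
```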
6. Runtime Preservation Guarantee¶
Each step preserves asymptotic complexity. Some steps introduce constant-factor overhead or memory buffering (e.g. materialisation for idempotency/resource safety) – these are explicitly called out and measured in benchmarks.
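A hypothetical harness for that measurement, assuming in_path, out_path, and the adapters from the Step 6 test are in scope (the 1.5× bound is illustrative, not a course requirement):

```python
import timeit

def mean_ms(fn, runs: int = 10) -> float:
    return timeit.timeit(fn, number=runs) / runs * 1000

old_ms = mean_ms(lambda: rag_god(in_path, out_path))
new_ms = mean_ms(lambda: perform(
    rag_described(storage, clock, logger, in_path, out_path)))
print(f"god: {old_ms:.1f} ms, described: {new_ms:.1f} ms")
assert new_ms <= old_ms * 1.5  # bounded constant-factor overhead only
```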
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Big-bang rewrite | High risk, blocked PRs | One step per PR |
| Skipping equivalence tests | Silent regressions | Add property per step |
| Migrating without census | Missed effects | Always start with effect audit |
| Keeping old code dead | Bloat | Delete old version after equivalence passes |
8. Pre-Core Quiz¶
- First migration step? → Characterise with black-box tests
- Verify each step with…? → Equivalence properties
- Never do…? → More than one step in a single change
- Golden rule? → One effect type per migration PR
- Final state? → Pure core + thin shell
9. Post-Core Exercise¶
- Take the ugliest function in your real codebase and apply steps 1–3.
- Add an equivalence property comparing old vs new.
- Open a PR with just that change – watch it pass CI while the rest of the system remains unchanged.
- Celebrate: you now know how to migrate anything to FuncPipe without fear.
End of Module 7
You have completed the full production-grade functional architecture:
- Lawful monadic pipelines (Module 6)
- Explicit effect boundaries and resource safety (Module 7)
- Incremental migration path from imperative spaghetti to pure FP (this core)
Your system is now correct, fast, testable, retry-safe, resource-safe, and evolvable – without ever requiring a big-bang rewrite.
Module 8 begins the distributed / concurrent layer (async streams, backpressure, sharding) built directly on this foundation.