Core 1: Systematic Refactor – Imperative Spaghetti → Layered FuncPipe (Step-by-Step)
Module 10
Core question:
How do you iteratively refactor imperative spaghetti—entangled loops, shared state, hidden dependencies, and interleaved effects—into a layered FuncPipe pipeline with a pure core, explicit boundaries, and verifiable equivalence, using a heuristics-guided cycle that handles real legacy code safely?
In this core, we outline a heuristics-guided migration cycle for refactoring imperative code to functional pipelines in the FuncPipe RAG Builder (now at funcpipe-rag-10). We separate the abstract protocol (reusable framework), a realistic case study (tangled legacy script), and a verification appendix (patterns for equivalence evidence). Builds on Module 09 standards.
Motivating bug: tangled imperative code is flaky, and a big-bang rewrite risks regressions. A heuristics-guided cycle backed by equivalence evidence enables safe, adaptive migration.
Delta from Module 09: the standards define what good FP looks like; this core supplies the adaptive playbook for getting there.
Audience: Engineers refactoring legacy code safely.
Outcomes:
1. Audit and classify impurities, including structural tangles.
2. Apply the heuristics-guided cycle with diverging paths.
3. Migrate the RAG pipeline, verifying equivalence and laws.
Migration Protocol (Abstract, Reusable)
- Cycle: Heuristics-guided loop (graph + scores below) until stop: no impurities, equivalence holds, laws pass, budget met.
- Semantics: Preserve evolving equivalence via code predicates (below); mocks for effects.
- Purity: Progressive: tag → parameterize → extract; full when untagged.
- Error Model Default: Per-record Result streams during migration (preserve continuation); collapse to batch post-purity when (a) adapters are idempotent, (b) there is no per-record branching, (c) consumers don't need partials. See the sketch after this list.
- Resource Safety: Ports return ContextManager[Iterator[Result[T, E]]]; IoAction enters/exits during perform. Core never stores raw iterators.
- Integration: Apply to legacies; multi-layered verification (appendix).
- Mypy Config: Run --strict incrementally; use typing shims for evolving signatures (see the 3-phase schedule below).
- Stop Condition: No tags, equivalence holds, laws pass, budget met.
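To make the Error Model Default concrete, here is a minimal sketch of a per-record Result stream and the gated collapse-to-batch step. The Ok/Err shape and the collapse_to_batch helper are illustrative assumptions, not the FuncPipe API:

from dataclasses import dataclass
from typing import Generic, Iterable, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")

@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T

@dataclass(frozen=True)
class Err(Generic[E]):
    error: E

Result = Union[Ok[T], Err[E]]  # Per-record outcome; stream these during migration

def collapse_to_batch(records: Iterable[Result[T, E]], adapters_idempotent: bool) -> list[T]:
    # Legal only post-purity: adapters idempotent, no per-record branching,
    # and no consumer needs partial results (gates (a)-(c) above).
    if not adapters_idempotent:
        raise ValueError("keep per-record Results: collapse gate not met")
    out: list[T] = []
    for r in records:
        if isinstance(r, Err):
            raise RuntimeError(f"batch failed: {r.error!r}")  # fail-fast batch semantics
        out.append(r.value)
    return out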
Equivalence Predicates (Code, Normative)
Predicates are parameterized to the domain via key, normalize, and classify.
from collections import Counter
from typing import Any, Callable, Hashable  # ErrInfo is the domain error record from earlier modules

def eq_pure(xs: list, ys: list, key: Callable[[Any], Hashable], order_sensitive: bool = False) -> bool:
    if order_sensitive:
        return [key(x) for x in xs] == [key(y) for y in ys]
    # Multiset compare; assumes hashable keys (else map to a stable serialization or sort under a total order)
    return Counter(key(x) for x in xs) == Counter(key(y) for y in ys)

def eq_boundary(a: dict, b: dict, normalize: Callable[[dict], dict]) -> bool:
    return normalize(a) == normalize(b)  # Per-artifact; e.g., normalize strips timestamps/logs

def eq_error(e1: list[ErrInfo], e2: list[ErrInfo], classify: Callable[[ErrInfo], tuple[str, str]]) -> bool:
    return Counter(classify(e) for e in e1) == Counter(classify(e) for e in e2)  # Multisets of (code, stage)
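A hypothetical instantiation for the RAG domain (the record shapes and the timestamp field are assumptions):

old = [{"doc_id": "d1", "text": "a"}, {"doc_id": "d2", "text": "b"}]
new = [{"doc_id": "d2", "text": "b"}, {"doc_id": "d1", "text": "a"}]  # same records, reordered

key = lambda c: (c["doc_id"], c["text"])
assert eq_pure(old, new, key)                            # multiset equality: holds
assert not eq_pure(old, new, key, order_sensitive=True)  # sequence equality: fails

strip_ts = lambda d: {k: v for k, v in d.items() if k != "timestamp"}
assert eq_boundary({"rows": 2, "timestamp": "t0"},
                   {"rows": 2, "timestamp": "t1"}, normalize=strip_ts)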
Decision Graph + Heuristics (Operational)
Graph:
Start: Audit/Tag (score smells) → [Choose max-score path]
├── Extract Pure Islands (score: mutation density + cyclomatic complexity)
│ └── Laws/eq fail? → Loop/undo
├── Extract Boundaries (score: % lines in I/O + exception density)
│ └── Swappability fail? → Loop/undo
├── Normalize Errors (score: exception density + silent swallows)
│ └── Eq_error fail? → Loop/undo
├── Parameterize Deps (score: # global/hidden reads/writes + fan-in/out of shared state)
│ └── Equivalence fail? → Loop/undo
└── Any: Verify (appendix) + Optimize if budget over (fuse imperative behind pure)
└── Fail? → Loop/escalate
Stop: No impurities, equivalence holds, laws pass, budget met.
Iterations continue until the stop condition holds; expect wide variance in how many passes a given legacy needs. A minimal scoring sketch follows.
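The smell metrics below mirror the scores in the graph; the names, weights, and Smells record are illustrative assumptions:

from typing import Callable, NamedTuple

class Smells(NamedTuple):
    mutation_density: float    # mutations per line
    cyclomatic: float          # normalized cyclomatic complexity
    io_fraction: float         # fraction of lines doing I/O
    exception_density: float
    silent_swallows: float     # bare excepts / dropped errors
    hidden_deps: float         # global/hidden reads + writes
    shared_fan: float          # fan-in/out of shared state

PATH_SCORES: dict[str, Callable[[Smells], float]] = {
    "extract_pure_islands": lambda s: s.mutation_density + s.cyclomatic,
    "extract_boundaries":   lambda s: s.io_fraction + s.exception_density,
    "normalize_errors":     lambda s: s.exception_density + s.silent_swallows,
    "parameterize_deps":    lambda s: s.hidden_deps + s.shared_fan,
}

def choose_path(s: Smells) -> str:
    # The [Choose max-score path] node from the graph above.
    return max(PATH_SCORES, key=lambda name: PATH_SCORES[name](s))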
Resource Pattern (Canonical Combinator)
from typing import Callable, ContextManager, Iterator, TypeVar
T = TypeVar("T")
U = TypeVar("U")

def within(cm_factory: Callable[[], ContextManager[Iterator[T]]],
           fn: Callable[[Iterator[T]], Iterator[U]]) -> IoAction[Iterator[U]]:  # IoAction from earlier modules
    def thunk() -> Iterator[U]:
        with cm_factory() as it:  # Enter/exit happen inside perform
            yield from fn(it)     # Generator close also exits the context: safe under partial consumption
    return IoAction.delay(thunk)

# Usage
read_desc = within(lambda: storage.read_docs(path), identity)  # Streaming-safe
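Driving the description at the boundary might look like this; split is a hypothetical pure chunker, and perform is assumed to run the delayed thunk and hand back its iterator:

chunks_desc = within(lambda: storage.read_docs(path),
                     lambda docs: (c for d in docs for c in split(d)))
for chunk in chunks_desc.perform():  # file opened here; closed when the iterator finishes
    sink.write(chunk)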
Artifact Taxonomy (For eq_boundary)
Artifacts are classed as: (a) line logs (normalize: sort + strip timestamps), (b) structured records (normalize: key-based multisets), (c) blobs (normalize: hash/content equality). Each port exposes the normalizer for its class; illustrative normalizers follow.
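One sketch of normalizers for the three classes (function names and the timestamp pattern are assumptions):

import hashlib
import re
from collections import Counter

def normalize_line_log(lines: list[str]) -> list[str]:
    # (a) line logs: strip a leading ISO-8601 timestamp, then sort
    return sorted(re.sub(r"^\d{4}-\d{2}-\d{2}T\S+\s+", "", ln) for ln in lines)

def normalize_records(recs: list[dict], key: str = "doc_id") -> Counter:
    # (b) structured records: key-based multiset
    return Counter(r[key] for r in recs)

def normalize_blob(blob: bytes) -> str:
    # (c) blobs: content-hash equality
    return hashlib.sha256(blob).hexdigest()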
Signature Evolution Schedule (3-Phase)
- Shadow: Run new in parallel; log/assert equivalence; no output switch (sketch after this list).
- Canary: Subset traffic to new; monitor; non-flaky tests via mocks/seeds.
- Cutover: Switch; remove legacy/shims after stability (e.g., 1 sprint).
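A minimal shadow-phase wrapper under these assumptions (the eq and log callbacks are placeholders):

from typing import Any, Callable

def shadowed(legacy_fn: Callable[..., Any], new_fn: Callable[..., Any],
             eq: Callable[[Any, Any], bool], log: Callable[[str], None]) -> Callable[..., Any]:
    def run(*args: Any, **kwargs: Any) -> Any:
        old = legacy_fn(*args, **kwargs)
        try:
            new = new_fn(*args, **kwargs)
            if not eq(old, new):
                log(f"shadow divergence on args={args!r}")
        except Exception as exc:  # the new path must never break callers during Shadow
            log(f"shadow crash: {exc!r}")
        return old  # output stays on the legacy path until Cutover
    return run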
Case Study: Tangled RAG God-Script Migration
Legacy (Before: Realistic Ugliness)
Anonymized from an internal script: partial results feed later phases, coupling is hidden, backoff state mutates across retries, and mutable state is shared across modules.
# real_tangled_rag.py (anonymized)
import csv, json, random, time, os  # Hidden deps

CONFIG = {"chunk_size": 256, "retry": 3, "backoff": [1, 2, 4]}  # Global mutable
SHARED_STATE = {"partials": [], "error_counts": {}, "cache": {}}  # Shared mutables across phases/modules

def tangled_rag(in_path: str, out_path: str, temp_dir: str) -> list[Chunk]:  # Coupled to OS via temp dir
    global CONFIG, SHARED_STATE
    chunks = []
    with open(in_path) as f:
        reader = csv.DictReader(f)
        for row in reader:  # Parsing + retry + partials intertwined
            backoff = CONFIG['backoff'][:]  # Mutable copy, mutated over retries
            for attempt in range(CONFIG['retry']):
                try:
                    doc = RawDoc(**row)
                    SHARED_STATE['cache'][doc.doc_id] = doc  # Mutates shared state
                    break
                except KeyError as e:
                    SHARED_STATE['error_counts'][str(e)] = SHARED_STATE['error_counts'].get(str(e), 0) + 1
                    if SHARED_STATE['error_counts'][str(e)] > 5:
                        break  # Cross-cutting threshold
                    time.sleep(backoff[attempt % len(backoff)])  # Hidden time dependency
                    backoff[attempt % len(backoff)] *= 1.5  # Mutates backoff state in place
            if 'cs.' not in row.get('categories', ''):
                continue  # Filter flag buried in the loop
            cleaned = CleanDoc(...)  # ...
            SHARED_STATE['partials'].append(cleaned)  # Feeds the next phase
            # Inner loop: chunking + partials check (cross-phase coupling)
            temp_file = os.path.join(temp_dir, f"{cleaned.doc_id}.tmp")  # OS coupling
            with open(temp_file, 'w') as tf:  # Interleaved I/O
                offset = random.randint(0, 10)  # Hidden RNG
                for i in range(0, len(cleaned.abstract), CONFIG['chunk_size'] + offset):
                    chunk = Chunk(...)
                    if any(chunk.text in p.abstract for p in SHARED_STATE['partials']):
                        continue  # Cross-phase check
                    tf.write(json.dumps(chunk.to_dict()) + '\n')
                    chunks.append(chunk)
    # Write phase reuses the cache (hidden coupling)
    with open(out_path, 'w') as f:
        for chunk in chunks:
            if chunk.doc_id in SHARED_STATE['cache']:  # Hidden coupling
                f.write(json.dumps(chunk.to_dict()) + '\n')
    return chunks
Iteration 1: Audit + Parameterize Deps (High Globals/Fan-in Score)
Score: high count of global/hidden reads (6+) plus fan-in of shared state (partials/cache/error_counts). Micro-commit delta (tagged → parameterized):
# Before (tagged excerpt)
global CONFIG, SHARED_STATE                  # Impurity: global mutable
backoff = CONFIG['backoff'][:]               # Impurity: mutable copy
time.sleep(backoff[attempt % len(backoff)])  # Impurity: hidden time
backoff[attempt % len(backoff)] *= 1.5       # Impurity: state mutation
offset = random.randint(0, 10)               # Impurity: hidden RNG

# After (parameterized: deps are explicit, injectable arguments)
def tangled_rag(..., config: dict | None = None, shared_state: dict | None = None,
                clock: Callable[[float], None] = time.sleep,
                rng: random.Random | None = None) -> list[Chunk]:
    config = config or CONFIG.copy()
    shared_state = shared_state or {"partials": [], "error_counts": {}, "cache": {}}
    rng = rng or random.Random(42)  # Seeded default; avoids a mutable default argument
    backoff = config['backoff'][:]  # Still mutable; addressed in a later iteration
    ...
    clock(backoff[attempt % len(backoff)])
    offset = rng.randint(0, 10)

# Preserve: eq_pure(old_chunks, new_chunks, key=lambda c: (c.doc_id, c.text))
# and eq_error(old_errs, new_errs, classify=lambda e: (type(e).__name__, e.key))
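One way to exercise the parameterized seams; legacy_tangled_rag (the pre-refactor entry point kept around for comparison), the file paths, and the shared seed discipline are assumptions:

import random

sleeps: list[float] = []
new_chunks = tangled_rag("in.csv", "out.jsonl", "tmp/",
                         clock=sleeps.append,    # record delays instead of sleeping
                         rng=random.Random(42))  # pin the chunk-offset RNG

random.seed(42)  # the legacy uses the global RNG; seed it identically
old_chunks = legacy_tangled_rag("in.csv", "out.jsonl", "tmp/")

assert eq_pure(old_chunks, new_chunks, key=lambda c: (c.doc_id, c.text))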
Iteration 2: Normalize Errors + Extract Boundaries (High Exception/IO Score)
Score: exception density is high; ~30% of lines perform I/O. Port: defined as a protocol; the adapter handles retries/backoff immutably (sketch below). Verify: eq_error plus a metamorphic retry-idempotence family.
Loop: mutations remain → next path is Extract Pure Islands.
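A sketch of the boundary port plus an immutable retry schedule, reusing the Ok/Err shape from the protocol sketch; the DocSource name and the (code, stage) classification are assumptions:

from typing import Callable, Iterator, Protocol, TypeVar

T = TypeVar("T")

class DocSource(Protocol):
    def read_rows(self, path: str) -> Iterator[dict]: ...

def retry_schedule(base: tuple[float, ...], attempts: int) -> tuple[float, ...]:
    # Pure: derive every delay up front instead of mutating a shared list in place.
    return tuple(base[i % len(base)] * (1.5 ** i) for i in range(attempts))

def with_retry(fn: Callable[[], T], delays: tuple[float, ...],
               clock: Callable[[float], None]) -> "Ok[T] | Err[tuple[str, str]]":
    for i, delay in enumerate(delays):
        try:
            return Ok(fn())
        except KeyError:
            if i + 1 == len(delays):
                break
            clock(delay)
    return Err(("KeyError", "parse"))  # (code, stage), ready for eq_error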
Iteration 3: Extract Pure Islands (High Mutation/Complexity Score)
Extract cleaning/chunking into pure functions; shared state becomes explicit dicts (sketch below). Assume constructors are pure; if not, treat them as effect boundaries and port them first. Score: mutation density high; complexity driven by nested loops. Collapse errors? No: adapters are not yet idempotent. Verify: metamorphic families (appendix).
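The extracted pure islands might look like this (function names are illustrative; the offset is now an explicit argument rather than hidden RNG):

def chunk_text(text: str, chunk_size: int, offset: int) -> list[str]:
    # Pure chunking: same inputs, same chunks; no I/O, no globals.
    step = chunk_size + offset
    return [text[i:i + step] for i in range(0, len(text), step)]

def dedupe_against(chunks: list[str], partials: list[str]) -> list[str]:
    # The old SHARED_STATE['partials'] cross-phase check, now an explicit input.
    return [c for c in chunks if not any(c in p for p in partials)]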
Stop: Checks pass.
Verification Appendix (Patterns)
Multi-Layered Evidence
- Equivalence: Per pass, using predicates + domain params.
- Local Laws: Idempotence/associativity for new pure (Module 05/06).
- Swappability: Mock adapter == real (eq_boundary).
- Metamorphic: Derive via:
- Invariant (e.g., order irrelevance, retry idempotence, chunk monotonicity, add-noise, scale).
- Transform (permute, rerun with seed, vary size, add errors, scale input).
- Assert: eq_pure/eq_error post-transform.
Canonical families:
- Permutation: for order-irrelevance.
- Retry: same input/seeds → same output (idempotence).
- Monotonicity: smaller chunk_size → more chunks.
- Noise: adding irrelevant records leaves core outputs unchanged.
- Associativity: split/recombine folds → same aggregate.
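Two of these families as executable checks; run_pipeline stands in for the refactored pure core, and chunk_text is the island sketched in Iteration 3:

import random

def test_permutation(records: list[dict]) -> None:
    shuffled = records[:]
    random.Random(0).shuffle(shuffled)  # transform: permute
    assert eq_pure(run_pipeline(records), run_pipeline(shuffled),
                   key=lambda c: (c["doc_id"], c["text"]))  # invariant: order irrelevance

def test_chunk_monotonicity(text: str) -> None:
    # invariant: a smaller chunk_size never yields fewer chunks
    assert len(chunk_text(text, chunk_size=64, offset=0)) >= \
           len(chunk_text(text, chunk_size=128, offset=0))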
... (examples per iteration in case study)
Runtime Preservation Guarantee
Set a system-specific budget (multi-dimensional: latency, throughput, peak RSS). Profile after each pass; if over budget, fuse (keep an imperative core behind a pure interface); defer non-blocking regressions to Core 2. A minimal budget probe follows.
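A sketch of such a probe, assuming a Unix platform (the resource module) and illustrative thresholds:

import resource
import time
from typing import Callable

BUDGET = {"latency_s": 30.0, "peak_rss_mb": 512.0}

def within_budget(run: Callable[[], None]) -> bool:
    t0 = time.perf_counter()
    run()
    elapsed = time.perf_counter() - t0
    # ru_maxrss is KiB on Linux (bytes on macOS); adjust per platform
    rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024
    return elapsed <= BUDGET["latency_s"] and rss_mb <= BUDGET["peak_rss_mb"]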
Anti-Patterns & Immediate Fixes
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Vibes-based choices | Inefficient paths | Use heuristics/scores |
| Naïve predicates | Regressions slip through | Parameterize + multisets |
| Early collapse | Lose continuation | Gate on adapter idempotence |
| Synthetic examples | Unrealistic prep | Use anonymized real code |
Pre-Core Quiz
- Choose paths by…? → Heuristics/scores
- Predicates parameterized by…? → Domain key/normalize
- Collapse errors when…? → Adapters idempotent
- Metamorphic derivation? → Invariant + transform + assert
- Sig evolution? → 3-phase schedule
Post-Core Exercise
- Audit real tangled function; score + run 1 iteration.
- Define predicates + metamorphic family.
- Iterate to stop; profile.
Next: Core 2 – Performance Budgeting: When FP Wins, When to Vectorize, When to Drop to C/NumPy