
Core 1: Systematic Refactor – Imperative Spaghetti → Layered FuncPipe (Step-by-Step)

Module 10

Core question:
How do you iteratively refactor imperative spaghetti—entangled loops, shared state, hidden deps, and effects—into a layered FuncPipe pipeline with pure core, explicit boundaries, and verifiable equivalence, using a heuristics-guided cycle to handle real legacies safely?

In this core, we outline a heuristics-guided migration cycle for refactoring imperative code to functional pipelines in the FuncPipe RAG Builder (now at funcpipe-rag-10). We separate the abstract protocol (reusable framework), a realistic case study (tangled legacy script), and a verification appendix (patterns for equivalence evidence). Builds on Module 09 standards.

Motivation Bug: Tangled imperative code is flaky, and a big-bang rewrite risks regressions. A heuristics-guided cycle backed by equivalence evidence enables safe, adaptive migration.

Delta from Module 09: Standards define good FP; this gives the adaptive playbook.

Audience: Engineers refactoring legacies safely.

Outcome:

  1. Audit/classify impurities, including structural tangles.
  2. Apply heuristics-guided cycle with diverging paths.
  3. Migrate RAG, verifying equivalence/laws.


Migration Protocol (Abstract, Reusable)

  • Cycle: Heuristics-guided loop (graph + scores below) until stop: no impurities, equivalence holds, laws pass, budget met.
  • Semantics: Preserve evolving equivalence via code predicates (below); mocks for effects.
  • Purity: Progressive: tag → parameterize → extract; full when untagged.
  • Error Model Default: Per-record Result streams during migration (preserve continuation); collapse to batch post-purity when (a) adapters idempotent, (b) no per-record branching, (c) consumers don't need partials.
  • Resource Safety: Ports return ContextManager[Iterator[Result[T, E]]]; IoAction enters/exits during perform. Core never stores raw iterators.
  • Integration: Apply to legacies; multi-layered verification (appendix).
  • Mypy Config: --strict incremental; shims for sigs (schedule below).
  • Stop Condition: No tags, equivalence holds, laws pass, budget met.
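To make the Error Model Default concrete, here is a minimal sketch of a per-record Result stream and the later collapse to a batch result. The `Ok`, `Err`, `map_records`, and `collapse` names are hypothetical stand-ins for illustration, not the FuncPipe types themselves.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Iterator, TypeVar, Union

T = TypeVar("T")
U = TypeVar("U")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):  # hypothetical success wrapper
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):  # hypothetical failure wrapper
    error: E


Result = Union[Ok[T], Err[E]]


def map_records(fn: Callable[[T], U], xs: Iterable[T]) -> Iterator[Result[U, Exception]]:
    """Per-record stream: a failing record yields Err and processing continues."""
    for x in xs:
        try:
            yield Ok(fn(x))
        except Exception as exc:  # assumption: catch-all; narrow this per adapter
            yield Err(exc)


def collapse(results: Iterable[Result[T, E]]) -> Result[list[T], E]:
    """Batch form: the first Err aborts, so partial results are discarded."""
    out: list[T] = []
    for r in results:
        if isinstance(r, Err):
            return r
        out.append(r.value)
    return Ok(out)


# During migration: continuation is preserved, record "x" fails alone.
stream = list(map_records(int, ["1", "x", "3"]))

# Post-purity: the same input collapses to a single Err for the whole batch.
batch = collapse(map_records(int, ["1", "x", "3"]))
```

The stream form keeps the legacy behaviour of carrying on after a bad row, which is why collapsing is gated on the three conditions listed in the Error Model Default bullet.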

Equivalence Predicates (Code, Normative)

Parameterized to domain.

from collections import Counter
from typing import Any, Callable, Hashable

def eq_pure(xs: list, ys: list, key: Callable[[Any], Hashable], order_sensitive: bool = False) -> bool:
    if order_sensitive:
        return [key(x) for x in xs] == [key(y) for y in ys]
    # Multiset comparison; assumes hashable keys. If keys aren’t hashable, map to a stable serialization or use sorted lists under a total order.
    return Counter(key(x) for x in xs) == Counter(key(y) for y in ys)

def eq_boundary(a: dict, b: dict, normalize: Callable[[dict], dict]) -> bool:
    return normalize(a) == normalize(b)  # Per-artifact; e.g., strip timestamps/logs

# ErrInfo is the project's error record type; the annotations are quoted so this snippet loads without it.
def eq_error(e1: list["ErrInfo"], e2: list["ErrInfo"], classify: Callable[["ErrInfo"], tuple[str, str]]) -> bool:
    return Counter(classify(e) for e in e1) == Counter(classify(e) for e in e2)  # Multisets of (code, stage)

  • Domain: Define key/normalize/classify per system (e.g., key=Chunk.id; normalize strips noise; classify=(code, stage)).
  • Each pass specifies predicate + params.
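
As an illustration of why the predicates use multisets, the sketch below instantiates `eq_pure` and `eq_boundary` with domain parameters. The `Chunk` dataclass, `chunk_key`, and `strip_noise` are hypothetical stand-ins chosen for the example.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any, Callable, Hashable


@dataclass(frozen=True)
class Chunk:  # hypothetical stand-in for the project's Chunk type
    doc_id: str
    text: str


def eq_pure(xs: list, ys: list, key: Callable[[Any], Hashable], order_sensitive: bool = False) -> bool:
    if order_sensitive:
        return [key(x) for x in xs] == [key(y) for y in ys]
    return Counter(key(x) for x in xs) == Counter(key(y) for y in ys)


def eq_boundary(a: dict, b: dict, normalize: Callable[[dict], dict]) -> bool:
    return normalize(a) == normalize(b)


def chunk_key(c: Chunk) -> tuple[str, str]:
    """Domain key for this example: identity is (doc_id, text)."""
    return (c.doc_id, c.text)


def strip_noise(artifact: dict) -> dict:
    """Assumed normalizer: drop fields that legitimately differ between runs."""
    return {k: v for k, v in artifact.items() if k not in {"timestamp", "log"}}


legacy = [Chunk("d1", "alpha"), Chunk("d2", "beta"), Chunk("d2", "beta")]
refactored = [Chunk("d2", "beta"), Chunk("d1", "alpha"), Chunk("d2", "beta")]
lost_duplicate = refactored[:2]  # one duplicate silently dropped

same_multiset = eq_pure(legacy, refactored, chunk_key)                # reordered only
same_order = eq_pure(legacy, refactored, chunk_key, order_sensitive=True)
caught = not eq_pure(legacy, lost_duplicate, chunk_key)               # multiset notices the loss
set_would_miss = set(map(chunk_key, legacy)) == set(map(chunk_key, lost_duplicate))
```

A plain set comparison would accept `lost_duplicate`, which is the kind of regression a naive predicate lets through.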

Decision Graph + Heuristics (Operational)

Graph:

Start: Audit/Tag (score smells) → [Choose max-score path]
  ├── Extract Pure Islands (score: mutation density + cyclomatic complexity)
  │   └── Laws/eq fail? → Loop/undo
  ├── Extract Boundaries (score: % lines in I/O + exception density)
  │   └── Swappability fail? → Loop/undo
  ├── Normalize Errors (score: exception density + silent swallows)
  │   └── Eq_error fail? → Loop/undo
  ├── Parameterize Deps (score: # global/hidden reads/writes + fan-in/out of shared state)
  │   └── Equivalence fail? → Loop/undo
  └── Any: Verify (appendix) + Optimize if budget over (fuse imperative behind pure)
      └── Fail? → Loop/escalate
Stop: No impurities, equivalence holds, laws pass, budget met.
Heuristics (measurable):

  • Tie-breaker: choose pure if tied.
  • Unit: smallest refactorable region, usually a loop nest or helper; split regions over 80 LOC by effect boundary first.
  • Scoring: normalize each score to [0,1] per region; sum with equal weights unless the domain dictates otherwise.
  • Near-ties: if the top two scores are within 10%, do a paired pass (e.g., parameterize + pure extract) to avoid churn.
  • Without tooling: approximate by grepping symbols for fan-in/out and counting branches/loops for CC.

Metrics:

  • Mutation density: # mutating ops (assigns, .append) / lines.
  • Cyclomatic complexity (CC): # branches/loops per region.
  • % lines in I/O: I/O calls / total lines.
  • Exception density: try/except blocks / lines.
  • # global/hidden: count of reads/writes to globals/time/RNG/env.
  • Fan-in/out: # refs to shared objects.
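
A rough sketch of how some of these metrics could be approximated with the standard `ast` module, plus the near-tie rule. The `MUTATING_METHODS` set, the choice of AST nodes counted, and reading "within 10%" as relative to the top score are all assumptions made for illustration; normalization across regions is left out.

```python
import ast

# Assumption: method names treated as in-place mutation for this sketch.
MUTATING_METHODS = {"append", "extend", "insert", "update", "add", "pop", "remove"}


def score_region(source: str) -> dict[str, float]:
    """Approximate raw (un-normalized) smell metrics for one region of code."""
    tree = ast.parse(source)
    lines = max(1, sum(1 for line in source.splitlines() if line.strip()))
    mutations = branches = tries = 0
    for node in ast.walk(tree):
        if isinstance(node, (ast.Assign, ast.AugAssign, ast.AnnAssign)):
            mutations += 1
        elif (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr in MUTATING_METHODS
        ):
            mutations += 1
        if isinstance(node, (ast.If, ast.For, ast.While)):
            branches += 1
        if isinstance(node, ast.Try):
            tries += 1
    return {
        "mutation_density": mutations / lines,
        "branches": float(branches),
        "exception_density": tries / lines,
    }


def pick_paths(path_scores: dict[str, float], tolerance: float = 0.10) -> list[str]:
    """Return the top path, or the top two when they are within `tolerance`."""
    ranked = sorted(path_scores.items(), key=lambda kv: kv[1], reverse=True)
    (top_name, top), rest = ranked[0], ranked[1:]
    if rest and top > 0 and (top - rest[0][1]) / top <= tolerance:
        return [top_name, rest[0][0]]  # paired pass to avoid churn
    return [top_name]


SAMPLE = '''
out = []
for row in rows:
    try:
        if row["ok"]:
            out.append(row["v"])
    except KeyError:
        pass
'''

sample_scores = score_region(SAMPLE)
paired = pick_paths({"extract_pure": 0.90, "parameterize": 0.85, "normalize_errors": 0.20})
single = pick_paths({"extract_pure": 0.90, "normalize_errors": 0.50})
```

Counting `if`/`for`/`while` nodes is only a proxy for cyclomatic complexity; it is meant to rank regions against each other, not to match a dedicated tool.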

Iterations continue until the stop condition holds; the number of iterations varies widely between codebases.

Resource Pattern (Canonical Combinator)

from typing import Callable, ContextManager, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")

def within(cm_factory: Callable[[], ContextManager[Iterator[T]]], fn: Callable[[Iterator[T]], Iterator[U]]) -> IoAction[Iterator[U]]:
    def thunk():
        with cm_factory() as it:
            yield from fn(it)
    return IoAction.delay(thunk)  # Enter/exit in perform; safe partial

# Usage
read_desc = within(lambda: storage.read_docs(path), identity)  # Streaming safe
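
The behaviour of the combinator can be traced with a small stand-in. The `IoAction` class below is a hypothetical minimal version with only `delay` and `perform`, and `read_docs` is a toy port that records open/close events; the project's real `IoAction` may behave differently. In this stand-in, `perform` returns a lazy generator, so the context manager is entered on the first pull and exited on exhaustion or `close()`.

```python
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Callable, ContextManager, Generic, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


@dataclass(frozen=True)
class IoAction(Generic[T]):  # hypothetical minimal stand-in
    thunk: Callable[[], T]

    @staticmethod
    def delay(thunk: Callable[[], T]) -> "IoAction[T]":
        return IoAction(thunk)

    def perform(self) -> T:
        return self.thunk()


events: list[str] = []


@contextmanager
def read_docs(rows: list[str]) -> Iterator[Iterator[str]]:
    """Toy port: records when the underlying resource opens and closes."""
    events.append("open")
    try:
        yield iter(rows)
    finally:
        events.append("close")


def within(
    cm_factory: Callable[[], ContextManager[Iterator[T]]],
    fn: Callable[[Iterator[T]], Iterator[U]],
) -> IoAction[Iterator[U]]:
    def thunk() -> Iterator[U]:
        with cm_factory() as it:
            yield from fn(it)
    return IoAction.delay(thunk)


action = within(lambda: read_docs(["a", "b", "c"]), lambda it: (x.upper() for x in it))
events_after_build = list(events)   # describing the action opens nothing

stream = action.perform()
first = next(stream)                # first pull enters the context manager
events_after_first = list(events)

stream.close()                      # early exit still runs the port's cleanup
events_after_close = list(events)
```

Because the resource lives inside the generator frame, a consumer that stops early still triggers the port's cleanup, and the core never holds the raw iterator.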

Artifact Taxonomy (For eq_boundary)

Artifacts classed as: (a) line logs (normalize: sort + strip timestamps), (b) structured records (normalize: key-based multisets), (c) blobs (normalize: hash/content eq). Each port exposes normalizer for its class.
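
One possible shape for the three normalizer classes is sketched below. The timestamp pattern assumes ISO-like prefixes on log lines, and the function names are hypothetical; a real port would supply its own.

```python
import hashlib
import re
from collections import Counter
from typing import Callable, Hashable, Iterable

# Assumption: log lines start with an ISO-like timestamp followed by whitespace.
TIMESTAMP = re.compile(r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}\S*\s+")


def normalize_line_log(lines: Iterable[str]) -> list[str]:
    """(a) Line logs: strip timestamps, then sort so ordering noise disappears."""
    return sorted(TIMESTAMP.sub("", line.rstrip("\n")) for line in lines)


def normalize_records(records: Iterable[dict], key: Callable[[dict], Hashable]) -> Counter:
    """(b) Structured records: compare as a multiset of domain keys."""
    return Counter(key(r) for r in records)


def normalize_blob(data: bytes) -> str:
    """(c) Blobs: compare by content hash."""
    return hashlib.sha256(data).hexdigest()


run_a = ["2024-01-01T00:00:00Z wrote c1\n", "2024-01-01T00:00:01Z wrote c2\n"]
run_b = ["2024-02-02 10:00:01.123 wrote c2", "2024-02-02 10:00:00.001 wrote c1"]
logs_match = normalize_line_log(run_a) == normalize_line_log(run_b)
```

Each normalizer returns a plain comparable value, so it can be handed to `eq_boundary` style checks after wrapping the artifact as needed.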

Signature Evolution Schedule (3-Phase)

  1. Shadow: Run new in parallel; log/assert equivalence; no output switch.
  2. Canary: Subset traffic to new; monitor; non-flaky tests via mocks/seeds.
  3. Cutover: Switch; remove legacy/shims after stability (e.g., 1 sprint).
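
The Shadow phase can be sketched as a wrapper that always returns the legacy output and only reports divergence. The `shadow` helper and its `on_mismatch` callback are hypothetical names for this illustration.

```python
import logging
from collections import Counter
from typing import Any, Callable, Hashable, Iterable

log = logging.getLogger("shadow")


def shadow(
    legacy: Callable[[Any], Iterable[Any]],
    candidate: Callable[[Any], Iterable[Any]],
    key: Callable[[Any], Hashable],
    on_mismatch: Callable[[Any], None] = lambda arg: log.warning("shadow mismatch for %r", arg),
) -> Callable[[Any], list]:
    """Run both implementations; report divergence; never switch the output."""

    def run(arg: Any) -> list:
        expected = list(legacy(arg))
        try:
            actual = list(candidate(arg))
            if Counter(map(key, expected)) != Counter(map(key, actual)):
                on_mismatch(arg)
        except Exception:
            # A crash in the new code must not affect callers during Shadow.
            on_mismatch(arg)
        return expected

    return run


def legacy_double(xs: list[int]) -> list[int]:
    return [x * 2 for x in xs]


def new_double(xs: list[int]) -> list[int]:
    return [x * 2 for x in reversed(xs)]  # different order, same multiset


mismatches: list = []
run = shadow(legacy_double, new_double, key=lambda v: v, on_mismatch=mismatches.append)
output = run([1, 2, 3])
```

Moving to Canary would then mean returning `actual` for a chosen subset of inputs, which this sketch deliberately does not do.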

Case Study: Tangled RAG God Script Migration

Legacy (Before: Realistic Ugliness)

Anonymized from internal: partials feed phases; hidden coupling; mutable backoff; shared mutables.

# real_tangled_rag.py (anonymized)
import csv, json, random, time, os  # Hidden deps
CONFIG = {"chunk_size": 256, "retry": 3, "backoff": [1,2,4]}  # Global mutable
SHARED_STATE = {"partials": [], "error_counts": {}, "cache": {}}  # Shared mutables across phases/modules
def tangled_rag(in_path: str, out_path: str, temp_dir: str) -> list[Chunk]:  # Coupled to OS via temp
    global CONFIG, SHARED_STATE
    chunks = []
    with open(in_path) as f:
        reader = csv.DictReader(f)
        for row in reader:  # Parsing + retry + partials intertwined
            backoff = CONFIG['backoff'][:]  # Mutable copy; modifies over retries
            for attempt in range(CONFIG['retry']):
                try:
                    doc = RawDoc(**row)
                    SHARED_STATE['cache'][doc.doc_id] = doc  # Mutate shared
                    break
                except KeyError as e:
                    SHARED_STATE['error_counts'][str(e)] = SHARED_STATE['error_counts'].get(str(e), 0) + 1
                    if SHARED_STATE['error_counts'][str(e)] > 5: break  # Cross-cut threshold
                    time.sleep(backoff[attempt % len(backoff)])  # Hidden time; mutates backoff?
                    backoff[attempt % len(backoff)] *= 1.5  # Mutate backoff state
            if 'cs.' not in row.get('categories', ''): continue  # Flag in loop
            cleaned = CleanDoc(...)  # ...
            SHARED_STATE['partials'].append(cleaned)  # Feed to next phase
            # Inner loop: chunk + check partials (coupling)
            temp_file = os.path.join(temp_dir, f"{cleaned.doc_id}.tmp")  # OS couple
            with open(temp_file, 'w') as tf:  # Interleaved IO
                offset = random.randint(0,10)
                for i in range(0, len(cleaned.abstract), CONFIG['chunk_size'] + offset):
                    chunk = Chunk(...)
                    if any(chunk.text in p.abstract for p in SHARED_STATE['partials']): continue  # Cross-phase check
                    tf.write(json.dumps(chunk.to_dict()) + '\n')
                    chunks.append(chunk)
            # Reuse partials in write phase
    with open(out_path, 'w') as f:
        for chunk in chunks:
            if chunk.doc_id in SHARED_STATE['cache']:  # Hidden coupling
                f.write(json.dumps(chunk.to_dict()) + '\n')
    return chunks

Iteration 1: Audit + Parameterize Deps (High # Globals/Fan-in)

Score: High globals (6+) + fan-in (partials/cache/error_counts). Micro-commit Delta (Tagged → Parameterized):

# Before (tagged excerpt)
global CONFIG, SHARED_STATE  # Impurity: global mutable
backoff = CONFIG['backoff'][:]  # Impurity: mutable copy
time.sleep(backoff[attempt % len(backoff)])  # Impurity: hidden time
backoff[attempt % len(backoff)] *= 1.5  # Impurity: mutate state
offset = random.randint(0,10)  # Impurity: hidden RNG

# After (parameterized)
def tangled_rag(..., config: dict | None = None, shared_state: dict | None = None, clock: Callable[[float], None] = time.sleep, rng: random.Random | None = None) -> list[Chunk]:
    config = CONFIG.copy() if config is None else config
    shared_state = {"partials": [], "error_counts": {}, "cache": {}} if shared_state is None else shared_state  # `is None` so a caller's empty dict is still used
    rng = random.Random(42) if rng is None else rng  # Built per call; an instance in the signature would be shared across calls
    backoff = config['backoff'][:]  # Still mutable; next iteration
    clock(backoff[attempt % len(backoff)])
    offset = rng.randint(0,10)
    # Preserve eq_pure(chunks, key=lambda c: (c.doc_id, c.text)); eq_error(classify=lambda e: (type(e), e.key))
Verify: Appendix patterns.
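
The payoff of parameterizing the clock and RNG is that tests can substitute deterministic doubles. The sketch below isolates just those two dependencies; `chunk_starts` and `pause` are hypothetical helpers that mirror the offset and sleep lines of the legacy loop, not functions from the codebase.

```python
import random
import time
from typing import Callable, Sequence


def chunk_starts(text_len: int, chunk_size: int, rng: random.Random) -> list[int]:
    """Start offsets with the legacy jitter, but the RNG is now injected."""
    offset = rng.randint(0, 10)
    return list(range(0, text_len, chunk_size + offset))


def pause(backoff: Sequence[float], attempt: int, sleep: Callable[[float], None] = time.sleep) -> None:
    """Sleep for the scheduled delay; tests pass a recording fake instead."""
    sleep(backoff[attempt % len(backoff)])


# Same seed gives the same offsets, so old and new code can be compared.
run_1 = chunk_starts(1000, 256, random.Random(42))
run_2 = chunk_starts(1000, 256, random.Random(42))

# A fake clock records requested delays without actually waiting.
slept: list[float] = []
for attempt in range(4):
    pause([1, 2, 4], attempt, sleep=slept.append)
```

With both doubles in place, the Retry family from the appendix (same input and seeds give the same output) becomes a plain equality assertion.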

Iteration 2: Normalize Errors + Extract Boundaries (High Exceptions/IO %)

Score: Exception density high; % IO ~30. Port: As protocol; adapter handles retries/backoff immutably. Verify: eq_error; metamorphic (retry idempotence).
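
One way an adapter might handle backoff immutably is to compute each delay from the attempt number instead of mutating a list. The sketch below compares a simulation of the legacy mutation (assuming every attempt fails, so every delay is used) against a pure `delay_for` function; both helper names are hypothetical.

```python
from typing import Sequence


def legacy_delays(base: Sequence[float], attempts: int) -> list[float]:
    """Simulates the legacy loop: sleep on slot i, then multiply that slot by 1.5."""
    backoff = list(base)
    delays = []
    for attempt in range(attempts):
        i = attempt % len(backoff)
        delays.append(backoff[i])
        backoff[i] *= 1.5
    return delays


def delay_for(base: Sequence[float], attempt: int, factor: float = 1.5) -> float:
    """Pure equivalent: slot i has been multiplied once per completed cycle."""
    n = len(base)
    return base[attempt % n] * factor ** (attempt // n)


BASE = (1, 2, 4)
mutable_schedule = legacy_delays(BASE, 9)
pure_schedule = [delay_for(BASE, a) for a in range(9)]
```

Since `delay_for` depends only on its arguments, retrying the same record twice produces the same schedule, which is the retry idempotence property the metamorphic check relies on.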

Loop: Mutations remain → Next pure.

Iteration 3: Extract Pure Islands (High Mutations/Complexity)

Extract cleaning/chunking; shared state → explicit dicts. Assume constructors are pure; if not, treat them as effect boundaries and port them first. Score: Mutation density high; complexity from loops. Collapse errors? No—adapters not idempotent. Verify: Metamorphic families (appendix).
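
A minimal sketch of what a pure island might look like after extraction. The `Chunk` fields, the slicing rule inside `chunk_text`, and the `bump` helper are assumptions for illustration, since the legacy constructor arguments are elided in the source.

```python
from dataclasses import dataclass
from types import MappingProxyType
from typing import Mapping


@dataclass(frozen=True)
class Chunk:  # hypothetical fields; the real constructor is elided in the legacy code
    doc_id: str
    text: str
    start: int


def chunk_text(doc_id: str, abstract: str, chunk_size: int, offset: int) -> tuple[Chunk, ...]:
    """Pure chunking: the jitter arrives as a value instead of a hidden RNG call."""
    step = chunk_size + offset
    return tuple(
        Chunk(doc_id, abstract[i:i + chunk_size], i)
        for i in range(0, len(abstract), step)
    )


def bump(counts: Mapping[str, int], key: str) -> dict[str, int]:
    """Explicit state: returns a new dict instead of mutating a shared one."""
    return {**counts, key: counts.get(key, 0) + 1}


chunks = chunk_text("d1", "abcdefghij", chunk_size=4, offset=1)

before = MappingProxyType({"KeyError": 1})  # read-only view guards against mutation
after = bump(before, "KeyError")
```

Both functions take everything they need as arguments and return fresh values, so they can be checked with the local laws and metamorphic families without mocks.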

Stop: Checks pass.


Verification Appendix (Patterns)

Multi-Layered Evidence

  • Equivalence: Per pass, using predicates + domain params.
  • Local Laws: Idempotence/associativity for new pure (Module 05/06).
  • Swappability: Mock adapter == real (eq_boundary).
  • Metamorphic: Derive via:
      • Invariant (e.g., order irrelevance, retry idempotence, chunk monotonicity, add-noise, scale).
      • Transform (permute, rerun with seed, vary size, add errors, scale input).
      • Assert: eq_pure/eq_error post-transform.

Canonical Families:

  • Permutation: For order-irrelevance.
  • Retry: Same input/seeds → same output (idempotence).
  • Monotonicity: Smaller chunk_size → more chunks.
  • Noise: Add irrelevant records → same core outputs.
  • Associativity: Split/recombine folds → same aggregate.

... (examples per iteration in case study)


6. Runtime Preservation Guarantee

Set system-specific budget (multi-dimensional: latency, throughput, peak RSS). Profile per pass; fuse if over (imperative behind pure); defer non-blocking regressions to Core 2.
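
A per-pass budget check might look like the sketch below. `Budget`, `profile`, and `violations` are hypothetical names, and `tracemalloc` only measures Python-level allocations, so it is a proxy for peak RSS and not a direct measurement of it.

```python
import time
import tracemalloc
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Budget:  # hypothetical two-dimensional budget; add throughput as needed
    max_seconds: float
    max_peak_bytes: int


def profile(fn: Callable[..., Any], *args: Any) -> tuple[Any, float, int]:
    """Run fn once, returning (result, elapsed seconds, peak traced bytes)."""
    tracemalloc.start()
    started = time.perf_counter()
    try:
        result = fn(*args)
        elapsed = time.perf_counter() - started
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, elapsed, peak


def violations(budget: Budget, elapsed: float, peak: int) -> list[str]:
    """Names of the budget dimensions that this pass exceeded."""
    over = []
    if elapsed > budget.max_seconds:
        over.append("latency")
    if peak > budget.max_peak_bytes:
        over.append("memory")
    return over


result, elapsed, peak = profile(sorted, [3, 1, 2])
within_budget = violations(Budget(max_seconds=60.0, max_peak_bytes=10**12), elapsed, peak)
```

A single run is noisy; in practice the same pass would be profiled several times on representative input before deciding to fuse.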


7. Anti-Patterns & Immediate Fixes

Anti-Pattern        | Symptom           | Fix
Vibes-based choices | Inefficient paths | Use heuristics/scores
Naïve predicates    | Pass regressions  | Parameterize + multisets
Early collapse      | Lose continuation | Gate on adapter idempotence
Synthetic examples  | Unrealistic prep  | Use anonymized real code

8. Pre-Core Quiz

  1. Choose paths by…? → Heuristics/scores
  2. Predicates parameterized by…? → Domain key/normalize
  3. Collapse errors when…? → Adapters idempotent
  4. Metamorphic derivation? → Invariant + transform + assert
  5. Sig evolution? → 3-phase schedule

9. Post-Core Exercise

  1. Audit real tangled function; score + run 1 iteration.
  2. Define predicates + metamorphic family.
  3. Iterate to stop; profile.

Pipeline Usage (Idiomatic)

result = perform_io(rag_composed(storage, in_path, out_path))

Next: Core 2: Performance Budgeting – When FP Wins, When to Vectorize, When to Drop to C/NumPy