M07C09: Sessions & Transactions as Explicit Data (Not Hidden Globals)¶
Module 07 – Main Track Core
Main track: Cores 1, 3–10 (Ports & Adapters + Capability Protocols → Production).
This is a required core. Every production FuncPipe system treats sessions and transactions as explicit data.
Progression Note¶
Module 7 takes the lawful containers and pipelines from Module 6 and puts all effects behind explicit boundaries.
| Module | Focus | Key Outcomes |
|---|---|---|
| 6 | Monadic Flows as Composable Pipelines | Lawful and_then, Reader/State/Writer patterns, error-typed flows |
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, resource-safe IO, idempotency |
| 8 | Async / Concurrent Pipelines | Backpressure, timeouts, resumability, fairness (built on 6–7) |
Core question
How do you model sessions and transactions as explicit immutable data and pure functions – eliminating hidden globals and enabling composable, testable, retry-safe effects?
What you now have after M07C01–M07C08 + this core
- Pure domain core
- Zero direct I/O in domain code
- All I/O behind swappable ports
- Effectful operations described as pure data (IOPlan)
- Typed capability protocols for every common effect
- Reliable resource cleanup
- Pure, composable logging via Writer
- Statically verified capability isolation
- Composable behaviours from small protocols
- Idempotent effects for safe retries
- Explicit sessions & transactions – no hidden globals, fully composable and testable
What the rest of Module 7 adds
- Incremental migration playbook
- Production story: CI, golden tests, shadow traffic
You are now eight steps away from a complete production-grade functional architecture.
1. Laws & Invariants (machine-checked where possible)¶
| Law / Invariant | Description | Enforcement |
|---|---|---|
| Explicit Threading | Every function that needs a session/transaction takes it as an explicit argument; when logical session state must change, the new value is returned explicitly (never mutated or stored globally; adapter-internal connection state is invisible to the core). | mypy --strict + review |
| Atomicity | Transactional operations are all-or-nothing; rollback on body error is best-effort, commit failure dominates on success. | Property tests |
| Idempotency Compatibility | Begin/commit/rollback are idempotent (safe to replay). | Hypothesis |
| No Hidden State | No module-level or global session/transaction objects exist. | CI import rules + review |
| Composability | Transactional behaviours compose via io_bind without leaking state. |
Equivalence tests |
These laws eliminate the most common source of Heisenbugs in effectful systems: hidden mutable session state.
2. Decision Table – Session/Transaction Pattern¶
| Scenario | Lifetime | Needs Atomicity | Recommended Pattern |
|---|---|---|---|
| Long-lived DB connection | Whole run | No | Session dataclass (immutable Mapping) |
| Short-lived atomic write batch | One op | Yes | Explicit Tx value + with_tx wrapper |
| Retryable transaction | Multiple | Yes | with_tx + idempotent commit/rollback (Core 8) |
| Nested transactions | Nested | Yes | Thread Tx explicitly (or savepoints via capability) |
Golden rule: Never use a global or module-level session/transaction. Always pass them explicitly.
3. Public API – Session & Transaction ADTs (src/funcpipe_rag/fp/effects/tx.py)¶
# src/funcpipe_rag/fp/effects/tx.py – mypy --strict clean
from __future__ import annotations
from dataclasses import dataclass, field, replace
from types import MappingProxyType
from typing import Callable, Mapping, Protocol, TypeVar
from funcpipe_rag.domain.effects.io_plan import IOPlan, io_bind, io_pure
from funcpipe_rag.result.types import Err, ErrInfo, Ok, Result
T = TypeVar("T")
@dataclass(frozen=True)
class Session:
"""Long-lived connection state – immutable, explicit."""
conn_id: str
config: Mapping[str, str] = field(default_factory=lambda: MappingProxyType({}))
def session_with(s: Session, **updates: str) -> Session:
"""Pure update – returns new session."""
return replace(s, config=MappingProxyType(dict(s.config) | updates))
class TxProtocol(Protocol):
"""Transactional capability.
Implementers MUST ensure begin/commit/rollback are idempotent w.r.t.
the underlying store so that retries and replays are safe.
"""
def begin(self, session: Session) -> IOPlan[Result["Tx", ErrInfo]]: ...
def commit(self, tx: "Tx") -> IOPlan[Result[None, ErrInfo]]: ...
def rollback(self, tx: "Tx") -> IOPlan[Result[None, ErrInfo]]: ...
@dataclass(frozen=True)
class Tx:
session: Session
tx_id: str
def with_tx(
tx_cap: TxProtocol,
session: Session,
body: Callable[[Tx], IOPlan[Result[T, ErrInfo]]],
) -> IOPlan[Result[T, ErrInfo]]:
"""Bracket a transactional body – pure description.
Semantics (as implemented in this repo):
- If begin() fails → return that Err.
- If body() is Ok and commit() fails → return commit Err (commit failure dominates).
- If body() is Err → attempt rollback as best-effort and preserve the body error.
"""
def after_begin(begin_res: Result[Tx, ErrInfo]) -> IOPlan[Result[T, ErrInfo]]:
if isinstance(begin_res, Err):
return io_pure(begin_res)
tx = begin_res.value
def after_body(body_res: Result[T, ErrInfo]) -> IOPlan[Result[T, ErrInfo]]:
if isinstance(body_res, Ok):
# success path – commit must succeed
return io_bind(
tx_cap.commit(tx),
lambda c_res: io_pure(body_res) if isinstance(c_res, Ok) else io_pure(Err(c_res.error)),
)
# failure path – rollback best-effort; preserve the body error
return io_bind(tx_cap.rollback(tx), lambda _: io_pure(body_res))
return io_bind(body(tx), after_body)
return io_bind(tx_cap.begin(session), after_begin)
Tests for this contract live in tests/unit/domain/test_session.py.
4. Reference Implementations¶
4.1 Bad Example – Hidden Global Session¶
# anti_pattern/global_session.py
_global_session = None
def init():
global _global_session
_global_session = connect_db() # hidden side effect
def some_core_op():
# no argument – reaches into global
cur = _global_session.cursor()
cur.execute("...")
4.2 Good Refactor – Explicit Session + Transaction Capability¶
# Illustrative use-case sketch (not a repo file): a Tx-specific storage capability.
from typing import Iterator, Protocol
from funcpipe_rag.core.rag_types import Chunk
from funcpipe_rag.domain.effects.io_plan import IOPlan, io_delay, perform
from funcpipe_rag.domain.effects.tx import Session, Tx, TxProtocol, with_tx
from funcpipe_rag.result.types import ErrInfo, Result
class StorageTxCap(Protocol):
def write_chunks_in_tx(
self, path: str, chunks: Iterator[Chunk], tx: Tx
) -> Result[None, ErrInfo]:
...
def rag_with_transaction(
tx_cap: TxProtocol,
storage_cap: StorageTxCap,
session: Session,
chunks: Iterator[Chunk],
) -> IOPlan[Result[None, ErrInfo]]:
"""All dependencies explicit – no hidden globals."""
def body(tx: Tx) -> IOPlan[Result[None, ErrInfo]]:
# all writes inside tx
return io_delay(lambda: storage_cap.write_chunks_in_tx("out.jsonl", chunks, tx))
return with_tx(tx_cap, session, body)
Shell injects real capability:
session = Session("conn-123")
plan = rag_with_transaction(PostgresTxCap(), PostgresStorageTxCap(), session, chunks)
result = perform(plan)
4.3 Before → After¶
# Before – global session, manual tx handling
global_session = connect()
tx = global_session.begin()
try:
op1()
op2()
tx.commit()
except:
tx.rollback()
raise
# After – explicit, composable, safe
session = Session("conn")
plan = with_tx(tx_cap, session, lambda tx: op1(tx).bind(lambda _: op2(tx)))
result = perform(plan)
5. Property-Based Proofs (selected)¶
# tests/test_session_tx.py – assumes imports/fixtures from core library
@given(updates=st.dictionaries(st.text(), st.text()))
def test_session_threading(updates):
s1 = Session("conn")
s2 = session_with(s1, **updates)
assert dict(s2.config) == dict(updates)
assert dict(s1.config) == {} # original unchanged
@given()
def test_tx_atomicity():
def failing_body(tx: Tx) -> IOPlan[Result[None, ErrInfo]]:
return io_pure(Err(ErrInfo("FAIL", "boom")))
plan = with_tx(mock_cap, Session("conn"), failing_body)
result = perform(plan)
assert isinstance(result, Err)
assert mock_cap.rollback.called # rollback executed
assert not mock_cap.commit.called
6. Big-O & Allocation Guarantees¶
| Operation | Time | Call-stack | Heap | Allocation |
|---|---|---|---|---|
| Session update | O(n) | O(1) | O(n) | O(n) new mapping |
with_tx bracketing |
O(steps) | O(steps) | O(1) | O(1) |
Linear in transaction steps and session config size; no hidden costs.
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Global session | Hidden state, race conditions | Explicit Session dataclass |
| Manual try/except tx | Forgotten rollback, leaks | with_tx higher-order wrapper |
| Implicit transaction | Partial commits on crash | Explicit Tx value + atomic capability |
| Thread-local storage | Testing hell | Pure threading via arguments |
8. Pre-Core Quiz¶
- Sessions are…? → Explicit immutable dataclasses
- Transactions are…? → Explicit values threaded through functions
- Never use…? → Global or thread-local session objects
- Atomicity via…? →
with_txwrapper + capability - Real power comes from…? → Composable, testable, retry-safe transactions
9. Post-Core Exercise¶
- Model a Redis session as an explicit
Sessiondataclass. - Write a
with_cache_txwrapper that begins/commits a Redis multi/exec block. - Refactor one real pipeline stage to take an explicit
Txinstead of using a global connection. - Add a property test proving rollback is called on body failure.
Next → M07C10: Incremental Migration – Pushing Effects Outward in Existing Codebases
You now model sessions and transactions as explicit data – eliminating hidden globals forever. Combined with everything before, your system can now handle complex stateful effects while remaining pure, composable, and testable. The final core shows how to migrate real codebases to this architecture.