Resource-Aware Streams

M04C08: Resource-Aware Streaming – Ensuring Generators Close and Clean Up Correctly

Progression Note

By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.

Here's a snippet from the progression map:

| Module | Focus | Key Outcomes |
|--------|-------|--------------|
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |

Core question:
How do you guarantee that every resource-holding generator (files, network connections, GPU contexts) is properly closed on normal completion, consumer exceptions, producer exceptions, or early termination from breakers — all while keeping the pipeline pure, lazy, and composable?

We now take the Iterator[Result[Chunk, ErrInfo | BreakInfo[ErrInfo]]] stream from M04C07 and face the final real-world reliability question:

“My embedding stream opens a single persistent HTTP connection (or GPU context) for the whole run. When a breaker fires after 10 000 chunks, the connection is never closed — I leak sockets and GPU memory.”

The naïve solution is manual try/finally around the whole pipeline:

def embed_stream(chunks):
    conn = open_connection()
    try:
        for chunk in chunks:
            yield embed_via_connection(conn, chunk)
    finally:
        conn.close()  # runs only on exhaustion, close(), or garbage collection

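This works when the stream is consumed to the end. On an early break or a consumer exception, however, the finally block runs only when the generator is explicitly closed or garbage-collected, and nothing in the pipeline guarantees either. The pattern also becomes harder to get right once you parallelise.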

The production solution uses tiny, composable resource managers that guarantee cleanup on every possible exit path — including when Core 7 breakers fire early.

Audience: Engineers who open any long-lived resource inside a generator and refuse to leak sockets/memory on errors or aborts.

Outcome:
1. You will wrap any resource-holding generator with automatic cleanup that works on all exit paths.
2. You will compose nested resources safely and prove closure via Hypothesis.
3. You will ship a RAG pipeline that never leaks resources — even when breakers abort early.

We formalise exactly what we want from correct, production-ready resource management: cleanup on all paths, scoped effects, laziness (no premature iteration), and compatibility with breakers.


Concrete Motivating Example

Same 100 000 chunk tree from previous cores, but now embedding uses a single persistent connection:

def embed_via_connection_stream(chunks_with_path):
    conn = http_pool.acquire()          # long-lived connection
    try:
        for chunk, path in chunks_with_path:
            yield safe_remote_embed(conn, chunk, path)
    finally:
        http_pool.release(conn)         # must run even on early break!

If a breaker fires after 10 000 chunks, the finally block is not guaranteed to run promptly in a multi-stage pipeline → the connection can remain open until the generator is explicitly closed or garbage-collected.
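
The delay is easy to reproduce in isolation. The sketch below is an illustration only: `FakePool` and `embed_stream` are hypothetical stand-ins for `http_pool` and the embedding generator. As long as a reference to the suspended generator survives, its finally block does not run until someone calls close().

```python
class FakePool:
    """Hypothetical stand-in for http_pool: counts connections currently held."""

    def __init__(self) -> None:
        self.open = 0

    def acquire(self) -> str:
        self.open += 1
        return "conn"

    def release(self, conn: str) -> None:
        self.open -= 1


pool = FakePool()


def embed_stream(chunks):
    conn = pool.acquire()
    try:
        for chunk in chunks:
            yield f"{conn}:{chunk}"
    finally:
        pool.release(conn)


stream = embed_stream(range(100_000))
for item in stream:
    break                        # early exit, e.g. a breaker fired

leaked_after_break = pool.open   # generator is only suspended; finally has not run
stream.close()                   # raises GeneratorExit at the paused yield
open_after_close = pool.open
```

Here `leaked_after_break` is 1 and `open_after_close` is 0: the connection stays checked out until the generator is closed by its owner.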

Desired behaviour:

with managed_stream(lambda: embed_via_connection_stream(chunks_with_path)) as safe_stream:
    for r in circuit_breaker_rate_emit(safe_stream, max_rate=0.2):
        if isinstance(r, Err) and isinstance(r.error, BreakInfo):
            report_circuit_break(r.error)
            break
        process(r)
# → Connection always released, even on early break

1. Laws & Invariants (machine-checked)

| Law | Formal Statement | Enforcement |
|-----|------------------|-------------|
| Cleanup on All Paths | Resource is closed on normal exhaustion, consumer exception, producer exception, and early breaker termination. | test_cleanup_normal, test_cleanup_on_consumer_exception, test_cleanup_on_producer_exception, test_cleanup_on_partial_iteration, test_cleanup_on_break |
| Scoped Effects | No side effects outside managed enter/exit; wrapper is pure except for the managed resource. | Reproducibility + no global mutation |
| Laziness | Entering the manager does not advance the iterator; resource creation semantics match direct use. | test_manager_lazy_entry |
| Composition (LIFO) | Nested managers close in reverse order (LIFO) on any exit path. | test_nested_manager_lifo |
| Equivalence | Wrapped stream yields identical values to unwrapped (except cleanup). | test_managed_equivalence |

These laws guarantee zero resource leaks in real pipelines.


2. Decision Table – Which Resource Wrapper Do You Actually Use?

| Resource Type | Needs Factory? | Nested? | Recommended Wrapper |
|---------------|----------------|---------|---------------------|
| Simple generator with .close() | No | No | with_resource_stream |
| Generator from factory | Yes | No | managed_stream |
| Multiple resources | Yes | Yes | nested_managed |
| Arbitrary closable object | | | auto_close |

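Always wrap resource-holding generators.
Never rely on a bare try/finally inside a generator as the only safeguard in pipeline code: it runs only once the generator is exhausted, closed, or garbage-collected. Use these wrappers so that something always owns the close() call.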


3. Public API Surface (end-of-Module-04 refactor note)

Refactor note: resource wrappers live in funcpipe_rag.policies.resources (src/funcpipe_rag/policies/resources.py) and are re-exported from funcpipe_rag.api.core.

from funcpipe_rag.api.core import auto_close, managed_stream, nested_managed, with_resource_stream

4. Reference Implementations

4.1 with_resource_stream – Auto-Close Existing Generator

import contextlib
from types import TracebackType
from contextlib import AbstractContextManager
from typing import Any, Callable, ContextManager, Generic, Iterator, Sequence, TypeVar

R = TypeVar("R")

class _ResourceStream(Generic[R], AbstractContextManager[Iterator[R]]):
    def __init__(self, gen: Iterator[R]) -> None:
        self._gen = gen

    def __enter__(self) -> Iterator[R]:
        return self._gen

    def __exit__(self,
                 exc_type: type[BaseException] | None,
                 exc: BaseException | None,
                 tb: TracebackType | None) -> None:

        close = getattr(self._gen, "close", None)
        if callable(close):
            try:
                close()
            except Exception:
                pass  # swallow to never mask original exception
        return None

def with_resource_stream(gen: Iterator[R]) -> ContextManager[Iterator[R]]:
    return _ResourceStream(gen)
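
For generators, the standard library's contextlib.closing gives the same basic guarantee (it calls .close() on exit), though without the error swallowing shown above. A short sketch with a toy generator, `numbers` (a hypothetical name used only for illustration), shows the effect on an early exit:

```python
import contextlib

events: list[str] = []


def numbers():
    try:
        yield from range(1000)
    finally:
        events.append("closed")   # runs when the generator is closed


with contextlib.closing(numbers()) as it:
    first_three = [next(it) for _ in range(3)]
    # Leaving the block calls it.close(), even though 997 items remain.
```

After the block, `events` contains a single "closed" entry. with_resource_stream follows the same pattern and additionally tolerates iterators that have no close() method.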

4.2 managed_stream – Factory-Based Resource

class _ManagedStream(Generic[R], AbstractContextManager[Iterator[R]]):
    def __init__(self, factory: Callable[[], Iterator[R]]) -> None:
        self._factory = factory
        self._gen: Iterator[R] | None = None

    def __enter__(self) -> Iterator[R]:
        self._gen = self._factory()
        return self._gen

    def __exit__(self,
                 exc_type: type[BaseException] | None,
                 exc: BaseException | None,
                 tb: TracebackType | None) -> None:
        if self._gen is not None:
            close = getattr(self._gen, "close", None)
            if callable(close):
                try:
                    close()
                except Exception:
                    pass
        return None

def managed_stream(factory: Callable[[], Iterator[R]]) -> ContextManager[Iterator[R]]:
    return _ManagedStream(factory)
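
The point of the factory is timing: nothing is built until the manager is entered, and the resource itself is not touched until the first item is pulled. The sketch below is not the library code. It re-creates the pattern with contextlib.contextmanager under the hypothetical name `managed` and logs each step to make the order visible.

```python
import contextlib
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")
log: list[str] = []


@contextlib.contextmanager
def managed(factory: Callable[[], Iterator[T]]) -> Iterator[Iterator[T]]:
    """Sketch of the factory pattern: build the stream on entry, close it on exit."""
    stream = factory()
    try:
        yield stream
    finally:
        close = getattr(stream, "close", None)
        if callable(close):
            close()


def factory() -> Iterator[int]:
    log.append("factory called")        # runs when the manager is entered

    def gen() -> Iterator[int]:
        log.append("resource opened")   # runs on the first next()
        try:
            yield from (1, 2, 3)
        finally:
            log.append("resource closed")

    return gen()


mgr = managed(factory)
log.append("manager built")
with mgr as stream:
    log.append("entered")
    first = next(stream)
```

The resulting log reads: manager built, factory called, entered, resource opened, resource closed.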

4.3 nested_managed – Compose Multiple Managers

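def nested_managed(managers: Sequence[ContextManager[Any]]) -> ContextManager[tuple[Any, ...]]:
    class _Nested(AbstractContextManager[tuple[Any, ...]]):
        def __init__(self, managers: Sequence[ContextManager[Any]]) -> None:
            self._managers = managers
            self._stack: contextlib.ExitStack | None = None

        def __enter__(self) -> tuple[Any, ...]:
            # Enter on a temporary stack: if a later manager fails to enter,
            # the managers already entered are unwound before the error propagates.
            with contextlib.ExitStack() as stack:
                values = tuple(stack.enter_context(m) for m in self._managers)
                self._stack = stack.pop_all()  # success: keep them open
            return values

        def __exit__(self,
                     exc_type: type[BaseException] | None,
                     exc: BaseException | None,
                     tb: TracebackType | None) -> bool | None:
            if self._stack is None:
                return None
            # Forward the exception details so inner managers see the real exit reason.
            return self._stack.__exit__(exc_type, exc, tb)

    return _Nested(managers)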
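
nested_managed leans on contextlib.ExitStack for its ordering guarantee. The sketch below exercises ExitStack directly, using a hypothetical `tracked` manager, to show the two cases that matter most: the body raising, and a later manager failing to open.

```python
import contextlib

order: list[str] = []


@contextlib.contextmanager
def tracked(name: str, fail_on_enter: bool = False):
    """Hypothetical manager that records when it is entered and exited."""
    if fail_on_enter:
        raise RuntimeError(f"{name} failed to open")
    order.append(f"enter {name}")
    try:
        yield name
    finally:
        order.append(f"exit {name}")


# Case 1: the body raises; managers still unwind in reverse order.
try:
    with contextlib.ExitStack() as stack:
        for name in ("a", "b", "c"):
            stack.enter_context(tracked(name))
        raise ValueError("consumer failed")
except ValueError:
    pass
body_failure_order = list(order)

# Case 2: the third manager cannot be entered; the first two are released.
order.clear()
try:
    with contextlib.ExitStack() as stack:
        stack.enter_context(tracked("a"))
        stack.enter_context(tracked("b"))
        stack.enter_context(tracked("c", fail_on_enter=True))
except RuntimeError:
    pass
partial_enter_order = list(order)
```

In both cases the exits are recorded in reverse order of entry, which is the LIFO law from section 1.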

4.4 auto_close – Universal Closable Wrapper

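def auto_close(obj: Any) -> ContextManager[Any]:
    """Close obj on exit if it only offers .close().

    Objects that are already context managers are passed through untouched:
    they are neither entered nor closed here, so their own `with` block stays
    responsible for them. Anything else is a no-op.
    """
    if hasattr(obj, "__enter__") and hasattr(obj, "__exit__"):
        return contextlib.nullcontext(obj)  # pass through; the object's own protocol stays in control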
    if hasattr(obj, "close"):
        return contextlib.closing(obj)
    return contextlib.nullcontext(obj)
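
The three branches behave quite differently, which is easy to miss. The sketch below repeats the auto_close definition so that it runs on its own; `Client` and `Session` are hypothetical classes invented for the illustration.

```python
import contextlib
from typing import Any, ContextManager


def auto_close(obj: Any) -> ContextManager[Any]:
    # Copy of the definition above so the sketch is self-contained.
    if hasattr(obj, "__enter__") and hasattr(obj, "__exit__"):
        return contextlib.nullcontext(obj)
    if hasattr(obj, "close"):
        return contextlib.closing(obj)
    return contextlib.nullcontext(obj)


class Client:
    """Hypothetical closable object without context-manager support."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class Session(Client):
    """Hypothetical object that is already a context manager."""

    def __enter__(self) -> "Session":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()


client, session = Client(), Session()

with auto_close(client) as c:
    pass    # closing() calls client.close() on exit

with auto_close(session) as s:
    pass    # passed through: auto_close does not close it

with auto_close(42) as n:
    pass    # nothing to close
```

After these blocks `client.closed` is True while `session.closed` is still False. An object that is already a context manager should be used in its own with statement if it needs cleanup.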

4.5 Idiomatic RAG Usage with Breakers

def embed_via_connection_stream(chunks_with_path):
    conn = http_pool.acquire()          # long-lived connection
    try:
        for chunk, path in chunks_with_path:
            yield safe_remote_embed(conn, chunk, path)
    finally:
        http_pool.release(conn)         # must run even on early break!

with managed_stream(lambda: embed_via_connection_stream(chunks_with_path)) as safe_stream:
    for r in circuit_breaker_rate_emit(safe_stream, max_rate=0.2):
        if isinstance(r, Err) and isinstance(r.error, BreakInfo):
            report_circuit_break(r.error)
            break
        process(r)
# → Connection always released, even on early break
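
The same shape can be tried without the real pipeline. In the sketch below every name is a stand-in: `source` plays the connection-holding generator, `toy_breaker` is a hypothetical simplification of a Core 7 breaker, and contextlib.closing takes the place of managed_stream. It shows that the resource is released when the with block ends, even though the consumer abandoned the breaker stage part-way through.

```python
import contextlib

released: list[str] = []


def source(n):
    conn = "conn-1"                      # pretend this is an acquired connection
    try:
        for i in range(n):
            yield i
    finally:
        released.append(conn)


def toy_breaker(stream, limit):
    """Hypothetical breaker: emits a sentinel once `limit` items have passed."""
    for count, item in enumerate(stream):
        if count == limit:
            yield "BREAK"
            return
        yield item


seen = []
with contextlib.closing(source(100_000)) as safe_stream:
    for r in toy_breaker(safe_stream, limit=3):
        if r == "BREAK":
            break
        seen.append(r)
    released_inside = list(released)     # source is still only suspended here
released_after = list(released)
```

The wrapper is applied to the innermost, resource-owning generator, so it does not matter how many lazy stages sit between it and the consumer.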

5. Property-Based Proofs (tests/test_resources.py)

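import contextlib

import pytest
from hypothesis import given, strategies as st

from funcpipe_rag.api.core import managed_stream, nested_managed, with_resource_stream
# Ok, Err and short_circuit_on_err_truncate are the Result helpers from the earlier cores.

def test_cleanup_normal():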
    closed = False
    def gen():
        nonlocal closed
        try:
            yield 1
            yield 2
        finally:
            closed = True
    with with_resource_stream(gen()) as it:
        list(it)
    assert closed

def test_cleanup_on_consumer_exception():
    closed = False
    def gen():
        nonlocal closed
        try:
            yield 1
            yield 2
        finally:
            closed = True
    with with_resource_stream(gen()) as it:
        with pytest.raises(ValueError):
            for x in it:
                if x == 2:
                    raise ValueError("boom")
    assert closed

def test_cleanup_on_partial_iteration():
    closed = False
    def gen():
        nonlocal closed
        try:
            yield from range(1000)
        finally:
            closed = True
    with with_resource_stream(gen()) as it:
        for _ in range(10):
            next(it)
    assert closed

def test_cleanup_on_producer_exception():
    closed = False
    def gen():
        nonlocal closed
        try:
            yield 1
            raise ValueError("producer fail")
        finally:
            closed = True
    with with_resource_stream(gen()) as it:
        with pytest.raises(ValueError):
            list(it)
    assert closed

def test_manager_lazy_entry():
    entered = False
    def factory():
        nonlocal entered
        entered = True
        yield 42
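    mgr = managed_stream(factory)
    assert not entered              # building the manager calls nothing
    with mgr as it:
        assert not entered          # entering creates the generator but does not advance it
        assert next(it) == 42
        assert entered              # the body ran only when the first item was pulled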

@given(items=st.lists(st.integers()))
def test_cleanup_on_break(items):
    closed = False
    def src():
        nonlocal closed
        try:
            for x in items:
                yield Ok(x) if x != 0 else Err("ZERO")
        finally:
            closed = True
    with with_resource_stream(src()) as s:
        list(short_circuit_on_err_truncate(s))
    assert closed

def test_nested_manager_lifo():
    order = []
    def m1():
        order.append("enter1")
        yield "a"
        order.append("exit1")
    def m2():
        order.append("enter2")
        yield "b"
        order.append("exit2")
    with nested_managed([contextlib.contextmanager(m1), contextlib.contextmanager(m2)]) as (a, b):
        pass
    assert order == ["enter1", "enter2", "exit2", "exit1"]

def test_managed_equivalence():
    def factory():
        yield from range(10)
    with managed_stream(factory) as it:
        assert list(it) == list(range(10))

6. Big-O & Allocation Guarantees

| Variant | Time | Heap | Laziness |
|---------|------|------|----------|
| with_resource_stream | O(N) | O(1) | Yes |
| managed_stream | O(N) | O(1) | Yes |
| nested_managed | O(N) | O(#managers) | Yes |
| auto_close | O(1) | O(1) | Yes |

Constant overhead; cleanup guaranteed on all paths.


7. Anti-Patterns & Immediate Fixes

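| Anti-Pattern | Symptom | Fix |
|--------------|---------|-----|
| try/finally inside a generator with nothing calling close() | Cleanup deferred until garbage collection after an early break | Wrap the generator with with_resource_stream |
| Bare generators with resources | Leaks on exceptions | Use managed_stream for factories |
| Nested manual cleanup | Complex/error-prone | Use nested_managed |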

8. Pre-Core Quiz

  1. with_resource_stream for…? → Auto-close existing generator
  2. managed_stream for…? → Factory-created resource streams
  3. nested_managed for…? → Compose multiple context managers
  4. auto_close for…? → Any object with .close()
  5. Cleanup guaranteed on…? → All exit paths including breakers

9. Post-Core Exercise

  1. Wrap a file-reading generator with with_resource_stream → test cleanup on partial iteration.
  2. Use managed_stream for temporary files → test on early breaker.
  3. Compose three nested resources → verify LIFO closure order.
  4. Add auto_close to your embedder → verify no leaks on OOM.

Next: M04C09 – Functional Retries with Policies (Pure, Composable Retry Loops).

You now have the complete toolkit to never leak a resource again — even when everything goes wrong. The rest of Module 4 is about retries and final reporting.