Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/infra/process.py: 91%
53 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
1# SPDX-License-Identifier: MIT
2# Copyright © 2025 Bijan Mousavi
4"""Provides a process pool service for executing external commands.
6This module defines the `ProcessPool` class, a concrete implementation of the
7`ProcessPoolProtocol`. It is designed to run shell commands in isolated
8subprocesses using a managed pool of workers. Key features include command
9validation to prevent shell injection and an in-memory LRU cache to return
10results for repeated commands without re-execution.
11"""
13from __future__ import annotations
15from collections import OrderedDict
16from concurrent.futures import ProcessPoolExecutor
17import os
18import subprocess # nosec B404
19from typing import Any
21from injector import inject
23from bijux_cli.contracts import (
24 ObservabilityProtocol,
25 ProcessPoolProtocol,
26 TelemetryProtocol,
27)
28from bijux_cli.core.exceptions import BijuxError
31class ProcessPool(ProcessPoolProtocol):
32 """Executes validated commands in a worker pool with an LRU cache.
34 This class manages a `ProcessPoolExecutor` to run commands in separate
35 processes. It maintains a cache of recent command results to avoid
36 unnecessary re-execution.
38 Attributes:
39 _MAX_CACHE (int): The maximum number of command results to store in the
40 LRU cache.
41 _exec (ProcessPoolExecutor): The underlying executor for running tasks.
42 _log (ObservabilityProtocol): The logging service.
43 _tel (TelemetryProtocol): The telemetry service.
44 _cache (OrderedDict): An LRU cache storing tuples of command arguments
45 to their results `(returncode, stdout, stderr)`.
46 """
48 _MAX_CACHE = 1000
50 @inject
51 def __init__(
52 self,
53 observability: ObservabilityProtocol,
54 telemetry: TelemetryProtocol,
55 max_workers: int = 4,
56 ) -> None:
57 """Initializes the `ProcessPool` service.
59 Args:
60 observability (ObservabilityProtocol): The service for logging.
61 telemetry (TelemetryProtocol): The service for event tracking.
62 max_workers (int): The maximum number of worker processes. This can
63 be overridden by the `BIJUXCLI_MAX_WORKERS` environment variable.
64 """
65 max_workers = int(os.getenv("BIJUXCLI_MAX_WORKERS", str(max_workers)))
66 self._exec = ProcessPoolExecutor(max_workers=max_workers)
67 self._log = observability
68 self._tel = telemetry
69 self._cache: OrderedDict[tuple[str, ...], tuple[int, bytes, bytes]] = (
70 OrderedDict()
71 )
73 def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
74 """Executes a command in the process pool, using a cache.
76 The command is first looked up in the LRU cache. If not found, it is
77 validated and then executed in a subprocess. The result is then stored
78 in the cache.
80 Args:
81 cmd (list[str]): The command and its arguments to execute.
82 executor (str): The name of the entity requesting the execution,
83 used for telemetry.
85 Returns:
86 tuple[int, bytes, bytes]: A tuple containing the command's return
87 code, standard output, and standard error.
89 Raises:
90 BijuxError: If command validation fails or if an unexpected error
91 occurs during subprocess execution.
92 """
93 from bijux_cli.services.utils import validate_command
95 key = tuple(cmd)
96 if key in self._cache:
97 self._log.log("debug", "Process-pool cache hit", extra={"cmd": cmd})
98 self._tel.event("procpool_cache_hit", {"cmd": cmd, "executor": executor})
99 self._cache.move_to_end(key)
100 return self._cache[key]
102 try:
103 safe_cmd = validate_command(cmd)
104 self._log.log("info", "Process-pool executing", extra={"cmd": safe_cmd})
105 self._tel.event("procpool_execute", {"cmd": safe_cmd, "executor": executor})
107 result = subprocess.run( # noqa: S603 # nosec B603
108 safe_cmd,
109 capture_output=True,
110 check=False,
111 shell=False,
112 )
114 self._cache[key] = (result.returncode, result.stdout, result.stderr)
115 self._cache.move_to_end(key)
116 if len(self._cache) > self._MAX_CACHE:
117 self._cache.popitem(last=False)
119 self._tel.event(
120 "procpool_executed",
121 {
122 "cmd": safe_cmd,
123 "executor": executor,
124 "returncode": result.returncode,
125 },
126 )
127 return result.returncode, result.stdout, result.stderr
129 except BijuxError:
130 self._tel.event(
131 "procpool_execution_failed",
132 {"cmd": cmd, "executor": executor, "error": "validation"},
133 )
134 raise
135 except Exception as exc:
136 self._tel.event(
137 "procpool_execution_failed",
138 {"cmd": cmd, "executor": executor, "error": str(exc)},
139 )
140 raise BijuxError(f"Process-pool execution failed: {exc}") from exc
142 def shutdown(self) -> None:
143 """Gracefully shuts down the worker process pool."""
144 self._exec.shutdown(wait=True)
145 self._tel.event("procpool_shutdown", {})
146 self._log.log("debug", "Process-pool shutdown")
148 def get_status(self) -> dict[str, Any]:
149 """Returns the current status of the process pool.
151 Returns:
152 dict[str, Any]: A dictionary containing status information, such as
153 the number of commands processed (and cached).
154 """
155 return {"commands_processed": len(self._cache)}
158def get_process_pool(
159 logger: ObservabilityProtocol, telemetry: TelemetryProtocol
160) -> ProcessPoolProtocol:
161 """A factory function for creating a `ProcessPool` instance.
163 This helper respects the `BIJUXCLI_MAX_WORKERS` environment variable to
164 configure the number of worker processes.
166 Args:
167 logger (ObservabilityProtocol): The logging service instance.
168 telemetry (TelemetryProtocol): The telemetry service instance.
170 Returns:
171 ProcessPoolProtocol: A configured instance of the process pool service.
172 """
173 workers = int(os.getenv("BIJUXCLI_MAX_WORKERS", "4"))
174 return ProcessPool(logger, telemetry, max_workers=workers)
177__all__ = ["get_process_pool", "ProcessPool"]