Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/infra/process.py: 91%

53 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-19 23:36 +0000

1# SPDX-License-Identifier: MIT 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Provides a process pool service for executing external commands. 

5 

6This module defines the `ProcessPool` class, a concrete implementation of the 

7`ProcessPoolProtocol`. It is designed to run shell commands in isolated 

8subprocesses using a managed pool of workers. Key features include command 

9validation to prevent shell injection and an in-memory LRU cache to return 

10results for repeated commands without re-execution. 

11""" 

12 

13from __future__ import annotations 

14 

15from collections import OrderedDict 

16from concurrent.futures import ProcessPoolExecutor 

17import os 

18import subprocess # nosec B404 

19from typing import Any 

20 

21from injector import inject 

22 

23from bijux_cli.contracts import ( 

24 ObservabilityProtocol, 

25 ProcessPoolProtocol, 

26 TelemetryProtocol, 

27) 

28from bijux_cli.core.exceptions import BijuxError 

29 

30 

class ProcessPool(ProcessPoolProtocol):
    """Runs validated commands and memoizes their results in an LRU cache.

    A `ProcessPoolExecutor` is created at construction time and released in
    `shutdown`. Note that `run` itself invokes `subprocess.run` directly in
    the calling process; it does not submit work to the executor.

    Attributes:
        _MAX_CACHE (int): Upper bound on the number of cached command results.
        _exec (ProcessPoolExecutor): The executor owned by this service.
        _log (ObservabilityProtocol): The logging service.
        _tel (TelemetryProtocol): The telemetry service.
        _cache (OrderedDict): Maps a command (as a tuple of its arguments) to
            its `(returncode, stdout, stderr)` result, ordered from least to
            most recently used.
    """

    _MAX_CACHE = 1000

    @inject
    def __init__(
        self,
        observability: ObservabilityProtocol,
        telemetry: TelemetryProtocol,
        max_workers: int = 4,
    ) -> None:
        """Creates the executor and an empty result cache.

        Args:
            observability (ObservabilityProtocol): The service for logging.
            telemetry (TelemetryProtocol): The service for event tracking.
            max_workers (int): Worker count used when the
                `BIJUXCLI_MAX_WORKERS` environment variable is not set. When
                the variable is set, its value takes precedence.

        Raises:
            ValueError: If `BIJUXCLI_MAX_WORKERS` cannot be parsed by `int()`.
        """
        # The environment variable wins; the argument only supplies the fallback.
        configured = os.getenv("BIJUXCLI_MAX_WORKERS", str(max_workers))
        worker_count = int(configured)
        self._exec = ProcessPoolExecutor(max_workers=worker_count)
        self._log = observability
        self._tel = telemetry
        self._cache: OrderedDict[tuple[str, ...], tuple[int, bytes, bytes]] = (
            OrderedDict()
        )

    def _remember(
        self, cache_key: tuple[str, ...], outcome: tuple[int, bytes, bytes]
    ) -> None:
        """Stores a result as most recently used, evicting the oldest if full."""
        self._cache[cache_key] = outcome
        self._cache.move_to_end(cache_key)
        if len(self._cache) > self._MAX_CACHE:
            # last=False pops from the least-recently-used end.
            self._cache.popitem(last=False)

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Returns the result of a command, executing it only on a cache miss.

        On a cache hit the stored result is returned without validation or
        execution. On a miss the command is validated, run with
        `subprocess.run` (no shell, output captured), and the result is
        cached regardless of its return code.

        Args:
            cmd (list[str]): The command and its arguments to execute.
            executor (str): The name of the entity requesting the execution,
                included in telemetry events.

        Returns:
            tuple[int, bytes, bytes]: The command's return code, standard
                output, and standard error.

        Raises:
            BijuxError: If `validate_command` raises it, or wrapping any other
                exception raised while validating, executing, or caching.
        """
        # Imported at call time, as in the original implementation.
        from bijux_cli.services.utils import validate_command

        cache_key = tuple(cmd)
        cached = self._cache.get(cache_key)
        if cached is not None:
            self._log.log("debug", "Process-pool cache hit", extra={"cmd": cmd})
            self._tel.event("procpool_cache_hit", {"cmd": cmd, "executor": executor})
            # Refresh recency so this entry is evicted last.
            self._cache.move_to_end(cache_key)
            return cached

        try:
            safe_cmd = validate_command(cmd)
            self._log.log("info", "Process-pool executing", extra={"cmd": safe_cmd})
            self._tel.event("procpool_execute", {"cmd": safe_cmd, "executor": executor})

            completed = subprocess.run(  # noqa: S603 # nosec B603
                safe_cmd,
                capture_output=True,
                check=False,
                shell=False,
            )
            outcome = (completed.returncode, completed.stdout, completed.stderr)
            self._remember(cache_key, outcome)

            self._tel.event(
                "procpool_executed",
                {
                    "cmd": safe_cmd,
                    "executor": executor,
                    "returncode": completed.returncode,
                },
            )
            return outcome

        except BijuxError:
            # Reported to telemetry as a validation failure, then re-raised as-is.
            self._tel.event(
                "procpool_execution_failed",
                {"cmd": cmd, "executor": executor, "error": "validation"},
            )
            raise
        except Exception as err:
            self._tel.event(
                "procpool_execution_failed",
                {"cmd": cmd, "executor": executor, "error": str(err)},
            )
            raise BijuxError(f"Process-pool execution failed: {err}") from err

    def shutdown(self) -> None:
        """Shuts down the executor, waiting for it, then reports the shutdown."""
        self._exec.shutdown(wait=True)
        self._tel.event("procpool_shutdown", {})
        self._log.log("debug", "Process-pool shutdown")

    def get_status(self) -> dict[str, Any]:
        """Returns status information about the pool.

        Returns:
            dict[str, Any]: A dictionary whose `commands_processed` entry is
                the current number of cached results. Cache hits do not
                increase it, and it never exceeds `_MAX_CACHE`.
        """
        cached_count = len(self._cache)
        return {"commands_processed": cached_count}

156 

157 

def get_process_pool(
    logger: ObservabilityProtocol, telemetry: TelemetryProtocol
) -> ProcessPoolProtocol:
    """Builds a `ProcessPool` sized from the environment.

    The worker count is read from the `BIJUXCLI_MAX_WORKERS` environment
    variable, defaulting to 4 when the variable is not set.

    Args:
        logger (ObservabilityProtocol): The logging service instance.
        telemetry (TelemetryProtocol): The telemetry service instance.

    Returns:
        ProcessPoolProtocol: A configured instance of the process pool service.

    Raises:
        ValueError: If `BIJUXCLI_MAX_WORKERS` cannot be parsed by `int()`.
    """
    configured = os.getenv("BIJUXCLI_MAX_WORKERS", "4")
    return ProcessPool(logger, telemetry, max_workers=int(configured))

175 

176 

# Names exported by `from ... import *`.
__all__ = [
    "get_process_pool",
    "ProcessPool",
]