Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/services/diagnostics/audit.py: 100%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-26 17:59 +0000

1# SPDX-License-Identifier: Apache-2.0 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Provides concrete audit service implementations. 

5 

6This module defines concrete classes that implement the `AuditProtocol`. It 

7offers different strategies for handling command auditing and execution, 

8allowing the application to switch between a simulation mode (`DryRunAudit`) 

9and a real execution mode (`RealAudit`). 

10 

11A factory function, `get_audit_service`, is provided to select the 

12appropriate implementation based on a `dry_run` flag. 

13""" 

14 

15from __future__ import annotations 

16 

17from contextlib import suppress 

18import os 

19import subprocess # nosec B404 

20from typing import Any 

21 

22from injector import inject 

23 

24from bijux_cli.core.errors import BijuxError 

25from bijux_cli.infra.process import validate_command 

26from bijux_cli.services.contracts import ObservabilityProtocol, TelemetryProtocol 

27from bijux_cli.services.diagnostics.contracts import AuditProtocol 

28 

29 

class _BaseAudit(AuditProtocol):
    """Shared state and default behaviour for the concrete audit services.

    The base class owns the logger, the telemetry sink and the in-memory
    audit trail. Recording and execution are left to subclasses: `log` is a
    no-op here and `run` always raises `NotImplementedError`.

    Attributes:
        _log (ObservabilityProtocol): The logging service.
        _tel (TelemetryProtocol): The telemetry service for event tracking.
        _commands (list): The in-memory list of recorded command entries.
    """

    _log: ObservabilityProtocol
    _tel: TelemetryProtocol
    _commands: list[dict[str, Any]]

    @inject
    def __init__(self, log: ObservabilityProtocol, tel: TelemetryProtocol) -> None:
        """Stores the collaborators and starts with an empty audit trail.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
        """
        # The attribute types are already declared on the class body.
        self._log = log
        self._tel = tel
        self._commands = []

    def shutdown(self) -> None:
        """Flushes telemetry, then closes the logger, ignoring any errors."""
        # Each step has its own guard so that a failing flush never prevents
        # the logger from being closed.
        with suppress(Exception):
            self._tel.flush()
        with suppress(Exception):
            self._log.close()

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Accepts a command for auditing; the base implementation does nothing.

        Subclasses override this method to record the command.

        Args:
            cmd (list[str]): The command and its arguments.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Declares the execution entry point that subclasses must provide.

        Args:
            cmd (list[str]): The command and its arguments to execute.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: In subclasses, the command's return
                code, standard output, and standard error.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Subclasses must implement 'run' method.")

    def get_commands(self) -> list[dict[str, Any]]:
        """Returns a shallow copy of the recorded audit trail.

        Returns:
            list[dict[str, Any]]: A new list holding the same entry
                dictionaries that were recorded.
        """
        return list(self._commands)

    def get_status(self) -> dict[str, Any]:
        """Reports how many commands have been recorded so far.

        Returns:
            dict[str, Any]: A dictionary with the single key
                `commands_processed`.
        """
        processed = len(self._commands)
        return {"commands_processed": processed}

    def cli_audit(self) -> None:
        """Does nothing; present so the class conforms to the protocol."""

111 

112 

class DryRunAudit(_BaseAudit):
    """An audit service that records commands and only simulates running them."""

    def __init__(self, log: ObservabilityProtocol, tel: TelemetryProtocol) -> None:
        """Initializes the `DryRunAudit` service.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
        """
        super().__init__(log, tel)

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Records a command in the audit trail without executing it.

        The same entry is appended to the in-memory trail, written to the
        logger at `info` level, and sent to telemetry as `audit_dry_run`.

        Args:
            cmd (list[str]): The command and arguments to record.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """
        record: dict[str, Any] = {"cmd": cmd, "executor": executor}
        self._commands.append(record)
        self._log.log("info", "Dry-run", extra=record)
        self._tel.event("audit_dry_run", record)

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Records the command and reports success without running anything.

        No subprocess is started; the command is only passed to `log`.

        Args:
            cmd (list[str]): The command to simulate.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: Always the dummy result `(0, b"", b"")`.
        """
        self.log(cmd, executor=executor)
        simulated: tuple[int, bytes, bytes] = (0, b"", b"")
        return simulated

    def cli_audit(self) -> None:
        """Emits a dry-run CLI audit event to the logger and to telemetry."""
        self._log.log("info", "CLI audit (dry-run)", extra={})
        self._tel.event("audit_cli_dry_run", {})

160 

161 

class RealAudit(_BaseAudit):
    """An audit service that validates, logs, and executes real commands."""

    def __init__(
        self,
        log: ObservabilityProtocol,
        tel: TelemetryProtocol,
        *,
        allowed_commands: list[str],
    ) -> None:
        """Initializes the `RealAudit` service.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
            allowed_commands (list[str]): Explicit allowlist for audited commands.
        """
        super().__init__(log, tel)
        self._allowed_commands = allowed_commands

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Logs a command with the intent to execute it.

        The entry is appended to the in-memory trail, written to the logger
        at `debug` level, and sent to telemetry as `audit_execute`.

        Args:
            cmd (list[str]): The command and arguments to log.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """
        entry = {"cmd": cmd, "executor": executor}
        self._commands.append(entry)
        self._log.log("debug", f"Executing {executor}", extra=entry)
        self._tel.event("audit_execute", entry)

    def _report_failure(self, cmd: list[str], executor: str, err: Exception) -> None:
        """Sends the `audit_execution_failed` telemetry event for a failed run.

        Args:
            cmd (list[str]): The command exactly as the caller supplied it.
            executor (str): The name of the entity executing the command.
            err (Exception): The error that aborted the run.

        Returns:
            None:
        """
        self._tel.event(
            "audit_execution_failed",
            {"cmd": cmd, "executor": executor, "error": str(err)},
        )

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Validates, logs, and executes a command in a subprocess.

        Validation runs in its own `try` block so that only a `ValueError`
        raised by `validate_command` is reported as a validation failure. A
        `ValueError` raised later (by logging, telemetry, or the subprocess
        call) is reported as an execution failure instead.

        Args:
            cmd (list[str]): The command and arguments to execute.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: A tuple containing the command's return
                code, standard output, and standard error.

        Raises:
            BijuxError: If command validation fails or an unexpected error
                occurs during execution.
        """
        try:
            safe_cmd = validate_command(cmd, allowed_commands=self._allowed_commands)
        except ValueError as err:
            self._report_failure(cmd, executor, err)
            raise BijuxError(f"Command validation failed: {err}") from err
        except Exception as err:
            # Any other validator error keeps the generic failure message.
            self._report_failure(cmd, executor, err)
            raise BijuxError(f"Failed to execute {executor!r}: {err}") from err

        try:
            self.log(safe_cmd, executor=executor)
            proc = subprocess.run(  # noqa: S603 # nosec B603
                safe_cmd,
                capture_output=True,
                check=False,
                shell=False,
            )
            self._tel.event(
                "audit_executed",
                {
                    "cmd": safe_cmd,
                    "executor": executor,
                    "returncode": proc.returncode,
                },
            )
        except Exception as err:
            # The failure event carries the caller's original `cmd`, not the
            # validated form.
            self._report_failure(cmd, executor, err)
            raise BijuxError(f"Failed to execute {executor!r}: {err}") from err
        return proc.returncode, proc.stdout, proc.stderr

    def cli_audit(self) -> None:
        """Logs a real CLI audit event with the number of recorded commands."""
        self._log.log(
            "info", "CLI audit (real)", extra={"commands": len(self._commands)}
        )
        self._tel.event("audit_cli_real", {"commands": len(self._commands)})

249 

250 

def get_audit_service(
    observability: ObservabilityProtocol,
    telemetry: TelemetryProtocol,
    dry_run: bool = False,
) -> AuditProtocol:
    """A factory function for creating an audit service instance.

    For real execution the command allowlist is read from the
    `BIJUXCLI_ALLOWED_COMMANDS` environment variable, a comma-separated list
    that defaults to `echo,ls,cat,grep`. Whitespace around each entry is
    ignored, so `"echo, ls"` and `"echo,ls"` give the same allowlist.

    Args:
        observability (ObservabilityProtocol): The service for logging.
        telemetry (TelemetryProtocol): The service for event tracking.
        dry_run (bool): If True, returns a `DryRunAudit` instance; otherwise,
            returns a `RealAudit` instance.

    Returns:
        AuditProtocol: An instance of the appropriate audit service.
    """
    if dry_run:
        return DryRunAudit(observability, telemetry)
    raw = os.getenv("BIJUXCLI_ALLOWED_COMMANDS", "echo,ls,cat,grep")
    # Trim each entry so a space after a comma does not end up inside a
    # command name. Empty entries are kept, so the list has the same length
    # as before.
    # NOTE(review): how `validate_command` treats an empty allowlist is not
    # visible here - confirm before filtering empty entries out.
    allowed = [name.strip() for name in raw.split(",")]
    return RealAudit(observability, telemetry, allowed_commands=allowed)

271 

272 

# Names exported by a star-import of this module; the private `_BaseAudit`
# base class is not listed.
__all__ = ["DryRunAudit", "RealAudit", "get_audit_service"]

277]