Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/services/audit.py: 100%

73 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-19 23:36 +0000

1# SPDX-License-Identifier: MIT 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Provides concrete audit service implementations. 

5 

6This module defines concrete classes that implement the `AuditProtocol`. It 

7offers different strategies for handling command auditing and execution, 

8allowing the application to switch between a simulation mode (`DryRunAudit`) 

9and a real execution mode (`RealAudit`). 

10 

11A factory function, `get_audit_service`, is provided to select the 

12appropriate implementation based on a `dry_run` flag. 

13""" 

14 

15from __future__ import annotations 

16 

17from contextlib import suppress 

18import subprocess # nosec B404 

19from typing import Any 

20 

21from injector import inject 

22 

23from bijux_cli.contracts import AuditProtocol, ObservabilityProtocol, TelemetryProtocol 

24from bijux_cli.core.exceptions import BijuxError 

25from bijux_cli.services.utils import validate_command 

26 

27 

class _BaseAudit(AuditProtocol):
    """A base class providing common logic for audit services.

    It stores the logging and telemetry services, owns the in-memory audit
    trail, and supplies default implementations of the protocol methods that
    the concrete services override.

    Attributes:
        _log (ObservabilityProtocol): The logging service.
        _tel (TelemetryProtocol): The telemetry service for event tracking.
        _commands (list): An in-memory list of logged command events.
    """

    _log: ObservabilityProtocol
    _tel: TelemetryProtocol
    _commands: list[dict[str, Any]]

    @inject
    def __init__(self, log: ObservabilityProtocol, tel: TelemetryProtocol) -> None:
        """Initializes the base audit service.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
        """
        self._log = log
        self._tel = tel
        self._commands = []

    def shutdown(self) -> None:
        """Flushes telemetry and closes the logger, suppressing errors.

        The two steps are guarded independently so that a failing flush does
        not prevent the logger from being closed.
        """
        with suppress(Exception):
            self._tel.flush()
        with suppress(Exception):
            self._log.close()

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Logs a command execution for auditing purposes.

        The base implementation records nothing; subclasses override it.

        Args:
            cmd (list[str]): The command and its arguments to log.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Validates, logs, and executes the given command securely.

        Args:
            cmd (list[str]): The command and its arguments to execute.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: A tuple containing the command's return
                code, standard output, and standard error.

        Raises:
            NotImplementedError: This method must be implemented by subclasses.
        """
        raise NotImplementedError("Subclasses must implement 'run' method.")

    def get_commands(self) -> list[dict[str, Any]]:
        """Returns an independent copy of the recorded command audit trail.

        Returns:
            list[dict[str, Any]]: A list of dictionaries, where each represents
                a logged command.
        """
        # Imported locally so this class does not depend on a new module-level
        # import.
        from copy import deepcopy

        # A shallow list copy would still share the entry dicts (and the
        # command lists inside them) with the caller, letting the caller
        # rewrite the audit trail after the fact.
        return deepcopy(self._commands)

    def get_status(self) -> dict[str, Any]:
        """Returns the current status of the audit service.

        Returns:
            dict[str, Any]: A dictionary containing status information, such as
                the number of commands processed.
        """
        return {"commands_processed": len(self._commands)}

    def cli_audit(self) -> None:
        """Performs a no-op CLI audit to conform to the protocol."""

109 

110 

class DryRunAudit(_BaseAudit):
    """An audit service that records events and simulates command execution."""

    def __init__(self, log: ObservabilityProtocol, tel: TelemetryProtocol) -> None:
        """Initializes the `DryRunAudit` service.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
        """
        super().__init__(log, tel)

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Logs and records a command without executing it.

        The command is snapshotted before it is stored, and the logger and
        telemetry receive a separate payload, so that neither the caller nor
        those services can alter the recorded audit entry afterwards.

        Args:
            cmd (list[str]): The command and arguments to log.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """
        recorded = list(cmd)
        self._commands.append({"cmd": recorded, "executor": executor})

        # Emit a distinct dict/list pair: the stored entry must not alias
        # objects handed to external services.
        payload = {"cmd": list(recorded), "executor": executor}
        self._log.log("info", "Dry-run", extra=payload)
        self._tel.event("audit_dry_run", payload)

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Simulates the execution of a command.

        This method logs the command and returns a successful result without
        actually running a subprocess.

        Args:
            cmd (list[str]): The command to simulate.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: A tuple of dummy values: `(0, b"", b"")`.
        """
        self.log(cmd, executor=executor)
        return 0, b"", b""

    def cli_audit(self) -> None:
        """Logs a dry-run CLI audit event."""
        self._log.log("info", "CLI audit (dry-run)", extra={})
        self._tel.event("audit_cli_dry_run", {})

158 

159 

class RealAudit(_BaseAudit):
    """An audit service that validates, logs, and executes real commands."""

    def __init__(self, log: ObservabilityProtocol, tel: TelemetryProtocol) -> None:
        """Initializes the `RealAudit` service.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
        """
        super().__init__(log, tel)

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Logs a command with the intent to execute it.

        The command is snapshotted before it is stored, and the logger and
        telemetry receive a separate payload, so that neither the caller nor
        those services can alter the recorded audit entry afterwards.

        Args:
            cmd (list[str]): The command and arguments to log.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """
        recorded = list(cmd)
        self._commands.append({"cmd": recorded, "executor": executor})

        # Emit a distinct dict/list pair: the stored entry must not alias
        # objects handed to external services.
        payload = {"cmd": list(recorded), "executor": executor}
        self._log.log("debug", f"Executing {executor}", extra=payload)
        self._tel.event("audit_execute", payload)

    def _record_failure(self, cmd: list[str], executor: str, err: Exception) -> None:
        """Emits the `audit_execution_failed` telemetry event, best-effort.

        Telemetry errors are suppressed here so that a failing telemetry
        backend cannot replace the error that is actually being reported.

        Args:
            cmd (list[str]): The command as originally passed to `run`.
            executor (str): The name of the entity executing the command.
            err (Exception): The error that caused the failure.

        Returns:
            None:
        """
        with suppress(Exception):
            self._tel.event(
                "audit_execution_failed",
                {"cmd": cmd, "executor": executor, "error": str(err)},
            )

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Validates, logs, and executes a command in a subprocess.

        Args:
            cmd (list[str]): The command and arguments to execute.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: A tuple containing the command's return
                code, standard output, and standard error.

        Raises:
            BijuxError: If command validation fails or an unexpected error
                occurs during execution.
        """
        try:
            safe_cmd = validate_command(cmd)
            self.log(safe_cmd, executor=executor)
            # check=False: a non-zero exit status is returned to the caller
            # rather than raised.
            proc = subprocess.run(  # noqa: S603 # nosec B603
                safe_cmd,
                capture_output=True,
                check=False,
                shell=False,
            )
            self._tel.event(
                "audit_executed",
                {
                    "cmd": safe_cmd,
                    "executor": executor,
                    "returncode": proc.returncode,
                },
            )
            return proc.returncode, proc.stdout, proc.stderr
        except BijuxError as err:
            # Already the documented error type: record it and re-raise as-is.
            self._record_failure(cmd, executor, err)
            raise
        except Exception as err:
            # Anything else is wrapped so callers only need to handle
            # BijuxError.
            self._record_failure(cmd, executor, err)
            raise BijuxError(f"Failed to execute {executor!r}: {err}") from err

    def cli_audit(self) -> None:
        """Logs a real CLI audit event with the number of recorded commands."""
        recorded_count = len(self._commands)
        self._log.log("info", "CLI audit (real)", extra={"commands": recorded_count})
        self._tel.event("audit_cli_real", {"commands": recorded_count})

239 

240 

def get_audit_service(
    observability: ObservabilityProtocol,
    telemetry: TelemetryProtocol,
    dry_run: bool = False,
) -> AuditProtocol:
    """Builds the audit service that matches the requested execution mode.

    Args:
        observability (ObservabilityProtocol): The service for logging.
        telemetry (TelemetryProtocol): The service for event tracking.
        dry_run (bool): When truthy, a `DryRunAudit` is created; otherwise a
            `RealAudit` is created.

    Returns:
        AuditProtocol: The newly constructed audit service.
    """
    if dry_run:
        return DryRunAudit(observability, telemetry)
    return RealAudit(observability, telemetry)

262 

263 

# Names exported by `from ... import *`; `_BaseAudit` is not among them.
__all__ = ["DryRunAudit", "RealAudit", "get_audit_service"]
268]