Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/services/audit.py: 100%
73 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
1# SPDX-License-Identifier: MIT
2# Copyright © 2025 Bijan Mousavi
4"""Provides concrete audit service implementations.
6This module defines concrete classes that implement the `AuditProtocol`. It
7offers different strategies for handling command auditing and execution,
8allowing the application to switch between a simulation mode (`DryRunAudit`)
9and a real execution mode (`RealAudit`).
11A factory function, `get_audit_service`, is provided to select the
12appropriate implementation based on a `dry_run` flag.
13"""
15from __future__ import annotations
17from contextlib import suppress
18import subprocess # nosec B404
19from typing import Any
21from injector import inject
23from bijux_cli.contracts import AuditProtocol, ObservabilityProtocol, TelemetryProtocol
24from bijux_cli.core.exceptions import BijuxError
25from bijux_cli.services.utils import validate_command
class _BaseAudit(AuditProtocol):
    """Shared state and default behaviour for the concrete audit services.

    Subclasses are expected to override `run` (the base version raises
    `NotImplementedError`) and usually `log` and `cli_audit`, which are
    no-ops here.

    Attributes:
        _log (ObservabilityProtocol): The logging service.
        _tel (TelemetryProtocol): The telemetry service for event tracking.
        _commands (list): In-memory record of the command events logged so far.
    """

    _log: ObservabilityProtocol
    _tel: TelemetryProtocol
    _commands: list[dict[str, Any]]

    @inject
    def __init__(self, log: ObservabilityProtocol, tel: TelemetryProtocol) -> None:
        """Stores the collaborating services and starts an empty audit trail.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
        """
        # Attribute types are declared once at class level above.
        self._log = log
        self._tel = tel
        self._commands = []

    def shutdown(self) -> None:
        """Flushes telemetry, then closes the logger; errors are ignored.

        The two steps are guarded separately so that a failing flush does not
        prevent the logger from being closed.
        """
        with suppress(Exception):
            self._tel.flush()

        with suppress(Exception):
            self._log.close()

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Accepts a command for auditing; the base implementation does nothing.

        Args:
            cmd (list[str]): The command and its arguments to log.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """
        return None

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Placeholder for command execution; always raises in the base class.

        Args:
            cmd (list[str]): The command and its arguments to execute.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: In subclasses, the command's return
                code, standard output, and standard error.

        Raises:
            NotImplementedError: Always, until a subclass overrides `run`.
        """
        raise NotImplementedError("Subclasses must implement 'run' method.")

    def get_commands(self) -> list[dict[str, Any]]:
        """Returns a shallow copy of the recorded command audit trail.

        Returns:
            list[dict[str, Any]]: A new list holding the same entry
                dictionaries that were recorded, in recording order.
        """
        return list(self._commands)

    def get_status(self) -> dict[str, Any]:
        """Reports how many commands have been recorded so far.

        Returns:
            dict[str, Any]: A dictionary with the single key
                `commands_processed`, the length of the audit trail.
        """
        processed = len(self._commands)
        return {"commands_processed": processed}

    def cli_audit(self) -> None:
        """Performs a no-op CLI audit to conform to the protocol."""
        return None
class DryRunAudit(_BaseAudit):
    """An audit service that records commands but never executes them."""

    def __init__(self, log: ObservabilityProtocol, tel: TelemetryProtocol) -> None:
        """Initializes the `DryRunAudit` service.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
        """
        super().__init__(log, tel)

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Records a command in the audit trail without executing it.

        The same entry dictionary is appended to the in-memory trail, passed
        to the logger as `extra`, and sent as the telemetry payload.

        Args:
            cmd (list[str]): The command and arguments to log.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """
        record = dict(cmd=cmd, executor=executor)
        self._commands.append(record)
        self._log.log("info", "Dry-run", extra=record)
        self._tel.event("audit_dry_run", record)

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Simulates the execution of a command.

        The command is logged via `log` and a fixed successful result is
        returned; no subprocess is started.

        Args:
            cmd (list[str]): The command to simulate.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: Always `(0, b"", b"")`.
        """
        self.log(cmd, executor=executor)
        simulated_result = (0, b"", b"")
        return simulated_result

    def cli_audit(self) -> None:
        """Emits a log line and a telemetry event marking a dry-run CLI audit."""
        self._log.log("info", "CLI audit (dry-run)", extra={})
        self._tel.event("audit_cli_dry_run", {})
class RealAudit(_BaseAudit):
    """An audit service that validates, logs, and executes real commands.

    `run` passes each command through `validate_command`, records it in the
    audit trail, and then executes it with `subprocess.run` (no shell).
    """

    def __init__(self, log: ObservabilityProtocol, tel: TelemetryProtocol) -> None:
        """Initializes the `RealAudit` service.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
        """
        super().__init__(log, tel)

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Logs a command with the intent to execute it.

        The entry is appended to the in-memory trail, written to the logger
        at debug level, and emitted as an `audit_execute` telemetry event.

        Args:
            cmd (list[str]): The command and arguments to log.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """
        entry = {"cmd": cmd, "executor": executor}
        self._commands.append(entry)
        self._log.log("debug", f"Executing {executor}", extra=entry)
        self._tel.event("audit_execute", entry)

    def _record_failure(
        self, cmd: list[str], executor: str, err: BaseException
    ) -> None:
        """Emits an `audit_execution_failed` event without ever raising.

        This runs while `run` is already handling an exception. A telemetry
        failure at that point must not replace the error the caller is about
        to receive, so it is suppressed, as `shutdown` does for its cleanup.

        Args:
            cmd (list[str]): The command as originally passed to `run`.
            executor (str): The name of the entity executing the command.
            err (BaseException): The error being reported.
        """
        with suppress(Exception):
            self._tel.event(
                "audit_execution_failed",
                {"cmd": cmd, "executor": executor, "error": str(err)},
            )

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Validates, logs, and executes a command in a subprocess.

        Args:
            cmd (list[str]): The command and arguments to execute.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: A tuple containing the command's return
                code, standard output, and standard error. A non-zero return
                code is returned, not raised (`check=False`).

        Raises:
            BijuxError: If command validation fails or an unexpected error
                occurs during execution. Non-`BijuxError` exceptions are
                wrapped, with the original attached as the cause.
        """
        try:
            # Only the validated form of the command is logged and executed.
            safe_cmd = validate_command(cmd)
            self.log(safe_cmd, executor=executor)
            proc = subprocess.run(  # noqa: S603 # nosec B603
                safe_cmd,
                capture_output=True,
                check=False,
                shell=False,
            )
            self._tel.event(
                "audit_executed",
                {
                    "cmd": safe_cmd,
                    "executor": executor,
                    "returncode": proc.returncode,
                },
            )
            return proc.returncode, proc.stdout, proc.stderr
        except BijuxError as err:
            # Already the documented exception type: report and re-raise as is.
            self._record_failure(cmd, executor, err)
            raise
        except Exception as err:
            self._record_failure(cmd, executor, err)
            raise BijuxError(f"Failed to execute {executor!r}: {err}") from err

    def cli_audit(self) -> None:
        """Logs a real CLI audit event with the number of recorded commands."""
        summary = {"commands": len(self._commands)}
        self._log.log("info", "CLI audit (real)", extra=summary)
        # A separate dict is passed so the two sinks never share one payload.
        self._tel.event("audit_cli_real", dict(summary))
def get_audit_service(
    observability: ObservabilityProtocol,
    telemetry: TelemetryProtocol,
    dry_run: bool = False,
) -> AuditProtocol:
    """Builds the audit service matching the requested execution mode.

    Args:
        observability (ObservabilityProtocol): The service for logging.
        telemetry (TelemetryProtocol): The service for event tracking.
        dry_run (bool): If truthy, a `DryRunAudit` is created; otherwise a
            `RealAudit` is created.

    Returns:
        AuditProtocol: A new instance of the selected audit service,
            constructed with the given logging and telemetry services.
    """
    # Pick the implementation first, then construct it once.
    service_cls = DryRunAudit if dry_run else RealAudit
    return service_cls(observability, telemetry)
# Public API of this module; `_BaseAudit` is intentionally not exported.
__all__ = ["DryRunAudit", "RealAudit", "get_audit_service"]