Coverage for / home / runner / work / bijux-cli / bijux-cli / src / bijux_cli / services / diagnostics / audit.py: 100%
79 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-26 17:59 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-26 17:59 +0000
1# SPDX-License-Identifier: Apache-2.0
2# Copyright © 2025 Bijan Mousavi
4"""Provides concrete audit service implementations.
6This module defines concrete classes that implement the `AuditProtocol`. It
7offers different strategies for handling command auditing and execution,
8allowing the application to switch between a simulation mode (`DryRunAudit`)
9and a real execution mode (`RealAudit`).
11A factory function, `get_audit_service`, is provided to select the
12appropriate implementation based on a `dry_run` flag.
13"""
15from __future__ import annotations
17from contextlib import suppress
18import os
19import subprocess # nosec B404
20from typing import Any
22from injector import inject
24from bijux_cli.core.errors import BijuxError
25from bijux_cli.infra.process import validate_command
26from bijux_cli.services.contracts import ObservabilityProtocol, TelemetryProtocol
27from bijux_cli.services.diagnostics.contracts import AuditProtocol
class _BaseAudit(AuditProtocol):
    """Shared state and default behaviour for the concrete audit services.

    Subclasses are expected to override `log` and `run`; the versions here
    do nothing and raise `NotImplementedError` respectively.

    Attributes:
        _log (ObservabilityProtocol): Logger used for audit messages.
        _tel (TelemetryProtocol): Telemetry sink used for audit events.
        _commands (list[dict[str, Any]]): In-memory trail of recorded commands.
    """

    _log: ObservabilityProtocol
    _tel: TelemetryProtocol
    _commands: list[dict[str, Any]]

    @inject
    def __init__(self, log: ObservabilityProtocol, tel: TelemetryProtocol) -> None:
        """Stores the collaborators and starts with an empty audit trail.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
        """
        # Attribute types are declared once at class level.
        self._log = log
        self._tel = tel
        self._commands = []

    def shutdown(self) -> None:
        """Flushes telemetry, then closes the logger, on a best-effort basis.

        Each step is guarded separately so a failing flush does not prevent
        the logger from being closed; any `Exception` is silently dropped.
        """
        with suppress(Exception):
            self._tel.flush()
        with suppress(Exception):
            self._log.close()

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Does nothing in the base class; subclasses record the command.

        Args:
            cmd (list[str]): The command and its arguments.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """
        return None

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Placeholder for command execution; always raises in the base class.

        Args:
            cmd (list[str]): The command and its arguments to execute.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: In subclasses, the return code, standard
                output, and standard error of the command.

        Raises:
            NotImplementedError: Always, since subclasses must provide `run`.
        """
        raise NotImplementedError("Subclasses must implement 'run' method.")

    def get_commands(self) -> list[dict[str, Any]]:
        """Returns the recorded audit trail as a new list.

        The copy is shallow: the list is new, the entry dictionaries are the
        same objects that were recorded.

        Returns:
            list[dict[str, Any]]: One dictionary per recorded command.
        """
        return list(self._commands)

    def get_status(self) -> dict[str, Any]:
        """Reports how many commands have been recorded so far.

        Returns:
            dict[str, Any]: A dictionary with the single key
                `commands_processed`.
        """
        processed = len(self._commands)
        return {"commands_processed": processed}

    def cli_audit(self) -> None:
        """Does nothing; present so the base class satisfies the protocol."""
        return None
class DryRunAudit(_BaseAudit):
    """Audit service that records commands but never spawns a process."""

    def __init__(self, log: ObservabilityProtocol, tel: TelemetryProtocol) -> None:
        """Forwards both collaborators to the base initializer.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
        """
        super().__init__(log, tel)

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Records the command and reports it to the logger and telemetry.

        The same entry dictionary is appended to the audit trail, passed to
        the logger as `extra`, and sent as the telemetry payload.

        Args:
            cmd (list[str]): The command and arguments to record.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """
        record = {"cmd": cmd, "executor": executor}
        self._commands.append(record)
        self._log.log("info", "Dry-run", extra=record)
        self._tel.event("audit_dry_run", record)

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Records the command and reports success without executing it.

        Args:
            cmd (list[str]): The command to simulate.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: Always `(0, b"", b"")`.
        """
        self.log(cmd, executor=executor)
        simulated: tuple[int, bytes, bytes] = (0, b"", b"")
        return simulated

    def cli_audit(self) -> None:
        """Emits a dry-run CLI audit message and telemetry event."""
        self._log.log("info", "CLI audit (dry-run)", extra={})
        self._tel.event("audit_cli_dry_run", {})
class RealAudit(_BaseAudit):
    """Audit service that validates, records, and really executes commands."""

    def __init__(
        self,
        log: ObservabilityProtocol,
        tel: TelemetryProtocol,
        *,
        allowed_commands: list[str],
    ) -> None:
        """Stores the collaborators and the command allowlist.

        Args:
            log (ObservabilityProtocol): The service for structured logging.
            tel (TelemetryProtocol): The service for event tracking.
            allowed_commands (list[str]): Allowlist handed to
                `validate_command` on every `run`.
        """
        super().__init__(log, tel)
        self._allowed_commands = allowed_commands

    def log(self, cmd: list[str], *, executor: str) -> None:
        """Records a command that is about to be executed.

        The same entry dictionary is appended to the audit trail, passed to
        the logger as `extra`, and sent as the telemetry payload.

        Args:
            cmd (list[str]): The command and arguments to record.
            executor (str): The name of the entity executing the command.

        Returns:
            None:
        """
        record = {"cmd": cmd, "executor": executor}
        self._commands.append(record)
        self._log.log("debug", f"Executing {executor}", extra=record)
        self._tel.event("audit_execute", record)

    def run(self, cmd: list[str], *, executor: str) -> tuple[int, bytes, bytes]:
        """Validates, records, and executes a command in a subprocess.

        The command is passed through `validate_command` with the configured
        allowlist, recorded via `log`, run without a shell with its output
        captured, and an `audit_executed` telemetry event is emitted.

        Args:
            cmd (list[str]): The command and arguments to execute.
            executor (str): The name of the entity executing the command.

        Returns:
            tuple[int, bytes, bytes]: The return code, standard output, and
                standard error of the command.

        Raises:
            BijuxError: If any step raises. A `ValueError` is reported as a
                validation failure, any other `Exception` as an execution
                failure. An `audit_execution_failed` event is emitted first.
        """
        try:
            vetted = validate_command(cmd, allowed_commands=self._allowed_commands)
            self.log(vetted, executor=executor)
            completed = subprocess.run(  # noqa: S603 # nosec B603
                vetted,
                capture_output=True,
                check=False,
                shell=False,
            )
            outcome = {
                "cmd": vetted,
                "executor": executor,
                "returncode": completed.returncode,
            }
            self._tel.event("audit_executed", outcome)
            return completed.returncode, completed.stdout, completed.stderr
        except Exception as err:
            # The failure event carries the caller's original command, not
            # the validated one, because validation may be what failed.
            failure = {"cmd": cmd, "executor": executor, "error": str(err)}
            self._tel.event("audit_execution_failed", failure)
            if isinstance(err, ValueError):
                message = f"Command validation failed: {err}"
            else:
                message = f"Failed to execute {executor!r}: {err}"
            raise BijuxError(message) from err

    def cli_audit(self) -> None:
        """Emits a CLI audit message and event with the recorded-command count."""
        count = len(self._commands)
        self._log.log("info", "CLI audit (real)", extra={"commands": count})
        self._tel.event("audit_cli_real", {"commands": count})
def get_audit_service(
    observability: ObservabilityProtocol,
    telemetry: TelemetryProtocol,
    dry_run: bool = False,
) -> AuditProtocol:
    """A factory function for creating an audit service instance.

    For the real service, the command allowlist is read from the
    `BIJUXCLI_ALLOWED_COMMANDS` environment variable as a comma-separated
    list, defaulting to `echo,ls,cat,grep` when the variable is unset.
    Whitespace around each entry is ignored, so `"echo, ls"` and `"echo,ls"`
    produce the same allowlist.

    Args:
        observability (ObservabilityProtocol): The service for logging.
        telemetry (TelemetryProtocol): The service for event tracking.
        dry_run (bool): If True, returns a `DryRunAudit` instance; otherwise,
            returns a `RealAudit` instance.

    Returns:
        AuditProtocol: An instance of the appropriate audit service.
    """
    if dry_run:
        return DryRunAudit(observability, telemetry)
    raw = os.getenv("BIJUXCLI_ALLOWED_COMMANDS", "echo,ls,cat,grep")
    # Trim each entry: an untrimmed " ls" could never match the "ls" command.
    # Empty entries are kept on purpose so the list has the same length as
    # before; how an empty allowlist is treated is decided by
    # `validate_command`, which is not visible here.
    allowed = [entry.strip() for entry in raw.split(",")]
    return RealAudit(observability, telemetry, allowed_commands=allowed)
# Public API of this module; `_BaseAudit` is intentionally not exported.
__all__ = ["DryRunAudit", "RealAudit", "get_audit_service"]