Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/infra/observability.py: 100%
40 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
1# SPDX-License-Identifier: MIT
2# Copyright © 2025 Bijan Mousavi
4"""Provides the concrete implementation of the observability and logging service.
6This module defines the `Observability` class, which implements the
7`ObservabilityProtocol`. It serves as the primary interface for structured
8logging throughout the application, using `structlog` as its underlying engine.
9It can also be configured to forward log entries to a telemetry backend,
10unifying logging and event tracking.
11"""
13from __future__ import annotations
15from typing import Any, Self
17from injector import inject
18import structlog
19from structlog.typing import FilteringBoundLogger
21from bijux_cli.contracts import ObservabilityProtocol, TelemetryProtocol
22from bijux_cli.core.exceptions import ServiceError
23from bijux_cli.infra.telemetry import NullTelemetry
26class Observability(ObservabilityProtocol):
27 """A structured logging service integrating `structlog` and telemetry.
29 This class wraps a `structlog` logger to produce structured log entries.
30 If configured with a telemetry backend, it also forwards these events for
31 analytics and monitoring.
33 Attributes:
34 _logger (FilteringBoundLogger): The underlying `structlog` logger instance.
35 _telemetry (TelemetryProtocol): The telemetry service for event forwarding.
36 Defaults to a `NullTelemetry` instance that does nothing.
37 """
39 @inject
40 def __init__(self, *, debug: bool = False) -> None:
41 """Initializes the observability service.
43 Args:
44 debug (bool): If True, configures the service for debug-level
45 logging.
46 """
47 self._logger: FilteringBoundLogger = structlog.get_logger("bijux_cli")
48 self._telemetry: TelemetryProtocol = NullTelemetry()
50 def set_telemetry(self, telemetry: TelemetryProtocol) -> Self:
51 """Attaches a telemetry backend for forwarding log events.
53 This allows the service to be "upgraded" from a simple logger to a full
54 observability tool after its initial creation.
56 Args:
57 telemetry (TelemetryProtocol): The telemetry service to receive events.
59 Returns:
60 Self: The service instance, allowing for method chaining.
61 """
62 self._telemetry = telemetry
63 return self
65 @classmethod
66 def setup(cls, *, debug: bool = False) -> Self:
67 """Instantiates and configures an `Observability` service.
69 Args:
70 debug (bool): If True, enables debug-level logging.
72 Returns:
73 Self: A new, configured `Observability` instance.
74 """
75 return cls(debug=debug)
77 def get_logger(self) -> FilteringBoundLogger:
78 """Retrieves the underlying `structlog` logger instance.
80 Returns:
81 FilteringBoundLogger: The `structlog` logger, which can be used
82 directly if needed.
83 """
84 return self._logger
86 def bind(self, **kwargs: Any) -> Self:
87 """Binds context key-value pairs to all subsequent log entries.
89 Args:
90 **kwargs (Any): Context values to include in each log entry.
92 Returns:
93 Self: The service instance, allowing for method chaining.
94 """
95 self._logger = self._logger.bind(**kwargs)
96 return self
98 def log(
99 self,
100 level: str,
101 msg: str,
102 *,
103 extra: dict[str, Any] | None = None,
104 ) -> Self:
105 """Logs a structured message and emits a corresponding telemetry event.
107 Args:
108 level (str): The severity level of the log (e.g., 'debug', 'info',
109 'warning', 'error', 'critical').
110 msg (str): The log message.
111 extra (dict[str, Any] | None): Additional context to include in the
112 log entry.
114 Returns:
115 Self: The service instance, allowing for method chaining.
117 Raises:
118 ServiceError: If `level` is not a valid log level name.
119 """
120 log_func = getattr(self._logger, level.lower(), None)
121 if not callable(log_func):
122 raise ServiceError(f"Invalid log level: {level}")
124 log_context = extra or {}
125 if log_context:
126 log_func(msg, **log_context)
127 else:
128 log_func(msg)
130 if not isinstance(self._telemetry, NullTelemetry):
131 telemetry_payload = {"level": level, "message": msg}
132 telemetry_payload.update(log_context)
133 self._telemetry.event("LOG_EMITTED", telemetry_payload)
135 return self
137 def close(self) -> None:
138 """Logs the shutdown of the observability service.
140 Note:
141 In this implementation, this method only logs a debug message and
142 does not perform resource cleanup like flushing. Flushing is
143 handled by the telemetry service's own lifecycle methods.
144 """
145 self._logger.debug("Observability shutdown")
148__all__ = ["Observability"]