Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/infra/observability.py: 100%

40 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-19 23:36 +0000

1# SPDX-License-Identifier: MIT 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Provides the concrete implementation of the observability and logging service. 

5 

6This module defines the `Observability` class, which implements the 

7`ObservabilityProtocol`. It serves as the primary interface for structured 

8logging throughout the application, using `structlog` as its underlying engine. 

9It can also be configured to forward log entries to a telemetry backend, 

10unifying logging and event tracking. 

11""" 

12 

13from __future__ import annotations 

14 

15from typing import Any, Self 

16 

17from injector import inject 

18import structlog 

19from structlog.typing import FilteringBoundLogger 

20 

21from bijux_cli.contracts import ObservabilityProtocol, TelemetryProtocol 

22from bijux_cli.core.exceptions import ServiceError 

23from bijux_cli.infra.telemetry import NullTelemetry 

24 

25 

26class Observability(ObservabilityProtocol): 

27 """A structured logging service integrating `structlog` and telemetry. 

28 

29 This class wraps a `structlog` logger to produce structured log entries. 

30 If configured with a telemetry backend, it also forwards these events for 

31 analytics and monitoring. 

32 

33 Attributes: 

34 _logger (FilteringBoundLogger): The underlying `structlog` logger instance. 

35 _telemetry (TelemetryProtocol): The telemetry service for event forwarding. 

36 Defaults to a `NullTelemetry` instance that does nothing. 

37 """ 

38 

39 @inject 

40 def __init__(self, *, debug: bool = False) -> None: 

41 """Initializes the observability service. 

42 

43 Args: 

44 debug (bool): If True, configures the service for debug-level 

45 logging. 

46 """ 

47 self._logger: FilteringBoundLogger = structlog.get_logger("bijux_cli") 

48 self._telemetry: TelemetryProtocol = NullTelemetry() 

49 

50 def set_telemetry(self, telemetry: TelemetryProtocol) -> Self: 

51 """Attaches a telemetry backend for forwarding log events. 

52 

53 This allows the service to be "upgraded" from a simple logger to a full 

54 observability tool after its initial creation. 

55 

56 Args: 

57 telemetry (TelemetryProtocol): The telemetry service to receive events. 

58 

59 Returns: 

60 Self: The service instance, allowing for method chaining. 

61 """ 

62 self._telemetry = telemetry 

63 return self 

64 

65 @classmethod 

66 def setup(cls, *, debug: bool = False) -> Self: 

67 """Instantiates and configures an `Observability` service. 

68 

69 Args: 

70 debug (bool): If True, enables debug-level logging. 

71 

72 Returns: 

73 Self: A new, configured `Observability` instance. 

74 """ 

75 return cls(debug=debug) 

76 

77 def get_logger(self) -> FilteringBoundLogger: 

78 """Retrieves the underlying `structlog` logger instance. 

79 

80 Returns: 

81 FilteringBoundLogger: The `structlog` logger, which can be used 

82 directly if needed. 

83 """ 

84 return self._logger 

85 

86 def bind(self, **kwargs: Any) -> Self: 

87 """Binds context key-value pairs to all subsequent log entries. 

88 

89 Args: 

90 **kwargs (Any): Context values to include in each log entry. 

91 

92 Returns: 

93 Self: The service instance, allowing for method chaining. 

94 """ 

95 self._logger = self._logger.bind(**kwargs) 

96 return self 

97 

98 def log( 

99 self, 

100 level: str, 

101 msg: str, 

102 *, 

103 extra: dict[str, Any] | None = None, 

104 ) -> Self: 

105 """Logs a structured message and emits a corresponding telemetry event. 

106 

107 Args: 

108 level (str): The severity level of the log (e.g., 'debug', 'info', 

109 'warning', 'error', 'critical'). 

110 msg (str): The log message. 

111 extra (dict[str, Any] | None): Additional context to include in the 

112 log entry. 

113 

114 Returns: 

115 Self: The service instance, allowing for method chaining. 

116 

117 Raises: 

118 ServiceError: If `level` is not a valid log level name. 

119 """ 

120 log_func = getattr(self._logger, level.lower(), None) 

121 if not callable(log_func): 

122 raise ServiceError(f"Invalid log level: {level}") 

123 

124 log_context = extra or {} 

125 if log_context: 

126 log_func(msg, **log_context) 

127 else: 

128 log_func(msg) 

129 

130 if not isinstance(self._telemetry, NullTelemetry): 

131 telemetry_payload = {"level": level, "message": msg} 

132 telemetry_payload.update(log_context) 

133 self._telemetry.event("LOG_EMITTED", telemetry_payload) 

134 

135 return self 

136 

137 def close(self) -> None: 

138 """Logs the shutdown of the observability service. 

139 

140 Note: 

141 In this implementation, this method only logs a debug message and 

142 does not perform resource cleanup like flushing. Flushing is 

143 handled by the telemetry service's own lifecycle methods. 

144 """ 

145 self._logger.debug("Observability shutdown") 

146 

147 

148__all__ = ["Observability"]