Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/infra/emitter.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-19 23:36 +0000

1# SPDX-License-Identifier: MIT 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Provides the concrete implementation of the structured output emitter service. 

5 

6This module defines the `Emitter` class, which implements the `EmitterProtocol`. 

7It is responsible for serializing data payloads into structured formats like 

8JSON or YAML and writing them to standard output or a specified file. The 

9service also integrates with telemetry to log emission events. 

10""" 

11 

12from __future__ import annotations 

13 

14import sys 

15from typing import Any 

16 

17from injector import inject 

18import structlog 

19 

20from bijux_cli.contracts import EmitterProtocol, TelemetryProtocol 

21from bijux_cli.core.enums import OutputFormat 

22from bijux_cli.core.exceptions import CommandError 

23from bijux_cli.infra.serializer import serializer_for 

24 

25 

26class Emitter(EmitterProtocol): 

27 """A service for serializing and emitting structured output. 

28 

29 This class implements the `EmitterProtocol`. It handles the serialization 

30 of data payloads and writes the result to standard output or a file, while 

31 also tracking events with a telemetry service. 

32 

33 Attributes: 

34 _telemetry (TelemetryProtocol): The telemetry service for event tracking. 

35 _default_format (OutputFormat): The default format for serialization. 

36 _debug (bool): Flag indicating if debug mode is enabled. 

37 _quiet (bool): Flag indicating if normal output should be suppressed. 

38 _logger: A configured `structlog` logger instance. 

39 """ 

40 

41 @inject 

42 def __init__( 

43 self, 

44 telemetry: TelemetryProtocol, 

45 output_format: OutputFormat = OutputFormat.JSON, 

46 debug: bool = False, 

47 quiet: bool = False, 

48 **kwargs: Any, 

49 ): 

50 """Initializes the Emitter service. 

51 

52 Args: 

53 telemetry (TelemetryProtocol): The telemetry service for event tracking. 

54 output_format (OutputFormat): The default output format for emissions. 

55 debug (bool): If True, enables debug logging. 

56 quiet (bool): If True, suppresses all non-error output. 

57 **kwargs: Additional keyword arguments (unused). 

58 """ 

59 self._telemetry = telemetry 

60 self._default_format = output_format 

61 self._debug = debug 

62 self._quiet = quiet 

63 self._logger = structlog.get_logger(__name__) 

64 

65 def emit( 

66 self, 

67 payload: Any, 

68 *, 

69 fmt: OutputFormat | None = None, 

70 pretty: bool = False, 

71 level: str = "info", 

72 message: str = "Emitting output", 

73 output: str | None = None, 

74 **context: Any, 

75 ) -> None: 

76 """Serializes and emits a structured data payload. 

77 

78 The payload is serialized to the specified format and written to stdout 

79 or a file path if provided. The operation is suppressed if the emitter 

80 is in quiet mode and the log level is not critical. 

81 

82 Args: 

83 payload (Any): The data payload to serialize and emit. 

84 fmt (OutputFormat | None): The output format. If None, the service's 

85 default format is used. 

86 pretty (bool): If True, formats the output for human readability. 

87 level (str): The log level for any accompanying message (e.g., 

88 "info", "debug", "error"). 

89 message (str): A descriptive message for logging purposes. 

90 output (str | None): An optional file path to write the output to. 

91 If None, output is written to `sys.stdout`. 

92 **context (Any): Additional key-value pairs for structured logging. 

93 

94 Returns: 

95 None: 

96 

97 Raises: 

98 CommandError: If the payload cannot be serialized. 

99 """ 

100 if self._quiet and level not in ["error", "critical"]: 

101 return 

102 

103 output_format = fmt or self._default_format 

104 serializer = serializer_for(output_format, self._telemetry) 

105 try: 

106 output_str = serializer.dumps(payload, fmt=output_format, pretty=pretty) 

107 except Exception as error: 

108 self._logger.error("Serialization failed", error=str(error), **context) 

109 raise CommandError( 

110 f"Serialization failed: {error}", http_status=500 

111 ) from error 

112 

113 stripped = output_str.rstrip("\n") 

114 

115 if output: 

116 with open(output, "w", encoding="utf-8") as f: 

117 f.write(stripped) 

118 else: 

119 print(stripped, file=sys.stdout, flush=True) 

120 

121 if self._debug: 

122 print("Diagnostics: emitted payload", file=sys.stderr) 

123 log = getattr(self._logger, level) 

124 log(message, output=stripped, **context) 

125 

126 try: 

127 self._telemetry.event( 

128 "output_emitted", 

129 {"format": output_format.value, "size_chars": len(stripped)}, 

130 ) 

131 except Exception as tel_err: 

132 if self._debug: 

133 self._logger.error("Telemetry failed", error=str(tel_err), **context) 

134 

135 def flush(self) -> None: 

136 """Flushes any buffered output to standard output.""" 

137 sys.stdout.flush() 

138 

139 

140__all__ = ["Emitter"]