Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/commands/utilities.py: 100%
209 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
1# SPDX-License-Identifier: MIT
2# Copyright © 2025 Bijan Mousavi
4"""Provides shared, reusable utilities for Bijux CLI commands.
6This module centralizes common logic to ensure consistency and reduce code
7duplication across the various command implementations. It includes a suite of
8functions for handling standard CLI tasks, such as:
10* **Validation:** Functions for validating common CLI flags (like `--format`)
11 and checking the environment for non-ASCII characters or malformed
12 configuration files.
13* **Output & Exit:** A set of high-level emitters (`emit_and_exit`,
14 `emit_error_and_exit`) that handle payload serialization (JSON/YAML),
15 pretty-printing, and terminating the application with a contract-compliant
16 exit code and structured message.
17* **Command Orchestration:** A primary helper (`new_run_command`) that
18 encapsulates the standard lifecycle of a command: validation, payload
19 construction, and emission.
20* **Parsing & Sanitization:** Helpers for sanitizing strings to be ASCII-safe
21 and a pre-parser for global flags (`--quiet`, `--debug`, etc.) that
22 operates before Typer's main dispatch.
23* **Plugin Management:** Utilities for discovering and listing installed
24 plugins from the filesystem.
25"""
27from __future__ import annotations
29from collections.abc import Callable, Mapping
30from contextlib import suppress
31import json
32import os
33from pathlib import Path
34import platform
35import re
36import sys
37import time
38from typing import Any, NoReturn
40import yaml
42from bijux_cli.core.enums import OutputFormat
43from bijux_cli.services.plugins import get_plugins_dir
# Control characters that `ascii_safe` keeps as-is instead of replacing.
_ALLOWED_CTRL = set("\n\r\t")

# Shape of one accepted config line: an identifier-like key, "=", then a
# (possibly empty) value made of letters, digits, "_", ".", "/" and "-".
_ENV_LINE_RX = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*=[A-Za-z0-9_./\-]*$")

# Global flag spellings. In help mode `parse_global_flags` strips the leading
# dashes from every retained dash-prefixed argument that is NOT listed here.
KNOWN = {
    "-h", "--help",
    "-q", "--quiet",
    "--debug",
    "-v", "--verbose",
    "-f", "--format",
    "--pretty", "--no-pretty",
}
def ascii_safe(text: Any, _field: str = "") -> str:
    """Renders a value as text restricted to printable ASCII.

    Characters in the range space (32) through tilde (126) are kept, as are
    the control characters listed in `_ALLOWED_CTRL`. Every other character
    is replaced by a single '?'.

    Args:
        text (Any): The value to sanitize; non-strings go through `str()`.
        _field (str, optional): Not read by this function. Defaults to "".

    Returns:
        str: The sanitized string, with the same number of characters as the
            stringified input.
    """
    rendered = text if isinstance(text, str) else str(text)

    pieces: list[str] = []
    for ch in rendered:
        # Comparing single characters orders them by code point, so this is
        # the printable-ASCII window 32..126.
        printable = " " <= ch <= "~"
        pieces.append(ch if printable or ch in _ALLOWED_CTRL else "?")
    return "".join(pieces)
def normalize_format(fmt: str | None) -> str:
    """Lower-cases a format name and trims surrounding whitespace.

    Args:
        fmt (str | None): The raw format string, possibly None or empty.

    Returns:
        str: The trimmed, lower-cased format, or "" when `fmt` is falsy.
    """
    if not fmt:
        return ""
    trimmed = fmt.strip()
    return trimmed.lower()
def contains_non_ascii_env() -> bool:
    """Reports whether the CLI's environment carries non-ASCII data.

    Checks are made in this order:
        1. The value of `BIJUXCLI_CONFIG` itself.
        2. The contents of the file it points to, when that path exists and
           can be read; read failures other than a decode error are ignored.
        3. The value of every environment variable whose name starts with
           `BIJUXCLI_`.

    Returns:
        bool: True as soon as any check finds non-ASCII data, else False.
    """
    configured = os.environ.get("BIJUXCLI_CONFIG")
    if configured:
        if not configured.isascii():
            return True
        candidate = Path(configured)
        if candidate.exists():
            try:
                # Decoding as ASCII is the check; the text itself is unused.
                candidate.read_text(encoding="ascii")
            except UnicodeDecodeError:
                return True
            except OSError:
                # Directory, permission problem, vanished file, ...: an
                # unreadable path is not treated as non-ASCII.
                pass

    return any(
        name.startswith("BIJUXCLI_") and not value.isascii()
        for name, value in os.environ.items()
    )
def validate_common_flags(
    fmt: str,
    command: str,
    quiet: bool,
    include_runtime: bool = False,
) -> str:
    """Checks the output format and the environment shared by all commands.

    The format is lower-cased (not stripped) and must be "json" or "yaml".
    The environment is then checked with `contains_non_ascii_env`. Either
    failure is reported through `emit_error_and_exit`, always with
    `debug=False`.

    Args:
        fmt (str): The requested output format.
        command (str): The command name placed in any error payload.
        quiet (bool): Forwarded to the error emitter to silence its output.
        include_runtime (bool): Forwarded to the error emitter to add
            runtime details to the payload.

    Returns:
        str: The lower-cased format, "json" or "yaml".

    Raises:
        SystemExit: With code 2 for an unsupported format, or code 3 when
            the environment contains non-ASCII data.
    """
    requested = (fmt or "").lower()

    # Arguments both failure reports have in common.
    shared = {
        "command": command,
        "quiet": quiet,
        "include_runtime": include_runtime,
        "debug": False,
    }

    if requested not in ("json", "yaml"):
        # An empty request is reported with fmt "json".
        emit_error_and_exit(
            f"Unsupported format: {fmt}",
            code=2,
            failure="format",
            fmt=requested or "json",
            **shared,
        )

    if contains_non_ascii_env():
        emit_error_and_exit(
            "Non-ASCII in configuration or environment",
            code=3,
            failure="ascii",
            fmt=requested,
            **shared,
        )

    return requested
def validate_env_file_if_present(path_str: str) -> None:
    """Checks the line syntax of an environment file, if the file exists.

    An empty path or a path that does not exist is accepted silently. In an
    existing file, blank lines and lines whose first non-space character is
    '#' are skipped; every other line (after stripping) must match
    `_ENV_LINE_RX`.

    Args:
        path_str (str): Path to the environment file.

    Raises:
        ValueError: If the file cannot be read as strict UTF-8 text, or if a
            line does not match the expected `KEY=VALUE` shape.
    """
    if not path_str:
        return
    env_file = Path(path_str)
    if not env_file.exists():
        return

    try:
        content = env_file.read_text(encoding="utf-8", errors="strict")
    except Exception as exc:
        raise ValueError(f"Cannot read config file: {exc}") from exc

    for lineno, raw in enumerate(content.splitlines(), start=1):
        stripped = raw.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if not _ENV_LINE_RX.match(stripped):
            # The message quotes the line as written, not the stripped form.
            raise ValueError(f"Malformed line {lineno} in config: {raw!r}")
def new_run_command(
    command_name: str,
    payload_builder: Callable[[bool], Mapping[str, object]],
    quiet: bool,
    verbose: bool,
    fmt: str,
    pretty: bool,
    debug: bool,
    exit_code: int = 0,
) -> NoReturn:
    """Runs the shared validate / build / emit lifecycle of a command.

    Steps, in order: resolve the emitter and telemetry protocols from the
    current DI container, validate the common flags, build the payload, and
    emit it. A `ValueError` from the builder is reported as a structured
    error with exit code 3 and failure code "ascii".

    Args:
        command_name (str): The command name used in errors and history.
        payload_builder: Called with `include_runtime` (True when `verbose`
            or `debug` is set); returns the payload to emit.
        quiet (bool): If True, suppresses output.
        verbose (bool): If True, requests runtime metadata in the payload.
        fmt (str): The requested output format ("json" or "yaml").
        pretty (bool): If True, pretty-prints the output.
        debug (bool): If True, also forces runtime metadata and
            pretty-printing.
        exit_code (int): The exit code used after a successful emission.

    Raises:
        SystemExit: Via the emitters, with `exit_code` on success or an
            error code on failure.
    """
    from bijux_cli.contracts import EmitterProtocol, TelemetryProtocol
    from bijux_cli.core.di import DIContainer

    # The resolved objects are discarded; presumably this is an early check
    # that both services are registered -- confirm against DIContainer.
    for protocol in (EmitterProtocol, TelemetryProtocol):
        DIContainer.current().resolve(protocol)

    with_runtime = verbose or debug

    checked_format = validate_common_flags(
        fmt,
        command_name,
        quiet,
        include_runtime=with_runtime,
    )

    if checked_format == "yaml":
        target_format = OutputFormat.YAML
    else:
        target_format = OutputFormat.JSON

    try:
        built = payload_builder(with_runtime)
    except ValueError as exc:
        emit_error_and_exit(
            str(exc),
            code=3,
            failure="ascii",
            command=command_name,
            fmt=target_format,
            quiet=quiet,
            include_runtime=with_runtime,
            debug=debug,
        )
    else:
        emit_and_exit(
            payload=built,
            fmt=target_format,
            effective_pretty=debug or pretty,
            verbose=verbose,
            debug=debug,
            quiet=quiet,
            command=command_name,
            exit_code=exit_code,
        )
def emit_and_exit(
    payload: Mapping[str, Any],
    fmt: OutputFormat,
    effective_pretty: bool,
    verbose: bool,
    debug: bool,
    quiet: bool,
    command: str,
    *,
    exit_code: int = 0,
) -> NoReturn:
    """Records the command in history, prints the payload, and exits.

    History is written only when `quiet` is False and the command name does
    not start with "history". A failure while recording is reported on
    stderr and does not stop the emission.

    Args:
        payload (Mapping[str, Any]): The data to serialize and print.
        fmt (OutputFormat): JSON selects `json.dumps`; any other value is
            serialized with `yaml.safe_dump`.
        effective_pretty (bool): If True, uses 2-space indented output.
        verbose (bool): Not read by this function.
        debug (bool): If True, prints a diagnostic line to stderr first.
        quiet (bool): If True, exits without printing anything.
        command (str): The command name stored in the history entry.
        exit_code (int): The process exit status.

    Raises:
        SystemExit: Always, with `exit_code`.
    """
    skip_history = quiet or command.startswith("history")
    if not skip_history:
        try:
            from bijux_cli.contracts import HistoryProtocol
            from bijux_cli.core.di import DIContainer

            history = DIContainer.current().resolve(HistoryProtocol)
            history.add(
                command=command,
                params=[],
                success=(exit_code == 0),
                return_code=exit_code,
                duration_ms=0.0,
            )
        except Exception as exc:
            # Best effort: classify the failure, warn once, keep going.
            import errno as _errno

            if isinstance(exc, PermissionError):
                warning = f"Permission denied writing history: {exc}"
            elif not isinstance(exc, OSError):
                warning = f"Error writing history: {exc}"
            elif exc.errno in (_errno.EACCES, _errno.EPERM):
                warning = f"Permission denied writing history: {exc}"
            elif exc.errno in (_errno.ENOSPC, _errno.EDQUOT):
                warning = f"No space left on device while writing history: {exc}"
            else:
                warning = f"Error writing history: {exc}"
            print(warning, file=sys.stderr)

    if quiet:
        sys.exit(exit_code)

    if debug:
        print("Diagnostics: emitted payload", file=sys.stderr)

    if fmt == OutputFormat.JSON:
        if effective_pretty:
            rendered = json.dumps(payload, indent=2, separators=(", ", ": "))
        else:
            rendered = json.dumps(payload, indent=None, separators=(",", ":"))
    else:
        rendered = yaml.safe_dump(
            payload,
            indent=2 if effective_pretty else None,
            sort_keys=False,
            default_flow_style=None if effective_pretty else True,
        )

    # print() supplies the single trailing newline.
    print(rendered.rstrip("\n"))
    sys.exit(exit_code)
def emit_error_and_exit(
    message: str,
    code: int,
    failure: str,
    command: str | None = None,
    fmt: str | None = None,
    quiet: bool = False,
    include_runtime: bool = False,
    debug: bool = False,
    extra: dict[str, Any] | None = None,
) -> NoReturn:
    """Emits a structured JSON error payload to stderr and exits the process.

    The payload always contains "error" and "code". "failure", "command" and
    "fmt" are added when truthy, then `extra` is merged in (and may override
    earlier keys), then the runtime fields are added when requested.

    Args:
        message (str): The primary error message.
        code (int): The exit status code.
        failure (str): A short, machine-readable failure code.
        command (str | None): The command name where the error occurred.
        fmt (str | None): The output format context.
        quiet (bool): If True, suppresses all output and exits immediately.
        include_runtime (bool): If True, adds "python", "platform" and
            "timestamp" fields to the payload.
        debug (bool): If True and an exception is currently being handled,
            prints its traceback to stderr before the payload.
        extra (dict[str, Any] | None): Additional fields to merge into the
            payload.

    Raises:
        SystemExit: Always exits the process with the specified `code`.
    """
    if quiet:
        sys.exit(code)

    # Only print a traceback when there is one. Outside an `except` block
    # `traceback.print_exc` would write a meaningless "NoneType: None" line
    # ahead of the JSON payload.
    if debug and sys.exc_info()[0] is not None:
        import traceback

        traceback.print_exc(file=sys.stderr)

    error_payload: dict[str, Any] = {"error": message, "code": code}
    if failure:
        error_payload["failure"] = failure
    if command:
        error_payload["command"] = command
    if fmt:
        error_payload["fmt"] = fmt
    if extra:
        error_payload.update(extra)
    if include_runtime:
        error_payload["python"] = ascii_safe(sys.version.split()[0], "python_version")
        error_payload["platform"] = ascii_safe(platform.platform(), "platform")
        error_payload["timestamp"] = str(time.time())

    try:
        output = json.dumps(error_payload).rstrip("\n")
        print(output, file=sys.stderr, flush=True)
    except Exception:
        # Last resort: still emit valid JSON so consumers can parse stderr.
        print('{"error": "Unserializable error"}', file=sys.stderr, flush=True)
    sys.exit(code)
def parse_global_flags() -> dict[str, Any]:
    """Extracts global CLI flags from `sys.argv` ahead of Typer dispatch.

    Recognized global flags are consumed and recorded; everything else is
    kept. `sys.argv` is then rewritten to the program name followed by the
    kept arguments. Help flags are recorded and also kept. Once a help flag
    has been seen, a later `--format` pair is kept too (flag name without
    dashes, then its value) and its value is not validated.

    Returns:
        dict[str, Any]: The keys `help`, `quiet`, `debug`, `verbose`,
            `format` (default "json") and `pretty` (default True).

    Raises:
        SystemExit: With code 2 when `--format` has no value, or when its
            value is not "json"/"yaml" and no help flag preceded it.
    """
    pending = sys.argv[1:]
    flags: dict[str, Any] = {
        "help": False,
        "quiet": False,
        "debug": False,
        "verbose": False,
        "format": "json",
        "pretty": True,
    }
    kept: list[str] = []

    def _bail(msg: str, failure: str) -> NoReturn:
        """Reports a parsing error using the flags seen so far; exits with 2.

        Args:
            msg (str): The error message to report.
            failure (str): A short failure code (e.g., "missing_argument").

        Raises:
            SystemExit: Always exits the process.
        """
        emit_error_and_exit(
            msg,
            code=2,
            failure=failure,
            command="global",
            fmt=flags["format"],
            quiet=flags["quiet"],
            include_runtime=flags["verbose"],
            debug=flags["debug"],
        )

    # Value-less switches and the flag values each one sets.
    switches: dict[str, dict[str, bool]] = {
        "-q": {"quiet": True},
        "--quiet": {"quiet": True},
        "--debug": {"debug": True, "verbose": True, "pretty": True},
        "-v": {"verbose": True},
        "--verbose": {"verbose": True},
        "--pretty": {"pretty": True},
        "--no-pretty": {"pretty": False},
    }

    pos = 0
    total = len(pending)
    while pos < total:
        tok = pending[pos]
        pos += 1

        if tok in switches:
            flags.update(switches[tok])
            continue

        if tok in ("-h", "--help"):
            flags["help"] = True
            kept.append(tok)
            continue

        if tok not in ("-f", "--format"):
            kept.append(tok)
            continue

        # `pos` now points at the value expected after -f / --format.
        if pos >= total:
            _bail("Missing argument for --format", "missing_argument")
        else:
            raw_value = pending[pos]
            value = raw_value.lower()
            flags["format"] = value
            if flags["help"]:
                kept.append(tok.lstrip("-"))
                kept.append(raw_value)
            elif value not in ("json", "yaml"):
                _bail(f"Unsupported format: {value}", "invalid_format")
            pos += 1

    if flags["help"]:
        # In help mode, unknown dash-prefixed arguments lose their dashes.
        kept = [
            arg.lstrip("-") if arg.startswith("-") and arg not in KNOWN else arg
            for arg in kept
        ]

    sys.argv = [sys.argv[0], *kept]
    return flags
def list_installed_plugins() -> list[str]:
    """Scans the plugins directory and returns a list of installed plugin names.

    An entry counts as a plugin when, after resolving symlinks, it is a
    directory directly inside the plugins directory that contains a
    `plugin.py` file. Entries that cannot be inspected are skipped.

    Returns:
        list[str]: A sorted list of valid plugin names; empty when the
            plugins directory does not exist.

    Raises:
        RuntimeError: If the plugins directory is invalid, inaccessible,
            is not a directory, cannot be listed, or contains a symlink loop.
    """
    plugins_dir = get_plugins_dir()

    try:
        resolved = plugins_dir.resolve(strict=True)
    except FileNotFoundError:
        return []
    except RuntimeError as e:
        raise RuntimeError(f"Symlink loop detected at '{plugins_dir}'.") from e
    except Exception as exc:
        raise RuntimeError(
            f"Plugins directory '{plugins_dir}' invalid or inaccessible."
        ) from exc

    if not resolved.is_dir():
        raise RuntimeError(f"Plugins directory '{plugins_dir}' is not a directory.")

    # Listing can fail even after a successful resolve (e.g. no read
    # permission on the directory). Materialize the listing here so the
    # failure surfaces as the documented RuntimeError, not a raw OSError.
    try:
        entries = list(resolved.iterdir())
    except OSError as exc:
        raise RuntimeError(
            f"Plugins directory '{plugins_dir}' invalid or inaccessible."
        ) from exc

    plugins: list[str] = []
    for entry in entries:
        # Best effort per entry: one broken entry must not hide the others.
        with suppress(Exception):
            target = entry.resolve()
            if target.is_dir() and (target / "plugin.py").is_file():
                plugins.append(entry.name)

    plugins.sort()
    return plugins
def handle_list_plugins(
    command: str,
    quiet: bool,
    verbose: bool,
    fmt: str,
    pretty: bool,
    debug: bool,
) -> None:
    """Shared handler for commands that list the installed plugins.

    Validates the common flags, collects the plugin names, and hands a
    payload builder to `new_run_command` for emission. A `RuntimeError`
    from the plugin scan is reported as a structured error with exit code 1
    and failure code "dir_error".

    Args:
        command (str): The name of the command being executed.
        quiet (bool): If True, suppresses normal output.
        verbose (bool): If True, includes runtime metadata in the payload.
        fmt (str): The requested output format ("json" or "yaml").
        pretty (bool): If True, pretty-prints the output.
        debug (bool): If True, enables debug mode.

    Returns:
        None:
    """
    validated_fmt = validate_common_flags(fmt, command, quiet)

    try:
        found = list_installed_plugins()
    except RuntimeError as exc:
        emit_error_and_exit(
            str(exc),
            code=1,
            failure="dir_error",
            command=command,
            fmt=validated_fmt,
            quiet=quiet,
            include_runtime=verbose,
            debug=debug,
        )
    else:

        def _build_payload(include: bool) -> dict[str, object]:
            """Builds the payload describing the installed plugins.

            Args:
                include (bool): If True, adds Python and platform details.

            Returns:
                dict[str, object]: A "plugins" list plus, when requested,
                    "python" and "platform" entries.
            """
            if not include:
                return {"plugins": found}
            return {
                "plugins": found,
                "python": ascii_safe(platform.python_version(), "python_version"),
                "platform": ascii_safe(platform.platform(), "platform"),
            }

        new_run_command(
            command_name=command,
            payload_builder=_build_payload,
            quiet=quiet,
            verbose=verbose,
            fmt=validated_fmt,
            pretty=pretty,
            debug=debug,
        )
# Names exported by `from ... import *`; one entry per public helper
# defined in this module.
__all__ = [
    "handle_list_plugins",
    "list_installed_plugins",
    "parse_global_flags",
    "emit_error_and_exit",
    "emit_and_exit",
    "new_run_command",
    "validate_env_file_if_present",
    "validate_common_flags",
    "contains_non_ascii_env",
    "normalize_format",
    "ascii_safe",
]