Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/commands/utilities.py: 100%

209 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-19 23:36 +0000

1# SPDX-License-Identifier: MIT 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Provides shared, reusable utilities for Bijux CLI commands. 

5 

6This module centralizes common logic to ensure consistency and reduce code 

7duplication across the various command implementations. It includes a suite of 

8functions for handling standard CLI tasks, such as: 

9 

10* **Validation:** Functions for validating common CLI flags (like `--format`) 

11 and checking the environment for non-ASCII characters or malformed 

12 configuration files. 

13* **Output & Exit:** A set of high-level emitters (`emit_and_exit`, 

14 `emit_error_and_exit`) that handle payload serialization (JSON/YAML), 

15 pretty-printing, and terminating the application with a contract-compliant 

16 exit code and structured message. 

17* **Command Orchestration:** A primary helper (`new_run_command`) that 

18 encapsulates the standard lifecycle of a command: validation, payload 

19 construction, and emission. 

20* **Parsing & Sanitization:** Helpers for sanitizing strings to be ASCII-safe 

21 and a pre-parser for global flags (`--quiet`, `--debug`, etc.) that 

22 operates before Typer's main dispatch. 

23* **Plugin Management:** Utilities for discovering and listing installed 

24 plugins from the filesystem. 

25""" 

26 

27from __future__ import annotations 

28 

29from collections.abc import Callable, Mapping 

30from contextlib import suppress 

31import json 

32import os 

33from pathlib import Path 

34import platform 

35import re 

36import sys 

37import time 

38from typing import Any, NoReturn 

39 

40import yaml 

41 

42from bijux_cli.core.enums import OutputFormat 

43from bijux_cli.services.plugins import get_plugins_dir 

44 

45_ALLOWED_CTRL = {"\n", "\r", "\t"} 

46_ENV_LINE_RX = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*=[A-Za-z0-9_./\-]*$") 

47KNOWN = { 

48 "-h", 

49 "--help", 

50 "-q", 

51 "--quiet", 

52 "--debug", 

53 "-v", 

54 "--verbose", 

55 "-f", 

56 "--format", 

57 "--pretty", 

58 "--no-pretty", 

59} 

60 

61 

62def ascii_safe(text: Any, _field: str = "") -> str: 

63 """Converts any value to a string containing only printable ASCII characters. 

64 

65 Non-ASCII characters are replaced with '?'. Newlines, carriage returns, 

66 and tabs are preserved. 

67 

68 Args: 

69 text (Any): The value to sanitize. 

70 _field (str, optional): An unused parameter for potential future use 

71 in context or telemetry. Defaults to "". 

72 

73 Returns: 

74 str: An ASCII-safe string. 

75 """ 

76 text_str = text if isinstance(text, str) else str(text) 

77 

78 return "".join( 

79 ch if (32 <= ord(ch) <= 126) or ch in _ALLOWED_CTRL else "?" for ch in text_str 

80 ) 

81 

82 

83def normalize_format(fmt: str | None) -> str: 

84 """Normalizes a format string to lowercase and removes whitespace. 

85 

86 Args: 

87 fmt (str | None): The format string to normalize. 

88 

89 Returns: 

90 str: The normalized format string, or an empty string if input is None. 

91 """ 

92 return (fmt or "").strip().lower() 

93 

94 

95def contains_non_ascii_env() -> bool: 

96 """Checks for non-ASCII characters in the CLI's environment. 

97 

98 This function returns True if any of the following are detected: 

99 * The `BIJUXCLI_CONFIG` environment variable contains non-ASCII characters. 

100 * The file path pointed to by `BIJUXCLI_CONFIG` exists and its contents 

101 cannot be decoded as ASCII. 

102 * Any environment variable with a name starting with `BIJUXCLI_` has a 

103 value containing non-ASCII characters. 

104 

105 Returns: 

106 bool: True if a non-ASCII condition is found, otherwise False. 

107 """ 

108 config_path_str = os.environ.get("BIJUXCLI_CONFIG") 

109 if config_path_str: 

110 if not config_path_str.isascii(): 

111 return True 

112 config_path = Path(config_path_str) 

113 if config_path.exists(): 

114 try: 

115 config_path.read_text(encoding="ascii") 

116 except UnicodeDecodeError: 

117 return True 

118 except (IsADirectoryError, PermissionError, FileNotFoundError, OSError): 

119 pass 

120 

121 for k, v in os.environ.items(): 

122 if k.startswith("BIJUXCLI_") and not v.isascii(): 

123 return True 

124 return False 

125 

126 

127def validate_common_flags( 

128 fmt: str, 

129 command: str, 

130 quiet: bool, 

131 include_runtime: bool = False, 

132) -> str: 

133 """Validates common CLI flags and environment settings. 

134 

135 This function ensures the format is supported and the environment is 

136 ASCII-safe, exiting with a structured error if validation fails. 

137 

138 Args: 

139 fmt (str): The requested output format. 

140 command (str): The name of the command for error reporting context. 

141 quiet (bool): If True, suppresses output on error before exiting. 

142 include_runtime (bool): If True, includes runtime info in error payloads. 

143 

144 Returns: 

145 str: The validated and normalized format string ("json" or "yaml"). 

146 

147 Raises: 

148 SystemExit: Exits with code 2 for an unsupported format or 3 for 

149 a non-ASCII environment. 

150 """ 

151 format_lower = (fmt or "").lower() 

152 if format_lower not in ("json", "yaml"): 

153 emit_error_and_exit( 

154 f"Unsupported format: {fmt}", 

155 code=2, 

156 failure="format", 

157 command=command, 

158 fmt=format_lower or "json", 

159 quiet=quiet, 

160 include_runtime=include_runtime, 

161 debug=False, 

162 ) 

163 

164 if contains_non_ascii_env(): 

165 emit_error_and_exit( 

166 "Non-ASCII in configuration or environment", 

167 code=3, 

168 failure="ascii", 

169 command=command, 

170 fmt=format_lower, 

171 quiet=quiet, 

172 include_runtime=include_runtime, 

173 debug=False, 

174 ) 

175 

176 return format_lower 

177 

178 

179def validate_env_file_if_present(path_str: str) -> None: 

180 """Validates the syntax of an environment configuration file if it exists. 

181 

182 Checks that every non-comment, non-blank line conforms to a `KEY=VALUE` 

183 pattern. 

184 

185 Args: 

186 path_str (str): The path to the environment file. 

187 

188 Raises: 

189 ValueError: If the file cannot be read or contains a malformed line. 

190 """ 

191 if not path_str or not Path(path_str).exists(): 

192 return 

193 try: 

194 text = Path(path_str).read_text(encoding="utf-8", errors="strict") 

195 except Exception as exc: 

196 raise ValueError(f"Cannot read config file: {exc}") from exc 

197 

198 for i, line in enumerate(text.splitlines(), start=1): 

199 s = line.strip() 

200 if s and not s.startswith("#") and not _ENV_LINE_RX.match(s): 

201 raise ValueError(f"Malformed line {i} in config: {line!r}") 

202 

203 

204def new_run_command( 

205 command_name: str, 

206 payload_builder: Callable[[bool], Mapping[str, object]], 

207 quiet: bool, 

208 verbose: bool, 

209 fmt: str, 

210 pretty: bool, 

211 debug: bool, 

212 exit_code: int = 0, 

213) -> NoReturn: 

214 """Orchestrates the standard execution flow of a CLI command. 

215 

216 This function handles dependency resolution, validation, payload 

217 construction, and final emission, ensuring a consistent lifecycle for all 

218 commands that use it. 

219 

220 Args: 

221 command_name (str): The name of the command for telemetry/error context. 

222 payload_builder: A function that takes a boolean `include_runtime` and 

223 returns the command's structured output payload. 

224 quiet (bool): If True, suppresses normal output. 

225 verbose (bool): If True, includes runtime metadata in the output. 

226 fmt (str): The output format ("json" or "yaml"). 

227 pretty (bool): If True, pretty-prints the output. 

228 debug (bool): If True, enables debug-level output. 

229 exit_code (int): The exit code to use on successful execution. 

230 

231 Raises: 

232 SystemExit: Always exits the process with the given `exit_code` or an 

233 appropriate error code on failure. 

234 """ 

235 from bijux_cli.contracts import EmitterProtocol, TelemetryProtocol 

236 from bijux_cli.core.di import DIContainer 

237 

238 DIContainer.current().resolve(EmitterProtocol) 

239 DIContainer.current().resolve(TelemetryProtocol) 

240 

241 include_runtime = verbose or debug 

242 

243 format_lower = validate_common_flags( 

244 fmt, 

245 command_name, 

246 quiet, 

247 include_runtime=include_runtime, 

248 ) 

249 

250 output_format = OutputFormat.YAML if format_lower == "yaml" else OutputFormat.JSON 

251 effective_pretty = debug or pretty 

252 

253 try: 

254 payload = payload_builder(include_runtime) 

255 except ValueError as exc: 

256 emit_error_and_exit( 

257 str(exc), 

258 code=3, 

259 failure="ascii", 

260 command=command_name, 

261 fmt=output_format, 

262 quiet=quiet, 

263 include_runtime=include_runtime, 

264 debug=debug, 

265 ) 

266 else: 

267 emit_and_exit( 

268 payload=payload, 

269 fmt=output_format, 

270 effective_pretty=effective_pretty, 

271 verbose=verbose, 

272 debug=debug, 

273 quiet=quiet, 

274 command=command_name, 

275 exit_code=exit_code, 

276 ) 

277 

278 

279def emit_and_exit( 

280 payload: Mapping[str, Any], 

281 fmt: OutputFormat, 

282 effective_pretty: bool, 

283 verbose: bool, 

284 debug: bool, 

285 quiet: bool, 

286 command: str, 

287 *, 

288 exit_code: int = 0, 

289) -> NoReturn: 

290 """Serializes and emits a payload, records history, and exits. 

291 

292 Args: 

293 payload (Mapping[str, Any]): The data to serialize and print. 

294 fmt (OutputFormat): The output format (JSON or YAML). 

295 effective_pretty (bool): If True, pretty-prints the output. 

296 verbose (bool): If True, includes runtime info in history records. 

297 debug (bool): If True, emits a diagnostic message to stderr. 

298 quiet (bool): If True, suppresses all output and exits immediately. 

299 command (str): The command name, used for history tracking. 

300 exit_code (int): The exit status code to use. 

301 

302 Raises: 

303 SystemExit: Always exits the process with `exit_code`. 

304 """ 

305 if (not quiet) and (not command.startswith("history")): 

306 try: 

307 from bijux_cli.contracts import HistoryProtocol 

308 from bijux_cli.core.di import DIContainer 

309 

310 hist = DIContainer.current().resolve(HistoryProtocol) 

311 hist.add( 

312 command=command, 

313 params=[], 

314 success=(exit_code == 0), 

315 return_code=exit_code, 

316 duration_ms=0.0, 

317 ) 

318 except PermissionError as exc: 

319 print(f"Permission denied writing history: {exc}", file=sys.stderr) 

320 except OSError as exc: 

321 import errno as _errno 

322 

323 if exc.errno in (_errno.EACCES, _errno.EPERM): 

324 print(f"Permission denied writing history: {exc}", file=sys.stderr) 

325 elif exc.errno in (_errno.ENOSPC, _errno.EDQUOT): 

326 print( 

327 f"No space left on device while writing history: {exc}", 

328 file=sys.stderr, 

329 ) 

330 else: 

331 print(f"Error writing history: {exc}", file=sys.stderr) 

332 except Exception as exc: 

333 print(f"Error writing history: {exc}", file=sys.stderr) 

334 

335 if quiet: 

336 sys.exit(exit_code) 

337 

338 if debug: 

339 print("Diagnostics: emitted payload", file=sys.stderr) 

340 

341 indent = 2 if effective_pretty else None 

342 if fmt == OutputFormat.JSON: 

343 separators = (", ", ": ") if effective_pretty else (",", ":") 

344 output = json.dumps(payload, indent=indent, separators=separators) 

345 else: 

346 default_flow_style = None if effective_pretty else True 

347 output = yaml.safe_dump( 

348 payload, 

349 indent=indent, 

350 sort_keys=False, 

351 default_flow_style=default_flow_style, 

352 ) 

353 cleaned = output.rstrip("\n") 

354 print(cleaned) 

355 sys.exit(exit_code) 

356 

357 

358def emit_error_and_exit( 

359 message: str, 

360 code: int, 

361 failure: str, 

362 command: str | None = None, 

363 fmt: str | None = None, 

364 quiet: bool = False, 

365 include_runtime: bool = False, 

366 debug: bool = False, 

367 extra: dict[str, Any] | None = None, 

368) -> NoReturn: 

369 """Emits a structured error payload to stderr and exits the process. 

370 

371 Args: 

372 message (str): The primary error message. 

373 code (int): The exit status code. 

374 failure (str): A short, machine-readable failure code. 

375 command (str | None): The command name where the error occurred. 

376 fmt (str | None): The output format context. 

377 quiet (bool): If True, suppresses all output and exits immediately. 

378 include_runtime (bool): If True, adds runtime info to the error payload. 

379 debug (bool): If True, prints a full traceback to stderr. 

380 extra (dict[str, Any] | None): Additional fields to merge into the payload. 

381 

382 Raises: 

383 SystemExit: Always exits the process with the specified `code`. 

384 """ 

385 if quiet: 

386 sys.exit(code) 

387 

388 if debug: 

389 import traceback 

390 

391 traceback.print_exc(file=sys.stderr) 

392 

393 error_payload = {"error": message, "code": code} 

394 if failure: 

395 error_payload["failure"] = failure 

396 if command: 

397 error_payload["command"] = command 

398 if fmt: 

399 error_payload["fmt"] = fmt 

400 if extra: 

401 error_payload.update(extra) 

402 if include_runtime: 

403 error_payload["python"] = ascii_safe(sys.version.split()[0], "python_version") 

404 error_payload["platform"] = ascii_safe(platform.platform(), "platform") 

405 error_payload["timestamp"] = str(time.time()) 

406 

407 try: 

408 output = json.dumps(error_payload).rstrip("\n") 

409 print(output, file=sys.stderr, flush=True) 

410 except Exception: 

411 print('{"error": "Unserializable error"}', file=sys.stderr, flush=True) 

412 sys.exit(code) 

413 

414 

415def parse_global_flags() -> dict[str, Any]: 

416 """Parses global CLI flags from `sys.argv` before Typer dispatch. 

417 

418 This function inspects and consumes known global flags, rewriting `sys.argv` 

419 to contain only the remaining arguments. This allows global settings to be 

420 processed independently of the command-specific parsing done by Typer. 

421 

422 Returns: 

423 dict[str, Any]: A dictionary of parsed flag values, such as `help`, 

424 `quiet`, `debug`, `verbose`, `format`, and `pretty`. 

425 

426 Raises: 

427 SystemExit: If a flag requires an argument that is missing (e.g., 

428 `--format` with no value). 

429 """ 

430 argv = sys.argv[1:] 

431 flags: dict[str, Any] = { 

432 "help": False, 

433 "quiet": False, 

434 "debug": False, 

435 "verbose": False, 

436 "format": "json", 

437 "pretty": True, 

438 } 

439 retained: list[str] = [] 

440 

441 def _bail(msg: str, failure: str) -> NoReturn: 

442 """Emits a standardized error and exits with code 2. 

443 

444 Args: 

445 msg (str): The error message to report. 

446 failure (str): A short failure code (e.g., "missing_argument"). 

447 

448 Raises: 

449 SystemExit: Always exits the process. 

450 """ 

451 emit_error_and_exit( 

452 msg, 

453 code=2, 

454 failure=failure, 

455 command="global", 

456 fmt=flags["format"], 

457 quiet=flags["quiet"], 

458 include_runtime=flags["verbose"], 

459 debug=flags["debug"], 

460 ) 

461 

462 i = 0 

463 while i < len(argv): 

464 tok = argv[i] 

465 

466 if tok in ("-h", "--help"): 

467 flags["help"] = True 

468 retained.append(tok) 

469 i += 1 

470 elif tok in ("-q", "--quiet"): 

471 flags["quiet"] = True 

472 i += 1 

473 elif tok == "--debug": 

474 flags["debug"] = True 

475 flags["verbose"] = True 

476 flags["pretty"] = True 

477 i += 1 

478 elif tok in ("-v", "--verbose"): 

479 flags["verbose"] = True 

480 i += 1 

481 elif tok == "--pretty": 

482 flags["pretty"] = True 

483 i += 1 

484 elif tok == "--no-pretty": 

485 flags["pretty"] = False 

486 i += 1 

487 elif tok in ("-f", "--format"): 

488 i += 1 

489 if i >= len(argv): 

490 _bail("Missing argument for --format", "missing_argument") 

491 else: 

492 value = argv[i].lower() 

493 flags["format"] = value 

494 if flags["help"]: 

495 retained.append(tok.lstrip("-")) 

496 retained.append(argv[i]) 

497 if not flags["help"] and value not in ("json", "yaml"): 

498 _bail(f"Unsupported format: {value}", "invalid_format") 

499 i += 1 

500 else: 

501 retained.append(tok) 

502 i += 1 

503 

504 if flags["help"]: 

505 retained = [ 

506 arg.lstrip("-") if arg.startswith("-") and arg not in KNOWN else arg 

507 for arg in retained 

508 ] 

509 

510 sys.argv = [sys.argv[0], *retained] 

511 return flags 

512 

513 

514def list_installed_plugins() -> list[str]: 

515 """Scans the plugins directory and returns a list of installed plugin names. 

516 

517 A directory is considered a valid plugin if it is a direct child of the 

518 plugins directory and contains a `plugin.py` file. 

519 

520 Returns: 

521 list[str]: A sorted list of valid plugin names. 

522 

523 Raises: 

524 RuntimeError: If the plugins directory is invalid, inaccessible, 

525 is not a directory, or contains a symlink loop. 

526 """ 

527 plugins_dir = get_plugins_dir() 

528 

529 try: 

530 resolved = plugins_dir.resolve(strict=True) 

531 except FileNotFoundError: 

532 return [] 

533 except RuntimeError as e: 

534 raise RuntimeError(f"Symlink loop detected at '{plugins_dir}'.") from e 

535 except Exception as exc: 

536 raise RuntimeError( 

537 f"Plugins directory '{plugins_dir}' invalid or inaccessible." 

538 ) from exc 

539 

540 if not resolved.is_dir(): 

541 raise RuntimeError(f"Plugins directory '{plugins_dir}' is not a directory.") 

542 

543 plugins: list[str] = [] 

544 for entry in resolved.iterdir(): 

545 with suppress(Exception): 

546 p = entry.resolve() 

547 if p.is_dir() and (p / "plugin.py").is_file(): 

548 plugins.append(entry.name) 

549 

550 plugins.sort() 

551 return plugins 

552 

553 

554def handle_list_plugins( 

555 command: str, 

556 quiet: bool, 

557 verbose: bool, 

558 fmt: str, 

559 pretty: bool, 

560 debug: bool, 

561) -> None: 

562 """Handles the logic for commands that list installed plugins. 

563 

564 This function serves as a common handler for `plugins list` and similar 

565 commands. It retrieves the list of plugins and uses `new_run_command` 

566 to emit the result. 

567 

568 Args: 

569 command (str): The name of the command being executed. 

570 quiet (bool): If True, suppresses normal output. 

571 verbose (bool): If True, includes runtime metadata in the payload. 

572 fmt (str): The requested output format ("json" or "yaml"). 

573 pretty (bool): If True, pretty-prints the output. 

574 debug (bool): If True, enables debug mode. 

575 

576 Returns: 

577 None: 

578 """ 

579 format_lower = validate_common_flags(fmt, command, quiet) 

580 

581 try: 

582 plugins = list_installed_plugins() 

583 except RuntimeError as exc: 

584 emit_error_and_exit( 

585 str(exc), 

586 code=1, 

587 failure="dir_error", 

588 command=command, 

589 fmt=format_lower, 

590 quiet=quiet, 

591 include_runtime=verbose, 

592 debug=debug, 

593 ) 

594 else: 

595 

596 def _build_payload(include: bool) -> dict[str, object]: 

597 """Constructs a payload describing installed plugins. 

598 

599 Args: 

600 include (bool): If True, includes Python/platform info. 

601 

602 Returns: 

603 dict[str, object]: A dictionary containing a "plugins" list 

604 and optional runtime metadata. 

605 """ 

606 payload: dict[str, object] = {"plugins": plugins} 

607 if include: 

608 payload["python"] = ascii_safe( 

609 platform.python_version(), "python_version" 

610 ) 

611 payload["platform"] = ascii_safe(platform.platform(), "platform") 

612 return payload 

613 

614 new_run_command( 

615 command_name=command, 

616 payload_builder=_build_payload, 

617 quiet=quiet, 

618 verbose=verbose, 

619 fmt=format_lower, 

620 pretty=pretty, 

621 debug=debug, 

622 ) 

623 

624 

625__all__ = [ 

626 "handle_list_plugins", 

627 "list_installed_plugins", 

628 "parse_global_flags", 

629 "emit_error_and_exit", 

630 "emit_and_exit", 

631 "new_run_command", 

632 "validate_env_file_if_present", 

633 "validate_common_flags", 

634 "contains_non_ascii_env", 

635 "normalize_format", 

636 "ascii_safe", 

637]