Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/api.py: 94%

158 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-19 23:36 +0000

1# SPDX-License-Identifier: MIT 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Provides a high-level, synchronous facade for the Bijux CLI's core engine. 

5 

6This module defines the `BijuxAPI` class, which serves as the primary public 

7interface for programmatic interaction with the CLI. It wraps the asynchronous 

8core `Engine` and other services to present a stable, thread-safe, and 

9synchronous API. 

10 

11This facade is intended for use in integrations, testing, or any scenario 

12where the CLI's command and plugin management logic needs to be embedded 

13within another Python application. 

14""" 

15 

16from __future__ import annotations 

17 

18import asyncio 

19from collections.abc import Callable 

20from contextlib import suppress 

21import importlib 

22import inspect 

23import os 

24from pathlib import Path 

25import sys 

26from typing import Any, cast 

27 

28from bijux_cli.commands.utilities import validate_common_flags 

29from bijux_cli.contracts import ( 

30 ObservabilityProtocol, 

31 RegistryProtocol, 

32 TelemetryProtocol, 

33) 

34from bijux_cli.core.di import DIContainer 

35from bijux_cli.core.engine import Engine 

36from bijux_cli.core.enums import OutputFormat 

37from bijux_cli.core.exceptions import BijuxError, CommandError, ServiceError 

38 

39IGNORE = {"PS1", "LS_COLORS", "PROMPT_COMMAND", "GIT_PS1_FORMAT"} 

40 

41 

42def _consume_task(task: asyncio.Future[Any]) -> None: 

43 """Consumes an asyncio task to suppress unhandled exceptions.""" 

44 

45 def _eat_exc(t: asyncio.Future[Any]) -> None: 

46 """Retrieves and suppresses exceptions from a future.""" 

47 with suppress(Exception): 

48 _ = t.exception() 

49 

50 task.add_done_callback(_eat_exc) 

51 

52 

53class BijuxAPI: 

54 """A thread-safe, synchronous access layer for the Bijux CLI engine. 

55 

56 This class provides a stable public API for registering commands, executing 

57 them, and managing plugins. It wraps the internal asynchronous `Engine` to 

58 allow for simpler, synchronous integration into other applications. 

59 

60 Attributes: 

61 _di (DIContainer): The dependency injection container. 

62 _engine (Engine): The core asynchronous runtime engine. 

63 _registry (RegistryProtocol): The plugin registry service. 

64 _obs (ObservabilityProtocol): The logging service. 

65 _tel (TelemetryProtocol): The telemetry service. 

66 """ 

67 

68 def __init__(self, *, debug: bool = False) -> None: 

69 """Initializes the `BijuxAPI` and the underlying CLI engine. 

70 

71 Args: 

72 debug (bool): If True, enables debug mode for all underlying 

73 services. Defaults to False. 

74 """ 

75 DIContainer.reset() 

76 self._di = DIContainer.current() 

77 self._engine = Engine( 

78 self._di, 

79 debug=debug, 

80 fmt=OutputFormat.JSON, 

81 ) 

82 self._registry: RegistryProtocol = self._di.resolve(RegistryProtocol) 

83 self._obs: ObservabilityProtocol = self._di.resolve(ObservabilityProtocol) 

84 self._tel: TelemetryProtocol = self._di.resolve(TelemetryProtocol) 

85 

86 def _schedule_event(self, name: str, payload: dict[str, Any]) -> None: 

87 """Schedules a "fire-and-forget" asynchronous telemetry event. 

88 

89 This helper handles the execution of async telemetry calls from a 

90 synchronous context. 

91 

92 Args: 

93 name (str): The name of the telemetry event. 

94 payload (dict[str, Any]): The data associated with the event. 

95 """ 

96 maybe = self._tel.event(name, payload) 

97 if not inspect.isawaitable(maybe): 

98 return 

99 try: 

100 loop = asyncio.get_running_loop() 

101 except RuntimeError: 

102 asyncio.run(maybe) 

103 else: 

104 if hasattr(loop, "create_task"): 

105 loop.create_task(maybe) 

106 else: 

107 asyncio.run(maybe) 

108 

109 def register(self, name: str, callback: Callable[..., Any]) -> None: 

110 """Registers or replaces a Python callable as a CLI command. 

111 

112 The provided callable is wrapped to handle both synchronous and 

113 asynchronous functions automatically. 

114 

115 Args: 

116 name (str): The command name to register. 

117 callback (Callable[..., Any]): The Python function to be executed 

118 when the command is run. 

119 

120 Raises: 

121 BijuxError: If the command name is already in use or another 

122 registration error occurs. 

123 """ 

124 

125 class _Wrapper: 

126 """Wraps a user-provided callable to be executed asynchronously.""" 

127 

128 def __init__(self, cb: Callable[..., Any]) -> None: 

129 """Initializes the wrapper. 

130 

131 Args: 

132 cb (Callable[..., Any]): The callable to wrap. 

133 """ 

134 self._cb = cb 

135 

136 async def execute(self, *args: Any, **kwargs: Any) -> Any: 

137 """Execute the wrapped callable, awaiting if it's a coroutine. 

138 

139 Args: 

140 *args (Any): Positional arguments to pass to the callable. 

141 **kwargs (Any): Keyword arguments to pass to the callable. 

142 

143 Returns: 

144 Any: The result of the callable execution. 

145 """ 

146 if asyncio.iscoroutinefunction(self._cb): 

147 return await self._cb(*args, **kwargs) 

148 return self._cb(*args, **kwargs) 

149 

150 try: 

151 exists = bool(self._await_maybe(self._registry.has(name), want_result=True)) 

152 if exists: 

153 maybe = cast(Any, self._registry.deregister(name)) 

154 self._await_maybe(maybe) 

155 maybe2 = cast(Any, self._registry.register(name, _Wrapper(callback))) 

156 self._await_maybe(maybe2) 

157 

158 self._obs.log("info", "Registered command", extra={"name": name}) 

159 self._schedule_event("api.register", {"name": name}) 

160 except ServiceError as exc: 

161 self._schedule_event( 

162 "api.register.error", {"name": name, "error": str(exc)} 

163 ) 

164 raise BijuxError( 

165 f"Could not register command {name}: {exc}", http_status=500 

166 ) from exc 

167 

168 def run_sync( 

169 self, 

170 name: str, 

171 *args: Any, 

172 quiet: bool = False, 

173 verbose: bool = False, 

174 fmt: str = "json", 

175 pretty: bool = True, 

176 debug: bool = False, 

177 **kwargs: Any, 

178 ) -> Any: 

179 """Runs a command synchronously. 

180 

181 This method is a blocking wrapper around the asynchronous `run_async` 

182 method. It manages the asyncio event loop to provide a simple, 

183 synchronous interface. 

184 

185 Args: 

186 name (str): The name of the command to run. 

187 *args (Any): Positional arguments for the command. 

188 quiet (bool): If True, suppresses output. 

189 verbose (bool): If True, enables verbose logging. 

190 fmt (str): The output format ("json" or "yaml"). 

191 pretty (bool): If True, formats the output for readability. 

192 debug (bool): If True, enables debug mode. 

193 **kwargs (Any): Additional keyword arguments to pass to the command. 

194 

195 Returns: 

196 Any: The result of the command's execution. 

197 """ 

198 try: 

199 loop = asyncio.get_running_loop() 

200 except RuntimeError: 

201 return asyncio.run( 

202 self.run_async( 

203 name, 

204 *args, 

205 quiet=quiet, 

206 verbose=verbose, 

207 fmt=fmt, 

208 pretty=pretty, 

209 debug=debug, 

210 **kwargs, 

211 ) 

212 ) 

213 else: 

214 if not hasattr(loop, "run_until_complete"): 

215 raise RuntimeError("Cannot call run_sync from a running event loop") 

216 coro = self.run_async( 

217 name, 

218 *args, 

219 quiet=quiet, 

220 verbose=verbose, 

221 fmt=fmt, 

222 pretty=pretty, 

223 debug=debug, 

224 **kwargs, 

225 ) 

226 try: 

227 return loop.run_until_complete(coro) 

228 finally: 

229 with suppress(Exception): 

230 if hasattr(coro, "close"): 230 ↛ 229line 230 didn't jump to line 229

231 coro.close() 

232 

233 async def run_async( 

234 self, 

235 name: str, 

236 *args: Any, 

237 quiet: bool = False, 

238 verbose: bool = False, 

239 fmt: str = "json", 

240 pretty: bool = True, 

241 debug: bool = False, 

242 **kwargs: Any, 

243 ) -> Any: 

244 """Runs a command asynchronously with validation. 

245 

246 This method performs validation of flags and environment variables 

247 before dispatching the command to the internal engine for execution. 

248 

249 Args: 

250 name (str): The name of the command to execute. 

251 *args (Any): Positional arguments for the command. 

252 quiet (bool): If True, suppresses output. 

253 verbose (bool): If True, enables verbose output. 

254 fmt (str): The output format ("json" or "yaml"). 

255 pretty (bool): If True, formats the output for readability. 

256 debug (bool): If True, enables debug mode. 

257 **kwargs (Any): Additional keyword arguments to pass to the command. 

258 

259 Returns: 

260 Any: The result of the command's execution. 

261 

262 Raises: 

263 BijuxError: For invalid flags, unsupported formats, or internal 

264 execution errors. 

265 """ 

266 try: 

267 fmt = fmt.lower() 

268 if fmt not in ("json", "yaml"): 

269 raise BijuxError("Unsupported format", http_status=400) 

270 

271 if quiet and (verbose or debug): 

272 raise BijuxError( 

273 "--quiet cannot be combined with --verbose/--debug", http_status=400 

274 ) 

275 

276 validate_common_flags(fmt, name, quiet, verbose or debug) 

277 

278 for k, v in os.environ.items(): 

279 if k in IGNORE: 279 ↛ 280line 279 didn't jump to line 280 because the condition on line 279 was never true

280 continue 

281 if not v.isascii(): 

282 raise BijuxError( 

283 "Non-ASCII characters in environment", http_status=400 

284 ) 

285 

286 result = await self._engine.run_command(name, *args, **kwargs) 

287 self._schedule_event("api.run", {"name": name}) 

288 return result 

289 

290 except CommandError as exc: 

291 self._schedule_event("api.run.error", {"name": name, "error": str(exc)}) 

292 raise BijuxError( 

293 f"Failed to run command {name}: {exc}", http_status=500 

294 ) from exc 

295 

296 except ServiceError as exc: 

297 self._schedule_event("api.run.error", {"name": name, "error": str(exc)}) 

298 raise BijuxError( 

299 f"Failed to run command {name}: {exc}", http_status=500 

300 ) from exc 

301 

302 except BijuxError: 

303 raise 

304 

305 except Exception as exc: 

306 self._schedule_event("api.run.error", {"name": name, "error": str(exc)}) 

307 raise BijuxError( 

308 f"Failed to run command {name}: {exc}", http_status=500 

309 ) from exc 

310 

311 def load_plugin(self, path: str | Path) -> None: 

312 """Loads or reloads a plugin module from a file path. 

313 

314 This method dynamically loads the specified plugin file, initializes it, 

315 and registers it with the CLI system. If the plugin is already loaded, 

316 it is reloaded. 

317 

318 Args: 

319 path (str | Path): The filesystem path to the plugin's Python file. 

320 

321 Raises: 

322 BijuxError: If plugin loading, initialization, or registration fails. 

323 """ 

324 from bijux_cli.__version__ import __version__ 

325 from bijux_cli.services.plugins import load_plugin as _load_plugin 

326 

327 p = Path(path).expanduser().resolve() 

328 module_name = f"bijux_plugin_{p.stem}" 

329 

330 try: 

331 if module_name in sys.modules: 

332 importlib.reload(sys.modules[module_name]) 

333 

334 plugin = _load_plugin(p, module_name) 

335 plugin.startup(self._engine.di) 

336 

337 exists = bool( 

338 self._await_maybe(self._registry.has(p.stem), want_result=True) 

339 ) 

340 if exists: 

341 self._await_maybe(cast(Any, self._registry).deregister(p.stem)) 

342 

343 self._await_maybe( 

344 cast(Any, self._registry).register( 

345 p.stem, plugin, alias=str(__version__) 

346 ) 

347 ) 

348 self._obs.log("info", "Loaded plugin", extra={"path": str(p)}) 

349 self._schedule_event("api.plugin_loaded", {"path": str(p)}) 

350 

351 except Exception as exc: 

352 self._schedule_event( 

353 "api.plugin_load.error", {"path": str(p), "error": str(exc)} 

354 ) 

355 raise BijuxError( 

356 f"Failed to load plugin {p}: {exc}", http_status=500 

357 ) from exc 

358 

359 @staticmethod 

360 def _await_maybe(value: Any, *, want_result: bool = False) -> Any: 

361 """Synchronously handle possibly-awaitable values with safe fallbacks. 

362 

363 Args: 

364 value: A value that may or may not be awaitable (e.g., a coroutine, 

365 Future, Task, or a plain value). 

366 want_result: When `True`, and the coroutine is *scheduled* (not awaited), 

367 return `False` instead of `None` so callers can reliably detect that 

368 no immediate result is available. 

369 

370 Returns: 

371 The original `value` if it is not awaitable; otherwise, either the 

372 awaited result (when run synchronously) or `None`/`False` when the 

373 coroutine is scheduled for background execution. 

374 

375 Raises: 

376 Exception: Any exception raised by the coroutine when it is run 

377 synchronously via `asyncio.run` or `run_until_complete` is propagated. 

378 """ 

379 import inspect as _inspect 

380 

381 if not _inspect.isawaitable(value): 

382 return value 

383 

384 async def _inner() -> Any: 

385 """Await and return the captured awaitable `value`. 

386 

387 Returns: 

388 Any: The result produced by awaiting `value`. 

389 """ 

390 return await value 

391 

392 coro = _inner() 

393 

394 def _close_if_possible(obj: Any) -> None: 

395 """Attempt to call ``close()`` on an object, suppressing errors. 

396 

397 Args: 

398 obj: Object that may expose a callable ``close`` attribute. 

399 

400 Notes: 

401 Any exception raised by ``close()`` is suppressed. 

402 """ 

403 with suppress(Exception): 

404 close = getattr(obj, "close", None) 

405 if callable(close): 405 ↛ exitline 405 didn't jump to the function exit

406 close() 

407 

408 try: 

409 loop = asyncio.get_running_loop() 

410 except RuntimeError: 

411 try: 

412 return asyncio.run(coro) 

413 finally: 

414 _close_if_possible(value) 

415 coro.close() 

416 else: 

417 if hasattr(loop, "create_task"): 

418 task = loop.create_task(coro) 

419 _consume_task(task) 

420 return False if want_result else None 

421 

422 run_uc = getattr(loop, "run_until_complete", None) 

423 if callable(run_uc): 

424 try: 

425 return run_uc(coro) 

426 finally: 

427 _close_if_possible(value) 

428 coro.close() 

429 

430 try: 

431 task = asyncio.ensure_future(coro, loop=loop) 

432 _consume_task(task) 

433 return False if want_result else None 

434 except Exception: 

435 _close_if_possible(value) 

436 coro.close() 

437 return False if want_result else None