Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/api/facade.py: 98%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-26 17:59 +0000

1# SPDX-License-Identifier: Apache-2.0 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Provides a high-level, synchronous facade for the Bijux CLI's core engine. 

5 

6This module defines the `BijuxAPI` class, which serves as the primary public 

7interface for programmatic interaction with the CLI. It wraps the asynchronous 

8core `Engine` and other services to present a stable, thread-safe, and 

9synchronous API. 

10 

11This facade is intended for use in integrations, testing, or any scenario 

12where the CLI's command and plugin management logic needs to be embedded 

13within another Python application. 

14""" 

15 

16from __future__ import annotations 

17 

18import asyncio 

19from collections.abc import Awaitable, Callable, Iterator 

20from contextlib import contextmanager, suppress 

21import importlib 

22import inspect 

23import io 

24import os 

25from pathlib import Path 

26import sys 

27from typing import Any, cast 

28 

29from bijux_cli.core.di import DIContainer 

30from bijux_cli.core.engine import Engine 

31from bijux_cli.core.enums import ColorMode, LogLevel, OutputFormat 

32from bijux_cli.core.errors import BijuxError, PluginError 

33from bijux_cli.core.precedence import FlagLayer, Flags, resolve_effective_config 

34from bijux_cli.core.runtime import run_awaitable, run_command 

35from bijux_cli.plugins.contracts import RegistryProtocol 

36from bijux_cli.services.contracts import ObservabilityProtocol, TelemetryProtocol 

37from bijux_cli.services.errors import ServiceError 

38 

# Environment variable names skipped by the ASCII-only environment check in
# `BijuxAPI.run_async`.
IGNORE = {
    "PS1",
    "LS_COLORS",
    "PROMPT_COMMAND",
    "GIT_PS1_FORMAT",
}

# Environment variable that switches the strict API guardrails on when its
# value is exactly "1" (see `_api_guard_enabled`).
_API_GUARD_ENV = "BIJUXCLI_API_GUARD"

41 

42 

def _api_guard_enabled() -> bool:
    """Report whether the strict API guardrails are switched on.

    Returns:
        bool: True only when the environment variable named by
        ``_API_GUARD_ENV`` is set to exactly ``"1"``.
    """
    flag = os.getenv(_API_GUARD_ENV)
    return flag == "1"

46 

47 

@contextmanager
def _api_io_guard() -> Iterator[None]:
    """Ensure API calls do not write to stdout/stderr when guarded.

    When the guard is disabled (see ``_api_guard_enabled``) this context
    manager is a no-op. When it is enabled, ``sys.stdout`` and ``sys.stderr``
    are redirected into in-memory buffers for the duration of the ``with``
    body, and any non-whitespace output captured there is reported as an
    error once the body has finished normally.

    Exceptions raised by the guarded body always propagate unchanged: the
    guard must never hide a failure of the code it wraps, so the purity check
    only runs when the body completed without raising.

    Yields:
        None: Control is handed to the guarded body.

    Raises:
        BijuxError: If the guard is enabled and the body wrote non-whitespace
            text to stdout or stderr.
    """
    if not _api_guard_enabled():
        yield
        return

    from contextlib import redirect_stderr, redirect_stdout

    out_buf = io.StringIO()
    err_buf = io.StringIO()
    # No exception suppression around the yield: swallowing an exception that
    # was thrown into the generator would make the caller's `with` block
    # silently discard the guarded body's error.
    with redirect_stdout(out_buf), redirect_stderr(err_buf):
        yield

    if out_buf.getvalue().strip() or err_buf.getvalue().strip():
        raise BijuxError(
            "API purity guard: stdout/stderr output is not allowed",
            http_status=500,
        )

66 

67 

def _consume_task(task: asyncio.Future[Any]) -> None:
    """Attach a callback that retrieves a future's exception once it is done.

    Args:
        task (asyncio.Future[Any]): The future or task to observe.
    """

    def _retrieve_quietly(finished: asyncio.Future[Any]) -> None:
        """Read the exception of a finished future, ignoring `Exception`s."""
        with suppress(Exception):
            finished.exception()

    task.add_done_callback(_retrieve_quietly)

77 

78 

class BijuxAPI:
    """A synchronous access layer for the Bijux CLI engine.

    This class provides a stable public API for registering commands, executing
    them, and managing plugins. It wraps the internal asynchronous `Engine` to
    allow for simpler, synchronous integration into other applications.

    Note:
        Constructing an instance calls ``DIContainer.reset()`` before fetching
        the current container, so every new ``BijuxAPI`` starts from a freshly
        reset container.

    Attributes:
        _di (DIContainer): The dependency injection container.
        _engine (Engine): The core asynchronous runtime engine.
        _registry (RegistryProtocol): The plugin registry service.
        _obs (ObservabilityProtocol): The logging service.
        _tel (TelemetryProtocol): The telemetry service.
    """

    def __init__(self, *, log_level: LogLevel = LogLevel.INFO) -> None:
        """Initializes the `BijuxAPI` and the underlying CLI engine.

        Args:
            log_level (LogLevel): The log level handed to the `Engine`.
        """
        DIContainer.reset()
        self._di = DIContainer.current()
        self._engine = Engine(
            self._di,
            log_level=log_level,
            fmt=OutputFormat.JSON,
        )
        self._registry: RegistryProtocol = self._di.resolve(RegistryProtocol)
        self._obs: ObservabilityProtocol = self._di.resolve(ObservabilityProtocol)
        self._tel: TelemetryProtocol = self._di.resolve(TelemetryProtocol)

    def _schedule_event(self, name: str, payload: dict[str, Any]) -> None:
        """Emits a telemetry event from a synchronous context.

        The telemetry service may answer with a plain value or with an
        awaitable; an awaitable is handed to ``run_awaitable``.

        Args:
            name (str): The name of the telemetry event.
            payload (dict[str, Any]): The data associated with the event.
        """
        maybe = self._tel.event(name, payload)
        if inspect.isawaitable(maybe):
            run_awaitable(cast(Awaitable[Any], maybe))

    def register(self, name: str, callback: Callable[..., Any]) -> None:
        """Registers or replaces a Python callable as a CLI command.

        If a command called `name` is already present in the registry it is
        deregistered first, so registering the same name twice replaces the
        earlier callable. The callable is wrapped so that synchronous and
        asynchronous callables are both handled.

        Args:
            name (str): The command name to register.
            callback (Callable[..., Any]): The Python function to be executed
                when the command is run.

        Raises:
            BijuxError: If the registry raises a `ServiceError` while the
                command is being looked up, removed, or registered.
        """

        class _Wrapper:
            """Wraps a user-provided callable to be executed asynchronously."""

            def __init__(self, cb: Callable[..., Any]) -> None:
                """Initializes the wrapper.

                Args:
                    cb (Callable[..., Any]): The callable to wrap.
                """
                self._cb = cb

            async def execute(self, *args: Any, **kwargs: Any) -> Any:
                """Execute the wrapped callable, awaiting an awaitable result.

                Args:
                    *args (Any): Positional arguments to pass to the callable.
                    **kwargs (Any): Keyword arguments to pass to the callable.

                Returns:
                    Any: The result of the callable execution.
                """
                outcome = self._cb(*args, **kwargs)
                # Inspecting the result (instead of the callable) also covers
                # partials and objects with an async `__call__`, which a
                # coroutine-function check on the callable does not detect.
                if inspect.isawaitable(outcome):
                    return await outcome
                return outcome

        try:
            exists = bool(self._await_maybe(self._registry.has(name), want_result=True))
            if exists:
                self._await_maybe(cast(Any, self._registry.deregister(name)))

            self._await_maybe(
                cast(
                    Any,
                    self._registry.register(
                        name, _Wrapper(callback), alias=None, version=None
                    ),
                )
            )

            self._obs.log("info", "Registered command", extra={"name": name})
            self._schedule_event("api.register", {"name": name})
        except ServiceError as exc:
            self._schedule_event(
                "api.register.error", {"name": name, "error": str(exc)}
            )
            raise BijuxError(
                f"Could not register command {name}: {exc}", http_status=500
            ) from exc

    def run_sync(
        self,
        name: str,
        *args: Any,
        quiet: bool = False,
        fmt: str = "json",
        pretty: bool = True,
        log_level: LogLevel = LogLevel.INFO,
        **kwargs: Any,
    ) -> Any:
        """Runs a command synchronously.

        This method is a blocking wrapper around the asynchronous `run_async`
        method; the coroutine function is handed to ``run_command`` inside the
        API I/O guard.

        Args:
            name (str): The name of the command to run.
            *args (Any): Positional arguments for the command.
            quiet (bool): Forwarded to `run_async`.
            fmt (str): The output format name, forwarded to `run_async`.
            pretty (bool): Forwarded to `run_async`.
            log_level (LogLevel): The requested log level.
            **kwargs (Any): Additional keyword arguments to pass to the command.

        Returns:
            Any: The result of the command's execution.

        Raises:
            BijuxError: If the command fails (see `run_async`) or if a
                `SystemExit` escapes from the command.
        """
        try:
            with _api_io_guard():
                return run_command(
                    self.run_async,
                    name,
                    *args,
                    quiet=quiet,
                    fmt=fmt,
                    pretty=pretty,
                    log_level=log_level,
                    **kwargs,
                )
        except SystemExit as exc:
            # A library call must never terminate the embedding process.
            raise BijuxError(
                f"API purity guard: unexpected SystemExit({exc.code})",
                http_status=500,
            ) from exc

    async def run_async(
        self,
        name: str,
        *args: Any,
        quiet: bool = False,
        fmt: str = "json",
        pretty: bool = True,
        log_level: LogLevel = LogLevel.INFO,
        **kwargs: Any,
    ) -> Any:
        """Runs a command asynchronously with validation.

        The flags are validated by building the effective configuration, the
        process environment is checked for non-ASCII values, and the command
        is then dispatched to the internal engine.

        Args:
            name (str): The name of the command to execute.
            *args (Any): Positional arguments for the command.
            quiet (bool): Quiet flag used when resolving the configuration.
            fmt (str): The output format name; must be a valid `OutputFormat`
                value.
            pretty (bool): Accepted for interface compatibility; not used by
                this method.
            log_level (LogLevel): The requested log level. Any other value is
                converted with ``LogLevel(log_level)``.
            **kwargs (Any): Additional keyword arguments to pass to the command.

        Returns:
            Any: The result of the command's execution.

        Raises:
            BijuxError: With status 400 when an environment variable outside
                `IGNORE` holds non-ASCII characters; with status 500 for
                invalid flags, unsupported formats, an escaping `SystemExit`,
                or any other execution error.
        """
        try:
            _ = pretty
            fmt_value = OutputFormat(fmt)
            log_value = (
                log_level if isinstance(log_level, LogLevel) else LogLevel(log_level)
            )
            # Called for validation only: the resolved configuration is not
            # consumed afterwards, but an invalid combination raises here.
            resolve_effective_config(
                cli=FlagLayer(
                    quiet=quiet,
                    log_level=log_value,
                    color=ColorMode.AUTO,
                    format=fmt_value,
                ),
                env=FlagLayer(),
                file=FlagLayer(),
                defaults=Flags(
                    quiet=False,
                    log_level=LogLevel.INFO,
                    color=ColorMode.AUTO,
                    format=OutputFormat.JSON,
                ),
            )

            for key, value in os.environ.items():
                if key in IGNORE:
                    continue
                if not value.isascii():
                    raise BijuxError(
                        "Non-ASCII characters in environment", http_status=400
                    )

            with _api_io_guard():
                result = await self._engine.run_command(name, *args, **kwargs)
                self._schedule_event("api.run", {"name": name})
                return result

        except (PluginError, ServiceError) as exc:
            # Kept ahead of the bare `BijuxError` re-raise so these two types
            # are always wrapped, whatever their base classes are.
            self._schedule_event("api.run.error", {"name": name, "error": str(exc)})
            raise BijuxError(
                f"Failed to run command {name}: {exc}", http_status=500
            ) from exc

        except BijuxError:
            raise

        except SystemExit as exc:
            raise BijuxError(
                f"API purity guard: unexpected SystemExit({exc.code})",
                http_status=500,
            ) from exc

        except Exception as exc:
            self._schedule_event("api.run.error", {"name": name, "error": str(exc)})
            raise BijuxError(
                f"Failed to run command {name}: {exc}", http_status=500
            ) from exc

    def load_plugin(self, path: str | Path) -> None:
        """Loads or reloads a plugin module from a file path.

        The path is expanded and resolved, the plugin is loaded under the
        module name ``bijux_plugin_<stem>``, started with the engine's DI
        container, and registered under the file stem. A module of that name
        already present in ``sys.modules`` is reloaded first, and an existing
        registry entry with the same stem is replaced.

        Args:
            path (str | Path): The filesystem path to the plugin's Python file.

        Raises:
            BijuxError: If plugin loading, initialization, or registration fails.
        """
        from bijux_cli.core.version import __version__
        from bijux_cli.plugins import load_plugin as _load_plugin

        p = Path(path).expanduser().resolve()
        module_name = f"bijux_plugin_{p.stem}"

        try:
            if module_name in sys.modules:
                importlib.reload(sys.modules[module_name])

            plugin = _load_plugin(p, module_name)
            plugin.startup(self._engine.di)

            exists = bool(
                self._await_maybe(self._registry.has(p.stem), want_result=True)
            )
            if exists:
                self._await_maybe(cast(Any, self._registry).deregister(p.stem))

            # NOTE(review): the CLI's own `__version__` is passed as `alias`
            # while the plugin's version goes to `version` - confirm that this
            # is the intended registry contract.
            self._await_maybe(
                cast(Any, self._registry).register(
                    p.stem,
                    plugin,
                    alias=str(__version__),
                    version=getattr(plugin, "version", None),
                )
            )
            self._obs.log("info", "Loaded plugin", extra={"path": str(p)})
            self._schedule_event("api.plugin_loaded", {"path": str(p)})

        except Exception as exc:
            self._schedule_event(
                "api.plugin_load.error", {"path": str(p), "error": str(exc)}
            )
            raise BijuxError(
                f"Failed to load plugin {p}: {exc}", http_status=500
            ) from exc

    @staticmethod
    def _await_maybe(value: Any, *, want_result: bool = False) -> Any:
        """Synchronously handle possibly-awaitable values.

        Non-awaitable values are returned untouched. Awaitables are wrapped in
        a small coroutine and handed to ``run_awaitable``; afterwards both the
        original awaitable and the wrapper are closed on a best-effort basis.

        Args:
            value: A value that may or may not be awaitable (e.g., a coroutine,
                Future, Task, or a plain value).
            want_result: Forwarded unchanged to ``run_awaitable``.

        Returns:
            The original `value` if it is not awaitable; otherwise whatever
            ``run_awaitable`` returns for the wrapped awaitable.

        Raises:
            Exception: Anything raised by ``run_awaitable`` is propagated.
        """
        if not inspect.isawaitable(value):
            return value

        async def _inner() -> Any:
            """Await and return the captured awaitable `value`.

            Returns:
                Any: The result produced by awaiting `value`.
            """
            return await value

        coro = _inner()

        def _close_if_possible(obj: Any) -> None:
            """Attempt to call ``close()`` on an object, suppressing errors.

            Args:
                obj: Object that may expose a callable ``close`` attribute.

            Notes:
                Any exception raised by ``close()`` is suppressed.
            """
            with suppress(Exception):
                close = getattr(obj, "close", None)
                if callable(close):
                    close()

        try:
            return run_awaitable(coro, want_result=want_result)
        finally:
            # Closing is cleanup only; errors from `close()` are suppressed by
            # the helper.
            _close_if_possible(value)
            _close_if_possible(coro)