Coverage for / home / runner / work / bijux-cli / bijux-cli / src / bijux_cli / api / facade.py: 98%
146 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-26 17:59 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-26 17:59 +0000
1# SPDX-License-Identifier: Apache-2.0
2# Copyright © 2025 Bijan Mousavi
4"""Provides a high-level, synchronous facade for the Bijux CLI's core engine.
6This module defines the `BijuxAPI` class, which serves as the primary public
7interface for programmatic interaction with the CLI. It wraps the asynchronous
8core `Engine` and other services to present a stable, thread-safe, and
9synchronous API.
11This facade is intended for use in integrations, testing, or any scenario
12where the CLI's command and plugin management logic needs to be embedded
13within another Python application.
14"""
16from __future__ import annotations
18import asyncio
19from collections.abc import Awaitable, Callable, Iterator
20from contextlib import contextmanager, suppress
21import importlib
22import inspect
23import io
24import os
25from pathlib import Path
26import sys
27from typing import Any, cast
29from bijux_cli.core.di import DIContainer
30from bijux_cli.core.engine import Engine
31from bijux_cli.core.enums import ColorMode, LogLevel, OutputFormat
32from bijux_cli.core.errors import BijuxError, PluginError
33from bijux_cli.core.precedence import FlagLayer, Flags, resolve_effective_config
34from bijux_cli.core.runtime import run_awaitable, run_command
35from bijux_cli.plugins.contracts import RegistryProtocol
36from bijux_cli.services.contracts import ObservabilityProtocol, TelemetryProtocol
37from bijux_cli.services.errors import ServiceError
39IGNORE = {"PS1", "LS_COLORS", "PROMPT_COMMAND", "GIT_PS1_FORMAT"}
40_API_GUARD_ENV = "BIJUXCLI_API_GUARD"
43def _api_guard_enabled() -> bool:
44 """Return True when strict API guardrails are enabled."""
45 return os.environ.get(_API_GUARD_ENV) == "1"
48@contextmanager
49def _api_io_guard() -> Iterator[None]:
50 """Ensure API calls do not write to stdout/stderr when guarded."""
51 if not _api_guard_enabled():
52 yield
53 return
54 out_buf = io.StringIO()
55 err_buf = io.StringIO()
56 with suppress(Exception):
57 from contextlib import redirect_stderr, redirect_stdout
59 with redirect_stdout(out_buf), redirect_stderr(err_buf):
60 yield
61 if out_buf.getvalue().strip() or err_buf.getvalue().strip(): 61 ↛ exitline 61 didn't return from function '_api_io_guard' because the condition on line 61 was always true
62 raise BijuxError(
63 "API purity guard: stdout/stderr output is not allowed",
64 http_status=500,
65 )
68def _consume_task(task: asyncio.Future[Any]) -> None:
69 """Consumes an asyncio task to suppress unhandled exceptions."""
71 def _eat_exc(t: asyncio.Future[Any]) -> None:
72 """Retrieves and suppresses exceptions from a future."""
73 with suppress(Exception):
74 _ = t.exception()
76 task.add_done_callback(_eat_exc)
79class BijuxAPI:
80 """A thread-safe, synchronous access layer for the Bijux CLI engine.
82 This class provides a stable public API for registering commands, executing
83 them, and managing plugins. It wraps the internal asynchronous `Engine` to
84 allow for simpler, synchronous integration into other applications.
86 Attributes:
87 _di (DIContainer): The dependency injection container.
88 _engine (Engine): The core asynchronous runtime engine.
89 _registry (RegistryProtocol): The plugin registry service.
90 _obs (ObservabilityProtocol): The logging service.
91 _tel (TelemetryProtocol): The telemetry service.
92 """
94 def __init__(self, *, log_level: LogLevel = LogLevel.INFO) -> None:
95 """Initializes the `BijuxAPI` and the underlying CLI engine.
97 Args:
98 log_level (str): The default log level name for all underlying
99 services.
100 """
101 DIContainer.reset()
102 self._di = DIContainer.current()
103 self._engine = Engine(
104 self._di,
105 log_level=log_level,
106 fmt=OutputFormat.JSON,
107 )
108 self._registry: RegistryProtocol = self._di.resolve(RegistryProtocol)
109 self._obs: ObservabilityProtocol = self._di.resolve(ObservabilityProtocol)
110 self._tel: TelemetryProtocol = self._di.resolve(TelemetryProtocol)
112 def _schedule_event(self, name: str, payload: dict[str, Any]) -> None:
113 """Schedules a "fire-and-forget" asynchronous telemetry event.
115 This helper handles the execution of async telemetry calls from a
116 synchronous context.
118 Args:
119 name (str): The name of the telemetry event.
120 payload (dict[str, Any]): The data associated with the event.
121 """
122 maybe = self._tel.event(name, payload)
123 if inspect.isawaitable(maybe):
124 run_awaitable(cast(Awaitable[Any], maybe))
126 def register(self, name: str, callback: Callable[..., Any]) -> None:
127 """Registers or replaces a Python callable as a CLI command.
129 The provided callable is wrapped to handle both synchronous and
130 asynchronous functions automatically.
132 Args:
133 name (str): The command name to register.
134 callback (Callable[..., Any]): The Python function to be executed
135 when the command is run.
137 Raises:
138 BijuxError: If the command name is already in use or another
139 registration error occurs.
140 """
142 class _Wrapper:
143 """Wraps a user-provided callable to be executed asynchronously."""
145 def __init__(self, cb: Callable[..., Any]) -> None:
146 """Initializes the wrapper.
148 Args:
149 cb (Callable[..., Any]): The callable to wrap.
150 """
151 self._cb = cb
153 async def execute(self, *args: Any, **kwargs: Any) -> Any:
154 """Execute the wrapped callable, awaiting if it's a coroutine.
156 Args:
157 *args (Any): Positional arguments to pass to the callable.
158 **kwargs (Any): Keyword arguments to pass to the callable.
160 Returns:
161 Any: The result of the callable execution.
162 """
163 if asyncio.iscoroutinefunction(self._cb):
164 return await self._cb(*args, **kwargs)
165 return self._cb(*args, **kwargs)
167 try:
168 exists = bool(self._await_maybe(self._registry.has(name), want_result=True))
169 if exists:
170 maybe = cast(Any, self._registry.deregister(name))
171 self._await_maybe(maybe)
172 maybe2 = cast(
173 Any,
174 self._registry.register(
175 name, _Wrapper(callback), alias=None, version=None
176 ),
177 )
178 self._await_maybe(maybe2)
180 self._obs.log("info", "Registered command", extra={"name": name})
181 self._schedule_event("api.register", {"name": name})
182 except ServiceError as exc:
183 self._schedule_event(
184 "api.register.error", {"name": name, "error": str(exc)}
185 )
186 raise BijuxError(
187 f"Could not register command {name}: {exc}", http_status=500
188 ) from exc
190 def run_sync(
191 self,
192 name: str,
193 *args: Any,
194 quiet: bool = False,
195 fmt: str = "json",
196 pretty: bool = True,
197 log_level: LogLevel = LogLevel.INFO,
198 **kwargs: Any,
199 ) -> Any:
200 """Runs a command synchronously.
202 This method is a blocking wrapper around the asynchronous `run_async`
203 method. It manages the asyncio event loop to provide a simple,
204 synchronous interface.
206 Args:
207 name (str): The name of the command to run.
208 *args (Any): Positional arguments for the command.
209 quiet (bool): If True, suppresses output.
210 fmt (str): The output format ("json" or "yaml").
211 pretty (bool): If True, formats the output for readability.
212 log_level (str): The requested log level.
213 **kwargs (Any): Additional keyword arguments to pass to the command.
215 Returns:
216 Any: The result of the command's execution.
217 """
218 try:
219 with _api_io_guard():
220 return run_command(
221 self.run_async,
222 name,
223 *args,
224 quiet=quiet,
225 fmt=fmt,
226 pretty=pretty,
227 log_level=log_level,
228 **kwargs,
229 )
230 except SystemExit as exc:
231 raise BijuxError(
232 f"API purity guard: unexpected SystemExit({exc.code})",
233 http_status=500,
234 ) from exc
236 async def run_async(
237 self,
238 name: str,
239 *args: Any,
240 quiet: bool = False,
241 fmt: str = "json",
242 pretty: bool = True,
243 log_level: LogLevel = LogLevel.INFO,
244 **kwargs: Any,
245 ) -> Any:
246 """Runs a command asynchronously with validation.
248 This method performs validation of flags and environment variables
249 before dispatching the command to the internal engine for execution.
251 Args:
252 name (str): The name of the command to execute.
253 *args (Any): Positional arguments for the command.
254 quiet (bool): If True, suppresses output.
255 fmt (str): The output format ("json" or "yaml").
256 pretty (bool): If True, formats the output for readability.
257 log_level (str): The requested log level.
258 **kwargs (Any): Additional keyword arguments to pass to the command.
260 Returns:
261 Any: The result of the command's execution.
263 Raises:
264 BijuxError: For invalid flags, unsupported formats, or internal
265 execution errors.
266 """
267 try:
268 _ = pretty
269 fmt_value = OutputFormat(fmt)
270 log_value = (
271 log_level if isinstance(log_level, LogLevel) else LogLevel(log_level)
272 )
273 resolved = resolve_effective_config(
274 cli=FlagLayer(
275 quiet=quiet,
276 log_level=log_value,
277 color=ColorMode.AUTO,
278 format=fmt_value,
279 ),
280 env=FlagLayer(),
281 file=FlagLayer(),
282 defaults=Flags(
283 quiet=False,
284 log_level=LogLevel.INFO,
285 color=ColorMode.AUTO,
286 format=OutputFormat.JSON,
287 ),
288 )
289 fmt_value = resolved.flags.format
291 for k, v in os.environ.items():
292 if k in IGNORE: 292 ↛ 293line 292 didn't jump to line 293 because the condition on line 292 was never true
293 continue
294 if not v.isascii():
295 raise BijuxError(
296 "Non-ASCII characters in environment", http_status=400
297 )
299 with _api_io_guard():
300 result = await self._engine.run_command(name, *args, **kwargs)
301 self._schedule_event("api.run", {"name": name})
302 return result
304 except PluginError as exc:
305 self._schedule_event("api.run.error", {"name": name, "error": str(exc)})
306 raise BijuxError(
307 f"Failed to run command {name}: {exc}", http_status=500
308 ) from exc
310 except ServiceError as exc:
311 self._schedule_event("api.run.error", {"name": name, "error": str(exc)})
312 raise BijuxError(
313 f"Failed to run command {name}: {exc}", http_status=500
314 ) from exc
316 except BijuxError:
317 raise
319 except SystemExit as exc:
320 raise BijuxError(
321 f"API purity guard: unexpected SystemExit({exc.code})",
322 http_status=500,
323 ) from exc
325 except Exception as exc:
326 self._schedule_event("api.run.error", {"name": name, "error": str(exc)})
327 raise BijuxError(
328 f"Failed to run command {name}: {exc}", http_status=500
329 ) from exc
331 def load_plugin(self, path: str | Path) -> None:
332 """Loads or reloads a plugin module from a file path.
334 This method dynamically loads the specified plugin file, initializes it,
335 and registers it with the CLI system. If the plugin is already loaded,
336 it is reloaded.
338 Args:
339 path (str | Path): The filesystem path to the plugin's Python file.
341 Raises:
342 BijuxError: If plugin loading, initialization, or registration fails.
343 """
344 from bijux_cli.core.version import __version__
345 from bijux_cli.plugins import load_plugin as _load_plugin
347 p = Path(path).expanduser().resolve()
348 module_name = f"bijux_plugin_{p.stem}"
350 try:
351 if module_name in sys.modules:
352 importlib.reload(sys.modules[module_name])
354 plugin = _load_plugin(p, module_name)
355 plugin.startup(self._engine.di)
357 exists = bool(
358 self._await_maybe(self._registry.has(p.stem), want_result=True)
359 )
360 if exists:
361 self._await_maybe(cast(Any, self._registry).deregister(p.stem))
363 self._await_maybe(
364 cast(Any, self._registry).register(
365 p.stem,
366 plugin,
367 alias=str(__version__),
368 version=getattr(plugin, "version", None),
369 )
370 )
371 self._obs.log("info", "Loaded plugin", extra={"path": str(p)})
372 self._schedule_event("api.plugin_loaded", {"path": str(p)})
374 except Exception as exc:
375 self._schedule_event(
376 "api.plugin_load.error", {"path": str(p), "error": str(exc)}
377 )
378 raise BijuxError(
379 f"Failed to load plugin {p}: {exc}", http_status=500
380 ) from exc
382 @staticmethod
383 def _await_maybe(value: Any, *, want_result: bool = False) -> Any:
384 """Synchronously handle possibly-awaitable values with safe fallbacks.
386 Args:
387 value: A value that may or may not be awaitable (e.g., a coroutine,
388 Future, Task, or a plain value).
389 want_result: When `True`, and the coroutine is *scheduled* (not awaited),
390 return `False` instead of `None` so callers can reliably detect that
391 no immediate result is available.
393 Returns:
394 The original `value` if it is not awaitable; otherwise, either the
395 awaited result (when run synchronously) or `None`/`False` when the
396 coroutine is scheduled for background execution.
398 Raises:
399 Exception: Any exception raised by the coroutine when it is run
400 synchronously via `asyncio.run` or `run_until_complete` is propagated.
401 """
402 import inspect as _inspect
404 if not _inspect.isawaitable(value):
405 return value
407 async def _inner() -> Any:
408 """Await and return the captured awaitable `value`.
410 Returns:
411 Any: The result produced by awaiting `value`.
412 """
413 return await value
415 coro = _inner()
417 def _close_if_possible(obj: Any) -> None:
418 """Attempt to call ``close()`` on an object, suppressing errors.
420 Args:
421 obj: Object that may expose a callable ``close`` attribute.
423 Notes:
424 Any exception raised by ``close()`` is suppressed.
425 """
426 with suppress(Exception):
427 close = getattr(obj, "close", None)
428 if callable(close): 428 ↛ exitline 428 didn't jump to the function exit
429 close()
431 try:
432 return run_awaitable(coro, want_result=want_result)
433 finally:
434 _close_if_possible(value)
435 with suppress(Exception):
436 coro.close()