Coverage for / home / runner / work / bijux-cli / bijux-cli / src / bijux_cli / core / runtime.py: 91%
85 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-26 17:59 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-26 17:59 +0000
1# SPDX-License-Identifier: Apache-2.0
2# Copyright © 2025 Bijan Mousavi
4"""Async runtime helpers."""
6from __future__ import annotations
8import asyncio
9from collections.abc import Awaitable, Callable
10import contextlib
11import functools
12import inspect
13from typing import Any, TypeVar, cast
15import anyio
16import typer
18from bijux_cli.core.exit_policy import ExitIntentError
20T = TypeVar("T")
23async def _execute(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
24 """Run sync or async callables in the appropriate execution context."""
25 if inspect.iscoroutinefunction(func):
26 return await func(*args, **kwargs)
27 return await anyio.to_thread.run_sync(functools.partial(func, *args, **kwargs))
30def run_command(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
31 """Run a callable under the CLI-owned event loop."""
33 async def _inner() -> Any:
34 try:
35 return await _execute(func, *args, **kwargs)
36 except ExitIntentError as exc:
37 execute_exit_intent(exc.intent)
39 return anyio.run(_inner)
42def run_awaitable(value: Awaitable[T], *, want_result: bool = False) -> T | None:
43 """Synchronously handle an awaitable, scheduling if already in a loop."""
45 async def _inner() -> T:
46 return await value
48 try:
49 loop = asyncio.get_running_loop()
50 except RuntimeError:
51 return anyio.run(_inner)
53 if hasattr(loop, "create_task"):
54 loop.create_task(_inner())
55 return None
57 run_uc = getattr(loop, "run_until_complete", None)
58 if callable(run_uc):
59 result = run_uc(_inner())
60 close = getattr(value, "close", None)
61 if callable(close): 61 ↛ 64line 61 didn't jump to line 64 because the condition on line 61 was always true
62 with contextlib.suppress(Exception):
63 close()
64 return cast(T, result)
66 close = getattr(value, "close", None)
67 if callable(close): 67 ↛ 70line 67 didn't jump to line 70 because the condition on line 67 was always true
68 close()
70 return None
73def command_adapter(func: Callable[..., Any]) -> Callable[..., Any]:
74 """Wrap a command function so all execution flows through the adapter."""
75 if getattr(func, "_bijux_async_adapter", False):
76 return func
78 @functools.wraps(func)
79 def wrapper(*args: Any, **kwargs: Any) -> Any:
80 return run_command(func, *args, **kwargs)
82 wrapper._bijux_async_adapter = True # type: ignore[attr-defined]
83 return wrapper
86def adapt_typer(app: typer.Typer) -> None:
87 """Ensure a Typer app's callbacks/commands are routed via the adapter."""
88 callback = getattr(app, "registered_callback", None)
89 if callback and getattr(callback, "callback", None):
90 callback.callback = command_adapter(callback.callback)
92 for cmd in getattr(app, "registered_commands", []) or []:
93 if getattr(cmd, "callback", None): 93 ↛ 92line 93 didn't jump to line 92 because the condition on line 93 was always true
94 cmd.callback = command_adapter(cmd.callback)
96 for grp in getattr(app, "registered_groups", []) or []: 96 ↛ 97line 96 didn't jump to line 97 because the loop on line 96 never started
97 if getattr(grp, "typer_instance", None):
98 adapt_typer(grp.typer_instance)
101class AsyncTyper(typer.Typer):
102 """Typer subclass that routes all commands through the async adapter."""
104 def command(self, *args: Any, **kwargs: Any) -> Callable[[Callable[..., Any]], Any]:
105 """Wrap Typer commands with the async adapter."""
106 decorator = super().command(*args, **kwargs)
108 def wrapper(func: Callable[..., Any]) -> Any:
109 return decorator(command_adapter(func))
111 return wrapper
113 def callback(
114 self, *args: Any, **kwargs: Any
115 ) -> Callable[[Callable[..., Any]], Any]:
116 """Wrap Typer callbacks with the async adapter."""
117 decorator = super().callback(*args, **kwargs)
119 def wrapper(func: Callable[..., Any]) -> Any:
120 return decorator(command_adapter(func))
122 return wrapper
124 def add_typer(self, *args: Any, **kwargs: Any) -> None:
125 """Attach a Typer sub-app and adapt its commands."""
126 super().add_typer(*args, **kwargs)
127 if args: 127 ↛ exitline 127 didn't return from function 'add_typer' because the condition on line 127 was always true
128 sub = args[0]
129 if isinstance(sub, typer.Typer): 129 ↛ exitline 129 didn't return from function 'add_typer' because the condition on line 129 was always true
130 adapt_typer(sub)
133def execute_exit_intent(intent: Any) -> None:
134 """Execute an exit intent by emitting its payload and raising typer.Exit."""
135 from bijux_cli.cli.core.command import emit_payload, resolve_serializer
137 if intent.stream is not None and intent.payload is not None:
138 serializer = resolve_serializer()
139 # Invariant: output routing was resolved in core; infra only executes.
140 emit_payload(
141 intent.payload,
142 serializer=serializer,
143 emitter=None,
144 fmt=intent.fmt,
145 pretty=intent.pretty,
146 stream=intent.stream,
147 )
148 raise typer.Exit(int(intent.code))