Coverage for  / home / runner / work / bijux-cli / bijux-cli / src / bijux_cli / core / runtime.py: 91%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-26 17:59 +0000

1# SPDX-License-Identifier: Apache-2.0 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Async runtime helpers.""" 

5 

6from __future__ import annotations 

7 

8import asyncio 

9from collections.abc import Awaitable, Callable 

10import contextlib 

11import functools 

12import inspect 

13from typing import Any, TypeVar, cast 

14 

15import anyio 

16import typer 

17 

18from bijux_cli.core.exit_policy import ExitIntentError 

19 

20T = TypeVar("T") 

21 

22 

23async def _execute(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: 

24 """Run sync or async callables in the appropriate execution context.""" 

25 if inspect.iscoroutinefunction(func): 

26 return await func(*args, **kwargs) 

27 return await anyio.to_thread.run_sync(functools.partial(func, *args, **kwargs)) 

28 

29 

30def run_command(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: 

31 """Run a callable under the CLI-owned event loop.""" 

32 

33 async def _inner() -> Any: 

34 try: 

35 return await _execute(func, *args, **kwargs) 

36 except ExitIntentError as exc: 

37 execute_exit_intent(exc.intent) 

38 

39 return anyio.run(_inner) 

40 

41 

42def run_awaitable(value: Awaitable[T], *, want_result: bool = False) -> T | None: 

43 """Synchronously handle an awaitable, scheduling if already in a loop.""" 

44 

45 async def _inner() -> T: 

46 return await value 

47 

48 try: 

49 loop = asyncio.get_running_loop() 

50 except RuntimeError: 

51 return anyio.run(_inner) 

52 

53 if hasattr(loop, "create_task"): 

54 loop.create_task(_inner()) 

55 return None 

56 

57 run_uc = getattr(loop, "run_until_complete", None) 

58 if callable(run_uc): 

59 result = run_uc(_inner()) 

60 close = getattr(value, "close", None) 

61 if callable(close): 61 ↛ 64line 61 didn't jump to line 64 because the condition on line 61 was always true

62 with contextlib.suppress(Exception): 

63 close() 

64 return cast(T, result) 

65 

66 close = getattr(value, "close", None) 

67 if callable(close): 67 ↛ 70line 67 didn't jump to line 70 because the condition on line 67 was always true

68 close() 

69 

70 return None 

71 

72 

73def command_adapter(func: Callable[..., Any]) -> Callable[..., Any]: 

74 """Wrap a command function so all execution flows through the adapter.""" 

75 if getattr(func, "_bijux_async_adapter", False): 

76 return func 

77 

78 @functools.wraps(func) 

79 def wrapper(*args: Any, **kwargs: Any) -> Any: 

80 return run_command(func, *args, **kwargs) 

81 

82 wrapper._bijux_async_adapter = True # type: ignore[attr-defined] 

83 return wrapper 

84 

85 

86def adapt_typer(app: typer.Typer) -> None: 

87 """Ensure a Typer app's callbacks/commands are routed via the adapter.""" 

88 callback = getattr(app, "registered_callback", None) 

89 if callback and getattr(callback, "callback", None): 

90 callback.callback = command_adapter(callback.callback) 

91 

92 for cmd in getattr(app, "registered_commands", []) or []: 

93 if getattr(cmd, "callback", None): 93 ↛ 92line 93 didn't jump to line 92 because the condition on line 93 was always true

94 cmd.callback = command_adapter(cmd.callback) 

95 

96 for grp in getattr(app, "registered_groups", []) or []: 96 ↛ 97line 96 didn't jump to line 97 because the loop on line 96 never started

97 if getattr(grp, "typer_instance", None): 

98 adapt_typer(grp.typer_instance) 

99 

100 

101class AsyncTyper(typer.Typer): 

102 """Typer subclass that routes all commands through the async adapter.""" 

103 

104 def command(self, *args: Any, **kwargs: Any) -> Callable[[Callable[..., Any]], Any]: 

105 """Wrap Typer commands with the async adapter.""" 

106 decorator = super().command(*args, **kwargs) 

107 

108 def wrapper(func: Callable[..., Any]) -> Any: 

109 return decorator(command_adapter(func)) 

110 

111 return wrapper 

112 

113 def callback( 

114 self, *args: Any, **kwargs: Any 

115 ) -> Callable[[Callable[..., Any]], Any]: 

116 """Wrap Typer callbacks with the async adapter.""" 

117 decorator = super().callback(*args, **kwargs) 

118 

119 def wrapper(func: Callable[..., Any]) -> Any: 

120 return decorator(command_adapter(func)) 

121 

122 return wrapper 

123 

124 def add_typer(self, *args: Any, **kwargs: Any) -> None: 

125 """Attach a Typer sub-app and adapt its commands.""" 

126 super().add_typer(*args, **kwargs) 

127 if args: 127 ↛ exitline 127 didn't return from function 'add_typer' because the condition on line 127 was always true

128 sub = args[0] 

129 if isinstance(sub, typer.Typer): 129 ↛ exitline 129 didn't return from function 'add_typer' because the condition on line 129 was always true

130 adapt_typer(sub) 

131 

132 

133def execute_exit_intent(intent: Any) -> None: 

134 """Execute an exit intent by emitting its payload and raising typer.Exit.""" 

135 from bijux_cli.cli.core.command import emit_payload, resolve_serializer 

136 

137 if intent.stream is not None and intent.payload is not None: 

138 serializer = resolve_serializer() 

139 # Invariant: output routing was resolved in core; infra only executes. 

140 emit_payload( 

141 intent.payload, 

142 serializer=serializer, 

143 emitter=None, 

144 fmt=intent.fmt, 

145 pretty=intent.pretty, 

146 stream=intent.stream, 

147 ) 

148 raise typer.Exit(int(intent.code))