Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/services/plugins/registry.py: 100%

116 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-19 23:36 +0000

1# SPDX-License-Identifier: MIT 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Provides a concrete plugin registry service using the `pluggy` framework. 

5 

6This module defines the `Registry` class, which implements the 

7`RegistryProtocol`. It serves as the central manager for the entire plugin 

8lifecycle, including registration, aliasing, metadata storage, and the 

9invocation of plugin hooks. It is built on top of the `pluggy` library to 

10provide a robust and extensible plugin architecture. 

11""" 

12 

13from __future__ import annotations 

14 

15import asyncio 

16from collections.abc import AsyncIterable 

17from types import MappingProxyType 

18from typing import Any 

19 

20from injector import inject 

21import pluggy 

22 

23from bijux_cli.contracts import RegistryProtocol 

24from bijux_cli.core.exceptions import ServiceError 

25from bijux_cli.infra.telemetry import LoggingTelemetry 

26 

27 

28class Registry(RegistryProtocol): 

29 """A `pluggy`-based registry for managing CLI plugins. 

30 

31 This class provides aliasing, metadata storage, and telemetry integration 

32 on top of the core `pluggy` plugin management system. 

33 

34 Attributes: 

35 _telemetry (LoggingTelemetry): The telemetry service for events. 

36 _pm (pluggy.PluginManager): The underlying `pluggy` plugin manager. 

37 _plugins (dict): A mapping of canonical plugin names to plugin objects. 

38 _aliases (dict): A mapping of alias names to canonical plugin names. 

39 _meta (dict): A mapping of canonical plugin names to their metadata. 

40 mapping (MappingProxyType): A read-only view of the `_plugins` mapping. 

41 """ 

42 

43 @inject 

44 def __init__(self, telemetry: LoggingTelemetry): 

45 """Initializes the `Registry` service. 

46 

47 Args: 

48 telemetry (LoggingTelemetry): The telemetry service for tracking 

49 registry events. 

50 """ 

51 self._telemetry = telemetry 

52 self._pm = pluggy.PluginManager("bijux") 

53 from bijux_cli.services.plugins.hooks import CoreSpec 

54 

55 self._pm.add_hookspecs(CoreSpec) 

56 self._plugins: dict[str, object] = {} 

57 self._aliases: dict[str, str] = {} 

58 self._meta: dict[str, dict[str, str]] = {} 

59 self.mapping = MappingProxyType(self._plugins) 

60 

61 def register( 

62 self, 

63 name: str, 

64 plugin: object, 

65 *, 

66 alias: str | None = None, 

67 version: str | None = None, 

68 ) -> None: 

69 """Registers a plugin with the registry. 

70 

71 Args: 

72 name (str): The canonical name of the plugin. 

73 plugin (object): The plugin object to register. 

74 alias (str | None): An optional alias for the plugin. 

75 version (str | None): An optional version string for the plugin. 

76 

77 Returns: 

78 None: 

79 

80 Raises: 

81 ServiceError: If the name, alias, or plugin object is already 

82 registered, or if the underlying `pluggy` registration fails. 

83 """ 

84 if name in self._plugins: 

85 raise ServiceError(f"Plugin {name!r} already registered", http_status=400) 

86 if plugin in self._plugins.values(): 

87 raise ServiceError( 

88 "Plugin object already registered under a different name", 

89 http_status=400, 

90 ) 

91 if alias and (alias in self._plugins or alias in self._aliases): 

92 raise ServiceError(f"Alias {alias!r} already in use", http_status=400) 

93 try: 

94 self._pm.register(plugin, name) 

95 except ValueError as error: 

96 raise ServiceError( 

97 f"Pluggy failed to register {name}: {error}", http_status=500 

98 ) from error 

99 self._plugins[name] = plugin 

100 self._meta[name] = {"version": version or "unknown"} 

101 if alias: 

102 self._aliases[alias] = name 

103 try: 

104 self._telemetry.event( 

105 "registry_plugin_registered", 

106 {"name": name, "alias": alias, "version": version}, 

107 ) 

108 except RuntimeError as error: 

109 self._telemetry.event( 

110 "registry_telemetry_failed", 

111 {"operation": "register", "error": str(error)}, 

112 ) 

113 

114 def deregister(self, name: str) -> None: 

115 """Deregisters a plugin from the registry. 

116 

117 Args: 

118 name (str): The name or alias of the plugin to deregister. 

119 

120 Returns: 

121 None: 

122 

123 Raises: 

124 ServiceError: If the underlying `pluggy` deregistration fails. 

125 """ 

126 canonical = self._aliases.get(name, name) 

127 plugin = self._plugins.pop(canonical, None) 

128 if not plugin: 

129 return 

130 try: 

131 self._pm.unregister(plugin) 

132 except ValueError as error: 

133 raise ServiceError( 

134 f"Pluggy failed to deregister {canonical}: {error}", http_status=500 

135 ) from error 

136 self._meta.pop(canonical, None) 

137 self._aliases = {a: n for a, n in self._aliases.items() if n != canonical} 

138 try: 

139 self._telemetry.event("registry_plugin_deregistered", {"name": canonical}) 

140 except RuntimeError as error: 

141 self._telemetry.event( 

142 "registry_telemetry_failed", 

143 {"operation": "deregister", "error": str(error)}, 

144 ) 

145 

146 def get(self, name: str) -> object: 

147 """Retrieves a plugin by its name or alias. 

148 

149 Args: 

150 name (str): The name or alias of the plugin to retrieve. 

151 

152 Returns: 

153 object: The registered plugin object. 

154 

155 Raises: 

156 ServiceError: If the plugin is not found. 

157 """ 

158 canonical = self._aliases.get(name, name) 

159 try: 

160 plugin = self._plugins[canonical] 

161 except KeyError as key_error: 

162 try: 

163 self._telemetry.event( 

164 "registry_plugin_retrieve_failed", 

165 {"name": name, "error": str(key_error)}, 

166 ) 

167 except RuntimeError as telemetry_error: 

168 self._telemetry.event( 

169 "registry_telemetry_failed", 

170 {"operation": "retrieve_failed", "error": str(telemetry_error)}, 

171 ) 

172 raise ServiceError( 

173 f"Plugin {name!r} not found", http_status=404 

174 ) from key_error 

175 try: 

176 self._telemetry.event("registry_plugin_retrieved", {"name": canonical}) 

177 except RuntimeError as error: 

178 self._telemetry.event( 

179 "registry_telemetry_failed", 

180 {"operation": "retrieve", "error": str(error)}, 

181 ) 

182 return plugin 

183 

184 def names(self) -> list[str]: 

185 """Returns a list of all registered plugin names. 

186 

187 Returns: 

188 list[str]: A list of the canonical names of all registered plugins. 

189 """ 

190 names = list(self._plugins.keys()) 

191 try: 

192 self._telemetry.event("registry_list", {"names": names}) 

193 except RuntimeError as error: 

194 self._telemetry.event( 

195 "registry_telemetry_failed", {"operation": "list", "error": str(error)} 

196 ) 

197 return names 

198 

199 def has(self, name: str) -> bool: 

200 """Checks if a plugin is registered under a given name or alias. 

201 

202 Args: 

203 name (str): The name or alias of the plugin to check. 

204 

205 Returns: 

206 bool: True if the plugin is registered, otherwise False. 

207 """ 

208 exists = name in self._plugins or name in self._aliases 

209 try: 

210 self._telemetry.event("registry_contains", {"name": name, "result": exists}) 

211 except RuntimeError as error: 

212 self._telemetry.event( 

213 "registry_telemetry_failed", 

214 {"operation": "contains", "error": str(error)}, 

215 ) 

216 return exists 

217 

218 def meta(self, name: str) -> dict[str, str]: 

219 """Retrieves metadata for a specific plugin. 

220 

221 Args: 

222 name (str): The name or alias of the plugin. 

223 

224 Returns: 

225 dict[str, str]: A dictionary containing the plugin's metadata. 

226 """ 

227 canonical = self._aliases.get(name, name) 

228 info = dict(self._meta.get(canonical, {})) 

229 try: 

230 self._telemetry.event("registry_meta_retrieved", {"name": canonical}) 

231 except RuntimeError as error: 

232 self._telemetry.event( 

233 "registry_telemetry_failed", 

234 {"operation": "meta_retrieved", "error": str(error)}, 

235 ) 

236 return info 

237 

238 async def call_hook(self, hook: str, *args: Any, **kwargs: Any) -> list[Any]: 

239 """Invokes a hook on all registered plugins that implement it. 

240 

241 This method handles results from multiple plugins, awaiting any results 

242 that are coroutines. 

243 

244 Args: 

245 hook (str): The name of the hook to invoke. 

246 *args (Any): Positional arguments to pass to the hook. 

247 **kwargs (Any): Keyword arguments to pass to the hook. 

248 

249 Returns: 

250 list[Any]: A list containing the results from all hook 

251 implementations that did not return `None`. 

252 

253 Raises: 

254 ServiceError: If the specified hook does not exist. 

255 """ 

256 try: 

257 hook_fn = getattr(self._pm.hook, hook) 

258 results = hook_fn(*args, **kwargs) 

259 except AttributeError as error: 

260 raise ServiceError(f"Hook {hook!r} not found", http_status=404) from error 

261 collected = [] 

262 if isinstance(results, AsyncIterable): 

263 async for result in results: 

264 if asyncio.iscoroutine(result): 

265 collected.append(await result) 

266 elif result is not None: 

267 collected.append(result) 

268 else: 

269 for result in results: 

270 if asyncio.iscoroutine(result): 

271 collected.append(await result) 

272 elif result is not None: 

273 collected.append(result) 

274 try: 

275 self._telemetry.event("registry_hook_called", {"hook": hook}) 

276 except RuntimeError as error: 

277 self._telemetry.event( 

278 "registry_telemetry_failed", 

279 {"operation": "hook_called", "error": str(error)}, 

280 ) 

281 return collected 

282 

283 

284__all__ = ["Registry"]