Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/services/plugins/registry.py: 100%
116 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-19 23:36 +0000
1# SPDX-License-Identifier: MIT
2# Copyright © 2025 Bijan Mousavi
4"""Provides a concrete plugin registry service using the `pluggy` framework.
6This module defines the `Registry` class, which implements the
7`RegistryProtocol`. It serves as the central manager for the entire plugin
8lifecycle, including registration, aliasing, metadata storage, and the
9invocation of plugin hooks. It is built on top of the `pluggy` library to
10provide a robust and extensible plugin architecture.
11"""
13from __future__ import annotations
15import asyncio
16from collections.abc import AsyncIterable
17from types import MappingProxyType
18from typing import Any
20from injector import inject
21import pluggy
23from bijux_cli.contracts import RegistryProtocol
24from bijux_cli.core.exceptions import ServiceError
25from bijux_cli.infra.telemetry import LoggingTelemetry
class Registry(RegistryProtocol):
    """A `pluggy`-based registry for managing CLI plugins.

    This class provides aliasing, metadata storage, and telemetry integration
    on top of the core `pluggy` plugin management system.

    Attributes:
        _telemetry (LoggingTelemetry): The telemetry service for events.
        _pm (pluggy.PluginManager): The underlying `pluggy` plugin manager.
        _plugins (dict): A mapping of canonical plugin names to plugin objects.
        _aliases (dict): A mapping of alias names to canonical plugin names.
        _meta (dict): A mapping of canonical plugin names to their metadata.
        mapping (MappingProxyType): A read-only view of the `_plugins` mapping.
    """

    @inject
    def __init__(self, telemetry: LoggingTelemetry):
        """Initializes the `Registry` service.

        Args:
            telemetry (LoggingTelemetry): The telemetry service for tracking
                registry events.
        """
        self._telemetry = telemetry
        self._pm = pluggy.PluginManager("bijux")
        # NOTE(review): imported here rather than at module top, presumably to
        # avoid an import cycle -- confirm before hoisting.
        from bijux_cli.services.plugins.hooks import CoreSpec

        self._pm.add_hookspecs(CoreSpec)
        self._plugins: dict[str, object] = {}
        self._aliases: dict[str, str] = {}
        self._meta: dict[str, dict[str, str]] = {}
        # Live read-only view: it reflects later changes to `_plugins`, which
        # is why `_plugins` is only ever mutated in place and never rebound.
        self.mapping = MappingProxyType(self._plugins)

    def _emit(self, event: str, payload: dict[str, Any], operation: str) -> None:
        """Emits a telemetry event, reporting a fallback event on failure.

        Args:
            event (str): The name of the telemetry event.
            payload (dict[str, Any]): The data attached to the event.
            operation (str): The operation label recorded in the fallback
                `registry_telemetry_failed` event.

        Returns:
            None:
        """
        try:
            self._telemetry.event(event, payload)
        except RuntimeError as error:
            # Only the first attempt is guarded; an error raised by this
            # fallback call propagates to the caller.
            self._telemetry.event(
                "registry_telemetry_failed",
                {"operation": operation, "error": str(error)},
            )

    @staticmethod
    async def _collect(result: Any, collected: list[Any]) -> None:
        """Appends a single hook result to `collected`.

        Coroutines are awaited and their value is appended as-is (even when
        it is `None`); any other result is appended unless it is `None`.

        Args:
            result (Any): One value produced by a hook implementation.
            collected (list[Any]): The list that gathered results go into.

        Returns:
            None:
        """
        if asyncio.iscoroutine(result):
            collected.append(await result)
        elif result is not None:
            collected.append(result)

    def register(
        self,
        name: str,
        plugin: object,
        *,
        alias: str | None = None,
        version: str | None = None,
    ) -> None:
        """Registers a plugin with the registry.

        Args:
            name (str): The canonical name of the plugin.
            plugin (object): The plugin object to register.
            alias (str | None): An optional alias for the plugin.
            version (str | None): An optional version string for the plugin.
                Stored as "unknown" in the metadata when not given.

        Returns:
            None:

        Raises:
            ServiceError: With status 400 if the name, alias, or plugin object
                is already registered, or with status 500 if the underlying
                `pluggy` registration fails.
        """
        if name in self._plugins:
            raise ServiceError(f"Plugin {name!r} already registered", http_status=400)
        # Lookups resolve aliases before canonical names, so a plugin whose
        # name equals an existing alias could never be reached.
        if name in self._aliases:
            raise ServiceError(
                f"Plugin name {name!r} already in use as an alias", http_status=400
            )
        if plugin in self._plugins.values():
            raise ServiceError(
                "Plugin object already registered under a different name",
                http_status=400,
            )
        if alias and (alias in self._plugins or alias in self._aliases):
            raise ServiceError(f"Alias {alias!r} already in use", http_status=400)

        # Register with pluggy first so local state is only touched on success.
        try:
            self._pm.register(plugin, name)
        except ValueError as error:
            raise ServiceError(
                f"Pluggy failed to register {name}: {error}", http_status=500
            ) from error

        self._plugins[name] = plugin
        self._meta[name] = {"version": version or "unknown"}
        if alias:
            self._aliases[alias] = name
        self._emit(
            "registry_plugin_registered",
            {"name": name, "alias": alias, "version": version},
            "register",
        )

    def deregister(self, name: str) -> None:
        """Deregisters a plugin from the registry.

        Unknown names are ignored. The plugin's metadata and every alias that
        points at it are removed together with the plugin itself.

        Args:
            name (str): The name or alias of the plugin to deregister.

        Returns:
            None:

        Raises:
            ServiceError: If the underlying `pluggy` deregistration fails. The
                registry's own state is left untouched in that case.
        """
        canonical = self._aliases.get(name, name)
        # Membership test instead of truthiness: a registered plugin object
        # may legitimately be falsy.
        if canonical not in self._plugins:
            return
        plugin = self._plugins[canonical]

        # Unregister from pluggy before dropping local state so that a failure
        # does not leave the registry half-updated.
        try:
            self._pm.unregister(plugin)
        except ValueError as error:
            raise ServiceError(
                f"Pluggy failed to deregister {canonical}: {error}", http_status=500
            ) from error

        del self._plugins[canonical]
        self._meta.pop(canonical, None)
        self._aliases = {
            alias: target
            for alias, target in self._aliases.items()
            if target != canonical
        }
        self._emit("registry_plugin_deregistered", {"name": canonical}, "deregister")

    def get(self, name: str) -> object:
        """Retrieves a plugin by its name or alias.

        Args:
            name (str): The name or alias of the plugin to retrieve.

        Returns:
            object: The registered plugin object.

        Raises:
            ServiceError: With status 404 if the plugin is not found.
        """
        canonical = self._aliases.get(name, name)
        try:
            plugin = self._plugins[canonical]
        except KeyError as key_error:
            self._emit(
                "registry_plugin_retrieve_failed",
                {"name": name, "error": str(key_error)},
                "retrieve_failed",
            )
            raise ServiceError(
                f"Plugin {name!r} not found", http_status=404
            ) from key_error
        self._emit("registry_plugin_retrieved", {"name": canonical}, "retrieve")
        return plugin

    def names(self) -> list[str]:
        """Returns a list of all registered plugin names.

        Returns:
            list[str]: A new list of the canonical names of all registered
                plugins (aliases are not included).
        """
        names = list(self._plugins)
        self._emit("registry_list", {"names": names}, "list")
        return names

    def has(self, name: str) -> bool:
        """Checks if a plugin is registered under a given name or alias.

        Args:
            name (str): The name or alias of the plugin to check.

        Returns:
            bool: True if the plugin is registered, otherwise False.
        """
        exists = name in self._plugins or name in self._aliases
        self._emit("registry_contains", {"name": name, "result": exists}, "contains")
        return exists

    def meta(self, name: str) -> dict[str, str]:
        """Retrieves metadata for a specific plugin.

        Args:
            name (str): The name or alias of the plugin.

        Returns:
            dict[str, str]: A copy of the plugin's metadata, or an empty
                dictionary if no metadata is stored for that plugin.
        """
        canonical = self._aliases.get(name, name)
        info = dict(self._meta.get(canonical, {}))
        self._emit("registry_meta_retrieved", {"name": canonical}, "meta_retrieved")
        return info

    async def call_hook(self, hook: str, *args: Any, **kwargs: Any) -> list[Any]:
        """Invokes a hook on all registered plugins that implement it.

        This method handles results from multiple plugins, awaiting any results
        that are coroutines.

        Args:
            hook (str): The name of the hook to invoke.
            *args (Any): Positional arguments to pass to the hook.
            **kwargs (Any): Keyword arguments to pass to the hook.

        Returns:
            list[Any]: The gathered results. Non-coroutine results that are
                `None` are dropped; the value of an awaited coroutine is kept
                as-is.

        Raises:
            ServiceError: If the specified hook does not exist.
        """
        # Guard only the lookup: an AttributeError raised inside a plugin's
        # hook implementation must not be reported as a missing hook.
        try:
            hook_fn = getattr(self._pm.hook, hook)
        except AttributeError as error:
            raise ServiceError(f"Hook {hook!r} not found", http_status=404) from error

        results = hook_fn(*args, **kwargs)
        collected: list[Any] = []
        if isinstance(results, AsyncIterable):
            async for result in results:
                await self._collect(result, collected)
        else:
            for result in results:
                await self._collect(result, collected)
        self._emit("registry_hook_called", {"hook": hook}, "hook_called")
        return collected
# Public API of this module: only `Registry` is exported.
284__all__ = ["Registry"]