Coverage for /home/runner/work/bijux-cli/bijux-cli/src/bijux_cli/plugins/metadata.py: 90%

138 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-26 17:59 +0000

1# SPDX-License-Identifier: Apache-2.0 

2# Copyright © 2025 Bijan Mousavi 

3 

4"""Plugin discovery, metadata validation, and caching.""" 

5 

6from __future__ import annotations 

7 

8from collections.abc import Mapping 

9from dataclasses import dataclass 

10import importlib.metadata as im 

11import json 

12from pathlib import Path 

13from typing import Any, cast 

14 

15from packaging.requirements import Requirement 

16from packaging.specifiers import SpecifierSet 

17from packaging.utils import canonicalize_name 

18 

19from bijux_cli.core.errors import BijuxError 

20from bijux_cli.core.version import __version__ as cli_version 

21from bijux_cli.plugins import get_plugins_dir 

22from bijux_cli.plugins.catalog import PLUGIN_NAME_RE 

23 

24 

class PluginMetadataError(BijuxError):
    """Signal that a plugin's metadata is absent or incompatible."""

27 

28 

@dataclass(frozen=True)
class PluginMetadata:
    """Immutable record describing one discovered plugin.

    The first five fields are mandatory; the rest have defaults so that a
    record can be built without knowing where the plugin came from.
    """

    # Plugin name.
    name: str
    # Plugin version string.
    version: str
    # Whether the plugin is switched on.
    enabled: bool
    # Origin tag; this module produces "entrypoint" and "local".
    source: str
    # Version specifier the host bijux-cli must satisfy.
    requires_cli: str
    # Metadata schema revision; validation in this module accepts only "1".
    schema_version: str = "1"
    # Name of the distribution providing the plugin, when known.
    dist_name: str | None = None
    # Entry point the plugin was discovered through, if any.
    entrypoint: im.EntryPoint | None = None
    # Directory of a locally installed plugin, if any.
    path: Path | None = None

42 

43 

# Memoized result of the last plugin discovery; ``None`` means "not computed".
_CACHE: list[PluginMetadata] | None = None


def invalidate_plugin_cache() -> None:
    """Drop the memoized discovery result so the next discovery rescans."""
    global _CACHE
    _CACHE = None

51 

52 

def _require_cli_spec(spec: str, *, name: str) -> None:
    """Ensure the running CLI version satisfies a plugin's version specifier.

    Args:
        spec: Version specifier set declared by the plugin (e.g. ``">=1.0,<2"``).
        name: Plugin name, used only to build error messages.

    Raises:
        PluginMetadataError: If ``spec`` cannot be parsed or evaluated, or if
            the host CLI version falls outside it (``http_status=400``).
    """
    try:
        # Parse and evaluate exactly once; the result is reused below instead
        # of rebuilding the SpecifierSet. Pre-release CLI versions are allowed
        # to match.
        satisfied = SpecifierSet(spec).contains(cli_version, prereleases=True)
    except Exception as exc:
        raise PluginMetadataError(
            f"Plugin {name!r} has invalid version spec {spec!r}: {exc}",
            http_status=400,
        ) from exc

    if not satisfied:
        raise PluginMetadataError(
            f"Plugin {name!r} requires bijux-cli {spec}, host is {cli_version}",
            http_status=400,
        )

68 

69 

def _plugin_meta_from_dist(ep: im.EntryPoint) -> PluginMetadata:
    """Build plugin metadata from an entry point's distribution.

    Args:
        ep: Entry point advertising the plugin.

    Returns:
        A record with ``source="entrypoint"``, ``enabled=True`` and
        ``schema_version="1"``.

    Raises:
        PluginMetadataError: If the entry point name is invalid, no
            distribution can be resolved, a ``Requires-Dist`` line cannot be
            parsed, the distribution declares no versioned ``bijux-cli``
            requirement, or the host CLI version does not satisfy it.
    """
    if not PLUGIN_NAME_RE.fullmatch(ep.name) or not ep.name.isascii():
        raise PluginMetadataError(
            f"Plugin name {ep.name!r} is invalid",
            http_status=400,
        )

    dist = getattr(ep, "dist", None)
    if dist is None:
        try:
            # Fallback: look up a distribution named after the entry point's
            # top-level module.
            dist = im.distribution(ep.module.split(".")[0])
        except Exception as exc:
            raise PluginMetadataError(
                f"Entry point {ep.name!r} has no distribution metadata: {exc}",
                http_status=400,
            ) from exc

    meta = dist.metadata
    if hasattr(meta, "get"):
        dist_name = cast(Mapping[str, str], meta).get("Name") or dist.name
    else:
        dist_name = dist.name

    cli_project = canonicalize_name("bijux-cli")  # loop-invariant
    spec = None
    for req_line in meta.get_all("Requires-Dist") or []:
        try:
            req = Requirement(req_line)
        except Exception as exc:
            # Surface malformed requirement lines as a metadata error so that
            # callers filtering on PluginMetadataError (e.g. non-strict
            # discovery) can handle them instead of crashing.
            raise PluginMetadataError(
                f"Plugin {ep.name!r} has invalid requirement {req_line!r}: {exc}",
                http_status=400,
            ) from exc
        if canonicalize_name(req.name) == cli_project:
            # An unversioned requirement yields "" and is treated as missing.
            spec = str(req.specifier) or None
            break

    if not spec:
        raise PluginMetadataError(
            f"Plugin {ep.name!r} missing bijux-cli requirement",
            http_status=400,
        )

    _require_cli_spec(spec, name=ep.name)

    return PluginMetadata(
        name=ep.name,
        version=dist.version or "unknown",
        enabled=True,
        source="entrypoint",
        requires_cli=spec,
        schema_version="1",
        dist_name=dist_name,
        entrypoint=ep,
    )

117 

118 

def _plugin_meta_from_local(plug_dir: Path) -> PluginMetadata:
    """Build plugin metadata from a local plugin directory.

    Reads ``plugin.json`` inside ``plug_dir``. The keys ``name``, ``version``,
    ``bijux_cli_version`` and ``schema_version`` are required; ``enabled`` is
    optional and defaults to ``True``.

    Args:
        plug_dir: Directory of the plugin; its name must equal the ``name``
            declared in ``plugin.json``.

    Returns:
        A record with ``source="local"`` and ``path=plug_dir``.

    Raises:
        PluginMetadataError: If ``plugin.json`` is missing, unreadable, not a
            JSON object, lacks required fields, declares an unsupported
            schema version or an invalid/mismatching name, or requires a CLI
            version the host does not satisfy.
    """
    meta_file = plug_dir / "plugin.json"
    if not meta_file.is_file():
        raise PluginMetadataError(
            f"Plugin {plug_dir.name!r} missing plugin.json",
            http_status=400,
        )

    try:
        meta = json.loads(meta_file.read_text("utf-8"))
    except Exception as exc:
        raise PluginMetadataError(
            f"Plugin {plug_dir.name!r} has invalid plugin.json: {exc}",
            http_status=400,
        ) from exc

    # Valid JSON is not necessarily an object; a list or scalar has no .get().
    if not isinstance(meta, dict):
        raise PluginMetadataError(
            f"Plugin {plug_dir.name!r} has invalid plugin.json: "
            "expected a JSON object",
            http_status=400,
        )

    name = meta.get("name")
    version = meta.get("version")
    requires = meta.get("bijux_cli_version")
    # NOTE: any truthy JSON value (including a non-empty string) enables the
    # plugin, because the value is coerced with bool().
    enabled = bool(meta.get("enabled", True))
    schema_version = meta.get("schema_version")

    if not name or not version or not requires or not schema_version:
        raise PluginMetadataError(
            f"Plugin {plug_dir.name!r} missing required metadata fields",
            http_status=400,
        )

    if str(schema_version) != "1":
        raise PluginMetadataError(
            f"Plugin {plug_dir.name!r} has unsupported schema version {schema_version!r}",
            http_status=400,
        )

    # A non-string name (e.g. a JSON number) is rejected here rather than
    # letting the regex raise TypeError.
    if (
        not isinstance(name, str)
        or not PLUGIN_NAME_RE.fullmatch(name)
        or not name.isascii()
    ):
        raise PluginMetadataError(
            f"Plugin name {name!r} is invalid",
            http_status=400,
        )

    if name != plug_dir.name:
        raise PluginMetadataError(
            f"Plugin dir {plug_dir.name!r} does not match metadata name {name!r}",
            http_status=400,
        )

    _require_cli_spec(str(requires), name=name)

    return PluginMetadata(
        name=name,
        version=str(version),
        enabled=enabled,
        source="local",
        requires_cli=str(requires),
        schema_version=str(schema_version),
        path=plug_dir,
    )

177 

178 

def validate_plugin_metadata(meta: PluginMetadata) -> None:
    """Check the core fields of a plugin record.

    The checks run in order: name, version, CLI requirement, schema version
    presence, schema version value, then CLI version compatibility. The first
    failing check raises ``PluginMetadataError`` with ``http_status=400``.
    """
    plugin = meta.name
    problem: str | None = None

    if not PLUGIN_NAME_RE.fullmatch(plugin) or not plugin.isascii():
        problem = f"Plugin name {meta.name!r} is invalid"
    elif not meta.version or str(meta.version).strip() == "":
        problem = f"Plugin {meta.name!r} missing version"
    elif not meta.requires_cli:
        problem = f"Plugin {meta.name!r} missing bijux-cli requirement"
    elif str(meta.schema_version or "").strip() == "":
        problem = f"Plugin {meta.name!r} missing schema version"
    elif str(meta.schema_version) != "1":
        problem = (
            f"Plugin {meta.name!r} has unsupported schema version {meta.schema_version!r}"
        )

    if problem is not None:
        raise PluginMetadataError(problem, http_status=400)

    _require_cli_spec(meta.requires_cli, name=plugin)

207 

208 

def _register_plugin(seen: dict[str, PluginMetadata], meta: PluginMetadata) -> None:
    """Record ``meta`` under its name, rejecting a name that is already taken."""
    if meta.name in seen:
        raise PluginMetadataError(
            f"Duplicate plugin name detected: {meta.name!r}", http_status=400
        )
    seen[meta.name] = meta


def discover_plugins(*, strict: bool = True) -> list[PluginMetadata]:
    """Discover plugins without importing plugin bodies.

    Stages: discover → validate metadata → register → activate (lazy) → unload.
    This function performs discovery + metadata validation only.

    Entry points in the ``bijux_cli.plugins`` group are scanned first, then
    sub-directories of the plugins directory that contain a ``plugin.py``.

    Args:
        strict: When true, the first plugin with bad metadata raises. When
            false, such plugins are skipped.

    Returns:
        A new list of plugin records sorted by name.

    Raises:
        PluginMetadataError: In strict mode for any invalid plugin; in either
            mode when two plugins share a name.
    """
    global _CACHE
    if _CACHE is not None:
        return list(_CACHE)

    seen: dict[str, PluginMetadata] = {}
    # Set when a broken plugin is skipped; such a partial result must not be
    # cached, otherwise a later strict call would silently return it.
    skipped = False

    for ep in im.entry_points().select(group="bijux_cli.plugins"):
        try:
            meta = _plugin_meta_from_dist(ep)
            # Validation is inside the try so that it honours ``strict`` too.
            validate_plugin_metadata(meta)
        except PluginMetadataError:
            if strict:
                raise
            skipped = True
            continue
        _register_plugin(seen, meta)

    plugins_dir = get_plugins_dir()
    # is_dir() rather than exists(): a stray file at that path cannot be
    # iterated.
    if plugins_dir.is_dir():
        # Sorted so that which plugin is reported first is deterministic.
        for pdir in sorted(plugins_dir.iterdir()):
            if not (pdir / "plugin.py").is_file():
                continue
            try:
                meta = _plugin_meta_from_local(pdir)
                validate_plugin_metadata(meta)
            except PluginMetadataError:
                if strict:
                    raise
                skipped = True
                continue
            _register_plugin(seen, meta)

    found = sorted(seen.values(), key=lambda m: m.name)
    if not skipped:
        _CACHE = found
    return list(found)

256 

257 

def get_plugin_metadata(name: str) -> PluginMetadata:
    """Look up a single discovered plugin by its exact name.

    Raises:
        PluginMetadataError: With ``http_status=404`` when no discovered
            plugin has that name.
    """
    match = next(
        (candidate for candidate in discover_plugins() if candidate.name == name),
        None,
    )
    if match is None:
        raise PluginMetadataError(f"Plugin {name!r} not found", http_status=404)
    return match

264 

265 

def list_plugins() -> list[dict[str, Any]]:
    """Summarise discovered plugins as plain dictionaries.

    Uses non-strict discovery. Each entry has the keys ``name``, ``version``
    and ``enabled``.
    """
    discovered = discover_plugins(strict=False)
    return [
        {"name": plugin.name, "version": plugin.version, "enabled": plugin.enabled}
        for plugin in discovered
    ]

276 

277 

def plugins_for_package(package: str) -> list[PluginMetadata]:
    """Select the discovered plugins provided by a given distribution.

    Names are compared after canonicalisation; plugins with no distribution
    name never match.
    """
    wanted = canonicalize_name(package)

    def _owned(meta: PluginMetadata) -> bool:
        # Plugins without a distribution name cannot belong to any package.
        return bool(meta.dist_name) and canonicalize_name(meta.dist_name) == wanted

    return [meta for meta in discover_plugins() if _owned(meta)]