Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pluggy/_manager.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

233 statements  

1from __future__ import annotations 

2 

3from collections.abc import Iterable 

4from collections.abc import Mapping 

5from collections.abc import Sequence 

6import inspect 

7import types 

8from typing import Any 

9from typing import Callable 

10from typing import cast 

11from typing import Final 

12from typing import TYPE_CHECKING 

13import warnings 

14 

15from . import _tracing 

16from ._callers import _multicall 

17from ._hooks import _HookImplFunction 

18from ._hooks import _Namespace 

19from ._hooks import _Plugin 

20from ._hooks import _SubsetHookCaller 

21from ._hooks import HookCaller 

22from ._hooks import HookImpl 

23from ._hooks import HookimplOpts 

24from ._hooks import HookRelay 

25from ._hooks import HookspecOpts 

26from ._hooks import normalize_hookimpl_opts 

27from ._result import Result 

28 

29 

30if TYPE_CHECKING: 

31 # importtlib.metadata import is slow, defer it. 

32 import importlib.metadata 

33 

34 

35_BeforeTrace = Callable[[str, Sequence[HookImpl], Mapping[str, Any]], None] 

36_AfterTrace = Callable[[Result[Any], str, Sequence[HookImpl], Mapping[str, Any]], None] 

37 

38 

39def _warn_for_function(warning: Warning, function: Callable[..., object]) -> None: 

40 func = cast(types.FunctionType, function) 

41 warnings.warn_explicit( 

42 warning, 

43 type(warning), 

44 lineno=func.__code__.co_firstlineno, 

45 filename=func.__code__.co_filename, 

46 ) 

47 

48 

49class PluginValidationError(Exception): 

50 """Plugin failed validation. 

51 

52 :param plugin: The plugin which failed validation. 

53 :param message: Error message. 

54 """ 

55 

56 def __init__(self, plugin: _Plugin, message: str) -> None: 

57 super().__init__(message) 

58 #: The plugin which failed validation. 

59 self.plugin = plugin 

60 

61 

62class DistFacade: 

63 """Emulate a pkg_resources Distribution""" 

64 

65 def __init__(self, dist: importlib.metadata.Distribution) -> None: 

66 self._dist = dist 

67 

68 @property 

69 def project_name(self) -> str: 

70 name: str = self.metadata["name"] 

71 return name 

72 

73 def __getattr__(self, attr: str, default: Any | None = None) -> Any: 

74 return getattr(self._dist, attr, default) 

75 

76 def __dir__(self) -> list[str]: 

77 return sorted(dir(self._dist) + ["_dist", "project_name"]) 

78 

79 

80class PluginManager: 

81 """Core class which manages registration of plugin objects and 1:N hook 

82 calling. 

83 

84 You can register new hooks by calling :meth:`add_hookspecs(module_or_class) 

85 <PluginManager.add_hookspecs>`. 

86 

87 You can register plugin objects (which contain hook implementations) by 

88 calling :meth:`register(plugin) <PluginManager.register>`. 

89 

90 For debugging purposes you can call :meth:`PluginManager.enable_tracing` 

91 which will subsequently send debug information to the trace helper. 

92 

93 :param project_name: 

94 The short project name. Prefer snake case. Make sure it's unique! 

95 """ 

96 

97 def __init__(self, project_name: str) -> None: 

98 #: The project name. 

99 self.project_name: Final = project_name 

100 self._name2plugin: Final[dict[str, _Plugin]] = {} 

101 self._plugin_distinfo: Final[list[tuple[_Plugin, DistFacade]]] = [] 

102 #: The "hook relay", used to call a hook on all registered plugins. 

103 #: See :ref:`calling`. 

104 self.hook: Final = HookRelay() 

105 #: The tracing entry point. See :ref:`tracing`. 

106 self.trace: Final[_tracing.TagTracerSub] = _tracing.TagTracer().get( 

107 "pluginmanage" 

108 ) 

109 self._inner_hookexec = _multicall 

110 

111 def _hookexec( 

112 self, 

113 hook_name: str, 

114 methods: Sequence[HookImpl], 

115 kwargs: Mapping[str, object], 

116 firstresult: bool, 

117 ) -> object | list[object]: 

118 # called from all hookcaller instances. 

119 # enable_tracing will set its own wrapping function at self._inner_hookexec 

120 return self._inner_hookexec(hook_name, methods, kwargs, firstresult) 

121 

122 def register(self, plugin: _Plugin, name: str | None = None) -> str | None: 

123 """Register a plugin and return its name. 

124 

125 :param name: 

126 The name under which to register the plugin. If not specified, a 

127 name is generated using :func:`get_canonical_name`. 

128 

129 :returns: 

130 The plugin name. If the name is blocked from registering, returns 

131 ``None``. 

132 

133 If the plugin is already registered, raises a :exc:`ValueError`. 

134 """ 

135 plugin_name = name or self.get_canonical_name(plugin) 

136 

137 if plugin_name in self._name2plugin: 

138 if self._name2plugin.get(plugin_name, -1) is None: 

139 return None # blocked plugin, return None to indicate no registration 

140 raise ValueError( 

141 "Plugin name already registered: " 

142 f"{plugin_name}={plugin}\n{self._name2plugin}" 

143 ) 

144 

145 if plugin in self._name2plugin.values(): 

146 raise ValueError( 

147 "Plugin already registered under a different name: " 

148 f"{plugin_name}={plugin}\n{self._name2plugin}" 

149 ) 

150 

151 # XXX if an error happens we should make sure no state has been 

152 # changed at point of return 

153 self._name2plugin[plugin_name] = plugin 

154 

155 # register matching hook implementations of the plugin 

156 for name in dir(plugin): 

157 hookimpl_opts = self.parse_hookimpl_opts(plugin, name) 

158 if hookimpl_opts is not None: 

159 normalize_hookimpl_opts(hookimpl_opts) 

160 method: _HookImplFunction[object] = getattr(plugin, name) 

161 hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts) 

162 name = hookimpl_opts.get("specname") or name 

163 hook: HookCaller | None = getattr(self.hook, name, None) 

164 if hook is None: 

165 hook = HookCaller(name, self._hookexec) 

166 setattr(self.hook, name, hook) 

167 elif hook.has_spec(): 

168 self._verify_hook(hook, hookimpl) 

169 hook._maybe_apply_history(hookimpl) 

170 hook._add_hookimpl(hookimpl) 

171 return plugin_name 

172 

173 def parse_hookimpl_opts(self, plugin: _Plugin, name: str) -> HookimplOpts | None: 

174 """Try to obtain a hook implementation from an item with the given name 

175 in the given plugin which is being searched for hook impls. 

176 

177 :returns: 

178 The parsed hookimpl options, or None to skip the given item. 

179 

180 This method can be overridden by ``PluginManager`` subclasses to 

181 customize how hook implementation are picked up. By default, returns the 

182 options for items decorated with :class:`HookimplMarker`. 

183 """ 

184 method: object = getattr(plugin, name) 

185 if not inspect.isroutine(method): 

186 return None 

187 try: 

188 res: HookimplOpts | None = getattr( 

189 method, self.project_name + "_impl", None 

190 ) 

191 except Exception: # pragma: no cover 

192 res = {} # type: ignore[assignment] #pragma: no cover 

193 if res is not None and not isinstance(res, dict): 

194 # false positive 

195 res = None # type:ignore[unreachable] #pragma: no cover 

196 return res 

197 

198 def unregister( 

199 self, plugin: _Plugin | None = None, name: str | None = None 

200 ) -> Any | None: 

201 """Unregister a plugin and all of its hook implementations. 

202 

203 The plugin can be specified either by the plugin object or the plugin 

204 name. If both are specified, they must agree. 

205 

206 Returns the unregistered plugin, or ``None`` if not found. 

207 """ 

208 if name is None: 

209 assert plugin is not None, "one of name or plugin needs to be specified" 

210 name = self.get_name(plugin) 

211 assert name is not None, "plugin is not registered" 

212 

213 if plugin is None: 

214 plugin = self.get_plugin(name) 

215 if plugin is None: 

216 return None 

217 

218 hookcallers = self.get_hookcallers(plugin) 

219 if hookcallers: 

220 for hookcaller in hookcallers: 

221 hookcaller._remove_plugin(plugin) 

222 

223 # if self._name2plugin[name] == None registration was blocked: ignore 

224 if self._name2plugin.get(name): 

225 assert name is not None 

226 del self._name2plugin[name] 

227 

228 return plugin 

229 

230 def set_blocked(self, name: str) -> None: 

231 """Block registrations of the given name, unregister if already registered.""" 

232 self.unregister(name=name) 

233 self._name2plugin[name] = None 

234 

235 def is_blocked(self, name: str) -> bool: 

236 """Return whether the given plugin name is blocked.""" 

237 return name in self._name2plugin and self._name2plugin[name] is None 

238 

239 def unblock(self, name: str) -> bool: 

240 """Unblocks a name. 

241 

242 Returns whether the name was actually blocked. 

243 """ 

244 if self._name2plugin.get(name, -1) is None: 

245 del self._name2plugin[name] 

246 return True 

247 return False 

248 

249 def add_hookspecs(self, module_or_class: _Namespace) -> None: 

250 """Add new hook specifications defined in the given ``module_or_class``. 

251 

252 Functions are recognized as hook specifications if they have been 

253 decorated with a matching :class:`HookspecMarker`. 

254 """ 

255 names = [] 

256 for name in dir(module_or_class): 

257 spec_opts = self.parse_hookspec_opts(module_or_class, name) 

258 if spec_opts is not None: 

259 hc: HookCaller | None = getattr(self.hook, name, None) 

260 if hc is None: 

261 hc = HookCaller(name, self._hookexec, module_or_class, spec_opts) 

262 setattr(self.hook, name, hc) 

263 else: 

264 # Plugins registered this hook without knowing the spec. 

265 hc.set_specification(module_or_class, spec_opts) 

266 for hookfunction in hc.get_hookimpls(): 

267 self._verify_hook(hc, hookfunction) 

268 names.append(name) 

269 

270 if not names: 

271 raise ValueError( 

272 f"did not find any {self.project_name!r} hooks in {module_or_class!r}" 

273 ) 

274 

275 def parse_hookspec_opts( 

276 self, module_or_class: _Namespace, name: str 

277 ) -> HookspecOpts | None: 

278 """Try to obtain a hook specification from an item with the given name 

279 in the given module or class which is being searched for hook specs. 

280 

281 :returns: 

282 The parsed hookspec options for defining a hook, or None to skip the 

283 given item. 

284 

285 This method can be overridden by ``PluginManager`` subclasses to 

286 customize how hook specifications are picked up. By default, returns the 

287 options for items decorated with :class:`HookspecMarker`. 

288 """ 

289 method = getattr(module_or_class, name) 

290 opts: HookspecOpts | None = getattr(method, self.project_name + "_spec", None) 

291 return opts 

292 

293 def get_plugins(self) -> set[Any]: 

294 """Return a set of all registered plugin objects.""" 

295 return {x for x in self._name2plugin.values() if x is not None} 

296 

297 def is_registered(self, plugin: _Plugin) -> bool: 

298 """Return whether the plugin is already registered.""" 

299 return any(plugin == val for val in self._name2plugin.values()) 

300 

301 def get_canonical_name(self, plugin: _Plugin) -> str: 

302 """Return a canonical name for a plugin object. 

303 

304 Note that a plugin may be registered under a different name 

305 specified by the caller of :meth:`register(plugin, name) <register>`. 

306 To obtain the name of a registered plugin use :meth:`get_name(plugin) 

307 <get_name>` instead. 

308 """ 

309 name: str | None = getattr(plugin, "__name__", None) 

310 return name or str(id(plugin)) 

311 

312 def get_plugin(self, name: str) -> Any | None: 

313 """Return the plugin registered under the given name, if any.""" 

314 return self._name2plugin.get(name) 

315 

316 def has_plugin(self, name: str) -> bool: 

317 """Return whether a plugin with the given name is registered.""" 

318 return self.get_plugin(name) is not None 

319 

320 def get_name(self, plugin: _Plugin) -> str | None: 

321 """Return the name the plugin is registered under, or ``None`` if 

322 is isn't.""" 

323 for name, val in self._name2plugin.items(): 

324 if plugin == val: 

325 return name 

326 return None 

327 

328 def _verify_hook(self, hook: HookCaller, hookimpl: HookImpl) -> None: 

329 if hook.is_historic() and (hookimpl.hookwrapper or hookimpl.wrapper): 

330 raise PluginValidationError( 

331 hookimpl.plugin, 

332 f"Plugin {hookimpl.plugin_name!r}\nhook {hook.name!r}\n" 

333 "historic incompatible with yield/wrapper/hookwrapper", 

334 ) 

335 

336 assert hook.spec is not None 

337 if hook.spec.warn_on_impl: 

338 _warn_for_function(hook.spec.warn_on_impl, hookimpl.function) 

339 

340 # positional arg checking 

341 notinspec = set(hookimpl.argnames) - set(hook.spec.argnames) 

342 if notinspec: 

343 raise PluginValidationError( 

344 hookimpl.plugin, 

345 f"Plugin {hookimpl.plugin_name!r} for hook {hook.name!r}\n" 

346 f"hookimpl definition: {_formatdef(hookimpl.function)}\n" 

347 f"Argument(s) {notinspec} are declared in the hookimpl but " 

348 "can not be found in the hookspec", 

349 ) 

350 

351 if hook.spec.warn_on_impl_args: 

352 for hookimpl_argname in hookimpl.argnames: 

353 argname_warning = hook.spec.warn_on_impl_args.get(hookimpl_argname) 

354 if argname_warning is not None: 

355 _warn_for_function(argname_warning, hookimpl.function) 

356 

357 if ( 

358 hookimpl.wrapper or hookimpl.hookwrapper 

359 ) and not inspect.isgeneratorfunction(hookimpl.function): 

360 raise PluginValidationError( 

361 hookimpl.plugin, 

362 f"Plugin {hookimpl.plugin_name!r} for hook {hook.name!r}\n" 

363 f"hookimpl definition: {_formatdef(hookimpl.function)}\n" 

364 "Declared as wrapper=True or hookwrapper=True " 

365 "but function is not a generator function", 

366 ) 

367 

368 if hookimpl.wrapper and hookimpl.hookwrapper: 

369 raise PluginValidationError( 

370 hookimpl.plugin, 

371 f"Plugin {hookimpl.plugin_name!r} for hook {hook.name!r}\n" 

372 f"hookimpl definition: {_formatdef(hookimpl.function)}\n" 

373 "The wrapper=True and hookwrapper=True options are mutually exclusive", 

374 ) 

375 

376 def check_pending(self) -> None: 

377 """Verify that all hooks which have not been verified against a 

378 hook specification are optional, otherwise raise 

379 :exc:`PluginValidationError`.""" 

380 for name in self.hook.__dict__: 

381 if name[0] == "_": 

382 continue 

383 hook: HookCaller = getattr(self.hook, name) 

384 if not hook.has_spec(): 

385 for hookimpl in hook.get_hookimpls(): 

386 if not hookimpl.optionalhook: 

387 raise PluginValidationError( 

388 hookimpl.plugin, 

389 f"unknown hook {name!r} in plugin {hookimpl.plugin!r}", 

390 ) 

391 

392 def load_setuptools_entrypoints(self, group: str, name: str | None = None) -> int: 

393 """Load modules from querying the specified setuptools ``group``. 

394 

395 :param group: 

396 Entry point group to load plugins. 

397 :param name: 

398 If given, loads only plugins with the given ``name``. 

399 

400 :return: 

401 The number of plugins loaded by this call. 

402 """ 

403 import importlib.metadata 

404 

405 count = 0 

406 for dist in list(importlib.metadata.distributions()): 

407 for ep in dist.entry_points: 

408 if ( 

409 ep.group != group 

410 or (name is not None and ep.name != name) 

411 # already registered 

412 or self.get_plugin(ep.name) 

413 or self.is_blocked(ep.name) 

414 ): 

415 continue 

416 plugin = ep.load() 

417 self.register(plugin, name=ep.name) 

418 self._plugin_distinfo.append((plugin, DistFacade(dist))) 

419 count += 1 

420 return count 

421 

422 def list_plugin_distinfo(self) -> list[tuple[_Plugin, DistFacade]]: 

423 """Return a list of (plugin, distinfo) pairs for all 

424 setuptools-registered plugins.""" 

425 return list(self._plugin_distinfo) 

426 

427 def list_name_plugin(self) -> list[tuple[str, _Plugin]]: 

428 """Return a list of (name, plugin) pairs for all registered plugins.""" 

429 return list(self._name2plugin.items()) 

430 

431 def get_hookcallers(self, plugin: _Plugin) -> list[HookCaller] | None: 

432 """Get all hook callers for the specified plugin. 

433 

434 :returns: 

435 The hook callers, or ``None`` if ``plugin`` is not registered in 

436 this plugin manager. 

437 """ 

438 if self.get_name(plugin) is None: 

439 return None 

440 hookcallers = [] 

441 for hookcaller in self.hook.__dict__.values(): 

442 for hookimpl in hookcaller.get_hookimpls(): 

443 if hookimpl.plugin is plugin: 

444 hookcallers.append(hookcaller) 

445 return hookcallers 

446 

447 def add_hookcall_monitoring( 

448 self, before: _BeforeTrace, after: _AfterTrace 

449 ) -> Callable[[], None]: 

450 """Add before/after tracing functions for all hooks. 

451 

452 Returns an undo function which, when called, removes the added tracers. 

453 

454 ``before(hook_name, hook_impls, kwargs)`` will be called ahead 

455 of all hook calls and receive a hookcaller instance, a list 

456 of HookImpl instances and the keyword arguments for the hook call. 

457 

458 ``after(outcome, hook_name, hook_impls, kwargs)`` receives the 

459 same arguments as ``before`` but also a :class:`~pluggy.Result` object 

460 which represents the result of the overall hook call. 

461 """ 

462 oldcall = self._inner_hookexec 

463 

464 def traced_hookexec( 

465 hook_name: str, 

466 hook_impls: Sequence[HookImpl], 

467 caller_kwargs: Mapping[str, object], 

468 firstresult: bool, 

469 ) -> object | list[object]: 

470 before(hook_name, hook_impls, caller_kwargs) 

471 outcome = Result.from_call( 

472 lambda: oldcall(hook_name, hook_impls, caller_kwargs, firstresult) 

473 ) 

474 after(outcome, hook_name, hook_impls, caller_kwargs) 

475 return outcome.get_result() 

476 

477 self._inner_hookexec = traced_hookexec 

478 

479 def undo() -> None: 

480 self._inner_hookexec = oldcall 

481 

482 return undo 

483 

484 def enable_tracing(self) -> Callable[[], None]: 

485 """Enable tracing of hook calls. 

486 

487 Returns an undo function which, when called, removes the added tracing. 

488 """ 

489 hooktrace = self.trace.root.get("hook") 

490 

491 def before( 

492 hook_name: str, methods: Sequence[HookImpl], kwargs: Mapping[str, object] 

493 ) -> None: 

494 hooktrace.root.indent += 1 

495 hooktrace(hook_name, kwargs) 

496 

497 def after( 

498 outcome: Result[object], 

499 hook_name: str, 

500 methods: Sequence[HookImpl], 

501 kwargs: Mapping[str, object], 

502 ) -> None: 

503 if outcome.exception is None: 

504 hooktrace("finish", hook_name, "-->", outcome.get_result()) 

505 hooktrace.root.indent -= 1 

506 

507 return self.add_hookcall_monitoring(before, after) 

508 

509 def subset_hook_caller( 

510 self, name: str, remove_plugins: Iterable[_Plugin] 

511 ) -> HookCaller: 

512 """Return a proxy :class:`~pluggy.HookCaller` instance for the named 

513 method which manages calls to all registered plugins except the ones 

514 from remove_plugins.""" 

515 orig: HookCaller = getattr(self.hook, name) 

516 plugins_to_remove = {plug for plug in remove_plugins if hasattr(plug, name)} 

517 if plugins_to_remove: 

518 return _SubsetHookCaller(orig, plugins_to_remove) 

519 return orig 

520 

521 

522def _formatdef(func: Callable[..., object]) -> str: 

523 return f"{func.__name__}{inspect.signature(func)}"