Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pluggy/_manager.py: 54%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3from collections.abc import Iterable
4from collections.abc import Mapping
5from collections.abc import Sequence
6import inspect
7import types
8from typing import Any
9from typing import Callable
10from typing import cast
11from typing import Final
12from typing import TYPE_CHECKING
13import warnings
15from . import _tracing
16from ._callers import _multicall
17from ._hooks import _HookImplFunction
18from ._hooks import _Namespace
19from ._hooks import _Plugin
20from ._hooks import _SubsetHookCaller
21from ._hooks import HookCaller
22from ._hooks import HookImpl
23from ._hooks import HookimplOpts
24from ._hooks import HookRelay
25from ._hooks import HookspecOpts
26from ._hooks import normalize_hookimpl_opts
27from ._result import Result
30if TYPE_CHECKING:
31 # importtlib.metadata import is slow, defer it.
32 import importlib.metadata
35_BeforeTrace = Callable[[str, Sequence[HookImpl], Mapping[str, Any]], None]
36_AfterTrace = Callable[[Result[Any], str, Sequence[HookImpl], Mapping[str, Any]], None]
39def _warn_for_function(warning: Warning, function: Callable[..., object]) -> None:
40 func = cast(types.FunctionType, function)
41 warnings.warn_explicit(
42 warning,
43 type(warning),
44 lineno=func.__code__.co_firstlineno,
45 filename=func.__code__.co_filename,
46 )
49class PluginValidationError(Exception):
50 """Plugin failed validation.
52 :param plugin: The plugin which failed validation.
53 :param message: Error message.
54 """
56 def __init__(self, plugin: _Plugin, message: str) -> None:
57 super().__init__(message)
58 #: The plugin which failed validation.
59 self.plugin = plugin
62class DistFacade:
63 """Emulate a pkg_resources Distribution"""
65 def __init__(self, dist: importlib.metadata.Distribution) -> None:
66 self._dist = dist
68 @property
69 def project_name(self) -> str:
70 name: str = self.metadata["name"]
71 return name
73 def __getattr__(self, attr: str, default: Any | None = None) -> Any:
74 return getattr(self._dist, attr, default)
76 def __dir__(self) -> list[str]:
77 return sorted(dir(self._dist) + ["_dist", "project_name"])
80class PluginManager:
81 """Core class which manages registration of plugin objects and 1:N hook
82 calling.
84 You can register new hooks by calling :meth:`add_hookspecs(module_or_class)
85 <PluginManager.add_hookspecs>`.
87 You can register plugin objects (which contain hook implementations) by
88 calling :meth:`register(plugin) <PluginManager.register>`.
90 For debugging purposes you can call :meth:`PluginManager.enable_tracing`
91 which will subsequently send debug information to the trace helper.
93 :param project_name:
94 The short project name. Prefer snake case. Make sure it's unique!
95 """
97 def __init__(self, project_name: str) -> None:
98 #: The project name.
99 self.project_name: Final = project_name
100 self._name2plugin: Final[dict[str, _Plugin]] = {}
101 self._plugin_distinfo: Final[list[tuple[_Plugin, DistFacade]]] = []
102 #: The "hook relay", used to call a hook on all registered plugins.
103 #: See :ref:`calling`.
104 self.hook: Final = HookRelay()
105 #: The tracing entry point. See :ref:`tracing`.
106 self.trace: Final[_tracing.TagTracerSub] = _tracing.TagTracer().get(
107 "pluginmanage"
108 )
109 self._inner_hookexec = _multicall
111 def _hookexec(
112 self,
113 hook_name: str,
114 methods: Sequence[HookImpl],
115 kwargs: Mapping[str, object],
116 firstresult: bool,
117 ) -> object | list[object]:
118 # called from all hookcaller instances.
119 # enable_tracing will set its own wrapping function at self._inner_hookexec
120 return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
122 def register(self, plugin: _Plugin, name: str | None = None) -> str | None:
123 """Register a plugin and return its name.
125 :param name:
126 The name under which to register the plugin. If not specified, a
127 name is generated using :func:`get_canonical_name`.
129 :returns:
130 The plugin name. If the name is blocked from registering, returns
131 ``None``.
133 If the plugin is already registered, raises a :exc:`ValueError`.
134 """
135 plugin_name = name or self.get_canonical_name(plugin)
137 if plugin_name in self._name2plugin:
138 if self._name2plugin.get(plugin_name, -1) is None:
139 return None # blocked plugin, return None to indicate no registration
140 raise ValueError(
141 "Plugin name already registered: "
142 f"{plugin_name}={plugin}\n{self._name2plugin}"
143 )
145 if plugin in self._name2plugin.values():
146 raise ValueError(
147 "Plugin already registered under a different name: "
148 f"{plugin_name}={plugin}\n{self._name2plugin}"
149 )
151 # XXX if an error happens we should make sure no state has been
152 # changed at point of return
153 self._name2plugin[plugin_name] = plugin
155 # register matching hook implementations of the plugin
156 for name in dir(plugin):
157 hookimpl_opts = self.parse_hookimpl_opts(plugin, name)
158 if hookimpl_opts is not None:
159 normalize_hookimpl_opts(hookimpl_opts)
160 method: _HookImplFunction[object] = getattr(plugin, name)
161 hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts)
162 name = hookimpl_opts.get("specname") or name
163 hook: HookCaller | None = getattr(self.hook, name, None)
164 if hook is None:
165 hook = HookCaller(name, self._hookexec)
166 setattr(self.hook, name, hook)
167 elif hook.has_spec():
168 self._verify_hook(hook, hookimpl)
169 hook._maybe_apply_history(hookimpl)
170 hook._add_hookimpl(hookimpl)
171 return plugin_name
173 def parse_hookimpl_opts(self, plugin: _Plugin, name: str) -> HookimplOpts | None:
174 """Try to obtain a hook implementation from an item with the given name
175 in the given plugin which is being searched for hook impls.
177 :returns:
178 The parsed hookimpl options, or None to skip the given item.
180 This method can be overridden by ``PluginManager`` subclasses to
181 customize how hook implementation are picked up. By default, returns the
182 options for items decorated with :class:`HookimplMarker`.
183 """
184 method: object = getattr(plugin, name)
185 if not inspect.isroutine(method):
186 return None
187 try:
188 res: HookimplOpts | None = getattr(
189 method, self.project_name + "_impl", None
190 )
191 except Exception: # pragma: no cover
192 res = {} # type: ignore[assignment] #pragma: no cover
193 if res is not None and not isinstance(res, dict):
194 # false positive
195 res = None # type:ignore[unreachable] #pragma: no cover
196 return res
198 def unregister(
199 self, plugin: _Plugin | None = None, name: str | None = None
200 ) -> Any | None:
201 """Unregister a plugin and all of its hook implementations.
203 The plugin can be specified either by the plugin object or the plugin
204 name. If both are specified, they must agree.
206 Returns the unregistered plugin, or ``None`` if not found.
207 """
208 if name is None:
209 assert plugin is not None, "one of name or plugin needs to be specified"
210 name = self.get_name(plugin)
211 assert name is not None, "plugin is not registered"
213 if plugin is None:
214 plugin = self.get_plugin(name)
215 if plugin is None:
216 return None
218 hookcallers = self.get_hookcallers(plugin)
219 if hookcallers:
220 for hookcaller in hookcallers:
221 hookcaller._remove_plugin(plugin)
223 # if self._name2plugin[name] == None registration was blocked: ignore
224 if self._name2plugin.get(name):
225 assert name is not None
226 del self._name2plugin[name]
228 return plugin
230 def set_blocked(self, name: str) -> None:
231 """Block registrations of the given name, unregister if already registered."""
232 self.unregister(name=name)
233 self._name2plugin[name] = None
235 def is_blocked(self, name: str) -> bool:
236 """Return whether the given plugin name is blocked."""
237 return name in self._name2plugin and self._name2plugin[name] is None
239 def unblock(self, name: str) -> bool:
240 """Unblocks a name.
242 Returns whether the name was actually blocked.
243 """
244 if self._name2plugin.get(name, -1) is None:
245 del self._name2plugin[name]
246 return True
247 return False
249 def add_hookspecs(self, module_or_class: _Namespace) -> None:
250 """Add new hook specifications defined in the given ``module_or_class``.
252 Functions are recognized as hook specifications if they have been
253 decorated with a matching :class:`HookspecMarker`.
254 """
255 names = []
256 for name in dir(module_or_class):
257 spec_opts = self.parse_hookspec_opts(module_or_class, name)
258 if spec_opts is not None:
259 hc: HookCaller | None = getattr(self.hook, name, None)
260 if hc is None:
261 hc = HookCaller(name, self._hookexec, module_or_class, spec_opts)
262 setattr(self.hook, name, hc)
263 else:
264 # Plugins registered this hook without knowing the spec.
265 hc.set_specification(module_or_class, spec_opts)
266 for hookfunction in hc.get_hookimpls():
267 self._verify_hook(hc, hookfunction)
268 names.append(name)
270 if not names:
271 raise ValueError(
272 f"did not find any {self.project_name!r} hooks in {module_or_class!r}"
273 )
275 def parse_hookspec_opts(
276 self, module_or_class: _Namespace, name: str
277 ) -> HookspecOpts | None:
278 """Try to obtain a hook specification from an item with the given name
279 in the given module or class which is being searched for hook specs.
281 :returns:
282 The parsed hookspec options for defining a hook, or None to skip the
283 given item.
285 This method can be overridden by ``PluginManager`` subclasses to
286 customize how hook specifications are picked up. By default, returns the
287 options for items decorated with :class:`HookspecMarker`.
288 """
289 method = getattr(module_or_class, name)
290 opts: HookspecOpts | None = getattr(method, self.project_name + "_spec", None)
291 return opts
293 def get_plugins(self) -> set[Any]:
294 """Return a set of all registered plugin objects."""
295 return {x for x in self._name2plugin.values() if x is not None}
297 def is_registered(self, plugin: _Plugin) -> bool:
298 """Return whether the plugin is already registered."""
299 return any(plugin == val for val in self._name2plugin.values())
301 def get_canonical_name(self, plugin: _Plugin) -> str:
302 """Return a canonical name for a plugin object.
304 Note that a plugin may be registered under a different name
305 specified by the caller of :meth:`register(plugin, name) <register>`.
306 To obtain the name of a registered plugin use :meth:`get_name(plugin)
307 <get_name>` instead.
308 """
309 name: str | None = getattr(plugin, "__name__", None)
310 return name or str(id(plugin))
312 def get_plugin(self, name: str) -> Any | None:
313 """Return the plugin registered under the given name, if any."""
314 return self._name2plugin.get(name)
316 def has_plugin(self, name: str) -> bool:
317 """Return whether a plugin with the given name is registered."""
318 return self.get_plugin(name) is not None
320 def get_name(self, plugin: _Plugin) -> str | None:
321 """Return the name the plugin is registered under, or ``None`` if
322 is isn't."""
323 for name, val in self._name2plugin.items():
324 if plugin == val:
325 return name
326 return None
328 def _verify_hook(self, hook: HookCaller, hookimpl: HookImpl) -> None:
329 if hook.is_historic() and (hookimpl.hookwrapper or hookimpl.wrapper):
330 raise PluginValidationError(
331 hookimpl.plugin,
332 f"Plugin {hookimpl.plugin_name!r}\nhook {hook.name!r}\n"
333 "historic incompatible with yield/wrapper/hookwrapper",
334 )
336 assert hook.spec is not None
337 if hook.spec.warn_on_impl:
338 _warn_for_function(hook.spec.warn_on_impl, hookimpl.function)
340 # positional arg checking
341 notinspec = set(hookimpl.argnames) - set(hook.spec.argnames)
342 if notinspec:
343 raise PluginValidationError(
344 hookimpl.plugin,
345 f"Plugin {hookimpl.plugin_name!r} for hook {hook.name!r}\n"
346 f"hookimpl definition: {_formatdef(hookimpl.function)}\n"
347 f"Argument(s) {notinspec} are declared in the hookimpl but "
348 "can not be found in the hookspec",
349 )
351 if hook.spec.warn_on_impl_args:
352 for hookimpl_argname in hookimpl.argnames:
353 argname_warning = hook.spec.warn_on_impl_args.get(hookimpl_argname)
354 if argname_warning is not None:
355 _warn_for_function(argname_warning, hookimpl.function)
357 if (
358 hookimpl.wrapper or hookimpl.hookwrapper
359 ) and not inspect.isgeneratorfunction(hookimpl.function):
360 raise PluginValidationError(
361 hookimpl.plugin,
362 f"Plugin {hookimpl.plugin_name!r} for hook {hook.name!r}\n"
363 f"hookimpl definition: {_formatdef(hookimpl.function)}\n"
364 "Declared as wrapper=True or hookwrapper=True "
365 "but function is not a generator function",
366 )
368 if hookimpl.wrapper and hookimpl.hookwrapper:
369 raise PluginValidationError(
370 hookimpl.plugin,
371 f"Plugin {hookimpl.plugin_name!r} for hook {hook.name!r}\n"
372 f"hookimpl definition: {_formatdef(hookimpl.function)}\n"
373 "The wrapper=True and hookwrapper=True options are mutually exclusive",
374 )
376 def check_pending(self) -> None:
377 """Verify that all hooks which have not been verified against a
378 hook specification are optional, otherwise raise
379 :exc:`PluginValidationError`."""
380 for name in self.hook.__dict__:
381 if name[0] == "_":
382 continue
383 hook: HookCaller = getattr(self.hook, name)
384 if not hook.has_spec():
385 for hookimpl in hook.get_hookimpls():
386 if not hookimpl.optionalhook:
387 raise PluginValidationError(
388 hookimpl.plugin,
389 f"unknown hook {name!r} in plugin {hookimpl.plugin!r}",
390 )
392 def load_setuptools_entrypoints(self, group: str, name: str | None = None) -> int:
393 """Load modules from querying the specified setuptools ``group``.
395 :param group:
396 Entry point group to load plugins.
397 :param name:
398 If given, loads only plugins with the given ``name``.
400 :return:
401 The number of plugins loaded by this call.
402 """
403 import importlib.metadata
405 count = 0
406 for dist in list(importlib.metadata.distributions()):
407 for ep in dist.entry_points:
408 if (
409 ep.group != group
410 or (name is not None and ep.name != name)
411 # already registered
412 or self.get_plugin(ep.name)
413 or self.is_blocked(ep.name)
414 ):
415 continue
416 plugin = ep.load()
417 self.register(plugin, name=ep.name)
418 self._plugin_distinfo.append((plugin, DistFacade(dist)))
419 count += 1
420 return count
422 def list_plugin_distinfo(self) -> list[tuple[_Plugin, DistFacade]]:
423 """Return a list of (plugin, distinfo) pairs for all
424 setuptools-registered plugins."""
425 return list(self._plugin_distinfo)
427 def list_name_plugin(self) -> list[tuple[str, _Plugin]]:
428 """Return a list of (name, plugin) pairs for all registered plugins."""
429 return list(self._name2plugin.items())
431 def get_hookcallers(self, plugin: _Plugin) -> list[HookCaller] | None:
432 """Get all hook callers for the specified plugin.
434 :returns:
435 The hook callers, or ``None`` if ``plugin`` is not registered in
436 this plugin manager.
437 """
438 if self.get_name(plugin) is None:
439 return None
440 hookcallers = []
441 for hookcaller in self.hook.__dict__.values():
442 for hookimpl in hookcaller.get_hookimpls():
443 if hookimpl.plugin is plugin:
444 hookcallers.append(hookcaller)
445 return hookcallers
447 def add_hookcall_monitoring(
448 self, before: _BeforeTrace, after: _AfterTrace
449 ) -> Callable[[], None]:
450 """Add before/after tracing functions for all hooks.
452 Returns an undo function which, when called, removes the added tracers.
454 ``before(hook_name, hook_impls, kwargs)`` will be called ahead
455 of all hook calls and receive a hookcaller instance, a list
456 of HookImpl instances and the keyword arguments for the hook call.
458 ``after(outcome, hook_name, hook_impls, kwargs)`` receives the
459 same arguments as ``before`` but also a :class:`~pluggy.Result` object
460 which represents the result of the overall hook call.
461 """
462 oldcall = self._inner_hookexec
464 def traced_hookexec(
465 hook_name: str,
466 hook_impls: Sequence[HookImpl],
467 caller_kwargs: Mapping[str, object],
468 firstresult: bool,
469 ) -> object | list[object]:
470 before(hook_name, hook_impls, caller_kwargs)
471 outcome = Result.from_call(
472 lambda: oldcall(hook_name, hook_impls, caller_kwargs, firstresult)
473 )
474 after(outcome, hook_name, hook_impls, caller_kwargs)
475 return outcome.get_result()
477 self._inner_hookexec = traced_hookexec
479 def undo() -> None:
480 self._inner_hookexec = oldcall
482 return undo
484 def enable_tracing(self) -> Callable[[], None]:
485 """Enable tracing of hook calls.
487 Returns an undo function which, when called, removes the added tracing.
488 """
489 hooktrace = self.trace.root.get("hook")
491 def before(
492 hook_name: str, methods: Sequence[HookImpl], kwargs: Mapping[str, object]
493 ) -> None:
494 hooktrace.root.indent += 1
495 hooktrace(hook_name, kwargs)
497 def after(
498 outcome: Result[object],
499 hook_name: str,
500 methods: Sequence[HookImpl],
501 kwargs: Mapping[str, object],
502 ) -> None:
503 if outcome.exception is None:
504 hooktrace("finish", hook_name, "-->", outcome.get_result())
505 hooktrace.root.indent -= 1
507 return self.add_hookcall_monitoring(before, after)
509 def subset_hook_caller(
510 self, name: str, remove_plugins: Iterable[_Plugin]
511 ) -> HookCaller:
512 """Return a proxy :class:`~pluggy.HookCaller` instance for the named
513 method which manages calls to all registered plugins except the ones
514 from remove_plugins."""
515 orig: HookCaller = getattr(self.hook, name)
516 plugins_to_remove = {plug for plug in remove_plugins if hasattr(plug, name)}
517 if plugins_to_remove:
518 return _SubsetHookCaller(orig, plugins_to_remove)
519 return orig
522def _formatdef(func: Callable[..., object]) -> str:
523 return f"{func.__name__}{inspect.signature(func)}"