Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pluggy/_manager.py: 53%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import inspect
4import types
5from typing import Any
6from typing import Callable
7from typing import cast
8from typing import Final
9from typing import Iterable
10from typing import Mapping
11from typing import Sequence
12from typing import TYPE_CHECKING
13import warnings
15from . import _tracing
16from ._callers import _multicall
17from ._hooks import _HookImplFunction
18from ._hooks import _Namespace
19from ._hooks import _Plugin
20from ._hooks import _SubsetHookCaller
21from ._hooks import HookCaller
22from ._hooks import HookImpl
23from ._hooks import HookimplOpts
24from ._hooks import HookRelay
25from ._hooks import HookspecOpts
26from ._hooks import normalize_hookimpl_opts
27from ._result import Result
if TYPE_CHECKING:
    # The importlib.metadata import is slow, so defer it: it is imported here
    # for type checkers only and imported again locally where it is used.
    import importlib.metadata


# Signature of the ``before`` tracing callback:
# ``before(hook_name, hook_impls, kwargs) -> None``.
_BeforeTrace = Callable[[str, Sequence[HookImpl], Mapping[str, Any]], None]
# Signature of the ``after`` tracing callback:
# ``after(outcome, hook_name, hook_impls, kwargs) -> None``.
_AfterTrace = Callable[[Result[Any], str, Sequence[HookImpl], Mapping[str, Any]], None]
39def _warn_for_function(warning: Warning, function: Callable[..., object]) -> None:
40 func = cast(types.FunctionType, function)
41 warnings.warn_explicit(
42 warning,
43 type(warning),
44 lineno=func.__code__.co_firstlineno,
45 filename=func.__code__.co_filename,
46 )
class PluginValidationError(Exception):
    """Raised when a plugin does not pass validation.

    :param plugin: The plugin which failed validation.
    :param message: Error message.
    """

    def __init__(self, plugin: _Plugin, message: str) -> None:
        #: The plugin which failed validation.
        self.plugin = plugin
        # Only the message becomes part of the exception's ``args``.
        super().__init__(message)
class DistFacade:
    """Emulate a pkg_resources Distribution.

    Wraps a distribution object and forwards attribute access to it, adding
    a ``project_name`` property on top.

    :param dist: The distribution object to wrap.
    """

    def __init__(self, dist: importlib.metadata.Distribution) -> None:
        self._dist = dist

    @property
    def project_name(self) -> str:
        """The value of the ``"name"`` field of the wrapped metadata."""
        name: str = self.metadata["name"]
        return name

    def __getattr__(self, attr: str, default=None):
        """Forward attribute lookups to the wrapped distribution.

        Attributes missing on the wrapped distribution resolve to
        ``default`` (``None``) instead of raising.

        :raises AttributeError: If ``_dist`` itself is looked up while unset.
        """
        if attr == "_dist":
            # ``__getattr__`` only sees ``_dist`` when it is missing from the
            # instance (e.g. an instance created without ``__init__`` while
            # being copied or unpickled). Reading ``self._dist`` below would
            # then re-enter this method until RecursionError, so fail like
            # any other missing attribute instead.
            raise AttributeError(attr)
        return getattr(self._dist, attr, default)

    def __dir__(self) -> list[str]:
        """List the wrapped distribution's attributes plus the facade's own."""
        return sorted(dir(self._dist) + ["_dist", "project_name"])
class PluginManager:
    """Core class which manages registration of plugin objects and 1:N hook
    calling.

    You can register new hooks by calling :meth:`add_hookspecs(module_or_class)
    <PluginManager.add_hookspecs>`.

    You can register plugin objects (which contain hook implementations) by
    calling :meth:`register(plugin) <PluginManager.register>`.

    For debugging purposes you can call :meth:`PluginManager.enable_tracing`
    which will subsequently send debug information to the trace helper.

    :param project_name:
        The short project name. Prefer snake case. Make sure it's unique!
    """

    def __init__(self, project_name: str) -> None:
        #: The project name.
        self.project_name: Final = project_name
        # Maps a name to its plugin object. A ``None`` value marks the name
        # as blocked (see ``set_blocked`` / ``is_blocked``).
        self._name2plugin: Final[dict[str, _Plugin]] = {}
        # (plugin, distribution facade) pairs recorded by
        # ``load_setuptools_entrypoints``.
        self._plugin_distinfo: Final[list[tuple[_Plugin, DistFacade]]] = []
        #: The "hook relay", used to call a hook on all registered plugins.
        #: See :ref:`calling`.
        self.hook: Final = HookRelay()
        #: The tracing entry point. See :ref:`tracing`.
        self.trace: Final[_tracing.TagTracerSub] = _tracing.TagTracer().get(
            "pluginmanage"
        )
        # The function that actually executes a hook call; replaced by
        # ``add_hookcall_monitoring`` to wrap calls with tracing.
        self._inner_hookexec = _multicall

    def _hookexec(
        self,
        hook_name: str,
        methods: Sequence[HookImpl],
        kwargs: Mapping[str, object],
        firstresult: bool,
    ) -> object | list[object]:
        """Execute a hook call by delegating to ``self._inner_hookexec``."""
        # called from all hookcaller instances.
        # enable_tracing will set its own wrapping function at self._inner_hookexec
        return self._inner_hookexec(hook_name, methods, kwargs, firstresult)

    def register(self, plugin: _Plugin, name: str | None = None) -> str | None:
        """Register a plugin and return its name.

        :param plugin:
            The plugin object whose attributes are searched for hook
            implementations.
        :param name:
            The name under which to register the plugin. If not specified, a
            name is generated using :func:`get_canonical_name`.

        :returns:
            The plugin name. If the name is blocked from registering, returns
            ``None``.

        If the plugin is already registered, raises a :exc:`ValueError`.
        """
        plugin_name = name or self.get_canonical_name(plugin)

        if plugin_name in self._name2plugin:
            if self._name2plugin[plugin_name] is None:
                return None  # blocked plugin, return None to indicate no registration
            raise ValueError(
                "Plugin name already registered: %s=%s\n%s"
                % (plugin_name, plugin, self._name2plugin)
            )

        if plugin in self._name2plugin.values():
            raise ValueError(
                "Plugin already registered under a different name: %s=%s\n%s"
                % (plugin_name, plugin, self._name2plugin)
            )

        # XXX if an error happens we should make sure no state has been
        # changed at point of return
        self._name2plugin[plugin_name] = plugin

        # register matching hook implementations of the plugin
        for attr_name in dir(plugin):
            hookimpl_opts = self.parse_hookimpl_opts(plugin, attr_name)
            if hookimpl_opts is None:
                continue
            normalize_hookimpl_opts(hookimpl_opts)
            method: _HookImplFunction[object] = getattr(plugin, attr_name)
            hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts)
            # An explicit "specname" option overrides the attribute name when
            # choosing which hook the implementation belongs to.
            hook_name = hookimpl_opts.get("specname") or attr_name
            hook: HookCaller | None = getattr(self.hook, hook_name, None)
            if hook is None:
                hook = HookCaller(hook_name, self._hookexec)
                setattr(self.hook, hook_name, hook)
            elif hook.has_spec():
                self._verify_hook(hook, hookimpl)
                hook._maybe_apply_history(hookimpl)
            hook._add_hookimpl(hookimpl)
        return plugin_name

    def parse_hookimpl_opts(self, plugin: _Plugin, name: str) -> HookimplOpts | None:
        """Try to obtain a hook implementation from an item with the given name
        in the given plugin which is being searched for hook impls.

        :returns:
            The parsed hookimpl options, or None to skip the given item.

        This method can be overridden by ``PluginManager`` subclasses to
        customize how hook implementation are picked up. By default, returns the
        options for items decorated with :class:`HookimplMarker`.
        """
        method: object = getattr(plugin, name)
        if not inspect.isroutine(method):
            return None
        try:
            res: HookimplOpts | None = getattr(
                method, self.project_name + "_impl", None
            )
        except Exception:
            # Deliberate best effort: an attribute lookup that raises is
            # treated as an implementation with empty options.
            res = {}  # type: ignore[assignment]
        if res is not None and not isinstance(res, dict):
            # false positive
            res = None  # type:ignore[unreachable]
        return res

    def unregister(
        self, plugin: _Plugin | None = None, name: str | None = None
    ) -> Any | None:
        """Unregister a plugin and all of its hook implementations.

        The plugin can be specified either by the plugin object or the plugin
        name. If both are specified, they must agree.

        Returns the unregistered plugin, or ``None`` if not found.
        """
        if name is None:
            assert plugin is not None, "one of name or plugin needs to be specified"
            name = self.get_name(plugin)
            assert name is not None, "plugin is not registered"

        if plugin is None:
            plugin = self.get_plugin(name)
            if plugin is None:
                return None

        hookcallers = self.get_hookcallers(plugin)
        if hookcallers:
            for hookcaller in hookcallers:
                hookcaller._remove_plugin(plugin)

        # if self._name2plugin[name] == None registration was blocked: ignore.
        # Compare against None explicitly: a plugin object may itself be
        # falsy (e.g. define ``__bool__`` or ``__len__``) and must still be
        # removed from the registry.
        if self._name2plugin.get(name) is not None:
            assert name is not None
            del self._name2plugin[name]

        return plugin

    def set_blocked(self, name: str) -> None:
        """Block registrations of the given name, unregister if already registered."""
        self.unregister(name=name)
        self._name2plugin[name] = None

    def is_blocked(self, name: str) -> bool:
        """Return whether the given plugin name is blocked."""
        return name in self._name2plugin and self._name2plugin[name] is None

    def unblock(self, name: str) -> bool:
        """Unblocks a name.

        Returns whether the name was actually blocked.
        """
        # The -1 default keeps "name absent" distinct from "name blocked".
        if self._name2plugin.get(name, -1) is None:
            del self._name2plugin[name]
            return True
        return False

    def add_hookspecs(self, module_or_class: _Namespace) -> None:
        """Add new hook specifications defined in the given ``module_or_class``.

        Functions are recognized as hook specifications if they have been
        decorated with a matching :class:`HookspecMarker`.

        Raises :exc:`ValueError` if no hook specification is found.
        """
        names = []
        for name in dir(module_or_class):
            spec_opts = self.parse_hookspec_opts(module_or_class, name)
            if spec_opts is not None:
                hc: HookCaller | None = getattr(self.hook, name, None)
                if hc is None:
                    hc = HookCaller(name, self._hookexec, module_or_class, spec_opts)
                    setattr(self.hook, name, hc)
                else:
                    # Plugins registered this hook without knowing the spec.
                    hc.set_specification(module_or_class, spec_opts)
                    for hookfunction in hc.get_hookimpls():
                        self._verify_hook(hc, hookfunction)
                names.append(name)

        if not names:
            raise ValueError(
                f"did not find any {self.project_name!r} hooks in {module_or_class!r}"
            )

    def parse_hookspec_opts(
        self, module_or_class: _Namespace, name: str
    ) -> HookspecOpts | None:
        """Try to obtain a hook specification from an item with the given name
        in the given module or class which is being searched for hook specs.

        :returns:
            The parsed hookspec options for defining a hook, or None to skip the
            given item.

        This method can be overridden by ``PluginManager`` subclasses to
        customize how hook specifications are picked up. By default, returns the
        options for items decorated with :class:`HookspecMarker`.
        """
        method = getattr(module_or_class, name)
        opts: HookspecOpts | None = getattr(method, self.project_name + "_spec", None)
        return opts

    def get_plugins(self) -> set[Any]:
        """Return a set of all registered plugin objects."""
        # ``None`` values are blocked names, not plugins.
        return {x for x in self._name2plugin.values() if x is not None}

    def is_registered(self, plugin: _Plugin) -> bool:
        """Return whether the plugin is already registered."""
        return any(plugin == val for val in self._name2plugin.values())

    def get_canonical_name(self, plugin: _Plugin) -> str:
        """Return a canonical name for a plugin object.

        Note that a plugin may be registered under a different name
        specified by the caller of :meth:`register(plugin, name) <register>`.
        To obtain the name of a registered plugin use :meth:`get_name(plugin)
        <get_name>` instead.
        """
        name: str | None = getattr(plugin, "__name__", None)
        # Fall back to the object's id when it has no (non-empty) ``__name__``.
        return name or str(id(plugin))

    def get_plugin(self, name: str) -> Any | None:
        """Return the plugin registered under the given name, if any."""
        return self._name2plugin.get(name)

    def has_plugin(self, name: str) -> bool:
        """Return whether a plugin with the given name is registered."""
        return self.get_plugin(name) is not None

    def get_name(self, plugin: _Plugin) -> str | None:
        """Return the name the plugin is registered under, or ``None`` if
        it isn't."""
        for name, val in self._name2plugin.items():
            if plugin == val:
                return name
        return None

    def _verify_hook(self, hook: HookCaller, hookimpl: HookImpl) -> None:
        """Validate ``hookimpl`` against the specification of ``hook``.

        Raises :exc:`PluginValidationError` when the implementation is
        incompatible with the specification, and emits the warnings
        configured on the specification.
        """
        if hook.is_historic() and (hookimpl.hookwrapper or hookimpl.wrapper):
            raise PluginValidationError(
                hookimpl.plugin,
                "Plugin %r\nhook %r\nhistoric incompatible with yield/wrapper/hookwrapper"
                % (hookimpl.plugin_name, hook.name),
            )

        assert hook.spec is not None
        if hook.spec.warn_on_impl:
            _warn_for_function(hook.spec.warn_on_impl, hookimpl.function)

        # positional arg checking
        notinspec = set(hookimpl.argnames) - set(hook.spec.argnames)
        if notinspec:
            raise PluginValidationError(
                hookimpl.plugin,
                "Plugin %r for hook %r\nhookimpl definition: %s\n"
                "Argument(s) %s are declared in the hookimpl but "
                "can not be found in the hookspec"
                % (
                    hookimpl.plugin_name,
                    hook.name,
                    _formatdef(hookimpl.function),
                    notinspec,
                ),
            )

        if hook.spec.warn_on_impl_args:
            for hookimpl_argname in hookimpl.argnames:
                argname_warning = hook.spec.warn_on_impl_args.get(hookimpl_argname)
                if argname_warning is not None:
                    _warn_for_function(argname_warning, hookimpl.function)

        if (
            hookimpl.wrapper or hookimpl.hookwrapper
        ) and not inspect.isgeneratorfunction(hookimpl.function):
            raise PluginValidationError(
                hookimpl.plugin,
                "Plugin %r for hook %r\nhookimpl definition: %s\n"
                "Declared as wrapper=True or hookwrapper=True "
                "but function is not a generator function"
                % (hookimpl.plugin_name, hook.name, _formatdef(hookimpl.function)),
            )

        if hookimpl.wrapper and hookimpl.hookwrapper:
            raise PluginValidationError(
                hookimpl.plugin,
                "Plugin %r for hook %r\nhookimpl definition: %s\n"
                "The wrapper=True and hookwrapper=True options are mutually exclusive"
                % (hookimpl.plugin_name, hook.name, _formatdef(hookimpl.function)),
            )

    def check_pending(self) -> None:
        """Verify that all hooks which have not been verified against a
        hook specification are optional, otherwise raise
        :exc:`PluginValidationError`."""
        for name in self.hook.__dict__:
            if name[0] != "_":
                hook: HookCaller = getattr(self.hook, name)
                if not hook.has_spec():
                    for hookimpl in hook.get_hookimpls():
                        if not hookimpl.optionalhook:
                            raise PluginValidationError(
                                hookimpl.plugin,
                                "unknown hook %r in plugin %r"
                                % (name, hookimpl.plugin),
                            )

    def load_setuptools_entrypoints(self, group: str, name: str | None = None) -> int:
        """Load modules from querying the specified setuptools ``group``.

        :param group:
            Entry point group to load plugins.
        :param name:
            If given, loads only plugins with the given ``name``.

        :return:
            The number of plugins loaded by this call.
        """
        import importlib.metadata

        count = 0
        for dist in list(importlib.metadata.distributions()):
            for ep in dist.entry_points:
                if (
                    ep.group != group
                    or (name is not None and ep.name != name)
                    # already registered; compared against None so that a
                    # falsy plugin object still counts as registered
                    or self.get_plugin(ep.name) is not None
                    or self.is_blocked(ep.name)
                ):
                    continue
                plugin = ep.load()
                self.register(plugin, name=ep.name)
                self._plugin_distinfo.append((plugin, DistFacade(dist)))
                count += 1
        return count

    def list_plugin_distinfo(self) -> list[tuple[_Plugin, DistFacade]]:
        """Return a list of (plugin, distinfo) pairs for all
        setuptools-registered plugins."""
        return list(self._plugin_distinfo)

    def list_name_plugin(self) -> list[tuple[str, _Plugin]]:
        """Return a list of (name, plugin) pairs for all registered plugins."""
        return list(self._name2plugin.items())

    def get_hookcallers(self, plugin: _Plugin) -> list[HookCaller] | None:
        """Get all hook callers for the specified plugin.

        :returns:
            The hook callers, or ``None`` if ``plugin`` is not registered in
            this plugin manager.
        """
        if self.get_name(plugin) is None:
            return None
        hookcallers = []
        for hookcaller in self.hook.__dict__.values():
            # A hook caller is appended once per implementation owned by the
            # plugin, so it can appear more than once in the result.
            for hookimpl in hookcaller.get_hookimpls():
                if hookimpl.plugin is plugin:
                    hookcallers.append(hookcaller)
        return hookcallers

    def add_hookcall_monitoring(
        self, before: _BeforeTrace, after: _AfterTrace
    ) -> Callable[[], None]:
        """Add before/after tracing functions for all hooks.

        Returns an undo function which, when called, removes the added tracers.

        ``before(hook_name, hook_impls, kwargs)`` will be called ahead
        of all hook calls and receive the hook name, a list
        of HookImpl instances and the keyword arguments for the hook call.

        ``after(outcome, hook_name, hook_impls, kwargs)`` receives the
        same arguments as ``before`` but also a :class:`~pluggy.Result` object
        which represents the result of the overall hook call.
        """
        oldcall = self._inner_hookexec

        def traced_hookexec(
            hook_name: str,
            hook_impls: Sequence[HookImpl],
            caller_kwargs: Mapping[str, object],
            firstresult: bool,
        ) -> object | list[object]:
            before(hook_name, hook_impls, caller_kwargs)
            outcome = Result.from_call(
                lambda: oldcall(hook_name, hook_impls, caller_kwargs, firstresult)
            )
            after(outcome, hook_name, hook_impls, caller_kwargs)
            return outcome.get_result()

        self._inner_hookexec = traced_hookexec

        def undo() -> None:
            # Restore the executor that was active when monitoring was added.
            self._inner_hookexec = oldcall

        return undo

    def enable_tracing(self) -> Callable[[], None]:
        """Enable tracing of hook calls.

        Returns an undo function which, when called, removes the added tracing.
        """
        hooktrace = self.trace.root.get("hook")

        def before(
            hook_name: str, methods: Sequence[HookImpl], kwargs: Mapping[str, object]
        ) -> None:
            hooktrace.root.indent += 1
            hooktrace(hook_name, kwargs)

        def after(
            outcome: Result[object],
            hook_name: str,
            methods: Sequence[HookImpl],
            kwargs: Mapping[str, object],
        ) -> None:
            # Only successful calls log a "finish" line; the indent is
            # restored either way.
            if outcome.exception is None:
                hooktrace("finish", hook_name, "-->", outcome.get_result())
            hooktrace.root.indent -= 1

        return self.add_hookcall_monitoring(before, after)

    def subset_hook_caller(
        self, name: str, remove_plugins: Iterable[_Plugin]
    ) -> HookCaller:
        """Return a proxy :class:`~pluggy.HookCaller` instance for the named
        method which manages calls to all registered plugins except the ones
        from remove_plugins."""
        orig: HookCaller = getattr(self.hook, name)
        # Only plugins that have an attribute named like the hook matter.
        plugins_to_remove = {plug for plug in remove_plugins if hasattr(plug, name)}
        if plugins_to_remove:
            return _SubsetHookCaller(orig, plugins_to_remove)
        return orig
527def _formatdef(func: Callable[..., object]) -> str:
528 return f"{func.__name__}{inspect.signature(func)}"