Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pluggy/_manager.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

235 statements  

1from __future__ import annotations 

2 

3import inspect 

4import types 

5from typing import Any 

6from typing import Callable 

7from typing import cast 

8from typing import Final 

9from typing import Iterable 

10from typing import Mapping 

11from typing import Sequence 

12from typing import TYPE_CHECKING 

13import warnings 

14 

15from . import _tracing 

16from ._callers import _multicall 

17from ._hooks import _HookImplFunction 

18from ._hooks import _Namespace 

19from ._hooks import _Plugin 

20from ._hooks import _SubsetHookCaller 

21from ._hooks import HookCaller 

22from ._hooks import HookImpl 

23from ._hooks import HookimplOpts 

24from ._hooks import HookRelay 

25from ._hooks import HookspecOpts 

26from ._hooks import normalize_hookimpl_opts 

27from ._result import Result 

28 

29 

if TYPE_CHECKING:
    # importlib.metadata import is slow, defer it: here it is only imported
    # for type checkers (it is used in the annotation of DistFacade.__init__);
    # PluginManager.load_setuptools_entrypoints imports it locally when called.
    import importlib.metadata


# Type of the ``before`` callback accepted by
# PluginManager.add_hookcall_monitoring; it is invoked as
# ``before(hook_name, hook_impls, kwargs)``.
_BeforeTrace = Callable[[str, Sequence[HookImpl], Mapping[str, Any]], None]
# Type of the matching ``after`` callback; it is invoked as
# ``after(outcome, hook_name, hook_impls, kwargs)``.
_AfterTrace = Callable[[Result[Any], str, Sequence[HookImpl], Mapping[str, Any]], None]

37 

38 

39def _warn_for_function(warning: Warning, function: Callable[..., object]) -> None: 

40 func = cast(types.FunctionType, function) 

41 warnings.warn_explicit( 

42 warning, 

43 type(warning), 

44 lineno=func.__code__.co_firstlineno, 

45 filename=func.__code__.co_filename, 

46 ) 

47 

48 

class PluginValidationError(Exception):
    """Raised when a plugin does not pass validation.

    :param plugin: The plugin which failed validation.
    :param message: Error message.
    """

    def __init__(self, plugin: _Plugin, message: str) -> None:
        #: The plugin which failed validation.
        self.plugin = plugin
        # Only the message becomes part of the exception's ``args``.
        super().__init__(message)

60 

61 

class DistFacade:
    """Emulate a pkg_resources Distribution.

    Wraps a distribution object and forwards every attribute that is not
    defined on the facade itself to the wrapped object.

    :param dist: The distribution object to wrap.
    """

    def __init__(self, dist: importlib.metadata.Distribution) -> None:
        self._dist = dist

    @property
    def project_name(self) -> str:
        """The ``"name"`` entry of the wrapped distribution's ``metadata``."""
        # ``metadata`` is not defined on the facade, so it is resolved
        # through ``__getattr__`` on the wrapped distribution.
        name: str = self.metadata["name"]
        return name

    def __getattr__(self, attr: str, default: Any = None) -> Any:
        """Forward a failed attribute lookup to the wrapped distribution.

        Attributes missing on the wrapped distribution evaluate to
        ``default`` (``None``) instead of raising ``AttributeError``; this
        long-standing behaviour is kept for backward compatibility.

        :raises AttributeError:
            If ``_dist`` itself has not been set yet (for example on an
            instance created without running ``__init__``).
        """
        # ``__getattr__`` only runs when normal lookup failed.  If the missing
        # attribute is ``_dist`` itself, reading ``self._dist`` below would
        # re-enter this method forever and end in a RecursionError.
        if attr == "_dist":
            raise AttributeError(attr)
        return getattr(self._dist, attr, default)

    def __dir__(self) -> list[str]:
        """List the wrapped distribution's names plus the facade's own."""
        return sorted(dir(self._dist) + ["_dist", "project_name"])

78 

79 

class PluginManager:
    """Core class which manages registration of plugin objects and 1:N hook
    calling.

    You can register new hooks by calling :meth:`add_hookspecs(module_or_class)
    <PluginManager.add_hookspecs>`.

    You can register plugin objects (which contain hook implementations) by
    calling :meth:`register(plugin) <PluginManager.register>`.

    For debugging purposes you can call :meth:`PluginManager.enable_tracing`
    which will subsequently send debug information to the trace helper.

    :param project_name:
        The short project name. Prefer snake case. Make sure it's unique!
    """

    def __init__(self, project_name: str) -> None:
        #: The project name.
        self.project_name: Final = project_name
        # Maps a registration name to its plugin object.  A value of ``None``
        # marks the name as blocked (see ``set_blocked`` / ``is_blocked``).
        self._name2plugin: Final[dict[str, _Plugin]] = {}
        # (plugin, distribution facade) pairs recorded by
        # ``load_setuptools_entrypoints``.
        self._plugin_distinfo: Final[list[tuple[_Plugin, DistFacade]]] = []
        #: The "hook relay", used to call a hook on all registered plugins.
        #: See :ref:`calling`.
        self.hook: Final = HookRelay()
        #: The tracing entry point. See :ref:`tracing`.
        self.trace: Final[_tracing.TagTracerSub] = _tracing.TagTracer().get(
            "pluginmanage"
        )
        # The function that actually executes a hook call; replaced by
        # ``add_hookcall_monitoring`` with a wrapping function.
        self._inner_hookexec = _multicall

    def _hookexec(
        self,
        hook_name: str,
        methods: Sequence[HookImpl],
        kwargs: Mapping[str, object],
        firstresult: bool,
    ) -> object | list[object]:
        """Execute a hook call by delegating to ``self._inner_hookexec``."""
        # called from all hookcaller instances.
        # enable_tracing will set its own wrapping function at self._inner_hookexec
        return self._inner_hookexec(hook_name, methods, kwargs, firstresult)

    def register(self, plugin: _Plugin, name: str | None = None) -> str | None:
        """Register a plugin and return its name.

        :param plugin:
            The plugin object whose attributes are searched for hook
            implementations.
        :param name:
            The name under which to register the plugin. If not specified, a
            name is generated using :func:`get_canonical_name`.

        :returns:
            The plugin name. If the name is blocked from registering, returns
            ``None``.

        If the plugin is already registered, raises a :exc:`ValueError`.
        """
        plugin_name = name or self.get_canonical_name(plugin)

        if plugin_name in self._name2plugin:
            if self._name2plugin[plugin_name] is None:
                return None  # blocked plugin, return None to indicate no registration
            raise ValueError(
                "Plugin name already registered: %s=%s\n%s"
                % (plugin_name, plugin, self._name2plugin)
            )

        if plugin in self._name2plugin.values():
            raise ValueError(
                "Plugin already registered under a different name: %s=%s\n%s"
                % (plugin_name, plugin, self._name2plugin)
            )

        # XXX if an error happens we should make sure no state has been
        # changed at point of return
        self._name2plugin[plugin_name] = plugin

        # register matching hook implementations of the plugin
        for attr_name in dir(plugin):
            hookimpl_opts = self.parse_hookimpl_opts(plugin, attr_name)
            if hookimpl_opts is None:
                continue
            normalize_hookimpl_opts(hookimpl_opts)
            method: _HookImplFunction[object] = getattr(plugin, attr_name)
            hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts)
            # An explicit ``specname`` option overrides the attribute name
            # when choosing which hook the implementation is attached to.
            hook_name = hookimpl_opts.get("specname") or attr_name
            hook: HookCaller | None = getattr(self.hook, hook_name, None)
            if hook is None:
                # No caller for this hook yet: create one without a spec.
                hook = HookCaller(hook_name, self._hookexec)
                setattr(self.hook, hook_name, hook)
            elif hook.has_spec():
                self._verify_hook(hook, hookimpl)
                hook._maybe_apply_history(hookimpl)
            hook._add_hookimpl(hookimpl)
        return plugin_name

    def parse_hookimpl_opts(self, plugin: _Plugin, name: str) -> HookimplOpts | None:
        """Try to obtain a hook implementation from an item with the given name
        in the given plugin which is being searched for hook impls.

        :returns:
            The parsed hookimpl options, or None to skip the given item.

        This method can be overridden by ``PluginManager`` subclasses to
        customize how hook implementation are picked up. By default, returns the
        options for items decorated with :class:`HookimplMarker`.
        """
        method: object = getattr(plugin, name)
        if not inspect.isroutine(method):
            return None
        try:
            # The options are looked up on the routine under the attribute
            # ``<project_name>_impl``.
            res: HookimplOpts | None = getattr(
                method, self.project_name + "_impl", None
            )
        except Exception:
            # Deliberate best effort: if reading the attribute raises, fall
            # back to an empty options dict rather than failing registration.
            res = {}  # type: ignore[assignment]
        if res is not None and not isinstance(res, dict):
            # false positive
            res = None  # type:ignore[unreachable]
        return res

    def unregister(
        self, plugin: _Plugin | None = None, name: str | None = None
    ) -> Any | None:
        """Unregister a plugin and all of its hook implementations.

        The plugin can be specified either by the plugin object or the plugin
        name. If both are specified, they must agree.

        Returns the unregistered plugin, or ``None`` if not found.
        """
        if name is None:
            assert plugin is not None, "one of name or plugin needs to be specified"
            name = self.get_name(plugin)
            assert name is not None, "plugin is not registered"

        if plugin is None:
            plugin = self.get_plugin(name)
            if plugin is None:
                return None

        hookcallers = self.get_hookcallers(plugin)
        if hookcallers:
            for hookcaller in hookcallers:
                hookcaller._remove_plugin(plugin)

        # if self._name2plugin[name] == None registration was blocked: ignore.
        # Compare against None explicitly: a plugin object that happens to be
        # falsy (e.g. defines ``__len__`` returning 0) must still be removed.
        if self._name2plugin.get(name) is not None:
            assert name is not None
            del self._name2plugin[name]

        return plugin

    def set_blocked(self, name: str) -> None:
        """Block registrations of the given name, unregister if already registered."""
        self.unregister(name=name)
        self._name2plugin[name] = None

    def is_blocked(self, name: str) -> bool:
        """Return whether the given plugin name is blocked."""
        return name in self._name2plugin and self._name2plugin[name] is None

    def unblock(self, name: str) -> bool:
        """Unblocks a name.

        Returns whether the name was actually blocked.
        """
        # The ``-1`` default distinguishes "name unknown" from "name blocked"
        # (stored as ``None``).
        if self._name2plugin.get(name, -1) is None:
            del self._name2plugin[name]
            return True
        return False

    def add_hookspecs(self, module_or_class: _Namespace) -> None:
        """Add new hook specifications defined in the given ``module_or_class``.

        Functions are recognized as hook specifications if they have been
        decorated with a matching :class:`HookspecMarker`.

        :raises ValueError:
            If no hook specification is found in ``module_or_class``.
        """
        names = []
        for name in dir(module_or_class):
            spec_opts = self.parse_hookspec_opts(module_or_class, name)
            if spec_opts is not None:
                hc: HookCaller | None = getattr(self.hook, name, None)
                if hc is None:
                    hc = HookCaller(name, self._hookexec, module_or_class, spec_opts)
                    setattr(self.hook, name, hc)
                else:
                    # Plugins registered this hook without knowing the spec.
                    hc.set_specification(module_or_class, spec_opts)
                    for hookfunction in hc.get_hookimpls():
                        self._verify_hook(hc, hookfunction)
                names.append(name)

        if not names:
            raise ValueError(
                f"did not find any {self.project_name!r} hooks in {module_or_class!r}"
            )

    def parse_hookspec_opts(
        self, module_or_class: _Namespace, name: str
    ) -> HookspecOpts | None:
        """Try to obtain a hook specification from an item with the given name
        in the given module or class which is being searched for hook specs.

        :returns:
            The parsed hookspec options for defining a hook, or None to skip the
            given item.

        This method can be overridden by ``PluginManager`` subclasses to
        customize how hook specifications are picked up. By default, returns the
        options for items decorated with :class:`HookspecMarker`.
        """
        method = getattr(module_or_class, name)
        # The options are looked up under the attribute ``<project_name>_spec``.
        opts: HookspecOpts | None = getattr(method, self.project_name + "_spec", None)
        return opts

    def get_plugins(self) -> set[Any]:
        """Return a set of all registered plugin objects."""
        # ``None`` values are blocked names, not plugins.
        return {x for x in self._name2plugin.values() if x is not None}

    def is_registered(self, plugin: _Plugin) -> bool:
        """Return whether the plugin is already registered."""
        return any(plugin == val for val in self._name2plugin.values())

    def get_canonical_name(self, plugin: _Plugin) -> str:
        """Return a canonical name for a plugin object.

        Note that a plugin may be registered under a different name
        specified by the caller of :meth:`register(plugin, name) <register>`.
        To obtain the name of a registered plugin use :meth:`get_name(plugin)
        <get_name>` instead.
        """
        # Fall back to the object's id when it has no (or an empty) __name__.
        name: str | None = getattr(plugin, "__name__", None)
        return name or str(id(plugin))

    def get_plugin(self, name: str) -> Any | None:
        """Return the plugin registered under the given name, if any."""
        return self._name2plugin.get(name)

    def has_plugin(self, name: str) -> bool:
        """Return whether a plugin with the given name is registered."""
        return self.get_plugin(name) is not None

    def get_name(self, plugin: _Plugin) -> str | None:
        """Return the name the plugin is registered under, or ``None`` if
        it isn't."""
        for name, val in self._name2plugin.items():
            if plugin == val:
                return name
        return None

    def _verify_hook(self, hook: HookCaller, hookimpl: HookImpl) -> None:
        """Validate ``hookimpl`` against the specification of ``hook``.

        Emits the warnings configured on the spec (``warn_on_impl`` and
        ``warn_on_impl_args``) at the location of the implementation.

        :raises PluginValidationError:
            If a wrapper is attached to a historic hook, if the
            implementation declares arguments the spec does not have, if a
            wrapper is not a generator function, or if both ``wrapper`` and
            ``hookwrapper`` are set.
        """
        if hook.is_historic() and (hookimpl.hookwrapper or hookimpl.wrapper):
            raise PluginValidationError(
                hookimpl.plugin,
                "Plugin %r\nhook %r\nhistoric incompatible with yield/wrapper/hookwrapper"
                % (hookimpl.plugin_name, hook.name),
            )

        assert hook.spec is not None
        if hook.spec.warn_on_impl:
            _warn_for_function(hook.spec.warn_on_impl, hookimpl.function)

        # positional arg checking
        notinspec = set(hookimpl.argnames) - set(hook.spec.argnames)
        if notinspec:
            raise PluginValidationError(
                hookimpl.plugin,
                "Plugin %r for hook %r\nhookimpl definition: %s\n"
                "Argument(s) %s are declared in the hookimpl but "
                "can not be found in the hookspec"
                % (
                    hookimpl.plugin_name,
                    hook.name,
                    _formatdef(hookimpl.function),
                    notinspec,
                ),
            )

        if hook.spec.warn_on_impl_args:
            for hookimpl_argname in hookimpl.argnames:
                argname_warning = hook.spec.warn_on_impl_args.get(hookimpl_argname)
                if argname_warning is not None:
                    _warn_for_function(argname_warning, hookimpl.function)

        if (
            hookimpl.wrapper or hookimpl.hookwrapper
        ) and not inspect.isgeneratorfunction(hookimpl.function):
            raise PluginValidationError(
                hookimpl.plugin,
                "Plugin %r for hook %r\nhookimpl definition: %s\n"
                "Declared as wrapper=True or hookwrapper=True "
                "but function is not a generator function"
                % (hookimpl.plugin_name, hook.name, _formatdef(hookimpl.function)),
            )

        if hookimpl.wrapper and hookimpl.hookwrapper:
            raise PluginValidationError(
                hookimpl.plugin,
                "Plugin %r for hook %r\nhookimpl definition: %s\n"
                "The wrapper=True and hookwrapper=True options are mutually exclusive"
                % (hookimpl.plugin_name, hook.name, _formatdef(hookimpl.function)),
            )

    def check_pending(self) -> None:
        """Verify that all hooks which have not been verified against a
        hook specification are optional, otherwise raise
        :exc:`PluginValidationError`."""
        for name in self.hook.__dict__:
            # Names starting with an underscore are skipped.
            if name[0] != "_":
                hook: HookCaller = getattr(self.hook, name)
                if not hook.has_spec():
                    for hookimpl in hook.get_hookimpls():
                        if not hookimpl.optionalhook:
                            raise PluginValidationError(
                                hookimpl.plugin,
                                "unknown hook %r in plugin %r"
                                % (name, hookimpl.plugin),
                            )

    def load_setuptools_entrypoints(self, group: str, name: str | None = None) -> int:
        """Load modules from querying the specified setuptools ``group``.

        :param group:
            Entry point group to load plugins.
        :param name:
            If given, loads only plugins with the given ``name``.

        :return:
            The number of plugins loaded by this call.
        """
        # Imported here rather than at module level (the module-level import
        # is only done for type checking).
        import importlib.metadata

        count = 0
        for dist in list(importlib.metadata.distributions()):
            for ep in dist.entry_points:
                if (
                    ep.group != group
                    or (name is not None and ep.name != name)
                    # already registered (explicit None check so that a falsy
                    # plugin object still counts as registered)
                    or self.get_plugin(ep.name) is not None
                    or self.is_blocked(ep.name)
                ):
                    continue
                plugin = ep.load()
                self.register(plugin, name=ep.name)
                self._plugin_distinfo.append((plugin, DistFacade(dist)))
                count += 1
        return count

    def list_plugin_distinfo(self) -> list[tuple[_Plugin, DistFacade]]:
        """Return a list of (plugin, distinfo) pairs for all
        setuptools-registered plugins."""
        return list(self._plugin_distinfo)

    def list_name_plugin(self) -> list[tuple[str, _Plugin]]:
        """Return a list of (name, plugin) pairs for all registered plugins."""
        return list(self._name2plugin.items())

    def get_hookcallers(self, plugin: _Plugin) -> list[HookCaller] | None:
        """Get all hook callers for the specified plugin.

        :returns:
            The hook callers, or ``None`` if ``plugin`` is not registered in
            this plugin manager.
        """
        if self.get_name(plugin) is None:
            return None
        hookcallers = []
        for hookcaller in self.hook.__dict__.values():
            for hookimpl in hookcaller.get_hookimpls():
                # One entry is appended per matching implementation, so a
                # caller may appear more than once in the result.
                if hookimpl.plugin is plugin:
                    hookcallers.append(hookcaller)
        return hookcallers

    def add_hookcall_monitoring(
        self, before: _BeforeTrace, after: _AfterTrace
    ) -> Callable[[], None]:
        """Add before/after tracing functions for all hooks.

        Returns an undo function which, when called, removes the added tracers.

        ``before(hook_name, hook_impls, kwargs)`` will be called ahead
        of all hook calls and receive the hook name, a list
        of HookImpl instances and the keyword arguments for the hook call.

        ``after(outcome, hook_name, hook_impls, kwargs)`` receives the
        same arguments as ``before`` but also a :class:`~pluggy.Result` object
        which represents the result of the overall hook call.
        """
        oldcall = self._inner_hookexec

        def traced_hookexec(
            hook_name: str,
            hook_impls: Sequence[HookImpl],
            caller_kwargs: Mapping[str, object],
            firstresult: bool,
        ) -> object | list[object]:
            before(hook_name, hook_impls, caller_kwargs)
            # Capture the call's result in a Result object so that ``after``
            # runs before the result is handed back to the caller.
            outcome = Result.from_call(
                lambda: oldcall(hook_name, hook_impls, caller_kwargs, firstresult)
            )
            after(outcome, hook_name, hook_impls, caller_kwargs)
            return outcome.get_result()

        self._inner_hookexec = traced_hookexec

        def undo() -> None:
            # Restores the executor that was active when monitoring was added.
            self._inner_hookexec = oldcall

        return undo

    def enable_tracing(self) -> Callable[[], None]:
        """Enable tracing of hook calls.

        Returns an undo function which, when called, removes the added tracing.
        """
        hooktrace = self.trace.root.get("hook")

        def before(
            hook_name: str, methods: Sequence[HookImpl], kwargs: Mapping[str, object]
        ) -> None:
            hooktrace.root.indent += 1
            hooktrace(hook_name, kwargs)

        def after(
            outcome: Result[object],
            hook_name: str,
            methods: Sequence[HookImpl],
            kwargs: Mapping[str, object],
        ) -> None:
            # The result is only traced when the hook call did not raise;
            # the indent is restored in either case.
            if outcome.exception is None:
                hooktrace("finish", hook_name, "-->", outcome.get_result())
            hooktrace.root.indent -= 1

        return self.add_hookcall_monitoring(before, after)

    def subset_hook_caller(
        self, name: str, remove_plugins: Iterable[_Plugin]
    ) -> HookCaller:
        """Return a proxy :class:`~pluggy.HookCaller` instance for the named
        method which manages calls to all registered plugins except the ones
        from remove_plugins."""
        orig: HookCaller = getattr(self.hook, name)
        # Only plugins that have an attribute called ``name`` are considered;
        # if none remain, the original caller is returned unchanged.
        plugins_to_remove = {plug for plug in remove_plugins if hasattr(plug, name)}
        if plugins_to_remove:
            return _SubsetHookCaller(orig, plugins_to_remove)
        return orig

525 

526 

527def _formatdef(func: Callable[..., object]) -> str: 

528 return f"{func.__name__}{inspect.signature(func)}"