Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pluggy/_manager.py: 23%

190 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1import inspect 

2import sys 

3import warnings 

4 

5from . import _tracing 

6from ._callers import _Result, _multicall 

7from ._hooks import HookImpl, _HookRelay, _HookCaller, normalize_hookimpl_opts 

8 

9if sys.version_info >= (3, 8): 

10 from importlib import metadata as importlib_metadata 

11else: 

12 import importlib_metadata 

13 

14 

15def _warn_for_function(warning, function): 

16 warnings.warn_explicit( 

17 warning, 

18 type(warning), 

19 lineno=function.__code__.co_firstlineno, 

20 filename=function.__code__.co_filename, 

21 ) 

22 

23 

24class PluginValidationError(Exception): 

25 """plugin failed validation. 

26 

27 :param object plugin: the plugin which failed validation, 

28 may be a module or an arbitrary object. 

29 """ 

30 

31 def __init__(self, plugin, message): 

32 self.plugin = plugin 

33 super(Exception, self).__init__(message) 

34 

35 

36class DistFacade: 

37 """Emulate a pkg_resources Distribution""" 

38 

39 def __init__(self, dist): 

40 self._dist = dist 

41 

42 @property 

43 def project_name(self): 

44 return self.metadata["name"] 

45 

46 def __getattr__(self, attr, default=None): 

47 return getattr(self._dist, attr, default) 

48 

49 def __dir__(self): 

50 return sorted(dir(self._dist) + ["_dist", "project_name"]) 

51 

52 

53class PluginManager: 

54 """Core :py:class:`.PluginManager` class which manages registration 

55 of plugin objects and 1:N hook calling. 

56 

57 You can register new hooks by calling :py:meth:`add_hookspecs(module_or_class) 

58 <.PluginManager.add_hookspecs>`. 

59 You can register plugin objects (which contain hooks) by calling 

60 :py:meth:`register(plugin) <.PluginManager.register>`. The :py:class:`.PluginManager` 

61 is initialized with a prefix that is searched for in the names of the dict 

62 of registered plugin objects. 

63 

64 For debugging purposes you can call :py:meth:`.PluginManager.enable_tracing` 

65 which will subsequently send debug information to the trace helper. 

66 """ 

67 

68 def __init__(self, project_name): 

69 self.project_name = project_name 

70 self._name2plugin = {} 

71 self._plugin2hookcallers = {} 

72 self._plugin_distinfo = [] 

73 self.trace = _tracing.TagTracer().get("pluginmanage") 

74 self.hook = _HookRelay() 

75 self._inner_hookexec = _multicall 

76 

77 def _hookexec(self, hook_name, methods, kwargs, firstresult): 

78 # called from all hookcaller instances. 

79 # enable_tracing will set its own wrapping function at self._inner_hookexec 

80 return self._inner_hookexec(hook_name, methods, kwargs, firstresult) 

81 

82 def register(self, plugin, name=None): 

83 """Register a plugin and return its canonical name or ``None`` if the name 

84 is blocked from registering. Raise a :py:class:`ValueError` if the plugin 

85 is already registered.""" 

86 plugin_name = name or self.get_canonical_name(plugin) 

87 

88 if plugin_name in self._name2plugin or plugin in self._plugin2hookcallers: 

89 if self._name2plugin.get(plugin_name, -1) is None: 

90 return # blocked plugin, return None to indicate no registration 

91 raise ValueError( 

92 "Plugin already registered: %s=%s\n%s" 

93 % (plugin_name, plugin, self._name2plugin) 

94 ) 

95 

96 # XXX if an error happens we should make sure no state has been 

97 # changed at point of return 

98 self._name2plugin[plugin_name] = plugin 

99 

100 # register matching hook implementations of the plugin 

101 self._plugin2hookcallers[plugin] = hookcallers = [] 

102 for name in dir(plugin): 

103 hookimpl_opts = self.parse_hookimpl_opts(plugin, name) 

104 if hookimpl_opts is not None: 

105 normalize_hookimpl_opts(hookimpl_opts) 

106 method = getattr(plugin, name) 

107 hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts) 

108 name = hookimpl_opts.get("specname") or name 

109 hook = getattr(self.hook, name, None) 

110 if hook is None: 

111 hook = _HookCaller(name, self._hookexec) 

112 setattr(self.hook, name, hook) 

113 elif hook.has_spec(): 

114 self._verify_hook(hook, hookimpl) 

115 hook._maybe_apply_history(hookimpl) 

116 hook._add_hookimpl(hookimpl) 

117 hookcallers.append(hook) 

118 return plugin_name 

119 

120 def parse_hookimpl_opts(self, plugin, name): 

121 method = getattr(plugin, name) 

122 if not inspect.isroutine(method): 

123 return 

124 try: 

125 res = getattr(method, self.project_name + "_impl", None) 

126 except Exception: 

127 res = {} 

128 if res is not None and not isinstance(res, dict): 

129 # false positive 

130 res = None 

131 return res 

132 

133 def unregister(self, plugin=None, name=None): 

134 """unregister a plugin object and all its contained hook implementations 

135 from internal data structures.""" 

136 if name is None: 

137 assert plugin is not None, "one of name or plugin needs to be specified" 

138 name = self.get_name(plugin) 

139 

140 if plugin is None: 

141 plugin = self.get_plugin(name) 

142 

143 # if self._name2plugin[name] == None registration was blocked: ignore 

144 if self._name2plugin.get(name): 

145 del self._name2plugin[name] 

146 

147 for hookcaller in self._plugin2hookcallers.pop(plugin, []): 

148 hookcaller._remove_plugin(plugin) 

149 

150 return plugin 

151 

152 def set_blocked(self, name): 

153 """block registrations of the given name, unregister if already registered.""" 

154 self.unregister(name=name) 

155 self._name2plugin[name] = None 

156 

157 def is_blocked(self, name): 

158 """return ``True`` if the given plugin name is blocked.""" 

159 return name in self._name2plugin and self._name2plugin[name] is None 

160 

161 def add_hookspecs(self, module_or_class): 

162 """add new hook specifications defined in the given ``module_or_class``. 

163 Functions are recognized if they have been decorated accordingly.""" 

164 names = [] 

165 for name in dir(module_or_class): 

166 spec_opts = self.parse_hookspec_opts(module_or_class, name) 

167 if spec_opts is not None: 

168 hc = getattr(self.hook, name, None) 

169 if hc is None: 

170 hc = _HookCaller(name, self._hookexec, module_or_class, spec_opts) 

171 setattr(self.hook, name, hc) 

172 else: 

173 # plugins registered this hook without knowing the spec 

174 hc.set_specification(module_or_class, spec_opts) 

175 for hookfunction in hc.get_hookimpls(): 

176 self._verify_hook(hc, hookfunction) 

177 names.append(name) 

178 

179 if not names: 

180 raise ValueError( 

181 f"did not find any {self.project_name!r} hooks in {module_or_class!r}" 

182 ) 

183 

184 def parse_hookspec_opts(self, module_or_class, name): 

185 method = getattr(module_or_class, name) 

186 return getattr(method, self.project_name + "_spec", None) 

187 

188 def get_plugins(self): 

189 """return the set of registered plugins.""" 

190 return set(self._plugin2hookcallers) 

191 

192 def is_registered(self, plugin): 

193 """Return ``True`` if the plugin is already registered.""" 

194 return plugin in self._plugin2hookcallers 

195 

196 def get_canonical_name(self, plugin): 

197 """Return canonical name for a plugin object. Note that a plugin 

198 may be registered under a different name which was specified 

199 by the caller of :py:meth:`register(plugin, name) <.PluginManager.register>`. 

200 To obtain the name of an registered plugin use :py:meth:`get_name(plugin) 

201 <.PluginManager.get_name>` instead.""" 

202 return getattr(plugin, "__name__", None) or str(id(plugin)) 

203 

204 def get_plugin(self, name): 

205 """Return a plugin or ``None`` for the given name.""" 

206 return self._name2plugin.get(name) 

207 

208 def has_plugin(self, name): 

209 """Return ``True`` if a plugin with the given name is registered.""" 

210 return self.get_plugin(name) is not None 

211 

212 def get_name(self, plugin): 

213 """Return name for registered plugin or ``None`` if not registered.""" 

214 for name, val in self._name2plugin.items(): 

215 if plugin == val: 

216 return name 

217 

218 def _verify_hook(self, hook, hookimpl): 

219 if hook.is_historic() and hookimpl.hookwrapper: 

220 raise PluginValidationError( 

221 hookimpl.plugin, 

222 "Plugin %r\nhook %r\nhistoric incompatible to hookwrapper" 

223 % (hookimpl.plugin_name, hook.name), 

224 ) 

225 

226 if hook.spec.warn_on_impl: 

227 _warn_for_function(hook.spec.warn_on_impl, hookimpl.function) 

228 

229 # positional arg checking 

230 notinspec = set(hookimpl.argnames) - set(hook.spec.argnames) 

231 if notinspec: 

232 raise PluginValidationError( 

233 hookimpl.plugin, 

234 "Plugin %r for hook %r\nhookimpl definition: %s\n" 

235 "Argument(s) %s are declared in the hookimpl but " 

236 "can not be found in the hookspec" 

237 % ( 

238 hookimpl.plugin_name, 

239 hook.name, 

240 _formatdef(hookimpl.function), 

241 notinspec, 

242 ), 

243 ) 

244 

245 if hookimpl.hookwrapper and not inspect.isgeneratorfunction(hookimpl.function): 

246 raise PluginValidationError( 

247 hookimpl.plugin, 

248 "Plugin %r for hook %r\nhookimpl definition: %s\n" 

249 "Declared as hookwrapper=True but function is not a generator function" 

250 % (hookimpl.plugin_name, hook.name, _formatdef(hookimpl.function)), 

251 ) 

252 

253 def check_pending(self): 

254 """Verify that all hooks which have not been verified against 

255 a hook specification are optional, otherwise raise :py:class:`.PluginValidationError`.""" 

256 for name in self.hook.__dict__: 

257 if name[0] != "_": 

258 hook = getattr(self.hook, name) 

259 if not hook.has_spec(): 

260 for hookimpl in hook.get_hookimpls(): 

261 if not hookimpl.optionalhook: 

262 raise PluginValidationError( 

263 hookimpl.plugin, 

264 "unknown hook %r in plugin %r" 

265 % (name, hookimpl.plugin), 

266 ) 

267 

268 def load_setuptools_entrypoints(self, group, name=None): 

269 """Load modules from querying the specified setuptools ``group``. 

270 

271 :param str group: entry point group to load plugins 

272 :param str name: if given, loads only plugins with the given ``name``. 

273 :rtype: int 

274 :return: return the number of loaded plugins by this call. 

275 """ 

276 count = 0 

277 for dist in list(importlib_metadata.distributions()): 

278 for ep in dist.entry_points: 

279 if ( 

280 ep.group != group 

281 or (name is not None and ep.name != name) 

282 # already registered 

283 or self.get_plugin(ep.name) 

284 or self.is_blocked(ep.name) 

285 ): 

286 continue 

287 plugin = ep.load() 

288 self.register(plugin, name=ep.name) 

289 self._plugin_distinfo.append((plugin, DistFacade(dist))) 

290 count += 1 

291 return count 

292 

293 def list_plugin_distinfo(self): 

294 """return list of distinfo/plugin tuples for all setuptools registered 

295 plugins.""" 

296 return list(self._plugin_distinfo) 

297 

298 def list_name_plugin(self): 

299 """return list of name/plugin pairs.""" 

300 return list(self._name2plugin.items()) 

301 

302 def get_hookcallers(self, plugin): 

303 """get all hook callers for the specified plugin.""" 

304 return self._plugin2hookcallers.get(plugin) 

305 

306 def add_hookcall_monitoring(self, before, after): 

307 """add before/after tracing functions for all hooks 

308 and return an undo function which, when called, 

309 will remove the added tracers. 

310 

311 ``before(hook_name, hook_impls, kwargs)`` will be called ahead 

312 of all hook calls and receive a hookcaller instance, a list 

313 of HookImpl instances and the keyword arguments for the hook call. 

314 

315 ``after(outcome, hook_name, hook_impls, kwargs)`` receives the 

316 same arguments as ``before`` but also a :py:class:`pluggy._callers._Result` object 

317 which represents the result of the overall hook call. 

318 """ 

319 oldcall = self._inner_hookexec 

320 

321 def traced_hookexec(hook_name, hook_impls, kwargs, firstresult): 

322 before(hook_name, hook_impls, kwargs) 

323 outcome = _Result.from_call( 

324 lambda: oldcall(hook_name, hook_impls, kwargs, firstresult) 

325 ) 

326 after(outcome, hook_name, hook_impls, kwargs) 

327 return outcome.get_result() 

328 

329 self._inner_hookexec = traced_hookexec 

330 

331 def undo(): 

332 self._inner_hookexec = oldcall 

333 

334 return undo 

335 

336 def enable_tracing(self): 

337 """enable tracing of hook calls and return an undo function.""" 

338 hooktrace = self.trace.root.get("hook") 

339 

340 def before(hook_name, methods, kwargs): 

341 hooktrace.root.indent += 1 

342 hooktrace(hook_name, kwargs) 

343 

344 def after(outcome, hook_name, methods, kwargs): 

345 if outcome.excinfo is None: 

346 hooktrace("finish", hook_name, "-->", outcome.get_result()) 

347 hooktrace.root.indent -= 1 

348 

349 return self.add_hookcall_monitoring(before, after) 

350 

351 def subset_hook_caller(self, name, remove_plugins): 

352 """Return a new :py:class:`._hooks._HookCaller` instance for the named method 

353 which manages calls to all registered plugins except the 

354 ones from remove_plugins.""" 

355 orig = getattr(self.hook, name) 

356 plugins_to_remove = [plug for plug in remove_plugins if hasattr(plug, name)] 

357 if plugins_to_remove: 

358 hc = _HookCaller( 

359 orig.name, orig._hookexec, orig.spec.namespace, orig.spec.opts 

360 ) 

361 for hookimpl in orig.get_hookimpls(): 

362 plugin = hookimpl.plugin 

363 if plugin not in plugins_to_remove: 

364 hc._add_hookimpl(hookimpl) 

365 # we also keep track of this hook caller so it 

366 # gets properly removed on plugin unregistration 

367 self._plugin2hookcallers.setdefault(plugin, []).append(hc) 

368 return hc 

369 return orig 

370 

371 

372def _formatdef(func): 

373 return f"{func.__name__}{inspect.signature(func)}"