Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pluggy/_callers.py: 16%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""
Call loop machinery
"""
5from __future__ import annotations
7from typing import cast
8from typing import Generator
9from typing import Mapping
10from typing import NoReturn
11from typing import Sequence
12from typing import Tuple
13from typing import Union
14import warnings
16from ._hooks import HookImpl
17from ._result import HookCallError
18from ._result import Result
19from ._warnings import PluggyTeardownRaisedWarning
# Need to distinguish between old- and new-style hook wrappers.
# Wrapping with a tuple is the fastest type-safe way I found to do it.
#
# - Old-style (``hookwrapper``) entries are stored as a
#   ``(generator, hook_impl)`` tuple; the generator is sent a ``Result``.
# - New-style (``wrapper``) entries are stored as the bare generator.
Teardown = Union[
    Tuple[Generator[None, Result[object], None], HookImpl],
    Generator[None, object, object],
]
def _raise_wrapfail(
    wrap_controller: (
        Generator[None, Result[object], None] | Generator[None, object, object]
    ),
    msg: str,
) -> NoReturn:
    """Raise a ``RuntimeError`` that identifies a misbehaving wrapper generator.

    The message names the generator's code object (function name, file name
    and first line number) followed by ``msg``.

    :param wrap_controller: The wrapper generator that broke the protocol.
    :param msg: Short description of the failure, e.g. ``"did not yield"``.
    :raises RuntimeError: Always; this function never returns.
    """
    co = wrap_controller.gi_code
    raise RuntimeError(
        f"wrap_controller at {co.co_name!r} "
        f"{co.co_filename}:{co.co_firstlineno} {msg}"
    )
def _warn_teardown_exception(
    hook_name: str, hook_impl: HookImpl, e: BaseException
) -> None:
    """Emit a ``PluggyTeardownRaisedWarning`` for a failed old-style teardown.

    The warning text names the plugin (``hook_impl.plugin_name``), the hook,
    and the exception's type and message.

    :param hook_name: Name of the hook being called.
    :param hook_impl: The hook implementation whose teardown raised.
    :param e: The exception raised during teardown.
    """
    msg = "A plugin raised an exception during an old-style hookwrapper teardown.\n"
    msg += f"Plugin: {hook_impl.plugin_name}, Hook: {hook_name}\n"
    msg += f"{type(e).__name__}: {e}\n"
    msg += "For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning"  # noqa: E501
    # NOTE(review): stacklevel=5 presumably skips the call-loop frames so the
    # warning is attributed to the hook caller -- confirm if the call chain
    # between the hook caller and this function changes.
    warnings.warn(PluggyTeardownRaisedWarning(msg), stacklevel=5)
def _multicall(
    hook_name: str,
    hook_impls: Sequence[HookImpl],
    caller_kwargs: Mapping[str, object],
    firstresult: bool,
) -> object | list[object]:
    """Execute a call into multiple python functions/methods and return the
    result(s).

    ``caller_kwargs`` comes from HookCaller.__call__().

    Implementations are invoked in reverse order of ``hook_impls``. Wrapper
    implementations (old-style ``hookwrapper`` and new-style ``wrapper``) are
    advanced to their first ``yield`` before the remaining implementations
    run, and are resumed afterwards in the reverse of their setup order.

    :param hook_name: Name of the hook; used only in the teardown warning.
    :param hook_impls: Hook implementations to call.
    :param caller_kwargs: Keyword arguments supplied by the hook caller; each
        implementation receives the values named in its ``argnames``.
    :param firstresult: If true, stop at the first non-``None`` result and
        return that single value (or ``None``) instead of a list.
    :returns: The list of non-``None`` results, or a single value when
        ``firstresult`` is true, possibly replaced by a wrapper.
    :raises HookCallError: If ``caller_kwargs`` lacks an argument that an
        implementation requires.
    :raises RuntimeError: If a wrapper does not yield, or yields twice.
    """
    __tracebackhide__ = True
    results: list[object] = []
    exception = None
    # Flipped to False as soon as one old-style hookwrapper is encountered;
    # selects between the fast and the slow teardown path below.
    only_new_style_wrappers = True
    try:  # run impl and wrapper setup functions in a loop
        teardowns: list[Teardown] = []
        try:
            for hook_impl in reversed(hook_impls):
                try:
                    args = [caller_kwargs[argname] for argname in hook_impl.argnames]
                except KeyError:
                    # Find the missing argument to report it by name.
                    for argname in hook_impl.argnames:
                        if argname not in caller_kwargs:
                            raise HookCallError(
                                f"hook call must provide argument {argname!r}"
                            )

                if hook_impl.hookwrapper:
                    only_new_style_wrappers = False
                    try:
                        # If this cast is not valid, a type error is raised below,
                        # which is the desired response.
                        res = hook_impl.function(*args)
                        wrapper_gen = cast(Generator[None, Result[object], None], res)
                        next(wrapper_gen)  # first yield
                        teardowns.append((wrapper_gen, hook_impl))
                    except StopIteration:
                        _raise_wrapfail(wrapper_gen, "did not yield")
                elif hook_impl.wrapper:
                    try:
                        # If this cast is not valid, a type error is raised below,
                        # which is the desired response.
                        res = hook_impl.function(*args)
                        function_gen = cast(Generator[None, object, object], res)
                        next(function_gen)  # first yield
                        teardowns.append(function_gen)
                    except StopIteration:
                        _raise_wrapfail(function_gen, "did not yield")
                else:
                    res = hook_impl.function(*args)
                    if res is not None:
                        results.append(res)
                        if firstresult:  # halt further impl calls
                            break
        except BaseException as exc:
            # Remember the failure so the wrappers can observe it in teardown.
            exception = exc
    finally:
        # Fast path - only new-style wrappers, no Result.
        if only_new_style_wrappers:
            if firstresult:  # first result hooks return a single value
                result = results[0] if results else None
            else:
                result = results

            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                try:
                    if exception is not None:
                        teardown.throw(exception)  # type: ignore[union-attr]
                    else:
                        teardown.send(result)  # type: ignore[union-attr]
                    # Following is unreachable for a well behaved hook wrapper.
                    # Try to force finalizers otherwise postponed till GC action.
                    # Note: close() may raise if generator handles GeneratorExit.
                    teardown.close()  # type: ignore[union-attr]
                except StopIteration as si:
                    # The wrapper returned: its return value becomes the result
                    # and any pending exception is considered handled.
                    result = si.value
                    exception = None
                    continue
                except BaseException as e:
                    # The wrapper raised: this becomes the pending exception.
                    exception = e
                    continue
                _raise_wrapfail(teardown, "has second yield")  # type: ignore[arg-type]

            if exception is not None:
                raise exception.with_traceback(exception.__traceback__)
            else:
                return result

        # Slow path - need to support old-style wrappers.
        else:
            if firstresult:  # first result hooks return a single value
                outcome: Result[object | list[object]] = Result(
                    results[0] if results else None, exception
                )
            else:
                outcome = Result(results, exception)

            # run all wrapper post-yield blocks
            for teardown in reversed(teardowns):
                if isinstance(teardown, tuple):
                    # Old-style hookwrapper: stored as (generator, hook_impl).
                    try:
                        teardown[0].send(outcome)
                    except StopIteration:
                        pass
                    except BaseException as e:
                        _warn_teardown_exception(hook_name, teardown[1], e)
                        raise
                    else:
                        _raise_wrapfail(teardown[0], "has second yield")
                else:
                    # New-style wrapper: stored as the bare generator.
                    try:
                        if outcome._exception is not None:
                            teardown.throw(outcome._exception)
                        else:
                            teardown.send(outcome._result)
                        # Following is unreachable for a well behaved hook wrapper.
                        # Try to force finalizers otherwise postponed till GC action.
                        # Note: close() may raise if generator handles GeneratorExit.
                        teardown.close()
                    except StopIteration as si:
                        outcome.force_result(si.value)
                        continue
                    except BaseException as e:
                        outcome.force_exception(e)
                        continue
                    _raise_wrapfail(teardown, "has second yield")

            return outcome.get_result()