Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pluggy/_callers.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

109 statements  

1""" 

2Call loop machinery 

3""" 

4 

5from __future__ import annotations 

6 

7from typing import cast 

8from typing import Generator 

9from typing import Mapping 

10from typing import NoReturn 

11from typing import Sequence 

12from typing import Tuple 

13from typing import Union 

14import warnings 

15 

16from ._hooks import HookImpl 

17from ._result import HookCallError 

18from ._result import Result 

19from ._warnings import PluggyTeardownRaisedWarning 

20 

21 

# Pending teardowns of old-style (``hookwrapper``) and new-style (``wrapper``)
# hook wrappers are finished differently, so they must be told apart later.
# An old-style entry is stored as a ``(generator, hook_impl)`` tuple, a
# new-style entry as the bare generator; wrapping with a tuple is the fastest
# type-safe way found to distinguish them.
_OldStyleTeardown = Tuple[Generator[None, Result[object], None], HookImpl]
_NewStyleTeardown = Generator[None, object, object]
Teardown = Union[_OldStyleTeardown, _NewStyleTeardown]

28 

29 

30def _raise_wrapfail( 

31 wrap_controller: ( 

32 Generator[None, Result[object], None] | Generator[None, object, object] 

33 ), 

34 msg: str, 

35) -> NoReturn: 

36 co = wrap_controller.gi_code 

37 raise RuntimeError( 

38 "wrap_controller at %r %s:%d %s" 

39 % (co.co_name, co.co_filename, co.co_firstlineno, msg) 

40 ) 

41 

42 

def _warn_teardown_exception(
    hook_name: str, hook_impl: HookImpl, e: BaseException
) -> None:
    """Warn that an old-style hookwrapper raised during its teardown.

    Emits a ``PluggyTeardownRaisedWarning`` whose message names the plugin,
    the hook, and the type and text of the exception.

    :param hook_name: Name of the hook being called.
    :param hook_impl: The implementation whose teardown raised; its
        ``plugin_name`` is included in the message.
    :param e: The exception raised by the teardown.
    """
    parts = (
        "A plugin raised an exception during an old-style hookwrapper teardown.\n",
        f"Plugin: {hook_impl.plugin_name}, Hook: {hook_name}\n",
        f"{type(e).__name__}: {e}\n",
        "For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning",  # noqa: E501
    )
    warnings.warn(PluggyTeardownRaisedWarning("".join(parts)), stacklevel=5)

51 

52 

53def _multicall( 

54 hook_name: str, 

55 hook_impls: Sequence[HookImpl], 

56 caller_kwargs: Mapping[str, object], 

57 firstresult: bool, 

58) -> object | list[object]: 

59 """Execute a call into multiple python functions/methods and return the 

60 result(s). 

61 

62 ``caller_kwargs`` comes from HookCaller.__call__(). 

63 """ 

64 __tracebackhide__ = True 

65 results: list[object] = [] 

66 exception = None 

67 only_new_style_wrappers = True 

68 try: # run impl and wrapper setup functions in a loop 

69 teardowns: list[Teardown] = [] 

70 try: 

71 for hook_impl in reversed(hook_impls): 

72 try: 

73 args = [caller_kwargs[argname] for argname in hook_impl.argnames] 

74 except KeyError: 

75 for argname in hook_impl.argnames: 

76 if argname not in caller_kwargs: 

77 raise HookCallError( 

78 f"hook call must provide argument {argname!r}" 

79 ) 

80 

81 if hook_impl.hookwrapper: 

82 only_new_style_wrappers = False 

83 try: 

84 # If this cast is not valid, a type error is raised below, 

85 # which is the desired response. 

86 res = hook_impl.function(*args) 

87 wrapper_gen = cast(Generator[None, Result[object], None], res) 

88 next(wrapper_gen) # first yield 

89 teardowns.append((wrapper_gen, hook_impl)) 

90 except StopIteration: 

91 _raise_wrapfail(wrapper_gen, "did not yield") 

92 elif hook_impl.wrapper: 

93 try: 

94 # If this cast is not valid, a type error is raised below, 

95 # which is the desired response. 

96 res = hook_impl.function(*args) 

97 function_gen = cast(Generator[None, object, object], res) 

98 next(function_gen) # first yield 

99 teardowns.append(function_gen) 

100 except StopIteration: 

101 _raise_wrapfail(function_gen, "did not yield") 

102 else: 

103 res = hook_impl.function(*args) 

104 if res is not None: 

105 results.append(res) 

106 if firstresult: # halt further impl calls 

107 break 

108 except BaseException as exc: 

109 exception = exc 

110 finally: 

111 # Fast path - only new-style wrappers, no Result. 

112 if only_new_style_wrappers: 

113 if firstresult: # first result hooks return a single value 

114 result = results[0] if results else None 

115 else: 

116 result = results 

117 

118 # run all wrapper post-yield blocks 

119 for teardown in reversed(teardowns): 

120 try: 

121 if exception is not None: 

122 teardown.throw(exception) # type: ignore[union-attr] 

123 else: 

124 teardown.send(result) # type: ignore[union-attr] 

125 # Following is unreachable for a well behaved hook wrapper. 

126 # Try to force finalizers otherwise postponed till GC action. 

127 # Note: close() may raise if generator handles GeneratorExit. 

128 teardown.close() # type: ignore[union-attr] 

129 except StopIteration as si: 

130 result = si.value 

131 exception = None 

132 continue 

133 except BaseException as e: 

134 exception = e 

135 continue 

136 _raise_wrapfail(teardown, "has second yield") # type: ignore[arg-type] 

137 

138 if exception is not None: 

139 raise exception.with_traceback(exception.__traceback__) 

140 else: 

141 return result 

142 

143 # Slow path - need to support old-style wrappers. 

144 else: 

145 if firstresult: # first result hooks return a single value 

146 outcome: Result[object | list[object]] = Result( 

147 results[0] if results else None, exception 

148 ) 

149 else: 

150 outcome = Result(results, exception) 

151 

152 # run all wrapper post-yield blocks 

153 for teardown in reversed(teardowns): 

154 if isinstance(teardown, tuple): 

155 try: 

156 teardown[0].send(outcome) 

157 except StopIteration: 

158 pass 

159 except BaseException as e: 

160 _warn_teardown_exception(hook_name, teardown[1], e) 

161 raise 

162 else: 

163 _raise_wrapfail(teardown[0], "has second yield") 

164 else: 

165 try: 

166 if outcome._exception is not None: 

167 teardown.throw(outcome._exception) 

168 else: 

169 teardown.send(outcome._result) 

170 # Following is unreachable for a well behaved hook wrapper. 

171 # Try to force finalizers otherwise postponed till GC action. 

172 # Note: close() may raise if generator handles GeneratorExit. 

173 teardown.close() 

174 except StopIteration as si: 

175 outcome.force_result(si.value) 

176 continue 

177 except BaseException as e: 

178 outcome.force_exception(e) 

179 continue 

180 _raise_wrapfail(teardown, "has second yield") 

181 

182 return outcome.get_result()