Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/coroutines.py: 26%

159 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

# Public names exported by ``from <this module> import *``.
__all__ = ('coroutine', 'iscoroutinefunction', 'iscoroutine')

2 

3import collections.abc 

4import functools 

5import inspect 

6import os 

7import sys 

8import traceback 

9import types 

10import warnings 

11 

12from . import base_futures 

13from . import constants 

14from . import format_helpers 

15from .log import logger 

16 

17 

def _is_debug_mode():
    """Return whether asyncio debug mode is requested.

    Debug mode is on when the interpreter runs in development mode
    (``sys.flags.dev_mode``), or when the ``PYTHONASYNCIODEBUG``
    environment variable holds a non-empty value and the environment is
    not being ignored (``sys.flags.ignore_environment``).
    """
    return sys.flags.dev_mode or (not sys.flags.ignore_environment and
                                  bool(os.environ.get('PYTHONASYNCIODEBUG')))


# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "await" or "yield from"
# with a coroutine call.
# Note that the value of the _DEBUG flag is taken
# when the decorator is used, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
_DEBUG = _is_debug_mode()

34 

35 

class CoroWrapper:
    """Wrapper for a coroutine object, used in _DEBUG mode.

    Every generator-protocol operation is delegated to the wrapped object
    stored in ``self.gen``.  When the wrapper is destroyed while the
    wrapped generator has not started executing, an error is logged
    together with the stack captured at construction time.
    """

    def __init__(self, gen, func=None):
        """Wrap *gen*; *func* is the function the @coroutine decorator wrapped."""
        assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
        self.gen = gen
        self.func = func  # Used to unwrap @coroutine decorator
        # Stack of the caller, reported by __repr__ and __del__.
        self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
        self.__name__ = getattr(gen, '__name__', None)
        self.__qualname__ = getattr(gen, '__qualname__', None)

    def __repr__(self):
        coro_repr = _format_coroutine(self)
        if self._source_traceback:
            # The last captured entry is the most recent call.
            frame = self._source_traceback[-1]
            coro_repr += f', created at {frame[0]}:{frame[1]}'

        return f'<{self.__class__.__name__} {coro_repr}>'

    def __iter__(self):
        return self

    def __next__(self):
        return self.gen.send(None)

    def send(self, value):
        return self.gen.send(value)

    def throw(self, type, value=None, traceback=None):
        # Parameter names are part of the call signature, so they are
        # kept even though they shadow a builtin and a module name.
        return self.gen.throw(type, value, traceback)

    def close(self):
        return self.gen.close()

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    def __await__(self):
        return self

    @property
    def gi_yieldfrom(self):
        return self.gen.gi_yieldfrom

    def __del__(self):
        """Log an error if the wrapped generator was never started."""
        # Be careful accessing self.gen.gi_frame -- self.gen might not exist.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means no instruction of the frame has run yet.
        if frame is not None and frame.f_lasti == -1:
            msg = f'{self!r} was never yielded from'
            tb = getattr(self, '_source_traceback', ())
            if tb:
                tb = ''.join(traceback.format_list(tb))
                msg += (f'\nCoroutine object created at '
                        f'(most recent call last, truncated to '
                        f'{constants.DEBUG_STACK_DEPTH} last lines):\n')
                msg += tb.rstrip()
            logger.error(msg)

103 

104 

def coroutine(func):
    """Decorator to mark coroutines.

    If the coroutine is not yielded from before it is destroyed,
    an error message is logged.

    A DeprecationWarning is always emitted.  Functions defined with
    "async def" are returned unchanged; generator functions are passed to
    types.coroutine(); any other callable is first wrapped in a generator
    that runs whatever awaitable it returns.  In _DEBUG mode the objects
    produced by the decorated function are wrapped in CoroWrapper.
    """
    warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead',
                  DeprecationWarning,
                  stacklevel=2)
    if inspect.iscoroutinefunction(func):
        # In Python 3.5 that's all we need to do for coroutines
        # defined with "async def".
        return func

    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            if (base_futures.isfuture(res) or inspect.isgenerator(res) or
                    isinstance(res, CoroWrapper)):
                res = yield from res
            else:
                # If 'res' is an awaitable, run it.
                try:
                    await_meth = res.__await__
                except AttributeError:
                    pass
                else:
                    if isinstance(res, collections.abc.Awaitable):
                        res = yield from await_meth()
            return res

    coro = types.coroutine(coro)
    if not _DEBUG:
        wrapper = coro
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func=func)
            if w._source_traceback:
                # Drop the last captured entry (the one for this wrapper).
                del w._source_traceback[-1]
            # Python < 3.5 does not implement __qualname__
            # on generator objects, so we set it manually.
            # We use getattr as some callables (such as
            # functools.partial may lack __qualname__).
            w.__name__ = getattr(func, '__name__', None)
            w.__qualname__ = getattr(func, '__qualname__', None)
            return w

    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper

158 

159 

# A marker for iscoroutinefunction.
_is_coroutine = object()


def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function.

    That is the case when inspect.iscoroutinefunction() accepts it, or
    when its ``_is_coroutine`` attribute is this module's marker object.
    """
    return (inspect.iscoroutinefunction(func) or
            getattr(func, '_is_coroutine', None) is _is_coroutine)

168 

169 

# Prioritize native coroutine check to speed-up
# asyncio.iscoroutine.
_COROUTINE_TYPES = (types.CoroutineType, types.GeneratorType,
                    collections.abc.Coroutine, CoroWrapper)
# Concrete types that iscoroutine() has already accepted.
_iscoroutine_typecache = set()

175 

176 

def iscoroutine(obj):
    """Return True if obj is a coroutine object.

    Types that pass the isinstance() check against _COROUTINE_TYPES are
    remembered in _iscoroutine_typecache, so later calls with an object of
    the same type return early.
    """
    if type(obj) in _iscoroutine_typecache:
        return True

    if isinstance(obj, _COROUTINE_TYPES):
        # Just in case we don't want to cache more than 100
        # positive types.  That shouldn't ever happen, unless
        # someone stressing the system on purpose.
        if len(_iscoroutine_typecache) < 100:
            _iscoroutine_typecache.add(type(obj))
        return True

    return False

191 

192 

def _format_coroutine(coro):
    """Return a short text description of the coroutine object *coro*.

    The result consists of the coroutine's name, whether it is running or
    done, and, when a code object is available, a file name and line
    number.  *coro* must satisfy iscoroutine().
    """
    assert iscoroutine(coro)

    is_corowrapper = isinstance(coro, CoroWrapper)

    def get_name(coro):
        # Coroutines compiled with Cython sometimes don't have
        # proper __qualname__ or __name__.  While that is a bug
        # in Cython, asyncio shouldn't crash with an AttributeError
        # in its __repr__ functions.
        if is_corowrapper:
            return format_helpers._format_callback(coro.func, (), {})

        if hasattr(coro, '__qualname__') and coro.__qualname__:
            coro_name = coro.__qualname__
        elif hasattr(coro, '__name__') and coro.__name__:
            coro_name = coro.__name__
        else:
            # Stop masking Cython bugs, expose them in a friendly way.
            coro_name = f'<{type(coro).__name__} without __name__>'
        return f'{coro_name}()'

    def is_running(coro):
        # Try the native-coroutine attribute first, then the generator
        # one; an object with neither is reported as not running.
        try:
            return coro.cr_running
        except AttributeError:
            try:
                return coro.gi_running
            except AttributeError:
                return False

    coro_code = None
    if hasattr(coro, 'cr_code') and coro.cr_code:
        coro_code = coro.cr_code
    elif hasattr(coro, 'gi_code') and coro.gi_code:
        coro_code = coro.gi_code

    coro_name = get_name(coro)

    if not coro_code:
        # Built-in types might not have __qualname__ or __name__.
        if is_running(coro):
            return f'{coro_name} running'
        else:
            return coro_name

    coro_frame = None
    if hasattr(coro, 'gi_frame') and coro.gi_frame:
        coro_frame = coro.gi_frame
    elif hasattr(coro, 'cr_frame') and coro.cr_frame:
        coro_frame = coro.cr_frame

    # If Cython's coroutine has a fake code object without proper
    # co_filename -- expose that.
    filename = coro_code.co_filename or '<empty co_filename>'

    lineno = 0
    if (is_corowrapper and
            coro.func is not None and
            not inspect.isgeneratorfunction(coro.func)):
        # Report where the wrapped function is defined when that is
        # known; otherwise keep the code object's file name and line 0.
        source = format_helpers._get_function_source(coro.func)
        if source is not None:
            filename, lineno = source
        if coro_frame is None:
            coro_repr = f'{coro_name} done, defined at {filename}:{lineno}'
        else:
            coro_repr = f'{coro_name} running, defined at {filename}:{lineno}'

    elif coro_frame is not None:
        lineno = coro_frame.f_lineno
        coro_repr = f'{coro_name} running at {filename}:{lineno}'

    else:
        lineno = coro_code.co_firstlineno
        coro_repr = f'{coro_name} done, defined at {filename}:{lineno}'

    return coro_repr