Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/coroutines.py: 26%
159 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
# Public names exported by a star-import of this module.
__all__ = ('coroutine', 'iscoroutinefunction', 'iscoroutine')
3import collections.abc
4import functools
5import inspect
6import os
7import sys
8import traceback
9import types
10import warnings
12from . import base_futures
13from . import constants
14from . import format_helpers
15from .log import logger
def _is_debug_mode():
    """Return True when asyncio debug mode is requested for this process.

    Debug mode is on when the interpreter's ``sys.flags.dev_mode`` flag is
    set, or when the ``PYTHONASYNCIODEBUG`` environment variable holds a
    non-empty value and ``sys.flags.ignore_environment`` is not set.
    """
    # If you set _DEBUG to true, @coroutine will wrap the resulting
    # generator objects in a CoroWrapper instance (defined below).  That
    # instance will log a message when the generator is never iterated
    # over, which may happen when you forget to use "await" or "yield from"
    # with a coroutine call.
    # Note that the value of the _DEBUG flag is taken
    # when the decorator is used, so to be of any use it must be set
    # before you define your coroutines.  A downside of using this feature
    # is that tracebacks show entries for the CoroWrapper.__next__ method
    # when _DEBUG is true.
    return sys.flags.dev_mode or (not sys.flags.ignore_environment and
                                  bool(os.environ.get('PYTHONASYNCIODEBUG')))
# Debug flag, computed once when this module is imported.
_DEBUG = _is_debug_mode()
class CoroWrapper:
    """Wrapper for coroutine object in _DEBUG mode.

    Delegates the generator protocol (``send``/``throw``/``close`` and the
    ``gi_*`` attributes) to the wrapped object and, on finalization, logs an
    error if the wrapped object was never started.
    """

    def __init__(self, gen, func=None):
        """Wrap *gen*, remembering *func* and the creation stack.

        gen:  a generator or coroutine object (checked with an assert).
        func: the function the object came from, or None.
        """
        assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
        self.gen = gen
        self.func = func  # Used to unwrap @coroutine decorator
        # Stack captured starting from the caller's frame; used by
        # __repr__ and by the "never yielded from" report in __del__.
        self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
        self.__name__ = getattr(gen, '__name__', None)
        self.__qualname__ = getattr(gen, '__qualname__', None)

    def __repr__(self):
        coro_repr = _format_coroutine(self)
        if self._source_traceback:
            # Last captured entry: (filename, lineno, ...).
            frame = self._source_traceback[-1]
            coro_repr += f', created at {frame[0]}:{frame[1]}'

        return f'<{self.__class__.__name__} {coro_repr}>'

    def __iter__(self):
        return self

    def __next__(self):
        return self.gen.send(None)

    def send(self, value):
        """Forward *value* to the wrapped object's ``send``."""
        return self.gen.send(value)

    def throw(self, type, value=None, traceback=None):
        """Forward the exception triple to the wrapped object's ``throw``."""
        return self.gen.throw(type, value, traceback)

    def close(self):
        """Close the wrapped object."""
        return self.gen.close()

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    def __await__(self):
        return self

    @property
    def gi_yieldfrom(self):
        return self.gen.gi_yieldfrom

    def __del__(self):
        # Be careful accessing self.gen.frame -- self.gen might not exist.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1 means no instruction of the frame has executed yet,
        # i.e. the wrapped object was created but never started.
        if frame is not None and frame.f_lasti == -1:
            msg = f'{self!r} was never yielded from'
            tb = getattr(self, '_source_traceback', ())
            if tb:
                tb = ''.join(traceback.format_list(tb))
                msg += (f'\nCoroutine object created at '
                        f'(most recent call last, truncated to '
                        f'{constants.DEBUG_STACK_DEPTH} last lines):\n')
                msg += tb.rstrip()
            logger.error(msg)
def coroutine(func):
    """Decorator to mark coroutines.

    If the coroutine is not yielded from before it is destroyed,
    an error message is logged.

    Emits a DeprecationWarning on every use.  A function for which
    ``inspect.iscoroutinefunction`` is true is returned unchanged.  A
    generator function is passed to ``types.coroutine`` as is; any other
    callable is first wrapped in a generator that delegates to its result
    when that result is a future, a generator, a CoroWrapper or an
    awaitable.  When the module-level ``_DEBUG`` flag is set, calls return
    CoroWrapper instances.
    """
    warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead',
                  DeprecationWarning,
                  stacklevel=2)
    if inspect.iscoroutinefunction(func):
        # In Python 3.5 that's all we need to do for coroutines
        # defined with "async def".
        return func

    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            if (base_futures.isfuture(res) or inspect.isgenerator(res) or
                    isinstance(res, CoroWrapper)):
                res = yield from res
            else:
                # If 'res' is an awaitable, run it.
                try:
                    await_meth = res.__await__
                except AttributeError:
                    pass
                else:
                    if isinstance(res, collections.abc.Awaitable):
                        res = yield from await_meth()
            return res

    coro = types.coroutine(coro)
    if not _DEBUG:
        wrapper = coro
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func=func)
            if w._source_traceback:
                # Drop the innermost captured entry (this wrapper's frame).
                del w._source_traceback[-1]
            # Python < 3.5 does not implement __qualname__
            # on generator objects, so we set it manually.
            # We use getattr as some callables (such as
            # functools.partial may lack __qualname__).
            w.__name__ = getattr(func, '__name__', None)
            w.__qualname__ = getattr(func, '__qualname__', None)
            return w

    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper
# A marker for iscoroutinefunction.
_is_coroutine = object()


def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function.

    True when ``inspect.iscoroutinefunction(func)`` is true, or when
    ``func._is_coroutine`` is the module's private marker object.
    """
    return (inspect.iscoroutinefunction(func) or
            getattr(func, '_is_coroutine', None) is _is_coroutine)
# Prioritize native coroutine check to speed-up
# asyncio.iscoroutine.
_COROUTINE_TYPES = (types.CoroutineType, types.GeneratorType,
                    collections.abc.Coroutine, CoroWrapper)
# Concrete types already seen to pass the isinstance() check in iscoroutine().
_iscoroutine_typecache = set()
def iscoroutine(obj):
    """Return True if obj is a coroutine object.

    The exact type of *obj* is looked up in a small module-level cache
    first; on a miss, the isinstance() check against _COROUTINE_TYPES is
    used and a positive result is remembered for that type.
    """
    if type(obj) in _iscoroutine_typecache:
        return True

    if isinstance(obj, _COROUTINE_TYPES):
        # Just in case we don't want to cache more than 100
        # positive types.  That shouldn't ever happen, unless
        # someone stressing the system on purpose.
        if len(_iscoroutine_typecache) < 100:
            _iscoroutine_typecache.add(type(obj))
        return True

    return False
def _format_coroutine(coro):
    """Return a short description of *coro* for use in reprs.

    The text is the coroutine's name followed by its state ("running" or
    "done") and, when a code object is available, a ``filename:lineno``
    location.  *coro* must satisfy iscoroutine() (checked with an assert).
    """
    assert iscoroutine(coro)

    is_corowrapper = isinstance(coro, CoroWrapper)

    def get_name(coro):
        # Coroutines compiled with Cython sometimes don't have
        # proper __qualname__ or __name__.  While that is a bug
        # in Cython, asyncio shouldn't crash with an AttributeError
        # in its __repr__ functions.
        if is_corowrapper:
            return format_helpers._format_callback(coro.func, (), {})

        if hasattr(coro, '__qualname__') and coro.__qualname__:
            coro_name = coro.__qualname__
        elif hasattr(coro, '__name__') and coro.__name__:
            coro_name = coro.__name__
        else:
            # Stop masking Cython bugs, expose them in a friendly way.
            coro_name = f'<{type(coro).__name__} without __name__>'
        return f'{coro_name}()'

    def is_running(coro):
        # Try the native-coroutine attribute first, then the generator one.
        try:
            return coro.cr_running
        except AttributeError:
            try:
                return coro.gi_running
            except AttributeError:
                return False

    coro_code = None
    if hasattr(coro, 'cr_code') and coro.cr_code:
        coro_code = coro.cr_code
    elif hasattr(coro, 'gi_code') and coro.gi_code:
        coro_code = coro.gi_code

    coro_name = get_name(coro)

    if not coro_code:
        # Built-in types might not have __qualname__ or __name__.
        if is_running(coro):
            return f'{coro_name} running'
        else:
            return coro_name

    coro_frame = None
    if hasattr(coro, 'gi_frame') and coro.gi_frame:
        coro_frame = coro.gi_frame
    elif hasattr(coro, 'cr_frame') and coro.cr_frame:
        coro_frame = coro.cr_frame

    # If Cython's coroutine has a fake code object without proper
    # co_filename -- expose that.
    filename = coro_code.co_filename or '<empty co_filename>'

    lineno = 0
    if (is_corowrapper and
            coro.func is not None and
            not inspect.isgeneratorfunction(coro.func)):
        # Wrapped plain callable: report where that callable is defined,
        # when its source location can be determined.
        source = format_helpers._get_function_source(coro.func)
        if source is not None:
            filename, lineno = source
        if coro_frame is None:
            coro_repr = f'{coro_name} done, defined at {filename}:{lineno}'
        else:
            coro_repr = f'{coro_name} running, defined at {filename}:{lineno}'

    elif coro_frame is not None:
        lineno = coro_frame.f_lineno
        coro_repr = f'{coro_name} running at {filename}:{lineno}'

    else:
        lineno = coro_code.co_firstlineno
        coro_repr = f'{coro_name} done, defined at {filename}:{lineno}'

    return coro_repr