Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/asgiref/sync.py: 2%
241 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1import asyncio
2import asyncio.coroutines
3import contextvars
4import functools
5import inspect
6import os
7import sys
8import threading
9import warnings
10import weakref
11from concurrent.futures import Future, ThreadPoolExecutor
12from typing import Any, Callable, Dict, Optional, overload
14from .current_thread_executor import CurrentThreadExecutor
15from .local import Local
def _restore_context(context):
    """Copy the values held in ``context`` into the currently active context.

    ``context`` is iterated as a mapping of context variables (as a
    ``contextvars.Context`` is). For every variable in it, the current
    context is updated whenever the variable is unset there or holds a value
    that compares unequal to the one stored in ``context``. This lets changes
    made inside a copied context become visible to downstream consumers of
    the current one.
    """
    for var in context:
        wanted = context.get(var)
        try:
            out_of_date = var.get() != wanted
        except LookupError:
            # The variable has no value at all in the current context.
            out_of_date = True
        if out_of_date:
            var.set(wanted)
# Python 3.12 deprecates asyncio.iscoroutinefunction() as an alias for
# inspect.iscoroutinefunction(), whilst also removing the _is_coroutine marker.
# The latter is replaced with the inspect.markcoroutinefunction decorator.
# Until 3.12 is the minimum supported Python version, provide a shim.
# Django 4.0 only supports Python 3.8+, so the _or_partial backport is not a
# concern here.

# Type hint: this should be generic - whatever type T it takes, it returns
# (the very same object).
def markcoroutinefunction(func: Any) -> Any:
    """Flag ``func`` as a coroutine function and return it.

    Delegates to ``inspect.markcoroutinefunction`` when the running Python
    provides it. Otherwise falls back to setting the ``_is_coroutine``
    marker attribute taken from ``asyncio.coroutines``.
    """
    native_marker = getattr(inspect, "markcoroutinefunction", None)
    if native_marker is not None:
        return native_marker(func)
    # Older interpreters: use the private asyncio marker attribute instead.
    func._is_coroutine = asyncio.coroutines._is_coroutine  # type: ignore
    return func
def iscoroutinefunction(func: Any) -> bool:
    """Return whether ``func`` is recognised as a coroutine function.

    Uses ``inspect.iscoroutinefunction`` on interpreters that provide
    ``inspect.markcoroutinefunction``, and ``asyncio.iscoroutinefunction``
    on the others.
    """
    checker = (
        inspect.iscoroutinefunction
        if hasattr(inspect, "markcoroutinefunction")
        else asyncio.iscoroutinefunction
    )
    return checker(func)
def _iscoroutinefunction_or_partial(func: Any) -> bool:
    """Coroutine-function check that also sees through methods and partials.

    On Python 3.8 and later the check is handed straight to
    ``iscoroutinefunction``. On older interpreters bound methods and
    ``functools.partial`` wrappers are peeled off by hand first (logic taken
    from CPython), because those versions do not do it themselves.
    """
    if sys.version_info < (3, 8):
        # Unwrap bound methods, then partials, to reach the real function.
        while inspect.ismethod(func):
            func = func.__func__
        while isinstance(func, functools.partial):
            func = func.func
    return iscoroutinefunction(func)
class ThreadSensitiveContext:
    """Async context manager scoping the executor used in thread sensitive mode.

    By default, a single thread pool executor is shared within a process for
    thread sensitive calls. Entering this context manager (Python 3.7+)
    makes thread sensitive calls inside it use a thread pool that belongs to
    the context instead.

    The manager is re-entrant: only the outer-most ThreadSensitiveContext
    registers itself; nested ones leave the active context untouched.

    Usage:

    >>> import time
    >>> async with ThreadSensitiveContext():
    ...     await sync_to_async(time.sleep, 1)()
    """

    def __init__(self):
        # Token returned by ContextVar.set(); stays None unless this instance
        # is the one that registered itself as the active context.
        self.token = None

    async def __aenter__(self):
        active_context = SyncToAsync.thread_sensitive_context
        try:
            active_context.get()
        except LookupError:
            # Nothing registered yet, so this instance is the outer-most one.
            self.token = active_context.set(self)

        return self

    async def __aexit__(self, exc, value, tb):
        if self.token:
            # Shut down the executor created for this context (if any) and
            # restore the context variable to its state before __aenter__.
            pool = SyncToAsync.context_to_thread_executor.pop(self, None)
            if pool:
                pool.shutdown()
            SyncToAsync.thread_sensitive_context.reset(self.token)
class AsyncToSync:
    """
    Wrap an awaitable-returning callable so that it can be invoked from
    synchronous code running in a thread without its own event loop.

    Calling the wrapper blocks the calling thread until the wrapped coroutine
    has finished. The coroutine is scheduled on the event loop recorded at
    construction time if that loop is running; otherwise a fresh event loop
    is started in a one-off worker thread.

    While it is blocked, the calling thread services a CurrentThreadExecutor
    so that thread_sensitive code called further down the call stack through
    SyncToAsync can be run on it.
    """

    # Maps launched Tasks to the threads that launched them (for locals impl)
    launch_map: "Dict[asyncio.Task[object], threading.Thread]" = {}

    # Which CurrentThreadExecutor is in use. Stored in an asgiref Local rather
    # than a threadlocal so that tasks can work out what their parent used.
    executors = Local()

    # Fallback lookup keyed by event loop, for when the executor cannot be
    # found from the context (for example inside create_task).
    loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}

    def __init__(self, awaitable, force_new_loop=False):
        """
        Remember ``awaitable`` and work out which event loop to target.

        A warning (not an error) is issued when ``awaitable`` does not look
        like a coroutine function, because that detection has many false
        negatives. With ``force_new_loop`` set, no existing loop is recorded
        and every call runs in a new loop.
        """
        looks_async = callable(awaitable) and (
            _iscoroutinefunction_or_partial(awaitable)
            or _iscoroutinefunction_or_partial(
                getattr(awaitable, "__call__", awaitable)
            )
        )
        if not looks_async:
            warnings.warn(
                "async_to_sync was passed a non-async-marked callable", stacklevel=2
            )
        self.awaitable = awaitable
        try:
            self.__self__ = self.awaitable.__self__
        except AttributeError:
            pass

        self.main_event_loop = None
        if not force_new_loop:
            try:
                self.main_event_loop = asyncio.get_running_loop()
            except RuntimeError:
                # No loop in this thread; we may be inside SyncToAsync, which
                # records its loop and process id in a threadlocal.
                recorded_pid = getattr(
                    SyncToAsync.threadlocal, "main_event_loop_pid", None
                )
                # Only trust the recorded loop if it belongs to this process -
                # after a fork it is not valid any more (#194).
                if recorded_pid and recorded_pid == os.getpid():
                    self.main_event_loop = getattr(
                        SyncToAsync.threadlocal, "main_event_loop", None
                    )

    def __call__(self, *args, **kwargs):
        """
        Run the wrapped coroutine to completion and return its result.

        Raises RuntimeError when invoked from a thread that has a running
        event loop. An exception raised by the coroutine is stored on the
        result future and re-raised to the caller.
        """
        # Refuse to block a thread that is itself driving an event loop.
        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            running_loop = None
        if running_loop is not None and running_loop.is_running():
            raise RuntimeError(
                "You cannot use AsyncToSync in the same thread as an async event loop - "
                "just await the async function directly."
            )

        # Held in a one-element list so `main_wrap` can swap in the context
        # it ended up with.
        context = [contextvars.copy_context()]

        # Future carrying the coroutine's return value or exception.
        call_result = Future()
        source_thread = threading.current_thread()

        # Every sync frame gets its own CurrentThreadExecutor to idle in, even
        # if one is already active higher up in the same thread.
        previous_executor = getattr(self.executors, "current", None)
        current_executor = CurrentThreadExecutor()
        self.executors.current = current_executor
        loop = None
        try:
            awaitable = self.main_wrap(
                args, kwargs, call_result, source_thread, sys.exc_info(), context
            )

            if self.main_event_loop and self.main_event_loop.is_running():
                # Schedule the coroutine on the existing loop's own thread.
                self.main_event_loop.call_soon_threadsafe(
                    self.main_event_loop.create_task, awaitable
                )
                if current_executor:
                    # Service queued work here until the result is available.
                    current_executor.run_until_future(call_result)
            else:
                # No usable loop: start a new one in a dedicated thread.
                loop = asyncio.new_event_loop()
                self.loop_thread_executors[loop] = current_executor
                loop_executor = ThreadPoolExecutor(max_workers=1)
                loop_future = loop_executor.submit(
                    self._run_event_loop, loop, awaitable
                )
                if current_executor:
                    # Service queued work here until the loop thread is done.
                    current_executor.run_until_future(loop_future)
                # Wait for the thread and let its exceptions propagate.
                loop_future.result()
        finally:
            # Undo the executor bookkeeping set up above.
            if loop is not None:
                del self.loop_thread_executors[loop]
            if hasattr(self.executors, "current"):
                del self.executors.current
            if previous_executor:
                self.executors.current = previous_executor
            _restore_context(context[0])

        # Hand back the value (or raise the exception) the coroutine produced.
        return call_result.result()

    def _run_event_loop(self, loop, coro):
        """
        Drive ``coro`` to completion on ``loop``; meant to run in a thread.

        Afterwards, in the manner of asyncio.run(), the remaining tasks are
        cancelled and awaited, their unhandled exceptions are reported to the
        loop's exception handler, async generators are shut down and the loop
        is closed.
        """
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(coro)
        finally:
            try:
                # Cancel whatever is still pending and wait for it to settle.
                leftovers = asyncio.all_tasks(loop)
                for task in leftovers:
                    task.cancel()

                async def drain():
                    await asyncio.gather(*leftovers, return_exceptions=True)

                loop.run_until_complete(drain())
                for task in leftovers:
                    if task.cancelled():
                        continue
                    error = task.exception()
                    if error is not None:
                        loop.call_exception_handler(
                            {
                                "message": "unhandled exception during loop shutdown",
                                "exception": error,
                                "task": task,
                            }
                        )
                if hasattr(loop, "shutdown_asyncgens"):
                    loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()
                asyncio.set_event_loop(self.main_event_loop)

    def __get__(self, parent, objtype):
        """
        Descriptor hook so the wrapper works on methods: ``parent`` is
        pre-bound as the first positional argument.
        """
        return functools.update_wrapper(
            functools.partial(self.__call__, parent), self.awaitable
        )

    async def main_wrap(
        self, args, kwargs, call_result, source_thread, exc_info, context
    ):
        """
        Await the wrapped callable and record the outcome (return value or
        exception) on the ``call_result`` future.

        ``context`` is a one-element list: its content is applied before the
        call and replaced by a copy of the current context afterwards.
        """
        if context is not None:
            _restore_context(context[0])

        task = SyncToAsync.get_current_task()
        self.launch_map[task] = source_thread
        try:
            if exc_info[1]:
                # Re-raise the caller's active exception and run inside the
                # handler so that sys.exc_info() is populated for the callee.
                try:
                    raise exc_info[1]
                except BaseException:
                    result = await self.awaitable(*args, **kwargs)
            else:
                result = await self.awaitable(*args, **kwargs)
        except BaseException as error:
            call_result.set_exception(error)
        else:
            call_result.set_result(result)
        finally:
            del self.launch_map[task]

            context[0] = contextvars.copy_context()
class SyncToAsync:
    """
    Wrap a synchronous callable so that it can be awaited; the callable is
    run in a thread pool. A threadlocal is set inside the worker thread so
    that calls to AsyncToSync made from it can find their way back out.

    With thread_sensitive set, the code runs in the same thread as any outer
    code. This is needed for underlying Python code that is not threadsafe
    (for example, code which handles SQLite database connections).

    If the outermost program is async (i.e. SyncToAsync is outermost), that
    thread is a dedicated single sub-thread in which all sync code runs, one
    call after the other. If the outermost program is sync (i.e. AsyncToSync
    is outermost), it is simply the main thread: AsyncToSync idles in a
    CurrentThreadExecutor while blocking its sync parent, rather than just
    blocking.

    If executor is passed in, it is used instead of the loop's default
    executor. Passing an executor requires thread_sensitive to be False,
    otherwise a TypeError is raised.
    """

    # Maps launched threads to the coroutines that spawned them
    launch_map: "Dict[threading.Thread, asyncio.Task[object]]" = {}

    # Storage for main event loop references
    threadlocal = threading.local()

    # Single-thread executor for thread-sensitive code
    single_thread_executor = ThreadPoolExecutor(max_workers=1)

    # Contextvar for the current execution context; optionally used for
    # thread sensitive mode.
    # NOTE(review): annotated as ContextVar[str], but
    # ThreadSensitiveContext.__aenter__ stores the context manager instance.
    thread_sensitive_context: "contextvars.ContextVar[str]" = contextvars.ContextVar(
        "thread_sensitive_context"
    )

    # Contextvar used to detect that the single thread executor would be
    # awaited on while it is already in use in the same context.
    deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
        "deadlock_context"
    )

    # Weakly keyed by context, so a thread pool is dropped once its context
    # goes out of scope. This terminates the thread pool.
    context_to_thread_executor: "weakref.WeakKeyDictionary[object, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        func: Callable[..., Any],
        thread_sensitive: bool = True,
        executor: Optional["ThreadPoolExecutor"] = None,
    ) -> None:
        """
        Wrap ``func``.

        Raises TypeError if ``func`` is not callable or is detected as a
        coroutine function, and also if ``executor`` is given while
        ``thread_sensitive`` is true.
        """
        rejected = (
            not callable(func)
            or _iscoroutinefunction_or_partial(func)
            or _iscoroutinefunction_or_partial(getattr(func, "__call__", func))
        )
        if rejected:
            raise TypeError("sync_to_async can only be applied to sync functions.")
        self.func = func
        functools.update_wrapper(self, func)
        self._thread_sensitive = thread_sensitive
        # Mark the instance itself so it is reported as a coroutine function.
        markcoroutinefunction(self)
        if thread_sensitive and executor is not None:
            raise TypeError("executor must not be set when thread_sensitive is True")
        self._executor = executor
        try:
            self.__self__ = func.__self__  # type: ignore
        except AttributeError:
            pass

    def _select_executor(self, loop):
        """
        Pick the executor that the wrapped function has to run in.

        Raises RuntimeError if the shared single thread executor would be
        needed while the current context has already flagged it as in use.
        """
        if not self._thread_sensitive:
            # Use the passed in executor, or the loop's default if it is None
            return self._executor

        if hasattr(AsyncToSync.executors, "current"):
            # A parent sync thread exists somewhere above us; run there.
            return AsyncToSync.executors.current

        if self.thread_sensitive_context and self.thread_sensitive_context.get(None):
            # A thread sensitive context is active: use its own thread pool,
            # creating it on first use.
            active_context = self.thread_sensitive_context.get()
            try:
                return self.context_to_thread_executor[active_context]
            except KeyError:
                pool = ThreadPoolExecutor(max_workers=1)
                self.context_to_thread_executor[active_context] = pool
                return pool

        if loop in AsyncToSync.loop_thread_executors:
            # Re-use the executor registered for the running loop.
            return AsyncToSync.loop_thread_executors[loop]

        if self.deadlock_context and self.deadlock_context.get(False):
            raise RuntimeError(
                "Single thread executor already being used, would deadlock"
            )

        # Otherwise fall back to the fixed single thread, and flag it as busy
        # for this context.
        if self.deadlock_context:
            self.deadlock_context.set(True)
        return self.single_thread_executor

    async def __call__(self, *args, **kwargs):
        """
        Run the wrapped function in the appropriate thread and return its
        result.
        """
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        executor = self._select_executor(loop)

        # The function runs inside a copy of the current context; the copy is
        # applied back to the current context once the call is over.
        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)

        try:
            future = loop.run_in_executor(
                executor,
                functools.partial(
                    self.thread_handler,
                    loop,
                    self.get_current_task(),
                    sys.exc_info(),
                    context.run,
                    child,
                ),
            )
            ret = await asyncio.wait_for(future, timeout=None)

        finally:
            _restore_context(context)
            if self.deadlock_context:
                self.deadlock_context.set(False)

        return ret

    def __get__(self, parent, objtype):
        """
        Descriptor hook so the wrapper works on methods: ``parent`` is
        pre-bound as the first positional argument.
        """
        return functools.update_wrapper(
            functools.partial(self.__call__, parent), self.func
        )

    def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs):
        """
        Body executed in the worker thread: records bookkeeping, then calls
        ``func(*args, **kwargs)`` and returns its result.
        """
        # Record the loop and process id for AsyncToSync to pick up.
        self.threadlocal.main_event_loop = loop
        self.threadlocal.main_event_loop_pid = os.getpid()

        # Set the task mapping (used for the locals module). If our parent
        # task was launched from this very thread, make no entry - let it
        # shortcut over us (this also stops infinite loops).
        worker = threading.current_thread()
        parent_set = AsyncToSync.launch_map.get(source_task) != worker
        if parent_set:
            self.launch_map[worker] = source_task

        try:
            if exc_info[1]:
                # Re-raise the caller's active exception and run inside the
                # handler so that sys.exc_info() is populated for the callee.
                try:
                    raise exc_info[1]
                except BaseException:
                    return func(*args, **kwargs)
            else:
                return func(*args, **kwargs)
        finally:
            # Only remove the launch_map entry if it is ours.
            if parent_set:
                del self.launch_map[worker]

    @staticmethod
    def get_current_task():
        """
        Return asyncio.current_task(), or None when that call raises
        RuntimeError.
        """
        try:
            return asyncio.current_task()
        except RuntimeError:
            return None
# Lowercase alias for AsyncToSync, so the wrapper can be applied with
# function-style naming (including as a decorator).
async_to_sync = AsyncToSync
@overload
def sync_to_async(
    func: None = None,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[[Callable[..., Any]], SyncToAsync]:
    ...


@overload
def sync_to_async(
    func: Callable[..., Any],
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> SyncToAsync:
    ...


def sync_to_async(
    func=None,
    thread_sensitive=True,
    executor=None,
):
    """Wrap ``func`` in a SyncToAsync, directly or as a decorator factory.

    Called with a function, returns ``SyncToAsync(func, ...)``. Called
    without one (``@sync_to_async(thread_sensitive=False)``), returns a
    one-argument callable that builds the SyncToAsync with the given options.
    """
    options = {"thread_sensitive": thread_sensitive, "executor": executor}
    if func is not None:
        return SyncToAsync(func, **options)
    return lambda f: SyncToAsync(f, **options)