Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/asgiref/sync.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import asyncio.coroutines
3import contextvars
4import functools
5import inspect
6import os
7import sys
8import threading
9import warnings
10import weakref
11from concurrent.futures import Future, ThreadPoolExecutor
12from typing import (
13 TYPE_CHECKING,
14 Any,
15 Awaitable,
16 Callable,
17 Coroutine,
18 Dict,
19 Generic,
20 List,
21 Optional,
22 TypeVar,
23 Union,
24 overload,
25)
27from .current_thread_executor import CurrentThreadExecutor
28from .local import Local
30if sys.version_info >= (3, 10):
31 from typing import ParamSpec
32else:
33 from typing_extensions import ParamSpec
35if TYPE_CHECKING:
36 # This is not available to import at runtime
37 from _typeshed import OptExcInfo
39_F = TypeVar("_F", bound=Callable[..., Any])
40_P = ParamSpec("_P")
41_R = TypeVar("_R")
def _restore_context(context: contextvars.Context) -> None:
    """Copy the variables held in ``context`` into the current context.

    For every context variable present in ``context``, the current context
    is updated to hold the same value object, so that downstream consumers
    see changes that were made while running inside ``context``.

    Values are compared by identity rather than with ``!=``: an equality
    check would skip the update when the old and new values merely compare
    equal (for example ``1`` and ``True``), and it would invoke arbitrary
    ``__eq__``/``__ne__`` implementations, which may raise or return a
    non-boolean result.
    """
    for cvar in context:
        cvalue = context.get(cvar)
        try:
            unchanged = cvar.get() is cvalue
        except LookupError:
            # The variable has no value in the current context yet.
            unchanged = False
        if not unchanged:
            cvar.set(cvalue)
# Python 3.12 deprecates asyncio.iscoroutinefunction() as an alias for
# inspect.iscoroutinefunction(), whilst also removing the _is_coroutine marker.
# The latter is replaced with the inspect.markcoroutinefunction decorator.
# Until 3.12 is the minimum supported Python version, provide a shim.

if hasattr(inspect, "markcoroutinefunction"):
    # The inspect module provides the marker decorator: use the inspect
    # implementations for both detection and marking.
    iscoroutinefunction = inspect.iscoroutinefunction
    markcoroutinefunction: Callable[[_F], _F] = inspect.markcoroutinefunction
else:
    iscoroutinefunction = asyncio.iscoroutinefunction  # type: ignore[assignment]

    def markcoroutinefunction(func: _F) -> _F:
        """Tag ``func`` with asyncio's ``_is_coroutine`` marker and return it."""
        func._is_coroutine = asyncio.coroutines._is_coroutine  # type: ignore
        return func
class ThreadSensitiveContext:
    """Async context manager to manage context for thread sensitive mode

    This context manager controls which thread pool executor is used when in
    thread sensitive mode. By default, a single thread pool executor is shared
    within a process.

    The ThreadSensitiveContext() context manager may be used to specify a
    thread pool per context.

    This context manager is re-entrant, so only the outer-most call to
    ThreadSensitiveContext will set the context.

    Usage:

    >>> import time
    >>> async with ThreadSensitiveContext():
    ...     await sync_to_async(time.sleep, 1)()
    """

    def __init__(self):
        # Token returned by ContextVar.set(); stays None unless this instance
        # is the one that installed itself as the current context.
        self.token = None

    async def __aenter__(self):
        try:
            SyncToAsync.thread_sensitive_context.get()
        except LookupError:
            # No context is active yet, so this instance becomes the
            # outer-most one and remembers how to undo that on exit.
            self.token = SyncToAsync.thread_sensitive_context.set(self)

        return self

    async def __aexit__(self, exc, value, tb):
        if self.token is None:
            # This instance did not set the context, so it has nothing to undo.
            return

        # Forget the token before using it: a ContextVar token can only be
        # reset once, so a stale token left on a reused instance would make a
        # later exit raise RuntimeError.
        token, self.token = self.token, None
        try:
            executor = SyncToAsync.context_to_thread_executor.pop(self, None)
            if executor:
                executor.shutdown()
        finally:
            # Always restore the previous context state, even if shutting the
            # executor down failed.
            SyncToAsync.thread_sensitive_context.reset(token)
class AsyncToSync(Generic[_P, _R]):
    """
    Utility class which turns an awaitable that only works on the thread with
    the event loop into a synchronous callable that works in a subthread.

    If the call stack contains an async loop, the code runs there.
    Otherwise, the code runs in a new loop in a new thread.

    Either way, this thread then pauses and waits to run any thread_sensitive
    code called from further down the call stack using SyncToAsync, before
    finally exiting once the async task returns.
    """

    # Keeps a reference to the CurrentThreadExecutor in local context, so that
    # any sync_to_async inside the wrapped code can find it.
    executors: "Local" = Local()

    # When we can't find a CurrentThreadExecutor from the context, such as
    # inside create_task, we'll look it up here from the running event loop.
    loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}

    def __init__(
        self,
        awaitable: Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ],
        force_new_loop: bool = False,
    ):
        """
        Store the wrapped callable and capture the running event loop, if any.

        A warning (not an error) is emitted when ``awaitable`` is not callable
        or is not detected as a coroutine function. When ``force_new_loop`` is
        true, ``__call__`` does not look up a parent loop from the
        SyncToAsync threadlocal.
        """
        if not callable(awaitable) or (
            not iscoroutinefunction(awaitable)
            and not iscoroutinefunction(getattr(awaitable, "__call__", awaitable))
        ):
            # Python does not have very reliable detection of async functions
            # (lots of false negatives) so this is just a warning.
            warnings.warn(
                "async_to_sync was passed a non-async-marked callable", stacklevel=2
            )
        self.awaitable = awaitable
        try:
            # Mirror the bound instance of a wrapped bound method, when present.
            self.__self__ = self.awaitable.__self__  # type: ignore[union-attr]
        except AttributeError:
            pass
        self.force_new_loop = force_new_loop
        self.main_event_loop = None
        try:
            self.main_event_loop = asyncio.get_running_loop()
        except RuntimeError:
            # There's no event loop in this thread.
            pass

    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped awaitable to completion and return its result.

        Raises RuntimeError when called from a thread that has a running
        event loop. Any exception raised by the awaitable is re-raised here
        through the result future.
        """
        __traceback_hide__ = True  # noqa: F841

        if not self.force_new_loop and not self.main_event_loop:
            # There's no event loop in this thread. Look for the threadlocal if
            # we're inside SyncToAsync
            main_event_loop_pid = getattr(
                SyncToAsync.threadlocal, "main_event_loop_pid", None
            )
            # We make sure the parent loop is from the same process - if
            # they've forked, this is not going to be valid any more (#194)
            if main_event_loop_pid and main_event_loop_pid == os.getpid():
                self.main_event_loop = getattr(
                    SyncToAsync.threadlocal, "main_event_loop", None
                )

        # You can't call AsyncToSync from a thread with a running event loop
        try:
            event_loop = asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            if event_loop.is_running():
                raise RuntimeError(
                    "You cannot use AsyncToSync in the same thread as an async event loop - "
                    "just await the async function directly."
                )

        # Make a future for the return information
        call_result: "Future[_R]" = Future()

        # Make a CurrentThreadExecutor we'll use to idle in this thread - we
        # need one for every sync frame, even if there's one above us in the
        # same thread.
        old_executor = getattr(self.executors, "current", None)
        current_executor = CurrentThreadExecutor()
        self.executors.current = current_executor

        # Wrapping context in list so it can be reassigned from within
        # `main_wrap`.
        context = [contextvars.copy_context()]

        # Get task context so that parent task knows which task to propagate
        # an asyncio.CancelledError to.
        task_context = getattr(SyncToAsync.threadlocal, "task_context", None)

        loop = None
        # Use call_soon_threadsafe to schedule a synchronous callback on the
        # main event loop's thread if it's there, otherwise make a new loop
        # in this thread.
        try:
            awaitable = self.main_wrap(
                call_result,
                sys.exc_info(),
                task_context,
                context,
                *args,
                **kwargs,
            )

            if not (self.main_event_loop and self.main_event_loop.is_running()):
                # Make our own event loop - in a new thread - and run inside that.
                loop = asyncio.new_event_loop()
                self.loop_thread_executors[loop] = current_executor
                loop_executor = ThreadPoolExecutor(max_workers=1)
                loop_future = loop_executor.submit(
                    self._run_event_loop, loop, awaitable
                )
                if current_executor:
                    # Run the CurrentThreadExecutor until the future is done
                    current_executor.run_until_future(loop_future)
                # Wait for future and/or allow for exception propagation
                loop_future.result()
            else:
                # Call it inside the existing loop
                self.main_event_loop.call_soon_threadsafe(
                    self.main_event_loop.create_task, awaitable
                )
                if current_executor:
                    # Run the CurrentThreadExecutor until the future is done
                    current_executor.run_until_future(call_result)
        finally:
            # Clean up any executor we were running
            if loop is not None:
                del self.loop_thread_executors[loop]
            _restore_context(context[0])
            # Restore old current thread executor state
            self.executors.current = old_executor

        # Wait for results from the future.
        return call_result.result()

    def _run_event_loop(self, loop, coro):
        """
        Runs the given event loop (designed to be called in a thread).
        """
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(coro)
        finally:
            try:
                # mimic asyncio.run() behavior
                # cancel unexhausted async generators
                tasks = asyncio.all_tasks(loop)
                for task in tasks:
                    task.cancel()

                async def gather():
                    await asyncio.gather(*tasks, return_exceptions=True)

                loop.run_until_complete(gather())
                for task in tasks:
                    if task.cancelled():
                        continue
                    if task.exception() is not None:
                        loop.call_exception_handler(
                            {
                                "message": "unhandled exception during loop shutdown",
                                "exception": task.exception(),
                                "task": task,
                            }
                        )
                if hasattr(loop, "shutdown_asyncgens"):
                    loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()
                asyncio.set_event_loop(self.main_event_loop)

    def __get__(self, parent: Any, objtype: Any) -> Callable[_P, _R]:
        """
        Include self for methods
        """
        # NOTE(review): `parent` is bound unconditionally, so it is passed as
        # the first argument even when it is None - confirm that is intended
        # for lookups made on the class rather than an instance.
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.awaitable)

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        *args: _P.args,
        **kwargs: _P.kwargs,
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """

        __traceback_hide__ = True  # noqa: F841

        if context is not None:
            _restore_context(context[0])

        # Register the running task so the caller's side can find it.
        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)

        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
                    raise exc_info[1]
                except BaseException:
                    result = await self.awaitable(*args, **kwargs)
            else:
                result = await self.awaitable(*args, **kwargs)
        except BaseException as e:
            call_result.set_exception(e)
        else:
            call_result.set_result(result)
        finally:
            if current_task is not None and task_context is not None:
                task_context.remove(current_task)
            # Hand the (possibly modified) context back through the list.
            context[0] = contextvars.copy_context()
class SyncToAsync(Generic[_P, _R]):
    """
    Utility class which turns a synchronous callable into an awaitable that
    runs in a threadpool. It also sets a threadlocal inside the thread so
    calls to AsyncToSync can escape it.

    If thread_sensitive is passed, the code will run in the same thread as any
    outer code. This is needed for underlying Python code that is not
    threadsafe (for example, code which handles SQLite database connections).

    If the outermost program is async (i.e. SyncToAsync is outermost), then
    this will be a dedicated single sub-thread that all sync code runs in,
    one after the other. If the outermost program is sync (i.e. AsyncToSync is
    outermost), this will just be the main thread. This is achieved by idling
    with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
    rather than just blocking.

    If executor is passed in, that will be used instead of the loop's default executor.
    In order to pass in an executor, thread_sensitive must be set to False, otherwise
    a TypeError will be raised.
    """

    # Storage for main event loop references
    threadlocal = threading.local()

    # Single-thread executor for thread-sensitive code
    single_thread_executor = ThreadPoolExecutor(max_workers=1)

    # Maintain a contextvar for the current execution context. Optionally used
    # for thread sensitive mode.
    thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = (
        contextvars.ContextVar("thread_sensitive_context")
    )

    # Contextvar that is used to detect if the single thread executor
    # would be awaited on while already being used in the same context
    deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
        "deadlock_context"
    )

    # Maintaining a weak reference to the context ensures that thread pools are
    # erased once the context goes out of scope. This terminates the thread pool.
    context_to_thread_executor: "weakref.WeakKeyDictionary[ThreadSensitiveContext, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        func: Callable[_P, _R],
        thread_sensitive: bool = True,
        executor: Optional["ThreadPoolExecutor"] = None,
    ) -> None:
        """
        Wrap ``func`` so that it can be awaited.

        Raises TypeError when ``func`` is not callable or is detected as a
        coroutine function, and when ``executor`` is supplied together with
        ``thread_sensitive=True``.
        """
        if (
            not callable(func)
            or iscoroutinefunction(func)
            or iscoroutinefunction(getattr(func, "__call__", func))
        ):
            raise TypeError("sync_to_async can only be applied to sync functions.")
        self.func = func
        functools.update_wrapper(self, func)
        self._thread_sensitive = thread_sensitive
        # Mark this instance so coroutine-function detection recognises it.
        markcoroutinefunction(self)
        if thread_sensitive and executor is not None:
            raise TypeError("executor must not be set when thread_sensitive is True")
        self._executor = executor
        try:
            # Mirror the bound instance of a wrapped bound method, when present.
            self.__self__ = func.__self__  # type: ignore
        except AttributeError:
            pass

    async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped function in an executor and return its result.

        Raises RuntimeError when the shared single-thread executor is needed
        while the deadlock marker is already set in the current context.
        """
        __traceback_hide__ = True  # noqa: F841
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            current_thread_executor = getattr(AsyncToSync.executors, "current", None)
            if current_thread_executor:
                # If we have a parent sync thread above somewhere, use that
                executor = current_thread_executor
            elif self.thread_sensitive_context.get(None):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        # The function runs inside a copy of the current context; changes made
        # there are copied back in the `finally` clause below.
        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run
        task_context: List[asyncio.Task[Any]] = []

        # Run the code in the right thread
        exec_coro = loop.run_in_executor(
            executor,
            functools.partial(
                self.thread_handler,
                loop,
                sys.exc_info(),
                task_context,
                func,
                child,
            ),
        )
        ret: _R
        try:
            # Shielded so that cancelling this call does not directly cancel
            # the executor future; cancellation is handled explicitly below.
            ret = await asyncio.shield(exec_coro)
        except asyncio.CancelledError:
            cancel_parent = True
            try:
                # Forward the cancellation to the task registered in
                # task_context, if there is one, and wait for it to finish.
                task = task_context[0]
                task.cancel()
                try:
                    await task
                    cancel_parent = False
                except asyncio.CancelledError:
                    pass
            except IndexError:
                # No task was registered in task_context.
                pass
            if exec_coro.done():
                raise
            if cancel_parent:
                exec_coro.cancel()
            ret = await exec_coro
        finally:
            _restore_context(context)
            self.deadlock_context.set(False)

        return ret

    def __get__(
        self, parent: Any, objtype: Any
    ) -> Callable[_P, Coroutine[Any, Any, _R]]:
        """
        Include self for methods
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.func)

    def thread_handler(self, loop, exc_info, task_context, func, *args, **kwargs):
        """
        Wraps the sync application with exception handling.
        """

        __traceback_hide__ = True  # noqa: F841

        # Set the threadlocal for AsyncToSync
        self.threadlocal.main_event_loop = loop
        self.threadlocal.main_event_loop_pid = os.getpid()
        self.threadlocal.task_context = task_context

        # Run the function
        # If we have an exception, run the function inside the except block
        # after raising it so exc_info is correctly populated.
        if exc_info[1]:
            try:
                raise exc_info[1]
            except BaseException:
                return func(*args, **kwargs)
        else:
            return func(*args, **kwargs)
@overload
def async_to_sync(
    *,
    force_new_loop: bool = False,
) -> Callable[
    [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
    Callable[_P, _R],
]:
    ...


@overload
def async_to_sync(
    awaitable: Union[
        Callable[_P, Coroutine[Any, Any, _R]],
        Callable[_P, Awaitable[_R]],
    ],
    *,
    force_new_loop: bool = False,
) -> Callable[_P, _R]:
    ...


def async_to_sync(
    awaitable: Optional[
        Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ]
    ] = None,
    *,
    force_new_loop: bool = False,
) -> Union[
    Callable[
        [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
        Callable[_P, _R],
    ],
    Callable[_P, _R],
]:
    """
    Wrap ``awaitable`` in an AsyncToSync instance.

    When called without ``awaitable`` (keyword arguments only), return a
    decorator that wraps its argument with the given ``force_new_loop``.
    """
    if awaitable is None:
        return lambda f: AsyncToSync(
            f,
            force_new_loop=force_new_loop,
        )
    return AsyncToSync(
        awaitable,
        force_new_loop=force_new_loop,
    )
@overload
def sync_to_async(
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]]:
    ...


@overload
def sync_to_async(
    func: Callable[_P, _R],
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[_P, Coroutine[Any, Any, _R]]:
    ...


def sync_to_async(
    func: Optional[Callable[_P, _R]] = None,
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Union[
    Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]],
    Callable[_P, Coroutine[Any, Any, _R]],
]:
    """
    Wrap ``func`` in a SyncToAsync instance.

    When called without ``func`` (keyword arguments only), return a decorator
    that wraps its argument with the given ``thread_sensitive`` and
    ``executor`` settings.
    """
    if func is None:
        return lambda f: SyncToAsync(
            f,
            thread_sensitive=thread_sensitive,
            executor=executor,
        )
    return SyncToAsync(
        func,
        thread_sensitive=thread_sensitive,
        executor=executor,
    )