# Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/asgiref/sync.py: 23%
# Shortcuts on this page
# r m x toggle line displays
# j k next/prev highlighted chunk
# 0 (zero) top of page
# 1 (one) first highlighted chunk
import asyncio
import asyncio.coroutines
import contextvars
import functools
import inspect
import os
import sys
import threading
import warnings
import weakref
from concurrent.futures import Future, ThreadPoolExecutor
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Coroutine,
    Dict,
    Generic,
    List,
    Optional,
    TypeVar,
    Union,
    overload,
)

from .current_thread_executor import CurrentThreadExecutor
from .local import Local

if sys.version_info >= (3, 10):
    from typing import ParamSpec
else:
    from typing_extensions import ParamSpec

if TYPE_CHECKING:
    # This is not available to import at runtime
    from _typeshed import OptExcInfo
39_F = TypeVar("_F", bound=Callable[..., Any])
40_P = ParamSpec("_P")
41_R = TypeVar("_R")
def _restore_context(context: contextvars.Context) -> None:
    """Copy the variable values held by ``context`` into the current context.

    Every variable stored in ``context`` is compared with its value in the
    currently active context and re-set when it differs or is unset there,
    so downstream consumers observe the values from ``context``.

    Args:
        context: The context whose variable values should become current.
    """
    for cvar, cvalue in context.items():
        try:
            # Kept inside the ``try`` so a variable that has no value (and no
            # default) in the current context falls through to the handler.
            needs_update = bool(cvar.get() != cvalue)
        except LookupError:
            needs_update = True
        if needs_update:
            cvar.set(cvalue)
# Python 3.12 deprecates asyncio.iscoroutinefunction() as an alias for
# inspect.iscoroutinefunction(), whilst also removing the _is_coroutine marker.
# The latter is replaced with the inspect.markcoroutinefunction decorator.
# Until 3.12 is the minimum supported Python version, provide a shim.

if hasattr(inspect, "markcoroutinefunction"):
    iscoroutinefunction = inspect.iscoroutinefunction
    markcoroutinefunction: Callable[[_F], _F] = inspect.markcoroutinefunction
else:
    iscoroutinefunction = asyncio.iscoroutinefunction  # type: ignore[assignment]

    def markcoroutinefunction(func: _F) -> _F:
        """Tag ``func`` with asyncio's ``_is_coroutine`` marker and return it."""
        func._is_coroutine = asyncio.coroutines._is_coroutine  # type: ignore
        return func
class AsyncSingleThreadContext:
    """Context manager to run async code inside the same thread.

    Normally, AsyncToSync functions run either inside a separate ThreadPoolExecutor or
    the main event loop if it exists. This context manager ensures that all AsyncToSync
    functions execute within the same thread.

    This context manager is re-entrant, so only the outer-most call to
    AsyncSingleThreadContext will set the context.

    Usage:

    >>> import asyncio
    >>> with AsyncSingleThreadContext():
    ...     async_to_sync(asyncio.sleep)(1)
    """

    def __init__(self):
        # Token returned by ContextVar.set(). It stays None unless this
        # instance is the outer-most context and therefore set the variable.
        self.token = None

    def __enter__(self):
        try:
            AsyncToSync.async_single_thread_context.get()
        except LookupError:
            # No context is active yet, so this instance becomes the owner.
            self.token = AsyncToSync.async_single_thread_context.set(self)

        return self

    def __exit__(self, exc, value, tb):
        if self.token is None:
            # A nested (non-owning) context has nothing to clean up.
            return

        # Forget the token before using it: a ContextVar token can only be
        # reset once, so a reused instance must not see the stale one.
        token, self.token = self.token, None
        try:
            executor = AsyncToSync.context_to_thread_executor.pop(self, None)
            if executor:
                executor.shutdown()
        finally:
            # Always clear the context variable, even if shutdown() fails.
            AsyncToSync.async_single_thread_context.reset(token)
class ThreadSensitiveContext:
    """Async context manager to manage context for thread sensitive mode

    This context manager controls which thread pool executor is used when in
    thread sensitive mode. By default, a single thread pool executor is shared
    within a process.

    The ThreadSensitiveContext() context manager may be used to specify a
    thread pool per context.

    This context manager is re-entrant, so only the outer-most call to
    ThreadSensitiveContext will set the context.

    Usage:

    >>> import time
    >>> async with ThreadSensitiveContext():
    ...     await sync_to_async(time.sleep)(1)
    """

    def __init__(self):
        # Token returned by ContextVar.set(). It stays None unless this
        # instance is the outer-most context and therefore set the variable.
        self.token = None

    async def __aenter__(self):
        try:
            SyncToAsync.thread_sensitive_context.get()
        except LookupError:
            # No context is active yet, so this instance becomes the owner.
            self.token = SyncToAsync.thread_sensitive_context.set(self)

        return self

    async def __aexit__(self, exc, value, tb):
        if self.token is None:
            # A nested (non-owning) context has nothing to clean up.
            return

        # Forget the token before using it: a ContextVar token can only be
        # reset once, so a reused instance must not see the stale one.
        token, self.token = self.token, None
        try:
            executor = SyncToAsync.context_to_thread_executor.pop(self, None)
            if executor:
                executor.shutdown()
        finally:
            # Always clear the context variable, even if shutdown() fails.
            SyncToAsync.thread_sensitive_context.reset(token)
class AsyncToSync(Generic[_P, _R]):
    """
    Utility class which turns an awaitable that only works on the thread with
    the event loop into a synchronous callable that works in a subthread.

    If the call stack contains an async loop, the code runs there.
    Otherwise, the code runs in a new loop in a new thread.

    Either way, this thread then pauses and waits to run any thread_sensitive
    code called from further down the call stack using SyncToAsync, before
    finally exiting once the async task returns.
    """

    # Keeps a reference to the CurrentThreadExecutor in local context, so that
    # any sync_to_async inside the wrapped code can find it.
    executors: "Local" = Local()

    # When we can't find a CurrentThreadExecutor from the context, such as
    # inside create_task, we'll look it up here from the running event loop.
    loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}

    # Set by AsyncSingleThreadContext while it is active.
    async_single_thread_context: "contextvars.ContextVar[AsyncSingleThreadContext]" = (
        contextvars.ContextVar("async_single_thread_context")
    )

    # Executor shared by every call made inside one AsyncSingleThreadContext.
    context_to_thread_executor: "weakref.WeakKeyDictionary[AsyncSingleThreadContext, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        awaitable: Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ],
        force_new_loop: bool = False,
    ):
        """
        Wrap ``awaitable`` so that it can be called synchronously.

        Args:
            awaitable: Callable that returns a coroutine/awaitable when called.
            force_new_loop: When true, never reuse a parent event loop.
        """
        if not callable(awaitable) or (
            not iscoroutinefunction(awaitable)
            and not iscoroutinefunction(getattr(awaitable, "__call__", awaitable))
        ):
            # Python does not have very reliable detection of async functions
            # (lots of false negatives) so this is just a warning.
            warnings.warn(
                "async_to_sync was passed a non-async-marked callable", stacklevel=2
            )
        self.awaitable = awaitable
        try:
            self.__self__ = self.awaitable.__self__  # type: ignore[union-attr]
        except AttributeError:
            pass
        self.force_new_loop = force_new_loop
        self.main_event_loop = None
        try:
            self.main_event_loop = asyncio.get_running_loop()
        except RuntimeError:
            # There's no event loop in this thread.
            pass

    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped awaitable to completion and return its result.

        Raises:
            RuntimeError: If called from a thread that has a running event loop.
        """
        __traceback_hide__ = True  # noqa: F841

        if not self.force_new_loop and not self.main_event_loop:
            # There's no event loop in this thread. Look for the threadlocal if
            # we're inside SyncToAsync
            main_event_loop_pid = getattr(
                SyncToAsync.threadlocal, "main_event_loop_pid", None
            )
            # We make sure the parent loop is from the same process - if
            # they've forked, this is not going to be valid any more (#194)
            if main_event_loop_pid and main_event_loop_pid == os.getpid():
                self.main_event_loop = getattr(
                    SyncToAsync.threadlocal, "main_event_loop", None
                )

        # You can't call AsyncToSync from a thread with a running event loop
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            raise RuntimeError(
                "You cannot use AsyncToSync in the same thread as an async event loop - "
                "just await the async function directly."
            )

        # Make a future for the return information
        call_result: "Future[_R]" = Future()

        # Make a CurrentThreadExecutor we'll use to idle in this thread - we
        # need one for every sync frame, even if there's one above us in the
        # same thread.
        old_executor = getattr(self.executors, "current", None)
        current_executor = CurrentThreadExecutor(old_executor)
        self.executors.current = current_executor

        # Wrapping context in list so it can be reassigned from within
        # `main_wrap`.
        context = [contextvars.copy_context()]

        # Get task context so that parent task knows which task to propagate
        # an asyncio.CancelledError to.
        task_context = getattr(SyncToAsync.threadlocal, "task_context", None)

        # Use call_soon_threadsafe to schedule a synchronous callback on the
        # main event loop's thread if it's there, otherwise make a new loop
        # in this thread.
        try:
            awaitable = self.main_wrap(
                call_result,
                sys.exc_info(),
                task_context,
                context,
                # prepare an awaitable which can be passed as is to self.main_wrap,
                # so that `args` and `kwargs` don't need to be
                # destructured when passed to self.main_wrap
                # (which is required by `ParamSpec`)
                # as that may cause overlapping arguments
                self.awaitable(*args, **kwargs),
            )

            async def new_loop_wrap() -> None:
                loop = asyncio.get_running_loop()
                self.loop_thread_executors[loop] = current_executor
                try:
                    await awaitable
                finally:
                    del self.loop_thread_executors[loop]

            if self.main_event_loop is not None:
                try:
                    self.main_event_loop.call_soon_threadsafe(
                        self.main_event_loop.create_task, awaitable
                    )
                except RuntimeError:
                    running_in_main_event_loop = False
                else:
                    running_in_main_event_loop = True
                    # Run the CurrentThreadExecutor until the future is done.
                    current_executor.run_until_future(call_result)
            else:
                running_in_main_event_loop = False

            if not running_in_main_event_loop:
                loop_executor: ThreadPoolExecutor
                # Only an executor created for this single call may be shut
                # down here; a context-owned one is shared with other calls.
                owns_executor = False

                single_thread_context = self.async_single_thread_context.get(None)
                if single_thread_context:
                    if single_thread_context in self.context_to_thread_executor:
                        loop_executor = self.context_to_thread_executor[
                            single_thread_context
                        ]
                    else:
                        loop_executor = ThreadPoolExecutor(max_workers=1)
                        self.context_to_thread_executor[
                            single_thread_context
                        ] = loop_executor
                else:
                    # Make our own event loop - in a new thread - and run inside that.
                    loop_executor = ThreadPoolExecutor(max_workers=1)
                    owns_executor = True

                try:
                    loop_future = loop_executor.submit(asyncio.run, new_loop_wrap())
                    # Run the CurrentThreadExecutor until the future is done.
                    current_executor.run_until_future(loop_future)
                    # Wait for future and/or allow for exception propagation
                    loop_future.result()
                finally:
                    if owns_executor:
                        # Release the worker thread explicitly instead of
                        # waiting for garbage collection; wait=False so an
                        # interrupted call is not blocked on the loop.
                        loop_executor.shutdown(wait=False)
        finally:
            _restore_context(context[0])
            # Restore old current thread executor state
            self.executors.current = old_executor

        # Wait for results from the future.
        return call_result.result()

    def __get__(self, parent: Any, objtype: Any) -> Callable[_P, _R]:
        """
        Include self for methods
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.awaitable)

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """

        __traceback_hide__ = True  # noqa: F841

        if context is not None:
            _restore_context(context[0])

        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)

        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
                    raise exc_info[1]
                except BaseException:
                    result = await awaitable
            else:
                result = await awaitable
        except BaseException as e:
            call_result.set_exception(e)
        else:
            call_result.set_result(result)
        finally:
            if current_task is not None and task_context is not None:
                task_context.remove(current_task)
            # Hand the (possibly modified) context back to the caller.
            context[0] = contextvars.copy_context()
class SyncToAsync(Generic[_P, _R]):
    """
    Utility class which turns a synchronous callable into an awaitable that
    runs in a threadpool. It also sets a threadlocal inside the thread so
    calls to AsyncToSync can escape it.

    If thread_sensitive is passed, the code will run in the same thread as any
    outer code. This is needed for underlying Python code that is not
    threadsafe (for example, code which handles SQLite database connections).

    If the outermost program is async (i.e. SyncToAsync is outermost), then
    this will be a dedicated single sub-thread that all sync code runs in,
    one after the other. If the outermost program is sync (i.e. AsyncToSync is
    outermost), this will just be the main thread. This is achieved by idling
    with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
    rather than just blocking.

    If executor is passed in, that will be used instead of the loop's default executor.
    In order to pass in an executor, thread_sensitive must be set to False, otherwise
    a TypeError will be raised.
    """

    # Storage for main event loop references
    threadlocal = threading.local()

    # Single-thread executor for thread-sensitive code
    single_thread_executor = ThreadPoolExecutor(max_workers=1)

    # Maintain a contextvar for the current execution context. Optionally used
    # for thread sensitive mode.
    thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = (
        contextvars.ContextVar("thread_sensitive_context")
    )

    # Contextvar that is used to detect if the single thread executor
    # would be awaited on while already being used in the same context
    deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
        "deadlock_context"
    )

    # Maintaining a weak reference to the context ensures that thread pools are
    # erased once the context goes out of scope. This terminates the thread pool.
    context_to_thread_executor: "weakref.WeakKeyDictionary[ThreadSensitiveContext, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        func: Callable[_P, _R],
        thread_sensitive: bool = True,
        executor: Optional["ThreadPoolExecutor"] = None,
        context: Optional[contextvars.Context] = None,
    ) -> None:
        """
        Wrap the synchronous callable ``func`` so that it can be awaited.

        Args:
            func: Synchronous callable to wrap.
            thread_sensitive: Whether to run in the thread-sensitive executor.
            executor: Executor to use; only allowed when ``thread_sensitive``
                is false.
            context: Context to run ``func`` in; a copy of the caller's
                context is used on each call when omitted.

        Raises:
            TypeError: If ``func`` is not a sync callable, or ``executor`` is
                given together with ``thread_sensitive=True``.
        """
        if (
            not callable(func)
            or iscoroutinefunction(func)
            or iscoroutinefunction(getattr(func, "__call__", func))
        ):
            raise TypeError("sync_to_async can only be applied to sync functions.")

        functools.update_wrapper(self, func)
        self.func = func
        self.context = context

        self._thread_sensitive = thread_sensitive
        markcoroutinefunction(self)
        if thread_sensitive and executor is not None:
            raise TypeError("executor must not be set when thread_sensitive is True")
        self._executor = executor
        try:
            self.__self__ = func.__self__  # type: ignore
        except AttributeError:
            pass

    async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped function in the selected executor and return its result.

        Raises:
            RuntimeError: If the shared single-thread executor is already in
                use from this context, which would deadlock.
        """
        __traceback_hide__ = True  # noqa: F841
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            current_thread_executor = getattr(AsyncToSync.executors, "current", None)
            if current_thread_executor:
                # If we have a parent sync thread above somewhere, use that
                executor = current_thread_executor
            elif self.thread_sensitive_context.get(None):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        context = contextvars.copy_context() if self.context is None else self.context
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run
        # Filled in by AsyncToSync.main_wrap with the task running nested
        # async code, so a cancellation here can be forwarded to it.
        task_context: List[asyncio.Task[Any]] = []

        # Run the code in the right thread
        exec_coro = loop.run_in_executor(
            executor,
            functools.partial(
                self.thread_handler,
                loop,
                sys.exc_info(),
                task_context,
                func,
                child,
            ),
        )
        ret: _R
        try:
            ret = await asyncio.shield(exec_coro)
        except asyncio.CancelledError:
            cancel_parent = True
            try:
                task = task_context[0]
                task.cancel()
                try:
                    await task
                    cancel_parent = False
                except asyncio.CancelledError:
                    pass
            except IndexError:
                # No nested task was registered; nothing to forward to.
                pass
            if exec_coro.done():
                raise
            if cancel_parent:
                exec_coro.cancel()
            ret = await exec_coro
        finally:
            if self.context is None:
                _restore_context(context)
            self.deadlock_context.set(False)

        return ret

    def __get__(
        self, parent: Any, objtype: Any
    ) -> Callable[_P, Coroutine[Any, Any, _R]]:
        """
        Include self for methods
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.func)

    def thread_handler(self, loop, exc_info, task_context, func, *args, **kwargs):
        """
        Wraps the sync application with exception handling.
        """

        __traceback_hide__ = True  # noqa: F841

        # Set the threadlocal for AsyncToSync
        self.threadlocal.main_event_loop = loop
        self.threadlocal.main_event_loop_pid = os.getpid()
        self.threadlocal.task_context = task_context

        # Run the function
        # If we have an exception, run the function inside the except block
        # after raising it so exc_info is correctly populated.
        if exc_info[1]:
            try:
                raise exc_info[1]
            except BaseException:
                return func(*args, **kwargs)
        else:
            return func(*args, **kwargs)
@overload
def async_to_sync(
    *,
    force_new_loop: bool = False,
) -> Callable[
    [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
    Callable[_P, _R],
]:
    ...


@overload
def async_to_sync(
    awaitable: Union[
        Callable[_P, Coroutine[Any, Any, _R]],
        Callable[_P, Awaitable[_R]],
    ],
    *,
    force_new_loop: bool = False,
) -> Callable[_P, _R]:
    ...


def async_to_sync(
    awaitable: Optional[
        Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ]
    ] = None,
    *,
    force_new_loop: bool = False,
) -> Union[
    Callable[
        [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
        Callable[_P, _R],
    ],
    Callable[_P, _R],
]:
    """
    Wrap an async callable in AsyncToSync.

    Called with ``awaitable`` it returns the wrapper directly; called with
    keyword arguments only it returns a decorator that applies them.
    """
    if awaitable is None:
        return lambda f: AsyncToSync(
            f,
            force_new_loop=force_new_loop,
        )
    return AsyncToSync(
        awaitable,
        force_new_loop=force_new_loop,
    )
@overload
def sync_to_async(
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
    context: Optional[contextvars.Context] = None,
) -> Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]]:
    ...


@overload
def sync_to_async(
    func: Callable[_P, _R],
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
    context: Optional[contextvars.Context] = None,
) -> Callable[_P, Coroutine[Any, Any, _R]]:
    ...


def sync_to_async(
    func: Optional[Callable[_P, _R]] = None,
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
    context: Optional[contextvars.Context] = None,
) -> Union[
    Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]],
    Callable[_P, Coroutine[Any, Any, _R]],
]:
    """
    Wrap a sync callable in SyncToAsync.

    Called with ``func`` it returns the wrapper directly; called with
    keyword arguments only it returns a decorator that applies them.
    """
    if func is None:
        return lambda f: SyncToAsync(
            f,
            thread_sensitive=thread_sensitive,
            executor=executor,
            context=context,
        )
    return SyncToAsync(
        func,
        thread_sensitive=thread_sensitive,
        executor=executor,
        context=context,
    )