Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/asgiref/sync.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import asyncio.coroutines
3import contextvars
4import functools
5import inspect
6import os
7import sys
8import threading
9import warnings
10import weakref
11from concurrent.futures import Future, ThreadPoolExecutor
12from typing import (
13 TYPE_CHECKING,
14 Any,
15 Awaitable,
16 Callable,
17 Coroutine,
18 Dict,
19 Generic,
20 List,
21 Optional,
22 TypeVar,
23 Union,
24 overload,
25)
27from .current_thread_executor import CurrentThreadExecutor
28from .local import Local
30if sys.version_info >= (3, 10):
31 from typing import ParamSpec
32else:
33 from typing_extensions import ParamSpec
35if TYPE_CHECKING:
36 # This is not available to import at runtime
37 from _typeshed import OptExcInfo
# Type variable for "any callable"; used to annotate markcoroutinefunction,
# which hands back the very function it was given.
_F = TypeVar("_F", bound=Callable[..., Any])
# Parameter specification of a wrapped callable (AsyncToSync / SyncToAsync).
_P = ParamSpec("_P")
# Return type of a wrapped callable (AsyncToSync / SyncToAsync).
_R = TypeVar("_R")
44def _restore_context(context: contextvars.Context) -> None:
45 # Check for changes in contextvars, and set them to the current
46 # context for downstream consumers
47 for cvar in context:
48 cvalue = context.get(cvar)
49 try:
50 if cvar.get() != cvalue:
51 cvar.set(cvalue)
52 except LookupError:
53 cvar.set(cvalue)
# Python 3.12 deprecates asyncio.iscoroutinefunction() as an alias for
# inspect.iscoroutinefunction(), whilst also removing the _is_coroutine marker.
# The latter is replaced with the inspect.markcoroutinefunction decorator.
# Until 3.12 is the minimum supported Python version, provide a shim.

if hasattr(inspect, "markcoroutinefunction"):
    # The interpreter ships the new API: use the inspect versions directly.
    iscoroutinefunction = inspect.iscoroutinefunction
    markcoroutinefunction: Callable[[_F], _F] = inspect.markcoroutinefunction
else:
    # Older interpreter: fall back to the asyncio check and emulate the
    # decorator by hand.
    iscoroutinefunction = asyncio.iscoroutinefunction  # type: ignore[assignment]

    def markcoroutinefunction(func: _F) -> _F:
        """Tag *func* with asyncio's ``_is_coroutine`` marker and return it."""
        func._is_coroutine = asyncio.coroutines._is_coroutine  # type: ignore
        return func
class AsyncSingleThreadContext:
    """Context manager to run async code inside the same thread.

    Normally, AsyncToSync functions run either inside a separate ThreadPoolExecutor or
    the main event loop if it exists. This context manager ensures that all AsyncToSync
    functions execute within the same thread.

    This context manager is re-entrant, so only the outer-most call to
    AsyncSingleThreadContext will set the context.

    Usage:

    >>> import asyncio
    >>> with AsyncSingleThreadContext():
    ...     async_to_sync(asyncio.sleep)(1)
    """

    def __init__(self):
        # Token returned by ContextVar.set(); only populated while this
        # instance is the outer-most active context.
        self.token = None

    def __enter__(self):
        """Activate this context unless another one is already active."""
        try:
            AsyncToSync.async_single_thread_context.get()
        except LookupError:
            self.token = AsyncToSync.async_single_thread_context.set(self)

        return self

    def __exit__(self, exc, value, tb):
        """Shut down this context's executor and deactivate the context.

        Does nothing when this instance did not set the context itself (a
        nested use inside another active context).
        """
        if self.token is None:
            return

        # A contextvars token may only be used once, so forget it before
        # resetting. Otherwise re-entering this instance inside another
        # active context would try to reset with the stale token.
        token, self.token = self.token, None
        try:
            executor = AsyncToSync.context_to_thread_executor.pop(self, None)
            if executor:
                executor.shutdown()
        finally:
            # Always deactivate the context, even if the shutdown failed.
            AsyncToSync.async_single_thread_context.reset(token)
class ThreadSensitiveContext:
    """Async context manager to manage context for thread sensitive mode

    This context manager controls which thread pool executor is used when in
    thread sensitive mode. By default, a single thread pool executor is shared
    within a process.

    The ThreadSensitiveContext() context manager may be used to specify a
    thread pool per context.

    This context manager is re-entrant, so only the outer-most call to
    ThreadSensitiveContext will set the context.

    Usage:

    >>> import time
    >>> async with ThreadSensitiveContext():
    ...     await sync_to_async(time.sleep)(1)
    """

    def __init__(self):
        # Token returned by ContextVar.set(); only populated while this
        # instance is the outer-most active context.
        self.token = None

    async def __aenter__(self):
        """Activate this context unless another one is already active."""
        try:
            SyncToAsync.thread_sensitive_context.get()
        except LookupError:
            self.token = SyncToAsync.thread_sensitive_context.set(self)

        return self

    async def __aexit__(self, exc, value, tb):
        """Shut down this context's executor and deactivate the context.

        Does nothing when this instance did not set the context itself (a
        nested use inside another active context).
        """
        if self.token is None:
            return

        # A contextvars token may only be used once, so forget it before
        # resetting. Otherwise re-entering this instance inside another
        # active context would try to reset with the stale token.
        token, self.token = self.token, None
        try:
            executor = SyncToAsync.context_to_thread_executor.pop(self, None)
            if executor:
                executor.shutdown()
        finally:
            # Always deactivate the context, even if the shutdown failed.
            SyncToAsync.thread_sensitive_context.reset(token)
class AsyncToSync(Generic[_P, _R]):
    """
    Utility class which turns an awaitable that only works on the thread with
    the event loop into a synchronous callable that works in a subthread.

    If the call stack contains an async loop, the code runs there.
    Otherwise, the code runs in a new loop in a new thread.

    Either way, this thread then pauses and waits to run any thread_sensitive
    code called from further down the call stack using SyncToAsync, before
    finally exiting once the async task returns.
    """

    # Keeps a reference to the CurrentThreadExecutor in local context, so that
    # any sync_to_async inside the wrapped code can find it.
    executors: "Local" = Local()

    # When we can't find a CurrentThreadExecutor from the context, such as
    # inside create_task, we'll look it up here from the running event loop.
    loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}

    # Holds the AsyncSingleThreadContext that is currently active, if any.
    # It is set by AsyncSingleThreadContext.__enter__ and read in __call__.
    async_single_thread_context: "contextvars.ContextVar[AsyncSingleThreadContext]" = (
        contextvars.ContextVar("async_single_thread_context")
    )

    # Executor used to run new event loops for a given
    # AsyncSingleThreadContext. Entries are created lazily in __call__ and
    # popped (and shut down) by AsyncSingleThreadContext.__exit__; the keys
    # are held weakly.
    context_to_thread_executor: "weakref.WeakKeyDictionary[AsyncSingleThreadContext, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        awaitable: Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ],
        force_new_loop: bool = False,
    ):
        """
        Wrap ``awaitable`` so that it can be called synchronously.

        ``awaitable`` is the callable that is invoked with the call arguments
        to produce the awaitable to run. A warning (not an error) is emitted
        when it is not detected as a coroutine function.

        ``force_new_loop``, when true, makes ``__call__`` skip the lookup of
        the event loop that SyncToAsync stored in its threadlocal.
        """
        if not callable(awaitable) or (
            not iscoroutinefunction(awaitable)
            and not iscoroutinefunction(getattr(awaitable, "__call__", awaitable))
        ):
            # Python does not have very reliable detection of async functions
            # (lots of false negatives) so this is just a warning.
            warnings.warn(
                "async_to_sync was passed a non-async-marked callable", stacklevel=2
            )
        self.awaitable = awaitable
        # Mirror __self__ when wrapping a bound method; plain functions do
        # not have it.
        try:
            self.__self__ = self.awaitable.__self__  # type: ignore[union-attr]
        except AttributeError:
            pass
        self.force_new_loop = force_new_loop
        # Remember the loop running in the constructing thread, if there is
        # one; otherwise this stays None until __call__ possibly fills it in.
        self.main_event_loop = None
        try:
            self.main_event_loop = asyncio.get_running_loop()
        except RuntimeError:
            # There's no event loop in this thread.
            pass

    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped awaitable to completion and return its result.

        Raises RuntimeError when called from a thread that has a running
        event loop. An exception raised by the awaitable is stored on the
        result future by ``main_wrap`` and surfaces from the final
        ``call_result.result()``.
        """
        __traceback_hide__ = True  # noqa: F841

        if not self.force_new_loop and not self.main_event_loop:
            # There's no event loop in this thread. Look for the threadlocal if
            # we're inside SyncToAsync
            main_event_loop_pid = getattr(
                SyncToAsync.threadlocal, "main_event_loop_pid", None
            )
            # We make sure the parent loop is from the same process - if
            # they've forked, this is not going to be valid any more (#194)
            if main_event_loop_pid and main_event_loop_pid == os.getpid():
                self.main_event_loop = getattr(
                    SyncToAsync.threadlocal, "main_event_loop", None
                )

        # You can't call AsyncToSync from a thread with a running event loop
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            raise RuntimeError(
                "You cannot use AsyncToSync in the same thread as an async event loop - "
                "just await the async function directly."
            )

        # Make a future for the return information
        call_result: "Future[_R]" = Future()

        # Make a CurrentThreadExecutor we'll use to idle in this thread - we
        # need one for every sync frame, even if there's one above us in the
        # same thread.
        old_executor = getattr(self.executors, "current", None)
        current_executor = CurrentThreadExecutor(old_executor)
        self.executors.current = current_executor

        # Wrapping context in list so it can be reassigned from within
        # `main_wrap`.
        context = [contextvars.copy_context()]

        # Get task context so that parent task knows which task to propagate
        # an asyncio.CancelledError to.
        task_context = getattr(SyncToAsync.threadlocal, "task_context", None)

        # Use call_soon_threadsafe to schedule a synchronous callback on the
        # main event loop's thread if it's there, otherwise make a new loop
        # in this thread.
        try:
            awaitable = self.main_wrap(
                call_result,
                sys.exc_info(),
                task_context,
                context,
                # prepare an awaitable which can be passed as is to self.main_wrap,
                # so that `args` and `kwargs` don't need to be
                # destructured when passed to self.main_wrap
                # (which is required by `ParamSpec`)
                # as that may cause overlapping arguments
                self.awaitable(*args, **kwargs),
            )

            async def new_loop_wrap() -> None:
                # Register current_executor for the new loop while the
                # wrapped awaitable runs, so SyncToAsync can find it via
                # loop_thread_executors.
                loop = asyncio.get_running_loop()
                self.loop_thread_executors[loop] = current_executor
                try:
                    await awaitable
                finally:
                    del self.loop_thread_executors[loop]

            if self.main_event_loop is not None:
                try:
                    self.main_event_loop.call_soon_threadsafe(
                        self.main_event_loop.create_task, awaitable
                    )
                except RuntimeError:
                    # Scheduling on the recorded loop failed (presumably the
                    # loop is no longer usable, e.g. closed); fall back to
                    # running in a new loop below.
                    running_in_main_event_loop = False
                else:
                    running_in_main_event_loop = True
                    # Run the CurrentThreadExecutor until the future is done.
                    current_executor.run_until_future(call_result)
            else:
                running_in_main_event_loop = False

            if not running_in_main_event_loop:
                loop_executor = None

                if self.async_single_thread_context.get(None):
                    # An AsyncSingleThreadContext is active: reuse (or
                    # create and remember) its dedicated executor.
                    single_thread_context = self.async_single_thread_context.get()

                    if single_thread_context in self.context_to_thread_executor:
                        loop_executor = self.context_to_thread_executor[
                            single_thread_context
                        ]
                    else:
                        loop_executor = ThreadPoolExecutor(max_workers=1)
                        self.context_to_thread_executor[
                            single_thread_context
                        ] = loop_executor
                else:
                    # Make our own event loop - in a new thread - and run inside that.
                    loop_executor = ThreadPoolExecutor(max_workers=1)

                loop_future = loop_executor.submit(asyncio.run, new_loop_wrap())
                # Run the CurrentThreadExecutor until the future is done.
                current_executor.run_until_future(loop_future)
                # Wait for future and/or allow for exception propagation
                loop_future.result()
        finally:
            # main_wrap stores a copy of its final context in context[0];
            # apply it to the calling context.
            _restore_context(context[0])
            # Restore old current thread executor state
            self.executors.current = old_executor

        # Wait for results from the future.
        return call_result.result()

    def __get__(self, parent: Any, objtype: Any) -> Callable[_P, _R]:
        """
        Include self for methods

        Returns a partial of ``__call__`` with ``parent`` bound as the first
        positional argument, carrying the metadata of the wrapped awaitable.
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.awaitable)

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.

        ``call_result`` receives the result or the exception of ``awaitable``.
        ``exc_info`` is the caller's ``sys.exc_info()``; when it holds an
        exception, the awaitable is awaited inside a matching ``except`` block.
        ``task_context``, when not None, has the current task appended while
        the awaitable runs.
        ``context`` is a one-element list: its context is restored before
        running and replaced by a copy of the final context afterwards.
        """

        __traceback_hide__ = True  # noqa: F841

        if context is not None:
            _restore_context(context[0])

        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)

        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
                    raise exc_info[1]
                except BaseException:
                    result = await awaitable
            else:
                result = await awaitable
        except BaseException as e:
            call_result.set_exception(e)
        else:
            call_result.set_result(result)
        finally:
            if current_task is not None and task_context is not None:
                task_context.remove(current_task)
            # Hand the context as it is now back to the caller through the
            # shared list.
            context[0] = contextvars.copy_context()
class SyncToAsync(Generic[_P, _R]):
    """
    Utility class which turns a synchronous callable into an awaitable that
    runs in a threadpool. It also sets a threadlocal inside the thread so
    calls to AsyncToSync can escape it.

    If thread_sensitive is passed, the code will run in the same thread as any
    outer code. This is needed for underlying Python code that is not
    threadsafe (for example, code which handles SQLite database connections).

    If the outermost program is async (i.e. SyncToAsync is outermost), then
    this will be a dedicated single sub-thread that all sync code runs in,
    one after the other. If the outermost program is sync (i.e. AsyncToSync is
    outermost), this will just be the main thread. This is achieved by idling
    with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
    rather than just blocking.

    If executor is passed in, that will be used instead of the loop's default executor.
    In order to pass in an executor, thread_sensitive must be set to False, otherwise
    a TypeError will be raised.
    """

    # Storage for main event loop references
    threadlocal = threading.local()

    # Single-thread executor for thread-sensitive code
    single_thread_executor = ThreadPoolExecutor(max_workers=1)

    # Maintain a contextvar for the current execution context. Optionally used
    # for thread sensitive mode.
    thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = (
        contextvars.ContextVar("thread_sensitive_context")
    )

    # Contextvar that is used to detect if the single thread executor
    # would be awaited on while already being used in the same context
    deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
        "deadlock_context"
    )

    # Maintaining a weak reference to the context ensures that thread pools are
    # erased once the context goes out of scope. This terminates the thread pool.
    context_to_thread_executor: "weakref.WeakKeyDictionary[ThreadSensitiveContext, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        func: Callable[_P, _R],
        thread_sensitive: bool = True,
        executor: Optional["ThreadPoolExecutor"] = None,
        context: Optional[contextvars.Context] = None,
    ) -> None:
        """
        Wrap the synchronous callable ``func`` so that it can be awaited.

        ``thread_sensitive`` selects the thread-sensitive executor lookup in
        ``__call__``; ``executor`` is only allowed when it is False.
        ``context``, when given, is the context ``func`` runs in; otherwise a
        copy of the calling context is made for every call.

        Raises TypeError when ``func`` is not callable or is a coroutine
        function, or when ``executor`` is combined with ``thread_sensitive``.
        """
        if (
            not callable(func)
            or iscoroutinefunction(func)
            or iscoroutinefunction(getattr(func, "__call__", func))
        ):
            raise TypeError("sync_to_async can only be applied to sync functions.")
        self.func = func
        self.context = context
        functools.update_wrapper(self, func)
        self._thread_sensitive = thread_sensitive
        # Flag this instance so that iscoroutinefunction() reports it as async.
        markcoroutinefunction(self)
        if thread_sensitive and executor is not None:
            raise TypeError("executor must not be set when thread_sensitive is True")
        self._executor = executor
        # Mirror __self__ when wrapping a bound method; plain functions do
        # not have it.
        try:
            self.__self__ = func.__self__  # type: ignore
        except AttributeError:
            pass

    async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped function in the selected executor and return its result.

        Raises RuntimeError when the shared single thread executor would have
        to be used while ``deadlock_context`` marks it as already in use.
        """
        __traceback_hide__ = True  # noqa: F841
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            current_thread_executor = getattr(AsyncToSync.executors, "current", None)
            if current_thread_executor:
                # If we have a parent sync thread above somewhere, use that
                executor = current_thread_executor
            elif self.thread_sensitive_context.get(None):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        # The function runs inside `context` (via context.run). A fresh copy
        # is used unless a context was supplied at construction time.
        context = contextvars.copy_context() if self.context is None else self.context
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run
        # Filled in by AsyncToSync.main_wrap with the task(s) started from
        # within the thread, so they can be cancelled below.
        task_context: List[asyncio.Task[Any]] = []

        # Run the code in the right thread
        exec_coro = loop.run_in_executor(
            executor,
            functools.partial(
                self.thread_handler,
                loop,
                sys.exc_info(),
                task_context,
                func,
                child,
            ),
        )
        ret: _R
        try:
            # shield() keeps the executor future itself from being cancelled
            # when this coroutine is cancelled.
            ret = await asyncio.shield(exec_coro)
        except asyncio.CancelledError:
            cancel_parent = True
            try:
                # Forward the cancellation to the first task registered from
                # inside the thread, if there is one.
                task = task_context[0]
                task.cancel()
                try:
                    await task
                    cancel_parent = False
                except asyncio.CancelledError:
                    pass
            except IndexError:
                # No task was registered from inside the thread.
                pass
            if exec_coro.done():
                raise
            if cancel_parent:
                exec_coro.cancel()
            ret = await exec_coro
        finally:
            # Only propagate context changes back when the per-call copy was
            # used, not a caller-supplied context.
            if self.context is None:
                _restore_context(context)
            self.deadlock_context.set(False)

        return ret

    def __get__(
        self, parent: Any, objtype: Any
    ) -> Callable[_P, Coroutine[Any, Any, _R]]:
        """
        Include self for methods

        Returns a partial of ``__call__`` with ``parent`` bound as the first
        positional argument, carrying the metadata of the wrapped function.
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.func)

    def thread_handler(self, loop, exc_info, task_context, func, *args, **kwargs):
        """
        Wraps the sync application with exception handling.

        Records ``loop``, the current process id and ``task_context`` on the
        threadlocal (read by AsyncToSync), then calls
        ``func(*args, **kwargs)`` and returns its result. ``exc_info`` is the
        caller's ``sys.exc_info()``.
        """

        __traceback_hide__ = True  # noqa: F841

        # Set the threadlocal for AsyncToSync
        self.threadlocal.main_event_loop = loop
        self.threadlocal.main_event_loop_pid = os.getpid()
        self.threadlocal.task_context = task_context

        # Run the function
        # If we have an exception, run the function inside the except block
        # after raising it so exc_info is correctly populated.
        if exc_info[1]:
            try:
                raise exc_info[1]
            except BaseException:
                return func(*args, **kwargs)
        else:
            return func(*args, **kwargs)
@overload
def async_to_sync(
    *,
    force_new_loop: bool = False,
) -> Callable[
    [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
    Callable[_P, _R],
]: ...


@overload
def async_to_sync(
    awaitable: Union[
        Callable[_P, Coroutine[Any, Any, _R]],
        Callable[_P, Awaitable[_R]],
    ],
    *,
    force_new_loop: bool = False,
) -> Callable[_P, _R]: ...


def async_to_sync(
    awaitable: Optional[
        Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ]
    ] = None,
    *,
    force_new_loop: bool = False,
) -> Union[
    Callable[
        [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
        Callable[_P, _R],
    ],
    Callable[_P, _R],
]:
    """
    Wrap an async callable in AsyncToSync.

    Called with ``awaitable``, returns the AsyncToSync wrapper directly.
    Called with keyword options only, returns a decorator that builds the
    wrapper with those options.
    """
    # Direct form: async_to_sync(fn) / async_to_sync(fn, force_new_loop=...)
    if awaitable is not None:
        return AsyncToSync(awaitable, force_new_loop=force_new_loop)

    # Decorator-factory form: @async_to_sync(force_new_loop=...)
    def decorator(
        f: Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]],
    ) -> Callable[_P, _R]:
        return AsyncToSync(f, force_new_loop=force_new_loop)

    return decorator
@overload
def sync_to_async(
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
    context: Optional[contextvars.Context] = None,
) -> Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]]: ...


@overload
def sync_to_async(
    func: Callable[_P, _R],
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
    context: Optional[contextvars.Context] = None,
) -> Callable[_P, Coroutine[Any, Any, _R]]: ...


def sync_to_async(
    func: Optional[Callable[_P, _R]] = None,
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
    context: Optional[contextvars.Context] = None,
) -> Union[
    Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]],
    Callable[_P, Coroutine[Any, Any, _R]],
]:
    """
    Wrap a sync callable in SyncToAsync.

    Called with ``func``, returns the SyncToAsync wrapper directly. Called
    with keyword options only, returns a decorator that builds the wrapper
    with those options.
    """
    # Direct form: sync_to_async(fn) / sync_to_async(fn, thread_sensitive=...)
    if func is not None:
        return SyncToAsync(
            func,
            thread_sensitive=thread_sensitive,
            executor=executor,
            context=context,
        )

    # Decorator-factory form: @sync_to_async(thread_sensitive=...)
    def decorator(f: Callable[_P, _R]) -> Callable[_P, Coroutine[Any, Any, _R]]:
        return SyncToAsync(
            f,
            thread_sensitive=thread_sensitive,
            executor=executor,
            context=context,
        )

    return decorator