Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/asgiref/sync.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import asyncio.coroutines
3import contextvars
4import functools
5import inspect
6import os
7import sys
8import threading
9import warnings
10import weakref
11from concurrent.futures import Future, ThreadPoolExecutor
12from typing import (
13 TYPE_CHECKING,
14 Any,
15 Awaitable,
16 Callable,
17 Coroutine,
18 Dict,
19 Generic,
20 List,
21 Optional,
22 TypeVar,
23 Union,
24 overload,
25)
27from .current_thread_executor import CurrentThreadExecutor
28from .local import Local
30if sys.version_info >= (3, 10):
31 from typing import ParamSpec
32else:
33 from typing_extensions import ParamSpec
35if TYPE_CHECKING:
36 # This is not available to import at runtime
37 from _typeshed import OptExcInfo
39_F = TypeVar("_F", bound=Callable[..., Any])
40_P = ParamSpec("_P")
41_R = TypeVar("_R")
44def _restore_context(context: contextvars.Context) -> None:
45 # Check for changes in contextvars, and set them to the current
46 # context for downstream consumers
47 for cvar in context:
48 cvalue = context.get(cvar)
49 try:
50 if cvar.get() != cvalue:
51 cvar.set(cvalue)
52 except LookupError:
53 cvar.set(cvalue)
# Python 3.12 deprecates asyncio.iscoroutinefunction() as an alias for
# inspect.iscoroutinefunction(), whilst also removing the _is_coroutine marker.
# The latter is replaced with the inspect.markcoroutinefunction decorator.
# Until 3.12 is the minimum supported Python version, provide a shim.

if hasattr(inspect, "markcoroutinefunction"):
    # Modern interpreters: use the inspect-based implementations directly.
    iscoroutinefunction = inspect.iscoroutinefunction
    markcoroutinefunction: Callable[[_F], _F] = inspect.markcoroutinefunction
else:
    iscoroutinefunction = asyncio.iscoroutinefunction  # type: ignore[assignment]

    def markcoroutinefunction(func: _F) -> _F:
        """Tag *func* with asyncio's ``_is_coroutine`` marker and return it."""
        func._is_coroutine = asyncio.coroutines._is_coroutine  # type: ignore
        return func
class AsyncSingleThreadContext:
    """Context manager to run async code inside the same thread.

    Normally, AsyncToSync functions run either inside a separate ThreadPoolExecutor or
    the main event loop if it exists. This context manager ensures that all AsyncToSync
    functions execute within the same thread.

    This context manager is re-entrant, so only the outer-most call to
    AsyncSingleThreadContext will set the context.

    Usage:

    >>> import asyncio
    >>> with AsyncSingleThreadContext():
    ...     async_to_sync(asyncio.sleep)(1)
    """

    def __init__(self):
        # Token returned by ContextVar.set(); stays None unless this instance
        # is the outer-most context (the one that actually set the variable).
        self.token = None

    def __enter__(self):
        try:
            AsyncToSync.async_single_thread_context.get()
        except LookupError:
            # No context is active yet, so this instance becomes the owner.
            self.token = AsyncToSync.async_single_thread_context.set(self)

        return self

    def __exit__(self, exc, value, tb):
        if self.token is None:
            # Nested use: the outer-most context is responsible for cleanup.
            return

        executor = AsyncToSync.context_to_thread_executor.pop(self, None)
        if executor:
            executor.shutdown()

        AsyncToSync.async_single_thread_context.reset(self.token)
        # A token may only be used once; drop it so that re-using this
        # instance later (possibly nested) never resets with a stale token.
        self.token = None
class ThreadSensitiveContext:
    """Async context manager to manage context for thread sensitive mode

    This context manager controls which thread pool executor is used when in
    thread sensitive mode. By default, a single thread pool executor is shared
    within a process.

    The ThreadSensitiveContext() context manager may be used to specify a
    thread pool per context.

    This context manager is re-entrant, so only the outer-most call to
    ThreadSensitiveContext will set the context.

    Usage:

    >>> import time
    >>> async with ThreadSensitiveContext():
    ...     await sync_to_async(time.sleep)(1)
    """

    def __init__(self):
        # Token returned by ContextVar.set(); stays None unless this instance
        # is the outer-most context (the one that actually set the variable).
        self.token = None

    async def __aenter__(self):
        try:
            SyncToAsync.thread_sensitive_context.get()
        except LookupError:
            # No context is active yet, so this instance becomes the owner.
            self.token = SyncToAsync.thread_sensitive_context.set(self)

        return self

    async def __aexit__(self, exc, value, tb):
        if self.token is None:
            # Nested use: the outer-most context is responsible for cleanup.
            return

        executor = SyncToAsync.context_to_thread_executor.pop(self, None)
        if executor:
            executor.shutdown()
        SyncToAsync.thread_sensitive_context.reset(self.token)
        # A token may only be used once; drop it so that re-using this
        # instance later (possibly nested) never resets with a stale token.
        self.token = None
class AsyncToSync(Generic[_P, _R]):
    """
    Utility class which turns an awaitable that only works on the thread with
    the event loop into a synchronous callable that works in a subthread.

    If the call stack contains an async loop, the code runs there.
    Otherwise, the code runs in a new loop in a new thread.

    Either way, this thread then pauses and waits to run any thread_sensitive
    code called from further down the call stack using SyncToAsync, before
    finally exiting once the async task returns.
    """

    # Keeps a reference to the CurrentThreadExecutor in local context, so that
    # any sync_to_async inside the wrapped code can find it.
    executors: "Local" = Local()

    # When we can't find a CurrentThreadExecutor from the context, such as
    # inside create_task, we'll look it up here from the running event loop.
    loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}

    # Holds the active AsyncSingleThreadContext, if any.
    async_single_thread_context: "contextvars.ContextVar[AsyncSingleThreadContext]" = (
        contextvars.ContextVar("async_single_thread_context")
    )

    # Maps each AsyncSingleThreadContext to the single-worker executor that
    # runs the event loops created while that context is active.
    context_to_thread_executor: "weakref.WeakKeyDictionary[AsyncSingleThreadContext, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        awaitable: Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ],
        force_new_loop: bool = False,
    ):
        """
        Wrap ``awaitable`` (a callable returning an awaitable).

        A warning is emitted when ``awaitable`` is not detected as an async
        callable. If ``force_new_loop`` is true, calls never reuse an outer
        event loop. The loop running in the constructing thread, if any, is
        remembered as the main event loop.
        """
        if not callable(awaitable) or (
            not iscoroutinefunction(awaitable)
            and not iscoroutinefunction(getattr(awaitable, "__call__", awaitable))
        ):
            # Python does not have very reliable detection of async functions
            # (lots of false negatives) so this is just a warning.
            warnings.warn(
                "async_to_sync was passed a non-async-marked callable", stacklevel=2
            )
        self.awaitable = awaitable
        try:
            # Mirror the bound instance of a bound method, when there is one.
            self.__self__ = self.awaitable.__self__  # type: ignore[union-attr]
        except AttributeError:
            pass
        self.force_new_loop = force_new_loop
        self.main_event_loop = None
        try:
            self.main_event_loop = asyncio.get_running_loop()
        except RuntimeError:
            # There's no event loop in this thread.
            pass

    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped awaitable to completion and return its result.

        Raises RuntimeError when called from a thread that has a running
        event loop; exceptions raised by the awaitable are re-raised here.
        """
        __traceback_hide__ = True  # noqa: F841

        if not self.force_new_loop and not self.main_event_loop:
            # There's no event loop in this thread. Look for the threadlocal if
            # we're inside SyncToAsync
            main_event_loop_pid = getattr(
                SyncToAsync.threadlocal, "main_event_loop_pid", None
            )
            # We make sure the parent loop is from the same process - if
            # they've forked, this is not going to be valid any more (#194)
            if main_event_loop_pid and main_event_loop_pid == os.getpid():
                self.main_event_loop = getattr(
                    SyncToAsync.threadlocal, "main_event_loop", None
                )

        # You can't call AsyncToSync from a thread with a running event loop
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            raise RuntimeError(
                "You cannot use AsyncToSync in the same thread as an async event loop - "
                "just await the async function directly."
            )

        # Make a future for the return information
        call_result: "Future[_R]" = Future()

        # Make a CurrentThreadExecutor we'll use to idle in this thread - we
        # need one for every sync frame, even if there's one above us in the
        # same thread.
        old_executor = getattr(self.executors, "current", None)
        current_executor = CurrentThreadExecutor(old_executor)
        self.executors.current = current_executor

        # Wrapping context in list so it can be reassigned from within
        # `main_wrap`.
        context = [contextvars.copy_context()]

        # Get task context so that parent task knows which task to propagate
        # an asyncio.CancelledError to.
        task_context = getattr(SyncToAsync.threadlocal, "task_context", None)

        # Use call_soon_threadsafe to schedule a synchronous callback on the
        # main event loop's thread if it's there, otherwise make a new loop
        # in this thread.
        try:
            awaitable = self.main_wrap(
                call_result,
                sys.exc_info(),
                task_context,
                context,
                # prepare an awaitable which can be passed as is to self.main_wrap,
                # so that `args` and `kwargs` don't need to be
                # destructured when passed to self.main_wrap
                # (which is required by `ParamSpec`)
                # as that may cause overlapping arguments
                self.awaitable(*args, **kwargs),
            )

            async def new_loop_wrap() -> None:
                # Register the executor for the new loop while it is running
                # so SyncToAsync can find it via the running loop.
                loop = asyncio.get_running_loop()
                self.loop_thread_executors[loop] = current_executor
                try:
                    await awaitable
                finally:
                    del self.loop_thread_executors[loop]

            if self.main_event_loop is not None:
                try:
                    self.main_event_loop.call_soon_threadsafe(
                        self.main_event_loop.create_task, awaitable
                    )
                except RuntimeError:
                    # Scheduling on the main loop failed; fall back to a new loop.
                    running_in_main_event_loop = False
                else:
                    running_in_main_event_loop = True
                    # Run the CurrentThreadExecutor until the future is done.
                    current_executor.run_until_future(call_result)
            else:
                running_in_main_event_loop = False

            if not running_in_main_event_loop:
                loop_executor = None

                single_thread_context = self.async_single_thread_context.get(None)
                if single_thread_context:
                    if single_thread_context in self.context_to_thread_executor:
                        # Re-use the executor tied to the active context.
                        loop_executor = self.context_to_thread_executor[
                            single_thread_context
                        ]
                    else:
                        loop_executor = ThreadPoolExecutor(max_workers=1)
                        self.context_to_thread_executor[
                            single_thread_context
                        ] = loop_executor
                else:
                    # Make our own event loop - in a new thread - and run inside that.
                    loop_executor = ThreadPoolExecutor(max_workers=1)

                loop_future = loop_executor.submit(asyncio.run, new_loop_wrap())
                # Run the CurrentThreadExecutor until the future is done.
                current_executor.run_until_future(loop_future)
                # Wait for future and/or allow for exception propagation
                loop_future.result()
        finally:
            _restore_context(context[0])
            # Restore old current thread executor state
            self.executors.current = old_executor

        # Wait for results from the future.
        return call_result.result()

    def __get__(self, parent: Any, objtype: Any) -> Callable[_P, _R]:
        """
        Include self for methods
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.awaitable)

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.

        ``context`` is a one-element list: its context is applied before the
        awaitable runs and the slot is overwritten with a copy of the current
        context afterwards, so the caller can read back any changes.
        """

        __traceback_hide__ = True  # noqa: F841

        if context is not None:
            _restore_context(context[0])

        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)

        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
                    raise exc_info[1]
                except BaseException:
                    result = await awaitable
            else:
                result = await awaitable
        except BaseException as e:
            call_result.set_exception(e)
        else:
            call_result.set_result(result)
        finally:
            if current_task is not None and task_context is not None:
                task_context.remove(current_task)
            context[0] = contextvars.copy_context()
class SyncToAsync(Generic[_P, _R]):
    """
    Utility class which turns a synchronous callable into an awaitable that
    runs in a threadpool. It also sets a threadlocal inside the thread so
    calls to AsyncToSync can escape it.

    If thread_sensitive is passed, the code will run in the same thread as any
    outer code. This is needed for underlying Python code that is not
    threadsafe (for example, code which handles SQLite database connections).

    If the outermost program is async (i.e. SyncToAsync is outermost), then
    this will be a dedicated single sub-thread that all sync code runs in,
    one after the other. If the outermost program is sync (i.e. AsyncToSync is
    outermost), this will just be the main thread. This is achieved by idling
    with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
    rather than just blocking.

    If executor is passed in, that will be used instead of the loop's default executor.
    In order to pass in an executor, thread_sensitive must be set to False, otherwise
    a TypeError will be raised.
    """

    # Storage for main event loop references
    threadlocal = threading.local()

    # Single-thread executor for thread-sensitive code
    single_thread_executor = ThreadPoolExecutor(max_workers=1)

    # Maintain a contextvar for the current execution context. Optionally used
    # for thread sensitive mode.
    thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = (
        contextvars.ContextVar("thread_sensitive_context")
    )

    # Contextvar that is used to detect if the single thread executor
    # would be awaited on while already being used in the same context
    deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
        "deadlock_context"
    )

    # Maintaining a weak reference to the context ensures that thread pools are
    # erased once the context goes out of scope. This terminates the thread pool.
    context_to_thread_executor: "weakref.WeakKeyDictionary[ThreadSensitiveContext, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        func: Callable[_P, _R],
        thread_sensitive: bool = True,
        executor: Optional["ThreadPoolExecutor"] = None,
    ) -> None:
        """
        Wrap the synchronous callable ``func``.

        Raises TypeError if ``func`` is not callable or is a coroutine
        function, or if ``executor`` is given while ``thread_sensitive`` is
        true.
        """
        if (
            not callable(func)
            or iscoroutinefunction(func)
            or iscoroutinefunction(getattr(func, "__call__", func))
        ):
            raise TypeError("sync_to_async can only be applied to sync functions.")
        self.func = func
        functools.update_wrapper(self, func)
        self._thread_sensitive = thread_sensitive
        # Mark the instance so it is reported as a coroutine function.
        markcoroutinefunction(self)
        if thread_sensitive and executor is not None:
            raise TypeError("executor must not be set when thread_sensitive is True")
        self._executor = executor
        try:
            # Mirror the bound instance of a bound method, when there is one.
            self.__self__ = func.__self__  # type: ignore
        except AttributeError:
            pass

    async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped function in the selected executor and return its result.

        Raises RuntimeError when the shared single-thread executor would be
        awaited while the deadlock marker is already set in this context.
        """
        __traceback_hide__ = True  # noqa: F841
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            current_thread_executor = getattr(AsyncToSync.executors, "current", None)
            if current_thread_executor:
                # If we have a parent sync thread above somewhere, use that
                executor = current_thread_executor
            elif self.thread_sensitive_context.get(None):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        # The function runs inside a copy of the current context; the copy is
        # applied back to the current context once the call has finished.
        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run
        task_context: List[asyncio.Task[Any]] = []

        # Run the code in the right thread
        exec_coro = loop.run_in_executor(
            executor,
            functools.partial(
                self.thread_handler,
                loop,
                sys.exc_info(),
                task_context,
                func,
                child,
            ),
        )
        ret: _R
        try:
            ret = await asyncio.shield(exec_coro)
        except asyncio.CancelledError:
            cancel_parent = True
            try:
                # Cancel the first task registered in task_context (if any)
                # and wait for it to finish.
                task = task_context[0]
                task.cancel()
                try:
                    await task
                    cancel_parent = False
                except asyncio.CancelledError:
                    pass
            except IndexError:
                # No task was registered in task_context.
                pass
            if exec_coro.done():
                raise
            if cancel_parent:
                exec_coro.cancel()
            ret = await exec_coro
        finally:
            _restore_context(context)
            self.deadlock_context.set(False)

        return ret

    def __get__(
        self, parent: Any, objtype: Any
    ) -> Callable[_P, Coroutine[Any, Any, _R]]:
        """
        Include self for methods
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.func)

    def thread_handler(self, loop, exc_info, task_context, func, *args, **kwargs):
        """
        Wraps the sync application with exception handling.

        Stores ``loop``, the current process id and ``task_context`` on the
        threadlocal, then calls ``func(*args, **kwargs)`` and returns its
        result.
        """

        __traceback_hide__ = True  # noqa: F841

        # Set the threadlocal for AsyncToSync
        self.threadlocal.main_event_loop = loop
        self.threadlocal.main_event_loop_pid = os.getpid()
        self.threadlocal.task_context = task_context

        # Run the function
        # If we have an exception, run the function inside the except block
        # after raising it so exc_info is correctly populated.
        if exc_info[1]:
            try:
                raise exc_info[1]
            except BaseException:
                return func(*args, **kwargs)
        else:
            return func(*args, **kwargs)
@overload
def async_to_sync(
    *,
    force_new_loop: bool = False,
) -> Callable[
    [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
    Callable[_P, _R],
]:
    ...


@overload
def async_to_sync(
    awaitable: Union[
        Callable[_P, Coroutine[Any, Any, _R]],
        Callable[_P, Awaitable[_R]],
    ],
    *,
    force_new_loop: bool = False,
) -> Callable[_P, _R]:
    ...


def async_to_sync(
    awaitable: Optional[
        Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ]
    ] = None,
    *,
    force_new_loop: bool = False,
) -> Union[
    Callable[
        [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
        Callable[_P, _R],
    ],
    Callable[_P, _R],
]:
    """Wrap an async callable in AsyncToSync.

    Called with ``awaitable``, returns ``AsyncToSync(awaitable, ...)``.
    Called with keyword arguments only, returns a decorator that applies the
    same wrapping to the function it receives.
    """
    if awaitable is None:
        # Decorator-factory form: @async_to_sync(force_new_loop=...)
        return lambda f: AsyncToSync(
            f,
            force_new_loop=force_new_loop,
        )
    return AsyncToSync(
        awaitable,
        force_new_loop=force_new_loop,
    )
@overload
def sync_to_async(
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]]:
    ...


@overload
def sync_to_async(
    func: Callable[_P, _R],
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[_P, Coroutine[Any, Any, _R]]:
    ...


def sync_to_async(
    func: Optional[Callable[_P, _R]] = None,
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Union[
    Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]],
    Callable[_P, Coroutine[Any, Any, _R]],
]:
    """Wrap a sync callable in SyncToAsync.

    Called with ``func``, returns ``SyncToAsync(func, ...)``. Called with
    keyword arguments only, returns a decorator that applies the same
    wrapping to the function it receives.
    """
    if func is None:
        # Decorator-factory form: @sync_to_async(thread_sensitive=..., executor=...)
        return lambda f: SyncToAsync(
            f,
            thread_sensitive=thread_sensitive,
            executor=executor,
        )
    return SyncToAsync(
        func,
        thread_sensitive=thread_sensitive,
        executor=executor,
    )