Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/asgiref/sync.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import asyncio.coroutines
3import contextvars
4import functools
5import inspect
6import os
7import sys
8import threading
9import warnings
10import weakref
11from concurrent.futures import Future, ThreadPoolExecutor
12from typing import (
13 TYPE_CHECKING,
14 Any,
15 Awaitable,
16 Callable,
17 Coroutine,
18 Dict,
19 Generic,
20 List,
21 Optional,
22 TypeVar,
23 Union,
24 overload,
25)
27from .current_thread_executor import CurrentThreadExecutor
28from .local import Local
30if sys.version_info >= (3, 10):
31 from typing import ParamSpec
32else:
33 from typing_extensions import ParamSpec
35if TYPE_CHECKING:
36 # This is not available to import at runtime
37 from _typeshed import OptExcInfo
# Type variable for "any callable"; used to annotate helpers that hand back
# the very callable they were given (see markcoroutinefunction below).
_F = TypeVar("_F", bound=Callable[..., Any])
# Parameter specification of the callable being wrapped.
_P = ParamSpec("_P")
# Result type of the callable being wrapped.
_R = TypeVar("_R")
44def _restore_context(context: contextvars.Context) -> None:
45 # Check for changes in contextvars, and set them to the current
46 # context for downstream consumers
47 for cvar in context:
48 cvalue = context.get(cvar)
49 try:
50 if cvar.get() != cvalue:
51 cvar.set(cvalue)
52 except LookupError:
53 cvar.set(cvalue)
# From Python 3.12 on, asyncio.iscoroutinefunction() is a deprecated alias of
# inspect.iscoroutinefunction(), and the _is_coroutine marker is gone in
# favour of the inspect.markcoroutinefunction decorator. The names below paper
# over that difference; the shim can go once 3.12 is the minimum supported
# Python version.

if hasattr(inspect, "markcoroutinefunction"):
    iscoroutinefunction = inspect.iscoroutinefunction
    markcoroutinefunction: Callable[[_F], _F] = inspect.markcoroutinefunction
else:
    iscoroutinefunction = asyncio.iscoroutinefunction  # type: ignore[assignment]

    def markcoroutinefunction(func: _F) -> _F:
        """Flag ``func`` as a coroutine function via the legacy asyncio marker."""
        marker = asyncio.coroutines._is_coroutine  # type: ignore
        setattr(func, "_is_coroutine", marker)
        return func
class ThreadSensitiveContext:
    """Async context manager to manage context for thread sensitive mode

    This context manager controls which thread pool executor is used when in
    thread sensitive mode. By default, a single thread pool executor is shared
    within a process.

    The ThreadSensitiveContext() context manager may be used to specify a
    thread pool per context.

    This context manager is re-entrant, so only the outer-most call to
    ThreadSensitiveContext will set the context.

    Usage:

    >>> import time
    >>> async with ThreadSensitiveContext():
    ...     await sync_to_async(time.sleep, 1)()
    """

    def __init__(self):
        # Token returned by ContextVar.set(); it is only held while this
        # instance is the one that installed itself as the active context.
        self.token = None

    async def __aenter__(self):
        """Install this instance as the active context unless one is already set."""
        try:
            SyncToAsync.thread_sensitive_context.get()
        except LookupError:
            self.token = SyncToAsync.thread_sensitive_context.set(self)

        return self

    async def __aexit__(self, exc, value, tb):
        """Shut down this context's executor and restore the previous context.

        Does nothing when this instance did not set the context itself (that
        is, when it was entered inside another ThreadSensitiveContext).
        """
        if self.token is None:
            return

        # A contextvars token can be used for a single reset only. Forget it
        # straight away so that re-using this instance later (for example
        # nested inside another active context) cannot replay a spent token.
        token, self.token = self.token, None
        try:
            executor = SyncToAsync.context_to_thread_executor.pop(self, None)
            if executor:
                executor.shutdown()
        finally:
            # Always restore the context variable, even if shutdown failed.
            SyncToAsync.thread_sensitive_context.reset(token)
class AsyncToSync(Generic[_P, _R]):
    """
    Utility class which turns an awaitable that only works on the thread with
    the event loop into a synchronous callable that works in a subthread.

    If the call stack contains an async loop, the code runs there.
    Otherwise, the code runs in a new loop in a new thread.

    Either way, this thread then pauses and waits to run any thread_sensitive
    code called from further down the call stack using SyncToAsync, before
    finally exiting once the async task returns.
    """

    # Keeps a reference to the CurrentThreadExecutor in local context, so that
    # any sync_to_async inside the wrapped code can find it.
    executors: "Local" = Local()

    # When we can't find a CurrentThreadExecutor from the context, such as
    # inside create_task, we'll look it up here from the running event loop.
    loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}

    def __init__(
        self,
        awaitable: Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ],
        force_new_loop: bool = False,
    ):
        """
        Wrap ``awaitable`` so that it can be called synchronously.

        A warning (not an error) is emitted when ``awaitable`` does not look
        like a coroutine function. If an event loop is running in the
        constructing thread it is remembered as the main event loop.
        ``force_new_loop`` makes ``__call__`` skip the threadlocal lookup for a
        parent loop.
        """
        if not callable(awaitable) or (
            not iscoroutinefunction(awaitable)
            and not iscoroutinefunction(getattr(awaitable, "__call__", awaitable))
        ):
            # Python does not have very reliable detection of async functions
            # (lots of false negatives) so this is just a warning.
            warnings.warn(
                "async_to_sync was passed a non-async-marked callable", stacklevel=2
            )
        self.awaitable = awaitable
        try:
            # Mirror the bound instance of a wrapped bound method, if any.
            self.__self__ = self.awaitable.__self__  # type: ignore[union-attr]
        except AttributeError:
            pass
        self.force_new_loop = force_new_loop
        self.main_event_loop = None
        try:
            self.main_event_loop = asyncio.get_running_loop()
        except RuntimeError:
            # There's no event loop in this thread.
            pass

    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped awaitable to completion and return its result.

        The awaitable is scheduled on the main event loop when one is known
        and accepts the callback; otherwise it runs under asyncio.run() in a
        fresh single-worker thread. Meanwhile this thread idles in a
        CurrentThreadExecutor. Raises RuntimeError when called from a thread
        that has a running event loop; exceptions raised by the awaitable are
        re-raised here.
        """
        __traceback_hide__ = True  # noqa: F841

        if not self.force_new_loop and not self.main_event_loop:
            # There's no event loop in this thread. Look for the threadlocal if
            # we're inside SyncToAsync
            main_event_loop_pid = getattr(
                SyncToAsync.threadlocal, "main_event_loop_pid", None
            )
            # We make sure the parent loop is from the same process - if
            # they've forked, this is not going to be valid any more (#194)
            if main_event_loop_pid and main_event_loop_pid == os.getpid():
                self.main_event_loop = getattr(
                    SyncToAsync.threadlocal, "main_event_loop", None
                )

        # You can't call AsyncToSync from a thread with a running event loop
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            raise RuntimeError(
                "You cannot use AsyncToSync in the same thread as an async event loop - "
                "just await the async function directly."
            )

        # Make a future for the return information
        call_result: "Future[_R]" = Future()

        # Make a CurrentThreadExecutor we'll use to idle in this thread - we
        # need one for every sync frame, even if there's one above us in the
        # same thread.
        old_executor = getattr(self.executors, "current", None)
        current_executor = CurrentThreadExecutor(old_executor)
        self.executors.current = current_executor

        # Wrapping context in list so it can be reassigned from within
        # `main_wrap`.
        context = [contextvars.copy_context()]

        # Get task context so that parent task knows which task to propagate
        # an asyncio.CancelledError to.
        task_context = getattr(SyncToAsync.threadlocal, "task_context", None)

        # Use call_soon_threadsafe to schedule a synchronous callback on the
        # main event loop's thread if it's there, otherwise make a new loop
        # in this thread.
        try:
            awaitable = self.main_wrap(
                call_result,
                sys.exc_info(),
                task_context,
                context,
                # prepare an awaitable which can be passed as is to self.main_wrap,
                # so that `args` and `kwargs` don't need to be
                # destructured when passed to self.main_wrap
                # (which is required by `ParamSpec`)
                # as that may cause overlapping arguments
                self.awaitable(*args, **kwargs),
            )

            async def new_loop_wrap() -> None:
                # Register the executor against the new loop so SyncToAsync can
                # find it by loop for the lifetime of the awaitable.
                loop = asyncio.get_running_loop()
                self.loop_thread_executors[loop] = current_executor
                try:
                    await awaitable
                finally:
                    del self.loop_thread_executors[loop]

            if self.main_event_loop is not None:
                try:
                    self.main_event_loop.call_soon_threadsafe(
                        self.main_event_loop.create_task, awaitable
                    )
                except RuntimeError:
                    # The loop refused the callback; fall back to a new loop.
                    running_in_main_event_loop = False
                else:
                    running_in_main_event_loop = True
                    # Run the CurrentThreadExecutor until the future is done.
                    current_executor.run_until_future(call_result)
            else:
                running_in_main_event_loop = False

            if not running_in_main_event_loop:
                # Make our own event loop - in a new thread - and run inside that.
                loop_executor = ThreadPoolExecutor(max_workers=1)
                loop_future = loop_executor.submit(asyncio.run, new_loop_wrap())
                # Run the CurrentThreadExecutor until the future is done.
                current_executor.run_until_future(loop_future)
                # Wait for future and/or allow for exception propagation
                loop_future.result()
        finally:
            _restore_context(context[0])
            # Restore old current thread executor state
            self.executors.current = old_executor

        # Wait for results from the future.
        return call_result.result()

    def __get__(self, parent: Any, objtype: Any) -> Callable[_P, _R]:
        """
        Include self for methods
        """
        # NOTE(review): when this descriptor is looked up on the class itself,
        # `parent` is None and is still bound as the first argument - confirm
        # whether that is intended.
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.awaitable)

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.

        ``context`` is a one-element list: its context is applied before the
        awaitable runs and is replaced by a copy of the resulting context
        afterwards. The current task is recorded in ``task_context`` (when
        given) for as long as the awaitable runs.
        """

        __traceback_hide__ = True  # noqa: F841

        if context is not None:
            _restore_context(context[0])

        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)

        error: Optional[BaseException] = None
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
                    raise exc_info[1]
                except BaseException:
                    result = await awaitable
            else:
                result = await awaitable
        except BaseException as e:
            error = e

        try:
            if current_task is not None and task_context is not None:
                task_context.remove(current_task)
            # Publish the resulting context *before* completing the future:
            # the calling thread wakes up as soon as call_result is done and
            # immediately reads context[0], so it must already be up to date.
            context[0] = contextvars.copy_context()
        finally:
            # Always complete the future, otherwise the caller waits forever.
            if error is None:
                call_result.set_result(result)
            else:
                call_result.set_exception(error)
            # Drop the reference so the traceback does not keep this frame
            # alive through a reference cycle.
            error = None
class SyncToAsync(Generic[_P, _R]):
    """
    Utility class which turns a synchronous callable into an awaitable that
    runs in a threadpool. It also sets a threadlocal inside the thread so
    calls to AsyncToSync can escape it.

    If thread_sensitive is passed, the code will run in the same thread as any
    outer code. This is needed for underlying Python code that is not
    threadsafe (for example, code which handles SQLite database connections).

    If the outermost program is async (i.e. SyncToAsync is outermost), then
    this will be a dedicated single sub-thread that all sync code runs in,
    one after the other. If the outermost program is sync (i.e. AsyncToSync is
    outermost), this will just be the main thread. This is achieved by idling
    with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
    rather than just blocking.

    If executor is passed in, that will be used instead of the loop's default executor.
    In order to pass in an executor, thread_sensitive must be set to False, otherwise
    a TypeError will be raised.
    """

    # Storage for main event loop references
    threadlocal = threading.local()

    # Single-thread executor for thread-sensitive code
    single_thread_executor = ThreadPoolExecutor(max_workers=1)

    # Maintain a contextvar for the current execution context. Optionally used
    # for thread sensitive mode.
    thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = (
        contextvars.ContextVar("thread_sensitive_context")
    )

    # Contextvar that is used to detect if the single thread executor
    # would be awaited on while already being used in the same context
    deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
        "deadlock_context"
    )

    # Maintaining a weak reference to the context ensures that thread pools are
    # erased once the context goes out of scope. This terminates the thread pool.
    context_to_thread_executor: "weakref.WeakKeyDictionary[ThreadSensitiveContext, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        func: Callable[_P, _R],
        thread_sensitive: bool = True,
        executor: Optional["ThreadPoolExecutor"] = None,
    ) -> None:
        """
        Wrap the synchronous callable ``func``.

        Raises TypeError when ``func`` is not callable or is a coroutine
        function, and when ``executor`` is given together with
        ``thread_sensitive=True``.
        """
        if (
            not callable(func)
            or iscoroutinefunction(func)
            or iscoroutinefunction(getattr(func, "__call__", func))
        ):
            raise TypeError("sync_to_async can only be applied to sync functions.")
        self.func = func
        # Copy the wrapped callable's metadata onto this instance.
        functools.update_wrapper(self, func)
        self._thread_sensitive = thread_sensitive
        # Make this instance report as a coroutine function.
        markcoroutinefunction(self)
        if thread_sensitive and executor is not None:
            raise TypeError("executor must not be set when thread_sensitive is True")
        self._executor = executor
        try:
            # Mirror the bound instance of a wrapped bound method, if any.
            self.__self__ = func.__self__  # type: ignore
        except AttributeError:
            pass

    async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped function in an executor thread and return its result.

        The function runs inside a copy of the caller's context; variables it
        changes are copied back into the caller's context afterwards. Raises
        RuntimeError when the shared single-thread executor would be awaited
        while already in use from the same context.
        """
        __traceback_hide__ = True  # noqa: F841
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            current_thread_executor = getattr(AsyncToSync.executors, "current", None)
            if current_thread_executor:
                # If we have a parent sync thread above somewhere, use that
                executor = current_thread_executor
            elif self.thread_sensitive_context.get(None):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                # Flag the executor as busy for anything started from this context.
                self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        # The function runs as context.run(child), i.e. inside this copy.
        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run
        # Filled in by AsyncToSync.main_wrap with the task(s) started from the
        # worker thread, so that cancellation can be forwarded to them.
        task_context: List[asyncio.Task[Any]] = []

        # Run the code in the right thread
        exec_coro = loop.run_in_executor(
            executor,
            functools.partial(
                self.thread_handler,
                loop,
                sys.exc_info(),
                task_context,
                func,
                child,
            ),
        )
        ret: _R
        try:
            # Shielded so that cancelling this call does not cancel exec_coro
            # directly; the handler below decides what to do.
            ret = await asyncio.shield(exec_coro)
        except asyncio.CancelledError:
            cancel_parent = True
            try:
                # Forward the cancellation to the first recorded child task.
                task = task_context[0]
                task.cancel()
                try:
                    await task
                    # The child finished without raising CancelledError.
                    cancel_parent = False
                except asyncio.CancelledError:
                    pass
            except IndexError:
                # No child task was recorded.
                pass
            if exec_coro.done():
                raise
            if cancel_parent:
                exec_coro.cancel()
            ret = await exec_coro
        finally:
            _restore_context(context)
            self.deadlock_context.set(False)

        return ret

    def __get__(
        self, parent: Any, objtype: Any
    ) -> Callable[_P, Coroutine[Any, Any, _R]]:
        """
        Include self for methods
        """
        # NOTE(review): when this descriptor is looked up on the class itself,
        # `parent` is None and is still bound as the first argument - confirm
        # whether that is intended.
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.func)

    def thread_handler(self, loop, exc_info, task_context, func, *args, **kwargs):
        """
        Wraps the sync application with exception handling.

        Records the calling loop, the current process id and ``task_context``
        on the threadlocal, then returns ``func(*args, **kwargs)``.
        """

        __traceback_hide__ = True  # noqa: F841

        # Set the threadlocal for AsyncToSync
        self.threadlocal.main_event_loop = loop
        self.threadlocal.main_event_loop_pid = os.getpid()
        self.threadlocal.task_context = task_context

        # Run the function
        # If we have an exception, run the function inside the except block
        # after raising it so exc_info is correctly populated.
        if exc_info[1]:
            try:
                raise exc_info[1]
            except BaseException:
                return func(*args, **kwargs)
        else:
            return func(*args, **kwargs)
@overload
def async_to_sync(
    *,
    force_new_loop: bool = False,
) -> Callable[
    [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
    Callable[_P, _R],
]:
    ...


@overload
def async_to_sync(
    awaitable: Union[
        Callable[_P, Coroutine[Any, Any, _R]],
        Callable[_P, Awaitable[_R]],
    ],
    *,
    force_new_loop: bool = False,
) -> Callable[_P, _R]:
    ...


def async_to_sync(
    awaitable: Optional[
        Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ]
    ] = None,
    *,
    force_new_loop: bool = False,
) -> Union[
    Callable[
        [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
        Callable[_P, _R],
    ],
    Callable[_P, _R],
]:
    """
    Wrap an async callable in AsyncToSync.

    Called with a callable, returns the wrapper directly. Called with keyword
    options only, returns a decorator that applies those options.
    """
    if awaitable is not None:
        return AsyncToSync(awaitable, force_new_loop=force_new_loop)

    def decorator(
        f: Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]],
    ) -> Callable[_P, _R]:
        return AsyncToSync(f, force_new_loop=force_new_loop)

    return decorator
@overload
def sync_to_async(
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]]:
    ...


@overload
def sync_to_async(
    func: Callable[_P, _R],
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[_P, Coroutine[Any, Any, _R]]:
    ...


def sync_to_async(
    func: Optional[Callable[_P, _R]] = None,
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Union[
    Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]],
    Callable[_P, Coroutine[Any, Any, _R]],
]:
    """
    Wrap a sync callable in SyncToAsync.

    Called with a callable, returns the wrapper directly. Called with keyword
    options only, returns a decorator that applies those options.
    """
    if func is not None:
        return SyncToAsync(
            func, thread_sensitive=thread_sensitive, executor=executor
        )

    def decorator(f: Callable[_P, _R]) -> Callable[_P, Coroutine[Any, Any, _R]]:
        return SyncToAsync(f, thread_sensitive=thread_sensitive, executor=executor)

    return decorator