1from __future__ import annotations
2
3import math
4import sys
5import threading
6from collections.abc import Awaitable, Callable, Generator
7from contextlib import contextmanager
8from importlib import import_module
9from typing import TYPE_CHECKING, Any, TypeVar
10
11import sniffio
12
13if sys.version_info >= (3, 11):
14 from typing import TypeVarTuple, Unpack
15else:
16 from typing_extensions import TypeVarTuple, Unpack
17
18if TYPE_CHECKING:
19 from ..abc import AsyncBackend
20
21# This must be updated when new backends are introduced
22BACKENDS = "asyncio", "trio"
23
24T_Retval = TypeVar("T_Retval")
25PosArgsT = TypeVarTuple("PosArgsT")
26
27threadlocals = threading.local()
28
29
def run(
    func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
    *args: Unpack[PosArgsT],
    backend: str = "asyncio",
    backend_options: dict[str, Any] | None = None,
) -> T_Retval:
    """
    Execute a coroutine function to completion inside a fresh event loop.

    No event loop may already be running in the calling thread.

    :param func: the coroutine function to call
    :param args: positional arguments passed on to ``func``
    :param backend: name of the asynchronous event loop implementation to use –
        currently either ``asyncio`` or ``trio``
    :param backend_options: keyword arguments handed to the backend's ``run()``
        implementation (documented :ref:`here <backend options>`)
    :return: whatever the coroutine function returned
    :raises RuntimeError: if an asynchronous event loop is already running in this
        thread
    :raises LookupError: if the named backend is not found

    """
    # Refuse to start a nested loop: sniffio only answers without raising when
    # some async library is already active in this thread.
    try:
        running_library = sniffio.current_async_library()
    except sniffio.AsyncLibraryNotFoundError:
        pass
    else:
        raise RuntimeError(f"Already running {running_library} in this thread")

    # A backend module that cannot be imported is reported as an unknown backend.
    try:
        async_backend = get_async_backend(backend)
    except ImportError as exc:
        raise LookupError(f"No such backend: {backend}") from exc

    # Since we are in control of the event loop, the name of the async library can
    # be cached in sniffio's context variable, but only if nothing is set there yet.
    library_cvar = sniffio.current_async_library_cvar
    token = library_cvar.set(backend) if library_cvar.get(None) is None else None

    options = backend_options or {}
    try:
        return async_backend.run(func, args, {}, options)
    finally:
        # Undo the caching above, and only if this call was the one that set it.
        if token is not None:
            library_cvar.reset(token)
77
78
async def sleep(delay: float) -> None:
    """
    Suspend the current task for the given length of time.

    The call is delegated to the ``sleep()`` implementation of the backend
    returned by ``get_async_backend()``.

    :param delay: how long to sleep, in seconds

    """
    backend = get_async_backend()
    return await backend.sleep(delay)
87
88
async def sleep_forever() -> None:
    """
    Suspend the current task until it gets cancelled.

    Equivalent to calling ``sleep(math.inf)``.

    .. versionadded:: 3.1

    """
    forever = math.inf
    await sleep(forever)
99
100
async def sleep_until(deadline: float) -> None:
    """
    Suspend the current task until the given point in time.

    :param deadline: the absolute time to wake up at (according to the internal
        monotonic clock of the event loop)

    .. versionadded:: 3.1

    """
    remaining = deadline - current_time()
    # A deadline that has already passed must not become a negative delay.
    await sleep(max(remaining, 0))
113
114
def current_time() -> float:
    """
    Read the internal clock of the event loop.

    The value comes from the ``current_time()`` implementation of the backend
    returned by ``get_async_backend()``.

    :return: the clock value (seconds)

    """
    backend = get_async_backend()
    return backend.current_time()
123
124
def get_all_backends() -> tuple[str, ...]:
    """
    Return the names of all built-in backends.

    :return: the module-level ``BACKENDS`` tuple

    """
    return BACKENDS
128
129
def get_cancelled_exc_class() -> type[BaseException]:
    """
    Return the cancellation exception class of the current async library.

    :return: the result of the backend's ``cancelled_exception_class()``

    """
    backend = get_async_backend()
    return backend.cancelled_exception_class()
133
134
135#
136# Private API
137#
138
139
@contextmanager
def claim_worker_thread(
    backend_class: type[AsyncBackend], token: object
) -> Generator[Any, None, None]:
    """
    Record a backend class and token on the current thread for a ``with`` block.

    Both values are stored as attributes of the module's thread-local storage on
    entry and removed again on exit, including when the block raises.

    :param backend_class: stored as ``threadlocals.current_async_backend``
    :param token: stored as ``threadlocals.current_token``

    """
    threadlocals.current_async_backend = backend_class
    threadlocals.current_token = token
    try:
        yield
    finally:
        # Targets are deleted left to right, backend first and token second.
        del threadlocals.current_async_backend, threadlocals.current_token
151
152
def get_async_backend(asynclib_name: str | None = None) -> AsyncBackend:
    """
    Look up the ``backend_class`` object of a backend module.

    The module ``anyio._backends._<name>`` is taken from ``sys.modules`` if it is
    already there and imported otherwise.

    :param asynclib_name: name of the backend; when ``None``, the name reported by
        ``sniffio.current_async_library()`` is used
    :return: the ``backend_class`` attribute of the backend module
    :raises ImportError: if the backend module cannot be imported

    """
    library = (
        sniffio.current_async_library() if asynclib_name is None else asynclib_name
    )
    module_path = "anyio._backends._" + library

    try:
        backend_module = sys.modules[module_path]
    except KeyError:
        # Not imported yet, so import it now.
        backend_module = import_module(module_path)

    return backend_module.backend_class