1from __future__ import annotations
2
3import math
4import sys
5import threading
6from collections.abc import Awaitable, Callable, Generator
7from contextlib import contextmanager
8from contextvars import Token
9from importlib import import_module
10from typing import TYPE_CHECKING, Any, TypeVar
11
12from ._exceptions import NoEventLoopError
13
14if sys.version_info >= (3, 11):
15 from typing import TypeVarTuple, Unpack
16else:
17 from typing_extensions import TypeVarTuple, Unpack
18
19sniffio: Any
20try:
21 import sniffio
22except ModuleNotFoundError:
23 sniffio = None
24
25if TYPE_CHECKING:
26 from ..abc import AsyncBackend
27
# Names of the built-in backends, returned as-is by get_all_backends().
# This must be updated when new backends are introduced
BACKENDS = "asyncio", "trio"

# Type variables used in the signature of run(): the return type of the coroutine
# function and the types of its positional arguments
T_Retval = TypeVar("T_Retval")
PosArgsT = TypeVarTuple("PosArgsT")

# Per-thread storage; claim_worker_thread() sets and deletes ``current_token`` on it
threadlocals = threading.local()
# Cache of backend classes keyed by backend name, filled in by get_async_backend()
loaded_backends: dict[str, type[AsyncBackend]] = {}
36
37
def run(
    func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
    *args: Unpack[PosArgsT],
    backend: str = "asyncio",
    backend_options: dict[str, Any] | None = None,
) -> T_Retval:
    """
    Execute a coroutine function to completion using the chosen backend.

    No event loop may already be running in the calling thread.

    :param func: the coroutine function to call
    :param args: positional arguments passed on to ``func``
    :param backend: name of the asynchronous event loop implementation to use
        (currently either ``asyncio`` or ``trio``)
    :param backend_options: keyword arguments handed to the backend's ``run()``
        implementation (documented :ref:`here <backend options>`); ``None`` is
        treated as an empty dict
    :return: whatever the coroutine function returned
    :raises RuntimeError: if an asynchronous event loop is already running in this
        thread
    :raises LookupError: if the named backend could not be imported

    """
    running_library = current_async_library()
    if running_library:
        raise RuntimeError(f"Already running {running_library} in this thread")

    try:
        backend_class = get_async_backend(backend)
    except ImportError as exc:
        raise LookupError(f"No such backend: {backend}") from exc

    # We are the ones starting the event loop, so the name of the async library can
    # be cached for the duration of the call (only when detection reported None)
    library_token = (
        set_current_async_library(backend) if running_library is None else None
    )
    try:
        return backend_class.run(func, args, {}, backend_options or {})
    finally:
        # Always undo the cached library name, even if the backend raised
        reset_current_async_library(library_token)
80
81
async def sleep(delay: float) -> None:
    """
    Suspend the calling task for the given number of seconds.

    The call is delegated to the ``sleep()`` implementation of the backend returned
    by ``get_async_backend()``.

    :param delay: how long to sleep, in seconds

    """
    backend = get_async_backend()
    return await backend.sleep(delay)
90
91
async def sleep_forever() -> None:
    """
    Suspend the calling task indefinitely, i.e. until it gets cancelled.

    Equivalent to calling ``sleep(math.inf)``.

    .. versionadded:: 3.1

    """
    unbounded_delay = math.inf
    await sleep(unbounded_delay)
102
103
async def sleep_until(deadline: float) -> None:
    """
    Suspend the calling task until the given point in time.

    :param deadline: the absolute time to wake up at (according to the internal
        monotonic clock of the event loop)

    .. versionadded:: 3.1

    """
    remaining = deadline - current_time()
    # A deadline that has already passed results in a zero-length sleep
    await sleep(max(remaining, 0))
116
117
def current_time() -> float:
    """
    Read the internal clock of the currently running event loop.

    :return: the clock value (seconds)
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()
    return backend.current_time()
128
129
def get_all_backends() -> tuple[str, ...]:
    """
    Return the names of all built-in backends.

    :return: the module-level ``BACKENDS`` tuple

    """
    return BACKENDS
133
134
def get_available_backends() -> tuple[str, ...]:
    """
    Check which of the built-in backends can actually be imported.

    :return: a tuple of the built-in backend names that were successfully imported

    .. versionadded:: 4.12

    """
    importable: list[str] = []
    for name in get_all_backends():
        try:
            get_async_backend(name)
        except ImportError:
            # Importing this backend failed, so it is left out of the result
            pass
        else:
            importable.append(name)

    return tuple(importable)
154
155
def get_cancelled_exc_class() -> type[BaseException]:
    """
    Look up the exception class the current async library uses for cancellation.

    :return: the class returned by the current backend's
        ``cancelled_exception_class()``
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()
    return backend.cancelled_exception_class()
165
166
167#
168# Private API
169#
170
171
@contextmanager
def claim_worker_thread(
    backend_class: type[AsyncBackend], token: object
) -> Generator[Any, None, None]:
    """
    Attach an event loop token to the current thread for the duration of the block.

    An ``EventLoopToken`` built from the two arguments is stored as
    ``threadlocals.current_token`` on entry and deleted again on exit, whether or not
    the managed block raised.

    :param backend_class: the backend class to wrap in the token
    :param token: the backend specific token object to wrap

    """
    # Imported here rather than at module level; presumably to avoid an import cycle
    from ..lowlevel import EventLoopToken

    loop_token = EventLoopToken(backend_class, token)
    threadlocals.current_token = loop_token
    try:
        yield
    finally:
        del threadlocals.current_token
183
184
def get_async_backend(asynclib_name: str | None = None) -> type[AsyncBackend]:
    """
    Resolve a backend name to its backend class, importing the backend on first use.

    :param asynclib_name: name of the backend to load; if ``None``, the name of the
        async library currently running in this thread is used
    :return: the ``backend_class`` attribute of the matching
        ``anyio._backends._<name>`` module
    :raises NoEventLoopError: if no name was given and no async library was detected
    :raises ImportError: if the backend module cannot be imported

    """
    if asynclib_name is None:
        asynclib_name = current_async_library()
        if not asynclib_name:
            backend_names = ", ".join(get_all_backends())
            raise NoEventLoopError(
                "Not currently running on any asynchronous event loop. "
                f"Available async backends: {backend_names}"
            )

    # The already imported back-end classes are kept in our own dict rather than
    # looked up through sys.modules, because the corresponding modules in sys.modules
    # could potentially be only partially initialized
    try:
        backend_class = loaded_backends[asynclib_name]
    except KeyError:
        backend_module = import_module(f"anyio._backends._{asynclib_name}")
        backend_class = loaded_backends[asynclib_name] = backend_module.backend_class

    return backend_class
203
204
def current_async_library() -> str | None:
    """
    Detect which async library, if any, is running in the current thread.

    :return: the name reported by sniffio when it is installed; otherwise
        ``"asyncio"`` if an asyncio event loop is running. ``None`` when nothing was
        detected.

    """
    if sniffio is not None:
        try:
            return sniffio.current_async_library()
        except sniffio.AsyncLibraryNotFoundError:
            return None

    # If sniffio is not installed, we assume we're either running asyncio or nothing
    import asyncio

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return None

    return "asyncio"
222
223
def set_current_async_library(asynclib_name: str | None) -> Token | None:
    """
    Store the given async library name in sniffio's context variable.

    :param asynclib_name: the value to set on ``sniffio.current_async_library_cvar``
    :return: the token returned by the context variable's ``set()``, or ``None`` if
        sniffio is not installed

    """
    if sniffio is not None:
        return sniffio.current_async_library_cvar.set(asynclib_name)

    # no-op if sniffio is not installed
    return None
230
231
def reset_current_async_library(token: Token | None) -> None:
    """
    Reset sniffio's context variable using a previously obtained token.

    :param token: a token to pass to ``sniffio.current_async_library_cvar.reset()``;
        if ``None``, nothing is done

    """
    if token is None:
        # Nothing was set, so there is nothing to restore
        return

    sniffio.current_async_library_cvar.reset(token)