1from __future__ import annotations
2
3import math
4import sys
5import threading
6from collections.abc import Awaitable, Callable, Generator
7from contextlib import contextmanager
8from contextvars import Token
9from importlib import import_module
10from typing import TYPE_CHECKING, Any, TypeVar
11
12if sys.version_info >= (3, 11):
13 from typing import TypeVarTuple, Unpack
14else:
15 from typing_extensions import TypeVarTuple, Unpack
16
17sniffio: Any
18try:
19 import sniffio
20except ModuleNotFoundError:
21 sniffio = None
22
23if TYPE_CHECKING:
24 from ..abc import AsyncBackend
25
# This must be updated when new backends are introduced
BACKENDS = "asyncio", "trio"

# Return type of the coroutine function passed to run()
T_Retval = TypeVar("T_Retval")
# Types of the positional arguments forwarded to that coroutine function
PosArgsT = TypeVarTuple("PosArgsT")

# Per-thread storage; claim_worker_thread() keeps the thread's event loop token in
# ``threadlocals.current_token`` while its context is active
threadlocals = threading.local()
# Cache of backend classes keyed by backend name, filled in by get_async_backend()
loaded_backends: dict[str, type[AsyncBackend]] = {}
34
35
def run(
    func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
    *args: Unpack[PosArgsT],
    backend: str = "asyncio",
    backend_options: dict[str, Any] | None = None,
) -> T_Retval:
    """
    Run a coroutine function to completion in a new asynchronous event loop.

    No event loop may already be running in the calling thread.

    :param func: a coroutine function
    :param args: positional arguments to ``func``
    :param backend: name of the asynchronous event loop implementation – currently
        either ``asyncio`` or ``trio``
    :param backend_options: keyword arguments to call the backend ``run()``
        implementation with (documented :ref:`here <backend options>`)
    :return: the return value of the coroutine function
    :raises RuntimeError: if an asynchronous event loop is already running in this
        thread
    :raises LookupError: if the named backend is not found

    """
    running_library = current_async_library()
    if running_library:
        raise RuntimeError(f"Already running {running_library} in this thread")

    try:
        backend_class = get_async_backend(backend)
    except ImportError as exc:
        raise LookupError(f"No such backend: {backend}") from exc

    # Since we're in control of the event loop, we can cache the name of the async
    # library for the duration of the call
    cvar_token = (
        set_current_async_library(backend) if running_library is None else None
    )
    try:
        return backend_class.run(func, args, {}, backend_options or {})
    finally:
        # Always undo the cached library name, even if the coroutine raised
        reset_current_async_library(cvar_token)
78
79
async def sleep(delay: float) -> None:
    """
    Suspend the current task for the given amount of time.

    The call is delegated to the ``sleep()`` implementation of the currently
    running async backend.

    :param delay: the duration, in seconds

    """
    backend = get_async_backend()
    return await backend.sleep(delay)
88
89
async def sleep_forever() -> None:
    """
    Suspend the current task until it's cancelled.

    Equivalent to calling ``sleep(math.inf)``.

    .. versionadded:: 3.1

    """
    infinite_delay = math.inf
    await sleep(infinite_delay)
100
101
async def sleep_until(deadline: float) -> None:
    """
    Suspend the current task until the given point in time.

    :param deadline: the absolute time to wake up at (according to the internal
        monotonic clock of the event loop)

    .. versionadded:: 3.1

    """
    remaining = deadline - current_time()
    # A deadline that has already passed results in a zero-length sleep
    await sleep(max(remaining, 0))
114
115
def current_time() -> float:
    """
    Read the internal clock of the currently running event loop.

    :return: the clock value (seconds)

    """
    backend = get_async_backend()
    return backend.current_time()
124
125
def get_all_backends() -> tuple[str, ...]:
    """
    Return the names of all built-in backends.

    :return: the module-level ``BACKENDS`` tuple

    """
    backend_names: tuple[str, ...] = BACKENDS
    return backend_names
129
130
def get_available_backends() -> tuple[str, ...]:
    """
    Check which of the built-in backends can actually be imported.

    :return: a tuple of the built-in backend names that were successfully imported

    .. versionadded:: 4.12

    """
    importable: list[str] = []
    for name in get_all_backends():
        try:
            get_async_backend(name)
        except ImportError:
            # The backend (or one of its dependencies) is not installed
            pass
        else:
            importable.append(name)

    return tuple(importable)
150
151
def get_cancelled_exc_class() -> type[BaseException]:
    """
    Look up the cancellation exception class of the current async library.

    :return: the exception class reported by the current backend

    """
    backend = get_async_backend()
    return backend.cancelled_exception_class()
155
156
157#
158# Private API
159#
160
161
@contextmanager
def claim_worker_thread(
    backend_class: type[AsyncBackend], token: object
) -> Generator[Any, None, None]:
    """
    Associate the current thread with an event loop for the duration of the context.

    While the context is active, ``threadlocals.current_token`` holds an
    ``EventLoopToken`` built from ``backend_class`` and ``token``. The attribute is
    removed again on exit, even if the body raised.

    :param backend_class: the backend class to wrap in the event loop token
    :param token: the backend specific token to wrap in the event loop token

    """
    # NOTE(review): local import, presumably to avoid a circular import – confirm
    from ..lowlevel import EventLoopToken

    event_loop_token = EventLoopToken(backend_class, token)
    threadlocals.current_token = event_loop_token
    try:
        yield
    finally:
        del threadlocals.current_token
173
174
class NoCurrentAsyncBackend(Exception):
    """
    Raised when an async backend is required but no event loop is running.

    The message lists the names of the built-in backends as returned by
    ``get_all_backends()``.
    """

    def __init__(self) -> None:
        backend_names = ", ".join(get_all_backends())
        # Adjacent string literals are joined without any separator, so the first
        # sentence must end with its own period and space
        super().__init__(
            "Not currently running on any asynchronous event loop. "
            f"Available async backends: {backend_names}"
        )
181
182
def get_async_backend(asynclib_name: str | None = None) -> type[AsyncBackend]:
    """
    Return the backend class for the named (or currently running) async library.

    :param asynclib_name: name of the backend to look up; if ``None``, the name of
        the currently running async library is used
    :return: the ``backend_class`` attribute of the matching backend module
    :raises NoCurrentAsyncBackend: if no name was given and no async library is
        currently running

    """
    if asynclib_name is None:
        asynclib_name = current_async_library()
        if not asynclib_name:
            raise NoCurrentAsyncBackend

    # We use our own dict instead of sys.modules to get the already imported back-end
    # class because the appropriate modules in sys.modules could potentially be only
    # partially initialized
    try:
        return loaded_backends[asynclib_name]
    except KeyError:
        # First request for this backend: import its module and cache the class
        backend_module = import_module(f"anyio._backends._{asynclib_name}")
        backend_class = backend_module.backend_class
        loaded_backends[asynclib_name] = backend_class
        return backend_class
198
199
def current_async_library() -> str | None:
    """
    Detect which async library is running in the current context.

    :return: the name of the running async library, or ``None`` if none was
        detected

    """
    if sniffio is not None:
        try:
            return sniffio.current_async_library()
        except sniffio.AsyncLibraryNotFoundError:
            return None

    # If sniffio is not installed, we assume we're either running asyncio or nothing
    import asyncio

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return None

    return "asyncio"
217
218
def set_current_async_library(asynclib_name: str | None) -> Token | None:
    """
    Store the async library name in sniffio's context variable.

    :param asynclib_name: the value to set ``current_async_library_cvar`` to
    :return: the token returned by the context variable's ``set()``, or ``None``
        if sniffio is not installed

    """
    if sniffio is not None:
        return sniffio.current_async_library_cvar.set(asynclib_name)

    # no-op if sniffio is not installed
    return None
225
226
def reset_current_async_library(token: Token | None) -> None:
    """
    Undo a previous ``set_current_async_library()`` call.

    :param token: the value returned by ``set_current_async_library()``; ``None``
        makes this call a no-op

    """
    if token is None:
        return

    sniffio.current_async_library_cvar.reset(token)