1from __future__ import annotations
2
3import math
4import sys
5import threading
6from collections.abc import Awaitable, Callable, Generator
7from contextlib import contextmanager
8from importlib import import_module
9from typing import TYPE_CHECKING, Any, TypeVar
10
11import sniffio
12
13if sys.version_info >= (3, 11):
14 from typing import TypeVarTuple, Unpack
15else:
16 from typing_extensions import TypeVarTuple, Unpack
17
18if TYPE_CHECKING:
19 from ..abc import AsyncBackend
20
21# This must be updated when new backends are introduced
22BACKENDS = "asyncio", "trio"
23
24T_Retval = TypeVar("T_Retval")
25PosArgsT = TypeVarTuple("PosArgsT")
26
27threadlocals = threading.local()
28loaded_backends: dict[str, type[AsyncBackend]] = {}
29
30
def run(
    func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
    *args: Unpack[PosArgsT],
    backend: str = "asyncio",
    backend_options: dict[str, Any] | None = None,
) -> T_Retval:
    """
    Run a coroutine function in an asynchronous event loop and return its result.

    No event loop may already be running in the calling thread.

    :param func: the coroutine function to call
    :param args: positional arguments passed on to ``func``
    :param backend: name of the asynchronous event loop implementation – currently
        either ``asyncio`` or ``trio``
    :param backend_options: keyword arguments for the backend's ``run()``
        implementation (documented :ref:`here <backend options>`); a falsy value
        is replaced with an empty dict
    :return: the return value of the coroutine function
    :raises RuntimeError: if an asynchronous event loop is already running in this
        thread
    :raises LookupError: if the named backend is not found

    """
    # Refuse to nest: if sniffio can name a library, one is already running here
    try:
        asynclib_name = sniffio.current_async_library()
    except sniffio.AsyncLibraryNotFoundError:
        asynclib_name = None

    if asynclib_name is not None:
        raise RuntimeError(f"Already running {asynclib_name} in this thread")

    # A backend module that fails to import is reported as an unknown backend
    try:
        backend_class = get_async_backend(backend)
    except ImportError as exc:
        raise LookupError(f"No such backend: {backend}") from exc

    # Since we're in control of the event loop, we can cache the name of the async
    # library in sniffio's context variable (unless it has already been set)
    library_cvar = sniffio.current_async_library_cvar
    cvar_token = None
    if library_cvar.get(None) is None:
        cvar_token = library_cvar.set(backend)

    try:
        options = backend_options or {}
        return backend_class.run(func, args, {}, options)
    finally:
        # Only undo the context variable change if we were the ones to make it
        if cvar_token is not None:
            library_cvar.reset(cvar_token)
78
79
async def sleep(delay: float) -> None:
    """
    Suspend the current task for the given amount of time.

    The call is delegated to the ``sleep()`` implementation of the currently
    running backend.

    :param delay: how long to sleep, in seconds

    """
    backend = get_async_backend()
    return await backend.sleep(delay)
88
89
async def sleep_forever() -> None:
    """
    Suspend the current task until it gets cancelled.

    Equivalent to calling ``sleep(math.inf)``.

    .. versionadded:: 3.1

    """
    # An infinite delay never elapses
    never = math.inf
    await sleep(never)
100
101
async def sleep_until(deadline: float) -> None:
    """
    Suspend the current task until the given point in time.

    A deadline that has already passed results in a zero-second sleep.

    :param deadline: the absolute time to wake up at (according to the internal
        monotonic clock of the event loop)

    .. versionadded:: 3.1

    """
    # Convert the absolute deadline to a relative delay, clamped at zero
    remaining = deadline - current_time()
    await sleep(max(remaining, 0))
114
115
def current_time() -> float:
    """
    Read the internal clock of the event loop.

    The value is obtained from the ``current_time()`` implementation of the
    currently running backend.

    :return: the clock value (seconds)

    """
    backend = get_async_backend()
    return backend.current_time()
124
125
def get_all_backends() -> tuple[str, ...]:
    """
    Return the names of all built-in backends.

    :return: the module-level ``BACKENDS`` tuple

    """
    return BACKENDS
129
130
def get_cancelled_exc_class() -> type[BaseException]:
    """
    Return the cancellation exception class of the current async library.

    :return: the result of the running backend's ``cancelled_exception_class()``

    """
    backend = get_async_backend()
    return backend.cancelled_exception_class()
134
135
136#
137# Private API
138#
139
140
@contextmanager
def claim_worker_thread(
    backend_class: type[AsyncBackend], token: object
) -> Generator[Any, None, None]:
    """
    Bind a backend class and token to the current thread for a ``with`` block.

    Both values are stored as attributes of the module's thread-local storage on
    entry and removed again on exit, even if the block raises.

    :param backend_class: stored as ``threadlocals.current_async_backend``
    :param token: stored as ``threadlocals.current_token``

    """
    threadlocals.current_async_backend, threadlocals.current_token = (
        backend_class,
        token,
    )
    try:
        yield
    finally:
        # Remove the attributes in the same order they were set
        del threadlocals.current_async_backend, threadlocals.current_token
152
153
def get_async_backend(asynclib_name: str | None = None) -> type[AsyncBackend]:
    """
    Return the backend class for the given (or currently running) async library.

    :param asynclib_name: name of the async library; if omitted, the name is
        detected with ``sniffio.current_async_library()``
    :return: the ``backend_class`` attribute of the matching
        ``anyio._backends._<name>`` module, cached after the first import
    :raises ImportError: if the backend module cannot be imported

    """
    if asynclib_name is None:
        asynclib_name = sniffio.current_async_library()

    # We keep our own cache rather than consulting sys.modules for the already
    # imported back-end class, because the appropriate modules in sys.modules could
    # potentially be only partially initialized
    try:
        backend_class = loaded_backends[asynclib_name]
    except KeyError:
        # First request for this backend: import its module and remember the class
        backend_module = import_module(f"anyio._backends._{asynclib_name}")
        backend_class = loaded_backends[asynclib_name] = backend_module.backend_class

    return backend_class