1from __future__ import annotations
2
3import math
4import sys
5from abc import ABCMeta, abstractmethod
6from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
7from contextlib import AbstractContextManager
8from os import PathLike
9from signal import Signals
10from socket import AddressFamily, SocketKind, socket
11from typing import (
12 IO,
13 TYPE_CHECKING,
14 Any,
15 TypeVar,
16 Union,
17 overload,
18)
19
20if sys.version_info >= (3, 11):
21 from typing import TypeVarTuple, Unpack
22else:
23 from typing_extensions import TypeVarTuple, Unpack
24
25if sys.version_info >= (3, 10):
26 from typing import TypeAlias
27else:
28 from typing_extensions import TypeAlias
29
30if TYPE_CHECKING:
31 from _typeshed import HasFileno
32
33 from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore
34 from .._core._tasks import CancelScope
35 from .._core._testing import TaskInfo
36 from ..from_thread import BlockingPortal
37 from ._sockets import (
38 ConnectedUDPSocket,
39 ConnectedUNIXDatagramSocket,
40 IPSockAddrType,
41 SocketListener,
42 SocketStream,
43 UDPSocket,
44 UNIXDatagramSocket,
45 UNIXSocketStream,
46 )
47 from ._subprocesses import Process
48 from ._tasks import TaskGroup
49 from ._testing import TestRunner
50
51T_Retval = TypeVar("T_Retval")
52PosArgsT = TypeVarTuple("PosArgsT")
53StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
54
55
56class AsyncBackend(metaclass=ABCMeta):
57 @classmethod
58 @abstractmethod
59 def run(
60 cls,
61 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
62 args: tuple[Unpack[PosArgsT]],
63 kwargs: dict[str, Any],
64 options: dict[str, Any],
65 ) -> T_Retval:
66 """
67 Run the given coroutine function in an asynchronous event loop.
68
69 The current thread must not be already running an event loop.
70
71 :param func: a coroutine function
72 :param args: positional arguments to ``func``
73 :param kwargs: positional arguments to ``func``
74 :param options: keyword arguments to call the backend ``run()`` implementation
75 with
76 :return: the return value of the coroutine function
77 """
78
79 @classmethod
80 @abstractmethod
81 def current_token(cls) -> object:
82 """
83
84 :return:
85 """
86
87 @classmethod
88 @abstractmethod
89 def current_time(cls) -> float:
90 """
91 Return the current value of the event loop's internal clock.
92
93 :return: the clock value (seconds)
94 """
95
96 @classmethod
97 @abstractmethod
98 def cancelled_exception_class(cls) -> type[BaseException]:
99 """Return the exception class that is raised in a task if it's cancelled."""
100
101 @classmethod
102 @abstractmethod
103 async def checkpoint(cls) -> None:
104 """
105 Check if the task has been cancelled, and allow rescheduling of other tasks.
106
107 This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
108 :meth:`cancel_shielded_checkpoint`.
109 """
110
111 @classmethod
112 async def checkpoint_if_cancelled(cls) -> None:
113 """
114 Check if the current task group has been cancelled.
115
116 This will check if the task has been cancelled, but will not allow other tasks
117 to be scheduled if not.
118
119 """
120 if cls.current_effective_deadline() == -math.inf:
121 await cls.checkpoint()
122
123 @classmethod
124 async def cancel_shielded_checkpoint(cls) -> None:
125 """
126 Allow the rescheduling of other tasks.
127
128 This will give other tasks the opportunity to run, but without checking if the
129 current task group has been cancelled, unlike with :meth:`checkpoint`.
130
131 """
132 with cls.create_cancel_scope(shield=True):
133 await cls.sleep(0)
134
135 @classmethod
136 @abstractmethod
137 async def sleep(cls, delay: float) -> None:
138 """
139 Pause the current task for the specified duration.
140
141 :param delay: the duration, in seconds
142 """
143
144 @classmethod
145 @abstractmethod
146 def create_cancel_scope(
147 cls, *, deadline: float = math.inf, shield: bool = False
148 ) -> CancelScope:
149 pass
150
151 @classmethod
152 @abstractmethod
153 def current_effective_deadline(cls) -> float:
154 """
155 Return the nearest deadline among all the cancel scopes effective for the
156 current task.
157
158 :return:
159 - a clock value from the event loop's internal clock
160 - ``inf`` if there is no deadline in effect
161 - ``-inf`` if the current scope has been cancelled
162 :rtype: float
163 """
164
165 @classmethod
166 @abstractmethod
167 def create_task_group(cls) -> TaskGroup:
168 pass
169
170 @classmethod
171 @abstractmethod
172 def create_event(cls) -> Event:
173 pass
174
175 @classmethod
176 @abstractmethod
177 def create_lock(cls, *, fast_acquire: bool) -> Lock:
178 pass
179
180 @classmethod
181 @abstractmethod
182 def create_semaphore(
183 cls,
184 initial_value: int,
185 *,
186 max_value: int | None = None,
187 fast_acquire: bool = False,
188 ) -> Semaphore:
189 pass
190
191 @classmethod
192 @abstractmethod
193 def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
194 pass
195
196 @classmethod
197 @abstractmethod
198 async def run_sync_in_worker_thread(
199 cls,
200 func: Callable[[Unpack[PosArgsT]], T_Retval],
201 args: tuple[Unpack[PosArgsT]],
202 abandon_on_cancel: bool = False,
203 limiter: CapacityLimiter | None = None,
204 ) -> T_Retval:
205 pass
206
207 @classmethod
208 @abstractmethod
209 def check_cancelled(cls) -> None:
210 pass
211
212 @classmethod
213 @abstractmethod
214 def run_async_from_thread(
215 cls,
216 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
217 args: tuple[Unpack[PosArgsT]],
218 token: object,
219 ) -> T_Retval:
220 pass
221
222 @classmethod
223 @abstractmethod
224 def run_sync_from_thread(
225 cls,
226 func: Callable[[Unpack[PosArgsT]], T_Retval],
227 args: tuple[Unpack[PosArgsT]],
228 token: object,
229 ) -> T_Retval:
230 pass
231
232 @classmethod
233 @abstractmethod
234 def create_blocking_portal(cls) -> BlockingPortal:
235 pass
236
237 @classmethod
238 @abstractmethod
239 async def open_process(
240 cls,
241 command: StrOrBytesPath | Sequence[StrOrBytesPath],
242 *,
243 stdin: int | IO[Any] | None,
244 stdout: int | IO[Any] | None,
245 stderr: int | IO[Any] | None,
246 **kwargs: Any,
247 ) -> Process:
248 pass
249
250 @classmethod
251 @abstractmethod
252 def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
253 pass
254
255 @classmethod
256 @abstractmethod
257 async def connect_tcp(
258 cls, host: str, port: int, local_address: IPSockAddrType | None = None
259 ) -> SocketStream:
260 pass
261
262 @classmethod
263 @abstractmethod
264 async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
265 pass
266
267 @classmethod
268 @abstractmethod
269 def create_tcp_listener(cls, sock: socket) -> SocketListener:
270 pass
271
272 @classmethod
273 @abstractmethod
274 def create_unix_listener(cls, sock: socket) -> SocketListener:
275 pass
276
277 @classmethod
278 @abstractmethod
279 async def create_udp_socket(
280 cls,
281 family: AddressFamily,
282 local_address: IPSockAddrType | None,
283 remote_address: IPSockAddrType | None,
284 reuse_port: bool,
285 ) -> UDPSocket | ConnectedUDPSocket:
286 pass
287
288 @classmethod
289 @overload
290 async def create_unix_datagram_socket(
291 cls, raw_socket: socket, remote_path: None
292 ) -> UNIXDatagramSocket: ...
293
294 @classmethod
295 @overload
296 async def create_unix_datagram_socket(
297 cls, raw_socket: socket, remote_path: str | bytes
298 ) -> ConnectedUNIXDatagramSocket: ...
299
300 @classmethod
301 @abstractmethod
302 async def create_unix_datagram_socket(
303 cls, raw_socket: socket, remote_path: str | bytes | None
304 ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
305 pass
306
307 @classmethod
308 @abstractmethod
309 async def getaddrinfo(
310 cls,
311 host: bytes | str | None,
312 port: str | int | None,
313 *,
314 family: int | AddressFamily = 0,
315 type: int | SocketKind = 0,
316 proto: int = 0,
317 flags: int = 0,
318 ) -> Sequence[
319 tuple[
320 AddressFamily,
321 SocketKind,
322 int,
323 str,
324 tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
325 ]
326 ]:
327 pass
328
329 @classmethod
330 @abstractmethod
331 async def getnameinfo(
332 cls, sockaddr: IPSockAddrType, flags: int = 0
333 ) -> tuple[str, str]:
334 pass
335
336 @classmethod
337 @abstractmethod
338 async def wait_readable(cls, obj: HasFileno | int) -> None:
339 pass
340
341 @classmethod
342 @abstractmethod
343 async def wait_writable(cls, obj: HasFileno | int) -> None:
344 pass
345
346 @classmethod
347 @abstractmethod
348 def current_default_thread_limiter(cls) -> CapacityLimiter:
349 pass
350
351 @classmethod
352 @abstractmethod
353 def open_signal_receiver(
354 cls, *signals: Signals
355 ) -> AbstractContextManager[AsyncIterator[Signals]]:
356 pass
357
358 @classmethod
359 @abstractmethod
360 def get_current_task(cls) -> TaskInfo:
361 pass
362
363 @classmethod
364 @abstractmethod
365 def get_running_tasks(cls) -> Sequence[TaskInfo]:
366 pass
367
368 @classmethod
369 @abstractmethod
370 async def wait_all_tasks_blocked(cls) -> None:
371 pass
372
373 @classmethod
374 @abstractmethod
375 def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
376 pass