1from __future__ import annotations
2
3import math
4import sys
5from abc import ABCMeta, abstractmethod
6from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
7from contextlib import AbstractContextManager
8from os import PathLike
9from signal import Signals
10from socket import AddressFamily, SocketKind, socket
11from typing import (
12 IO,
13 TYPE_CHECKING,
14 Any,
15 TypeVar,
16 Union,
17 overload,
18)
19
20if sys.version_info >= (3, 11):
21 from typing import TypeVarTuple, Unpack
22else:
23 from typing_extensions import TypeVarTuple, Unpack
24
25if sys.version_info >= (3, 10):
26 from typing import TypeAlias
27else:
28 from typing_extensions import TypeAlias
29
30if TYPE_CHECKING:
31 from _typeshed import FileDescriptorLike
32
33 from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore
34 from .._core._tasks import CancelScope
35 from .._core._testing import TaskInfo
36 from ..from_thread import BlockingPortal
37 from ._sockets import (
38 ConnectedUDPSocket,
39 ConnectedUNIXDatagramSocket,
40 IPSockAddrType,
41 SocketListener,
42 SocketStream,
43 UDPSocket,
44 UNIXDatagramSocket,
45 UNIXSocketStream,
46 )
47 from ._subprocesses import Process
48 from ._tasks import TaskGroup
49 from ._testing import TestRunner
50
51T_Retval = TypeVar("T_Retval")
52PosArgsT = TypeVarTuple("PosArgsT")
53StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
54
55
56class AsyncBackend(metaclass=ABCMeta):
57 @classmethod
58 @abstractmethod
59 def run(
60 cls,
61 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
62 args: tuple[Unpack[PosArgsT]],
63 kwargs: dict[str, Any],
64 options: dict[str, Any],
65 ) -> T_Retval:
66 """
67 Run the given coroutine function in an asynchronous event loop.
68
69 The current thread must not be already running an event loop.
70
71 :param func: a coroutine function
72 :param args: positional arguments to ``func``
73 :param kwargs: positional arguments to ``func``
74 :param options: keyword arguments to call the backend ``run()`` implementation
75 with
76 :return: the return value of the coroutine function
77 """
78
79 @classmethod
80 @abstractmethod
81 def current_token(cls) -> object:
82 """
83 Return an object that allows other threads to run code inside the event loop.
84
85 :return: a token object, specific to the event loop running in the current
86 thread
87 """
88
89 @classmethod
90 @abstractmethod
91 def current_time(cls) -> float:
92 """
93 Return the current value of the event loop's internal clock.
94
95 :return: the clock value (seconds)
96 """
97
98 @classmethod
99 @abstractmethod
100 def cancelled_exception_class(cls) -> type[BaseException]:
101 """Return the exception class that is raised in a task if it's cancelled."""
102
103 @classmethod
104 @abstractmethod
105 async def checkpoint(cls) -> None:
106 """
107 Check if the task has been cancelled, and allow rescheduling of other tasks.
108
109 This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
110 :meth:`cancel_shielded_checkpoint`.
111 """
112
113 @classmethod
114 async def checkpoint_if_cancelled(cls) -> None:
115 """
116 Check if the current task group has been cancelled.
117
118 This will check if the task has been cancelled, but will not allow other tasks
119 to be scheduled if not.
120
121 """
122 if cls.current_effective_deadline() == -math.inf:
123 await cls.checkpoint()
124
125 @classmethod
126 async def cancel_shielded_checkpoint(cls) -> None:
127 """
128 Allow the rescheduling of other tasks.
129
130 This will give other tasks the opportunity to run, but without checking if the
131 current task group has been cancelled, unlike with :meth:`checkpoint`.
132
133 """
134 with cls.create_cancel_scope(shield=True):
135 await cls.sleep(0)
136
137 @classmethod
138 @abstractmethod
139 async def sleep(cls, delay: float) -> None:
140 """
141 Pause the current task for the specified duration.
142
143 :param delay: the duration, in seconds
144 """
145
146 @classmethod
147 @abstractmethod
148 def create_cancel_scope(
149 cls, *, deadline: float = math.inf, shield: bool = False
150 ) -> CancelScope:
151 pass
152
153 @classmethod
154 @abstractmethod
155 def current_effective_deadline(cls) -> float:
156 """
157 Return the nearest deadline among all the cancel scopes effective for the
158 current task.
159
160 :return:
161 - a clock value from the event loop's internal clock
162 - ``inf`` if there is no deadline in effect
163 - ``-inf`` if the current scope has been cancelled
164 :rtype: float
165 """
166
167 @classmethod
168 @abstractmethod
169 def create_task_group(cls) -> TaskGroup:
170 pass
171
172 @classmethod
173 @abstractmethod
174 def create_event(cls) -> Event:
175 pass
176
177 @classmethod
178 @abstractmethod
179 def create_lock(cls, *, fast_acquire: bool) -> Lock:
180 pass
181
182 @classmethod
183 @abstractmethod
184 def create_semaphore(
185 cls,
186 initial_value: int,
187 *,
188 max_value: int | None = None,
189 fast_acquire: bool = False,
190 ) -> Semaphore:
191 pass
192
193 @classmethod
194 @abstractmethod
195 def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
196 pass
197
198 @classmethod
199 @abstractmethod
200 async def run_sync_in_worker_thread(
201 cls,
202 func: Callable[[Unpack[PosArgsT]], T_Retval],
203 args: tuple[Unpack[PosArgsT]],
204 abandon_on_cancel: bool = False,
205 limiter: CapacityLimiter | None = None,
206 ) -> T_Retval:
207 pass
208
209 @classmethod
210 @abstractmethod
211 def check_cancelled(cls) -> None:
212 pass
213
214 @classmethod
215 @abstractmethod
216 def run_async_from_thread(
217 cls,
218 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
219 args: tuple[Unpack[PosArgsT]],
220 token: object,
221 ) -> T_Retval:
222 pass
223
224 @classmethod
225 @abstractmethod
226 def run_sync_from_thread(
227 cls,
228 func: Callable[[Unpack[PosArgsT]], T_Retval],
229 args: tuple[Unpack[PosArgsT]],
230 token: object,
231 ) -> T_Retval:
232 pass
233
234 @classmethod
235 @abstractmethod
236 def create_blocking_portal(cls) -> BlockingPortal:
237 pass
238
239 @classmethod
240 @abstractmethod
241 async def open_process(
242 cls,
243 command: StrOrBytesPath | Sequence[StrOrBytesPath],
244 *,
245 stdin: int | IO[Any] | None,
246 stdout: int | IO[Any] | None,
247 stderr: int | IO[Any] | None,
248 **kwargs: Any,
249 ) -> Process:
250 pass
251
252 @classmethod
253 @abstractmethod
254 def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
255 pass
256
257 @classmethod
258 @abstractmethod
259 async def connect_tcp(
260 cls, host: str, port: int, local_address: IPSockAddrType | None = None
261 ) -> SocketStream:
262 pass
263
264 @classmethod
265 @abstractmethod
266 async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
267 pass
268
269 @classmethod
270 @abstractmethod
271 def create_tcp_listener(cls, sock: socket) -> SocketListener:
272 pass
273
274 @classmethod
275 @abstractmethod
276 def create_unix_listener(cls, sock: socket) -> SocketListener:
277 pass
278
279 @classmethod
280 @abstractmethod
281 async def create_udp_socket(
282 cls,
283 family: AddressFamily,
284 local_address: IPSockAddrType | None,
285 remote_address: IPSockAddrType | None,
286 reuse_port: bool,
287 ) -> UDPSocket | ConnectedUDPSocket:
288 pass
289
290 @classmethod
291 @overload
292 async def create_unix_datagram_socket(
293 cls, raw_socket: socket, remote_path: None
294 ) -> UNIXDatagramSocket: ...
295
296 @classmethod
297 @overload
298 async def create_unix_datagram_socket(
299 cls, raw_socket: socket, remote_path: str | bytes
300 ) -> ConnectedUNIXDatagramSocket: ...
301
302 @classmethod
303 @abstractmethod
304 async def create_unix_datagram_socket(
305 cls, raw_socket: socket, remote_path: str | bytes | None
306 ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
307 pass
308
309 @classmethod
310 @abstractmethod
311 async def getaddrinfo(
312 cls,
313 host: bytes | str | None,
314 port: str | int | None,
315 *,
316 family: int | AddressFamily = 0,
317 type: int | SocketKind = 0,
318 proto: int = 0,
319 flags: int = 0,
320 ) -> Sequence[
321 tuple[
322 AddressFamily,
323 SocketKind,
324 int,
325 str,
326 tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
327 ]
328 ]:
329 pass
330
331 @classmethod
332 @abstractmethod
333 async def getnameinfo(
334 cls, sockaddr: IPSockAddrType, flags: int = 0
335 ) -> tuple[str, str]:
336 pass
337
338 @classmethod
339 @abstractmethod
340 async def wait_readable(cls, obj: FileDescriptorLike) -> None:
341 pass
342
343 @classmethod
344 @abstractmethod
345 async def wait_writable(cls, obj: FileDescriptorLike) -> None:
346 pass
347
348 @classmethod
349 @abstractmethod
350 def notify_closing(cls, obj: FileDescriptorLike) -> None:
351 pass
352
353 @classmethod
354 @abstractmethod
355 async def wrap_listener_socket(cls, sock: socket) -> SocketListener:
356 pass
357
358 @classmethod
359 @abstractmethod
360 async def wrap_stream_socket(cls, sock: socket) -> SocketStream:
361 pass
362
363 @classmethod
364 @abstractmethod
365 async def wrap_unix_stream_socket(cls, sock: socket) -> UNIXSocketStream:
366 pass
367
368 @classmethod
369 @abstractmethod
370 async def wrap_udp_socket(cls, sock: socket) -> UDPSocket:
371 pass
372
373 @classmethod
374 @abstractmethod
375 async def wrap_connected_udp_socket(cls, sock: socket) -> ConnectedUDPSocket:
376 pass
377
378 @classmethod
379 @abstractmethod
380 async def wrap_unix_datagram_socket(cls, sock: socket) -> UNIXDatagramSocket:
381 pass
382
383 @classmethod
384 @abstractmethod
385 async def wrap_connected_unix_datagram_socket(
386 cls, sock: socket
387 ) -> ConnectedUNIXDatagramSocket:
388 pass
389
390 @classmethod
391 @abstractmethod
392 def current_default_thread_limiter(cls) -> CapacityLimiter:
393 pass
394
395 @classmethod
396 @abstractmethod
397 def open_signal_receiver(
398 cls, *signals: Signals
399 ) -> AbstractContextManager[AsyncIterator[Signals]]:
400 pass
401
402 @classmethod
403 @abstractmethod
404 def get_current_task(cls) -> TaskInfo:
405 pass
406
407 @classmethod
408 @abstractmethod
409 def get_running_tasks(cls) -> Sequence[TaskInfo]:
410 pass
411
412 @classmethod
413 @abstractmethod
414 async def wait_all_tasks_blocked(cls) -> None:
415 pass
416
417 @classmethod
418 @abstractmethod
419 def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
420 pass