1from __future__ import annotations
2
3import math
4import sys
5from abc import ABCMeta, abstractmethod
6from collections.abc import AsyncIterator, Awaitable, Mapping
7from os import PathLike
8from signal import Signals
9from socket import AddressFamily, SocketKind, socket
10from typing import (
11 IO,
12 TYPE_CHECKING,
13 Any,
14 Callable,
15 ContextManager,
16 Sequence,
17 TypeVar,
18 overload,
19)
20
21if sys.version_info >= (3, 11):
22 from typing import TypeVarTuple, Unpack
23else:
24 from typing_extensions import TypeVarTuple, Unpack
25
26if TYPE_CHECKING:
27 from typing import Literal
28
29 from .._core._synchronization import CapacityLimiter, Event
30 from .._core._tasks import CancelScope
31 from .._core._testing import TaskInfo
32 from ..from_thread import BlockingPortal
33 from ._sockets import (
34 ConnectedUDPSocket,
35 ConnectedUNIXDatagramSocket,
36 IPSockAddrType,
37 SocketListener,
38 SocketStream,
39 UDPSocket,
40 UNIXDatagramSocket,
41 UNIXSocketStream,
42 )
43 from ._subprocesses import Process
44 from ._tasks import TaskGroup
45 from ._testing import TestRunner
46
# Return type of the callables handed to the backend (e.g. the ``func`` argument of
# ``AsyncBackend.run()``); the backend methods return this same type.
T_Retval = TypeVar("T_Retval")
# Types of the positional arguments forwarded to those callables, tying the ``args``
# tuple of each backend method to the signature of its ``func`` argument.
PosArgsT = TypeVarTuple("PosArgsT")
49
50
class AsyncBackend(metaclass=ABCMeta):
    """
    Abstract interface implemented by each asynchronous backend.

    Every operation is declared as a class method. All of them are abstract except
    :meth:`checkpoint_if_cancelled` and :meth:`cancel_shielded_checkpoint`, which
    have default implementations built on top of the other methods.
    """

    @classmethod
    @abstractmethod
    def run(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        options: dict[str, Any],
    ) -> T_Retval:
        """
        Run the given coroutine function in an asynchronous event loop.

        The current thread must not be already running an event loop.

        :param func: a coroutine function
        :param args: positional arguments to ``func``
        :param kwargs: keyword arguments to ``func``
        :param options: keyword arguments to call the backend ``run()`` implementation
            with
        :return: the return value of the coroutine function
        """

    @classmethod
    @abstractmethod
    def current_token(cls) -> object:
        """
        Return the token of the current backend context.

        NOTE(review): the meaning of the token is backend specific and not visible
        here; presumably it is the kind of object accepted as ``token`` by
        :meth:`run_async_from_thread` and :meth:`run_sync_from_thread` -- confirm.

        :return: an opaque token object
        """

    @classmethod
    @abstractmethod
    def current_time(cls) -> float:
        """
        Return the current value of the event loop's internal clock.

        :return: the clock value (seconds)
        """

    @classmethod
    @abstractmethod
    def cancelled_exception_class(cls) -> type[BaseException]:
        """Return the exception class that is raised in a task if it's cancelled."""

    @classmethod
    @abstractmethod
    async def checkpoint(cls) -> None:
        """
        Check if the task has been cancelled, and allow rescheduling of other tasks.

        This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
        :meth:`cancel_shielded_checkpoint`.
        """

    @classmethod
    async def checkpoint_if_cancelled(cls) -> None:
        """
        Run a checkpoint only if the current task has been cancelled.

        This will check if the task has been cancelled, but will not allow other tasks
        to be scheduled if not.

        """
        # current_effective_deadline() reports a cancelled scope as ``-inf``; only
        # then is a full checkpoint run so that the cancellation gets delivered.
        if cls.current_effective_deadline() == -math.inf:
            await cls.checkpoint()

    @classmethod
    async def cancel_shielded_checkpoint(cls) -> None:
        """
        Allow the rescheduling of other tasks.

        This will give other tasks the opportunity to run, but without checking if the
        current task has been cancelled, unlike with :meth:`checkpoint`.

        """
        # Sleeping for zero seconds inside a shielded scope yields control without
        # acting on any pending cancellation.
        with cls.create_cancel_scope(shield=True):
            await cls.sleep(0)

    @classmethod
    @abstractmethod
    async def sleep(cls, delay: float) -> None:
        """
        Pause the current task for the specified duration.

        :param delay: the duration, in seconds
        """

    @classmethod
    @abstractmethod
    def create_cancel_scope(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        """
        Create a cancel scope.

        :param deadline: the deadline of the scope (``inf`` by default);
            NOTE(review): presumably a value of the event loop's internal clock, as
            with :meth:`current_effective_deadline` -- confirm
        :param shield: whether the scope is created shielded
        :return: the new cancel scope
        """

    @classmethod
    @abstractmethod
    def current_effective_deadline(cls) -> float:
        """
        Return the nearest deadline among all the cancel scopes effective for the
        current task.

        :return:
            - a clock value from the event loop's internal clock
            - ``inf`` if there is no deadline in effect
            - ``-inf`` if the current scope has been cancelled
        :rtype: float
        """

    @classmethod
    @abstractmethod
    def create_task_group(cls) -> TaskGroup:
        """
        Create a task group.

        :return: the new task group
        """

    @classmethod
    @abstractmethod
    def create_event(cls) -> Event:
        """
        Create an event object.

        :return: the new event
        """

    @classmethod
    @abstractmethod
    def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
        """
        Create a capacity limiter.

        :param total_tokens: the total number of tokens for the limiter
        :return: the new capacity limiter
        """

    @classmethod
    @abstractmethod
    async def run_sync_in_worker_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        abandon_on_cancel: bool = False,
        limiter: CapacityLimiter | None = None,
    ) -> T_Retval:
        """
        Call the given synchronous function in a worker thread.

        :param func: a callable
        :param args: positional arguments to ``func``
        :param abandon_on_cancel: NOTE(review): presumably lets the caller stop
            waiting for the thread when the task is cancelled -- confirm
        :param limiter: the capacity limiter to use, or ``None``; NOTE(review):
            presumably ``None`` selects :meth:`current_default_thread_limiter` --
            confirm
        :return: the return value of ``func``
        """

    @classmethod
    @abstractmethod
    def check_cancelled(cls) -> None:
        """
        Check whether the current operation has been cancelled.

        NOTE(review): as nothing is returned, this presumably raises the backend's
        cancellation exception when cancelled -- confirm against the backends.
        """

    @classmethod
    @abstractmethod
    def run_async_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Call the given coroutine function from another thread and return its result.

        :param func: a coroutine function
        :param args: positional arguments to ``func``
        :param token: a backend token; NOTE(review): presumably one obtained from
            :meth:`current_token` -- confirm
        :return: the return value of the coroutine function
        """

    @classmethod
    @abstractmethod
    def run_sync_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Call the given synchronous function from another thread and return its result.

        :param func: a callable
        :param args: positional arguments to ``func``
        :param token: a backend token; NOTE(review): presumably one obtained from
            :meth:`current_token` -- confirm
        :return: the return value of ``func``
        """

    @classmethod
    @abstractmethod
    def create_blocking_portal(cls) -> BlockingPortal:
        """
        Create a blocking portal.

        :return: the new blocking portal
        """

    # Typing-only overloads: a single ``str``/``bytes`` command goes with
    # ``shell=True``, a sequence of arguments with ``shell=False``.
    @classmethod
    @overload
    async def open_process(
        cls,
        command: str | bytes,
        *,
        shell: Literal[True],
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        cwd: str | bytes | PathLike[str] | None = None,
        env: Mapping[str, str] | None = None,
        start_new_session: bool = False,
    ) -> Process: ...

    @classmethod
    @overload
    async def open_process(
        cls,
        command: Sequence[str | bytes],
        *,
        shell: Literal[False],
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        cwd: str | bytes | PathLike[str] | None = None,
        env: Mapping[str, str] | None = None,
        start_new_session: bool = False,
    ) -> Process: ...

    @classmethod
    @abstractmethod
    async def open_process(
        cls,
        command: str | bytes | Sequence[str | bytes],
        *,
        shell: bool,
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        cwd: str | bytes | PathLike[str] | None = None,
        env: Mapping[str, str] | None = None,
        start_new_session: bool = False,
    ) -> Process:
        """
        Start a subprocess.

        :param command: a single command string (with ``shell=True``) or a sequence
            of program arguments (with ``shell=False``)
        :param shell: ``True`` when ``command`` is a single string, ``False`` when it
            is a sequence
        :param stdin: a file descriptor, a file object or ``None``
        :param stdout: a file descriptor, a file object or ``None``
        :param stderr: a file descriptor, a file object or ``None``
        :param cwd: the working directory for the process, or ``None``
        :param env: the environment variables for the process, or ``None``
        :param start_new_session: NOTE(review): presumably starts the process in a
            new session, as in :mod:`subprocess` -- confirm
        :return: the process object
        """

    @classmethod
    @abstractmethod
    def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
        """
        Arrange for the given worker processes to be dealt with at shutdown.

        NOTE(review): what happens to the workers, and when, is backend specific and
        not visible here -- confirm against the backends.

        :param workers: the set of worker processes
        """

    @classmethod
    @abstractmethod
    async def connect_tcp(
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
    ) -> SocketStream:
        """
        Connect to the given host and port over TCP.

        :param host: the host to connect to
        :param port: the port to connect to
        :param local_address: the local address to use for the connection, or ``None``
        :return: a socket stream
        """

    @classmethod
    @abstractmethod
    async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
        """
        Connect to the UNIX socket at the given path.

        :param path: the path of the socket
        :return: a UNIX socket stream
        """

    @classmethod
    @abstractmethod
    def create_tcp_listener(cls, sock: socket) -> SocketListener:
        """
        Create a TCP listener from the given socket.

        :param sock: the socket to build the listener on
        :return: a socket listener
        """

    @classmethod
    @abstractmethod
    def create_unix_listener(cls, sock: socket) -> SocketListener:
        """
        Create a UNIX socket listener from the given socket.

        :param sock: the socket to build the listener on
        :return: a socket listener
        """

    @classmethod
    @abstractmethod
    async def create_udp_socket(
        cls,
        family: AddressFamily,
        local_address: IPSockAddrType | None,
        remote_address: IPSockAddrType | None,
        reuse_port: bool,
    ) -> UDPSocket | ConnectedUDPSocket:
        """
        Create a UDP socket.

        :param family: the address family of the socket
        :param local_address: the local address, or ``None``
        :param remote_address: the remote address, or ``None``; NOTE(review):
            presumably a connected socket is returned when this is given -- confirm
        :param reuse_port: NOTE(review): presumably toggles port reuse on the
            socket -- confirm
        :return: a UDP socket or a connected UDP socket
        """

    # Typing-only overloads: without a remote path the result is an unconnected
    # socket, with one it is a connected socket.
    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: None
    ) -> UNIXDatagramSocket: ...

    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str | bytes
    ) -> ConnectedUNIXDatagramSocket: ...

    @classmethod
    @abstractmethod
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str | bytes | None
    ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
        """
        Create a UNIX datagram socket from the given raw socket.

        :param raw_socket: the socket to wrap
        :param remote_path: the path of the remote socket, or ``None`` for an
            unconnected socket
        :return: a connected socket if ``remote_path`` was given, otherwise an
            unconnected one
        """

    @classmethod
    @abstractmethod
    async def getaddrinfo(
        cls,
        host: bytes | str | None,
        port: str | int | None,
        *,
        family: int | AddressFamily = 0,
        type: int | SocketKind = 0,
        proto: int = 0,
        flags: int = 0,
    ) -> list[
        tuple[
            AddressFamily,
            SocketKind,
            int,
            str,
            tuple[str, int] | tuple[str, int, int, int],
        ]
    ]:
        """
        Look up address information for the given host and port.

        NOTE(review): the parameters and the result mirror
        :func:`socket.getaddrinfo`; presumably this is its asynchronous
        counterpart -- confirm.

        :param host: the host name or address, or ``None``
        :param port: the port number or service name, or ``None``
        :param family: the address family (``0`` by default)
        :param type: the socket type (``0`` by default)
        :param proto: the protocol number (``0`` by default)
        :param flags: the lookup flags (``0`` by default)
        :return: a list of ``(family, type, proto, canonname, sockaddr)`` tuples
        """

    @classmethod
    @abstractmethod
    async def getnameinfo(
        cls, sockaddr: IPSockAddrType, flags: int = 0
    ) -> tuple[str, str]:
        """
        Look up name information for the given socket address.

        NOTE(review): the signature mirrors :func:`socket.getnameinfo`; presumably
        the result is a ``(host, port)`` pair -- confirm.

        :param sockaddr: the socket address to look up
        :param flags: the lookup flags (``0`` by default)
        :return: a tuple of two strings
        """

    @classmethod
    @abstractmethod
    async def wait_socket_readable(cls, sock: socket) -> None:
        """
        Wait until the given socket is readable.

        :param sock: the socket to wait on
        """

    @classmethod
    @abstractmethod
    async def wait_socket_writable(cls, sock: socket) -> None:
        """
        Wait until the given socket is writable.

        :param sock: the socket to wait on
        """

    @classmethod
    @abstractmethod
    def current_default_thread_limiter(cls) -> CapacityLimiter:
        """
        Return the current default capacity limiter for threads.

        :return: the capacity limiter
        """

    @classmethod
    @abstractmethod
    def open_signal_receiver(
        cls, *signals: Signals
    ) -> ContextManager[AsyncIterator[Signals]]:
        """
        Open a receiver for the given signals.

        :param signals: the signals to receive
        :return: a context manager that yields an asynchronous iterator of signals
        """

    @classmethod
    @abstractmethod
    def get_current_task(cls) -> TaskInfo:
        """
        Return information about the current task.

        :return: the task info of the current task
        """

    @classmethod
    @abstractmethod
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
        """
        Return information about the running tasks.

        :return: a sequence of task info objects
        """

    @classmethod
    @abstractmethod
    async def wait_all_tasks_blocked(cls) -> None:
        """
        Wait until all tasks are blocked.

        NOTE(review): which tasks are considered, and what counts as blocked, is
        backend specific and not visible here -- confirm.
        """

    @classmethod
    @abstractmethod
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
        """
        Create a test runner.

        :param options: options for the test runner; NOTE(review): presumably the
            same kind of backend options as accepted by :meth:`run` -- confirm
        :return: the new test runner
        """