1from __future__ import annotations
2
3import math
4import sys
5from abc import ABCMeta, abstractmethod
6from collections.abc import AsyncIterator, Awaitable, Mapping
7from os import PathLike
8from signal import Signals
9from socket import AddressFamily, SocketKind, socket
10from typing import (
11 IO,
12 TYPE_CHECKING,
13 Any,
14 Callable,
15 ContextManager,
16 Sequence,
17 TypeVar,
18 overload,
19)
20
21if sys.version_info >= (3, 11):
22 from typing import TypeVarTuple, Unpack
23else:
24 from typing_extensions import TypeVarTuple, Unpack
25
26if TYPE_CHECKING:
27 from typing import Literal
28
29 from .._core._synchronization import CapacityLimiter, Event
30 from .._core._tasks import CancelScope
31 from .._core._testing import TaskInfo
32 from ..from_thread import BlockingPortal
33 from ._sockets import (
34 ConnectedUDPSocket,
35 ConnectedUNIXDatagramSocket,
36 IPSockAddrType,
37 SocketListener,
38 SocketStream,
39 UDPSocket,
40 UNIXDatagramSocket,
41 UNIXSocketStream,
42 )
43 from ._subprocesses import Process
44 from ._tasks import TaskGroup
45 from ._testing import TestRunner
46
# Return type of the callables accepted by AsyncBackend.run(),
# run_sync_in_worker_thread(), run_async_from_thread() and run_sync_from_thread();
# those methods are annotated as returning the same type.
T_Retval = TypeVar("T_Retval")
# Types of the positional arguments of those callables; ties the ``args`` tuple
# parameter to the callable's own positional parameters.
PosArgsT = TypeVarTuple("PosArgsT")
49
50
class AsyncBackend(metaclass=ABCMeta):
    """
    Abstract interface of an asynchronous event loop backend.

    Every operation is declared as a class method. Two of them,
    :meth:`checkpoint_if_cancelled` and :meth:`cancel_shielded_checkpoint`, have
    default implementations built on the abstract primitives; all the others are
    abstract and must be provided by the concrete backend class.
    """

    @classmethod
    @abstractmethod
    def run(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        options: dict[str, Any],
    ) -> T_Retval:
        """
        Run the given coroutine function in an asynchronous event loop.

        The current thread must not be already running an event loop.

        :param func: a coroutine function
        :param args: positional arguments to ``func``
        :param kwargs: keyword arguments to ``func``
        :param options: keyword arguments to call the backend ``run()`` implementation
            with
        :return: the return value of the coroutine function
        """

    @classmethod
    @abstractmethod
    def current_token(cls) -> object:
        """
        Return the backend's token object for the current event loop.

        The exact nature of the token is backend specific.

        :return: an opaque object; :meth:`run_async_from_thread` and
            :meth:`run_sync_from_thread` take a ``token`` argument of the same type
            (presumably this very object -- confirm against the backend
            implementations)
        """

    @classmethod
    @abstractmethod
    def current_time(cls) -> float:
        """
        Return the current value of the event loop's internal clock.

        :return: the clock value (seconds)
        """

    @classmethod
    @abstractmethod
    def cancelled_exception_class(cls) -> type[BaseException]:
        """Return the exception class that is raised in a task if it's cancelled."""

    @classmethod
    @abstractmethod
    async def checkpoint(cls) -> None:
        """
        Check if the task has been cancelled, and allow rescheduling of other tasks.

        This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
        :meth:`cancel_shielded_checkpoint`.
        """

    @classmethod
    async def checkpoint_if_cancelled(cls) -> None:
        """
        Check if the current task group has been cancelled.

        This will check if the task has been cancelled, but will not allow other tasks
        to be scheduled if not.

        """
        # An effective deadline of -inf is how a cancelled scope is reported (see
        # current_effective_deadline()); only then is a full checkpoint needed.
        if cls.current_effective_deadline() == -math.inf:
            await cls.checkpoint()

    @classmethod
    async def cancel_shielded_checkpoint(cls) -> None:
        """
        Allow the rescheduling of other tasks.

        This will give other tasks the opportunity to run, but without checking if the
        current task group has been cancelled, unlike with :meth:`checkpoint`.

        """
        # The zero-length sleep yields control; the shielded scope around it keeps
        # the cancellation check out of the picture.
        with cls.create_cancel_scope(shield=True):
            await cls.sleep(0)

    @classmethod
    @abstractmethod
    async def sleep(cls, delay: float) -> None:
        """
        Pause the current task for the specified duration.

        :param delay: the duration, in seconds
        """

    @classmethod
    @abstractmethod
    def create_cancel_scope(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        """
        Create a new cancel scope.

        :param deadline: the deadline of the scope as a value of the event loop's
            clock (``math.inf``, the default, meaning no deadline)
        :param shield: whether the scope is shielded; the default implementation of
            :meth:`cancel_shielded_checkpoint` passes ``True`` here
        :return: the new cancel scope
        """

    @classmethod
    @abstractmethod
    def current_effective_deadline(cls) -> float:
        """
        Return the nearest deadline among all the cancel scopes effective for the
        current task.

        :return:
            - a clock value from the event loop's internal clock
            - ``inf`` if there is no deadline in effect
            - ``-inf`` if the current scope has been cancelled
        """

    @classmethod
    @abstractmethod
    def create_task_group(cls) -> TaskGroup:
        """Create a new task group."""

    @classmethod
    @abstractmethod
    def create_event(cls) -> Event:
        """Create a new event object."""

    @classmethod
    @abstractmethod
    def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
        """
        Create a new capacity limiter.

        :param total_tokens: the total number of tokens for the new limiter
        """

    @classmethod
    @abstractmethod
    async def run_sync_in_worker_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        abandon_on_cancel: bool = False,
        limiter: CapacityLimiter | None = None,
    ) -> T_Retval:
        """
        Call the given synchronous callable in a worker thread.

        :param func: the callable to run
        :param args: positional arguments to ``func``
        :param abandon_on_cancel: presumably whether the waiting task may be
            cancelled while ``func`` is still running in its thread -- confirm against
            the backend implementations
        :param limiter: the capacity limiter to use, or ``None`` (presumably meaning
            the one from :meth:`current_default_thread_limiter` -- confirm)
        :return: the return value of ``func``
        """

    @classmethod
    @abstractmethod
    def check_cancelled(cls) -> None:
        """
        Check for cancellation.

        Nothing is returned, so a detected cancellation is presumably signalled by
        raising an exception -- confirm against the backend implementations.
        """

    @classmethod
    @abstractmethod
    def run_async_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Call a coroutine function from a thread and return its result.

        :param func: a coroutine function
        :param args: positional arguments to ``func``
        :param token: a backend token object (see :meth:`current_token`)
        :return: the return value of the coroutine function
        """

    @classmethod
    @abstractmethod
    def run_sync_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Call a synchronous callable from a thread and return its result.

        :param func: the callable to run
        :param args: positional arguments to ``func``
        :param token: a backend token object (see :meth:`current_token`)
        :return: the return value of ``func``
        """

    @classmethod
    @abstractmethod
    def create_blocking_portal(cls) -> BlockingPortal:
        """Create a new blocking portal."""

    @classmethod
    @overload
    async def open_process(
        cls,
        command: str | bytes,
        *,
        shell: Literal[True],
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        cwd: str | bytes | PathLike[str] | None = None,
        env: Mapping[str, str] | None = None,
        start_new_session: bool = False,
    ) -> Process:
        ...

    @classmethod
    @overload
    async def open_process(
        cls,
        command: Sequence[str | bytes],
        *,
        shell: Literal[False],
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        cwd: str | bytes | PathLike[str] | None = None,
        env: Mapping[str, str] | None = None,
        start_new_session: bool = False,
    ) -> Process:
        ...

    @classmethod
    @abstractmethod
    async def open_process(
        cls,
        command: str | bytes | Sequence[str | bytes],
        *,
        shell: bool,
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        cwd: str | bytes | PathLike[str] | None = None,
        env: Mapping[str, str] | None = None,
        start_new_session: bool = False,
    ) -> Process:
        """
        Start a subprocess.

        Per the overloads, a single string (or bytes) command goes together with
        ``shell=True`` and a sequence of arguments with ``shell=False``.

        :param command: the command line, or the sequence of program arguments
        :param shell: whether ``command`` is a single shell command line
        :param stdin: file descriptor, file object or ``None`` for the standard input
        :param stdout: same as ``stdin``, for the standard output
        :param stderr: same as ``stdin``, for the standard error
        :param cwd: working directory for the subprocess, if given
        :param env: environment variables for the subprocess, if given
        :param start_new_session: presumably the same meaning as the option of the
            same name in :class:`subprocess.Popen` -- confirm
        :return: the process object
        """

    @classmethod
    @abstractmethod
    def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
        """
        Set up the handling of process pool workers at shutdown.

        :param workers: the set of worker processes (presumably to be made to exit
            when the event loop shuts down -- confirm against the backends)
        """

    @classmethod
    @abstractmethod
    async def connect_tcp(
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
    ) -> SocketStream:
        """
        Open a TCP connection.

        :param host: the host to connect to
        :param port: the port to connect to
        :param local_address: optional local address for the socket
        :return: the connected socket stream
        """

    @classmethod
    @abstractmethod
    async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
        """
        Open a connection to a UNIX socket.

        :param path: the path of the socket
        :return: the connected socket stream
        """

    @classmethod
    @abstractmethod
    def create_tcp_listener(cls, sock: socket) -> SocketListener:
        """
        Create a TCP listener from the given socket.

        :param sock: the socket object to create the listener from
        """

    @classmethod
    @abstractmethod
    def create_unix_listener(cls, sock: socket) -> SocketListener:
        """
        Create a UNIX socket listener from the given socket.

        :param sock: the socket object to create the listener from
        """

    @classmethod
    @abstractmethod
    async def create_udp_socket(
        cls,
        family: AddressFamily,
        local_address: IPSockAddrType | None,
        remote_address: IPSockAddrType | None,
        reuse_port: bool,
    ) -> UDPSocket | ConnectedUDPSocket:
        """
        Create a UDP socket.

        :param family: the address family of the socket
        :param local_address: optional local address
        :param remote_address: optional remote address (presumably a connected
            socket is returned when this is given -- confirm)
        :param reuse_port: port reuse flag for the socket
        :return: a UDP socket, connected or not
        """

    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: None
    ) -> UNIXDatagramSocket:
        ...

    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str | bytes
    ) -> ConnectedUNIXDatagramSocket:
        ...

    @classmethod
    @abstractmethod
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str | bytes | None
    ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
        """
        Create a UNIX datagram socket from the given raw socket.

        Per the overloads, passing ``None`` as ``remote_path`` gives an unconnected
        socket while passing a path gives a connected one.

        :param raw_socket: the socket object to create the datagram socket from
        :param remote_path: the path of the remote socket, or ``None``
        """

    @classmethod
    @abstractmethod
    async def getaddrinfo(
        cls,
        host: bytes | str | None,
        port: str | int | None,
        *,
        family: int | AddressFamily = 0,
        type: int | SocketKind = 0,
        proto: int = 0,
        flags: int = 0,
    ) -> list[
        tuple[
            AddressFamily,
            SocketKind,
            int,
            str,
            tuple[str, int] | tuple[str, int, int, int],
        ]
    ]:
        """
        Look up address information for the given host and port.

        The parameters and the shape of the result mirror those of
        :func:`socket.getaddrinfo`.

        :return: a list of ``(family, type, proto, canonname, sockaddr)`` tuples
        """

    @classmethod
    @abstractmethod
    async def getnameinfo(
        cls, sockaddr: IPSockAddrType, flags: int = 0
    ) -> tuple[str, str]:
        """
        Look up name information for the given socket address.

        The parameters mirror those of :func:`socket.getnameinfo`.

        :return: a tuple of two strings (presumably host and service, as with
            :func:`socket.getnameinfo` -- confirm)
        """

    @classmethod
    @abstractmethod
    async def wait_socket_readable(cls, sock: socket) -> None:
        """
        Wait until the given socket is readable.

        :param sock: the socket object to wait on
        """

    @classmethod
    @abstractmethod
    async def wait_socket_writable(cls, sock: socket) -> None:
        """
        Wait until the given socket is writable.

        :param sock: the socket object to wait on
        """

    @classmethod
    @abstractmethod
    def current_default_thread_limiter(cls) -> CapacityLimiter:
        """Return the current default capacity limiter for threads."""

    @classmethod
    @abstractmethod
    def open_signal_receiver(
        cls, *signals: Signals
    ) -> ContextManager[AsyncIterator[Signals]]:
        """
        Open a receiver for the given signals.

        :param signals: the signals to receive
        :return: a context manager providing an asynchronous iterator of signals
        """

    @classmethod
    @abstractmethod
    def get_current_task(cls) -> TaskInfo:
        """Return information about the current task."""

    @classmethod
    @abstractmethod
    def get_running_tasks(cls) -> list[TaskInfo]:
        """Return information about the running tasks."""

    @classmethod
    @abstractmethod
    async def wait_all_tasks_blocked(cls) -> None:
        """
        Wait until all tasks are blocked.

        NOTE(review): what exactly counts as "blocked", and whether the calling task
        is excluded, is up to the backend -- confirm against the implementations.
        """

    @classmethod
    @abstractmethod
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
        """
        Create a test runner.

        :param options: options for the test runner
        """