Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_eventloop.py: 76%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import math
4import sys
5from abc import ABCMeta, abstractmethod
6from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
7from contextlib import AbstractContextManager
8from os import PathLike
9from signal import Signals
10from socket import AddressFamily, SocketKind, socket
11from typing import (
12 IO,
13 TYPE_CHECKING,
14 Any,
15 TypeAlias,
16 TypeVar,
17 overload,
18)
20if sys.version_info >= (3, 11):
21 from typing import TypeVarTuple, Unpack
22else:
23 from typing_extensions import TypeVarTuple, Unpack
25if TYPE_CHECKING:
26 from _typeshed import FileDescriptorLike
28 from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore
29 from .._core._tasks import CancelScope
30 from .._core._testing import TaskInfo
31 from ._sockets import (
32 ConnectedUDPSocket,
33 ConnectedUNIXDatagramSocket,
34 IPSockAddrType,
35 SocketListener,
36 SocketStream,
37 UDPSocket,
38 UNIXDatagramSocket,
39 UNIXSocketStream,
40 )
41 from ._subprocesses import Process
42 from ._tasks import TaskGroup
43 from ._testing import TestRunner
45T_Retval = TypeVar("T_Retval")
46PosArgsT = TypeVarTuple("PosArgsT")
47StrOrBytesPath: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes]
50class AsyncBackend(metaclass=ABCMeta):
51 @classmethod
52 @abstractmethod
53 def run(
54 cls,
55 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
56 args: tuple[Unpack[PosArgsT]],
57 kwargs: dict[str, Any],
58 options: dict[str, Any],
59 ) -> T_Retval:
60 """
61 Run the given coroutine function in an asynchronous event loop.
63 The current thread must not be already running an event loop.
65 :param func: a coroutine function
66 :param args: positional arguments to ``func``
67 :param kwargs: positional arguments to ``func``
68 :param options: keyword arguments to call the backend ``run()`` implementation
69 with
70 :return: the return value of the coroutine function
71 """
73 @classmethod
74 @abstractmethod
75 def current_token(cls) -> object:
76 """
77 Return an object that allows other threads to run code inside the event loop.
79 :return: a token object, specific to the event loop running in the current
80 thread
81 """
83 @classmethod
84 @abstractmethod
85 def current_time(cls) -> float:
86 """
87 Return the current value of the event loop's internal clock.
89 :return: the clock value (seconds)
90 """
92 @classmethod
93 @abstractmethod
94 def cancelled_exception_class(cls) -> type[BaseException]:
95 """Return the exception class that is raised in a task if it's cancelled."""
97 @classmethod
98 @abstractmethod
99 async def checkpoint(cls) -> None:
100 """
101 Check if the task has been cancelled, and allow rescheduling of other tasks.
103 This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
104 :meth:`cancel_shielded_checkpoint`.
105 """
107 @classmethod
108 async def checkpoint_if_cancelled(cls) -> None:
109 """
110 Check if the current task group has been cancelled.
112 This will check if the task has been cancelled, but will not allow other tasks
113 to be scheduled if not.
115 """
116 if cls.current_effective_deadline() == -math.inf:
117 await cls.checkpoint()
119 @classmethod
120 async def cancel_shielded_checkpoint(cls) -> None:
121 """
122 Allow the rescheduling of other tasks.
124 This will give other tasks the opportunity to run, but without checking if the
125 current task group has been cancelled, unlike with :meth:`checkpoint`.
127 """
128 with cls.create_cancel_scope(shield=True):
129 await cls.sleep(0)
131 @classmethod
132 @abstractmethod
133 async def sleep(cls, delay: float) -> None:
134 """
135 Pause the current task for the specified duration.
137 :param delay: the duration, in seconds
138 """
140 @classmethod
141 @abstractmethod
142 def create_cancel_scope(
143 cls, *, deadline: float = math.inf, shield: bool = False
144 ) -> CancelScope:
145 pass
147 @classmethod
148 @abstractmethod
149 def current_effective_deadline(cls) -> float:
150 """
151 Return the nearest deadline among all the cancel scopes effective for the
152 current task.
154 :return:
155 - a clock value from the event loop's internal clock
156 - ``inf`` if there is no deadline in effect
157 - ``-inf`` if the current scope has been cancelled
158 :rtype: float
159 """
161 @classmethod
162 @abstractmethod
163 def create_task_group(cls) -> TaskGroup:
164 pass
166 @classmethod
167 @abstractmethod
168 def create_event(cls) -> Event:
169 pass
171 @classmethod
172 @abstractmethod
173 def create_lock(cls, *, fast_acquire: bool) -> Lock:
174 pass
176 @classmethod
177 @abstractmethod
178 def create_semaphore(
179 cls,
180 initial_value: int,
181 *,
182 max_value: int | None = None,
183 fast_acquire: bool = False,
184 ) -> Semaphore:
185 pass
187 @classmethod
188 @abstractmethod
189 def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
190 pass
192 @classmethod
193 @abstractmethod
194 async def run_sync_in_worker_thread(
195 cls,
196 func: Callable[[Unpack[PosArgsT]], T_Retval],
197 args: tuple[Unpack[PosArgsT]],
198 abandon_on_cancel: bool = False,
199 limiter: CapacityLimiter | None = None,
200 ) -> T_Retval:
201 pass
203 @classmethod
204 @abstractmethod
205 def check_cancelled(cls) -> None:
206 pass
208 @classmethod
209 @abstractmethod
210 def run_async_from_thread(
211 cls,
212 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
213 args: tuple[Unpack[PosArgsT]],
214 token: object,
215 ) -> T_Retval:
216 pass
218 @classmethod
219 @abstractmethod
220 def run_sync_from_thread(
221 cls,
222 func: Callable[[Unpack[PosArgsT]], T_Retval],
223 args: tuple[Unpack[PosArgsT]],
224 token: object,
225 ) -> T_Retval:
226 pass
228 @classmethod
229 @abstractmethod
230 async def open_process(
231 cls,
232 command: StrOrBytesPath | Sequence[StrOrBytesPath],
233 *,
234 stdin: int | IO[Any] | None,
235 stdout: int | IO[Any] | None,
236 stderr: int | IO[Any] | None,
237 **kwargs: Any,
238 ) -> Process:
239 pass
241 @classmethod
242 @abstractmethod
243 def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
244 pass
246 @classmethod
247 @abstractmethod
248 async def connect_tcp(
249 cls, host: str, port: int, local_address: IPSockAddrType | None = None
250 ) -> SocketStream:
251 pass
253 @classmethod
254 @abstractmethod
255 async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
256 pass
258 @classmethod
259 @abstractmethod
260 def create_tcp_listener(cls, sock: socket) -> SocketListener:
261 pass
263 @classmethod
264 @abstractmethod
265 def create_unix_listener(cls, sock: socket) -> SocketListener:
266 pass
268 @classmethod
269 @abstractmethod
270 async def create_udp_socket(
271 cls,
272 family: AddressFamily,
273 local_address: IPSockAddrType | None,
274 remote_address: IPSockAddrType | None,
275 reuse_port: bool,
276 ) -> UDPSocket | ConnectedUDPSocket:
277 pass
279 @classmethod
280 @overload
281 async def create_unix_datagram_socket(
282 cls, raw_socket: socket, remote_path: None
283 ) -> UNIXDatagramSocket: ...
285 @classmethod
286 @overload
287 async def create_unix_datagram_socket(
288 cls, raw_socket: socket, remote_path: str | bytes
289 ) -> ConnectedUNIXDatagramSocket: ...
291 @classmethod
292 @abstractmethod
293 async def create_unix_datagram_socket(
294 cls, raw_socket: socket, remote_path: str | bytes | None
295 ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
296 pass
298 @classmethod
299 @abstractmethod
300 async def getaddrinfo(
301 cls,
302 host: bytes | str | None,
303 port: str | int | None,
304 *,
305 family: int | AddressFamily = 0,
306 type: int | SocketKind = 0,
307 proto: int = 0,
308 flags: int = 0,
309 ) -> Sequence[
310 tuple[
311 AddressFamily,
312 SocketKind,
313 int,
314 str,
315 tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
316 ]
317 ]:
318 pass
320 @classmethod
321 @abstractmethod
322 async def getnameinfo(
323 cls, sockaddr: IPSockAddrType, flags: int = 0
324 ) -> tuple[str, str]:
325 pass
327 @classmethod
328 @abstractmethod
329 async def wait_readable(cls, obj: FileDescriptorLike) -> None:
330 pass
332 @classmethod
333 @abstractmethod
334 async def wait_writable(cls, obj: FileDescriptorLike) -> None:
335 pass
337 @classmethod
338 @abstractmethod
339 def notify_closing(cls, obj: FileDescriptorLike) -> None:
340 pass
342 @classmethod
343 @abstractmethod
344 async def wrap_listener_socket(cls, sock: socket) -> SocketListener:
345 pass
347 @classmethod
348 @abstractmethod
349 async def wrap_stream_socket(cls, sock: socket) -> SocketStream:
350 pass
352 @classmethod
353 @abstractmethod
354 async def wrap_unix_stream_socket(cls, sock: socket) -> UNIXSocketStream:
355 pass
357 @classmethod
358 @abstractmethod
359 async def wrap_udp_socket(cls, sock: socket) -> UDPSocket:
360 pass
362 @classmethod
363 @abstractmethod
364 async def wrap_connected_udp_socket(cls, sock: socket) -> ConnectedUDPSocket:
365 pass
367 @classmethod
368 @abstractmethod
369 async def wrap_unix_datagram_socket(cls, sock: socket) -> UNIXDatagramSocket:
370 pass
372 @classmethod
373 @abstractmethod
374 async def wrap_connected_unix_datagram_socket(
375 cls, sock: socket
376 ) -> ConnectedUNIXDatagramSocket:
377 pass
379 @classmethod
380 @abstractmethod
381 def current_default_thread_limiter(cls) -> CapacityLimiter:
382 pass
384 @classmethod
385 @abstractmethod
386 def open_signal_receiver(
387 cls, *signals: Signals
388 ) -> AbstractContextManager[AsyncIterator[Signals]]:
389 pass
391 @classmethod
392 @abstractmethod
393 def get_current_task(cls) -> TaskInfo:
394 pass
396 @classmethod
397 @abstractmethod
398 def get_running_tasks(cls) -> Sequence[TaskInfo]:
399 pass
401 @classmethod
402 @abstractmethod
403 async def wait_all_tasks_blocked(cls) -> None:
404 pass
406 @classmethod
407 @abstractmethod
408 def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
409 pass