Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_eventloop.py: 76%
Shortcuts on this page:
r, m, x — toggle line displays
j, k — next/previous highlighted chunk
0 (zero) — top of page
1 (one) — first highlighted chunk
1from __future__ import annotations
3import math
4import sys
5from abc import ABCMeta, abstractmethod
6from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
7from contextlib import AbstractContextManager
8from os import PathLike
9from signal import Signals
10from socket import AddressFamily, SocketKind, socket
11from typing import (
12 IO,
13 TYPE_CHECKING,
14 Any,
15 TypeVar,
16 Union,
17 overload,
18)
20if sys.version_info >= (3, 11):
21 from typing import TypeVarTuple, Unpack
22else:
23 from typing_extensions import TypeVarTuple, Unpack
25if sys.version_info >= (3, 10):
26 from typing import TypeAlias
27else:
28 from typing_extensions import TypeAlias
30if TYPE_CHECKING:
31 from _typeshed import FileDescriptorLike
33 from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore
34 from .._core._tasks import CancelScope
35 from .._core._testing import TaskInfo
36 from ._sockets import (
37 ConnectedUDPSocket,
38 ConnectedUNIXDatagramSocket,
39 IPSockAddrType,
40 SocketListener,
41 SocketStream,
42 UDPSocket,
43 UNIXDatagramSocket,
44 UNIXSocketStream,
45 )
46 from ._subprocesses import Process
47 from ._tasks import TaskGroup
48 from ._testing import TestRunner
# Return type of the callables handed to the backend's run/dispatch methods.
T_Retval = TypeVar("T_Retval")
# Variadic positional-argument types of those callables (matched against ``args``).
PosArgsT = TypeVarTuple("PosArgsT")
# A path given as ``str``/``bytes`` or as an ``os.PathLike`` of either; the PathLike
# members are kept as string forward references.
StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
class AsyncBackend(metaclass=ABCMeta):
    """
    Abstract interface of an event loop backend.

    Every operation is a class method. Only :meth:`checkpoint_if_cancelled` and
    :meth:`cancel_shielded_checkpoint` come with a default implementation (both are
    built on top of other methods of this class); everything else is abstract and
    must be overridden by a concrete backend.
    """

    @classmethod
    @abstractmethod
    def run(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        options: dict[str, Any],
    ) -> T_Retval:
        """
        Run the given coroutine function in an asynchronous event loop.

        The current thread must not be already running an event loop.

        :param func: a coroutine function
        :param args: positional arguments to ``func``
        :param kwargs: keyword arguments to ``func``
        :param options: keyword arguments to call the backend ``run()`` implementation
            with
        :return: the return value of the coroutine function
        """

    @classmethod
    @abstractmethod
    def current_token(cls) -> object:
        """
        Return an object that allows other threads to run code inside the event loop.

        :return: a token object, specific to the event loop running in the current
            thread
        """

    @classmethod
    @abstractmethod
    def current_time(cls) -> float:
        """
        Return the current value of the event loop's internal clock.

        :return: the clock value (seconds)
        """

    @classmethod
    @abstractmethod
    def cancelled_exception_class(cls) -> type[BaseException]:
        """Return the exception class that is raised in a task if it's cancelled."""

    @classmethod
    @abstractmethod
    async def checkpoint(cls) -> None:
        """
        Check if the task has been cancelled, and allow rescheduling of other tasks.

        This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
        :meth:`cancel_shielded_checkpoint`.
        """

    @classmethod
    async def checkpoint_if_cancelled(cls) -> None:
        """
        Check if the current task group has been cancelled.

        This will check if the task has been cancelled, but will not allow other tasks
        to be scheduled if not.
        """
        # An effective deadline of ``-inf`` is how :meth:`current_effective_deadline`
        # reports that the current scope has been cancelled; only then is a full
        # checkpoint performed.
        if cls.current_effective_deadline() == -math.inf:
            await cls.checkpoint()

    @classmethod
    async def cancel_shielded_checkpoint(cls) -> None:
        """
        Allow the rescheduling of other tasks.

        This will give other tasks the opportunity to run, but without checking if the
        current task group has been cancelled, unlike with :meth:`checkpoint`.
        """
        # The zero-length sleep runs inside a shielded cancel scope.
        with cls.create_cancel_scope(shield=True):
            await cls.sleep(0)

    @classmethod
    @abstractmethod
    async def sleep(cls, delay: float) -> None:
        """
        Pause the current task for the specified duration.

        :param delay: the duration, in seconds
        """

    # The abstract hooks below that carry no docstring have no contract documented
    # in this class beyond their signatures; the group comments only summarise
    # what those signatures show.

    # -- Cancel scopes --------------------------------------------------------------

    @classmethod
    @abstractmethod
    def create_cancel_scope(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        pass

    @classmethod
    @abstractmethod
    def current_effective_deadline(cls) -> float:
        """
        Return the nearest deadline among all the cancel scopes effective for the
        current task.

        :return:
            - a clock value from the event loop's internal clock
            - ``inf`` if there is no deadline in effect
            - ``-inf`` if the current scope has been cancelled
        :rtype: float
        """

    # -- Factories for task groups and synchronization primitives -------------------

    @classmethod
    @abstractmethod
    def create_task_group(cls) -> TaskGroup:
        pass

    @classmethod
    @abstractmethod
    def create_event(cls) -> Event:
        pass

    @classmethod
    @abstractmethod
    def create_lock(cls, *, fast_acquire: bool) -> Lock:
        pass

    @classmethod
    @abstractmethod
    def create_semaphore(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        pass

    @classmethod
    @abstractmethod
    def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
        pass

    # -- Running callables in worker threads / from other threads -------------------

    @classmethod
    @abstractmethod
    async def run_sync_in_worker_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        abandon_on_cancel: bool = False,
        limiter: CapacityLimiter | None = None,
    ) -> T_Retval:
        pass

    # NOTE(review): no contract is documented for this hook; presumably it raises the
    # backend's cancellation exception when applicable -- confirm against the
    # concrete backends.
    @classmethod
    @abstractmethod
    def check_cancelled(cls) -> None:
        pass

    @classmethod
    @abstractmethod
    def run_async_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        pass

    @classmethod
    @abstractmethod
    def run_sync_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        pass

    # -- Subprocesses ---------------------------------------------------------------

    @classmethod
    @abstractmethod
    async def open_process(
        cls,
        command: StrOrBytesPath | Sequence[StrOrBytesPath],
        *,
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        **kwargs: Any,
    ) -> Process:
        pass

    @classmethod
    @abstractmethod
    def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
        pass

    # -- Creating sockets and resolving names ---------------------------------------

    @classmethod
    @abstractmethod
    async def connect_tcp(
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
    ) -> SocketStream:
        pass

    @classmethod
    @abstractmethod
    async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
        pass

    @classmethod
    @abstractmethod
    def create_tcp_listener(cls, sock: socket) -> SocketListener:
        pass

    @classmethod
    @abstractmethod
    def create_unix_listener(cls, sock: socket) -> SocketListener:
        pass

    @classmethod
    @abstractmethod
    async def create_udp_socket(
        cls,
        family: AddressFamily,
        local_address: IPSockAddrType | None,
        remote_address: IPSockAddrType | None,
        reuse_port: bool,
    ) -> UDPSocket | ConnectedUDPSocket:
        pass

    # The two overloads tie the return type to ``remote_path``: ``None`` gives a
    # ``UNIXDatagramSocket``, a path gives a ``ConnectedUNIXDatagramSocket``.
    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: None
    ) -> UNIXDatagramSocket: ...

    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str | bytes
    ) -> ConnectedUNIXDatagramSocket: ...

    @classmethod
    @abstractmethod
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str | bytes | None
    ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
        pass

    @classmethod
    @abstractmethod
    async def getaddrinfo(
        cls,
        host: bytes | str | None,
        port: str | int | None,
        *,
        family: int | AddressFamily = 0,
        type: int | SocketKind = 0,
        proto: int = 0,
        flags: int = 0,
    ) -> Sequence[
        tuple[
            AddressFamily,
            SocketKind,
            int,
            str,
            tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
        ]
    ]:
        pass

    @classmethod
    @abstractmethod
    async def getnameinfo(
        cls, sockaddr: IPSockAddrType, flags: int = 0
    ) -> tuple[str, str]:
        pass

    # -- Hooks taking a file-descriptor-like object ---------------------------------

    @classmethod
    @abstractmethod
    async def wait_readable(cls, obj: FileDescriptorLike) -> None:
        pass

    @classmethod
    @abstractmethod
    async def wait_writable(cls, obj: FileDescriptorLike) -> None:
        pass

    @classmethod
    @abstractmethod
    def notify_closing(cls, obj: FileDescriptorLike) -> None:
        pass

    # -- Wrapping existing ``socket`` objects ---------------------------------------

    @classmethod
    @abstractmethod
    async def wrap_listener_socket(cls, sock: socket) -> SocketListener:
        pass

    @classmethod
    @abstractmethod
    async def wrap_stream_socket(cls, sock: socket) -> SocketStream:
        pass

    @classmethod
    @abstractmethod
    async def wrap_unix_stream_socket(cls, sock: socket) -> UNIXSocketStream:
        pass

    @classmethod
    @abstractmethod
    async def wrap_udp_socket(cls, sock: socket) -> UDPSocket:
        pass

    @classmethod
    @abstractmethod
    async def wrap_connected_udp_socket(cls, sock: socket) -> ConnectedUDPSocket:
        pass

    @classmethod
    @abstractmethod
    async def wrap_unix_datagram_socket(cls, sock: socket) -> UNIXDatagramSocket:
        pass

    @classmethod
    @abstractmethod
    async def wrap_connected_unix_datagram_socket(
        cls, sock: socket
    ) -> ConnectedUNIXDatagramSocket:
        pass

    # -- Thread limiter, signals, task introspection and testing --------------------

    @classmethod
    @abstractmethod
    def current_default_thread_limiter(cls) -> CapacityLimiter:
        pass

    @classmethod
    @abstractmethod
    def open_signal_receiver(
        cls, *signals: Signals
    ) -> AbstractContextManager[AsyncIterator[Signals]]:
        pass

    @classmethod
    @abstractmethod
    def get_current_task(cls) -> TaskInfo:
        pass

    @classmethod
    @abstractmethod
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
        pass

    @classmethod
    @abstractmethod
    async def wait_all_tasks_blocked(cls) -> None:
        pass

    @classmethod
    @abstractmethod
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
        pass