Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_eventloop.py: 75%
169 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
1from __future__ import annotations
3import math
4from abc import ABCMeta, abstractmethod
5from collections.abc import AsyncIterator, Awaitable, Mapping
6from os import PathLike
7from signal import Signals
8from socket import AddressFamily, SocketKind, socket
9from typing import (
10 IO,
11 TYPE_CHECKING,
12 Any,
13 Callable,
14 ContextManager,
15 Sequence,
16 TypeVar,
17 overload,
18)
20if TYPE_CHECKING:
21 from typing import Literal
23 from .._core._synchronization import CapacityLimiter, Event
24 from .._core._tasks import CancelScope
25 from .._core._testing import TaskInfo
26 from ..from_thread import BlockingPortal
27 from ._sockets import (
28 ConnectedUDPSocket,
29 ConnectedUNIXDatagramSocket,
30 IPSockAddrType,
31 SocketListener,
32 SocketStream,
33 UDPSocket,
34 UNIXDatagramSocket,
35 UNIXSocketStream,
36 )
37 from ._subprocesses import Process
38 from ._tasks import TaskGroup
39 from ._testing import TestRunner
41T_Retval = TypeVar("T_Retval")
class AsyncBackend(metaclass=ABCMeta):
    """
    Abstract interface of an event loop backend.

    Every member is a class method, so a backend is used through its class and is
    never instantiated by the code in this module. All members are abstract except
    :meth:`checkpoint_if_cancelled` and :meth:`cancel_shielded_checkpoint`, which are
    implemented here on top of the abstract primitives.
    """

    @classmethod
    @abstractmethod
    def run(
        cls,
        func: Callable[..., Awaitable[T_Retval]],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
        options: dict[str, Any],
    ) -> T_Retval:
        """
        Run the given coroutine function in an asynchronous event loop.

        The current thread must not be already running an event loop.

        :param func: a coroutine function
        :param args: positional arguments to ``func``
        :param kwargs: keyword arguments to ``func``
        :param options: keyword arguments to call the backend ``run()`` implementation
            with
        :return: the return value of the coroutine function
        """

    @classmethod
    @abstractmethod
    def current_token(cls) -> object:
        """
        Return the backend's token for the current context.

        :return: an opaque object; presumably the value expected as ``token`` by
            :meth:`run_async_from_thread` and :meth:`run_sync_from_thread` (to be
            confirmed against the concrete backends)
        """

    @classmethod
    @abstractmethod
    def current_time(cls) -> float:
        """
        Return the current value of the event loop's internal clock.

        :return: the clock value (seconds)
        """

    @classmethod
    @abstractmethod
    def cancelled_exception_class(cls) -> type[BaseException]:
        """Return the exception class that is raised in a task if it's cancelled."""

    @classmethod
    @abstractmethod
    async def checkpoint(cls) -> None:
        """
        Check if the task has been cancelled, and allow rescheduling of other tasks.

        This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
        :meth:`cancel_shielded_checkpoint`.
        """

    @classmethod
    async def checkpoint_if_cancelled(cls) -> None:
        """
        Check if the current task group has been cancelled.

        This will check if the task has been cancelled, but will not allow other tasks
        to be scheduled if not.

        """
        # ``-inf`` is the value current_effective_deadline() reports for a scope that
        # has already been cancelled; only then is a real checkpoint performed.
        if cls.current_effective_deadline() == -math.inf:
            await cls.checkpoint()

    @classmethod
    async def cancel_shielded_checkpoint(cls) -> None:
        """
        Allow the rescheduling of other tasks.

        This will give other tasks the opportunity to run, but without checking if the
        current task group has been cancelled, unlike with :meth:`checkpoint`.

        """
        # A zero-length sleep inside a shielded cancel scope.
        with cls.create_cancel_scope(shield=True):
            await cls.sleep(0)

    @classmethod
    @abstractmethod
    async def sleep(cls, delay: float) -> None:
        """
        Pause the current task for the specified duration.

        :param delay: the duration, in seconds
        """

    @classmethod
    @abstractmethod
    def create_cancel_scope(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        """
        Create a cancel scope for this backend.

        :param deadline: deadline for the scope (defaults to ``inf``)
        :param shield: shield flag for the scope (defaults to ``False``)
        :return: the new cancel scope
        """

    @classmethod
    @abstractmethod
    def current_effective_deadline(cls) -> float:
        """
        Return the nearest deadline among all the cancel scopes effective for the
        current task.

        :return:
            - a clock value from the event loop's internal clock
            - ``inf`` if there is no deadline in effect
            - ``-inf`` if the current scope has been cancelled
        :rtype: float
        """

    @classmethod
    @abstractmethod
    def create_task_group(cls) -> TaskGroup:
        """Create a task group for this backend."""

    @classmethod
    @abstractmethod
    def create_event(cls) -> Event:
        """Create an event object for this backend."""

    @classmethod
    @abstractmethod
    def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
        """
        Create a capacity limiter for this backend.

        :param total_tokens: the token count to create the limiter with
        """

    @classmethod
    @abstractmethod
    async def run_sync_in_worker_thread(
        cls,
        func: Callable[..., T_Retval],
        args: tuple[Any, ...],
        cancellable: bool = False,
        limiter: CapacityLimiter | None = None,
    ) -> T_Retval:
        """
        Backend hook for running a synchronous callable in a worker thread.

        :param func: a callable
        :param args: positional arguments for ``func``
        :param cancellable: cancellation flag (defaults to ``False``); exact semantics
            are defined by the concrete backend
        :param limiter: capacity limiter to use, or ``None``
        :return: a value of the callable's return type
        """

    @classmethod
    @abstractmethod
    def run_async_from_thread(
        cls,
        func: Callable[..., Awaitable[T_Retval]],
        # Any number of positional arguments, matching the other thread helpers.
        args: tuple[Any, ...],
        token: object,
    ) -> T_Retval:
        """
        Backend hook for running a coroutine function from another thread.

        :param func: a callable returning an awaitable
        :param args: positional arguments for ``func``
        :param token: an opaque backend token (presumably as returned by
            :meth:`current_token` -- confirm against the concrete backends)
        :return: a value of the awaitable's result type
        """

    @classmethod
    @abstractmethod
    def run_sync_from_thread(
        cls, func: Callable[..., T_Retval], args: tuple[Any, ...], token: object
    ) -> T_Retval:
        """
        Backend hook for running a synchronous callable from another thread.

        :param func: a callable
        :param args: positional arguments for ``func``
        :param token: an opaque backend token (presumably as returned by
            :meth:`current_token` -- confirm against the concrete backends)
        :return: a value of the callable's return type
        """

    @classmethod
    @abstractmethod
    def create_blocking_portal(cls) -> BlockingPortal:
        """Create a blocking portal for this backend."""

    # Typing-only overloads: a ``str``/``bytes`` command pairs with ``shell=True`` and
    # a sequence of arguments pairs with ``shell=False``.
    @classmethod
    @overload
    async def open_process(
        cls,
        command: str | bytes,
        *,
        shell: Literal[True],
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        cwd: str | bytes | PathLike[str] | None = None,
        env: Mapping[str, str] | None = None,
        start_new_session: bool = False,
    ) -> Process:
        pass

    @classmethod
    @overload
    async def open_process(
        cls,
        command: Sequence[str | bytes],
        *,
        shell: Literal[False],
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        cwd: str | bytes | PathLike[str] | None = None,
        env: Mapping[str, str] | None = None,
        start_new_session: bool = False,
    ) -> Process:
        pass

    @classmethod
    @abstractmethod
    async def open_process(
        cls,
        command: str | bytes | Sequence[str | bytes],
        *,
        shell: bool,
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        cwd: str | bytes | PathLike[str] | None = None,
        env: Mapping[str, str] | None = None,
        start_new_session: bool = False,
    ) -> Process:
        """
        Backend hook for opening a subprocess.

        :param command: a single ``str``/``bytes`` command or a sequence of them
        :param shell: shell flag; the overloads pair ``True`` with a single command
            and ``False`` with a sequence
        :param stdin: an integer, a file object or ``None``
        :param stdout: an integer, a file object or ``None``
        :param stderr: an integer, a file object or ``None``
        :param cwd: working directory, or ``None``
        :param env: environment mapping, or ``None``
        :param start_new_session: session flag (defaults to ``False``)
        :return: the process object
        """

    @classmethod
    @abstractmethod
    def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
        """
        Backend hook taking the set of process pool workers.

        :param workers: the set of worker processes
        """

    @classmethod
    @abstractmethod
    async def connect_tcp(
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
    ) -> SocketStream:
        """
        Backend hook for opening a TCP connection.

        :param host: the host
        :param port: the port number
        :param local_address: optional local address, or ``None``
        :return: the socket stream
        """

    @classmethod
    @abstractmethod
    async def connect_unix(cls, path: str) -> UNIXSocketStream:
        """
        Backend hook for opening a UNIX socket connection.

        :param path: the socket path
        :return: the UNIX socket stream
        """

    @classmethod
    @abstractmethod
    def create_tcp_listener(cls, sock: socket) -> SocketListener:
        """
        Backend hook returning a TCP listener for a socket.

        :param sock: a socket object
        """

    @classmethod
    @abstractmethod
    def create_unix_listener(cls, sock: socket) -> SocketListener:
        """
        Backend hook returning a UNIX listener for a socket.

        :param sock: a socket object
        """

    @classmethod
    @abstractmethod
    async def create_udp_socket(
        cls,
        family: AddressFamily,
        local_address: IPSockAddrType | None,
        remote_address: IPSockAddrType | None,
        reuse_port: bool,
    ) -> UDPSocket | ConnectedUDPSocket:
        """
        Backend hook for creating a UDP socket.

        :param family: the address family
        :param local_address: local address, or ``None``
        :param remote_address: remote address, or ``None``
        :param reuse_port: port reuse flag
        :return: a plain or a connected UDP socket
        """

    # Typing-only overloads: ``remote_path=None`` yields an unconnected socket, a
    # ``str`` path yields a connected one.
    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: None
    ) -> UNIXDatagramSocket:
        ...

    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str
    ) -> ConnectedUNIXDatagramSocket:
        ...

    @classmethod
    @abstractmethod
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str | None
    ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
        """
        Backend hook for wrapping a raw socket as a UNIX datagram socket.

        :param raw_socket: a socket object
        :param remote_path: remote path, or ``None``
        :return: per the overloads, an unconnected socket for ``None`` and a connected
            one for a ``str`` path
        """

    @classmethod
    @abstractmethod
    async def getaddrinfo(
        cls,
        host: bytes | str | None,
        port: str | int | None,
        *,
        family: int | AddressFamily = 0,
        type: int | SocketKind = 0,
        proto: int = 0,
        flags: int = 0,
    ) -> list[
        tuple[
            AddressFamily,
            SocketKind,
            int,
            str,
            tuple[str, int] | tuple[str, int, int, int],
        ]
    ]:
        """
        Backend hook for address lookup.

        NOTE(review): the parameters and result shape look like those of
        :func:`socket.getaddrinfo` -- confirm against the concrete backends.

        :return: a list of 5-tuples whose last item is a 2- or 4-item address tuple
        """

    @classmethod
    @abstractmethod
    async def getnameinfo(
        cls, sockaddr: IPSockAddrType, flags: int = 0
    ) -> tuple[str, str]:
        """
        Backend hook for reverse address lookup.

        :param sockaddr: the socket address
        :param flags: integer flags (defaults to ``0``)
        :return: a pair of strings
        """

    @classmethod
    @abstractmethod
    async def wait_socket_readable(cls, sock: socket) -> None:
        """
        Backend hook for waiting on a socket's readability.

        :param sock: a socket object
        """

    @classmethod
    @abstractmethod
    async def wait_socket_writable(cls, sock: socket) -> None:
        """
        Backend hook for waiting on a socket's writability.

        :param sock: a socket object
        """

    @classmethod
    @abstractmethod
    def current_default_thread_limiter(cls) -> CapacityLimiter:
        """Return the current default capacity limiter for threads."""

    @classmethod
    @abstractmethod
    def open_signal_receiver(
        cls, *signals: Signals
    ) -> ContextManager[AsyncIterator[Signals]]:
        """
        Backend hook for receiving signals.

        :param signals: the signals of interest
        :return: a context manager yielding an asynchronous iterator of signals
        """

    @classmethod
    @abstractmethod
    def get_current_task(cls) -> TaskInfo:
        """Return a :class:`TaskInfo` for the current task."""

    @classmethod
    @abstractmethod
    def get_running_tasks(cls) -> list[TaskInfo]:
        """Return a list of :class:`TaskInfo` objects for the running tasks."""

    @classmethod
    @abstractmethod
    async def wait_all_tasks_blocked(cls) -> None:
        """Backend hook for waiting until all tasks are blocked."""

    @classmethod
    @abstractmethod
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
        """
        Create a test runner for this backend.

        :param options: backend options for the runner
        """