Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_eventloop.py: 76%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

213 statements  

1from __future__ import annotations 

2 

3import math 

4import sys 

5from abc import ABCMeta, abstractmethod 

6from collections.abc import AsyncIterator, Awaitable, Callable, Sequence 

7from contextlib import AbstractContextManager 

8from os import PathLike 

9from signal import Signals 

10from socket import AddressFamily, SocketKind, socket 

11from typing import ( 

12 IO, 

13 TYPE_CHECKING, 

14 Any, 

15 TypeVar, 

16 Union, 

17 overload, 

18) 

19 

20if sys.version_info >= (3, 11): 

21 from typing import TypeVarTuple, Unpack 

22else: 

23 from typing_extensions import TypeVarTuple, Unpack 

24 

25if sys.version_info >= (3, 10): 

26 from typing import TypeAlias 

27else: 

28 from typing_extensions import TypeAlias 

29 

30if TYPE_CHECKING: 

31 from _typeshed import FileDescriptorLike 

32 

33 from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore 

34 from .._core._tasks import CancelScope 

35 from .._core._testing import TaskInfo 

36 from ..from_thread import BlockingPortal 

37 from ._sockets import ( 

38 ConnectedUDPSocket, 

39 ConnectedUNIXDatagramSocket, 

40 IPSockAddrType, 

41 SocketListener, 

42 SocketStream, 

43 UDPSocket, 

44 UNIXDatagramSocket, 

45 UNIXSocketStream, 

46 ) 

47 from ._subprocesses import Process 

48 from ._tasks import TaskGroup 

49 from ._testing import TestRunner 

50 

# Return type of the callable handed to a backend hook; the hook returns the same type.
T_Retval = TypeVar("T_Retval")
# Types of the positional arguments of that callable (unpacked in the hook signatures).
PosArgsT = TypeVarTuple("PosArgsT")
# A filesystem path given as text, as bytes, or as an ``os.PathLike`` of either.
StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]

54 

55 

class AsyncBackend(metaclass=ABCMeta):
    """
    Abstract interface of an event loop backend.

    Every member is a class method. Only :meth:`checkpoint_if_cancelled` and
    :meth:`cancel_shielded_checkpoint` have a default implementation, and both are
    built on other hooks of this class; all remaining methods are abstract.

    The docstrings of the abstract hooks describe what their signatures state.
    Anything beyond that is marked ``NOTE(review)`` and should be confirmed against
    the concrete backends.
    """

    @classmethod
    @abstractmethod
    def run(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        options: dict[str, Any],
    ) -> T_Retval:
        """
        Run the given coroutine function in an asynchronous event loop.

        The current thread must not be already running an event loop.

        :param func: a coroutine function
        :param args: positional arguments to ``func``
        :param kwargs: keyword arguments to ``func``
        :param options: keyword arguments to call the backend ``run()`` implementation
            with
        :return: the return value of the coroutine function
        """

    @classmethod
    @abstractmethod
    def current_token(cls) -> object:
        """
        Return an object that allows other threads to run code inside the event loop.

        :return: a token object, specific to the event loop running in the current
            thread
        """

    @classmethod
    @abstractmethod
    def current_time(cls) -> float:
        """
        Return the current value of the event loop's internal clock.

        :return: the clock value (seconds)
        """

    @classmethod
    @abstractmethod
    def cancelled_exception_class(cls) -> type[BaseException]:
        """Return the exception class that is raised in a task if it's cancelled."""

    @classmethod
    @abstractmethod
    async def checkpoint(cls) -> None:
        """
        Check if the task has been cancelled, and allow rescheduling of other tasks.

        This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
        :meth:`cancel_shielded_checkpoint`.
        """

    @classmethod
    async def checkpoint_if_cancelled(cls) -> None:
        """
        Check if the current task group has been cancelled.

        This will check if the task has been cancelled, but will not allow other tasks
        to be scheduled if not.
        """
        # ``-inf`` is what current_effective_deadline() reports for a cancelled scope;
        # only in that case is the full checkpoint run.
        if cls.current_effective_deadline() == -math.inf:
            await cls.checkpoint()

    @classmethod
    async def cancel_shielded_checkpoint(cls) -> None:
        """
        Allow the rescheduling of other tasks.

        This will give other tasks the opportunity to run, but without checking if the
        current task group has been cancelled, unlike with :meth:`checkpoint`.
        """
        # The zero-length sleep runs inside a shielded cancel scope.
        with cls.create_cancel_scope(shield=True):
            await cls.sleep(0)

    @classmethod
    @abstractmethod
    async def sleep(cls, delay: float) -> None:
        """
        Pause the current task for the specified duration.

        :param delay: the duration, in seconds
        """

    @classmethod
    @abstractmethod
    def create_cancel_scope(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        """
        Create a new cancel scope.

        :param deadline: deadline of the scope (``math.inf`` by default)
        :param shield: shield setting of the scope (``False`` by default)
        :return: the new cancel scope
        """

    @classmethod
    @abstractmethod
    def current_effective_deadline(cls) -> float:
        """
        Return the nearest deadline among all the cancel scopes effective for the
        current task.

        :return:
            - a clock value from the event loop's internal clock
            - ``inf`` if there is no deadline in effect
            - ``-inf`` if the current scope has been cancelled
        :rtype: float
        """

    @classmethod
    @abstractmethod
    def create_task_group(cls) -> TaskGroup:
        """
        Create a new task group.

        :return: the new task group
        """

    @classmethod
    @abstractmethod
    def create_event(cls) -> Event:
        """
        Create a new event.

        :return: the new event
        """

    @classmethod
    @abstractmethod
    def create_lock(cls, *, fast_acquire: bool) -> Lock:
        """
        Create a new lock.

        :param fast_acquire: required here, whereas :meth:`create_semaphore` defaults
            its parameter of the same name to ``False``
        :return: the new lock
        """

    @classmethod
    @abstractmethod
    def create_semaphore(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        """
        Create a new semaphore.

        :param initial_value: the initial value of the semaphore
        :param max_value: the maximum value of the semaphore (``None`` by default)
        :param fast_acquire: ``False`` by default
        :return: the new semaphore
        """

    @classmethod
    @abstractmethod
    def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
        """
        Create a new capacity limiter.

        :param total_tokens: the total number of tokens of the limiter
        :return: the new capacity limiter
        """

    @classmethod
    @abstractmethod
    async def run_sync_in_worker_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        abandon_on_cancel: bool = False,
        limiter: CapacityLimiter | None = None,
    ) -> T_Retval:
        """
        Call the given function in a worker thread.

        :param func: a callable
        :param args: positional arguments to ``func``
        :param abandon_on_cancel: ``False`` by default. NOTE(review): presumably
            decides whether the call is abandoned when the task is cancelled --
            confirm against the backends
        :param limiter: an optional capacity limiter (``None`` by default)
        :return: the return value of ``func``
        """

    @classmethod
    @abstractmethod
    def check_cancelled(cls) -> None:
        """
        Backend hook for a synchronous cancellation check.

        NOTE(review): how a cancellation is reported (presumably by raising) is up to
        the backend -- confirm.
        """

    @classmethod
    @abstractmethod
    def run_async_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Run the given coroutine function on behalf of another thread.

        :param func: a coroutine function
        :param args: positional arguments to ``func``
        :param token: NOTE(review): presumably a token as returned by
            :meth:`current_token` -- confirm
        :return: the return value of the coroutine function
        """

    @classmethod
    @abstractmethod
    def run_sync_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Call the given function on behalf of another thread.

        :param func: a callable
        :param args: positional arguments to ``func``
        :param token: NOTE(review): presumably a token as returned by
            :meth:`current_token` -- confirm
        :return: the return value of ``func``
        """

    @classmethod
    @abstractmethod
    def create_blocking_portal(cls) -> BlockingPortal:
        """
        Create a new blocking portal.

        :return: the new blocking portal
        """

    @classmethod
    @abstractmethod
    async def open_process(
        cls,
        command: StrOrBytesPath | Sequence[StrOrBytesPath],
        *,
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        **kwargs: Any,
    ) -> Process:
        """
        Start a subprocess.

        :param command: a single path, or a sequence of paths
        :param stdin: an integer, a file object or ``None``
        :param stdout: an integer, a file object or ``None``
        :param stderr: an integer, a file object or ``None``
        :param kwargs: additional keyword arguments; their meaning is defined by the
            backend
        :return: the process object
        """

    @classmethod
    @abstractmethod
    def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
        """
        Backend hook that receives the set of worker processes.

        NOTE(review): presumably arranges for ``workers`` to be shut down when the
        event loop exits -- confirm against the backends.

        :param workers: the set of worker processes
        """

    @classmethod
    @abstractmethod
    async def connect_tcp(
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
    ) -> SocketStream:
        """
        Open a TCP connection.

        :param host: the host to connect to
        :param port: the port to connect to
        :param local_address: an optional local address (``None`` by default)
        :return: the socket stream
        """

    @classmethod
    @abstractmethod
    async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
        """
        Open a UNIX socket connection.

        :param path: path of the socket, as text or bytes
        :return: the socket stream
        """

    @classmethod
    @abstractmethod
    def create_tcp_listener(cls, sock: socket) -> SocketListener:
        """
        Create a TCP listener from a socket.

        :param sock: the socket to build the listener from
        :return: the socket listener
        """

    @classmethod
    @abstractmethod
    def create_unix_listener(cls, sock: socket) -> SocketListener:
        """
        Create a UNIX socket listener from a socket.

        :param sock: the socket to build the listener from
        :return: the socket listener
        """

    @classmethod
    @abstractmethod
    async def create_udp_socket(
        cls,
        family: AddressFamily,
        local_address: IPSockAddrType | None,
        remote_address: IPSockAddrType | None,
        reuse_port: bool,
    ) -> UDPSocket | ConnectedUDPSocket:
        """
        Create a UDP socket.

        :param family: the address family
        :param local_address: a local address, or ``None``
        :param remote_address: a remote address, or ``None``
        :param reuse_port: NOTE(review): presumably enables port reuse on the socket
            -- confirm
        :return: a UDP socket or a connected UDP socket. NOTE(review): presumably the
            connected kind when ``remote_address`` is given -- confirm
        """

    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: None
    ) -> UNIXDatagramSocket: ...

    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str | bytes
    ) -> ConnectedUNIXDatagramSocket: ...

    @classmethod
    @abstractmethod
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str | bytes | None
    ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
        """
        Create a UNIX datagram socket from a raw socket.

        :param raw_socket: the socket to build on
        :param remote_path: a remote path as text or bytes, or ``None``
        :return: per the overloads, a :class:`UNIXDatagramSocket` when ``remote_path``
            is ``None`` and a :class:`ConnectedUNIXDatagramSocket` otherwise
        """

    @classmethod
    @abstractmethod
    async def getaddrinfo(
        cls,
        host: bytes | str | None,
        port: str | int | None,
        *,
        family: int | AddressFamily = 0,
        type: int | SocketKind = 0,
        proto: int = 0,
        flags: int = 0,
    ) -> Sequence[
        tuple[
            AddressFamily,
            SocketKind,
            int,
            str,
            tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
        ]
    ]:
        """
        Look up address information for a host and port.

        NOTE(review): parameters and result look like those of
        :func:`socket.getaddrinfo` -- confirm.

        :param host: the host, as bytes or text, or ``None``
        :param port: the port, as text or integer, or ``None``
        :param family: ``0`` by default
        :param type: ``0`` by default
        :param proto: ``0`` by default
        :param flags: ``0`` by default
        :return: a sequence of 5-tuples: address family, socket kind, an integer, a
            string and a socket address tuple
        """

    @classmethod
    @abstractmethod
    async def getnameinfo(
        cls, sockaddr: IPSockAddrType, flags: int = 0
    ) -> tuple[str, str]:
        """
        Look up name information for a socket address.

        :param sockaddr: the socket address
        :param flags: ``0`` by default
        :return: a tuple of two strings. NOTE(review): presumably (host, service), as
            with :func:`socket.getnameinfo` -- confirm
        """

    @classmethod
    @abstractmethod
    async def wait_readable(cls, obj: FileDescriptorLike) -> None:
        """
        Wait until the given object is readable.

        :param obj: a file descriptor, or an object that provides one
        """

    @classmethod
    @abstractmethod
    async def wait_writable(cls, obj: FileDescriptorLike) -> None:
        """
        Wait until the given object is writable.

        :param obj: a file descriptor, or an object that provides one
        """

    @classmethod
    @abstractmethod
    def notify_closing(cls, obj: FileDescriptorLike) -> None:
        """
        Tell the backend that the given object is about to be closed.

        NOTE(review): presumably wakes up tasks blocked in :meth:`wait_readable` or
        :meth:`wait_writable` on ``obj`` -- confirm.

        :param obj: a file descriptor, or an object that provides one
        """

    @classmethod
    @abstractmethod
    async def wrap_listener_socket(cls, sock: socket) -> SocketListener:
        """
        Wrap an existing socket as a socket listener.

        :param sock: the socket to wrap
        :return: the socket listener
        """

    @classmethod
    @abstractmethod
    async def wrap_stream_socket(cls, sock: socket) -> SocketStream:
        """
        Wrap an existing socket as a socket stream.

        :param sock: the socket to wrap
        :return: the socket stream
        """

    @classmethod
    @abstractmethod
    async def wrap_unix_stream_socket(cls, sock: socket) -> UNIXSocketStream:
        """
        Wrap an existing socket as a UNIX socket stream.

        :param sock: the socket to wrap
        :return: the UNIX socket stream
        """

    @classmethod
    @abstractmethod
    async def wrap_udp_socket(cls, sock: socket) -> UDPSocket:
        """
        Wrap an existing socket as a UDP socket.

        :param sock: the socket to wrap
        :return: the UDP socket
        """

    @classmethod
    @abstractmethod
    async def wrap_connected_udp_socket(cls, sock: socket) -> ConnectedUDPSocket:
        """
        Wrap an existing socket as a connected UDP socket.

        :param sock: the socket to wrap
        :return: the connected UDP socket
        """

    @classmethod
    @abstractmethod
    async def wrap_unix_datagram_socket(cls, sock: socket) -> UNIXDatagramSocket:
        """
        Wrap an existing socket as a UNIX datagram socket.

        :param sock: the socket to wrap
        :return: the UNIX datagram socket
        """

    @classmethod
    @abstractmethod
    async def wrap_connected_unix_datagram_socket(
        cls, sock: socket
    ) -> ConnectedUNIXDatagramSocket:
        """
        Wrap an existing socket as a connected UNIX datagram socket.

        :param sock: the socket to wrap
        :return: the connected UNIX datagram socket
        """

    @classmethod
    @abstractmethod
    def current_default_thread_limiter(cls) -> CapacityLimiter:
        """
        Return the current default thread limiter.

        :return: the capacity limiter
        """

    @classmethod
    @abstractmethod
    def open_signal_receiver(
        cls, *signals: Signals
    ) -> AbstractContextManager[AsyncIterator[Signals]]:
        """
        Open a receiver for the given signals.

        :param signals: the signals to receive
        :return: a context manager that provides an asynchronous iterator of signals
        """

    @classmethod
    @abstractmethod
    def get_current_task(cls) -> TaskInfo:
        """
        Return information about the current task.

        :return: the task information object
        """

    @classmethod
    @abstractmethod
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
        """
        Return information about the running tasks.

        :return: a sequence of task information objects
        """

    @classmethod
    @abstractmethod
    async def wait_all_tasks_blocked(cls) -> None:
        """
        Wait until all tasks are blocked.

        NOTE(review): presumably "all tasks" excludes the calling task -- confirm.
        """

    @classmethod
    @abstractmethod
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
        """
        Create a new test runner.

        :param options: options for the test runner; their meaning is defined by the
            backend
        :return: the new test runner
        """