Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_eventloop.py: 76%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

205 statements  

1from __future__ import annotations 

2 

3import math 

4import sys 

5from abc import ABCMeta, abstractmethod 

6from collections.abc import AsyncIterator, Awaitable, Callable, Sequence 

7from contextlib import AbstractContextManager 

8from os import PathLike 

9from signal import Signals 

10from socket import AddressFamily, SocketKind, socket 

11from typing import ( 

12 IO, 

13 TYPE_CHECKING, 

14 Any, 

15 TypeAlias, 

16 TypeVar, 

17 overload, 

18) 

19 

20if sys.version_info >= (3, 11): 

21 from typing import TypeVarTuple, Unpack 

22else: 

23 from typing_extensions import TypeVarTuple, Unpack 

24 

25if TYPE_CHECKING: 

26 from _typeshed import FileDescriptorLike 

27 

28 from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore 

29 from .._core._tasks import CancelScope 

30 from .._core._testing import TaskInfo 

31 from ._sockets import ( 

32 ConnectedUDPSocket, 

33 ConnectedUNIXDatagramSocket, 

34 IPSockAddrType, 

35 SocketListener, 

36 SocketStream, 

37 UDPSocket, 

38 UNIXDatagramSocket, 

39 UNIXSocketStream, 

40 ) 

41 from ._subprocesses import Process 

42 from ._tasks import TaskGroup 

43 from ._testing import TestRunner 

44 

45T_Retval = TypeVar("T_Retval") 

46PosArgsT = TypeVarTuple("PosArgsT") 

47StrOrBytesPath: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes] 

48 

49 

class AsyncBackend(metaclass=ABCMeta):
    """
    Abstract interface that an event loop backend has to implement.

    Every operation is a class method. Only :meth:`checkpoint_if_cancelled` and
    :meth:`cancel_shielded_checkpoint` come with a default implementation (built on
    top of the abstract primitives); everything else must be provided by the
    concrete backend.
    """

    @classmethod
    @abstractmethod
    def run(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        options: dict[str, Any],
    ) -> T_Retval:
        """
        Run the given coroutine function in an asynchronous event loop.

        The current thread must not be already running an event loop.

        :param func: a coroutine function
        :param args: positional arguments to ``func``
        :param kwargs: keyword arguments to ``func``
        :param options: keyword arguments to call the backend ``run()`` implementation
            with
        :return: the return value of the coroutine function
        """

    @classmethod
    @abstractmethod
    def current_token(cls) -> object:
        """
        Return an object that allows other threads to run code inside the event loop.

        :return: a token object, specific to the event loop running in the current
            thread
        """

    @classmethod
    @abstractmethod
    def current_time(cls) -> float:
        """
        Return the current value of the event loop's internal clock.

        :return: the clock value (seconds)
        """

    @classmethod
    @abstractmethod
    def cancelled_exception_class(cls) -> type[BaseException]:
        """Return the exception class that is raised in a task if it's cancelled."""

    @classmethod
    @abstractmethod
    async def checkpoint(cls) -> None:
        """
        Check if the task has been cancelled, and allow rescheduling of other tasks.

        This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
        :meth:`cancel_shielded_checkpoint`.
        """

    @classmethod
    async def checkpoint_if_cancelled(cls) -> None:
        """
        Check if the current task group has been cancelled.

        This will check if the task has been cancelled, but will not allow other tasks
        to be scheduled if not.
        """
        # An effective deadline of -inf is how current_effective_deadline() reports
        # that the current scope has been cancelled; only then do a real checkpoint.
        if cls.current_effective_deadline() == -math.inf:
            await cls.checkpoint()

    @classmethod
    async def cancel_shielded_checkpoint(cls) -> None:
        """
        Allow the rescheduling of other tasks.

        This will give other tasks the opportunity to run, but without checking if the
        current task group has been cancelled, unlike with :meth:`checkpoint`.
        """
        # The zero-length sleep runs inside a scope created with shield=True.
        with cls.create_cancel_scope(shield=True):
            await cls.sleep(0)

    @classmethod
    @abstractmethod
    async def sleep(cls, delay: float) -> None:
        """
        Pause the current task for the specified duration.

        :param delay: the duration, in seconds
        """

    @classmethod
    @abstractmethod
    def create_cancel_scope(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        """
        Create a cancel scope.

        :param deadline: deadline for the scope (``math.inf`` by default)
        :param shield: shielding flag for the scope (``False`` by default)
        :return: a new cancel scope
        """

    @classmethod
    @abstractmethod
    def current_effective_deadline(cls) -> float:
        """
        Return the nearest deadline among all the cancel scopes effective for the
        current task.

        :return:
            - a clock value from the event loop's internal clock
            - ``inf`` if there is no deadline in effect
            - ``-inf`` if the current scope has been cancelled
        :rtype: float
        """

    @classmethod
    @abstractmethod
    def create_task_group(cls) -> TaskGroup:
        """Create a new task group."""

    @classmethod
    @abstractmethod
    def create_event(cls) -> Event:
        """Create a new event object."""

    @classmethod
    @abstractmethod
    def create_lock(cls, *, fast_acquire: bool) -> Lock:
        """
        Create a new lock.

        :param fast_acquire: flag passed on to the lock; its exact effect is defined
            by the backend's lock implementation
        """

    @classmethod
    @abstractmethod
    def create_semaphore(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        """
        Create a new semaphore.

        :param initial_value: the initial value of the semaphore
        :param max_value: optional maximum value (``None`` by default)
        :param fast_acquire: flag passed on to the semaphore; its exact effect is
            defined by the backend's semaphore implementation
        """

    @classmethod
    @abstractmethod
    def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
        """
        Create a new capacity limiter.

        :param total_tokens: the total number of tokens of the limiter
        """

    @classmethod
    @abstractmethod
    async def run_sync_in_worker_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        abandon_on_cancel: bool = False,
        limiter: CapacityLimiter | None = None,
    ) -> T_Retval:
        """
        Call a synchronous function in a worker thread.

        :param func: a callable
        :param args: positional arguments to ``func``
        :param abandon_on_cancel: cancellation behaviour flag (``False`` by default);
            the precise semantics are defined by the backend
        :param limiter: optional capacity limiter (``None`` by default)
        :return: the return value of ``func``
        """

    @classmethod
    @abstractmethod
    def check_cancelled(cls) -> None:
        """
        Check whether the caller has been cancelled.

        How cancellation is reported is up to the backend implementation.
        """

    @classmethod
    @abstractmethod
    def run_async_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Call a coroutine function in the event loop from another thread.

        :param func: a coroutine function
        :param args: positional arguments to ``func``
        :param token: an event loop token (see :meth:`current_token`)
        :return: the return value of the coroutine function
        """

    @classmethod
    @abstractmethod
    def run_sync_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Call a synchronous function in the event loop from another thread.

        :param func: a callable
        :param args: positional arguments to ``func``
        :param token: an event loop token (see :meth:`current_token`)
        :return: the return value of ``func``
        """

    @classmethod
    @abstractmethod
    async def open_process(
        cls,
        command: StrOrBytesPath | Sequence[StrOrBytesPath],
        *,
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        **kwargs: Any,
    ) -> Process:
        """
        Start a subprocess.

        :param command: a single path-like/string command, or a sequence of them
        :param stdin: file descriptor number, file object or ``None``
        :param stdout: file descriptor number, file object or ``None``
        :param stderr: file descriptor number, file object or ``None``
        :param kwargs: additional keyword arguments; their interpretation is up to
            the backend
        :return: an object representing the started process
        """

    @classmethod
    @abstractmethod
    def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
        """
        Arrange for the given process pool workers to exit at shutdown.

        :param workers: the set of worker processes
        """

    @classmethod
    @abstractmethod
    async def connect_tcp(
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
    ) -> SocketStream:
        """
        Open a TCP connection.

        :param host: the host to connect to
        :param port: the port to connect to
        :param local_address: optional local address (``None`` by default)
        :return: a socket stream
        """

    @classmethod
    @abstractmethod
    async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
        """
        Open a connection to a UNIX socket.

        :param path: path of the socket
        :return: a UNIX socket stream
        """

    @classmethod
    @abstractmethod
    def create_tcp_listener(cls, sock: socket) -> SocketListener:
        """Create a TCP listener from the given socket object."""

    @classmethod
    @abstractmethod
    def create_unix_listener(cls, sock: socket) -> SocketListener:
        """Create a UNIX socket listener from the given socket object."""

    @classmethod
    @abstractmethod
    async def create_udp_socket(
        cls,
        family: AddressFamily,
        local_address: IPSockAddrType | None,
        remote_address: IPSockAddrType | None,
        reuse_port: bool,
    ) -> UDPSocket | ConnectedUDPSocket:
        """
        Create a UDP socket.

        :param family: the address family
        :param local_address: local address, or ``None``
        :param remote_address: remote address, or ``None``
        :param reuse_port: port reuse flag
        :return: a UDP socket or a connected UDP socket
        """

    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: None
    ) -> UNIXDatagramSocket: ...

    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str | bytes
    ) -> ConnectedUNIXDatagramSocket: ...

    @classmethod
    @abstractmethod
    async def create_unix_datagram_socket(
        cls, raw_socket: socket, remote_path: str | bytes | None
    ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
        """
        Create a UNIX datagram socket from a raw socket object.

        Per the overloads, a ``None`` remote path yields an unconnected socket and a
        ``str``/``bytes`` remote path yields a connected one.

        :param raw_socket: the underlying socket object
        :param remote_path: remote socket path, or ``None``
        """

    @classmethod
    @abstractmethod
    async def getaddrinfo(
        cls,
        host: bytes | str | None,
        port: str | int | None,
        *,
        family: int | AddressFamily = 0,
        type: int | SocketKind = 0,
        proto: int = 0,
        flags: int = 0,
    ) -> Sequence[
        tuple[
            AddressFamily,
            SocketKind,
            int,
            str,
            tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
        ]
    ]:
        """
        Look up address information for the given host and port.

        The parameters mirror those of :func:`socket.getaddrinfo`.

        :return: a sequence of 5-tuples; the last element of each is a socket address
        """

    @classmethod
    @abstractmethod
    async def getnameinfo(
        cls, sockaddr: IPSockAddrType, flags: int = 0
    ) -> tuple[str, str]:
        """
        Look up name information for the given socket address.

        The parameters mirror those of :func:`socket.getnameinfo`.

        :return: a tuple of two strings
        """

    @classmethod
    @abstractmethod
    async def wait_readable(cls, obj: FileDescriptorLike) -> None:
        """Wait until the given file descriptor-like object is readable."""

    @classmethod
    @abstractmethod
    async def wait_writable(cls, obj: FileDescriptorLike) -> None:
        """Wait until the given file descriptor-like object is writable."""

    @classmethod
    @abstractmethod
    def notify_closing(cls, obj: FileDescriptorLike) -> None:
        """Notify the backend that the given file descriptor-like object is closing."""

    @classmethod
    @abstractmethod
    async def wrap_listener_socket(cls, sock: socket) -> SocketListener:
        """Wrap an existing socket object as a socket listener."""

    @classmethod
    @abstractmethod
    async def wrap_stream_socket(cls, sock: socket) -> SocketStream:
        """Wrap an existing socket object as a socket stream."""

    @classmethod
    @abstractmethod
    async def wrap_unix_stream_socket(cls, sock: socket) -> UNIXSocketStream:
        """Wrap an existing socket object as a UNIX socket stream."""

    @classmethod
    @abstractmethod
    async def wrap_udp_socket(cls, sock: socket) -> UDPSocket:
        """Wrap an existing socket object as a UDP socket."""

    @classmethod
    @abstractmethod
    async def wrap_connected_udp_socket(cls, sock: socket) -> ConnectedUDPSocket:
        """Wrap an existing socket object as a connected UDP socket."""

    @classmethod
    @abstractmethod
    async def wrap_unix_datagram_socket(cls, sock: socket) -> UNIXDatagramSocket:
        """Wrap an existing socket object as a UNIX datagram socket."""

    @classmethod
    @abstractmethod
    async def wrap_connected_unix_datagram_socket(
        cls, sock: socket
    ) -> ConnectedUNIXDatagramSocket:
        """Wrap an existing socket object as a connected UNIX datagram socket."""

    @classmethod
    @abstractmethod
    def current_default_thread_limiter(cls) -> CapacityLimiter:
        """Return the current default capacity limiter for threads."""

    @classmethod
    @abstractmethod
    def open_signal_receiver(
        cls, *signals: Signals
    ) -> AbstractContextManager[AsyncIterator[Signals]]:
        """
        Open a receiver for the given signals.

        :param signals: the signals to receive
        :return: a context manager producing an asynchronous iterator of signals
        """

    @classmethod
    @abstractmethod
    def get_current_task(cls) -> TaskInfo:
        """Return information about the current task."""

    @classmethod
    @abstractmethod
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
        """Return information about the running tasks."""

    @classmethod
    @abstractmethod
    async def wait_all_tasks_blocked(cls) -> None:
        """
        Wait until all tasks are blocked.

        The exact criteria for "blocked" are defined by the backend implementation.
        """

    @classmethod
    @abstractmethod
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
        """
        Create a test runner.

        :param options: backend options for the test runner
        """