Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_eventloop.py: 76%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

208 statements  

1from __future__ import annotations 

2 

3import math 

4import sys 

5from abc import ABCMeta, abstractmethod 

6from collections.abc import AsyncIterator, Awaitable, Callable, Sequence 

7from contextlib import AbstractContextManager 

8from os import PathLike 

9from signal import Signals 

10from socket import AddressFamily, SocketKind, socket 

11from typing import ( 

12 IO, 

13 TYPE_CHECKING, 

14 Any, 

15 TypeVar, 

16 Union, 

17 overload, 

18) 

19 

20if sys.version_info >= (3, 11): 

21 from typing import TypeVarTuple, Unpack 

22else: 

23 from typing_extensions import TypeVarTuple, Unpack 

24 

25if sys.version_info >= (3, 10): 

26 from typing import TypeAlias 

27else: 

28 from typing_extensions import TypeAlias 

29 

30if TYPE_CHECKING: 

31 from _typeshed import FileDescriptorLike 

32 

33 from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore 

34 from .._core._tasks import CancelScope 

35 from .._core._testing import TaskInfo 

36 from ._sockets import ( 

37 ConnectedUDPSocket, 

38 ConnectedUNIXDatagramSocket, 

39 IPSockAddrType, 

40 SocketListener, 

41 SocketStream, 

42 UDPSocket, 

43 UNIXDatagramSocket, 

44 UNIXSocketStream, 

45 ) 

46 from ._subprocesses import Process 

47 from ._tasks import TaskGroup 

48 from ._testing import TestRunner 

49 

50T_Retval = TypeVar("T_Retval") 

51PosArgsT = TypeVarTuple("PosArgsT") 

52StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"] 

53 

54 

55class AsyncBackend(metaclass=ABCMeta): 

56 @classmethod 

57 @abstractmethod 

58 def run( 

59 cls, 

60 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

61 args: tuple[Unpack[PosArgsT]], 

62 kwargs: dict[str, Any], 

63 options: dict[str, Any], 

64 ) -> T_Retval: 

65 """ 

66 Run the given coroutine function in an asynchronous event loop. 

67 

68 The current thread must not be already running an event loop. 

69 

70 :param func: a coroutine function 

71 :param args: positional arguments to ``func`` 

72 :param kwargs: positional arguments to ``func`` 

73 :param options: keyword arguments to call the backend ``run()`` implementation 

74 with 

75 :return: the return value of the coroutine function 

76 """ 

77 

78 @classmethod 

79 @abstractmethod 

80 def current_token(cls) -> object: 

81 """ 

82 Return an object that allows other threads to run code inside the event loop. 

83 

84 :return: a token object, specific to the event loop running in the current 

85 thread 

86 """ 

87 

88 @classmethod 

89 @abstractmethod 

90 def current_time(cls) -> float: 

91 """ 

92 Return the current value of the event loop's internal clock. 

93 

94 :return: the clock value (seconds) 

95 """ 

96 

97 @classmethod 

98 @abstractmethod 

99 def cancelled_exception_class(cls) -> type[BaseException]: 

100 """Return the exception class that is raised in a task if it's cancelled.""" 

101 

102 @classmethod 

103 @abstractmethod 

104 async def checkpoint(cls) -> None: 

105 """ 

106 Check if the task has been cancelled, and allow rescheduling of other tasks. 

107 

108 This is effectively the same as running :meth:`checkpoint_if_cancelled` and then 

109 :meth:`cancel_shielded_checkpoint`. 

110 """ 

111 

112 @classmethod 

113 async def checkpoint_if_cancelled(cls) -> None: 

114 """ 

115 Check if the current task group has been cancelled. 

116 

117 This will check if the task has been cancelled, but will not allow other tasks 

118 to be scheduled if not. 

119 

120 """ 

121 if cls.current_effective_deadline() == -math.inf: 

122 await cls.checkpoint() 

123 

124 @classmethod 

125 async def cancel_shielded_checkpoint(cls) -> None: 

126 """ 

127 Allow the rescheduling of other tasks. 

128 

129 This will give other tasks the opportunity to run, but without checking if the 

130 current task group has been cancelled, unlike with :meth:`checkpoint`. 

131 

132 """ 

133 with cls.create_cancel_scope(shield=True): 

134 await cls.sleep(0) 

135 

136 @classmethod 

137 @abstractmethod 

138 async def sleep(cls, delay: float) -> None: 

139 """ 

140 Pause the current task for the specified duration. 

141 

142 :param delay: the duration, in seconds 

143 """ 

144 

145 @classmethod 

146 @abstractmethod 

147 def create_cancel_scope( 

148 cls, *, deadline: float = math.inf, shield: bool = False 

149 ) -> CancelScope: 

150 pass 

151 

152 @classmethod 

153 @abstractmethod 

154 def current_effective_deadline(cls) -> float: 

155 """ 

156 Return the nearest deadline among all the cancel scopes effective for the 

157 current task. 

158 

159 :return: 

160 - a clock value from the event loop's internal clock 

161 - ``inf`` if there is no deadline in effect 

162 - ``-inf`` if the current scope has been cancelled 

163 :rtype: float 

164 """ 

165 

166 @classmethod 

167 @abstractmethod 

168 def create_task_group(cls) -> TaskGroup: 

169 pass 

170 

171 @classmethod 

172 @abstractmethod 

173 def create_event(cls) -> Event: 

174 pass 

175 

176 @classmethod 

177 @abstractmethod 

178 def create_lock(cls, *, fast_acquire: bool) -> Lock: 

179 pass 

180 

181 @classmethod 

182 @abstractmethod 

183 def create_semaphore( 

184 cls, 

185 initial_value: int, 

186 *, 

187 max_value: int | None = None, 

188 fast_acquire: bool = False, 

189 ) -> Semaphore: 

190 pass 

191 

192 @classmethod 

193 @abstractmethod 

194 def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter: 

195 pass 

196 

197 @classmethod 

198 @abstractmethod 

199 async def run_sync_in_worker_thread( 

200 cls, 

201 func: Callable[[Unpack[PosArgsT]], T_Retval], 

202 args: tuple[Unpack[PosArgsT]], 

203 abandon_on_cancel: bool = False, 

204 limiter: CapacityLimiter | None = None, 

205 ) -> T_Retval: 

206 pass 

207 

208 @classmethod 

209 @abstractmethod 

210 def check_cancelled(cls) -> None: 

211 pass 

212 

213 @classmethod 

214 @abstractmethod 

215 def run_async_from_thread( 

216 cls, 

217 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

218 args: tuple[Unpack[PosArgsT]], 

219 token: object, 

220 ) -> T_Retval: 

221 pass 

222 

223 @classmethod 

224 @abstractmethod 

225 def run_sync_from_thread( 

226 cls, 

227 func: Callable[[Unpack[PosArgsT]], T_Retval], 

228 args: tuple[Unpack[PosArgsT]], 

229 token: object, 

230 ) -> T_Retval: 

231 pass 

232 

233 @classmethod 

234 @abstractmethod 

235 async def open_process( 

236 cls, 

237 command: StrOrBytesPath | Sequence[StrOrBytesPath], 

238 *, 

239 stdin: int | IO[Any] | None, 

240 stdout: int | IO[Any] | None, 

241 stderr: int | IO[Any] | None, 

242 **kwargs: Any, 

243 ) -> Process: 

244 pass 

245 

246 @classmethod 

247 @abstractmethod 

248 def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None: 

249 pass 

250 

251 @classmethod 

252 @abstractmethod 

253 async def connect_tcp( 

254 cls, host: str, port: int, local_address: IPSockAddrType | None = None 

255 ) -> SocketStream: 

256 pass 

257 

258 @classmethod 

259 @abstractmethod 

260 async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream: 

261 pass 

262 

263 @classmethod 

264 @abstractmethod 

265 def create_tcp_listener(cls, sock: socket) -> SocketListener: 

266 pass 

267 

268 @classmethod 

269 @abstractmethod 

270 def create_unix_listener(cls, sock: socket) -> SocketListener: 

271 pass 

272 

273 @classmethod 

274 @abstractmethod 

275 async def create_udp_socket( 

276 cls, 

277 family: AddressFamily, 

278 local_address: IPSockAddrType | None, 

279 remote_address: IPSockAddrType | None, 

280 reuse_port: bool, 

281 ) -> UDPSocket | ConnectedUDPSocket: 

282 pass 

283 

284 @classmethod 

285 @overload 

286 async def create_unix_datagram_socket( 

287 cls, raw_socket: socket, remote_path: None 

288 ) -> UNIXDatagramSocket: ... 

289 

290 @classmethod 

291 @overload 

292 async def create_unix_datagram_socket( 

293 cls, raw_socket: socket, remote_path: str | bytes 

294 ) -> ConnectedUNIXDatagramSocket: ... 

295 

296 @classmethod 

297 @abstractmethod 

298 async def create_unix_datagram_socket( 

299 cls, raw_socket: socket, remote_path: str | bytes | None 

300 ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket: 

301 pass 

302 

303 @classmethod 

304 @abstractmethod 

305 async def getaddrinfo( 

306 cls, 

307 host: bytes | str | None, 

308 port: str | int | None, 

309 *, 

310 family: int | AddressFamily = 0, 

311 type: int | SocketKind = 0, 

312 proto: int = 0, 

313 flags: int = 0, 

314 ) -> Sequence[ 

315 tuple[ 

316 AddressFamily, 

317 SocketKind, 

318 int, 

319 str, 

320 tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes], 

321 ] 

322 ]: 

323 pass 

324 

325 @classmethod 

326 @abstractmethod 

327 async def getnameinfo( 

328 cls, sockaddr: IPSockAddrType, flags: int = 0 

329 ) -> tuple[str, str]: 

330 pass 

331 

332 @classmethod 

333 @abstractmethod 

334 async def wait_readable(cls, obj: FileDescriptorLike) -> None: 

335 pass 

336 

337 @classmethod 

338 @abstractmethod 

339 async def wait_writable(cls, obj: FileDescriptorLike) -> None: 

340 pass 

341 

342 @classmethod 

343 @abstractmethod 

344 def notify_closing(cls, obj: FileDescriptorLike) -> None: 

345 pass 

346 

347 @classmethod 

348 @abstractmethod 

349 async def wrap_listener_socket(cls, sock: socket) -> SocketListener: 

350 pass 

351 

352 @classmethod 

353 @abstractmethod 

354 async def wrap_stream_socket(cls, sock: socket) -> SocketStream: 

355 pass 

356 

357 @classmethod 

358 @abstractmethod 

359 async def wrap_unix_stream_socket(cls, sock: socket) -> UNIXSocketStream: 

360 pass 

361 

362 @classmethod 

363 @abstractmethod 

364 async def wrap_udp_socket(cls, sock: socket) -> UDPSocket: 

365 pass 

366 

367 @classmethod 

368 @abstractmethod 

369 async def wrap_connected_udp_socket(cls, sock: socket) -> ConnectedUDPSocket: 

370 pass 

371 

372 @classmethod 

373 @abstractmethod 

374 async def wrap_unix_datagram_socket(cls, sock: socket) -> UNIXDatagramSocket: 

375 pass 

376 

377 @classmethod 

378 @abstractmethod 

379 async def wrap_connected_unix_datagram_socket( 

380 cls, sock: socket 

381 ) -> ConnectedUNIXDatagramSocket: 

382 pass 

383 

384 @classmethod 

385 @abstractmethod 

386 def current_default_thread_limiter(cls) -> CapacityLimiter: 

387 pass 

388 

389 @classmethod 

390 @abstractmethod 

391 def open_signal_receiver( 

392 cls, *signals: Signals 

393 ) -> AbstractContextManager[AsyncIterator[Signals]]: 

394 pass 

395 

396 @classmethod 

397 @abstractmethod 

398 def get_current_task(cls) -> TaskInfo: 

399 pass 

400 

401 @classmethod 

402 @abstractmethod 

403 def get_running_tasks(cls) -> Sequence[TaskInfo]: 

404 pass 

405 

406 @classmethod 

407 @abstractmethod 

408 async def wait_all_tasks_blocked(cls) -> None: 

409 pass 

410 

411 @classmethod 

412 @abstractmethod 

413 def create_test_runner(cls, options: dict[str, Any]) -> TestRunner: 

414 pass