Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_eventloop.py: 76%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

181 statements  

1from __future__ import annotations 

2 

3import math 

4import sys 

5from abc import ABCMeta, abstractmethod 

6from collections.abc import AsyncIterator, Awaitable, Callable, Sequence 

7from contextlib import AbstractContextManager 

8from os import PathLike 

9from signal import Signals 

10from socket import AddressFamily, SocketKind, socket 

11from typing import ( 

12 IO, 

13 TYPE_CHECKING, 

14 Any, 

15 TypeVar, 

16 Union, 

17 overload, 

18) 

19 

20if sys.version_info >= (3, 11): 

21 from typing import TypeVarTuple, Unpack 

22else: 

23 from typing_extensions import TypeVarTuple, Unpack 

24 

25if sys.version_info >= (3, 10): 

26 from typing import TypeAlias 

27else: 

28 from typing_extensions import TypeAlias 

29 

30if TYPE_CHECKING: 

31 from _typeshed import HasFileno 

32 

33 from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore 

34 from .._core._tasks import CancelScope 

35 from .._core._testing import TaskInfo 

36 from ..from_thread import BlockingPortal 

37 from ._sockets import ( 

38 ConnectedUDPSocket, 

39 ConnectedUNIXDatagramSocket, 

40 IPSockAddrType, 

41 SocketListener, 

42 SocketStream, 

43 UDPSocket, 

44 UNIXDatagramSocket, 

45 UNIXSocketStream, 

46 ) 

47 from ._subprocesses import Process 

48 from ._tasks import TaskGroup 

49 from ._testing import TestRunner 

50 

51T_Retval = TypeVar("T_Retval") 

52PosArgsT = TypeVarTuple("PosArgsT") 

53StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"] 

54 

55 

56class AsyncBackend(metaclass=ABCMeta): 

57 @classmethod 

58 @abstractmethod 

59 def run( 

60 cls, 

61 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

62 args: tuple[Unpack[PosArgsT]], 

63 kwargs: dict[str, Any], 

64 options: dict[str, Any], 

65 ) -> T_Retval: 

66 """ 

67 Run the given coroutine function in an asynchronous event loop. 

68 

69 The current thread must not be already running an event loop. 

70 

71 :param func: a coroutine function 

72 :param args: positional arguments to ``func`` 

73 :param kwargs: positional arguments to ``func`` 

74 :param options: keyword arguments to call the backend ``run()`` implementation 

75 with 

76 :return: the return value of the coroutine function 

77 """ 

78 

79 @classmethod 

80 @abstractmethod 

81 def current_token(cls) -> object: 

82 """ 

83 

84 :return: 

85 """ 

86 

87 @classmethod 

88 @abstractmethod 

89 def current_time(cls) -> float: 

90 """ 

91 Return the current value of the event loop's internal clock. 

92 

93 :return: the clock value (seconds) 

94 """ 

95 

96 @classmethod 

97 @abstractmethod 

98 def cancelled_exception_class(cls) -> type[BaseException]: 

99 """Return the exception class that is raised in a task if it's cancelled.""" 

100 

101 @classmethod 

102 @abstractmethod 

103 async def checkpoint(cls) -> None: 

104 """ 

105 Check if the task has been cancelled, and allow rescheduling of other tasks. 

106 

107 This is effectively the same as running :meth:`checkpoint_if_cancelled` and then 

108 :meth:`cancel_shielded_checkpoint`. 

109 """ 

110 

111 @classmethod 

112 async def checkpoint_if_cancelled(cls) -> None: 

113 """ 

114 Check if the current task group has been cancelled. 

115 

116 This will check if the task has been cancelled, but will not allow other tasks 

117 to be scheduled if not. 

118 

119 """ 

120 if cls.current_effective_deadline() == -math.inf: 

121 await cls.checkpoint() 

122 

123 @classmethod 

124 async def cancel_shielded_checkpoint(cls) -> None: 

125 """ 

126 Allow the rescheduling of other tasks. 

127 

128 This will give other tasks the opportunity to run, but without checking if the 

129 current task group has been cancelled, unlike with :meth:`checkpoint`. 

130 

131 """ 

132 with cls.create_cancel_scope(shield=True): 

133 await cls.sleep(0) 

134 

135 @classmethod 

136 @abstractmethod 

137 async def sleep(cls, delay: float) -> None: 

138 """ 

139 Pause the current task for the specified duration. 

140 

141 :param delay: the duration, in seconds 

142 """ 

143 

144 @classmethod 

145 @abstractmethod 

146 def create_cancel_scope( 

147 cls, *, deadline: float = math.inf, shield: bool = False 

148 ) -> CancelScope: 

149 pass 

150 

151 @classmethod 

152 @abstractmethod 

153 def current_effective_deadline(cls) -> float: 

154 """ 

155 Return the nearest deadline among all the cancel scopes effective for the 

156 current task. 

157 

158 :return: 

159 - a clock value from the event loop's internal clock 

160 - ``inf`` if there is no deadline in effect 

161 - ``-inf`` if the current scope has been cancelled 

162 :rtype: float 

163 """ 

164 

165 @classmethod 

166 @abstractmethod 

167 def create_task_group(cls) -> TaskGroup: 

168 pass 

169 

170 @classmethod 

171 @abstractmethod 

172 def create_event(cls) -> Event: 

173 pass 

174 

175 @classmethod 

176 @abstractmethod 

177 def create_lock(cls, *, fast_acquire: bool) -> Lock: 

178 pass 

179 

180 @classmethod 

181 @abstractmethod 

182 def create_semaphore( 

183 cls, 

184 initial_value: int, 

185 *, 

186 max_value: int | None = None, 

187 fast_acquire: bool = False, 

188 ) -> Semaphore: 

189 pass 

190 

191 @classmethod 

192 @abstractmethod 

193 def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter: 

194 pass 

195 

196 @classmethod 

197 @abstractmethod 

198 async def run_sync_in_worker_thread( 

199 cls, 

200 func: Callable[[Unpack[PosArgsT]], T_Retval], 

201 args: tuple[Unpack[PosArgsT]], 

202 abandon_on_cancel: bool = False, 

203 limiter: CapacityLimiter | None = None, 

204 ) -> T_Retval: 

205 pass 

206 

207 @classmethod 

208 @abstractmethod 

209 def check_cancelled(cls) -> None: 

210 pass 

211 

212 @classmethod 

213 @abstractmethod 

214 def run_async_from_thread( 

215 cls, 

216 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

217 args: tuple[Unpack[PosArgsT]], 

218 token: object, 

219 ) -> T_Retval: 

220 pass 

221 

222 @classmethod 

223 @abstractmethod 

224 def run_sync_from_thread( 

225 cls, 

226 func: Callable[[Unpack[PosArgsT]], T_Retval], 

227 args: tuple[Unpack[PosArgsT]], 

228 token: object, 

229 ) -> T_Retval: 

230 pass 

231 

232 @classmethod 

233 @abstractmethod 

234 def create_blocking_portal(cls) -> BlockingPortal: 

235 pass 

236 

237 @classmethod 

238 @abstractmethod 

239 async def open_process( 

240 cls, 

241 command: StrOrBytesPath | Sequence[StrOrBytesPath], 

242 *, 

243 stdin: int | IO[Any] | None, 

244 stdout: int | IO[Any] | None, 

245 stderr: int | IO[Any] | None, 

246 **kwargs: Any, 

247 ) -> Process: 

248 pass 

249 

250 @classmethod 

251 @abstractmethod 

252 def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None: 

253 pass 

254 

255 @classmethod 

256 @abstractmethod 

257 async def connect_tcp( 

258 cls, host: str, port: int, local_address: IPSockAddrType | None = None 

259 ) -> SocketStream: 

260 pass 

261 

262 @classmethod 

263 @abstractmethod 

264 async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream: 

265 pass 

266 

267 @classmethod 

268 @abstractmethod 

269 def create_tcp_listener(cls, sock: socket) -> SocketListener: 

270 pass 

271 

272 @classmethod 

273 @abstractmethod 

274 def create_unix_listener(cls, sock: socket) -> SocketListener: 

275 pass 

276 

277 @classmethod 

278 @abstractmethod 

279 async def create_udp_socket( 

280 cls, 

281 family: AddressFamily, 

282 local_address: IPSockAddrType | None, 

283 remote_address: IPSockAddrType | None, 

284 reuse_port: bool, 

285 ) -> UDPSocket | ConnectedUDPSocket: 

286 pass 

287 

288 @classmethod 

289 @overload 

290 async def create_unix_datagram_socket( 

291 cls, raw_socket: socket, remote_path: None 

292 ) -> UNIXDatagramSocket: ... 

293 

294 @classmethod 

295 @overload 

296 async def create_unix_datagram_socket( 

297 cls, raw_socket: socket, remote_path: str | bytes 

298 ) -> ConnectedUNIXDatagramSocket: ... 

299 

300 @classmethod 

301 @abstractmethod 

302 async def create_unix_datagram_socket( 

303 cls, raw_socket: socket, remote_path: str | bytes | None 

304 ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket: 

305 pass 

306 

307 @classmethod 

308 @abstractmethod 

309 async def getaddrinfo( 

310 cls, 

311 host: bytes | str | None, 

312 port: str | int | None, 

313 *, 

314 family: int | AddressFamily = 0, 

315 type: int | SocketKind = 0, 

316 proto: int = 0, 

317 flags: int = 0, 

318 ) -> Sequence[ 

319 tuple[ 

320 AddressFamily, 

321 SocketKind, 

322 int, 

323 str, 

324 tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes], 

325 ] 

326 ]: 

327 pass 

328 

329 @classmethod 

330 @abstractmethod 

331 async def getnameinfo( 

332 cls, sockaddr: IPSockAddrType, flags: int = 0 

333 ) -> tuple[str, str]: 

334 pass 

335 

336 @classmethod 

337 @abstractmethod 

338 async def wait_readable(cls, obj: HasFileno | int) -> None: 

339 pass 

340 

341 @classmethod 

342 @abstractmethod 

343 async def wait_writable(cls, obj: HasFileno | int) -> None: 

344 pass 

345 

346 @classmethod 

347 @abstractmethod 

348 def current_default_thread_limiter(cls) -> CapacityLimiter: 

349 pass 

350 

351 @classmethod 

352 @abstractmethod 

353 def open_signal_receiver( 

354 cls, *signals: Signals 

355 ) -> AbstractContextManager[AsyncIterator[Signals]]: 

356 pass 

357 

358 @classmethod 

359 @abstractmethod 

360 def get_current_task(cls) -> TaskInfo: 

361 pass 

362 

363 @classmethod 

364 @abstractmethod 

365 def get_running_tasks(cls) -> Sequence[TaskInfo]: 

366 pass 

367 

368 @classmethod 

369 @abstractmethod 

370 async def wait_all_tasks_blocked(cls) -> None: 

371 pass 

372 

373 @classmethod 

374 @abstractmethod 

375 def create_test_runner(cls, options: dict[str, Any]) -> TestRunner: 

376 pass