Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_eventloop.py: 76%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

176 statements  

1from __future__ import annotations 

2 

3import math 

4import sys 

5from abc import ABCMeta, abstractmethod 

6from collections.abc import AsyncIterator, Awaitable, Mapping 

7from os import PathLike 

8from signal import Signals 

9from socket import AddressFamily, SocketKind, socket 

10from typing import ( 

11 IO, 

12 TYPE_CHECKING, 

13 Any, 

14 Callable, 

15 ContextManager, 

16 Sequence, 

17 TypeVar, 

18 overload, 

19) 

20 

21if sys.version_info >= (3, 11): 

22 from typing import TypeVarTuple, Unpack 

23else: 

24 from typing_extensions import TypeVarTuple, Unpack 

25 

26if TYPE_CHECKING: 

27 from typing import Literal 

28 

29 from .._core._synchronization import CapacityLimiter, Event 

30 from .._core._tasks import CancelScope 

31 from .._core._testing import TaskInfo 

32 from ..from_thread import BlockingPortal 

33 from ._sockets import ( 

34 ConnectedUDPSocket, 

35 ConnectedUNIXDatagramSocket, 

36 IPSockAddrType, 

37 SocketListener, 

38 SocketStream, 

39 UDPSocket, 

40 UNIXDatagramSocket, 

41 UNIXSocketStream, 

42 ) 

43 from ._subprocesses import Process 

44 from ._tasks import TaskGroup 

45 from ._testing import TestRunner 

46 

47T_Retval = TypeVar("T_Retval") 

48PosArgsT = TypeVarTuple("PosArgsT") 

49 

50 

51class AsyncBackend(metaclass=ABCMeta): 

52 @classmethod 

53 @abstractmethod 

54 def run( 

55 cls, 

56 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

57 args: tuple[Unpack[PosArgsT]], 

58 kwargs: dict[str, Any], 

59 options: dict[str, Any], 

60 ) -> T_Retval: 

61 """ 

62 Run the given coroutine function in an asynchronous event loop. 

63 

64 The current thread must not be already running an event loop. 

65 

66 :param func: a coroutine function 

67 :param args: positional arguments to ``func`` 

68 :param kwargs: positional arguments to ``func`` 

69 :param options: keyword arguments to call the backend ``run()`` implementation 

70 with 

71 :return: the return value of the coroutine function 

72 """ 

73 

74 @classmethod 

75 @abstractmethod 

76 def current_token(cls) -> object: 

77 """ 

78 

79 :return: 

80 """ 

81 

82 @classmethod 

83 @abstractmethod 

84 def current_time(cls) -> float: 

85 """ 

86 Return the current value of the event loop's internal clock. 

87 

88 :return: the clock value (seconds) 

89 """ 

90 

91 @classmethod 

92 @abstractmethod 

93 def cancelled_exception_class(cls) -> type[BaseException]: 

94 """Return the exception class that is raised in a task if it's cancelled.""" 

95 

96 @classmethod 

97 @abstractmethod 

98 async def checkpoint(cls) -> None: 

99 """ 

100 Check if the task has been cancelled, and allow rescheduling of other tasks. 

101 

102 This is effectively the same as running :meth:`checkpoint_if_cancelled` and then 

103 :meth:`cancel_shielded_checkpoint`. 

104 """ 

105 

106 @classmethod 

107 async def checkpoint_if_cancelled(cls) -> None: 

108 """ 

109 Check if the current task group has been cancelled. 

110 

111 This will check if the task has been cancelled, but will not allow other tasks 

112 to be scheduled if not. 

113 

114 """ 

115 if cls.current_effective_deadline() == -math.inf: 

116 await cls.checkpoint() 

117 

118 @classmethod 

119 async def cancel_shielded_checkpoint(cls) -> None: 

120 """ 

121 Allow the rescheduling of other tasks. 

122 

123 This will give other tasks the opportunity to run, but without checking if the 

124 current task group has been cancelled, unlike with :meth:`checkpoint`. 

125 

126 """ 

127 with cls.create_cancel_scope(shield=True): 

128 await cls.sleep(0) 

129 

130 @classmethod 

131 @abstractmethod 

132 async def sleep(cls, delay: float) -> None: 

133 """ 

134 Pause the current task for the specified duration. 

135 

136 :param delay: the duration, in seconds 

137 """ 

138 

139 @classmethod 

140 @abstractmethod 

141 def create_cancel_scope( 

142 cls, *, deadline: float = math.inf, shield: bool = False 

143 ) -> CancelScope: 

144 pass 

145 

146 @classmethod 

147 @abstractmethod 

148 def current_effective_deadline(cls) -> float: 

149 """ 

150 Return the nearest deadline among all the cancel scopes effective for the 

151 current task. 

152 

153 :return: 

154 - a clock value from the event loop's internal clock 

155 - ``inf`` if there is no deadline in effect 

156 - ``-inf`` if the current scope has been cancelled 

157 :rtype: float 

158 """ 

159 

160 @classmethod 

161 @abstractmethod 

162 def create_task_group(cls) -> TaskGroup: 

163 pass 

164 

165 @classmethod 

166 @abstractmethod 

167 def create_event(cls) -> Event: 

168 pass 

169 

170 @classmethod 

171 @abstractmethod 

172 def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter: 

173 pass 

174 

175 @classmethod 

176 @abstractmethod 

177 async def run_sync_in_worker_thread( 

178 cls, 

179 func: Callable[[Unpack[PosArgsT]], T_Retval], 

180 args: tuple[Unpack[PosArgsT]], 

181 abandon_on_cancel: bool = False, 

182 limiter: CapacityLimiter | None = None, 

183 ) -> T_Retval: 

184 pass 

185 

186 @classmethod 

187 @abstractmethod 

188 def check_cancelled(cls) -> None: 

189 pass 

190 

191 @classmethod 

192 @abstractmethod 

193 def run_async_from_thread( 

194 cls, 

195 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

196 args: tuple[Unpack[PosArgsT]], 

197 token: object, 

198 ) -> T_Retval: 

199 pass 

200 

201 @classmethod 

202 @abstractmethod 

203 def run_sync_from_thread( 

204 cls, 

205 func: Callable[[Unpack[PosArgsT]], T_Retval], 

206 args: tuple[Unpack[PosArgsT]], 

207 token: object, 

208 ) -> T_Retval: 

209 pass 

210 

211 @classmethod 

212 @abstractmethod 

213 def create_blocking_portal(cls) -> BlockingPortal: 

214 pass 

215 

216 @classmethod 

217 @overload 

218 async def open_process( 

219 cls, 

220 command: str | bytes, 

221 *, 

222 shell: Literal[True], 

223 stdin: int | IO[Any] | None, 

224 stdout: int | IO[Any] | None, 

225 stderr: int | IO[Any] | None, 

226 cwd: str | bytes | PathLike[str] | None = None, 

227 env: Mapping[str, str] | None = None, 

228 start_new_session: bool = False, 

229 ) -> Process: 

230 pass 

231 

232 @classmethod 

233 @overload 

234 async def open_process( 

235 cls, 

236 command: Sequence[str | bytes], 

237 *, 

238 shell: Literal[False], 

239 stdin: int | IO[Any] | None, 

240 stdout: int | IO[Any] | None, 

241 stderr: int | IO[Any] | None, 

242 cwd: str | bytes | PathLike[str] | None = None, 

243 env: Mapping[str, str] | None = None, 

244 start_new_session: bool = False, 

245 ) -> Process: 

246 pass 

247 

248 @classmethod 

249 @abstractmethod 

250 async def open_process( 

251 cls, 

252 command: str | bytes | Sequence[str | bytes], 

253 *, 

254 shell: bool, 

255 stdin: int | IO[Any] | None, 

256 stdout: int | IO[Any] | None, 

257 stderr: int | IO[Any] | None, 

258 cwd: str | bytes | PathLike[str] | None = None, 

259 env: Mapping[str, str] | None = None, 

260 start_new_session: bool = False, 

261 ) -> Process: 

262 pass 

263 

264 @classmethod 

265 @abstractmethod 

266 def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None: 

267 pass 

268 

269 @classmethod 

270 @abstractmethod 

271 async def connect_tcp( 

272 cls, host: str, port: int, local_address: IPSockAddrType | None = None 

273 ) -> SocketStream: 

274 pass 

275 

276 @classmethod 

277 @abstractmethod 

278 async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream: 

279 pass 

280 

281 @classmethod 

282 @abstractmethod 

283 def create_tcp_listener(cls, sock: socket) -> SocketListener: 

284 pass 

285 

286 @classmethod 

287 @abstractmethod 

288 def create_unix_listener(cls, sock: socket) -> SocketListener: 

289 pass 

290 

291 @classmethod 

292 @abstractmethod 

293 async def create_udp_socket( 

294 cls, 

295 family: AddressFamily, 

296 local_address: IPSockAddrType | None, 

297 remote_address: IPSockAddrType | None, 

298 reuse_port: bool, 

299 ) -> UDPSocket | ConnectedUDPSocket: 

300 pass 

301 

302 @classmethod 

303 @overload 

304 async def create_unix_datagram_socket( 

305 cls, raw_socket: socket, remote_path: None 

306 ) -> UNIXDatagramSocket: ... 

307 

308 @classmethod 

309 @overload 

310 async def create_unix_datagram_socket( 

311 cls, raw_socket: socket, remote_path: str | bytes 

312 ) -> ConnectedUNIXDatagramSocket: ... 

313 

314 @classmethod 

315 @abstractmethod 

316 async def create_unix_datagram_socket( 

317 cls, raw_socket: socket, remote_path: str | bytes | None 

318 ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket: 

319 pass 

320 

321 @classmethod 

322 @abstractmethod 

323 async def getaddrinfo( 

324 cls, 

325 host: bytes | str | None, 

326 port: str | int | None, 

327 *, 

328 family: int | AddressFamily = 0, 

329 type: int | SocketKind = 0, 

330 proto: int = 0, 

331 flags: int = 0, 

332 ) -> list[ 

333 tuple[ 

334 AddressFamily, 

335 SocketKind, 

336 int, 

337 str, 

338 tuple[str, int] | tuple[str, int, int, int], 

339 ] 

340 ]: 

341 pass 

342 

343 @classmethod 

344 @abstractmethod 

345 async def getnameinfo( 

346 cls, sockaddr: IPSockAddrType, flags: int = 0 

347 ) -> tuple[str, str]: 

348 pass 

349 

350 @classmethod 

351 @abstractmethod 

352 async def wait_socket_readable(cls, sock: socket) -> None: 

353 pass 

354 

355 @classmethod 

356 @abstractmethod 

357 async def wait_socket_writable(cls, sock: socket) -> None: 

358 pass 

359 

360 @classmethod 

361 @abstractmethod 

362 def current_default_thread_limiter(cls) -> CapacityLimiter: 

363 pass 

364 

365 @classmethod 

366 @abstractmethod 

367 def open_signal_receiver( 

368 cls, *signals: Signals 

369 ) -> ContextManager[AsyncIterator[Signals]]: 

370 pass 

371 

372 @classmethod 

373 @abstractmethod 

374 def get_current_task(cls) -> TaskInfo: 

375 pass 

376 

377 @classmethod 

378 @abstractmethod 

379 def get_running_tasks(cls) -> Sequence[TaskInfo]: 

380 pass 

381 

382 @classmethod 

383 @abstractmethod 

384 async def wait_all_tasks_blocked(cls) -> None: 

385 pass 

386 

387 @classmethod 

388 @abstractmethod 

389 def create_test_runner(cls, options: dict[str, Any]) -> TestRunner: 

390 pass