Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_eventloop.py: 75%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

178 statements  

1from __future__ import annotations 

2 

3import math 

4import sys 

5from abc import ABCMeta, abstractmethod 

6from collections.abc import AsyncIterator, Awaitable, Mapping 

7from os import PathLike 

8from signal import Signals 

9from socket import AddressFamily, SocketKind, socket 

10from typing import ( 

11 IO, 

12 TYPE_CHECKING, 

13 Any, 

14 Callable, 

15 ContextManager, 

16 Sequence, 

17 TypeVar, 

18 overload, 

19) 

20 

21if sys.version_info >= (3, 11): 

22 from typing import TypeVarTuple, Unpack 

23else: 

24 from typing_extensions import TypeVarTuple, Unpack 

25 

26if TYPE_CHECKING: 

27 from typing import Literal 

28 

29 from .._core._synchronization import CapacityLimiter, Event 

30 from .._core._tasks import CancelScope 

31 from .._core._testing import TaskInfo 

32 from ..from_thread import BlockingPortal 

33 from ._sockets import ( 

34 ConnectedUDPSocket, 

35 ConnectedUNIXDatagramSocket, 

36 IPSockAddrType, 

37 SocketListener, 

38 SocketStream, 

39 UDPSocket, 

40 UNIXDatagramSocket, 

41 UNIXSocketStream, 

42 ) 

43 from ._subprocesses import Process 

44 from ._tasks import TaskGroup 

45 from ._testing import TestRunner 

46 

47T_Retval = TypeVar("T_Retval") 

48PosArgsT = TypeVarTuple("PosArgsT") 

49 

50 

51class AsyncBackend(metaclass=ABCMeta): 

52 @classmethod 

53 @abstractmethod 

54 def run( 

55 cls, 

56 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

57 args: tuple[Unpack[PosArgsT]], 

58 kwargs: dict[str, Any], 

59 options: dict[str, Any], 

60 ) -> T_Retval: 

61 """ 

62 Run the given coroutine function in an asynchronous event loop. 

63 

64 The current thread must not be already running an event loop. 

65 

66 :param func: a coroutine function 

67 :param args: positional arguments to ``func`` 

68 :param kwargs: positional arguments to ``func`` 

69 :param options: keyword arguments to call the backend ``run()`` implementation 

70 with 

71 :return: the return value of the coroutine function 

72 """ 

73 

74 @classmethod 

75 @abstractmethod 

76 def current_token(cls) -> object: 

77 """ 

78 

79 :return: 

80 """ 

81 

82 @classmethod 

83 @abstractmethod 

84 def current_time(cls) -> float: 

85 """ 

86 Return the current value of the event loop's internal clock. 

87 

88 :return: the clock value (seconds) 

89 """ 

90 

91 @classmethod 

92 @abstractmethod 

93 def cancelled_exception_class(cls) -> type[BaseException]: 

94 """Return the exception class that is raised in a task if it's cancelled.""" 

95 

96 @classmethod 

97 @abstractmethod 

98 async def checkpoint(cls) -> None: 

99 """ 

100 Check if the task has been cancelled, and allow rescheduling of other tasks. 

101 

102 This is effectively the same as running :meth:`checkpoint_if_cancelled` and then 

103 :meth:`cancel_shielded_checkpoint`. 

104 """ 

105 

106 @classmethod 

107 async def checkpoint_if_cancelled(cls) -> None: 

108 """ 

109 Check if the current task group has been cancelled. 

110 

111 This will check if the task has been cancelled, but will not allow other tasks 

112 to be scheduled if not. 

113 

114 """ 

115 if cls.current_effective_deadline() == -math.inf: 

116 await cls.checkpoint() 

117 

118 @classmethod 

119 async def cancel_shielded_checkpoint(cls) -> None: 

120 """ 

121 Allow the rescheduling of other tasks. 

122 

123 This will give other tasks the opportunity to run, but without checking if the 

124 current task group has been cancelled, unlike with :meth:`checkpoint`. 

125 

126 """ 

127 with cls.create_cancel_scope(shield=True): 

128 await cls.sleep(0) 

129 

130 @classmethod 

131 @abstractmethod 

132 async def sleep(cls, delay: float) -> None: 

133 """ 

134 Pause the current task for the specified duration. 

135 

136 :param delay: the duration, in seconds 

137 """ 

138 

139 @classmethod 

140 @abstractmethod 

141 def create_cancel_scope( 

142 cls, *, deadline: float = math.inf, shield: bool = False 

143 ) -> CancelScope: 

144 pass 

145 

146 @classmethod 

147 @abstractmethod 

148 def current_effective_deadline(cls) -> float: 

149 """ 

150 Return the nearest deadline among all the cancel scopes effective for the 

151 current task. 

152 

153 :return: 

154 - a clock value from the event loop's internal clock 

155 - ``inf`` if there is no deadline in effect 

156 - ``-inf`` if the current scope has been cancelled 

157 :rtype: float 

158 """ 

159 

160 @classmethod 

161 @abstractmethod 

162 def create_task_group(cls) -> TaskGroup: 

163 pass 

164 

165 @classmethod 

166 @abstractmethod 

167 def create_event(cls) -> Event: 

168 pass 

169 

170 @classmethod 

171 @abstractmethod 

172 def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter: 

173 pass 

174 

175 @classmethod 

176 @abstractmethod 

177 async def run_sync_in_worker_thread( 

178 cls, 

179 func: Callable[[Unpack[PosArgsT]], T_Retval], 

180 args: tuple[Unpack[PosArgsT]], 

181 abandon_on_cancel: bool = False, 

182 limiter: CapacityLimiter | None = None, 

183 ) -> T_Retval: 

184 pass 

185 

186 @classmethod 

187 @abstractmethod 

188 def check_cancelled(cls) -> None: 

189 pass 

190 

191 @classmethod 

192 @abstractmethod 

193 def run_async_from_thread( 

194 cls, 

195 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

196 args: tuple[Unpack[PosArgsT]], 

197 token: object, 

198 ) -> T_Retval: 

199 pass 

200 

201 @classmethod 

202 @abstractmethod 

203 def run_sync_from_thread( 

204 cls, 

205 func: Callable[[Unpack[PosArgsT]], T_Retval], 

206 args: tuple[Unpack[PosArgsT]], 

207 token: object, 

208 ) -> T_Retval: 

209 pass 

210 

211 @classmethod 

212 @abstractmethod 

213 def create_blocking_portal(cls) -> BlockingPortal: 

214 pass 

215 

216 @classmethod 

217 @overload 

218 async def open_process( 

219 cls, 

220 command: str | bytes, 

221 *, 

222 shell: Literal[True], 

223 stdin: int | IO[Any] | None, 

224 stdout: int | IO[Any] | None, 

225 stderr: int | IO[Any] | None, 

226 cwd: str | bytes | PathLike[str] | None = None, 

227 env: Mapping[str, str] | None = None, 

228 start_new_session: bool = False, 

229 ) -> Process: 

230 pass 

231 

232 @classmethod 

233 @overload 

234 async def open_process( 

235 cls, 

236 command: Sequence[str | bytes], 

237 *, 

238 shell: Literal[False], 

239 stdin: int | IO[Any] | None, 

240 stdout: int | IO[Any] | None, 

241 stderr: int | IO[Any] | None, 

242 cwd: str | bytes | PathLike[str] | None = None, 

243 env: Mapping[str, str] | None = None, 

244 start_new_session: bool = False, 

245 ) -> Process: 

246 pass 

247 

248 @classmethod 

249 @abstractmethod 

250 async def open_process( 

251 cls, 

252 command: str | bytes | Sequence[str | bytes], 

253 *, 

254 shell: bool, 

255 stdin: int | IO[Any] | None, 

256 stdout: int | IO[Any] | None, 

257 stderr: int | IO[Any] | None, 

258 cwd: str | bytes | PathLike[str] | None = None, 

259 env: Mapping[str, str] | None = None, 

260 start_new_session: bool = False, 

261 ) -> Process: 

262 pass 

263 

264 @classmethod 

265 @abstractmethod 

266 def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None: 

267 pass 

268 

269 @classmethod 

270 @abstractmethod 

271 async def connect_tcp( 

272 cls, host: str, port: int, local_address: IPSockAddrType | None = None 

273 ) -> SocketStream: 

274 pass 

275 

276 @classmethod 

277 @abstractmethod 

278 async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream: 

279 pass 

280 

281 @classmethod 

282 @abstractmethod 

283 def create_tcp_listener(cls, sock: socket) -> SocketListener: 

284 pass 

285 

286 @classmethod 

287 @abstractmethod 

288 def create_unix_listener(cls, sock: socket) -> SocketListener: 

289 pass 

290 

291 @classmethod 

292 @abstractmethod 

293 async def create_udp_socket( 

294 cls, 

295 family: AddressFamily, 

296 local_address: IPSockAddrType | None, 

297 remote_address: IPSockAddrType | None, 

298 reuse_port: bool, 

299 ) -> UDPSocket | ConnectedUDPSocket: 

300 pass 

301 

302 @classmethod 

303 @overload 

304 async def create_unix_datagram_socket( 

305 cls, raw_socket: socket, remote_path: None 

306 ) -> UNIXDatagramSocket: 

307 ... 

308 

309 @classmethod 

310 @overload 

311 async def create_unix_datagram_socket( 

312 cls, raw_socket: socket, remote_path: str | bytes 

313 ) -> ConnectedUNIXDatagramSocket: 

314 ... 

315 

316 @classmethod 

317 @abstractmethod 

318 async def create_unix_datagram_socket( 

319 cls, raw_socket: socket, remote_path: str | bytes | None 

320 ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket: 

321 pass 

322 

323 @classmethod 

324 @abstractmethod 

325 async def getaddrinfo( 

326 cls, 

327 host: bytes | str | None, 

328 port: str | int | None, 

329 *, 

330 family: int | AddressFamily = 0, 

331 type: int | SocketKind = 0, 

332 proto: int = 0, 

333 flags: int = 0, 

334 ) -> list[ 

335 tuple[ 

336 AddressFamily, 

337 SocketKind, 

338 int, 

339 str, 

340 tuple[str, int] | tuple[str, int, int, int], 

341 ] 

342 ]: 

343 pass 

344 

345 @classmethod 

346 @abstractmethod 

347 async def getnameinfo( 

348 cls, sockaddr: IPSockAddrType, flags: int = 0 

349 ) -> tuple[str, str]: 

350 pass 

351 

352 @classmethod 

353 @abstractmethod 

354 async def wait_socket_readable(cls, sock: socket) -> None: 

355 pass 

356 

357 @classmethod 

358 @abstractmethod 

359 async def wait_socket_writable(cls, sock: socket) -> None: 

360 pass 

361 

362 @classmethod 

363 @abstractmethod 

364 def current_default_thread_limiter(cls) -> CapacityLimiter: 

365 pass 

366 

367 @classmethod 

368 @abstractmethod 

369 def open_signal_receiver( 

370 cls, *signals: Signals 

371 ) -> ContextManager[AsyncIterator[Signals]]: 

372 pass 

373 

374 @classmethod 

375 @abstractmethod 

376 def get_current_task(cls) -> TaskInfo: 

377 pass 

378 

379 @classmethod 

380 @abstractmethod 

381 def get_running_tasks(cls) -> list[TaskInfo]: 

382 pass 

383 

384 @classmethod 

385 @abstractmethod 

386 async def wait_all_tasks_blocked(cls) -> None: 

387 pass 

388 

389 @classmethod 

390 @abstractmethod 

391 def create_test_runner(cls, options: dict[str, Any]) -> TestRunner: 

392 pass