Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_eventloop.py: 75%

169 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:38 +0000

1from __future__ import annotations 

2 

3import math 

4from abc import ABCMeta, abstractmethod 

5from collections.abc import AsyncIterator, Awaitable, Mapping 

6from os import PathLike 

7from signal import Signals 

8from socket import AddressFamily, SocketKind, socket 

9from typing import ( 

10 IO, 

11 TYPE_CHECKING, 

12 Any, 

13 Callable, 

14 ContextManager, 

15 Sequence, 

16 TypeVar, 

17 overload, 

18) 

19 

20if TYPE_CHECKING: 

21 from typing import Literal 

22 

23 from .._core._synchronization import CapacityLimiter, Event 

24 from .._core._tasks import CancelScope 

25 from .._core._testing import TaskInfo 

26 from ..from_thread import BlockingPortal 

27 from ._sockets import ( 

28 ConnectedUDPSocket, 

29 ConnectedUNIXDatagramSocket, 

30 IPSockAddrType, 

31 SocketListener, 

32 SocketStream, 

33 UDPSocket, 

34 UNIXDatagramSocket, 

35 UNIXSocketStream, 

36 ) 

37 from ._subprocesses import Process 

38 from ._tasks import TaskGroup 

39 from ._testing import TestRunner 

40 

41T_Retval = TypeVar("T_Retval") 

42 

43 

44class AsyncBackend(metaclass=ABCMeta): 

45 @classmethod 

46 @abstractmethod 

47 def run( 

48 cls, 

49 func: Callable[..., Awaitable[T_Retval]], 

50 args: tuple[Any, ...], 

51 kwargs: dict[str, Any], 

52 options: dict[str, Any], 

53 ) -> T_Retval: 

54 """ 

55 Run the given coroutine function in an asynchronous event loop. 

56 

57 The current thread must not be already running an event loop. 

58 

59 :param func: a coroutine function 

60 :param args: positional arguments to ``func`` 

61 :param kwargs: positional arguments to ``func`` 

62 :param options: keyword arguments to call the backend ``run()`` implementation 

63 with 

64 :return: the return value of the coroutine function 

65 """ 

66 

67 @classmethod 

68 @abstractmethod 

69 def current_token(cls) -> object: 

70 """ 

71 

72 :return: 

73 """ 

74 

75 @classmethod 

76 @abstractmethod 

77 def current_time(cls) -> float: 

78 """ 

79 Return the current value of the event loop's internal clock. 

80 

81 :return: the clock value (seconds) 

82 """ 

83 

84 @classmethod 

85 @abstractmethod 

86 def cancelled_exception_class(cls) -> type[BaseException]: 

87 """Return the exception class that is raised in a task if it's cancelled.""" 

88 

89 @classmethod 

90 @abstractmethod 

91 async def checkpoint(cls) -> None: 

92 """ 

93 Check if the task has been cancelled, and allow rescheduling of other tasks. 

94 

95 This is effectively the same as running :meth:`checkpoint_if_cancelled` and then 

96 :meth:`cancel_shielded_checkpoint`. 

97 """ 

98 

99 @classmethod 

100 async def checkpoint_if_cancelled(cls) -> None: 

101 """ 

102 Check if the current task group has been cancelled. 

103 

104 This will check if the task has been cancelled, but will not allow other tasks 

105 to be scheduled if not. 

106 

107 """ 

108 if cls.current_effective_deadline() == -math.inf: 

109 await cls.checkpoint() 

110 

111 @classmethod 

112 async def cancel_shielded_checkpoint(cls) -> None: 

113 """ 

114 Allow the rescheduling of other tasks. 

115 

116 This will give other tasks the opportunity to run, but without checking if the 

117 current task group has been cancelled, unlike with :meth:`checkpoint`. 

118 

119 """ 

120 with cls.create_cancel_scope(shield=True): 

121 await cls.sleep(0) 

122 

123 @classmethod 

124 @abstractmethod 

125 async def sleep(cls, delay: float) -> None: 

126 """ 

127 Pause the current task for the specified duration. 

128 

129 :param delay: the duration, in seconds 

130 """ 

131 

132 @classmethod 

133 @abstractmethod 

134 def create_cancel_scope( 

135 cls, *, deadline: float = math.inf, shield: bool = False 

136 ) -> CancelScope: 

137 pass 

138 

139 @classmethod 

140 @abstractmethod 

141 def current_effective_deadline(cls) -> float: 

142 """ 

143 Return the nearest deadline among all the cancel scopes effective for the 

144 current task. 

145 

146 :return: 

147 - a clock value from the event loop's internal clock 

148 - ``inf`` if there is no deadline in effect 

149 - ``-inf`` if the current scope has been cancelled 

150 :rtype: float 

151 """ 

152 

153 @classmethod 

154 @abstractmethod 

155 def create_task_group(cls) -> TaskGroup: 

156 pass 

157 

158 @classmethod 

159 @abstractmethod 

160 def create_event(cls) -> Event: 

161 pass 

162 

163 @classmethod 

164 @abstractmethod 

165 def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter: 

166 pass 

167 

168 @classmethod 

169 @abstractmethod 

170 async def run_sync_in_worker_thread( 

171 cls, 

172 func: Callable[..., T_Retval], 

173 args: tuple[Any, ...], 

174 cancellable: bool = False, 

175 limiter: CapacityLimiter | None = None, 

176 ) -> T_Retval: 

177 pass 

178 

179 @classmethod 

180 @abstractmethod 

181 def run_async_from_thread( 

182 cls, 

183 func: Callable[..., Awaitable[T_Retval]], 

184 args: tuple[Any], 

185 token: object, 

186 ) -> T_Retval: 

187 pass 

188 

189 @classmethod 

190 @abstractmethod 

191 def run_sync_from_thread( 

192 cls, func: Callable[..., T_Retval], args: tuple[Any, ...], token: object 

193 ) -> T_Retval: 

194 pass 

195 

196 @classmethod 

197 @abstractmethod 

198 def create_blocking_portal(cls) -> BlockingPortal: 

199 pass 

200 

201 @classmethod 

202 @overload 

203 async def open_process( 

204 cls, 

205 command: str | bytes, 

206 *, 

207 shell: Literal[True], 

208 stdin: int | IO[Any] | None, 

209 stdout: int | IO[Any] | None, 

210 stderr: int | IO[Any] | None, 

211 cwd: str | bytes | PathLike[str] | None = None, 

212 env: Mapping[str, str] | None = None, 

213 start_new_session: bool = False, 

214 ) -> Process: 

215 pass 

216 

217 @classmethod 

218 @overload 

219 async def open_process( 

220 cls, 

221 command: Sequence[str | bytes], 

222 *, 

223 shell: Literal[False], 

224 stdin: int | IO[Any] | None, 

225 stdout: int | IO[Any] | None, 

226 stderr: int | IO[Any] | None, 

227 cwd: str | bytes | PathLike[str] | None = None, 

228 env: Mapping[str, str] | None = None, 

229 start_new_session: bool = False, 

230 ) -> Process: 

231 pass 

232 

233 @classmethod 

234 @abstractmethod 

235 async def open_process( 

236 cls, 

237 command: str | bytes | Sequence[str | bytes], 

238 *, 

239 shell: bool, 

240 stdin: int | IO[Any] | None, 

241 stdout: int | IO[Any] | None, 

242 stderr: int | IO[Any] | None, 

243 cwd: str | bytes | PathLike[str] | None = None, 

244 env: Mapping[str, str] | None = None, 

245 start_new_session: bool = False, 

246 ) -> Process: 

247 pass 

248 

249 @classmethod 

250 @abstractmethod 

251 def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None: 

252 pass 

253 

254 @classmethod 

255 @abstractmethod 

256 async def connect_tcp( 

257 cls, host: str, port: int, local_address: IPSockAddrType | None = None 

258 ) -> SocketStream: 

259 pass 

260 

261 @classmethod 

262 @abstractmethod 

263 async def connect_unix(cls, path: str) -> UNIXSocketStream: 

264 pass 

265 

266 @classmethod 

267 @abstractmethod 

268 def create_tcp_listener(cls, sock: socket) -> SocketListener: 

269 pass 

270 

271 @classmethod 

272 @abstractmethod 

273 def create_unix_listener(cls, sock: socket) -> SocketListener: 

274 pass 

275 

276 @classmethod 

277 @abstractmethod 

278 async def create_udp_socket( 

279 cls, 

280 family: AddressFamily, 

281 local_address: IPSockAddrType | None, 

282 remote_address: IPSockAddrType | None, 

283 reuse_port: bool, 

284 ) -> UDPSocket | ConnectedUDPSocket: 

285 pass 

286 

287 @classmethod 

288 @overload 

289 async def create_unix_datagram_socket( 

290 cls, raw_socket: socket, remote_path: None 

291 ) -> UNIXDatagramSocket: 

292 ... 

293 

294 @classmethod 

295 @overload 

296 async def create_unix_datagram_socket( 

297 cls, raw_socket: socket, remote_path: str 

298 ) -> ConnectedUNIXDatagramSocket: 

299 ... 

300 

301 @classmethod 

302 @abstractmethod 

303 async def create_unix_datagram_socket( 

304 cls, raw_socket: socket, remote_path: str | None 

305 ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket: 

306 pass 

307 

308 @classmethod 

309 @abstractmethod 

310 async def getaddrinfo( 

311 cls, 

312 host: bytes | str | None, 

313 port: str | int | None, 

314 *, 

315 family: int | AddressFamily = 0, 

316 type: int | SocketKind = 0, 

317 proto: int = 0, 

318 flags: int = 0, 

319 ) -> list[ 

320 tuple[ 

321 AddressFamily, 

322 SocketKind, 

323 int, 

324 str, 

325 tuple[str, int] | tuple[str, int, int, int], 

326 ] 

327 ]: 

328 pass 

329 

330 @classmethod 

331 @abstractmethod 

332 async def getnameinfo( 

333 cls, sockaddr: IPSockAddrType, flags: int = 0 

334 ) -> tuple[str, str]: 

335 pass 

336 

337 @classmethod 

338 @abstractmethod 

339 async def wait_socket_readable(cls, sock: socket) -> None: 

340 pass 

341 

342 @classmethod 

343 @abstractmethod 

344 async def wait_socket_writable(cls, sock: socket) -> None: 

345 pass 

346 

347 @classmethod 

348 @abstractmethod 

349 def current_default_thread_limiter(cls) -> CapacityLimiter: 

350 pass 

351 

352 @classmethod 

353 @abstractmethod 

354 def open_signal_receiver( 

355 cls, *signals: Signals 

356 ) -> ContextManager[AsyncIterator[Signals]]: 

357 pass 

358 

359 @classmethod 

360 @abstractmethod 

361 def get_current_task(cls) -> TaskInfo: 

362 pass 

363 

364 @classmethod 

365 @abstractmethod 

366 def get_running_tasks(cls) -> list[TaskInfo]: 

367 pass 

368 

369 @classmethod 

370 @abstractmethod 

371 async def wait_all_tasks_blocked(cls) -> None: 

372 pass 

373 

374 @classmethod 

375 @abstractmethod 

376 def create_test_runner(cls, options: dict[str, Any]) -> TestRunner: 

377 pass