Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/from_thread.py: 29%

180 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:38 +0000

1from __future__ import annotations 

2 

3import threading 

4from collections.abc import Awaitable, Callable, Generator 

5from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait 

6from contextlib import AbstractContextManager, contextmanager 

7from inspect import isawaitable 

8from types import TracebackType 

9from typing import ( 

10 Any, 

11 AsyncContextManager, 

12 ContextManager, 

13 Generic, 

14 Iterable, 

15 TypeVar, 

16 cast, 

17 overload, 

18) 

19 

20from ._core import _eventloop 

21from ._core._eventloop import get_async_backend, get_cancelled_exc_class, threadlocals 

22from ._core._synchronization import Event 

23from ._core._tasks import CancelScope, create_task_group 

24from .abc._tasks import TaskStatus 

25 

26T_Retval = TypeVar("T_Retval") 

27T_co = TypeVar("T_co") 

28 

29 

30def run(func: Callable[..., Awaitable[T_Retval]], *args: object) -> T_Retval: 

31 """ 

32 Call a coroutine function from a worker thread. 

33 

34 :param func: a coroutine function 

35 :param args: positional arguments for the callable 

36 :return: the return value of the coroutine function 

37 

38 """ 

39 try: 

40 async_backend = threadlocals.current_async_backend 

41 token = threadlocals.current_token 

42 except AttributeError: 

43 raise RuntimeError("This function can only be run from an AnyIO worker thread") 

44 

45 return async_backend.run_async_from_thread(func, args, token=token) 

46 

47 

48def run_sync(func: Callable[..., T_Retval], *args: object) -> T_Retval: 

49 """ 

50 Call a function in the event loop thread from a worker thread. 

51 

52 :param func: a callable 

53 :param args: positional arguments for the callable 

54 :return: the return value of the callable 

55 

56 """ 

57 try: 

58 async_backend = threadlocals.current_async_backend 

59 token = threadlocals.current_token 

60 except AttributeError: 

61 raise RuntimeError("This function can only be run from an AnyIO worker thread") 

62 

63 return async_backend.run_sync_from_thread(func, args, token=token) 

64 

65 

66class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager): 

67 _enter_future: Future 

68 _exit_future: Future 

69 _exit_event: Event 

70 _exit_exc_info: tuple[ 

71 type[BaseException] | None, BaseException | None, TracebackType | None 

72 ] = (None, None, None) 

73 

74 def __init__(self, async_cm: AsyncContextManager[T_co], portal: BlockingPortal): 

75 self._async_cm = async_cm 

76 self._portal = portal 

77 

78 async def run_async_cm(self) -> bool | None: 

79 try: 

80 self._exit_event = Event() 

81 value = await self._async_cm.__aenter__() 

82 except BaseException as exc: 

83 self._enter_future.set_exception(exc) 

84 raise 

85 else: 

86 self._enter_future.set_result(value) 

87 

88 try: 

89 # Wait for the sync context manager to exit. 

90 # This next statement can raise `get_cancelled_exc_class()` if 

91 # something went wrong in a task group in this async context 

92 # manager. 

93 await self._exit_event.wait() 

94 finally: 

95 # In case of cancellation, it could be that we end up here before 

96 # `_BlockingAsyncContextManager.__exit__` is called, and an 

97 # `_exit_exc_info` has been set. 

98 result = await self._async_cm.__aexit__(*self._exit_exc_info) 

99 return result 

100 

101 def __enter__(self) -> T_co: 

102 self._enter_future = Future() 

103 self._exit_future = self._portal.start_task_soon(self.run_async_cm) 

104 cm = self._enter_future.result() 

105 return cast(T_co, cm) 

106 

107 def __exit__( 

108 self, 

109 __exc_type: type[BaseException] | None, 

110 __exc_value: BaseException | None, 

111 __traceback: TracebackType | None, 

112 ) -> bool | None: 

113 self._exit_exc_info = __exc_type, __exc_value, __traceback 

114 self._portal.call(self._exit_event.set) 

115 return self._exit_future.result() 

116 

117 

118class _BlockingPortalTaskStatus(TaskStatus): 

119 def __init__(self, future: Future): 

120 self._future = future 

121 

122 def started(self, value: object = None) -> None: 

123 self._future.set_result(value) 

124 

125 

126class BlockingPortal: 

127 """An object that lets external threads run code in an asynchronous event loop.""" 

128 

129 def __new__(cls) -> BlockingPortal: 

130 return get_async_backend().create_blocking_portal() 

131 

132 def __init__(self) -> None: 

133 self._event_loop_thread_id: int | None = threading.get_ident() 

134 self._stop_event = Event() 

135 self._task_group = create_task_group() 

136 self._cancelled_exc_class = get_cancelled_exc_class() 

137 

138 async def __aenter__(self) -> BlockingPortal: 

139 await self._task_group.__aenter__() 

140 return self 

141 

142 async def __aexit__( 

143 self, 

144 exc_type: type[BaseException] | None, 

145 exc_val: BaseException | None, 

146 exc_tb: TracebackType | None, 

147 ) -> bool | None: 

148 await self.stop() 

149 return await self._task_group.__aexit__(exc_type, exc_val, exc_tb) 

150 

151 def _check_running(self) -> None: 

152 if self._event_loop_thread_id is None: 

153 raise RuntimeError("This portal is not running") 

154 if self._event_loop_thread_id == threading.get_ident(): 

155 raise RuntimeError( 

156 "This method cannot be called from the event loop thread" 

157 ) 

158 

159 async def sleep_until_stopped(self) -> None: 

160 """Sleep until :meth:`stop` is called.""" 

161 await self._stop_event.wait() 

162 

163 async def stop(self, cancel_remaining: bool = False) -> None: 

164 """ 

165 Signal the portal to shut down. 

166 

167 This marks the portal as no longer accepting new calls and exits from 

168 :meth:`sleep_until_stopped`. 

169 

170 :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False`` 

171 to let them finish before returning 

172 

173 """ 

174 self._event_loop_thread_id = None 

175 self._stop_event.set() 

176 if cancel_remaining: 

177 self._task_group.cancel_scope.cancel() 

178 

179 async def _call_func( 

180 self, func: Callable, args: tuple, kwargs: dict[str, Any], future: Future 

181 ) -> None: 

182 def callback(f: Future) -> None: 

183 if f.cancelled() and self._event_loop_thread_id not in ( 

184 None, 

185 threading.get_ident(), 

186 ): 

187 self.call(scope.cancel) 

188 

189 try: 

190 retval = func(*args, **kwargs) 

191 if isawaitable(retval): 

192 with CancelScope() as scope: 

193 if future.cancelled(): 

194 scope.cancel() 

195 else: 

196 future.add_done_callback(callback) 

197 

198 retval = await retval 

199 except self._cancelled_exc_class: 

200 future.cancel() 

201 future.set_running_or_notify_cancel() 

202 except BaseException as exc: 

203 if not future.cancelled(): 

204 future.set_exception(exc) 

205 

206 # Let base exceptions fall through 

207 if not isinstance(exc, Exception): 

208 raise 

209 else: 

210 if not future.cancelled(): 

211 future.set_result(retval) 

212 finally: 

213 scope = None # type: ignore[assignment] 

214 

215 def _spawn_task_from_thread( 

216 self, 

217 func: Callable, 

218 args: tuple[Any, ...], 

219 kwargs: dict[str, Any], 

220 name: object, 

221 future: Future, 

222 ) -> None: 

223 """ 

224 Spawn a new task using the given callable. 

225 

226 Implementors must ensure that the future is resolved when the task finishes. 

227 

228 :param func: a callable 

229 :param args: positional arguments to be passed to the callable 

230 :param kwargs: keyword arguments to be passed to the callable 

231 :param name: name of the task (will be coerced to a string if not ``None``) 

232 :param future: a future that will resolve to the return value of the callable, 

233 or the exception raised during its execution 

234 

235 """ 

236 raise NotImplementedError 

237 

238 @overload 

239 def call(self, func: Callable[..., Awaitable[T_Retval]], *args: object) -> T_Retval: 

240 ... 

241 

242 @overload 

243 def call(self, func: Callable[..., T_Retval], *args: object) -> T_Retval: 

244 ... 

245 

246 def call( 

247 self, 

248 func: Callable[..., Awaitable[T_Retval] | T_Retval], 

249 *args: object, 

250 ) -> T_Retval: 

251 """ 

252 Call the given function in the event loop thread. 

253 

254 If the callable returns a coroutine object, it is awaited on. 

255 

256 :param func: any callable 

257 :raises RuntimeError: if the portal is not running or if this method is called 

258 from within the event loop thread 

259 

260 """ 

261 return cast(T_Retval, self.start_task_soon(func, *args).result()) 

262 

263 @overload 

264 def start_task_soon( 

265 self, 

266 func: Callable[..., Awaitable[T_Retval]], 

267 *args: object, 

268 name: object = None, 

269 ) -> Future[T_Retval]: 

270 ... 

271 

272 @overload 

273 def start_task_soon( 

274 self, func: Callable[..., T_Retval], *args: object, name: object = None 

275 ) -> Future[T_Retval]: 

276 ... 

277 

278 def start_task_soon( 

279 self, 

280 func: Callable[..., Awaitable[T_Retval] | T_Retval], 

281 *args: object, 

282 name: object = None, 

283 ) -> Future[T_Retval]: 

284 """ 

285 Start a task in the portal's task group. 

286 

287 The task will be run inside a cancel scope which can be cancelled by cancelling 

288 the returned future. 

289 

290 :param func: the target function 

291 :param args: positional arguments passed to ``func`` 

292 :param name: name of the task (will be coerced to a string if not ``None``) 

293 :return: a future that resolves with the return value of the callable if the 

294 task completes successfully, or with the exception raised in the task 

295 :raises RuntimeError: if the portal is not running or if this method is called 

296 from within the event loop thread 

297 :rtype: concurrent.futures.Future[T_Retval] 

298 

299 .. versionadded:: 3.0 

300 

301 """ 

302 self._check_running() 

303 f: Future = Future() 

304 self._spawn_task_from_thread(func, args, {}, name, f) 

305 return f 

306 

307 def start_task( 

308 self, 

309 func: Callable[..., Awaitable[Any]], 

310 *args: object, 

311 name: object = None, 

312 ) -> tuple[Future[Any], Any]: 

313 """ 

314 Start a task in the portal's task group and wait until it signals for readiness. 

315 

316 This method works the same way as :meth:`.abc.TaskGroup.start`. 

317 

318 :param func: the target function 

319 :param args: positional arguments passed to ``func`` 

320 :param name: name of the task (will be coerced to a string if not ``None``) 

321 :return: a tuple of (future, task_status_value) where the ``task_status_value`` 

322 is the value passed to ``task_status.started()`` from within the target 

323 function 

324 :rtype: tuple[concurrent.futures.Future[Any], Any] 

325 

326 .. versionadded:: 3.0 

327 

328 """ 

329 

330 def task_done(future: Future) -> None: 

331 if not task_status_future.done(): 

332 if future.cancelled(): 

333 task_status_future.cancel() 

334 elif future.exception(): 

335 task_status_future.set_exception(future.exception()) 

336 else: 

337 exc = RuntimeError( 

338 "Task exited without calling task_status.started()" 

339 ) 

340 task_status_future.set_exception(exc) 

341 

342 self._check_running() 

343 task_status_future: Future = Future() 

344 task_status = _BlockingPortalTaskStatus(task_status_future) 

345 f: Future = Future() 

346 f.add_done_callback(task_done) 

347 self._spawn_task_from_thread(func, args, {"task_status": task_status}, name, f) 

348 return f, task_status_future.result() 

349 

350 def wrap_async_context_manager( 

351 self, cm: AsyncContextManager[T_co] 

352 ) -> ContextManager[T_co]: 

353 """ 

354 Wrap an async context manager as a synchronous context manager via this portal. 

355 

356 Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping 

357 in the middle until the synchronous context manager exits. 

358 

359 :param cm: an asynchronous context manager 

360 :return: a synchronous context manager 

361 

362 .. versionadded:: 2.1 

363 

364 """ 

365 return _BlockingAsyncContextManager(cm, self) 

366 

367 

368@contextmanager 

369def start_blocking_portal( 

370 backend: str = "asyncio", backend_options: dict[str, Any] | None = None 

371) -> Generator[BlockingPortal, Any, None]: 

372 """ 

373 Start a new event loop in a new thread and run a blocking portal in its main task. 

374 

375 The parameters are the same as for :func:`~anyio.run`. 

376 

377 :param backend: name of the backend 

378 :param backend_options: backend options 

379 :return: a context manager that yields a blocking portal 

380 

381 .. versionchanged:: 3.0 

382 Usage as a context manager is now required. 

383 

384 """ 

385 

386 async def run_portal() -> None: 

387 async with BlockingPortal() as portal_: 

388 if future.set_running_or_notify_cancel(): 

389 future.set_result(portal_) 

390 await portal_.sleep_until_stopped() 

391 

392 future: Future[BlockingPortal] = Future() 

393 with ThreadPoolExecutor(1) as executor: 

394 run_future = executor.submit( 

395 _eventloop.run, 

396 run_portal, # type: ignore[arg-type] 

397 backend=backend, 

398 backend_options=backend_options, 

399 ) 

400 try: 

401 wait( 

402 cast(Iterable[Future], [run_future, future]), 

403 return_when=FIRST_COMPLETED, 

404 ) 

405 except BaseException: 

406 future.cancel() 

407 run_future.cancel() 

408 raise 

409 

410 if future.done(): 

411 portal = future.result() 

412 cancel_remaining_tasks = False 

413 try: 

414 yield portal 

415 except BaseException: 

416 cancel_remaining_tasks = True 

417 raise 

418 finally: 

419 try: 

420 portal.call(portal.stop, cancel_remaining_tasks) 

421 except RuntimeError: 

422 pass 

423 

424 run_future.result()