Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/from_thread.py: 32%

190 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import threading 

2from asyncio import iscoroutine 

3from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait 

4from contextlib import AbstractContextManager, contextmanager 

5from types import TracebackType 

6from typing import ( 

7 Any, 

8 AsyncContextManager, 

9 Callable, 

10 ContextManager, 

11 Coroutine, 

12 Dict, 

13 Generator, 

14 Iterable, 

15 Optional, 

16 Tuple, 

17 Type, 

18 TypeVar, 

19 Union, 

20 cast, 

21 overload, 

22) 

23from warnings import warn 

24 

25from ._core import _eventloop 

26from ._core._eventloop import get_asynclib, get_cancelled_exc_class, threadlocals 

27from ._core._synchronization import Event 

28from ._core._tasks import CancelScope, create_task_group 

29from .abc._tasks import TaskStatus 

30 

31T_Retval = TypeVar("T_Retval") 

32T_co = TypeVar("T_co") 

33 

34 

35def run(func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object) -> T_Retval: 

36 """ 

37 Call a coroutine function from a worker thread. 

38 

39 :param func: a coroutine function 

40 :param args: positional arguments for the callable 

41 :return: the return value of the coroutine function 

42 

43 """ 

44 try: 

45 asynclib = threadlocals.current_async_module 

46 except AttributeError: 

47 raise RuntimeError("This function can only be run from an AnyIO worker thread") 

48 

49 return asynclib.run_async_from_thread(func, *args) 

50 

51 

52def run_async_from_thread( 

53 func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object 

54) -> T_Retval: 

55 warn( 

56 "run_async_from_thread() has been deprecated, use anyio.from_thread.run() instead", 

57 DeprecationWarning, 

58 ) 

59 return run(func, *args) 

60 

61 

62def run_sync(func: Callable[..., T_Retval], *args: object) -> T_Retval: 

63 """ 

64 Call a function in the event loop thread from a worker thread. 

65 

66 :param func: a callable 

67 :param args: positional arguments for the callable 

68 :return: the return value of the callable 

69 

70 """ 

71 try: 

72 asynclib = threadlocals.current_async_module 

73 except AttributeError: 

74 raise RuntimeError("This function can only be run from an AnyIO worker thread") 

75 

76 return asynclib.run_sync_from_thread(func, *args) 

77 

78 

79def run_sync_from_thread(func: Callable[..., T_Retval], *args: object) -> T_Retval: 

80 warn( 

81 "run_sync_from_thread() has been deprecated, use anyio.from_thread.run_sync() instead", 

82 DeprecationWarning, 

83 ) 

84 return run_sync(func, *args) 

85 

86 

87class _BlockingAsyncContextManager(AbstractContextManager): 

88 _enter_future: Future 

89 _exit_future: Future 

90 _exit_event: Event 

91 _exit_exc_info: Tuple[ 

92 Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType] 

93 ] = (None, None, None) 

94 

95 def __init__(self, async_cm: AsyncContextManager[T_co], portal: "BlockingPortal"): 

96 self._async_cm = async_cm 

97 self._portal = portal 

98 

99 async def run_async_cm(self) -> Optional[bool]: 

100 try: 

101 self._exit_event = Event() 

102 value = await self._async_cm.__aenter__() 

103 except BaseException as exc: 

104 self._enter_future.set_exception(exc) 

105 raise 

106 else: 

107 self._enter_future.set_result(value) 

108 

109 try: 

110 # Wait for the sync context manager to exit. 

111 # This next statement can raise `get_cancelled_exc_class()` if 

112 # something went wrong in a task group in this async context 

113 # manager. 

114 await self._exit_event.wait() 

115 finally: 

116 # In case of cancellation, it could be that we end up here before 

117 # `_BlockingAsyncContextManager.__exit__` is called, and an 

118 # `_exit_exc_info` has been set. 

119 result = await self._async_cm.__aexit__(*self._exit_exc_info) 

120 return result 

121 

122 def __enter__(self) -> T_co: 

123 self._enter_future = Future() 

124 self._exit_future = self._portal.start_task_soon(self.run_async_cm) 

125 cm = self._enter_future.result() 

126 return cast(T_co, cm) 

127 

128 def __exit__( 

129 self, 

130 __exc_type: Optional[Type[BaseException]], 

131 __exc_value: Optional[BaseException], 

132 __traceback: Optional[TracebackType], 

133 ) -> Optional[bool]: 

134 self._exit_exc_info = __exc_type, __exc_value, __traceback 

135 self._portal.call(self._exit_event.set) 

136 return self._exit_future.result() 

137 

138 

139class _BlockingPortalTaskStatus(TaskStatus): 

140 def __init__(self, future: Future): 

141 self._future = future 

142 

143 def started(self, value: object = None) -> None: 

144 self._future.set_result(value) 

145 

146 

147class BlockingPortal: 

148 """An object that lets external threads run code in an asynchronous event loop.""" 

149 

150 def __new__(cls) -> "BlockingPortal": 

151 return get_asynclib().BlockingPortal() 

152 

153 def __init__(self) -> None: 

154 self._event_loop_thread_id: Optional[int] = threading.get_ident() 

155 self._stop_event = Event() 

156 self._task_group = create_task_group() 

157 self._cancelled_exc_class = get_cancelled_exc_class() 

158 

159 async def __aenter__(self) -> "BlockingPortal": 

160 await self._task_group.__aenter__() 

161 return self 

162 

163 async def __aexit__( 

164 self, 

165 exc_type: Optional[Type[BaseException]], 

166 exc_val: Optional[BaseException], 

167 exc_tb: Optional[TracebackType], 

168 ) -> Optional[bool]: 

169 await self.stop() 

170 return await self._task_group.__aexit__(exc_type, exc_val, exc_tb) 

171 

172 def _check_running(self) -> None: 

173 if self._event_loop_thread_id is None: 

174 raise RuntimeError("This portal is not running") 

175 if self._event_loop_thread_id == threading.get_ident(): 

176 raise RuntimeError( 

177 "This method cannot be called from the event loop thread" 

178 ) 

179 

180 async def sleep_until_stopped(self) -> None: 

181 """Sleep until :meth:`stop` is called.""" 

182 await self._stop_event.wait() 

183 

184 async def stop(self, cancel_remaining: bool = False) -> None: 

185 """ 

186 Signal the portal to shut down. 

187 

188 This marks the portal as no longer accepting new calls and exits from 

189 :meth:`sleep_until_stopped`. 

190 

191 :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False`` to let them 

192 finish before returning 

193 

194 """ 

195 self._event_loop_thread_id = None 

196 self._stop_event.set() 

197 if cancel_remaining: 

198 self._task_group.cancel_scope.cancel() 

199 

200 async def _call_func( 

201 self, func: Callable, args: tuple, kwargs: Dict[str, Any], future: Future 

202 ) -> None: 

203 def callback(f: Future) -> None: 

204 if f.cancelled() and self._event_loop_thread_id not in ( 

205 None, 

206 threading.get_ident(), 

207 ): 

208 self.call(scope.cancel) 

209 

210 try: 

211 retval = func(*args, **kwargs) 

212 if iscoroutine(retval): 

213 with CancelScope() as scope: 

214 if future.cancelled(): 

215 scope.cancel() 

216 else: 

217 future.add_done_callback(callback) 

218 

219 retval = await retval 

220 except self._cancelled_exc_class: 

221 future.cancel() 

222 except BaseException as exc: 

223 if not future.cancelled(): 

224 future.set_exception(exc) 

225 

226 # Let base exceptions fall through 

227 if not isinstance(exc, Exception): 

228 raise 

229 else: 

230 if not future.cancelled(): 

231 future.set_result(retval) 

232 finally: 

233 scope = None # type: ignore[assignment] 

234 

235 def _spawn_task_from_thread( 

236 self, 

237 func: Callable, 

238 args: tuple, 

239 kwargs: Dict[str, Any], 

240 name: object, 

241 future: Future, 

242 ) -> None: 

243 """ 

244 Spawn a new task using the given callable. 

245 

246 Implementors must ensure that the future is resolved when the task finishes. 

247 

248 :param func: a callable 

249 :param args: positional arguments to be passed to the callable 

250 :param kwargs: keyword arguments to be passed to the callable 

251 :param name: name of the task (will be coerced to a string if not ``None``) 

252 :param future: a future that will resolve to the return value of the callable, or the 

253 exception raised during its execution 

254 

255 """ 

256 raise NotImplementedError 

257 

258 @overload 

259 def call( 

260 self, func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object 

261 ) -> T_Retval: 

262 ... 

263 

264 @overload 

265 def call(self, func: Callable[..., T_Retval], *args: object) -> T_Retval: 

266 ... 

267 

268 def call( 

269 self, 

270 func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]], 

271 *args: object 

272 ) -> T_Retval: 

273 """ 

274 Call the given function in the event loop thread. 

275 

276 If the callable returns a coroutine object, it is awaited on. 

277 

278 :param func: any callable 

279 :raises RuntimeError: if the portal is not running or if this method is called from within 

280 the event loop thread 

281 

282 """ 

283 return cast(T_Retval, self.start_task_soon(func, *args).result()) 

284 

285 @overload 

286 def spawn_task( 

287 self, 

288 func: Callable[..., Coroutine[Any, Any, T_Retval]], 

289 *args: object, 

290 name: object = None 

291 ) -> "Future[T_Retval]": 

292 ... 

293 

294 @overload 

295 def spawn_task( 

296 self, func: Callable[..., T_Retval], *args: object, name: object = None 

297 ) -> "Future[T_Retval]": 

298 ... 

299 

300 def spawn_task( 

301 self, 

302 func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]], 

303 *args: object, 

304 name: object = None 

305 ) -> "Future[T_Retval]": 

306 """ 

307 Start a task in the portal's task group. 

308 

309 :param func: the target coroutine function 

310 :param args: positional arguments passed to ``func`` 

311 :param name: name of the task (will be coerced to a string if not ``None``) 

312 :return: a future that resolves with the return value of the callable if the task completes 

313 successfully, or with the exception raised in the task 

314 :raises RuntimeError: if the portal is not running or if this method is called from within 

315 the event loop thread 

316 

317 .. versionadded:: 2.1 

318 .. deprecated:: 3.0 

319 Use :meth:`start_task_soon` instead. If your code needs AnyIO 2 compatibility, you 

320 can keep using this until AnyIO 4. 

321 

322 """ 

323 warn( 

324 "spawn_task() is deprecated -- use start_task_soon() instead", 

325 DeprecationWarning, 

326 ) 

327 return self.start_task_soon(func, *args, name=name) # type: ignore[arg-type] 

328 

329 @overload 

330 def start_task_soon( 

331 self, 

332 func: Callable[..., Coroutine[Any, Any, T_Retval]], 

333 *args: object, 

334 name: object = None 

335 ) -> "Future[T_Retval]": 

336 ... 

337 

338 @overload 

339 def start_task_soon( 

340 self, func: Callable[..., T_Retval], *args: object, name: object = None 

341 ) -> "Future[T_Retval]": 

342 ... 

343 

344 def start_task_soon( 

345 self, 

346 func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]], 

347 *args: object, 

348 name: object = None 

349 ) -> "Future[T_Retval]": 

350 """ 

351 Start a task in the portal's task group. 

352 

353 The task will be run inside a cancel scope which can be cancelled by cancelling the 

354 returned future. 

355 

356 :param func: the target coroutine function 

357 :param args: positional arguments passed to ``func`` 

358 :param name: name of the task (will be coerced to a string if not ``None``) 

359 :return: a future that resolves with the return value of the callable if the task completes 

360 successfully, or with the exception raised in the task 

361 :raises RuntimeError: if the portal is not running or if this method is called from within 

362 the event loop thread 

363 

364 .. versionadded:: 3.0 

365 

366 """ 

367 self._check_running() 

368 f: Future = Future() 

369 self._spawn_task_from_thread(func, args, {}, name, f) 

370 return f 

371 

372 def start_task( 

373 self, 

374 func: Callable[..., Coroutine[Any, Any, Any]], 

375 *args: object, 

376 name: object = None 

377 ) -> Tuple["Future[Any]", Any]: 

378 """ 

379 Start a task in the portal's task group and wait until it signals for readiness. 

380 

381 This method works the same way as :meth:`TaskGroup.start`. 

382 

383 :param func: the target coroutine function 

384 :param args: positional arguments passed to ``func`` 

385 :param name: name of the task (will be coerced to a string if not ``None``) 

386 :return: a tuple of (future, task_status_value) where the ``task_status_value`` is the 

387 value passed to ``task_status.started()`` from within the target function 

388 

389 .. versionadded:: 3.0 

390 

391 """ 

392 

393 def task_done(future: Future) -> None: 

394 if not task_status_future.done(): 

395 if future.cancelled(): 

396 task_status_future.cancel() 

397 elif future.exception(): 

398 task_status_future.set_exception(future.exception()) 

399 else: 

400 exc = RuntimeError( 

401 "Task exited without calling task_status.started()" 

402 ) 

403 task_status_future.set_exception(exc) 

404 

405 self._check_running() 

406 task_status_future: Future = Future() 

407 task_status = _BlockingPortalTaskStatus(task_status_future) 

408 f: Future = Future() 

409 f.add_done_callback(task_done) 

410 self._spawn_task_from_thread(func, args, {"task_status": task_status}, name, f) 

411 return f, task_status_future.result() 

412 

413 def wrap_async_context_manager( 

414 self, cm: AsyncContextManager[T_co] 

415 ) -> ContextManager[T_co]: 

416 """ 

417 Wrap an async context manager as a synchronous context manager via this portal. 

418 

419 Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping in the 

420 middle until the synchronous context manager exits. 

421 

422 :param cm: an asynchronous context manager 

423 :return: a synchronous context manager 

424 

425 .. versionadded:: 2.1 

426 

427 """ 

428 return _BlockingAsyncContextManager(cm, self) 

429 

430 

431def create_blocking_portal() -> BlockingPortal: 

432 """ 

433 Create a portal for running functions in the event loop thread from external threads. 

434 

435 Use this function in asynchronous code when you need to allow external threads access to the 

436 event loop where your asynchronous code is currently running. 

437 

438 .. deprecated:: 3.0 

439 Use :class:`.BlockingPortal` directly. 

440 

441 """ 

442 warn( 

443 "create_blocking_portal() has been deprecated -- use anyio.from_thread.BlockingPortal() " 

444 "directly", 

445 DeprecationWarning, 

446 ) 

447 return BlockingPortal() 

448 

449 

450@contextmanager 

451def start_blocking_portal( 

452 backend: str = "asyncio", backend_options: Optional[Dict[str, Any]] = None 

453) -> Generator[BlockingPortal, Any, None]: 

454 """ 

455 Start a new event loop in a new thread and run a blocking portal in its main task. 

456 

457 The parameters are the same as for :func:`~anyio.run`. 

458 

459 :param backend: name of the backend 

460 :param backend_options: backend options 

461 :return: a context manager that yields a blocking portal 

462 

463 .. versionchanged:: 3.0 

464 Usage as a context manager is now required. 

465 

466 """ 

467 

468 async def run_portal() -> None: 

469 async with BlockingPortal() as portal_: 

470 if future.set_running_or_notify_cancel(): 

471 future.set_result(portal_) 

472 await portal_.sleep_until_stopped() 

473 

474 future: Future[BlockingPortal] = Future() 

475 with ThreadPoolExecutor(1) as executor: 

476 run_future = executor.submit( 

477 _eventloop.run, 

478 run_portal, # type: ignore[arg-type] 

479 backend=backend, 

480 backend_options=backend_options, 

481 ) 

482 try: 

483 wait( 

484 cast(Iterable[Future], [run_future, future]), 

485 return_when=FIRST_COMPLETED, 

486 ) 

487 except BaseException: 

488 future.cancel() 

489 run_future.cancel() 

490 raise 

491 

492 if future.done(): 

493 portal = future.result() 

494 try: 

495 yield portal 

496 except BaseException: 

497 portal.call(portal.stop, True) 

498 raise 

499 

500 portal.call(portal.stop, False) 

501 

502 run_future.result()