Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/from_thread.py: 31%

195 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1from __future__ import annotations 

2 

3import threading 

4from asyncio import iscoroutine 

5from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait 

6from contextlib import AbstractContextManager, contextmanager 

7from types import TracebackType 

8from typing import ( 

9 Any, 

10 AsyncContextManager, 

11 Awaitable, 

12 Callable, 

13 ContextManager, 

14 Generator, 

15 Generic, 

16 Iterable, 

17 TypeVar, 

18 cast, 

19 overload, 

20) 

21from warnings import warn 

22 

23from ._core import _eventloop 

24from ._core._eventloop import get_asynclib, get_cancelled_exc_class, threadlocals 

25from ._core._synchronization import Event 

26from ._core._tasks import CancelScope, create_task_group 

27from .abc._tasks import TaskStatus 

28 

29T_Retval = TypeVar("T_Retval") 

30T_co = TypeVar("T_co") 

31 

32 

def run(func: Callable[..., Awaitable[T_Retval]], *args: object) -> T_Retval:
    """
    Call a coroutine function from a worker thread.

    :param func: a coroutine function
    :param args: positional arguments for the callable
    :return: the return value of the coroutine function
    :raises RuntimeError: if ``threadlocals`` has no ``current_async_module`` attribute
        in the calling thread

    """
    try:
        asynclib = threadlocals.current_async_module
    except AttributeError:
        # The missing thread-local attribute is an internal detail; suppress the
        # implicit exception chaining so the traceback shows only the RuntimeError.
        raise RuntimeError(
            "This function can only be run from an AnyIO worker thread"
        ) from None

    return asynclib.run_async_from_thread(func, *args)

48 

49 

def run_async_from_thread(
    func: Callable[..., Awaitable[T_Retval]], *args: object
) -> T_Retval:
    """
    Deprecated alias of :func:`run`.

    Emits a :exc:`DeprecationWarning` and then delegates to :func:`run`.

    :param func: a coroutine function
    :param args: positional arguments for the callable
    :return: the value returned by :func:`run`

    """
    warn(
        "run_async_from_thread() has been deprecated, use anyio.from_thread.run() instead",
        DeprecationWarning,
        # Attribute the warning to the caller's line instead of this shim.
        stacklevel=2,
    )
    return run(func, *args)

58 

59 

def run_sync(func: Callable[..., T_Retval], *args: object) -> T_Retval:
    """
    Call a function in the event loop thread from a worker thread.

    :param func: a callable
    :param args: positional arguments for the callable
    :return: the return value of the callable
    :raises RuntimeError: if ``threadlocals`` has no ``current_async_module`` attribute
        in the calling thread

    """
    try:
        asynclib = threadlocals.current_async_module
    except AttributeError:
        # The missing thread-local attribute is an internal detail; suppress the
        # implicit exception chaining so the traceback shows only the RuntimeError.
        raise RuntimeError(
            "This function can only be run from an AnyIO worker thread"
        ) from None

    return asynclib.run_sync_from_thread(func, *args)

75 

76 

def run_sync_from_thread(func: Callable[..., T_Retval], *args: object) -> T_Retval:
    """
    Deprecated alias of :func:`run_sync`.

    Emits a :exc:`DeprecationWarning` and then delegates to :func:`run_sync`.

    :param func: a callable
    :param args: positional arguments for the callable
    :return: the value returned by :func:`run_sync`

    """
    warn(
        "run_sync_from_thread() has been deprecated, use anyio.from_thread.run_sync() instead",
        DeprecationWarning,
        # Attribute the warning to the caller's line instead of this shim.
        stacklevel=2,
    )
    return run_sync(func, *args)

83 

84 

class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager):
    """
    Synchronous facade over an async context manager, driven through a portal.

    ``__enter__`` starts :meth:`run_async_cm` as a portal task and blocks until the
    wrapped manager's ``__aenter__()`` has produced a value (or failed). ``__exit__``
    records the exception triple, sets the exit event through the portal and blocks
    until the task has finished.
    """

    _enter_future: Future
    _exit_future: Future
    _exit_event: Event
    # Default used when ``__aexit__()`` runs before ``__exit__`` stored anything.
    _exit_exc_info: tuple[
        type[BaseException] | None, BaseException | None, TracebackType | None
    ] = (None, None, None)

    def __init__(self, async_cm: AsyncContextManager[T_co], portal: BlockingPortal):
        self._async_cm = async_cm
        self._portal = portal

    async def run_async_cm(self) -> bool | None:
        """
        Enter the wrapped manager, wait for the exit event, then exit the manager.

        :return: whatever the wrapped manager's ``__aexit__()`` returned

        """
        try:
            self._exit_event = Event()
            entered = await self._async_cm.__aenter__()
        except BaseException as exc:
            self._enter_future.set_exception(exc)
            raise

        # Only reached when entering succeeded: the handler above always re-raises.
        self._enter_future.set_result(entered)

        try:
            # Block until the synchronous side leaves its ``with`` block. The wait may
            # raise `get_cancelled_exc_class()` if something went wrong in a task
            # group inside the wrapped async context manager.
            await self._exit_event.wait()
        finally:
            # On cancellation we can arrive here before `__exit__` has stored an
            # `_exit_exc_info`; the class-level default is used in that case.
            result = await self._async_cm.__aexit__(*self._exit_exc_info)
            return result

    def __enter__(self) -> T_co:
        self._enter_future = Future()
        self._exit_future = self._portal.start_task_soon(self.run_async_cm)
        return cast(T_co, self._enter_future.result())

    def __exit__(
        self,
        __exc_type: type[BaseException] | None,
        __exc_value: BaseException | None,
        __traceback: TracebackType | None,
    ) -> bool | None:
        self._exit_exc_info = (__exc_type, __exc_value, __traceback)
        # Wake the portal task so that it proceeds to ``__aexit__()``.
        self._portal.call(self._exit_event.set)
        return self._exit_future.result()

135 

136 

class _BlockingPortalTaskStatus(TaskStatus):
    """Task status object that reports readiness by resolving a ``Future``."""

    def __init__(self, future: Future):
        self._future = future

    def started(self, value: object = None) -> None:
        """Resolve the wrapped future with ``value``."""
        status_future = self._future
        status_future.set_result(value)

143 

144 

class BlockingPortal:
    """An object that lets external threads run code in an asynchronous event loop."""

    def __new__(cls) -> BlockingPortal:
        # Instantiation is delegated to the ``BlockingPortal`` class of the module
        # returned by ``get_asynclib()``.
        return get_asynclib().BlockingPortal()

    def __init__(self) -> None:
        # Id of the constructing thread; ``_check_running`` treats it as the event
        # loop thread, and ``stop`` resets it to ``None``.
        self._event_loop_thread_id: int | None = threading.get_ident()
        self._stop_event = Event()
        self._task_group = create_task_group()
        self._cancelled_exc_class = get_cancelled_exc_class()

    async def __aenter__(self) -> BlockingPortal:
        await self._task_group.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        await self.stop()
        return await self._task_group.__aexit__(exc_type, exc_val, exc_tb)

    def _check_running(self) -> None:
        """
        Verify that the portal accepts calls from the current thread.

        :raises RuntimeError: if the portal has been stopped, or if the caller is the
            event loop thread itself

        """
        if self._event_loop_thread_id is None:
            raise RuntimeError("This portal is not running")
        if self._event_loop_thread_id == threading.get_ident():
            raise RuntimeError(
                "This method cannot be called from the event loop thread"
            )

    async def sleep_until_stopped(self) -> None:
        """Sleep until :meth:`stop` is called."""
        await self._stop_event.wait()

    async def stop(self, cancel_remaining: bool = False) -> None:
        """
        Signal the portal to shut down.

        This marks the portal as no longer accepting new calls and exits from
        :meth:`sleep_until_stopped`.

        :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False`` to let them
            finish before returning

        """
        self._event_loop_thread_id = None
        self._stop_event.set()
        if cancel_remaining:
            self._task_group.cancel_scope.cancel()

    async def _call_func(
        self, func: Callable, args: tuple, kwargs: dict[str, Any], future: Future
    ) -> None:
        """
        Call ``func`` and transfer its outcome to ``future``.

        If the call returns a coroutine object, it is awaited inside a cancel scope
        that gets cancelled when ``future`` is cancelled.

        :param func: the callable to invoke
        :param args: positional arguments for the callable
        :param kwargs: keyword arguments for the callable
        :param future: the future that receives the result, the exception or the
            cancellation

        """

        def callback(f: Future) -> None:
            # Forward a cancellation of the future to the cancel scope, but only
            # while the portal is running and only from a thread other than the
            # event loop thread (``self.call`` refuses to run in either case).
            if f.cancelled() and self._event_loop_thread_id not in (
                None,
                threading.get_ident(),
            ):
                self.call(scope.cancel)

        try:
            retval = func(*args, **kwargs)
            if iscoroutine(retval):
                with CancelScope() as scope:
                    if future.cancelled():
                        scope.cancel()
                    else:
                        future.add_done_callback(callback)

                    retval = await retval
        except self._cancelled_exc_class:
            future.cancel()
        except BaseException as exc:
            if not future.cancelled():
                future.set_exception(exc)

            # Let base exceptions fall through
            if not isinstance(exc, Exception):
                raise
        else:
            if not future.cancelled():
                future.set_result(retval)
        finally:
            # Drop the reference to the cancel scope (presumably to avoid keeping it
            # alive through the ``callback`` closure -- TODO confirm).
            scope = None  # type: ignore[assignment]

    def _spawn_task_from_thread(
        self,
        func: Callable,
        args: tuple,
        kwargs: dict[str, Any],
        name: object,
        future: Future,
    ) -> None:
        """
        Spawn a new task using the given callable.

        Implementors must ensure that the future is resolved when the task finishes.

        :param func: a callable
        :param args: positional arguments to be passed to the callable
        :param kwargs: keyword arguments to be passed to the callable
        :param name: name of the task (will be coerced to a string if not ``None``)
        :param future: a future that will resolve to the return value of the callable, or the
            exception raised during its execution

        """
        raise NotImplementedError

    @overload
    def call(self, func: Callable[..., Awaitable[T_Retval]], *args: object) -> T_Retval:
        ...

    @overload
    def call(self, func: Callable[..., T_Retval], *args: object) -> T_Retval:
        ...

    def call(
        self, func: Callable[..., Awaitable[T_Retval] | T_Retval], *args: object
    ) -> T_Retval:
        """
        Call the given function in the event loop thread.

        If the callable returns a coroutine object, it is awaited on.

        :param func: any callable
        :param args: positional arguments passed to ``func``
        :return: the result of the future returned by :meth:`start_task_soon`
        :raises RuntimeError: if the portal is not running or if this method is called from within
            the event loop thread

        """
        return cast(T_Retval, self.start_task_soon(func, *args).result())

    @overload
    def spawn_task(
        self,
        func: Callable[..., Awaitable[T_Retval]],
        *args: object,
        name: object = None,
    ) -> Future[T_Retval]:
        ...

    @overload
    def spawn_task(
        self, func: Callable[..., T_Retval], *args: object, name: object = None
    ) -> Future[T_Retval]:
        ...

    def spawn_task(
        self,
        func: Callable[..., Awaitable[T_Retval] | T_Retval],
        *args: object,
        name: object = None,
    ) -> Future[T_Retval]:
        """
        Start a task in the portal's task group.

        :param func: the target coroutine function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a future that resolves with the return value of the callable if the task completes
            successfully, or with the exception raised in the task
        :raises RuntimeError: if the portal is not running or if this method is called from within
            the event loop thread

        .. versionadded:: 2.1
        .. deprecated:: 3.0
           Use :meth:`start_task_soon` instead. If your code needs AnyIO 2 compatibility, you
           can keep using this until AnyIO 4.

        """
        warn(
            "spawn_task() is deprecated -- use start_task_soon() instead",
            DeprecationWarning,
            # Attribute the warning to the caller's line instead of this method.
            stacklevel=2,
        )
        return self.start_task_soon(func, *args, name=name)  # type: ignore[arg-type]

    @overload
    def start_task_soon(
        self,
        func: Callable[..., Awaitable[T_Retval]],
        *args: object,
        name: object = None,
    ) -> Future[T_Retval]:
        ...

    @overload
    def start_task_soon(
        self, func: Callable[..., T_Retval], *args: object, name: object = None
    ) -> Future[T_Retval]:
        ...

    def start_task_soon(
        self,
        func: Callable[..., Awaitable[T_Retval] | T_Retval],
        *args: object,
        name: object = None,
    ) -> Future[T_Retval]:
        """
        Start a task in the portal's task group.

        The task will be run inside a cancel scope which can be cancelled by cancelling the
        returned future.

        :param func: the target function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a future that resolves with the return value of the callable if the task completes
            successfully, or with the exception raised in the task
        :raises RuntimeError: if the portal is not running or if this method is called from within
            the event loop thread

        .. versionadded:: 3.0

        """
        self._check_running()
        f: Future = Future()
        self._spawn_task_from_thread(func, args, {}, name, f)
        return f

    def start_task(
        self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
    ) -> tuple[Future[Any], Any]:
        """
        Start a task in the portal's task group and wait until it signals for readiness.

        This method works the same way as :meth:`TaskGroup.start`.

        :param func: the target function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a tuple of (future, task_status_value) where the ``task_status_value`` is the
            value passed to ``task_status.started()`` from within the target function
        :raises RuntimeError: if the portal is not running or if this method is called from within
            the event loop thread

        .. versionadded:: 3.0

        """

        def task_done(future: Future) -> None:
            # Nothing to do when the task already reported readiness.
            if task_status_future.done():
                return

            # Must be checked first: ``exception()`` raises on a cancelled future.
            if future.cancelled():
                task_status_future.cancel()
                return

            # Fetch the exception once and compare against ``None`` so that an
            # exception object that happens to be falsy is still propagated.
            task_exc = future.exception()
            if task_exc is not None:
                task_status_future.set_exception(task_exc)
            else:
                exc = RuntimeError(
                    "Task exited without calling task_status.started()"
                )
                task_status_future.set_exception(exc)

        self._check_running()
        task_status_future: Future = Future()
        task_status = _BlockingPortalTaskStatus(task_status_future)
        f: Future = Future()
        f.add_done_callback(task_done)
        self._spawn_task_from_thread(func, args, {"task_status": task_status}, name, f)
        return f, task_status_future.result()

    def wrap_async_context_manager(
        self, cm: AsyncContextManager[T_co]
    ) -> ContextManager[T_co]:
        """
        Wrap an async context manager as a synchronous context manager via this portal.

        Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping in the
        middle until the synchronous context manager exits.

        :param cm: an asynchronous context manager
        :return: a synchronous context manager

        .. versionadded:: 2.1

        """
        return _BlockingAsyncContextManager(cm, self)

420 

421 

def create_blocking_portal() -> BlockingPortal:
    """
    Create a portal for running functions in the event loop thread from external threads.

    Use this function in asynchronous code when you need to allow external threads access to the
    event loop where your asynchronous code is currently running.

    :return: a new :class:`.BlockingPortal`

    .. deprecated:: 3.0
        Use :class:`.BlockingPortal` directly.

    """
    warn(
        "create_blocking_portal() has been deprecated -- use anyio.from_thread.BlockingPortal() "
        "directly",
        DeprecationWarning,
        # Attribute the warning to the caller's line instead of this shim.
        stacklevel=2,
    )
    return BlockingPortal()

439 

440 

@contextmanager
def start_blocking_portal(
    backend: str = "asyncio", backend_options: dict[str, Any] | None = None
) -> Generator[BlockingPortal, Any, None]:
    """
    Start a new event loop in a new thread and run a blocking portal in its main task.

    The parameters are the same as for :func:`~anyio.run`.

    :param backend: name of the backend
    :param backend_options: backend options
    :return: a context manager that yields a blocking portal

    .. versionchanged:: 3.0
        Usage as a context manager is now required.

    """
    # Resolved with the portal once the event loop thread has created it.
    portal_future: Future[BlockingPortal] = Future()

    async def run_portal() -> None:
        async with BlockingPortal() as portal_:
            # Skip handing out the portal if the waiting side already cancelled.
            if portal_future.set_running_or_notify_cancel():
                portal_future.set_result(portal_)
                await portal_.sleep_until_stopped()

    with ThreadPoolExecutor(1) as executor:
        loop_future = executor.submit(
            _eventloop.run,
            run_portal,  # type: ignore[arg-type]
            backend=backend,
            backend_options=backend_options,
        )
        try:
            # Returns when the portal is available or the event loop call has ended,
            # whichever happens first.
            wait(
                cast(Iterable[Future], [loop_future, portal_future]),
                return_when=FIRST_COMPLETED,
            )
        except BaseException:
            portal_future.cancel()
            loop_future.cancel()
            raise

        if portal_future.done():
            portal = portal_future.result()
            cancel_remaining_tasks = False
            try:
                yield portal
            except BaseException:
                # The ``with`` body failed: ask the portal to cancel leftover tasks.
                cancel_remaining_tasks = True
                raise
            finally:
                try:
                    portal.call(portal.stop, cancel_remaining_tasks)
                except RuntimeError:
                    # Best effort: ``call`` raises RuntimeError when the portal is
                    # no longer running.
                    pass

        # Wait for the event loop call to finish; re-raises any exception from it.
        loop_future.result()