Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/from_thread.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

221 statements  

1from __future__ import annotations 

2 

3__all__ = ( 

4 "BlockingPortal", 

5 "BlockingPortalProvider", 

6 "check_cancelled", 

7 "run", 

8 "run_sync", 

9 "start_blocking_portal", 

10) 

11 

12import sys 

13from collections.abc import Awaitable, Callable, Generator 

14from concurrent.futures import Future 

15from contextlib import ( 

16 AbstractAsyncContextManager, 

17 AbstractContextManager, 

18 contextmanager, 

19) 

20from dataclasses import dataclass, field 

21from functools import partial 

22from inspect import isawaitable 

23from threading import Lock, Thread, current_thread, get_ident 

24from types import TracebackType 

25from typing import ( 

26 Any, 

27 Generic, 

28 TypeVar, 

29 cast, 

30 overload, 

31) 

32 

33from ._core._eventloop import ( 

34 get_cancelled_exc_class, 

35 threadlocals, 

36) 

37from ._core._eventloop import run as run_eventloop 

38from ._core._exceptions import NoEventLoopError 

39from ._core._synchronization import Event 

40from ._core._tasks import CancelScope, create_task_group 

41from .abc._tasks import TaskStatus 

42from .lowlevel import EventLoopToken, current_token 

43 

44if sys.version_info >= (3, 11): 

45 from typing import TypeVarTuple, Unpack 

46else: 

47 from typing_extensions import TypeVarTuple, Unpack 

48 

49T_Retval = TypeVar("T_Retval") 

50T_co = TypeVar("T_co", covariant=True) 

51PosArgsT = TypeVarTuple("PosArgsT") 

52 

53 

54def _token_or_error(token: EventLoopToken | None) -> EventLoopToken: 

55 if token is not None: 

56 return token 

57 

58 try: 

59 return threadlocals.current_token 

60 except AttributeError: 

61 raise NoEventLoopError( 

62 "Not running inside an AnyIO worker thread, and no event loop token was " 

63 "provided" 

64 ) from None 

65 

66 

67def run( 

68 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

69 *args: Unpack[PosArgsT], 

70 token: EventLoopToken | None = None, 

71) -> T_Retval: 

72 """ 

73 Call a coroutine function from a worker thread. 

74 

75 :param func: a coroutine function 

76 :param args: positional arguments for the callable 

77 :param token: an event loop token to use to get back to the event loop thread 

78 (required if calling this function from outside an AnyIO worker thread) 

79 :return: the return value of the coroutine function 

80 :raises MissingTokenError: if no token was provided and called from outside an 

81 AnyIO worker thread 

82 :raises RunFinishedError: if the event loop tied to ``token`` is no longer running 

83 

84 .. versionchanged:: 4.11.0 

85 Added the ``token`` parameter. 

86 

87 """ 

88 explicit_token = token is not None 

89 token = _token_or_error(token) 

90 return token.backend_class.run_async_from_thread( 

91 func, args, token=token.native_token if explicit_token else None 

92 ) 

93 

94 

95def run_sync( 

96 func: Callable[[Unpack[PosArgsT]], T_Retval], 

97 *args: Unpack[PosArgsT], 

98 token: EventLoopToken | None = None, 

99) -> T_Retval: 

100 """ 

101 Call a function in the event loop thread from a worker thread. 

102 

103 :param func: a callable 

104 :param args: positional arguments for the callable 

105 :param token: an event loop token to use to get back to the event loop thread 

106 (required if calling this function from outside an AnyIO worker thread) 

107 :return: the return value of the callable 

108 :raises MissingTokenError: if no token was provided and called from outside an 

109 AnyIO worker thread 

110 :raises RunFinishedError: if the event loop tied to ``token`` is no longer running 

111 

112 .. versionchanged:: 4.11.0 

113 Added the ``token`` parameter. 

114 

115 """ 

116 explicit_token = token is not None 

117 token = _token_or_error(token) 

118 return token.backend_class.run_sync_from_thread( 

119 func, args, token=token.native_token if explicit_token else None 

120 ) 

121 

122 

123class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager): 

124 _enter_future: Future[T_co] 

125 _exit_future: Future[bool | None] 

126 _exit_event: Event 

127 _exit_exc_info: tuple[ 

128 type[BaseException] | None, BaseException | None, TracebackType | None 

129 ] = (None, None, None) 

130 

131 def __init__( 

132 self, async_cm: AbstractAsyncContextManager[T_co], portal: BlockingPortal 

133 ): 

134 self._async_cm = async_cm 

135 self._portal = portal 

136 

137 async def run_async_cm(self) -> bool | None: 

138 try: 

139 self._exit_event = Event() 

140 value = await self._async_cm.__aenter__() 

141 except BaseException as exc: 

142 self._enter_future.set_exception(exc) 

143 raise 

144 else: 

145 self._enter_future.set_result(value) 

146 

147 try: 

148 # Wait for the sync context manager to exit. 

149 # This next statement can raise `get_cancelled_exc_class()` if 

150 # something went wrong in a task group in this async context 

151 # manager. 

152 await self._exit_event.wait() 

153 finally: 

154 # In case of cancellation, it could be that we end up here before 

155 # `_BlockingAsyncContextManager.__exit__` is called, and an 

156 # `_exit_exc_info` has been set. 

157 result = await self._async_cm.__aexit__(*self._exit_exc_info) 

158 

159 return result 

160 

161 def __enter__(self) -> T_co: 

162 self._enter_future = Future() 

163 self._exit_future = self._portal.start_task_soon(self.run_async_cm) 

164 return self._enter_future.result() 

165 

166 def __exit__( 

167 self, 

168 __exc_type: type[BaseException] | None, 

169 __exc_value: BaseException | None, 

170 __traceback: TracebackType | None, 

171 ) -> bool | None: 

172 self._exit_exc_info = __exc_type, __exc_value, __traceback 

173 self._portal.call(self._exit_event.set) 

174 return self._exit_future.result() 

175 

176 

177class _BlockingPortalTaskStatus(TaskStatus): 

178 def __init__(self, future: Future): 

179 self._future = future 

180 

181 def started(self, value: object = None) -> None: 

182 self._future.set_result(value) 

183 

184 

185class BlockingPortal: 

186 """ 

187 An object that lets external threads run code in an asynchronous event loop. 

188 

189 :raises NoEventLoopError: if no supported asynchronous event loop is running in the 

190 current thread 

191 """ 

192 

193 def __init__(self) -> None: 

194 self._token = current_token() 

195 self._event_loop_thread_id: int | None = get_ident() 

196 self._stop_event = Event() 

197 self._task_group = create_task_group() 

198 

199 async def __aenter__(self) -> BlockingPortal: 

200 await self._task_group.__aenter__() 

201 return self 

202 

203 async def __aexit__( 

204 self, 

205 exc_type: type[BaseException] | None, 

206 exc_val: BaseException | None, 

207 exc_tb: TracebackType | None, 

208 ) -> bool: 

209 await self.stop() 

210 return await self._task_group.__aexit__(exc_type, exc_val, exc_tb) 

211 

212 def _check_running(self) -> None: 

213 if self._event_loop_thread_id is None: 

214 raise RuntimeError("This portal is not running") 

215 if self._event_loop_thread_id == get_ident(): 

216 raise RuntimeError( 

217 "This method cannot be called from the event loop thread" 

218 ) 

219 

220 async def sleep_until_stopped(self) -> None: 

221 """Sleep until :meth:`stop` is called.""" 

222 await self._stop_event.wait() 

223 

224 async def stop(self, cancel_remaining: bool = False) -> None: 

225 """ 

226 Signal the portal to shut down. 

227 

228 This marks the portal as no longer accepting new calls and exits from 

229 :meth:`sleep_until_stopped`. 

230 

231 :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False`` 

232 to let them finish before returning 

233 

234 """ 

235 self._event_loop_thread_id = None 

236 self._stop_event.set() 

237 if cancel_remaining: 

238 self._task_group.cancel_scope.cancel("the blocking portal is shutting down") 

239 

240 async def _call_func( 

241 self, 

242 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval], 

243 args: tuple[Unpack[PosArgsT]], 

244 kwargs: dict[str, Any], 

245 future: Future[T_Retval], 

246 ) -> None: 

247 def callback(f: Future[T_Retval]) -> None: 

248 if f.cancelled(): 

249 if self._event_loop_thread_id == get_ident(): 

250 scope.cancel("the future was cancelled") 

251 elif self._event_loop_thread_id is not None: 

252 self.call(scope.cancel, "the future was cancelled") 

253 

254 try: 

255 retval_or_awaitable = func(*args, **kwargs) 

256 if isawaitable(retval_or_awaitable): 

257 with CancelScope() as scope: 

258 future.add_done_callback(callback) 

259 retval = await retval_or_awaitable 

260 else: 

261 retval = retval_or_awaitable 

262 except get_cancelled_exc_class(): 

263 future.cancel() 

264 future.set_running_or_notify_cancel() 

265 except BaseException as exc: 

266 if not future.cancelled(): 

267 future.set_exception(exc) 

268 

269 # Let base exceptions fall through 

270 if not isinstance(exc, Exception): 

271 raise 

272 else: 

273 if not future.cancelled(): 

274 future.set_result(retval) 

275 finally: 

276 scope = None # type: ignore[assignment] 

277 

278 def _spawn_task_from_thread( 

279 self, 

280 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval], 

281 args: tuple[Unpack[PosArgsT]], 

282 kwargs: dict[str, Any], 

283 name: object, 

284 future: Future[T_Retval], 

285 ) -> None: 

286 """ 

287 Spawn a new task using the given callable. 

288 

289 :param func: a callable 

290 :param args: positional arguments to be passed to the callable 

291 :param kwargs: keyword arguments to be passed to the callable 

292 :param name: name of the task (will be coerced to a string if not ``None``) 

293 :param future: a future that will resolve to the return value of the callable, 

294 or the exception raised during its execution 

295 

296 """ 

297 run_sync( 

298 partial(self._task_group.start_soon, name=name), 

299 self._call_func, 

300 func, 

301 args, 

302 kwargs, 

303 future, 

304 token=self._token, 

305 ) 

306 

307 @overload 

308 def call( 

309 self, 

310 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

311 *args: Unpack[PosArgsT], 

312 ) -> T_Retval: ... 

313 

314 @overload 

315 def call( 

316 self, func: Callable[[Unpack[PosArgsT]], T_Retval], *args: Unpack[PosArgsT] 

317 ) -> T_Retval: ... 

318 

319 def call( 

320 self, 

321 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval], 

322 *args: Unpack[PosArgsT], 

323 ) -> T_Retval: 

324 """ 

325 Call the given function in the event loop thread. 

326 

327 If the callable returns a coroutine object, it is awaited on. 

328 

329 :param func: any callable 

330 :raises RuntimeError: if the portal is not running or if this method is called 

331 from within the event loop thread 

332 

333 """ 

334 return cast(T_Retval, self.start_task_soon(func, *args).result()) 

335 

336 @overload 

337 def start_task_soon( 

338 self, 

339 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

340 *args: Unpack[PosArgsT], 

341 name: object = None, 

342 ) -> Future[T_Retval]: ... 

343 

344 @overload 

345 def start_task_soon( 

346 self, 

347 func: Callable[[Unpack[PosArgsT]], T_Retval], 

348 *args: Unpack[PosArgsT], 

349 name: object = None, 

350 ) -> Future[T_Retval]: ... 

351 

352 def start_task_soon( 

353 self, 

354 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval], 

355 *args: Unpack[PosArgsT], 

356 name: object = None, 

357 ) -> Future[T_Retval]: 

358 """ 

359 Start a task in the portal's task group. 

360 

361 The task will be run inside a cancel scope which can be cancelled by cancelling 

362 the returned future. 

363 

364 :param func: the target function 

365 :param args: positional arguments passed to ``func`` 

366 :param name: name of the task (will be coerced to a string if not ``None``) 

367 :return: a future that resolves with the return value of the callable if the 

368 task completes successfully, or with the exception raised in the task 

369 :raises RuntimeError: if the portal is not running or if this method is called 

370 from within the event loop thread 

371 :rtype: concurrent.futures.Future[T_Retval] 

372 

373 .. versionadded:: 3.0 

374 

375 """ 

376 self._check_running() 

377 f: Future[T_Retval] = Future() 

378 self._spawn_task_from_thread(func, args, {}, name, f) 

379 return f 

380 

381 def start_task( 

382 self, 

383 func: Callable[..., Awaitable[T_Retval]], 

384 *args: object, 

385 name: object = None, 

386 ) -> tuple[Future[T_Retval], Any]: 

387 """ 

388 Start a task in the portal's task group and wait until it signals for readiness. 

389 

390 This method works the same way as :meth:`.abc.TaskGroup.start`. 

391 

392 :param func: the target function 

393 :param args: positional arguments passed to ``func`` 

394 :param name: name of the task (will be coerced to a string if not ``None``) 

395 :return: a tuple of (future, task_status_value) where the ``task_status_value`` 

396 is the value passed to ``task_status.started()`` from within the target 

397 function 

398 :rtype: tuple[concurrent.futures.Future[T_Retval], Any] 

399 

400 .. versionadded:: 3.0 

401 

402 """ 

403 

404 def task_done(future: Future[T_Retval]) -> None: 

405 if not task_status_future.done(): 

406 if future.cancelled(): 

407 task_status_future.cancel() 

408 elif future.exception(): 

409 task_status_future.set_exception(future.exception()) 

410 else: 

411 exc = RuntimeError( 

412 "Task exited without calling task_status.started()" 

413 ) 

414 task_status_future.set_exception(exc) 

415 

416 self._check_running() 

417 task_status_future: Future = Future() 

418 task_status = _BlockingPortalTaskStatus(task_status_future) 

419 f: Future = Future() 

420 f.add_done_callback(task_done) 

421 self._spawn_task_from_thread(func, args, {"task_status": task_status}, name, f) 

422 return f, task_status_future.result() 

423 

424 def wrap_async_context_manager( 

425 self, cm: AbstractAsyncContextManager[T_co] 

426 ) -> AbstractContextManager[T_co]: 

427 """ 

428 Wrap an async context manager as a synchronous context manager via this portal. 

429 

430 Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping 

431 in the middle until the synchronous context manager exits. 

432 

433 :param cm: an asynchronous context manager 

434 :return: a synchronous context manager 

435 

436 .. versionadded:: 2.1 

437 

438 """ 

439 return _BlockingAsyncContextManager(cm, self) 

440 

441 

442@dataclass 

443class BlockingPortalProvider: 

444 """ 

445 A manager for a blocking portal. Used as a context manager. The first thread to 

446 enter this context manager causes a blocking portal to be started with the specific 

447 parameters, and the last thread to exit causes the portal to be shut down. Thus, 

448 there will be exactly one blocking portal running in this context as long as at 

449 least one thread has entered this context manager. 

450 

451 The parameters are the same as for :func:`~anyio.run`. 

452 

453 :param backend: name of the backend 

454 :param backend_options: backend options 

455 

456 .. versionadded:: 4.4 

457 """ 

458 

459 backend: str = "asyncio" 

460 backend_options: dict[str, Any] | None = None 

461 _lock: Lock = field(init=False, default_factory=Lock) 

462 _leases: int = field(init=False, default=0) 

463 _portal: BlockingPortal = field(init=False) 

464 _portal_cm: AbstractContextManager[BlockingPortal] | None = field( 

465 init=False, default=None 

466 ) 

467 

468 def __enter__(self) -> BlockingPortal: 

469 with self._lock: 

470 if self._portal_cm is None: 

471 self._portal_cm = start_blocking_portal( 

472 self.backend, self.backend_options 

473 ) 

474 self._portal = self._portal_cm.__enter__() 

475 

476 self._leases += 1 

477 return self._portal 

478 

479 def __exit__( 

480 self, 

481 exc_type: type[BaseException] | None, 

482 exc_val: BaseException | None, 

483 exc_tb: TracebackType | None, 

484 ) -> None: 

485 portal_cm: AbstractContextManager[BlockingPortal] | None = None 

486 with self._lock: 

487 assert self._portal_cm 

488 assert self._leases > 0 

489 self._leases -= 1 

490 if not self._leases: 

491 portal_cm = self._portal_cm 

492 self._portal_cm = None 

493 del self._portal 

494 

495 if portal_cm: 

496 portal_cm.__exit__(None, None, None) 

497 

498 

499@contextmanager 

500def start_blocking_portal( 

501 backend: str = "asyncio", 

502 backend_options: dict[str, Any] | None = None, 

503 *, 

504 name: str | None = None, 

505) -> Generator[BlockingPortal, Any, None]: 

506 """ 

507 Start a new event loop in a new thread and run a blocking portal in its main task. 

508 

509 The parameters are the same as for :func:`~anyio.run`. 

510 

511 :param backend: name of the backend 

512 :param backend_options: backend options 

513 :param name: name of the thread 

514 :return: a context manager that yields a blocking portal 

515 

516 .. versionchanged:: 3.0 

517 Usage as a context manager is now required. 

518 

519 """ 

520 

521 async def run_portal() -> None: 

522 async with BlockingPortal() as portal_: 

523 if name is None: 

524 current_thread().name = f"{backend}-portal-{id(portal_):x}" 

525 

526 future.set_result(portal_) 

527 await portal_.sleep_until_stopped() 

528 

529 def run_blocking_portal() -> None: 

530 if future.set_running_or_notify_cancel(): 

531 try: 

532 run_eventloop( 

533 run_portal, backend=backend, backend_options=backend_options 

534 ) 

535 except BaseException as exc: 

536 if not future.done(): 

537 future.set_exception(exc) 

538 

539 future: Future[BlockingPortal] = Future() 

540 thread = Thread(target=run_blocking_portal, daemon=True, name=name) 

541 thread.start() 

542 try: 

543 cancel_remaining_tasks = False 

544 portal = future.result() 

545 try: 

546 yield portal 

547 except BaseException: 

548 cancel_remaining_tasks = True 

549 raise 

550 finally: 

551 try: 

552 portal.call(portal.stop, cancel_remaining_tasks) 

553 except RuntimeError: 

554 pass 

555 finally: 

556 thread.join() 

557 

558 

559def check_cancelled() -> None: 

560 """ 

561 Check if the cancel scope of the host task's running the current worker thread has 

562 been cancelled. 

563 

564 If the host task's current cancel scope has indeed been cancelled, the 

565 backend-specific cancellation exception will be raised. 

566 

567 :raises RuntimeError: if the current thread was not spawned by 

568 :func:`.to_thread.run_sync` 

569 

570 """ 

571 try: 

572 token: EventLoopToken = threadlocals.current_token 

573 except AttributeError: 

574 raise NoEventLoopError( 

575 "This function can only be called inside an AnyIO worker thread" 

576 ) from None 

577 

578 token.backend_class.check_cancelled()