Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/from_thread.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

222 statements  

1from __future__ import annotations 

2 

3__all__ = ( 

4 "BlockingPortal", 

5 "BlockingPortalProvider", 

6 "check_cancelled", 

7 "run", 

8 "run_sync", 

9 "start_blocking_portal", 

10) 

11 

12import sys 

13from collections.abc import Awaitable, Callable, Generator 

14from concurrent.futures import Future 

15from contextlib import ( 

16 AbstractAsyncContextManager, 

17 AbstractContextManager, 

18 contextmanager, 

19) 

20from dataclasses import dataclass, field 

21from inspect import isawaitable 

22from threading import Lock, Thread, current_thread, get_ident 

23from types import TracebackType 

24from typing import ( 

25 Any, 

26 Generic, 

27 TypeVar, 

28 cast, 

29 overload, 

30) 

31 

32from ._core._eventloop import ( 

33 get_async_backend, 

34 get_cancelled_exc_class, 

35 threadlocals, 

36) 

37from ._core._eventloop import run as run_eventloop 

38from ._core._exceptions import NoEventLoopError 

39from ._core._synchronization import Event 

40from ._core._tasks import CancelScope, create_task_group 

41from .abc._tasks import TaskStatus 

42from .lowlevel import EventLoopToken 

43 

44if sys.version_info >= (3, 11): 

45 from typing import TypeVarTuple, Unpack 

46else: 

47 from typing_extensions import TypeVarTuple, Unpack 

48 

49T_Retval = TypeVar("T_Retval") 

50T_co = TypeVar("T_co", covariant=True) 

51PosArgsT = TypeVarTuple("PosArgsT") 

52 

53 

54def _token_or_error(token: EventLoopToken | None) -> EventLoopToken: 

55 if token is not None: 

56 return token 

57 

58 try: 

59 return threadlocals.current_token 

60 except AttributeError: 

61 raise NoEventLoopError( 

62 "Not running inside an AnyIO worker thread, and no event loop token was " 

63 "provided" 

64 ) from None 

65 

66 

def run(
    func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
    *args: Unpack[PosArgsT],
    token: EventLoopToken | None = None,
) -> T_Retval:
    """
    Call a coroutine function from a worker thread.

    :param func: a coroutine function
    :param args: positional arguments for the callable
    :param token: an event loop token to use to get back to the event loop thread
        (required if calling this function from outside an AnyIO worker thread)
    :return: the return value of the coroutine function
    :raises NoEventLoopError: if no token was provided and called from outside an
        AnyIO worker thread
    :raises RunFinishedError: if the event loop tied to ``token`` is no longer running

    .. versionchanged:: 4.11.0
        Added the ``token`` parameter.

    """
    resolved = _token_or_error(token)
    # The native token is only forwarded when the caller supplied a token explicitly;
    # otherwise the backend receives None.
    native_token = None if token is None else resolved.native_token
    return resolved.backend_class.run_async_from_thread(
        func, args, token=native_token
    )

93 

94 

def run_sync(
    func: Callable[[Unpack[PosArgsT]], T_Retval],
    *args: Unpack[PosArgsT],
    token: EventLoopToken | None = None,
) -> T_Retval:
    """
    Call a function in the event loop thread from a worker thread.

    :param func: a callable
    :param args: positional arguments for the callable
    :param token: an event loop token to use to get back to the event loop thread
        (required if calling this function from outside an AnyIO worker thread)
    :return: the return value of the callable
    :raises NoEventLoopError: if no token was provided and called from outside an
        AnyIO worker thread
    :raises RunFinishedError: if the event loop tied to ``token`` is no longer running

    .. versionchanged:: 4.11.0
        Added the ``token`` parameter.

    """
    resolved = _token_or_error(token)
    # The native token is only forwarded when the caller supplied a token explicitly;
    # otherwise the backend receives None.
    native_token = None if token is None else resolved.native_token
    return resolved.backend_class.run_sync_from_thread(
        func, args, token=native_token
    )

121 

122 

class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager):
    """
    Synchronous facade over an async context manager, driven through a portal.

    A single portal task (:meth:`run_async_cm`) enters the async context manager,
    waits for the synchronous side to exit, and then exits the async context manager.
    """

    # Resolved with the value of ``__aenter__()`` (or the exception it raised)
    _enter_future: Future[T_co]
    # Future of the portal task; resolves to the return value of ``__aexit__()``
    _exit_future: Future[bool | None]
    # Set from ``__exit__()`` to let the portal task proceed to ``__aexit__()``
    _exit_event: Event
    # Exception info handed to ``__aexit__()``; stays all-None until ``__exit__()``
    _exit_exc_info: tuple[
        type[BaseException] | None, BaseException | None, TracebackType | None
    ] = (None, None, None)

    def __init__(
        self, async_cm: AbstractAsyncContextManager[T_co], portal: BlockingPortal
    ):
        self._async_cm = async_cm
        self._portal = portal

    async def run_async_cm(self) -> bool | None:
        """
        Enter the async context manager, wait for the exit signal, then exit it.

        :return: whatever the wrapped ``__aexit__()`` returned

        """
        try:
            self._exit_event = Event()
            entered = await self._async_cm.__aenter__()
        except BaseException as exc:
            # Forward the failure to the thread blocked in __enter__()
            self._enter_future.set_exception(exc)
            raise

        self._enter_future.set_result(entered)

        try:
            # Block until the synchronous side leaves its ``with`` block. This wait
            # can raise `get_cancelled_exc_class()` if something went wrong in a
            # task group in this async context manager.
            await self._exit_event.wait()
        finally:
            # On cancellation we may get here before
            # `_BlockingAsyncContextManager.__exit__` has run, in which case
            # `_exit_exc_info` still holds its all-None default.
            suppress = await self._async_cm.__aexit__(*self._exit_exc_info)

        return suppress

    def __enter__(self) -> T_co:
        self._enter_future = Future()
        self._exit_future = self._portal.start_task_soon(self.run_async_cm)
        # Blocks the calling thread until __aenter__() has finished in the event loop
        return self._enter_future.result()

    def __exit__(
        self,
        __exc_type: type[BaseException] | None,
        __exc_value: BaseException | None,
        __traceback: TracebackType | None,
    ) -> bool | None:
        # Record the exception info first so that run_async_cm() sees it once woken up
        self._exit_exc_info = (__exc_type, __exc_value, __traceback)
        self._portal.call(self._exit_event.set)
        return self._exit_future.result()

175 

176 

class _BlockingPortalTaskStatus(TaskStatus):
    """Task status object that reports ``started()`` by resolving a future."""

    def __init__(self, future: Future):
        # Future that receives the value passed to started()
        self._future = future

    def started(self, value: object = None) -> None:
        """
        Resolve the wrapped future with ``value``.

        :param value: the value to set as the future's result

        """
        self._future.set_result(value)

183 

184 

class BlockingPortal:
    """An object that lets external threads run code in an asynchronous event loop."""

    def __new__(cls) -> BlockingPortal:
        # Instantiation is delegated to the current async backend
        return get_async_backend().create_blocking_portal()

    def __init__(self) -> None:
        # Identifier of the thread that created the portal; reset to None by stop()
        self._event_loop_thread_id: int | None = get_ident()
        self._stop_event = Event()
        self._task_group = create_task_group()
        self._cancelled_exc_class = get_cancelled_exc_class()

    async def __aenter__(self) -> BlockingPortal:
        await self._task_group.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        # Stop accepting new calls before the task group is exited
        await self.stop()
        return await self._task_group.__aexit__(exc_type, exc_val, exc_tb)

    def _check_running(self) -> None:
        """
        Verify that the portal can accept a call from the current thread.

        :raises RuntimeError: if the portal has been stopped, or if the calling thread
            is the portal's own event loop thread

        """
        loop_thread_id = self._event_loop_thread_id
        if loop_thread_id is None:
            raise RuntimeError("This portal is not running")

        if loop_thread_id == get_ident():
            raise RuntimeError(
                "This method cannot be called from the event loop thread"
            )

    async def sleep_until_stopped(self) -> None:
        """Sleep until :meth:`stop` is called."""
        await self._stop_event.wait()

    async def stop(self, cancel_remaining: bool = False) -> None:
        """
        Signal the portal to shut down.

        This marks the portal as no longer accepting new calls and exits from
        :meth:`sleep_until_stopped`.

        :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False``
            to let them finish before returning

        """
        self._event_loop_thread_id = None
        self._stop_event.set()
        if cancel_remaining:
            self._task_group.cancel_scope.cancel("the blocking portal is shutting down")

    async def _call_func(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        future: Future[T_Retval],
    ) -> None:
        """
        Call ``func`` and transfer its outcome to ``future``.

        If the call returns an awaitable, it is awaited inside a cancel scope which
        gets cancelled if ``future`` is cancelled.

        :param func: the callable to run
        :param args: positional arguments for ``func``
        :param kwargs: keyword arguments for ``func``
        :param future: the future that receives the result, the exception or the
            cancellation

        """

        def on_future_done(f: Future[T_Retval]) -> None:
            if not f.cancelled():
                return

            # Cancel directly when already in the portal's thread; otherwise hop
            # over to it (unless the portal has already been stopped)
            if self._event_loop_thread_id == get_ident():
                scope.cancel("the future was cancelled")
            elif self._event_loop_thread_id is not None:
                self.call(scope.cancel, "the future was cancelled")

        try:
            outcome = func(*args, **kwargs)
            if isawaitable(outcome):
                with CancelScope() as scope:
                    future.add_done_callback(on_future_done)
                    retval = await outcome
            else:
                retval = outcome
        except self._cancelled_exc_class:
            future.cancel()
            future.set_running_or_notify_cancel()
        except BaseException as exc:
            if not future.cancelled():
                future.set_exception(exc)

            # Let base exceptions fall through
            if not isinstance(exc, Exception):
                raise
        else:
            if not future.cancelled():
                future.set_result(retval)
        finally:
            # Drop the reference that the done callback's closure holds to the scope
            scope = None  # type: ignore[assignment]

    def _spawn_task_from_thread(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        name: object,
        future: Future[T_Retval],
    ) -> None:
        """
        Spawn a new task using the given callable.

        Implementers must ensure that the future is resolved when the task finishes.

        :param func: a callable
        :param args: positional arguments to be passed to the callable
        :param kwargs: keyword arguments to be passed to the callable
        :param name: name of the task (will be coerced to a string if not ``None``)
        :param future: a future that will resolve to the return value of the callable,
            or the exception raised during its execution

        """
        raise NotImplementedError

    @overload
    def call(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        *args: Unpack[PosArgsT],
    ) -> T_Retval: ...

    @overload
    def call(
        self, func: Callable[[Unpack[PosArgsT]], T_Retval], *args: Unpack[PosArgsT]
    ) -> T_Retval: ...

    def call(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        *args: Unpack[PosArgsT],
    ) -> T_Retval:
        """
        Call the given function in the event loop thread.

        If the callable returns a coroutine object, it is awaited on.

        :param func: any callable
        :raises RuntimeError: if the portal is not running or if this method is called
            from within the event loop thread

        """
        pending = self.start_task_soon(func, *args)
        # Blocks the calling thread until the task has resolved the future
        return cast(T_Retval, pending.result())

    @overload
    def start_task_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> Future[T_Retval]: ...

    @overload
    def start_task_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> Future[T_Retval]: ...

    def start_task_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> Future[T_Retval]:
        """
        Start a task in the portal's task group.

        The task will be run inside a cancel scope which can be cancelled by cancelling
        the returned future.

        :param func: the target function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a future that resolves with the return value of the callable if the
            task completes successfully, or with the exception raised in the task
        :raises RuntimeError: if the portal is not running or if this method is called
            from within the event loop thread
        :rtype: concurrent.futures.Future[T_Retval]

        .. versionadded:: 3.0

        """
        self._check_running()
        result_future: Future[T_Retval] = Future()
        self._spawn_task_from_thread(func, args, {}, name, result_future)
        return result_future

    def start_task(
        self,
        func: Callable[..., Awaitable[T_Retval]],
        *args: object,
        name: object = None,
    ) -> tuple[Future[T_Retval], Any]:
        """
        Start a task in the portal's task group and wait until it signals for readiness.

        This method works the same way as :meth:`.abc.TaskGroup.start`.

        :param func: the target function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a tuple of (future, task_status_value) where the ``task_status_value``
            is the value passed to ``task_status.started()`` from within the target
            function
        :rtype: tuple[concurrent.futures.Future[T_Retval], Any]

        .. versionadded:: 3.0

        """

        def task_done(future: Future[T_Retval]) -> None:
            # Nothing to do if the task already reported its start value
            if task_status_future.done():
                return

            if future.cancelled():
                task_status_future.cancel()
                return

            task_exc = future.exception()
            if task_exc:
                task_status_future.set_exception(task_exc)
            else:
                task_status_future.set_exception(
                    RuntimeError("Task exited without calling task_status.started()")
                )

        self._check_running()
        task_status_future: Future = Future()
        status = _BlockingPortalTaskStatus(task_status_future)
        task_future: Future = Future()
        task_future.add_done_callback(task_done)
        self._spawn_task_from_thread(
            func, args, {"task_status": status}, name, task_future
        )
        # Blocks until the task calls task_status.started() or finishes without it
        return task_future, task_status_future.result()

    def wrap_async_context_manager(
        self, cm: AbstractAsyncContextManager[T_co]
    ) -> AbstractContextManager[T_co]:
        """
        Wrap an async context manager as a synchronous context manager via this portal.

        Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping
        in the middle until the synchronous context manager exits.

        :param cm: an asynchronous context manager
        :return: a synchronous context manager

        .. versionadded:: 2.1

        """
        return _BlockingAsyncContextManager(cm, self)

432 

433 

@dataclass
class BlockingPortalProvider:
    """
    A manager for a blocking portal. Used as a context manager. The first thread to
    enter this context manager causes a blocking portal to be started with the specific
    parameters, and the last thread to exit causes the portal to be shut down. Thus,
    there will be exactly one blocking portal running in this context as long as at
    least one thread has entered this context manager.

    The parameters are the same as for :func:`~anyio.run`.

    :param backend: name of the backend
    :param backend_options: backend options

    .. versionadded:: 4.4
    """

    backend: str = "asyncio"
    backend_options: dict[str, Any] | None = None
    # Guards the lease counter and the portal attributes below
    _lock: Lock = field(init=False, default_factory=Lock)
    # Number of __enter__() calls not yet matched by an __exit__()
    _leases: int = field(init=False, default=0)
    # The shared portal; only set while at least one lease is held
    _portal: BlockingPortal = field(init=False)
    # Context manager that owns the shared portal, or None when no portal is running
    _portal_cm: AbstractContextManager[BlockingPortal] | None = field(
        init=False, default=None
    )

    def __enter__(self) -> BlockingPortal:
        """
        Take a lease on the shared portal, starting it if it is not yet running.

        :return: the shared blocking portal

        """
        with self._lock:
            if self._portal_cm is None:
                # Publish the context manager only after the portal has started.
                # If startup raises, the provider state is left untouched so a later
                # __enter__() retries instead of finding a context manager without
                # a portal.
                portal_cm = start_blocking_portal(self.backend, self.backend_options)
                self._portal = portal_cm.__enter__()
                self._portal_cm = portal_cm

            self._leases += 1
            return self._portal

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """
        Release a lease on the shared portal, shutting it down on the last release.

        The exception arguments are not forwarded; the portal's context manager is
        always exited with ``(None, None, None)``.

        """
        portal_cm: AbstractContextManager[BlockingPortal] | None = None
        with self._lock:
            assert self._portal_cm
            assert self._leases > 0
            self._leases -= 1
            if not self._leases:
                portal_cm = self._portal_cm
                self._portal_cm = None
                del self._portal

        # The portal is shut down only after the lock has been released
        if portal_cm:
            portal_cm.__exit__(None, None, None)

489 

490 

@contextmanager
def start_blocking_portal(
    backend: str = "asyncio",
    backend_options: dict[str, Any] | None = None,
    *,
    name: str | None = None,
) -> Generator[BlockingPortal, Any, None]:
    """
    Start a new event loop in a new thread and run a blocking portal in its main task.

    The parameters are the same as for :func:`~anyio.run`.

    :param backend: name of the backend
    :param backend_options: backend options
    :param name: name of the thread
    :return: a context manager that yields a blocking portal

    .. versionchanged:: 3.0
        Usage as a context manager is now required.

    """
    # Carries the portal (or the startup failure) from the new thread to this one
    portal_future: Future[BlockingPortal] = Future()

    async def _portal_main() -> None:
        async with BlockingPortal() as running_portal:
            # Without an explicit name, give the thread a generated one
            if name is None:
                current_thread().name = f"{backend}-portal-{id(running_portal):x}"

            portal_future.set_result(running_portal)
            await running_portal.sleep_until_stopped()

    def _thread_target() -> None:
        if not portal_future.set_running_or_notify_cancel():
            return

        try:
            run_eventloop(
                _portal_main, backend=backend, backend_options=backend_options
            )
        except BaseException as exc:
            # Report the failure through the future only if the portal was never
            # handed over to the waiting thread
            if not portal_future.done():
                portal_future.set_exception(exc)

    worker = Thread(target=_thread_target, daemon=True, name=name)
    worker.start()
    try:
        cancel_remaining_tasks = False
        portal = portal_future.result()
        try:
            yield portal
        except BaseException:
            # The ``with`` body failed, so cancel whatever is still running
            cancel_remaining_tasks = True
            raise
        finally:
            try:
                portal.call(portal.stop, cancel_remaining_tasks)
            except RuntimeError:
                # Raised by the portal when it is no longer running
                pass
    finally:
        worker.join()

549 

550 

def check_cancelled() -> None:
    """
    Check if the cancel scope of the host task's running the current worker thread has
    been cancelled.

    If the host task's current cancel scope has indeed been cancelled, the
    backend-specific cancellation exception will be raised.

    :raises NoEventLoopError: if the current thread has no event loop token, i.e. it
        was not spawned by :func:`.to_thread.run_sync`

    """
    try:
        current: EventLoopToken = threadlocals.current_token
    except AttributeError:
        raise NoEventLoopError(
            "This function can only be called inside an AnyIO worker thread"
        ) from None
    else:
        current.backend_class.check_cancelled()