Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/from_thread.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

216 statements  

1from __future__ import annotations 

2 

3import sys 

4import threading 

5from collections.abc import Awaitable, Callable, Generator 

6from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait 

7from contextlib import AbstractContextManager, contextmanager 

8from dataclasses import dataclass, field 

9from inspect import isawaitable 

10from types import TracebackType 

11from typing import ( 

12 Any, 

13 AsyncContextManager, 

14 ContextManager, 

15 Generic, 

16 Iterable, 

17 TypeVar, 

18 cast, 

19 overload, 

20) 

21 

22from ._core import _eventloop 

23from ._core._eventloop import get_async_backend, get_cancelled_exc_class, threadlocals 

24from ._core._synchronization import Event 

25from ._core._tasks import CancelScope, create_task_group 

26from .abc import AsyncBackend 

27from .abc._tasks import TaskStatus 

28 

29if sys.version_info >= (3, 11): 

30 from typing import TypeVarTuple, Unpack 

31else: 

32 from typing_extensions import TypeVarTuple, Unpack 

33 

# Return type of the callable (or of the awaitable it returns) that is being run.
T_Retval = TypeVar("T_Retval")
# Covariant type of the value produced by entering a wrapped async context manager.
T_co = TypeVar("T_co", covariant=True)
# Types of the positional arguments that are forwarded to the callable.
PosArgsT = TypeVarTuple("PosArgsT")

37 

38 

def run(
    func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], *args: Unpack[PosArgsT]
) -> T_Retval:
    """
    Call a coroutine function from a worker thread.

    The async backend and the event loop token are read from the thread-local state
    of the calling thread, and the call is handed over to that backend.

    :param func: a coroutine function
    :param args: positional arguments for the callable
    :return: the return value of the coroutine function
    :raises RuntimeError: if the calling thread carries no AnyIO thread-local state

    """
    try:
        backend, loop_token = (
            threadlocals.current_async_backend,
            threadlocals.current_token,
        )
    except AttributeError:
        # The thread-local attributes are missing, so this is not a worker thread.
        # ``from None`` hides the uninformative AttributeError from the traceback.
        not_a_worker = RuntimeError(
            "This function can only be run from an AnyIO worker thread"
        )
        raise not_a_worker from None
    else:
        return backend.run_async_from_thread(func, args, token=loop_token)

59 

60 

def run_sync(
    func: Callable[[Unpack[PosArgsT]], T_Retval], *args: Unpack[PosArgsT]
) -> T_Retval:
    """
    Call a function in the event loop thread from a worker thread.

    The async backend and the event loop token are read from the thread-local state
    of the calling thread, and the call is handed over to that backend.

    :param func: a callable
    :param args: positional arguments for the callable
    :return: the return value of the callable
    :raises RuntimeError: if the calling thread carries no AnyIO thread-local state

    """
    try:
        backend, loop_token = (
            threadlocals.current_async_backend,
            threadlocals.current_token,
        )
    except AttributeError:
        # The thread-local attributes are missing, so this is not a worker thread.
        # ``from None`` hides the uninformative AttributeError from the traceback.
        not_a_worker = RuntimeError(
            "This function can only be run from an AnyIO worker thread"
        )
        raise not_a_worker from None
    else:
        return backend.run_sync_from_thread(func, args, token=loop_token)

81 

82 

class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager):
    """
    Synchronous front for an async context manager, driven through a portal.

    ``__enter__`` starts :meth:`run_async_cm` as a task via the portal and blocks
    until the async ``__aenter__()`` has produced a value (or failed). ``__exit__``
    records the exception info, wakes the task up and blocks until the async
    ``__aexit__()`` has finished.
    """

    # Resolved by the task with the value (or exception) from ``__aenter__()``
    _enter_future: Future[T_co]
    # Future of the whole ``run_async_cm`` task; carries the ``__aexit__()`` result
    _exit_future: Future[bool | None]
    # Set from ``__exit__`` to let the task proceed to ``__aexit__()``
    _exit_event: Event
    # Exception info handed to ``__aexit__()``; stays all-``None`` until ``__exit__``
    _exit_exc_info: tuple[
        type[BaseException] | None, BaseException | None, TracebackType | None
    ] = (None, None, None)

    def __init__(self, async_cm: AsyncContextManager[T_co], portal: BlockingPortal):
        self._portal = portal
        self._async_cm = async_cm

    async def run_async_cm(self) -> bool | None:
        """
        Enter the async context manager, wait for the exit signal, then exit it.

        :return: whatever the wrapped ``__aexit__()`` returned
        """
        try:
            self._exit_event = Event()
            entered_value = await self._async_cm.__aenter__()
        except BaseException as enter_error:
            # Unblock the thread waiting in __enter__ with the same failure
            self._enter_future.set_exception(enter_error)
            raise

        self._enter_future.set_result(entered_value)
        try:
            # Park here until the synchronous side exits. This wait can raise the
            # backend's cancellation exception if a task group inside the wrapped
            # context manager failed.
            await self._exit_event.wait()
        finally:
            # On cancellation we may get here before __exit__ has run, in which case
            # _exit_exc_info still holds its all-None default. Returning from within
            # ``finally`` discards an exception raised by the wait above.
            return await self._async_cm.__aexit__(*self._exit_exc_info)

    def __enter__(self) -> T_co:
        # The future must be in place before the task starts, since the task
        # resolves it.
        pending_enter: Future[T_co] = Future()
        self._enter_future = pending_enter
        self._exit_future = self._portal.start_task_soon(self.run_async_cm)
        return pending_enter.result()

    def __exit__(
        self,
        __exc_type: type[BaseException] | None,
        __exc_value: BaseException | None,
        __traceback: TracebackType | None,
    ) -> bool | None:
        # Publish the exception info first so the task sees it when it wakes up
        self._exit_exc_info = (__exc_type, __exc_value, __traceback)
        wake_task = self._exit_event.set
        self._portal.call(wake_task)
        return self._exit_future.result()

132 

133 

class _BlockingPortalTaskStatus(TaskStatus):
    """Task status object that reports readiness by resolving a concurrent future."""

    def __init__(self, future: Future):
        # Future that receives the value passed to started()
        self._future = future

    def started(self, value: object = None) -> None:
        """Resolve the wrapped future with ``value``."""
        status_future = self._future
        status_future.set_result(value)

140 

141 

class BlockingPortal:
    """
    An object that lets external threads run code in an asynchronous event loop.

    Used as an async context manager: entering it enters the portal's task group,
    exiting it stops the portal and exits the task group. Calls coming in from other
    threads are turned into tasks through :meth:`_spawn_task_from_thread`, which
    raises :exc:`NotImplementedError` in this class (presumably overridden by the
    backend-specific class returned from ``__new__`` -- not visible here).
    """

    def __new__(cls) -> BlockingPortal:
        # Instantiation is delegated to the current async backend, which returns its
        # own portal object
        return get_async_backend().create_blocking_portal()

    def __init__(self) -> None:
        # The thread running __init__ is treated as the event loop thread; the value
        # is reset to None by stop()
        self._event_loop_thread_id: int | None = threading.get_ident()
        self._stop_event = Event()
        self._task_group = create_task_group()
        # Looked up once here so _call_func can catch it without asking the backend
        self._cancelled_exc_class = get_cancelled_exc_class()

    async def __aenter__(self) -> BlockingPortal:
        """Enter the portal's task group and return the portal itself."""
        await self._task_group.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        """
        Stop the portal, then exit the task group with the given exception info.

        :return: the return value of the task group's ``__aexit__()``
        """
        await self.stop()
        return await self._task_group.__aexit__(exc_type, exc_val, exc_tb)

    def _check_running(self) -> None:
        """
        Verify that the portal can take a call from the current thread.

        :raises RuntimeError: if the portal has been stopped, or if the caller is
            running in the thread recorded as the event loop thread

        """
        if self._event_loop_thread_id is None:
            raise RuntimeError("This portal is not running")
        if self._event_loop_thread_id == threading.get_ident():
            raise RuntimeError(
                "This method cannot be called from the event loop thread"
            )

    async def sleep_until_stopped(self) -> None:
        """Sleep until :meth:`stop` is called."""
        await self._stop_event.wait()

    async def stop(self, cancel_remaining: bool = False) -> None:
        """
        Signal the portal to shut down.

        This marks the portal as no longer accepting new calls and exits from
        :meth:`sleep_until_stopped`.

        This method only flips the state (and optionally cancels the task group's
        cancel scope); it does not itself wait for any task.

        :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False``
            to let them finish before returning

        """
        # A None thread id makes _check_running() reject any further calls
        self._event_loop_thread_id = None
        self._stop_event.set()
        if cancel_remaining:
            self._task_group.cancel_scope.cancel()

    async def _call_func(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        future: Future[T_Retval],
    ) -> None:
        """
        Call ``func`` and transfer its outcome to ``future``.

        If the call returns an awaitable, it is awaited inside a cancel scope that is
        cancelled when ``future`` gets cancelled.

        :param func: the callable to run
        :param args: positional arguments for the callable
        :param kwargs: keyword arguments for the callable
        :param future: the future to resolve with the return value or the exception

        """

        def callback(f: Future[T_Retval]) -> None:
            # Done callback of the future: on cancellation, cancel the scope through
            # the portal -- but only while the portal is still running and this
            # callback is not executing in the event loop thread, as call() would
            # raise RuntimeError in both of those cases
            if f.cancelled() and self._event_loop_thread_id not in (
                None,
                threading.get_ident(),
            ):
                self.call(scope.cancel)

        try:
            retval_or_awaitable = func(*args, **kwargs)
            if isawaitable(retval_or_awaitable):
                with CancelScope() as scope:
                    # The future may already have been cancelled before the task got
                    # to run; otherwise watch for a later cancellation
                    if future.cancelled():
                        scope.cancel()
                    else:
                        future.add_done_callback(callback)

                    retval = await retval_or_awaitable
            else:
                retval = retval_or_awaitable
        except self._cancelled_exc_class:
            # The task itself was cancelled: mark the future as cancelled too
            future.cancel()
            future.set_running_or_notify_cancel()
        except BaseException as exc:
            if not future.cancelled():
                future.set_exception(exc)

            # Let base exceptions fall through
            if not isinstance(exc, Exception):
                raise
        else:
            if not future.cancelled():
                future.set_result(retval)
        finally:
            # Presumably done to drop the reference that the callback closure keeps
            # to the cancel scope -- TODO confirm
            scope = None  # type: ignore[assignment]

    def _spawn_task_from_thread(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        name: object,
        future: Future[T_Retval],
    ) -> None:
        """
        Spawn a new task using the given callable.

        Implementors must ensure that the future is resolved when the task finishes.

        :param func: a callable
        :param args: positional arguments to be passed to the callable
        :param kwargs: keyword arguments to be passed to the callable
        :param name: name of the task (will be coerced to a string if not ``None``)
        :param future: a future that will resolve to the return value of the callable,
            or the exception raised during its execution
        :raises NotImplementedError: always, in this class

        """
        raise NotImplementedError

    @overload
    def call(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        *args: Unpack[PosArgsT],
    ) -> T_Retval: ...

    @overload
    def call(
        self, func: Callable[[Unpack[PosArgsT]], T_Retval], *args: Unpack[PosArgsT]
    ) -> T_Retval: ...

    def call(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        *args: Unpack[PosArgsT],
    ) -> T_Retval:
        """
        Call the given function in the event loop thread.

        If the callable returns a coroutine object, it is awaited on.

        This is :meth:`start_task_soon` followed by a blocking wait for the result of
        the returned future.

        :param func: any callable
        :param args: positional arguments passed to ``func``
        :return: the result of the future returned by :meth:`start_task_soon`
        :raises RuntimeError: if the portal is not running or if this method is called
            from within the event loop thread

        """
        return cast(T_Retval, self.start_task_soon(func, *args).result())

    @overload
    def start_task_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> Future[T_Retval]: ...

    @overload
    def start_task_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> Future[T_Retval]: ...

    def start_task_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> Future[T_Retval]:
        """
        Start a task in the portal's task group.

        The task will be run inside a cancel scope which can be cancelled by cancelling
        the returned future.

        :param func: the target function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a future that resolves with the return value of the callable if the
            task completes successfully, or with the exception raised in the task
        :raises RuntimeError: if the portal is not running or if this method is called
            from within the event loop thread
        :rtype: concurrent.futures.Future[T_Retval]

        .. versionadded:: 3.0

        """
        self._check_running()
        f: Future[T_Retval] = Future()
        # No keyword arguments are passed to the callable on this path
        self._spawn_task_from_thread(func, args, {}, name, f)
        return f

    def start_task(
        self,
        func: Callable[..., Awaitable[T_Retval]],
        *args: object,
        name: object = None,
    ) -> tuple[Future[T_Retval], Any]:
        """
        Start a task in the portal's task group and wait until it signals for readiness.

        This method works the same way as :meth:`.abc.TaskGroup.start`.

        The target function receives a ``task_status`` keyword argument. This method
        blocks until the function calls ``task_status.started()`` or the task ends
        without doing so.

        :param func: the target function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a tuple of (future, task_status_value) where the ``task_status_value``
            is the value passed to ``task_status.started()`` from within the target
            function
        :raises RuntimeError: if the portal is not running, if this method is called
            from within the event loop thread, or if the task finished without
            calling ``task_status.started()``
        :rtype: tuple[concurrent.futures.Future[T_Retval], Any]

        .. versionadded:: 3.0

        """

        def task_done(future: Future[T_Retval]) -> None:
            # The task ended. If it never called started(), the caller is still
            # blocked on task_status_future, so resolve that future to mirror how
            # the task ended.
            if not task_status_future.done():
                if future.cancelled():
                    task_status_future.cancel()
                elif future.exception():
                    task_status_future.set_exception(future.exception())
                else:
                    exc = RuntimeError(
                        "Task exited without calling task_status.started()"
                    )
                    task_status_future.set_exception(exc)

        self._check_running()
        task_status_future: Future = Future()
        task_status = _BlockingPortalTaskStatus(task_status_future)
        f: Future = Future()
        f.add_done_callback(task_done)
        self._spawn_task_from_thread(func, args, {"task_status": task_status}, name, f)
        # Blocks until started() is called or task_done() resolves the future
        return f, task_status_future.result()

    def wrap_async_context_manager(
        self, cm: AsyncContextManager[T_co]
    ) -> ContextManager[T_co]:
        """
        Wrap an async context manager as a synchronous context manager via this portal.

        Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping
        in the middle until the synchronous context manager exits.

        :param cm: an asynchronous context manager
        :return: a synchronous context manager

        .. versionadded:: 2.1

        """
        return _BlockingAsyncContextManager(cm, self)

393 

394 

@dataclass
class BlockingPortalProvider:
    """
    A manager for a blocking portal. Used as a context manager. The first thread to
    enter this context manager causes a blocking portal to be started with the specific
    parameters, and the last thread to exit causes the portal to be shut down. Thus,
    there will be exactly one blocking portal running in this context as long as at
    least one thread has entered this context manager.

    If starting the portal fails, the exception propagates to the entering thread and
    the provider is left untouched, so a later attempt to enter it starts afresh.

    The parameters are the same as for :func:`~anyio.run`.

    :param backend: name of the backend
    :param backend_options: backend options

    .. versionadded:: 4.4
    """

    backend: str = "asyncio"
    backend_options: dict[str, Any] | None = None
    # Guards the lease count and the portal bookkeeping below
    _lock: threading.Lock = field(init=False, default_factory=threading.Lock)
    # Number of threads currently inside the context manager
    _leases: int = field(init=False, default=0)
    # Only set while a portal is running. It has no default, so it is kept out of the
    # generated __repr__/__eq__, which would otherwise raise AttributeError on a
    # provider that is not currently in use.
    _portal: BlockingPortal = field(init=False, repr=False, compare=False)
    # Context manager of the running portal, or None when no portal is running
    _portal_cm: AbstractContextManager[BlockingPortal] | None = field(
        init=False, default=None
    )

    def __enter__(self) -> BlockingPortal:
        """
        Take a lease on the shared portal, starting it if it is not running yet.

        :return: the running blocking portal
        """
        with self._lock:
            if self._portal_cm is None:
                portal_cm = start_blocking_portal(self.backend, self.backend_options)
                self._portal = portal_cm.__enter__()
                # Publish the context manager only once it has been entered
                # successfully. Storing it earlier would leave a dead context
                # manager behind on failure and break every later __enter__().
                self._portal_cm = portal_cm

            self._leases += 1
            return self._portal

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """
        Give up a lease, shutting the portal down when the last lease is released.

        The exception info is not forwarded: the portal's context manager is always
        exited with ``(None, None, None)``.
        """
        portal_cm: AbstractContextManager[BlockingPortal] | None = None
        with self._lock:
            assert self._portal_cm
            assert self._leases > 0
            self._leases -= 1
            if not self._leases:
                portal_cm = self._portal_cm
                self._portal_cm = None
                del self._portal

        # The portal is shut down after the lock has been released
        if portal_cm:
            portal_cm.__exit__(None, None, None)

450 

451 

@contextmanager
def start_blocking_portal(
    backend: str = "asyncio", backend_options: dict[str, Any] | None = None
) -> Generator[BlockingPortal, Any, None]:
    """
    Start a new event loop in a new thread and run a blocking portal in its main task.

    The parameters are the same as for :func:`~anyio.run`.

    On leaving the context the portal is asked to stop (cancelling its remaining
    tasks if the block was left with an exception), and the outcome of the event
    loop run is then collected, re-raising any exception it ended with.

    :param backend: name of the backend
    :param backend_options: backend options
    :return: a context manager that yields a blocking portal

    .. versionchanged:: 3.0
        Usage as a context manager is now required.

    """

    async def run_portal() -> None:
        async with BlockingPortal() as running_portal:
            # A false return value means the future was cancelled before the portal
            # was ready, so there is nobody to serve
            if not portal_future.set_running_or_notify_cancel():
                return

            portal_future.set_result(running_portal)
            await running_portal.sleep_until_stopped()

    portal_future: Future[BlockingPortal] = Future()
    with ThreadPoolExecutor(1) as executor:
        loop_future = executor.submit(
            _eventloop.run,  # type: ignore[arg-type]
            run_portal,
            backend=backend,
            backend_options=backend_options,
        )
        try:
            # Wake up when the portal is ready, or when the event loop run ended
            # before it could publish one
            wait(
                cast(Iterable[Future], [loop_future, portal_future]),
                return_when=FIRST_COMPLETED,
            )
        except BaseException:
            portal_future.cancel()
            loop_future.cancel()
            raise

        if portal_future.done():
            portal = portal_future.result()
            cancel_remaining = False
            try:
                yield portal
            except BaseException:
                cancel_remaining = True
                raise
            finally:
                try:
                    portal.call(portal.stop, cancel_remaining)
                except RuntimeError:
                    # portal.call() raises RuntimeError when the portal is no longer
                    # running; there is nothing left to stop in that case
                    pass

        # Re-raises whatever exception the event loop run ended with, if any
        loop_future.result()

509 

510 

def check_cancelled() -> None:
    """
    Check if the cancel scope of the host task's running the current worker thread has
    been cancelled.

    If the host task's current cancel scope has indeed been cancelled, the
    backend-specific cancellation exception will be raised.

    The check itself is delegated to the async backend found in the thread-local
    state of the calling thread.

    :raises RuntimeError: if the current thread was not spawned by
        :func:`.to_thread.run_sync`

    """
    try:
        backend: AsyncBackend = threadlocals.current_async_backend
    except AttributeError:
        # The thread-local attribute is missing, so this is not a worker thread.
        # ``from None`` hides the uninformative AttributeError from the traceback.
        not_a_worker = RuntimeError(
            "This function can only be run from an AnyIO worker thread"
        )
        raise not_a_worker from None
    else:
        backend.check_cancelled()