Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/from_thread.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

216 statements  

1from __future__ import annotations 

2 

3import sys 

4from collections.abc import Awaitable, Callable, Generator 

5from concurrent.futures import Future 

6from contextlib import AbstractContextManager, contextmanager 

7from dataclasses import dataclass, field 

8from inspect import isawaitable 

9from threading import Lock, Thread, get_ident 

10from types import TracebackType 

11from typing import ( 

12 Any, 

13 AsyncContextManager, 

14 ContextManager, 

15 Generic, 

16 TypeVar, 

17 cast, 

18 overload, 

19) 

20 

21from ._core import _eventloop 

22from ._core._eventloop import get_async_backend, get_cancelled_exc_class, threadlocals 

23from ._core._synchronization import Event 

24from ._core._tasks import CancelScope, create_task_group 

25from .abc import AsyncBackend 

26from .abc._tasks import TaskStatus 

27 

# TypeVarTuple/Unpack are taken from the standard library on Python 3.11+ and
# from the typing_extensions backport on older interpreters.
if sys.version_info >= (3, 11):
    from typing import TypeVarTuple, Unpack
else:
    from typing_extensions import TypeVarTuple, Unpack

# Return type of a callable that is run through this module.
T_Retval = TypeVar("T_Retval")
# Covariant type of the value produced when entering a wrapped context manager.
T_co = TypeVar("T_co", covariant=True)
# Types of the positional arguments forwarded to a callable.
PosArgsT = TypeVarTuple("PosArgsT")

36 

37 

def run(
    func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], *args: Unpack[PosArgsT]
) -> T_Retval:
    """
    Call a coroutine function from a worker thread.

    The backend and the event loop token are looked up from the thread-local state
    of the calling thread, and the call is delegated to the backend's
    ``run_async_from_thread()``.

    :param func: a coroutine function
    :param args: positional arguments for the callable
    :return: the return value of the coroutine function
    :raises RuntimeError: if the calling thread has no ``current_async_backend`` or
        ``current_token`` set in its thread-local state

    """
    try:
        async_backend = threadlocals.current_async_backend
        token = threadlocals.current_token
    except AttributeError:
        # "from None" hides the internal AttributeError from the traceback
        raise RuntimeError(
            "This function can only be run from an AnyIO worker thread"
        ) from None

    return async_backend.run_async_from_thread(func, args, token=token)

58 

59 

def run_sync(
    func: Callable[[Unpack[PosArgsT]], T_Retval], *args: Unpack[PosArgsT]
) -> T_Retval:
    """
    Call a function in the event loop thread from a worker thread.

    The backend and the event loop token are looked up from the thread-local state
    of the calling thread, and the call is delegated to the backend's
    ``run_sync_from_thread()``.

    :param func: a callable
    :param args: positional arguments for the callable
    :return: the return value of the callable
    :raises RuntimeError: if the calling thread has no ``current_async_backend`` or
        ``current_token`` set in its thread-local state

    """
    try:
        async_backend = threadlocals.current_async_backend
        token = threadlocals.current_token
    except AttributeError:
        # "from None" hides the internal AttributeError from the traceback
        raise RuntimeError(
            "This function can only be run from an AnyIO worker thread"
        ) from None

    return async_backend.run_sync_from_thread(func, args, token=token)

80 

81 

class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager):
    """
    Synchronous context manager that drives an async context manager via a portal.

    ``__enter__()`` starts :meth:`run_async_cm` as a task through the portal and blocks
    until ``__aenter__()`` has finished. ``__exit__()`` records the exception info,
    wakes the task up and blocks until ``__aexit__()`` has finished.
    """

    # Resolved with the value of __aenter__() (or the exception it raised)
    _enter_future: Future[T_co]
    # Future of the task running run_async_cm(); resolves to __aexit__()'s result
    _exit_future: Future[bool | None]
    # Set from __exit__() to let run_async_cm() proceed to __aexit__()
    _exit_event: Event
    # Passed to __aexit__(); stays at this default if __exit__() was never reached
    _exit_exc_info: tuple[
        type[BaseException] | None, BaseException | None, TracebackType | None
    ] = (None, None, None)

    def __init__(self, async_cm: AsyncContextManager[T_co], portal: BlockingPortal):
        self._async_cm = async_cm
        self._portal = portal

    async def run_async_cm(self) -> bool | None:
        """
        Enter the wrapped context manager, wait for the exit signal, then exit it.

        :return: the return value of the wrapped ``__aexit__()``

        """
        try:
            self._exit_event = Event()
            value = await self._async_cm.__aenter__()
        except BaseException as exc:
            # Unblock the thread waiting in __enter__() before propagating
            self._enter_future.set_exception(exc)
            raise
        else:
            self._enter_future.set_result(value)

        try:
            # Wait for the sync context manager to exit.
            # This next statement can raise `get_cancelled_exc_class()` if
            # something went wrong in a task group in this async context
            # manager.
            await self._exit_event.wait()
        finally:
            # In case of cancellation, it could be that we end up here before
            # `_BlockingAsyncContextManager.__exit__` is called, and an
            # `_exit_exc_info` has been set.
            result = await self._async_cm.__aexit__(*self._exit_exc_info)
            # Returning from within "finally" discards any exception raised by the
            # wait above; the outcome of __aexit__() is what the task reports.
            return result

    def __enter__(self) -> T_co:
        self._enter_future = Future()
        self._exit_future = self._portal.start_task_soon(self.run_async_cm)
        return self._enter_future.result()

    def __exit__(
        self,
        __exc_type: type[BaseException] | None,
        __exc_value: BaseException | None,
        __traceback: TracebackType | None,
    ) -> bool | None:
        # Store the exception info first so the task sees it once it is woken up
        self._exit_exc_info = __exc_type, __exc_value, __traceback
        self._portal.call(self._exit_event.set)
        return self._exit_future.result()

131 

132 

class _BlockingPortalTaskStatus(TaskStatus):
    """Task status object that reports ``started()`` through a thread-safe future."""

    def __init__(self, future: Future):
        # Future that receives the value passed to started()
        self._future = future

    def started(self, value: object = None) -> None:
        """
        Resolve the wrapped future with the given value.

        :param value: the value to set as the result of the future

        """
        self._future.set_result(value)

139 

140 

141class BlockingPortal: 

142 """An object that lets external threads run code in an asynchronous event loop.""" 

143 

144 def __new__(cls) -> BlockingPortal: 

145 return get_async_backend().create_blocking_portal() 

146 

147 def __init__(self) -> None: 

148 self._event_loop_thread_id: int | None = get_ident() 

149 self._stop_event = Event() 

150 self._task_group = create_task_group() 

151 self._cancelled_exc_class = get_cancelled_exc_class() 

152 

153 async def __aenter__(self) -> BlockingPortal: 

154 await self._task_group.__aenter__() 

155 return self 

156 

157 async def __aexit__( 

158 self, 

159 exc_type: type[BaseException] | None, 

160 exc_val: BaseException | None, 

161 exc_tb: TracebackType | None, 

162 ) -> bool | None: 

163 await self.stop() 

164 return await self._task_group.__aexit__(exc_type, exc_val, exc_tb) 

165 

166 def _check_running(self) -> None: 

167 if self._event_loop_thread_id is None: 

168 raise RuntimeError("This portal is not running") 

169 if self._event_loop_thread_id == get_ident(): 

170 raise RuntimeError( 

171 "This method cannot be called from the event loop thread" 

172 ) 

173 

174 async def sleep_until_stopped(self) -> None: 

175 """Sleep until :meth:`stop` is called.""" 

176 await self._stop_event.wait() 

177 

178 async def stop(self, cancel_remaining: bool = False) -> None: 

179 """ 

180 Signal the portal to shut down. 

181 

182 This marks the portal as no longer accepting new calls and exits from 

183 :meth:`sleep_until_stopped`. 

184 

185 :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False`` 

186 to let them finish before returning 

187 

188 """ 

189 self._event_loop_thread_id = None 

190 self._stop_event.set() 

191 if cancel_remaining: 

192 self._task_group.cancel_scope.cancel() 

193 

194 async def _call_func( 

195 self, 

196 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval], 

197 args: tuple[Unpack[PosArgsT]], 

198 kwargs: dict[str, Any], 

199 future: Future[T_Retval], 

200 ) -> None: 

201 def callback(f: Future[T_Retval]) -> None: 

202 if f.cancelled() and self._event_loop_thread_id not in ( 

203 None, 

204 get_ident(), 

205 ): 

206 self.call(scope.cancel) 

207 

208 try: 

209 retval_or_awaitable = func(*args, **kwargs) 

210 if isawaitable(retval_or_awaitable): 

211 with CancelScope() as scope: 

212 if future.cancelled(): 

213 scope.cancel() 

214 else: 

215 future.add_done_callback(callback) 

216 

217 retval = await retval_or_awaitable 

218 else: 

219 retval = retval_or_awaitable 

220 except self._cancelled_exc_class: 

221 future.cancel() 

222 future.set_running_or_notify_cancel() 

223 except BaseException as exc: 

224 if not future.cancelled(): 

225 future.set_exception(exc) 

226 

227 # Let base exceptions fall through 

228 if not isinstance(exc, Exception): 

229 raise 

230 else: 

231 if not future.cancelled(): 

232 future.set_result(retval) 

233 finally: 

234 scope = None # type: ignore[assignment] 

235 

236 def _spawn_task_from_thread( 

237 self, 

238 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval], 

239 args: tuple[Unpack[PosArgsT]], 

240 kwargs: dict[str, Any], 

241 name: object, 

242 future: Future[T_Retval], 

243 ) -> None: 

244 """ 

245 Spawn a new task using the given callable. 

246 

247 Implementors must ensure that the future is resolved when the task finishes. 

248 

249 :param func: a callable 

250 :param args: positional arguments to be passed to the callable 

251 :param kwargs: keyword arguments to be passed to the callable 

252 :param name: name of the task (will be coerced to a string if not ``None``) 

253 :param future: a future that will resolve to the return value of the callable, 

254 or the exception raised during its execution 

255 

256 """ 

257 raise NotImplementedError 

258 

259 @overload 

260 def call( 

261 self, 

262 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

263 *args: Unpack[PosArgsT], 

264 ) -> T_Retval: ... 

265 

266 @overload 

267 def call( 

268 self, func: Callable[[Unpack[PosArgsT]], T_Retval], *args: Unpack[PosArgsT] 

269 ) -> T_Retval: ... 

270 

271 def call( 

272 self, 

273 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval], 

274 *args: Unpack[PosArgsT], 

275 ) -> T_Retval: 

276 """ 

277 Call the given function in the event loop thread. 

278 

279 If the callable returns a coroutine object, it is awaited on. 

280 

281 :param func: any callable 

282 :raises RuntimeError: if the portal is not running or if this method is called 

283 from within the event loop thread 

284 

285 """ 

286 return cast(T_Retval, self.start_task_soon(func, *args).result()) 

287 

288 @overload 

289 def start_task_soon( 

290 self, 

291 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

292 *args: Unpack[PosArgsT], 

293 name: object = None, 

294 ) -> Future[T_Retval]: ... 

295 

296 @overload 

297 def start_task_soon( 

298 self, 

299 func: Callable[[Unpack[PosArgsT]], T_Retval], 

300 *args: Unpack[PosArgsT], 

301 name: object = None, 

302 ) -> Future[T_Retval]: ... 

303 

304 def start_task_soon( 

305 self, 

306 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval], 

307 *args: Unpack[PosArgsT], 

308 name: object = None, 

309 ) -> Future[T_Retval]: 

310 """ 

311 Start a task in the portal's task group. 

312 

313 The task will be run inside a cancel scope which can be cancelled by cancelling 

314 the returned future. 

315 

316 :param func: the target function 

317 :param args: positional arguments passed to ``func`` 

318 :param name: name of the task (will be coerced to a string if not ``None``) 

319 :return: a future that resolves with the return value of the callable if the 

320 task completes successfully, or with the exception raised in the task 

321 :raises RuntimeError: if the portal is not running or if this method is called 

322 from within the event loop thread 

323 :rtype: concurrent.futures.Future[T_Retval] 

324 

325 .. versionadded:: 3.0 

326 

327 """ 

328 self._check_running() 

329 f: Future[T_Retval] = Future() 

330 self._spawn_task_from_thread(func, args, {}, name, f) 

331 return f 

332 

333 def start_task( 

334 self, 

335 func: Callable[..., Awaitable[T_Retval]], 

336 *args: object, 

337 name: object = None, 

338 ) -> tuple[Future[T_Retval], Any]: 

339 """ 

340 Start a task in the portal's task group and wait until it signals for readiness. 

341 

342 This method works the same way as :meth:`.abc.TaskGroup.start`. 

343 

344 :param func: the target function 

345 :param args: positional arguments passed to ``func`` 

346 :param name: name of the task (will be coerced to a string if not ``None``) 

347 :return: a tuple of (future, task_status_value) where the ``task_status_value`` 

348 is the value passed to ``task_status.started()`` from within the target 

349 function 

350 :rtype: tuple[concurrent.futures.Future[T_Retval], Any] 

351 

352 .. versionadded:: 3.0 

353 

354 """ 

355 

356 def task_done(future: Future[T_Retval]) -> None: 

357 if not task_status_future.done(): 

358 if future.cancelled(): 

359 task_status_future.cancel() 

360 elif future.exception(): 

361 task_status_future.set_exception(future.exception()) 

362 else: 

363 exc = RuntimeError( 

364 "Task exited without calling task_status.started()" 

365 ) 

366 task_status_future.set_exception(exc) 

367 

368 self._check_running() 

369 task_status_future: Future = Future() 

370 task_status = _BlockingPortalTaskStatus(task_status_future) 

371 f: Future = Future() 

372 f.add_done_callback(task_done) 

373 self._spawn_task_from_thread(func, args, {"task_status": task_status}, name, f) 

374 return f, task_status_future.result() 

375 

376 def wrap_async_context_manager( 

377 self, cm: AsyncContextManager[T_co] 

378 ) -> ContextManager[T_co]: 

379 """ 

380 Wrap an async context manager as a synchronous context manager via this portal. 

381 

382 Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping 

383 in the middle until the synchronous context manager exits. 

384 

385 :param cm: an asynchronous context manager 

386 :return: a synchronous context manager 

387 

388 .. versionadded:: 2.1 

389 

390 """ 

391 return _BlockingAsyncContextManager(cm, self) 

392 

393 

@dataclass
class BlockingPortalProvider:
    """
    A manager for a blocking portal. Used as a context manager. The first thread to
    enter this context manager causes a blocking portal to be started with the specific
    parameters, and the last thread to exit causes the portal to be shut down. Thus,
    there will be exactly one blocking portal running in this context as long as at
    least one thread has entered this context manager.

    The parameters are the same as for :func:`~anyio.run`.

    :param backend: name of the backend
    :param backend_options: backend options

    .. versionadded:: 4.4
    """

    backend: str = "asyncio"
    backend_options: dict[str, Any] | None = None
    # Guards the lease counter and the portal references below
    _lock: Lock = field(init=False, default_factory=Lock)
    # Number of threads currently inside the context manager
    _leases: int = field(init=False, default=0)
    # Only set while a portal is running
    _portal: BlockingPortal = field(init=False)
    _portal_cm: AbstractContextManager[BlockingPortal] | None = field(
        init=False, default=None
    )

    def __enter__(self) -> BlockingPortal:
        with self._lock:
            if self._portal_cm is None:
                portal_cm = start_blocking_portal(self.backend, self.backend_options)
                # Publish the context manager only after it was entered
                # successfully. If starting the portal fails, the provider is
                # left untouched and the next __enter__() retries instead of
                # finding a stored context manager without a portal.
                self._portal = portal_cm.__enter__()
                self._portal_cm = portal_cm

            self._leases += 1
            return self._portal

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        portal_cm: AbstractContextManager[BlockingPortal] | None = None
        with self._lock:
            assert self._portal_cm
            assert self._leases > 0
            self._leases -= 1
            if not self._leases:
                # Last lease released: detach the portal while holding the lock
                portal_cm = self._portal_cm
                self._portal_cm = None
                del self._portal

        # The portal is shut down after the lock has been released
        if portal_cm:
            portal_cm.__exit__(None, None, None)

449 

450 

@contextmanager
def start_blocking_portal(
    backend: str = "asyncio", backend_options: dict[str, Any] | None = None
) -> Generator[BlockingPortal, Any, None]:
    """
    Start a new event loop in a new thread and run a blocking portal in its main task.

    The parameters are the same as for :func:`~anyio.run`.

    :param backend: name of the backend
    :param backend_options: backend options
    :return: a context manager that yields a blocking portal

    .. versionchanged:: 3.0
        Usage as a context manager is now required.

    """

    async def run_portal() -> None:
        async with BlockingPortal() as portal_:
            # Hand the running portal over to the thread waiting on the future
            future.set_result(portal_)
            await portal_.sleep_until_stopped()

    def run_blocking_portal() -> None:
        if future.set_running_or_notify_cancel():
            try:
                _eventloop.run(
                    run_portal, backend=backend, backend_options=backend_options
                )
            except BaseException as exc:
                # Report the failure to the waiting thread, unless the portal
                # was already delivered through the future.
                if not future.done():
                    future.set_exception(exc)

    future: Future[BlockingPortal] = Future()
    thread = Thread(target=run_blocking_portal, daemon=True)
    thread.start()
    try:
        cancel_remaining_tasks = False
        portal = future.result()
        try:
            yield portal
        except BaseException:
            # The "with" body failed: cancel the remaining tasks on shutdown
            cancel_remaining_tasks = True
            raise
        finally:
            try:
                portal.call(portal.stop, cancel_remaining_tasks)
            except RuntimeError:
                # portal.call() raises RuntimeError when the portal is no longer
                # running, in which case there is nothing left to stop.
                pass
    finally:
        # Always wait for the event loop thread, also when start-up failed
        thread.join()

502 

503 

def check_cancelled() -> None:
    """
    Check if the cancel scope of the host task's running the current worker thread has
    been cancelled.

    If the host task's current cancel scope has indeed been cancelled, the
    backend-specific cancellation exception will be raised.

    The check itself is delegated to the ``check_cancelled()`` method of the backend
    stored in the thread-local state of the calling thread.

    :raises RuntimeError: if the current thread was not spawned by
        :func:`.to_thread.run_sync`

    """
    try:
        async_backend: AsyncBackend = threadlocals.current_async_backend
    except AttributeError:
        # "from None" hides the internal AttributeError from the traceback
        raise RuntimeError(
            "This function can only be run from an AnyIO worker thread"
        ) from None

    async_backend.check_cancelled()