Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/from_thread.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
# Public names exported by this module.
__all__ = (
    "BlockingPortal",
    "BlockingPortalProvider",
    "check_cancelled",
    "run",
    "run_sync",
    "start_blocking_portal",
)
12import sys
13from collections.abc import Awaitable, Callable, Generator
14from concurrent.futures import Future
15from contextlib import (
16 AbstractAsyncContextManager,
17 AbstractContextManager,
18 contextmanager,
19)
20from dataclasses import dataclass, field
21from inspect import isawaitable
22from threading import Lock, Thread, current_thread, get_ident
23from types import TracebackType
24from typing import (
25 Any,
26 Generic,
27 TypeVar,
28 cast,
29 overload,
30)
32from ._core._eventloop import (
33 get_async_backend,
34 get_cancelled_exc_class,
35 threadlocals,
36)
37from ._core._eventloop import run as run_eventloop
38from ._core._exceptions import NoEventLoopError
39from ._core._synchronization import Event
40from ._core._tasks import CancelScope, create_task_group
41from .abc._tasks import TaskStatus
42from .lowlevel import EventLoopToken
44if sys.version_info >= (3, 11):
45 from typing import TypeVarTuple, Unpack
46else:
47 from typing_extensions import TypeVarTuple, Unpack
# Return type of the callables that are run through this module's helpers.
T_Retval = TypeVar("T_Retval")
# Covariant type of the value yielded by a wrapped async context manager.
T_co = TypeVar("T_co", covariant=True)
# Types of the positional arguments forwarded to a callable.
PosArgsT = TypeVarTuple("PosArgsT")
54def _token_or_error(token: EventLoopToken | None) -> EventLoopToken:
55 if token is not None:
56 return token
58 try:
59 return threadlocals.current_token
60 except AttributeError:
61 raise NoEventLoopError(
62 "Not running inside an AnyIO worker thread, and no event loop token was "
63 "provided"
64 ) from None
def run(
    func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
    *args: Unpack[PosArgsT],
    token: EventLoopToken | None = None,
) -> T_Retval:
    """
    Call a coroutine function from a worker thread.

    :param func: a coroutine function
    :param args: positional arguments for the callable
    :param token: an event loop token to use to get back to the event loop thread
        (required if calling this function from outside an AnyIO worker thread)
    :return: the return value of the coroutine function
    :raises NoEventLoopError: if no token was provided and called from outside an
        AnyIO worker thread
    :raises RunFinishedError: if the event loop tied to ``token`` is no longer running

    .. versionchanged:: 4.11.0
        Added the ``token`` parameter.

    """
    # Remember whether the caller supplied the token before it gets replaced by the
    # resolved one: the native token is only forwarded for explicit tokens, and
    # ``None`` is passed when the token came from the current thread.
    explicit_token = token is not None
    token = _token_or_error(token)
    return token.backend_class.run_async_from_thread(
        func, args, token=token.native_token if explicit_token else None
    )
def run_sync(
    func: Callable[[Unpack[PosArgsT]], T_Retval],
    *args: Unpack[PosArgsT],
    token: EventLoopToken | None = None,
) -> T_Retval:
    """
    Call a function in the event loop thread from a worker thread.

    :param func: a callable
    :param args: positional arguments for the callable
    :param token: an event loop token to use to get back to the event loop thread
        (required if calling this function from outside an AnyIO worker thread)
    :return: the return value of the callable
    :raises NoEventLoopError: if no token was provided and called from outside an
        AnyIO worker thread
    :raises RunFinishedError: if the event loop tied to ``token`` is no longer running

    .. versionchanged:: 4.11.0
        Added the ``token`` parameter.

    """
    # Remember whether the caller supplied the token before it gets replaced by the
    # resolved one: the native token is only forwarded for explicit tokens, and
    # ``None`` is passed when the token came from the current thread.
    explicit_token = token is not None
    token = _token_or_error(token)
    return token.backend_class.run_sync_from_thread(
        func, args, token=token.native_token if explicit_token else None
    )
class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager):
    """
    Synchronous context manager that drives an async context manager via a portal.

    ``__enter__`` starts :meth:`run_async_cm` as a portal task and blocks until the
    async ``__aenter__`` has finished. ``__exit__`` records the exception info, wakes
    that task up and blocks until the async ``__aexit__`` has finished.
    """

    # Resolved with the value (or exception) produced by ``__aenter__()``.
    _enter_future: Future[T_co]
    # Future of the portal task running ``run_async_cm``; carries ``__aexit__``'s result.
    _exit_future: Future[bool | None]
    # Set from ``__exit__`` to let the portal task proceed to ``__aexit__()``.
    _exit_event: Event
    # Exception info handed to ``__aexit__()``; stays empty until ``__exit__`` runs.
    _exit_exc_info: tuple[
        type[BaseException] | None, BaseException | None, TracebackType | None
    ] = (None, None, None)

    def __init__(
        self, async_cm: AbstractAsyncContextManager[T_co], portal: BlockingPortal
    ):
        self._portal = portal
        self._async_cm = async_cm

    async def run_async_cm(self) -> bool | None:
        """
        Enter the async context manager, wait for the exit signal, then exit it.

        :return: whatever the async context manager's ``__aexit__()`` returned
        """
        try:
            self._exit_event = Event()
            entered = await self._async_cm.__aenter__()
        except BaseException as exc:
            # Hand the failure to the thread blocked in ``__enter__`` before
            # letting it propagate out of this task.
            self._enter_future.set_exception(exc)
            raise

        self._enter_future.set_result(entered)

        try:
            # Wait for the sync context manager to exit.
            # This next statement can raise `get_cancelled_exc_class()` if
            # something went wrong in a task group in this async context
            # manager.
            await self._exit_event.wait()
        finally:
            # In case of cancellation, it could be that we end up here before
            # `_BlockingAsyncContextManager.__exit__` is called, and an
            # `_exit_exc_info` has been set.
            suppress = await self._async_cm.__aexit__(*self._exit_exc_info)

        return suppress

    def __enter__(self) -> T_co:
        """Start the portal task and block until ``__aenter__()`` has completed."""
        # The enter future must exist before the task starts, as the task resolves it.
        enter_future = self._enter_future = Future()
        self._exit_future = self._portal.start_task_soon(self.run_async_cm)
        return enter_future.result()

    def __exit__(
        self,
        __exc_type: type[BaseException] | None,
        __exc_value: BaseException | None,
        __traceback: TracebackType | None,
    ) -> bool | None:
        """Pass the exception info to the portal task and wait for ``__aexit__()``."""
        self._exit_exc_info = (__exc_type, __exc_value, __traceback)
        # The event is set through the portal rather than directly from this thread.
        self._portal.call(self._exit_event.set)
        exit_future = self._exit_future
        return exit_future.result()
class _BlockingPortalTaskStatus(TaskStatus):
    """Task status object that signals readiness by resolving a future."""

    def __init__(self, future: Future):
        self._future = future

    def started(self, value: object = None) -> None:
        """
        Resolve the wrapped future with ``value``.

        :param value: the value to set as the future's result
        """
        readiness_future = self._future
        readiness_future.set_result(value)
class BlockingPortal:
    """
    An object that lets external threads run code in an asynchronous event loop.

    Instances are created by the current async backend (see ``__new__``). The portal
    records the identifier of the thread it was initialized in and rejects the
    blocking entry points when they are invoked from that same thread.
    """

    def __new__(cls) -> BlockingPortal:
        # Instantiation is delegated to the current backend.
        backend = get_async_backend()
        return backend.create_blocking_portal()

    def __init__(self) -> None:
        # Thread the portal belongs to; reset to ``None`` by ``stop()``.
        self._event_loop_thread_id: int | None = get_ident()
        self._stop_event = Event()
        self._task_group = create_task_group()
        self._cancelled_exc_class = get_cancelled_exc_class()

    async def __aenter__(self) -> BlockingPortal:
        """Enter the portal's task group and return the portal itself."""
        await self._task_group.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """Stop the portal, then exit its task group with the given exception info."""
        await self.stop()
        suppressed = await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
        return suppressed

    def _check_running(self) -> None:
        """
        Verify that the portal can accept a call from the current thread.

        :raises RuntimeError: if the portal has been stopped, or if the caller is
            running in the thread the portal was initialized in

        """
        if self._event_loop_thread_id is None:
            raise RuntimeError("This portal is not running")

        if self._event_loop_thread_id == get_ident():
            raise RuntimeError(
                "This method cannot be called from the event loop thread"
            )

    async def sleep_until_stopped(self) -> None:
        """Sleep until :meth:`stop` is called."""
        await self._stop_event.wait()

    async def stop(self, cancel_remaining: bool = False) -> None:
        """
        Signal the portal to shut down.

        This marks the portal as no longer accepting new calls and exits from
        :meth:`sleep_until_stopped`.

        :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False``
            to let them finish before returning

        """
        self._event_loop_thread_id = None
        self._stop_event.set()
        if not cancel_remaining:
            return

        self._task_group.cancel_scope.cancel("the blocking portal is shutting down")

    async def _call_func(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        future: Future[T_Retval],
    ) -> None:
        """
        Call ``func`` and transfer its outcome to ``future``.

        If the call returns an awaitable, it is awaited inside a cancel scope which
        gets cancelled if ``future`` is cancelled.

        :param func: the callable to invoke
        :param args: positional arguments for the callable
        :param kwargs: keyword arguments for the callable
        :param future: the future that receives the return value or the exception

        """

        def propagate_cancellation(done_future: Future[T_Retval]) -> None:
            if not done_future.cancelled():
                return

            if self._event_loop_thread_id == get_ident():
                # Already in the portal's thread: cancel the scope directly.
                call_scope.cancel("the future was cancelled")
            elif self._event_loop_thread_id is not None:
                # Different thread and the portal is still running: go through it.
                self.call(call_scope.cancel, "the future was cancelled")

        try:
            outcome = func(*args, **kwargs)
            if isawaitable(outcome):
                with CancelScope() as call_scope:
                    future.add_done_callback(propagate_cancellation)
                    result = await outcome
            else:
                result = outcome
        except self._cancelled_exc_class:
            # The task was cancelled: mark the future as cancelled too.
            future.cancel()
            future.set_running_or_notify_cancel()
        except BaseException as exc:
            if not future.cancelled():
                future.set_exception(exc)

            # Let base exceptions fall through
            if not isinstance(exc, Exception):
                raise
        else:
            if not future.cancelled():
                future.set_result(result)
        finally:
            # Clear the closure's reference to the cancel scope once the call is over.
            call_scope = None  # type: ignore[assignment]

    def _spawn_task_from_thread(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        name: object,
        future: Future[T_Retval],
    ) -> None:
        """
        Spawn a new task using the given callable.

        Implementers must ensure that the future is resolved when the task finishes.

        :param func: a callable
        :param args: positional arguments to be passed to the callable
        :param kwargs: keyword arguments to be passed to the callable
        :param name: name of the task (will be coerced to a string if not ``None``)
        :param future: a future that will resolve to the return value of the callable,
            or the exception raised during its execution

        """
        raise NotImplementedError

    @overload
    def call(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        *args: Unpack[PosArgsT],
    ) -> T_Retval: ...

    @overload
    def call(
        self, func: Callable[[Unpack[PosArgsT]], T_Retval], *args: Unpack[PosArgsT]
    ) -> T_Retval: ...

    def call(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        *args: Unpack[PosArgsT],
    ) -> T_Retval:
        """
        Call the given function in the event loop thread.

        If the callable returns a coroutine object, it is awaited on.

        :param func: any callable
        :param args: positional arguments passed to ``func``
        :return: the result of the future returned by :meth:`start_task_soon`
        :raises RuntimeError: if the portal is not running or if this method is called
            from within the event loop thread

        """
        pending = self.start_task_soon(func, *args)
        return cast(T_Retval, pending.result())

    @overload
    def start_task_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> Future[T_Retval]: ...

    @overload
    def start_task_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> Future[T_Retval]: ...

    def start_task_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> Future[T_Retval]:
        """
        Start a task in the portal's task group.

        The task will be run inside a cancel scope which can be cancelled by cancelling
        the returned future.

        :param func: the target function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a future that resolves with the return value of the callable if the
            task completes successfully, or with the exception raised in the task
        :raises RuntimeError: if the portal is not running or if this method is called
            from within the event loop thread
        :rtype: concurrent.futures.Future[T_Retval]

        .. versionadded:: 3.0

        """
        self._check_running()
        result_future: Future[T_Retval] = Future()
        self._spawn_task_from_thread(func, args, {}, name, result_future)
        return result_future

    def start_task(
        self,
        func: Callable[..., Awaitable[T_Retval]],
        *args: object,
        name: object = None,
    ) -> tuple[Future[T_Retval], Any]:
        """
        Start a task in the portal's task group and wait until it signals for readiness.

        This method works the same way as :meth:`.abc.TaskGroup.start`.

        :param func: the target function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a tuple of (future, task_status_value) where the ``task_status_value``
            is the value passed to ``task_status.started()`` from within the target
            function
        :raises RuntimeError: if the portal is not running or if this method is called
            from within the event loop thread
        :rtype: tuple[concurrent.futures.Future[T_Retval], Any]

        .. versionadded:: 3.0

        """

        def task_done(future: Future[T_Retval]) -> None:
            # Nothing to do if the task already reported its readiness.
            if task_status_future.done():
                return

            if future.cancelled():
                task_status_future.cancel()
                return

            task_error = future.exception()
            if task_error:
                task_status_future.set_exception(task_error)
            else:
                task_status_future.set_exception(
                    RuntimeError("Task exited without calling task_status.started()")
                )

        self._check_running()
        task_status_future: Future = Future()
        task_status = _BlockingPortalTaskStatus(task_status_future)
        task_future: Future = Future()
        # If the task ends before calling ``started()``, unblock the wait below.
        task_future.add_done_callback(task_done)
        task_kwargs = {"task_status": task_status}
        self._spawn_task_from_thread(func, args, task_kwargs, name, task_future)
        return task_future, task_status_future.result()

    def wrap_async_context_manager(
        self, cm: AbstractAsyncContextManager[T_co]
    ) -> AbstractContextManager[T_co]:
        """
        Wrap an async context manager as a synchronous context manager via this portal.

        Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping
        in the middle until the synchronous context manager exits.

        :param cm: an asynchronous context manager
        :return: a synchronous context manager

        .. versionadded:: 2.1

        """
        return _BlockingAsyncContextManager(cm, self)
@dataclass
class BlockingPortalProvider:
    """
    A manager for a blocking portal. Used as a context manager. The first thread to
    enter this context manager causes a blocking portal to be started with the specific
    parameters, and the last thread to exit causes the portal to be shut down. Thus,
    there will be exactly one blocking portal running in this context as long as at
    least one thread has entered this context manager.

    The parameters are the same as for :func:`~anyio.run`.

    :param backend: name of the backend
    :param backend_options: backend options

    .. versionadded:: 4.4
    """

    backend: str = "asyncio"
    backend_options: dict[str, Any] | None = None
    # Guards the lease counter and the portal references below.
    _lock: Lock = field(init=False, default_factory=Lock)
    # Number of ``__enter__`` calls that have not yet been matched by ``__exit__``.
    _leases: int = field(init=False, default=0)
    # The running portal; only set while ``_portal_cm`` is not ``None``.
    _portal: BlockingPortal = field(init=False)
    # Context manager that owns the running portal, or ``None`` if none is running.
    _portal_cm: AbstractContextManager[BlockingPortal] | None = field(
        init=False, default=None
    )

    def __enter__(self) -> BlockingPortal:
        """
        Take a lease on the shared portal, starting it if it is not running yet.

        :return: the shared blocking portal
        """
        with self._lock:
            if self._portal_cm is None:
                portal_cm = start_blocking_portal(self.backend, self.backend_options)
                # Publish the context manager only after it was entered
                # successfully. If start-up raises, ``_portal_cm`` stays ``None``
                # so that the next ``__enter__`` retries instead of finding a
                # context manager without a matching ``_portal``.
                self._portal = portal_cm.__enter__()
                self._portal_cm = portal_cm

            self._leases += 1
            return self._portal

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """
        Release a lease, shutting the portal down when the last lease is released.

        The exception info is not forwarded: the portal's context manager is always
        exited with ``(None, None, None)``.
        """
        portal_cm: AbstractContextManager[BlockingPortal] | None = None
        with self._lock:
            assert self._portal_cm
            assert self._leases > 0
            self._leases -= 1
            if not self._leases:
                portal_cm = self._portal_cm
                self._portal_cm = None
                del self._portal

        # The portal's context manager is exited after the lock has been released.
        if portal_cm:
            portal_cm.__exit__(None, None, None)
@contextmanager
def start_blocking_portal(
    backend: str = "asyncio",
    backend_options: dict[str, Any] | None = None,
    *,
    name: str | None = None,
) -> Generator[BlockingPortal, Any, None]:
    """
    Start a new event loop in a new thread and run a blocking portal in its main task.

    The parameters are the same as for :func:`~anyio.run`.

    When the ``with`` block is left, the portal is asked to stop (cancelling its
    remaining tasks if the block raised an exception) and the thread is joined.

    :param backend: name of the backend
    :param backend_options: backend options
    :param name: name of the thread
    :return: a context manager that yields a blocking portal

    .. versionchanged:: 3.0
        Usage as a context manager is now required.

    """

    async def run_portal() -> None:
        async with BlockingPortal() as portal_:
            if name is None:
                # No explicit name: derive one from the backend and the portal's id.
                generated_name = f"{backend}-portal-{id(portal_):x}"
                current_thread().name = generated_name

            portal_future.set_result(portal_)
            await portal_.sleep_until_stopped()

    def run_blocking_portal() -> None:
        if not portal_future.set_running_or_notify_cancel():
            return

        try:
            run_eventloop(run_portal, backend=backend, backend_options=backend_options)
        except BaseException as exc:
            # Only report the failure if the portal was never handed out.
            if not portal_future.done():
                portal_future.set_exception(exc)

    portal_future: Future[BlockingPortal] = Future()
    portal_thread = Thread(target=run_blocking_portal, daemon=True, name=name)
    portal_thread.start()
    cancel_remaining = False
    try:
        portal = portal_future.result()
        try:
            yield portal
        except BaseException:
            # The ``with`` body failed: cancel whatever is still running.
            cancel_remaining = True
            raise
        finally:
            try:
                portal.call(portal.stop, cancel_remaining)
            except RuntimeError:
                # Raised e.g. when the portal is no longer running; nothing to stop.
                pass
    finally:
        portal_thread.join()
def check_cancelled() -> None:
    """
    Check whether the cancel scope of the host task running the current worker
    thread has been cancelled.

    The check is delegated to the backend class of the current thread's event loop
    token. If the host task's current cancel scope has indeed been cancelled, the
    backend-specific cancellation exception will be raised.

    :raises NoEventLoopError: if the current thread has no event loop token, i.e. it
        is not an AnyIO worker thread

    """
    try:
        current: EventLoopToken = threadlocals.current_token
    except AttributeError:
        raise NoEventLoopError(
            "This function can only be called inside an AnyIO worker thread"
        ) from None
    else:
        current.backend_class.check_cancelled()