Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/from_thread.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3__all__ = (
4 "BlockingPortal",
5 "BlockingPortalProvider",
6 "check_cancelled",
7 "run",
8 "run_sync",
9 "start_blocking_portal",
10)
12import sys
13from collections.abc import Awaitable, Callable, Generator
14from concurrent.futures import Future
15from contextlib import (
16 AbstractAsyncContextManager,
17 AbstractContextManager,
18 contextmanager,
19)
20from dataclasses import dataclass, field
21from functools import partial
22from inspect import isawaitable
23from threading import Lock, Thread, current_thread, get_ident
24from types import TracebackType
25from typing import (
26 Any,
27 Generic,
28 TypeVar,
29 cast,
30 overload,
31)
33from ._core._eventloop import (
34 get_cancelled_exc_class,
35 threadlocals,
36)
37from ._core._eventloop import run as run_eventloop
38from ._core._exceptions import NoEventLoopError
39from ._core._synchronization import Event
40from ._core._tasks import CancelScope, create_task_group
41from .abc._tasks import TaskStatus
42from .lowlevel import EventLoopToken, current_token
44if sys.version_info >= (3, 11):
45 from typing import TypeVarTuple, Unpack
46else:
47 from typing_extensions import TypeVarTuple, Unpack
49T_Retval = TypeVar("T_Retval")
50T_co = TypeVar("T_co", covariant=True)
51PosArgsT = TypeVarTuple("PosArgsT")
54def _token_or_error(token: EventLoopToken | None) -> EventLoopToken:
55 if token is not None:
56 return token
58 try:
59 return threadlocals.current_token
60 except AttributeError:
61 raise NoEventLoopError(
62 "Not running inside an AnyIO worker thread, and no event loop token was "
63 "provided"
64 ) from None
67def run(
68 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
69 *args: Unpack[PosArgsT],
70 token: EventLoopToken | None = None,
71) -> T_Retval:
72 """
73 Call a coroutine function from a worker thread.
75 :param func: a coroutine function
76 :param args: positional arguments for the callable
77 :param token: an event loop token to use to get back to the event loop thread
78 (required if calling this function from outside an AnyIO worker thread)
79 :return: the return value of the coroutine function
80 :raises MissingTokenError: if no token was provided and called from outside an
81 AnyIO worker thread
82 :raises RunFinishedError: if the event loop tied to ``token`` is no longer running
84 .. versionchanged:: 4.11.0
85 Added the ``token`` parameter.
87 """
88 explicit_token = token is not None
89 token = _token_or_error(token)
90 return token.backend_class.run_async_from_thread(
91 func, args, token=token.native_token if explicit_token else None
92 )
95def run_sync(
96 func: Callable[[Unpack[PosArgsT]], T_Retval],
97 *args: Unpack[PosArgsT],
98 token: EventLoopToken | None = None,
99) -> T_Retval:
100 """
101 Call a function in the event loop thread from a worker thread.
103 :param func: a callable
104 :param args: positional arguments for the callable
105 :param token: an event loop token to use to get back to the event loop thread
106 (required if calling this function from outside an AnyIO worker thread)
107 :return: the return value of the callable
108 :raises MissingTokenError: if no token was provided and called from outside an
109 AnyIO worker thread
110 :raises RunFinishedError: if the event loop tied to ``token`` is no longer running
112 .. versionchanged:: 4.11.0
113 Added the ``token`` parameter.
115 """
116 explicit_token = token is not None
117 token = _token_or_error(token)
118 return token.backend_class.run_sync_from_thread(
119 func, args, token=token.native_token if explicit_token else None
120 )
123class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager):
124 _enter_future: Future[T_co]
125 _exit_future: Future[bool | None]
126 _exit_event: Event
127 _exit_exc_info: tuple[
128 type[BaseException] | None, BaseException | None, TracebackType | None
129 ] = (None, None, None)
131 def __init__(
132 self, async_cm: AbstractAsyncContextManager[T_co], portal: BlockingPortal
133 ):
134 self._async_cm = async_cm
135 self._portal = portal
137 async def run_async_cm(self) -> bool | None:
138 try:
139 self._exit_event = Event()
140 value = await self._async_cm.__aenter__()
141 except BaseException as exc:
142 self._enter_future.set_exception(exc)
143 raise
144 else:
145 self._enter_future.set_result(value)
147 try:
148 # Wait for the sync context manager to exit.
149 # This next statement can raise `get_cancelled_exc_class()` if
150 # something went wrong in a task group in this async context
151 # manager.
152 await self._exit_event.wait()
153 finally:
154 # In case of cancellation, it could be that we end up here before
155 # `_BlockingAsyncContextManager.__exit__` is called, and an
156 # `_exit_exc_info` has been set.
157 result = await self._async_cm.__aexit__(*self._exit_exc_info)
159 return result
161 def __enter__(self) -> T_co:
162 self._enter_future = Future()
163 self._exit_future = self._portal.start_task_soon(self.run_async_cm)
164 return self._enter_future.result()
166 def __exit__(
167 self,
168 __exc_type: type[BaseException] | None,
169 __exc_value: BaseException | None,
170 __traceback: TracebackType | None,
171 ) -> bool | None:
172 self._exit_exc_info = __exc_type, __exc_value, __traceback
173 self._portal.call(self._exit_event.set)
174 return self._exit_future.result()
177class _BlockingPortalTaskStatus(TaskStatus):
178 def __init__(self, future: Future):
179 self._future = future
181 def started(self, value: object = None) -> None:
182 self._future.set_result(value)
185class BlockingPortal:
186 """
187 An object that lets external threads run code in an asynchronous event loop.
189 :raises NoEventLoopError: if no supported asynchronous event loop is running in the
190 current thread
191 """
193 def __init__(self) -> None:
194 self._token = current_token()
195 self._event_loop_thread_id: int | None = get_ident()
196 self._stop_event = Event()
197 self._task_group = create_task_group()
199 async def __aenter__(self) -> BlockingPortal:
200 await self._task_group.__aenter__()
201 return self
203 async def __aexit__(
204 self,
205 exc_type: type[BaseException] | None,
206 exc_val: BaseException | None,
207 exc_tb: TracebackType | None,
208 ) -> bool:
209 await self.stop()
210 return await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
212 def _check_running(self) -> None:
213 if self._event_loop_thread_id is None:
214 raise RuntimeError("This portal is not running")
215 if self._event_loop_thread_id == get_ident():
216 raise RuntimeError(
217 "This method cannot be called from the event loop thread"
218 )
220 async def sleep_until_stopped(self) -> None:
221 """Sleep until :meth:`stop` is called."""
222 await self._stop_event.wait()
224 async def stop(self, cancel_remaining: bool = False) -> None:
225 """
226 Signal the portal to shut down.
228 This marks the portal as no longer accepting new calls and exits from
229 :meth:`sleep_until_stopped`.
231 :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False``
232 to let them finish before returning
234 """
235 self._event_loop_thread_id = None
236 self._stop_event.set()
237 if cancel_remaining:
238 self._task_group.cancel_scope.cancel("the blocking portal is shutting down")
240 async def _call_func(
241 self,
242 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
243 args: tuple[Unpack[PosArgsT]],
244 kwargs: dict[str, Any],
245 future: Future[T_Retval],
246 ) -> None:
247 def callback(f: Future[T_Retval]) -> None:
248 if f.cancelled():
249 if self._event_loop_thread_id == get_ident():
250 scope.cancel("the future was cancelled")
251 elif self._event_loop_thread_id is not None:
252 self.call(scope.cancel, "the future was cancelled")
254 try:
255 retval_or_awaitable = func(*args, **kwargs)
256 if isawaitable(retval_or_awaitable):
257 with CancelScope() as scope:
258 future.add_done_callback(callback)
259 retval = await retval_or_awaitable
260 else:
261 retval = retval_or_awaitable
262 except get_cancelled_exc_class():
263 future.cancel()
264 future.set_running_or_notify_cancel()
265 except BaseException as exc:
266 if not future.cancelled():
267 future.set_exception(exc)
269 # Let base exceptions fall through
270 if not isinstance(exc, Exception):
271 raise
272 else:
273 if not future.cancelled():
274 future.set_result(retval)
275 finally:
276 scope = None # type: ignore[assignment]
278 def _spawn_task_from_thread(
279 self,
280 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
281 args: tuple[Unpack[PosArgsT]],
282 kwargs: dict[str, Any],
283 name: object,
284 future: Future[T_Retval],
285 ) -> None:
286 """
287 Spawn a new task using the given callable.
289 :param func: a callable
290 :param args: positional arguments to be passed to the callable
291 :param kwargs: keyword arguments to be passed to the callable
292 :param name: name of the task (will be coerced to a string if not ``None``)
293 :param future: a future that will resolve to the return value of the callable,
294 or the exception raised during its execution
296 """
297 run_sync(
298 partial(self._task_group.start_soon, name=name),
299 self._call_func,
300 func,
301 args,
302 kwargs,
303 future,
304 token=self._token,
305 )
307 @overload
308 def call(
309 self,
310 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
311 *args: Unpack[PosArgsT],
312 ) -> T_Retval: ...
314 @overload
315 def call(
316 self, func: Callable[[Unpack[PosArgsT]], T_Retval], *args: Unpack[PosArgsT]
317 ) -> T_Retval: ...
319 def call(
320 self,
321 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
322 *args: Unpack[PosArgsT],
323 ) -> T_Retval:
324 """
325 Call the given function in the event loop thread.
327 If the callable returns a coroutine object, it is awaited on.
329 :param func: any callable
330 :raises RuntimeError: if the portal is not running or if this method is called
331 from within the event loop thread
333 """
334 return cast(T_Retval, self.start_task_soon(func, *args).result())
336 @overload
337 def start_task_soon(
338 self,
339 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
340 *args: Unpack[PosArgsT],
341 name: object = None,
342 ) -> Future[T_Retval]: ...
344 @overload
345 def start_task_soon(
346 self,
347 func: Callable[[Unpack[PosArgsT]], T_Retval],
348 *args: Unpack[PosArgsT],
349 name: object = None,
350 ) -> Future[T_Retval]: ...
352 def start_task_soon(
353 self,
354 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
355 *args: Unpack[PosArgsT],
356 name: object = None,
357 ) -> Future[T_Retval]:
358 """
359 Start a task in the portal's task group.
361 The task will be run inside a cancel scope which can be cancelled by cancelling
362 the returned future.
364 :param func: the target function
365 :param args: positional arguments passed to ``func``
366 :param name: name of the task (will be coerced to a string if not ``None``)
367 :return: a future that resolves with the return value of the callable if the
368 task completes successfully, or with the exception raised in the task
369 :raises RuntimeError: if the portal is not running or if this method is called
370 from within the event loop thread
371 :rtype: concurrent.futures.Future[T_Retval]
373 .. versionadded:: 3.0
375 """
376 self._check_running()
377 f: Future[T_Retval] = Future()
378 self._spawn_task_from_thread(func, args, {}, name, f)
379 return f
381 def start_task(
382 self,
383 func: Callable[..., Awaitable[T_Retval]],
384 *args: object,
385 name: object = None,
386 ) -> tuple[Future[T_Retval], Any]:
387 """
388 Start a task in the portal's task group and wait until it signals for readiness.
390 This method works the same way as :meth:`.abc.TaskGroup.start`.
392 :param func: the target function
393 :param args: positional arguments passed to ``func``
394 :param name: name of the task (will be coerced to a string if not ``None``)
395 :return: a tuple of (future, task_status_value) where the ``task_status_value``
396 is the value passed to ``task_status.started()`` from within the target
397 function
398 :rtype: tuple[concurrent.futures.Future[T_Retval], Any]
400 .. versionadded:: 3.0
402 """
404 def task_done(future: Future[T_Retval]) -> None:
405 if not task_status_future.done():
406 if future.cancelled():
407 task_status_future.cancel()
408 elif future.exception():
409 task_status_future.set_exception(future.exception())
410 else:
411 exc = RuntimeError(
412 "Task exited without calling task_status.started()"
413 )
414 task_status_future.set_exception(exc)
416 self._check_running()
417 task_status_future: Future = Future()
418 task_status = _BlockingPortalTaskStatus(task_status_future)
419 f: Future = Future()
420 f.add_done_callback(task_done)
421 self._spawn_task_from_thread(func, args, {"task_status": task_status}, name, f)
422 return f, task_status_future.result()
424 def wrap_async_context_manager(
425 self, cm: AbstractAsyncContextManager[T_co]
426 ) -> AbstractContextManager[T_co]:
427 """
428 Wrap an async context manager as a synchronous context manager via this portal.
430 Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping
431 in the middle until the synchronous context manager exits.
433 :param cm: an asynchronous context manager
434 :return: a synchronous context manager
436 .. versionadded:: 2.1
438 """
439 return _BlockingAsyncContextManager(cm, self)
442@dataclass
443class BlockingPortalProvider:
444 """
445 A manager for a blocking portal. Used as a context manager. The first thread to
446 enter this context manager causes a blocking portal to be started with the specific
447 parameters, and the last thread to exit causes the portal to be shut down. Thus,
448 there will be exactly one blocking portal running in this context as long as at
449 least one thread has entered this context manager.
451 The parameters are the same as for :func:`~anyio.run`.
453 :param backend: name of the backend
454 :param backend_options: backend options
456 .. versionadded:: 4.4
457 """
459 backend: str = "asyncio"
460 backend_options: dict[str, Any] | None = None
461 _lock: Lock = field(init=False, default_factory=Lock)
462 _leases: int = field(init=False, default=0)
463 _portal: BlockingPortal = field(init=False)
464 _portal_cm: AbstractContextManager[BlockingPortal] | None = field(
465 init=False, default=None
466 )
468 def __enter__(self) -> BlockingPortal:
469 with self._lock:
470 if self._portal_cm is None:
471 self._portal_cm = start_blocking_portal(
472 self.backend, self.backend_options
473 )
474 self._portal = self._portal_cm.__enter__()
476 self._leases += 1
477 return self._portal
479 def __exit__(
480 self,
481 exc_type: type[BaseException] | None,
482 exc_val: BaseException | None,
483 exc_tb: TracebackType | None,
484 ) -> None:
485 portal_cm: AbstractContextManager[BlockingPortal] | None = None
486 with self._lock:
487 assert self._portal_cm
488 assert self._leases > 0
489 self._leases -= 1
490 if not self._leases:
491 portal_cm = self._portal_cm
492 self._portal_cm = None
493 del self._portal
495 if portal_cm:
496 portal_cm.__exit__(None, None, None)
499@contextmanager
500def start_blocking_portal(
501 backend: str = "asyncio",
502 backend_options: dict[str, Any] | None = None,
503 *,
504 name: str | None = None,
505) -> Generator[BlockingPortal, Any, None]:
506 """
507 Start a new event loop in a new thread and run a blocking portal in its main task.
509 The parameters are the same as for :func:`~anyio.run`.
511 :param backend: name of the backend
512 :param backend_options: backend options
513 :param name: name of the thread
514 :return: a context manager that yields a blocking portal
516 .. versionchanged:: 3.0
517 Usage as a context manager is now required.
519 """
521 async def run_portal() -> None:
522 async with BlockingPortal() as portal_:
523 if name is None:
524 current_thread().name = f"{backend}-portal-{id(portal_):x}"
526 future.set_result(portal_)
527 await portal_.sleep_until_stopped()
529 def run_blocking_portal() -> None:
530 if future.set_running_or_notify_cancel():
531 try:
532 run_eventloop(
533 run_portal, backend=backend, backend_options=backend_options
534 )
535 except BaseException as exc:
536 if not future.done():
537 future.set_exception(exc)
539 future: Future[BlockingPortal] = Future()
540 thread = Thread(target=run_blocking_portal, daemon=True, name=name)
541 thread.start()
542 try:
543 cancel_remaining_tasks = False
544 portal = future.result()
545 try:
546 yield portal
547 except BaseException:
548 cancel_remaining_tasks = True
549 raise
550 finally:
551 try:
552 portal.call(portal.stop, cancel_remaining_tasks)
553 except RuntimeError:
554 pass
555 finally:
556 thread.join()
559def check_cancelled() -> None:
560 """
561 Check if the cancel scope of the host task's running the current worker thread has
562 been cancelled.
564 If the host task's current cancel scope has indeed been cancelled, the
565 backend-specific cancellation exception will be raised.
567 :raises RuntimeError: if the current thread was not spawned by
568 :func:`.to_thread.run_sync`
570 """
571 try:
572 token: EventLoopToken = threadlocals.current_token
573 except AttributeError:
574 raise NoEventLoopError(
575 "This function can only be called inside an AnyIO worker thread"
576 ) from None
578 token.backend_class.check_cancelled()