Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/from_thread.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import sys
4import threading
5from collections.abc import Awaitable, Callable, Generator
6from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
7from contextlib import AbstractContextManager, contextmanager
8from inspect import isawaitable
9from types import TracebackType
10from typing import (
11 Any,
12 AsyncContextManager,
13 ContextManager,
14 Generic,
15 Iterable,
16 TypeVar,
17 cast,
18 overload,
19)
21from ._core import _eventloop
22from ._core._eventloop import get_async_backend, get_cancelled_exc_class, threadlocals
23from ._core._synchronization import Event
24from ._core._tasks import CancelScope, create_task_group
25from .abc import AsyncBackend
26from .abc._tasks import TaskStatus
28if sys.version_info >= (3, 11):
29 from typing import TypeVarTuple, Unpack
30else:
31 from typing_extensions import TypeVarTuple, Unpack
33T_Retval = TypeVar("T_Retval")
34T_co = TypeVar("T_co", covariant=True)
35PosArgsT = TypeVarTuple("PosArgsT")
38def run(
39 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], *args: Unpack[PosArgsT]
40) -> T_Retval:
41 """
42 Call a coroutine function from a worker thread.
44 :param func: a coroutine function
45 :param args: positional arguments for the callable
46 :return: the return value of the coroutine function
48 """
49 try:
50 async_backend = threadlocals.current_async_backend
51 token = threadlocals.current_token
52 except AttributeError:
53 raise RuntimeError(
54 "This function can only be run from an AnyIO worker thread"
55 ) from None
57 return async_backend.run_async_from_thread(func, args, token=token)
60def run_sync(
61 func: Callable[[Unpack[PosArgsT]], T_Retval], *args: Unpack[PosArgsT]
62) -> T_Retval:
63 """
64 Call a function in the event loop thread from a worker thread.
66 :param func: a callable
67 :param args: positional arguments for the callable
68 :return: the return value of the callable
70 """
71 try:
72 async_backend = threadlocals.current_async_backend
73 token = threadlocals.current_token
74 except AttributeError:
75 raise RuntimeError(
76 "This function can only be run from an AnyIO worker thread"
77 ) from None
79 return async_backend.run_sync_from_thread(func, args, token=token)
82class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager):
83 _enter_future: Future[T_co]
84 _exit_future: Future[bool | None]
85 _exit_event: Event
86 _exit_exc_info: tuple[
87 type[BaseException] | None, BaseException | None, TracebackType | None
88 ] = (None, None, None)
90 def __init__(self, async_cm: AsyncContextManager[T_co], portal: BlockingPortal):
91 self._async_cm = async_cm
92 self._portal = portal
94 async def run_async_cm(self) -> bool | None:
95 try:
96 self._exit_event = Event()
97 value = await self._async_cm.__aenter__()
98 except BaseException as exc:
99 self._enter_future.set_exception(exc)
100 raise
101 else:
102 self._enter_future.set_result(value)
104 try:
105 # Wait for the sync context manager to exit.
106 # This next statement can raise `get_cancelled_exc_class()` if
107 # something went wrong in a task group in this async context
108 # manager.
109 await self._exit_event.wait()
110 finally:
111 # In case of cancellation, it could be that we end up here before
112 # `_BlockingAsyncContextManager.__exit__` is called, and an
113 # `_exit_exc_info` has been set.
114 result = await self._async_cm.__aexit__(*self._exit_exc_info)
115 return result
117 def __enter__(self) -> T_co:
118 self._enter_future = Future()
119 self._exit_future = self._portal.start_task_soon(self.run_async_cm)
120 return self._enter_future.result()
122 def __exit__(
123 self,
124 __exc_type: type[BaseException] | None,
125 __exc_value: BaseException | None,
126 __traceback: TracebackType | None,
127 ) -> bool | None:
128 self._exit_exc_info = __exc_type, __exc_value, __traceback
129 self._portal.call(self._exit_event.set)
130 return self._exit_future.result()
133class _BlockingPortalTaskStatus(TaskStatus):
134 def __init__(self, future: Future):
135 self._future = future
137 def started(self, value: object = None) -> None:
138 self._future.set_result(value)
141class BlockingPortal:
142 """An object that lets external threads run code in an asynchronous event loop."""
144 def __new__(cls) -> BlockingPortal:
145 return get_async_backend().create_blocking_portal()
147 def __init__(self) -> None:
148 self._event_loop_thread_id: int | None = threading.get_ident()
149 self._stop_event = Event()
150 self._task_group = create_task_group()
151 self._cancelled_exc_class = get_cancelled_exc_class()
153 async def __aenter__(self) -> BlockingPortal:
154 await self._task_group.__aenter__()
155 return self
157 async def __aexit__(
158 self,
159 exc_type: type[BaseException] | None,
160 exc_val: BaseException | None,
161 exc_tb: TracebackType | None,
162 ) -> bool | None:
163 await self.stop()
164 return await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
166 def _check_running(self) -> None:
167 if self._event_loop_thread_id is None:
168 raise RuntimeError("This portal is not running")
169 if self._event_loop_thread_id == threading.get_ident():
170 raise RuntimeError(
171 "This method cannot be called from the event loop thread"
172 )
174 async def sleep_until_stopped(self) -> None:
175 """Sleep until :meth:`stop` is called."""
176 await self._stop_event.wait()
178 async def stop(self, cancel_remaining: bool = False) -> None:
179 """
180 Signal the portal to shut down.
182 This marks the portal as no longer accepting new calls and exits from
183 :meth:`sleep_until_stopped`.
185 :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False``
186 to let them finish before returning
188 """
189 self._event_loop_thread_id = None
190 self._stop_event.set()
191 if cancel_remaining:
192 self._task_group.cancel_scope.cancel()
194 async def _call_func(
195 self,
196 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
197 args: tuple[Unpack[PosArgsT]],
198 kwargs: dict[str, Any],
199 future: Future[T_Retval],
200 ) -> None:
201 def callback(f: Future[T_Retval]) -> None:
202 if f.cancelled() and self._event_loop_thread_id not in (
203 None,
204 threading.get_ident(),
205 ):
206 self.call(scope.cancel)
208 try:
209 retval_or_awaitable = func(*args, **kwargs)
210 if isawaitable(retval_or_awaitable):
211 with CancelScope() as scope:
212 if future.cancelled():
213 scope.cancel()
214 else:
215 future.add_done_callback(callback)
217 retval = await retval_or_awaitable
218 else:
219 retval = retval_or_awaitable
220 except self._cancelled_exc_class:
221 future.cancel()
222 future.set_running_or_notify_cancel()
223 except BaseException as exc:
224 if not future.cancelled():
225 future.set_exception(exc)
227 # Let base exceptions fall through
228 if not isinstance(exc, Exception):
229 raise
230 else:
231 if not future.cancelled():
232 future.set_result(retval)
233 finally:
234 scope = None # type: ignore[assignment]
236 def _spawn_task_from_thread(
237 self,
238 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
239 args: tuple[Unpack[PosArgsT]],
240 kwargs: dict[str, Any],
241 name: object,
242 future: Future[T_Retval],
243 ) -> None:
244 """
245 Spawn a new task using the given callable.
247 Implementors must ensure that the future is resolved when the task finishes.
249 :param func: a callable
250 :param args: positional arguments to be passed to the callable
251 :param kwargs: keyword arguments to be passed to the callable
252 :param name: name of the task (will be coerced to a string if not ``None``)
253 :param future: a future that will resolve to the return value of the callable,
254 or the exception raised during its execution
256 """
257 raise NotImplementedError
259 @overload
260 def call(
261 self,
262 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
263 *args: Unpack[PosArgsT],
264 ) -> T_Retval:
265 ...
267 @overload
268 def call(
269 self, func: Callable[[Unpack[PosArgsT]], T_Retval], *args: Unpack[PosArgsT]
270 ) -> T_Retval:
271 ...
273 def call(
274 self,
275 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
276 *args: Unpack[PosArgsT],
277 ) -> T_Retval:
278 """
279 Call the given function in the event loop thread.
281 If the callable returns a coroutine object, it is awaited on.
283 :param func: any callable
284 :raises RuntimeError: if the portal is not running or if this method is called
285 from within the event loop thread
287 """
288 return cast(T_Retval, self.start_task_soon(func, *args).result())
290 @overload
291 def start_task_soon(
292 self,
293 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
294 *args: Unpack[PosArgsT],
295 name: object = None,
296 ) -> Future[T_Retval]:
297 ...
299 @overload
300 def start_task_soon(
301 self,
302 func: Callable[[Unpack[PosArgsT]], T_Retval],
303 *args: Unpack[PosArgsT],
304 name: object = None,
305 ) -> Future[T_Retval]:
306 ...
308 def start_task_soon(
309 self,
310 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
311 *args: Unpack[PosArgsT],
312 name: object = None,
313 ) -> Future[T_Retval]:
314 """
315 Start a task in the portal's task group.
317 The task will be run inside a cancel scope which can be cancelled by cancelling
318 the returned future.
320 :param func: the target function
321 :param args: positional arguments passed to ``func``
322 :param name: name of the task (will be coerced to a string if not ``None``)
323 :return: a future that resolves with the return value of the callable if the
324 task completes successfully, or with the exception raised in the task
325 :raises RuntimeError: if the portal is not running or if this method is called
326 from within the event loop thread
327 :rtype: concurrent.futures.Future[T_Retval]
329 .. versionadded:: 3.0
331 """
332 self._check_running()
333 f: Future[T_Retval] = Future()
334 self._spawn_task_from_thread(func, args, {}, name, f)
335 return f
337 def start_task(
338 self,
339 func: Callable[..., Awaitable[T_Retval]],
340 *args: object,
341 name: object = None,
342 ) -> tuple[Future[T_Retval], Any]:
343 """
344 Start a task in the portal's task group and wait until it signals for readiness.
346 This method works the same way as :meth:`.abc.TaskGroup.start`.
348 :param func: the target function
349 :param args: positional arguments passed to ``func``
350 :param name: name of the task (will be coerced to a string if not ``None``)
351 :return: a tuple of (future, task_status_value) where the ``task_status_value``
352 is the value passed to ``task_status.started()`` from within the target
353 function
354 :rtype: tuple[concurrent.futures.Future[T_Retval], Any]
356 .. versionadded:: 3.0
358 """
360 def task_done(future: Future[T_Retval]) -> None:
361 if not task_status_future.done():
362 if future.cancelled():
363 task_status_future.cancel()
364 elif future.exception():
365 task_status_future.set_exception(future.exception())
366 else:
367 exc = RuntimeError(
368 "Task exited without calling task_status.started()"
369 )
370 task_status_future.set_exception(exc)
372 self._check_running()
373 task_status_future: Future = Future()
374 task_status = _BlockingPortalTaskStatus(task_status_future)
375 f: Future = Future()
376 f.add_done_callback(task_done)
377 self._spawn_task_from_thread(func, args, {"task_status": task_status}, name, f)
378 return f, task_status_future.result()
380 def wrap_async_context_manager(
381 self, cm: AsyncContextManager[T_co]
382 ) -> ContextManager[T_co]:
383 """
384 Wrap an async context manager as a synchronous context manager via this portal.
386 Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping
387 in the middle until the synchronous context manager exits.
389 :param cm: an asynchronous context manager
390 :return: a synchronous context manager
392 .. versionadded:: 2.1
394 """
395 return _BlockingAsyncContextManager(cm, self)
398@contextmanager
399def start_blocking_portal(
400 backend: str = "asyncio", backend_options: dict[str, Any] | None = None
401) -> Generator[BlockingPortal, Any, None]:
402 """
403 Start a new event loop in a new thread and run a blocking portal in its main task.
405 The parameters are the same as for :func:`~anyio.run`.
407 :param backend: name of the backend
408 :param backend_options: backend options
409 :return: a context manager that yields a blocking portal
411 .. versionchanged:: 3.0
412 Usage as a context manager is now required.
414 """
416 async def run_portal() -> None:
417 async with BlockingPortal() as portal_:
418 if future.set_running_or_notify_cancel():
419 future.set_result(portal_)
420 await portal_.sleep_until_stopped()
422 future: Future[BlockingPortal] = Future()
423 with ThreadPoolExecutor(1) as executor:
424 run_future = executor.submit(
425 _eventloop.run, # type: ignore[arg-type]
426 run_portal,
427 backend=backend,
428 backend_options=backend_options,
429 )
430 try:
431 wait(
432 cast(Iterable[Future], [run_future, future]),
433 return_when=FIRST_COMPLETED,
434 )
435 except BaseException:
436 future.cancel()
437 run_future.cancel()
438 raise
440 if future.done():
441 portal = future.result()
442 cancel_remaining_tasks = False
443 try:
444 yield portal
445 except BaseException:
446 cancel_remaining_tasks = True
447 raise
448 finally:
449 try:
450 portal.call(portal.stop, cancel_remaining_tasks)
451 except RuntimeError:
452 pass
454 run_future.result()
457def check_cancelled() -> None:
458 """
459 Check if the cancel scope of the host task's running the current worker thread has
460 been cancelled.
462 If the host task's current cancel scope has indeed been cancelled, the
463 backend-specific cancellation exception will be raised.
465 :raises RuntimeError: if the current thread was not spawned by
466 :func:`.to_thread.run_sync`
468 """
469 try:
470 async_backend: AsyncBackend = threadlocals.current_async_backend
471 except AttributeError:
472 raise RuntimeError(
473 "This function can only be run from an AnyIO worker thread"
474 ) from None
476 async_backend.check_cancelled()