Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/from_thread.py: 29%
180 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
1from __future__ import annotations
3import threading
4from collections.abc import Awaitable, Callable, Generator
5from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
6from contextlib import AbstractContextManager, contextmanager
7from inspect import isawaitable
8from types import TracebackType
9from typing import (
10 Any,
11 AsyncContextManager,
12 ContextManager,
13 Generic,
14 Iterable,
15 TypeVar,
16 cast,
17 overload,
18)
20from ._core import _eventloop
21from ._core._eventloop import get_async_backend, get_cancelled_exc_class, threadlocals
22from ._core._synchronization import Event
23from ._core._tasks import CancelScope, create_task_group
24from .abc._tasks import TaskStatus
26T_Retval = TypeVar("T_Retval")
27T_co = TypeVar("T_co")
30def run(func: Callable[..., Awaitable[T_Retval]], *args: object) -> T_Retval:
31 """
32 Call a coroutine function from a worker thread.
34 :param func: a coroutine function
35 :param args: positional arguments for the callable
36 :return: the return value of the coroutine function
38 """
39 try:
40 async_backend = threadlocals.current_async_backend
41 token = threadlocals.current_token
42 except AttributeError:
43 raise RuntimeError("This function can only be run from an AnyIO worker thread")
45 return async_backend.run_async_from_thread(func, args, token=token)
48def run_sync(func: Callable[..., T_Retval], *args: object) -> T_Retval:
49 """
50 Call a function in the event loop thread from a worker thread.
52 :param func: a callable
53 :param args: positional arguments for the callable
54 :return: the return value of the callable
56 """
57 try:
58 async_backend = threadlocals.current_async_backend
59 token = threadlocals.current_token
60 except AttributeError:
61 raise RuntimeError("This function can only be run from an AnyIO worker thread")
63 return async_backend.run_sync_from_thread(func, args, token=token)
66class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager):
67 _enter_future: Future
68 _exit_future: Future
69 _exit_event: Event
70 _exit_exc_info: tuple[
71 type[BaseException] | None, BaseException | None, TracebackType | None
72 ] = (None, None, None)
74 def __init__(self, async_cm: AsyncContextManager[T_co], portal: BlockingPortal):
75 self._async_cm = async_cm
76 self._portal = portal
78 async def run_async_cm(self) -> bool | None:
79 try:
80 self._exit_event = Event()
81 value = await self._async_cm.__aenter__()
82 except BaseException as exc:
83 self._enter_future.set_exception(exc)
84 raise
85 else:
86 self._enter_future.set_result(value)
88 try:
89 # Wait for the sync context manager to exit.
90 # This next statement can raise `get_cancelled_exc_class()` if
91 # something went wrong in a task group in this async context
92 # manager.
93 await self._exit_event.wait()
94 finally:
95 # In case of cancellation, it could be that we end up here before
96 # `_BlockingAsyncContextManager.__exit__` is called, and an
97 # `_exit_exc_info` has been set.
98 result = await self._async_cm.__aexit__(*self._exit_exc_info)
99 return result
101 def __enter__(self) -> T_co:
102 self._enter_future = Future()
103 self._exit_future = self._portal.start_task_soon(self.run_async_cm)
104 cm = self._enter_future.result()
105 return cast(T_co, cm)
107 def __exit__(
108 self,
109 __exc_type: type[BaseException] | None,
110 __exc_value: BaseException | None,
111 __traceback: TracebackType | None,
112 ) -> bool | None:
113 self._exit_exc_info = __exc_type, __exc_value, __traceback
114 self._portal.call(self._exit_event.set)
115 return self._exit_future.result()
118class _BlockingPortalTaskStatus(TaskStatus):
119 def __init__(self, future: Future):
120 self._future = future
122 def started(self, value: object = None) -> None:
123 self._future.set_result(value)
126class BlockingPortal:
127 """An object that lets external threads run code in an asynchronous event loop."""
129 def __new__(cls) -> BlockingPortal:
130 return get_async_backend().create_blocking_portal()
132 def __init__(self) -> None:
133 self._event_loop_thread_id: int | None = threading.get_ident()
134 self._stop_event = Event()
135 self._task_group = create_task_group()
136 self._cancelled_exc_class = get_cancelled_exc_class()
138 async def __aenter__(self) -> BlockingPortal:
139 await self._task_group.__aenter__()
140 return self
142 async def __aexit__(
143 self,
144 exc_type: type[BaseException] | None,
145 exc_val: BaseException | None,
146 exc_tb: TracebackType | None,
147 ) -> bool | None:
148 await self.stop()
149 return await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
151 def _check_running(self) -> None:
152 if self._event_loop_thread_id is None:
153 raise RuntimeError("This portal is not running")
154 if self._event_loop_thread_id == threading.get_ident():
155 raise RuntimeError(
156 "This method cannot be called from the event loop thread"
157 )
159 async def sleep_until_stopped(self) -> None:
160 """Sleep until :meth:`stop` is called."""
161 await self._stop_event.wait()
163 async def stop(self, cancel_remaining: bool = False) -> None:
164 """
165 Signal the portal to shut down.
167 This marks the portal as no longer accepting new calls and exits from
168 :meth:`sleep_until_stopped`.
170 :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False``
171 to let them finish before returning
173 """
174 self._event_loop_thread_id = None
175 self._stop_event.set()
176 if cancel_remaining:
177 self._task_group.cancel_scope.cancel()
179 async def _call_func(
180 self, func: Callable, args: tuple, kwargs: dict[str, Any], future: Future
181 ) -> None:
182 def callback(f: Future) -> None:
183 if f.cancelled() and self._event_loop_thread_id not in (
184 None,
185 threading.get_ident(),
186 ):
187 self.call(scope.cancel)
189 try:
190 retval = func(*args, **kwargs)
191 if isawaitable(retval):
192 with CancelScope() as scope:
193 if future.cancelled():
194 scope.cancel()
195 else:
196 future.add_done_callback(callback)
198 retval = await retval
199 except self._cancelled_exc_class:
200 future.cancel()
201 future.set_running_or_notify_cancel()
202 except BaseException as exc:
203 if not future.cancelled():
204 future.set_exception(exc)
206 # Let base exceptions fall through
207 if not isinstance(exc, Exception):
208 raise
209 else:
210 if not future.cancelled():
211 future.set_result(retval)
212 finally:
213 scope = None # type: ignore[assignment]
215 def _spawn_task_from_thread(
216 self,
217 func: Callable,
218 args: tuple[Any, ...],
219 kwargs: dict[str, Any],
220 name: object,
221 future: Future,
222 ) -> None:
223 """
224 Spawn a new task using the given callable.
226 Implementors must ensure that the future is resolved when the task finishes.
228 :param func: a callable
229 :param args: positional arguments to be passed to the callable
230 :param kwargs: keyword arguments to be passed to the callable
231 :param name: name of the task (will be coerced to a string if not ``None``)
232 :param future: a future that will resolve to the return value of the callable,
233 or the exception raised during its execution
235 """
236 raise NotImplementedError
238 @overload
239 def call(self, func: Callable[..., Awaitable[T_Retval]], *args: object) -> T_Retval:
240 ...
242 @overload
243 def call(self, func: Callable[..., T_Retval], *args: object) -> T_Retval:
244 ...
246 def call(
247 self,
248 func: Callable[..., Awaitable[T_Retval] | T_Retval],
249 *args: object,
250 ) -> T_Retval:
251 """
252 Call the given function in the event loop thread.
254 If the callable returns a coroutine object, it is awaited on.
256 :param func: any callable
257 :raises RuntimeError: if the portal is not running or if this method is called
258 from within the event loop thread
260 """
261 return cast(T_Retval, self.start_task_soon(func, *args).result())
263 @overload
264 def start_task_soon(
265 self,
266 func: Callable[..., Awaitable[T_Retval]],
267 *args: object,
268 name: object = None,
269 ) -> Future[T_Retval]:
270 ...
272 @overload
273 def start_task_soon(
274 self, func: Callable[..., T_Retval], *args: object, name: object = None
275 ) -> Future[T_Retval]:
276 ...
278 def start_task_soon(
279 self,
280 func: Callable[..., Awaitable[T_Retval] | T_Retval],
281 *args: object,
282 name: object = None,
283 ) -> Future[T_Retval]:
284 """
285 Start a task in the portal's task group.
287 The task will be run inside a cancel scope which can be cancelled by cancelling
288 the returned future.
290 :param func: the target function
291 :param args: positional arguments passed to ``func``
292 :param name: name of the task (will be coerced to a string if not ``None``)
293 :return: a future that resolves with the return value of the callable if the
294 task completes successfully, or with the exception raised in the task
295 :raises RuntimeError: if the portal is not running or if this method is called
296 from within the event loop thread
297 :rtype: concurrent.futures.Future[T_Retval]
299 .. versionadded:: 3.0
301 """
302 self._check_running()
303 f: Future = Future()
304 self._spawn_task_from_thread(func, args, {}, name, f)
305 return f
307 def start_task(
308 self,
309 func: Callable[..., Awaitable[Any]],
310 *args: object,
311 name: object = None,
312 ) -> tuple[Future[Any], Any]:
313 """
314 Start a task in the portal's task group and wait until it signals for readiness.
316 This method works the same way as :meth:`.abc.TaskGroup.start`.
318 :param func: the target function
319 :param args: positional arguments passed to ``func``
320 :param name: name of the task (will be coerced to a string if not ``None``)
321 :return: a tuple of (future, task_status_value) where the ``task_status_value``
322 is the value passed to ``task_status.started()`` from within the target
323 function
324 :rtype: tuple[concurrent.futures.Future[Any], Any]
326 .. versionadded:: 3.0
328 """
330 def task_done(future: Future) -> None:
331 if not task_status_future.done():
332 if future.cancelled():
333 task_status_future.cancel()
334 elif future.exception():
335 task_status_future.set_exception(future.exception())
336 else:
337 exc = RuntimeError(
338 "Task exited without calling task_status.started()"
339 )
340 task_status_future.set_exception(exc)
342 self._check_running()
343 task_status_future: Future = Future()
344 task_status = _BlockingPortalTaskStatus(task_status_future)
345 f: Future = Future()
346 f.add_done_callback(task_done)
347 self._spawn_task_from_thread(func, args, {"task_status": task_status}, name, f)
348 return f, task_status_future.result()
350 def wrap_async_context_manager(
351 self, cm: AsyncContextManager[T_co]
352 ) -> ContextManager[T_co]:
353 """
354 Wrap an async context manager as a synchronous context manager via this portal.
356 Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping
357 in the middle until the synchronous context manager exits.
359 :param cm: an asynchronous context manager
360 :return: a synchronous context manager
362 .. versionadded:: 2.1
364 """
365 return _BlockingAsyncContextManager(cm, self)
368@contextmanager
369def start_blocking_portal(
370 backend: str = "asyncio", backend_options: dict[str, Any] | None = None
371) -> Generator[BlockingPortal, Any, None]:
372 """
373 Start a new event loop in a new thread and run a blocking portal in its main task.
375 The parameters are the same as for :func:`~anyio.run`.
377 :param backend: name of the backend
378 :param backend_options: backend options
379 :return: a context manager that yields a blocking portal
381 .. versionchanged:: 3.0
382 Usage as a context manager is now required.
384 """
386 async def run_portal() -> None:
387 async with BlockingPortal() as portal_:
388 if future.set_running_or_notify_cancel():
389 future.set_result(portal_)
390 await portal_.sleep_until_stopped()
392 future: Future[BlockingPortal] = Future()
393 with ThreadPoolExecutor(1) as executor:
394 run_future = executor.submit(
395 _eventloop.run,
396 run_portal, # type: ignore[arg-type]
397 backend=backend,
398 backend_options=backend_options,
399 )
400 try:
401 wait(
402 cast(Iterable[Future], [run_future, future]),
403 return_when=FIRST_COMPLETED,
404 )
405 except BaseException:
406 future.cancel()
407 run_future.cancel()
408 raise
410 if future.done():
411 portal = future.result()
412 cancel_remaining_tasks = False
413 try:
414 yield portal
415 except BaseException:
416 cancel_remaining_tasks = True
417 raise
418 finally:
419 try:
420 portal.call(portal.stop, cancel_remaining_tasks)
421 except RuntimeError:
422 pass
424 run_future.result()