Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/from_thread.py: 31%
195 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1from __future__ import annotations
3import threading
4from asyncio import iscoroutine
5from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
6from contextlib import AbstractContextManager, contextmanager
7from types import TracebackType
8from typing import (
9 Any,
10 AsyncContextManager,
11 Awaitable,
12 Callable,
13 ContextManager,
14 Generator,
15 Generic,
16 Iterable,
17 TypeVar,
18 cast,
19 overload,
20)
21from warnings import warn
23from ._core import _eventloop
24from ._core._eventloop import get_asynclib, get_cancelled_exc_class, threadlocals
25from ._core._synchronization import Event
26from ._core._tasks import CancelScope, create_task_group
27from .abc._tasks import TaskStatus
29T_Retval = TypeVar("T_Retval")
30T_co = TypeVar("T_co")
33def run(func: Callable[..., Awaitable[T_Retval]], *args: object) -> T_Retval:
34 """
35 Call a coroutine function from a worker thread.
37 :param func: a coroutine function
38 :param args: positional arguments for the callable
39 :return: the return value of the coroutine function
41 """
42 try:
43 asynclib = threadlocals.current_async_module
44 except AttributeError:
45 raise RuntimeError("This function can only be run from an AnyIO worker thread")
47 return asynclib.run_async_from_thread(func, *args)
50def run_async_from_thread(
51 func: Callable[..., Awaitable[T_Retval]], *args: object
52) -> T_Retval:
53 warn(
54 "run_async_from_thread() has been deprecated, use anyio.from_thread.run() instead",
55 DeprecationWarning,
56 )
57 return run(func, *args)
60def run_sync(func: Callable[..., T_Retval], *args: object) -> T_Retval:
61 """
62 Call a function in the event loop thread from a worker thread.
64 :param func: a callable
65 :param args: positional arguments for the callable
66 :return: the return value of the callable
68 """
69 try:
70 asynclib = threadlocals.current_async_module
71 except AttributeError:
72 raise RuntimeError("This function can only be run from an AnyIO worker thread")
74 return asynclib.run_sync_from_thread(func, *args)
77def run_sync_from_thread(func: Callable[..., T_Retval], *args: object) -> T_Retval:
78 warn(
79 "run_sync_from_thread() has been deprecated, use anyio.from_thread.run_sync() instead",
80 DeprecationWarning,
81 )
82 return run_sync(func, *args)
85class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager):
86 _enter_future: Future
87 _exit_future: Future
88 _exit_event: Event
89 _exit_exc_info: tuple[
90 type[BaseException] | None, BaseException | None, TracebackType | None
91 ] = (None, None, None)
93 def __init__(self, async_cm: AsyncContextManager[T_co], portal: BlockingPortal):
94 self._async_cm = async_cm
95 self._portal = portal
97 async def run_async_cm(self) -> bool | None:
98 try:
99 self._exit_event = Event()
100 value = await self._async_cm.__aenter__()
101 except BaseException as exc:
102 self._enter_future.set_exception(exc)
103 raise
104 else:
105 self._enter_future.set_result(value)
107 try:
108 # Wait for the sync context manager to exit.
109 # This next statement can raise `get_cancelled_exc_class()` if
110 # something went wrong in a task group in this async context
111 # manager.
112 await self._exit_event.wait()
113 finally:
114 # In case of cancellation, it could be that we end up here before
115 # `_BlockingAsyncContextManager.__exit__` is called, and an
116 # `_exit_exc_info` has been set.
117 result = await self._async_cm.__aexit__(*self._exit_exc_info)
118 return result
120 def __enter__(self) -> T_co:
121 self._enter_future = Future()
122 self._exit_future = self._portal.start_task_soon(self.run_async_cm)
123 cm = self._enter_future.result()
124 return cast(T_co, cm)
126 def __exit__(
127 self,
128 __exc_type: type[BaseException] | None,
129 __exc_value: BaseException | None,
130 __traceback: TracebackType | None,
131 ) -> bool | None:
132 self._exit_exc_info = __exc_type, __exc_value, __traceback
133 self._portal.call(self._exit_event.set)
134 return self._exit_future.result()
137class _BlockingPortalTaskStatus(TaskStatus):
138 def __init__(self, future: Future):
139 self._future = future
141 def started(self, value: object = None) -> None:
142 self._future.set_result(value)
145class BlockingPortal:
146 """An object that lets external threads run code in an asynchronous event loop."""
148 def __new__(cls) -> BlockingPortal:
149 return get_asynclib().BlockingPortal()
151 def __init__(self) -> None:
152 self._event_loop_thread_id: int | None = threading.get_ident()
153 self._stop_event = Event()
154 self._task_group = create_task_group()
155 self._cancelled_exc_class = get_cancelled_exc_class()
157 async def __aenter__(self) -> BlockingPortal:
158 await self._task_group.__aenter__()
159 return self
161 async def __aexit__(
162 self,
163 exc_type: type[BaseException] | None,
164 exc_val: BaseException | None,
165 exc_tb: TracebackType | None,
166 ) -> bool | None:
167 await self.stop()
168 return await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
170 def _check_running(self) -> None:
171 if self._event_loop_thread_id is None:
172 raise RuntimeError("This portal is not running")
173 if self._event_loop_thread_id == threading.get_ident():
174 raise RuntimeError(
175 "This method cannot be called from the event loop thread"
176 )
178 async def sleep_until_stopped(self) -> None:
179 """Sleep until :meth:`stop` is called."""
180 await self._stop_event.wait()
182 async def stop(self, cancel_remaining: bool = False) -> None:
183 """
184 Signal the portal to shut down.
186 This marks the portal as no longer accepting new calls and exits from
187 :meth:`sleep_until_stopped`.
189 :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False`` to let them
190 finish before returning
192 """
193 self._event_loop_thread_id = None
194 self._stop_event.set()
195 if cancel_remaining:
196 self._task_group.cancel_scope.cancel()
198 async def _call_func(
199 self, func: Callable, args: tuple, kwargs: dict[str, Any], future: Future
200 ) -> None:
201 def callback(f: Future) -> None:
202 if f.cancelled() and self._event_loop_thread_id not in (
203 None,
204 threading.get_ident(),
205 ):
206 self.call(scope.cancel)
208 try:
209 retval = func(*args, **kwargs)
210 if iscoroutine(retval):
211 with CancelScope() as scope:
212 if future.cancelled():
213 scope.cancel()
214 else:
215 future.add_done_callback(callback)
217 retval = await retval
218 except self._cancelled_exc_class:
219 future.cancel()
220 except BaseException as exc:
221 if not future.cancelled():
222 future.set_exception(exc)
224 # Let base exceptions fall through
225 if not isinstance(exc, Exception):
226 raise
227 else:
228 if not future.cancelled():
229 future.set_result(retval)
230 finally:
231 scope = None # type: ignore[assignment]
233 def _spawn_task_from_thread(
234 self,
235 func: Callable,
236 args: tuple,
237 kwargs: dict[str, Any],
238 name: object,
239 future: Future,
240 ) -> None:
241 """
242 Spawn a new task using the given callable.
244 Implementors must ensure that the future is resolved when the task finishes.
246 :param func: a callable
247 :param args: positional arguments to be passed to the callable
248 :param kwargs: keyword arguments to be passed to the callable
249 :param name: name of the task (will be coerced to a string if not ``None``)
250 :param future: a future that will resolve to the return value of the callable, or the
251 exception raised during its execution
253 """
254 raise NotImplementedError
256 @overload
257 def call(self, func: Callable[..., Awaitable[T_Retval]], *args: object) -> T_Retval:
258 ...
260 @overload
261 def call(self, func: Callable[..., T_Retval], *args: object) -> T_Retval:
262 ...
264 def call(
265 self, func: Callable[..., Awaitable[T_Retval] | T_Retval], *args: object
266 ) -> T_Retval:
267 """
268 Call the given function in the event loop thread.
270 If the callable returns a coroutine object, it is awaited on.
272 :param func: any callable
273 :raises RuntimeError: if the portal is not running or if this method is called from within
274 the event loop thread
276 """
277 return cast(T_Retval, self.start_task_soon(func, *args).result())
279 @overload
280 def spawn_task(
281 self,
282 func: Callable[..., Awaitable[T_Retval]],
283 *args: object,
284 name: object = None,
285 ) -> Future[T_Retval]:
286 ...
288 @overload
289 def spawn_task(
290 self, func: Callable[..., T_Retval], *args: object, name: object = None
291 ) -> Future[T_Retval]:
292 ...
294 def spawn_task(
295 self,
296 func: Callable[..., Awaitable[T_Retval] | T_Retval],
297 *args: object,
298 name: object = None,
299 ) -> Future[T_Retval]:
300 """
301 Start a task in the portal's task group.
303 :param func: the target coroutine function
304 :param args: positional arguments passed to ``func``
305 :param name: name of the task (will be coerced to a string if not ``None``)
306 :return: a future that resolves with the return value of the callable if the task completes
307 successfully, or with the exception raised in the task
308 :raises RuntimeError: if the portal is not running or if this method is called from within
309 the event loop thread
311 .. versionadded:: 2.1
312 .. deprecated:: 3.0
313 Use :meth:`start_task_soon` instead. If your code needs AnyIO 2 compatibility, you
314 can keep using this until AnyIO 4.
316 """
317 warn(
318 "spawn_task() is deprecated -- use start_task_soon() instead",
319 DeprecationWarning,
320 )
321 return self.start_task_soon(func, *args, name=name) # type: ignore[arg-type]
323 @overload
324 def start_task_soon(
325 self,
326 func: Callable[..., Awaitable[T_Retval]],
327 *args: object,
328 name: object = None,
329 ) -> Future[T_Retval]:
330 ...
332 @overload
333 def start_task_soon(
334 self, func: Callable[..., T_Retval], *args: object, name: object = None
335 ) -> Future[T_Retval]:
336 ...
338 def start_task_soon(
339 self,
340 func: Callable[..., Awaitable[T_Retval] | T_Retval],
341 *args: object,
342 name: object = None,
343 ) -> Future[T_Retval]:
344 """
345 Start a task in the portal's task group.
347 The task will be run inside a cancel scope which can be cancelled by cancelling the
348 returned future.
350 :param func: the target function
351 :param args: positional arguments passed to ``func``
352 :param name: name of the task (will be coerced to a string if not ``None``)
353 :return: a future that resolves with the return value of the callable if the task completes
354 successfully, or with the exception raised in the task
355 :raises RuntimeError: if the portal is not running or if this method is called from within
356 the event loop thread
358 .. versionadded:: 3.0
360 """
361 self._check_running()
362 f: Future = Future()
363 self._spawn_task_from_thread(func, args, {}, name, f)
364 return f
366 def start_task(
367 self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
368 ) -> tuple[Future[Any], Any]:
369 """
370 Start a task in the portal's task group and wait until it signals for readiness.
372 This method works the same way as :meth:`TaskGroup.start`.
374 :param func: the target function
375 :param args: positional arguments passed to ``func``
376 :param name: name of the task (will be coerced to a string if not ``None``)
377 :return: a tuple of (future, task_status_value) where the ``task_status_value`` is the
378 value passed to ``task_status.started()`` from within the target function
380 .. versionadded:: 3.0
382 """
384 def task_done(future: Future) -> None:
385 if not task_status_future.done():
386 if future.cancelled():
387 task_status_future.cancel()
388 elif future.exception():
389 task_status_future.set_exception(future.exception())
390 else:
391 exc = RuntimeError(
392 "Task exited without calling task_status.started()"
393 )
394 task_status_future.set_exception(exc)
396 self._check_running()
397 task_status_future: Future = Future()
398 task_status = _BlockingPortalTaskStatus(task_status_future)
399 f: Future = Future()
400 f.add_done_callback(task_done)
401 self._spawn_task_from_thread(func, args, {"task_status": task_status}, name, f)
402 return f, task_status_future.result()
404 def wrap_async_context_manager(
405 self, cm: AsyncContextManager[T_co]
406 ) -> ContextManager[T_co]:
407 """
408 Wrap an async context manager as a synchronous context manager via this portal.
410 Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping in the
411 middle until the synchronous context manager exits.
413 :param cm: an asynchronous context manager
414 :return: a synchronous context manager
416 .. versionadded:: 2.1
418 """
419 return _BlockingAsyncContextManager(cm, self)
422def create_blocking_portal() -> BlockingPortal:
423 """
424 Create a portal for running functions in the event loop thread from external threads.
426 Use this function in asynchronous code when you need to allow external threads access to the
427 event loop where your asynchronous code is currently running.
429 .. deprecated:: 3.0
430 Use :class:`.BlockingPortal` directly.
432 """
433 warn(
434 "create_blocking_portal() has been deprecated -- use anyio.from_thread.BlockingPortal() "
435 "directly",
436 DeprecationWarning,
437 )
438 return BlockingPortal()
441@contextmanager
442def start_blocking_portal(
443 backend: str = "asyncio", backend_options: dict[str, Any] | None = None
444) -> Generator[BlockingPortal, Any, None]:
445 """
446 Start a new event loop in a new thread and run a blocking portal in its main task.
448 The parameters are the same as for :func:`~anyio.run`.
450 :param backend: name of the backend
451 :param backend_options: backend options
452 :return: a context manager that yields a blocking portal
454 .. versionchanged:: 3.0
455 Usage as a context manager is now required.
457 """
459 async def run_portal() -> None:
460 async with BlockingPortal() as portal_:
461 if future.set_running_or_notify_cancel():
462 future.set_result(portal_)
463 await portal_.sleep_until_stopped()
465 future: Future[BlockingPortal] = Future()
466 with ThreadPoolExecutor(1) as executor:
467 run_future = executor.submit(
468 _eventloop.run,
469 run_portal, # type: ignore[arg-type]
470 backend=backend,
471 backend_options=backend_options,
472 )
473 try:
474 wait(
475 cast(Iterable[Future], [run_future, future]),
476 return_when=FIRST_COMPLETED,
477 )
478 except BaseException:
479 future.cancel()
480 run_future.cancel()
481 raise
483 if future.done():
484 portal = future.result()
485 cancel_remaining_tasks = False
486 try:
487 yield portal
488 except BaseException:
489 cancel_remaining_tasks = True
490 raise
491 finally:
492 try:
493 portal.call(portal.stop, cancel_remaining_tasks)
494 except RuntimeError:
495 pass
497 run_future.result()