Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/from_thread.py: 32%
190 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import threading
2from asyncio import iscoroutine
3from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
4from contextlib import AbstractContextManager, contextmanager
5from types import TracebackType
6from typing import (
7 Any,
8 AsyncContextManager,
9 Callable,
10 ContextManager,
11 Coroutine,
12 Dict,
13 Generator,
14 Iterable,
15 Optional,
16 Tuple,
17 Type,
18 TypeVar,
19 Union,
20 cast,
21 overload,
22)
23from warnings import warn
25from ._core import _eventloop
26from ._core._eventloop import get_asynclib, get_cancelled_exc_class, threadlocals
27from ._core._synchronization import Event
28from ._core._tasks import CancelScope, create_task_group
29from .abc._tasks import TaskStatus
31T_Retval = TypeVar("T_Retval")
32T_co = TypeVar("T_co")
def run(func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object) -> T_Retval:
    """
    Call a coroutine function from a worker thread.

    :param func: a coroutine function
    :param args: positional arguments for the callable
    :return: the return value of the coroutine function
    :raises RuntimeError: if ``threadlocals`` has no ``current_async_module`` attribute
        in the calling thread (i.e. it is not an AnyIO worker thread)

    """
    try:
        asynclib = threadlocals.current_async_module
    except AttributeError:
        # The missing thread-local attribute is an implementation detail; suppress
        # the implicit exception context so only the actionable error is reported.
        raise RuntimeError(
            "This function can only be run from an AnyIO worker thread"
        ) from None

    return asynclib.run_async_from_thread(func, *args)
def run_async_from_thread(
    func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object
) -> T_Retval:
    """
    Deprecated alias of :func:`run`.

    Emits a :class:`DeprecationWarning` and then delegates to :func:`run`.

    :param func: a coroutine function
    :param args: positional arguments for the callable
    :return: the return value of the coroutine function

    """
    warn(
        "run_async_from_thread() has been deprecated, use anyio.from_thread.run() instead",
        DeprecationWarning,
        # Attribute the warning to the caller's line rather than to this wrapper.
        stacklevel=2,
    )
    return run(func, *args)
def run_sync(func: Callable[..., T_Retval], *args: object) -> T_Retval:
    """
    Call a function in the event loop thread from a worker thread.

    :param func: a callable
    :param args: positional arguments for the callable
    :return: the return value of the callable
    :raises RuntimeError: if ``threadlocals`` has no ``current_async_module`` attribute
        in the calling thread (i.e. it is not an AnyIO worker thread)

    """
    try:
        asynclib = threadlocals.current_async_module
    except AttributeError:
        # The missing thread-local attribute is an implementation detail; suppress
        # the implicit exception context so only the actionable error is reported.
        raise RuntimeError(
            "This function can only be run from an AnyIO worker thread"
        ) from None

    return asynclib.run_sync_from_thread(func, *args)
def run_sync_from_thread(func: Callable[..., T_Retval], *args: object) -> T_Retval:
    """
    Deprecated alias of :func:`run_sync`.

    Emits a :class:`DeprecationWarning` and then delegates to :func:`run_sync`.

    :param func: a callable
    :param args: positional arguments for the callable
    :return: the return value of the callable

    """
    warn(
        "run_sync_from_thread() has been deprecated, use anyio.from_thread.run_sync() instead",
        DeprecationWarning,
        # Attribute the warning to the caller's line rather than to this wrapper.
        stacklevel=2,
    )
    return run_sync(func, *args)
class _BlockingAsyncContextManager(AbstractContextManager):
    """
    Synchronous facade over an async context manager, driven through a portal.

    ``__enter__`` starts :meth:`run_async_cm` as a portal task; that task enters the
    async context manager, parks on an event, and exits the async context manager
    once ``__exit__`` sets the event.
    """

    # Resolved by run_async_cm() with the value (or exception) of ``__aenter__()``.
    _enter_future: Future
    # Future of the portal task running run_async_cm(); carries ``__aexit__()``'s result.
    _exit_future: Future
    # Set from ``__exit__`` to let run_async_cm() proceed to ``__aexit__()``.
    _exit_event: Event
    # Exception triple handed to ``__aexit__()``; stays all-None until ``__exit__`` runs.
    _exit_exc_info: Tuple[
        Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]
    ] = (None, None, None)

    def __init__(self, async_cm: AsyncContextManager[T_co], portal: "BlockingPortal"):
        self._portal = portal
        self._async_cm = async_cm

    async def run_async_cm(self) -> Optional[bool]:
        """
        Enter the wrapped async context manager, wait for the exit event, then exit it.

        :return: whatever the wrapped ``__aexit__()`` returns
        """
        try:
            self._exit_event = Event()
            entered = await self._async_cm.__aenter__()
        except BaseException as exc:
            # Hand the failure to the thread blocked in __enter__() and propagate it.
            self._enter_future.set_exception(exc)
            raise

        # Reached only when __aenter__() succeeded, since the handler above re-raises.
        self._enter_future.set_result(entered)

        try:
            # Wait for the sync context manager to exit.
            # This next statement can raise `get_cancelled_exc_class()` if
            # something went wrong in a task group in this async context
            # manager.
            await self._exit_event.wait()
        finally:
            # In case of cancellation, it could be that we end up here before
            # `_BlockingAsyncContextManager.__exit__` is called, and an
            # `_exit_exc_info` has been set.
            # NOTE: returning from inside ``finally`` is kept deliberately; it
            # discards any exception raised by the wait above.
            return await self._async_cm.__aexit__(*self._exit_exc_info)

    def __enter__(self) -> T_co:
        """Start the portal task and block until ``__aenter__()`` has produced a value."""
        self._enter_future = Future()
        self._exit_future = self._portal.start_task_soon(self.run_async_cm)
        return cast(T_co, self._enter_future.result())

    def __exit__(
        self,
        __exc_type: Optional[Type[BaseException]],
        __exc_value: Optional[BaseException],
        __traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        """Record the exception triple, release the portal task and return its result."""
        self._exit_exc_info = (__exc_type, __exc_value, __traceback)
        # The event is set through the portal so it happens in the event loop thread.
        self._portal.call(self._exit_event.set)
        return self._exit_future.result()
class _BlockingPortalTaskStatus(TaskStatus):
    """Task status object that reports ``started()`` through a thread-safe future."""

    def __init__(self, future: Future):
        # Future that receives the value passed to started().
        self._future = future

    def started(self, value: object = None) -> None:
        """Resolve the wrapped future with ``value``."""
        status_future = self._future
        status_future.set_result(value)
class BlockingPortal:
    """An object that lets external threads run code in an asynchronous event loop."""

    def __new__(cls) -> "BlockingPortal":
        # Instantiation is delegated to the BlockingPortal class of the async
        # library module returned by get_asynclib().
        return get_asynclib().BlockingPortal()

    def __init__(self) -> None:
        # Identifier of the thread that created the portal; reset to None by stop().
        self._event_loop_thread_id: Optional[int] = threading.get_ident()
        self._stop_event = Event()
        self._task_group = create_task_group()
        self._cancelled_exc_class = get_cancelled_exc_class()

    async def __aenter__(self) -> "BlockingPortal":
        await self._task_group.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        # Stop accepting new calls first, then let the task group finish.
        await self.stop()
        return await self._task_group.__aexit__(exc_type, exc_val, exc_tb)

    def _check_running(self) -> None:
        """
        Verify that the portal can be used from the calling thread.

        :raises RuntimeError: if the portal has been stopped, or if the caller is running
            in the thread recorded as the event loop thread

        """
        if self._event_loop_thread_id is None:
            raise RuntimeError("This portal is not running")
        if self._event_loop_thread_id == threading.get_ident():
            raise RuntimeError(
                "This method cannot be called from the event loop thread"
            )

    async def sleep_until_stopped(self) -> None:
        """Sleep until :meth:`stop` is called."""
        await self._stop_event.wait()

    async def stop(self, cancel_remaining: bool = False) -> None:
        """
        Signal the portal to shut down.

        This marks the portal as no longer accepting new calls and exits from
        :meth:`sleep_until_stopped`.

        :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False`` to let them
            finish before returning

        """
        self._event_loop_thread_id = None
        self._stop_event.set()
        if cancel_remaining:
            self._task_group.cancel_scope.cancel()

    async def _call_func(
        self, func: Callable, args: tuple, kwargs: Dict[str, Any], future: Future
    ) -> None:
        """
        Call ``func`` and relay its outcome to ``future``.

        If ``func`` returns a coroutine object, it is awaited inside a cancel scope that
        gets cancelled when ``future`` is cancelled.

        :param func: the callable to invoke
        :param args: positional arguments for the callable
        :param kwargs: keyword arguments for the callable
        :param future: the future that receives the result, exception or cancellation

        """

        def callback(f: Future) -> None:
            # Only act if the future was cancelled while the portal is still running
            # and this callback is executing outside the event loop thread; the
            # scope is then cancelled through the portal.
            if f.cancelled() and self._event_loop_thread_id not in (
                None,
                threading.get_ident(),
            ):
                self.call(scope.cancel)

        try:
            retval = func(*args, **kwargs)
            if iscoroutine(retval):
                with CancelScope() as scope:
                    if future.cancelled():
                        scope.cancel()
                    else:
                        future.add_done_callback(callback)

                    retval = await retval
        except self._cancelled_exc_class:
            future.cancel()
        except BaseException as exc:
            if not future.cancelled():
                future.set_exception(exc)

            # Let base exceptions fall through
            if not isinstance(exc, Exception):
                raise
        else:
            if not future.cancelled():
                future.set_result(retval)
        finally:
            # Drop the reference also held by the callback closure
            # (presumably to avoid keeping the scope alive -- TODO confirm).
            scope = None  # type: ignore[assignment]

    def _spawn_task_from_thread(
        self,
        func: Callable,
        args: tuple,
        kwargs: Dict[str, Any],
        name: object,
        future: Future,
    ) -> None:
        """
        Spawn a new task using the given callable.

        Implementors must ensure that the future is resolved when the task finishes.

        :param func: a callable
        :param args: positional arguments to be passed to the callable
        :param kwargs: keyword arguments to be passed to the callable
        :param name: name of the task (will be coerced to a string if not ``None``)
        :param future: a future that will resolve to the return value of the callable, or the
            exception raised during its execution

        """
        raise NotImplementedError

    @overload
    def call(
        self, func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object
    ) -> T_Retval:
        ...

    @overload
    def call(self, func: Callable[..., T_Retval], *args: object) -> T_Retval:
        ...

    def call(
        self,
        func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
        *args: object
    ) -> T_Retval:
        """
        Call the given function in the event loop thread.

        If the callable returns a coroutine object, it is awaited on.

        :param func: any callable
        :param args: positional arguments passed to ``func``
        :return: the result of the future returned by :meth:`start_task_soon`
        :raises RuntimeError: if the portal is not running or if this method is called from within
            the event loop thread

        """
        return cast(T_Retval, self.start_task_soon(func, *args).result())

    @overload
    def spawn_task(
        self,
        func: Callable[..., Coroutine[Any, Any, T_Retval]],
        *args: object,
        name: object = None
    ) -> "Future[T_Retval]":
        ...

    @overload
    def spawn_task(
        self, func: Callable[..., T_Retval], *args: object, name: object = None
    ) -> "Future[T_Retval]":
        ...

    def spawn_task(
        self,
        func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
        *args: object,
        name: object = None
    ) -> "Future[T_Retval]":
        """
        Start a task in the portal's task group.

        :param func: the target coroutine function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a future that resolves with the return value of the callable if the task completes
            successfully, or with the exception raised in the task
        :raises RuntimeError: if the portal is not running or if this method is called from within
            the event loop thread

        .. versionadded:: 2.1
        .. deprecated:: 3.0
           Use :meth:`start_task_soon` instead. If your code needs AnyIO 2 compatibility, you
           can keep using this until AnyIO 4.

        """
        warn(
            "spawn_task() is deprecated -- use start_task_soon() instead",
            DeprecationWarning,
            # Attribute the warning to the caller's line rather than to this method.
            stacklevel=2,
        )
        return self.start_task_soon(func, *args, name=name)  # type: ignore[arg-type]

    @overload
    def start_task_soon(
        self,
        func: Callable[..., Coroutine[Any, Any, T_Retval]],
        *args: object,
        name: object = None
    ) -> "Future[T_Retval]":
        ...

    @overload
    def start_task_soon(
        self, func: Callable[..., T_Retval], *args: object, name: object = None
    ) -> "Future[T_Retval]":
        ...

    def start_task_soon(
        self,
        func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
        *args: object,
        name: object = None
    ) -> "Future[T_Retval]":
        """
        Start a task in the portal's task group.

        The task will be run inside a cancel scope which can be cancelled by cancelling the
        returned future.

        :param func: the target coroutine function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a future that resolves with the return value of the callable if the task completes
            successfully, or with the exception raised in the task
        :raises RuntimeError: if the portal is not running or if this method is called from within
            the event loop thread

        .. versionadded:: 3.0

        """
        self._check_running()
        f: Future = Future()
        self._spawn_task_from_thread(func, args, {}, name, f)
        return f

    def start_task(
        self,
        func: Callable[..., Coroutine[Any, Any, Any]],
        *args: object,
        name: object = None
    ) -> Tuple["Future[Any]", Any]:
        """
        Start a task in the portal's task group and wait until it signals for readiness.

        This method works the same way as :meth:`TaskGroup.start`.

        :param func: the target coroutine function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a tuple of (future, task_status_value) where the ``task_status_value`` is the
            value passed to ``task_status.started()`` from within the target function
        :raises RuntimeError: if the portal is not running, if this method is called from within
            the event loop thread, or if the task exits without calling
            ``task_status.started()``

        .. versionadded:: 3.0

        """

        def task_done(future: Future) -> None:
            # If the task finished before reporting readiness, propagate its outcome
            # to the status future so the result() call below does not block forever.
            if not task_status_future.done():
                if future.cancelled():
                    task_status_future.cancel()
                elif future.exception():
                    task_status_future.set_exception(future.exception())
                else:
                    exc = RuntimeError(
                        "Task exited without calling task_status.started()"
                    )
                    task_status_future.set_exception(exc)

        self._check_running()
        task_status_future: Future = Future()
        task_status = _BlockingPortalTaskStatus(task_status_future)
        f: Future = Future()
        f.add_done_callback(task_done)
        self._spawn_task_from_thread(func, args, {"task_status": task_status}, name, f)
        return f, task_status_future.result()

    def wrap_async_context_manager(
        self, cm: AsyncContextManager[T_co]
    ) -> ContextManager[T_co]:
        """
        Wrap an async context manager as a synchronous context manager via this portal.

        Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping in the
        middle until the synchronous context manager exits.

        :param cm: an asynchronous context manager
        :return: a synchronous context manager

        .. versionadded:: 2.1

        """
        return _BlockingAsyncContextManager(cm, self)
def create_blocking_portal() -> BlockingPortal:
    """
    Create a portal for running functions in the event loop thread from external threads.

    Use this function in asynchronous code when you need to allow external threads access to the
    event loop where your asynchronous code is currently running.

    :return: a new :class:`.BlockingPortal`

    .. deprecated:: 3.0
        Use :class:`.BlockingPortal` directly.

    """
    warn(
        "create_blocking_portal() has been deprecated -- use anyio.from_thread.BlockingPortal() "
        "directly",
        DeprecationWarning,
        # Attribute the warning to the caller's line rather than to this function.
        stacklevel=2,
    )
    return BlockingPortal()
@contextmanager
def start_blocking_portal(
    backend: str = "asyncio", backend_options: Optional[Dict[str, Any]] = None
) -> Generator[BlockingPortal, Any, None]:
    """
    Start a new event loop in a new thread and run a blocking portal in its main task.

    The parameters are the same as for :func:`~anyio.run`.

    On exit the portal is stopped; remaining tasks are cancelled if the ``with`` body
    raised, and allowed to finish otherwise. If the portal is already stopped by then,
    the stop request is skipped instead of raising.

    :param backend: name of the backend
    :param backend_options: backend options
    :return: a context manager that yields a blocking portal

    .. versionchanged:: 3.0
        Usage as a context manager is now required.

    """

    async def run_portal() -> None:
        async with BlockingPortal() as portal_:
            # Publish the portal to the waiting thread unless startup was cancelled.
            if future.set_running_or_notify_cancel():
                future.set_result(portal_)
                await portal_.sleep_until_stopped()

    future: Future[BlockingPortal] = Future()
    with ThreadPoolExecutor(1) as executor:
        run_future = executor.submit(
            _eventloop.run,
            run_portal,  # type: ignore[arg-type]
            backend=backend,
            backend_options=backend_options,
        )
        try:
            # Wake up when either the portal is ready or the event loop has exited.
            wait(
                cast(Iterable[Future], [run_future, future]),
                return_when=FIRST_COMPLETED,
            )
        except BaseException:
            future.cancel()
            run_future.cancel()
            raise

        cancel_remaining_tasks = False
        if future.done():
            portal = future.result()
            try:
                yield portal
            except BaseException:
                cancel_remaining_tasks = True
                raise
            finally:
                try:
                    portal.call(portal.stop, cancel_remaining_tasks)
                except RuntimeError:
                    # The portal was already stopped (e.g. from inside the ``with``
                    # body); there is nothing left to stop, and raising here would
                    # mask any exception propagating from the body.
                    pass

        # Surface any exception raised by the event loop itself.
        run_future.result()