Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py: 38%
58 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1from __future__ import annotations
3import math
4import sys
5import threading
6from contextlib import contextmanager
7from importlib import import_module
8from typing import (
9 Any,
10 Awaitable,
11 Callable,
12 Generator,
13 TypeVar,
14)
16import sniffio
# NOTE: the BACKENDS tuple defined below must be updated when new backends are introduced
19from ._compat import DeprecatedAwaitableFloat
# Names of the built-in backends; extend this tuple whenever a new backend is added.
BACKENDS = ("asyncio", "trio")

# Generic placeholder for the return type of a coroutine function.
T_Retval = TypeVar("T_Retval")

# Per-thread storage; claim_worker_thread() keeps the active backend module here.
threadlocals = threading.local()
def run(
    func: Callable[..., Awaitable[T_Retval]],
    *args: object,
    backend: str = "asyncio",
    backend_options: dict[str, Any] | None = None,
) -> T_Retval:
    """
    Execute a coroutine function to completion using the chosen backend's ``run()``.

    The calling thread must not already be running an event loop.

    :param func: the coroutine function to execute
    :param args: positional arguments passed on to ``func``
    :param backend: name of the asynchronous event loop implementation to use – currently
        either ``asyncio`` or ``trio``
    :param backend_options: keyword arguments forwarded to the backend ``run()`` implementation
        (documented :ref:`here <backend options>`)
    :return: whatever the coroutine function returns
    :raises RuntimeError: if an asynchronous event loop is already running in this thread
    :raises LookupError: if the named backend is not found

    """
    # sniffio only names a library when one is active here, so success means "already running"
    try:
        active_library = sniffio.current_async_library()
    except sniffio.AsyncLibraryNotFoundError:
        pass
    else:
        raise RuntimeError(f"Already running {active_library} in this thread")

    # Resolve the backend module relative to this module's dotted name
    relative_name = f"..._backends._{backend}"
    try:
        backend_module = import_module(relative_name, package=__name__)
    except ImportError as exc:
        raise LookupError(f"No such backend: {backend}") from exc

    # Since we're in control of the event loop, we can cache the name of the async library,
    # but only when nobody else has set it already
    library_cvar = sniffio.current_async_library_cvar
    reset_token = library_cvar.set(backend) if library_cvar.get(None) is None else None

    try:
        options = backend_options or {}
        return backend_module.run(func, *args, **options)
    finally:
        # Undo the caching above so the context variable is left as it was found
        if reset_token:
            library_cvar.reset(reset_token)
async def sleep(delay: float) -> None:
    """
    Suspend the current task for the given amount of time.

    The actual waiting is delegated to the ``sleep()`` function of the running backend.

    :param delay: how long to pause, in seconds

    """
    backend_module = get_asynclib()
    return await backend_module.sleep(delay)
async def sleep_forever() -> None:
    """
    Suspend the current task until it gets cancelled.

    Equivalent to calling ``sleep(math.inf)``.

    .. versionadded:: 3.1

    """
    unbounded_delay = math.inf
    await sleep(unbounded_delay)
async def sleep_until(deadline: float) -> None:
    """
    Suspend the current task until the given point in time.

    :param deadline: the absolute time to wake up at (according to the internal monotonic
        clock of the event loop)

    .. versionadded:: 3.1

    """
    remaining = deadline - current_time()
    # A deadline that has already passed becomes a zero-length sleep rather than a negative one
    await sleep(max(remaining, 0))
def current_time() -> DeprecatedAwaitableFloat:
    """
    Read the internal clock of the running event loop.

    The backend's clock reading is wrapped in a ``DeprecatedAwaitableFloat`` that is also
    handed a reference to this function.

    :return: the clock value (seconds)

    """
    clock_value = get_asynclib().current_time()
    return DeprecatedAwaitableFloat(clock_value, current_time)
def get_all_backends() -> tuple[str, ...]:
    """
    List every built-in backend.

    :return: the module-level ``BACKENDS`` tuple of backend names

    """
    backend_names: tuple[str, ...] = BACKENDS
    return backend_names
def get_cancelled_exc_class() -> type[BaseException]:
    """
    Look up the cancellation exception class of the current async library.

    :return: the ``CancelledError`` attribute of the running backend module

    """
    backend_module = get_asynclib()
    return backend_module.CancelledError
130#
131# Private API
132#
@contextmanager
def claim_worker_thread(backend: str) -> Generator[Any, None, None]:
    """
    Mark the current thread as belonging to the given backend for the duration of the block.

    The backend module is taken from ``sys.modules`` (so it must have been imported already;
    a missing entry raises :exc:`KeyError`) and stored as ``current_async_module`` on the
    module-level thread-local object. The attribute is removed again when the block exits,
    whether normally or through an exception.

    :param backend: name of the backend, e.g. ``asyncio`` or ``trio``

    """
    module_name = "anyio._backends._" + backend
    backend_module = sys.modules[module_name]
    threadlocals.current_async_module = backend_module
    try:
        yield
    finally:
        del threadlocals.current_async_module
def get_asynclib(asynclib_name: str | None = None) -> Any:
    """
    Return the anyio backend module for an async library.

    :param asynclib_name: name of the async library; when omitted, the library currently
        detected by sniffio is used
    :return: the ``anyio._backends._<name>`` module, taken from ``sys.modules`` when already
        loaded and imported otherwise

    """
    if asynclib_name is None:
        library = sniffio.current_async_library()
    else:
        library = asynclib_name

    qualified_name = "anyio._backends._" + library
    try:
        backend_module = sys.modules[qualified_name]
    except KeyError:
        # Not loaded yet: import it on demand
        backend_module = import_module(qualified_name)

    return backend_module