Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/_core/_eventloop.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

99 statements  

1from __future__ import annotations 

2 

3import math 

4import sys 

5import threading 

6from collections.abc import Awaitable, Callable, Generator 

7from contextlib import contextmanager 

8from contextvars import Token 

9from importlib import import_module 

10from typing import TYPE_CHECKING, Any, TypeVar 

11 

12from ._exceptions import NoEventLoopError 

13 

14if sys.version_info >= (3, 11): 

15 from typing import TypeVarTuple, Unpack 

16else: 

17 from typing_extensions import TypeVarTuple, Unpack 

18 

19sniffio: Any 

20try: 

21 import sniffio 

22except ModuleNotFoundError: 

23 sniffio = None 

24 

25if TYPE_CHECKING: 

26 from ..abc import AsyncBackend 

27 

28# This must be updated when new backends are introduced 

29BACKENDS = "asyncio", "trio" 

30 

31T_Retval = TypeVar("T_Retval") 

32PosArgsT = TypeVarTuple("PosArgsT") 

33 

34threadlocals = threading.local() 

35loaded_backends: dict[str, type[AsyncBackend]] = {} 

36 

37 

def run(
    func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
    *args: Unpack[PosArgsT],
    backend: str = "asyncio",
    backend_options: dict[str, Any] | None = None,
) -> T_Retval:
    """
    Execute a coroutine function inside a freshly started event loop.

    No event loop may already be running in the calling thread.

    :param func: the coroutine function to call
    :param args: positional arguments passed on to ``func``
    :param backend: name of the event loop implementation to use (currently
        ``asyncio`` or ``trio``)
    :param backend_options: keyword arguments for the backend's own ``run()``
        implementation (documented :ref:`here <backend options>`)
    :return: whatever the coroutine function returned
    :raises RuntimeError: if an asynchronous event loop is already running in this
        thread
    :raises LookupError: if the named backend is not found

    """
    running_library = current_async_library()
    if running_library:
        raise RuntimeError(f"Already running {running_library} in this thread")

    try:
        backend_cls = get_async_backend(backend)
    except ImportError as exc:
        raise LookupError(f"No such backend: {backend}") from exc

    # This call owns the event loop, so the library name can be cached up front
    # and restored once the loop has finished
    cvar_token = (
        set_current_async_library(backend) if running_library is None else None
    )
    try:
        options = backend_options or {}
        return backend_cls.run(func, args, {}, options)
    finally:
        reset_current_async_library(cvar_token)

80 

81 

async def sleep(delay: float) -> None:
    """
    Suspend the calling task for the given number of seconds.

    The wait is delegated to the ``sleep()`` implementation of the backend
    returned by ``get_async_backend()``.

    :param delay: how long to pause, in seconds

    """
    backend = get_async_backend()
    return await backend.sleep(delay)

90 

91 

async def sleep_forever() -> None:
    """
    Suspend the calling task until it gets cancelled.

    Equivalent to calling ``sleep(math.inf)``.

    .. versionadded:: 3.1

    """
    unbounded_delay = math.inf
    await sleep(unbounded_delay)

102 

103 

async def sleep_until(deadline: float) -> None:
    """
    Suspend the calling task until the given point in time.

    :param deadline: absolute wake-up time, measured on the same clock that
        ``current_time()`` reports

    .. versionadded:: 3.1

    """
    remaining = deadline - current_time()
    # A deadline that has already passed becomes a zero-length sleep
    await sleep(max(remaining, 0))

116 

117 

def current_time() -> float:
    """
    Read the internal clock of the running event loop.

    :return: the clock value (seconds)
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()
    return backend.current_time()

128 

129 

def get_all_backends() -> tuple[str, ...]:
    """
    List every built-in backend by name.

    :return: the module-level ``BACKENDS`` tuple

    """
    return BACKENDS

133 

134 

def get_available_backends() -> tuple[str, ...]:
    """
    Test for the availability of built-in backends.

    Every name reported by ``get_all_backends()`` is passed to
    ``get_async_backend()``; a name is kept only when that call does not raise
    :exc:`ImportError`.

    :return: a tuple of the built-in backend names that were successfully imported

    .. versionadded:: 4.12

    """
    available_backends: list[str] = []
    for backend_name in get_all_backends():
        try:
            get_async_backend(backend_name)
        except ImportError:
            # The backend (or one of its dependencies) is not importable here
            continue

        available_backends.append(backend_name)

    return tuple(available_backends)

154 

155 

def get_cancelled_exc_class() -> type[BaseException]:
    """
    Look up the exception class the current async library uses for cancellation.

    :return: the result of the current backend's ``cancelled_exception_class()``
    :raises NoEventLoopError: if no supported asynchronous event loop is running in the
        current thread

    """
    backend = get_async_backend()
    return backend.cancelled_exception_class()

165 

166 

167# 

168# Private API 

169# 

170 

171 

@contextmanager
def claim_worker_thread(
    backend_class: type[AsyncBackend], token: object
) -> Generator[Any, None, None]:
    """
    Publish an event loop token on the current thread for the duration of the block.

    An ``EventLoopToken`` built from ``backend_class`` and ``token`` is stored as
    ``threadlocals.current_token`` on entry, and that attribute is deleted again
    on exit, whether or not the body raised.

    :param backend_class: the backend class to wrap in the token
    :param token: the backend-specific token object to wrap

    """
    # Imported here rather than at module level
    # NOTE(review): presumably to avoid a circular import - confirm
    from ..lowlevel import EventLoopToken

    event_loop_token = EventLoopToken(backend_class, token)
    threadlocals.current_token = event_loop_token
    try:
        yield
    finally:
        del threadlocals.current_token

183 

184 

def get_async_backend(asynclib_name: str | None = None) -> type[AsyncBackend]:
    """
    Return the backend class for the named (or currently running) async library.

    :param asynclib_name: name of the backend to load; when ``None``, the name
        reported by ``current_async_library()`` is used
    :return: the ``backend_class`` attribute of the matching
        ``anyio._backends._<name>`` module
    :raises NoEventLoopError: if no name was given and no async library is
        detected in the current thread

    """
    if asynclib_name is None:
        asynclib_name = current_async_library()
        if not asynclib_name:
            known_backends = ", ".join(get_all_backends())
            raise NoEventLoopError(
                "Not currently running on any asynchronous event loop. "
                f"Available async backends: {known_backends}"
            )

    # A private cache is consulted instead of sys.modules because a backend
    # module found in sys.modules might still be only partially initialized
    try:
        backend_class = loaded_backends[asynclib_name]
    except KeyError:
        backend_module = import_module(f"anyio._backends._{asynclib_name}")
        loaded_backends[asynclib_name] = backend_module.backend_class
        backend_class = backend_module.backend_class

    return backend_class

203 

204 

def current_async_library() -> str | None:
    """
    Detect which async library, if any, is running in the current thread.

    :return: the library name, or ``None`` when none is detected

    """
    if sniffio is not None:
        try:
            return sniffio.current_async_library()
        except sniffio.AsyncLibraryNotFoundError:
            return None

    # Without sniffio the only possibilities considered are asyncio or nothing
    import asyncio

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return None

    return "asyncio"

222 

223 

def set_current_async_library(asynclib_name: str | None) -> Token | None:
    """
    Store ``asynclib_name`` in sniffio's ``current_async_library_cvar``.

    :param asynclib_name: the library name to record (or ``None``)
    :return: the token returned by the context variable's ``set()``, or ``None``
        when sniffio is not installed (in which case nothing is done)

    """
    if sniffio is not None:
        return sniffio.current_async_library_cvar.set(asynclib_name)

    return None

230 

231 

def reset_current_async_library(token: Token | None) -> None:
    """
    Undo an earlier ``set_current_async_library()`` call.

    :param token: the value that ``set_current_async_library()`` returned; ``None``
        makes this a no-op

    """
    if token is None:
        return

    sniffio.current_async_library_cvar.reset(token)