Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/_core/_eventloop.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1from __future__ import annotations 

2 

3import math 

4import sys 

5import threading 

6from collections.abc import Awaitable, Callable, Generator 

7from contextlib import contextmanager 

8from contextvars import Token 

9from importlib import import_module 

10from typing import TYPE_CHECKING, Any, TypeVar 

11 

12if sys.version_info >= (3, 11): 

13 from typing import TypeVarTuple, Unpack 

14else: 

15 from typing_extensions import TypeVarTuple, Unpack 

16 

17sniffio: Any 

18try: 

19 import sniffio 

20except ModuleNotFoundError: 

21 sniffio = None 

22 

23if TYPE_CHECKING: 

24 from ..abc import AsyncBackend 

25 

26# This must be updated when new backends are introduced 

27BACKENDS = "asyncio", "trio" 

28 

29T_Retval = TypeVar("T_Retval") 

30PosArgsT = TypeVarTuple("PosArgsT") 

31 

32threadlocals = threading.local() 

33loaded_backends: dict[str, type[AsyncBackend]] = {} 

34 

35 

36def run( 

37 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

38 *args: Unpack[PosArgsT], 

39 backend: str = "asyncio", 

40 backend_options: dict[str, Any] | None = None, 

41) -> T_Retval: 

42 """ 

43 Run the given coroutine function in an asynchronous event loop. 

44 

45 The current thread must not be already running an event loop. 

46 

47 :param func: a coroutine function 

48 :param args: positional arguments to ``func`` 

49 :param backend: name of the asynchronous event loop implementation – currently 

50 either ``asyncio`` or ``trio`` 

51 :param backend_options: keyword arguments to call the backend ``run()`` 

52 implementation with (documented :ref:`here <backend options>`) 

53 :return: the return value of the coroutine function 

54 :raises RuntimeError: if an asynchronous event loop is already running in this 

55 thread 

56 :raises LookupError: if the named backend is not found 

57 

58 """ 

59 if asynclib_name := current_async_library(): 

60 raise RuntimeError(f"Already running {asynclib_name} in this thread") 

61 

62 try: 

63 async_backend = get_async_backend(backend) 

64 except ImportError as exc: 

65 raise LookupError(f"No such backend: {backend}") from exc 

66 

67 token = None 

68 if asynclib_name is None: 

69 # Since we're in control of the event loop, we can cache the name of the async 

70 # library 

71 token = set_current_async_library(backend) 

72 

73 try: 

74 backend_options = backend_options or {} 

75 return async_backend.run(func, args, {}, backend_options) 

76 finally: 

77 reset_current_async_library(token) 

78 

79 

80async def sleep(delay: float) -> None: 

81 """ 

82 Pause the current task for the specified duration. 

83 

84 :param delay: the duration, in seconds 

85 

86 """ 

87 return await get_async_backend().sleep(delay) 

88 

89 

90async def sleep_forever() -> None: 

91 """ 

92 Pause the current task until it's cancelled. 

93 

94 This is a shortcut for ``sleep(math.inf)``. 

95 

96 .. versionadded:: 3.1 

97 

98 """ 

99 await sleep(math.inf) 

100 

101 

102async def sleep_until(deadline: float) -> None: 

103 """ 

104 Pause the current task until the given time. 

105 

106 :param deadline: the absolute time to wake up at (according to the internal 

107 monotonic clock of the event loop) 

108 

109 .. versionadded:: 3.1 

110 

111 """ 

112 now = current_time() 

113 await sleep(max(deadline - now, 0)) 

114 

115 

116def current_time() -> float: 

117 """ 

118 Return the current value of the event loop's internal clock. 

119 

120 :return: the clock value (seconds) 

121 

122 """ 

123 return get_async_backend().current_time() 

124 

125 

126def get_all_backends() -> tuple[str, ...]: 

127 """Return a tuple of the names of all built-in backends.""" 

128 return BACKENDS 

129 

130 

131def get_available_backends() -> tuple[str, ...]: 

132 """ 

133 Test for the availability of built-in backends. 

134 

135 :return a tuple of the built-in backend names that were successfully imported 

136 

137 .. versionadded:: 4.12 

138 

139 """ 

140 available_backends: list[str] = [] 

141 for backend_name in get_all_backends(): 

142 try: 

143 get_async_backend(backend_name) 

144 except ImportError: 

145 continue 

146 

147 available_backends.append(backend_name) 

148 

149 return tuple(available_backends) 

150 

151 

152def get_cancelled_exc_class() -> type[BaseException]: 

153 """Return the current async library's cancellation exception class.""" 

154 return get_async_backend().cancelled_exception_class() 

155 

156 

157# 

158# Private API 

159# 

160 

161 

162@contextmanager 

163def claim_worker_thread( 

164 backend_class: type[AsyncBackend], token: object 

165) -> Generator[Any, None, None]: 

166 from ..lowlevel import EventLoopToken 

167 

168 threadlocals.current_token = EventLoopToken(backend_class, token) 

169 try: 

170 yield 

171 finally: 

172 del threadlocals.current_token 

173 

174 

175class NoCurrentAsyncBackend(Exception): 

176 def __init__(self) -> None: 

177 super().__init__( 

178 f"Not currently running on any asynchronous event loop" 

179 f"Available async backends: {', '.join(get_all_backends())}" 

180 ) 

181 

182 

183def get_async_backend(asynclib_name: str | None = None) -> type[AsyncBackend]: 

184 if asynclib_name is None: 

185 asynclib_name = current_async_library() 

186 if not asynclib_name: 

187 raise NoCurrentAsyncBackend 

188 

189 # We use our own dict instead of sys.modules to get the already imported back-end 

190 # class because the appropriate modules in sys.modules could potentially be only 

191 # partially initialized 

192 try: 

193 return loaded_backends[asynclib_name] 

194 except KeyError: 

195 module = import_module(f"anyio._backends._{asynclib_name}") 

196 loaded_backends[asynclib_name] = module.backend_class 

197 return module.backend_class 

198 

199 

200def current_async_library() -> str | None: 

201 if sniffio is None: 

202 # If sniffio is not installed, we assume we're either running asyncio or nothing 

203 import asyncio 

204 

205 try: 

206 asyncio.get_running_loop() 

207 return "asyncio" 

208 except RuntimeError: 

209 pass 

210 else: 

211 try: 

212 return sniffio.current_async_library() 

213 except sniffio.AsyncLibraryNotFoundError: 

214 pass 

215 

216 return None 

217 

218 

219def set_current_async_library(asynclib_name: str | None) -> Token | None: 

220 # no-op if sniffio is not installed 

221 if sniffio is None: 

222 return None 

223 

224 return sniffio.current_async_library_cvar.set(asynclib_name) 

225 

226 

227def reset_current_async_library(token: Token | None) -> None: 

228 if token is not None: 

229 sniffio.current_async_library_cvar.reset(token)