Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

67 statements  

1from __future__ import annotations 

2 

3import math 

4import sys 

5import threading 

6from collections.abc import Awaitable, Callable, Generator 

7from contextlib import contextmanager 

8from importlib import import_module 

9from typing import TYPE_CHECKING, Any, TypeVar 

10 

11import sniffio 

12 

13if sys.version_info >= (3, 11): 

14 from typing import TypeVarTuple, Unpack 

15else: 

16 from typing_extensions import TypeVarTuple, Unpack 

17 

18if TYPE_CHECKING: 

19 from ..abc import AsyncBackend 

20 

21# This must be updated when new backends are introduced 

22BACKENDS = "asyncio", "trio" 

23 

24T_Retval = TypeVar("T_Retval") 

25PosArgsT = TypeVarTuple("PosArgsT") 

26 

27threadlocals = threading.local() 

28loaded_backends: dict[str, type[AsyncBackend]] = {} 

29 

30 

31def run( 

32 func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], 

33 *args: Unpack[PosArgsT], 

34 backend: str = "asyncio", 

35 backend_options: dict[str, Any] | None = None, 

36) -> T_Retval: 

37 """ 

38 Run the given coroutine function in an asynchronous event loop. 

39 

40 The current thread must not be already running an event loop. 

41 

42 :param func: a coroutine function 

43 :param args: positional arguments to ``func`` 

44 :param backend: name of the asynchronous event loop implementation – currently 

45 either ``asyncio`` or ``trio`` 

46 :param backend_options: keyword arguments to call the backend ``run()`` 

47 implementation with (documented :ref:`here <backend options>`) 

48 :return: the return value of the coroutine function 

49 :raises RuntimeError: if an asynchronous event loop is already running in this 

50 thread 

51 :raises LookupError: if the named backend is not found 

52 

53 """ 

54 try: 

55 asynclib_name = sniffio.current_async_library() 

56 except sniffio.AsyncLibraryNotFoundError: 

57 pass 

58 else: 

59 raise RuntimeError(f"Already running {asynclib_name} in this thread") 

60 

61 try: 

62 async_backend = get_async_backend(backend) 

63 except ImportError as exc: 

64 raise LookupError(f"No such backend: {backend}") from exc 

65 

66 token = None 

67 if sniffio.current_async_library_cvar.get(None) is None: 

68 # Since we're in control of the event loop, we can cache the name of the async 

69 # library 

70 token = sniffio.current_async_library_cvar.set(backend) 

71 

72 try: 

73 backend_options = backend_options or {} 

74 return async_backend.run(func, args, {}, backend_options) 

75 finally: 

76 if token: 

77 sniffio.current_async_library_cvar.reset(token) 

78 

79 

80async def sleep(delay: float) -> None: 

81 """ 

82 Pause the current task for the specified duration. 

83 

84 :param delay: the duration, in seconds 

85 

86 """ 

87 return await get_async_backend().sleep(delay) 

88 

89 

90async def sleep_forever() -> None: 

91 """ 

92 Pause the current task until it's cancelled. 

93 

94 This is a shortcut for ``sleep(math.inf)``. 

95 

96 .. versionadded:: 3.1 

97 

98 """ 

99 await sleep(math.inf) 

100 

101 

102async def sleep_until(deadline: float) -> None: 

103 """ 

104 Pause the current task until the given time. 

105 

106 :param deadline: the absolute time to wake up at (according to the internal 

107 monotonic clock of the event loop) 

108 

109 .. versionadded:: 3.1 

110 

111 """ 

112 now = current_time() 

113 await sleep(max(deadline - now, 0)) 

114 

115 

116def current_time() -> float: 

117 """ 

118 Return the current value of the event loop's internal clock. 

119 

120 :return: the clock value (seconds) 

121 

122 """ 

123 return get_async_backend().current_time() 

124 

125 

126def get_all_backends() -> tuple[str, ...]: 

127 """Return a tuple of the names of all built-in backends.""" 

128 return BACKENDS 

129 

130 

131def get_cancelled_exc_class() -> type[BaseException]: 

132 """Return the current async library's cancellation exception class.""" 

133 return get_async_backend().cancelled_exception_class() 

134 

135 

136# 

137# Private API 

138# 

139 

140 

141@contextmanager 

142def claim_worker_thread( 

143 backend_class: type[AsyncBackend], token: object 

144) -> Generator[Any, None, None]: 

145 threadlocals.current_async_backend = backend_class 

146 threadlocals.current_token = token 

147 try: 

148 yield 

149 finally: 

150 del threadlocals.current_async_backend 

151 del threadlocals.current_token 

152 

153 

154def get_async_backend(asynclib_name: str | None = None) -> type[AsyncBackend]: 

155 if asynclib_name is None: 

156 asynclib_name = sniffio.current_async_library() 

157 

158 # We use our own dict instead of sys.modules to get the already imported back-end 

159 # class because the appropriate modules in sys.modules could potentially be only 

160 # partially initialized 

161 try: 

162 return loaded_backends[asynclib_name] 

163 except KeyError: 

164 module = import_module(f"anyio._backends._{asynclib_name}") 

165 loaded_backends[asynclib_name] = module.backend_class 

166 return module.backend_class