Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

66 statements  

1from __future__ import annotations 

2 

3import math 

4import sys 

5import threading 

6from collections.abc import Awaitable, Callable, Generator 

7from contextlib import contextmanager 

8from importlib import import_module 

9from typing import TYPE_CHECKING, Any, TypeVar 

10 

11import sniffio 

12 

13if sys.version_info >= (3, 11): 

14 from typing import TypeVarTuple, Unpack 

15else: 

16 from typing_extensions import TypeVarTuple, Unpack 

17 

18if TYPE_CHECKING: 

19 from ..abc import AsyncBackend 

20 

21# This must be updated when new backends are introduced 

22BACKENDS = "asyncio", "trio" 

23 

24T_Retval = TypeVar("T_Retval") 

25PosArgsT = TypeVarTuple("PosArgsT") 

26 

27threadlocals = threading.local() 

28 

29 

def run(
    func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
    *args: Unpack[PosArgsT],
    backend: str = "asyncio",
    backend_options: dict[str, Any] | None = None,
) -> T_Retval:
    """
    Call the given coroutine function inside an asynchronous event loop and return
    its result.

    No event loop may already be running in the calling thread.

    :param func: the coroutine function to call
    :param args: positional arguments passed on to ``func``
    :param backend: name of the asynchronous event loop implementation to use –
        currently either ``asyncio`` or ``trio``
    :param backend_options: keyword arguments handed to the backend's ``run()``
        implementation (documented :ref:`here <backend options>`)
    :return: whatever the coroutine function returns
    :raises RuntimeError: if this thread is already running an asynchronous event
        loop
    :raises LookupError: if the named backend cannot be imported

    """
    # sniffio raises when no async library is active, which is the state we need
    try:
        running_library = sniffio.current_async_library()
    except sniffio.AsyncLibraryNotFoundError:
        running_library = None

    if running_library is not None:
        raise RuntimeError(f"Already running {running_library} in this thread")

    try:
        backend_impl = get_async_backend(backend)
    except ImportError as import_error:
        raise LookupError(f"No such backend: {backend}") from import_error

    # We are in control of the event loop, so when the context variable is still
    # unset the backend name can be cached in it for the duration of the run
    library_cvar = sniffio.current_async_library_cvar
    cvar_token = None
    if library_cvar.get(None) is None:
        cvar_token = library_cvar.set(backend)

    try:
        return backend_impl.run(func, args, {}, backend_options or {})
    finally:
        # Only undo the caching if it was done here
        if cvar_token is not None:
            library_cvar.reset(cvar_token)

77 

78 

async def sleep(delay: float) -> None:
    """
    Suspend the current task for the given length of time.

    :param delay: how long to pause, in seconds

    """
    backend = get_async_backend()
    return await backend.sleep(delay)

87 

88 

async def sleep_forever() -> None:
    """
    Suspend the current task until it gets cancelled.

    Equivalent to calling ``sleep(math.inf)``.

    .. versionadded:: 3.1

    """
    forever = math.inf
    await sleep(forever)

99 

100 

async def sleep_until(deadline: float) -> None:
    """
    Suspend the current task until the given point in time.

    :param deadline: the absolute time at which to wake up (as measured by the
        internal monotonic clock of the event loop)

    .. versionadded:: 3.1

    """
    remaining = deadline - current_time()
    # A deadline that has already passed results in a zero-length sleep
    await sleep(max(remaining, 0))

113 

114 

def current_time() -> float:
    """
    Read the internal clock of the event loop.

    :return: the current clock value (seconds)

    """
    backend = get_async_backend()
    return backend.current_time()

123 

124 

def get_all_backends() -> tuple[str, ...]:
    """
    Return the names of all built-in backends.

    :return: a tuple of backend names

    """
    return BACKENDS

128 

129 

def get_cancelled_exc_class() -> type[BaseException]:
    """
    Return the cancellation exception class of the current async library.

    :return: the exception class reported by the current backend

    """
    backend = get_async_backend()
    return backend.cancelled_exception_class()

133 

134 

135# 

136# Private API 

137# 

138 

139 

@contextmanager
def claim_worker_thread(
    backend_class: type[AsyncBackend], token: object
) -> Generator[Any, None, None]:
    """
    Record a backend class and token in thread-local storage for the duration of
    the ``with`` block.

    Both attributes are removed again when the block exits, whether normally or
    through an exception.

    :param backend_class: stored as ``threadlocals.current_async_backend``
    :param token: stored as ``threadlocals.current_token``

    """
    threadlocals.current_async_backend = backend_class
    threadlocals.current_token = token
    try:
        yield
    finally:
        del threadlocals.current_async_backend, threadlocals.current_token

151 

152 

def get_async_backend(asynclib_name: str | None = None) -> AsyncBackend:
    """
    Look up the backend class for the named async library.

    :param asynclib_name: name of the async library; when omitted, the name is
        obtained from ``sniffio.current_async_library()``
    :return: the ``backend_class`` attribute of the module
        ``anyio._backends._<asynclib_name>``

    """
    library = (
        sniffio.current_async_library() if asynclib_name is None else asynclib_name
    )
    module_path = "anyio._backends._" + library

    # Reuse an already imported backend module; import it only on a cache miss
    try:
        backend_module = sys.modules[module_path]
    except KeyError:
        backend_module = import_module(module_path)

    return backend_module.backend_class