Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py: 37%

62 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:38 +0000

1from __future__ import annotations 

2 

3import math 

4import sys 

5import threading 

6from collections.abc import Awaitable, Callable, Generator 

7from contextlib import contextmanager 

8from importlib import import_module 

9from typing import TYPE_CHECKING, Any, TypeVar 

10 

11import sniffio 

12 

13if TYPE_CHECKING: 

14 from ..abc import AsyncBackend 

15 

# Names of the built-in backends; extend this tuple whenever a new backend is
# introduced
BACKENDS = ("asyncio", "trio")

# Generic placeholder for the return type of the coroutine function given to run()
T_Retval = TypeVar("T_Retval")
# Per-thread storage; claim_worker_thread() sets (and later removes) the
# ``current_async_backend`` and ``current_token`` attributes on it
threadlocals = threading.local()

21 

22 

def run(
    func: Callable[..., Awaitable[T_Retval]],
    *args: object,
    backend: str = "asyncio",
    backend_options: dict[str, Any] | None = None,
) -> T_Retval:
    """
    Start an event loop on the chosen backend and run ``func`` in it to completion.

    This cannot be called from a thread in which sniffio already detects a running
    async library.

    :param func: the coroutine function to run
    :param args: positional arguments passed on to ``func``
    :param backend: name of the backend to use (the built-in ones are ``asyncio``
        and ``trio``)
    :param backend_options: keyword arguments for the backend's ``run()``
        implementation (documented :ref:`here <backend options>`); ``None`` is
        treated as an empty dict
    :return: whatever the backend's ``run()`` returns for ``func``
    :raises RuntimeError: if an async library is already running in this thread
    :raises LookupError: if the module for the named backend cannot be imported

    """
    # Nested event loops are not supported: bail out if sniffio can already
    # identify a running async library in this thread
    try:
        running_library = sniffio.current_async_library()
    except sniffio.AsyncLibraryNotFoundError:
        pass
    else:
        raise RuntimeError(f"Already running {running_library} in this thread")

    # A missing backend module is reported as a lookup failure, with the original
    # import error chained as the cause
    try:
        async_backend = get_async_backend(backend)
    except ImportError as exc:
        raise LookupError(f"No such backend: {backend}") from exc

    # We own the event loop from here on, so the backend name can be cached in
    # sniffio's context variable -- but only if nobody has set it already
    library_cvar = sniffio.current_async_library_cvar
    token = library_cvar.set(backend) if library_cvar.get(None) is None else None

    try:
        return async_backend.run(func, args, {}, backend_options or {})
    finally:
        # Undo our own change to the context variable, if we made one
        if token is not None:
            library_cvar.reset(token)

70 

71 

async def sleep(delay: float) -> None:
    """
    Suspend the current task for ``delay`` seconds.

    The call is delegated to the ``sleep()`` implementation of the currently
    running async backend.

    :param delay: how long to sleep, in seconds

    """
    backend = get_async_backend()
    return await backend.sleep(delay)

80 

81 

async def sleep_forever() -> None:
    """
    Suspend the current task indefinitely, i.e. until it gets cancelled.

    Equivalent to calling ``sleep(math.inf)``.

    .. versionadded:: 3.1

    """
    endless_delay = math.inf
    await sleep(endless_delay)

92 

93 

async def sleep_until(deadline: float) -> None:
    """
    Suspend the current task until the event loop clock reaches ``deadline``.

    :param deadline: the absolute wake-up time, measured on the same clock that
        :func:`current_time` reads

    .. versionadded:: 3.1

    """
    remaining = deadline - current_time()
    # A deadline that has already passed results in a zero-length sleep
    await sleep(max(remaining, 0))

106 

107 

def current_time() -> float:
    """
    Read the internal clock of the currently running event loop.

    The call is delegated to the ``current_time()`` implementation of the current
    async backend.

    :return: the current clock value, in seconds

    """
    backend = get_async_backend()
    return backend.current_time()

116 

117 

def get_all_backends() -> tuple[str, ...]:
    """
    Return the names of all built-in backends.

    :return: the module-level ``BACKENDS`` tuple

    """
    return BACKENDS

121 

122 

def get_cancelled_exc_class() -> type[BaseException]:
    """
    Return the exception class the current async library uses for cancellation.

    The call is delegated to ``cancelled_exception_class()`` of the current async
    backend.

    :return: the cancellation exception class

    """
    backend = get_async_backend()
    return backend.cancelled_exception_class()

126 

127 

128# 

129# Private API 

130# 

131 

132 

@contextmanager
def claim_worker_thread(
    backend_class: type[AsyncBackend], token: object
) -> Generator[Any, None, None]:
    """
    Record the given backend class and token in the thread-local storage.

    Both values are stored on ``threadlocals`` for the duration of the ``with``
    block and removed again on exit, whether or not the block raised.

    :param backend_class: stored as ``threadlocals.current_async_backend``
    :param token: stored as ``threadlocals.current_token``

    """
    threadlocals.current_async_backend = backend_class
    threadlocals.current_token = token
    try:
        yield
    finally:
        # Remove both attributes again (same order in which they were set)
        del threadlocals.current_async_backend, threadlocals.current_token

144 

145 

def get_async_backend(asynclib_name: str | None = None) -> AsyncBackend:
    """
    Look up the backend class for the given (or the currently running) async library.

    :param asynclib_name: name of the async library; if ``None``, the name is
        obtained from ``sniffio.current_async_library()``
    :return: the ``backend_class`` attribute of the module
        ``anyio._backends._<asynclib_name>``
    :raises ImportError: if the backend module is not loaded yet and importing it
        fails

    """
    library = (
        sniffio.current_async_library() if asynclib_name is None else asynclib_name
    )
    module_path = "anyio._backends._" + library

    # Use the module straight from the import cache when it is already loaded, and
    # only go through the import machinery on a cache miss
    try:
        backend_module = sys.modules[module_path]
    except KeyError:
        backend_module = import_module(module_path)

    return backend_module.backend_class