Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py: 37%

57 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import math 

2import sys 

3import threading 

4from contextlib import contextmanager 

5from importlib import import_module 

6from typing import ( 

7 Any, 

8 Callable, 

9 Coroutine, 

10 Dict, 

11 Generator, 

12 Optional, 

13 Tuple, 

14 Type, 

15 TypeVar, 

16) 

17 

18import sniffio 

19 

20# This must be updated when new backends are introduced 

21from ._compat import DeprecatedAwaitableFloat 

22 

# Names of the built-in backends, as returned by get_all_backends().
# Extend this tuple whenever a new backend is introduced.
BACKENDS = ("asyncio", "trio")

# Return type of the coroutine function handed to run()
T_Retval = TypeVar("T_Retval")

# Per-thread storage; claim_worker_thread() keeps the active backend module here
threadlocals = threading.local()

27 

28 

def run(
    func: Callable[..., Coroutine[Any, Any, T_Retval]],
    *args: object,
    backend: str = "asyncio",
    backend_options: Optional[Dict[str, Any]] = None,
) -> T_Retval:
    """
    Start an event loop on the chosen backend and run ``func`` in it to completion.

    No event loop may already be running in the calling thread.

    :param func: a coroutine function
    :param args: positional arguments passed on to ``func``
    :param backend: name of the asynchronous event loop implementation to use – currently
        either ``asyncio`` or ``trio``
    :param backend_options: keyword arguments forwarded to the backend's ``run()``
        implementation (documented :ref:`here <backend options>`)
    :return: whatever the coroutine function returns
    :raises RuntimeError: if an asynchronous event loop is already running in this thread
    :raises LookupError: if the named backend is not found

    """
    # sniffio only answers without raising when an async library is active here,
    # in which case starting another event loop is refused.
    already_running = True
    try:
        running_library = sniffio.current_async_library()
    except sniffio.AsyncLibraryNotFoundError:
        already_running = False

    if already_running:
        raise RuntimeError(f"Already running {running_library} in this thread")

    # Backends live in the sibling ``_backends`` package, resolved relative to this module
    relative_name = f"..._backends._{backend}"
    try:
        backend_module = import_module(relative_name, package=__name__)
    except ImportError as exc:
        raise LookupError(f"No such backend: {backend}") from exc

    # Since we're in control of the event loop, we can cache the name of the async library
    # (but only if nobody has set it already)
    library_cvar = sniffio.current_async_library_cvar
    token = library_cvar.set(backend) if library_cvar.get(None) is None else None

    try:
        backend_options = backend_options or {}
        return backend_module.run(func, *args, **backend_options)
    finally:
        # Undo the caching above so the context variable is left as it was found
        if token is not None:
            library_cvar.reset(token)

74 

75 

async def sleep(delay: float) -> None:
    """
    Suspend the calling task for the given number of seconds.

    The call is delegated to the ``sleep()`` of the currently running backend.

    :param delay: how long to sleep, in seconds

    """
    backend_module = get_asynclib()
    return await backend_module.sleep(delay)

84 

85 

async def sleep_forever() -> None:
    """
    Suspend the calling task indefinitely, i.e. until it gets cancelled.

    Equivalent to calling ``sleep(math.inf)``.

    .. versionadded:: 3.1

    """
    never_elapses = math.inf
    await sleep(never_elapses)

96 

97 

async def sleep_until(deadline: float) -> None:
    """
    Suspend the calling task until the event loop clock reaches ``deadline``.

    :param deadline: the absolute time to wake up at (according to the internal monotonic
        clock of the event loop, as reported by :func:`current_time`)

    .. versionadded:: 3.1

    """
    remaining = deadline - current_time()
    # A deadline that has already passed becomes a zero-length sleep
    await sleep(max(remaining, 0))

110 

111 

def current_time() -> DeprecatedAwaitableFloat:
    """
    Read the internal clock of the running event loop.

    :return: the clock value (seconds)

    """
    clock_value = get_asynclib().current_time()
    # NOTE(review): the wrapper receives this function as its second argument, presumably
    # to support legacy ``await current_time()`` callers – confirm against ``_compat``.
    return DeprecatedAwaitableFloat(clock_value, current_time)

120 

121 

def get_all_backends() -> Tuple[str, ...]:
    """
    List the built-in backends.

    :return: the module-level ``BACKENDS`` tuple of backend names

    """
    return BACKENDS

125 

126 

def get_cancelled_exc_class() -> Type[BaseException]:
    """
    Look up the exception class that the current async library uses for cancellation.

    :return: the ``CancelledError`` attribute of the currently running backend module

    """
    backend_module = get_asynclib()
    return backend_module.CancelledError

130 

131 

132# 

133# Private API 

134# 

135 

136 

@contextmanager
def claim_worker_thread(backend: str) -> Generator[Any, None, None]:
    """
    Record the given backend's module in thread-local storage for the duration of the block.

    The module is looked up in ``sys.modules`` (so it must already have been imported) and
    stored as ``threadlocals.current_async_module``; the attribute is removed again on exit,
    whether or not the block raised.

    :param backend: name of the backend, e.g. ``asyncio`` or ``trio``
    :raises KeyError: if the backend module has not been imported

    """
    module_key = "anyio._backends._" + backend
    threadlocals.current_async_module = sys.modules[module_key]
    try:
        yield
    finally:
        del threadlocals.current_async_module

145 

146 

def get_asynclib(asynclib_name: Optional[str] = None) -> Any:
    """
    Return the backend module for the given (or the currently running) async library.

    :param asynclib_name: name of the backend; when omitted, the name is detected with
        ``sniffio.current_async_library()``
    :return: the ``anyio._backends._<name>`` module, taken from ``sys.modules`` when it is
        already there and imported otherwise

    """
    name = sniffio.current_async_library() if asynclib_name is None else asynclib_name
    module_key = "anyio._backends._" + name

    # Sentinel lookup: any existing entry is returned as is, only a missing key
    # triggers the import.
    not_loaded = object()
    backend_module = sys.modules.get(module_key, not_loaded)
    if backend_module is not_loaded:
        backend_module = import_module(module_key)

    return backend_module