Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py: 38%

58 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1from __future__ import annotations 

2 

3import math 

4import sys 

5import threading 

6from contextlib import contextmanager 

7from importlib import import_module 

8from typing import ( 

9 Any, 

10 Awaitable, 

11 Callable, 

12 Generator, 

13 TypeVar, 

14) 

15 

16import sniffio 

17 

18# This must be updated when new backends are introduced 

19from ._compat import DeprecatedAwaitableFloat 

20 

21BACKENDS = "asyncio", "trio" 

22 

23T_Retval = TypeVar("T_Retval") 

24threadlocals = threading.local() 

25 

26 

27def run( 

28 func: Callable[..., Awaitable[T_Retval]], 

29 *args: object, 

30 backend: str = "asyncio", 

31 backend_options: dict[str, Any] | None = None, 

32) -> T_Retval: 

33 """ 

34 Run the given coroutine function in an asynchronous event loop. 

35 

36 The current thread must not be already running an event loop. 

37 

38 :param func: a coroutine function 

39 :param args: positional arguments to ``func`` 

40 :param backend: name of the asynchronous event loop implementation – currently either 

41 ``asyncio`` or ``trio`` 

42 :param backend_options: keyword arguments to call the backend ``run()`` implementation with 

43 (documented :ref:`here <backend options>`) 

44 :return: the return value of the coroutine function 

45 :raises RuntimeError: if an asynchronous event loop is already running in this thread 

46 :raises LookupError: if the named backend is not found 

47 

48 """ 

49 try: 

50 asynclib_name = sniffio.current_async_library() 

51 except sniffio.AsyncLibraryNotFoundError: 

52 pass 

53 else: 

54 raise RuntimeError(f"Already running {asynclib_name} in this thread") 

55 

56 try: 

57 asynclib = import_module(f"..._backends._{backend}", package=__name__) 

58 except ImportError as exc: 

59 raise LookupError(f"No such backend: {backend}") from exc 

60 

61 token = None 

62 if sniffio.current_async_library_cvar.get(None) is None: 

63 # Since we're in control of the event loop, we can cache the name of the async library 

64 token = sniffio.current_async_library_cvar.set(backend) 

65 

66 try: 

67 backend_options = backend_options or {} 

68 return asynclib.run(func, *args, **backend_options) 

69 finally: 

70 if token: 

71 sniffio.current_async_library_cvar.reset(token) 

72 

73 

74async def sleep(delay: float) -> None: 

75 """ 

76 Pause the current task for the specified duration. 

77 

78 :param delay: the duration, in seconds 

79 

80 """ 

81 return await get_asynclib().sleep(delay) 

82 

83 

84async def sleep_forever() -> None: 

85 """ 

86 Pause the current task until it's cancelled. 

87 

88 This is a shortcut for ``sleep(math.inf)``. 

89 

90 .. versionadded:: 3.1 

91 

92 """ 

93 await sleep(math.inf) 

94 

95 

96async def sleep_until(deadline: float) -> None: 

97 """ 

98 Pause the current task until the given time. 

99 

100 :param deadline: the absolute time to wake up at (according to the internal monotonic clock of 

101 the event loop) 

102 

103 .. versionadded:: 3.1 

104 

105 """ 

106 now = current_time() 

107 await sleep(max(deadline - now, 0)) 

108 

109 

110def current_time() -> DeprecatedAwaitableFloat: 

111 """ 

112 Return the current value of the event loop's internal clock. 

113 

114 :return: the clock value (seconds) 

115 

116 """ 

117 return DeprecatedAwaitableFloat(get_asynclib().current_time(), current_time) 

118 

119 

120def get_all_backends() -> tuple[str, ...]: 

121 """Return a tuple of the names of all built-in backends.""" 

122 return BACKENDS 

123 

124 

125def get_cancelled_exc_class() -> type[BaseException]: 

126 """Return the current async library's cancellation exception class.""" 

127 return get_asynclib().CancelledError 

128 

129 

130# 

131# Private API 

132# 

133 

134 

135@contextmanager 

136def claim_worker_thread(backend: str) -> Generator[Any, None, None]: 

137 module = sys.modules["anyio._backends._" + backend] 

138 threadlocals.current_async_module = module 

139 try: 

140 yield 

141 finally: 

142 del threadlocals.current_async_module 

143 

144 

145def get_asynclib(asynclib_name: str | None = None) -> Any: 

146 if asynclib_name is None: 

147 asynclib_name = sniffio.current_async_library() 

148 

149 modulename = "anyio._backends._" + asynclib_name 

150 try: 

151 return sys.modules[modulename] 

152 except KeyError: 

153 return import_module(modulename)