Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_core/utils/__init__.py: 45%

93 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1# Copyright (c) Jupyter Development Team. 

2# Distributed under the terms of the Modified BSD License. 

3 

4import asyncio 

5import atexit 

6import errno 

7import inspect 

8import os 

9import sys 

10import threading 

11import warnings 

12from pathlib import Path 

13from types import FrameType 

14from typing import Awaitable, Callable, List, Optional, TypeVar, Union, cast 

15 

16 

def ensure_dir_exists(path, mode=0o777):
    """Make sure ``path`` is an existing directory, creating it if necessary.

    Creation is attempted unconditionally and an "already exists" error is
    tolerated, so two processes racing to create the same directory do not
    trip over each other.  Any other ``OSError`` from the creation attempt is
    propagated unchanged.

    The default permissions are determined by the current umask.

    Raises
    ------
    OSError
        If the directory could not be created, or if ``path`` exists but is
        not a directory.
    """
    try:
        os.makedirs(path, mode=mode)
    except OSError as exc:
        # Only "it is already there" is acceptable; everything else is fatal.
        already_present = exc.errno == errno.EEXIST
        if not already_present:
            raise

    if os.path.isdir(path):
        return
    # Something with that name exists, but it is a file (or similar).
    raise OSError("%r exists but is not a directory" % path)

31 

32 

def _get_frame(level: int) -> Optional[FrameType]:
    """Return the frame ``level`` steps above the caller (0 is the caller itself)."""
    # One extra step is needed to skip over this function's own frame.
    depth = level + 1

    # sys._getframe is much faster than inspect.stack, but it is not
    # guaranteed to exist in every Python implementation.
    fast_getframe = getattr(sys, "_getframe", None)
    if fast_getframe is None:
        return inspect.stack(context=0)[depth].frame
    return fast_getframe(depth)

44 

45 

46# This function is from https://github.com/python/cpython/issues/67998 

47# (https://bugs.python.org/file39550/deprecated_module_stacklevel.diff) and 

48# calculates the appropriate stacklevel for deprecations to target the 

49# deprecation for the caller, no matter how many internal stack frames we have 

50# added in the process. For example, with the deprecation warning in the 

51# __init__ below, the appropriate stacklevel will change depending on how deep 

52# the inheritance hierarchy is. 

def _external_stacklevel(internal: List[str]) -> int:
    """Return the stacklevel of the first frame whose filename matches none of ``internal``.

    The search starts at the caller of the function that called this helper,
    so the returned value is always at least 1.  The result is expressed from
    the point of view of the function that called this helper.
    """
    # Normalize the path separators of the markers once, up front.
    markers = [str(Path(entry)) for entry in internal]

    def _is_internal(filename: str) -> bool:
        # Substring match of any marker against the normalized filename.
        normalized = str(Path(filename))
        return any(marker in normalized for marker in markers)

    # Begin at the caller of our caller.
    depth = 2
    current = _get_frame(depth)

    # Walk outwards for as long as the frames belong to internal code.
    while current and _is_internal(current.f_code.co_filename):
        depth += 1
        current = current.f_back

    # Report the level as seen by whoever called us, i.e. one level up.
    return depth - 1

73 

74 

def deprecation(message: str, internal: Union[str, List[str]] = "jupyter_core/") -> None:
    """Emit a ``DeprecationWarning`` attributed to the first non-internal frame.

    ``internal`` is a string or a list of strings.  A stack frame whose
    filename contains any of them is treated as internal and skipped when
    choosing the frame to blame.  Overriding it is useful when, for example,
    our own code is known to call out to another library.
    """
    markers: List[str]
    if isinstance(internal, str):
        markers = [internal]
    else:
        markers = internal

    # Stack level of the first external frame, as seen from this function.
    external_level = _external_stacklevel(markers)

    # warnings.warn itself contributes one frame, hence the extra level.
    warnings.warn(message, DeprecationWarning, stacklevel=external_level + 1)

90 

91 

92T = TypeVar("T") 

93 

94 

class _TaskRunner:
    """A task runner that runs an asyncio event loop on a background thread.

    The event loop and its daemon thread are created lazily by the first call
    to :meth:`run`.  An ``atexit`` hook asks the loop to stop when the
    interpreter shuts down.
    """

    def __init__(self):
        # Both stay None until the first run() call creates them.
        self.__io_loop: Optional[asyncio.AbstractEventLoop] = None
        self.__runner_thread: Optional[threading.Thread] = None
        # Serializes the lazy creation of the loop and its thread.
        self.__lock = threading.Lock()
        atexit.register(self._close)

    def _close(self):
        """Ask the background loop to stop, if one was started and is still open."""
        loop = self.__io_loop
        if loop is None or loop.is_closed():
            return
        try:
            # This runs on a different thread than the loop does, and
            # loop.stop() is not thread-safe: called directly it would not
            # wake a loop that is blocked waiting for events.
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # The loop was closed between the check above and this call;
            # there is nothing left to stop.
            pass

    def _runner(self):
        """Thread target: run the loop until it is stopped, then close it."""
        loop = self.__io_loop
        assert loop is not None  # noqa
        try:
            loop.run_forever()
        finally:
            loop.close()

    def run(self, coro):
        """Synchronously run a coroutine on a background thread.

        Blocks the calling thread until ``coro`` has finished, then returns
        its result or re-raises its exception.
        """
        with self.__lock:
            if self.__io_loop is None:
                # Name the worker after the thread that first needed it.
                name = f"{threading.current_thread().name} - runner"
                self.__io_loop = asyncio.new_event_loop()
                self.__runner_thread = threading.Thread(
                    target=self._runner, daemon=True, name=name
                )
                self.__runner_thread.start()
        fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop)
        return fut.result(None)

126 

127 

# Caches shared by every function produced by run_sync(), both keyed by the
# *name* of the calling thread:
#   _runner_map: background task runners, used when the calling thread already
#                has a running event loop.
#   _loop_map:   event loops, used when the calling thread has no running loop.
_runner_map = {}
_loop_map = {}


def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]:
    """Wraps coroutine in a function that blocks until it has executed.

    Parameters
    ----------
    coro : coroutine-function
        The coroutine-function to be executed.

    Returns
    -------
    result :
        A plain function that calls ``coro`` with the given arguments, blocks
        until the resulting coroutine has finished, and returns whatever the
        coroutine-function returns.

    Raises
    ------
    AssertionError
        If ``coro`` is not a coroutine function.
    """

    if not inspect.iscoroutinefunction(coro):
        raise AssertionError(f"{coro!r} is not a coroutine function")

    def wrapped(*args, **kwargs):
        name = threading.current_thread().name
        inner = coro(*args, **kwargs)

        # Only the probe for a running loop is guarded.  The coroutine must
        # not run inside this try block, otherwise a RuntimeError raised by
        # the coroutine itself would be mistaken for "no running loop".
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_is_running = False
        else:
            loop_is_running = True

        if loop_is_running:
            # A loop is currently running in this thread, so it cannot be
            # re-entered: hand the coroutine to a background task runner.
            if name not in _runner_map:
                _runner_map[name] = _TaskRunner()
            return _runner_map[name].run(inner)

        # No loop is running here: drive the coroutine on this thread's loop.
        if name not in _loop_map:
            _loop_map[name] = asyncio.new_event_loop()
        loop = _loop_map[name]
        return loop.run_until_complete(inner)

    wrapped.__doc__ = coro.__doc__
    return wrapped

170 

171 

async def ensure_async(obj: Union[Awaitable[T], T]) -> T:
    """Return the value behind ``obj``, awaiting it first when it is awaitable.

    Intended for the return value of a call to a function that may or may not
    be asynchronous: a plain value is handed back untouched, an awaitable is
    awaited and its result returned.  An awaitable that turns out to have
    been awaited already is handed back as-is.
    """
    if not inspect.isawaitable(obj):
        # Nothing to await.
        return cast(T, obj)

    pending = cast(Awaitable[T], obj)
    try:
        return await pending
    except RuntimeError as err:
        if str(err) != "cannot reuse already awaited coroutine":
            raise
        # obj was awaited before; treat it as the result.
        return cast(T, obj)