Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_core/utils/__init__.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

105 statements  

1# Copyright (c) Jupyter Development Team. 

2# Distributed under the terms of the Modified BSD License. 

3from __future__ import annotations 

4 

5import asyncio 

6import atexit 

7import errno 

8import inspect 

9import sys 

10import threading 

11import warnings 

12from contextvars import ContextVar 

13from pathlib import Path 

14from types import FrameType 

15from typing import Any, Awaitable, Callable, TypeVar, cast 

16 

17 

def ensure_dir_exists(path: str | Path, mode: int = 0o777) -> None:
    """Make sure ``path`` is an existing directory, creating it if necessary.

    Missing parent directories are created as well.  Another process creating
    the same directory at the same moment is tolerated rather than treated as
    an error.  The requested ``mode`` is still subject to the current umask.

    Parameters
    ----------
    path : str or Path
        Directory that must exist when this function returns.
    mode : int
        Permission bits passed to ``Path.mkdir`` for the new directory.

    Raises
    ------
    OSError
        If creation fails for any reason other than the path already
        existing, or if the path exists but is not a directory.
    """
    target = Path(path)
    try:
        target.mkdir(mode=mode, parents=True)
    except OSError as err:
        # "Already exists" is the benign outcome (possibly a lost race);
        # every other failure is passed on to the caller unchanged.
        already_present = err.errno == errno.EEXIST
        if not already_present:
            raise

    if target.is_dir():
        return
    # Something is at that path, but it is a file (or other non-directory).
    raise OSError("%r exists but is not a directory" % path)

32 

33 

def _get_frame(level: int) -> FrameType | None:
    """Return the stack frame ``level`` steps above the caller.

    ``level=0`` is the frame of whoever called this function, ``level=1`` is
    that caller's caller, and so on.
    """
    # One extra step skips the frame that belongs to this helper itself.
    depth = level + 1

    if not hasattr(sys, "_getframe"):
        # sys._getframe is not guaranteed on every Python implementation;
        # inspect.stack() is the slower but portable fallback.
        return inspect.stack(context=0)[depth].frame

    # Fast path: far cheaper than building the full inspect.stack() list.
    return sys._getframe(depth)

45 

46 

47# This function is from https://github.com/python/cpython/issues/67998 

48# (https://bugs.python.org/file39550/deprecated_module_stacklevel.diff) and 

49# calculates the appropriate stacklevel for deprecations to target the 

50# deprecation for the caller, no matter how many internal stack frames we have 

51# added in the process. For example, with the deprecation warning in the 

52# __init__ below, the appropriate stacklevel will change depending on how deep 

53# the inheritance hierarchy is. 

def _external_stacklevel(internal: list[str]) -> int:
    """Compute the stacklevel of the first frame whose filename is not internal.

    A frame counts as internal when its code's filename contains any of the
    strings in ``internal``.  The search starts at the caller of the function
    that called this helper, so the result is never less than 1.

    Parameters
    ----------
    internal : list of str
        Path fragments that mark a filename as internal.

    Returns
    -------
    int
        Stack level, counted from the perspective of this helper's caller.
    """
    # Begin with the caller of whoever called us.
    depth = 2
    candidate = _get_frame(depth)

    # Route the markers through Path so their separators match the
    # separators used in the (equally normalized) frame filenames.
    markers = [str(Path(marker)) for marker in internal]

    def _is_internal(frame: FrameType) -> bool:
        filename = str(Path(frame.f_code.co_filename))
        return any(marker in filename for marker in markers)

    # Walk outwards for as long as the frames still belong to internal code.
    while candidate is not None and _is_internal(candidate):
        candidate = candidate.f_back
        depth += 1

    # Report the level as seen by our caller, i.e. one level up from here.
    return depth - 1

74 

75 

def deprecation(message: str, internal: str | list[str] = "jupyter_core/") -> None:
    """Emit a ``DeprecationWarning`` attributed to the first non-internal frame.

    Parameters
    ----------
    message : str
        Text of the warning.
    internal : str or list of str
        One or more strings; any stack frame whose filename contains one of
        them is treated as internal and skipped.  Overriding the default is
        useful when internal code is known to call out to another library.
    """
    markers: list[str]
    if isinstance(internal, str):
        markers = [internal]
    else:
        markers = internal

    # Level of the first external frame, measured from this function.
    external_level = _external_stacklevel(markers)

    # warnings.warn contributes a frame of its own, hence the extra one.
    warnings.warn(message, DeprecationWarning, stacklevel=external_level + 1)

91 

92 

# Generic placeholder for the value produced by an awaitable.
T = TypeVar("T")


class _TaskRunner:
    """A task runner that runs an asyncio event loop on a background thread."""

    def __init__(self) -> None:
        # Loop owned by this runner; created lazily on the first run() call.
        self.__io_loop: asyncio.AbstractEventLoop | None = None
        # Daemon thread that executes the loop's run_forever().
        self.__runner_thread: threading.Thread | None = None
        # Guards the lazy creation of the loop and its thread.
        self.__lock = threading.Lock()
        # Ask the loop to stop when the interpreter exits.
        atexit.register(self._close)

    def _close(self) -> None:
        """Request the background loop to stop, if one was ever started."""
        if self.__io_loop:
            self.__io_loop.stop()

    def _runner(self) -> None:
        """Thread target: run the loop until it is stopped, then close it."""
        loop = self.__io_loop
        assert loop is not None
        try:
            loop.run_forever()
        finally:
            loop.close()

    def run(self, coro: Any) -> Any:
        """Synchronously run a coroutine on a background thread.

        The loop and its thread are started on first use.  The call blocks
        without a timeout until the coroutine finishes, then returns its
        result; an exception raised by the coroutine is re-raised here.
        """
        with self.__lock:
            name = f"{threading.current_thread().name} - runner"
            if self.__io_loop is None:
                self.__io_loop = asyncio.new_event_loop()
                self.__runner_thread = threading.Thread(target=self._runner, daemon=True, name=name)
                self.__runner_thread.start()
        # Submit outside the lock so waiting for the result does not block
        # other threads that only need to check the loop is initialised.
        fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop)
        return fut.result(None)


# One background runner per calling thread, keyed by the thread's name.
_runner_map: dict[str, _TaskRunner] = {}
# Loop most recently handed out by ensure_event_loop() in the current context.
_loop: ContextVar[asyncio.AbstractEventLoop | None] = ContextVar("_loop", default=None)


def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]:
    """Wraps coroutine in a function that blocks until it has executed.

    Parameters
    ----------
    coro : coroutine-function
        The coroutine-function to be executed.

    Returns
    -------
    result :
        A plain function that calls ``coro`` with the given arguments, blocks
        until the resulting coroutine completes, and returns whatever the
        coroutine-function returns.  Exceptions raised by the coroutine
        propagate unchanged.

    Raises
    ------
    AssertionError
        If ``coro`` is not a coroutine function.
    """

    if not inspect.iscoroutinefunction(coro):
        raise AssertionError

    def wrapped(*args: Any, **kwargs: Any) -> Any:
        inner = coro(*args, **kwargs)

        # Keep the try body to the loop probe alone.  A RuntimeError raised by
        # the coroutine itself must reach the caller instead of being mistaken
        # for "no running loop" and masked by a second attempt to run it.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: run the loop for this thread.
            loop = ensure_event_loop()
            return loop.run_until_complete(inner)

        # A loop is currently running in this thread, so it cannot be
        # re-entered; use a task runner on a background thread instead.
        name = threading.current_thread().name
        if name not in _runner_map:
            _runner_map[name] = _TaskRunner()
        return _runner_map[name].run(inner)

    wrapped.__doc__ = coro.__doc__
    return wrapped


def ensure_event_loop(prefer_selector_loop: bool = False) -> asyncio.AbstractEventLoop:
    """Return an open event loop for the current context, creating one if needed.

    Lookup order: the loop previously stored by this function (if it is not
    closed), then the currently running loop, then a newly created loop that
    is also installed as the current event loop.  The chosen loop is stored
    for subsequent calls.

    Parameters
    ----------
    prefer_selector_loop : bool
        Only consulted when a new loop must be created on Windows; when true,
        the loop comes from ``WindowsSelectorEventLoopPolicy``.
    """
    # Get the loop for this thread, or create a new one.
    loop = _loop.get()
    if loop is not None and not loop.is_closed():
        return loop
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        if sys.platform == "win32" and prefer_selector_loop:
            loop = asyncio.WindowsSelectorEventLoopPolicy().new_event_loop()
        else:
            loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    _loop.set(loop)
    return loop

186 

187 

async def ensure_async(obj: Awaitable[T] | T) -> T:
    """Return the value behind ``obj``, awaiting it first when it is awaitable.

    Intended for the result of calling a function that may or may not be
    asynchronous: plain values are handed back unchanged, awaitables are
    awaited and their result returned.  If ``obj`` is a coroutine that has
    already been awaited, ``obj`` itself is returned.
    """
    if not inspect.isawaitable(obj):
        # Nothing to await; pass the value straight through.
        return cast("T", obj)

    pending = cast("Awaitable[T]", obj)
    try:
        return await pending
    except RuntimeError as err:
        if str(err) != "cannot reuse already awaited coroutine":
            raise
        # The coroutine was consumed earlier; treat obj as the result.
        return cast("T", obj)