Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_core/utils/__init__.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

105 statements  

1# Copyright (c) Jupyter Development Team. 

2# Distributed under the terms of the Modified BSD License. 

3from __future__ import annotations 

4 

5import asyncio 

6import atexit 

7import errno 

8import inspect 

9import sys 

10import threading 

11import warnings 

12from contextvars import ContextVar 

13from pathlib import Path 

14from types import FrameType 

15from typing import Any, Awaitable, Callable, TypeVar, cast 

16 

17 

18def ensure_dir_exists(path: str | Path, mode: int = 0o777) -> None: 

19 """Ensure that a directory exists 

20 

21 If it doesn't exist, try to create it, protecting against a race condition 

22 if another process is doing the same. 

23 The default permissions are determined by the current umask. 

24 """ 

25 try: 

26 Path(path).mkdir(parents=True, mode=mode) 

27 except OSError as e: 

28 if e.errno != errno.EEXIST: 

29 raise 

30 if not Path(path).is_dir(): 

31 msg = f"{path!r} exists but is not a directory" 

32 raise OSError(msg) 

33 

34 

35def _get_frame(level: int) -> FrameType | None: 

36 """Get the frame at the given stack level.""" 

37 # sys._getframe is much faster than inspect.stack, but isn't guaranteed to 

38 # exist in all python implementations, so we fall back to inspect.stack() 

39 

40 # We need to add one to level to account for this get_frame call. 

41 if hasattr(sys, "_getframe"): 

42 frame = sys._getframe(level + 1) 

43 else: 

44 frame = inspect.stack(context=0)[level + 1].frame 

45 return frame 

46 

47 

48# This function is from https://github.com/python/cpython/issues/67998 

49# (https://bugs.python.org/file39550/deprecated_module_stacklevel.diff) and 

50# calculates the appropriate stacklevel for deprecations to target the 

51# deprecation for the caller, no matter how many internal stack frames we have 

52# added in the process. For example, with the deprecation warning in the 

53# __init__ below, the appropriate stacklevel will change depending on how deep 

54# the inheritance hierarchy is. 

55def _external_stacklevel(internal: list[str]) -> int: 

56 """Find the stacklevel of the first frame that doesn't contain any of the given internal strings 

57 

58 The depth will be 1 at minimum in order to start checking at the caller of 

59 the function that called this utility method. 

60 """ 

61 # Get the level of my caller's caller 

62 level = 2 

63 frame = _get_frame(level) 

64 

65 # Normalize the path separators: 

66 normalized_internal = [str(Path(s)) for s in internal] 

67 

68 # climb the stack frames while we see internal frames 

69 while frame and any(s in str(Path(frame.f_code.co_filename)) for s in normalized_internal): 

70 level += 1 

71 frame = frame.f_back 

72 

73 # Return the stack level from the perspective of whoever called us (i.e., one level up) 

74 return level - 1 

75 

76 

77def deprecation(message: str, internal: str | list[str] = "jupyter_core/") -> None: 

78 """Generate a deprecation warning targeting the first frame that is not 'internal' 

79 

80 internal is a string or list of strings, which if they appear in filenames in the 

81 frames, the frames will be considered internal. Changing this can be useful if, for example, 

82 we know that our internal code is calling out to another library. 

83 """ 

84 _internal: list[str] 

85 _internal = [internal] if isinstance(internal, str) else internal 

86 

87 # stack level of the first external frame from here 

88 stacklevel = _external_stacklevel(_internal) 

89 

90 # The call to .warn adds one frame, so bump the stacklevel up by one 

91 warnings.warn(message, DeprecationWarning, stacklevel=stacklevel + 1) 

92 

93 

94T = TypeVar("T") 

95 

96 

97class _TaskRunner: 

98 """A task runner that runs an asyncio event loop on a background thread.""" 

99 

100 def __init__(self) -> None: 

101 self.__io_loop: asyncio.AbstractEventLoop | None = None 

102 self.__runner_thread: threading.Thread | None = None 

103 self.__lock = threading.Lock() 

104 atexit.register(self._close) 

105 

106 def _close(self) -> None: 

107 if self.__io_loop: 

108 self.__io_loop.stop() 

109 

110 def _runner(self) -> None: 

111 loop = self.__io_loop 

112 assert loop is not None 

113 try: 

114 loop.run_forever() 

115 finally: 

116 loop.close() 

117 

118 def run(self, coro: Any) -> Any: 

119 """Synchronously run a coroutine on a background thread.""" 

120 with self.__lock: 

121 name = f"{threading.current_thread().name} - runner" 

122 if self.__io_loop is None: 

123 self.__io_loop = asyncio.new_event_loop() 

124 self.__runner_thread = threading.Thread(target=self._runner, daemon=True, name=name) 

125 self.__runner_thread.start() 

126 fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop) 

127 return fut.result(None) 

128 

129 

130_runner_map: dict[str, _TaskRunner] = {} 

131_loop: ContextVar[asyncio.AbstractEventLoop | None] = ContextVar("_loop", default=None) 

132 

133 

134def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: 

135 """Wraps coroutine in a function that blocks until it has executed. 

136 

137 Parameters 

138 ---------- 

139 coro : coroutine-function 

140 The coroutine-function to be executed. 

141 

142 Returns 

143 ------- 

144 result : 

145 Whatever the coroutine-function returns. 

146 """ 

147 

148 assert inspect.iscoroutinefunction(coro) 

149 

150 def wrapped(*args: Any, **kwargs: Any) -> Any: 

151 name = threading.current_thread().name 

152 inner = coro(*args, **kwargs) 

153 try: 

154 asyncio.get_running_loop() 

155 except RuntimeError: 

156 # No loop running, run the loop for this thread. 

157 loop = ensure_event_loop() 

158 return loop.run_until_complete(inner) 

159 

160 # Loop is currently running in this thread, 

161 # use a task runner. 

162 if name not in _runner_map: 

163 _runner_map[name] = _TaskRunner() 

164 return _runner_map[name].run(inner) 

165 

166 wrapped.__doc__ = coro.__doc__ 

167 return wrapped 

168 

169 

170def ensure_event_loop(prefer_selector_loop: bool = False) -> asyncio.AbstractEventLoop: 

171 # Get the loop for this thread, or create a new one. 

172 loop = _loop.get() 

173 if loop is not None and not loop.is_closed(): 

174 return loop 

175 try: 

176 loop = asyncio.get_running_loop() 

177 except RuntimeError: 

178 if sys.platform == "win32" and prefer_selector_loop: 

179 loop = asyncio.WindowsSelectorEventLoopPolicy().new_event_loop() 

180 else: 

181 loop = asyncio.new_event_loop() 

182 asyncio.set_event_loop(loop) 

183 _loop.set(loop) 

184 return loop 

185 

186 

187async def ensure_async(obj: Awaitable[T] | T) -> T: 

188 """Convert a non-awaitable object to a coroutine if needed, 

189 and await it if it was not already awaited. 

190 

191 This function is meant to be called on the result of calling a function, 

192 when that function could either be asynchronous or not. 

193 """ 

194 if inspect.isawaitable(obj): 

195 obj = cast(Awaitable[T], obj) 

196 try: 

197 result = await obj 

198 except RuntimeError as e: 

199 if str(e) == "cannot reuse already awaited coroutine": 

200 # obj is already the coroutine's result 

201 return cast(T, obj) 

202 raise 

203 return result 

204 return obj