Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_core/utils/__init__.py: 43%
97 statements
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
1# Copyright (c) Jupyter Development Team.
2# Distributed under the terms of the Modified BSD License.
3from __future__ import annotations
5import asyncio
6import atexit
7import errno
8import inspect
9import os
10import sys
11import threading
12import warnings
13from pathlib import Path
14from types import FrameType
15from typing import Any, Awaitable, Callable, TypeVar, cast
18def ensure_dir_exists(path: str, mode: int = 0o777) -> None:
19 """Ensure that a directory exists
21 If it doesn't exist, try to create it, protecting against a race condition
22 if another process is doing the same.
23 The default permissions are determined by the current umask.
24 """
25 try:
26 os.makedirs(path, mode=mode)
27 except OSError as e:
28 if e.errno != errno.EEXIST:
29 raise
30 if not os.path.isdir(path):
31 raise OSError("%r exists but is not a directory" % path)
34def _get_frame(level: int) -> FrameType | None:
35 """Get the frame at the given stack level."""
36 # sys._getframe is much faster than inspect.stack, but isn't guaranteed to
37 # exist in all python implementations, so we fall back to inspect.stack()
39 # We need to add one to level to account for this get_frame call.
40 if hasattr(sys, "_getframe"):
41 frame = sys._getframe(level + 1)
42 else:
43 frame = inspect.stack(context=0)[level + 1].frame
44 return frame
47# This function is from https://github.com/python/cpython/issues/67998
48# (https://bugs.python.org/file39550/deprecated_module_stacklevel.diff) and
49# calculates the appropriate stacklevel for deprecations to target the
50# deprecation for the caller, no matter how many internal stack frames we have
51# added in the process. For example, with the deprecation warning in the
52# __init__ below, the appropriate stacklevel will change depending on how deep
53# the inheritance hierarchy is.
54def _external_stacklevel(internal: list[str]) -> int:
55 """Find the stacklevel of the first frame that doesn't contain any of the given internal strings
57 The depth will be 1 at minimum in order to start checking at the caller of
58 the function that called this utility method.
59 """
60 # Get the level of my caller's caller
61 level = 2
62 frame = _get_frame(level)
64 # Normalize the path separators:
65 normalized_internal = [str(Path(s)) for s in internal]
67 # climb the stack frames while we see internal frames
68 while frame and any(s in str(Path(frame.f_code.co_filename)) for s in normalized_internal):
69 level += 1
70 frame = frame.f_back
72 # Return the stack level from the perspective of whoever called us (i.e., one level up)
73 return level - 1
76def deprecation(message: str, internal: str | list[str] = "jupyter_core/") -> None:
77 """Generate a deprecation warning targeting the first frame that is not 'internal'
79 internal is a string or list of strings, which if they appear in filenames in the
80 frames, the frames will be considered internal. Changing this can be useful if, for example,
81 we know that our internal code is calling out to another library.
82 """
83 _internal: list[str]
84 _internal = [internal] if isinstance(internal, str) else internal
86 # stack level of the first external frame from here
87 stacklevel = _external_stacklevel(_internal)
89 # The call to .warn adds one frame, so bump the stacklevel up by one
90 warnings.warn(message, DeprecationWarning, stacklevel=stacklevel + 1)
93T = TypeVar("T")
96class _TaskRunner:
97 """A task runner that runs an asyncio event loop on a background thread."""
99 def __init__(self) -> None:
100 self.__io_loop: asyncio.AbstractEventLoop | None = None
101 self.__runner_thread: threading.Thread | None = None
102 self.__lock = threading.Lock()
103 atexit.register(self._close)
105 def _close(self) -> None:
106 if self.__io_loop:
107 self.__io_loop.stop()
109 def _runner(self) -> None:
110 loop = self.__io_loop
111 assert loop is not None # noqa
112 try:
113 loop.run_forever()
114 finally:
115 loop.close()
117 def run(self, coro: Any) -> Any:
118 """Synchronously run a coroutine on a background thread."""
119 with self.__lock:
120 name = f"{threading.current_thread().name} - runner"
121 if self.__io_loop is None:
122 self.__io_loop = asyncio.new_event_loop()
123 self.__runner_thread = threading.Thread(target=self._runner, daemon=True, name=name)
124 self.__runner_thread.start()
125 fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop)
126 return fut.result(None)
129_runner_map: dict[str, _TaskRunner] = {}
132def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]:
133 """Wraps coroutine in a function that blocks until it has executed.
135 Parameters
136 ----------
137 coro : coroutine-function
138 The coroutine-function to be executed.
140 Returns
141 -------
142 result :
143 Whatever the coroutine-function returns.
144 """
146 if not inspect.iscoroutinefunction(coro):
147 raise AssertionError
149 def wrapped(*args: Any, **kwargs: Any) -> Any:
150 name = threading.current_thread().name
151 inner = coro(*args, **kwargs)
152 try:
153 # If a loop is currently running in this thread,
154 # use a task runner.
155 asyncio.get_running_loop()
156 if name not in _runner_map:
157 _runner_map[name] = _TaskRunner()
158 return _runner_map[name].run(inner)
159 except RuntimeError:
160 pass
162 # Run the loop for this thread.
163 # In Python 3.12, a deprecation warning is raised, which
164 # may later turn into a RuntimeError. We handle both
165 # cases.
166 with warnings.catch_warnings():
167 warnings.simplefilter("ignore", DeprecationWarning)
168 try:
169 loop = asyncio.get_event_loop()
170 except RuntimeError:
171 loop = asyncio.new_event_loop()
172 asyncio.set_event_loop(loop)
173 return loop.run_until_complete(inner)
175 wrapped.__doc__ = coro.__doc__
176 return wrapped
179async def ensure_async(obj: Awaitable[T] | T) -> T:
180 """Convert a non-awaitable object to a coroutine if needed,
181 and await it if it was not already awaited.
183 This function is meant to be called on the result of calling a function,
184 when that function could either be asynchronous or not.
185 """
186 if inspect.isawaitable(obj):
187 obj = cast(Awaitable[T], obj)
188 try:
189 result = await obj
190 except RuntimeError as e:
191 if str(e) == "cannot reuse already awaited coroutine":
192 # obj is already the coroutine's result
193 return cast(T, obj)
194 raise
195 return result
196 # obj doesn't need to be awaited
197 return cast(T, obj)