Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/utils.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Notebook related utilities"""
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
7import errno
8import importlib.util
9import os
10import socket
11import sys
12import warnings
13from _frozen_importlib_external import _NamespacePath
14from contextlib import contextmanager
15from pathlib import Path
16from typing import TYPE_CHECKING, Any, NewType
17from urllib.parse import (
18 SplitResult,
19 quote,
20 unquote,
21 urlparse,
22 urlsplit,
23 urlunsplit,
24)
25from urllib.parse import (
26 urljoin as _urljoin,
27)
28from urllib.request import pathname2url as _pathname2url
30from jupyter_core.utils import ensure_async as _ensure_async
31from packaging.version import Version
32from tornado.httpclient import AsyncHTTPClient, HTTPClient, HTTPRequest, HTTPResponse
33from tornado.netutil import Resolver
35if TYPE_CHECKING:
36 from collections.abc import Generator, Sequence
# Distinct static type for API-style paths; created with NewType over str.
ApiPath = NewType("ApiPath", str)

# Re-export
urljoin = _urljoin
pathname2url = _pathname2url
ensure_async = _ensure_async
def url_path_join(*pieces: str) -> str:
    """Join components of url into a relative url

    Use to prevent double slash when joining subpath. This will leave the
    initial and final / in place.

    Empty components are dropped. Calling with no components returns an
    empty string.
    """
    if not pieces:
        # Nothing to join; also avoids an IndexError on pieces[0] below.
        return ""
    initial = pieces[0].startswith("/")
    final = pieces[-1].endswith("/")
    stripped = [s.strip("/") for s in pieces]
    result = "/".join(s for s in stripped if s)
    if initial:
        result = "/" + result
    if final:
        result = result + "/"
    if result == "//":
        # Nothing but slashes was supplied: collapse to a single "/".
        result = "/"
    return result
def url_is_absolute(url: str) -> bool:
    """Determine whether a given URL is absolute

    Only the parsed path component is inspected: the URL is considered
    absolute when that path starts with "/".
    """
    return urlparse(url).path.startswith("/")
def path2url(path: str) -> str:
    """Convert a local file path to a URL

    The path is split on ``os.sep``, each component is percent-quoted, and
    the components are joined with ``url_path_join``.
    """
    pieces = [quote(p) for p in path.split(os.sep)]
    # preserve trailing /
    if pieces[-1] == "":
        pieces[-1] = "/"
    return url_path_join(*pieces)
def url2path(url: str) -> str:
    """Convert a URL to a local file path

    Each "/"-separated segment is percent-decoded and the segments are
    re-joined with ``os.path.join``.
    """
    pieces = [unquote(p) for p in url.split("/")]
    return os.path.join(*pieces)
def url_escape(path: str) -> str:
    """Escape special characters in a URL path

    Turns '/foo bar/' into '/foo%20bar/'
    """
    # Quote segment by segment so the "/" separators are kept as-is.
    return "/".join(quote(segment) for segment in path.split("/"))
def url_unescape(path: str) -> str:
    """Unescape special characters in a URL path

    Turns '/foo%20bar/' into '/foo bar/'
    """
    # Unquote segment by segment, mirroring url_escape.
    return "/".join(unquote(segment) for segment in path.split("/"))
def samefile_simple(path: str, other_path: str) -> bool:
    """
    Fill in for os.path.samefile when it is unavailable (Windows+py2).

    Do a case-insensitive string comparison in this case
    plus comparing the full stat result (including times)
    because Windows + py2 doesn't support the stat fields
    needed for identifying if it's the same file (st_ino, st_dev).

    Only to be used if os.path.samefile is not available.

    Parameters
    ----------
    path : str
        representing a path to a file
    other_path : str
        representing a path to another file

    Returns
    -------
    same:   Boolean that is True if both path and other path are the same

    Raises
    ------
    OSError
        Propagated from ``os.stat`` when either path cannot be stat'ed.
    """
    # Both paths are stat'ed up front, so a missing file raises even when the
    # names already differ.
    path_stat = os.stat(path)
    other_path_stat = os.stat(other_path)
    return path.lower() == other_path.lower() and path_stat == other_path_stat
def to_os_path(path: ApiPath, root: str = "") -> str:
    """Convert an API path to a filesystem path

    If given, root will be prepended to the path.
    root must be a filesystem path already.

    The result is passed through ``os.path.normpath``.
    """
    # Dropping empty segments handles leading, trailing and duplicate slashes.
    parts = [p for p in str(path).split("/") if p]
    return os.path.normpath(os.path.join(root, *parts))
def to_api_path(os_path: str, root: str = "") -> ApiPath:
    """Convert a filesystem path to an API path

    If given, root will be removed from the path.
    root must be a filesystem path already.
    """
    # NOTE(review): removeprefix is a plain string operation, so a root of
    # "/foo" also strips the start of "/foobar/x" — confirm callers only pass
    # paths that actually live under root.
    os_path = os_path.removeprefix(root)
    # Dropping empty segments handles leading, trailing and duplicate separators.
    parts = [p for p in os_path.split(os.path.sep) if p]
    return ApiPath("/".join(parts))
def check_version(v: str, check: str) -> bool:
    """check version string v >= check

    If dev/prerelease tags result in TypeError for string-number comparison,
    it is assumed that the dependency is satisfied.
    Users on dev branches are responsible for keeping their own packages up to date.
    """
    # NOTE(review): only TypeError is treated as "satisfied"; a version string
    # that ``Version`` cannot parse presumably raises a different exception
    # and propagates — confirm that is intended.
    try:
        return bool(Version(v) >= Version(check))
    except TypeError:
        return True
169# Copy of IPython.utils.process.check_pid:
172def _check_pid_win32(pid: int) -> bool:
173 import ctypes
175 # OpenProcess returns 0 if no such process (of ours) exists
176 # positive int otherwise
177 return bool(ctypes.windll.kernel32.OpenProcess(1, 0, pid)) # type:ignore[attr-defined]
180def _check_pid_posix(pid: int) -> bool:
181 """Copy of IPython.utils.process.check_pid"""
182 try:
183 os.kill(pid, 0)
184 except OSError as err:
185 if err.errno == errno.ESRCH:
186 return False
187 elif err.errno == errno.EPERM:
188 # Don't have permission to signal the process - probably means it exists
189 return True
190 raise
191 else:
192 return True
# Bind the platform-appropriate implementation once, at import time.
if sys.platform == "win32":
    check_pid = _check_pid_win32
else:
    check_pid = _check_pid_posix
async def run_sync_in_loop(maybe_async):
    """**DEPRECATED**: Use ``ensure_async`` from jupyter_core instead."""
    warnings.warn(
        "run_sync_in_loop is deprecated since Jupyter Server 2.0, use 'ensure_async' from jupyter_core instead",
        DeprecationWarning,
        stacklevel=2,
    )
    # NOTE(review): the result of ensure_async is returned without being
    # awaited here, so awaiting this coroutine presumably yields another
    # awaitable — kept as-is for backward compatibility; confirm with callers.
    return ensure_async(maybe_async)
def urlencode_unix_socket_path(socket_path: str) -> str:
    """Encodes a UNIX socket path string from a socket path for the `http+unix` URI form.

    Only "/" is rewritten (to "%2F"); every other character is left untouched.
    """
    return socket_path.replace("/", "%2F")
def urldecode_unix_socket_path(socket_path: str) -> str:
    """Decodes a UNIX sock path string from an encoded sock path for the `http+unix` URI form.

    Only the upper-case "%2F" sequence is rewritten (to "/").
    """
    return socket_path.replace("%2F", "/")
def urlencode_unix_socket(socket_path: str) -> str:
    """Encodes a UNIX socket URL from a socket path for the `http+unix` URI form."""
    return f"http+unix://{urlencode_unix_socket_path(socket_path)}"
def unix_socket_in_use(socket_path: str) -> bool:
    """Checks whether a UNIX socket path on disk is in use by attempting to connect to it.

    Returns False when the path does not exist, when the socket cannot be
    created, or when the connection attempt fails with OSError.
    """
    if not os.path.exists(socket_path):
        return False

    try:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    except OSError:
        # No socket was created, so there is nothing to close.
        return False

    # The context manager closes the probe socket on every exit path.
    with sock:
        try:
            sock.connect(socket_path)
        except OSError:
            return False
        return True
@contextmanager
def _request_for_tornado_client(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
) -> Generator[HTTPRequest, None, None]:
    """A utility that provides a context that handles
    HTTP, HTTPS, and HTTP+UNIX request.
    Creates a tornado HTTPRequest object with a URL
    that tornado's HTTPClients can accept.
    If the request is made to a unix socket,
    configure the AsyncHTTPClient to resolve the URL
    and connect to the proper socket.

    Raises ``Exception`` when the URL scheme is not one of
    ``http``, ``https`` or ``http+unix``.
    """
    parts = urlsplit(urlstring)
    if parts.scheme in ["http", "https"]:
        pass
    elif parts.scheme == "http+unix":
        # If unix socket, mimic HTTP.
        parts = SplitResult(
            scheme="http",
            netloc=parts.netloc,
            path=parts.path,
            query=parts.query,
            fragment=parts.fragment,
        )

        class UnixSocketResolver(Resolver):
            """A resolver that routes HTTP requests to unix sockets
            in tornado HTTP clients.
            Due to constraints in Tornado's API, the scheme of the URL
            must be `http` (not `http+unix`). Applications should replace
            the scheme in URLs before making a request to the HTTP client.
            """

            def initialize(self, resolver):
                self.resolver = resolver

            def close(self):
                self.resolver.close()

            async def resolve(self, host, port, *args, **kwargs):
                # The host part carries the percent-encoded socket path.
                return [(socket.AF_UNIX, urldecode_unix_socket_path(host))]

        resolver = UnixSocketResolver(resolver=Resolver())
        # NOTE(review): this configuration is not reverted when the context
        # exits — confirm whether later plain-HTTP requests are affected.
        AsyncHTTPClient.configure(None, resolver=resolver)
    else:
        msg = "Unknown URL scheme."
        raise Exception(msg)

    # Yield the request for the given client.
    url = urlunsplit(parts)
    request = HTTPRequest(url, method=method, body=body, headers=headers, validate_cert=False)
    yield request
def fetch(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
) -> HTTPResponse:
    """
    Send a HTTP, HTTPS, or HTTP+UNIX request
    to a Tornado Web Server. Returns a tornado HTTPResponse.

    This call blocks until the response is available. The client created
    for the request is closed before returning.
    """
    with _request_for_tornado_client(
        urlstring, method=method, body=body, headers=headers
    ) as request:
        client = HTTPClient(AsyncHTTPClient)
        try:
            response = client.fetch(request)
        finally:
            # Release the blocking client's resources deterministically
            # instead of relying on garbage collection.
            client.close()
    return response
async def async_fetch(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None, io_loop: Any = None
) -> HTTPResponse:
    """
    Send an asynchronous HTTP, HTTPS, or HTTP+UNIX request
    to a Tornado Web Server. Returns a tornado HTTPResponse.
    """
    with _request_for_tornado_client(
        urlstring, method=method, body=body, headers=headers
    ) as request:
        # NOTE(review): io_loop is passed as the first positional argument of
        # AsyncHTTPClient; in current Tornado that slot appears to be
        # ``force_instance`` rather than an IOLoop — confirm against the
        # supported Tornado versions before changing.
        response = await AsyncHTTPClient(io_loop).fetch(request)
    return response
def is_namespace_package(namespace: str) -> bool | None:
    """Is the provided namespace a Python Namespace Package (PEP420).

    https://www.python.org/dev/peps/pep-0420/#specification

    Returns `None` if module is not importable.

    """
    # NOTE: using submodule_search_locations because the loader can be None
    try:
        spec = importlib.util.find_spec(namespace)
    except ValueError:  # spec is not set - see https://docs.python.org/3/library/importlib.html#importlib.util.find_spec
        return None
    except ImportError:
        # For a dotted name find_spec imports the parent package first; a
        # missing or broken parent means the module is not importable.
        return None

    if not spec:
        # e.g. module not installed
        return None
    return isinstance(spec.submodule_search_locations, _NamespacePath)
def filefind(filename: str, path_dirs: Sequence[str]) -> str:
    """Find a file by looking through a sequence of paths.

    For use in FileFindHandler.

    Iterates through a sequence of paths looking for a file and returns
    the full, absolute path of the first occurrence of the file.

    Absolute paths are not accepted for inputs.

    This function does not automatically try any paths,
    such as the cwd or the user's home directory.

    Parameters
    ----------
    filename : str
        The filename to look for. Must be a relative path.
    path_dirs : sequence of str
        The sequence of paths to look in for the file.
        Walk through each element and join with ``filename``.
        Only after ensuring the path resolves within the directory is it checked for existence.

    Returns
    -------
    Raises :exc:`OSError` or returns absolute path to file.
    """
    file_path = Path(filename)

    # If the input is an absolute path, reject it
    if file_path.is_absolute():
        msg = f"{filename} is absolute, filefind only accepts relative paths."
        raise OSError(msg)

    for path_str in path_dirs:
        path = Path(path_str).absolute()
        test_path = path / file_path
        # os.path.abspath resolves '..', but Path.absolute() doesn't
        # Path.resolve() does, but traverses symlinks, which we don't want
        test_path = Path(os.path.abspath(test_path))
        if not test_path.is_relative_to(path):
            # points outside root, e.g. via `filename='../foo'`
            continue
        # make sure we don't call is_file before we know it's a file within a prefix
        # GHSA-hrw6-wg82-cm62 - can leak password hash on windows.
        if test_path.is_file():
            return os.path.abspath(test_path)

    msg = f"File {filename!r} does not exist in any of the search paths: {path_dirs!r}"
    raise OSError(msg)
def import_item(name: str) -> Any:
    """Import and return ``bar`` given the string ``foo.bar``.

    Calling ``bar = import_item("foo.bar")`` is the functional equivalent of
    executing the code ``from foo import bar``.

    Parameters
    ----------
    name : str
        The fully qualified name of the module/package or object being imported.

    Returns
    -------
    obj : Any
        The imported object: the attribute ``bar`` of ``foo`` for a dotted
        name, or the top-level module for an un-dotted name.

    Raises
    ------
    ImportError
        If the module cannot be imported or lacks the requested attribute.
    """
    parts = name.rsplit(".", 1)
    if len(parts) == 2:
        # called with 'foo.bar....'
        package, obj = parts
        # A non-empty fromlist makes __import__ return the named module
        # rather than the top-level package.
        module = __import__(package, fromlist=[obj])
        try:
            return getattr(module, obj)
        except AttributeError as e:
            raise ImportError(f"No module named {obj}") from e
    # called with un-dotted string
    return __import__(parts[0])
class JupyterServerAuthWarning(RuntimeWarning):
    """Emitted when authentication configuration issue is detected.

    Intended for filtering out expected warnings in tests, including
    downstream tests, rather than for users to silence this warning.
    """