Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/utils.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Notebook related utilities"""
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
7import errno
8import importlib.util
9import os
10import socket
11import sys
12import warnings
13from _frozen_importlib_external import _NamespacePath
14from contextlib import contextmanager
15from pathlib import Path
16from typing import TYPE_CHECKING, Any, NewType
17from urllib.parse import (
18 SplitResult,
19 quote,
20 unquote,
21 urlparse,
22 urlsplit,
23 urlunsplit,
24)
25from urllib.parse import (
26 urljoin as _urljoin,
27)
28from urllib.request import pathname2url as _pathname2url
30from jupyter_core.utils import ensure_async as _ensure_async
31from packaging.version import Version
32from tornado.httpclient import AsyncHTTPClient, HTTPClient, HTTPRequest, HTTPResponse
33from tornado.netutil import Resolver
35if TYPE_CHECKING:
36 from collections.abc import Generator, Sequence
38ApiPath = NewType("ApiPath", str)
40# Re-export
41urljoin = _urljoin
42pathname2url = _pathname2url
43ensure_async = _ensure_async
46def url_path_join(*pieces: str) -> str:
47 """Join components of url into a relative url
49 Use to prevent double slash when joining subpath. This will leave the
50 initial and final / in place
51 """
52 initial = pieces[0].startswith("/")
53 final = pieces[-1].endswith("/")
54 stripped = [s.strip("/") for s in pieces]
55 result = "/".join(s for s in stripped if s)
56 if initial:
57 result = "/" + result
58 if final:
59 result = result + "/"
60 if result == "//":
61 result = "/"
62 return result
65def url_is_absolute(url: str) -> bool:
66 """Determine whether a given URL is absolute"""
67 return urlparse(url).path.startswith("/")
70def path2url(path: str) -> str:
71 """Convert a local file path to a URL"""
72 pieces = [quote(p) for p in path.split(os.sep)]
73 # preserve trailing /
74 if pieces[-1] == "":
75 pieces[-1] = "/"
76 url = url_path_join(*pieces)
77 return url
80def url2path(url: str) -> str:
81 """Convert a URL to a local file path"""
82 pieces = [unquote(p) for p in url.split("/")]
83 path = os.path.join(*pieces)
84 return path
87def url_escape(path: str) -> str:
88 """Escape special characters in a URL path
90 Turns '/foo bar/' into '/foo%20bar/'
91 """
92 parts = path.split("/")
93 return "/".join([quote(p) for p in parts])
96def url_unescape(path: str) -> str:
97 """Unescape special characters in a URL path
99 Turns '/foo%20bar/' into '/foo bar/'
100 """
101 return "/".join([unquote(p) for p in path.split("/")])
104def samefile_simple(path: str, other_path: str) -> bool:
105 """
106 Fill in for os.path.samefile when it is unavailable (Windows+py2).
108 Do a case-insensitive string comparison in this case
109 plus comparing the full stat result (including times)
110 because Windows + py2 doesn't support the stat fields
111 needed for identifying if it's the same file (st_ino, st_dev).
113 Only to be used if os.path.samefile is not available.
115 Parameters
116 ----------
117 path : str
118 representing a path to a file
119 other_path : str
120 representing a path to another file
122 Returns
123 -------
124 same: Boolean that is True if both path and other path are the same
125 """
126 path_stat = os.stat(path)
127 other_path_stat = os.stat(other_path)
128 return path.lower() == other_path.lower() and path_stat == other_path_stat
131def to_os_path(path: ApiPath, root: str = "") -> str:
132 """Convert an API path to a filesystem path
134 If given, root will be prepended to the path.
135 root must be a filesystem path already.
136 """
137 parts = str(path).strip("/").split("/")
138 parts = [p for p in parts if p != ""] # remove duplicate splits
139 path_ = os.path.join(root, *parts)
140 return os.path.normpath(path_)
143def to_api_path(os_path: str, root: str = "") -> ApiPath:
144 """Convert a filesystem path to an API path
146 If given, root will be removed from the path.
147 root must be a filesystem path already.
148 """
149 if os_path.startswith(root):
150 os_path = os_path[len(root) :]
151 parts = os_path.strip(os.path.sep).split(os.path.sep)
152 parts = [p for p in parts if p != ""] # remove duplicate splits
153 path = "/".join(parts)
154 return ApiPath(path)
157def check_version(v: str, check: str) -> bool:
158 """check version string v >= check
160 If dev/prerelease tags result in TypeError for string-number comparison,
161 it is assumed that the dependency is satisfied.
162 Users on dev branches are responsible for keeping their own packages up to date.
163 """
164 try:
165 return bool(Version(v) >= Version(check))
166 except TypeError:
167 return True
170# Copy of IPython.utils.process.check_pid:
173def _check_pid_win32(pid: int) -> bool:
174 import ctypes
176 # OpenProcess returns 0 if no such process (of ours) exists
177 # positive int otherwise
178 return bool(ctypes.windll.kernel32.OpenProcess(1, 0, pid)) # type:ignore[attr-defined]
181def _check_pid_posix(pid: int) -> bool:
182 """Copy of IPython.utils.process.check_pid"""
183 try:
184 os.kill(pid, 0)
185 except OSError as err:
186 if err.errno == errno.ESRCH:
187 return False
188 elif err.errno == errno.EPERM:
189 # Don't have permission to signal the process - probably means it exists
190 return True
191 raise
192 else:
193 return True
196if sys.platform == "win32":
197 check_pid = _check_pid_win32
198else:
199 check_pid = _check_pid_posix
202async def run_sync_in_loop(maybe_async):
203 """**DEPRECATED**: Use ``ensure_async`` from jupyter_core instead."""
204 warnings.warn(
205 "run_sync_in_loop is deprecated since Jupyter Server 2.0, use 'ensure_async' from jupyter_core instead",
206 DeprecationWarning,
207 stacklevel=2,
208 )
209 return ensure_async(maybe_async)
212def urlencode_unix_socket_path(socket_path: str) -> str:
213 """Encodes a UNIX socket path string from a socket path for the `http+unix` URI form."""
214 return socket_path.replace("/", "%2F")
217def urldecode_unix_socket_path(socket_path: str) -> str:
218 """Decodes a UNIX sock path string from an encoded sock path for the `http+unix` URI form."""
219 return socket_path.replace("%2F", "/")
222def urlencode_unix_socket(socket_path: str) -> str:
223 """Encodes a UNIX socket URL from a socket path for the `http+unix` URI form."""
224 return "http+unix://%s" % urlencode_unix_socket_path(socket_path)
227def unix_socket_in_use(socket_path: str) -> bool:
228 """Checks whether a UNIX socket path on disk is in use by attempting to connect to it."""
229 if not os.path.exists(socket_path):
230 return False
232 try:
233 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
234 sock.connect(socket_path)
235 except OSError:
236 return False
237 else:
238 return True
239 finally:
240 sock.close()
243@contextmanager
244def _request_for_tornado_client(
245 urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
246) -> Generator[HTTPRequest, None, None]:
247 """A utility that provides a context that handles
248 HTTP, HTTPS, and HTTP+UNIX request.
249 Creates a tornado HTTPRequest object with a URL
250 that tornado's HTTPClients can accept.
251 If the request is made to a unix socket, temporarily
252 configure the AsyncHTTPClient to resolve the URL
253 and connect to the proper socket.
254 """
255 parts = urlsplit(urlstring)
256 if parts.scheme in ["http", "https"]:
257 pass
258 elif parts.scheme == "http+unix":
259 # If unix socket, mimic HTTP.
260 parts = SplitResult(
261 scheme="http",
262 netloc=parts.netloc,
263 path=parts.path,
264 query=parts.query,
265 fragment=parts.fragment,
266 )
268 class UnixSocketResolver(Resolver):
269 """A resolver that routes HTTP requests to unix sockets
270 in tornado HTTP clients.
271 Due to constraints in Tornados' API, the scheme of the
272 must be `http` (not `http+unix`). Applications should replace
273 the scheme in URLS before making a request to the HTTP client.
274 """
276 def initialize(self, resolver):
277 self.resolver = resolver
279 def close(self):
280 self.resolver.close()
282 async def resolve(self, host, port, *args, **kwargs):
283 return [(socket.AF_UNIX, urldecode_unix_socket_path(host))]
285 resolver = UnixSocketResolver(resolver=Resolver())
286 AsyncHTTPClient.configure(None, resolver=resolver)
287 else:
288 msg = "Unknown URL scheme."
289 raise Exception(msg)
291 # Yield the request for the given client.
292 url = urlunsplit(parts)
293 request = HTTPRequest(url, method=method, body=body, headers=headers, validate_cert=False)
294 yield request
297def fetch(
298 urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
299) -> HTTPResponse:
300 """
301 Send a HTTP, HTTPS, or HTTP+UNIX request
302 to a Tornado Web Server. Returns a tornado HTTPResponse.
303 """
304 with _request_for_tornado_client(
305 urlstring, method=method, body=body, headers=headers
306 ) as request:
307 response = HTTPClient(AsyncHTTPClient).fetch(request)
308 return response
311async def async_fetch(
312 urlstring: str, method: str = "GET", body: Any = None, headers: Any = None, io_loop: Any = None
313) -> HTTPResponse:
314 """
315 Send an asynchronous HTTP, HTTPS, or HTTP+UNIX request
316 to a Tornado Web Server. Returns a tornado HTTPResponse.
317 """
318 with _request_for_tornado_client(
319 urlstring, method=method, body=body, headers=headers
320 ) as request:
321 response = await AsyncHTTPClient(io_loop).fetch(request)
322 return response
325def is_namespace_package(namespace: str) -> bool | None:
326 """Is the provided namespace a Python Namespace Package (PEP420).
328 https://www.python.org/dev/peps/pep-0420/#specification
330 Returns `None` if module is not importable.
332 """
333 # NOTE: using submodule_search_locations because the loader can be None
334 try:
335 spec = importlib.util.find_spec(namespace)
336 except ValueError: # spec is not set - see https://docs.python.org/3/library/importlib.html#importlib.util.find_spec
337 return None
339 if not spec:
340 # e.g. module not installed
341 return None
342 return isinstance(spec.submodule_search_locations, _NamespacePath)
345def filefind(filename: str, path_dirs: Sequence[str]) -> str:
346 """Find a file by looking through a sequence of paths.
348 For use in FileFindHandler.
350 Iterates through a sequence of paths looking for a file and returns
351 the full, absolute path of the first occurrence of the file.
353 Absolute paths are not accepted for inputs.
355 This function does not automatically try any paths,
356 such as the cwd or the user's home directory.
358 Parameters
359 ----------
360 filename : str
361 The filename to look for. Must be a relative path.
362 path_dirs : sequence of str
363 The sequence of paths to look in for the file.
364 Walk through each element and join with ``filename``.
365 Only after ensuring the path resolves within the directory is it checked for existence.
367 Returns
368 -------
369 Raises :exc:`OSError` or returns absolute path to file.
370 """
371 file_path = Path(filename)
373 # If the input is an absolute path, reject it
374 if file_path.is_absolute():
375 msg = f"{filename} is absolute, filefind only accepts relative paths."
376 raise OSError(msg)
378 for path_str in path_dirs:
379 path = Path(path_str).absolute()
380 test_path = path / file_path
381 # os.path.abspath resolves '..', but Path.absolute() doesn't
382 # Path.resolve() does, but traverses symlinks, which we don't want
383 test_path = Path(os.path.abspath(test_path))
384 if not test_path.is_relative_to(path):
385 # points outside root, e.g. via `filename='../foo'`
386 continue
387 # make sure we don't call is_file before we know it's a file within a prefix
388 # GHSA-hrw6-wg82-cm62 - can leak password hash on windows.
389 if test_path.is_file():
390 return os.path.abspath(test_path)
392 msg = f"File {filename!r} does not exist in any of the search paths: {path_dirs!r}"
393 raise OSError(msg)
396def import_item(name: str) -> Any:
397 """Import and return ``bar`` given the string ``foo.bar``.
398 Calling ``bar = import_item("foo.bar")`` is the functional equivalent of
399 executing the code ``from foo import bar``.
400 Parameters
401 ----------
402 name : str
403 The fully qualified name of the module/package being imported.
404 Returns
405 -------
406 mod : module object
407 The module that was imported.
408 """
410 parts = name.rsplit(".", 1)
411 if len(parts) == 2:
412 # called with 'foo.bar....'
413 package, obj = parts
414 module = __import__(package, fromlist=[obj])
415 try:
416 pak = getattr(module, obj)
417 except AttributeError as e:
418 raise ImportError("No module named %s" % obj) from e
419 return pak
420 else:
421 # called with un-dotted string
422 return __import__(parts[0])
425class JupyterServerAuthWarning(RuntimeWarning):
426 """Emitted when authentication configuration issue is detected.
428 Intended for filtering out expected warnings in tests, including
429 downstream tests, rather than for users to silence this warning.
430 """