Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/utils.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Notebook related utilities"""
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
7import errno
8import importlib.util
9import os
10import socket
11import sys
12import warnings
13from _frozen_importlib_external import _NamespacePath
14from contextlib import contextmanager
15from pathlib import Path
16from typing import Any, Generator, NewType, Sequence
17from urllib.parse import (
18 SplitResult,
19 quote,
20 unquote,
21 urlparse,
22 urlsplit,
23 urlunsplit,
24)
25from urllib.parse import (
26 urljoin as _urljoin,
27)
28from urllib.request import pathname2url as _pathname2url
30from jupyter_core.utils import ensure_async as _ensure_async
31from packaging.version import Version
32from tornado.httpclient import AsyncHTTPClient, HTTPClient, HTTPRequest, HTTPResponse
33from tornado.netutil import Resolver
# Public aliases for helpers that were imported under private names above.
ensure_async = _ensure_async
pathname2url = _pathname2url
urljoin = _urljoin

# Distinct static type for "/"-separated API paths; a plain ``str`` at runtime.
ApiPath = NewType("ApiPath", str)
def url_path_join(*pieces: str) -> str:
    """Join components of a url into a relative url.

    Use to prevent double slashes when joining subpaths. A leading ``/`` on
    the first piece and a trailing ``/`` on the last piece are kept.

    Parameters
    ----------
    *pieces : str
        The url fragments to join, in order.

    Returns
    -------
    str
        The joined url. An empty string when no pieces are given.
    """
    if not pieces:
        # Nothing to join; indexing pieces[0] below would raise IndexError.
        return ""

    initial = pieces[0].startswith("/")
    final = pieces[-1].endswith("/")
    # Drop empty fragments so that "a/" + "/b" does not produce "a//b".
    result = "/".join(s for s in (p.strip("/") for p in pieces) if s)
    if initial:
        result = "/" + result
    if final:
        result = result + "/"
    # Every piece was just "/" (or empty with slashes): collapse to the root.
    if result == "//":
        result = "/"
    return result
def url_is_absolute(url: str) -> bool:
    """Return True when the path component of ``url`` begins with ``/``."""
    path_component = urlparse(url).path
    return path_component.startswith("/")
def path2url(path: str) -> str:
    """Turn a local file path into a URL path.

    The path is split on ``os.sep``, each segment is percent-quoted, and the
    segments are re-joined with ``url_path_join``.
    """
    segments = list(map(quote, path.split(os.sep)))
    if not segments[-1]:
        # An empty last segment means the path ended with a separator;
        # use "/" so the join keeps the trailing slash.
        segments[-1] = "/"
    return url_path_join(*segments)
def url2path(url: str) -> str:
    """Turn a URL path into a local file path.

    The url is split on ``/``, each segment is percent-decoded, and the
    segments are combined with ``os.path.join``.
    """
    decoded_segments = map(unquote, url.split("/"))
    return os.path.join(*decoded_segments)
def url_escape(path: str) -> str:
    """Percent-escape special characters in each segment of a URL path.

    The ``/`` separators are preserved, so ``'/foo bar/'`` becomes
    ``'/foo%20bar/'``.
    """
    return "/".join(map(quote, path.split("/")))
def url_unescape(path: str) -> str:
    """Decode percent-escapes in each segment of a URL path.

    The ``/`` separators are preserved, so ``'/foo%20bar/'`` becomes
    ``'/foo bar/'``.
    """
    segments = path.split("/")
    return "/".join(map(unquote, segments))
def samefile_simple(path: str, other_path: str) -> bool:
    """
    Stand-in for os.path.samefile where that is unavailable (Windows+py2).

    Two paths are treated as the same file when they are equal ignoring
    case *and* their complete stat results (times included) are equal.
    This is used because Windows + py2 lacks the stat fields
    (st_ino, st_dev) needed to identify a file properly.

    Only to be used if os.path.samefile is not available.

    Parameters
    ----------
    path : str
        representing a path to a file
    other_path : str
        representing a path to another file

    Returns
    -------
    same: Boolean that is True if both path and other path are the same
    """
    # Stat both paths up front so a missing file raises regardless of names.
    first_stat = os.stat(path)
    second_stat = os.stat(other_path)
    if path.lower() != other_path.lower():
        return False
    return first_stat == second_stat
def to_os_path(path: ApiPath, root: str = "") -> str:
    """Translate an API path into a normalized filesystem path.

    When ``root`` is given it is prepended to the result; it must already
    be a filesystem path.
    """
    # Skipping empty segments collapses repeated slashes such as "a//b".
    segments = [seg for seg in str(path).strip("/").split("/") if seg]
    return os.path.normpath(os.path.join(root, *segments))
def to_api_path(os_path: str, root: str = "") -> ApiPath:
    """Translate a filesystem path into an API path.

    When ``os_path`` starts with ``root``, that prefix is removed first;
    ``root`` must already be a filesystem path.
    """
    if os_path.startswith(root):
        os_path = os_path[len(root) :]
    separator = os.path.sep
    # filter(None, ...) discards the empty strings left by repeated separators.
    segments = filter(None, os_path.strip(separator).split(separator))
    return ApiPath("/".join(segments))
def check_version(v: str, check: str) -> bool:
    """Report whether version string ``v`` is at least ``check``.

    If comparing the two parsed versions raises TypeError (dev/prerelease
    tags producing a string-number comparison), the dependency is assumed
    to be satisfied. Users on dev branches are responsible for keeping
    their own packages up to date.
    """
    try:
        satisfied = Version(v) >= Version(check)
    except TypeError:
        return True
    return bool(satisfied)
167# Copy of IPython.utils.process.check_pid:
def _check_pid_win32(pid: int) -> bool:
    """Windows variant of the pid check, using kernel32 ``OpenProcess``."""
    import ctypes

    # OpenProcess yields 0 when no such process (of ours) exists and a
    # positive int otherwise.
    handle = ctypes.windll.kernel32.OpenProcess(1, 0, pid)  # type:ignore[attr-defined]
    return bool(handle)
def _check_pid_posix(pid: int) -> bool:
    """POSIX pid check (copy of IPython.utils.process.check_pid).

    Sends signal 0 to ``pid``. Returns False on ESRCH, True on success or
    EPERM, and re-raises any other OSError.
    """
    try:
        os.kill(pid, 0)
    except OSError as exc:
        if exc.errno not in (errno.ESRCH, errno.EPERM):
            raise
        # EPERM: not allowed to signal the process - probably means it exists.
        return exc.errno == errno.EPERM
    return True
# Expose the platform-appropriate implementation under the public name.
check_pid = _check_pid_win32 if sys.platform == "win32" else _check_pid_posix
async def run_sync_in_loop(maybe_async: Any) -> Any:
    """**DEPRECATED**: Use ``ensure_async`` from jupyter_core instead.

    Emits a ``DeprecationWarning`` and delegates to ``ensure_async``.

    Parameters
    ----------
    maybe_async : Any
        The object handed on to ``ensure_async``.

    Returns
    -------
    The awaited result of ``ensure_async(maybe_async)``.
    """
    warnings.warn(
        "run_sync_in_loop is deprecated since Jupyter Server 2.0, use 'ensure_async' from jupyter_core instead",
        DeprecationWarning,
        stacklevel=2,
    )
    # Await here: returning the bare coroutine from this ``async def`` would
    # make ``await run_sync_in_loop(x)`` hand back an un-awaited coroutine.
    return await ensure_async(maybe_async)
def urlencode_unix_socket_path(socket_path: str) -> str:
    """Encode a UNIX socket path for the `http+unix` URI form by turning every ``/`` into ``%2F``."""
    return "%2F".join(socket_path.split("/"))
def urldecode_unix_socket_path(socket_path: str) -> str:
    """Decode an `http+unix` encoded UNIX socket path by turning every ``%2F`` back into ``/``."""
    return "/".join(socket_path.split("%2F"))
def urlencode_unix_socket(socket_path: str) -> str:
    """Build the `http+unix` URL for a UNIX socket path."""
    encoded_path = urlencode_unix_socket_path(socket_path)
    return f"http+unix://{encoded_path}"
def unix_socket_in_use(socket_path: str) -> bool:
    """Check whether a UNIX socket path on disk is in use by attempting to connect to it.

    Parameters
    ----------
    socket_path : str
        Filesystem path of the UNIX socket.

    Returns
    -------
    bool
        True when a connection to the socket succeeds. False when the path
        does not exist, the socket cannot be created, or the connection
        attempt fails with ``OSError``.
    """
    if not os.path.exists(socket_path):
        return False

    # Create the socket outside the connect attempt: if creation itself
    # fails there is nothing to close, and cleanup must not reference an
    # unbound name.
    try:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    except OSError:
        return False

    # The context manager closes the socket on every exit path.
    with sock:
        try:
            sock.connect(socket_path)
        except OSError:
            return False
    return True
@contextmanager
def _request_for_tornado_client(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
) -> Generator[HTTPRequest, None, None]:
    """Yield a tornado ``HTTPRequest`` for an HTTP, HTTPS, or HTTP+UNIX URL.

    ``http`` and ``https`` URLs are passed through unchanged. For an
    ``http+unix`` URL the scheme is rewritten to ``http`` and
    ``AsyncHTTPClient`` is configured with a resolver that sends the
    connection to the unix socket named by the URL's host part.

    Raises ``Exception`` for any other scheme.
    """
    split_url = urlsplit(urlstring)
    scheme = split_url.scheme
    if scheme not in ("http", "https", "http+unix"):
        msg = "Unknown URL scheme."
        raise Exception(msg)

    if scheme == "http+unix":
        # Tornado's clients only accept http(s), so present the URL as http.
        split_url = SplitResult(
            scheme="http",
            netloc=split_url.netloc,
            path=split_url.path,
            query=split_url.query,
            fragment=split_url.fragment,
        )

        class UnixSocketResolver(Resolver):
            """Resolver that points tornado HTTP clients at a unix socket.

            The request URL must use the `http` scheme (not `http+unix`);
            the encoded socket path is taken from the host being resolved.
            """

            def initialize(self, resolver):
                self.resolver = resolver

            def close(self):
                self.resolver.close()

            async def resolve(self, host, port, *args, **kwargs):
                return [(socket.AF_UNIX, urldecode_unix_socket_path(host))]

        # NOTE(review): nothing in this function undoes this configuration
        # once the context exits.
        AsyncHTTPClient.configure(None, resolver=UnixSocketResolver(resolver=Resolver()))

    # Hand the prepared request to the caller's client.
    yield HTTPRequest(
        urlunsplit(split_url), method=method, body=body, headers=headers, validate_cert=False
    )
def fetch(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
) -> HTTPResponse:
    """
    Send a HTTP, HTTPS, or HTTP+UNIX request
    to a Tornado Web Server. Returns a tornado HTTPResponse.

    Parameters
    ----------
    urlstring : str
        The URL to request.
    method : str
        The HTTP method, ``"GET"`` by default.
    body : Any
        Request body passed through to tornado's ``HTTPRequest``.
    headers : Any
        Request headers passed through to tornado's ``HTTPRequest``.

    Returns
    -------
    HTTPResponse
        The response from the blocking tornado client.
    """
    with _request_for_tornado_client(
        urlstring, method=method, body=body, headers=headers
    ) as request:
        client = HTTPClient(AsyncHTTPClient)
        try:
            return client.fetch(request)
        finally:
            # Release the blocking client's resources on success and on
            # error; previously the client was never closed.
            client.close()
async def async_fetch(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None, io_loop: Any = None
) -> HTTPResponse:
    """
    Asynchronously send a HTTP, HTTPS, or HTTP+UNIX request
    to a Tornado Web Server and return the tornado HTTPResponse.
    """
    request_context = _request_for_tornado_client(
        urlstring, method=method, body=body, headers=headers
    )
    with request_context as request:
        client = AsyncHTTPClient(io_loop)
        return await client.fetch(request)
def is_namespace_package(namespace: str) -> bool | None:
    """Tell whether ``namespace`` names a Python Namespace Package (PEP420).

    https://www.python.org/dev/peps/pep-0420/#specification

    Returns `None` if module is not importable.

    """
    # NOTE: submodule_search_locations is inspected because the loader can be None
    try:
        spec = importlib.util.find_spec(namespace)
    except ValueError:
        # spec is not set - see https://docs.python.org/3/library/importlib.html#importlib.util.find_spec
        return None

    # A missing spec means e.g. the module is not installed.
    return None if not spec else isinstance(spec.submodule_search_locations, _NamespacePath)
def filefind(filename: str, path_dirs: Sequence[str]) -> str:
    """Locate a file by searching a sequence of directories.

    For use in FileFindHandler.

    Each directory is tried in order and the full, absolute path of the
    first match is returned.

    Absolute paths are not accepted for inputs.

    No locations are tried implicitly (neither the cwd nor the user's
    home directory).

    Parameters
    ----------
    filename : str
        The filename to look for. Must be a relative path.
    path_dirs : sequence of str
        The sequence of paths to look in for the file.
        Each element is joined with ``filename``; existence is checked
        only after the joined path is known to lie within that directory.

    Returns
    -------
    Raises :exc:`OSError` or returns absolute path to file.
    """
    relative = Path(filename)

    # Absolute inputs are rejected outright.
    if relative.is_absolute():
        msg = f"{filename} is absolute, filefind only accepts relative paths."
        raise OSError(msg)

    for search_dir in path_dirs:
        root = Path(search_dir).absolute()
        # os.path.abspath collapses '..' (Path.absolute() does not) without
        # following symlinks (which Path.resolve() would do).
        candidate = Path(os.path.abspath(root / relative))

        if sys.version_info >= (3, 9):
            inside_root = candidate.is_relative_to(root)
        else:
            # is_relative_to is new in 3.9
            try:
                candidate.relative_to(root)
            except ValueError:
                inside_root = False
            else:
                inside_root = True

        # Only touch the filesystem once the candidate is known to sit inside
        # the search directory (e.g. not reached via `filename='../foo'`).
        # GHSA-hrw6-wg82-cm62 - can leak password hash on windows.
        if inside_root and candidate.is_file():
            return os.path.abspath(candidate)

    msg = f"File {filename!r} does not exist in any of the search paths: {path_dirs!r}"
    raise OSError(msg)
def import_item(name: str) -> Any:
    """Import and return ``bar`` given the string ``foo.bar``.

    ``bar = import_item("foo.bar")`` is the functional equivalent of
    ``from foo import bar``. A name without a dot is imported as a
    top-level module.

    Parameters
    ----------
    name : str
        The fully qualified name of the module/package being imported.

    Returns
    -------
    mod : module object
        The module that was imported.
    """
    package, dot, obj = name.rpartition(".")
    if not dot:
        # called with un-dotted string
        return __import__(name)

    # called with 'foo.bar....'
    module = __import__(package, fromlist=[obj])
    try:
        return getattr(module, obj)
    except AttributeError as e:
        raise ImportError("No module named %s" % obj) from e
class JupyterServerAuthWarning(RuntimeWarning):
    """Warning category for a detected authentication configuration issue.

    It exists so that tests (downstream ones included) can filter out
    expected warnings; it is not meant as a way for users to silence them.
    """