1"""Notebook related utilities"""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
6
7import errno
8import importlib.util
9import os
10import socket
11import sys
12import warnings
13from _frozen_importlib_external import _NamespacePath
14from contextlib import contextmanager
15from pathlib import Path
16from typing import Any, Generator, NewType, Sequence
17from urllib.parse import (
18 SplitResult,
19 quote,
20 unquote,
21 urlparse,
22 urlsplit,
23 urlunsplit,
24)
25from urllib.parse import (
26 urljoin as _urljoin,
27)
28from urllib.request import pathname2url as _pathname2url
29
30from jupyter_core.utils import ensure_async as _ensure_async
31from packaging.version import Version
32from tornado.httpclient import AsyncHTTPClient, HTTPClient, HTTPRequest, HTTPResponse
33from tornado.netutil import Resolver
34
# Distinct static type for the "API path" strings handled by
# ``to_os_path`` / ``to_api_path`` below.
ApiPath = NewType("ApiPath", str)

# Re-export: expose the privately-imported helpers under their public names
urljoin = _urljoin
pathname2url = _pathname2url
ensure_async = _ensure_async
41
42
def url_path_join(*pieces: str) -> str:
    """Join components of url into a relative url

    Use to prevent double slash when joining subpath. This will leave the
    initial and final / in place.

    Parameters
    ----------
    *pieces : str
        URL path fragments to join. Slashes at either end of each fragment
        are stripped and fragments that end up empty are skipped.

    Returns
    -------
    str
        The joined path. It starts with ``/`` when the first piece does and
        ends with ``/`` when the last piece does. Returns ``""`` when called
        with no pieces at all.
    """
    if not pieces:
        # Nothing to join; indexing pieces[0] / pieces[-1] would raise IndexError
        return ""

    initial = pieces[0].startswith("/")
    final = pieces[-1].endswith("/")
    stripped = [s.strip("/") for s in pieces]
    result = "/".join(s for s in stripped if s)
    if initial:
        result = "/" + result
    if final:
        result = result + "/"
    if result == "//":
        # Every piece was only slashes: collapse to a single root slash
        result = "/"
    return result
60
61
def url_is_absolute(url: str) -> bool:
    """Return True when the path component of ``url`` starts with ``/``."""
    path_component = urlparse(url).path
    return path_component.startswith("/")
65
66
def path2url(path: str) -> str:
    """Turn a local file path into a URL path.

    The path is split on ``os.sep``, each component is percent-quoted, and
    the components are re-joined with ``url_path_join``.
    """
    segments = list(map(quote, path.split(os.sep)))
    if not segments[-1]:
        # An empty last component means the path ended with a separator;
        # swap in "/" so the join keeps the trailing slash.
        segments[-1] = "/"
    return url_path_join(*segments)
75
76
def url2path(url: str) -> str:
    """Turn a URL path into a local file path.

    The URL is split on ``/``, each component is percent-decoded, and the
    components are combined with ``os.path.join``.
    """
    decoded = (unquote(segment) for segment in url.split("/"))
    return os.path.join(*decoded)
82
83
def url_escape(path: str) -> str:
    """Percent-quote each ``/``-separated component of a URL path.

    The slashes themselves are preserved, e.g. '/foo bar/' becomes
    '/foo%20bar/'.
    """
    quoted = (quote(segment) for segment in path.split("/"))
    return "/".join(quoted)
91
92
def url_unescape(path: str) -> str:
    """Percent-decode each ``/``-separated component of a URL path.

    The inverse of ``url_escape``, e.g. '/foo%20bar/' becomes '/foo bar/'.
    """
    segments = path.split("/")
    decoded = map(unquote, segments)
    return "/".join(decoded)
99
100
def samefile_simple(path: str, other_path: str) -> bool:
    """
    Stand-in for os.path.samefile where that is unavailable (Windows+py2).

    Two paths are treated as the same file when they are equal ignoring
    case AND their full ``os.stat`` results (times included) are equal.
    This is used because Windows + py2 lacks the stat fields
    (st_ino, st_dev) normally used to identify a file.

    Only to be used if os.path.samefile is not available.

    Parameters
    ----------
    path : str
        representing a path to a file
    other_path : str
        representing a path to another file

    Returns
    -------
    same: Boolean that is True if both path and other path are the same
    """
    # Stat both paths up front so a missing file raises regardless of
    # how the string comparison turns out.
    first_stat, second_stat = os.stat(path), os.stat(other_path)
    if path.lower() != other_path.lower():
        return False
    return first_stat == second_stat
126
127
def to_os_path(path: ApiPath, root: str = "") -> str:
    """Translate a ``/``-separated API path into a filesystem path.

    ``root``, when supplied, is joined in front of the result and must
    already be a filesystem path. The outcome is passed through
    ``os.path.normpath``.
    """
    # Empty segments (from leading, trailing or doubled slashes) are dropped
    segments = filter(None, str(path).strip("/").split("/"))
    return os.path.normpath(os.path.join(root, *segments))
138
139
def to_api_path(os_path: str, root: str = "") -> ApiPath:
    """Convert a filesystem path to an API path

    If given, root will be removed from the path.
    root must be a filesystem path already.

    Parameters
    ----------
    os_path : str
        Filesystem path to convert.
    root : str
        Optional leading directory to strip from ``os_path``. It is only
        stripped when it matches whole path components, so a root of
        ``/foo`` is not removed from ``/foobar/x``.

    Returns
    -------
    ApiPath
        The remaining components joined with ``/``, without leading or
        trailing slashes.
    """
    # Accept the alternate separator too (e.g. "/" on Windows) as a boundary
    separators = tuple(s for s in (os.path.sep, os.path.altsep) if s)
    if root and os_path.startswith(root):
        remainder = os_path[len(root) :]
        # A bare string-prefix match is not enough: require that root ends
        # exactly at a path-component boundary before stripping it.
        if not remainder or root.endswith(separators) or remainder.startswith(separators):
            os_path = remainder
    parts = os_path.strip(os.path.sep).split(os.path.sep)
    parts = [p for p in parts if p != ""]  # remove duplicate splits
    path = "/".join(parts)
    return ApiPath(path)
152
153
def check_version(v: str, check: str) -> bool:
    """Report whether version string ``v`` is at least ``check``.

    If the comparison raises TypeError (dev/prerelease tags leading to a
    string-number comparison), the dependency is assumed to be satisfied.
    Users on dev branches are responsible for keeping their own packages
    up to date.
    """
    try:
        satisfied = Version(v) >= Version(check)
    except TypeError:
        return True
    return bool(satisfied)
165
166
167# Copy of IPython.utils.process.check_pid:
168
169
def _check_pid_win32(pid: int) -> bool:
    """Return True if a process with the given pid can be opened on Windows.

    Parameters
    ----------
    pid : int
        Process id to probe.

    Returns
    -------
    bool
        True when ``OpenProcess`` yields a handle, False when it returns 0.
    """
    import ctypes

    kernel32 = ctypes.windll.kernel32  # type:ignore[attr-defined]

    # OpenProcess returns 0 if no such process (of ours) exists
    # positive int otherwise
    handle = kernel32.OpenProcess(1, 0, pid)
    if not handle:
        return False
    # The handle was only needed as an existence probe; close it so that
    # repeated checks do not accumulate open handles in this process.
    kernel32.CloseHandle(handle)
    return True
176
177
def _check_pid_posix(pid: int) -> bool:
    """Probe for a process by sending it signal 0 via ``os.kill``.

    Returns False on ESRCH, True on success or EPERM; any other OSError
    is re-raised. (Copy of IPython.utils.process.check_pid.)
    """
    try:
        os.kill(pid, 0)
    except OSError as exc:
        code = exc.errno
        if code == errno.EPERM:
            # Don't have permission to signal the process - probably means it exists
            return True
        if code == errno.ESRCH:
            return False
        raise
    return True
191
192
# Bind ``check_pid`` to the implementation matching the current platform;
# the choice is made once, when this module is imported.
if sys.platform == "win32":
    check_pid = _check_pid_win32
else:
    check_pid = _check_pid_posix
197
198
async def run_sync_in_loop(maybe_async):
    """**DEPRECATED**: Use ``ensure_async`` from jupyter_core instead.

    Emits a ``DeprecationWarning`` and forwards ``maybe_async`` to
    ``ensure_async``.
    """
    message = "run_sync_in_loop is deprecated since Jupyter Server 2.0, use 'ensure_async' from jupyter_core instead"
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    # NOTE(review): the object returned by ensure_async is handed back
    # without being awaited here - confirm that callers expect this.
    return ensure_async(maybe_async)
207
208
def urlencode_unix_socket_path(socket_path: str) -> str:
    """Encode a UNIX socket path for the `http+unix` URI form by replacing every ``/`` with ``%2F``."""
    components = socket_path.split("/")
    return "%2F".join(components)
212
213
def urldecode_unix_socket_path(socket_path: str) -> str:
    """Decode an `http+unix` encoded socket path by replacing every ``%2F`` with ``/``."""
    components = socket_path.split("%2F")
    return "/".join(components)
217
218
def urlencode_unix_socket(socket_path: str) -> str:
    """Build an `http+unix` URL for a UNIX socket path, using the encoded form of the path."""
    encoded_path = urlencode_unix_socket_path(socket_path)
    return f"http+unix://{encoded_path}"
222
223
def unix_socket_in_use(socket_path: str) -> bool:
    """Checks whether a UNIX socket path on disk is in use by attempting to connect to it.

    Parameters
    ----------
    socket_path : str
        Filesystem path of the UNIX socket to probe.

    Returns
    -------
    bool
        True if a connection to ``socket_path`` succeeds. False if the path
        does not exist, or if creating the socket or connecting to it raises
        :exc:`OSError`.
    """
    if not os.path.exists(socket_path):
        return False

    # Create the socket on its own so that cleanup below never refers to a
    # name that was not bound because creation itself failed.
    try:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    except OSError:
        return False

    # The context manager closes the probe socket on every exit path
    with sock:
        try:
            sock.connect(socket_path)
        except OSError:
            return False
    return True
238
239
@contextmanager
def _request_for_tornado_client(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
) -> Generator[HTTPRequest, None, None]:
    """Context manager yielding a tornado ``HTTPRequest`` for an
    HTTP, HTTPS, or HTTP+UNIX URL.

    ``http`` and ``https`` URLs are used unchanged. For ``http+unix``
    the scheme is rewritten to ``http`` and ``AsyncHTTPClient`` is
    configured with a resolver that maps the host part of the URL to a
    unix socket path. That configuration is applied through
    ``AsyncHTTPClient.configure`` and is not undone by this context
    manager when it exits. Any other scheme raises ``Exception``.

    The request is always built with ``validate_cert=False``.
    """
    split_url = urlsplit(urlstring)
    scheme = split_url.scheme

    if scheme == "http+unix":

        class UnixSocketResolver(Resolver):
            """A resolver that routes HTTP requests to unix sockets
            in tornado HTTP clients.
            Due to constraints in Tornado's API, the scheme of the URL
            must be `http` (not `http+unix`). Applications should replace
            the scheme in URLs before making a request to the HTTP client.
            """

            def initialize(self, resolver):
                self.resolver = resolver

            def close(self):
                self.resolver.close()

            async def resolve(self, host, port, *args, **kwargs):
                # The host carries the percent-encoded socket path
                return [(socket.AF_UNIX, urldecode_unix_socket_path(host))]

        # If unix socket, mimic HTTP.
        split_url = SplitResult(
            "http",
            split_url.netloc,
            split_url.path,
            split_url.query,
            split_url.fragment,
        )
        resolver = UnixSocketResolver(resolver=Resolver())
        AsyncHTTPClient.configure(None, resolver=resolver)
    elif scheme not in ("http", "https"):
        msg = "Unknown URL scheme."
        raise Exception(msg)

    # Yield the request for the given client.
    yield HTTPRequest(
        urlunsplit(split_url),
        method=method,
        body=body,
        headers=headers,
        validate_cert=False,
    )
292
293
def fetch(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
) -> HTTPResponse:
    """
    Send a HTTP, HTTPS, or HTTP+UNIX request
    to a Tornado Web Server. Returns a tornado HTTPResponse.

    Parameters
    ----------
    urlstring : str
        URL to request; its scheme must be ``http``, ``https`` or ``http+unix``.
    method : str
        HTTP method, ``"GET"`` by default.
    body : Any
        Request body handed to the tornado ``HTTPRequest``.
    headers : Any
        Request headers handed to the tornado ``HTTPRequest``.

    Returns
    -------
    HTTPResponse
        The response returned by the blocking tornado ``HTTPClient``.
    """
    with _request_for_tornado_client(
        urlstring, method=method, body=body, headers=headers
    ) as request:
        client = HTTPClient(AsyncHTTPClient)
        try:
            response = client.fetch(request)
        finally:
            # Close the blocking client explicitly, even when the fetch
            # raises, instead of leaving cleanup to garbage collection.
            client.close()
    return response
306
307
async def async_fetch(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None, io_loop: Any = None
) -> HTTPResponse:
    """
    Asynchronously send an HTTP, HTTPS, or HTTP+UNIX request
    to a Tornado Web Server and return the tornado HTTPResponse.

    ``io_loop`` is passed positionally to the ``AsyncHTTPClient``
    constructor.
    """
    request_context = _request_for_tornado_client(
        urlstring, method=method, body=body, headers=headers
    )
    with request_context as prepared:
        client = AsyncHTTPClient(io_loop)
        return await client.fetch(prepared)
320
321
def is_namespace_package(namespace: str) -> bool | None:
    """Is the provided namespace a Python Namespace Package (PEP420).

    https://www.python.org/dev/peps/pep-0420/#specification

    Parameters
    ----------
    namespace : str
        Dotted module name to inspect.

    Returns
    -------
    bool or None
        True if the module's spec has namespace-package search locations,
        False if it is importable but not a namespace package, and `None`
        if the module is not importable.
    """
    # NOTE: using submodule_search_locations because the loader can be None
    try:
        spec = importlib.util.find_spec(namespace)
    except ValueError:  # spec is not set - see https://docs.python.org/3/library/importlib.html#importlib.util.find_spec
        return None
    except ImportError:
        # For a dotted name find_spec imports the parent first; a missing or
        # non-package parent raises ModuleNotFoundError (an ImportError),
        # which also means "not importable".
        return None

    if not spec:
        # e.g. module not installed
        return None
    return isinstance(spec.submodule_search_locations, _NamespacePath)
340
341
def filefind(filename: str, path_dirs: Sequence[str]) -> str:
    """Locate a relative file name inside a sequence of directories.

    For use in FileFindHandler.

    Each directory in ``path_dirs`` is tried in order and the absolute
    path of the first existing regular file is returned.

    Absolute paths are not accepted for inputs.

    No directories are searched implicitly: neither the cwd nor the
    user's home directory is tried unless listed in ``path_dirs``.

    Parameters
    ----------
    filename : str
        The filename to look for. Must be a relative path.
    path_dirs : sequence of str
        The sequence of paths to look in for the file.
        Each element is joined with ``filename``; the result is checked
        for existence only after confirming it stays inside that element.

    Returns
    -------
    Raises :exc:`OSError` or returns absolute path to file.
    """
    relative = Path(filename)

    # Absolute inputs are rejected outright
    if relative.is_absolute():
        msg = f"{filename} is absolute, filefind only accepts relative paths."
        raise OSError(msg)

    for directory in path_dirs:
        base = Path(directory).absolute()
        # os.path.abspath resolves '..', but Path.absolute() doesn't
        # Path.resolve() does, but traverses symlinks, which we don't want
        candidate = Path(os.path.abspath(base / relative))

        if sys.version_info >= (3, 9):
            inside = candidate.is_relative_to(base)
        else:
            # is_relative_to is new in 3.9
            try:
                candidate.relative_to(base)
            except ValueError:
                inside = False
            else:
                inside = True

        # `inside` is False when the candidate points outside the search
        # directory, e.g. via `filename='../foo'`.
        # make sure we don't call is_file before we know it's a file within a prefix
        # GHSA-hrw6-wg82-cm62 - can leak password hash on windows.
        if inside and candidate.is_file():
            return os.path.abspath(candidate)

    msg = f"File {filename!r} does not exist in any of the search paths: {path_dirs!r}"
    raise OSError(msg)
399
400
def import_item(name: str) -> Any:
    """Import and return the object named by a dotted string.

    ``bar = import_item("foo.bar")`` is the functional equivalent of
    ``from foo import bar``. A name without a dot is imported as a
    top-level module.

    Parameters
    ----------
    name : str
        The fully qualified name of the module/package being imported.

    Returns
    -------
    mod : module object
        The imported module, or the attribute looked up on its parent.

    Raises
    ------
    ImportError
        If the parent imports but has no attribute with the final name.
    """
    package, dot, obj = name.rpartition(".")

    if not dot:
        # called with un-dotted string
        return __import__(name)

    # called with 'foo.bar....'
    module = __import__(package, fromlist=[obj])
    try:
        return getattr(module, obj)
    except AttributeError as e:
        raise ImportError("No module named %s" % obj) from e
428
429
class JupyterServerAuthWarning(RuntimeWarning):
    """Emitted when an authentication configuration issue is detected.

    A ``RuntimeWarning`` subclass with no behavior of its own. It is
    intended for filtering out expected warnings in tests, including
    downstream tests, rather than for users to silence this warning.
    """