1"""Notebook related utilities"""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
6
7import errno
8import importlib.util
9import os
10import socket
11import sys
12import warnings
13from _frozen_importlib_external import _NamespacePath
14from contextlib import contextmanager
15from pathlib import Path
16from typing import TYPE_CHECKING, Any, NewType
17from urllib.parse import (
18 SplitResult,
19 quote,
20 unquote,
21 urlparse,
22 urlsplit,
23 urlunsplit,
24)
25from urllib.parse import (
26 urljoin as _urljoin,
27)
28from urllib.request import pathname2url as _pathname2url
29
30from jupyter_core.utils import ensure_async as _ensure_async
31from packaging.version import Version
32from tornado.httpclient import AsyncHTTPClient, HTTPClient, HTTPRequest, HTTPResponse
33from tornado.netutil import Resolver
34
35if TYPE_CHECKING:
36 from collections.abc import Generator, Sequence
37
# Distinct static type (a ``str`` at runtime) used to annotate API paths,
# as consumed by ``to_os_path`` and produced by ``to_api_path``.
ApiPath = NewType("ApiPath", str)

# Re-export: expose the helpers imported above under private aliases
# as public names of this module.
urljoin = _urljoin
pathname2url = _pathname2url
ensure_async = _ensure_async
44
45
def url_path_join(*pieces: str) -> str:
    """Join components of url into a relative url

    Use to prevent double slash when joining subpath. This will leave the
    initial and final / in place.

    Parameters
    ----------
    *pieces : str
        URL fragments to join. Leading and trailing slashes of every
        fragment are stripped, and fragments that end up empty are dropped.

    Returns
    -------
    str
        The joined path. It starts with ``/`` only if the first piece did
        and ends with ``/`` only if the last piece did. Calling with no
        pieces returns the empty string.
    """
    # Nothing to join: avoid indexing into an empty tuple below.
    if not pieces:
        return ""

    initial = pieces[0].startswith("/")
    final = pieces[-1].endswith("/")
    stripped = [s.strip("/") for s in pieces]
    result = "/".join(s for s in stripped if s)
    if initial:
        result = "/" + result
    if final:
        result = result + "/"
    # All pieces were only slashes: collapse the doubled separator.
    if result == "//":
        result = "/"
    return result
63
64
def url_is_absolute(url: str) -> bool:
    """Report whether the path component of ``url`` begins with a slash.

    Only the parsed path is inspected; a URL with a scheme and host but an
    empty path is therefore not considered absolute.
    """
    parsed_path = urlparse(url).path
    return parsed_path[:1] == "/"
68
69
def path2url(path: str) -> str:
    """Turn a local filesystem path into a URL path.

    The path is split on ``os.sep``, every segment is percent-quoted, and
    the segments are joined again with ``url_path_join``.
    """
    segments = [quote(segment) for segment in path.split(os.sep)]
    # A trailing separator yields an empty last segment; swap it for "/"
    # so that the join keeps the trailing slash.
    if not segments[-1]:
        segments[-1] = "/"
    return url_path_join(*segments)
78
79
def url2path(url: str) -> str:
    """Turn a URL path into a local filesystem path.

    The URL is split on ``/``, every segment is percent-decoded, and the
    segments are combined with ``os.path.join``.
    """
    decoded_segments = map(unquote, url.split("/"))
    return os.path.join(*decoded_segments)
85
86
def url_escape(path: str) -> str:
    """Percent-quote each segment of a URL path, keeping the slashes.

    For example ``'/foo bar/'`` becomes ``'/foo%20bar/'``.
    """
    return "/".join(map(quote, path.split("/")))
94
95
def url_unescape(path: str) -> str:
    """Percent-decode each segment of a URL path, keeping the slashes.

    For example ``'/foo%20bar/'`` becomes ``'/foo bar/'``.
    """
    segments = path.split("/")
    return "/".join(map(unquote, segments))
102
103
def samefile_simple(path: str, other_path: str) -> bool:
    """
    Stand-in for os.path.samefile where it is unavailable (Windows+py2).

    Two paths are treated as the same file when they are equal ignoring
    case and their full stat results (times included) are equal; this is
    used because Windows + py2 lacks the stat fields (st_ino, st_dev)
    normally needed to identify a file.

    Only to be used if os.path.samefile is not available.

    Parameters
    ----------
    path : str
        representing a path to a file
    other_path : str
        representing a path to another file

    Returns
    -------
    same: Boolean that is True if both path and other path are the same
    """
    # Stat both paths up front so a missing file raises regardless of
    # how the name comparison turns out.
    first_stat, second_stat = (os.stat(p) for p in (path, other_path))
    names_match = path.lower() == other_path.lower()
    return names_match and first_stat == second_stat
129
130
def to_os_path(path: ApiPath, root: str = "") -> str:
    """Translate an API path into a normalized filesystem path.

    Parameters
    ----------
    path : ApiPath
        ``/``-separated API path; surrounding and repeated slashes are ignored.
    root : str
        Filesystem path prepended to the result, if given.

    Returns
    -------
    str
        ``root`` joined with the path segments, passed through ``os.path.normpath``.
    """
    # Drop the empty segments produced by duplicate slashes.
    segments = [seg for seg in str(path).strip("/").split("/") if seg != ""]
    return os.path.normpath(os.path.join(root, *segments))
141
142
def to_api_path(os_path: str, root: str = "") -> ApiPath:
    """Translate a filesystem path into an API path.

    Parameters
    ----------
    os_path : str
        Filesystem path to convert.
    root : str
        Filesystem path; when ``os_path`` starts with it, that prefix is removed.

    Returns
    -------
    ApiPath
        The remaining segments joined with ``/``, without leading or
        trailing separators.
    """
    separator = os.path.sep
    relative = os_path[len(root) :] if os_path.startswith(root) else os_path
    # Drop the empty segments produced by duplicate separators.
    segments = [seg for seg in relative.strip(separator).split(separator) if seg != ""]
    return ApiPath("/".join(segments))
155
156
def check_version(v: str, check: str) -> bool:
    """Return whether version string ``v`` is at least ``check``.

    Both strings are parsed with ``packaging.version.Version``. If the
    comparison raises ``TypeError`` (dev/prerelease tags leading to a
    string-number comparison), the dependency is assumed to be satisfied:
    users on dev branches are responsible for keeping their own packages
    up to date.
    """
    try:
        is_satisfied = bool(Version(v) >= Version(check))
    except TypeError:
        return True
    return is_satisfied
168
169
170# Copy of IPython.utils.process.check_pid:
171
172
def _check_pid_win32(pid: int) -> bool:
    """Report whether ``OpenProcess`` yields a handle for ``pid`` (Windows only)."""
    import ctypes

    # OpenProcess returns 0 if no such process (of ours) exists,
    # a positive int otherwise.
    # NOTE(review): the returned handle is not closed here - confirm that is intended.
    handle = ctypes.windll.kernel32.OpenProcess(1, 0, pid)  # type:ignore[attr-defined]
    return bool(handle)
179
180
def _check_pid_posix(pid: int) -> bool:
    """Copy of IPython.utils.process.check_pid

    Probes ``pid`` by sending signal 0 with ``os.kill``.

    Returns
    -------
    bool
        False if the call fails with ``ESRCH``; True if it succeeds or
        fails with ``EPERM``. Any other ``OSError`` is re-raised.
    """
    try:
        os.kill(pid, 0)
    except OSError as err:
        code = err.errno
        if code == errno.ESRCH:
            return False
        if code == errno.EPERM:
            # Not allowed to signal the process - probably means it exists
            return True
        raise
    return True
194
195
# Bind the public ``check_pid`` name once, at import time, to the
# implementation matching the current platform.
if sys.platform == "win32":
    check_pid = _check_pid_win32
else:
    check_pid = _check_pid_posix
200
201
async def run_sync_in_loop(maybe_async):
    """**DEPRECATED**: Use ``ensure_async`` from jupyter_core instead.

    Emits a ``DeprecationWarning`` and passes ``maybe_async`` to
    ``ensure_async``. Whatever ``ensure_async`` returns is handed back
    as-is; it is not awaited inside this coroutine.
    """
    message = (
        "run_sync_in_loop is deprecated since Jupyter Server 2.0, use 'ensure_async' from jupyter_core instead"
    )
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return ensure_async(maybe_async)
210
211
def urlencode_unix_socket_path(socket_path: str) -> str:
    """Encode a UNIX socket path for the `http+unix` URI form.

    Every ``/`` in the path is replaced by ``%2F``; nothing else is touched.
    """
    return "%2F".join(socket_path.split("/"))
215
216
def urldecode_unix_socket_path(socket_path: str) -> str:
    """Decode a UNIX socket path taken from the `http+unix` URI form.

    Every ``%2F`` in the encoded path is replaced by ``/``; the match is
    case-sensitive and nothing else is touched.
    """
    return "/".join(socket_path.split("%2F"))
220
221
def urlencode_unix_socket(socket_path: str) -> str:
    """Build the `http+unix` URL for a UNIX socket path.

    The path is encoded with ``urlencode_unix_socket_path`` and prefixed
    with the ``http+unix://`` scheme.
    """
    encoded_path = urlencode_unix_socket_path(socket_path)
    return "http+unix://%s" % encoded_path
225
226
def unix_socket_in_use(socket_path: str) -> bool:
    """Checks whether a UNIX socket path on disk is in use by attempting to connect to it.

    Parameters
    ----------
    socket_path : str
        Filesystem path of the UNIX socket to probe.

    Returns
    -------
    bool
        True if a connection to the socket succeeds. False if the path does
        not exist, or if creating the socket or connecting raises ``OSError``.
    """
    if not os.path.exists(socket_path):
        return False

    try:
        # The context manager closes the probe socket on every path, and
        # nothing references it if socket creation itself fails.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.connect(socket_path)
    except OSError:
        return False
    return True
241
242
@contextmanager
def _request_for_tornado_client(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
) -> Generator[HTTPRequest, None, None]:
    """Context manager yielding a tornado ``HTTPRequest`` for
    HTTP, HTTPS, and HTTP+UNIX URLs.

    The yielded request carries a URL that tornado's HTTP clients accept.
    For ``http+unix`` URLs the scheme is rewritten to ``http`` and
    ``AsyncHTTPClient`` is configured with a resolver that maps the host
    to the decoded UNIX socket path. That configuration is applied via
    ``AsyncHTTPClient.configure`` and is not undone by this function.

    The request is always built with ``validate_cert=False``.

    Raises ``Exception`` if the URL scheme is not one of the three above.
    """
    parts = urlsplit(urlstring)
    scheme = parts.scheme

    if scheme == "http+unix":
        # If unix socket, mimic HTTP: only the scheme changes.
        parts = parts._replace(scheme="http")

        class UnixSocketResolver(Resolver):
            """A resolver that routes HTTP requests to unix sockets
            in tornado HTTP clients.
            Due to constraints in Tornado's API, the scheme of the URL
            must be `http` (not `http+unix`). Applications should replace
            the scheme in URLs before making a request to the HTTP client.
            """

            def initialize(self, resolver):
                # Keep the wrapped resolver so it can be closed with this one.
                self.resolver = resolver

            def close(self):
                self.resolver.close()

            async def resolve(self, host, port, *args, **kwargs):
                # The host part of the URL is the encoded socket path.
                socket_path = urldecode_unix_socket_path(host)
                return [(socket.AF_UNIX, socket_path)]

        unix_resolver = UnixSocketResolver(resolver=Resolver())
        AsyncHTTPClient.configure(None, resolver=unix_resolver)
    elif scheme not in ("http", "https"):
        msg = "Unknown URL scheme."
        raise Exception(msg)

    # Yield the request for the given client.
    yield HTTPRequest(
        urlunsplit(parts), method=method, body=body, headers=headers, validate_cert=False
    )
295
296
def fetch(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
) -> HTTPResponse:
    """
    Send a HTTP, HTTPS, or HTTP+UNIX request
    to a Tornado Web Server. Returns a tornado HTTPResponse.

    Parameters
    ----------
    urlstring : str
        URL to request; its scheme is handled by ``_request_for_tornado_client``.
    method : str
        HTTP method passed to the request.
    body, headers
        Passed through unchanged to the tornado ``HTTPRequest``.

    Returns
    -------
    HTTPResponse
        The response returned by the blocking ``HTTPClient.fetch`` call.
    """
    with _request_for_tornado_client(
        urlstring, method=method, body=body, headers=headers
    ) as request:
        client = HTTPClient(AsyncHTTPClient)
        try:
            response = client.fetch(request)
        finally:
            # Release the blocking client's resources deterministically,
            # including when fetch raises, rather than waiting for
            # garbage collection.
            client.close()
    return response
309
310
async def async_fetch(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None, io_loop: Any = None
) -> HTTPResponse:
    """
    Asynchronously send a HTTP, HTTPS, or HTTP+UNIX request
    to a Tornado Web Server and return the tornado HTTPResponse.

    ``io_loop`` is passed as the sole positional argument to
    ``AsyncHTTPClient``; the remaining arguments are used to build the request.
    """
    request_context = _request_for_tornado_client(
        urlstring, method=method, body=body, headers=headers
    )
    with request_context as request:
        client = AsyncHTTPClient(io_loop)
        response = await client.fetch(request)
    return response
323
324
def is_namespace_package(namespace: str) -> bool | None:
    """Is the provided namespace a Python Namespace Package (PEP420).

    https://www.python.org/dev/peps/pep-0420/#specification

    Parameters
    ----------
    namespace : str
        Dotted module name to look up.

    Returns
    -------
    bool or None
        True if the module's spec has namespace-package search locations,
        False if the module was found but is not a namespace package, and
        `None` if the module is not importable.
    """
    # NOTE: using submodule_search_locations because the loader can be None
    try:
        spec = importlib.util.find_spec(namespace)
    except (ValueError, ImportError):
        # ValueError: spec is not set - see
        # https://docs.python.org/3/library/importlib.html#importlib.util.find_spec
        # ImportError: for a dotted name find_spec imports the parent first
        # and raises ModuleNotFoundError when that parent is unavailable.
        return None

    if not spec:
        # e.g. module not installed
        return None
    return isinstance(spec.submodule_search_locations, _NamespacePath)
343
344
def filefind(filename: str, path_dirs: Sequence[str]) -> str:
    """Locate a file by searching a sequence of directories.

    For use in FileFindHandler.

    Each directory is tried in order and the full, absolute path of the
    first match is returned.

    Absolute paths are not accepted for inputs.

    No implicit locations (such as the cwd or the user's home directory)
    are searched.

    Parameters
    ----------
    filename : str
        The filename to look for. Must be a relative path.
    path_dirs : sequence of str
        The sequence of paths to look in for the file.
        Each element is joined with ``filename``; existence is only checked
        after ensuring the joined path resolves within that directory.

    Returns
    -------
    Raises :exc:`OSError` or returns absolute path to file.
    """
    relative = Path(filename)

    # Absolute inputs are rejected outright.
    if relative.is_absolute():
        msg = f"{filename} is absolute, filefind only accepts relative paths."
        raise OSError(msg)

    for directory in path_dirs:
        root = Path(directory).absolute()
        # os.path.abspath collapses '..' (Path.absolute() does not) without
        # following symlinks the way Path.resolve() would.
        candidate = Path(os.path.abspath(root / relative))
        # The containment test must come first: is_file() is only reached
        # for paths inside the search root (e.g. not `filename='../foo'`).
        # GHSA-hrw6-wg82-cm62 - can leak password hash on windows.
        if candidate.is_relative_to(root) and candidate.is_file():
            return os.path.abspath(candidate)

    msg = f"File {filename!r} does not exist in any of the search paths: {path_dirs!r}"
    raise OSError(msg)
394
395
def import_item(name: str) -> Any:
    """Import and return ``bar`` given the string ``foo.bar``.

    Calling ``bar = import_item("foo.bar")`` is the functional equivalent of
    executing the code ``from foo import bar``. An un-dotted name is simply
    imported as a module.

    Parameters
    ----------
    name : str
        The fully qualified name of the module/package being imported.

    Returns
    -------
    mod : module object
        The module (or attribute) that was imported.

    Raises
    ------
    ImportError
        If the package imports but has no attribute with the final name.
    """
    package, dot, attribute = name.rpartition(".")
    if not dot:
        # called with un-dotted string
        return __import__(name)

    # called with 'foo.bar....'
    module = __import__(package, fromlist=[attribute])
    try:
        return getattr(module, attribute)
    except AttributeError as e:
        raise ImportError("No module named %s" % attribute) from e
423
424
class JupyterServerAuthWarning(RuntimeWarning):
    """Warning category for detected authentication configuration issues.

    It exists so that tests (including downstream test suites) can filter
    out expected warnings; it is not meant as a way for users to silence
    the warning.
    """