Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/utils.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

174 statements  

1"""Notebook related utilities""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5from __future__ import annotations 

6 

7import errno 

8import importlib.util 

9import os 

10import socket 

11import sys 

12import warnings 

13from _frozen_importlib_external import _NamespacePath 

14from contextlib import contextmanager 

15from pathlib import Path 

16from typing import TYPE_CHECKING, Any, NewType 

17from urllib.parse import ( 

18 SplitResult, 

19 quote, 

20 unquote, 

21 urlparse, 

22 urlsplit, 

23 urlunsplit, 

24) 

25from urllib.parse import ( 

26 urljoin as _urljoin, 

27) 

28from urllib.request import pathname2url as _pathname2url 

29 

30from jupyter_core.utils import ensure_async as _ensure_async 

31from packaging.version import Version 

32from tornado.httpclient import AsyncHTTPClient, HTTPClient, HTTPRequest, HTTPResponse 

33from tornado.netutil import Resolver 

34 

35if TYPE_CHECKING: 

36 from collections.abc import Generator, Sequence 

37 

38ApiPath = NewType("ApiPath", str) 

39 

40# Re-export 

41urljoin = _urljoin 

42pathname2url = _pathname2url 

43ensure_async = _ensure_async 

44 

45 

46def url_path_join(*pieces: str) -> str: 

47 """Join components of url into a relative url 

48 

49 Use to prevent double slash when joining subpath. This will leave the 

50 initial and final / in place 

51 """ 

52 initial = pieces[0].startswith("/") 

53 final = pieces[-1].endswith("/") 

54 stripped = [s.strip("/") for s in pieces] 

55 result = "/".join(s for s in stripped if s) 

56 if initial: 

57 result = "/" + result 

58 if final: 

59 result = result + "/" 

60 if result == "//": 

61 result = "/" 

62 return result 

63 

64 

65def url_is_absolute(url: str) -> bool: 

66 """Determine whether a given URL is absolute""" 

67 return urlparse(url).path.startswith("/") 

68 

69 

70def path2url(path: str) -> str: 

71 """Convert a local file path to a URL""" 

72 pieces = [quote(p) for p in path.split(os.sep)] 

73 # preserve trailing / 

74 if pieces[-1] == "": 

75 pieces[-1] = "/" 

76 url = url_path_join(*pieces) 

77 return url 

78 

79 

80def url2path(url: str) -> str: 

81 """Convert a URL to a local file path""" 

82 pieces = [unquote(p) for p in url.split("/")] 

83 path = os.path.join(*pieces) 

84 return path 

85 

86 

87def url_escape(path: str) -> str: 

88 """Escape special characters in a URL path 

89 

90 Turns '/foo bar/' into '/foo%20bar/' 

91 """ 

92 parts = path.split("/") 

93 return "/".join([quote(p) for p in parts]) 

94 

95 

96def url_unescape(path: str) -> str: 

97 """Unescape special characters in a URL path 

98 

99 Turns '/foo%20bar/' into '/foo bar/' 

100 """ 

101 return "/".join([unquote(p) for p in path.split("/")]) 

102 

103 

104def samefile_simple(path: str, other_path: str) -> bool: 

105 """ 

106 Fill in for os.path.samefile when it is unavailable (Windows+py2). 

107 

108 Do a case-insensitive string comparison in this case 

109 plus comparing the full stat result (including times) 

110 because Windows + py2 doesn't support the stat fields 

111 needed for identifying if it's the same file (st_ino, st_dev). 

112 

113 Only to be used if os.path.samefile is not available. 

114 

115 Parameters 

116 ---------- 

117 path : str 

118 representing a path to a file 

119 other_path : str 

120 representing a path to another file 

121 

122 Returns 

123 ------- 

124 same: Boolean that is True if both path and other path are the same 

125 """ 

126 path_stat = os.stat(path) 

127 other_path_stat = os.stat(other_path) 

128 return path.lower() == other_path.lower() and path_stat == other_path_stat 

129 

130 

131def to_os_path(path: ApiPath, root: str = "") -> str: 

132 """Convert an API path to a filesystem path 

133 

134 If given, root will be prepended to the path. 

135 root must be a filesystem path already. 

136 """ 

137 parts = str(path).strip("/").split("/") 

138 parts = [p for p in parts if p != ""] # remove duplicate splits 

139 path_ = os.path.join(root, *parts) 

140 return os.path.normpath(path_) 

141 

142 

143def to_api_path(os_path: str, root: str = "") -> ApiPath: 

144 """Convert a filesystem path to an API path 

145 

146 If given, root will be removed from the path. 

147 root must be a filesystem path already. 

148 """ 

149 if os_path.startswith(root): 

150 os_path = os_path[len(root) :] 

151 parts = os_path.strip(os.path.sep).split(os.path.sep) 

152 parts = [p for p in parts if p != ""] # remove duplicate splits 

153 path = "/".join(parts) 

154 return ApiPath(path) 

155 

156 

157def check_version(v: str, check: str) -> bool: 

158 """check version string v >= check 

159 

160 If dev/prerelease tags result in TypeError for string-number comparison, 

161 it is assumed that the dependency is satisfied. 

162 Users on dev branches are responsible for keeping their own packages up to date. 

163 """ 

164 try: 

165 return bool(Version(v) >= Version(check)) 

166 except TypeError: 

167 return True 

168 

169 

170# Copy of IPython.utils.process.check_pid: 

171 

172 

173def _check_pid_win32(pid: int) -> bool: 

174 import ctypes 

175 

176 # OpenProcess returns 0 if no such process (of ours) exists 

177 # positive int otherwise 

178 return bool(ctypes.windll.kernel32.OpenProcess(1, 0, pid)) # type:ignore[attr-defined] 

179 

180 

181def _check_pid_posix(pid: int) -> bool: 

182 """Copy of IPython.utils.process.check_pid""" 

183 try: 

184 os.kill(pid, 0) 

185 except OSError as err: 

186 if err.errno == errno.ESRCH: 

187 return False 

188 elif err.errno == errno.EPERM: 

189 # Don't have permission to signal the process - probably means it exists 

190 return True 

191 raise 

192 else: 

193 return True 

194 

195 

196if sys.platform == "win32": 

197 check_pid = _check_pid_win32 

198else: 

199 check_pid = _check_pid_posix 

200 

201 

202async def run_sync_in_loop(maybe_async): 

203 """**DEPRECATED**: Use ``ensure_async`` from jupyter_core instead.""" 

204 warnings.warn( 

205 "run_sync_in_loop is deprecated since Jupyter Server 2.0, use 'ensure_async' from jupyter_core instead", 

206 DeprecationWarning, 

207 stacklevel=2, 

208 ) 

209 return ensure_async(maybe_async) 

210 

211 

212def urlencode_unix_socket_path(socket_path: str) -> str: 

213 """Encodes a UNIX socket path string from a socket path for the `http+unix` URI form.""" 

214 return socket_path.replace("/", "%2F") 

215 

216 

217def urldecode_unix_socket_path(socket_path: str) -> str: 

218 """Decodes a UNIX sock path string from an encoded sock path for the `http+unix` URI form.""" 

219 return socket_path.replace("%2F", "/") 

220 

221 

222def urlencode_unix_socket(socket_path: str) -> str: 

223 """Encodes a UNIX socket URL from a socket path for the `http+unix` URI form.""" 

224 return "http+unix://%s" % urlencode_unix_socket_path(socket_path) 

225 

226 

227def unix_socket_in_use(socket_path: str) -> bool: 

228 """Checks whether a UNIX socket path on disk is in use by attempting to connect to it.""" 

229 if not os.path.exists(socket_path): 

230 return False 

231 

232 try: 

233 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 

234 sock.connect(socket_path) 

235 except OSError: 

236 return False 

237 else: 

238 return True 

239 finally: 

240 sock.close() 

241 

242 

243@contextmanager 

244def _request_for_tornado_client( 

245 urlstring: str, method: str = "GET", body: Any = None, headers: Any = None 

246) -> Generator[HTTPRequest, None, None]: 

247 """A utility that provides a context that handles 

248 HTTP, HTTPS, and HTTP+UNIX request. 

249 Creates a tornado HTTPRequest object with a URL 

250 that tornado's HTTPClients can accept. 

251 If the request is made to a unix socket, temporarily 

252 configure the AsyncHTTPClient to resolve the URL 

253 and connect to the proper socket. 

254 """ 

255 parts = urlsplit(urlstring) 

256 if parts.scheme in ["http", "https"]: 

257 pass 

258 elif parts.scheme == "http+unix": 

259 # If unix socket, mimic HTTP. 

260 parts = SplitResult( 

261 scheme="http", 

262 netloc=parts.netloc, 

263 path=parts.path, 

264 query=parts.query, 

265 fragment=parts.fragment, 

266 ) 

267 

268 class UnixSocketResolver(Resolver): 

269 """A resolver that routes HTTP requests to unix sockets 

270 in tornado HTTP clients. 

271 Due to constraints in Tornados' API, the scheme of the 

272 must be `http` (not `http+unix`). Applications should replace 

273 the scheme in URLS before making a request to the HTTP client. 

274 """ 

275 

276 def initialize(self, resolver): 

277 self.resolver = resolver 

278 

279 def close(self): 

280 self.resolver.close() 

281 

282 async def resolve(self, host, port, *args, **kwargs): 

283 return [(socket.AF_UNIX, urldecode_unix_socket_path(host))] 

284 

285 resolver = UnixSocketResolver(resolver=Resolver()) 

286 AsyncHTTPClient.configure(None, resolver=resolver) 

287 else: 

288 msg = "Unknown URL scheme." 

289 raise Exception(msg) 

290 

291 # Yield the request for the given client. 

292 url = urlunsplit(parts) 

293 request = HTTPRequest(url, method=method, body=body, headers=headers, validate_cert=False) 

294 yield request 

295 

296 

297def fetch( 

298 urlstring: str, method: str = "GET", body: Any = None, headers: Any = None 

299) -> HTTPResponse: 

300 """ 

301 Send a HTTP, HTTPS, or HTTP+UNIX request 

302 to a Tornado Web Server. Returns a tornado HTTPResponse. 

303 """ 

304 with _request_for_tornado_client( 

305 urlstring, method=method, body=body, headers=headers 

306 ) as request: 

307 response = HTTPClient(AsyncHTTPClient).fetch(request) 

308 return response 

309 

310 

311async def async_fetch( 

312 urlstring: str, method: str = "GET", body: Any = None, headers: Any = None, io_loop: Any = None 

313) -> HTTPResponse: 

314 """ 

315 Send an asynchronous HTTP, HTTPS, or HTTP+UNIX request 

316 to a Tornado Web Server. Returns a tornado HTTPResponse. 

317 """ 

318 with _request_for_tornado_client( 

319 urlstring, method=method, body=body, headers=headers 

320 ) as request: 

321 response = await AsyncHTTPClient(io_loop).fetch(request) 

322 return response 

323 

324 

325def is_namespace_package(namespace: str) -> bool | None: 

326 """Is the provided namespace a Python Namespace Package (PEP420). 

327 

328 https://www.python.org/dev/peps/pep-0420/#specification 

329 

330 Returns `None` if module is not importable. 

331 

332 """ 

333 # NOTE: using submodule_search_locations because the loader can be None 

334 try: 

335 spec = importlib.util.find_spec(namespace) 

336 except ValueError: # spec is not set - see https://docs.python.org/3/library/importlib.html#importlib.util.find_spec 

337 return None 

338 

339 if not spec: 

340 # e.g. module not installed 

341 return None 

342 return isinstance(spec.submodule_search_locations, _NamespacePath) 

343 

344 

345def filefind(filename: str, path_dirs: Sequence[str]) -> str: 

346 """Find a file by looking through a sequence of paths. 

347 

348 For use in FileFindHandler. 

349 

350 Iterates through a sequence of paths looking for a file and returns 

351 the full, absolute path of the first occurrence of the file. 

352 

353 Absolute paths are not accepted for inputs. 

354 

355 This function does not automatically try any paths, 

356 such as the cwd or the user's home directory. 

357 

358 Parameters 

359 ---------- 

360 filename : str 

361 The filename to look for. Must be a relative path. 

362 path_dirs : sequence of str 

363 The sequence of paths to look in for the file. 

364 Walk through each element and join with ``filename``. 

365 Only after ensuring the path resolves within the directory is it checked for existence. 

366 

367 Returns 

368 ------- 

369 Raises :exc:`OSError` or returns absolute path to file. 

370 """ 

371 file_path = Path(filename) 

372 

373 # If the input is an absolute path, reject it 

374 if file_path.is_absolute(): 

375 msg = f"{filename} is absolute, filefind only accepts relative paths." 

376 raise OSError(msg) 

377 

378 for path_str in path_dirs: 

379 path = Path(path_str).absolute() 

380 test_path = path / file_path 

381 # os.path.abspath resolves '..', but Path.absolute() doesn't 

382 # Path.resolve() does, but traverses symlinks, which we don't want 

383 test_path = Path(os.path.abspath(test_path)) 

384 if not test_path.is_relative_to(path): 

385 # points outside root, e.g. via `filename='../foo'` 

386 continue 

387 # make sure we don't call is_file before we know it's a file within a prefix 

388 # GHSA-hrw6-wg82-cm62 - can leak password hash on windows. 

389 if test_path.is_file(): 

390 return os.path.abspath(test_path) 

391 

392 msg = f"File {filename!r} does not exist in any of the search paths: {path_dirs!r}" 

393 raise OSError(msg) 

394 

395 

396def import_item(name: str) -> Any: 

397 """Import and return ``bar`` given the string ``foo.bar``. 

398 Calling ``bar = import_item("foo.bar")`` is the functional equivalent of 

399 executing the code ``from foo import bar``. 

400 Parameters 

401 ---------- 

402 name : str 

403 The fully qualified name of the module/package being imported. 

404 Returns 

405 ------- 

406 mod : module object 

407 The module that was imported. 

408 """ 

409 

410 parts = name.rsplit(".", 1) 

411 if len(parts) == 2: 

412 # called with 'foo.bar....' 

413 package, obj = parts 

414 module = __import__(package, fromlist=[obj]) 

415 try: 

416 pak = getattr(module, obj) 

417 except AttributeError as e: 

418 raise ImportError("No module named %s" % obj) from e 

419 return pak 

420 else: 

421 # called with un-dotted string 

422 return __import__(parts[0]) 

423 

424 

425class JupyterServerAuthWarning(RuntimeWarning): 

426 """Emitted when authentication configuration issue is detected. 

427 

428 Intended for filtering out expected warnings in tests, including 

429 downstream tests, rather than for users to silence this warning. 

430 """