Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_server/utils.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

176 statements  

1"""Notebook related utilities""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5from __future__ import annotations 

6 

7import errno 

8import importlib.util 

9import os 

10import socket 

11import sys 

12import warnings 

13from _frozen_importlib_external import _NamespacePath 

14from contextlib import contextmanager 

15from pathlib import Path 

16from typing import Any, Generator, NewType, Sequence 

17from urllib.parse import ( 

18 SplitResult, 

19 quote, 

20 unquote, 

21 urlparse, 

22 urlsplit, 

23 urlunsplit, 

24) 

25from urllib.parse import ( 

26 urljoin as _urljoin, 

27) 

28from urllib.request import pathname2url as _pathname2url 

29 

30from jupyter_core.utils import ensure_async as _ensure_async 

31from packaging.version import Version 

32from tornado.httpclient import AsyncHTTPClient, HTTPClient, HTTPRequest, HTTPResponse 

33from tornado.netutil import Resolver 

34 

# ``ApiPath`` is a ``NewType`` over ``str``: at runtime it is a plain string,
# but type checkers treat it as distinct from filesystem paths. It is the
# parameter type of ``to_os_path`` and the return type of ``to_api_path``.
ApiPath = NewType("ApiPath", str)

# Re-export the privately imported helpers under public names so that callers
# can take them from this module.
urljoin = _urljoin
pathname2url = _pathname2url
ensure_async = _ensure_async

41 

42 

def url_path_join(*pieces: str) -> str:
    """Join components of url into a relative url

    Use to prevent double slash when joining subpath. This will leave the
    initial and final / in place.

    Parameters
    ----------
    *pieces : str
        URL fragments to join. Leading and trailing slashes on each
        fragment are ignored, and empty fragments are dropped.

    Returns
    -------
    The fragments joined with single ``/`` separators. The result starts
    with ``/`` only if the first fragment did and ends with ``/`` only if
    the last fragment did. Calling with no fragments returns ``""``.
    """
    if not pieces:
        # Nothing to join; avoid indexing into an empty tuple below.
        return ""

    initial = pieces[0].startswith("/")
    final = pieces[-1].endswith("/")
    stripped = [s.strip("/") for s in pieces]
    result = "/".join(s for s in stripped if s)
    if initial:
        result = "/" + result
    if final:
        result = result + "/"
    # Every fragment was empty or only slashes: collapse "//" to the root.
    if result == "//":
        result = "/"
    return result

60 

61 

def url_is_absolute(url: str) -> bool:
    """Report whether the path component of ``url`` begins with a slash.

    Only the parsed path is inspected; a URL with a scheme and host but an
    empty path is therefore reported as not absolute.
    """
    path_component = urlparse(url).path
    return path_component[:1] == "/"

65 

66 

def path2url(path: str) -> str:
    """Turn a local file path into a URL path.

    The path is split on ``os.sep``, each component is percent-quoted, and
    the components are re-joined with ``url_path_join``.
    """
    segments = list(map(quote, path.split(os.sep)))
    # An empty last component means the path ended with a separator;
    # substitute "/" so that the join keeps the trailing slash.
    if not segments[-1]:
        segments[-1] = "/"
    return url_path_join(*segments)

75 

76 

def url2path(url: str) -> str:
    """Turn a URL path into a local file path.

    The URL is split on ``/``, each component is percent-decoded, and the
    components are combined with ``os.path.join``.
    """
    decoded = (unquote(segment) for segment in url.split("/"))
    return os.path.join(*decoded)

82 

83 

def url_escape(path: str) -> str:
    """Percent-quote each ``/``-separated component of a URL path.

    The slashes themselves are kept, so '/foo bar/' becomes '/foo%20bar/'.
    """
    return "/".join(map(quote, path.split("/")))

91 

92 

def url_unescape(path: str) -> str:
    """Percent-decode each ``/``-separated component of a URL path.

    This is the inverse of ``url_escape``: '/foo%20bar/' becomes '/foo bar/'.
    """
    decoded_segments = map(unquote, path.split("/"))
    return "/".join(decoded_segments)

99 

100 

def samefile_simple(path: str, other_path: str) -> bool:
    """
    Approximate ``os.path.samefile`` where it is unavailable (Windows+py2).

    Two paths are treated as the same file when their strings match
    case-insensitively and their full ``os.stat`` results (times
    included) are equal. This is the fallback because Windows + py2 does
    not provide the stat fields (st_ino, st_dev) that identify a file.

    Only to be used if os.path.samefile is not available.

    Parameters
    ----------
    path : str
        representing a path to a file
    other_path : str
        representing a path to another file

    Returns
    -------
    same: Boolean that is True if both path and other path are the same
    """
    # Stat both paths up front so a missing file raises in either case,
    # regardless of how the string comparison turns out.
    first_stat = os.stat(path)
    second_stat = os.stat(other_path)
    if path.lower() != other_path.lower():
        return False
    return first_stat == second_stat

126 

127 

def to_os_path(path: ApiPath, root: str = "") -> str:
    """Translate a ``/``-separated API path into a filesystem path.

    When ``root`` is supplied it is prepended to the result; it must
    already be a filesystem path. The combined path is normalised with
    ``os.path.normpath`` before being returned.
    """
    # Empty components (from leading, trailing or doubled slashes) are dropped.
    segments = [segment for segment in str(path).strip("/").split("/") if segment]
    return os.path.normpath(os.path.join(root, *segments))

138 

139 

def to_api_path(os_path: str, root: str = "") -> ApiPath:
    """Convert a filesystem path to an API path

    If given, root will be removed from the path.
    root must be a filesystem path already.

    The root is only removed when it matches whole path components, i.e.
    when it ends with a path separator, equals ``os_path``, or is followed
    by a separator in ``os_path``. A root of ``/home/user`` is therefore
    not stripped from ``/home/username/x``.

    Returns
    -------
    The remaining components joined with ``/``, with no leading or
    trailing slash.
    """
    # Accept the alternate separator too (e.g. "/" on Windows) when deciding
    # whether the root ends on a component boundary.
    boundaries = tuple(s for s in (os.path.sep, os.path.altsep) if s)
    if root and os_path.startswith(root):
        remainder = os_path[len(root) :]
        if not remainder or root.endswith(boundaries) or remainder.startswith(boundaries):
            os_path = remainder
    parts = os_path.strip(os.path.sep).split(os.path.sep)
    parts = [p for p in parts if p != ""]  # remove duplicate splits
    path = "/".join(parts)
    return ApiPath(path)

152 

153 

def check_version(v: str, check: str) -> bool:
    """Return whether version string ``v`` is at least ``check``.

    Both strings are parsed with ``Version`` and compared. If the
    comparison raises ``TypeError`` (the case attributed to dev/prerelease
    tags), the dependency is assumed to be satisfied: users on dev
    branches are responsible for keeping their own packages up to date.
    """
    try:
        satisfied = Version(v) >= Version(check)
    except TypeError:
        return True
    return bool(satisfied)

165 

166 

167# Copy of IPython.utils.process.check_pid: 

168 

169 

def _check_pid_win32(pid: int) -> bool:
    """Return whether a process with ``pid`` can be opened (Windows only).

    Copy of IPython.utils.process.check_pid, with the process handle
    released after the check so that repeated calls do not leak handles.
    """
    import ctypes

    kernel32 = ctypes.windll.kernel32  # type:ignore[attr-defined]
    # OpenProcess returns 0 if no such process (of ours) exists
    # positive int otherwise
    handle = kernel32.OpenProcess(1, 0, pid)
    if not handle:
        return False
    # The handle was only needed as an existence probe; close it again.
    kernel32.CloseHandle(handle)
    return True

176 

177 

def _check_pid_posix(pid: int) -> bool:
    """Return whether a process with ``pid`` exists (POSIX).

    Copy of IPython.utils.process.check_pid. Signal 0 is sent with
    ``os.kill``, which performs the existence and permission checks
    without delivering a signal.
    """
    try:
        os.kill(pid, 0)
    except OSError as exc:
        if exc.errno == errno.ESRCH:
            # No such process.
            return False
        if exc.errno == errno.EPERM:
            # Don't have permission to signal the process - probably means it exists
            return True
        # Any other failure is unexpected; let it propagate.
        raise
    return True

191 

192 

# Bind the public name to the implementation that matches the platform.
check_pid = _check_pid_win32 if sys.platform == "win32" else _check_pid_posix

197 

198 

async def run_sync_in_loop(maybe_async):
    """**DEPRECATED**: Use ``ensure_async`` from jupyter_core instead.

    Emits a ``DeprecationWarning`` attributed to the caller and hands the
    argument to ``ensure_async``.
    """
    message = (
        "run_sync_in_loop is deprecated since Jupyter Server 2.0, use 'ensure_async' from jupyter_core instead"
    )
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    # NOTE(review): the value from ensure_async is returned as-is and is not
    # awaited here - confirm callers expect to await the result themselves.
    return ensure_async(maybe_async)

207 

208 

def urlencode_unix_socket_path(socket_path: str) -> str:
    """Encode a UNIX socket path for the `http+unix` URI form.

    Every ``/`` in the path is replaced by ``%2F``; no other character is
    touched.
    """
    return "%2F".join(socket_path.split("/"))

212 

213 

def urldecode_unix_socket_path(socket_path: str) -> str:
    """Decode a UNIX socket path taken from the `http+unix` URI form.

    Every ``%2F`` in the encoded path is turned back into ``/``; no other
    escape sequence is decoded.
    """
    return "/".join(socket_path.split("%2F"))

217 

218 

def urlencode_unix_socket(socket_path: str) -> str:
    """Build the `http+unix` URL for a UNIX socket path.

    The path is encoded with ``urlencode_unix_socket_path`` and prefixed
    with the ``http+unix://`` scheme.
    """
    encoded_path = urlencode_unix_socket_path(socket_path)
    return f"http+unix://{encoded_path}"

222 

223 

def unix_socket_in_use(socket_path: str) -> bool:
    """Checks whether a UNIX socket path on disk is in use by attempting to connect to it.

    Parameters
    ----------
    socket_path : str
        Filesystem path of the UNIX socket to probe.

    Returns
    -------
    True if a connection to the socket succeeds. False if the path does
    not exist, the probe socket cannot be created, or the connection
    attempt fails with ``OSError``.
    """
    if not os.path.exists(socket_path):
        return False

    try:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    except OSError:
        # Could not even create the probe socket; there is nothing to close.
        return False

    # The context manager closes the probe socket on every exit path.
    with sock:
        try:
            sock.connect(socket_path)
        except OSError:
            return False
    return True

238 

239 

@contextmanager
def _request_for_tornado_client(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
) -> Generator[HTTPRequest, None, None]:
    """Context manager that yields a tornado ``HTTPRequest`` for an
    HTTP, HTTPS, or HTTP+UNIX URL.

    The yielded request carries a URL that tornado's HTTP clients can
    accept. For an ``http+unix`` URL the scheme is rewritten to ``http``
    and ``AsyncHTTPClient`` is configured with a resolver that maps the
    host part of the URL to the decoded UNIX socket path.

    Parameters
    ----------
    urlstring : str
        URL with scheme ``http``, ``https`` or ``http+unix``.
    method, body, headers
        Passed through to ``HTTPRequest``.

    Raises
    ------
    Exception
        If the URL scheme is none of the three supported ones.
    """
    parts = urlsplit(urlstring)
    if parts.scheme in ["http", "https"]:
        # Plain HTTP(S) URLs are used unchanged.
        pass
    elif parts.scheme == "http+unix":
        # If unix socket, mimic HTTP.
        parts = SplitResult(
            scheme="http",
            netloc=parts.netloc,
            path=parts.path,
            query=parts.query,
            fragment=parts.fragment,
        )

        class UnixSocketResolver(Resolver):
            """A resolver that routes HTTP requests to unix sockets
            in tornado HTTP clients.
            Due to constraints in Tornado's API, the scheme of the URL
            must be `http` (not `http+unix`). Applications should replace
            the scheme in URLs before making a request to the HTTP client.
            """

            def initialize(self, resolver):
                # Keep the wrapped resolver so close() can be forwarded to it.
                self.resolver = resolver

            def close(self):
                self.resolver.close()

            async def resolve(self, host, port, *args, **kwargs):
                # The host is the encoded socket path; the port is ignored.
                return [(socket.AF_UNIX, urldecode_unix_socket_path(host))]

        resolver = UnixSocketResolver(resolver=Resolver())
        # NOTE(review): nothing after the yield below undoes this configure
        # call, so the resolver appears to stay installed on AsyncHTTPClient
        # after the context exits - confirm whether that is intended.
        AsyncHTTPClient.configure(None, resolver=resolver)
    else:
        msg = "Unknown URL scheme."
        raise Exception(msg)

    # Yield the request for the given client.
    url = urlunsplit(parts)
    # NOTE(review): validate_cert=False is passed for every request, including
    # https ones - confirm this is acceptable for all callers.
    request = HTTPRequest(url, method=method, body=body, headers=headers, validate_cert=False)
    yield request

292 

293 

def fetch(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None
) -> HTTPResponse:
    """
    Send a HTTP, HTTPS, or HTTP+UNIX request
    to a Tornado Web Server. Returns a tornado HTTPResponse.

    The request is built by ``_request_for_tornado_client`` and sent with a
    blocking ``HTTPClient``, which is closed again before returning so the
    resources it holds are released promptly.

    Parameters
    ----------
    urlstring : str
        URL with scheme ``http``, ``https`` or ``http+unix``.
    method, body, headers
        Passed through to the request.
    """
    with _request_for_tornado_client(
        urlstring, method=method, body=body, headers=headers
    ) as request:
        client = HTTPClient(AsyncHTTPClient)
        try:
            response = client.fetch(request)
        finally:
            # Release the client even when the fetch raises.
            client.close()
    return response

306 

307 

async def async_fetch(
    urlstring: str, method: str = "GET", body: Any = None, headers: Any = None, io_loop: Any = None
) -> HTTPResponse:
    """
    Asynchronously send an HTTP, HTTPS, or HTTP+UNIX request
    to a Tornado Web Server and return the tornado HTTPResponse.

    The request is built by ``_request_for_tornado_client`` and awaited on
    an ``AsyncHTTPClient``.
    """
    request_context = _request_for_tornado_client(
        urlstring, method=method, body=body, headers=headers
    )
    with request_context as request:
        # NOTE(review): io_loop is passed positionally - verify it lands on the
        # intended parameter of the installed Tornado's AsyncHTTPClient.
        client = AsyncHTTPClient(io_loop)
        return await client.fetch(request)

320 

321 

def is_namespace_package(namespace: str) -> bool | None:
    """Is the provided namespace a Python Namespace Package (PEP420).

    https://www.python.org/dev/peps/pep-0420/#specification

    Returns `None` if module is not importable. That covers a module that
    is not installed, a dotted name whose parent is missing or is not a
    package, and a module whose spec is not set.

    """
    # NOTE: using submodule_search_locations because the loader can be None
    try:
        spec = importlib.util.find_spec(namespace)
    except ValueError:  # spec is not set - see https://docs.python.org/3/library/importlib.html#importlib.util.find_spec
        return None
    except ImportError:
        # For dotted names find_spec imports the parent first, which raises
        # ModuleNotFoundError when the parent is missing or not a package.
        return None

    if not spec:
        # e.g. module not installed
        return None
    return isinstance(spec.submodule_search_locations, _NamespacePath)

340 

341 

def filefind(filename: str, path_dirs: Sequence[str]) -> str:
    """Locate a relative file name within a sequence of directories.

    For use in FileFindHandler.

    Each directory is tried in order and the full, absolute path of the
    first match is returned.

    Absolute paths are not accepted for inputs.

    No location is searched implicitly: neither the cwd nor the user's
    home directory is tried unless it is listed in ``path_dirs``.

    Parameters
    ----------
    filename : str
        The filename to look for. Must be a relative path.
    path_dirs : sequence of str
        The directories to search. Each one is joined with ``filename``;
        the result is checked for existence only after it has been
        confirmed to lie within that directory.

    Returns
    -------
    Raises :exc:`OSError` or returns absolute path to file.
    """
    relative = Path(filename)

    # Absolute inputs are rejected outright.
    if relative.is_absolute():
        msg = f"{filename} is absolute, filefind only accepts relative paths."
        raise OSError(msg)

    for directory in path_dirs:
        root = Path(directory).absolute()
        # os.path.abspath collapses '..' without following symlinks;
        # Path.absolute() keeps '..' and Path.resolve() would traverse
        # symlinks, which is not wanted here.
        candidate = Path(os.path.abspath(root / relative))
        try:
            candidate.relative_to(root)
        except ValueError:
            # points outside root, e.g. via `filename='../foo'`
            continue
        # Only touch the filesystem once the candidate is known to be
        # inside the search directory.
        # GHSA-hrw6-wg82-cm62 - can leak password hash on windows.
        if candidate.is_file():
            return os.path.abspath(candidate)

    msg = f"File {filename!r} does not exist in any of the search paths: {path_dirs!r}"
    raise OSError(msg)

399 

400 

def import_item(name: str) -> Any:
    """Import and return ``bar`` given the string ``foo.bar``.

    ``bar = import_item("foo.bar")`` is the functional equivalent of
    ``from foo import bar``. A name without a dot is imported as a
    top-level module.

    Parameters
    ----------
    name : str
        The fully qualified name of the module/package being imported.

    Returns
    -------
    mod : module object
        The module that was imported.

    Raises
    ------
    ImportError
        If the part after the last dot is not an attribute of the
        imported package.
    """
    package, dot, obj = name.rpartition(".")
    if not dot:
        # called with un-dotted string
        return __import__(name)

    # called with 'foo.bar....'
    module = __import__(package, fromlist=[obj])
    try:
        return getattr(module, obj)
    except AttributeError as e:
        raise ImportError(f"No module named {obj}") from e

428 

429 

class JupyterServerAuthWarning(RuntimeWarning):
    """Warning category for a detected authentication configuration issue.

    It exists so that tests, including downstream ones, can filter out
    warnings they expect; it is not meant as a way for users to silence
    the warning.
    """