Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_server/base/handlers.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

561 statements  

1"""Base Tornado handlers for the Jupyter server.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5from __future__ import annotations 

6 

7import functools 

8import inspect 

9import ipaddress 

10import json 

11import mimetypes 

12import os 

13import re 

14import types 

15import warnings 

16from http.client import responses 

17from logging import Logger 

18from typing import TYPE_CHECKING, Any, Awaitable, Coroutine, Sequence, cast 

19from urllib.parse import urlparse 

20 

21import prometheus_client 

22from jinja2 import TemplateNotFound 

23from jupyter_core.paths import is_hidden 

24from tornado import web 

25from tornado.log import app_log 

26from traitlets.config import Application 

27 

28import jupyter_server 

29from jupyter_server import CallContext 

30from jupyter_server._sysinfo import get_sys_info 

31from jupyter_server._tz import utcnow 

32from jupyter_server.auth.decorator import allow_unauthenticated, authorized 

33from jupyter_server.auth.identity import User 

34from jupyter_server.i18n import combine_translations 

35from jupyter_server.services.security import csp_report_uri 

36from jupyter_server.utils import ( 

37 ensure_async, 

38 filefind, 

39 url_escape, 

40 url_is_absolute, 

41 url_path_join, 

42 urldecode_unix_socket_path, 

43) 

44 

45if TYPE_CHECKING: 

46 from jupyter_client.kernelspec import KernelSpecManager 

47 from jupyter_events import EventLogger 

48 from jupyter_server_terminals.terminalmanager import TerminalManager 

49 from tornado.concurrent import Future 

50 

51 from jupyter_server.auth.authorizer import Authorizer 

52 from jupyter_server.auth.identity import IdentityProvider 

53 from jupyter_server.serverapp import ServerApp 

54 from jupyter_server.services.config.manager import ConfigManager 

55 from jupyter_server.services.contents.manager import ContentsManager 

56 from jupyter_server.services.kernels.kernelmanager import AsyncMappingKernelManager 

57 from jupyter_server.services.sessions.sessionmanager import SessionManager 

58 

59# ----------------------------------------------------------------------------- 

60# Top-level handlers 

61# ----------------------------------------------------------------------------- 

62 

63_sys_info_cache = None 

64 

65 

def json_sys_info():
    """Return the system information serialized as a JSON string.

    The string is built on the first call and stored in the module-level
    ``_sys_info_cache``; later calls return the stored value.
    """
    global _sys_info_cache  # noqa: PLW0603
    if _sys_info_cache is not None:
        # already serialized by an earlier call
        return _sys_info_cache
    _sys_info_cache = json.dumps(get_sys_info())
    return _sys_info_cache

72 

73 

def log() -> Logger:
    """Return the logger to use for server messages.

    Uses the log of the initialized traitlets ``Application`` singleton when
    there is one, and tornado's ``app_log`` otherwise.
    """
    if not Application.initialized():
        return app_log
    return cast(Logger, Application.instance().log)

80 

81 

class AuthenticatedHandler(web.RequestHandler):
    """A RequestHandler with an authenticated user."""

    @property
    def base_url(self) -> str:
        """The ``base_url`` setting, defaulting to ``"/"``."""
        return cast(str, self.settings.get("base_url", "/"))

    @property
    def content_security_policy(self) -> str:
        """The default Content-Security-Policy header

        Can be overridden by defining Content-Security-Policy in settings['headers']
        """
        if "Content-Security-Policy" in self.settings.get("headers", {}):
            # user-specified, don't override
            return cast(str, self.settings["headers"]["Content-Security-Policy"])

        return "; ".join(
            [
                "frame-ancestors 'self'",
                # Make sure the report-uri is relative to the base_url
                "report-uri "
                + self.settings.get("csp_report_uri", url_path_join(self.base_url, csp_report_uri)),
            ]
        )

    def set_default_headers(self) -> None:
        """Set the default headers.

        Starts from ``X-Content-Type-Options: nosniff``, layers the
        user-configured ``settings["headers"]`` on top, and always sets the
        Content-Security-Policy from :attr:`content_security_policy`.
        """
        headers = {}
        headers["X-Content-Type-Options"] = "nosniff"
        headers.update(self.settings.get("headers", {}))

        headers["Content-Security-Policy"] = self.content_security_policy

        # Allow for overriding headers
        for header_name, value in headers.items():
            try:
                self.set_header(header_name, value)
            except Exception as e:
                # tornado raise Exception (not a subclass)
                # if method is unsupported (websocket and Access-Control-Allow-Origin
                # for example, so just ignore)
                # This class defines no ``log`` attribute itself (subclasses
                # such as JupyterHandler do), so fall back to the module-level
                # logger instead of failing inside the error handler.
                logger = getattr(self, "log", None) or log()
                logger.exception("Could not set default headers: %s", e)

    @property
    def cookie_name(self) -> str:
        """Deprecated: the login cookie name, as reported by the identity provider."""
        warnings.warn(
            """JupyterHandler.cookie_name is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        return self.identity_provider.get_cookie_name(self)

    def force_clear_cookie(self, name: str, path: str = "/", domain: str | None = None) -> None:
        """Force a cookie clear.

        Deprecated: delegates to the identity provider.
        """
        warnings.warn(
            """JupyterHandler.force_clear_cookie is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        self.identity_provider._force_clear_cookie(self, name, path=path, domain=domain)

    def clear_login_cookie(self) -> None:
        """Clear a login cookie.

        Deprecated: delegates to the identity provider.
        """
        warnings.warn(
            """JupyterHandler.clear_login_cookie is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        self.identity_provider.clear_login_cookie(self)

    def get_current_user(self) -> str:
        """Get the current user.

        Deprecated; use ``self.current_user``.

        Raises:
            RuntimeError: if ``_jupyter_current_user`` has not been set yet.
        """
        clsname = self.__class__.__name__
        msg = (
            f"Calling `{clsname}.get_current_user()` directly is deprecated in jupyter-server 2.0."
            " Use `self.current_user` instead (works in all versions)."
        )
        if hasattr(self, "_jupyter_current_user"):
            # backward-compat: return _jupyter_current_user
            warnings.warn(
                msg,
                DeprecationWarning,
                stacklevel=2,
            )
            return cast(str, self._jupyter_current_user)
        # haven't called get_user in prepare, raise
        raise RuntimeError(msg)

    def skip_check_origin(self) -> bool:
        """Ask my identity provider if I should skip the origin_check

        OPTIONS requests always skip the check; otherwise the answer is the
        negation of ``identity_provider.should_check_origin(self)``.
        """
        if self.request.method == "OPTIONS":
            # no origin-check on options requests, which are used to check origins!
            return True
        return not self.identity_provider.should_check_origin(self)

    @property
    def token_authenticated(self) -> bool:
        """Have I been authenticated with a token?"""
        return self.identity_provider.is_token_authenticated(self)

    @property
    def logged_in(self) -> bool:
        """Is a user currently logged in?"""
        user = self.current_user
        return bool(user and user != "anonymous")

    @property
    def login_handler(self) -> Any:
        """Return the login handler for this application, if any."""
        warnings.warn(
            """JupyterHandler.login_handler is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        return self.identity_provider.login_handler_class

    @property
    def token(self) -> str | None:
        """Return the login token for this application, if any."""
        return self.identity_provider.token

    @property
    def login_available(self) -> bool:
        """May a user proceed to log in?

        This returns True if login capability is available, irrespective of
        whether the user is already logged in or not.

        """
        return cast(bool, self.identity_provider.login_available)

    @property
    def authorizer(self) -> Authorizer:
        """The ``authorizer`` from settings.

        When the key is missing, a warning is issued and an
        ``AllowAllAuthorizer`` is created and stored in the settings.
        """
        if "authorizer" not in self.settings:
            warnings.warn(
                "The Tornado web application does not have an 'authorizer' defined "
                "in its settings. In future releases of jupyter_server, this will "
                "be a required key for all subclasses of `JupyterHandler`. For an "
                "example, see the jupyter_server source code for how to "
                "add an authorizer to the tornado settings: "
                "https://github.com/jupyter-server/jupyter_server/blob/"
                "653740cbad7ce0c8a8752ce83e4d3c2c754b13cb/jupyter_server/serverapp.py"
                "#L234-L256",
                stacklevel=2,
            )
            from jupyter_server.auth import AllowAllAuthorizer

            self.settings["authorizer"] = AllowAllAuthorizer(
                config=self.settings.get("config", None),
                identity_provider=self.identity_provider,
            )

        return cast("Authorizer", self.settings.get("authorizer"))

    @property
    def identity_provider(self) -> IdentityProvider:
        """The ``identity_provider`` from settings.

        When the key is missing, a warning is issued and a default
        ``IdentityProvider`` is created and stored in the settings.
        """
        if "identity_provider" not in self.settings:
            warnings.warn(
                "The Tornado web application does not have an 'identity_provider' defined "
                "in its settings. In future releases of jupyter_server, this will "
                "be a required key for all subclasses of `JupyterHandler`. For an "
                "example, see the jupyter_server source code for how to "
                "add an identity provider to the tornado settings: "
                "https://github.com/jupyter-server/jupyter_server/blob/v2.0.0/"
                "jupyter_server/serverapp.py#L242",
                stacklevel=2,
            )
            from jupyter_server.auth import IdentityProvider

            # no identity provider set, load default
            self.settings["identity_provider"] = IdentityProvider(
                config=self.settings.get("config", None)
            )
        return cast("IdentityProvider", self.settings["identity_provider"])

271 

272 

class JupyterHandler(AuthenticatedHandler):
    """Jupyter-specific extensions to authenticated handling

    Mostly property shortcuts to Jupyter-specific settings.
    """

    @property
    def config(self) -> dict[str, Any] | None:
        """The ``config`` setting, or None when it is not set."""
        return cast("dict[str, Any] | None", self.settings.get("config", None))

    @property
    def log(self) -> Logger:
        """use the Jupyter log by default, falling back on tornado's logger"""
        return log()

    @property
    def jinja_template_vars(self) -> dict[str, Any]:
        """User-supplied values to supply to jinja templates."""
        return cast("dict[str, Any]", self.settings.get("jinja_template_vars", {}))

    @property
    def serverapp(self) -> ServerApp | None:
        """The ``serverapp`` setting (KeyError if the key is absent)."""
        return cast("ServerApp | None", self.settings["serverapp"])

    # ---------------------------------------------------------------
    # URLs
    # ---------------------------------------------------------------

    @property
    def version_hash(self) -> str:
        """The version hash to use for cache hints for static files"""
        return cast(str, self.settings.get("version_hash", ""))

    @property
    def mathjax_url(self) -> str:
        """The ``mathjax_url`` setting; non-absolute values are joined onto base_url."""
        url = cast(str, self.settings.get("mathjax_url", ""))
        if not url or url_is_absolute(url):
            return url
        return url_path_join(self.base_url, url)

    @property
    def mathjax_config(self) -> str:
        """The ``mathjax_config`` setting, with a built-in default."""
        return cast(str, self.settings.get("mathjax_config", "TeX-AMS-MML_HTMLorMML-full,Safe"))

    @property
    def default_url(self) -> str:
        """The ``default_url`` setting, or an empty string."""
        return cast(str, self.settings.get("default_url", ""))

    @property
    def ws_url(self) -> str:
        """The ``websocket_url`` setting, or an empty string."""
        return cast(str, self.settings.get("websocket_url", ""))

    @property
    def contents_js_source(self) -> str:
        """The ``contents_js_source`` setting (default ``services/contents``).

        The value is also written to the debug log on every access.
        """
        # look the setting up once and reuse it for both the log and the return
        source = cast(str, self.settings.get("contents_js_source", "services/contents"))
        self.log.debug("Using contents: %s", source)
        return source

    # ---------------------------------------------------------------
    # Manager objects
    # ---------------------------------------------------------------

    @property
    def kernel_manager(self) -> AsyncMappingKernelManager:
        """The ``kernel_manager`` setting (required key)."""
        return cast("AsyncMappingKernelManager", self.settings["kernel_manager"])

    @property
    def contents_manager(self) -> ContentsManager:
        """The ``contents_manager`` setting (required key)."""
        return cast("ContentsManager", self.settings["contents_manager"])

    @property
    def session_manager(self) -> SessionManager:
        """The ``session_manager`` setting (required key)."""
        return cast("SessionManager", self.settings["session_manager"])

    @property
    def terminal_manager(self) -> TerminalManager:
        """The ``terminal_manager`` setting (required key)."""
        return cast("TerminalManager", self.settings["terminal_manager"])

    @property
    def kernel_spec_manager(self) -> KernelSpecManager:
        """The ``kernel_spec_manager`` setting (required key)."""
        return cast("KernelSpecManager", self.settings["kernel_spec_manager"])

    @property
    def config_manager(self) -> ConfigManager:
        """The ``config_manager`` setting (required key)."""
        return cast("ConfigManager", self.settings["config_manager"])

    @property
    def event_logger(self) -> EventLogger:
        """The ``event_logger`` setting (required key)."""
        return cast("EventLogger", self.settings["event_logger"])

    # ---------------------------------------------------------------
    # CORS
    # ---------------------------------------------------------------

    @property
    def allow_origin(self) -> str:
        """Normal Access-Control-Allow-Origin"""
        return cast(str, self.settings.get("allow_origin", ""))

    @property
    def allow_origin_pat(self) -> str | None:
        """Regular expression version of allow_origin"""
        return cast("str | None", self.settings.get("allow_origin_pat", None))

    @property
    def allow_credentials(self) -> bool:
        """Whether to set Access-Control-Allow-Credentials"""
        return cast(bool, self.settings.get("allow_credentials", False))

    def set_default_headers(self) -> None:
        """Set the default headers.

        Only defers to the parent class; CORS headers are added separately
        by :meth:`set_cors_headers`, which :meth:`prepare` calls.
        """
        super().set_default_headers()

    def set_cors_headers(self) -> None:
        """Add CORS headers, if defined

        Now that current_user is async (jupyter-server 2.0),
        must be called at the end of prepare(), instead of in set_default_headers.
        """
        if self.allow_origin:
            self.set_header("Access-Control-Allow-Origin", self.allow_origin)
        elif self.allow_origin_pat:
            origin = self.get_origin()
            if origin and re.match(self.allow_origin_pat, origin):
                self.set_header("Access-Control-Allow-Origin", origin)
        elif self.token_authenticated and "Access-Control-Allow-Origin" not in self.settings.get(
            "headers", {}
        ):
            # allow token-authenticated requests cross-origin by default.
            # only apply this exception if allow-origin has not been specified.
            self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))

        if self.allow_credentials:
            self.set_header("Access-Control-Allow-Credentials", "true")

    def set_attachment_header(self, filename: str) -> None:
        """Set Content-Disposition: attachment header

        As a method to ensure handling of filename encoding
        """
        escaped_filename = url_escape(filename)
        self.set_header(
            "Content-Disposition",
            f"attachment; filename*=utf-8''{escaped_filename}",
        )

    def get_origin(self) -> str | None:
        """Return the request's Origin header, or Sec-Websocket-Origin, or None."""
        # Handle WebSocket Origin naming convention differences
        # The difference between version 8 and 13 is that in 8 the
        # client sends a "Sec-Websocket-Origin" header and in 13 it's
        # simply "Origin".
        if "Origin" in self.request.headers:
            origin = self.request.headers.get("Origin")
        else:
            origin = self.request.headers.get("Sec-Websocket-Origin", None)
        return origin

    # origin_to_satisfy_tornado is present because tornado requires
    # check_origin to take an origin argument, but we don't use it
    def check_origin(self, origin_to_satisfy_tornado: str = "") -> bool:
        """Check Origin for cross-site API requests, including websockets

        Copied from WebSocket with changes:

        - allow unspecified host/origin (e.g. scripts)
        - allow token-authenticated requests
        """
        if self.allow_origin == "*" or self.skip_check_origin():
            return True

        host = self.request.headers.get("Host")
        origin = self.request.headers.get("Origin")

        # If no header is provided, let the request through.
        # Origin can be None for:
        # - same-origin (IE, Firefox)
        # - Cross-site POST form (IE, Firefox)
        # - Scripts
        # The cross-site POST (XSRF) case is handled by tornado's xsrf_token
        if origin is None or host is None:
            return True

        origin = origin.lower()
        origin_host = urlparse(origin).netloc

        # OK if origin matches host
        if origin_host == host:
            return True

        # Check CORS headers
        if self.allow_origin:
            allow = bool(self.allow_origin == origin)
        elif self.allow_origin_pat:
            allow = bool(re.match(self.allow_origin_pat, origin))
        else:
            # No CORS headers deny the request
            allow = False
        if not allow:
            self.log.warning(
                "Blocking Cross Origin API request for %s.  Origin: %s, Host: %s",
                self.request.path,
                origin,
                host,
            )
        return allow

    def check_referer(self) -> bool:
        """Check Referer for cross-site requests.
        Disables requests to certain endpoints with
        external or missing Referer.
        If set, allow_origin settings are applied to the Referer
        to whitelist specific cross-origin sites.
        Used on GET for api endpoints and /files/
        to block cross-site inclusion (XSSI).
        """
        if self.allow_origin == "*" or self.skip_check_origin():
            return True

        host = self.request.headers.get("Host")
        referer = self.request.headers.get("Referer")

        if not host:
            self.log.warning("Blocking request with no host")
            return False
        if not referer:
            self.log.warning("Blocking request with no referer")
            return False

        referer_url = urlparse(referer)
        referer_host = referer_url.netloc
        if referer_host == host:
            return True

        # apply cross-origin checks to Referer:
        origin = f"{referer_url.scheme}://{referer_url.netloc}"
        if self.allow_origin:
            allow = self.allow_origin == origin
        elif self.allow_origin_pat:
            allow = bool(re.match(self.allow_origin_pat, origin))
        else:
            # No CORS settings, deny the request
            allow = False

        if not allow:
            self.log.warning(
                "Blocking Cross Origin request for %s.  Referer: %s, Host: %s",
                self.request.path,
                origin,
                host,
            )
        return allow

    def check_xsrf_cookie(self) -> None:
        """Bypass xsrf cookie checks when token-authenticated"""
        if not hasattr(self, "_jupyter_current_user"):
            # Called too early, will be checked later
            return None
        if self.token_authenticated or self.settings.get("disable_check_xsrf", False):
            # Token-authenticated requests do not need additional XSRF-check
            # Servers without authentication are vulnerable to XSRF
            return None
        try:
            return super().check_xsrf_cookie()
        except web.HTTPError as e:
            if self.request.method in {"GET", "HEAD"}:
                # Consider Referer a sufficient cross-origin check for GET requests
                if not self.check_referer():
                    referer = self.request.headers.get("Referer")
                    if referer:
                        msg = f"Blocking Cross Origin request from {referer}."
                    else:
                        msg = "Blocking request from unknown origin"
                    raise web.HTTPError(403, msg) from e
            else:
                raise

    def check_host(self) -> bool:
        """Check the host header if remote access disallowed.

        Returns True if the request should continue, False otherwise.
        """
        if self.settings.get("allow_remote_access", False):
            return True

        # Remove port (e.g. ':8888') from host
        match = re.match(r"^(.*?)(:\d+)?$", self.request.host)
        assert match is not None
        host = match.group(1)

        # Browsers format IPv6 addresses like [::1]; we need to remove the []
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]

        # UNIX socket handling
        check_host = urldecode_unix_socket_path(host)
        if check_host.startswith("/") and os.path.exists(check_host):
            allow = True
        else:
            try:
                addr = ipaddress.ip_address(host)
            except ValueError:
                # Not an IP address: check against hostnames
                allow = host in self.settings.get("local_hostnames", ["localhost"])
            else:
                allow = addr.is_loopback

        if not allow:
            self.log.warning(
                (
                    "Blocking request with non-local 'Host' %s (%s). "
                    "If the server should be accessible at that name, "
                    "set ServerApp.allow_remote_access to disable the check."
                ),
                host,
                self.request.host,
            )
        return allow

    async def prepare(self, *, _redirect_to_login=True) -> Awaitable[None] | None:  # type:ignore[override]
        """Prepare a response.

        Checks the Host header, resolves the current user, sets CORS headers,
        runs the XSRF check for non-GET/HEAD/OPTIONS requests and, unless
        ``allow_unauthenticated_access`` is set, requires authentication for
        handler methods not marked ``__allow_unauthenticated``.

        Args:
            _redirect_to_login: when True, reuse ``web.authenticated``
                behaviour; when False, raise 403 if no user is known.

        Raises:
            web.HTTPError: 403 for a disallowed host, a request without a
                method, or an unauthenticated request with
                ``_redirect_to_login=False``.
        """
        # Set the current Jupyter Handler context variable.
        CallContext.set(CallContext.JUPYTER_HANDLER, self)

        if not self.check_host():
            self.current_user = self._jupyter_current_user = None
            raise web.HTTPError(403)

        from jupyter_server.auth import IdentityProvider

        mod_obj = inspect.getmodule(self.get_current_user)
        assert mod_obj is not None
        user: User | None = None

        if type(self.identity_provider) is IdentityProvider and mod_obj.__name__ != __name__:
            # check for overridden get_current_user + default IdentityProvider
            # deprecated way to override auth (e.g. JupyterHub < 3.0)
            # allow deprecated, overridden get_current_user
            warnings.warn(
                "Overriding JupyterHandler.get_current_user is deprecated in jupyter-server 2.0."
                " Use an IdentityProvider class.",
                DeprecationWarning,
                stacklevel=1,
            )
            user = User(self.get_current_user())
        else:
            _user = self.identity_provider.get_user(self)
            if isinstance(_user, Awaitable):
                # IdentityProvider.get_user _may_ be async
                _user = await _user
            user = _user

        # self.current_user for tornado's @web.authenticated
        # self._jupyter_current_user for backward-compat in deprecated get_current_user calls
        # and our own private checks for whether .current_user has been set
        self.current_user = self._jupyter_current_user = user
        # complete initial steps which require auth to resolve first:
        self.set_cors_headers()
        if self.request.method not in {"GET", "HEAD", "OPTIONS"}:
            self.check_xsrf_cookie()

        if not self.settings.get("allow_unauthenticated_access", False):
            if not self.request.method:
                # use the qualified name: only ``web`` is imported in this module
                raise web.HTTPError(403)
            method = getattr(self, self.request.method.lower())
            if not getattr(method, "__allow_unauthenticated", False):
                if _redirect_to_login:
                    # reuse `web.authenticated` logic, which redirects to the login
                    # page on GET and HEAD and otherwise raises 403
                    return web.authenticated(lambda _: super().prepare())(self)
                else:
                    # raise 403 if user is not known without redirecting to login page
                    user = self.current_user
                    if user is None:
                        self.log.warning(
                            f"Couldn't authenticate {self.__class__.__name__} connection"
                        )
                        raise web.HTTPError(403)

        return super().prepare()

    # ---------------------------------------------------------------
    # template rendering
    # ---------------------------------------------------------------

    def get_template(self, name):
        """Return the jinja template object for a given name"""
        return self.settings["jinja2_env"].get_template(name)

    def render_template(self, name, **ns):
        """Render a template by name.

        Entries of :attr:`template_namespace` override same-named keys in ``ns``.
        """
        ns.update(self.template_namespace)
        template = self.get_template(name)
        return template.render(**ns)

    @property
    def template_namespace(self) -> dict[str, Any]:
        """The variables passed to every rendered template."""
        return dict(
            base_url=self.base_url,
            default_url=self.default_url,
            ws_url=self.ws_url,
            logged_in=self.logged_in,
            allow_password_change=getattr(self.identity_provider, "allow_password_change", False),
            auth_enabled=self.identity_provider.auth_enabled,
            login_available=self.identity_provider.login_available,
            token_available=bool(self.token),
            static_url=self.static_url,
            sys_info=json_sys_info(),
            contents_js_source=self.contents_js_source,
            version_hash=self.version_hash,
            xsrf_form_html=self.xsrf_form_html,
            token=self.token,
            xsrf_token=self.xsrf_token.decode("utf8"),
            nbjs_translations=json.dumps(
                combine_translations(self.request.headers.get("Accept-Language", ""))
            ),
            **self.jinja_template_vars,
        )

    def get_json_body(self) -> dict[str, Any] | None:
        """Return the body of the request as JSON data.

        Returns None for an empty body.

        Raises:
            web.HTTPError: 400 when the body cannot be parsed as JSON.
        """
        if not self.request.body:
            return None
        # Do we need to call body.decode('utf-8') here?
        body = self.request.body.strip().decode("utf-8")
        try:
            model = json.loads(body)
        except Exception as e:
            self.log.debug("Bad JSON: %r", body)
            self.log.error("Couldn't parse JSON", exc_info=True)
            raise web.HTTPError(400, "Invalid JSON in body of request") from e
        return cast("dict[str, Any]", model)

    def write_error(self, status_code: int, **kwargs: Any) -> None:
        """render custom error pages

        Renders ``<status_code>.html`` when that template exists and
        ``error.html`` otherwise.
        """
        exc_info = kwargs.get("exc_info")
        message = ""
        status_message = responses.get(status_code, "Unknown HTTP Error")

        if exc_info:
            exception = exc_info[1]
            # get the custom message, if defined
            try:
                message = exception.log_message % exception.args
            except Exception:
                # best effort: not every exception carries a formattable log_message
                pass

            # construct the custom reason, if defined
            reason = getattr(exception, "reason", "")
            if reason:
                status_message = reason
        else:
            exception = "(unknown)"

        # build template namespace
        ns = {
            "status_code": status_code,
            "status_message": status_message,
            "message": message,
            "exception": exception,
        }

        self.set_header("Content-Type", "text/html")
        # render the template
        try:
            html = self.render_template(f"{status_code}.html", **ns)
        except TemplateNotFound:
            html = self.render_template("error.html", **ns)

        self.write(html)

744 

745 

class APIHandler(JupyterHandler):
    """Base class for API handlers"""

    async def prepare(self) -> None:  # type:ignore[override]
        """Prepare an API response.

        Raises:
            web.HTTPError: 404 when the origin check fails.
        """
        await super().prepare()
        if not self.check_origin():
            raise web.HTTPError(404)

    def write_error(self, status_code: int, **kwargs: Any) -> None:
        """APIHandler errors are JSON, not human pages"""
        self.set_header("Content-Type", "application/json")
        message = responses.get(status_code, "Unknown HTTP Error")
        reply: dict[str, Any] = {
            "message": message,
        }
        exc_info = kwargs.get("exc_info")
        if exc_info:
            e = exc_info[1]
            # use the qualified name: only ``web`` is imported in this module
            if isinstance(e, web.HTTPError):
                reply["message"] = e.log_message or message
                reply["reason"] = e.reason
            else:
                reply["message"] = "Unhandled error"
                reply["reason"] = None
                # backward-compatibility: traceback field is present,
                # but always empty
                reply["traceback"] = ""
        self.log.warning("wrote error: %r", reply["message"], exc_info=True)
        self.finish(json.dumps(reply))

    def get_login_url(self) -> str:
        """Get the login url.

        Raises:
            web.HTTPError: 403 when there is no current user.
        """
        # if get_login_url is invoked in an API handler,
        # that means @web.authenticated is trying to trigger a redirect.
        # instead of redirecting, raise 403 instead.
        if not self.current_user:
            raise web.HTTPError(403)
        return super().get_login_url()

    @property
    def content_security_policy(self) -> str:
        """The parent policy with ``default-src 'none'`` appended."""
        csp = "; ".join(  # noqa: FLY002
            [
                super().content_security_policy,
                "default-src 'none'",
            ]
        )
        return csp

    # set _track_activity = False on API handlers that shouldn't track activity
    _track_activity = True

    def update_api_activity(self) -> None:
        """Update last_activity of API requests"""
        # record activity of authenticated requests
        if (
            self._track_activity
            and getattr(self, "_jupyter_current_user", None)
            and self.get_argument("no_track_activity", None) is None
        ):
            self.settings["api_last_activity"] = utcnow()

    def finish(self, *args: Any, **kwargs: Any) -> Future[Any]:
        """Finish an API response.

        Accepts an extra ``set_content_type`` keyword (default
        ``application/json``) that is used for the Content-Type header and
        not forwarded to the parent ``finish``.
        """
        self.update_api_activity()
        # Allow caller to indicate content-type...
        set_content_type = kwargs.pop("set_content_type", "application/json")
        self.set_header("Content-Type", set_content_type)
        return super().finish(*args, **kwargs)

    @allow_unauthenticated
    def options(self, *args: Any, **kwargs: Any) -> None:
        """Get the options."""
        if "Access-Control-Allow-Headers" in self.settings.get("headers", {}):
            self.set_header(
                "Access-Control-Allow-Headers",
                self.settings["headers"]["Access-Control-Allow-Headers"],
            )
        else:
            self.set_header(
                "Access-Control-Allow-Headers",
                "accept, content-type, authorization, x-xsrftoken",
            )
        self.set_header("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE, OPTIONS")

        # if authorization header is requested,
        # that means the request is token-authenticated.
        # avoid browser-side rejection of the preflight request.
        # only allow this exception if allow_origin has not been specified
        # and Jupyter server authentication is enabled.
        # If the token is not valid, the 'real' request will still be rejected.
        # NOTE(review): the condition below requires an allow-origin setting
        # to be PRESENT, which contradicts the comment above ("has not been
        # specified"). Behaviour is left unchanged here - confirm the intent.
        requested_headers = self.request.headers.get("Access-Control-Request-Headers", "").split(
            ","
        )
        if (
            requested_headers
            and any(h.strip().lower() == "authorization" for h in requested_headers)
            and (
                # FIXME: it would be even better to check specifically for token-auth,
                # but there is currently no API for this.
                self.login_available
            )
            and (
                self.allow_origin
                or self.allow_origin_pat
                or "Access-Control-Allow-Origin" in self.settings.get("headers", {})
            )
        ):
            self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))

856 

857 

class Template404(JupyterHandler):
    """Handler that answers every request with the rendered 404 page."""

    async def prepare(self) -> None:  # type:ignore[override]
        """Run the standard preparation, then always fail with 404."""
        await super().prepare()
        not_found = web.HTTPError(404)
        raise not_found

865 

866 

867class AuthenticatedFileHandler(JupyterHandler, web.StaticFileHandler): 

868 """static files should only be accessible when logged in""" 

869 

870 auth_resource = "contents" 

871 

872 @property 

873 def content_security_policy(self) -> str: 

874 # In case we're serving HTML/SVG, confine any Javascript to a unique 

875 # origin so it can't interact with the Jupyter server. 

876 return super().content_security_policy + "; sandbox allow-scripts" 

877 

    @web.authenticated
    @authorized
    def head(self, path: str) -> Awaitable[None]:  # type:ignore[override]
        """Get the head response for a path.

        Runs the XSRF cookie check explicitly, then defers to the parent
        ``head`` implementation.
        """
        # JupyterHandler.prepare only runs the XSRF check for methods other
        # than GET/HEAD/OPTIONS, so it is invoked here for HEAD.
        self.check_xsrf_cookie()
        return super().head(path)

884 

885 @web.authenticated 

886 @authorized 

887 def get( # type:ignore[override] 

888 self, path: str, **kwargs: Any 

889 ) -> Awaitable[None]: 

890 """Get a file by path.""" 

891 self.check_xsrf_cookie() 

892 if os.path.splitext(path)[1] == ".ipynb" or self.get_argument("download", None): 

893 name = path.rsplit("/", 1)[-1] 

894 self.set_attachment_header(name) 

895 

896 return web.StaticFileHandler.get(self, path, **kwargs) 

897 

898 def get_content_type(self) -> str: 

899 """Get the content type.""" 

900 assert self.absolute_path is not None 

901 path = self.absolute_path.strip("/") 

902 if "/" in path: 

903 _, name = path.rsplit("/", 1) 

904 else: 

905 name = path 

906 if name.endswith(".ipynb"): 

907 return "application/x-ipynb+json" 

908 else: 

909 cur_mime = mimetypes.guess_type(name)[0] 

910 if cur_mime == "text/plain": 

911 return "text/plain; charset=UTF-8" 

912 else: 

913 return super().get_content_type() 

914 

915 def set_headers(self) -> None: 

916 """Set the headers.""" 

917 super().set_headers() 

918 # disable browser caching, rely on 304 replies for savings 

919 if "v" not in self.request.arguments: 

920 self.add_header("Cache-Control", "no-cache") 

921 

922 def compute_etag(self) -> str | None: 

923 """Compute the etag.""" 

924 return None 

925 

926 def validate_absolute_path(self, root: str, absolute_path: str) -> str: 

927 """Validate and return the absolute path. 

928 

929 Requires tornado 3.1 

930 

931 Adding to tornado's own handling, forbids the serving of hidden files. 

932 """ 

933 abs_path = super().validate_absolute_path(root, absolute_path) 

934 abs_root = os.path.abspath(root) 

935 assert abs_path is not None 

936 if not self.contents_manager.allow_hidden and is_hidden(abs_path, abs_root): 

937 self.log.info( 

938 "Refusing to serve hidden file, via 404 Error, use flag 'ContentsManager.allow_hidden' to enable" 

939 ) 

940 raise web.HTTPError(404) 

941 return abs_path 

942 

943 

def json_errors(method: Any) -> Any:  # pragma: no cover
    """Deprecated decorator giving a handler method APIHandler-style error output.

    Applying the decorator emits a ``DeprecationWarning`` immediately.
    Each call of the decorated method first rebinds ``self.write_error`` to
    ``APIHandler.write_error`` and then invokes the original method with the
    same arguments, returning its result.

    New code should subclass ``APIHandler`` instead.
    """
    warnings.warn(
        "@json_errors is deprecated in notebook 5.2.0. Subclass APIHandler instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    def call_with_api_errors(self, *args, **kwargs):
        # Swap the instance's error writer for APIHandler's before delegating.
        self.write_error = types.MethodType(APIHandler.write_error, self)
        return method(self, *args, **kwargs)

    # Copy the wrapped method's metadata (name, docstring, ...) onto the wrapper.
    return functools.wraps(method)(call_with_api_errors)

968 

969 

970# ----------------------------------------------------------------------------- 

971# File handler 

972# ----------------------------------------------------------------------------- 

973 

# Module-level alias for tornado's ``web.HTTPError``, kept to minimize
# the changes needed in subclasses.
HTTPError = web.HTTPError

976 

977 

class FileFindHandler(JupyterHandler, web.StaticFileHandler):
    """Subclass of StaticFileHandler for serving files from a search path.

    The setting "static_immutable_cache" can be set up to serve some static
    files as immutable (e.g. file names containing a hash). The setting is a
    list of base URLs; every static file URL starting with one of those will
    be served as immutable.
    """

    # Cache of search results so a file is not searched for more than once.
    # Keyed by ``(search roots, relative path)``: the roots are configured per
    # handler in ``initialize``, so the relative path alone is not enough to
    # tell apart two handlers that serve the same file name from different
    # search paths.
    _static_paths: dict[tuple[Any, str], str] = {}
    root: tuple[str]  # type:ignore[assignment]

    def set_headers(self) -> None:
        """Set the response headers, including the Cache-Control policy."""
        super().set_headers()

        request_path = self.request.path
        immutable_paths = self.settings.get("static_immutable_cache", [])

        # allow immutable cache for files
        if any(request_path.startswith(prefix) for prefix in immutable_paths):
            self.set_header("Cache-Control", "public, max-age=31536000, immutable")

        # disable browser caching, rely on 304 replies for savings
        elif "v" not in self.request.arguments or any(
            request_path.startswith(prefix) for prefix in self.no_cache_paths
        ):
            self.set_header("Cache-Control", "no-cache")

    def initialize(
        self,
        path: str | list[str],
        default_filename: str | None = None,
        no_cache_paths: list[str] | None = None,
    ) -> None:
        """Initialize the file find handler.

        Parameters
        ----------
        path
            One directory or a list of directories to search. Each is
            user-expanded, made absolute and stored with a trailing separator.
        default_filename
            Stored as ``self.default_filename``.
        no_cache_paths
            URL prefixes that always get ``Cache-Control: no-cache``;
            ``None`` means no such prefixes.
        """
        self.no_cache_paths = no_cache_paths or []

        if isinstance(path, str):
            path = [path]

        self.root = tuple(os.path.abspath(os.path.expanduser(p)) + os.sep for p in path)  # type:ignore[assignment]
        self.default_filename = default_filename

    def compute_etag(self) -> str | None:
        """Return ``None``: this handler never computes an etag."""
        return None

    # access is allowed as this class is used to serve static assets on login page
    # TODO: create an allow-list of files used on login page and remove this decorator
    @allow_unauthenticated
    def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]:
        """Serve ``path`` via the parent implementation, without requiring authentication."""
        return super().get(path, include_body)

    # access is allowed as this class is used to serve static assets on login page
    # TODO: create an allow-list of files used on login page and remove this decorator
    @allow_unauthenticated
    def head(self, path: str) -> Awaitable[None]:
        """Answer a HEAD request for ``path`` via the parent, without requiring authentication."""
        return super().head(path)

    @classmethod
    def get_absolute_path(cls, roots: Sequence[str], path: str) -> str:
        """Locate a file to serve on our static file search path.

        Returns the absolute path of the first match for ``path`` under
        ``roots``, or ``""`` when the lookup raises ``OSError``. Successful
        lookups are cached per ``(roots, path)`` pair.
        """
        # A plain string is already hashable and is kept as-is; any other
        # sequence (e.g. a list) is frozen into a tuple so it can be a dict key.
        roots_key = roots if isinstance(roots, str) else tuple(roots)
        cache_key = (roots_key, path)

        with cls._lock:
            cached = cls._static_paths.get(cache_key)
            if cached is not None:
                return cached
            try:
                abspath = os.path.abspath(filefind(path, roots))
            except OSError:
                # OSError from the lookup is treated as "not found"
                return ""

            cls._static_paths[cache_key] = abspath

            log().debug(f"Path {path} served from {abspath}")
            return abspath

    def validate_absolute_path(self, root: str, absolute_path: str) -> str | None:
        """Check if the file should be served (raises 404, 403, etc.)."""
        if not absolute_path:
            raise web.HTTPError(404)

        # Pick the configured root that contains the file and hand that single
        # root to the parent check. If none matches, ``root`` is left as the
        # last configured root and the parent decides how to reject the path.
        candidate = absolute_path + os.sep
        for root in self.root:
            if candidate.startswith(root):
                break

        return super().validate_absolute_path(root, absolute_path)

1065 

1066 

class APIVersionHandler(APIHandler):
    """API handler that reports the running server's version."""

    _track_activity = False

    @allow_unauthenticated
    def get(self) -> None:
        """Finish the request with a JSON object holding only the version."""
        # This endpoint is reachable without authentication, so the payload
        # deliberately carries as little information as possible.
        payload = {"version": jupyter_server.__version__}
        self.finish(json.dumps(payload))

1077 

1078 

class TrailingSlashHandler(web.RequestHandler):
    """Redirect handler that removes trailing slashes from the request URI.

    This should be the first, highest priority handler.
    """

    @allow_unauthenticated
    def get(self) -> None:
        """Redirect to the same URI with surrounding slashes trimmed from the path."""
        uri = self.request.uri
        assert uri is not None
        path, question_mark, query = uri.partition("?")
        # Trim leading slashes as well as trailing ones so that a repeated
        # '//' at the start is not misinterpreted.
        trimmed_path = "/" + path.strip("/")
        self.redirect(trimmed_path + question_mark + query)

    # POST and PUT are redirected exactly like GET.
    post = put = get

1097 

1098 

class MainHandler(JupyterHandler):
    """Handler that serves the main page at the base URL."""

    @allow_unauthenticated
    def get(self) -> None:
        """Render ``main.html`` and write it to the response."""
        self.write(self.render_template("main.html"))

    # POST and PUT are answered exactly like GET.
    post = put = get

1109 

1110 

class FilesRedirectHandler(JupyterHandler):
    """Handler that redirects relative URLs to the /files/ (or /tree/) handler."""

    @staticmethod
    async def redirect_to_files(self: Any, path: str) -> None:
        """Redirect ``path`` to its /tree or /files URL.

        Written as a static method taking the handler explicitly so that other
        handlers can reuse the redirect logic.

        Raises
        ------
        tornado.web.HTTPError
            404 when ``path`` is neither a directory nor an existing file,
            even after dropping a legacy ``files`` path component.
        """
        contents = self.contents_manager
        if await ensure_async(contents.dir_exists(path)):
            # it's a *directory*, redirect to /tree
            target = url_path_join(self.base_url, "tree", url_escape(path))
        else:
            # otherwise, redirect to /files
            segments = path.split("/")

            file_missing = not await ensure_async(contents.file_exists(path=path))
            if file_missing and "files" in segments:
                # redirect without files/ iff it would 404
                # this preserves pre-2.0-style 'files/' links
                self.log.warning("Deprecated files/ URL: %s", path)
                segments.remove("files")
                path = "/".join(segments)

            if not await ensure_async(contents.file_exists(path=path)):
                raise web.HTTPError(404)

            target = url_path_join(self.base_url, "files", url_escape(path))
        self.log.debug("Redirecting %s to %s", self.request.path, target)
        self.redirect(target)

    @allow_unauthenticated
    async def get(self, path: str = "") -> None:
        """Redirect a GET for ``path`` using :meth:`redirect_to_files`."""
        return await self.redirect_to_files(self, path)

1146 

1147 

class RedirectWithParams(web.RequestHandler):
    """Same as web.RedirectHandler, but preserves URL parameters."""

    def initialize(self, url: str, permanent: bool = True) -> None:
        """Initialize a redirect handler.

        Parameters
        ----------
        url
            Redirect target; it may already contain a query string.
        permanent
            Passed through to ``redirect`` as its ``permanent`` argument.
        """
        self._url = url
        self._permanent = permanent

    @allow_unauthenticated
    def get(self) -> None:
        """Redirect to the configured URL, carrying over the request's query string."""
        query = self.request.query
        if query:
            # Extend an existing query string with "&", otherwise start one with "?".
            sep = "&" if "?" in self._url else "?"
            url = f"{self._url}{sep}{query}"
        else:
            # Nothing to forward: redirect to the bare target instead of
            # leaving a dangling "?" or "&" on the end of it.
            url = self._url
        self.redirect(url, permanent=self._permanent)

1162 

1163 

class PrometheusMetricsHandler(JupyterHandler):
    """Handler that returns the prometheus metrics for this server."""

    @allow_unauthenticated
    def get(self) -> None:
        """Write the latest metrics from the default prometheus registry.

        Raises
        ------
        tornado.web.HTTPError
            403 when the ``authenticate_prometheus`` setting is truthy and the
            requester is not logged in.
        """
        auth_required = self.settings["authenticate_prometheus"]
        if auth_required and not self.logged_in:
            raise web.HTTPError(403)

        self.set_header("Content-Type", prometheus_client.CONTENT_TYPE_LATEST)
        metrics_payload = prometheus_client.generate_latest(prometheus_client.REGISTRY)
        self.write(metrics_payload)

1177 

1178 

class PublicStaticFileHandler(web.StaticFileHandler):
    """A web.StaticFileHandler whose methods are explicitly marked as not requiring auth."""

    @allow_unauthenticated
    def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]:
        """Serve ``path`` through the unmodified parent implementation."""
        return super().get(path, include_body=include_body)

    @allow_unauthenticated
    def head(self, path: str) -> Awaitable[None]:
        """Answer a HEAD request for ``path`` through the unmodified parent implementation."""
        return super().head(path)

1189 

1190 

1191# ----------------------------------------------------------------------------- 

1192# URL pattern fragments for reuse 

1193# ----------------------------------------------------------------------------- 

1194 

# Reusable pattern fragment with a named group ``path`` that matches one or
# more ``/segment`` components (`/foo[/bar...]`), or just `/`, or ''.
path_regex = r"(?P<path>(?:(?:/[^/]+)+|/?))"

1197 

1198# ----------------------------------------------------------------------------- 

1199# URL to handler mappings 

1200# ----------------------------------------------------------------------------- 

1201 

1202 

# (URL pattern, handler class) pairs defined by this module.
default_handlers = [
    # Any URL ending in "/" is redirected to its slash-trimmed form.
    (r".*/", TrailingSlashHandler),
    # Server version; served without authentication.
    (r"api", APIVersionHandler),
    # Static files that do not require authentication.
    (r"/(robots\.txt|favicon\.ico)", PublicStaticFileHandler),
    # Prometheus metrics; login is only enforced when the
    # "authenticate_prometheus" setting is truthy.
    (r"/metrics", PrometheusMetricsHandler),
]