Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/base/handlers.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

565 statements  

1"""Base Tornado handlers for the Jupyter server.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5from __future__ import annotations 

6 

7import functools 

8import inspect 

9import ipaddress 

10import json 

11import mimetypes 

12import os 

13import re 

14import types 

15import warnings 

16from collections.abc import Awaitable, Coroutine, Sequence 

17from http.client import responses 

18from typing import TYPE_CHECKING, Any, cast 

19from urllib.parse import urlparse 

20 

21import prometheus_client 

22from jinja2 import TemplateNotFound 

23from jupyter_core.paths import is_hidden 

24from tornado import web 

25from tornado.log import app_log 

26from traitlets.config import Application 

27 

28import jupyter_server 

29from jupyter_server import CallContext 

30from jupyter_server._sysinfo import get_sys_info 

31from jupyter_server._tz import utcnow 

32from jupyter_server.auth.decorator import allow_unauthenticated, authorized 

33from jupyter_server.auth.identity import User 

34from jupyter_server.i18n import combine_translations 

35from jupyter_server.services.security import csp_report_uri 

36from jupyter_server.utils import ( 

37 ensure_async, 

38 filefind, 

39 url_escape, 

40 url_is_absolute, 

41 url_path_join, 

42 urldecode_unix_socket_path, 

43) 

44 

45if TYPE_CHECKING: 

46 from logging import Logger 

47 

48 from jupyter_client.kernelspec import KernelSpecManager 

49 from jupyter_events import EventLogger 

50 from jupyter_server_terminals.terminalmanager import TerminalManager 

51 from tornado.concurrent import Future 

52 

53 from jupyter_server.auth.authorizer import Authorizer 

54 from jupyter_server.auth.identity import IdentityProvider 

55 from jupyter_server.serverapp import ServerApp 

56 from jupyter_server.services.config.manager import ConfigManager 

57 from jupyter_server.services.contents.manager import ContentsManager 

58 from jupyter_server.services.kernels.kernelmanager import AsyncMappingKernelManager 

59 from jupyter_server.services.sessions.sessionmanager import SessionManager 

60 

61# ----------------------------------------------------------------------------- 

62# Top-level handlers 

63# ----------------------------------------------------------------------------- 

64 

65_sys_info_cache = None 

66 

67 

68def json_sys_info(): 

69 """Get sys info as json.""" 

70 global _sys_info_cache # noqa: PLW0603 

71 if _sys_info_cache is None: 

72 _sys_info_cache = json.dumps(get_sys_info()) 

73 return _sys_info_cache 

74 

75 

76def log() -> Logger: 

77 """Get the application log.""" 

78 if Application.initialized(): 

79 return cast("Logger", Application.instance().log) 

80 else: 

81 return app_log 

82 

83 

84class AuthenticatedHandler(web.RequestHandler): 

85 """A RequestHandler with an authenticated user.""" 

86 

87 @property 

88 def base_url(self) -> str: 

89 return cast("str", self.settings.get("base_url", "/")) 

90 

91 @property 

92 def content_security_policy(self) -> str: 

93 """The default Content-Security-Policy header 

94 

95 Can be overridden by defining Content-Security-Policy in settings['headers'] 

96 """ 

97 if "Content-Security-Policy" in self.settings.get("headers", {}): 

98 # user-specified, don't override 

99 return cast("str", self.settings["headers"]["Content-Security-Policy"]) 

100 

101 return "; ".join( 

102 [ 

103 "frame-ancestors 'self'", 

104 # Make sure the report-uri is relative to the base_url 

105 "report-uri " 

106 + self.settings.get("csp_report_uri", url_path_join(self.base_url, csp_report_uri)), 

107 ] 

108 ) 

109 

110 def set_default_headers(self) -> None: 

111 """Set the default headers.""" 

112 headers = {} 

113 headers["X-Content-Type-Options"] = "nosniff" 

114 headers.update(self.settings.get("headers", {})) 

115 

116 headers["Content-Security-Policy"] = self.content_security_policy 

117 

118 # Allow for overriding headers 

119 for header_name, value in headers.items(): 

120 try: 

121 self.set_header(header_name, value) 

122 except Exception as e: 

123 # tornado raise Exception (not a subclass) 

124 # if method is unsupported (websocket and Access-Control-Allow-Origin 

125 # for example, so just ignore) 

126 self.log.exception( # type:ignore[attr-defined] 

127 "Could not set default headers: %s", e 

128 ) 

129 

130 @property 

131 def cookie_name(self) -> str: 

132 warnings.warn( 

133 """JupyterHandler.login_handler is deprecated in 2.0, 

134 use JupyterHandler.identity_provider. 

135 """, 

136 DeprecationWarning, 

137 stacklevel=2, 

138 ) 

139 return self.identity_provider.get_cookie_name(self) 

140 

141 def force_clear_cookie(self, name: str, path: str = "/", domain: str | None = None) -> None: 

142 """Force a cookie clear.""" 

143 warnings.warn( 

144 """JupyterHandler.login_handler is deprecated in 2.0, 

145 use JupyterHandler.identity_provider. 

146 """, 

147 DeprecationWarning, 

148 stacklevel=2, 

149 ) 

150 self.identity_provider._force_clear_cookie(self, name, path=path, domain=domain) 

151 

152 def clear_login_cookie(self) -> None: 

153 """Clear a login cookie.""" 

154 warnings.warn( 

155 """JupyterHandler.login_handler is deprecated in 2.0, 

156 use JupyterHandler.identity_provider. 

157 """, 

158 DeprecationWarning, 

159 stacklevel=2, 

160 ) 

161 self.identity_provider.clear_login_cookie(self) 

162 

163 def get_current_user(self) -> str: 

164 """Get the current user.""" 

165 clsname = self.__class__.__name__ 

166 msg = ( 

167 f"Calling `{clsname}.get_current_user()` directly is deprecated in jupyter-server 2.0." 

168 " Use `self.current_user` instead (works in all versions)." 

169 ) 

170 if hasattr(self, "_jupyter_current_user"): 

171 # backward-compat: return _jupyter_current_user 

172 warnings.warn( 

173 msg, 

174 DeprecationWarning, 

175 stacklevel=2, 

176 ) 

177 return cast("str", self._jupyter_current_user) 

178 # haven't called get_user in prepare, raise 

179 raise RuntimeError(msg) 

180 

181 def skip_check_origin(self) -> bool: 

182 """Ask my login_handler if I should skip the origin_check 

183 

184 For example: in the default LoginHandler, if a request is token-authenticated, 

185 origin checking should be skipped. 

186 """ 

187 if self.request.method == "OPTIONS": 

188 # no origin-check on options requests, which are used to check origins! 

189 return True 

190 return not self.identity_provider.should_check_origin(self) 

191 

192 @property 

193 def token_authenticated(self) -> bool: 

194 """Have I been authenticated with a token?""" 

195 return self.identity_provider.is_token_authenticated(self) 

196 

197 @property 

198 def logged_in(self) -> bool: 

199 """Is a user currently logged in?""" 

200 user = self.current_user 

201 return bool(user and user != "anonymous") 

202 

203 @property 

204 def login_handler(self) -> Any: 

205 """Return the login handler for this application, if any.""" 

206 warnings.warn( 

207 """JupyterHandler.login_handler is deprecated in 2.0, 

208 use JupyterHandler.identity_provider. 

209 """, 

210 DeprecationWarning, 

211 stacklevel=2, 

212 ) 

213 return self.identity_provider.login_handler_class 

214 

215 @property 

216 def token(self) -> str | None: 

217 """Return the login token for this application, if any.""" 

218 return self.identity_provider.token 

219 

220 @property 

221 def login_available(self) -> bool: 

222 """May a user proceed to log in? 

223 

224 This returns True if login capability is available, irrespective of 

225 whether the user is already logged in or not. 

226 

227 """ 

228 return cast("bool", self.identity_provider.login_available) 

229 

230 @property 

231 def authorizer(self) -> Authorizer: 

232 if "authorizer" not in self.settings: 

233 warnings.warn( 

234 "The Tornado web application does not have an 'authorizer' defined " 

235 "in its settings. In future releases of jupyter_server, this will " 

236 "be a required key for all subclasses of `JupyterHandler`. For an " 

237 "example, see the jupyter_server source code for how to " 

238 "add an authorizer to the tornado settings: " 

239 "https://github.com/jupyter-server/jupyter_server/blob/" 

240 "653740cbad7ce0c8a8752ce83e4d3c2c754b13cb/jupyter_server/serverapp.py" 

241 "#L234-L256", 

242 stacklevel=2, 

243 ) 

244 from jupyter_server.auth import AllowAllAuthorizer 

245 

246 self.settings["authorizer"] = AllowAllAuthorizer( 

247 config=self.settings.get("config", None), 

248 identity_provider=self.identity_provider, 

249 ) 

250 

251 return cast("Authorizer", self.settings.get("authorizer")) 

252 

253 @property 

254 def identity_provider(self) -> IdentityProvider: 

255 if "identity_provider" not in self.settings: 

256 warnings.warn( 

257 "The Tornado web application does not have an 'identity_provider' defined " 

258 "in its settings. In future releases of jupyter_server, this will " 

259 "be a required key for all subclasses of `JupyterHandler`. For an " 

260 "example, see the jupyter_server source code for how to " 

261 "add an identity provider to the tornado settings: " 

262 "https://github.com/jupyter-server/jupyter_server/blob/v2.0.0/" 

263 "jupyter_server/serverapp.py#L242", 

264 stacklevel=2, 

265 ) 

266 from jupyter_server.auth import IdentityProvider 

267 

268 # no identity provider set, load default 

269 self.settings["identity_provider"] = IdentityProvider( 

270 config=self.settings.get("config", None) 

271 ) 

272 return cast("IdentityProvider", self.settings["identity_provider"]) 

273 

274 

275class JupyterHandler(AuthenticatedHandler): 

276 """Jupyter-specific extensions to authenticated handling 

277 

278 Mostly property shortcuts to Jupyter-specific settings. 

279 """ 

280 

281 @property 

282 def config(self) -> dict[str, Any] | None: 

283 return cast("dict[str, Any] | None", self.settings.get("config", None)) 

284 

285 @property 

286 def log(self) -> Logger: 

287 """use the Jupyter log by default, falling back on tornado's logger""" 

288 return log() 

289 

290 @property 

291 def jinja_template_vars(self) -> dict[str, Any]: 

292 """User-supplied values to supply to jinja templates.""" 

293 return cast("dict[str, Any]", self.settings.get("jinja_template_vars", {})) 

294 

295 @property 

296 def serverapp(self) -> ServerApp | None: 

297 return cast("ServerApp | None", self.settings["serverapp"]) 

298 

299 # --------------------------------------------------------------- 

300 # URLs 

301 # --------------------------------------------------------------- 

302 

303 @property 

304 def version_hash(self) -> str: 

305 """The version hash to use for cache hints for static files""" 

306 return cast("str", self.settings.get("version_hash", "")) 

307 

308 @property 

309 def mathjax_url(self) -> str: 

310 url = cast("str", self.settings.get("mathjax_url", "")) 

311 if not url or url_is_absolute(url): 

312 return url 

313 return url_path_join(self.base_url, url) 

314 

315 @property 

316 def mathjax_config(self) -> str: 

317 return cast("str", self.settings.get("mathjax_config", "TeX-AMS-MML_HTMLorMML-full,Safe")) 

318 

319 @property 

320 def default_url(self) -> str: 

321 return cast("str", self.settings.get("default_url", "")) 

322 

323 @property 

324 def ws_url(self) -> str: 

325 return cast("str", self.settings.get("websocket_url", "")) 

326 

327 @property 

328 def contents_js_source(self) -> str: 

329 self.log.debug( 

330 "Using contents: %s", 

331 self.settings.get("contents_js_source", "services/contents"), 

332 ) 

333 return cast("str", self.settings.get("contents_js_source", "services/contents")) 

334 

335 # --------------------------------------------------------------- 

336 # Manager objects 

337 # --------------------------------------------------------------- 

338 

339 @property 

340 def kernel_manager(self) -> AsyncMappingKernelManager: 

341 return cast("AsyncMappingKernelManager", self.settings["kernel_manager"]) 

342 

343 @property 

344 def contents_manager(self) -> ContentsManager: 

345 return cast("ContentsManager", self.settings["contents_manager"]) 

346 

347 @property 

348 def session_manager(self) -> SessionManager: 

349 return cast("SessionManager", self.settings["session_manager"]) 

350 

351 @property 

352 def terminal_manager(self) -> TerminalManager: 

353 return cast("TerminalManager", self.settings["terminal_manager"]) 

354 

355 @property 

356 def kernel_spec_manager(self) -> KernelSpecManager: 

357 return cast("KernelSpecManager", self.settings["kernel_spec_manager"]) 

358 

359 @property 

360 def config_manager(self) -> ConfigManager: 

361 return cast("ConfigManager", self.settings["config_manager"]) 

362 

363 @property 

364 def event_logger(self) -> EventLogger: 

365 return cast("EventLogger", self.settings["event_logger"]) 

366 

367 # --------------------------------------------------------------- 

368 # CORS 

369 # --------------------------------------------------------------- 

370 

371 @property 

372 def allow_origin(self) -> str: 

373 """Normal Access-Control-Allow-Origin""" 

374 return cast("str", self.settings.get("allow_origin", "")) 

375 

376 @property 

377 def allow_origin_pat(self) -> str | None: 

378 """Regular expression version of allow_origin""" 

379 return cast("str | None", self.settings.get("allow_origin_pat", None)) 

380 

381 @property 

382 def allow_credentials(self) -> bool: 

383 """Whether to set Access-Control-Allow-Credentials""" 

384 return cast("bool", self.settings.get("allow_credentials", False)) 

385 

386 def set_default_headers(self) -> None: 

387 """Add CORS headers, if defined""" 

388 super().set_default_headers() 

389 

390 def set_cors_headers(self) -> None: 

391 """Add CORS headers, if defined 

392 

393 Now that current_user is async (jupyter-server 2.0), 

394 must be called at the end of prepare(), instead of in set_default_headers. 

395 """ 

396 if self.allow_origin: 

397 self.set_header("Access-Control-Allow-Origin", self.allow_origin) 

398 elif self.allow_origin_pat: 

399 origin = self.get_origin() 

400 if origin and re.match(self.allow_origin_pat, origin): 

401 self.set_header("Access-Control-Allow-Origin", origin) 

402 elif self.token_authenticated and "Access-Control-Allow-Origin" not in self.settings.get( 

403 "headers", {} 

404 ): 

405 # allow token-authenticated requests cross-origin by default. 

406 # only apply this exception if allow-origin has not been specified. 

407 self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", "")) 

408 

409 if self.allow_credentials: 

410 self.set_header("Access-Control-Allow-Credentials", "true") 

411 

412 def set_attachment_header(self, filename: str) -> None: 

413 """Set Content-Disposition: attachment header 

414 

415 As a method to ensure handling of filename encoding 

416 """ 

417 escaped_filename = url_escape(filename) 

418 self.set_header( 

419 "Content-Disposition", 

420 f"attachment; filename*=utf-8''{escaped_filename}", 

421 ) 

422 

423 def get_origin(self) -> str | None: 

424 # Handle WebSocket Origin naming convention differences 

425 # The difference between version 8 and 13 is that in 8 the 

426 # client sends a "Sec-Websocket-Origin" header and in 13 it's 

427 # simply "Origin". 

428 if "Origin" in self.request.headers: 

429 origin = self.request.headers.get("Origin") 

430 else: 

431 origin = self.request.headers.get("Sec-Websocket-Origin", None) 

432 return origin 

433 

434 # origin_to_satisfy_tornado is present because tornado requires 

435 # check_origin to take an origin argument, but we don't use it 

436 def check_origin(self, origin_to_satisfy_tornado: str = "") -> bool: 

437 """Check Origin for cross-site API requests, including websockets 

438 

439 Copied from WebSocket with changes: 

440 

441 - allow unspecified host/origin (e.g. scripts) 

442 - allow token-authenticated requests 

443 """ 

444 if self.allow_origin == "*" or self.skip_check_origin(): 

445 return True 

446 

447 host = self.request.headers.get("Host") 

448 origin = self.request.headers.get("Origin") 

449 

450 # If no header is provided, let the request through. 

451 # Origin can be None for: 

452 # - same-origin (IE, Firefox) 

453 # - Cross-site POST form (IE, Firefox) 

454 # - Scripts 

455 # The cross-site POST (XSRF) case is handled by tornado's xsrf_token 

456 if origin is None or host is None: 

457 return True 

458 

459 origin = origin.lower() 

460 origin_host = urlparse(origin).netloc 

461 

462 # OK if origin matches host 

463 if origin_host == host: 

464 return True 

465 

466 # Check CORS headers 

467 if self.allow_origin: 

468 allow = bool(self.allow_origin == origin) 

469 elif self.allow_origin_pat: 

470 allow = bool(re.match(self.allow_origin_pat, origin)) 

471 else: 

472 # No CORS headers deny the request 

473 allow = False 

474 if not allow: 

475 self.log.warning( 

476 "Blocking Cross Origin API request for %s. Origin: %s, Host: %s", 

477 self.request.path, 

478 origin, 

479 host, 

480 ) 

481 return allow 

482 

483 def check_referer(self) -> bool: 

484 """Check Referer for cross-site requests. 

485 Disables requests to certain endpoints with 

486 external or missing Referer. 

487 If set, allow_origin settings are applied to the Referer 

488 to whitelist specific cross-origin sites. 

489 Used on GET for api endpoints and /files/ 

490 to block cross-site inclusion (XSSI). 

491 """ 

492 if self.allow_origin == "*" or self.skip_check_origin(): 

493 return True 

494 

495 host = self.request.headers.get("Host") 

496 referer = self.request.headers.get("Referer") 

497 

498 if not host: 

499 self.log.warning("Blocking request with no host") 

500 return False 

501 if not referer: 

502 self.log.warning("Blocking request with no referer") 

503 return False 

504 

505 referer_url = urlparse(referer) 

506 referer_host = referer_url.netloc 

507 if referer_host == host: 

508 return True 

509 

510 # apply cross-origin checks to Referer: 

511 origin = f"{referer_url.scheme}://{referer_url.netloc}" 

512 if self.allow_origin: 

513 allow = self.allow_origin == origin 

514 elif self.allow_origin_pat: 

515 allow = bool(re.match(self.allow_origin_pat, origin)) 

516 else: 

517 # No CORS settings, deny the request 

518 allow = False 

519 

520 if not allow: 

521 self.log.warning( 

522 "Blocking Cross Origin request for %s. Referer: %s, Host: %s", 

523 self.request.path, 

524 origin, 

525 host, 

526 ) 

527 return allow 

528 

529 def check_xsrf_cookie(self) -> None: 

530 """Bypass xsrf cookie checks when token-authenticated""" 

531 if not hasattr(self, "_jupyter_current_user"): 

532 # Called too early, will be checked later 

533 return None 

534 if self.token_authenticated or self.settings.get("disable_check_xsrf", False): 

535 # Token-authenticated requests do not need additional XSRF-check 

536 # Servers without authentication are vulnerable to XSRF 

537 return None 

538 try: 

539 if not self.check_origin(): 

540 raise web.HTTPError(404) 

541 return super().check_xsrf_cookie() 

542 except web.HTTPError as e: 

543 if self.request.method in {"GET", "HEAD"}: 

544 # Consider Referer a sufficient cross-origin check for GET requests 

545 if not self.check_referer(): 

546 referer = self.request.headers.get("Referer") 

547 if referer: 

548 msg = f"Blocking Cross Origin request from {referer}." 

549 else: 

550 msg = "Blocking request from unknown origin" 

551 raise web.HTTPError(403, msg) from e 

552 else: 

553 raise 

554 

555 def check_host(self) -> bool: 

556 """Check the host header if remote access disallowed. 

557 

558 Returns True if the request should continue, False otherwise. 

559 """ 

560 if self.settings.get("allow_remote_access", False): 

561 return True 

562 

563 # Remove port (e.g. ':8888') from host 

564 match = re.match(r"^(.*?)(:\d+)?$", self.request.host) 

565 assert match is not None 

566 host = match.group(1) 

567 

568 # Browsers format IPv6 addresses like [::1]; we need to remove the [] 

569 if host.startswith("[") and host.endswith("]"): 

570 host = host[1:-1] 

571 

572 # UNIX socket handling 

573 check_host = urldecode_unix_socket_path(host) 

574 if check_host.startswith("/") and os.path.exists(check_host): 

575 allow = True 

576 else: 

577 try: 

578 addr = ipaddress.ip_address(host) 

579 except ValueError: 

580 # Not an IP address: check against hostnames 

581 allow = host in self.settings.get("local_hostnames", ["localhost"]) 

582 else: 

583 allow = addr.is_loopback 

584 

585 if not allow: 

586 self.log.warning( 

587 ( 

588 "Blocking request with non-local 'Host' %s (%s). " 

589 "If the server should be accessible at that name, " 

590 "set ServerApp.allow_remote_access to disable the check." 

591 ), 

592 host, 

593 self.request.host, 

594 ) 

595 return allow 

596 

597 async def prepare(self, *, _redirect_to_login=True) -> Awaitable[None] | None: # type:ignore[override] 

598 """Prepare a response.""" 

599 # Set the current Jupyter Handler context variable. 

600 CallContext.set(CallContext.JUPYTER_HANDLER, self) 

601 

602 if not self.check_host(): 

603 self.current_user = self._jupyter_current_user = None 

604 raise web.HTTPError(403) 

605 

606 from jupyter_server.auth import IdentityProvider 

607 

608 mod_obj = inspect.getmodule(self.get_current_user) 

609 assert mod_obj is not None 

610 user: User | None = None 

611 

612 if type(self.identity_provider) is IdentityProvider and mod_obj.__name__ != __name__: 

613 # check for overridden get_current_user + default IdentityProvider 

614 # deprecated way to override auth (e.g. JupyterHub < 3.0) 

615 # allow deprecated, overridden get_current_user 

616 warnings.warn( 

617 "Overriding JupyterHandler.get_current_user is deprecated in jupyter-server 2.0." 

618 " Use an IdentityProvider class.", 

619 DeprecationWarning, 

620 stacklevel=1, 

621 ) 

622 user = User(self.get_current_user()) 

623 else: 

624 _user = self.identity_provider.get_user(self) 

625 if isinstance(_user, Awaitable): 

626 # IdentityProvider.get_user _may_ be async 

627 _user = await _user 

628 user = _user 

629 

630 # self.current_user for tornado's @web.authenticated 

631 # self._jupyter_current_user for backward-compat in deprecated get_current_user calls 

632 # and our own private checks for whether .current_user has been set 

633 self.current_user = self._jupyter_current_user = user 

634 # complete initial steps which require auth to resolve first: 

635 self.set_cors_headers() 

636 if self.request.method not in {"GET", "HEAD", "OPTIONS"}: 

637 self.check_xsrf_cookie() 

638 

639 if not self.settings.get("allow_unauthenticated_access", False): 

640 if not self.request.method: 

641 raise HTTPError(403) 

642 method = getattr(self, self.request.method.lower()) 

643 if not getattr(method, "__allow_unauthenticated", False): 

644 if _redirect_to_login: 

645 # reuse `web.authenticated` logic, which redirects to the login 

646 # page on GET and HEAD and otherwise raises 403 

647 return web.authenticated(lambda _: super().prepare())(self) 

648 else: 

649 # raise 403 if user is not known without redirecting to login page 

650 user = self.current_user 

651 if user is None: 

652 self.log.warning( 

653 f"Couldn't authenticate {self.__class__.__name__} connection" 

654 ) 

655 raise web.HTTPError(403) 

656 

657 return super().prepare() 

658 

659 # --------------------------------------------------------------- 

660 # template rendering 

661 # --------------------------------------------------------------- 

662 

663 def get_template(self, name): 

664 """Return the jinja template object for a given name""" 

665 return self.settings["jinja2_env"].get_template(name) 

666 

667 def render_template(self, name, **ns): 

668 """Render a template by name.""" 

669 ns.update(self.template_namespace) 

670 template = self.get_template(name) 

671 return template.render(**ns) 

672 

673 @property 

674 def template_namespace(self) -> dict[str, Any]: 

675 return dict( 

676 base_url=self.base_url, 

677 default_url=self.default_url, 

678 ws_url=self.ws_url, 

679 logged_in=self.logged_in, 

680 allow_password_change=getattr(self.identity_provider, "allow_password_change", False), 

681 auth_enabled=self.identity_provider.auth_enabled, 

682 login_available=self.identity_provider.login_available, 

683 token_available=bool(self.token), 

684 static_url=self.static_url, 

685 sys_info=json_sys_info(), 

686 contents_js_source=self.contents_js_source, 

687 version_hash=self.version_hash, 

688 xsrf_form_html=self.xsrf_form_html, 

689 token=self.token, 

690 xsrf_token=self.xsrf_token.decode("utf8"), 

691 nbjs_translations=json.dumps( 

692 combine_translations(self.request.headers.get("Accept-Language", "")) 

693 ), 

694 **self.jinja_template_vars, 

695 ) 

696 

697 def get_json_body(self) -> dict[str, Any] | None: 

698 """Return the body of the request as JSON data.""" 

699 if not self.request.body: 

700 return None 

701 # Do we need to call body.decode('utf-8') here? 

702 body = self.request.body.strip().decode("utf-8") 

703 try: 

704 model = json.loads(body) 

705 except Exception as e: 

706 self.log.debug("Bad JSON: %r", body) 

707 self.log.error("Couldn't parse JSON", exc_info=True) 

708 raise web.HTTPError(400, "Invalid JSON in body of request") from e 

709 return cast("dict[str, Any]", model) 

710 

711 def write_error(self, status_code: int, **kwargs: Any) -> None: 

712 """render custom error pages""" 

713 exc_info = kwargs.get("exc_info") 

714 message = "" 

715 status_message = responses.get(status_code, "Unknown HTTP Error") 

716 

717 if exc_info: 

718 exception = exc_info[1] 

719 # get the custom message, if defined 

720 try: 

721 message = exception.log_message % exception.args 

722 except Exception: 

723 pass 

724 

725 # construct the custom reason, if defined 

726 reason = getattr(exception, "reason", "") 

727 if reason: 

728 status_message = reason 

729 else: 

730 exception = "(unknown)" 

731 

732 # build template namespace 

733 ns = { 

734 "status_code": status_code, 

735 "status_message": status_message, 

736 "message": message, 

737 "exception": exception, 

738 } 

739 

740 self.set_header("Content-Type", "text/html") 

741 # render the template 

742 try: 

743 html = self.render_template("%s.html" % status_code, **ns) 

744 except TemplateNotFound: 

745 html = self.render_template("error.html", **ns) 

746 

747 self.write(html) 

748 

749 

750class APIHandler(JupyterHandler): 

751 """Base class for API handlers""" 

752 

753 async def prepare(self) -> None: # type:ignore[override] 

754 """Prepare an API response.""" 

755 await super().prepare() 

756 if not self.check_origin(): 

757 raise web.HTTPError(404) 

758 

759 def write_error(self, status_code: int, **kwargs: Any) -> None: 

760 """APIHandler errors are JSON, not human pages""" 

761 self.set_header("Content-Type", "application/json") 

762 message = responses.get(status_code, "Unknown HTTP Error") 

763 reply: dict[str, Any] = { 

764 "message": message, 

765 } 

766 exc_info = kwargs.get("exc_info") 

767 if exc_info: 

768 e = exc_info[1] 

769 if isinstance(e, HTTPError): 

770 reply["message"] = e.log_message or message 

771 reply["reason"] = e.reason 

772 else: 

773 reply["message"] = "Unhandled error" 

774 reply["reason"] = None 

775 # backward-compatibility: traceback field is present, 

776 # but always empty 

777 reply["traceback"] = "" 

778 self.log.warning("wrote error: %r", reply["message"]) 

779 self.finish(json.dumps(reply)) 

780 

781 def get_login_url(self) -> str: 

782 """Get the login url.""" 

783 # if get_login_url is invoked in an API handler, 

784 # that means @web.authenticated is trying to trigger a redirect. 

785 # instead of redirecting, raise 403 instead. 

786 if not self.current_user: 

787 raise web.HTTPError(403) 

788 return super().get_login_url() 

789 

790 @property 

791 def content_security_policy(self) -> str: 

792 csp = "; ".join( # noqa: FLY002 

793 [ 

794 super().content_security_policy, 

795 "default-src 'none'", 

796 ] 

797 ) 

798 return csp 

799 

800 # set _track_activity = False on API handlers that shouldn't track activity 

801 _track_activity = True 

802 

803 def update_api_activity(self) -> None: 

804 """Update last_activity of API requests""" 

805 # record activity of authenticated requests 

806 if ( 

807 self._track_activity 

808 and getattr(self, "_jupyter_current_user", None) 

809 and self.get_argument("no_track_activity", None) is None 

810 ): 

811 self.settings["api_last_activity"] = utcnow() 

812 

813 def finish(self, *args: Any, **kwargs: Any) -> Future[Any]: 

814 """Finish an API response.""" 

815 self.update_api_activity() 

816 # Allow caller to indicate content-type... 

817 set_content_type = kwargs.pop("set_content_type", "application/json") 

818 self.set_header("Content-Type", set_content_type) 

819 return super().finish(*args, **kwargs) 

820 

821 @allow_unauthenticated 

822 def options(self, *args: Any, **kwargs: Any) -> None: 

823 """Get the options.""" 

824 if "Access-Control-Allow-Headers" in self.settings.get("headers", {}): 

825 self.set_header( 

826 "Access-Control-Allow-Headers", 

827 self.settings["headers"]["Access-Control-Allow-Headers"], 

828 ) 

829 else: 

830 self.set_header( 

831 "Access-Control-Allow-Headers", 

832 "accept, content-type, authorization, x-xsrftoken", 

833 ) 

834 self.set_header("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE, OPTIONS") 

835 

836 # if authorization header is requested, 

837 # that means the request is token-authenticated. 

838 # avoid browser-side rejection of the preflight request. 

839 # only allow this exception if allow_origin has not been specified 

840 # and Jupyter server authentication is enabled. 

841 # If the token is not valid, the 'real' request will still be rejected. 

842 requested_headers = self.request.headers.get("Access-Control-Request-Headers", "").split( 

843 "," 

844 ) 

845 if ( 

846 requested_headers 

847 and any(h.strip().lower() == "authorization" for h in requested_headers) 

848 and ( 

849 # FIXME: it would be even better to check specifically for token-auth, 

850 # but there is currently no API for this. 

851 self.login_available 

852 ) 

853 and ( 

854 self.allow_origin 

855 or self.allow_origin_pat 

856 or "Access-Control-Allow-Origin" in self.settings.get("headers", {}) 

857 ) 

858 ): 

859 self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", "")) 

860 

861 

862class Template404(JupyterHandler): 

863 """Render our 404 template""" 

864 

865 async def prepare(self) -> None: # type:ignore[override] 

866 """Prepare a 404 response.""" 

867 await super().prepare() 

868 raise web.HTTPError(404) 

869 

870 

871class AuthenticatedFileHandler(JupyterHandler, web.StaticFileHandler): 

872 """static files should only be accessible when logged in""" 

873 

874 auth_resource = "contents" 

875 

876 @property 

877 def content_security_policy(self) -> str: 

878 # In case we're serving HTML/SVG, confine any Javascript to a unique 

879 # origin so it can't interact with the Jupyter server. 

880 return super().content_security_policy + "; sandbox allow-scripts" 

881 

882 @web.authenticated 

883 @authorized 

884 def head(self, path: str) -> Awaitable[None]: # type:ignore[override] 

885 """Get the head response for a path.""" 

886 self.check_xsrf_cookie() 

887 return super().head(path) 

888 

889 @web.authenticated 

890 @authorized 

891 def get( # type:ignore[override] 

892 self, path: str, **kwargs: Any 

893 ) -> Awaitable[None]: 

894 """Get a file by path.""" 

895 self.check_xsrf_cookie() 

896 if os.path.splitext(path)[1] == ".ipynb" or self.get_argument("download", None): 

897 name = path.rsplit("/", 1)[-1] 

898 self.set_attachment_header(name) 

899 

900 return web.StaticFileHandler.get(self, path, **kwargs) 

901 

902 def get_content_type(self) -> str: 

903 """Get the content type.""" 

904 assert self.absolute_path is not None 

905 path = self.absolute_path.strip("/") 

906 if "/" in path: 

907 _, name = path.rsplit("/", 1) 

908 else: 

909 name = path 

910 if name.endswith(".ipynb"): 

911 return "application/x-ipynb+json" 

912 else: 

913 cur_mime = mimetypes.guess_type(name)[0] 

914 if cur_mime == "text/plain": 

915 return "text/plain; charset=UTF-8" 

916 else: 

917 return super().get_content_type() 

918 

919 def set_headers(self) -> None: 

920 """Set the headers.""" 

921 super().set_headers() 

922 # disable browser caching, rely on 304 replies for savings 

923 if "v" not in self.request.arguments: 

924 self.add_header("Cache-Control", "no-cache") 

925 

926 def compute_etag(self) -> str | None: 

927 """Compute the etag.""" 

928 return None 

929 

930 def validate_absolute_path(self, root: str, absolute_path: str) -> str: 

931 """Validate and return the absolute path. 

932 

933 Requires tornado 3.1 

934 

935 Adding to tornado's own handling, forbids the serving of hidden files. 

936 """ 

937 abs_path = super().validate_absolute_path(root, absolute_path) 

938 abs_root = os.path.abspath(root) 

939 assert abs_path is not None 

940 if not self.contents_manager.allow_hidden and is_hidden(abs_path, abs_root): 

941 self.log.info( 

942 "Refusing to serve hidden file, via 404 Error, use flag 'ContentsManager.allow_hidden' to enable" 

943 ) 

944 raise web.HTTPError(404) 

945 return abs_path 

946 

947 

948def json_errors(method: Any) -> Any: # pragma: no cover 

949 """Decorate methods with this to return GitHub style JSON errors. 

950 

951 This should be used on any JSON API on any handler method that can raise HTTPErrors. 

952 

953 This will grab the latest HTTPError exception using sys.exc_info 

954 and then: 

955 

956 1. Set the HTTP status code based on the HTTPError 

957 2. Create and return a JSON body with a message field describing 

958 the error in a human readable form. 

959 """ 

960 warnings.warn( 

961 "@json_errors is deprecated in notebook 5.2.0. Subclass APIHandler instead.", 

962 DeprecationWarning, 

963 stacklevel=2, 

964 ) 

965 

966 @functools.wraps(method) 

967 def wrapper(self, *args, **kwargs): 

968 self.write_error = types.MethodType(APIHandler.write_error, self) 

969 return method(self, *args, **kwargs) 

970 

971 return wrapper 

972 

973 

974# ----------------------------------------------------------------------------- 

975# File handler 

976# ----------------------------------------------------------------------------- 

977 

978# to minimize subclass changes: 

979HTTPError = web.HTTPError 

980 

981 

982class FileFindHandler(JupyterHandler, web.StaticFileHandler): 

983 """subclass of StaticFileHandler for serving files from a search path 

984 

985 The setting "static_immutable_cache" can be set up to serve some static 

986 file as immutable (e.g. file name containing a hash). The setting is a 

987 list of base URL, every static file URL starting with one of those will 

988 be immutable. 

989 """ 

990 

991 # cache search results, don't search for files more than once 

992 _static_paths: dict[str, str] = {} 

993 root: tuple[str] # type:ignore[assignment] 

994 

995 def set_headers(self) -> None: 

996 """Set the headers.""" 

997 super().set_headers() 

998 

999 immutable_paths = self.settings.get("static_immutable_cache", []) 

1000 

1001 # allow immutable cache for files 

1002 if any(self.request.path.startswith(path) for path in immutable_paths): 

1003 self.set_header("Cache-Control", "public, max-age=31536000, immutable") 

1004 

1005 # disable browser caching, rely on 304 replies for savings 

1006 elif "v" not in self.request.arguments or any( 

1007 self.request.path.startswith(path) for path in self.no_cache_paths 

1008 ): 

1009 self.set_header("Cache-Control", "no-cache") 

1010 

1011 def initialize( 

1012 self, 

1013 path: str | list[str], 

1014 default_filename: str | None = None, 

1015 no_cache_paths: list[str] | None = None, 

1016 ) -> None: 

1017 """Initialize the file find handler.""" 

1018 self.no_cache_paths = no_cache_paths or [] 

1019 

1020 if isinstance(path, str): 

1021 path = [path] 

1022 

1023 self.root = tuple(os.path.abspath(os.path.expanduser(p)) + os.sep for p in path) # type:ignore[assignment] 

1024 self.default_filename = default_filename 

1025 

1026 def compute_etag(self) -> str | None: 

1027 """Compute the etag.""" 

1028 return None 

1029 

1030 # access is allowed as this class is used to serve static assets on login page 

1031 # TODO: create an allow-list of files used on login page and remove this decorator 

1032 @allow_unauthenticated 

1033 def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]: 

1034 return super().get(path, include_body) 

1035 

1036 # access is allowed as this class is used to serve static assets on login page 

1037 # TODO: create an allow-list of files used on login page and remove this decorator 

1038 @allow_unauthenticated 

1039 def head(self, path: str) -> Awaitable[None]: 

1040 return super().head(path) 

1041 

1042 @classmethod 

1043 def get_absolute_path(cls, roots: Sequence[str], path: str) -> str: 

1044 """locate a file to serve on our static file search path""" 

1045 with cls._lock: 

1046 if path in cls._static_paths: 

1047 return cls._static_paths[path] 

1048 try: 

1049 abspath = os.path.abspath(filefind(path, roots)) 

1050 except OSError: 

1051 # IOError means not found 

1052 return "" 

1053 

1054 cls._static_paths[path] = abspath 

1055 

1056 log().debug(f"Path {path} served from {abspath}") 

1057 return abspath 

1058 

1059 def validate_absolute_path(self, root: str, absolute_path: str) -> str | None: 

1060 """check if the file should be served (raises 404, 403, etc.)""" 

1061 if not absolute_path: 

1062 raise web.HTTPError(404) 

1063 

1064 for root in self.root: # noqa: PLR1704 

1065 if (absolute_path + os.sep).startswith(root): 

1066 break 

1067 

1068 return super().validate_absolute_path(root, absolute_path) 

1069 

1070 

1071class APIVersionHandler(APIHandler): 

1072 """An API handler for the server version.""" 

1073 

1074 _track_activity = False 

1075 

1076 @allow_unauthenticated 

1077 def get(self) -> None: 

1078 """Get the server version info.""" 

1079 # not authenticated, so give as few info as possible 

1080 self.finish(json.dumps({"version": jupyter_server.__version__})) 

1081 

1082 

1083class TrailingSlashHandler(web.RequestHandler): 

1084 """Simple redirect handler that strips trailing slashes 

1085 

1086 This should be the first, highest priority handler. 

1087 """ 

1088 

1089 @allow_unauthenticated 

1090 def get(self) -> None: 

1091 """Handle trailing slashes in a get.""" 

1092 assert self.request.uri is not None 

1093 path, *rest = self.request.uri.partition("?") 

1094 # trim trailing *and* leading / 

1095 # to avoid misinterpreting repeated '//' 

1096 path = "/" + path.strip("/") 

1097 new_uri = "".join([path, *rest]) 

1098 self.redirect(new_uri) 

1099 

1100 post = put = get 

1101 

1102 

1103class MainHandler(JupyterHandler): 

1104 """Simple handler for base_url.""" 

1105 

1106 @allow_unauthenticated 

1107 def get(self) -> None: 

1108 """Get the main template.""" 

1109 html = self.render_template("main.html") 

1110 self.write(html) 

1111 

1112 post = put = get 

1113 

1114 

1115class FilesRedirectHandler(JupyterHandler): 

1116 """Handler for redirecting relative URLs to the /files/ handler""" 

1117 

1118 @staticmethod 

1119 async def redirect_to_files(self: Any, path: str) -> None: # noqa: PLW0211 

1120 """make redirect logic a reusable static method 

1121 

1122 so it can be called from other handlers. 

1123 """ 

1124 cm = self.contents_manager 

1125 if await ensure_async(cm.dir_exists(path)): 

1126 # it's a *directory*, redirect to /tree 

1127 url = url_path_join(self.base_url, "tree", url_escape(path)) 

1128 else: 

1129 orig_path = path 

1130 # otherwise, redirect to /files 

1131 parts = path.split("/") 

1132 

1133 if not await ensure_async(cm.file_exists(path=path)) and "files" in parts: 

1134 # redirect without files/ iff it would 404 

1135 # this preserves pre-2.0-style 'files/' links 

1136 self.log.warning("Deprecated files/ URL: %s", orig_path) 

1137 parts.remove("files") 

1138 path = "/".join(parts) 

1139 

1140 if not await ensure_async(cm.file_exists(path=path)): 

1141 raise web.HTTPError(404) 

1142 

1143 url = url_path_join(self.base_url, "files", url_escape(path)) 

1144 self.log.debug("Redirecting %s to %s", self.request.path, url) 

1145 self.redirect(url) 

1146 

1147 @allow_unauthenticated 

1148 async def get(self, path: str = "") -> None: 

1149 return await self.redirect_to_files(self, path) 

1150 

1151 

1152class RedirectWithParams(web.RequestHandler): 

1153 """Same as web.RedirectHandler, but preserves URL parameters""" 

1154 

1155 def initialize(self, url: str, permanent: bool = True) -> None: 

1156 """Initialize a redirect handler.""" 

1157 self._url = url 

1158 self._permanent = permanent 

1159 

1160 @allow_unauthenticated 

1161 def get(self) -> None: 

1162 """Get a redirect.""" 

1163 sep = "&" if "?" in self._url else "?" 

1164 url = sep.join([self._url, self.request.query]) 

1165 self.redirect(url, permanent=self._permanent) 

1166 

1167 

1168class PrometheusMetricsHandler(JupyterHandler): 

1169 """ 

1170 Return prometheus metrics for this server 

1171 """ 

1172 

1173 @allow_unauthenticated 

1174 def get(self) -> None: 

1175 """Get prometheus metrics.""" 

1176 if self.settings["authenticate_prometheus"] and not self.logged_in: 

1177 raise web.HTTPError(403) 

1178 

1179 self.set_header("Content-Type", prometheus_client.CONTENT_TYPE_LATEST) 

1180 self.write(prometheus_client.generate_latest(prometheus_client.REGISTRY)) 

1181 

1182 

1183class PublicStaticFileHandler(web.StaticFileHandler): 

1184 """Same as web.StaticFileHandler, but decorated to acknowledge that auth is not required.""" 

1185 

1186 @allow_unauthenticated 

1187 def head(self, path: str) -> Awaitable[None]: 

1188 return super().head(path) 

1189 

1190 @allow_unauthenticated 

1191 def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]: 

1192 return super().get(path, include_body) 

1193 

1194 

1195# ----------------------------------------------------------------------------- 

1196# URL pattern fragments for reuse 

1197# ----------------------------------------------------------------------------- 

1198 

1199# path matches any number of `/foo[/bar...]` or just `/` or '' 

1200path_regex = r"(?P<path>(?:(?:/[^/]+)+|/?))" 

1201 

1202# ----------------------------------------------------------------------------- 

1203# URL to handler mappings 

1204# ----------------------------------------------------------------------------- 

1205 

1206 

1207default_handlers = [ 

1208 (r".*/", TrailingSlashHandler), 

1209 (r"api", APIVersionHandler), 

1210 (r"/(robots\.txt|favicon\.ico)", PublicStaticFileHandler), 

1211 (r"/metrics", PrometheusMetricsHandler), 

1212]