Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/base/handlers.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

565 statements  

1"""Base Tornado handlers for the Jupyter server.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5from __future__ import annotations 

6 

7import functools 

8import inspect 

9import ipaddress 

10import json 

11import mimetypes 

12import os 

13import re 

14import types 

15import warnings 

16from collections.abc import Awaitable, Coroutine, Sequence 

17from http.client import responses 

18from logging import Logger 

19from typing import TYPE_CHECKING, Any, cast 

20from urllib.parse import urlparse 

21 

22import prometheus_client 

23from jinja2 import TemplateNotFound 

24from jupyter_core.paths import is_hidden 

25from tornado import web 

26from tornado.log import app_log 

27from traitlets.config import Application 

28 

29import jupyter_server 

30from jupyter_server import CallContext 

31from jupyter_server._sysinfo import get_sys_info 

32from jupyter_server._tz import utcnow 

33from jupyter_server.auth.decorator import allow_unauthenticated, authorized 

34from jupyter_server.auth.identity import User 

35from jupyter_server.i18n import combine_translations 

36from jupyter_server.services.security import csp_report_uri 

37from jupyter_server.utils import ( 

38 ensure_async, 

39 filefind, 

40 url_escape, 

41 url_is_absolute, 

42 url_path_join, 

43 urldecode_unix_socket_path, 

44) 

45 

46if TYPE_CHECKING: 

47 from jupyter_client.kernelspec import KernelSpecManager 

48 from jupyter_events import EventLogger 

49 from jupyter_server_terminals.terminalmanager import TerminalManager 

50 from tornado.concurrent import Future 

51 

52 from jupyter_server.auth.authorizer import Authorizer 

53 from jupyter_server.auth.identity import IdentityProvider 

54 from jupyter_server.serverapp import ServerApp 

55 from jupyter_server.services.config.manager import ConfigManager 

56 from jupyter_server.services.contents.manager import ContentsManager 

57 from jupyter_server.services.kernels.kernelmanager import AsyncMappingKernelManager 

58 from jupyter_server.services.sessions.sessionmanager import SessionManager 

59 

60# ----------------------------------------------------------------------------- 

61# Top-level handlers 

62# ----------------------------------------------------------------------------- 

63 

# Serialized system information; filled in lazily by json_sys_info().
_sys_info_cache = None


def json_sys_info():
    """Return the system information serialized as a JSON string.

    The string is built once from ``get_sys_info()`` and kept in the
    module-level ``_sys_info_cache``; later calls return the stored value.
    """
    global _sys_info_cache  # noqa: PLW0603
    if _sys_info_cache is not None:
        return _sys_info_cache
    _sys_info_cache = json.dumps(get_sys_info())
    return _sys_info_cache

73 

74 

def log() -> Logger:
    """Return the logger that handlers should write to.

    Uses the logger of the traitlets ``Application`` singleton once one has
    been initialized; otherwise falls back to tornado's ``app_log``.
    """
    if not Application.initialized():
        return app_log
    return cast(Logger, Application.instance().log)

81 

82 

class AuthenticatedHandler(web.RequestHandler):
    """A RequestHandler with an authenticated user.

    Most members are shortcuts to the ``identity_provider`` and ``authorizer``
    objects kept in the tornado application settings. Several of them exist
    only for backward compatibility and emit a ``DeprecationWarning``.
    """

    @property
    def base_url(self) -> str:
        """The ``base_url`` setting, or ``"/"`` when it is not configured."""
        return cast(str, self.settings.get("base_url", "/"))

    @property
    def content_security_policy(self) -> str:
        """The default Content-Security-Policy header

        Can be overridden by defining Content-Security-Policy in settings['headers']
        """
        if "Content-Security-Policy" in self.settings.get("headers", {}):
            # user-specified, don't override
            return cast(str, self.settings["headers"]["Content-Security-Policy"])

        return "; ".join(
            [
                "frame-ancestors 'self'",
                # Make sure the report-uri is relative to the base_url
                "report-uri "
                + self.settings.get("csp_report_uri", url_path_join(self.base_url, csp_report_uri)),
            ]
        )

    def set_default_headers(self) -> None:
        """Set the default headers.

        Starts from ``X-Content-Type-Options: nosniff``, layers the
        user-configured ``settings["headers"]`` on top, and finally sets the
        Content-Security-Policy computed by ``content_security_policy``.
        """
        headers = {}
        headers["X-Content-Type-Options"] = "nosniff"
        headers.update(self.settings.get("headers", {}))

        headers["Content-Security-Policy"] = self.content_security_policy

        # Allow for overriding headers
        for header_name, value in headers.items():
            try:
                self.set_header(header_name, value)
            except Exception as e:
                # tornado raise Exception (not a subclass)
                # if method is unsupported (websocket and Access-Control-Allow-Origin
                # for example, so just ignore)
                self.log.exception(  # type:ignore[attr-defined]
                    "Could not set default headers: %s", e
                )

    @property
    def cookie_name(self) -> str:
        """Deprecated: the cookie name reported by the identity provider."""
        # The warning names this member (it previously named ``login_handler``,
        # a copy-paste slip that pointed users at the wrong attribute).
        warnings.warn(
            """JupyterHandler.cookie_name is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        return self.identity_provider.get_cookie_name(self)

    def force_clear_cookie(self, name: str, path: str = "/", domain: str | None = None) -> None:
        """Force a cookie clear.

        Deprecated: forwards ``name``, ``path`` and ``domain`` to the
        identity provider.
        """
        warnings.warn(
            """JupyterHandler.force_clear_cookie is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        self.identity_provider._force_clear_cookie(self, name, path=path, domain=domain)

    def clear_login_cookie(self) -> None:
        """Clear a login cookie.

        Deprecated: forwards to ``identity_provider.clear_login_cookie``.
        """
        warnings.warn(
            """JupyterHandler.clear_login_cookie is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        self.identity_provider.clear_login_cookie(self)

    def get_current_user(self) -> str:
        """Get the current user.

        Deprecated: returns the value stored in ``_jupyter_current_user``
        (with a ``DeprecationWarning``) when that attribute exists.

        Raises
        ------
        RuntimeError
            If ``_jupyter_current_user`` has not been set on this handler yet.
        """
        clsname = self.__class__.__name__
        msg = (
            f"Calling `{clsname}.get_current_user()` directly is deprecated in jupyter-server 2.0."
            " Use `self.current_user` instead (works in all versions)."
        )
        if hasattr(self, "_jupyter_current_user"):
            # backward-compat: return _jupyter_current_user
            warnings.warn(
                msg,
                DeprecationWarning,
                stacklevel=2,
            )
            return cast(str, self._jupyter_current_user)
        # haven't called get_user in prepare, raise
        raise RuntimeError(msg)

    def skip_check_origin(self) -> bool:
        """Ask my login_handler if I should skip the origin_check

        For example: in the default LoginHandler, if a request is token-authenticated,
        origin checking should be skipped.
        """
        if self.request.method == "OPTIONS":
            # no origin-check on options requests, which are used to check origins!
            return True
        return not self.identity_provider.should_check_origin(self)

    @property
    def token_authenticated(self) -> bool:
        """Have I been authenticated with a token?"""
        return self.identity_provider.is_token_authenticated(self)

    @property
    def logged_in(self) -> bool:
        """Is a user currently logged in?

        True when ``current_user`` is truthy and not equal to ``"anonymous"``.
        """
        user = self.current_user
        return bool(user and user != "anonymous")

    @property
    def login_handler(self) -> Any:
        """Return the login handler for this application, if any."""
        warnings.warn(
            """JupyterHandler.login_handler is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        return self.identity_provider.login_handler_class

    @property
    def token(self) -> str | None:
        """Return the login token for this application, if any."""
        return self.identity_provider.token

    @property
    def login_available(self) -> bool:
        """May a user proceed to log in?

        This returns True if login capability is available, irrespective of
        whether the user is already logged in or not.

        """
        return cast(bool, self.identity_provider.login_available)

    @property
    def authorizer(self) -> Authorizer:
        """The authorizer from the application settings.

        When the settings have no ``"authorizer"`` key, a warning is issued
        and an ``AllowAllAuthorizer`` is created and stored in the settings,
        so later lookups reuse it.
        """
        if "authorizer" not in self.settings:
            warnings.warn(
                "The Tornado web application does not have an 'authorizer' defined "
                "in its settings. In future releases of jupyter_server, this will "
                "be a required key for all subclasses of `JupyterHandler`. For an "
                "example, see the jupyter_server source code for how to "
                "add an authorizer to the tornado settings: "
                "https://github.com/jupyter-server/jupyter_server/blob/"
                "653740cbad7ce0c8a8752ce83e4d3c2c754b13cb/jupyter_server/serverapp.py"
                "#L234-L256",
                stacklevel=2,
            )
            from jupyter_server.auth import AllowAllAuthorizer

            self.settings["authorizer"] = AllowAllAuthorizer(
                config=self.settings.get("config", None),
                identity_provider=self.identity_provider,
            )

        return cast("Authorizer", self.settings.get("authorizer"))

    @property
    def identity_provider(self) -> IdentityProvider:
        """The identity provider from the application settings.

        When the settings have no ``"identity_provider"`` key, a warning is
        issued and a default ``IdentityProvider`` is created and stored in the
        settings, so later lookups reuse it.
        """
        if "identity_provider" not in self.settings:
            warnings.warn(
                "The Tornado web application does not have an 'identity_provider' defined "
                "in its settings. In future releases of jupyter_server, this will "
                "be a required key for all subclasses of `JupyterHandler`. For an "
                "example, see the jupyter_server source code for how to "
                "add an identity provider to the tornado settings: "
                "https://github.com/jupyter-server/jupyter_server/blob/v2.0.0/"
                "jupyter_server/serverapp.py#L242",
                stacklevel=2,
            )
            from jupyter_server.auth import IdentityProvider

            # no identity provider set, load default
            self.settings["identity_provider"] = IdentityProvider(
                config=self.settings.get("config", None)
            )
        return cast("IdentityProvider", self.settings["identity_provider"])

272 

273 

class JupyterHandler(AuthenticatedHandler):
    """Jupyter-specific extensions to authenticated handling

    Mostly property shortcuts to Jupyter-specific settings.
    """

    @property
    def config(self) -> dict[str, Any] | None:
        """The ``config`` entry of the application settings, if any."""
        return cast("dict[str, Any] | None", self.settings.get("config", None))

    @property
    def log(self) -> Logger:
        """use the Jupyter log by default, falling back on tornado's logger"""
        return log()

    @property
    def jinja_template_vars(self) -> dict[str, Any]:
        """User-supplied values to supply to jinja templates."""
        return cast("dict[str, Any]", self.settings.get("jinja_template_vars", {}))

    @property
    def serverapp(self) -> ServerApp | None:
        """The ``serverapp`` entry of the settings (the key must exist)."""
        return cast("ServerApp | None", self.settings["serverapp"])

    # ---------------------------------------------------------------
    # URLs
    # ---------------------------------------------------------------

    @property
    def version_hash(self) -> str:
        """The version hash to use for cache hints for static files"""
        return cast(str, self.settings.get("version_hash", ""))

    @property
    def mathjax_url(self) -> str:
        """The MathJax URL; relative values are joined onto ``base_url``."""
        url = cast(str, self.settings.get("mathjax_url", ""))
        if not url or url_is_absolute(url):
            return url
        return url_path_join(self.base_url, url)

    @property
    def mathjax_config(self) -> str:
        """The ``mathjax_config`` setting, with a built-in default."""
        return cast(str, self.settings.get("mathjax_config", "TeX-AMS-MML_HTMLorMML-full,Safe"))

    @property
    def default_url(self) -> str:
        """The ``default_url`` setting, or an empty string."""
        return cast(str, self.settings.get("default_url", ""))

    @property
    def ws_url(self) -> str:
        """The ``websocket_url`` setting, or an empty string."""
        return cast(str, self.settings.get("websocket_url", ""))

    @property
    def contents_js_source(self) -> str:
        """The ``contents_js_source`` setting (default ``services/contents``).

        The value is logged at debug level on every access.
        """
        source = cast(str, self.settings.get("contents_js_source", "services/contents"))
        self.log.debug("Using contents: %s", source)
        return source

    # ---------------------------------------------------------------
    # Manager objects
    # ---------------------------------------------------------------

    @property
    def kernel_manager(self) -> AsyncMappingKernelManager:
        """The ``kernel_manager`` entry of the settings."""
        return cast("AsyncMappingKernelManager", self.settings["kernel_manager"])

    @property
    def contents_manager(self) -> ContentsManager:
        """The ``contents_manager`` entry of the settings."""
        return cast("ContentsManager", self.settings["contents_manager"])

    @property
    def session_manager(self) -> SessionManager:
        """The ``session_manager`` entry of the settings."""
        return cast("SessionManager", self.settings["session_manager"])

    @property
    def terminal_manager(self) -> TerminalManager:
        """The ``terminal_manager`` entry of the settings."""
        return cast("TerminalManager", self.settings["terminal_manager"])

    @property
    def kernel_spec_manager(self) -> KernelSpecManager:
        """The ``kernel_spec_manager`` entry of the settings."""
        return cast("KernelSpecManager", self.settings["kernel_spec_manager"])

    @property
    def config_manager(self) -> ConfigManager:
        """The ``config_manager`` entry of the settings."""
        return cast("ConfigManager", self.settings["config_manager"])

    @property
    def event_logger(self) -> EventLogger:
        """The ``event_logger`` entry of the settings."""
        return cast("EventLogger", self.settings["event_logger"])

    # ---------------------------------------------------------------
    # CORS
    # ---------------------------------------------------------------

    @property
    def allow_origin(self) -> str:
        """Normal Access-Control-Allow-Origin"""
        return cast(str, self.settings.get("allow_origin", ""))

    @property
    def allow_origin_pat(self) -> str | None:
        """Regular expression version of allow_origin"""
        return cast("str | None", self.settings.get("allow_origin_pat", None))

    @property
    def allow_credentials(self) -> bool:
        """Whether to set Access-Control-Allow-Credentials"""
        return cast(bool, self.settings.get("allow_credentials", False))

    def set_default_headers(self) -> None:
        """Set the default headers of the parent class.

        CORS headers are not added here; ``set_cors_headers`` does that and
        is called from ``prepare()``.
        """
        super().set_default_headers()

    def set_cors_headers(self) -> None:
        """Add CORS headers, if defined

        Now that current_user is async (jupyter-server 2.0),
        must be called at the end of prepare(), instead of in set_default_headers.
        """
        if self.allow_origin:
            self.set_header("Access-Control-Allow-Origin", self.allow_origin)
        elif self.allow_origin_pat:
            origin = self.get_origin()
            if origin and re.match(self.allow_origin_pat, origin):
                self.set_header("Access-Control-Allow-Origin", origin)
        elif self.token_authenticated and "Access-Control-Allow-Origin" not in self.settings.get(
            "headers", {}
        ):
            # allow token-authenticated requests cross-origin by default.
            # only apply this exception if allow-origin has not been specified.
            self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))

        if self.allow_credentials:
            self.set_header("Access-Control-Allow-Credentials", "true")

    def set_attachment_header(self, filename: str) -> None:
        """Set Content-Disposition: attachment header

        As a method to ensure handling of filename encoding
        """
        escaped_filename = url_escape(filename)
        self.set_header(
            "Content-Disposition",
            f"attachment; filename*=utf-8''{escaped_filename}",
        )

    def get_origin(self) -> str | None:
        """Return the request's origin header value, or None if absent."""
        # Handle WebSocket Origin naming convention differences
        # The difference between version 8 and 13 is that in 8 the
        # client sends a "Sec-Websocket-Origin" header and in 13 it's
        # simply "Origin".
        if "Origin" in self.request.headers:
            origin = self.request.headers.get("Origin")
        else:
            origin = self.request.headers.get("Sec-Websocket-Origin", None)
        return origin

    # origin_to_satisfy_tornado is present because tornado requires
    # check_origin to take an origin argument, but we don't use it
    def check_origin(self, origin_to_satisfy_tornado: str = "") -> bool:
        """Check Origin for cross-site API requests, including websockets

        Copied from WebSocket with changes:

        - allow unspecified host/origin (e.g. scripts)
        - allow token-authenticated requests
        """
        if self.allow_origin == "*" or self.skip_check_origin():
            return True

        host = self.request.headers.get("Host")
        origin = self.request.headers.get("Origin")

        # If no header is provided, let the request through.
        # Origin can be None for:
        # - same-origin (IE, Firefox)
        # - Cross-site POST form (IE, Firefox)
        # - Scripts
        # The cross-site POST (XSRF) case is handled by tornado's xsrf_token
        if origin is None or host is None:
            return True

        origin = origin.lower()
        origin_host = urlparse(origin).netloc

        # OK if origin matches host
        if origin_host == host:
            return True

        # Check CORS headers
        if self.allow_origin:
            allow = bool(self.allow_origin == origin)
        elif self.allow_origin_pat:
            allow = bool(re.match(self.allow_origin_pat, origin))
        else:
            # No CORS headers deny the request
            allow = False
        if not allow:
            self.log.warning(
                "Blocking Cross Origin API request for %s.  Origin: %s, Host: %s",
                self.request.path,
                origin,
                host,
            )
        return allow

    def check_referer(self) -> bool:
        """Check Referer for cross-site requests.
        Disables requests to certain endpoints with
        external or missing Referer.
        If set, allow_origin settings are applied to the Referer
        to whitelist specific cross-origin sites.
        Used on GET for api endpoints and /files/
        to block cross-site inclusion (XSSI).
        """
        if self.allow_origin == "*" or self.skip_check_origin():
            return True

        host = self.request.headers.get("Host")
        referer = self.request.headers.get("Referer")

        if not host:
            self.log.warning("Blocking request with no host")
            return False
        if not referer:
            self.log.warning("Blocking request with no referer")
            return False

        referer_url = urlparse(referer)
        referer_host = referer_url.netloc
        if referer_host == host:
            return True

        # apply cross-origin checks to Referer:
        origin = f"{referer_url.scheme}://{referer_url.netloc}"
        if self.allow_origin:
            allow = self.allow_origin == origin
        elif self.allow_origin_pat:
            allow = bool(re.match(self.allow_origin_pat, origin))
        else:
            # No CORS settings, deny the request
            allow = False

        if not allow:
            self.log.warning(
                "Blocking Cross Origin request for %s.  Referer: %s, Host: %s",
                self.request.path,
                origin,
                host,
            )
        return allow

    def check_xsrf_cookie(self) -> None:
        """Bypass xsrf cookie checks when token-authenticated

        Raises
        ------
        tornado.web.HTTPError
            403 for GET/HEAD requests that fail both the XSRF check and the
            Referer check; for other methods the original error is re-raised.
        """
        if not hasattr(self, "_jupyter_current_user"):
            # Called too early, will be checked later
            return None
        if self.token_authenticated or self.settings.get("disable_check_xsrf", False):
            # Token-authenticated requests do not need additional XSRF-check
            # Servers without authentication are vulnerable to XSRF
            return None
        try:
            if not self.check_origin():
                raise web.HTTPError(404)
            return super().check_xsrf_cookie()
        except web.HTTPError as e:
            if self.request.method in {"GET", "HEAD"}:
                # Consider Referer a sufficient cross-origin check for GET requests
                if not self.check_referer():
                    referer = self.request.headers.get("Referer")
                    if referer:
                        msg = f"Blocking Cross Origin request from {referer}."
                    else:
                        msg = "Blocking request from unknown origin"
                    raise web.HTTPError(403, msg) from e
            else:
                raise

    def check_host(self) -> bool:
        """Check the host header if remote access disallowed.

        Returns True if the request should continue, False otherwise.
        """
        if self.settings.get("allow_remote_access", False):
            return True

        # Remove port (e.g. ':8888') from host
        match = re.match(r"^(.*?)(:\d+)?$", self.request.host)
        assert match is not None
        host = match.group(1)

        # Browsers format IPv6 addresses like [::1]; we need to remove the []
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]

        # UNIX socket handling
        check_host = urldecode_unix_socket_path(host)
        if check_host.startswith("/") and os.path.exists(check_host):
            allow = True
        else:
            try:
                addr = ipaddress.ip_address(host)
            except ValueError:
                # Not an IP address: check against hostnames
                allow = host in self.settings.get("local_hostnames", ["localhost"])
            else:
                allow = addr.is_loopback

        if not allow:
            self.log.warning(
                (
                    "Blocking request with non-local 'Host' %s (%s). "
                    "If the server should be accessible at that name, "
                    "set ServerApp.allow_remote_access to disable the check."
                ),
                host,
                self.request.host,
            )
        return allow

    async def prepare(self, *, _redirect_to_login=True) -> Awaitable[None] | None:  # type:ignore[override]
        """Prepare a response.

        In order: registers this handler in the ``CallContext``, rejects
        non-local hosts with 403, resolves the current user (through a
        deprecated ``get_current_user`` override or the identity provider),
        sets the CORS headers, runs the XSRF check for methods other than
        GET/HEAD/OPTIONS and, unless the ``allow_unauthenticated_access``
        setting is true, requires authentication for every HTTP method that
        is not marked ``__allow_unauthenticated``.

        With ``_redirect_to_login`` true the check reuses ``web.authenticated``;
        otherwise an unknown user gets a plain 403.
        """
        # Set the current Jupyter Handler context variable.
        CallContext.set(CallContext.JUPYTER_HANDLER, self)

        if not self.check_host():
            self.current_user = self._jupyter_current_user = None
            raise web.HTTPError(403)

        from jupyter_server.auth import IdentityProvider

        mod_obj = inspect.getmodule(self.get_current_user)
        assert mod_obj is not None
        user: User | None = None

        if type(self.identity_provider) is IdentityProvider and mod_obj.__name__ != __name__:
            # check for overridden get_current_user + default IdentityProvider
            # deprecated way to override auth (e.g. JupyterHub < 3.0)
            # allow deprecated, overridden get_current_user
            warnings.warn(
                "Overriding JupyterHandler.get_current_user is deprecated in jupyter-server 2.0."
                " Use an IdentityProvider class.",
                DeprecationWarning,
                stacklevel=1,
            )
            user = User(self.get_current_user())
        else:
            _user = self.identity_provider.get_user(self)
            if isinstance(_user, Awaitable):
                # IdentityProvider.get_user _may_ be async
                _user = await _user
            user = _user

        # self.current_user for tornado's @web.authenticated
        # self._jupyter_current_user for backward-compat in deprecated get_current_user calls
        # and our own private checks for whether .current_user has been set
        self.current_user = self._jupyter_current_user = user
        # complete initial steps which require auth to resolve first:
        self.set_cors_headers()
        if self.request.method not in {"GET", "HEAD", "OPTIONS"}:
            self.check_xsrf_cookie()

        if not self.settings.get("allow_unauthenticated_access", False):
            if not self.request.method:
                # Use the same qualified name as every other raise in this
                # class instead of relying on a separate module-level alias.
                raise web.HTTPError(403)
            method = getattr(self, self.request.method.lower())
            if not getattr(method, "__allow_unauthenticated", False):
                if _redirect_to_login:
                    # reuse `web.authenticated` logic, which redirects to the login
                    # page on GET and HEAD and otherwise raises 403
                    return web.authenticated(lambda _: super().prepare())(self)
                else:
                    # raise 403 if user is not known without redirecting to login page
                    user = self.current_user
                    if user is None:
                        self.log.warning(
                            f"Couldn't authenticate {self.__class__.__name__} connection"
                        )
                        raise web.HTTPError(403)

        return super().prepare()

    # ---------------------------------------------------------------
    # template rendering
    # ---------------------------------------------------------------

    def get_template(self, name):
        """Return the jinja template object for a given name"""
        return self.settings["jinja2_env"].get_template(name)

    def render_template(self, name, **ns):
        """Render a template by name.

        Values from ``template_namespace`` override same-named keys in ``ns``.
        """
        ns.update(self.template_namespace)
        template = self.get_template(name)
        return template.render(**ns)

    @property
    def template_namespace(self) -> dict[str, Any]:
        """The variables made available to every rendered template."""
        return dict(
            base_url=self.base_url,
            default_url=self.default_url,
            ws_url=self.ws_url,
            logged_in=self.logged_in,
            allow_password_change=getattr(self.identity_provider, "allow_password_change", False),
            auth_enabled=self.identity_provider.auth_enabled,
            login_available=self.identity_provider.login_available,
            token_available=bool(self.token),
            static_url=self.static_url,
            sys_info=json_sys_info(),
            contents_js_source=self.contents_js_source,
            version_hash=self.version_hash,
            xsrf_form_html=self.xsrf_form_html,
            token=self.token,
            xsrf_token=self.xsrf_token.decode("utf8"),
            nbjs_translations=json.dumps(
                combine_translations(self.request.headers.get("Accept-Language", ""))
            ),
            **self.jinja_template_vars,
        )

    def get_json_body(self) -> dict[str, Any] | None:
        """Return the body of the request as JSON data.

        Returns ``None`` when the request has no body.

        Raises
        ------
        tornado.web.HTTPError
            400 if the body is not valid UTF-8 or cannot be parsed as JSON.
        """
        if not self.request.body:
            return None
        try:
            body = self.request.body.strip().decode("utf-8")
        except UnicodeDecodeError as e:
            # A body that is not UTF-8 is a client error, not a server fault:
            # answer 400 instead of letting the decode error become a 500.
            self.log.debug("Bad request body encoding: %r", self.request.body)
            raise web.HTTPError(400, "Invalid JSON in body of request") from e
        try:
            model = json.loads(body)
        except Exception as e:
            self.log.debug("Bad JSON: %r", body)
            self.log.error("Couldn't parse JSON", exc_info=True)
            raise web.HTTPError(400, "Invalid JSON in body of request") from e
        return cast("dict[str, Any]", model)

    def write_error(self, status_code: int, **kwargs: Any) -> None:
        """render custom error pages

        Renders ``<status_code>.html`` when such a template exists and
        ``error.html`` otherwise.
        """
        exc_info = kwargs.get("exc_info")
        message = ""
        status_message = responses.get(status_code, "Unknown HTTP Error")

        if exc_info:
            exception = exc_info[1]
            # get the custom message, if defined
            try:
                message = exception.log_message % exception.args
            except Exception:
                # best effort: exceptions without a usable log_message
                # simply leave the message empty
                pass

            # construct the custom reason, if defined
            reason = getattr(exception, "reason", "")
            if reason:
                status_message = reason
        else:
            exception = "(unknown)"

        # build template namespace
        ns = {
            "status_code": status_code,
            "status_message": status_message,
            "message": message,
            "exception": exception,
        }

        self.set_header("Content-Type", "text/html")
        # render the template
        try:
            html = self.render_template(f"{status_code}.html", **ns)
        except TemplateNotFound:
            html = self.render_template("error.html", **ns)

        self.write(html)

747 

748 

class APIHandler(JupyterHandler):
    """Base class for API handlers"""

    async def prepare(self) -> None:  # type:ignore[override]
        """Prepare an API response.

        Runs the parent preparation, then rejects requests that fail the
        origin check with a 404.
        """
        await super().prepare()
        if not self.check_origin():
            raise web.HTTPError(404)

    def write_error(self, status_code: int, **kwargs: Any) -> None:
        """APIHandler errors are JSON, not human pages"""
        self.set_header("Content-Type", "application/json")
        message = responses.get(status_code, "Unknown HTTP Error")
        reply: dict[str, Any] = {
            "message": message,
        }
        exc_info = kwargs.get("exc_info")
        if exc_info:
            e = exc_info[1]
            # Use the qualified name, as the rest of this module does, rather
            # than depending on a separate module-level ``HTTPError`` alias.
            if isinstance(e, web.HTTPError):
                reply["message"] = e.log_message or message
                reply["reason"] = e.reason
            else:
                reply["message"] = "Unhandled error"
                reply["reason"] = None
                # backward-compatibility: traceback field is present,
                # but always empty
                reply["traceback"] = ""
        self.log.warning("wrote error: %r", reply["message"], exc_info=True)
        self.finish(json.dumps(reply))

    def get_login_url(self) -> str:
        """Get the login url."""
        # if get_login_url is invoked in an API handler,
        # that means @web.authenticated is trying to trigger a redirect.
        # instead of redirecting, raise 403 instead.
        if not self.current_user:
            raise web.HTTPError(403)
        return super().get_login_url()

    @property
    def content_security_policy(self) -> str:
        """The parent policy with ``default-src 'none'`` appended."""
        csp = "; ".join(  # noqa: FLY002
            [
                super().content_security_policy,
                "default-src 'none'",
            ]
        )
        return csp

    # set _track_activity = False on API handlers that shouldn't track activity
    _track_activity = True

    def update_api_activity(self) -> None:
        """Update last_activity of API requests"""
        # record activity of authenticated requests
        if (
            self._track_activity
            and getattr(self, "_jupyter_current_user", None)
            and self.get_argument("no_track_activity", None) is None
        ):
            self.settings["api_last_activity"] = utcnow()

    def finish(self, *args: Any, **kwargs: Any) -> Future[Any]:
        """Finish an API response.

        Accepts an extra ``set_content_type`` keyword (default
        ``application/json``) that is used as the Content-Type header and is
        not passed on to the parent ``finish``.
        """
        self.update_api_activity()
        # Allow caller to indicate content-type...
        set_content_type = kwargs.pop("set_content_type", "application/json")
        self.set_header("Content-Type", set_content_type)
        return super().finish(*args, **kwargs)

    @allow_unauthenticated
    def options(self, *args: Any, **kwargs: Any) -> None:
        """Get the options.

        Answers CORS preflight requests by setting the
        Access-Control-Allow-Headers and Access-Control-Allow-Methods headers.
        """
        if "Access-Control-Allow-Headers" in self.settings.get("headers", {}):
            self.set_header(
                "Access-Control-Allow-Headers",
                self.settings["headers"]["Access-Control-Allow-Headers"],
            )
        else:
            self.set_header(
                "Access-Control-Allow-Headers",
                "accept, content-type, authorization, x-xsrftoken",
            )
        self.set_header("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE, OPTIONS")

        # if authorization header is requested,
        # that means the request is token-authenticated.
        # avoid browser-side rejection of the preflight request.
        # only allow this exception if allow_origin has not been specified
        # and Jupyter server authentication is enabled.
        # If the token is not valid, the 'real' request will still be rejected.
        # NOTE(review): the comment above says the exception applies only when
        # allow_origin has NOT been specified, yet the last clause below requires
        # one of the allow-origin settings to be set. Behavior is left unchanged
        # here; confirm which of the two is intended.
        requested_headers = self.request.headers.get("Access-Control-Request-Headers", "").split(
            ","
        )
        if (
            requested_headers
            and any(h.strip().lower() == "authorization" for h in requested_headers)
            and (
                # FIXME: it would be even better to check specifically for token-auth,
                # but there is currently no API for this.
                self.login_available
            )
            and (
                self.allow_origin
                or self.allow_origin_pat
                or "Access-Control-Allow-Origin" in self.settings.get("headers", {})
            )
        ):
            self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))

859 

860 

class Template404(JupyterHandler):
    """Handler that answers every request with the 404 error page."""

    async def prepare(self) -> None:  # type:ignore[override]
        """Run the regular preparation, then fail the request with a 404."""
        await super().prepare()
        not_found = web.HTTPError(404)
        raise not_found

868 

869 

870class AuthenticatedFileHandler(JupyterHandler, web.StaticFileHandler): 

871 """static files should only be accessible when logged in""" 

872 

873 auth_resource = "contents" 

874 

@property
def content_security_policy(self) -> str:
    """The parent policy extended with a ``sandbox allow-scripts`` directive."""
    # Served HTML/SVG may contain Javascript; sandboxing confines it to a
    # unique origin so it cannot interact with the Jupyter server.
    inherited_policy = super().content_security_policy
    return inherited_policy + "; sandbox allow-scripts"

880 

@web.authenticated
@authorized
def head(self, path: str) -> Awaitable[None]:  # type:ignore[override]
    """Answer a HEAD request for ``path`` after the XSRF check."""
    self.check_xsrf_cookie()
    response = super().head(path)
    return response

887 

888 @web.authenticated 

889 @authorized 

890 def get( # type:ignore[override] 

891 self, path: str, **kwargs: Any 

892 ) -> Awaitable[None]: 

893 """Get a file by path.""" 

894 self.check_xsrf_cookie() 

895 if os.path.splitext(path)[1] == ".ipynb" or self.get_argument("download", None): 

896 name = path.rsplit("/", 1)[-1] 

897 self.set_attachment_header(name) 

898 

899 return web.StaticFileHandler.get(self, path, **kwargs) 

900 

901 def get_content_type(self) -> str: 

902 """Get the content type.""" 

903 assert self.absolute_path is not None 

904 path = self.absolute_path.strip("/") 

905 if "/" in path: 

906 _, name = path.rsplit("/", 1) 

907 else: 

908 name = path 

909 if name.endswith(".ipynb"): 

910 return "application/x-ipynb+json" 

911 else: 

912 cur_mime = mimetypes.guess_type(name)[0] 

913 if cur_mime == "text/plain": 

914 return "text/plain; charset=UTF-8" 

915 else: 

916 return super().get_content_type() 

917 

918 def set_headers(self) -> None: 

919 """Set the headers.""" 

920 super().set_headers() 

921 # disable browser caching, rely on 304 replies for savings 

922 if "v" not in self.request.arguments: 

923 self.add_header("Cache-Control", "no-cache") 

924 

925 def compute_etag(self) -> str | None: 

926 """Compute the etag.""" 

927 return None 

928 

929 def validate_absolute_path(self, root: str, absolute_path: str) -> str: 

930 """Validate and return the absolute path. 

931 

932 Requires tornado 3.1 

933 

934 Adding to tornado's own handling, forbids the serving of hidden files. 

935 """ 

936 abs_path = super().validate_absolute_path(root, absolute_path) 

937 abs_root = os.path.abspath(root) 

938 assert abs_path is not None 

939 if not self.contents_manager.allow_hidden and is_hidden(abs_path, abs_root): 

940 self.log.info( 

941 "Refusing to serve hidden file, via 404 Error, use flag 'ContentsManager.allow_hidden' to enable" 

942 ) 

943 raise web.HTTPError(404) 

944 return abs_path 

945 

946 

def json_errors(method: Any) -> Any:  # pragma: no cover
    """Deprecated decorator that routes a handler method's errors through APIHandler.

    Applying the decorator emits a ``DeprecationWarning`` immediately (at
    decoration time, attributed to the caller via ``stacklevel=2``).

    The returned wrapper, on every call:

    1. Rebinds ``self.write_error`` to ``APIHandler.write_error`` bound to
       that instance.
    2. Calls the wrapped ``method`` with the same arguments and returns its
       result.

    NOTE(review): the JSON formatting itself presumably lives in
    ``APIHandler.write_error``, which is not part of this function.
    """
    deprecation_message = (
        "@json_errors is deprecated in notebook 5.2.0. Subclass APIHandler instead."
    )
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)

    def wrapper(self, *args, **kwargs):
        # Per-instance override: only handlers that pass through here are affected.
        self.write_error = types.MethodType(APIHandler.write_error, self)
        return method(self, *args, **kwargs)

    # Copy name/docstring/etc. from the wrapped method onto the wrapper.
    return functools.wraps(method)(wrapper)

971 

972 

973# ----------------------------------------------------------------------------- 

974# File handler 

975# ----------------------------------------------------------------------------- 

976 

# Module-level alias for tornado's ``web.HTTPError``, kept to minimize the
# changes required in subclasses.
HTTPError = web.HTTPError

979 

980 

class FileFindHandler(JupyterHandler, web.StaticFileHandler):
    """StaticFileHandler variant that looks files up along a search path.

    The "static_immutable_cache" setting is a list of base URLs; any static
    file whose request path starts with one of them is served as immutable
    (useful e.g. for file names that contain a hash).
    """

    # cache search results, don't search for files more than once
    # NOTE(review): keyed by relative path only, not by search roots — confirm
    # that is acceptable when several handlers use different roots.
    _static_paths: dict[str, str] = {}
    root: tuple[str]  # type:ignore[assignment]

    def set_headers(self) -> None:
        """Set the inherited headers, then choose a Cache-Control policy."""
        super().set_headers()

        request_path = self.request.path

        def starts_with_any(prefixes: Any) -> bool:
            return any(request_path.startswith(prefix) for prefix in prefixes)

        if starts_with_any(self.settings.get("static_immutable_cache", [])):
            # allow immutable cache for files
            self.set_header("Cache-Control", "public, max-age=31536000, immutable")
        elif "v" not in self.request.arguments or starts_with_any(self.no_cache_paths):
            # disable browser caching, rely on 304 replies for savings
            self.set_header("Cache-Control", "no-cache")

    def initialize(
        self,
        path: str | list[str],
        default_filename: str | None = None,
        no_cache_paths: list[str] | None = None,
    ) -> None:
        """Store the search roots, default filename and no-cache prefixes.

        ``path`` may be one directory or a list of them; each is user-expanded,
        made absolute and suffixed with ``os.sep`` before being stored in
        ``self.root``.
        """
        self.no_cache_paths = no_cache_paths or []

        search_dirs = [path] if isinstance(path, str) else path
        self.root = tuple(  # type:ignore[assignment]
            os.path.abspath(os.path.expanduser(entry)) + os.sep for entry in search_dirs
        )
        self.default_filename = default_filename

    def compute_etag(self) -> str | None:
        """Return ``None``: this handler does not supply an etag of its own."""
        return None

    # access is allowed as this class is used to serve static assets on login page
    # TODO: create an allow-list of files used on login page and remove this decorator
    @allow_unauthenticated
    def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]:
        """Delegate GET to the parent handler; marked as not requiring auth."""
        response = super().get(path, include_body)
        return response

    # access is allowed as this class is used to serve static assets on login page
    # TODO: create an allow-list of files used on login page and remove this decorator
    @allow_unauthenticated
    def head(self, path: str) -> Awaitable[None]:
        """Delegate HEAD to the parent handler; marked as not requiring auth."""
        response = super().head(path)
        return response

    @classmethod
    def get_absolute_path(cls, roots: Sequence[str], path: str) -> str:
        """Locate ``path`` on the static file search path ``roots``.

        Returns the absolute path of the file, or ``""`` when the lookup
        raises ``OSError``. Successful lookups are remembered in
        ``cls._static_paths``; failures are not.
        """
        with cls._lock:
            try:
                return cls._static_paths[path]
            except KeyError:
                pass  # not cached yet, fall through to the search

            try:
                abspath = os.path.abspath(filefind(path, roots))
            except OSError:
                # IOError means not found
                return ""

            cls._static_paths[path] = abspath

            log().debug(f"Path {path} served from {abspath}")
            return abspath

    def validate_absolute_path(self, root: str, absolute_path: str) -> str | None:
        """Check whether the file should be served (raises 404, 403, etc.).

        The ``root`` handed to the parent check is the first entry of
        ``self.root`` that prefixes ``absolute_path``; when none does, it is
        the last entry (or the ``root`` argument if ``self.root`` is empty).
        """
        if not absolute_path:
            raise web.HTTPError(404)

        with_separator = absolute_path + os.sep
        chosen_root = root
        for candidate in self.root:
            chosen_root = candidate
            if with_separator.startswith(candidate):
                break

        return super().validate_absolute_path(chosen_root, absolute_path)

1068 

1069 

class APIVersionHandler(APIHandler):
    """API handler reporting the server version."""

    _track_activity = False

    @allow_unauthenticated
    def get(self) -> None:
        """Finish the request with a JSON object holding only ``version``."""
        # not authenticated, so give as few info as possible
        payload = {"version": jupyter_server.__version__}
        self.finish(json.dumps(payload))

1080 

1081 

class TrailingSlashHandler(web.RequestHandler):
    """Simple redirect handler that strips trailing slashes

    This should be the first, highest priority handler.
    """

    @allow_unauthenticated
    def get(self) -> None:
        """Redirect to the same URI with surrounding slashes normalised.

        The part before the first ``?`` is stripped of leading and trailing
        slashes and given exactly one leading ``/``; the ``?`` and query
        string, if any, are re-attached untouched.
        """
        assert self.request.uri is not None
        path, question_mark, query = self.request.uri.partition("?")
        # trim trailing *and* leading /
        # to avoid misinterpreting repeated '//'
        normalised = "/" + path.strip("/")
        self.redirect(normalised + question_mark + query)

    post = put = get

1100 

1101 

class MainHandler(JupyterHandler):
    """Simple handler for base_url."""

    @allow_unauthenticated
    def get(self) -> None:
        """Render ``main.html`` and write it to the response."""
        self.write(self.render_template("main.html"))

    # POST and PUT are answered exactly like GET.
    post = put = get

1112 

1113 

class FilesRedirectHandler(JupyterHandler):
    """Handler for redirecting relative URLs to the /files/ handler"""

    @staticmethod
    async def redirect_to_files(self: Any, path: str) -> None:
        """Redirect ``path`` to ``/tree`` (directories) or ``/files`` (files).

        Written as a static method taking the handler explicitly so that
        other handlers can reuse it.

        Raises
        ------
        tornado.web.HTTPError
            404 when ``path`` is neither a directory nor a file, even after
            dropping one ``files`` segment.
        """
        cm = self.contents_manager
        if await ensure_async(cm.dir_exists(path)):
            # it's a *directory*, redirect to /tree
            section = "tree"
        else:
            # otherwise, redirect to /files
            section = "files"
            segments = path.split("/")

            if not await ensure_async(cm.file_exists(path=path)) and "files" in segments:
                # redirect without files/ iff it would 404
                # this preserves pre-2.0-style 'files/' links
                self.log.warning("Deprecated files/ URL: %s", path)
                segments.remove("files")
                path = "/".join(segments)

            if not await ensure_async(cm.file_exists(path=path)):
                raise web.HTTPError(404)

        url = url_path_join(self.base_url, section, url_escape(path))
        self.log.debug("Redirecting %s to %s", self.request.path, url)
        self.redirect(url)

    @allow_unauthenticated
    async def get(self, path: str = "") -> None:
        """Handle GET by delegating to :meth:`redirect_to_files`."""
        result = await self.redirect_to_files(self, path)
        return result

1149 

1150 

class RedirectWithParams(web.RequestHandler):
    """Same as web.RedirectHandler, but preserves URL parameters"""

    def initialize(self, url: str, permanent: bool = True) -> None:
        """Remember the redirect target and whether the redirect is permanent."""
        self._permanent = permanent
        self._url = url

    @allow_unauthenticated
    def get(self) -> None:
        """Redirect to the configured URL with the request's query appended."""
        # Continue an existing query string with "&", otherwise start one with "?".
        if "?" in self._url:
            joiner = "&"
        else:
            joiner = "?"
        target = self._url + joiner + self.request.query
        self.redirect(target, permanent=self._permanent)

1165 

1166 

class PrometheusMetricsHandler(JupyterHandler):
    """
    Return prometheus metrics for this server
    """

    @allow_unauthenticated
    def get(self) -> None:
        """Write the current prometheus metrics to the response.

        Raises
        ------
        tornado.web.HTTPError
            403 when the ``authenticate_prometheus`` setting is truthy and the
            requester is not logged in.
        """
        must_authenticate = self.settings["authenticate_prometheus"]
        if must_authenticate and not self.logged_in:
            raise web.HTTPError(403)

        self.set_header("Content-Type", prometheus_client.CONTENT_TYPE_LATEST)
        metrics = prometheus_client.generate_latest(prometheus_client.REGISTRY)
        self.write(metrics)

1180 

1181 

class PublicStaticFileHandler(web.StaticFileHandler):
    """Same as web.StaticFileHandler, but decorated to acknowledge that auth is not required."""

    @allow_unauthenticated
    def head(self, path: str) -> Awaitable[None]:
        """Delegate HEAD to ``web.StaticFileHandler`` unchanged."""
        response = super().head(path)
        return response

    @allow_unauthenticated
    def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]:
        """Delegate GET to ``web.StaticFileHandler`` unchanged."""
        response = super().get(path, include_body)
        return response

1192 

1193 

1194# ----------------------------------------------------------------------------- 

1195# URL pattern fragments for reuse 

1196# ----------------------------------------------------------------------------- 

1197 

# The named group ``path`` matches one or more `/segment` components
# (`/foo[/bar...]`, where a segment is any run of non-`/` characters),
# or just `/`, or the empty string.
path_regex = r"(?P<path>(?:(?:/[^/]+)+|/?))"

1200 

1201# ----------------------------------------------------------------------------- 

1202# URL to handler mappings 

1203# ----------------------------------------------------------------------------- 

1204 

1205 

# (URL pattern, handler class) pairs defined by this module.
default_handlers = [
    # Any URL ending in "/" goes to the trailing-slash redirect.
    (r".*/", TrailingSlashHandler),
    # NOTE(review): no leading "/" here, unlike the patterns below — presumably
    # joined onto the base URL by whoever registers these; confirm.
    (r"api", APIVersionHandler),
    # The captured file name is passed to the static handler as its path.
    (r"/(robots\.txt|favicon\.ico)", PublicStaticFileHandler),
    (r"/metrics", PrometheusMetricsHandler),
]