Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/base/handlers.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

563 statements  

1"""Base Tornado handlers for the Jupyter server.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5from __future__ import annotations 

6 

7import functools 

8import inspect 

9import ipaddress 

10import json 

11import mimetypes 

12import os 

13import re 

14import types 

15import warnings 

16from collections.abc import Awaitable, Coroutine, Sequence 

17from http.client import responses 

18from logging import Logger 

19from typing import TYPE_CHECKING, Any, cast 

20from urllib.parse import urlparse 

21 

22import prometheus_client 

23from jinja2 import TemplateNotFound 

24from jupyter_core.paths import is_hidden 

25from tornado import web 

26from tornado.log import app_log 

27from traitlets.config import Application 

28 

29import jupyter_server 

30from jupyter_server import CallContext 

31from jupyter_server._sysinfo import get_sys_info 

32from jupyter_server._tz import utcnow 

33from jupyter_server.auth.decorator import allow_unauthenticated, authorized 

34from jupyter_server.auth.identity import User 

35from jupyter_server.i18n import combine_translations 

36from jupyter_server.services.security import csp_report_uri 

37from jupyter_server.utils import ( 

38 ensure_async, 

39 filefind, 

40 url_escape, 

41 url_is_absolute, 

42 url_path_join, 

43 urldecode_unix_socket_path, 

44) 

45 

46if TYPE_CHECKING: 

47 from jupyter_client.kernelspec import KernelSpecManager 

48 from jupyter_events import EventLogger 

49 from jupyter_server_terminals.terminalmanager import TerminalManager 

50 from tornado.concurrent import Future 

51 

52 from jupyter_server.auth.authorizer import Authorizer 

53 from jupyter_server.auth.identity import IdentityProvider 

54 from jupyter_server.serverapp import ServerApp 

55 from jupyter_server.services.config.manager import ConfigManager 

56 from jupyter_server.services.contents.manager import ContentsManager 

57 from jupyter_server.services.kernels.kernelmanager import AsyncMappingKernelManager 

58 from jupyter_server.services.sessions.sessionmanager import SessionManager 

59 

60# ----------------------------------------------------------------------------- 

61# Top-level handlers 

62# ----------------------------------------------------------------------------- 

63 

64_sys_info_cache = None 

65 

66 

67def json_sys_info(): 

68 """Get sys info as json.""" 

69 global _sys_info_cache # noqa: PLW0603 

70 if _sys_info_cache is None: 

71 _sys_info_cache = json.dumps(get_sys_info()) 

72 return _sys_info_cache 

73 

74 

75def log() -> Logger: 

76 """Get the application log.""" 

77 if Application.initialized(): 

78 return cast(Logger, Application.instance().log) 

79 else: 

80 return app_log 

81 

82 

83class AuthenticatedHandler(web.RequestHandler): 

84 """A RequestHandler with an authenticated user.""" 

85 

86 @property 

87 def base_url(self) -> str: 

88 return cast(str, self.settings.get("base_url", "/")) 

89 

90 @property 

91 def content_security_policy(self) -> str: 

92 """The default Content-Security-Policy header 

93 

94 Can be overridden by defining Content-Security-Policy in settings['headers'] 

95 """ 

96 if "Content-Security-Policy" in self.settings.get("headers", {}): 

97 # user-specified, don't override 

98 return cast(str, self.settings["headers"]["Content-Security-Policy"]) 

99 

100 return "; ".join( 

101 [ 

102 "frame-ancestors 'self'", 

103 # Make sure the report-uri is relative to the base_url 

104 "report-uri " 

105 + self.settings.get("csp_report_uri", url_path_join(self.base_url, csp_report_uri)), 

106 ] 

107 ) 

108 

109 def set_default_headers(self) -> None: 

110 """Set the default headers.""" 

111 headers = {} 

112 headers["X-Content-Type-Options"] = "nosniff" 

113 headers.update(self.settings.get("headers", {})) 

114 

115 headers["Content-Security-Policy"] = self.content_security_policy 

116 

117 # Allow for overriding headers 

118 for header_name, value in headers.items(): 

119 try: 

120 self.set_header(header_name, value) 

121 except Exception as e: 

122 # tornado raise Exception (not a subclass) 

123 # if method is unsupported (websocket and Access-Control-Allow-Origin 

124 # for example, so just ignore) 

125 self.log.exception( # type:ignore[attr-defined] 

126 "Could not set default headers: %s", e 

127 ) 

128 

129 @property 

130 def cookie_name(self) -> str: 

131 warnings.warn( 

132 """JupyterHandler.login_handler is deprecated in 2.0, 

133 use JupyterHandler.identity_provider. 

134 """, 

135 DeprecationWarning, 

136 stacklevel=2, 

137 ) 

138 return self.identity_provider.get_cookie_name(self) 

139 

140 def force_clear_cookie(self, name: str, path: str = "/", domain: str | None = None) -> None: 

141 """Force a cookie clear.""" 

142 warnings.warn( 

143 """JupyterHandler.login_handler is deprecated in 2.0, 

144 use JupyterHandler.identity_provider. 

145 """, 

146 DeprecationWarning, 

147 stacklevel=2, 

148 ) 

149 self.identity_provider._force_clear_cookie(self, name, path=path, domain=domain) 

150 

151 def clear_login_cookie(self) -> None: 

152 """Clear a login cookie.""" 

153 warnings.warn( 

154 """JupyterHandler.login_handler is deprecated in 2.0, 

155 use JupyterHandler.identity_provider. 

156 """, 

157 DeprecationWarning, 

158 stacklevel=2, 

159 ) 

160 self.identity_provider.clear_login_cookie(self) 

161 

162 def get_current_user(self) -> str: 

163 """Get the current user.""" 

164 clsname = self.__class__.__name__ 

165 msg = ( 

166 f"Calling `{clsname}.get_current_user()` directly is deprecated in jupyter-server 2.0." 

167 " Use `self.current_user` instead (works in all versions)." 

168 ) 

169 if hasattr(self, "_jupyter_current_user"): 

170 # backward-compat: return _jupyter_current_user 

171 warnings.warn( 

172 msg, 

173 DeprecationWarning, 

174 stacklevel=2, 

175 ) 

176 return cast(str, self._jupyter_current_user) 

177 # haven't called get_user in prepare, raise 

178 raise RuntimeError(msg) 

179 

180 def skip_check_origin(self) -> bool: 

181 """Ask my login_handler if I should skip the origin_check 

182 

183 For example: in the default LoginHandler, if a request is token-authenticated, 

184 origin checking should be skipped. 

185 """ 

186 if self.request.method == "OPTIONS": 

187 # no origin-check on options requests, which are used to check origins! 

188 return True 

189 return not self.identity_provider.should_check_origin(self) 

190 

191 @property 

192 def token_authenticated(self) -> bool: 

193 """Have I been authenticated with a token?""" 

194 return self.identity_provider.is_token_authenticated(self) 

195 

196 @property 

197 def logged_in(self) -> bool: 

198 """Is a user currently logged in?""" 

199 user = self.current_user 

200 return bool(user and user != "anonymous") 

201 

202 @property 

203 def login_handler(self) -> Any: 

204 """Return the login handler for this application, if any.""" 

205 warnings.warn( 

206 """JupyterHandler.login_handler is deprecated in 2.0, 

207 use JupyterHandler.identity_provider. 

208 """, 

209 DeprecationWarning, 

210 stacklevel=2, 

211 ) 

212 return self.identity_provider.login_handler_class 

213 

214 @property 

215 def token(self) -> str | None: 

216 """Return the login token for this application, if any.""" 

217 return self.identity_provider.token 

218 

219 @property 

220 def login_available(self) -> bool: 

221 """May a user proceed to log in? 

222 

223 This returns True if login capability is available, irrespective of 

224 whether the user is already logged in or not. 

225 

226 """ 

227 return cast(bool, self.identity_provider.login_available) 

228 

229 @property 

230 def authorizer(self) -> Authorizer: 

231 if "authorizer" not in self.settings: 

232 warnings.warn( 

233 "The Tornado web application does not have an 'authorizer' defined " 

234 "in its settings. In future releases of jupyter_server, this will " 

235 "be a required key for all subclasses of `JupyterHandler`. For an " 

236 "example, see the jupyter_server source code for how to " 

237 "add an authorizer to the tornado settings: " 

238 "https://github.com/jupyter-server/jupyter_server/blob/" 

239 "653740cbad7ce0c8a8752ce83e4d3c2c754b13cb/jupyter_server/serverapp.py" 

240 "#L234-L256", 

241 stacklevel=2, 

242 ) 

243 from jupyter_server.auth import AllowAllAuthorizer 

244 

245 self.settings["authorizer"] = AllowAllAuthorizer( 

246 config=self.settings.get("config", None), 

247 identity_provider=self.identity_provider, 

248 ) 

249 

250 return cast("Authorizer", self.settings.get("authorizer")) 

251 

252 @property 

253 def identity_provider(self) -> IdentityProvider: 

254 if "identity_provider" not in self.settings: 

255 warnings.warn( 

256 "The Tornado web application does not have an 'identity_provider' defined " 

257 "in its settings. In future releases of jupyter_server, this will " 

258 "be a required key for all subclasses of `JupyterHandler`. For an " 

259 "example, see the jupyter_server source code for how to " 

260 "add an identity provider to the tornado settings: " 

261 "https://github.com/jupyter-server/jupyter_server/blob/v2.0.0/" 

262 "jupyter_server/serverapp.py#L242", 

263 stacklevel=2, 

264 ) 

265 from jupyter_server.auth import IdentityProvider 

266 

267 # no identity provider set, load default 

268 self.settings["identity_provider"] = IdentityProvider( 

269 config=self.settings.get("config", None) 

270 ) 

271 return cast("IdentityProvider", self.settings["identity_provider"]) 

272 

273 

274class JupyterHandler(AuthenticatedHandler): 

275 """Jupyter-specific extensions to authenticated handling 

276 

277 Mostly property shortcuts to Jupyter-specific settings. 

278 """ 

279 

280 @property 

281 def config(self) -> dict[str, Any] | None: 

282 return cast("dict[str, Any] | None", self.settings.get("config", None)) 

283 

284 @property 

285 def log(self) -> Logger: 

286 """use the Jupyter log by default, falling back on tornado's logger""" 

287 return log() 

288 

289 @property 

290 def jinja_template_vars(self) -> dict[str, Any]: 

291 """User-supplied values to supply to jinja templates.""" 

292 return cast("dict[str, Any]", self.settings.get("jinja_template_vars", {})) 

293 

294 @property 

295 def serverapp(self) -> ServerApp | None: 

296 return cast("ServerApp | None", self.settings["serverapp"]) 

297 

298 # --------------------------------------------------------------- 

299 # URLs 

300 # --------------------------------------------------------------- 

301 

302 @property 

303 def version_hash(self) -> str: 

304 """The version hash to use for cache hints for static files""" 

305 return cast(str, self.settings.get("version_hash", "")) 

306 

307 @property 

308 def mathjax_url(self) -> str: 

309 url = cast(str, self.settings.get("mathjax_url", "")) 

310 if not url or url_is_absolute(url): 

311 return url 

312 return url_path_join(self.base_url, url) 

313 

314 @property 

315 def mathjax_config(self) -> str: 

316 return cast(str, self.settings.get("mathjax_config", "TeX-AMS-MML_HTMLorMML-full,Safe")) 

317 

318 @property 

319 def default_url(self) -> str: 

320 return cast(str, self.settings.get("default_url", "")) 

321 

322 @property 

323 def ws_url(self) -> str: 

324 return cast(str, self.settings.get("websocket_url", "")) 

325 

326 @property 

327 def contents_js_source(self) -> str: 

328 self.log.debug( 

329 "Using contents: %s", 

330 self.settings.get("contents_js_source", "services/contents"), 

331 ) 

332 return cast(str, self.settings.get("contents_js_source", "services/contents")) 

333 

334 # --------------------------------------------------------------- 

335 # Manager objects 

336 # --------------------------------------------------------------- 

337 

338 @property 

339 def kernel_manager(self) -> AsyncMappingKernelManager: 

340 return cast("AsyncMappingKernelManager", self.settings["kernel_manager"]) 

341 

342 @property 

343 def contents_manager(self) -> ContentsManager: 

344 return cast("ContentsManager", self.settings["contents_manager"]) 

345 

346 @property 

347 def session_manager(self) -> SessionManager: 

348 return cast("SessionManager", self.settings["session_manager"]) 

349 

350 @property 

351 def terminal_manager(self) -> TerminalManager: 

352 return cast("TerminalManager", self.settings["terminal_manager"]) 

353 

354 @property 

355 def kernel_spec_manager(self) -> KernelSpecManager: 

356 return cast("KernelSpecManager", self.settings["kernel_spec_manager"]) 

357 

358 @property 

359 def config_manager(self) -> ConfigManager: 

360 return cast("ConfigManager", self.settings["config_manager"]) 

361 

362 @property 

363 def event_logger(self) -> EventLogger: 

364 return cast("EventLogger", self.settings["event_logger"]) 

365 

366 # --------------------------------------------------------------- 

367 # CORS 

368 # --------------------------------------------------------------- 

369 

370 @property 

371 def allow_origin(self) -> str: 

372 """Normal Access-Control-Allow-Origin""" 

373 return cast(str, self.settings.get("allow_origin", "")) 

374 

375 @property 

376 def allow_origin_pat(self) -> str | None: 

377 """Regular expression version of allow_origin""" 

378 return cast("str | None", self.settings.get("allow_origin_pat", None)) 

379 

380 @property 

381 def allow_credentials(self) -> bool: 

382 """Whether to set Access-Control-Allow-Credentials""" 

383 return cast(bool, self.settings.get("allow_credentials", False)) 

384 

385 def set_default_headers(self) -> None: 

386 """Add CORS headers, if defined""" 

387 super().set_default_headers() 

388 

389 def set_cors_headers(self) -> None: 

390 """Add CORS headers, if defined 

391 

392 Now that current_user is async (jupyter-server 2.0), 

393 must be called at the end of prepare(), instead of in set_default_headers. 

394 """ 

395 if self.allow_origin: 

396 self.set_header("Access-Control-Allow-Origin", self.allow_origin) 

397 elif self.allow_origin_pat: 

398 origin = self.get_origin() 

399 if origin and re.match(self.allow_origin_pat, origin): 

400 self.set_header("Access-Control-Allow-Origin", origin) 

401 elif self.token_authenticated and "Access-Control-Allow-Origin" not in self.settings.get( 

402 "headers", {} 

403 ): 

404 # allow token-authenticated requests cross-origin by default. 

405 # only apply this exception if allow-origin has not been specified. 

406 self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", "")) 

407 

408 if self.allow_credentials: 

409 self.set_header("Access-Control-Allow-Credentials", "true") 

410 

411 def set_attachment_header(self, filename: str) -> None: 

412 """Set Content-Disposition: attachment header 

413 

414 As a method to ensure handling of filename encoding 

415 """ 

416 escaped_filename = url_escape(filename) 

417 self.set_header( 

418 "Content-Disposition", 

419 f"attachment; filename*=utf-8''{escaped_filename}", 

420 ) 

421 

422 def get_origin(self) -> str | None: 

423 # Handle WebSocket Origin naming convention differences 

424 # The difference between version 8 and 13 is that in 8 the 

425 # client sends a "Sec-Websocket-Origin" header and in 13 it's 

426 # simply "Origin". 

427 if "Origin" in self.request.headers: 

428 origin = self.request.headers.get("Origin") 

429 else: 

430 origin = self.request.headers.get("Sec-Websocket-Origin", None) 

431 return origin 

432 

433 # origin_to_satisfy_tornado is present because tornado requires 

434 # check_origin to take an origin argument, but we don't use it 

435 def check_origin(self, origin_to_satisfy_tornado: str = "") -> bool: 

436 """Check Origin for cross-site API requests, including websockets 

437 

438 Copied from WebSocket with changes: 

439 

440 - allow unspecified host/origin (e.g. scripts) 

441 - allow token-authenticated requests 

442 """ 

443 if self.allow_origin == "*" or self.skip_check_origin(): 

444 return True 

445 

446 host = self.request.headers.get("Host") 

447 origin = self.request.headers.get("Origin") 

448 

449 # If no header is provided, let the request through. 

450 # Origin can be None for: 

451 # - same-origin (IE, Firefox) 

452 # - Cross-site POST form (IE, Firefox) 

453 # - Scripts 

454 # The cross-site POST (XSRF) case is handled by tornado's xsrf_token 

455 if origin is None or host is None: 

456 return True 

457 

458 origin = origin.lower() 

459 origin_host = urlparse(origin).netloc 

460 

461 # OK if origin matches host 

462 if origin_host == host: 

463 return True 

464 

465 # Check CORS headers 

466 if self.allow_origin: 

467 allow = bool(self.allow_origin == origin) 

468 elif self.allow_origin_pat: 

469 allow = bool(re.match(self.allow_origin_pat, origin)) 

470 else: 

471 # No CORS headers deny the request 

472 allow = False 

473 if not allow: 

474 self.log.warning( 

475 "Blocking Cross Origin API request for %s. Origin: %s, Host: %s", 

476 self.request.path, 

477 origin, 

478 host, 

479 ) 

480 return allow 

481 

482 def check_referer(self) -> bool: 

483 """Check Referer for cross-site requests. 

484 Disables requests to certain endpoints with 

485 external or missing Referer. 

486 If set, allow_origin settings are applied to the Referer 

487 to whitelist specific cross-origin sites. 

488 Used on GET for api endpoints and /files/ 

489 to block cross-site inclusion (XSSI). 

490 """ 

491 if self.allow_origin == "*" or self.skip_check_origin(): 

492 return True 

493 

494 host = self.request.headers.get("Host") 

495 referer = self.request.headers.get("Referer") 

496 

497 if not host: 

498 self.log.warning("Blocking request with no host") 

499 return False 

500 if not referer: 

501 self.log.warning("Blocking request with no referer") 

502 return False 

503 

504 referer_url = urlparse(referer) 

505 referer_host = referer_url.netloc 

506 if referer_host == host: 

507 return True 

508 

509 # apply cross-origin checks to Referer: 

510 origin = f"{referer_url.scheme}://{referer_url.netloc}" 

511 if self.allow_origin: 

512 allow = self.allow_origin == origin 

513 elif self.allow_origin_pat: 

514 allow = bool(re.match(self.allow_origin_pat, origin)) 

515 else: 

516 # No CORS settings, deny the request 

517 allow = False 

518 

519 if not allow: 

520 self.log.warning( 

521 "Blocking Cross Origin request for %s. Referer: %s, Host: %s", 

522 self.request.path, 

523 origin, 

524 host, 

525 ) 

526 return allow 

527 

528 def check_xsrf_cookie(self) -> None: 

529 """Bypass xsrf cookie checks when token-authenticated""" 

530 if not hasattr(self, "_jupyter_current_user"): 

531 # Called too early, will be checked later 

532 return None 

533 if self.token_authenticated or self.settings.get("disable_check_xsrf", False): 

534 # Token-authenticated requests do not need additional XSRF-check 

535 # Servers without authentication are vulnerable to XSRF 

536 return None 

537 try: 

538 return super().check_xsrf_cookie() 

539 except web.HTTPError as e: 

540 if self.request.method in {"GET", "HEAD"}: 

541 # Consider Referer a sufficient cross-origin check for GET requests 

542 if not self.check_referer(): 

543 referer = self.request.headers.get("Referer") 

544 if referer: 

545 msg = f"Blocking Cross Origin request from {referer}." 

546 else: 

547 msg = "Blocking request from unknown origin" 

548 raise web.HTTPError(403, msg) from e 

549 else: 

550 raise 

551 

552 def check_host(self) -> bool: 

553 """Check the host header if remote access disallowed. 

554 

555 Returns True if the request should continue, False otherwise. 

556 """ 

557 if self.settings.get("allow_remote_access", False): 

558 return True 

559 

560 # Remove port (e.g. ':8888') from host 

561 match = re.match(r"^(.*?)(:\d+)?$", self.request.host) 

562 assert match is not None 

563 host = match.group(1) 

564 

565 # Browsers format IPv6 addresses like [::1]; we need to remove the [] 

566 if host.startswith("[") and host.endswith("]"): 

567 host = host[1:-1] 

568 

569 # UNIX socket handling 

570 check_host = urldecode_unix_socket_path(host) 

571 if check_host.startswith("/") and os.path.exists(check_host): 

572 allow = True 

573 else: 

574 try: 

575 addr = ipaddress.ip_address(host) 

576 except ValueError: 

577 # Not an IP address: check against hostnames 

578 allow = host in self.settings.get("local_hostnames", ["localhost"]) 

579 else: 

580 allow = addr.is_loopback 

581 

582 if not allow: 

583 self.log.warning( 

584 ( 

585 "Blocking request with non-local 'Host' %s (%s). " 

586 "If the server should be accessible at that name, " 

587 "set ServerApp.allow_remote_access to disable the check." 

588 ), 

589 host, 

590 self.request.host, 

591 ) 

592 return allow 

593 

594 async def prepare(self, *, _redirect_to_login=True) -> Awaitable[None] | None: # type:ignore[override] 

595 """Prepare a response.""" 

596 # Set the current Jupyter Handler context variable. 

597 CallContext.set(CallContext.JUPYTER_HANDLER, self) 

598 

599 if not self.check_host(): 

600 self.current_user = self._jupyter_current_user = None 

601 raise web.HTTPError(403) 

602 

603 from jupyter_server.auth import IdentityProvider 

604 

605 mod_obj = inspect.getmodule(self.get_current_user) 

606 assert mod_obj is not None 

607 user: User | None = None 

608 

609 if type(self.identity_provider) is IdentityProvider and mod_obj.__name__ != __name__: 

610 # check for overridden get_current_user + default IdentityProvider 

611 # deprecated way to override auth (e.g. JupyterHub < 3.0) 

612 # allow deprecated, overridden get_current_user 

613 warnings.warn( 

614 "Overriding JupyterHandler.get_current_user is deprecated in jupyter-server 2.0." 

615 " Use an IdentityProvider class.", 

616 DeprecationWarning, 

617 stacklevel=1, 

618 ) 

619 user = User(self.get_current_user()) 

620 else: 

621 _user = self.identity_provider.get_user(self) 

622 if isinstance(_user, Awaitable): 

623 # IdentityProvider.get_user _may_ be async 

624 _user = await _user 

625 user = _user 

626 

627 # self.current_user for tornado's @web.authenticated 

628 # self._jupyter_current_user for backward-compat in deprecated get_current_user calls 

629 # and our own private checks for whether .current_user has been set 

630 self.current_user = self._jupyter_current_user = user 

631 # complete initial steps which require auth to resolve first: 

632 self.set_cors_headers() 

633 if self.request.method not in {"GET", "HEAD", "OPTIONS"}: 

634 self.check_xsrf_cookie() 

635 

636 if not self.settings.get("allow_unauthenticated_access", False): 

637 if not self.request.method: 

638 raise HTTPError(403) 

639 method = getattr(self, self.request.method.lower()) 

640 if not getattr(method, "__allow_unauthenticated", False): 

641 if _redirect_to_login: 

642 # reuse `web.authenticated` logic, which redirects to the login 

643 # page on GET and HEAD and otherwise raises 403 

644 return web.authenticated(lambda _: super().prepare())(self) 

645 else: 

646 # raise 403 if user is not known without redirecting to login page 

647 user = self.current_user 

648 if user is None: 

649 self.log.warning( 

650 f"Couldn't authenticate {self.__class__.__name__} connection" 

651 ) 

652 raise web.HTTPError(403) 

653 

654 return super().prepare() 

655 

656 # --------------------------------------------------------------- 

657 # template rendering 

658 # --------------------------------------------------------------- 

659 

660 def get_template(self, name): 

661 """Return the jinja template object for a given name""" 

662 return self.settings["jinja2_env"].get_template(name) 

663 

664 def render_template(self, name, **ns): 

665 """Render a template by name.""" 

666 ns.update(self.template_namespace) 

667 template = self.get_template(name) 

668 return template.render(**ns) 

669 

670 @property 

671 def template_namespace(self) -> dict[str, Any]: 

672 return dict( 

673 base_url=self.base_url, 

674 default_url=self.default_url, 

675 ws_url=self.ws_url, 

676 logged_in=self.logged_in, 

677 allow_password_change=getattr(self.identity_provider, "allow_password_change", False), 

678 auth_enabled=self.identity_provider.auth_enabled, 

679 login_available=self.identity_provider.login_available, 

680 token_available=bool(self.token), 

681 static_url=self.static_url, 

682 sys_info=json_sys_info(), 

683 contents_js_source=self.contents_js_source, 

684 version_hash=self.version_hash, 

685 xsrf_form_html=self.xsrf_form_html, 

686 token=self.token, 

687 xsrf_token=self.xsrf_token.decode("utf8"), 

688 nbjs_translations=json.dumps( 

689 combine_translations(self.request.headers.get("Accept-Language", "")) 

690 ), 

691 **self.jinja_template_vars, 

692 ) 

693 

694 def get_json_body(self) -> dict[str, Any] | None: 

695 """Return the body of the request as JSON data.""" 

696 if not self.request.body: 

697 return None 

698 # Do we need to call body.decode('utf-8') here? 

699 body = self.request.body.strip().decode("utf-8") 

700 try: 

701 model = json.loads(body) 

702 except Exception as e: 

703 self.log.debug("Bad JSON: %r", body) 

704 self.log.error("Couldn't parse JSON", exc_info=True) 

705 raise web.HTTPError(400, "Invalid JSON in body of request") from e 

706 return cast("dict[str, Any]", model) 

707 

708 def write_error(self, status_code: int, **kwargs: Any) -> None: 

709 """render custom error pages""" 

710 exc_info = kwargs.get("exc_info") 

711 message = "" 

712 status_message = responses.get(status_code, "Unknown HTTP Error") 

713 

714 if exc_info: 

715 exception = exc_info[1] 

716 # get the custom message, if defined 

717 try: 

718 message = exception.log_message % exception.args 

719 except Exception: 

720 pass 

721 

722 # construct the custom reason, if defined 

723 reason = getattr(exception, "reason", "") 

724 if reason: 

725 status_message = reason 

726 else: 

727 exception = "(unknown)" 

728 

729 # build template namespace 

730 ns = { 

731 "status_code": status_code, 

732 "status_message": status_message, 

733 "message": message, 

734 "exception": exception, 

735 } 

736 

737 self.set_header("Content-Type", "text/html") 

738 # render the template 

739 try: 

740 html = self.render_template("%s.html" % status_code, **ns) 

741 except TemplateNotFound: 

742 html = self.render_template("error.html", **ns) 

743 

744 self.write(html) 

745 

746 

747class APIHandler(JupyterHandler): 

748 """Base class for API handlers""" 

749 

750 async def prepare(self) -> None: # type:ignore[override] 

751 """Prepare an API response.""" 

752 await super().prepare() 

753 if not self.check_origin(): 

754 raise web.HTTPError(404) 

755 

756 def write_error(self, status_code: int, **kwargs: Any) -> None: 

757 """APIHandler errors are JSON, not human pages""" 

758 self.set_header("Content-Type", "application/json") 

759 message = responses.get(status_code, "Unknown HTTP Error") 

760 reply: dict[str, Any] = { 

761 "message": message, 

762 } 

763 exc_info = kwargs.get("exc_info") 

764 if exc_info: 

765 e = exc_info[1] 

766 if isinstance(e, HTTPError): 

767 reply["message"] = e.log_message or message 

768 reply["reason"] = e.reason 

769 else: 

770 reply["message"] = "Unhandled error" 

771 reply["reason"] = None 

772 # backward-compatibility: traceback field is present, 

773 # but always empty 

774 reply["traceback"] = "" 

775 self.log.warning("wrote error: %r", reply["message"], exc_info=True) 

776 self.finish(json.dumps(reply)) 

777 

778 def get_login_url(self) -> str: 

779 """Get the login url.""" 

780 # if get_login_url is invoked in an API handler, 

781 # that means @web.authenticated is trying to trigger a redirect. 

782 # instead of redirecting, raise 403 instead. 

783 if not self.current_user: 

784 raise web.HTTPError(403) 

785 return super().get_login_url() 

786 

787 @property 

788 def content_security_policy(self) -> str: 

789 csp = "; ".join( # noqa: FLY002 

790 [ 

791 super().content_security_policy, 

792 "default-src 'none'", 

793 ] 

794 ) 

795 return csp 

796 

797 # set _track_activity = False on API handlers that shouldn't track activity 

798 _track_activity = True 

799 

800 def update_api_activity(self) -> None: 

801 """Update last_activity of API requests""" 

802 # record activity of authenticated requests 

803 if ( 

804 self._track_activity 

805 and getattr(self, "_jupyter_current_user", None) 

806 and self.get_argument("no_track_activity", None) is None 

807 ): 

808 self.settings["api_last_activity"] = utcnow() 

809 

810 def finish(self, *args: Any, **kwargs: Any) -> Future[Any]: 

811 """Finish an API response.""" 

812 self.update_api_activity() 

813 # Allow caller to indicate content-type... 

814 set_content_type = kwargs.pop("set_content_type", "application/json") 

815 self.set_header("Content-Type", set_content_type) 

816 return super().finish(*args, **kwargs) 

817 

818 @allow_unauthenticated 

819 def options(self, *args: Any, **kwargs: Any) -> None: 

820 """Get the options.""" 

821 if "Access-Control-Allow-Headers" in self.settings.get("headers", {}): 

822 self.set_header( 

823 "Access-Control-Allow-Headers", 

824 self.settings["headers"]["Access-Control-Allow-Headers"], 

825 ) 

826 else: 

827 self.set_header( 

828 "Access-Control-Allow-Headers", 

829 "accept, content-type, authorization, x-xsrftoken", 

830 ) 

831 self.set_header("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE, OPTIONS") 

832 

833 # if authorization header is requested, 

834 # that means the request is token-authenticated. 

835 # avoid browser-side rejection of the preflight request. 

836 # only allow this exception if allow_origin has not been specified 

837 # and Jupyter server authentication is enabled. 

838 # If the token is not valid, the 'real' request will still be rejected. 

839 requested_headers = self.request.headers.get("Access-Control-Request-Headers", "").split( 

840 "," 

841 ) 

842 if ( 

843 requested_headers 

844 and any(h.strip().lower() == "authorization" for h in requested_headers) 

845 and ( 

846 # FIXME: it would be even better to check specifically for token-auth, 

847 # but there is currently no API for this. 

848 self.login_available 

849 ) 

850 and ( 

851 self.allow_origin 

852 or self.allow_origin_pat 

853 or "Access-Control-Allow-Origin" in self.settings.get("headers", {}) 

854 ) 

855 ): 

856 self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", "")) 

857 

858 

859class Template404(JupyterHandler): 

860 """Render our 404 template""" 

861 

862 async def prepare(self) -> None: # type:ignore[override] 

863 """Prepare a 404 response.""" 

864 await super().prepare() 

865 raise web.HTTPError(404) 

866 

867 

868class AuthenticatedFileHandler(JupyterHandler, web.StaticFileHandler): 

869 """static files should only be accessible when logged in""" 

870 

871 auth_resource = "contents" 

872 

873 @property 

874 def content_security_policy(self) -> str: 

875 # In case we're serving HTML/SVG, confine any Javascript to a unique 

876 # origin so it can't interact with the Jupyter server. 

877 return super().content_security_policy + "; sandbox allow-scripts" 

878 

879 @web.authenticated 

880 @authorized 

881 def head(self, path: str) -> Awaitable[None]: # type:ignore[override] 

882 """Get the head response for a path.""" 

883 self.check_xsrf_cookie() 

884 return super().head(path) 

885 

886 @web.authenticated 

887 @authorized 

888 def get( # type:ignore[override] 

889 self, path: str, **kwargs: Any 

890 ) -> Awaitable[None]: 

891 """Get a file by path.""" 

892 self.check_xsrf_cookie() 

893 if os.path.splitext(path)[1] == ".ipynb" or self.get_argument("download", None): 

894 name = path.rsplit("/", 1)[-1] 

895 self.set_attachment_header(name) 

896 

897 return web.StaticFileHandler.get(self, path, **kwargs) 

898 

899 def get_content_type(self) -> str: 

900 """Get the content type.""" 

901 assert self.absolute_path is not None 

902 path = self.absolute_path.strip("/") 

903 if "/" in path: 

904 _, name = path.rsplit("/", 1) 

905 else: 

906 name = path 

907 if name.endswith(".ipynb"): 

908 return "application/x-ipynb+json" 

909 else: 

910 cur_mime = mimetypes.guess_type(name)[0] 

911 if cur_mime == "text/plain": 

912 return "text/plain; charset=UTF-8" 

913 else: 

914 return super().get_content_type() 

915 

916 def set_headers(self) -> None: 

917 """Set the headers.""" 

918 super().set_headers() 

919 # disable browser caching, rely on 304 replies for savings 

920 if "v" not in self.request.arguments: 

921 self.add_header("Cache-Control", "no-cache") 

922 

923 def compute_etag(self) -> str | None: 

924 """Compute the etag.""" 

925 return None 

926 

927 def validate_absolute_path(self, root: str, absolute_path: str) -> str: 

928 """Validate and return the absolute path. 

929 

930 Requires tornado 3.1 

931 

932 Adding to tornado's own handling, forbids the serving of hidden files. 

933 """ 

934 abs_path = super().validate_absolute_path(root, absolute_path) 

935 abs_root = os.path.abspath(root) 

936 assert abs_path is not None 

937 if not self.contents_manager.allow_hidden and is_hidden(abs_path, abs_root): 

938 self.log.info( 

939 "Refusing to serve hidden file, via 404 Error, use flag 'ContentsManager.allow_hidden' to enable" 

940 ) 

941 raise web.HTTPError(404) 

942 return abs_path 

943 

944 

945def json_errors(method: Any) -> Any: # pragma: no cover 

946 """Decorate methods with this to return GitHub style JSON errors. 

947 

948 This should be used on any JSON API on any handler method that can raise HTTPErrors. 

949 

950 This will grab the latest HTTPError exception using sys.exc_info 

951 and then: 

952 

953 1. Set the HTTP status code based on the HTTPError 

954 2. Create and return a JSON body with a message field describing 

955 the error in a human readable form. 

956 """ 

957 warnings.warn( 

958 "@json_errors is deprecated in notebook 5.2.0. Subclass APIHandler instead.", 

959 DeprecationWarning, 

960 stacklevel=2, 

961 ) 

962 

963 @functools.wraps(method) 

964 def wrapper(self, *args, **kwargs): 

965 self.write_error = types.MethodType(APIHandler.write_error, self) 

966 return method(self, *args, **kwargs) 

967 

968 return wrapper 

969 

970 

971# ----------------------------------------------------------------------------- 

972# File handler 

973# ----------------------------------------------------------------------------- 

974 

975# to minimize subclass changes: 

976HTTPError = web.HTTPError 

977 

978 

979class FileFindHandler(JupyterHandler, web.StaticFileHandler): 

980 """subclass of StaticFileHandler for serving files from a search path 

981 

982 The setting "static_immutable_cache" can be set up to serve some static 

983 file as immutable (e.g. file name containing a hash). The setting is a 

984 list of base URL, every static file URL starting with one of those will 

985 be immutable. 

986 """ 

987 

988 # cache search results, don't search for files more than once 

989 _static_paths: dict[str, str] = {} 

990 root: tuple[str] # type:ignore[assignment] 

991 

992 def set_headers(self) -> None: 

993 """Set the headers.""" 

994 super().set_headers() 

995 

996 immutable_paths = self.settings.get("static_immutable_cache", []) 

997 

998 # allow immutable cache for files 

999 if any(self.request.path.startswith(path) for path in immutable_paths): 

1000 self.set_header("Cache-Control", "public, max-age=31536000, immutable") 

1001 

1002 # disable browser caching, rely on 304 replies for savings 

1003 elif "v" not in self.request.arguments or any( 

1004 self.request.path.startswith(path) for path in self.no_cache_paths 

1005 ): 

1006 self.set_header("Cache-Control", "no-cache") 

1007 

1008 def initialize( 

1009 self, 

1010 path: str | list[str], 

1011 default_filename: str | None = None, 

1012 no_cache_paths: list[str] | None = None, 

1013 ) -> None: 

1014 """Initialize the file find handler.""" 

1015 self.no_cache_paths = no_cache_paths or [] 

1016 

1017 if isinstance(path, str): 

1018 path = [path] 

1019 

1020 self.root = tuple(os.path.abspath(os.path.expanduser(p)) + os.sep for p in path) # type:ignore[assignment] 

1021 self.default_filename = default_filename 

1022 

1023 def compute_etag(self) -> str | None: 

1024 """Compute the etag.""" 

1025 return None 

1026 

1027 # access is allowed as this class is used to serve static assets on login page 

1028 # TODO: create an allow-list of files used on login page and remove this decorator 

1029 @allow_unauthenticated 

1030 def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]: 

1031 return super().get(path, include_body) 

1032 

1033 # access is allowed as this class is used to serve static assets on login page 

1034 # TODO: create an allow-list of files used on login page and remove this decorator 

1035 @allow_unauthenticated 

1036 def head(self, path: str) -> Awaitable[None]: 

1037 return super().head(path) 

1038 

1039 @classmethod 

1040 def get_absolute_path(cls, roots: Sequence[str], path: str) -> str: 

1041 """locate a file to serve on our static file search path""" 

1042 with cls._lock: 

1043 if path in cls._static_paths: 

1044 return cls._static_paths[path] 

1045 try: 

1046 abspath = os.path.abspath(filefind(path, roots)) 

1047 except OSError: 

1048 # IOError means not found 

1049 return "" 

1050 

1051 cls._static_paths[path] = abspath 

1052 

1053 log().debug(f"Path {path} served from {abspath}") 

1054 return abspath 

1055 

1056 def validate_absolute_path(self, root: str, absolute_path: str) -> str | None: 

1057 """check if the file should be served (raises 404, 403, etc.)""" 

1058 if not absolute_path: 

1059 raise web.HTTPError(404) 

1060 

1061 for root in self.root: 

1062 if (absolute_path + os.sep).startswith(root): 

1063 break 

1064 

1065 return super().validate_absolute_path(root, absolute_path) 

1066 

1067 

1068class APIVersionHandler(APIHandler): 

1069 """An API handler for the server version.""" 

1070 

1071 _track_activity = False 

1072 

1073 @allow_unauthenticated 

1074 def get(self) -> None: 

1075 """Get the server version info.""" 

1076 # not authenticated, so give as few info as possible 

1077 self.finish(json.dumps({"version": jupyter_server.__version__})) 

1078 

1079 

1080class TrailingSlashHandler(web.RequestHandler): 

1081 """Simple redirect handler that strips trailing slashes 

1082 

1083 This should be the first, highest priority handler. 

1084 """ 

1085 

1086 @allow_unauthenticated 

1087 def get(self) -> None: 

1088 """Handle trailing slashes in a get.""" 

1089 assert self.request.uri is not None 

1090 path, *rest = self.request.uri.partition("?") 

1091 # trim trailing *and* leading / 

1092 # to avoid misinterpreting repeated '//' 

1093 path = "/" + path.strip("/") 

1094 new_uri = "".join([path, *rest]) 

1095 self.redirect(new_uri) 

1096 

1097 post = put = get 

1098 

1099 

1100class MainHandler(JupyterHandler): 

1101 """Simple handler for base_url.""" 

1102 

1103 @allow_unauthenticated 

1104 def get(self) -> None: 

1105 """Get the main template.""" 

1106 html = self.render_template("main.html") 

1107 self.write(html) 

1108 

1109 post = put = get 

1110 

1111 

1112class FilesRedirectHandler(JupyterHandler): 

1113 """Handler for redirecting relative URLs to the /files/ handler""" 

1114 

1115 @staticmethod 

1116 async def redirect_to_files(self: Any, path: str) -> None: 

1117 """make redirect logic a reusable static method 

1118 

1119 so it can be called from other handlers. 

1120 """ 

1121 cm = self.contents_manager 

1122 if await ensure_async(cm.dir_exists(path)): 

1123 # it's a *directory*, redirect to /tree 

1124 url = url_path_join(self.base_url, "tree", url_escape(path)) 

1125 else: 

1126 orig_path = path 

1127 # otherwise, redirect to /files 

1128 parts = path.split("/") 

1129 

1130 if not await ensure_async(cm.file_exists(path=path)) and "files" in parts: 

1131 # redirect without files/ iff it would 404 

1132 # this preserves pre-2.0-style 'files/' links 

1133 self.log.warning("Deprecated files/ URL: %s", orig_path) 

1134 parts.remove("files") 

1135 path = "/".join(parts) 

1136 

1137 if not await ensure_async(cm.file_exists(path=path)): 

1138 raise web.HTTPError(404) 

1139 

1140 url = url_path_join(self.base_url, "files", url_escape(path)) 

1141 self.log.debug("Redirecting %s to %s", self.request.path, url) 

1142 self.redirect(url) 

1143 

1144 @allow_unauthenticated 

1145 async def get(self, path: str = "") -> None: 

1146 return await self.redirect_to_files(self, path) 

1147 

1148 

1149class RedirectWithParams(web.RequestHandler): 

1150 """Same as web.RedirectHandler, but preserves URL parameters""" 

1151 

1152 def initialize(self, url: str, permanent: bool = True) -> None: 

1153 """Initialize a redirect handler.""" 

1154 self._url = url 

1155 self._permanent = permanent 

1156 

1157 @allow_unauthenticated 

1158 def get(self) -> None: 

1159 """Get a redirect.""" 

1160 sep = "&" if "?" in self._url else "?" 

1161 url = sep.join([self._url, self.request.query]) 

1162 self.redirect(url, permanent=self._permanent) 

1163 

1164 

1165class PrometheusMetricsHandler(JupyterHandler): 

1166 """ 

1167 Return prometheus metrics for this server 

1168 """ 

1169 

1170 @allow_unauthenticated 

1171 def get(self) -> None: 

1172 """Get prometheus metrics.""" 

1173 if self.settings["authenticate_prometheus"] and not self.logged_in: 

1174 raise web.HTTPError(403) 

1175 

1176 self.set_header("Content-Type", prometheus_client.CONTENT_TYPE_LATEST) 

1177 self.write(prometheus_client.generate_latest(prometheus_client.REGISTRY)) 

1178 

1179 

1180class PublicStaticFileHandler(web.StaticFileHandler): 

1181 """Same as web.StaticFileHandler, but decorated to acknowledge that auth is not required.""" 

1182 

1183 @allow_unauthenticated 

1184 def head(self, path: str) -> Awaitable[None]: 

1185 return super().head(path) 

1186 

1187 @allow_unauthenticated 

1188 def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]: 

1189 return super().get(path, include_body) 

1190 

1191 

1192# ----------------------------------------------------------------------------- 

1193# URL pattern fragments for reuse 

1194# ----------------------------------------------------------------------------- 

1195 

1196# path matches any number of `/foo[/bar...]` or just `/` or '' 

1197path_regex = r"(?P<path>(?:(?:/[^/]+)+|/?))" 

1198 

1199# ----------------------------------------------------------------------------- 

1200# URL to handler mappings 

1201# ----------------------------------------------------------------------------- 

1202 

1203 

1204default_handlers = [ 

1205 (r".*/", TrailingSlashHandler), 

1206 (r"api", APIVersionHandler), 

1207 (r"/(robots\.txt|favicon\.ico)", PublicStaticFileHandler), 

1208 (r"/metrics", PrometheusMetricsHandler), 

1209]