Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/base/handlers.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Base Tornado handlers for the Jupyter server."""
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
7import functools
8import inspect
9import ipaddress
10import json
11import mimetypes
12import os
13import re
14import types
15import warnings
16from collections.abc import Awaitable, Coroutine, Sequence
17from http.client import responses
18from logging import Logger
19from typing import TYPE_CHECKING, Any, cast
20from urllib.parse import urlparse
22import prometheus_client
23from jinja2 import TemplateNotFound
24from jupyter_core.paths import is_hidden
25from tornado import web
26from tornado.log import app_log
27from traitlets.config import Application
29import jupyter_server
30from jupyter_server import CallContext
31from jupyter_server._sysinfo import get_sys_info
32from jupyter_server._tz import utcnow
33from jupyter_server.auth.decorator import allow_unauthenticated, authorized
34from jupyter_server.auth.identity import User
35from jupyter_server.i18n import combine_translations
36from jupyter_server.services.security import csp_report_uri
37from jupyter_server.utils import (
38 ensure_async,
39 filefind,
40 url_escape,
41 url_is_absolute,
42 url_path_join,
43 urldecode_unix_socket_path,
44)
46if TYPE_CHECKING:
47 from jupyter_client.kernelspec import KernelSpecManager
48 from jupyter_events import EventLogger
49 from jupyter_server_terminals.terminalmanager import TerminalManager
50 from tornado.concurrent import Future
52 from jupyter_server.auth.authorizer import Authorizer
53 from jupyter_server.auth.identity import IdentityProvider
54 from jupyter_server.serverapp import ServerApp
55 from jupyter_server.services.config.manager import ConfigManager
56 from jupyter_server.services.contents.manager import ContentsManager
57 from jupyter_server.services.kernels.kernelmanager import AsyncMappingKernelManager
58 from jupyter_server.services.sessions.sessionmanager import SessionManager
60# -----------------------------------------------------------------------------
61# Top-level handlers
62# -----------------------------------------------------------------------------
# JSON-encoded system info; stays None until json_sys_info() is first called.
_sys_info_cache = None


def json_sys_info():
    """Return the system information serialized as a JSON string.

    ``json.dumps(get_sys_info())`` is evaluated on the first call only; the
    resulting string is kept in the module-level ``_sys_info_cache`` and
    returned unchanged by every later call.
    """
    global _sys_info_cache  # noqa: PLW0603
    if _sys_info_cache is not None:
        return _sys_info_cache
    _sys_info_cache = json.dumps(get_sys_info())
    return _sys_info_cache
def log() -> Logger:
    """Return the logger to use for handler messages.

    When a traitlets ``Application`` has been initialized, its ``log`` is
    returned; otherwise tornado's ``app_log`` is used.
    """
    if not Application.initialized():
        return app_log
    return cast(Logger, Application.instance().log)
class AuthenticatedHandler(web.RequestHandler):
    """A RequestHandler with an authenticated user.

    Authentication questions are delegated to the ``identity_provider`` stored
    in the tornado application settings, and authorization to the
    ``authorizer`` stored there. Both are created with defaults (and a
    warning) when the settings do not define them.
    """

    @property
    def base_url(self) -> str:
        """The ``base_url`` setting, ``"/"`` when unset."""
        return cast(str, self.settings.get("base_url", "/"))

    @property
    def content_security_policy(self) -> str:
        """The default Content-Security-Policy header

        Can be overridden by defining Content-Security-Policy in settings['headers']
        """
        if "Content-Security-Policy" in self.settings.get("headers", {}):
            # user-specified, don't override
            return cast(str, self.settings["headers"]["Content-Security-Policy"])

        return "; ".join(
            [
                "frame-ancestors 'self'",
                # Make sure the report-uri is relative to the base_url
                "report-uri "
                + self.settings.get("csp_report_uri", url_path_join(self.base_url, csp_report_uri)),
            ]
        )

    def set_default_headers(self) -> None:
        """Set the default headers.

        Starts from ``X-Content-Type-Options: nosniff``, applies the
        user-supplied ``settings["headers"]`` on top, then sets
        ``Content-Security-Policy`` from :attr:`content_security_policy`.
        A header that cannot be set is logged and skipped.
        """
        headers = {}
        headers["X-Content-Type-Options"] = "nosniff"
        headers.update(self.settings.get("headers", {}))

        headers["Content-Security-Policy"] = self.content_security_policy

        # Allow for overriding headers
        for header_name, value in headers.items():
            try:
                self.set_header(header_name, value)
            except Exception as e:
                # tornado raise Exception (not a subclass)
                # if method is unsupported (websocket and Access-Control-Allow-Origin
                # for example, so just ignore)
                #
                # This class does not define ``log`` itself (subclasses do), so
                # fall back to the module-level logger instead of raising
                # AttributeError from inside the error handler.
                logger = getattr(self, "log", None) or log()
                logger.exception("Could not set default headers: %s", e)

    @property
    def cookie_name(self) -> str:
        """Deprecated: the login cookie name, as reported by the identity provider."""
        warnings.warn(
            """JupyterHandler.cookie_name is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        return self.identity_provider.get_cookie_name(self)

    def force_clear_cookie(self, name: str, path: str = "/", domain: str | None = None) -> None:
        """Deprecated: force a cookie clear via the identity provider."""
        warnings.warn(
            """JupyterHandler.force_clear_cookie is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        self.identity_provider._force_clear_cookie(self, name, path=path, domain=domain)

    def clear_login_cookie(self) -> None:
        """Deprecated: clear the login cookie via the identity provider."""
        warnings.warn(
            """JupyterHandler.clear_login_cookie is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        self.identity_provider.clear_login_cookie(self)

    def get_current_user(self) -> str:
        """Get the current user (deprecated; use ``self.current_user``).

        Returns the cached ``_jupyter_current_user`` with a
        ``DeprecationWarning`` when it has been set.

        Raises:
            RuntimeError: if ``_jupyter_current_user`` has not been set yet.
        """
        clsname = self.__class__.__name__
        msg = (
            f"Calling `{clsname}.get_current_user()` directly is deprecated in jupyter-server 2.0."
            " Use `self.current_user` instead (works in all versions)."
        )
        if hasattr(self, "_jupyter_current_user"):
            # backward-compat: return _jupyter_current_user
            warnings.warn(
                msg,
                DeprecationWarning,
                stacklevel=2,
            )
            return cast(str, self._jupyter_current_user)
        # haven't called get_user in prepare, raise
        raise RuntimeError(msg)

    def skip_check_origin(self) -> bool:
        """Ask the identity provider if the origin check should be skipped

        For example: in the default LoginHandler, if a request is token-authenticated,
        origin checking should be skipped.
        """
        if self.request.method == "OPTIONS":
            # no origin-check on options requests, which are used to check origins!
            return True
        return not self.identity_provider.should_check_origin(self)

    @property
    def token_authenticated(self) -> bool:
        """Have I been authenticated with a token?"""
        return self.identity_provider.is_token_authenticated(self)

    @property
    def logged_in(self) -> bool:
        """Is a user currently logged in?"""
        user = self.current_user
        return bool(user and user != "anonymous")

    @property
    def login_handler(self) -> Any:
        """Return the login handler for this application, if any."""
        warnings.warn(
            """JupyterHandler.login_handler is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        return self.identity_provider.login_handler_class

    @property
    def token(self) -> str | None:
        """Return the login token for this application, if any."""
        return self.identity_provider.token

    @property
    def login_available(self) -> bool:
        """May a user proceed to log in?

        This returns True if login capability is available, irrespective of
        whether the user is already logged in or not.

        """
        return cast(bool, self.identity_provider.login_available)

    @property
    def authorizer(self) -> Authorizer:
        """The ``authorizer`` from the settings.

        When the settings have none, a warning is issued and an
        ``AllowAllAuthorizer`` is created and stored in the settings.
        """
        if "authorizer" not in self.settings:
            warnings.warn(
                "The Tornado web application does not have an 'authorizer' defined "
                "in its settings. In future releases of jupyter_server, this will "
                "be a required key for all subclasses of `JupyterHandler`. For an "
                "example, see the jupyter_server source code for how to "
                "add an authorizer to the tornado settings: "
                "https://github.com/jupyter-server/jupyter_server/blob/"
                "653740cbad7ce0c8a8752ce83e4d3c2c754b13cb/jupyter_server/serverapp.py"
                "#L234-L256",
                stacklevel=2,
            )
            from jupyter_server.auth import AllowAllAuthorizer

            self.settings["authorizer"] = AllowAllAuthorizer(
                config=self.settings.get("config", None),
                identity_provider=self.identity_provider,
            )

        return cast("Authorizer", self.settings.get("authorizer"))

    @property
    def identity_provider(self) -> IdentityProvider:
        """The ``identity_provider`` from the settings.

        When the settings have none, a warning is issued and a default
        ``IdentityProvider`` is created and stored in the settings.
        """
        if "identity_provider" not in self.settings:
            warnings.warn(
                "The Tornado web application does not have an 'identity_provider' defined "
                "in its settings. In future releases of jupyter_server, this will "
                "be a required key for all subclasses of `JupyterHandler`. For an "
                "example, see the jupyter_server source code for how to "
                "add an identity provider to the tornado settings: "
                "https://github.com/jupyter-server/jupyter_server/blob/v2.0.0/"
                "jupyter_server/serverapp.py#L242",
                stacklevel=2,
            )
            from jupyter_server.auth import IdentityProvider

            # no identity provider set, load default
            self.settings["identity_provider"] = IdentityProvider(
                config=self.settings.get("config", None)
            )
        return cast("IdentityProvider", self.settings["identity_provider"])
class JupyterHandler(AuthenticatedHandler):
    """Jupyter-specific extensions to authenticated handling

    Mostly property shortcuts to Jupyter-specific settings, plus the
    origin/referer/host/XSRF checks and the ``prepare`` step that resolves
    the current user.
    """

    @property
    def config(self) -> dict[str, Any] | None:
        """The ``config`` setting, or None when unset."""
        return cast("dict[str, Any] | None", self.settings.get("config", None))

    @property
    def log(self) -> Logger:
        """use the Jupyter log by default, falling back on tornado's logger"""
        return log()

    @property
    def jinja_template_vars(self) -> dict[str, Any]:
        """User-supplied values to supply to jinja templates."""
        return cast("dict[str, Any]", self.settings.get("jinja_template_vars", {}))

    @property
    def serverapp(self) -> ServerApp | None:
        """The ``serverapp`` setting (the key must be present)."""
        return cast("ServerApp | None", self.settings["serverapp"])

    # ---------------------------------------------------------------
    # URLs
    # ---------------------------------------------------------------

    @property
    def version_hash(self) -> str:
        """The version hash to use for cache hints for static files"""
        return cast(str, self.settings.get("version_hash", ""))

    @property
    def mathjax_url(self) -> str:
        """The ``mathjax_url`` setting; relative URLs are joined onto ``base_url``."""
        url = cast(str, self.settings.get("mathjax_url", ""))
        if not url or url_is_absolute(url):
            return url
        return url_path_join(self.base_url, url)

    @property
    def mathjax_config(self) -> str:
        """The ``mathjax_config`` setting."""
        return cast(str, self.settings.get("mathjax_config", "TeX-AMS-MML_HTMLorMML-full,Safe"))

    @property
    def default_url(self) -> str:
        """The ``default_url`` setting, empty when unset."""
        return cast(str, self.settings.get("default_url", ""))

    @property
    def ws_url(self) -> str:
        """The ``websocket_url`` setting, empty when unset."""
        return cast(str, self.settings.get("websocket_url", ""))

    @property
    def contents_js_source(self) -> str:
        """The ``contents_js_source`` setting; the value is logged at debug level."""
        self.log.debug(
            "Using contents: %s",
            self.settings.get("contents_js_source", "services/contents"),
        )
        return cast(str, self.settings.get("contents_js_source", "services/contents"))

    # ---------------------------------------------------------------
    # Manager objects
    # ---------------------------------------------------------------

    @property
    def kernel_manager(self) -> AsyncMappingKernelManager:
        """The ``kernel_manager`` setting."""
        return cast("AsyncMappingKernelManager", self.settings["kernel_manager"])

    @property
    def contents_manager(self) -> ContentsManager:
        """The ``contents_manager`` setting."""
        return cast("ContentsManager", self.settings["contents_manager"])

    @property
    def session_manager(self) -> SessionManager:
        """The ``session_manager`` setting."""
        return cast("SessionManager", self.settings["session_manager"])

    @property
    def terminal_manager(self) -> TerminalManager:
        """The ``terminal_manager`` setting."""
        return cast("TerminalManager", self.settings["terminal_manager"])

    @property
    def kernel_spec_manager(self) -> KernelSpecManager:
        """The ``kernel_spec_manager`` setting."""
        return cast("KernelSpecManager", self.settings["kernel_spec_manager"])

    @property
    def config_manager(self) -> ConfigManager:
        """The ``config_manager`` setting."""
        return cast("ConfigManager", self.settings["config_manager"])

    @property
    def event_logger(self) -> EventLogger:
        """The ``event_logger`` setting."""
        return cast("EventLogger", self.settings["event_logger"])

    # ---------------------------------------------------------------
    # CORS
    # ---------------------------------------------------------------

    @property
    def allow_origin(self) -> str:
        """Normal Access-Control-Allow-Origin"""
        return cast(str, self.settings.get("allow_origin", ""))

    @property
    def allow_origin_pat(self) -> str | None:
        """Regular expression version of allow_origin"""
        return cast("str | None", self.settings.get("allow_origin_pat", None))

    @property
    def allow_credentials(self) -> bool:
        """Whether to set Access-Control-Allow-Credentials"""
        return cast(bool, self.settings.get("allow_credentials", False))

    def set_default_headers(self) -> None:
        """Set the default headers.

        Only delegates to the parent class; CORS headers are added later by
        :meth:`set_cors_headers`.
        """
        super().set_default_headers()

    def set_cors_headers(self) -> None:
        """Add CORS headers, if defined

        Now that current_user is async (jupyter-server 2.0),
        must be called at the end of prepare(), instead of in set_default_headers.
        """
        if self.allow_origin:
            self.set_header("Access-Control-Allow-Origin", self.allow_origin)
        elif self.allow_origin_pat:
            origin = self.get_origin()
            if origin and re.match(self.allow_origin_pat, origin):
                self.set_header("Access-Control-Allow-Origin", origin)
        elif self.token_authenticated and "Access-Control-Allow-Origin" not in self.settings.get(
            "headers", {}
        ):
            # allow token-authenticated requests cross-origin by default.
            # only apply this exception if allow-origin has not been specified.
            self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))

        if self.allow_credentials:
            self.set_header("Access-Control-Allow-Credentials", "true")

    def set_attachment_header(self, filename: str) -> None:
        """Set Content-Disposition: attachment header

        As a method to ensure handling of filename encoding
        """
        escaped_filename = url_escape(filename)
        self.set_header(
            "Content-Disposition",
            f"attachment; filename*=utf-8''{escaped_filename}",
        )

    def get_origin(self) -> str | None:
        """Return the request's ``Origin`` header, or ``Sec-Websocket-Origin`` as fallback."""
        # Handle WebSocket Origin naming convention differences
        # The difference between version 8 and 13 is that in 8 the
        # client sends a "Sec-Websocket-Origin" header and in 13 it's
        # simply "Origin".
        if "Origin" in self.request.headers:
            origin = self.request.headers.get("Origin")
        else:
            origin = self.request.headers.get("Sec-Websocket-Origin", None)
        return origin

    # origin_to_satisfy_tornado is present because tornado requires
    # check_origin to take an origin argument, but we don't use it
    def check_origin(self, origin_to_satisfy_tornado: str = "") -> bool:
        """Check Origin for cross-site API requests, including websockets

        Copied from WebSocket with changes:

        - allow unspecified host/origin (e.g. scripts)
        - allow token-authenticated requests

        Returns True when the request may proceed, False otherwise.
        """
        if self.allow_origin == "*" or self.skip_check_origin():
            return True

        host = self.request.headers.get("Host")
        origin = self.request.headers.get("Origin")

        # If no header is provided, let the request through.
        # Origin can be None for:
        # - same-origin (IE, Firefox)
        # - Cross-site POST form (IE, Firefox)
        # - Scripts
        # The cross-site POST (XSRF) case is handled by tornado's xsrf_token
        if origin is None or host is None:
            return True

        origin = origin.lower()
        origin_host = urlparse(origin).netloc

        # OK if origin matches host
        if origin_host == host:
            return True

        # Check CORS headers
        if self.allow_origin:
            allow = bool(self.allow_origin == origin)
        elif self.allow_origin_pat:
            allow = bool(re.match(self.allow_origin_pat, origin))
        else:
            # No CORS headers deny the request
            allow = False
        if not allow:
            self.log.warning(
                "Blocking Cross Origin API request for %s.  Origin: %s, Host: %s",
                self.request.path,
                origin,
                host,
            )
        return allow

    def check_referer(self) -> bool:
        """Check Referer for cross-site requests.
        Disables requests to certain endpoints with
        external or missing Referer.
        If set, allow_origin settings are applied to the Referer
        to whitelist specific cross-origin sites.
        Used on GET for api endpoints and /files/
        to block cross-site inclusion (XSSI).
        """
        if self.allow_origin == "*" or self.skip_check_origin():
            return True

        host = self.request.headers.get("Host")
        referer = self.request.headers.get("Referer")

        if not host:
            self.log.warning("Blocking request with no host")
            return False
        if not referer:
            self.log.warning("Blocking request with no referer")
            return False

        referer_url = urlparse(referer)
        referer_host = referer_url.netloc
        if referer_host == host:
            return True

        # apply cross-origin checks to Referer:
        origin = f"{referer_url.scheme}://{referer_url.netloc}"
        if self.allow_origin:
            allow = self.allow_origin == origin
        elif self.allow_origin_pat:
            allow = bool(re.match(self.allow_origin_pat, origin))
        else:
            # No CORS settings, deny the request
            allow = False

        if not allow:
            self.log.warning(
                "Blocking Cross Origin request for %s.  Referer: %s, Host: %s",
                self.request.path,
                origin,
                host,
            )
        return allow

    def check_xsrf_cookie(self) -> None:
        """Bypass xsrf cookie checks when token-authenticated

        For GET and HEAD requests a failed tornado XSRF check is tolerated
        when :meth:`check_referer` accepts the request.

        Raises:
            tornado.web.HTTPError: when the XSRF check fails and is not
                compensated by the Referer check.
        """
        if not hasattr(self, "_jupyter_current_user"):
            # Called too early, will be checked later
            return None
        if self.token_authenticated or self.settings.get("disable_check_xsrf", False):
            # Token-authenticated requests do not need additional XSRF-check
            # Servers without authentication are vulnerable to XSRF
            return None
        try:
            return super().check_xsrf_cookie()
        except web.HTTPError as e:
            if self.request.method in {"GET", "HEAD"}:
                # Consider Referer a sufficient cross-origin check for GET requests
                if not self.check_referer():
                    referer = self.request.headers.get("Referer")
                    if referer:
                        msg = f"Blocking Cross Origin request from {referer}."
                    else:
                        msg = "Blocking request from unknown origin"
                    raise web.HTTPError(403, msg) from e
            else:
                raise

    def check_host(self) -> bool:
        """Check the host header if remote access disallowed.

        Returns True if the request should continue, False otherwise.
        """
        if self.settings.get("allow_remote_access", False):
            return True

        # Remove port (e.g. ':8888') from host
        match = re.match(r"^(.*?)(:\d+)?$", self.request.host)
        assert match is not None
        host = match.group(1)

        # Browsers format IPv6 addresses like [::1]; we need to remove the []
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]

        # UNIX socket handling
        check_host = urldecode_unix_socket_path(host)
        if check_host.startswith("/") and os.path.exists(check_host):
            allow = True
        else:
            try:
                addr = ipaddress.ip_address(host)
            except ValueError:
                # Not an IP address: check against hostnames
                allow = host in self.settings.get("local_hostnames", ["localhost"])
            else:
                allow = addr.is_loopback

        if not allow:
            self.log.warning(
                (
                    "Blocking request with non-local 'Host' %s (%s). "
                    "If the server should be accessible at that name, "
                    "set ServerApp.allow_remote_access to disable the check."
                ),
                host,
                self.request.host,
            )
        return allow

    async def prepare(self, *, _redirect_to_login=True) -> Awaitable[None] | None:  # type:ignore[override]
        """Prepare a response.

        Checks the Host header, resolves the current user, sets the CORS
        headers, runs the XSRF check for methods other than GET/HEAD/OPTIONS
        and, unless ``allow_unauthenticated_access`` is set, enforces
        authentication for handler methods not marked
        ``__allow_unauthenticated``.

        Args:
            _redirect_to_login: when true, unauthenticated requests go through
                ``web.authenticated``; when false, they get a 403 directly.

        Raises:
            tornado.web.HTTPError: 403 when the host check fails, the request
                has no method, or authentication is required and missing.
        """
        # Set the current Jupyter Handler context variable.
        CallContext.set(CallContext.JUPYTER_HANDLER, self)

        if not self.check_host():
            self.current_user = self._jupyter_current_user = None
            raise web.HTTPError(403)

        from jupyter_server.auth import IdentityProvider

        mod_obj = inspect.getmodule(self.get_current_user)
        assert mod_obj is not None
        user: User | None = None

        if type(self.identity_provider) is IdentityProvider and mod_obj.__name__ != __name__:
            # check for overridden get_current_user + default IdentityProvider
            # deprecated way to override auth (e.g. JupyterHub < 3.0)
            # allow deprecated, overridden get_current_user
            warnings.warn(
                "Overriding JupyterHandler.get_current_user is deprecated in jupyter-server 2.0."
                " Use an IdentityProvider class.",
                DeprecationWarning,
                stacklevel=1,
            )
            user = User(self.get_current_user())
        else:
            _user = self.identity_provider.get_user(self)
            if isinstance(_user, Awaitable):
                # IdentityProvider.get_user _may_ be async
                _user = await _user
            user = _user

        # self.current_user for tornado's @web.authenticated
        # self._jupyter_current_user for backward-compat in deprecated get_current_user calls
        # and our own private checks for whether .current_user has been set
        self.current_user = self._jupyter_current_user = user
        # complete initial steps which require auth to resolve first:
        self.set_cors_headers()
        if self.request.method not in {"GET", "HEAD", "OPTIONS"}:
            self.check_xsrf_cookie()

        if not self.settings.get("allow_unauthenticated_access", False):
            if not self.request.method:
                # use web.HTTPError like every other raise in this class
                raise web.HTTPError(403)
            method = getattr(self, self.request.method.lower())
            if not getattr(method, "__allow_unauthenticated", False):
                if _redirect_to_login:
                    # reuse `web.authenticated` logic, which redirects to the login
                    # page on GET and HEAD and otherwise raises 403
                    return web.authenticated(lambda _: super().prepare())(self)
                else:
                    # raise 403 if user is not known without redirecting to login page
                    user = self.current_user
                    if user is None:
                        self.log.warning(
                            "Couldn't authenticate %s connection", self.__class__.__name__
                        )
                        raise web.HTTPError(403)

        return super().prepare()

    # ---------------------------------------------------------------
    # template rendering
    # ---------------------------------------------------------------

    def get_template(self, name):
        """Return the jinja template object for a given name"""
        return self.settings["jinja2_env"].get_template(name)

    def render_template(self, name, **ns):
        """Render a template by name.

        Values from :attr:`template_namespace` override same-named keyword
        arguments.
        """
        ns.update(self.template_namespace)
        template = self.get_template(name)
        return template.render(**ns)

    @property
    def template_namespace(self) -> dict[str, Any]:
        """The variables made available to every rendered template."""
        return dict(
            base_url=self.base_url,
            default_url=self.default_url,
            ws_url=self.ws_url,
            logged_in=self.logged_in,
            allow_password_change=getattr(self.identity_provider, "allow_password_change", False),
            auth_enabled=self.identity_provider.auth_enabled,
            login_available=self.identity_provider.login_available,
            token_available=bool(self.token),
            static_url=self.static_url,
            sys_info=json_sys_info(),
            contents_js_source=self.contents_js_source,
            version_hash=self.version_hash,
            xsrf_form_html=self.xsrf_form_html,
            token=self.token,
            xsrf_token=self.xsrf_token.decode("utf8"),
            nbjs_translations=json.dumps(
                combine_translations(self.request.headers.get("Accept-Language", ""))
            ),
            **self.jinja_template_vars,
        )

    def get_json_body(self) -> dict[str, Any] | None:
        """Return the body of the request as JSON data.

        Returns None when the request has no body.

        Raises:
            tornado.web.HTTPError: 400 when the body is not valid JSON.
        """
        if not self.request.body:
            return None
        # Do we need to call body.decode('utf-8') here?
        body = self.request.body.strip().decode("utf-8")
        try:
            model = json.loads(body)
        except Exception as e:
            self.log.debug("Bad JSON: %r", body)
            self.log.error("Couldn't parse JSON", exc_info=True)
            raise web.HTTPError(400, "Invalid JSON in body of request") from e
        return cast("dict[str, Any]", model)

    def write_error(self, status_code: int, **kwargs: Any) -> None:
        """render custom error pages

        Renders ``<status_code>.html`` when such a template exists and
        ``error.html`` otherwise.
        """
        exc_info = kwargs.get("exc_info")
        message = ""
        status_message = responses.get(status_code, "Unknown HTTP Error")

        if exc_info:
            exception = exc_info[1]
            # get the custom message, if defined
            try:
                message = exception.log_message % exception.args
            except Exception:
                # best effort: exceptions without a usable log_message keep ""
                pass

            # construct the custom reason, if defined
            reason = getattr(exception, "reason", "")
            if reason:
                status_message = reason
        else:
            exception = "(unknown)"

        # build template namespace
        ns = {
            "status_code": status_code,
            "status_message": status_message,
            "message": message,
            "exception": exception,
        }

        self.set_header("Content-Type", "text/html")
        # render the template
        try:
            html = self.render_template(f"{status_code}.html", **ns)
        except TemplateNotFound:
            html = self.render_template("error.html", **ns)

        self.write(html)
class APIHandler(JupyterHandler):
    """Base class for API handlers

    Responses (including errors) are JSON, unauthenticated requests get a 403
    instead of a login redirect, and requests can update the server's
    ``api_last_activity`` timestamp.
    """

    async def prepare(self) -> None:  # type:ignore[override]
        """Prepare an API response.

        Raises:
            tornado.web.HTTPError: 404 when the origin check fails.
        """
        await super().prepare()
        if not self.check_origin():
            raise web.HTTPError(404)

    def write_error(self, status_code: int, **kwargs: Any) -> None:
        """APIHandler errors are JSON, not human pages"""
        self.set_header("Content-Type", "application/json")
        message = responses.get(status_code, "Unknown HTTP Error")
        reply: dict[str, Any] = {
            "message": message,
        }
        exc_info = kwargs.get("exc_info")
        if exc_info:
            e = exc_info[1]
            if isinstance(e, web.HTTPError):
                reply["message"] = e.log_message or message
                reply["reason"] = e.reason
            else:
                reply["message"] = "Unhandled error"
                reply["reason"] = None
            # backward-compatibility: traceback field is present,
            # but always empty
            reply["traceback"] = ""
        self.log.warning("wrote error: %r", reply["message"], exc_info=True)
        self.finish(json.dumps(reply))

    def get_login_url(self) -> str:
        """Get the login url.

        Raises:
            tornado.web.HTTPError: 403 when there is no current user.
        """
        # if get_login_url is invoked in an API handler,
        # that means @web.authenticated is trying to trigger a redirect.
        # instead of redirecting, raise 403 instead.
        if not self.current_user:
            raise web.HTTPError(403)
        return super().get_login_url()

    @property
    def content_security_policy(self) -> str:
        """The parent policy with ``default-src 'none'`` appended."""
        csp = "; ".join(  # noqa: FLY002
            [
                super().content_security_policy,
                "default-src 'none'",
            ]
        )
        return csp

    # set _track_activity = False on API handlers that shouldn't track activity
    _track_activity = True

    def update_api_activity(self) -> None:
        """Update last_activity of API requests"""
        # record activity of authenticated requests
        if (
            self._track_activity
            and getattr(self, "_jupyter_current_user", None)
            and self.get_argument("no_track_activity", None) is None
        ):
            self.settings["api_last_activity"] = utcnow()

    def finish(self, *args: Any, **kwargs: Any) -> Future[Any]:
        """Finish an API response.

        The ``set_content_type`` keyword (default ``application/json``) is
        consumed here and used as the ``Content-Type`` header.
        """
        self.update_api_activity()
        # Allow caller to indicate content-type...
        set_content_type = kwargs.pop("set_content_type", "application/json")
        self.set_header("Content-Type", set_content_type)
        return super().finish(*args, **kwargs)

    @allow_unauthenticated
    def options(self, *args: Any, **kwargs: Any) -> None:
        """Answer a CORS preflight request.

        Sets ``Access-Control-Allow-Headers`` (from ``settings["headers"]``
        when given there) and ``Access-Control-Allow-Methods``, and may echo
        the request ``Origin`` for token-style preflights (see below).
        """
        if "Access-Control-Allow-Headers" in self.settings.get("headers", {}):
            self.set_header(
                "Access-Control-Allow-Headers",
                self.settings["headers"]["Access-Control-Allow-Headers"],
            )
        else:
            self.set_header(
                "Access-Control-Allow-Headers",
                "accept, content-type, authorization, x-xsrftoken",
            )
        self.set_header("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE, OPTIONS")

        # if authorization header is requested,
        # that means the request is token-authenticated.
        # avoid browser-side rejection of the preflight request.
        # only allow this exception if allow_origin has not been specified
        # and Jupyter server authentication is enabled.
        # If the token is not valid, the 'real' request will still be rejected.
        requested_headers = self.request.headers.get("Access-Control-Request-Headers", "").split(
            ","
        )
        # NOTE(review): the origin-configured test is negated here so the code
        # matches the comment above and the equivalent rule in
        # set_cors_headers; previously the Origin was echoed only when an
        # allow-origin setting *was* present.
        origin_configured = bool(
            self.allow_origin
            or self.allow_origin_pat
            or "Access-Control-Allow-Origin" in self.settings.get("headers", {})
        )
        if (
            any(h.strip().lower() == "authorization" for h in requested_headers)
            and (
                # FIXME: it would be even better to check specifically for token-auth,
                # but there is currently no API for this.
                self.login_available
            )
            and not origin_configured
        ):
            self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))
class Template404(JupyterHandler):
    """Handler that always answers with the rendered 404 page."""

    async def prepare(self) -> None:  # type:ignore[override]
        """Run the regular preparation, then fail the request with a 404."""
        await super().prepare()
        raise web.HTTPError(404)
class AuthenticatedFileHandler(JupyterHandler, web.StaticFileHandler):
    """Static file handler whose GET and HEAD require a logged-in, authorized user."""

    auth_resource = "contents"

    @property
    def content_security_policy(self) -> str:
        """The inherited policy with a script-only sandbox appended."""
        # In case we're serving HTML/SVG, confine any Javascript to a unique
        # origin so it can't interact with the Jupyter server.
        inherited = super().content_security_policy
        return inherited + "; sandbox allow-scripts"

    @web.authenticated
    @authorized
    def head(self, path: str) -> Awaitable[None]:  # type:ignore[override]
        """Answer a HEAD request for ``path`` after the XSRF check."""
        self.check_xsrf_cookie()
        return super().head(path)

    @web.authenticated
    @authorized
    def get(  # type:ignore[override]
        self, path: str, **kwargs: Any
    ) -> Awaitable[None]:
        """Serve the file at ``path`` after the XSRF check.

        Notebooks (``.ipynb``) and requests carrying a ``download`` argument
        are sent with an attachment header named after the last path segment.
        """
        self.check_xsrf_cookie()
        is_notebook = os.path.splitext(path)[1] == ".ipynb"
        if is_notebook or self.get_argument("download", None):
            self.set_attachment_header(path.rsplit("/", 1)[-1])

        return web.StaticFileHandler.get(self, path, **kwargs)

    def get_content_type(self) -> str:
        """Return the Content-Type for the file being served.

        Notebooks get ``application/x-ipynb+json``; files guessed as
        ``text/plain`` get an explicit UTF-8 charset; everything else falls
        back to the parent implementation.
        """
        assert self.absolute_path is not None
        # last path component, ignoring leading/trailing slashes
        name = self.absolute_path.strip("/").rsplit("/", 1)[-1]
        if name.endswith(".ipynb"):
            return "application/x-ipynb+json"
        if mimetypes.guess_type(name)[0] == "text/plain":
            return "text/plain; charset=UTF-8"
        return super().get_content_type()

    def set_headers(self) -> None:
        """Set the parent's headers, adding ``Cache-Control: no-cache`` without a ``v`` argument."""
        super().set_headers()
        # disable browser caching, rely on 304 replies for savings
        versioned = "v" in self.request.arguments
        if not versioned:
            self.add_header("Cache-Control", "no-cache")

    def compute_etag(self) -> str | None:
        """Return None, i.e. this handler supplies no etag."""
        return None

    def validate_absolute_path(self, root: str, absolute_path: str) -> str:
        """Validate and return the absolute path.

        Requires tornado 3.1

        Adding to tornado's own handling, forbids the serving of hidden files.

        Raises:
            tornado.web.HTTPError: 404 for a hidden file when the contents
                manager's ``allow_hidden`` is false.
        """
        abs_path = super().validate_absolute_path(root, absolute_path)
        abs_root = os.path.abspath(root)
        assert abs_path is not None
        if self.contents_manager.allow_hidden or not is_hidden(abs_path, abs_root):
            return abs_path
        self.log.info(
            "Refusing to serve hidden file, via 404 Error, use flag 'ContentsManager.allow_hidden' to enable"
        )
        raise web.HTTPError(404)
def json_errors(method: Any) -> Any:  # pragma: no cover
    """Deprecated decorator that makes a handler method report errors as JSON.

    Applying the decorator emits a ``DeprecationWarning``. Each call of the
    decorated method first rebinds the handler's ``write_error`` to
    ``APIHandler.write_error`` and then runs the original method with the
    same arguments, returning its result.
    """
    warnings.warn(
        "@json_errors is deprecated in notebook 5.2.0. Subclass APIHandler instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    def _call_with_json_errors(self, *args, **kwargs):
        # route this handler's error output through the JSON implementation
        self.write_error = types.MethodType(APIHandler.write_error, self)
        return method(self, *args, **kwargs)

    return functools.wraps(method)(_call_with_json_errors)
971# -----------------------------------------------------------------------------
972# File handler
973# -----------------------------------------------------------------------------
# Module-level alias for ``web.HTTPError``, kept to minimize subclass changes:
# code can keep referring to ``HTTPError`` under this module's name.
HTTPError = web.HTTPError
class FileFindHandler(JupyterHandler, web.StaticFileHandler):
    """subclass of StaticFileHandler for serving files from a search path

    ``root`` is a tuple of directories rather than a single directory; files
    are located across them with ``filefind``.

    The setting "static_immutable_cache" can be set up to serve some static
    file as immutable (e.g. file name containing a hash). The setting is a
    list of base URL, every static file URL starting with one of those will
    be immutable.
    """

    # cache search results, don't search for files more than once.
    # Keyed by (search roots, relative path): this dict lives on the class and
    # is shared by every handler instance, so keying on the relative path alone
    # would hand a file found under one handler's roots to a handler that was
    # configured with different roots.
    _static_paths: dict[tuple[tuple[str, ...], str], str] = {}
    root: tuple[str]  # type:ignore[assignment]

    def set_headers(self) -> None:
        """Set the headers, choosing a Cache-Control policy for the request."""
        super().set_headers()

        immutable_paths = self.settings.get("static_immutable_cache", [])

        # allow immutable cache for files
        if any(self.request.path.startswith(path) for path in immutable_paths):
            self.set_header("Cache-Control", "public, max-age=31536000, immutable")

        # disable browser caching, rely on 304 replies for savings
        elif "v" not in self.request.arguments or any(
            self.request.path.startswith(path) for path in self.no_cache_paths
        ):
            self.set_header("Cache-Control", "no-cache")

    def initialize(
        self,
        path: str | list[str],
        default_filename: str | None = None,
        no_cache_paths: list[str] | None = None,
    ) -> None:
        """Initialize the file find handler.

        ``path`` may be a single directory or a list of directories; each is
        user-expanded, made absolute and stored with a trailing separator.
        ``no_cache_paths`` lists request-path prefixes that always get
        ``Cache-Control: no-cache`` (see ``set_headers``).
        """
        self.no_cache_paths = no_cache_paths or []

        if isinstance(path, str):
            path = [path]

        self.root = tuple(os.path.abspath(os.path.expanduser(p)) + os.sep for p in path)  # type:ignore[assignment]
        self.default_filename = default_filename

    def compute_etag(self) -> str | None:
        """Compute the etag (always ``None``)."""
        return None

    # access is allowed as this class is used to serve static assets on login page
    # TODO: create an allow-list of files used on login page and remove this decorator
    @allow_unauthenticated
    def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]:
        return super().get(path, include_body)

    # access is allowed as this class is used to serve static assets on login page
    # TODO: create an allow-list of files used on login page and remove this decorator
    @allow_unauthenticated
    def head(self, path: str) -> Awaitable[None]:
        return super().head(path)

    @classmethod
    def get_absolute_path(cls, roots: Sequence[str], path: str) -> str:
        """locate a file to serve on our static file search path

        Returns the absolute path of the file, or ``""`` when ``filefind``
        raises ``OSError``. Only successful lookups are cached, and the cache
        entry is specific to the given ``roots``.
        """
        # Normalise roots to a hashable key; a bare string is one root, not
        # a sequence of characters.
        roots_key = (roots,) if isinstance(roots, str) else tuple(roots)
        cache_key = (roots_key, path)
        # NOTE(review): ``_lock`` is not defined in this class; presumably it
        # is inherited from tornado's StaticFileHandler -- confirm.
        with cls._lock:
            if cache_key in cls._static_paths:
                return cls._static_paths[cache_key]
            try:
                abspath = os.path.abspath(filefind(path, roots))
            except OSError:
                # IOError means not found
                return ""

            cls._static_paths[cache_key] = abspath

            log().debug("Path %s served from %s", path, abspath)
            return abspath

    def validate_absolute_path(self, root: str, absolute_path: str) -> str | None:
        """check if the file should be served (raises 404, 403, etc.)

        An empty ``absolute_path`` (the not-found result of
        ``get_absolute_path``) raises a 404. Otherwise the first configured
        root that contains the path is handed to the parent validation; if
        none contains it, the last configured root is used.
        """
        if not absolute_path:
            raise web.HTTPError(404)

        # Falls back to the incoming ``root`` only when no roots are configured.
        selected_root = root
        for candidate in self.root:
            selected_root = candidate
            if (absolute_path + os.sep).startswith(candidate):
                break

        return super().validate_absolute_path(selected_root, absolute_path)
class APIVersionHandler(APIHandler):
    """An API handler that reports the server version."""

    _track_activity = False

    @allow_unauthenticated
    def get(self) -> None:
        """Reply with a JSON object holding only the server version."""
        # not authenticated, so give as few info as possible
        payload = {"version": jupyter_server.__version__}
        self.finish(json.dumps(payload))
class TrailingSlashHandler(web.RequestHandler):
    """Simple redirect handler that strips trailing slashes

    This should be the first, highest priority handler.
    """

    @allow_unauthenticated
    def get(self) -> None:
        """Redirect to the same URI with surrounding slashes normalised."""
        assert self.request.uri is not None
        raw_path, separator, query = self.request.uri.partition("?")
        # trim trailing *and* leading /
        # to avoid misinterpreting repeated '//'
        normalised = "/" + raw_path.strip("/")
        self.redirect(normalised + separator + query)

    post = put = get
class MainHandler(JupyterHandler):
    """Simple handler for base_url."""

    @allow_unauthenticated
    def get(self) -> None:
        """Render the ``main.html`` template and write it to the response."""
        self.write(self.render_template("main.html"))

    post = put = get
class FilesRedirectHandler(JupyterHandler):
    """Handler for redirecting relative URLs to the /files/ handler"""

    @staticmethod
    async def redirect_to_files(self: Any, path: str) -> None:
        """Redirect ``path`` to ``/tree`` (directories) or ``/files`` (files).

        Written as a static method taking the handler explicitly so that it
        can be called from other handlers. Raises a 404 when ``path`` is
        neither a directory nor (after the legacy ``files/`` fix-up) a file.
        """
        contents = self.contents_manager
        if await ensure_async(contents.dir_exists(path)):
            # it's a *directory*, redirect to /tree
            target = url_path_join(self.base_url, "tree", url_escape(path))
        else:
            requested = path
            # otherwise, redirect to /files
            segments = path.split("/")

            found = await ensure_async(contents.file_exists(path=path))
            if not found and "files" in segments:
                # redirect without files/ iff it would 404
                # this preserves pre-2.0-style 'files/' links
                self.log.warning("Deprecated files/ URL: %s", requested)
                segments.remove("files")
                path = "/".join(segments)

            if not await ensure_async(contents.file_exists(path=path)):
                raise web.HTTPError(404)

            target = url_path_join(self.base_url, "files", url_escape(path))
        self.log.debug("Redirecting %s to %s", self.request.path, target)
        self.redirect(target)

    @allow_unauthenticated
    async def get(self, path: str = "") -> None:
        """Redirect a GET for ``path`` via ``redirect_to_files``."""
        return await self.redirect_to_files(self, path)
class RedirectWithParams(web.RequestHandler):
    """Same as web.RedirectHandler, but preserves URL parameters"""

    def initialize(self, url: str, permanent: bool = True) -> None:
        """Initialize a redirect handler.

        ``url`` is the redirect target; ``permanent`` is forwarded to
        ``redirect`` when the request is handled.
        """
        self._url = url
        self._permanent = permanent

    @allow_unauthenticated
    def get(self) -> None:
        """Redirect to the configured URL, appending the request's query string.

        When the request carries no query string, the target URL is used
        as-is rather than gaining a dangling ``?`` or ``&``.
        """
        query = self.request.query
        if query:
            # Extend an existing query on the target, otherwise start one.
            sep = "&" if "?" in self._url else "?"
            url = sep.join([self._url, query])
        else:
            url = self._url
        self.redirect(url, permanent=self._permanent)
class PrometheusMetricsHandler(JupyterHandler):
    """
    Return prometheus metrics for this server
    """

    @allow_unauthenticated
    def get(self) -> None:
        """Write the latest metrics from the default prometheus registry.

        Raises a 403 when the "authenticate_prometheus" setting is truthy
        and the request is not logged in.
        """
        must_authenticate = self.settings["authenticate_prometheus"]
        if must_authenticate and not self.logged_in:
            raise web.HTTPError(403)

        self.set_header("Content-Type", prometheus_client.CONTENT_TYPE_LATEST)
        metrics = prometheus_client.generate_latest(prometheus_client.REGISTRY)
        self.write(metrics)
class PublicStaticFileHandler(web.StaticFileHandler):
    """Same as web.StaticFileHandler, but decorated to acknowledge that auth is not required."""

    @allow_unauthenticated
    def head(self, path: str) -> Awaitable[None]:
        """Delegate HEAD to the parent handler unchanged."""
        parent = super()
        return parent.head(path)

    @allow_unauthenticated
    def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]:
        """Delegate GET to the parent handler unchanged."""
        parent = super()
        return parent.get(path, include_body)
1192# -----------------------------------------------------------------------------
1193# URL pattern fragments for reuse
1194# -----------------------------------------------------------------------------
# Reusable fragment capturing a named group ``path``: it matches any number
# of `/foo[/bar...]` segments, or just `/`, or the empty string ''.
path_regex = r"(?P<path>(?:(?:/[^/]+)+|/?))"
1199# -----------------------------------------------------------------------------
1200# URL to handler mappings
1201# -----------------------------------------------------------------------------
# (URL pattern, handler class) pairs defined by this module.
# NOTE(review): "api" has no leading slash, unlike its siblings -- presumably
# the application joins these patterns onto its base URL; confirm.
default_handlers = [
    (r".*/", TrailingSlashHandler),
    (r"api", APIVersionHandler),
    (r"/(robots\.txt|favicon\.ico)", PublicStaticFileHandler),
    (r"/metrics", PrometheusMetricsHandler),
]