Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/base/handlers.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Base Tornado handlers for the Jupyter server."""
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
7import functools
8import inspect
9import ipaddress
10import json
11import mimetypes
12import os
13import re
14import types
15import warnings
16from collections.abc import Awaitable, Coroutine, Sequence
17from http.client import responses
18from logging import Logger
19from typing import TYPE_CHECKING, Any, cast
20from urllib.parse import urlparse
22import prometheus_client
23from jinja2 import TemplateNotFound
24from jupyter_core.paths import is_hidden
25from tornado import web
26from tornado.log import app_log
27from traitlets.config import Application
29import jupyter_server
30from jupyter_server import CallContext
31from jupyter_server._sysinfo import get_sys_info
32from jupyter_server._tz import utcnow
33from jupyter_server.auth.decorator import allow_unauthenticated, authorized
34from jupyter_server.auth.identity import User
35from jupyter_server.i18n import combine_translations
36from jupyter_server.services.security import csp_report_uri
37from jupyter_server.utils import (
38 ensure_async,
39 filefind,
40 url_escape,
41 url_is_absolute,
42 url_path_join,
43 urldecode_unix_socket_path,
44)
46if TYPE_CHECKING:
47 from jupyter_client.kernelspec import KernelSpecManager
48 from jupyter_events import EventLogger
49 from jupyter_server_terminals.terminalmanager import TerminalManager
50 from tornado.concurrent import Future
52 from jupyter_server.auth.authorizer import Authorizer
53 from jupyter_server.auth.identity import IdentityProvider
54 from jupyter_server.serverapp import ServerApp
55 from jupyter_server.services.config.manager import ConfigManager
56 from jupyter_server.services.contents.manager import ContentsManager
57 from jupyter_server.services.kernels.kernelmanager import AsyncMappingKernelManager
58 from jupyter_server.services.sessions.sessionmanager import SessionManager
60# -----------------------------------------------------------------------------
61# Top-level handlers
62# -----------------------------------------------------------------------------
64_sys_info_cache = None
67def json_sys_info():
68 """Get sys info as json."""
69 global _sys_info_cache # noqa: PLW0603
70 if _sys_info_cache is None:
71 _sys_info_cache = json.dumps(get_sys_info())
72 return _sys_info_cache
75def log() -> Logger:
76 """Get the application log."""
77 if Application.initialized():
78 return cast(Logger, Application.instance().log)
79 else:
80 return app_log
83class AuthenticatedHandler(web.RequestHandler):
84 """A RequestHandler with an authenticated user."""
86 @property
87 def base_url(self) -> str:
88 return cast(str, self.settings.get("base_url", "/"))
90 @property
91 def content_security_policy(self) -> str:
92 """The default Content-Security-Policy header
94 Can be overridden by defining Content-Security-Policy in settings['headers']
95 """
96 if "Content-Security-Policy" in self.settings.get("headers", {}):
97 # user-specified, don't override
98 return cast(str, self.settings["headers"]["Content-Security-Policy"])
100 return "; ".join(
101 [
102 "frame-ancestors 'self'",
103 # Make sure the report-uri is relative to the base_url
104 "report-uri "
105 + self.settings.get("csp_report_uri", url_path_join(self.base_url, csp_report_uri)),
106 ]
107 )
109 def set_default_headers(self) -> None:
110 """Set the default headers."""
111 headers = {}
112 headers["X-Content-Type-Options"] = "nosniff"
113 headers.update(self.settings.get("headers", {}))
115 headers["Content-Security-Policy"] = self.content_security_policy
117 # Allow for overriding headers
118 for header_name, value in headers.items():
119 try:
120 self.set_header(header_name, value)
121 except Exception as e:
122 # tornado raise Exception (not a subclass)
123 # if method is unsupported (websocket and Access-Control-Allow-Origin
124 # for example, so just ignore)
125 self.log.exception( # type:ignore[attr-defined]
126 "Could not set default headers: %s", e
127 )
129 @property
130 def cookie_name(self) -> str:
131 warnings.warn(
132 """JupyterHandler.login_handler is deprecated in 2.0,
133 use JupyterHandler.identity_provider.
134 """,
135 DeprecationWarning,
136 stacklevel=2,
137 )
138 return self.identity_provider.get_cookie_name(self)
140 def force_clear_cookie(self, name: str, path: str = "/", domain: str | None = None) -> None:
141 """Force a cookie clear."""
142 warnings.warn(
143 """JupyterHandler.login_handler is deprecated in 2.0,
144 use JupyterHandler.identity_provider.
145 """,
146 DeprecationWarning,
147 stacklevel=2,
148 )
149 self.identity_provider._force_clear_cookie(self, name, path=path, domain=domain)
151 def clear_login_cookie(self) -> None:
152 """Clear a login cookie."""
153 warnings.warn(
154 """JupyterHandler.login_handler is deprecated in 2.0,
155 use JupyterHandler.identity_provider.
156 """,
157 DeprecationWarning,
158 stacklevel=2,
159 )
160 self.identity_provider.clear_login_cookie(self)
162 def get_current_user(self) -> str:
163 """Get the current user."""
164 clsname = self.__class__.__name__
165 msg = (
166 f"Calling `{clsname}.get_current_user()` directly is deprecated in jupyter-server 2.0."
167 " Use `self.current_user` instead (works in all versions)."
168 )
169 if hasattr(self, "_jupyter_current_user"):
170 # backward-compat: return _jupyter_current_user
171 warnings.warn(
172 msg,
173 DeprecationWarning,
174 stacklevel=2,
175 )
176 return cast(str, self._jupyter_current_user)
177 # haven't called get_user in prepare, raise
178 raise RuntimeError(msg)
180 def skip_check_origin(self) -> bool:
181 """Ask my login_handler if I should skip the origin_check
183 For example: in the default LoginHandler, if a request is token-authenticated,
184 origin checking should be skipped.
185 """
186 if self.request.method == "OPTIONS":
187 # no origin-check on options requests, which are used to check origins!
188 return True
189 return not self.identity_provider.should_check_origin(self)
191 @property
192 def token_authenticated(self) -> bool:
193 """Have I been authenticated with a token?"""
194 return self.identity_provider.is_token_authenticated(self)
196 @property
197 def logged_in(self) -> bool:
198 """Is a user currently logged in?"""
199 user = self.current_user
200 return bool(user and user != "anonymous")
202 @property
203 def login_handler(self) -> Any:
204 """Return the login handler for this application, if any."""
205 warnings.warn(
206 """JupyterHandler.login_handler is deprecated in 2.0,
207 use JupyterHandler.identity_provider.
208 """,
209 DeprecationWarning,
210 stacklevel=2,
211 )
212 return self.identity_provider.login_handler_class
214 @property
215 def token(self) -> str | None:
216 """Return the login token for this application, if any."""
217 return self.identity_provider.token
219 @property
220 def login_available(self) -> bool:
221 """May a user proceed to log in?
223 This returns True if login capability is available, irrespective of
224 whether the user is already logged in or not.
226 """
227 return cast(bool, self.identity_provider.login_available)
229 @property
230 def authorizer(self) -> Authorizer:
231 if "authorizer" not in self.settings:
232 warnings.warn(
233 "The Tornado web application does not have an 'authorizer' defined "
234 "in its settings. In future releases of jupyter_server, this will "
235 "be a required key for all subclasses of `JupyterHandler`. For an "
236 "example, see the jupyter_server source code for how to "
237 "add an authorizer to the tornado settings: "
238 "https://github.com/jupyter-server/jupyter_server/blob/"
239 "653740cbad7ce0c8a8752ce83e4d3c2c754b13cb/jupyter_server/serverapp.py"
240 "#L234-L256",
241 stacklevel=2,
242 )
243 from jupyter_server.auth import AllowAllAuthorizer
245 self.settings["authorizer"] = AllowAllAuthorizer(
246 config=self.settings.get("config", None),
247 identity_provider=self.identity_provider,
248 )
250 return cast("Authorizer", self.settings.get("authorizer"))
252 @property
253 def identity_provider(self) -> IdentityProvider:
254 if "identity_provider" not in self.settings:
255 warnings.warn(
256 "The Tornado web application does not have an 'identity_provider' defined "
257 "in its settings. In future releases of jupyter_server, this will "
258 "be a required key for all subclasses of `JupyterHandler`. For an "
259 "example, see the jupyter_server source code for how to "
260 "add an identity provider to the tornado settings: "
261 "https://github.com/jupyter-server/jupyter_server/blob/v2.0.0/"
262 "jupyter_server/serverapp.py#L242",
263 stacklevel=2,
264 )
265 from jupyter_server.auth import IdentityProvider
267 # no identity provider set, load default
268 self.settings["identity_provider"] = IdentityProvider(
269 config=self.settings.get("config", None)
270 )
271 return cast("IdentityProvider", self.settings["identity_provider"])
274class JupyterHandler(AuthenticatedHandler):
275 """Jupyter-specific extensions to authenticated handling
277 Mostly property shortcuts to Jupyter-specific settings.
278 """
280 @property
281 def config(self) -> dict[str, Any] | None:
282 return cast("dict[str, Any] | None", self.settings.get("config", None))
284 @property
285 def log(self) -> Logger:
286 """use the Jupyter log by default, falling back on tornado's logger"""
287 return log()
289 @property
290 def jinja_template_vars(self) -> dict[str, Any]:
291 """User-supplied values to supply to jinja templates."""
292 return cast("dict[str, Any]", self.settings.get("jinja_template_vars", {}))
294 @property
295 def serverapp(self) -> ServerApp | None:
296 return cast("ServerApp | None", self.settings["serverapp"])
298 # ---------------------------------------------------------------
299 # URLs
300 # ---------------------------------------------------------------
302 @property
303 def version_hash(self) -> str:
304 """The version hash to use for cache hints for static files"""
305 return cast(str, self.settings.get("version_hash", ""))
307 @property
308 def mathjax_url(self) -> str:
309 url = cast(str, self.settings.get("mathjax_url", ""))
310 if not url or url_is_absolute(url):
311 return url
312 return url_path_join(self.base_url, url)
314 @property
315 def mathjax_config(self) -> str:
316 return cast(str, self.settings.get("mathjax_config", "TeX-AMS-MML_HTMLorMML-full,Safe"))
318 @property
319 def default_url(self) -> str:
320 return cast(str, self.settings.get("default_url", ""))
322 @property
323 def ws_url(self) -> str:
324 return cast(str, self.settings.get("websocket_url", ""))
326 @property
327 def contents_js_source(self) -> str:
328 self.log.debug(
329 "Using contents: %s",
330 self.settings.get("contents_js_source", "services/contents"),
331 )
332 return cast(str, self.settings.get("contents_js_source", "services/contents"))
334 # ---------------------------------------------------------------
335 # Manager objects
336 # ---------------------------------------------------------------
338 @property
339 def kernel_manager(self) -> AsyncMappingKernelManager:
340 return cast("AsyncMappingKernelManager", self.settings["kernel_manager"])
342 @property
343 def contents_manager(self) -> ContentsManager:
344 return cast("ContentsManager", self.settings["contents_manager"])
346 @property
347 def session_manager(self) -> SessionManager:
348 return cast("SessionManager", self.settings["session_manager"])
350 @property
351 def terminal_manager(self) -> TerminalManager:
352 return cast("TerminalManager", self.settings["terminal_manager"])
354 @property
355 def kernel_spec_manager(self) -> KernelSpecManager:
356 return cast("KernelSpecManager", self.settings["kernel_spec_manager"])
358 @property
359 def config_manager(self) -> ConfigManager:
360 return cast("ConfigManager", self.settings["config_manager"])
362 @property
363 def event_logger(self) -> EventLogger:
364 return cast("EventLogger", self.settings["event_logger"])
366 # ---------------------------------------------------------------
367 # CORS
368 # ---------------------------------------------------------------
370 @property
371 def allow_origin(self) -> str:
372 """Normal Access-Control-Allow-Origin"""
373 return cast(str, self.settings.get("allow_origin", ""))
375 @property
376 def allow_origin_pat(self) -> str | None:
377 """Regular expression version of allow_origin"""
378 return cast("str | None", self.settings.get("allow_origin_pat", None))
380 @property
381 def allow_credentials(self) -> bool:
382 """Whether to set Access-Control-Allow-Credentials"""
383 return cast(bool, self.settings.get("allow_credentials", False))
385 def set_default_headers(self) -> None:
386 """Add CORS headers, if defined"""
387 super().set_default_headers()
389 def set_cors_headers(self) -> None:
390 """Add CORS headers, if defined
392 Now that current_user is async (jupyter-server 2.0),
393 must be called at the end of prepare(), instead of in set_default_headers.
394 """
395 if self.allow_origin:
396 self.set_header("Access-Control-Allow-Origin", self.allow_origin)
397 elif self.allow_origin_pat:
398 origin = self.get_origin()
399 if origin and re.match(self.allow_origin_pat, origin):
400 self.set_header("Access-Control-Allow-Origin", origin)
401 elif self.token_authenticated and "Access-Control-Allow-Origin" not in self.settings.get(
402 "headers", {}
403 ):
404 # allow token-authenticated requests cross-origin by default.
405 # only apply this exception if allow-origin has not been specified.
406 self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))
408 if self.allow_credentials:
409 self.set_header("Access-Control-Allow-Credentials", "true")
411 def set_attachment_header(self, filename: str) -> None:
412 """Set Content-Disposition: attachment header
414 As a method to ensure handling of filename encoding
415 """
416 escaped_filename = url_escape(filename)
417 self.set_header(
418 "Content-Disposition",
419 f"attachment; filename*=utf-8''{escaped_filename}",
420 )
422 def get_origin(self) -> str | None:
423 # Handle WebSocket Origin naming convention differences
424 # The difference between version 8 and 13 is that in 8 the
425 # client sends a "Sec-Websocket-Origin" header and in 13 it's
426 # simply "Origin".
427 if "Origin" in self.request.headers:
428 origin = self.request.headers.get("Origin")
429 else:
430 origin = self.request.headers.get("Sec-Websocket-Origin", None)
431 return origin
433 # origin_to_satisfy_tornado is present because tornado requires
434 # check_origin to take an origin argument, but we don't use it
435 def check_origin(self, origin_to_satisfy_tornado: str = "") -> bool:
436 """Check Origin for cross-site API requests, including websockets
438 Copied from WebSocket with changes:
440 - allow unspecified host/origin (e.g. scripts)
441 - allow token-authenticated requests
442 """
443 if self.allow_origin == "*" or self.skip_check_origin():
444 return True
446 host = self.request.headers.get("Host")
447 origin = self.request.headers.get("Origin")
449 # If no header is provided, let the request through.
450 # Origin can be None for:
451 # - same-origin (IE, Firefox)
452 # - Cross-site POST form (IE, Firefox)
453 # - Scripts
454 # The cross-site POST (XSRF) case is handled by tornado's xsrf_token
455 if origin is None or host is None:
456 return True
458 origin = origin.lower()
459 origin_host = urlparse(origin).netloc
461 # OK if origin matches host
462 if origin_host == host:
463 return True
465 # Check CORS headers
466 if self.allow_origin:
467 allow = bool(self.allow_origin == origin)
468 elif self.allow_origin_pat:
469 allow = bool(re.match(self.allow_origin_pat, origin))
470 else:
471 # No CORS headers deny the request
472 allow = False
473 if not allow:
474 self.log.warning(
475 "Blocking Cross Origin API request for %s. Origin: %s, Host: %s",
476 self.request.path,
477 origin,
478 host,
479 )
480 return allow
482 def check_referer(self) -> bool:
483 """Check Referer for cross-site requests.
484 Disables requests to certain endpoints with
485 external or missing Referer.
486 If set, allow_origin settings are applied to the Referer
487 to whitelist specific cross-origin sites.
488 Used on GET for api endpoints and /files/
489 to block cross-site inclusion (XSSI).
490 """
491 if self.allow_origin == "*" or self.skip_check_origin():
492 return True
494 host = self.request.headers.get("Host")
495 referer = self.request.headers.get("Referer")
497 if not host:
498 self.log.warning("Blocking request with no host")
499 return False
500 if not referer:
501 self.log.warning("Blocking request with no referer")
502 return False
504 referer_url = urlparse(referer)
505 referer_host = referer_url.netloc
506 if referer_host == host:
507 return True
509 # apply cross-origin checks to Referer:
510 origin = f"{referer_url.scheme}://{referer_url.netloc}"
511 if self.allow_origin:
512 allow = self.allow_origin == origin
513 elif self.allow_origin_pat:
514 allow = bool(re.match(self.allow_origin_pat, origin))
515 else:
516 # No CORS settings, deny the request
517 allow = False
519 if not allow:
520 self.log.warning(
521 "Blocking Cross Origin request for %s. Referer: %s, Host: %s",
522 self.request.path,
523 origin,
524 host,
525 )
526 return allow
528 def check_xsrf_cookie(self) -> None:
529 """Bypass xsrf cookie checks when token-authenticated"""
530 if not hasattr(self, "_jupyter_current_user"):
531 # Called too early, will be checked later
532 return None
533 if self.token_authenticated or self.settings.get("disable_check_xsrf", False):
534 # Token-authenticated requests do not need additional XSRF-check
535 # Servers without authentication are vulnerable to XSRF
536 return None
537 try:
538 if not self.check_origin():
539 raise web.HTTPError(404)
540 return super().check_xsrf_cookie()
541 except web.HTTPError as e:
542 if self.request.method in {"GET", "HEAD"}:
543 # Consider Referer a sufficient cross-origin check for GET requests
544 if not self.check_referer():
545 referer = self.request.headers.get("Referer")
546 if referer:
547 msg = f"Blocking Cross Origin request from {referer}."
548 else:
549 msg = "Blocking request from unknown origin"
550 raise web.HTTPError(403, msg) from e
551 else:
552 raise
554 def check_host(self) -> bool:
555 """Check the host header if remote access disallowed.
557 Returns True if the request should continue, False otherwise.
558 """
559 if self.settings.get("allow_remote_access", False):
560 return True
562 # Remove port (e.g. ':8888') from host
563 match = re.match(r"^(.*?)(:\d+)?$", self.request.host)
564 assert match is not None
565 host = match.group(1)
567 # Browsers format IPv6 addresses like [::1]; we need to remove the []
568 if host.startswith("[") and host.endswith("]"):
569 host = host[1:-1]
571 # UNIX socket handling
572 check_host = urldecode_unix_socket_path(host)
573 if check_host.startswith("/") and os.path.exists(check_host):
574 allow = True
575 else:
576 try:
577 addr = ipaddress.ip_address(host)
578 except ValueError:
579 # Not an IP address: check against hostnames
580 allow = host in self.settings.get("local_hostnames", ["localhost"])
581 else:
582 allow = addr.is_loopback
584 if not allow:
585 self.log.warning(
586 (
587 "Blocking request with non-local 'Host' %s (%s). "
588 "If the server should be accessible at that name, "
589 "set ServerApp.allow_remote_access to disable the check."
590 ),
591 host,
592 self.request.host,
593 )
594 return allow
596 async def prepare(self, *, _redirect_to_login=True) -> Awaitable[None] | None: # type:ignore[override]
597 """Prepare a response."""
598 # Set the current Jupyter Handler context variable.
599 CallContext.set(CallContext.JUPYTER_HANDLER, self)
601 if not self.check_host():
602 self.current_user = self._jupyter_current_user = None
603 raise web.HTTPError(403)
605 from jupyter_server.auth import IdentityProvider
607 mod_obj = inspect.getmodule(self.get_current_user)
608 assert mod_obj is not None
609 user: User | None = None
611 if type(self.identity_provider) is IdentityProvider and mod_obj.__name__ != __name__:
612 # check for overridden get_current_user + default IdentityProvider
613 # deprecated way to override auth (e.g. JupyterHub < 3.0)
614 # allow deprecated, overridden get_current_user
615 warnings.warn(
616 "Overriding JupyterHandler.get_current_user is deprecated in jupyter-server 2.0."
617 " Use an IdentityProvider class.",
618 DeprecationWarning,
619 stacklevel=1,
620 )
621 user = User(self.get_current_user())
622 else:
623 _user = self.identity_provider.get_user(self)
624 if isinstance(_user, Awaitable):
625 # IdentityProvider.get_user _may_ be async
626 _user = await _user
627 user = _user
629 # self.current_user for tornado's @web.authenticated
630 # self._jupyter_current_user for backward-compat in deprecated get_current_user calls
631 # and our own private checks for whether .current_user has been set
632 self.current_user = self._jupyter_current_user = user
633 # complete initial steps which require auth to resolve first:
634 self.set_cors_headers()
635 if self.request.method not in {"GET", "HEAD", "OPTIONS"}:
636 self.check_xsrf_cookie()
638 if not self.settings.get("allow_unauthenticated_access", False):
639 if not self.request.method:
640 raise HTTPError(403)
641 method = getattr(self, self.request.method.lower())
642 if not getattr(method, "__allow_unauthenticated", False):
643 if _redirect_to_login:
644 # reuse `web.authenticated` logic, which redirects to the login
645 # page on GET and HEAD and otherwise raises 403
646 return web.authenticated(lambda _: super().prepare())(self)
647 else:
648 # raise 403 if user is not known without redirecting to login page
649 user = self.current_user
650 if user is None:
651 self.log.warning(
652 f"Couldn't authenticate {self.__class__.__name__} connection"
653 )
654 raise web.HTTPError(403)
656 return super().prepare()
658 # ---------------------------------------------------------------
659 # template rendering
660 # ---------------------------------------------------------------
662 def get_template(self, name):
663 """Return the jinja template object for a given name"""
664 return self.settings["jinja2_env"].get_template(name)
666 def render_template(self, name, **ns):
667 """Render a template by name."""
668 ns.update(self.template_namespace)
669 template = self.get_template(name)
670 return template.render(**ns)
672 @property
673 def template_namespace(self) -> dict[str, Any]:
674 return dict(
675 base_url=self.base_url,
676 default_url=self.default_url,
677 ws_url=self.ws_url,
678 logged_in=self.logged_in,
679 allow_password_change=getattr(self.identity_provider, "allow_password_change", False),
680 auth_enabled=self.identity_provider.auth_enabled,
681 login_available=self.identity_provider.login_available,
682 token_available=bool(self.token),
683 static_url=self.static_url,
684 sys_info=json_sys_info(),
685 contents_js_source=self.contents_js_source,
686 version_hash=self.version_hash,
687 xsrf_form_html=self.xsrf_form_html,
688 token=self.token,
689 xsrf_token=self.xsrf_token.decode("utf8"),
690 nbjs_translations=json.dumps(
691 combine_translations(self.request.headers.get("Accept-Language", ""))
692 ),
693 **self.jinja_template_vars,
694 )
696 def get_json_body(self) -> dict[str, Any] | None:
697 """Return the body of the request as JSON data."""
698 if not self.request.body:
699 return None
700 # Do we need to call body.decode('utf-8') here?
701 body = self.request.body.strip().decode("utf-8")
702 try:
703 model = json.loads(body)
704 except Exception as e:
705 self.log.debug("Bad JSON: %r", body)
706 self.log.error("Couldn't parse JSON", exc_info=True)
707 raise web.HTTPError(400, "Invalid JSON in body of request") from e
708 return cast("dict[str, Any]", model)
710 def write_error(self, status_code: int, **kwargs: Any) -> None:
711 """render custom error pages"""
712 exc_info = kwargs.get("exc_info")
713 message = ""
714 status_message = responses.get(status_code, "Unknown HTTP Error")
716 if exc_info:
717 exception = exc_info[1]
718 # get the custom message, if defined
719 try:
720 message = exception.log_message % exception.args
721 except Exception:
722 pass
724 # construct the custom reason, if defined
725 reason = getattr(exception, "reason", "")
726 if reason:
727 status_message = reason
728 else:
729 exception = "(unknown)"
731 # build template namespace
732 ns = {
733 "status_code": status_code,
734 "status_message": status_message,
735 "message": message,
736 "exception": exception,
737 }
739 self.set_header("Content-Type", "text/html")
740 # render the template
741 try:
742 html = self.render_template("%s.html" % status_code, **ns)
743 except TemplateNotFound:
744 html = self.render_template("error.html", **ns)
746 self.write(html)
749class APIHandler(JupyterHandler):
750 """Base class for API handlers"""
752 async def prepare(self) -> None: # type:ignore[override]
753 """Prepare an API response."""
754 await super().prepare()
755 if not self.check_origin():
756 raise web.HTTPError(404)
758 def write_error(self, status_code: int, **kwargs: Any) -> None:
759 """APIHandler errors are JSON, not human pages"""
760 self.set_header("Content-Type", "application/json")
761 message = responses.get(status_code, "Unknown HTTP Error")
762 reply: dict[str, Any] = {
763 "message": message,
764 }
765 exc_info = kwargs.get("exc_info")
766 if exc_info:
767 e = exc_info[1]
768 if isinstance(e, HTTPError):
769 reply["message"] = e.log_message or message
770 reply["reason"] = e.reason
771 else:
772 reply["message"] = "Unhandled error"
773 reply["reason"] = None
774 # backward-compatibility: traceback field is present,
775 # but always empty
776 reply["traceback"] = ""
777 self.log.warning("wrote error: %r", reply["message"], exc_info=True)
778 self.finish(json.dumps(reply))
780 def get_login_url(self) -> str:
781 """Get the login url."""
782 # if get_login_url is invoked in an API handler,
783 # that means @web.authenticated is trying to trigger a redirect.
784 # instead of redirecting, raise 403 instead.
785 if not self.current_user:
786 raise web.HTTPError(403)
787 return super().get_login_url()
789 @property
790 def content_security_policy(self) -> str:
791 csp = "; ".join( # noqa: FLY002
792 [
793 super().content_security_policy,
794 "default-src 'none'",
795 ]
796 )
797 return csp
799 # set _track_activity = False on API handlers that shouldn't track activity
800 _track_activity = True
802 def update_api_activity(self) -> None:
803 """Update last_activity of API requests"""
804 # record activity of authenticated requests
805 if (
806 self._track_activity
807 and getattr(self, "_jupyter_current_user", None)
808 and self.get_argument("no_track_activity", None) is None
809 ):
810 self.settings["api_last_activity"] = utcnow()
812 def finish(self, *args: Any, **kwargs: Any) -> Future[Any]:
813 """Finish an API response."""
814 self.update_api_activity()
815 # Allow caller to indicate content-type...
816 set_content_type = kwargs.pop("set_content_type", "application/json")
817 self.set_header("Content-Type", set_content_type)
818 return super().finish(*args, **kwargs)
820 @allow_unauthenticated
821 def options(self, *args: Any, **kwargs: Any) -> None:
822 """Get the options."""
823 if "Access-Control-Allow-Headers" in self.settings.get("headers", {}):
824 self.set_header(
825 "Access-Control-Allow-Headers",
826 self.settings["headers"]["Access-Control-Allow-Headers"],
827 )
828 else:
829 self.set_header(
830 "Access-Control-Allow-Headers",
831 "accept, content-type, authorization, x-xsrftoken",
832 )
833 self.set_header("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE, OPTIONS")
835 # if authorization header is requested,
836 # that means the request is token-authenticated.
837 # avoid browser-side rejection of the preflight request.
838 # only allow this exception if allow_origin has not been specified
839 # and Jupyter server authentication is enabled.
840 # If the token is not valid, the 'real' request will still be rejected.
841 requested_headers = self.request.headers.get("Access-Control-Request-Headers", "").split(
842 ","
843 )
844 if (
845 requested_headers
846 and any(h.strip().lower() == "authorization" for h in requested_headers)
847 and (
848 # FIXME: it would be even better to check specifically for token-auth,
849 # but there is currently no API for this.
850 self.login_available
851 )
852 and (
853 self.allow_origin
854 or self.allow_origin_pat
855 or "Access-Control-Allow-Origin" in self.settings.get("headers", {})
856 )
857 ):
858 self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))
861class Template404(JupyterHandler):
862 """Render our 404 template"""
864 async def prepare(self) -> None: # type:ignore[override]
865 """Prepare a 404 response."""
866 await super().prepare()
867 raise web.HTTPError(404)
870class AuthenticatedFileHandler(JupyterHandler, web.StaticFileHandler):
871 """static files should only be accessible when logged in"""
873 auth_resource = "contents"
875 @property
876 def content_security_policy(self) -> str:
877 # In case we're serving HTML/SVG, confine any Javascript to a unique
878 # origin so it can't interact with the Jupyter server.
879 return super().content_security_policy + "; sandbox allow-scripts"
881 @web.authenticated
882 @authorized
883 def head(self, path: str) -> Awaitable[None]: # type:ignore[override]
884 """Get the head response for a path."""
885 self.check_xsrf_cookie()
886 return super().head(path)
888 @web.authenticated
889 @authorized
890 def get( # type:ignore[override]
891 self, path: str, **kwargs: Any
892 ) -> Awaitable[None]:
893 """Get a file by path."""
894 self.check_xsrf_cookie()
895 if os.path.splitext(path)[1] == ".ipynb" or self.get_argument("download", None):
896 name = path.rsplit("/", 1)[-1]
897 self.set_attachment_header(name)
899 return web.StaticFileHandler.get(self, path, **kwargs)
901 def get_content_type(self) -> str:
902 """Get the content type."""
903 assert self.absolute_path is not None
904 path = self.absolute_path.strip("/")
905 if "/" in path:
906 _, name = path.rsplit("/", 1)
907 else:
908 name = path
909 if name.endswith(".ipynb"):
910 return "application/x-ipynb+json"
911 else:
912 cur_mime = mimetypes.guess_type(name)[0]
913 if cur_mime == "text/plain":
914 return "text/plain; charset=UTF-8"
915 else:
916 return super().get_content_type()
918 def set_headers(self) -> None:
919 """Set the headers."""
920 super().set_headers()
921 # disable browser caching, rely on 304 replies for savings
922 if "v" not in self.request.arguments:
923 self.add_header("Cache-Control", "no-cache")
925 def compute_etag(self) -> str | None:
926 """Compute the etag."""
927 return None
929 def validate_absolute_path(self, root: str, absolute_path: str) -> str:
930 """Validate and return the absolute path.
932 Requires tornado 3.1
934 Adding to tornado's own handling, forbids the serving of hidden files.
935 """
936 abs_path = super().validate_absolute_path(root, absolute_path)
937 abs_root = os.path.abspath(root)
938 assert abs_path is not None
939 if not self.contents_manager.allow_hidden and is_hidden(abs_path, abs_root):
940 self.log.info(
941 "Refusing to serve hidden file, via 404 Error, use flag 'ContentsManager.allow_hidden' to enable"
942 )
943 raise web.HTTPError(404)
944 return abs_path
947def json_errors(method: Any) -> Any: # pragma: no cover
948 """Decorate methods with this to return GitHub style JSON errors.
950 This should be used on any JSON API on any handler method that can raise HTTPErrors.
952 This will grab the latest HTTPError exception using sys.exc_info
953 and then:
955 1. Set the HTTP status code based on the HTTPError
956 2. Create and return a JSON body with a message field describing
957 the error in a human readable form.
958 """
959 warnings.warn(
960 "@json_errors is deprecated in notebook 5.2.0. Subclass APIHandler instead.",
961 DeprecationWarning,
962 stacklevel=2,
963 )
965 @functools.wraps(method)
966 def wrapper(self, *args, **kwargs):
967 self.write_error = types.MethodType(APIHandler.write_error, self)
968 return method(self, *args, **kwargs)
970 return wrapper
973# -----------------------------------------------------------------------------
974# File handler
975# -----------------------------------------------------------------------------
977# to minimize subclass changes:
978HTTPError = web.HTTPError
981class FileFindHandler(JupyterHandler, web.StaticFileHandler):
982 """subclass of StaticFileHandler for serving files from a search path
984 The setting "static_immutable_cache" can be set up to serve some static
985 file as immutable (e.g. file name containing a hash). The setting is a
986 list of base URL, every static file URL starting with one of those will
987 be immutable.
988 """
990 # cache search results, don't search for files more than once
991 _static_paths: dict[str, str] = {}
992 root: tuple[str] # type:ignore[assignment]
994 def set_headers(self) -> None:
995 """Set the headers."""
996 super().set_headers()
998 immutable_paths = self.settings.get("static_immutable_cache", [])
1000 # allow immutable cache for files
1001 if any(self.request.path.startswith(path) for path in immutable_paths):
1002 self.set_header("Cache-Control", "public, max-age=31536000, immutable")
1004 # disable browser caching, rely on 304 replies for savings
1005 elif "v" not in self.request.arguments or any(
1006 self.request.path.startswith(path) for path in self.no_cache_paths
1007 ):
1008 self.set_header("Cache-Control", "no-cache")
1010 def initialize(
1011 self,
1012 path: str | list[str],
1013 default_filename: str | None = None,
1014 no_cache_paths: list[str] | None = None,
1015 ) -> None:
1016 """Initialize the file find handler."""
1017 self.no_cache_paths = no_cache_paths or []
1019 if isinstance(path, str):
1020 path = [path]
1022 self.root = tuple(os.path.abspath(os.path.expanduser(p)) + os.sep for p in path) # type:ignore[assignment]
1023 self.default_filename = default_filename
1025 def compute_etag(self) -> str | None:
1026 """Compute the etag."""
1027 return None
1029 # access is allowed as this class is used to serve static assets on login page
1030 # TODO: create an allow-list of files used on login page and remove this decorator
1031 @allow_unauthenticated
1032 def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]:
1033 return super().get(path, include_body)
1035 # access is allowed as this class is used to serve static assets on login page
1036 # TODO: create an allow-list of files used on login page and remove this decorator
1037 @allow_unauthenticated
1038 def head(self, path: str) -> Awaitable[None]:
1039 return super().head(path)
1041 @classmethod
1042 def get_absolute_path(cls, roots: Sequence[str], path: str) -> str:
1043 """locate a file to serve on our static file search path"""
1044 with cls._lock:
1045 if path in cls._static_paths:
1046 return cls._static_paths[path]
1047 try:
1048 abspath = os.path.abspath(filefind(path, roots))
1049 except OSError:
1050 # IOError means not found
1051 return ""
1053 cls._static_paths[path] = abspath
1055 log().debug(f"Path {path} served from {abspath}")
1056 return abspath
1058 def validate_absolute_path(self, root: str, absolute_path: str) -> str | None:
1059 """check if the file should be served (raises 404, 403, etc.)"""
1060 if not absolute_path:
1061 raise web.HTTPError(404)
1063 for root in self.root:
1064 if (absolute_path + os.sep).startswith(root):
1065 break
1067 return super().validate_absolute_path(root, absolute_path)
1070class APIVersionHandler(APIHandler):
1071 """An API handler for the server version."""
1073 _track_activity = False
1075 @allow_unauthenticated
1076 def get(self) -> None:
1077 """Get the server version info."""
1078 # not authenticated, so give as few info as possible
1079 self.finish(json.dumps({"version": jupyter_server.__version__}))
1082class TrailingSlashHandler(web.RequestHandler):
1083 """Simple redirect handler that strips trailing slashes
1085 This should be the first, highest priority handler.
1086 """
1088 @allow_unauthenticated
1089 def get(self) -> None:
1090 """Handle trailing slashes in a get."""
1091 assert self.request.uri is not None
1092 path, *rest = self.request.uri.partition("?")
1093 # trim trailing *and* leading /
1094 # to avoid misinterpreting repeated '//'
1095 path = "/" + path.strip("/")
1096 new_uri = "".join([path, *rest])
1097 self.redirect(new_uri)
1099 post = put = get
1102class MainHandler(JupyterHandler):
1103 """Simple handler for base_url."""
1105 @allow_unauthenticated
1106 def get(self) -> None:
1107 """Get the main template."""
1108 html = self.render_template("main.html")
1109 self.write(html)
1111 post = put = get
1114class FilesRedirectHandler(JupyterHandler):
1115 """Handler for redirecting relative URLs to the /files/ handler"""
1117 @staticmethod
1118 async def redirect_to_files(self: Any, path: str) -> None:
1119 """make redirect logic a reusable static method
1121 so it can be called from other handlers.
1122 """
1123 cm = self.contents_manager
1124 if await ensure_async(cm.dir_exists(path)):
1125 # it's a *directory*, redirect to /tree
1126 url = url_path_join(self.base_url, "tree", url_escape(path))
1127 else:
1128 orig_path = path
1129 # otherwise, redirect to /files
1130 parts = path.split("/")
1132 if not await ensure_async(cm.file_exists(path=path)) and "files" in parts:
1133 # redirect without files/ iff it would 404
1134 # this preserves pre-2.0-style 'files/' links
1135 self.log.warning("Deprecated files/ URL: %s", orig_path)
1136 parts.remove("files")
1137 path = "/".join(parts)
1139 if not await ensure_async(cm.file_exists(path=path)):
1140 raise web.HTTPError(404)
1142 url = url_path_join(self.base_url, "files", url_escape(path))
1143 self.log.debug("Redirecting %s to %s", self.request.path, url)
1144 self.redirect(url)
1146 @allow_unauthenticated
1147 async def get(self, path: str = "") -> None:
1148 return await self.redirect_to_files(self, path)
1151class RedirectWithParams(web.RequestHandler):
1152 """Same as web.RedirectHandler, but preserves URL parameters"""
1154 def initialize(self, url: str, permanent: bool = True) -> None:
1155 """Initialize a redirect handler."""
1156 self._url = url
1157 self._permanent = permanent
1159 @allow_unauthenticated
1160 def get(self) -> None:
1161 """Get a redirect."""
1162 sep = "&" if "?" in self._url else "?"
1163 url = sep.join([self._url, self.request.query])
1164 self.redirect(url, permanent=self._permanent)
1167class PrometheusMetricsHandler(JupyterHandler):
1168 """
1169 Return prometheus metrics for this server
1170 """
1172 @allow_unauthenticated
1173 def get(self) -> None:
1174 """Get prometheus metrics."""
1175 if self.settings["authenticate_prometheus"] and not self.logged_in:
1176 raise web.HTTPError(403)
1178 self.set_header("Content-Type", prometheus_client.CONTENT_TYPE_LATEST)
1179 self.write(prometheus_client.generate_latest(prometheus_client.REGISTRY))
1182class PublicStaticFileHandler(web.StaticFileHandler):
1183 """Same as web.StaticFileHandler, but decorated to acknowledge that auth is not required."""
1185 @allow_unauthenticated
1186 def head(self, path: str) -> Awaitable[None]:
1187 return super().head(path)
1189 @allow_unauthenticated
1190 def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]:
1191 return super().get(path, include_body)
1194# -----------------------------------------------------------------------------
1195# URL pattern fragments for reuse
1196# -----------------------------------------------------------------------------
1198# path matches any number of `/foo[/bar...]` or just `/` or ''
1199path_regex = r"(?P<path>(?:(?:/[^/]+)+|/?))"
1201# -----------------------------------------------------------------------------
1202# URL to handler mappings
1203# -----------------------------------------------------------------------------
1206default_handlers = [
1207 (r".*/", TrailingSlashHandler),
1208 (r"api", APIVersionHandler),
1209 (r"/(robots\.txt|favicon\.ico)", PublicStaticFileHandler),
1210 (r"/metrics", PrometheusMetricsHandler),
1211]