Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/base/handlers.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Base Tornado handlers for the Jupyter server."""
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
7import functools
8import inspect
9import ipaddress
10import json
11import mimetypes
12import os
13import re
14import types
15import warnings
16from collections.abc import Awaitable, Coroutine, Sequence
17from http.client import responses
18from typing import TYPE_CHECKING, Any, cast
19from urllib.parse import urlparse
21import prometheus_client
22from jinja2 import TemplateNotFound
23from jupyter_core.paths import is_hidden
24from tornado import web
25from tornado.log import app_log
26from traitlets.config import Application
28import jupyter_server
29from jupyter_server import CallContext
30from jupyter_server._sysinfo import get_sys_info
31from jupyter_server._tz import utcnow
32from jupyter_server.auth.decorator import allow_unauthenticated, authorized
33from jupyter_server.auth.identity import User
34from jupyter_server.i18n import combine_translations
35from jupyter_server.services.security import csp_report_uri
36from jupyter_server.utils import (
37 ensure_async,
38 filefind,
39 url_escape,
40 url_is_absolute,
41 url_path_join,
42 urldecode_unix_socket_path,
43)
45if TYPE_CHECKING:
46 from logging import Logger
48 from jupyter_client.kernelspec import KernelSpecManager
49 from jupyter_events import EventLogger
50 from jupyter_server_terminals.terminalmanager import TerminalManager
51 from tornado.concurrent import Future
53 from jupyter_server.auth.authorizer import Authorizer
54 from jupyter_server.auth.identity import IdentityProvider
55 from jupyter_server.serverapp import ServerApp
56 from jupyter_server.services.config.manager import ConfigManager
57 from jupyter_server.services.contents.manager import ContentsManager
58 from jupyter_server.services.kernels.kernelmanager import AsyncMappingKernelManager
59 from jupyter_server.services.sessions.sessionmanager import SessionManager
61# -----------------------------------------------------------------------------
62# Top-level handlers
63# -----------------------------------------------------------------------------
_sys_info_cache = None


def json_sys_info():
    """Return the system information serialized as a JSON string.

    The string is built from ``get_sys_info()`` on first use and then
    memoized in the module-level ``_sys_info_cache``.
    """
    global _sys_info_cache  # noqa: PLW0603
    cached = _sys_info_cache
    if cached is None:
        cached = _sys_info_cache = json.dumps(get_sys_info())
    return cached
def log() -> Logger:
    """Return the logger handlers should use.

    The log of the traitlets ``Application`` singleton is used when an
    application has been initialized; otherwise tornado's ``app_log``.
    """
    if not Application.initialized():
        return app_log
    return cast("Logger", Application.instance().log)
class AuthenticatedHandler(web.RequestHandler):
    """A RequestHandler with an authenticated user."""

    @property
    def base_url(self) -> str:
        """The ``base_url`` application setting, ``"/"`` when unset."""
        return cast("str", self.settings.get("base_url", "/"))

    @property
    def content_security_policy(self) -> str:
        """The default Content-Security-Policy header

        Can be overridden by defining Content-Security-Policy in settings['headers']
        """
        if "Content-Security-Policy" in self.settings.get("headers", {}):
            # user-specified, don't override
            return cast("str", self.settings["headers"]["Content-Security-Policy"])

        return "; ".join(
            [
                "frame-ancestors 'self'",
                # Make sure the report-uri is relative to the base_url
                "report-uri "
                + self.settings.get("csp_report_uri", url_path_join(self.base_url, csp_report_uri)),
            ]
        )

    def set_default_headers(self) -> None:
        """Set the default headers.

        Starts from ``X-Content-Type-Options: nosniff``, layers the
        user-configured ``settings['headers']`` on top, and finally sets the
        Content-Security-Policy computed by :attr:`content_security_policy`.
        """
        headers = {}
        headers["X-Content-Type-Options"] = "nosniff"
        headers.update(self.settings.get("headers", {}))

        headers["Content-Security-Policy"] = self.content_security_policy

        # Allow for overriding headers
        for header_name, value in headers.items():
            try:
                self.set_header(header_name, value)
            except Exception as e:
                # tornado raise Exception (not a subclass)
                # if method is unsupported (websocket and Access-Control-Allow-Origin
                # for example, so just ignore)
                self.log.exception(  # type:ignore[attr-defined]
                    "Could not set default headers: %s", e
                )

    @property
    def cookie_name(self) -> str:
        """Deprecated: the login cookie name, as reported by the identity provider."""
        warnings.warn(
            """JupyterHandler.cookie_name is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        return self.identity_provider.get_cookie_name(self)

    def force_clear_cookie(self, name: str, path: str = "/", domain: str | None = None) -> None:
        """Force a cookie clear.

        Deprecated: delegates to the identity provider.
        """
        warnings.warn(
            """JupyterHandler.force_clear_cookie is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        self.identity_provider._force_clear_cookie(self, name, path=path, domain=domain)

    def clear_login_cookie(self) -> None:
        """Clear a login cookie.

        Deprecated: delegates to the identity provider.
        """
        warnings.warn(
            """JupyterHandler.clear_login_cookie is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        self.identity_provider.clear_login_cookie(self)

    def get_current_user(self) -> str:
        """Get the current user.

        Deprecated: returns the user stored in ``_jupyter_current_user`` when
        it has been set, and raises ``RuntimeError`` otherwise.
        """
        clsname = self.__class__.__name__
        msg = (
            f"Calling `{clsname}.get_current_user()` directly is deprecated in jupyter-server 2.0."
            " Use `self.current_user` instead (works in all versions)."
        )
        if hasattr(self, "_jupyter_current_user"):
            # backward-compat: return _jupyter_current_user
            warnings.warn(
                msg,
                DeprecationWarning,
                stacklevel=2,
            )
            return cast("str", self._jupyter_current_user)
        # haven't called get_user in prepare, raise
        raise RuntimeError(msg)

    def skip_check_origin(self) -> bool:
        """Ask my login_handler if I should skip the origin_check

        For example: in the default LoginHandler, if a request is token-authenticated,
        origin checking should be skipped.
        """
        if self.request.method == "OPTIONS":
            # no origin-check on options requests, which are used to check origins!
            return True
        return not self.identity_provider.should_check_origin(self)

    @property
    def token_authenticated(self) -> bool:
        """Have I been authenticated with a token?"""
        return self.identity_provider.is_token_authenticated(self)

    @property
    def logged_in(self) -> bool:
        """Is a user currently logged in?"""
        user = self.current_user
        return bool(user and user != "anonymous")

    @property
    def login_handler(self) -> Any:
        """Return the login handler for this application, if any."""
        warnings.warn(
            """JupyterHandler.login_handler is deprecated in 2.0,
            use JupyterHandler.identity_provider.
            """,
            DeprecationWarning,
            stacklevel=2,
        )
        return self.identity_provider.login_handler_class

    @property
    def token(self) -> str | None:
        """Return the login token for this application, if any."""
        return self.identity_provider.token

    @property
    def login_available(self) -> bool:
        """May a user proceed to log in?

        This returns True if login capability is available, irrespective of
        whether the user is already logged in or not.

        """
        return cast("bool", self.identity_provider.login_available)

    @property
    def authorizer(self) -> Authorizer:
        """The authorizer from the application settings.

        When the settings have no ``'authorizer'`` key, a warning is emitted
        and an ``AllowAllAuthorizer`` is created and stored in the settings.
        """
        if "authorizer" not in self.settings:
            warnings.warn(
                "The Tornado web application does not have an 'authorizer' defined "
                "in its settings. In future releases of jupyter_server, this will "
                "be a required key for all subclasses of `JupyterHandler`. For an "
                "example, see the jupyter_server source code for how to "
                "add an authorizer to the tornado settings: "
                "https://github.com/jupyter-server/jupyter_server/blob/"
                "653740cbad7ce0c8a8752ce83e4d3c2c754b13cb/jupyter_server/serverapp.py"
                "#L234-L256",
                stacklevel=2,
            )
            from jupyter_server.auth import AllowAllAuthorizer

            self.settings["authorizer"] = AllowAllAuthorizer(
                config=self.settings.get("config", None),
                identity_provider=self.identity_provider,
            )

        return cast("Authorizer", self.settings.get("authorizer"))

    @property
    def identity_provider(self) -> IdentityProvider:
        """The identity provider from the application settings.

        When the settings have no ``'identity_provider'`` key, a warning is
        emitted and a default ``IdentityProvider`` is created and stored in
        the settings.
        """
        if "identity_provider" not in self.settings:
            warnings.warn(
                "The Tornado web application does not have an 'identity_provider' defined "
                "in its settings. In future releases of jupyter_server, this will "
                "be a required key for all subclasses of `JupyterHandler`. For an "
                "example, see the jupyter_server source code for how to "
                "add an identity provider to the tornado settings: "
                "https://github.com/jupyter-server/jupyter_server/blob/v2.0.0/"
                "jupyter_server/serverapp.py#L242",
                stacklevel=2,
            )
            from jupyter_server.auth import IdentityProvider

            # no identity provider set, load default
            self.settings["identity_provider"] = IdentityProvider(
                config=self.settings.get("config", None)
            )
        return cast("IdentityProvider", self.settings["identity_provider"])
class JupyterHandler(AuthenticatedHandler):
    """Jupyter-specific extensions to authenticated handling

    Mostly property shortcuts to Jupyter-specific settings.
    """

    @property
    def config(self) -> dict[str, Any] | None:
        """The ``config`` application setting, or None when unset."""
        return cast("dict[str, Any] | None", self.settings.get("config", None))

    @property
    def log(self) -> Logger:
        """use the Jupyter log by default, falling back on tornado's logger"""
        return log()

    @property
    def jinja_template_vars(self) -> dict[str, Any]:
        """User-supplied values to supply to jinja templates."""
        return cast("dict[str, Any]", self.settings.get("jinja_template_vars", {}))

    @property
    def serverapp(self) -> ServerApp | None:
        """The ``serverapp`` application setting (the key must be present)."""
        return cast("ServerApp | None", self.settings["serverapp"])

    # ---------------------------------------------------------------
    # URLs
    # ---------------------------------------------------------------

    @property
    def version_hash(self) -> str:
        """The version hash to use for cache hints for static files"""
        return cast("str", self.settings.get("version_hash", ""))

    @property
    def mathjax_url(self) -> str:
        """The MathJax URL; relative values are joined onto ``base_url``."""
        url = cast("str", self.settings.get("mathjax_url", ""))
        if not url or url_is_absolute(url):
            return url
        return url_path_join(self.base_url, url)

    @property
    def mathjax_config(self) -> str:
        """The ``mathjax_config`` application setting."""
        return cast("str", self.settings.get("mathjax_config", "TeX-AMS-MML_HTMLorMML-full,Safe"))

    @property
    def default_url(self) -> str:
        """The ``default_url`` application setting, ``""`` when unset."""
        return cast("str", self.settings.get("default_url", ""))

    @property
    def ws_url(self) -> str:
        """The ``websocket_url`` application setting, ``""`` when unset."""
        return cast("str", self.settings.get("websocket_url", ""))

    @property
    def contents_js_source(self) -> str:
        """The ``contents_js_source`` setting (logged at debug level on access)."""
        source = self.settings.get("contents_js_source", "services/contents")
        self.log.debug("Using contents: %s", source)
        return cast("str", source)

    # ---------------------------------------------------------------
    # Manager objects
    # ---------------------------------------------------------------

    @property
    def kernel_manager(self) -> AsyncMappingKernelManager:
        return cast("AsyncMappingKernelManager", self.settings["kernel_manager"])

    @property
    def contents_manager(self) -> ContentsManager:
        return cast("ContentsManager", self.settings["contents_manager"])

    @property
    def session_manager(self) -> SessionManager:
        return cast("SessionManager", self.settings["session_manager"])

    @property
    def terminal_manager(self) -> TerminalManager:
        return cast("TerminalManager", self.settings["terminal_manager"])

    @property
    def kernel_spec_manager(self) -> KernelSpecManager:
        return cast("KernelSpecManager", self.settings["kernel_spec_manager"])

    @property
    def config_manager(self) -> ConfigManager:
        return cast("ConfigManager", self.settings["config_manager"])

    @property
    def event_logger(self) -> EventLogger:
        return cast("EventLogger", self.settings["event_logger"])

    # ---------------------------------------------------------------
    # CORS
    # ---------------------------------------------------------------

    @property
    def allow_origin(self) -> str:
        """Normal Access-Control-Allow-Origin"""
        return cast("str", self.settings.get("allow_origin", ""))

    @property
    def allow_origin_pat(self) -> str | None:
        """Regular expression version of allow_origin"""
        return cast("str | None", self.settings.get("allow_origin_pat", None))

    @property
    def allow_credentials(self) -> bool:
        """Whether to set Access-Control-Allow-Credentials"""
        return cast("bool", self.settings.get("allow_credentials", False))

    def set_default_headers(self) -> None:
        """Set the default headers.

        Only delegates to the parent class. CORS headers are not set here;
        they are added by :meth:`set_cors_headers`, which :meth:`prepare`
        calls once the current user has been resolved.
        """
        super().set_default_headers()

    def set_cors_headers(self) -> None:
        """Add CORS headers, if defined

        Now that current_user is async (jupyter-server 2.0),
        must be called at the end of prepare(), instead of in set_default_headers.
        """
        if self.allow_origin:
            self.set_header("Access-Control-Allow-Origin", self.allow_origin)
        elif self.allow_origin_pat:
            origin = self.get_origin()
            if origin and re.match(self.allow_origin_pat, origin):
                self.set_header("Access-Control-Allow-Origin", origin)
        elif self.token_authenticated and "Access-Control-Allow-Origin" not in self.settings.get(
            "headers", {}
        ):
            # allow token-authenticated requests cross-origin by default.
            # only apply this exception if allow-origin has not been specified.
            self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))

        if self.allow_credentials:
            self.set_header("Access-Control-Allow-Credentials", "true")

    def set_attachment_header(self, filename: str) -> None:
        """Set Content-Disposition: attachment header

        As a method to ensure handling of filename encoding
        """
        escaped_filename = url_escape(filename)
        self.set_header(
            "Content-Disposition",
            f"attachment; filename*=utf-8''{escaped_filename}",
        )

    def get_origin(self) -> str | None:
        """Return the request's origin header value, or None when absent."""
        # Handle WebSocket Origin naming convention differences
        # The difference between version 8 and 13 is that in 8 the
        # client sends a "Sec-Websocket-Origin" header and in 13 it's
        # simply "Origin".
        if "Origin" in self.request.headers:
            origin = self.request.headers.get("Origin")
        else:
            origin = self.request.headers.get("Sec-Websocket-Origin", None)
        return origin

    # origin_to_satisfy_tornado is present because tornado requires
    # check_origin to take an origin argument, but we don't use it
    def check_origin(self, origin_to_satisfy_tornado: str = "") -> bool:
        """Check Origin for cross-site API requests, including websockets

        Copied from WebSocket with changes:

        - allow unspecified host/origin (e.g. scripts)
        - allow token-authenticated requests
        """
        if self.allow_origin == "*" or self.skip_check_origin():
            return True

        host = self.request.headers.get("Host")
        origin = self.request.headers.get("Origin")

        # If no header is provided, let the request through.
        # Origin can be None for:
        # - same-origin (IE, Firefox)
        # - Cross-site POST form (IE, Firefox)
        # - Scripts
        # The cross-site POST (XSRF) case is handled by tornado's xsrf_token
        if origin is None or host is None:
            return True

        origin = origin.lower()
        origin_host = urlparse(origin).netloc

        # OK if origin matches host
        if origin_host == host:
            return True

        # Check CORS headers
        if self.allow_origin:
            allow = bool(self.allow_origin == origin)
        elif self.allow_origin_pat:
            allow = bool(re.match(self.allow_origin_pat, origin))
        else:
            # No CORS headers deny the request
            allow = False
        if not allow:
            self.log.warning(
                "Blocking Cross Origin API request for %s.  Origin: %s, Host: %s",
                self.request.path,
                origin,
                host,
            )
        return allow

    def check_referer(self) -> bool:
        """Check Referer for cross-site requests.
        Disables requests to certain endpoints with
        external or missing Referer.
        If set, allow_origin settings are applied to the Referer
        to whitelist specific cross-origin sites.
        Used on GET for api endpoints and /files/
        to block cross-site inclusion (XSSI).
        """
        if self.allow_origin == "*" or self.skip_check_origin():
            return True

        host = self.request.headers.get("Host")
        referer = self.request.headers.get("Referer")

        if not host:
            self.log.warning("Blocking request with no host")
            return False
        if not referer:
            self.log.warning("Blocking request with no referer")
            return False

        referer_url = urlparse(referer)
        referer_host = referer_url.netloc
        if referer_host == host:
            return True

        # apply cross-origin checks to Referer:
        origin = f"{referer_url.scheme}://{referer_url.netloc}"
        if self.allow_origin:
            allow = self.allow_origin == origin
        elif self.allow_origin_pat:
            allow = bool(re.match(self.allow_origin_pat, origin))
        else:
            # No CORS settings, deny the request
            allow = False

        if not allow:
            self.log.warning(
                "Blocking Cross Origin request for %s.  Referer: %s, Host: %s",
                self.request.path,
                origin,
                host,
            )
        return allow

    def check_xsrf_cookie(self) -> None:
        """Bypass xsrf cookie checks when token-authenticated"""
        if not hasattr(self, "_jupyter_current_user"):
            # Called too early, will be checked later
            return None
        if self.token_authenticated or self.settings.get("disable_check_xsrf", False):
            # Token-authenticated requests do not need additional XSRF-check
            # Servers without authentication are vulnerable to XSRF
            return None
        try:
            if not self.check_origin():
                raise web.HTTPError(404)
            return super().check_xsrf_cookie()
        except web.HTTPError as e:
            if self.request.method in {"GET", "HEAD"}:
                # Consider Referer a sufficient cross-origin check for GET requests
                if not self.check_referer():
                    referer = self.request.headers.get("Referer")
                    if referer:
                        msg = f"Blocking Cross Origin request from {referer}."
                    else:
                        msg = "Blocking request from unknown origin"
                    raise web.HTTPError(403, msg) from e
            else:
                raise

    def check_host(self) -> bool:
        """Check the host header if remote access disallowed.

        Returns True if the request should continue, False otherwise.
        """
        if self.settings.get("allow_remote_access", False):
            return True

        # Remove port (e.g. ':8888') from host
        match = re.match(r"^(.*?)(:\d+)?$", self.request.host)
        assert match is not None
        host = match.group(1)

        # Browsers format IPv6 addresses like [::1]; we need to remove the []
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]

        # UNIX socket handling
        check_host = urldecode_unix_socket_path(host)
        if check_host.startswith("/") and os.path.exists(check_host):
            allow = True
        else:
            try:
                addr = ipaddress.ip_address(host)
            except ValueError:
                # Not an IP address: check against hostnames
                allow = host in self.settings.get("local_hostnames", ["localhost"])
            else:
                allow = addr.is_loopback

        if not allow:
            self.log.warning(
                (
                    "Blocking request with non-local 'Host' %s (%s). "
                    "If the server should be accessible at that name, "
                    "set ServerApp.allow_remote_access to disable the check."
                ),
                host,
                self.request.host,
            )
        return allow

    async def prepare(self, *, _redirect_to_login=True) -> Awaitable[None] | None:  # type:ignore[override]
        """Prepare a response.

        Checks the Host header, resolves the current user, sets CORS headers,
        runs the XSRF check for methods other than GET/HEAD/OPTIONS, and
        enforces authentication unless ``allow_unauthenticated_access`` is set
        or the target method is marked ``__allow_unauthenticated``.

        Raises ``web.HTTPError(403)`` when the host check fails, when the
        request has no method, or (with ``_redirect_to_login=False``) when no
        user could be determined.
        """
        # Set the current Jupyter Handler context variable.
        CallContext.set(CallContext.JUPYTER_HANDLER, self)

        if not self.check_host():
            self.current_user = self._jupyter_current_user = None
            raise web.HTTPError(403)

        from jupyter_server.auth import IdentityProvider

        mod_obj = inspect.getmodule(self.get_current_user)
        assert mod_obj is not None
        user: User | None = None

        if type(self.identity_provider) is IdentityProvider and mod_obj.__name__ != __name__:
            # check for overridden get_current_user + default IdentityProvider
            # deprecated way to override auth (e.g. JupyterHub < 3.0)
            # allow deprecated, overridden get_current_user
            warnings.warn(
                "Overriding JupyterHandler.get_current_user is deprecated in jupyter-server 2.0."
                " Use an IdentityProvider class.",
                DeprecationWarning,
                stacklevel=1,
            )
            user = User(self.get_current_user())
        else:
            _user = self.identity_provider.get_user(self)
            if isinstance(_user, Awaitable):
                # IdentityProvider.get_user _may_ be async
                _user = await _user
            user = _user

        # self.current_user for tornado's @web.authenticated
        # self._jupyter_current_user for backward-compat in deprecated get_current_user calls
        # and our own private checks for whether .current_user has been set
        self.current_user = self._jupyter_current_user = user
        # complete initial steps which require auth to resolve first:
        self.set_cors_headers()
        if self.request.method not in {"GET", "HEAD", "OPTIONS"}:
            self.check_xsrf_cookie()

        if not self.settings.get("allow_unauthenticated_access", False):
            if not self.request.method:
                raise web.HTTPError(403)
            method = getattr(self, self.request.method.lower())
            if not getattr(method, "__allow_unauthenticated", False):
                if _redirect_to_login:
                    # reuse `web.authenticated` logic, which redirects to the login
                    # page on GET and HEAD and otherwise raises 403
                    return web.authenticated(lambda _: super().prepare())(self)
                else:
                    # raise 403 if user is not known without redirecting to login page
                    user = self.current_user
                    if user is None:
                        self.log.warning(
                            "Couldn't authenticate %s connection", self.__class__.__name__
                        )
                        raise web.HTTPError(403)

        return super().prepare()

    # ---------------------------------------------------------------
    # template rendering
    # ---------------------------------------------------------------

    def get_template(self, name):
        """Return the jinja template object for a given name"""
        return self.settings["jinja2_env"].get_template(name)

    def render_template(self, name, **ns):
        """Render a template by name.

        Values from :attr:`template_namespace` override same-named keyword
        arguments in ``ns``.
        """
        ns.update(self.template_namespace)
        template = self.get_template(name)
        return template.render(**ns)

    @property
    def template_namespace(self) -> dict[str, Any]:
        """The variables made available to every rendered template."""
        return dict(
            base_url=self.base_url,
            default_url=self.default_url,
            ws_url=self.ws_url,
            logged_in=self.logged_in,
            allow_password_change=getattr(self.identity_provider, "allow_password_change", False),
            auth_enabled=self.identity_provider.auth_enabled,
            login_available=self.identity_provider.login_available,
            token_available=bool(self.token),
            static_url=self.static_url,
            sys_info=json_sys_info(),
            contents_js_source=self.contents_js_source,
            version_hash=self.version_hash,
            xsrf_form_html=self.xsrf_form_html,
            token=self.token,
            xsrf_token=self.xsrf_token.decode("utf8"),
            nbjs_translations=json.dumps(
                combine_translations(self.request.headers.get("Accept-Language", ""))
            ),
            **self.jinja_template_vars,
        )

    def get_json_body(self) -> dict[str, Any] | None:
        """Return the body of the request as JSON data.

        Returns None when the request has no body.

        Raises ``web.HTTPError(400)`` when the body is not valid UTF-8 or not
        valid JSON.
        """
        if not self.request.body:
            return None
        raw_body = self.request.body.strip()
        try:
            # Decode inside the try block so that a body that is not valid
            # UTF-8 is reported as a 400 like any other malformed payload,
            # rather than escaping as an unhandled UnicodeDecodeError.
            model = json.loads(raw_body.decode("utf-8"))
        except Exception as e:
            self.log.debug("Bad JSON: %r", raw_body)
            self.log.error("Couldn't parse JSON", exc_info=True)
            raise web.HTTPError(400, "Invalid JSON in body of request") from e
        return cast("dict[str, Any]", model)

    def write_error(self, status_code: int, **kwargs: Any) -> None:
        """render custom error pages"""
        exc_info = kwargs.get("exc_info")
        message = ""
        status_message = responses.get(status_code, "Unknown HTTP Error")

        if exc_info:
            exception = exc_info[1]
            # get the custom message, if defined
            try:
                message = exception.log_message % exception.args
            except Exception:
                # best effort: exceptions without a usable log_message
                # simply render with an empty message
                pass

            # construct the custom reason, if defined
            reason = getattr(exception, "reason", "")
            if reason:
                status_message = reason
        else:
            exception = "(unknown)"

        # build template namespace
        ns = {
            "status_code": status_code,
            "status_message": status_message,
            "message": message,
            "exception": exception,
        }

        self.set_header("Content-Type", "text/html")
        # render the template, preferring a status-specific page
        try:
            html = self.render_template(f"{status_code}.html", **ns)
        except TemplateNotFound:
            html = self.render_template("error.html", **ns)

        self.write(html)
class APIHandler(JupyterHandler):
    """Base class for API handlers"""

    # set _track_activity = False on API handlers that shouldn't track activity
    _track_activity = True

    async def prepare(self) -> None:  # type:ignore[override]
        """Run the base preparation, then reject failed origin checks with a 404."""
        await super().prepare()
        if self.check_origin():
            return
        raise web.HTTPError(404)

    def write_error(self, status_code: int, **kwargs: Any) -> None:
        """APIHandler errors are JSON, not human pages"""
        self.set_header("Content-Type", "application/json")
        default_message = responses.get(status_code, "Unknown HTTP Error")
        reply: dict[str, Any] = {"message": default_message}

        exc_info = kwargs.get("exc_info")
        if exc_info:
            error = exc_info[1]
            if isinstance(error, HTTPError):
                reply["message"] = error.log_message or default_message
                reply["reason"] = error.reason
            else:
                reply["message"] = "Unhandled error"
                reply["reason"] = None
            # backward-compatibility: traceback field is present,
            # but always empty
            reply["traceback"] = ""

        self.log.warning("wrote error: %r", reply["message"])
        self.finish(json.dumps(reply))

    def get_login_url(self) -> str:
        """Get the login url."""
        # if get_login_url is invoked in an API handler,
        # that means @web.authenticated is trying to trigger a redirect.
        # instead of redirecting, raise 403 instead.
        if self.current_user:
            return super().get_login_url()
        raise web.HTTPError(403)

    @property
    def content_security_policy(self) -> str:
        """The parent policy with ``default-src 'none'`` appended."""
        return "; ".join(  # noqa: FLY002
            (
                super().content_security_policy,
                "default-src 'none'",
            )
        )

    def update_api_activity(self) -> None:
        """Update last_activity of API requests"""
        # record activity of authenticated requests
        if not self._track_activity:
            return
        if not getattr(self, "_jupyter_current_user", None):
            return
        if self.get_argument("no_track_activity", None) is not None:
            return
        self.settings["api_last_activity"] = utcnow()

    def finish(self, *args: Any, **kwargs: Any) -> Future[Any]:
        """Finish an API response."""
        self.update_api_activity()
        # Allow caller to indicate content-type...
        content_type = kwargs.pop("set_content_type", "application/json")
        self.set_header("Content-Type", content_type)
        return super().finish(*args, **kwargs)

    @allow_unauthenticated
    def options(self, *args: Any, **kwargs: Any) -> None:
        """Get the options."""
        if "Access-Control-Allow-Headers" in self.settings.get("headers", {}):
            allowed_headers = self.settings["headers"]["Access-Control-Allow-Headers"]
        else:
            allowed_headers = "accept, content-type, authorization, x-xsrftoken"
        self.set_header("Access-Control-Allow-Headers", allowed_headers)
        self.set_header("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE, OPTIONS")

        # if authorization header is requested,
        # that means the request is token-authenticated.
        # avoid browser-side rejection of the preflight request.
        # only allow this exception if allow_origin has not been specified
        # and Jupyter server authentication is enabled.
        # If the token is not valid, the 'real' request will still be rejected.
        # NOTE(review): the condition below requires an allow-origin setting
        # to BE present, which reads as the opposite of the comment above.
        # Behavior is left untouched here; confirm which one is intended.
        requested_headers = self.request.headers.get("Access-Control-Request-Headers", "").split(
            ","
        )
        if not requested_headers:
            return
        wants_authorization = any(
            header.strip().lower() == "authorization" for header in requested_headers
        )
        if not wants_authorization:
            return
        # FIXME: it would be even better to check specifically for token-auth,
        # but there is currently no API for this.
        if not self.login_available:
            return
        if (
            self.allow_origin
            or self.allow_origin_pat
            or "Access-Control-Allow-Origin" in self.settings.get("headers", {})
        ):
            self.set_header("Access-Control-Allow-Origin", self.request.headers.get("Origin", ""))
class Template404(JupyterHandler):
    """Render our 404 template"""

    async def prepare(self) -> None:  # type:ignore[override]
        """Run the base preparation, then always answer with a 404."""
        await super().prepare()
        not_found = web.HTTPError(404)
        raise not_found
class AuthenticatedFileHandler(JupyterHandler, web.StaticFileHandler):
    """static files should only be accessible when logged in"""

    auth_resource = "contents"

    @property
    def content_security_policy(self) -> str:
        """The parent policy plus a ``sandbox allow-scripts`` directive."""
        # In case we're serving HTML/SVG, confine any Javascript to a unique
        # origin so it can't interact with the Jupyter server.
        base_policy = super().content_security_policy
        return base_policy + "; sandbox allow-scripts"

    @web.authenticated
    @authorized
    def head(self, path: str) -> Awaitable[None]:  # type:ignore[override]
        """Get the head response for a path."""
        self.check_xsrf_cookie()
        return super().head(path)

    @web.authenticated
    @authorized
    def get(  # type:ignore[override]
        self, path: str, **kwargs: Any
    ) -> Awaitable[None]:
        """Get a file by path.

        Notebooks, and any request carrying a ``download`` argument, are
        served with an attachment Content-Disposition header.
        """
        self.check_xsrf_cookie()
        _, extension = os.path.splitext(path)
        if extension == ".ipynb" or self.get_argument("download", None):
            self.set_attachment_header(path.rsplit("/", 1)[-1])

        return web.StaticFileHandler.get(self, path, **kwargs)

    def get_content_type(self) -> str:
        """Get the content type."""
        assert self.absolute_path is not None
        # last path component, ignoring leading/trailing slashes
        name = self.absolute_path.strip("/").rsplit("/", 1)[-1]
        if name.endswith(".ipynb"):
            return "application/x-ipynb+json"
        guessed_mime = mimetypes.guess_type(name)[0]
        if guessed_mime == "text/plain":
            return "text/plain; charset=UTF-8"
        return super().get_content_type()

    def set_headers(self) -> None:
        """Set the headers."""
        super().set_headers()
        # disable browser caching, rely on 304 replies for savings
        versioned = "v" in self.request.arguments
        if not versioned:
            self.add_header("Cache-Control", "no-cache")

    def compute_etag(self) -> str | None:
        """Compute the etag (always None for this handler)."""
        return None

    def validate_absolute_path(self, root: str, absolute_path: str) -> str:
        """Validate and return the absolute path.

        Requires tornado 3.1

        Adding to tornado's own handling, forbids the serving of hidden files.
        """
        validated = super().validate_absolute_path(root, absolute_path)
        root_dir = os.path.abspath(root)
        assert validated is not None
        if self.contents_manager.allow_hidden or not is_hidden(validated, root_dir):
            return validated
        self.log.info(
            "Refusing to serve hidden file, via 404 Error, use flag 'ContentsManager.allow_hidden' to enable"
        )
        raise web.HTTPError(404)
def json_errors(method: Any) -> Any:  # pragma: no cover
    """Decorate methods with this to return GitHub style JSON errors.

    Deprecated: emits a DeprecationWarning when applied. The decorated method
    has the handler's ``write_error`` rebound to ``APIHandler.write_error``
    before each call, so that HTTP errors are rendered as a JSON body with a
    human-readable message field instead of an HTML page.
    """
    deprecation = "@json_errors is deprecated in notebook 5.2.0. Subclass APIHandler instead."
    warnings.warn(deprecation, DeprecationWarning, stacklevel=2)

    def wrapper(self, *args, **kwargs):
        # swap in the JSON error renderer on this handler instance
        self.write_error = types.MethodType(APIHandler.write_error, self)
        return method(self, *args, **kwargs)

    return functools.wraps(method)(wrapper)
974# -----------------------------------------------------------------------------
975# File handler
976# -----------------------------------------------------------------------------
# Module-level alias for tornado's ``web.HTTPError``, kept
# to minimize subclass changes:
HTTPError = web.HTTPError
class FileFindHandler(JupyterHandler, web.StaticFileHandler):
    """Static file handler that resolves requests against a search path.

    The setting "static_immutable_cache" can be set up to serve some static
    file as immutable (e.g. file name containing a hash). The setting is a
    list of base URL, every static file URL starting with one of those will
    be immutable.
    """

    # class-level memo of resolved lookups, so a file is searched for only once
    _static_paths: dict[str, str] = {}
    root: tuple[str]  # type:ignore[assignment]

    def set_headers(self) -> None:
        """Set the inherited headers, then choose a ``Cache-Control`` policy."""
        super().set_headers()

        def request_is_under(prefixes: Any) -> bool:
            # True when the request path starts with any of the given prefixes
            return any(self.request.path.startswith(prefix) for prefix in prefixes)

        if request_is_under(self.settings.get("static_immutable_cache", [])):
            # allow immutable cache for files
            self.set_header("Cache-Control", "public, max-age=31536000, immutable")
        elif "v" not in self.request.arguments or request_is_under(self.no_cache_paths):
            # disable browser caching, rely on 304 replies for savings
            self.set_header("Cache-Control", "no-cache")

    def initialize(
        self,
        path: str | list[str],
        default_filename: str | None = None,
        no_cache_paths: list[str] | None = None,
    ) -> None:
        """Record the search roots, default filename and no-cache prefixes.

        ``path`` may be one directory or a list of them; each is user-expanded,
        made absolute and suffixed with ``os.sep`` before being stored in
        ``self.root``.
        """
        self.no_cache_paths = no_cache_paths or []

        search_dirs = [path] if isinstance(path, str) else path
        self.root = tuple(os.path.abspath(os.path.expanduser(d)) + os.sep for d in search_dirs)  # type:ignore[assignment]
        self.default_filename = default_filename

    def compute_etag(self) -> str | None:
        """Return ``None``: no ETag value is computed by this handler."""
        return None

    # access is allowed as this class is used to serve static assets on login page
    # TODO: create an allow-list of files used on login page and remove this decorator
    @allow_unauthenticated
    def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]:
        return super().get(path, include_body)

    # access is allowed as this class is used to serve static assets on login page
    # TODO: create an allow-list of files used on login page and remove this decorator
    @allow_unauthenticated
    def head(self, path: str) -> Awaitable[None]:
        return super().head(path)

    @classmethod
    def get_absolute_path(cls, roots: Sequence[str], path: str) -> str:
        """Locate ``path`` on the static file search path ``roots``.

        Successful lookups are remembered in ``_static_paths``.  An empty
        string is returned (and nothing is cached) when ``filefind`` raises
        ``OSError``.
        """
        with cls._lock:
            try:
                return cls._static_paths[path]
            except KeyError:
                pass  # not resolved yet, fall through to the search

            try:
                abspath = os.path.abspath(filefind(path, roots))
            except OSError:
                # IOError means not found
                return ""

            cls._static_paths[path] = abspath

            log().debug(f"Path {path} served from {abspath}")
            return abspath

    def validate_absolute_path(self, root: str, absolute_path: str) -> str | None:
        """check if the file should be served (raises 404, 403, etc.)"""
        if not absolute_path:
            raise web.HTTPError(404)

        candidate = absolute_path + os.sep
        # Stop at the first search root containing the file.  Without a match
        # ``root`` is left at the last entry of ``self.root`` (or at the
        # argument when there are no roots) and the parent class decides.
        for root in self.root:  # noqa: PLR1704
            if candidate.startswith(root):
                break

        return super().validate_absolute_path(root, absolute_path)
class APIVersionHandler(APIHandler):
    """API handler that reports the server version."""

    _track_activity = False

    @allow_unauthenticated
    def get(self) -> None:
        """Finish the request with a JSON object holding only ``version``."""
        # not authenticated, so give as few info as possible
        version_info = {"version": jupyter_server.__version__}
        self.finish(json.dumps(version_info))
class TrailingSlashHandler(web.RequestHandler):
    """Redirect handler that removes trailing slashes from the request URI.

    This should be the first, highest priority handler.
    """

    @allow_unauthenticated
    def get(self) -> None:
        """Redirect to the same URI with its path stripped of surrounding slashes."""
        assert self.request.uri is not None
        path, question_mark, query = self.request.uri.partition("?")
        # trim trailing *and* leading /
        # to avoid misinterpreting repeated '//'
        normalized = "/" + path.strip("/")
        # partition yields empty strings when there is no "?", so this is a no-op then
        self.redirect(normalized + question_mark + query)

    post = put = get
class MainHandler(JupyterHandler):
    """Handler for base_url that renders the main page."""

    @allow_unauthenticated
    def get(self) -> None:
        """Render the ``main.html`` template and write it to the response."""
        self.write(self.render_template("main.html"))

    # POST and PUT are answered exactly like GET
    post = put = get
class FilesRedirectHandler(JupyterHandler):
    """Handler for redirecting relative URLs to the /files/ handler"""

    @staticmethod
    async def redirect_to_files(self: Any, path: str) -> None:  # noqa: PLW0211
        """Redirect ``path`` to ``tree`` (directories) or ``files`` (files).

        Written as a static method taking the handler explicitly so that
        other handlers can call it as well.  Raises a 404 ``HTTPError`` when
        ``path`` is neither an existing directory nor an existing file.
        """
        contents = self.contents_manager

        if await ensure_async(contents.dir_exists(path)):
            # it's a *directory*, redirect to /tree
            section = "tree"
        else:
            # otherwise, redirect to /files
            section = "files"
            requested = path
            segments = path.split("/")

            missing = not await ensure_async(contents.file_exists(path=path))
            if missing and "files" in segments:
                # redirect without files/ iff it would 404
                # this preserves pre-2.0-style 'files/' links
                self.log.warning("Deprecated files/ URL: %s", requested)
                segments.remove("files")
                path = "/".join(segments)

            if not await ensure_async(contents.file_exists(path=path)):
                raise web.HTTPError(404)

        url = url_path_join(self.base_url, section, url_escape(path))
        self.log.debug("Redirecting %s to %s", self.request.path, url)
        self.redirect(url)

    @allow_unauthenticated
    async def get(self, path: str = "") -> None:
        return await self.redirect_to_files(self, path)
class RedirectWithParams(web.RequestHandler):
    """Same as web.RedirectHandler, but preserves URL parameters"""

    def initialize(self, url: str, permanent: bool = True) -> None:
        """Store the redirect target and whether the redirect is permanent."""
        self._permanent = permanent
        self._url = url

    @allow_unauthenticated
    def get(self) -> None:
        """Redirect to the stored URL with the request's query string appended."""
        target = self._url
        # extend an existing query with "&", otherwise start one with "?"
        joiner = "&" if "?" in target else "?"
        self.redirect(target + joiner + self.request.query, permanent=self._permanent)
class PrometheusMetricsHandler(JupyterHandler):
    """
    Return prometheus metrics for this server
    """

    @allow_unauthenticated
    def get(self) -> None:
        """Write the latest metrics from the default prometheus registry.

        Raises a 403 ``HTTPError`` when the ``authenticate_prometheus``
        setting is truthy and the request is not logged in.
        """
        must_authenticate = self.settings["authenticate_prometheus"]
        if must_authenticate and not self.logged_in:
            raise web.HTTPError(403)

        self.set_header("Content-Type", prometheus_client.CONTENT_TYPE_LATEST)
        metrics = prometheus_client.generate_latest(prometheus_client.REGISTRY)
        self.write(metrics)
class PublicStaticFileHandler(web.StaticFileHandler):
    """Same as web.StaticFileHandler, but decorated to acknowledge that auth is not required."""

    # Both overrides only delegate to the parent class; they exist to carry
    # the ``allow_unauthenticated`` decorator.

    @allow_unauthenticated
    def get(self, path: str, include_body: bool = True) -> Coroutine[Any, Any, None]:
        return super().get(path, include_body)

    @allow_unauthenticated
    def head(self, path: str) -> Awaitable[None]:
        return super().head(path)
1195# -----------------------------------------------------------------------------
1196# URL pattern fragments for reuse
1197# -----------------------------------------------------------------------------
# path matches any number of `/foo[/bar...]` or just `/` or ''
# The matched text is captured in the named group ``path``.
path_regex = r"(?P<path>(?:(?:/[^/]+)+|/?))"
1202# -----------------------------------------------------------------------------
1203# URL to handler mappings
1204# -----------------------------------------------------------------------------
# (URL regex, handler class) pairs for the handlers defined in this module.
# NOTE(review): the "api" pattern has no leading slash, unlike the other
# literal patterns here — confirm that this is intended before changing it.
default_handlers = [
    (r".*/", TrailingSlashHandler),
    (r"api", APIVersionHandler),
    (r"/(robots\.txt|favicon\.ico)", PublicStaticFileHandler),
    (r"/metrics", PrometheusMetricsHandler),
]