Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/auth/identity.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Identity Provider interface
3This defines the _authentication_ layer of Jupyter Server,
4to be used in combination with Authorizer for _authorization_.
6.. versionadded:: 2.0
7"""
9from __future__ import annotations
11import binascii
12import datetime
13import json
14import os
15import re
16import sys
17import typing as t
18import uuid
19from dataclasses import asdict, dataclass
20from http.cookies import Morsel
22from tornado import escape, httputil, web
23from traitlets import Bool, Dict, Type, Unicode, default
24from traitlets.config import LoggingConfigurable
26from jupyter_server.transutils import _i18n
28from .security import passwd_check, set_password
29from .utils import get_anonymous_username
31_non_alphanum = re.compile(r"[^A-Za-z0-9]")
34@dataclass
35class User:
36 """Object representing a User
38 This or a subclass should be returned from IdentityProvider.get_user
39 """
41 username: str # the only truly required field
43 # these fields are filled from username if not specified
44 # name is the 'real' name of the user
45 name: str = ""
46 # display_name is a shorter name for us in UI,
47 # if different from name. e.g. a nickname
48 display_name: str = ""
50 # these fields are left as None if undefined
51 initials: str | None = None
52 avatar_url: str | None = None
53 color: str | None = None
55 # TODO: extension fields?
56 # ext: Dict[str, Dict[str, Any]] = field(default_factory=dict)
58 def __post_init__(self):
59 self.fill_defaults()
61 def fill_defaults(self):
62 """Fill out default fields in the identity model
64 - Ensures all values are defined
65 - Fills out derivative values for name fields fields
66 - Fills out null values for optional fields
67 """
69 # username is the only truly required field
70 if not self.username:
71 msg = f"user.username must not be empty: {self}"
72 raise ValueError(msg)
74 # derive name fields from username -> name -> display name
75 if not self.name:
76 self.name = self.username
77 if not self.display_name:
78 self.display_name = self.name
81def _backward_compat_user(got_user: t.Any) -> User:
82 """Backward-compatibility for LoginHandler.get_user
84 Prior to 2.0, LoginHandler.get_user could return anything truthy.
86 Typically, this was either a simple string username,
87 or a simple dict.
89 Make some effort to allow common patterns to keep working.
90 """
91 if isinstance(got_user, str):
92 return User(username=got_user)
93 elif isinstance(got_user, dict):
94 kwargs = {}
95 if "username" not in got_user and "name" in got_user:
96 kwargs["username"] = got_user["name"]
97 for field in User.__dataclass_fields__:
98 if field in got_user:
99 kwargs[field] = got_user[field]
100 try:
101 return User(**kwargs)
102 except TypeError:
103 msg = f"Unrecognized user: {got_user}"
104 raise ValueError(msg) from None
105 else:
106 msg = f"Unrecognized user: {got_user}"
107 raise ValueError(msg)
110class IdentityProvider(LoggingConfigurable):
111 """
112 Interface for providing identity management and authentication.
114 Two principle methods:
116 - :meth:`~jupyter_server.auth.IdentityProvider.get_user` returns a :class:`~.User` object
117 for successful authentication, or None for no-identity-found.
118 - :meth:`~jupyter_server.auth.IdentityProvider.identity_model` turns a :class:`~jupyter_server.auth.User` into a JSONable dict.
119 The default is to use :py:meth:`dataclasses.asdict`,
120 and usually shouldn't need override.
122 Additional methods can customize authentication.
124 .. versionadded:: 2.0
125 """
127 cookie_name: str | Unicode[str, str | bytes] = Unicode(
128 "",
129 config=True,
130 help=_i18n("Name of the cookie to set for persisting login. Default: username-${Host}."),
131 )
133 cookie_options = Dict(
134 config=True,
135 help=_i18n(
136 "Extra keyword arguments to pass to `set_secure_cookie`."
137 " See tornado's set_secure_cookie docs for details."
138 ),
139 )
141 secure_cookie: bool | Bool[bool | None, bool | int | None] = Bool(
142 None,
143 allow_none=True,
144 config=True,
145 help=_i18n(
146 "Specify whether login cookie should have the `secure` property (HTTPS-only)."
147 "Only needed when protocol-detection gives the wrong answer due to proxies."
148 ),
149 )
151 get_secure_cookie_kwargs = Dict(
152 config=True,
153 help=_i18n(
154 "Extra keyword arguments to pass to `get_secure_cookie`."
155 " See tornado's get_secure_cookie docs for details."
156 ),
157 )
159 token: str | Unicode[str, str | bytes] = Unicode(
160 "<generated>",
161 help=_i18n(
162 """Token used for authenticating first-time connections to the server.
164 The token can be read from the file referenced by JUPYTER_TOKEN_FILE or set directly
165 with the JUPYTER_TOKEN environment variable.
167 When no password is enabled,
168 the default is to generate a new, random token.
170 Setting to an empty string disables authentication altogether, which is NOT RECOMMENDED.
172 Prior to 2.0: configured as ServerApp.token
173 """
174 ),
175 ).tag(config=True)
177 login_handler_class = Type(
178 default_value="jupyter_server.auth.login.LoginFormHandler",
179 klass=web.RequestHandler,
180 config=True,
181 help=_i18n("The login handler class to use, if any."),
182 )
184 logout_handler_class = Type(
185 default_value="jupyter_server.auth.logout.LogoutHandler",
186 klass=web.RequestHandler,
187 config=True,
188 help=_i18n("The logout handler class to use."),
189 )
191 token_generated = False
193 @default("token")
194 def _token_default(self):
195 if os.getenv("JUPYTER_TOKEN"):
196 self.token_generated = False
197 return os.environ["JUPYTER_TOKEN"]
198 if os.getenv("JUPYTER_TOKEN_FILE"):
199 self.token_generated = False
200 with open(os.environ["JUPYTER_TOKEN_FILE"]) as token_file:
201 return token_file.read()
202 if not self.need_token:
203 # no token if password is enabled
204 self.token_generated = False
205 return ""
206 else:
207 self.token_generated = True
208 return binascii.hexlify(os.urandom(24)).decode("ascii")
210 need_token: bool | Bool[bool, t.Union[bool, int]] = Bool(True)
212 def get_user(self, handler: web.RequestHandler) -> User | None | t.Awaitable[User | None]:
213 """Get the authenticated user for a request
215 Must return a :class:`jupyter_server.auth.User`,
216 though it may be a subclass.
218 Return None if the request is not authenticated.
220 _may_ be a coroutine
221 """
222 return self._get_user(handler)
224 # not sure how to have optional-async type signature
225 # on base class with `async def` without splitting it into two methods
227 async def _get_user(self, handler: web.RequestHandler) -> User | None:
228 """Get the user."""
229 if getattr(handler, "_jupyter_current_user", None):
230 # already authenticated
231 return t.cast(User, handler._jupyter_current_user) # type:ignore[attr-defined]
232 _token_user: User | None | t.Awaitable[User | None] = self.get_user_token(handler)
233 if isinstance(_token_user, t.Awaitable):
234 _token_user = await _token_user
235 token_user: User | None = _token_user # need second variable name to collapse type
236 _cookie_user = self.get_user_cookie(handler)
237 if isinstance(_cookie_user, t.Awaitable):
238 _cookie_user = await _cookie_user
239 cookie_user: User | None = _cookie_user
240 # prefer token to cookie if both given,
241 # because token is always explicit
242 user = token_user or cookie_user
244 if user is not None and token_user is not None:
245 # if token-authenticated, persist user_id in cookie
246 # if it hasn't already been stored there
247 if user != cookie_user:
248 self.set_login_cookie(handler, user)
249 # Record that the current request has been authenticated with a token.
250 # Used in is_token_authenticated above.
251 handler._token_authenticated = True # type:ignore[attr-defined]
253 if user is None:
254 # If an invalid cookie was sent, clear it to prevent unnecessary
255 # extra warnings. But don't do this on a request with *no* cookie,
256 # because that can erroneously log you out (see gh-3365)
257 cookie_name = self.get_cookie_name(handler)
258 cookie = handler.get_cookie(cookie_name)
259 if cookie is not None:
260 self.log.warning(f"Clearing invalid/expired login cookie {cookie_name}")
261 self.clear_login_cookie(handler)
262 if not self.auth_enabled:
263 # Completely insecure! No authentication at all.
264 # No need to warn here, though; validate_security will have already done that.
265 user = self.generate_anonymous_user(handler)
266 # persist user on first request
267 # so the user data is stable for a given browser session
268 self.set_login_cookie(handler, user)
270 return user
272 def identity_model(self, user: User) -> dict[str, t.Any]:
273 """Return a User as an Identity model"""
274 # TODO: validate?
275 return asdict(user)
277 def get_handlers(self) -> list[tuple[str, object]]:
278 """Return list of additional handlers for this identity provider
280 For example, an OAuth callback handler.
281 """
282 handlers = []
283 if self.login_available:
284 handlers.append((r"/login", self.login_handler_class))
285 if self.logout_available:
286 handlers.append((r"/logout", self.logout_handler_class))
287 return handlers
289 def user_to_cookie(self, user: User) -> str:
290 """Serialize a user to a string for storage in a cookie
292 If overriding in a subclass, make sure to define user_from_cookie as well.
294 Default is just the user's username.
295 """
296 # default: username is enough
297 cookie = json.dumps(
298 {
299 "username": user.username,
300 "name": user.name,
301 "display_name": user.display_name,
302 "initials": user.initials,
303 "color": user.color,
304 }
305 )
306 return cookie
308 def user_from_cookie(self, cookie_value: str) -> User | None:
309 """Inverse of user_to_cookie"""
310 user = json.loads(cookie_value)
311 return User(
312 user["username"],
313 user["name"],
314 user["display_name"],
315 user["initials"],
316 None,
317 user["color"],
318 )
320 def get_cookie_name(self, handler: web.RequestHandler) -> str:
321 """Return the login cookie name
323 Uses IdentityProvider.cookie_name, if defined.
324 Default is to generate a string taking host into account to avoid
325 collisions for multiple servers on one hostname with different ports.
326 """
327 if self.cookie_name:
328 return self.cookie_name
329 else:
330 return _non_alphanum.sub("-", f"username-{handler.request.host}")
332 def set_login_cookie(self, handler: web.RequestHandler, user: User) -> None:
333 """Call this on handlers to set the login cookie for success"""
334 cookie_options = {}
335 cookie_options.update(self.cookie_options)
336 cookie_options.setdefault("httponly", True)
337 # tornado <4.2 has a bug that considers secure==True as soon as
338 # 'secure' kwarg is passed to set_secure_cookie
339 secure_cookie = self.secure_cookie
340 if secure_cookie is None:
341 secure_cookie = handler.request.protocol == "https"
342 if secure_cookie:
343 cookie_options.setdefault("secure", True)
344 cookie_options.setdefault("path", handler.base_url) # type:ignore[attr-defined]
345 cookie_name = self.get_cookie_name(handler)
346 handler.set_secure_cookie(cookie_name, self.user_to_cookie(user), **cookie_options)
348 def _force_clear_cookie(
349 self, handler: web.RequestHandler, name: str, path: str = "/", domain: str | None = None
350 ) -> None:
351 """Deletes the cookie with the given name.
353 Tornado's cookie handling currently (Jan 2018) stores cookies in a dict
354 keyed by name, so it can only modify one cookie with a given name per
355 response. The browser can store multiple cookies with the same name
356 but different domains and/or paths. This method lets us clear multiple
357 cookies with the same name.
359 Due to limitations of the cookie protocol, you must pass the same
360 path and domain to clear a cookie as were used when that cookie
361 was set (but there is no way to find out on the server side
362 which values were used for a given cookie).
363 """
364 name = escape.native_str(name)
365 expires = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(days=365)
367 morsel: Morsel[t.Any] = Morsel()
368 morsel.set(name, "", '""')
369 morsel["expires"] = httputil.format_timestamp(expires)
370 morsel["path"] = path
371 if domain:
372 morsel["domain"] = domain
373 handler.add_header("Set-Cookie", morsel.OutputString())
375 def clear_login_cookie(self, handler: web.RequestHandler) -> None:
376 """Clear the login cookie, effectively logging out the session."""
377 cookie_options = {}
378 cookie_options.update(self.cookie_options)
379 path = cookie_options.setdefault("path", handler.base_url) # type:ignore[attr-defined]
380 cookie_name = self.get_cookie_name(handler)
381 handler.clear_cookie(cookie_name, path=path)
382 if path and path != "/":
383 # also clear cookie on / to ensure old cookies are cleared
384 # after the change in path behavior.
385 # N.B. This bypasses the normal cookie handling, which can't update
386 # two cookies with the same name. See the method above.
387 self._force_clear_cookie(handler, cookie_name)
389 def get_user_cookie(
390 self, handler: web.RequestHandler
391 ) -> User | None | t.Awaitable[User | None]:
392 """Get user from a cookie
394 Calls user_from_cookie to deserialize cookie value
395 """
396 _user_cookie = handler.get_secure_cookie(
397 self.get_cookie_name(handler),
398 **self.get_secure_cookie_kwargs,
399 )
400 if not _user_cookie:
401 return None
402 user_cookie = _user_cookie.decode()
403 # TODO: try/catch in case of change in config?
404 try:
405 return self.user_from_cookie(user_cookie)
406 except Exception as e:
407 # log bad cookie itself, only at debug-level
408 self.log.debug(f"Error unpacking user from cookie: cookie={user_cookie}", exc_info=True)
409 self.log.error(f"Error unpacking user from cookie: {e}")
410 return None
412 auth_header_pat = re.compile(r"(token|bearer)\s+(.+)", re.IGNORECASE)
414 def get_token(self, handler: web.RequestHandler) -> str | None:
415 """Get the user token from a request
417 Default:
419 - in URL parameters: ?token=<token>
420 - in header: Authorization: token <token>
421 """
422 user_token = handler.get_argument("token", "")
423 if not user_token:
424 # get it from Authorization header
425 m = self.auth_header_pat.match(handler.request.headers.get("Authorization", ""))
426 if m:
427 user_token = m.group(2)
428 return user_token
430 async def get_user_token(self, handler: web.RequestHandler) -> User | None:
431 """Identify the user based on a token in the URL or Authorization header
433 Returns:
434 - uuid if authenticated
435 - None if not
436 """
437 token = t.cast("str | None", handler.token) # type:ignore[attr-defined]
438 if not token:
439 return None
440 # check login token from URL argument or Authorization header
441 user_token = self.get_token(handler)
442 authenticated = False
443 if user_token == token:
444 # token-authenticated, set the login cookie
445 self.log.debug(
446 "Accepting token-authenticated request from %s",
447 handler.request.remote_ip,
448 )
449 authenticated = True
451 if authenticated:
452 # token does not correspond to user-id,
453 # which is stored in a cookie.
454 # still check the cookie for the user id
455 _user = self.get_user_cookie(handler)
456 if isinstance(_user, t.Awaitable):
457 _user = await _user
458 user: User | None = _user
459 if user is None:
460 user = self.generate_anonymous_user(handler)
461 return user
462 else:
463 return None
465 def generate_anonymous_user(self, handler: web.RequestHandler) -> User:
466 """Generate a random anonymous user.
468 For use when a single shared token is used,
469 but does not identify a user.
470 """
471 user_id = uuid.uuid4().hex
472 moon = get_anonymous_username()
473 name = display_name = f"Anonymous {moon}"
474 initials = f"A{moon[0]}"
475 color = None
476 handler.log.debug(f"Generating new user for token-authenticated request: {user_id}") # type:ignore[attr-defined]
477 return User(user_id, name, display_name, initials, None, color)
479 def should_check_origin(self, handler: web.RequestHandler) -> bool:
480 """Should the Handler check for CORS origin validation?
482 Origin check should be skipped for token-authenticated requests.
484 Returns:
485 - True, if Handler must check for valid CORS origin.
486 - False, if Handler should skip origin check since requests are token-authenticated.
487 """
488 return not self.is_token_authenticated(handler)
490 def is_token_authenticated(self, handler: web.RequestHandler) -> bool:
491 """Returns True if handler has been token authenticated. Otherwise, False.
493 Login with a token is used to signal certain things, such as:
495 - permit access to REST API
496 - xsrf protection
497 - skip origin-checks for scripts
498 """
499 # ensure get_user has been called, so we know if we're token-authenticated
500 handler.current_user # noqa: B018
501 return getattr(handler, "_token_authenticated", False)
503 def validate_security(
504 self,
505 app: t.Any,
506 ssl_options: dict[str, t.Any] | None = None,
507 ) -> None:
508 """Check the application's security.
510 Show messages, or abort if necessary, based on the security configuration.
511 """
512 if not app.ip:
513 warning = "WARNING: The Jupyter server is listening on all IP addresses"
514 if ssl_options is None:
515 app.log.warning(f"{warning} and not using encryption. This is not recommended.")
516 if not self.auth_enabled:
517 app.log.warning(
518 f"{warning} and not using authentication. "
519 "This is highly insecure and not recommended."
520 )
521 elif not self.auth_enabled:
522 app.log.warning(
523 "All authentication is disabled."
524 " Anyone who can connect to this server will be able to run code."
525 )
527 def process_login_form(self, handler: web.RequestHandler) -> User | None:
528 """Process login form data
530 Return authenticated User if successful, None if not.
531 """
532 typed_password = handler.get_argument("password", default="")
533 user = None
534 if not self.auth_enabled:
535 self.log.warning("Accepting anonymous login because auth fully disabled!")
536 return self.generate_anonymous_user(handler)
538 if self.token and self.token == typed_password:
539 return t.cast(User, self.user_for_token(typed_password)) # type:ignore[attr-defined]
541 return user
543 @property
544 def auth_enabled(self):
545 """Is authentication enabled?
547 Should always be True, but may be False in rare, insecure cases
548 where requests with no auth are allowed.
550 Previously: LoginHandler.get_login_available
551 """
552 return True
554 @property
555 def login_available(self):
556 """Whether a LoginHandler is needed - and therefore whether the login page should be displayed."""
557 return self.auth_enabled
559 @property
560 def logout_available(self):
561 """Whether a LogoutHandler is needed."""
562 return True
565class PasswordIdentityProvider(IdentityProvider):
566 """A password identity provider."""
568 hashed_password = Unicode(
569 "",
570 config=True,
571 help=_i18n(
572 """
573 Hashed password to use for web authentication.
575 To generate, type in a python/IPython shell:
577 from jupyter_server.auth import passwd; passwd()
579 The string should be of the form type:salt:hashed-password.
580 """
581 ),
582 )
584 password_required = Bool(
585 False,
586 config=True,
587 help=_i18n(
588 """
589 Forces users to use a password for the Jupyter server.
590 This is useful in a multi user environment, for instance when
591 everybody in the LAN can access each other's machine through ssh.
593 In such a case, serving on localhost is not secure since
594 any user can connect to the Jupyter server via ssh.
596 """
597 ),
598 )
600 allow_password_change = Bool(
601 True,
602 config=True,
603 help=_i18n(
604 """
605 Allow password to be changed at login for the Jupyter server.
607 While logging in with a token, the Jupyter server UI will give the opportunity to
608 the user to enter a new password at the same time that will replace
609 the token login mechanism.
611 This can be set to False to prevent changing password from the UI/API.
612 """
613 ),
614 )
616 @default("need_token")
617 def _need_token_default(self):
618 return not bool(self.hashed_password)
620 @property
621 def login_available(self) -> bool:
622 """Whether a LoginHandler is needed - and therefore whether the login page should be displayed."""
623 return self.auth_enabled
625 @property
626 def auth_enabled(self) -> bool:
627 """Return whether any auth is enabled"""
628 return bool(self.hashed_password or self.token)
630 def passwd_check(self, password):
631 """Check password against our stored hashed password"""
632 return passwd_check(self.hashed_password, password)
634 def process_login_form(self, handler: web.RequestHandler) -> User | None:
635 """Process login form data
637 Return authenticated User if successful, None if not.
638 """
639 typed_password = handler.get_argument("password", default="")
640 new_password = handler.get_argument("new_password", default="")
641 user = None
642 if not self.auth_enabled:
643 self.log.warning("Accepting anonymous login because auth fully disabled!")
644 return self.generate_anonymous_user(handler)
646 if self.passwd_check(typed_password) and not new_password:
647 return self.generate_anonymous_user(handler)
648 elif self.token and self.token == typed_password:
649 user = self.generate_anonymous_user(handler)
650 if new_password and self.allow_password_change:
651 config_dir = handler.settings.get("config_dir", "")
652 config_file = os.path.join(config_dir, "jupyter_server_config.json")
653 self.hashed_password = set_password(new_password, config_file=config_file)
654 self.log.info(_i18n(f"Wrote hashed password to {config_file}"))
656 return user
658 def validate_security(
659 self,
660 app: t.Any,
661 ssl_options: dict[str, t.Any] | None = None,
662 ) -> None:
663 """Handle security validation."""
664 super().validate_security(app, ssl_options)
665 if self.password_required and (not self.hashed_password):
666 self.log.critical(
667 _i18n("Jupyter servers are configured to only be run with a password.")
668 )
669 self.log.critical(_i18n("Hint: run the following command to set a password"))
670 self.log.critical(_i18n("\t$ python -m jupyter_server.auth password"))
671 sys.exit(1)
674class LegacyIdentityProvider(PasswordIdentityProvider):
675 """Legacy IdentityProvider for use with custom LoginHandlers
677 Login configuration has moved from LoginHandler to IdentityProvider
678 in Jupyter Server 2.0.
679 """
681 # settings must be passed for
682 settings = Dict()
684 @default("settings")
685 def _default_settings(self):
686 return {
687 "token": self.token,
688 "password": self.hashed_password,
689 }
691 @default("login_handler_class")
692 def _default_login_handler_class(self):
693 from .login import LegacyLoginHandler
695 return LegacyLoginHandler
697 @property
698 def auth_enabled(self):
699 return self.login_available
701 def get_user(self, handler: web.RequestHandler) -> User | None:
702 """Get the user."""
703 user = self.login_handler_class.get_user(handler) # type:ignore[attr-defined]
704 if user is None:
705 return None
706 return _backward_compat_user(user)
708 @property
709 def login_available(self) -> bool:
710 return bool(
711 self.login_handler_class.get_login_available( # type:ignore[attr-defined]
712 self.settings
713 )
714 )
716 def should_check_origin(self, handler: web.RequestHandler) -> bool:
717 """Whether we should check origin."""
718 return bool(self.login_handler_class.should_check_origin(handler)) # type:ignore[attr-defined]
720 def is_token_authenticated(self, handler: web.RequestHandler) -> bool:
721 """Whether we are token authenticated."""
722 return bool(self.login_handler_class.is_token_authenticated(handler)) # type:ignore[attr-defined]
724 def validate_security(
725 self,
726 app: t.Any,
727 ssl_options: dict[str, t.Any] | None = None,
728 ) -> None:
729 """Validate security."""
730 if self.password_required and (not self.hashed_password):
731 self.log.critical(
732 _i18n("Jupyter servers are configured to only be run with a password.")
733 )
734 self.log.critical(_i18n("Hint: run the following command to set a password"))
735 self.log.critical(_i18n("\t$ python -m jupyter_server.auth password"))
736 sys.exit(1)
737 self.login_handler_class.validate_security( # type:ignore[attr-defined]
738 app, ssl_options
739 )