1"""Identity Provider interface
2
3This defines the _authentication_ layer of Jupyter Server,
4to be used in combination with Authorizer for _authorization_.
5
6.. versionadded:: 2.0
7"""
8
9from __future__ import annotations
10
11import binascii
12import datetime
13import json
14import os
15import re
16import sys
17import typing as t
18import uuid
19from dataclasses import asdict, dataclass
20from http.cookies import Morsel
21
22from tornado import escape, httputil, web
23from traitlets import Bool, Dict, Type, Unicode, default
24from traitlets.config import LoggingConfigurable
25
26from jupyter_server.transutils import _i18n
27
28from .security import passwd_check, set_password
29from .utils import get_anonymous_username
30
# Matches every character that is not an ASCII letter or digit.
# IdentityProvider.get_cookie_name replaces each match with "-" when it derives
# the default login cookie name from the request host.
_non_alphanum = re.compile(r"[^A-Za-z0-9]")
32
33
@dataclass
class User:
    """Identity record describing one user.

    IdentityProvider.get_user should return an instance of this class,
    or of a subclass.
    """

    # the one mandatory field; must be non-empty
    username: str

    # the two name fields fall back to username when left empty
    # name is the 'real' name of the user
    name: str = ""
    # display_name is a shorter name for use in UI,
    # if different from name. e.g. a nickname
    display_name: str = ""

    # optional fields, kept as None when not provided
    initials: str | None = None
    avatar_url: str | None = None
    color: str | None = None

    # TODO: extension fields?
    # ext: Dict[str, Dict[str, Any]] = field(default_factory=dict)

    def __post_init__(self):
        # run validation/defaulting right after dataclass construction
        self.fill_defaults()

    def fill_defaults(self):
        """Validate username and derive the name fields that were left empty.

        - ``name`` falls back to ``username``
        - ``display_name`` falls back to ``name``

        Raises:
            ValueError: if ``username`` is empty.
        """
        if self.username:
            # fallback chain: username -> name -> display_name
            self.name = self.name or self.username
            self.display_name = self.display_name or self.name
            return

        # username is the only truly required field
        raise ValueError(f"user.username must not be empty: {self}")
79
80
def _backward_compat_user(got_user: t.Any) -> User:
    """Convert a legacy ``LoginHandler.get_user`` result into a :class:`User`.

    Before 2.0, ``LoginHandler.get_user`` could return anything truthy,
    most commonly a plain username string or a simple dict.
    This keeps those common shapes working.

    - a ``str`` is used as the username
    - a ``dict`` supplies any keys that are ``User`` fields;
      when it has ``name`` but no ``username``, ``name`` is used as the username

    Raises:
        ValueError: for any other type, or for a dict from which a User
            cannot be constructed.
    """
    if isinstance(got_user, str):
        return User(username=got_user)

    if not isinstance(got_user, dict):
        raise ValueError(f"Unrecognized user: {got_user}")

    # keep only the keys that User knows about
    user_fields = {
        key: got_user[key] for key in User.__dataclass_fields__ if key in got_user
    }
    # a dict with only 'name' gets that value as its username
    if "username" not in got_user and "name" in got_user:
        user_fields["username"] = got_user["name"]

    try:
        return User(**user_fields)
    except TypeError:
        # e.g. no username could be determined
        raise ValueError(f"Unrecognized user: {got_user}") from None
108
109
class IdentityProvider(LoggingConfigurable):
    """
    Interface for providing identity management and authentication.

    Two principal methods:

    - :meth:`~jupyter_server.auth.IdentityProvider.get_user` returns a :class:`~.User` object
      for successful authentication, or None for no-identity-found.
    - :meth:`~jupyter_server.auth.IdentityProvider.identity_model` turns a :class:`~jupyter_server.auth.User` into a JSONable dict.
      The default is to use :py:meth:`dataclasses.asdict`,
      and usually shouldn't need override.

    Additional methods can customize authentication.

    .. versionadded:: 2.0
    """

    cookie_name: str | Unicode[str, str | bytes] = Unicode(
        "",
        config=True,
        help=_i18n("Name of the cookie to set for persisting login. Default: username-${Host}."),
    )

    cookie_options = Dict(
        config=True,
        help=_i18n(
            "Extra keyword arguments to pass to `set_secure_cookie`."
            " See tornado's set_secure_cookie docs for details."
        ),
    )

    secure_cookie: bool | Bool[bool | None, bool | int | None] = Bool(
        None,
        allow_none=True,
        config=True,
        help=_i18n(
            "Specify whether login cookie should have the `secure` property (HTTPS-only)."
            "Only needed when protocol-detection gives the wrong answer due to proxies."
        ),
    )

    get_secure_cookie_kwargs = Dict(
        config=True,
        help=_i18n(
            "Extra keyword arguments to pass to `get_secure_cookie`."
            " See tornado's get_secure_cookie docs for details."
        ),
    )

    token: str | Unicode[str, str | bytes] = Unicode(
        "<generated>",
        help=_i18n(
            """Token used for authenticating first-time connections to the server.

        The token can be read from the file referenced by JUPYTER_TOKEN_FILE or set directly
        with the JUPYTER_TOKEN environment variable.

        When no password is enabled,
        the default is to generate a new, random token.

        Setting to an empty string disables authentication altogether, which is NOT RECOMMENDED.

        Prior to 2.0: configured as ServerApp.token
        """
        ),
    ).tag(config=True)

    login_handler_class = Type(
        default_value="jupyter_server.auth.login.LoginFormHandler",
        klass=web.RequestHandler,
        config=True,
        help=_i18n("The login handler class to use, if any."),
    )

    logout_handler_class = Type(
        default_value="jupyter_server.auth.logout.LogoutHandler",
        klass=web.RequestHandler,
        config=True,
        help=_i18n("The logout handler class to use."),
    )

    # set to True by _token_default only when a random token was generated
    token_generated = False

    @default("token")
    def _token_default(self):
        """Compute the default token.

        Order of precedence:

        1. the JUPYTER_TOKEN environment variable
        2. the contents of the file named by JUPYTER_TOKEN_FILE
        3. an empty string when ``need_token`` is False
        4. a freshly generated random token (24 random bytes, hex-encoded)

        ``token_generated`` records whether case 4 applied.
        """
        if os.getenv("JUPYTER_TOKEN"):
            self.token_generated = False
            return os.environ["JUPYTER_TOKEN"]
        if os.getenv("JUPYTER_TOKEN_FILE"):
            self.token_generated = False
            # NOTE: file contents are used verbatim, including any trailing newline
            with open(os.environ["JUPYTER_TOKEN_FILE"]) as token_file:
                return token_file.read()
        if not self.need_token:
            # no token if password is enabled
            self.token_generated = False
            return ""
        else:
            self.token_generated = True
            return binascii.hexlify(os.urandom(24)).decode("ascii")

    # whether a token should be generated when none is configured (see _token_default)
    need_token: bool | Bool[bool, t.Union[bool, int]] = Bool(True)

    def get_user(self, handler: web.RequestHandler) -> User | None | t.Awaitable[User | None]:
        """Get the authenticated user for a request

        Must return a :class:`jupyter_server.auth.User`,
        though it may be a subclass.

        Return None if the request is not authenticated.

        _may_ be a coroutine
        """
        return self._get_user(handler)

    # not sure how to have optional-async type signature
    # on base class with `async def` without splitting it into two methods

    async def _get_user(self, handler: web.RequestHandler) -> User | None:
        """Resolve the user for a request from token and/or cookie.

        - Returns the user cached on ``handler._jupyter_current_user`` if set.
        - Otherwise checks token auth, then cookie auth; the token wins if both match.
        - On token auth, (re)sets the login cookie when it differs from the
          cookie user and marks the handler as token-authenticated.
        - If nothing authenticates, clears a present-but-invalid login cookie and,
          when auth is disabled, generates and persists an anonymous user.
        """
        if getattr(handler, "_jupyter_current_user", None):
            # already authenticated
            return t.cast(User, handler._jupyter_current_user)  # type:ignore[attr-defined]
        _token_user: User | None | t.Awaitable[User | None] = self.get_user_token(handler)
        if isinstance(_token_user, t.Awaitable):
            _token_user = await _token_user
        token_user: User | None = _token_user  # need second variable name to collapse type
        _cookie_user = self.get_user_cookie(handler)
        if isinstance(_cookie_user, t.Awaitable):
            _cookie_user = await _cookie_user
        cookie_user: User | None = _cookie_user
        # prefer token to cookie if both given,
        # because token is always explicit
        user = token_user or cookie_user

        if user is not None and token_user is not None:
            # if token-authenticated, persist user_id in cookie
            # if it hasn't already been stored there
            if user != cookie_user:
                self.set_login_cookie(handler, user)
            # Record that the current request has been authenticated with a token.
            # Used in is_token_authenticated below.
            handler._token_authenticated = True  # type:ignore[attr-defined]

        if user is None:
            # If an invalid cookie was sent, clear it to prevent unnecessary
            # extra warnings. But don't do this on a request with *no* cookie,
            # because that can erroneously log you out (see gh-3365)
            cookie_name = self.get_cookie_name(handler)
            cookie = handler.get_cookie(cookie_name)
            if cookie is not None:
                self.log.warning(f"Clearing invalid/expired login cookie {cookie_name}")
                self.clear_login_cookie(handler)
            if not self.auth_enabled:
                # Completely insecure! No authentication at all.
                # No need to warn here, though; validate_security will have already done that.
                user = self.generate_anonymous_user(handler)
                # persist user on first request
                # so the user data is stable for a given browser session
                self.set_login_cookie(handler, user)

        return user

    def identity_model(self, user: User) -> dict[str, t.Any]:
        """Return a User as an Identity model"""
        # TODO: validate?
        return asdict(user)

    def get_handlers(self) -> list[tuple[str, object]]:
        """Return list of additional handlers for this identity provider

        For example, an OAuth callback handler.

        By default: ``/login`` when ``login_available`` and
        ``/logout`` when ``logout_available``.
        """
        handlers = []
        if self.login_available:
            handlers.append((r"/login", self.login_handler_class))
        if self.logout_available:
            handlers.append((r"/logout", self.logout_handler_class))
        return handlers

    def user_to_cookie(self, user: User) -> str:
        """Serialize a user to a string for storage in a cookie

        If overriding in a subclass, make sure to define user_from_cookie as well.

        Default is a JSON object with the user's username, name,
        display_name, initials and color (avatar_url is not stored).
        """
        cookie = json.dumps(
            {
                "username": user.username,
                "name": user.name,
                "display_name": user.display_name,
                "initials": user.initials,
                "color": user.color,
            }
        )
        return cookie

    def user_from_cookie(self, cookie_value: str) -> User | None:
        """Inverse of user_to_cookie

        Raises (e.g. ``KeyError``, ``json.JSONDecodeError``) if the cookie value
        is not in the format written by ``user_to_cookie``.
        """
        user = json.loads(cookie_value)
        return User(
            user["username"],
            user["name"],
            user["display_name"],
            user["initials"],
            None,  # avatar_url is not stored in the cookie
            user["color"],
        )

    def get_cookie_name(self, handler: web.RequestHandler) -> str:
        """Return the login cookie name

        Uses IdentityProvider.cookie_name, if defined.
        Default is to generate a string taking host into account to avoid
        collisions for multiple servers on one hostname with different ports.
        """
        if self.cookie_name:
            return self.cookie_name
        else:
            # every non-alphanumeric character of "username-<host>" becomes "-"
            return _non_alphanum.sub("-", f"username-{handler.request.host}")

    def set_login_cookie(self, handler: web.RequestHandler, user: User) -> None:
        """Call this on handlers to set the login cookie for success

        Options come from ``cookie_options``, with defaults filled in for
        ``httponly`` (True), ``secure`` (True when the cookie should be
        HTTPS-only) and ``path`` (the handler's ``base_url``).
        """
        cookie_options = {}
        cookie_options.update(self.cookie_options)
        cookie_options.setdefault("httponly", True)
        # tornado <4.2 has a bug that considers secure==True as soon as
        # 'secure' kwarg is passed to set_secure_cookie
        secure_cookie = self.secure_cookie
        if secure_cookie is None:
            # not configured: detect from the request protocol
            secure_cookie = handler.request.protocol == "https"
        if secure_cookie:
            cookie_options.setdefault("secure", True)
        cookie_options.setdefault("path", handler.base_url)  # type:ignore[attr-defined]
        cookie_name = self.get_cookie_name(handler)
        handler.set_secure_cookie(cookie_name, self.user_to_cookie(user), **cookie_options)

    def _force_clear_cookie(
        self, handler: web.RequestHandler, name: str, path: str = "/", domain: str | None = None
    ) -> None:
        """Deletes the cookie with the given name.

        Tornado's cookie handling currently (Jan 2018) stores cookies in a dict
        keyed by name, so it can only modify one cookie with a given name per
        response. The browser can store multiple cookies with the same name
        but different domains and/or paths. This method lets us clear multiple
        cookies with the same name.

        Due to limitations of the cookie protocol, you must pass the same
        path and domain to clear a cookie as were used when that cookie
        was set (but there is no way to find out on the server side
        which values were used for a given cookie).
        """
        name = escape.native_str(name)
        # an expiry date in the past asks the browser to drop the cookie
        expires = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(days=365)

        morsel: Morsel[t.Any] = Morsel()
        morsel.set(name, "", '""')
        morsel["expires"] = httputil.format_timestamp(expires)
        morsel["path"] = path
        if domain:
            morsel["domain"] = domain
        # add the header directly, bypassing tornado's per-name cookie dict
        handler.add_header("Set-Cookie", morsel.OutputString())

    def clear_login_cookie(self, handler: web.RequestHandler) -> None:
        """Clear the login cookie, effectively logging out the session.

        The cookie is cleared with the same ``path`` and ``domain`` that
        ``set_login_cookie`` uses (from ``cookie_options``), since a cookie can
        only be cleared with the path and domain it was set with.
        """
        cookie_options = {}
        cookie_options.update(self.cookie_options)
        path = cookie_options.setdefault("path", handler.base_url)  # type:ignore[attr-defined]
        # a cookie set with a configured domain must be cleared with that domain
        domain = cookie_options.get("domain")
        cookie_name = self.get_cookie_name(handler)
        handler.clear_cookie(cookie_name, path=path, domain=domain)
        if path and path != "/":
            # also clear cookie on / to ensure old cookies are cleared
            # after the change in path behavior.
            # N.B. This bypasses the normal cookie handling, which can't update
            # two cookies with the same name. See the method above.
            self._force_clear_cookie(handler, cookie_name)

    def get_user_cookie(
        self, handler: web.RequestHandler
    ) -> User | None | t.Awaitable[User | None]:
        """Get user from a cookie

        Calls user_from_cookie to deserialize cookie value.
        Returns None when there is no (valid) secure cookie or when
        the cookie value cannot be deserialized.
        """
        _user_cookie = handler.get_secure_cookie(
            self.get_cookie_name(handler),
            **self.get_secure_cookie_kwargs,
        )
        if not _user_cookie:
            return None
        user_cookie = _user_cookie.decode()
        # deserialization may fail, e.g. for a cookie written in a different format;
        # treat that as "no user" rather than failing the request
        try:
            return self.user_from_cookie(user_cookie)
        except Exception as e:
            # log bad cookie itself, only at debug-level
            self.log.debug(f"Error unpacking user from cookie: cookie={user_cookie}", exc_info=True)
            self.log.error(f"Error unpacking user from cookie: {e}")
            return None

    # "Authorization: token <token>" or "Authorization: bearer <token>" (case-insensitive)
    auth_header_pat = re.compile(r"(token|bearer)\s+(.+)", re.IGNORECASE)

    def get_token(self, handler: web.RequestHandler) -> str | None:
        """Get the user token from a request

        Default:

        - in URL parameters: ?token=<token>
        - in header: Authorization: token <token>
          (``bearer`` is accepted in place of ``token``)

        Returns an empty string if no token was sent.
        """
        user_token = handler.get_argument("token", "")
        if not user_token:
            # get it from Authorization header
            m = self.auth_header_pat.match(handler.request.headers.get("Authorization", ""))
            if m:
                user_token = m.group(2)
        return user_token

    async def get_user_token(self, handler: web.RequestHandler) -> User | None:
        """Identify the user based on a token in the URL or Authorization header

        Returns:
        - a User if the request carries the server token: the user stored in
          the login cookie if there is one, otherwise a new anonymous user
        - None if not (including when the handler has no token configured)
        """
        token = t.cast("str | None", handler.token)  # type:ignore[attr-defined]
        if not token:
            return None
        # check login token from URL argument or Authorization header
        user_token = self.get_token(handler)
        authenticated = False
        if user_token == token:
            # token-authenticated, set the login cookie
            self.log.debug(
                "Accepting token-authenticated request from %s",
                handler.request.remote_ip,
            )
            authenticated = True

        if authenticated:
            # token does not correspond to user-id,
            # which is stored in a cookie.
            # still check the cookie for the user id
            _user = self.get_user_cookie(handler)
            if isinstance(_user, t.Awaitable):
                _user = await _user
            user: User | None = _user
            if user is None:
                user = self.generate_anonymous_user(handler)
            return user
        else:
            return None

    def generate_anonymous_user(self, handler: web.RequestHandler) -> User:
        """Generate a random anonymous user.

        For use when a single shared token is used,
        but does not identify a user.

        The username is a random uuid4 hex string; name and display name are
        "Anonymous <name>" with <name> from ``get_anonymous_username()``.
        """
        user_id = uuid.uuid4().hex
        moon = get_anonymous_username()
        name = display_name = f"Anonymous {moon}"
        initials = f"A{moon[0]}"
        color = None
        handler.log.debug(f"Generating new user for token-authenticated request: {user_id}")  # type:ignore[attr-defined]
        return User(user_id, name, display_name, initials, None, color)

    def should_check_origin(self, handler: web.RequestHandler) -> bool:
        """Should the Handler check for CORS origin validation?

        Origin check should be skipped for token-authenticated requests.

        Returns:
        - True, if Handler must check for valid CORS origin.
        - False, if Handler should skip origin check since requests are token-authenticated.
        """
        return not self.is_token_authenticated(handler)

    def is_token_authenticated(self, handler: web.RequestHandler) -> bool:
        """Returns True if handler has been token authenticated. Otherwise, False.

        Login with a token is used to signal certain things, such as:

        - permit access to REST API
        - xsrf protection
        - skip origin-checks for scripts
        """
        # ensure get_user has been called, so we know if we're token-authenticated
        handler.current_user  # noqa: B018
        return getattr(handler, "_token_authenticated", False)

    def validate_security(
        self,
        app: t.Any,
        ssl_options: dict[str, t.Any] | None = None,
    ) -> None:
        """Check the application's security.

        Show messages, or abort if necessary, based on the security configuration.

        This base implementation only logs warnings:
        when ``app.ip`` is empty (listening on all addresses) and encryption
        and/or authentication is missing, or when authentication is disabled.
        """
        if not app.ip:
            warning = "WARNING: The Jupyter server is listening on all IP addresses"
            if ssl_options is None:
                app.log.warning(f"{warning} and not using encryption. This is not recommended.")
            if not self.auth_enabled:
                app.log.warning(
                    f"{warning} and not using authentication. "
                    "This is highly insecure and not recommended."
                )
        elif not self.auth_enabled:
            app.log.warning(
                "All authentication is disabled."
                " Anyone who can connect to this server will be able to run code."
            )

    def process_login_form(self, handler: web.RequestHandler) -> User | None:
        """Process login form data

        Return authenticated User if successful, None if not.

        The submitted ``password`` field is compared with the server token.
        """
        typed_password = handler.get_argument("password", default="")
        if not self.auth_enabled:
            self.log.warning("Accepting anonymous login because auth fully disabled!")
            return self.generate_anonymous_user(handler)

        if self.token and self.token == typed_password:
            # This class does not define ``user_for_token``; honor it when a
            # subclass provides it, otherwise fall back to an anonymous user
            # instead of failing a valid token login with AttributeError.
            user_for_token = getattr(self, "user_for_token", None)
            if callable(user_for_token):
                return t.cast(User, user_for_token(typed_password))
            return self.generate_anonymous_user(handler)

        return None

    @property
    def auth_enabled(self):
        """Is authentication enabled?

        Should always be True, but may be False in rare, insecure cases
        where requests with no auth are allowed.

        Previously: LoginHandler.get_login_available
        """
        return True

    @property
    def login_available(self):
        """Whether a LoginHandler is needed - and therefore whether the login page should be displayed."""
        return self.auth_enabled

    @property
    def logout_available(self):
        """Whether a LogoutHandler is needed."""
        return True
563
564
class PasswordIdentityProvider(IdentityProvider):
    """A password identity provider.

    Adds login with a hashed password on top of the token login
    of :class:`IdentityProvider`, and optionally lets a token-authenticated
    user set a new password from the login form.
    """

    hashed_password = Unicode(
        "",
        config=True,
        help=_i18n(
            """
            Hashed password to use for web authentication.

            To generate, type in a python/IPython shell:

                from jupyter_server.auth import passwd; passwd()

            The string should be of the form type:salt:hashed-password.
            """
        ),
    )

    password_required = Bool(
        False,
        config=True,
        help=_i18n(
            """
            Forces users to use a password for the Jupyter server.
            This is useful in a multi user environment, for instance when
            everybody in the LAN can access each other's machine through ssh.

            In such a case, serving on localhost is not secure since
            any user can connect to the Jupyter server via ssh.

            """
        ),
    )

    allow_password_change = Bool(
        True,
        config=True,
        help=_i18n(
            """
            Allow password to be changed at login for the Jupyter server.

            While logging in with a token, the Jupyter server UI will give the opportunity to
            the user to enter a new password at the same time that will replace
            the token login mechanism.

            This can be set to False to prevent changing password from the UI/API.
            """
        ),
    )

    @default("need_token")
    def _need_token_default(self):
        """A token is only needed when no password has been configured."""
        return not bool(self.hashed_password)

    @property
    def login_available(self) -> bool:
        """Whether a LoginHandler is needed - and therefore whether the login page should be displayed."""
        return self.auth_enabled

    @property
    def auth_enabled(self) -> bool:
        """Return whether any auth is enabled"""
        return bool(self.hashed_password or self.token)

    def passwd_check(self, password):
        """Check password against our stored hashed password"""
        return passwd_check(self.hashed_password, password)

    def process_login_form(self, handler: web.RequestHandler) -> User | None:
        """Process login form data

        Return authenticated User if successful, None if not.

        - A matching password with no ``new_password`` logs in.
        - A matching token logs in and, if ``new_password`` is given and
          ``allow_password_change`` is set, stores the new hashed password in
          ``jupyter_server_config.json`` under the handler's ``config_dir`` setting.
        - A matching password submitted together with ``new_password`` is only
          accepted if it also equals the token.
        """
        typed_password = handler.get_argument("password", default="")
        new_password = handler.get_argument("new_password", default="")
        user = None
        if not self.auth_enabled:
            self.log.warning("Accepting anonymous login because auth fully disabled!")
            return self.generate_anonymous_user(handler)

        if self.passwd_check(typed_password) and not new_password:
            return self.generate_anonymous_user(handler)
        elif self.token and self.token == typed_password:
            user = self.generate_anonymous_user(handler)
            if new_password and self.allow_password_change:
                config_dir = handler.settings.get("config_dir", "")
                config_file = os.path.join(config_dir, "jupyter_server_config.json")
                self.hashed_password = set_password(new_password, config_file=config_file)
                # Translate the constant template and let logging interpolate:
                # an f-string inside _i18n() is formatted before the lookup,
                # so it could never match a translation catalogue entry.
                self.log.info(_i18n("Wrote hashed password to %s"), config_file)

        return user

    def validate_security(
        self,
        app: t.Any,
        ssl_options: dict[str, t.Any] | None = None,
    ) -> None:
        """Handle security validation.

        Runs the base-class checks, then exits the process with status 1
        if ``password_required`` is set but no hashed password is configured.
        """
        super().validate_security(app, ssl_options)
        if self.password_required and (not self.hashed_password):
            self.log.critical(
                _i18n("Jupyter servers are configured to only be run with a password.")
            )
            self.log.critical(_i18n("Hint: run the following command to set a password"))
            self.log.critical(_i18n("\t$ python -m jupyter_server.auth password"))
            sys.exit(1)
672
673
class LegacyIdentityProvider(PasswordIdentityProvider):
    """Legacy IdentityProvider for use with custom LoginHandlers

    Login configuration has moved from LoginHandler to IdentityProvider
    in Jupyter Server 2.0.

    Authentication decisions are delegated to classmethods of
    ``login_handler_class`` (``get_user``, ``get_login_available``,
    ``should_check_origin``, ``is_token_authenticated``, ``validate_security``).
    """

    # settings must be passed for the legacy login handler's
    # get_login_available(settings) call (see login_available)
    settings = Dict()

    @default("settings")
    def _default_settings(self):
        """Default settings: the token and hashed password of this provider."""
        return {
            "token": self.token,
            "password": self.hashed_password,
        }

    @default("login_handler_class")
    def _default_login_handler_class(self):
        """Default to LegacyLoginHandler (imported lazily)."""
        from .login import LegacyLoginHandler

        return LegacyLoginHandler

    @property
    def auth_enabled(self):
        """Auth is considered enabled exactly when login is available."""
        return self.login_available

    def get_user(self, handler: web.RequestHandler) -> User | None:
        """Get the user.

        Calls the login handler's ``get_user`` and converts a truthy result
        to a :class:`User`.

        Legacy handlers could return anything truthy for an authenticated
        request, so any falsy result (``None``, ``""``, ``False``, ``{}``)
        means "not authenticated" and yields None.
        """
        user = self.login_handler_class.get_user(handler)  # type:ignore[attr-defined]
        if not user:
            return None
        return _backward_compat_user(user)

    @property
    def login_available(self) -> bool:
        """Whether login is available, as reported by the login handler class."""
        return bool(
            self.login_handler_class.get_login_available(  # type:ignore[attr-defined]
                self.settings
            )
        )

    def should_check_origin(self, handler: web.RequestHandler) -> bool:
        """Whether we should check origin."""
        return bool(self.login_handler_class.should_check_origin(handler))  # type:ignore[attr-defined]

    def is_token_authenticated(self, handler: web.RequestHandler) -> bool:
        """Whether we are token authenticated."""
        return bool(self.login_handler_class.is_token_authenticated(handler))  # type:ignore[attr-defined]

    def validate_security(
        self,
        app: t.Any,
        ssl_options: dict[str, t.Any] | None = None,
    ) -> None:
        """Validate security.

        Exits the process with status 1 if ``password_required`` is set but no
        hashed password is configured; otherwise delegates to the login
        handler class's ``validate_security``.
        """
        if self.password_required and (not self.hashed_password):
            self.log.critical(
                _i18n("Jupyter servers are configured to only be run with a password.")
            )
            self.log.critical(_i18n("Hint: run the following command to set a password"))
            self.log.critical(_i18n("\t$ python -m jupyter_server.auth password"))
            sys.exit(1)
        self.login_handler_class.validate_security(  # type:ignore[attr-defined]
            app, ssl_options
        )