1"""Decorator for layering authorization into JupyterHandlers."""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5import asyncio
6from functools import wraps
7from typing import Any, Callable, Optional, TypeVar, Union, cast
8
9from jupyter_core.utils import ensure_async
10from tornado.log import app_log
11from tornado.web import HTTPError
12
13from .utils import HTTP_METHOD_TO_AUTH_ACTION
14
# Type variable bound to any callable. The decorators in this module use it
# in their signatures so that the annotated return type matches the type of
# the callable being decorated.
FuncT = TypeVar("FuncT", bound=Callable[..., Any])
16
17
def authorized(
    action: Optional[Union[str, FuncT]] = None,
    resource: Optional[str] = None,
    message: Optional[str] = None,
) -> FuncT:
    """A decorator for tornado.web.RequestHandler methods
    that verifies whether the current user is authorized
    to make the following request.

    Helpful for adding an 'authorization' layer to
    a REST API.

    May be applied bare (``@authorized``) or with arguments
    (``@authorized("read", resource="contents")``). The decorated method
    is always exposed as a coroutine function; if the wrapped method
    returns a coroutine, it is awaited.

    .. versionadded:: 2.0

    Parameters
    ----------
    action : str or None
        the type of permission or action to check. When omitted, it is
        looked up for every request from the request's HTTP method via
        ``HTTP_METHOD_TO_AUTH_ACTION``. When ``authorized`` is used
        without parentheses this argument receives the decorated method
        itself.

    resource: str or None
        the name of the resource the action is being authorized
        to access. When omitted, the handler's ``auth_resource`` is read
        for every request.

    message : str or None
        a message for the unauthorized action. When omitted, a default
        message naming the action and resource is built for every request.

    Returns
    -------
    The wrapped method when used bare, otherwise a decorator that wraps
    the method it is applied to.

    Raises
    ------
    tornado.web.HTTPError
        with status 403, raised by the wrapped method when the handler has
        no current user or when the authorizer denies the request.
    """

    def wrapper(method):
        @wraps(method)
        async def inner(self, *args, **kwargs):
            # Resolve the defaults into per-request locals. The decorator
            # arguments live in a closure shared by every call (and by every
            # handler class that inherits the decorated method), so they must
            # not be rebound here: doing so would freeze the first request's
            # action/resource/message for all later requests.
            if action is None:
                http_method = self.request.method.upper()
                request_action = HTTP_METHOD_TO_AUTH_ACTION[http_method]
            else:
                request_action = action
            request_resource = self.auth_resource if resource is None else resource
            if message is None:
                denial_message = (
                    f"User is not authorized to {request_action} "
                    f"on resource: {request_resource}."
                )
            else:
                denial_message = message

            user = self.current_user
            if not user:
                app_log.warning("Attempting to authorize request without authentication!")
                raise HTTPError(status_code=403, log_message=denial_message)

            # The authorizer may be sync or async; ensure_async covers both.
            is_allowed = await ensure_async(
                self.authorizer.is_authorized(self, user, request_action, request_resource)
            )
            if not is_allowed:
                raise HTTPError(status_code=403, log_message=denial_message)

            # The user is allowed to do this action: call the method.
            out = method(self, *args, **kwargs)
            # If the method is a coroutine, await it
            if asyncio.iscoroutine(out):
                return await out
            return out

        return inner

    if callable(action):
        # no-arguments `@authorized` decorator called: the first positional
        # argument is the method itself, so there is no explicit action.
        method = action
        action = None
        return cast(FuncT, wrapper(method))

    return cast(FuncT, wrapper)
88
89
90def allow_unauthenticated(method: FuncT) -> FuncT:
91 """A decorator for tornado.web.RequestHandler methods
92 that allows any user to make the following request.
93
94 Selectively disables the 'authentication' layer of REST API which
95 is active when `ServerApp.allow_unauthenticated_access = False`.
96
97 To be used exclusively on endpoints which may be considered public,
98 for example the login page handler.
99
100 .. versionadded:: 2.13
101
102 Parameters
103 ----------
104 method : bound callable
105 the endpoint method to remove authentication from.
106 """
107
108 @wraps(method)
109 def wrapper(self, *args, **kwargs):
110 return method(self, *args, **kwargs)
111
112 setattr(wrapper, "__allow_unauthenticated", True)
113
114 return cast(FuncT, wrapper)
115
116
117def ws_authenticated(method: FuncT) -> FuncT:
118 """A decorator for websockets derived from `WebSocketHandler`
119 that authenticates user before allowing to proceed.
120
121 Differently from tornado.web.authenticated, does not redirect
122 to the login page, which would be meaningless for websockets.
123
124 .. versionadded:: 2.13
125
126 Parameters
127 ----------
128 method : bound callable
129 the endpoint method to add authentication for.
130 """
131
132 @wraps(method)
133 def wrapper(self, *args, **kwargs):
134 user = self.current_user
135 if user is None:
136 self.log.warning("Couldn't authenticate WebSocket connection")
137 raise HTTPError(403)
138 return method(self, *args, **kwargs)
139
140 setattr(wrapper, "__allow_unauthenticated", False)
141
142 return cast(FuncT, wrapper)