1# The MIT License (MIT)
2#
3# Copyright (c) 2019 Looker Data Sciences, Inc.
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy
6# of this software and associated documentation files (the "Software"), to deal
7# in the Software without restriction, including without limitation the rights
8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9# copies of the Software, and to permit persons to whom the Software is
10# furnished to do so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in
13# all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21# THE SOFTWARE.
22
23"""AuthSession to provide automatic authentication
24"""
25import hashlib
26import secrets
27from typing import cast, Dict, Optional, Union
28import urllib.parse
29
30import attr
31
32from looker_sdk import error
33from looker_sdk.rtl import api_settings
34from looker_sdk.rtl import auth_token
35from looker_sdk.rtl import model
36from looker_sdk.rtl import serialize
37from looker_sdk.rtl import transport
38
39
40class AuthSession:
41 """AuthSession to provide automatic authentication"""
42
43 def __init__(
44 self,
45 settings: api_settings.PApiSettings,
46 transport: transport.Transport,
47 deserialize: serialize.TDeserialize,
48 api_version: str,
49 ):
50 settings.is_configured()
51 self.settings = settings
52 self.api_version = api_version
53 self.sudo_token: auth_token.AuthToken = auth_token.AuthToken()
54 self.token: auth_token.AuthToken = auth_token.AuthToken()
55 self._sudo_id: Optional[int] = None
56 self.transport = transport
57 self.deserialize = deserialize
58 self.token_model = auth_token.AccessToken
59
60 def _is_authenticated(self, token: auth_token.AuthToken) -> bool:
61 """Determines if current token is active."""
62 if not (token.access_token):
63 return False
64 return token.is_active
65
66 @property
67 def is_sudo_authenticated(self) -> bool:
68 return self._is_authenticated(self.sudo_token)
69
70 @property
71 def is_authenticated(self) -> bool:
72 return self._is_authenticated(self.token)
73
74 def _get_sudo_token(
75 self, transport_options: transport.TransportOptions
76 ) -> auth_token.AuthToken:
77 """Returns an active sudo token."""
78 if not self.is_sudo_authenticated:
79 self._login_sudo(transport_options)
80 return self.sudo_token
81
82 def _get_token(
83 self, transport_options: transport.TransportOptions
84 ) -> auth_token.AuthToken:
85 """Returns an active token."""
86 if not self.is_authenticated:
87 self._login(transport_options)
88 return self.token
89
90 def authenticate(
91 self, transport_options: transport.TransportOptions
92 ) -> Dict[str, str]:
93 """Return the Authorization header to authenticate each API call.
94
95 Expired token renewal happens automatically.
96 """
97 if self._sudo_id:
98 token = self._get_sudo_token(transport_options)
99 else:
100 token = self._get_token(transport_options)
101
102 return {"Authorization": f"Bearer {token.access_token}"}
103
104 def login_user(
105 self,
106 sudo_id: int,
107 transport_options: Optional[transport.TransportOptions] = None,
108 ) -> None:
109 """Authenticate using settings credentials and sudo as sudo_id.
110
111 Make API calls as if authenticated as sudo_id. The sudo_id
112 token is automatically renewed when it expires. In order to
113 subsequently login_user() as another user you must first logout()
114 """
115 if self._sudo_id is None:
116 self._sudo_id = sudo_id
117 try:
118 self._login_sudo(transport_options or {})
119 except error.SDKError:
120 self._sudo_id = None
121 raise
122
123 else:
124 if self._sudo_id != sudo_id:
125 raise error.SDKError(
126 f"Another user ({self._sudo_id}) "
127 "is already logged in. Log them out first."
128 )
129 elif not self.is_sudo_authenticated:
130 self._login_sudo(transport_options or {})
131
132 def _login(self, transport_options: transport.TransportOptions) -> None:
133 client_id = self.settings.read_config().get("client_id")
134 client_secret = self.settings.read_config().get("client_secret")
135 if not (client_id and client_secret):
136 raise error.SDKError("Required auth credentials not found.")
137
138 login = {
139 "client_id": cast(str, client_id),
140 "client_secret": cast(str, client_secret),
141 }
142
143 serialized = urllib.parse.urlencode(login).encode("utf-8")
144
145 transport_options.setdefault("headers", {}).update(
146 {"Content-Type": "application/x-www-form-urlencoded"}
147 )
148 response = self._ok(
149 self.transport.request(
150 transport.HttpMethod.POST,
151 f"{self.settings.base_url}/api/{self.api_version}/login",
152 body=serialized,
153 transport_options=transport_options,
154 )
155 )
156
157 # ignore type: mypy bug doesn't recognized kwarg `structure` to partial func
158 access_token = self.deserialize(
159 data=response, structure=self.token_model
160 ) # type: ignore
161 assert isinstance(access_token, auth_token.AccessToken)
162 self.token = auth_token.AuthToken(access_token)
163
164 def _login_sudo(self, transport_options: transport.TransportOptions) -> None:
165 def authenticator(
166 transport_options: transport.TransportOptions,
167 ) -> Dict[str, str]:
168 return {
169 "Authorization": f"Bearer {self._get_token(transport_options).access_token}"
170 }
171
172 response = self._ok(
173 self.transport.request(
174 transport.HttpMethod.POST,
175 f"{self.settings.base_url}/api/{self.api_version}/login/{self._sudo_id}",
176 authenticator=authenticator,
177 transport_options=transport_options,
178 )
179 )
180 # ignore type: mypy bug doesn't recognized kwarg `structure` to partial func
181 access_token = self.deserialize(
182 data=response, structure=self.token_model
183 ) # type: ignore
184 assert isinstance(access_token, auth_token.AccessToken)
185 self.sudo_token = auth_token.AuthToken(access_token)
186
187 def logout(
188 self,
189 full: bool = False,
190 transport_options: Optional[transport.TransportOptions] = None,
191 ) -> None:
192 """Logout of API.
193
194 If the session is authenticated as sudo_id, logout() "undoes"
195 the sudo and deactivates that sudo_id's current token. By default
196 the current api3credential session is active at which point
197 you can continue to make API calls as the api3credential user
198 or logout(). If you want to logout completely in one step pass
199 full=True
200 """
201 if self._sudo_id:
202 self._sudo_id = None
203 if self.is_sudo_authenticated:
204 self._logout(sudo=True, transport_options=transport_options)
205 if full:
206 self._logout(transport_options=transport_options)
207
208 elif self.is_authenticated:
209 self._logout(transport_options=transport_options)
210
211 def _logout(
212 self,
213 sudo: bool = False,
214 transport_options: Optional[transport.TransportOptions] = None,
215 ) -> None:
216
217 if sudo:
218 token = self.sudo_token.access_token
219 self.sudo_token = auth_token.AuthToken()
220 else:
221 token = self.token.access_token
222 self.token = auth_token.AuthToken()
223
224 def authenticator(
225 _transport_options: transport.TransportOptions,
226 ) -> Dict[str, str]:
227 return {"Authorization": f"Bearer {token}"}
228
229 self._ok(
230 self.transport.request(
231 transport.HttpMethod.DELETE,
232 f"{self.settings.base_url}/api/logout",
233 authenticator=authenticator,
234 transport_options=transport_options,
235 )
236 )
237
238 def _ok(self, response: transport.Response) -> str:
239 if not response.ok:
240 raise error.SDKError(response.value.decode(encoding="utf-8"))
241 return response.value.decode(encoding="utf-8")
242
243
244class CryptoHash:
245 def secure_random(self, byte_count: int) -> str:
246 return secrets.token_urlsafe(byte_count)
247
248 def sha256_hash(self, message: str) -> str:
249 value = hashlib.sha256()
250 value.update(bytes(message, "utf8"))
251 return value.hexdigest()
252
253
254class OAuthSession(AuthSession):
255 def __init__(
256 self,
257 *,
258 settings: api_settings.PApiSettings,
259 transport: transport.Transport,
260 deserialize: serialize.TDeserialize,
261 serialize: serialize.TSerialize,
262 crypto: CryptoHash,
263 version: str,
264 ):
265 super().__init__(settings, transport, deserialize, version)
266 self.crypto = crypto
267 self.serialize = serialize
268 config_data = self.settings.read_config()
269 for required in ["client_id", "redirect_uri", "looker_url"]:
270 if required not in config_data:
271 raise error.SDKError(f"Missing required configuration value {required}")
272
273 # would have prefered using setattr(self, required, ...) in loop above
274 # but mypy can't follow it
275 self.client_id = config_data["client_id"]
276 self.redirect_uri = config_data.get("redirect_uri", "")
277 self.looker_url = config_data.get("looker_url", "")
278 self.code_verifier = ""
279
280 def create_auth_code_request_url(self, scope: str, state: str) -> str:
281 self.code_verifier = self.crypto.secure_random(32)
282 code_challenge = self.crypto.sha256_hash(self.code_verifier)
283 params: Dict[str, str] = {
284 "response_type": "code",
285 "client_id": self.client_id,
286 "redirect_uri": self.redirect_uri,
287 "scope": scope,
288 "state": state,
289 "code_challenge_method": "S256",
290 "code_challenge": code_challenge,
291 }
292 path = urllib.parse.urljoin(self.looker_url, "auth")
293 query = urllib.parse.urlencode(params)
294 return f"{path}?{query}"
295
296 @attr.s(auto_attribs=True, kw_only=True)
297 class GrantTypeParams(model.Model):
298 client_id: str
299 redirect_uri: str
300
301 @attr.s(auto_attribs=True, kw_only=True)
302 class AuthCodeGrantTypeParams(GrantTypeParams):
303 code: str
304 code_verifier: str
305 grant_type: str = "authorization_code"
306
307 @attr.s(auto_attribs=True, kw_only=True)
308 class RefreshTokenGrantTypeParams(GrantTypeParams):
309 refresh_token: str
310 grant_type: str = "refresh_token"
311
312 def _request_token(
313 self,
314 grant_type: Union[AuthCodeGrantTypeParams, RefreshTokenGrantTypeParams],
315 transport_options: transport.TransportOptions,
316 ) -> auth_token.AccessToken:
317 response = self.transport.request(
318 transport.HttpMethod.POST,
319 urllib.parse.urljoin(self.settings.base_url, "/api/token"),
320 body=self.serialize(api_model=grant_type), # type: ignore
321 )
322 if not response.ok:
323 raise error.SDKError(response.value.decode(encoding=response.encoding))
324
325 # ignore type: mypy bug doesn't recognized kwarg `structure` to partial func
326 return self.deserialize(
327 data=response.value, structure=self.token_model
328 ) # type: ignore
329
330 def redeem_auth_code(
331 self,
332 auth_code: str,
333 code_verifier: Optional[str] = None,
334 transport_options: Optional[transport.TransportOptions] = None,
335 ) -> None:
336 params = self.AuthCodeGrantTypeParams(
337 client_id=self.client_id,
338 redirect_uri=self.redirect_uri,
339 code=auth_code,
340 code_verifier=code_verifier or self.code_verifier,
341 )
342
343 access_token = self._request_token(params, transport_options or {})
344 self.token = auth_token.AuthToken(access_token)
345
346 def _login(self, transport_options: transport.TransportOptions) -> None:
347 params = self.RefreshTokenGrantTypeParams(
348 client_id=self.client_id,
349 redirect_uri=self.redirect_uri,
350 refresh_token=self.token.refresh_token,
351 )
352 access_token = self._request_token(params, transport_options)
353 self.token = auth_token.AuthToken(access_token)