Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/looker_sdk/rtl/auth_session.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

153 statements  

1# The MIT License (MIT) 

2# 

3# Copyright (c) 2019 Looker Data Sciences, Inc. 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy 

6# of this software and associated documentation files (the "Software"), to deal 

7# in the Software without restriction, including without limitation the rights 

8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 

9# copies of the Software, and to permit persons to whom the Software is 

10# furnished to do so, subject to the following conditions: 

11# 

12# The above copyright notice and this permission notice shall be included in 

13# all copies or substantial portions of the Software. 

14# 

15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 

21# THE SOFTWARE. 

22 

23"""AuthSession to provide automatic authentication 

24""" 

25import hashlib 

26import secrets 

27from typing import cast, Dict, Optional, Union 

28import urllib.parse 

29 

30import attr 

31 

32from looker_sdk import error 

33from looker_sdk.rtl import api_settings 

34from looker_sdk.rtl import auth_token 

35from looker_sdk.rtl import model 

36from looker_sdk.rtl import serialize 

37from looker_sdk.rtl import transport 

38 

39 

40class AuthSession: 

41 """AuthSession to provide automatic authentication""" 

42 

43 def __init__( 

44 self, 

45 settings: api_settings.PApiSettings, 

46 transport: transport.Transport, 

47 deserialize: serialize.TDeserialize, 

48 api_version: str, 

49 ): 

50 settings.is_configured() 

51 self.settings = settings 

52 self.api_version = api_version 

53 self.sudo_token: auth_token.AuthToken = auth_token.AuthToken() 

54 self.token: auth_token.AuthToken = auth_token.AuthToken() 

55 self._sudo_id: Optional[int] = None 

56 self.transport = transport 

57 self.deserialize = deserialize 

58 self.token_model = auth_token.AccessToken 

59 

60 def _is_authenticated(self, token: auth_token.AuthToken) -> bool: 

61 """Determines if current token is active.""" 

62 if not (token.access_token): 

63 return False 

64 return token.is_active 

65 

66 @property 

67 def is_sudo_authenticated(self) -> bool: 

68 return self._is_authenticated(self.sudo_token) 

69 

70 @property 

71 def is_authenticated(self) -> bool: 

72 return self._is_authenticated(self.token) 

73 

74 def _get_sudo_token( 

75 self, transport_options: transport.TransportOptions 

76 ) -> auth_token.AuthToken: 

77 """Returns an active sudo token.""" 

78 if not self.is_sudo_authenticated: 

79 self._login_sudo(transport_options) 

80 return self.sudo_token 

81 

82 def _get_token( 

83 self, transport_options: transport.TransportOptions 

84 ) -> auth_token.AuthToken: 

85 """Returns an active token.""" 

86 if not self.is_authenticated: 

87 self._login(transport_options) 

88 return self.token 

89 

90 def authenticate( 

91 self, transport_options: transport.TransportOptions 

92 ) -> Dict[str, str]: 

93 """Return the Authorization header to authenticate each API call. 

94 

95 Expired token renewal happens automatically. 

96 """ 

97 if self._sudo_id: 

98 token = self._get_sudo_token(transport_options) 

99 else: 

100 token = self._get_token(transport_options) 

101 

102 return {"Authorization": f"Bearer {token.access_token}"} 

103 

104 def login_user( 

105 self, 

106 sudo_id: int, 

107 transport_options: Optional[transport.TransportOptions] = None, 

108 ) -> None: 

109 """Authenticate using settings credentials and sudo as sudo_id. 

110 

111 Make API calls as if authenticated as sudo_id. The sudo_id 

112 token is automatically renewed when it expires. In order to 

113 subsequently login_user() as another user you must first logout() 

114 """ 

115 if self._sudo_id is None: 

116 self._sudo_id = sudo_id 

117 try: 

118 self._login_sudo(transport_options or {}) 

119 except error.SDKError: 

120 self._sudo_id = None 

121 raise 

122 

123 else: 

124 if self._sudo_id != sudo_id: 

125 raise error.SDKError( 

126 f"Another user ({self._sudo_id}) " 

127 "is already logged in. Log them out first." 

128 ) 

129 elif not self.is_sudo_authenticated: 

130 self._login_sudo(transport_options or {}) 

131 

132 def _login(self, transport_options: transport.TransportOptions) -> None: 

133 client_id = self.settings.read_config().get("client_id") 

134 client_secret = self.settings.read_config().get("client_secret") 

135 if not (client_id and client_secret): 

136 raise error.SDKError("Required auth credentials not found.") 

137 

138 login = { 

139 "client_id": cast(str, client_id), 

140 "client_secret": cast(str, client_secret), 

141 } 

142 

143 serialized = urllib.parse.urlencode(login).encode("utf-8") 

144 

145 transport_options.setdefault("headers", {}).update( 

146 {"Content-Type": "application/x-www-form-urlencoded"} 

147 ) 

148 response = self._ok( 

149 self.transport.request( 

150 transport.HttpMethod.POST, 

151 f"{self.settings.base_url}/api/{self.api_version}/login", 

152 body=serialized, 

153 transport_options=transport_options, 

154 ) 

155 ) 

156 

157 # ignore type: mypy bug doesn't recognized kwarg `structure` to partial func 

158 access_token = self.deserialize( 

159 data=response, structure=self.token_model 

160 ) # type: ignore 

161 assert isinstance(access_token, auth_token.AccessToken) 

162 self.token = auth_token.AuthToken(access_token) 

163 

164 def _login_sudo(self, transport_options: transport.TransportOptions) -> None: 

165 def authenticator( 

166 transport_options: transport.TransportOptions, 

167 ) -> Dict[str, str]: 

168 return { 

169 "Authorization": f"Bearer {self._get_token(transport_options).access_token}" 

170 } 

171 

172 response = self._ok( 

173 self.transport.request( 

174 transport.HttpMethod.POST, 

175 f"{self.settings.base_url}/api/{self.api_version}/login/{self._sudo_id}", 

176 authenticator=authenticator, 

177 transport_options=transport_options, 

178 ) 

179 ) 

180 # ignore type: mypy bug doesn't recognized kwarg `structure` to partial func 

181 access_token = self.deserialize( 

182 data=response, structure=self.token_model 

183 ) # type: ignore 

184 assert isinstance(access_token, auth_token.AccessToken) 

185 self.sudo_token = auth_token.AuthToken(access_token) 

186 

187 def logout( 

188 self, 

189 full: bool = False, 

190 transport_options: Optional[transport.TransportOptions] = None, 

191 ) -> None: 

192 """Logout of API. 

193 

194 If the session is authenticated as sudo_id, logout() "undoes" 

195 the sudo and deactivates that sudo_id's current token. By default 

196 the current api3credential session is active at which point 

197 you can continue to make API calls as the api3credential user 

198 or logout(). If you want to logout completely in one step pass 

199 full=True 

200 """ 

201 if self._sudo_id: 

202 self._sudo_id = None 

203 if self.is_sudo_authenticated: 

204 self._logout(sudo=True, transport_options=transport_options) 

205 if full: 

206 self._logout(transport_options=transport_options) 

207 

208 elif self.is_authenticated: 

209 self._logout(transport_options=transport_options) 

210 

211 def _logout( 

212 self, 

213 sudo: bool = False, 

214 transport_options: Optional[transport.TransportOptions] = None, 

215 ) -> None: 

216 

217 if sudo: 

218 token = self.sudo_token.access_token 

219 self.sudo_token = auth_token.AuthToken() 

220 else: 

221 token = self.token.access_token 

222 self.token = auth_token.AuthToken() 

223 

224 def authenticator( 

225 _transport_options: transport.TransportOptions, 

226 ) -> Dict[str, str]: 

227 return {"Authorization": f"Bearer {token}"} 

228 

229 self._ok( 

230 self.transport.request( 

231 transport.HttpMethod.DELETE, 

232 f"{self.settings.base_url}/api/logout", 

233 authenticator=authenticator, 

234 transport_options=transport_options, 

235 ) 

236 ) 

237 

238 def _ok(self, response: transport.Response) -> str: 

239 if not response.ok: 

240 raise error.SDKError(response.value.decode(encoding="utf-8")) 

241 return response.value.decode(encoding="utf-8") 

242 

243 

244class CryptoHash: 

245 def secure_random(self, byte_count: int) -> str: 

246 return secrets.token_urlsafe(byte_count) 

247 

248 def sha256_hash(self, message: str) -> str: 

249 value = hashlib.sha256() 

250 value.update(bytes(message, "utf8")) 

251 return value.hexdigest() 

252 

253 

254class OAuthSession(AuthSession): 

255 def __init__( 

256 self, 

257 *, 

258 settings: api_settings.PApiSettings, 

259 transport: transport.Transport, 

260 deserialize: serialize.TDeserialize, 

261 serialize: serialize.TSerialize, 

262 crypto: CryptoHash, 

263 version: str, 

264 ): 

265 super().__init__(settings, transport, deserialize, version) 

266 self.crypto = crypto 

267 self.serialize = serialize 

268 config_data = self.settings.read_config() 

269 for required in ["client_id", "redirect_uri", "looker_url"]: 

270 if required not in config_data: 

271 raise error.SDKError(f"Missing required configuration value {required}") 

272 

273 # would have prefered using setattr(self, required, ...) in loop above 

274 # but mypy can't follow it 

275 self.client_id = config_data["client_id"] 

276 self.redirect_uri = config_data.get("redirect_uri", "") 

277 self.looker_url = config_data.get("looker_url", "") 

278 self.code_verifier = "" 

279 

280 def create_auth_code_request_url(self, scope: str, state: str) -> str: 

281 self.code_verifier = self.crypto.secure_random(32) 

282 code_challenge = self.crypto.sha256_hash(self.code_verifier) 

283 params: Dict[str, str] = { 

284 "response_type": "code", 

285 "client_id": self.client_id, 

286 "redirect_uri": self.redirect_uri, 

287 "scope": scope, 

288 "state": state, 

289 "code_challenge_method": "S256", 

290 "code_challenge": code_challenge, 

291 } 

292 path = urllib.parse.urljoin(self.looker_url, "auth") 

293 query = urllib.parse.urlencode(params) 

294 return f"{path}?{query}" 

295 

296 @attr.s(auto_attribs=True, kw_only=True) 

297 class GrantTypeParams(model.Model): 

298 client_id: str 

299 redirect_uri: str 

300 

301 @attr.s(auto_attribs=True, kw_only=True) 

302 class AuthCodeGrantTypeParams(GrantTypeParams): 

303 code: str 

304 code_verifier: str 

305 grant_type: str = "authorization_code" 

306 

307 @attr.s(auto_attribs=True, kw_only=True) 

308 class RefreshTokenGrantTypeParams(GrantTypeParams): 

309 refresh_token: str 

310 grant_type: str = "refresh_token" 

311 

312 def _request_token( 

313 self, 

314 grant_type: Union[AuthCodeGrantTypeParams, RefreshTokenGrantTypeParams], 

315 transport_options: transport.TransportOptions, 

316 ) -> auth_token.AccessToken: 

317 response = self.transport.request( 

318 transport.HttpMethod.POST, 

319 urllib.parse.urljoin(self.settings.base_url, "/api/token"), 

320 body=self.serialize(api_model=grant_type), # type: ignore 

321 ) 

322 if not response.ok: 

323 raise error.SDKError(response.value.decode(encoding=response.encoding)) 

324 

325 # ignore type: mypy bug doesn't recognized kwarg `structure` to partial func 

326 return self.deserialize( 

327 data=response.value, structure=self.token_model 

328 ) # type: ignore 

329 

330 def redeem_auth_code( 

331 self, 

332 auth_code: str, 

333 code_verifier: Optional[str] = None, 

334 transport_options: Optional[transport.TransportOptions] = None, 

335 ) -> None: 

336 params = self.AuthCodeGrantTypeParams( 

337 client_id=self.client_id, 

338 redirect_uri=self.redirect_uri, 

339 code=auth_code, 

340 code_verifier=code_verifier or self.code_verifier, 

341 ) 

342 

343 access_token = self._request_token(params, transport_options or {}) 

344 self.token = auth_token.AuthToken(access_token) 

345 

346 def _login(self, transport_options: transport.TransportOptions) -> None: 

347 params = self.RefreshTokenGrantTypeParams( 

348 client_id=self.client_id, 

349 redirect_uri=self.redirect_uri, 

350 refresh_token=self.token.refresh_token, 

351 ) 

352 access_token = self._request_token(params, transport_options) 

353 self.token = auth_token.AuthToken(access_token)