Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/oauth2cli/oidc.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

109 statements  

1import json 

2import base64 

3import time 

4import random 

5import string 

6import warnings 

7import hashlib 

8import logging 

9 

10from . import oauth2 

11 

12 

13logger = logging.getLogger(__name__) 

14 

15def decode_part(raw, encoding="utf-8"): 

16 """Decode a part of the JWT. 

17 

18 JWT is encoded by padding-less base64url, 

19 based on `JWS specs <https://tools.ietf.org/html/rfc7515#appendix-C>`_. 

20 

21 :param encoding: 

22 If you are going to decode the first 2 parts of a JWT, i.e. the header 

23 or the payload, the default value "utf-8" would work fine. 

24 If you are going to decode the last part i.e. the signature part, 

25 it is a binary string so you should use `None` as encoding here. 

26 """ 

27 raw += '=' * (-len(raw) % 4) # https://stackoverflow.com/a/32517907/728675 

28 raw = str( 

29 # On Python 2.7, argument of urlsafe_b64decode must be str, not unicode. 

30 # This is not required on Python 3. 

31 raw) 

32 output = base64.urlsafe_b64decode(raw) 

33 if encoding: 

34 output = output.decode(encoding) 

35 return output 

36 

37base64decode = decode_part # Obsolete. For backward compatibility only. 

38 

39def _epoch_to_local(epoch): 

40 return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(epoch)) 

41 

42class IdTokenError(RuntimeError): # We waised RuntimeError before, so keep it 

43 """In unlikely event of an ID token is malformed, this exception will be raised.""" 

44 def __init__(self, reason, now, claims): 

45 super(IdTokenError, self).__init__( 

46 "%s Current epoch = %s. The id_token was approximately: %s" % ( 

47 reason, _epoch_to_local(now), json.dumps(dict( 

48 claims, 

49 iat=_epoch_to_local(claims["iat"]) if claims.get("iat") else None, 

50 exp=_epoch_to_local(claims["exp"]) if claims.get("exp") else None, 

51 ), indent=2))) 

52 

53class _IdTokenTimeError(IdTokenError): # This is not intended to be raised and caught 

54 _SUGGESTION = "Make sure your computer's time and time zone are both correct." 

55 def __init__(self, reason, now, claims): 

56 super(_IdTokenTimeError, self).__init__(reason+ " " + self._SUGGESTION, now, claims) 

57 def log(self): 

58 # Influenced by JWT specs https://tools.ietf.org/html/rfc7519#section-4.1.5 

59 # and OIDC specs https://openid.net/specs/openid-connect-core-1_0.html#IDTokenValidation 

60 # We used to raise this error, but now we just log it as warning, because: 

61 # 1. If it is caused by incorrect local machine time, 

62 # then the token(s) are still correct and probably functioning, 

63 # so, there is no point to error out. 

64 # 2. If it is caused by incorrect IdP time, then it is IdP's fault, 

65 # There is not much a client can do, so, we might as well return the token(s) 

66 # and let downstream components to decide what to do. 

67 logger.warning(str(self)) 

68 

69class IdTokenIssuerError(IdTokenError): 

70 pass 

71 

72class IdTokenAudienceError(IdTokenError): 

73 pass 

74 

75class IdTokenNonceError(IdTokenError): 

76 pass 

77 

78def decode_id_token(id_token, client_id=None, issuer=None, nonce=None, now=None): 

79 """Decodes and validates an id_token and returns its claims as a dictionary. 

80 

81 ID token claims would at least contain: "iss", "sub", "aud", "exp", "iat", 

82 per `specs <https://openid.net/specs/openid-connect-core-1_0.html#IDToken>`_ 

83 and it may contain other optional content such as "preferred_username", 

84 `maybe more <https://openid.net/specs/openid-connect-core-1_0.html#Claims>`_ 

85 """ 

86 decoded = json.loads(decode_part(id_token.split('.')[1])) 

87 # Based on https://openid.net/specs/openid-connect-core-1_0.html#IDTokenValidation 

88 _now = int(now or time.time()) 

89 skew = 120 # 2 minutes 

90 

91 if _now + skew < decoded.get("nbf", _now - 1): # nbf is optional per JWT specs 

92 # This is not an ID token validation, but a JWT validation 

93 # https://tools.ietf.org/html/rfc7519#section-4.1.5 

94 _IdTokenTimeError("0. The ID token is not yet valid.", _now, decoded).log() 

95 

96 if issuer and issuer != decoded["iss"]: 

97 # https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderConfigurationResponse 

98 raise IdTokenIssuerError( 

99 '2. The Issuer Identifier for the OpenID Provider, "%s", ' 

100 "(which is typically obtained during Discovery), " 

101 "MUST exactly match the value of the iss (issuer) Claim." % issuer, 

102 _now, 

103 decoded) 

104 

105 if client_id: 

106 valid_aud = client_id in decoded["aud"] if isinstance( 

107 decoded["aud"], list) else client_id == decoded["aud"] 

108 if not valid_aud: 

109 raise IdTokenAudienceError( 

110 "3. The aud (audience) claim must contain this client's client_id " 

111 '"%s", case-sensitively. Was your client_id in wrong casing?' 

112 # Some IdP accepts wrong casing request but issues right casing IDT 

113 % client_id, 

114 _now, 

115 decoded) 

116 

117 # Per specs: 

118 # 6. If the ID Token is received via direct communication between 

119 # the Client and the Token Endpoint (which it is during _obtain_token()), 

120 # the TLS server validation MAY be used to validate the issuer 

121 # in place of checking the token signature. 

122 

123 if _now - skew > decoded["exp"]: 

124 _IdTokenTimeError("9. The ID token already expires.", _now, decoded).log() 

125 

126 if nonce and nonce != decoded.get("nonce"): 

127 raise IdTokenNonceError( 

128 "11. Nonce must be the same value " 

129 "as the one that was sent in the Authentication Request.", 

130 _now, 

131 decoded) 

132 

133 return decoded 

134 

135 

136def _nonce_hash(nonce): 

137 # https://openid.net/specs/openid-connect-core-1_0.html#NonceNotes 

138 return hashlib.sha256(nonce.encode("ascii")).hexdigest() 

139 

140 

141class Prompt(object): 

142 """This class defines the constant strings for prompt parameter. 

143 

144 The values are based on 

145 https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest 

146 """ 

147 NONE = "none" 

148 LOGIN = "login" 

149 CONSENT = "consent" 

150 SELECT_ACCOUNT = "select_account" 

151 CREATE = "create" # Defined in https://openid.net/specs/openid-connect-prompt-create-1_0.html#PromptParameter 

152 

153 

154class Client(oauth2.Client): 

155 """OpenID Connect is a layer on top of the OAuth2. 

156 

157 See its specs at https://openid.net/connect/ 

158 """ 

159 

160 def decode_id_token(self, id_token, nonce=None): 

161 """See :func:`~decode_id_token`.""" 

162 return decode_id_token( 

163 id_token, nonce=nonce, 

164 client_id=self.client_id, issuer=self.configuration.get("issuer")) 

165 

166 def _obtain_token(self, grant_type, *args, **kwargs): 

167 """The result will also contain one more key "id_token_claims", 

168 whose value will be a dictionary returned by :func:`~decode_id_token`. 

169 """ 

170 ret = super(Client, self)._obtain_token(grant_type, *args, **kwargs) 

171 if "id_token" in ret: 

172 ret["id_token_claims"] = self.decode_id_token(ret["id_token"]) 

173 return ret 

174 

175 def build_auth_request_uri(self, response_type, nonce=None, **kwargs): 

176 """Generate an authorization uri to be visited by resource owner. 

177 

178 Return value and all other parameters are the same as 

179 :func:`oauth2.Client.build_auth_request_uri`, plus new parameter(s): 

180 

181 :param nonce: 

182 A hard-to-guess string used to mitigate replay attacks. See also 

183 `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

184 """ 

185 warnings.warn("Use initiate_auth_code_flow() instead", DeprecationWarning) 

186 return super(Client, self).build_auth_request_uri( 

187 response_type, nonce=nonce, **kwargs) 

188 

189 def obtain_token_by_authorization_code(self, code, nonce=None, **kwargs): 

190 """Get a token via authorization code. a.k.a. Authorization Code Grant. 

191 

192 Return value and all other parameters are the same as 

193 :func:`oauth2.Client.obtain_token_by_authorization_code`, 

194 plus new parameter(s): 

195 

196 :param nonce: 

197 If you provided a nonce when calling :func:`build_auth_request_uri`, 

198 same nonce should also be provided here, so that we'll validate it. 

199 An exception will be raised if the nonce in id token mismatches. 

200 """ 

201 warnings.warn( 

202 "Use obtain_token_by_auth_code_flow() instead", DeprecationWarning) 

203 result = super(Client, self).obtain_token_by_authorization_code( 

204 code, **kwargs) 

205 nonce_in_id_token = result.get("id_token_claims", {}).get("nonce") 

206 if "id_token_claims" in result and nonce and nonce != nonce_in_id_token: 

207 raise ValueError( 

208 'The nonce in id token ("%s") should match your nonce ("%s")' % 

209 (nonce_in_id_token, nonce)) 

210 return result 

211 

212 def initiate_auth_code_flow( 

213 self, 

214 scope=None, 

215 **kwargs): 

216 """Initiate an auth code flow. 

217 

218 It provides nonce protection automatically. 

219 

220 :param list scope: 

221 A list of strings, e.g. ["profile", "email", ...]. 

222 This method will automatically send ["openid"] to the wire, 

223 although it won't modify your input list. 

224 

225 See :func:`oauth2.Client.initiate_auth_code_flow` in parent class 

226 for descriptions on other parameters and return value. 

227 """ 

228 if "id_token" in kwargs.get("response_type", ""): 

229 # Implicit grant would cause auth response coming back in #fragment, 

230 # but fragment won't reach a web service. 

231 raise ValueError('response_type="id_token ..." is not allowed') 

232 _scope = list(scope) if scope else [] # We won't modify input parameter 

233 if "openid" not in _scope: 

234 # "If no openid scope value is present, 

235 # the request may still be a valid OAuth 2.0 request, 

236 # but is not an OpenID Connect request." -- OIDC Core Specs, 3.1.2.2 

237 # https://openid.net/specs/openid-connect-core-1_0.html#AuthRequestValidation 

238 # Here we just automatically add it. If the caller do not want id_token, 

239 # they should simply go with oauth2.Client. 

240 _scope.append("openid") 

241 nonce = "".join(random.sample(string.ascii_letters, 16)) 

242 flow = super(Client, self).initiate_auth_code_flow( 

243 scope=_scope, nonce=_nonce_hash(nonce), **kwargs) 

244 flow["nonce"] = nonce 

245 if kwargs.get("max_age") is not None: 

246 flow["max_age"] = kwargs["max_age"] 

247 return flow 

248 

249 def obtain_token_by_auth_code_flow(self, auth_code_flow, auth_response, **kwargs): 

250 """Validate the auth_response being redirected back, and then obtain tokens, 

251 including ID token which can be used for user sign in. 

252 

253 Internally, it implements nonce to mitigate replay attack. 

254 It also implements PKCE to mitigate the auth code interception attack. 

255 

256 See :func:`oauth2.Client.obtain_token_by_auth_code_flow` in parent class 

257 for descriptions on other parameters and return value. 

258 """ 

259 result = super(Client, self).obtain_token_by_auth_code_flow( 

260 auth_code_flow, auth_response, **kwargs) 

261 if "id_token_claims" in result: 

262 nonce_in_id_token = result.get("id_token_claims", {}).get("nonce") 

263 expected_hash = _nonce_hash(auth_code_flow["nonce"]) 

264 if nonce_in_id_token != expected_hash: 

265 raise RuntimeError( 

266 'The nonce in id token ("%s") should match our nonce ("%s")' % 

267 (nonce_in_id_token, expected_hash)) 

268 

269 if auth_code_flow.get("max_age") is not None: 

270 auth_time = result.get("id_token_claims", {}).get("auth_time") 

271 if not auth_time: 

272 raise RuntimeError( 

273 "13. max_age was requested, ID token should contain auth_time") 

274 now = int(time.time()) 

275 skew = 120 # 2 minutes. Hardcoded, for now 

276 if now - skew > auth_time + auth_code_flow["max_age"]: 

277 raise RuntimeError( 

278 "13. auth_time ({auth_time}) was requested, " 

279 "by using max_age ({max_age}) parameter, " 

280 "and now ({now}) too much time has elasped " 

281 "since last end-user authentication. " 

282 "The ID token was: {id_token}".format( 

283 auth_time=auth_time, 

284 max_age=auth_code_flow["max_age"], 

285 now=now, 

286 id_token=json.dumps(result["id_token_claims"], indent=2), 

287 )) 

288 return result 

289 

290 def obtain_token_by_browser( 

291 self, 

292 display=None, 

293 prompt=None, 

294 max_age=None, 

295 ui_locales=None, 

296 id_token_hint=None, # It is relevant, 

297 # because this library exposes raw ID token 

298 login_hint=None, 

299 acr_values=None, 

300 **kwargs): 

301 """A native app can use this method to obtain token via a local browser. 

302 

303 Internally, it implements nonce to mitigate replay attack. 

304 It also implements PKCE to mitigate the auth code interception attack. 

305 

306 :param string display: Defined in 

307 `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

308 :param string prompt: Defined in 

309 `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

310 You can find the valid string values defined in :class:`oidc.Prompt`. 

311 

312 :param int max_age: Defined in 

313 `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

314 :param string ui_locales: Defined in 

315 `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

316 :param string id_token_hint: Defined in 

317 `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

318 :param string login_hint: Defined in 

319 `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

320 :param string acr_values: Defined in 

321 `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

322 

323 See :func:`oauth2.Client.obtain_token_by_browser` in parent class 

324 for descriptions on other parameters and return value. 

325 """ 

326 filtered_params = {k:v for k, v in dict( 

327 prompt=" ".join(prompt) if isinstance(prompt, (list, tuple)) else prompt, 

328 display=display, 

329 max_age=max_age, 

330 ui_locales=ui_locales, 

331 id_token_hint=id_token_hint, 

332 login_hint=login_hint, 

333 acr_values=acr_values, 

334 ).items() if v is not None} # Filter out None values 

335 return super(Client, self).obtain_token_by_browser( 

336 auth_params=dict(kwargs.pop("auth_params", {}), **filtered_params), 

337 **kwargs) 

338