Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/oauth2cli/oidc.py: 28%

94 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:20 +0000

1import json 

2import base64 

3import time 

4import random 

5import string 

6import warnings 

7import hashlib 

8 

9from . import oauth2 

10 

def decode_part(raw, encoding="utf-8"):
    """Decode one dot-separated segment of a JWT.

    JWT segments are base64url-encoded with the trailing padding removed,
    per `JWS specs <https://tools.ietf.org/html/rfc7515#appendix-C>`_.

    :param raw:
        The padding-less base64url text of a single JWT segment.
    :param encoding:
        Codec used to turn the decoded bytes into text.
        The default "utf-8" suits the first 2 segments of a JWT,
        i.e. the header and the payload.
        The last segment, i.e. the signature, is binary,
        so pass `None` to receive the raw bytes instead.
    :return: The decoded text, or the decoded bytes when ``encoding`` is falsy.
    """
    # Restore the stripped "=" padding: https://stackoverflow.com/a/32517907/728675
    missing_padding = -len(raw) % 4
    # On Python 2.7, urlsafe_b64decode() needs str rather than unicode,
    # hence the str() conversion. It is not required on Python 3.
    padded = str(raw + "=" * missing_padding)
    decoded_bytes = base64.urlsafe_b64decode(padded)
    if not encoding:
        return decoded_bytes
    return decoded_bytes.decode(encoding)


base64decode = decode_part  # Obsolete. For backward compatibility only.

34 

def decode_id_token(id_token, client_id=None, issuer=None, nonce=None, now=None):
    """Decode an id_token, validate its claims, and return them as a dictionary.

    ID token claims would at least contain: "iss", "sub", "aud", "exp", "iat",
    per `specs <https://openid.net/specs/openid-connect-core-1_0.html#IDToken>`_
    and it may contain other optional content such as "preferred_username",
    `maybe more <https://openid.net/specs/openid-connect-core-1_0.html#Claims>`_

    The signature segment is not inspected here; only the payload is decoded.

    :param id_token: The dot-separated JWT text.
    :param client_id: When truthy, the "aud" claim must equal or contain it.
    :param issuer: When truthy, the "iss" claim must equal it.
    :param nonce: When truthy, the "nonce" claim must equal it.
    :param now:
        Epoch seconds to validate against. A falsy value means "use
        the current time".
    :return: The decoded claims.
    :raises RuntimeError: When any of the checks fails.
    :raises KeyError:
        When "exp" is absent, or when "iss"/"aud" is absent while the
        corresponding ``issuer``/``client_id`` was provided.
    """
    payload_segment = id_token.split('.')[1]
    claims = json.loads(decode_part(payload_segment))
    current_time = int(now or time.time())
    tolerance = 120  # 2 minutes
    time_hint = "Make sure your computer's time and time zone are both correct."

    # The numbered checks follow
    # https://openid.net/specs/openid-connect-core-1_0.html#IDTokenValidation
    # All checks run; when more than one fails, the last failure is reported.
    failure = None

    # nbf is optional per JWT specs, so default to a value already in the past
    not_before = claims.get("nbf", current_time - 1)
    if current_time + tolerance < not_before:
        # This is not an ID token validation, but a JWT validation
        # https://tools.ietf.org/html/rfc7519#section-4.1.5
        failure = "0. The ID token is not yet valid. " + time_hint

    if issuer and issuer != claims["iss"]:
        # https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderConfigurationResponse
        failure = (
            '2. The Issuer Identifier for the OpenID Provider, "%s", '
            "(which is typically obtained during Discovery), "
            "MUST exactly match the value of the iss (issuer) Claim.") % issuer

    if client_id:
        audience = claims["aud"]
        if isinstance(audience, list):
            audience_ok = client_id in audience
        else:
            audience_ok = client_id == audience
        if not audience_ok:
            # Some IdP accepts wrong casing request but issues right casing IDT
            failure = (
                "3. The aud (audience) claim must contain this client's client_id "
                '"%s", case-sensitively. Was your client_id in wrong casing?'
                ) % client_id

    # Per specs:
    # 6. If the ID Token is received via direct communication between
    # the Client and the Token Endpoint (which it is during _obtain_token()),
    # the TLS server validation MAY be used to validate the issuer
    # in place of checking the token signature.

    if current_time - tolerance > claims["exp"]:
        failure = "9. The ID token already expires. " + time_hint

    if nonce and nonce != claims.get("nonce"):
        failure = (
            "11. Nonce must be the same value "
            "as the one that was sent in the Authentication Request.")

    if failure:
        details = (failure, current_time, json.dumps(claims, indent=2))
        raise RuntimeError(
            "%s Current epoch = %s.  The id_token was: %s" % details)
    return claims

80 

81 

def _nonce_hash(nonce):
    """Return the hexadecimal SHA-256 digest of an ASCII-only nonce string.

    See https://openid.net/specs/openid-connect-core-1_0.html#NonceNotes
    """
    nonce_bytes = nonce.encode("ascii")
    digest = hashlib.sha256(nonce_bytes)
    return digest.hexdigest()

85 

86 

class Prompt(object):
    """Constant strings usable as the value of the ``prompt`` parameter.

    Unless noted otherwise, the values are based on
    https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
    """

    NONE = "none"
    LOGIN = "login"
    CONSENT = "consent"
    SELECT_ACCOUNT = "select_account"

    # Defined in https://openid.net/specs/openid-connect-prompt-create-1_0.html#PromptParameter
    CREATE = "create"

98 

99 

class Client(oauth2.Client):
    """OpenID Connect is a layer on top of the OAuth2.

    See its specs at https://openid.net/connect/
    """

    def decode_id_token(self, id_token, nonce=None):
        """See :func:`~decode_id_token`.

        The audience check uses ``self.client_id`` and the issuer check uses
        the "issuer" entry of ``self.configuration`` (skipped when absent).
        """
        return decode_id_token(
            id_token, nonce=nonce,
            client_id=self.client_id, issuer=self.configuration.get("issuer"))

    def _obtain_token(self, grant_type, *args, **kwargs):
        """The result will also contain one more key "id_token_claims",
        whose value will be a dictionary returned by :func:`~decode_id_token`.

        The extra key is only added when the result contains "id_token".
        """
        ret = super(Client, self)._obtain_token(grant_type, *args, **kwargs)
        if "id_token" in ret:
            # The nonce is not checked here; the public obtain_token_by_...()
            # methods of this class compare it afterwards.
            ret["id_token_claims"] = self.decode_id_token(ret["id_token"])
        return ret

    def build_auth_request_uri(self, response_type, nonce=None, **kwargs):
        """Generate an authorization uri to be visited by resource owner.

        Deprecated. Use :func:`initiate_auth_code_flow` instead.

        Return value and all other parameters are the same as
        :func:`oauth2.Client.build_auth_request_uri`, plus new parameter(s):

        :param nonce:
            A hard-to-guess string used to mitigate replay attacks. See also
            `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
        """
        warnings.warn("Use initiate_auth_code_flow() instead", DeprecationWarning)
        return super(Client, self).build_auth_request_uri(
            response_type, nonce=nonce, **kwargs)

    def obtain_token_by_authorization_code(self, code, nonce=None, **kwargs):
        """Get a token via authorization code. a.k.a. Authorization Code Grant.

        Deprecated. Use :func:`obtain_token_by_auth_code_flow` instead.

        Return value and all other parameters are the same as
        :func:`oauth2.Client.obtain_token_by_authorization_code`,
        plus new parameter(s):

        :param nonce:
            If you provided a nonce when calling :func:`build_auth_request_uri`,
            same nonce should also be provided here, so that we'll validate it.
            An exception will be raised if the nonce in id token mismatches.
        :raises ValueError:
            When the result carries "id_token_claims" whose nonce differs
            from the ``nonce`` given here.
        """
        warnings.warn(
            "Use obtain_token_by_auth_code_flow() instead", DeprecationWarning)
        result = super(Client, self).obtain_token_by_authorization_code(
            code, **kwargs)
        nonce_in_id_token = result.get("id_token_claims", {}).get("nonce")
        if "id_token_claims" in result and nonce and nonce != nonce_in_id_token:
            raise ValueError(
                'The nonce in id token ("%s") should match your nonce ("%s")' %
                (nonce_in_id_token, nonce))
        return result

    def initiate_auth_code_flow(
            self,
            scope=None,
            **kwargs):
        """Initiate an auth code flow.

        It provides nonce protection automatically.

        :param list scope:
            A list of strings, e.g. ["profile", "email", ...].
            This method will automatically send ["openid"] to the wire,
            although it won't modify your input list.

        See :func:`oauth2.Client.initiate_auth_code_flow` in parent class
        for descriptions on other parameters and return value.
        On top of that, the returned flow also carries a "nonce" key holding
        the un-hashed nonce, and a "max_age" key when ``max_age`` was given.

        :raises ValueError: When ``response_type`` contains "id_token".
        """
        if "id_token" in kwargs.get("response_type", ""):
            # Implicit grant would cause auth response coming back in #fragment,
            # but fragment won't reach a web service.
            raise ValueError('response_type="id_token ..." is not allowed')
        _scope = list(scope) if scope else []  # We won't modify input parameter
        if "openid" not in _scope:
            # "If no openid scope value is present,
            # the request may still be a valid OAuth 2.0 request,
            # but is not an OpenID Connect request." -- OIDC Core Specs, 3.1.2.2
            # https://openid.net/specs/openid-connect-core-1_0.html#AuthRequestValidation
            # Here we just automatically add it. If the caller do not want id_token,
            # they should simply go with oauth2.Client.
            _scope.append("openid")
        # The nonce is a replay-protection secret, so draw it from the
        # operating system's CSPRNG instead of the default, predictable
        # Mersenne Twister generator. Letters may repeat, which keeps the full
        # entropy of 16 independent picks.
        secure_random = random.SystemRandom()
        nonce = "".join(
            secure_random.choice(string.ascii_letters) for _ in range(16))
        # Only the hash of the nonce goes on the wire;
        # the plain nonce stays in the flow for the later comparison.
        flow = super(Client, self).initiate_auth_code_flow(
            scope=_scope, nonce=_nonce_hash(nonce), **kwargs)
        flow["nonce"] = nonce
        if kwargs.get("max_age") is not None:
            # Remembered so that auth_time can be validated afterwards
            flow["max_age"] = kwargs["max_age"]
        return flow

    def obtain_token_by_auth_code_flow(self, auth_code_flow, auth_response, **kwargs):
        """Validate the auth_response being redirected back, and then obtain tokens,
        including ID token which can be used for user sign in.

        Internally, it implements nonce to mitigate replay attack.
        It also implements PKCE to mitigate the auth code interception attack.

        See :func:`oauth2.Client.obtain_token_by_auth_code_flow` in parent class
        for descriptions on other parameters and return value.

        :raises RuntimeError:
            When the nonce in the ID token does not match the hash of the
            nonce stored in ``auth_code_flow``, or when ``max_age`` was
            requested and "auth_time" is missing or too old.
        """
        result = super(Client, self).obtain_token_by_auth_code_flow(
            auth_code_flow, auth_response, **kwargs)
        if "id_token_claims" in result:
            nonce_in_id_token = result.get("id_token_claims", {}).get("nonce")
            # The hash, not the plain nonce, was sent in the auth request
            expected_hash = _nonce_hash(auth_code_flow["nonce"])
            if nonce_in_id_token != expected_hash:
                raise RuntimeError(
                    'The nonce in id token ("%s") should match our nonce ("%s")' %
                    (nonce_in_id_token, expected_hash))

            if auth_code_flow.get("max_age") is not None:
                auth_time = result.get("id_token_claims", {}).get("auth_time")
                if not auth_time:
                    raise RuntimeError(
                        "13. max_age was requested, ID token should contain auth_time")
                now = int(time.time())
                skew = 120  # 2 minutes. Hardcoded, for now
                if now - skew > auth_time + auth_code_flow["max_age"]:
                    raise RuntimeError(
                        "13. auth_time ({auth_time}) was requested, "
                        "by using max_age ({max_age}) parameter, "
                        "and now ({now}) too much time has elasped "
                        "since last end-user authentication. "
                        "The ID token was: {id_token}".format(
                            auth_time=auth_time,
                            max_age=auth_code_flow["max_age"],
                            now=now,
                            id_token=json.dumps(result["id_token_claims"], indent=2),
                        ))
        return result

    def obtain_token_by_browser(
            self,
            display=None,
            prompt=None,
            max_age=None,
            ui_locales=None,
            id_token_hint=None,  # It is relevant,
                # because this library exposes raw ID token
            login_hint=None,
            acr_values=None,
            **kwargs):
        """A native app can use this method to obtain token via a local browser.

        Internally, it implements nonce to mitigate replay attack.
        It also implements PKCE to mitigate the auth code interception attack.

        :param string display: Defined in
            `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
        :param string prompt: Defined in
            `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
            You can find the valid string values defined in :class:`oidc.Prompt`.
            A list or tuple of such strings is also accepted;
            it will be joined by spaces.

        :param int max_age: Defined in
            `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
        :param string ui_locales: Defined in
            `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
        :param string id_token_hint: Defined in
            `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
        :param string login_hint: Defined in
            `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
        :param string acr_values: Defined in
            `OIDC <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.

        See :func:`oauth2.Client.obtain_token_by_browser` in parent class
        for descriptions on other parameters and return value.
        """
        filtered_params = {k: v for k, v in dict(
            prompt=" ".join(prompt) if isinstance(prompt, (list, tuple)) else prompt,
            display=display,
            max_age=max_age,
            ui_locales=ui_locales,
            id_token_hint=id_token_hint,
            login_hint=login_hint,
            acr_values=acr_values,
            ).items() if v is not None}  # Filter out None values
        # The OIDC parameters above take precedence over same-named entries
        # of a caller-supplied auth_params.
        return super(Client, self).obtain_token_by_browser(
            auth_params=dict(kwargs.pop("auth_params", {}), **filtered_params),
            **kwargs)

284