Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/oauthlib/oauth2/rfc6749/tokens.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

150 statements  

1""" 

2oauthlib.oauth2.rfc6749.tokens 

3~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

4 

5This module contains methods for adding two types of access tokens to requests. 

6 

7- Bearer https://tools.ietf.org/html/rfc6750 

8- MAC https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01 

9""" 

10import hashlib 

11import hmac 

12import warnings 

13from binascii import b2a_base64 

14from urllib.parse import urlparse 

15 

16from oauthlib import common 

17from oauthlib.common import add_params_to_qs, add_params_to_uri 

18 

19from . import utils 

20 

21 

22class OAuth2Token(dict): 

23 

24 def __init__(self, params, old_scope=None): 

25 super().__init__(params) 

26 self._new_scope = None 

27 if params.get('scope'): 

28 self._new_scope = set(utils.scope_to_list(params['scope'])) 

29 if old_scope is not None: 

30 self._old_scope = set(utils.scope_to_list(old_scope)) 

31 if self._new_scope is None: 

32 # the rfc says that if the scope hasn't changed, it's optional 

33 # in params so set the new scope to the old scope 

34 self._new_scope = self._old_scope 

35 else: 

36 self._old_scope = self._new_scope 

37 

38 @property 

39 def scope_changed(self): 

40 return self._new_scope != self._old_scope 

41 

42 @property 

43 def old_scope(self): 

44 return utils.list_to_scope(self._old_scope) 

45 

46 @property 

47 def old_scopes(self): 

48 return list(self._old_scope) 

49 

50 @property 

51 def scope(self): 

52 return utils.list_to_scope(self._new_scope) 

53 

54 @property 

55 def scopes(self): 

56 return list(self._new_scope) 

57 

58 @property 

59 def missing_scopes(self): 

60 return list(self._old_scope - self._new_scope) 

61 

62 @property 

63 def additional_scopes(self): 

64 return list(self._new_scope - self._old_scope) 

65 

66 

67def prepare_mac_header(token, uri, key, http_method, 

68 nonce=None, 

69 headers=None, 

70 body=None, 

71 ext='', 

72 hash_algorithm='hmac-sha-1', 

73 issue_time=None, 

74 draft=0): 

75 """Add an `MAC Access Authentication`_ signature to headers. 

76 

77 Unlike OAuth 1, this HMAC signature does not require inclusion of the 

78 request payload/body, neither does it use a combination of client_secret 

79 and token_secret but rather a mac_key provided together with the access 

80 token. 

81 

82 Currently two algorithms are supported, "hmac-sha-1" and "hmac-sha-256", 

83 `extension algorithms`_ are not supported. 

84 

85 Example MAC Authorization header, linebreaks added for clarity 

86 

87 Authorization: MAC id="h480djs93hd8", 

88 nonce="1336363200:dj83hs9s", 

89 mac="bhCQXTVyfj5cmA9uKkPFx1zeOXM=" 

90 

91 .. _`MAC Access Authentication`: https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01 

92 .. _`extension algorithms`: https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01#section-7.1 

93 

94 :param token: 

95 :param uri: Request URI. 

96 :param key: MAC given provided by token endpoint. 

97 :param http_method: HTTP Request method. 

98 :param nonce: 

99 :param headers: Request headers as a dictionary. 

100 :param body: 

101 :param ext: 

102 :param hash_algorithm: HMAC algorithm provided by token endpoint. 

103 :param issue_time: Time when the MAC credentials were issued (datetime). 

104 :param draft: MAC authentication specification version. 

105 :return: headers dictionary with the authorization field added. 

106 """ 

107 http_method = http_method.upper() 

108 host, port = utils.host_from_uri(uri) 

109 

110 if hash_algorithm.lower() == 'hmac-sha-1': 

111 h = hashlib.sha1 

112 elif hash_algorithm.lower() == 'hmac-sha-256': 

113 h = hashlib.sha256 

114 else: 

115 raise ValueError('unknown hash algorithm') 

116 

117 if draft == 0: 

118 nonce = nonce or '{}:{}'.format(utils.generate_age(issue_time), 

119 common.generate_nonce()) 

120 else: 

121 ts = common.generate_timestamp() 

122 nonce = common.generate_nonce() 

123 

124 sch, net, path, par, query, fra = urlparse(uri) 

125 

126 request_uri = path + '?' + query if query else path 

127 

128 # Hash the body/payload 

129 if body is not None and draft == 0: 

130 body = body.encode('utf-8') 

131 bodyhash = b2a_base64(h(body).digest())[:-1].decode('utf-8') 

132 else: 

133 bodyhash = '' 

134 

135 # Create the normalized base string 

136 base = [] 

137 if draft == 0: 

138 base.append(nonce) 

139 else: 

140 base.append(ts) 

141 base.append(nonce) 

142 base.append(http_method.upper()) 

143 base.append(request_uri) 

144 base.append(host) 

145 base.append(port) 

146 if draft == 0: 

147 base.append(bodyhash) 

148 base.append(ext or '') 

149 base_string = '\n'.join(base) + '\n' 

150 

151 # hmac struggles with unicode strings - http://bugs.python.org/issue5285 

152 if isinstance(key, str): 

153 key = key.encode('utf-8') 

154 sign = hmac.new(key, base_string.encode('utf-8'), h) 

155 sign = b2a_base64(sign.digest())[:-1].decode('utf-8') 

156 

157 header = [] 

158 header.append('MAC id="%s"' % token) 

159 if draft != 0: 

160 header.append('ts="%s"' % ts) 

161 header.append('nonce="%s"' % nonce) 

162 if bodyhash: 

163 header.append('bodyhash="%s"' % bodyhash) 

164 if ext: 

165 header.append('ext="%s"' % ext) 

166 header.append('mac="%s"' % sign) 

167 

168 headers = headers or {} 

169 headers['Authorization'] = ', '.join(header) 

170 return headers 

171 

172 

173def prepare_bearer_uri(token, uri): 

174 """Add a `Bearer Token`_ to the request URI. 

175 Not recommended, use only if client can't use authorization header or body. 

176 

177 http://www.example.com/path?access_token=h480djs93hd8 

178 

179 .. _`Bearer Token`: https://tools.ietf.org/html/rfc6750 

180 

181 :param token: 

182 :param uri: 

183 """ 

184 return add_params_to_uri(uri, [(('access_token', token))]) 

185 

186 

187def prepare_bearer_headers(token, headers=None): 

188 """Add a `Bearer Token`_ to the request URI. 

189 Recommended method of passing bearer tokens. 

190 

191 Authorization: Bearer h480djs93hd8 

192 

193 .. _`Bearer Token`: https://tools.ietf.org/html/rfc6750 

194 

195 :param token: 

196 :param headers: 

197 """ 

198 headers = headers or {} 

199 headers['Authorization'] = 'Bearer %s' % token 

200 return headers 

201 

202 

203def prepare_bearer_body(token, body=''): 

204 """Add a `Bearer Token`_ to the request body. 

205 

206 access_token=h480djs93hd8 

207 

208 .. _`Bearer Token`: https://tools.ietf.org/html/rfc6750 

209 

210 :param token: 

211 :param body: 

212 """ 

213 return add_params_to_qs(body, [(('access_token', token))]) 

214 

215 

216def random_token_generator(request, refresh_token=False): 

217 """ 

218 :param request: OAuthlib request. 

219 :type request: oauthlib.common.Request 

220 :param refresh_token: 

221 """ 

222 return common.generate_token() 

223 

224 

225def signed_token_generator(private_pem, **kwargs): 

226 """ 

227 :param private_pem: 

228 """ 

229 def signed_token_generator(request): 

230 request.claims = kwargs 

231 return common.generate_signed_token(private_pem, request) 

232 

233 return signed_token_generator 

234 

235 

236def get_token_from_header(request): 

237 """ 

238 Helper function to extract a token from the request header. 

239 

240 :param request: OAuthlib request. 

241 :type request: oauthlib.common.Request 

242 :return: Return the token or None if the Authorization header is malformed. 

243 """ 

244 token = None 

245 

246 if 'Authorization' in request.headers: 

247 split_header = request.headers.get('Authorization').split() 

248 if len(split_header) == 2 and split_header[0].lower() == 'bearer': 

249 token = split_header[1] 

250 else: 

251 token = request.access_token 

252 

253 return token 

254 

255 

256class TokenBase: 

257 __slots__ = () 

258 

259 def __call__(self, request, refresh_token=False): 

260 raise NotImplementedError('Subclasses must implement this method.') 

261 

262 def validate_request(self, request): 

263 """ 

264 :param request: OAuthlib request. 

265 :type request: oauthlib.common.Request 

266 """ 

267 raise NotImplementedError('Subclasses must implement this method.') 

268 

269 def estimate_type(self, request): 

270 """ 

271 :param request: OAuthlib request. 

272 :type request: oauthlib.common.Request 

273 """ 

274 raise NotImplementedError('Subclasses must implement this method.') 

275 

276 

277class BearerToken(TokenBase): 

278 __slots__ = ( 

279 'request_validator', 'token_generator', 

280 'refresh_token_generator', 'expires_in' 

281 ) 

282 

283 def __init__(self, request_validator=None, token_generator=None, 

284 expires_in=None, refresh_token_generator=None): 

285 self.request_validator = request_validator 

286 self.token_generator = token_generator or random_token_generator 

287 self.refresh_token_generator = ( 

288 refresh_token_generator or self.token_generator 

289 ) 

290 self.expires_in = expires_in or 3600 

291 

292 def create_token(self, request, refresh_token=False, **kwargs): 

293 """ 

294 Create a BearerToken, by default without refresh token. 

295 

296 :param request: OAuthlib request. 

297 :type request: oauthlib.common.Request 

298 :param refresh_token: 

299 """ 

300 if "save_token" in kwargs: 

301 warnings.warn("`save_token` has been deprecated, it was not called internally." 

302 "If you do, call `request_validator.save_token()` instead.", 

303 DeprecationWarning) 

304 

305 expires_in = self.expires_in(request) if callable(self.expires_in) else self.expires_in 

306 

307 request.expires_in = expires_in 

308 

309 token = { 

310 'access_token': self.token_generator(request), 

311 'expires_in': expires_in, 

312 'token_type': 'Bearer', 

313 } 

314 

315 # If provided, include - this is optional in some cases https://tools.ietf.org/html/rfc6749#section-3.3 but 

316 # there is currently no mechanism to coordinate issuing a token for only a subset of the requested scopes so 

317 # all tokens issued are for the entire set of requested scopes. 

318 if request.scopes is not None: 

319 token['scope'] = ' '.join(request.scopes) 

320 

321 if refresh_token: 

322 if (request.refresh_token and 

323 not self.request_validator.rotate_refresh_token(request)): 

324 token['refresh_token'] = request.refresh_token 

325 else: 

326 token['refresh_token'] = self.refresh_token_generator(request) 

327 

328 token.update(request.extra_credentials or {}) 

329 return OAuth2Token(token) 

330 

331 def validate_request(self, request): 

332 """ 

333 :param request: OAuthlib request. 

334 :type request: oauthlib.common.Request 

335 """ 

336 token = get_token_from_header(request) 

337 return self.request_validator.validate_bearer_token( 

338 token, request.scopes, request) 

339 

340 def estimate_type(self, request): 

341 """ 

342 :param request: OAuthlib request. 

343 :type request: oauthlib.common.Request 

344 """ 

345 if request.headers.get('Authorization', '').split(' ')[0].lower() == 'bearer': 

346 return 9 

347 elif request.access_token is not None: 

348 return 5 

349 else: 

350 return 0