Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/oauthlib/oauth2/rfc6749/tokens.py: 27%

153 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:22 +0000

"""
oauthlib.oauth2.rfc6749.tokens
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This module contains methods for adding two types of access tokens to requests.

- Bearer https://tools.ietf.org/html/rfc6750
- MAC https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01
"""

10import hashlib 

11import hmac 

12import warnings 

13from binascii import b2a_base64 

14from urllib.parse import urlparse 

15 

16from oauthlib import common 

17from oauthlib.common import add_params_to_qs, add_params_to_uri 

18 

19from . import utils 

20 

21 

class OAuth2Token(dict):
    """Dictionary of token parameters that also tracks scope changes.

    The mapping content is exactly ``params``. In addition, the scope found
    in ``params`` (the "new" scope) and the ``old_scope`` passed by the caller
    are kept as sets so that they can be compared through the properties
    below.
    """

    def __init__(self, params, old_scope=None):
        """
        :param params: Token parameters. A truthy ``scope`` entry is parsed
                       with ``utils.scope_to_list`` and becomes the new scope.
        :param old_scope: Scope to compare against, or None when there is
                          none; parsed with ``utils.scope_to_list``.
        """
        super().__init__(params)
        self._new_scope = None
        if 'scope' in params and params['scope']:
            self._new_scope = set(utils.scope_to_list(params['scope']))
        if old_scope is not None:
            self._old_scope = set(utils.scope_to_list(old_scope))
            if self._new_scope is None:
                # the rfc says that if the scope hasn't changed, it's optional
                # in params so set the new scope to the old scope
                self._new_scope = self._old_scope
        else:
            # Nothing to compare against: old and new are the same object
            # (possibly None when params carried no scope either).
            self._old_scope = self._new_scope

    @property
    def scope_changed(self):
        """True when the new scope set differs from the old scope set."""
        return self._new_scope != self._old_scope

    @property
    def old_scope(self):
        """Old scope converted with ``utils.list_to_scope``."""
        return utils.list_to_scope(self._old_scope)

    @property
    def old_scopes(self):
        """Old scope as a list; empty when no scope is known."""
        # _old_scope is None when neither params nor old_scope had a scope.
        return list(self._old_scope or ())

    @property
    def scope(self):
        """New scope converted with ``utils.list_to_scope``."""
        return utils.list_to_scope(self._new_scope)

    @property
    def scopes(self):
        """New scope as a list; empty when no scope is known."""
        return list(self._new_scope or ())

    @property
    def missing_scopes(self):
        """Scopes present in the old scope but absent from the new one."""
        previous = self._old_scope or set()
        current = self._new_scope or set()
        return list(previous - current)

    @property
    def additional_scopes(self):
        """Scopes present in the new scope but absent from the old one."""
        previous = self._old_scope or set()
        current = self._new_scope or set()
        return list(current - previous)

65 

66 

def prepare_mac_header(token, uri, key, http_method,
                       nonce=None,
                       headers=None,
                       body=None,
                       ext='',
                       hash_algorithm='hmac-sha-1',
                       issue_time=None,
                       draft=0):
    """Add an `MAC Access Authentication`_ signature to headers.

    Unlike OAuth 1, this HMAC signature does not require inclusion of the
    request payload/body, neither does it use a combination of client_secret
    and token_secret but rather a mac_key provided together with the access
    token.

    Currently two algorithms are supported, "hmac-sha-1" and "hmac-sha-256",
    `extension algorithms`_ are not supported.

    Example MAC Authorization header, linebreaks added for clarity

    Authorization: MAC id="h480djs93hd8",
                       nonce="1336363200:dj83hs9s",
                       mac="bhCQXTVyfj5cmA9uKkPFx1zeOXM="

    .. _`MAC Access Authentication`: https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01
    .. _`extension algorithms`: https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01#section-7.1

    :param token: Value placed in the ``id`` attribute of the header.
    :param uri: Request URI.
    :param key: MAC key provided by the token endpoint (str or bytes).
    :param http_method: HTTP Request method.
    :param nonce: Nonce to use when ``draft`` is 0; generated when falsy.
                  For any other draft a fresh nonce is always generated.
    :param headers: Request headers as a dictionary. When given, it is
                    updated in place and returned.
    :param body: Request body (str or bytes); only hashed when ``draft`` is 0.
    :param ext: Optional extension string included in the signature.
    :param hash_algorithm: HMAC algorithm provided by token endpoint.
    :param issue_time: Time when the MAC credentials were issued (datetime).
    :param draft: MAC authentication specification version.
    :return: headers dictionary with the authorization field added.
    :raises ValueError: if ``hash_algorithm`` is not a supported algorithm.
    """
    http_method = http_method.upper()
    host, port = utils.host_from_uri(uri)

    algorithm = hash_algorithm.lower()
    if algorithm == 'hmac-sha-1':
        h = hashlib.sha1
    elif algorithm == 'hmac-sha-256':
        h = hashlib.sha256
    else:
        raise ValueError('unknown hash algorithm')

    if draft == 0:
        nonce = nonce or '{}:{}'.format(utils.generate_age(issue_time),
                                        common.generate_nonce())
    else:
        # Later drafts sign a timestamp plus a freshly generated nonce.
        ts = common.generate_timestamp()
        nonce = common.generate_nonce()

    sch, net, path, par, query, fra = urlparse(uri)

    if query:
        request_uri = path + '?' + query
    else:
        request_uri = path

    # Hash the body/payload
    if body is not None and draft == 0:
        # Accept both text and already-encoded bodies, as is done for `key`.
        if isinstance(body, str):
            body = body.encode('utf-8')
        bodyhash = b2a_base64(h(body).digest())[:-1].decode('utf-8')
    else:
        bodyhash = ''

    # Create the normalized base string
    base = []
    if draft == 0:
        base.append(nonce)
    else:
        base.append(ts)
        base.append(nonce)
    base.append(http_method)
    base.append(request_uri)
    base.append(host)
    base.append(port)
    if draft == 0:
        base.append(bodyhash)
    base.append(ext or '')
    base_string = '\n'.join(base) + '\n'

    # hmac struggles with unicode strings - http://bugs.python.org/issue5285
    if isinstance(key, str):
        key = key.encode('utf-8')
    sign = hmac.new(key, base_string.encode('utf-8'), h)
    sign = b2a_base64(sign.digest())[:-1].decode('utf-8')

    header = []
    header.append('MAC id="%s"' % token)
    if draft != 0:
        header.append('ts="%s"' % ts)
    header.append('nonce="%s"' % nonce)
    if bodyhash:
        header.append('bodyhash="%s"' % bodyhash)
    if ext:
        header.append('ext="%s"' % ext)
    header.append('mac="%s"' % sign)

    # Only substitute a new dict when no mapping was supplied, so that an
    # empty mapping passed by the caller is filled in rather than discarded.
    if headers is None:
        headers = {}
    headers['Authorization'] = ', '.join(header)
    return headers

174 

175 

def prepare_bearer_uri(token, uri):
    """Add a `Bearer Token`_ to the request URI.

    Not recommended, use only if client can't use authorization header or body.

    http://www.example.com/path?access_token=h480djs93hd8

    .. _`Bearer Token`: https://tools.ietf.org/html/rfc6750

    :param token: Value for the ``access_token`` parameter.
    :param uri: URI the parameter is added to.
    :return: Whatever ``add_params_to_uri`` returns for the extended URI.
    """
    token_param = ('access_token', token)
    return add_params_to_uri(uri, [token_param])

188 

189 

def prepare_bearer_headers(token, headers=None):
    """Add a `Bearer Token`_ to the request headers.

    Recommended method of passing bearer tokens.

    Authorization: Bearer h480djs93hd8

    .. _`Bearer Token`: https://tools.ietf.org/html/rfc6750

    :param token: Token placed after the ``Bearer`` scheme.
    :param headers: Existing headers mapping; it is updated in place. A new
                    dict is created only when None is given.
    :return: The headers mapping with the ``Authorization`` field set.
    """
    # Test for None rather than truthiness so that an empty mapping supplied
    # by the caller is filled in instead of being replaced by a new dict.
    if headers is None:
        headers = {}
    headers['Authorization'] = 'Bearer %s' % token
    return headers

204 

205 

def prepare_bearer_body(token, body=''):
    """Add a `Bearer Token`_ to the request body.

    access_token=h480djs93hd8

    .. _`Bearer Token`: https://tools.ietf.org/html/rfc6750

    :param token: Value for the ``access_token`` parameter.
    :param body: Existing body the parameter is added to.
    :return: Whatever ``add_params_to_qs`` returns for the extended body.
    """
    token_param = ('access_token', token)
    return add_params_to_qs(body, [token_param])

217 

218 

def random_token_generator(request, refresh_token=False):
    """Return a token produced by ``common.generate_token``.

    Neither argument is read here; they are only part of the signature.

    :param request: OAuthlib request.
    :type request: oauthlib.common.Request
    :param refresh_token: Unused.
    """
    fresh_token = common.generate_token()
    return fresh_token

226 

227 

def signed_token_generator(private_pem, **kwargs):
    """Build a token generator bound to ``private_pem``.

    :param private_pem: Passed to ``common.generate_signed_token`` on every
                        call of the returned generator.
    :param kwargs: Stored on each request as ``request.claims``.
    :return: A callable taking a request and returning the signed token.
    """
    def signed_token_generator(request):
        # Put the configured claims on the request before signing it.
        request.claims = kwargs
        signed = common.generate_signed_token(private_pem, request)
        return signed

    return signed_token_generator

237 

238 

def get_token_from_header(request):
    """
    Helper function to extract a token from the request header.

    Without an ``Authorization`` header, ``request.access_token`` is
    returned instead.

    :param request: OAuthlib request.
    :type request: oauthlib.common.Request
    :return: Return the token or None if the Authorization header is malformed.
    """
    request_headers = request.headers
    if 'Authorization' not in request_headers:
        return request.access_token

    # A usable header is exactly "<scheme> <credentials>" with a
    # case-insensitive "bearer" scheme; anything else yields None.
    parts = request_headers.get('Authorization').split()
    if len(parts) != 2:
        return None
    scheme, credentials = parts
    return credentials if scheme.lower() == 'bearer' else None

257 

258 

class TokenBase:
    """Interface for token types; every method must be overridden."""

    __slots__ = ()

    def __call__(self, request, refresh_token=False):
        """Not implemented here.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :param refresh_token:
        """
        raise NotImplementedError('Subclasses must implement this method.')

    def validate_request(self, request):
        """Not implemented here.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        """
        raise NotImplementedError('Subclasses must implement this method.')

    def estimate_type(self, request):
        """Not implemented here.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        """
        raise NotImplementedError('Subclasses must implement this method.')

278 

279 

class BearerToken(TokenBase):
    """Token type that issues and validates Bearer tokens."""

    __slots__ = (
        'request_validator', 'token_generator',
        'refresh_token_generator', 'expires_in'
    )

    def __init__(self, request_validator=None, token_generator=None,
                 expires_in=None, refresh_token_generator=None):
        """
        :param request_validator: Object providing ``rotate_refresh_token``
                                  and ``validate_bearer_token``.
        :param token_generator: Callable taking the request and returning an
                                access token; defaults to
                                ``random_token_generator``.
        :param expires_in: Lifetime value, or a callable taking the request
                           and returning it. Falsy values fall back to 3600.
        :param refresh_token_generator: Callable taking the request and
                                        returning a refresh token; defaults
                                        to the access token generator.
        """
        self.request_validator = request_validator
        self.token_generator = token_generator or random_token_generator
        self.refresh_token_generator = (
            refresh_token_generator or self.token_generator
        )
        self.expires_in = expires_in or 3600

    def create_token(self, request, refresh_token=False, **kwargs):
        """
        Create a BearerToken, by default without refresh token.

        The resolved lifetime is also stored on ``request.expires_in``.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :param refresh_token: When true, a ``refresh_token`` entry is added.
        :param kwargs: Only ``save_token`` is recognised, and it merely
                       triggers a DeprecationWarning.
        :return: An ``OAuth2Token`` built from the token parameters.
        """
        if "save_token" in kwargs:
            # stacklevel=2 attributes the warning to the caller's line.
            warnings.warn("`save_token` has been deprecated, it was not called internally. "
                          "If you do, call `request_validator.save_token()` instead.",
                          DeprecationWarning, stacklevel=2)

        if callable(self.expires_in):
            expires_in = self.expires_in(request)
        else:
            expires_in = self.expires_in

        request.expires_in = expires_in

        token = {
            'access_token': self.token_generator(request),
            'expires_in': expires_in,
            'token_type': 'Bearer',
        }

        # If provided, include - this is optional in some cases https://tools.ietf.org/html/rfc6749#section-3.3 but
        # there is currently no mechanism to coordinate issuing a token for only a subset of the requested scopes so
        # all tokens issued are for the entire set of requested scopes.
        if request.scopes is not None:
            token['scope'] = ' '.join(request.scopes)

        if refresh_token:
            # Reuse the presented refresh token unless the validator asks for
            # rotation; otherwise generate a new one.
            if (request.refresh_token and
                    not self.request_validator.rotate_refresh_token(request)):
                token['refresh_token'] = request.refresh_token
            else:
                token['refresh_token'] = self.refresh_token_generator(request)

        # Extra credentials are applied last, so they override earlier keys.
        token.update(request.extra_credentials or {})
        return OAuth2Token(token)

    def validate_request(self, request):
        """Validate the request's bearer token through the request validator.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :return: Result of ``request_validator.validate_bearer_token``.
        """
        token = get_token_from_header(request)
        return self.request_validator.validate_bearer_token(
            token, request.scopes, request)

    def estimate_type(self, request):
        """Score how likely the request carries a Bearer token.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :return: 9 when the Authorization header uses the ``bearer`` scheme,
                 5 when ``request.access_token`` is set, otherwise 0.
        """
        if request.headers.get('Authorization', '').split(' ')[0].lower() == 'bearer':
            return 9
        elif request.access_token is not None:
            return 5
        else:
            return 0