Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/oauthlib/oauth2/rfc6749/tokens.py: 27%
153 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:22 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:22 +0000
1"""
2oauthlib.oauth2.rfc6749.tokens
3~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5This module contains methods for adding two types of access tokens to requests.
7- Bearer https://tools.ietf.org/html/rfc6750
8- MAC https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01
9"""
10import hashlib
11import hmac
12import warnings
13from binascii import b2a_base64
14from urllib.parse import urlparse
16from oauthlib import common
17from oauthlib.common import add_params_to_qs, add_params_to_uri
19from . import utils
class OAuth2Token(dict):
    """Dictionary of token parameters that also tracks scope changes.

    The mapping content is exactly ``params``. In addition the instance keeps
    two scope sets: the scope carried by ``params`` (the "new" scope) and the
    scope passed as ``old_scope``. When only one of the two is supplied, the
    other is set to the same value, so the scope is reported as unchanged.
    When neither is supplied both are ``None`` internally.
    """

    def __init__(self, params, old_scope=None):
        """
        :param params: Token parameters; used to initialise the dictionary.
        :param old_scope: Previously known scope, or None when there is none.
        """
        super().__init__(params)
        self._new_scope = None
        if 'scope' in params and params['scope']:
            self._new_scope = set(utils.scope_to_list(params['scope']))
        if old_scope is not None:
            self._old_scope = set(utils.scope_to_list(old_scope))
            if self._new_scope is None:
                # the rfc says that if the scope hasn't changed, it's optional
                # in params so set the new scope to the old scope
                self._new_scope = self._old_scope
        else:
            self._old_scope = self._new_scope

    @property
    def scope_changed(self):
        """True when the new scope set differs from the old scope set."""
        return self._new_scope != self._old_scope

    @property
    def old_scope(self):
        """Old scope as converted by ``utils.list_to_scope``."""
        return utils.list_to_scope(self._old_scope)

    @property
    def old_scopes(self):
        """Old scope as a list; empty when no scope is known."""
        # ``_old_scope`` is None when neither params nor old_scope had a
        # scope; list(None) would raise TypeError.
        return list(self._old_scope or ())

    @property
    def scope(self):
        """New scope as converted by ``utils.list_to_scope``."""
        return utils.list_to_scope(self._new_scope)

    @property
    def scopes(self):
        """New scope as a list; empty when no scope is known."""
        return list(self._new_scope or ())

    @property
    def missing_scopes(self):
        """Scopes present in the old scope but absent from the new one."""
        return list((self._old_scope or set()) - (self._new_scope or set()))

    @property
    def additional_scopes(self):
        """Scopes present in the new scope but absent from the old one."""
        return list((self._new_scope or set()) - (self._old_scope or set()))
def prepare_mac_header(token, uri, key, http_method,
                       nonce=None,
                       headers=None,
                       body=None,
                       ext='',
                       hash_algorithm='hmac-sha-1',
                       issue_time=None,
                       draft=0):
    """Add an `MAC Access Authentication`_ signature to headers.

    Unlike OAuth 1, this HMAC signature does not require inclusion of the
    request payload/body, neither does it use a combination of client_secret
    and token_secret but rather a mac_key provided together with the access
    token.

    Currently two algorithms are supported, "hmac-sha-1" and "hmac-sha-256",
    `extension algorithms`_ are not supported.

    Example MAC Authorization header, linebreaks added for clarity

    Authorization: MAC id="h480djs93hd8",
                       nonce="1336363200:dj83hs9s",
                       mac="bhCQXTVyfj5cmA9uKkPFx1zeOXM="

    .. _`MAC Access Authentication`: https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01
    .. _`extension algorithms`: https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01#section-7.1

    :param token: Value placed in the ``id`` attribute of the header.
    :param uri: Request URI.
    :param key: MAC key provided by token endpoint (str or bytes).
    :param http_method: HTTP Request method.
    :param nonce: Nonce to use when ``draft`` is 0; generated when falsy.
    :param headers: Request headers as a dictionary.
    :param body: Request body (str or bytes); only hashed when ``draft`` is 0.
    :param ext: Optional extension string included in the signature.
    :param hash_algorithm: HMAC algorithm provided by token endpoint.
    :param issue_time: Time when the MAC credentials were issued (datetime).
    :param draft: MAC authentication specification version.
    :return: headers dictionary with the authorization field added.
    :raises ValueError: if ``hash_algorithm`` is not a supported algorithm.
    """
    http_method = http_method.upper()
    host, port = utils.host_from_uri(uri)

    algorithm = hash_algorithm.lower()
    if algorithm == 'hmac-sha-1':
        h = hashlib.sha1
    elif algorithm == 'hmac-sha-256':
        h = hashlib.sha256
    else:
        raise ValueError('unknown hash algorithm')

    if draft == 0:
        nonce = nonce or '{}:{}'.format(utils.generate_age(issue_time),
                                        common.generate_nonce())
    else:
        # NOTE(review): for draft != 0 a caller-supplied nonce is discarded
        # and a fresh one is always generated -- confirm this is intended.
        ts = common.generate_timestamp()
        nonce = common.generate_nonce()

    # Only the path and query take part in the signature.
    parsed = urlparse(uri)
    if parsed.query:
        request_uri = parsed.path + '?' + parsed.query
    else:
        request_uri = parsed.path

    # Hash the body/payload
    if body is not None and draft == 0:
        # Accept bytes as well as str, mirroring how ``key`` is handled below.
        if isinstance(body, str):
            body = body.encode('utf-8')
        bodyhash = b2a_base64(h(body).digest())[:-1].decode('utf-8')
    else:
        bodyhash = ''

    # Create the normalized base string
    base = [nonce] if draft == 0 else [ts, nonce]
    base.extend([http_method, request_uri, host, port])
    if draft == 0:
        base.append(bodyhash)
    base.append(ext or '')
    base_string = '\n'.join(base) + '\n'

    # hmac struggles with unicode strings - http://bugs.python.org/issue5285
    if isinstance(key, str):
        key = key.encode('utf-8')
    sign = hmac.new(key, base_string.encode('utf-8'), h)
    # b2a_base64 appends a newline; strip it before decoding.
    sign = b2a_base64(sign.digest())[:-1].decode('utf-8')

    header = ['MAC id="%s"' % token]
    if draft != 0:
        header.append('ts="%s"' % ts)
    header.append('nonce="%s"' % nonce)
    if bodyhash:
        header.append('bodyhash="%s"' % bodyhash)
    if ext:
        header.append('ext="%s"' % ext)
    header.append('mac="%s"' % sign)

    # A non-empty ``headers`` dict passed by the caller is updated in place.
    headers = headers or {}
    headers['Authorization'] = ', '.join(header)
    return headers
def prepare_bearer_uri(token, uri):
    """Return ``uri`` with the `Bearer Token`_ added as ``access_token``.

    Not recommended, use only if client can't use authorization header or body.

    http://www.example.com/path?access_token=h480djs93hd8

    .. _`Bearer Token`: https://tools.ietf.org/html/rfc6750

    :param token: Bearer token value.
    :param uri: URI to which the parameter is added.
    :return: Result of ``add_params_to_uri`` for the single token parameter.
    """
    token_param = ('access_token', token)
    return add_params_to_uri(uri, [token_param])
def prepare_bearer_headers(token, headers=None):
    """Set a `Bearer Token`_ ``Authorization`` entry on the request headers.

    Recommended method of passing bearer tokens.

    Authorization: Bearer h480djs93hd8

    .. _`Bearer Token`: https://tools.ietf.org/html/rfc6750

    :param token: Bearer token value.
    :param headers: Existing headers dictionary. A non-empty dictionary is
                    updated in place; a falsy value is replaced by a new dict.
    :return: The headers dictionary containing the ``Authorization`` entry.
    """
    if not headers:
        headers = {}
    authorization = 'Bearer %s' % token
    headers['Authorization'] = authorization
    return headers
def prepare_bearer_body(token, body=''):
    """Return ``body`` with the `Bearer Token`_ added as ``access_token``.

    access_token=h480djs93hd8

    .. _`Bearer Token`: https://tools.ietf.org/html/rfc6750

    :param token: Bearer token value.
    :param body: Existing request body; defaults to an empty string.
    :return: Result of ``add_params_to_qs`` for the single token parameter.
    """
    token_param = ('access_token', token)
    return add_params_to_qs(body, [token_param])
def random_token_generator(request, refresh_token=False):
    """Return a new token produced by ``common.generate_token``.

    Neither argument is read by this function; both exist so it can be
    called the same way as other token generators.

    :param request: OAuthlib request.
    :type request: oauthlib.common.Request
    :param refresh_token: Unused.
    """
    new_token = common.generate_token()
    return new_token
def signed_token_generator(private_pem, **kwargs):
    """Build a token generator that signs tokens with ``private_pem``.

    :param private_pem: Key handed to ``common.generate_signed_token``.
    :param kwargs: Stored on every request as ``request.claims`` before the
                   token is generated.
    :return: A callable taking a single ``request`` argument.
    """
    claims = kwargs

    def signed_token_generator(request):
        # Attach the claims first so they are on the request when it is
        # passed to the signing helper.
        request.claims = claims
        signed = common.generate_signed_token(private_pem, request)
        return signed

    return signed_token_generator
def get_token_from_header(request):
    """
    Extract a bearer token from the request.

    When the request has an ``Authorization`` header, the token is taken from
    it and ``request.access_token`` is not consulted. Without that header,
    ``request.access_token`` is returned.

    :param request: OAuthlib request.
    :type request: oauthlib.common.Request
    :return: Return the token or None if the Authorization header is malformed.
    """
    if 'Authorization' not in request.headers:
        return request.access_token

    parts = request.headers.get('Authorization').split()
    # A well-formed header is exactly "<scheme> <token>".
    if len(parts) != 2:
        return None

    scheme, credentials = parts
    if scheme.lower() != 'bearer':
        return None
    return credentials
class TokenBase:
    """Interface for token types; every method must be overridden."""

    __slots__ = ()

    def __call__(self, request, refresh_token=False):
        """
        Not implemented here; always raises.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :param refresh_token:
        """
        raise NotImplementedError('Subclasses must implement this method.')

    def validate_request(self, request):
        """
        Not implemented here; always raises.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        """
        raise NotImplementedError('Subclasses must implement this method.')

    def estimate_type(self, request):
        """
        Not implemented here; always raises.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        """
        raise NotImplementedError('Subclasses must implement this method.')
class BearerToken(TokenBase):
    """Bearer token type: creates tokens and validates bearer requests."""

    __slots__ = (
        'request_validator', 'token_generator',
        'refresh_token_generator', 'expires_in'
    )

    def __init__(self, request_validator=None, token_generator=None,
                 expires_in=None, refresh_token_generator=None):
        """
        :param request_validator: Object providing ``rotate_refresh_token``
                                  and ``validate_bearer_token``.
        :param token_generator: Callable taking the request and returning an
                                access token; defaults to
                                ``random_token_generator``.
        :param expires_in: Lifetime value, or a callable taking the request
                           and returning it. Any falsy value becomes 3600.
        :param refresh_token_generator: Callable taking the request and
                                        returning a refresh token; defaults
                                        to the access token generator.
        """
        self.request_validator = request_validator
        self.token_generator = token_generator or random_token_generator
        self.refresh_token_generator = (
            refresh_token_generator or self.token_generator
        )
        self.expires_in = expires_in or 3600

    def create_token(self, request, refresh_token=False, **kwargs):
        """
        Create a BearerToken, by default without refresh token.

        As a side effect ``request.expires_in`` is set to the lifetime used
        for the token.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :param refresh_token: Whether to include a refresh token.
        :param kwargs: Only ``save_token`` is recognised; it is deprecated
                       and triggers a ``DeprecationWarning``.
        :return: An ``OAuth2Token`` built from the token parameters.
        """
        if "save_token" in kwargs:
            # stacklevel=2 attributes the warning to the caller that passed
            # the deprecated argument rather than to this module.
            warnings.warn("`save_token` has been deprecated, it was not called internally. "
                          "If you do, call `request_validator.save_token()` instead.",
                          DeprecationWarning, stacklevel=2)

        if callable(self.expires_in):
            expires_in = self.expires_in(request)
        else:
            expires_in = self.expires_in

        request.expires_in = expires_in

        token = {
            'access_token': self.token_generator(request),
            'expires_in': expires_in,
            'token_type': 'Bearer',
        }

        # If provided, include - this is optional in some cases https://tools.ietf.org/html/rfc6749#section-3.3 but
        # there is currently no mechanism to coordinate issuing a token for only a subset of the requested scopes so
        # all tokens issued are for the entire set of requested scopes.
        if request.scopes is not None:
            token['scope'] = ' '.join(request.scopes)

        if refresh_token:
            # Reuse the incoming refresh token only when the validator says
            # it must not be rotated; otherwise issue a new one.
            if (request.refresh_token and
                    not self.request_validator.rotate_refresh_token(request)):
                token['refresh_token'] = request.refresh_token
            else:
                token['refresh_token'] = self.refresh_token_generator(request)

        # Extra credentials are merged last and so override generated keys.
        token.update(request.extra_credentials or {})
        return OAuth2Token(token)

    def validate_request(self, request):
        """
        Validate the bearer token carried by the request.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :return: Result of ``request_validator.validate_bearer_token``.
        """
        token = get_token_from_header(request)
        return self.request_validator.validate_bearer_token(
            token, request.scopes, request)

    def estimate_type(self, request):
        """
        Score how likely the request is to carry a bearer token.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :return: 9 when the Authorization header starts with "bearer"
                 (case-insensitive), 5 when ``request.access_token`` is set,
                 otherwise 0.
        """
        # split(' ') rather than split(): an empty header still yields one
        # (empty) element, so indexing [0] cannot fail.
        if request.headers.get('Authorization', '').split(' ')[0].lower() == 'bearer':
            return 9
        elif request.access_token is not None:
            return 5
        else:
            return 0