Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/oauthlib/common.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2oauthlib.common
3~~~~~~~~~~~~~~
5This module provides data structures and utilities common
6to all implementations of OAuth.
7"""
8import collections
9import datetime
10import logging
11import re
12import time
13import urllib.parse as urlparse
14from urllib.parse import (
15 quote as _quote, unquote as _unquote, urlencode as _urlencode,
16)
18from . import get_debug
20try:
21 from secrets import SystemRandom, randbits
22except ImportError:
23 from random import SystemRandom, getrandbits as randbits
# Default alphabet for generate_token(): ASCII letters and digits only.
UNICODE_ASCII_CHARACTER_SET = ('abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                               '0123456789')

# Default alphabet for generate_client_id(): printable ASCII 0x20-0x7D.
# The first segment is deliberately NOT a raw string: in a raw string the
# sequence \' keeps its backslash, which used to put the backslash into the
# alphabet twice (once here, once in "[\\]") and skewed random selection.
# NOTE(review): '~' (0x7E) is not part of this alphabet - confirm whether
# that omission is intentional before adding it.
CLIENT_ID_CHARACTER_SET = (' !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMN'
                           'OPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}')

# Matches "<name containing password/token>=<value>" inside an '&' or ';'
# separated string; group 1 is everything up to and including the '='.
SANITIZE_PATTERN = re.compile(r'([^&;]*(?:password|token)[^=]*=)[^&;]+', re.IGNORECASE)
# Matches a '%' that is followed by a non-hex character in either of the next
# two positions.
INVALID_HEX_PATTERN = re.compile(r'%[^0-9A-Fa-f]|%[0-9A-Fa-f][^0-9A-Fa-f]')

# Characters that never need percent-encoding.
always_safe = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
               'abcdefghijklmnopqrstuvwxyz'
               '0123456789_.-')

log = logging.getLogger('oauthlib')
42# 'safe' must be bytes (Python 2.6 requires bytes, other versions allow either)
def quote(s, safe=b'/'):
    """Percent-encode ``s`` and return the result as text.

    A ``str`` argument is encoded to UTF-8 bytes before quoting; anything
    else is handed to ``urllib.parse.quote`` unchanged. ``safe`` names the
    characters that must be left unescaped.
    """
    raw = s.encode('utf-8') if isinstance(s, str) else s
    quoted = _quote(raw, safe)
    # Defensive normalisation: should the quoting call ever hand back bytes,
    # decode them so callers always receive str.
    return quoted.decode('utf-8') if isinstance(quoted, bytes) else quoted
def unquote(s):
    """Reverse percent-encoding of ``s`` and return the result as text.

    If the underlying ``urllib.parse.unquote`` call yields bytes, they are
    decoded as UTF-8 so the return value is always ``str``.
    """
    decoded = _unquote(s)
    if not isinstance(decoded, bytes):
        return decoded
    return decoded.decode('utf-8')
def urlencode(params):
    """Serialise ``params`` (an iterable of 2-tuples) to a query string.

    Keys and values are first encoded to UTF-8 bytes, then passed to
    ``urllib.parse.urlencode``. The result is always returned as ``str``.
    """
    encoded = _urlencode(encode_params_utf8(params))
    return encoded if isinstance(encoded, str) else encoded.decode("utf-8")
def encode_params_utf8(params):
    """Return a list of 2-tuples with every ``str`` key/value UTF-8 encoded.

    Keys and values that are not ``str`` are passed through unchanged.
    """
    def _as_bytes(item):
        return item.encode('utf-8') if isinstance(item, str) else item

    return [(_as_bytes(name), _as_bytes(value)) for name, value in params]
def decode_params_utf8(params):
    """Return a list of 2-tuples with every ``bytes`` key/value UTF-8 decoded.

    Keys and values that are not ``bytes`` are passed through unchanged.
    """
    def _as_text(item):
        return item.decode('utf-8') if isinstance(item, bytes) else item

    return [(_as_text(name), _as_text(value)) for name, value in params]
# Every character that may legally appear in an x-www-form-urlencoded string:
# the always-safe characters plus the delimiters and escapes listed here.
urlencoded = set(always_safe).union('=&;:%+~,*@!()/?\'$')
def urldecode(query):
    """Parse an x-www-form-urlencoded string into a list of 2-tuples.

    The string is validated before parsing: a ValueError is raised when it
    contains a character outside the ``urlencoded`` alphabet, or a ``%``
    escape matched by ``INVALID_HEX_PATTERN``. Parameters without an equals
    sign (for example ``"c2"``) are accepted, and blank values are kept.
    Keys and values come back as text.
    """
    # Step 1: reject characters that cannot occur in urlencoded data.
    if query:
        unexpected = set(query) - urlencoded
        if unexpected:
            error = ("Error trying to decode a non urlencoded string. "
                     "Found invalid characters: %s "
                     "in the string: '%s'. "
                     "Please ensure the request/response body is "
                     "x-www-form-urlencoded.")
            raise ValueError(error % (unexpected, query))

    # Step 2: reject malformed percent escapes.
    #   valid   = %00, %A0, %0A, %FF
    #   invalid = %G0, %5H, %PO
    if INVALID_HEX_PATTERN.search(query) is not None:
        raise ValueError('Invalid hex encoding in query string.')

    # Step 3: parse leniently (no strict_parsing) so bare names are allowed,
    # then make sure everything is text.
    pairs = urlparse.parse_qsl(query, keep_blank_values=True)
    return decode_params_utf8(pairs)
def extract_params(raw):
    """Turn ``raw`` into a list of 2-tuples, or ``None`` if that is impossible.

    * ``str``/``bytes`` input is decoded with ``urldecode``; a ValueError from
      that call yields ``None``.
    * Any other iterable that ``dict()`` accepts (dicts, lists of 2-tuples)
      is converted to a list of pairs with bytes decoded to text.
    * Everything else yields ``None``.
    """
    if isinstance(raw, (bytes, str)):
        try:
            return urldecode(raw)
        except ValueError:
            return None

    if not hasattr(raw, '__iter__'):
        return None

    # dict() is used purely as a shape check: it only succeeds for mappings
    # and iterables of 2-element items.
    try:
        dict(raw)
    except (ValueError, TypeError):
        return None

    pairs = list(raw.items() if isinstance(raw, dict) else raw)
    return decode_params_utf8(pairs)
def generate_nonce():
    """Return a nonce string that is unlikely to be repeated.

    The value is a random 64-bit integer, in decimal, immediately followed
    by the current epoch timestamp from ``generate_timestamp``.

    Per `section 3.3`_ of the OAuth 1 RFC 5849 spec.
    Per `section 3.2.1`_ of the MAC Access Authentication spec.

    .. _`section 3.2.1`: https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01#section-3.2.1
    .. _`section 3.3`: https://tools.ietf.org/html/rfc5849#section-3.3
    """
    random_part = str(randbits(64))
    return str(random_part + generate_timestamp())
def generate_timestamp():
    """Return the current time as whole seconds since the epoch, as a string.

    Per `section 3.3`_ of the OAuth 1 RFC 5849 spec.
    Per `section 3.2.1`_ of the MAC Access Authentication spec.

    .. _`section 3.2.1`: https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01#section-3.2.1
    .. _`section 3.3`: https://tools.ietf.org/html/rfc5849#section-3.3
    """
    whole_seconds = int(time.time())
    return str(whole_seconds)
def generate_token(length=30, chars=UNICODE_ASCII_CHARACTER_SET):
    """Return a random token of ``length`` characters drawn from ``chars``.

    OAuth (1 and 2) leaves the token format open beyond requiring strings of
    random characters. Because tokens must not be guessable, the characters
    are picked with ``SystemRandom`` rather than the default generator.
    """
    rng = SystemRandom()
    picks = [rng.choice(chars) for _ in range(length)]
    return ''.join(picks)
def generate_signed_token(private_pem, request):
    """Build an RS256-signed JWT for ``request`` and return it as text.

    The token carries ``request.scope`` as ``scope`` and an ``exp`` claim set
    ``request.expires_in`` seconds from now; any entries in ``request.claims``
    are merged on top and may override those two.

    :param private_pem: Key material passed to ``jwt.encode`` for signing.
    :param request: Object exposing ``scope``, ``expires_in`` and ``claims``.
    :returns: The encoded token converted to ``str``.
    """
    # Imported lazily so the jwt package is only needed when signing.
    import jwt

    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value.
    now = datetime.datetime.now(datetime.timezone.utc)

    claims = {
        'scope': request.scope,
        'exp': now + datetime.timedelta(seconds=request.expires_in),
    }

    # ``claims`` defaults to None on a Request; dict.update(None) would raise
    # TypeError, so treat a missing value as "no extra claims".
    claims.update(request.claims or {})

    token = jwt.encode(claims, private_pem, 'RS256')
    # jwt.encode may hand back bytes or str; normalise to text.
    return to_unicode(token, "UTF-8")
def verify_signed_token(public_pem, token):
    """Decode ``token`` with ``public_pem``, accepting only RS256 signatures.

    Returns whatever ``jwt.decode`` returns; errors raised by that call
    propagate to the caller.
    """
    # Imported lazily so the jwt package is only needed when verifying.
    import jwt

    accepted_algorithms = ['RS256']
    return jwt.decode(token, public_pem, algorithms=accepted_algorithms)
def generate_client_id(length=30, chars=CLIENT_ID_CHARACTER_SET):
    """Return a random OAuth client_id of ``length`` characters.

    The format of client_id is specified by OAuth 2 in
    https://tools.ietf.org/html/rfc6749#appendix-A. Generation is delegated
    to ``generate_token`` with the client-id alphabet as the default.
    """
    client_id = generate_token(length, chars)
    return client_id
def add_params_to_qs(query, params):
    """Append ``params`` to the query string ``query`` and re-encode it.

    ``params`` may be a dict or an iterable of 2-tuples. Existing parameters
    in ``query`` are kept, including ones with blank values.
    """
    extra = params.items() if isinstance(params, dict) else params
    merged = urlparse.parse_qsl(query, keep_blank_values=True)
    merged.extend(extra)
    return urlencode(merged)
def add_params_to_uri(uri, params, fragment=False):
    """Return ``uri`` with ``params`` appended to its query or fragment.

    With ``fragment=False`` (the default) the parameters are merged into the
    query component; with ``fragment=True`` they are merged into the fragment
    component instead. All other URI components are left as parsed.
    """
    scheme, netloc, path, path_params, query, frag = urlparse.urlparse(uri)
    if fragment:
        frag = add_params_to_qs(frag, params)
    else:
        query = add_params_to_qs(query, params)
    return urlparse.urlunparse(
        (scheme, netloc, path, path_params, query, frag))
def safe_string_equals(a, b):
    """Compare two strings in near-constant time.

    Intended for sensitive values such as secret keys, where an early exit on
    the first differing character could leak information through timing
    (`rootLabs`_). Strings of different length are reported unequal
    immediately; otherwise every character pair is examined.

    .. _`rootLabs`: http://rdist.root.org/2010/01/07/timing-independent-array-comparison/

    """
    if len(a) != len(b):
        return False

    # OR together the XOR of each code point pair; any difference leaves a
    # non-zero bit behind without short-circuiting the loop.
    diff = 0
    for ch_a, ch_b in zip(a, b):
        diff = diff | (ord(ch_a) ^ ord(ch_b))
    return not diff
def to_unicode(data, encoding='UTF-8'):
    """Recursively convert bytes found in ``data`` to text.

    * ``str`` is returned as is; ``bytes`` is decoded with ``encoding``.
    * An iterable that ``dict()`` accepts is returned as a new dict with
      converted keys and values.
    * An iterable for which ``dict()`` raises ValueError is treated as a
      flat sequence and returned as a generator of converted items.
    * Anything else (including iterables for which ``dict()`` raises
      TypeError) is returned unchanged.
    """
    if isinstance(data, str):
        return data
    if isinstance(data, bytes):
        return str(data, encoding=encoding)
    if not hasattr(data, '__iter__'):
        return data

    # dict() serves as a shape probe for "mapping or iterable of pairs".
    try:
        dict(data)
    except TypeError:
        return data
    except ValueError:
        # Assume it's a one dimensional data structure
        return (to_unicode(item, encoding) for item in data)

    pairs = data.items() if hasattr(data, 'items') else data
    return {
        to_unicode(key, encoding): to_unicode(value, encoding)
        for key, value in pairs
    }
class CaseInsensitiveDict(dict):

    """Basic case insensitive dict with strings only keys.

    The underlying dict stores each key with the casing it was last set
    with; ``proxy`` maps the lower-cased key to that stored spelling so that
    lookups, membership tests and deletions ignore case. At most one
    spelling of a key is kept at any time.
    """

    # Class-level fallback only; every instance gets its own mapping in
    # __init__.
    proxy = {}

    def __init__(self, data):
        """Populate the dict from the mapping ``data``."""
        self.proxy = {}
        # Route every assignment through __setitem__ so the proxy and the
        # stored keys cannot disagree.
        for k in data:
            self[k] = data[k]

    def __contains__(self, k):
        return k.lower() in self.proxy

    def __delitem__(self, k):
        lowered = k.lower()
        super().__delitem__(self.proxy[lowered])
        del self.proxy[lowered]

    def __getitem__(self, k):
        key = self.proxy[k.lower()]
        return super().__getitem__(key)

    def get(self, k, default=None):
        return self[k] if k in self else default  # noqa: SIM401

    def __setitem__(self, k, v):
        lowered = k.lower()
        previous = self.proxy.get(lowered)
        if previous is not None and previous != k:
            # The key already exists under another casing: drop that entry,
            # otherwise both spellings would linger in the underlying dict
            # and show up in len(), iteration and items().
            super().pop(previous, None)
        super().__setitem__(k, v)
        self.proxy[lowered] = k

    def update(self, *args, **kwargs):
        """Update from a mapping/iterable and keyword arguments, ignoring case."""
        # dict.update would bypass __setitem__, so apply the items one by one
        # to keep the single-spelling guarantee.
        for k, v in dict(*args, **kwargs).items():
            self[k] = v
class Request:

    """A malleable representation of a signable HTTP request.

    Body argument may contain any data, but parameters will only be decoded if
    they are one of:

    * urlencoded query string
    * dict
    * list of 2-tuples

    Anything else will be treated as raw body data to be passed through
    unmodified.

    Known OAuth / OpenID Connect parameters are exposed as attributes: they
    default to ``None`` and are filled from the URI query string first and
    the decoded body second (body values win on conflict).
    """

    def __init__(self, uri, http_method='GET', body=None, headers=None,
                 encoding='utf-8'):
        # Convert to unicode using encoding if given, else assume unicode
        def encode(x):
            return to_unicode(x, encoding) if encoding else x

        self.uri = encode(uri)
        self.http_method = encode(http_method)
        self.headers = CaseInsensitiveDict(encode(headers or {}))
        self.body = encode(body)
        # None when the body is not a urlencoded string, dict or pair list.
        self.decoded_body = extract_params(self.body)
        self.oauth_params = []
        self.validator_log = {}

        self._params = {
            "access_token": None,
            "client": None,
            "client_id": None,
            "client_secret": None,
            "code": None,
            "code_challenge": None,
            "code_challenge_method": None,
            "code_verifier": None,
            "extra_credentials": None,
            "grant_type": None,
            "redirect_uri": None,
            "refresh_token": None,
            "request_token": None,
            "response_type": None,
            "scope": None,
            "scopes": None,
            "state": None,
            "token": None,
            "user": None,
            "token_type_hint": None,

            # OpenID Connect
            "response_mode": None,
            "nonce": None,
            "display": None,
            "prompt": None,
            "claims": None,
            "max_age": None,
            "ui_locales": None,
            "id_token_hint": None,
            "login_hint": None,
            "acr_values": None
        }
        self._params.update(dict(urldecode(self.uri_query)))
        self._params.update(dict(self.decoded_body or []))

    def __getattr__(self, name):
        """Resolve unknown attributes from the request parameter table.

        Raises AttributeError for names that are not request parameters.
        """
        # Read the table through __dict__: accessing self._params directly
        # would re-enter __getattr__ forever on an instance whose __init__
        # has not run (e.g. while copy/pickle probe for __setstate__).
        params = self.__dict__.get('_params')
        if params is not None and name in params:
            return params[name]
        raise AttributeError(name)

    def __repr__(self):
        """Return a debug representation with credentials masked.

        Outside debug mode nothing about the request is revealed.
        """
        if not get_debug():
            return "<oauthlib.Request SANITIZED>"
        body = self.body
        # Build a plain-dict snapshot so the real headers are never mutated,
        # masking the Authorization header whatever its casing.
        headers = {
            name: '<SANITIZED>' if name.lower() == 'authorization' else value
            for name, value in self.headers.items()
        }
        if body:
            # Raw string: r'\1' is the backreference to the captured
            # "name=" prefix; a plain '\1' would be the control char \x01.
            body = SANITIZE_PATTERN.sub(r'\1<SANITIZED>', str(body))
        return '<oauthlib.Request url="{}", http_method="{}", headers="{}", body="{}">'.format(
            self.uri, self.http_method, headers, body)

    @property
    def uri_query(self):
        """The raw query component of the request URI."""
        return urlparse.urlparse(self.uri).query

    @property
    def uri_query_params(self):
        """The URI query parsed into a list of 2-tuples (strict parsing).

        An empty query yields an empty list.
        """
        if not self.uri_query:
            return []
        return urlparse.parse_qsl(self.uri_query, keep_blank_values=True,
                                  strict_parsing=True)

    @property
    def duplicate_params(self):
        """Names that occur more than once across the body and URI query."""
        seen_keys = collections.Counter(
            p[0] for p in (self.decoded_body or []) + self.uri_query_params)
        return [k for k, c in seen_keys.items() if c > 1]