Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/oauthlib/common.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

193 statements  

1""" 

2oauthlib.common 

3~~~~~~~~~~~~~~ 

4 

5This module provides data structures and utilities common 

6to all implementations of OAuth. 

7""" 

8import collections 

9import datetime 

10import logging 

11import re 

12import time 

13import urllib.parse as urlparse 

14from urllib.parse import ( 

15 quote as _quote, unquote as _unquote, urlencode as _urlencode, 

16) 

17 

18from . import get_debug 

19 

20try: 

21 from secrets import SystemRandom, randbits 

22except ImportError: 

23 from random import SystemRandom, getrandbits as randbits 

24 

25UNICODE_ASCII_CHARACTER_SET = ('abcdefghijklmnopqrstuvwxyz' 

26 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' 

27 '0123456789') 

28 

29CLIENT_ID_CHARACTER_SET = (r' !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMN' 

30 'OPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}') 

31 

32SANITIZE_PATTERN = re.compile(r'([^&;]*(?:password|token)[^=]*=)[^&;]+', re.IGNORECASE) 

33INVALID_HEX_PATTERN = re.compile(r'%[^0-9A-Fa-f]|%[0-9A-Fa-f][^0-9A-Fa-f]') 

34 

35always_safe = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' 

36 'abcdefghijklmnopqrstuvwxyz' 

37 '0123456789_.-') 

38 

39log = logging.getLogger('oauthlib') 

40 

41 

42# 'safe' must be bytes (Python 2.6 requires bytes, other versions allow either) 

43def quote(s, safe=b'/'): 

44 s = s.encode('utf-8') if isinstance(s, str) else s 

45 s = _quote(s, safe) 

46 # PY3 always returns unicode. PY2 may return either, depending on whether 

47 # it had to modify the string. 

48 if isinstance(s, bytes): 

49 s = s.decode('utf-8') 

50 return s 

51 

52 

53def unquote(s): 

54 s = _unquote(s) 

55 # PY3 always returns unicode. PY2 seems to always return what you give it, 

56 # which differs from quote's behavior. Just to be safe, make sure it is 

57 # unicode before we return. 

58 if isinstance(s, bytes): 

59 s = s.decode('utf-8') 

60 return s 

61 

62 

63def urlencode(params): 

64 utf8_params = encode_params_utf8(params) 

65 urlencoded = _urlencode(utf8_params) 

66 if isinstance(urlencoded, str): 

67 return urlencoded 

68 else: 

69 return urlencoded.decode("utf-8") 

70 

71 

72def encode_params_utf8(params): 

73 """Ensures that all parameters in a list of 2-element tuples are encoded to 

74 bytestrings using UTF-8 

75 """ 

76 encoded = [] 

77 for k, v in params: 

78 encoded.append(( 

79 k.encode('utf-8') if isinstance(k, str) else k, 

80 v.encode('utf-8') if isinstance(v, str) else v)) 

81 return encoded 

82 

83 

84def decode_params_utf8(params): 

85 """Ensures that all parameters in a list of 2-element tuples are decoded to 

86 unicode using UTF-8. 

87 """ 

88 decoded = [] 

89 for k, v in params: 

90 decoded.append(( 

91 k.decode('utf-8') if isinstance(k, bytes) else k, 

92 v.decode('utf-8') if isinstance(v, bytes) else v)) 

93 return decoded 

94 

95 

96urlencoded = set(always_safe) | set('=&;:%+~,*@!()/?\'$') 

97 

98 

99def urldecode(query): 

100 """Decode a query string in x-www-form-urlencoded format into a sequence 

101 of two-element tuples. 

102 

103 Unlike urlparse.parse_qsl(..., strict_parsing=True) urldecode will enforce 

104 correct formatting of the query string by validation. If validation fails 

105 a ValueError will be raised. urllib.parse_qsl will only raise errors if 

106 any of name-value pairs omits the equals sign. 

107 """ 

108 # Check if query contains invalid characters 

109 if query and not set(query) <= urlencoded: 

110 error = ("Error trying to decode a non urlencoded string. " 

111 "Found invalid characters: %s " 

112 "in the string: '%s'. " 

113 "Please ensure the request/response body is " 

114 "x-www-form-urlencoded.") 

115 raise ValueError(error % (set(query) - urlencoded, query)) 

116 

117 # Check for correctly hex encoded values using a regular expression 

118 # All encoded values begin with % followed by two hex characters 

119 # correct = %00, %A0, %0A, %FF 

120 # invalid = %G0, %5H, %PO 

121 if INVALID_HEX_PATTERN.search(query): 

122 raise ValueError('Invalid hex encoding in query string.') 

123 

124 # We want to allow queries such as "c2" whereas urlparse.parse_qsl 

125 # with the strict_parsing flag will not. 

126 params = urlparse.parse_qsl(query, keep_blank_values=True) 

127 

128 # unicode all the things 

129 return decode_params_utf8(params) 

130 

131 

132def extract_params(raw): 

133 """Extract parameters and return them as a list of 2-tuples. 

134 

135 Will successfully extract parameters from urlencoded query strings, 

136 dicts, or lists of 2-tuples. Empty strings/dicts/lists will return an 

137 empty list of parameters. Any other input will result in a return 

138 value of None. 

139 """ 

140 if isinstance(raw, (bytes, str)): 

141 try: 

142 params = urldecode(raw) 

143 except ValueError: 

144 params = None 

145 elif hasattr(raw, '__iter__'): 

146 try: 

147 dict(raw) 

148 except ValueError: 

149 params = None 

150 except TypeError: 

151 params = None 

152 else: 

153 params = list(raw.items() if isinstance(raw, dict) else raw) 

154 params = decode_params_utf8(params) 

155 else: 

156 params = None 

157 

158 return params 

159 

160 

161def generate_nonce(): 

162 """Generate pseudorandom nonce that is unlikely to repeat. 

163 

164 Per `section 3.3`_ of the OAuth 1 RFC 5849 spec. 

165 Per `section 3.2.1`_ of the MAC Access Authentication spec. 

166 

167 A random 64-bit number is appended to the epoch timestamp for both 

168 randomness and to decrease the likelihood of collisions. 

169 

170 .. _`section 3.2.1`: https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01#section-3.2.1 

171 .. _`section 3.3`: https://tools.ietf.org/html/rfc5849#section-3.3 

172 """ 

173 return str(str(randbits(64)) + generate_timestamp()) 

174 

175 

176def generate_timestamp(): 

177 """Get seconds since epoch (UTC). 

178 

179 Per `section 3.3`_ of the OAuth 1 RFC 5849 spec. 

180 Per `section 3.2.1`_ of the MAC Access Authentication spec. 

181 

182 .. _`section 3.2.1`: https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01#section-3.2.1 

183 .. _`section 3.3`: https://tools.ietf.org/html/rfc5849#section-3.3 

184 """ 

185 return str(int(time.time())) 

186 

187 

188def generate_token(length=30, chars=UNICODE_ASCII_CHARACTER_SET): 

189 """Generates a non-guessable OAuth token 

190 

191 OAuth (1 and 2) does not specify the format of tokens except that they 

192 should be strings of random characters. Tokens should not be guessable 

193 and entropy when generating the random characters is important. Which is 

194 why SystemRandom is used instead of the default random.choice method. 

195 """ 

196 rand = SystemRandom() 

197 return ''.join(rand.choice(chars) for x in range(length)) 

198 

199 

200def generate_signed_token(private_pem, request): 

201 import jwt 

202 

203 now = datetime.datetime.utcnow() 

204 

205 claims = { 

206 'scope': request.scope, 

207 'exp': now + datetime.timedelta(seconds=request.expires_in) 

208 } 

209 

210 claims.update(request.claims) 

211 

212 token = jwt.encode(claims, private_pem, 'RS256') 

213 token = to_unicode(token, "UTF-8") 

214 

215 return token 

216 

217 

218def verify_signed_token(public_pem, token): 

219 import jwt 

220 

221 return jwt.decode(token, public_pem, algorithms=['RS256']) 

222 

223 

224def generate_client_id(length=30, chars=CLIENT_ID_CHARACTER_SET): 

225 """Generates an OAuth client_id 

226 

227 OAuth 2 specify the format of client_id in 

228 https://tools.ietf.org/html/rfc6749#appendix-A. 

229 """ 

230 return generate_token(length, chars) 

231 

232 

233def add_params_to_qs(query, params): 

234 """Extend a query with a list of two-tuples.""" 

235 if isinstance(params, dict): 

236 params = params.items() 

237 queryparams = urlparse.parse_qsl(query, keep_blank_values=True) 

238 queryparams.extend(params) 

239 return urlencode(queryparams) 

240 

241 

242def add_params_to_uri(uri, params, fragment=False): 

243 """Add a list of two-tuples to the uri query components.""" 

244 sch, net, path, par, query, fra = urlparse.urlparse(uri) 

245 if fragment: 

246 fra = add_params_to_qs(fra, params) 

247 else: 

248 query = add_params_to_qs(query, params) 

249 return urlparse.urlunparse((sch, net, path, par, query, fra)) 

250 

251 

252def safe_string_equals(a, b): 

253 """ Near-constant time string comparison. 

254 

255 Used in order to avoid timing attacks on sensitive information such 

256 as secret keys during request verification (`rootLabs`_). 

257 

258 .. _`rootLabs`: http://rdist.root.org/2010/01/07/timing-independent-array-comparison/ 

259 

260 """ 

261 if len(a) != len(b): 

262 return False 

263 

264 result = 0 

265 for x, y in zip(a, b): 

266 result |= ord(x) ^ ord(y) 

267 return result == 0 

268 

269 

270def to_unicode(data, encoding='UTF-8'): 

271 """Convert a number of different types of objects to unicode.""" 

272 if isinstance(data, str): 

273 return data 

274 

275 if isinstance(data, bytes): 

276 return str(data, encoding=encoding) 

277 

278 if hasattr(data, '__iter__'): 

279 try: 

280 dict(data) 

281 except TypeError: 

282 pass 

283 except ValueError: 

284 # Assume it's a one dimensional data structure 

285 return (to_unicode(i, encoding) for i in data) 

286 else: 

287 # We support 2.6 which lacks dict comprehensions 

288 if hasattr(data, 'items'): 

289 data = data.items() 

290 return {to_unicode(k, encoding): to_unicode(v, encoding) for k, v in data} 

291 

292 return data 

293 

294 

295class CaseInsensitiveDict(dict): 

296 

297 """Basic case insensitive dict with strings only keys.""" 

298 

299 proxy = {} 

300 

301 def __init__(self, data): 

302 self.proxy = {k.lower(): k for k in data} 

303 for k in data: 

304 self[k] = data[k] 

305 

306 def __contains__(self, k): 

307 return k.lower() in self.proxy 

308 

309 def __delitem__(self, k): 

310 key = self.proxy[k.lower()] 

311 super().__delitem__(key) 

312 del self.proxy[k.lower()] 

313 

314 def __getitem__(self, k): 

315 key = self.proxy[k.lower()] 

316 return super().__getitem__(key) 

317 

318 def get(self, k, default=None): 

319 return self[k] if k in self else default # noqa: SIM401 

320 

321 def __setitem__(self, k, v): 

322 super().__setitem__(k, v) 

323 self.proxy[k.lower()] = k 

324 

325 def update(self, *args, **kwargs): 

326 super().update(*args, **kwargs) 

327 for k in dict(*args, **kwargs): 

328 self.proxy[k.lower()] = k 

329 

330 

331class Request: 

332 

333 """A malleable representation of a signable HTTP request. 

334 

335 Body argument may contain any data, but parameters will only be decoded if 

336 they are one of: 

337 

338 * urlencoded query string 

339 * dict 

340 * list of 2-tuples 

341 

342 Anything else will be treated as raw body data to be passed through 

343 unmolested. 

344 """ 

345 

346 def __init__(self, uri, http_method='GET', body=None, headers=None, 

347 encoding='utf-8'): 

348 # Convert to unicode using encoding if given, else assume unicode 

349 def encode(x): 

350 return to_unicode(x, encoding) if encoding else x 

351 

352 self.uri = encode(uri) 

353 self.http_method = encode(http_method) 

354 self.headers = CaseInsensitiveDict(encode(headers or {})) 

355 self.body = encode(body) 

356 self.decoded_body = extract_params(self.body) 

357 self.oauth_params = [] 

358 self.validator_log = {} 

359 

360 self._params = { 

361 "access_token": None, 

362 "client": None, 

363 "client_id": None, 

364 "client_secret": None, 

365 "code": None, 

366 "code_challenge": None, 

367 "code_challenge_method": None, 

368 "code_verifier": None, 

369 "extra_credentials": None, 

370 "grant_type": None, 

371 "redirect_uri": None, 

372 "refresh_token": None, 

373 "request_token": None, 

374 "response_type": None, 

375 "scope": None, 

376 "scopes": None, 

377 "state": None, 

378 "token": None, 

379 "user": None, 

380 "token_type_hint": None, 

381 

382 # OpenID Connect 

383 "response_mode": None, 

384 "nonce": None, 

385 "display": None, 

386 "prompt": None, 

387 "claims": None, 

388 "max_age": None, 

389 "ui_locales": None, 

390 "id_token_hint": None, 

391 "login_hint": None, 

392 "acr_values": None 

393 } 

394 self._params.update(dict(urldecode(self.uri_query))) 

395 self._params.update(dict(self.decoded_body or [])) 

396 

397 def __getattr__(self, name): 

398 if name in self._params: 

399 return self._params[name] 

400 else: 

401 raise AttributeError(name) 

402 

403 def __repr__(self): 

404 if not get_debug(): 

405 return "<oauthlib.Request SANITIZED>" 

406 body = self.body 

407 headers = self.headers.copy() 

408 if body: 

409 body = SANITIZE_PATTERN.sub('\1<SANITIZED>', str(body)) 

410 if 'Authorization' in headers: 

411 headers['Authorization'] = '<SANITIZED>' 

412 return '<oauthlib.Request url="{}", http_method="{}", headers="{}", body="{}">'.format( 

413 self.uri, self.http_method, headers, body) 

414 

415 @property 

416 def uri_query(self): 

417 return urlparse.urlparse(self.uri).query 

418 

419 @property 

420 def uri_query_params(self): 

421 if not self.uri_query: 

422 return [] 

423 return urlparse.parse_qsl(self.uri_query, keep_blank_values=True, 

424 strict_parsing=True) 

425 

426 @property 

427 def duplicate_params(self): 

428 seen_keys = collections.defaultdict(int) 

429 all_keys = (p[0] 

430 for p in (self.decoded_body or []) + self.uri_query_params) 

431 for k in all_keys: 

432 seen_keys[k] += 1 

433 return [k for k, c in seen_keys.items() if c > 1]