Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/requests_oauthlib/oauth2_session.py: 21%

180 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:22 +0000

1from __future__ import unicode_literals 

2 

3import logging 

4 

5from oauthlib.common import generate_token, urldecode 

6from oauthlib.oauth2 import WebApplicationClient, InsecureTransportError 

7from oauthlib.oauth2 import LegacyApplicationClient 

8from oauthlib.oauth2 import TokenExpiredError, is_secure_transport 

9import requests 

10 

11log = logging.getLogger(__name__) 

12 

13 

14class TokenUpdated(Warning): 

15 def __init__(self, token): 

16 super(TokenUpdated, self).__init__() 

17 self.token = token 

18 

19 

20class OAuth2Session(requests.Session): 

21 """Versatile OAuth 2 extension to :class:`requests.Session`. 

22 

23 Supports any grant type adhering to :class:`oauthlib.oauth2.Client` spec 

24 including the four core OAuth 2 grants. 

25 

26 Can be used to create authorization urls, fetch tokens and access protected 

27 resources using the :class:`requests.Session` interface you are used to. 

28 

29 - :class:`oauthlib.oauth2.WebApplicationClient` (default): Authorization Code Grant 

30 - :class:`oauthlib.oauth2.MobileApplicationClient`: Implicit Grant 

31 - :class:`oauthlib.oauth2.LegacyApplicationClient`: Password Credentials Grant 

32 - :class:`oauthlib.oauth2.BackendApplicationClient`: Client Credentials Grant 

33 

34 Note that the only time you will be using Implicit Grant from python is if 

35 you are driving a user agent able to obtain URL fragments. 

36 """ 

37 

38 def __init__( 

39 self, 

40 client_id=None, 

41 client=None, 

42 auto_refresh_url=None, 

43 auto_refresh_kwargs=None, 

44 scope=None, 

45 redirect_uri=None, 

46 token=None, 

47 state=None, 

48 token_updater=None, 

49 **kwargs 

50 ): 

51 """Construct a new OAuth 2 client session. 

52 

53 :param client_id: Client id obtained during registration 

54 :param client: :class:`oauthlib.oauth2.Client` to be used. Default is 

55 WebApplicationClient which is useful for any 

56 hosted application but not mobile or desktop. 

57 :param scope: List of scopes you wish to request access to 

58 :param redirect_uri: Redirect URI you registered as callback 

59 :param token: Token dictionary, must include access_token 

60 and token_type. 

61 :param state: State string used to prevent CSRF. This will be given 

62 when creating the authorization url and must be supplied 

63 when parsing the authorization response. 

64 Can be either a string or a no argument callable. 

65 :auto_refresh_url: Refresh token endpoint URL, must be HTTPS. Supply 

66 this if you wish the client to automatically refresh 

67 your access tokens. 

68 :auto_refresh_kwargs: Extra arguments to pass to the refresh token 

69 endpoint. 

70 :token_updater: Method with one argument, token, to be used to update 

71 your token database on automatic token refresh. If not 

72 set a TokenUpdated warning will be raised when a token 

73 has been refreshed. This warning will carry the token 

74 in its token argument. 

75 :param kwargs: Arguments to pass to the Session constructor. 

76 """ 

77 super(OAuth2Session, self).__init__(**kwargs) 

78 self._client = client or WebApplicationClient(client_id, token=token) 

79 self.token = token or {} 

80 self.scope = scope 

81 self.redirect_uri = redirect_uri 

82 self.state = state or generate_token 

83 self._state = state 

84 self.auto_refresh_url = auto_refresh_url 

85 self.auto_refresh_kwargs = auto_refresh_kwargs or {} 

86 self.token_updater = token_updater 

87 

88 # Ensure that requests doesn't do any automatic auth. See #278. 

89 # The default behavior can be re-enabled by setting auth to None. 

90 self.auth = lambda r: r 

91 

92 # Allow customizations for non compliant providers through various 

93 # hooks to adjust requests and responses. 

94 self.compliance_hook = { 

95 "access_token_response": set(), 

96 "refresh_token_response": set(), 

97 "protected_request": set(), 

98 } 

99 

100 def new_state(self): 

101 """Generates a state string to be used in authorizations.""" 

102 try: 

103 self._state = self.state() 

104 log.debug("Generated new state %s.", self._state) 

105 except TypeError: 

106 self._state = self.state 

107 log.debug("Re-using previously supplied state %s.", self._state) 

108 return self._state 

109 

110 @property 

111 def client_id(self): 

112 return getattr(self._client, "client_id", None) 

113 

114 @client_id.setter 

115 def client_id(self, value): 

116 self._client.client_id = value 

117 

118 @client_id.deleter 

119 def client_id(self): 

120 del self._client.client_id 

121 

122 @property 

123 def token(self): 

124 return getattr(self._client, "token", None) 

125 

126 @token.setter 

127 def token(self, value): 

128 self._client.token = value 

129 self._client.populate_token_attributes(value) 

130 

131 @property 

132 def access_token(self): 

133 return getattr(self._client, "access_token", None) 

134 

135 @access_token.setter 

136 def access_token(self, value): 

137 self._client.access_token = value 

138 

139 @access_token.deleter 

140 def access_token(self): 

141 del self._client.access_token 

142 

143 @property 

144 def authorized(self): 

145 """Boolean that indicates whether this session has an OAuth token 

146 or not. If `self.authorized` is True, you can reasonably expect 

147 OAuth-protected requests to the resource to succeed. If 

148 `self.authorized` is False, you need the user to go through the OAuth 

149 authentication dance before OAuth-protected requests to the resource 

150 will succeed. 

151 """ 

152 return bool(self.access_token) 

153 

154 def authorization_url(self, url, state=None, **kwargs): 

155 """Form an authorization URL. 

156 

157 :param url: Authorization endpoint url, must be HTTPS. 

158 :param state: An optional state string for CSRF protection. If not 

159 given it will be generated for you. 

160 :param kwargs: Extra parameters to include. 

161 :return: authorization_url, state 

162 """ 

163 state = state or self.new_state() 

164 return ( 

165 self._client.prepare_request_uri( 

166 url, 

167 redirect_uri=self.redirect_uri, 

168 scope=self.scope, 

169 state=state, 

170 **kwargs 

171 ), 

172 state, 

173 ) 

174 

175 def fetch_token( 

176 self, 

177 token_url, 

178 code=None, 

179 authorization_response=None, 

180 body="", 

181 auth=None, 

182 username=None, 

183 password=None, 

184 method="POST", 

185 force_querystring=False, 

186 timeout=None, 

187 headers=None, 

188 verify=True, 

189 proxies=None, 

190 include_client_id=None, 

191 client_secret=None, 

192 cert=None, 

193 **kwargs 

194 ): 

195 """Generic method for fetching an access token from the token endpoint. 

196 

197 If you are using the MobileApplicationClient you will want to use 

198 `token_from_fragment` instead of `fetch_token`. 

199 

200 The current implementation enforces the RFC guidelines. 

201 

202 :param token_url: Token endpoint URL, must use HTTPS. 

203 :param code: Authorization code (used by WebApplicationClients). 

204 :param authorization_response: Authorization response URL, the callback 

205 URL of the request back to you. Used by 

206 WebApplicationClients instead of code. 

207 :param body: Optional application/x-www-form-urlencoded body to add the 

208 include in the token request. Prefer kwargs over body. 

209 :param auth: An auth tuple or method as accepted by `requests`. 

210 :param username: Username required by LegacyApplicationClients to appear 

211 in the request body. 

212 :param password: Password required by LegacyApplicationClients to appear 

213 in the request body. 

214 :param method: The HTTP method used to make the request. Defaults 

215 to POST, but may also be GET. Other methods should 

216 be added as needed. 

217 :param force_querystring: If True, force the request body to be sent 

218 in the querystring instead. 

219 :param timeout: Timeout of the request in seconds. 

220 :param headers: Dict to default request headers with. 

221 :param verify: Verify SSL certificate. 

222 :param proxies: The `proxies` argument is passed onto `requests`. 

223 :param include_client_id: Should the request body include the 

224 `client_id` parameter. Default is `None`, 

225 which will attempt to autodetect. This can be 

226 forced to always include (True) or never 

227 include (False). 

228 :param client_secret: The `client_secret` paired to the `client_id`. 

229 This is generally required unless provided in the 

230 `auth` tuple. If the value is `None`, it will be 

231 omitted from the request, however if the value is 

232 an empty string, an empty string will be sent. 

233 :param cert: Client certificate to send for OAuth 2.0 Mutual-TLS Client 

234 Authentication (draft-ietf-oauth-mtls). Can either be the 

235 path of a file containing the private key and certificate or 

236 a tuple of two filenames for certificate and key. 

237 :param kwargs: Extra parameters to include in the token request. 

238 :return: A token dict 

239 """ 

240 if not is_secure_transport(token_url): 

241 raise InsecureTransportError() 

242 

243 if not code and authorization_response: 

244 self._client.parse_request_uri_response( 

245 authorization_response, state=self._state 

246 ) 

247 code = self._client.code 

248 elif not code and isinstance(self._client, WebApplicationClient): 

249 code = self._client.code 

250 if not code: 

251 raise ValueError( 

252 "Please supply either code or " "authorization_response parameters." 

253 ) 

254 

255 # Earlier versions of this library build an HTTPBasicAuth header out of 

256 # `username` and `password`. The RFC states, however these attributes 

257 # must be in the request body and not the header. 

258 # If an upstream server is not spec compliant and requires them to 

259 # appear as an Authorization header, supply an explicit `auth` header 

260 # to this function. 

261 # This check will allow for empty strings, but not `None`. 

262 # 

263 # References 

264 # 4.3.2 - Resource Owner Password Credentials Grant 

265 # https://tools.ietf.org/html/rfc6749#section-4.3.2 

266 

267 if isinstance(self._client, LegacyApplicationClient): 

268 if username is None: 

269 raise ValueError( 

270 "`LegacyApplicationClient` requires both the " 

271 "`username` and `password` parameters." 

272 ) 

273 if password is None: 

274 raise ValueError( 

275 "The required parameter `username` was supplied, " 

276 "but `password` was not." 

277 ) 

278 

279 # merge username and password into kwargs for `prepare_request_body` 

280 if username is not None: 

281 kwargs["username"] = username 

282 if password is not None: 

283 kwargs["password"] = password 

284 

285 # is an auth explicitly supplied? 

286 if auth is not None: 

287 # if we're dealing with the default of `include_client_id` (None): 

288 # we will assume the `auth` argument is for an RFC compliant server 

289 # and we should not send the `client_id` in the body. 

290 # This approach allows us to still force the client_id by submitting 

291 # `include_client_id=True` along with an `auth` object. 

292 if include_client_id is None: 

293 include_client_id = False 

294 

295 # otherwise we may need to create an auth header 

296 else: 

297 # since we don't have an auth header, we MAY need to create one 

298 # it is possible that we want to send the `client_id` in the body 

299 # if so, `include_client_id` should be set to True 

300 # otherwise, we will generate an auth header 

301 if include_client_id is not True: 

302 client_id = self.client_id 

303 if client_id: 

304 log.debug( 

305 'Encoding `client_id` "%s" with `client_secret` ' 

306 "as Basic auth credentials.", 

307 client_id, 

308 ) 

309 client_secret = client_secret if client_secret is not None else "" 

310 auth = requests.auth.HTTPBasicAuth(client_id, client_secret) 

311 

312 if include_client_id: 

313 # this was pulled out of the params 

314 # it needs to be passed into prepare_request_body 

315 if client_secret is not None: 

316 kwargs["client_secret"] = client_secret 

317 

318 body = self._client.prepare_request_body( 

319 code=code, 

320 body=body, 

321 redirect_uri=self.redirect_uri, 

322 include_client_id=include_client_id, 

323 **kwargs 

324 ) 

325 

326 headers = headers or { 

327 "Accept": "application/json", 

328 "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8", 

329 } 

330 self.token = {} 

331 request_kwargs = {} 

332 if method.upper() == "POST": 

333 request_kwargs["params" if force_querystring else "data"] = dict( 

334 urldecode(body) 

335 ) 

336 elif method.upper() == "GET": 

337 request_kwargs["params"] = dict(urldecode(body)) 

338 else: 

339 raise ValueError("The method kwarg must be POST or GET.") 

340 

341 r = self.request( 

342 method=method, 

343 url=token_url, 

344 timeout=timeout, 

345 headers=headers, 

346 auth=auth, 

347 verify=verify, 

348 proxies=proxies, 

349 cert=cert, 

350 **request_kwargs 

351 ) 

352 

353 log.debug("Request to fetch token completed with status %s.", r.status_code) 

354 log.debug("Request url was %s", r.request.url) 

355 log.debug("Request headers were %s", r.request.headers) 

356 log.debug("Request body was %s", r.request.body) 

357 log.debug("Response headers were %s and content %s.", r.headers, r.text) 

358 log.debug( 

359 "Invoking %d token response hooks.", 

360 len(self.compliance_hook["access_token_response"]), 

361 ) 

362 for hook in self.compliance_hook["access_token_response"]: 

363 log.debug("Invoking hook %s.", hook) 

364 r = hook(r) 

365 

366 self._client.parse_request_body_response(r.text, scope=self.scope) 

367 self.token = self._client.token 

368 log.debug("Obtained token %s.", self.token) 

369 return self.token 

370 

371 def token_from_fragment(self, authorization_response): 

372 """Parse token from the URI fragment, used by MobileApplicationClients. 

373 

374 :param authorization_response: The full URL of the redirect back to you 

375 :return: A token dict 

376 """ 

377 self._client.parse_request_uri_response( 

378 authorization_response, state=self._state 

379 ) 

380 self.token = self._client.token 

381 return self.token 

382 

383 def refresh_token( 

384 self, 

385 token_url, 

386 refresh_token=None, 

387 body="", 

388 auth=None, 

389 timeout=None, 

390 headers=None, 

391 verify=True, 

392 proxies=None, 

393 **kwargs 

394 ): 

395 """Fetch a new access token using a refresh token. 

396 

397 :param token_url: The token endpoint, must be HTTPS. 

398 :param refresh_token: The refresh_token to use. 

399 :param body: Optional application/x-www-form-urlencoded body to add the 

400 include in the token request. Prefer kwargs over body. 

401 :param auth: An auth tuple or method as accepted by `requests`. 

402 :param timeout: Timeout of the request in seconds. 

403 :param headers: A dict of headers to be used by `requests`. 

404 :param verify: Verify SSL certificate. 

405 :param proxies: The `proxies` argument will be passed to `requests`. 

406 :param kwargs: Extra parameters to include in the token request. 

407 :return: A token dict 

408 """ 

409 if not token_url: 

410 raise ValueError("No token endpoint set for auto_refresh.") 

411 

412 if not is_secure_transport(token_url): 

413 raise InsecureTransportError() 

414 

415 refresh_token = refresh_token or self.token.get("refresh_token") 

416 

417 log.debug( 

418 "Adding auto refresh key word arguments %s.", self.auto_refresh_kwargs 

419 ) 

420 kwargs.update(self.auto_refresh_kwargs) 

421 body = self._client.prepare_refresh_body( 

422 body=body, refresh_token=refresh_token, scope=self.scope, **kwargs 

423 ) 

424 log.debug("Prepared refresh token request body %s", body) 

425 

426 if headers is None: 

427 headers = { 

428 "Accept": "application/json", 

429 "Content-Type": ("application/x-www-form-urlencoded;charset=UTF-8"), 

430 } 

431 

432 r = self.post( 

433 token_url, 

434 data=dict(urldecode(body)), 

435 auth=auth, 

436 timeout=timeout, 

437 headers=headers, 

438 verify=verify, 

439 withhold_token=True, 

440 proxies=proxies, 

441 ) 

442 log.debug("Request to refresh token completed with status %s.", r.status_code) 

443 log.debug("Response headers were %s and content %s.", r.headers, r.text) 

444 log.debug( 

445 "Invoking %d token response hooks.", 

446 len(self.compliance_hook["refresh_token_response"]), 

447 ) 

448 for hook in self.compliance_hook["refresh_token_response"]: 

449 log.debug("Invoking hook %s.", hook) 

450 r = hook(r) 

451 

452 self.token = self._client.parse_request_body_response(r.text, scope=self.scope) 

453 if not "refresh_token" in self.token: 

454 log.debug("No new refresh token given. Re-using old.") 

455 self.token["refresh_token"] = refresh_token 

456 return self.token 

457 

458 def request( 

459 self, 

460 method, 

461 url, 

462 data=None, 

463 headers=None, 

464 withhold_token=False, 

465 client_id=None, 

466 client_secret=None, 

467 **kwargs 

468 ): 

469 """Intercept all requests and add the OAuth 2 token if present.""" 

470 if not is_secure_transport(url): 

471 raise InsecureTransportError() 

472 if self.token and not withhold_token: 

473 log.debug( 

474 "Invoking %d protected resource request hooks.", 

475 len(self.compliance_hook["protected_request"]), 

476 ) 

477 for hook in self.compliance_hook["protected_request"]: 

478 log.debug("Invoking hook %s.", hook) 

479 url, headers, data = hook(url, headers, data) 

480 

481 log.debug("Adding token %s to request.", self.token) 

482 try: 

483 url, headers, data = self._client.add_token( 

484 url, http_method=method, body=data, headers=headers 

485 ) 

486 # Attempt to retrieve and save new access token if expired 

487 except TokenExpiredError: 

488 if self.auto_refresh_url: 

489 log.debug( 

490 "Auto refresh is set, attempting to refresh at %s.", 

491 self.auto_refresh_url, 

492 ) 

493 

494 # We mustn't pass auth twice. 

495 auth = kwargs.pop("auth", None) 

496 if client_id and client_secret and (auth is None): 

497 log.debug( 

498 'Encoding client_id "%s" with client_secret as Basic auth credentials.', 

499 client_id, 

500 ) 

501 auth = requests.auth.HTTPBasicAuth(client_id, client_secret) 

502 token = self.refresh_token( 

503 self.auto_refresh_url, auth=auth, **kwargs 

504 ) 

505 if self.token_updater: 

506 log.debug( 

507 "Updating token to %s using %s.", token, self.token_updater 

508 ) 

509 self.token_updater(token) 

510 url, headers, data = self._client.add_token( 

511 url, http_method=method, body=data, headers=headers 

512 ) 

513 else: 

514 raise TokenUpdated(token) 

515 else: 

516 raise 

517 

518 log.debug("Requesting url %s using method %s.", url, method) 

519 log.debug("Supplying headers %s and data %s", headers, data) 

520 log.debug("Passing through key word arguments %s.", kwargs) 

521 return super(OAuth2Session, self).request( 

522 method, url, headers=headers, data=data, **kwargs 

523 ) 

524 

525 def register_compliance_hook(self, hook_type, hook): 

526 """Register a hook for request/response tweaking. 

527 

528 Available hooks are: 

529 access_token_response invoked before token parsing. 

530 refresh_token_response invoked before refresh token parsing. 

531 protected_request invoked before making a request. 

532 

533 If you find a new hook is needed please send a GitHub PR request 

534 or open an issue. 

535 """ 

536 if hook_type not in self.compliance_hook: 

537 raise ValueError( 

538 "Hook type %s is not in %s.", hook_type, self.compliance_hook 

539 ) 

540 self.compliance_hook[hook_type].add(hook)