Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/requests_oauthlib/oauth2_session.py: 21%
180 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:22 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:22 +0000
1from __future__ import unicode_literals
3import logging
5from oauthlib.common import generate_token, urldecode
6from oauthlib.oauth2 import WebApplicationClient, InsecureTransportError
7from oauthlib.oauth2 import LegacyApplicationClient
8from oauthlib.oauth2 import TokenExpiredError, is_secure_transport
9import requests
11log = logging.getLogger(__name__)
class TokenUpdated(Warning):
    """Warning that carries a freshly refreshed token.

    Raised by ``OAuth2Session.request`` after an automatic token refresh when
    no ``token_updater`` callback has been configured; the new token is
    available on the ``token`` attribute.
    """

    def __init__(self, token):
        # Keep the refreshed token on the instance so handlers can persist it.
        self.token = token
        # The base Warning is deliberately initialised without a message.
        super(TokenUpdated, self).__init__()
class OAuth2Session(requests.Session):
    """Versatile OAuth 2 extension to :class:`requests.Session`.

    Supports any grant type adhering to :class:`oauthlib.oauth2.Client` spec
    including the four core OAuth 2 grants.

    Can be used to create authorization urls, fetch tokens and access protected
    resources using the :class:`requests.Session` interface you are used to.

    - :class:`oauthlib.oauth2.WebApplicationClient` (default): Authorization Code Grant
    - :class:`oauthlib.oauth2.MobileApplicationClient`: Implicit Grant
    - :class:`oauthlib.oauth2.LegacyApplicationClient`: Password Credentials Grant
    - :class:`oauthlib.oauth2.BackendApplicationClient`: Client Credentials Grant

    Note that the only time you will be using Implicit Grant from python is if
    you are driving a user agent able to obtain URL fragments.
    """

    def __init__(
        self,
        client_id=None,
        client=None,
        auto_refresh_url=None,
        auto_refresh_kwargs=None,
        scope=None,
        redirect_uri=None,
        token=None,
        state=None,
        token_updater=None,
        **kwargs
    ):
        """Construct a new OAuth 2 client session.

        :param client_id: Client id obtained during registration
        :param client: :class:`oauthlib.oauth2.Client` to be used. Default is
                       WebApplicationClient which is useful for any
                       hosted application but not mobile or desktop.
        :param auto_refresh_url: Refresh token endpoint URL, must be HTTPS.
                                 Supply this if you wish the client to
                                 automatically refresh your access tokens.
        :param auto_refresh_kwargs: Extra arguments to pass to the refresh
                                    token endpoint.
        :param scope: List of scopes you wish to request access to
        :param redirect_uri: Redirect URI you registered as callback
        :param token: Token dictionary, must include access_token
                      and token_type.
        :param state: State string used to prevent CSRF. This will be given
                      when creating the authorization url and must be supplied
                      when parsing the authorization response.
                      Can be either a string or a no argument callable.
        :param token_updater: Method with one argument, token, to be used to
                              update your token database on automatic token
                              refresh. If not set a TokenUpdated warning will
                              be raised when a token has been refreshed. This
                              warning will carry the token in its token
                              argument.
        :param kwargs: Arguments to pass to the Session constructor.
        """
        super(OAuth2Session, self).__init__(**kwargs)
        # The client must exist before ``self.token`` is assigned: the token
        # property setter below delegates to ``self._client``.
        self._client = client or WebApplicationClient(client_id, token=token)
        self.token = token or {}
        self.scope = scope
        self.redirect_uri = redirect_uri
        # ``state`` is either the caller's value or the token generator;
        # ``_state`` holds the concrete value last supplied or generated.
        self.state = state or generate_token
        self._state = state
        self.auto_refresh_url = auto_refresh_url
        self.auto_refresh_kwargs = auto_refresh_kwargs or {}
        self.token_updater = token_updater

        # Ensure that requests doesn't do any automatic auth. See #278.
        # The default behavior can be re-enabled by setting auth to None.
        self.auth = lambda r: r

        # Allow customizations for non compliant providers through various
        # hooks to adjust requests and responses.
        self.compliance_hook = {
            "access_token_response": set(),
            "refresh_token_response": set(),
            "protected_request": set(),
        }

    def new_state(self):
        """Generate a state string to be used in authorizations.

        If ``self.state`` is callable it is invoked without arguments and its
        return value becomes the current state; otherwise ``self.state``
        itself is re-used as the state.

        :return: The state value now stored on the session.
        """
        if callable(self.state):
            # Called outside any ``except TypeError`` so that a TypeError
            # raised *inside* the state factory propagates instead of being
            # mistaken for "state is not callable".
            self._state = self.state()
            log.debug("Generated new state %s.", self._state)
        else:
            self._state = self.state
            log.debug("Re-using previously supplied state %s.", self._state)
        return self._state

    @property
    def client_id(self):
        """Client id stored on the underlying client, or None if unset."""
        return getattr(self._client, "client_id", None)

    @client_id.setter
    def client_id(self, value):
        self._client.client_id = value

    @client_id.deleter
    def client_id(self):
        del self._client.client_id

    @property
    def token(self):
        """Token stored on the underlying client, or None if unset."""
        return getattr(self._client, "token", None)

    @token.setter
    def token(self, value):
        # Store the token and let the client refresh the attributes it
        # derives from it.
        self._client.token = value
        self._client.populate_token_attributes(value)

    @property
    def access_token(self):
        """Access token stored on the underlying client, or None if unset."""
        return getattr(self._client, "access_token", None)

    @access_token.setter
    def access_token(self, value):
        self._client.access_token = value

    @access_token.deleter
    def access_token(self):
        del self._client.access_token

    @property
    def authorized(self):
        """Boolean that indicates whether this session has an OAuth token
        or not. If `self.authorized` is True, you can reasonably expect
        OAuth-protected requests to the resource to succeed. If
        `self.authorized` is False, you need the user to go through the OAuth
        authentication dance before OAuth-protected requests to the resource
        will succeed.
        """
        return bool(self.access_token)

    def authorization_url(self, url, state=None, **kwargs):
        """Form an authorization URL.

        :param url: Authorization endpoint url, must be HTTPS.
        :param state: An optional state string for CSRF protection. If not
                      given it will be generated for you.
        :param kwargs: Extra parameters to include.
        :return: authorization_url, state
        """
        state = state or self.new_state()
        authorization_url = self._client.prepare_request_uri(
            url,
            redirect_uri=self.redirect_uri,
            scope=self.scope,
            state=state,
            **kwargs
        )
        return authorization_url, state

    def fetch_token(
        self,
        token_url,
        code=None,
        authorization_response=None,
        body="",
        auth=None,
        username=None,
        password=None,
        method="POST",
        force_querystring=False,
        timeout=None,
        headers=None,
        verify=True,
        proxies=None,
        include_client_id=None,
        client_secret=None,
        cert=None,
        **kwargs
    ):
        """Generic method for fetching an access token from the token endpoint.

        If you are using the MobileApplicationClient you will want to use
        `token_from_fragment` instead of `fetch_token`.

        The current implementation enforces the RFC guidelines.

        :param token_url: Token endpoint URL, must use HTTPS.
        :param code: Authorization code (used by WebApplicationClients).
        :param authorization_response: Authorization response URL, the callback
                                       URL of the request back to you. Used by
                                       WebApplicationClients instead of code.
        :param body: Optional application/x-www-form-urlencoded body to
                     include in the token request. Prefer kwargs over body.
        :param auth: An auth tuple or method as accepted by `requests`.
        :param username: Username required by LegacyApplicationClients to appear
                         in the request body.
        :param password: Password required by LegacyApplicationClients to appear
                         in the request body.
        :param method: The HTTP method used to make the request. Defaults
                       to POST, but may also be GET. Other methods should
                       be added as needed.
        :param force_querystring: If True, force the request body to be sent
                                  in the querystring instead.
        :param timeout: Timeout of the request in seconds.
        :param headers: Dict to default request headers with.
        :param verify: Verify SSL certificate.
        :param proxies: The `proxies` argument is passed onto `requests`.
        :param include_client_id: Should the request body include the
                                  `client_id` parameter. Default is `None`,
                                  which will attempt to autodetect. This can be
                                  forced to always include (True) or never
                                  include (False).
        :param client_secret: The `client_secret` paired to the `client_id`.
                              This is generally required unless provided in the
                              `auth` tuple. If the value is `None`, it will be
                              omitted from the request, however if the value is
                              an empty string, an empty string will be sent.
        :param cert: Client certificate to send for OAuth 2.0 Mutual-TLS Client
                     Authentication (draft-ietf-oauth-mtls). Can either be the
                     path of a file containing the private key and certificate or
                     a tuple of two filenames for certificate and key.
        :param kwargs: Extra parameters to include in the token request.
        :return: A token dict
        :raises InsecureTransportError: if `token_url` is not a secure
                                        transport URL.
        :raises ValueError: if no authorization code can be determined for a
                            WebApplicationClient, if a LegacyApplicationClient
                            is used without `username`/`password`, or if
                            `method` is neither POST nor GET.
        """
        if not is_secure_transport(token_url):
            raise InsecureTransportError()

        if not code and authorization_response:
            self._client.parse_request_uri_response(
                authorization_response, state=self._state
            )
            code = self._client.code
        elif not code and isinstance(self._client, WebApplicationClient):
            code = self._client.code
            if not code:
                raise ValueError(
                    "Please supply either code or " "authorization_response parameters."
                )

        # Earlier versions of this library build an HTTPBasicAuth header out of
        # `username` and `password`. The RFC states, however these attributes
        # must be in the request body and not the header.
        # If an upstream server is not spec compliant and requires them to
        # appear as an Authorization header, supply an explicit `auth` header
        # to this function.
        # This check will allow for empty strings, but not `None`.
        #
        # References
        # 4.3.2 - Resource Owner Password Credentials Grant
        #         https://tools.ietf.org/html/rfc6749#section-4.3.2

        if isinstance(self._client, LegacyApplicationClient):
            if username is None:
                raise ValueError(
                    "`LegacyApplicationClient` requires both the "
                    "`username` and `password` parameters."
                )
            if password is None:
                raise ValueError(
                    "The required parameter `username` was supplied, "
                    "but `password` was not."
                )

        # merge username and password into kwargs for `prepare_request_body`
        if username is not None:
            kwargs["username"] = username
        if password is not None:
            kwargs["password"] = password

        # is an auth explicitly supplied?
        if auth is not None:
            # if we're dealing with the default of `include_client_id` (None):
            # we will assume the `auth` argument is for an RFC compliant server
            # and we should not send the `client_id` in the body.
            # This approach allows us to still force the client_id by submitting
            # `include_client_id=True` along with an `auth` object.
            if include_client_id is None:
                include_client_id = False

        # otherwise we may need to create an auth header
        else:
            # since we don't have an auth header, we MAY need to create one
            # it is possible that we want to send the `client_id` in the body
            # if so, `include_client_id` should be set to True
            # otherwise, we will generate an auth header
            if include_client_id is not True:
                client_id = self.client_id
                if client_id:
                    log.debug(
                        'Encoding `client_id` "%s" with `client_secret` '
                        "as Basic auth credentials.",
                        client_id,
                    )
                    client_secret = client_secret if client_secret is not None else ""
                    auth = requests.auth.HTTPBasicAuth(client_id, client_secret)

        if include_client_id:
            # this was pulled out of the params
            # it needs to be passed into prepare_request_body
            if client_secret is not None:
                kwargs["client_secret"] = client_secret

        body = self._client.prepare_request_body(
            code=code,
            body=body,
            redirect_uri=self.redirect_uri,
            include_client_id=include_client_id,
            **kwargs
        )

        headers = headers or {
            "Accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
        }
        # Drop any current token before issuing the token request; `request`
        # only attaches a token when `self.token` is truthy.
        self.token = {}
        request_kwargs = {}
        http_method = method.upper()
        if http_method == "POST":
            request_kwargs["params" if force_querystring else "data"] = dict(
                urldecode(body)
            )
        elif http_method == "GET":
            request_kwargs["params"] = dict(urldecode(body))
        else:
            raise ValueError("The method kwarg must be POST or GET.")

        r = self.request(
            method=method,
            url=token_url,
            timeout=timeout,
            headers=headers,
            auth=auth,
            verify=verify,
            proxies=proxies,
            cert=cert,
            **request_kwargs
        )

        # NOTE: the debug records below include the request body and the
        # response text, which may contain credentials (e.g. `password`,
        # `client_secret`) and the issued token.
        log.debug("Request to fetch token completed with status %s.", r.status_code)
        log.debug("Request url was %s", r.request.url)
        log.debug("Request headers were %s", r.request.headers)
        log.debug("Request body was %s", r.request.body)
        log.debug("Response headers were %s and content %s.", r.headers, r.text)
        log.debug(
            "Invoking %d token response hooks.",
            len(self.compliance_hook["access_token_response"]),
        )
        for hook in self.compliance_hook["access_token_response"]:
            log.debug("Invoking hook %s.", hook)
            r = hook(r)

        self._client.parse_request_body_response(r.text, scope=self.scope)
        self.token = self._client.token
        log.debug("Obtained token %s.", self.token)
        return self.token

    def token_from_fragment(self, authorization_response):
        """Parse token from the URI fragment, used by MobileApplicationClients.

        :param authorization_response: The full URL of the redirect back to you
        :return: A token dict
        """
        self._client.parse_request_uri_response(
            authorization_response, state=self._state
        )
        self.token = self._client.token
        return self.token

    def refresh_token(
        self,
        token_url,
        refresh_token=None,
        body="",
        auth=None,
        timeout=None,
        headers=None,
        verify=True,
        proxies=None,
        **kwargs
    ):
        """Fetch a new access token using a refresh token.

        :param token_url: The token endpoint, must be HTTPS.
        :param refresh_token: The refresh_token to use. Defaults to the
                              `refresh_token` entry of the current token.
        :param body: Optional application/x-www-form-urlencoded body to
                     include in the token request. Prefer kwargs over body.
        :param auth: An auth tuple or method as accepted by `requests`.
        :param timeout: Timeout of the request in seconds.
        :param headers: A dict of headers to be used by `requests`.
        :param verify: Verify SSL certificate.
        :param proxies: The `proxies` argument will be passed to `requests`.
        :param kwargs: Extra parameters to include in the token request.
        :return: A token dict
        :raises ValueError: if `token_url` is empty.
        :raises InsecureTransportError: if `token_url` is not a secure
                                        transport URL.
        """
        if not token_url:
            raise ValueError("No token endpoint set for auto_refresh.")

        if not is_secure_transport(token_url):
            raise InsecureTransportError()

        refresh_token = refresh_token or self.token.get("refresh_token")

        log.debug(
            "Adding auto refresh key word arguments %s.", self.auto_refresh_kwargs
        )
        # Session-wide refresh arguments override same-named call arguments.
        kwargs.update(self.auto_refresh_kwargs)
        body = self._client.prepare_refresh_body(
            body=body, refresh_token=refresh_token, scope=self.scope, **kwargs
        )
        log.debug("Prepared refresh token request body %s", body)

        if headers is None:
            headers = {
                "Accept": "application/json",
                "Content-Type": ("application/x-www-form-urlencoded;charset=UTF-8"),
            }

        # `withhold_token=True` keeps `request` from attaching the current
        # token to the refresh call.
        r = self.post(
            token_url,
            data=dict(urldecode(body)),
            auth=auth,
            timeout=timeout,
            headers=headers,
            verify=verify,
            withhold_token=True,
            proxies=proxies,
        )
        log.debug("Request to refresh token completed with status %s.", r.status_code)
        log.debug("Response headers were %s and content %s.", r.headers, r.text)
        log.debug(
            "Invoking %d token response hooks.",
            len(self.compliance_hook["refresh_token_response"]),
        )
        for hook in self.compliance_hook["refresh_token_response"]:
            log.debug("Invoking hook %s.", hook)
            r = hook(r)

        self.token = self._client.parse_request_body_response(r.text, scope=self.scope)
        if "refresh_token" not in self.token:
            log.debug("No new refresh token given. Re-using old.")
            self.token["refresh_token"] = refresh_token
        return self.token

    def request(
        self,
        method,
        url,
        data=None,
        headers=None,
        withhold_token=False,
        client_id=None,
        client_secret=None,
        **kwargs
    ):
        """Intercept all requests and add the OAuth 2 token if present.

        :param method: HTTP method of the request.
        :param url: Request URL, must be a secure transport URL.
        :param data: Request body, passed through the protected-request hooks
                     and the client's `add_token`.
        :param headers: Request headers, handled like `data`.
        :param withhold_token: If True, send the request without attaching
                               the session token.
        :param client_id: Used with `client_secret` to build Basic auth for
                          an automatic refresh when no `auth` kwarg is given.
        :param client_secret: See `client_id`.
        :param kwargs: Passed through to :meth:`requests.Session.request`
                       (and to `refresh_token` when an automatic refresh
                       happens).
        :return: The response returned by :meth:`requests.Session.request`.
        :raises InsecureTransportError: if `url` is not a secure transport URL.
        :raises TokenUpdated: after a successful automatic refresh when no
                              `token_updater` is configured.
        :raises TokenExpiredError: re-raised when the token has expired and
                                   no `auto_refresh_url` is set.
        """
        if not is_secure_transport(url):
            raise InsecureTransportError()
        if self.token and not withhold_token:
            log.debug(
                "Invoking %d protected resource request hooks.",
                len(self.compliance_hook["protected_request"]),
            )
            for hook in self.compliance_hook["protected_request"]:
                log.debug("Invoking hook %s.", hook)
                url, headers, data = hook(url, headers, data)

            log.debug("Adding token %s to request.", self.token)
            try:
                url, headers, data = self._client.add_token(
                    url, http_method=method, body=data, headers=headers
                )
            # Attempt to retrieve and save new access token if expired
            except TokenExpiredError:
                if self.auto_refresh_url:
                    log.debug(
                        "Auto refresh is set, attempting to refresh at %s.",
                        self.auto_refresh_url,
                    )

                    # We mustn't pass auth twice.
                    auth = kwargs.pop("auth", None)
                    if client_id and client_secret and (auth is None):
                        log.debug(
                            'Encoding client_id "%s" with client_secret as Basic auth credentials.',
                            client_id,
                        )
                        auth = requests.auth.HTTPBasicAuth(client_id, client_secret)
                    token = self.refresh_token(
                        self.auto_refresh_url, auth=auth, **kwargs
                    )
                    if self.token_updater:
                        log.debug(
                            "Updating token to %s using %s.", token, self.token_updater
                        )
                        self.token_updater(token)
                        # Retry attaching the (now refreshed) token.
                        url, headers, data = self._client.add_token(
                            url, http_method=method, body=data, headers=headers
                        )
                    else:
                        # Without an updater the refreshed token is handed to
                        # the caller via the warning instead of being saved.
                        raise TokenUpdated(token)
                else:
                    raise

        log.debug("Requesting url %s using method %s.", url, method)
        log.debug("Supplying headers %s and data %s", headers, data)
        log.debug("Passing through key word arguments %s.", kwargs)
        return super(OAuth2Session, self).request(
            method, url, headers=headers, data=data, **kwargs
        )

    def register_compliance_hook(self, hook_type, hook):
        """Register a hook for request/response tweaking.

        Available hooks are:
            access_token_response invoked before token parsing.
            refresh_token_response invoked before refresh token parsing.
            protected_request invoked before making a request.

        If you find a new hook is needed please send a GitHub pull request
        or open an issue.

        :param hook_type: One of the hook names listed above.
        :param hook: Callable to add to the set of hooks for `hook_type`.
        :raises ValueError: if `hook_type` is not a known hook name.
        """
        if hook_type not in self.compliance_hook:
            # Interpolate explicitly: exceptions do not apply logging-style
            # lazy formatting to their arguments.
            raise ValueError(
                "Hook type %s is not in %s." % (hook_type, self.compliance_hook)
            )
        self.compliance_hook[hook_type].add(hook)