1from urllib.parse import urlparse
2
3import logging
4
5from oauthlib.common import add_params_to_uri
6from oauthlib.common import urldecode as _urldecode
7from oauthlib.oauth1 import SIGNATURE_HMAC, SIGNATURE_RSA, SIGNATURE_TYPE_AUTH_HEADER
8import requests
9
10from . import OAuth1
11
12
# Module-level logger, named after this module, used for the debug traces below.
log = logging.getLogger(__name__)
14
15
def urldecode(body):
    """Decode ``body`` as a urlencoded query string, falling back to JSON.

    The oauthlib decoder is tried first. If it raises for any reason the
    body is handed to ``json.loads`` instead, whose own error (if any)
    propagates to the caller.

    :param body: The raw text to decode.
    :returns: Whatever the successful decoder produced.
    """
    import json

    try:
        decoded = _urldecode(body)
    except Exception:
        # Deliberately broad: any failure of the urlencoded parser means we
        # should give JSON a chance before reporting an error.
        decoded = json.loads(body)
    return decoded
24
25
class TokenRequestDenied(ValueError):
    """Error carrying the HTTP response of a rejected token request.

    :param message: Human readable description of the failure.
    :param response: The response object that triggered the error; kept on
                     the ``response`` attribute.
    """

    def __init__(self, message, response):
        super().__init__(message)
        self.response = response

    @property
    def status_code(self):
        """Status code of the stored response (kept for backwards compatibility)."""
        response = self.response
        return response.status_code
35
36
class TokenMissing(ValueError):
    """Error raised when a token payload lacks the ``oauth_token`` entry.

    :param message: Human readable description of the failure.
    :param response: The offending payload; kept on the ``response``
                     attribute for inspection by the caller.
    """

    def __init__(self, message, response):
        super().__init__(message)
        self.response = response
41
42
class VerifierMissing(ValueError):
    """Error raised when an access token is requested without a verifier."""
45
46
class OAuth1Session(requests.Session):
    """Request signing and convenience methods for the oauth dance.

    What is the difference between OAuth1Session and OAuth1?

    OAuth1Session actually uses OAuth1 internally and its purpose is to assist
    in the OAuth workflow through convenience methods to prepare authorization
    URLs and parse the various token and redirection responses. It also
    provides rudimentary validation of responses.

    An example of the OAuth workflow using a basic CLI app and Twitter.

    >>> # Credentials obtained during the registration.
    >>> client_key = 'client key'
    >>> client_secret = 'secret'
    >>> callback_uri = 'https://127.0.0.1/callback'
    >>>
    >>> # Endpoints found in the OAuth provider API documentation
    >>> request_token_url = 'https://api.twitter.com/oauth/request_token'
    >>> authorization_url = 'https://api.twitter.com/oauth/authorize'
    >>> access_token_url = 'https://api.twitter.com/oauth/access_token'
    >>>
    >>> oauth_session = OAuth1Session(client_key, client_secret=client_secret, callback_uri=callback_uri)
    >>>
    >>> # First step, fetch the request token.
    >>> oauth_session.fetch_request_token(request_token_url)
    {
        'oauth_token': 'kjerht2309u',
        'oauth_token_secret': 'lsdajfh923874',
    }
    >>>
    >>> # Second step. Follow this link and authorize
    >>> oauth_session.authorization_url(authorization_url)
    'https://api.twitter.com/oauth/authorize?oauth_token=kjerht2309u'
    >>>
    >>> # Third step. Fetch the access token
    >>> redirect_response = input('Paste the full redirect URL here.')
    >>> oauth_session.parse_authorization_response(redirect_response)
    {
        'oauth_token': 'kjerht2309u',
        'oauth_token_secret': 'lsdajfh923874',
        'oauth_verifier': 'w34o8967345',
    }
    >>> oauth_session.fetch_access_token(access_token_url)
    {
        'oauth_token': 'sdf0o9823sjdfsdf',
        'oauth_token_secret': '2kjshdfp92i34asdasd',
    }
    >>> # Done. You can now make OAuth requests.
    >>> status_url = 'http://api.twitter.com/1/statuses/update.json'
    >>> new_status = {'status': 'hello world!'}
    >>> oauth_session.post(status_url, data=new_status)
    <Response [200]>
    """

    def __init__(
        self,
        client_key,
        client_secret=None,
        resource_owner_key=None,
        resource_owner_secret=None,
        callback_uri=None,
        signature_method=SIGNATURE_HMAC,
        signature_type=SIGNATURE_TYPE_AUTH_HEADER,
        rsa_key=None,
        verifier=None,
        client_class=None,
        force_include_body=False,
        **kwargs
    ):
        """Construct the OAuth 1 session.

        :param client_key: A client specific identifier.
        :param client_secret: A client specific secret used to create HMAC and
                              plaintext signatures.
        :param resource_owner_key: A resource owner key, also referred to as
                                   request token or access token depending on
                                   when in the workflow it is used.
        :param resource_owner_secret: A resource owner secret obtained with
                                      either a request or access token. Often
                                      referred to as token secret.
        :param callback_uri: The URL the user is redirected back to after
                             authorization.
        :param signature_method: Signature methods determine how the OAuth
                                 signature is created. The three options are
                                 oauthlib.oauth1.SIGNATURE_HMAC (default),
                                 oauthlib.oauth1.SIGNATURE_RSA and
                                 oauthlib.oauth1.SIGNATURE_PLAIN.
        :param signature_type: Signature type decides where the OAuth
                               parameters are added. Either in the
                               Authorization header (default) or to the URL
                               query parameters or the request body. Defined as
                               oauthlib.oauth1.SIGNATURE_TYPE_AUTH_HEADER,
                               oauthlib.oauth1.SIGNATURE_TYPE_QUERY and
                               oauthlib.oauth1.SIGNATURE_TYPE_BODY
                               respectively.
        :param rsa_key: The private RSA key as a string. Can only be used with
                        signature_method=oauthlib.oauth1.SIGNATURE_RSA.
        :param verifier: A verifier string to prove authorization was granted.
        :param client_class: A subclass of `oauthlib.oauth1.Client` to use with
                             `requests_oauthlib.OAuth1` instead of the default
        :param force_include_body: Always include the request body in the
                                   signature creation.
        :param **kwargs: Additional keyword arguments passed to `OAuth1`
        """
        super().__init__()
        self._client = OAuth1(
            client_key,
            client_secret=client_secret,
            resource_owner_key=resource_owner_key,
            resource_owner_secret=resource_owner_secret,
            callback_uri=callback_uri,
            signature_method=signature_method,
            signature_type=signature_type,
            rsa_key=rsa_key,
            verifier=verifier,
            client_class=client_class,
            force_include_body=force_include_body,
            **kwargs
        )
        # Every request sent through this session is signed by the OAuth1 hook.
        self.auth = self._client

    @property
    def token(self):
        """The currently held credentials as a dict.

        Only the entries whose value is set (truthy) on the underlying client
        are included; possible keys are ``oauth_token``,
        ``oauth_token_secret`` and ``oauth_verifier``.
        """
        client = self._client.client
        candidates = (
            ("oauth_token", client.resource_owner_key),
            ("oauth_token_secret", client.resource_owner_secret),
            ("oauth_verifier", client.verifier),
        )
        return {key: value for key, value in candidates if value}

    @token.setter
    def token(self, value):
        """Copy the credentials in ``value`` onto the underlying client.

        :raises TokenMissing: If ``value`` has no ``oauth_token`` entry.
        """
        self._populate_attributes(value)

    @property
    def authorized(self):
        """Boolean that indicates whether this session has an OAuth token
        or not. If `self.authorized` is True, you can reasonably expect
        OAuth-protected requests to the resource to succeed. If
        `self.authorized` is False, you need the user to go through the OAuth
        authentication dance before OAuth-protected requests to the resource
        will succeed.
        """
        client = self._client.client
        if client.signature_method == SIGNATURE_RSA:
            # RSA only uses resource_owner_key
            return bool(client.resource_owner_key)
        # other methods of authentication use all three pieces
        return (
            bool(client.client_secret)
            and bool(client.resource_owner_key)
            and bool(client.resource_owner_secret)
        )

    def authorization_url(self, url, request_token=None, **kwargs):
        """Create an authorization URL by appending request_token and optional
        kwargs to url.

        This is the second step in the OAuth 1 workflow. The user should be
        redirected to this authorization URL, grant access to you, and then
        be redirected back to you. The redirection back can either be specified
        during client registration or by supplying a callback URI per request.

        Only ``oauth_token`` and the supplied ``kwargs`` are added to the URL;
        a ``callback_uri`` given to the session is not appended here.

        :param url: The authorization endpoint URL.
        :param request_token: The previously obtained request token. Defaults
                              to the resource owner key held by the session.
        :param kwargs: Optional parameters to append to the URL.
        :returns: The authorization URL with new parameters embedded.

        >>> request_token_url = 'https://api.twitter.com/oauth/request_token'
        >>> authorization_url = 'https://api.twitter.com/oauth/authorize'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.fetch_request_token(request_token_url)
        {
            'oauth_token': 'sdf0o9823sjdfsdf',
            'oauth_token_secret': '2kjshdfp92i34asdasd',
        }
        >>> oauth_session.authorization_url(authorization_url)
        'https://api.twitter.com/oauth/authorize?oauth_token=sdf0o9823sjdfsdf'
        >>> oauth_session.authorization_url(authorization_url, foo='bar')
        'https://api.twitter.com/oauth/authorize?oauth_token=sdf0o9823sjdfsdf&foo=bar'
        """
        kwargs["oauth_token"] = request_token or self._client.client.resource_owner_key
        log.debug("Adding parameters %s to url %s", kwargs, url)
        return add_params_to_uri(url, kwargs.items())

    def fetch_request_token(self, url, realm=None, **request_kwargs):
        """Fetch a request token.

        This is the first step in the OAuth 1 workflow. A request token is
        obtained by making a signed post request to url. The token is then
        parsed from the application/x-www-form-urlencoded response and ready
        to be used to construct an authorization url.

        :param url: The request token endpoint URL.
        :param realm: A list of realms to request access to. A single realm
                      may also be given as a plain string.
        :param request_kwargs: Optional arguments passed to ''post''
                               function in ''requests.Session''
        :returns: The response in dict format.

        Note that a previously set callback_uri will be reset for your
        convenience, or else signature creation will be incorrect on
        consecutive requests.

        >>> request_token_url = 'https://api.twitter.com/oauth/request_token'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.fetch_request_token(request_token_url)
        {
            'oauth_token': 'sdf0o9823sjdfsdf',
            'oauth_token_secret': '2kjshdfp92i34asdasd',
        }
        """
        if not realm:
            joined_realm = None
        elif isinstance(realm, str):
            # Joining a bare string would insert a space between every
            # character, so a single realm string is used verbatim.
            joined_realm = realm
        else:
            joined_realm = " ".join(realm)
        self._client.client.realm = joined_realm
        token = self._fetch_token(url, **request_kwargs)
        log.debug("Resetting callback_uri and realm (not needed in next phase).")
        self._client.client.callback_uri = None
        self._client.client.realm = None
        return token

    def fetch_access_token(self, url, verifier=None, **request_kwargs):
        """Fetch an access token.

        This is the final step in the OAuth 1 workflow. An access token is
        obtained using all previously obtained credentials, including the
        verifier from the authorization step.

        Note that a previously set verifier will be reset for your
        convenience, or else signature creation will be incorrect on
        consecutive requests.

        :param url: The access token endpoint URL.
        :param verifier: Optional verifier; when given it replaces the one
                         currently held by the session.
        :param request_kwargs: Optional arguments passed to ''post''
                               function in ''requests.Session''
        :returns: The response in dict format.
        :raises VerifierMissing: If no verifier was passed and none is set.

        >>> access_token_url = 'https://api.twitter.com/oauth/access_token'
        >>> redirect_response = 'https://127.0.0.1/callback?oauth_token=kjerht2309u&oauth_token_secret=lsdajfh923874&oauth_verifier=w34o8967345'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.parse_authorization_response(redirect_response)
        {
            'oauth_token': 'kjerht2309u',
            'oauth_token_secret': 'lsdajfh923874',
            'oauth_verifier': 'w34o8967345',
        }
        >>> oauth_session.fetch_access_token(access_token_url)
        {
            'oauth_token': 'sdf0o9823sjdfsdf',
            'oauth_token_secret': '2kjshdfp92i34asdasd',
        }
        """
        if verifier:
            self._client.client.verifier = verifier
        if not getattr(self._client.client, "verifier", None):
            raise VerifierMissing("No client verifier has been set.")
        token = self._fetch_token(url, **request_kwargs)
        log.debug("Resetting verifier attribute, should not be used anymore.")
        self._client.client.verifier = None
        return token

    def parse_authorization_response(self, url):
        """Extract parameters from the post authorization redirect response URL.

        The extracted values are also stored on the session.

        :param url: The full URL that resulted from the user being redirected
                    back from the OAuth provider to you, the client.
        :returns: A dict of parameters extracted from the URL.
        :raises TokenMissing: If the query carries no ``oauth_token``.

        >>> redirect_response = 'https://127.0.0.1/callback?oauth_token=kjerht2309u&oauth_token_secret=lsdajfh923874&oauth_verifier=w34o8967345'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.parse_authorization_response(redirect_response)
        {
            'oauth_token': 'kjerht2309u',
            'oauth_token_secret': 'lsdajfh923874',
            'oauth_verifier': 'w34o8967345',
        }
        """
        log.debug("Parsing token from query part of url %s", url)
        token = dict(urldecode(urlparse(url).query))
        log.debug("Updating internal client token attribute.")
        # The token setter populates the client attributes (and validates the
        # presence of oauth_token), so one assignment is sufficient.
        self.token = token
        return token

    def _populate_attributes(self, token):
        """Copy the credentials found in ``token`` onto the underlying client.

        ``oauth_token`` is mandatory; ``oauth_token_secret`` and
        ``oauth_verifier`` are applied only when present.

        :param token: Mapping of OAuth parameter names to values.
        :raises TokenMissing: If ``token`` has no ``oauth_token`` entry. The
                              mapping itself is attached as ``response``.
        """
        if "oauth_token" not in token:
            raise TokenMissing(
                "Response does not contain a token: {resp}".format(resp=token), token
            )
        client = self._client.client
        client.resource_owner_key = token["oauth_token"]
        if "oauth_token_secret" in token:
            client.resource_owner_secret = token["oauth_token_secret"]
        if "oauth_verifier" in token:
            client.verifier = token["oauth_verifier"]

    def _fetch_token(self, url, **request_kwargs):
        """POST to ``url`` and store the token found in the response.

        :param url: The token endpoint URL.
        :param request_kwargs: Extra keyword arguments forwarded to ``post``.
        :returns: The decoded token as a dict.
        :raises TokenRequestDenied: If the response status code is 400 or above.
        :raises ValueError: If the response body cannot be decoded.
        :raises TokenMissing: If the decoded body has no ``oauth_token``.
        """
        log.debug("Fetching token from %s using client %s", url, self._client.client)
        r = self.post(url, **request_kwargs)

        if r.status_code >= 400:
            error = "Token request failed with code %s, response was '%s'."
            raise TokenRequestDenied(error % (r.status_code, r.text), r)

        log.debug('Decoding token from response "%s"', r.text)
        try:
            token = dict(urldecode(r.text.strip()))
        except ValueError as e:
            error = (
                "Unable to decode token from token response. "
                "This is commonly caused by an unsuccessful request where"
                " a non urlencoded error message is returned. "
                "The decoding error was %s" % e
            )
            # Chain explicitly so the original decoding failure stays attached
            # as the cause of the error reported to the caller.
            raise ValueError(error) from e

        log.debug("Obtained token %s", token)
        log.debug("Updating internal client attributes from token data.")
        # The token setter populates the client attributes, so one assignment
        # is sufficient.
        self.token = token
        return token

    def rebuild_auth(self, prepared_request, response):
        """
        When being redirected we should always strip Authorization
        header, since nonce may not be reused as per OAuth spec.

        :param prepared_request: The request about to be sent to the redirect
                                 target; modified in place.
        :param response: The redirect response (unused).
        """
        if "Authorization" in prepared_request.headers:
            # Drop the stale header, then sign the redirected request afresh
            # with this session's auth.
            prepared_request.headers.pop("Authorization", None)
            prepared_request.prepare_auth(self.auth)