Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/requests_oauthlib/oauth1_session.py: 32%

114 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:22 +0000

1from __future__ import unicode_literals 

2 

3try: 

4 from urlparse import urlparse 

5except ImportError: 

6 from urllib.parse import urlparse 

7 

8import logging 

9 

10from oauthlib.common import add_params_to_uri 

11from oauthlib.common import urldecode as _urldecode 

12from oauthlib.oauth1 import SIGNATURE_HMAC, SIGNATURE_RSA, SIGNATURE_TYPE_AUTH_HEADER 

13import requests 

14 

15from . import OAuth1 

16 

17 

# Module-level logger named after this module; the session methods below use
# it for their debug traces.
log = logging.getLogger(__name__)

19 

20 

def urldecode(body):
    """Decode a form-encoded or JSON body into Python data.

    The body is first handed to oauthlib's ``urldecode``. If that call raises
    for any reason, the body is parsed as JSON instead.

    :param body: The raw body text to decode.
    :returns: The result of whichever decoder succeeded.

    Any exception raised by the JSON fallback propagates to the caller.
    """
    try:
        decoded = _urldecode(body)
    except Exception:
        # Intentionally broad: any failure of the form decoder means "try JSON".
        from json import loads

        decoded = loads(body)
    return decoded

29 

30 

class TokenRequestDenied(ValueError):
    """Raised when a token request comes back with an error status.

    The response that triggered the error is kept on the exception so callers
    can inspect it.
    """

    def __init__(self, message, response):
        """Store ``response`` and initialise the ``ValueError`` with ``message``.

        :param message: Human readable description of the failure.
        :param response: The response object the failure was detected on.
        """
        self.response = response
        super(TokenRequestDenied, self).__init__(message)

    @property
    def status_code(self):
        """Status code of the stored response (kept for backwards compatibility)."""
        stored = self.response
        return stored.status_code

40 

41 

class TokenMissing(ValueError):
    """Raised when decoded token data does not contain ``oauth_token``.

    The data that was found lacking is kept on the exception as ``response``.
    Within this module the caller passes the decoded token mapping there.
    """

    def __init__(self, message, response):
        """Store ``response`` and initialise the ``ValueError`` with ``message``.

        :param message: Human readable description of the failure.
        :param response: The object in which the token was expected.
        """
        self.response = response
        super(TokenMissing, self).__init__(message)

46 

47 

class VerifierMissing(ValueError):
    """Raised when an access token is requested but no verifier has been set."""

50 

51 

class OAuth1Session(requests.Session):
    """Request signing and convenience methods for the oauth dance.

    What is the difference between OAuth1Session and OAuth1?

    OAuth1Session actually uses OAuth1 internally and its purpose is to assist
    in the OAuth workflow through convenience methods to prepare authorization
    URLs and parse the various token and redirection responses. It also
    provides rudimentary validation of responses.

    An example of the OAuth workflow using a basic CLI app and Twitter.

    >>> # Credentials obtained during the registration.
    >>> client_key = 'client key'
    >>> client_secret = 'secret'
    >>> callback_uri = 'https://127.0.0.1/callback'
    >>>
    >>> # Endpoints found in the OAuth provider API documentation
    >>> request_token_url = 'https://api.twitter.com/oauth/request_token'
    >>> authorization_url = 'https://api.twitter.com/oauth/authorize'
    >>> access_token_url = 'https://api.twitter.com/oauth/access_token'
    >>>
    >>> oauth_session = OAuth1Session(client_key,client_secret=client_secret, callback_uri=callback_uri)
    >>>
    >>> # First step, fetch the request token.
    >>> oauth_session.fetch_request_token(request_token_url)
    {
        'oauth_token': 'kjerht2309u',
        'oauth_token_secret': 'lsdajfh923874',
    }
    >>>
    >>> # Second step. Follow this link and authorize
    >>> oauth_session.authorization_url(authorization_url)
    'https://api.twitter.com/oauth/authorize?oauth_token=kjerht2309u'
    >>>
    >>> # Third step. Fetch the access token
    >>> # (``raw_input`` on Python 2, ``input`` on Python 3)
    >>> redirect_response = raw_input('Paste the full redirect URL here.')
    >>> oauth_session.parse_authorization_response(redirect_response)
    {
        'oauth_token': 'kjerht2309u',
        'oauth_token_secret': 'lsdajfh923874',
        'oauth_verifier': 'w34o8967345',
    }
    >>> oauth_session.fetch_access_token(access_token_url)
    {
        'oauth_token': 'sdf0o9823sjdfsdf',
        'oauth_token_secret': '2kjshdfp92i34asdasd',
    }
    >>> # Done. You can now make OAuth requests.
    >>> status_url = 'http://api.twitter.com/1/statuses/update.json'
    >>> new_status = {'status': 'hello world!'}
    >>> oauth_session.post(status_url, data=new_status)
    <Response [200]>
    """

    def __init__(
        self,
        client_key,
        client_secret=None,
        resource_owner_key=None,
        resource_owner_secret=None,
        callback_uri=None,
        signature_method=SIGNATURE_HMAC,
        signature_type=SIGNATURE_TYPE_AUTH_HEADER,
        rsa_key=None,
        verifier=None,
        client_class=None,
        force_include_body=False,
        **kwargs
    ):
        """Construct the OAuth 1 session.

        :param client_key: A client specific identifier.
        :param client_secret: A client specific secret used to create HMAC and
                              plaintext signatures.
        :param resource_owner_key: A resource owner key, also referred to as
                                   request token or access token depending on
                                   when in the workflow it is used.
        :param resource_owner_secret: A resource owner secret obtained with
                                      either a request or access token. Often
                                      referred to as token secret.
        :param callback_uri: The URL the user is redirected back to after
                             authorization.
        :param signature_method: Signature methods determine how the OAuth
                                 signature is created. The three options are
                                 oauthlib.oauth1.SIGNATURE_HMAC (default),
                                 oauthlib.oauth1.SIGNATURE_RSA and
                                 oauthlib.oauth1.SIGNATURE_PLAIN.
        :param signature_type: Signature type decides where the OAuth
                               parameters are added. Either in the
                               Authorization header (default) or to the URL
                               query parameters or the request body. Defined as
                               oauthlib.oauth1.SIGNATURE_TYPE_AUTH_HEADER,
                               oauthlib.oauth1.SIGNATURE_TYPE_QUERY and
                               oauthlib.oauth1.SIGNATURE_TYPE_BODY
                               respectively.
        :param rsa_key: The private RSA key as a string. Can only be used with
                        signature_method=oauthlib.oauth1.SIGNATURE_RSA.
        :param verifier: A verifier string to prove authorization was granted.
        :param client_class: A subclass of `oauthlib.oauth1.Client` to use with
                             `requests_oauthlib.OAuth1` instead of the default
        :param force_include_body: Always include the request body in the
                                   signature creation.
        :param **kwargs: Additional keyword arguments passed to `OAuth1`
        """
        super(OAuth1Session, self).__init__()
        self._client = OAuth1(
            client_key,
            client_secret=client_secret,
            resource_owner_key=resource_owner_key,
            resource_owner_secret=resource_owner_secret,
            callback_uri=callback_uri,
            signature_method=signature_method,
            signature_type=signature_type,
            rsa_key=rsa_key,
            verifier=verifier,
            client_class=client_class,
            force_include_body=force_include_body,
            **kwargs
        )
        # The OAuth1 object doubles as the session's auth handler.
        self.auth = self._client

    @property
    def token(self):
        """The credentials currently held by the signing client, as a dict.

        Only truthy values are included. Possible keys are ``oauth_token``,
        ``oauth_token_secret`` and ``oauth_verifier``.
        """
        client = self._client.client
        candidates = (
            ("oauth_token", client.resource_owner_key),
            ("oauth_token_secret", client.resource_owner_secret),
            ("oauth_verifier", client.verifier),
        )
        return {key: value for key, value in candidates if value}

    @token.setter
    def token(self, value):
        # Assigning a token dict copies its fields onto the signing client;
        # TokenMissing is raised if ``oauth_token`` is absent.
        self._populate_attributes(value)

    @property
    def authorized(self):
        """Boolean that indicates whether this session has an OAuth token
        or not. If `self.authorized` is True, you can reasonably expect
        OAuth-protected requests to the resource to succeed. If
        `self.authorized` is False, you need the user to go through the OAuth
        authentication dance before OAuth-protected requests to the resource
        will succeed.
        """
        client = self._client.client
        if client.signature_method == SIGNATURE_RSA:
            # RSA only uses resource_owner_key
            return bool(client.resource_owner_key)
        # other methods of authentication use all three pieces
        return (
            bool(client.client_secret)
            and bool(client.resource_owner_key)
            and bool(client.resource_owner_secret)
        )

    def authorization_url(self, url, request_token=None, **kwargs):
        """Create an authorization URL by appending request_token and optional
        kwargs to url.

        This is the second step in the OAuth 1 workflow. The user should be
        redirected to this authorization URL, grant access to you, and then
        be redirected back to you. The redirection back can either be specified
        during client registration or by supplying a callback URI per request.

        :param url: The authorization endpoint URL.
        :param request_token: The previously obtained request token. When
                              omitted, the resource owner key currently held
                              by the client is used.
        :param kwargs: Optional parameters to append to the URL.
        :returns: The authorization URL with new parameters embedded.

        An example using a registered default callback URI.

        >>> request_token_url = 'https://api.twitter.com/oauth/request_token'
        >>> authorization_url = 'https://api.twitter.com/oauth/authorize'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.fetch_request_token(request_token_url)
        {
            'oauth_token': 'sdf0o9823sjdfsdf',
            'oauth_token_secret': '2kjshdfp92i34asdasd',
        }
        >>> oauth_session.authorization_url(authorization_url)
        'https://api.twitter.com/oauth/authorize?oauth_token=sdf0o9823sjdfsdf'
        >>> oauth_session.authorization_url(authorization_url, foo='bar')
        'https://api.twitter.com/oauth/authorize?oauth_token=sdf0o9823sjdfsdf&foo=bar'

        An example using an explicit callback URI. The callback URI is handed
        to the signing client at construction time; this method itself only
        appends ``oauth_token`` and the supplied kwargs.

        >>> request_token_url = 'https://api.twitter.com/oauth/request_token'
        >>> authorization_url = 'https://api.twitter.com/oauth/authorize'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret', callback_uri='https://127.0.0.1/callback')
        >>> oauth_session.fetch_request_token(request_token_url)
        {
            'oauth_token': 'sdf0o9823sjdfsdf',
            'oauth_token_secret': '2kjshdfp92i34asdasd',
        }
        >>> oauth_session.authorization_url(authorization_url)
        'https://api.twitter.com/oauth/authorize?oauth_token=sdf0o9823sjdfsdf'
        """
        kwargs["oauth_token"] = request_token or self._client.client.resource_owner_key
        log.debug("Adding parameters %s to url %s", kwargs, url)
        return add_params_to_uri(url, kwargs.items())

    def fetch_request_token(self, url, realm=None, **request_kwargs):
        r"""Fetch a request token.

        This is the first step in the OAuth 1 workflow. A request token is
        obtained by making a signed post request to url. The token is then
        parsed from the application/x-www-form-urlencoded response and ready
        to be used to construct an authorization url.

        :param url: The request token endpoint URL.
        :param realm: A list of realms to request access to. The items are
                      joined with single spaces, so pass a list (a bare
                      string would be split into its characters).
        :param \*\*request_kwargs: Optional arguments passed to ''post''
                                   function in ''requests.Session''
        :returns: The response in dict format.

        Note that a previously set callback_uri will be reset for your
        convenience, or else signature creation will be incorrect on
        consecutive requests.

        >>> request_token_url = 'https://api.twitter.com/oauth/request_token'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.fetch_request_token(request_token_url)
        {
            'oauth_token': 'sdf0o9823sjdfsdf',
            'oauth_token_secret': '2kjshdfp92i34asdasd',
        }
        """
        client = self._client.client
        client.realm = " ".join(realm) if realm else None
        token = self._fetch_token(url, **request_kwargs)
        log.debug("Resetting callback_uri and realm (not needed in next phase).")
        client.callback_uri = None
        client.realm = None
        return token

    def fetch_access_token(self, url, verifier=None, **request_kwargs):
        """Fetch an access token.

        This is the final step in the OAuth 1 workflow. An access token is
        obtained using all previously obtained credentials, including the
        verifier from the authorization step.

        Note that a previously set verifier will be reset for your
        convenience, or else signature creation will be incorrect on
        consecutive requests.

        :param url: The access token endpoint URL.
        :param verifier: Optional verifier; when given it replaces the one
                         currently held by the client.
        :param request_kwargs: Optional arguments passed on to ``post``.
        :returns: The response in dict format.
        :raises VerifierMissing: If no verifier is available on the client.

        >>> access_token_url = 'https://api.twitter.com/oauth/access_token'
        >>> redirect_response = 'https://127.0.0.1/callback?oauth_token=kjerht2309u&oauth_token_secret=lsdajfh923874&oauth_verifier=w34o8967345'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.parse_authorization_response(redirect_response)
        {
            'oauth_token': 'kjerht2309u',
            'oauth_token_secret': 'lsdajfh923874',
            'oauth_verifier': 'w34o8967345',
        }
        >>> oauth_session.fetch_access_token(access_token_url)
        {
            'oauth_token': 'sdf0o9823sjdfsdf',
            'oauth_token_secret': '2kjshdfp92i34asdasd',
        }
        """
        if verifier:
            self._client.client.verifier = verifier
        if not getattr(self._client.client, "verifier", None):
            raise VerifierMissing("No client verifier has been set.")
        token = self._fetch_token(url, **request_kwargs)
        log.debug("Resetting verifier attribute, should not be used anymore.")
        self._client.client.verifier = None
        return token

    def parse_authorization_response(self, url):
        """Extract parameters from the post authorization redirect response URL.

        :param url: The full URL that resulted from the user being redirected
                    back from the OAuth provider to you, the client.
        :returns: A dict of parameters extracted from the URL.
        :raises TokenMissing: If the URL query carries no ``oauth_token``.

        >>> redirect_response = 'https://127.0.0.1/callback?oauth_token=kjerht2309u&oauth_token_secret=lsdajfh923874&oauth_verifier=w34o8967345'
        >>> oauth_session = OAuth1Session('client-key', client_secret='secret')
        >>> oauth_session.parse_authorization_response(redirect_response)
        {
            'oauth_token': 'kjerht2309u',
            'oauth_token_secret': 'lsdajfh923874',
            'oauth_verifier': 'w34o8967345',
        }
        """
        log.debug("Parsing token from query part of url %s", url)
        token = dict(urldecode(urlparse(url).query))
        log.debug("Updating internal client token attribute.")
        # The ``token`` setter copies the fields onto the client; a separate
        # call to ``_populate_attributes`` would only repeat that work.
        self.token = token
        return token

    def _populate_attributes(self, token):
        """Copy the fields of a token mapping onto the signing client.

        ``oauth_token`` is mandatory; ``oauth_token_secret`` and
        ``oauth_verifier`` are copied only when present.

        :raises TokenMissing: If ``oauth_token`` is not in ``token``.
        """
        if "oauth_token" not in token:
            raise TokenMissing(
                "Response does not contain a token: {resp}".format(resp=token), token
            )
        client = self._client.client
        client.resource_owner_key = token["oauth_token"]
        if "oauth_token_secret" in token:
            client.resource_owner_secret = token["oauth_token_secret"]
        if "oauth_verifier" in token:
            client.verifier = token["oauth_verifier"]

    def _fetch_token(self, url, **request_kwargs):
        """POST to a token endpoint and store the token found in the response.

        :param url: The token endpoint URL.
        :param request_kwargs: Optional arguments passed on to ``post``.
        :returns: The decoded token as a dict.
        :raises TokenRequestDenied: If the response status code is 400 or above.
        :raises ValueError: If the response body cannot be decoded to a dict.
        :raises TokenMissing: If the decoded body carries no ``oauth_token``.
        """
        log.debug("Fetching token from %s using client %s", url, self._client.client)
        r = self.post(url, **request_kwargs)

        if r.status_code >= 400:
            error = "Token request failed with code %s, response was '%s'."
            raise TokenRequestDenied(error % (r.status_code, r.text), r)

        log.debug('Decoding token from response "%s"', r.text)
        try:
            token = dict(urldecode(r.text.strip()))
        except (ValueError, TypeError) as e:
            # TypeError covers bodies that decode to something ``dict()``
            # cannot consume at all (e.g. a JSON list of scalars); report it
            # with the same descriptive error as any other undecodable body.
            error = (
                "Unable to decode token from token response. "
                "This is commonly caused by an unsuccessful request where"
                " a non urlencoded error message is returned. "
                "The decoding error was %s"
                "" % e
            )
            raise ValueError(error)

        log.debug("Obtained token %s", token)
        log.debug("Updating internal client attributes from token data.")
        # The ``token`` setter copies the fields onto the client; a separate
        # call to ``_populate_attributes`` would only repeat that work.
        self.token = token
        return token

    def rebuild_auth(self, prepared_request, response):
        """Replace a stale ``Authorization`` header on a redirected request.

        When being redirected we should always strip the Authorization
        header, since a nonce may not be reused as per the OAuth spec. If the
        header is present it is removed and ``self.auth`` is applied to the
        request again; requests without the header are left untouched.

        :param prepared_request: The request about to be sent for the redirect.
        :param response: The redirect response (unused here).
        """
        if "Authorization" in prepared_request.headers:
            # Drop the old header first so the auth handler produces a new one.
            prepared_request.headers.pop("Authorization", None)
            prepared_request.prepare_auth(self.auth)