Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/requests_oauthlib/oauth1_session.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

110 statements  

1from urllib.parse import urlparse 

2 

3import logging 

4 

5from oauthlib.common import add_params_to_uri 

6from oauthlib.common import urldecode as _urldecode 

7from oauthlib.oauth1 import SIGNATURE_HMAC, SIGNATURE_RSA, SIGNATURE_TYPE_AUTH_HEADER 

8import requests 

9 

10from . import OAuth1 

11 

12 

13log = logging.getLogger(__name__) 

14 

15 

16def urldecode(body): 

17 """Parse query or json to python dictionary""" 

18 try: 

19 return _urldecode(body) 

20 except Exception: 

21 import json 

22 

23 return json.loads(body) 

24 

25 

26class TokenRequestDenied(ValueError): 

27 def __init__(self, message, response): 

28 super(TokenRequestDenied, self).__init__(message) 

29 self.response = response 

30 

31 @property 

32 def status_code(self): 

33 """For backwards-compatibility purposes""" 

34 return self.response.status_code 

35 

36 

37class TokenMissing(ValueError): 

38 def __init__(self, message, response): 

39 super(TokenMissing, self).__init__(message) 

40 self.response = response 

41 

42 

43class VerifierMissing(ValueError): 

44 pass 

45 

46 

47class OAuth1Session(requests.Session): 

48 """Request signing and convenience methods for the oauth dance. 

49 

50 What is the difference between OAuth1Session and OAuth1? 

51 

52 OAuth1Session actually uses OAuth1 internally and its purpose is to assist 

53 in the OAuth workflow through convenience methods to prepare authorization 

54 URLs and parse the various token and redirection responses. It also provide 

55 rudimentary validation of responses. 

56 

57 An example of the OAuth workflow using a basic CLI app and Twitter. 

58 

59 >>> # Credentials obtained during the registration. 

60 >>> client_key = 'client key' 

61 >>> client_secret = 'secret' 

62 >>> callback_uri = 'https://127.0.0.1/callback' 

63 >>> 

64 >>> # Endpoints found in the OAuth provider API documentation 

65 >>> request_token_url = 'https://api.twitter.com/oauth/request_token' 

66 >>> authorization_url = 'https://api.twitter.com/oauth/authorize' 

67 >>> access_token_url = 'https://api.twitter.com/oauth/access_token' 

68 >>> 

69 >>> oauth_session = OAuth1Session(client_key,client_secret=client_secret, callback_uri=callback_uri) 

70 >>> 

71 >>> # First step, fetch the request token. 

72 >>> oauth_session.fetch_request_token(request_token_url) 

73 { 

74 'oauth_token': 'kjerht2309u', 

75 'oauth_token_secret': 'lsdajfh923874', 

76 } 

77 >>> 

78 >>> # Second step. Follow this link and authorize 

79 >>> oauth_session.authorization_url(authorization_url) 

80 'https://api.twitter.com/oauth/authorize?oauth_token=sdf0o9823sjdfsdf&oauth_callback=https%3A%2F%2F127.0.0.1%2Fcallback' 

81 >>> 

82 >>> # Third step. Fetch the access token 

83 >>> redirect_response = input('Paste the full redirect URL here.') 

84 >>> oauth_session.parse_authorization_response(redirect_response) 

85 { 

86 'oauth_token: 'kjerht2309u', 

87 'oauth_token_secret: 'lsdajfh923874', 

88 'oauth_verifier: 'w34o8967345', 

89 } 

90 >>> oauth_session.fetch_access_token(access_token_url) 

91 { 

92 'oauth_token': 'sdf0o9823sjdfsdf', 

93 'oauth_token_secret': '2kjshdfp92i34asdasd', 

94 } 

95 >>> # Done. You can now make OAuth requests. 

96 >>> status_url = 'http://api.twitter.com/1/statuses/update.json' 

97 >>> new_status = {'status': 'hello world!'} 

98 >>> oauth_session.post(status_url, data=new_status) 

99 <Response [200]> 

100 """ 

101 

102 def __init__( 

103 self, 

104 client_key, 

105 client_secret=None, 

106 resource_owner_key=None, 

107 resource_owner_secret=None, 

108 callback_uri=None, 

109 signature_method=SIGNATURE_HMAC, 

110 signature_type=SIGNATURE_TYPE_AUTH_HEADER, 

111 rsa_key=None, 

112 verifier=None, 

113 client_class=None, 

114 force_include_body=False, 

115 **kwargs 

116 ): 

117 """Construct the OAuth 1 session. 

118 

119 :param client_key: A client specific identifier. 

120 :param client_secret: A client specific secret used to create HMAC and 

121 plaintext signatures. 

122 :param resource_owner_key: A resource owner key, also referred to as 

123 request token or access token depending on 

124 when in the workflow it is used. 

125 :param resource_owner_secret: A resource owner secret obtained with 

126 either a request or access token. Often 

127 referred to as token secret. 

128 :param callback_uri: The URL the user is redirect back to after 

129 authorization. 

130 :param signature_method: Signature methods determine how the OAuth 

131 signature is created. The three options are 

132 oauthlib.oauth1.SIGNATURE_HMAC (default), 

133 oauthlib.oauth1.SIGNATURE_RSA and 

134 oauthlib.oauth1.SIGNATURE_PLAIN. 

135 :param signature_type: Signature type decides where the OAuth 

136 parameters are added. Either in the 

137 Authorization header (default) or to the URL 

138 query parameters or the request body. Defined as 

139 oauthlib.oauth1.SIGNATURE_TYPE_AUTH_HEADER, 

140 oauthlib.oauth1.SIGNATURE_TYPE_QUERY and 

141 oauthlib.oauth1.SIGNATURE_TYPE_BODY 

142 respectively. 

143 :param rsa_key: The private RSA key as a string. Can only be used with 

144 signature_method=oauthlib.oauth1.SIGNATURE_RSA. 

145 :param verifier: A verifier string to prove authorization was granted. 

146 :param client_class: A subclass of `oauthlib.oauth1.Client` to use with 

147 `requests_oauthlib.OAuth1` instead of the default 

148 :param force_include_body: Always include the request body in the 

149 signature creation. 

150 :param **kwargs: Additional keyword arguments passed to `OAuth1` 

151 """ 

152 super(OAuth1Session, self).__init__() 

153 self._client = OAuth1( 

154 client_key, 

155 client_secret=client_secret, 

156 resource_owner_key=resource_owner_key, 

157 resource_owner_secret=resource_owner_secret, 

158 callback_uri=callback_uri, 

159 signature_method=signature_method, 

160 signature_type=signature_type, 

161 rsa_key=rsa_key, 

162 verifier=verifier, 

163 client_class=client_class, 

164 force_include_body=force_include_body, 

165 **kwargs 

166 ) 

167 self.auth = self._client 

168 

169 @property 

170 def token(self): 

171 oauth_token = self._client.client.resource_owner_key 

172 oauth_token_secret = self._client.client.resource_owner_secret 

173 oauth_verifier = self._client.client.verifier 

174 

175 token_dict = {} 

176 if oauth_token: 

177 token_dict["oauth_token"] = oauth_token 

178 if oauth_token_secret: 

179 token_dict["oauth_token_secret"] = oauth_token_secret 

180 if oauth_verifier: 

181 token_dict["oauth_verifier"] = oauth_verifier 

182 

183 return token_dict 

184 

185 @token.setter 

186 def token(self, value): 

187 self._populate_attributes(value) 

188 

189 @property 

190 def authorized(self): 

191 """Boolean that indicates whether this session has an OAuth token 

192 or not. If `self.authorized` is True, you can reasonably expect 

193 OAuth-protected requests to the resource to succeed. If 

194 `self.authorized` is False, you need the user to go through the OAuth 

195 authentication dance before OAuth-protected requests to the resource 

196 will succeed. 

197 """ 

198 if self._client.client.signature_method == SIGNATURE_RSA: 

199 # RSA only uses resource_owner_key 

200 return bool(self._client.client.resource_owner_key) 

201 else: 

202 # other methods of authentication use all three pieces 

203 return ( 

204 bool(self._client.client.client_secret) 

205 and bool(self._client.client.resource_owner_key) 

206 and bool(self._client.client.resource_owner_secret) 

207 ) 

208 

209 def authorization_url(self, url, request_token=None, **kwargs): 

210 """Create an authorization URL by appending request_token and optional 

211 kwargs to url. 

212 

213 This is the second step in the OAuth 1 workflow. The user should be 

214 redirected to this authorization URL, grant access to you, and then 

215 be redirected back to you. The redirection back can either be specified 

216 during client registration or by supplying a callback URI per request. 

217 

218 :param url: The authorization endpoint URL. 

219 :param request_token: The previously obtained request token. 

220 :param kwargs: Optional parameters to append to the URL. 

221 :returns: The authorization URL with new parameters embedded. 

222 

223 An example using a registered default callback URI. 

224 

225 >>> request_token_url = 'https://api.twitter.com/oauth/request_token' 

226 >>> authorization_url = 'https://api.twitter.com/oauth/authorize' 

227 >>> oauth_session = OAuth1Session('client-key', client_secret='secret') 

228 >>> oauth_session.fetch_request_token(request_token_url) 

229 { 

230 'oauth_token': 'sdf0o9823sjdfsdf', 

231 'oauth_token_secret': '2kjshdfp92i34asdasd', 

232 } 

233 >>> oauth_session.authorization_url(authorization_url) 

234 'https://api.twitter.com/oauth/authorize?oauth_token=sdf0o9823sjdfsdf' 

235 >>> oauth_session.authorization_url(authorization_url, foo='bar') 

236 'https://api.twitter.com/oauth/authorize?oauth_token=sdf0o9823sjdfsdf&foo=bar' 

237 

238 An example using an explicit callback URI. 

239 

240 >>> request_token_url = 'https://api.twitter.com/oauth/request_token' 

241 >>> authorization_url = 'https://api.twitter.com/oauth/authorize' 

242 >>> oauth_session = OAuth1Session('client-key', client_secret='secret', callback_uri='https://127.0.0.1/callback') 

243 >>> oauth_session.fetch_request_token(request_token_url) 

244 { 

245 'oauth_token': 'sdf0o9823sjdfsdf', 

246 'oauth_token_secret': '2kjshdfp92i34asdasd', 

247 } 

248 >>> oauth_session.authorization_url(authorization_url) 

249 'https://api.twitter.com/oauth/authorize?oauth_token=sdf0o9823sjdfsdf&oauth_callback=https%3A%2F%2F127.0.0.1%2Fcallback' 

250 """ 

251 kwargs["oauth_token"] = request_token or self._client.client.resource_owner_key 

252 log.debug("Adding parameters %s to url %s", kwargs, url) 

253 return add_params_to_uri(url, kwargs.items()) 

254 

255 def fetch_request_token(self, url, realm=None, **request_kwargs): 

256 """Fetch a request token. 

257 

258 This is the first step in the OAuth 1 workflow. A request token is 

259 obtained by making a signed post request to url. The token is then 

260 parsed from the application/x-www-form-urlencoded response and ready 

261 to be used to construct an authorization url. 

262 

263 :param url: The request token endpoint URL. 

264 :param realm: A list of realms to request access to. 

265 :param request_kwargs: Optional arguments passed to ''post'' 

266 function in ''requests.Session'' 

267 :returns: The response in dict format. 

268 

269 Note that a previously set callback_uri will be reset for your 

270 convenience, or else signature creation will be incorrect on 

271 consecutive requests. 

272 

273 >>> request_token_url = 'https://api.twitter.com/oauth/request_token' 

274 >>> oauth_session = OAuth1Session('client-key', client_secret='secret') 

275 >>> oauth_session.fetch_request_token(request_token_url) 

276 { 

277 'oauth_token': 'sdf0o9823sjdfsdf', 

278 'oauth_token_secret': '2kjshdfp92i34asdasd', 

279 } 

280 """ 

281 self._client.client.realm = " ".join(realm) if realm else None 

282 token = self._fetch_token(url, **request_kwargs) 

283 log.debug("Resetting callback_uri and realm (not needed in next phase).") 

284 self._client.client.callback_uri = None 

285 self._client.client.realm = None 

286 return token 

287 

288 def fetch_access_token(self, url, verifier=None, **request_kwargs): 

289 """Fetch an access token. 

290 

291 This is the final step in the OAuth 1 workflow. An access token is 

292 obtained using all previously obtained credentials, including the 

293 verifier from the authorization step. 

294 

295 Note that a previously set verifier will be reset for your 

296 convenience, or else signature creation will be incorrect on 

297 consecutive requests. 

298 

299 >>> access_token_url = 'https://api.twitter.com/oauth/access_token' 

300 >>> redirect_response = 'https://127.0.0.1/callback?oauth_token=kjerht2309uf&oauth_token_secret=lsdajfh923874&oauth_verifier=w34o8967345' 

301 >>> oauth_session = OAuth1Session('client-key', client_secret='secret') 

302 >>> oauth_session.parse_authorization_response(redirect_response) 

303 { 

304 'oauth_token: 'kjerht2309u', 

305 'oauth_token_secret: 'lsdajfh923874', 

306 'oauth_verifier: 'w34o8967345', 

307 } 

308 >>> oauth_session.fetch_access_token(access_token_url) 

309 { 

310 'oauth_token': 'sdf0o9823sjdfsdf', 

311 'oauth_token_secret': '2kjshdfp92i34asdasd', 

312 } 

313 """ 

314 if verifier: 

315 self._client.client.verifier = verifier 

316 if not getattr(self._client.client, "verifier", None): 

317 raise VerifierMissing("No client verifier has been set.") 

318 token = self._fetch_token(url, **request_kwargs) 

319 log.debug("Resetting verifier attribute, should not be used anymore.") 

320 self._client.client.verifier = None 

321 return token 

322 

323 def parse_authorization_response(self, url): 

324 """Extract parameters from the post authorization redirect response URL. 

325 

326 :param url: The full URL that resulted from the user being redirected 

327 back from the OAuth provider to you, the client. 

328 :returns: A dict of parameters extracted from the URL. 

329 

330 >>> redirect_response = 'https://127.0.0.1/callback?oauth_token=kjerht2309uf&oauth_token_secret=lsdajfh923874&oauth_verifier=w34o8967345' 

331 >>> oauth_session = OAuth1Session('client-key', client_secret='secret') 

332 >>> oauth_session.parse_authorization_response(redirect_response) 

333 { 

334 'oauth_token: 'kjerht2309u', 

335 'oauth_token_secret: 'lsdajfh923874', 

336 'oauth_verifier: 'w34o8967345', 

337 } 

338 """ 

339 log.debug("Parsing token from query part of url %s", url) 

340 token = dict(urldecode(urlparse(url).query)) 

341 log.debug("Updating internal client token attribute.") 

342 self._populate_attributes(token) 

343 self.token = token 

344 return token 

345 

346 def _populate_attributes(self, token): 

347 if "oauth_token" in token: 

348 self._client.client.resource_owner_key = token["oauth_token"] 

349 else: 

350 raise TokenMissing( 

351 "Response does not contain a token: {resp}".format(resp=token), token 

352 ) 

353 if "oauth_token_secret" in token: 

354 self._client.client.resource_owner_secret = token["oauth_token_secret"] 

355 if "oauth_verifier" in token: 

356 self._client.client.verifier = token["oauth_verifier"] 

357 

358 def _fetch_token(self, url, **request_kwargs): 

359 log.debug("Fetching token from %s using client %s", url, self._client.client) 

360 r = self.post(url, **request_kwargs) 

361 

362 if r.status_code >= 400: 

363 error = "Token request failed with code %s, response was '%s'." 

364 raise TokenRequestDenied(error % (r.status_code, r.text), r) 

365 

366 log.debug('Decoding token from response "%s"', r.text) 

367 try: 

368 token = dict(urldecode(r.text.strip())) 

369 except ValueError as e: 

370 error = ( 

371 "Unable to decode token from token response. " 

372 "This is commonly caused by an unsuccessful request where" 

373 " a non urlencoded error message is returned. " 

374 "The decoding error was %s" 

375 "" % e 

376 ) 

377 raise ValueError(error) 

378 

379 log.debug("Obtained token %s", token) 

380 log.debug("Updating internal client attributes from token data.") 

381 self._populate_attributes(token) 

382 self.token = token 

383 return token 

384 

385 def rebuild_auth(self, prepared_request, response): 

386 """ 

387 When being redirected we should always strip Authorization 

388 header, since nonce may not be reused as per OAuth spec. 

389 """ 

390 if "Authorization" in prepared_request.headers: 

391 # If we get redirected to a new host, we should strip out 

392 # any authentication headers. 

393 prepared_request.headers.pop("Authorization", True) 

394 prepared_request.prepare_auth(self.auth) 

395 return