Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/oauthlib/oauth1/rfc5849/endpoints/base.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

96 statements  

1# -*- coding: utf-8 -*- 

2""" 

3oauthlib.oauth1.rfc5849.endpoints.base 

4~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

5 

6This module is an implementation of various logic needed 

7for signing and checking OAuth 1.0 RFC 5849 requests. 

8""" 

9import time 

10 

11from oauthlib.common import CaseInsensitiveDict, Request, generate_token 

12 

13from .. import ( 

14 CONTENT_TYPE_FORM_URLENCODED, SIGNATURE_HMAC_SHA1, SIGNATURE_HMAC_SHA256, 

15 SIGNATURE_HMAC_SHA512, SIGNATURE_PLAINTEXT, SIGNATURE_RSA_SHA1, 

16 SIGNATURE_RSA_SHA256, SIGNATURE_RSA_SHA512, SIGNATURE_TYPE_AUTH_HEADER, 

17 SIGNATURE_TYPE_BODY, SIGNATURE_TYPE_QUERY, errors, signature, utils, 

18) 

19 

20 

21class BaseEndpoint: 

22 

23 def __init__(self, request_validator, token_generator=None): 

24 self.request_validator = request_validator 

25 self.token_generator = token_generator or generate_token 

26 

27 def _get_signature_type_and_params(self, request): 

28 """Extracts parameters from query, headers and body. Signature type 

29 is set to the source in which parameters were found. 

30 """ 

31 # Per RFC5849, only the Authorization header may contain the 'realm' 

32 # optional parameter. 

33 header_params = signature.collect_parameters(headers=request.headers, 

34 exclude_oauth_signature=False, with_realm=True) 

35 body_params = signature.collect_parameters(body=request.body, 

36 exclude_oauth_signature=False) 

37 query_params = signature.collect_parameters(uri_query=request.uri_query, 

38 exclude_oauth_signature=False) 

39 

40 params = [] 

41 params.extend(header_params) 

42 params.extend(body_params) 

43 params.extend(query_params) 

44 signature_types_with_oauth_params = list(filter(lambda s: s[2], ( 

45 (SIGNATURE_TYPE_AUTH_HEADER, params, 

46 utils.filter_oauth_params(header_params)), 

47 (SIGNATURE_TYPE_BODY, params, 

48 utils.filter_oauth_params(body_params)), 

49 (SIGNATURE_TYPE_QUERY, params, 

50 utils.filter_oauth_params(query_params)) 

51 ))) 

52 

53 if len(signature_types_with_oauth_params) > 1: 

54 found_types = [s[0] for s in signature_types_with_oauth_params] 

55 raise errors.InvalidRequestError( 

56 description=('oauth_ params must come from only 1 signature' 

57 'type but were found in %s', 

58 ', '.join(found_types))) 

59 

60 try: 

61 signature_type, params, oauth_params = signature_types_with_oauth_params[ 

62 0] 

63 except IndexError: 

64 raise errors.InvalidRequestError( 

65 description='Missing mandatory OAuth parameters.') 

66 

67 return signature_type, params, oauth_params 

68 

69 def _create_request(self, uri, http_method, body, headers): 

70 # Only include body data from x-www-form-urlencoded requests 

71 headers = CaseInsensitiveDict(headers or {}) 

72 if "Content-Type" in headers and CONTENT_TYPE_FORM_URLENCODED in headers["Content-Type"]: # noqa: SIM108 

73 request = Request(uri, http_method, body, headers) 

74 else: 

75 request = Request(uri, http_method, '', headers) 

76 signature_type, params, oauth_params = ( 

77 self._get_signature_type_and_params(request)) 

78 

79 # The server SHOULD return a 400 (Bad Request) status code when 

80 # receiving a request with duplicated protocol parameters. 

81 if len(dict(oauth_params)) != len(oauth_params): 

82 raise errors.InvalidRequestError( 

83 description='Duplicate OAuth1 entries.') 

84 

85 oauth_params = dict(oauth_params) 

86 request.signature = oauth_params.get('oauth_signature') 

87 request.client_key = oauth_params.get('oauth_consumer_key') 

88 request.resource_owner_key = oauth_params.get('oauth_token') 

89 request.nonce = oauth_params.get('oauth_nonce') 

90 request.timestamp = oauth_params.get('oauth_timestamp') 

91 request.redirect_uri = oauth_params.get('oauth_callback') 

92 request.verifier = oauth_params.get('oauth_verifier') 

93 request.signature_method = oauth_params.get('oauth_signature_method') 

94 request.realm = dict(params).get('realm') 

95 request.oauth_params = oauth_params 

96 

97 # Parameters to Client depend on signature method which may vary 

98 # for each request. Note that HMAC-SHA1 and PLAINTEXT share parameters 

99 request.params = [(k, v) for k, v in params if k != "oauth_signature"] 

100 

101 if 'realm' in request.headers.get('Authorization', ''): 

102 request.params = [(k, v) 

103 for k, v in request.params if k != "realm"] 

104 

105 return request 

106 

107 def _check_transport_security(self, request): 

108 # TODO: move into oauthlib.common from oauth2.utils 

109 if (self.request_validator.enforce_ssl and 

110 not request.uri.lower().startswith("https://")): 

111 raise errors.InsecureTransportError() 

112 

113 def _check_mandatory_parameters(self, request): 

114 # The server SHOULD return a 400 (Bad Request) status code when 

115 # receiving a request with missing parameters. 

116 if not all((request.signature, request.client_key, 

117 request.nonce, request.timestamp, 

118 request.signature_method)): 

119 raise errors.InvalidRequestError( 

120 description='Missing mandatory OAuth parameters.') 

121 

122 # OAuth does not mandate a particular signature method, as each 

123 # implementation can have its own unique requirements. Servers are 

124 # free to implement and document their own custom methods. 

125 # Recommending any particular method is beyond the scope of this 

126 # specification. Implementers should review the Security 

127 # Considerations section (`Section 4`_) before deciding on which 

128 # method to support. 

129 # .. _`Section 4`: https://tools.ietf.org/html/rfc5849#section-4 

130 if (request.signature_method not in self.request_validator.allowed_signature_methods): 

131 raise errors.InvalidSignatureMethodError( 

132 description="Invalid signature, {} not in {!r}.".format( 

133 request.signature_method, 

134 self.request_validator.allowed_signature_methods)) 

135 

136 # Servers receiving an authenticated request MUST validate it by: 

137 # If the "oauth_version" parameter is present, ensuring its value is 

138 # "1.0". 

139 if ('oauth_version' in request.oauth_params and 

140 request.oauth_params['oauth_version'] != '1.0'): 

141 raise errors.InvalidRequestError( 

142 description='Invalid OAuth version.') 

143 

144 # The timestamp value MUST be a positive integer. Unless otherwise 

145 # specified by the server's documentation, the timestamp is expressed 

146 # in the number of seconds since January 1, 1970 00:00:00 GMT. 

147 if len(request.timestamp) != 10: 

148 raise errors.InvalidRequestError( 

149 description='Invalid timestamp size') 

150 

151 try: 

152 ts = int(request.timestamp) 

153 

154 except ValueError: 

155 raise errors.InvalidRequestError( 

156 description='Timestamp must be an integer.') 

157 

158 else: 

159 # To avoid the need to retain an infinite number of nonce values for 

160 # future checks, servers MAY choose to restrict the time period after 

161 # which a request with an old timestamp is rejected. 

162 if abs(time.time() - ts) > self.request_validator.timestamp_lifetime: 

163 raise errors.InvalidRequestError( 

164 description=('Timestamp given is invalid, differ from ' 

165 'allowed by over %s seconds.' % ( 

166 self.request_validator.timestamp_lifetime))) 

167 

168 # Provider specific validation of parameters, used to enforce 

169 # restrictions such as character set and length. 

170 if not self.request_validator.check_client_key(request.client_key): 

171 raise errors.InvalidRequestError( 

172 description='Invalid client key format.') 

173 

174 if not self.request_validator.check_nonce(request.nonce): 

175 raise errors.InvalidRequestError( 

176 description='Invalid nonce format.') 

177 

178 def _check_signature(self, request, is_token_request=False): 

179 # ---- RSA Signature verification ---- 

180 if request.signature_method in {SIGNATURE_RSA_SHA1, SIGNATURE_RSA_SHA256, SIGNATURE_RSA_SHA512}: 

181 # RSA-based signature method 

182 

183 # The server verifies the signature per `[RFC3447] section 8.2.2`_ 

184 # .. _`[RFC3447] section 8.2.2`: https://tools.ietf.org/html/rfc3447#section-8.2.1 

185 

186 rsa_key = self.request_validator.get_rsa_key( 

187 request.client_key, request) 

188 

189 if request.signature_method == SIGNATURE_RSA_SHA1: 

190 valid_signature = signature.verify_rsa_sha1(request, rsa_key) 

191 elif request.signature_method == SIGNATURE_RSA_SHA256: 

192 valid_signature = signature.verify_rsa_sha256(request, rsa_key) 

193 elif request.signature_method == SIGNATURE_RSA_SHA512: 

194 valid_signature = signature.verify_rsa_sha512(request, rsa_key) 

195 else: 

196 valid_signature = False 

197 

198 # ---- HMAC or Plaintext Signature verification ---- 

199 else: 

200 # Non-RSA based signature method 

201 

202 # Servers receiving an authenticated request MUST validate it by: 

203 # Recalculating the request signature independently as described in 

204 # `Section 3.4`_ and comparing it to the value received from the 

205 # client via the "oauth_signature" parameter. 

206 # .. _`Section 3.4`: https://tools.ietf.org/html/rfc5849#section-3.4 

207 

208 client_secret = self.request_validator.get_client_secret( 

209 request.client_key, request) 

210 

211 resource_owner_secret = None 

212 if request.resource_owner_key: 

213 if is_token_request: 

214 resource_owner_secret = \ 

215 self.request_validator.get_request_token_secret( 

216 request.client_key, request.resource_owner_key, 

217 request) 

218 else: 

219 resource_owner_secret = \ 

220 self.request_validator.get_access_token_secret( 

221 request.client_key, request.resource_owner_key, 

222 request) 

223 

224 if request.signature_method == SIGNATURE_HMAC_SHA1: 

225 valid_signature = signature.verify_hmac_sha1( 

226 request, client_secret, resource_owner_secret) 

227 elif request.signature_method == SIGNATURE_HMAC_SHA256: 

228 valid_signature = signature.verify_hmac_sha256( 

229 request, client_secret, resource_owner_secret) 

230 elif request.signature_method == SIGNATURE_HMAC_SHA512: 

231 valid_signature = signature.verify_hmac_sha512( 

232 request, client_secret, resource_owner_secret) 

233 elif request.signature_method == SIGNATURE_PLAINTEXT: 

234 valid_signature = signature.verify_plaintext( 

235 request, client_secret, resource_owner_secret) 

236 else: 

237 valid_signature = False 

238 

239 return valid_signature