Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/oauthlib/oauth1/rfc5849/endpoints/base.py: 11%

96 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3oauthlib.oauth1.rfc5849.endpoints.base 

4~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

5 

6This module is an implementation of various logic needed 

7for signing and checking OAuth 1.0 RFC 5849 requests. 

8""" 

9import time 

10 

11from oauthlib.common import CaseInsensitiveDict, Request, generate_token 

12 

13from .. import ( 

14 CONTENT_TYPE_FORM_URLENCODED, SIGNATURE_HMAC_SHA1, SIGNATURE_HMAC_SHA256, 

15 SIGNATURE_HMAC_SHA512, SIGNATURE_PLAINTEXT, SIGNATURE_RSA_SHA1, 

16 SIGNATURE_RSA_SHA256, SIGNATURE_RSA_SHA512, SIGNATURE_TYPE_AUTH_HEADER, 

17 SIGNATURE_TYPE_BODY, SIGNATURE_TYPE_QUERY, errors, signature, utils, 

18) 

19 

20 

21class BaseEndpoint: 

22 

23 def __init__(self, request_validator, token_generator=None): 

24 self.request_validator = request_validator 

25 self.token_generator = token_generator or generate_token 

26 

27 def _get_signature_type_and_params(self, request): 

28 """Extracts parameters from query, headers and body. Signature type 

29 is set to the source in which parameters were found. 

30 """ 

31 # Per RFC5849, only the Authorization header may contain the 'realm' 

32 # optional parameter. 

33 header_params = signature.collect_parameters(headers=request.headers, 

34 exclude_oauth_signature=False, with_realm=True) 

35 body_params = signature.collect_parameters(body=request.body, 

36 exclude_oauth_signature=False) 

37 query_params = signature.collect_parameters(uri_query=request.uri_query, 

38 exclude_oauth_signature=False) 

39 

40 params = [] 

41 params.extend(header_params) 

42 params.extend(body_params) 

43 params.extend(query_params) 

44 signature_types_with_oauth_params = list(filter(lambda s: s[2], ( 

45 (SIGNATURE_TYPE_AUTH_HEADER, params, 

46 utils.filter_oauth_params(header_params)), 

47 (SIGNATURE_TYPE_BODY, params, 

48 utils.filter_oauth_params(body_params)), 

49 (SIGNATURE_TYPE_QUERY, params, 

50 utils.filter_oauth_params(query_params)) 

51 ))) 

52 

53 if len(signature_types_with_oauth_params) > 1: 

54 found_types = [s[0] for s in signature_types_with_oauth_params] 

55 raise errors.InvalidRequestError( 

56 description=('oauth_ params must come from only 1 signature' 

57 'type but were found in %s', 

58 ', '.join(found_types))) 

59 

60 try: 

61 signature_type, params, oauth_params = signature_types_with_oauth_params[ 

62 0] 

63 except IndexError: 

64 raise errors.InvalidRequestError( 

65 description='Missing mandatory OAuth parameters.') 

66 

67 return signature_type, params, oauth_params 

68 

69 def _create_request(self, uri, http_method, body, headers): 

70 # Only include body data from x-www-form-urlencoded requests 

71 headers = CaseInsensitiveDict(headers or {}) 

72 if ("Content-Type" in headers and 

73 CONTENT_TYPE_FORM_URLENCODED in headers["Content-Type"]): 

74 request = Request(uri, http_method, body, headers) 

75 else: 

76 request = Request(uri, http_method, '', headers) 

77 

78 signature_type, params, oauth_params = ( 

79 self._get_signature_type_and_params(request)) 

80 

81 # The server SHOULD return a 400 (Bad Request) status code when 

82 # receiving a request with duplicated protocol parameters. 

83 if len(dict(oauth_params)) != len(oauth_params): 

84 raise errors.InvalidRequestError( 

85 description='Duplicate OAuth1 entries.') 

86 

87 oauth_params = dict(oauth_params) 

88 request.signature = oauth_params.get('oauth_signature') 

89 request.client_key = oauth_params.get('oauth_consumer_key') 

90 request.resource_owner_key = oauth_params.get('oauth_token') 

91 request.nonce = oauth_params.get('oauth_nonce') 

92 request.timestamp = oauth_params.get('oauth_timestamp') 

93 request.redirect_uri = oauth_params.get('oauth_callback') 

94 request.verifier = oauth_params.get('oauth_verifier') 

95 request.signature_method = oauth_params.get('oauth_signature_method') 

96 request.realm = dict(params).get('realm') 

97 request.oauth_params = oauth_params 

98 

99 # Parameters to Client depend on signature method which may vary 

100 # for each request. Note that HMAC-SHA1 and PLAINTEXT share parameters 

101 request.params = [(k, v) for k, v in params if k != "oauth_signature"] 

102 

103 if 'realm' in request.headers.get('Authorization', ''): 

104 request.params = [(k, v) 

105 for k, v in request.params if k != "realm"] 

106 

107 return request 

108 

109 def _check_transport_security(self, request): 

110 # TODO: move into oauthlib.common from oauth2.utils 

111 if (self.request_validator.enforce_ssl and 

112 not request.uri.lower().startswith("https://")): 

113 raise errors.InsecureTransportError() 

114 

115 def _check_mandatory_parameters(self, request): 

116 # The server SHOULD return a 400 (Bad Request) status code when 

117 # receiving a request with missing parameters. 

118 if not all((request.signature, request.client_key, 

119 request.nonce, request.timestamp, 

120 request.signature_method)): 

121 raise errors.InvalidRequestError( 

122 description='Missing mandatory OAuth parameters.') 

123 

124 # OAuth does not mandate a particular signature method, as each 

125 # implementation can have its own unique requirements. Servers are 

126 # free to implement and document their own custom methods. 

127 # Recommending any particular method is beyond the scope of this 

128 # specification. Implementers should review the Security 

129 # Considerations section (`Section 4`_) before deciding on which 

130 # method to support. 

131 # .. _`Section 4`: https://tools.ietf.org/html/rfc5849#section-4 

132 if (not request.signature_method in 

133 self.request_validator.allowed_signature_methods): 

134 raise errors.InvalidSignatureMethodError( 

135 description="Invalid signature, {} not in {!r}.".format( 

136 request.signature_method, 

137 self.request_validator.allowed_signature_methods)) 

138 

139 # Servers receiving an authenticated request MUST validate it by: 

140 # If the "oauth_version" parameter is present, ensuring its value is 

141 # "1.0". 

142 if ('oauth_version' in request.oauth_params and 

143 request.oauth_params['oauth_version'] != '1.0'): 

144 raise errors.InvalidRequestError( 

145 description='Invalid OAuth version.') 

146 

147 # The timestamp value MUST be a positive integer. Unless otherwise 

148 # specified by the server's documentation, the timestamp is expressed 

149 # in the number of seconds since January 1, 1970 00:00:00 GMT. 

150 if len(request.timestamp) != 10: 

151 raise errors.InvalidRequestError( 

152 description='Invalid timestamp size') 

153 

154 try: 

155 ts = int(request.timestamp) 

156 

157 except ValueError: 

158 raise errors.InvalidRequestError( 

159 description='Timestamp must be an integer.') 

160 

161 else: 

162 # To avoid the need to retain an infinite number of nonce values for 

163 # future checks, servers MAY choose to restrict the time period after 

164 # which a request with an old timestamp is rejected. 

165 if abs(time.time() - ts) > self.request_validator.timestamp_lifetime: 

166 raise errors.InvalidRequestError( 

167 description=('Timestamp given is invalid, differ from ' 

168 'allowed by over %s seconds.' % ( 

169 self.request_validator.timestamp_lifetime))) 

170 

171 # Provider specific validation of parameters, used to enforce 

172 # restrictions such as character set and length. 

173 if not self.request_validator.check_client_key(request.client_key): 

174 raise errors.InvalidRequestError( 

175 description='Invalid client key format.') 

176 

177 if not self.request_validator.check_nonce(request.nonce): 

178 raise errors.InvalidRequestError( 

179 description='Invalid nonce format.') 

180 

181 def _check_signature(self, request, is_token_request=False): 

182 # ---- RSA Signature verification ---- 

183 if request.signature_method == SIGNATURE_RSA_SHA1 or \ 

184 request.signature_method == SIGNATURE_RSA_SHA256 or \ 

185 request.signature_method == SIGNATURE_RSA_SHA512: 

186 # RSA-based signature method 

187 

188 # The server verifies the signature per `[RFC3447] section 8.2.2`_ 

189 # .. _`[RFC3447] section 8.2.2`: https://tools.ietf.org/html/rfc3447#section-8.2.1 

190 

191 rsa_key = self.request_validator.get_rsa_key( 

192 request.client_key, request) 

193 

194 if request.signature_method == SIGNATURE_RSA_SHA1: 

195 valid_signature = signature.verify_rsa_sha1(request, rsa_key) 

196 elif request.signature_method == SIGNATURE_RSA_SHA256: 

197 valid_signature = signature.verify_rsa_sha256(request, rsa_key) 

198 elif request.signature_method == SIGNATURE_RSA_SHA512: 

199 valid_signature = signature.verify_rsa_sha512(request, rsa_key) 

200 else: 

201 valid_signature = False 

202 

203 # ---- HMAC or Plaintext Signature verification ---- 

204 else: 

205 # Non-RSA based signature method 

206 

207 # Servers receiving an authenticated request MUST validate it by: 

208 # Recalculating the request signature independently as described in 

209 # `Section 3.4`_ and comparing it to the value received from the 

210 # client via the "oauth_signature" parameter. 

211 # .. _`Section 3.4`: https://tools.ietf.org/html/rfc5849#section-3.4 

212 

213 client_secret = self.request_validator.get_client_secret( 

214 request.client_key, request) 

215 

216 resource_owner_secret = None 

217 if request.resource_owner_key: 

218 if is_token_request: 

219 resource_owner_secret = \ 

220 self.request_validator.get_request_token_secret( 

221 request.client_key, request.resource_owner_key, 

222 request) 

223 else: 

224 resource_owner_secret = \ 

225 self.request_validator.get_access_token_secret( 

226 request.client_key, request.resource_owner_key, 

227 request) 

228 

229 if request.signature_method == SIGNATURE_HMAC_SHA1: 

230 valid_signature = signature.verify_hmac_sha1( 

231 request, client_secret, resource_owner_secret) 

232 elif request.signature_method == SIGNATURE_HMAC_SHA256: 

233 valid_signature = signature.verify_hmac_sha256( 

234 request, client_secret, resource_owner_secret) 

235 elif request.signature_method == SIGNATURE_HMAC_SHA512: 

236 valid_signature = signature.verify_hmac_sha512( 

237 request, client_secret, resource_owner_secret) 

238 elif request.signature_method == SIGNATURE_PLAINTEXT: 

239 valid_signature = signature.verify_plaintext( 

240 request, client_secret, resource_owner_secret) 

241 else: 

242 valid_signature = False 

243 

244 return valid_signature