Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/oauthlib/oauth1/rfc5849/endpoints/base.py: 11%
96 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:22 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:22 +0000
1# -*- coding: utf-8 -*-
2"""
3oauthlib.oauth1.rfc5849.endpoints.base
4~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
6This module is an implementation of various logic needed
7for signing and checking OAuth 1.0 RFC 5849 requests.
8"""
9import time
11from oauthlib.common import CaseInsensitiveDict, Request, generate_token
13from .. import (
14 CONTENT_TYPE_FORM_URLENCODED, SIGNATURE_HMAC_SHA1, SIGNATURE_HMAC_SHA256,
15 SIGNATURE_HMAC_SHA512, SIGNATURE_PLAINTEXT, SIGNATURE_RSA_SHA1,
16 SIGNATURE_RSA_SHA256, SIGNATURE_RSA_SHA512, SIGNATURE_TYPE_AUTH_HEADER,
17 SIGNATURE_TYPE_BODY, SIGNATURE_TYPE_QUERY, errors, signature, utils,
18)
21class BaseEndpoint:
23 def __init__(self, request_validator, token_generator=None):
24 self.request_validator = request_validator
25 self.token_generator = token_generator or generate_token
27 def _get_signature_type_and_params(self, request):
28 """Extracts parameters from query, headers and body. Signature type
29 is set to the source in which parameters were found.
30 """
31 # Per RFC5849, only the Authorization header may contain the 'realm'
32 # optional parameter.
33 header_params = signature.collect_parameters(headers=request.headers,
34 exclude_oauth_signature=False, with_realm=True)
35 body_params = signature.collect_parameters(body=request.body,
36 exclude_oauth_signature=False)
37 query_params = signature.collect_parameters(uri_query=request.uri_query,
38 exclude_oauth_signature=False)
40 params = []
41 params.extend(header_params)
42 params.extend(body_params)
43 params.extend(query_params)
44 signature_types_with_oauth_params = list(filter(lambda s: s[2], (
45 (SIGNATURE_TYPE_AUTH_HEADER, params,
46 utils.filter_oauth_params(header_params)),
47 (SIGNATURE_TYPE_BODY, params,
48 utils.filter_oauth_params(body_params)),
49 (SIGNATURE_TYPE_QUERY, params,
50 utils.filter_oauth_params(query_params))
51 )))
53 if len(signature_types_with_oauth_params) > 1:
54 found_types = [s[0] for s in signature_types_with_oauth_params]
55 raise errors.InvalidRequestError(
56 description=('oauth_ params must come from only 1 signature'
57 'type but were found in %s',
58 ', '.join(found_types)))
60 try:
61 signature_type, params, oauth_params = signature_types_with_oauth_params[
62 0]
63 except IndexError:
64 raise errors.InvalidRequestError(
65 description='Missing mandatory OAuth parameters.')
67 return signature_type, params, oauth_params
69 def _create_request(self, uri, http_method, body, headers):
70 # Only include body data from x-www-form-urlencoded requests
71 headers = CaseInsensitiveDict(headers or {})
72 if ("Content-Type" in headers and
73 CONTENT_TYPE_FORM_URLENCODED in headers["Content-Type"]):
74 request = Request(uri, http_method, body, headers)
75 else:
76 request = Request(uri, http_method, '', headers)
78 signature_type, params, oauth_params = (
79 self._get_signature_type_and_params(request))
81 # The server SHOULD return a 400 (Bad Request) status code when
82 # receiving a request with duplicated protocol parameters.
83 if len(dict(oauth_params)) != len(oauth_params):
84 raise errors.InvalidRequestError(
85 description='Duplicate OAuth1 entries.')
87 oauth_params = dict(oauth_params)
88 request.signature = oauth_params.get('oauth_signature')
89 request.client_key = oauth_params.get('oauth_consumer_key')
90 request.resource_owner_key = oauth_params.get('oauth_token')
91 request.nonce = oauth_params.get('oauth_nonce')
92 request.timestamp = oauth_params.get('oauth_timestamp')
93 request.redirect_uri = oauth_params.get('oauth_callback')
94 request.verifier = oauth_params.get('oauth_verifier')
95 request.signature_method = oauth_params.get('oauth_signature_method')
96 request.realm = dict(params).get('realm')
97 request.oauth_params = oauth_params
99 # Parameters to Client depend on signature method which may vary
100 # for each request. Note that HMAC-SHA1 and PLAINTEXT share parameters
101 request.params = [(k, v) for k, v in params if k != "oauth_signature"]
103 if 'realm' in request.headers.get('Authorization', ''):
104 request.params = [(k, v)
105 for k, v in request.params if k != "realm"]
107 return request
109 def _check_transport_security(self, request):
110 # TODO: move into oauthlib.common from oauth2.utils
111 if (self.request_validator.enforce_ssl and
112 not request.uri.lower().startswith("https://")):
113 raise errors.InsecureTransportError()
115 def _check_mandatory_parameters(self, request):
116 # The server SHOULD return a 400 (Bad Request) status code when
117 # receiving a request with missing parameters.
118 if not all((request.signature, request.client_key,
119 request.nonce, request.timestamp,
120 request.signature_method)):
121 raise errors.InvalidRequestError(
122 description='Missing mandatory OAuth parameters.')
124 # OAuth does not mandate a particular signature method, as each
125 # implementation can have its own unique requirements. Servers are
126 # free to implement and document their own custom methods.
127 # Recommending any particular method is beyond the scope of this
128 # specification. Implementers should review the Security
129 # Considerations section (`Section 4`_) before deciding on which
130 # method to support.
131 # .. _`Section 4`: https://tools.ietf.org/html/rfc5849#section-4
132 if (not request.signature_method in
133 self.request_validator.allowed_signature_methods):
134 raise errors.InvalidSignatureMethodError(
135 description="Invalid signature, {} not in {!r}.".format(
136 request.signature_method,
137 self.request_validator.allowed_signature_methods))
139 # Servers receiving an authenticated request MUST validate it by:
140 # If the "oauth_version" parameter is present, ensuring its value is
141 # "1.0".
142 if ('oauth_version' in request.oauth_params and
143 request.oauth_params['oauth_version'] != '1.0'):
144 raise errors.InvalidRequestError(
145 description='Invalid OAuth version.')
147 # The timestamp value MUST be a positive integer. Unless otherwise
148 # specified by the server's documentation, the timestamp is expressed
149 # in the number of seconds since January 1, 1970 00:00:00 GMT.
150 if len(request.timestamp) != 10:
151 raise errors.InvalidRequestError(
152 description='Invalid timestamp size')
154 try:
155 ts = int(request.timestamp)
157 except ValueError:
158 raise errors.InvalidRequestError(
159 description='Timestamp must be an integer.')
161 else:
162 # To avoid the need to retain an infinite number of nonce values for
163 # future checks, servers MAY choose to restrict the time period after
164 # which a request with an old timestamp is rejected.
165 if abs(time.time() - ts) > self.request_validator.timestamp_lifetime:
166 raise errors.InvalidRequestError(
167 description=('Timestamp given is invalid, differ from '
168 'allowed by over %s seconds.' % (
169 self.request_validator.timestamp_lifetime)))
171 # Provider specific validation of parameters, used to enforce
172 # restrictions such as character set and length.
173 if not self.request_validator.check_client_key(request.client_key):
174 raise errors.InvalidRequestError(
175 description='Invalid client key format.')
177 if not self.request_validator.check_nonce(request.nonce):
178 raise errors.InvalidRequestError(
179 description='Invalid nonce format.')
181 def _check_signature(self, request, is_token_request=False):
182 # ---- RSA Signature verification ----
183 if request.signature_method == SIGNATURE_RSA_SHA1 or \
184 request.signature_method == SIGNATURE_RSA_SHA256 or \
185 request.signature_method == SIGNATURE_RSA_SHA512:
186 # RSA-based signature method
188 # The server verifies the signature per `[RFC3447] section 8.2.2`_
189 # .. _`[RFC3447] section 8.2.2`: https://tools.ietf.org/html/rfc3447#section-8.2.1
191 rsa_key = self.request_validator.get_rsa_key(
192 request.client_key, request)
194 if request.signature_method == SIGNATURE_RSA_SHA1:
195 valid_signature = signature.verify_rsa_sha1(request, rsa_key)
196 elif request.signature_method == SIGNATURE_RSA_SHA256:
197 valid_signature = signature.verify_rsa_sha256(request, rsa_key)
198 elif request.signature_method == SIGNATURE_RSA_SHA512:
199 valid_signature = signature.verify_rsa_sha512(request, rsa_key)
200 else:
201 valid_signature = False
203 # ---- HMAC or Plaintext Signature verification ----
204 else:
205 # Non-RSA based signature method
207 # Servers receiving an authenticated request MUST validate it by:
208 # Recalculating the request signature independently as described in
209 # `Section 3.4`_ and comparing it to the value received from the
210 # client via the "oauth_signature" parameter.
211 # .. _`Section 3.4`: https://tools.ietf.org/html/rfc5849#section-3.4
213 client_secret = self.request_validator.get_client_secret(
214 request.client_key, request)
216 resource_owner_secret = None
217 if request.resource_owner_key:
218 if is_token_request:
219 resource_owner_secret = \
220 self.request_validator.get_request_token_secret(
221 request.client_key, request.resource_owner_key,
222 request)
223 else:
224 resource_owner_secret = \
225 self.request_validator.get_access_token_secret(
226 request.client_key, request.resource_owner_key,
227 request)
229 if request.signature_method == SIGNATURE_HMAC_SHA1:
230 valid_signature = signature.verify_hmac_sha1(
231 request, client_secret, resource_owner_secret)
232 elif request.signature_method == SIGNATURE_HMAC_SHA256:
233 valid_signature = signature.verify_hmac_sha256(
234 request, client_secret, resource_owner_secret)
235 elif request.signature_method == SIGNATURE_HMAC_SHA512:
236 valid_signature = signature.verify_hmac_sha512(
237 request, client_secret, resource_owner_secret)
238 elif request.signature_method == SIGNATURE_PLAINTEXT:
239 valid_signature = signature.verify_plaintext(
240 request, client_secret, resource_owner_secret)
241 else:
242 valid_signature = False
244 return valid_signature