Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/auth.py: 20%
589 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"). You
5# may not use this file except in compliance with the License. A copy of
6# the License is located at
7#
8# http://aws.amazon.com/apache2.0/
9#
10# or in the "license" file accompanying this file. This file is
11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
12# ANY KIND, either express or implied. See the License for the specific
13# language governing permissions and limitations under the License.
14import base64
15import calendar
16import datetime
17import functools
18import hmac
19import json
20import logging
21import time
22from collections.abc import Mapping
23from email.utils import formatdate
24from hashlib import sha1, sha256
25from operator import itemgetter
27from botocore.compat import (
28 HAS_CRT,
29 HTTPHeaders,
30 encodebytes,
31 ensure_unicode,
32 parse_qs,
33 quote,
34 unquote,
35 urlsplit,
36 urlunsplit,
37)
38from botocore.exceptions import NoAuthTokenError, NoCredentialsError
39from botocore.utils import (
40 is_valid_ipv6_endpoint_url,
41 normalize_url_path,
42 percent_encode_sequence,
43)
45# Imports for backwards compatibility
46from botocore.compat import MD5_AVAILABLE # noqa
49logger = logging.getLogger(__name__)
52EMPTY_SHA256_HASH = (
53 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
54)
55# This is the buffer size used when calculating sha256 checksums.
56# Experimenting with various buffer sizes showed that this value generally
57# gave the best result (in terms of performance).
58PAYLOAD_BUFFER = 1024 * 1024
59ISO8601 = '%Y-%m-%dT%H:%M:%SZ'
60SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ'
61SIGNED_HEADERS_BLACKLIST = [
62 'expect',
63 'user-agent',
64 'x-amzn-trace-id',
65]
66UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD'
67STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER'
70def _host_from_url(url):
71 # Given URL, derive value for host header. Ensure that value:
72 # 1) is lowercase
73 # 2) excludes port, if it was the default port
74 # 3) excludes userinfo
75 url_parts = urlsplit(url)
76 host = url_parts.hostname # urlsplit's hostname is always lowercase
77 if is_valid_ipv6_endpoint_url(url):
78 host = f'[{host}]'
79 default_ports = {
80 'http': 80,
81 'https': 443,
82 }
83 if url_parts.port is not None:
84 if url_parts.port != default_ports.get(url_parts.scheme):
85 host = '%s:%d' % (host, url_parts.port)
86 return host
89def _get_body_as_dict(request):
90 # For query services, request.data is form-encoded and is already a
91 # dict, but for other services such as rest-json it could be a json
92 # string or bytes. In those cases we attempt to load the data as a
93 # dict.
94 data = request.data
95 if isinstance(data, bytes):
96 data = json.loads(data.decode('utf-8'))
97 elif isinstance(data, str):
98 data = json.loads(data)
99 return data
102class BaseSigner:
103 REQUIRES_REGION = False
104 REQUIRES_TOKEN = False
106 def add_auth(self, request):
107 raise NotImplementedError("add_auth")
110class TokenSigner(BaseSigner):
111 REQUIRES_TOKEN = True
112 """
113 Signers that expect an authorization token to perform the authorization
114 """
116 def __init__(self, auth_token):
117 self.auth_token = auth_token
120class SigV2Auth(BaseSigner):
121 """
122 Sign a request with Signature V2.
123 """
125 def __init__(self, credentials):
126 self.credentials = credentials
128 def calc_signature(self, request, params):
129 logger.debug("Calculating signature using v2 auth.")
130 split = urlsplit(request.url)
131 path = split.path
132 if len(path) == 0:
133 path = '/'
134 string_to_sign = f"{request.method}\n{split.netloc}\n{path}\n"
135 lhmac = hmac.new(
136 self.credentials.secret_key.encode("utf-8"), digestmod=sha256
137 )
138 pairs = []
139 for key in sorted(params):
140 # Any previous signature should not be a part of this
141 # one, so we skip that particular key. This prevents
142 # issues during retries.
143 if key == 'Signature':
144 continue
145 value = str(params[key])
146 quoted_key = quote(key.encode('utf-8'), safe='')
147 quoted_value = quote(value.encode('utf-8'), safe='-_~')
148 pairs.append(f'{quoted_key}={quoted_value}')
149 qs = '&'.join(pairs)
150 string_to_sign += qs
151 logger.debug('String to sign: %s', string_to_sign)
152 lhmac.update(string_to_sign.encode('utf-8'))
153 b64 = base64.b64encode(lhmac.digest()).strip().decode('utf-8')
154 return (qs, b64)
156 def add_auth(self, request):
157 # The auth handler is the last thing called in the
158 # preparation phase of a prepared request.
159 # Because of this we have to parse the query params
160 # from the request body so we can update them with
161 # the sigv2 auth params.
162 if self.credentials is None:
163 raise NoCredentialsError()
164 if request.data:
165 # POST
166 params = request.data
167 else:
168 # GET
169 params = request.params
170 params['AWSAccessKeyId'] = self.credentials.access_key
171 params['SignatureVersion'] = '2'
172 params['SignatureMethod'] = 'HmacSHA256'
173 params['Timestamp'] = time.strftime(ISO8601, time.gmtime())
174 if self.credentials.token:
175 params['SecurityToken'] = self.credentials.token
176 qs, signature = self.calc_signature(request, params)
177 params['Signature'] = signature
178 return request
181class SigV3Auth(BaseSigner):
182 def __init__(self, credentials):
183 self.credentials = credentials
185 def add_auth(self, request):
186 if self.credentials is None:
187 raise NoCredentialsError()
188 if 'Date' in request.headers:
189 del request.headers['Date']
190 request.headers['Date'] = formatdate(usegmt=True)
191 if self.credentials.token:
192 if 'X-Amz-Security-Token' in request.headers:
193 del request.headers['X-Amz-Security-Token']
194 request.headers['X-Amz-Security-Token'] = self.credentials.token
195 new_hmac = hmac.new(
196 self.credentials.secret_key.encode('utf-8'), digestmod=sha256
197 )
198 new_hmac.update(request.headers['Date'].encode('utf-8'))
199 encoded_signature = encodebytes(new_hmac.digest()).strip()
200 signature = (
201 f"AWS3-HTTPS AWSAccessKeyId={self.credentials.access_key},"
202 f"Algorithm=HmacSHA256,Signature={encoded_signature.decode('utf-8')}"
203 )
204 if 'X-Amzn-Authorization' in request.headers:
205 del request.headers['X-Amzn-Authorization']
206 request.headers['X-Amzn-Authorization'] = signature
209class SigV4Auth(BaseSigner):
210 """
211 Sign a request with Signature V4.
212 """
214 REQUIRES_REGION = True
216 def __init__(self, credentials, service_name, region_name):
217 self.credentials = credentials
218 # We initialize these value here so the unit tests can have
219 # valid values. But these will get overriden in ``add_auth``
220 # later for real requests.
221 self._region_name = region_name
222 self._service_name = service_name
224 def _sign(self, key, msg, hex=False):
225 if hex:
226 sig = hmac.new(key, msg.encode('utf-8'), sha256).hexdigest()
227 else:
228 sig = hmac.new(key, msg.encode('utf-8'), sha256).digest()
229 return sig
231 def headers_to_sign(self, request):
232 """
233 Select the headers from the request that need to be included
234 in the StringToSign.
235 """
236 header_map = HTTPHeaders()
237 for name, value in request.headers.items():
238 lname = name.lower()
239 if lname not in SIGNED_HEADERS_BLACKLIST:
240 header_map[lname] = value
241 if 'host' not in header_map:
242 # TODO: We should set the host ourselves, instead of relying on our
243 # HTTP client to set it for us.
244 header_map['host'] = _host_from_url(request.url)
245 return header_map
247 def canonical_query_string(self, request):
248 # The query string can come from two parts. One is the
249 # params attribute of the request. The other is from the request
250 # url (in which case we have to re-split the url into its components
251 # and parse out the query string component).
252 if request.params:
253 return self._canonical_query_string_params(request.params)
254 else:
255 return self._canonical_query_string_url(urlsplit(request.url))
257 def _canonical_query_string_params(self, params):
258 # [(key, value), (key2, value2)]
259 key_val_pairs = []
260 if isinstance(params, Mapping):
261 params = params.items()
262 for key, value in params:
263 key_val_pairs.append(
264 (quote(key, safe='-_.~'), quote(str(value), safe='-_.~'))
265 )
266 sorted_key_vals = []
267 # Sort by the URI-encoded key names, and in the case of
268 # repeated keys, then sort by the value.
269 for key, value in sorted(key_val_pairs):
270 sorted_key_vals.append(f'{key}={value}')
271 canonical_query_string = '&'.join(sorted_key_vals)
272 return canonical_query_string
274 def _canonical_query_string_url(self, parts):
275 canonical_query_string = ''
276 if parts.query:
277 # [(key, value), (key2, value2)]
278 key_val_pairs = []
279 for pair in parts.query.split('&'):
280 key, _, value = pair.partition('=')
281 key_val_pairs.append((key, value))
282 sorted_key_vals = []
283 # Sort by the URI-encoded key names, and in the case of
284 # repeated keys, then sort by the value.
285 for key, value in sorted(key_val_pairs):
286 sorted_key_vals.append(f'{key}={value}')
287 canonical_query_string = '&'.join(sorted_key_vals)
288 return canonical_query_string
290 def canonical_headers(self, headers_to_sign):
291 """
292 Return the headers that need to be included in the StringToSign
293 in their canonical form by converting all header keys to lower
294 case, sorting them in alphabetical order and then joining
295 them into a string, separated by newlines.
296 """
297 headers = []
298 sorted_header_names = sorted(set(headers_to_sign))
299 for key in sorted_header_names:
300 value = ','.join(
301 self._header_value(v) for v in headers_to_sign.get_all(key)
302 )
303 headers.append(f'{key}:{ensure_unicode(value)}')
304 return '\n'.join(headers)
306 def _header_value(self, value):
307 # From the sigv4 docs:
308 # Lowercase(HeaderName) + ':' + Trimall(HeaderValue)
309 #
310 # The Trimall function removes excess white space before and after
311 # values, and converts sequential spaces to a single space.
312 return ' '.join(value.split())
314 def signed_headers(self, headers_to_sign):
315 headers = sorted(n.lower().strip() for n in set(headers_to_sign))
316 return ';'.join(headers)
318 def _is_streaming_checksum_payload(self, request):
319 checksum_context = request.context.get('checksum', {})
320 algorithm = checksum_context.get('request_algorithm')
321 return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer'
323 def payload(self, request):
324 if self._is_streaming_checksum_payload(request):
325 return STREAMING_UNSIGNED_PAYLOAD_TRAILER
326 elif not self._should_sha256_sign_payload(request):
327 # When payload signing is disabled, we use this static string in
328 # place of the payload checksum.
329 return UNSIGNED_PAYLOAD
330 request_body = request.body
331 if request_body and hasattr(request_body, 'seek'):
332 position = request_body.tell()
333 read_chunksize = functools.partial(
334 request_body.read, PAYLOAD_BUFFER
335 )
336 checksum = sha256()
337 for chunk in iter(read_chunksize, b''):
338 checksum.update(chunk)
339 hex_checksum = checksum.hexdigest()
340 request_body.seek(position)
341 return hex_checksum
342 elif request_body:
343 # The request serialization has ensured that
344 # request.body is a bytes() type.
345 return sha256(request_body).hexdigest()
346 else:
347 return EMPTY_SHA256_HASH
349 def _should_sha256_sign_payload(self, request):
350 # Payloads will always be signed over insecure connections.
351 if not request.url.startswith('https'):
352 return True
354 # Certain operations may have payload signing disabled by default.
355 # Since we don't have access to the operation model, we pass in this
356 # bit of metadata through the request context.
357 return request.context.get('payload_signing_enabled', True)
359 def canonical_request(self, request):
360 cr = [request.method.upper()]
361 path = self._normalize_url_path(urlsplit(request.url).path)
362 cr.append(path)
363 cr.append(self.canonical_query_string(request))
364 headers_to_sign = self.headers_to_sign(request)
365 cr.append(self.canonical_headers(headers_to_sign) + '\n')
366 cr.append(self.signed_headers(headers_to_sign))
367 if 'X-Amz-Content-SHA256' in request.headers:
368 body_checksum = request.headers['X-Amz-Content-SHA256']
369 else:
370 body_checksum = self.payload(request)
371 cr.append(body_checksum)
372 return '\n'.join(cr)
374 def _normalize_url_path(self, path):
375 normalized_path = quote(normalize_url_path(path), safe='/~')
376 return normalized_path
378 def scope(self, request):
379 scope = [self.credentials.access_key]
380 scope.append(request.context['timestamp'][0:8])
381 scope.append(self._region_name)
382 scope.append(self._service_name)
383 scope.append('aws4_request')
384 return '/'.join(scope)
386 def credential_scope(self, request):
387 scope = []
388 scope.append(request.context['timestamp'][0:8])
389 scope.append(self._region_name)
390 scope.append(self._service_name)
391 scope.append('aws4_request')
392 return '/'.join(scope)
394 def string_to_sign(self, request, canonical_request):
395 """
396 Return the canonical StringToSign as well as a dict
397 containing the original version of all headers that
398 were included in the StringToSign.
399 """
400 sts = ['AWS4-HMAC-SHA256']
401 sts.append(request.context['timestamp'])
402 sts.append(self.credential_scope(request))
403 sts.append(sha256(canonical_request.encode('utf-8')).hexdigest())
404 return '\n'.join(sts)
406 def signature(self, string_to_sign, request):
407 key = self.credentials.secret_key
408 k_date = self._sign(
409 (f"AWS4{key}").encode(), request.context["timestamp"][0:8]
410 )
411 k_region = self._sign(k_date, self._region_name)
412 k_service = self._sign(k_region, self._service_name)
413 k_signing = self._sign(k_service, 'aws4_request')
414 return self._sign(k_signing, string_to_sign, hex=True)
416 def add_auth(self, request):
417 if self.credentials is None:
418 raise NoCredentialsError()
419 datetime_now = datetime.datetime.utcnow()
420 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
421 # This could be a retry. Make sure the previous
422 # authorization header is removed first.
423 self._modify_request_before_signing(request)
424 canonical_request = self.canonical_request(request)
425 logger.debug("Calculating signature using v4 auth.")
426 logger.debug('CanonicalRequest:\n%s', canonical_request)
427 string_to_sign = self.string_to_sign(request, canonical_request)
428 logger.debug('StringToSign:\n%s', string_to_sign)
429 signature = self.signature(string_to_sign, request)
430 logger.debug('Signature:\n%s', signature)
432 self._inject_signature_to_request(request, signature)
434 def _inject_signature_to_request(self, request, signature):
435 auth_str = ['AWS4-HMAC-SHA256 Credential=%s' % self.scope(request)]
436 headers_to_sign = self.headers_to_sign(request)
437 auth_str.append(
438 f"SignedHeaders={self.signed_headers(headers_to_sign)}"
439 )
440 auth_str.append('Signature=%s' % signature)
441 request.headers['Authorization'] = ', '.join(auth_str)
442 return request
444 def _modify_request_before_signing(self, request):
445 if 'Authorization' in request.headers:
446 del request.headers['Authorization']
447 self._set_necessary_date_headers(request)
448 if self.credentials.token:
449 if 'X-Amz-Security-Token' in request.headers:
450 del request.headers['X-Amz-Security-Token']
451 request.headers['X-Amz-Security-Token'] = self.credentials.token
453 if not request.context.get('payload_signing_enabled', True):
454 if 'X-Amz-Content-SHA256' in request.headers:
455 del request.headers['X-Amz-Content-SHA256']
456 request.headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD
458 def _set_necessary_date_headers(self, request):
459 # The spec allows for either the Date _or_ the X-Amz-Date value to be
460 # used so we check both. If there's a Date header, we use the date
461 # header. Otherwise we use the X-Amz-Date header.
462 if 'Date' in request.headers:
463 del request.headers['Date']
464 datetime_timestamp = datetime.datetime.strptime(
465 request.context['timestamp'], SIGV4_TIMESTAMP
466 )
467 request.headers['Date'] = formatdate(
468 int(calendar.timegm(datetime_timestamp.timetuple()))
469 )
470 if 'X-Amz-Date' in request.headers:
471 del request.headers['X-Amz-Date']
472 else:
473 if 'X-Amz-Date' in request.headers:
474 del request.headers['X-Amz-Date']
475 request.headers['X-Amz-Date'] = request.context['timestamp']
478class S3SigV4Auth(SigV4Auth):
479 def _modify_request_before_signing(self, request):
480 super()._modify_request_before_signing(request)
481 if 'X-Amz-Content-SHA256' in request.headers:
482 del request.headers['X-Amz-Content-SHA256']
484 request.headers['X-Amz-Content-SHA256'] = self.payload(request)
486 def _should_sha256_sign_payload(self, request):
487 # S3 allows optional body signing, so to minimize the performance
488 # impact, we opt to not SHA256 sign the body on streaming uploads,
489 # provided that we're on https.
490 client_config = request.context.get('client_config')
491 s3_config = getattr(client_config, 's3', None)
493 # The config could be None if it isn't set, or if the customer sets it
494 # to None.
495 if s3_config is None:
496 s3_config = {}
498 # The explicit configuration takes precedence over any implicit
499 # configuration.
500 sign_payload = s3_config.get('payload_signing_enabled', None)
501 if sign_payload is not None:
502 return sign_payload
504 # We require that both a checksum be present and https be enabled
505 # to implicitly disable body signing. The combination of TLS and
506 # a checksum is sufficiently secure and durable for us to be
507 # confident in the request without body signing.
508 checksum_header = 'Content-MD5'
509 checksum_context = request.context.get('checksum', {})
510 algorithm = checksum_context.get('request_algorithm')
511 if isinstance(algorithm, dict) and algorithm.get('in') == 'header':
512 checksum_header = algorithm['name']
513 if (
514 not request.url.startswith("https")
515 or checksum_header not in request.headers
516 ):
517 return True
519 # If the input is streaming we disable body signing by default.
520 if request.context.get('has_streaming_input', False):
521 return False
523 # If the S3-specific checks had no results, delegate to the generic
524 # checks.
525 return super()._should_sha256_sign_payload(request)
527 def _normalize_url_path(self, path):
528 # For S3, we do not normalize the path.
529 return path
532class S3ExpressAuth(S3SigV4Auth):
533 REQUIRES_IDENTITY_CACHE = True
535 def __init__(
536 self, credentials, service_name, region_name, *, identity_cache
537 ):
538 super().__init__(credentials, service_name, region_name)
539 self._identity_cache = identity_cache
541 def add_auth(self, request):
542 super().add_auth(request)
544 def _modify_request_before_signing(self, request):
545 super()._modify_request_before_signing(request)
546 if 'x-amz-s3session-token' not in request.headers:
547 request.headers['x-amz-s3session-token'] = self.credentials.token
548 # S3Express does not support STS' X-Amz-Security-Token
549 if 'X-Amz-Security-Token' in request.headers:
550 del request.headers['X-Amz-Security-Token']
553class S3ExpressPostAuth(S3ExpressAuth):
554 REQUIRES_IDENTITY_CACHE = True
556 def add_auth(self, request):
557 datetime_now = datetime.datetime.utcnow()
558 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
560 fields = {}
561 if request.context.get('s3-presign-post-fields', None) is not None:
562 fields = request.context['s3-presign-post-fields']
564 policy = {}
565 conditions = []
566 if request.context.get('s3-presign-post-policy', None) is not None:
567 policy = request.context['s3-presign-post-policy']
568 if policy.get('conditions', None) is not None:
569 conditions = policy['conditions']
571 policy['conditions'] = conditions
573 fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256'
574 fields['x-amz-credential'] = self.scope(request)
575 fields['x-amz-date'] = request.context['timestamp']
577 conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'})
578 conditions.append({'x-amz-credential': self.scope(request)})
579 conditions.append({'x-amz-date': request.context['timestamp']})
581 if self.credentials.token is not None:
582 fields['X-Amz-S3session-Token'] = self.credentials.token
583 conditions.append(
584 {'X-Amz-S3session-Token': self.credentials.token}
585 )
587 # Dump the base64 encoded policy into the fields dictionary.
588 fields['policy'] = base64.b64encode(
589 json.dumps(policy).encode('utf-8')
590 ).decode('utf-8')
592 fields['x-amz-signature'] = self.signature(fields['policy'], request)
594 request.context['s3-presign-post-fields'] = fields
595 request.context['s3-presign-post-policy'] = policy
598class S3ExpressQueryAuth(S3ExpressAuth):
599 DEFAULT_EXPIRES = 300
600 REQUIRES_IDENTITY_CACHE = True
602 def __init__(
603 self,
604 credentials,
605 service_name,
606 region_name,
607 *,
608 identity_cache,
609 expires=DEFAULT_EXPIRES,
610 ):
611 super().__init__(
612 credentials,
613 service_name,
614 region_name,
615 identity_cache=identity_cache,
616 )
617 self._expires = expires
619 def _modify_request_before_signing(self, request):
620 # We automatically set this header, so if it's the auto-set value we
621 # want to get rid of it since it doesn't make sense for presigned urls.
622 content_type = request.headers.get('content-type')
623 blocklisted_content_type = (
624 'application/x-www-form-urlencoded; charset=utf-8'
625 )
626 if content_type == blocklisted_content_type:
627 del request.headers['content-type']
629 # Note that we're not including X-Amz-Signature.
630 # From the docs: "The Canonical Query String must include all the query
631 # parameters from the preceding table except for X-Amz-Signature.
632 signed_headers = self.signed_headers(self.headers_to_sign(request))
634 auth_params = {
635 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
636 'X-Amz-Credential': self.scope(request),
637 'X-Amz-Date': request.context['timestamp'],
638 'X-Amz-Expires': self._expires,
639 'X-Amz-SignedHeaders': signed_headers,
640 }
641 if self.credentials.token is not None:
642 auth_params['X-Amz-S3session-Token'] = self.credentials.token
643 # Now parse the original query string to a dict, inject our new query
644 # params, and serialize back to a query string.
645 url_parts = urlsplit(request.url)
646 # parse_qs makes each value a list, but in our case we know we won't
647 # have repeated keys so we know we have single element lists which we
648 # can convert back to scalar values.
649 query_string_parts = parse_qs(url_parts.query, keep_blank_values=True)
650 query_dict = {k: v[0] for k, v in query_string_parts.items()}
652 if request.params:
653 query_dict.update(request.params)
654 request.params = {}
655 # The spec is particular about this. It *has* to be:
656 # https://<endpoint>?<operation params>&<auth params>
657 # You can't mix the two types of params together, i.e just keep doing
658 # new_query_params.update(op_params)
659 # new_query_params.update(auth_params)
660 # percent_encode_sequence(new_query_params)
661 operation_params = ''
662 if request.data:
663 # We also need to move the body params into the query string. To
664 # do this, we first have to convert it to a dict.
665 query_dict.update(_get_body_as_dict(request))
666 request.data = ''
667 if query_dict:
668 operation_params = percent_encode_sequence(query_dict) + '&'
669 new_query_string = (
670 f"{operation_params}{percent_encode_sequence(auth_params)}"
671 )
672 # url_parts is a tuple (and therefore immutable) so we need to create
673 # a new url_parts with the new query string.
674 # <part> - <index>
675 # scheme - 0
676 # netloc - 1
677 # path - 2
678 # query - 3 <-- we're replacing this.
679 # fragment - 4
680 p = url_parts
681 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])
682 request.url = urlunsplit(new_url_parts)
684 def _inject_signature_to_request(self, request, signature):
685 # Rather than calculating an "Authorization" header, for the query
686 # param quth, we just append an 'X-Amz-Signature' param to the end
687 # of the query string.
688 request.url += '&X-Amz-Signature=%s' % signature
690 def _normalize_url_path(self, path):
691 # For S3, we do not normalize the path.
692 return path
694 def payload(self, request):
695 # From the doc link above:
696 # "You don't include a payload hash in the Canonical Request, because
697 # when you create a presigned URL, you don't know anything about the
698 # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".
699 return UNSIGNED_PAYLOAD
702class SigV4QueryAuth(SigV4Auth):
703 DEFAULT_EXPIRES = 3600
705 def __init__(
706 self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES
707 ):
708 super().__init__(credentials, service_name, region_name)
709 self._expires = expires
711 def _modify_request_before_signing(self, request):
712 # We automatically set this header, so if it's the auto-set value we
713 # want to get rid of it since it doesn't make sense for presigned urls.
714 content_type = request.headers.get('content-type')
715 blacklisted_content_type = (
716 'application/x-www-form-urlencoded; charset=utf-8'
717 )
718 if content_type == blacklisted_content_type:
719 del request.headers['content-type']
721 # Note that we're not including X-Amz-Signature.
722 # From the docs: "The Canonical Query String must include all the query
723 # parameters from the preceding table except for X-Amz-Signature.
724 signed_headers = self.signed_headers(self.headers_to_sign(request))
726 auth_params = {
727 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
728 'X-Amz-Credential': self.scope(request),
729 'X-Amz-Date': request.context['timestamp'],
730 'X-Amz-Expires': self._expires,
731 'X-Amz-SignedHeaders': signed_headers,
732 }
733 if self.credentials.token is not None:
734 auth_params['X-Amz-Security-Token'] = self.credentials.token
735 # Now parse the original query string to a dict, inject our new query
736 # params, and serialize back to a query string.
737 url_parts = urlsplit(request.url)
738 # parse_qs makes each value a list, but in our case we know we won't
739 # have repeated keys so we know we have single element lists which we
740 # can convert back to scalar values.
741 query_string_parts = parse_qs(url_parts.query, keep_blank_values=True)
742 query_dict = {k: v[0] for k, v in query_string_parts.items()}
744 if request.params:
745 query_dict.update(request.params)
746 request.params = {}
747 # The spec is particular about this. It *has* to be:
748 # https://<endpoint>?<operation params>&<auth params>
749 # You can't mix the two types of params together, i.e just keep doing
750 # new_query_params.update(op_params)
751 # new_query_params.update(auth_params)
752 # percent_encode_sequence(new_query_params)
753 operation_params = ''
754 if request.data:
755 # We also need to move the body params into the query string. To
756 # do this, we first have to convert it to a dict.
757 query_dict.update(_get_body_as_dict(request))
758 request.data = ''
759 if query_dict:
760 operation_params = percent_encode_sequence(query_dict) + '&'
761 new_query_string = (
762 f"{operation_params}{percent_encode_sequence(auth_params)}"
763 )
764 # url_parts is a tuple (and therefore immutable) so we need to create
765 # a new url_parts with the new query string.
766 # <part> - <index>
767 # scheme - 0
768 # netloc - 1
769 # path - 2
770 # query - 3 <-- we're replacing this.
771 # fragment - 4
772 p = url_parts
773 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])
774 request.url = urlunsplit(new_url_parts)
776 def _inject_signature_to_request(self, request, signature):
777 # Rather than calculating an "Authorization" header, for the query
778 # param quth, we just append an 'X-Amz-Signature' param to the end
779 # of the query string.
780 request.url += '&X-Amz-Signature=%s' % signature
783class S3SigV4QueryAuth(SigV4QueryAuth):
784 """S3 SigV4 auth using query parameters.
786 This signer will sign a request using query parameters and signature
787 version 4, i.e a "presigned url" signer.
789 Based off of:
791 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
793 """
795 def _normalize_url_path(self, path):
796 # For S3, we do not normalize the path.
797 return path
799 def payload(self, request):
800 # From the doc link above:
801 # "You don't include a payload hash in the Canonical Request, because
802 # when you create a presigned URL, you don't know anything about the
803 # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".
804 return UNSIGNED_PAYLOAD
807class S3SigV4PostAuth(SigV4Auth):
808 """
809 Presigns a s3 post
811 Implementation doc here:
812 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html
813 """
815 def add_auth(self, request):
816 datetime_now = datetime.datetime.utcnow()
817 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
819 fields = {}
820 if request.context.get('s3-presign-post-fields', None) is not None:
821 fields = request.context['s3-presign-post-fields']
823 policy = {}
824 conditions = []
825 if request.context.get('s3-presign-post-policy', None) is not None:
826 policy = request.context['s3-presign-post-policy']
827 if policy.get('conditions', None) is not None:
828 conditions = policy['conditions']
830 policy['conditions'] = conditions
832 fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256'
833 fields['x-amz-credential'] = self.scope(request)
834 fields['x-amz-date'] = request.context['timestamp']
836 conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'})
837 conditions.append({'x-amz-credential': self.scope(request)})
838 conditions.append({'x-amz-date': request.context['timestamp']})
840 if self.credentials.token is not None:
841 fields['x-amz-security-token'] = self.credentials.token
842 conditions.append({'x-amz-security-token': self.credentials.token})
844 # Dump the base64 encoded policy into the fields dictionary.
845 fields['policy'] = base64.b64encode(
846 json.dumps(policy).encode('utf-8')
847 ).decode('utf-8')
849 fields['x-amz-signature'] = self.signature(fields['policy'], request)
851 request.context['s3-presign-post-fields'] = fields
852 request.context['s3-presign-post-policy'] = policy
855class HmacV1Auth(BaseSigner):
856 # List of Query String Arguments of Interest
857 QSAOfInterest = [
858 'accelerate',
859 'acl',
860 'cors',
861 'defaultObjectAcl',
862 'location',
863 'logging',
864 'partNumber',
865 'policy',
866 'requestPayment',
867 'torrent',
868 'versioning',
869 'versionId',
870 'versions',
871 'website',
872 'uploads',
873 'uploadId',
874 'response-content-type',
875 'response-content-language',
876 'response-expires',
877 'response-cache-control',
878 'response-content-disposition',
879 'response-content-encoding',
880 'delete',
881 'lifecycle',
882 'tagging',
883 'restore',
884 'storageClass',
885 'notification',
886 'replication',
887 'requestPayment',
888 'analytics',
889 'metrics',
890 'inventory',
891 'select',
892 'select-type',
893 'object-lock',
894 ]
896 def __init__(self, credentials, service_name=None, region_name=None):
897 self.credentials = credentials
899 def sign_string(self, string_to_sign):
900 new_hmac = hmac.new(
901 self.credentials.secret_key.encode('utf-8'), digestmod=sha1
902 )
903 new_hmac.update(string_to_sign.encode('utf-8'))
904 return encodebytes(new_hmac.digest()).strip().decode('utf-8')
906 def canonical_standard_headers(self, headers):
907 interesting_headers = ['content-md5', 'content-type', 'date']
908 hoi = []
909 if 'Date' in headers:
910 del headers['Date']
911 headers['Date'] = self._get_date()
912 for ih in interesting_headers:
913 found = False
914 for key in headers:
915 lk = key.lower()
916 if headers[key] is not None and lk == ih:
917 hoi.append(headers[key].strip())
918 found = True
919 if not found:
920 hoi.append('')
921 return '\n'.join(hoi)
923 def canonical_custom_headers(self, headers):
924 hoi = []
925 custom_headers = {}
926 for key in headers:
927 lk = key.lower()
928 if headers[key] is not None:
929 if lk.startswith('x-amz-'):
930 custom_headers[lk] = ','.join(
931 v.strip() for v in headers.get_all(key)
932 )
933 sorted_header_keys = sorted(custom_headers.keys())
934 for key in sorted_header_keys:
935 hoi.append(f"{key}:{custom_headers[key]}")
936 return '\n'.join(hoi)
938 def unquote_v(self, nv):
939 """
940 TODO: Do we need this?
941 """
942 if len(nv) == 1:
943 return nv
944 else:
945 return (nv[0], unquote(nv[1]))
947 def canonical_resource(self, split, auth_path=None):
948 # don't include anything after the first ? in the resource...
949 # unless it is one of the QSA of interest, defined above
950 # NOTE:
951 # The path in the canonical resource should always be the
952 # full path including the bucket name, even for virtual-hosting
953 # style addressing. The ``auth_path`` keeps track of the full
954 # path for the canonical resource and would be passed in if
955 # the client was using virtual-hosting style.
956 if auth_path is not None:
957 buf = auth_path
958 else:
959 buf = split.path
960 if split.query:
961 qsa = split.query.split('&')
962 qsa = [a.split('=', 1) for a in qsa]
963 qsa = [
964 self.unquote_v(a) for a in qsa if a[0] in self.QSAOfInterest
965 ]
966 if len(qsa) > 0:
967 qsa.sort(key=itemgetter(0))
968 qsa = ['='.join(a) for a in qsa]
969 buf += '?'
970 buf += '&'.join(qsa)
971 return buf
973 def canonical_string(
974 self, method, split, headers, expires=None, auth_path=None
975 ):
976 cs = method.upper() + '\n'
977 cs += self.canonical_standard_headers(headers) + '\n'
978 custom_headers = self.canonical_custom_headers(headers)
979 if custom_headers:
980 cs += custom_headers + '\n'
981 cs += self.canonical_resource(split, auth_path=auth_path)
982 return cs
984 def get_signature(
985 self, method, split, headers, expires=None, auth_path=None
986 ):
987 if self.credentials.token:
988 del headers['x-amz-security-token']
989 headers['x-amz-security-token'] = self.credentials.token
990 string_to_sign = self.canonical_string(
991 method, split, headers, auth_path=auth_path
992 )
993 logger.debug('StringToSign:\n%s', string_to_sign)
994 return self.sign_string(string_to_sign)
996 def add_auth(self, request):
997 if self.credentials is None:
998 raise NoCredentialsError
999 logger.debug("Calculating signature using hmacv1 auth.")
1000 split = urlsplit(request.url)
1001 logger.debug('HTTP request method: %s', request.method)
1002 signature = self.get_signature(
1003 request.method, split, request.headers, auth_path=request.auth_path
1004 )
1005 self._inject_signature(request, signature)
1007 def _get_date(self):
1008 return formatdate(usegmt=True)
1010 def _inject_signature(self, request, signature):
1011 if 'Authorization' in request.headers:
1012 # We have to do this because request.headers is not
1013 # normal dictionary. It has the (unintuitive) behavior
1014 # of aggregating repeated setattr calls for the same
1015 # key value. For example:
1016 # headers['foo'] = 'a'; headers['foo'] = 'b'
1017 # list(headers) will print ['foo', 'foo'].
1018 del request.headers['Authorization']
1020 auth_header = f"AWS {self.credentials.access_key}:{signature}"
1021 request.headers['Authorization'] = auth_header
1024class HmacV1QueryAuth(HmacV1Auth):
1025 """
1026 Generates a presigned request for s3.
1028 Spec from this document:
1030 http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html
1031 #RESTAuthenticationQueryStringAuth
1033 """
1035 DEFAULT_EXPIRES = 3600
1037 def __init__(self, credentials, expires=DEFAULT_EXPIRES):
1038 self.credentials = credentials
1039 self._expires = expires
1041 def _get_date(self):
1042 return str(int(time.time() + int(self._expires)))
1044 def _inject_signature(self, request, signature):
1045 query_dict = {}
1046 query_dict['AWSAccessKeyId'] = self.credentials.access_key
1047 query_dict['Signature'] = signature
1049 for header_key in request.headers:
1050 lk = header_key.lower()
1051 # For query string requests, Expires is used instead of the
1052 # Date header.
1053 if header_key == 'Date':
1054 query_dict['Expires'] = request.headers['Date']
1055 # We only want to include relevant headers in the query string.
1056 # These can be anything that starts with x-amz, is Content-MD5,
1057 # or is Content-Type.
1058 elif lk.startswith('x-amz-') or lk in (
1059 'content-md5',
1060 'content-type',
1061 ):
1062 query_dict[lk] = request.headers[lk]
1063 # Combine all of the identified headers into an encoded
1064 # query string
1065 new_query_string = percent_encode_sequence(query_dict)
1067 # Create a new url with the presigned url.
1068 p = urlsplit(request.url)
1069 if p[3]:
1070 # If there was a pre-existing query string, we should
1071 # add that back before injecting the new query string.
1072 new_query_string = f'{p[3]}&{new_query_string}'
1073 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])
1074 request.url = urlunsplit(new_url_parts)
1077class HmacV1PostAuth(HmacV1Auth):
1078 """
1079 Generates a presigned post for s3.
1081 Spec from this document:
1083 http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingHTTPPOST.html
1084 """
1086 def add_auth(self, request):
1087 fields = {}
1088 if request.context.get('s3-presign-post-fields', None) is not None:
1089 fields = request.context['s3-presign-post-fields']
1091 policy = {}
1092 conditions = []
1093 if request.context.get('s3-presign-post-policy', None) is not None:
1094 policy = request.context['s3-presign-post-policy']
1095 if policy.get('conditions', None) is not None:
1096 conditions = policy['conditions']
1098 policy['conditions'] = conditions
1100 fields['AWSAccessKeyId'] = self.credentials.access_key
1102 if self.credentials.token is not None:
1103 fields['x-amz-security-token'] = self.credentials.token
1104 conditions.append({'x-amz-security-token': self.credentials.token})
1106 # Dump the base64 encoded policy into the fields dictionary.
1107 fields['policy'] = base64.b64encode(
1108 json.dumps(policy).encode('utf-8')
1109 ).decode('utf-8')
1111 fields['signature'] = self.sign_string(fields['policy'])
1113 request.context['s3-presign-post-fields'] = fields
1114 request.context['s3-presign-post-policy'] = policy
1117class BearerAuth(TokenSigner):
1118 """
1119 Performs bearer token authorization by placing the bearer token in the
1120 Authorization header as specified by Section 2.1 of RFC 6750.
1122 https://datatracker.ietf.org/doc/html/rfc6750#section-2.1
1123 """
1125 def add_auth(self, request):
1126 if self.auth_token is None:
1127 raise NoAuthTokenError()
1129 auth_header = f'Bearer {self.auth_token.token}'
1130 if 'Authorization' in request.headers:
1131 del request.headers['Authorization']
1132 request.headers['Authorization'] = auth_header
1135AUTH_TYPE_MAPS = {
1136 'v2': SigV2Auth,
1137 'v3': SigV3Auth,
1138 'v3https': SigV3Auth,
1139 's3': HmacV1Auth,
1140 's3-query': HmacV1QueryAuth,
1141 's3-presign-post': HmacV1PostAuth,
1142 's3v4-presign-post': S3SigV4PostAuth,
1143 'v4-s3express': S3ExpressAuth,
1144 'v4-s3express-query': S3ExpressQueryAuth,
1145 'v4-s3express-presign-post': S3ExpressPostAuth,
1146 'bearer': BearerAuth,
1147}
1149# Define v4 signers depending on if CRT is present
1150if HAS_CRT:
1151 from botocore.crt.auth import CRT_AUTH_TYPE_MAPS
1153 AUTH_TYPE_MAPS.update(CRT_AUTH_TYPE_MAPS)
1154else:
1155 AUTH_TYPE_MAPS.update(
1156 {
1157 'v4': SigV4Auth,
1158 'v4-query': SigV4QueryAuth,
1159 's3v4': S3SigV4Auth,
1160 's3v4-query': S3SigV4QueryAuth,
1161 }
1162 )