Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/auth.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"). You
5# may not use this file except in compliance with the License. A copy of
6# the License is located at
7#
8# http://aws.amazon.com/apache2.0/
9#
10# or in the "license" file accompanying this file. This file is
11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
12# ANY KIND, either express or implied. See the License for the specific
13# language governing permissions and limitations under the License.
14import base64
15import calendar
16import datetime
17import functools
18import hmac
19import json
20import logging
21import time
22from collections.abc import Mapping
23from email.utils import formatdate
24from hashlib import sha1, sha256
25from operator import itemgetter
27from botocore.compat import (
28 HAS_CRT,
29 MD5_AVAILABLE, # noqa: F401
30 HTTPHeaders,
31 encodebytes,
32 ensure_unicode,
33 get_current_datetime,
34 parse_qs,
35 quote,
36 unquote,
37 urlsplit,
38 urlunsplit,
39)
40from botocore.exceptions import (
41 NoAuthTokenError,
42 NoCredentialsError,
43 UnknownSignatureVersionError,
44 UnsupportedSignatureVersionError,
45)
46from botocore.utils import (
47 is_valid_ipv6_endpoint_url,
48 normalize_url_path,
49 percent_encode_sequence,
50)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Hex payload hash that the SigV4 ``payload`` method returns when a request
# has no body.
EMPTY_SHA256_HASH = (
    'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
)

# Chunk size used when reading a seekable body to compute its sha256
# checksum. Experiments with different sizes showed this value generally
# performed best.
PAYLOAD_BUFFER = 1024 * 1024

# strftime patterns for the SigV2 ``Timestamp`` parameter and for the SigV4
# timestamp stored in the request context, respectively.
ISO8601 = '%Y-%m-%dT%H:%M:%SZ'
SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ'

# Lowercase header names that SigV4 ``headers_to_sign`` leaves out of the
# signature.
SIGNED_HEADERS_BLACKLIST = [
    'connection',
    'expect',
    'keep-alive',
    'proxy-authenticate',
    'proxy-authorization',
    'te',
    'trailer',
    'transfer-encoding',
    'upgrade',
    'user-agent',
    'x-amzn-trace-id',
]

# Static strings used in place of a payload checksum.
UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD'
STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER'
def _host_from_url(url):
    """Derive the value of the ``Host`` header from ``url``.

    The returned value is lowercase, never contains userinfo, and only
    carries a port when the URL names one that differs from the default
    port of its scheme (80 for http, 443 for https). When
    ``is_valid_ipv6_endpoint_url`` accepts the URL, the host is wrapped in
    square brackets.
    """
    parsed = urlsplit(url)
    # urlsplit's ``hostname`` is always lowercase and excludes userinfo.
    hostname = parsed.hostname
    if is_valid_ipv6_endpoint_url(url):
        hostname = f'[{hostname}]'

    port = parsed.port
    if port is None:
        return hostname

    scheme_default_ports = {'http': 80, 'https': 443}
    if port == scheme_default_ports.get(parsed.scheme):
        # The default port is implied by the scheme, so it is left out.
        return hostname
    return f'{hostname}:{port}'
def _get_body_as_dict(request):
    """Return ``request.data`` in decoded form.

    For query services ``request.data`` is form-encoded and already a dict.
    For other services, such as rest-json, it may be a JSON string or JSON
    bytes; those are parsed with ``json.loads`` (bytes are decoded as UTF-8
    first). Any other value is returned unchanged.
    """
    body = request.data
    if isinstance(body, str):
        return json.loads(body)
    if isinstance(body, bytes):
        return json.loads(body.decode('utf-8'))
    return body
class BaseSigner:
    """Base class for request signers.

    The class-level flags default to ``False``; subclasses set them to
    ``True`` to declare that they need a region or an auth token.
    """

    REQUIRES_REGION = False
    REQUIRES_TOKEN = False

    def add_auth(self, request):
        """Add authentication to ``request``; subclasses must override."""
        raise NotImplementedError("add_auth")
class TokenSigner(BaseSigner):
    """
    Signers that expect an authorization token to perform the authorization
    """

    # NOTE: the docstring must be the first statement in the class body;
    # placed after this attribute it would be a discarded string expression
    # and ``TokenSigner.__doc__`` would be ``None``.
    REQUIRES_TOKEN = True

    def __init__(self, auth_token):
        """Store ``auth_token`` on the instance as ``self.auth_token``."""
        self.auth_token = auth_token
class SigV2Auth(BaseSigner):
    """
    Sign a request with Signature V2.
    """

    def __init__(self, credentials):
        self.credentials = credentials

    def calc_signature(self, request, params):
        """Compute the Signature V2 signature for ``request``.

        Returns a ``(query_string, signature)`` tuple, where
        ``query_string`` is the sorted, percent-encoded form of ``params``
        (without any ``Signature`` entry) and ``signature`` is the base64
        encoded HMAC-SHA256 of the string to sign.
        """
        logger.debug("Calculating signature using v2 auth.")
        url_parts = urlsplit(request.url)
        resource_path = url_parts.path or '/'
        signer = hmac.new(
            self.credentials.secret_key.encode("utf-8"), digestmod=sha256
        )

        encoded_pairs = []
        for name in sorted(params):
            if name == 'Signature':
                # A signature left over from a previous attempt must not be
                # signed again; skipping it prevents issues during retries.
                continue
            raw_value = str(params[name])
            encoded_name = quote(name.encode('utf-8'), safe='')
            encoded_value = quote(raw_value.encode('utf-8'), safe='-_~')
            encoded_pairs.append(f'{encoded_name}={encoded_value}')
        query_string = '&'.join(encoded_pairs)

        string_to_sign = (
            f"{request.method}\n{url_parts.netloc}\n{resource_path}\n"
            + query_string
        )
        logger.debug('String to sign: %s', string_to_sign)
        signer.update(string_to_sign.encode('utf-8'))
        signature = base64.b64encode(signer.digest()).strip().decode('utf-8')
        return (query_string, signature)

    def add_auth(self, request):
        """Add the Signature V2 parameters to ``request`` and return it.

        Raises ``NoCredentialsError`` when no credentials are set.
        """
        # The auth handler is the last thing called in the preparation
        # phase of a prepared request, so the parameters have to be taken
        # from the request itself and updated with the sigv2 auth params.
        if self.credentials is None:
            raise NoCredentialsError()
        # A non-empty body means POST (params live in the body); otherwise
        # this is a GET and the params live in ``request.params``.
        params = request.data if request.data else request.params
        params['AWSAccessKeyId'] = self.credentials.access_key
        params['SignatureVersion'] = '2'
        params['SignatureMethod'] = 'HmacSHA256'
        params['Timestamp'] = time.strftime(ISO8601, time.gmtime())
        if self.credentials.token:
            params['SecurityToken'] = self.credentials.token
        _, signature = self.calc_signature(request, params)
        params['Signature'] = signature
        return request
class SigV3Auth(BaseSigner):
    """Signer that HMAC-SHA256 signs the ``Date`` header value and places
    the result in an ``X-Amzn-Authorization`` header.
    """

    def __init__(self, credentials):
        self.credentials = credentials

    def add_auth(self, request):
        """Set the ``Date`` and ``X-Amzn-Authorization`` headers on
        ``request`` (and ``X-Amz-Security-Token`` when the credentials
        carry a token).

        Raises ``NoCredentialsError`` when no credentials are set.
        """
        if self.credentials is None:
            raise NoCredentialsError()
        headers = request.headers

        # Existing values are deleted before assignment so that only the
        # freshly computed value remains.
        if 'Date' in headers:
            del headers['Date']
        headers['Date'] = formatdate(usegmt=True)

        if self.credentials.token:
            if 'X-Amz-Security-Token' in headers:
                del headers['X-Amz-Security-Token']
            headers['X-Amz-Security-Token'] = self.credentials.token

        # Only the Date header value is signed.
        signer = hmac.new(
            self.credentials.secret_key.encode('utf-8'), digestmod=sha256
        )
        signer.update(headers['Date'].encode('utf-8'))
        b64_signature = encodebytes(signer.digest()).strip()
        authorization = (
            f"AWS3-HTTPS AWSAccessKeyId={self.credentials.access_key},"
            f"Algorithm=HmacSHA256,Signature={b64_signature.decode('utf-8')}"
        )

        if 'X-Amzn-Authorization' in headers:
            del headers['X-Amzn-Authorization']
        headers['X-Amzn-Authorization'] = authorization
class SigV4Auth(BaseSigner):
    """
    Sign a request with Signature V4.
    """

    REQUIRES_REGION = True

    def __init__(self, credentials, service_name, region_name):
        self.credentials = credentials
        # These values are initialized here so the unit tests can have
        # valid values, but they will get overridden in ``add_auth``
        # later for real requests.
        self._region_name = region_name
        self._service_name = service_name

    def _sign(self, key, msg, hex=False):
        """HMAC-SHA256 ``msg`` (a str) with ``key`` (bytes).

        Returns the hex digest when ``hex`` is truthy, otherwise the raw
        digest bytes.
        """
        mac = hmac.new(key, msg.encode('utf-8'), sha256)
        return mac.hexdigest() if hex else mac.digest()

    def headers_to_sign(self, request):
        """
        Select the headers from the request that need to be included
        in the StringToSign.

        Header names are lowercased, names listed in
        ``SIGNED_HEADERS_BLACKLIST`` are dropped, and a ``host`` entry is
        derived from the request URL when the request has none.
        """
        selected = HTTPHeaders()
        for name, value in request.headers.items():
            lowered = name.lower()
            if lowered in SIGNED_HEADERS_BLACKLIST:
                continue
            selected[lowered] = value
        if 'host' not in selected:
            # TODO: We should set the host ourselves, instead of relying on
            # our HTTP client to set it for us.
            selected['host'] = _host_from_url(request.url)
        return selected

    def canonical_query_string(self, request):
        """Return the canonical query string for ``request``.

        The query string can come from two places: the ``params``
        attribute of the request, or the request URL (which then has to be
        split again so its query component can be parsed). ``params`` wins
        when it is non-empty.
        """
        if request.params:
            return self._canonical_query_string_params(request.params)
        return self._canonical_query_string_url(urlsplit(request.url))

    def _canonical_query_string_params(self, params):
        """Build the canonical query string from a mapping or an iterable
        of ``(key, value)`` pairs.
        """
        pairs = params.items() if isinstance(params, Mapping) else params
        # Sort by the URI-encoded key names and, for repeated keys, by the
        # encoded value.
        encoded_pairs = sorted(
            (quote(key, safe='-_.~'), quote(str(value), safe='-_.~'))
            for key, value in pairs
        )
        return '&'.join(f'{key}={value}' for key, value in encoded_pairs)

    def _canonical_query_string_url(self, parts):
        """Build the canonical query string from ``parts.query``.

        Keys and values are used exactly as they appear in the URL; they
        are only sorted, not re-encoded.
        """
        if not parts.query:
            return ''
        key_val_pairs = []
        for segment in parts.query.split('&'):
            key, _, value = segment.partition('=')
            key_val_pairs.append((key, value))
        # Sort by key and, for repeated keys, by value.
        key_val_pairs.sort()
        return '&'.join(f'{key}={value}' for key, value in key_val_pairs)

    def canonical_headers(self, headers_to_sign):
        """
        Return the headers in ``headers_to_sign`` in canonical form: one
        ``name:value`` line per distinct header name, sorted by name and
        joined with newlines. Multiple values of one header are trimmed
        and joined with commas.
        """
        lines = []
        for name in sorted(set(headers_to_sign)):
            joined = ','.join(
                self._header_value(raw) for raw in headers_to_sign.get_all(name)
            )
            lines.append(f'{name}:{ensure_unicode(joined)}')
        return '\n'.join(lines)

    def _header_value(self, value):
        # From the sigv4 docs:
        # Lowercase(HeaderName) + ':' + Trimall(HeaderValue)
        #
        # The Trimall function removes excess white space before and after
        # values, and converts sequential spaces to a single space.
        return ' '.join(value.split())

    def signed_headers(self, headers_to_sign):
        """Return the sorted, lowercased header names joined with ``;``."""
        names = sorted(
            name.lower().strip() for name in set(headers_to_sign)
        )
        return ';'.join(names)

    def _is_streaming_checksum_payload(self, request):
        """Return True when the request context's checksum algorithm is a
        dict whose ``in`` entry is ``'trailer'``.
        """
        checksum_context = request.context.get('checksum', {})
        algorithm = checksum_context.get('request_algorithm')
        return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer'

    def payload(self, request):
        """Return the payload hash (or placeholder string) for ``request``."""
        if self._is_streaming_checksum_payload(request):
            return STREAMING_UNSIGNED_PAYLOAD_TRAILER
        if not self._should_sha256_sign_payload(request):
            # When payload signing is disabled, this static string is used
            # in place of the payload checksum.
            return UNSIGNED_PAYLOAD

        body = request.body
        if not body:
            return EMPTY_SHA256_HASH
        if not hasattr(body, 'seek'):
            # The request serialization has ensured that request.body is a
            # bytes() type.
            return sha256(body).hexdigest()

        # Seekable body: hash it in chunks, then restore the position so
        # the body can still be sent afterwards.
        start = body.tell()
        digest = sha256()
        while True:
            chunk = body.read(PAYLOAD_BUFFER)
            if chunk == b'':
                break
            digest.update(chunk)
        hex_checksum = digest.hexdigest()
        body.seek(start)
        return hex_checksum

    def _should_sha256_sign_payload(self, request):
        """Return whether the request body should be sha256 signed."""
        # Payloads will always be signed over insecure connections.
        if not request.url.startswith('https'):
            return True

        # Certain operations may have payload signing disabled by default.
        # Since the operation model is not available here, this bit of
        # metadata is passed in through the request context.
        return request.context.get('payload_signing_enabled', True)

    def canonical_request(self, request):
        """Assemble the newline-joined canonical request: method, path,
        query string, canonical headers, signed header names and payload
        hash.
        """
        method = request.method.upper()
        path = self._normalize_url_path(urlsplit(request.url).path)
        query = self.canonical_query_string(request)
        headers_to_sign = self.headers_to_sign(request)
        canonical_headers = self.canonical_headers(headers_to_sign) + '\n'
        signed_names = self.signed_headers(headers_to_sign)
        # A payload hash header that is already present takes precedence
        # over computing one.
        if 'X-Amz-Content-SHA256' in request.headers:
            body_checksum = request.headers['X-Amz-Content-SHA256']
        else:
            body_checksum = self.payload(request)
        return '\n'.join(
            [method, path, query, canonical_headers, signed_names, body_checksum]
        )

    def _normalize_url_path(self, path):
        """Normalize ``path`` and percent-encode it, keeping ``/`` and ``~``."""
        return quote(normalize_url_path(path), safe='/~')

    def scope(self, request):
        """Return ``access_key/date/region/service/aws4_request``."""
        return '/'.join(
            [
                self.credentials.access_key,
                request.context['timestamp'][0:8],
                self._region_name,
                self._service_name,
                'aws4_request',
            ]
        )

    def credential_scope(self, request):
        """Return ``date/region/service/aws4_request`` (no access key)."""
        return '/'.join(
            [
                request.context['timestamp'][0:8],
                self._region_name,
                self._service_name,
                'aws4_request',
            ]
        )

    def string_to_sign(self, request, canonical_request):
        """
        Return the canonical StringToSign: the algorithm name, the request
        timestamp, the credential scope and the hex sha256 of
        ``canonical_request``, joined with newlines.
        """
        return '\n'.join(
            [
                'AWS4-HMAC-SHA256',
                request.context['timestamp'],
                self.credential_scope(request),
                sha256(canonical_request.encode('utf-8')).hexdigest(),
            ]
        )

    def signature(self, string_to_sign, request):
        """Derive the signing key step by step (date, region, service,
        ``aws4_request``) and return the hex signature of
        ``string_to_sign``.
        """
        secret = self.credentials.secret_key
        date_key = self._sign(
            (f"AWS4{secret}").encode(), request.context["timestamp"][0:8]
        )
        region_key = self._sign(date_key, self._region_name)
        service_key = self._sign(region_key, self._service_name)
        signing_key = self._sign(service_key, 'aws4_request')
        return self._sign(signing_key, string_to_sign, hex=True)

    def add_auth(self, request):
        """Sign ``request`` in place.

        Raises ``NoCredentialsError`` when no credentials are set.
        """
        if self.credentials is None:
            raise NoCredentialsError()
        now = get_current_datetime()
        request.context['timestamp'] = now.strftime(SIGV4_TIMESTAMP)
        # This could be a retry. Make sure the previous
        # authorization header is removed first.
        self._modify_request_before_signing(request)
        canonical_request = self.canonical_request(request)
        logger.debug("Calculating signature using v4 auth.")
        logger.debug('CanonicalRequest:\n%s', canonical_request)
        string_to_sign = self.string_to_sign(request, canonical_request)
        logger.debug('StringToSign:\n%s', string_to_sign)
        signature = self.signature(string_to_sign, request)
        logger.debug('Signature:\n%s', signature)

        self._inject_signature_to_request(request, signature)

    def _inject_signature_to_request(self, request, signature):
        """Set the ``Authorization`` header on ``request`` and return it."""
        credential = f'AWS4-HMAC-SHA256 Credential={self.scope(request)}'
        signed_names = self.signed_headers(self.headers_to_sign(request))
        request.headers['Authorization'] = ', '.join(
            [
                credential,
                f"SignedHeaders={signed_names}",
                f'Signature={signature}',
            ]
        )
        return request

    def _modify_request_before_signing(self, request):
        """Reset the auth-related headers on ``request`` before signing."""
        headers = request.headers
        if 'Authorization' in headers:
            del headers['Authorization']
        self._set_necessary_date_headers(request)

        if self.credentials.token:
            if 'X-Amz-Security-Token' in headers:
                del headers['X-Amz-Security-Token']
            headers['X-Amz-Security-Token'] = self.credentials.token

        if not request.context.get('payload_signing_enabled', True):
            if 'X-Amz-Content-SHA256' in headers:
                del headers['X-Amz-Content-SHA256']
            headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD

    def _set_necessary_date_headers(self, request):
        # The spec allows for either the Date _or_ the X-Amz-Date value to
        # be used, so both are checked. If there is a Date header, the Date
        # header is used; otherwise the X-Amz-Date header is used.
        headers = request.headers
        if 'Date' in headers:
            del headers['Date']
            signing_time = datetime.datetime.strptime(
                request.context['timestamp'], SIGV4_TIMESTAMP
            )
            headers['Date'] = formatdate(
                int(calendar.timegm(signing_time.timetuple()))
            )
            if 'X-Amz-Date' in headers:
                del headers['X-Amz-Date']
        else:
            if 'X-Amz-Date' in headers:
                del headers['X-Amz-Date']
            headers['X-Amz-Date'] = request.context['timestamp']
class S3SigV4Auth(SigV4Auth):
    """SigV4 signer variant for S3: always sets ``X-Amz-Content-SHA256``,
    allows body signing to be skipped, and leaves the URL path untouched.
    """

    def _modify_request_before_signing(self, request):
        super()._modify_request_before_signing(request)
        headers = request.headers
        if 'X-Amz-Content-SHA256' in headers:
            del headers['X-Amz-Content-SHA256']

        headers['X-Amz-Content-SHA256'] = self.payload(request)

    def _should_sha256_sign_payload(self, request):
        """Return whether the request body should be sha256 signed."""
        # S3 allows optional body signing, so to minimize the performance
        # impact the body is not SHA256 signed on streaming uploads,
        # provided that the request goes over https.
        client_config = request.context.get('client_config')
        s3_config = getattr(client_config, 's3', None)

        # The config could be None if it isn't set, or if the customer
        # sets it to None.
        if s3_config is None:
            s3_config = {}

        # The explicit configuration takes precedence over any implicit
        # configuration.
        explicit_setting = s3_config.get('payload_signing_enabled', None)
        if explicit_setting is not None:
            return explicit_setting

        # Both a checksum and https are required to implicitly disable
        # body signing. The combination of TLS and a checksum is
        # sufficiently secure and durable to be confident in the request
        # without body signing.
        checksum_header = 'Content-MD5'
        checksum_context = request.context.get('checksum', {})
        algorithm = checksum_context.get('request_algorithm')
        if isinstance(algorithm, dict) and algorithm.get('in') == 'header':
            checksum_header = algorithm['name']
        is_https = request.url.startswith("https")
        if not (is_https and checksum_header in request.headers):
            return True

        # If the input is streaming, body signing is disabled by default.
        if request.context.get('has_streaming_input', False):
            return False

        # If the S3-specific checks had no results, delegate to the
        # generic checks.
        return super()._should_sha256_sign_payload(request)

    def _normalize_url_path(self, path):
        # For S3, we do not normalize the path.
        return path
class S3ExpressAuth(S3SigV4Auth):
    """S3 SigV4 signer that sends the credential token in the
    ``x-amz-s3session-token`` header instead of ``X-Amz-Security-Token``.
    """

    REQUIRES_IDENTITY_CACHE = True

    def __init__(
        self, credentials, service_name, region_name, *, identity_cache
    ):
        super().__init__(credentials, service_name, region_name)
        self._identity_cache = identity_cache

    def add_auth(self, request):
        super().add_auth(request)

    def _modify_request_before_signing(self, request):
        super()._modify_request_before_signing(request)
        headers = request.headers
        # An existing session token header is left untouched.
        if 'x-amz-s3session-token' not in headers:
            headers['x-amz-s3session-token'] = self.credentials.token
        # S3Express does not support STS' X-Amz-Security-Token
        if 'X-Amz-Security-Token' in headers:
            del headers['X-Amz-Security-Token']
class S3ExpressPostAuth(S3ExpressAuth):
    """Signs the fields and policy of a presigned POST, using the
    ``X-Amz-S3session-Token`` field for the credential token.
    """

    REQUIRES_IDENTITY_CACHE = True

    def add_auth(self, request):
        """Fill ``request.context['s3-presign-post-fields']`` and
        ``request.context['s3-presign-post-policy']`` with the signing
        fields, conditions, encoded policy and signature.
        """
        context = request.context
        now = get_current_datetime()
        context['timestamp'] = now.strftime(SIGV4_TIMESTAMP)

        # Reuse whatever the caller already put in the context; start from
        # empty containers otherwise.
        fields = context.get('s3-presign-post-fields', None)
        if fields is None:
            fields = {}

        policy = context.get('s3-presign-post-policy', None)
        if policy is None:
            policy = {}
        conditions = policy.get('conditions', None)
        if conditions is None:
            conditions = []
        policy['conditions'] = conditions

        algorithm = 'AWS4-HMAC-SHA256'
        credential = self.scope(request)
        timestamp = context['timestamp']

        fields['x-amz-algorithm'] = algorithm
        fields['x-amz-credential'] = credential
        fields['x-amz-date'] = timestamp

        conditions.append({'x-amz-algorithm': algorithm})
        conditions.append({'x-amz-credential': credential})
        conditions.append({'x-amz-date': timestamp})

        token = self.credentials.token
        if token is not None:
            fields['X-Amz-S3session-Token'] = token
            conditions.append({'X-Amz-S3session-Token': token})

        # Dump the base64 encoded policy into the fields dictionary.
        policy_json = json.dumps(policy).encode('utf-8')
        fields['policy'] = base64.b64encode(policy_json).decode('utf-8')

        # The encoded policy is the string that gets signed.
        fields['x-amz-signature'] = self.signature(fields['policy'], request)

        context['s3-presign-post-fields'] = fields
        context['s3-presign-post-policy'] = policy
class S3ExpressQueryAuth(S3ExpressAuth):
    """Signer that places the signature in the query string, using the
    ``X-Amz-S3session-Token`` parameter for the credential token.
    """

    DEFAULT_EXPIRES = 300
    REQUIRES_IDENTITY_CACHE = True

    def __init__(
        self,
        credentials,
        service_name,
        region_name,
        *,
        identity_cache,
        expires=DEFAULT_EXPIRES,
    ):
        super().__init__(
            credentials,
            service_name,
            region_name,
            identity_cache=identity_cache,
        )
        self._expires = expires

    def _modify_request_before_signing(self, request):
        """Move the operation and auth parameters into the URL query
        string of ``request``.
        """
        # This header is set automatically, so if it still has the
        # auto-set value it is removed: it makes no sense for presigned
        # urls.
        auto_content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        if request.headers.get('content-type') == auto_content_type:
            del request.headers['content-type']

        # Note that X-Amz-Signature is not included here. From the docs:
        # "The Canonical Query String must include all the query
        # parameters from the preceding table except for X-Amz-Signature."
        signed_names = self.signed_headers(self.headers_to_sign(request))

        auth_params = {
            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
            'X-Amz-Credential': self.scope(request),
            'X-Amz-Date': request.context['timestamp'],
            'X-Amz-Expires': self._expires,
            'X-Amz-SignedHeaders': signed_names,
        }
        if self.credentials.token is not None:
            auth_params['X-Amz-S3session-Token'] = self.credentials.token

        # Parse the original query string to a dict, inject the new query
        # params, and serialize back to a query string. parse_qs makes
        # each value a list, but no repeated keys are expected here, so
        # the first element of each list is taken as the scalar value.
        url_parts = urlsplit(request.url)
        parsed_query = parse_qs(url_parts.query, keep_blank_values=True)
        query_dict = {name: values[0] for name, values in parsed_query.items()}

        if request.params:
            query_dict.update(request.params)
            request.params = {}
        if request.data:
            # The body params also need to move into the query string, so
            # the body is converted to a dict first.
            query_dict.update(_get_body_as_dict(request))
            request.data = ''

        # The spec is particular about the ordering: all operation params
        # come first, followed by all auth params. The two groups must not
        # be merged into one dict and encoded together.
        operation_params = ''
        if query_dict:
            operation_params = percent_encode_sequence(query_dict) + '&'
        new_query_string = (
            f"{operation_params}{percent_encode_sequence(auth_params)}"
        )

        # url_parts is an immutable tuple, so a new one is built with only
        # the query component replaced.
        scheme, netloc, path, _, fragment = url_parts
        request.url = urlunsplit(
            (scheme, netloc, path, new_query_string, fragment)
        )

    def _inject_signature_to_request(self, request, signature):
        # Rather than calculating an "Authorization" header, for query
        # param auth an 'X-Amz-Signature' param is appended to the end of
        # the query string.
        request.url += f'&X-Amz-Signature={signature}'

    def _normalize_url_path(self, path):
        # For S3, we do not normalize the path.
        return path

    def payload(self, request):
        # From the S3 presigned URL docs:
        # "You don't include a payload hash in the Canonical Request,
        # because when you create a presigned URL, you don't know anything
        # about the payload. Instead, you use a constant string
        # "UNSIGNED-PAYLOAD".
        return UNSIGNED_PAYLOAD
class SigV4QueryAuth(SigV4Auth):
    """SigV4 signer that places the signature in the query string instead
    of an ``Authorization`` header.
    """

    DEFAULT_EXPIRES = 3600

    def __init__(
        self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES
    ):
        super().__init__(credentials, service_name, region_name)
        self._expires = expires

    def _modify_request_before_signing(self, request):
        """Move the operation and auth parameters into the URL query
        string of ``request``.
        """
        # This header is set automatically, so if it still has the
        # auto-set value it is removed: it makes no sense for presigned
        # urls.
        auto_content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        if request.headers.get('content-type') == auto_content_type:
            del request.headers['content-type']

        # Note that X-Amz-Signature is not included here. From the docs:
        # "The Canonical Query String must include all the query
        # parameters from the preceding table except for X-Amz-Signature."
        signed_names = self.signed_headers(self.headers_to_sign(request))

        auth_params = {
            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
            'X-Amz-Credential': self.scope(request),
            'X-Amz-Date': request.context['timestamp'],
            'X-Amz-Expires': self._expires,
            'X-Amz-SignedHeaders': signed_names,
        }
        if self.credentials.token is not None:
            auth_params['X-Amz-Security-Token'] = self.credentials.token

        # Parse the original query string to a dict, inject the new query
        # params, and serialize back to a query string. parse_qs makes
        # each value a list, but no repeated keys are expected here, so
        # the first element of each list is taken as the scalar value.
        url_parts = urlsplit(request.url)
        parsed_query = parse_qs(url_parts.query, keep_blank_values=True)
        query_dict = {name: values[0] for name, values in parsed_query.items()}

        if request.params:
            query_dict.update(request.params)
            request.params = {}
        if request.data:
            # The body params also need to move into the query string, so
            # the body is converted to a dict first.
            query_dict.update(_get_body_as_dict(request))
            request.data = ''

        # The spec is particular about the ordering: all operation params
        # come first, followed by all auth params. The two groups must not
        # be merged into one dict and encoded together.
        operation_params = ''
        if query_dict:
            operation_params = percent_encode_sequence(query_dict) + '&'
        new_query_string = (
            f"{operation_params}{percent_encode_sequence(auth_params)}"
        )

        # url_parts is an immutable tuple, so a new one is built with only
        # the query component replaced.
        scheme, netloc, path, _, fragment = url_parts
        request.url = urlunsplit(
            (scheme, netloc, path, new_query_string, fragment)
        )

    def _inject_signature_to_request(self, request, signature):
        # Rather than calculating an "Authorization" header, for query
        # param auth an 'X-Amz-Signature' param is appended to the end of
        # the query string.
        request.url += f'&X-Amz-Signature={signature}'
class S3SigV4QueryAuth(SigV4QueryAuth):
    """S3 SigV4 auth using query parameters.

    This signer will sign a request using query parameters and signature
    version 4, i.e. a "presigned url" signer.

    Based off of:

    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html

    """

    def _normalize_url_path(self, path):
        # S3 paths are used as-is; no normalization is applied.
        return path

    def payload(self, request):
        # From the doc link in the class docstring:
        # "You don't include a payload hash in the Canonical Request,
        # because when you create a presigned URL, you don't know anything
        # about the payload. Instead, you use a constant string
        # "UNSIGNED-PAYLOAD".
        return UNSIGNED_PAYLOAD
class S3SigV4PostAuth(SigV4Auth):
    """
    Presigns a s3 post

    Implementation doc here:
    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html
    """

    def add_auth(self, request):
        """Fill ``request.context['s3-presign-post-fields']`` and
        ``request.context['s3-presign-post-policy']`` with the signing
        fields, conditions, encoded policy and signature.
        """
        context = request.context
        now = get_current_datetime()
        context['timestamp'] = now.strftime(SIGV4_TIMESTAMP)

        # Reuse whatever the caller already put in the context; start from
        # empty containers otherwise.
        fields = context.get('s3-presign-post-fields', None)
        if fields is None:
            fields = {}

        policy = context.get('s3-presign-post-policy', None)
        if policy is None:
            policy = {}
        conditions = policy.get('conditions', None)
        if conditions is None:
            conditions = []
        policy['conditions'] = conditions

        algorithm = 'AWS4-HMAC-SHA256'
        credential = self.scope(request)
        timestamp = context['timestamp']

        fields['x-amz-algorithm'] = algorithm
        fields['x-amz-credential'] = credential
        fields['x-amz-date'] = timestamp

        conditions.append({'x-amz-algorithm': algorithm})
        conditions.append({'x-amz-credential': credential})
        conditions.append({'x-amz-date': timestamp})

        token = self.credentials.token
        if token is not None:
            fields['x-amz-security-token'] = token
            conditions.append({'x-amz-security-token': token})

        # Dump the base64 encoded policy into the fields dictionary.
        policy_json = json.dumps(policy).encode('utf-8')
        fields['policy'] = base64.b64encode(policy_json).decode('utf-8')

        # The encoded policy is the string that gets signed.
        fields['x-amz-signature'] = self.signature(fields['policy'], request)

        context['s3-presign-post-fields'] = fields
        context['s3-presign-post-policy'] = policy
class HmacV1Auth(BaseSigner):
    """
    Signer that authenticates a request with an HMAC-SHA1 signature.

    The signature is computed over a canonical string built from the
    HTTP method, a fixed set of standard headers, every ``x-amz-*``
    header and the canonical resource, and is sent in an
    ``Authorization: AWS <access_key>:<signature>`` header.
    """

    # List of Query String Arguments of Interest
    QSAOfInterest = [
        'accelerate',
        'acl',
        'cors',
        'defaultObjectAcl',
        'location',
        'logging',
        'partNumber',
        'policy',
        'requestPayment',
        'torrent',
        'versioning',
        'versionId',
        'versions',
        'website',
        'uploads',
        'uploadId',
        'response-content-type',
        'response-content-language',
        'response-expires',
        'response-cache-control',
        'response-content-disposition',
        'response-content-encoding',
        'delete',
        'lifecycle',
        'tagging',
        'restore',
        'storageClass',
        'notification',
        'replication',
        'requestPayment',
        'analytics',
        'metrics',
        'inventory',
        'select',
        'select-type',
        'object-lock',
    ]

    def __init__(self, credentials, service_name=None, region_name=None):
        # ``service_name`` and ``region_name`` are accepted but not used
        # by this signer.
        self.credentials = credentials

    def sign_string(self, string_to_sign):
        """
        Return the base64 encoded HMAC-SHA1 digest of ``string_to_sign``,
        keyed with the secret key of the credentials.
        """
        secret = self.credentials.secret_key.encode('utf-8')
        mac = hmac.new(secret, digestmod=sha1)
        mac.update(string_to_sign.encode('utf-8'))
        encoded_digest = encodebytes(mac.digest())
        return encoded_digest.strip().decode('utf-8')

    def canonical_standard_headers(self, headers):
        """
        Return the newline joined values of the ``content-md5``,
        ``content-type`` and ``date`` headers, in that order.

        The ``Date`` header of ``headers`` is replaced with the value of
        ``self._get_date()`` first. A header that is absent contributes
        an empty line.
        """
        if 'Date' in headers:
            del headers['Date']
        headers['Date'] = self._get_date()

        lines = []
        for wanted in ('content-md5', 'content-type', 'date'):
            matches = [
                headers[name].strip()
                for name in headers
                if headers[name] is not None and name.lower() == wanted
            ]
            # A missing header still occupies a (blank) line.
            lines.extend(matches or [''])
        return '\n'.join(lines)

    def canonical_custom_headers(self, headers):
        """
        Return the ``x-amz-*`` headers as ``name:value`` lines sorted by
        lower-cased name; repeated headers are folded into one
        comma separated value.
        """
        amz_headers = {}
        for name in headers:
            lowered = name.lower()
            if headers[name] is not None and lowered.startswith('x-amz-'):
                amz_headers[lowered] = ','.join(
                    value.strip() for value in headers.get_all(name)
                )
        return '\n'.join(
            f"{name}:{amz_headers[name]}" for name in sorted(amz_headers)
        )

    def unquote_v(self, nv):
        """
        Unquote the value of a split ``name=value`` query argument.

        A lone name (no value) is returned untouched.

        TODO: Do we need this?
        """
        if len(nv) == 1:
            return nv
        return (nv[0], unquote(nv[1]))

    def canonical_resource(self, split, auth_path=None):
        """
        Return the resource part of the canonical string.

        Nothing after the first ``?`` is included unless it is one of
        the query string arguments listed in ``QSAOfInterest``.
        """
        # NOTE:
        # The path in the canonical resource should always be the
        # full path including the bucket name, even for virtual-hosting
        # style addressing. The ``auth_path`` keeps track of the full
        # path for the canonical resource and would be passed in if
        # the client was using virtual-hosting style.
        resource = split.path if auth_path is None else auth_path
        if split.query:
            pairs = [param.split('=', 1) for param in split.query.split('&')]
            kept = [
                self.unquote_v(pair)
                for pair in pairs
                if pair[0] in self.QSAOfInterest
            ]
            if kept:
                kept.sort(key=itemgetter(0))
                resource += '?'
                resource += '&'.join('='.join(pair) for pair in kept)
        return resource

    def canonical_string(
        self, method, split, headers, expires=None, auth_path=None
    ):
        """
        Assemble the string to sign: upper-cased method, standard
        headers, custom headers (only when there are any) and the
        canonical resource, separated by newlines.
        """
        pieces = [method.upper(), self.canonical_standard_headers(headers)]
        custom_headers = self.canonical_custom_headers(headers)
        if custom_headers:
            pieces.append(custom_headers)
        pieces.append(self.canonical_resource(split, auth_path=auth_path))
        return '\n'.join(pieces)

    def get_signature(
        self, method, split, headers, expires=None, auth_path=None
    ):
        """
        Compute the signature for the request described by the arguments.

        When the credentials carry a session token it is written to the
        ``x-amz-security-token`` header before the string to sign is
        built.
        """
        if self.credentials.token:
            # Delete before assigning so the header is not duplicated.
            del headers['x-amz-security-token']
            headers['x-amz-security-token'] = self.credentials.token
        to_sign = self.canonical_string(
            method, split, headers, auth_path=auth_path
        )
        logger.debug('StringToSign:\n%s', to_sign)
        return self.sign_string(to_sign)

    def add_auth(self, request):
        """
        Sign ``request`` and inject the signature into it.

        Raises ``NoCredentialsError`` when the signer has no credentials.
        """
        if self.credentials is None:
            raise NoCredentialsError
        logger.debug("Calculating signature using hmacv1 auth.")
        url_parts = urlsplit(request.url)
        logger.debug("HTTP request method: %s", request.method)
        computed = self.get_signature(
            request.method,
            url_parts,
            request.headers,
            auth_path=request.auth_path,
        )
        self._inject_signature(request, computed)

    def _get_date(self):
        return formatdate(usegmt=True)

    def _inject_signature(self, request, signature):
        headers = request.headers
        if 'Authorization' in headers:
            # request.headers is not a normal dictionary: assigning to
            # the same key repeatedly aggregates the values, e.g. after
            #   headers['foo'] = 'a'; headers['foo'] = 'b'
            # list(headers) gives ['foo', 'foo']. Remove any existing
            # value before setting ours.
            del headers['Authorization']

        headers['Authorization'] = (
            f"AWS {self.credentials.access_key}:{signature}"
        )
class HmacV1QueryAuth(HmacV1Auth):
    """
    Generates a presigned request for s3.

    Spec from this document:

    http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html
    #RESTAuthenticationQueryStringAuth

    """

    DEFAULT_EXPIRES = 3600

    def __init__(self, credentials, expires=DEFAULT_EXPIRES):
        self.credentials = credentials
        self._expires = expires

    def _get_date(self):
        # The "date" that gets signed is the expiry: the current time
        # plus the configured number of seconds, as an integer string.
        expires_at = time.time() + int(self._expires)
        return str(int(expires_at))

    def _inject_signature(self, request, signature):
        """
        Rewrite ``request.url`` so its query string carries the access
        key, the signature and the relevant request headers.
        """
        params = {
            'AWSAccessKeyId': self.credentials.access_key,
            'Signature': signature,
        }

        headers = request.headers
        for name in headers:
            lowered = name.lower()
            if name == 'Date':
                # For query string requests, Expires is used instead of
                # the Date header.
                params['Expires'] = headers['Date']
            elif lowered.startswith('x-amz-') or lowered in (
                'content-md5',
                'content-type',
            ):
                # Only relevant headers go into the query string:
                # anything that starts with x-amz, Content-MD5 and
                # Content-Type.
                params[lowered] = headers[lowered]

        # Combine all of the identified headers into an encoded
        # query string
        signed_query = percent_encode_sequence(params)

        # Create a new url with the presigned url.
        scheme, netloc, path, query, fragment = urlsplit(request.url)
        if query:
            # A pre-existing query string is kept in front of the
            # newly generated one.
            signed_query = f'{query}&{signed_query}'
        request.url = urlunsplit(
            (scheme, netloc, path, signed_query, fragment)
        )
class HmacV1PostAuth(HmacV1Auth):
    """
    Generates a presigned post for s3.

    Spec from this document:

    http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingHTTPPOST.html
    """

    def add_auth(self, request):
        """
        Populate the presigned-post form fields and policy of ``request``.

        Reads any existing ``s3-presign-post-fields`` and
        ``s3-presign-post-policy`` entries from ``request.context``, adds
        the access key (and the session token, when the credentials have
        one), stores the base64 encoded JSON policy under the ``policy``
        field and its signature under ``signature``, then writes both
        the fields and the policy back to ``request.context``.

        Raises ``NoCredentialsError`` when the signer has no credentials.
        """
        # Fail the same way HmacV1Auth.add_auth does instead of with an
        # AttributeError on ``None.access_key`` further down.
        if self.credentials is None:
            raise NoCredentialsError

        context = request.context

        fields = context.get('s3-presign-post-fields')
        if fields is None:
            fields = {}

        policy = context.get('s3-presign-post-policy')
        if policy is None:
            policy = {}

        conditions = policy.get('conditions')
        if conditions is None:
            conditions = []
        policy['conditions'] = conditions

        fields['AWSAccessKeyId'] = self.credentials.access_key

        token = self.credentials.token
        if token is not None:
            # The token is added both as a form field and as a policy
            # condition.
            fields['x-amz-security-token'] = token
            conditions.append({'x-amz-security-token': token})

        # Dump the base64 encoded policy into the fields dictionary.
        fields['policy'] = base64.b64encode(
            json.dumps(policy).encode('utf-8')
        ).decode('utf-8')

        # The signature is computed over the encoded policy document.
        fields['signature'] = self.sign_string(fields['policy'])

        context['s3-presign-post-fields'] = fields
        context['s3-presign-post-policy'] = policy
class BearerAuth(TokenSigner):
    """
    Performs bearer token authorization by placing the bearer token in the
    Authorization header as specified by Section 2.1 of RFC 6750.

    https://datatracker.ietf.org/doc/html/rfc6750#section-2.1
    """

    def add_auth(self, request):
        """
        Set the ``Authorization`` header of ``request`` to
        ``Bearer <token>``, replacing any value already present.

        Raises ``NoAuthTokenError`` when the signer has no auth token.
        """
        if self.auth_token is None:
            raise NoAuthTokenError()

        bearer_value = f'Bearer {self.auth_token.token}'
        headers = request.headers
        # Remove a previous value first so the header is replaced.
        if 'Authorization' in headers:
            del headers['Authorization']
        headers['Authorization'] = bearer_value
def resolve_auth_type(auth_trait):
    """
    Return the signature version for the first usable entry of
    ``auth_trait``.

    Entries are examined in order. ``smithy.api#noAuth`` resolves
    immediately; any other known auth type resolves only if its
    signature version has a signer in ``AUTH_TYPE_MAPS``, otherwise the
    next entry is tried.

    Raises ``UnknownSignatureVersionError`` for an entry that is not in
    ``AUTH_TYPE_TO_SIGNATURE_VERSION`` and
    ``UnsupportedSignatureVersionError`` when no entry resolves.
    """
    for auth_type in auth_trait:
        if auth_type == 'smithy.api#noAuth':
            return AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type]
        if auth_type not in AUTH_TYPE_TO_SIGNATURE_VERSION:
            raise UnknownSignatureVersionError(signature_version=auth_type)
        candidate = AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type]
        if candidate in AUTH_TYPE_MAPS:
            return candidate
    raise UnsupportedSignatureVersionError(signature_version=auth_trait)
def resolve_auth_scheme_preference(preference_list, auth_options):
    """
    Return the signature version for the most preferred auth scheme
    that is among ``auth_options``.

    Schemes from ``preference_list`` are considered first, followed by
    the remaining schemes of ``auth_options`` in their given order
    (with any ``...#`` prefix stripped). Schemes that are not among
    ``auth_options`` are ignored. ``noAuth`` resolves immediately; any
    other scheme resolves only if its signature version has a signer in
    ``AUTH_TYPE_MAPS``.

    Raises ``UnsupportedSignatureVersionError`` when nothing resolves.
    """
    service_supported = [option.split('#')[-1] for option in auth_options]

    unknown_preferences = [
        preferred
        for preferred in preference_list
        if preferred not in AUTH_PREF_TO_SIGNATURE_VERSION
    ]
    if unknown_preferences:
        logger.debug(
            "Unsupported auth schemes in preference list: %r",
            unknown_preferences,
        )

    # dict.fromkeys removes duplicates while keeping first-seen order,
    # so the caller's preferences come before the remaining schemes.
    ordered_candidates = dict.fromkeys(preference_list + service_supported)
    for scheme in ordered_candidates:
        if scheme not in service_supported:
            continue
        if scheme == 'noAuth':
            return AUTH_PREF_TO_SIGNATURE_VERSION[scheme]
        sig_version = AUTH_PREF_TO_SIGNATURE_VERSION.get(scheme)
        if sig_version in AUTH_TYPE_MAPS:
            return sig_version

    raise UnsupportedSignatureVersionError(
        signature_version=', '.join(sorted(service_supported))
    )
# Signature version name -> signer class implementing it.
AUTH_TYPE_MAPS = {
    'v2': SigV2Auth,
    'v3': SigV3Auth,
    'v3https': SigV3Auth,
    's3': HmacV1Auth,
    's3-query': HmacV1QueryAuth,
    's3-presign-post': HmacV1PostAuth,
    's3v4-presign-post': S3SigV4PostAuth,
    'v4-s3express': S3ExpressAuth,
    'v4-s3express-query': S3ExpressQueryAuth,
    'v4-s3express-presign-post': S3ExpressPostAuth,
    'bearer': BearerAuth,
}

# The v4 signers come from the CRT integration when it is available,
# otherwise from the implementations in this module.
if HAS_CRT:
    from botocore.crt.auth import CRT_AUTH_TYPE_MAPS

    AUTH_TYPE_MAPS.update(CRT_AUTH_TYPE_MAPS)
else:
    AUTH_TYPE_MAPS['v4'] = SigV4Auth
    AUTH_TYPE_MAPS['v4-query'] = SigV4QueryAuth
    AUTH_TYPE_MAPS['s3v4'] = S3SigV4Auth
    AUTH_TYPE_MAPS['s3v4-query'] = S3SigV4QueryAuth

# Auth trait name -> signature version name.
AUTH_TYPE_TO_SIGNATURE_VERSION = {
    'aws.auth#sigv4': 'v4',
    'aws.auth#sigv4a': 'v4a',
    'smithy.api#httpBearerAuth': 'bearer',
    'smithy.api#noAuth': 'none',
}

# Mapping used specifically for resolving user-configured auth scheme
# preferences. It mirrors AUTH_TYPE_TO_SIGNATURE_VERSION, but its keys have
# the auth trait prefix stripped
# ('smithy.api#httpBearerAuth' → 'httpBearerAuth'), which is the simplified
# form customers are expected to provide in configuration.
AUTH_PREF_TO_SIGNATURE_VERSION = {
    trait_name.rpartition('#')[2]: version
    for trait_name, version in AUTH_TYPE_TO_SIGNATURE_VERSION.items()
}