Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/auth.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"). You
5# may not use this file except in compliance with the License. A copy of
6# the License is located at
7#
8# http://aws.amazon.com/apache2.0/
9#
10# or in the "license" file accompanying this file. This file is
11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
12# ANY KIND, either express or implied. See the License for the specific
13# language governing permissions and limitations under the License.
14import base64
15import calendar
16import datetime
17import functools
18import hmac
19import json
20import logging
21import time
22from collections.abc import Mapping
23from email.utils import formatdate
24from hashlib import sha1, sha256
25from operator import itemgetter
27from botocore.compat import (
28 HAS_CRT,
29 MD5_AVAILABLE, # noqa: F401
30 HTTPHeaders,
31 encodebytes,
32 ensure_unicode,
33 parse_qs,
34 quote,
35 unquote,
36 urlsplit,
37 urlunsplit,
38)
39from botocore.exceptions import (
40 NoAuthTokenError,
41 NoCredentialsError,
42 UnknownSignatureVersionError,
43 UnsupportedSignatureVersionError,
44)
45from botocore.utils import (
46 is_valid_ipv6_endpoint_url,
47 normalize_url_path,
48 percent_encode_sequence,
49)
logger = logging.getLogger(__name__)


# Hex SHA-256 digest of the empty byte string; returned as the payload hash
# when a request has no body.
EMPTY_SHA256_HASH = (
    'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
)
# This is the buffer size used when calculating sha256 checksums.
# Experimenting with various buffer sizes showed that this value generally
# gave the best result (in terms of performance).
PAYLOAD_BUFFER = 1 << 20  # 1 MiB
# strftime patterns for the SigV2 ``Timestamp`` parameter and for the SigV4
# request timestamp respectively.
ISO8601 = '%Y-%m-%dT%H:%M:%SZ'
SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ'
# Lower-cased header names that are left out of the SigV4 signed headers.
SIGNED_HEADERS_BLACKLIST = [
    'expect',
    'transfer-encoding',
    'user-agent',
    'x-amzn-trace-id',
]
# Static strings used in place of a payload checksum.
UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD'
STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER'
def _host_from_url(url):
    """Derive the value of the ``Host`` header for ``url``.

    The returned value:
      1) is lowercase
      2) excludes the port when it is the default port for the scheme
      3) excludes userinfo
    """
    parsed = urlsplit(url)
    # urlsplit's ``hostname`` is always lowercase and carries neither the
    # userinfo nor the port.
    host = parsed.hostname
    if is_valid_ipv6_endpoint_url(url):
        # IPv6 literals must be bracketed when used as a host.
        host = f'[{host}]'

    well_known_ports = {'http': 80, 'https': 443}
    port = parsed.port
    if port is None:
        return host
    if port == well_known_ports.get(parsed.scheme):
        return host
    return f'{host}:{port}'
def _get_body_as_dict(request):
    """Return ``request.data`` as a dict.

    For query services ``request.data`` is form-encoded and is already a
    dict; for other services such as rest-json it may be a JSON document
    held as ``str`` or ``bytes``, in which case it is parsed. Any other
    value is returned unchanged.
    """
    body = request.data
    if isinstance(body, bytes):
        return json.loads(body.decode('utf-8'))
    if isinstance(body, str):
        return json.loads(body)
    return body
class BaseSigner:
    """Common base for request signers.

    Subclasses override ``add_auth`` and may flip the ``REQUIRES_*`` flags.
    """

    REQUIRES_REGION = False
    REQUIRES_TOKEN = False

    def add_auth(self, request):
        """Add authentication to ``request``; subclasses must implement."""
        raise NotImplementedError("add_auth")
class TokenSigner(BaseSigner):
    """
    Signers that expect an authorization token to perform the authorization
    """

    # NOTE: the docstring must be the first statement of the class body,
    # otherwise it is a discarded string expression and ``__doc__`` is None.
    REQUIRES_TOKEN = True

    def __init__(self, auth_token):
        """Store ``auth_token`` for use by subclasses when signing."""
        self.auth_token = auth_token
class SigV2Auth(BaseSigner):
    """
    Sign a request with Signature V2.
    """

    def __init__(self, credentials):
        self.credentials = credentials

    def calc_signature(self, request, params):
        """Compute the V2 signature for ``request`` over ``params``.

        Returns a ``(query_string, base64_signature)`` tuple, where the
        query string is the percent-encoded, key-sorted parameter list that
        was signed.
        """
        logger.debug("Calculating signature using v2 auth.")
        url_parts = urlsplit(request.url)
        # An empty path is signed as the root path.
        path = url_parts.path if len(url_parts.path) != 0 else '/'
        header = f"{request.method}\n{url_parts.netloc}\n{path}\n"
        signer = hmac.new(
            self.credentials.secret_key.encode("utf-8"), digestmod=sha256
        )
        # Any previous signature should not be a part of this one, so the
        # 'Signature' key is skipped. This prevents issues during retries.
        named_values = (
            (name, str(params[name]))
            for name in sorted(params)
            if name != 'Signature'
        )
        query_string = '&'.join(
            f"{quote(name.encode('utf-8'), safe='')}"
            f"={quote(value.encode('utf-8'), safe='-_~')}"
            for name, value in named_values
        )
        string_to_sign = header + query_string
        logger.debug('String to sign: %s', string_to_sign)
        signer.update(string_to_sign.encode('utf-8'))
        encoded = base64.b64encode(signer.digest()).strip().decode('utf-8')
        return (query_string, encoded)

    def add_auth(self, request):
        """Inject the SigV2 auth parameters into the request's params.

        Raises ``NoCredentialsError`` when no credentials are set.
        """
        # The auth handler is the last thing called in the
        # preparation phase of a prepared request.
        # Because of this we have to parse the query params
        # from the request body so we can update them with
        # the sigv2 auth params.
        if self.credentials is None:
            raise NoCredentialsError()
        # A non-empty body means POST (params live in the body); otherwise
        # this is a GET and the params live in ``request.params``.
        params = request.data if request.data else request.params
        params['AWSAccessKeyId'] = self.credentials.access_key
        params['SignatureVersion'] = '2'
        params['SignatureMethod'] = 'HmacSHA256'
        params['Timestamp'] = time.strftime(ISO8601, time.gmtime())
        if self.credentials.token:
            params['SecurityToken'] = self.credentials.token
        _, signature = self.calc_signature(request, params)
        params['Signature'] = signature
        return request
class SigV3Auth(BaseSigner):
    """Sign a request by HMAC-SHA256'ing its ``Date`` header value."""

    def __init__(self, credentials):
        self.credentials = credentials

    def add_auth(self, request):
        """Set the Date, security-token and X-Amzn-Authorization headers.

        Raises ``NoCredentialsError`` when no credentials are set.
        """
        if self.credentials is None:
            raise NoCredentialsError()

        # Each header is deleted before being assigned so that a retry
        # replaces the previous value.
        if 'Date' in request.headers:
            del request.headers['Date']
        request.headers['Date'] = formatdate(usegmt=True)

        token = self.credentials.token
        if token:
            if 'X-Amz-Security-Token' in request.headers:
                del request.headers['X-Amz-Security-Token']
            request.headers['X-Amz-Security-Token'] = self.credentials.token

        secret = self.credentials.secret_key.encode('utf-8')
        mac = hmac.new(secret, digestmod=sha256)
        mac.update(request.headers['Date'].encode('utf-8'))
        b64_signature = encodebytes(mac.digest()).strip().decode('utf-8')
        authorization = (
            f"AWS3-HTTPS AWSAccessKeyId={self.credentials.access_key},"
            f"Algorithm=HmacSHA256,Signature={b64_signature}"
        )

        if 'X-Amzn-Authorization' in request.headers:
            del request.headers['X-Amzn-Authorization']
        request.headers['X-Amzn-Authorization'] = authorization
class SigV4Auth(BaseSigner):
    """
    Sign a request with Signature V4.
    """

    REQUIRES_REGION = True

    def __init__(self, credentials, service_name, region_name):
        self.credentials = credentials
        # The region and service name are part of the credential scope and
        # are mixed into the derived signing key (see ``credential_scope``
        # and ``signature``).
        self._region_name = region_name
        self._service_name = service_name

    def _sign(self, key, msg, hex=False):
        """HMAC-SHA256 ``msg`` (a str) with ``key`` (bytes).

        Returns the hex digest string when ``hex`` is true, otherwise the
        raw digest bytes.
        """
        mac = hmac.new(key, msg.encode('utf-8'), sha256)
        return mac.hexdigest() if hex else mac.digest()

    def headers_to_sign(self, request):
        """
        Select the headers from the request that need to be included
        in the StringToSign.
        """
        header_map = HTTPHeaders()
        for name, value in request.headers.items():
            lname = name.lower()
            if lname not in SIGNED_HEADERS_BLACKLIST:
                header_map[lname] = value
        if 'host' not in header_map:
            # TODO: We should set the host ourselves, instead of relying on our
            # HTTP client to set it for us.
            header_map['host'] = _host_from_url(request.url)
        return header_map

    def canonical_query_string(self, request):
        """Return the canonical query string for ``request``."""
        # The query string can come from two parts.  One is the
        # params attribute of the request.  The other is from the request
        # url (in which case we have to re-split the url into its components
        # and parse out the query string component).
        if request.params:
            return self._canonical_query_string_params(request.params)
        else:
            return self._canonical_query_string_url(urlsplit(request.url))

    def _canonical_query_string_params(self, params):
        """Percent-encode, sort and join ``params`` (mapping or pairs)."""
        if isinstance(params, Mapping):
            params = params.items()
        # [(key, value), (key2, value2)]
        key_val_pairs = [
            (quote(key, safe='-_.~'), quote(str(value), safe='-_.~'))
            for key, value in params
        ]
        # Sort by the URI-encoded key names, and in the case of
        # repeated keys, then sort by the value.
        return '&'.join(
            f'{key}={value}' for key, value in sorted(key_val_pairs)
        )

    def _canonical_query_string_url(self, parts):
        """Sort the query of already-split URL ``parts`` without re-encoding."""
        if not parts.query:
            return ''
        # [(key, value), (key2, value2)]
        key_val_pairs = []
        for pair in parts.query.split('&'):
            key, _, value = pair.partition('=')
            key_val_pairs.append((key, value))
        # Sort by the URI-encoded key names, and in the case of
        # repeated keys, then sort by the value.
        return '&'.join(
            f'{key}={value}' for key, value in sorted(key_val_pairs)
        )

    def canonical_headers(self, headers_to_sign):
        """
        Return the headers that need to be included in the StringToSign
        in their canonical form by converting all header keys to lower
        case, sorting them in alphabetical order and then joining
        them into a string, separated by newlines.
        """
        headers = []
        sorted_header_names = sorted(set(headers_to_sign))
        for key in sorted_header_names:
            # Repeated headers are folded into one comma-separated value.
            value = ','.join(
                self._header_value(v) for v in headers_to_sign.get_all(key)
            )
            headers.append(f'{key}:{ensure_unicode(value)}')
        return '\n'.join(headers)

    def _header_value(self, value):
        # From the sigv4 docs:
        # Lowercase(HeaderName) + ':' + Trimall(HeaderValue)
        #
        # The Trimall function removes excess white space before and after
        # values, and converts sequential spaces to a single space.
        return ' '.join(value.split())

    def signed_headers(self, headers_to_sign):
        """Return the sorted, lower-cased header names joined by ``;``."""
        headers = sorted(n.lower().strip() for n in set(headers_to_sign))
        return ';'.join(headers)

    def _is_streaming_checksum_payload(self, request):
        """True when the request context says the checksum goes in a trailer."""
        checksum_context = request.context.get('checksum', {})
        algorithm = checksum_context.get('request_algorithm')
        return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer'

    def payload(self, request):
        """Return the payload hash (or static placeholder) for ``request``."""
        if self._is_streaming_checksum_payload(request):
            return STREAMING_UNSIGNED_PAYLOAD_TRAILER
        elif not self._should_sha256_sign_payload(request):
            # When payload signing is disabled, we use this static string in
            # place of the payload checksum.
            return UNSIGNED_PAYLOAD
        request_body = request.body
        if request_body and hasattr(request_body, 'seek'):
            # Seekable body: hash it in chunks, then rewind to where we
            # started so the body can still be read afterwards.
            position = request_body.tell()
            read_chunksize = functools.partial(
                request_body.read, PAYLOAD_BUFFER
            )
            checksum = sha256()
            for chunk in iter(read_chunksize, b''):
                checksum.update(chunk)
            hex_checksum = checksum.hexdigest()
            request_body.seek(position)
            return hex_checksum
        elif request_body:
            # The request serialization has ensured that
            # request.body is a bytes() type.
            return sha256(request_body).hexdigest()
        else:
            return EMPTY_SHA256_HASH

    def _should_sha256_sign_payload(self, request):
        # Payloads will always be signed over insecure connections.
        if not request.url.startswith('https'):
            return True

        # Certain operations may have payload signing disabled by default.
        # Since we don't have access to the operation model, we pass in this
        # bit of metadata through the request context.
        return request.context.get('payload_signing_enabled', True)

    def canonical_request(self, request):
        """Build the newline-joined canonical request for ``request``."""
        cr = [request.method.upper()]
        path = self._normalize_url_path(urlsplit(request.url).path)
        cr.append(path)
        cr.append(self.canonical_query_string(request))
        headers_to_sign = self.headers_to_sign(request)
        cr.append(self.canonical_headers(headers_to_sign) + '\n')
        cr.append(self.signed_headers(headers_to_sign))
        # A payload hash already present on the request takes precedence
        # over computing one.
        if 'X-Amz-Content-SHA256' in request.headers:
            body_checksum = request.headers['X-Amz-Content-SHA256']
        else:
            body_checksum = self.payload(request)
        cr.append(body_checksum)
        return '\n'.join(cr)

    def _normalize_url_path(self, path):
        normalized_path = quote(normalize_url_path(path), safe='/~')
        return normalized_path

    def scope(self, request):
        """Return ``<access key>/<date>/<region>/<service>/aws4_request``."""
        scope = [self.credentials.access_key]
        scope.append(request.context['timestamp'][0:8])
        scope.append(self._region_name)
        scope.append(self._service_name)
        scope.append('aws4_request')
        return '/'.join(scope)

    def credential_scope(self, request):
        """Return ``<date>/<region>/<service>/aws4_request``."""
        scope = []
        scope.append(request.context['timestamp'][0:8])
        scope.append(self._region_name)
        scope.append(self._service_name)
        scope.append('aws4_request')
        return '/'.join(scope)

    def string_to_sign(self, request, canonical_request):
        """
        Return the StringToSign: the algorithm name, the request
        timestamp, the credential scope and the hex SHA-256 of the
        canonical request, joined by newlines.
        """
        sts = ['AWS4-HMAC-SHA256']
        sts.append(request.context['timestamp'])
        sts.append(self.credential_scope(request))
        sts.append(sha256(canonical_request.encode('utf-8')).hexdigest())
        return '\n'.join(sts)

    def signature(self, string_to_sign, request):
        """Derive the signing key and return the hex signature."""
        key = self.credentials.secret_key
        # Key derivation chain: date -> region -> service -> 'aws4_request'.
        k_date = self._sign(
            (f"AWS4{key}").encode(), request.context["timestamp"][0:8]
        )
        k_region = self._sign(k_date, self._region_name)
        k_service = self._sign(k_region, self._service_name)
        k_signing = self._sign(k_service, 'aws4_request')
        return self._sign(k_signing, string_to_sign, hex=True)

    def add_auth(self, request):
        """Sign ``request`` in place.

        Raises ``NoCredentialsError`` when no credentials are set.
        """
        if self.credentials is None:
            raise NoCredentialsError()
        # ``datetime.utcnow()`` is deprecated since Python 3.12; an aware
        # UTC "now" formats to the same timestamp string.
        datetime_now = datetime.datetime.now(datetime.timezone.utc)
        request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
        # This could be a retry.  Make sure the previous
        # authorization header is removed first.
        self._modify_request_before_signing(request)
        canonical_request = self.canonical_request(request)
        logger.debug("Calculating signature using v4 auth.")
        logger.debug('CanonicalRequest:\n%s', canonical_request)
        string_to_sign = self.string_to_sign(request, canonical_request)
        logger.debug('StringToSign:\n%s', string_to_sign)
        signature = self.signature(string_to_sign, request)
        logger.debug('Signature:\n%s', signature)

        self._inject_signature_to_request(request, signature)

    def _inject_signature_to_request(self, request, signature):
        """Set the ``Authorization`` header from ``signature``."""
        auth_str = [f'AWS4-HMAC-SHA256 Credential={self.scope(request)}']
        headers_to_sign = self.headers_to_sign(request)
        auth_str.append(
            f"SignedHeaders={self.signed_headers(headers_to_sign)}"
        )
        auth_str.append(f'Signature={signature}')
        request.headers['Authorization'] = ', '.join(auth_str)
        return request

    def _modify_request_before_signing(self, request):
        """Reset the auth-related headers ahead of (re-)signing."""
        if 'Authorization' in request.headers:
            del request.headers['Authorization']
        self._set_necessary_date_headers(request)
        if self.credentials.token:
            if 'X-Amz-Security-Token' in request.headers:
                del request.headers['X-Amz-Security-Token']
            request.headers['X-Amz-Security-Token'] = self.credentials.token

        if not request.context.get('payload_signing_enabled', True):
            if 'X-Amz-Content-SHA256' in request.headers:
                del request.headers['X-Amz-Content-SHA256']
            request.headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD

    def _set_necessary_date_headers(self, request):
        # The spec allows for either the Date _or_ the X-Amz-Date value to be
        # used so we check both.  If there's a Date header, we use the date
        # header.  Otherwise we use the X-Amz-Date header.
        if 'Date' in request.headers:
            del request.headers['Date']
            # Re-derive the Date header from the context timestamp so that
            # it matches the timestamp that gets signed.
            datetime_timestamp = datetime.datetime.strptime(
                request.context['timestamp'], SIGV4_TIMESTAMP
            )
            request.headers['Date'] = formatdate(
                int(calendar.timegm(datetime_timestamp.timetuple()))
            )
            if 'X-Amz-Date' in request.headers:
                del request.headers['X-Amz-Date']
        else:
            if 'X-Amz-Date' in request.headers:
                del request.headers['X-Amz-Date']
            request.headers['X-Amz-Date'] = request.context['timestamp']
class S3SigV4Auth(SigV4Auth):
    """SigV4 signer with S3-specific payload-signing and path rules."""

    def _modify_request_before_signing(self, request):
        super()._modify_request_before_signing(request)
        # Always (re)compute the payload hash header for S3 requests.
        if 'X-Amz-Content-SHA256' in request.headers:
            del request.headers['X-Amz-Content-SHA256']

        request.headers['X-Amz-Content-SHA256'] = self.payload(request)

    def _should_sha256_sign_payload(self, request):
        # S3 allows optional body signing, so to minimize the performance
        # impact, we opt to not SHA256 sign the body on streaming uploads,
        # provided that we're on https.
        s3_config = getattr(request.context.get('client_config'), 's3', None)

        # The config could be None if it isn't set, or if the customer sets it
        # to None.
        if s3_config is None:
            s3_config = {}

        # The explicit configuration takes precedence over any implicit
        # configuration.
        explicit_setting = s3_config.get('payload_signing_enabled', None)
        if explicit_setting is not None:
            return explicit_setting

        # We require that both a checksum be present and https be enabled
        # to implicitly disable body signing. The combination of TLS and
        # a checksum is sufficiently secure and durable for us to be
        # confident in the request without body signing.
        checksum_header = 'Content-MD5'
        algorithm = request.context.get('checksum', {}).get(
            'request_algorithm'
        )
        if isinstance(algorithm, dict) and algorithm.get('in') == 'header':
            checksum_header = algorithm['name']
        has_tls_and_checksum = (
            request.url.startswith("https")
            and checksum_header in request.headers
        )
        if not has_tls_and_checksum:
            return True

        # If the input is streaming we disable body signing by default.
        if request.context.get('has_streaming_input', False):
            return False

        # If the S3-specific checks had no results, delegate to the generic
        # checks.
        return super()._should_sha256_sign_payload(request)

    def _normalize_url_path(self, path):
        # For S3, we do not normalize the path.
        return path
class S3ExpressAuth(S3SigV4Auth):
    """S3 SigV4 signer that sends an S3 Express session token header."""

    REQUIRES_IDENTITY_CACHE = True

    def __init__(
        self, credentials, service_name, region_name, *, identity_cache
    ):
        super().__init__(credentials, service_name, region_name)
        self._identity_cache = identity_cache

    def add_auth(self, request):
        super().add_auth(request)

    def _modify_request_before_signing(self, request):
        super()._modify_request_before_signing(request)
        headers = request.headers
        if 'x-amz-s3session-token' not in headers:
            headers['x-amz-s3session-token'] = self.credentials.token
        # S3Express does not support STS' X-Amz-Security-Token
        if 'X-Amz-Security-Token' in headers:
            del headers['X-Amz-Security-Token']
class S3ExpressPostAuth(S3ExpressAuth):
    """Presign an S3 Express POST by signing its base64-encoded policy."""

    REQUIRES_IDENTITY_CACHE = True

    def add_auth(self, request):
        """Add the SigV4 fields and policy conditions to the request context.

        Reads and writes the ``s3-presign-post-fields`` and
        ``s3-presign-post-policy`` entries of ``request.context``.
        """
        # ``datetime.utcnow()`` is deprecated since Python 3.12; an aware
        # UTC "now" formats to the same timestamp string.
        datetime_now = datetime.datetime.now(datetime.timezone.utc)
        request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)

        fields = {}
        if request.context.get('s3-presign-post-fields', None) is not None:
            fields = request.context['s3-presign-post-fields']

        policy = {}
        conditions = []
        if request.context.get('s3-presign-post-policy', None) is not None:
            policy = request.context['s3-presign-post-policy']
            if policy.get('conditions', None) is not None:
                conditions = policy['conditions']

        policy['conditions'] = conditions

        # The same values go into the form fields and the policy conditions.
        credential = self.scope(request)
        timestamp = request.context['timestamp']

        fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256'
        fields['x-amz-credential'] = credential
        fields['x-amz-date'] = timestamp

        conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'})
        conditions.append({'x-amz-credential': credential})
        conditions.append({'x-amz-date': timestamp})

        if self.credentials.token is not None:
            fields['X-Amz-S3session-Token'] = self.credentials.token
            conditions.append(
                {'X-Amz-S3session-Token': self.credentials.token}
            )

        # Dump the base64 encoded policy into the fields dictionary.
        fields['policy'] = base64.b64encode(
            json.dumps(policy).encode('utf-8')
        ).decode('utf-8')

        # For POST the string that gets signed is the encoded policy itself.
        fields['x-amz-signature'] = self.signature(fields['policy'], request)

        request.context['s3-presign-post-fields'] = fields
        request.context['s3-presign-post-policy'] = policy
class S3ExpressQueryAuth(S3ExpressAuth):
    """S3 Express presigned-URL signer (auth carried in the query string)."""

    DEFAULT_EXPIRES = 300
    REQUIRES_IDENTITY_CACHE = True

    def __init__(
        self,
        credentials,
        service_name,
        region_name,
        *,
        identity_cache,
        expires=DEFAULT_EXPIRES,
    ):
        super().__init__(
            credentials,
            service_name,
            region_name,
            identity_cache=identity_cache,
        )
        self._expires = expires

    def _modify_request_before_signing(self, request):
        # We automatically set this header, so if it's the auto-set value we
        # want to get rid of it since it doesn't make sense for presigned urls.
        auto_content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        if request.headers.get('content-type') == auto_content_type:
            del request.headers['content-type']

        # Note that we're not including X-Amz-Signature.
        # From the docs: "The Canonical Query String must include all the query
        # parameters from the preceding table except for X-Amz-Signature.
        signed_headers = self.signed_headers(self.headers_to_sign(request))

        auth_params = {
            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
            'X-Amz-Credential': self.scope(request),
            'X-Amz-Date': request.context['timestamp'],
            'X-Amz-Expires': self._expires,
            'X-Amz-SignedHeaders': signed_headers,
        }
        if self.credentials.token is not None:
            auth_params['X-Amz-S3session-Token'] = self.credentials.token

        # Now parse the original query string to a dict, inject our new query
        # params, and serialize back to a query string.
        scheme, netloc, path, query, fragment = urlsplit(request.url)
        # parse_qs makes each value a list, but in our case we know we won't
        # have repeated keys so we know we have single element lists which we
        # can convert back to scalar values.
        query_dict = {
            name: values[0]
            for name, values in parse_qs(
                query, keep_blank_values=True
            ).items()
        }

        if request.params:
            query_dict.update(request.params)
            request.params = {}

        if request.data:
            # We also need to move the body params into the query string. To
            # do this, we first have to convert it to a dict.
            query_dict.update(_get_body_as_dict(request))
            request.data = ''

        # The spec is particular about this.  It *has* to be:
        # https://<endpoint>?<operation params>&<auth params>
        # You can't mix the two types of params together, i.e just keep doing
        # new_query_params.update(op_params)
        # new_query_params.update(auth_params)
        # percent_encode_sequence(new_query_params)
        operation_params = ''
        if query_dict:
            operation_params = percent_encode_sequence(query_dict) + '&'
        new_query_string = (
            f"{operation_params}{percent_encode_sequence(auth_params)}"
        )

        # The split result is immutable, so rebuild the URL from its pieces
        # with only the query component replaced.
        request.url = urlunsplit(
            (scheme, netloc, path, new_query_string, fragment)
        )

    def _inject_signature_to_request(self, request, signature):
        # Rather than calculating an "Authorization" header, for the query
        # param auth, we just append an 'X-Amz-Signature' param to the end
        # of the query string.
        request.url += f'&X-Amz-Signature={signature}'

    def _normalize_url_path(self, path):
        # For S3, we do not normalize the path.
        return path

    def payload(self, request):
        # From the S3 presigned URL documentation:
        # "You don't include a payload hash in the Canonical Request, because
        # when you create a presigned URL, you don't know anything about the
        # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".
        return UNSIGNED_PAYLOAD
class SigV4QueryAuth(SigV4Auth):
    """SigV4 presigned-URL signer (auth carried in the query string)."""

    DEFAULT_EXPIRES = 3600

    def __init__(
        self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES
    ):
        super().__init__(credentials, service_name, region_name)
        self._expires = expires

    def _modify_request_before_signing(self, request):
        # We automatically set this header, so if it's the auto-set value we
        # want to get rid of it since it doesn't make sense for presigned urls.
        auto_content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        if request.headers.get('content-type') == auto_content_type:
            del request.headers['content-type']

        # Note that we're not including X-Amz-Signature.
        # From the docs: "The Canonical Query String must include all the query
        # parameters from the preceding table except for X-Amz-Signature.
        signed_headers = self.signed_headers(self.headers_to_sign(request))

        auth_params = {
            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
            'X-Amz-Credential': self.scope(request),
            'X-Amz-Date': request.context['timestamp'],
            'X-Amz-Expires': self._expires,
            'X-Amz-SignedHeaders': signed_headers,
        }
        if self.credentials.token is not None:
            auth_params['X-Amz-Security-Token'] = self.credentials.token

        # Now parse the original query string to a dict, inject our new query
        # params, and serialize back to a query string.
        scheme, netloc, path, query, fragment = urlsplit(request.url)
        # parse_qs makes each value a list, but in our case we know we won't
        # have repeated keys so we know we have single element lists which we
        # can convert back to scalar values.
        query_dict = {
            name: values[0]
            for name, values in parse_qs(
                query, keep_blank_values=True
            ).items()
        }

        if request.params:
            query_dict.update(request.params)
            request.params = {}

        if request.data:
            # We also need to move the body params into the query string. To
            # do this, we first have to convert it to a dict.
            query_dict.update(_get_body_as_dict(request))
            request.data = ''

        # The spec is particular about this.  It *has* to be:
        # https://<endpoint>?<operation params>&<auth params>
        # You can't mix the two types of params together, i.e just keep doing
        # new_query_params.update(op_params)
        # new_query_params.update(auth_params)
        # percent_encode_sequence(new_query_params)
        operation_params = ''
        if query_dict:
            operation_params = percent_encode_sequence(query_dict) + '&'
        new_query_string = (
            f"{operation_params}{percent_encode_sequence(auth_params)}"
        )

        # The split result is immutable, so rebuild the URL from its pieces
        # with only the query component replaced.
        request.url = urlunsplit(
            (scheme, netloc, path, new_query_string, fragment)
        )

    def _inject_signature_to_request(self, request, signature):
        # Rather than calculating an "Authorization" header, for the query
        # param auth, we just append an 'X-Amz-Signature' param to the end
        # of the query string.
        request.url += f'&X-Amz-Signature={signature}'
class S3SigV4QueryAuth(SigV4QueryAuth):
    """S3 SigV4 auth using query parameters.

    This signer will sign a request using query parameters and signature
    version 4, i.e a "presigned url" signer.

    Based off of:

    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html

    """

    def _normalize_url_path(self, path):
        # S3 paths are signed exactly as given; no normalization is applied.
        return path

    def payload(self, request):
        # From the doc link above:
        # "You don't include a payload hash in the Canonical Request, because
        # when you create a presigned URL, you don't know anything about the
        # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".
        return UNSIGNED_PAYLOAD
class S3SigV4PostAuth(SigV4Auth):
    """
    Presigns a s3 post

    Implementation doc here:
    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html
    """

    def add_auth(self, request):
        """Add the SigV4 fields and policy conditions to the request context.

        Reads and writes the ``s3-presign-post-fields`` and
        ``s3-presign-post-policy`` entries of ``request.context``.
        """
        # ``datetime.utcnow()`` is deprecated since Python 3.12; an aware
        # UTC "now" formats to the same timestamp string.
        datetime_now = datetime.datetime.now(datetime.timezone.utc)
        request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)

        fields = {}
        if request.context.get('s3-presign-post-fields', None) is not None:
            fields = request.context['s3-presign-post-fields']

        policy = {}
        conditions = []
        if request.context.get('s3-presign-post-policy', None) is not None:
            policy = request.context['s3-presign-post-policy']
            if policy.get('conditions', None) is not None:
                conditions = policy['conditions']

        policy['conditions'] = conditions

        # The same values go into the form fields and the policy conditions.
        credential = self.scope(request)
        timestamp = request.context['timestamp']

        fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256'
        fields['x-amz-credential'] = credential
        fields['x-amz-date'] = timestamp

        conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'})
        conditions.append({'x-amz-credential': credential})
        conditions.append({'x-amz-date': timestamp})

        if self.credentials.token is not None:
            fields['x-amz-security-token'] = self.credentials.token
            conditions.append({'x-amz-security-token': self.credentials.token})

        # Dump the base64 encoded policy into the fields dictionary.
        fields['policy'] = base64.b64encode(
            json.dumps(policy).encode('utf-8')
        ).decode('utf-8')

        # For POST the string that gets signed is the encoded policy itself.
        fields['x-amz-signature'] = self.signature(fields['policy'], request)

        request.context['s3-presign-post-fields'] = fields
        request.context['s3-presign-post-policy'] = policy
class HmacV1Auth(BaseSigner):
    """Signer that places an ``AWS <access_key>:<signature>`` header.

    The signature is an HMAC-SHA1 digest (base64 encoded) of a canonical
    string built from the HTTP method, a fixed set of standard headers,
    the ``x-amz-*`` headers and the canonical resource.
    """

    # List of Query String Arguments of Interest: only these query
    # arguments are retained when building the canonical resource.
    QSAOfInterest = [
        'accelerate',
        'acl',
        'cors',
        'defaultObjectAcl',
        'location',
        'logging',
        'partNumber',
        'policy',
        'requestPayment',
        'torrent',
        'versioning',
        'versionId',
        'versions',
        'website',
        'uploads',
        'uploadId',
        'response-content-type',
        'response-content-language',
        'response-expires',
        'response-cache-control',
        'response-content-disposition',
        'response-content-encoding',
        'delete',
        'lifecycle',
        'tagging',
        'restore',
        'storageClass',
        'notification',
        'replication',
        'requestPayment',
        'analytics',
        'metrics',
        'inventory',
        'select',
        'select-type',
        'object-lock',
    ]

    def __init__(self, credentials, service_name=None, region_name=None):
        # ``service_name`` and ``region_name`` are accepted but not stored.
        self.credentials = credentials

    def sign_string(self, string_to_sign):
        """Return the base64 text of the HMAC-SHA1 of ``string_to_sign``.

        The HMAC key is the UTF-8 encoded secret key of the credentials.
        """
        secret = self.credentials.secret_key.encode('utf-8')
        digest = hmac.new(
            secret, string_to_sign.encode('utf-8'), digestmod=sha1
        ).digest()
        return encodebytes(digest).strip().decode('utf-8')

    def canonical_standard_headers(self, headers):
        """Return the content-md5, content-type and date lines.

        Side effect: any existing ``Date`` header is removed and replaced
        with the value of ``self._get_date()`` before the lines are built.
        A header that is absent contributes an empty line.
        """
        if 'Date' in headers:
            del headers['Date']
        headers['Date'] = self._get_date()

        lines = []
        for wanted in ('content-md5', 'content-type', 'date'):
            matches = [
                headers[name].strip()
                for name in headers
                if headers[name] is not None and name.lower() == wanted
            ]
            # An empty placeholder keeps the line position when the
            # header is not present.
            lines.extend(matches or [''])
        return '\n'.join(lines)

    def canonical_custom_headers(self, headers):
        """Return the ``x-amz-*`` headers as sorted ``name:value`` lines.

        Names are lower-cased; all values stored under a header name are
        stripped and joined with commas.
        """
        amz_headers = {}
        for name in headers:
            lowered = name.lower()
            if headers[name] is None or not lowered.startswith('x-amz-'):
                continue
            amz_headers[lowered] = ','.join(
                value.strip() for value in headers.get_all(name)
            )
        return '\n'.join(
            f"{name}:{amz_headers[name]}" for name in sorted(amz_headers)
        )

    def unquote_v(self, nv):
        """
        TODO: Do we need this?

        Return ``nv`` unchanged when it has a single element, otherwise
        a ``(name, unquoted value)`` tuple.
        """
        if len(nv) != 1:
            return (nv[0], unquote(nv[1]))
        return nv

    def canonical_resource(self, split, auth_path=None):
        """Return the resource portion of the string to sign.

        NOTE:
        The path in the canonical resource should always be the full
        path including the bucket name, even for virtual-hosting style
        addressing. ``auth_path`` carries that full path and is passed
        in when the client uses virtual-hosting style; otherwise the
        path of ``split`` is used.

        Nothing after the first ``?`` is included unless it is one of
        the entries of ``QSAOfInterest``.
        """
        resource = split.path if auth_path is None else auth_path
        if not split.query:
            return resource

        pairs = (arg.split('=', 1) for arg in split.query.split('&'))
        selected = [
            self.unquote_v(pair)
            for pair in pairs
            if pair[0] in self.QSAOfInterest
        ]
        if selected:
            selected.sort(key=itemgetter(0))
            resource += '?'
            resource += '&'.join('='.join(pair) for pair in selected)
        return resource

    def canonical_string(
        self, method, split, headers, expires=None, auth_path=None
    ):
        """Assemble the newline separated string that gets signed.

        ``expires`` is accepted but not used here.
        """
        parts = [
            method.upper(),
            self.canonical_standard_headers(headers),
        ]
        amz_lines = self.canonical_custom_headers(headers)
        if amz_lines:
            parts.append(amz_lines)
        parts.append(self.canonical_resource(split, auth_path=auth_path))
        return '\n'.join(parts)

    def get_signature(
        self, method, split, headers, expires=None, auth_path=None
    ):
        """Compute the signature for the given request components.

        When the credentials carry a token, the ``x-amz-security-token``
        header is reset to that token before the string to sign is built.
        ``expires`` is accepted but not used here.
        """
        session_token = self.credentials.token
        if session_token:
            # Delete first so the header ends up with exactly one value.
            del headers['x-amz-security-token']
            headers['x-amz-security-token'] = session_token
        string_to_sign = self.canonical_string(
            method, split, headers, auth_path=auth_path
        )
        logger.debug(f'StringToSign:\n{string_to_sign}')
        return self.sign_string(string_to_sign)

    def add_auth(self, request):
        """Sign ``request`` and inject the signature into it.

        Raises ``NoCredentialsError`` when no credentials are set.
        """
        if self.credentials is None:
            raise NoCredentialsError
        logger.debug("Calculating signature using hmacv1 auth.")
        url_parts = urlsplit(request.url)
        logger.debug(f'HTTP request method: {request.method}')
        computed = self.get_signature(
            request.method,
            url_parts,
            request.headers,
            auth_path=request.auth_path,
        )
        self._inject_signature(request, computed)

    def _get_date(self):
        # Value written to the ``Date`` header before signing.
        return formatdate(usegmt=True)

    def _inject_signature(self, request, signature):
        """Set the ``Authorization`` header to ``AWS <access_key>:<sig>``."""
        headers = request.headers
        if 'Authorization' in headers:
            # We have to do this because request.headers is not a
            # normal dictionary. It has the (unintuitive) behavior of
            # aggregating repeated assignments for the same key.
            # For example:
            #     headers['foo'] = 'a'; headers['foo'] = 'b'
            #     list(headers) will print ['foo', 'foo'].
            del headers['Authorization']

        headers['Authorization'] = (
            f"AWS {self.credentials.access_key}:{signature}"
        )
class HmacV1QueryAuth(HmacV1Auth):
    """
    Generates a presigned request for s3.

    Instead of adding an ``Authorization`` header, the signature and the
    signed headers are appended to the query string of the request url.

    Spec from this document:

    http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html
    #RESTAuthenticationQueryStringAuth

    """

    # Default value for ``expires``, added to the current time in seconds.
    DEFAULT_EXPIRES = 3600

    def __init__(self, credentials, expires=DEFAULT_EXPIRES):
        self.credentials = credentials
        self._expires = expires

    def _get_date(self):
        # The value signed in place of a date is the expiry time: the
        # current epoch time plus ``expires``, truncated to an integer.
        expiry_epoch = time.time() + int(self._expires)
        return str(int(expiry_epoch))

    def _inject_signature(self, request, signature):
        """Rewrite ``request.url`` so its query string carries the signature."""
        params = {
            'AWSAccessKeyId': self.credentials.access_key,
            'Signature': signature,
        }

        headers = request.headers
        for name in headers:
            lowered = name.lower()
            if name == 'Date':
                # For query string requests, Expires is used instead of
                # the Date header.
                params['Expires'] = headers['Date']
                continue
            # Only relevant headers go into the query string: anything
            # that starts with x-amz-, Content-MD5 or Content-Type.
            if lowered.startswith('x-amz-') or lowered in (
                'content-md5',
                'content-type',
            ):
                params[lowered] = headers[lowered]

        # Combine all of the identified headers into an encoded
        # query string.
        encoded = percent_encode_sequence(params)

        # Create a new url with the presigned query string.
        scheme, netloc, path, existing_query, fragment = urlsplit(request.url)
        if existing_query:
            # A pre-existing query string is kept in front of the
            # injected arguments.
            encoded = f'{existing_query}&{encoded}'
        request.url = urlunsplit((scheme, netloc, path, encoded, fragment))
class HmacV1PostAuth(HmacV1Auth):
    """
    Generates a presigned post for s3.

    Spec from this document:

    http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingHTTPPOST.html
    """

    def add_auth(self, request):
        """Add the signed policy and credential fields to ``request.context``.

        Reads the optional ``s3-presign-post-fields`` and
        ``s3-presign-post-policy`` entries of ``request.context``, adds
        the access key (and security token, when present), stores the
        base64 encoded policy under ``fields['policy']`` and its
        signature under ``fields['signature']``, then writes both
        entries back to the context.

        Raises ``NoCredentialsError`` when no credentials are set,
        matching ``HmacV1Auth.add_auth``.
        """
        if self.credentials is None:
            # Fail with the dedicated botocore error instead of an
            # AttributeError on ``None.access_key`` further down.
            raise NoCredentialsError()

        context = request.context

        fields = {}
        if context.get('s3-presign-post-fields', None) is not None:
            fields = context['s3-presign-post-fields']

        policy = {}
        conditions = []
        if context.get('s3-presign-post-policy', None) is not None:
            policy = context['s3-presign-post-policy']
            if policy.get('conditions', None) is not None:
                conditions = policy['conditions']

        # Make sure the policy always references the list that the
        # conditions below are appended to.
        policy['conditions'] = conditions

        fields['AWSAccessKeyId'] = self.credentials.access_key

        if self.credentials.token is not None:
            fields['x-amz-security-token'] = self.credentials.token
            conditions.append({'x-amz-security-token': self.credentials.token})

        # Dump the base64 encoded policy into the fields dictionary.
        fields['policy'] = base64.b64encode(
            json.dumps(policy).encode('utf-8')
        ).decode('utf-8')

        fields['signature'] = self.sign_string(fields['policy'])

        context['s3-presign-post-fields'] = fields
        context['s3-presign-post-policy'] = policy
class BearerAuth(TokenSigner):
    """
    Performs bearer token authorization by placing the bearer token in the
    Authorization header as specified by Section 2.1 of RFC 6750.

    https://datatracker.ietf.org/doc/html/rfc6750#section-2.1
    """

    def add_auth(self, request):
        """Set ``Authorization: Bearer <token>`` on ``request``.

        Raises ``NoAuthTokenError`` when no auth token is set.
        """
        if self.auth_token is None:
            raise NoAuthTokenError()

        bearer_value = f'Bearer {self.auth_token.token}'
        headers = request.headers
        # Remove any existing value first so that exactly one
        # Authorization header remains after the assignment.
        if 'Authorization' in headers:
            del headers['Authorization']
        headers['Authorization'] = bearer_value
def resolve_auth_type(auth_trait):
    """Return the signature version for the first usable auth type.

    ``auth_trait`` is iterated in order. ``smithy.api#noAuth`` resolves
    immediately; any other known auth type resolves only when its
    signature version has an entry in ``AUTH_TYPE_MAPS``, otherwise the
    next auth type is tried.

    Raises ``UnknownSignatureVersionError`` for an auth type that has no
    entry in ``AUTH_TYPE_TO_SIGNATURE_VERSION`` and
    ``UnsupportedSignatureVersionError`` when no auth type resolves.
    """
    for candidate in auth_trait:
        if candidate == 'smithy.api#noAuth':
            return AUTH_TYPE_TO_SIGNATURE_VERSION[candidate]
        if candidate not in AUTH_TYPE_TO_SIGNATURE_VERSION:
            raise UnknownSignatureVersionError(signature_version=candidate)
        resolved = AUTH_TYPE_TO_SIGNATURE_VERSION[candidate]
        if resolved in AUTH_TYPE_MAPS:
            return resolved
    raise UnsupportedSignatureVersionError(signature_version=auth_trait)
# Signature version name -> signer class. The v4 entries are added
# below because their implementation depends on whether CRT is present.
AUTH_TYPE_MAPS = {
    'v2': SigV2Auth,
    'v3': SigV3Auth,
    'v3https': SigV3Auth,
    's3': HmacV1Auth,
    's3-query': HmacV1QueryAuth,
    's3-presign-post': HmacV1PostAuth,
    's3v4-presign-post': S3SigV4PostAuth,
    'v4-s3express': S3ExpressAuth,
    'v4-s3express-query': S3ExpressQueryAuth,
    'v4-s3express-presign-post': S3ExpressPostAuth,
    'bearer': BearerAuth,
}

# Define v4 signers depending on if CRT is present
if not HAS_CRT:
    AUTH_TYPE_MAPS['v4'] = SigV4Auth
    AUTH_TYPE_MAPS['v4-query'] = SigV4QueryAuth
    AUTH_TYPE_MAPS['s3v4'] = S3SigV4Auth
    AUTH_TYPE_MAPS['s3v4-query'] = S3SigV4QueryAuth
else:
    # Imported here so the CRT module is only loaded when it is in use.
    from botocore.crt.auth import CRT_AUTH_TYPE_MAPS

    AUTH_TYPE_MAPS.update(CRT_AUTH_TYPE_MAPS)

# Auth type identifier -> signature version name, consulted by
# ``resolve_auth_type``.
AUTH_TYPE_TO_SIGNATURE_VERSION = {
    'aws.auth#sigv4': 'v4',
    'aws.auth#sigv4a': 'v4a',
    'smithy.api#httpBearerAuth': 'bearer',
    'smithy.api#noAuth': 'none',
}