Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/auth.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"). You
5# may not use this file except in compliance with the License. A copy of
6# the License is located at
7#
8# http://aws.amazon.com/apache2.0/
9#
10# or in the "license" file accompanying this file. This file is
11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
12# ANY KIND, either express or implied. See the License for the specific
13# language governing permissions and limitations under the License.
14import base64
15import calendar
16import datetime
17import functools
18import hmac
19import json
20import logging
21import time
22from collections.abc import Mapping
23from email.utils import formatdate
24from hashlib import sha1, sha256
25from operator import itemgetter
27from botocore.compat import (
28 HAS_CRT,
29 HTTPHeaders,
30 encodebytes,
31 ensure_unicode,
32 parse_qs,
33 quote,
34 unquote,
35 urlsplit,
36 urlunsplit,
37)
38from botocore.exceptions import (
39 NoAuthTokenError,
40 NoCredentialsError,
41 UnknownSignatureVersionError,
42 UnsupportedSignatureVersionError,
43)
44from botocore.utils import (
45 is_valid_ipv6_endpoint_url,
46 normalize_url_path,
47 percent_encode_sequence,
48)
50# Imports for backwards compatibility
51from botocore.compat import MD5_AVAILABLE # noqa
54logger = logging.getLogger(__name__)
57EMPTY_SHA256_HASH = (
58 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
59)
60# This is the buffer size used when calculating sha256 checksums.
61# Experimenting with various buffer sizes showed that this value generally
62# gave the best result (in terms of performance).
63PAYLOAD_BUFFER = 1024 * 1024
64ISO8601 = '%Y-%m-%dT%H:%M:%SZ'
65SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ'
66SIGNED_HEADERS_BLACKLIST = [
67 'expect',
68 'user-agent',
69 'x-amzn-trace-id',
70]
71UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD'
72STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER'
75def _host_from_url(url):
76 # Given URL, derive value for host header. Ensure that value:
77 # 1) is lowercase
78 # 2) excludes port, if it was the default port
79 # 3) excludes userinfo
80 url_parts = urlsplit(url)
81 host = url_parts.hostname # urlsplit's hostname is always lowercase
82 if is_valid_ipv6_endpoint_url(url):
83 host = f'[{host}]'
84 default_ports = {
85 'http': 80,
86 'https': 443,
87 }
88 if url_parts.port is not None:
89 if url_parts.port != default_ports.get(url_parts.scheme):
90 host = '%s:%d' % (host, url_parts.port)
91 return host
94def _get_body_as_dict(request):
95 # For query services, request.data is form-encoded and is already a
96 # dict, but for other services such as rest-json it could be a json
97 # string or bytes. In those cases we attempt to load the data as a
98 # dict.
99 data = request.data
100 if isinstance(data, bytes):
101 data = json.loads(data.decode('utf-8'))
102 elif isinstance(data, str):
103 data = json.loads(data)
104 return data
107class BaseSigner:
108 REQUIRES_REGION = False
109 REQUIRES_TOKEN = False
111 def add_auth(self, request):
112 raise NotImplementedError("add_auth")
115class TokenSigner(BaseSigner):
116 REQUIRES_TOKEN = True
117 """
118 Signers that expect an authorization token to perform the authorization
119 """
121 def __init__(self, auth_token):
122 self.auth_token = auth_token
125class SigV2Auth(BaseSigner):
126 """
127 Sign a request with Signature V2.
128 """
130 def __init__(self, credentials):
131 self.credentials = credentials
133 def calc_signature(self, request, params):
134 logger.debug("Calculating signature using v2 auth.")
135 split = urlsplit(request.url)
136 path = split.path
137 if len(path) == 0:
138 path = '/'
139 string_to_sign = f"{request.method}\n{split.netloc}\n{path}\n"
140 lhmac = hmac.new(
141 self.credentials.secret_key.encode("utf-8"), digestmod=sha256
142 )
143 pairs = []
144 for key in sorted(params):
145 # Any previous signature should not be a part of this
146 # one, so we skip that particular key. This prevents
147 # issues during retries.
148 if key == 'Signature':
149 continue
150 value = str(params[key])
151 quoted_key = quote(key.encode('utf-8'), safe='')
152 quoted_value = quote(value.encode('utf-8'), safe='-_~')
153 pairs.append(f'{quoted_key}={quoted_value}')
154 qs = '&'.join(pairs)
155 string_to_sign += qs
156 logger.debug('String to sign: %s', string_to_sign)
157 lhmac.update(string_to_sign.encode('utf-8'))
158 b64 = base64.b64encode(lhmac.digest()).strip().decode('utf-8')
159 return (qs, b64)
161 def add_auth(self, request):
162 # The auth handler is the last thing called in the
163 # preparation phase of a prepared request.
164 # Because of this we have to parse the query params
165 # from the request body so we can update them with
166 # the sigv2 auth params.
167 if self.credentials is None:
168 raise NoCredentialsError()
169 if request.data:
170 # POST
171 params = request.data
172 else:
173 # GET
174 params = request.params
175 params['AWSAccessKeyId'] = self.credentials.access_key
176 params['SignatureVersion'] = '2'
177 params['SignatureMethod'] = 'HmacSHA256'
178 params['Timestamp'] = time.strftime(ISO8601, time.gmtime())
179 if self.credentials.token:
180 params['SecurityToken'] = self.credentials.token
181 qs, signature = self.calc_signature(request, params)
182 params['Signature'] = signature
183 return request
186class SigV3Auth(BaseSigner):
187 def __init__(self, credentials):
188 self.credentials = credentials
190 def add_auth(self, request):
191 if self.credentials is None:
192 raise NoCredentialsError()
193 if 'Date' in request.headers:
194 del request.headers['Date']
195 request.headers['Date'] = formatdate(usegmt=True)
196 if self.credentials.token:
197 if 'X-Amz-Security-Token' in request.headers:
198 del request.headers['X-Amz-Security-Token']
199 request.headers['X-Amz-Security-Token'] = self.credentials.token
200 new_hmac = hmac.new(
201 self.credentials.secret_key.encode('utf-8'), digestmod=sha256
202 )
203 new_hmac.update(request.headers['Date'].encode('utf-8'))
204 encoded_signature = encodebytes(new_hmac.digest()).strip()
205 signature = (
206 f"AWS3-HTTPS AWSAccessKeyId={self.credentials.access_key},"
207 f"Algorithm=HmacSHA256,Signature={encoded_signature.decode('utf-8')}"
208 )
209 if 'X-Amzn-Authorization' in request.headers:
210 del request.headers['X-Amzn-Authorization']
211 request.headers['X-Amzn-Authorization'] = signature
214class SigV4Auth(BaseSigner):
215 """
216 Sign a request with Signature V4.
217 """
219 REQUIRES_REGION = True
221 def __init__(self, credentials, service_name, region_name):
222 self.credentials = credentials
223 # We initialize these value here so the unit tests can have
224 # valid values. But these will get overriden in ``add_auth``
225 # later for real requests.
226 self._region_name = region_name
227 self._service_name = service_name
229 def _sign(self, key, msg, hex=False):
230 if hex:
231 sig = hmac.new(key, msg.encode('utf-8'), sha256).hexdigest()
232 else:
233 sig = hmac.new(key, msg.encode('utf-8'), sha256).digest()
234 return sig
236 def headers_to_sign(self, request):
237 """
238 Select the headers from the request that need to be included
239 in the StringToSign.
240 """
241 header_map = HTTPHeaders()
242 for name, value in request.headers.items():
243 lname = name.lower()
244 if lname not in SIGNED_HEADERS_BLACKLIST:
245 header_map[lname] = value
246 if 'host' not in header_map:
247 # TODO: We should set the host ourselves, instead of relying on our
248 # HTTP client to set it for us.
249 header_map['host'] = _host_from_url(request.url)
250 return header_map
252 def canonical_query_string(self, request):
253 # The query string can come from two parts. One is the
254 # params attribute of the request. The other is from the request
255 # url (in which case we have to re-split the url into its components
256 # and parse out the query string component).
257 if request.params:
258 return self._canonical_query_string_params(request.params)
259 else:
260 return self._canonical_query_string_url(urlsplit(request.url))
262 def _canonical_query_string_params(self, params):
263 # [(key, value), (key2, value2)]
264 key_val_pairs = []
265 if isinstance(params, Mapping):
266 params = params.items()
267 for key, value in params:
268 key_val_pairs.append(
269 (quote(key, safe='-_.~'), quote(str(value), safe='-_.~'))
270 )
271 sorted_key_vals = []
272 # Sort by the URI-encoded key names, and in the case of
273 # repeated keys, then sort by the value.
274 for key, value in sorted(key_val_pairs):
275 sorted_key_vals.append(f'{key}={value}')
276 canonical_query_string = '&'.join(sorted_key_vals)
277 return canonical_query_string
279 def _canonical_query_string_url(self, parts):
280 canonical_query_string = ''
281 if parts.query:
282 # [(key, value), (key2, value2)]
283 key_val_pairs = []
284 for pair in parts.query.split('&'):
285 key, _, value = pair.partition('=')
286 key_val_pairs.append((key, value))
287 sorted_key_vals = []
288 # Sort by the URI-encoded key names, and in the case of
289 # repeated keys, then sort by the value.
290 for key, value in sorted(key_val_pairs):
291 sorted_key_vals.append(f'{key}={value}')
292 canonical_query_string = '&'.join(sorted_key_vals)
293 return canonical_query_string
295 def canonical_headers(self, headers_to_sign):
296 """
297 Return the headers that need to be included in the StringToSign
298 in their canonical form by converting all header keys to lower
299 case, sorting them in alphabetical order and then joining
300 them into a string, separated by newlines.
301 """
302 headers = []
303 sorted_header_names = sorted(set(headers_to_sign))
304 for key in sorted_header_names:
305 value = ','.join(
306 self._header_value(v) for v in headers_to_sign.get_all(key)
307 )
308 headers.append(f'{key}:{ensure_unicode(value)}')
309 return '\n'.join(headers)
311 def _header_value(self, value):
312 # From the sigv4 docs:
313 # Lowercase(HeaderName) + ':' + Trimall(HeaderValue)
314 #
315 # The Trimall function removes excess white space before and after
316 # values, and converts sequential spaces to a single space.
317 return ' '.join(value.split())
319 def signed_headers(self, headers_to_sign):
320 headers = sorted(n.lower().strip() for n in set(headers_to_sign))
321 return ';'.join(headers)
323 def _is_streaming_checksum_payload(self, request):
324 checksum_context = request.context.get('checksum', {})
325 algorithm = checksum_context.get('request_algorithm')
326 return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer'
328 def payload(self, request):
329 if self._is_streaming_checksum_payload(request):
330 return STREAMING_UNSIGNED_PAYLOAD_TRAILER
331 elif not self._should_sha256_sign_payload(request):
332 # When payload signing is disabled, we use this static string in
333 # place of the payload checksum.
334 return UNSIGNED_PAYLOAD
335 request_body = request.body
336 if request_body and hasattr(request_body, 'seek'):
337 position = request_body.tell()
338 read_chunksize = functools.partial(
339 request_body.read, PAYLOAD_BUFFER
340 )
341 checksum = sha256()
342 for chunk in iter(read_chunksize, b''):
343 checksum.update(chunk)
344 hex_checksum = checksum.hexdigest()
345 request_body.seek(position)
346 return hex_checksum
347 elif request_body:
348 # The request serialization has ensured that
349 # request.body is a bytes() type.
350 return sha256(request_body).hexdigest()
351 else:
352 return EMPTY_SHA256_HASH
354 def _should_sha256_sign_payload(self, request):
355 # Payloads will always be signed over insecure connections.
356 if not request.url.startswith('https'):
357 return True
359 # Certain operations may have payload signing disabled by default.
360 # Since we don't have access to the operation model, we pass in this
361 # bit of metadata through the request context.
362 return request.context.get('payload_signing_enabled', True)
364 def canonical_request(self, request):
365 cr = [request.method.upper()]
366 path = self._normalize_url_path(urlsplit(request.url).path)
367 cr.append(path)
368 cr.append(self.canonical_query_string(request))
369 headers_to_sign = self.headers_to_sign(request)
370 cr.append(self.canonical_headers(headers_to_sign) + '\n')
371 cr.append(self.signed_headers(headers_to_sign))
372 if 'X-Amz-Content-SHA256' in request.headers:
373 body_checksum = request.headers['X-Amz-Content-SHA256']
374 else:
375 body_checksum = self.payload(request)
376 cr.append(body_checksum)
377 return '\n'.join(cr)
379 def _normalize_url_path(self, path):
380 normalized_path = quote(normalize_url_path(path), safe='/~')
381 return normalized_path
383 def scope(self, request):
384 scope = [self.credentials.access_key]
385 scope.append(request.context['timestamp'][0:8])
386 scope.append(self._region_name)
387 scope.append(self._service_name)
388 scope.append('aws4_request')
389 return '/'.join(scope)
391 def credential_scope(self, request):
392 scope = []
393 scope.append(request.context['timestamp'][0:8])
394 scope.append(self._region_name)
395 scope.append(self._service_name)
396 scope.append('aws4_request')
397 return '/'.join(scope)
399 def string_to_sign(self, request, canonical_request):
400 """
401 Return the canonical StringToSign as well as a dict
402 containing the original version of all headers that
403 were included in the StringToSign.
404 """
405 sts = ['AWS4-HMAC-SHA256']
406 sts.append(request.context['timestamp'])
407 sts.append(self.credential_scope(request))
408 sts.append(sha256(canonical_request.encode('utf-8')).hexdigest())
409 return '\n'.join(sts)
411 def signature(self, string_to_sign, request):
412 key = self.credentials.secret_key
413 k_date = self._sign(
414 (f"AWS4{key}").encode(), request.context["timestamp"][0:8]
415 )
416 k_region = self._sign(k_date, self._region_name)
417 k_service = self._sign(k_region, self._service_name)
418 k_signing = self._sign(k_service, 'aws4_request')
419 return self._sign(k_signing, string_to_sign, hex=True)
421 def add_auth(self, request):
422 if self.credentials is None:
423 raise NoCredentialsError()
424 datetime_now = datetime.datetime.utcnow()
425 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
426 # This could be a retry. Make sure the previous
427 # authorization header is removed first.
428 self._modify_request_before_signing(request)
429 canonical_request = self.canonical_request(request)
430 logger.debug("Calculating signature using v4 auth.")
431 logger.debug('CanonicalRequest:\n%s', canonical_request)
432 string_to_sign = self.string_to_sign(request, canonical_request)
433 logger.debug('StringToSign:\n%s', string_to_sign)
434 signature = self.signature(string_to_sign, request)
435 logger.debug('Signature:\n%s', signature)
437 self._inject_signature_to_request(request, signature)
439 def _inject_signature_to_request(self, request, signature):
440 auth_str = [f'AWS4-HMAC-SHA256 Credential={self.scope(request)}']
441 headers_to_sign = self.headers_to_sign(request)
442 auth_str.append(
443 f"SignedHeaders={self.signed_headers(headers_to_sign)}"
444 )
445 auth_str.append(f'Signature={signature}')
446 request.headers['Authorization'] = ', '.join(auth_str)
447 return request
449 def _modify_request_before_signing(self, request):
450 if 'Authorization' in request.headers:
451 del request.headers['Authorization']
452 self._set_necessary_date_headers(request)
453 if self.credentials.token:
454 if 'X-Amz-Security-Token' in request.headers:
455 del request.headers['X-Amz-Security-Token']
456 request.headers['X-Amz-Security-Token'] = self.credentials.token
458 if not request.context.get('payload_signing_enabled', True):
459 if 'X-Amz-Content-SHA256' in request.headers:
460 del request.headers['X-Amz-Content-SHA256']
461 request.headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD
463 def _set_necessary_date_headers(self, request):
464 # The spec allows for either the Date _or_ the X-Amz-Date value to be
465 # used so we check both. If there's a Date header, we use the date
466 # header. Otherwise we use the X-Amz-Date header.
467 if 'Date' in request.headers:
468 del request.headers['Date']
469 datetime_timestamp = datetime.datetime.strptime(
470 request.context['timestamp'], SIGV4_TIMESTAMP
471 )
472 request.headers['Date'] = formatdate(
473 int(calendar.timegm(datetime_timestamp.timetuple()))
474 )
475 if 'X-Amz-Date' in request.headers:
476 del request.headers['X-Amz-Date']
477 else:
478 if 'X-Amz-Date' in request.headers:
479 del request.headers['X-Amz-Date']
480 request.headers['X-Amz-Date'] = request.context['timestamp']
483class S3SigV4Auth(SigV4Auth):
484 def _modify_request_before_signing(self, request):
485 super()._modify_request_before_signing(request)
486 if 'X-Amz-Content-SHA256' in request.headers:
487 del request.headers['X-Amz-Content-SHA256']
489 request.headers['X-Amz-Content-SHA256'] = self.payload(request)
491 def _should_sha256_sign_payload(self, request):
492 # S3 allows optional body signing, so to minimize the performance
493 # impact, we opt to not SHA256 sign the body on streaming uploads,
494 # provided that we're on https.
495 client_config = request.context.get('client_config')
496 s3_config = getattr(client_config, 's3', None)
498 # The config could be None if it isn't set, or if the customer sets it
499 # to None.
500 if s3_config is None:
501 s3_config = {}
503 # The explicit configuration takes precedence over any implicit
504 # configuration.
505 sign_payload = s3_config.get('payload_signing_enabled', None)
506 if sign_payload is not None:
507 return sign_payload
509 # We require that both a checksum be present and https be enabled
510 # to implicitly disable body signing. The combination of TLS and
511 # a checksum is sufficiently secure and durable for us to be
512 # confident in the request without body signing.
513 checksum_header = 'Content-MD5'
514 checksum_context = request.context.get('checksum', {})
515 algorithm = checksum_context.get('request_algorithm')
516 if isinstance(algorithm, dict) and algorithm.get('in') == 'header':
517 checksum_header = algorithm['name']
518 if (
519 not request.url.startswith("https")
520 or checksum_header not in request.headers
521 ):
522 return True
524 # If the input is streaming we disable body signing by default.
525 if request.context.get('has_streaming_input', False):
526 return False
528 # If the S3-specific checks had no results, delegate to the generic
529 # checks.
530 return super()._should_sha256_sign_payload(request)
532 def _normalize_url_path(self, path):
533 # For S3, we do not normalize the path.
534 return path
537class S3ExpressAuth(S3SigV4Auth):
538 REQUIRES_IDENTITY_CACHE = True
540 def __init__(
541 self, credentials, service_name, region_name, *, identity_cache
542 ):
543 super().__init__(credentials, service_name, region_name)
544 self._identity_cache = identity_cache
546 def add_auth(self, request):
547 super().add_auth(request)
549 def _modify_request_before_signing(self, request):
550 super()._modify_request_before_signing(request)
551 if 'x-amz-s3session-token' not in request.headers:
552 request.headers['x-amz-s3session-token'] = self.credentials.token
553 # S3Express does not support STS' X-Amz-Security-Token
554 if 'X-Amz-Security-Token' in request.headers:
555 del request.headers['X-Amz-Security-Token']
558class S3ExpressPostAuth(S3ExpressAuth):
559 REQUIRES_IDENTITY_CACHE = True
561 def add_auth(self, request):
562 datetime_now = datetime.datetime.utcnow()
563 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
565 fields = {}
566 if request.context.get('s3-presign-post-fields', None) is not None:
567 fields = request.context['s3-presign-post-fields']
569 policy = {}
570 conditions = []
571 if request.context.get('s3-presign-post-policy', None) is not None:
572 policy = request.context['s3-presign-post-policy']
573 if policy.get('conditions', None) is not None:
574 conditions = policy['conditions']
576 policy['conditions'] = conditions
578 fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256'
579 fields['x-amz-credential'] = self.scope(request)
580 fields['x-amz-date'] = request.context['timestamp']
582 conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'})
583 conditions.append({'x-amz-credential': self.scope(request)})
584 conditions.append({'x-amz-date': request.context['timestamp']})
586 if self.credentials.token is not None:
587 fields['X-Amz-S3session-Token'] = self.credentials.token
588 conditions.append(
589 {'X-Amz-S3session-Token': self.credentials.token}
590 )
592 # Dump the base64 encoded policy into the fields dictionary.
593 fields['policy'] = base64.b64encode(
594 json.dumps(policy).encode('utf-8')
595 ).decode('utf-8')
597 fields['x-amz-signature'] = self.signature(fields['policy'], request)
599 request.context['s3-presign-post-fields'] = fields
600 request.context['s3-presign-post-policy'] = policy
603class S3ExpressQueryAuth(S3ExpressAuth):
604 DEFAULT_EXPIRES = 300
605 REQUIRES_IDENTITY_CACHE = True
607 def __init__(
608 self,
609 credentials,
610 service_name,
611 region_name,
612 *,
613 identity_cache,
614 expires=DEFAULT_EXPIRES,
615 ):
616 super().__init__(
617 credentials,
618 service_name,
619 region_name,
620 identity_cache=identity_cache,
621 )
622 self._expires = expires
624 def _modify_request_before_signing(self, request):
625 # We automatically set this header, so if it's the auto-set value we
626 # want to get rid of it since it doesn't make sense for presigned urls.
627 content_type = request.headers.get('content-type')
628 blocklisted_content_type = (
629 'application/x-www-form-urlencoded; charset=utf-8'
630 )
631 if content_type == blocklisted_content_type:
632 del request.headers['content-type']
634 # Note that we're not including X-Amz-Signature.
635 # From the docs: "The Canonical Query String must include all the query
636 # parameters from the preceding table except for X-Amz-Signature.
637 signed_headers = self.signed_headers(self.headers_to_sign(request))
639 auth_params = {
640 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
641 'X-Amz-Credential': self.scope(request),
642 'X-Amz-Date': request.context['timestamp'],
643 'X-Amz-Expires': self._expires,
644 'X-Amz-SignedHeaders': signed_headers,
645 }
646 if self.credentials.token is not None:
647 auth_params['X-Amz-S3session-Token'] = self.credentials.token
648 # Now parse the original query string to a dict, inject our new query
649 # params, and serialize back to a query string.
650 url_parts = urlsplit(request.url)
651 # parse_qs makes each value a list, but in our case we know we won't
652 # have repeated keys so we know we have single element lists which we
653 # can convert back to scalar values.
654 query_string_parts = parse_qs(url_parts.query, keep_blank_values=True)
655 query_dict = {k: v[0] for k, v in query_string_parts.items()}
657 if request.params:
658 query_dict.update(request.params)
659 request.params = {}
660 # The spec is particular about this. It *has* to be:
661 # https://<endpoint>?<operation params>&<auth params>
662 # You can't mix the two types of params together, i.e just keep doing
663 # new_query_params.update(op_params)
664 # new_query_params.update(auth_params)
665 # percent_encode_sequence(new_query_params)
666 operation_params = ''
667 if request.data:
668 # We also need to move the body params into the query string. To
669 # do this, we first have to convert it to a dict.
670 query_dict.update(_get_body_as_dict(request))
671 request.data = ''
672 if query_dict:
673 operation_params = percent_encode_sequence(query_dict) + '&'
674 new_query_string = (
675 f"{operation_params}{percent_encode_sequence(auth_params)}"
676 )
677 # url_parts is a tuple (and therefore immutable) so we need to create
678 # a new url_parts with the new query string.
679 # <part> - <index>
680 # scheme - 0
681 # netloc - 1
682 # path - 2
683 # query - 3 <-- we're replacing this.
684 # fragment - 4
685 p = url_parts
686 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])
687 request.url = urlunsplit(new_url_parts)
689 def _inject_signature_to_request(self, request, signature):
690 # Rather than calculating an "Authorization" header, for the query
691 # param quth, we just append an 'X-Amz-Signature' param to the end
692 # of the query string.
693 request.url += f'&X-Amz-Signature={signature}'
695 def _normalize_url_path(self, path):
696 # For S3, we do not normalize the path.
697 return path
699 def payload(self, request):
700 # From the doc link above:
701 # "You don't include a payload hash in the Canonical Request, because
702 # when you create a presigned URL, you don't know anything about the
703 # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".
704 return UNSIGNED_PAYLOAD
707class SigV4QueryAuth(SigV4Auth):
708 DEFAULT_EXPIRES = 3600
710 def __init__(
711 self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES
712 ):
713 super().__init__(credentials, service_name, region_name)
714 self._expires = expires
716 def _modify_request_before_signing(self, request):
717 # We automatically set this header, so if it's the auto-set value we
718 # want to get rid of it since it doesn't make sense for presigned urls.
719 content_type = request.headers.get('content-type')
720 blacklisted_content_type = (
721 'application/x-www-form-urlencoded; charset=utf-8'
722 )
723 if content_type == blacklisted_content_type:
724 del request.headers['content-type']
726 # Note that we're not including X-Amz-Signature.
727 # From the docs: "The Canonical Query String must include all the query
728 # parameters from the preceding table except for X-Amz-Signature.
729 signed_headers = self.signed_headers(self.headers_to_sign(request))
731 auth_params = {
732 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
733 'X-Amz-Credential': self.scope(request),
734 'X-Amz-Date': request.context['timestamp'],
735 'X-Amz-Expires': self._expires,
736 'X-Amz-SignedHeaders': signed_headers,
737 }
738 if self.credentials.token is not None:
739 auth_params['X-Amz-Security-Token'] = self.credentials.token
740 # Now parse the original query string to a dict, inject our new query
741 # params, and serialize back to a query string.
742 url_parts = urlsplit(request.url)
743 # parse_qs makes each value a list, but in our case we know we won't
744 # have repeated keys so we know we have single element lists which we
745 # can convert back to scalar values.
746 query_string_parts = parse_qs(url_parts.query, keep_blank_values=True)
747 query_dict = {k: v[0] for k, v in query_string_parts.items()}
749 if request.params:
750 query_dict.update(request.params)
751 request.params = {}
752 # The spec is particular about this. It *has* to be:
753 # https://<endpoint>?<operation params>&<auth params>
754 # You can't mix the two types of params together, i.e just keep doing
755 # new_query_params.update(op_params)
756 # new_query_params.update(auth_params)
757 # percent_encode_sequence(new_query_params)
758 operation_params = ''
759 if request.data:
760 # We also need to move the body params into the query string. To
761 # do this, we first have to convert it to a dict.
762 query_dict.update(_get_body_as_dict(request))
763 request.data = ''
764 if query_dict:
765 operation_params = percent_encode_sequence(query_dict) + '&'
766 new_query_string = (
767 f"{operation_params}{percent_encode_sequence(auth_params)}"
768 )
769 # url_parts is a tuple (and therefore immutable) so we need to create
770 # a new url_parts with the new query string.
771 # <part> - <index>
772 # scheme - 0
773 # netloc - 1
774 # path - 2
775 # query - 3 <-- we're replacing this.
776 # fragment - 4
777 p = url_parts
778 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])
779 request.url = urlunsplit(new_url_parts)
781 def _inject_signature_to_request(self, request, signature):
782 # Rather than calculating an "Authorization" header, for the query
783 # param quth, we just append an 'X-Amz-Signature' param to the end
784 # of the query string.
785 request.url += f'&X-Amz-Signature={signature}'
788class S3SigV4QueryAuth(SigV4QueryAuth):
789 """S3 SigV4 auth using query parameters.
791 This signer will sign a request using query parameters and signature
792 version 4, i.e a "presigned url" signer.
794 Based off of:
796 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
798 """
800 def _normalize_url_path(self, path):
801 # For S3, we do not normalize the path.
802 return path
804 def payload(self, request):
805 # From the doc link above:
806 # "You don't include a payload hash in the Canonical Request, because
807 # when you create a presigned URL, you don't know anything about the
808 # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".
809 return UNSIGNED_PAYLOAD
812class S3SigV4PostAuth(SigV4Auth):
813 """
814 Presigns a s3 post
816 Implementation doc here:
817 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html
818 """
820 def add_auth(self, request):
821 datetime_now = datetime.datetime.utcnow()
822 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
824 fields = {}
825 if request.context.get('s3-presign-post-fields', None) is not None:
826 fields = request.context['s3-presign-post-fields']
828 policy = {}
829 conditions = []
830 if request.context.get('s3-presign-post-policy', None) is not None:
831 policy = request.context['s3-presign-post-policy']
832 if policy.get('conditions', None) is not None:
833 conditions = policy['conditions']
835 policy['conditions'] = conditions
837 fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256'
838 fields['x-amz-credential'] = self.scope(request)
839 fields['x-amz-date'] = request.context['timestamp']
841 conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'})
842 conditions.append({'x-amz-credential': self.scope(request)})
843 conditions.append({'x-amz-date': request.context['timestamp']})
845 if self.credentials.token is not None:
846 fields['x-amz-security-token'] = self.credentials.token
847 conditions.append({'x-amz-security-token': self.credentials.token})
849 # Dump the base64 encoded policy into the fields dictionary.
850 fields['policy'] = base64.b64encode(
851 json.dumps(policy).encode('utf-8')
852 ).decode('utf-8')
854 fields['x-amz-signature'] = self.signature(fields['policy'], request)
856 request.context['s3-presign-post-fields'] = fields
857 request.context['s3-presign-post-policy'] = policy
860class HmacV1Auth(BaseSigner):
861 # List of Query String Arguments of Interest
862 QSAOfInterest = [
863 'accelerate',
864 'acl',
865 'cors',
866 'defaultObjectAcl',
867 'location',
868 'logging',
869 'partNumber',
870 'policy',
871 'requestPayment',
872 'torrent',
873 'versioning',
874 'versionId',
875 'versions',
876 'website',
877 'uploads',
878 'uploadId',
879 'response-content-type',
880 'response-content-language',
881 'response-expires',
882 'response-cache-control',
883 'response-content-disposition',
884 'response-content-encoding',
885 'delete',
886 'lifecycle',
887 'tagging',
888 'restore',
889 'storageClass',
890 'notification',
891 'replication',
892 'requestPayment',
893 'analytics',
894 'metrics',
895 'inventory',
896 'select',
897 'select-type',
898 'object-lock',
899 ]
901 def __init__(self, credentials, service_name=None, region_name=None):
902 self.credentials = credentials
904 def sign_string(self, string_to_sign):
905 new_hmac = hmac.new(
906 self.credentials.secret_key.encode('utf-8'), digestmod=sha1
907 )
908 new_hmac.update(string_to_sign.encode('utf-8'))
909 return encodebytes(new_hmac.digest()).strip().decode('utf-8')
911 def canonical_standard_headers(self, headers):
912 interesting_headers = ['content-md5', 'content-type', 'date']
913 hoi = []
914 if 'Date' in headers:
915 del headers['Date']
916 headers['Date'] = self._get_date()
917 for ih in interesting_headers:
918 found = False
919 for key in headers:
920 lk = key.lower()
921 if headers[key] is not None and lk == ih:
922 hoi.append(headers[key].strip())
923 found = True
924 if not found:
925 hoi.append('')
926 return '\n'.join(hoi)
928 def canonical_custom_headers(self, headers):
929 hoi = []
930 custom_headers = {}
931 for key in headers:
932 lk = key.lower()
933 if headers[key] is not None:
934 if lk.startswith('x-amz-'):
935 custom_headers[lk] = ','.join(
936 v.strip() for v in headers.get_all(key)
937 )
938 sorted_header_keys = sorted(custom_headers.keys())
939 for key in sorted_header_keys:
940 hoi.append(f"{key}:{custom_headers[key]}")
941 return '\n'.join(hoi)
943 def unquote_v(self, nv):
944 """
945 TODO: Do we need this?
946 """
947 if len(nv) == 1:
948 return nv
949 else:
950 return (nv[0], unquote(nv[1]))
952 def canonical_resource(self, split, auth_path=None):
953 # don't include anything after the first ? in the resource...
954 # unless it is one of the QSA of interest, defined above
955 # NOTE:
956 # The path in the canonical resource should always be the
957 # full path including the bucket name, even for virtual-hosting
958 # style addressing. The ``auth_path`` keeps track of the full
959 # path for the canonical resource and would be passed in if
960 # the client was using virtual-hosting style.
961 if auth_path is not None:
962 buf = auth_path
963 else:
964 buf = split.path
965 if split.query:
966 qsa = split.query.split('&')
967 qsa = [a.split('=', 1) for a in qsa]
968 qsa = [
969 self.unquote_v(a) for a in qsa if a[0] in self.QSAOfInterest
970 ]
971 if len(qsa) > 0:
972 qsa.sort(key=itemgetter(0))
973 qsa = ['='.join(a) for a in qsa]
974 buf += '?'
975 buf += '&'.join(qsa)
976 return buf
978 def canonical_string(
979 self, method, split, headers, expires=None, auth_path=None
980 ):
981 cs = method.upper() + '\n'
982 cs += self.canonical_standard_headers(headers) + '\n'
983 custom_headers = self.canonical_custom_headers(headers)
984 if custom_headers:
985 cs += custom_headers + '\n'
986 cs += self.canonical_resource(split, auth_path=auth_path)
987 return cs
989 def get_signature(
990 self, method, split, headers, expires=None, auth_path=None
991 ):
992 if self.credentials.token:
993 del headers['x-amz-security-token']
994 headers['x-amz-security-token'] = self.credentials.token
995 string_to_sign = self.canonical_string(
996 method, split, headers, auth_path=auth_path
997 )
998 logger.debug(f'StringToSign:\n{string_to_sign}')
999 return self.sign_string(string_to_sign)
1001 def add_auth(self, request):
1002 if self.credentials is None:
1003 raise NoCredentialsError
1004 logger.debug("Calculating signature using hmacv1 auth.")
1005 split = urlsplit(request.url)
1006 logger.debug(f'HTTP request method: {request.method}')
1007 signature = self.get_signature(
1008 request.method, split, request.headers, auth_path=request.auth_path
1009 )
1010 self._inject_signature(request, signature)
1012 def _get_date(self):
1013 return formatdate(usegmt=True)
1015 def _inject_signature(self, request, signature):
1016 if 'Authorization' in request.headers:
1017 # We have to do this because request.headers is not
1018 # normal dictionary. It has the (unintuitive) behavior
1019 # of aggregating repeated setattr calls for the same
1020 # key value. For example:
1021 # headers['foo'] = 'a'; headers['foo'] = 'b'
1022 # list(headers) will print ['foo', 'foo'].
1023 del request.headers['Authorization']
1025 auth_header = f"AWS {self.credentials.access_key}:{signature}"
1026 request.headers['Authorization'] = auth_header
1029class HmacV1QueryAuth(HmacV1Auth):
1030 """
1031 Generates a presigned request for s3.
1033 Spec from this document:
1035 http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html
1036 #RESTAuthenticationQueryStringAuth
1038 """
1040 DEFAULT_EXPIRES = 3600
1042 def __init__(self, credentials, expires=DEFAULT_EXPIRES):
1043 self.credentials = credentials
1044 self._expires = expires
1046 def _get_date(self):
1047 return str(int(time.time() + int(self._expires)))
1049 def _inject_signature(self, request, signature):
1050 query_dict = {}
1051 query_dict['AWSAccessKeyId'] = self.credentials.access_key
1052 query_dict['Signature'] = signature
1054 for header_key in request.headers:
1055 lk = header_key.lower()
1056 # For query string requests, Expires is used instead of the
1057 # Date header.
1058 if header_key == 'Date':
1059 query_dict['Expires'] = request.headers['Date']
1060 # We only want to include relevant headers in the query string.
1061 # These can be anything that starts with x-amz, is Content-MD5,
1062 # or is Content-Type.
1063 elif lk.startswith('x-amz-') or lk in (
1064 'content-md5',
1065 'content-type',
1066 ):
1067 query_dict[lk] = request.headers[lk]
1068 # Combine all of the identified headers into an encoded
1069 # query string
1070 new_query_string = percent_encode_sequence(query_dict)
1072 # Create a new url with the presigned url.
1073 p = urlsplit(request.url)
1074 if p[3]:
1075 # If there was a pre-existing query string, we should
1076 # add that back before injecting the new query string.
1077 new_query_string = f'{p[3]}&{new_query_string}'
1078 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])
1079 request.url = urlunsplit(new_url_parts)
1082class HmacV1PostAuth(HmacV1Auth):
1083 """
1084 Generates a presigned post for s3.
1086 Spec from this document:
1088 http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingHTTPPOST.html
1089 """
1091 def add_auth(self, request):
1092 fields = {}
1093 if request.context.get('s3-presign-post-fields', None) is not None:
1094 fields = request.context['s3-presign-post-fields']
1096 policy = {}
1097 conditions = []
1098 if request.context.get('s3-presign-post-policy', None) is not None:
1099 policy = request.context['s3-presign-post-policy']
1100 if policy.get('conditions', None) is not None:
1101 conditions = policy['conditions']
1103 policy['conditions'] = conditions
1105 fields['AWSAccessKeyId'] = self.credentials.access_key
1107 if self.credentials.token is not None:
1108 fields['x-amz-security-token'] = self.credentials.token
1109 conditions.append({'x-amz-security-token': self.credentials.token})
1111 # Dump the base64 encoded policy into the fields dictionary.
1112 fields['policy'] = base64.b64encode(
1113 json.dumps(policy).encode('utf-8')
1114 ).decode('utf-8')
1116 fields['signature'] = self.sign_string(fields['policy'])
1118 request.context['s3-presign-post-fields'] = fields
1119 request.context['s3-presign-post-policy'] = policy
1122class BearerAuth(TokenSigner):
1123 """
1124 Performs bearer token authorization by placing the bearer token in the
1125 Authorization header as specified by Section 2.1 of RFC 6750.
1127 https://datatracker.ietf.org/doc/html/rfc6750#section-2.1
1128 """
1130 def add_auth(self, request):
1131 if self.auth_token is None:
1132 raise NoAuthTokenError()
1134 auth_header = f'Bearer {self.auth_token.token}'
1135 if 'Authorization' in request.headers:
1136 del request.headers['Authorization']
1137 request.headers['Authorization'] = auth_header
1140def resolve_auth_type(auth_trait):
1141 for auth_type in auth_trait:
1142 if auth_type == 'smithy.api#noAuth':
1143 return AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type]
1144 elif auth_type in AUTH_TYPE_TO_SIGNATURE_VERSION:
1145 signature_version = AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type]
1146 if signature_version in AUTH_TYPE_MAPS:
1147 return signature_version
1148 else:
1149 raise UnknownSignatureVersionError(signature_version=auth_type)
1150 raise UnsupportedSignatureVersionError(signature_version=auth_trait)
1153AUTH_TYPE_MAPS = {
1154 'v2': SigV2Auth,
1155 'v3': SigV3Auth,
1156 'v3https': SigV3Auth,
1157 's3': HmacV1Auth,
1158 's3-query': HmacV1QueryAuth,
1159 's3-presign-post': HmacV1PostAuth,
1160 's3v4-presign-post': S3SigV4PostAuth,
1161 'v4-s3express': S3ExpressAuth,
1162 'v4-s3express-query': S3ExpressQueryAuth,
1163 'v4-s3express-presign-post': S3ExpressPostAuth,
1164 'bearer': BearerAuth,
1165}
1167# Define v4 signers depending on if CRT is present
1168if HAS_CRT:
1169 from botocore.crt.auth import CRT_AUTH_TYPE_MAPS
1171 AUTH_TYPE_MAPS.update(CRT_AUTH_TYPE_MAPS)
1172else:
1173 AUTH_TYPE_MAPS.update(
1174 {
1175 'v4': SigV4Auth,
1176 'v4-query': SigV4QueryAuth,
1177 's3v4': S3SigV4Auth,
1178 's3v4-query': S3SigV4QueryAuth,
1179 }
1180 )
1182AUTH_TYPE_TO_SIGNATURE_VERSION = {
1183 'aws.auth#sigv4': 'v4',
1184 'aws.auth#sigv4a': 'v4a',
1185 'smithy.api#httpBearerAuth': 'bearer',
1186 'smithy.api#noAuth': 'none',
1187}