Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/utils.py: 20%
1562 statements
coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import base64
14import binascii
15import datetime
16import email.message
17import functools
18import hashlib
19import io
20import logging
21import os
22import random
23import re
24import socket
25import time
26import warnings
27import weakref
28from pathlib import Path
29from urllib.request import getproxies, proxy_bypass
31import dateutil.parser
32from dateutil.tz import tzutc
33from urllib3.exceptions import LocationParseError
35import botocore
36import botocore.awsrequest
37import botocore.httpsession
39# IP Regexes retained for backwards compatibility
40from botocore.compat import HEX_PAT # noqa: F401
41from botocore.compat import IPV4_PAT # noqa: F401
42from botocore.compat import IPV6_ADDRZ_PAT # noqa: F401
43from botocore.compat import IPV6_PAT # noqa: F401
44from botocore.compat import LS32_PAT # noqa: F401
45from botocore.compat import UNRESERVED_PAT # noqa: F401
46from botocore.compat import ZONE_ID_PAT # noqa: F401
47from botocore.compat import (
48 HAS_CRT,
49 IPV4_RE,
50 IPV6_ADDRZ_RE,
51 MD5_AVAILABLE,
52 UNSAFE_URL_CHARS,
53 OrderedDict,
54 get_md5,
55 get_tzinfo_options,
56 json,
57 quote,
58 urlparse,
59 urlsplit,
60 urlunsplit,
61 zip_longest,
62)
63from botocore.exceptions import (
64 ClientError,
65 ConfigNotFound,
66 ConnectionClosedError,
67 ConnectTimeoutError,
68 EndpointConnectionError,
69 HTTPClientError,
70 InvalidDNSNameError,
71 InvalidEndpointConfigurationError,
72 InvalidExpressionError,
73 InvalidHostLabelError,
74 InvalidIMDSEndpointError,
75 InvalidIMDSEndpointModeError,
76 InvalidRegionError,
77 MetadataRetrievalError,
78 MissingDependencyException,
79 ReadTimeoutError,
80 SSOTokenLoadError,
81 UnsupportedOutpostResourceError,
82 UnsupportedS3AccesspointConfigurationError,
83 UnsupportedS3ArnError,
84 UnsupportedS3ConfigurationError,
85 UnsupportedS3ControlArnError,
86 UnsupportedS3ControlConfigurationError,
87)
89logger = logging.getLogger(__name__)
90DEFAULT_METADATA_SERVICE_TIMEOUT = 1
91METADATA_BASE_URL = 'http://169.254.169.254/'
92METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/'
93METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6')
95# These are chars that do not need to be urlencoded.
96# Based on rfc3986, section 2.3
97SAFE_CHARS = '-._~'
98LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]')
99RETRYABLE_HTTP_ERRORS = (
100 ReadTimeoutError,
101 EndpointConnectionError,
102 ConnectionClosedError,
103 ConnectTimeoutError,
104)
105S3_ACCELERATE_WHITELIST = ['dualstack']
106# In switching events from using service name / endpoint prefix to service
107# id, we have to preserve compatibility. This maps the instances where either
108# is different than the transformed service id.
109EVENT_ALIASES = {
110 "a4b": "alexa-for-business",
111 "alexaforbusiness": "alexa-for-business",
112 "api.mediatailor": "mediatailor",
113 "api.pricing": "pricing",
114 "api.sagemaker": "sagemaker",
115 "apigateway": "api-gateway",
116 "application-autoscaling": "application-auto-scaling",
117 "appstream2": "appstream",
118 "autoscaling": "auto-scaling",
119 "autoscaling-plans": "auto-scaling-plans",
120 "ce": "cost-explorer",
121 "cloudhsmv2": "cloudhsm-v2",
122 "cloudsearchdomain": "cloudsearch-domain",
123 "cognito-idp": "cognito-identity-provider",
124 "config": "config-service",
125 "cur": "cost-and-usage-report-service",
126 "data.iot": "iot-data-plane",
127 "data.jobs.iot": "iot-jobs-data-plane",
128 "data.mediastore": "mediastore-data",
129 "datapipeline": "data-pipeline",
130 "devicefarm": "device-farm",
131 "devices.iot1click": "iot-1click-devices-service",
132 "directconnect": "direct-connect",
133 "discovery": "application-discovery-service",
134 "dms": "database-migration-service",
135 "ds": "directory-service",
136 "dynamodbstreams": "dynamodb-streams",
137 "elasticbeanstalk": "elastic-beanstalk",
138 "elasticfilesystem": "efs",
139 "elasticloadbalancing": "elastic-load-balancing",
140 "elasticmapreduce": "emr",
141 "elastictranscoder": "elastic-transcoder",
142 "elb": "elastic-load-balancing",
143 "elbv2": "elastic-load-balancing-v2",
144 "email": "ses",
145 "entitlement.marketplace": "marketplace-entitlement-service",
146 "es": "elasticsearch-service",
147 "events": "eventbridge",
148 "cloudwatch-events": "eventbridge",
149 "iot-data": "iot-data-plane",
150 "iot-jobs-data": "iot-jobs-data-plane",
151 "iot1click-devices": "iot-1click-devices-service",
152 "iot1click-projects": "iot-1click-projects",
153 "kinesisanalytics": "kinesis-analytics",
154 "kinesisvideo": "kinesis-video",
155 "lex-models": "lex-model-building-service",
156 "lex-runtime": "lex-runtime-service",
157 "logs": "cloudwatch-logs",
158 "machinelearning": "machine-learning",
159 "marketplace-entitlement": "marketplace-entitlement-service",
160 "marketplacecommerceanalytics": "marketplace-commerce-analytics",
161 "metering.marketplace": "marketplace-metering",
162 "meteringmarketplace": "marketplace-metering",
163 "mgh": "migration-hub",
164 "models.lex": "lex-model-building-service",
165 "monitoring": "cloudwatch",
166 "mturk-requester": "mturk",
167 "opsworks-cm": "opsworkscm",
168 "projects.iot1click": "iot-1click-projects",
169 "resourcegroupstaggingapi": "resource-groups-tagging-api",
170 "route53": "route-53",
171 "route53domains": "route-53-domains",
172 "runtime.lex": "lex-runtime-service",
173 "runtime.sagemaker": "sagemaker-runtime",
174 "sdb": "simpledb",
175 "secretsmanager": "secrets-manager",
176 "serverlessrepo": "serverlessapplicationrepository",
177 "servicecatalog": "service-catalog",
178 "states": "sfn",
179 "stepfunctions": "sfn",
180 "storagegateway": "storage-gateway",
181 "streams.dynamodb": "dynamodb-streams",
182 "tagging": "resource-groups-tagging-api",
183}
186# This pattern can be used to detect if a header is a flexible checksum header
187CHECKSUM_HEADER_PATTERN = re.compile(
188 r'^X-Amz-Checksum-([a-z0-9]*)$',
189 flags=re.IGNORECASE,
190)
193def ensure_boolean(val):
194    """Ensures a boolean value when a string or boolean is provided.
196    For strings, the True/False comparison is case insensitive.
197 """
198 if isinstance(val, bool):
199 return val
200 elif isinstance(val, str):
201 return val.lower() == 'true'
202 else:
203 return False
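# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name below is hypothetical. It illustrates that only the bool True and the
# case-insensitive string 'true' are treated as truthy; anything else is False.
def _example_ensure_boolean():
    assert ensure_boolean(True) is True
    assert ensure_boolean('TRUE') is True
    assert ensure_boolean('no') is False
    assert ensure_boolean(1) is False  # non-str, non-bool inputs return False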
206def resolve_imds_endpoint_mode(session):
207    """Resolve the IMDS endpoint mode to either IPv6 or IPv4.
209 ec2_metadata_service_endpoint_mode takes precedence over imds_use_ipv6.
210 """
211 endpoint_mode = session.get_config_variable(
212 'ec2_metadata_service_endpoint_mode'
213 )
214 if endpoint_mode is not None:
215 lendpoint_mode = endpoint_mode.lower()
216 if lendpoint_mode not in METADATA_ENDPOINT_MODES:
217 error_msg_kwargs = {
218 'mode': endpoint_mode,
219 'valid_modes': METADATA_ENDPOINT_MODES,
220 }
221 raise InvalidIMDSEndpointModeError(**error_msg_kwargs)
222 return lendpoint_mode
223 elif session.get_config_variable('imds_use_ipv6'):
224 return 'ipv6'
225 return 'ipv4'
228def is_json_value_header(shape):
229 """Determines if the provided shape is the special header type jsonvalue.
231 :type shape: botocore.shape
232 :param shape: Shape to be inspected for the jsonvalue trait.
234 :return: True if this type is a jsonvalue, False otherwise
235 :rtype: Bool
236 """
237 return (
238 hasattr(shape, 'serialization')
239 and shape.serialization.get('jsonvalue', False)
240 and shape.serialization.get('location') == 'header'
241 and shape.type_name == 'string'
242 )
245def has_header(header_name, headers):
246 """Case-insensitive check for header key."""
247 if header_name is None:
248 return False
249 elif isinstance(headers, botocore.awsrequest.HeadersDict):
250 return header_name in headers
251 else:
252 return header_name.lower() in [key.lower() for key in headers.keys()]
255def get_service_module_name(service_model):
256 """Returns the module name for a service
258 This is the value used in both the documentation and client class name
259 """
260 name = service_model.metadata.get(
261 'serviceAbbreviation',
262 service_model.metadata.get(
263 'serviceFullName', service_model.service_name
264 ),
265 )
266 name = name.replace('Amazon', '')
267 name = name.replace('AWS', '')
268 name = re.sub(r'\W+', '', name)
269 return name
272def normalize_url_path(path):
273 if not path:
274 return '/'
275 return remove_dot_segments(path)
278def normalize_boolean(val):
279    """Returns None if val is None, otherwise ensures the value
280    is converted to a boolean."""
281 if val is None:
282 return val
283 else:
284 return ensure_boolean(val)
287def remove_dot_segments(url):
288 # RFC 3986, section 5.2.4 "Remove Dot Segments"
289 # Also, AWS services require consecutive slashes to be removed,
290 # so that's done here as well
291 if not url:
292 return ''
293 input_url = url.split('/')
294 output_list = []
295 for x in input_url:
296 if x and x != '.':
297 if x == '..':
298 if output_list:
299 output_list.pop()
300 else:
301 output_list.append(x)
303 if url[0] == '/':
304 first = '/'
305 else:
306 first = ''
307 if url[-1] == '/' and output_list:
308 last = '/'
309 else:
310 last = ''
311 return first + '/'.join(output_list) + last
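# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name is hypothetical. It shows the RFC 3986 dot-segment removal plus the
# collapsing of consecutive slashes described in the comments above.
def _example_normalize_url_path():
    assert normalize_url_path('') == '/'
    assert normalize_url_path('/a/b/c/./../g') == '/a/b/g'
    assert normalize_url_path('/a//b/') == '/a/b/'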
314def validate_jmespath_for_set(expression):
315 # Validates a limited jmespath expression to determine if we can set a
316 # value based on it. Only works with dotted paths.
317 if not expression or expression == '.':
318 raise InvalidExpressionError(expression=expression)
320 for invalid in ['[', ']', '*']:
321 if invalid in expression:
322 raise InvalidExpressionError(expression=expression)
325def set_value_from_jmespath(source, expression, value, is_first=True):
326 # This takes a (limited) jmespath-like expression & can set a value based
327 # on it.
328 # Limitations:
329 # * Only handles dotted lookups
330 # * No offsets/wildcards/slices/etc.
331 if is_first:
332 validate_jmespath_for_set(expression)
334 bits = expression.split('.', 1)
335 current_key, remainder = bits[0], bits[1] if len(bits) > 1 else ''
337 if not current_key:
338 raise InvalidExpressionError(expression=expression)
340 if remainder:
341 if current_key not in source:
342 # We've got something in the expression that's not present in the
343 # source (new key). If there's any more bits, we'll set the key
344 # with an empty dictionary.
345 source[current_key] = {}
347 return set_value_from_jmespath(
348 source[current_key], remainder, value, is_first=False
349 )
351 # If we're down to a single key, set it.
352 source[current_key] = value
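# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name is hypothetical. Only dotted paths are supported, and intermediate
# dictionaries are created on demand; brackets and wildcards raise
# InvalidExpressionError.
def _example_set_value_from_jmespath():
    source = {}
    set_value_from_jmespath(source, 'foo.bar.baz', 42)
    assert source == {'foo': {'bar': {'baz': 42}}}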
355def is_global_accesspoint(context):
356 """Determine if request is intended for an MRAP accesspoint."""
357 s3_accesspoint = context.get('s3_accesspoint', {})
358 is_global = s3_accesspoint.get('region') == ''
359 return is_global
362class _RetriesExceededError(Exception):
363 """Internal exception used when the number of retries are exceeded."""
365 pass
368class BadIMDSRequestError(Exception):
369 def __init__(self, request):
370 self.request = request
373class IMDSFetcher:
375 _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError
376 _TOKEN_PATH = 'latest/api/token'
377 _TOKEN_TTL = '21600'
379 def __init__(
380 self,
381 timeout=DEFAULT_METADATA_SERVICE_TIMEOUT,
382 num_attempts=1,
383 base_url=METADATA_BASE_URL,
384 env=None,
385 user_agent=None,
386 config=None,
387 ):
388 self._timeout = timeout
389 self._num_attempts = num_attempts
390 if config is None:
391 config = {}
392 self._base_url = self._select_base_url(base_url, config)
393 self._config = config
395 if env is None:
396 env = os.environ.copy()
397 self._disabled = env.get('AWS_EC2_METADATA_DISABLED', 'false').lower()
398 self._disabled = self._disabled == 'true'
399 self._user_agent = user_agent
400 self._session = botocore.httpsession.URLLib3Session(
401 timeout=self._timeout,
402 proxies=get_environ_proxies(self._base_url),
403 )
405 def get_base_url(self):
406 return self._base_url
408 def _select_base_url(self, base_url, config):
409 if config is None:
410 config = {}
412 requires_ipv6 = (
413 config.get('ec2_metadata_service_endpoint_mode') == 'ipv6'
414 )
415 custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint')
417 if requires_ipv6 and custom_metadata_endpoint:
418 logger.warning(
419 "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint."
420 )
422 chosen_base_url = None
424 if base_url != METADATA_BASE_URL:
425 chosen_base_url = base_url
426 elif custom_metadata_endpoint:
427 chosen_base_url = custom_metadata_endpoint
428 elif requires_ipv6:
429 chosen_base_url = METADATA_BASE_URL_IPv6
430 else:
431 chosen_base_url = METADATA_BASE_URL
433 logger.debug("IMDS ENDPOINT: %s" % chosen_base_url)
434 if not is_valid_uri(chosen_base_url):
435 raise InvalidIMDSEndpointError(endpoint=chosen_base_url)
437 return chosen_base_url
439 def _construct_url(self, path):
440 sep = ''
441 if self._base_url and not self._base_url.endswith('/'):
442 sep = '/'
443 return f'{self._base_url}{sep}{path}'
445 def _fetch_metadata_token(self):
446 self._assert_enabled()
447 url = self._construct_url(self._TOKEN_PATH)
448 headers = {
449 'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL,
450 }
451 self._add_user_agent(headers)
452 request = botocore.awsrequest.AWSRequest(
453 method='PUT', url=url, headers=headers
454 )
455 for i in range(self._num_attempts):
456 try:
457 response = self._session.send(request.prepare())
458 if response.status_code == 200:
459 return response.text
460 elif response.status_code in (404, 403, 405):
461 return None
462 elif response.status_code in (400,):
463 raise BadIMDSRequestError(request)
464 except ReadTimeoutError:
465 return None
466 except RETRYABLE_HTTP_ERRORS as e:
467 logger.debug(
468 "Caught retryable HTTP exception while making metadata "
469 "service request to %s: %s",
470 url,
471 e,
472 exc_info=True,
473 )
474 except HTTPClientError as e:
475 if isinstance(e.kwargs.get('error'), LocationParseError):
476 raise InvalidIMDSEndpointError(endpoint=url, error=e)
477 else:
478 raise
479 return None
481 def _get_request(self, url_path, retry_func, token=None):
482 """Make a get request to the Instance Metadata Service.
484 :type url_path: str
485 :param url_path: The path component of the URL to make a get request.
486 This arg is appended to the base_url that was provided in the
487 initializer.
489 :type retry_func: callable
490 :param retry_func: A function that takes the response as an argument
491        and determines if it needs to retry. By default, empty and
492        non-200 OK responses are retried.
494 :type token: str
495 :param token: Metadata token to send along with GET requests to IMDS.
496 """
497 self._assert_enabled()
498 if retry_func is None:
499 retry_func = self._default_retry
500 url = self._construct_url(url_path)
501 headers = {}
502 if token is not None:
503 headers['x-aws-ec2-metadata-token'] = token
504 self._add_user_agent(headers)
505 for i in range(self._num_attempts):
506 try:
507 request = botocore.awsrequest.AWSRequest(
508 method='GET', url=url, headers=headers
509 )
510 response = self._session.send(request.prepare())
511 if not retry_func(response):
512 return response
513 except RETRYABLE_HTTP_ERRORS as e:
514 logger.debug(
515 "Caught retryable HTTP exception while making metadata "
516 "service request to %s: %s",
517 url,
518 e,
519 exc_info=True,
520 )
521 raise self._RETRIES_EXCEEDED_ERROR_CLS()
523 def _add_user_agent(self, headers):
524 if self._user_agent is not None:
525 headers['User-Agent'] = self._user_agent
527 def _assert_enabled(self):
528 if self._disabled:
529 logger.debug("Access to EC2 metadata has been disabled.")
530 raise self._RETRIES_EXCEEDED_ERROR_CLS()
532 def _default_retry(self, response):
533 return self._is_non_ok_response(response) or self._is_empty(response)
535 def _is_non_ok_response(self, response):
536 if response.status_code != 200:
537 self._log_imds_response(response, 'non-200', log_body=True)
538 return True
539 return False
541 def _is_empty(self, response):
542 if not response.content:
543 self._log_imds_response(response, 'no body', log_body=True)
544 return True
545 return False
547 def _log_imds_response(self, response, reason_to_log, log_body=False):
548 statement = (
549 "Metadata service returned %s response "
550 "with status code of %s for url: %s"
551 )
552 logger_args = [reason_to_log, response.status_code, response.url]
553 if log_body:
554 statement += ", content body: %s"
555 logger_args.append(response.content)
556 logger.debug(statement, *logger_args)
559class InstanceMetadataFetcher(IMDSFetcher):
560 _URL_PATH = 'latest/meta-data/iam/security-credentials/'
561 _REQUIRED_CREDENTIAL_FIELDS = [
562 'AccessKeyId',
563 'SecretAccessKey',
564 'Token',
565 'Expiration',
566 ]
568 def retrieve_iam_role_credentials(self):
569 try:
570 token = self._fetch_metadata_token()
571 role_name = self._get_iam_role(token)
572 credentials = self._get_credentials(role_name, token)
573 if self._contains_all_credential_fields(credentials):
574 credentials = {
575 'role_name': role_name,
576 'access_key': credentials['AccessKeyId'],
577 'secret_key': credentials['SecretAccessKey'],
578 'token': credentials['Token'],
579 'expiry_time': credentials['Expiration'],
580 }
581 self._evaluate_expiration(credentials)
582 return credentials
583 else:
584 # IMDS can return a 200 response that has a JSON formatted
585 # error message (i.e. if ec2 is not trusted entity for the
586 # attached role). We do not necessarily want to retry for
587 # these and we also do not necessarily want to raise a key
588 # error. So at least log the problematic response and return
589 # an empty dictionary to signal that it was not able to
590                # retrieve credentials. These errors will contain both a
591 # Code and Message key.
592 if 'Code' in credentials and 'Message' in credentials:
593 logger.debug(
594                        'Error response received when retrieving '
595                        'credentials: %s.',
596 credentials,
597 )
598 return {}
599 except self._RETRIES_EXCEEDED_ERROR_CLS:
600 logger.debug(
601 "Max number of attempts exceeded (%s) when "
602 "attempting to retrieve data from metadata service.",
603 self._num_attempts,
604 )
605 except BadIMDSRequestError as e:
606 logger.debug("Bad IMDS request: %s", e.request)
607 return {}
609 def _get_iam_role(self, token=None):
610 return self._get_request(
611 url_path=self._URL_PATH,
612 retry_func=self._needs_retry_for_role_name,
613 token=token,
614 ).text
616 def _get_credentials(self, role_name, token=None):
617 r = self._get_request(
618 url_path=self._URL_PATH + role_name,
619 retry_func=self._needs_retry_for_credentials,
620 token=token,
621 )
622 return json.loads(r.text)
624 def _is_invalid_json(self, response):
625 try:
626 json.loads(response.text)
627 return False
628 except ValueError:
629 self._log_imds_response(response, 'invalid json')
630 return True
632 def _needs_retry_for_role_name(self, response):
633 return self._is_non_ok_response(response) or self._is_empty(response)
635 def _needs_retry_for_credentials(self, response):
636 return (
637 self._is_non_ok_response(response)
638 or self._is_empty(response)
639 or self._is_invalid_json(response)
640 )
642 def _contains_all_credential_fields(self, credentials):
643 for field in self._REQUIRED_CREDENTIAL_FIELDS:
644 if field not in credentials:
645 logger.debug(
646                    'Retrieved credentials are missing required field: %s',
647 field,
648 )
649 return False
650 return True
652 def _evaluate_expiration(self, credentials):
653 expiration = credentials.get("expiry_time")
654 if expiration is None:
655 return
656 try:
657 expiration = datetime.datetime.strptime(
658 expiration, "%Y-%m-%dT%H:%M:%SZ"
659 )
660 refresh_interval = self._config.get(
661 "ec2_credential_refresh_window", 60 * 10
662 )
663 jitter = random.randint(120, 600) # Between 2 to 10 minutes
664 refresh_interval_with_jitter = refresh_interval + jitter
665 current_time = datetime.datetime.utcnow()
666 refresh_offset = datetime.timedelta(
667 seconds=refresh_interval_with_jitter
668 )
669 extension_time = expiration - refresh_offset
670 if current_time >= extension_time:
671 new_time = current_time + refresh_offset
672 credentials["expiry_time"] = new_time.strftime(
673 "%Y-%m-%dT%H:%M:%SZ"
674 )
675 logger.info(
676 f"Attempting credential expiration extension due to a "
677 f"credential service availability issue. A refresh of "
678 f"these credentials will be attempted again within "
679 f"the next {refresh_interval_with_jitter/60:.0f} minutes."
680 )
681 except ValueError:
682 logger.debug(
683 f"Unable to parse expiry_time in {credentials['expiry_time']}"
684 )
687class IMDSRegionProvider:
688 def __init__(self, session, environ=None, fetcher=None):
689 """Initialize IMDSRegionProvider.
690 :type session: :class:`botocore.session.Session`
691 :param session: The session is needed to look up configuration for
692            how to contact the instance metadata service. Specifically,
693            whether or not it should use the IMDS region at all, and if so how
694 to configure the timeout and number of attempts to reach the
695 service.
696 :type environ: None or dict
697 :param environ: A dictionary of environment variables to use. If
698 ``None`` is the argument then ``os.environ`` will be used by
699 default.
700        :type fetcher: :class:`botocore.utils.InstanceMetadataRegionFetcher`
701 :param fetcher: The class to actually handle the fetching of the region
702 from the IMDS. If not provided a default one will be created.
703 """
704 self._session = session
705 if environ is None:
706 environ = os.environ
707 self._environ = environ
708 self._fetcher = fetcher
710 def provide(self):
711 """Provide the region value from IMDS."""
712 instance_region = self._get_instance_metadata_region()
713 return instance_region
715 def _get_instance_metadata_region(self):
716 fetcher = self._get_fetcher()
717 region = fetcher.retrieve_region()
718 return region
720 def _get_fetcher(self):
721 if self._fetcher is None:
722 self._fetcher = self._create_fetcher()
723 return self._fetcher
725 def _create_fetcher(self):
726 metadata_timeout = self._session.get_config_variable(
727 'metadata_service_timeout'
728 )
729 metadata_num_attempts = self._session.get_config_variable(
730 'metadata_service_num_attempts'
731 )
732 imds_config = {
733 'ec2_metadata_service_endpoint': self._session.get_config_variable(
734 'ec2_metadata_service_endpoint'
735 ),
736 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode(
737 self._session
738 ),
739 }
740 fetcher = InstanceMetadataRegionFetcher(
741 timeout=metadata_timeout,
742 num_attempts=metadata_num_attempts,
743 env=self._environ,
744 user_agent=self._session.user_agent(),
745 config=imds_config,
746 )
747 return fetcher
750class InstanceMetadataRegionFetcher(IMDSFetcher):
751 _URL_PATH = 'latest/meta-data/placement/availability-zone/'
753 def retrieve_region(self):
754 """Get the current region from the instance metadata service.
759        :rtype: None or str
760        :returns: Returns the region as a string if it is configured to use
761            IMDS as a region source. Otherwise returns ``None``. It will also
762            return ``None`` if it fails to get the region from IMDS due to
763            exhausting its retries or not being able to connect.
764 """
765 try:
766 region = self._get_region()
767 return region
768 except self._RETRIES_EXCEEDED_ERROR_CLS:
769 logger.debug(
770 "Max number of attempts exceeded (%s) when "
771 "attempting to retrieve data from metadata service.",
772 self._num_attempts,
773 )
774 return None
776 def _get_region(self):
777 token = self._fetch_metadata_token()
778 response = self._get_request(
779 url_path=self._URL_PATH,
780 retry_func=self._default_retry,
781 token=token,
782 )
783 availability_zone = response.text
784 region = availability_zone[:-1]
785 return region
788def merge_dicts(dict1, dict2, append_lists=False):
789    """Given two dicts, merge the second dict into the first.
791 The dicts can have arbitrary nesting.
793 :param append_lists: If true, instead of clobbering a list with the new
794 value, append all of the new values onto the original list.
795 """
796 for key in dict2:
797 if isinstance(dict2[key], dict):
798 if key in dict1 and key in dict2:
799 merge_dicts(dict1[key], dict2[key])
800 else:
801 dict1[key] = dict2[key]
802 # If the value is a list and the ``append_lists`` flag is set,
803 # append the new values onto the original list
804 elif isinstance(dict2[key], list) and append_lists:
805 # The value in dict1 must be a list in order to append new
806 # values onto it.
807 if key in dict1 and isinstance(dict1[key], list):
808 dict1[key].extend(dict2[key])
809 else:
810 dict1[key] = dict2[key]
811 else:
812            # For scalar types, the value from dict2 simply
813            # overwrites the value in dict1.
814 dict1[key] = dict2[key]
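# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name is hypothetical. merge_dicts mutates the first dict in place, recursing
# into nested dicts and, with append_lists=True, extending lists rather than
# replacing them.
def _example_merge_dicts():
    first = {'a': {'x': 1}, 'tags': ['red']}
    second = {'a': {'y': 2}, 'tags': ['blue']}
    merge_dicts(first, second, append_lists=True)
    assert first == {'a': {'x': 1, 'y': 2}, 'tags': ['red', 'blue']}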
817def lowercase_dict(original):
818 """Copies the given dictionary ensuring all keys are lowercase strings."""
819 copy = {}
820 for key in original:
821 copy[key.lower()] = original[key]
822 return copy
825def parse_key_val_file(filename, _open=open):
826 try:
827 with _open(filename) as f:
828 contents = f.read()
829 return parse_key_val_file_contents(contents)
830 except OSError:
831 raise ConfigNotFound(path=filename)
834def parse_key_val_file_contents(contents):
835 # This was originally extracted from the EC2 credential provider, which was
836 # fairly lenient in its parsing. We only try to parse key/val pairs if
837 # there's a '=' in the line.
838 final = {}
839 for line in contents.splitlines():
840 if '=' not in line:
841 continue
842 key, val = line.split('=', 1)
843 key = key.strip()
844 val = val.strip()
845 final[key] = val
846 return final
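# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name is hypothetical. Lines without an '=' are skipped, whitespace is
# stripped, and only the first '=' splits key from value.
def _example_parse_key_val_file_contents():
    contents = 'AccessKeyId = AKID\nsome line with no delimiter\nToken=abc=def\n'
    parsed = parse_key_val_file_contents(contents)
    assert parsed == {'AccessKeyId': 'AKID', 'Token': 'abc=def'}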
849def percent_encode_sequence(mapping, safe=SAFE_CHARS):
850 """Urlencode a dict or list into a string.
852 This is similar to urllib.urlencode except that:
854 * It uses quote, and not quote_plus
855 * It has a default list of safe chars that don't need
856 to be encoded, which matches what AWS services expect.
858 If any value in the input ``mapping`` is a list type,
859    then each list element will be serialized. This is equivalent
860 to ``urlencode``'s ``doseq=True`` argument.
862 This function should be preferred over the stdlib
863 ``urlencode()`` function.
865 :param mapping: Either a dict to urlencode or a list of
866 ``(key, value)`` pairs.
868 """
869 encoded_pairs = []
870 if hasattr(mapping, 'items'):
871 pairs = mapping.items()
872 else:
873 pairs = mapping
874 for key, value in pairs:
875 if isinstance(value, list):
876 for element in value:
877 encoded_pairs.append(
878 f'{percent_encode(key)}={percent_encode(element)}'
879 )
880 else:
881 encoded_pairs.append(
882 f'{percent_encode(key)}={percent_encode(value)}'
883 )
884 return '&'.join(encoded_pairs)
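# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name is hypothetical and the assertion assumes insertion-ordered dicts
# (Python 3.7+). List values become repeated key=value pairs, and spaces are
# percent-encoded rather than turned into '+'.
def _example_percent_encode_sequence():
    encoded = percent_encode_sequence({'k': 'v 1', 'ids': ['a', 'b']})
    assert encoded == 'k=v%201&ids=a&ids=b'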
887def percent_encode(input_str, safe=SAFE_CHARS):
888 """Urlencodes a string.
890 Whereas percent_encode_sequence handles taking a dict/sequence and
891 producing a percent encoded string, this function deals only with
892 taking a string (not a dict/sequence) and percent encoding it.
894 If given the binary type, will simply URL encode it. If given the
895 text type, will produce the binary type by UTF-8 encoding the
896 text. If given something else, will convert it to the text type
897 first.
898 """
899    # If it's not a binary or text string, make it a text string.
900 if not isinstance(input_str, (bytes, str)):
901 input_str = str(input_str)
902 # If it's not bytes, make it bytes by UTF-8 encoding it.
903 if not isinstance(input_str, bytes):
904 input_str = input_str.encode('utf-8')
905 return quote(input_str, safe=safe)
908def _parse_timestamp_with_tzinfo(value, tzinfo):
909 """Parse timestamp with pluggable tzinfo options."""
910 if isinstance(value, (int, float)):
911 # Possibly an epoch time.
912 return datetime.datetime.fromtimestamp(value, tzinfo())
913 else:
914 try:
915 return datetime.datetime.fromtimestamp(float(value), tzinfo())
916 except (TypeError, ValueError):
917 pass
918 try:
919 # In certain cases, a timestamp marked with GMT can be parsed into a
920 # different time zone, so here we provide a context which will
921 # enforce that GMT == UTC.
922 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()})
923 except (TypeError, ValueError) as e:
924 raise ValueError(f'Invalid timestamp "{value}": {e}')
927def parse_timestamp(value):
928 """Parse a timestamp into a datetime object.
930 Supported formats:
932 * iso8601
933 * rfc822
934 * epoch (value is an integer)
936 This will return a ``datetime.datetime`` object.
938 """
939 for tzinfo in get_tzinfo_options():
940 try:
941 return _parse_timestamp_with_tzinfo(value, tzinfo)
942 except OSError as e:
943 logger.debug(
944 'Unable to parse timestamp with "%s" timezone info.',
945 tzinfo.__name__,
946 exc_info=e,
947 )
948 raise RuntimeError(
949 'Unable to calculate correct timezone offset for "%s"' % value
950 )
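# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name is hypothetical. Both epoch numbers and ISO 8601 / RFC 822 strings come
# back as timezone-aware datetimes; epoch inputs are rendered in the timezone
# supplied by get_tzinfo_options() (typically local time).
def _example_parse_timestamp():
    aware = parse_timestamp('2023-03-26T06:03:00Z')
    assert aware.tzinfo is not None
    assert aware.astimezone(tzutc()).hour == 6
    assert parse_timestamp(0).tzinfo is not None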
953def parse_to_aware_datetime(value):
954    """Convert the passed-in value to a datetime object with tzinfo.
956 This function can be used to normalize all timestamp inputs. This
957 function accepts a number of different types of inputs, but
958 will always return a datetime.datetime object with time zone
959 information.
961 The input param ``value`` can be one of several types:
963 * A datetime object (both naive and aware)
964 * An integer representing the epoch time (can also be a string
965 of the integer, i.e '0', instead of 0). The epoch time is
966 considered to be UTC.
967 * An iso8601 formatted timestamp. This does not need to be
968 a complete timestamp, it can contain just the date portion
969 without the time component.
971 The returned value will be a datetime object that will have tzinfo.
972 If no timezone info was provided in the input value, then UTC is
973 assumed, not local time.
975 """
976 # This is a general purpose method that handles several cases of
977 # converting the provided value to a string timestamp suitable to be
978 # serialized to an http request. It can handle:
979 # 1) A datetime.datetime object.
980 if isinstance(value, datetime.datetime):
981 datetime_obj = value
982 else:
983 # 2) A string object that's formatted as a timestamp.
984 # We document this as being an iso8601 timestamp, although
985 # parse_timestamp is a bit more flexible.
986 datetime_obj = parse_timestamp(value)
987 if datetime_obj.tzinfo is None:
988        # I think a case could be made that if no time zone is provided,
989 # we should use the local time. However, to restore backwards
990 # compat, the previous behavior was to assume UTC, which is
991 # what we're going to do here.
992 datetime_obj = datetime_obj.replace(tzinfo=tzutc())
993 else:
994 datetime_obj = datetime_obj.astimezone(tzutc())
995 return datetime_obj
998def datetime2timestamp(dt, default_timezone=None):
999 """Calculate the timestamp based on the given datetime instance.
1001 :type dt: datetime
1002 :param dt: A datetime object to be converted into timestamp
1003 :type default_timezone: tzinfo
1004 :param default_timezone: If it is provided as None, we treat it as tzutc().
1005 But it is only used when dt is a naive datetime.
1006 :returns: The timestamp
1007 """
1008 epoch = datetime.datetime(1970, 1, 1)
1009 if dt.tzinfo is None:
1010 if default_timezone is None:
1011 default_timezone = tzutc()
1012 dt = dt.replace(tzinfo=default_timezone)
1013 d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
1014 if hasattr(d, "total_seconds"):
1015 return d.total_seconds() # Works in Python 3.6+
1016 return (
1017 d.microseconds + (d.seconds + d.days * 24 * 3600) * 10**6
1018 ) / 10**6
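# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name is hypothetical. A naive datetime is interpreted as UTC unless
# default_timezone says otherwise.
def _example_datetime2timestamp():
    assert datetime2timestamp(datetime.datetime(1970, 1, 1)) == 0
    aware = datetime.datetime(1970, 1, 1, 1, 0, 0, tzinfo=tzutc())
    assert datetime2timestamp(aware) == 3600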
1021def calculate_sha256(body, as_hex=False):
1022 """Calculate a sha256 checksum.
1024 This method will calculate the sha256 checksum of a file like
1025 object. Note that this method will iterate through the entire
1026 file contents. The caller is responsible for ensuring the proper
1027 starting position of the file and ``seek()``'ing the file back
1028 to its starting location if other consumers need to read from
1029 the file like object.
1031 :param body: Any file like object. The file must be opened
1032 in binary mode such that a ``.read()`` call returns bytes.
1033 :param as_hex: If True, then the hex digest is returned.
1034 If False, then the digest (as binary bytes) is returned.
1036 :returns: The sha256 checksum
1038 """
1039 checksum = hashlib.sha256()
1040 for chunk in iter(lambda: body.read(1024 * 1024), b''):
1041 checksum.update(chunk)
1042 if as_hex:
1043 return checksum.hexdigest()
1044 else:
1045 return checksum.digest()
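# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name is hypothetical. Any binary-mode file-like object works, including
# io.BytesIO; the caller is responsible for seek()'ing afterwards.
def _example_calculate_sha256():
    digest = calculate_sha256(io.BytesIO(b'hello'), as_hex=True)
    assert digest == hashlib.sha256(b'hello').hexdigest()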
1048def calculate_tree_hash(body):
1049 """Calculate a tree hash checksum.
1051 For more information see:
1053 http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html
1055 :param body: Any file like object. This has the same constraints as
1056 the ``body`` param in calculate_sha256
1058 :rtype: str
1059 :returns: The hex version of the calculated tree hash
1061 """
1062 chunks = []
1063 required_chunk_size = 1024 * 1024
1064 sha256 = hashlib.sha256
1065 for chunk in iter(lambda: body.read(required_chunk_size), b''):
1066 chunks.append(sha256(chunk).digest())
1067 if not chunks:
1068 return sha256(b'').hexdigest()
1069 while len(chunks) > 1:
1070 new_chunks = []
1071 for first, second in _in_pairs(chunks):
1072 if second is not None:
1073 new_chunks.append(sha256(first + second).digest())
1074 else:
1075 # We're at the end of the list and there's no pair left.
1076 new_chunks.append(first)
1077 chunks = new_chunks
1078 return binascii.hexlify(chunks[0]).decode('ascii')
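# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name is hypothetical. For bodies of one megabyte or less the Glacier tree
# hash collapses to a plain sha256 hex digest.
def _example_calculate_tree_hash():
    body = io.BytesIO(b'x' * 1024)
    assert calculate_tree_hash(body) == hashlib.sha256(b'x' * 1024).hexdigest()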
1081def _in_pairs(iterable):
1082 # Creates iterator that iterates over the list in pairs:
1083 # for a, b in _in_pairs([0, 1, 2, 3, 4]):
1084 # print(a, b)
1085 #
1086 # will print:
1087 # 0, 1
1088 # 2, 3
1089 # 4, None
1090 shared_iter = iter(iterable)
1091 # Note that zip_longest is a compat import that uses
1092 # the itertools izip_longest. This creates an iterator,
1093 # this call below does _not_ immediately create the list
1094 # of pairs.
1095 return zip_longest(shared_iter, shared_iter)
1098class CachedProperty:
1099 """A read only property that caches the initially computed value.
1101 This descriptor will only call the provided ``fget`` function once.
1102 Subsequent access to this property will return the cached value.
1104 """
1106 def __init__(self, fget):
1107 self._fget = fget
1109 def __get__(self, obj, cls):
1110 if obj is None:
1111 return self
1112 else:
1113 computed_value = self._fget(obj)
1114 obj.__dict__[self._fget.__name__] = computed_value
1115 return computed_value
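# Editor's note: a minimal usage sketch (not part of botocore); the class and
# helper names are hypothetical. After the first access the computed value is
# stored in the instance __dict__, so the getter runs only once.
class _ExampleWithCachedProperty:
    def __init__(self):
        self.calls = 0

    @CachedProperty
    def expensive(self):
        self.calls += 1
        return 'value'


def _example_cached_property():
    obj = _ExampleWithCachedProperty()
    assert obj.expensive == 'value' and obj.calls == 1
    assert obj.expensive == 'value' and obj.calls == 1  # served from __dict__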
1118class ArgumentGenerator:
1119 """Generate sample input based on a shape model.
1121 This class contains a ``generate_skeleton`` method that will take
1122 an input/output shape (created from ``botocore.model``) and generate
1123 a sample dictionary corresponding to the input/output shape.
1125 The specific values used are place holder values. For strings either an
1126 empty string or the member name can be used, for numbers 0 or 0.0 is used.
1127 The intended usage of this class is to generate the *shape* of the input
1128 structure.
1130 This can be useful for operations that have complex input shapes.
1131 This allows a user to just fill in the necessary data instead of
1132 worrying about the specific structure of the input arguments.
1134 Example usage::
1136 s = botocore.session.get_session()
1137 ddb = s.get_service_model('dynamodb')
1138 arg_gen = ArgumentGenerator()
1139 sample_input = arg_gen.generate_skeleton(
1140 ddb.operation_model('CreateTable').input_shape)
1141 print("Sample input for dynamodb.CreateTable: %s" % sample_input)
1143 """
1145 def __init__(self, use_member_names=False):
1146 self._use_member_names = use_member_names
1148 def generate_skeleton(self, shape):
1149 """Generate a sample input.
1151 :type shape: ``botocore.model.Shape``
1152 :param shape: The input shape.
1154 :return: The generated skeleton input corresponding to the
1155 provided input shape.
1157 """
1158 stack = []
1159 return self._generate_skeleton(shape, stack)
1161 def _generate_skeleton(self, shape, stack, name=''):
1162 stack.append(shape.name)
1163 try:
1164 if shape.type_name == 'structure':
1165 return self._generate_type_structure(shape, stack)
1166 elif shape.type_name == 'list':
1167 return self._generate_type_list(shape, stack)
1168 elif shape.type_name == 'map':
1169 return self._generate_type_map(shape, stack)
1170 elif shape.type_name == 'string':
1171 if self._use_member_names:
1172 return name
1173 if shape.enum:
1174 return random.choice(shape.enum)
1175 return ''
1176 elif shape.type_name in ['integer', 'long']:
1177 return 0
1178 elif shape.type_name in ['float', 'double']:
1179 return 0.0
1180 elif shape.type_name == 'boolean':
1181 return True
1182 elif shape.type_name == 'timestamp':
1183 return datetime.datetime(1970, 1, 1, 0, 0, 0)
1184 finally:
1185 stack.pop()
1187 def _generate_type_structure(self, shape, stack):
1188 if stack.count(shape.name) > 1:
1189 return {}
1190 skeleton = OrderedDict()
1191 for member_name, member_shape in shape.members.items():
1192 skeleton[member_name] = self._generate_skeleton(
1193 member_shape, stack, name=member_name
1194 )
1195 return skeleton
1197 def _generate_type_list(self, shape, stack):
1198 # For list elements we've arbitrarily decided to
1199        # return a single element for the skeleton list.
1200 name = ''
1201 if self._use_member_names:
1202 name = shape.member.name
1203 return [
1204 self._generate_skeleton(shape.member, stack, name),
1205 ]
1207 def _generate_type_map(self, shape, stack):
1208 key_shape = shape.key
1209 value_shape = shape.value
1210 assert key_shape.type_name == 'string'
1211 return OrderedDict(
1212 [
1213 ('KeyName', self._generate_skeleton(value_shape, stack)),
1214 ]
1215 )
1218def is_valid_ipv6_endpoint_url(endpoint_url):
1219 if UNSAFE_URL_CHARS.intersection(endpoint_url):
1220 return False
1221 hostname = f'[{urlparse(endpoint_url).hostname}]'
1222 return IPV6_ADDRZ_RE.match(hostname) is not None
1225def is_valid_ipv4_endpoint_url(endpoint_url):
1226 hostname = urlparse(endpoint_url).hostname
1227 return IPV4_RE.match(hostname) is not None
1230def is_valid_endpoint_url(endpoint_url):
1231 """Verify the endpoint_url is valid.
1233 :type endpoint_url: string
1234 :param endpoint_url: An endpoint_url. Must have at least a scheme
1235 and a hostname.
1237 :return: True if the endpoint url is valid. False otherwise.
1239 """
1240 # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing
1241 # it to pass hostname validation below. Detect them early to fix that.
1242 if UNSAFE_URL_CHARS.intersection(endpoint_url):
1243 return False
1244 parts = urlsplit(endpoint_url)
1245 hostname = parts.hostname
1246 if hostname is None:
1247 return False
1248 if len(hostname) > 255:
1249 return False
1250 if hostname[-1] == ".":
1251 hostname = hostname[:-1]
1252 allowed = re.compile(
1253 r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$",
1254 re.IGNORECASE,
1255 )
1256 return allowed.match(hostname)
1259def is_valid_uri(endpoint_url):
1260 return is_valid_endpoint_url(endpoint_url) or is_valid_ipv6_endpoint_url(
1261 endpoint_url
1262 )
1265def validate_region_name(region_name):
1266 """Provided region_name must be a valid host label."""
1267 if region_name is None:
1268 return
1269 valid_host_label = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$')
1270 valid = valid_host_label.match(region_name)
1271 if not valid:
1272 raise InvalidRegionError(region_name=region_name)
1275def check_dns_name(bucket_name):
1276 """
1277 Check to see if the ``bucket_name`` complies with the
1278 restricted DNS naming conventions necessary to allow
1279 access via virtual-hosting style.
1281 Even though "." characters are perfectly valid in this DNS
1282 naming scheme, we are going to punt on any name containing a
1283 "." character because these will cause SSL cert validation
1284 problems if we try to use virtual-hosting style addressing.
1285 """
1286 if '.' in bucket_name:
1287 return False
1288 n = len(bucket_name)
1289 if n < 3 or n > 63:
1290 # Wrong length
1291 return False
1292 match = LABEL_RE.match(bucket_name)
1293 if match is None or match.end() != len(bucket_name):
1294 return False
1295 return True
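# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name is hypothetical. Dots, uppercase characters and names shorter than
# three characters are all rejected for virtual-hosted-style addressing.
def _example_check_dns_name():
    assert check_dns_name('my-bucket') is True
    assert check_dns_name('my.bucket') is False   # dots break SSL cert matching
    assert check_dns_name('My-Bucket') is False   # uppercase fails LABEL_RE
    assert check_dns_name('ab') is False          # too short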
1298def fix_s3_host(
1299 request,
1300 signature_version,
1301 region_name,
1302 default_endpoint_url=None,
1303 **kwargs,
1304):
1305 """
1306 This handler looks at S3 requests just before they are signed.
1307 If there is a bucket name on the path (true for everything except
1308 ListAllBuckets) it checks to see if that bucket name conforms to
1309 the DNS naming conventions. If it does, it alters the request to
1310 use ``virtual hosting`` style addressing rather than ``path-style``
1311 addressing.
1313 """
1314 if request.context.get('use_global_endpoint', False):
1315 default_endpoint_url = 's3.amazonaws.com'
1316 try:
1317 switch_to_virtual_host_style(
1318 request, signature_version, default_endpoint_url
1319 )
1320 except InvalidDNSNameError as e:
1321 bucket_name = e.kwargs['bucket_name']
1322 logger.debug(
1323 'Not changing URI, bucket is not DNS compatible: %s', bucket_name
1324 )
1327def switch_to_virtual_host_style(
1328 request, signature_version, default_endpoint_url=None, **kwargs
1329):
1330 """
1331 This is a handler to force virtual host style s3 addressing no matter
1332 the signature version (which is taken in consideration for the default
1333 case). If the bucket is not DNS compatible an InvalidDNSName is thrown.
1335 :param request: A AWSRequest object that is about to be sent.
1336 :param signature_version: The signature version to sign with
1337 :param default_endpoint_url: The endpoint to use when switching to a
1338 virtual style. If None is supplied, the virtual host will be
1339 constructed from the url of the request.
1340 """
1341 if request.auth_path is not None:
1342 # The auth_path has already been applied (this may be a
1343 # retried request). We don't need to perform this
1344 # customization again.
1345 return
1346 elif _is_get_bucket_location_request(request):
1347 # For the GetBucketLocation response, we should not be using
1348 # the virtual host style addressing so we can avoid any sigv4
1349 # issues.
1350 logger.debug(
1351 "Request is GetBucketLocation operation, not checking "
1352 "for DNS compatibility."
1353 )
1354 return
1355 parts = urlsplit(request.url)
1356 request.auth_path = parts.path
1357 path_parts = parts.path.split('/')
1359    # Retrieve the endpoint we will be prepending the bucket name to.
1360 if default_endpoint_url is None:
1361 default_endpoint_url = parts.netloc
1363 if len(path_parts) > 1:
1364 bucket_name = path_parts[1]
1365 if not bucket_name:
1366 # If the bucket name is empty we should not be checking for
1367 # dns compatibility.
1368 return
1369 logger.debug('Checking for DNS compatible bucket for: %s', request.url)
1370 if check_dns_name(bucket_name):
1371 # If the operation is on a bucket, the auth_path must be
1372 # terminated with a '/' character.
1373 if len(path_parts) == 2:
1374 if request.auth_path[-1] != '/':
1375 request.auth_path += '/'
1376 path_parts.remove(bucket_name)
1377 # At the very least the path must be a '/', such as with the
1378 # CreateBucket operation when DNS style is being used. If this
1379 # is not used you will get an empty path which is incorrect.
1380 path = '/'.join(path_parts) or '/'
1381 global_endpoint = default_endpoint_url
1382 host = bucket_name + '.' + global_endpoint
1383 new_tuple = (parts.scheme, host, path, parts.query, '')
1384 new_uri = urlunsplit(new_tuple)
1385 request.url = new_uri
1386 logger.debug('URI updated to: %s', new_uri)
1387 else:
1388 raise InvalidDNSNameError(bucket_name=bucket_name)
1391def _is_get_bucket_location_request(request):
1392 return request.url.endswith('?location')
1395def instance_cache(func):
1396 """Method decorator for caching method calls to a single instance.
1398 **This is not a general purpose caching decorator.**
1400 In order to use this, you *must* provide an ``_instance_cache``
1401 attribute on the instance.
1403 This decorator is used to cache method calls. The cache is only
1404 scoped to a single instance though such that multiple instances
1405 will maintain their own cache. In order to keep things simple,
1406 this decorator requires that you provide an ``_instance_cache``
1407 attribute on your instance.
1409 """
1410 func_name = func.__name__
1412 @functools.wraps(func)
1413 def _cache_guard(self, *args, **kwargs):
1414 cache_key = (func_name, args)
1415 if kwargs:
1416 kwarg_items = tuple(sorted(kwargs.items()))
1417 cache_key = (func_name, args, kwarg_items)
1418 result = self._instance_cache.get(cache_key)
1419 if result is not None:
1420 return result
1421 result = func(self, *args, **kwargs)
1422 self._instance_cache[cache_key] = result
1423 return result
1425 return _cache_guard
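# Editor's note: a minimal usage sketch (not part of botocore); the class and
# helper names are hypothetical. The decorated method caches results per
# instance, keyed by its arguments, and requires the instance to provide an
# _instance_cache dict.
class _ExampleCachedClient:
    def __init__(self):
        self._instance_cache = {}
        self.lookups = 0

    @instance_cache
    def describe(self, name):
        self.lookups += 1
        return f'description-of-{name}'


def _example_instance_cache():
    client = _ExampleCachedClient()
    assert client.describe('foo') == 'description-of-foo'
    assert client.describe('foo') == 'description-of-foo'
    assert client.lookups == 1  # second call answered from _instance_cache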
1428def switch_host_s3_accelerate(request, operation_name, **kwargs):
1429 """Switches the current s3 endpoint with an S3 Accelerate endpoint"""
1431 # Note that when registered the switching of the s3 host happens
1432 # before it gets changed to virtual. So we are not concerned with ensuring
1433 # that the bucket name is translated to the virtual style here and we
1434 # can hard code the Accelerate endpoint.
1435 parts = urlsplit(request.url).netloc.split('.')
1436 parts = [p for p in parts if p in S3_ACCELERATE_WHITELIST]
1437 endpoint = 'https://s3-accelerate.'
1438 if len(parts) > 0:
1439 endpoint += '.'.join(parts) + '.'
1440 endpoint += 'amazonaws.com'
1442 if operation_name in ['ListBuckets', 'CreateBucket', 'DeleteBucket']:
1443 return
1444 _switch_hosts(request, endpoint, use_new_scheme=False)
1447def switch_host_with_param(request, param_name):
1448 """Switches the host using a parameter value from a JSON request body"""
1449 request_json = json.loads(request.data.decode('utf-8'))
1450 if request_json.get(param_name):
1451 new_endpoint = request_json[param_name]
1452 _switch_hosts(request, new_endpoint)
1455def _switch_hosts(request, new_endpoint, use_new_scheme=True):
1456 final_endpoint = _get_new_endpoint(
1457 request.url, new_endpoint, use_new_scheme
1458 )
1459 request.url = final_endpoint
1462def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True):
1463 new_endpoint_components = urlsplit(new_endpoint)
1464 original_endpoint_components = urlsplit(original_endpoint)
1465 scheme = original_endpoint_components.scheme
1466 if use_new_scheme:
1467 scheme = new_endpoint_components.scheme
1468 final_endpoint_components = (
1469 scheme,
1470 new_endpoint_components.netloc,
1471 original_endpoint_components.path,
1472 original_endpoint_components.query,
1473 '',
1474 )
1475 final_endpoint = urlunsplit(final_endpoint_components)
1476 logger.debug(f'Updating URI from {original_endpoint} to {final_endpoint}')
1477 return final_endpoint
1480def deep_merge(base, extra):
1481    """Deeply merge two dictionaries, overriding existing keys in the base.
1483 :param base: The base dictionary which will be merged into.
1484 :param extra: The dictionary to merge into the base. Keys from this
1485 dictionary will take precedence.
1486 """
1487 for key in extra:
1488 # If the key represents a dict on both given dicts, merge the sub-dicts
1489 if (
1490 key in base
1491 and isinstance(base[key], dict)
1492 and isinstance(extra[key], dict)
1493 ):
1494 deep_merge(base[key], extra[key])
1495 continue
1497 # Otherwise, set the key on the base to be the value of the extra.
1498 base[key] = extra[key]
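# Editor's note: a minimal usage sketch (not part of botocore); the helper
# name is hypothetical. Unlike merge_dicts, deep_merge never appends lists;
# any non-dict value in ``extra`` simply overwrites the value in ``base``.
def _example_deep_merge():
    base = {'retries': {'max_attempts': 3}, 'tags': ['a']}
    extra = {'retries': {'mode': 'standard'}, 'tags': ['b']}
    deep_merge(base, extra)
    assert base == {'retries': {'max_attempts': 3, 'mode': 'standard'}, 'tags': ['b']}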
1501def hyphenize_service_id(service_id):
1502 """Translate the form used for event emitters.
1504 :param service_id: The service_id to convert.
1505 """
1506 return service_id.replace(' ', '-').lower()
1509class S3RegionRedirectorv2:
1510 """Updated version of S3RegionRedirector for use when
1511 EndpointRulesetResolver is in use for endpoint resolution.
1513 This class is considered private and subject to abrupt breaking changes or
1514 removal without prior announcement. Please do not use it directly.
1515 """
1517 def __init__(self, endpoint_bridge, client, cache=None):
1518 self._cache = cache or {}
1519 self._client = weakref.proxy(client)
1521 def register(self, event_emitter=None):
1522 logger.debug('Registering S3 region redirector handler')
1523 emitter = event_emitter or self._client.meta.events
1524 emitter.register('needs-retry.s3', self.redirect_from_error)
1525 emitter.register(
1526 'before-parameter-build.s3', self.annotate_request_context
1527 )
1528 emitter.register(
1529 'before-endpoint-resolution.s3', self.redirect_from_cache
1530 )
1532 def redirect_from_error(self, request_dict, response, operation, **kwargs):
1533 """
1534 An S3 request sent to the wrong region will return an error that
1535 contains the endpoint the request should be sent to. This handler
1536 will add the redirect information to the signing context and then
1537 redirect the request.
1538 """
1539 if response is None:
1540 # This could be none if there was a ConnectionError or other
1541 # transport error.
1542 return
1544 redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {})
1545 if ArnParser.is_arn(redirect_ctx.get('bucket')):
1546 logger.debug(
1547 'S3 request was previously for an Accesspoint ARN, not '
1548 'redirecting.'
1549 )
1550 return
1552 if redirect_ctx.get('redirected'):
1553 logger.debug(
1554 'S3 request was previously redirected, not redirecting.'
1555 )
1556 return
1558 error = response[1].get('Error', {})
1559 error_code = error.get('Code')
1560 response_metadata = response[1].get('ResponseMetadata', {})
1562 # We have to account for 400 responses because
1563 # if we sign a Head* request with the wrong region,
1564 # we'll get a 400 Bad Request but we won't get a
1565 # body saying it's an "AuthorizationHeaderMalformed".
1566 is_special_head_object = (
1567 error_code in ('301', '400') and operation.name == 'HeadObject'
1568 )
1569 is_special_head_bucket = (
1570 error_code in ('301', '400')
1571 and operation.name == 'HeadBucket'
1572 and 'x-amz-bucket-region'
1573 in response_metadata.get('HTTPHeaders', {})
1574 )
1575 is_wrong_signing_region = (
1576 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
1577 )
1578 is_redirect_status = response[0] is not None and response[
1579 0
1580 ].status_code in (301, 302, 307)
1581 is_permanent_redirect = error_code == 'PermanentRedirect'
1582 if not any(
1583 [
1584 is_special_head_object,
1585 is_wrong_signing_region,
1586 is_permanent_redirect,
1587 is_special_head_bucket,
1588 is_redirect_status,
1589 ]
1590 ):
1591 return
1593 bucket = request_dict['context']['s3_redirect']['bucket']
1594 client_region = request_dict['context'].get('client_region')
1595 new_region = self.get_bucket_region(bucket, response)
1597 if new_region is None:
1598 logger.debug(
1599 "S3 client configured for region %s but the bucket %s is not "
1600 "in that region and the proper region could not be "
1601 "automatically determined." % (client_region, bucket)
1602 )
1603 return
1605 logger.debug(
1606 "S3 client configured for region %s but the bucket %s is in region"
1607 " %s; Please configure the proper region to avoid multiple "
1608 "unnecessary redirects and signing attempts."
1609 % (client_region, bucket, new_region)
1610 )
1611        # Adding the new region to _cache will make construct_endpoint()
1612        # use the new region as the value for the AWS::Region builtin parameter.
1613 self._cache[bucket] = new_region
1615 # Re-resolve endpoint with new region and modify request_dict with
1616 # the new URL, auth scheme, and signing context.
1617 ep_resolver = self._client._ruleset_resolver
1618 ep_info = ep_resolver.construct_endpoint(
1619 operation_model=operation,
1620 call_args=request_dict['context']['s3_redirect']['params'],
1621 request_context=request_dict['context'],
1622 )
1623 request_dict['url'] = self.set_request_url(
1624 request_dict['url'], ep_info.url
1625 )
1626 request_dict['context']['s3_redirect']['redirected'] = True
1627 auth_schemes = ep_info.properties.get('authSchemes')
1628 if auth_schemes is not None:
1629 auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes)
1630 auth_type, signing_context = auth_info
1631 request_dict['context']['auth_type'] = auth_type
1632 request_dict['context']['signing'] = {
1633 **request_dict['context'].get('signing', {}),
1634 **signing_context,
1635 }
1637 # Return 0 so it doesn't wait to retry
1638 return 0
1640 def get_bucket_region(self, bucket, response):
1641 """
1642 There are multiple potential sources for the new region to redirect to,
1643 but they aren't all universally available for use. This will try to
1644 find region from response elements, but will fall back to calling
1645 HEAD on the bucket if all else fails.
1647 :param bucket: The bucket to find the region for. This is necessary if
1648 the region is not available in the error response.
1649 :param response: A response representing a service request that failed
1650 due to incorrect region configuration.
1651 """
1652 # First try to source the region from the headers.
1653 service_response = response[1]
1654 response_headers = service_response['ResponseMetadata']['HTTPHeaders']
1655 if 'x-amz-bucket-region' in response_headers:
1656 return response_headers['x-amz-bucket-region']
1658 # Next, check the error body
1659 region = service_response.get('Error', {}).get('Region', None)
1660 if region is not None:
1661 return region
1663 # Finally, HEAD the bucket. No other choice sadly.
1664 try:
1665 response = self._client.head_bucket(Bucket=bucket)
1666 headers = response['ResponseMetadata']['HTTPHeaders']
1667 except ClientError as e:
1668 headers = e.response['ResponseMetadata']['HTTPHeaders']
1670 region = headers.get('x-amz-bucket-region', None)
1671 return region
1673 def set_request_url(self, old_url, new_endpoint, **kwargs):
1674 """
1675 Splice a new endpoint into an existing URL. Note that some endpoints
1676        from the endpoint provider have a path component which will be
1677 discarded by this function.
1678 """
1679 return _get_new_endpoint(old_url, new_endpoint, False)
1681 def redirect_from_cache(self, builtins, params, **kwargs):
1682 """
1683 If a bucket name has been redirected before, it is in the cache. This
1684 handler will update the AWS::Region endpoint resolver builtin param
1685 to use the region from cache instead of the client region to avoid the
1686 redirect.
1687 """
1688 bucket = params.get('Bucket')
1689 if bucket is not None and bucket in self._cache:
1690 new_region = self._cache.get(bucket)
1691 builtins['AWS::Region'] = new_region
1693 def annotate_request_context(self, params, context, **kwargs):
1694 """Store the bucket name in context for later use when redirecting.
1695 The bucket name may be an access point ARN or alias.
1696 """
1697 bucket = params.get('Bucket')
1698 context['s3_redirect'] = {
1699 'redirected': False,
1700 'bucket': bucket,
1701 'params': params,
1702 }
1705class S3RegionRedirector:
1706 """This handler has been replaced by S3RegionRedirectorv2. The original
1707 version remains in place for any third-party libraries that import it.
1708 """
1710 def __init__(self, endpoint_bridge, client, cache=None):
1711 self._endpoint_resolver = endpoint_bridge
1712 self._cache = cache
1713 if self._cache is None:
1714 self._cache = {}
1716 # This needs to be a weak ref in order to prevent memory leaks on
1717 # python 2.6
1718 self._client = weakref.proxy(client)
1720 warnings.warn(
1721 'The S3RegionRedirector class has been deprecated for a new '
1722 'internal replacement. A future version of botocore may remove '
1723 'this class.',
1724 category=FutureWarning,
1725 )
1727 def register(self, event_emitter=None):
1728 emitter = event_emitter or self._client.meta.events
1729 emitter.register('needs-retry.s3', self.redirect_from_error)
1730 emitter.register('before-call.s3', self.set_request_url)
1731 emitter.register('before-parameter-build.s3', self.redirect_from_cache)
1733 def redirect_from_error(self, request_dict, response, operation, **kwargs):
1734 """
1735 An S3 request sent to the wrong region will return an error that
1736 contains the endpoint the request should be sent to. This handler
1737 will add the redirect information to the signing context and then
1738 redirect the request.
1739 """
1740 if response is None:
1741 # This could be None if there was a ConnectionError or other
1742 # transport error.
1743 return
1745 if self._is_s3_accesspoint(request_dict.get('context', {})):
1746 logger.debug(
1747 'S3 request was previously sent to an accesspoint, not redirecting.'
1748 )
1749 return
1751 if request_dict.get('context', {}).get('s3_redirected'):
1752 logger.debug(
1753 'S3 request was previously redirected, not redirecting.'
1754 )
1755 return
1757 error = response[1].get('Error', {})
1758 error_code = error.get('Code')
1759 response_metadata = response[1].get('ResponseMetadata', {})
1761 # We have to account for 400 responses because
1762 # if we sign a Head* request with the wrong region,
1763 # we'll get a 400 Bad Request but we won't get a
1764 # body saying it's an "AuthorizationHeaderMalformed".
1765 is_special_head_object = (
1766 error_code in ('301', '400') and operation.name == 'HeadObject'
1767 )
1768 is_special_head_bucket = (
1769 error_code in ('301', '400')
1770 and operation.name == 'HeadBucket'
1771 and 'x-amz-bucket-region'
1772 in response_metadata.get('HTTPHeaders', {})
1773 )
1774 is_wrong_signing_region = (
1775 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
1776 )
1777 is_redirect_status = response[0] is not None and response[
1778 0
1779 ].status_code in (301, 302, 307)
1780 is_permanent_redirect = error_code == 'PermanentRedirect'
1781 if not any(
1782 [
1783 is_special_head_object,
1784 is_wrong_signing_region,
1785 is_permanent_redirect,
1786 is_special_head_bucket,
1787 is_redirect_status,
1788 ]
1789 ):
1790 return
1792 bucket = request_dict['context']['signing']['bucket']
1793 client_region = request_dict['context'].get('client_region')
1794 new_region = self.get_bucket_region(bucket, response)
1796 if new_region is None:
1797 logger.debug(
1798 "S3 client configured for region %s but the bucket %s is not "
1799 "in that region and the proper region could not be "
1800 "automatically determined." % (client_region, bucket)
1801 )
1802 return
1804 logger.debug(
1805 "S3 client configured for region %s but the bucket %s is in region"
1806 " %s; Please configure the proper region to avoid multiple "
1807 "unnecessary redirects and signing attempts."
1808 % (client_region, bucket, new_region)
1809 )
1810 endpoint = self._endpoint_resolver.resolve('s3', new_region)
1811 endpoint = endpoint['endpoint_url']
1813 signing_context = {
1814 'region': new_region,
1815 'bucket': bucket,
1816 'endpoint': endpoint,
1817 }
1818 request_dict['context']['signing'] = signing_context
1820 self._cache[bucket] = signing_context
1821 self.set_request_url(request_dict, request_dict['context'])
1823 request_dict['context']['s3_redirected'] = True
1825 # Return 0 so it doesn't wait to retry
1826 return 0
1828 def get_bucket_region(self, bucket, response):
1829 """
1830 There are multiple potential sources for the new region to redirect to,
1831 but they aren't all universally available for use. This will try to
1832 find the region from response elements, but will fall back to calling
1833 HEAD on the bucket if all else fails.
1835 :param bucket: The bucket to find the region for. This is necessary if
1836 the region is not available in the error response.
1837 :param response: A response representing a service request that failed
1838 due to incorrect region configuration.
1839 """
1840 # First try to source the region from the headers.
1841 service_response = response[1]
1842 response_headers = service_response['ResponseMetadata']['HTTPHeaders']
1843 if 'x-amz-bucket-region' in response_headers:
1844 return response_headers['x-amz-bucket-region']
1846 # Next, check the error body
1847 region = service_response.get('Error', {}).get('Region', None)
1848 if region is not None:
1849 return region
1851 # Finally, HEAD the bucket. No other choice sadly.
1852 try:
1853 response = self._client.head_bucket(Bucket=bucket)
1854 headers = response['ResponseMetadata']['HTTPHeaders']
1855 except ClientError as e:
1856 headers = e.response['ResponseMetadata']['HTTPHeaders']
1858 region = headers.get('x-amz-bucket-region', None)
1859 return region
1861 def set_request_url(self, params, context, **kwargs):
1862 endpoint = context.get('signing', {}).get('endpoint', None)
1863 if endpoint is not None:
1864 params['url'] = _get_new_endpoint(params['url'], endpoint, False)
1866 def redirect_from_cache(self, params, context, **kwargs):
1867 """
1868 This handler retrieves a given bucket's signing context from the cache
1869 and adds it into the request context.
1870 """
1871 if self._is_s3_accesspoint(context):
1872 return
1873 bucket = params.get('Bucket')
1874 signing_context = self._cache.get(bucket)
1875 if signing_context is not None:
1876 context['signing'] = signing_context
1877 else:
1878 context['signing'] = {'bucket': bucket}
1880 def _is_s3_accesspoint(self, context):
1881 return 's3_accesspoint' in context
1884class InvalidArnException(ValueError):
1885 pass
1888class ArnParser:
1889 def parse_arn(self, arn):
1890 arn_parts = arn.split(':', 5)
1891 if len(arn_parts) < 6:
1892 raise InvalidArnException(
1893 'Provided ARN: %s must be of the format: '
1894 'arn:partition:service:region:account:resource' % arn
1895 )
1896 return {
1897 'partition': arn_parts[1],
1898 'service': arn_parts[2],
1899 'region': arn_parts[3],
1900 'account': arn_parts[4],
1901 'resource': arn_parts[5],
1902 }
1904 @staticmethod
1905 def is_arn(value):
1906 if not isinstance(value, str) or not value.startswith('arn:'):
1907 return False
1908 arn_parser = ArnParser()
1909 try:
1910 arn_parser.parse_arn(value)
1911 return True
1912 except InvalidArnException:
1913 return False
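# Example (illustrative sketch; the ARN below is hypothetical): how ArnParser
# splits an ARN into its components. The resource portion may itself contain
# ':' or '/' separators and is left intact.
from botocore.utils import ArnParser

parts = ArnParser().parse_arn(
    'arn:aws:s3:us-west-2:123456789012:accesspoint/my-endpoint'
)
# parts == {
#     'partition': 'aws',
#     'service': 's3',
#     'region': 'us-west-2',
#     'account': '123456789012',
#     'resource': 'accesspoint/my-endpoint',
# }
assert ArnParser.is_arn('not-an-arn') is False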
1916class S3ArnParamHandler:
1917 _RESOURCE_REGEX = re.compile(
1918 r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$'
1919 )
1920 _OUTPOST_RESOURCE_REGEX = re.compile(
1921 r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]'
1922 r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)'
1923 )
1924 _BLACKLISTED_OPERATIONS = ['CreateBucket']
1926 def __init__(self, arn_parser=None):
1927 self._arn_parser = arn_parser
1928 if arn_parser is None:
1929 self._arn_parser = ArnParser()
1931 def register(self, event_emitter):
1932 event_emitter.register('before-parameter-build.s3', self.handle_arn)
1934 def handle_arn(self, params, model, context, **kwargs):
1935 if model.name in self._BLACKLISTED_OPERATIONS:
1936 return
1937 arn_details = self._get_arn_details_from_bucket_param(params)
1938 if arn_details is None:
1939 return
1940 if arn_details['resource_type'] == 'accesspoint':
1941 self._store_accesspoint(params, context, arn_details)
1942 elif arn_details['resource_type'] == 'outpost':
1943 self._store_outpost(params, context, arn_details)
1945 def _get_arn_details_from_bucket_param(self, params):
1946 if 'Bucket' in params:
1947 try:
1948 arn = params['Bucket']
1949 arn_details = self._arn_parser.parse_arn(arn)
1950 self._add_resource_type_and_name(arn, arn_details)
1951 return arn_details
1952 except InvalidArnException:
1953 pass
1954 return None
1956 def _add_resource_type_and_name(self, arn, arn_details):
1957 match = self._RESOURCE_REGEX.match(arn_details['resource'])
1958 if match:
1959 arn_details['resource_type'] = match.group('resource_type')
1960 arn_details['resource_name'] = match.group('resource_name')
1961 else:
1962 raise UnsupportedS3ArnError(arn=arn)
1964 def _store_accesspoint(self, params, context, arn_details):
1965 # Ideally the access-point would be stored as a parameter in the
1966 # request where the serializer would then know how to serialize it,
1967 # but access-points are not modeled in S3 operations so it would fail
1968 # validation. Instead, we set the access-point name as the bucket parameter
1969 # so that some value is set when serializing the request, and we store the
1970 # additional information from the arn on the context for use in forming the
1971 # access-point endpoint.
1972 params['Bucket'] = arn_details['resource_name']
1973 context['s3_accesspoint'] = {
1974 'name': arn_details['resource_name'],
1975 'account': arn_details['account'],
1976 'partition': arn_details['partition'],
1977 'region': arn_details['region'],
1978 'service': arn_details['service'],
1979 }
1981 def _store_outpost(self, params, context, arn_details):
1982 resource_name = arn_details['resource_name']
1983 match = self._OUTPOST_RESOURCE_REGEX.match(resource_name)
1984 if not match:
1985 raise UnsupportedOutpostResourceError(resource_name=resource_name)
1986 # Because we need to set the bucket name to something to pass
1987 # validation we're going to use the access point name to be consistent
1988 # with normal access point arns.
1989 accesspoint_name = match.group('accesspoint_name')
1990 params['Bucket'] = accesspoint_name
1991 context['s3_accesspoint'] = {
1992 'outpost_name': match.group('outpost_name'),
1993 'name': accesspoint_name,
1994 'account': arn_details['account'],
1995 'partition': arn_details['partition'],
1996 'region': arn_details['region'],
1997 'service': arn_details['service'],
1998 }
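# Example (illustrative sketch): what handle_arn does to the request
# parameters when the Bucket value is an access-point ARN. The stub model and
# the ARN below are hypothetical; only the attributes the handler reads are
# provided.
from types import SimpleNamespace
from botocore.utils import S3ArnParamHandler

params = {'Bucket': 'arn:aws:s3:us-west-2:123456789012:accesspoint/my-ap'}
context = {}
S3ArnParamHandler().handle_arn(
    params, SimpleNamespace(name='GetObject'), context
)
# params['Bucket'] is now 'my-ap', and context['s3_accesspoint'] carries the
# name, account, partition, region and service parsed from the ARN.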
2001class S3EndpointSetter:
2002 _DEFAULT_PARTITION = 'aws'
2003 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
2005 def __init__(
2006 self,
2007 endpoint_resolver,
2008 region=None,
2009 s3_config=None,
2010 endpoint_url=None,
2011 partition=None,
2012 use_fips_endpoint=False,
2013 ):
2014 # This is calling the endpoint_resolver in regions.py
2015 self._endpoint_resolver = endpoint_resolver
2016 self._region = region
2017 self._s3_config = s3_config
2018 self._use_fips_endpoint = use_fips_endpoint
2019 if s3_config is None:
2020 self._s3_config = {}
2021 self._endpoint_url = endpoint_url
2022 self._partition = partition
2023 if partition is None:
2024 self._partition = self._DEFAULT_PARTITION
2026 def register(self, event_emitter):
2027 event_emitter.register('before-sign.s3', self.set_endpoint)
2028 event_emitter.register('choose-signer.s3', self.set_signer)
2029 event_emitter.register(
2030 'before-call.s3.WriteGetObjectResponse',
2031 self.update_endpoint_to_s3_object_lambda,
2032 )
2034 def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs):
2035 if self._use_accelerate_endpoint:
2036 raise UnsupportedS3ConfigurationError(
2037 msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations',
2038 )
2040 self._override_signing_name(context, 's3-object-lambda')
2041 if self._endpoint_url:
2042 # Only update the url if an explicit url was not provided
2043 return
2045 resolver = self._endpoint_resolver
2046 # Construct the endpoint for the s3-object-lambda service in the client region
2047 resolved = resolver.construct_endpoint(
2048 's3-object-lambda', self._region
2049 )
2051 # Ideally we would be able to replace the endpoint before
2052 # serialization but there's no event to do that currently
2053 # host_prefix is all the arn/bucket specs
2054 new_endpoint = 'https://{host_prefix}{hostname}'.format(
2055 host_prefix=params['host_prefix'],
2056 hostname=resolved['hostname'],
2057 )
2059 params['url'] = _get_new_endpoint(params['url'], new_endpoint, False)
2061 def set_endpoint(self, request, **kwargs):
2062 if self._use_accesspoint_endpoint(request):
2063 self._validate_accesspoint_supported(request)
2064 self._validate_fips_supported(request)
2065 self._validate_global_regions(request)
2066 region_name = self._resolve_region_for_accesspoint_endpoint(
2067 request
2068 )
2069 self._resolve_signing_name_for_accesspoint_endpoint(request)
2070 self._switch_to_accesspoint_endpoint(request, region_name)
2071 return
2072 if self._use_accelerate_endpoint:
2073 if self._use_fips_endpoint:
2074 raise UnsupportedS3ConfigurationError(
2075 msg=(
2076 'Client is configured to use the FIPS pseudo region '
2077 'for "%s", but S3 Accelerate does not have any FIPS '
2078 'compatible endpoints.' % (self._region)
2079 )
2080 )
2081 switch_host_s3_accelerate(request=request, **kwargs)
2082 if self._s3_addressing_handler:
2083 self._s3_addressing_handler(request=request, **kwargs)
2085 def _use_accesspoint_endpoint(self, request):
2086 return 's3_accesspoint' in request.context
2088 def _validate_fips_supported(self, request):
2089 if not self._use_fips_endpoint:
2090 return
2091 if 'fips' in request.context['s3_accesspoint']['region']:
2092 raise UnsupportedS3AccesspointConfigurationError(
2093 msg='Invalid ARN, FIPS region not allowed in ARN.'
2094 )
2095 if 'outpost_name' in request.context['s3_accesspoint']:
2096 raise UnsupportedS3AccesspointConfigurationError(
2097 msg=(
2098 'Client is configured to use the FIPS pseudo-region "%s", '
2099 'but outpost ARNs do not support FIPS endpoints.'
2100 % (self._region)
2101 )
2102 )
2103 # Transforming pseudo region to actual region
2104 accesspoint_region = request.context['s3_accesspoint']['region']
2105 if accesspoint_region != self._region:
2106 if not self._s3_config.get('use_arn_region', True):
2107 # TODO: Update message to reflect use_arn_region
2108 # is not set
2109 raise UnsupportedS3AccesspointConfigurationError(
2110 msg=(
2111 'Client is configured to use the FIPS pseudo-region '
2112 'for "%s", but the access-point ARN provided is for '
2113 'the "%s" region. For clients using a FIPS '
2114 'pseudo-region calls to access-point ARNs in another '
2115 'region are not allowed.'
2116 % (self._region, accesspoint_region)
2117 )
2118 )
2120 def _validate_global_regions(self, request):
2121 if self._s3_config.get('use_arn_region', True):
2122 return
2123 if self._region in ['aws-global', 's3-external-1']:
2124 raise UnsupportedS3AccesspointConfigurationError(
2125 msg=(
2126 'Client is configured to use the global pseudo-region '
2127 '"%s". When providing access-point ARNs a regional '
2128 'endpoint must be specified.' % self._region
2129 )
2130 )
2132 def _validate_accesspoint_supported(self, request):
2133 if self._use_accelerate_endpoint:
2134 raise UnsupportedS3AccesspointConfigurationError(
2135 msg=(
2136 'Client does not support s3 accelerate configuration '
2137 'when an access-point ARN is specified.'
2138 )
2139 )
2140 request_partition = request.context['s3_accesspoint']['partition']
2141 if request_partition != self._partition:
2142 raise UnsupportedS3AccesspointConfigurationError(
2143 msg=(
2144 'Client is configured for "%s" partition, but access-point'
2145 ' ARN provided is for "%s" partition. The client and '
2146 'access-point partition must be the same.'
2147 % (self._partition, request_partition)
2148 )
2149 )
2150 s3_service = request.context['s3_accesspoint'].get('service')
2151 if s3_service == 's3-object-lambda' and self._s3_config.get(
2152 'use_dualstack_endpoint'
2153 ):
2154 raise UnsupportedS3AccesspointConfigurationError(
2155 msg=(
2156 'Client does not support s3 dualstack configuration '
2157 'when an S3 Object Lambda access point ARN is specified.'
2158 )
2159 )
2160 outpost_name = request.context['s3_accesspoint'].get('outpost_name')
2161 if outpost_name and self._s3_config.get('use_dualstack_endpoint'):
2162 raise UnsupportedS3AccesspointConfigurationError(
2163 msg=(
2164 'Client does not support s3 dualstack configuration '
2165 'when an outpost ARN is specified.'
2166 )
2167 )
2168 self._validate_mrap_s3_config(request)
2170 def _validate_mrap_s3_config(self, request):
2171 if not is_global_accesspoint(request.context):
2172 return
2173 if self._s3_config.get('s3_disable_multiregion_access_points'):
2174 raise UnsupportedS3AccesspointConfigurationError(
2175 msg=(
2176 'Invalid configuration, Multi-Region Access Point '
2177 'ARNs are disabled.'
2178 )
2179 )
2180 elif self._s3_config.get('use_dualstack_endpoint'):
2181 raise UnsupportedS3AccesspointConfigurationError(
2182 msg=(
2183 'Client does not support s3 dualstack configuration '
2184 'when a Multi-Region Access Point ARN is specified.'
2185 )
2186 )
2188 def _resolve_region_for_accesspoint_endpoint(self, request):
2189 if is_global_accesspoint(request.context):
2190 # Requests going to MRAP endpoints MUST be set to any (*) region.
2191 self._override_signing_region(request, '*')
2192 elif self._s3_config.get('use_arn_region', True):
2193 accesspoint_region = request.context['s3_accesspoint']['region']
2194 # If we are using the region from the access point,
2195 # we will also want to make sure that we set it as the
2196 # signing region as well
2197 self._override_signing_region(request, accesspoint_region)
2198 return accesspoint_region
2199 return self._region
2201 def set_signer(self, context, **kwargs):
2202 if is_global_accesspoint(context):
2203 if HAS_CRT:
2204 return 's3v4a'
2205 else:
2206 raise MissingDependencyException(
2207 msg="Using S3 with an MRAP arn requires an additional "
2208 "dependency. You will need to pip install "
2209 "botocore[crt] before proceeding."
2210 )
2212 def _resolve_signing_name_for_accesspoint_endpoint(self, request):
2213 accesspoint_service = request.context['s3_accesspoint']['service']
2214 self._override_signing_name(request.context, accesspoint_service)
2216 def _switch_to_accesspoint_endpoint(self, request, region_name):
2217 original_components = urlsplit(request.url)
2218 accesspoint_endpoint = urlunsplit(
2219 (
2220 original_components.scheme,
2221 self._get_netloc(request.context, region_name),
2222 self._get_accesspoint_path(
2223 original_components.path, request.context
2224 ),
2225 original_components.query,
2226 '',
2227 )
2228 )
2229 logger.debug(
2230 f'Updating URI from {request.url} to {accesspoint_endpoint}'
2231 )
2232 request.url = accesspoint_endpoint
2234 def _get_netloc(self, request_context, region_name):
2235 if is_global_accesspoint(request_context):
2236 return self._get_mrap_netloc(request_context)
2237 else:
2238 return self._get_accesspoint_netloc(request_context, region_name)
2240 def _get_mrap_netloc(self, request_context):
2241 s3_accesspoint = request_context['s3_accesspoint']
2242 region_name = 's3-global'
2243 mrap_netloc_components = [s3_accesspoint['name']]
2244 if self._endpoint_url:
2245 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2246 mrap_netloc_components.append(endpoint_url_netloc)
2247 else:
2248 partition = s3_accesspoint['partition']
2249 mrap_netloc_components.extend(
2250 [
2251 'accesspoint',
2252 region_name,
2253 self._get_partition_dns_suffix(partition),
2254 ]
2255 )
2256 return '.'.join(mrap_netloc_components)
2258 def _get_accesspoint_netloc(self, request_context, region_name):
2259 s3_accesspoint = request_context['s3_accesspoint']
2260 accesspoint_netloc_components = [
2261 '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']),
2262 ]
2263 outpost_name = s3_accesspoint.get('outpost_name')
2264 if self._endpoint_url:
2265 if outpost_name:
2266 accesspoint_netloc_components.append(outpost_name)
2267 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2268 accesspoint_netloc_components.append(endpoint_url_netloc)
2269 else:
2270 if outpost_name:
2271 outpost_host = [outpost_name, 's3-outposts']
2272 accesspoint_netloc_components.extend(outpost_host)
2273 elif s3_accesspoint['service'] == 's3-object-lambda':
2274 component = self._inject_fips_if_needed(
2275 's3-object-lambda', request_context
2276 )
2277 accesspoint_netloc_components.append(component)
2278 else:
2279 component = self._inject_fips_if_needed(
2280 's3-accesspoint', request_context
2281 )
2282 accesspoint_netloc_components.append(component)
2283 if self._s3_config.get('use_dualstack_endpoint'):
2284 accesspoint_netloc_components.append('dualstack')
2285 accesspoint_netloc_components.extend(
2286 [region_name, self._get_dns_suffix(region_name)]
2287 )
2288 return '.'.join(accesspoint_netloc_components)
2290 def _inject_fips_if_needed(self, component, request_context):
2291 if self._use_fips_endpoint:
2292 return '%s-fips' % component
2293 return component
2295 def _get_accesspoint_path(self, original_path, request_context):
2296 # The Bucket parameter was substituted with the access-point name as
2297 # some value was required in serializing the bucket name. Now that
2298 # we are making the request directly to the access point, we will
2299 # want to remove that access-point name from the path.
2300 name = request_context['s3_accesspoint']['name']
2301 # All S3 operations require at least a / in their path.
2302 return original_path.replace('/' + name, '', 1) or '/'
2304 def _get_partition_dns_suffix(self, partition_name):
2305 dns_suffix = self._endpoint_resolver.get_partition_dns_suffix(
2306 partition_name
2307 )
2308 if dns_suffix is None:
2309 dns_suffix = self._DEFAULT_DNS_SUFFIX
2310 return dns_suffix
2312 def _get_dns_suffix(self, region_name):
2313 resolved = self._endpoint_resolver.construct_endpoint(
2314 's3', region_name
2315 )
2316 dns_suffix = self._DEFAULT_DNS_SUFFIX
2317 if resolved and 'dnsSuffix' in resolved:
2318 dns_suffix = resolved['dnsSuffix']
2319 return dns_suffix
2321 def _override_signing_region(self, request, region_name):
2322 signing_context = request.context.get('signing', {})
2323 # S3SigV4Auth will use the context['signing']['region'] value to
2324 # sign with if present. This is used by the Bucket redirector
2325 # as well but we should be fine because the redirector is never
2326 # used in combination with the accesspoint setting logic.
2327 signing_context['region'] = region_name
2328 request.context['signing'] = signing_context
2330 def _override_signing_name(self, context, signing_name):
2331 signing_context = context.get('signing', {})
2332 # S3SigV4Auth will use the context['signing']['signing_name'] value to
2333 # sign with if present. This is used by the Bucket redirector
2334 # as well but we should be fine because the redirector is never
2335 # used in combination with the accesspoint setting logic.
2336 signing_context['signing_name'] = signing_name
2337 context['signing'] = signing_context
2339 @CachedProperty
2340 def _use_accelerate_endpoint(self):
2341 # Enable accelerate if the configuration is set to true or the
2342 # endpoint being used matches one of the accelerate endpoints.
2344 # Accelerate has been explicitly configured.
2345 if self._s3_config.get('use_accelerate_endpoint'):
2346 return True
2348 # Accelerate mode is turned on automatically if an endpoint url is
2349 # provided that matches the accelerate scheme.
2350 if self._endpoint_url is None:
2351 return False
2353 # Accelerate is only valid for Amazon endpoints.
2354 netloc = urlsplit(self._endpoint_url).netloc
2355 if not netloc.endswith('amazonaws.com'):
2356 return False
2358 # The first part of the url should always be s3-accelerate.
2359 parts = netloc.split('.')
2360 if parts[0] != 's3-accelerate':
2361 return False
2363 # URL parts between 's3-accelerate' and 'amazonaws.com' represent
2364 # different URL features.
2365 feature_parts = parts[1:-2]
2367 # There should be no duplicate url parts.
2368 if len(feature_parts) != len(set(feature_parts)):
2369 return False
2371 # Remaining parts must all be in the whitelist.
2372 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts)
2374 @CachedProperty
2375 def _addressing_style(self):
2376 # Use virtual host style addressing if accelerate is enabled or if
2377 # the given endpoint url is an accelerate endpoint.
2378 if self._use_accelerate_endpoint:
2379 return 'virtual'
2381 # If a particular addressing style is configured, use it.
2382 configured_addressing_style = self._s3_config.get('addressing_style')
2383 if configured_addressing_style:
2384 return configured_addressing_style
2386 @CachedProperty
2387 def _s3_addressing_handler(self):
2388 # If virtual host style was configured, use it regardless of whether
2389 # or not the bucket looks dns compatible.
2390 if self._addressing_style == 'virtual':
2391 logger.debug("Using S3 virtual host style addressing.")
2392 return switch_to_virtual_host_style
2394 # If path style is configured, no additional steps are needed. If
2395 # endpoint_url was specified, don't default to virtual. We could
2396 # potentially default provided endpoint urls to virtual hosted
2397 # style, but for now it is avoided.
2398 if self._addressing_style == 'path' or self._endpoint_url is not None:
2399 logger.debug("Using S3 path style addressing.")
2400 return None
2402 logger.debug(
2403 "Defaulting to S3 virtual host style addressing with "
2404 "path style addressing fallback."
2405 )
2407 # By default, try to use virtual style with path fallback.
2408 return fix_s3_host
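# Sketch (hypothetical values): the host name _get_accesspoint_netloc builds
# for a plain access point, when no custom endpoint, outpost, FIPS, or
# dualstack options are in play, follows the pattern
# "<name>-<account>.s3-accesspoint.<region>.<dns suffix>".
name, account, region, dns_suffix = 'my-ap', '123456789012', 'us-west-2', 'amazonaws.com'
netloc = '.'.join([f'{name}-{account}', 's3-accesspoint', region, dns_suffix])
# netloc == 'my-ap-123456789012.s3-accesspoint.us-west-2.amazonaws.com'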
2411class S3ControlEndpointSetter:
2412 _DEFAULT_PARTITION = 'aws'
2413 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
2414 _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$')
2416 def __init__(
2417 self,
2418 endpoint_resolver,
2419 region=None,
2420 s3_config=None,
2421 endpoint_url=None,
2422 partition=None,
2423 use_fips_endpoint=False,
2424 ):
2425 self._endpoint_resolver = endpoint_resolver
2426 self._region = region
2427 self._s3_config = s3_config
2428 self._use_fips_endpoint = use_fips_endpoint
2429 if s3_config is None:
2430 self._s3_config = {}
2431 self._endpoint_url = endpoint_url
2432 self._partition = partition
2433 if partition is None:
2434 self._partition = self._DEFAULT_PARTITION
2436 def register(self, event_emitter):
2437 event_emitter.register('before-sign.s3-control', self.set_endpoint)
2439 def set_endpoint(self, request, **kwargs):
2440 if self._use_endpoint_from_arn_details(request):
2441 self._validate_endpoint_from_arn_details_supported(request)
2442 region_name = self._resolve_region_from_arn_details(request)
2443 self._resolve_signing_name_from_arn_details(request)
2444 self._resolve_endpoint_from_arn_details(request, region_name)
2445 self._add_headers_from_arn_details(request)
2446 elif self._use_endpoint_from_outpost_id(request):
2447 self._validate_outpost_redirection_valid(request)
2448 self._override_signing_name(request, 's3-outposts')
2449 new_netloc = self._construct_outpost_endpoint(self._region)
2450 self._update_request_netloc(request, new_netloc)
2452 def _use_endpoint_from_arn_details(self, request):
2453 return 'arn_details' in request.context
2455 def _use_endpoint_from_outpost_id(self, request):
2456 return 'outpost_id' in request.context
2458 def _validate_endpoint_from_arn_details_supported(self, request):
2459 if 'fips' in request.context['arn_details']['region']:
2460 raise UnsupportedS3ControlArnError(
2461 arn=request.context['arn_details']['original'],
2462 msg='Invalid ARN, FIPS region not allowed in ARN.',
2463 )
2464 if not self._s3_config.get('use_arn_region', False):
2465 arn_region = request.context['arn_details']['region']
2466 if arn_region != self._region:
2467 error_msg = (
2468 'The use_arn_region configuration is disabled but '
2469 'received arn for "%s" when the client is configured '
2470 'to use "%s"'
2471 ) % (arn_region, self._region)
2472 raise UnsupportedS3ControlConfigurationError(msg=error_msg)
2473 request_partition = request.context['arn_details']['partition']
2474 if request_partition != self._partition:
2475 raise UnsupportedS3ControlConfigurationError(
2476 msg=(
2477 'Client is configured for "%s" partition, but arn '
2478 'provided is for "%s" partition. The client and '
2479 'arn partition must be the same.'
2480 % (self._partition, request_partition)
2481 )
2482 )
2483 if self._s3_config.get('use_accelerate_endpoint'):
2484 raise UnsupportedS3ControlConfigurationError(
2485 msg='S3 control client does not support accelerate endpoints',
2486 )
2487 if 'outpost_name' in request.context['arn_details']:
2488 self._validate_outpost_redirection_valid(request)
2490 def _validate_outpost_redirection_valid(self, request):
2491 if self._s3_config.get('use_dualstack_endpoint'):
2492 raise UnsupportedS3ControlConfigurationError(
2493 msg=(
2494 'Client does not support s3 dualstack configuration '
2495 'when an outpost is specified.'
2496 )
2497 )
2499 def _resolve_region_from_arn_details(self, request):
2500 if self._s3_config.get('use_arn_region', False):
2501 arn_region = request.context['arn_details']['region']
2502 # If we are using the region from the expanded arn, we will also
2503 # want to make sure that we set it as the signing region as well
2504 self._override_signing_region(request, arn_region)
2505 return arn_region
2506 return self._region
2508 def _resolve_signing_name_from_arn_details(self, request):
2509 arn_service = request.context['arn_details']['service']
2510 self._override_signing_name(request, arn_service)
2511 return arn_service
2513 def _resolve_endpoint_from_arn_details(self, request, region_name):
2514 new_netloc = self._resolve_netloc_from_arn_details(
2515 request, region_name
2516 )
2517 self._update_request_netloc(request, new_netloc)
2519 def _update_request_netloc(self, request, new_netloc):
2520 original_components = urlsplit(request.url)
2521 arn_details_endpoint = urlunsplit(
2522 (
2523 original_components.scheme,
2524 new_netloc,
2525 original_components.path,
2526 original_components.query,
2527 '',
2528 )
2529 )
2530 logger.debug(
2531 f'Updating URI from {request.url} to {arn_details_endpoint}'
2532 )
2533 request.url = arn_details_endpoint
2535 def _resolve_netloc_from_arn_details(self, request, region_name):
2536 arn_details = request.context['arn_details']
2537 if 'outpost_name' in arn_details:
2538 return self._construct_outpost_endpoint(region_name)
2539 account = arn_details['account']
2540 return self._construct_s3_control_endpoint(region_name, account)
2542 def _is_valid_host_label(self, label):
2543 return self._HOST_LABEL_REGEX.match(label)
2545 def _validate_host_labels(self, *labels):
2546 for label in labels:
2547 if not self._is_valid_host_label(label):
2548 raise InvalidHostLabelError(label=label)
2550 def _construct_s3_control_endpoint(self, region_name, account):
2551 self._validate_host_labels(region_name, account)
2552 if self._endpoint_url:
2553 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2554 netloc = [account, endpoint_url_netloc]
2555 else:
2556 netloc = [
2557 account,
2558 's3-control',
2559 ]
2560 self._add_dualstack(netloc)
2561 dns_suffix = self._get_dns_suffix(region_name)
2562 netloc.extend([region_name, dns_suffix])
2563 return self._construct_netloc(netloc)
2565 def _construct_outpost_endpoint(self, region_name):
2566 self._validate_host_labels(region_name)
2567 if self._endpoint_url:
2568 return urlsplit(self._endpoint_url).netloc
2569 else:
2570 netloc = [
2571 's3-outposts',
2572 region_name,
2573 self._get_dns_suffix(region_name),
2574 ]
2575 self._add_fips(netloc)
2576 return self._construct_netloc(netloc)
2578 def _construct_netloc(self, netloc):
2579 return '.'.join(netloc)
2581 def _add_fips(self, netloc):
2582 if self._use_fips_endpoint:
2583 netloc[0] = netloc[0] + '-fips'
2585 def _add_dualstack(self, netloc):
2586 if self._s3_config.get('use_dualstack_endpoint'):
2587 netloc.append('dualstack')
2589 def _get_dns_suffix(self, region_name):
2590 resolved = self._endpoint_resolver.construct_endpoint(
2591 's3', region_name
2592 )
2593 dns_suffix = self._DEFAULT_DNS_SUFFIX
2594 if resolved and 'dnsSuffix' in resolved:
2595 dns_suffix = resolved['dnsSuffix']
2596 return dns_suffix
2598 def _override_signing_region(self, request, region_name):
2599 signing_context = request.context.get('signing', {})
2600 # S3SigV4Auth will use the context['signing']['region'] value to
2601 # sign with if present. This is used by the Bucket redirector
2602 # as well but we should be fine because the redirector is never
2603 # used in combination with the accesspoint setting logic.
2604 signing_context['region'] = region_name
2605 request.context['signing'] = signing_context
2607 def _override_signing_name(self, request, signing_name):
2608 signing_context = request.context.get('signing', {})
2609 # S3SigV4Auth will use the context['signing']['signing_name'] value to
2610 # sign with if present. This is used by the Bucket redirector
2611 # as well but we should be fine because the redirector is never
2612 # used in combination with the accesspoint setting logic.
2613 signing_context['signing_name'] = signing_name
2614 request.context['signing'] = signing_context
2616 def _add_headers_from_arn_details(self, request):
2617 arn_details = request.context['arn_details']
2618 outpost_name = arn_details.get('outpost_name')
2619 if outpost_name:
2620 self._add_outpost_id_header(request, outpost_name)
2622 def _add_outpost_id_header(self, request, outpost_name):
2623 request.headers['x-amz-outpost-id'] = outpost_name
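# Sketch (hypothetical values): the two host shapes S3ControlEndpointSetter
# builds when no custom endpoint_url is configured.
account, region, dns_suffix = '123456789012', 'us-west-2', 'amazonaws.com'
# _construct_s3_control_endpoint without dualstack:
s3_control_netloc = '.'.join([account, 's3-control', region, dns_suffix])
# -> '123456789012.s3-control.us-west-2.amazonaws.com'
# _construct_outpost_endpoint without FIPS; _add_outpost_id_header also adds
# an 'x-amz-outpost-id' header carrying the outpost name from the ARN:
outpost_netloc = '.'.join(['s3-outposts', region, dns_suffix])
# -> 's3-outposts.us-west-2.amazonaws.com'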
2626class S3ControlArnParamHandler:
2627 """This handler has been replaced by S3ControlArnParamHandlerv2. The
2628 original version remains in place for any third-party importers.
2629 """
2631 _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]')
2633 def __init__(self, arn_parser=None):
2634 self._arn_parser = arn_parser
2635 if arn_parser is None:
2636 self._arn_parser = ArnParser()
2637 warnings.warn(
2638 'The S3ControlArnParamHandler class has been deprecated for a new '
2639 'internal replacement. A future version of botocore may remove '
2640 'this class.',
2641 category=FutureWarning,
2642 )
2644 def register(self, event_emitter):
2645 event_emitter.register(
2646 'before-parameter-build.s3-control',
2647 self.handle_arn,
2648 )
2650 def handle_arn(self, params, model, context, **kwargs):
2651 if model.name in ('CreateBucket', 'ListRegionalBuckets'):
2652 # CreateBucket and ListRegionalBuckets are special cases that do
2653 # not obey ARN-based redirection but will redirect based on the
2654 # presence of the OutpostId parameter
2655 self._handle_outpost_id_param(params, model, context)
2656 else:
2657 self._handle_name_param(params, model, context)
2658 self._handle_bucket_param(params, model, context)
2660 def _get_arn_details_from_param(self, params, param_name):
2661 if param_name not in params:
2662 return None
2663 try:
2664 arn = params[param_name]
2665 arn_details = self._arn_parser.parse_arn(arn)
2666 arn_details['original'] = arn
2667 arn_details['resources'] = self._split_resource(arn_details)
2668 return arn_details
2669 except InvalidArnException:
2670 return None
2672 def _split_resource(self, arn_details):
2673 return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource'])
2675 def _override_account_id_param(self, params, arn_details):
2676 account_id = arn_details['account']
2677 if 'AccountId' in params and params['AccountId'] != account_id:
2678 error_msg = (
2679 'Account ID in arn does not match the AccountId parameter '
2680 'provided: "%s"'
2681 ) % params['AccountId']
2682 raise UnsupportedS3ControlArnError(
2683 arn=arn_details['original'],
2684 msg=error_msg,
2685 )
2686 params['AccountId'] = account_id
2688 def _handle_outpost_id_param(self, params, model, context):
2689 if 'OutpostId' not in params:
2690 return
2691 context['outpost_id'] = params['OutpostId']
2693 def _handle_name_param(self, params, model, context):
2694 # CreateAccessPoint is a special case that does not expand Name
2695 if model.name == 'CreateAccessPoint':
2696 return
2697 arn_details = self._get_arn_details_from_param(params, 'Name')
2698 if arn_details is None:
2699 return
2700 if self._is_outpost_accesspoint(arn_details):
2701 self._store_outpost_accesspoint(params, context, arn_details)
2702 else:
2703 error_msg = 'The Name parameter does not support the provided ARN'
2704 raise UnsupportedS3ControlArnError(
2705 arn=arn_details['original'],
2706 msg=error_msg,
2707 )
2709 def _is_outpost_accesspoint(self, arn_details):
2710 if arn_details['service'] != 's3-outposts':
2711 return False
2712 resources = arn_details['resources']
2713 if len(resources) != 4:
2714 return False
2715 # Resource must be of the form outpost/op-123/accesspoint/name
2716 return resources[0] == 'outpost' and resources[2] == 'accesspoint'
2718 def _store_outpost_accesspoint(self, params, context, arn_details):
2719 self._override_account_id_param(params, arn_details)
2720 accesspoint_name = arn_details['resources'][3]
2721 params['Name'] = accesspoint_name
2722 arn_details['accesspoint_name'] = accesspoint_name
2723 arn_details['outpost_name'] = arn_details['resources'][1]
2724 context['arn_details'] = arn_details
2726 def _handle_bucket_param(self, params, model, context):
2727 arn_details = self._get_arn_details_from_param(params, 'Bucket')
2728 if arn_details is None:
2729 return
2730 if self._is_outpost_bucket(arn_details):
2731 self._store_outpost_bucket(params, context, arn_details)
2732 else:
2733 error_msg = (
2734 'The Bucket parameter does not support the provided ARN'
2735 )
2736 raise UnsupportedS3ControlArnError(
2737 arn=arn_details['original'],
2738 msg=error_msg,
2739 )
2741 def _is_outpost_bucket(self, arn_details):
2742 if arn_details['service'] != 's3-outposts':
2743 return False
2744 resources = arn_details['resources']
2745 if len(resources) != 4:
2746 return False
2747 # Resource must be of the form outpost/op-123/bucket/name
2748 return resources[0] == 'outpost' and resources[2] == 'bucket'
2750 def _store_outpost_bucket(self, params, context, arn_details):
2751 self._override_account_id_param(params, arn_details)
2752 bucket_name = arn_details['resources'][3]
2753 params['Bucket'] = bucket_name
2754 arn_details['bucket_name'] = bucket_name
2755 arn_details['outpost_name'] = arn_details['resources'][1]
2756 context['arn_details'] = arn_details
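# Example (illustrative sketch; the ARN and names below are hypothetical):
# how an outpost bucket ARN is decomposed before _store_outpost_bucket runs.
import re
from botocore.utils import ArnParser

arn = ('arn:aws:s3-outposts:us-west-2:123456789012:'
       'outpost/op-0123456789abcdef0/bucket/my-bucket')
details = ArnParser().parse_arn(arn)
resources = re.split(r'[/:]', details['resource'])
# resources == ['outpost', 'op-0123456789abcdef0', 'bucket', 'my-bucket'], so
# _is_outpost_bucket is satisfied; _store_outpost_bucket then sets
# params['Bucket'] = 'my-bucket', params['AccountId'] = '123456789012', and
# records outpost_name 'op-0123456789abcdef0' on context['arn_details'].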
2759class S3ControlArnParamHandlerv2(S3ControlArnParamHandler):
2760 """Updated version of S3ControlArnParamHandler for use when
2761 EndpointRulesetResolver is in use for endpoint resolution.
2763 This class is considered private and subject to abrupt breaking changes or
2764 removal without prior announcement. Please do not use it directly.
2765 """
2767 def __init__(self, arn_parser=None):
2768 self._arn_parser = arn_parser
2769 if arn_parser is None:
2770 self._arn_parser = ArnParser()
2772 def register(self, event_emitter):
2773 event_emitter.register(
2774 'before-endpoint-resolution.s3-control',
2775 self.handle_arn,
2776 )
2778 def _handle_name_param(self, params, model, context):
2779 # CreateAccessPoint is a special case that does not expand Name
2780 if model.name == 'CreateAccessPoint':
2781 return
2782 arn_details = self._get_arn_details_from_param(params, 'Name')
2783 if arn_details is None:
2784 return
2785 self._raise_for_fips_pseudo_region(arn_details)
2786 self._raise_for_accelerate_endpoint(context)
2787 if self._is_outpost_accesspoint(arn_details):
2788 self._store_outpost_accesspoint(params, context, arn_details)
2789 else:
2790 error_msg = 'The Name parameter does not support the provided ARN'
2791 raise UnsupportedS3ControlArnError(
2792 arn=arn_details['original'],
2793 msg=error_msg,
2794 )
2796 def _store_outpost_accesspoint(self, params, context, arn_details):
2797 self._override_account_id_param(params, arn_details)
2799 def _handle_bucket_param(self, params, model, context):
2800 arn_details = self._get_arn_details_from_param(params, 'Bucket')
2801 if arn_details is None:
2802 return
2803 self._raise_for_fips_pseudo_region(arn_details)
2804 self._raise_for_accelerate_endpoint(context)
2805 if self._is_outpost_bucket(arn_details):
2806 self._store_outpost_bucket(params, context, arn_details)
2807 else:
2808 error_msg = (
2809 'The Bucket parameter does not support the provided ARN'
2810 )
2811 raise UnsupportedS3ControlArnError(
2812 arn=arn_details['original'],
2813 msg=error_msg,
2814 )
2816 def _store_outpost_bucket(self, params, context, arn_details):
2817 self._override_account_id_param(params, arn_details)
2819 def _raise_for_fips_pseudo_region(self, arn_details):
2820 # FIPS pseudo region names cannot be used in ARNs
2821 arn_region = arn_details['region']
2822 if arn_region.startswith('fips-') or arn_region.endswith('-fips'):
2823 raise UnsupportedS3ControlArnError(
2824 arn=arn_details['original'],
2825 msg='Invalid ARN, FIPS region not allowed in ARN.',
2826 )
2828 def _raise_for_accelerate_endpoint(self, context):
2829 s3_config = context['client_config'].s3 or {}
2830 if s3_config.get('use_accelerate_endpoint'):
2831 raise UnsupportedS3ControlConfigurationError(
2832 msg='S3 control client does not support accelerate endpoints',
2833 )
2836class ContainerMetadataFetcher:
2838 TIMEOUT_SECONDS = 2
2839 RETRY_ATTEMPTS = 3
2840 SLEEP_TIME = 1
2841 IP_ADDRESS = '169.254.170.2'
2842 _ALLOWED_HOSTS = [IP_ADDRESS, 'localhost', '127.0.0.1']
2844 def __init__(self, session=None, sleep=time.sleep):
2845 if session is None:
2846 session = botocore.httpsession.URLLib3Session(
2847 timeout=self.TIMEOUT_SECONDS
2848 )
2849 self._session = session
2850 self._sleep = sleep
2852 def retrieve_full_uri(self, full_url, headers=None):
2853 """Retrieve JSON metadata from container metadata.
2855 :type full_url: str
2856 :param full_url: The full URL of the metadata service.
2857 This should include the scheme as well, e.g.
2858 "http://localhost:123/foo"
2860 """
2861 self._validate_allowed_url(full_url)
2862 return self._retrieve_credentials(full_url, headers)
2864 def _validate_allowed_url(self, full_url):
2865 parsed = botocore.compat.urlparse(full_url)
2866 is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname)
2867 if not is_whitelisted_host:
2868 raise ValueError(
2869 "Unsupported host '%s'. Can only "
2870 "retrieve metadata from these hosts: %s"
2871 % (parsed.hostname, ', '.join(self._ALLOWED_HOSTS))
2872 )
2874 def _check_if_whitelisted_host(self, host):
2875 if host in self._ALLOWED_HOSTS:
2876 return True
2877 return False
2879 def retrieve_uri(self, relative_uri):
2880 """Retrieve JSON metadata from ECS metadata.
2882 :type relative_uri: str
2883 :param relative_uri: A relative URI, e.g. "/foo/bar?id=123"
2885 :return: The parsed JSON response.
2887 """
2888 full_url = self.full_url(relative_uri)
2889 return self._retrieve_credentials(full_url)
2891 def _retrieve_credentials(self, full_url, extra_headers=None):
2892 headers = {'Accept': 'application/json'}
2893 if extra_headers is not None:
2894 headers.update(extra_headers)
2895 attempts = 0
2896 while True:
2897 try:
2898 return self._get_response(
2899 full_url, headers, self.TIMEOUT_SECONDS
2900 )
2901 except MetadataRetrievalError as e:
2902 logger.debug(
2903 "Received error when attempting to retrieve "
2904 "container metadata: %s",
2905 e,
2906 exc_info=True,
2907 )
2908 self._sleep(self.SLEEP_TIME)
2909 attempts += 1
2910 if attempts >= self.RETRY_ATTEMPTS:
2911 raise
2913 def _get_response(self, full_url, headers, timeout):
2914 try:
2915 AWSRequest = botocore.awsrequest.AWSRequest
2916 request = AWSRequest(method='GET', url=full_url, headers=headers)
2917 response = self._session.send(request.prepare())
2918 response_text = response.content.decode('utf-8')
2919 if response.status_code != 200:
2920 raise MetadataRetrievalError(
2921 error_msg=(
2922 "Received non 200 response (%s) from ECS metadata: %s"
2923 )
2924 % (response.status_code, response_text)
2925 )
2926 try:
2927 return json.loads(response_text)
2928 except ValueError:
2929 error_msg = (
2930 "Unable to parse JSON returned from ECS metadata services"
2931 )
2932 logger.debug('%s:%s', error_msg, response_text)
2933 raise MetadataRetrievalError(error_msg=error_msg)
2934 except RETRYABLE_HTTP_ERRORS as e:
2935 error_msg = (
2936 "Received error when attempting to retrieve "
2937 "ECS metadata: %s" % e
2938 )
2939 raise MetadataRetrievalError(error_msg=error_msg)
2941 def full_url(self, relative_uri):
2942 return f'http://{self.IP_ADDRESS}{relative_uri}'
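# Example (illustrative sketch; the relative path below is hypothetical):
# relative URIs are expanded against the fixed ECS link-local address, and
# retrieve_full_uri() rejects any host outside _ALLOWED_HOSTS.
from botocore.utils import ContainerMetadataFetcher

fetcher = ContainerMetadataFetcher()
url = fetcher.full_url('/v2/credentials/example-id')
# url == 'http://169.254.170.2/v2/credentials/example-id'
# fetcher.retrieve_full_uri('http://example.com/creds') would raise a
# ValueError before any request is made, since the host is not allowed.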
2945def get_environ_proxies(url):
2946 if should_bypass_proxies(url):
2947 return {}
2948 else:
2949 return getproxies()
2952def should_bypass_proxies(url):
2953 """
2954 Returns whether we should bypass proxies or not.
2955 """
2956 # NOTE: requests allows IP/CIDR entries in the no_proxy env var, which we
2957 # don't currently support, as urllib only checks the DNS suffix
2958 # If the system proxy settings indicate that this URL should be bypassed,
2959 # don't proxy.
2960 # The proxy_bypass function is incredibly buggy on OS X in early versions
2961 # of Python 2.6, so allow this call to fail. Only catch the specific
2962 # exceptions we've seen, though: this call failing in other ways can reveal
2963 # legitimate problems.
2964 try:
2965 if proxy_bypass(urlparse(url).netloc):
2966 return True
2967 except (TypeError, socket.gaierror):
2968 pass
2970 return False
2973def determine_content_length(body):
2974 # No body, content length of 0
2975 if not body:
2976 return 0
2978 # Try asking the body for its length
2979 try:
2980 return len(body)
2981 except (AttributeError, TypeError):
2982 pass
2984 # Try getting the length from a seekable stream
2985 if hasattr(body, 'seek') and hasattr(body, 'tell'):
2986 try:
2987 orig_pos = body.tell()
2988 body.seek(0, 2)
2989 end_file_pos = body.tell()
2990 body.seek(orig_pos)
2991 return end_file_pos - orig_pos
2992 except io.UnsupportedOperation:
2993 # in case when body is, for example, io.BufferedIOBase object
2994 # it has "seek" method which throws "UnsupportedOperation"
2995 # exception in such case we want to fall back to "chunked"
2996 # encoding
2997 pass
2998 # Failed to determine the length
2999 return None
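# Example (illustrative sketch) of the three outcomes of
# determine_content_length: len() when available, seek/tell for seekable
# streams, and None when the length cannot be determined.
import io
from botocore.utils import determine_content_length

assert determine_content_length(b'hello') == 5
assert determine_content_length(io.BytesIO(b'hello')) == 5
assert determine_content_length(iter([b'a', b'b'])) is None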
3002def get_encoding_from_headers(headers, default='ISO-8859-1'):
3003 """Returns encodings from given HTTP Header Dict.
3005 :param headers: dictionary to extract encoding from.
3006 :param default: default encoding if the content-type is text
3007 """
3009 content_type = headers.get('content-type')
3011 if not content_type:
3012 return None
3014 message = email.message.Message()
3015 message['content-type'] = content_type
3016 charset = message.get_param("charset")
3018 if charset is not None:
3019 return charset
3021 if 'text' in content_type:
3022 return default
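# Example (illustrative sketch): charset extraction from a Content-Type
# header, falling back to the default only for text content types.
from botocore.utils import get_encoding_from_headers

assert get_encoding_from_headers({'content-type': 'text/html; charset=utf-8'}) == 'utf-8'
assert get_encoding_from_headers({'content-type': 'text/html'}) == 'ISO-8859-1'
assert get_encoding_from_headers({}) is None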
3025def calculate_md5(body, **kwargs):
3026 if isinstance(body, (bytes, bytearray)):
3027 binary_md5 = _calculate_md5_from_bytes(body)
3028 else:
3029 binary_md5 = _calculate_md5_from_file(body)
3030 return base64.b64encode(binary_md5).decode('ascii')
3033def _calculate_md5_from_bytes(body_bytes):
3034 md5 = get_md5(body_bytes)
3035 return md5.digest()
3038def _calculate_md5_from_file(fileobj):
3039 start_position = fileobj.tell()
3040 md5 = get_md5()
3041 for chunk in iter(lambda: fileobj.read(1024 * 1024), b''):
3042 md5.update(chunk)
3043 fileobj.seek(start_position)
3044 return md5.digest()
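# Example (illustrative sketch, assuming MD5 is available on this platform):
# calculate_md5 accepts either bytes or a seekable file object and returns
# the base64-encoded digest that S3 expects in a Content-MD5 header.
import io
from botocore.utils import calculate_md5

assert calculate_md5(b'hello world') == 'XrY7u+Ae7tCTyyK7j1rNww=='
body = io.BytesIO(b'hello world')
assert calculate_md5(body) == 'XrY7u+Ae7tCTyyK7j1rNww=='
assert body.tell() == 0  # the stream position is restored after hashing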
3047def conditionally_calculate_md5(params, **kwargs):
3048 """Only add a Content-MD5 if the system supports it."""
3049 headers = params['headers']
3050 body = params['body']
3051 checksum_context = params.get('context', {}).get('checksum', {})
3052 checksum_algorithm = checksum_context.get('request_algorithm')
3053 if checksum_algorithm and checksum_algorithm != 'conditional-md5':
3054 # Skip for requests that will have a flexible checksum applied
3055 return
3056 # If a header matching the x-amz-checksum-* pattern is present, we
3057 # assume a checksum has already been provided and an md5 is not needed
3058 for header in headers:
3059 if CHECKSUM_HEADER_PATTERN.match(header):
3060 return
3061 if MD5_AVAILABLE and body is not None and 'Content-MD5' not in headers:
3062 md5_digest = calculate_md5(body, **kwargs)
3063 params['headers']['Content-MD5'] = md5_digest
3066class FileWebIdentityTokenLoader:
3067 def __init__(self, web_identity_token_path, _open=open):
3068 self._web_identity_token_path = web_identity_token_path
3069 self._open = _open
3071 def __call__(self):
3072 with self._open(self._web_identity_token_path) as token_file:
3073 return token_file.read()
3076class SSOTokenLoader:
3077 def __init__(self, cache=None):
3078 if cache is None:
3079 cache = {}
3080 self._cache = cache
3082 def _generate_cache_key(self, start_url, session_name):
3083 input_str = start_url
3084 if session_name is not None:
3085 input_str = session_name
3086 return hashlib.sha1(input_str.encode('utf-8')).hexdigest()
3088 def save_token(self, start_url, token, session_name=None):
3089 cache_key = self._generate_cache_key(start_url, session_name)
3090 self._cache[cache_key] = token
3092 def __call__(self, start_url, session_name=None):
3093 cache_key = self._generate_cache_key(start_url, session_name)
3094 logger.debug(f'Checking for cached token at: {cache_key}')
3095 if cache_key not in self._cache:
3096 name = start_url
3097 if session_name is not None:
3098 name = session_name
3099 error_msg = f'Token for {name} does not exist'
3100 raise SSOTokenLoadError(error_msg=error_msg)
3102 token = self._cache[cache_key]
3103 if 'accessToken' not in token or 'expiresAt' not in token:
3104 error_msg = f'Token for {start_url} is invalid'
3105 raise SSOTokenLoadError(error_msg=error_msg)
3106 return token
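# Example (illustrative sketch; the start URL and token below are
# hypothetical): tokens are cached under the SHA-1 hex digest of the start
# URL, or of the sso_session name when one is supplied.
import hashlib
from botocore.utils import SSOTokenLoader

cache = {}
loader = SSOTokenLoader(cache=cache)
token = {'accessToken': 'example-token', 'expiresAt': '2030-01-01T00:00:00Z'}
loader.save_token('https://example.awsapps.com/start', token)
key = hashlib.sha1(b'https://example.awsapps.com/start').hexdigest()
assert cache[key] == token
assert loader('https://example.awsapps.com/start') == token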
3109class EventbridgeSignerSetter:
3110 _DEFAULT_PARTITION = 'aws'
3111 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
3113 def __init__(self, endpoint_resolver, region=None, endpoint_url=None):
3114 self._endpoint_resolver = endpoint_resolver
3115 self._region = region
3116 self._endpoint_url = endpoint_url
3118 def register(self, event_emitter):
3119 event_emitter.register(
3120 'before-parameter-build.events.PutEvents',
3121 self.check_for_global_endpoint,
3122 )
3123 event_emitter.register(
3124 'before-call.events.PutEvents', self.set_endpoint_url
3125 )
3127 def set_endpoint_url(self, params, context, **kwargs):
3128 if 'eventbridge_endpoint' in context:
3129 endpoint = context['eventbridge_endpoint']
3130 logger.debug(f"Rewriting URL from {params['url']} to {endpoint}")
3131 params['url'] = endpoint
3133 def check_for_global_endpoint(self, params, context, **kwargs):
3134 endpoint = params.get('EndpointId')
3135 if endpoint is None:
3136 return
3138 if len(endpoint) == 0:
3139 raise InvalidEndpointConfigurationError(
3140 msg='EndpointId must not be a zero length string'
3141 )
3143 if not HAS_CRT:
3144 raise MissingDependencyException(
3145 msg="Using EndpointId requires an additional "
3146 "dependency. You will need to pip install "
3147 "botocore[crt] before proceeding."
3148 )
3150 config = context.get('client_config')
3151 endpoint_variant_tags = None
3152 if config is not None:
3153 if config.use_fips_endpoint:
3154 raise InvalidEndpointConfigurationError(
3155 msg="FIPS is not supported with EventBridge "
3156 "multi-region endpoints."
3157 )
3158 if config.use_dualstack_endpoint:
3159 endpoint_variant_tags = ['dualstack']
3161 if self._endpoint_url is None:
3162 # Validate endpoint is a valid hostname component
3163 parts = urlparse(f'https://{endpoint}')
3164 if parts.hostname != endpoint:
3165 raise InvalidEndpointConfigurationError(
3166 msg='EndpointId is not a valid hostname component.'
3167 )
3168 resolved_endpoint = self._get_global_endpoint(
3169 endpoint, endpoint_variant_tags=endpoint_variant_tags
3170 )
3171 else:
3172 resolved_endpoint = self._endpoint_url
3174 context['eventbridge_endpoint'] = resolved_endpoint
3175 context['auth_type'] = 'v4a'
3177 def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None):
3178 resolver = self._endpoint_resolver
3180 partition = resolver.get_partition_for_region(self._region)
3181 if partition is None:
3182 partition = self._DEFAULT_PARTITION
3183 dns_suffix = resolver.get_partition_dns_suffix(
3184 partition, endpoint_variant_tags=endpoint_variant_tags
3185 )
3186 if dns_suffix is None:
3187 dns_suffix = self._DEFAULT_DNS_SUFFIX
3189 return f"https://{endpoint}.endpoint.events.{dns_suffix}/"
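# Sketch (hypothetical EndpointId): the global endpoint produced by
# _get_global_endpoint when no endpoint_url override is configured. Requests
# carrying an EndpointId are also switched to SigV4a ('auth_type' = 'v4a'),
# which requires the optional botocore[crt] dependency.
endpoint_id, dns_suffix = 'abcde.example', 'amazonaws.com'
global_endpoint = f"https://{endpoint_id}.endpoint.events.{dns_suffix}/"
# -> 'https://abcde.example.endpoint.events.amazonaws.com/'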
3192def is_s3_accelerate_url(url):
3193 """Does the URL match the S3 Accelerate endpoint scheme?
3195 Virtual host style URLs with bucket names in the netloc part of the URL
3196 are not recognized by this function.
3197 """
3198 if url is None:
3199 return False
3201 # Accelerate is only valid for Amazon endpoints.
3202 url_parts = urlsplit(url)
3203 if not url_parts.netloc.endswith(
3204 'amazonaws.com'
3205 ) or url_parts.scheme not in ['https', 'http']:
3206 return False
3208 # The first part of the URL must be s3-accelerate.
3209 parts = url_parts.netloc.split('.')
3210 if parts[0] != 's3-accelerate':
3211 return False
3213 # URL parts between 's3-accelerate' and 'amazonaws.com' represent
3214 # different URL features.
3215 feature_parts = parts[1:-2]
3217 # There should be no duplicate URL parts.
3218 if len(feature_parts) != len(set(feature_parts)):
3219 return False
3221 # Remaining parts must all be in the whitelist.
3222 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts)
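# Example (illustrative sketch): which endpoint URLs is_s3_accelerate_url
# accepts. Only hosts whose first label is exactly 's3-accelerate' qualify,
# optionally with a single 'dualstack' label before 'amazonaws.com'.
from botocore.utils import is_s3_accelerate_url

assert is_s3_accelerate_url('https://s3-accelerate.amazonaws.com')
assert is_s3_accelerate_url('https://s3-accelerate.dualstack.amazonaws.com')
assert not is_s3_accelerate_url('https://bucket.s3-accelerate.amazonaws.com')
assert not is_s3_accelerate_url('https://s3-accelerate.dualstack.dualstack.amazonaws.com')
assert not is_s3_accelerate_url(None)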
3225class JSONFileCache:
3226 """JSON file cache.
3227 This provides a dict like interface that stores JSON serializable
3228 objects.
3229 The objects are serialized to JSON and stored in a file. These
3230 values can be retrieved at a later time.
3231 """
3233 CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache'))
3235 def __init__(self, working_dir=CACHE_DIR, dumps_func=None):
3236 self._working_dir = working_dir
3237 if dumps_func is None:
3238 dumps_func = self._default_dumps
3239 self._dumps = dumps_func
3241 def _default_dumps(self, obj):
3242 return json.dumps(obj, default=self._serialize_if_needed)
3244 def __contains__(self, cache_key):
3245 actual_key = self._convert_cache_key(cache_key)
3246 return os.path.isfile(actual_key)
3248 def __getitem__(self, cache_key):
3249 """Retrieve value from a cache key."""
3250 actual_key = self._convert_cache_key(cache_key)
3251 try:
3252 with open(actual_key) as f:
3253 return json.load(f)
3254 except (OSError, ValueError):
3255 raise KeyError(cache_key)
3257 def __delitem__(self, cache_key):
3258 actual_key = self._convert_cache_key(cache_key)
3259 try:
3260 key_path = Path(actual_key)
3261 key_path.unlink()
3262 except FileNotFoundError:
3263 raise KeyError(cache_key)
3265 def __setitem__(self, cache_key, value):
3266 full_key = self._convert_cache_key(cache_key)
3267 try:
3268 file_content = self._dumps(value)
3269 except (TypeError, ValueError):
3270 raise ValueError(
3271 f"Value cannot be cached, must be "
3272 f"JSON serializable: {value}"
3273 )
3274 if not os.path.isdir(self._working_dir):
3275 os.makedirs(self._working_dir)
3276 with os.fdopen(
3277 os.open(full_key, os.O_WRONLY | os.O_CREAT, 0o600), 'w'
3278 ) as f:
3279 f.truncate()
3280 f.write(file_content)
3282 def _convert_cache_key(self, cache_key):
3283 full_path = os.path.join(self._working_dir, cache_key + '.json')
3284 return full_path
3286 def _serialize_if_needed(self, value, iso=False):
3287 if isinstance(value, datetime.datetime):
3288 if iso:
3289 return value.isoformat()
3290 return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
3291 return value
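# Example (illustrative sketch; the key and value below are hypothetical):
# JSONFileCache behaves like a dict backed by one JSON file per key under
# its working directory.
import tempfile
from botocore.utils import JSONFileCache

cache = JSONFileCache(working_dir=tempfile.mkdtemp())
cache['credentials'] = {'AccessKeyId': 'EXAMPLE'}
assert 'credentials' in cache
assert cache['credentials'] == {'AccessKeyId': 'EXAMPLE'}
del cache['credentials']
assert 'credentials' not in cache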