Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/utils.py: 21%
1690 statements
coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import base64
14import binascii
15import datetime
16import email.message
17import functools
18import hashlib
19import io
20import logging
21import os
22import random
23import re
24import socket
25import time
26import warnings
27import weakref
28from datetime import datetime as _DatetimeClass
29from ipaddress import ip_address
30from pathlib import Path
31from urllib.request import getproxies, proxy_bypass
33import dateutil.parser
34from dateutil.tz import tzutc
35from urllib3.exceptions import LocationParseError
37import botocore
38import botocore.awsrequest
39import botocore.httpsession
41# IP Regexes retained for backwards compatibility
42from botocore.compat import HEX_PAT # noqa: F401
43from botocore.compat import IPV4_PAT # noqa: F401
44from botocore.compat import IPV6_ADDRZ_PAT # noqa: F401
45from botocore.compat import IPV6_PAT # noqa: F401
46from botocore.compat import LS32_PAT # noqa: F401
47from botocore.compat import UNRESERVED_PAT # noqa: F401
48from botocore.compat import ZONE_ID_PAT # noqa: F401
49from botocore.compat import (
50 HAS_CRT,
51 IPV4_RE,
52 IPV6_ADDRZ_RE,
53 MD5_AVAILABLE,
54 UNSAFE_URL_CHARS,
55 OrderedDict,
56 get_md5,
57 get_tzinfo_options,
58 json,
59 quote,
60 urlparse,
61 urlsplit,
62 urlunsplit,
63 zip_longest,
64)
65from botocore.exceptions import (
66 ClientError,
67 ConfigNotFound,
68 ConnectionClosedError,
69 ConnectTimeoutError,
70 EndpointConnectionError,
71 HTTPClientError,
72 InvalidDNSNameError,
73 InvalidEndpointConfigurationError,
74 InvalidExpressionError,
75 InvalidHostLabelError,
76 InvalidIMDSEndpointError,
77 InvalidIMDSEndpointModeError,
78 InvalidRegionError,
79 MetadataRetrievalError,
80 MissingDependencyException,
81 ReadTimeoutError,
82 SSOTokenLoadError,
83 UnsupportedOutpostResourceError,
84 UnsupportedS3AccesspointConfigurationError,
85 UnsupportedS3ArnError,
86 UnsupportedS3ConfigurationError,
87 UnsupportedS3ControlArnError,
88 UnsupportedS3ControlConfigurationError,
89)
91logger = logging.getLogger(__name__)
92DEFAULT_METADATA_SERVICE_TIMEOUT = 1
93METADATA_BASE_URL = 'http://169.254.169.254/'
94METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/'
95METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6')
97# These are chars that do not need to be urlencoded.
98# Based on rfc2986, section 2.3
99SAFE_CHARS = '-._~'
100LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]')
101RETRYABLE_HTTP_ERRORS = (
102 ReadTimeoutError,
103 EndpointConnectionError,
104 ConnectionClosedError,
105 ConnectTimeoutError,
106)
107S3_ACCELERATE_WHITELIST = ['dualstack']
108# In switching events from using service name / endpoint prefix to service
109# id, we have to preserve compatibility. This maps the instances where either
110# is different than the transformed service id.
111EVENT_ALIASES = {
112 "a4b": "alexa-for-business",
113 "alexaforbusiness": "alexa-for-business",
114 "api.mediatailor": "mediatailor",
115 "api.pricing": "pricing",
116 "api.sagemaker": "sagemaker",
117 "apigateway": "api-gateway",
118 "application-autoscaling": "application-auto-scaling",
119 "appstream2": "appstream",
120 "autoscaling": "auto-scaling",
121 "autoscaling-plans": "auto-scaling-plans",
122 "ce": "cost-explorer",
123 "cloudhsmv2": "cloudhsm-v2",
124 "cloudsearchdomain": "cloudsearch-domain",
125 "cognito-idp": "cognito-identity-provider",
126 "config": "config-service",
127 "cur": "cost-and-usage-report-service",
128 "data.iot": "iot-data-plane",
129 "data.jobs.iot": "iot-jobs-data-plane",
130 "data.mediastore": "mediastore-data",
131 "datapipeline": "data-pipeline",
132 "devicefarm": "device-farm",
133 "devices.iot1click": "iot-1click-devices-service",
134 "directconnect": "direct-connect",
135 "discovery": "application-discovery-service",
136 "dms": "database-migration-service",
137 "ds": "directory-service",
138 "dynamodbstreams": "dynamodb-streams",
139 "elasticbeanstalk": "elastic-beanstalk",
140 "elasticfilesystem": "efs",
141 "elasticloadbalancing": "elastic-load-balancing",
142 "elasticmapreduce": "emr",
143 "elastictranscoder": "elastic-transcoder",
144 "elb": "elastic-load-balancing",
145 "elbv2": "elastic-load-balancing-v2",
146 "email": "ses",
147 "entitlement.marketplace": "marketplace-entitlement-service",
148 "es": "elasticsearch-service",
149 "events": "eventbridge",
150 "cloudwatch-events": "eventbridge",
151 "iot-data": "iot-data-plane",
152 "iot-jobs-data": "iot-jobs-data-plane",
153 "iot1click-devices": "iot-1click-devices-service",
154 "iot1click-projects": "iot-1click-projects",
155 "kinesisanalytics": "kinesis-analytics",
156 "kinesisvideo": "kinesis-video",
157 "lex-models": "lex-model-building-service",
158 "lex-runtime": "lex-runtime-service",
159 "logs": "cloudwatch-logs",
160 "machinelearning": "machine-learning",
161 "marketplace-entitlement": "marketplace-entitlement-service",
162 "marketplacecommerceanalytics": "marketplace-commerce-analytics",
163 "metering.marketplace": "marketplace-metering",
164 "meteringmarketplace": "marketplace-metering",
165 "mgh": "migration-hub",
166 "models.lex": "lex-model-building-service",
167 "monitoring": "cloudwatch",
168 "mturk-requester": "mturk",
169 "opsworks-cm": "opsworkscm",
170 "projects.iot1click": "iot-1click-projects",
171 "resourcegroupstaggingapi": "resource-groups-tagging-api",
172 "route53": "route-53",
173 "route53domains": "route-53-domains",
174 "runtime.lex": "lex-runtime-service",
175 "runtime.sagemaker": "sagemaker-runtime",
176 "sdb": "simpledb",
177 "secretsmanager": "secrets-manager",
178 "serverlessrepo": "serverlessapplicationrepository",
179 "servicecatalog": "service-catalog",
180 "states": "sfn",
181 "stepfunctions": "sfn",
182 "storagegateway": "storage-gateway",
183 "streams.dynamodb": "dynamodb-streams",
184 "tagging": "resource-groups-tagging-api",
185}
188# This pattern can be used to detect if a header is a flexible checksum header
189CHECKSUM_HEADER_PATTERN = re.compile(
190 r'^X-Amz-Checksum-([a-z0-9]*)$',
191 flags=re.IGNORECASE,
192)
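# For example (illustrative, added for clarity; matching is case-insensitive):
# CHECKSUM_HEADER_PATTERN.match('x-amz-checksum-crc32').group(1) == 'crc32'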
195def ensure_boolean(val):
196 """Ensures a boolean value if a string or boolean is provided
198 For strings, the value for True/False is case insensitive
199 """
200 if isinstance(val, bool):
201 return val
202 elif isinstance(val, str):
203 return val.lower() == 'true'
204 else:
205 return False
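# Illustrative usage (added for clarity, not part of botocore):
#     >>> ensure_boolean('TRUE')
#     True
#     >>> ensure_boolean('no')
#     False
#     >>> ensure_boolean(1)
#     False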
208def resolve_imds_endpoint_mode(session):
209    """Resolve the IMDS endpoint mode to either IPv6 or IPv4.
211 ec2_metadata_service_endpoint_mode takes precedence over imds_use_ipv6.
212 """
213 endpoint_mode = session.get_config_variable(
214 'ec2_metadata_service_endpoint_mode'
215 )
216 if endpoint_mode is not None:
217 lendpoint_mode = endpoint_mode.lower()
218 if lendpoint_mode not in METADATA_ENDPOINT_MODES:
219 error_msg_kwargs = {
220 'mode': endpoint_mode,
221 'valid_modes': METADATA_ENDPOINT_MODES,
222 }
223 raise InvalidIMDSEndpointModeError(**error_msg_kwargs)
224 return lendpoint_mode
225 elif session.get_config_variable('imds_use_ipv6'):
226 return 'ipv6'
227 return 'ipv4'
230def is_json_value_header(shape):
231 """Determines if the provided shape is the special header type jsonvalue.
233 :type shape: botocore.shape
234 :param shape: Shape to be inspected for the jsonvalue trait.
236 :return: True if this type is a jsonvalue, False otherwise
237 :rtype: Bool
238 """
239 return (
240 hasattr(shape, 'serialization')
241 and shape.serialization.get('jsonvalue', False)
242 and shape.serialization.get('location') == 'header'
243 and shape.type_name == 'string'
244 )
247def has_header(header_name, headers):
248 """Case-insensitive check for header key."""
249 if header_name is None:
250 return False
251 elif isinstance(headers, botocore.awsrequest.HeadersDict):
252 return header_name in headers
253 else:
254 return header_name.lower() in [key.lower() for key in headers.keys()]
257def get_service_module_name(service_model):
258 """Returns the module name for a service
260 This is the value used in both the documentation and client class name
261 """
262 name = service_model.metadata.get(
263 'serviceAbbreviation',
264 service_model.metadata.get(
265 'serviceFullName', service_model.service_name
266 ),
267 )
268 name = name.replace('Amazon', '')
269 name = name.replace('AWS', '')
270 name = re.sub(r'\W+', '', name)
271 return name
274def normalize_url_path(path):
275 if not path:
276 return '/'
277 return remove_dot_segments(path)
280def normalize_boolean(val):
281    """Returns None if val is None; otherwise ensures the value is
282    converted to a boolean"""
283 if val is None:
284 return val
285 else:
286 return ensure_boolean(val)
289def remove_dot_segments(url):
290 # RFC 3986, section 5.2.4 "Remove Dot Segments"
291 # Also, AWS services require consecutive slashes to be removed,
292 # so that's done here as well
293 if not url:
294 return ''
295 input_url = url.split('/')
296 output_list = []
297 for x in input_url:
298 if x and x != '.':
299 if x == '..':
300 if output_list:
301 output_list.pop()
302 else:
303 output_list.append(x)
305 if url[0] == '/':
306 first = '/'
307 else:
308 first = ''
309 if url[-1] == '/' and output_list:
310 last = '/'
311 else:
312 last = ''
313 return first + '/'.join(output_list) + last
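# Illustrative behavior (added for clarity, not part of botocore):
#     >>> remove_dot_segments('/a/b/../c')
#     '/a/c'
#     >>> remove_dot_segments('foo//bar/')
#     'foo/bar/'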
316def validate_jmespath_for_set(expression):
317 # Validates a limited jmespath expression to determine if we can set a
318 # value based on it. Only works with dotted paths.
319 if not expression or expression == '.':
320 raise InvalidExpressionError(expression=expression)
322 for invalid in ['[', ']', '*']:
323 if invalid in expression:
324 raise InvalidExpressionError(expression=expression)
327def set_value_from_jmespath(source, expression, value, is_first=True):
328 # This takes a (limited) jmespath-like expression & can set a value based
329 # on it.
330 # Limitations:
331 # * Only handles dotted lookups
332 # * No offsets/wildcards/slices/etc.
333 if is_first:
334 validate_jmespath_for_set(expression)
336 bits = expression.split('.', 1)
337 current_key, remainder = bits[0], bits[1] if len(bits) > 1 else ''
339 if not current_key:
340 raise InvalidExpressionError(expression=expression)
342 if remainder:
343 if current_key not in source:
344 # We've got something in the expression that's not present in the
345 # source (new key). If there's any more bits, we'll set the key
346 # with an empty dictionary.
347 source[current_key] = {}
349 return set_value_from_jmespath(
350 source[current_key], remainder, value, is_first=False
351 )
353 # If we're down to a single key, set it.
354 source[current_key] = value
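# Illustrative usage (added for clarity, not part of botocore):
#     >>> d = {}
#     >>> set_value_from_jmespath(d, 'foo.bar.baz', 42)
#     >>> d
#     {'foo': {'bar': {'baz': 42}}}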
357def is_global_accesspoint(context):
358 """Determine if request is intended for an MRAP accesspoint."""
359 s3_accesspoint = context.get('s3_accesspoint', {})
360 is_global = s3_accesspoint.get('region') == ''
361 return is_global
364class _RetriesExceededError(Exception):
365 """Internal exception used when the number of retries are exceeded."""
367 pass
370class BadIMDSRequestError(Exception):
371 def __init__(self, request):
372 self.request = request
375class IMDSFetcher:
376 _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError
377 _TOKEN_PATH = 'latest/api/token'
378 _TOKEN_TTL = '21600'
380 def __init__(
381 self,
382 timeout=DEFAULT_METADATA_SERVICE_TIMEOUT,
383 num_attempts=1,
384 base_url=METADATA_BASE_URL,
385 env=None,
386 user_agent=None,
387 config=None,
388 ):
389 self._timeout = timeout
390 self._num_attempts = num_attempts
391 if config is None:
392 config = {}
393 self._base_url = self._select_base_url(base_url, config)
394 self._config = config
396 if env is None:
397 env = os.environ.copy()
398 self._disabled = (
399 env.get('AWS_EC2_METADATA_DISABLED', 'false').lower() == 'true'
400 )
401 self._imds_v1_disabled = config.get('ec2_metadata_v1_disabled')
402 self._user_agent = user_agent
403 self._session = botocore.httpsession.URLLib3Session(
404 timeout=self._timeout,
405 proxies=get_environ_proxies(self._base_url),
406 )
408 def get_base_url(self):
409 return self._base_url
411 def _select_base_url(self, base_url, config):
412 if config is None:
413 config = {}
415 requires_ipv6 = (
416 config.get('ec2_metadata_service_endpoint_mode') == 'ipv6'
417 )
418 custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint')
420 if requires_ipv6 and custom_metadata_endpoint:
421 logger.warning(
422 "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint."
423 )
425 chosen_base_url = None
427 if base_url != METADATA_BASE_URL:
428 chosen_base_url = base_url
429 elif custom_metadata_endpoint:
430 chosen_base_url = custom_metadata_endpoint
431 elif requires_ipv6:
432 chosen_base_url = METADATA_BASE_URL_IPv6
433 else:
434 chosen_base_url = METADATA_BASE_URL
436 logger.debug("IMDS ENDPOINT: %s" % chosen_base_url)
437 if not is_valid_uri(chosen_base_url):
438 raise InvalidIMDSEndpointError(endpoint=chosen_base_url)
440 return chosen_base_url
442 def _construct_url(self, path):
443 sep = ''
444 if self._base_url and not self._base_url.endswith('/'):
445 sep = '/'
446 return f'{self._base_url}{sep}{path}'
448 def _fetch_metadata_token(self):
449 self._assert_enabled()
450 url = self._construct_url(self._TOKEN_PATH)
451 headers = {
452 'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL,
453 }
454 self._add_user_agent(headers)
455 request = botocore.awsrequest.AWSRequest(
456 method='PUT', url=url, headers=headers
457 )
458 for i in range(self._num_attempts):
459 try:
460 response = self._session.send(request.prepare())
461 if response.status_code == 200:
462 return response.text
463 elif response.status_code in (404, 403, 405):
464 return None
465 elif response.status_code in (400,):
466 raise BadIMDSRequestError(request)
467 except ReadTimeoutError:
468 return None
469 except RETRYABLE_HTTP_ERRORS as e:
470 logger.debug(
471 "Caught retryable HTTP exception while making metadata "
472 "service request to %s: %s",
473 url,
474 e,
475 exc_info=True,
476 )
477 except HTTPClientError as e:
478 if isinstance(e.kwargs.get('error'), LocationParseError):
479 raise InvalidIMDSEndpointError(endpoint=url, error=e)
480 else:
481 raise
482 return None
484 def _get_request(self, url_path, retry_func, token=None):
485 """Make a get request to the Instance Metadata Service.
487 :type url_path: str
488 :param url_path: The path component of the URL to make a get request.
489 This arg is appended to the base_url that was provided in the
490 initializer.
492 :type retry_func: callable
493 :param retry_func: A function that takes the response as an argument
494 and determines if it needs to retry. By default empty and non
495 200 OK responses are retried.
497 :type token: str
498 :param token: Metadata token to send along with GET requests to IMDS.
499 """
500 self._assert_enabled()
501 if not token:
502 self._assert_v1_enabled()
503 if retry_func is None:
504 retry_func = self._default_retry
505 url = self._construct_url(url_path)
506 headers = {}
507 if token is not None:
508 headers['x-aws-ec2-metadata-token'] = token
509 self._add_user_agent(headers)
510 for i in range(self._num_attempts):
511 try:
512 request = botocore.awsrequest.AWSRequest(
513 method='GET', url=url, headers=headers
514 )
515 response = self._session.send(request.prepare())
516 if not retry_func(response):
517 return response
518 except RETRYABLE_HTTP_ERRORS as e:
519 logger.debug(
520 "Caught retryable HTTP exception while making metadata "
521 "service request to %s: %s",
522 url,
523 e,
524 exc_info=True,
525 )
526 raise self._RETRIES_EXCEEDED_ERROR_CLS()
528 def _add_user_agent(self, headers):
529 if self._user_agent is not None:
530 headers['User-Agent'] = self._user_agent
532 def _assert_enabled(self):
533 if self._disabled:
534 logger.debug("Access to EC2 metadata has been disabled.")
535 raise self._RETRIES_EXCEEDED_ERROR_CLS()
537 def _assert_v1_enabled(self):
538 if self._imds_v1_disabled:
539 raise MetadataRetrievalError(
540 error_msg="Unable to retrieve token for use in IMDSv2 call and IMDSv1 has been disabled"
541 )
543 def _default_retry(self, response):
544 return self._is_non_ok_response(response) or self._is_empty(response)
546 def _is_non_ok_response(self, response):
547 if response.status_code != 200:
548 self._log_imds_response(response, 'non-200', log_body=True)
549 return True
550 return False
552 def _is_empty(self, response):
553 if not response.content:
554 self._log_imds_response(response, 'no body', log_body=True)
555 return True
556 return False
558 def _log_imds_response(self, response, reason_to_log, log_body=False):
559 statement = (
560 "Metadata service returned %s response "
561 "with status code of %s for url: %s"
562 )
563 logger_args = [reason_to_log, response.status_code, response.url]
564 if log_body:
565 statement += ", content body: %s"
566 logger_args.append(response.content)
567 logger.debug(statement, *logger_args)
570class InstanceMetadataFetcher(IMDSFetcher):
571 _URL_PATH = 'latest/meta-data/iam/security-credentials/'
572 _REQUIRED_CREDENTIAL_FIELDS = [
573 'AccessKeyId',
574 'SecretAccessKey',
575 'Token',
576 'Expiration',
577 ]
579 def retrieve_iam_role_credentials(self):
580 try:
581 token = self._fetch_metadata_token()
582 role_name = self._get_iam_role(token)
583 credentials = self._get_credentials(role_name, token)
584 if self._contains_all_credential_fields(credentials):
585 credentials = {
586 'role_name': role_name,
587 'access_key': credentials['AccessKeyId'],
588 'secret_key': credentials['SecretAccessKey'],
589 'token': credentials['Token'],
590 'expiry_time': credentials['Expiration'],
591 }
592 self._evaluate_expiration(credentials)
593 return credentials
594 else:
595 # IMDS can return a 200 response that has a JSON formatted
596 # error message (i.e. if ec2 is not trusted entity for the
597 # attached role). We do not necessarily want to retry for
598 # these and we also do not necessarily want to raise a key
599 # error. So at least log the problematic response and return
600 # an empty dictionary to signal that it was not able to
601                # retrieve credentials. These errors will contain both a
602 # Code and Message key.
603 if 'Code' in credentials and 'Message' in credentials:
604 logger.debug(
605                        'Error response received when retrieving '
606 'credentials: %s.',
607 credentials,
608 )
609 return {}
610 except self._RETRIES_EXCEEDED_ERROR_CLS:
611 logger.debug(
612 "Max number of attempts exceeded (%s) when "
613 "attempting to retrieve data from metadata service.",
614 self._num_attempts,
615 )
616 except BadIMDSRequestError as e:
617 logger.debug("Bad IMDS request: %s", e.request)
618 return {}
620 def _get_iam_role(self, token=None):
621 return self._get_request(
622 url_path=self._URL_PATH,
623 retry_func=self._needs_retry_for_role_name,
624 token=token,
625 ).text
627 def _get_credentials(self, role_name, token=None):
628 r = self._get_request(
629 url_path=self._URL_PATH + role_name,
630 retry_func=self._needs_retry_for_credentials,
631 token=token,
632 )
633 return json.loads(r.text)
635 def _is_invalid_json(self, response):
636 try:
637 json.loads(response.text)
638 return False
639 except ValueError:
640 self._log_imds_response(response, 'invalid json')
641 return True
643 def _needs_retry_for_role_name(self, response):
644 return self._is_non_ok_response(response) or self._is_empty(response)
646 def _needs_retry_for_credentials(self, response):
647 return (
648 self._is_non_ok_response(response)
649 or self._is_empty(response)
650 or self._is_invalid_json(response)
651 )
653 def _contains_all_credential_fields(self, credentials):
654 for field in self._REQUIRED_CREDENTIAL_FIELDS:
655 if field not in credentials:
656 logger.debug(
657 'Retrieved credentials is missing required field: %s',
658 field,
659 )
660 return False
661 return True
663 def _evaluate_expiration(self, credentials):
664 expiration = credentials.get("expiry_time")
665 if expiration is None:
666 return
667 try:
668 expiration = datetime.datetime.strptime(
669 expiration, "%Y-%m-%dT%H:%M:%SZ"
670 )
671 refresh_interval = self._config.get(
672 "ec2_credential_refresh_window", 60 * 10
673 )
674 jitter = random.randint(120, 600) # Between 2 to 10 minutes
675 refresh_interval_with_jitter = refresh_interval + jitter
676 current_time = datetime.datetime.utcnow()
677 refresh_offset = datetime.timedelta(
678 seconds=refresh_interval_with_jitter
679 )
680 extension_time = expiration - refresh_offset
681 if current_time >= extension_time:
682 new_time = current_time + refresh_offset
683 credentials["expiry_time"] = new_time.strftime(
684 "%Y-%m-%dT%H:%M:%SZ"
685 )
686 logger.info(
687 f"Attempting credential expiration extension due to a "
688 f"credential service availability issue. A refresh of "
689 f"these credentials will be attempted again within "
690 f"the next {refresh_interval_with_jitter/60:.0f} minutes."
691 )
692 except ValueError:
693 logger.debug(
694 f"Unable to parse expiry_time in {credentials['expiry_time']}"
695 )
698class IMDSRegionProvider:
699 def __init__(self, session, environ=None, fetcher=None):
700 """Initialize IMDSRegionProvider.
701 :type session: :class:`botocore.session.Session`
702 :param session: The session is needed to look up configuration for
703            how to contact the instance metadata service. Specifically,
704            whether or not it should use the IMDS region at all, and if so how
705 to configure the timeout and number of attempts to reach the
706 service.
707 :type environ: None or dict
708 :param environ: A dictionary of environment variables to use. If
709 ``None`` is the argument then ``os.environ`` will be used by
710 default.
711        :type fetcher: :class:`botocore.utils.InstanceMetadataRegionFetcher`
712 :param fetcher: The class to actually handle the fetching of the region
713 from the IMDS. If not provided a default one will be created.
714 """
715 self._session = session
716 if environ is None:
717 environ = os.environ
718 self._environ = environ
719 self._fetcher = fetcher
721 def provide(self):
722 """Provide the region value from IMDS."""
723 instance_region = self._get_instance_metadata_region()
724 return instance_region
726 def _get_instance_metadata_region(self):
727 fetcher = self._get_fetcher()
728 region = fetcher.retrieve_region()
729 return region
731 def _get_fetcher(self):
732 if self._fetcher is None:
733 self._fetcher = self._create_fetcher()
734 return self._fetcher
736 def _create_fetcher(self):
737 metadata_timeout = self._session.get_config_variable(
738 'metadata_service_timeout'
739 )
740 metadata_num_attempts = self._session.get_config_variable(
741 'metadata_service_num_attempts'
742 )
743 imds_config = {
744 'ec2_metadata_service_endpoint': self._session.get_config_variable(
745 'ec2_metadata_service_endpoint'
746 ),
747 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode(
748 self._session
749 ),
750 'ec2_metadata_v1_disabled': self._session.get_config_variable(
751 'ec2_metadata_v1_disabled'
752 ),
753 }
754 fetcher = InstanceMetadataRegionFetcher(
755 timeout=metadata_timeout,
756 num_attempts=metadata_num_attempts,
757 env=self._environ,
758 user_agent=self._session.user_agent(),
759 config=imds_config,
760 )
761 return fetcher
764class InstanceMetadataRegionFetcher(IMDSFetcher):
765 _URL_PATH = 'latest/meta-data/placement/availability-zone/'
767 def retrieve_region(self):
768 """Get the current region from the instance metadata service.
769        :rtype: None or str
770        :returns: The region the current instance is running in, as a string,
771            if it can be determined from IMDS. Otherwise returns ``None``,
772            for example when the instance metadata service cannot be contacted,
773            does not give a valid response, or its retries are exhausted.
778 """
779 try:
780 region = self._get_region()
781 return region
782 except self._RETRIES_EXCEEDED_ERROR_CLS:
783 logger.debug(
784 "Max number of attempts exceeded (%s) when "
785 "attempting to retrieve data from metadata service.",
786 self._num_attempts,
787 )
788 return None
790 def _get_region(self):
791 token = self._fetch_metadata_token()
792 response = self._get_request(
793 url_path=self._URL_PATH,
794 retry_func=self._default_retry,
795 token=token,
796 )
797 availability_zone = response.text
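        # The region is the availability zone with its trailing letter
        # stripped, e.g. 'us-east-1a' -> 'us-east-1'.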
798 region = availability_zone[:-1]
799 return region
802def merge_dicts(dict1, dict2, append_lists=False):
803 """Given two dict, merge the second dict into the first.
805 The dicts can have arbitrary nesting.
807 :param append_lists: If true, instead of clobbering a list with the new
808 value, append all of the new values onto the original list.
809 """
810 for key in dict2:
811 if isinstance(dict2[key], dict):
812 if key in dict1 and key in dict2:
813 merge_dicts(dict1[key], dict2[key])
814 else:
815 dict1[key] = dict2[key]
816 # If the value is a list and the ``append_lists`` flag is set,
817 # append the new values onto the original list
818 elif isinstance(dict2[key], list) and append_lists:
819 # The value in dict1 must be a list in order to append new
820 # values onto it.
821 if key in dict1 and isinstance(dict1[key], list):
822 dict1[key].extend(dict2[key])
823 else:
824 dict1[key] = dict2[key]
825 else:
826            # For scalar values, the value from dict2 simply
827            # overwrites the value in dict1.
828 dict1[key] = dict2[key]
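# Illustrative usage (added for clarity, not part of botocore):
#     >>> a = {'x': {'y': 1}, 'l': [1]}
#     >>> merge_dicts(a, {'x': {'z': 2}, 'l': [2]}, append_lists=True)
#     >>> a
#     {'x': {'y': 1, 'z': 2}, 'l': [1, 2]}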
831def lowercase_dict(original):
832 """Copies the given dictionary ensuring all keys are lowercase strings."""
833 copy = {}
834 for key in original:
835 copy[key.lower()] = original[key]
836 return copy
839def parse_key_val_file(filename, _open=open):
840 try:
841 with _open(filename) as f:
842 contents = f.read()
843 return parse_key_val_file_contents(contents)
844 except OSError:
845 raise ConfigNotFound(path=filename)
848def parse_key_val_file_contents(contents):
849 # This was originally extracted from the EC2 credential provider, which was
850 # fairly lenient in its parsing. We only try to parse key/val pairs if
851 # there's a '=' in the line.
852 final = {}
853 for line in contents.splitlines():
854 if '=' not in line:
855 continue
856 key, val = line.split('=', 1)
857 key = key.strip()
858 val = val.strip()
859 final[key] = val
860 return final
863def percent_encode_sequence(mapping, safe=SAFE_CHARS):
864 """Urlencode a dict or list into a string.
866    This is similar to ``urllib.parse.urlencode`` except that:
868 * It uses quote, and not quote_plus
869 * It has a default list of safe chars that don't need
870 to be encoded, which matches what AWS services expect.
872 If any value in the input ``mapping`` is a list type,
873    then each list element will be serialized. This is the equivalent
874 to ``urlencode``'s ``doseq=True`` argument.
876 This function should be preferred over the stdlib
877 ``urlencode()`` function.
879 :param mapping: Either a dict to urlencode or a list of
880 ``(key, value)`` pairs.
882 """
883 encoded_pairs = []
884 if hasattr(mapping, 'items'):
885 pairs = mapping.items()
886 else:
887 pairs = mapping
888 for key, value in pairs:
889 if isinstance(value, list):
890 for element in value:
891 encoded_pairs.append(
892 f'{percent_encode(key)}={percent_encode(element)}'
893 )
894 else:
895 encoded_pairs.append(
896 f'{percent_encode(key)}={percent_encode(value)}'
897 )
898 return '&'.join(encoded_pairs)
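# Illustrative usage (added for clarity, not part of botocore):
#     >>> percent_encode_sequence({'k1': 'hello world', 'k2': ['a', 'b']})
#     'k1=hello%20world&k2=a&k2=b'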
901def percent_encode(input_str, safe=SAFE_CHARS):
902 """Urlencodes a string.
904 Whereas percent_encode_sequence handles taking a dict/sequence and
905 producing a percent encoded string, this function deals only with
906 taking a string (not a dict/sequence) and percent encoding it.
908 If given the binary type, will simply URL encode it. If given the
909 text type, will produce the binary type by UTF-8 encoding the
910 text. If given something else, will convert it to the text type
911 first.
912 """
913    # If it's not a binary or text string, make it a text string.
914 if not isinstance(input_str, (bytes, str)):
915 input_str = str(input_str)
916 # If it's not bytes, make it bytes by UTF-8 encoding it.
917 if not isinstance(input_str, bytes):
918 input_str = input_str.encode('utf-8')
919 return quote(input_str, safe=safe)
922def _epoch_seconds_to_datetime(value, tzinfo):
923 """Parse numerical epoch timestamps (seconds since 1970) into a
924 ``datetime.datetime`` in UTC using ``datetime.timedelta``. This is intended
925    as a fallback when ``fromtimestamp`` raises ``OverflowError`` or ``OSError``.
927 :type value: float or int
928    :param value: The Unix timestamp as a number.
930 :type tzinfo: callable
931 :param tzinfo: A ``datetime.tzinfo`` class or compatible callable.
932 """
933 epoch_zero = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=tzutc())
934 epoch_zero_localized = epoch_zero.astimezone(tzinfo())
935 return epoch_zero_localized + datetime.timedelta(seconds=value)
938def _parse_timestamp_with_tzinfo(value, tzinfo):
939 """Parse timestamp with pluggable tzinfo options."""
940 if isinstance(value, (int, float)):
941 # Possibly an epoch time.
942 return datetime.datetime.fromtimestamp(value, tzinfo())
943 else:
944 try:
945 return datetime.datetime.fromtimestamp(float(value), tzinfo())
946 except (TypeError, ValueError):
947 pass
948 try:
949 # In certain cases, a timestamp marked with GMT can be parsed into a
950 # different time zone, so here we provide a context which will
951 # enforce that GMT == UTC.
952 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()})
953 except (TypeError, ValueError) as e:
954 raise ValueError(f'Invalid timestamp "{value}": {e}')
957def parse_timestamp(value):
958 """Parse a timestamp into a datetime object.
960 Supported formats:
962 * iso8601
963 * rfc822
964 * epoch (value is an integer)
966 This will return a ``datetime.datetime`` object.
968 """
969 tzinfo_options = get_tzinfo_options()
970 for tzinfo in tzinfo_options:
971 try:
972 return _parse_timestamp_with_tzinfo(value, tzinfo)
973 except (OSError, OverflowError) as e:
974 logger.debug(
975 'Unable to parse timestamp with "%s" timezone info.',
976 tzinfo.__name__,
977 exc_info=e,
978 )
979    # For numeric values, attempt to fall back to a fromtimestamp-free method.
980 # From Python's ``datetime.datetime.fromtimestamp`` documentation: "This
981 # may raise ``OverflowError``, if the timestamp is out of the range of
982 # values supported by the platform C localtime() function, and ``OSError``
983 # on localtime() failure. It's common for this to be restricted to years
984 # from 1970 through 2038."
985 try:
986 numeric_value = float(value)
987 except (TypeError, ValueError):
988 pass
989 else:
990 try:
991 for tzinfo in tzinfo_options:
992 return _epoch_seconds_to_datetime(numeric_value, tzinfo=tzinfo)
993 except (OSError, OverflowError) as e:
994 logger.debug(
995 'Unable to parse timestamp using fallback method with "%s" '
996 'timezone info.',
997 tzinfo.__name__,
998 exc_info=e,
999 )
1000 raise RuntimeError(
1001 'Unable to calculate correct timezone offset for "%s"' % value
1002 )
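# Illustrative usage (added for clarity, not part of botocore):
#     >>> parse_timestamp('Thu, 25 Dec 2014 00:00:00 GMT')
#     datetime.datetime(2014, 12, 25, 0, 0, tzinfo=tzutc())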
1005def parse_to_aware_datetime(value):
1006    """Convert the passed-in value to a datetime object with tzinfo.
1008 This function can be used to normalize all timestamp inputs. This
1009 function accepts a number of different types of inputs, but
1010 will always return a datetime.datetime object with time zone
1011 information.
1013 The input param ``value`` can be one of several types:
1015 * A datetime object (both naive and aware)
1016 * An integer representing the epoch time (can also be a string
1017 of the integer, i.e '0', instead of 0). The epoch time is
1018 considered to be UTC.
1019 * An iso8601 formatted timestamp. This does not need to be
1020 a complete timestamp, it can contain just the date portion
1021 without the time component.
1023 The returned value will be a datetime object that will have tzinfo.
1024 If no timezone info was provided in the input value, then UTC is
1025 assumed, not local time.
1027 """
1028 # This is a general purpose method that handles several cases of
1029 # converting the provided value to a string timestamp suitable to be
1030 # serialized to an http request. It can handle:
1031 # 1) A datetime.datetime object.
1032 if isinstance(value, _DatetimeClass):
1033 datetime_obj = value
1034 else:
1035 # 2) A string object that's formatted as a timestamp.
1036 # We document this as being an iso8601 timestamp, although
1037 # parse_timestamp is a bit more flexible.
1038 datetime_obj = parse_timestamp(value)
1039 if datetime_obj.tzinfo is None:
1040        # A case could be made that if no time zone is provided,
1041 # we should use the local time. However, to restore backwards
1042 # compat, the previous behavior was to assume UTC, which is
1043 # what we're going to do here.
1044 datetime_obj = datetime_obj.replace(tzinfo=tzutc())
1045 else:
1046 datetime_obj = datetime_obj.astimezone(tzutc())
1047 return datetime_obj
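# Illustrative usage (added for clarity, not part of botocore):
#     >>> parse_to_aware_datetime(datetime.datetime(2015, 1, 1))
#     datetime.datetime(2015, 1, 1, 0, 0, tzinfo=tzutc())
#     >>> parse_to_aware_datetime(0)
#     datetime.datetime(1970, 1, 1, 0, 0, tzinfo=tzutc())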
1050def datetime2timestamp(dt, default_timezone=None):
1051 """Calculate the timestamp based on the given datetime instance.
1053 :type dt: datetime
1054 :param dt: A datetime object to be converted into timestamp
1055 :type default_timezone: tzinfo
1056 :param default_timezone: If it is provided as None, we treat it as tzutc().
1057 But it is only used when dt is a naive datetime.
1058 :returns: The timestamp
1059 """
1060 epoch = datetime.datetime(1970, 1, 1)
1061 if dt.tzinfo is None:
1062 if default_timezone is None:
1063 default_timezone = tzutc()
1064 dt = dt.replace(tzinfo=default_timezone)
1065 d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
1066 if hasattr(d, "total_seconds"):
1067 return d.total_seconds() # Works in Python 3.6+
1068 return (
1069 d.microseconds + (d.seconds + d.days * 24 * 3600) * 10**6
1070 ) / 10**6
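# Illustrative usage (added for clarity, not part of botocore):
#     >>> datetime2timestamp(datetime.datetime(1970, 1, 2))
#     86400.0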
1073def calculate_sha256(body, as_hex=False):
1074 """Calculate a sha256 checksum.
1076 This method will calculate the sha256 checksum of a file like
1077 object. Note that this method will iterate through the entire
1078 file contents. The caller is responsible for ensuring the proper
1079 starting position of the file and ``seek()``'ing the file back
1080 to its starting location if other consumers need to read from
1081 the file like object.
1083 :param body: Any file like object. The file must be opened
1084 in binary mode such that a ``.read()`` call returns bytes.
1085 :param as_hex: If True, then the hex digest is returned.
1086 If False, then the digest (as binary bytes) is returned.
1088 :returns: The sha256 checksum
1090 """
1091 checksum = hashlib.sha256()
1092 for chunk in iter(lambda: body.read(1024 * 1024), b''):
1093 checksum.update(chunk)
1094 if as_hex:
1095 return checksum.hexdigest()
1096 else:
1097 return checksum.digest()
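# Illustrative usage (added for clarity, not part of botocore):
#     >>> calculate_sha256(io.BytesIO(b'abc'), as_hex=True)
#     'ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad'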
1100def calculate_tree_hash(body):
1101 """Calculate a tree hash checksum.
1103 For more information see:
1105 http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html
1107 :param body: Any file like object. This has the same constraints as
1108 the ``body`` param in calculate_sha256
1110 :rtype: str
1111 :returns: The hex version of the calculated tree hash
1113 """
1114 chunks = []
1115 required_chunk_size = 1024 * 1024
1116 sha256 = hashlib.sha256
1117 for chunk in iter(lambda: body.read(required_chunk_size), b''):
1118 chunks.append(sha256(chunk).digest())
1119 if not chunks:
1120 return sha256(b'').hexdigest()
1121 while len(chunks) > 1:
1122 new_chunks = []
1123 for first, second in _in_pairs(chunks):
1124 if second is not None:
1125 new_chunks.append(sha256(first + second).digest())
1126 else:
1127 # We're at the end of the list and there's no pair left.
1128 new_chunks.append(first)
1129 chunks = new_chunks
1130 return binascii.hexlify(chunks[0]).decode('ascii')
1133def _in_pairs(iterable):
1134 # Creates iterator that iterates over the list in pairs:
1135 # for a, b in _in_pairs([0, 1, 2, 3, 4]):
1136 # print(a, b)
1137 #
1138 # will print:
1139 # 0, 1
1140 # 2, 3
1141 # 4, None
1142 shared_iter = iter(iterable)
1143 # Note that zip_longest is a compat import that uses
1144 # the itertools izip_longest. This creates an iterator,
1145 # this call below does _not_ immediately create the list
1146 # of pairs.
1147 return zip_longest(shared_iter, shared_iter)
1150class CachedProperty:
1151 """A read only property that caches the initially computed value.
1153 This descriptor will only call the provided ``fget`` function once.
1154 Subsequent access to this property will return the cached value.
1156 """
1158 def __init__(self, fget):
1159 self._fget = fget
1161 def __get__(self, obj, cls):
1162 if obj is None:
1163 return self
1164 else:
1165 computed_value = self._fget(obj)
1166 obj.__dict__[self._fget.__name__] = computed_value
1167 return computed_value
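# Illustrative usage (a sketch added for clarity, not part of botocore;
# ``Dataset`` and ``load_rows`` are hypothetical):
#     class Dataset:
#         @CachedProperty
#         def rows(self):
#             return load_rows()  # computed once, then cached on the instance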
1170class ArgumentGenerator:
1171 """Generate sample input based on a shape model.
1173 This class contains a ``generate_skeleton`` method that will take
1174 an input/output shape (created from ``botocore.model``) and generate
1175 a sample dictionary corresponding to the input/output shape.
1177    The specific values used are placeholder values. For strings, either an
1178    empty string or the member name is used; for numbers, 0 or 0.0 is used.
1179 The intended usage of this class is to generate the *shape* of the input
1180 structure.
1182 This can be useful for operations that have complex input shapes.
1183 This allows a user to just fill in the necessary data instead of
1184 worrying about the specific structure of the input arguments.
1186 Example usage::
1188 s = botocore.session.get_session()
1189 ddb = s.get_service_model('dynamodb')
1190 arg_gen = ArgumentGenerator()
1191 sample_input = arg_gen.generate_skeleton(
1192 ddb.operation_model('CreateTable').input_shape)
1193 print("Sample input for dynamodb.CreateTable: %s" % sample_input)
1195 """
1197 def __init__(self, use_member_names=False):
1198 self._use_member_names = use_member_names
1200 def generate_skeleton(self, shape):
1201 """Generate a sample input.
1203 :type shape: ``botocore.model.Shape``
1204 :param shape: The input shape.
1206 :return: The generated skeleton input corresponding to the
1207 provided input shape.
1209 """
1210 stack = []
1211 return self._generate_skeleton(shape, stack)
1213 def _generate_skeleton(self, shape, stack, name=''):
1214 stack.append(shape.name)
1215 try:
1216 if shape.type_name == 'structure':
1217 return self._generate_type_structure(shape, stack)
1218 elif shape.type_name == 'list':
1219 return self._generate_type_list(shape, stack)
1220 elif shape.type_name == 'map':
1221 return self._generate_type_map(shape, stack)
1222 elif shape.type_name == 'string':
1223 if self._use_member_names:
1224 return name
1225 if shape.enum:
1226 return random.choice(shape.enum)
1227 return ''
1228 elif shape.type_name in ['integer', 'long']:
1229 return 0
1230 elif shape.type_name in ['float', 'double']:
1231 return 0.0
1232 elif shape.type_name == 'boolean':
1233 return True
1234 elif shape.type_name == 'timestamp':
1235 return datetime.datetime(1970, 1, 1, 0, 0, 0)
1236 finally:
1237 stack.pop()
1239 def _generate_type_structure(self, shape, stack):
1240 if stack.count(shape.name) > 1:
1241 return {}
1242 skeleton = OrderedDict()
1243 for member_name, member_shape in shape.members.items():
1244 skeleton[member_name] = self._generate_skeleton(
1245 member_shape, stack, name=member_name
1246 )
1247 return skeleton
1249 def _generate_type_list(self, shape, stack):
1250 # For list elements we've arbitrarily decided to
1251        # return a single element for the skeleton list.
1252 name = ''
1253 if self._use_member_names:
1254 name = shape.member.name
1255 return [
1256 self._generate_skeleton(shape.member, stack, name),
1257 ]
1259 def _generate_type_map(self, shape, stack):
1260 key_shape = shape.key
1261 value_shape = shape.value
1262 assert key_shape.type_name == 'string'
1263 return OrderedDict(
1264 [
1265 ('KeyName', self._generate_skeleton(value_shape, stack)),
1266 ]
1267 )
1270def is_valid_ipv6_endpoint_url(endpoint_url):
1271 if UNSAFE_URL_CHARS.intersection(endpoint_url):
1272 return False
1273 hostname = f'[{urlparse(endpoint_url).hostname}]'
1274 return IPV6_ADDRZ_RE.match(hostname) is not None
1277def is_valid_ipv4_endpoint_url(endpoint_url):
1278 hostname = urlparse(endpoint_url).hostname
1279 return IPV4_RE.match(hostname) is not None
1282def is_valid_endpoint_url(endpoint_url):
1283 """Verify the endpoint_url is valid.
1285 :type endpoint_url: string
1286 :param endpoint_url: An endpoint_url. Must have at least a scheme
1287 and a hostname.
1289 :return: True if the endpoint url is valid. False otherwise.
1291 """
1292 # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing
1293 # it to pass hostname validation below. Detect them early to fix that.
1294 if UNSAFE_URL_CHARS.intersection(endpoint_url):
1295 return False
1296 parts = urlsplit(endpoint_url)
1297 hostname = parts.hostname
1298 if hostname is None:
1299 return False
1300 if len(hostname) > 255:
1301 return False
1302 if hostname[-1] == ".":
1303 hostname = hostname[:-1]
1304 allowed = re.compile(
1305 r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$",
1306 re.IGNORECASE,
1307 )
1308 return allowed.match(hostname)
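# Illustrative usage (added for clarity, not part of botocore; the function
# returns a truthy match object or a falsy value):
#     >>> bool(is_valid_endpoint_url('https://s3.us-west-2.amazonaws.com'))
#     True
#     >>> bool(is_valid_endpoint_url('not-a-url'))
#     False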
1311def is_valid_uri(endpoint_url):
1312 return is_valid_endpoint_url(endpoint_url) or is_valid_ipv6_endpoint_url(
1313 endpoint_url
1314 )
1317def validate_region_name(region_name):
1318 """Provided region_name must be a valid host label."""
1319 if region_name is None:
1320 return
1321 valid_host_label = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$')
1322 valid = valid_host_label.match(region_name)
1323 if not valid:
1324 raise InvalidRegionError(region_name=region_name)
1327def check_dns_name(bucket_name):
1328 """
1329 Check to see if the ``bucket_name`` complies with the
1330 restricted DNS naming conventions necessary to allow
1331 access via virtual-hosting style.
1333 Even though "." characters are perfectly valid in this DNS
1334 naming scheme, we are going to punt on any name containing a
1335 "." character because these will cause SSL cert validation
1336 problems if we try to use virtual-hosting style addressing.
1337 """
1338 if '.' in bucket_name:
1339 return False
1340 n = len(bucket_name)
1341 if n < 3 or n > 63:
1342 # Wrong length
1343 return False
1344 match = LABEL_RE.match(bucket_name)
1345 if match is None or match.end() != len(bucket_name):
1346 return False
1347 return True
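# Illustrative usage (added for clarity, not part of botocore):
#     >>> check_dns_name('my-bucket')
#     True
#     >>> check_dns_name('my.bucket')
#     False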
1350def fix_s3_host(
1351 request,
1352 signature_version,
1353 region_name,
1354 default_endpoint_url=None,
1355 **kwargs,
1356):
1357 """
1358 This handler looks at S3 requests just before they are signed.
1359 If there is a bucket name on the path (true for everything except
1360 ListAllBuckets) it checks to see if that bucket name conforms to
1361 the DNS naming conventions. If it does, it alters the request to
1362 use ``virtual hosting`` style addressing rather than ``path-style``
1363 addressing.
1365 """
1366 if request.context.get('use_global_endpoint', False):
1367 default_endpoint_url = 's3.amazonaws.com'
1368 try:
1369 switch_to_virtual_host_style(
1370 request, signature_version, default_endpoint_url
1371 )
1372 except InvalidDNSNameError as e:
1373 bucket_name = e.kwargs['bucket_name']
1374 logger.debug(
1375 'Not changing URI, bucket is not DNS compatible: %s', bucket_name
1376 )
1379def switch_to_virtual_host_style(
1380 request, signature_version, default_endpoint_url=None, **kwargs
1381):
1382 """
1383 This is a handler to force virtual host style s3 addressing no matter
1384    the signature version (which is taken into consideration for the default
1385    case). If the bucket is not DNS compatible, an InvalidDNSNameError is raised.
1387    :param request: An AWSRequest object that is about to be sent.
1388 :param signature_version: The signature version to sign with
1389 :param default_endpoint_url: The endpoint to use when switching to a
1390 virtual style. If None is supplied, the virtual host will be
1391 constructed from the url of the request.
1392 """
1393 if request.auth_path is not None:
1394 # The auth_path has already been applied (this may be a
1395 # retried request). We don't need to perform this
1396 # customization again.
1397 return
1398 elif _is_get_bucket_location_request(request):
1399 # For the GetBucketLocation response, we should not be using
1400 # the virtual host style addressing so we can avoid any sigv4
1401 # issues.
1402 logger.debug(
1403 "Request is GetBucketLocation operation, not checking "
1404 "for DNS compatibility."
1405 )
1406 return
1407 parts = urlsplit(request.url)
1408 request.auth_path = parts.path
1409 path_parts = parts.path.split('/')
1411    # Retrieve the endpoint that we will be prepending the bucket name to.
1412 if default_endpoint_url is None:
1413 default_endpoint_url = parts.netloc
1415 if len(path_parts) > 1:
1416 bucket_name = path_parts[1]
1417 if not bucket_name:
1418 # If the bucket name is empty we should not be checking for
1419 # dns compatibility.
1420 return
1421 logger.debug('Checking for DNS compatible bucket for: %s', request.url)
1422 if check_dns_name(bucket_name):
1423 # If the operation is on a bucket, the auth_path must be
1424 # terminated with a '/' character.
1425 if len(path_parts) == 2:
1426 if request.auth_path[-1] != '/':
1427 request.auth_path += '/'
1428 path_parts.remove(bucket_name)
1429 # At the very least the path must be a '/', such as with the
1430 # CreateBucket operation when DNS style is being used. If this
1431 # is not used you will get an empty path which is incorrect.
1432 path = '/'.join(path_parts) or '/'
1433 global_endpoint = default_endpoint_url
1434 host = bucket_name + '.' + global_endpoint
1435 new_tuple = (parts.scheme, host, path, parts.query, '')
1436 new_uri = urlunsplit(new_tuple)
1437 request.url = new_uri
1438 logger.debug('URI updated to: %s', new_uri)
1439 else:
1440 raise InvalidDNSNameError(bucket_name=bucket_name)
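# For a DNS-compatible bucket this rewrites, for example,
# https://s3.us-west-2.amazonaws.com/my-bucket/key into
# https://my-bucket.s3.us-west-2.amazonaws.com/key (illustrative URLs).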
1443def _is_get_bucket_location_request(request):
1444 return request.url.endswith('?location')
1447def instance_cache(func):
1448 """Method decorator for caching method calls to a single instance.
1450 **This is not a general purpose caching decorator.**
1452 In order to use this, you *must* provide an ``_instance_cache``
1453 attribute on the instance.
1455 This decorator is used to cache method calls. The cache is only
1456 scoped to a single instance though such that multiple instances
1457 will maintain their own cache. In order to keep things simple,
1458 this decorator requires that you provide an ``_instance_cache``
1459 attribute on your instance.
1461 """
1462 func_name = func.__name__
1464 @functools.wraps(func)
1465 def _cache_guard(self, *args, **kwargs):
1466 cache_key = (func_name, args)
1467 if kwargs:
1468 kwarg_items = tuple(sorted(kwargs.items()))
1469 cache_key = (func_name, args, kwarg_items)
1470 result = self._instance_cache.get(cache_key)
1471 if result is not None:
1472 return result
1473 result = func(self, *args, **kwargs)
1474 self._instance_cache[cache_key] = result
1475 return result
1477 return _cache_guard
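# Illustrative usage (a sketch added for clarity, not part of botocore;
# ``Catalog`` and ``_expensive_lookup`` are hypothetical):
#     class Catalog:
#         def __init__(self):
#             self._instance_cache = {}
#         @instance_cache
#         def describe(self, name):
#             return self._expensive_lookup(name)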
1480def lru_cache_weakref(*cache_args, **cache_kwargs):
1481 """
1482 Version of functools.lru_cache that stores a weak reference to ``self``.
1484 Serves the same purpose as :py:func:`instance_cache` but uses Python's
1485    functools implementation, which offers ``maxsize`` and ``typed`` options.
1487 lru_cache is a global cache even when used on a method. The cache's
1488    reference to ``self`` will prevent garbage collection of the object. This
1489 wrapper around functools.lru_cache replaces the reference to ``self`` with
1490 a weak reference to not interfere with garbage collection.
1491 """
1493 def wrapper(func):
1494 @functools.lru_cache(*cache_args, **cache_kwargs)
1495 def func_with_weakref(weakref_to_self, *args, **kwargs):
1496 return func(weakref_to_self(), *args, **kwargs)
1498 @functools.wraps(func)
1499 def inner(self, *args, **kwargs):
1500 return func_with_weakref(weakref.ref(self), *args, **kwargs)
1502 inner.cache_info = func_with_weakref.cache_info
1503 return inner
1505 return wrapper
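# Illustrative usage (a sketch added for clarity, not part of botocore;
# ``Resolver`` is hypothetical):
#     class Resolver:
#         @lru_cache_weakref(maxsize=100)
#         def resolve(self, name):
#             ...
# Cache entries hold only a weak reference to ``self``, so instances can still
# be garbage collected while the cache exists.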
1508def switch_host_s3_accelerate(request, operation_name, **kwargs):
1509 """Switches the current s3 endpoint with an S3 Accelerate endpoint"""
1511    # Note that, when registered, the switching of the s3 host happens
1512    # before it gets changed to virtual, so we are not concerned with ensuring
1513 # that the bucket name is translated to the virtual style here and we
1514 # can hard code the Accelerate endpoint.
1515 parts = urlsplit(request.url).netloc.split('.')
1516 parts = [p for p in parts if p in S3_ACCELERATE_WHITELIST]
1517 endpoint = 'https://s3-accelerate.'
1518 if len(parts) > 0:
1519 endpoint += '.'.join(parts) + '.'
1520 endpoint += 'amazonaws.com'
1522 if operation_name in ['ListBuckets', 'CreateBucket', 'DeleteBucket']:
1523 return
1524 _switch_hosts(request, endpoint, use_new_scheme=False)
1527def switch_host_with_param(request, param_name):
1528 """Switches the host using a parameter value from a JSON request body"""
1529 request_json = json.loads(request.data.decode('utf-8'))
1530 if request_json.get(param_name):
1531 new_endpoint = request_json[param_name]
1532 _switch_hosts(request, new_endpoint)
1535def _switch_hosts(request, new_endpoint, use_new_scheme=True):
1536 final_endpoint = _get_new_endpoint(
1537 request.url, new_endpoint, use_new_scheme
1538 )
1539 request.url = final_endpoint
1542def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True):
1543 new_endpoint_components = urlsplit(new_endpoint)
1544 original_endpoint_components = urlsplit(original_endpoint)
1545 scheme = original_endpoint_components.scheme
1546 if use_new_scheme:
1547 scheme = new_endpoint_components.scheme
1548 final_endpoint_components = (
1549 scheme,
1550 new_endpoint_components.netloc,
1551 original_endpoint_components.path,
1552 original_endpoint_components.query,
1553 '',
1554 )
1555 final_endpoint = urlunsplit(final_endpoint_components)
1556 logger.debug(f'Updating URI from {original_endpoint} to {final_endpoint}')
1557 return final_endpoint
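# Illustrative usage (added for clarity, not part of botocore):
#     >>> _get_new_endpoint('https://s3.amazonaws.com/my-bucket?versioning',
#     ...                   'https://example.com', use_new_scheme=False)
#     'https://example.com/my-bucket?versioning'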
1560def deep_merge(base, extra):
1561    """Deeply merge two dictionaries, overriding existing keys in the base.
1563 :param base: The base dictionary which will be merged into.
1564 :param extra: The dictionary to merge into the base. Keys from this
1565 dictionary will take precedence.
1566 """
1567 for key in extra:
1568 # If the key represents a dict on both given dicts, merge the sub-dicts
1569 if (
1570 key in base
1571 and isinstance(base[key], dict)
1572 and isinstance(extra[key], dict)
1573 ):
1574 deep_merge(base[key], extra[key])
1575 continue
1577 # Otherwise, set the key on the base to be the value of the extra.
1578 base[key] = extra[key]
1581def hyphenize_service_id(service_id):
1582    """Translate a service ID into the hyphenated form used for event emitters.
1584 :param service_id: The service_id to convert.
1585 """
1586 return service_id.replace(' ', '-').lower()
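# Illustrative usage (added for clarity, not part of botocore):
#     >>> hyphenize_service_id('API Gateway')
#     'api-gateway'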
1589class IdentityCache:
1590 """Base IdentityCache implementation for storing and retrieving
1591 highly accessed credentials.
1593 This class is not intended to be instantiated in user code.
1594 """
1596 METHOD = "base_identity_cache"
1598 def __init__(self, client, credential_cls):
1599 self._client = client
1600 self._credential_cls = credential_cls
1602 def get_credentials(self, **kwargs):
1603 callback = self.build_refresh_callback(**kwargs)
1604 metadata = callback()
1605 credential_entry = self._credential_cls.create_from_metadata(
1606 metadata=metadata,
1607 refresh_using=callback,
1608 method=self.METHOD,
1609 advisory_timeout=45,
1610 mandatory_timeout=10,
1611 )
1612 return credential_entry
1614    def build_refresh_callback(self, **kwargs):
1615 """Callback to be implemented by subclasses.
1617 Returns a set of metadata to be converted into a new
1618 credential instance.
1619 """
1620 raise NotImplementedError()
1623class S3ExpressIdentityCache(IdentityCache):
1624 """S3Express IdentityCache for retrieving and storing
1625 credentials from CreateSession calls.
1627 This class is not intended to be instantiated in user code.
1628 """
1630 METHOD = "s3express"
1632 def __init__(self, client, credential_cls):
1633 self._client = client
1634 self._credential_cls = credential_cls
1636 @functools.lru_cache(maxsize=100)
1637 def get_credentials(self, bucket):
1638 return super().get_credentials(bucket=bucket)
1640 def build_refresh_callback(self, bucket):
1641 def refresher():
1642 response = self._client.create_session(Bucket=bucket)
1643 creds = response['Credentials']
1644 expiration = self._serialize_if_needed(
1645 creds['Expiration'], iso=True
1646 )
1647 return {
1648 "access_key": creds['AccessKeyId'],
1649 "secret_key": creds['SecretAccessKey'],
1650 "token": creds['SessionToken'],
1651 "expiry_time": expiration,
1652 }
1654 return refresher
1656 def _serialize_if_needed(self, value, iso=False):
1657 if isinstance(value, _DatetimeClass):
1658 if iso:
1659 return value.isoformat()
1660 return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
1661 return value
1664class S3ExpressIdentityResolver:
1665 def __init__(self, client, credential_cls, cache=None):
1666 self._client = weakref.proxy(client)
1668 if cache is None:
1669 cache = S3ExpressIdentityCache(self._client, credential_cls)
1670 self._cache = cache
1672 def register(self, event_emitter=None):
1673 logger.debug('Registering S3Express Identity Resolver')
1674 emitter = event_emitter or self._client.meta.events
1675 emitter.register(
1676 'before-parameter-build.s3', self.inject_signing_cache_key
1677 )
1678 emitter.register('before-call.s3', self.apply_signing_cache_key)
1679 emitter.register('before-sign.s3', self.resolve_s3express_identity)
1681 def inject_signing_cache_key(self, params, context, **kwargs):
1682 if 'Bucket' in params:
1683 context['S3Express'] = {'bucket_name': params['Bucket']}
1685 def apply_signing_cache_key(self, params, context, **kwargs):
1686 endpoint_properties = context.get('endpoint_properties', {})
1687 backend = endpoint_properties.get('backend', None)
1689 # Add cache key if Bucket supplied for s3express request
1690 bucket_name = context.get('S3Express', {}).get('bucket_name')
1691 if backend == 'S3Express' and bucket_name is not None:
1692 context.setdefault('signing', {})
1693 context['signing']['cache_key'] = bucket_name
1695 def resolve_s3express_identity(
1696 self,
1697 request,
1698 signing_name,
1699 region_name,
1700 signature_version,
1701 request_signer,
1702 operation_name,
1703 **kwargs,
1704 ):
1705 signing_context = request.context.get('signing', {})
1706 signing_name = signing_context.get('signing_name')
1707 if signing_name == 's3express' and signature_version.startswith(
1708 'v4-s3express'
1709 ):
1710 signing_context['identity_cache'] = self._cache
1711 if 'cache_key' not in signing_context:
1712 signing_context['cache_key'] = (
1713 request.context.get('s3_redirect', {})
1714 .get('params', {})
1715 .get('Bucket')
1716 )
1719class S3RegionRedirectorv2:
1720 """Updated version of S3RegionRedirector for use when
1721 EndpointRulesetResolver is in use for endpoint resolution.
1723 This class is considered private and subject to abrupt breaking changes or
1724 removal without prior announcement. Please do not use it directly.
1725 """
1727 def __init__(self, endpoint_bridge, client, cache=None):
1728 self._cache = cache or {}
1729 self._client = weakref.proxy(client)
1731 def register(self, event_emitter=None):
1732 logger.debug('Registering S3 region redirector handler')
1733 emitter = event_emitter or self._client.meta.events
1734 emitter.register('needs-retry.s3', self.redirect_from_error)
1735 emitter.register(
1736 'before-parameter-build.s3', self.annotate_request_context
1737 )
1738 emitter.register(
1739 'before-endpoint-resolution.s3', self.redirect_from_cache
1740 )
1742 def redirect_from_error(self, request_dict, response, operation, **kwargs):
1743 """
1744 An S3 request sent to the wrong region will return an error that
1745 contains the endpoint the request should be sent to. This handler
1746 will add the redirect information to the signing context and then
1747 redirect the request.
1748 """
1749 if response is None:
1750 # This could be none if there was a ConnectionError or other
1751 # transport error.
1752 return
1754 redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {})
1755 if ArnParser.is_arn(redirect_ctx.get('bucket')):
1756 logger.debug(
1757 'S3 request was previously for an Accesspoint ARN, not '
1758 'redirecting.'
1759 )
1760 return
1762 if redirect_ctx.get('redirected'):
1763 logger.debug(
1764 'S3 request was previously redirected, not redirecting.'
1765 )
1766 return
1768 error = response[1].get('Error', {})
1769 error_code = error.get('Code')
1770 response_metadata = response[1].get('ResponseMetadata', {})
1772 # We have to account for 400 responses because
1773 # if we sign a Head* request with the wrong region,
1774 # we'll get a 400 Bad Request but we won't get a
1775 # body saying it's an "AuthorizationHeaderMalformed".
1776 is_special_head_object = (
1777 error_code in ('301', '400') and operation.name == 'HeadObject'
1778 )
1779 is_special_head_bucket = (
1780 error_code in ('301', '400')
1781 and operation.name == 'HeadBucket'
1782 and 'x-amz-bucket-region'
1783 in response_metadata.get('HTTPHeaders', {})
1784 )
1785 is_wrong_signing_region = (
1786 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
1787 )
1788 is_redirect_status = response[0] is not None and response[
1789 0
1790 ].status_code in (301, 302, 307)
1791 is_permanent_redirect = error_code == 'PermanentRedirect'
1792 if not any(
1793 [
1794 is_special_head_object,
1795 is_wrong_signing_region,
1796 is_permanent_redirect,
1797 is_special_head_bucket,
1798 is_redirect_status,
1799 ]
1800 ):
1801 return
1803 bucket = request_dict['context']['s3_redirect']['bucket']
1804 client_region = request_dict['context'].get('client_region')
1805 new_region = self.get_bucket_region(bucket, response)
1807 if new_region is None:
1808 logger.debug(
1809 "S3 client configured for region %s but the bucket %s is not "
1810 "in that region and the proper region could not be "
1811 "automatically determined." % (client_region, bucket)
1812 )
1813 return
1815 logger.debug(
1816 "S3 client configured for region %s but the bucket %s is in region"
1817 " %s; Please configure the proper region to avoid multiple "
1818 "unnecessary redirects and signing attempts."
1819 % (client_region, bucket, new_region)
1820 )
1821 # Adding the new region to _cache will cause construct_endpoint() to
1822 # use the new region as the value for the AWS::Region builtin parameter.
1823 self._cache[bucket] = new_region
1825 # Re-resolve endpoint with new region and modify request_dict with
1826 # the new URL, auth scheme, and signing context.
1827 ep_resolver = self._client._ruleset_resolver
1828 ep_info = ep_resolver.construct_endpoint(
1829 operation_model=operation,
1830 call_args=request_dict['context']['s3_redirect']['params'],
1831 request_context=request_dict['context'],
1832 )
1833 request_dict['url'] = self.set_request_url(
1834 request_dict['url'], ep_info.url
1835 )
1836 request_dict['context']['s3_redirect']['redirected'] = True
1837 auth_schemes = ep_info.properties.get('authSchemes')
1838 if auth_schemes is not None:
1839 auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes)
1840 auth_type, signing_context = auth_info
1841 request_dict['context']['auth_type'] = auth_type
1842 request_dict['context']['signing'] = {
1843 **request_dict['context'].get('signing', {}),
1844 **signing_context,
1845 }
1847 # Return 0 so it doesn't wait to retry
1848 return 0
1850 def get_bucket_region(self, bucket, response):
1851 """
1852 There are multiple potential sources for the new region to redirect to,
1853 but they aren't all universally available for use. This will try to
1854 find region from response elements, but will fall back to calling
1855 HEAD on the bucket if all else fails.
1857 :param bucket: The bucket to find the region for. This is necessary if
1858 the region is not available in the error response.
1859 :param response: A response representing a service request that failed
1860 due to incorrect region configuration.
1861 """
1862 # First try to source the region from the headers.
1863 service_response = response[1]
1864 response_headers = service_response['ResponseMetadata']['HTTPHeaders']
1865 if 'x-amz-bucket-region' in response_headers:
1866 return response_headers['x-amz-bucket-region']
1868 # Next, check the error body
1869 region = service_response.get('Error', {}).get('Region', None)
1870 if region is not None:
1871 return region
1873 # Finally, HEAD the bucket. No other choice sadly.
1874 try:
1875 response = self._client.head_bucket(Bucket=bucket)
1876 headers = response['ResponseMetadata']['HTTPHeaders']
1877 except ClientError as e:
1878 headers = e.response['ResponseMetadata']['HTTPHeaders']
1880 region = headers.get('x-amz-bucket-region', None)
1881 return region
1883 def set_request_url(self, old_url, new_endpoint, **kwargs):
1884 """
1885 Splice a new endpoint into an existing URL. Note that some endpoints
1886 from the endpoint provider have a path component which will be
1887 discarded by this function.
1888 """
1889 return _get_new_endpoint(old_url, new_endpoint, False)
1891 def redirect_from_cache(self, builtins, params, **kwargs):
1892 """
1893 If a bucket name has been redirected before, it is in the cache. This
1894 handler will update the AWS::Region endpoint resolver builtin param
1895 to use the region from cache instead of the client region to avoid the
1896 redirect.
1897 """
1898 bucket = params.get('Bucket')
1899 if bucket is not None and bucket in self._cache:
1900 new_region = self._cache.get(bucket)
1901 builtins['AWS::Region'] = new_region
1903 def annotate_request_context(self, params, context, **kwargs):
1904 """Store the bucket name in context for later use when redirecting.
1905 The bucket name may be an access point ARN or alias.
1906 """
1907 bucket = params.get('Bucket')
1908 context['s3_redirect'] = {
1909 'redirected': False,
1910 'bucket': bucket,
1911 'params': params,
1912 }
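# Illustrative sketch (added annotation, not part of the original module): the
# fallback order used by get_bucket_region() above, shown against hand-built
# parsed-response dicts. The header and error values are made up.
def _example_bucket_region_lookup_sketch():
    # 1) preferred source: the x-amz-bucket-region response header
    with_header = {
        'ResponseMetadata': {'HTTPHeaders': {'x-amz-bucket-region': 'eu-west-1'}},
        'Error': {},
    }
    assert with_header['ResponseMetadata']['HTTPHeaders'].get(
        'x-amz-bucket-region'
    ) == 'eu-west-1'
    # 2) fallback: the Region field in the error body
    with_error_only = {
        'ResponseMetadata': {'HTTPHeaders': {}},
        'Error': {'Code': 'AuthorizationHeaderMalformed', 'Region': 'eu-west-1'},
    }
    assert with_error_only.get('Error', {}).get('Region') == 'eu-west-1'
    # 3) last resort: the redirector issues a HeadBucket call (not shown here)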
1915class S3RegionRedirector:
1916 """This handler has been replaced by S3RegionRedirectorv2. The original
1917 version remains in place for any third-party libraries that import it.
1918 """
1920 def __init__(self, endpoint_bridge, client, cache=None):
1921 self._endpoint_resolver = endpoint_bridge
1922 self._cache = cache
1923 if self._cache is None:
1924 self._cache = {}
1926 # This needs to be a weak ref in order to prevent memory leaks on
1927 # python 2.6
1928 self._client = weakref.proxy(client)
1930 warnings.warn(
1931 'The S3RegionRedirector class has been deprecated for a new '
1932 'internal replacement. A future version of botocore may remove '
1933 'this class.',
1934 category=FutureWarning,
1935 )
1937 def register(self, event_emitter=None):
1938 emitter = event_emitter or self._client.meta.events
1939 emitter.register('needs-retry.s3', self.redirect_from_error)
1940 emitter.register('before-call.s3', self.set_request_url)
1941 emitter.register('before-parameter-build.s3', self.redirect_from_cache)
1943 def redirect_from_error(self, request_dict, response, operation, **kwargs):
1944 """
1945 An S3 request sent to the wrong region will return an error that
1946 contains the endpoint the request should be sent to. This handler
1947 will add the redirect information to the signing context and then
1948 redirect the request.
1949 """
1950 if response is None:
1951 # This could be none if there was a ConnectionError or other
1952 # transport error.
1953 return
1955 if self._is_s3_accesspoint(request_dict.get('context', {})):
1956 logger.debug(
1957 'S3 request was previously to an accesspoint, not redirecting.'
1958 )
1959 return
1961 if request_dict.get('context', {}).get('s3_redirected'):
1962 logger.debug(
1963 'S3 request was previously redirected, not redirecting.'
1964 )
1965 return
1967 error = response[1].get('Error', {})
1968 error_code = error.get('Code')
1969 response_metadata = response[1].get('ResponseMetadata', {})
1971 # We have to account for 400 responses because
1972 # if we sign a Head* request with the wrong region,
1973 # we'll get a 400 Bad Request but we won't get a
1974 # body saying it's an "AuthorizationHeaderMalformed".
1975 is_special_head_object = (
1976 error_code in ('301', '400') and operation.name == 'HeadObject'
1977 )
1978 is_special_head_bucket = (
1979 error_code in ('301', '400')
1980 and operation.name == 'HeadBucket'
1981 and 'x-amz-bucket-region'
1982 in response_metadata.get('HTTPHeaders', {})
1983 )
1984 is_wrong_signing_region = (
1985 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
1986 )
1987 is_redirect_status = response[0] is not None and response[
1988 0
1989 ].status_code in (301, 302, 307)
1990 is_permanent_redirect = error_code == 'PermanentRedirect'
1991 if not any(
1992 [
1993 is_special_head_object,
1994 is_wrong_signing_region,
1995 is_permanent_redirect,
1996 is_special_head_bucket,
1997 is_redirect_status,
1998 ]
1999 ):
2000 return
2002 bucket = request_dict['context']['signing']['bucket']
2003 client_region = request_dict['context'].get('client_region')
2004 new_region = self.get_bucket_region(bucket, response)
2006 if new_region is None:
2007 logger.debug(
2008 "S3 client configured for region %s but the bucket %s is not "
2009 "in that region and the proper region could not be "
2010 "automatically determined." % (client_region, bucket)
2011 )
2012 return
2014 logger.debug(
2015 "S3 client configured for region %s but the bucket %s is in region"
2016 " %s; Please configure the proper region to avoid multiple "
2017 "unnecessary redirects and signing attempts."
2018 % (client_region, bucket, new_region)
2019 )
2020 endpoint = self._endpoint_resolver.resolve('s3', new_region)
2021 endpoint = endpoint['endpoint_url']
2023 signing_context = {
2024 'region': new_region,
2025 'bucket': bucket,
2026 'endpoint': endpoint,
2027 }
2028 request_dict['context']['signing'] = signing_context
2030 self._cache[bucket] = signing_context
2031 self.set_request_url(request_dict, request_dict['context'])
2033 request_dict['context']['s3_redirected'] = True
2035 # Return 0 so it doesn't wait to retry
2036 return 0
2038 def get_bucket_region(self, bucket, response):
2039 """
2040 There are multiple potential sources for the new region to redirect to,
2041 but they aren't all universally available for use. This will try to
2042 find region from response elements, but will fall back to calling
2043 HEAD on the bucket if all else fails.
2045 :param bucket: The bucket to find the region for. This is necessary if
2046 the region is not available in the error response.
2047 :param response: A response representing a service request that failed
2048 due to incorrect region configuration.
2049 """
2050 # First try to source the region from the headers.
2051 service_response = response[1]
2052 response_headers = service_response['ResponseMetadata']['HTTPHeaders']
2053 if 'x-amz-bucket-region' in response_headers:
2054 return response_headers['x-amz-bucket-region']
2056 # Next, check the error body
2057 region = service_response.get('Error', {}).get('Region', None)
2058 if region is not None:
2059 return region
2061 # Finally, HEAD the bucket. No other choice sadly.
2062 try:
2063 response = self._client.head_bucket(Bucket=bucket)
2064 headers = response['ResponseMetadata']['HTTPHeaders']
2065 except ClientError as e:
2066 headers = e.response['ResponseMetadata']['HTTPHeaders']
2068 region = headers.get('x-amz-bucket-region', None)
2069 return region
2071 def set_request_url(self, params, context, **kwargs):
2072 endpoint = context.get('signing', {}).get('endpoint', None)
2073 if endpoint is not None:
2074 params['url'] = _get_new_endpoint(params['url'], endpoint, False)
2076 def redirect_from_cache(self, params, context, **kwargs):
2077 """
2078 This handler retrieves a given bucket's signing context from the cache
2079 and adds it into the request context.
2080 """
2081 if self._is_s3_accesspoint(context):
2082 return
2083 bucket = params.get('Bucket')
2084 signing_context = self._cache.get(bucket)
2085 if signing_context is not None:
2086 context['signing'] = signing_context
2087 else:
2088 context['signing'] = {'bucket': bucket}
2090 def _is_s3_accesspoint(self, context):
2091 return 's3_accesspoint' in context
2094class InvalidArnException(ValueError):
2095 pass
2098class ArnParser:
2099 def parse_arn(self, arn):
2100 arn_parts = arn.split(':', 5)
2101 if len(arn_parts) < 6:
2102 raise InvalidArnException(
2103 'Provided ARN: %s must be of the format: '
2104 'arn:partition:service:region:account:resource' % arn
2105 )
2106 return {
2107 'partition': arn_parts[1],
2108 'service': arn_parts[2],
2109 'region': arn_parts[3],
2110 'account': arn_parts[4],
2111 'resource': arn_parts[5],
2112 }
2114 @staticmethod
2115 def is_arn(value):
2116 if not isinstance(value, str) or not value.startswith('arn:'):
2117 return False
2118 arn_parser = ArnParser()
2119 try:
2120 arn_parser.parse_arn(value)
2121 return True
2122 except InvalidArnException:
2123 return False
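# Illustrative usage sketch (added annotation, not part of the original module).
# parse_arn() splits on the first five ':' separators, so the resource portion may
# itself contain ':' or '/'. The ARN values below are made-up examples.
def _example_arn_parser_sketch():
    parser = ArnParser()
    details = parser.parse_arn(
        'arn:aws:s3:us-west-2:123456789012:accesspoint/my-endpoint'
    )
    assert details == {
        'partition': 'aws',
        'service': 's3',
        'region': 'us-west-2',
        'account': '123456789012',
        'resource': 'accesspoint/my-endpoint',
    }
    assert ArnParser.is_arn('arn:aws:s3:::my-bucket')  # empty region/account still parse
    assert not ArnParser.is_arn('my-bucket')           # missing 'arn:' prefix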
2126class S3ArnParamHandler:
2127 _RESOURCE_REGEX = re.compile(
2128 r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$'
2129 )
2130 _OUTPOST_RESOURCE_REGEX = re.compile(
2131 r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]'
2132 r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)'
2133 )
2134 _BLACKLISTED_OPERATIONS = ['CreateBucket']
2136 def __init__(self, arn_parser=None):
2137 self._arn_parser = arn_parser
2138 if arn_parser is None:
2139 self._arn_parser = ArnParser()
2141 def register(self, event_emitter):
2142 event_emitter.register('before-parameter-build.s3', self.handle_arn)
2144 def handle_arn(self, params, model, context, **kwargs):
2145 if model.name in self._BLACKLISTED_OPERATIONS:
2146 return
2147 arn_details = self._get_arn_details_from_bucket_param(params)
2148 if arn_details is None:
2149 return
2150 if arn_details['resource_type'] == 'accesspoint':
2151 self._store_accesspoint(params, context, arn_details)
2152 elif arn_details['resource_type'] == 'outpost':
2153 self._store_outpost(params, context, arn_details)
2155 def _get_arn_details_from_bucket_param(self, params):
2156 if 'Bucket' in params:
2157 try:
2158 arn = params['Bucket']
2159 arn_details = self._arn_parser.parse_arn(arn)
2160 self._add_resource_type_and_name(arn, arn_details)
2161 return arn_details
2162 except InvalidArnException:
2163 pass
2164 return None
2166 def _add_resource_type_and_name(self, arn, arn_details):
2167 match = self._RESOURCE_REGEX.match(arn_details['resource'])
2168 if match:
2169 arn_details['resource_type'] = match.group('resource_type')
2170 arn_details['resource_name'] = match.group('resource_name')
2171 else:
2172 raise UnsupportedS3ArnError(arn=arn)
2174 def _store_accesspoint(self, params, context, arn_details):
2175 # Ideally the access-point would be stored as a parameter in the
2176 # request where the serializer would then know how to serialize it,
2177 # but access-points are not modeled in S3 operations so it would fail
2178 # validation. Instead, we set the access-point name as the bucket parameter
2179 # so that some value is set when serializing the request, and we store the
2180 # additional information from the arn on the context for use in forming the
2181 # access-point endpoint.
2182 params['Bucket'] = arn_details['resource_name']
2183 context['s3_accesspoint'] = {
2184 'name': arn_details['resource_name'],
2185 'account': arn_details['account'],
2186 'partition': arn_details['partition'],
2187 'region': arn_details['region'],
2188 'service': arn_details['service'],
2189 }
2191 def _store_outpost(self, params, context, arn_details):
2192 resource_name = arn_details['resource_name']
2193 match = self._OUTPOST_RESOURCE_REGEX.match(resource_name)
2194 if not match:
2195 raise UnsupportedOutpostResourceError(resource_name=resource_name)
2196 # Because we need to set the bucket name to something to pass
2197 # validation we're going to use the access point name to be consistent
2198 # with normal access point arns.
2199 accesspoint_name = match.group('accesspoint_name')
2200 params['Bucket'] = accesspoint_name
2201 context['s3_accesspoint'] = {
2202 'outpost_name': match.group('outpost_name'),
2203 'name': accesspoint_name,
2204 'account': arn_details['account'],
2205 'partition': arn_details['partition'],
2206 'region': arn_details['region'],
2207 'service': arn_details['service'],
2208 }
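# Illustrative sketch (added annotation, not part of the original module): how the
# resource portion of an ARN maps onto the regexes above. The names are made up.
def _example_s3_arn_resources_sketch():
    accesspoint = S3ArnParamHandler._RESOURCE_REGEX.match('accesspoint/my-endpoint')
    assert accesspoint.group('resource_type') == 'accesspoint'
    assert accesspoint.group('resource_name') == 'my-endpoint'

    outpost = S3ArnParamHandler._OUTPOST_RESOURCE_REGEX.match(
        'op-01234567890123456/accesspoint/my-endpoint'
    )
    assert outpost.group('outpost_name') == 'op-01234567890123456'
    assert outpost.group('accesspoint_name') == 'my-endpoint'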
2211class S3EndpointSetter:
2212 _DEFAULT_PARTITION = 'aws'
2213 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
2215 def __init__(
2216 self,
2217 endpoint_resolver,
2218 region=None,
2219 s3_config=None,
2220 endpoint_url=None,
2221 partition=None,
2222 use_fips_endpoint=False,
2223 ):
2224 # This is calling the endpoint_resolver in regions.py
2225 self._endpoint_resolver = endpoint_resolver
2226 self._region = region
2227 self._s3_config = s3_config
2228 self._use_fips_endpoint = use_fips_endpoint
2229 if s3_config is None:
2230 self._s3_config = {}
2231 self._endpoint_url = endpoint_url
2232 self._partition = partition
2233 if partition is None:
2234 self._partition = self._DEFAULT_PARTITION
2236 def register(self, event_emitter):
2237 event_emitter.register('before-sign.s3', self.set_endpoint)
2238 event_emitter.register('choose-signer.s3', self.set_signer)
2239 event_emitter.register(
2240 'before-call.s3.WriteGetObjectResponse',
2241 self.update_endpoint_to_s3_object_lambda,
2242 )
2244 def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs):
2245 if self._use_accelerate_endpoint:
2246 raise UnsupportedS3ConfigurationError(
2247 msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations',
2248 )
2250 self._override_signing_name(context, 's3-object-lambda')
2251 if self._endpoint_url:
2252 # Only update the url if an explicit url was not provided
2253 return
2255 resolver = self._endpoint_resolver
2256 # Construct the endpoint for the s3-object-lambda service in this region
2257 resolved = resolver.construct_endpoint(
2258 's3-object-lambda', self._region
2259 )
2261 # Ideally we would be able to replace the endpoint before
2262 # serialization, but there's no event to do that currently.
2263 # host_prefix carries all of the arn/bucket-specific labels.
2264 new_endpoint = 'https://{host_prefix}{hostname}'.format(
2265 host_prefix=params['host_prefix'],
2266 hostname=resolved['hostname'],
2267 )
2269 params['url'] = _get_new_endpoint(params['url'], new_endpoint, False)
2271 def set_endpoint(self, request, **kwargs):
2272 if self._use_accesspoint_endpoint(request):
2273 self._validate_accesspoint_supported(request)
2274 self._validate_fips_supported(request)
2275 self._validate_global_regions(request)
2276 region_name = self._resolve_region_for_accesspoint_endpoint(
2277 request
2278 )
2279 self._resolve_signing_name_for_accesspoint_endpoint(request)
2280 self._switch_to_accesspoint_endpoint(request, region_name)
2281 return
2282 if self._use_accelerate_endpoint:
2283 if self._use_fips_endpoint:
2284 raise UnsupportedS3ConfigurationError(
2285 msg=(
2286 'Client is configured to use the FIPS pseudo region '
2287 'for "%s", but S3 Accelerate does not have any FIPS '
2288 'compatible endpoints.' % (self._region)
2289 )
2290 )
2291 switch_host_s3_accelerate(request=request, **kwargs)
2292 if self._s3_addressing_handler:
2293 self._s3_addressing_handler(request=request, **kwargs)
2295 def _use_accesspoint_endpoint(self, request):
2296 return 's3_accesspoint' in request.context
2298 def _validate_fips_supported(self, request):
2299 if not self._use_fips_endpoint:
2300 return
2301 if 'fips' in request.context['s3_accesspoint']['region']:
2302 raise UnsupportedS3AccesspointConfigurationError(
2303 msg='Invalid ARN, FIPS region not allowed in ARN.'
2304 )
2305 if 'outpost_name' in request.context['s3_accesspoint']:
2306 raise UnsupportedS3AccesspointConfigurationError(
2307 msg=(
2308 'Client is configured to use the FIPS pseudo-region "%s", '
2309 'but outpost ARNs do not support FIPS endpoints.'
2310 % (self._region)
2311 )
2312 )
2313 # Transforming pseudo-region to actual region
2314 accesspoint_region = request.context['s3_accesspoint']['region']
2315 if accesspoint_region != self._region:
2316 if not self._s3_config.get('use_arn_region', True):
2317 # TODO: Update message to reflect use_arn_region
2318 # is not set
2319 raise UnsupportedS3AccesspointConfigurationError(
2320 msg=(
2321 'Client is configured to use the FIPS pseudo-region '
2322 'for "%s", but the access-point ARN provided is for '
2323 'the "%s" region. For clients using a FIPS '
2324 'pseudo-region, calls to access-point ARNs in another '
2325 'region are not allowed.'
2326 % (self._region, accesspoint_region)
2327 )
2328 )
2330 def _validate_global_regions(self, request):
2331 if self._s3_config.get('use_arn_region', True):
2332 return
2333 if self._region in ['aws-global', 's3-external-1']:
2334 raise UnsupportedS3AccesspointConfigurationError(
2335 msg=(
2336 'Client is configured to use the global pseudo-region '
2337 '"%s". When providing access-point ARNs a regional '
2338 'endpoint must be specified.' % self._region
2339 )
2340 )
2342 def _validate_accesspoint_supported(self, request):
2343 if self._use_accelerate_endpoint:
2344 raise UnsupportedS3AccesspointConfigurationError(
2345 msg=(
2346 'Client does not support s3 accelerate configuration '
2347 'when an access-point ARN is specified.'
2348 )
2349 )
2350 request_partition = request.context['s3_accesspoint']['partition']
2351 if request_partition != self._partition:
2352 raise UnsupportedS3AccesspointConfigurationError(
2353 msg=(
2354 'Client is configured for "%s" partition, but access-point'
2355 ' ARN provided is for "%s" partition. The client and '
2356 'access-point partition must be the same.'
2357 % (self._partition, request_partition)
2358 )
2359 )
2360 s3_service = request.context['s3_accesspoint'].get('service')
2361 if s3_service == 's3-object-lambda' and self._s3_config.get(
2362 'use_dualstack_endpoint'
2363 ):
2364 raise UnsupportedS3AccesspointConfigurationError(
2365 msg=(
2366 'Client does not support s3 dualstack configuration '
2367 'when an S3 Object Lambda access point ARN is specified.'
2368 )
2369 )
2370 outpost_name = request.context['s3_accesspoint'].get('outpost_name')
2371 if outpost_name and self._s3_config.get('use_dualstack_endpoint'):
2372 raise UnsupportedS3AccesspointConfigurationError(
2373 msg=(
2374 'Client does not support s3 dualstack configuration '
2375 'when an outpost ARN is specified.'
2376 )
2377 )
2378 self._validate_mrap_s3_config(request)
2380 def _validate_mrap_s3_config(self, request):
2381 if not is_global_accesspoint(request.context):
2382 return
2383 if self._s3_config.get('s3_disable_multiregion_access_points'):
2384 raise UnsupportedS3AccesspointConfigurationError(
2385 msg=(
2386 'Invalid configuration, Multi-Region Access Point '
2387 'ARNs are disabled.'
2388 )
2389 )
2390 elif self._s3_config.get('use_dualstack_endpoint'):
2391 raise UnsupportedS3AccesspointConfigurationError(
2392 msg=(
2393 'Client does not support s3 dualstack configuration '
2394 'when a Multi-Region Access Point ARN is specified.'
2395 )
2396 )
2398 def _resolve_region_for_accesspoint_endpoint(self, request):
2399 if is_global_accesspoint(request.context):
2400 # Requests going to MRAP endpoints MUST be set to any (*) region.
2401 self._override_signing_region(request, '*')
2402 elif self._s3_config.get('use_arn_region', True):
2403 accesspoint_region = request.context['s3_accesspoint']['region']
2404 # If we are using the region from the access point,
2405 # we will also want to make sure that we set it as the
2406 # signing region as well
2407 self._override_signing_region(request, accesspoint_region)
2408 return accesspoint_region
2409 return self._region
2411 def set_signer(self, context, **kwargs):
2412 if is_global_accesspoint(context):
2413 if HAS_CRT:
2414 return 's3v4a'
2415 else:
2416 raise MissingDependencyException(
2417 msg="Using S3 with an MRAP arn requires an additional "
2418 "dependency. You will need to pip install "
2419 "botocore[crt] before proceeding."
2420 )
2422 def _resolve_signing_name_for_accesspoint_endpoint(self, request):
2423 accesspoint_service = request.context['s3_accesspoint']['service']
2424 self._override_signing_name(request.context, accesspoint_service)
2426 def _switch_to_accesspoint_endpoint(self, request, region_name):
2427 original_components = urlsplit(request.url)
2428 accesspoint_endpoint = urlunsplit(
2429 (
2430 original_components.scheme,
2431 self._get_netloc(request.context, region_name),
2432 self._get_accesspoint_path(
2433 original_components.path, request.context
2434 ),
2435 original_components.query,
2436 '',
2437 )
2438 )
2439 logger.debug(
2440 f'Updating URI from {request.url} to {accesspoint_endpoint}'
2441 )
2442 request.url = accesspoint_endpoint
2444 def _get_netloc(self, request_context, region_name):
2445 if is_global_accesspoint(request_context):
2446 return self._get_mrap_netloc(request_context)
2447 else:
2448 return self._get_accesspoint_netloc(request_context, region_name)
2450 def _get_mrap_netloc(self, request_context):
2451 s3_accesspoint = request_context['s3_accesspoint']
2452 region_name = 's3-global'
2453 mrap_netloc_components = [s3_accesspoint['name']]
2454 if self._endpoint_url:
2455 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2456 mrap_netloc_components.append(endpoint_url_netloc)
2457 else:
2458 partition = s3_accesspoint['partition']
2459 mrap_netloc_components.extend(
2460 [
2461 'accesspoint',
2462 region_name,
2463 self._get_partition_dns_suffix(partition),
2464 ]
2465 )
2466 return '.'.join(mrap_netloc_components)
2468 def _get_accesspoint_netloc(self, request_context, region_name):
2469 s3_accesspoint = request_context['s3_accesspoint']
2470 accesspoint_netloc_components = [
2471 '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']),
2472 ]
2473 outpost_name = s3_accesspoint.get('outpost_name')
2474 if self._endpoint_url:
2475 if outpost_name:
2476 accesspoint_netloc_components.append(outpost_name)
2477 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2478 accesspoint_netloc_components.append(endpoint_url_netloc)
2479 else:
2480 if outpost_name:
2481 outpost_host = [outpost_name, 's3-outposts']
2482 accesspoint_netloc_components.extend(outpost_host)
2483 elif s3_accesspoint['service'] == 's3-object-lambda':
2484 component = self._inject_fips_if_needed(
2485 's3-object-lambda', request_context
2486 )
2487 accesspoint_netloc_components.append(component)
2488 else:
2489 component = self._inject_fips_if_needed(
2490 's3-accesspoint', request_context
2491 )
2492 accesspoint_netloc_components.append(component)
2493 if self._s3_config.get('use_dualstack_endpoint'):
2494 accesspoint_netloc_components.append('dualstack')
2495 accesspoint_netloc_components.extend(
2496 [region_name, self._get_dns_suffix(region_name)]
2497 )
2498 return '.'.join(accesspoint_netloc_components)
2500 def _inject_fips_if_needed(self, component, request_context):
2501 if self._use_fips_endpoint:
2502 return '%s-fips' % component
2503 return component
2505 def _get_accesspoint_path(self, original_path, request_context):
2506 # The Bucket parameter was substituted with the access-point name as
2507 # some value was required in serializing the bucket name. Now that
2508 # we are making the request directly to the access point, we will
2509 # want to remove that access-point name from the path.
2510 name = request_context['s3_accesspoint']['name']
2511 # All S3 operations require at least a / in their path.
2512 return original_path.replace('/' + name, '', 1) or '/'
2514 def _get_partition_dns_suffix(self, partition_name):
2515 dns_suffix = self._endpoint_resolver.get_partition_dns_suffix(
2516 partition_name
2517 )
2518 if dns_suffix is None:
2519 dns_suffix = self._DEFAULT_DNS_SUFFIX
2520 return dns_suffix
2522 def _get_dns_suffix(self, region_name):
2523 resolved = self._endpoint_resolver.construct_endpoint(
2524 's3', region_name
2525 )
2526 dns_suffix = self._DEFAULT_DNS_SUFFIX
2527 if resolved and 'dnsSuffix' in resolved:
2528 dns_suffix = resolved['dnsSuffix']
2529 return dns_suffix
2531 def _override_signing_region(self, request, region_name):
2532 signing_context = request.context.get('signing', {})
2533 # S3SigV4Auth will use the context['signing']['region'] value to
2534 # sign with if present. This is used by the Bucket redirector
2535 # as well but we should be fine because the redirector is never
2536 # used in combination with the accesspoint setting logic.
2537 signing_context['region'] = region_name
2538 request.context['signing'] = signing_context
2540 def _override_signing_name(self, context, signing_name):
2541 signing_context = context.get('signing', {})
2542 # S3SigV4Auth will use the context['signing']['signing_name'] value to
2543 # sign with if present. This is used by the Bucket redirector
2544 # as well but we should be fine because the redirector is never
2545 # used in combination with the accesspoint setting logic.
2546 signing_context['signing_name'] = signing_name
2547 context['signing'] = signing_context
2549 @CachedProperty
2550 def _use_accelerate_endpoint(self):
2551 # Enable accelerate if the configuration is set to true or the
2552 # endpoint being used matches one of the accelerate endpoints.
2554 # Accelerate has been explicitly configured.
2555 if self._s3_config.get('use_accelerate_endpoint'):
2556 return True
2558 # Accelerate mode is turned on automatically if an endpoint url is
2559 # provided that matches the accelerate scheme.
2560 if self._endpoint_url is None:
2561 return False
2563 # Accelerate is only valid for Amazon endpoints.
2564 netloc = urlsplit(self._endpoint_url).netloc
2565 if not netloc.endswith('amazonaws.com'):
2566 return False
2568 # The first part of the url should always be s3-accelerate.
2569 parts = netloc.split('.')
2570 if parts[0] != 's3-accelerate':
2571 return False
2573 # Url parts between 's3-accelerate' and 'amazonaws.com'
2574 # represent different url features.
2575 feature_parts = parts[1:-2]
2577 # There should be no duplicate url parts.
2578 if len(feature_parts) != len(set(feature_parts)):
2579 return False
2581 # Remaining parts must all be in the whitelist.
2582 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts)
2584 @CachedProperty
2585 def _addressing_style(self):
2586 # Use virtual host style addressing if accelerate is enabled or if
2587 # the given endpoint url is an accelerate endpoint.
2588 if self._use_accelerate_endpoint:
2589 return 'virtual'
2591 # If a particular addressing style is configured, use it.
2592 configured_addressing_style = self._s3_config.get('addressing_style')
2593 if configured_addressing_style:
2594 return configured_addressing_style
2596 @CachedProperty
2597 def _s3_addressing_handler(self):
2598 # If virtual host style was configured, use it regardless of whether
2599 # or not the bucket looks dns compatible.
2600 if self._addressing_style == 'virtual':
2601 logger.debug("Using S3 virtual host style addressing.")
2602 return switch_to_virtual_host_style
2604 # If path style is configured, no additional steps are needed. If
2605 # endpoint_url was specified, don't default to virtual. We could
2606 # potentially default provided endpoint urls to virtual hosted
2607 # style, but for now it is avoided.
2608 if self._addressing_style == 'path' or self._endpoint_url is not None:
2609 logger.debug("Using S3 path style addressing.")
2610 return None
2612 logger.debug(
2613 "Defaulting to S3 virtual host style addressing with "
2614 "path style addressing fallback."
2615 )
2617 # By default, try to use virtual style with path fallback.
2618 return fix_s3_host
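# Illustrative sketch (added annotation, not part of the original module): the
# netloc checks performed by _use_accelerate_endpoint above, applied to a
# hypothetical accelerate endpoint URL.
def _example_accelerate_netloc_sketch():
    netloc = urlsplit('https://s3-accelerate.dualstack.amazonaws.com').netloc
    parts = netloc.split('.')
    assert netloc.endswith('amazonaws.com')
    assert parts[0] == 's3-accelerate'
    feature_parts = parts[1:-2]          # ['dualstack']
    assert len(feature_parts) == len(set(feature_parts))
    assert all(p in S3_ACCELERATE_WHITELIST for p in feature_parts)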
2621class S3ControlEndpointSetter:
2622 _DEFAULT_PARTITION = 'aws'
2623 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
2624 _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$')
2626 def __init__(
2627 self,
2628 endpoint_resolver,
2629 region=None,
2630 s3_config=None,
2631 endpoint_url=None,
2632 partition=None,
2633 use_fips_endpoint=False,
2634 ):
2635 self._endpoint_resolver = endpoint_resolver
2636 self._region = region
2637 self._s3_config = s3_config
2638 self._use_fips_endpoint = use_fips_endpoint
2639 if s3_config is None:
2640 self._s3_config = {}
2641 self._endpoint_url = endpoint_url
2642 self._partition = partition
2643 if partition is None:
2644 self._partition = self._DEFAULT_PARTITION
2646 def register(self, event_emitter):
2647 event_emitter.register('before-sign.s3-control', self.set_endpoint)
2649 def set_endpoint(self, request, **kwargs):
2650 if self._use_endpoint_from_arn_details(request):
2651 self._validate_endpoint_from_arn_details_supported(request)
2652 region_name = self._resolve_region_from_arn_details(request)
2653 self._resolve_signing_name_from_arn_details(request)
2654 self._resolve_endpoint_from_arn_details(request, region_name)
2655 self._add_headers_from_arn_details(request)
2656 elif self._use_endpoint_from_outpost_id(request):
2657 self._validate_outpost_redirection_valid(request)
2658 self._override_signing_name(request, 's3-outposts')
2659 new_netloc = self._construct_outpost_endpoint(self._region)
2660 self._update_request_netloc(request, new_netloc)
2662 def _use_endpoint_from_arn_details(self, request):
2663 return 'arn_details' in request.context
2665 def _use_endpoint_from_outpost_id(self, request):
2666 return 'outpost_id' in request.context
2668 def _validate_endpoint_from_arn_details_supported(self, request):
2669 if 'fips' in request.context['arn_details']['region']:
2670 raise UnsupportedS3ControlArnError(
2671 arn=request.context['arn_details']['original'],
2672 msg='Invalid ARN, FIPS region not allowed in ARN.',
2673 )
2674 if not self._s3_config.get('use_arn_region', False):
2675 arn_region = request.context['arn_details']['region']
2676 if arn_region != self._region:
2677 error_msg = (
2678 'The use_arn_region configuration is disabled but '
2679 'received arn for "%s" when the client is configured '
2680 'to use "%s"'
2681 ) % (arn_region, self._region)
2682 raise UnsupportedS3ControlConfigurationError(msg=error_msg)
2683 request_partition = request.context['arn_details']['partition']
2684 if request_partition != self._partition:
2685 raise UnsupportedS3ControlConfigurationError(
2686 msg=(
2687 'Client is configured for "%s" partition, but arn '
2688 'provided is for "%s" partition. The client and '
2689 'arn partition must be the same.'
2690 % (self._partition, request_partition)
2691 )
2692 )
2693 if self._s3_config.get('use_accelerate_endpoint'):
2694 raise UnsupportedS3ControlConfigurationError(
2695 msg='S3 control client does not support accelerate endpoints',
2696 )
2697 if 'outpost_name' in request.context['arn_details']:
2698 self._validate_outpost_redirection_valid(request)
2700 def _validate_outpost_redirection_valid(self, request):
2701 if self._s3_config.get('use_dualstack_endpoint'):
2702 raise UnsupportedS3ControlConfigurationError(
2703 msg=(
2704 'Client does not support s3 dualstack configuration '
2705 'when an outpost is specified.'
2706 )
2707 )
2709 def _resolve_region_from_arn_details(self, request):
2710 if self._s3_config.get('use_arn_region', False):
2711 arn_region = request.context['arn_details']['region']
2712 # If we are using the region from the expanded arn, we will also
2713 # want to make sure that we set it as the signing region as well
2714 self._override_signing_region(request, arn_region)
2715 return arn_region
2716 return self._region
2718 def _resolve_signing_name_from_arn_details(self, request):
2719 arn_service = request.context['arn_details']['service']
2720 self._override_signing_name(request, arn_service)
2721 return arn_service
2723 def _resolve_endpoint_from_arn_details(self, request, region_name):
2724 new_netloc = self._resolve_netloc_from_arn_details(
2725 request, region_name
2726 )
2727 self._update_request_netloc(request, new_netloc)
2729 def _update_request_netloc(self, request, new_netloc):
2730 original_components = urlsplit(request.url)
2731 arn_details_endpoint = urlunsplit(
2732 (
2733 original_components.scheme,
2734 new_netloc,
2735 original_components.path,
2736 original_components.query,
2737 '',
2738 )
2739 )
2740 logger.debug(
2741 f'Updating URI from {request.url} to {arn_details_endpoint}'
2742 )
2743 request.url = arn_details_endpoint
2745 def _resolve_netloc_from_arn_details(self, request, region_name):
2746 arn_details = request.context['arn_details']
2747 if 'outpost_name' in arn_details:
2748 return self._construct_outpost_endpoint(region_name)
2749 account = arn_details['account']
2750 return self._construct_s3_control_endpoint(region_name, account)
2752 def _is_valid_host_label(self, label):
2753 return self._HOST_LABEL_REGEX.match(label)
2755 def _validate_host_labels(self, *labels):
2756 for label in labels:
2757 if not self._is_valid_host_label(label):
2758 raise InvalidHostLabelError(label=label)
2760 def _construct_s3_control_endpoint(self, region_name, account):
2761 self._validate_host_labels(region_name, account)
2762 if self._endpoint_url:
2763 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2764 netloc = [account, endpoint_url_netloc]
2765 else:
2766 netloc = [
2767 account,
2768 's3-control',
2769 ]
2770 self._add_dualstack(netloc)
2771 dns_suffix = self._get_dns_suffix(region_name)
2772 netloc.extend([region_name, dns_suffix])
2773 return self._construct_netloc(netloc)
2775 def _construct_outpost_endpoint(self, region_name):
2776 self._validate_host_labels(region_name)
2777 if self._endpoint_url:
2778 return urlsplit(self._endpoint_url).netloc
2779 else:
2780 netloc = [
2781 's3-outposts',
2782 region_name,
2783 self._get_dns_suffix(region_name),
2784 ]
2785 self._add_fips(netloc)
2786 return self._construct_netloc(netloc)
2788 def _construct_netloc(self, netloc):
2789 return '.'.join(netloc)
2791 def _add_fips(self, netloc):
2792 if self._use_fips_endpoint:
2793 netloc[0] = netloc[0] + '-fips'
2795 def _add_dualstack(self, netloc):
2796 if self._s3_config.get('use_dualstack_endpoint'):
2797 netloc.append('dualstack')
2799 def _get_dns_suffix(self, region_name):
2800 resolved = self._endpoint_resolver.construct_endpoint(
2801 's3', region_name
2802 )
2803 dns_suffix = self._DEFAULT_DNS_SUFFIX
2804 if resolved and 'dnsSuffix' in resolved:
2805 dns_suffix = resolved['dnsSuffix']
2806 return dns_suffix
2808 def _override_signing_region(self, request, region_name):
2809 signing_context = request.context.get('signing', {})
2810 # S3SigV4Auth will use the context['signing']['region'] value to
2811 # sign with if present. This is used by the Bucket redirector
2812 # as well but we should be fine because the redirector is never
2813 # used in combination with the accesspoint setting logic.
2814 signing_context['region'] = region_name
2815 request.context['signing'] = signing_context
2817 def _override_signing_name(self, request, signing_name):
2818 signing_context = request.context.get('signing', {})
2819 # S3SigV4Auth will use the context['signing']['signing_name'] value to
2820 # sign with if present. This is used by the Bucket redirector
2821 # as well but we should be fine because the redirector is never
2822 # used in combination with the accesspoint setting logic.
2823 signing_context['signing_name'] = signing_name
2824 request.context['signing'] = signing_context
2826 def _add_headers_from_arn_details(self, request):
2827 arn_details = request.context['arn_details']
2828 outpost_name = arn_details.get('outpost_name')
2829 if outpost_name:
2830 self._add_outpost_id_header(request, outpost_name)
2832 def _add_outpost_id_header(self, request, outpost_name):
2833 request.headers['x-amz-outpost-id'] = outpost_name
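# Illustrative sketch (added annotation, not part of the original module): how the
# netloc pieces above are assembled for a hypothetical s3-control request in
# us-west-2 with dualstack enabled. The account id is made up.
def _example_s3_control_netloc_sketch():
    account = '123456789012'
    netloc = [account, 's3-control']
    netloc.append('dualstack')                       # _add_dualstack when configured
    netloc.extend(['us-west-2', 'amazonaws.com'])    # region + resolved DNS suffix
    assert '.'.join(netloc) == (
        '123456789012.s3-control.dualstack.us-west-2.amazonaws.com'
    )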
2836class S3ControlArnParamHandler:
2837 """This handler has been replaced by S3ControlArnParamHandlerv2. The
2838 original version remains in place for any third-party importers.
2839 """
2841 _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]')
2843 def __init__(self, arn_parser=None):
2844 self._arn_parser = arn_parser
2845 if arn_parser is None:
2846 self._arn_parser = ArnParser()
2847 warnings.warn(
2848 'The S3ControlArnParamHandler class has been deprecated for a new '
2849 'internal replacement. A future version of botocore may remove '
2850 'this class.',
2851 category=FutureWarning,
2852 )
2854 def register(self, event_emitter):
2855 event_emitter.register(
2856 'before-parameter-build.s3-control',
2857 self.handle_arn,
2858 )
2860 def handle_arn(self, params, model, context, **kwargs):
2861 if model.name in ('CreateBucket', 'ListRegionalBuckets'):
2862 # CreateBucket and ListRegionalBuckets are special cases that do
2863 # not obey ARN-based redirection but will redirect based on the
2864 # presence of the OutpostId parameter
2865 self._handle_outpost_id_param(params, model, context)
2866 else:
2867 self._handle_name_param(params, model, context)
2868 self._handle_bucket_param(params, model, context)
2870 def _get_arn_details_from_param(self, params, param_name):
2871 if param_name not in params:
2872 return None
2873 try:
2874 arn = params[param_name]
2875 arn_details = self._arn_parser.parse_arn(arn)
2876 arn_details['original'] = arn
2877 arn_details['resources'] = self._split_resource(arn_details)
2878 return arn_details
2879 except InvalidArnException:
2880 return None
2882 def _split_resource(self, arn_details):
2883 return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource'])
2885 def _override_account_id_param(self, params, arn_details):
2886 account_id = arn_details['account']
2887 if 'AccountId' in params and params['AccountId'] != account_id:
2888 error_msg = (
2889 'Account ID in arn does not match the AccountId parameter '
2890 'provided: "%s"'
2891 ) % params['AccountId']
2892 raise UnsupportedS3ControlArnError(
2893 arn=arn_details['original'],
2894 msg=error_msg,
2895 )
2896 params['AccountId'] = account_id
2898 def _handle_outpost_id_param(self, params, model, context):
2899 if 'OutpostId' not in params:
2900 return
2901 context['outpost_id'] = params['OutpostId']
2903 def _handle_name_param(self, params, model, context):
2904 # CreateAccessPoint is a special case that does not expand Name
2905 if model.name == 'CreateAccessPoint':
2906 return
2907 arn_details = self._get_arn_details_from_param(params, 'Name')
2908 if arn_details is None:
2909 return
2910 if self._is_outpost_accesspoint(arn_details):
2911 self._store_outpost_accesspoint(params, context, arn_details)
2912 else:
2913 error_msg = 'The Name parameter does not support the provided ARN'
2914 raise UnsupportedS3ControlArnError(
2915 arn=arn_details['original'],
2916 msg=error_msg,
2917 )
2919 def _is_outpost_accesspoint(self, arn_details):
2920 if arn_details['service'] != 's3-outposts':
2921 return False
2922 resources = arn_details['resources']
2923 if len(resources) != 4:
2924 return False
2925 # Resource must be of the form outpost/op-123/accesspoint/name
2926 return resources[0] == 'outpost' and resources[2] == 'accesspoint'
2928 def _store_outpost_accesspoint(self, params, context, arn_details):
2929 self._override_account_id_param(params, arn_details)
2930 accesspoint_name = arn_details['resources'][3]
2931 params['Name'] = accesspoint_name
2932 arn_details['accesspoint_name'] = accesspoint_name
2933 arn_details['outpost_name'] = arn_details['resources'][1]
2934 context['arn_details'] = arn_details
2936 def _handle_bucket_param(self, params, model, context):
2937 arn_details = self._get_arn_details_from_param(params, 'Bucket')
2938 if arn_details is None:
2939 return
2940 if self._is_outpost_bucket(arn_details):
2941 self._store_outpost_bucket(params, context, arn_details)
2942 else:
2943 error_msg = (
2944 'The Bucket parameter does not support the provided ARN'
2945 )
2946 raise UnsupportedS3ControlArnError(
2947 arn=arn_details['original'],
2948 msg=error_msg,
2949 )
2951 def _is_outpost_bucket(self, arn_details):
2952 if arn_details['service'] != 's3-outposts':
2953 return False
2954 resources = arn_details['resources']
2955 if len(resources) != 4:
2956 return False
2957 # Resource must be of the form outpost/op-123/bucket/name
2958 return resources[0] == 'outpost' and resources[2] == 'bucket'
2960 def _store_outpost_bucket(self, params, context, arn_details):
2961 self._override_account_id_param(params, arn_details)
2962 bucket_name = arn_details['resources'][3]
2963 params['Bucket'] = bucket_name
2964 arn_details['bucket_name'] = bucket_name
2965 arn_details['outpost_name'] = arn_details['resources'][1]
2966 context['arn_details'] = arn_details
2969class S3ControlArnParamHandlerv2(S3ControlArnParamHandler):
2970 """Updated version of S3ControlArnParamHandler for use when
2971 EndpointRulesetResolver is in use for endpoint resolution.
2973 This class is considered private and subject to abrupt breaking changes or
2974 removal without prior announcement. Please do not use it directly.
2975 """
2977 def __init__(self, arn_parser=None):
2978 self._arn_parser = arn_parser
2979 if arn_parser is None:
2980 self._arn_parser = ArnParser()
2982 def register(self, event_emitter):
2983 event_emitter.register(
2984 'before-endpoint-resolution.s3-control',
2985 self.handle_arn,
2986 )
2988 def _handle_name_param(self, params, model, context):
2989 # CreateAccessPoint is a special case that does not expand Name
2990 if model.name == 'CreateAccessPoint':
2991 return
2992 arn_details = self._get_arn_details_from_param(params, 'Name')
2993 if arn_details is None:
2994 return
2995 self._raise_for_fips_pseudo_region(arn_details)
2996 self._raise_for_accelerate_endpoint(context)
2997 if self._is_outpost_accesspoint(arn_details):
2998 self._store_outpost_accesspoint(params, context, arn_details)
2999 else:
3000 error_msg = 'The Name parameter does not support the provided ARN'
3001 raise UnsupportedS3ControlArnError(
3002 arn=arn_details['original'],
3003 msg=error_msg,
3004 )
3006 def _store_outpost_accesspoint(self, params, context, arn_details):
3007 self._override_account_id_param(params, arn_details)
3009 def _handle_bucket_param(self, params, model, context):
3010 arn_details = self._get_arn_details_from_param(params, 'Bucket')
3011 if arn_details is None:
3012 return
3013 self._raise_for_fips_pseudo_region(arn_details)
3014 self._raise_for_accelerate_endpoint(context)
3015 if self._is_outpost_bucket(arn_details):
3016 self._store_outpost_bucket(params, context, arn_details)
3017 else:
3018 error_msg = (
3019 'The Bucket parameter does not support the provided ARN'
3020 )
3021 raise UnsupportedS3ControlArnError(
3022 arn=arn_details['original'],
3023 msg=error_msg,
3024 )
3026 def _store_outpost_bucket(self, params, context, arn_details):
3027 self._override_account_id_param(params, arn_details)
3029 def _raise_for_fips_pseudo_region(self, arn_details):
3030 # FIPS pseudo region names cannot be used in ARNs
3031 arn_region = arn_details['region']
3032 if arn_region.startswith('fips-') or arn_region.endswith('-fips'):
3033 raise UnsupportedS3ControlArnError(
3034 arn=arn_details['original'],
3035 msg='Invalid ARN, FIPS region not allowed in ARN.',
3036 )
3038 def _raise_for_accelerate_endpoint(self, context):
3039 s3_config = context['client_config'].s3 or {}
3040 if s3_config.get('use_accelerate_endpoint'):
3041 raise UnsupportedS3ControlConfigurationError(
3042 msg='S3 control client does not support accelerate endpoints',
3043 )
3046class ContainerMetadataFetcher:
3047 TIMEOUT_SECONDS = 2
3048 RETRY_ATTEMPTS = 3
3049 SLEEP_TIME = 1
3050 IP_ADDRESS = '169.254.170.2'
3051 _ALLOWED_HOSTS = [
3052 IP_ADDRESS,
3053 '169.254.170.23',
3054 'fd00:ec2::23',
3055 'localhost',
3056 ]
3058 def __init__(self, session=None, sleep=time.sleep):
3059 if session is None:
3060 session = botocore.httpsession.URLLib3Session(
3061 timeout=self.TIMEOUT_SECONDS
3062 )
3063 self._session = session
3064 self._sleep = sleep
3066 def retrieve_full_uri(self, full_url, headers=None):
3067 """Retrieve JSON metadata from container metadata.
3069 :type full_url: str
3070 :param full_url: The full URL of the metadata service.
3071 This should include the scheme as well, e.g
3072 "http://localhost:123/foo"
3074 """
3075 self._validate_allowed_url(full_url)
3076 return self._retrieve_credentials(full_url, headers)
3078 def _validate_allowed_url(self, full_url):
3079 parsed = botocore.compat.urlparse(full_url)
3080 if self._is_loopback_address(parsed.hostname):
3081 return
3082 is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname)
3083 if not is_whitelisted_host:
3084 raise ValueError(
3085 f"Unsupported host '{parsed.hostname}'. Can only retrieve metadata "
3086 f"from a loopback address or one of these hosts: {', '.join(self._ALLOWED_HOSTS)}"
3087 )
3089 def _is_loopback_address(self, hostname):
3090 try:
3091 ip = ip_address(hostname)
3092 return ip.is_loopback
3093 except ValueError:
3094 return False
3096 def _check_if_whitelisted_host(self, host):
3097 if host in self._ALLOWED_HOSTS:
3098 return True
3099 return False
3101 def retrieve_uri(self, relative_uri):
3102 """Retrieve JSON metadata from container metadata.
3104 :type relative_uri: str
3105 :param relative_uri: A relative URI, e.g "/foo/bar?id=123"
3107 :return: The parsed JSON response.
3109 """
3110 full_url = self.full_url(relative_uri)
3111 return self._retrieve_credentials(full_url)
3113 def _retrieve_credentials(self, full_url, extra_headers=None):
3114 headers = {'Accept': 'application/json'}
3115 if extra_headers is not None:
3116 headers.update(extra_headers)
3117 attempts = 0
3118 while True:
3119 try:
3120 return self._get_response(
3121 full_url, headers, self.TIMEOUT_SECONDS
3122 )
3123 except MetadataRetrievalError as e:
3124 logger.debug(
3125 "Received error when attempting to retrieve "
3126 "container metadata: %s",
3127 e,
3128 exc_info=True,
3129 )
3130 self._sleep(self.SLEEP_TIME)
3131 attempts += 1
3132 if attempts >= self.RETRY_ATTEMPTS:
3133 raise
3135 def _get_response(self, full_url, headers, timeout):
3136 try:
3137 AWSRequest = botocore.awsrequest.AWSRequest
3138 request = AWSRequest(method='GET', url=full_url, headers=headers)
3139 response = self._session.send(request.prepare())
3140 response_text = response.content.decode('utf-8')
3141 if response.status_code != 200:
3142 raise MetadataRetrievalError(
3143 error_msg=(
3144 f"Received non 200 response {response.status_code} "
3145 f"from container metadata: {response_text}"
3146 )
3147 )
3148 try:
3149 return json.loads(response_text)
3150 except ValueError:
3151 error_msg = "Unable to parse JSON returned from container metadata services"
3152 logger.debug('%s:%s', error_msg, response_text)
3153 raise MetadataRetrievalError(error_msg=error_msg)
3154 except RETRYABLE_HTTP_ERRORS as e:
3155 error_msg = (
3156 "Received error when attempting to retrieve "
3157 f"container metadata: {e}"
3158 )
3159 raise MetadataRetrievalError(error_msg=error_msg)
3161 def full_url(self, relative_uri):
3162 return f'http://{self.IP_ADDRESS}{relative_uri}'
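# Illustrative sketch (added annotation, not part of the original module): the URL
# and host checks performed by ContainerMetadataFetcher, using made-up values.
def _example_container_metadata_url_sketch():
    fetcher_host = ContainerMetadataFetcher.IP_ADDRESS       # '169.254.170.2'
    relative = '/v2/credentials/abcd'                         # hypothetical path
    assert f'http://{fetcher_host}{relative}' == (
        'http://169.254.170.2/v2/credentials/abcd'
    )
    # Loopback addresses are always allowed for full-URI retrieval.
    assert ip_address('127.0.0.1').is_loopback
    assert not ip_address('169.254.170.2').is_loopback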
3165def get_environ_proxies(url):
3166 if should_bypass_proxies(url):
3167 return {}
3168 else:
3169 return getproxies()
3172def should_bypass_proxies(url):
3173 """
3174 Returns whether we should bypass proxies or not.
3175 """
3176 # NOTE: requests allows ip/cidr entries in the no_proxy env var, which we
3177 # don't currently support, as urllib only checks the DNS suffix
3178 # If the system proxy settings indicate that this URL should be bypassed,
3179 # don't proxy.
3180 # The proxy_bypass function is incredibly buggy on OS X in early versions
3181 # of Python 2.6, so allow this call to fail. Only catch the specific
3182 # exceptions we've seen, though: this call failing in other ways can reveal
3183 # legitimate problems.
3184 try:
3185 if proxy_bypass(urlparse(url).netloc):
3186 return True
3187 except (TypeError, socket.gaierror):
3188 pass
3190 return False
3193def determine_content_length(body):
3194 # No body, content length of 0
3195 if not body:
3196 return 0
3198 # Try asking the body for its length
3199 try:
3200 return len(body)
3201 except (AttributeError, TypeError):
3202 pass
3204 # Try getting the length from a seekable stream
3205 if hasattr(body, 'seek') and hasattr(body, 'tell'):
3206 try:
3207 orig_pos = body.tell()
3208 body.seek(0, 2)
3209 end_file_pos = body.tell()
3210 body.seek(orig_pos)
3211 return end_file_pos - orig_pos
3212 except io.UnsupportedOperation:
3213 # in case when body is, for example, io.BufferedIOBase object
3214 # it has "seek" method which throws "UnsupportedOperation"
3215 # exception in such case we want to fall back to "chunked"
3216 # encoding
3217 pass
3218 # Failed to determine the length
3219 return None
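# Illustrative sketch (added annotation, not part of the original module):
# determine_content_length() with a bytes body and with a seekable stream.
def _example_content_length_sketch():
    import io

    assert determine_content_length(b'hello world') == 11
    stream = io.BytesIO(b'hello world')
    stream.seek(5)
    # Length is measured from the current position to the end of the stream.
    assert determine_content_length(stream) == 6
    assert stream.tell() == 5    # original position is restored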
3222def get_encoding_from_headers(headers, default='ISO-8859-1'):
3223 """Returns encodings from given HTTP Header Dict.
3225 :param headers: dictionary to extract encoding from.
3226 :param default: default encoding if the content-type is text
3227 """
3229 content_type = headers.get('content-type')
3231 if not content_type:
3232 return None
3234 message = email.message.Message()
3235 message['content-type'] = content_type
3236 charset = message.get_param("charset")
3238 if charset is not None:
3239 return charset
3241 if 'text' in content_type:
3242 return default
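# Illustrative sketch (added annotation, not part of the original module):
# get_encoding_from_headers() with and without an explicit charset parameter.
def _example_encoding_from_headers_sketch():
    assert get_encoding_from_headers(
        {'content-type': 'application/json; charset=utf-8'}
    ) == 'utf-8'
    # text/* without a charset falls back to the default encoding
    assert get_encoding_from_headers({'content-type': 'text/html'}) == 'ISO-8859-1'
    # non-text content without a charset yields None
    assert get_encoding_from_headers({'content-type': 'application/octet-stream'}) is None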
3245def calculate_md5(body, **kwargs):
3246 if isinstance(body, (bytes, bytearray)):
3247 binary_md5 = _calculate_md5_from_bytes(body)
3248 else:
3249 binary_md5 = _calculate_md5_from_file(body)
3250 return base64.b64encode(binary_md5).decode('ascii')
3253def _calculate_md5_from_bytes(body_bytes):
3254 md5 = get_md5(body_bytes)
3255 return md5.digest()
3258def _calculate_md5_from_file(fileobj):
3259 start_position = fileobj.tell()
3260 md5 = get_md5()
3261 for chunk in iter(lambda: fileobj.read(1024 * 1024), b''):
3262 md5.update(chunk)
3263 fileobj.seek(start_position)
3264 return md5.digest()
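# Illustrative sketch (added annotation, not part of the original module):
# calculate_md5() returns the base64-encoded binary digest, not the hex digest.
def _example_calculate_md5_sketch():
    import base64
    import hashlib

    body = b'hello world'
    expected = base64.b64encode(hashlib.md5(body).digest()).decode('ascii')
    # Equivalent to calculate_md5(body) when MD5 is available on this platform.
    assert expected == 'XrY7u+Ae7tCTyyK7j1rNww=='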
3267def _is_s3express_request(params):
3268 endpoint_properties = params.get('context', {}).get(
3269 'endpoint_properties', {}
3270 )
3271 return endpoint_properties.get('backend') == 'S3Express'
3274def _has_checksum_header(params):
3275 headers = params['headers']
3276 # If a user provided Content-MD5 is present,
3277 # don't try to compute a new one.
3278 if 'Content-MD5' in headers:
3279 return True
3281 # If a header matching the x-amz-checksum-* pattern is present, we
3282 # assume a checksum has already been provided and an md5 is not needed
3283 for header in headers:
3284 if CHECKSUM_HEADER_PATTERN.match(header):
3285 return True
3287 return False
3290def conditionally_calculate_checksum(params, **kwargs):
3291 if not _has_checksum_header(params):
3292 conditionally_calculate_md5(params, **kwargs)
3293 conditionally_enable_crc32(params, **kwargs)
3296def conditionally_enable_crc32(params, **kwargs):
3297 checksum_context = params.get('context', {}).get('checksum', {})
3298 checksum_algorithm = checksum_context.get('request_algorithm')
3299 if (
3300 _is_s3express_request(params)
3301 and params['body'] is not None
3302 and checksum_algorithm in (None, "conditional-md5")
3303 ):
3304 params['context']['checksum'] = {
3305 'request_algorithm': {
3306 'algorithm': 'crc32',
3307 'in': 'header',
3308 'name': 'x-amz-checksum-crc32',
3309 }
3310 }
3313def conditionally_calculate_md5(params, **kwargs):
3314 """Only add a Content-MD5 if the system supports it."""
3315 body = params['body']
3316 checksum_context = params.get('context', {}).get('checksum', {})
3317 checksum_algorithm = checksum_context.get('request_algorithm')
3318 if checksum_algorithm and checksum_algorithm != 'conditional-md5':
3319 # Skip for requests that will have a flexible checksum applied
3320 return
3322 if _has_checksum_header(params):
3323 # Don't add a new header if one is already available.
3324 return
3326 if _is_s3express_request(params):
3327 # S3Express doesn't support MD5
3328 return
3330 if MD5_AVAILABLE and body is not None:
3331 md5_digest = calculate_md5(body, **kwargs)
3332 params['headers']['Content-MD5'] = md5_digest
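# Illustrative sketch (added annotation, not part of the original module): the
# shape of the params dict these checksum handlers operate on, with made-up values.
def _example_conditional_md5_sketch():
    params = {
        'body': b'hello world',
        'headers': {},
        'context': {},
    }
    conditionally_calculate_md5(params)
    # On platforms where MD5 is available, a Content-MD5 header is added.
    if MD5_AVAILABLE:
        assert 'Content-MD5' in params['headers']
    # A pre-existing checksum header suppresses the calculation.
    assert _has_checksum_header({'headers': {'Content-MD5': 'already-set'}})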
3335class FileWebIdentityTokenLoader:
3336 def __init__(self, web_identity_token_path, _open=open):
3337 self._web_identity_token_path = web_identity_token_path
3338 self._open = _open
3340 def __call__(self):
3341 with self._open(self._web_identity_token_path) as token_file:
3342 return token_file.read()
3345class SSOTokenLoader:
3346 def __init__(self, cache=None):
3347 if cache is None:
3348 cache = {}
3349 self._cache = cache
3351 def _generate_cache_key(self, start_url, session_name):
3352 input_str = start_url
3353 if session_name is not None:
3354 input_str = session_name
3355 return hashlib.sha1(input_str.encode('utf-8')).hexdigest()
3357 def save_token(self, start_url, token, session_name=None):
3358 cache_key = self._generate_cache_key(start_url, session_name)
3359 self._cache[cache_key] = token
3361 def __call__(self, start_url, session_name=None):
3362 cache_key = self._generate_cache_key(start_url, session_name)
3363 logger.debug(f'Checking for cached token at: {cache_key}')
3364 if cache_key not in self._cache:
3365 name = start_url
3366 if session_name is not None:
3367 name = session_name
3368 error_msg = f'Token for {name} does not exist'
3369 raise SSOTokenLoadError(error_msg=error_msg)
3371 token = self._cache[cache_key]
3372 if 'accessToken' not in token or 'expiresAt' not in token:
3373 error_msg = f'Token for {start_url} is invalid'
3374 raise SSOTokenLoadError(error_msg=error_msg)
3375 return token
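# --- Editor's note: illustrative sketch, not part of botocore/utils.py -----
# SSOTokenLoader keys its cache on the SHA-1 of the session name (falling
# back to the start URL) and requires cached tokens to carry both accessToken
# and expiresAt; anything else raises SSOTokenLoadError. The URL, session
# name, and token values are made up.
from botocore.utils import SSOTokenLoader

loader = SSOTokenLoader(cache={})
token = {'accessToken': 'example-token', 'expiresAt': '2099-01-01T00:00:00Z'}
loader.save_token('https://example.awsapps.com/start', token, session_name='my-sso')
assert loader('https://example.awsapps.com/start', session_name='my-sso') == token
# --- end sketch -------------------------------------------------------------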
3378class EventbridgeSignerSetter:
3379 _DEFAULT_PARTITION = 'aws'
3380 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
3382 def __init__(self, endpoint_resolver, region=None, endpoint_url=None):
3383 self._endpoint_resolver = endpoint_resolver
3384 self._region = region
3385 self._endpoint_url = endpoint_url
3387 def register(self, event_emitter):
3388 event_emitter.register(
3389 'before-parameter-build.events.PutEvents',
3390 self.check_for_global_endpoint,
3391 )
3392 event_emitter.register(
3393 'before-call.events.PutEvents', self.set_endpoint_url
3394 )
3396 def set_endpoint_url(self, params, context, **kwargs):
3397 if 'eventbridge_endpoint' in context:
3398 endpoint = context['eventbridge_endpoint']
3399 logger.debug(f"Rewriting URL from {params['url']} to {endpoint}")
3400 params['url'] = endpoint
3402 def check_for_global_endpoint(self, params, context, **kwargs):
3403 endpoint = params.get('EndpointId')
3404 if endpoint is None:
3405 return
3407 if len(endpoint) == 0:
3408 raise InvalidEndpointConfigurationError(
3409 msg='EndpointId must not be a zero length string'
3410 )
3412 if not HAS_CRT:
3413 raise MissingDependencyException(
3414 msg="Using EndpointId requires an additional "
3415 "dependency. You will need to pip install "
3416 "botocore[crt] before proceeding."
3417 )
3419 config = context.get('client_config')
3420 endpoint_variant_tags = None
3421 if config is not None:
3422 if config.use_fips_endpoint:
3423 raise InvalidEndpointConfigurationError(
3424 msg="FIPS is not supported with EventBridge "
3425 "multi-region endpoints."
3426 )
3427 if config.use_dualstack_endpoint:
3428 endpoint_variant_tags = ['dualstack']
3430 if self._endpoint_url is None:
3431 # Validate endpoint is a valid hostname component
3432 parts = urlparse(f'https://{endpoint}')
3433 if parts.hostname != endpoint:
3434 raise InvalidEndpointConfigurationError(
3435 msg='EndpointId is not a valid hostname component.'
3436 )
3437 resolved_endpoint = self._get_global_endpoint(
3438 endpoint, endpoint_variant_tags=endpoint_variant_tags
3439 )
3440 else:
3441 resolved_endpoint = self._endpoint_url
3443 context['eventbridge_endpoint'] = resolved_endpoint
3444 context['auth_type'] = 'v4a'
3446 def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None):
3447 resolver = self._endpoint_resolver
3449 partition = resolver.get_partition_for_region(self._region)
3450 if partition is None:
3451 partition = self._DEFAULT_PARTITION
3452 dns_suffix = resolver.get_partition_dns_suffix(
3453 partition, endpoint_variant_tags=endpoint_variant_tags
3454 )
3455 if dns_suffix is None:
3456 dns_suffix = self._DEFAULT_DNS_SUFFIX
3458 return f"https://{endpoint}.endpoint.events.{dns_suffix}/"
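# --- Editor's note: illustrative sketch, not part of botocore/utils.py -----
# The multi-region endpoint built by _get_global_endpoint above follows
# https://<EndpointId>.endpoint.events.<partition DNS suffix>/ ; the values
# below are made up and simply replay that format string.
endpoint_id = 'abcde.veo'        # hypothetical EndpointId
dns_suffix = 'amazonaws.com'     # default-partition DNS suffix
print(f"https://{endpoint_id}.endpoint.events.{dns_suffix}/")
# -> https://abcde.veo.endpoint.events.amazonaws.com/
# --- end sketch -------------------------------------------------------------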
3461def is_s3_accelerate_url(url):
3462 """Does the URL match the S3 Accelerate endpoint scheme?
3464 Virtual host naming style, with bucket names in the netloc part of the URL,
3465 is not allowed by this function.
3466 """
3467 if url is None:
3468 return False
3470 # Accelerate is only valid for Amazon endpoints.
3471 url_parts = urlsplit(url)
3472 if not url_parts.netloc.endswith(
3473 'amazonaws.com'
3474 ) or url_parts.scheme not in ['https', 'http']:
3475 return False
3477 # The first part of the URL must be s3-accelerate.
3478 parts = url_parts.netloc.split('.')
3479 if parts[0] != 's3-accelerate':
3480 return False
3482 # URL parts between 's3-accelerate' and 'amazonaws.com' represent
3483 # different URL features.
3484 feature_parts = parts[1:-2]
3486 # There should be no duplicate URL parts.
3487 if len(feature_parts) != len(set(feature_parts)):
3488 return False
3490 # Remaining parts must all be in the whitelist.
3491 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts)
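# --- Editor's note: illustrative sketch, not part of botocore/utils.py -----
# A few inputs and the results expected from is_s3_accelerate_url above,
# assuming a botocore install matching this listing.
from botocore.utils import is_s3_accelerate_url

assert is_s3_accelerate_url('https://s3-accelerate.amazonaws.com')
assert is_s3_accelerate_url('https://s3-accelerate.dualstack.amazonaws.com')
assert not is_s3_accelerate_url('https://bucket.s3-accelerate.amazonaws.com')  # bucket in netloc
assert not is_s3_accelerate_url('ftp://s3-accelerate.amazonaws.com')           # wrong scheme
assert not is_s3_accelerate_url(None)
# --- end sketch -------------------------------------------------------------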
3494class JSONFileCache:
3495 """JSON file cache.
3496 This provides a dict-like interface that stores JSON-serializable
3497 objects.
3498 The objects are serialized to JSON and stored in a file. These
3499 values can be retrieved at a later time.
3500 """
3502 CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache'))
3504 def __init__(self, working_dir=CACHE_DIR, dumps_func=None):
3505 self._working_dir = working_dir
3506 if dumps_func is None:
3507 dumps_func = self._default_dumps
3508 self._dumps = dumps_func
3510 def _default_dumps(self, obj):
3511 return json.dumps(obj, default=self._serialize_if_needed)
3513 def __contains__(self, cache_key):
3514 actual_key = self._convert_cache_key(cache_key)
3515 return os.path.isfile(actual_key)
3517 def __getitem__(self, cache_key):
3518 """Retrieve value from a cache key."""
3519 actual_key = self._convert_cache_key(cache_key)
3520 try:
3521 with open(actual_key) as f:
3522 return json.load(f)
3523 except (OSError, ValueError):
3524 raise KeyError(cache_key)
3526 def __delitem__(self, cache_key):
3527 actual_key = self._convert_cache_key(cache_key)
3528 try:
3529 key_path = Path(actual_key)
3530 key_path.unlink()
3531 except FileNotFoundError:
3532 raise KeyError(cache_key)
3534 def __setitem__(self, cache_key, value):
3535 full_key = self._convert_cache_key(cache_key)
3536 try:
3537 file_content = self._dumps(value)
3538 except (TypeError, ValueError):
3539 raise ValueError(
3540 f"Value cannot be cached, must be "
3541 f"JSON serializable: {value}"
3542 )
3543 if not os.path.isdir(self._working_dir):
3544 os.makedirs(self._working_dir)
3545 with os.fdopen(
3546 os.open(full_key, os.O_WRONLY | os.O_CREAT, 0o600), 'w'
3547 ) as f:
3548 f.truncate()
3549 f.write(file_content)
3551 def _convert_cache_key(self, cache_key):
3552 full_path = os.path.join(self._working_dir, cache_key + '.json')
3553 return full_path
3555 def _serialize_if_needed(self, value, iso=False):
3556 if isinstance(value, _DatetimeClass):
3557 if iso:
3558 return value.isoformat()
3559 return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
3560 return value
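# --- Editor's note: illustrative sketch, not part of botocore/utils.py -----
# JSONFileCache behaves like a dict backed by <working_dir>/<key>.json files
# created with 0o600 permissions. A temporary directory and made-up values
# are used here instead of the default ~/.aws/boto/cache.
import tempfile
from botocore.utils import JSONFileCache

cache = JSONFileCache(working_dir=tempfile.mkdtemp())
cache['credentials'] = {'AccessKeyId': 'EXAMPLE', 'Expiration': '2099-01-01T00:00:00Z'}
assert 'credentials' in cache
print(cache['credentials']['AccessKeyId'])    # -> EXAMPLE
del cache['credentials']                      # removes the .json file
# --- end sketch -------------------------------------------------------------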
3563def is_s3express_bucket(bucket):
3564 if bucket is None:
3565 return False
3566 return bucket.endswith('--x-s3')
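# --- Editor's note: illustrative sketch, not part of botocore/utils.py -----
# S3 Express (directory) buckets are recognized purely by the '--x-s3' suffix
# checked above; the bucket names below are made up.
from botocore.utils import is_s3express_bucket

assert is_s3express_bucket('mybucket--use1-az4--x-s3')
assert not is_s3express_bucket('mybucket')
assert not is_s3express_bucket(None)
# --- end sketch -------------------------------------------------------------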
3569# This parameter is not part of the public interface and is subject to abrupt
3570# breaking changes or removal without prior announcement.
3571# Mapping of services that have been renamed for backwards compatibility reasons.
3572# Keys are the previous names that should be allowed; values are the documented
3573# and preferred client names.
3574SERVICE_NAME_ALIASES = {'runtime.sagemaker': 'sagemaker-runtime'}
3577# This parameter is not part of the public interface and is subject to abrupt
3578# breaking changes or removal without prior announcement.
3579# Mapping to determine the service ID for services that do not use it as the
3580# model data directory name. The keys are the data directory names and the
3581# values are the transformed service IDs (lower case and hyphenated).
3582CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES = {
3583 # Actual service name we use -> Allowed computed service name.
3584 'alexaforbusiness': 'alexa-for-business',
3585 'apigateway': 'api-gateway',
3586 'application-autoscaling': 'application-auto-scaling',
3587 'appmesh': 'app-mesh',
3588 'autoscaling': 'auto-scaling',
3589 'autoscaling-plans': 'auto-scaling-plans',
3590 'ce': 'cost-explorer',
3591 'cloudhsmv2': 'cloudhsm-v2',
3592 'cloudsearchdomain': 'cloudsearch-domain',
3593 'cognito-idp': 'cognito-identity-provider',
3594 'config': 'config-service',
3595 'cur': 'cost-and-usage-report-service',
3596 'datapipeline': 'data-pipeline',
3597 'directconnect': 'direct-connect',
3598 'devicefarm': 'device-farm',
3599 'discovery': 'application-discovery-service',
3600 'dms': 'database-migration-service',
3601 'ds': 'directory-service',
3602 'dynamodbstreams': 'dynamodb-streams',
3603 'elasticbeanstalk': 'elastic-beanstalk',
3604 'elastictranscoder': 'elastic-transcoder',
3605 'elb': 'elastic-load-balancing',
3606 'elbv2': 'elastic-load-balancing-v2',
3607 'es': 'elasticsearch-service',
3608 'events': 'eventbridge',
3609 'globalaccelerator': 'global-accelerator',
3610 'iot-data': 'iot-data-plane',
3611 'iot-jobs-data': 'iot-jobs-data-plane',
3612 'iot1click-devices': 'iot-1click-devices-service',
3613 'iot1click-projects': 'iot-1click-projects',
3614 'iotevents-data': 'iot-events-data',
3615 'iotevents': 'iot-events',
3616 'iotwireless': 'iot-wireless',
3617 'kinesisanalytics': 'kinesis-analytics',
3618 'kinesisanalyticsv2': 'kinesis-analytics-v2',
3619 'kinesisvideo': 'kinesis-video',
3620 'lex-models': 'lex-model-building-service',
3621 'lexv2-models': 'lex-models-v2',
3622 'lex-runtime': 'lex-runtime-service',
3623 'lexv2-runtime': 'lex-runtime-v2',
3624 'logs': 'cloudwatch-logs',
3625 'machinelearning': 'machine-learning',
3626 'marketplacecommerceanalytics': 'marketplace-commerce-analytics',
3627 'marketplace-entitlement': 'marketplace-entitlement-service',
3628 'meteringmarketplace': 'marketplace-metering',
3629 'mgh': 'migration-hub',
3630 'sms-voice': 'pinpoint-sms-voice',
3631 'resourcegroupstaggingapi': 'resource-groups-tagging-api',
3632 'route53': 'route-53',
3633 'route53domains': 'route-53-domains',
3634 's3control': 's3-control',
3635 'sdb': 'simpledb',
3636 'secretsmanager': 'secrets-manager',
3637 'serverlessrepo': 'serverlessapplicationrepository',
3638 'servicecatalog': 'service-catalog',
3639 'servicecatalog-appregistry': 'service-catalog-appregistry',
3640 'stepfunctions': 'sfn',
3641 'storagegateway': 'storage-gateway',
3642}
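# --- Editor's note: illustrative sketch, not part of botocore/utils.py -----
# Both mappings above are consumed as plain dict lookups (and, per their
# comments, are not part of the public interface), e.g.:
from botocore.utils import (
    CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES,
    SERVICE_NAME_ALIASES,
)

print(SERVICE_NAME_ALIASES.get('runtime.sagemaker'))                       # -> sagemaker-runtime
print(CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES.get('logs', 'logs'))  # -> cloudwatch-logs
# --- end sketch -------------------------------------------------------------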