Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/utils.py: 20%
1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import base64
14import binascii
15import datetime
16import email.message
17import functools
18import hashlib
19import io
20import logging
21import os
22import random
23import re
24import socket
25import time
26import warnings
27import weakref
28from datetime import datetime as _DatetimeClass
29from ipaddress import ip_address
30from pathlib import Path
31from urllib.request import getproxies, proxy_bypass
33import dateutil.parser
34from dateutil.tz import tzutc
35from urllib3.exceptions import LocationParseError
37import botocore
38import botocore.awsrequest
39import botocore.httpsession
41# IP Regexes retained for backwards compatibility
42from botocore.compat import (
43 HAS_CRT,
44 HEX_PAT, # noqa: F401
45 IPV4_PAT, # noqa: F401
46 IPV4_RE,
47 IPV6_ADDRZ_PAT, # noqa: F401
48 IPV6_ADDRZ_RE,
49 IPV6_PAT, # noqa: F401
50 LS32_PAT, # noqa: F401
51 MD5_AVAILABLE,
52 UNRESERVED_PAT, # noqa: F401
53 UNSAFE_URL_CHARS,
54 ZONE_ID_PAT, # noqa: F401
55 OrderedDict,
56 get_md5,
57 get_tzinfo_options,
58 json,
59 quote,
60 urlparse,
61 urlsplit,
62 urlunsplit,
63 zip_longest,
64)
65from botocore.exceptions import (
66 ClientError,
67 ConfigNotFound,
68 ConnectionClosedError,
69 ConnectTimeoutError,
70 EndpointConnectionError,
71 HTTPClientError,
72 InvalidDNSNameError,
73 InvalidEndpointConfigurationError,
74 InvalidExpressionError,
75 InvalidHostLabelError,
76 InvalidIMDSEndpointError,
77 InvalidIMDSEndpointModeError,
78 InvalidRegionError,
79 MetadataRetrievalError,
80 MissingDependencyException,
81 ReadTimeoutError,
82 SSOTokenLoadError,
83 UnsupportedOutpostResourceError,
84 UnsupportedS3AccesspointConfigurationError,
85 UnsupportedS3ArnError,
86 UnsupportedS3ConfigurationError,
87 UnsupportedS3ControlArnError,
88 UnsupportedS3ControlConfigurationError,
89)
91logger = logging.getLogger(__name__)
92DEFAULT_METADATA_SERVICE_TIMEOUT = 1
93METADATA_BASE_URL = 'http://169.254.169.254/'
94METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/'
95METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6')
97# These are chars that do not need to be urlencoded.
98# Based on RFC 3986, section 2.3
99SAFE_CHARS = '-._~'
100LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]')
101RETRYABLE_HTTP_ERRORS = (
102 ReadTimeoutError,
103 EndpointConnectionError,
104 ConnectionClosedError,
105 ConnectTimeoutError,
106)
107S3_ACCELERATE_WHITELIST = ['dualstack']
108# In switching events from using service name / endpoint prefix to service
109# id, we have to preserve compatibility. This maps the instances where either
110# is different than the transformed service id.
111EVENT_ALIASES = {
112 "api.mediatailor": "mediatailor",
113 "api.pricing": "pricing",
114 "api.sagemaker": "sagemaker",
115 "apigateway": "api-gateway",
116 "application-autoscaling": "application-auto-scaling",
117 "appstream2": "appstream",
118 "autoscaling": "auto-scaling",
119 "autoscaling-plans": "auto-scaling-plans",
120 "ce": "cost-explorer",
121 "cloudhsmv2": "cloudhsm-v2",
122 "cloudsearchdomain": "cloudsearch-domain",
123 "cognito-idp": "cognito-identity-provider",
124 "config": "config-service",
125 "cur": "cost-and-usage-report-service",
126 "data.iot": "iot-data-plane",
127 "data.jobs.iot": "iot-jobs-data-plane",
128 "data.mediastore": "mediastore-data",
129 "datapipeline": "data-pipeline",
130 "devicefarm": "device-farm",
131 "directconnect": "direct-connect",
132 "discovery": "application-discovery-service",
133 "dms": "database-migration-service",
134 "ds": "directory-service",
135 "dynamodbstreams": "dynamodb-streams",
136 "elasticbeanstalk": "elastic-beanstalk",
137 "elasticfilesystem": "efs",
138 "elasticloadbalancing": "elastic-load-balancing",
139 "elasticmapreduce": "emr",
140 "elastictranscoder": "elastic-transcoder",
141 "elb": "elastic-load-balancing",
142 "elbv2": "elastic-load-balancing-v2",
143 "email": "ses",
144 "entitlement.marketplace": "marketplace-entitlement-service",
145 "es": "elasticsearch-service",
146 "events": "eventbridge",
147 "cloudwatch-events": "eventbridge",
148 "iot-data": "iot-data-plane",
149 "iot-jobs-data": "iot-jobs-data-plane",
150 "kinesisanalytics": "kinesis-analytics",
151 "kinesisvideo": "kinesis-video",
152 "lex-models": "lex-model-building-service",
153 "lex-runtime": "lex-runtime-service",
154 "logs": "cloudwatch-logs",
155 "machinelearning": "machine-learning",
156 "marketplace-entitlement": "marketplace-entitlement-service",
157 "marketplacecommerceanalytics": "marketplace-commerce-analytics",
158 "metering.marketplace": "marketplace-metering",
159 "meteringmarketplace": "marketplace-metering",
160 "mgh": "migration-hub",
161 "models.lex": "lex-model-building-service",
162 "monitoring": "cloudwatch",
163 "mturk-requester": "mturk",
164 "opsworks-cm": "opsworkscm",
165 "resourcegroupstaggingapi": "resource-groups-tagging-api",
166 "route53": "route-53",
167 "route53domains": "route-53-domains",
168 "runtime.lex": "lex-runtime-service",
169 "runtime.sagemaker": "sagemaker-runtime",
170 "sdb": "simpledb",
171 "secretsmanager": "secrets-manager",
172 "serverlessrepo": "serverlessapplicationrepository",
173 "servicecatalog": "service-catalog",
174 "states": "sfn",
175 "stepfunctions": "sfn",
176 "storagegateway": "storage-gateway",
177 "streams.dynamodb": "dynamodb-streams",
178 "tagging": "resource-groups-tagging-api",
179}
182# This pattern can be used to detect if a header is a flexible checksum header
183CHECKSUM_HEADER_PATTERN = re.compile(
184 r'^X-Amz-Checksum-([a-z0-9]*)$',
185 flags=re.IGNORECASE,
186)
188PRIORITY_ORDERED_SUPPORTED_PROTOCOLS = (
189 'json',
190 'rest-json',
191 'rest-xml',
192 'smithy-rpc-v2-cbor',
193 'query',
194 'ec2',
195)
198def ensure_boolean(val):
199 """Ensures a boolean value if a string or boolean is provided
201 For strings, the value for True/False is case insensitive
202 """
203 if isinstance(val, bool):
204 return val
205 elif isinstance(val, str):
206 return val.lower() == 'true'
207 else:
208 return False
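# Added illustrative example (not part of the original botocore source); behavior
# inferred from ensure_boolean above: only the case-insensitive string 'true' and
# the boolean True map to True, everything else (including truthy values) to False.
# >>> ensure_boolean(True), ensure_boolean('TRUE'), ensure_boolean('no'), ensure_boolean(1)
# (True, True, False, False)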
211def resolve_imds_endpoint_mode(session):
212 """Resolving IMDS endpoint mode to either IPv6 or IPv4.
214 ec2_metadata_service_endpoint_mode takes precedence over imds_use_ipv6.
215 """
216 endpoint_mode = session.get_config_variable(
217 'ec2_metadata_service_endpoint_mode'
218 )
219 if endpoint_mode is not None:
220 lendpoint_mode = endpoint_mode.lower()
221 if lendpoint_mode not in METADATA_ENDPOINT_MODES:
222 error_msg_kwargs = {
223 'mode': endpoint_mode,
224 'valid_modes': METADATA_ENDPOINT_MODES,
225 }
226 raise InvalidIMDSEndpointModeError(**error_msg_kwargs)
227 return lendpoint_mode
228 elif session.get_config_variable('imds_use_ipv6'):
229 return 'ipv6'
230 return 'ipv4'
233def is_json_value_header(shape):
234 """Determines if the provided shape is the special header type jsonvalue.
236 :type shape: botocore.shape
237 :param shape: Shape to be inspected for the jsonvalue trait.
239 :return: True if this type is a jsonvalue, False otherwise
240 :rtype: Bool
241 """
242 return (
243 hasattr(shape, 'serialization')
244 and shape.serialization.get('jsonvalue', False)
245 and shape.serialization.get('location') == 'header'
246 and shape.type_name == 'string'
247 )
250def has_header(header_name, headers):
251 """Case-insensitive check for header key."""
252 if header_name is None:
253 return False
254 elif isinstance(headers, botocore.awsrequest.HeadersDict):
255 return header_name in headers
256 else:
257 return header_name.lower() in [key.lower() for key in headers.keys()]
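# Added illustrative example (not part of the original botocore source): for plain
# dicts, has_header lowercases every key before comparing.
# >>> has_header('Content-Type', {'content-type': 'application/json'})
# True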
260def get_service_module_name(service_model):
261 """Returns the module name for a service
263 This is the value used in both the documentation and client class name
264 """
265 name = service_model.metadata.get(
266 'serviceAbbreviation',
267 service_model.metadata.get(
268 'serviceFullName', service_model.service_name
269 ),
270 )
271 name = name.replace('Amazon', '')
272 name = name.replace('AWS', '')
273 name = re.sub(r'\W+', '', name)
274 return name
277def normalize_url_path(path):
278 if not path:
279 return '/'
280 return remove_dot_segments(path)
283def normalize_boolean(val):
284 """Returns None if val is None, otherwise ensure value
285 converted to boolean"""
286 if val is None:
287 return val
288 else:
289 return ensure_boolean(val)
292def remove_dot_segments(url):
293 # RFC 3986, section 5.2.4 "Remove Dot Segments"
294 # Also, AWS services require consecutive slashes to be removed,
295 # so that's done here as well
296 if not url:
297 return ''
298 input_url = url.split('/')
299 output_list = []
300 for x in input_url:
301 if x and x != '.':
302 if x == '..':
303 if output_list:
304 output_list.pop()
305 else:
306 output_list.append(x)
308 if url[0] == '/':
309 first = '/'
310 else:
311 first = ''
312 if url[-1] == '/' and output_list:
313 last = '/'
314 else:
315 last = ''
316 return first + '/'.join(output_list) + last
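# Added illustrative example (not part of the original botocore source): '.' segments,
# '..' segments, and consecutive slashes are all collapsed, per RFC 3986 section 5.2.4.
# >>> remove_dot_segments('/a/b/../c/./d//e')
# '/a/c/d/e'
# >>> normalize_url_path('')
# '/'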
319def validate_jmespath_for_set(expression):
320 # Validates a limited jmespath expression to determine if we can set a
321 # value based on it. Only works with dotted paths.
322 if not expression or expression == '.':
323 raise InvalidExpressionError(expression=expression)
325 for invalid in ['[', ']', '*']:
326 if invalid in expression:
327 raise InvalidExpressionError(expression=expression)
330def set_value_from_jmespath(source, expression, value, is_first=True):
331 # This takes a (limited) jmespath-like expression & can set a value based
332 # on it.
333 # Limitations:
334 # * Only handles dotted lookups
335 # * No offsets/wildcards/slices/etc.
336 if is_first:
337 validate_jmespath_for_set(expression)
339 bits = expression.split('.', 1)
340 current_key, remainder = bits[0], bits[1] if len(bits) > 1 else ''
342 if not current_key:
343 raise InvalidExpressionError(expression=expression)
345 if remainder:
346 if current_key not in source:
347 # We've got something in the expression that's not present in the
348 # source (new key). If there are any more bits, we'll set the key
349 # with an empty dictionary.
350 source[current_key] = {}
352 return set_value_from_jmespath(
353 source[current_key], remainder, value, is_first=False
354 )
356 # If we're down to a single key, set it.
357 source[current_key] = value
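# Added illustrative example (not part of the original botocore source): intermediate
# dictionaries are created as needed for each dotted component of the expression.
# >>> params = {}
# >>> set_value_from_jmespath(params, 'foo.bar.baz', 42)
# >>> params
# {'foo': {'bar': {'baz': 42}}}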
360def is_global_accesspoint(context):
361 """Determine if request is intended for an MRAP accesspoint."""
362 s3_accesspoint = context.get('s3_accesspoint', {})
363 is_global = s3_accesspoint.get('region') == ''
364 return is_global
367class _RetriesExceededError(Exception):
368 """Internal exception used when the number of retries are exceeded."""
370 pass
373class BadIMDSRequestError(Exception):
374 def __init__(self, request):
375 self.request = request
378class IMDSFetcher:
379 _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError
380 _TOKEN_PATH = 'latest/api/token'
381 _TOKEN_TTL = '21600'
383 def __init__(
384 self,
385 timeout=DEFAULT_METADATA_SERVICE_TIMEOUT,
386 num_attempts=1,
387 base_url=METADATA_BASE_URL,
388 env=None,
389 user_agent=None,
390 config=None,
391 ):
392 self._timeout = timeout
393 self._num_attempts = num_attempts
394 if config is None:
395 config = {}
396 self._base_url = self._select_base_url(base_url, config)
397 self._config = config
399 if env is None:
400 env = os.environ.copy()
401 self._disabled = (
402 env.get('AWS_EC2_METADATA_DISABLED', 'false').lower() == 'true'
403 )
404 self._imds_v1_disabled = config.get('ec2_metadata_v1_disabled')
405 self._user_agent = user_agent
406 self._session = botocore.httpsession.URLLib3Session(
407 timeout=self._timeout,
408 proxies=get_environ_proxies(self._base_url),
409 )
411 def get_base_url(self):
412 return self._base_url
414 def _select_base_url(self, base_url, config):
415 if config is None:
416 config = {}
418 requires_ipv6 = (
419 config.get('ec2_metadata_service_endpoint_mode') == 'ipv6'
420 )
421 custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint')
423 if requires_ipv6 and custom_metadata_endpoint:
424 logger.warning(
425 "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint."
426 )
428 chosen_base_url = None
430 if base_url != METADATA_BASE_URL:
431 chosen_base_url = base_url
432 elif custom_metadata_endpoint:
433 chosen_base_url = custom_metadata_endpoint
434 elif requires_ipv6:
435 chosen_base_url = METADATA_BASE_URL_IPv6
436 else:
437 chosen_base_url = METADATA_BASE_URL
439 logger.debug(f"IMDS ENDPOINT: {chosen_base_url}")
440 if not is_valid_uri(chosen_base_url):
441 raise InvalidIMDSEndpointError(endpoint=chosen_base_url)
443 return chosen_base_url
445 def _construct_url(self, path):
446 sep = ''
447 if self._base_url and not self._base_url.endswith('/'):
448 sep = '/'
449 return f'{self._base_url}{sep}{path}'
451 def _fetch_metadata_token(self):
452 self._assert_enabled()
453 url = self._construct_url(self._TOKEN_PATH)
454 headers = {
455 'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL,
456 }
457 self._add_user_agent(headers)
458 request = botocore.awsrequest.AWSRequest(
459 method='PUT', url=url, headers=headers
460 )
461 for i in range(self._num_attempts):
462 try:
463 response = self._session.send(request.prepare())
464 if response.status_code == 200:
465 return response.text
466 elif response.status_code in (404, 403, 405):
467 return None
468 elif response.status_code in (400,):
469 raise BadIMDSRequestError(request)
470 except ReadTimeoutError:
471 return None
472 except RETRYABLE_HTTP_ERRORS as e:
473 logger.debug(
474 "Caught retryable HTTP exception while making metadata "
475 "service request to %s: %s",
476 url,
477 e,
478 exc_info=True,
479 )
480 except HTTPClientError as e:
481 if isinstance(e.kwargs.get('error'), LocationParseError):
482 raise InvalidIMDSEndpointError(endpoint=url, error=e)
483 else:
484 raise
485 return None
487 def _get_request(self, url_path, retry_func, token=None):
488 """Make a get request to the Instance Metadata Service.
490 :type url_path: str
491 :param url_path: The path component of the URL to make a get request.
492 This arg is appended to the base_url that was provided in the
493 initializer.
495 :type retry_func: callable
496 :param retry_func: A function that takes the response as an argument
497 and determines if it needs to retry. By default, empty and non-200
498 responses are retried.
500 :type token: str
501 :param token: Metadata token to send along with GET requests to IMDS.
502 """
503 self._assert_enabled()
504 if not token:
505 self._assert_v1_enabled()
506 if retry_func is None:
507 retry_func = self._default_retry
508 url = self._construct_url(url_path)
509 headers = {}
510 if token is not None:
511 headers['x-aws-ec2-metadata-token'] = token
512 self._add_user_agent(headers)
513 for i in range(self._num_attempts):
514 try:
515 request = botocore.awsrequest.AWSRequest(
516 method='GET', url=url, headers=headers
517 )
518 response = self._session.send(request.prepare())
519 if not retry_func(response):
520 return response
521 except RETRYABLE_HTTP_ERRORS as e:
522 logger.debug(
523 "Caught retryable HTTP exception while making metadata "
524 "service request to %s: %s",
525 url,
526 e,
527 exc_info=True,
528 )
529 raise self._RETRIES_EXCEEDED_ERROR_CLS()
531 def _add_user_agent(self, headers):
532 if self._user_agent is not None:
533 headers['User-Agent'] = self._user_agent
535 def _assert_enabled(self):
536 if self._disabled:
537 logger.debug("Access to EC2 metadata has been disabled.")
538 raise self._RETRIES_EXCEEDED_ERROR_CLS()
540 def _assert_v1_enabled(self):
541 if self._imds_v1_disabled:
542 raise MetadataRetrievalError(
543 error_msg="Unable to retrieve token for use in IMDSv2 call and IMDSv1 has been disabled"
544 )
546 def _default_retry(self, response):
547 return self._is_non_ok_response(response) or self._is_empty(response)
549 def _is_non_ok_response(self, response):
550 if response.status_code != 200:
551 self._log_imds_response(response, 'non-200', log_body=True)
552 return True
553 return False
555 def _is_empty(self, response):
556 if not response.content:
557 self._log_imds_response(response, 'no body', log_body=True)
558 return True
559 return False
561 def _log_imds_response(self, response, reason_to_log, log_body=False):
562 statement = (
563 "Metadata service returned %s response "
564 "with status code of %s for url: %s"
565 )
566 logger_args = [reason_to_log, response.status_code, response.url]
567 if log_body:
568 statement += ", content body: %s"
569 logger_args.append(response.content)
570 logger.debug(statement, *logger_args)
573class InstanceMetadataFetcher(IMDSFetcher):
574 _URL_PATH = 'latest/meta-data/iam/security-credentials/'
575 _REQUIRED_CREDENTIAL_FIELDS = [
576 'AccessKeyId',
577 'SecretAccessKey',
578 'Token',
579 'Expiration',
580 ]
582 def retrieve_iam_role_credentials(self):
583 try:
584 token = self._fetch_metadata_token()
585 role_name = self._get_iam_role(token)
586 credentials = self._get_credentials(role_name, token)
587 if self._contains_all_credential_fields(credentials):
588 credentials = {
589 'role_name': role_name,
590 'access_key': credentials['AccessKeyId'],
591 'secret_key': credentials['SecretAccessKey'],
592 'token': credentials['Token'],
593 'expiry_time': credentials['Expiration'],
594 }
595 self._evaluate_expiration(credentials)
596 return credentials
597 else:
598 # IMDS can return a 200 response that has a JSON formatted
599 # error message (i.e. if ec2 is not a trusted entity for the
600 # attached role). We do not necessarily want to retry for
601 # these and we also do not necessarily want to raise a key
602 # error. So at least log the problematic response and return
603 # an empty dictionary to signal that it was not able to
604 # retrieve credentials. These errors will contain both a
605 # Code and Message key.
606 if 'Code' in credentials and 'Message' in credentials:
607 logger.debug(
608 'Error response received when retrieving '
609 'credentials: %s.',
610 credentials,
611 )
612 return {}
613 except self._RETRIES_EXCEEDED_ERROR_CLS:
614 logger.debug(
615 "Max number of attempts exceeded (%s) when "
616 "attempting to retrieve data from metadata service.",
617 self._num_attempts,
618 )
619 except BadIMDSRequestError as e:
620 logger.debug("Bad IMDS request: %s", e.request)
621 return {}
623 def _get_iam_role(self, token=None):
624 return self._get_request(
625 url_path=self._URL_PATH,
626 retry_func=self._needs_retry_for_role_name,
627 token=token,
628 ).text
630 def _get_credentials(self, role_name, token=None):
631 r = self._get_request(
632 url_path=self._URL_PATH + role_name,
633 retry_func=self._needs_retry_for_credentials,
634 token=token,
635 )
636 return json.loads(r.text)
638 def _is_invalid_json(self, response):
639 try:
640 json.loads(response.text)
641 return False
642 except ValueError:
643 self._log_imds_response(response, 'invalid json')
644 return True
646 def _needs_retry_for_role_name(self, response):
647 return self._is_non_ok_response(response) or self._is_empty(response)
649 def _needs_retry_for_credentials(self, response):
650 return (
651 self._is_non_ok_response(response)
652 or self._is_empty(response)
653 or self._is_invalid_json(response)
654 )
656 def _contains_all_credential_fields(self, credentials):
657 for field in self._REQUIRED_CREDENTIAL_FIELDS:
658 if field not in credentials:
659 logger.debug(
660 'Retrieved credentials is missing required field: %s',
661 field,
662 )
663 return False
664 return True
666 def _evaluate_expiration(self, credentials):
667 expiration = credentials.get("expiry_time")
668 if expiration is None:
669 return
670 try:
671 expiration = datetime.datetime.strptime(
672 expiration, "%Y-%m-%dT%H:%M:%SZ"
673 )
674 refresh_interval = self._config.get(
675 "ec2_credential_refresh_window", 60 * 10
676 )
677 jitter = random.randint(120, 600) # Between 2 to 10 minutes
678 refresh_interval_with_jitter = refresh_interval + jitter
679 current_time = datetime.datetime.utcnow()
680 refresh_offset = datetime.timedelta(
681 seconds=refresh_interval_with_jitter
682 )
683 extension_time = expiration - refresh_offset
684 if current_time >= extension_time:
685 new_time = current_time + refresh_offset
686 credentials["expiry_time"] = new_time.strftime(
687 "%Y-%m-%dT%H:%M:%SZ"
688 )
689 logger.info(
690 f"Attempting credential expiration extension due to a "
691 f"credential service availability issue. A refresh of "
692 f"these credentials will be attempted again within "
693 f"the next {refresh_interval_with_jitter / 60:.0f} minutes."
694 )
695 except ValueError:
696 logger.debug(
697 f"Unable to parse expiry_time in {credentials['expiry_time']}"
698 )
701class IMDSRegionProvider:
702 def __init__(self, session, environ=None, fetcher=None):
703 """Initialize IMDSRegionProvider.
704 :type session: :class:`botocore.session.Session`
705 :param session: The session is needed to look up configuration for
706 how to contact the instance metadata service. Specifically,
707 whether or not it should use the IMDS region at all, and if so how
708 to configure the timeout and number of attempts to reach the
709 service.
710 :type environ: None or dict
711 :param environ: A dictionary of environment variables to use. If
712 ``None`` is the argument then ``os.environ`` will be used by
713 default.
714 :type fetcher: :class:`botocore.utils.InstanceMetadataRegionFetcher`
715 :param fetcher: The class to actually handle the fetching of the region
716 from the IMDS. If not provided a default one will be created.
717 """
718 self._session = session
719 if environ is None:
720 environ = os.environ
721 self._environ = environ
722 self._fetcher = fetcher
724 def provide(self):
725 """Provide the region value from IMDS."""
726 instance_region = self._get_instance_metadata_region()
727 return instance_region
729 def _get_instance_metadata_region(self):
730 fetcher = self._get_fetcher()
731 region = fetcher.retrieve_region()
732 return region
734 def _get_fetcher(self):
735 if self._fetcher is None:
736 self._fetcher = self._create_fetcher()
737 return self._fetcher
739 def _create_fetcher(self):
740 metadata_timeout = self._session.get_config_variable(
741 'metadata_service_timeout'
742 )
743 metadata_num_attempts = self._session.get_config_variable(
744 'metadata_service_num_attempts'
745 )
746 imds_config = {
747 'ec2_metadata_service_endpoint': self._session.get_config_variable(
748 'ec2_metadata_service_endpoint'
749 ),
750 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode(
751 self._session
752 ),
753 'ec2_metadata_v1_disabled': self._session.get_config_variable(
754 'ec2_metadata_v1_disabled'
755 ),
756 }
757 fetcher = InstanceMetadataRegionFetcher(
758 timeout=metadata_timeout,
759 num_attempts=metadata_num_attempts,
760 env=self._environ,
761 user_agent=self._session.user_agent(),
762 config=imds_config,
763 )
764 return fetcher
767class InstanceMetadataRegionFetcher(IMDSFetcher):
768 _URL_PATH = 'latest/meta-data/placement/availability-zone/'
770 def retrieve_region(self):
771 """Get the current region from the instance metadata service.
776 :rtype: None or str
777 :returns: Returns the region as a string if it is configured to use
778 IMDS as a region source. Otherwise returns ``None``. It will also
779 return ``None`` if it fails to get the region from IMDS due to
780 exhausting its retries or not being able to connect.
781 """
782 try:
783 region = self._get_region()
784 return region
785 except self._RETRIES_EXCEEDED_ERROR_CLS:
786 logger.debug(
787 "Max number of attempts exceeded (%s) when "
788 "attempting to retrieve data from metadata service.",
789 self._num_attempts,
790 )
791 return None
793 def _get_region(self):
794 token = self._fetch_metadata_token()
795 response = self._get_request(
796 url_path=self._URL_PATH,
797 retry_func=self._default_retry,
798 token=token,
799 )
800 availability_zone = response.text
801 region = availability_zone[:-1]
802 return region
805def merge_dicts(dict1, dict2, append_lists=False):
806 """Given two dict, merge the second dict into the first.
808 The dicts can have arbitrary nesting.
810 :param append_lists: If true, instead of clobbering a list with the new
811 value, append all of the new values onto the original list.
812 """
813 for key in dict2:
814 if isinstance(dict2[key], dict):
815 if key in dict1 and key in dict2:
816 merge_dicts(dict1[key], dict2[key])
817 else:
818 dict1[key] = dict2[key]
819 # If the value is a list and the ``append_lists`` flag is set,
820 # append the new values onto the original list
821 elif isinstance(dict2[key], list) and append_lists:
822 # The value in dict1 must be a list in order to append new
823 # values onto it.
824 if key in dict1 and isinstance(dict1[key], list):
825 dict1[key].extend(dict2[key])
826 else:
827 dict1[key] = dict2[key]
828 else:
829 # For scalar values, simply overwrite the existing
830 # value (if any) in dict1.
831 dict1[key] = dict2[key]
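# Added illustrative example (not part of the original botocore source): nested dicts
# are merged recursively, while scalars from dict2 overwrite values in dict1 in place.
# >>> a = {'config': {'region': 'us-east-1', 'retries': 3}}
# >>> merge_dicts(a, {'config': {'region': 'eu-west-1'}, 'debug': True})
# >>> a
# {'config': {'region': 'eu-west-1', 'retries': 3}, 'debug': True}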
834def lowercase_dict(original):
835 """Copies the given dictionary ensuring all keys are lowercase strings."""
836 copy = {}
837 for key in original:
838 copy[key.lower()] = original[key]
839 return copy
842def parse_key_val_file(filename, _open=open):
843 try:
844 with _open(filename) as f:
845 contents = f.read()
846 return parse_key_val_file_contents(contents)
847 except OSError:
848 raise ConfigNotFound(path=filename)
851def parse_key_val_file_contents(contents):
852 # This was originally extracted from the EC2 credential provider, which was
853 # fairly lenient in its parsing. We only try to parse key/val pairs if
854 # there's a '=' in the line.
855 final = {}
856 for line in contents.splitlines():
857 if '=' not in line:
858 continue
859 key, val = line.split('=', 1)
860 key = key.strip()
861 val = val.strip()
862 final[key] = val
863 return final
866def percent_encode_sequence(mapping, safe=SAFE_CHARS):
867 """Urlencode a dict or list into a string.
869 This is similar to urllib.urlencode except that:
871 * It uses quote, and not quote_plus
872 * It has a default list of safe chars that don't need
873 to be encoded, which matches what AWS services expect.
875 If any value in the input ``mapping`` is a list type,
876 then each list element will be serialized. This is the equivalent
877 to ``urlencode``'s ``doseq=True`` argument.
879 This function should be preferred over the stdlib
880 ``urlencode()`` function.
882 :param mapping: Either a dict to urlencode or a list of
883 ``(key, value)`` pairs.
885 """
886 encoded_pairs = []
887 if hasattr(mapping, 'items'):
888 pairs = mapping.items()
889 else:
890 pairs = mapping
891 for key, value in pairs:
892 if isinstance(value, list):
893 for element in value:
894 encoded_pairs.append(
895 f'{percent_encode(key)}={percent_encode(element)}'
896 )
897 else:
898 encoded_pairs.append(
899 f'{percent_encode(key)}={percent_encode(value)}'
900 )
901 return '&'.join(encoded_pairs)
904def percent_encode(input_str, safe=SAFE_CHARS):
905 """Urlencodes a string.
907 Whereas percent_encode_sequence handles taking a dict/sequence and
908 producing a percent encoded string, this function deals only with
909 taking a string (not a dict/sequence) and percent encoding it.
911 If given the binary type, will simply URL encode it. If given the
912 text type, will produce the binary type by UTF-8 encoding the
913 text. If given something else, will convert it to the text type
914 first.
915 """
916 # If its not a binary or text string, make it a text string.
917 if not isinstance(input_str, (bytes, str)):
918 input_str = str(input_str)
919 # If it's not bytes, make it bytes by UTF-8 encoding it.
920 if not isinstance(input_str, bytes):
921 input_str = input_str.encode('utf-8')
922 return quote(input_str, safe=safe)
925def _epoch_seconds_to_datetime(value, tzinfo):
926 """Parse numerical epoch timestamps (seconds since 1970) into a
927 ``datetime.datetime`` in UTC using ``datetime.timedelta``. This is intended
928 as a fallback when ``fromtimestamp`` raises ``OverflowError`` or ``OSError``.
930 :type value: float or int
931 :param value: The Unix timestamp as a number.
933 :type tzinfo: callable
934 :param tzinfo: A ``datetime.tzinfo`` class or compatible callable.
935 """
936 epoch_zero = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=tzutc())
937 epoch_zero_localized = epoch_zero.astimezone(tzinfo())
938 return epoch_zero_localized + datetime.timedelta(seconds=value)
941def _parse_timestamp_with_tzinfo(value, tzinfo):
942 """Parse timestamp with pluggable tzinfo options."""
943 if isinstance(value, (int, float)):
944 # Possibly an epoch time.
945 return datetime.datetime.fromtimestamp(value, tzinfo())
946 else:
947 try:
948 return datetime.datetime.fromtimestamp(float(value), tzinfo())
949 except (TypeError, ValueError):
950 pass
951 try:
952 # In certain cases, a timestamp marked with GMT can be parsed into a
953 # different time zone, so here we provide a context which will
954 # enforce that GMT == UTC.
955 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()})
956 except (TypeError, ValueError) as e:
957 raise ValueError(f'Invalid timestamp "{value}": {e}')
960def parse_timestamp(value):
961 """Parse a timestamp into a datetime object.
963 Supported formats:
965 * iso8601
966 * rfc822
967 * epoch (value is an integer)
969 This will return a ``datetime.datetime`` object.
971 """
972 tzinfo_options = get_tzinfo_options()
973 for tzinfo in tzinfo_options:
974 try:
975 return _parse_timestamp_with_tzinfo(value, tzinfo)
976 except (OSError, OverflowError) as e:
977 logger.debug(
978 'Unable to parse timestamp with "%s" timezone info.',
979 tzinfo.__name__,
980 exc_info=e,
981 )
982 # For numeric values attempt fallback to using fromtimestamp-free method.
983 # From Python's ``datetime.datetime.fromtimestamp`` documentation: "This
984 # may raise ``OverflowError``, if the timestamp is out of the range of
985 # values supported by the platform C localtime() function, and ``OSError``
986 # on localtime() failure. It's common for this to be restricted to years
987 # from 1970 through 2038."
988 try:
989 numeric_value = float(value)
990 except (TypeError, ValueError):
991 pass
992 else:
993 try:
994 for tzinfo in tzinfo_options:
995 return _epoch_seconds_to_datetime(numeric_value, tzinfo=tzinfo)
996 except (OSError, OverflowError) as e:
997 logger.debug(
998 'Unable to parse timestamp using fallback method with "%s" '
999 'timezone info.',
1000 tzinfo.__name__,
1001 exc_info=e,
1002 )
1003 raise RuntimeError(
1004 f'Unable to calculate correct timezone offset for "{value}"'
1005 )
1008def parse_to_aware_datetime(value):
1009 """Converted the passed in value to a datetime object with tzinfo.
1011 This function can be used to normalize all timestamp inputs. This
1012 function accepts a number of different types of inputs, but
1013 will always return a datetime.datetime object with time zone
1014 information.
1016 The input param ``value`` can be one of several types:
1018 * A datetime object (both naive and aware)
1019 * An integer representing the epoch time (can also be a string
1020 of the integer, i.e '0', instead of 0). The epoch time is
1021 considered to be UTC.
1022 * An iso8601 formatted timestamp. This does not need to be
1023 a complete timestamp, it can contain just the date portion
1024 without the time component.
1026 The returned value will be a datetime object that will have tzinfo.
1027 If no timezone info was provided in the input value, then UTC is
1028 assumed, not local time.
1030 """
1031 # This is a general purpose method that handles several cases of
1032 # converting the provided value to a string timestamp suitable to be
1033 # serialized to an http request. It can handle:
1034 # 1) A datetime.datetime object.
1035 if isinstance(value, _DatetimeClass):
1036 datetime_obj = value
1037 else:
1038 # 2) A string object that's formatted as a timestamp.
1039 # We document this as being an iso8601 timestamp, although
1040 # parse_timestamp is a bit more flexible.
1041 datetime_obj = parse_timestamp(value)
1042 if datetime_obj.tzinfo is None:
1043 # I think a case would be made that if no time zone is provided,
1044 # we should use the local time. However, to restore backwards
1045 # compat, the previous behavior was to assume UTC, which is
1046 # what we're going to do here.
1047 datetime_obj = datetime_obj.replace(tzinfo=tzutc())
1048 else:
1049 datetime_obj = datetime_obj.astimezone(tzutc())
1050 return datetime_obj
1053def datetime2timestamp(dt, default_timezone=None):
1054 """Calculate the timestamp based on the given datetime instance.
1056 :type dt: datetime
1057 :param dt: A datetime object to be converted into timestamp
1058 :type default_timezone: tzinfo
1059 :param default_timezone: If it is provided as None, we treat it as tzutc().
1060 But it is only used when dt is a naive datetime.
1061 :returns: The timestamp
1062 """
1063 epoch = datetime.datetime(1970, 1, 1)
1064 if dt.tzinfo is None:
1065 if default_timezone is None:
1066 default_timezone = tzutc()
1067 dt = dt.replace(tzinfo=default_timezone)
1068 d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
1069 return d.total_seconds()
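# Added illustrative example (not part of the original botocore source): parsing and
# converting back are roughly inverse operations; naive datetimes are treated as UTC.
# >>> parse_timestamp('1970-01-02T00:00:00Z')
# datetime.datetime(1970, 1, 2, 0, 0, tzinfo=tzutc())
# >>> datetime2timestamp(datetime.datetime(1970, 1, 2))
# 86400.0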
1072def calculate_sha256(body, as_hex=False):
1073 """Calculate a sha256 checksum.
1075 This method will calculate the sha256 checksum of a file like
1076 object. Note that this method will iterate through the entire
1077 file contents. The caller is responsible for ensuring the proper
1078 starting position of the file and ``seek()``'ing the file back
1079 to its starting location if other consumers need to read from
1080 the file like object.
1082 :param body: Any file like object. The file must be opened
1083 in binary mode such that a ``.read()`` call returns bytes.
1084 :param as_hex: If True, then the hex digest is returned.
1085 If False, then the digest (as binary bytes) is returned.
1087 :returns: The sha256 checksum
1089 """
1090 checksum = hashlib.sha256()
1091 for chunk in iter(lambda: body.read(1024 * 1024), b''):
1092 checksum.update(chunk)
1093 if as_hex:
1094 return checksum.hexdigest()
1095 else:
1096 return checksum.digest()
1099def calculate_tree_hash(body):
1100 """Calculate a tree hash checksum.
1102 For more information see:
1104 http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html
1106 :param body: Any file like object. This has the same constraints as
1107 the ``body`` param in calculate_sha256
1109 :rtype: str
1110 :returns: The hex version of the calculated tree hash
1112 """
1113 chunks = []
1114 required_chunk_size = 1024 * 1024
1115 sha256 = hashlib.sha256
1116 for chunk in iter(lambda: body.read(required_chunk_size), b''):
1117 chunks.append(sha256(chunk).digest())
1118 if not chunks:
1119 return sha256(b'').hexdigest()
1120 while len(chunks) > 1:
1121 new_chunks = []
1122 for first, second in _in_pairs(chunks):
1123 if second is not None:
1124 new_chunks.append(sha256(first + second).digest())
1125 else:
1126 # We're at the end of the list and there's no pair left.
1127 new_chunks.append(first)
1128 chunks = new_chunks
1129 return binascii.hexlify(chunks[0]).decode('ascii')
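# Added illustrative example (not part of the original botocore source): for bodies of
# a single chunk (<= 1 MB) the tree hash collapses to the plain sha256 hex digest.
# >>> import io
# >>> calculate_sha256(io.BytesIO(b'hello'), as_hex=True)
# '2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824'
# >>> calculate_tree_hash(io.BytesIO(b'hello')) == calculate_sha256(io.BytesIO(b'hello'), as_hex=True)
# True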
1132def _in_pairs(iterable):
1133 # Creates iterator that iterates over the list in pairs:
1134 # for a, b in _in_pairs([0, 1, 2, 3, 4]):
1135 # print(a, b)
1136 #
1137 # will print:
1138 # 0, 1
1139 # 2, 3
1140 # 4, None
1141 shared_iter = iter(iterable)
1142 # Note that zip_longest is a compat import that uses
1143 # the itertools izip_longest. This creates an iterator,
1144 # this call below does _not_ immediately create the list
1145 # of pairs.
1146 return zip_longest(shared_iter, shared_iter)
1149class CachedProperty:
1150 """A read only property that caches the initially computed value.
1152 This descriptor will only call the provided ``fget`` function once.
1153 Subsequent access to this property will return the cached value.
1155 """
1157 def __init__(self, fget):
1158 self._fget = fget
1160 def __get__(self, obj, cls):
1161 if obj is None:
1162 return self
1163 else:
1164 computed_value = self._fget(obj)
1165 obj.__dict__[self._fget.__name__] = computed_value
1166 return computed_value
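# Added illustrative example (not part of the original botocore source): the computed
# value is stored in the instance __dict__, so fget runs only once per instance.
# >>> class Example:
# ...     @CachedProperty
# ...     def value(self):
# ...         print('computing')
# ...         return 42
# >>> e = Example()
# >>> e.value
# computing
# 42
# >>> e.value  # served from e.__dict__ without calling fget again
# 42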
1169class ArgumentGenerator:
1170 """Generate sample input based on a shape model.
1172 This class contains a ``generate_skeleton`` method that will take
1173 an input/output shape (created from ``botocore.model``) and generate
1174 a sample dictionary corresponding to the input/output shape.
1176 The specific values used are placeholder values. For strings, either an
1177 empty string or the member name can be used; for numbers, 0 or 0.0 is used.
1178 The intended usage of this class is to generate the *shape* of the input
1179 structure.
1181 This can be useful for operations that have complex input shapes.
1182 This allows a user to just fill in the necessary data instead of
1183 worrying about the specific structure of the input arguments.
1185 Example usage::
1187 s = botocore.session.get_session()
1188 ddb = s.get_service_model('dynamodb')
1189 arg_gen = ArgumentGenerator()
1190 sample_input = arg_gen.generate_skeleton(
1191 ddb.operation_model('CreateTable').input_shape)
1192 print("Sample input for dynamodb.CreateTable: %s" % sample_input)
1194 """
1196 def __init__(self, use_member_names=False):
1197 self._use_member_names = use_member_names
1199 def generate_skeleton(self, shape):
1200 """Generate a sample input.
1202 :type shape: ``botocore.model.Shape``
1203 :param shape: The input shape.
1205 :return: The generated skeleton input corresponding to the
1206 provided input shape.
1208 """
1209 stack = []
1210 return self._generate_skeleton(shape, stack)
1212 def _generate_skeleton(self, shape, stack, name=''):
1213 stack.append(shape.name)
1214 try:
1215 if shape.type_name == 'structure':
1216 return self._generate_type_structure(shape, stack)
1217 elif shape.type_name == 'list':
1218 return self._generate_type_list(shape, stack)
1219 elif shape.type_name == 'map':
1220 return self._generate_type_map(shape, stack)
1221 elif shape.type_name == 'string':
1222 if self._use_member_names:
1223 return name
1224 if shape.enum:
1225 return random.choice(shape.enum)
1226 return ''
1227 elif shape.type_name in ['integer', 'long']:
1228 return 0
1229 elif shape.type_name in ['float', 'double']:
1230 return 0.0
1231 elif shape.type_name == 'boolean':
1232 return True
1233 elif shape.type_name == 'timestamp':
1234 return datetime.datetime(1970, 1, 1, 0, 0, 0)
1235 finally:
1236 stack.pop()
1238 def _generate_type_structure(self, shape, stack):
1239 if stack.count(shape.name) > 1:
1240 return {}
1241 skeleton = OrderedDict()
1242 for member_name, member_shape in shape.members.items():
1243 skeleton[member_name] = self._generate_skeleton(
1244 member_shape, stack, name=member_name
1245 )
1246 return skeleton
1248 def _generate_type_list(self, shape, stack):
1249 # For list elements we've arbitrarily decided to
1250 # return two elements for the skeleton list.
1251 name = ''
1252 if self._use_member_names:
1253 name = shape.member.name
1254 return [
1255 self._generate_skeleton(shape.member, stack, name),
1256 ]
1258 def _generate_type_map(self, shape, stack):
1259 key_shape = shape.key
1260 value_shape = shape.value
1261 assert key_shape.type_name == 'string'
1262 return OrderedDict(
1263 [
1264 ('KeyName', self._generate_skeleton(value_shape, stack)),
1265 ]
1266 )
1269def is_valid_ipv6_endpoint_url(endpoint_url):
1270 if UNSAFE_URL_CHARS.intersection(endpoint_url):
1271 return False
1272 hostname = f'[{urlparse(endpoint_url).hostname}]'
1273 return IPV6_ADDRZ_RE.match(hostname) is not None
1276def is_valid_ipv4_endpoint_url(endpoint_url):
1277 hostname = urlparse(endpoint_url).hostname
1278 return IPV4_RE.match(hostname) is not None
1281def is_valid_endpoint_url(endpoint_url):
1282 """Verify the endpoint_url is valid.
1284 :type endpoint_url: string
1285 :param endpoint_url: An endpoint_url. Must have at least a scheme
1286 and a hostname.
1288 :return: True if the endpoint url is valid. False otherwise.
1290 """
1291 # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing
1292 # it to pass hostname validation below. Detect them early to fix that.
1293 if UNSAFE_URL_CHARS.intersection(endpoint_url):
1294 return False
1295 parts = urlsplit(endpoint_url)
1296 hostname = parts.hostname
1297 if hostname is None:
1298 return False
1299 if len(hostname) > 255:
1300 return False
1301 if hostname[-1] == ".":
1302 hostname = hostname[:-1]
1303 allowed = re.compile(
1304 r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$",
1305 re.IGNORECASE,
1306 )
1307 return allowed.match(hostname)
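# Added illustrative example (not part of the original botocore source): the function
# returns a truthy match object for well-formed hostnames and a falsy value otherwise.
# >>> bool(is_valid_endpoint_url('https://s3.us-west-2.amazonaws.com'))
# True
# >>> bool(is_valid_endpoint_url('https://-bad.example.com'))
# False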
1310def is_valid_uri(endpoint_url):
1311 return is_valid_endpoint_url(endpoint_url) or is_valid_ipv6_endpoint_url(
1312 endpoint_url
1313 )
1316def validate_region_name(region_name):
1317 """Provided region_name must be a valid host label."""
1318 if region_name is None:
1319 return
1320 valid_host_label = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$')
1321 valid = valid_host_label.match(region_name)
1322 if not valid:
1323 raise InvalidRegionError(region_name=region_name)
1326def check_dns_name(bucket_name):
1327 """
1328 Check to see if the ``bucket_name`` complies with the
1329 restricted DNS naming conventions necessary to allow
1330 access via virtual-hosting style.
1332 Even though "." characters are perfectly valid in this DNS
1333 naming scheme, we are going to punt on any name containing a
1334 "." character because these will cause SSL cert validation
1335 problems if we try to use virtual-hosting style addressing.
1336 """
1337 if '.' in bucket_name:
1338 return False
1339 n = len(bucket_name)
1340 if n < 3 or n > 63:
1341 # Wrong length
1342 return False
1343 match = LABEL_RE.match(bucket_name)
1344 if match is None or match.end() != len(bucket_name):
1345 return False
1346 return True
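# Added illustrative example (not part of the original botocore source):
# >>> check_dns_name('my-bucket')
# True
# >>> check_dns_name('my.bucket')  # dots would break wildcard SSL cert validation
# False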
1349def fix_s3_host(
1350 request,
1351 signature_version,
1352 region_name,
1353 default_endpoint_url=None,
1354 **kwargs,
1355):
1356 """
1357 This handler looks at S3 requests just before they are signed.
1358 If there is a bucket name on the path (true for everything except
1359 ListAllBuckets) it checks to see if that bucket name conforms to
1360 the DNS naming conventions. If it does, it alters the request to
1361 use ``virtual hosting`` style addressing rather than ``path-style``
1362 addressing.
1364 """
1365 if request.context.get('use_global_endpoint', False):
1366 default_endpoint_url = 's3.amazonaws.com'
1367 try:
1368 switch_to_virtual_host_style(
1369 request, signature_version, default_endpoint_url
1370 )
1371 except InvalidDNSNameError as e:
1372 bucket_name = e.kwargs['bucket_name']
1373 logger.debug(
1374 'Not changing URI, bucket is not DNS compatible: %s', bucket_name
1375 )
1378def switch_to_virtual_host_style(
1379 request, signature_version, default_endpoint_url=None, **kwargs
1380):
1381 """
1382 This is a handler to force virtual host style s3 addressing no matter
1383 the signature version (which is taken into consideration for the default
1384 case). If the bucket is not DNS compatible, an InvalidDNSNameError is thrown.
1386 :param request: An AWSRequest object that is about to be sent.
1387 :param signature_version: The signature version to sign with
1388 :param default_endpoint_url: The endpoint to use when switching to a
1389 virtual style. If None is supplied, the virtual host will be
1390 constructed from the url of the request.
1391 """
1392 if request.auth_path is not None:
1393 # The auth_path has already been applied (this may be a
1394 # retried request). We don't need to perform this
1395 # customization again.
1396 return
1397 elif _is_get_bucket_location_request(request):
1398 # For the GetBucketLocation response, we should not be using
1399 # the virtual host style addressing so we can avoid any sigv4
1400 # issues.
1401 logger.debug(
1402 "Request is GetBucketLocation operation, not checking "
1403 "for DNS compatibility."
1404 )
1405 return
1406 parts = urlsplit(request.url)
1407 request.auth_path = parts.path
1408 path_parts = parts.path.split('/')
1410 # Retrieve the endpoint that we will be prepending the bucket name to.
1411 if default_endpoint_url is None:
1412 default_endpoint_url = parts.netloc
1414 if len(path_parts) > 1:
1415 bucket_name = path_parts[1]
1416 if not bucket_name:
1417 # If the bucket name is empty we should not be checking for
1418 # dns compatibility.
1419 return
1420 logger.debug('Checking for DNS compatible bucket for: %s', request.url)
1421 if check_dns_name(bucket_name):
1422 # If the operation is on a bucket, the auth_path must be
1423 # terminated with a '/' character.
1424 if len(path_parts) == 2:
1425 if request.auth_path[-1] != '/':
1426 request.auth_path += '/'
1427 path_parts.remove(bucket_name)
1428 # At the very least the path must be a '/', such as with the
1429 # CreateBucket operation when DNS style is being used. If this
1430 # is not used you will get an empty path which is incorrect.
1431 path = '/'.join(path_parts) or '/'
1432 global_endpoint = default_endpoint_url
1433 host = bucket_name + '.' + global_endpoint
1434 new_tuple = (parts.scheme, host, path, parts.query, '')
1435 new_uri = urlunsplit(new_tuple)
1436 request.url = new_uri
1437 logger.debug('URI updated to: %s', new_uri)
1438 else:
1439 raise InvalidDNSNameError(bucket_name=bucket_name)
1442def _is_get_bucket_location_request(request):
1443 return request.url.endswith('?location')
1446def instance_cache(func):
1447 """Method decorator for caching method calls to a single instance.
1449 **This is not a general purpose caching decorator.**
1451 In order to use this, you *must* provide an ``_instance_cache``
1452 attribute on the instance.
1454 This decorator is used to cache method calls. The cache is only
1455 scoped to a single instance though such that multiple instances
1456 will maintain their own cache. In order to keep things simple,
1457 this decorator requires that you provide an ``_instance_cache``
1458 attribute on your instance.
1460 """
1461 func_name = func.__name__
1463 @functools.wraps(func)
1464 def _cache_guard(self, *args, **kwargs):
1465 cache_key = (func_name, args)
1466 if kwargs:
1467 kwarg_items = tuple(sorted(kwargs.items()))
1468 cache_key = (func_name, args, kwarg_items)
1469 result = self._instance_cache.get(cache_key)
1470 if result is not None:
1471 return result
1472 result = func(self, *args, **kwargs)
1473 self._instance_cache[cache_key] = result
1474 return result
1476 return _cache_guard
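# Added illustrative example (not part of the original botocore source): the decorated
# method memoizes per instance via the required ``_instance_cache`` attribute.
# >>> class Service:
# ...     def __init__(self):
# ...         self._instance_cache = {}
# ...     @instance_cache
# ...     def describe(self, name):
# ...         return {'name': name}
# >>> s = Service()
# >>> s.describe('db') is s.describe('db')  # second call returns the cached result
# True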
1479def lru_cache_weakref(*cache_args, **cache_kwargs):
1480 """
1481 Version of functools.lru_cache that stores a weak reference to ``self``.
1483 Serves the same purpose as :py:func:`instance_cache` but uses Python's
1484 functools implementation which offers ``max_size`` and ``typed`` properties.
1486 lru_cache is a global cache even when used on a method. The cache's
1487 reference to ``self`` will prevent garbage collection of the object. This
1488 wrapper around functools.lru_cache replaces the reference to ``self`` with
1489 a weak reference to not interfere with garbage collection.
1490 """
1492 def wrapper(func):
1493 @functools.lru_cache(*cache_args, **cache_kwargs)
1494 def func_with_weakref(weakref_to_self, *args, **kwargs):
1495 return func(weakref_to_self(), *args, **kwargs)
1497 @functools.wraps(func)
1498 def inner(self, *args, **kwargs):
1499 for kwarg_key, kwarg_value in kwargs.items():
1500 if isinstance(kwarg_value, list):
1501 kwargs[kwarg_key] = tuple(kwarg_value)
1502 return func_with_weakref(weakref.ref(self), *args, **kwargs)
1504 inner.cache_info = func_with_weakref.cache_info
1505 return inner
1507 return wrapper
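# Added illustrative example (not part of the original botocore source): because only a
# weak reference to ``self`` is held by the cache, the instance can still be collected.
# >>> class Resolver:
# ...     @lru_cache_weakref(maxsize=16)
# ...     def lookup(self, name):
# ...         return name.upper()
# >>> Resolver().lookup('s3')
# 'S3'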
1510def switch_host_s3_accelerate(request, operation_name, **kwargs):
1511 """Switches the current s3 endpoint with an S3 Accelerate endpoint"""
1513 # Note that when registered the switching of the s3 host happens
1514 # before it gets changed to virtual. So we are not concerned with ensuring
1515 # that the bucket name is translated to the virtual style here and we
1516 # can hard code the Accelerate endpoint.
1517 parts = urlsplit(request.url).netloc.split('.')
1518 parts = [p for p in parts if p in S3_ACCELERATE_WHITELIST]
1519 endpoint = 'https://s3-accelerate.'
1520 if len(parts) > 0:
1521 endpoint += '.'.join(parts) + '.'
1522 endpoint += 'amazonaws.com'
1524 if operation_name in ['ListBuckets', 'CreateBucket', 'DeleteBucket']:
1525 return
1526 _switch_hosts(request, endpoint, use_new_scheme=False)
1529def switch_host_with_param(request, param_name):
1530 """Switches the host using a parameter value from a JSON request body"""
1531 request_json = json.loads(request.data.decode('utf-8'))
1532 if request_json.get(param_name):
1533 new_endpoint = request_json[param_name]
1534 _switch_hosts(request, new_endpoint)
1537def _switch_hosts(request, new_endpoint, use_new_scheme=True):
1538 final_endpoint = _get_new_endpoint(
1539 request.url, new_endpoint, use_new_scheme
1540 )
1541 request.url = final_endpoint
1544def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True):
1545 new_endpoint_components = urlsplit(new_endpoint)
1546 original_endpoint_components = urlsplit(original_endpoint)
1547 scheme = original_endpoint_components.scheme
1548 if use_new_scheme:
1549 scheme = new_endpoint_components.scheme
1550 final_endpoint_components = (
1551 scheme,
1552 new_endpoint_components.netloc,
1553 original_endpoint_components.path,
1554 original_endpoint_components.query,
1555 '',
1556 )
1557 final_endpoint = urlunsplit(final_endpoint_components)
1558 logger.debug(f'Updating URI from {original_endpoint} to {final_endpoint}')
1559 return final_endpoint
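# Added illustrative example (not part of the original botocore source): only the netloc
# (and optionally the scheme) is swapped; the original path and query are preserved.
# >>> _get_new_endpoint('https://s3.us-west-2.amazonaws.com/bucket/key?versionId=1',
# ...                   'https://s3-accelerate.amazonaws.com', use_new_scheme=False)
# 'https://s3-accelerate.amazonaws.com/bucket/key?versionId=1'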
1562def deep_merge(base, extra):
1563 """Deeply two dictionaries, overriding existing keys in the base.
1565 :param base: The base dictionary which will be merged into.
1566 :param extra: The dictionary to merge into the base. Keys from this
1567 dictionary will take precedence.
1568 """
1569 for key in extra:
1570 # If the key represents a dict on both given dicts, merge the sub-dicts
1571 if (
1572 key in base
1573 and isinstance(base[key], dict)
1574 and isinstance(extra[key], dict)
1575 ):
1576 deep_merge(base[key], extra[key])
1577 continue
1579 # Otherwise, set the key on the base to be the value of the extra.
1580 base[key] = extra[key]
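# Added illustrative example (not part of the original botocore source): sub-dicts are
# merged key by key, and keys from ``extra`` win on conflict.
# >>> base = {'retries': {'max_attempts': 3}, 'region': 'us-east-1'}
# >>> deep_merge(base, {'retries': {'mode': 'standard'}})
# >>> base
# {'retries': {'max_attempts': 3, 'mode': 'standard'}, 'region': 'us-east-1'}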
1583def hyphenize_service_id(service_id):
1584 """Translate the form used for event emitters.
1586 :param service_id: The service_id to convert.
1587 """
1588 return service_id.replace(' ', '-').lower()
1591class IdentityCache:
1592 """Base IdentityCache implementation for storing and retrieving
1593 highly accessed credentials.
1595 This class is not intended to be instantiated in user code.
1596 """
1598 METHOD = "base_identity_cache"
1600 def __init__(self, client, credential_cls):
1601 self._client = client
1602 self._credential_cls = credential_cls
1604 def get_credentials(self, **kwargs):
1605 callback = self.build_refresh_callback(**kwargs)
1606 metadata = callback()
1607 credential_entry = self._credential_cls.create_from_metadata(
1608 metadata=metadata,
1609 refresh_using=callback,
1610 method=self.METHOD,
1611 advisory_timeout=45,
1612 mandatory_timeout=10,
1613 )
1614 return credential_entry
1616 def build_refresh_callback(self, **kwargs):
1617 """Callback to be implemented by subclasses.
1619 Returns a set of metadata to be converted into a new
1620 credential instance.
1621 """
1622 raise NotImplementedError()
1625class S3ExpressIdentityCache(IdentityCache):
1626 """S3Express IdentityCache for retrieving and storing
1627 credentials from CreateSession calls.
1629 This class is not intended to be instantiated in user code.
1630 """
1632 METHOD = "s3express"
1634 def __init__(self, client, credential_cls):
1635 self._client = client
1636 self._credential_cls = credential_cls
1638 @functools.lru_cache(maxsize=100)
1639 def get_credentials(self, bucket):
1640 return super().get_credentials(bucket=bucket)
1642 def build_refresh_callback(self, bucket):
1643 def refresher():
1644 response = self._client.create_session(Bucket=bucket)
1645 creds = response['Credentials']
1646 expiration = self._serialize_if_needed(
1647 creds['Expiration'], iso=True
1648 )
1649 return {
1650 "access_key": creds['AccessKeyId'],
1651 "secret_key": creds['SecretAccessKey'],
1652 "token": creds['SessionToken'],
1653 "expiry_time": expiration,
1654 }
1656 return refresher
1658 def _serialize_if_needed(self, value, iso=False):
1659 if isinstance(value, _DatetimeClass):
1660 if iso:
1661 return value.isoformat()
1662 return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
1663 return value
1666class S3ExpressIdentityResolver:
1667 def __init__(self, client, credential_cls, cache=None):
1668 self._client = weakref.proxy(client)
1670 if cache is None:
1671 cache = S3ExpressIdentityCache(self._client, credential_cls)
1672 self._cache = cache
1674 def register(self, event_emitter=None):
1675 logger.debug('Registering S3Express Identity Resolver')
1676 emitter = event_emitter or self._client.meta.events
1677 emitter.register('before-call.s3', self.apply_signing_cache_key)
1678 emitter.register('before-sign.s3', self.resolve_s3express_identity)
1680 def apply_signing_cache_key(self, params, context, **kwargs):
1681 endpoint_properties = context.get('endpoint_properties', {})
1682 backend = endpoint_properties.get('backend', None)
1684 # Add cache key if Bucket supplied for s3express request
1685 bucket_name = context.get('input_params', {}).get('Bucket')
1686 if backend == 'S3Express' and bucket_name is not None:
1687 context.setdefault('signing', {})
1688 context['signing']['cache_key'] = bucket_name
1690 def resolve_s3express_identity(
1691 self,
1692 request,
1693 signing_name,
1694 region_name,
1695 signature_version,
1696 request_signer,
1697 operation_name,
1698 **kwargs,
1699 ):
1700 signing_context = request.context.get('signing', {})
1701 signing_name = signing_context.get('signing_name')
1702 if signing_name == 's3express' and signature_version.startswith(
1703 'v4-s3express'
1704 ):
1705 signing_context['identity_cache'] = self._cache
1706 if 'cache_key' not in signing_context:
1707 signing_context['cache_key'] = (
1708 request.context.get('s3_redirect', {})
1709 .get('params', {})
1710 .get('Bucket')
1711 )
1714class S3RegionRedirectorv2:
1715 """Updated version of S3RegionRedirector for use when
1716 EndpointRulesetResolver is in use for endpoint resolution.
1718 This class is considered private and subject to abrupt breaking changes or
1719 removal without prior announcement. Please do not use it directly.
1720 """
1722 def __init__(self, endpoint_bridge, client, cache=None):
1723 self._cache = cache or {}
1724 self._client = weakref.proxy(client)
1726 def register(self, event_emitter=None):
1727 logger.debug('Registering S3 region redirector handler')
1728 emitter = event_emitter or self._client.meta.events
1729 emitter.register('needs-retry.s3', self.redirect_from_error)
1730 emitter.register(
1731 'before-parameter-build.s3', self.annotate_request_context
1732 )
1733 emitter.register(
1734 'before-endpoint-resolution.s3', self.redirect_from_cache
1735 )
1737 def redirect_from_error(self, request_dict, response, operation, **kwargs):
1738 """
1739 An S3 request sent to the wrong region will return an error that
1740 contains the endpoint the request should be sent to. This handler
1741 will add the redirect information to the signing context and then
1742 redirect the request.
1743 """
1744 if response is None:
1745 # This could be none if there was a ConnectionError or other
1746 # transport error.
1747 return
1749 redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {})
1750 if ArnParser.is_arn(redirect_ctx.get('bucket')):
1751 logger.debug(
1752 'S3 request was previously for an Accesspoint ARN, not '
1753 'redirecting.'
1754 )
1755 return
1757 if redirect_ctx.get('redirected'):
1758 logger.debug(
1759 'S3 request was previously redirected, not redirecting.'
1760 )
1761 return
1763 error = response[1].get('Error', {})
1764 error_code = error.get('Code')
1765 response_metadata = response[1].get('ResponseMetadata', {})
1767 # We have to account for 400 responses because
1768 # if we sign a Head* request with the wrong region,
1769 # we'll get a 400 Bad Request but we won't get a
1770 # body saying it's an "AuthorizationHeaderMalformed".
1771 is_special_head_object = (
1772 error_code in ('301', '400') and operation.name == 'HeadObject'
1773 )
1774 is_special_head_bucket = (
1775 error_code in ('301', '400')
1776 and operation.name == 'HeadBucket'
1777 and 'x-amz-bucket-region'
1778 in response_metadata.get('HTTPHeaders', {})
1779 )
1780 is_wrong_signing_region = (
1781 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
1782 )
1783 is_redirect_status = response[0] is not None and response[
1784 0
1785 ].status_code in (301, 302, 307)
1786 is_permanent_redirect = error_code == 'PermanentRedirect'
1787 is_opt_in_region_redirect = (
1788 error_code == 'IllegalLocationConstraintException'
1789 and operation.name != 'CreateBucket'
1790 )
1791 if not any(
1792 [
1793 is_special_head_object,
1794 is_wrong_signing_region,
1795 is_permanent_redirect,
1796 is_special_head_bucket,
1797 is_redirect_status,
1798 is_opt_in_region_redirect,
1799 ]
1800 ):
1801 return
1803 bucket = request_dict['context']['s3_redirect']['bucket']
1804 client_region = request_dict['context'].get('client_region')
1805 new_region = self.get_bucket_region(bucket, response)
1807 if new_region is None:
1808 logger.debug(
1809 f"S3 client configured for region {client_region} but the "
1810 f"bucket {bucket} is not in that region and the proper region "
1811 "could not be automatically determined."
1812 )
1813 return
1815 logger.debug(
1816 f"S3 client configured for region {client_region} but the bucket {bucket} "
1817 f"is in region {new_region}; Please configure the proper region to "
1818 f"avoid multiple unnecessary redirects and signing attempts."
1819 )
1820 # Adding the new region to _cache will make construct_endpoint()
1821 # use the new region as the value for the AWS::Region builtin parameter.
1822 self._cache[bucket] = new_region
1824 # Re-resolve endpoint with new region and modify request_dict with
1825 # the new URL, auth scheme, and signing context.
1826 ep_resolver = self._client._ruleset_resolver
1827 ep_info = ep_resolver.construct_endpoint(
1828 operation_model=operation,
1829 call_args=request_dict['context']['s3_redirect']['params'],
1830 request_context=request_dict['context'],
1831 )
1832 request_dict['url'] = self.set_request_url(
1833 request_dict['url'], ep_info.url
1834 )
1835 request_dict['context']['s3_redirect']['redirected'] = True
1836 auth_schemes = ep_info.properties.get('authSchemes')
1837 if auth_schemes is not None:
1838 auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes)
1839 auth_type, signing_context = auth_info
1840 request_dict['context']['auth_type'] = auth_type
1841 request_dict['context']['signing'] = {
1842 **request_dict['context'].get('signing', {}),
1843 **signing_context,
1844 }
1846 # Return 0 so it doesn't wait to retry
1847 return 0
1849 def get_bucket_region(self, bucket, response):
1850 """
1851 There are multiple potential sources for the new region to redirect to,
1852 but they aren't all universally available for use. This will try to
1853 find region from response elements, but will fall back to calling
1854 HEAD on the bucket if all else fails.
1856 :param bucket: The bucket to find the region for. This is necessary if
1857 the region is not available in the error response.
1858 :param response: A response representing a service request that failed
1859 due to incorrect region configuration.
1860 """
1861 # First try to source the region from the headers.
1862 service_response = response[1]
1863 response_headers = service_response['ResponseMetadata']['HTTPHeaders']
1864 if 'x-amz-bucket-region' in response_headers:
1865 return response_headers['x-amz-bucket-region']
1867 # Next, check the error body
1868 region = service_response.get('Error', {}).get('Region', None)
1869 if region is not None:
1870 return region
1872 # Finally, HEAD the bucket. No other choice sadly.
1873 try:
1874 response = self._client.head_bucket(Bucket=bucket)
1875 headers = response['ResponseMetadata']['HTTPHeaders']
1876 except ClientError as e:
1877 headers = e.response['ResponseMetadata']['HTTPHeaders']
1879 region = headers.get('x-amz-bucket-region', None)
1880 return region
1882 def set_request_url(self, old_url, new_endpoint, **kwargs):
1883 """
1884 Splice a new endpoint into an existing URL. Note that some endpoints
1885 from the endpoint provider have a path component which will be
1886 discarded by this function.
1887 """
1888 return _get_new_endpoint(old_url, new_endpoint, False)
1890 def redirect_from_cache(self, builtins, params, **kwargs):
1891 """
1892 If a bucket name has been redirected before, it is in the cache. This
1893 handler will update the AWS::Region endpoint resolver builtin param
1894 to use the region from cache instead of the client region to avoid the
1895 redirect.
1896 """
1897 bucket = params.get('Bucket')
1898 if bucket is not None and bucket in self._cache:
1899 new_region = self._cache.get(bucket)
1900 builtins['AWS::Region'] = new_region
1902 def annotate_request_context(self, params, context, **kwargs):
1903 """Store the bucket name in context for later use when redirecting.
1904 The bucket name may be an access point ARN or alias.
1905 """
1906 bucket = params.get('Bucket')
1907 context['s3_redirect'] = {
1908 'redirected': False,
1909 'bucket': bucket,
1910 'params': params,
1911 }
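# Illustrative flow (assumed example values): annotate_request_context stores
#     context['s3_redirect'] = {'redirected': False, 'bucket': 'my-bucket',
#                               'params': params}
# before endpoint resolution; if a later call fails with a wrong-region error,
# redirect_from_error caches the bucket's real region, and redirect_from_cache
# then sets builtins['AWS::Region'] to that cached region on subsequent calls.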
1914class S3RegionRedirector:
1915 """This handler has been replaced by S3RegionRedirectorv2. The original
1916 version remains in place for any third-party libraries that import it.
1917 """
1919 def __init__(self, endpoint_bridge, client, cache=None):
1920 self._endpoint_resolver = endpoint_bridge
1921 self._cache = cache
1922 if self._cache is None:
1923 self._cache = {}
1925 # This needs to be a weak ref in order to prevent memory leaks on
1926 # python 2.6
1927 self._client = weakref.proxy(client)
1929 warnings.warn(
1930 'The S3RegionRedirector class has been deprecated for a new '
1931 'internal replacement. A future version of botocore may remove '
1932 'this class.',
1933 category=FutureWarning,
1934 )
1936 def register(self, event_emitter=None):
1937 emitter = event_emitter or self._client.meta.events
1938 emitter.register('needs-retry.s3', self.redirect_from_error)
1939 emitter.register('before-call.s3', self.set_request_url)
1940 emitter.register('before-parameter-build.s3', self.redirect_from_cache)
1942 def redirect_from_error(self, request_dict, response, operation, **kwargs):
1943 """
1944 An S3 request sent to the wrong region will return an error that
1945 contains the endpoint the request should be sent to. This handler
1946 will add the redirect information to the signing context and then
1947 redirect the request.
1948 """
1949 if response is None:
1950 # This could be none if there was a ConnectionError or other
1951 # transport error.
1952 return
1954 if self._is_s3_accesspoint(request_dict.get('context', {})):
1955 logger.debug(
1956 'S3 request was previously to an accesspoint, not redirecting.'
1957 )
1958 return
1960 if request_dict.get('context', {}).get('s3_redirected'):
1961 logger.debug(
1962 'S3 request was previously redirected, not redirecting.'
1963 )
1964 return
1966 error = response[1].get('Error', {})
1967 error_code = error.get('Code')
1968 response_metadata = response[1].get('ResponseMetadata', {})
1970 # We have to account for 400 responses because
1971 # if we sign a Head* request with the wrong region,
1972 # we'll get a 400 Bad Request but we won't get a
1973 # body saying it's an "AuthorizationHeaderMalformed".
1974 is_special_head_object = (
1975 error_code in ('301', '400') and operation.name == 'HeadObject'
1976 )
1977 is_special_head_bucket = (
1978 error_code in ('301', '400')
1979 and operation.name == 'HeadBucket'
1980 and 'x-amz-bucket-region'
1981 in response_metadata.get('HTTPHeaders', {})
1982 )
1983 is_wrong_signing_region = (
1984 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
1985 )
1986 is_redirect_status = response[0] is not None and response[
1987 0
1988 ].status_code in (301, 302, 307)
1989 is_permanent_redirect = error_code == 'PermanentRedirect'
1990 if not any(
1991 [
1992 is_special_head_object,
1993 is_wrong_signing_region,
1994 is_permanent_redirect,
1995 is_special_head_bucket,
1996 is_redirect_status,
1997 ]
1998 ):
1999 return
2001 bucket = request_dict['context']['signing']['bucket']
2002 client_region = request_dict['context'].get('client_region')
2003 new_region = self.get_bucket_region(bucket, response)
2005 if new_region is None:
2006 logger.debug(
2007 f"S3 client configured for region {client_region} but the bucket {bucket} is not "
2008 "in that region and the proper region could not be "
2009 "automatically determined."
2010 )
2011 return
2013 logger.debug(
2014 f"S3 client configured for region {client_region} but the bucket {bucket} is in region"
2015 f" {new_region}; Please configure the proper region to avoid multiple "
2016 "unnecessary redirects and signing attempts."
2017 )
2018 endpoint = self._endpoint_resolver.resolve('s3', new_region)
2019 endpoint = endpoint['endpoint_url']
2021 signing_context = {
2022 'region': new_region,
2023 'bucket': bucket,
2024 'endpoint': endpoint,
2025 }
2026 request_dict['context']['signing'] = signing_context
2028 self._cache[bucket] = signing_context
2029 self.set_request_url(request_dict, request_dict['context'])
2031 request_dict['context']['s3_redirected'] = True
2033 # Return 0 so it doesn't wait to retry
2034 return 0
2036 def get_bucket_region(self, bucket, response):
2037 """
2038 There are multiple potential sources for the new region to redirect to,
2039 but they aren't all universally available for use. This will try to
2040 find region from response elements, but will fall back to calling
2041 HEAD on the bucket if all else fails.
2043 :param bucket: The bucket to find the region for. This is necessary if
2044 the region is not available in the error response.
2045 :param response: A response representing a service request that failed
2046 due to incorrect region configuration.
2047 """
2048 # First try to source the region from the headers.
2049 service_response = response[1]
2050 response_headers = service_response['ResponseMetadata']['HTTPHeaders']
2051 if 'x-amz-bucket-region' in response_headers:
2052 return response_headers['x-amz-bucket-region']
2054 # Next, check the error body
2055 region = service_response.get('Error', {}).get('Region', None)
2056 if region is not None:
2057 return region
2059 # Finally, HEAD the bucket. No other choice sadly.
2060 try:
2061 response = self._client.head_bucket(Bucket=bucket)
2062 headers = response['ResponseMetadata']['HTTPHeaders']
2063 except ClientError as e:
2064 headers = e.response['ResponseMetadata']['HTTPHeaders']
2066 region = headers.get('x-amz-bucket-region', None)
2067 return region
2069 def set_request_url(self, params, context, **kwargs):
2070 endpoint = context.get('signing', {}).get('endpoint', None)
2071 if endpoint is not None:
2072 params['url'] = _get_new_endpoint(params['url'], endpoint, False)
2074 def redirect_from_cache(self, params, context, **kwargs):
2075 """
2076 This handler retrieves a given bucket's signing context from the cache
2077 and adds it into the request context.
2078 """
2079 if self._is_s3_accesspoint(context):
2080 return
2081 bucket = params.get('Bucket')
2082 signing_context = self._cache.get(bucket)
2083 if signing_context is not None:
2084 context['signing'] = signing_context
2085 else:
2086 context['signing'] = {'bucket': bucket}
2088 def _is_s3_accesspoint(self, context):
2089 return 's3_accesspoint' in context
2092class InvalidArnException(ValueError):
2093 pass
2096class ArnParser:
2097 def parse_arn(self, arn):
2098 arn_parts = arn.split(':', 5)
2099 if len(arn_parts) < 6:
2100 raise InvalidArnException(
2101 f'Provided ARN: {arn} must be of the format: '
2102 'arn:partition:service:region:account:resource'
2103 )
2104 return {
2105 'partition': arn_parts[1],
2106 'service': arn_parts[2],
2107 'region': arn_parts[3],
2108 'account': arn_parts[4],
2109 'resource': arn_parts[5],
2110 }
2112 @staticmethod
2113 def is_arn(value):
2114 if not isinstance(value, str) or not value.startswith('arn:'):
2115 return False
2116 arn_parser = ArnParser()
2117 try:
2118 arn_parser.parse_arn(value)
2119 return True
2120 except InvalidArnException:
2121 return False
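# Example (hypothetical ARN, shown only to illustrate the parsed shape):
#
#     ArnParser().parse_arn('arn:aws:s3:us-west-2:123456789012:accesspoint/my-ap')
#     # -> {'partition': 'aws', 'service': 's3', 'region': 'us-west-2',
#     #     'account': '123456789012', 'resource': 'accesspoint/my-ap'}
#
#     ArnParser.is_arn('my-regular-bucket')  # -> False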
2124class S3ArnParamHandler:
2125 _RESOURCE_REGEX = re.compile(
2126 r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$'
2127 )
2128 _OUTPOST_RESOURCE_REGEX = re.compile(
2129 r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]'
2130 r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)'
2131 )
2132 _BLACKLISTED_OPERATIONS = ['CreateBucket']
2134 def __init__(self, arn_parser=None):
2135 self._arn_parser = arn_parser
2136 if arn_parser is None:
2137 self._arn_parser = ArnParser()
2139 def register(self, event_emitter):
2140 event_emitter.register('before-parameter-build.s3', self.handle_arn)
2142 def handle_arn(self, params, model, context, **kwargs):
2143 if model.name in self._BLACKLISTED_OPERATIONS:
2144 return
2145 arn_details = self._get_arn_details_from_bucket_param(params)
2146 if arn_details is None:
2147 return
2148 if arn_details['resource_type'] == 'accesspoint':
2149 self._store_accesspoint(params, context, arn_details)
2150 elif arn_details['resource_type'] == 'outpost':
2151 self._store_outpost(params, context, arn_details)
2153 def _get_arn_details_from_bucket_param(self, params):
2154 if 'Bucket' in params:
2155 try:
2156 arn = params['Bucket']
2157 arn_details = self._arn_parser.parse_arn(arn)
2158 self._add_resource_type_and_name(arn, arn_details)
2159 return arn_details
2160 except InvalidArnException:
2161 pass
2162 return None
2164 def _add_resource_type_and_name(self, arn, arn_details):
2165 match = self._RESOURCE_REGEX.match(arn_details['resource'])
2166 if match:
2167 arn_details['resource_type'] = match.group('resource_type')
2168 arn_details['resource_name'] = match.group('resource_name')
2169 else:
2170 raise UnsupportedS3ArnError(arn=arn)
2172 def _store_accesspoint(self, params, context, arn_details):
2173 # Ideally the access-point would be stored as a parameter in the
2174 # request where the serializer would then know how to serialize it,
2175 # but access-points are not modeled in S3 operations so it would fail
2176 # validation. Instead, we set the access-point to the bucket parameter
2177 # to have some value set when serializing the request and additional
2178 # information on the context from the arn to use in forming the
2179 # access-point endpoint.
2180 params['Bucket'] = arn_details['resource_name']
2181 context['s3_accesspoint'] = {
2182 'name': arn_details['resource_name'],
2183 'account': arn_details['account'],
2184 'partition': arn_details['partition'],
2185 'region': arn_details['region'],
2186 'service': arn_details['service'],
2187 }
2189 def _store_outpost(self, params, context, arn_details):
2190 resource_name = arn_details['resource_name']
2191 match = self._OUTPOST_RESOURCE_REGEX.match(resource_name)
2192 if not match:
2193 raise UnsupportedOutpostResourceError(resource_name=resource_name)
2194 # Because we need to set the bucket name to something to pass
2195 # validation we're going to use the access point name to be consistent
2196 # with normal access point arns.
2197 accesspoint_name = match.group('accesspoint_name')
2198 params['Bucket'] = accesspoint_name
2199 context['s3_accesspoint'] = {
2200 'outpost_name': match.group('outpost_name'),
2201 'name': accesspoint_name,
2202 'account': arn_details['account'],
2203 'partition': arn_details['partition'],
2204 'region': arn_details['region'],
2205 'service': arn_details['service'],
2206 }
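# Example (hypothetical outpost access point ARN): for
#     params = {'Bucket': 'arn:aws:s3-outposts:us-west-2:123456789012:'
#                         'outpost/op-01234567890123456/accesspoint/my-ap'}
# handle_arn rewrites params['Bucket'] to 'my-ap' and stores the outpost name
# 'op-01234567890123456' along with the account, partition, and region under
# context['s3_accesspoint'] for use when building the endpoint.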
2209class S3EndpointSetter:
2210 _DEFAULT_PARTITION = 'aws'
2211 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
2213 def __init__(
2214 self,
2215 endpoint_resolver,
2216 region=None,
2217 s3_config=None,
2218 endpoint_url=None,
2219 partition=None,
2220 use_fips_endpoint=False,
2221 ):
2222 # This is calling the endpoint_resolver in regions.py
2223 self._endpoint_resolver = endpoint_resolver
2224 self._region = region
2225 self._s3_config = s3_config
2226 self._use_fips_endpoint = use_fips_endpoint
2227 if s3_config is None:
2228 self._s3_config = {}
2229 self._endpoint_url = endpoint_url
2230 self._partition = partition
2231 if partition is None:
2232 self._partition = self._DEFAULT_PARTITION
2234 def register(self, event_emitter):
2235 event_emitter.register('before-sign.s3', self.set_endpoint)
2236 event_emitter.register('choose-signer.s3', self.set_signer)
2237 event_emitter.register(
2238 'before-call.s3.WriteGetObjectResponse',
2239 self.update_endpoint_to_s3_object_lambda,
2240 )
2242 def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs):
2243 if self._use_accelerate_endpoint:
2244 raise UnsupportedS3ConfigurationError(
2245 msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations',
2246 )
2248 self._override_signing_name(context, 's3-object-lambda')
2249 if self._endpoint_url:
2250 # Only update the url if an explicit url was not provided
2251 return
2253 resolver = self._endpoint_resolver
2254 # Construct the endpoint for 's3-object-lambda' in the client's region
2255 resolved = resolver.construct_endpoint(
2256 's3-object-lambda', self._region
2257 )
2259 # Ideally we would be able to replace the endpoint before
2260 # serialization but there's no event to do that currently
2261 # host_prefix is all the arn/bucket specs
2262 new_endpoint = 'https://{host_prefix}{hostname}'.format(
2263 host_prefix=params['host_prefix'],
2264 hostname=resolved['hostname'],
2265 )
2267 params['url'] = _get_new_endpoint(params['url'], new_endpoint, False)
2269 def set_endpoint(self, request, **kwargs):
2270 if self._use_accesspoint_endpoint(request):
2271 self._validate_accesspoint_supported(request)
2272 self._validate_fips_supported(request)
2273 self._validate_global_regions(request)
2274 region_name = self._resolve_region_for_accesspoint_endpoint(
2275 request
2276 )
2277 self._resolve_signing_name_for_accesspoint_endpoint(request)
2278 self._switch_to_accesspoint_endpoint(request, region_name)
2279 return
2280 if self._use_accelerate_endpoint:
2281 if self._use_fips_endpoint:
2282 raise UnsupportedS3ConfigurationError(
2283 msg=(
2284 'Client is configured to use the FIPS pseudo region '
2285 f'for "{self._region}", but S3 Accelerate does not have any FIPS '
2286 'compatible endpoints.'
2287 )
2288 )
2289 switch_host_s3_accelerate(request=request, **kwargs)
2290 if self._s3_addressing_handler:
2291 self._s3_addressing_handler(request=request, **kwargs)
2293 def _use_accesspoint_endpoint(self, request):
2294 return 's3_accesspoint' in request.context
2296 def _validate_fips_supported(self, request):
2297 if not self._use_fips_endpoint:
2298 return
2299 if 'fips' in request.context['s3_accesspoint']['region']:
2300 raise UnsupportedS3AccesspointConfigurationError(
2301 msg='Invalid ARN, FIPS region not allowed in ARN.'
2302 )
2303 if 'outpost_name' in request.context['s3_accesspoint']:
2304 raise UnsupportedS3AccesspointConfigurationError(
2305 msg=(
2306 f'Client is configured to use the FIPS pseudo-region "{self._region}", '
2307 'but outpost ARNs do not support FIPS endpoints.'
2308 )
2309 )
2310 # Transforming pseudo region to actual region
2311 accesspoint_region = request.context['s3_accesspoint']['region']
2312 if accesspoint_region != self._region:
2313 if not self._s3_config.get('use_arn_region', True):
2314 # TODO: Update message to reflect use_arn_region
2315 # is not set
2316 raise UnsupportedS3AccesspointConfigurationError(
2317 msg=(
2318 'Client is configured to use the FIPS pseudo-region '
2319 f'for "{self._region}", but the access-point ARN provided is for '
2320 f'the "{accesspoint_region}" region. For clients using a FIPS '
2321 'pseudo-region calls to access-point ARNs in another '
2322 'region are not allowed.'
2323 )
2324 )
2326 def _validate_global_regions(self, request):
2327 if self._s3_config.get('use_arn_region', True):
2328 return
2329 if self._region in ['aws-global', 's3-external-1']:
2330 raise UnsupportedS3AccesspointConfigurationError(
2331 msg=(
2332 'Client is configured to use the global pseudo-region '
2333 f'"{self._region}". When providing access-point ARNs a regional '
2334 'endpoint must be specified.'
2335 )
2336 )
2338 def _validate_accesspoint_supported(self, request):
2339 if self._use_accelerate_endpoint:
2340 raise UnsupportedS3AccesspointConfigurationError(
2341 msg=(
2342 'Client does not support s3 accelerate configuration '
2343 'when an access-point ARN is specified.'
2344 )
2345 )
2346 request_partition = request.context['s3_accesspoint']['partition']
2347 if request_partition != self._partition:
2348 raise UnsupportedS3AccesspointConfigurationError(
2349 msg=(
2350 f'Client is configured for "{self._partition}" partition, but access-point'
2351 f' ARN provided is for "{request_partition}" partition. The client and '
2352 'access-point partition must be the same.'
2353 )
2354 )
2355 s3_service = request.context['s3_accesspoint'].get('service')
2356 if s3_service == 's3-object-lambda' and self._s3_config.get(
2357 'use_dualstack_endpoint'
2358 ):
2359 raise UnsupportedS3AccesspointConfigurationError(
2360 msg=(
2361 'Client does not support s3 dualstack configuration '
2362 'when an S3 Object Lambda access point ARN is specified.'
2363 )
2364 )
2365 outpost_name = request.context['s3_accesspoint'].get('outpost_name')
2366 if outpost_name and self._s3_config.get('use_dualstack_endpoint'):
2367 raise UnsupportedS3AccesspointConfigurationError(
2368 msg=(
2369 'Client does not support s3 dualstack configuration '
2370 'when an outpost ARN is specified.'
2371 )
2372 )
2373 self._validate_mrap_s3_config(request)
2375 def _validate_mrap_s3_config(self, request):
2376 if not is_global_accesspoint(request.context):
2377 return
2378 if self._s3_config.get('s3_disable_multiregion_access_points'):
2379 raise UnsupportedS3AccesspointConfigurationError(
2380 msg=(
2381 'Invalid configuration, Multi-Region Access Point '
2382 'ARNs are disabled.'
2383 )
2384 )
2385 elif self._s3_config.get('use_dualstack_endpoint'):
2386 raise UnsupportedS3AccesspointConfigurationError(
2387 msg=(
2388 'Client does not support s3 dualstack configuration '
2389 'when a Multi-Region Access Point ARN is specified.'
2390 )
2391 )
2393 def _resolve_region_for_accesspoint_endpoint(self, request):
2394 if is_global_accesspoint(request.context):
2395 # Requests going to MRAP endpoints MUST be set to any (*) region.
2396 self._override_signing_region(request, '*')
2397 elif self._s3_config.get('use_arn_region', True):
2398 accesspoint_region = request.context['s3_accesspoint']['region']
2399 # If we are using the region from the access point,
2400 # we will also want to make sure that we set it as the
2401 # signing region as well
2402 self._override_signing_region(request, accesspoint_region)
2403 return accesspoint_region
2404 return self._region
2406 def set_signer(self, context, **kwargs):
2407 if is_global_accesspoint(context):
2408 if HAS_CRT:
2409 return 's3v4a'
2410 else:
2411 raise MissingDependencyException(
2412 msg="Using S3 with an MRAP arn requires an additional "
2413 "dependency. You will need to pip install "
2414 "botocore[crt] before proceeding."
2415 )
2417 def _resolve_signing_name_for_accesspoint_endpoint(self, request):
2418 accesspoint_service = request.context['s3_accesspoint']['service']
2419 self._override_signing_name(request.context, accesspoint_service)
2421 def _switch_to_accesspoint_endpoint(self, request, region_name):
2422 original_components = urlsplit(request.url)
2423 accesspoint_endpoint = urlunsplit(
2424 (
2425 original_components.scheme,
2426 self._get_netloc(request.context, region_name),
2427 self._get_accesspoint_path(
2428 original_components.path, request.context
2429 ),
2430 original_components.query,
2431 '',
2432 )
2433 )
2434 logger.debug(
2435 f'Updating URI from {request.url} to {accesspoint_endpoint}'
2436 )
2437 request.url = accesspoint_endpoint
2439 def _get_netloc(self, request_context, region_name):
2440 if is_global_accesspoint(request_context):
2441 return self._get_mrap_netloc(request_context)
2442 else:
2443 return self._get_accesspoint_netloc(request_context, region_name)
2445 def _get_mrap_netloc(self, request_context):
2446 s3_accesspoint = request_context['s3_accesspoint']
2447 region_name = 's3-global'
2448 mrap_netloc_components = [s3_accesspoint['name']]
2449 if self._endpoint_url:
2450 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2451 mrap_netloc_components.append(endpoint_url_netloc)
2452 else:
2453 partition = s3_accesspoint['partition']
2454 mrap_netloc_components.extend(
2455 [
2456 'accesspoint',
2457 region_name,
2458 self._get_partition_dns_suffix(partition),
2459 ]
2460 )
2461 return '.'.join(mrap_netloc_components)
2463 def _get_accesspoint_netloc(self, request_context, region_name):
2464 s3_accesspoint = request_context['s3_accesspoint']
2465 accesspoint_netloc_components = [
2466 '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']),
2467 ]
2468 outpost_name = s3_accesspoint.get('outpost_name')
2469 if self._endpoint_url:
2470 if outpost_name:
2471 accesspoint_netloc_components.append(outpost_name)
2472 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2473 accesspoint_netloc_components.append(endpoint_url_netloc)
2474 else:
2475 if outpost_name:
2476 outpost_host = [outpost_name, 's3-outposts']
2477 accesspoint_netloc_components.extend(outpost_host)
2478 elif s3_accesspoint['service'] == 's3-object-lambda':
2479 component = self._inject_fips_if_needed(
2480 's3-object-lambda', request_context
2481 )
2482 accesspoint_netloc_components.append(component)
2483 else:
2484 component = self._inject_fips_if_needed(
2485 's3-accesspoint', request_context
2486 )
2487 accesspoint_netloc_components.append(component)
2488 if self._s3_config.get('use_dualstack_endpoint'):
2489 accesspoint_netloc_components.append('dualstack')
2490 accesspoint_netloc_components.extend(
2491 [region_name, self._get_dns_suffix(region_name)]
2492 )
2493 return '.'.join(accesspoint_netloc_components)
2495 def _inject_fips_if_needed(self, component, request_context):
2496 if self._use_fips_endpoint:
2497 return f'{component}-fips'
2498 return component
2500 def _get_accesspoint_path(self, original_path, request_context):
2501 # The Bucket parameter was substituted with the access-point name as
2502 # some value was required in serializing the bucket name. Now that
2503 # we are making the request directly to the access point, we will
2504 # want to remove that access-point name from the path.
2505 name = request_context['s3_accesspoint']['name']
2506 # All S3 operations require at least a / in their path.
2507 return original_path.replace('/' + name, '', 1) or '/'
2509 def _get_partition_dns_suffix(self, partition_name):
2510 dns_suffix = self._endpoint_resolver.get_partition_dns_suffix(
2511 partition_name
2512 )
2513 if dns_suffix is None:
2514 dns_suffix = self._DEFAULT_DNS_SUFFIX
2515 return dns_suffix
2517 def _get_dns_suffix(self, region_name):
2518 resolved = self._endpoint_resolver.construct_endpoint(
2519 's3', region_name
2520 )
2521 dns_suffix = self._DEFAULT_DNS_SUFFIX
2522 if resolved and 'dnsSuffix' in resolved:
2523 dns_suffix = resolved['dnsSuffix']
2524 return dns_suffix
2526 def _override_signing_region(self, request, region_name):
2527 signing_context = request.context.get('signing', {})
2528 # S3SigV4Auth will use the context['signing']['region'] value to
2529 # sign with if present. This is used by the Bucket redirector
2530 # as well but we should be fine because the redirector is never
2531 # used in combination with the accesspoint setting logic.
2532 signing_context['region'] = region_name
2533 request.context['signing'] = signing_context
2535 def _override_signing_name(self, context, signing_name):
2536 signing_context = context.get('signing', {})
2537 # S3SigV4Auth will use the context['signing']['signing_name'] value to
2538 # sign with if present. This is used by the Bucket redirector
2539 # as well but we should be fine because the redirector is never
2540 # used in combination with the accesspoint setting logic.
2541 signing_context['signing_name'] = signing_name
2542 context['signing'] = signing_context
2544 @CachedProperty
2545 def _use_accelerate_endpoint(self):
2546 # Enable accelerate if the configuration is set to true or the
2547 # endpoint being used matches one of the accelerate endpoints.
2549 # Accelerate has been explicitly configured.
2550 if self._s3_config.get('use_accelerate_endpoint'):
2551 return True
2553 # Accelerate mode is turned on automatically if an endpoint url is
2554 # provided that matches the accelerate scheme.
2555 if self._endpoint_url is None:
2556 return False
2558 # Accelerate is only valid for Amazon endpoints.
2559 netloc = urlsplit(self._endpoint_url).netloc
2560 if not netloc.endswith('amazonaws.com'):
2561 return False
2563 # The first part of the url should always be s3-accelerate.
2564 parts = netloc.split('.')
2565 if parts[0] != 's3-accelerate':
2566 return False
2568 # Url parts between 's3-accelerate' and 'amazonaws.com' which
2569 # represent different url features.
2570 feature_parts = parts[1:-2]
2572 # There should be no duplicate url parts.
2573 if len(feature_parts) != len(set(feature_parts)):
2574 return False
2576 # Remaining parts must all be in the whitelist.
2577 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts)
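# Illustrative endpoint URLs (assumed examples) and whether they enable
# accelerate when 'use_accelerate_endpoint' is not explicitly configured:
#     https://s3-accelerate.amazonaws.com            -> True
#     https://s3-accelerate.dualstack.amazonaws.com  -> True ('dualstack' is whitelisted)
#     https://s3.us-west-2.amazonaws.com             -> False (host is not s3-accelerate)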
2579 @CachedProperty
2580 def _addressing_style(self):
2581 # Use virtual host style addressing if accelerate is enabled or if
2582 # the given endpoint url is an accelerate endpoint.
2583 if self._use_accelerate_endpoint:
2584 return 'virtual'
2586 # If a particular addressing style is configured, use it.
2587 configured_addressing_style = self._s3_config.get('addressing_style')
2588 if configured_addressing_style:
2589 return configured_addressing_style
2591 @CachedProperty
2592 def _s3_addressing_handler(self):
2593 # If virtual host style was configured, use it regardless of whether
2594 # or not the bucket looks dns compatible.
2595 if self._addressing_style == 'virtual':
2596 logger.debug("Using S3 virtual host style addressing.")
2597 return switch_to_virtual_host_style
2599 # If path style is configured, no additional steps are needed. If
2600 # endpoint_url was specified, don't default to virtual. We could
2601 # potentially default provided endpoint urls to virtual hosted
2602 # style, but for now it is avoided.
2603 if self._addressing_style == 'path' or self._endpoint_url is not None:
2604 logger.debug("Using S3 path style addressing.")
2605 return None
2607 logger.debug(
2608 "Defaulting to S3 virtual host style addressing with "
2609 "path style addressing fallback."
2610 )
2612 # By default, try to use virtual style with path fallback.
2613 return fix_s3_host
2616class S3ControlEndpointSetter:
2617 _DEFAULT_PARTITION = 'aws'
2618 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
2619 _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$')
2621 def __init__(
2622 self,
2623 endpoint_resolver,
2624 region=None,
2625 s3_config=None,
2626 endpoint_url=None,
2627 partition=None,
2628 use_fips_endpoint=False,
2629 ):
2630 self._endpoint_resolver = endpoint_resolver
2631 self._region = region
2632 self._s3_config = s3_config
2633 self._use_fips_endpoint = use_fips_endpoint
2634 if s3_config is None:
2635 self._s3_config = {}
2636 self._endpoint_url = endpoint_url
2637 self._partition = partition
2638 if partition is None:
2639 self._partition = self._DEFAULT_PARTITION
2641 def register(self, event_emitter):
2642 event_emitter.register('before-sign.s3-control', self.set_endpoint)
2644 def set_endpoint(self, request, **kwargs):
2645 if self._use_endpoint_from_arn_details(request):
2646 self._validate_endpoint_from_arn_details_supported(request)
2647 region_name = self._resolve_region_from_arn_details(request)
2648 self._resolve_signing_name_from_arn_details(request)
2649 self._resolve_endpoint_from_arn_details(request, region_name)
2650 self._add_headers_from_arn_details(request)
2651 elif self._use_endpoint_from_outpost_id(request):
2652 self._validate_outpost_redirection_valid(request)
2653 self._override_signing_name(request, 's3-outposts')
2654 new_netloc = self._construct_outpost_endpoint(self._region)
2655 self._update_request_netloc(request, new_netloc)
2657 def _use_endpoint_from_arn_details(self, request):
2658 return 'arn_details' in request.context
2660 def _use_endpoint_from_outpost_id(self, request):
2661 return 'outpost_id' in request.context
2663 def _validate_endpoint_from_arn_details_supported(self, request):
2664 if 'fips' in request.context['arn_details']['region']:
2665 raise UnsupportedS3ControlArnError(
2666 arn=request.context['arn_details']['original'],
2667 msg='Invalid ARN, FIPS region not allowed in ARN.',
2668 )
2669 if not self._s3_config.get('use_arn_region', False):
2670 arn_region = request.context['arn_details']['region']
2671 if arn_region != self._region:
2672 error_msg = (
2673 'The use_arn_region configuration is disabled but '
2674 f'received arn for "{arn_region}" when the client is configured '
2675 f'to use "{self._region}"'
2676 )
2677 raise UnsupportedS3ControlConfigurationError(msg=error_msg)
2678 request_partion = request.context['arn_details']['partition']
2679 if request_partion != self._partition:
2680 raise UnsupportedS3ControlConfigurationError(
2681 msg=(
2682 f'Client is configured for "{self._partition}" partition, but arn '
2683 f'provided is for "{request_partion}" partition. The client and '
2684 'arn partition must be the same.'
2685 )
2686 )
2687 if self._s3_config.get('use_accelerate_endpoint'):
2688 raise UnsupportedS3ControlConfigurationError(
2689 msg='S3 control client does not support accelerate endpoints',
2690 )
2691 if 'outpost_name' in request.context['arn_details']:
2692 self._validate_outpost_redirection_valid(request)
2694 def _validate_outpost_redirection_valid(self, request):
2695 if self._s3_config.get('use_dualstack_endpoint'):
2696 raise UnsupportedS3ControlConfigurationError(
2697 msg=(
2698 'Client does not support s3 dualstack configuration '
2699 'when an outpost is specified.'
2700 )
2701 )
2703 def _resolve_region_from_arn_details(self, request):
2704 if self._s3_config.get('use_arn_region', False):
2705 arn_region = request.context['arn_details']['region']
2706 # If we are using the region from the expanded arn, we will also
2707 # want to make sure that we set it as the signing region as well
2708 self._override_signing_region(request, arn_region)
2709 return arn_region
2710 return self._region
2712 def _resolve_signing_name_from_arn_details(self, request):
2713 arn_service = request.context['arn_details']['service']
2714 self._override_signing_name(request, arn_service)
2715 return arn_service
2717 def _resolve_endpoint_from_arn_details(self, request, region_name):
2718 new_netloc = self._resolve_netloc_from_arn_details(
2719 request, region_name
2720 )
2721 self._update_request_netloc(request, new_netloc)
2723 def _update_request_netloc(self, request, new_netloc):
2724 original_components = urlsplit(request.url)
2725 arn_details_endpoint = urlunsplit(
2726 (
2727 original_components.scheme,
2728 new_netloc,
2729 original_components.path,
2730 original_components.query,
2731 '',
2732 )
2733 )
2734 logger.debug(
2735 f'Updating URI from {request.url} to {arn_details_endpoint}'
2736 )
2737 request.url = arn_details_endpoint
2739 def _resolve_netloc_from_arn_details(self, request, region_name):
2740 arn_details = request.context['arn_details']
2741 if 'outpost_name' in arn_details:
2742 return self._construct_outpost_endpoint(region_name)
2743 account = arn_details['account']
2744 return self._construct_s3_control_endpoint(region_name, account)
2746 def _is_valid_host_label(self, label):
2747 return self._HOST_LABEL_REGEX.match(label)
2749 def _validate_host_labels(self, *labels):
2750 for label in labels:
2751 if not self._is_valid_host_label(label):
2752 raise InvalidHostLabelError(label=label)
2754 def _construct_s3_control_endpoint(self, region_name, account):
2755 self._validate_host_labels(region_name, account)
2756 if self._endpoint_url:
2757 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2758 netloc = [account, endpoint_url_netloc]
2759 else:
2760 netloc = [
2761 account,
2762 's3-control',
2763 ]
2764 self._add_dualstack(netloc)
2765 dns_suffix = self._get_dns_suffix(region_name)
2766 netloc.extend([region_name, dns_suffix])
2767 return self._construct_netloc(netloc)
2769 def _construct_outpost_endpoint(self, region_name):
2770 self._validate_host_labels(region_name)
2771 if self._endpoint_url:
2772 return urlsplit(self._endpoint_url).netloc
2773 else:
2774 netloc = [
2775 's3-outposts',
2776 region_name,
2777 self._get_dns_suffix(region_name),
2778 ]
2779 self._add_fips(netloc)
2780 return self._construct_netloc(netloc)
2782 def _construct_netloc(self, netloc):
2783 return '.'.join(netloc)
2785 def _add_fips(self, netloc):
2786 if self._use_fips_endpoint:
2787 netloc[0] = netloc[0] + '-fips'
2789 def _add_dualstack(self, netloc):
2790 if self._s3_config.get('use_dualstack_endpoint'):
2791 netloc.append('dualstack')
2793 def _get_dns_suffix(self, region_name):
2794 resolved = self._endpoint_resolver.construct_endpoint(
2795 's3', region_name
2796 )
2797 dns_suffix = self._DEFAULT_DNS_SUFFIX
2798 if resolved and 'dnsSuffix' in resolved:
2799 dns_suffix = resolved['dnsSuffix']
2800 return dns_suffix
2802 def _override_signing_region(self, request, region_name):
2803 signing_context = request.context.get('signing', {})
2804 # S3SigV4Auth will use the context['signing']['region'] value to
2805 # sign with if present. This is used by the Bucket redirector
2806 # as well but we should be fine because the redirector is never
2807 # used in combination with the accesspoint setting logic.
2808 signing_context['region'] = region_name
2809 request.context['signing'] = signing_context
2811 def _override_signing_name(self, request, signing_name):
2812 signing_context = request.context.get('signing', {})
2813 # S3SigV4Auth will use the context['signing']['signing_name'] value to
2814 # sign with if present. This is used by the Bucket redirector
2815 # as well but we should be fine because the redirector is never
2816 # used in combination with the accesspoint setting logic.
2817 signing_context['signing_name'] = signing_name
2818 request.context['signing'] = signing_context
2820 def _add_headers_from_arn_details(self, request):
2821 arn_details = request.context['arn_details']
2822 outpost_name = arn_details.get('outpost_name')
2823 if outpost_name:
2824 self._add_outpost_id_header(request, outpost_name)
2826 def _add_outpost_id_header(self, request, outpost_name):
2827 request.headers['x-amz-outpost-id'] = outpost_name
2830class S3ControlArnParamHandler:
2831 """This handler has been replaced by S3ControlArnParamHandlerv2. The
2832 original version remains in place for any third-party importers.
2833 """
2835 _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]')
2837 def __init__(self, arn_parser=None):
2838 self._arn_parser = arn_parser
2839 if arn_parser is None:
2840 self._arn_parser = ArnParser()
2841 warnings.warn(
2842 'The S3ControlArnParamHandler class has been deprecated for a new '
2843 'internal replacement. A future version of botocore may remove '
2844 'this class.',
2845 category=FutureWarning,
2846 )
2848 def register(self, event_emitter):
2849 event_emitter.register(
2850 'before-parameter-build.s3-control',
2851 self.handle_arn,
2852 )
2854 def handle_arn(self, params, model, context, **kwargs):
2855 if model.name in ('CreateBucket', 'ListRegionalBuckets'):
2856 # CreateBucket and ListRegionalBuckets are special cases that do
2857 # not obey ARN based redirection but will redirect based off of the
2858 # presence of the OutpostId parameter
2859 self._handle_outpost_id_param(params, model, context)
2860 else:
2861 self._handle_name_param(params, model, context)
2862 self._handle_bucket_param(params, model, context)
2864 def _get_arn_details_from_param(self, params, param_name):
2865 if param_name not in params:
2866 return None
2867 try:
2868 arn = params[param_name]
2869 arn_details = self._arn_parser.parse_arn(arn)
2870 arn_details['original'] = arn
2871 arn_details['resources'] = self._split_resource(arn_details)
2872 return arn_details
2873 except InvalidArnException:
2874 return None
2876 def _split_resource(self, arn_details):
2877 return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource'])
2879 def _override_account_id_param(self, params, arn_details):
2880 account_id = arn_details['account']
2881 if 'AccountId' in params and params['AccountId'] != account_id:
2882 error_msg = (
2883 'Account ID in arn does not match the AccountId parameter '
2884 'provided: "{}"'
2885 ).format(params['AccountId'])
2886 raise UnsupportedS3ControlArnError(
2887 arn=arn_details['original'],
2888 msg=error_msg,
2889 )
2890 params['AccountId'] = account_id
2892 def _handle_outpost_id_param(self, params, model, context):
2893 if 'OutpostId' not in params:
2894 return
2895 context['outpost_id'] = params['OutpostId']
2897 def _handle_name_param(self, params, model, context):
2898 # CreateAccessPoint is a special case that does not expand Name
2899 if model.name == 'CreateAccessPoint':
2900 return
2901 arn_details = self._get_arn_details_from_param(params, 'Name')
2902 if arn_details is None:
2903 return
2904 if self._is_outpost_accesspoint(arn_details):
2905 self._store_outpost_accesspoint(params, context, arn_details)
2906 else:
2907 error_msg = 'The Name parameter does not support the provided ARN'
2908 raise UnsupportedS3ControlArnError(
2909 arn=arn_details['original'],
2910 msg=error_msg,
2911 )
2913 def _is_outpost_accesspoint(self, arn_details):
2914 if arn_details['service'] != 's3-outposts':
2915 return False
2916 resources = arn_details['resources']
2917 if len(resources) != 4:
2918 return False
2919 # Resource must be of the form outpost/op-123/accesspoint/name
2920 return resources[0] == 'outpost' and resources[2] == 'accesspoint'
2922 def _store_outpost_accesspoint(self, params, context, arn_details):
2923 self._override_account_id_param(params, arn_details)
2924 accesspoint_name = arn_details['resources'][3]
2925 params['Name'] = accesspoint_name
2926 arn_details['accesspoint_name'] = accesspoint_name
2927 arn_details['outpost_name'] = arn_details['resources'][1]
2928 context['arn_details'] = arn_details
2930 def _handle_bucket_param(self, params, model, context):
2931 arn_details = self._get_arn_details_from_param(params, 'Bucket')
2932 if arn_details is None:
2933 return
2934 if self._is_outpost_bucket(arn_details):
2935 self._store_outpost_bucket(params, context, arn_details)
2936 else:
2937 error_msg = (
2938 'The Bucket parameter does not support the provided ARN'
2939 )
2940 raise UnsupportedS3ControlArnError(
2941 arn=arn_details['original'],
2942 msg=error_msg,
2943 )
2945 def _is_outpost_bucket(self, arn_details):
2946 if arn_details['service'] != 's3-outposts':
2947 return False
2948 resources = arn_details['resources']
2949 if len(resources) != 4:
2950 return False
2951 # Resource must be of the form outpost/op-123/bucket/name
2952 return resources[0] == 'outpost' and resources[2] == 'bucket'
2954 def _store_outpost_bucket(self, params, context, arn_details):
2955 self._override_account_id_param(params, arn_details)
2956 bucket_name = arn_details['resources'][3]
2957 params['Bucket'] = bucket_name
2958 arn_details['bucket_name'] = bucket_name
2959 arn_details['outpost_name'] = arn_details['resources'][1]
2960 context['arn_details'] = arn_details
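# Example (hypothetical outpost bucket ARN): for
#     arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-0123/bucket/my-bucket
# the resource splits into ['outpost', 'op-0123', 'bucket', 'my-bucket'], so
# params['Bucket'] becomes 'my-bucket', params['AccountId'] is set to
# '123456789012', and 'op-0123' is recorded as the outpost name in
# context['arn_details'].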
2963class S3ControlArnParamHandlerv2(S3ControlArnParamHandler):
2964 """Updated version of S3ControlArnParamHandler for use when
2965 EndpointRulesetResolver is in use for endpoint resolution.
2967 This class is considered private and subject to abrupt breaking changes or
2968 removal without prior announcement. Please do not use it directly.
2969 """
2971 def __init__(self, arn_parser=None):
2972 self._arn_parser = arn_parser
2973 if arn_parser is None:
2974 self._arn_parser = ArnParser()
2976 def register(self, event_emitter):
2977 event_emitter.register(
2978 'before-endpoint-resolution.s3-control',
2979 self.handle_arn,
2980 )
2982 def _handle_name_param(self, params, model, context):
2983 # CreateAccessPoint is a special case that does not expand Name
2984 if model.name == 'CreateAccessPoint':
2985 return
2986 arn_details = self._get_arn_details_from_param(params, 'Name')
2987 if arn_details is None:
2988 return
2989 self._raise_for_fips_pseudo_region(arn_details)
2990 self._raise_for_accelerate_endpoint(context)
2991 if self._is_outpost_accesspoint(arn_details):
2992 self._store_outpost_accesspoint(params, context, arn_details)
2993 else:
2994 error_msg = 'The Name parameter does not support the provided ARN'
2995 raise UnsupportedS3ControlArnError(
2996 arn=arn_details['original'],
2997 msg=error_msg,
2998 )
3000 def _store_outpost_accesspoint(self, params, context, arn_details):
3001 self._override_account_id_param(params, arn_details)
3003 def _handle_bucket_param(self, params, model, context):
3004 arn_details = self._get_arn_details_from_param(params, 'Bucket')
3005 if arn_details is None:
3006 return
3007 self._raise_for_fips_pseudo_region(arn_details)
3008 self._raise_for_accelerate_endpoint(context)
3009 if self._is_outpost_bucket(arn_details):
3010 self._store_outpost_bucket(params, context, arn_details)
3011 else:
3012 error_msg = (
3013 'The Bucket parameter does not support the provided ARN'
3014 )
3015 raise UnsupportedS3ControlArnError(
3016 arn=arn_details['original'],
3017 msg=error_msg,
3018 )
3020 def _store_outpost_bucket(self, params, context, arn_details):
3021 self._override_account_id_param(params, arn_details)
3023 def _raise_for_fips_pseudo_region(self, arn_details):
3024 # FIPS pseudo region names cannot be used in ARNs
3025 arn_region = arn_details['region']
3026 if arn_region.startswith('fips-') or arn_region.endswith('-fips'):
3027 raise UnsupportedS3ControlArnError(
3028 arn=arn_details['original'],
3029 msg='Invalid ARN, FIPS region not allowed in ARN.',
3030 )
3032 def _raise_for_accelerate_endpoint(self, context):
3033 s3_config = context['client_config'].s3 or {}
3034 if s3_config.get('use_accelerate_endpoint'):
3035 raise UnsupportedS3ControlConfigurationError(
3036 msg='S3 control client does not support accelerate endpoints',
3037 )
3040class ContainerMetadataFetcher:
3041 TIMEOUT_SECONDS = 2
3042 RETRY_ATTEMPTS = 3
3043 SLEEP_TIME = 1
3044 IP_ADDRESS = '169.254.170.2'
3045 _ALLOWED_HOSTS = [
3046 IP_ADDRESS,
3047 '169.254.170.23',
3048 'fd00:ec2::23',
3049 'localhost',
3050 ]
3052 def __init__(self, session=None, sleep=time.sleep):
3053 if session is None:
3054 session = botocore.httpsession.URLLib3Session(
3055 timeout=self.TIMEOUT_SECONDS
3056 )
3057 self._session = session
3058 self._sleep = sleep
3060 def retrieve_full_uri(self, full_url, headers=None):
3061 """Retrieve JSON metadata from container metadata.
3063 :type full_url: str
3064 :param full_url: The full URL of the metadata service.
3065 This should include the scheme as well, e.g
3066 "http://localhost:123/foo"
3068 """
3069 self._validate_allowed_url(full_url)
3070 return self._retrieve_credentials(full_url, headers)
3072 def _validate_allowed_url(self, full_url):
3073 parsed = botocore.compat.urlparse(full_url)
3074 if self._is_loopback_address(parsed.hostname):
3075 return
3076 is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname)
3077 if not is_whitelisted_host:
3078 raise ValueError(
3079 f"Unsupported host '{parsed.hostname}'. Can only retrieve metadata "
3080 f"from a loopback address or one of these hosts: {', '.join(self._ALLOWED_HOSTS)}"
3081 )
3083 def _is_loopback_address(self, hostname):
3084 try:
3085 ip = ip_address(hostname)
3086 return ip.is_loopback
3087 except ValueError:
3088 return False
3090 def _check_if_whitelisted_host(self, host):
3091 if host in self._ALLOWED_HOSTS:
3092 return True
3093 return False
3095 def retrieve_uri(self, relative_uri):
3096 """Retrieve JSON metadata from container metadata.
3098 :type relative_uri: str
3099 :param relative_uri: A relative URI, e.g "/foo/bar?id=123"
3101 :return: The parsed JSON response.
3103 """
3104 full_url = self.full_url(relative_uri)
3105 return self._retrieve_credentials(full_url)
3107 def _retrieve_credentials(self, full_url, extra_headers=None):
3108 headers = {'Accept': 'application/json'}
3109 if extra_headers is not None:
3110 headers.update(extra_headers)
3111 attempts = 0
3112 while True:
3113 try:
3114 return self._get_response(
3115 full_url, headers, self.TIMEOUT_SECONDS
3116 )
3117 except MetadataRetrievalError as e:
3118 logger.debug(
3119 "Received error when attempting to retrieve "
3120 "container metadata: %s",
3121 e,
3122 exc_info=True,
3123 )
3124 self._sleep(self.SLEEP_TIME)
3125 attempts += 1
3126 if attempts >= self.RETRY_ATTEMPTS:
3127 raise
3129 def _get_response(self, full_url, headers, timeout):
3130 try:
3131 AWSRequest = botocore.awsrequest.AWSRequest
3132 request = AWSRequest(method='GET', url=full_url, headers=headers)
3133 response = self._session.send(request.prepare())
3134 response_text = response.content.decode('utf-8')
3135 if response.status_code != 200:
3136 raise MetadataRetrievalError(
3137 error_msg=(
3138 f"Received non 200 response {response.status_code} "
3139 f"from container metadata: {response_text}"
3140 )
3141 )
3142 try:
3143 return json.loads(response_text)
3144 except ValueError:
3145 error_msg = "Unable to parse JSON returned from container metadata services"
3146 logger.debug('%s:%s', error_msg, response_text)
3147 raise MetadataRetrievalError(error_msg=error_msg)
3148 except RETRYABLE_HTTP_ERRORS as e:
3149 error_msg = (
3150 "Received error when attempting to retrieve "
3151 f"container metadata: {e}"
3152 )
3153 raise MetadataRetrievalError(error_msg=error_msg)
3155 def full_url(self, relative_uri):
3156 return f'http://{self.IP_ADDRESS}{relative_uri}'
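# Usage sketch (illustrative relative URI): retrieve_uri('/v2/credentials/xyz')
# fetches 'http://169.254.170.2/v2/credentials/xyz' with a 2 second timeout,
# making up to RETRY_ATTEMPTS attempts with SLEEP_TIME seconds between them,
# and returns the parsed JSON body.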
3159def get_environ_proxies(url):
3160 if should_bypass_proxies(url):
3161 return {}
3162 else:
3163 return getproxies()
3166def should_bypass_proxies(url):
3167 """
3168 Returns whether we should bypass proxies or not.
3169 """
3170 # NOTE: requests allows IP/CIDR entries in the no_proxy env var, which we
3171 # don't currently support, as urllib only checks the DNS suffix
3172 # If the system proxy settings indicate that this URL should be bypassed,
3173 # don't proxy.
3174 # The proxy_bypass function is incredibly buggy on OS X in early versions
3175 # of Python 2.6, so allow this call to fail. Only catch the specific
3176 # exceptions we've seen, though: this call failing in other ways can reveal
3177 # legitimate problems.
3178 try:
3179 if proxy_bypass(urlparse(url).netloc):
3180 return True
3181 except (TypeError, socket.gaierror):
3182 pass
3184 return False
3187def determine_content_length(body):
3188 # No body, content length of 0
3189 if not body:
3190 return 0
3192 # Try asking the body for its length
3193 try:
3194 return len(body)
3195 except (AttributeError, TypeError):
3196 pass
3198 # Try getting the length from a seekable stream
3199 if hasattr(body, 'seek') and hasattr(body, 'tell'):
3200 try:
3201 orig_pos = body.tell()
3202 body.seek(0, 2)
3203 end_file_pos = body.tell()
3204 body.seek(orig_pos)
3205 return end_file_pos - orig_pos
3206 except io.UnsupportedOperation:
3207 # in case when body is, for example, io.BufferedIOBase object
3208 # it has "seek" method which throws "UnsupportedOperation"
3209 # exception in such case we want to fall back to "chunked"
3210 # encoding
3211 pass
3212 # Failed to determine the length
3213 return None
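# Example (illustrative): bytes bodies report len(); for an open binary file
# positioned at offset 10 of a 110-byte file, len() raises TypeError, so the
# seek/tell fallback reports 110 - 10 == 100 remaining bytes; a non-seekable
# stream returns None, signalling that chunked transfer encoding is needed.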
3216def get_encoding_from_headers(headers, default='ISO-8859-1'):
3217 """Returns encodings from given HTTP Header Dict.
3219 :param headers: dictionary to extract encoding from.
3220 :param default: default encoding if the content-type is text
3221 """
3223 content_type = headers.get('content-type')
3225 if not content_type:
3226 return None
3228 message = email.message.Message()
3229 message['content-type'] = content_type
3230 charset = message.get_param("charset")
3232 if charset is not None:
3233 return charset
3235 if 'text' in content_type:
3236 return default
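# Example (illustrative header values):
#     {'content-type': 'text/html; charset=utf-8'} -> 'utf-8'
#     {'content-type': 'text/html'}                 -> 'ISO-8859-1' (default)
#     {'content-type': 'application/json'}          -> None (no charset, not text)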
3239def calculate_md5(body, **kwargs):
3240 """This function has been deprecated, but is kept for backwards compatibility."""
3241 if isinstance(body, (bytes, bytearray)):
3242 binary_md5 = _calculate_md5_from_bytes(body)
3243 else:
3244 binary_md5 = _calculate_md5_from_file(body)
3245 return base64.b64encode(binary_md5).decode('ascii')
3248def _calculate_md5_from_bytes(body_bytes):
3249 """This function has been deprecated, but is kept for backwards compatibility."""
3250 md5 = get_md5(body_bytes, usedforsecurity=False)
3251 return md5.digest()
3254def _calculate_md5_from_file(fileobj):
3255 """This function has been deprecated, but is kept for backwards compatibility."""
3256 start_position = fileobj.tell()
3257 md5 = get_md5(usedforsecurity=False)
3258 for chunk in iter(lambda: fileobj.read(1024 * 1024), b''):
3259 md5.update(chunk)
3260 fileobj.seek(start_position)
3261 return md5.digest()
3264def _is_s3express_request(params):
3265 endpoint_properties = params.get('context', {}).get(
3266 'endpoint_properties', {}
3267 )
3268 return endpoint_properties.get('backend') == 'S3Express'
3271def has_checksum_header(params):
3272 """
3273 Checks if a header starting with "x-amz-checksum-" is provided in a request.
3275 This function is considered private and subject to abrupt breaking changes or
3276 removal without prior announcement. Please do not use it directly.
3277 """
3278 headers = params['headers']
3280 # If a header matching the x-amz-checksum-* pattern is present, we
3281 # assume a checksum has already been provided by the user.
3282 for header in headers:
3283 if CHECKSUM_HEADER_PATTERN.match(header):
3284 return True
3286 return False
3289def conditionally_calculate_checksum(params, **kwargs):
3290 """This function has been deprecated, but is kept for backwards compatibility."""
3291 if not has_checksum_header(params):
3292 conditionally_calculate_md5(params, **kwargs)
3293 conditionally_enable_crc32(params, **kwargs)
3296def conditionally_enable_crc32(params, **kwargs):
3297 """This function has been deprecated, but is kept for backwards compatibility."""
3298 checksum_context = params.get('context', {}).get('checksum', {})
3299 checksum_algorithm = checksum_context.get('request_algorithm')
3300 if (
3301 _is_s3express_request(params)
3302 and params['body'] is not None
3303 and checksum_algorithm in (None, "conditional-md5")
3304 ):
3305 params['context']['checksum'] = {
3306 'request_algorithm': {
3307 'algorithm': 'crc32',
3308 'in': 'header',
3309 'name': 'x-amz-checksum-crc32',
3310 }
3311 }
3314def conditionally_calculate_md5(params, **kwargs):
3315 """Only add a Content-MD5 if the system supports it.
3317 This function has been deprecated, but is kept for backwards compatibility.
3318 """
3319 body = params['body']
3320 checksum_context = params.get('context', {}).get('checksum', {})
3321 checksum_algorithm = checksum_context.get('request_algorithm')
3322 if checksum_algorithm and checksum_algorithm != 'conditional-md5':
3323 # Skip for requests that will have a flexible checksum applied
3324 return
3326 if has_checksum_header(params):
3327 # Don't add a new header if one is already available.
3328 return
3330 if _is_s3express_request(params):
3331 # S3Express doesn't support MD5
3332 return
3334 if MD5_AVAILABLE and body is not None:
3335 md5_digest = calculate_md5(body, **kwargs)
3336 params['headers']['Content-MD5'] = md5_digest
3339class FileWebIdentityTokenLoader:
3340 def __init__(self, web_identity_token_path, _open=open):
3341 self._web_identity_token_path = web_identity_token_path
3342 self._open = _open
3344 def __call__(self):
3345 with self._open(self._web_identity_token_path) as token_file:
3346 return token_file.read()
3349class SSOTokenLoader:
3350 def __init__(self, cache=None):
3351 if cache is None:
3352 cache = {}
3353 self._cache = cache
3355 def _generate_cache_key(self, start_url, session_name):
3356 input_str = start_url
3357 if session_name is not None:
3358 input_str = session_name
3359 return hashlib.sha1(input_str.encode('utf-8')).hexdigest()
3361 def save_token(self, start_url, token, session_name=None):
3362 cache_key = self._generate_cache_key(start_url, session_name)
3363 self._cache[cache_key] = token
3365 def __call__(self, start_url, session_name=None):
3366 cache_key = self._generate_cache_key(start_url, session_name)
3367 logger.debug(f'Checking for cached token at: {cache_key}')
3368 if cache_key not in self._cache:
3369 name = start_url
3370 if session_name is not None:
3371 name = session_name
3372 error_msg = f'Token for {name} does not exist'
3373 raise SSOTokenLoadError(error_msg=error_msg)
3375 token = self._cache[cache_key]
3376 if 'accessToken' not in token or 'expiresAt' not in token:
3377 error_msg = f'Token for {start_url} is invalid'
3378 raise SSOTokenLoadError(error_msg=error_msg)
3379 return token
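# --- Illustrative sketch (editor's addition, not part of botocore/utils.py) ---
# Tokens are cached under a SHA-1 key derived from the sso_session name when
# one is given, otherwise from the start URL. All values below are made up.
loader = SSOTokenLoader()
loader.save_token(
    'https://example.awsapps.com/start',
    {'accessToken': 'abc123', 'expiresAt': '2030-01-01T00:00:00Z'},
    session_name='my-sso-session',
)
token = loader('https://example.awsapps.com/start', session_name='my-sso-session')
assert token['accessToken'] == 'abc123'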
3382class EventbridgeSignerSetter:
3383 _DEFAULT_PARTITION = 'aws'
3384 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
3386 def __init__(self, endpoint_resolver, region=None, endpoint_url=None):
3387 self._endpoint_resolver = endpoint_resolver
3388 self._region = region
3389 self._endpoint_url = endpoint_url
3391 def register(self, event_emitter):
3392 event_emitter.register(
3393 'before-parameter-build.events.PutEvents',
3394 self.check_for_global_endpoint,
3395 )
3396 event_emitter.register(
3397 'before-call.events.PutEvents', self.set_endpoint_url
3398 )
3400 def set_endpoint_url(self, params, context, **kwargs):
3401 if 'eventbridge_endpoint' in context:
3402 endpoint = context['eventbridge_endpoint']
3403 logger.debug(f"Rewriting URL from {params['url']} to {endpoint}")
3404 params['url'] = endpoint
3406 def check_for_global_endpoint(self, params, context, **kwargs):
3407 endpoint = params.get('EndpointId')
3408 if endpoint is None:
3409 return
3411 if len(endpoint) == 0:
3412 raise InvalidEndpointConfigurationError(
3413 msg='EndpointId must not be a zero length string'
3414 )
3416 if not HAS_CRT:
3417 raise MissingDependencyException(
3418 msg="Using EndpointId requires an additional "
3419 "dependency. You will need to pip install "
3420 "botocore[crt] before proceeding."
3421 )
3423 config = context.get('client_config')
3424 endpoint_variant_tags = None
3425 if config is not None:
3426 if config.use_fips_endpoint:
3427 raise InvalidEndpointConfigurationError(
3428 msg="FIPS is not supported with EventBridge "
3429 "multi-region endpoints."
3430 )
3431 if config.use_dualstack_endpoint:
3432 endpoint_variant_tags = ['dualstack']
3434 if self._endpoint_url is None:
3435 # Validate endpoint is a valid hostname component
3436 parts = urlparse(f'https://{endpoint}')
3437 if parts.hostname != endpoint:
3438 raise InvalidEndpointConfigurationError(
3439 msg='EndpointId is not a valid hostname component.'
3440 )
3441 resolved_endpoint = self._get_global_endpoint(
3442 endpoint, endpoint_variant_tags=endpoint_variant_tags
3443 )
3444 else:
3445 resolved_endpoint = self._endpoint_url
3447 context['eventbridge_endpoint'] = resolved_endpoint
3448 context['auth_type'] = 'v4a'
3450 def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None):
3451 resolver = self._endpoint_resolver
3453 partition = resolver.get_partition_for_region(self._region)
3454 if partition is None:
3455 partition = self._DEFAULT_PARTITION
3456 dns_suffix = resolver.get_partition_dns_suffix(
3457 partition, endpoint_variant_tags=endpoint_variant_tags
3458 )
3459 if dns_suffix is None:
3460 dns_suffix = self._DEFAULT_DNS_SUFFIX
3462 return f"https://{endpoint}.endpoint.events.{dns_suffix}/"
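# --- Illustrative sketch (editor's addition, not part of botocore/utils.py) ---
# Shape of the resolved EventBridge multi-region endpoint, reproduced with
# plain string formatting; the EndpointId value is made up. Requests routed
# this way are signed with SigV4a ('auth_type' == 'v4a').
endpoint_id = 'abcde.veo'
print(f'https://{endpoint_id}.endpoint.events.amazonaws.com/')
# -> https://abcde.veo.endpoint.events.amazonaws.com/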
3465def is_s3_accelerate_url(url):
3466 """Does the URL match the S3 Accelerate endpoint scheme?
3468 Virtual host naming style with bucket names in the netloc part of the URL
3469 is not allowed by this function.
3470 """
3471 if url is None:
3472 return False
3474 # Accelerate is only valid for Amazon endpoints.
3475 url_parts = urlsplit(url)
3476 if not url_parts.netloc.endswith(
3477 'amazonaws.com'
3478 ) or url_parts.scheme not in ['https', 'http']:
3479 return False
3481 # The first part of the URL must be s3-accelerate.
3482 parts = url_parts.netloc.split('.')
3483 if parts[0] != 's3-accelerate':
3484 return False
3486 # URL parts between 's3-accelerate' and 'amazonaws.com' represent
3487 # different URL features.
3488 feature_parts = parts[1:-2]
3490 # There should be no duplicate URL parts.
3491 if len(feature_parts) != len(set(feature_parts)):
3492 return False
3494 # Remaining parts must all be in the whitelist.
3495 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts)
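# --- Illustrative sketch (editor's addition, not part of botocore/utils.py) ---
# A few example inputs and the results the checks above produce.
assert is_s3_accelerate_url('https://s3-accelerate.amazonaws.com') is True
assert is_s3_accelerate_url('https://s3-accelerate.dualstack.amazonaws.com') is True
assert is_s3_accelerate_url('https://bucket.s3-accelerate.amazonaws.com') is False
assert is_s3_accelerate_url('https://s3.amazonaws.com') is False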
3498class JSONFileCache:
3499 """JSON file cache.
3500 This provides a dict-like interface that stores JSON-serializable
3501 objects.
3502 The objects are serialized to JSON and stored in a file. These
3503 values can be retrieved at a later time.
3504 """
3506 CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache'))
3508 def __init__(self, working_dir=CACHE_DIR, dumps_func=None):
3509 self._working_dir = working_dir
3510 if dumps_func is None:
3511 dumps_func = self._default_dumps
3512 self._dumps = dumps_func
3514 def _default_dumps(self, obj):
3515 return json.dumps(obj, default=self._serialize_if_needed)
3517 def __contains__(self, cache_key):
3518 actual_key = self._convert_cache_key(cache_key)
3519 return os.path.isfile(actual_key)
3521 def __getitem__(self, cache_key):
3522 """Retrieve value from a cache key."""
3523 actual_key = self._convert_cache_key(cache_key)
3524 try:
3525 with open(actual_key) as f:
3526 return json.load(f)
3527 except (OSError, ValueError):
3528 raise KeyError(cache_key)
3530 def __delitem__(self, cache_key):
3531 actual_key = self._convert_cache_key(cache_key)
3532 try:
3533 key_path = Path(actual_key)
3534 key_path.unlink()
3535 except FileNotFoundError:
3536 raise KeyError(cache_key)
3538 def __setitem__(self, cache_key, value):
3539 full_key = self._convert_cache_key(cache_key)
3540 try:
3541 file_content = self._dumps(value)
3542 except (TypeError, ValueError):
3543 raise ValueError(
3544 f"Value cannot be cached, must be JSON serializable: {value}"
3545 )
3546 if not os.path.isdir(self._working_dir):
3547 os.makedirs(self._working_dir)
3548 with os.fdopen(
3549 os.open(full_key, os.O_WRONLY | os.O_CREAT, 0o600), 'w'
3550 ) as f:
3551 f.truncate()
3552 f.write(file_content)
3554 def _convert_cache_key(self, cache_key):
3555 full_path = os.path.join(self._working_dir, cache_key + '.json')
3556 return full_path
3558 def _serialize_if_needed(self, value, iso=False):
3559 if isinstance(value, _DatetimeClass):
3560 if iso:
3561 return value.isoformat()
3562 return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
3563 return value
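# --- Illustrative sketch (editor's addition, not part of botocore/utils.py) ---
# The cache behaves like a dict whose entries are stored as individual JSON
# files (created with 0o600 permissions) under working_dir. The directory and
# payload below are made up.
cache = JSONFileCache(working_dir='/tmp/boto-cache-example')
cache['sso-token'] = {'accessToken': 'abc123'}
assert 'sso-token' in cache
assert cache['sso-token'] == {'accessToken': 'abc123'}
del cache['sso-token']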
3566def is_s3express_bucket(bucket):
3567 if bucket is None:
3568 return False
3569 return bucket.endswith('--x-s3')
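# --- Illustrative sketch (editor's addition, not part of botocore/utils.py) ---
# S3 Express bucket names carry the '--x-s3' suffix; the names are made up.
assert is_s3express_bucket('my-bucket--use1-az4--x-s3') is True
assert is_s3express_bucket('my-regular-bucket') is False
assert is_s3express_bucket(None) is False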
3572# This parameter is not part of the public interface and is subject to abrupt
3573# breaking changes or removal without prior announcement.
3574# Mapping of services that have been renamed for backwards compatibility reasons.
3575# Keys are the previous name that should be allowed, values are the documented
3576# and preferred client name.
3577SERVICE_NAME_ALIASES = {'runtime.sagemaker': 'sagemaker-runtime'}
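# --- Illustrative sketch (editor's addition, not part of botocore/utils.py) ---
# One way such an alias table can be applied: translate a legacy name to the
# documented client name, falling back to the input when no alias exists.
assert SERVICE_NAME_ALIASES.get('runtime.sagemaker', 'runtime.sagemaker') == 'sagemaker-runtime'
assert SERVICE_NAME_ALIASES.get('s3', 's3') == 's3'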
3580# This parameter is not part of the public interface and is subject to abrupt
3581# breaking changes or removal without prior announcement.
3582# Mapping to determine the service ID for services that do not use it as the
3583# model data directory name. The keys are the data directory name and the
3584# values are the transformed service IDs (lower case and hyphenated).
3585CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES = {
3586 # Actual service name we use -> Allowed computed service name.
3587 'apigateway': 'api-gateway',
3588 'application-autoscaling': 'application-auto-scaling',
3589 'appmesh': 'app-mesh',
3590 'autoscaling': 'auto-scaling',
3591 'autoscaling-plans': 'auto-scaling-plans',
3592 'ce': 'cost-explorer',
3593 'cloudhsmv2': 'cloudhsm-v2',
3594 'cloudsearchdomain': 'cloudsearch-domain',
3595 'cognito-idp': 'cognito-identity-provider',
3596 'config': 'config-service',
3597 'cur': 'cost-and-usage-report-service',
3598 'datapipeline': 'data-pipeline',
3599 'directconnect': 'direct-connect',
3600 'devicefarm': 'device-farm',
3601 'discovery': 'application-discovery-service',
3602 'dms': 'database-migration-service',
3603 'ds': 'directory-service',
3604 'ds-data': 'directory-service-data',
3605 'dynamodbstreams': 'dynamodb-streams',
3606 'elasticbeanstalk': 'elastic-beanstalk',
3607 'elastictranscoder': 'elastic-transcoder',
3608 'elb': 'elastic-load-balancing',
3609 'elbv2': 'elastic-load-balancing-v2',
3610 'es': 'elasticsearch-service',
3611 'events': 'eventbridge',
3612 'globalaccelerator': 'global-accelerator',
3613 'iot-data': 'iot-data-plane',
3614 'iot-jobs-data': 'iot-jobs-data-plane',
3615 'iotevents-data': 'iot-events-data',
3616 'iotevents': 'iot-events',
3617 'iotwireless': 'iot-wireless',
3618 'kinesisanalytics': 'kinesis-analytics',
3619 'kinesisanalyticsv2': 'kinesis-analytics-v2',
3620 'kinesisvideo': 'kinesis-video',
3621 'lex-models': 'lex-model-building-service',
3622 'lexv2-models': 'lex-models-v2',
3623 'lex-runtime': 'lex-runtime-service',
3624 'lexv2-runtime': 'lex-runtime-v2',
3625 'logs': 'cloudwatch-logs',
3626 'machinelearning': 'machine-learning',
3627 'marketplacecommerceanalytics': 'marketplace-commerce-analytics',
3628 'marketplace-entitlement': 'marketplace-entitlement-service',
3629 'meteringmarketplace': 'marketplace-metering',
3630 'mgh': 'migration-hub',
3631 'sms-voice': 'pinpoint-sms-voice',
3632 'resourcegroupstaggingapi': 'resource-groups-tagging-api',
3633 'route53': 'route-53',
3634 'route53domains': 'route-53-domains',
3635 's3control': 's3-control',
3636 'sdb': 'simpledb',
3637 'secretsmanager': 'secrets-manager',
3638 'serverlessrepo': 'serverlessapplicationrepository',
3639 'servicecatalog': 'service-catalog',
3640 'servicecatalog-appregistry': 'service-catalog-appregistry',
3641 'stepfunctions': 'sfn',
3642 'storagegateway': 'storage-gateway',
3643}
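# --- Illustrative sketch (editor's addition, not part of botocore/utils.py) ---
# Hypothetical helper showing how the override table could map a model data
# directory name to its hyphenated service ID, defaulting to the name itself.
def _hyphenized_service_id(data_dir_name):
    return CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES.get(
        data_dir_name, data_dir_name
    )

assert _hyphenized_service_id('elbv2') == 'elastic-load-balancing-v2'
assert _hyphenized_service_id('s3') == 's3'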