Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/utils.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import base64
14import binascii
15import datetime
16import email.message
17import functools
18import hashlib
19import io
20import logging
21import os
22import random
23import re
24import socket
25import time
26import warnings
27import weakref
28from datetime import datetime as _DatetimeClass
29from ipaddress import ip_address
30from pathlib import Path
31from urllib.request import getproxies, proxy_bypass
33import dateutil.parser
34from dateutil.tz import tzutc
35from urllib3.exceptions import LocationParseError
37import botocore
38import botocore.awsrequest
39import botocore.httpsession
41# IP Regexes retained for backwards compatibility
42from botocore.compat import (
43 HAS_CRT,
44 HEX_PAT, # noqa: F401
45 IPV4_PAT, # noqa: F401
46 IPV4_RE,
47 IPV6_ADDRZ_PAT, # noqa: F401
48 IPV6_ADDRZ_RE,
49 IPV6_PAT, # noqa: F401
50 LS32_PAT, # noqa: F401
51 MD5_AVAILABLE,
52 UNRESERVED_PAT, # noqa: F401
53 UNSAFE_URL_CHARS,
54 ZONE_ID_PAT, # noqa: F401
55 OrderedDict,
56 get_current_datetime,
57 get_md5,
58 get_tzinfo_options,
59 json,
60 quote,
61 urlparse,
62 urlsplit,
63 urlunsplit,
64 zip_longest,
65)
66from botocore.exceptions import (
67 ClientError,
68 ConfigNotFound,
69 ConnectionClosedError,
70 ConnectTimeoutError,
71 EndpointConnectionError,
72 HTTPClientError,
73 InvalidDNSNameError,
74 InvalidEndpointConfigurationError,
75 InvalidExpressionError,
76 InvalidHostLabelError,
77 InvalidIMDSEndpointError,
78 InvalidIMDSEndpointModeError,
79 InvalidRegionError,
80 MetadataRetrievalError,
81 MissingDependencyException,
82 ReadTimeoutError,
83 SSOTokenLoadError,
84 UnsupportedOutpostResourceError,
85 UnsupportedS3AccesspointConfigurationError,
86 UnsupportedS3ArnError,
87 UnsupportedS3ConfigurationError,
88 UnsupportedS3ControlArnError,
89 UnsupportedS3ControlConfigurationError,
90)
91from botocore.plugin import (
92 PluginContext,
93 reset_plugin_context,
94 set_plugin_context,
95)
logger = logging.getLogger(__name__)
# Defaults for contacting the EC2 Instance Metadata Service (IMDS):
# IPv4 link-local and IPv6 unique-local well-known endpoints.
DEFAULT_METADATA_SERVICE_TIMEOUT = 1
METADATA_BASE_URL = 'http://169.254.169.254/'
METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/'
METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6')
# These are chars that do not need to be urlencoded.
# Based on rfc2986, section 2.3
SAFE_CHARS = '-._~'
# DNS label: lowercase alphanumerics and hyphens, no leading/trailing hyphen.
LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]')
# Transient connection-level failures that warrant another attempt.
RETRYABLE_HTTP_ERRORS = (
    ReadTimeoutError,
    EndpointConnectionError,
    ConnectionClosedError,
    ConnectTimeoutError,
)
S3_ACCELERATE_WHITELIST = ['dualstack']
# In switching events from using service name / endpoint prefix to service
# id, we have to preserve compatibility. This maps the instances where either
# is different than the transformed service id.
EVENT_ALIASES = {
    "api.mediatailor": "mediatailor",
    "api.pricing": "pricing",
    "api.sagemaker": "sagemaker",
    "apigateway": "api-gateway",
    "application-autoscaling": "application-auto-scaling",
    "appstream2": "appstream",
    "autoscaling": "auto-scaling",
    "autoscaling-plans": "auto-scaling-plans",
    "ce": "cost-explorer",
    "cloudhsmv2": "cloudhsm-v2",
    "cloudsearchdomain": "cloudsearch-domain",
    "cognito-idp": "cognito-identity-provider",
    "config": "config-service",
    "cur": "cost-and-usage-report-service",
    "data.iot": "iot-data-plane",
    "data.jobs.iot": "iot-jobs-data-plane",
    "data.mediastore": "mediastore-data",
    "datapipeline": "data-pipeline",
    "devicefarm": "device-farm",
    "directconnect": "direct-connect",
    "discovery": "application-discovery-service",
    "dms": "database-migration-service",
    "ds": "directory-service",
    "dynamodbstreams": "dynamodb-streams",
    "elasticbeanstalk": "elastic-beanstalk",
    "elasticfilesystem": "efs",
    "elasticloadbalancing": "elastic-load-balancing",
    "elasticmapreduce": "emr",
    "elastictranscoder": "elastic-transcoder",
    "elb": "elastic-load-balancing",
    "elbv2": "elastic-load-balancing-v2",
    "email": "ses",
    "entitlement.marketplace": "marketplace-entitlement-service",
    "es": "elasticsearch-service",
    "events": "eventbridge",
    "cloudwatch-events": "eventbridge",
    "iot-data": "iot-data-plane",
    "iot-jobs-data": "iot-jobs-data-plane",
    "kinesisanalytics": "kinesis-analytics",
    "kinesisvideo": "kinesis-video",
    "lex-models": "lex-model-building-service",
    "lex-runtime": "lex-runtime-service",
    "logs": "cloudwatch-logs",
    "machinelearning": "machine-learning",
    "marketplace-entitlement": "marketplace-entitlement-service",
    "marketplacecommerceanalytics": "marketplace-commerce-analytics",
    "metering.marketplace": "marketplace-metering",
    "meteringmarketplace": "marketplace-metering",
    "mgh": "migration-hub",
    "models.lex": "lex-model-building-service",
    "monitoring": "cloudwatch",
    "mturk-requester": "mturk",
    "resourcegroupstaggingapi": "resource-groups-tagging-api",
    "route53": "route-53",
    "route53domains": "route-53-domains",
    "runtime.lex": "lex-runtime-service",
    "runtime.sagemaker": "sagemaker-runtime",
    "sdb": "simpledb",
    "secretsmanager": "secrets-manager",
    "serverlessrepo": "serverlessapplicationrepository",
    "servicecatalog": "service-catalog",
    "states": "sfn",
    "stepfunctions": "sfn",
    "storagegateway": "storage-gateway",
    "streams.dynamodb": "dynamodb-streams",
    "tagging": "resource-groups-tagging-api",
}
# This pattern can be used to detect if a header is a flexible checksum header
CHECKSUM_HEADER_PATTERN = re.compile(
    r'^X-Amz-Checksum-([a-z0-9]*)$',
    flags=re.IGNORECASE,
)
# Preference order used when choosing among a service's supported protocols.
PRIORITY_ORDERED_SUPPORTED_PROTOCOLS = (
    'json',
    'rest-json',
    'rest-xml',
    'smithy-rpc-v2-cbor',
    'query',
    'ec2',
)
def ensure_boolean(val):
    """Coerce *val* to a boolean.

    Booleans pass through untouched; strings compare
    case-insensitively against ``'true'``; any other type yields
    ``False``.
    """
    if isinstance(val, bool):
        return val
    return isinstance(val, str) and val.lower() == 'true'
def resolve_imds_endpoint_mode(session):
    """Resolve the IMDS endpoint mode to either ``'ipv4'`` or ``'ipv6'``.

    ``ec2_metadata_service_endpoint_mode`` takes precedence over
    ``imds_use_ipv6``.

    :raises InvalidIMDSEndpointModeError: if an explicitly configured
        mode is not one of ``METADATA_ENDPOINT_MODES``.
    """
    configured_mode = session.get_config_variable(
        'ec2_metadata_service_endpoint_mode'
    )
    if configured_mode is None:
        # Fall back to the boolean opt-in for IPv6.
        if session.get_config_variable('imds_use_ipv6'):
            return 'ipv6'
        return 'ipv4'
    normalized_mode = configured_mode.lower()
    if normalized_mode not in METADATA_ENDPOINT_MODES:
        raise InvalidIMDSEndpointModeError(
            mode=configured_mode,
            valid_modes=METADATA_ENDPOINT_MODES,
        )
    return normalized_mode
def is_json_value_header(shape):
    """Determine whether *shape* is the special jsonvalue header type.

    :type shape: botocore.shape
    :param shape: Shape to inspect for the jsonvalue trait.

    :return: True if this type is a jsonvalue, False otherwise.
    :rtype: Bool
    """
    if not hasattr(shape, 'serialization'):
        return False
    traits = shape.serialization
    return (
        traits.get('jsonvalue', False)
        and traits.get('location') == 'header'
        and shape.type_name == 'string'
    )
def has_header(header_name, headers):
    """Case-insensitive check for header key."""
    if header_name is None:
        return False
    if isinstance(headers, botocore.awsrequest.HeadersDict):
        # HeadersDict already performs case-insensitive membership tests.
        return header_name in headers
    wanted = header_name.lower()
    return wanted in [key.lower() for key in headers.keys()]
def get_service_module_name(service_model):
    """Return the module name for a service.

    This is the value used in both the documentation and the client
    class name.
    """
    metadata = service_model.metadata
    raw_name = metadata.get(
        'serviceAbbreviation',
        metadata.get('serviceFullName', service_model.service_name),
    )
    # Strip vendor prefixes, then drop every non-word character.
    for vendor_prefix in ('Amazon', 'AWS'):
        raw_name = raw_name.replace(vendor_prefix, '')
    return re.sub(r'\W+', '', raw_name)
def normalize_url_path(path):
    """Normalize *path*, mapping an empty/falsy path to ``'/'``."""
    return remove_dot_segments(path) if path else '/'
def normalize_boolean(val):
    """Return ``None`` when *val* is ``None``; otherwise coerce *val*
    to a boolean via :func:`ensure_boolean`."""
    return None if val is None else ensure_boolean(val)
def remove_dot_segments(url):
    """Apply RFC 3986, section 5.2.4 "Remove Dot Segments" to *url*.

    AWS services additionally require consecutive slashes to be
    collapsed, which happens here as a side effect of dropping empty
    path segments.
    """
    if not url:
        return ''
    segments = []
    for segment in url.split('/'):
        if not segment or segment == '.':
            # Empty segments (from consecutive slashes) and '.' vanish.
            continue
        if segment == '..':
            # '..' pops the previous segment when one exists.
            if segments:
                segments.pop()
        else:
            segments.append(segment)
    leading = '/' if url.startswith('/') else ''
    trailing = '/' if url.endswith('/') and segments else ''
    return leading + '/'.join(segments) + trailing
def validate_jmespath_for_set(expression):
    """Validate a limited jmespath *expression* for use when setting a
    value.  Only dotted paths are supported; indexing and wildcards are
    rejected.

    :raises InvalidExpressionError: for empty, '.', or bracket/star
        expressions.
    """
    if not expression or expression == '.':
        raise InvalidExpressionError(expression=expression)
    if any(token in expression for token in ('[', ']', '*')):
        raise InvalidExpressionError(expression=expression)
def set_value_from_jmespath(source, expression, value, is_first=True):
    """Set *value* inside *source* at the location named by a (limited)
    jmespath-like *expression*.

    Limitations:
      * Only handles dotted lookups.
      * No offsets/wildcards/slices/etc.
    """
    if is_first:
        validate_jmespath_for_set(expression)
    current_key, _, remainder = expression.partition('.')
    if not current_key:
        raise InvalidExpressionError(expression=expression)
    if not remainder:
        # Down to a single key: assign directly.
        source[current_key] = value
        return
    if current_key not in source:
        # The expression names a key missing from the source; since more
        # path components remain, seed it with an empty dict.
        source[current_key] = {}
    return set_value_from_jmespath(
        source[current_key], remainder, value, is_first=False
    )
def is_global_accesspoint(context):
    """Determine if the request is intended for an MRAP accesspoint
    (signalled by an empty-string region in the s3_accesspoint context)."""
    return context.get('s3_accesspoint', {}).get('region') == ''
def create_nested_client(session, service_name, **kwargs):
    """Create a client with plugin loading disabled.

    If a client is created from within a plugin based on the environment
    variable, an infinite loop could arise.  Any client created from
    within another client must therefore go through this helper, which
    suppresses plugins for the duration of the call.
    """
    context_token = set_plugin_context(PluginContext(plugins="DISABLED"))
    try:
        return session.create_client(service_name, **kwargs)
    finally:
        # Always restore the previous plugin context, even on failure.
        reset_plugin_context(context_token)
384class _RetriesExceededError(Exception):
385 """Internal exception used when the number of retries are exceeded."""
387 pass
class BadIMDSRequestError(Exception):
    """Raised when IMDS rejects the token request (HTTP 400)."""

    def __init__(self, request):
        # Keep the offending request around for debugging/logging.
        self.request = request
class IMDSFetcher:
    """Base helper for talking to the EC2 Instance Metadata Service.

    Handles endpoint selection (IPv4/IPv6/custom), IMDSv2 token
    acquisition, bounded retries, and the AWS_EC2_METADATA_DISABLED
    opt-out.
    """

    _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError
    # Path used to acquire an IMDSv2 session token via HTTP PUT.
    _TOKEN_PATH = 'latest/api/token'
    # Requested token TTL in seconds (6 hours), sent as a header value.
    _TOKEN_TTL = '21600'

    def __init__(
        self,
        timeout=DEFAULT_METADATA_SERVICE_TIMEOUT,
        num_attempts=1,
        base_url=METADATA_BASE_URL,
        env=None,
        user_agent=None,
        config=None,
    ):
        """
        :param timeout: Per-request socket timeout in seconds.
        :param num_attempts: Number of attempts per request before
            giving up on retryable errors.
        :param base_url: Base URL of the metadata service.
        :param env: Environment mapping; defaults to a copy of
            ``os.environ``.  Only AWS_EC2_METADATA_DISABLED is read.
        :param user_agent: Optional User-Agent header value.
        :param config: Optional dict of IMDS-related config values.
        """
        self._timeout = timeout
        self._num_attempts = num_attempts
        if config is None:
            config = {}
        self._base_url = self._select_base_url(base_url, config)
        self._config = config

        if env is None:
            env = os.environ.copy()
        # Opt-out switch: any case variation of 'true' disables IMDS.
        self._disabled = (
            env.get('AWS_EC2_METADATA_DISABLED', 'false').lower() == 'true'
        )
        self._imds_v1_disabled = config.get('ec2_metadata_v1_disabled')
        self._user_agent = user_agent
        self._session = botocore.httpsession.URLLib3Session(
            timeout=self._timeout,
            proxies=get_environ_proxies(self._base_url),
        )

    def get_base_url(self):
        """Return the resolved IMDS base URL."""
        return self._base_url

    def _select_base_url(self, base_url, config):
        """Choose the IMDS base URL from explicit argument and config.

        Precedence: explicitly passed ``base_url`` > configured custom
        endpoint > IPv6 well-known endpoint (when ipv6 mode is set) >
        default IPv4 endpoint.

        :raises InvalidIMDSEndpointError: if the chosen URL is invalid.
        """
        if config is None:
            config = {}

        requires_ipv6 = (
            config.get('ec2_metadata_service_endpoint_mode') == 'ipv6'
        )
        custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint')

        if requires_ipv6 and custom_metadata_endpoint:
            logger.warning(
                "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint."
            )

        chosen_base_url = None

        if base_url != METADATA_BASE_URL:
            # Caller overrode the default explicitly; it wins.
            chosen_base_url = base_url
        elif custom_metadata_endpoint:
            chosen_base_url = custom_metadata_endpoint
        elif requires_ipv6:
            chosen_base_url = METADATA_BASE_URL_IPv6
        else:
            chosen_base_url = METADATA_BASE_URL

        logger.debug("IMDS ENDPOINT: %s", chosen_base_url)
        if not is_valid_uri(chosen_base_url):
            raise InvalidIMDSEndpointError(endpoint=chosen_base_url)

        return chosen_base_url

    def _construct_url(self, path):
        """Join *path* onto the base URL, inserting a '/' when needed."""
        sep = ''
        if self._base_url and not self._base_url.endswith('/'):
            sep = '/'
        return f'{self._base_url}{sep}{path}'

    def _fetch_metadata_token(self):
        """Fetch an IMDSv2 session token via HTTP PUT.

        Returns the token text on HTTP 200.  Returns ``None`` when the
        service does not support or refuses token requests (404/403/405)
        or on read timeout — callers then fall back to IMDSv1 if it is
        enabled.

        :raises BadIMDSRequestError: on HTTP 400.
        :raises InvalidIMDSEndpointError: when the endpoint cannot be
            parsed as a URL.
        """
        self._assert_enabled()
        url = self._construct_url(self._TOKEN_PATH)
        headers = {
            'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL,
        }
        self._add_user_agent(headers)
        request = botocore.awsrequest.AWSRequest(
            method='PUT', url=url, headers=headers
        )
        for i in range(self._num_attempts):
            try:
                response = self._session.send(request.prepare())
                if response.status_code == 200:
                    return response.text
                elif response.status_code in (404, 403, 405):
                    return None
                elif response.status_code in (400,):
                    raise BadIMDSRequestError(request)
            except ReadTimeoutError:
                # Treat a timeout as "no token available" rather than retry.
                return None
            except RETRYABLE_HTTP_ERRORS as e:
                logger.debug(
                    "Caught retryable HTTP exception while making metadata "
                    "service request to %s: %s",
                    url,
                    e,
                    exc_info=True,
                )
            except HTTPClientError as e:
                if isinstance(e.kwargs.get('error'), LocationParseError):
                    raise InvalidIMDSEndpointError(endpoint=url, error=e)
                else:
                    raise
        return None

    def _get_request(self, url_path, retry_func, token=None):
        """Make a get request to the Instance Metadata Service.

        :type url_path: str
        :param url_path: The path component of the URL to make a get request.
            This arg is appended to the base_url that was provided in the
            initializer.

        :type retry_func: callable
        :param retry_func: A function that takes the response as an argument
            and determines if it needs to retry. By default empty and non
            200 OK responses are retried.

        :type token: str
        :param token: Metadata token to send along with GET requests to IMDS.

        :raises _RETRIES_EXCEEDED_ERROR_CLS: once all attempts fail.
        """
        self._assert_enabled()
        if not token:
            # No token means an IMDSv1 call, which may be disabled.
            self._assert_v1_enabled()
        if retry_func is None:
            retry_func = self._default_retry
        url = self._construct_url(url_path)
        headers = {}
        if token is not None:
            headers['x-aws-ec2-metadata-token'] = token
        self._add_user_agent(headers)
        for i in range(self._num_attempts):
            try:
                request = botocore.awsrequest.AWSRequest(
                    method='GET', url=url, headers=headers
                )
                response = self._session.send(request.prepare())
                if not retry_func(response):
                    return response
            except RETRYABLE_HTTP_ERRORS as e:
                logger.debug(
                    "Caught retryable HTTP exception while making metadata "
                    "service request to %s: %s",
                    url,
                    e,
                    exc_info=True,
                )
        raise self._RETRIES_EXCEEDED_ERROR_CLS()

    def _add_user_agent(self, headers):
        """Attach the configured User-Agent header, if any."""
        if self._user_agent is not None:
            headers['User-Agent'] = self._user_agent

    def _assert_enabled(self):
        """Raise the retries-exceeded error if IMDS access is disabled."""
        if self._disabled:
            logger.debug("Access to EC2 metadata has been disabled.")
            raise self._RETRIES_EXCEEDED_ERROR_CLS()

    def _assert_v1_enabled(self):
        """Raise when an IMDSv1 fallback call is attempted but disabled."""
        if self._imds_v1_disabled:
            raise MetadataRetrievalError(
                error_msg="Unable to retrieve token for use in IMDSv2 call and IMDSv1 has been disabled"
            )

    def _default_retry(self, response):
        """Retry on non-200 responses and on empty bodies."""
        return self._is_non_ok_response(response) or self._is_empty(response)

    def _is_non_ok_response(self, response):
        """Return True (and log) when the status code is not 200."""
        if response.status_code != 200:
            self._log_imds_response(response, 'non-200', log_body=True)
            return True
        return False

    def _is_empty(self, response):
        """Return True (and log) when the response has no content."""
        if not response.content:
            self._log_imds_response(response, 'no body', log_body=True)
            return True
        return False

    def _log_imds_response(self, response, reason_to_log, log_body=False):
        """Debug-log an IMDS response, optionally including its body."""
        statement = (
            "Metadata service returned %s response "
            "with status code of %s for url: %s"
        )
        logger_args = [reason_to_log, response.status_code, response.url]
        if log_body:
            statement += ", content body: %s"
            logger_args.append(response.content)
        logger.debug(statement, *logger_args)
class InstanceMetadataFetcher(IMDSFetcher):
    """Fetch IAM role credentials from the EC2 instance metadata service."""

    _URL_PATH = 'latest/meta-data/iam/security-credentials/'
    # Fields that must all be present for a credential payload to be usable.
    _REQUIRED_CREDENTIAL_FIELDS = [
        'AccessKeyId',
        'SecretAccessKey',
        'Token',
        'Expiration',
    ]

    def retrieve_iam_role_credentials(self):
        """Retrieve IAM role credentials from IMDS.

        Returns a dict with ``role_name``, ``access_key``,
        ``secret_key``, ``token`` and ``expiry_time`` keys on success,
        or an empty dict when credentials cannot be retrieved (retries
        exhausted, bad request, or incomplete response).
        """
        try:
            token = self._fetch_metadata_token()
            role_name = self._get_iam_role(token)
            credentials = self._get_credentials(role_name, token)
            if self._contains_all_credential_fields(credentials):
                credentials = {
                    'role_name': role_name,
                    'access_key': credentials['AccessKeyId'],
                    'secret_key': credentials['SecretAccessKey'],
                    'token': credentials['Token'],
                    'expiry_time': credentials['Expiration'],
                }
                self._evaluate_expiration(credentials)
                return credentials
            else:
                # IMDS can return a 200 response that has a JSON formatted
                # error message (i.e. if ec2 is not trusted entity for the
                # attached role). We do not necessarily want to retry for
                # these and we also do not necessarily want to raise a key
                # error. So at least log the problematic response and return
                # an empty dictionary to signal that it was not able to
                # retrieve credentials. These error will contain both a
                # Code and Message key.
                if 'Code' in credentials and 'Message' in credentials:
                    logger.debug(
                        'Error response received when retrieving'
                        'credentials: %s.',
                        credentials,
                    )
                return {}
        except self._RETRIES_EXCEEDED_ERROR_CLS:
            logger.debug(
                "Max number of attempts exceeded (%s) when "
                "attempting to retrieve data from metadata service.",
                self._num_attempts,
            )
        except BadIMDSRequestError as e:
            logger.debug("Bad IMDS request: %s", e.request)
        return {}

    def _get_iam_role(self, token=None):
        # The security-credentials listing returns the role name as text.
        return self._get_request(
            url_path=self._URL_PATH,
            retry_func=self._needs_retry_for_role_name,
            token=token,
        ).text

    def _get_credentials(self, role_name, token=None):
        # Per-role endpoint returns a JSON credential document.
        r = self._get_request(
            url_path=self._URL_PATH + role_name,
            retry_func=self._needs_retry_for_credentials,
            token=token,
        )
        return json.loads(r.text)

    def _is_invalid_json(self, response):
        """Return True (and log) when the response body is not valid JSON."""
        try:
            json.loads(response.text)
            return False
        except ValueError:
            self._log_imds_response(response, 'invalid json')
            return True

    def _needs_retry_for_role_name(self, response):
        """Retry the role-name request on non-200 or empty responses."""
        return self._is_non_ok_response(response) or self._is_empty(response)

    def _needs_retry_for_credentials(self, response):
        """Retry the credentials request on non-200, empty, or non-JSON
        responses."""
        return (
            self._is_non_ok_response(response)
            or self._is_empty(response)
            or self._is_invalid_json(response)
        )

    def _contains_all_credential_fields(self, credentials):
        """Check that every required credential field is present, logging
        the first missing one."""
        for field in self._REQUIRED_CREDENTIAL_FIELDS:
            if field not in credentials:
                logger.debug(
                    'Retrieved credentials is missing required field: %s',
                    field,
                )
                return False
        return True

    def _evaluate_expiration(self, credentials):
        """Extend ``expiry_time`` when it is already within the refresh
        window, to ride out transient credential-service outages.

        Mutates *credentials* in place; parsing failures are logged and
        otherwise ignored.
        """
        expiration = credentials.get("expiry_time")
        if expiration is None:
            return
        try:
            expiration = datetime.datetime.strptime(
                expiration, "%Y-%m-%dT%H:%M:%SZ"
            )
            refresh_interval = self._config.get(
                "ec2_credential_refresh_window", 60 * 10
            )
            jitter = random.randint(120, 600)  # Between 2 to 10 minutes
            refresh_interval_with_jitter = refresh_interval + jitter
            current_time = get_current_datetime()
            refresh_offset = datetime.timedelta(
                seconds=refresh_interval_with_jitter
            )
            extension_time = expiration - refresh_offset
            if current_time >= extension_time:
                new_time = current_time + refresh_offset
                credentials["expiry_time"] = new_time.strftime(
                    "%Y-%m-%dT%H:%M:%SZ"
                )
                logger.info(
                    "Attempting credential expiration extension due to a "
                    "credential service availability issue. A refresh of "
                    "these credentials will be attempted again within "
                    "the next %.0f minutes.",
                    refresh_interval_with_jitter / 60,
                )
        except ValueError:
            logger.debug(
                "Unable to parse expiry_time in %s", credentials['expiry_time']
            )
class IMDSRegionProvider:
    """Provide the current AWS region by querying IMDS, lazily creating
    the fetcher on first use."""

    def __init__(self, session, environ=None, fetcher=None):
        """Initialize IMDSRegionProvider.
        :type session: :class:`botocore.session.Session`
        :param session: The session is needed to look up configuration for
            how to contact the instance metadata service. Specifically the
            whether or not it should use the IMDS region at all, and if so how
            to configure the timeout and number of attempts to reach the
            service.
        :type environ: None or dict
        :param environ: A dictionary of environment variables to use. If
            ``None`` is the argument then ``os.environ`` will be used by
            default.
        :type fetcher: :class:`botocore.utils.InstanceMetadataRegionFetcher`
        :param fetcher: The class to actually handle the fetching of the region
            from the IMDS. If not provided a default one will be created.
        """
        self._session = session
        if environ is None:
            environ = os.environ
        self._environ = environ
        self._fetcher = fetcher

    def provide(self):
        """Provide the region value from IMDS."""
        instance_region = self._get_instance_metadata_region()
        return instance_region

    def _get_instance_metadata_region(self):
        # Delegate to the (lazily created) fetcher.
        fetcher = self._get_fetcher()
        region = fetcher.retrieve_region()
        return region

    def _get_fetcher(self):
        # Create the fetcher on first access and cache it.
        if self._fetcher is None:
            self._fetcher = self._create_fetcher()
        return self._fetcher

    def _create_fetcher(self):
        """Build an InstanceMetadataRegionFetcher from session config."""
        metadata_timeout = self._session.get_config_variable(
            'metadata_service_timeout'
        )
        metadata_num_attempts = self._session.get_config_variable(
            'metadata_service_num_attempts'
        )
        imds_config = {
            'ec2_metadata_service_endpoint': self._session.get_config_variable(
                'ec2_metadata_service_endpoint'
            ),
            'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode(
                self._session
            ),
            'ec2_metadata_v1_disabled': self._session.get_config_variable(
                'ec2_metadata_v1_disabled'
            ),
        }
        fetcher = InstanceMetadataRegionFetcher(
            timeout=metadata_timeout,
            num_attempts=metadata_num_attempts,
            env=self._environ,
            user_agent=self._session.user_agent(),
            config=imds_config,
        )
        return fetcher
class InstanceMetadataRegionFetcher(IMDSFetcher):
    """Fetch the instance's region from the IMDS availability-zone path."""

    _URL_PATH = 'latest/meta-data/placement/availability-zone/'

    def retrieve_region(self):
        """Get the current region from the instance metadata service.
        :rvalue: str
        :returns: The region the current instance is running in or None
            if the instance metadata service cannot be contacted or does not
            give a valid response.
        :rtype: None or str
        :returns: Returns the region as a string if it is configured to use
            IMDS as a region source. Otherwise returns ``None``. It will also
            return ``None`` if it fails to get the region from IMDS due to
            exhausting its retries or not being able to connect.
        """
        try:
            region = self._get_region()
            return region
        except self._RETRIES_EXCEEDED_ERROR_CLS:
            logger.debug(
                "Max number of attempts exceeded (%s) when "
                "attempting to retrieve data from metadata service.",
                self._num_attempts,
            )
        return None

    def _get_region(self):
        """Fetch the availability zone and derive the region from it."""
        token = self._fetch_metadata_token()
        response = self._get_request(
            url_path=self._URL_PATH,
            retry_func=self._default_retry,
            token=token,
        )
        availability_zone = response.text
        # The region is the AZ minus its trailing zone letter
        # (e.g. 'us-east-1a' -> 'us-east-1').
        region = availability_zone[:-1]
        return region
def merge_dicts(dict1, dict2, append_lists=False):
    """Given two dicts, merge the second dict into the first.

    The dicts can have arbitrary nesting.

    :param dict1: Dict merged into; modified in place.
    :param dict2: Dict whose values take precedence on conflict.
    :param append_lists: If true, instead of clobbering a list with the new
        value, append all of the new values onto the original list.
    """
    for key, value in dict2.items():
        if isinstance(value, dict):
            if isinstance(dict1.get(key), dict):
                # Recurse into nested dicts.  Propagate append_lists so
                # nested lists are merged consistently (previously the
                # flag was silently dropped on recursion).  Guarding on
                # dict1[key] being a dict also avoids crashing when a
                # scalar is being replaced by a dict.
                merge_dicts(dict1[key], value, append_lists)
            else:
                dict1[key] = value
        # If the value is a list and the ``append_lists`` flag is set,
        # append the new values onto the original list
        elif isinstance(value, list) and append_lists:
            # The value in dict1 must be a list in order to append new
            # values onto it.
            if isinstance(dict1.get(key), list):
                dict1[key].extend(value)
            else:
                dict1[key] = value
        else:
            # Scalar (or list without append_lists): clobber.
            dict1[key] = value
def lowercase_dict(original):
    """Copy the given dictionary, lowercasing every (string) key."""
    return {key.lower(): value for key, value in original.items()}
def parse_key_val_file(filename, _open=open):
    """Read *filename* and parse its ``key=val`` lines.

    :param _open: Opener callable, overridable for testing.
    :raises ConfigNotFound: when the file cannot be opened or read.
    """
    try:
        with _open(filename) as f:
            return parse_key_val_file_contents(f.read())
    except OSError:
        raise ConfigNotFound(path=filename)
def parse_key_val_file_contents(contents):
    """Parse lenient ``key=val`` pairs from *contents*.

    This was originally extracted from the EC2 credential provider;
    lines without an '=' are simply skipped, and surrounding whitespace
    is stripped from both key and value.
    """
    parsed = {}
    for line in contents.splitlines():
        key, sep, val = line.partition('=')
        if not sep:
            # No '=' on this line; ignore it.
            continue
        parsed[key.strip()] = val.strip()
    return parsed
def percent_encode_sequence(mapping, safe=SAFE_CHARS):
    """Urlencode a dict or list into a string.

    This is similar to urllib.urlencode except that:

    * It uses quote, and not quote_plus
    * It has a default list of safe chars that don't need
      to be encoded, which matches what AWS services expect.

    If any value in the input ``mapping`` is a list type,
    then each list element will be serialized. This is the equivalent
    to ``urlencode``'s ``doseq=True`` argument.

    This function should be preferred over the stdlib
    ``urlencode()`` function.

    :param mapping: Either a dict to urlencode or a list of
        ``(key, value)`` pairs.
    """
    pairs = mapping.items() if hasattr(mapping, 'items') else mapping
    encoded_pairs = []
    for key, value in pairs:
        # List values expand into one pair per element.
        elements = value if isinstance(value, list) else [value]
        encoded_pairs.extend(
            f'{percent_encode(key)}={percent_encode(element)}'
            for element in elements
        )
    return '&'.join(encoded_pairs)
def percent_encode(input_str, safe=SAFE_CHARS):
    """Urlencode a single string.

    Whereas percent_encode_sequence handles taking a dict/sequence and
    producing a percent encoded string, this function deals only with
    taking a string (not a dict/sequence) and percent encoding it.

    Binary input is URL-encoded as-is; text is UTF-8 encoded to bytes
    first; anything else is converted to text and then encoded.
    """
    # Normalize non-string input to text first.
    if not isinstance(input_str, (bytes, str)):
        input_str = str(input_str)
    # Text becomes UTF-8 bytes before quoting.
    if isinstance(input_str, str):
        input_str = input_str.encode('utf-8')
    return quote(input_str, safe=safe)
def _epoch_seconds_to_datetime(value, tzinfo):
    """Parse a numerical epoch timestamp (seconds since 1970) into a
    ``datetime.datetime`` via ``timedelta`` arithmetic.

    This is intended as a fallback for when ``fromtimestamp`` raises
    ``OverflowError`` or ``OSError``.

    :type value: float or int
    :param value: The Unix timestamp as a number.

    :type tzinfo: callable
    :param tzinfo: A ``datetime.tzinfo`` class or compatible callable.
    """
    localized_epoch = datetime.datetime(
        1970, 1, 1, 0, 0, 0, tzinfo=tzutc()
    ).astimezone(tzinfo())
    return localized_epoch + datetime.timedelta(seconds=value)
959def _parse_timestamp_with_tzinfo(value, tzinfo):
960 """Parse timestamp with pluggable tzinfo options."""
961 if isinstance(value, (int, float)):
962 # Possibly an epoch time.
963 return datetime.datetime.fromtimestamp(value, tzinfo())
964 else:
965 try:
966 return datetime.datetime.fromtimestamp(float(value), tzinfo())
967 except (TypeError, ValueError):
968 pass
969 try:
970 # In certain cases, a timestamp marked with GMT can be parsed into a
971 # different time zone, so here we provide a context which will
972 # enforce that GMT == UTC.
973 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()})
974 except (TypeError, ValueError) as e:
975 raise ValueError(f'Invalid timestamp "{value}": {e}')
def parse_timestamp(value):
    """Parse a timestamp into a datetime object.

    Supported formats:

        * iso8601
        * rfc822
        * epoch (value is an integer)

    This will return a ``datetime.datetime`` object.

    :raises RuntimeError: if no timezone option can represent the value.
    """
    tzinfo_options = get_tzinfo_options()
    for tzinfo in tzinfo_options:
        try:
            return _parse_timestamp_with_tzinfo(value, tzinfo)
        except (OSError, OverflowError) as e:
            logger.debug(
                'Unable to parse timestamp with "%s" timezone info.',
                tzinfo.__name__,
                exc_info=e,
            )
    # For numeric values attempt fallback to using fromtimestamp-free method.
    # From Python's ``datetime.datetime.fromtimestamp`` documentation: "This
    # may raise ``OverflowError``, if the timestamp is out of the range of
    # values supported by the platform C localtime() function, and ``OSError``
    # on localtime() failure. It's common for this to be restricted to years
    # from 1970 through 2038."
    try:
        numeric_value = float(value)
    except (TypeError, ValueError):
        # Non-numeric value: no fallback possible, fall through to the
        # RuntimeError below.
        pass
    else:
        try:
            # NOTE(review): the try wraps the whole loop, so only the
            # first tzinfo is effectively attempted — the first iteration
            # either returns or raises out of the loop.
            for tzinfo in tzinfo_options:
                return _epoch_seconds_to_datetime(numeric_value, tzinfo=tzinfo)
        except (OSError, OverflowError) as e:
            logger.debug(
                'Unable to parse timestamp using fallback method with "%s" '
                'timezone info.',
                tzinfo.__name__,
                exc_info=e,
            )
    raise RuntimeError(
        f'Unable to calculate correct timezone offset for "{value}"'
    )
def parse_to_aware_datetime(value):
    """Convert the passed in value to a timezone-aware datetime object.

    This function can be used to normalize all timestamp inputs.  It
    accepts a number of different types of inputs, but will always
    return a ``datetime.datetime`` object with time zone information.

    The input param ``value`` can be one of several types:

        * A datetime object (both naive and aware)
        * An integer representing the epoch time (can also be a string
          of the integer, i.e '0', instead of 0).  The epoch time is
          considered to be UTC.
        * An iso8601 formatted timestamp.  This does not need to be
          a complete timestamp, it can contain just the date portion
          without the time component.

    The returned value will be a datetime object that will have tzinfo.
    If no timezone info was provided in the input value, then UTC is
    assumed, not local time.

    """
    # Datetimes pass straight through; everything else (epoch ints,
    # strings, iso8601 strings) goes through the flexible timestamp
    # parser.  We document the string form as iso8601, although
    # parse_timestamp is a bit more lenient.
    if not isinstance(value, _DatetimeClass):
        result = parse_timestamp(value)
    else:
        result = value
    if result.tzinfo is not None:
        return result.astimezone(tzutc())
    # A case could be made for using local time when no zone is given,
    # but the long-standing (backwards compatible) behavior is to
    # assume UTC.
    return result.replace(tzinfo=tzutc())
def datetime2timestamp(dt, default_timezone=None):
    """Calculate the timestamp based on the given datetime instance.

    :type dt: datetime
    :param dt: A datetime object to be converted into timestamp
    :type default_timezone: tzinfo
    :param default_timezone: If it is provided as None, we treat it as tzutc().
                             But it is only used when dt is a naive datetime.
    :returns: The timestamp
    """
    unix_epoch = datetime.datetime(1970, 1, 1)
    # Naive datetimes get a timezone attached so utcoffset() is defined.
    if dt.tzinfo is None:
        tz = default_timezone if default_timezone is not None else tzutc()
        dt = dt.replace(tzinfo=tz)
    # Shift the wall-clock time to UTC, then measure from the epoch.
    utc_naive = dt.replace(tzinfo=None) - dt.utcoffset()
    return (utc_naive - unix_epoch).total_seconds()
def calculate_sha256(body, as_hex=False):
    """Calculate a sha256 checksum.

    This method will calculate the sha256 checksum of a file like
    object.  Note that this method will iterate through the entire
    file contents.  The caller is responsible for ensuring the proper
    starting position of the file and ``seek()``'ing the file back
    to its starting location if other consumers need to read from
    the file like object.

    :param body: Any file like object.  The file must be opened
        in binary mode such that a ``.read()`` call returns bytes.
    :param as_hex: If True, then the hex digest is returned.
        If False, then the digest (as binary bytes) is returned.

    :returns: The sha256 checksum

    """
    digest = hashlib.sha256()
    # Read in 1MB chunks until an empty bytes read signals EOF.
    read_chunk = functools.partial(body.read, 1024 * 1024)
    for chunk in iter(read_chunk, b''):
        digest.update(chunk)
    return digest.hexdigest() if as_hex else digest.digest()
def calculate_tree_hash(body):
    """Calculate a tree hash checksum.

    For more information see:

    http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html

    :param body: Any file like object.  This has the same constraints as
        the ``body`` param in calculate_sha256

    :rtype: str
    :returns: The hex version of the calculated tree hash

    """
    one_mb = 1024 * 1024
    sha256 = hashlib.sha256
    # Leaf level: one sha256 digest per 1MB chunk of the body.
    level = [
        sha256(chunk).digest() for chunk in iter(lambda: body.read(one_mb), b'')
    ]
    if not level:
        return sha256(b'').hexdigest()
    # Combine adjacent digests pairwise until a single root digest
    # remains; an unpaired trailing digest is carried up unchanged.
    while len(level) > 1:
        next_level = []
        for i in range(0, len(level), 2):
            pair = level[i : i + 2]
            if len(pair) == 2:
                next_level.append(sha256(pair[0] + pair[1]).digest())
            else:
                next_level.append(pair[0])
        level = next_level
    return binascii.hexlify(level[0]).decode('ascii')
1150def _in_pairs(iterable):
1151 # Creates iterator that iterates over the list in pairs:
1152 # for a, b in _in_pairs([0, 1, 2, 3, 4]):
1153 # print(a, b)
1154 #
1155 # will print:
1156 # 0, 1
1157 # 2, 3
1158 # 4, None
1159 shared_iter = iter(iterable)
1160 # Note that zip_longest is a compat import that uses
1161 # the itertools izip_longest. This creates an iterator,
1162 # this call below does _not_ immediately create the list
1163 # of pairs.
1164 return zip_longest(shared_iter, shared_iter)
class CachedProperty:
    """A read only property that caches the initially computed value.

    This descriptor will only call the provided ``fget`` function once.
    Subsequent access to this property will return the cached value.

    """

    def __init__(self, fget):
        self._fget = fget

    def __get__(self, instance, owner):
        # Class-level access returns the descriptor itself.
        if instance is None:
            return self
        # Compute once, then shadow the descriptor with the result in
        # the instance __dict__.  Because this is a non-data descriptor
        # (no __set__), later attribute lookups find the instance value
        # and never re-enter __get__.
        value = self._fget(instance)
        instance.__dict__[self._fget.__name__] = value
        return value
class ArgumentGenerator:
    """Generate sample input based on a shape model.

    This class contains a ``generate_skeleton`` method that will take
    an input/output shape (created from ``botocore.model``) and generate
    a sample dictionary corresponding to the input/output shape.

    The specific values used are place holder values. For strings either an
    empty string or the member name can be used, for numbers 0 or 0.0 is used.
    The intended usage of this class is to generate the *shape* of the input
    structure.

    This can be useful for operations that have complex input shapes.
    This allows a user to just fill in the necessary data instead of
    worrying about the specific structure of the input arguments.

    Example usage::

        s = botocore.session.get_session()
        ddb = s.get_service_model('dynamodb')
        arg_gen = ArgumentGenerator()
        sample_input = arg_gen.generate_skeleton(
            ddb.operation_model('CreateTable').input_shape)
        print("Sample input for dynamodb.CreateTable: %s" % sample_input)

    """

    def __init__(self, use_member_names=False):
        self._use_member_names = use_member_names

    def generate_skeleton(self, shape):
        """Generate a sample input.

        :type shape: ``botocore.model.Shape``
        :param shape: The input shape.

        :return: The generated skeleton input corresponding to the
            provided input shape.

        """
        return self._generate_skeleton(shape, [])

    def _generate_skeleton(self, shape, stack, name=''):
        # ``stack`` records the shape names currently being expanded so
        # recursive structures can be cut off; unknown type names fall
        # through and yield None.
        stack.append(shape.name)
        try:
            type_name = shape.type_name
            if type_name == 'structure':
                return self._generate_type_structure(shape, stack)
            if type_name == 'list':
                return self._generate_type_list(shape, stack)
            if type_name == 'map':
                return self._generate_type_map(shape, stack)
            if type_name == 'string':
                if self._use_member_names:
                    return name
                return random.choice(shape.enum) if shape.enum else ''
            if type_name in ('integer', 'long'):
                return 0
            if type_name in ('float', 'double'):
                return 0.0
            if type_name == 'boolean':
                return True
            if type_name == 'timestamp':
                return datetime.datetime(1970, 1, 1, 0, 0, 0)
        finally:
            stack.pop()

    def _generate_type_structure(self, shape, stack):
        # Seeing this structure twice on the expansion path means we've
        # recursed into ourselves; stop with an empty dict.
        if stack.count(shape.name) > 1:
            return {}
        return OrderedDict(
            (member_name, self._generate_skeleton(member_shape, stack, name=member_name))
            for member_name, member_shape in shape.members.items()
        )

    def _generate_type_list(self, shape, stack):
        # A single placeholder element is enough to convey the element
        # shape of the skeleton list.
        element_name = shape.member.name if self._use_member_names else ''
        return [self._generate_skeleton(shape.member, stack, element_name)]

    def _generate_type_map(self, shape, stack):
        key_shape = shape.key
        value_shape = shape.value
        assert key_shape.type_name == 'string'
        return OrderedDict(
            KeyName=self._generate_skeleton(value_shape, stack),
        )
1287def is_valid_ipv6_endpoint_url(endpoint_url):
1288 if UNSAFE_URL_CHARS.intersection(endpoint_url):
1289 return False
1290 hostname = f'[{urlparse(endpoint_url).hostname}]'
1291 return IPV6_ADDRZ_RE.match(hostname) is not None
def is_valid_ipv4_endpoint_url(endpoint_url):
    """Return True if the endpoint URL's hostname is a literal IPv4 address.

    :param endpoint_url: The endpoint URL to inspect.
    :return: True when the parsed hostname matches ``IPV4_RE``; False when
        it does not match or when the URL has no hostname at all.
    """
    hostname = urlparse(endpoint_url).hostname
    # urlparse() yields hostname=None for URLs without a network location
    # (e.g. a bare path or missing scheme); IPV4_RE.match(None) would raise
    # TypeError, so treat that case as "not IPv4" — matching the explicit
    # None guard in is_valid_endpoint_url.
    if hostname is None:
        return False
    return IPV4_RE.match(hostname) is not None
def is_valid_endpoint_url(endpoint_url):
    """Verify the endpoint_url is valid.

    :type endpoint_url: string
    :param endpoint_url: An endpoint_url.  Must have at least a scheme
        and a hostname.

    :return: True if the endpoint url is valid. False otherwise.

    """
    # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing
    # it to pass hostname validation below. Detect them early to fix that.
    if UNSAFE_URL_CHARS.intersection(endpoint_url):
        return False
    hostname = urlsplit(endpoint_url).hostname
    if hostname is None or len(hostname) > 255:
        return False
    # A fully-qualified name may carry one trailing dot; drop it before
    # validating the labels.
    if hostname[-1] == ".":
        hostname = hostname[:-1]
    # Each label: 1-63 alphanumeric/hyphen chars, no leading/trailing hyphen.
    label_pattern = re.compile(
        r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$",
        re.IGNORECASE,
    )
    return label_pattern.match(hostname)
def is_valid_uri(endpoint_url):
    """True when the endpoint is a valid hostname URL or an IPv6 URL."""
    result = is_valid_endpoint_url(endpoint_url)
    if not result:
        result = is_valid_ipv6_endpoint_url(endpoint_url)
    return result
def validate_region_name(region_name):
    """Provided region_name must be a valid host label."""
    # None is explicitly allowed (no region configured).
    if region_name is None:
        return
    # Host label: up to 63 alphanumeric/hyphen chars, not all digits,
    # and no leading or trailing hyphen.
    host_label_re = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$')
    if host_label_re.match(region_name) is None:
        raise InvalidRegionError(region_name=region_name)
def check_dns_name(bucket_name):
    """
    Check to see if the ``bucket_name`` complies with the
    restricted DNS naming conventions necessary to allow
    access via virtual-hosting style.

    Even though "." characters are perfectly valid in this DNS
    naming scheme, we are going to punt on any name containing a
    "." character because these will cause SSL cert validation
    problems if we try to use virtual-hosting style addressing.
    """
    if '.' in bucket_name:
        return False
    # Names outside 3-63 characters can't be used as a DNS label here.
    if not 3 <= len(bucket_name) <= 63:
        return False
    match = LABEL_RE.match(bucket_name)
    # The regex must consume the entire name, not just a prefix.
    return match is not None and match.end() == len(bucket_name)
def fix_s3_host(
    request,
    signature_version,
    region_name,
    default_endpoint_url=None,
    **kwargs,
):
    """
    This handler looks at S3 requests just before they are signed.
    If there is a bucket name on the path (true for everything except
    ListAllBuckets) it checks to see if that bucket name conforms to
    the DNS naming conventions.  If it does, it alters the request to
    use ``virtual hosting`` style addressing rather than ``path-style``
    addressing.

    """
    if request.context.get('use_global_endpoint', False):
        default_endpoint_url = 's3.amazonaws.com'
    try:
        switch_to_virtual_host_style(
            request, signature_version, default_endpoint_url
        )
    except InvalidDNSNameError as err:
        # Leave the URI untouched for buckets that can't live in a
        # hostname label.
        logger.debug(
            'Not changing URI, bucket is not DNS compatible: %s',
            err.kwargs['bucket_name'],
        )
def switch_to_virtual_host_style(
    request, signature_version, default_endpoint_url=None, **kwargs
):
    """
    This is a handler to force virtual host style s3 addressing no matter
    the signature version (which is taken in consideration for the default
    case). If the bucket is not DNS compatible an InvalidDNSName is thrown.

    :param request: A AWSRequest object that is about to be sent.
    :param signature_version: The signature version to sign with
    :param default_endpoint_url: The endpoint to use when switching to a
        virtual style. If None is supplied, the virtual host will be
        constructed from the url of the request.
    :raises InvalidDNSNameError: If the bucket name on the path is not
        DNS compatible.
    """
    if request.auth_path is not None:
        # The auth_path has already been applied (this may be a
        # retried request). We don't need to perform this
        # customization again.
        return
    elif _is_get_bucket_location_request(request):
        # For the GetBucketLocation response, we should not be using
        # the virtual host style addressing so we can avoid any sigv4
        # issues.
        logger.debug(
            "Request is GetBucketLocation operation, not checking "
            "for DNS compatibility."
        )
        return
    parts = urlsplit(request.url)
    # Preserve the original path for signing before the URL is rewritten.
    request.auth_path = parts.path
    path_parts = parts.path.split('/')

    # Retrieve what the endpoint we will be prepending the bucket name to.
    if default_endpoint_url is None:
        default_endpoint_url = parts.netloc

    if len(path_parts) > 1:
        bucket_name = path_parts[1]
        if not bucket_name:
            # If the bucket name is empty we should not be checking for
            # dns compatibility.
            return
        logger.debug('Checking for DNS compatible bucket for: %s', request.url)

        if check_dns_name(bucket_name):
            # If the operation is on a bucket, the auth_path must be
            # terminated with a '/' character.
            if len(path_parts) == 2:
                if request.auth_path[-1] != '/':
                    request.auth_path += '/'
            # The bucket segment moves from the path into the host below.
            path_parts.remove(bucket_name)
            # At the very least the path must be a '/', such as with the
            # CreateBucket operation when DNS style is being used. If this
            # is not used you will get an empty path which is incorrect.
            path = '/'.join(path_parts) or '/'
            global_endpoint = default_endpoint_url
            host = bucket_name + '.' + global_endpoint
            # Rebuild the URL with the bucket-prefixed host; the fragment
            # component is intentionally dropped.
            new_tuple = (parts.scheme, host, path, parts.query, '')
            new_uri = urlunsplit(new_tuple)
            request.url = new_uri
            logger.debug('URI updated to: %s', new_uri)
        else:
            raise InvalidDNSNameError(bucket_name=bucket_name)
1460def _is_get_bucket_location_request(request):
1461 return request.url.endswith('?location')
def instance_cache(func):
    """Method decorator for caching method calls to a single instance.

    **This is not a general purpose caching decorator.**

    In order to use this, you *must* provide an ``_instance_cache``
    attribute on the instance.

    This decorator is used to cache method calls.  The cache is only
    scoped to a single instance though such that multiple instances
    will maintain their own cache.  In order to keep things simple,
    this decorator requires that you provide an ``_instance_cache``
    attribute on your instance.

    """
    method_name = func.__name__

    @functools.wraps(func)
    def _cache_guard(self, *args, **kwargs):
        # Keyword args are folded into the key in sorted order so that
        # equivalent calls hash identically.
        if kwargs:
            key = (method_name, args, tuple(sorted(kwargs.items())))
        else:
            key = (method_name, args)
        cached = self._instance_cache.get(key)
        if cached is None:
            cached = func(self, *args, **kwargs)
            self._instance_cache[key] = cached
        return cached

    return _cache_guard
def lru_cache_weakref(*cache_args, **cache_kwargs):
    """
    Version of functools.lru_cache that stores a weak reference to ``self``.

    Serves the same purpose as :py:func:`instance_cache` but uses Python's
    functools implementation which offers ``max_size`` and ``typed`` properties.

    lru_cache is a global cache even when used on a method. The cache's
    reference to ``self`` will prevent garbage collection of the object. This
    wrapper around functools.lru_cache replaces the reference to ``self`` with
    a weak reference to not interfere with garbage collection.
    """

    def wrapper(func):
        @functools.lru_cache(*cache_args, **cache_kwargs)
        def cached_with_ref(self_ref, *args, **kwargs):
            # Dereference the weakref before delegating to the real method.
            return func(self_ref(), *args, **kwargs)

        @functools.wraps(func)
        def inner(self, *args, **kwargs):
            # Lists are unhashable, so convert them to tuples for the
            # cache key.
            kwargs = {
                key: tuple(value) if isinstance(value, list) else value
                for key, value in kwargs.items()
            }
            return cached_with_ref(weakref.ref(self), *args, **kwargs)

        inner.cache_info = cached_with_ref.cache_info
        return inner

    return wrapper
def switch_host_s3_accelerate(request, operation_name, **kwargs):
    """Switches the current s3 endpoint with an S3 Accelerate endpoint"""

    # Note that when registered the switching of the s3 host happens
    # before it gets changed to virtual. So we are not concerned with ensuring
    # that the bucket name is translated to the virtual style here and we
    # can hard code the Accelerate endpoint.
    netloc_labels = urlsplit(request.url).netloc.split('.')
    retained = [
        label for label in netloc_labels if label in S3_ACCELERATE_WHITELIST
    ]
    if retained:
        endpoint = 'https://s3-accelerate.' + '.'.join(retained) + '.amazonaws.com'
    else:
        endpoint = 'https://s3-accelerate.amazonaws.com'

    # Bucket-less operations stay on the regular endpoint.
    if operation_name in ['ListBuckets', 'CreateBucket', 'DeleteBucket']:
        return
    _switch_hosts(request, endpoint, use_new_scheme=False)
def switch_host_with_param(request, param_name):
    """Switches the host using a parameter value from a JSON request body"""
    body = json.loads(request.data.decode('utf-8'))
    new_endpoint = body.get(param_name)
    # Only switch when the parameter is present and truthy.
    if new_endpoint:
        _switch_hosts(request, new_endpoint)
def _switch_hosts(request, new_endpoint, use_new_scheme=True):
    # Rewrite the request URL in place, splicing in the new endpoint's
    # network location (and optionally its scheme).
    request.url = _get_new_endpoint(request.url, new_endpoint, use_new_scheme)
def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True):
    # Splice the new endpoint's netloc into the original URL, keeping
    # the original path and query; the fragment is dropped.
    new_parts = urlsplit(new_endpoint)
    original_parts = urlsplit(original_endpoint)
    scheme = new_parts.scheme if use_new_scheme else original_parts.scheme
    final_endpoint = urlunsplit(
        (
            scheme,
            new_parts.netloc,
            original_parts.path,
            original_parts.query,
            '',
        )
    )
    logger.debug(
        'Updating URI from %s to %s', original_endpoint, final_endpoint
    )
    return final_endpoint
def deep_merge(base, extra):
    """Deeply merge two dictionaries, overriding existing keys in the base.

    :param base: The base dictionary which will be merged into.
    :param extra: The dictionary to merge into the base. Keys from this
        dictionary will take precedence.
    """
    for key, value in extra.items():
        # Recurse only when both sides hold a dict for this key;
        # otherwise the extra value replaces the base value outright.
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            deep_merge(base[key], value)
        else:
            base[key] = value
def hyphenize_service_id(service_id):
    """Translate the form used for event emitters.

    :param service_id: The service_id to convert.
    """
    hyphenized = service_id.replace(' ', '-')
    return hyphenized.lower()
class IdentityCache:
    """Base IdentityCache implementation for storing and retrieving
    highly accessed credentials.

    This class is not intended to be instantiated in user code.
    """

    METHOD = "base_identity_cache"

    def __init__(self, client, credential_cls):
        self._client = client
        self._credential_cls = credential_cls

    def get_credentials(self, **kwargs):
        """Build a refreshable credential entry.

        :param kwargs: Forwarded to ``build_refresh_callback``.
        :returns: A credential instance created via
            ``credential_cls.create_from_metadata`` from the metadata the
            subclass-provided callback returns.
        """
        callback = self.build_refresh_callback(**kwargs)
        metadata = callback()
        credential_entry = self._credential_cls.create_from_metadata(
            metadata=metadata,
            refresh_using=callback,
            method=self.METHOD,
            advisory_timeout=45,
            mandatory_timeout=10,
        )
        return credential_entry

    def build_refresh_callback(self, **kwargs):
        """Callback to be implemented by subclasses.

        Returns a set of metadata to be converted into a new
        credential instance.
        """
        # Fix: ``self`` was missing from this signature, so the bound call
        # ``self.build_refresh_callback(**kwargs)`` raised
        # "takes 0 positional arguments" TypeError instead of the intended
        # NotImplementedError.
        raise NotImplementedError()
class S3ExpressIdentityCache(IdentityCache):
    """S3Express IdentityCache for retrieving and storing
    credentials from CreateSession calls.

    This class is not intended to be instantiated in user code.
    """

    METHOD = "s3express"

    def __init__(self, client, credential_cls):
        # ``client`` issues the CreateSession calls in the refresher;
        # ``credential_cls`` is consumed by the base get_credentials().
        self._client = client
        self._credential_cls = credential_cls

    @functools.lru_cache(maxsize=100)
    def get_credentials(self, bucket):
        # NOTE(review): lru_cache on a bound method keys on (self, bucket)
        # and keeps a strong reference to self for the cache's lifetime —
        # presumably intended here for per-bucket credential reuse; confirm
        # before refactoring.
        return super().get_credentials(bucket=bucket)

    def build_refresh_callback(self, bucket):
        # Each invocation of the returned callable performs a fresh
        # CreateSession call for ``bucket`` and maps the response into
        # the metadata dict the credential machinery expects.
        def refresher():
            response = self._client.create_session(Bucket=bucket)
            creds = response['Credentials']
            expiration = self._serialize_if_needed(
                creds['Expiration'], iso=True
            )
            return {
                "access_key": creds['AccessKeyId'],
                "secret_key": creds['SecretAccessKey'],
                "token": creds['SessionToken'],
                "expiry_time": expiration,
            }

        return refresher

    def _serialize_if_needed(self, value, iso=False):
        # Datetime expirations are rendered as strings (ISO 8601 when
        # ``iso`` is True); any other value passes through unchanged.
        if isinstance(value, _DatetimeClass):
            if iso:
                return value.isoformat()
            return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
        return value
class S3ExpressIdentityResolver:
    """Hooks S3Express credential resolution into a client's event system."""

    def __init__(self, client, credential_cls, cache=None):
        # A weak proxy avoids a strong reference cycle with the client.
        self._client = weakref.proxy(client)
        if cache is None:
            cache = S3ExpressIdentityCache(self._client, credential_cls)
        self._cache = cache

    def register(self, event_emitter=None):
        logger.debug('Registering S3Express Identity Resolver')
        emitter = event_emitter if event_emitter else self._client.meta.events
        emitter.register('before-call.s3', self.apply_signing_cache_key)
        emitter.register('before-sign.s3', self.resolve_s3express_identity)

    def apply_signing_cache_key(self, params, context, **kwargs):
        # Only S3Express-backed requests that carry a Bucket get a
        # signing cache key.
        backend = context.get('endpoint_properties', {}).get('backend', None)
        bucket_name = context.get('input_params', {}).get('Bucket')
        if backend != 'S3Express' or bucket_name is None:
            return
        context.setdefault('signing', {})
        context['signing']['cache_key'] = bucket_name

    def resolve_s3express_identity(
        self,
        request,
        signing_name,
        region_name,
        signature_version,
        request_signer,
        operation_name,
        **kwargs,
    ):
        signing_context = request.context.get('signing', {})
        is_s3express = signing_context.get(
            'signing_name'
        ) == 's3express' and signature_version.startswith('v4-s3express')
        if not is_s3express:
            return
        signing_context['identity_cache'] = self._cache
        # Fall back to the redirect params' bucket when no cache key was
        # set by apply_signing_cache_key.
        if 'cache_key' not in signing_context:
            redirect_params = request.context.get('s3_redirect', {}).get(
                'params', {}
            )
            signing_context['cache_key'] = redirect_params.get('Bucket')
class S3RegionRedirectorv2:
    """Updated version of S3RegionRedirector for use when
    EndpointRulesetResolver is in use for endpoint resolution.

    This class is considered private and subject to abrupt breaking changes or
    removal without prior announcement. Please do not use it directly.
    """

    def __init__(self, endpoint_bridge, client, cache=None):
        # ``endpoint_bridge`` is unused by this implementation; it is
        # accepted to keep the constructor signature aligned with the
        # legacy S3RegionRedirector.
        # ``cache`` maps bucket name -> region for previously redirected
        # buckets; the client is held via a weak proxy to avoid a
        # reference cycle.
        self._cache = cache or {}
        self._client = weakref.proxy(client)

    def register(self, event_emitter=None):
        # Wire the three redirect hooks into the client's (or supplied)
        # event emitter.
        logger.debug('Registering S3 region redirector handler')
        emitter = event_emitter or self._client.meta.events
        emitter.register('needs-retry.s3', self.redirect_from_error)
        emitter.register(
            'before-parameter-build.s3', self.annotate_request_context
        )
        emitter.register(
            'before-endpoint-resolution.s3', self.redirect_from_cache
        )

    def redirect_from_error(self, request_dict, response, operation, **kwargs):
        """
        An S3 request sent to the wrong region will return an error that
        contains the endpoint the request should be sent to. This handler
        will add the redirect information to the signing context and then
        redirect the request.
        """
        if response is None:
            # This could be none if there was a ConnectionError or other
            # transport error.
            return

        # Access point ARNs carry their own region; never redirect them.
        redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {})
        if ArnParser.is_arn(redirect_ctx.get('bucket')):
            logger.debug(
                'S3 request was previously for an Accesspoint ARN, not '
                'redirecting.'
            )
            return

        # Redirect at most once per request.
        if redirect_ctx.get('redirected'):
            logger.debug(
                'S3 request was previously redirected, not redirecting.'
            )
            return

        error = response[1].get('Error', {})
        error_code = error.get('Code')
        response_metadata = response[1].get('ResponseMetadata', {})

        # We have to account for 400 responses because
        # if we sign a Head* request with the wrong region,
        # we'll get a 400 Bad Request but we won't get a
        # body saying it's an "AuthorizationHeaderMalformed".
        is_special_head_object = (
            error_code in ('301', '400') and operation.name == 'HeadObject'
        )
        is_special_head_bucket = (
            error_code in ('301', '400')
            and operation.name == 'HeadBucket'
            and 'x-amz-bucket-region'
            in response_metadata.get('HTTPHeaders', {})
        )
        is_wrong_signing_region = (
            error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
        )
        is_redirect_status = response[0] is not None and response[
            0
        ].status_code in (301, 302, 307)
        is_permanent_redirect = error_code == 'PermanentRedirect'
        is_opt_in_region_redirect = (
            error_code == 'IllegalLocationConstraintException'
            and operation.name != 'CreateBucket'
        )
        # Only the error shapes above indicate a wrong-region request;
        # anything else is left for normal retry handling.
        if not any(
            [
                is_special_head_object,
                is_wrong_signing_region,
                is_permanent_redirect,
                is_special_head_bucket,
                is_redirect_status,
                is_opt_in_region_redirect,
            ]
        ):
            return

        bucket = request_dict['context']['s3_redirect']['bucket']
        client_region = request_dict['context'].get('client_region')
        new_region = self.get_bucket_region(bucket, response)

        if new_region is None:
            logger.debug(
                "S3 client configured for region %s but the "
                "bucket %s is not in that region and the proper region "
                "could not be automatically determined.",
                client_region,
                bucket,
            )
            return

        logger.debug(
            "S3 client configured for region %s but the bucket %s "
            "is in region %s; Please configure the proper region to "
            "avoid multiple unnecessary redirects and signing attempts.",
            client_region,
            bucket,
            new_region,
        )
        # Adding the new region to _cache will make construct_endpoint() to
        # use the new region as value for the AWS::Region builtin parameter.
        self._cache[bucket] = new_region

        # Re-resolve endpoint with new region and modify request_dict with
        # the new URL, auth scheme, and signing context.
        ep_resolver = self._client._ruleset_resolver
        ep_info = ep_resolver.construct_endpoint(
            operation_model=operation,
            call_args=request_dict['context']['s3_redirect']['params'],
            request_context=request_dict['context'],
        )
        request_dict['url'] = self.set_request_url(
            request_dict['url'], ep_info.url
        )
        request_dict['context']['s3_redirect']['redirected'] = True
        auth_schemes = ep_info.properties.get('authSchemes')
        if auth_schemes is not None:
            auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes)
            auth_type, signing_context = auth_info
            request_dict['context']['auth_type'] = auth_type
            request_dict['context']['signing'] = {
                **request_dict['context'].get('signing', {}),
                **signing_context,
            }

        # Return 0 so it doesn't wait to retry
        return 0

    def get_bucket_region(self, bucket, response):
        """
        There are multiple potential sources for the new region to redirect to,
        but they aren't all universally available for use. This will try to
        find region from response elements, but will fall back to calling
        HEAD on the bucket if all else fails.

        :param bucket: The bucket to find the region for. This is necessary if
            the region is not available in the error response.
        :param response: A response representing a service request that failed
            due to incorrect region configuration.
        """
        # First try to source the region from the headers.
        service_response = response[1]
        response_headers = service_response['ResponseMetadata']['HTTPHeaders']
        if 'x-amz-bucket-region' in response_headers:
            return response_headers['x-amz-bucket-region']

        # Next, check the error body
        region = service_response.get('Error', {}).get('Region', None)
        if region is not None:
            return region

        # Finally, HEAD the bucket. No other choice sadly.
        # Even a failed HEAD carries the bucket region header on the
        # error response.
        try:
            response = self._client.head_bucket(Bucket=bucket)
            headers = response['ResponseMetadata']['HTTPHeaders']
        except ClientError as e:
            headers = e.response['ResponseMetadata']['HTTPHeaders']

        region = headers.get('x-amz-bucket-region', None)
        return region

    def set_request_url(self, old_url, new_endpoint, **kwargs):
        """
        Splice a new endpoint into an existing URL. Note that some endpoints
        from the endpoint provider have a path component which will be
        discarded by this function.
        """
        return _get_new_endpoint(old_url, new_endpoint, False)

    def redirect_from_cache(self, builtins, params, **kwargs):
        """
        If a bucket name has been redirected before, it is in the cache. This
        handler will update the AWS::Region endpoint resolver builtin param
        to use the region from cache instead of the client region to avoid the
        redirect.
        """
        bucket = params.get('Bucket')
        if bucket is not None and bucket in self._cache:
            new_region = self._cache.get(bucket)
            builtins['AWS::Region'] = new_region

    def annotate_request_context(self, params, context, **kwargs):
        """Store the bucket name in context for later use when redirecting.
        The bucket name may be an access point ARN or alias.
        """
        bucket = params.get('Bucket')
        context['s3_redirect'] = {
            'redirected': False,
            'bucket': bucket,
            'params': params,
        }
class S3RegionRedirector:
    """This handler has been replaced by S3RegionRedirectorv2. The original
    version remains in place for any third-party libraries that import it.
    """

    def __init__(self, endpoint_bridge, client, cache=None):
        # ``endpoint_bridge`` must expose ``resolve(service, region)``
        # (used in redirect_from_error); ``cache`` maps bucket name ->
        # signing context dict and is shared across handler callbacks.
        self._endpoint_resolver = endpoint_bridge
        self._cache = cache
        if self._cache is None:
            self._cache = {}

        # This needs to be a weak ref in order to prevent memory leaks on
        # python 2.6
        self._client = weakref.proxy(client)

        warnings.warn(
            'The S3RegionRedirector class has been deprecated for a new '
            'internal replacement. A future version of botocore may remove '
            'this class.',
            category=FutureWarning,
        )

    def register(self, event_emitter=None):
        """Register the redirect callbacks on the client's event emitter.

        Falls back to the client's own emitter when none is given.
        """
        emitter = event_emitter or self._client.meta.events
        emitter.register('needs-retry.s3', self.redirect_from_error)
        emitter.register('before-call.s3', self.set_request_url)
        emitter.register('before-parameter-build.s3', self.redirect_from_cache)

    def redirect_from_error(self, request_dict, response, operation, **kwargs):
        """
        An S3 request sent to the wrong region will return an error that
        contains the endpoint the request should be sent to. This handler
        will add the redirect information to the signing context and then
        redirect the request.

        ``response`` is a tuple of (HTTP response object, parsed response
        dict); returns 0 to request an immediate retry, or None to take
        no action.
        """
        if response is None:
            # This could be none if there was a ConnectionError or other
            # transport error.
            return

        if self._is_s3_accesspoint(request_dict.get('context', {})):
            logger.debug(
                'S3 request was previously to an accesspoint, not redirecting.'
            )
            return

        if request_dict.get('context', {}).get('s3_redirected'):
            # Redirect at most once per request to avoid loops.
            logger.debug(
                'S3 request was previously redirected, not redirecting.'
            )
            return

        error = response[1].get('Error', {})
        error_code = error.get('Code')
        response_metadata = response[1].get('ResponseMetadata', {})

        # We have to account for 400 responses because
        # if we sign a Head* request with the wrong region,
        # we'll get a 400 Bad Request but we won't get a
        # body saying it's an "AuthorizationHeaderMalformed".
        is_special_head_object = (
            error_code in ('301', '400') and operation.name == 'HeadObject'
        )
        is_special_head_bucket = (
            error_code in ('301', '400')
            and operation.name == 'HeadBucket'
            and 'x-amz-bucket-region'
            in response_metadata.get('HTTPHeaders', {})
        )
        is_wrong_signing_region = (
            error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
        )
        is_redirect_status = response[0] is not None and response[
            0
        ].status_code in (301, 302, 307)
        is_permanent_redirect = error_code == 'PermanentRedirect'
        if not any(
            [
                is_special_head_object,
                is_wrong_signing_region,
                is_permanent_redirect,
                is_special_head_bucket,
                is_redirect_status,
            ]
        ):
            return

        bucket = request_dict['context']['signing']['bucket']
        client_region = request_dict['context'].get('client_region')
        new_region = self.get_bucket_region(bucket, response)

        if new_region is None:
            logger.debug(
                "S3 client configured for region %s but the bucket %s is not "
                "in that region and the proper region could not be "
                "automatically determined.",
                client_region,
                bucket,
            )
            return

        logger.debug(
            "S3 client configured for region %s but the bucket %s is in region"
            " %s; Please configure the proper region to avoid multiple "
            "unnecessary redirects and signing attempts.",
            client_region,
            bucket,
            new_region,
        )
        endpoint = self._endpoint_resolver.resolve('s3', new_region)
        endpoint = endpoint['endpoint_url']

        signing_context = {
            'region': new_region,
            'bucket': bucket,
            'endpoint': endpoint,
        }
        request_dict['context']['signing'] = signing_context

        # Remember the discovered region so later requests for this bucket
        # skip the failed attempt (see redirect_from_cache).
        self._cache[bucket] = signing_context
        self.set_request_url(request_dict, request_dict['context'])

        request_dict['context']['s3_redirected'] = True

        # Return 0 so it doesn't wait to retry
        return 0

    def get_bucket_region(self, bucket, response):
        """
        There are multiple potential sources for the new region to redirect to,
        but they aren't all universally available for use. This will try to
        find region from response elements, but will fall back to calling
        HEAD on the bucket if all else fails.

        :param bucket: The bucket to find the region for. This is necessary if
            the region is not available in the error response.
        :param response: A response representing a service request that failed
            due to incorrect region configuration.
        """
        # First try to source the region from the headers.
        service_response = response[1]
        response_headers = service_response['ResponseMetadata']['HTTPHeaders']
        if 'x-amz-bucket-region' in response_headers:
            return response_headers['x-amz-bucket-region']

        # Next, check the error body
        region = service_response.get('Error', {}).get('Region', None)
        if region is not None:
            return region

        # Finally, HEAD the bucket. No other choice sadly.
        try:
            response = self._client.head_bucket(Bucket=bucket)
            headers = response['ResponseMetadata']['HTTPHeaders']
        except ClientError as e:
            # Even a failed HEAD carries the region header in its response.
            headers = e.response['ResponseMetadata']['HTTPHeaders']

        region = headers.get('x-amz-bucket-region', None)
        return region

    def set_request_url(self, params, context, **kwargs):
        """Rewrite the request URL to the redirected endpoint, if any."""
        endpoint = context.get('signing', {}).get('endpoint', None)
        if endpoint is not None:
            params['url'] = _get_new_endpoint(params['url'], endpoint, False)

    def redirect_from_cache(self, params, context, **kwargs):
        """
        This handler retrieves a given bucket's signing context from the cache
        and adds it into the request context.
        """
        if self._is_s3_accesspoint(context):
            return
        bucket = params.get('Bucket')
        signing_context = self._cache.get(bucket)
        if signing_context is not None:
            context['signing'] = signing_context
        else:
            context['signing'] = {'bucket': bucket}

    def _is_s3_accesspoint(self, context):
        # Access point requests are never redirected by this handler.
        return 's3_accesspoint' in context
class InvalidArnException(ValueError):
    """Raised when a string cannot be parsed as a well-formed ARN."""
class ArnParser:
    """Split an ARN string into its named component fields."""

    def parse_arn(self, arn):
        """Return a dict with ``partition``, ``service``, ``region``,
        ``account`` and ``resource`` keys parsed from ``arn``.

        Raises :class:`InvalidArnException` when ``arn`` does not contain
        at least six colon-separated segments.
        """
        segments = arn.split(':', 5)
        if len(segments) < 6:
            raise InvalidArnException(
                f'Provided ARN: {arn} must be of the format: '
                'arn:partition:service:region:account:resource'
            )
        # segments[0] is the literal "arn" prefix and is discarded.
        field_names = ('partition', 'service', 'region', 'account', 'resource')
        return dict(zip(field_names, segments[1:]))

    @staticmethod
    def is_arn(value):
        """Return True if ``value`` is a string parseable as an ARN."""
        if not isinstance(value, str) or not value.startswith('arn:'):
            return False
        try:
            ArnParser().parse_arn(value)
        except InvalidArnException:
            return False
        return True
class S3ArnParamHandler:
    """Rewrite S3 ``Bucket`` parameters that are actually ARNs.

    When the ``Bucket`` parameter holds an access point or outpost ARN,
    substitute the access point name into the parameter (so request
    serialization validates) and stash the parsed ARN details on the
    request context for the endpoint setter to consume.
    """

    _RESOURCE_REGEX = re.compile(
        r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$'
    )
    _OUTPOST_RESOURCE_REGEX = re.compile(
        r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]'
        r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)'
    )
    # Operations that must never have their Bucket parameter rewritten.
    _BLACKLISTED_OPERATIONS = ['CreateBucket']

    def __init__(self, arn_parser=None):
        self._arn_parser = ArnParser() if arn_parser is None else arn_parser

    def register(self, event_emitter):
        event_emitter.register('before-parameter-build.s3', self.handle_arn)

    def handle_arn(self, params, model, context, **kwargs):
        """Detect an ARN in the ``Bucket`` param and record its details."""
        if model.name in self._BLACKLISTED_OPERATIONS:
            return
        details = self._get_arn_details_from_bucket_param(params)
        if details is None:
            return
        kind = details['resource_type']
        if kind == 'accesspoint':
            self._store_accesspoint(params, context, details)
        elif kind == 'outpost':
            self._store_outpost(params, context, details)

    def _get_arn_details_from_bucket_param(self, params):
        # Returns parsed ARN details, or None when the Bucket param is
        # absent or is not a valid ARN.
        if 'Bucket' not in params:
            return None
        candidate = params['Bucket']
        try:
            details = self._arn_parser.parse_arn(candidate)
            self._add_resource_type_and_name(candidate, details)
        except InvalidArnException:
            return None
        return details

    def _add_resource_type_and_name(self, arn, arn_details):
        match = self._RESOURCE_REGEX.match(arn_details['resource'])
        if not match:
            raise UnsupportedS3ArnError(arn=arn)
        arn_details['resource_type'] = match.group('resource_type')
        arn_details['resource_name'] = match.group('resource_name')

    def _store_accesspoint(self, params, context, arn_details):
        # Access points are not modeled in S3 operations, so the bucket
        # parameter is overwritten with the access point name to satisfy
        # serialization; the remaining ARN details ride on the context
        # for later endpoint construction.
        name = arn_details['resource_name']
        params['Bucket'] = name
        context['s3_accesspoint'] = {
            'name': name,
            'account': arn_details['account'],
            'partition': arn_details['partition'],
            'region': arn_details['region'],
            'service': arn_details['service'],
        }

    def _store_outpost(self, params, context, arn_details):
        resource_name = arn_details['resource_name']
        match = self._OUTPOST_RESOURCE_REGEX.match(resource_name)
        if not match:
            raise UnsupportedOutpostResourceError(resource_name=resource_name)
        # The access point name stands in for the bucket name, consistent
        # with plain access point ARNs.
        accesspoint_name = match.group('accesspoint_name')
        params['Bucket'] = accesspoint_name
        context['s3_accesspoint'] = {
            'outpost_name': match.group('outpost_name'),
            'name': accesspoint_name,
            'account': arn_details['account'],
            'partition': arn_details['partition'],
            'region': arn_details['region'],
            'service': arn_details['service'],
        }
class S3EndpointSetter:
    """Rewrites S3 request endpoints and signing context.

    Handles access point ARNs (including outposts and Multi-Region Access
    Points), S3 Accelerate, dualstack/FIPS endpoint variants, and virtual
    host vs path style addressing. Registered on ``before-sign.s3``,
    ``choose-signer.s3``, and ``before-call.s3.WriteGetObjectResponse``.
    """

    _DEFAULT_PARTITION = 'aws'
    _DEFAULT_DNS_SUFFIX = 'amazonaws.com'

    def __init__(
        self,
        endpoint_resolver,
        region=None,
        s3_config=None,
        endpoint_url=None,
        partition=None,
        use_fips_endpoint=False,
    ):
        # This is calling the endpoint_resolver in regions.py
        self._endpoint_resolver = endpoint_resolver
        self._region = region
        self._s3_config = s3_config
        self._use_fips_endpoint = use_fips_endpoint
        if s3_config is None:
            self._s3_config = {}
        self._endpoint_url = endpoint_url
        self._partition = partition
        if partition is None:
            self._partition = self._DEFAULT_PARTITION

    def register(self, event_emitter):
        """Hook this setter into the client's event system."""
        event_emitter.register('before-sign.s3', self.set_endpoint)
        event_emitter.register('choose-signer.s3', self.set_signer)
        event_emitter.register(
            'before-call.s3.WriteGetObjectResponse',
            self.update_endpoint_to_s3_object_lambda,
        )

    def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs):
        """Point WriteGetObjectResponse at the s3-object-lambda endpoint."""
        if self._use_accelerate_endpoint:
            raise UnsupportedS3ConfigurationError(
                msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations',
            )

        self._override_signing_name(context, 's3-object-lambda')
        if self._endpoint_url:
            # An explicit endpoint_url takes precedence; leave the URL
            # untouched (only the signing name override above applies).
            return

        resolver = self._endpoint_resolver
        # Constructing endpoints as s3-object-lambda as region
        resolved = resolver.construct_endpoint(
            's3-object-lambda', self._region
        )

        # Ideally we would be able to replace the endpoint before
        # serialization but there's no event to do that currently
        # host_prefix is all the arn/bucket specs
        new_endpoint = 'https://{host_prefix}{hostname}'.format(
            host_prefix=params['host_prefix'],
            hostname=resolved['hostname'],
        )

        params['url'] = _get_new_endpoint(params['url'], new_endpoint, False)

    def set_endpoint(self, request, **kwargs):
        """Apply access point, accelerate, or addressing-style rules to an
        outgoing request's URL and signing context."""
        if self._use_accesspoint_endpoint(request):
            self._validate_accesspoint_supported(request)
            self._validate_fips_supported(request)
            self._validate_global_regions(request)
            region_name = self._resolve_region_for_accesspoint_endpoint(
                request
            )
            self._resolve_signing_name_for_accesspoint_endpoint(request)
            self._switch_to_accesspoint_endpoint(request, region_name)
            return
        if self._use_accelerate_endpoint:
            if self._use_fips_endpoint:
                raise UnsupportedS3ConfigurationError(
                    msg=(
                        'Client is configured to use the FIPS psuedo region '
                        f'for "{self._region}", but S3 Accelerate does not have any FIPS '
                        'compatible endpoints.'
                    )
                )
            switch_host_s3_accelerate(request=request, **kwargs)
        if self._s3_addressing_handler:
            self._s3_addressing_handler(request=request, **kwargs)

    def _use_accesspoint_endpoint(self, request):
        # Set by S3ArnParamHandler when the Bucket param was an ARN.
        return 's3_accesspoint' in request.context

    def _validate_fips_supported(self, request):
        """Reject access point configurations incompatible with FIPS."""
        if not self._use_fips_endpoint:
            return
        if 'fips' in request.context['s3_accesspoint']['region']:
            # Bugfix: msg was previously passed as a one-element set
            # literal ({'...'}), which rendered as a set in the error
            # text. Pass the plain string, consistent with the identical
            # message in S3ControlEndpointSetter.
            raise UnsupportedS3AccesspointConfigurationError(
                msg='Invalid ARN, FIPS region not allowed in ARN.'
            )
        if 'outpost_name' in request.context['s3_accesspoint']:
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    f'Client is configured to use the FIPS psuedo-region "{self._region}", '
                    'but outpost ARNs do not support FIPS endpoints.'
                )
            )
        # Transforming psuedo region to actual region
        accesspoint_region = request.context['s3_accesspoint']['region']
        if accesspoint_region != self._region:
            if not self._s3_config.get('use_arn_region', True):
                # TODO: Update message to reflect use_arn_region
                # is not set
                raise UnsupportedS3AccesspointConfigurationError(
                    msg=(
                        'Client is configured to use the FIPS psuedo-region '
                        f'for "{self._region}", but the access-point ARN provided is for '
                        f'the "{accesspoint_region}" region. For clients using a FIPS '
                        'psuedo-region calls to access-point ARNs in another '
                        'region are not allowed.'
                    )
                )

    def _validate_global_regions(self, request):
        """Global pseudo-regions cannot be combined with access point ARNs
        unless the ARN's own region is being used."""
        if self._s3_config.get('use_arn_region', True):
            return
        if self._region in ['aws-global', 's3-external-1']:
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Client is configured to use the global psuedo-region '
                    f'"{self._region}". When providing access-point ARNs a regional '
                    'endpoint must be specified.'
                )
            )

    def _validate_accesspoint_supported(self, request):
        """Reject client configurations that cannot be combined with the
        access point ARN carried on the request context."""
        if self._use_accelerate_endpoint:
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Client does not support s3 accelerate configuration '
                    'when an access-point ARN is specified.'
                )
            )
        request_partition = request.context['s3_accesspoint']['partition']
        if request_partition != self._partition:
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    f'Client is configured for "{self._partition}" partition, but access-point'
                    f' ARN provided is for "{request_partition}" partition. The client and '
                    ' access-point partition must be the same.'
                )
            )
        s3_service = request.context['s3_accesspoint'].get('service')
        if s3_service == 's3-object-lambda' and self._s3_config.get(
            'use_dualstack_endpoint'
        ):
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Client does not support s3 dualstack configuration '
                    'when an S3 Object Lambda access point ARN is specified.'
                )
            )
        outpost_name = request.context['s3_accesspoint'].get('outpost_name')
        if outpost_name and self._s3_config.get('use_dualstack_endpoint'):
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Client does not support s3 dualstack configuration '
                    'when an outpost ARN is specified.'
                )
            )
        self._validate_mrap_s3_config(request)

    def _validate_mrap_s3_config(self, request):
        """Additional validation for Multi-Region Access Point ARNs."""
        if not is_global_accesspoint(request.context):
            return
        if self._s3_config.get('s3_disable_multiregion_access_points'):
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Invalid configuration, Multi-Region Access Point '
                    'ARNs are disabled.'
                )
            )
        elif self._s3_config.get('use_dualstack_endpoint'):
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Client does not support s3 dualstack configuration '
                    'when a Multi-Region Access Point ARN is specified.'
                )
            )

    def _resolve_region_for_accesspoint_endpoint(self, request):
        """Pick the region (and signing region) for an access point request."""
        if is_global_accesspoint(request.context):
            # Requests going to MRAP endpoints MUST be set to any (*) region.
            self._override_signing_region(request, '*')
        elif self._s3_config.get('use_arn_region', True):
            accesspoint_region = request.context['s3_accesspoint']['region']
            # If we are using the region from the access point,
            # we will also want to make sure that we set it as the
            # signing region as well
            self._override_signing_region(request, accesspoint_region)
            return accesspoint_region
        return self._region

    def set_signer(self, context, **kwargs):
        """Choose SigV4a for MRAP requests; requires the CRT extra."""
        if is_global_accesspoint(context):
            if HAS_CRT:
                return 's3v4a'
            else:
                raise MissingDependencyException(
                    msg="Using S3 with an MRAP arn requires an additional "
                    "dependency. You will need to pip install "
                    "botocore[crt] before proceeding."
                )

    def _resolve_signing_name_for_accesspoint_endpoint(self, request):
        accesspoint_service = request.context['s3_accesspoint']['service']
        self._override_signing_name(request.context, accesspoint_service)

    def _switch_to_accesspoint_endpoint(self, request, region_name):
        """Rebuild the request URL against the access point's netloc."""
        original_components = urlsplit(request.url)
        accesspoint_endpoint = urlunsplit(
            (
                original_components.scheme,
                self._get_netloc(request.context, region_name),
                self._get_accesspoint_path(
                    original_components.path, request.context
                ),
                original_components.query,
                '',
            )
        )
        logger.debug(
            'Updating URI from %s to %s', request.url, accesspoint_endpoint
        )
        request.url = accesspoint_endpoint

    def _get_netloc(self, request_context, region_name):
        if is_global_accesspoint(request_context):
            return self._get_mrap_netloc(request_context)
        else:
            return self._get_accesspoint_netloc(request_context, region_name)

    def _get_mrap_netloc(self, request_context):
        """Build the host for a Multi-Region Access Point request."""
        s3_accesspoint = request_context['s3_accesspoint']
        region_name = 's3-global'
        mrap_netloc_components = [s3_accesspoint['name']]
        if self._endpoint_url:
            endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
            mrap_netloc_components.append(endpoint_url_netloc)
        else:
            partition = s3_accesspoint['partition']
            mrap_netloc_components.extend(
                [
                    'accesspoint',
                    region_name,
                    self._get_partition_dns_suffix(partition),
                ]
            )
        return '.'.join(mrap_netloc_components)

    def _get_accesspoint_netloc(self, request_context, region_name):
        """Build the host for a regional (or outpost) access point request."""
        s3_accesspoint = request_context['s3_accesspoint']
        accesspoint_netloc_components = [
            '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']),
        ]
        outpost_name = s3_accesspoint.get('outpost_name')
        if self._endpoint_url:
            if outpost_name:
                accesspoint_netloc_components.append(outpost_name)
            endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
            accesspoint_netloc_components.append(endpoint_url_netloc)
        else:
            if outpost_name:
                outpost_host = [outpost_name, 's3-outposts']
                accesspoint_netloc_components.extend(outpost_host)
            elif s3_accesspoint['service'] == 's3-object-lambda':
                component = self._inject_fips_if_needed(
                    's3-object-lambda', request_context
                )
                accesspoint_netloc_components.append(component)
            else:
                component = self._inject_fips_if_needed(
                    's3-accesspoint', request_context
                )
                accesspoint_netloc_components.append(component)
            if self._s3_config.get('use_dualstack_endpoint'):
                accesspoint_netloc_components.append('dualstack')
            accesspoint_netloc_components.extend(
                [region_name, self._get_dns_suffix(region_name)]
            )
        return '.'.join(accesspoint_netloc_components)

    def _inject_fips_if_needed(self, component, request_context):
        if self._use_fips_endpoint:
            return f'{component}-fips'
        return component

    def _get_accesspoint_path(self, original_path, request_context):
        # The Bucket parameter was substituted with the access-point name as
        # some value was required in serializing the bucket name. Now that
        # we are making the request directly to the access point, we will
        # want to remove that access-point name from the path.
        name = request_context['s3_accesspoint']['name']
        # All S3 operations require at least a / in their path.
        return original_path.replace('/' + name, '', 1) or '/'

    def _get_partition_dns_suffix(self, partition_name):
        dns_suffix = self._endpoint_resolver.get_partition_dns_suffix(
            partition_name
        )
        if dns_suffix is None:
            dns_suffix = self._DEFAULT_DNS_SUFFIX
        return dns_suffix

    def _get_dns_suffix(self, region_name):
        resolved = self._endpoint_resolver.construct_endpoint(
            's3', region_name
        )
        dns_suffix = self._DEFAULT_DNS_SUFFIX
        if resolved and 'dnsSuffix' in resolved:
            dns_suffix = resolved['dnsSuffix']
        return dns_suffix

    def _override_signing_region(self, request, region_name):
        signing_context = request.context.get('signing', {})
        # S3SigV4Auth will use the context['signing']['region'] value to
        # sign with if present. This is used by the Bucket redirector
        # as well but we should be fine because the redirector is never
        # used in combination with the accesspoint setting logic.
        signing_context['region'] = region_name
        request.context['signing'] = signing_context

    def _override_signing_name(self, context, signing_name):
        signing_context = context.get('signing', {})
        # S3SigV4Auth will use the context['signing']['signing_name'] value to
        # sign with if present. This is used by the Bucket redirector
        # as well but we should be fine because the redirector is never
        # used in combination with the accesspoint setting logic.
        signing_context['signing_name'] = signing_name
        context['signing'] = signing_context

    @CachedProperty
    def _use_accelerate_endpoint(self):
        # Enable accelerate if the configuration is set to true or the
        # endpoint being used matches one of the accelerate endpoints.

        # Accelerate has been explicitly configured.
        if self._s3_config.get('use_accelerate_endpoint'):
            return True

        # Accelerate mode is turned on automatically if an endpoint url is
        # provided that matches the accelerate scheme.
        if self._endpoint_url is None:
            return False

        # Accelerate is only valid for Amazon endpoints.
        netloc = urlsplit(self._endpoint_url).netloc
        if not netloc.endswith('amazonaws.com'):
            return False

        # The first part of the url should always be s3-accelerate.
        parts = netloc.split('.')
        if parts[0] != 's3-accelerate':
            return False

        # Url parts between 's3-accelerate' and 'amazonaws.com' which
        # represent different url features.
        feature_parts = parts[1:-2]

        # There should be no duplicate url parts.
        if len(feature_parts) != len(set(feature_parts)):
            return False

        # Remaining parts must all be in the whitelist.
        return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts)

    @CachedProperty
    def _addressing_style(self):
        # Use virtual host style addressing if accelerate is enabled or if
        # the given endpoint url is an accelerate endpoint.
        if self._use_accelerate_endpoint:
            return 'virtual'

        # If a particular addressing style is configured, use it.
        configured_addressing_style = self._s3_config.get('addressing_style')
        if configured_addressing_style:
            return configured_addressing_style

    @CachedProperty
    def _s3_addressing_handler(self):
        # If virtual host style was configured, use it regardless of whether
        # or not the bucket looks dns compatible.
        if self._addressing_style == 'virtual':
            logger.debug("Using S3 virtual host style addressing.")
            return switch_to_virtual_host_style

        # If path style is configured, no additional steps are needed. If
        # endpoint_url was specified, don't default to virtual. We could
        # potentially default provided endpoint urls to virtual hosted
        # style, but for now it is avoided.
        if self._addressing_style == 'path' or self._endpoint_url is not None:
            logger.debug("Using S3 path style addressing.")
            return None

        logger.debug(
            "Defaulting to S3 virtual host style addressing with "
            "path style addressing fallback."
        )

        # By default, try to use virtual style with path fallback.
        return fix_s3_host
class S3ControlEndpointSetter:
    """Sets request endpoints for S3 Control based on ARN details.

    Registered on ``before-sign.s3-control``; rewrites the request URL
    (and signing context) when the request context carries expanded ARN
    details or an outpost id.
    """

    _DEFAULT_PARTITION = 'aws'
    _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
    # A valid DNS host label: 1-63 characters of alphanumerics/hyphens.
    _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$')

    def __init__(
        self,
        endpoint_resolver,
        region=None,
        s3_config=None,
        endpoint_url=None,
        partition=None,
        use_fips_endpoint=False,
    ):
        self._endpoint_resolver = endpoint_resolver
        self._region = region
        self._s3_config = s3_config
        self._use_fips_endpoint = use_fips_endpoint
        if s3_config is None:
            self._s3_config = {}
        self._endpoint_url = endpoint_url
        self._partition = partition
        if partition is None:
            self._partition = self._DEFAULT_PARTITION

    def register(self, event_emitter):
        """Hook the endpoint rewriting into the client's event system."""
        event_emitter.register('before-sign.s3-control', self.set_endpoint)

    def set_endpoint(self, request, **kwargs):
        """Rewrite the request endpoint from ARN details or an outpost id.

        ARN-based redirection takes precedence; outpost-id redirection
        covers operations that carry an OutpostId instead of an ARN.
        """
        if self._use_endpoint_from_arn_details(request):
            self._validate_endpoint_from_arn_details_supported(request)
            region_name = self._resolve_region_from_arn_details(request)
            self._resolve_signing_name_from_arn_details(request)
            self._resolve_endpoint_from_arn_details(request, region_name)
            self._add_headers_from_arn_details(request)
        elif self._use_endpoint_from_outpost_id(request):
            self._validate_outpost_redirection_valid(request)
            self._override_signing_name(request, 's3-outposts')
            new_netloc = self._construct_outpost_endpoint(self._region)
            self._update_request_netloc(request, new_netloc)

    def _use_endpoint_from_arn_details(self, request):
        return 'arn_details' in request.context

    def _use_endpoint_from_outpost_id(self, request):
        return 'outpost_id' in request.context

    def _validate_endpoint_from_arn_details_supported(self, request):
        """Reject ARN/client combinations that cannot be served."""
        if 'fips' in request.context['arn_details']['region']:
            raise UnsupportedS3ControlArnError(
                arn=request.context['arn_details']['original'],
                msg='Invalid ARN, FIPS region not allowed in ARN.',
            )
        if not self._s3_config.get('use_arn_region', False):
            arn_region = request.context['arn_details']['region']
            if arn_region != self._region:
                error_msg = (
                    'The use_arn_region configuration is disabled but '
                    f'received arn for "{arn_region}" when the client is configured '
                    f'to use "{self._region}"'
                )
                raise UnsupportedS3ControlConfigurationError(msg=error_msg)
        request_partion = request.context['arn_details']['partition']
        if request_partion != self._partition:
            raise UnsupportedS3ControlConfigurationError(
                msg=(
                    f'Client is configured for "{self._partition}" partition, but arn '
                    f'provided is for "{request_partion}" partition. The client and '
                    'arn partition must be the same.'
                )
            )
        if self._s3_config.get('use_accelerate_endpoint'):
            raise UnsupportedS3ControlConfigurationError(
                msg='S3 control client does not support accelerate endpoints',
            )
        if 'outpost_name' in request.context['arn_details']:
            self._validate_outpost_redirection_valid(request)

    def _validate_outpost_redirection_valid(self, request):
        # Outpost endpoints have no dualstack variant.
        if self._s3_config.get('use_dualstack_endpoint'):
            raise UnsupportedS3ControlConfigurationError(
                msg=(
                    'Client does not support s3 dualstack configuration '
                    'when an outpost is specified.'
                )
            )

    def _resolve_region_from_arn_details(self, request):
        """Return the region to use, honoring the use_arn_region config."""
        if self._s3_config.get('use_arn_region', False):
            arn_region = request.context['arn_details']['region']
            # If we are using the region from the expanded arn, we will also
            # want to make sure that we set it as the signing region as well
            self._override_signing_region(request, arn_region)
            return arn_region
        return self._region

    def _resolve_signing_name_from_arn_details(self, request):
        arn_service = request.context['arn_details']['service']
        self._override_signing_name(request, arn_service)
        return arn_service

    def _resolve_endpoint_from_arn_details(self, request, region_name):
        new_netloc = self._resolve_netloc_from_arn_details(
            request, region_name
        )
        self._update_request_netloc(request, new_netloc)

    def _update_request_netloc(self, request, new_netloc):
        """Swap the netloc of the request URL, preserving the rest."""
        original_components = urlsplit(request.url)
        arn_details_endpoint = urlunsplit(
            (
                original_components.scheme,
                new_netloc,
                original_components.path,
                original_components.query,
                '',
            )
        )
        logger.debug(
            'Updating URI from %s to %s', request.url, arn_details_endpoint
        )
        request.url = arn_details_endpoint

    def _resolve_netloc_from_arn_details(self, request, region_name):
        arn_details = request.context['arn_details']
        if 'outpost_name' in arn_details:
            return self._construct_outpost_endpoint(region_name)
        account = arn_details['account']
        return self._construct_s3_control_endpoint(region_name, account)

    def _is_valid_host_label(self, label):
        return self._HOST_LABEL_REGEX.match(label)

    def _validate_host_labels(self, *labels):
        # Each value becomes a DNS label in the constructed host; reject
        # anything that would produce an invalid hostname.
        for label in labels:
            if not self._is_valid_host_label(label):
                raise InvalidHostLabelError(label=label)

    def _construct_s3_control_endpoint(self, region_name, account):
        """Build the host for account-scoped S3 Control requests."""
        self._validate_host_labels(region_name, account)
        if self._endpoint_url:
            endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
            netloc = [account, endpoint_url_netloc]
        else:
            netloc = [
                account,
                's3-control',
            ]
            self._add_dualstack(netloc)
            dns_suffix = self._get_dns_suffix(region_name)
            netloc.extend([region_name, dns_suffix])
        return self._construct_netloc(netloc)

    def _construct_outpost_endpoint(self, region_name):
        """Build the host for outpost-directed S3 Control requests."""
        self._validate_host_labels(region_name)
        if self._endpoint_url:
            return urlsplit(self._endpoint_url).netloc
        else:
            netloc = [
                's3-outposts',
                region_name,
                self._get_dns_suffix(region_name),
            ]
            self._add_fips(netloc)
            return self._construct_netloc(netloc)

    def _construct_netloc(self, netloc):
        return '.'.join(netloc)

    def _add_fips(self, netloc):
        # FIPS is expressed as a suffix on the leading service label.
        if self._use_fips_endpoint:
            netloc[0] = netloc[0] + '-fips'

    def _add_dualstack(self, netloc):
        if self._s3_config.get('use_dualstack_endpoint'):
            netloc.append('dualstack')

    def _get_dns_suffix(self, region_name):
        # Fall back to the default suffix when the resolver has no entry.
        resolved = self._endpoint_resolver.construct_endpoint(
            's3', region_name
        )
        dns_suffix = self._DEFAULT_DNS_SUFFIX
        if resolved and 'dnsSuffix' in resolved:
            dns_suffix = resolved['dnsSuffix']
        return dns_suffix

    def _override_signing_region(self, request, region_name):
        signing_context = request.context.get('signing', {})
        # S3SigV4Auth will use the context['signing']['region'] value to
        # sign with if present. This is used by the Bucket redirector
        # as well but we should be fine because the redirector is never
        # used in combination with the accesspoint setting logic.
        signing_context['region'] = region_name
        request.context['signing'] = signing_context

    def _override_signing_name(self, request, signing_name):
        signing_context = request.context.get('signing', {})
        # S3SigV4Auth will use the context['signing']['signing_name'] value to
        # sign with if present. This is used by the Bucket redirector
        # as well but we should be fine because the redirector is never
        # used in combination with the accesspoint setting logic.
        signing_context['signing_name'] = signing_name
        request.context['signing'] = signing_context

    def _add_headers_from_arn_details(self, request):
        arn_details = request.context['arn_details']
        outpost_name = arn_details.get('outpost_name')
        if outpost_name:
            self._add_outpost_id_header(request, outpost_name)

    def _add_outpost_id_header(self, request, outpost_name):
        request.headers['x-amz-outpost-id'] = outpost_name
class S3ControlArnParamHandler:
    """Deprecated ARN handler for S3 Control request parameters.

    This handler has been replaced by S3ControlArnParamHandlerv2; the
    original version remains in place for any third-party importers.
    """

    # Splits an ARN resource like "outpost/op-123/accesspoint/name" into
    # its components (both '/' and ':' delimiters appear in the wild).
    _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]')

    def __init__(self, arn_parser=None):
        self._arn_parser = ArnParser() if arn_parser is None else arn_parser
        warnings.warn(
            'The S3ControlArnParamHandler class has been deprecated for a new '
            'internal replacement. A future version of botocore may remove '
            'this class.',
            category=FutureWarning,
        )

    def register(self, event_emitter):
        """Subscribe ARN expansion to parameter build for s3-control."""
        event_emitter.register(
            'before-parameter-build.s3-control',
            self.handle_arn,
        )

    def handle_arn(self, params, model, context, **kwargs):
        """Expand any ARN values present in the request parameters."""
        # CreateBucket and ListRegionalBuckets are special cases that do
        # not obey ARN based redirection but will redirect based off of
        # the presence of the OutpostId parameter.
        if model.name in ('CreateBucket', 'ListRegionalBuckets'):
            self._handle_outpost_id_param(params, model, context)
            return
        self._handle_name_param(params, model, context)
        self._handle_bucket_param(params, model, context)

    def _get_arn_details_from_param(self, params, param_name):
        # Returns parsed ARN details for the named parameter, or None when
        # the parameter is absent or does not hold a valid ARN.
        if param_name not in params:
            return None
        candidate = params[param_name]
        try:
            details = self._arn_parser.parse_arn(candidate)
        except InvalidArnException:
            return None
        details['original'] = candidate
        details['resources'] = self._split_resource(details)
        return details

    def _split_resource(self, arn_details):
        return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource'])

    def _override_account_id_param(self, params, arn_details):
        # Copies the ARN's account into AccountId, refusing a conflicting
        # explicitly-supplied AccountId.
        account_id = arn_details['account']
        _missing = object()
        supplied = params.get('AccountId', _missing)
        if supplied is not _missing and supplied != account_id:
            raise UnsupportedS3ControlArnError(
                arn=arn_details['original'],
                msg=(
                    'Account ID in arn does not match the AccountId parameter '
                    'provided: "{}"'
                ).format(params['AccountId']),
            )
        params['AccountId'] = account_id

    def _handle_outpost_id_param(self, params, model, context):
        if 'OutpostId' in params:
            context['outpost_id'] = params['OutpostId']

    def _handle_name_param(self, params, model, context):
        # CreateAccessPoint is a special case that does not expand Name.
        if model.name == 'CreateAccessPoint':
            return
        details = self._get_arn_details_from_param(params, 'Name')
        if details is None:
            return
        if not self._is_outpost_accesspoint(details):
            raise UnsupportedS3ControlArnError(
                arn=details['original'],
                msg='The Name parameter does not support the provided ARN',
            )
        self._store_outpost_accesspoint(params, context, details)

    def _is_outpost_accesspoint(self, arn_details):
        # Resource must be of the form outpost/op-123/accesspoint/name
        resources = arn_details['resources']
        return (
            arn_details['service'] == 's3-outposts'
            and len(resources) == 4
            and resources[0] == 'outpost'
            and resources[2] == 'accesspoint'
        )

    def _store_outpost_accesspoint(self, params, context, arn_details):
        self._override_account_id_param(params, arn_details)
        accesspoint_name = arn_details['resources'][3]
        params['Name'] = accesspoint_name
        arn_details['accesspoint_name'] = accesspoint_name
        arn_details['outpost_name'] = arn_details['resources'][1]
        context['arn_details'] = arn_details

    def _handle_bucket_param(self, params, model, context):
        details = self._get_arn_details_from_param(params, 'Bucket')
        if details is None:
            return
        if not self._is_outpost_bucket(details):
            raise UnsupportedS3ControlArnError(
                arn=details['original'],
                msg='The Bucket parameter does not support the provided ARN',
            )
        self._store_outpost_bucket(params, context, details)

    def _is_outpost_bucket(self, arn_details):
        # Resource must be of the form outpost/op-123/bucket/name
        resources = arn_details['resources']
        return (
            arn_details['service'] == 's3-outposts'
            and len(resources) == 4
            and resources[0] == 'outpost'
            and resources[2] == 'bucket'
        )

    def _store_outpost_bucket(self, params, context, arn_details):
        self._override_account_id_param(params, arn_details)
        bucket_name = arn_details['resources'][3]
        params['Bucket'] = bucket_name
        arn_details['bucket_name'] = bucket_name
        arn_details['outpost_name'] = arn_details['resources'][1]
        context['arn_details'] = arn_details
class S3ControlArnParamHandlerv2(S3ControlArnParamHandler):
    """Updated version of S3ControlArnParamHandler for use when
    EndpointRulesetResolver is in use for endpoint resolution.

    This class is considered private and subject to abrupt breaking changes or
    removal without prior announcement. Please do not use it directly.
    """

    def __init__(self, arn_parser=None):
        # Deliberately does NOT call the parent __init__, which would emit
        # a deprecation FutureWarning.
        self._arn_parser = arn_parser
        if arn_parser is None:
            self._arn_parser = ArnParser()

    def register(self, event_emitter):
        # Unlike the v1 handler, ARNs are expanded before endpoint
        # resolution rather than during parameter build.
        event_emitter.register(
            'before-endpoint-resolution.s3-control',
            self.handle_arn,
        )

    def _handle_name_param(self, params, model, context):
        # CreateAccessPoint is a special case that does not expand Name
        if model.name == 'CreateAccessPoint':
            return
        arn_details = self._get_arn_details_from_param(params, 'Name')
        if arn_details is None:
            return
        self._raise_for_fips_pseudo_region(arn_details)
        self._raise_for_accelerate_endpoint(context)
        if self._is_outpost_accesspoint(arn_details):
            self._store_outpost_accesspoint(params, context, arn_details)
        else:
            error_msg = 'The Name parameter does not support the provided ARN'
            raise UnsupportedS3ControlArnError(
                arn=arn_details['original'],
                msg=error_msg,
            )

    def _store_outpost_accesspoint(self, params, context, arn_details):
        # The endpoint ruleset consumes the raw ARN itself; only the
        # AccountId needs reconciling here (v1 also rewrote Name).
        self._override_account_id_param(params, arn_details)

    def _handle_bucket_param(self, params, model, context):
        arn_details = self._get_arn_details_from_param(params, 'Bucket')
        if arn_details is None:
            return
        self._raise_for_fips_pseudo_region(arn_details)
        self._raise_for_accelerate_endpoint(context)
        if self._is_outpost_bucket(arn_details):
            self._store_outpost_bucket(params, context, arn_details)
        else:
            error_msg = (
                'The Bucket parameter does not support the provided ARN'
            )
            raise UnsupportedS3ControlArnError(
                arn=arn_details['original'],
                msg=error_msg,
            )

    def _store_outpost_bucket(self, params, context, arn_details):
        # The endpoint ruleset consumes the raw ARN itself; only the
        # AccountId needs reconciling here (v1 also rewrote Bucket).
        self._override_account_id_param(params, arn_details)

    def _raise_for_fips_pseudo_region(self, arn_details):
        # FIPS pseudo region names cannot be used in ARNs.
        # Bug fix: the suffix check previously used endswith('fips-'),
        # which can never match a trailing pseudo-region like
        # 'us-gov-west-1-fips'; the correct suffix is '-fips'.
        arn_region = arn_details['region']
        if arn_region.startswith('fips-') or arn_region.endswith('-fips'):
            raise UnsupportedS3ControlArnError(
                arn=arn_details['original'],
                msg='Invalid ARN, FIPS region not allowed in ARN.',
            )

    def _raise_for_accelerate_endpoint(self, context):
        # S3 Accelerate cannot be combined with the s3-control service.
        s3_config = context['client_config'].s3 or {}
        if s3_config.get('use_accelerate_endpoint'):
            raise UnsupportedS3ControlConfigurationError(
                msg='S3 control client does not support accelerate endpoints',
            )
class ContainerMetadataFetcher:
    """Fetches JSON documents from the container (ECS/EKS) metadata
    endpoint, retrying transient failures."""

    # Per-request socket timeout, in seconds.
    TIMEOUT_SECONDS = 2
    # Total attempts before the final MetadataRetrievalError is re-raised.
    RETRY_ATTEMPTS = 3
    # Seconds slept between attempts.
    SLEEP_TIME = 1
    # Default ECS container credentials endpoint IP.
    IP_ADDRESS = '169.254.170.2'
    # Non-loopback hosts that metadata may be retrieved from.
    _ALLOWED_HOSTS = [
        IP_ADDRESS,
        '169.254.170.23',
        'fd00:ec2::23',
        'localhost',
    ]

    def __init__(self, session=None, sleep=time.sleep):
        # Both the HTTP session and the sleep function are injectable to
        # make retry behavior testable.
        if session is None:
            session = botocore.httpsession.URLLib3Session(
                timeout=self.TIMEOUT_SECONDS
            )
        self._session = session
        self._sleep = sleep

    def retrieve_full_uri(self, full_url, headers=None):
        """Retrieve JSON metadata from container metadata.

        :type full_url: str
        :param full_url: The full URL of the metadata service.
            This should include the scheme as well, e.g
            "http://localhost:123/foo"

        :raises ValueError: if the URL's host is not an allowed metadata
            host or a loopback address.
        """
        self._validate_allowed_url(full_url)
        return self._retrieve_credentials(full_url, headers)

    def _validate_allowed_url(self, full_url):
        # Reject URLs whose host is neither a loopback address nor one of
        # the explicitly allowed metadata hosts.
        parsed = botocore.compat.urlparse(full_url)
        if self._is_loopback_address(parsed.hostname):
            return
        is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname)
        if not is_whitelisted_host:
            raise ValueError(
                f"Unsupported host '{parsed.hostname}'. Can only retrieve metadata "
                f"from a loopback address or one of these hosts: {', '.join(self._ALLOWED_HOSTS)}"
            )

    def _is_loopback_address(self, hostname):
        # Non-IP-literal hostnames make ip_address() raise ValueError and
        # are treated as not-loopback.
        try:
            ip = ip_address(hostname)
            return ip.is_loopback
        except ValueError:
            return False

    def _check_if_whitelisted_host(self, host):
        # Exact-match check against the allowed host list.
        if host in self._ALLOWED_HOSTS:
            return True
        return False

    def retrieve_uri(self, relative_uri):
        """Retrieve JSON metadata from container metadata.

        :type relative_uri: str
        :param relative_uri: A relative URI, e.g "/foo/bar?id=123"

        :return: The parsed JSON response.

        """
        full_url = self.full_url(relative_uri)
        return self._retrieve_credentials(full_url)

    def _retrieve_credentials(self, full_url, extra_headers=None):
        # GETs the URL, retrying up to RETRY_ATTEMPTS times with a fixed
        # sleep between attempts; the final failure is re-raised.
        headers = {'Accept': 'application/json'}
        if extra_headers is not None:
            headers.update(extra_headers)
        attempts = 0
        while True:
            try:
                return self._get_response(
                    full_url, headers, self.TIMEOUT_SECONDS
                )
            except MetadataRetrievalError as e:
                logger.debug(
                    "Received error when attempting to retrieve "
                    "container metadata: %s",
                    e,
                    exc_info=True,
                )
                self._sleep(self.SLEEP_TIME)
                attempts += 1
                if attempts >= self.RETRY_ATTEMPTS:
                    raise

    def _get_response(self, full_url, headers, timeout):
        # Performs a single GET and normalizes transport errors, non-200
        # responses, and unparsable bodies into MetadataRetrievalError so
        # the retry loop has a single exception type to handle.
        try:
            AWSRequest = botocore.awsrequest.AWSRequest
            request = AWSRequest(method='GET', url=full_url, headers=headers)
            response = self._session.send(request.prepare())
            response_text = response.content.decode('utf-8')
            if response.status_code != 200:
                raise MetadataRetrievalError(
                    error_msg=(
                        f"Received non 200 response {response.status_code} "
                        f"from container metadata: {response_text}"
                    )
                )
            try:
                return json.loads(response_text)
            except ValueError:
                error_msg = "Unable to parse JSON returned from container metadata services"
                logger.debug('%s:%s', error_msg, response_text)
                raise MetadataRetrievalError(error_msg=error_msg)
        except RETRYABLE_HTTP_ERRORS as e:
            error_msg = (
                "Received error when attempting to retrieve "
                f"container metadata: {e}"
            )
            raise MetadataRetrievalError(error_msg=error_msg)

    def full_url(self, relative_uri):
        # Joins a relative URI onto the default metadata IP address.
        return f'http://{self.IP_ADDRESS}{relative_uri}'
def get_environ_proxies(url):
    """Return the environment's proxy mapping, honoring bypass rules."""
    return {} if should_bypass_proxies(url) else getproxies()
def should_bypass_proxies(url):
    """
    Returns whether we should bypass proxies or not.
    """
    # NOTE: requests allowed for ip/cidr entries in no_proxy env that we
    # don't support currently, as urllib only checks the DNS suffix.
    # proxy_bypass is known to fail in specific benign ways on some
    # platforms (historically OS X), so only those exceptions are
    # swallowed; failing differently can reveal legitimate problems.
    try:
        return bool(proxy_bypass(urlparse(url).netloc))
    except (TypeError, socket.gaierror):
        return False
def determine_content_length(body):
    """Best-effort computation of a request body's content length.

    Returns 0 for an empty/missing body, ``len(body)`` when available,
    the number of remaining bytes for a seekable stream, or None when the
    length cannot be determined (callers then fall back to chunked
    transfer encoding).
    """
    if not body:
        return 0

    try:
        return len(body)
    except (AttributeError, TypeError):
        pass

    if hasattr(body, 'seek') and hasattr(body, 'tell'):
        try:
            current = body.tell()
            body.seek(0, 2)  # jump to end of stream
            end_pos = body.tell()
            body.seek(current)  # restore the caller's position
            return end_pos - current
        except io.UnsupportedOperation:
            # e.g. an unseekable io.BufferedIOBase wrapper that still
            # exposes seek()/tell(); fall back to chunked encoding.
            pass

    return None
def get_encoding_from_headers(headers, default='ISO-8859-1'):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :param default: default encoding if the content-type is text
    """
    content_type = headers.get('content-type')
    if not content_type:
        return None

    # Let the email machinery parse the charset parameter for us.
    message = email.message.Message()
    message['content-type'] = content_type
    charset = message.get_param("charset")
    if charset is not None:
        return charset

    # Text content with no explicit charset gets the default; anything
    # else yields None.
    return default if 'text' in content_type else None
def calculate_md5(body, **kwargs):
    """This function has been deprecated, but is kept for backwards compatibility."""
    # Dispatch on whether the body is in-memory bytes or a file-like
    # object, then base64-encode the binary digest.
    compute = (
        _calculate_md5_from_bytes
        if isinstance(body, (bytes, bytearray))
        else _calculate_md5_from_file
    )
    return base64.b64encode(compute(body)).decode('ascii')
def _calculate_md5_from_bytes(body_bytes):
    """This function has been deprecated, but is kept for backwards compatibility."""
    # usedforsecurity=False: this digest is an integrity check, not a
    # security primitive (keeps FIPS builds happy).
    return get_md5(body_bytes, usedforsecurity=False).digest()
def _calculate_md5_from_file(fileobj):
    """This function has been deprecated, but is kept for backwards compatibility."""
    # Hash in 1 MiB chunks, then restore the caller's stream position.
    start_position = fileobj.tell()
    md5 = get_md5(usedforsecurity=False)
    chunk = fileobj.read(1024 * 1024)
    while chunk:
        md5.update(chunk)
        chunk = fileobj.read(1024 * 1024)
    fileobj.seek(start_position)
    return md5.digest()
3294def _is_s3express_request(params):
3295 endpoint_properties = params.get('context', {}).get(
3296 'endpoint_properties', {}
3297 )
3298 return endpoint_properties.get('backend') == 'S3Express'
def has_checksum_header(params):
    """
    Checks if a header starting with "x-amz-checksum-" is provided in a request.

    This function is considered private and subject to abrupt breaking changes or
    removal without prior announcement. Please do not use it directly.
    """
    # A header matching the x-amz-checksum-* pattern means the user has
    # already provided a checksum, so we should not compute one.
    return any(
        CHECKSUM_HEADER_PATTERN.match(header) for header in params['headers']
    )
def conditionally_calculate_checksum(params, **kwargs):
    """This function has been deprecated, but is kept for backwards compatibility."""
    # Nothing to do when the user already supplied a checksum header.
    if has_checksum_header(params):
        return
    conditionally_calculate_md5(params, **kwargs)
    conditionally_enable_crc32(params, **kwargs)
def conditionally_enable_crc32(params, **kwargs):
    """This function has been deprecated, but is kept for backwards compatibility."""
    checksum_context = params.get('context', {}).get('checksum', {})
    checksum_algorithm = checksum_context.get('request_algorithm')
    # Only S3Express requests with a body and no explicit (non-MD5)
    # algorithm get CRC32 enabled; guard clauses mirror the original
    # short-circuit order.
    if not _is_s3express_request(params):
        return
    if params['body'] is None:
        return
    if checksum_algorithm not in (None, "conditional-md5"):
        return
    params['context']['checksum'] = {
        'request_algorithm': {
            'algorithm': 'crc32',
            'in': 'header',
            'name': 'x-amz-checksum-crc32',
        }
    }
def conditionally_calculate_md5(params, **kwargs):
    """Only add a Content-MD5 if the system supports it.

    This function has been deprecated, but is kept for backwards compatibility.
    """
    body = params['body']
    checksum_context = params.get('context', {}).get('checksum', {})
    checksum_algorithm = checksum_context.get('request_algorithm')

    # Skip for requests that will have a flexible checksum applied.
    if checksum_algorithm and checksum_algorithm != 'conditional-md5':
        return
    # Don't add a new header if one is already available.
    if has_checksum_header(params):
        return
    # S3Express doesn't support MD5.
    if _is_s3express_request(params):
        return

    if MD5_AVAILABLE and body is not None:
        params['headers']['Content-MD5'] = calculate_md5(body, **kwargs)
class FileWebIdentityTokenLoader:
    """Callable that reads a web identity token file on every invocation."""

    def __init__(self, web_identity_token_path, _open=open):
        # _open is injectable for testing.
        self._web_identity_token_path = web_identity_token_path
        self._open = _open

    def __call__(self):
        """Return the current contents of the token file."""
        opener = self._open
        with opener(self._web_identity_token_path) as token_file:
            return token_file.read()
class SSOTokenLoader:
    """Loads cached SSO tokens, keyed by session name or start URL."""

    def __init__(self, cache=None):
        self._cache = {} if cache is None else cache

    def _generate_cache_key(self, start_url, session_name):
        # The session name, when present, identifies the token; otherwise
        # the start URL does.
        identity = start_url if session_name is None else session_name
        return hashlib.sha1(identity.encode('utf-8')).hexdigest()

    def save_token(self, start_url, token, session_name=None):
        cache_key = self._generate_cache_key(start_url, session_name)
        self._cache[cache_key] = token

    def __call__(self, start_url, session_name=None):
        cache_key = self._generate_cache_key(start_url, session_name)
        logger.debug('Checking for cached token at: %s', cache_key)
        if cache_key not in self._cache:
            name = start_url if session_name is None else session_name
            raise SSOTokenLoadError(
                error_msg=f'Token for {name} does not exist'
            )

        token = self._cache[cache_key]
        # A usable token must carry both the access token and its expiry.
        if 'accessToken' not in token or 'expiresAt' not in token:
            raise SSOTokenLoadError(
                error_msg=f'Token for {start_url} is invalid'
            )
        return token
class EventbridgeSignerSetter:
    """Resolves multi-region (global) EventBridge endpoints for PutEvents
    requests that carry an EndpointId, and switches signing to SigV4a."""

    _DEFAULT_PARTITION = 'aws'
    _DEFAULT_DNS_SUFFIX = 'amazonaws.com'

    def __init__(self, endpoint_resolver, region=None, endpoint_url=None):
        self._endpoint_resolver = endpoint_resolver
        self._region = region
        self._endpoint_url = endpoint_url

    def register(self, event_emitter):
        # Validation runs at parameter build; the URL rewrite happens just
        # before the call, once the endpoint has been stored in context.
        event_emitter.register(
            'before-parameter-build.events.PutEvents',
            self.check_for_global_endpoint,
        )
        event_emitter.register(
            'before-call.events.PutEvents', self.set_endpoint_url
        )

    def set_endpoint_url(self, params, context, **kwargs):
        # Rewrites the request URL to the endpoint chosen by
        # check_for_global_endpoint, when one was stored in the context.
        if 'eventbridge_endpoint' in context:
            endpoint = context['eventbridge_endpoint']
            logger.debug(
                "Rewriting URL from %s to %s", params['url'], endpoint
            )
            params['url'] = endpoint

    def check_for_global_endpoint(self, params, context, **kwargs):
        """Validate EndpointId usage and stash the resolved global endpoint.

        No-op when EndpointId is absent.  Raises for an empty EndpointId,
        a missing CRT dependency (required for SigV4a), FIPS configuration,
        or an EndpointId that is not a valid hostname component.
        """
        endpoint = params.get('EndpointId')
        if endpoint is None:
            return

        if len(endpoint) == 0:
            raise InvalidEndpointConfigurationError(
                msg='EndpointId must not be a zero length string'
            )

        # SigV4a signing requires the AWS CRT.
        if not HAS_CRT:
            raise MissingDependencyException(
                msg="Using EndpointId requires an additional "
                "dependency. You will need to pip install "
                "botocore[crt] before proceeding."
            )

        config = context.get('client_config')
        endpoint_variant_tags = None
        if config is not None:
            if config.use_fips_endpoint:
                raise InvalidEndpointConfigurationError(
                    msg="FIPS is not supported with EventBridge "
                    "multi-region endpoints."
                )
            if config.use_dualstack_endpoint:
                endpoint_variant_tags = ['dualstack']

        if self._endpoint_url is None:
            # Validate endpoint is a valid hostname component
            parts = urlparse(f'https://{endpoint}')
            if parts.hostname != endpoint:
                raise InvalidEndpointConfigurationError(
                    msg='EndpointId is not a valid hostname component.'
                )
            resolved_endpoint = self._get_global_endpoint(
                endpoint, endpoint_variant_tags=endpoint_variant_tags
            )
        else:
            # An explicitly configured endpoint URL always wins.
            resolved_endpoint = self._endpoint_url

        context['eventbridge_endpoint'] = resolved_endpoint
        # Global endpoints are signed with SigV4a.
        context['auth_type'] = 'v4a'

    def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None):
        # Builds https://{EndpointId}.endpoint.events.{dnsSuffix}/ using
        # the partition's DNS suffix, with defaults when unresolvable.
        resolver = self._endpoint_resolver

        partition = resolver.get_partition_for_region(self._region)
        if partition is None:
            partition = self._DEFAULT_PARTITION
        dns_suffix = resolver.get_partition_dns_suffix(
            partition, endpoint_variant_tags=endpoint_variant_tags
        )
        if dns_suffix is None:
            dns_suffix = self._DEFAULT_DNS_SUFFIX

        return f"https://{endpoint}.endpoint.events.{dns_suffix}/"
def is_s3_accelerate_url(url):
    """Does the URL match the S3 Accelerate endpoint scheme?

    Virtual host naming style with bucket names in the netloc part of the URL
    are not allowed by this function.
    """
    if url is None:
        return False

    # Accelerate is only valid for Amazon endpoints over http(s).
    url_parts = urlsplit(url)
    if url_parts.scheme not in ['https', 'http']:
        return False
    if not url_parts.netloc.endswith('amazonaws.com'):
        return False

    # The hostname must lead with 's3-accelerate'.
    labels = url_parts.netloc.split('.')
    if labels[0] != 's3-accelerate':
        return False

    # Labels between 's3-accelerate' and 'amazonaws.com' encode endpoint
    # features; each may appear at most once...
    feature_labels = labels[1:-2]
    if len(feature_labels) != len(set(feature_labels)):
        return False

    # ...and every one must be a known feature.
    return all(label in S3_ACCELERATE_WHITELIST for label in feature_labels)
class JSONFileCache:
    """JSON file cache.

    This provides a dict like interface that stores JSON serializable
    objects.  The objects are serialized to JSON and stored in a file.
    These values can be retrieved at a later time.
    """

    CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache'))

    def __init__(self, working_dir=CACHE_DIR, dumps_func=None):
        self._working_dir = working_dir
        self._dumps = self._default_dumps if dumps_func is None else dumps_func

    def _default_dumps(self, obj):
        return json.dumps(obj, default=self._serialize_if_needed)

    def __contains__(self, cache_key):
        return os.path.isfile(self._convert_cache_key(cache_key))

    def __getitem__(self, cache_key):
        """Retrieve value from a cache key."""
        path = self._convert_cache_key(cache_key)
        try:
            with open(path) as f:
                return json.load(f)
        except (OSError, ValueError):
            # Missing or corrupt cache files surface as a missing key.
            raise KeyError(cache_key)

    def __delitem__(self, cache_key):
        path = Path(self._convert_cache_key(cache_key))
        try:
            path.unlink()
        except FileNotFoundError:
            raise KeyError(cache_key)

    def __setitem__(self, cache_key, value):
        path = self._convert_cache_key(cache_key)
        try:
            serialized = self._dumps(value)
        except (TypeError, ValueError):
            raise ValueError(
                f"Value cannot be cached, must be JSON serializable: {value}"
            )
        if not os.path.isdir(self._working_dir):
            os.makedirs(self._working_dir, exist_ok=True)
        # Open with owner-only permissions: credentials may be cached here.
        fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o600)
        with os.fdopen(fd, 'w') as f:
            f.truncate()
            f.write(serialized)

    def _convert_cache_key(self, cache_key):
        return os.path.join(self._working_dir, cache_key + '.json')

    def _serialize_if_needed(self, value, iso=False):
        # Datetimes are not JSON serializable; render them as strings.
        if isinstance(value, _DatetimeClass):
            if iso:
                return value.isoformat()
            return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
        return value
def is_s3express_bucket(bucket):
    """True when the bucket name denotes an S3 Express directory bucket."""
    return bucket is not None and bucket.endswith('--x-s3')
def get_token_from_environment(signing_name, environ=None):
    """Return a bearer token for *signing_name* from the environment.

    Returns None for a blank or non-string signing name, or when the
    corresponding AWS_BEARER_TOKEN_* variable is unset.
    """
    if not isinstance(signing_name, str) or not signing_name.strip():
        return None

    source = os.environ if environ is None else environ
    return source.get(_get_bearer_env_var_name(signing_name))
3614def _get_bearer_env_var_name(signing_name):
3615 bearer_name = signing_name.replace('-', '_').replace(' ', '_').upper()
3616 return f"AWS_BEARER_TOKEN_{bearer_name}"
# This parameter is not part of the public interface and is subject to abrupt
# breaking changes or removal without prior announcement.
# Mapping of services that have been renamed for backwards compatibility reasons.
# Keys are the previous name that should be allowed, values are the documented
# and preferred client name.
SERVICE_NAME_ALIASES = {'runtime.sagemaker': 'sagemaker-runtime'}


# This parameter is not part of the public interface and is subject to abrupt
# breaking changes or removal without prior announcement.
# Mapping to determine the service ID for services that do not use it as the
# model data directory name. The keys are the data directory name and the
# values are the transformed service IDs (lower case and hyphenated).
CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES = {
    # Actual service name we use -> Allowed computed service name.
    'apigateway': 'api-gateway',
    'application-autoscaling': 'application-auto-scaling',
    'appmesh': 'app-mesh',
    'autoscaling': 'auto-scaling',
    'autoscaling-plans': 'auto-scaling-plans',
    'ce': 'cost-explorer',
    'cloudhsmv2': 'cloudhsm-v2',
    'cloudsearchdomain': 'cloudsearch-domain',
    'cognito-idp': 'cognito-identity-provider',
    'config': 'config-service',
    'cur': 'cost-and-usage-report-service',
    'datapipeline': 'data-pipeline',
    'directconnect': 'direct-connect',
    'devicefarm': 'device-farm',
    'discovery': 'application-discovery-service',
    'dms': 'database-migration-service',
    'ds': 'directory-service',
    'ds-data': 'directory-service-data',
    'dynamodbstreams': 'dynamodb-streams',
    'elasticbeanstalk': 'elastic-beanstalk',
    'elastictranscoder': 'elastic-transcoder',
    'elb': 'elastic-load-balancing',
    'elbv2': 'elastic-load-balancing-v2',
    'es': 'elasticsearch-service',
    'events': 'eventbridge',
    'globalaccelerator': 'global-accelerator',
    'iot-data': 'iot-data-plane',
    'iot-jobs-data': 'iot-jobs-data-plane',
    'iotevents-data': 'iot-events-data',
    'iotevents': 'iot-events',
    'iotwireless': 'iot-wireless',
    'kinesisanalytics': 'kinesis-analytics',
    'kinesisanalyticsv2': 'kinesis-analytics-v2',
    'kinesisvideo': 'kinesis-video',
    'lex-models': 'lex-model-building-service',
    'lexv2-models': 'lex-models-v2',
    'lex-runtime': 'lex-runtime-service',
    'lexv2-runtime': 'lex-runtime-v2',
    'logs': 'cloudwatch-logs',
    'machinelearning': 'machine-learning',
    'marketplacecommerceanalytics': 'marketplace-commerce-analytics',
    'marketplace-entitlement': 'marketplace-entitlement-service',
    'meteringmarketplace': 'marketplace-metering',
    'mgh': 'migration-hub',
    'sms-voice': 'pinpoint-sms-voice',
    'resourcegroupstaggingapi': 'resource-groups-tagging-api',
    'route53': 'route-53',
    'route53domains': 'route-53-domains',
    's3control': 's3-control',
    'sdb': 'simpledb',
    'secretsmanager': 'secrets-manager',
    'serverlessrepo': 'serverlessapplicationrepository',
    'servicecatalog': 'service-catalog',
    'servicecatalog-appregistry': 'service-catalog-appregistry',
    'stepfunctions': 'sfn',
    'storagegateway': 'storage-gateway',
}