Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/utils.py: 21%
1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import base64
14import binascii
15import datetime
16import email.message
17import functools
18import hashlib
19import io
20import logging
21import os
22import random
23import re
24import socket
25import time
26import warnings
27import weakref
28from datetime import datetime as _DatetimeClass
29from ipaddress import ip_address
30from pathlib import Path
31from urllib.request import getproxies, proxy_bypass
33import dateutil.parser
34from dateutil.tz import tzutc
35from urllib3.exceptions import LocationParseError
37import botocore
38import botocore.awsrequest
39import botocore.httpsession
41# IP Regexes retained for backwards compatibility
42from botocore.compat import HEX_PAT # noqa: F401
43from botocore.compat import IPV4_PAT # noqa: F401
44from botocore.compat import IPV6_ADDRZ_PAT # noqa: F401
45from botocore.compat import IPV6_PAT # noqa: F401
46from botocore.compat import LS32_PAT # noqa: F401
47from botocore.compat import UNRESERVED_PAT # noqa: F401
48from botocore.compat import ZONE_ID_PAT # noqa: F401
49from botocore.compat import (
50 HAS_CRT,
51 IPV4_RE,
52 IPV6_ADDRZ_RE,
53 MD5_AVAILABLE,
54 UNSAFE_URL_CHARS,
55 OrderedDict,
56 get_md5,
57 get_tzinfo_options,
58 json,
59 quote,
60 urlparse,
61 urlsplit,
62 urlunsplit,
63 zip_longest,
64)
65from botocore.exceptions import (
66 ClientError,
67 ConfigNotFound,
68 ConnectionClosedError,
69 ConnectTimeoutError,
70 EndpointConnectionError,
71 HTTPClientError,
72 InvalidDNSNameError,
73 InvalidEndpointConfigurationError,
74 InvalidExpressionError,
75 InvalidHostLabelError,
76 InvalidIMDSEndpointError,
77 InvalidIMDSEndpointModeError,
78 InvalidRegionError,
79 MetadataRetrievalError,
80 MissingDependencyException,
81 ReadTimeoutError,
82 SSOTokenLoadError,
83 UnsupportedOutpostResourceError,
84 UnsupportedS3AccesspointConfigurationError,
85 UnsupportedS3ArnError,
86 UnsupportedS3ConfigurationError,
87 UnsupportedS3ControlArnError,
88 UnsupportedS3ControlConfigurationError,
89)
91logger = logging.getLogger(__name__)
92DEFAULT_METADATA_SERVICE_TIMEOUT = 1
93METADATA_BASE_URL = 'http://169.254.169.254/'
94METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/'
95METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6')
97# These are chars that do not need to be urlencoded.
98# Based on rfc2986, section 2.3
99SAFE_CHARS = '-._~'
100LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]')
101RETRYABLE_HTTP_ERRORS = (
102 ReadTimeoutError,
103 EndpointConnectionError,
104 ConnectionClosedError,
105 ConnectTimeoutError,
106)
107S3_ACCELERATE_WHITELIST = ['dualstack']
108# In switching events from using service name / endpoint prefix to service
109# id, we have to preserve compatibility. This maps the instances where either
110# is different than the transformed service id.
111EVENT_ALIASES = {
112 "api.mediatailor": "mediatailor",
113 "api.pricing": "pricing",
114 "api.sagemaker": "sagemaker",
115 "apigateway": "api-gateway",
116 "application-autoscaling": "application-auto-scaling",
117 "appstream2": "appstream",
118 "autoscaling": "auto-scaling",
119 "autoscaling-plans": "auto-scaling-plans",
120 "ce": "cost-explorer",
121 "cloudhsmv2": "cloudhsm-v2",
122 "cloudsearchdomain": "cloudsearch-domain",
123 "cognito-idp": "cognito-identity-provider",
124 "config": "config-service",
125 "cur": "cost-and-usage-report-service",
126 "data.iot": "iot-data-plane",
127 "data.jobs.iot": "iot-jobs-data-plane",
128 "data.mediastore": "mediastore-data",
129 "datapipeline": "data-pipeline",
130 "devicefarm": "device-farm",
131 "devices.iot1click": "iot-1click-devices-service",
132 "directconnect": "direct-connect",
133 "discovery": "application-discovery-service",
134 "dms": "database-migration-service",
135 "ds": "directory-service",
136 "dynamodbstreams": "dynamodb-streams",
137 "elasticbeanstalk": "elastic-beanstalk",
138 "elasticfilesystem": "efs",
139 "elasticloadbalancing": "elastic-load-balancing",
140 "elasticmapreduce": "emr",
141 "elastictranscoder": "elastic-transcoder",
142 "elb": "elastic-load-balancing",
143 "elbv2": "elastic-load-balancing-v2",
144 "email": "ses",
145 "entitlement.marketplace": "marketplace-entitlement-service",
146 "es": "elasticsearch-service",
147 "events": "eventbridge",
148 "cloudwatch-events": "eventbridge",
149 "iot-data": "iot-data-plane",
150 "iot-jobs-data": "iot-jobs-data-plane",
151 "iot1click-devices": "iot-1click-devices-service",
152 "iot1click-projects": "iot-1click-projects",
153 "kinesisanalytics": "kinesis-analytics",
154 "kinesisvideo": "kinesis-video",
155 "lex-models": "lex-model-building-service",
156 "lex-runtime": "lex-runtime-service",
157 "logs": "cloudwatch-logs",
158 "machinelearning": "machine-learning",
159 "marketplace-entitlement": "marketplace-entitlement-service",
160 "marketplacecommerceanalytics": "marketplace-commerce-analytics",
161 "metering.marketplace": "marketplace-metering",
162 "meteringmarketplace": "marketplace-metering",
163 "mgh": "migration-hub",
164 "models.lex": "lex-model-building-service",
165 "monitoring": "cloudwatch",
166 "mturk-requester": "mturk",
167 "opsworks-cm": "opsworkscm",
168 "projects.iot1click": "iot-1click-projects",
169 "resourcegroupstaggingapi": "resource-groups-tagging-api",
170 "route53": "route-53",
171 "route53domains": "route-53-domains",
172 "runtime.lex": "lex-runtime-service",
173 "runtime.sagemaker": "sagemaker-runtime",
174 "sdb": "simpledb",
175 "secretsmanager": "secrets-manager",
176 "serverlessrepo": "serverlessapplicationrepository",
177 "servicecatalog": "service-catalog",
178 "states": "sfn",
179 "stepfunctions": "sfn",
180 "storagegateway": "storage-gateway",
181 "streams.dynamodb": "dynamodb-streams",
182 "tagging": "resource-groups-tagging-api",
183}
186# This pattern can be used to detect if a header is a flexible checksum header
187CHECKSUM_HEADER_PATTERN = re.compile(
188 r'^X-Amz-Checksum-([a-z0-9]*)$',
189 flags=re.IGNORECASE,
190)
193def ensure_boolean(val):
194 """Ensures a boolean value if a string or boolean is provided
196 For strings, the value for True/False is case insensitive
197 """
198 if isinstance(val, bool):
199 return val
200 elif isinstance(val, str):
201 return val.lower() == 'true'
202 else:
203 return False
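# Usage sketch for ensure_boolean() (illustrative only): the boolean True and
# the case-insensitive string 'true' are the only truthy inputs; any other
# type or value falls through to False.
#
#     ensure_boolean(True)    # True
#     ensure_boolean('TRUE')  # True
#     ensure_boolean('yes')   # False
#     ensure_boolean(1)       # False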
206def resolve_imds_endpoint_mode(session):
207 """Resolving IMDS endpoint mode to either IPv6 or IPv4.
209 ec2_metadata_service_endpoint_mode takes precedence over imds_use_ipv6.
210 """
211 endpoint_mode = session.get_config_variable(
212 'ec2_metadata_service_endpoint_mode'
213 )
214 if endpoint_mode is not None:
215 lendpoint_mode = endpoint_mode.lower()
216 if lendpoint_mode not in METADATA_ENDPOINT_MODES:
217 error_msg_kwargs = {
218 'mode': endpoint_mode,
219 'valid_modes': METADATA_ENDPOINT_MODES,
220 }
221 raise InvalidIMDSEndpointModeError(**error_msg_kwargs)
222 return lendpoint_mode
223 elif session.get_config_variable('imds_use_ipv6'):
224 return 'ipv6'
225 return 'ipv4'
228def is_json_value_header(shape):
229 """Determines if the provided shape is the special header type jsonvalue.
231 :type shape: botocore.shape
232 :param shape: Shape to be inspected for the jsonvalue trait.
234 :return: True if this type is a jsonvalue, False otherwise
235 :rtype: Bool
236 """
237 return (
238 hasattr(shape, 'serialization')
239 and shape.serialization.get('jsonvalue', False)
240 and shape.serialization.get('location') == 'header'
241 and shape.type_name == 'string'
242 )
245def has_header(header_name, headers):
246 """Case-insensitive check for header key."""
247 if header_name is None:
248 return False
249 elif isinstance(headers, botocore.awsrequest.HeadersDict):
250 return header_name in headers
251 else:
252 return header_name.lower() in [key.lower() for key in headers.keys()]
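# Usage sketch for has_header() (illustrative only): plain dicts are compared
# case-insensitively, while HeadersDict instances already handle that
# themselves.
#
#     has_header('Content-Type', {'content-type': 'application/json'})  # True
#     has_header('x-amz-date', {'X-Amz-Date': '20240101T000000Z'})      # True
#     has_header(None, {'anything': 'here'})                            # False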
255def get_service_module_name(service_model):
256 """Returns the module name for a service
258 This is the value used in both the documentation and client class name
259 """
260 name = service_model.metadata.get(
261 'serviceAbbreviation',
262 service_model.metadata.get(
263 'serviceFullName', service_model.service_name
264 ),
265 )
266 name = name.replace('Amazon', '')
267 name = name.replace('AWS', '')
268 name = re.sub(r'\W+', '', name)
269 return name
272def normalize_url_path(path):
273 if not path:
274 return '/'
275 return remove_dot_segments(path)
278def normalize_boolean(val):
279 """Returns None if val is None, otherwise ensure value
280 converted to boolean"""
281 if val is None:
282 return val
283 else:
284 return ensure_boolean(val)
287def remove_dot_segments(url):
288 # RFC 3986, section 5.2.4 "Remove Dot Segments"
289 # Also, AWS services require consecutive slashes to be removed,
290 # so that's done here as well
291 if not url:
292 return ''
293 input_url = url.split('/')
294 output_list = []
295 for x in input_url:
296 if x and x != '.':
297 if x == '..':
298 if output_list:
299 output_list.pop()
300 else:
301 output_list.append(x)
303 if url[0] == '/':
304 first = '/'
305 else:
306 first = ''
307 if url[-1] == '/' and output_list:
308 last = '/'
309 else:
310 last = ''
311 return first + '/'.join(output_list) + last
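# Usage sketch for normalize_url_path()/remove_dot_segments() (illustrative
# only): '.' and '..' segments are resolved and consecutive slashes collapse.
#
#     normalize_url_path('')                  # '/'
#     remove_dot_segments('/foo/bar/../baz')  # '/foo/baz'
#     remove_dot_segments('/foo//bar/')       # '/foo/bar/'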
314def validate_jmespath_for_set(expression):
315 # Validates a limited jmespath expression to determine if we can set a
316 # value based on it. Only works with dotted paths.
317 if not expression or expression == '.':
318 raise InvalidExpressionError(expression=expression)
320 for invalid in ['[', ']', '*']:
321 if invalid in expression:
322 raise InvalidExpressionError(expression=expression)
325def set_value_from_jmespath(source, expression, value, is_first=True):
326 # This takes a (limited) jmespath-like expression & can set a value based
327 # on it.
328 # Limitations:
329 # * Only handles dotted lookups
330 # * No offsets/wildcards/slices/etc.
331 if is_first:
332 validate_jmespath_for_set(expression)
334 bits = expression.split('.', 1)
335 current_key, remainder = bits[0], bits[1] if len(bits) > 1 else ''
337 if not current_key:
338 raise InvalidExpressionError(expression=expression)
340 if remainder:
341 if current_key not in source:
342 # We've got something in the expression that's not present in the
343 # source (new key). If there's any more bits, we'll set the key
344 # with an empty dictionary.
345 source[current_key] = {}
347 return set_value_from_jmespath(
348 source[current_key], remainder, value, is_first=False
349 )
351 # If we're down to a single key, set it.
352 source[current_key] = value
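# Usage sketch for set_value_from_jmespath() (illustrative only): only dotted
# paths are supported, and intermediate dictionaries are created as needed.
#
#     source = {}
#     set_value_from_jmespath(source, 'foo.bar.baz', 42)
#     # source == {'foo': {'bar': {'baz': 42}}}
#     set_value_from_jmespath(source, 'foo[0]', 1)  # raises InvalidExpressionError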
355def is_global_accesspoint(context):
356 """Determine if request is intended for an MRAP accesspoint."""
357 s3_accesspoint = context.get('s3_accesspoint', {})
358 is_global = s3_accesspoint.get('region') == ''
359 return is_global
362class _RetriesExceededError(Exception):
363 """Internal exception used when the number of retries are exceeded."""
365 pass
368class BadIMDSRequestError(Exception):
369 def __init__(self, request):
370 self.request = request
373class IMDSFetcher:
374 _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError
375 _TOKEN_PATH = 'latest/api/token'
376 _TOKEN_TTL = '21600'
378 def __init__(
379 self,
380 timeout=DEFAULT_METADATA_SERVICE_TIMEOUT,
381 num_attempts=1,
382 base_url=METADATA_BASE_URL,
383 env=None,
384 user_agent=None,
385 config=None,
386 ):
387 self._timeout = timeout
388 self._num_attempts = num_attempts
389 if config is None:
390 config = {}
391 self._base_url = self._select_base_url(base_url, config)
392 self._config = config
394 if env is None:
395 env = os.environ.copy()
396 self._disabled = (
397 env.get('AWS_EC2_METADATA_DISABLED', 'false').lower() == 'true'
398 )
399 self._imds_v1_disabled = config.get('ec2_metadata_v1_disabled')
400 self._user_agent = user_agent
401 self._session = botocore.httpsession.URLLib3Session(
402 timeout=self._timeout,
403 proxies=get_environ_proxies(self._base_url),
404 )
406 def get_base_url(self):
407 return self._base_url
409 def _select_base_url(self, base_url, config):
410 if config is None:
411 config = {}
413 requires_ipv6 = (
414 config.get('ec2_metadata_service_endpoint_mode') == 'ipv6'
415 )
416 custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint')
418 if requires_ipv6 and custom_metadata_endpoint:
419 logger.warning(
420 "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint."
421 )
423 chosen_base_url = None
425 if base_url != METADATA_BASE_URL:
426 chosen_base_url = base_url
427 elif custom_metadata_endpoint:
428 chosen_base_url = custom_metadata_endpoint
429 elif requires_ipv6:
430 chosen_base_url = METADATA_BASE_URL_IPv6
431 else:
432 chosen_base_url = METADATA_BASE_URL
434 logger.debug(f"IMDS ENDPOINT: {chosen_base_url}")
435 if not is_valid_uri(chosen_base_url):
436 raise InvalidIMDSEndpointError(endpoint=chosen_base_url)
438 return chosen_base_url
440 def _construct_url(self, path):
441 sep = ''
442 if self._base_url and not self._base_url.endswith('/'):
443 sep = '/'
444 return f'{self._base_url}{sep}{path}'
446 def _fetch_metadata_token(self):
447 self._assert_enabled()
448 url = self._construct_url(self._TOKEN_PATH)
449 headers = {
450 'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL,
451 }
452 self._add_user_agent(headers)
453 request = botocore.awsrequest.AWSRequest(
454 method='PUT', url=url, headers=headers
455 )
456 for i in range(self._num_attempts):
457 try:
458 response = self._session.send(request.prepare())
459 if response.status_code == 200:
460 return response.text
461 elif response.status_code in (404, 403, 405):
462 return None
463 elif response.status_code in (400,):
464 raise BadIMDSRequestError(request)
465 except ReadTimeoutError:
466 return None
467 except RETRYABLE_HTTP_ERRORS as e:
468 logger.debug(
469 "Caught retryable HTTP exception while making metadata "
470 "service request to %s: %s",
471 url,
472 e,
473 exc_info=True,
474 )
475 except HTTPClientError as e:
476 if isinstance(e.kwargs.get('error'), LocationParseError):
477 raise InvalidIMDSEndpointError(endpoint=url, error=e)
478 else:
479 raise
480 return None
482 def _get_request(self, url_path, retry_func, token=None):
483 """Make a get request to the Instance Metadata Service.
485 :type url_path: str
486 :param url_path: The path component of the URL to make a get request.
487 This arg is appended to the base_url that was provided in the
488 initializer.
490 :type retry_func: callable
491 :param retry_func: A function that takes the response as an argument
492 and determines if it needs to retry. By default empty and non
493 200 OK responses are retried.
495 :type token: str
496 :param token: Metadata token to send along with GET requests to IMDS.
497 """
498 self._assert_enabled()
499 if not token:
500 self._assert_v1_enabled()
501 if retry_func is None:
502 retry_func = self._default_retry
503 url = self._construct_url(url_path)
504 headers = {}
505 if token is not None:
506 headers['x-aws-ec2-metadata-token'] = token
507 self._add_user_agent(headers)
508 for i in range(self._num_attempts):
509 try:
510 request = botocore.awsrequest.AWSRequest(
511 method='GET', url=url, headers=headers
512 )
513 response = self._session.send(request.prepare())
514 if not retry_func(response):
515 return response
516 except RETRYABLE_HTTP_ERRORS as e:
517 logger.debug(
518 "Caught retryable HTTP exception while making metadata "
519 "service request to %s: %s",
520 url,
521 e,
522 exc_info=True,
523 )
524 raise self._RETRIES_EXCEEDED_ERROR_CLS()
526 def _add_user_agent(self, headers):
527 if self._user_agent is not None:
528 headers['User-Agent'] = self._user_agent
530 def _assert_enabled(self):
531 if self._disabled:
532 logger.debug("Access to EC2 metadata has been disabled.")
533 raise self._RETRIES_EXCEEDED_ERROR_CLS()
535 def _assert_v1_enabled(self):
536 if self._imds_v1_disabled:
537 raise MetadataRetrievalError(
538 error_msg="Unable to retrieve token for use in IMDSv2 call and IMDSv1 has been disabled"
539 )
541 def _default_retry(self, response):
542 return self._is_non_ok_response(response) or self._is_empty(response)
544 def _is_non_ok_response(self, response):
545 if response.status_code != 200:
546 self._log_imds_response(response, 'non-200', log_body=True)
547 return True
548 return False
550 def _is_empty(self, response):
551 if not response.content:
552 self._log_imds_response(response, 'no body', log_body=True)
553 return True
554 return False
556 def _log_imds_response(self, response, reason_to_log, log_body=False):
557 statement = (
558 "Metadata service returned %s response "
559 "with status code of %s for url: %s"
560 )
561 logger_args = [reason_to_log, response.status_code, response.url]
562 if log_body:
563 statement += ", content body: %s"
564 logger_args.append(response.content)
565 logger.debug(statement, *logger_args)
568class InstanceMetadataFetcher(IMDSFetcher):
569 _URL_PATH = 'latest/meta-data/iam/security-credentials/'
570 _REQUIRED_CREDENTIAL_FIELDS = [
571 'AccessKeyId',
572 'SecretAccessKey',
573 'Token',
574 'Expiration',
575 ]
577 def retrieve_iam_role_credentials(self):
578 try:
579 token = self._fetch_metadata_token()
580 role_name = self._get_iam_role(token)
581 credentials = self._get_credentials(role_name, token)
582 if self._contains_all_credential_fields(credentials):
583 credentials = {
584 'role_name': role_name,
585 'access_key': credentials['AccessKeyId'],
586 'secret_key': credentials['SecretAccessKey'],
587 'token': credentials['Token'],
588 'expiry_time': credentials['Expiration'],
589 }
590 self._evaluate_expiration(credentials)
591 return credentials
592 else:
593 # IMDS can return a 200 response that has a JSON formatted
594 # error message (i.e. if ec2 is not trusted entity for the
595 # attached role). We do not necessarily want to retry for
596 # these and we also do not necessarily want to raise a key
597 # error. So at least log the problematic response and return
598 # an empty dictionary to signal that it was not able to
 599 # retrieve credentials. These errors will contain both a
600 # Code and Message key.
601 if 'Code' in credentials and 'Message' in credentials:
602 logger.debug(
 603 'Error response received when retrieving '
604 'credentials: %s.',
605 credentials,
606 )
607 return {}
608 except self._RETRIES_EXCEEDED_ERROR_CLS:
609 logger.debug(
610 "Max number of attempts exceeded (%s) when "
611 "attempting to retrieve data from metadata service.",
612 self._num_attempts,
613 )
614 except BadIMDSRequestError as e:
615 logger.debug("Bad IMDS request: %s", e.request)
616 return {}
618 def _get_iam_role(self, token=None):
619 return self._get_request(
620 url_path=self._URL_PATH,
621 retry_func=self._needs_retry_for_role_name,
622 token=token,
623 ).text
625 def _get_credentials(self, role_name, token=None):
626 r = self._get_request(
627 url_path=self._URL_PATH + role_name,
628 retry_func=self._needs_retry_for_credentials,
629 token=token,
630 )
631 return json.loads(r.text)
633 def _is_invalid_json(self, response):
634 try:
635 json.loads(response.text)
636 return False
637 except ValueError:
638 self._log_imds_response(response, 'invalid json')
639 return True
641 def _needs_retry_for_role_name(self, response):
642 return self._is_non_ok_response(response) or self._is_empty(response)
644 def _needs_retry_for_credentials(self, response):
645 return (
646 self._is_non_ok_response(response)
647 or self._is_empty(response)
648 or self._is_invalid_json(response)
649 )
651 def _contains_all_credential_fields(self, credentials):
652 for field in self._REQUIRED_CREDENTIAL_FIELDS:
653 if field not in credentials:
654 logger.debug(
655 'Retrieved credentials is missing required field: %s',
656 field,
657 )
658 return False
659 return True
661 def _evaluate_expiration(self, credentials):
662 expiration = credentials.get("expiry_time")
663 if expiration is None:
664 return
665 try:
666 expiration = datetime.datetime.strptime(
667 expiration, "%Y-%m-%dT%H:%M:%SZ"
668 )
669 refresh_interval = self._config.get(
670 "ec2_credential_refresh_window", 60 * 10
671 )
672 jitter = random.randint(120, 600) # Between 2 to 10 minutes
673 refresh_interval_with_jitter = refresh_interval + jitter
674 current_time = datetime.datetime.utcnow()
675 refresh_offset = datetime.timedelta(
676 seconds=refresh_interval_with_jitter
677 )
678 extension_time = expiration - refresh_offset
679 if current_time >= extension_time:
680 new_time = current_time + refresh_offset
681 credentials["expiry_time"] = new_time.strftime(
682 "%Y-%m-%dT%H:%M:%SZ"
683 )
684 logger.info(
685 f"Attempting credential expiration extension due to a "
686 f"credential service availability issue. A refresh of "
687 f"these credentials will be attempted again within "
688 f"the next {refresh_interval_with_jitter/60:.0f} minutes."
689 )
690 except ValueError:
691 logger.debug(
692 f"Unable to parse expiry_time in {credentials['expiry_time']}"
693 )
696class IMDSRegionProvider:
697 def __init__(self, session, environ=None, fetcher=None):
698 """Initialize IMDSRegionProvider.
699 :type session: :class:`botocore.session.Session`
700 :param session: The session is needed to look up configuration for
 701 how to contact the instance metadata service. Specifically
702 whether or not it should use the IMDS region at all, and if so how
703 to configure the timeout and number of attempts to reach the
704 service.
705 :type environ: None or dict
706 :param environ: A dictionary of environment variables to use. If
707 ``None`` is the argument then ``os.environ`` will be used by
708 default.
 709 :type fetcher: :class:`botocore.utils.InstanceMetadataRegionFetcher`
710 :param fetcher: The class to actually handle the fetching of the region
711 from the IMDS. If not provided a default one will be created.
712 """
713 self._session = session
714 if environ is None:
715 environ = os.environ
716 self._environ = environ
717 self._fetcher = fetcher
719 def provide(self):
720 """Provide the region value from IMDS."""
721 instance_region = self._get_instance_metadata_region()
722 return instance_region
724 def _get_instance_metadata_region(self):
725 fetcher = self._get_fetcher()
726 region = fetcher.retrieve_region()
727 return region
729 def _get_fetcher(self):
730 if self._fetcher is None:
731 self._fetcher = self._create_fetcher()
732 return self._fetcher
734 def _create_fetcher(self):
735 metadata_timeout = self._session.get_config_variable(
736 'metadata_service_timeout'
737 )
738 metadata_num_attempts = self._session.get_config_variable(
739 'metadata_service_num_attempts'
740 )
741 imds_config = {
742 'ec2_metadata_service_endpoint': self._session.get_config_variable(
743 'ec2_metadata_service_endpoint'
744 ),
745 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode(
746 self._session
747 ),
748 'ec2_metadata_v1_disabled': self._session.get_config_variable(
749 'ec2_metadata_v1_disabled'
750 ),
751 }
752 fetcher = InstanceMetadataRegionFetcher(
753 timeout=metadata_timeout,
754 num_attempts=metadata_num_attempts,
755 env=self._environ,
756 user_agent=self._session.user_agent(),
757 config=imds_config,
758 )
759 return fetcher
762class InstanceMetadataRegionFetcher(IMDSFetcher):
763 _URL_PATH = 'latest/meta-data/placement/availability-zone/'
765 def retrieve_region(self):
766 """Get the current region from the instance metadata service.
 767 :rtype: None or str
 768 :returns: The region the current instance is running in, or ``None``
 769 if the instance metadata service cannot be contacted, does not
 770 give a valid response, or exhausts its retries while attempting
 771 to retrieve data from the metadata service.
776 """
777 try:
778 region = self._get_region()
779 return region
780 except self._RETRIES_EXCEEDED_ERROR_CLS:
781 logger.debug(
782 "Max number of attempts exceeded (%s) when "
783 "attempting to retrieve data from metadata service.",
784 self._num_attempts,
785 )
786 return None
788 def _get_region(self):
789 token = self._fetch_metadata_token()
790 response = self._get_request(
791 url_path=self._URL_PATH,
792 retry_func=self._default_retry,
793 token=token,
794 )
795 availability_zone = response.text
796 region = availability_zone[:-1]
797 return region
800def merge_dicts(dict1, dict2, append_lists=False):
801 """Given two dict, merge the second dict into the first.
803 The dicts can have arbitrary nesting.
805 :param append_lists: If true, instead of clobbering a list with the new
806 value, append all of the new values onto the original list.
807 """
808 for key in dict2:
809 if isinstance(dict2[key], dict):
810 if key in dict1 and key in dict2:
811 merge_dicts(dict1[key], dict2[key])
812 else:
813 dict1[key] = dict2[key]
814 # If the value is a list and the ``append_lists`` flag is set,
815 # append the new values onto the original list
816 elif isinstance(dict2[key], list) and append_lists:
817 # The value in dict1 must be a list in order to append new
818 # values onto it.
819 if key in dict1 and isinstance(dict1[key], list):
820 dict1[key].extend(dict2[key])
821 else:
822 dict1[key] = dict2[key]
823 else:
 824 # For scalar values, simply overwrite the existing
 825 # value in dict1 with the value from dict2.
826 dict1[key] = dict2[key]
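# Usage sketch for merge_dicts() (illustrative only): nested dictionaries are
# merged recursively, while scalar values from dict2 overwrite dict1 in place.
#
#     a = {'s3': {'addressing_style': 'path'}, 'retries': {'max_attempts': 3}}
#     b = {'s3': {'use_accelerate_endpoint': True}, 'retries': {'max_attempts': 5}}
#     merge_dicts(a, b)
#     # a == {'s3': {'addressing_style': 'path', 'use_accelerate_endpoint': True},
#     #       'retries': {'max_attempts': 5}}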
829def lowercase_dict(original):
830 """Copies the given dictionary ensuring all keys are lowercase strings."""
831 copy = {}
832 for key in original:
833 copy[key.lower()] = original[key]
834 return copy
837def parse_key_val_file(filename, _open=open):
838 try:
839 with _open(filename) as f:
840 contents = f.read()
841 return parse_key_val_file_contents(contents)
842 except OSError:
843 raise ConfigNotFound(path=filename)
846def parse_key_val_file_contents(contents):
847 # This was originally extracted from the EC2 credential provider, which was
848 # fairly lenient in its parsing. We only try to parse key/val pairs if
849 # there's a '=' in the line.
850 final = {}
851 for line in contents.splitlines():
852 if '=' not in line:
853 continue
854 key, val = line.split('=', 1)
855 key = key.strip()
856 val = val.strip()
857 final[key] = val
858 return final
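# Usage sketch for parse_key_val_file_contents() (illustrative only): lines
# without an '=' are silently skipped and surrounding whitespace is stripped.
#
#     parse_key_val_file_contents('AccessKeyId = AKID\nSecretAccessKey=secret\nnot a pair\n')
#     # {'AccessKeyId': 'AKID', 'SecretAccessKey': 'secret'}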
861def percent_encode_sequence(mapping, safe=SAFE_CHARS):
862 """Urlencode a dict or list into a string.
864 This is similar to urllib.urlencode except that:
866 * It uses quote, and not quote_plus
867 * It has a default list of safe chars that don't need
868 to be encoded, which matches what AWS services expect.
870 If any value in the input ``mapping`` is a list type,
 871 then each list element will be serialized. This is the equivalent
872 to ``urlencode``'s ``doseq=True`` argument.
874 This function should be preferred over the stdlib
875 ``urlencode()`` function.
877 :param mapping: Either a dict to urlencode or a list of
878 ``(key, value)`` pairs.
880 """
881 encoded_pairs = []
882 if hasattr(mapping, 'items'):
883 pairs = mapping.items()
884 else:
885 pairs = mapping
886 for key, value in pairs:
887 if isinstance(value, list):
888 for element in value:
889 encoded_pairs.append(
890 f'{percent_encode(key)}={percent_encode(element)}'
891 )
892 else:
893 encoded_pairs.append(
894 f'{percent_encode(key)}={percent_encode(value)}'
895 )
896 return '&'.join(encoded_pairs)
899def percent_encode(input_str, safe=SAFE_CHARS):
900 """Urlencodes a string.
902 Whereas percent_encode_sequence handles taking a dict/sequence and
903 producing a percent encoded string, this function deals only with
904 taking a string (not a dict/sequence) and percent encoding it.
906 If given the binary type, will simply URL encode it. If given the
907 text type, will produce the binary type by UTF-8 encoding the
908 text. If given something else, will convert it to the text type
909 first.
910 """
911 # If its not a binary or text string, make it a text string.
912 if not isinstance(input_str, (bytes, str)):
913 input_str = str(input_str)
914 # If it's not bytes, make it bytes by UTF-8 encoding it.
915 if not isinstance(input_str, bytes):
916 input_str = input_str.encode('utf-8')
917 return quote(input_str, safe=safe)
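# Usage sketch for percent_encode()/percent_encode_sequence() (illustrative
# only): only alphanumerics plus the SAFE_CHARS ('-._~') are left unencoded,
# and list values expand into repeated key=value pairs.
#
#     percent_encode('foo bar~')                                 # 'foo%20bar~'
#     percent_encode_sequence({'k': 'v 1', 'tags': ['a', 'b']})  # 'k=v%201&tags=a&tags=b'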
920def _epoch_seconds_to_datetime(value, tzinfo):
921 """Parse numerical epoch timestamps (seconds since 1970) into a
922 ``datetime.datetime`` in UTC using ``datetime.timedelta``. This is intended
923 as fallback when ``fromtimestamp`` raises ``OverflowError`` or ``OSError``.
925 :type value: float or int
 926 :param value: The Unix timestamp as a number.
928 :type tzinfo: callable
929 :param tzinfo: A ``datetime.tzinfo`` class or compatible callable.
930 """
931 epoch_zero = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=tzutc())
932 epoch_zero_localized = epoch_zero.astimezone(tzinfo())
933 return epoch_zero_localized + datetime.timedelta(seconds=value)
936def _parse_timestamp_with_tzinfo(value, tzinfo):
937 """Parse timestamp with pluggable tzinfo options."""
938 if isinstance(value, (int, float)):
939 # Possibly an epoch time.
940 return datetime.datetime.fromtimestamp(value, tzinfo())
941 else:
942 try:
943 return datetime.datetime.fromtimestamp(float(value), tzinfo())
944 except (TypeError, ValueError):
945 pass
946 try:
947 # In certain cases, a timestamp marked with GMT can be parsed into a
948 # different time zone, so here we provide a context which will
949 # enforce that GMT == UTC.
950 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()})
951 except (TypeError, ValueError) as e:
952 raise ValueError(f'Invalid timestamp "{value}": {e}')
955def parse_timestamp(value):
956 """Parse a timestamp into a datetime object.
958 Supported formats:
960 * iso8601
961 * rfc822
962 * epoch (value is an integer)
964 This will return a ``datetime.datetime`` object.
966 """
967 tzinfo_options = get_tzinfo_options()
968 for tzinfo in tzinfo_options:
969 try:
970 return _parse_timestamp_with_tzinfo(value, tzinfo)
971 except (OSError, OverflowError) as e:
972 logger.debug(
973 'Unable to parse timestamp with "%s" timezone info.',
974 tzinfo.__name__,
975 exc_info=e,
976 )
977 # For numeric values attempt fallback to using fromtimestamp-free method.
978 # From Python's ``datetime.datetime.fromtimestamp`` documentation: "This
979 # may raise ``OverflowError``, if the timestamp is out of the range of
980 # values supported by the platform C localtime() function, and ``OSError``
981 # on localtime() failure. It's common for this to be restricted to years
982 # from 1970 through 2038."
983 try:
984 numeric_value = float(value)
985 except (TypeError, ValueError):
986 pass
987 else:
988 try:
989 for tzinfo in tzinfo_options:
990 return _epoch_seconds_to_datetime(numeric_value, tzinfo=tzinfo)
991 except (OSError, OverflowError) as e:
992 logger.debug(
993 'Unable to parse timestamp using fallback method with "%s" '
994 'timezone info.',
995 tzinfo.__name__,
996 exc_info=e,
997 )
998 raise RuntimeError(
999 f'Unable to calculate correct timezone offset for "{value}"'
1000 )
1003def parse_to_aware_datetime(value):
1004 """Converted the passed in value to a datetime object with tzinfo.
1006 This function can be used to normalize all timestamp inputs. This
1007 function accepts a number of different types of inputs, but
1008 will always return a datetime.datetime object with time zone
1009 information.
1011 The input param ``value`` can be one of several types:
1013 * A datetime object (both naive and aware)
1014 * An integer representing the epoch time (can also be a string
1015 of the integer, i.e '0', instead of 0). The epoch time is
1016 considered to be UTC.
1017 * An iso8601 formatted timestamp. This does not need to be
1018 a complete timestamp, it can contain just the date portion
1019 without the time component.
1021 The returned value will be a datetime object that will have tzinfo.
1022 If no timezone info was provided in the input value, then UTC is
1023 assumed, not local time.
1025 """
1026 # This is a general purpose method that handles several cases of
1027 # converting the provided value to a string timestamp suitable to be
1028 # serialized to an http request. It can handle:
1029 # 1) A datetime.datetime object.
1030 if isinstance(value, _DatetimeClass):
1031 datetime_obj = value
1032 else:
1033 # 2) A string object that's formatted as a timestamp.
1034 # We document this as being an iso8601 timestamp, although
1035 # parse_timestamp is a bit more flexible.
1036 datetime_obj = parse_timestamp(value)
1037 if datetime_obj.tzinfo is None:
1038 # I think a case would be made that if no time zone is provided,
1039 # we should use the local time. However, to restore backwards
1040 # compat, the previous behavior was to assume UTC, which is
1041 # what we're going to do here.
1042 datetime_obj = datetime_obj.replace(tzinfo=tzutc())
1043 else:
1044 datetime_obj = datetime_obj.astimezone(tzutc())
1045 return datetime_obj
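# Usage sketch for parse_to_aware_datetime() (illustrative only): naive inputs
# and epoch values are interpreted as UTC, and the result is always
# timezone-aware.
#
#     parse_to_aware_datetime('2015-01-01T12:00:00Z')
#     # datetime.datetime(2015, 1, 1, 12, 0, tzinfo=tzutc())
#     parse_to_aware_datetime(0)
#     # datetime.datetime(1970, 1, 1, 0, 0, tzinfo=tzutc())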
1048def datetime2timestamp(dt, default_timezone=None):
1049 """Calculate the timestamp based on the given datetime instance.
1051 :type dt: datetime
1052 :param dt: A datetime object to be converted into timestamp
1053 :type default_timezone: tzinfo
1054 :param default_timezone: If it is provided as None, we treat it as tzutc().
1055 But it is only used when dt is a naive datetime.
1056 :returns: The timestamp
1057 """
1058 epoch = datetime.datetime(1970, 1, 1)
1059 if dt.tzinfo is None:
1060 if default_timezone is None:
1061 default_timezone = tzutc()
1062 dt = dt.replace(tzinfo=default_timezone)
1063 d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
1064 return d.total_seconds()
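# Usage sketch for datetime2timestamp() (illustrative only): naive datetimes
# are treated as UTC unless a default_timezone is supplied.
#
#     datetime2timestamp(datetime.datetime(1970, 1, 2))                  # 86400.0
#     datetime2timestamp(datetime.datetime(1970, 1, 1, tzinfo=tzutc()))  # 0.0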
1067def calculate_sha256(body, as_hex=False):
1068 """Calculate a sha256 checksum.
1070 This method will calculate the sha256 checksum of a file like
1071 object. Note that this method will iterate through the entire
1072 file contents. The caller is responsible for ensuring the proper
1073 starting position of the file and ``seek()``'ing the file back
1074 to its starting location if other consumers need to read from
1075 the file like object.
1077 :param body: Any file like object. The file must be opened
1078 in binary mode such that a ``.read()`` call returns bytes.
1079 :param as_hex: If True, then the hex digest is returned.
1080 If False, then the digest (as binary bytes) is returned.
1082 :returns: The sha256 checksum
1084 """
1085 checksum = hashlib.sha256()
1086 for chunk in iter(lambda: body.read(1024 * 1024), b''):
1087 checksum.update(chunk)
1088 if as_hex:
1089 return checksum.hexdigest()
1090 else:
1091 return checksum.digest()
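# Usage sketch for calculate_sha256() (illustrative only): the body is read in
# 1 MB chunks, so the result matches hashing the full payload at once.
#
#     data = b'hello world'
#     calculate_sha256(io.BytesIO(data), as_hex=True) == hashlib.sha256(data).hexdigest()
#     # True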
1094def calculate_tree_hash(body):
1095 """Calculate a tree hash checksum.
1097 For more information see:
1099 http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html
1101 :param body: Any file like object. This has the same constraints as
1102 the ``body`` param in calculate_sha256
1104 :rtype: str
1105 :returns: The hex version of the calculated tree hash
1107 """
1108 chunks = []
1109 required_chunk_size = 1024 * 1024
1110 sha256 = hashlib.sha256
1111 for chunk in iter(lambda: body.read(required_chunk_size), b''):
1112 chunks.append(sha256(chunk).digest())
1113 if not chunks:
1114 return sha256(b'').hexdigest()
1115 while len(chunks) > 1:
1116 new_chunks = []
1117 for first, second in _in_pairs(chunks):
1118 if second is not None:
1119 new_chunks.append(sha256(first + second).digest())
1120 else:
1121 # We're at the end of the list and there's no pair left.
1122 new_chunks.append(first)
1123 chunks = new_chunks
1124 return binascii.hexlify(chunks[0]).decode('ascii')
1127def _in_pairs(iterable):
1128 # Creates iterator that iterates over the list in pairs:
1129 # for a, b in _in_pairs([0, 1, 2, 3, 4]):
1130 # print(a, b)
1131 #
1132 # will print:
1133 # 0, 1
1134 # 2, 3
1135 # 4, None
1136 shared_iter = iter(iterable)
1137 # Note that zip_longest is a compat import that uses
1138 # the itertools izip_longest. This creates an iterator,
1139 # this call below does _not_ immediately create the list
1140 # of pairs.
1141 return zip_longest(shared_iter, shared_iter)
1144class CachedProperty:
1145 """A read only property that caches the initially computed value.
1147 This descriptor will only call the provided ``fget`` function once.
1148 Subsequent access to this property will return the cached value.
1150 """
1152 def __init__(self, fget):
1153 self._fget = fget
1155 def __get__(self, obj, cls):
1156 if obj is None:
1157 return self
1158 else:
1159 computed_value = self._fget(obj)
1160 obj.__dict__[self._fget.__name__] = computed_value
1161 return computed_value
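# Usage sketch for CachedProperty (illustrative only; ExampleModel is a
# hypothetical class): the first access stores the computed value in the
# instance __dict__, so later lookups bypass the descriptor entirely.
#
#     class ExampleModel:
#         @CachedProperty
#         def shape_names(self):
#             print('computing')     # runs only on the first access
#             return ['A', 'B']
#
#     m = ExampleModel()
#     m.shape_names   # prints 'computing', returns ['A', 'B']
#     m.shape_names   # served from m.__dict__, no recomputation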
1164class ArgumentGenerator:
1165 """Generate sample input based on a shape model.
1167 This class contains a ``generate_skeleton`` method that will take
1168 an input/output shape (created from ``botocore.model``) and generate
1169 a sample dictionary corresponding to the input/output shape.
 1171 The specific values used are placeholder values. For strings either an
1172 empty string or the member name can be used, for numbers 0 or 0.0 is used.
1173 The intended usage of this class is to generate the *shape* of the input
1174 structure.
1176 This can be useful for operations that have complex input shapes.
1177 This allows a user to just fill in the necessary data instead of
1178 worrying about the specific structure of the input arguments.
1180 Example usage::
1182 s = botocore.session.get_session()
1183 ddb = s.get_service_model('dynamodb')
1184 arg_gen = ArgumentGenerator()
1185 sample_input = arg_gen.generate_skeleton(
1186 ddb.operation_model('CreateTable').input_shape)
1187 print("Sample input for dynamodb.CreateTable: %s" % sample_input)
1189 """
1191 def __init__(self, use_member_names=False):
1192 self._use_member_names = use_member_names
1194 def generate_skeleton(self, shape):
1195 """Generate a sample input.
1197 :type shape: ``botocore.model.Shape``
1198 :param shape: The input shape.
1200 :return: The generated skeleton input corresponding to the
1201 provided input shape.
1203 """
1204 stack = []
1205 return self._generate_skeleton(shape, stack)
1207 def _generate_skeleton(self, shape, stack, name=''):
1208 stack.append(shape.name)
1209 try:
1210 if shape.type_name == 'structure':
1211 return self._generate_type_structure(shape, stack)
1212 elif shape.type_name == 'list':
1213 return self._generate_type_list(shape, stack)
1214 elif shape.type_name == 'map':
1215 return self._generate_type_map(shape, stack)
1216 elif shape.type_name == 'string':
1217 if self._use_member_names:
1218 return name
1219 if shape.enum:
1220 return random.choice(shape.enum)
1221 return ''
1222 elif shape.type_name in ['integer', 'long']:
1223 return 0
1224 elif shape.type_name in ['float', 'double']:
1225 return 0.0
1226 elif shape.type_name == 'boolean':
1227 return True
1228 elif shape.type_name == 'timestamp':
1229 return datetime.datetime(1970, 1, 1, 0, 0, 0)
1230 finally:
1231 stack.pop()
1233 def _generate_type_structure(self, shape, stack):
1234 if stack.count(shape.name) > 1:
1235 return {}
1236 skeleton = OrderedDict()
1237 for member_name, member_shape in shape.members.items():
1238 skeleton[member_name] = self._generate_skeleton(
1239 member_shape, stack, name=member_name
1240 )
1241 return skeleton
1243 def _generate_type_list(self, shape, stack):
1244 # For list elements we've arbitrarily decided to
1245 # return two elements for the skeleton list.
1246 name = ''
1247 if self._use_member_names:
1248 name = shape.member.name
1249 return [
1250 self._generate_skeleton(shape.member, stack, name),
1251 ]
1253 def _generate_type_map(self, shape, stack):
1254 key_shape = shape.key
1255 value_shape = shape.value
1256 assert key_shape.type_name == 'string'
1257 return OrderedDict(
1258 [
1259 ('KeyName', self._generate_skeleton(value_shape, stack)),
1260 ]
1261 )
1264def is_valid_ipv6_endpoint_url(endpoint_url):
1265 if UNSAFE_URL_CHARS.intersection(endpoint_url):
1266 return False
1267 hostname = f'[{urlparse(endpoint_url).hostname}]'
1268 return IPV6_ADDRZ_RE.match(hostname) is not None
1271def is_valid_ipv4_endpoint_url(endpoint_url):
1272 hostname = urlparse(endpoint_url).hostname
1273 return IPV4_RE.match(hostname) is not None
1276def is_valid_endpoint_url(endpoint_url):
1277 """Verify the endpoint_url is valid.
1279 :type endpoint_url: string
1280 :param endpoint_url: An endpoint_url. Must have at least a scheme
1281 and a hostname.
1283 :return: True if the endpoint url is valid. False otherwise.
1285 """
1286 # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing
1287 # it to pass hostname validation below. Detect them early to fix that.
1288 if UNSAFE_URL_CHARS.intersection(endpoint_url):
1289 return False
1290 parts = urlsplit(endpoint_url)
1291 hostname = parts.hostname
1292 if hostname is None:
1293 return False
1294 if len(hostname) > 255:
1295 return False
1296 if hostname[-1] == ".":
1297 hostname = hostname[:-1]
1298 allowed = re.compile(
1299 r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$",
1300 re.IGNORECASE,
1301 )
1302 return allowed.match(hostname)
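# Usage sketch for is_valid_endpoint_url() (illustrative only): the URL needs
# a scheme and a hostname made of valid DNS labels; a truthy match object
# signals validity, while False (or None) signals rejection.
#
#     bool(is_valid_endpoint_url('https://s3.us-west-2.amazonaws.com'))  # True
#     is_valid_endpoint_url('https://')                 # False (no hostname)
#     bool(is_valid_endpoint_url('https://exa_mple.com'))                # False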
1305def is_valid_uri(endpoint_url):
1306 return is_valid_endpoint_url(endpoint_url) or is_valid_ipv6_endpoint_url(
1307 endpoint_url
1308 )
1311def validate_region_name(region_name):
1312 """Provided region_name must be a valid host label."""
1313 if region_name is None:
1314 return
1315 valid_host_label = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$')
1316 valid = valid_host_label.match(region_name)
1317 if not valid:
1318 raise InvalidRegionError(region_name=region_name)
1321def check_dns_name(bucket_name):
1322 """
1323 Check to see if the ``bucket_name`` complies with the
1324 restricted DNS naming conventions necessary to allow
1325 access via virtual-hosting style.
1327 Even though "." characters are perfectly valid in this DNS
1328 naming scheme, we are going to punt on any name containing a
1329 "." character because these will cause SSL cert validation
1330 problems if we try to use virtual-hosting style addressing.
1331 """
1332 if '.' in bucket_name:
1333 return False
1334 n = len(bucket_name)
1335 if n < 3 or n > 63:
1336 # Wrong length
1337 return False
1338 match = LABEL_RE.match(bucket_name)
1339 if match is None or match.end() != len(bucket_name):
1340 return False
1341 return True
1344def fix_s3_host(
1345 request,
1346 signature_version,
1347 region_name,
1348 default_endpoint_url=None,
1349 **kwargs,
1350):
1351 """
1352 This handler looks at S3 requests just before they are signed.
1353 If there is a bucket name on the path (true for everything except
1354 ListAllBuckets) it checks to see if that bucket name conforms to
1355 the DNS naming conventions. If it does, it alters the request to
1356 use ``virtual hosting`` style addressing rather than ``path-style``
1357 addressing.
1359 """
1360 if request.context.get('use_global_endpoint', False):
1361 default_endpoint_url = 's3.amazonaws.com'
1362 try:
1363 switch_to_virtual_host_style(
1364 request, signature_version, default_endpoint_url
1365 )
1366 except InvalidDNSNameError as e:
1367 bucket_name = e.kwargs['bucket_name']
1368 logger.debug(
1369 'Not changing URI, bucket is not DNS compatible: %s', bucket_name
1370 )
1373def switch_to_virtual_host_style(
1374 request, signature_version, default_endpoint_url=None, **kwargs
1375):
1376 """
 1377 This is a handler to force virtual host style s3 addressing no matter
 1378 the signature version (which is taken into consideration for the default
 1379 case). If the bucket is not DNS compatible an InvalidDNSNameError is raised.
 1381 :param request: An AWSRequest object that is about to be sent.
1382 :param signature_version: The signature version to sign with
1383 :param default_endpoint_url: The endpoint to use when switching to a
1384 virtual style. If None is supplied, the virtual host will be
1385 constructed from the url of the request.
1386 """
1387 if request.auth_path is not None:
1388 # The auth_path has already been applied (this may be a
1389 # retried request). We don't need to perform this
1390 # customization again.
1391 return
1392 elif _is_get_bucket_location_request(request):
1393 # For the GetBucketLocation response, we should not be using
1394 # the virtual host style addressing so we can avoid any sigv4
1395 # issues.
1396 logger.debug(
1397 "Request is GetBucketLocation operation, not checking "
1398 "for DNS compatibility."
1399 )
1400 return
1401 parts = urlsplit(request.url)
1402 request.auth_path = parts.path
1403 path_parts = parts.path.split('/')
 1405 # Retrieve the endpoint we will be prepending the bucket name to.
1406 if default_endpoint_url is None:
1407 default_endpoint_url = parts.netloc
1409 if len(path_parts) > 1:
1410 bucket_name = path_parts[1]
1411 if not bucket_name:
1412 # If the bucket name is empty we should not be checking for
1413 # dns compatibility.
1414 return
1415 logger.debug('Checking for DNS compatible bucket for: %s', request.url)
1416 if check_dns_name(bucket_name):
1417 # If the operation is on a bucket, the auth_path must be
1418 # terminated with a '/' character.
1419 if len(path_parts) == 2:
1420 if request.auth_path[-1] != '/':
1421 request.auth_path += '/'
1422 path_parts.remove(bucket_name)
1423 # At the very least the path must be a '/', such as with the
1424 # CreateBucket operation when DNS style is being used. If this
1425 # is not used you will get an empty path which is incorrect.
1426 path = '/'.join(path_parts) or '/'
1427 global_endpoint = default_endpoint_url
1428 host = bucket_name + '.' + global_endpoint
1429 new_tuple = (parts.scheme, host, path, parts.query, '')
1430 new_uri = urlunsplit(new_tuple)
1431 request.url = new_uri
1432 logger.debug('URI updated to: %s', new_uri)
1433 else:
1434 raise InvalidDNSNameError(bucket_name=bucket_name)
1437def _is_get_bucket_location_request(request):
1438 return request.url.endswith('?location')
1441def instance_cache(func):
1442 """Method decorator for caching method calls to a single instance.
1444 **This is not a general purpose caching decorator.**
1446 In order to use this, you *must* provide an ``_instance_cache``
1447 attribute on the instance.
1449 This decorator is used to cache method calls. The cache is only
1450 scoped to a single instance though such that multiple instances
1451 will maintain their own cache. In order to keep things simple,
1452 this decorator requires that you provide an ``_instance_cache``
1453 attribute on your instance.
1455 """
1456 func_name = func.__name__
1458 @functools.wraps(func)
1459 def _cache_guard(self, *args, **kwargs):
1460 cache_key = (func_name, args)
1461 if kwargs:
1462 kwarg_items = tuple(sorted(kwargs.items()))
1463 cache_key = (func_name, args, kwarg_items)
1464 result = self._instance_cache.get(cache_key)
1465 if result is not None:
1466 return result
1467 result = func(self, *args, **kwargs)
1468 self._instance_cache[cache_key] = result
1469 return result
1471 return _cache_guard
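# Usage sketch for @instance_cache (illustrative only; ExampleClient is a
# hypothetical class): the decorated method requires an ``_instance_cache``
# dict on the instance, and results are memoized per instance and arguments.
#
#     class ExampleClient:
#         def __init__(self):
#             self._instance_cache = {}
#
#         @instance_cache
#         def describe(self, name):
#             return {'name': name}   # imagine an expensive lookup here
#
#     c = ExampleClient()
#     c.describe('foo')   # computed and stored under ('describe', ('foo',))
#     c.describe('foo')   # returned from c._instance_cache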
1474def lru_cache_weakref(*cache_args, **cache_kwargs):
1475 """
1476 Version of functools.lru_cache that stores a weak reference to ``self``.
1478 Serves the same purpose as :py:func:`instance_cache` but uses Python's
 1479 functools implementation which offers ``maxsize`` and ``typed`` parameters.
1481 lru_cache is a global cache even when used on a method. The cache's
 1482 reference to ``self`` will prevent garbage collection of the object. This
1483 wrapper around functools.lru_cache replaces the reference to ``self`` with
1484 a weak reference to not interfere with garbage collection.
1485 """
1487 def wrapper(func):
1488 @functools.lru_cache(*cache_args, **cache_kwargs)
1489 def func_with_weakref(weakref_to_self, *args, **kwargs):
1490 return func(weakref_to_self(), *args, **kwargs)
1492 @functools.wraps(func)
1493 def inner(self, *args, **kwargs):
1494 return func_with_weakref(weakref.ref(self), *args, **kwargs)
1496 inner.cache_info = func_with_weakref.cache_info
1497 return inner
1499 return wrapper
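# Usage sketch for @lru_cache_weakref (illustrative only; Resolver is a
# hypothetical class): the lru_cache keys on a weak reference to ``self``, so
# the cache does not keep instances alive.
#
#     class Resolver:
#         @lru_cache_weakref(maxsize=16)
#         def resolve(self, name):
#             return name.upper()
#
#     r = Resolver()
#     r.resolve('abc')               # 'ABC', computed once per (instance, name)
#     Resolver.resolve.cache_info()  # standard functools cache statistics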
1502def switch_host_s3_accelerate(request, operation_name, **kwargs):
1503 """Switches the current s3 endpoint with an S3 Accelerate endpoint"""
1505 # Note that when registered the switching of the s3 host happens
1506 # before it gets changed to virtual. So we are not concerned with ensuring
1507 # that the bucket name is translated to the virtual style here and we
1508 # can hard code the Accelerate endpoint.
1509 parts = urlsplit(request.url).netloc.split('.')
1510 parts = [p for p in parts if p in S3_ACCELERATE_WHITELIST]
1511 endpoint = 'https://s3-accelerate.'
1512 if len(parts) > 0:
1513 endpoint += '.'.join(parts) + '.'
1514 endpoint += 'amazonaws.com'
1516 if operation_name in ['ListBuckets', 'CreateBucket', 'DeleteBucket']:
1517 return
1518 _switch_hosts(request, endpoint, use_new_scheme=False)
1521def switch_host_with_param(request, param_name):
1522 """Switches the host using a parameter value from a JSON request body"""
1523 request_json = json.loads(request.data.decode('utf-8'))
1524 if request_json.get(param_name):
1525 new_endpoint = request_json[param_name]
1526 _switch_hosts(request, new_endpoint)
1529def _switch_hosts(request, new_endpoint, use_new_scheme=True):
1530 final_endpoint = _get_new_endpoint(
1531 request.url, new_endpoint, use_new_scheme
1532 )
1533 request.url = final_endpoint
1536def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True):
1537 new_endpoint_components = urlsplit(new_endpoint)
1538 original_endpoint_components = urlsplit(original_endpoint)
1539 scheme = original_endpoint_components.scheme
1540 if use_new_scheme:
1541 scheme = new_endpoint_components.scheme
1542 final_endpoint_components = (
1543 scheme,
1544 new_endpoint_components.netloc,
1545 original_endpoint_components.path,
1546 original_endpoint_components.query,
1547 '',
1548 )
1549 final_endpoint = urlunsplit(final_endpoint_components)
1550 logger.debug(f'Updating URI from {original_endpoint} to {final_endpoint}')
1551 return final_endpoint
1554def deep_merge(base, extra):
1555 """Deeply two dictionaries, overriding existing keys in the base.
1557 :param base: The base dictionary which will be merged into.
1558 :param extra: The dictionary to merge into the base. Keys from this
1559 dictionary will take precedence.
1560 """
1561 for key in extra:
1562 # If the key represents a dict on both given dicts, merge the sub-dicts
1563 if (
1564 key in base
1565 and isinstance(base[key], dict)
1566 and isinstance(extra[key], dict)
1567 ):
1568 deep_merge(base[key], extra[key])
1569 continue
1571 # Otherwise, set the key on the base to be the value of the extra.
1572 base[key] = extra[key]
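# Usage sketch for deep_merge() (illustrative only): sub-dictionaries present
# in both arguments are merged recursively; everything else in ``extra`` wins.
#
#     base = {'retries': {'mode': 'standard', 'max_attempts': 3}}
#     extra = {'retries': {'max_attempts': 10}, 'region': 'us-east-1'}
#     deep_merge(base, extra)
#     # base == {'retries': {'mode': 'standard', 'max_attempts': 10},
#     #          'region': 'us-east-1'}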
1575def hyphenize_service_id(service_id):
1576 """Translate the form used for event emitters.
1578 :param service_id: The service_id to convert.
1579 """
1580 return service_id.replace(' ', '-').lower()
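# Usage sketch for hyphenize_service_id() (illustrative only): spaces become
# hyphens and the result is lowercased, matching the event-name form.
#
#     hyphenize_service_id('Auto Scaling')  # 'auto-scaling'
#     hyphenize_service_id('S3')            # 's3'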
1583class IdentityCache:
1584 """Base IdentityCache implementation for storing and retrieving
1585 highly accessed credentials.
1587 This class is not intended to be instantiated in user code.
1588 """
1590 METHOD = "base_identity_cache"
1592 def __init__(self, client, credential_cls):
1593 self._client = client
1594 self._credential_cls = credential_cls
1596 def get_credentials(self, **kwargs):
1597 callback = self.build_refresh_callback(**kwargs)
1598 metadata = callback()
1599 credential_entry = self._credential_cls.create_from_metadata(
1600 metadata=metadata,
1601 refresh_using=callback,
1602 method=self.METHOD,
1603 advisory_timeout=45,
1604 mandatory_timeout=10,
1605 )
1606 return credential_entry
 1608 def build_refresh_callback(self, **kwargs):
1609 """Callback to be implemented by subclasses.
1611 Returns a set of metadata to be converted into a new
1612 credential instance.
1613 """
1614 raise NotImplementedError()
1617class S3ExpressIdentityCache(IdentityCache):
1618 """S3Express IdentityCache for retrieving and storing
1619 credentials from CreateSession calls.
1621 This class is not intended to be instantiated in user code.
1622 """
1624 METHOD = "s3express"
1626 def __init__(self, client, credential_cls):
1627 self._client = client
1628 self._credential_cls = credential_cls
1630 @functools.lru_cache(maxsize=100)
1631 def get_credentials(self, bucket):
1632 return super().get_credentials(bucket=bucket)
1634 def build_refresh_callback(self, bucket):
1635 def refresher():
1636 response = self._client.create_session(Bucket=bucket)
1637 creds = response['Credentials']
1638 expiration = self._serialize_if_needed(
1639 creds['Expiration'], iso=True
1640 )
1641 return {
1642 "access_key": creds['AccessKeyId'],
1643 "secret_key": creds['SecretAccessKey'],
1644 "token": creds['SessionToken'],
1645 "expiry_time": expiration,
1646 }
1648 return refresher
1650 def _serialize_if_needed(self, value, iso=False):
1651 if isinstance(value, _DatetimeClass):
1652 if iso:
1653 return value.isoformat()
1654 return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
1655 return value
1658class S3ExpressIdentityResolver:
1659 def __init__(self, client, credential_cls, cache=None):
1660 self._client = weakref.proxy(client)
1662 if cache is None:
1663 cache = S3ExpressIdentityCache(self._client, credential_cls)
1664 self._cache = cache
1666 def register(self, event_emitter=None):
1667 logger.debug('Registering S3Express Identity Resolver')
1668 emitter = event_emitter or self._client.meta.events
1669 emitter.register('before-call.s3', self.apply_signing_cache_key)
1670 emitter.register('before-sign.s3', self.resolve_s3express_identity)
1672 def apply_signing_cache_key(self, params, context, **kwargs):
1673 endpoint_properties = context.get('endpoint_properties', {})
1674 backend = endpoint_properties.get('backend', None)
1676 # Add cache key if Bucket supplied for s3express request
1677 bucket_name = context.get('input_params', {}).get('Bucket')
1678 if backend == 'S3Express' and bucket_name is not None:
1679 context.setdefault('signing', {})
1680 context['signing']['cache_key'] = bucket_name
1682 def resolve_s3express_identity(
1683 self,
1684 request,
1685 signing_name,
1686 region_name,
1687 signature_version,
1688 request_signer,
1689 operation_name,
1690 **kwargs,
1691 ):
1692 signing_context = request.context.get('signing', {})
1693 signing_name = signing_context.get('signing_name')
1694 if signing_name == 's3express' and signature_version.startswith(
1695 'v4-s3express'
1696 ):
1697 signing_context['identity_cache'] = self._cache
1698 if 'cache_key' not in signing_context:
1699 signing_context['cache_key'] = (
1700 request.context.get('s3_redirect', {})
1701 .get('params', {})
1702 .get('Bucket')
1703 )
1706class S3RegionRedirectorv2:
1707 """Updated version of S3RegionRedirector for use when
1708 EndpointRulesetResolver is in use for endpoint resolution.
1710 This class is considered private and subject to abrupt breaking changes or
1711 removal without prior announcement. Please do not use it directly.
1712 """
1714 def __init__(self, endpoint_bridge, client, cache=None):
1715 self._cache = cache or {}
1716 self._client = weakref.proxy(client)
1718 def register(self, event_emitter=None):
1719 logger.debug('Registering S3 region redirector handler')
1720 emitter = event_emitter or self._client.meta.events
1721 emitter.register('needs-retry.s3', self.redirect_from_error)
1722 emitter.register(
1723 'before-parameter-build.s3', self.annotate_request_context
1724 )
1725 emitter.register(
1726 'before-endpoint-resolution.s3', self.redirect_from_cache
1727 )
1729 def redirect_from_error(self, request_dict, response, operation, **kwargs):
1730 """
1731 An S3 request sent to the wrong region will return an error that
1732 contains the endpoint the request should be sent to. This handler
1733 will add the redirect information to the signing context and then
1734 redirect the request.
1735 """
1736 if response is None:
1737 # This could be none if there was a ConnectionError or other
1738 # transport error.
1739 return
1741 redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {})
1742 if ArnParser.is_arn(redirect_ctx.get('bucket')):
1743 logger.debug(
1744 'S3 request was previously for an Accesspoint ARN, not '
1745 'redirecting.'
1746 )
1747 return
1749 if redirect_ctx.get('redirected'):
1750 logger.debug(
1751 'S3 request was previously redirected, not redirecting.'
1752 )
1753 return
1755 error = response[1].get('Error', {})
1756 error_code = error.get('Code')
1757 response_metadata = response[1].get('ResponseMetadata', {})
1759 # We have to account for 400 responses because
1760 # if we sign a Head* request with the wrong region,
1761 # we'll get a 400 Bad Request but we won't get a
1762 # body saying it's an "AuthorizationHeaderMalformed".
1763 is_special_head_object = (
1764 error_code in ('301', '400') and operation.name == 'HeadObject'
1765 )
1766 is_special_head_bucket = (
1767 error_code in ('301', '400')
1768 and operation.name == 'HeadBucket'
1769 and 'x-amz-bucket-region'
1770 in response_metadata.get('HTTPHeaders', {})
1771 )
1772 is_wrong_signing_region = (
1773 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
1774 )
1775 is_redirect_status = response[0] is not None and response[
1776 0
1777 ].status_code in (301, 302, 307)
1778 is_permanent_redirect = error_code == 'PermanentRedirect'
1779 if not any(
1780 [
1781 is_special_head_object,
1782 is_wrong_signing_region,
1783 is_permanent_redirect,
1784 is_special_head_bucket,
1785 is_redirect_status,
1786 ]
1787 ):
1788 return
1790 bucket = request_dict['context']['s3_redirect']['bucket']
1791 client_region = request_dict['context'].get('client_region')
1792 new_region = self.get_bucket_region(bucket, response)
1794 if new_region is None:
1795 logger.debug(
1796 f"S3 client configured for region {client_region} but the "
1797 f"bucket {bucket} is not in that region and the proper region "
1798 "could not be automatically determined."
1799 )
1800 return
1802 logger.debug(
1803 f"S3 client configured for region {client_region} but the bucket {bucket} "
1804 f"is in region {new_region}; Please configure the proper region to "
1805 f"avoid multiple unnecessary redirects and signing attempts."
1806 )
1807 # Adding the new region to _cache will make construct_endpoint()
1808 # use the new region as the value for the AWS::Region builtin parameter.
1809 self._cache[bucket] = new_region
1811 # Re-resolve endpoint with new region and modify request_dict with
1812 # the new URL, auth scheme, and signing context.
1813 ep_resolver = self._client._ruleset_resolver
1814 ep_info = ep_resolver.construct_endpoint(
1815 operation_model=operation,
1816 call_args=request_dict['context']['s3_redirect']['params'],
1817 request_context=request_dict['context'],
1818 )
1819 request_dict['url'] = self.set_request_url(
1820 request_dict['url'], ep_info.url
1821 )
1822 request_dict['context']['s3_redirect']['redirected'] = True
1823 auth_schemes = ep_info.properties.get('authSchemes')
1824 if auth_schemes is not None:
1825 auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes)
1826 auth_type, signing_context = auth_info
1827 request_dict['context']['auth_type'] = auth_type
1828 request_dict['context']['signing'] = {
1829 **request_dict['context'].get('signing', {}),
1830 **signing_context,
1831 }
1833 # Return 0 so it doesn't wait to retry
1834 return 0
1836 def get_bucket_region(self, bucket, response):
1837 """
1838 There are multiple potential sources for the new region to redirect to,
1839 but they aren't all universally available for use. This will try to
1840 find region from response elements, but will fall back to calling
1841 HEAD on the bucket if all else fails.
1843 :param bucket: The bucket to find the region for. This is necessary if
1844 the region is not available in the error response.
1845 :param response: A response representing a service request that failed
1846 due to incorrect region configuration.
1847 """
1848 # First try to source the region from the headers.
1849 service_response = response[1]
1850 response_headers = service_response['ResponseMetadata']['HTTPHeaders']
1851 if 'x-amz-bucket-region' in response_headers:
1852 return response_headers['x-amz-bucket-region']
1854 # Next, check the error body
1855 region = service_response.get('Error', {}).get('Region', None)
1856 if region is not None:
1857 return region
1859 # Finally, HEAD the bucket. No other choice sadly.
1860 try:
1861 response = self._client.head_bucket(Bucket=bucket)
1862 headers = response['ResponseMetadata']['HTTPHeaders']
1863 except ClientError as e:
1864 headers = e.response['ResponseMetadata']['HTTPHeaders']
1866 region = headers.get('x-amz-bucket-region', None)
1867 return region
1869 def set_request_url(self, old_url, new_endpoint, **kwargs):
1870 """
1871 Splice a new endpoint into an existing URL. Note that some endpoints
1872 from the endpoint provider have a path component which will be
1873 discarded by this function.
1874 """
1875 return _get_new_endpoint(old_url, new_endpoint, False)
1877 def redirect_from_cache(self, builtins, params, **kwargs):
1878 """
1879 If a bucket name has been redirected before, it is in the cache. This
1880 handler will update the AWS::Region endpoint resolver builtin param
1881 to use the region from cache instead of the client region to avoid the
1882 redirect.
1883 """
1884 bucket = params.get('Bucket')
1885 if bucket is not None and bucket in self._cache:
1886 new_region = self._cache.get(bucket)
1887 builtins['AWS::Region'] = new_region
1889 def annotate_request_context(self, params, context, **kwargs):
1890 """Store the bucket name in context for later use when redirecting.
1891 The bucket name may be an access point ARN or alias.
1892 """
1893 bucket = params.get('Bucket')
1894 context['s3_redirect'] = {
1895 'redirected': False,
1896 'bucket': bucket,
1897 'params': params,
1898 }
1901class S3RegionRedirector:
1902 """This handler has been replaced by S3RegionRedirectorv2. The original
1903 version remains in place for any third-party libraries that import it.
1904 """
1906 def __init__(self, endpoint_bridge, client, cache=None):
1907 self._endpoint_resolver = endpoint_bridge
1908 self._cache = cache
1909 if self._cache is None:
1910 self._cache = {}
1912 # This needs to be a weak ref in order to prevent memory leaks on
1913 # python 2.6
1914 self._client = weakref.proxy(client)
1916 warnings.warn(
1917 'The S3RegionRedirector class has been deprecated for a new '
1918 'internal replacement. A future version of botocore may remove '
1919 'this class.',
1920 category=FutureWarning,
1921 )
1923 def register(self, event_emitter=None):
1924 emitter = event_emitter or self._client.meta.events
1925 emitter.register('needs-retry.s3', self.redirect_from_error)
1926 emitter.register('before-call.s3', self.set_request_url)
1927 emitter.register('before-parameter-build.s3', self.redirect_from_cache)
1929 def redirect_from_error(self, request_dict, response, operation, **kwargs):
1930 """
1931 An S3 request sent to the wrong region will return an error that
1932 contains the endpoint the request should be sent to. This handler
1933 will add the redirect information to the signing context and then
1934 redirect the request.
1935 """
1936 if response is None:
1937 # This could be None if there was a ConnectionError or other
1938 # transport error.
1939 return
1941 if self._is_s3_accesspoint(request_dict.get('context', {})):
1942 logger.debug(
1943 'S3 request was previously to an accesspoint, not redirecting.'
1944 )
1945 return
1947 if request_dict.get('context', {}).get('s3_redirected'):
1948 logger.debug(
1949 'S3 request was previously redirected, not redirecting.'
1950 )
1951 return
1953 error = response[1].get('Error', {})
1954 error_code = error.get('Code')
1955 response_metadata = response[1].get('ResponseMetadata', {})
1957 # We have to account for 400 responses because
1958 # if we sign a Head* request with the wrong region,
1959 # we'll get a 400 Bad Request but we won't get a
1960 # body saying it's an "AuthorizationHeaderMalformed".
1961 is_special_head_object = (
1962 error_code in ('301', '400') and operation.name == 'HeadObject'
1963 )
1964 is_special_head_bucket = (
1965 error_code in ('301', '400')
1966 and operation.name == 'HeadBucket'
1967 and 'x-amz-bucket-region'
1968 in response_metadata.get('HTTPHeaders', {})
1969 )
1970 is_wrong_signing_region = (
1971 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
1972 )
1973 is_redirect_status = response[0] is not None and response[
1974 0
1975 ].status_code in (301, 302, 307)
1976 is_permanent_redirect = error_code == 'PermanentRedirect'
1977 if not any(
1978 [
1979 is_special_head_object,
1980 is_wrong_signing_region,
1981 is_permanent_redirect,
1982 is_special_head_bucket,
1983 is_redirect_status,
1984 ]
1985 ):
1986 return
1988 bucket = request_dict['context']['signing']['bucket']
1989 client_region = request_dict['context'].get('client_region')
1990 new_region = self.get_bucket_region(bucket, response)
1992 if new_region is None:
1993 logger.debug(
1994 f"S3 client configured for region {client_region} but the bucket {bucket} is not "
1995 "in that region and the proper region could not be "
1996 "automatically determined."
1997 )
1998 return
2000 logger.debug(
2001 f"S3 client configured for region {client_region} but the bucket {bucket} is in region"
2002 f" {new_region}; Please configure the proper region to avoid multiple "
2003 "unnecessary redirects and signing attempts."
2004 )
2005 endpoint = self._endpoint_resolver.resolve('s3', new_region)
2006 endpoint = endpoint['endpoint_url']
2008 signing_context = {
2009 'region': new_region,
2010 'bucket': bucket,
2011 'endpoint': endpoint,
2012 }
2013 request_dict['context']['signing'] = signing_context
2015 self._cache[bucket] = signing_context
2016 self.set_request_url(request_dict, request_dict['context'])
2018 request_dict['context']['s3_redirected'] = True
2020 # Return 0 so it doesn't wait to retry
2021 return 0
2023 def get_bucket_region(self, bucket, response):
2024 """
2025 There are multiple potential sources for the new region to redirect to,
2026 but they aren't all universally available for use. This will try to
2027 find region from response elements, but will fall back to calling
2028 HEAD on the bucket if all else fails.
2030 :param bucket: The bucket to find the region for. This is necessary if
2031 the region is not available in the error response.
2032 :param response: A response representing a service request that failed
2033 due to incorrect region configuration.
2034 """
2035 # First try to source the region from the headers.
2036 service_response = response[1]
2037 response_headers = service_response['ResponseMetadata']['HTTPHeaders']
2038 if 'x-amz-bucket-region' in response_headers:
2039 return response_headers['x-amz-bucket-region']
2041 # Next, check the error body
2042 region = service_response.get('Error', {}).get('Region', None)
2043 if region is not None:
2044 return region
2046 # Finally, HEAD the bucket. No other choice sadly.
2047 try:
2048 response = self._client.head_bucket(Bucket=bucket)
2049 headers = response['ResponseMetadata']['HTTPHeaders']
2050 except ClientError as e:
2051 headers = e.response['ResponseMetadata']['HTTPHeaders']
2053 region = headers.get('x-amz-bucket-region', None)
2054 return region
2056 def set_request_url(self, params, context, **kwargs):
2057 endpoint = context.get('signing', {}).get('endpoint', None)
2058 if endpoint is not None:
2059 params['url'] = _get_new_endpoint(params['url'], endpoint, False)
2061 def redirect_from_cache(self, params, context, **kwargs):
2062 """
2063 This handler retrieves a given bucket's signing context from the cache
2064 and adds it into the request context.
2065 """
2066 if self._is_s3_accesspoint(context):
2067 return
2068 bucket = params.get('Bucket')
2069 signing_context = self._cache.get(bucket)
2070 if signing_context is not None:
2071 context['signing'] = signing_context
2072 else:
2073 context['signing'] = {'bucket': bucket}
2075 def _is_s3_accesspoint(self, context):
2076 return 's3_accesspoint' in context
2079class InvalidArnException(ValueError):
2080 pass
2083class ArnParser:
2084 def parse_arn(self, arn):
2085 arn_parts = arn.split(':', 5)
2086 if len(arn_parts) < 6:
2087 raise InvalidArnException(
2088 f'Provided ARN: {arn} must be of the format: '
2089 'arn:partition:service:region:account:resource'
2090 )
2091 return {
2092 'partition': arn_parts[1],
2093 'service': arn_parts[2],
2094 'region': arn_parts[3],
2095 'account': arn_parts[4],
2096 'resource': arn_parts[5],
2097 }
2099 @staticmethod
2100 def is_arn(value):
2101 if not isinstance(value, str) or not value.startswith('arn:'):
2102 return False
2103 arn_parser = ArnParser()
2104 try:
2105 arn_parser.parse_arn(value)
2106 return True
2107 except InvalidArnException:
2108 return False
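# Editor's sketch (not part of botocore): a minimal illustration of how
# ArnParser splits an ARN into its six colon-delimited fields. The ARN value
# and the helper name below are invented for illustration only.
def _editor_example_arn_parser():
    parser = ArnParser()
    details = parser.parse_arn(
        'arn:aws:s3:us-west-2:123456789012:accesspoint/myendpoint'
    )
    # parse_arn() splits on ':' at most five times, so the trailing resource
    # segment may itself contain ':' or '/' separators.
    assert details['partition'] == 'aws'
    assert details['service'] == 's3'
    assert details['region'] == 'us-west-2'
    assert details['account'] == '123456789012'
    assert details['resource'] == 'accesspoint/myendpoint'
    # is_arn() is a cheap validity check that never raises.
    assert ArnParser.is_arn('not an arn') is False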
2111class S3ArnParamHandler:
2112 _RESOURCE_REGEX = re.compile(
2113 r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$'
2114 )
2115 _OUTPOST_RESOURCE_REGEX = re.compile(
2116 r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]'
2117 r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)'
2118 )
2119 _BLACKLISTED_OPERATIONS = ['CreateBucket']
2121 def __init__(self, arn_parser=None):
2122 self._arn_parser = arn_parser
2123 if arn_parser is None:
2124 self._arn_parser = ArnParser()
2126 def register(self, event_emitter):
2127 event_emitter.register('before-parameter-build.s3', self.handle_arn)
2129 def handle_arn(self, params, model, context, **kwargs):
2130 if model.name in self._BLACKLISTED_OPERATIONS:
2131 return
2132 arn_details = self._get_arn_details_from_bucket_param(params)
2133 if arn_details is None:
2134 return
2135 if arn_details['resource_type'] == 'accesspoint':
2136 self._store_accesspoint(params, context, arn_details)
2137 elif arn_details['resource_type'] == 'outpost':
2138 self._store_outpost(params, context, arn_details)
2140 def _get_arn_details_from_bucket_param(self, params):
2141 if 'Bucket' in params:
2142 try:
2143 arn = params['Bucket']
2144 arn_details = self._arn_parser.parse_arn(arn)
2145 self._add_resource_type_and_name(arn, arn_details)
2146 return arn_details
2147 except InvalidArnException:
2148 pass
2149 return None
2151 def _add_resource_type_and_name(self, arn, arn_details):
2152 match = self._RESOURCE_REGEX.match(arn_details['resource'])
2153 if match:
2154 arn_details['resource_type'] = match.group('resource_type')
2155 arn_details['resource_name'] = match.group('resource_name')
2156 else:
2157 raise UnsupportedS3ArnError(arn=arn)
2159 def _store_accesspoint(self, params, context, arn_details):
2160 # Ideally the access-point would be stored as a parameter in the
2161 # request where the serializer would then know how to serialize it,
2162 # but access-points are not modeled in S3 operations so it would fail
2163 # validation. Instead, we set the access-point to the bucket parameter
2164 # to have some value set when serializing the request and additional
2165 # information on the context from the arn to use in forming the
2166 # access-point endpoint.
2167 params['Bucket'] = arn_details['resource_name']
2168 context['s3_accesspoint'] = {
2169 'name': arn_details['resource_name'],
2170 'account': arn_details['account'],
2171 'partition': arn_details['partition'],
2172 'region': arn_details['region'],
2173 'service': arn_details['service'],
2174 }
2176 def _store_outpost(self, params, context, arn_details):
2177 resource_name = arn_details['resource_name']
2178 match = self._OUTPOST_RESOURCE_REGEX.match(resource_name)
2179 if not match:
2180 raise UnsupportedOutpostResourceError(resource_name=resource_name)
2181 # Because we need to set the bucket name to something to pass
2182 # validation we're going to use the access point name to be consistent
2183 # with normal access point arns.
2184 accesspoint_name = match.group('accesspoint_name')
2185 params['Bucket'] = accesspoint_name
2186 context['s3_accesspoint'] = {
2187 'outpost_name': match.group('outpost_name'),
2188 'name': accesspoint_name,
2189 'account': arn_details['account'],
2190 'partition': arn_details['partition'],
2191 'region': arn_details['region'],
2192 'service': arn_details['service'],
2193 }
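# Editor's sketch (not part of botocore): shows how S3ArnParamHandler rewrites
# an access point ARN supplied as the Bucket parameter. The operation-model
# stand-in, the ARN, and the helper name are all invented for illustration.
def _editor_example_s3_arn_param_handler():
    class _FakeModel:
        # handle_arn() only consults the operation name.
        name = 'GetObject'

    params = {
        'Bucket': 'arn:aws:s3:us-west-2:123456789012:accesspoint/myendpoint',
        'Key': 'my-key',
    }
    context = {}
    S3ArnParamHandler().handle_arn(params, _FakeModel(), context)
    # The Bucket parameter is replaced with the access point name so request
    # serialization still validates, and the ARN details move to the context.
    assert params['Bucket'] == 'myendpoint'
    assert context['s3_accesspoint']['account'] == '123456789012'
    assert context['s3_accesspoint']['region'] == 'us-west-2'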
2196class S3EndpointSetter:
2197 _DEFAULT_PARTITION = 'aws'
2198 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
2200 def __init__(
2201 self,
2202 endpoint_resolver,
2203 region=None,
2204 s3_config=None,
2205 endpoint_url=None,
2206 partition=None,
2207 use_fips_endpoint=False,
2208 ):
2209 # This is calling the endpoint_resolver in regions.py
2210 self._endpoint_resolver = endpoint_resolver
2211 self._region = region
2212 self._s3_config = s3_config
2213 self._use_fips_endpoint = use_fips_endpoint
2214 if s3_config is None:
2215 self._s3_config = {}
2216 self._endpoint_url = endpoint_url
2217 self._partition = partition
2218 if partition is None:
2219 self._partition = self._DEFAULT_PARTITION
2221 def register(self, event_emitter):
2222 event_emitter.register('before-sign.s3', self.set_endpoint)
2223 event_emitter.register('choose-signer.s3', self.set_signer)
2224 event_emitter.register(
2225 'before-call.s3.WriteGetObjectResponse',
2226 self.update_endpoint_to_s3_object_lambda,
2227 )
2229 def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs):
2230 if self._use_accelerate_endpoint:
2231 raise UnsupportedS3ConfigurationError(
2232 msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations',
2233 )
2235 self._override_signing_name(context, 's3-object-lambda')
2236 if self._endpoint_url:
2237 # Only update the url if an explicit url was not provided
2238 return
2240 resolver = self._endpoint_resolver
2241 # Construct the endpoint for the s3-object-lambda service in the client region
2242 resolved = resolver.construct_endpoint(
2243 's3-object-lambda', self._region
2244 )
2246 # Ideally we would be able to replace the endpoint before
2247 # serialization but there's no event to do that currently
2248 # host_prefix is all the arn/bucket specs
2249 new_endpoint = 'https://{host_prefix}{hostname}'.format(
2250 host_prefix=params['host_prefix'],
2251 hostname=resolved['hostname'],
2252 )
2254 params['url'] = _get_new_endpoint(params['url'], new_endpoint, False)
2256 def set_endpoint(self, request, **kwargs):
2257 if self._use_accesspoint_endpoint(request):
2258 self._validate_accesspoint_supported(request)
2259 self._validate_fips_supported(request)
2260 self._validate_global_regions(request)
2261 region_name = self._resolve_region_for_accesspoint_endpoint(
2262 request
2263 )
2264 self._resolve_signing_name_for_accesspoint_endpoint(request)
2265 self._switch_to_accesspoint_endpoint(request, region_name)
2266 return
2267 if self._use_accelerate_endpoint:
2268 if self._use_fips_endpoint:
2269 raise UnsupportedS3ConfigurationError(
2270 msg=(
2271 'Client is configured to use the FIPS pseudo region '
2272 f'for "{self._region}", but S3 Accelerate does not have any FIPS '
2273 'compatible endpoints.'
2274 )
2275 )
2276 switch_host_s3_accelerate(request=request, **kwargs)
2277 if self._s3_addressing_handler:
2278 self._s3_addressing_handler(request=request, **kwargs)
2280 def _use_accesspoint_endpoint(self, request):
2281 return 's3_accesspoint' in request.context
2283 def _validate_fips_supported(self, request):
2284 if not self._use_fips_endpoint:
2285 return
2286 if 'fips' in request.context['s3_accesspoint']['region']:
2287 raise UnsupportedS3AccesspointConfigurationError(
2288 msg='Invalid ARN, FIPS region not allowed in ARN.'
2289 )
2290 if 'outpost_name' in request.context['s3_accesspoint']:
2291 raise UnsupportedS3AccesspointConfigurationError(
2292 msg=(
2293 f'Client is configured to use the FIPS pseudo-region "{self._region}", '
2294 'but outpost ARNs do not support FIPS endpoints.'
2295 )
2296 )
2297 # Transforming the pseudo region to the actual region
2298 accesspoint_region = request.context['s3_accesspoint']['region']
2299 if accesspoint_region != self._region:
2300 if not self._s3_config.get('use_arn_region', True):
2301 # TODO: Update message to reflect use_arn_region
2302 # is not set
2303 raise UnsupportedS3AccesspointConfigurationError(
2304 msg=(
2305 'Client is configured to use the FIPS pseudo-region '
2306 f'for "{self._region}", but the access-point ARN provided is for '
2307 f'the "{accesspoint_region}" region. For clients using a FIPS '
2308 'pseudo-region, calls to access-point ARNs in another '
2309 'region are not allowed.'
2310 )
2311 )
2313 def _validate_global_regions(self, request):
2314 if self._s3_config.get('use_arn_region', True):
2315 return
2316 if self._region in ['aws-global', 's3-external-1']:
2317 raise UnsupportedS3AccesspointConfigurationError(
2318 msg=(
2319 'Client is configured to use the global pseudo-region '
2320 f'"{self._region}". When providing access-point ARNs a regional '
2321 'endpoint must be specified.'
2322 )
2323 )
2325 def _validate_accesspoint_supported(self, request):
2326 if self._use_accelerate_endpoint:
2327 raise UnsupportedS3AccesspointConfigurationError(
2328 msg=(
2329 'Client does not support s3 accelerate configuration '
2330 'when an access-point ARN is specified.'
2331 )
2332 )
2333 request_partition = request.context['s3_accesspoint']['partition']
2334 if request_partition != self._partition:
2335 raise UnsupportedS3AccesspointConfigurationError(
2336 msg=(
2337 f'Client is configured for "{self._partition}" partition, but access-point'
2338 f' ARN provided is for "{request_partition}" partition. The client and '
2339 'access-point partition must be the same.'
2340 )
2341 )
2342 s3_service = request.context['s3_accesspoint'].get('service')
2343 if s3_service == 's3-object-lambda' and self._s3_config.get(
2344 'use_dualstack_endpoint'
2345 ):
2346 raise UnsupportedS3AccesspointConfigurationError(
2347 msg=(
2348 'Client does not support s3 dualstack configuration '
2349 'when an S3 Object Lambda access point ARN is specified.'
2350 )
2351 )
2352 outpost_name = request.context['s3_accesspoint'].get('outpost_name')
2353 if outpost_name and self._s3_config.get('use_dualstack_endpoint'):
2354 raise UnsupportedS3AccesspointConfigurationError(
2355 msg=(
2356 'Client does not support s3 dualstack configuration '
2357 'when an outpost ARN is specified.'
2358 )
2359 )
2360 self._validate_mrap_s3_config(request)
2362 def _validate_mrap_s3_config(self, request):
2363 if not is_global_accesspoint(request.context):
2364 return
2365 if self._s3_config.get('s3_disable_multiregion_access_points'):
2366 raise UnsupportedS3AccesspointConfigurationError(
2367 msg=(
2368 'Invalid configuration, Multi-Region Access Point '
2369 'ARNs are disabled.'
2370 )
2371 )
2372 elif self._s3_config.get('use_dualstack_endpoint'):
2373 raise UnsupportedS3AccesspointConfigurationError(
2374 msg=(
2375 'Client does not support s3 dualstack configuration '
2376 'when a Multi-Region Access Point ARN is specified.'
2377 )
2378 )
2380 def _resolve_region_for_accesspoint_endpoint(self, request):
2381 if is_global_accesspoint(request.context):
2382 # Requests going to MRAP endpoints MUST be set to any (*) region.
2383 self._override_signing_region(request, '*')
2384 elif self._s3_config.get('use_arn_region', True):
2385 accesspoint_region = request.context['s3_accesspoint']['region']
2386 # If we are using the region from the access point,
2387 # we will also want to make sure that we set it as the
2388 # signing region as well
2389 self._override_signing_region(request, accesspoint_region)
2390 return accesspoint_region
2391 return self._region
2393 def set_signer(self, context, **kwargs):
2394 if is_global_accesspoint(context):
2395 if HAS_CRT:
2396 return 's3v4a'
2397 else:
2398 raise MissingDependencyException(
2399 msg="Using S3 with an MRAP arn requires an additional "
2400 "dependency. You will need to pip install "
2401 "botocore[crt] before proceeding."
2402 )
2404 def _resolve_signing_name_for_accesspoint_endpoint(self, request):
2405 accesspoint_service = request.context['s3_accesspoint']['service']
2406 self._override_signing_name(request.context, accesspoint_service)
2408 def _switch_to_accesspoint_endpoint(self, request, region_name):
2409 original_components = urlsplit(request.url)
2410 accesspoint_endpoint = urlunsplit(
2411 (
2412 original_components.scheme,
2413 self._get_netloc(request.context, region_name),
2414 self._get_accesspoint_path(
2415 original_components.path, request.context
2416 ),
2417 original_components.query,
2418 '',
2419 )
2420 )
2421 logger.debug(
2422 f'Updating URI from {request.url} to {accesspoint_endpoint}'
2423 )
2424 request.url = accesspoint_endpoint
2426 def _get_netloc(self, request_context, region_name):
2427 if is_global_accesspoint(request_context):
2428 return self._get_mrap_netloc(request_context)
2429 else:
2430 return self._get_accesspoint_netloc(request_context, region_name)
2432 def _get_mrap_netloc(self, request_context):
2433 s3_accesspoint = request_context['s3_accesspoint']
2434 region_name = 's3-global'
2435 mrap_netloc_components = [s3_accesspoint['name']]
2436 if self._endpoint_url:
2437 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2438 mrap_netloc_components.append(endpoint_url_netloc)
2439 else:
2440 partition = s3_accesspoint['partition']
2441 mrap_netloc_components.extend(
2442 [
2443 'accesspoint',
2444 region_name,
2445 self._get_partition_dns_suffix(partition),
2446 ]
2447 )
2448 return '.'.join(mrap_netloc_components)
2450 def _get_accesspoint_netloc(self, request_context, region_name):
2451 s3_accesspoint = request_context['s3_accesspoint']
2452 accesspoint_netloc_components = [
2453 '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']),
2454 ]
2455 outpost_name = s3_accesspoint.get('outpost_name')
2456 if self._endpoint_url:
2457 if outpost_name:
2458 accesspoint_netloc_components.append(outpost_name)
2459 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2460 accesspoint_netloc_components.append(endpoint_url_netloc)
2461 else:
2462 if outpost_name:
2463 outpost_host = [outpost_name, 's3-outposts']
2464 accesspoint_netloc_components.extend(outpost_host)
2465 elif s3_accesspoint['service'] == 's3-object-lambda':
2466 component = self._inject_fips_if_needed(
2467 's3-object-lambda', request_context
2468 )
2469 accesspoint_netloc_components.append(component)
2470 else:
2471 component = self._inject_fips_if_needed(
2472 's3-accesspoint', request_context
2473 )
2474 accesspoint_netloc_components.append(component)
2475 if self._s3_config.get('use_dualstack_endpoint'):
2476 accesspoint_netloc_components.append('dualstack')
2477 accesspoint_netloc_components.extend(
2478 [region_name, self._get_dns_suffix(region_name)]
2479 )
2480 return '.'.join(accesspoint_netloc_components)
2482 def _inject_fips_if_needed(self, component, request_context):
2483 if self._use_fips_endpoint:
2484 return f'{component}-fips'
2485 return component
2487 def _get_accesspoint_path(self, original_path, request_context):
2488 # The Bucket parameter was substituted with the access-point name as
2489 # some value was required in serializing the bucket name. Now that
2490 # we are making the request directly to the access point, we will
2491 # want to remove that access-point name from the path.
2492 name = request_context['s3_accesspoint']['name']
2493 # All S3 operations require at least a / in their path.
2494 return original_path.replace('/' + name, '', 1) or '/'
2496 def _get_partition_dns_suffix(self, partition_name):
2497 dns_suffix = self._endpoint_resolver.get_partition_dns_suffix(
2498 partition_name
2499 )
2500 if dns_suffix is None:
2501 dns_suffix = self._DEFAULT_DNS_SUFFIX
2502 return dns_suffix
2504 def _get_dns_suffix(self, region_name):
2505 resolved = self._endpoint_resolver.construct_endpoint(
2506 's3', region_name
2507 )
2508 dns_suffix = self._DEFAULT_DNS_SUFFIX
2509 if resolved and 'dnsSuffix' in resolved:
2510 dns_suffix = resolved['dnsSuffix']
2511 return dns_suffix
2513 def _override_signing_region(self, request, region_name):
2514 signing_context = request.context.get('signing', {})
2515 # S3SigV4Auth will use the context['signing']['region'] value to
2516 # sign with if present. This is used by the Bucket redirector
2517 # as well but we should be fine because the redirector is never
2518 # used in combination with the accesspoint setting logic.
2519 signing_context['region'] = region_name
2520 request.context['signing'] = signing_context
2522 def _override_signing_name(self, context, signing_name):
2523 signing_context = context.get('signing', {})
2524 # S3SigV4Auth will use the context['signing']['signing_name'] value to
2525 # sign with if present. This is used by the Bucket redirector
2526 # as well but we should be fine because the redirector is never
2527 # used in combination with the accesspoint setting logic.
2528 signing_context['signing_name'] = signing_name
2529 context['signing'] = signing_context
2531 @CachedProperty
2532 def _use_accelerate_endpoint(self):
2533 # Enable accelerate if the configuration is set to true or the
2534 # endpoint being used matches one of the accelerate endpoints.
2536 # Accelerate has been explicitly configured.
2537 if self._s3_config.get('use_accelerate_endpoint'):
2538 return True
2540 # Accelerate mode is turned on automatically if an endpoint url is
2541 # provided that matches the accelerate scheme.
2542 if self._endpoint_url is None:
2543 return False
2545 # Accelerate is only valid for Amazon endpoints.
2546 netloc = urlsplit(self._endpoint_url).netloc
2547 if not netloc.endswith('amazonaws.com'):
2548 return False
2550 # The first part of the url should always be s3-accelerate.
2551 parts = netloc.split('.')
2552 if parts[0] != 's3-accelerate':
2553 return False
2555 # Url parts between 's3-accelerate' and 'amazonaws.com' which
2556 # represent different url features.
2557 feature_parts = parts[1:-2]
2559 # There should be no duplicate url parts.
2560 if len(feature_parts) != len(set(feature_parts)):
2561 return False
2563 # Remaining parts must all be in the whitelist.
2564 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts)
2566 @CachedProperty
2567 def _addressing_style(self):
2568 # Use virtual host style addressing if accelerate is enabled or if
2569 # the given endpoint url is an accelerate endpoint.
2570 if self._use_accelerate_endpoint:
2571 return 'virtual'
2573 # If a particular addressing style is configured, use it.
2574 configured_addressing_style = self._s3_config.get('addressing_style')
2575 if configured_addressing_style:
2576 return configured_addressing_style
2578 @CachedProperty
2579 def _s3_addressing_handler(self):
2580 # If virtual host style was configured, use it regardless of whether
2581 # or not the bucket looks dns compatible.
2582 if self._addressing_style == 'virtual':
2583 logger.debug("Using S3 virtual host style addressing.")
2584 return switch_to_virtual_host_style
2586 # If path style is configured, no additional steps are needed. If
2587 # endpoint_url was specified, don't default to virtual. We could
2588 # potentially default provided endpoint urls to virtual hosted
2589 # style, but for now it is avoided.
2590 if self._addressing_style == 'path' or self._endpoint_url is not None:
2591 logger.debug("Using S3 path style addressing.")
2592 return None
2594 logger.debug(
2595 "Defaulting to S3 virtual host style addressing with "
2596 "path style addressing fallback."
2597 )
2599 # By default, try to use virtual style with path fallback.
2600 return fix_s3_host
2603class S3ControlEndpointSetter:
2604 _DEFAULT_PARTITION = 'aws'
2605 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
2606 _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$')
2608 def __init__(
2609 self,
2610 endpoint_resolver,
2611 region=None,
2612 s3_config=None,
2613 endpoint_url=None,
2614 partition=None,
2615 use_fips_endpoint=False,
2616 ):
2617 self._endpoint_resolver = endpoint_resolver
2618 self._region = region
2619 self._s3_config = s3_config
2620 self._use_fips_endpoint = use_fips_endpoint
2621 if s3_config is None:
2622 self._s3_config = {}
2623 self._endpoint_url = endpoint_url
2624 self._partition = partition
2625 if partition is None:
2626 self._partition = self._DEFAULT_PARTITION
2628 def register(self, event_emitter):
2629 event_emitter.register('before-sign.s3-control', self.set_endpoint)
2631 def set_endpoint(self, request, **kwargs):
2632 if self._use_endpoint_from_arn_details(request):
2633 self._validate_endpoint_from_arn_details_supported(request)
2634 region_name = self._resolve_region_from_arn_details(request)
2635 self._resolve_signing_name_from_arn_details(request)
2636 self._resolve_endpoint_from_arn_details(request, region_name)
2637 self._add_headers_from_arn_details(request)
2638 elif self._use_endpoint_from_outpost_id(request):
2639 self._validate_outpost_redirection_valid(request)
2640 self._override_signing_name(request, 's3-outposts')
2641 new_netloc = self._construct_outpost_endpoint(self._region)
2642 self._update_request_netloc(request, new_netloc)
2644 def _use_endpoint_from_arn_details(self, request):
2645 return 'arn_details' in request.context
2647 def _use_endpoint_from_outpost_id(self, request):
2648 return 'outpost_id' in request.context
2650 def _validate_endpoint_from_arn_details_supported(self, request):
2651 if 'fips' in request.context['arn_details']['region']:
2652 raise UnsupportedS3ControlArnError(
2653 arn=request.context['arn_details']['original'],
2654 msg='Invalid ARN, FIPS region not allowed in ARN.',
2655 )
2656 if not self._s3_config.get('use_arn_region', False):
2657 arn_region = request.context['arn_details']['region']
2658 if arn_region != self._region:
2659 error_msg = (
2660 'The use_arn_region configuration is disabled but '
2661 f'received arn for "{arn_region}" when the client is configured '
2662 f'to use "{self._region}"'
2663 )
2664 raise UnsupportedS3ControlConfigurationError(msg=error_msg)
2665 request_partition = request.context['arn_details']['partition']
2666 if request_partition != self._partition:
2667 raise UnsupportedS3ControlConfigurationError(
2668 msg=(
2669 f'Client is configured for "{self._partition}" partition, but arn '
2670 f'provided is for "{request_partition}" partition. The client and '
2671 'arn partition must be the same.'
2672 )
2673 )
2674 if self._s3_config.get('use_accelerate_endpoint'):
2675 raise UnsupportedS3ControlConfigurationError(
2676 msg='S3 control client does not support accelerate endpoints',
2677 )
2678 if 'outpost_name' in request.context['arn_details']:
2679 self._validate_outpost_redirection_valid(request)
2681 def _validate_outpost_redirection_valid(self, request):
2682 if self._s3_config.get('use_dualstack_endpoint'):
2683 raise UnsupportedS3ControlConfigurationError(
2684 msg=(
2685 'Client does not support s3 dualstack configuration '
2686 'when an outpost is specified.'
2687 )
2688 )
2690 def _resolve_region_from_arn_details(self, request):
2691 if self._s3_config.get('use_arn_region', False):
2692 arn_region = request.context['arn_details']['region']
2693 # If we are using the region from the expanded arn, we will also
2694 # want to make sure that we set it as the signing region as well
2695 self._override_signing_region(request, arn_region)
2696 return arn_region
2697 return self._region
2699 def _resolve_signing_name_from_arn_details(self, request):
2700 arn_service = request.context['arn_details']['service']
2701 self._override_signing_name(request, arn_service)
2702 return arn_service
2704 def _resolve_endpoint_from_arn_details(self, request, region_name):
2705 new_netloc = self._resolve_netloc_from_arn_details(
2706 request, region_name
2707 )
2708 self._update_request_netloc(request, new_netloc)
2710 def _update_request_netloc(self, request, new_netloc):
2711 original_components = urlsplit(request.url)
2712 arn_details_endpoint = urlunsplit(
2713 (
2714 original_components.scheme,
2715 new_netloc,
2716 original_components.path,
2717 original_components.query,
2718 '',
2719 )
2720 )
2721 logger.debug(
2722 f'Updating URI from {request.url} to {arn_details_endpoint}'
2723 )
2724 request.url = arn_details_endpoint
2726 def _resolve_netloc_from_arn_details(self, request, region_name):
2727 arn_details = request.context['arn_details']
2728 if 'outpost_name' in arn_details:
2729 return self._construct_outpost_endpoint(region_name)
2730 account = arn_details['account']
2731 return self._construct_s3_control_endpoint(region_name, account)
2733 def _is_valid_host_label(self, label):
2734 return self._HOST_LABEL_REGEX.match(label)
2736 def _validate_host_labels(self, *labels):
2737 for label in labels:
2738 if not self._is_valid_host_label(label):
2739 raise InvalidHostLabelError(label=label)
2741 def _construct_s3_control_endpoint(self, region_name, account):
2742 self._validate_host_labels(region_name, account)
2743 if self._endpoint_url:
2744 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
2745 netloc = [account, endpoint_url_netloc]
2746 else:
2747 netloc = [
2748 account,
2749 's3-control',
2750 ]
2751 self._add_dualstack(netloc)
2752 dns_suffix = self._get_dns_suffix(region_name)
2753 netloc.extend([region_name, dns_suffix])
2754 return self._construct_netloc(netloc)
2756 def _construct_outpost_endpoint(self, region_name):
2757 self._validate_host_labels(region_name)
2758 if self._endpoint_url:
2759 return urlsplit(self._endpoint_url).netloc
2760 else:
2761 netloc = [
2762 's3-outposts',
2763 region_name,
2764 self._get_dns_suffix(region_name),
2765 ]
2766 self._add_fips(netloc)
2767 return self._construct_netloc(netloc)
2769 def _construct_netloc(self, netloc):
2770 return '.'.join(netloc)
2772 def _add_fips(self, netloc):
2773 if self._use_fips_endpoint:
2774 netloc[0] = netloc[0] + '-fips'
2776 def _add_dualstack(self, netloc):
2777 if self._s3_config.get('use_dualstack_endpoint'):
2778 netloc.append('dualstack')
2780 def _get_dns_suffix(self, region_name):
2781 resolved = self._endpoint_resolver.construct_endpoint(
2782 's3', region_name
2783 )
2784 dns_suffix = self._DEFAULT_DNS_SUFFIX
2785 if resolved and 'dnsSuffix' in resolved:
2786 dns_suffix = resolved['dnsSuffix']
2787 return dns_suffix
2789 def _override_signing_region(self, request, region_name):
2790 signing_context = request.context.get('signing', {})
2791 # S3SigV4Auth will use the context['signing']['region'] value to
2792 # sign with if present. This is used by the Bucket redirector
2793 # as well but we should be fine because the redirector is never
2794 # used in combination with the accesspoint setting logic.
2795 signing_context['region'] = region_name
2796 request.context['signing'] = signing_context
2798 def _override_signing_name(self, request, signing_name):
2799 signing_context = request.context.get('signing', {})
2800 # S3SigV4Auth will use the context['signing']['signing_name'] value to
2801 # sign with if present. This is used by the Bucket redirector
2802 # as well but we should be fine because the redirector is never
2803 # used in combination with the accesspoint setting logic.
2804 signing_context['signing_name'] = signing_name
2805 request.context['signing'] = signing_context
2807 def _add_headers_from_arn_details(self, request):
2808 arn_details = request.context['arn_details']
2809 outpost_name = arn_details.get('outpost_name')
2810 if outpost_name:
2811 self._add_outpost_id_header(request, outpost_name)
2813 def _add_outpost_id_header(self, request, outpost_name):
2814 request.headers['x-amz-outpost-id'] = outpost_name
2817class S3ControlArnParamHandler:
2818 """This handler has been replaced by S3ControlArnParamHandlerv2. The
2819 original version remains in place for any third-party importers.
2820 """
2822 _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]')
2824 def __init__(self, arn_parser=None):
2825 self._arn_parser = arn_parser
2826 if arn_parser is None:
2827 self._arn_parser = ArnParser()
2828 warnings.warn(
2829 'The S3ControlArnParamHandler class has been deprecated for a new '
2830 'internal replacement. A future version of botocore may remove '
2831 'this class.',
2832 category=FutureWarning,
2833 )
2835 def register(self, event_emitter):
2836 event_emitter.register(
2837 'before-parameter-build.s3-control',
2838 self.handle_arn,
2839 )
2841 def handle_arn(self, params, model, context, **kwargs):
2842 if model.name in ('CreateBucket', 'ListRegionalBuckets'):
2843 # CreateBucket and ListRegionalBuckets are special cases that do
2844 # not obey ARN-based redirection but will redirect based on the
2845 # presence of the OutpostId parameter
2846 self._handle_outpost_id_param(params, model, context)
2847 else:
2848 self._handle_name_param(params, model, context)
2849 self._handle_bucket_param(params, model, context)
2851 def _get_arn_details_from_param(self, params, param_name):
2852 if param_name not in params:
2853 return None
2854 try:
2855 arn = params[param_name]
2856 arn_details = self._arn_parser.parse_arn(arn)
2857 arn_details['original'] = arn
2858 arn_details['resources'] = self._split_resource(arn_details)
2859 return arn_details
2860 except InvalidArnException:
2861 return None
2863 def _split_resource(self, arn_details):
2864 return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource'])
2866 def _override_account_id_param(self, params, arn_details):
2867 account_id = arn_details['account']
2868 if 'AccountId' in params and params['AccountId'] != account_id:
2869 error_msg = (
2870 'Account ID in arn does not match the AccountId parameter '
2871 'provided: "{}"'
2872 ).format(params['AccountId'])
2873 raise UnsupportedS3ControlArnError(
2874 arn=arn_details['original'],
2875 msg=error_msg,
2876 )
2877 params['AccountId'] = account_id
2879 def _handle_outpost_id_param(self, params, model, context):
2880 if 'OutpostId' not in params:
2881 return
2882 context['outpost_id'] = params['OutpostId']
2884 def _handle_name_param(self, params, model, context):
2885 # CreateAccessPoint is a special case that does not expand Name
2886 if model.name == 'CreateAccessPoint':
2887 return
2888 arn_details = self._get_arn_details_from_param(params, 'Name')
2889 if arn_details is None:
2890 return
2891 if self._is_outpost_accesspoint(arn_details):
2892 self._store_outpost_accesspoint(params, context, arn_details)
2893 else:
2894 error_msg = 'The Name parameter does not support the provided ARN'
2895 raise UnsupportedS3ControlArnError(
2896 arn=arn_details['original'],
2897 msg=error_msg,
2898 )
2900 def _is_outpost_accesspoint(self, arn_details):
2901 if arn_details['service'] != 's3-outposts':
2902 return False
2903 resources = arn_details['resources']
2904 if len(resources) != 4:
2905 return False
2906 # Resource must be of the form outpost/op-123/accesspoint/name
2907 return resources[0] == 'outpost' and resources[2] == 'accesspoint'
2909 def _store_outpost_accesspoint(self, params, context, arn_details):
2910 self._override_account_id_param(params, arn_details)
2911 accesspoint_name = arn_details['resources'][3]
2912 params['Name'] = accesspoint_name
2913 arn_details['accesspoint_name'] = accesspoint_name
2914 arn_details['outpost_name'] = arn_details['resources'][1]
2915 context['arn_details'] = arn_details
2917 def _handle_bucket_param(self, params, model, context):
2918 arn_details = self._get_arn_details_from_param(params, 'Bucket')
2919 if arn_details is None:
2920 return
2921 if self._is_outpost_bucket(arn_details):
2922 self._store_outpost_bucket(params, context, arn_details)
2923 else:
2924 error_msg = (
2925 'The Bucket parameter does not support the provided ARN'
2926 )
2927 raise UnsupportedS3ControlArnError(
2928 arn=arn_details['original'],
2929 msg=error_msg,
2930 )
2932 def _is_outpost_bucket(self, arn_details):
2933 if arn_details['service'] != 's3-outposts':
2934 return False
2935 resources = arn_details['resources']
2936 if len(resources) != 4:
2937 return False
2938 # Resource must be of the form outpost/op-123/bucket/name
2939 return resources[0] == 'outpost' and resources[2] == 'bucket'
2941 def _store_outpost_bucket(self, params, context, arn_details):
2942 self._override_account_id_param(params, arn_details)
2943 bucket_name = arn_details['resources'][3]
2944 params['Bucket'] = bucket_name
2945 arn_details['bucket_name'] = bucket_name
2946 arn_details['outpost_name'] = arn_details['resources'][1]
2947 context['arn_details'] = arn_details
2950class S3ControlArnParamHandlerv2(S3ControlArnParamHandler):
2951 """Updated version of S3ControlArnParamHandler for use when
2952 EndpointRulesetResolver is in use for endpoint resolution.
2954 This class is considered private and subject to abrupt breaking changes or
2955 removal without prior announcement. Please do not use it directly.
2956 """
2958 def __init__(self, arn_parser=None):
2959 self._arn_parser = arn_parser
2960 if arn_parser is None:
2961 self._arn_parser = ArnParser()
2963 def register(self, event_emitter):
2964 event_emitter.register(
2965 'before-endpoint-resolution.s3-control',
2966 self.handle_arn,
2967 )
2969 def _handle_name_param(self, params, model, context):
2970 # CreateAccessPoint is a special case that does not expand Name
2971 if model.name == 'CreateAccessPoint':
2972 return
2973 arn_details = self._get_arn_details_from_param(params, 'Name')
2974 if arn_details is None:
2975 return
2976 self._raise_for_fips_pseudo_region(arn_details)
2977 self._raise_for_accelerate_endpoint(context)
2978 if self._is_outpost_accesspoint(arn_details):
2979 self._store_outpost_accesspoint(params, context, arn_details)
2980 else:
2981 error_msg = 'The Name parameter does not support the provided ARN'
2982 raise UnsupportedS3ControlArnError(
2983 arn=arn_details['original'],
2984 msg=error_msg,
2985 )
2987 def _store_outpost_accesspoint(self, params, context, arn_details):
2988 self._override_account_id_param(params, arn_details)
2990 def _handle_bucket_param(self, params, model, context):
2991 arn_details = self._get_arn_details_from_param(params, 'Bucket')
2992 if arn_details is None:
2993 return
2994 self._raise_for_fips_pseudo_region(arn_details)
2995 self._raise_for_accelerate_endpoint(context)
2996 if self._is_outpost_bucket(arn_details):
2997 self._store_outpost_bucket(params, context, arn_details)
2998 else:
2999 error_msg = (
3000 'The Bucket parameter does not support the provided ARN'
3001 )
3002 raise UnsupportedS3ControlArnError(
3003 arn=arn_details['original'],
3004 msg=error_msg,
3005 )
3007 def _store_outpost_bucket(self, params, context, arn_details):
3008 self._override_account_id_param(params, arn_details)
3010 def _raise_for_fips_pseudo_region(self, arn_details):
3011 # FIPS pseudo region names cannot be used in ARNs
3012 arn_region = arn_details['region']
3013 if arn_region.startswith('fips-') or arn_region.endswith('-fips'):
3014 raise UnsupportedS3ControlArnError(
3015 arn=arn_details['original'],
3016 msg='Invalid ARN, FIPS region not allowed in ARN.',
3017 )
3019 def _raise_for_accelerate_endpoint(self, context):
3020 s3_config = context['client_config'].s3 or {}
3021 if s3_config.get('use_accelerate_endpoint'):
3022 raise UnsupportedS3ControlConfigurationError(
3023 msg='S3 control client does not support accelerate endpoints',
3024 )
3027class ContainerMetadataFetcher:
3028 TIMEOUT_SECONDS = 2
3029 RETRY_ATTEMPTS = 3
3030 SLEEP_TIME = 1
3031 IP_ADDRESS = '169.254.170.2'
3032 _ALLOWED_HOSTS = [
3033 IP_ADDRESS,
3034 '169.254.170.23',
3035 'fd00:ec2::23',
3036 'localhost',
3037 ]
3039 def __init__(self, session=None, sleep=time.sleep):
3040 if session is None:
3041 session = botocore.httpsession.URLLib3Session(
3042 timeout=self.TIMEOUT_SECONDS
3043 )
3044 self._session = session
3045 self._sleep = sleep
3047 def retrieve_full_uri(self, full_url, headers=None):
3048 """Retrieve JSON metadata from container metadata.
3050 :type full_url: str
3051 :param full_url: The full URL of the metadata service.
3052 This should include the scheme as well, e.g
3053 "http://localhost:123/foo"
3055 """
3056 self._validate_allowed_url(full_url)
3057 return self._retrieve_credentials(full_url, headers)
3059 def _validate_allowed_url(self, full_url):
3060 parsed = botocore.compat.urlparse(full_url)
3061 if self._is_loopback_address(parsed.hostname):
3062 return
3063 is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname)
3064 if not is_whitelisted_host:
3065 raise ValueError(
3066 f"Unsupported host '{parsed.hostname}'. Can only retrieve metadata "
3067 f"from a loopback address or one of these hosts: {', '.join(self._ALLOWED_HOSTS)}"
3068 )
3070 def _is_loopback_address(self, hostname):
3071 try:
3072 ip = ip_address(hostname)
3073 return ip.is_loopback
3074 except ValueError:
3075 return False
3077 def _check_if_whitelisted_host(self, host):
3078 if host in self._ALLOWED_HOSTS:
3079 return True
3080 return False
3082 def retrieve_uri(self, relative_uri):
3083 """Retrieve JSON metadata from container metadata.
3085 :type relative_uri: str
3086 :param relative_uri: A relative URI, e.g "/foo/bar?id=123"
3088 :return: The parsed JSON response.
3090 """
3091 full_url = self.full_url(relative_uri)
3092 return self._retrieve_credentials(full_url)
3094 def _retrieve_credentials(self, full_url, extra_headers=None):
3095 headers = {'Accept': 'application/json'}
3096 if extra_headers is not None:
3097 headers.update(extra_headers)
3098 attempts = 0
3099 while True:
3100 try:
3101 return self._get_response(
3102 full_url, headers, self.TIMEOUT_SECONDS
3103 )
3104 except MetadataRetrievalError as e:
3105 logger.debug(
3106 "Received error when attempting to retrieve "
3107 "container metadata: %s",
3108 e,
3109 exc_info=True,
3110 )
3111 self._sleep(self.SLEEP_TIME)
3112 attempts += 1
3113 if attempts >= self.RETRY_ATTEMPTS:
3114 raise
3116 def _get_response(self, full_url, headers, timeout):
3117 try:
3118 AWSRequest = botocore.awsrequest.AWSRequest
3119 request = AWSRequest(method='GET', url=full_url, headers=headers)
3120 response = self._session.send(request.prepare())
3121 response_text = response.content.decode('utf-8')
3122 if response.status_code != 200:
3123 raise MetadataRetrievalError(
3124 error_msg=(
3125 f"Received non 200 response {response.status_code} "
3126 f"from container metadata: {response_text}"
3127 )
3128 )
3129 try:
3130 return json.loads(response_text)
3131 except ValueError:
3132 error_msg = "Unable to parse JSON returned from container metadata services"
3133 logger.debug('%s:%s', error_msg, response_text)
3134 raise MetadataRetrievalError(error_msg=error_msg)
3135 except RETRYABLE_HTTP_ERRORS as e:
3136 error_msg = (
3137 "Received error when attempting to retrieve "
3138 f"container metadata: {e}"
3139 )
3140 raise MetadataRetrievalError(error_msg=error_msg)
3142 def full_url(self, relative_uri):
3143 return f'http://{self.IP_ADDRESS}{relative_uri}'
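# Editor's sketch (not part of botocore): full_url() only formats the
# well-known ECS credentials address; nothing is fetched here. The relative
# path and the helper name are invented for illustration.
def _editor_example_container_metadata_url():
    fetcher = ContainerMetadataFetcher()
    assert (
        fetcher.full_url('/v2/credentials/xyz')
        == 'http://169.254.170.2/v2/credentials/xyz'
    )
    # retrieve_full_uri() rejects anything that is not a loopback address or
    # one of _ALLOWED_HOSTS; the private validator is called here only to
    # illustrate that behavior.
    try:
        fetcher._validate_allowed_url('http://example.com/creds')
    except ValueError:
        pass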
3146def get_environ_proxies(url):
3147 if should_bypass_proxies(url):
3148 return {}
3149 else:
3150 return getproxies()
3153def should_bypass_proxies(url):
3154 """
3155 Returns whether we should bypass proxies or not.
3156 """
3157 # NOTE: requests allows ip/cidr entries in the no_proxy env var, which we
3158 # don't currently support, since urllib only checks the DNS suffix
3159 # If the system proxy settings indicate that this URL should be bypassed,
3160 # don't proxy.
3161 # The proxy_bypass function is incredibly buggy on OS X in early versions
3162 # of Python 2.6, so allow this call to fail. Only catch the specific
3163 # exceptions we've seen, though: this call failing in other ways can reveal
3164 # legitimate problems.
3165 try:
3166 if proxy_bypass(urlparse(url).netloc):
3167 return True
3168 except (TypeError, socket.gaierror):
3169 pass
3171 return False
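# Editor's sketch (not part of botocore): get_environ_proxies() returns the
# system proxy map unless the URL is covered by no_proxy/bypass rules. The
# URL and helper name are invented; the result depends on the environment.
def _editor_example_environ_proxies():
    proxies = get_environ_proxies('https://s3.us-west-2.amazonaws.com')
    # Either {} (bypassed) or a mapping such as {'https': 'http://proxy:8080'}
    # sourced from the HTTP_PROXY/HTTPS_PROXY environment variables.
    assert isinstance(proxies, dict)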
3174def determine_content_length(body):
3175 # No body, content length of 0
3176 if not body:
3177 return 0
3179 # Try asking the body for its length
3180 try:
3181 return len(body)
3182 except (AttributeError, TypeError):
3183 pass
3185 # Try getting the length from a seekable stream
3186 if hasattr(body, 'seek') and hasattr(body, 'tell'):
3187 try:
3188 orig_pos = body.tell()
3189 body.seek(0, 2)
3190 end_file_pos = body.tell()
3191 body.seek(orig_pos)
3192 return end_file_pos - orig_pos
3193 except io.UnsupportedOperation:
3194 # in case when body is, for example, io.BufferedIOBase object
3195 # it has "seek" method which throws "UnsupportedOperation"
3196 # exception in such case we want to fall back to "chunked"
3197 # encoding
3198 pass
3199 # Failed to determine the length
3200 return None
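# Editor's sketch (not part of botocore): determine_content_length() prefers
# len(), falls back to seek()/tell() for streams, and returns None when the
# size cannot be discovered. The helper name is invented for illustration.
def _editor_example_content_length():
    assert determine_content_length(b'hello') == 5
    stream = io.BytesIO(b'hello world')
    stream.seek(6)
    # Only the bytes from the current position onward are counted, and the
    # original position is restored afterwards.
    assert determine_content_length(stream) == 5
    assert stream.tell() == 6
    # Generators expose neither len() nor seek(), so the length is unknown.
    assert determine_content_length(chunk for chunk in [b'a', b'b']) is None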
3203def get_encoding_from_headers(headers, default='ISO-8859-1'):
3204 """Returns encodings from given HTTP Header Dict.
3206 :param headers: dictionary to extract encoding from.
3207 :param default: default encoding if the content-type is text
3208 """
3210 content_type = headers.get('content-type')
3212 if not content_type:
3213 return None
3215 message = email.message.Message()
3216 message['content-type'] = content_type
3217 charset = message.get_param("charset")
3219 if charset is not None:
3220 return charset
3222 if 'text' in content_type:
3223 return default
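# Editor's sketch (not part of botocore): the charset parameter wins when
# present, text/* falls back to the default, and non-text types yield None.
# The header values and the helper name are invented for illustration.
def _editor_example_encoding_from_headers():
    headers = {'content-type': 'text/html; charset=utf-8'}
    assert get_encoding_from_headers(headers) == 'utf-8'
    assert get_encoding_from_headers({'content-type': 'text/plain'}) == 'ISO-8859-1'
    assert get_encoding_from_headers({'content-type': 'application/json'}) is None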
3226def calculate_md5(body, **kwargs):
3227 if isinstance(body, (bytes, bytearray)):
3228 binary_md5 = _calculate_md5_from_bytes(body)
3229 else:
3230 binary_md5 = _calculate_md5_from_file(body)
3231 return base64.b64encode(binary_md5).decode('ascii')
3234def _calculate_md5_from_bytes(body_bytes):
3235 md5 = get_md5(body_bytes)
3236 return md5.digest()
3239def _calculate_md5_from_file(fileobj):
3240 start_position = fileobj.tell()
3241 md5 = get_md5()
3242 for chunk in iter(lambda: fileobj.read(1024 * 1024), b''):
3243 md5.update(chunk)
3244 fileobj.seek(start_position)
3245 return md5.digest()
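# Editor's sketch (not part of botocore): calculate_md5() produces the
# base64-encoded digest used for the Content-MD5 header, for both in-memory
# bytes and seekable file objects. Values and the helper name are invented.
def _editor_example_calculate_md5():
    body = b'hello world'
    from_bytes = calculate_md5(body)
    from_file = calculate_md5(io.BytesIO(body))
    # Both paths hash the same content, so the encoded digests match; a
    # base64-encoded 16-byte MD5 digest is always 24 characters long.
    assert from_bytes == from_file
    assert len(from_bytes) == 24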
3248def _is_s3express_request(params):
3249 endpoint_properties = params.get('context', {}).get(
3250 'endpoint_properties', {}
3251 )
3252 return endpoint_properties.get('backend') == 'S3Express'
3255def _has_checksum_header(params):
3256 headers = params['headers']
3257 # If a user provided Content-MD5 is present,
3258 # don't try to compute a new one.
3259 if 'Content-MD5' in headers:
3260 return True
3262 # If a header matching the x-amz-checksum-* pattern is present, we
3263 # assume a checksum has already been provided and an MD5 is not needed.
3264 for header in headers:
3265 if CHECKSUM_HEADER_PATTERN.match(header):
3266 return True
3268 return False
3271def conditionally_calculate_checksum(params, **kwargs):
3272 if not _has_checksum_header(params):
3273 conditionally_calculate_md5(params, **kwargs)
3274 conditionally_enable_crc32(params, **kwargs)
3277def conditionally_enable_crc32(params, **kwargs):
3278 checksum_context = params.get('context', {}).get('checksum', {})
3279 checksum_algorithm = checksum_context.get('request_algorithm')
3280 if (
3281 _is_s3express_request(params)
3282 and params['body'] is not None
3283 and checksum_algorithm in (None, "conditional-md5")
3284 ):
3285 params['context']['checksum'] = {
3286 'request_algorithm': {
3287 'algorithm': 'crc32',
3288 'in': 'header',
3289 'name': 'x-amz-checksum-crc32',
3290 }
3291 }
3294def conditionally_calculate_md5(params, **kwargs):
3295 """Only add a Content-MD5 if the system supports it."""
3296 body = params['body']
3297 checksum_context = params.get('context', {}).get('checksum', {})
3298 checksum_algorithm = checksum_context.get('request_algorithm')
3299 if checksum_algorithm and checksum_algorithm != 'conditional-md5':
3300 # Skip for requests that will have a flexible checksum applied
3301 return
3303 if _has_checksum_header(params):
3304 # Don't add a new header if one is already available.
3305 return
3307 if _is_s3express_request(params):
3308 # S3Express doesn't support MD5
3309 return
3311 if MD5_AVAILABLE and body is not None:
3312 md5_digest = calculate_md5(body, **kwargs)
3313 params['headers']['Content-MD5'] = md5_digest
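# Editor's sketch (not part of botocore): conditionally_calculate_md5() is an
# event handler that mutates the request dict in place and skips requests
# that already carry a checksum header. The params shape and helper name are
# invented for illustration.
def _editor_example_conditional_md5():
    params = {'body': b'payload', 'headers': {}, 'context': {}}
    conditionally_calculate_md5(params)
    if MD5_AVAILABLE:
        assert 'Content-MD5' in params['headers']
    # A user-supplied Content-MD5 suppresses the computation, so the original
    # header value is left untouched.
    params = {
        'body': b'payload',
        'headers': {'Content-MD5': 'preset-value'},
        'context': {},
    }
    conditionally_calculate_md5(params)
    assert params['headers']['Content-MD5'] == 'preset-value'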
3316class FileWebIdentityTokenLoader:
3317 def __init__(self, web_identity_token_path, _open=open):
3318 self._web_identity_token_path = web_identity_token_path
3319 self._open = _open
3321 def __call__(self):
3322 with self._open(self._web_identity_token_path) as token_file:
3323 return token_file.read()
3326class SSOTokenLoader:
3327 def __init__(self, cache=None):
3328 if cache is None:
3329 cache = {}
3330 self._cache = cache
3332 def _generate_cache_key(self, start_url, session_name):
3333 input_str = start_url
3334 if session_name is not None:
3335 input_str = session_name
3336 return hashlib.sha1(input_str.encode('utf-8')).hexdigest()
3338 def save_token(self, start_url, token, session_name=None):
3339 cache_key = self._generate_cache_key(start_url, session_name)
3340 self._cache[cache_key] = token
3342 def __call__(self, start_url, session_name=None):
3343 cache_key = self._generate_cache_key(start_url, session_name)
3344 logger.debug(f'Checking for cached token at: {cache_key}')
3345 if cache_key not in self._cache:
3346 name = start_url
3347 if session_name is not None:
3348 name = session_name
3349 error_msg = f'Token for {name} does not exist'
3350 raise SSOTokenLoadError(error_msg=error_msg)
3352 token = self._cache[cache_key]
3353 if 'accessToken' not in token or 'expiresAt' not in token:
3354 error_msg = f'Token for {start_url} is invalid'
3355 raise SSOTokenLoadError(error_msg=error_msg)
3356 return token
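A usage sketch with the default in-memory cache; the start URL, session name, and token payload are hypothetical. Only the 'accessToken' and 'expiresAt' keys are validated on load.

from botocore.utils import SSOTokenLoader

loader = SSOTokenLoader()
loader.save_token(
    'https://example.awsapps.com/start',
    {'accessToken': 'PLACEHOLDER', 'expiresAt': '2030-01-01T00:00:00Z'},
    session_name='my-sso-session',
)
token = loader('https://example.awsapps.com/start', session_name='my-sso-session')
# An unknown start URL / session name raises SSOTokenLoadError instead.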
3359class EventbridgeSignerSetter:
3360 _DEFAULT_PARTITION = 'aws'
3361 _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
3363 def __init__(self, endpoint_resolver, region=None, endpoint_url=None):
3364 self._endpoint_resolver = endpoint_resolver
3365 self._region = region
3366 self._endpoint_url = endpoint_url
3368 def register(self, event_emitter):
3369 event_emitter.register(
3370 'before-parameter-build.events.PutEvents',
3371 self.check_for_global_endpoint,
3372 )
3373 event_emitter.register(
3374 'before-call.events.PutEvents', self.set_endpoint_url
3375 )
3377 def set_endpoint_url(self, params, context, **kwargs):
3378 if 'eventbridge_endpoint' in context:
3379 endpoint = context['eventbridge_endpoint']
3380 logger.debug(f"Rewriting URL from {params['url']} to {endpoint}")
3381 params['url'] = endpoint
3383 def check_for_global_endpoint(self, params, context, **kwargs):
3384 endpoint = params.get('EndpointId')
3385 if endpoint is None:
3386 return
3388 if len(endpoint) == 0:
3389 raise InvalidEndpointConfigurationError(
3390 msg='EndpointId must not be a zero length string'
3391 )
3393 if not HAS_CRT:
3394 raise MissingDependencyException(
3395 msg="Using EndpointId requires an additional "
3396 "dependency. You will need to pip install "
3397 "botocore[crt] before proceeding."
3398 )
3400 config = context.get('client_config')
3401 endpoint_variant_tags = None
3402 if config is not None:
3403 if config.use_fips_endpoint:
3404 raise InvalidEndpointConfigurationError(
3405 msg="FIPS is not supported with EventBridge "
3406 "multi-region endpoints."
3407 )
3408 if config.use_dualstack_endpoint:
3409 endpoint_variant_tags = ['dualstack']
3411 if self._endpoint_url is None:
3412 # Validate endpoint is a valid hostname component
3413 parts = urlparse(f'https://{endpoint}')
3414 if parts.hostname != endpoint:
3415 raise InvalidEndpointConfigurationError(
3416 msg='EndpointId is not a valid hostname component.'
3417 )
3418 resolved_endpoint = self._get_global_endpoint(
3419 endpoint, endpoint_variant_tags=endpoint_variant_tags
3420 )
3421 else:
3422 resolved_endpoint = self._endpoint_url
3424 context['eventbridge_endpoint'] = resolved_endpoint
3425 context['auth_type'] = 'v4a'
3427 def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None):
3428 resolver = self._endpoint_resolver
3430 partition = resolver.get_partition_for_region(self._region)
3431 if partition is None:
3432 partition = self._DEFAULT_PARTITION
3433 dns_suffix = resolver.get_partition_dns_suffix(
3434 partition, endpoint_variant_tags=endpoint_variant_tags
3435 )
3436 if dns_suffix is None:
3437 dns_suffix = self._DEFAULT_DNS_SUFFIX
3439 return f"https://{endpoint}.endpoint.events.{dns_suffix}/"
3442def is_s3_accelerate_url(url):
3443 """Does the URL match the S3 Accelerate endpoint scheme?
3445 Virtual host naming style with bucket names in the netloc part of the URL
3446 is not allowed by this function.
3447 """
3448 if url is None:
3449 return False
3451 # Accelerate is only valid for Amazon endpoints.
3452 url_parts = urlsplit(url)
3453 if not url_parts.netloc.endswith(
3454 'amazonaws.com'
3455 ) or url_parts.scheme not in ['https', 'http']:
3456 return False
3458 # The first part of the URL must be s3-accelerate.
3459 parts = url_parts.netloc.split('.')
3460 if parts[0] != 's3-accelerate':
3461 return False
3463 # URL parts between 's3-accelerate' and 'amazonaws.com' represent
3464 # different URL features.
3465 feature_parts = parts[1:-2]
3467 # There should be no duplicate URL parts.
3468 if len(feature_parts) != len(set(feature_parts)):
3469 return False
3471 # Remaining parts must all be in the whitelist.
3472 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts)
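A few illustrative inputs, assuming the current single-entry 'dualstack' whitelist:

from botocore.utils import is_s3_accelerate_url

assert is_s3_accelerate_url('https://s3-accelerate.amazonaws.com')
assert is_s3_accelerate_url('https://s3-accelerate.dualstack.amazonaws.com')
# Virtual-host style URLs (bucket in the netloc) and non-AWS hosts do not match.
assert not is_s3_accelerate_url('https://bucket.s3-accelerate.amazonaws.com')
assert not is_s3_accelerate_url('https://s3-accelerate.example.com')
assert not is_s3_accelerate_url(None)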
3475class JSONFileCache:
3476 """JSON file cache.
3477 This provides a dict-like interface that stores JSON-serializable
3478 objects.
3479 The objects are serialized to JSON and stored in a file. These
3480 values can be retrieved at a later time.
3481 """
3483 CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache'))
3485 def __init__(self, working_dir=CACHE_DIR, dumps_func=None):
3486 self._working_dir = working_dir
3487 if dumps_func is None:
3488 dumps_func = self._default_dumps
3489 self._dumps = dumps_func
3491 def _default_dumps(self, obj):
3492 return json.dumps(obj, default=self._serialize_if_needed)
3494 def __contains__(self, cache_key):
3495 actual_key = self._convert_cache_key(cache_key)
3496 return os.path.isfile(actual_key)
3498 def __getitem__(self, cache_key):
3499 """Retrieve value from a cache key."""
3500 actual_key = self._convert_cache_key(cache_key)
3501 try:
3502 with open(actual_key) as f:
3503 return json.load(f)
3504 except (OSError, ValueError):
3505 raise KeyError(cache_key)
3507 def __delitem__(self, cache_key):
3508 actual_key = self._convert_cache_key(cache_key)
3509 try:
3510 key_path = Path(actual_key)
3511 key_path.unlink()
3512 except FileNotFoundError:
3513 raise KeyError(cache_key)
3515 def __setitem__(self, cache_key, value):
3516 full_key = self._convert_cache_key(cache_key)
3517 try:
3518 file_content = self._dumps(value)
3519 except (TypeError, ValueError):
3520 raise ValueError(
3521 f"Value cannot be cached, must be "
3522 f"JSON serializable: {value}"
3523 )
3524 if not os.path.isdir(self._working_dir):
3525 os.makedirs(self._working_dir)
3526 with os.fdopen(
3527 os.open(full_key, os.O_WRONLY | os.O_CREAT, 0o600), 'w'
3528 ) as f:
3529 f.truncate()
3530 f.write(file_content)
3532 def _convert_cache_key(self, cache_key):
3533 full_path = os.path.join(self._working_dir, cache_key + '.json')
3534 return full_path
3536 def _serialize_if_needed(self, value, iso=False):
3537 if isinstance(value, _DatetimeClass):
3538 if iso:
3539 return value.isoformat()
3540 return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
3541 return value
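A usage sketch; the working directory and cached payload are hypothetical, and the directory defaults to ~/.aws/boto/cache when omitted.

from botocore.utils import JSONFileCache

cache = JSONFileCache(working_dir='/tmp/botocore-cache-example')
cache['sso-token'] = {'accessToken': 'PLACEHOLDER', 'expiresAt': '2030-01-01T00:00:00Z'}
assert 'sso-token' in cache                    # a 0600 sso-token.json file now exists
assert cache['sso-token']['accessToken'] == 'PLACEHOLDER'
del cache['sso-token']                         # removes the file; KeyError if absent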
3544def is_s3express_bucket(bucket):
3545 if bucket is None:
3546 return False
3547 return bucket.endswith('--x-s3')
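The availability-zone portion of the directory bucket name below is hypothetical; the function only checks for the '--x-s3' suffix.

from botocore.utils import is_s3express_bucket

assert is_s3express_bucket('my-bucket--usw2-az1--x-s3')
assert not is_s3express_bucket('my-regular-bucket')
assert not is_s3express_bucket(None)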
3550# This parameter is not part of the public interface and is subject to abrupt
3551# breaking changes or removal without prior announcement.
3552# Mapping of services that have been renamed for backwards compatibility reasons.
3553# Keys are the previous name that should be allowed, values are the documented
3554# and preferred client name.
3555SERVICE_NAME_ALIASES = {'runtime.sagemaker': 'sagemaker-runtime'}
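A hypothetical lookup helper showing how the alias map is meant to be consulted; resolve_service_name is not a botocore API.

from botocore.utils import SERVICE_NAME_ALIASES

def resolve_service_name(name):
    # Fall back to the given name when no alias is registered.
    return SERVICE_NAME_ALIASES.get(name, name)

assert resolve_service_name('runtime.sagemaker') == 'sagemaker-runtime'
assert resolve_service_name('s3') == 's3'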
3558# This parameter is not part of the public interface and is subject to abrupt
3559# breaking changes or removal without prior announcement.
3560 # Mapping for services whose model data directory name does not match their
3561 # service ID. The keys are the data directory names and the values are the
3562 # transformed service IDs (lowercase and hyphenated).
3563CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES = {
3564 # Actual service name we use -> Allowed computed service name.
3565 'apigateway': 'api-gateway',
3566 'application-autoscaling': 'application-auto-scaling',
3567 'appmesh': 'app-mesh',
3568 'autoscaling': 'auto-scaling',
3569 'autoscaling-plans': 'auto-scaling-plans',
3570 'ce': 'cost-explorer',
3571 'cloudhsmv2': 'cloudhsm-v2',
3572 'cloudsearchdomain': 'cloudsearch-domain',
3573 'cognito-idp': 'cognito-identity-provider',
3574 'config': 'config-service',
3575 'cur': 'cost-and-usage-report-service',
3576 'datapipeline': 'data-pipeline',
3577 'directconnect': 'direct-connect',
3578 'devicefarm': 'device-farm',
3579 'discovery': 'application-discovery-service',
3580 'dms': 'database-migration-service',
3581 'ds': 'directory-service',
3582 'dynamodbstreams': 'dynamodb-streams',
3583 'elasticbeanstalk': 'elastic-beanstalk',
3584 'elastictranscoder': 'elastic-transcoder',
3585 'elb': 'elastic-load-balancing',
3586 'elbv2': 'elastic-load-balancing-v2',
3587 'es': 'elasticsearch-service',
3588 'events': 'eventbridge',
3589 'globalaccelerator': 'global-accelerator',
3590 'iot-data': 'iot-data-plane',
3591 'iot-jobs-data': 'iot-jobs-data-plane',
3592 'iot1click-devices': 'iot-1click-devices-service',
3593 'iot1click-projects': 'iot-1click-projects',
3594 'iotevents-data': 'iot-events-data',
3595 'iotevents': 'iot-events',
3596 'iotwireless': 'iot-wireless',
3597 'kinesisanalytics': 'kinesis-analytics',
3598 'kinesisanalyticsv2': 'kinesis-analytics-v2',
3599 'kinesisvideo': 'kinesis-video',
3600 'lex-models': 'lex-model-building-service',
3601 'lexv2-models': 'lex-models-v2',
3602 'lex-runtime': 'lex-runtime-service',
3603 'lexv2-runtime': 'lex-runtime-v2',
3604 'logs': 'cloudwatch-logs',
3605 'machinelearning': 'machine-learning',
3606 'marketplacecommerceanalytics': 'marketplace-commerce-analytics',
3607 'marketplace-entitlement': 'marketplace-entitlement-service',
3608 'meteringmarketplace': 'marketplace-metering',
3609 'mgh': 'migration-hub',
3610 'sms-voice': 'pinpoint-sms-voice',
3611 'resourcegroupstaggingapi': 'resource-groups-tagging-api',
3612 'route53': 'route-53',
3613 'route53domains': 'route-53-domains',
3614 's3control': 's3-control',
3615 'sdb': 'simpledb',
3616 'secretsmanager': 'secrets-manager',
3617 'serverlessrepo': 'serverlessapplicationrepository',
3618 'servicecatalog': 'service-catalog',
3619 'servicecatalog-appregistry': 'service-catalog-appregistry',
3620 'stepfunctions': 'sfn',
3621 'storagegateway': 'storage-gateway',
3622}
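A hypothetical helper illustrating how the override table bridges a model data directory name and its hyphenated service ID; hyphenized_service_id is not a botocore API.

from botocore.utils import CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES

def hyphenized_service_id(data_dir_name):
    # Use the override when one exists; otherwise the directory name already matches.
    return CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES.get(
        data_dir_name, data_dir_name
    )

assert hyphenized_service_id('apigateway') == 'api-gateway'
assert hyphenized_service_id('s3') == 's3'  # no override needed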