Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/utils.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1730 statements  

1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import base64 

14import binascii 

15import datetime 

16import email.message 

17import functools 

18import hashlib 

19import io 

20import logging 

21import os 

22import random 

23import re 

24import socket 

25import tempfile 

26import time 

27import warnings 

28import weakref 

29from datetime import datetime as _DatetimeClass 

30from ipaddress import ip_address 

31from pathlib import Path 

32from urllib.request import getproxies, proxy_bypass 

33 

34import dateutil.parser 

35from dateutil.tz import tzutc 

36from urllib3.exceptions import LocationParseError 

37 

38import botocore 

39import botocore.awsrequest 

40import botocore.httpsession 

41 

42# IP Regexes retained for backwards compatibility 

43from botocore.compat import ( 

44 HAS_CRT, 

45 HEX_PAT, # noqa: F401 

46 IPV4_PAT, # noqa: F401 

47 IPV4_RE, 

48 IPV6_ADDRZ_PAT, # noqa: F401 

49 IPV6_ADDRZ_RE, 

50 IPV6_PAT, # noqa: F401 

51 LS32_PAT, # noqa: F401 

52 MD5_AVAILABLE, 

53 UNRESERVED_PAT, # noqa: F401 

54 UNSAFE_URL_CHARS, 

55 ZONE_ID_PAT, # noqa: F401 

56 OrderedDict, 

57 get_current_datetime, 

58 get_md5, 

59 get_tzinfo_options, 

60 json, 

61 quote, 

62 urlparse, 

63 urlsplit, 

64 urlunsplit, 

65 zip_longest, 

66) 

67from botocore.exceptions import ( 

68 ClientError, 

69 ConfigNotFound, 

70 ConnectionClosedError, 

71 ConnectTimeoutError, 

72 EndpointConnectionError, 

73 HTTPClientError, 

74 InvalidDNSNameError, 

75 InvalidEndpointConfigurationError, 

76 InvalidExpressionError, 

77 InvalidHostLabelError, 

78 InvalidIMDSEndpointError, 

79 InvalidIMDSEndpointModeError, 

80 InvalidRegionError, 

81 MetadataRetrievalError, 

82 MissingDependencyException, 

83 ReadTimeoutError, 

84 SSOTokenLoadError, 

85 UnsupportedOutpostResourceError, 

86 UnsupportedS3AccesspointConfigurationError, 

87 UnsupportedS3ArnError, 

88 UnsupportedS3ConfigurationError, 

89 UnsupportedS3ControlArnError, 

90 UnsupportedS3ControlConfigurationError, 

91) 

92from botocore.plugin import ( 

93 PluginContext, 

94 reset_plugin_context, 

95 set_plugin_context, 

96) 

97 

logger = logging.getLogger(__name__)
# Default connect/read timeout (seconds) for instance metadata requests.
DEFAULT_METADATA_SERVICE_TIMEOUT = 1
# IMDS base endpoints for IPv4 and IPv6 modes.
METADATA_BASE_URL = 'http://169.254.169.254/'
METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/'
# Accepted values for the ec2_metadata_service_endpoint_mode config option.
METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6')

# These are chars that do not need to be urlencoded.
# Based on rfc2986, section 2.3
SAFE_CHARS = '-._~'
# Matches a single lowercase DNS label (alphanumeric with interior hyphens).
LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]')
# Transport-level exceptions that are safe to retry.
RETRYABLE_HTTP_ERRORS = (
    ReadTimeoutError,
    EndpointConnectionError,
    ConnectionClosedError,
    ConnectTimeoutError,
)
S3_ACCELERATE_WHITELIST = ['dualstack']
# In switching events from using service name / endpoint prefix to service
# id, we have to preserve compatibility. This maps the instances where either
# is different than the transformed service id.
EVENT_ALIASES = {
    "api.mediatailor": "mediatailor",
    "api.pricing": "pricing",
    "api.sagemaker": "sagemaker",
    "apigateway": "api-gateway",
    "application-autoscaling": "application-auto-scaling",
    "appstream2": "appstream",
    "autoscaling": "auto-scaling",
    "autoscaling-plans": "auto-scaling-plans",
    "ce": "cost-explorer",
    "cloudhsmv2": "cloudhsm-v2",
    "cloudsearchdomain": "cloudsearch-domain",
    "cognito-idp": "cognito-identity-provider",
    "config": "config-service",
    "cur": "cost-and-usage-report-service",
    "data.iot": "iot-data-plane",
    "data.jobs.iot": "iot-jobs-data-plane",
    "data.mediastore": "mediastore-data",
    "datapipeline": "data-pipeline",
    "devicefarm": "device-farm",
    "directconnect": "direct-connect",
    "discovery": "application-discovery-service",
    "dms": "database-migration-service",
    "ds": "directory-service",
    "dynamodbstreams": "dynamodb-streams",
    "elasticbeanstalk": "elastic-beanstalk",
    "elasticfilesystem": "efs",
    "elasticloadbalancing": "elastic-load-balancing",
    "elasticmapreduce": "emr",
    "elb": "elastic-load-balancing",
    "elbv2": "elastic-load-balancing-v2",
    "email": "ses",
    "entitlement.marketplace": "marketplace-entitlement-service",
    "es": "elasticsearch-service",
    "events": "eventbridge",
    "cloudwatch-events": "eventbridge",
    "iot-data": "iot-data-plane",
    "iot-jobs-data": "iot-jobs-data-plane",
    "kinesisanalytics": "kinesis-analytics",
    "kinesisvideo": "kinesis-video",
    "lex-models": "lex-model-building-service",
    "lex-runtime": "lex-runtime-service",
    "logs": "cloudwatch-logs",
    "machinelearning": "machine-learning",
    "marketplace-entitlement": "marketplace-entitlement-service",
    "marketplacecommerceanalytics": "marketplace-commerce-analytics",
    "metering.marketplace": "marketplace-metering",
    "meteringmarketplace": "marketplace-metering",
    "mgh": "migration-hub",
    "models.lex": "lex-model-building-service",
    "monitoring": "cloudwatch",
    "mturk-requester": "mturk",
    "resourcegroupstaggingapi": "resource-groups-tagging-api",
    "route53": "route-53",
    "route53domains": "route-53-domains",
    "runtime.lex": "lex-runtime-service",
    "runtime.sagemaker": "sagemaker-runtime",
    "sdb": "simpledb",
    "secretsmanager": "secrets-manager",
    "serverlessrepo": "serverlessapplicationrepository",
    "servicecatalog": "service-catalog",
    "states": "sfn",
    "stepfunctions": "sfn",
    "storagegateway": "storage-gateway",
    "streams.dynamodb": "dynamodb-streams",
    "tagging": "resource-groups-tagging-api",
}


# This pattern can be used to detect if a header is a flexible checksum header
CHECKSUM_HEADER_PATTERN = re.compile(
    r'^X-Amz-Checksum-([a-z0-9]*)$',
    flags=re.IGNORECASE,
)

# Order in which wire protocols are preferred when a service model
# advertises support for more than one.
PRIORITY_ORDERED_SUPPORTED_PROTOCOLS = (
    'json',
    'rest-json',
    'rest-xml',
    'smithy-rpc-v2-cbor',
    'query',
    'ec2',
)

201 

202 

def ensure_boolean(val):
    """Coerce a boolean or string into a proper ``bool``.

    String comparison against "true" is case insensitive; any type
    other than ``bool`` or ``str`` yields ``False``.
    """
    if isinstance(val, bool):
        return val
    if isinstance(val, str):
        return val.lower() == 'true'
    return False

214 

215 

def resolve_imds_endpoint_mode(session):
    """Resolve the IMDS endpoint mode to either 'ipv6' or 'ipv4'.

    ``ec2_metadata_service_endpoint_mode`` takes precedence over
    ``imds_use_ipv6``.
    """
    endpoint_mode = session.get_config_variable(
        'ec2_metadata_service_endpoint_mode'
    )
    if endpoint_mode is None:
        # Fall back to the boolean imds_use_ipv6 setting.
        return 'ipv6' if session.get_config_variable('imds_use_ipv6') else 'ipv4'
    normalized = endpoint_mode.lower()
    if normalized not in METADATA_ENDPOINT_MODES:
        raise InvalidIMDSEndpointModeError(
            mode=endpoint_mode,
            valid_modes=METADATA_ENDPOINT_MODES,
        )
    return normalized

236 

237 

def is_json_value_header(shape):
    """Determine if the provided shape is the special header type jsonvalue.

    :type shape: botocore.shape
    :param shape: Shape to be inspected for the jsonvalue trait.

    :return: True if this type is a jsonvalue, False otherwise
    :rtype: Bool
    """
    if not hasattr(shape, 'serialization'):
        return False
    jsonvalue = shape.serialization.get('jsonvalue', False)
    if not jsonvalue:
        # Preserve the short-circuit value of the original and-chain.
        return jsonvalue
    if shape.serialization.get('location') != 'header':
        return False
    return shape.type_name == 'string'

253 

254 

def has_header(header_name, headers):
    """Case-insensitive check for header key."""
    if header_name is None:
        return False
    if isinstance(headers, botocore.awsrequest.HeadersDict):
        # HeadersDict already performs case-insensitive lookups.
        return header_name in headers
    wanted = header_name.lower()
    return wanted in [key.lower() for key in headers]

263 

264 

def get_service_module_name(service_model):
    """Returns the module name for a service.

    This is the value used in both the documentation and client class name.
    """
    metadata = service_model.metadata
    raw_name = metadata.get(
        'serviceAbbreviation',
        metadata.get('serviceFullName', service_model.service_name),
    )
    # Strip vendor prefixes, then drop any non-word characters.
    for vendor in ('Amazon', 'AWS'):
        raw_name = raw_name.replace(vendor, '')
    return re.sub(r'\W+', '', raw_name)

280 

281 

def normalize_url_path(path):
    """Normalize a URL path, mapping an empty path to '/'."""
    return remove_dot_segments(path) if path else '/'

286 

287 

def normalize_boolean(val):
    """Return None if val is None; otherwise coerce val to a boolean."""
    if val is None:
        return None
    return ensure_boolean(val)

295 

296 

def remove_dot_segments(url):
    """Apply RFC 3986, section 5.2.4 "Remove Dot Segments" to *url*.

    AWS services additionally require consecutive slashes to be removed,
    which this implementation also does.
    """
    if not url:
        return ''
    segments = []
    for segment in url.split('/'):
        if not segment or segment == '.':
            # Empty segments (from consecutive slashes) and '.' vanish.
            continue
        if segment == '..':
            if segments:
                segments.pop()
        else:
            segments.append(segment)
    leading = '/' if url.startswith('/') else ''
    trailing = '/' if url.endswith('/') and segments else ''
    return leading + '/'.join(segments) + trailing

322 

323 

def validate_jmespath_for_set(expression):
    """Validate that *expression* is a settable (dotted-path) jmespath.

    Only dotted paths can be used as assignment targets; empty
    expressions, a bare '.', and any indexing/wildcard syntax are
    rejected.
    """
    if not expression or expression == '.':
        raise InvalidExpressionError(expression=expression)
    if any(token in expression for token in ('[', ']', '*')):
        raise InvalidExpressionError(expression=expression)

333 

334 

def set_value_from_jmespath(source, expression, value, is_first=True):
    """Set *value* inside *source* at the location named by *expression*.

    This supports only a limited jmespath subset:

    * Only handles dotted lookups
    * No offsets/wildcards/slices/etc.
    """
    if is_first:
        validate_jmespath_for_set(expression)

    current_key, _, remainder = expression.partition('.')
    if not current_key:
        raise InvalidExpressionError(expression=expression)

    if not remainder:
        # Terminal key: assign directly.
        source[current_key] = value
        return

    # Intermediate key: create a nested dict for new keys, then recurse
    # into it with the rest of the expression.
    if current_key not in source:
        source[current_key] = {}
    return set_value_from_jmespath(
        source[current_key], remainder, value, is_first=False
    )

363 

364 

def is_global_accesspoint(context):
    """Determine if request is intended for an MRAP accesspoint.

    A multi-region accesspoint is signalled by an empty-string region.
    """
    accesspoint = context.get('s3_accesspoint', {})
    return accesspoint.get('region') == ''

370 

371 

def create_nested_client(session, service_name, **kwargs):
    """Create a client with plugin loading disabled.

    If a client is created from within a plugin based on the environment
    variable, an infinite loop could arise.  Any client created from
    within another client must therefore use this helper so plugins are
    not re-entered for the duration of the call.
    """
    token = set_plugin_context(PluginContext(plugins="DISABLED"))
    try:
        return session.create_client(service_name, **kwargs)
    finally:
        reset_plugin_context(token)

382 

383 

class _RetriesExceededError(Exception):
    """Internal exception raised when the retry budget is exhausted."""

388 

389 

class BadIMDSRequestError(Exception):
    """Raised when IMDS rejects a token request (HTTP 400).

    The offending request is retained on ``self.request`` for logging.
    """

    def __init__(self, request):
        self.request = request

393 

394 

class IMDSFetcher:
    """Base helper for talking to the EC2 Instance Metadata Service.

    Handles endpoint selection (IPv4 / IPv6 / custom), IMDSv2 token
    acquisition, request retries, and the ``AWS_EC2_METADATA_DISABLED``
    opt-out.
    """

    # Raised when all attempts to reach IMDS have been exhausted.
    _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError
    # Path used to request an IMDSv2 session token.
    _TOKEN_PATH = 'latest/api/token'
    # Requested token lifetime in seconds (six hours).
    _TOKEN_TTL = '21600'

    def __init__(
        self,
        timeout=DEFAULT_METADATA_SERVICE_TIMEOUT,
        num_attempts=1,
        base_url=METADATA_BASE_URL,
        env=None,
        user_agent=None,
        config=None,
    ):
        """Create a fetcher.

        :param timeout: Per-request timeout (seconds).
        :param num_attempts: Number of attempts for each IMDS request.
        :param base_url: Base URL of the metadata service; overridden by
            config when it is the default value.
        :param env: Environment mapping; defaults to a copy of ``os.environ``.
        :param user_agent: Optional User-Agent header value.
        :param config: Dict of IMDS-related config values.
        """
        self._timeout = timeout
        self._num_attempts = num_attempts
        if config is None:
            config = {}
        self._base_url = self._select_base_url(base_url, config)
        self._config = config

        if env is None:
            env = os.environ.copy()
        # AWS_EC2_METADATA_DISABLED=true turns off all IMDS access.
        self._disabled = (
            env.get('AWS_EC2_METADATA_DISABLED', 'false').lower() == 'true'
        )
        self._imds_v1_disabled = config.get('ec2_metadata_v1_disabled')
        self._user_agent = user_agent
        self._session = botocore.httpsession.URLLib3Session(
            timeout=self._timeout,
            proxies=get_environ_proxies(self._base_url),
        )

    def get_base_url(self):
        """Return the resolved IMDS base URL."""
        return self._base_url

    def _select_base_url(self, base_url, config):
        """Choose the IMDS endpoint from the constructor arg and config.

        Precedence: explicit non-default ``base_url`` argument, then the
        ``ec2_metadata_service_endpoint`` config value, then the IPv6
        default when ipv6 mode is requested, then the IPv4 default.
        Raises InvalidIMDSEndpointError for an unparsable URL.
        """
        if config is None:
            config = {}

        requires_ipv6 = (
            config.get('ec2_metadata_service_endpoint_mode') == 'ipv6'
        )
        custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint')

        if requires_ipv6 and custom_metadata_endpoint:
            logger.warning(
                "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint."
            )

        chosen_base_url = None

        if base_url != METADATA_BASE_URL:
            chosen_base_url = base_url
        elif custom_metadata_endpoint:
            chosen_base_url = custom_metadata_endpoint
        elif requires_ipv6:
            chosen_base_url = METADATA_BASE_URL_IPv6
        else:
            chosen_base_url = METADATA_BASE_URL

        logger.debug("IMDS ENDPOINT: %s", chosen_base_url)
        if not is_valid_uri(chosen_base_url):
            raise InvalidIMDSEndpointError(endpoint=chosen_base_url)

        return chosen_base_url

    def _construct_url(self, path):
        # Join base URL and path with exactly one separating slash.
        sep = ''
        if self._base_url and not self._base_url.endswith('/'):
            sep = '/'
        return f'{self._base_url}{sep}{path}'

    def _fetch_metadata_token(self):
        """Request an IMDSv2 session token via PUT.

        Returns the token text, or ``None`` when IMDSv2 is unavailable
        (404/403/405 responses, read timeouts, or retries exhausted).
        Raises BadIMDSRequestError on a 400 response and
        InvalidIMDSEndpointError when the endpoint URL cannot be parsed.
        """
        self._assert_enabled()
        url = self._construct_url(self._TOKEN_PATH)
        headers = {
            'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL,
        }
        self._add_user_agent(headers)
        request = botocore.awsrequest.AWSRequest(
            method='PUT', url=url, headers=headers
        )
        for i in range(self._num_attempts):
            try:
                response = self._session.send(request.prepare())
                if response.status_code == 200:
                    return response.text
                elif response.status_code in (404, 403, 405):
                    # Token endpoint unsupported/forbidden: fall back to IMDSv1.
                    return None
                elif response.status_code in (400,):
                    raise BadIMDSRequestError(request)
            except ReadTimeoutError:
                return None
            except RETRYABLE_HTTP_ERRORS as e:
                logger.debug(
                    "Caught retryable HTTP exception while making metadata "
                    "service request to %s: %s",
                    url,
                    e,
                    exc_info=True,
                )
            except HTTPClientError as e:
                if isinstance(e.kwargs.get('error'), LocationParseError):
                    raise InvalidIMDSEndpointError(endpoint=url, error=e)
                else:
                    raise
        return None

    def _get_request(self, url_path, retry_func, token=None):
        """Make a get request to the Instance Metadata Service.

        :type url_path: str
        :param url_path: The path component of the URL to make a get request.
            This arg is appended to the base_url that was provided in the
            initializer.

        :type retry_func: callable
        :param retry_func: A function that takes the response as an argument
            and determines if it needs to retry. By default empty and non
            200 OK responses are retried.

        :type token: str
        :param token: Metadata token to send along with GET requests to IMDS.

        Raises ``self._RETRIES_EXCEEDED_ERROR_CLS`` when no attempt
        yields an acceptable response.
        """
        self._assert_enabled()
        if not token:
            # Without a token this is an IMDSv1 call, which may be disabled.
            self._assert_v1_enabled()
        if retry_func is None:
            retry_func = self._default_retry
        url = self._construct_url(url_path)
        headers = {}
        if token is not None:
            headers['x-aws-ec2-metadata-token'] = token
        self._add_user_agent(headers)
        for i in range(self._num_attempts):
            try:
                request = botocore.awsrequest.AWSRequest(
                    method='GET', url=url, headers=headers
                )
                response = self._session.send(request.prepare())
                if not retry_func(response):
                    return response
            except RETRYABLE_HTTP_ERRORS as e:
                logger.debug(
                    "Caught retryable HTTP exception while making metadata "
                    "service request to %s: %s",
                    url,
                    e,
                    exc_info=True,
                )
        raise self._RETRIES_EXCEEDED_ERROR_CLS()

    def _add_user_agent(self, headers):
        # Only set the header when a user agent was supplied.
        if self._user_agent is not None:
            headers['User-Agent'] = self._user_agent

    def _assert_enabled(self):
        """Raise the retries-exceeded error if IMDS access is disabled."""
        if self._disabled:
            logger.debug("Access to EC2 metadata has been disabled.")
            raise self._RETRIES_EXCEEDED_ERROR_CLS()

    def _assert_v1_enabled(self):
        """Raise MetadataRetrievalError if IMDSv1 fallback is disabled."""
        if self._imds_v1_disabled:
            raise MetadataRetrievalError(
                error_msg="Unable to retrieve token for use in IMDSv2 call and IMDSv1 has been disabled"
            )

    def _default_retry(self, response):
        # Retry on any non-200 or empty-body response.
        return self._is_non_ok_response(response) or self._is_empty(response)

    def _is_non_ok_response(self, response):
        if response.status_code != 200:
            self._log_imds_response(response, 'non-200', log_body=True)
            return True
        return False

    def _is_empty(self, response):
        if not response.content:
            self._log_imds_response(response, 'no body', log_body=True)
            return True
        return False

    def _log_imds_response(self, response, reason_to_log, log_body=False):
        # Emit a single debug line describing why a response was rejected.
        statement = (
            "Metadata service returned %s response "
            "with status code of %s for url: %s"
        )
        logger_args = [reason_to_log, response.status_code, response.url]
        if log_body:
            statement += ", content body: %s"
            logger_args.append(response.content)
        logger.debug(statement, *logger_args)

588 

589 

class InstanceMetadataFetcher(IMDSFetcher):
    """Fetch IAM role credentials from the EC2 instance metadata service."""

    # Path listing the instance's IAM role; the role name is appended to
    # this path to fetch the actual credentials.
    _URL_PATH = 'latest/meta-data/iam/security-credentials/'
    _REQUIRED_CREDENTIAL_FIELDS = [
        'AccessKeyId',
        'SecretAccessKey',
        'Token',
        'Expiration',
    ]

    def retrieve_iam_role_credentials(self):
        """Return IAM role credentials from IMDS.

        :return: A dict with ``role_name``, ``access_key``,
            ``secret_key``, ``token`` and ``expiry_time`` keys, or an
            empty dict when credentials could not be retrieved.
        """
        try:
            token = self._fetch_metadata_token()
            role_name = self._get_iam_role(token)
            credentials = self._get_credentials(role_name, token)
            if self._contains_all_credential_fields(credentials):
                credentials = {
                    'role_name': role_name,
                    'access_key': credentials['AccessKeyId'],
                    'secret_key': credentials['SecretAccessKey'],
                    'token': credentials['Token'],
                    'expiry_time': credentials['Expiration'],
                }
                self._evaluate_expiration(credentials)
                return credentials
            else:
                # IMDS can return a 200 response that has a JSON formatted
                # error message (i.e. if ec2 is not trusted entity for the
                # attached role). We do not necessarily want to retry for
                # these and we also do not necessarily want to raise a key
                # error. So at least log the problematic response and return
                # an empty dictionary to signal that it was not able to
                # retrieve credentials. These error will contain both a
                # Code and Message key.
                if 'Code' in credentials and 'Message' in credentials:
                    # BUGFIX: the two string literals previously concatenated
                    # without a separating space ("retrievingcredentials").
                    logger.debug(
                        'Error response received when retrieving '
                        'credentials: %s.',
                        credentials,
                    )
                return {}
        except self._RETRIES_EXCEEDED_ERROR_CLS:
            logger.debug(
                "Max number of attempts exceeded (%s) when "
                "attempting to retrieve data from metadata service.",
                self._num_attempts,
            )
        except BadIMDSRequestError as e:
            logger.debug("Bad IMDS request: %s", e.request)
        return {}

    def _get_iam_role(self, token=None):
        """Return the instance's IAM role name as text."""
        return self._get_request(
            url_path=self._URL_PATH,
            retry_func=self._needs_retry_for_role_name,
            token=token,
        ).text

    def _get_credentials(self, role_name, token=None):
        """Fetch and JSON-decode the credentials for *role_name*."""
        r = self._get_request(
            url_path=self._URL_PATH + role_name,
            retry_func=self._needs_retry_for_credentials,
            token=token,
        )
        return json.loads(r.text)

    def _is_invalid_json(self, response):
        # A credentials response that isn't valid JSON should be retried.
        try:
            json.loads(response.text)
            return False
        except ValueError:
            self._log_imds_response(response, 'invalid json')
            return True

    def _needs_retry_for_role_name(self, response):
        return self._is_non_ok_response(response) or self._is_empty(response)

    def _needs_retry_for_credentials(self, response):
        return (
            self._is_non_ok_response(response)
            or self._is_empty(response)
            or self._is_invalid_json(response)
        )

    def _contains_all_credential_fields(self, credentials):
        """Return True when every required credential field is present."""
        for field in self._REQUIRED_CREDENTIAL_FIELDS:
            if field not in credentials:
                logger.debug(
                    'Retrieved credentials is missing required field: %s',
                    field,
                )
                return False
        return True

    def _evaluate_expiration(self, credentials):
        """Extend near-expired credentials to ride out IMDS outages.

        If the expiry falls inside the refresh window (config key
        ``ec2_credential_refresh_window``, default 10 minutes, plus a
        2-10 minute jitter), push ``expiry_time`` out by that window so
        callers keep using the credentials while refresh is retried.
        """
        expiration = credentials.get("expiry_time")
        if expiration is None:
            return
        try:
            expiration = datetime.datetime.strptime(
                expiration, "%Y-%m-%dT%H:%M:%SZ"
            )
            refresh_interval = self._config.get(
                "ec2_credential_refresh_window", 60 * 10
            )
            jitter = random.randint(120, 600)  # Between 2 to 10 minutes
            refresh_interval_with_jitter = refresh_interval + jitter
            current_time = get_current_datetime()
            refresh_offset = datetime.timedelta(
                seconds=refresh_interval_with_jitter
            )
            extension_time = expiration - refresh_offset
            if current_time >= extension_time:
                new_time = current_time + refresh_offset
                credentials["expiry_time"] = new_time.strftime(
                    "%Y-%m-%dT%H:%M:%SZ"
                )
                logger.info(
                    "Attempting credential expiration extension due to a "
                    "credential service availability issue. A refresh of "
                    "these credentials will be attempted again within "
                    "the next %.0f minutes.",
                    refresh_interval_with_jitter / 60,
                )
        except ValueError:
            logger.debug(
                "Unable to parse expiry_time in %s", credentials['expiry_time']
            )

717 

718 

class IMDSRegionProvider:
    """Resolve the current AWS region via the instance metadata service."""

    def __init__(self, session, environ=None, fetcher=None):
        """Initialize IMDSRegionProvider.

        :type session: :class:`botocore.session.Session`
        :param session: The session is needed to look up configuration for
            how to contact the instance metadata service. Specifically,
            whether or not it should use the IMDS region at all, and if so
            how to configure the timeout and number of attempts to reach
            the service.

        :type environ: None or dict
        :param environ: A dictionary of environment variables to use. If
            ``None`` is the argument then ``os.environ`` will be used by
            default.

        :type fetcher: :class:`botocore.utils.InstanceMetadataRegionFetcher`
        :param fetcher: The class to actually handle the fetching of the
            region from the IMDS. If not provided a default one will be
            created.
        """
        self._session = session
        if environ is None:
            environ = os.environ
        self._environ = environ
        self._fetcher = fetcher

    def provide(self):
        """Provide the region value from IMDS."""
        instance_region = self._get_instance_metadata_region()
        return instance_region

    def _get_instance_metadata_region(self):
        fetcher = self._get_fetcher()
        region = fetcher.retrieve_region()
        return region

    def _get_fetcher(self):
        # Lazily create the fetcher on first use.
        if self._fetcher is None:
            self._fetcher = self._create_fetcher()
        return self._fetcher

    def _create_fetcher(self):
        """Build an InstanceMetadataRegionFetcher from session config."""
        metadata_timeout = self._session.get_config_variable(
            'metadata_service_timeout'
        )
        metadata_num_attempts = self._session.get_config_variable(
            'metadata_service_num_attempts'
        )
        imds_config = {
            'ec2_metadata_service_endpoint': self._session.get_config_variable(
                'ec2_metadata_service_endpoint'
            ),
            'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode(
                self._session
            ),
            'ec2_metadata_v1_disabled': self._session.get_config_variable(
                'ec2_metadata_v1_disabled'
            ),
        }
        fetcher = InstanceMetadataRegionFetcher(
            timeout=metadata_timeout,
            num_attempts=metadata_num_attempts,
            env=self._environ,
            user_agent=self._session.user_agent(),
            config=imds_config,
        )
        return fetcher

783 

784 

class InstanceMetadataRegionFetcher(IMDSFetcher):
    """Fetch the current region from the EC2 instance metadata service."""

    _URL_PATH = 'latest/meta-data/placement/availability-zone/'

    def retrieve_region(self):
        """Get the current region from the instance metadata service.

        :rtype: None or str
        :returns: The region the current instance is running in, or
            ``None`` when the metadata service cannot be contacted, does
            not give a valid response, or exhausts its retries.
        """
        try:
            return self._get_region()
        except self._RETRIES_EXCEEDED_ERROR_CLS:
            logger.debug(
                "Max number of attempts exceeded (%s) when "
                "attempting to retrieve data from metadata service.",
                self._num_attempts,
            )
        return None

    def _get_region(self):
        token = self._fetch_metadata_token()
        response = self._get_request(
            url_path=self._URL_PATH,
            retry_func=self._default_retry,
            token=token,
        )
        # The availability zone is the region plus one trailing zone
        # letter, so strip the final character.
        return response.text[:-1]

821 

822 

def merge_dicts(dict1, dict2, append_lists=False):
    """Given two dict, merge the second dict into the first.

    The dicts can have arbitrary nesting.

    :param append_lists: If true, instead of clobbering a list with the new
        value, append all of the new values onto the original list.
    """
    for key in dict2:
        if isinstance(dict2[key], dict):
            if key in dict1 and key in dict2:
                # BUGFIX: propagate append_lists on recursion; previously
                # the flag was dropped, so lists nested inside dicts were
                # clobbered even when append_lists=True.
                merge_dicts(dict1[key], dict2[key], append_lists)
            else:
                dict1[key] = dict2[key]
        # If the value is a list and the ``append_lists`` flag is set,
        # append the new values onto the original list
        elif isinstance(dict2[key], list) and append_lists:
            # The value in dict1 must be a list in order to append new
            # values onto it.
            if key in dict1 and isinstance(dict1[key], list):
                dict1[key].extend(dict2[key])
            else:
                dict1[key] = dict2[key]
        else:
            # At scalar types, we iterate and merge the
            # current dict that we're on.
            dict1[key] = dict2[key]

850 

851 

def lowercase_dict(original):
    """Copies the given dictionary ensuring all keys are lowercase strings."""
    return {key.lower(): value for key, value in original.items()}

858 

859 

def parse_key_val_file(filename, _open=open):
    """Read *filename* and parse its ``key=value`` lines into a dict.

    :raises ConfigNotFound: if the file cannot be opened or read.
    """
    try:
        with _open(filename) as f:
            return parse_key_val_file_contents(f.read())
    except OSError:
        raise ConfigNotFound(path=filename)

867 

868 

def parse_key_val_file_contents(contents):
    """Parse lenient ``key=value`` lines into a dict.

    This was originally extracted from the EC2 credential provider,
    which was fairly lenient in its parsing.  Lines without an '=' are
    skipped; keys and values are whitespace-stripped.
    """
    parsed = {}
    for line in contents.splitlines():
        key, sep, val = line.partition('=')
        if not sep:
            continue
        parsed[key.strip()] = val.strip()
    return parsed

882 

883 

def percent_encode_sequence(mapping, safe=SAFE_CHARS):
    """Urlencode a dict or list into a string.

    This is similar to urllib.urlencode except that:

    * It uses quote, and not quote_plus
    * It has a default list of safe chars that don't need
      to be encoded, which matches what AWS services expect.

    If any value in the input ``mapping`` is a list type,
    then each list element wil be serialized. This is the equivalent
    to ``urlencode``'s ``doseq=True`` argument.

    This function should be preferred over the stdlib
    ``urlencode()`` function.

    :param mapping: Either a dict to urlencode or a list of
        ``(key, value)`` pairs.
    """
    pairs = mapping.items() if hasattr(mapping, 'items') else mapping
    encoded_pairs = []
    for key, value in pairs:
        # A list value is expanded into one key=element pair per element.
        elements = value if isinstance(value, list) else [value]
        for element in elements:
            encoded_pairs.append(
                f'{percent_encode(key)}={percent_encode(element)}'
            )
    return '&'.join(encoded_pairs)

920 

921 

def percent_encode(input_str, safe=SAFE_CHARS):
    """Urlencodes a string.

    Whereas percent_encode_sequence handles taking a dict/sequence and
    producing a percent encoded string, this function deals only with
    taking a string (not a dict/sequence) and percent encoding it.

    If given the binary type, will simply URL encode it. If given the
    text type, will produce the binary type by UTF-8 encoding the
    text. If given something else, will convert it to the text type
    first.
    """
    # Normalize anything that isn't already a string type to text.
    if not isinstance(input_str, (bytes, str)):
        input_str = str(input_str)
    # Text is UTF-8 encoded before quoting; bytes pass through as-is.
    if isinstance(input_str, str):
        input_str = input_str.encode('utf-8')
    return quote(input_str, safe=safe)

941 

942 

def _epoch_seconds_to_datetime(value, tzinfo):
    """Parse numerical epoch timestamps (seconds since 1970) into a
    ``datetime.datetime`` in UTC using ``datetime.timedelta``. This is intended
    as fallback when ``fromtimestamp`` raises ``OverflowError`` or ``OSError``.

    :type value: float or int
    :param value: The Unix timestamps as number.

    :type tzinfo: callable
    :param tzinfo: A ``datetime.tzinfo`` class or compatible callable.
    """
    utc_epoch = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=tzutc())
    localized_epoch = utc_epoch.astimezone(tzinfo())
    return localized_epoch + datetime.timedelta(seconds=value)

957 

958 

959def _parse_timestamp_with_tzinfo(value, tzinfo): 

960 """Parse timestamp with pluggable tzinfo options.""" 

961 if isinstance(value, (int, float)): 

962 # Possibly an epoch time. 

963 return datetime.datetime.fromtimestamp(value, tzinfo()) 

964 else: 

965 try: 

966 return datetime.datetime.fromtimestamp(float(value), tzinfo()) 

967 except (TypeError, ValueError): 

968 pass 

969 try: 

970 # In certain cases, a timestamp marked with GMT can be parsed into a 

971 # different time zone, so here we provide a context which will 

972 # enforce that GMT == UTC. 

973 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()}) 

974 except (TypeError, ValueError) as e: 

975 raise ValueError(f'Invalid timestamp "{value}": {e}') 

976 

977 

def parse_timestamp(value):
    """Parse a timestamp into a datetime object.

    Supported formats:

        * iso8601
        * rfc822
        * epoch (value is an integer)

    This will return a ``datetime.datetime`` object.

    :raises RuntimeError: if no tzinfo option could produce a result.
    """
    tzinfo_options = get_tzinfo_options()
    # Try each available tzinfo implementation until one parses cleanly.
    for tzinfo in tzinfo_options:
        try:
            return _parse_timestamp_with_tzinfo(value, tzinfo)
        except (OSError, OverflowError) as e:
            logger.debug(
                'Unable to parse timestamp with "%s" timezone info.',
                tzinfo.__name__,
                exc_info=e,
            )
    # For numeric values attempt fallback to using fromtimestamp-free method.
    # From Python's ``datetime.datetime.fromtimestamp`` documentation: "This
    # may raise ``OverflowError``, if the timestamp is out of the range of
    # values supported by the platform C localtime() function, and ``OSError``
    # on localtime() failure. It's common for this to be restricted to years
    # from 1970 through 2038."
    try:
        numeric_value = float(value)
    except (TypeError, ValueError):
        pass
    else:
        try:
            # NOTE(review): this loop returns unconditionally on its first
            # iteration, so only the first tzinfo option is ever used by the
            # fallback; if that attempt raises, control jumps to the except
            # below and falls through to the RuntimeError. Confirm whether
            # trying the remaining options was intended.
            for tzinfo in tzinfo_options:
                return _epoch_seconds_to_datetime(numeric_value, tzinfo=tzinfo)
        except (OSError, OverflowError) as e:
            logger.debug(
                'Unable to parse timestamp using fallback method with "%s" '
                'timezone info.',
                tzinfo.__name__,
                exc_info=e,
            )
    raise RuntimeError(
        f'Unable to calculate correct timezone offset for "{value}"'
    )

1024 

1025 

def parse_to_aware_datetime(value):
    """Convert ``value`` to a datetime object carrying tzinfo.

    This normalizes all timestamp inputs. Accepted types:

    * A datetime object (both naive and aware)
    * An integer epoch time, or a string of one (e.g. '0' instead of 0);
      epoch times are considered UTC
    * An iso8601 formatted timestamp; a bare date without the time
      component is also accepted

    The returned datetime always has tzinfo. If the input carried no
    timezone information, UTC is assumed — not local time.
    """
    # 1) A datetime.datetime object is used directly.
    if isinstance(value, _DatetimeClass):
        parsed = value
    else:
        # 2) A string formatted as a timestamp. Documented as iso8601,
        # though parse_timestamp is a bit more flexible.
        parsed = parse_timestamp(value)
    if parsed.tzinfo is not None:
        return parsed.astimezone(tzutc())
    # A case could be made for assuming local time here, but assuming
    # UTC preserves backwards-compatible behavior.
    return parsed.replace(tzinfo=tzutc())

1069 

1070 

def datetime2timestamp(dt, default_timezone=None):
    """Calculate the epoch timestamp for a datetime instance.

    :type dt: datetime
    :param dt: A datetime object to be converted into a timestamp
    :type default_timezone: tzinfo
    :param default_timezone: Timezone assumed when ``dt`` is naive;
        ``None`` means UTC. Ignored for aware datetimes.
    :returns: The timestamp as a float of seconds since the epoch
    """
    if dt.tzinfo is None:
        if default_timezone is None:
            default_timezone = tzutc()
        dt = dt.replace(tzinfo=default_timezone)
    epoch = datetime.datetime(1970, 1, 1)
    # Normalize to naive UTC before subtracting the naive epoch.
    delta = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
    return delta.total_seconds()

1088 

1089 

def calculate_sha256(body, as_hex=False):
    """Calculate a sha256 checksum of a file-like object.

    Reads the entire remaining contents of ``body`` in 1 MB chunks. The
    caller is responsible for the stream's starting position and for
    ``seek()``-ing it back afterwards if other consumers need to read it.

    :param body: Any file-like object opened in binary mode such that a
        ``.read()`` call returns bytes.
    :param as_hex: If True, return the hex digest; otherwise return the
        raw binary digest.

    :returns: The sha256 checksum
    """
    digest = hashlib.sha256()
    while True:
        chunk = body.read(1024 * 1024)
        if not chunk:
            break
        digest.update(chunk)
    return digest.hexdigest() if as_hex else digest.digest()

1115 

1116 

def calculate_tree_hash(body):
    """Calculate a tree hash checksum.

    For more information see:

    http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html

    :param body: Any file like object. This has the same constraints as
        the ``body`` param in calculate_sha256

    :rtype: str
    :returns: The hex version of the calculated tree hash
    """
    sha256 = hashlib.sha256
    chunk_size = 1024 * 1024  # 1 MB leaves, per the Glacier spec
    level = []
    while True:
        chunk = body.read(chunk_size)
        if not chunk:
            break
        level.append(sha256(chunk).digest())
    if not level:
        return sha256(b'').hexdigest()
    # Combine adjacent digests pairwise until a single root remains; a
    # lone digest at the end of a level is promoted unchanged.
    while len(level) > 1:
        parents = []
        for i in range(0, len(level), 2):
            if i + 1 < len(level):
                parents.append(sha256(level[i] + level[i + 1]).digest())
            else:
                parents.append(level[i])
        level = parents
    return binascii.hexlify(level[0]).decode('ascii')

1148 

1149 

1150def _in_pairs(iterable): 

1151 # Creates iterator that iterates over the list in pairs: 

1152 # for a, b in _in_pairs([0, 1, 2, 3, 4]): 

1153 # print(a, b) 

1154 # 

1155 # will print: 

1156 # 0, 1 

1157 # 2, 3 

1158 # 4, None 

1159 shared_iter = iter(iterable) 

1160 # Note that zip_longest is a compat import that uses 

1161 # the itertools izip_longest. This creates an iterator, 

1162 # this call below does _not_ immediately create the list 

1163 # of pairs. 

1164 return zip_longest(shared_iter, shared_iter) 

1165 

1166 

class CachedProperty:
    """A read-only property that caches its first computed value.

    The wrapped ``fget`` runs at most once per instance: the result is
    stored in the instance ``__dict__`` under the function's name, so
    subsequent attribute access bypasses this (non-data) descriptor
    entirely.
    """

    def __init__(self, fget):
        self._fget = fget

    def __get__(self, obj, cls):
        # Class-level access returns the descriptor itself.
        if obj is None:
            return self
        value = self._fget(obj)
        # Shadow the descriptor with the computed value on the instance.
        obj.__dict__[self._fget.__name__] = value
        return value

1185 

1186 

class ArgumentGenerator:
    """Generate placeholder input matching a shape model.

    ``generate_skeleton`` takes an input/output shape (created from
    ``botocore.model``) and produces a sample dictionary mirroring that
    shape. The values are placeholders: strings are empty (or the member
    name when ``use_member_names`` is set), numbers are 0 or 0.0, and so
    on — the point is to generate the *shape* of the input structure.

    This is useful for operations with complex input shapes: a user can
    fill in the generated skeleton instead of working out the structure
    of the arguments by hand.

    Example usage::

        s = botocore.session.get_session()
        ddb = s.get_service_model('dynamodb')
        arg_gen = ArgumentGenerator()
        sample_input = arg_gen.generate_skeleton(
            ddb.operation_model('CreateTable').input_shape)
        print("Sample input for dynamodb.CreateTable: %s" % sample_input)

    """

    def __init__(self, use_member_names=False):
        self._use_member_names = use_member_names

    def generate_skeleton(self, shape):
        """Generate a sample input.

        :type shape: ``botocore.model.Shape``
        :param shape: The input shape.

        :return: The generated skeleton input corresponding to the
            provided input shape.
        """
        return self._generate_skeleton(shape, [])

    def _generate_skeleton(self, shape, stack, name=''):
        # ``stack`` tracks shape names along the current recursion path so
        # self-referential structures can be cut off.
        stack.append(shape.name)
        try:
            type_name = shape.type_name
            if type_name == 'structure':
                return self._generate_type_structure(shape, stack)
            if type_name == 'list':
                return self._generate_type_list(shape, stack)
            if type_name == 'map':
                return self._generate_type_map(shape, stack)
            if type_name == 'string':
                if self._use_member_names:
                    return name
                if shape.enum:
                    return random.choice(shape.enum)
                return ''
            if type_name in ('integer', 'long'):
                return 0
            if type_name in ('float', 'double'):
                return 0.0
            if type_name == 'boolean':
                return True
            if type_name == 'timestamp':
                return datetime.datetime(1970, 1, 1, 0, 0, 0)
            # Unknown type names fall through and yield None.
        finally:
            stack.pop()

    def _generate_type_structure(self, shape, stack):
        # Stop recursing once this structure appears twice on the path.
        if stack.count(shape.name) > 1:
            return {}
        return OrderedDict(
            (
                member_name,
                self._generate_skeleton(member_shape, stack, name=member_name),
            )
            for member_name, member_shape in shape.members.items()
        )

    def _generate_type_list(self, shape, stack):
        # A single placeholder element conveys the list's member shape.
        element_name = ''
        if self._use_member_names:
            element_name = shape.member.name
        return [self._generate_skeleton(shape.member, stack, element_name)]

    def _generate_type_map(self, shape, stack):
        assert shape.key.type_name == 'string'
        return OrderedDict(
            [('KeyName', self._generate_skeleton(shape.value, stack))]
        )

1285 

1286 

def is_valid_ipv6_endpoint_url(endpoint_url):
    """Return True if ``endpoint_url``'s host is a literal IPv6 address."""
    # Reject URLs containing unsafe characters before parsing; post
    # bpo-43882, url parsing can strip them and let validation pass.
    if UNSAFE_URL_CHARS.intersection(endpoint_url):
        return False
    bracketed_host = f'[{urlparse(endpoint_url).hostname}]'
    return IPV6_ADDRZ_RE.match(bracketed_host) is not None

1292 

1293 

def is_valid_ipv4_endpoint_url(endpoint_url):
    """Return True if ``endpoint_url``'s host is a literal IPv4 address."""
    host = urlparse(endpoint_url).hostname
    return IPV4_RE.match(host) is not None

1297 

1298 

def is_valid_endpoint_url(endpoint_url):
    """Verify the endpoint_url is valid.

    :type endpoint_url: string
    :param endpoint_url: An endpoint_url. Must have at least a scheme
        and a hostname.

    :return: True if the endpoint url is valid. False otherwise.
    """
    # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing
    # it to pass hostname validation below. Detect them early to fix that.
    if UNSAFE_URL_CHARS.intersection(endpoint_url):
        return False
    hostname = urlsplit(endpoint_url).hostname
    if hostname is None or len(hostname) > 255:
        return False
    # A single trailing dot (fully-qualified form) is permitted.
    if hostname.endswith("."):
        hostname = hostname[:-1]
    # Each dot-separated label: 1-63 alphanumeric/hyphen characters with
    # no leading or trailing hyphen.
    label_pattern = re.compile(
        r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$",
        re.IGNORECASE,
    )
    return label_pattern.match(hostname)

1326 

1327 

def is_valid_uri(endpoint_url):
    """Return a truthy value if the URL is a valid hostname endpoint or a
    valid IPv6 endpoint.

    Note: the return value is whatever the underlying check returns (a
    regex match object, ``True``/``False``, or ``None``), so callers
    should rely on truthiness only.
    """
    return is_valid_endpoint_url(endpoint_url) or is_valid_ipv6_endpoint_url(
        endpoint_url
    )

1332 

1333 

def validate_region_name(region_name):
    """Raise ``InvalidRegionError`` unless the region is a valid host label.

    ``None`` is accepted and skips validation entirely.
    """
    if region_name is None:
        return
    # Host label: up to 63 alphanumeric/hyphen characters, no leading or
    # trailing hyphen, and not purely numeric.
    host_label_re = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$')
    if host_label_re.match(region_name) is None:
        raise InvalidRegionError(region_name=region_name)

1342 

1343 

def check_dns_name(bucket_name):
    """
    Return True if ``bucket_name`` satisfies the restricted DNS naming
    conventions required for virtual-hosting style access.

    Even though "." characters are perfectly valid in this DNS naming
    scheme, any name containing a "." is rejected because dotted buckets
    cause SSL cert validation problems under virtual-hosting style
    addressing.
    """
    if '.' in bucket_name:
        return False
    if not 3 <= len(bucket_name) <= 63:
        # Wrong length
        return False
    match = LABEL_RE.match(bucket_name)
    # The label must cover the entire name, not just a prefix.
    return match is not None and match.end() == len(bucket_name)

1365 

1366 

def fix_s3_host(
    request,
    signature_version,
    region_name,
    default_endpoint_url=None,
    **kwargs,
):
    """Handler that rewrites S3 requests to virtual-hosting style.

    Runs just before signing. When the request path carries a bucket name
    (true for everything except ListAllBuckets) and that name is DNS
    compatible, the request is switched from path-style to virtual-hosting
    style addressing; otherwise the URI is left untouched.
    """
    if request.context.get('use_global_endpoint', False):
        default_endpoint_url = 's3.amazonaws.com'
    try:
        switch_to_virtual_host_style(
            request, signature_version, default_endpoint_url
        )
    except InvalidDNSNameError as e:
        # Non-DNS-compatible buckets simply keep path-style addressing.
        logger.debug(
            'Not changing URI, bucket is not DNS compatible: %s',
            e.kwargs['bucket_name'],
        )

1394 

1395 

def switch_to_virtual_host_style(
    request, signature_version, default_endpoint_url=None, **kwargs
):
    """
    This is a handler to force virtual host style s3 addressing no matter
    the signature version (which is taken in consideration for the default
    case). If the bucket is not DNS compatible an InvalidDNSName is thrown.

    :param request: A AWSRequest object that is about to be sent.
    :param signature_version: The signature version to sign with
    :param default_endpoint_url: The endpoint to use when switching to a
        virtual style. If None is supplied, the virtual host will be
        constructed from the url of the request.
    :raises InvalidDNSNameError: if the bucket name in the path is not
        DNS compatible.
    """
    if request.auth_path is not None:
        # The auth_path has already been applied (this may be a
        # retried request). We don't need to perform this
        # customization again.
        return
    elif _is_get_bucket_location_request(request):
        # For the GetBucketLocation response, we should not be using
        # the virtual host style addressing so we can avoid any sigv4
        # issues.
        logger.debug(
            "Request is GetBucketLocation operation, not checking "
            "for DNS compatibility."
        )
        return
    parts = urlsplit(request.url)
    # Preserve the original (path-style) path for signing purposes.
    request.auth_path = parts.path
    path_parts = parts.path.split('/')

    # Retrieve what the endpoint we will be prepending the bucket name to.
    if default_endpoint_url is None:
        default_endpoint_url = parts.netloc

    if len(path_parts) > 1:
        # path_parts[0] is the empty string before the leading '/'; the
        # bucket name is the first real path segment.
        bucket_name = path_parts[1]
        if not bucket_name:
            # If the bucket name is empty we should not be checking for
            # dns compatibility.
            return
        logger.debug('Checking for DNS compatible bucket for: %s', request.url)
        if check_dns_name(bucket_name):
            # If the operation is on a bucket, the auth_path must be
            # terminated with a '/' character.
            if len(path_parts) == 2:
                if request.auth_path[-1] != '/':
                    request.auth_path += '/'
            path_parts.remove(bucket_name)
            # At the very least the path must be a '/', such as with the
            # CreateBucket operation when DNS style is being used. If this
            # is not used you will get an empty path which is incorrect.
            path = '/'.join(path_parts) or '/'
            global_endpoint = default_endpoint_url
            # Prepend the bucket to the host to form the virtual-host URL.
            host = bucket_name + '.' + global_endpoint
            new_tuple = (parts.scheme, host, path, parts.query, '')
            new_uri = urlunsplit(new_tuple)
            request.url = new_uri
            logger.debug('URI updated to: %s', new_uri)
        else:
            raise InvalidDNSNameError(bucket_name=bucket_name)

1458 

1459 

1460def _is_get_bucket_location_request(request): 

1461 return request.url.endswith('?location') 

1462 

1463 

def instance_cache(func):
    """Method decorator for caching method calls to a single instance.

    **This is not a general purpose caching decorator.**

    The instance *must* provide an ``_instance_cache`` dict attribute.
    The cache is scoped per instance, so multiple instances each keep
    their own results. Note that ``None`` results are never cached and
    will be recomputed on every call.
    """
    func_name = func.__name__

    @functools.wraps(func)
    def _cache_guard(self, *args, **kwargs):
        # kwargs are folded into the key as a sorted tuple so call-order
        # differences don't produce distinct cache entries.
        if kwargs:
            key = (func_name, args, tuple(sorted(kwargs.items())))
        else:
            key = (func_name, args)
        cached = self._instance_cache.get(key)
        if cached is None:
            cached = func(self, *args, **kwargs)
            self._instance_cache[key] = cached
        return cached

    return _cache_guard

1495 

1496 

def lru_cache_weakref(*cache_args, **cache_kwargs):
    """
    Version of functools.lru_cache that stores a weak reference to ``self``.

    Serves the same purpose as :py:func:`instance_cache` but uses Python's
    functools implementation which offers ``max_size`` and ``typed``
    properties.

    A plain lru_cache on a method holds a strong reference to ``self``,
    preventing garbage collection of the instance for the lifetime of the
    cache. This wrapper substitutes a weak reference so the cache does not
    interfere with collection.
    """

    def wrapper(func):
        @functools.lru_cache(*cache_args, **cache_kwargs)
        def cached_with_ref(self_ref, *args, **kwargs):
            # Dereference the weakref before invoking the real method.
            return func(self_ref(), *args, **kwargs)

        @functools.wraps(func)
        def inner(self, *args, **kwargs):
            # Lists are unhashable; convert them to tuples so they can
            # participate in the cache key.
            kwargs = {
                key: tuple(value) if isinstance(value, list) else value
                for key, value in kwargs.items()
            }
            return cached_with_ref(weakref.ref(self), *args, **kwargs)

        inner.cache_info = cached_with_ref.cache_info
        return inner

    return wrapper

1526 

1527 

def switch_host_s3_accelerate(request, operation_name, **kwargs):
    """Switches the current s3 endpoint with an S3 Accelerate endpoint"""

    # When registered, this runs before the host is switched to virtual
    # style, so the bucket name never appears in the netloc here and the
    # Accelerate endpoint can be hard coded.
    netloc_parts = urlsplit(request.url).netloc.split('.')
    retained = [
        part for part in netloc_parts if part in S3_ACCELERATE_WHITELIST
    ]
    endpoint = 'https://s3-accelerate.'
    if retained:
        endpoint += '.'.join(retained) + '.'
    endpoint += 'amazonaws.com'

    # Bucket-level control-plane operations do not go through Accelerate.
    if operation_name in ('ListBuckets', 'CreateBucket', 'DeleteBucket'):
        return
    _switch_hosts(request, endpoint, use_new_scheme=False)

1545 

1546 

def switch_host_with_param(request, param_name):
    """Switches the host using a parameter value from a JSON request body"""
    body = json.loads(request.data.decode('utf-8'))
    new_endpoint = body.get(param_name)
    # Only rewrite the host when the parameter is present and non-empty.
    if new_endpoint:
        _switch_hosts(request, new_endpoint)

1553 

1554 

def _switch_hosts(request, new_endpoint, use_new_scheme=True):
    # Rewrite the request URL in place: path/query come from the original
    # URL, the netloc (and optionally the scheme) from ``new_endpoint``.
    request.url = _get_new_endpoint(
        request.url, new_endpoint, use_new_scheme
    )

1560 

1561 

def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True):
    """Combine ``new_endpoint``'s netloc (and optionally scheme) with the
    original URL's path and query, returning the resulting URL string."""
    new_parts = urlsplit(new_endpoint)
    original_parts = urlsplit(original_endpoint)
    scheme = new_parts.scheme if use_new_scheme else original_parts.scheme
    final_endpoint = urlunsplit(
        (
            scheme,
            new_parts.netloc,
            original_parts.path,
            original_parts.query,
            '',  # fragment is always dropped
        )
    )
    logger.debug(
        'Updating URI from %s to %s', original_endpoint, final_endpoint
    )
    return final_endpoint

1580 

1581 

def deep_merge(base, extra):
    """Deeply merge two dictionaries, overriding existing keys in the base.

    :param base: The base dictionary which will be merged into.
    :param extra: The dictionary to merge into the base. Keys from this
        dictionary will take precedence.
    """
    for key, value in extra.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            # Both sides hold dicts for this key: merge the sub-dicts.
            deep_merge(base[key], value)
        else:
            # Otherwise the extra value simply replaces the base value.
            base[key] = value

1601 

1602 

def hyphenize_service_id(service_id):
    """Translate the form used for event emitters.

    Spaces become hyphens and the result is lowercased.

    :param service_id: The service_id to convert.
    """
    return service_id.lower().replace(' ', '-')

1609 

1610 

class IdentityCache:
    """Base IdentityCache implementation for storing and retrieving
    highly accessed credentials.

    This class is not intended to be instantiated in user code.
    """

    METHOD = "base_identity_cache"

    def __init__(self, client, credential_cls):
        self._client = client
        self._credential_cls = credential_cls

    def get_credentials(self, **kwargs):
        """Build a refreshable credential entry.

        The subclass-provided refresh callback is invoked once for the
        initial metadata and retained for subsequent refreshes.
        """
        callback = self.build_refresh_callback(**kwargs)
        metadata = callback()
        credential_entry = self._credential_cls.create_from_metadata(
            metadata=metadata,
            refresh_using=callback,
            method=self.METHOD,
            advisory_timeout=45,
            mandatory_timeout=10,
        )
        return credential_entry

    # Bug fix: the original definition omitted ``self``, so calling this
    # through an instance raised TypeError instead of the intended
    # NotImplementedError.
    def build_refresh_callback(self, **kwargs):
        """Callback to be implemented by subclasses.

        Returns a set of metadata to be converted into a new
        credential instance.
        """
        raise NotImplementedError()

1643 

1644 

class S3ExpressIdentityCache(IdentityCache):
    """S3Express IdentityCache for retrieving and storing
    credentials from CreateSession calls.

    This class is not intended to be instantiated in user code.
    """

    METHOD = "s3express"

    def __init__(self, client, credential_cls):
        self._client = client
        self._credential_cls = credential_cls

    # NOTE(review): lru_cache on an instance method keys on ``self`` and
    # keeps each cached instance alive for the cache's lifetime (ruff
    # B019). The cache is bounded at 100 entries which limits the impact;
    # confirm this trade-off is intentional before changing it.
    @functools.lru_cache(maxsize=100)
    def get_credentials(self, bucket):
        # Memoized per (instance, bucket) pair; delegates the actual
        # credential construction to IdentityCache.get_credentials.
        return super().get_credentials(bucket=bucket)

    def build_refresh_callback(self, bucket):
        # Returns a zero-argument callable that calls CreateSession for
        # ``bucket`` and maps the response into credential metadata.
        def refresher():
            response = self._client.create_session(Bucket=bucket)
            creds = response['Credentials']
            expiration = self._serialize_if_needed(
                creds['Expiration'], iso=True
            )
            return {
                "access_key": creds['AccessKeyId'],
                "secret_key": creds['SecretAccessKey'],
                "token": creds['SessionToken'],
                "expiry_time": expiration,
            }

        return refresher

    def _serialize_if_needed(self, value, iso=False):
        # Datetime expirations are serialized to a string (ISO 8601 when
        # ``iso`` is set); anything else is passed through untouched.
        if isinstance(value, _DatetimeClass):
            if iso:
                return value.isoformat()
            return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
        return value

1684 

1685 

class S3ExpressIdentityResolver:
    """Wires an S3Express identity cache into the client's signing flow
    via event handlers."""

    def __init__(self, client, credential_cls, cache=None):
        # A weak proxy avoids a reference cycle with the owning client.
        self._client = weakref.proxy(client)

        if cache is None:
            cache = S3ExpressIdentityCache(self._client, credential_cls)
        self._cache = cache

    def register(self, event_emitter=None):
        """Register the handlers on the client's event emitter (or on the
        emitter provided)."""
        logger.debug('Registering S3Express Identity Resolver')
        emitter = event_emitter or self._client.meta.events
        emitter.register('before-call.s3', self.apply_signing_cache_key)
        emitter.register('before-sign.s3', self.resolve_s3express_identity)

    def apply_signing_cache_key(self, params, context, **kwargs):
        # Record the bucket name as the signing cache key so S3Express
        # credentials can be cached per bucket.
        endpoint_properties = context.get('endpoint_properties', {})
        backend = endpoint_properties.get('backend', None)

        # Add cache key if Bucket supplied for s3express request
        bucket_name = context.get('input_params', {}).get('Bucket')
        if backend == 'S3Express' and bucket_name is not None:
            context.setdefault('signing', {})
            context['signing']['cache_key'] = bucket_name

    def resolve_s3express_identity(
        self,
        request,
        signing_name,
        region_name,
        signature_version,
        request_signer,
        operation_name,
        **kwargs,
    ):
        # Attach the identity cache (and a bucket-derived cache key if one
        # was not already set) to the signing context for s3express sigv4.
        signing_context = request.context.get('signing', {})
        # The ``signing_name`` parameter is deliberately shadowed by the
        # value from the request's signing context.
        signing_name = signing_context.get('signing_name')
        if signing_name == 's3express' and signature_version.startswith(
            'v4-s3express'
        ):
            signing_context['identity_cache'] = self._cache
            if 'cache_key' not in signing_context:
                # Fall back to the bucket recorded by the s3 redirect
                # machinery when no explicit cache key was set.
                signing_context['cache_key'] = (
                    request.context.get('s3_redirect', {})
                    .get('params', {})
                    .get('Bucket')
                )

1732 

1733 

1734class S3RegionRedirectorv2: 

1735 """Updated version of S3RegionRedirector for use when 

1736 EndpointRulesetResolver is in use for endpoint resolution. 

1737 

1738 This class is considered private and subject to abrupt breaking changes or 

1739 removal without prior announcement. Please do not use it directly. 

1740 """ 

1741 

    def __init__(self, endpoint_bridge, client, cache=None):
        # ``endpoint_bridge`` is accepted but not stored. ``cache`` maps
        # bucket name -> discovered region (see redirect_from_error); the
        # weak proxy to ``client`` avoids a reference cycle.
        self._cache = cache or {}
        self._client = weakref.proxy(client)

1745 

1746 def register(self, event_emitter=None): 

1747 logger.debug('Registering S3 region redirector handler') 

1748 emitter = event_emitter or self._client.meta.events 

1749 emitter.register('needs-retry.s3', self.redirect_from_error) 

1750 emitter.register( 

1751 'before-parameter-build.s3', self.annotate_request_context 

1752 ) 

1753 emitter.register( 

1754 'before-endpoint-resolution.s3', self.redirect_from_cache 

1755 ) 

1756 

    def redirect_from_error(self, request_dict, response, operation, **kwargs):
        """
        An S3 request sent to the wrong region will return an error that
        contains the endpoint the request should be sent to. This handler
        will add the redirect information to the signing context and then
        redirect the request.

        Returns 0 (retry immediately) when a redirect was applied, or
        ``None`` to leave retry behavior unchanged.
        """
        if response is None:
            # This could be none if there was a ConnectionError or other
            # transport error.
            return

        redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {})
        if ArnParser.is_arn(redirect_ctx.get('bucket')):
            logger.debug(
                'S3 request was previously for an Accesspoint ARN, not '
                'redirecting.'
            )
            return

        if redirect_ctx.get('redirected'):
            # Redirect at most once per request to avoid loops.
            logger.debug(
                'S3 request was previously redirected, not redirecting.'
            )
            return

        error = response[1].get('Error', {})
        error_code = error.get('Code')
        response_metadata = response[1].get('ResponseMetadata', {})

        # We have to account for 400 responses because
        # if we sign a Head* request with the wrong region,
        # we'll get a 400 Bad Request but we won't get a
        # body saying it's an "AuthorizationHeaderMalformed".
        is_special_head_object = (
            error_code in ('301', '400') and operation.name == 'HeadObject'
        )
        is_special_head_bucket = (
            error_code in ('301', '400')
            and operation.name == 'HeadBucket'
            and 'x-amz-bucket-region'
            in response_metadata.get('HTTPHeaders', {})
        )
        is_wrong_signing_region = (
            error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
        )
        is_redirect_status = response[0] is not None and response[
            0
        ].status_code in (301, 302, 307)
        is_permanent_redirect = error_code == 'PermanentRedirect'
        is_opt_in_region_redirect = (
            error_code == 'IllegalLocationConstraintException'
            and operation.name != 'CreateBucket'
        )
        # Only proceed when at least one redirect indicator matched.
        if not any(
            [
                is_special_head_object,
                is_wrong_signing_region,
                is_permanent_redirect,
                is_special_head_bucket,
                is_redirect_status,
                is_opt_in_region_redirect,
            ]
        ):
            return

        bucket = request_dict['context']['s3_redirect']['bucket']
        client_region = request_dict['context'].get('client_region')
        new_region = self.get_bucket_region(bucket, response)

        if new_region is None:
            logger.debug(
                "S3 client configured for region %s but the "
                "bucket %s is not in that region and the proper region "
                "could not be automatically determined.",
                client_region,
                bucket,
            )
            return

        logger.debug(
            "S3 client configured for region %s but the bucket %s "
            "is in region %s; Please configure the proper region to "
            "avoid multiple unnecessary redirects and signing attempts.",
            client_region,
            bucket,
            new_region,
        )
        # Adding the new region to _cache will make construct_endpoint() to
        # use the new region as value for the AWS::Region builtin parameter.
        self._cache[bucket] = new_region

        # Re-resolve endpoint with new region and modify request_dict with
        # the new URL, auth scheme, and signing context.
        ep_resolver = self._client._ruleset_resolver
        ep_info = ep_resolver.construct_endpoint(
            operation_model=operation,
            call_args=request_dict['context']['s3_redirect']['params'],
            request_context=request_dict['context'],
        )
        request_dict['url'] = self.set_request_url(
            request_dict['url'], ep_info.url
        )
        request_dict['context']['s3_redirect']['redirected'] = True
        auth_schemes = ep_info.properties.get('authSchemes')
        if auth_schemes is not None:
            # Merge the re-resolved signing context over the existing one.
            auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes)
            auth_type, signing_context = auth_info
            request_dict['context']['auth_type'] = auth_type
            request_dict['context']['signing'] = {
                **request_dict['context'].get('signing', {}),
                **signing_context,
            }

        # Return 0 so it doesn't wait to retry
        return 0

1873 

1874 def get_bucket_region(self, bucket, response): 

1875 """ 

1876 There are multiple potential sources for the new region to redirect to, 

1877 but they aren't all universally available for use. This will try to 

1878 find region from response elements, but will fall back to calling 

1879 HEAD on the bucket if all else fails. 

1880 

1881 :param bucket: The bucket to find the region for. This is necessary if 

1882 the region is not available in the error response. 

1883 :param response: A response representing a service request that failed 

1884 due to incorrect region configuration. 

1885 """ 

1886 # First try to source the region from the headers. 

1887 service_response = response[1] 

1888 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

1889 if 'x-amz-bucket-region' in response_headers: 

1890 region = response_headers['x-amz-bucket-region'] 

1891 # Next, check the error body 

1892 elif r := service_response.get('Error', {}).get('Region', None): 

1893 region = r 

1894 else: 

1895 # Finally, HEAD the bucket. No other choice sadly. 

1896 try: 

1897 response = self._client.head_bucket(Bucket=bucket) 

1898 headers = response['ResponseMetadata']['HTTPHeaders'] 

1899 except ClientError as e: 

1900 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

1901 region = headers.get('x-amz-bucket-region', None) 

1902 validate_region_name(region) 

1903 return region 

1904 

    def set_request_url(self, old_url, new_endpoint, **kwargs):
        """
        Splice a new endpoint into an existing URL. Note that some endpoints
        from the endpoint provider have a path component which will be
        discarded by this function.
        """
        # Third argument False: keep old_url's scheme rather than adopting
        # the scheme of new_endpoint.
        return _get_new_endpoint(old_url, new_endpoint, False)

1912 

1913 def redirect_from_cache(self, builtins, params, **kwargs): 

1914 """ 

1915 If a bucket name has been redirected before, it is in the cache. This 

1916 handler will update the AWS::Region endpoint resolver builtin param 

1917 to use the region from cache instead of the client region to avoid the 

1918 redirect. 

1919 """ 

1920 bucket = params.get('Bucket') 

1921 if bucket is not None and bucket in self._cache: 

1922 new_region = self._cache.get(bucket) 

1923 builtins['AWS::Region'] = new_region 

1924 

1925 def annotate_request_context(self, params, context, **kwargs): 

1926 """Store the bucket name in context for later use when redirecting. 

1927 The bucket name may be an access point ARN or alias. 

1928 """ 

1929 bucket = params.get('Bucket') 

1930 context['s3_redirect'] = { 

1931 'redirected': False, 

1932 'bucket': bucket, 

1933 'params': params, 

1934 } 

1935 

1936 

class S3RegionRedirector:
    """This handler has been replaced by S3RegionRedirectorv2. The original
    version remains in place for any third-party libraries that import it.
    """

    def __init__(self, endpoint_bridge, client, cache=None):
        # endpoint_bridge resolves the regional S3 endpoint on redirect;
        # cache maps bucket name -> signing context from prior redirects.
        self._endpoint_resolver = endpoint_bridge
        self._cache = cache
        if self._cache is None:
            self._cache = {}

        # This needs to be a weak ref in order to prevent memory leaks on
        # python 2.6
        self._client = weakref.proxy(client)

        warnings.warn(
            'The S3RegionRedirector class has been deprecated for a new '
            'internal replacement. A future version of botocore may remove '
            'this class.',
            category=FutureWarning,
        )

    def register(self, event_emitter=None):
        """Hook the redirect handlers into the (client's) event system."""
        emitter = event_emitter or self._client.meta.events
        emitter.register('needs-retry.s3', self.redirect_from_error)
        emitter.register('before-call.s3', self.set_request_url)
        emitter.register('before-parameter-build.s3', self.redirect_from_cache)

    def redirect_from_error(self, request_dict, response, operation, **kwargs):
        """
        An S3 request sent to the wrong region will return an error that
        contains the endpoint the request should be sent to. This handler
        will add the redirect information to the signing context and then
        redirect the request.
        """
        if response is None:
            # This could be none if there was a ConnectionError or other
            # transport error.
            return

        if self._is_s3_accesspoint(request_dict.get('context', {})):
            logger.debug(
                'S3 request was previously to an accesspoint, not redirecting.'
            )
            return

        if request_dict.get('context', {}).get('s3_redirected'):
            # Only a single redirect is attempted per request.
            logger.debug(
                'S3 request was previously redirected, not redirecting.'
            )
            return

        error = response[1].get('Error', {})
        error_code = error.get('Code')
        response_metadata = response[1].get('ResponseMetadata', {})

        # We have to account for 400 responses because
        # if we sign a Head* request with the wrong region,
        # we'll get a 400 Bad Request but we won't get a
        # body saying it's an "AuthorizationHeaderMalformed".
        is_special_head_object = (
            error_code in ('301', '400') and operation.name == 'HeadObject'
        )
        is_special_head_bucket = (
            error_code in ('301', '400')
            and operation.name == 'HeadBucket'
            and 'x-amz-bucket-region'
            in response_metadata.get('HTTPHeaders', {})
        )
        is_wrong_signing_region = (
            error_code == 'AuthorizationHeaderMalformed' and 'Region' in error
        )
        is_redirect_status = response[0] is not None and response[
            0
        ].status_code in (301, 302, 307)
        is_permanent_redirect = error_code == 'PermanentRedirect'
        if not any(
            [
                is_special_head_object,
                is_wrong_signing_region,
                is_permanent_redirect,
                is_special_head_bucket,
                is_redirect_status,
            ]
        ):
            # None of the redirect signals matched; let normal retry
            # handling take over.
            return

        bucket = request_dict['context']['signing']['bucket']
        client_region = request_dict['context'].get('client_region')
        new_region = self.get_bucket_region(bucket, response)

        if new_region is None:
            logger.debug(
                "S3 client configured for region %s but the bucket %s is not "
                "in that region and the proper region could not be "
                "automatically determined.",
                client_region,
                bucket,
            )
            return

        logger.debug(
            "S3 client configured for region %s but the bucket %s is in region"
            " %s; Please configure the proper region to avoid multiple "
            "unnecessary redirects and signing attempts.",
            client_region,
            bucket,
            new_region,
        )
        endpoint = self._endpoint_resolver.resolve('s3', new_region)
        endpoint = endpoint['endpoint_url']

        signing_context = {
            'region': new_region,
            'bucket': bucket,
            'endpoint': endpoint,
        }
        request_dict['context']['signing'] = signing_context

        # Remember the redirect so future requests for this bucket go
        # straight to the right region/endpoint.
        self._cache[bucket] = signing_context
        self.set_request_url(request_dict, request_dict['context'])

        request_dict['context']['s3_redirected'] = True

        # Return 0 so it doesn't wait to retry
        return 0

    def get_bucket_region(self, bucket, response):
        """
        There are multiple potential sources for the new region to redirect to,
        but they aren't all universally available for use. This will try to
        find region from response elements, but will fall back to calling
        HEAD on the bucket if all else fails.

        :param bucket: The bucket to find the region for. This is necessary if
            the region is not available in the error response.
        :param response: A response representing a service request that failed
            due to incorrect region configuration.
        """
        # First try to source the region from the headers.
        service_response = response[1]
        response_headers = service_response['ResponseMetadata']['HTTPHeaders']
        if 'x-amz-bucket-region' in response_headers:
            return response_headers['x-amz-bucket-region']

        # Next, check the error body
        region = service_response.get('Error', {}).get('Region', None)
        if region is not None:
            return region

        # Finally, HEAD the bucket. No other choice sadly.
        try:
            response = self._client.head_bucket(Bucket=bucket)
            headers = response['ResponseMetadata']['HTTPHeaders']
        except ClientError as e:
            # Even an error response to HeadBucket carries the region header.
            headers = e.response['ResponseMetadata']['HTTPHeaders']

        region = headers.get('x-amz-bucket-region', None)
        return region

    def set_request_url(self, params, context, **kwargs):
        """Rewrite the request URL to the endpoint recorded in the signing
        context, if any.
        """
        endpoint = context.get('signing', {}).get('endpoint', None)
        if endpoint is not None:
            params['url'] = _get_new_endpoint(params['url'], endpoint, False)

    def redirect_from_cache(self, params, context, **kwargs):
        """
        This handler retrieves a given bucket's signing context from the cache
        and adds it into the request context.
        """
        if self._is_s3_accesspoint(context):
            return
        bucket = params.get('Bucket')
        signing_context = self._cache.get(bucket)
        if signing_context is not None:
            context['signing'] = signing_context
        else:
            context['signing'] = {'bucket': bucket}

    def _is_s3_accesspoint(self, context):
        # Access point requests are never redirected (see redirect_from_error).
        return 's3_accesspoint' in context

2118 

2119 

class InvalidArnException(ValueError):
    """Raised by ArnParser.parse_arn when a string is not a well-formed ARN."""

    pass

2122 

2123 

class ArnParser:
    """Splits Amazon Resource Name (ARN) strings into their components."""

    def parse_arn(self, arn):
        """Parse an ARN of the form
        ``arn:partition:service:region:account:resource``.

        :param arn: The ARN string to parse.
        :return: Dict with keys ``partition``, ``service``, ``region``,
            ``account`` and ``resource``. The resource field may itself
            contain colons; only the first five are treated as separators.
        :raises InvalidArnException: If fewer than six colon-delimited
            fields are present.
        """
        fields = arn.split(':', 5)
        if len(fields) < 6:
            raise InvalidArnException(
                f'Provided ARN: {arn} must be of the format: '
                'arn:partition:service:region:account:resource'
            )
        component_names = (
            'partition',
            'service',
            'region',
            'account',
            'resource',
        )
        # fields[0] is the literal "arn" prefix and is discarded.
        return dict(zip(component_names, fields[1:]))

    @staticmethod
    def is_arn(value):
        """Return True if ``value`` is a string that parses as an ARN."""
        if not isinstance(value, str) or not value.startswith('arn:'):
            return False
        try:
            ArnParser().parse_arn(value)
        except InvalidArnException:
            return False
        return True

2150 

2151 

class S3ArnParamHandler:
    """Rewrites S3 ``Bucket`` parameters that are really ARNs.

    When the bucket parameter is an access point or outpost ARN, the ARN
    is parsed, the bucket parameter is replaced with the access point
    name (something must be present for serialization), and the parsed
    details are stored on the request context under ``s3_accesspoint``
    for later endpoint construction.
    """

    _RESOURCE_REGEX = re.compile(
        r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$'
    )
    _OUTPOST_RESOURCE_REGEX = re.compile(
        r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]'
        r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)'
    )
    _BLACKLISTED_OPERATIONS = ['CreateBucket']

    def __init__(self, arn_parser=None):
        # Fall back to the default parser when none is injected.
        if arn_parser is None:
            arn_parser = ArnParser()
        self._arn_parser = arn_parser

    def register(self, event_emitter):
        event_emitter.register('before-parameter-build.s3', self.handle_arn)

    def handle_arn(self, params, model, context, **kwargs):
        """Detect an ARN-valued Bucket parameter and record its details."""
        if model.name in self._BLACKLISTED_OPERATIONS:
            return
        arn_details = self._get_arn_details_from_bucket_param(params)
        if arn_details is None:
            return
        resource_type = arn_details['resource_type']
        if resource_type == 'accesspoint':
            self._store_accesspoint(params, context, arn_details)
        elif resource_type == 'outpost':
            self._store_outpost(params, context, arn_details)

    def _get_arn_details_from_bucket_param(self, params):
        """Return parsed ARN details for the Bucket param, or None if the
        parameter is absent or not a valid ARN."""
        if 'Bucket' not in params:
            return None
        arn = params['Bucket']
        try:
            arn_details = self._arn_parser.parse_arn(arn)
            self._add_resource_type_and_name(arn, arn_details)
        except InvalidArnException:
            # A plain bucket name; nothing to do.
            return None
        return arn_details

    def _add_resource_type_and_name(self, arn, arn_details):
        """Split the resource field into type and name, in place."""
        match = self._RESOURCE_REGEX.match(arn_details['resource'])
        if match is None:
            raise UnsupportedS3ArnError(arn=arn)
        arn_details['resource_type'] = match.group('resource_type')
        arn_details['resource_name'] = match.group('resource_name')

    def _store_accesspoint(self, params, context, arn_details):
        # Access points are not modeled in S3 operations, so the name would
        # fail validation as a real parameter. Instead it stands in for the
        # bucket parameter while the full details ride along on the context
        # for forming the access-point endpoint.
        params['Bucket'] = arn_details['resource_name']
        context['s3_accesspoint'] = {
            'name': arn_details['resource_name'],
            'account': arn_details['account'],
            'partition': arn_details['partition'],
            'region': arn_details['region'],
            'service': arn_details['service'],
        }

    def _store_outpost(self, params, context, arn_details):
        resource_name = arn_details['resource_name']
        match = self._OUTPOST_RESOURCE_REGEX.match(resource_name)
        if match is None:
            raise UnsupportedOutpostResourceError(resource_name=resource_name)
        # The access point name doubles as the bucket parameter, mirroring
        # the handling of plain access point ARNs.
        accesspoint_name = match.group('accesspoint_name')
        params['Bucket'] = accesspoint_name
        context['s3_accesspoint'] = {
            'outpost_name': match.group('outpost_name'),
            'name': accesspoint_name,
            'account': arn_details['account'],
            'partition': arn_details['partition'],
            'region': arn_details['region'],
            'service': arn_details['service'],
        }

2235 

2236 

class S3EndpointSetter:
    """Rewrites S3 request endpoints before signing.

    Handles three concerns: switching to access-point endpoints (including
    outpost and multi-region access points) when the request context
    carries ``s3_accesspoint`` details, switching to the S3 Accelerate
    endpoint when configured, and applying the configured addressing style
    (virtual-host vs. path) otherwise.
    """

    _DEFAULT_PARTITION = 'aws'
    _DEFAULT_DNS_SUFFIX = 'amazonaws.com'

    def __init__(
        self,
        endpoint_resolver,
        region=None,
        s3_config=None,
        endpoint_url=None,
        partition=None,
        use_fips_endpoint=False,
    ):
        # This is calling the endpoint_resolver in regions.py
        self._endpoint_resolver = endpoint_resolver
        self._region = region
        self._s3_config = s3_config
        self._use_fips_endpoint = use_fips_endpoint
        if s3_config is None:
            self._s3_config = {}
        self._endpoint_url = endpoint_url
        self._partition = partition
        if partition is None:
            self._partition = self._DEFAULT_PARTITION

    def register(self, event_emitter):
        """Hook the endpoint/signer handlers into the event system."""
        event_emitter.register('before-sign.s3', self.set_endpoint)
        event_emitter.register('choose-signer.s3', self.set_signer)
        event_emitter.register(
            'before-call.s3.WriteGetObjectResponse',
            self.update_endpoint_to_s3_object_lambda,
        )

    def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs):
        """Point WriteGetObjectResponse at the s3-object-lambda endpoint."""
        if self._use_accelerate_endpoint:
            raise UnsupportedS3ConfigurationError(
                msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations',
            )

        self._override_signing_name(context, 's3-object-lambda')
        if self._endpoint_url:
            # Only update the url if an explicit url was not provided
            return

        resolver = self._endpoint_resolver
        # Constructing endpoints as s3-object-lambda as region
        resolved = resolver.construct_endpoint(
            's3-object-lambda', self._region
        )

        # Ideally we would be able to replace the endpoint before
        # serialization but there's no event to do that currently
        # host_prefix is all the arn/bucket specs
        new_endpoint = 'https://{host_prefix}{hostname}'.format(
            host_prefix=params['host_prefix'],
            hostname=resolved['hostname'],
        )

        params['url'] = _get_new_endpoint(params['url'], new_endpoint, False)

    def set_endpoint(self, request, **kwargs):
        """Rewrite the request URL for access points, accelerate, or the
        configured addressing style."""
        if self._use_accesspoint_endpoint(request):
            self._validate_accesspoint_supported(request)
            self._validate_fips_supported(request)
            self._validate_global_regions(request)
            region_name = self._resolve_region_for_accesspoint_endpoint(
                request
            )
            self._resolve_signing_name_for_accesspoint_endpoint(request)
            self._switch_to_accesspoint_endpoint(request, region_name)
            return
        if self._use_accelerate_endpoint:
            if self._use_fips_endpoint:
                raise UnsupportedS3ConfigurationError(
                    msg=(
                        'Client is configured to use the FIPS psuedo region '
                        f'for "{self._region}", but S3 Accelerate does not have any FIPS '
                        'compatible endpoints.'
                    )
                )
            switch_host_s3_accelerate(request=request, **kwargs)
        if self._s3_addressing_handler:
            self._s3_addressing_handler(request=request, **kwargs)

    def _use_accesspoint_endpoint(self, request):
        # Set by S3ArnParamHandler when the Bucket param was an ARN.
        return 's3_accesspoint' in request.context

    def _validate_fips_supported(self, request):
        if not self._use_fips_endpoint:
            return
        if 'fips' in request.context['s3_accesspoint']['region']:
            # Fixed: msg was previously passed as a set literal
            # ({'...'}), which rendered the message as a set repr. Pass
            # a plain string, matching the identical check in
            # S3ControlEndpointSetter.
            raise UnsupportedS3AccesspointConfigurationError(
                msg='Invalid ARN, FIPS region not allowed in ARN.'
            )
        if 'outpost_name' in request.context['s3_accesspoint']:
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    f'Client is configured to use the FIPS psuedo-region "{self._region}", '
                    'but outpost ARNs do not support FIPS endpoints.'
                )
            )
        # Transforming psuedo region to actual region
        accesspoint_region = request.context['s3_accesspoint']['region']
        if accesspoint_region != self._region:
            if not self._s3_config.get('use_arn_region', True):
                # TODO: Update message to reflect use_arn_region
                # is not set
                raise UnsupportedS3AccesspointConfigurationError(
                    msg=(
                        'Client is configured to use the FIPS psuedo-region '
                        f'for "{self._region}", but the access-point ARN provided is for '
                        f'the "{accesspoint_region}" region. For clients using a FIPS '
                        'psuedo-region calls to access-point ARNs in another '
                        'region are not allowed.'
                    )
                )

    def _validate_global_regions(self, request):
        if self._s3_config.get('use_arn_region', True):
            return
        if self._region in ['aws-global', 's3-external-1']:
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Client is configured to use the global psuedo-region '
                    f'"{self._region}". When providing access-point ARNs a regional '
                    'endpoint must be specified.'
                )
            )

    def _validate_accesspoint_supported(self, request):
        if self._use_accelerate_endpoint:
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Client does not support s3 accelerate configuration '
                    'when an access-point ARN is specified.'
                )
            )
        request_partition = request.context['s3_accesspoint']['partition']
        if request_partition != self._partition:
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    f'Client is configured for "{self._partition}" partition, but access-point'
                    f' ARN provided is for "{request_partition}" partition. The client and '
                    ' access-point partition must be the same.'
                )
            )
        s3_service = request.context['s3_accesspoint'].get('service')
        if s3_service == 's3-object-lambda' and self._s3_config.get(
            'use_dualstack_endpoint'
        ):
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Client does not support s3 dualstack configuration '
                    'when an S3 Object Lambda access point ARN is specified.'
                )
            )
        outpost_name = request.context['s3_accesspoint'].get('outpost_name')
        if outpost_name and self._s3_config.get('use_dualstack_endpoint'):
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Client does not support s3 dualstack configuration '
                    'when an outpost ARN is specified.'
                )
            )
        self._validate_mrap_s3_config(request)

    def _validate_mrap_s3_config(self, request):
        if not is_global_accesspoint(request.context):
            return
        if self._s3_config.get('s3_disable_multiregion_access_points'):
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Invalid configuration, Multi-Region Access Point '
                    'ARNs are disabled.'
                )
            )
        elif self._s3_config.get('use_dualstack_endpoint'):
            raise UnsupportedS3AccesspointConfigurationError(
                msg=(
                    'Client does not support s3 dualstack configuration '
                    'when a Multi-Region Access Point ARN is specified.'
                )
            )

    def _resolve_region_for_accesspoint_endpoint(self, request):
        if is_global_accesspoint(request.context):
            # Requests going to MRAP endpoints MUST be set to any (*) region.
            self._override_signing_region(request, '*')
        elif self._s3_config.get('use_arn_region', True):
            accesspoint_region = request.context['s3_accesspoint']['region']
            # If we are using the region from the access point,
            # we will also want to make sure that we set it as the
            # signing region as well
            self._override_signing_region(request, accesspoint_region)
            return accesspoint_region
        return self._region

    def set_signer(self, context, **kwargs):
        """Select the SigV4A signer for MRAP requests; otherwise defer to
        the default (returns None)."""
        if is_global_accesspoint(context):
            if HAS_CRT:
                return 's3v4a'
            else:
                raise MissingDependencyException(
                    msg="Using S3 with an MRAP arn requires an additional "
                    "dependency. You will need to pip install "
                    "botocore[crt] before proceeding."
                )

    def _resolve_signing_name_for_accesspoint_endpoint(self, request):
        accesspoint_service = request.context['s3_accesspoint']['service']
        self._override_signing_name(request.context, accesspoint_service)

    def _switch_to_accesspoint_endpoint(self, request, region_name):
        original_components = urlsplit(request.url)
        accesspoint_endpoint = urlunsplit(
            (
                original_components.scheme,
                self._get_netloc(request.context, region_name),
                self._get_accesspoint_path(
                    original_components.path, request.context
                ),
                original_components.query,
                '',
            )
        )
        logger.debug(
            'Updating URI from %s to %s', request.url, accesspoint_endpoint
        )
        request.url = accesspoint_endpoint

    def _get_netloc(self, request_context, region_name):
        if is_global_accesspoint(request_context):
            return self._get_mrap_netloc(request_context)
        else:
            return self._get_accesspoint_netloc(request_context, region_name)

    def _get_mrap_netloc(self, request_context):
        s3_accesspoint = request_context['s3_accesspoint']
        region_name = 's3-global'
        mrap_netloc_components = [s3_accesspoint['name']]
        if self._endpoint_url:
            endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
            mrap_netloc_components.append(endpoint_url_netloc)
        else:
            partition = s3_accesspoint['partition']
            mrap_netloc_components.extend(
                [
                    'accesspoint',
                    region_name,
                    self._get_partition_dns_suffix(partition),
                ]
            )
        return '.'.join(mrap_netloc_components)

    def _get_accesspoint_netloc(self, request_context, region_name):
        s3_accesspoint = request_context['s3_accesspoint']
        accesspoint_netloc_components = [
            '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']),
        ]
        outpost_name = s3_accesspoint.get('outpost_name')
        if self._endpoint_url:
            if outpost_name:
                accesspoint_netloc_components.append(outpost_name)
            endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
            accesspoint_netloc_components.append(endpoint_url_netloc)
        else:
            if outpost_name:
                outpost_host = [outpost_name, 's3-outposts']
                accesspoint_netloc_components.extend(outpost_host)
            elif s3_accesspoint['service'] == 's3-object-lambda':
                component = self._inject_fips_if_needed(
                    's3-object-lambda', request_context
                )
                accesspoint_netloc_components.append(component)
            else:
                component = self._inject_fips_if_needed(
                    's3-accesspoint', request_context
                )
                accesspoint_netloc_components.append(component)
            if self._s3_config.get('use_dualstack_endpoint'):
                accesspoint_netloc_components.append('dualstack')
            accesspoint_netloc_components.extend(
                [region_name, self._get_dns_suffix(region_name)]
            )
        return '.'.join(accesspoint_netloc_components)

    def _inject_fips_if_needed(self, component, request_context):
        if self._use_fips_endpoint:
            return f'{component}-fips'
        return component

    def _get_accesspoint_path(self, original_path, request_context):
        # The Bucket parameter was substituted with the access-point name as
        # some value was required in serializing the bucket name. Now that
        # we are making the request directly to the access point, we will
        # want to remove that access-point name from the path.
        name = request_context['s3_accesspoint']['name']
        # All S3 operations require at least a / in their path.
        return original_path.replace('/' + name, '', 1) or '/'

    def _get_partition_dns_suffix(self, partition_name):
        dns_suffix = self._endpoint_resolver.get_partition_dns_suffix(
            partition_name
        )
        if dns_suffix is None:
            dns_suffix = self._DEFAULT_DNS_SUFFIX
        return dns_suffix

    def _get_dns_suffix(self, region_name):
        resolved = self._endpoint_resolver.construct_endpoint(
            's3', region_name
        )
        dns_suffix = self._DEFAULT_DNS_SUFFIX
        if resolved and 'dnsSuffix' in resolved:
            dns_suffix = resolved['dnsSuffix']
        return dns_suffix

    def _override_signing_region(self, request, region_name):
        signing_context = request.context.get('signing', {})
        # S3SigV4Auth will use the context['signing']['region'] value to
        # sign with if present. This is used by the Bucket redirector
        # as well but we should be fine because the redirector is never
        # used in combination with the accesspoint setting logic.
        signing_context['region'] = region_name
        request.context['signing'] = signing_context

    def _override_signing_name(self, context, signing_name):
        signing_context = context.get('signing', {})
        # S3SigV4Auth will use the context['signing']['signing_name'] value to
        # sign with if present. This is used by the Bucket redirector
        # as well but we should be fine because the redirector is never
        # used in combination with the accesspoint setting logic.
        signing_context['signing_name'] = signing_name
        context['signing'] = signing_context

    @CachedProperty
    def _use_accelerate_endpoint(self):
        # Enable accelerate if the configuration is set to to true or the
        # endpoint being used matches one of the accelerate endpoints.

        # Accelerate has been explicitly configured.
        if self._s3_config.get('use_accelerate_endpoint'):
            return True

        # Accelerate mode is turned on automatically if an endpoint url is
        # provided that matches the accelerate scheme.
        if self._endpoint_url is None:
            return False

        # Accelerate is only valid for Amazon endpoints.
        netloc = urlsplit(self._endpoint_url).netloc
        if not netloc.endswith('amazonaws.com'):
            return False

        # The first part of the url should always be s3-accelerate.
        parts = netloc.split('.')
        if parts[0] != 's3-accelerate':
            return False

        # Url parts between 's3-accelerate' and 'amazonaws.com' which
        # represent different url features.
        feature_parts = parts[1:-2]

        # There should be no duplicate url parts.
        if len(feature_parts) != len(set(feature_parts)):
            return False

        # Remaining parts must all be in the whitelist.
        return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts)

    @CachedProperty
    def _addressing_style(self):
        # Use virtual host style addressing if accelerate is enabled or if
        # the given endpoint url is an accelerate endpoint.
        if self._use_accelerate_endpoint:
            return 'virtual'

        # If a particular addressing style is configured, use it.
        # Otherwise this falls through and returns None (auto).
        configured_addressing_style = self._s3_config.get('addressing_style')
        if configured_addressing_style:
            return configured_addressing_style

    @CachedProperty
    def _s3_addressing_handler(self):
        # If virtual host style was configured, use it regardless of whether
        # or not the bucket looks dns compatible.
        if self._addressing_style == 'virtual':
            logger.debug("Using S3 virtual host style addressing.")
            return switch_to_virtual_host_style

        # If path style is configured, no additional steps are needed. If
        # endpoint_url was specified, don't default to virtual. We could
        # potentially default provided endpoint urls to virtual hosted
        # style, but for now it is avoided.
        if self._addressing_style == 'path' or self._endpoint_url is not None:
            logger.debug("Using S3 path style addressing.")
            return None

        logger.debug(
            "Defaulting to S3 virtual host style addressing with "
            "path style addressing fallback."
        )

        # By default, try to use virtual style with path fallback.
        return fix_s3_host

2642 

2643 

class S3ControlEndpointSetter:
    """Rewrites S3 Control request endpoints for ARN/outpost redirection.

    Registered on the ``before-sign.s3-control`` event so the URL and
    signing context can be adjusted after parameter building (which
    stashes ``arn_details``/``outpost_id`` in the request context) but
    before the request is signed.
    """

    # Fallbacks used when no partition or DNS suffix can be resolved.
    _DEFAULT_PARTITION = 'aws'
    _DEFAULT_DNS_SUFFIX = 'amazonaws.com'
    # A legal DNS host label: 1-63 alphanumeric or hyphen characters.
    _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$')

    def __init__(
        self,
        endpoint_resolver,
        region=None,
        s3_config=None,
        endpoint_url=None,
        partition=None,
        use_fips_endpoint=False,
    ):
        self._endpoint_resolver = endpoint_resolver
        self._region = region
        self._s3_config = s3_config
        self._use_fips_endpoint = use_fips_endpoint
        if s3_config is None:
            self._s3_config = {}
        self._endpoint_url = endpoint_url
        self._partition = partition
        if partition is None:
            self._partition = self._DEFAULT_PARTITION

    def register(self, event_emitter):
        """Hook set_endpoint into the client's event system."""
        event_emitter.register('before-sign.s3-control', self.set_endpoint)

    def set_endpoint(self, request, **kwargs):
        """Redirect the request based on ARN details or an outpost id.

        Requests with neither context entry pass through unmodified.
        """
        if self._use_endpoint_from_arn_details(request):
            self._validate_endpoint_from_arn_details_supported(request)
            region_name = self._resolve_region_from_arn_details(request)
            self._resolve_signing_name_from_arn_details(request)
            self._resolve_endpoint_from_arn_details(request, region_name)
            self._add_headers_from_arn_details(request)
        elif self._use_endpoint_from_outpost_id(request):
            self._validate_outpost_redirection_valid(request)
            self._override_signing_name(request, 's3-outposts')
            new_netloc = self._construct_outpost_endpoint(self._region)
            self._update_request_netloc(request, new_netloc)

    def _use_endpoint_from_arn_details(self, request):
        # arn_details is stashed in context by the ARN param handlers.
        return 'arn_details' in request.context

    def _use_endpoint_from_outpost_id(self, request):
        return 'outpost_id' in request.context

    def _validate_endpoint_from_arn_details_supported(self, request):
        """Reject configurations that cannot be redirected via an ARN."""
        # FIPS pseudo-regions may not appear inside an ARN.
        if 'fips' in request.context['arn_details']['region']:
            raise UnsupportedS3ControlArnError(
                arn=request.context['arn_details']['original'],
                msg='Invalid ARN, FIPS region not allowed in ARN.',
            )
        # Unless use_arn_region is enabled, the ARN's region must match
        # the client's configured region.
        if not self._s3_config.get('use_arn_region', False):
            arn_region = request.context['arn_details']['region']
            if arn_region != self._region:
                error_msg = (
                    'The use_arn_region configuration is disabled but '
                    f'received arn for "{arn_region}" when the client is configured '
                    f'to use "{self._region}"'
                )
                raise UnsupportedS3ControlConfigurationError(msg=error_msg)
        # Cross-partition redirection is never allowed.
        request_partion = request.context['arn_details']['partition']
        if request_partion != self._partition:
            raise UnsupportedS3ControlConfigurationError(
                msg=(
                    f'Client is configured for "{self._partition}" partition, but arn '
                    f'provided is for "{request_partion}" partition. The client and '
                    'arn partition must be the same.'
                )
            )
        if self._s3_config.get('use_accelerate_endpoint'):
            raise UnsupportedS3ControlConfigurationError(
                msg='S3 control client does not support accelerate endpoints',
            )
        if 'outpost_name' in request.context['arn_details']:
            self._validate_outpost_redirection_valid(request)

    def _validate_outpost_redirection_valid(self, request):
        # Outposts endpoints have no dualstack variant.
        if self._s3_config.get('use_dualstack_endpoint'):
            raise UnsupportedS3ControlConfigurationError(
                msg=(
                    'Client does not support s3 dualstack configuration '
                    'when an outpost is specified.'
                )
            )

    def _resolve_region_from_arn_details(self, request):
        """Return the region to route to: the ARN's or the client's."""
        if self._s3_config.get('use_arn_region', False):
            arn_region = request.context['arn_details']['region']
            # If we are using the region from the expanded arn, we will also
            # want to make sure that we set it as the signing region as well
            self._override_signing_region(request, arn_region)
            return arn_region
        return self._region

    def _resolve_signing_name_from_arn_details(self, request):
        # The ARN's service (e.g. 's3-outposts') becomes the signing name.
        arn_service = request.context['arn_details']['service']
        self._override_signing_name(request, arn_service)
        return arn_service

    def _resolve_endpoint_from_arn_details(self, request, region_name):
        new_netloc = self._resolve_netloc_from_arn_details(
            request, region_name
        )
        self._update_request_netloc(request, new_netloc)

    def _update_request_netloc(self, request, new_netloc):
        # Swap only the netloc; scheme, path and query are preserved.
        original_components = urlsplit(request.url)
        arn_details_endpoint = urlunsplit(
            (
                original_components.scheme,
                new_netloc,
                original_components.path,
                original_components.query,
                '',
            )
        )
        logger.debug(
            'Updating URI from %s to %s', request.url, arn_details_endpoint
        )
        request.url = arn_details_endpoint

    def _resolve_netloc_from_arn_details(self, request, region_name):
        """Build the replacement host for an ARN-addressed request."""
        arn_details = request.context['arn_details']
        if 'outpost_name' in arn_details:
            return self._construct_outpost_endpoint(region_name)
        account = arn_details['account']
        return self._construct_s3_control_endpoint(region_name, account)

    def _is_valid_host_label(self, label):
        return self._HOST_LABEL_REGEX.match(label)

    def _validate_host_labels(self, *labels):
        # Every component placed into the hostname must be a legal DNS
        # label, otherwise the constructed endpoint would be bogus.
        for label in labels:
            if not self._is_valid_host_label(label):
                raise InvalidHostLabelError(label=label)

    def _construct_s3_control_endpoint(self, region_name, account):
        """Build '<account>.s3-control[.dualstack].<region>.<suffix>'."""
        self._validate_host_labels(region_name, account)
        if self._endpoint_url:
            # A custom endpoint keeps its own host; only the account id
            # is prepended.
            endpoint_url_netloc = urlsplit(self._endpoint_url).netloc
            netloc = [account, endpoint_url_netloc]
        else:
            netloc = [
                account,
                's3-control',
            ]
            self._add_dualstack(netloc)
            dns_suffix = self._get_dns_suffix(region_name)
            netloc.extend([region_name, dns_suffix])
        return self._construct_netloc(netloc)

    def _construct_outpost_endpoint(self, region_name):
        """Build 's3-outposts[-fips].<region>.<suffix>' (or custom host)."""
        self._validate_host_labels(region_name)
        if self._endpoint_url:
            return urlsplit(self._endpoint_url).netloc
        else:
            netloc = [
                's3-outposts',
                region_name,
                self._get_dns_suffix(region_name),
            ]
            self._add_fips(netloc)
            return self._construct_netloc(netloc)

    def _construct_netloc(self, netloc):
        return '.'.join(netloc)

    def _add_fips(self, netloc):
        # FIPS endpoints are addressed as "<service>-fips.<region>...".
        if self._use_fips_endpoint:
            netloc[0] = netloc[0] + '-fips'

    def _add_dualstack(self, netloc):
        if self._s3_config.get('use_dualstack_endpoint'):
            netloc.append('dualstack')

    def _get_dns_suffix(self, region_name):
        # Resolve the partition-specific suffix via the 's3' service
        # metadata; fall back to the standard AWS suffix.
        resolved = self._endpoint_resolver.construct_endpoint(
            's3', region_name
        )
        dns_suffix = self._DEFAULT_DNS_SUFFIX
        if resolved and 'dnsSuffix' in resolved:
            dns_suffix = resolved['dnsSuffix']
        return dns_suffix

    def _override_signing_region(self, request, region_name):
        signing_context = request.context.get('signing', {})
        # S3SigV4Auth will use the context['signing']['region'] value to
        # sign with if present. This is used by the Bucket redirector
        # as well but we should be fine because the redirector is never
        # used in combination with the accesspoint setting logic.
        signing_context['region'] = region_name
        request.context['signing'] = signing_context

    def _override_signing_name(self, request, signing_name):
        signing_context = request.context.get('signing', {})
        # S3SigV4Auth will use the context['signing']['signing_name'] value to
        # sign with if present. This is used by the Bucket redirector
        # as well but we should be fine because the redirector is never
        # used in combination with the accesspoint setting logic.
        signing_context['signing_name'] = signing_name
        request.context['signing'] = signing_context

    def _add_headers_from_arn_details(self, request):
        # Outposts requests must carry the outpost id as a header.
        arn_details = request.context['arn_details']
        outpost_name = arn_details.get('outpost_name')
        if outpost_name:
            self._add_outpost_id_header(request, outpost_name)

    def _add_outpost_id_header(self, request, outpost_name):
        request.headers['x-amz-outpost-id'] = outpost_name

2856 

2857 

class S3ControlArnParamHandler:
    """This handler has been replaced by S3ControlArnParamHandlerv2. The
    original version remains in place for any third-party importers.
    """

    # ARN resources may be delimited with either "/" or ":".
    _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]')

    def __init__(self, arn_parser=None):
        self._arn_parser = arn_parser
        if arn_parser is None:
            self._arn_parser = ArnParser()
        warnings.warn(
            'The S3ControlArnParamHandler class has been deprecated for a new '
            'internal replacement. A future version of botocore may remove '
            'this class.',
            category=FutureWarning,
        )

    def register(self, event_emitter):
        """Hook handle_arn into the parameter-build phase."""
        event_emitter.register(
            'before-parameter-build.s3-control',
            self.handle_arn,
        )

    def handle_arn(self, params, model, context, **kwargs):
        """Expand ARN-valued Name/Bucket parameters into context details."""
        if model.name in ('CreateBucket', 'ListRegionalBuckets'):
            # CreateBucket and ListRegionalBuckets are special cases that do
            # not obey ARN based redirection but will redirect based off of the
            # presence of the OutpostId parameter
            self._handle_outpost_id_param(params, model, context)
        else:
            self._handle_name_param(params, model, context)
            self._handle_bucket_param(params, model, context)

    def _get_arn_details_from_param(self, params, param_name):
        # Returns parsed ARN details, or None when the parameter is
        # absent or not a valid ARN (plain bucket/name values pass
        # through untouched).
        if param_name not in params:
            return None
        try:
            arn = params[param_name]
            arn_details = self._arn_parser.parse_arn(arn)
            arn_details['original'] = arn
            arn_details['resources'] = self._split_resource(arn_details)
            return arn_details
        except InvalidArnException:
            return None

    def _split_resource(self, arn_details):
        return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource'])

    def _override_account_id_param(self, params, arn_details):
        # The ARN's account id becomes the AccountId parameter; an
        # explicitly provided but mismatching AccountId is rejected.
        account_id = arn_details['account']
        if 'AccountId' in params and params['AccountId'] != account_id:
            error_msg = (
                'Account ID in arn does not match the AccountId parameter '
                'provided: "{}"'
            ).format(params['AccountId'])
            raise UnsupportedS3ControlArnError(
                arn=arn_details['original'],
                msg=error_msg,
            )
        params['AccountId'] = account_id

    def _handle_outpost_id_param(self, params, model, context):
        if 'OutpostId' not in params:
            return
        context['outpost_id'] = params['OutpostId']

    def _handle_name_param(self, params, model, context):
        # CreateAccessPoint is a special case that does not expand Name
        if model.name == 'CreateAccessPoint':
            return
        arn_details = self._get_arn_details_from_param(params, 'Name')
        if arn_details is None:
            return
        if self._is_outpost_accesspoint(arn_details):
            self._store_outpost_accesspoint(params, context, arn_details)
        else:
            error_msg = 'The Name parameter does not support the provided ARN'
            raise UnsupportedS3ControlArnError(
                arn=arn_details['original'],
                msg=error_msg,
            )

    def _is_outpost_accesspoint(self, arn_details):
        if arn_details['service'] != 's3-outposts':
            return False
        resources = arn_details['resources']
        if len(resources) != 4:
            return False
        # Resource must be of the form outpost/op-123/accesspoint/name
        return resources[0] == 'outpost' and resources[2] == 'accesspoint'

    def _store_outpost_accesspoint(self, params, context, arn_details):
        # Replace the ARN with the plain accesspoint name and stash the
        # expanded details for the endpoint setter.
        self._override_account_id_param(params, arn_details)
        accesspoint_name = arn_details['resources'][3]
        params['Name'] = accesspoint_name
        arn_details['accesspoint_name'] = accesspoint_name
        arn_details['outpost_name'] = arn_details['resources'][1]
        context['arn_details'] = arn_details

    def _handle_bucket_param(self, params, model, context):
        arn_details = self._get_arn_details_from_param(params, 'Bucket')
        if arn_details is None:
            return
        if self._is_outpost_bucket(arn_details):
            self._store_outpost_bucket(params, context, arn_details)
        else:
            error_msg = (
                'The Bucket parameter does not support the provided ARN'
            )
            raise UnsupportedS3ControlArnError(
                arn=arn_details['original'],
                msg=error_msg,
            )

    def _is_outpost_bucket(self, arn_details):
        if arn_details['service'] != 's3-outposts':
            return False
        resources = arn_details['resources']
        if len(resources) != 4:
            return False
        # Resource must be of the form outpost/op-123/bucket/name
        return resources[0] == 'outpost' and resources[2] == 'bucket'

    def _store_outpost_bucket(self, params, context, arn_details):
        # Replace the ARN with the plain bucket name and stash the
        # expanded details for the endpoint setter.
        self._override_account_id_param(params, arn_details)
        bucket_name = arn_details['resources'][3]
        params['Bucket'] = bucket_name
        arn_details['bucket_name'] = bucket_name
        arn_details['outpost_name'] = arn_details['resources'][1]
        context['arn_details'] = arn_details

2989 

2990 

class S3ControlArnParamHandlerv2(S3ControlArnParamHandler):
    """Updated version of S3ControlArnParamHandler for use when
    EndpointRulesetResolver is in use for endpoint resolution.

    This class is considered private and subject to abrupt breaking changes or
    removal without prior announcement. Please do not use it directly.
    """

    def __init__(self, arn_parser=None):
        # Deliberately does NOT call super().__init__() so the v1
        # deprecation FutureWarning is not emitted for internal use.
        self._arn_parser = arn_parser
        if arn_parser is None:
            self._arn_parser = ArnParser()

    def register(self, event_emitter):
        # Unlike v1, this runs before endpoint resolution so the ruleset
        # resolver sees the expanded parameters.
        event_emitter.register(
            'before-endpoint-resolution.s3-control',
            self.handle_arn,
        )

    def _handle_name_param(self, params, model, context):
        """Validate and expand an ARN-valued Name parameter."""
        # CreateAccessPoint is a special case that does not expand Name
        if model.name == 'CreateAccessPoint':
            return
        arn_details = self._get_arn_details_from_param(params, 'Name')
        if arn_details is None:
            return
        self._raise_for_fips_pseudo_region(arn_details)
        self._raise_for_accelerate_endpoint(context)
        if self._is_outpost_accesspoint(arn_details):
            self._store_outpost_accesspoint(params, context, arn_details)
        else:
            error_msg = 'The Name parameter does not support the provided ARN'
            raise UnsupportedS3ControlArnError(
                arn=arn_details['original'],
                msg=error_msg,
            )

    def _store_outpost_accesspoint(self, params, context, arn_details):
        # Endpoint resolution happens via the ruleset, so only the
        # AccountId needs to be filled in from the ARN here.
        self._override_account_id_param(params, arn_details)

    def _handle_bucket_param(self, params, model, context):
        """Validate and expand an ARN-valued Bucket parameter."""
        arn_details = self._get_arn_details_from_param(params, 'Bucket')
        if arn_details is None:
            return
        self._raise_for_fips_pseudo_region(arn_details)
        self._raise_for_accelerate_endpoint(context)
        if self._is_outpost_bucket(arn_details):
            self._store_outpost_bucket(params, context, arn_details)
        else:
            error_msg = (
                'The Bucket parameter does not support the provided ARN'
            )
            raise UnsupportedS3ControlArnError(
                arn=arn_details['original'],
                msg=error_msg,
            )

    def _store_outpost_bucket(self, params, context, arn_details):
        # Endpoint resolution happens via the ruleset, so only the
        # AccountId needs to be filled in from the ARN here.
        self._override_account_id_param(params, arn_details)

    def _raise_for_fips_pseudo_region(self, arn_details):
        # FIPS pseudo region names cannot be used in ARNs. They appear
        # either as a "fips-" prefix (e.g. "fips-us-gov-west-1") or as a
        # "-fips" suffix (e.g. "us-east-1-fips").
        # BUGFIX: the suffix check previously tested endswith('fips-'),
        # which can never match a suffix-style pseudo region.
        arn_region = arn_details['region']
        if arn_region.startswith('fips-') or arn_region.endswith('-fips'):
            raise UnsupportedS3ControlArnError(
                arn=arn_details['original'],
                msg='Invalid ARN, FIPS region not allowed in ARN.',
            )

    def _raise_for_accelerate_endpoint(self, context):
        # Transfer acceleration has no meaning for S3 Control.
        s3_config = context['client_config'].s3 or {}
        if s3_config.get('use_accelerate_endpoint'):
            raise UnsupportedS3ControlConfigurationError(
                msg='S3 control client does not support accelerate endpoints',
            )

3066 

3067 

class ContainerMetadataFetcher:
    """Fetches JSON credentials/metadata from the ECS/EKS container
    metadata service, with retries on transient failures.
    """

    # Per-request HTTP timeout and retry policy.
    TIMEOUT_SECONDS = 2
    RETRY_ATTEMPTS = 3
    SLEEP_TIME = 1
    # Link-local address of the legacy ECS credentials endpoint.
    IP_ADDRESS = '169.254.170.2'
    # Plain-HTTP metadata may only be fetched from these hosts (or any
    # loopback address); anything else must use HTTPS.
    _ALLOWED_HOSTS = [
        IP_ADDRESS,
        '169.254.170.23',
        'fd00:ec2::23',
        'localhost',
    ]

    def __init__(self, session=None, sleep=time.sleep):
        # session and sleep are injectable for testing.
        if session is None:
            session = botocore.httpsession.URLLib3Session(
                timeout=self.TIMEOUT_SECONDS
            )
        self._session = session
        self._sleep = sleep

    def retrieve_full_uri(self, full_url, headers=None):
        """Retrieve JSON metadata from container metadata.

        :type full_url: str
        :param full_url: The full URL of the metadata service.
            This should include the scheme as well, e.g
            "http://localhost:123/foo"

        """
        self._validate_allowed_url(full_url)
        return self._retrieve_credentials(full_url, headers)

    def _validate_allowed_url(self, full_url):
        """Raise ValueError for plain-HTTP URLs on disallowed hosts."""
        parsed = botocore.compat.urlparse(full_url)

        # HTTPS endpoints are always permitted.
        if parsed.scheme == 'https':
            return
        if self._is_loopback_address(parsed.hostname):
            return
        is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname)
        if not is_whitelisted_host:
            raise ValueError(
                f"Unsupported host '{parsed.hostname}'. Can only retrieve metadata "
                f"from a loopback address or one of these hosts: {', '.join(self._ALLOWED_HOSTS)}"
            )

    def _is_loopback_address(self, hostname):
        # Non-IP hostnames make ip_address() raise ValueError.
        try:
            ip = ip_address(hostname)
            return ip.is_loopback
        except ValueError:
            return False

    def _check_if_whitelisted_host(self, host):
        if host in self._ALLOWED_HOSTS:
            return True
        return False

    def retrieve_uri(self, relative_uri):
        """Retrieve JSON metadata from container metadata.

        :type relative_uri: str
        :param relative_uri: A relative URI, e.g "/foo/bar?id=123"

        :return: The parsed JSON response.

        """
        full_url = self.full_url(relative_uri)
        return self._retrieve_credentials(full_url)

    def _retrieve_credentials(self, full_url, extra_headers=None):
        """GET the URL, retrying up to RETRY_ATTEMPTS times on failure."""
        headers = {'Accept': 'application/json'}
        if extra_headers is not None:
            headers.update(extra_headers)
        attempts = 0
        while True:
            try:
                return self._get_response(
                    full_url, headers, self.TIMEOUT_SECONDS
                )
            except MetadataRetrievalError as e:
                logger.debug(
                    "Received error when attempting to retrieve "
                    "container metadata: %s",
                    e,
                    exc_info=True,
                )
                # Back off briefly, then re-raise once retries run out.
                self._sleep(self.SLEEP_TIME)
                attempts += 1
                if attempts >= self.RETRY_ATTEMPTS:
                    raise

    def _get_response(self, full_url, headers, timeout):
        """Perform one GET; normalize all failures to MetadataRetrievalError."""
        try:
            AWSRequest = botocore.awsrequest.AWSRequest
            request = AWSRequest(method='GET', url=full_url, headers=headers)
            response = self._session.send(request.prepare())
            response_text = response.content.decode('utf-8')
            if response.status_code != 200:
                raise MetadataRetrievalError(
                    error_msg=(
                        f"Received non 200 response {response.status_code} "
                        f"from container metadata: {response_text}"
                    )
                )
            try:
                return json.loads(response_text)
            except ValueError:
                error_msg = "Unable to parse JSON returned from container metadata services"
                logger.debug('%s:%s', error_msg, response_text)
                raise MetadataRetrievalError(error_msg=error_msg)
        except RETRYABLE_HTTP_ERRORS as e:
            # Connection-level failures are retryable via the caller.
            error_msg = (
                "Received error when attempting to retrieve "
                f"container metadata: {e}"
            )
            raise MetadataRetrievalError(error_msg=error_msg)

    def full_url(self, relative_uri):
        # Legacy endpoint is plain HTTP on the link-local address.
        return f'http://{self.IP_ADDRESS}{relative_uri}'

3188 

3189 

def get_environ_proxies(url):
    """Return the system proxy mapping for *url*, or {} when bypassed."""
    return {} if should_bypass_proxies(url) else getproxies()

3195 

3196 

def should_bypass_proxies(url):
    """
    Returns whether we should bypass proxies or not.
    """
    # NOTE: requests allowed for ip/cidr entries in no_proxy env that we don't
    # support current as urllib only checks DNS suffix
    # If the system proxy settings indicate that this URL should be bypassed,
    # don't proxy.
    # proxy_bypass is known to fail in odd ways on some platforms (it was
    # incredibly buggy on OS X in early Python 2.6 releases), so only the
    # specific failure modes we've observed are swallowed here; anything
    # else can reveal legitimate problems and is allowed to propagate.
    try:
        return bool(proxy_bypass(urlparse(url).netloc))
    except (TypeError, socket.gaierror):
        return False

3216 

3217 

def determine_content_length(body):
    """Best-effort computation of a request body's length in bytes.

    Returns 0 for an empty/absent body, ``len(body)`` for sized objects,
    the remaining byte count for seekable streams, and ``None`` when no
    length can be determined (callers fall back to chunked encoding).
    """
    if not body:
        return 0

    # Sized containers (bytes, str, lists, ...) report length directly.
    try:
        return len(body)
    except (AttributeError, TypeError):
        pass

    # Seekable file-like objects: measure the distance to EOF, then
    # restore the original stream position.
    if hasattr(body, 'seek') and hasattr(body, 'tell'):
        try:
            current_pos = body.tell()
            body.seek(0, 2)
            end_pos = body.tell()
            body.seek(current_pos)
            return end_pos - current_pos
        except io.UnsupportedOperation:
            # Some io.BufferedIOBase objects expose seek() but raise
            # UnsupportedOperation when called; treat them as unsized so
            # the transfer falls back to chunked encoding.
            pass

    # Length could not be determined.
    return None

3245 

3246 

def get_encoding_from_headers(headers, default='ISO-8859-1'):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :param default: default encoding if the content-type is text
    """
    content_type = headers.get('content-type')
    if not content_type:
        return None

    # Let the stdlib MIME machinery parse the charset parameter.
    msg = email.message.Message()
    msg['content-type'] = content_type
    charset = msg.get_param("charset")
    if charset is not None:
        return charset

    # Text types without an explicit charset fall back to the default.
    if 'text' in content_type:
        return default

3268 

3269 

def calculate_md5(body, **kwargs):
    """This function has been deprecated, but is kept for backwards compatibility."""
    # In-memory payloads are hashed directly; anything else is assumed
    # to be a file-like object and hashed in chunks.
    if isinstance(body, (bytes, bytearray)):
        digest = _calculate_md5_from_bytes(body)
    else:
        digest = _calculate_md5_from_file(body)
    return base64.b64encode(digest).decode('ascii')

3277 

3278 

def _calculate_md5_from_bytes(body_bytes):
    """This function has been deprecated, but is kept for backwards compatibility."""
    # usedforsecurity=False: MD5 here is an integrity checksum, not a
    # security primitive (keeps FIPS-restricted interpreters working).
    return get_md5(body_bytes, usedforsecurity=False).digest()

3283 

3284 

def _calculate_md5_from_file(fileobj):
    """This function has been deprecated, but is kept for backwards compatibility."""
    # Hash in 1 MiB chunks, then restore the caller's stream position.
    start_position = fileobj.tell()
    md5 = get_md5(usedforsecurity=False)
    while (chunk := fileobj.read(1024 * 1024)) != b'':
        md5.update(chunk)
    fileobj.seek(start_position)
    return md5.digest()

3293 

3294 

3295def _is_s3express_request(params): 

3296 endpoint_properties = params.get('context', {}).get( 

3297 'endpoint_properties', {} 

3298 ) 

3299 return endpoint_properties.get('backend') == 'S3Express' 

3300 

3301 

def get_checksum_header_algorithms(params):
    """
    Returns a list of algorithm names for any headers starting with
    "x-amz-checksum-" provided in a request, otherwise an empty list.

    This function is considered private and subject to abrupt breaking changes or
    removal without prior announcement. Please do not use it directly.
    """
    # Collect the algorithm suffix from every x-amz-checksum-* header.
    return [
        match.group(1)
        for match in map(CHECKSUM_HEADER_PATTERN.match, params['headers'])
        if match
    ]

3320 

3321 

def has_checksum_header(params):
    """
    Checks if a header starting with "x-amz-checksum-" is provided in a request.

    This function is considered private and subject to abrupt breaking changes or
    removal without prior announcement. Please do not use it directly.
    """
    return len(get_checksum_header_algorithms(params)) > 0

3330 

3331 

def conditionally_calculate_checksum(params, **kwargs):
    """This function has been deprecated, but is kept for backwards compatibility."""
    # A pre-supplied checksum header means there is nothing to compute.
    if has_checksum_header(params):
        return
    conditionally_calculate_md5(params, **kwargs)
    conditionally_enable_crc32(params, **kwargs)

3337 

3338 

def conditionally_enable_crc32(params, **kwargs):
    """This function has been deprecated, but is kept for backwards compatibility."""
    checksum_context = params.get('context', {}).get('checksum', {})
    algorithm = checksum_context.get('request_algorithm')
    # S3Express requires a flexible checksum; when the caller hasn't
    # chosen one (or left the conditional-MD5 default), switch a
    # non-empty body over to a CRC32 header.
    needs_crc32 = (
        _is_s3express_request(params)
        and params['body'] is not None
        and algorithm in (None, "conditional-md5")
    )
    if needs_crc32:
        params['context']['checksum'] = {
            'request_algorithm': {
                'algorithm': 'crc32',
                'in': 'header',
                'name': 'x-amz-checksum-crc32',
            }
        }

3355 

3356 

def conditionally_calculate_md5(params, **kwargs):
    """Only add a Content-MD5 if the system supports it.

    This function has been deprecated, but is kept for backwards compatibility.
    """
    body = params['body']
    checksum_context = params.get('context', {}).get('checksum', {})
    algorithm = checksum_context.get('request_algorithm')

    # A flexible checksum will be applied elsewhere; skip MD5.
    if algorithm and algorithm != 'conditional-md5':
        return
    # Don't add a new header if one is already available.
    if has_checksum_header(params):
        return
    # S3Express doesn't support MD5.
    if _is_s3express_request(params):
        return

    if MD5_AVAILABLE and body is not None:
        params['headers']['Content-MD5'] = calculate_md5(body, **kwargs)

3380 

3381 

class FileWebIdentityTokenLoader:
    """Callable that reads a web identity (OIDC) token from a file."""

    def __init__(self, web_identity_token_path, _open=open):
        # _open is injectable for testing.
        self._web_identity_token_path = web_identity_token_path
        self._open = _open

    def __call__(self):
        """Return the full contents of the token file."""
        token_path = self._web_identity_token_path
        with self._open(token_path) as fileobj:
            return fileobj.read()

3390 

3391 

class SSOTokenLoader:
    """Loads and caches SSO access tokens keyed by session name or start URL."""

    def __init__(self, cache=None):
        # Any dict-like object works as the backing cache.
        self._cache = {} if cache is None else cache

    def _generate_cache_key(self, start_url, session_name):
        # A session name, when present, takes precedence over the start
        # URL as the token's identity.
        source = session_name if session_name is not None else start_url
        return hashlib.sha1(source.encode('utf-8')).hexdigest()

    def save_token(self, start_url, token, session_name=None):
        """Store *token* under the key derived from its identity."""
        cache_key = self._generate_cache_key(start_url, session_name)
        self._cache[cache_key] = token

    def __call__(self, start_url, session_name=None):
        """Return the cached token, raising SSOTokenLoadError when it is
        missing or lacks the required fields."""
        cache_key = self._generate_cache_key(start_url, session_name)
        logger.debug('Checking for cached token at: %s', cache_key)
        if cache_key not in self._cache:
            name = session_name if session_name is not None else start_url
            error_msg = f'Token for {name} does not exist'
            raise SSOTokenLoadError(error_msg=error_msg)

        token = self._cache[cache_key]
        if 'accessToken' not in token or 'expiresAt' not in token:
            error_msg = f'Token for {start_url} is invalid'
            raise SSOTokenLoadError(error_msg=error_msg)
        return token

3423 

3424 

class EventbridgeSignerSetter:
    """Resolves multi-region (global) endpoints for EventBridge PutEvents.

    When an ``EndpointId`` parameter is supplied, the request is routed
    to ``<endpoint>.endpoint.events.<dnsSuffix>`` and signed with SigV4a.
    """

    # Fallbacks when the partition/DNS suffix cannot be resolved.
    _DEFAULT_PARTITION = 'aws'
    _DEFAULT_DNS_SUFFIX = 'amazonaws.com'

    def __init__(self, endpoint_resolver, region=None, endpoint_url=None):
        self._endpoint_resolver = endpoint_resolver
        self._region = region
        self._endpoint_url = endpoint_url

    def register(self, event_emitter):
        """Attach the two-phase handlers to the client's event system."""
        event_emitter.register(
            'before-parameter-build.events.PutEvents',
            self.check_for_global_endpoint,
        )
        event_emitter.register(
            'before-call.events.PutEvents', self.set_endpoint_url
        )

    def set_endpoint_url(self, params, context, **kwargs):
        """Apply the endpoint computed by check_for_global_endpoint."""
        if 'eventbridge_endpoint' in context:
            endpoint = context['eventbridge_endpoint']
            logger.debug(
                "Rewriting URL from %s to %s", params['url'], endpoint
            )
            params['url'] = endpoint

    def check_for_global_endpoint(self, params, context, **kwargs):
        """Validate EndpointId usage and stash the resolved endpoint."""
        endpoint = params.get('EndpointId')
        if endpoint is None:
            return

        if len(endpoint) == 0:
            raise InvalidEndpointConfigurationError(
                msg='EndpointId must not be a zero length string'
            )

        # SigV4a signing requires the AWS CRT.
        if not HAS_CRT:
            raise MissingDependencyException(
                msg="Using EndpointId requires an additional "
                "dependency. You will need to pip install "
                "botocore[crt] before proceeding."
            )

        config = context.get('client_config')
        endpoint_variant_tags = None
        if config is not None:
            # FIPS and multi-region endpoints are mutually exclusive.
            if config.use_fips_endpoint:
                raise InvalidEndpointConfigurationError(
                    msg="FIPS is not supported with EventBridge "
                    "multi-region endpoints."
                )
            if config.use_dualstack_endpoint:
                endpoint_variant_tags = ['dualstack']

        if self._endpoint_url is None:
            # Validate endpoint is a valid hostname component
            parts = urlparse(f'https://{endpoint}')
            if parts.hostname != endpoint:
                raise InvalidEndpointConfigurationError(
                    msg='EndpointId is not a valid hostname component.'
                )
            resolved_endpoint = self._get_global_endpoint(
                endpoint, endpoint_variant_tags=endpoint_variant_tags
            )
        else:
            # A user-supplied endpoint_url always wins.
            resolved_endpoint = self._endpoint_url

        context['eventbridge_endpoint'] = resolved_endpoint
        # Multi-region endpoints must be signed with SigV4a.
        context['auth_type'] = 'v4a'

    def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None):
        """Build the global endpoint URL for the client's partition."""
        resolver = self._endpoint_resolver

        partition = resolver.get_partition_for_region(self._region)
        if partition is None:
            partition = self._DEFAULT_PARTITION
        dns_suffix = resolver.get_partition_dns_suffix(
            partition, endpoint_variant_tags=endpoint_variant_tags
        )
        if dns_suffix is None:
            dns_suffix = self._DEFAULT_DNS_SUFFIX

        return f"https://{endpoint}.endpoint.events.{dns_suffix}/"

3508 

3509 

def is_s3_accelerate_url(url):
    """Does the URL match the S3 Accelerate endpoint scheme?

    Virtual host naming style with bucket names in the netloc part of the URL
    are not allowed by this function.
    """
    if url is None:
        return False

    # Accelerate is only valid for Amazon endpoints.
    parts = urlsplit(url)
    if parts.scheme not in ['https', 'http']:
        return False
    if not parts.netloc.endswith('amazonaws.com'):
        return False

    # The hostname must lead with the s3-accelerate label.
    labels = parts.netloc.split('.')
    if labels[0] != 's3-accelerate':
        return False

    # Labels between 's3-accelerate' and 'amazonaws.com' encode endpoint
    # features; duplicates are not allowed.
    features = labels[1:-2]
    if len(features) != len(set(features)):
        return False

    # Every remaining feature label must be recognized.
    return all(feature in S3_ACCELERATE_WHITELIST for feature in features)

3541 

3542 

class JSONFileCache:
    """JSON file cache.

    This provides a dict like interface that stores JSON serializable
    objects.

    The objects are serialized to JSON and stored in a file. These
    values can be retrieved at a later time.
    """

    CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache'))

    def __init__(self, working_dir=CACHE_DIR, dumps_func=None):
        """
        :param working_dir: The directory where cache files are written.
        :param dumps_func: Optional callable used to serialize values to a
            JSON string. Defaults to ``json.dumps`` with datetime support.
        """
        self._working_dir = working_dir
        if dumps_func is None:
            dumps_func = self._default_dumps
        self._dumps = dumps_func

    def _default_dumps(self, obj):
        # Default serializer: delegates datetime handling to
        # _serialize_if_needed; everything else must be JSON serializable.
        return json.dumps(obj, default=self._serialize_if_needed)

    def __contains__(self, cache_key):
        actual_key = self._convert_cache_key(cache_key)
        return os.path.isfile(actual_key)

    def __getitem__(self, cache_key):
        """Retrieve value from a cache key."""
        actual_key = self._convert_cache_key(cache_key)
        try:
            with open(actual_key) as f:
                return json.load(f)
        except (OSError, ValueError):
            # A missing or corrupt cache file is treated as a cache miss.
            raise KeyError(cache_key)

    def __delitem__(self, cache_key):
        actual_key = self._convert_cache_key(cache_key)
        try:
            key_path = Path(actual_key)
            key_path.unlink()
        except FileNotFoundError:
            raise KeyError(cache_key)

    def __setitem__(self, cache_key, value):
        """Atomically store ``value`` in the cache under ``cache_key``.

        :raises ValueError: If ``value`` cannot be serialized to JSON.
        """
        full_key = self._convert_cache_key(cache_key)
        try:
            file_content = self._dumps(value)
        except (TypeError, ValueError):
            raise ValueError(
                f"Value cannot be cached, must be JSON serializable: {value}"
            )
        if not os.path.isdir(self._working_dir):
            os.makedirs(self._working_dir, exist_ok=True)

        # Write to a temp file and atomically rename it into place so that
        # concurrent readers never observe a partially written cache file.
        temp_fd, temp_path = tempfile.mkstemp(
            dir=self._working_dir, suffix='.tmp'
        )
        try:
            with os.fdopen(temp_fd, 'w') as f:
                f.write(file_content)
                f.flush()
                os.fsync(f.fileno())

            os.replace(temp_path, full_key)
        except BaseException:
            # Fix: don't leak the temporary file when the write, fsync, or
            # rename fails (e.g. disk full); best-effort cleanup, then
            # re-raise the original error.
            try:
                os.remove(temp_path)
            except OSError:
                pass
            raise

    def _convert_cache_key(self, cache_key):
        # Cache keys map directly to '<key>.json' files in the working dir.
        full_path = os.path.join(self._working_dir, cache_key + '.json')
        return full_path

    def _serialize_if_needed(self, value, iso=False):
        # json.dumps 'default' hook: render datetimes as strings and pass
        # everything else through (letting json raise TypeError for
        # unserializable objects).
        if isinstance(value, _DatetimeClass):
            if iso:
                return value.isoformat()
            return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
        return value

3614 

3615 

def generate_login_cache_key(sign_in_session_name):
    """Derive a stable cache-file key from a sign-in session name."""
    digest = hashlib.sha256(sign_in_session_name.encode('utf-8'))
    return digest.hexdigest()

3618 

3619 

def is_s3express_bucket(bucket):
    """Return True if ``bucket`` carries the S3 Express ``--x-s3`` suffix."""
    return bucket is not None and bucket.endswith('--x-s3')

3624 

3625 

def get_token_from_environment(signing_name, environ=None):
    """Look up a bearer token for ``signing_name`` in the environment.

    :param signing_name: The service signing name; must be a non-blank
        string, otherwise None is returned.
    :param environ: Optional mapping to read from instead of ``os.environ``.
    :return: The token string, or None when unset/invalid.
    """
    if not isinstance(signing_name, str):
        return None
    if not signing_name.strip():
        return None

    env = os.environ if environ is None else environ
    return env.get(_get_bearer_env_var_name(signing_name))


def _get_bearer_env_var_name(signing_name):
    """Map a signing name to its ``AWS_BEARER_TOKEN_*`` variable name."""
    normalized = signing_name.replace('-', '_').replace(' ', '_').upper()
    return f"AWS_BEARER_TOKEN_{normalized}"

3639 

3640 

# This parameter is not part of the public interface and is subject to abrupt
# breaking changes or removal without prior announcement.
# Mapping of services that have been renamed for backwards compatibility reasons.
# Keys are the previous name that should be allowed, values are the documented
# and preferred client name. E.g. a client created as 'runtime.sagemaker'
# resolves to the 'sagemaker-runtime' service.
SERVICE_NAME_ALIASES = {'runtime.sagemaker': 'sagemaker-runtime'}

3647 

3648 

3649# This parameter is not part of the public interface and is subject to abrupt 

3650# breaking changes or removal without prior announcement. 

3651# Mapping to determine the service ID for services that do not use it as the 

3652# model data directory name. The keys are the data directory name and the 

3653# values are the transformed service IDs (lower case and hyphenated). 

3654CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES = { 

3655 # Actual service name we use -> Allowed computed service name. 

3656 'apigateway': 'api-gateway', 

3657 'application-autoscaling': 'application-auto-scaling', 

3658 'appmesh': 'app-mesh', 

3659 'autoscaling': 'auto-scaling', 

3660 'autoscaling-plans': 'auto-scaling-plans', 

3661 'ce': 'cost-explorer', 

3662 'cloudhsmv2': 'cloudhsm-v2', 

3663 'cloudsearchdomain': 'cloudsearch-domain', 

3664 'cognito-idp': 'cognito-identity-provider', 

3665 'config': 'config-service', 

3666 'cur': 'cost-and-usage-report-service', 

3667 'datapipeline': 'data-pipeline', 

3668 'directconnect': 'direct-connect', 

3669 'devicefarm': 'device-farm', 

3670 'discovery': 'application-discovery-service', 

3671 'dms': 'database-migration-service', 

3672 'ds': 'directory-service', 

3673 'ds-data': 'directory-service-data', 

3674 'dynamodbstreams': 'dynamodb-streams', 

3675 'elasticbeanstalk': 'elastic-beanstalk', 

3676 'elb': 'elastic-load-balancing', 

3677 'elbv2': 'elastic-load-balancing-v2', 

3678 'es': 'elasticsearch-service', 

3679 'events': 'eventbridge', 

3680 'globalaccelerator': 'global-accelerator', 

3681 'iot-data': 'iot-data-plane', 

3682 'iot-jobs-data': 'iot-jobs-data-plane', 

3683 'iotevents-data': 'iot-events-data', 

3684 'iotevents': 'iot-events', 

3685 'iotwireless': 'iot-wireless', 

3686 'kinesisanalytics': 'kinesis-analytics', 

3687 'kinesisanalyticsv2': 'kinesis-analytics-v2', 

3688 'kinesisvideo': 'kinesis-video', 

3689 'lex-models': 'lex-model-building-service', 

3690 'lexv2-models': 'lex-models-v2', 

3691 'lex-runtime': 'lex-runtime-service', 

3692 'lexv2-runtime': 'lex-runtime-v2', 

3693 'logs': 'cloudwatch-logs', 

3694 'machinelearning': 'machine-learning', 

3695 'marketplacecommerceanalytics': 'marketplace-commerce-analytics', 

3696 'marketplace-entitlement': 'marketplace-entitlement-service', 

3697 'meteringmarketplace': 'marketplace-metering', 

3698 'mgh': 'migration-hub', 

3699 'sms-voice': 'pinpoint-sms-voice', 

3700 'resourcegroupstaggingapi': 'resource-groups-tagging-api', 

3701 'route53': 'route-53', 

3702 'route53domains': 'route-53-domains', 

3703 's3control': 's3-control', 

3704 'sdb': 'simpledb', 

3705 'secretsmanager': 'secrets-manager', 

3706 'serverlessrepo': 'serverlessapplicationrepository', 

3707 'servicecatalog': 'service-catalog', 

3708 'servicecatalog-appregistry': 'service-catalog-appregistry', 

3709 'stepfunctions': 'sfn', 

3710 'storagegateway': 'storage-gateway', 

3711} 

3712 

3713 

def get_login_token_cache_directory():
    """Returns which directory contains the login_session token files"""
    # An explicit AWS_LOGIN_CACHE_DIRECTORY overrides the default location;
    # both env vars and '~' in its value are expanded.
    configured = os.environ.get('AWS_LOGIN_CACHE_DIRECTORY')
    if configured is not None:
        return os.path.expanduser(os.path.expandvars(configured))
    default_path = os.path.join('~', '.aws', 'login', 'cache')
    return os.path.expanduser(default_path)

3722 

3723 

class LoginTokenLoader:
    """Loads and saves login access tokens to disk"""

    def __init__(self, cache=None):
        # Default to an in-memory dict; callers may inject a file-backed
        # cache with the same mapping interface.
        self._cache = {} if cache is None else cache

    def save_token(self, session_name, token):
        """Store ``token`` under the hashed key for ``session_name``."""
        self._cache[generate_login_cache_key(session_name)] = token

    def load_token(self, session_name):
        """Return the cached token for ``session_name``, or None if absent."""
        return self._cache.get(generate_login_cache_key(session_name))