Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/utils.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1699 statements  

1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import base64 

14import binascii 

15import datetime 

16import email.message 

17import functools 

18import hashlib 

19import io 

20import logging 

21import os 

22import random 

23import re 

24import socket 

25import time 

26import warnings 

27import weakref 

28from datetime import datetime as _DatetimeClass 

29from ipaddress import ip_address 

30from pathlib import Path 

31from urllib.request import getproxies, proxy_bypass 

32 

33import dateutil.parser 

34from dateutil.tz import tzutc 

35from urllib3.exceptions import LocationParseError 

36 

37import botocore 

38import botocore.awsrequest 

39import botocore.httpsession 

40 

41# IP Regexes retained for backwards compatibility 

42from botocore.compat import ( 

43 HAS_CRT, 

44 HEX_PAT, # noqa: F401 

45 IPV4_PAT, # noqa: F401 

46 IPV4_RE, 

47 IPV6_ADDRZ_PAT, # noqa: F401 

48 IPV6_ADDRZ_RE, 

49 IPV6_PAT, # noqa: F401 

50 LS32_PAT, # noqa: F401 

51 MD5_AVAILABLE, 

52 UNRESERVED_PAT, # noqa: F401 

53 UNSAFE_URL_CHARS, 

54 ZONE_ID_PAT, # noqa: F401 

55 OrderedDict, 

56 get_current_datetime, 

57 get_md5, 

58 get_tzinfo_options, 

59 json, 

60 quote, 

61 urlparse, 

62 urlsplit, 

63 urlunsplit, 

64 zip_longest, 

65) 

66from botocore.exceptions import ( 

67 ClientError, 

68 ConfigNotFound, 

69 ConnectionClosedError, 

70 ConnectTimeoutError, 

71 EndpointConnectionError, 

72 HTTPClientError, 

73 InvalidDNSNameError, 

74 InvalidEndpointConfigurationError, 

75 InvalidExpressionError, 

76 InvalidHostLabelError, 

77 InvalidIMDSEndpointError, 

78 InvalidIMDSEndpointModeError, 

79 InvalidRegionError, 

80 MetadataRetrievalError, 

81 MissingDependencyException, 

82 ReadTimeoutError, 

83 SSOTokenLoadError, 

84 UnsupportedOutpostResourceError, 

85 UnsupportedS3AccesspointConfigurationError, 

86 UnsupportedS3ArnError, 

87 UnsupportedS3ConfigurationError, 

88 UnsupportedS3ControlArnError, 

89 UnsupportedS3ControlConfigurationError, 

90) 

91from botocore.plugin import ( 

92 PluginContext, 

93 reset_plugin_context, 

94 set_plugin_context, 

95) 

96 

97logger = logging.getLogger(__name__) 

98DEFAULT_METADATA_SERVICE_TIMEOUT = 1 

99METADATA_BASE_URL = 'http://169.254.169.254/' 

100METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/' 

101METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6') 

102 

103# These are chars that do not need to be urlencoded. 

104# Based on rfc2986, section 2.3 

105SAFE_CHARS = '-._~' 

106LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]') 

107RETRYABLE_HTTP_ERRORS = ( 

108 ReadTimeoutError, 

109 EndpointConnectionError, 

110 ConnectionClosedError, 

111 ConnectTimeoutError, 

112) 

113S3_ACCELERATE_WHITELIST = ['dualstack'] 

114# In switching events from using service name / endpoint prefix to service 

115# id, we have to preserve compatibility. This maps the instances where either 

116# is different than the transformed service id. 

117EVENT_ALIASES = { 

118 "api.mediatailor": "mediatailor", 

119 "api.pricing": "pricing", 

120 "api.sagemaker": "sagemaker", 

121 "apigateway": "api-gateway", 

122 "application-autoscaling": "application-auto-scaling", 

123 "appstream2": "appstream", 

124 "autoscaling": "auto-scaling", 

125 "autoscaling-plans": "auto-scaling-plans", 

126 "ce": "cost-explorer", 

127 "cloudhsmv2": "cloudhsm-v2", 

128 "cloudsearchdomain": "cloudsearch-domain", 

129 "cognito-idp": "cognito-identity-provider", 

130 "config": "config-service", 

131 "cur": "cost-and-usage-report-service", 

132 "data.iot": "iot-data-plane", 

133 "data.jobs.iot": "iot-jobs-data-plane", 

134 "data.mediastore": "mediastore-data", 

135 "datapipeline": "data-pipeline", 

136 "devicefarm": "device-farm", 

137 "directconnect": "direct-connect", 

138 "discovery": "application-discovery-service", 

139 "dms": "database-migration-service", 

140 "ds": "directory-service", 

141 "dynamodbstreams": "dynamodb-streams", 

142 "elasticbeanstalk": "elastic-beanstalk", 

143 "elasticfilesystem": "efs", 

144 "elasticloadbalancing": "elastic-load-balancing", 

145 "elasticmapreduce": "emr", 

146 "elastictranscoder": "elastic-transcoder", 

147 "elb": "elastic-load-balancing", 

148 "elbv2": "elastic-load-balancing-v2", 

149 "email": "ses", 

150 "entitlement.marketplace": "marketplace-entitlement-service", 

151 "es": "elasticsearch-service", 

152 "events": "eventbridge", 

153 "cloudwatch-events": "eventbridge", 

154 "iot-data": "iot-data-plane", 

155 "iot-jobs-data": "iot-jobs-data-plane", 

156 "kinesisanalytics": "kinesis-analytics", 

157 "kinesisvideo": "kinesis-video", 

158 "lex-models": "lex-model-building-service", 

159 "lex-runtime": "lex-runtime-service", 

160 "logs": "cloudwatch-logs", 

161 "machinelearning": "machine-learning", 

162 "marketplace-entitlement": "marketplace-entitlement-service", 

163 "marketplacecommerceanalytics": "marketplace-commerce-analytics", 

164 "metering.marketplace": "marketplace-metering", 

165 "meteringmarketplace": "marketplace-metering", 

166 "mgh": "migration-hub", 

167 "models.lex": "lex-model-building-service", 

168 "monitoring": "cloudwatch", 

169 "mturk-requester": "mturk", 

170 "opsworks-cm": "opsworkscm", 

171 "resourcegroupstaggingapi": "resource-groups-tagging-api", 

172 "route53": "route-53", 

173 "route53domains": "route-53-domains", 

174 "runtime.lex": "lex-runtime-service", 

175 "runtime.sagemaker": "sagemaker-runtime", 

176 "sdb": "simpledb", 

177 "secretsmanager": "secrets-manager", 

178 "serverlessrepo": "serverlessapplicationrepository", 

179 "servicecatalog": "service-catalog", 

180 "states": "sfn", 

181 "stepfunctions": "sfn", 

182 "storagegateway": "storage-gateway", 

183 "streams.dynamodb": "dynamodb-streams", 

184 "tagging": "resource-groups-tagging-api", 

185} 

186 

187 

188# This pattern can be used to detect if a header is a flexible checksum header 

189CHECKSUM_HEADER_PATTERN = re.compile( 

190 r'^X-Amz-Checksum-([a-z0-9]*)$', 

191 flags=re.IGNORECASE, 

192) 

193 

194PRIORITY_ORDERED_SUPPORTED_PROTOCOLS = ( 

195 'json', 

196 'rest-json', 

197 'rest-xml', 

198 'smithy-rpc-v2-cbor', 

199 'query', 

200 'ec2', 

201) 

202 

203 

204def ensure_boolean(val): 

205 """Ensures a boolean value if a string or boolean is provided 

206 

207 For strings, the value for True/False is case insensitive 

208 """ 

209 if isinstance(val, bool): 

210 return val 

211 elif isinstance(val, str): 

212 return val.lower() == 'true' 

213 else: 

214 return False 

215 

216 

217def resolve_imds_endpoint_mode(session): 

218 """Resolving IMDS endpoint mode to either IPv6 or IPv4. 

219 

220 ec2_metadata_service_endpoint_mode takes precedence over imds_use_ipv6. 

221 """ 

222 endpoint_mode = session.get_config_variable( 

223 'ec2_metadata_service_endpoint_mode' 

224 ) 

225 if endpoint_mode is not None: 

226 lendpoint_mode = endpoint_mode.lower() 

227 if lendpoint_mode not in METADATA_ENDPOINT_MODES: 

228 error_msg_kwargs = { 

229 'mode': endpoint_mode, 

230 'valid_modes': METADATA_ENDPOINT_MODES, 

231 } 

232 raise InvalidIMDSEndpointModeError(**error_msg_kwargs) 

233 return lendpoint_mode 

234 elif session.get_config_variable('imds_use_ipv6'): 

235 return 'ipv6' 

236 return 'ipv4' 

237 

238 

239def is_json_value_header(shape): 

240 """Determines if the provided shape is the special header type jsonvalue. 

241 

242 :type shape: botocore.shape 

243 :param shape: Shape to be inspected for the jsonvalue trait. 

244 

245 :return: True if this type is a jsonvalue, False otherwise 

246 :rtype: Bool 

247 """ 

248 return ( 

249 hasattr(shape, 'serialization') 

250 and shape.serialization.get('jsonvalue', False) 

251 and shape.serialization.get('location') == 'header' 

252 and shape.type_name == 'string' 

253 ) 

254 

255 

256def has_header(header_name, headers): 

257 """Case-insensitive check for header key.""" 

258 if header_name is None: 

259 return False 

260 elif isinstance(headers, botocore.awsrequest.HeadersDict): 

261 return header_name in headers 

262 else: 

263 return header_name.lower() in [key.lower() for key in headers.keys()] 

264 

265 

266def get_service_module_name(service_model): 

267 """Returns the module name for a service 

268 

269 This is the value used in both the documentation and client class name 

270 """ 

271 name = service_model.metadata.get( 

272 'serviceAbbreviation', 

273 service_model.metadata.get( 

274 'serviceFullName', service_model.service_name 

275 ), 

276 ) 

277 name = name.replace('Amazon', '') 

278 name = name.replace('AWS', '') 

279 name = re.sub(r'\W+', '', name) 

280 return name 

281 

282 

283def normalize_url_path(path): 

284 if not path: 

285 return '/' 

286 return remove_dot_segments(path) 

287 

288 

289def normalize_boolean(val): 

290 """Returns None if val is None, otherwise ensure value 

291 converted to boolean""" 

292 if val is None: 

293 return val 

294 else: 

295 return ensure_boolean(val) 

296 

297 

298def remove_dot_segments(url): 

299 # RFC 3986, section 5.2.4 "Remove Dot Segments" 

300 # Also, AWS services require consecutive slashes to be removed, 

301 # so that's done here as well 

302 if not url: 

303 return '' 

304 input_url = url.split('/') 

305 output_list = [] 

306 for x in input_url: 

307 if x and x != '.': 

308 if x == '..': 

309 if output_list: 

310 output_list.pop() 

311 else: 

312 output_list.append(x) 

313 

314 if url[0] == '/': 

315 first = '/' 

316 else: 

317 first = '' 

318 if url[-1] == '/' and output_list: 

319 last = '/' 

320 else: 

321 last = '' 

322 return first + '/'.join(output_list) + last 

323 

324 

325def validate_jmespath_for_set(expression): 

326 # Validates a limited jmespath expression to determine if we can set a 

327 # value based on it. Only works with dotted paths. 

328 if not expression or expression == '.': 

329 raise InvalidExpressionError(expression=expression) 

330 

331 for invalid in ['[', ']', '*']: 

332 if invalid in expression: 

333 raise InvalidExpressionError(expression=expression) 

334 

335 

336def set_value_from_jmespath(source, expression, value, is_first=True): 

337 # This takes a (limited) jmespath-like expression & can set a value based 

338 # on it. 

339 # Limitations: 

340 # * Only handles dotted lookups 

341 # * No offsets/wildcards/slices/etc. 

342 if is_first: 

343 validate_jmespath_for_set(expression) 

344 

345 bits = expression.split('.', 1) 

346 current_key, remainder = bits[0], bits[1] if len(bits) > 1 else '' 

347 

348 if not current_key: 

349 raise InvalidExpressionError(expression=expression) 

350 

351 if remainder: 

352 if current_key not in source: 

353 # We've got something in the expression that's not present in the 

354 # source (new key). If there's any more bits, we'll set the key 

355 # with an empty dictionary. 

356 source[current_key] = {} 

357 

358 return set_value_from_jmespath( 

359 source[current_key], remainder, value, is_first=False 

360 ) 

361 

362 # If we're down to a single key, set it. 

363 source[current_key] = value 

364 

365 

366def is_global_accesspoint(context): 

367 """Determine if request is intended for an MRAP accesspoint.""" 

368 s3_accesspoint = context.get('s3_accesspoint', {}) 

369 is_global = s3_accesspoint.get('region') == '' 

370 return is_global 

371 

372 

373def create_nested_client(session, service_name, **kwargs): 

374 # If a client is created from within a plugin based on the environment variable, 

375 # an infinite loop could arise. Any clients created from within another client 

376 # must use this method to prevent infinite loops. 

377 ctx = PluginContext(plugins="DISABLED") 

378 token = set_plugin_context(ctx) 

379 try: 

380 return session.create_client(service_name, **kwargs) 

381 finally: 

382 reset_plugin_context(token) 

383 

384 

385class _RetriesExceededError(Exception): 

386 """Internal exception used when the number of retries are exceeded.""" 

387 

388 pass 

389 

390 

391class BadIMDSRequestError(Exception): 

392 def __init__(self, request): 

393 self.request = request 

394 

395 

396class IMDSFetcher: 

397 _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError 

398 _TOKEN_PATH = 'latest/api/token' 

399 _TOKEN_TTL = '21600' 

400 

401 def __init__( 

402 self, 

403 timeout=DEFAULT_METADATA_SERVICE_TIMEOUT, 

404 num_attempts=1, 

405 base_url=METADATA_BASE_URL, 

406 env=None, 

407 user_agent=None, 

408 config=None, 

409 ): 

410 self._timeout = timeout 

411 self._num_attempts = num_attempts 

412 if config is None: 

413 config = {} 

414 self._base_url = self._select_base_url(base_url, config) 

415 self._config = config 

416 

417 if env is None: 

418 env = os.environ.copy() 

419 self._disabled = ( 

420 env.get('AWS_EC2_METADATA_DISABLED', 'false').lower() == 'true' 

421 ) 

422 self._imds_v1_disabled = config.get('ec2_metadata_v1_disabled') 

423 self._user_agent = user_agent 

424 self._session = botocore.httpsession.URLLib3Session( 

425 timeout=self._timeout, 

426 proxies=get_environ_proxies(self._base_url), 

427 ) 

428 

429 def get_base_url(self): 

430 return self._base_url 

431 

432 def _select_base_url(self, base_url, config): 

433 if config is None: 

434 config = {} 

435 

436 requires_ipv6 = ( 

437 config.get('ec2_metadata_service_endpoint_mode') == 'ipv6' 

438 ) 

439 custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint') 

440 

441 if requires_ipv6 and custom_metadata_endpoint: 

442 logger.warning( 

443 "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint." 

444 ) 

445 

446 chosen_base_url = None 

447 

448 if base_url != METADATA_BASE_URL: 

449 chosen_base_url = base_url 

450 elif custom_metadata_endpoint: 

451 chosen_base_url = custom_metadata_endpoint 

452 elif requires_ipv6: 

453 chosen_base_url = METADATA_BASE_URL_IPv6 

454 else: 

455 chosen_base_url = METADATA_BASE_URL 

456 

457 logger.debug(f"IMDS ENDPOINT: {chosen_base_url}") 

458 if not is_valid_uri(chosen_base_url): 

459 raise InvalidIMDSEndpointError(endpoint=chosen_base_url) 

460 

461 return chosen_base_url 

462 

463 def _construct_url(self, path): 

464 sep = '' 

465 if self._base_url and not self._base_url.endswith('/'): 

466 sep = '/' 

467 return f'{self._base_url}{sep}{path}' 

468 

469 def _fetch_metadata_token(self): 

470 self._assert_enabled() 

471 url = self._construct_url(self._TOKEN_PATH) 

472 headers = { 

473 'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL, 

474 } 

475 self._add_user_agent(headers) 

476 request = botocore.awsrequest.AWSRequest( 

477 method='PUT', url=url, headers=headers 

478 ) 

479 for i in range(self._num_attempts): 

480 try: 

481 response = self._session.send(request.prepare()) 

482 if response.status_code == 200: 

483 return response.text 

484 elif response.status_code in (404, 403, 405): 

485 return None 

486 elif response.status_code in (400,): 

487 raise BadIMDSRequestError(request) 

488 except ReadTimeoutError: 

489 return None 

490 except RETRYABLE_HTTP_ERRORS as e: 

491 logger.debug( 

492 "Caught retryable HTTP exception while making metadata " 

493 "service request to %s: %s", 

494 url, 

495 e, 

496 exc_info=True, 

497 ) 

498 except HTTPClientError as e: 

499 if isinstance(e.kwargs.get('error'), LocationParseError): 

500 raise InvalidIMDSEndpointError(endpoint=url, error=e) 

501 else: 

502 raise 

503 return None 

504 

505 def _get_request(self, url_path, retry_func, token=None): 

506 """Make a get request to the Instance Metadata Service. 

507 

508 :type url_path: str 

509 :param url_path: The path component of the URL to make a get request. 

510 This arg is appended to the base_url that was provided in the 

511 initializer. 

512 

513 :type retry_func: callable 

514 :param retry_func: A function that takes the response as an argument 

515 and determines if it needs to retry. By default empty and non 

516 200 OK responses are retried. 

517 

518 :type token: str 

519 :param token: Metadata token to send along with GET requests to IMDS. 

520 """ 

521 self._assert_enabled() 

522 if not token: 

523 self._assert_v1_enabled() 

524 if retry_func is None: 

525 retry_func = self._default_retry 

526 url = self._construct_url(url_path) 

527 headers = {} 

528 if token is not None: 

529 headers['x-aws-ec2-metadata-token'] = token 

530 self._add_user_agent(headers) 

531 for i in range(self._num_attempts): 

532 try: 

533 request = botocore.awsrequest.AWSRequest( 

534 method='GET', url=url, headers=headers 

535 ) 

536 response = self._session.send(request.prepare()) 

537 if not retry_func(response): 

538 return response 

539 except RETRYABLE_HTTP_ERRORS as e: 

540 logger.debug( 

541 "Caught retryable HTTP exception while making metadata " 

542 "service request to %s: %s", 

543 url, 

544 e, 

545 exc_info=True, 

546 ) 

547 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

548 

549 def _add_user_agent(self, headers): 

550 if self._user_agent is not None: 

551 headers['User-Agent'] = self._user_agent 

552 

553 def _assert_enabled(self): 

554 if self._disabled: 

555 logger.debug("Access to EC2 metadata has been disabled.") 

556 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

557 

558 def _assert_v1_enabled(self): 

559 if self._imds_v1_disabled: 

560 raise MetadataRetrievalError( 

561 error_msg="Unable to retrieve token for use in IMDSv2 call and IMDSv1 has been disabled" 

562 ) 

563 

564 def _default_retry(self, response): 

565 return self._is_non_ok_response(response) or self._is_empty(response) 

566 

567 def _is_non_ok_response(self, response): 

568 if response.status_code != 200: 

569 self._log_imds_response(response, 'non-200', log_body=True) 

570 return True 

571 return False 

572 

573 def _is_empty(self, response): 

574 if not response.content: 

575 self._log_imds_response(response, 'no body', log_body=True) 

576 return True 

577 return False 

578 

579 def _log_imds_response(self, response, reason_to_log, log_body=False): 

580 statement = ( 

581 "Metadata service returned %s response " 

582 "with status code of %s for url: %s" 

583 ) 

584 logger_args = [reason_to_log, response.status_code, response.url] 

585 if log_body: 

586 statement += ", content body: %s" 

587 logger_args.append(response.content) 

588 logger.debug(statement, *logger_args) 

589 

590 

591class InstanceMetadataFetcher(IMDSFetcher): 

592 _URL_PATH = 'latest/meta-data/iam/security-credentials/' 

593 _REQUIRED_CREDENTIAL_FIELDS = [ 

594 'AccessKeyId', 

595 'SecretAccessKey', 

596 'Token', 

597 'Expiration', 

598 ] 

599 

600 def retrieve_iam_role_credentials(self): 

601 try: 

602 token = self._fetch_metadata_token() 

603 role_name = self._get_iam_role(token) 

604 credentials = self._get_credentials(role_name, token) 

605 if self._contains_all_credential_fields(credentials): 

606 credentials = { 

607 'role_name': role_name, 

608 'access_key': credentials['AccessKeyId'], 

609 'secret_key': credentials['SecretAccessKey'], 

610 'token': credentials['Token'], 

611 'expiry_time': credentials['Expiration'], 

612 } 

613 self._evaluate_expiration(credentials) 

614 return credentials 

615 else: 

616 # IMDS can return a 200 response that has a JSON formatted 

617 # error message (i.e. if ec2 is not trusted entity for the 

618 # attached role). We do not necessarily want to retry for 

619 # these and we also do not necessarily want to raise a key 

620 # error. So at least log the problematic response and return 

621 # an empty dictionary to signal that it was not able to 

622 # retrieve credentials. These error will contain both a 

623 # Code and Message key. 

624 if 'Code' in credentials and 'Message' in credentials: 

625 logger.debug( 

626 'Error response received when retrieving' 

627 'credentials: %s.', 

628 credentials, 

629 ) 

630 return {} 

631 except self._RETRIES_EXCEEDED_ERROR_CLS: 

632 logger.debug( 

633 "Max number of attempts exceeded (%s) when " 

634 "attempting to retrieve data from metadata service.", 

635 self._num_attempts, 

636 ) 

637 except BadIMDSRequestError as e: 

638 logger.debug("Bad IMDS request: %s", e.request) 

639 return {} 

640 

641 def _get_iam_role(self, token=None): 

642 return self._get_request( 

643 url_path=self._URL_PATH, 

644 retry_func=self._needs_retry_for_role_name, 

645 token=token, 

646 ).text 

647 

648 def _get_credentials(self, role_name, token=None): 

649 r = self._get_request( 

650 url_path=self._URL_PATH + role_name, 

651 retry_func=self._needs_retry_for_credentials, 

652 token=token, 

653 ) 

654 return json.loads(r.text) 

655 

656 def _is_invalid_json(self, response): 

657 try: 

658 json.loads(response.text) 

659 return False 

660 except ValueError: 

661 self._log_imds_response(response, 'invalid json') 

662 return True 

663 

664 def _needs_retry_for_role_name(self, response): 

665 return self._is_non_ok_response(response) or self._is_empty(response) 

666 

667 def _needs_retry_for_credentials(self, response): 

668 return ( 

669 self._is_non_ok_response(response) 

670 or self._is_empty(response) 

671 or self._is_invalid_json(response) 

672 ) 

673 

674 def _contains_all_credential_fields(self, credentials): 

675 for field in self._REQUIRED_CREDENTIAL_FIELDS: 

676 if field not in credentials: 

677 logger.debug( 

678 'Retrieved credentials is missing required field: %s', 

679 field, 

680 ) 

681 return False 

682 return True 

683 

684 def _evaluate_expiration(self, credentials): 

685 expiration = credentials.get("expiry_time") 

686 if expiration is None: 

687 return 

688 try: 

689 expiration = datetime.datetime.strptime( 

690 expiration, "%Y-%m-%dT%H:%M:%SZ" 

691 ) 

692 refresh_interval = self._config.get( 

693 "ec2_credential_refresh_window", 60 * 10 

694 ) 

695 jitter = random.randint(120, 600) # Between 2 to 10 minutes 

696 refresh_interval_with_jitter = refresh_interval + jitter 

697 current_time = get_current_datetime() 

698 refresh_offset = datetime.timedelta( 

699 seconds=refresh_interval_with_jitter 

700 ) 

701 extension_time = expiration - refresh_offset 

702 if current_time >= extension_time: 

703 new_time = current_time + refresh_offset 

704 credentials["expiry_time"] = new_time.strftime( 

705 "%Y-%m-%dT%H:%M:%SZ" 

706 ) 

707 logger.info( 

708 f"Attempting credential expiration extension due to a " 

709 f"credential service availability issue. A refresh of " 

710 f"these credentials will be attempted again within " 

711 f"the next {refresh_interval_with_jitter / 60:.0f} minutes." 

712 ) 

713 except ValueError: 

714 logger.debug( 

715 f"Unable to parse expiry_time in {credentials['expiry_time']}" 

716 ) 

717 

718 

719class IMDSRegionProvider: 

720 def __init__(self, session, environ=None, fetcher=None): 

721 """Initialize IMDSRegionProvider. 

722 :type session: :class:`botocore.session.Session` 

723 :param session: The session is needed to look up configuration for 

724 how to contact the instance metadata service. Specifically the 

725 whether or not it should use the IMDS region at all, and if so how 

726 to configure the timeout and number of attempts to reach the 

727 service. 

728 :type environ: None or dict 

729 :param environ: A dictionary of environment variables to use. If 

730 ``None`` is the argument then ``os.environ`` will be used by 

731 default. 

732 :type fecther: :class:`botocore.utils.InstanceMetadataRegionFetcher` 

733 :param fetcher: The class to actually handle the fetching of the region 

734 from the IMDS. If not provided a default one will be created. 

735 """ 

736 self._session = session 

737 if environ is None: 

738 environ = os.environ 

739 self._environ = environ 

740 self._fetcher = fetcher 

741 

742 def provide(self): 

743 """Provide the region value from IMDS.""" 

744 instance_region = self._get_instance_metadata_region() 

745 return instance_region 

746 

747 def _get_instance_metadata_region(self): 

748 fetcher = self._get_fetcher() 

749 region = fetcher.retrieve_region() 

750 return region 

751 

752 def _get_fetcher(self): 

753 if self._fetcher is None: 

754 self._fetcher = self._create_fetcher() 

755 return self._fetcher 

756 

757 def _create_fetcher(self): 

758 metadata_timeout = self._session.get_config_variable( 

759 'metadata_service_timeout' 

760 ) 

761 metadata_num_attempts = self._session.get_config_variable( 

762 'metadata_service_num_attempts' 

763 ) 

764 imds_config = { 

765 'ec2_metadata_service_endpoint': self._session.get_config_variable( 

766 'ec2_metadata_service_endpoint' 

767 ), 

768 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode( 

769 self._session 

770 ), 

771 'ec2_metadata_v1_disabled': self._session.get_config_variable( 

772 'ec2_metadata_v1_disabled' 

773 ), 

774 } 

775 fetcher = InstanceMetadataRegionFetcher( 

776 timeout=metadata_timeout, 

777 num_attempts=metadata_num_attempts, 

778 env=self._environ, 

779 user_agent=self._session.user_agent(), 

780 config=imds_config, 

781 ) 

782 return fetcher 

783 

784 

785class InstanceMetadataRegionFetcher(IMDSFetcher): 

786 _URL_PATH = 'latest/meta-data/placement/availability-zone/' 

787 

788 def retrieve_region(self): 

789 """Get the current region from the instance metadata service. 

790 :rvalue: str 

791 :returns: The region the current instance is running in or None 

792 if the instance metadata service cannot be contacted or does not 

793 give a valid response. 

794 :rtype: None or str 

795 :returns: Returns the region as a string if it is configured to use 

796 IMDS as a region source. Otherwise returns ``None``. It will also 

797 return ``None`` if it fails to get the region from IMDS due to 

798 exhausting its retries or not being able to connect. 

799 """ 

800 try: 

801 region = self._get_region() 

802 return region 

803 except self._RETRIES_EXCEEDED_ERROR_CLS: 

804 logger.debug( 

805 "Max number of attempts exceeded (%s) when " 

806 "attempting to retrieve data from metadata service.", 

807 self._num_attempts, 

808 ) 

809 return None 

810 

811 def _get_region(self): 

812 token = self._fetch_metadata_token() 

813 response = self._get_request( 

814 url_path=self._URL_PATH, 

815 retry_func=self._default_retry, 

816 token=token, 

817 ) 

818 availability_zone = response.text 

819 region = availability_zone[:-1] 

820 return region 

821 

822 

823def merge_dicts(dict1, dict2, append_lists=False): 

824 """Given two dict, merge the second dict into the first. 

825 

826 The dicts can have arbitrary nesting. 

827 

828 :param append_lists: If true, instead of clobbering a list with the new 

829 value, append all of the new values onto the original list. 

830 """ 

831 for key in dict2: 

832 if isinstance(dict2[key], dict): 

833 if key in dict1 and key in dict2: 

834 merge_dicts(dict1[key], dict2[key]) 

835 else: 

836 dict1[key] = dict2[key] 

837 # If the value is a list and the ``append_lists`` flag is set, 

838 # append the new values onto the original list 

839 elif isinstance(dict2[key], list) and append_lists: 

840 # The value in dict1 must be a list in order to append new 

841 # values onto it. 

842 if key in dict1 and isinstance(dict1[key], list): 

843 dict1[key].extend(dict2[key]) 

844 else: 

845 dict1[key] = dict2[key] 

846 else: 

847 # At scalar types, we iterate and merge the 

848 # current dict that we're on. 

849 dict1[key] = dict2[key] 

850 

851 

852def lowercase_dict(original): 

853 """Copies the given dictionary ensuring all keys are lowercase strings.""" 

854 copy = {} 

855 for key in original: 

856 copy[key.lower()] = original[key] 

857 return copy 

858 

859 

860def parse_key_val_file(filename, _open=open): 

861 try: 

862 with _open(filename) as f: 

863 contents = f.read() 

864 return parse_key_val_file_contents(contents) 

865 except OSError: 

866 raise ConfigNotFound(path=filename) 

867 

868 

869def parse_key_val_file_contents(contents): 

870 # This was originally extracted from the EC2 credential provider, which was 

871 # fairly lenient in its parsing. We only try to parse key/val pairs if 

872 # there's a '=' in the line. 

873 final = {} 

874 for line in contents.splitlines(): 

875 if '=' not in line: 

876 continue 

877 key, val = line.split('=', 1) 

878 key = key.strip() 

879 val = val.strip() 

880 final[key] = val 

881 return final 

882 

883 

884def percent_encode_sequence(mapping, safe=SAFE_CHARS): 

885 """Urlencode a dict or list into a string. 

886 

887 This is similar to urllib.urlencode except that: 

888 

889 * It uses quote, and not quote_plus 

890 * It has a default list of safe chars that don't need 

891 to be encoded, which matches what AWS services expect. 

892 

893 If any value in the input ``mapping`` is a list type, 

894 then each list element wil be serialized. This is the equivalent 

895 to ``urlencode``'s ``doseq=True`` argument. 

896 

897 This function should be preferred over the stdlib 

898 ``urlencode()`` function. 

899 

900 :param mapping: Either a dict to urlencode or a list of 

901 ``(key, value)`` pairs. 

902 

903 """ 

904 encoded_pairs = [] 

905 if hasattr(mapping, 'items'): 

906 pairs = mapping.items() 

907 else: 

908 pairs = mapping 

909 for key, value in pairs: 

910 if isinstance(value, list): 

911 for element in value: 

912 encoded_pairs.append( 

913 f'{percent_encode(key)}={percent_encode(element)}' 

914 ) 

915 else: 

916 encoded_pairs.append( 

917 f'{percent_encode(key)}={percent_encode(value)}' 

918 ) 

919 return '&'.join(encoded_pairs) 

920 

921 

922def percent_encode(input_str, safe=SAFE_CHARS): 

923 """Urlencodes a string. 

924 

925 Whereas percent_encode_sequence handles taking a dict/sequence and 

926 producing a percent encoded string, this function deals only with 

927 taking a string (not a dict/sequence) and percent encoding it. 

928 

929 If given the binary type, will simply URL encode it. If given the 

930 text type, will produce the binary type by UTF-8 encoding the 

931 text. If given something else, will convert it to the text type 

932 first. 

933 """ 

934 # If its not a binary or text string, make it a text string. 

935 if not isinstance(input_str, (bytes, str)): 

936 input_str = str(input_str) 

937 # If it's not bytes, make it bytes by UTF-8 encoding it. 

938 if not isinstance(input_str, bytes): 

939 input_str = input_str.encode('utf-8') 

940 return quote(input_str, safe=safe) 

941 

942 

943def _epoch_seconds_to_datetime(value, tzinfo): 

944 """Parse numerical epoch timestamps (seconds since 1970) into a 

945 ``datetime.datetime`` in UTC using ``datetime.timedelta``. This is intended 

946 as fallback when ``fromtimestamp`` raises ``OverflowError`` or ``OSError``. 

947 

948 :type value: float or int 

949 :param value: The Unix timestamps as number. 

950 

951 :type tzinfo: callable 

952 :param tzinfo: A ``datetime.tzinfo`` class or compatible callable. 

953 """ 

954 epoch_zero = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=tzutc()) 

955 epoch_zero_localized = epoch_zero.astimezone(tzinfo()) 

956 return epoch_zero_localized + datetime.timedelta(seconds=value) 

957 

958 

959def _parse_timestamp_with_tzinfo(value, tzinfo): 

960 """Parse timestamp with pluggable tzinfo options.""" 

961 if isinstance(value, (int, float)): 

962 # Possibly an epoch time. 

963 return datetime.datetime.fromtimestamp(value, tzinfo()) 

964 else: 

965 try: 

966 return datetime.datetime.fromtimestamp(float(value), tzinfo()) 

967 except (TypeError, ValueError): 

968 pass 

969 try: 

970 # In certain cases, a timestamp marked with GMT can be parsed into a 

971 # different time zone, so here we provide a context which will 

972 # enforce that GMT == UTC. 

973 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()}) 

974 except (TypeError, ValueError) as e: 

975 raise ValueError(f'Invalid timestamp "{value}": {e}') 

976 

977 

978def parse_timestamp(value): 

979 """Parse a timestamp into a datetime object. 

980 

981 Supported formats: 

982 

983 * iso8601 

984 * rfc822 

985 * epoch (value is an integer) 

986 

987 This will return a ``datetime.datetime`` object. 

988 

989 """ 

990 tzinfo_options = get_tzinfo_options() 

991 for tzinfo in tzinfo_options: 

992 try: 

993 return _parse_timestamp_with_tzinfo(value, tzinfo) 

994 except (OSError, OverflowError) as e: 

995 logger.debug( 

996 'Unable to parse timestamp with "%s" timezone info.', 

997 tzinfo.__name__, 

998 exc_info=e, 

999 ) 

1000 # For numeric values attempt fallback to using fromtimestamp-free method. 

1001 # From Python's ``datetime.datetime.fromtimestamp`` documentation: "This 

1002 # may raise ``OverflowError``, if the timestamp is out of the range of 

1003 # values supported by the platform C localtime() function, and ``OSError`` 

1004 # on localtime() failure. It's common for this to be restricted to years 

1005 # from 1970 through 2038." 

1006 try: 

1007 numeric_value = float(value) 

1008 except (TypeError, ValueError): 

1009 pass 

1010 else: 

1011 try: 

1012 for tzinfo in tzinfo_options: 

1013 return _epoch_seconds_to_datetime(numeric_value, tzinfo=tzinfo) 

1014 except (OSError, OverflowError) as e: 

1015 logger.debug( 

1016 'Unable to parse timestamp using fallback method with "%s" ' 

1017 'timezone info.', 

1018 tzinfo.__name__, 

1019 exc_info=e, 

1020 ) 

1021 raise RuntimeError( 

1022 f'Unable to calculate correct timezone offset for "{value}"' 

1023 ) 

1024 

1025 

1026def parse_to_aware_datetime(value): 

1027 """Converted the passed in value to a datetime object with tzinfo. 

1028 

1029 This function can be used to normalize all timestamp inputs. This 

1030 function accepts a number of different types of inputs, but 

1031 will always return a datetime.datetime object with time zone 

1032 information. 

1033 

1034 The input param ``value`` can be one of several types: 

1035 

1036 * A datetime object (both naive and aware) 

1037 * An integer representing the epoch time (can also be a string 

1038 of the integer, i.e '0', instead of 0). The epoch time is 

1039 considered to be UTC. 

1040 * An iso8601 formatted timestamp. This does not need to be 

1041 a complete timestamp, it can contain just the date portion 

1042 without the time component. 

1043 

1044 The returned value will be a datetime object that will have tzinfo. 

1045 If no timezone info was provided in the input value, then UTC is 

1046 assumed, not local time. 

1047 

1048 """ 

1049 # This is a general purpose method that handles several cases of 

1050 # converting the provided value to a string timestamp suitable to be 

1051 # serialized to an http request. It can handle: 

1052 # 1) A datetime.datetime object. 

1053 if isinstance(value, _DatetimeClass): 

1054 datetime_obj = value 

1055 else: 

1056 # 2) A string object that's formatted as a timestamp. 

1057 # We document this as being an iso8601 timestamp, although 

1058 # parse_timestamp is a bit more flexible. 

1059 datetime_obj = parse_timestamp(value) 

1060 if datetime_obj.tzinfo is None: 

1061 # I think a case would be made that if no time zone is provided, 

1062 # we should use the local time. However, to restore backwards 

1063 # compat, the previous behavior was to assume UTC, which is 

1064 # what we're going to do here. 

1065 datetime_obj = datetime_obj.replace(tzinfo=tzutc()) 

1066 else: 

1067 datetime_obj = datetime_obj.astimezone(tzutc()) 

1068 return datetime_obj 

1069 

1070 

1071def datetime2timestamp(dt, default_timezone=None): 

1072 """Calculate the timestamp based on the given datetime instance. 

1073 

1074 :type dt: datetime 

1075 :param dt: A datetime object to be converted into timestamp 

1076 :type default_timezone: tzinfo 

1077 :param default_timezone: If it is provided as None, we treat it as tzutc(). 

1078 But it is only used when dt is a naive datetime. 

1079 :returns: The timestamp 

1080 """ 

1081 epoch = datetime.datetime(1970, 1, 1) 

1082 if dt.tzinfo is None: 

1083 if default_timezone is None: 

1084 default_timezone = tzutc() 

1085 dt = dt.replace(tzinfo=default_timezone) 

1086 d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch 

1087 return d.total_seconds() 

1088 

1089 

1090def calculate_sha256(body, as_hex=False): 

1091 """Calculate a sha256 checksum. 

1092 

1093 This method will calculate the sha256 checksum of a file like 

1094 object. Note that this method will iterate through the entire 

1095 file contents. The caller is responsible for ensuring the proper 

1096 starting position of the file and ``seek()``'ing the file back 

1097 to its starting location if other consumers need to read from 

1098 the file like object. 

1099 

1100 :param body: Any file like object. The file must be opened 

1101 in binary mode such that a ``.read()`` call returns bytes. 

1102 :param as_hex: If True, then the hex digest is returned. 

1103 If False, then the digest (as binary bytes) is returned. 

1104 

1105 :returns: The sha256 checksum 

1106 

1107 """ 

1108 checksum = hashlib.sha256() 

1109 for chunk in iter(lambda: body.read(1024 * 1024), b''): 

1110 checksum.update(chunk) 

1111 if as_hex: 

1112 return checksum.hexdigest() 

1113 else: 

1114 return checksum.digest() 

1115 

1116 

1117def calculate_tree_hash(body): 

1118 """Calculate a tree hash checksum. 

1119 

1120 For more information see: 

1121 

1122 http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html 

1123 

1124 :param body: Any file like object. This has the same constraints as 

1125 the ``body`` param in calculate_sha256 

1126 

1127 :rtype: str 

1128 :returns: The hex version of the calculated tree hash 

1129 

1130 """ 

1131 chunks = [] 

1132 required_chunk_size = 1024 * 1024 

1133 sha256 = hashlib.sha256 

1134 for chunk in iter(lambda: body.read(required_chunk_size), b''): 

1135 chunks.append(sha256(chunk).digest()) 

1136 if not chunks: 

1137 return sha256(b'').hexdigest() 

1138 while len(chunks) > 1: 

1139 new_chunks = [] 

1140 for first, second in _in_pairs(chunks): 

1141 if second is not None: 

1142 new_chunks.append(sha256(first + second).digest()) 

1143 else: 

1144 # We're at the end of the list and there's no pair left. 

1145 new_chunks.append(first) 

1146 chunks = new_chunks 

1147 return binascii.hexlify(chunks[0]).decode('ascii') 

1148 

1149 

1150def _in_pairs(iterable): 

1151 # Creates iterator that iterates over the list in pairs: 

1152 # for a, b in _in_pairs([0, 1, 2, 3, 4]): 

1153 # print(a, b) 

1154 # 

1155 # will print: 

1156 # 0, 1 

1157 # 2, 3 

1158 # 4, None 

1159 shared_iter = iter(iterable) 

1160 # Note that zip_longest is a compat import that uses 

1161 # the itertools izip_longest. This creates an iterator, 

1162 # this call below does _not_ immediately create the list 

1163 # of pairs. 

1164 return zip_longest(shared_iter, shared_iter) 

1165 

1166 

1167class CachedProperty: 

1168 """A read only property that caches the initially computed value. 

1169 

1170 This descriptor will only call the provided ``fget`` function once. 

1171 Subsequent access to this property will return the cached value. 

1172 

1173 """ 

1174 

1175 def __init__(self, fget): 

1176 self._fget = fget 

1177 

1178 def __get__(self, obj, cls): 

1179 if obj is None: 

1180 return self 

1181 else: 

1182 computed_value = self._fget(obj) 

1183 obj.__dict__[self._fget.__name__] = computed_value 

1184 return computed_value 

1185 

1186 

1187class ArgumentGenerator: 

1188 """Generate sample input based on a shape model. 

1189 

1190 This class contains a ``generate_skeleton`` method that will take 

1191 an input/output shape (created from ``botocore.model``) and generate 

1192 a sample dictionary corresponding to the input/output shape. 

1193 

1194 The specific values used are place holder values. For strings either an 

1195 empty string or the member name can be used, for numbers 0 or 0.0 is used. 

1196 The intended usage of this class is to generate the *shape* of the input 

1197 structure. 

1198 

1199 This can be useful for operations that have complex input shapes. 

1200 This allows a user to just fill in the necessary data instead of 

1201 worrying about the specific structure of the input arguments. 

1202 

1203 Example usage:: 

1204 

1205 s = botocore.session.get_session() 

1206 ddb = s.get_service_model('dynamodb') 

1207 arg_gen = ArgumentGenerator() 

1208 sample_input = arg_gen.generate_skeleton( 

1209 ddb.operation_model('CreateTable').input_shape) 

1210 print("Sample input for dynamodb.CreateTable: %s" % sample_input) 

1211 

1212 """ 

1213 

1214 def __init__(self, use_member_names=False): 

1215 self._use_member_names = use_member_names 

1216 

1217 def generate_skeleton(self, shape): 

1218 """Generate a sample input. 

1219 

1220 :type shape: ``botocore.model.Shape`` 

1221 :param shape: The input shape. 

1222 

1223 :return: The generated skeleton input corresponding to the 

1224 provided input shape. 

1225 

1226 """ 

1227 stack = [] 

1228 return self._generate_skeleton(shape, stack) 

1229 

1230 def _generate_skeleton(self, shape, stack, name=''): 

1231 stack.append(shape.name) 

1232 try: 

1233 if shape.type_name == 'structure': 

1234 return self._generate_type_structure(shape, stack) 

1235 elif shape.type_name == 'list': 

1236 return self._generate_type_list(shape, stack) 

1237 elif shape.type_name == 'map': 

1238 return self._generate_type_map(shape, stack) 

1239 elif shape.type_name == 'string': 

1240 if self._use_member_names: 

1241 return name 

1242 if shape.enum: 

1243 return random.choice(shape.enum) 

1244 return '' 

1245 elif shape.type_name in ['integer', 'long']: 

1246 return 0 

1247 elif shape.type_name in ['float', 'double']: 

1248 return 0.0 

1249 elif shape.type_name == 'boolean': 

1250 return True 

1251 elif shape.type_name == 'timestamp': 

1252 return datetime.datetime(1970, 1, 1, 0, 0, 0) 

1253 finally: 

1254 stack.pop() 

1255 

1256 def _generate_type_structure(self, shape, stack): 

1257 if stack.count(shape.name) > 1: 

1258 return {} 

1259 skeleton = OrderedDict() 

1260 for member_name, member_shape in shape.members.items(): 

1261 skeleton[member_name] = self._generate_skeleton( 

1262 member_shape, stack, name=member_name 

1263 ) 

1264 return skeleton 

1265 

1266 def _generate_type_list(self, shape, stack): 

1267 # For list elements we've arbitrarily decided to 

1268 # return two elements for the skeleton list. 

1269 name = '' 

1270 if self._use_member_names: 

1271 name = shape.member.name 

1272 return [ 

1273 self._generate_skeleton(shape.member, stack, name), 

1274 ] 

1275 

1276 def _generate_type_map(self, shape, stack): 

1277 key_shape = shape.key 

1278 value_shape = shape.value 

1279 assert key_shape.type_name == 'string' 

1280 return OrderedDict( 

1281 [ 

1282 ('KeyName', self._generate_skeleton(value_shape, stack)), 

1283 ] 

1284 ) 

1285 

1286 

1287def is_valid_ipv6_endpoint_url(endpoint_url): 

1288 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1289 return False 

1290 hostname = f'[{urlparse(endpoint_url).hostname}]' 

1291 return IPV6_ADDRZ_RE.match(hostname) is not None 

1292 

1293 

1294def is_valid_ipv4_endpoint_url(endpoint_url): 

1295 hostname = urlparse(endpoint_url).hostname 

1296 return IPV4_RE.match(hostname) is not None 

1297 

1298 

1299def is_valid_endpoint_url(endpoint_url): 

1300 """Verify the endpoint_url is valid. 

1301 

1302 :type endpoint_url: string 

1303 :param endpoint_url: An endpoint_url. Must have at least a scheme 

1304 and a hostname. 

1305 

1306 :return: True if the endpoint url is valid. False otherwise. 

1307 

1308 """ 

1309 # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing 

1310 # it to pass hostname validation below. Detect them early to fix that. 

1311 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1312 return False 

1313 parts = urlsplit(endpoint_url) 

1314 hostname = parts.hostname 

1315 if hostname is None: 

1316 return False 

1317 if len(hostname) > 255: 

1318 return False 

1319 if hostname[-1] == ".": 

1320 hostname = hostname[:-1] 

1321 allowed = re.compile( 

1322 r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$", 

1323 re.IGNORECASE, 

1324 ) 

1325 return allowed.match(hostname) 

1326 

1327 

1328def is_valid_uri(endpoint_url): 

1329 return is_valid_endpoint_url(endpoint_url) or is_valid_ipv6_endpoint_url( 

1330 endpoint_url 

1331 ) 

1332 

1333 

1334def validate_region_name(region_name): 

1335 """Provided region_name must be a valid host label.""" 

1336 if region_name is None: 

1337 return 

1338 valid_host_label = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$') 

1339 valid = valid_host_label.match(region_name) 

1340 if not valid: 

1341 raise InvalidRegionError(region_name=region_name) 

1342 

1343 

1344def check_dns_name(bucket_name): 

1345 """ 

1346 Check to see if the ``bucket_name`` complies with the 

1347 restricted DNS naming conventions necessary to allow 

1348 access via virtual-hosting style. 

1349 

1350 Even though "." characters are perfectly valid in this DNS 

1351 naming scheme, we are going to punt on any name containing a 

1352 "." character because these will cause SSL cert validation 

1353 problems if we try to use virtual-hosting style addressing. 

1354 """ 

1355 if '.' in bucket_name: 

1356 return False 

1357 n = len(bucket_name) 

1358 if n < 3 or n > 63: 

1359 # Wrong length 

1360 return False 

1361 match = LABEL_RE.match(bucket_name) 

1362 if match is None or match.end() != len(bucket_name): 

1363 return False 

1364 return True 

1365 

1366 

1367def fix_s3_host( 

1368 request, 

1369 signature_version, 

1370 region_name, 

1371 default_endpoint_url=None, 

1372 **kwargs, 

1373): 

1374 """ 

1375 This handler looks at S3 requests just before they are signed. 

1376 If there is a bucket name on the path (true for everything except 

1377 ListAllBuckets) it checks to see if that bucket name conforms to 

1378 the DNS naming conventions. If it does, it alters the request to 

1379 use ``virtual hosting`` style addressing rather than ``path-style`` 

1380 addressing. 

1381 

1382 """ 

1383 if request.context.get('use_global_endpoint', False): 

1384 default_endpoint_url = 's3.amazonaws.com' 

1385 try: 

1386 switch_to_virtual_host_style( 

1387 request, signature_version, default_endpoint_url 

1388 ) 

1389 except InvalidDNSNameError as e: 

1390 bucket_name = e.kwargs['bucket_name'] 

1391 logger.debug( 

1392 'Not changing URI, bucket is not DNS compatible: %s', bucket_name 

1393 ) 

1394 

1395 

1396def switch_to_virtual_host_style( 

1397 request, signature_version, default_endpoint_url=None, **kwargs 

1398): 

1399 """ 

1400 This is a handler to force virtual host style s3 addressing no matter 

1401 the signature version (which is taken in consideration for the default 

1402 case). If the bucket is not DNS compatible an InvalidDNSName is thrown. 

1403 

1404 :param request: A AWSRequest object that is about to be sent. 

1405 :param signature_version: The signature version to sign with 

1406 :param default_endpoint_url: The endpoint to use when switching to a 

1407 virtual style. If None is supplied, the virtual host will be 

1408 constructed from the url of the request. 

1409 """ 

1410 if request.auth_path is not None: 

1411 # The auth_path has already been applied (this may be a 

1412 # retried request). We don't need to perform this 

1413 # customization again. 

1414 return 

1415 elif _is_get_bucket_location_request(request): 

1416 # For the GetBucketLocation response, we should not be using 

1417 # the virtual host style addressing so we can avoid any sigv4 

1418 # issues. 

1419 logger.debug( 

1420 "Request is GetBucketLocation operation, not checking " 

1421 "for DNS compatibility." 

1422 ) 

1423 return 

1424 parts = urlsplit(request.url) 

1425 request.auth_path = parts.path 

1426 path_parts = parts.path.split('/') 

1427 

1428 # Retrieve what the endpoint we will be prepending the bucket name to. 

1429 if default_endpoint_url is None: 

1430 default_endpoint_url = parts.netloc 

1431 

1432 if len(path_parts) > 1: 

1433 bucket_name = path_parts[1] 

1434 if not bucket_name: 

1435 # If the bucket name is empty we should not be checking for 

1436 # dns compatibility. 

1437 return 

1438 logger.debug('Checking for DNS compatible bucket for: %s', request.url) 

1439 if check_dns_name(bucket_name): 

1440 # If the operation is on a bucket, the auth_path must be 

1441 # terminated with a '/' character. 

1442 if len(path_parts) == 2: 

1443 if request.auth_path[-1] != '/': 

1444 request.auth_path += '/' 

1445 path_parts.remove(bucket_name) 

1446 # At the very least the path must be a '/', such as with the 

1447 # CreateBucket operation when DNS style is being used. If this 

1448 # is not used you will get an empty path which is incorrect. 

1449 path = '/'.join(path_parts) or '/' 

1450 global_endpoint = default_endpoint_url 

1451 host = bucket_name + '.' + global_endpoint 

1452 new_tuple = (parts.scheme, host, path, parts.query, '') 

1453 new_uri = urlunsplit(new_tuple) 

1454 request.url = new_uri 

1455 logger.debug('URI updated to: %s', new_uri) 

1456 else: 

1457 raise InvalidDNSNameError(bucket_name=bucket_name) 

1458 

1459 

1460def _is_get_bucket_location_request(request): 

1461 return request.url.endswith('?location') 

1462 

1463 

1464def instance_cache(func): 

1465 """Method decorator for caching method calls to a single instance. 

1466 

1467 **This is not a general purpose caching decorator.** 

1468 

1469 In order to use this, you *must* provide an ``_instance_cache`` 

1470 attribute on the instance. 

1471 

1472 This decorator is used to cache method calls. The cache is only 

1473 scoped to a single instance though such that multiple instances 

1474 will maintain their own cache. In order to keep things simple, 

1475 this decorator requires that you provide an ``_instance_cache`` 

1476 attribute on your instance. 

1477 

1478 """ 

1479 func_name = func.__name__ 

1480 

1481 @functools.wraps(func) 

1482 def _cache_guard(self, *args, **kwargs): 

1483 cache_key = (func_name, args) 

1484 if kwargs: 

1485 kwarg_items = tuple(sorted(kwargs.items())) 

1486 cache_key = (func_name, args, kwarg_items) 

1487 result = self._instance_cache.get(cache_key) 

1488 if result is not None: 

1489 return result 

1490 result = func(self, *args, **kwargs) 

1491 self._instance_cache[cache_key] = result 

1492 return result 

1493 

1494 return _cache_guard 

1495 

1496 

1497def lru_cache_weakref(*cache_args, **cache_kwargs): 

1498 """ 

1499 Version of functools.lru_cache that stores a weak reference to ``self``. 

1500 

1501 Serves the same purpose as :py:func:`instance_cache` but uses Python's 

1502 functools implementation which offers ``max_size`` and ``typed`` properties. 

1503 

1504 lru_cache is a global cache even when used on a method. The cache's 

1505 reference to ``self`` will prevent garbage collection of the object. This 

1506 wrapper around functools.lru_cache replaces the reference to ``self`` with 

1507 a weak reference to not interfere with garbage collection. 

1508 """ 

1509 

1510 def wrapper(func): 

1511 @functools.lru_cache(*cache_args, **cache_kwargs) 

1512 def func_with_weakref(weakref_to_self, *args, **kwargs): 

1513 return func(weakref_to_self(), *args, **kwargs) 

1514 

1515 @functools.wraps(func) 

1516 def inner(self, *args, **kwargs): 

1517 for kwarg_key, kwarg_value in kwargs.items(): 

1518 if isinstance(kwarg_value, list): 

1519 kwargs[kwarg_key] = tuple(kwarg_value) 

1520 return func_with_weakref(weakref.ref(self), *args, **kwargs) 

1521 

1522 inner.cache_info = func_with_weakref.cache_info 

1523 return inner 

1524 

1525 return wrapper 

1526 

1527 

1528def switch_host_s3_accelerate(request, operation_name, **kwargs): 

1529 """Switches the current s3 endpoint with an S3 Accelerate endpoint""" 

1530 

1531 # Note that when registered the switching of the s3 host happens 

1532 # before it gets changed to virtual. So we are not concerned with ensuring 

1533 # that the bucket name is translated to the virtual style here and we 

1534 # can hard code the Accelerate endpoint. 

1535 parts = urlsplit(request.url).netloc.split('.') 

1536 parts = [p for p in parts if p in S3_ACCELERATE_WHITELIST] 

1537 endpoint = 'https://s3-accelerate.' 

1538 if len(parts) > 0: 

1539 endpoint += '.'.join(parts) + '.' 

1540 endpoint += 'amazonaws.com' 

1541 

1542 if operation_name in ['ListBuckets', 'CreateBucket', 'DeleteBucket']: 

1543 return 

1544 _switch_hosts(request, endpoint, use_new_scheme=False) 

1545 

1546 

1547def switch_host_with_param(request, param_name): 

1548 """Switches the host using a parameter value from a JSON request body""" 

1549 request_json = json.loads(request.data.decode('utf-8')) 

1550 if request_json.get(param_name): 

1551 new_endpoint = request_json[param_name] 

1552 _switch_hosts(request, new_endpoint) 

1553 

1554 

1555def _switch_hosts(request, new_endpoint, use_new_scheme=True): 

1556 final_endpoint = _get_new_endpoint( 

1557 request.url, new_endpoint, use_new_scheme 

1558 ) 

1559 request.url = final_endpoint 

1560 

1561 

1562def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True): 

1563 new_endpoint_components = urlsplit(new_endpoint) 

1564 original_endpoint_components = urlsplit(original_endpoint) 

1565 scheme = original_endpoint_components.scheme 

1566 if use_new_scheme: 

1567 scheme = new_endpoint_components.scheme 

1568 final_endpoint_components = ( 

1569 scheme, 

1570 new_endpoint_components.netloc, 

1571 original_endpoint_components.path, 

1572 original_endpoint_components.query, 

1573 '', 

1574 ) 

1575 final_endpoint = urlunsplit(final_endpoint_components) 

1576 logger.debug(f'Updating URI from {original_endpoint} to {final_endpoint}') 

1577 return final_endpoint 

1578 

1579 

1580def deep_merge(base, extra): 

1581 """Deeply two dictionaries, overriding existing keys in the base. 

1582 

1583 :param base: The base dictionary which will be merged into. 

1584 :param extra: The dictionary to merge into the base. Keys from this 

1585 dictionary will take precedence. 

1586 """ 

1587 for key in extra: 

1588 # If the key represents a dict on both given dicts, merge the sub-dicts 

1589 if ( 

1590 key in base 

1591 and isinstance(base[key], dict) 

1592 and isinstance(extra[key], dict) 

1593 ): 

1594 deep_merge(base[key], extra[key]) 

1595 continue 

1596 

1597 # Otherwise, set the key on the base to be the value of the extra. 

1598 base[key] = extra[key] 

1599 

1600 

1601def hyphenize_service_id(service_id): 

1602 """Translate the form used for event emitters. 

1603 

1604 :param service_id: The service_id to convert. 

1605 """ 

1606 return service_id.replace(' ', '-').lower() 

1607 

1608 

1609class IdentityCache: 

1610 """Base IdentityCache implementation for storing and retrieving 

1611 highly accessed credentials. 

1612 

1613 This class is not intended to be instantiated in user code. 

1614 """ 

1615 

1616 METHOD = "base_identity_cache" 

1617 

1618 def __init__(self, client, credential_cls): 

1619 self._client = client 

1620 self._credential_cls = credential_cls 

1621 

1622 def get_credentials(self, **kwargs): 

1623 callback = self.build_refresh_callback(**kwargs) 

1624 metadata = callback() 

1625 credential_entry = self._credential_cls.create_from_metadata( 

1626 metadata=metadata, 

1627 refresh_using=callback, 

1628 method=self.METHOD, 

1629 advisory_timeout=45, 

1630 mandatory_timeout=10, 

1631 ) 

1632 return credential_entry 

1633 

1634 def build_refresh_callback(**kwargs): 

1635 """Callback to be implemented by subclasses. 

1636 

1637 Returns a set of metadata to be converted into a new 

1638 credential instance. 

1639 """ 

1640 raise NotImplementedError() 

1641 

1642 

1643class S3ExpressIdentityCache(IdentityCache): 

1644 """S3Express IdentityCache for retrieving and storing 

1645 credentials from CreateSession calls. 

1646 

1647 This class is not intended to be instantiated in user code. 

1648 """ 

1649 

1650 METHOD = "s3express" 

1651 

1652 def __init__(self, client, credential_cls): 

1653 self._client = client 

1654 self._credential_cls = credential_cls 

1655 

1656 @functools.lru_cache(maxsize=100) 

1657 def get_credentials(self, bucket): 

1658 return super().get_credentials(bucket=bucket) 

1659 

1660 def build_refresh_callback(self, bucket): 

1661 def refresher(): 

1662 response = self._client.create_session(Bucket=bucket) 

1663 creds = response['Credentials'] 

1664 expiration = self._serialize_if_needed( 

1665 creds['Expiration'], iso=True 

1666 ) 

1667 return { 

1668 "access_key": creds['AccessKeyId'], 

1669 "secret_key": creds['SecretAccessKey'], 

1670 "token": creds['SessionToken'], 

1671 "expiry_time": expiration, 

1672 } 

1673 

1674 return refresher 

1675 

1676 def _serialize_if_needed(self, value, iso=False): 

1677 if isinstance(value, _DatetimeClass): 

1678 if iso: 

1679 return value.isoformat() 

1680 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

1681 return value 

1682 

1683 

1684class S3ExpressIdentityResolver: 

1685 def __init__(self, client, credential_cls, cache=None): 

1686 self._client = weakref.proxy(client) 

1687 

1688 if cache is None: 

1689 cache = S3ExpressIdentityCache(self._client, credential_cls) 

1690 self._cache = cache 

1691 

1692 def register(self, event_emitter=None): 

1693 logger.debug('Registering S3Express Identity Resolver') 

1694 emitter = event_emitter or self._client.meta.events 

1695 emitter.register('before-call.s3', self.apply_signing_cache_key) 

1696 emitter.register('before-sign.s3', self.resolve_s3express_identity) 

1697 

1698 def apply_signing_cache_key(self, params, context, **kwargs): 

1699 endpoint_properties = context.get('endpoint_properties', {}) 

1700 backend = endpoint_properties.get('backend', None) 

1701 

1702 # Add cache key if Bucket supplied for s3express request 

1703 bucket_name = context.get('input_params', {}).get('Bucket') 

1704 if backend == 'S3Express' and bucket_name is not None: 

1705 context.setdefault('signing', {}) 

1706 context['signing']['cache_key'] = bucket_name 

1707 

1708 def resolve_s3express_identity( 

1709 self, 

1710 request, 

1711 signing_name, 

1712 region_name, 

1713 signature_version, 

1714 request_signer, 

1715 operation_name, 

1716 **kwargs, 

1717 ): 

1718 signing_context = request.context.get('signing', {}) 

1719 signing_name = signing_context.get('signing_name') 

1720 if signing_name == 's3express' and signature_version.startswith( 

1721 'v4-s3express' 

1722 ): 

1723 signing_context['identity_cache'] = self._cache 

1724 if 'cache_key' not in signing_context: 

1725 signing_context['cache_key'] = ( 

1726 request.context.get('s3_redirect', {}) 

1727 .get('params', {}) 

1728 .get('Bucket') 

1729 ) 

1730 

1731 

1732class S3RegionRedirectorv2: 

1733 """Updated version of S3RegionRedirector for use when 

1734 EndpointRulesetResolver is in use for endpoint resolution. 

1735 

1736 This class is considered private and subject to abrupt breaking changes or 

1737 removal without prior announcement. Please do not use it directly. 

1738 """ 

1739 

1740 def __init__(self, endpoint_bridge, client, cache=None): 

1741 self._cache = cache or {} 

1742 self._client = weakref.proxy(client) 

1743 

1744 def register(self, event_emitter=None): 

1745 logger.debug('Registering S3 region redirector handler') 

1746 emitter = event_emitter or self._client.meta.events 

1747 emitter.register('needs-retry.s3', self.redirect_from_error) 

1748 emitter.register( 

1749 'before-parameter-build.s3', self.annotate_request_context 

1750 ) 

1751 emitter.register( 

1752 'before-endpoint-resolution.s3', self.redirect_from_cache 

1753 ) 

1754 

1755 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1756 """ 

1757 An S3 request sent to the wrong region will return an error that 

1758 contains the endpoint the request should be sent to. This handler 

1759 will add the redirect information to the signing context and then 

1760 redirect the request. 

1761 """ 

1762 if response is None: 

1763 # This could be none if there was a ConnectionError or other 

1764 # transport error. 

1765 return 

1766 

1767 redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {}) 

1768 if ArnParser.is_arn(redirect_ctx.get('bucket')): 

1769 logger.debug( 

1770 'S3 request was previously for an Accesspoint ARN, not ' 

1771 'redirecting.' 

1772 ) 

1773 return 

1774 

1775 if redirect_ctx.get('redirected'): 

1776 logger.debug( 

1777 'S3 request was previously redirected, not redirecting.' 

1778 ) 

1779 return 

1780 

1781 error = response[1].get('Error', {}) 

1782 error_code = error.get('Code') 

1783 response_metadata = response[1].get('ResponseMetadata', {}) 

1784 

1785 # We have to account for 400 responses because 

1786 # if we sign a Head* request with the wrong region, 

1787 # we'll get a 400 Bad Request but we won't get a 

1788 # body saying it's an "AuthorizationHeaderMalformed". 

1789 is_special_head_object = ( 

1790 error_code in ('301', '400') and operation.name == 'HeadObject' 

1791 ) 

1792 is_special_head_bucket = ( 

1793 error_code in ('301', '400') 

1794 and operation.name == 'HeadBucket' 

1795 and 'x-amz-bucket-region' 

1796 in response_metadata.get('HTTPHeaders', {}) 

1797 ) 

1798 is_wrong_signing_region = ( 

1799 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

1800 ) 

1801 is_redirect_status = response[0] is not None and response[ 

1802 0 

1803 ].status_code in (301, 302, 307) 

1804 is_permanent_redirect = error_code == 'PermanentRedirect' 

1805 is_opt_in_region_redirect = ( 

1806 error_code == 'IllegalLocationConstraintException' 

1807 and operation.name != 'CreateBucket' 

1808 ) 

1809 if not any( 

1810 [ 

1811 is_special_head_object, 

1812 is_wrong_signing_region, 

1813 is_permanent_redirect, 

1814 is_special_head_bucket, 

1815 is_redirect_status, 

1816 is_opt_in_region_redirect, 

1817 ] 

1818 ): 

1819 return 

1820 

1821 bucket = request_dict['context']['s3_redirect']['bucket'] 

1822 client_region = request_dict['context'].get('client_region') 

1823 new_region = self.get_bucket_region(bucket, response) 

1824 

1825 if new_region is None: 

1826 logger.debug( 

1827 f"S3 client configured for region {client_region} but the " 

1828 f"bucket {bucket} is not in that region and the proper region " 

1829 "could not be automatically determined." 

1830 ) 

1831 return 

1832 

1833 logger.debug( 

1834 f"S3 client configured for region {client_region} but the bucket {bucket} " 

1835 f"is in region {new_region}; Please configure the proper region to " 

1836 f"avoid multiple unnecessary redirects and signing attempts." 

1837 ) 

1838 # Adding the new region to _cache will make construct_endpoint() to 

1839 # use the new region as value for the AWS::Region builtin parameter. 

1840 self._cache[bucket] = new_region 

1841 

1842 # Re-resolve endpoint with new region and modify request_dict with 

1843 # the new URL, auth scheme, and signing context. 

1844 ep_resolver = self._client._ruleset_resolver 

1845 ep_info = ep_resolver.construct_endpoint( 

1846 operation_model=operation, 

1847 call_args=request_dict['context']['s3_redirect']['params'], 

1848 request_context=request_dict['context'], 

1849 ) 

1850 request_dict['url'] = self.set_request_url( 

1851 request_dict['url'], ep_info.url 

1852 ) 

1853 request_dict['context']['s3_redirect']['redirected'] = True 

1854 auth_schemes = ep_info.properties.get('authSchemes') 

1855 if auth_schemes is not None: 

1856 auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes) 

1857 auth_type, signing_context = auth_info 

1858 request_dict['context']['auth_type'] = auth_type 

1859 request_dict['context']['signing'] = { 

1860 **request_dict['context'].get('signing', {}), 

1861 **signing_context, 

1862 } 

1863 

1864 # Return 0 so it doesn't wait to retry 

1865 return 0 

1866 

1867 def get_bucket_region(self, bucket, response): 

1868 """ 

1869 There are multiple potential sources for the new region to redirect to, 

1870 but they aren't all universally available for use. This will try to 

1871 find region from response elements, but will fall back to calling 

1872 HEAD on the bucket if all else fails. 

1873 

1874 :param bucket: The bucket to find the region for. This is necessary if 

1875 the region is not available in the error response. 

1876 :param response: A response representing a service request that failed 

1877 due to incorrect region configuration. 

1878 """ 

1879 # First try to source the region from the headers. 

1880 service_response = response[1] 

1881 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

1882 if 'x-amz-bucket-region' in response_headers: 

1883 return response_headers['x-amz-bucket-region'] 

1884 

1885 # Next, check the error body 

1886 region = service_response.get('Error', {}).get('Region', None) 

1887 if region is not None: 

1888 return region 

1889 

1890 # Finally, HEAD the bucket. No other choice sadly. 

1891 try: 

1892 response = self._client.head_bucket(Bucket=bucket) 

1893 headers = response['ResponseMetadata']['HTTPHeaders'] 

1894 except ClientError as e: 

1895 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

1896 

1897 region = headers.get('x-amz-bucket-region', None) 

1898 return region 

1899 

1900 def set_request_url(self, old_url, new_endpoint, **kwargs): 

1901 """ 

1902 Splice a new endpoint into an existing URL. Note that some endpoints 

1903 from the the endpoint provider have a path component which will be 

1904 discarded by this function. 

1905 """ 

1906 return _get_new_endpoint(old_url, new_endpoint, False) 

1907 

1908 def redirect_from_cache(self, builtins, params, **kwargs): 

1909 """ 

1910 If a bucket name has been redirected before, it is in the cache. This 

1911 handler will update the AWS::Region endpoint resolver builtin param 

1912 to use the region from cache instead of the client region to avoid the 

1913 redirect. 

1914 """ 

1915 bucket = params.get('Bucket') 

1916 if bucket is not None and bucket in self._cache: 

1917 new_region = self._cache.get(bucket) 

1918 builtins['AWS::Region'] = new_region 

1919 

1920 def annotate_request_context(self, params, context, **kwargs): 

1921 """Store the bucket name in context for later use when redirecting. 

1922 The bucket name may be an access point ARN or alias. 

1923 """ 

1924 bucket = params.get('Bucket') 

1925 context['s3_redirect'] = { 

1926 'redirected': False, 

1927 'bucket': bucket, 

1928 'params': params, 

1929 } 

1930 

1931 

1932class S3RegionRedirector: 

1933 """This handler has been replaced by S3RegionRedirectorv2. The original 

1934 version remains in place for any third-party libraries that import it. 

1935 """ 

1936 

1937 def __init__(self, endpoint_bridge, client, cache=None): 

1938 self._endpoint_resolver = endpoint_bridge 

1939 self._cache = cache 

1940 if self._cache is None: 

1941 self._cache = {} 

1942 

1943 # This needs to be a weak ref in order to prevent memory leaks on 

1944 # python 2.6 

1945 self._client = weakref.proxy(client) 

1946 

1947 warnings.warn( 

1948 'The S3RegionRedirector class has been deprecated for a new ' 

1949 'internal replacement. A future version of botocore may remove ' 

1950 'this class.', 

1951 category=FutureWarning, 

1952 ) 

1953 

1954 def register(self, event_emitter=None): 

1955 emitter = event_emitter or self._client.meta.events 

1956 emitter.register('needs-retry.s3', self.redirect_from_error) 

1957 emitter.register('before-call.s3', self.set_request_url) 

1958 emitter.register('before-parameter-build.s3', self.redirect_from_cache) 

1959 

1960 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1961 """ 

1962 An S3 request sent to the wrong region will return an error that 

1963 contains the endpoint the request should be sent to. This handler 

1964 will add the redirect information to the signing context and then 

1965 redirect the request. 

1966 """ 

1967 if response is None: 

1968 # This could be none if there was a ConnectionError or other 

1969 # transport error. 

1970 return 

1971 

1972 if self._is_s3_accesspoint(request_dict.get('context', {})): 

1973 logger.debug( 

1974 'S3 request was previously to an accesspoint, not redirecting.' 

1975 ) 

1976 return 

1977 

1978 if request_dict.get('context', {}).get('s3_redirected'): 

1979 logger.debug( 

1980 'S3 request was previously redirected, not redirecting.' 

1981 ) 

1982 return 

1983 

1984 error = response[1].get('Error', {}) 

1985 error_code = error.get('Code') 

1986 response_metadata = response[1].get('ResponseMetadata', {}) 

1987 

1988 # We have to account for 400 responses because 

1989 # if we sign a Head* request with the wrong region, 

1990 # we'll get a 400 Bad Request but we won't get a 

1991 # body saying it's an "AuthorizationHeaderMalformed". 

1992 is_special_head_object = ( 

1993 error_code in ('301', '400') and operation.name == 'HeadObject' 

1994 ) 

1995 is_special_head_bucket = ( 

1996 error_code in ('301', '400') 

1997 and operation.name == 'HeadBucket' 

1998 and 'x-amz-bucket-region' 

1999 in response_metadata.get('HTTPHeaders', {}) 

2000 ) 

2001 is_wrong_signing_region = ( 

2002 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

2003 ) 

2004 is_redirect_status = response[0] is not None and response[ 

2005 0 

2006 ].status_code in (301, 302, 307) 

2007 is_permanent_redirect = error_code == 'PermanentRedirect' 

2008 if not any( 

2009 [ 

2010 is_special_head_object, 

2011 is_wrong_signing_region, 

2012 is_permanent_redirect, 

2013 is_special_head_bucket, 

2014 is_redirect_status, 

2015 ] 

2016 ): 

2017 return 

2018 

2019 bucket = request_dict['context']['signing']['bucket'] 

2020 client_region = request_dict['context'].get('client_region') 

2021 new_region = self.get_bucket_region(bucket, response) 

2022 

2023 if new_region is None: 

2024 logger.debug( 

2025 f"S3 client configured for region {client_region} but the bucket {bucket} is not " 

2026 "in that region and the proper region could not be " 

2027 "automatically determined." 

2028 ) 

2029 return 

2030 

2031 logger.debug( 

2032 f"S3 client configured for region {client_region} but the bucket {bucket} is in region" 

2033 f" {new_region}; Please configure the proper region to avoid multiple " 

2034 "unnecessary redirects and signing attempts." 

2035 ) 

2036 endpoint = self._endpoint_resolver.resolve('s3', new_region) 

2037 endpoint = endpoint['endpoint_url'] 

2038 

2039 signing_context = { 

2040 'region': new_region, 

2041 'bucket': bucket, 

2042 'endpoint': endpoint, 

2043 } 

2044 request_dict['context']['signing'] = signing_context 

2045 

2046 self._cache[bucket] = signing_context 

2047 self.set_request_url(request_dict, request_dict['context']) 

2048 

2049 request_dict['context']['s3_redirected'] = True 

2050 

2051 # Return 0 so it doesn't wait to retry 

2052 return 0 

2053 

2054 def get_bucket_region(self, bucket, response): 

2055 """ 

2056 There are multiple potential sources for the new region to redirect to, 

2057 but they aren't all universally available for use. This will try to 

2058 find region from response elements, but will fall back to calling 

2059 HEAD on the bucket if all else fails. 

2060 

2061 :param bucket: The bucket to find the region for. This is necessary if 

2062 the region is not available in the error response. 

2063 :param response: A response representing a service request that failed 

2064 due to incorrect region configuration. 

2065 """ 

2066 # First try to source the region from the headers. 

2067 service_response = response[1] 

2068 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

2069 if 'x-amz-bucket-region' in response_headers: 

2070 return response_headers['x-amz-bucket-region'] 

2071 

2072 # Next, check the error body 

2073 region = service_response.get('Error', {}).get('Region', None) 

2074 if region is not None: 

2075 return region 

2076 

2077 # Finally, HEAD the bucket. No other choice sadly. 

2078 try: 

2079 response = self._client.head_bucket(Bucket=bucket) 

2080 headers = response['ResponseMetadata']['HTTPHeaders'] 

2081 except ClientError as e: 

2082 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

2083 

2084 region = headers.get('x-amz-bucket-region', None) 

2085 return region 

2086 

2087 def set_request_url(self, params, context, **kwargs): 

2088 endpoint = context.get('signing', {}).get('endpoint', None) 

2089 if endpoint is not None: 

2090 params['url'] = _get_new_endpoint(params['url'], endpoint, False) 

2091 

2092 def redirect_from_cache(self, params, context, **kwargs): 

2093 """ 

2094 This handler retrieves a given bucket's signing context from the cache 

2095 and adds it into the request context. 

2096 """ 

2097 if self._is_s3_accesspoint(context): 

2098 return 

2099 bucket = params.get('Bucket') 

2100 signing_context = self._cache.get(bucket) 

2101 if signing_context is not None: 

2102 context['signing'] = signing_context 

2103 else: 

2104 context['signing'] = {'bucket': bucket} 

2105 

2106 def _is_s3_accesspoint(self, context): 

2107 return 's3_accesspoint' in context 

2108 

2109 

2110class InvalidArnException(ValueError): 

2111 pass 

2112 

2113 

2114class ArnParser: 

2115 def parse_arn(self, arn): 

2116 arn_parts = arn.split(':', 5) 

2117 if len(arn_parts) < 6: 

2118 raise InvalidArnException( 

2119 f'Provided ARN: {arn} must be of the format: ' 

2120 'arn:partition:service:region:account:resource' 

2121 ) 

2122 return { 

2123 'partition': arn_parts[1], 

2124 'service': arn_parts[2], 

2125 'region': arn_parts[3], 

2126 'account': arn_parts[4], 

2127 'resource': arn_parts[5], 

2128 } 

2129 

2130 @staticmethod 

2131 def is_arn(value): 

2132 if not isinstance(value, str) or not value.startswith('arn:'): 

2133 return False 

2134 arn_parser = ArnParser() 

2135 try: 

2136 arn_parser.parse_arn(value) 

2137 return True 

2138 except InvalidArnException: 

2139 return False 

2140 

2141 

2142class S3ArnParamHandler: 

2143 _RESOURCE_REGEX = re.compile( 

2144 r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$' 

2145 ) 

2146 _OUTPOST_RESOURCE_REGEX = re.compile( 

2147 r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]' 

2148 r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)' 

2149 ) 

2150 _BLACKLISTED_OPERATIONS = ['CreateBucket'] 

2151 

2152 def __init__(self, arn_parser=None): 

2153 self._arn_parser = arn_parser 

2154 if arn_parser is None: 

2155 self._arn_parser = ArnParser() 

2156 

2157 def register(self, event_emitter): 

2158 event_emitter.register('before-parameter-build.s3', self.handle_arn) 

2159 

2160 def handle_arn(self, params, model, context, **kwargs): 

2161 if model.name in self._BLACKLISTED_OPERATIONS: 

2162 return 

2163 arn_details = self._get_arn_details_from_bucket_param(params) 

2164 if arn_details is None: 

2165 return 

2166 if arn_details['resource_type'] == 'accesspoint': 

2167 self._store_accesspoint(params, context, arn_details) 

2168 elif arn_details['resource_type'] == 'outpost': 

2169 self._store_outpost(params, context, arn_details) 

2170 

2171 def _get_arn_details_from_bucket_param(self, params): 

2172 if 'Bucket' in params: 

2173 try: 

2174 arn = params['Bucket'] 

2175 arn_details = self._arn_parser.parse_arn(arn) 

2176 self._add_resource_type_and_name(arn, arn_details) 

2177 return arn_details 

2178 except InvalidArnException: 

2179 pass 

2180 return None 

2181 

2182 def _add_resource_type_and_name(self, arn, arn_details): 

2183 match = self._RESOURCE_REGEX.match(arn_details['resource']) 

2184 if match: 

2185 arn_details['resource_type'] = match.group('resource_type') 

2186 arn_details['resource_name'] = match.group('resource_name') 

2187 else: 

2188 raise UnsupportedS3ArnError(arn=arn) 

2189 

2190 def _store_accesspoint(self, params, context, arn_details): 

2191 # Ideally the access-point would be stored as a parameter in the 

2192 # request where the serializer would then know how to serialize it, 

2193 # but access-points are not modeled in S3 operations so it would fail 

2194 # validation. Instead, we set the access-point to the bucket parameter 

2195 # to have some value set when serializing the request and additional 

2196 # information on the context from the arn to use in forming the 

2197 # access-point endpoint. 

2198 params['Bucket'] = arn_details['resource_name'] 

2199 context['s3_accesspoint'] = { 

2200 'name': arn_details['resource_name'], 

2201 'account': arn_details['account'], 

2202 'partition': arn_details['partition'], 

2203 'region': arn_details['region'], 

2204 'service': arn_details['service'], 

2205 } 

2206 

2207 def _store_outpost(self, params, context, arn_details): 

2208 resource_name = arn_details['resource_name'] 

2209 match = self._OUTPOST_RESOURCE_REGEX.match(resource_name) 

2210 if not match: 

2211 raise UnsupportedOutpostResourceError(resource_name=resource_name) 

2212 # Because we need to set the bucket name to something to pass 

2213 # validation we're going to use the access point name to be consistent 

2214 # with normal access point arns. 

2215 accesspoint_name = match.group('accesspoint_name') 

2216 params['Bucket'] = accesspoint_name 

2217 context['s3_accesspoint'] = { 

2218 'outpost_name': match.group('outpost_name'), 

2219 'name': accesspoint_name, 

2220 'account': arn_details['account'], 

2221 'partition': arn_details['partition'], 

2222 'region': arn_details['region'], 

2223 'service': arn_details['service'], 

2224 } 

2225 

2226 

2227class S3EndpointSetter: 

2228 _DEFAULT_PARTITION = 'aws' 

2229 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2230 

2231 def __init__( 

2232 self, 

2233 endpoint_resolver, 

2234 region=None, 

2235 s3_config=None, 

2236 endpoint_url=None, 

2237 partition=None, 

2238 use_fips_endpoint=False, 

2239 ): 

2240 # This is calling the endpoint_resolver in regions.py 

2241 self._endpoint_resolver = endpoint_resolver 

2242 self._region = region 

2243 self._s3_config = s3_config 

2244 self._use_fips_endpoint = use_fips_endpoint 

2245 if s3_config is None: 

2246 self._s3_config = {} 

2247 self._endpoint_url = endpoint_url 

2248 self._partition = partition 

2249 if partition is None: 

2250 self._partition = self._DEFAULT_PARTITION 

2251 

2252 def register(self, event_emitter): 

2253 event_emitter.register('before-sign.s3', self.set_endpoint) 

2254 event_emitter.register('choose-signer.s3', self.set_signer) 

2255 event_emitter.register( 

2256 'before-call.s3.WriteGetObjectResponse', 

2257 self.update_endpoint_to_s3_object_lambda, 

2258 ) 

2259 

2260 def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs): 

2261 if self._use_accelerate_endpoint: 

2262 raise UnsupportedS3ConfigurationError( 

2263 msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations', 

2264 ) 

2265 

2266 self._override_signing_name(context, 's3-object-lambda') 

2267 if self._endpoint_url: 

2268 # Only update the url if an explicit url was not provided 

2269 return 

2270 

2271 resolver = self._endpoint_resolver 

2272 # Constructing endpoints as s3-object-lambda as region 

2273 resolved = resolver.construct_endpoint( 

2274 's3-object-lambda', self._region 

2275 ) 

2276 

2277 # Ideally we would be able to replace the endpoint before 

2278 # serialization but there's no event to do that currently 

2279 # host_prefix is all the arn/bucket specs 

2280 new_endpoint = 'https://{host_prefix}{hostname}'.format( 

2281 host_prefix=params['host_prefix'], 

2282 hostname=resolved['hostname'], 

2283 ) 

2284 

2285 params['url'] = _get_new_endpoint(params['url'], new_endpoint, False) 

2286 

2287 def set_endpoint(self, request, **kwargs): 

2288 if self._use_accesspoint_endpoint(request): 

2289 self._validate_accesspoint_supported(request) 

2290 self._validate_fips_supported(request) 

2291 self._validate_global_regions(request) 

2292 region_name = self._resolve_region_for_accesspoint_endpoint( 

2293 request 

2294 ) 

2295 self._resolve_signing_name_for_accesspoint_endpoint(request) 

2296 self._switch_to_accesspoint_endpoint(request, region_name) 

2297 return 

2298 if self._use_accelerate_endpoint: 

2299 if self._use_fips_endpoint: 

2300 raise UnsupportedS3ConfigurationError( 

2301 msg=( 

2302 'Client is configured to use the FIPS psuedo region ' 

2303 f'for "{self._region}", but S3 Accelerate does not have any FIPS ' 

2304 'compatible endpoints.' 

2305 ) 

2306 ) 

2307 switch_host_s3_accelerate(request=request, **kwargs) 

2308 if self._s3_addressing_handler: 

2309 self._s3_addressing_handler(request=request, **kwargs) 

2310 

2311 def _use_accesspoint_endpoint(self, request): 

2312 return 's3_accesspoint' in request.context 

2313 

2314 def _validate_fips_supported(self, request): 

2315 if not self._use_fips_endpoint: 

2316 return 

2317 if 'fips' in request.context['s3_accesspoint']['region']: 

2318 raise UnsupportedS3AccesspointConfigurationError( 

2319 msg={'Invalid ARN, FIPS region not allowed in ARN.'} 

2320 ) 

2321 if 'outpost_name' in request.context['s3_accesspoint']: 

2322 raise UnsupportedS3AccesspointConfigurationError( 

2323 msg=( 

2324 f'Client is configured to use the FIPS psuedo-region "{self._region}", ' 

2325 'but outpost ARNs do not support FIPS endpoints.' 

2326 ) 

2327 ) 

2328 # Transforming psuedo region to actual region 

2329 accesspoint_region = request.context['s3_accesspoint']['region'] 

2330 if accesspoint_region != self._region: 

2331 if not self._s3_config.get('use_arn_region', True): 

2332 # TODO: Update message to reflect use_arn_region 

2333 # is not set 

2334 raise UnsupportedS3AccesspointConfigurationError( 

2335 msg=( 

2336 'Client is configured to use the FIPS psuedo-region ' 

2337 f'for "{self._region}", but the access-point ARN provided is for ' 

2338 f'the "{accesspoint_region}" region. For clients using a FIPS ' 

2339 'psuedo-region calls to access-point ARNs in another ' 

2340 'region are not allowed.' 

2341 ) 

2342 ) 

2343 

2344 def _validate_global_regions(self, request): 

2345 if self._s3_config.get('use_arn_region', True): 

2346 return 

2347 if self._region in ['aws-global', 's3-external-1']: 

2348 raise UnsupportedS3AccesspointConfigurationError( 

2349 msg=( 

2350 'Client is configured to use the global psuedo-region ' 

2351 f'"{self._region}". When providing access-point ARNs a regional ' 

2352 'endpoint must be specified.' 

2353 ) 

2354 ) 

2355 

2356 def _validate_accesspoint_supported(self, request): 

2357 if self._use_accelerate_endpoint: 

2358 raise UnsupportedS3AccesspointConfigurationError( 

2359 msg=( 

2360 'Client does not support s3 accelerate configuration ' 

2361 'when an access-point ARN is specified.' 

2362 ) 

2363 ) 

2364 request_partition = request.context['s3_accesspoint']['partition'] 

2365 if request_partition != self._partition: 

2366 raise UnsupportedS3AccesspointConfigurationError( 

2367 msg=( 

2368 f'Client is configured for "{self._partition}" partition, but access-point' 

2369 f' ARN provided is for "{request_partition}" partition. The client and ' 

2370 ' access-point partition must be the same.' 

2371 ) 

2372 ) 

2373 s3_service = request.context['s3_accesspoint'].get('service') 

2374 if s3_service == 's3-object-lambda' and self._s3_config.get( 

2375 'use_dualstack_endpoint' 

2376 ): 

2377 raise UnsupportedS3AccesspointConfigurationError( 

2378 msg=( 

2379 'Client does not support s3 dualstack configuration ' 

2380 'when an S3 Object Lambda access point ARN is specified.' 

2381 ) 

2382 ) 

2383 outpost_name = request.context['s3_accesspoint'].get('outpost_name') 

2384 if outpost_name and self._s3_config.get('use_dualstack_endpoint'): 

2385 raise UnsupportedS3AccesspointConfigurationError( 

2386 msg=( 

2387 'Client does not support s3 dualstack configuration ' 

2388 'when an outpost ARN is specified.' 

2389 ) 

2390 ) 

2391 self._validate_mrap_s3_config(request) 

2392 

2393 def _validate_mrap_s3_config(self, request): 

2394 if not is_global_accesspoint(request.context): 

2395 return 

2396 if self._s3_config.get('s3_disable_multiregion_access_points'): 

2397 raise UnsupportedS3AccesspointConfigurationError( 

2398 msg=( 

2399 'Invalid configuration, Multi-Region Access Point ' 

2400 'ARNs are disabled.' 

2401 ) 

2402 ) 

2403 elif self._s3_config.get('use_dualstack_endpoint'): 

2404 raise UnsupportedS3AccesspointConfigurationError( 

2405 msg=( 

2406 'Client does not support s3 dualstack configuration ' 

2407 'when a Multi-Region Access Point ARN is specified.' 

2408 ) 

2409 ) 

2410 

2411 def _resolve_region_for_accesspoint_endpoint(self, request): 

2412 if is_global_accesspoint(request.context): 

2413 # Requests going to MRAP endpoints MUST be set to any (*) region. 

2414 self._override_signing_region(request, '*') 

2415 elif self._s3_config.get('use_arn_region', True): 

2416 accesspoint_region = request.context['s3_accesspoint']['region'] 

2417 # If we are using the region from the access point, 

2418 # we will also want to make sure that we set it as the 

2419 # signing region as well 

2420 self._override_signing_region(request, accesspoint_region) 

2421 return accesspoint_region 

2422 return self._region 

2423 

2424 def set_signer(self, context, **kwargs): 

2425 if is_global_accesspoint(context): 

2426 if HAS_CRT: 

2427 return 's3v4a' 

2428 else: 

2429 raise MissingDependencyException( 

2430 msg="Using S3 with an MRAP arn requires an additional " 

2431 "dependency. You will need to pip install " 

2432 "botocore[crt] before proceeding." 

2433 ) 

2434 

2435 def _resolve_signing_name_for_accesspoint_endpoint(self, request): 

2436 accesspoint_service = request.context['s3_accesspoint']['service'] 

2437 self._override_signing_name(request.context, accesspoint_service) 

2438 

2439 def _switch_to_accesspoint_endpoint(self, request, region_name): 

2440 original_components = urlsplit(request.url) 

2441 accesspoint_endpoint = urlunsplit( 

2442 ( 

2443 original_components.scheme, 

2444 self._get_netloc(request.context, region_name), 

2445 self._get_accesspoint_path( 

2446 original_components.path, request.context 

2447 ), 

2448 original_components.query, 

2449 '', 

2450 ) 

2451 ) 

2452 logger.debug( 

2453 f'Updating URI from {request.url} to {accesspoint_endpoint}' 

2454 ) 

2455 request.url = accesspoint_endpoint 

2456 

2457 def _get_netloc(self, request_context, region_name): 

2458 if is_global_accesspoint(request_context): 

2459 return self._get_mrap_netloc(request_context) 

2460 else: 

2461 return self._get_accesspoint_netloc(request_context, region_name) 

2462 

2463 def _get_mrap_netloc(self, request_context): 

2464 s3_accesspoint = request_context['s3_accesspoint'] 

2465 region_name = 's3-global' 

2466 mrap_netloc_components = [s3_accesspoint['name']] 

2467 if self._endpoint_url: 

2468 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2469 mrap_netloc_components.append(endpoint_url_netloc) 

2470 else: 

2471 partition = s3_accesspoint['partition'] 

2472 mrap_netloc_components.extend( 

2473 [ 

2474 'accesspoint', 

2475 region_name, 

2476 self._get_partition_dns_suffix(partition), 

2477 ] 

2478 ) 

2479 return '.'.join(mrap_netloc_components) 

2480 

2481 def _get_accesspoint_netloc(self, request_context, region_name): 

2482 s3_accesspoint = request_context['s3_accesspoint'] 

2483 accesspoint_netloc_components = [ 

2484 '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']), 

2485 ] 

2486 outpost_name = s3_accesspoint.get('outpost_name') 

2487 if self._endpoint_url: 

2488 if outpost_name: 

2489 accesspoint_netloc_components.append(outpost_name) 

2490 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2491 accesspoint_netloc_components.append(endpoint_url_netloc) 

2492 else: 

2493 if outpost_name: 

2494 outpost_host = [outpost_name, 's3-outposts'] 

2495 accesspoint_netloc_components.extend(outpost_host) 

2496 elif s3_accesspoint['service'] == 's3-object-lambda': 

2497 component = self._inject_fips_if_needed( 

2498 's3-object-lambda', request_context 

2499 ) 

2500 accesspoint_netloc_components.append(component) 

2501 else: 

2502 component = self._inject_fips_if_needed( 

2503 's3-accesspoint', request_context 

2504 ) 

2505 accesspoint_netloc_components.append(component) 

2506 if self._s3_config.get('use_dualstack_endpoint'): 

2507 accesspoint_netloc_components.append('dualstack') 

2508 accesspoint_netloc_components.extend( 

2509 [region_name, self._get_dns_suffix(region_name)] 

2510 ) 

2511 return '.'.join(accesspoint_netloc_components) 

2512 

2513 def _inject_fips_if_needed(self, component, request_context): 

2514 if self._use_fips_endpoint: 

2515 return f'{component}-fips' 

2516 return component 

2517 

2518 def _get_accesspoint_path(self, original_path, request_context): 

2519 # The Bucket parameter was substituted with the access-point name as 

2520 # some value was required in serializing the bucket name. Now that 

2521 # we are making the request directly to the access point, we will 

2522 # want to remove that access-point name from the path. 

2523 name = request_context['s3_accesspoint']['name'] 

2524 # All S3 operations require at least a / in their path. 

2525 return original_path.replace('/' + name, '', 1) or '/' 

2526 

2527 def _get_partition_dns_suffix(self, partition_name): 

2528 dns_suffix = self._endpoint_resolver.get_partition_dns_suffix( 

2529 partition_name 

2530 ) 

2531 if dns_suffix is None: 

2532 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2533 return dns_suffix 

2534 

2535 def _get_dns_suffix(self, region_name): 

2536 resolved = self._endpoint_resolver.construct_endpoint( 

2537 's3', region_name 

2538 ) 

2539 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2540 if resolved and 'dnsSuffix' in resolved: 

2541 dns_suffix = resolved['dnsSuffix'] 

2542 return dns_suffix 

2543 

2544 def _override_signing_region(self, request, region_name): 

2545 signing_context = request.context.get('signing', {}) 

2546 # S3SigV4Auth will use the context['signing']['region'] value to 

2547 # sign with if present. This is used by the Bucket redirector 

2548 # as well but we should be fine because the redirector is never 

2549 # used in combination with the accesspoint setting logic. 

2550 signing_context['region'] = region_name 

2551 request.context['signing'] = signing_context 

2552 

2553 def _override_signing_name(self, context, signing_name): 

2554 signing_context = context.get('signing', {}) 

2555 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2556 # sign with if present. This is used by the Bucket redirector 

2557 # as well but we should be fine because the redirector is never 

2558 # used in combination with the accesspoint setting logic. 

2559 signing_context['signing_name'] = signing_name 

2560 context['signing'] = signing_context 

2561 

2562 @CachedProperty 

2563 def _use_accelerate_endpoint(self): 

2564 # Enable accelerate if the configuration is set to to true or the 

2565 # endpoint being used matches one of the accelerate endpoints. 

2566 

2567 # Accelerate has been explicitly configured. 

2568 if self._s3_config.get('use_accelerate_endpoint'): 

2569 return True 

2570 

2571 # Accelerate mode is turned on automatically if an endpoint url is 

2572 # provided that matches the accelerate scheme. 

2573 if self._endpoint_url is None: 

2574 return False 

2575 

2576 # Accelerate is only valid for Amazon endpoints. 

2577 netloc = urlsplit(self._endpoint_url).netloc 

2578 if not netloc.endswith('amazonaws.com'): 

2579 return False 

2580 

2581 # The first part of the url should always be s3-accelerate. 

2582 parts = netloc.split('.') 

2583 if parts[0] != 's3-accelerate': 

2584 return False 

2585 

2586 # Url parts between 's3-accelerate' and 'amazonaws.com' which 

2587 # represent different url features. 

2588 feature_parts = parts[1:-2] 

2589 

2590 # There should be no duplicate url parts. 

2591 if len(feature_parts) != len(set(feature_parts)): 

2592 return False 

2593 

2594 # Remaining parts must all be in the whitelist. 

2595 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

2596 

2597 @CachedProperty 

2598 def _addressing_style(self): 

2599 # Use virtual host style addressing if accelerate is enabled or if 

2600 # the given endpoint url is an accelerate endpoint. 

2601 if self._use_accelerate_endpoint: 

2602 return 'virtual' 

2603 

2604 # If a particular addressing style is configured, use it. 

2605 configured_addressing_style = self._s3_config.get('addressing_style') 

2606 if configured_addressing_style: 

2607 return configured_addressing_style 

2608 

2609 @CachedProperty 

2610 def _s3_addressing_handler(self): 

2611 # If virtual host style was configured, use it regardless of whether 

2612 # or not the bucket looks dns compatible. 

2613 if self._addressing_style == 'virtual': 

2614 logger.debug("Using S3 virtual host style addressing.") 

2615 return switch_to_virtual_host_style 

2616 

2617 # If path style is configured, no additional steps are needed. If 

2618 # endpoint_url was specified, don't default to virtual. We could 

2619 # potentially default provided endpoint urls to virtual hosted 

2620 # style, but for now it is avoided. 

2621 if self._addressing_style == 'path' or self._endpoint_url is not None: 

2622 logger.debug("Using S3 path style addressing.") 

2623 return None 

2624 

2625 logger.debug( 

2626 "Defaulting to S3 virtual host style addressing with " 

2627 "path style addressing fallback." 

2628 ) 

2629 

2630 # By default, try to use virtual style with path fallback. 

2631 return fix_s3_host 

2632 

2633 

2634class S3ControlEndpointSetter: 

2635 _DEFAULT_PARTITION = 'aws' 

2636 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2637 _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$') 

2638 

2639 def __init__( 

2640 self, 

2641 endpoint_resolver, 

2642 region=None, 

2643 s3_config=None, 

2644 endpoint_url=None, 

2645 partition=None, 

2646 use_fips_endpoint=False, 

2647 ): 

2648 self._endpoint_resolver = endpoint_resolver 

2649 self._region = region 

2650 self._s3_config = s3_config 

2651 self._use_fips_endpoint = use_fips_endpoint 

2652 if s3_config is None: 

2653 self._s3_config = {} 

2654 self._endpoint_url = endpoint_url 

2655 self._partition = partition 

2656 if partition is None: 

2657 self._partition = self._DEFAULT_PARTITION 

2658 

2659 def register(self, event_emitter): 

2660 event_emitter.register('before-sign.s3-control', self.set_endpoint) 

2661 

2662 def set_endpoint(self, request, **kwargs): 

2663 if self._use_endpoint_from_arn_details(request): 

2664 self._validate_endpoint_from_arn_details_supported(request) 

2665 region_name = self._resolve_region_from_arn_details(request) 

2666 self._resolve_signing_name_from_arn_details(request) 

2667 self._resolve_endpoint_from_arn_details(request, region_name) 

2668 self._add_headers_from_arn_details(request) 

2669 elif self._use_endpoint_from_outpost_id(request): 

2670 self._validate_outpost_redirection_valid(request) 

2671 self._override_signing_name(request, 's3-outposts') 

2672 new_netloc = self._construct_outpost_endpoint(self._region) 

2673 self._update_request_netloc(request, new_netloc) 

2674 

2675 def _use_endpoint_from_arn_details(self, request): 

2676 return 'arn_details' in request.context 

2677 

2678 def _use_endpoint_from_outpost_id(self, request): 

2679 return 'outpost_id' in request.context 

2680 

2681 def _validate_endpoint_from_arn_details_supported(self, request): 

2682 if 'fips' in request.context['arn_details']['region']: 

2683 raise UnsupportedS3ControlArnError( 

2684 arn=request.context['arn_details']['original'], 

2685 msg='Invalid ARN, FIPS region not allowed in ARN.', 

2686 ) 

2687 if not self._s3_config.get('use_arn_region', False): 

2688 arn_region = request.context['arn_details']['region'] 

2689 if arn_region != self._region: 

2690 error_msg = ( 

2691 'The use_arn_region configuration is disabled but ' 

2692 f'received arn for "{arn_region}" when the client is configured ' 

2693 f'to use "{self._region}"' 

2694 ) 

2695 raise UnsupportedS3ControlConfigurationError(msg=error_msg) 

2696 request_partion = request.context['arn_details']['partition'] 

2697 if request_partion != self._partition: 

2698 raise UnsupportedS3ControlConfigurationError( 

2699 msg=( 

2700 f'Client is configured for "{self._partition}" partition, but arn ' 

2701 f'provided is for "{request_partion}" partition. The client and ' 

2702 'arn partition must be the same.' 

2703 ) 

2704 ) 

2705 if self._s3_config.get('use_accelerate_endpoint'): 

2706 raise UnsupportedS3ControlConfigurationError( 

2707 msg='S3 control client does not support accelerate endpoints', 

2708 ) 

2709 if 'outpost_name' in request.context['arn_details']: 

2710 self._validate_outpost_redirection_valid(request) 

2711 

2712 def _validate_outpost_redirection_valid(self, request): 

2713 if self._s3_config.get('use_dualstack_endpoint'): 

2714 raise UnsupportedS3ControlConfigurationError( 

2715 msg=( 

2716 'Client does not support s3 dualstack configuration ' 

2717 'when an outpost is specified.' 

2718 ) 

2719 ) 

2720 

2721 def _resolve_region_from_arn_details(self, request): 

2722 if self._s3_config.get('use_arn_region', False): 

2723 arn_region = request.context['arn_details']['region'] 

2724 # If we are using the region from the expanded arn, we will also 

2725 # want to make sure that we set it as the signing region as well 

2726 self._override_signing_region(request, arn_region) 

2727 return arn_region 

2728 return self._region 

2729 

2730 def _resolve_signing_name_from_arn_details(self, request): 

2731 arn_service = request.context['arn_details']['service'] 

2732 self._override_signing_name(request, arn_service) 

2733 return arn_service 

2734 

2735 def _resolve_endpoint_from_arn_details(self, request, region_name): 

2736 new_netloc = self._resolve_netloc_from_arn_details( 

2737 request, region_name 

2738 ) 

2739 self._update_request_netloc(request, new_netloc) 

2740 

2741 def _update_request_netloc(self, request, new_netloc): 

2742 original_components = urlsplit(request.url) 

2743 arn_details_endpoint = urlunsplit( 

2744 ( 

2745 original_components.scheme, 

2746 new_netloc, 

2747 original_components.path, 

2748 original_components.query, 

2749 '', 

2750 ) 

2751 ) 

2752 logger.debug( 

2753 f'Updating URI from {request.url} to {arn_details_endpoint}' 

2754 ) 

2755 request.url = arn_details_endpoint 

2756 

2757 def _resolve_netloc_from_arn_details(self, request, region_name): 

2758 arn_details = request.context['arn_details'] 

2759 if 'outpost_name' in arn_details: 

2760 return self._construct_outpost_endpoint(region_name) 

2761 account = arn_details['account'] 

2762 return self._construct_s3_control_endpoint(region_name, account) 

2763 

2764 def _is_valid_host_label(self, label): 

2765 return self._HOST_LABEL_REGEX.match(label) 

2766 

2767 def _validate_host_labels(self, *labels): 

2768 for label in labels: 

2769 if not self._is_valid_host_label(label): 

2770 raise InvalidHostLabelError(label=label) 

2771 

2772 def _construct_s3_control_endpoint(self, region_name, account): 

2773 self._validate_host_labels(region_name, account) 

2774 if self._endpoint_url: 

2775 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2776 netloc = [account, endpoint_url_netloc] 

2777 else: 

2778 netloc = [ 

2779 account, 

2780 's3-control', 

2781 ] 

2782 self._add_dualstack(netloc) 

2783 dns_suffix = self._get_dns_suffix(region_name) 

2784 netloc.extend([region_name, dns_suffix]) 

2785 return self._construct_netloc(netloc) 

2786 

2787 def _construct_outpost_endpoint(self, region_name): 

2788 self._validate_host_labels(region_name) 

2789 if self._endpoint_url: 

2790 return urlsplit(self._endpoint_url).netloc 

2791 else: 

2792 netloc = [ 

2793 's3-outposts', 

2794 region_name, 

2795 self._get_dns_suffix(region_name), 

2796 ] 

2797 self._add_fips(netloc) 

2798 return self._construct_netloc(netloc) 

2799 

2800 def _construct_netloc(self, netloc): 

2801 return '.'.join(netloc) 

2802 

2803 def _add_fips(self, netloc): 

2804 if self._use_fips_endpoint: 

2805 netloc[0] = netloc[0] + '-fips' 

2806 

2807 def _add_dualstack(self, netloc): 

2808 if self._s3_config.get('use_dualstack_endpoint'): 

2809 netloc.append('dualstack') 

2810 

2811 def _get_dns_suffix(self, region_name): 

2812 resolved = self._endpoint_resolver.construct_endpoint( 

2813 's3', region_name 

2814 ) 

2815 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2816 if resolved and 'dnsSuffix' in resolved: 

2817 dns_suffix = resolved['dnsSuffix'] 

2818 return dns_suffix 

2819 

2820 def _override_signing_region(self, request, region_name): 

2821 signing_context = request.context.get('signing', {}) 

2822 # S3SigV4Auth will use the context['signing']['region'] value to 

2823 # sign with if present. This is used by the Bucket redirector 

2824 # as well but we should be fine because the redirector is never 

2825 # used in combination with the accesspoint setting logic. 

2826 signing_context['region'] = region_name 

2827 request.context['signing'] = signing_context 

2828 

2829 def _override_signing_name(self, request, signing_name): 

2830 signing_context = request.context.get('signing', {}) 

2831 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2832 # sign with if present. This is used by the Bucket redirector 

2833 # as well but we should be fine because the redirector is never 

2834 # used in combination with the accesspoint setting logic. 

2835 signing_context['signing_name'] = signing_name 

2836 request.context['signing'] = signing_context 

2837 

2838 def _add_headers_from_arn_details(self, request): 

2839 arn_details = request.context['arn_details'] 

2840 outpost_name = arn_details.get('outpost_name') 

2841 if outpost_name: 

2842 self._add_outpost_id_header(request, outpost_name) 

2843 

2844 def _add_outpost_id_header(self, request, outpost_name): 

2845 request.headers['x-amz-outpost-id'] = outpost_name 

2846 

2847 

2848class S3ControlArnParamHandler: 

2849 """This handler has been replaced by S3ControlArnParamHandlerv2. The 

2850 original version remains in place for any third-party importers. 

2851 """ 

2852 

2853 _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]') 

2854 

2855 def __init__(self, arn_parser=None): 

2856 self._arn_parser = arn_parser 

2857 if arn_parser is None: 

2858 self._arn_parser = ArnParser() 

2859 warnings.warn( 

2860 'The S3ControlArnParamHandler class has been deprecated for a new ' 

2861 'internal replacement. A future version of botocore may remove ' 

2862 'this class.', 

2863 category=FutureWarning, 

2864 ) 

2865 

2866 def register(self, event_emitter): 

2867 event_emitter.register( 

2868 'before-parameter-build.s3-control', 

2869 self.handle_arn, 

2870 ) 

2871 

2872 def handle_arn(self, params, model, context, **kwargs): 

2873 if model.name in ('CreateBucket', 'ListRegionalBuckets'): 

2874 # CreateBucket and ListRegionalBuckets are special cases that do 

2875 # not obey ARN based redirection but will redirect based off of the 

2876 # presence of the OutpostId parameter 

2877 self._handle_outpost_id_param(params, model, context) 

2878 else: 

2879 self._handle_name_param(params, model, context) 

2880 self._handle_bucket_param(params, model, context) 

2881 

2882 def _get_arn_details_from_param(self, params, param_name): 

2883 if param_name not in params: 

2884 return None 

2885 try: 

2886 arn = params[param_name] 

2887 arn_details = self._arn_parser.parse_arn(arn) 

2888 arn_details['original'] = arn 

2889 arn_details['resources'] = self._split_resource(arn_details) 

2890 return arn_details 

2891 except InvalidArnException: 

2892 return None 

2893 

2894 def _split_resource(self, arn_details): 

2895 return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource']) 

2896 

2897 def _override_account_id_param(self, params, arn_details): 

2898 account_id = arn_details['account'] 

2899 if 'AccountId' in params and params['AccountId'] != account_id: 

2900 error_msg = ( 

2901 'Account ID in arn does not match the AccountId parameter ' 

2902 'provided: "{}"' 

2903 ).format(params['AccountId']) 

2904 raise UnsupportedS3ControlArnError( 

2905 arn=arn_details['original'], 

2906 msg=error_msg, 

2907 ) 

2908 params['AccountId'] = account_id 

2909 

2910 def _handle_outpost_id_param(self, params, model, context): 

2911 if 'OutpostId' not in params: 

2912 return 

2913 context['outpost_id'] = params['OutpostId'] 

2914 

2915 def _handle_name_param(self, params, model, context): 

2916 # CreateAccessPoint is a special case that does not expand Name 

2917 if model.name == 'CreateAccessPoint': 

2918 return 

2919 arn_details = self._get_arn_details_from_param(params, 'Name') 

2920 if arn_details is None: 

2921 return 

2922 if self._is_outpost_accesspoint(arn_details): 

2923 self._store_outpost_accesspoint(params, context, arn_details) 

2924 else: 

2925 error_msg = 'The Name parameter does not support the provided ARN' 

2926 raise UnsupportedS3ControlArnError( 

2927 arn=arn_details['original'], 

2928 msg=error_msg, 

2929 ) 

2930 

2931 def _is_outpost_accesspoint(self, arn_details): 

2932 if arn_details['service'] != 's3-outposts': 

2933 return False 

2934 resources = arn_details['resources'] 

2935 if len(resources) != 4: 

2936 return False 

2937 # Resource must be of the form outpost/op-123/accesspoint/name 

2938 return resources[0] == 'outpost' and resources[2] == 'accesspoint' 

2939 

2940 def _store_outpost_accesspoint(self, params, context, arn_details): 

2941 self._override_account_id_param(params, arn_details) 

2942 accesspoint_name = arn_details['resources'][3] 

2943 params['Name'] = accesspoint_name 

2944 arn_details['accesspoint_name'] = accesspoint_name 

2945 arn_details['outpost_name'] = arn_details['resources'][1] 

2946 context['arn_details'] = arn_details 

2947 

2948 def _handle_bucket_param(self, params, model, context): 

2949 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

2950 if arn_details is None: 

2951 return 

2952 if self._is_outpost_bucket(arn_details): 

2953 self._store_outpost_bucket(params, context, arn_details) 

2954 else: 

2955 error_msg = ( 

2956 'The Bucket parameter does not support the provided ARN' 

2957 ) 

2958 raise UnsupportedS3ControlArnError( 

2959 arn=arn_details['original'], 

2960 msg=error_msg, 

2961 ) 

2962 

2963 def _is_outpost_bucket(self, arn_details): 

2964 if arn_details['service'] != 's3-outposts': 

2965 return False 

2966 resources = arn_details['resources'] 

2967 if len(resources) != 4: 

2968 return False 

2969 # Resource must be of the form outpost/op-123/bucket/name 

2970 return resources[0] == 'outpost' and resources[2] == 'bucket' 

2971 

2972 def _store_outpost_bucket(self, params, context, arn_details): 

2973 self._override_account_id_param(params, arn_details) 

2974 bucket_name = arn_details['resources'][3] 

2975 params['Bucket'] = bucket_name 

2976 arn_details['bucket_name'] = bucket_name 

2977 arn_details['outpost_name'] = arn_details['resources'][1] 

2978 context['arn_details'] = arn_details 

2979 

2980 

2981class S3ControlArnParamHandlerv2(S3ControlArnParamHandler): 

2982 """Updated version of S3ControlArnParamHandler for use when 

2983 EndpointRulesetResolver is in use for endpoint resolution. 

2984 

2985 This class is considered private and subject to abrupt breaking changes or 

2986 removal without prior announcement. Please do not use it directly. 

2987 """ 

2988 

2989 def __init__(self, arn_parser=None): 

2990 self._arn_parser = arn_parser 

2991 if arn_parser is None: 

2992 self._arn_parser = ArnParser() 

2993 

2994 def register(self, event_emitter): 

2995 event_emitter.register( 

2996 'before-endpoint-resolution.s3-control', 

2997 self.handle_arn, 

2998 ) 

2999 

3000 def _handle_name_param(self, params, model, context): 

3001 # CreateAccessPoint is a special case that does not expand Name 

3002 if model.name == 'CreateAccessPoint': 

3003 return 

3004 arn_details = self._get_arn_details_from_param(params, 'Name') 

3005 if arn_details is None: 

3006 return 

3007 self._raise_for_fips_pseudo_region(arn_details) 

3008 self._raise_for_accelerate_endpoint(context) 

3009 if self._is_outpost_accesspoint(arn_details): 

3010 self._store_outpost_accesspoint(params, context, arn_details) 

3011 else: 

3012 error_msg = 'The Name parameter does not support the provided ARN' 

3013 raise UnsupportedS3ControlArnError( 

3014 arn=arn_details['original'], 

3015 msg=error_msg, 

3016 ) 

3017 

3018 def _store_outpost_accesspoint(self, params, context, arn_details): 

3019 self._override_account_id_param(params, arn_details) 

3020 

3021 def _handle_bucket_param(self, params, model, context): 

3022 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

3023 if arn_details is None: 

3024 return 

3025 self._raise_for_fips_pseudo_region(arn_details) 

3026 self._raise_for_accelerate_endpoint(context) 

3027 if self._is_outpost_bucket(arn_details): 

3028 self._store_outpost_bucket(params, context, arn_details) 

3029 else: 

3030 error_msg = ( 

3031 'The Bucket parameter does not support the provided ARN' 

3032 ) 

3033 raise UnsupportedS3ControlArnError( 

3034 arn=arn_details['original'], 

3035 msg=error_msg, 

3036 ) 

3037 

3038 def _store_outpost_bucket(self, params, context, arn_details): 

3039 self._override_account_id_param(params, arn_details) 

3040 

3041 def _raise_for_fips_pseudo_region(self, arn_details): 

3042 # FIPS pseudo region names cannot be used in ARNs 

3043 arn_region = arn_details['region'] 

3044 if arn_region.startswith('fips-') or arn_region.endswith('fips-'): 

3045 raise UnsupportedS3ControlArnError( 

3046 arn=arn_details['original'], 

3047 msg='Invalid ARN, FIPS region not allowed in ARN.', 

3048 ) 

3049 

3050 def _raise_for_accelerate_endpoint(self, context): 

3051 s3_config = context['client_config'].s3 or {} 

3052 if s3_config.get('use_accelerate_endpoint'): 

3053 raise UnsupportedS3ControlConfigurationError( 

3054 msg='S3 control client does not support accelerate endpoints', 

3055 ) 

3056 

3057 

3058class ContainerMetadataFetcher: 

3059 TIMEOUT_SECONDS = 2 

3060 RETRY_ATTEMPTS = 3 

3061 SLEEP_TIME = 1 

3062 IP_ADDRESS = '169.254.170.2' 

3063 _ALLOWED_HOSTS = [ 

3064 IP_ADDRESS, 

3065 '169.254.170.23', 

3066 'fd00:ec2::23', 

3067 'localhost', 

3068 ] 

3069 

3070 def __init__(self, session=None, sleep=time.sleep): 

3071 if session is None: 

3072 session = botocore.httpsession.URLLib3Session( 

3073 timeout=self.TIMEOUT_SECONDS 

3074 ) 

3075 self._session = session 

3076 self._sleep = sleep 

3077 

3078 def retrieve_full_uri(self, full_url, headers=None): 

3079 """Retrieve JSON metadata from container metadata. 

3080 

3081 :type full_url: str 

3082 :param full_url: The full URL of the metadata service. 

3083 This should include the scheme as well, e.g 

3084 "http://localhost:123/foo" 

3085 

3086 """ 

3087 self._validate_allowed_url(full_url) 

3088 return self._retrieve_credentials(full_url, headers) 

3089 

3090 def _validate_allowed_url(self, full_url): 

3091 parsed = botocore.compat.urlparse(full_url) 

3092 if self._is_loopback_address(parsed.hostname): 

3093 return 

3094 is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname) 

3095 if not is_whitelisted_host: 

3096 raise ValueError( 

3097 f"Unsupported host '{parsed.hostname}'. Can only retrieve metadata " 

3098 f"from a loopback address or one of these hosts: {', '.join(self._ALLOWED_HOSTS)}" 

3099 ) 

3100 

3101 def _is_loopback_address(self, hostname): 

3102 try: 

3103 ip = ip_address(hostname) 

3104 return ip.is_loopback 

3105 except ValueError: 

3106 return False 

3107 

3108 def _check_if_whitelisted_host(self, host): 

3109 if host in self._ALLOWED_HOSTS: 

3110 return True 

3111 return False 

3112 

3113 def retrieve_uri(self, relative_uri): 

3114 """Retrieve JSON metadata from container metadata. 

3115 

3116 :type relative_uri: str 

3117 :param relative_uri: A relative URI, e.g "/foo/bar?id=123" 

3118 

3119 :return: The parsed JSON response. 

3120 

3121 """ 

3122 full_url = self.full_url(relative_uri) 

3123 return self._retrieve_credentials(full_url) 

3124 

3125 def _retrieve_credentials(self, full_url, extra_headers=None): 

3126 headers = {'Accept': 'application/json'} 

3127 if extra_headers is not None: 

3128 headers.update(extra_headers) 

3129 attempts = 0 

3130 while True: 

3131 try: 

3132 return self._get_response( 

3133 full_url, headers, self.TIMEOUT_SECONDS 

3134 ) 

3135 except MetadataRetrievalError as e: 

3136 logger.debug( 

3137 "Received error when attempting to retrieve " 

3138 "container metadata: %s", 

3139 e, 

3140 exc_info=True, 

3141 ) 

3142 self._sleep(self.SLEEP_TIME) 

3143 attempts += 1 

3144 if attempts >= self.RETRY_ATTEMPTS: 

3145 raise 

3146 

3147 def _get_response(self, full_url, headers, timeout): 

3148 try: 

3149 AWSRequest = botocore.awsrequest.AWSRequest 

3150 request = AWSRequest(method='GET', url=full_url, headers=headers) 

3151 response = self._session.send(request.prepare()) 

3152 response_text = response.content.decode('utf-8') 

3153 if response.status_code != 200: 

3154 raise MetadataRetrievalError( 

3155 error_msg=( 

3156 f"Received non 200 response {response.status_code} " 

3157 f"from container metadata: {response_text}" 

3158 ) 

3159 ) 

3160 try: 

3161 return json.loads(response_text) 

3162 except ValueError: 

3163 error_msg = "Unable to parse JSON returned from container metadata services" 

3164 logger.debug('%s:%s', error_msg, response_text) 

3165 raise MetadataRetrievalError(error_msg=error_msg) 

3166 except RETRYABLE_HTTP_ERRORS as e: 

3167 error_msg = ( 

3168 "Received error when attempting to retrieve " 

3169 f"container metadata: {e}" 

3170 ) 

3171 raise MetadataRetrievalError(error_msg=error_msg) 

3172 

3173 def full_url(self, relative_uri): 

3174 return f'http://{self.IP_ADDRESS}{relative_uri}' 

3175 

3176 

3177def get_environ_proxies(url): 

3178 if should_bypass_proxies(url): 

3179 return {} 

3180 else: 

3181 return getproxies() 

3182 

3183 

3184def should_bypass_proxies(url): 

3185 """ 

3186 Returns whether we should bypass proxies or not. 

3187 """ 

3188 # NOTE: requests allowed for ip/cidr entries in no_proxy env that we don't 

3189 # support current as urllib only checks DNS suffix 

3190 # If the system proxy settings indicate that this URL should be bypassed, 

3191 # don't proxy. 

3192 # The proxy_bypass function is incredibly buggy on OS X in early versions 

3193 # of Python 2.6, so allow this call to fail. Only catch the specific 

3194 # exceptions we've seen, though: this call failing in other ways can reveal 

3195 # legitimate problems. 

3196 try: 

3197 if proxy_bypass(urlparse(url).netloc): 

3198 return True 

3199 except (TypeError, socket.gaierror): 

3200 pass 

3201 

3202 return False 

3203 

3204 

3205def determine_content_length(body): 

3206 # No body, content length of 0 

3207 if not body: 

3208 return 0 

3209 

3210 # Try asking the body for it's length 

3211 try: 

3212 return len(body) 

3213 except (AttributeError, TypeError): 

3214 pass 

3215 

3216 # Try getting the length from a seekable stream 

3217 if hasattr(body, 'seek') and hasattr(body, 'tell'): 

3218 try: 

3219 orig_pos = body.tell() 

3220 body.seek(0, 2) 

3221 end_file_pos = body.tell() 

3222 body.seek(orig_pos) 

3223 return end_file_pos - orig_pos 

3224 except io.UnsupportedOperation: 

3225 # in case when body is, for example, io.BufferedIOBase object 

3226 # it has "seek" method which throws "UnsupportedOperation" 

3227 # exception in such case we want to fall back to "chunked" 

3228 # encoding 

3229 pass 

3230 # Failed to determine the length 

3231 return None 

3232 

3233 

3234def get_encoding_from_headers(headers, default='ISO-8859-1'): 

3235 """Returns encodings from given HTTP Header Dict. 

3236 

3237 :param headers: dictionary to extract encoding from. 

3238 :param default: default encoding if the content-type is text 

3239 """ 

3240 

3241 content_type = headers.get('content-type') 

3242 

3243 if not content_type: 

3244 return None 

3245 

3246 message = email.message.Message() 

3247 message['content-type'] = content_type 

3248 charset = message.get_param("charset") 

3249 

3250 if charset is not None: 

3251 return charset 

3252 

3253 if 'text' in content_type: 

3254 return default 

3255 

3256 

3257def calculate_md5(body, **kwargs): 

3258 """This function has been deprecated, but is kept for backwards compatibility.""" 

3259 if isinstance(body, (bytes, bytearray)): 

3260 binary_md5 = _calculate_md5_from_bytes(body) 

3261 else: 

3262 binary_md5 = _calculate_md5_from_file(body) 

3263 return base64.b64encode(binary_md5).decode('ascii') 

3264 

3265 

3266def _calculate_md5_from_bytes(body_bytes): 

3267 """This function has been deprecated, but is kept for backwards compatibility.""" 

3268 md5 = get_md5(body_bytes, usedforsecurity=False) 

3269 return md5.digest() 

3270 

3271 

3272def _calculate_md5_from_file(fileobj): 

3273 """This function has been deprecated, but is kept for backwards compatibility.""" 

3274 start_position = fileobj.tell() 

3275 md5 = get_md5(usedforsecurity=False) 

3276 for chunk in iter(lambda: fileobj.read(1024 * 1024), b''): 

3277 md5.update(chunk) 

3278 fileobj.seek(start_position) 

3279 return md5.digest() 

3280 

3281 

3282def _is_s3express_request(params): 

3283 endpoint_properties = params.get('context', {}).get( 

3284 'endpoint_properties', {} 

3285 ) 

3286 return endpoint_properties.get('backend') == 'S3Express' 

3287 

3288 

3289def has_checksum_header(params): 

3290 """ 

3291 Checks if a header starting with "x-amz-checksum-" is provided in a request. 

3292 

3293 This function is considered private and subject to abrupt breaking changes or 

3294 removal without prior announcement. Please do not use it directly. 

3295 """ 

3296 headers = params['headers'] 

3297 

3298 # If a header matching the x-amz-checksum-* pattern is present, we 

3299 # assume a checksum has already been provided by the user. 

3300 for header in headers: 

3301 if CHECKSUM_HEADER_PATTERN.match(header): 

3302 return True 

3303 

3304 return False 

3305 

3306 

3307def conditionally_calculate_checksum(params, **kwargs): 

3308 """This function has been deprecated, but is kept for backwards compatibility.""" 

3309 if not has_checksum_header(params): 

3310 conditionally_calculate_md5(params, **kwargs) 

3311 conditionally_enable_crc32(params, **kwargs) 

3312 

3313 

3314def conditionally_enable_crc32(params, **kwargs): 

3315 """This function has been deprecated, but is kept for backwards compatibility.""" 

3316 checksum_context = params.get('context', {}).get('checksum', {}) 

3317 checksum_algorithm = checksum_context.get('request_algorithm') 

3318 if ( 

3319 _is_s3express_request(params) 

3320 and params['body'] is not None 

3321 and checksum_algorithm in (None, "conditional-md5") 

3322 ): 

3323 params['context']['checksum'] = { 

3324 'request_algorithm': { 

3325 'algorithm': 'crc32', 

3326 'in': 'header', 

3327 'name': 'x-amz-checksum-crc32', 

3328 } 

3329 } 

3330 

3331 

3332def conditionally_calculate_md5(params, **kwargs): 

3333 """Only add a Content-MD5 if the system supports it. 

3334 

3335 This function has been deprecated, but is kept for backwards compatibility. 

3336 """ 

3337 body = params['body'] 

3338 checksum_context = params.get('context', {}).get('checksum', {}) 

3339 checksum_algorithm = checksum_context.get('request_algorithm') 

3340 if checksum_algorithm and checksum_algorithm != 'conditional-md5': 

3341 # Skip for requests that will have a flexible checksum applied 

3342 return 

3343 

3344 if has_checksum_header(params): 

3345 # Don't add a new header if one is already available. 

3346 return 

3347 

3348 if _is_s3express_request(params): 

3349 # S3Express doesn't support MD5 

3350 return 

3351 

3352 if MD5_AVAILABLE and body is not None: 

3353 md5_digest = calculate_md5(body, **kwargs) 

3354 params['headers']['Content-MD5'] = md5_digest 

3355 

3356 

3357class FileWebIdentityTokenLoader: 

3358 def __init__(self, web_identity_token_path, _open=open): 

3359 self._web_identity_token_path = web_identity_token_path 

3360 self._open = _open 

3361 

3362 def __call__(self): 

3363 with self._open(self._web_identity_token_path) as token_file: 

3364 return token_file.read() 

3365 

3366 

3367class SSOTokenLoader: 

3368 def __init__(self, cache=None): 

3369 if cache is None: 

3370 cache = {} 

3371 self._cache = cache 

3372 

3373 def _generate_cache_key(self, start_url, session_name): 

3374 input_str = start_url 

3375 if session_name is not None: 

3376 input_str = session_name 

3377 return hashlib.sha1(input_str.encode('utf-8')).hexdigest() 

3378 

3379 def save_token(self, start_url, token, session_name=None): 

3380 cache_key = self._generate_cache_key(start_url, session_name) 

3381 self._cache[cache_key] = token 

3382 

3383 def __call__(self, start_url, session_name=None): 

3384 cache_key = self._generate_cache_key(start_url, session_name) 

3385 logger.debug(f'Checking for cached token at: {cache_key}') 

3386 if cache_key not in self._cache: 

3387 name = start_url 

3388 if session_name is not None: 

3389 name = session_name 

3390 error_msg = f'Token for {name} does not exist' 

3391 raise SSOTokenLoadError(error_msg=error_msg) 

3392 

3393 token = self._cache[cache_key] 

3394 if 'accessToken' not in token or 'expiresAt' not in token: 

3395 error_msg = f'Token for {start_url} is invalid' 

3396 raise SSOTokenLoadError(error_msg=error_msg) 

3397 return token 

3398 

3399 

3400class EventbridgeSignerSetter: 

3401 _DEFAULT_PARTITION = 'aws' 

3402 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

3403 

3404 def __init__(self, endpoint_resolver, region=None, endpoint_url=None): 

3405 self._endpoint_resolver = endpoint_resolver 

3406 self._region = region 

3407 self._endpoint_url = endpoint_url 

3408 

3409 def register(self, event_emitter): 

3410 event_emitter.register( 

3411 'before-parameter-build.events.PutEvents', 

3412 self.check_for_global_endpoint, 

3413 ) 

3414 event_emitter.register( 

3415 'before-call.events.PutEvents', self.set_endpoint_url 

3416 ) 

3417 

3418 def set_endpoint_url(self, params, context, **kwargs): 

3419 if 'eventbridge_endpoint' in context: 

3420 endpoint = context['eventbridge_endpoint'] 

3421 logger.debug(f"Rewriting URL from {params['url']} to {endpoint}") 

3422 params['url'] = endpoint 

3423 

3424 def check_for_global_endpoint(self, params, context, **kwargs): 

3425 endpoint = params.get('EndpointId') 

3426 if endpoint is None: 

3427 return 

3428 

3429 if len(endpoint) == 0: 

3430 raise InvalidEndpointConfigurationError( 

3431 msg='EndpointId must not be a zero length string' 

3432 ) 

3433 

3434 if not HAS_CRT: 

3435 raise MissingDependencyException( 

3436 msg="Using EndpointId requires an additional " 

3437 "dependency. You will need to pip install " 

3438 "botocore[crt] before proceeding." 

3439 ) 

3440 

3441 config = context.get('client_config') 

3442 endpoint_variant_tags = None 

3443 if config is not None: 

3444 if config.use_fips_endpoint: 

3445 raise InvalidEndpointConfigurationError( 

3446 msg="FIPS is not supported with EventBridge " 

3447 "multi-region endpoints." 

3448 ) 

3449 if config.use_dualstack_endpoint: 

3450 endpoint_variant_tags = ['dualstack'] 

3451 

3452 if self._endpoint_url is None: 

3453 # Validate endpoint is a valid hostname component 

3454 parts = urlparse(f'https://{endpoint}') 

3455 if parts.hostname != endpoint: 

3456 raise InvalidEndpointConfigurationError( 

3457 msg='EndpointId is not a valid hostname component.' 

3458 ) 

3459 resolved_endpoint = self._get_global_endpoint( 

3460 endpoint, endpoint_variant_tags=endpoint_variant_tags 

3461 ) 

3462 else: 

3463 resolved_endpoint = self._endpoint_url 

3464 

3465 context['eventbridge_endpoint'] = resolved_endpoint 

3466 context['auth_type'] = 'v4a' 

3467 

3468 def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None): 

3469 resolver = self._endpoint_resolver 

3470 

3471 partition = resolver.get_partition_for_region(self._region) 

3472 if partition is None: 

3473 partition = self._DEFAULT_PARTITION 

3474 dns_suffix = resolver.get_partition_dns_suffix( 

3475 partition, endpoint_variant_tags=endpoint_variant_tags 

3476 ) 

3477 if dns_suffix is None: 

3478 dns_suffix = self._DEFAULT_DNS_SUFFIX 

3479 

3480 return f"https://{endpoint}.endpoint.events.{dns_suffix}/" 

3481 

3482 

3483def is_s3_accelerate_url(url): 

3484 """Does the URL match the S3 Accelerate endpoint scheme? 

3485 

3486 Virtual host naming style with bucket names in the netloc part of the URL 

3487 are not allowed by this function. 

3488 """ 

3489 if url is None: 

3490 return False 

3491 

3492 # Accelerate is only valid for Amazon endpoints. 

3493 url_parts = urlsplit(url) 

3494 if not url_parts.netloc.endswith( 

3495 'amazonaws.com' 

3496 ) or url_parts.scheme not in ['https', 'http']: 

3497 return False 

3498 

3499 # The first part of the URL must be s3-accelerate. 

3500 parts = url_parts.netloc.split('.') 

3501 if parts[0] != 's3-accelerate': 

3502 return False 

3503 

3504 # Url parts between 's3-accelerate' and 'amazonaws.com' which 

3505 # represent different url features. 

3506 feature_parts = parts[1:-2] 

3507 

3508 # There should be no duplicate URL parts. 

3509 if len(feature_parts) != len(set(feature_parts)): 

3510 return False 

3511 

3512 # Remaining parts must all be in the whitelist. 

3513 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

3514 

3515 

3516class JSONFileCache: 

3517 """JSON file cache. 

3518 This provides a dict like interface that stores JSON serializable 

3519 objects. 

3520 The objects are serialized to JSON and stored in a file. These 

3521 values can be retrieved at a later time. 

3522 """ 

3523 

3524 CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache')) 

3525 

3526 def __init__(self, working_dir=CACHE_DIR, dumps_func=None): 

3527 self._working_dir = working_dir 

3528 if dumps_func is None: 

3529 dumps_func = self._default_dumps 

3530 self._dumps = dumps_func 

3531 

3532 def _default_dumps(self, obj): 

3533 return json.dumps(obj, default=self._serialize_if_needed) 

3534 

3535 def __contains__(self, cache_key): 

3536 actual_key = self._convert_cache_key(cache_key) 

3537 return os.path.isfile(actual_key) 

3538 

3539 def __getitem__(self, cache_key): 

3540 """Retrieve value from a cache key.""" 

3541 actual_key = self._convert_cache_key(cache_key) 

3542 try: 

3543 with open(actual_key) as f: 

3544 return json.load(f) 

3545 except (OSError, ValueError): 

3546 raise KeyError(cache_key) 

3547 

3548 def __delitem__(self, cache_key): 

3549 actual_key = self._convert_cache_key(cache_key) 

3550 try: 

3551 key_path = Path(actual_key) 

3552 key_path.unlink() 

3553 except FileNotFoundError: 

3554 raise KeyError(cache_key) 

3555 

3556 def __setitem__(self, cache_key, value): 

3557 full_key = self._convert_cache_key(cache_key) 

3558 try: 

3559 file_content = self._dumps(value) 

3560 except (TypeError, ValueError): 

3561 raise ValueError( 

3562 f"Value cannot be cached, must be JSON serializable: {value}" 

3563 ) 

3564 if not os.path.isdir(self._working_dir): 

3565 os.makedirs(self._working_dir) 

3566 with os.fdopen( 

3567 os.open(full_key, os.O_WRONLY | os.O_CREAT, 0o600), 'w' 

3568 ) as f: 

3569 f.truncate() 

3570 f.write(file_content) 

3571 

3572 def _convert_cache_key(self, cache_key): 

3573 full_path = os.path.join(self._working_dir, cache_key + '.json') 

3574 return full_path 

3575 

3576 def _serialize_if_needed(self, value, iso=False): 

3577 if isinstance(value, _DatetimeClass): 

3578 if iso: 

3579 return value.isoformat() 

3580 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

3581 return value 

3582 

3583 

3584def is_s3express_bucket(bucket): 

3585 if bucket is None: 

3586 return False 

3587 return bucket.endswith('--x-s3') 

3588 

3589 

3590def get_token_from_environment(signing_name, environ=None): 

3591 if not isinstance(signing_name, str) or not signing_name.strip(): 

3592 return None 

3593 

3594 if environ is None: 

3595 environ = os.environ 

3596 env_var = _get_bearer_env_var_name(signing_name) 

3597 return environ.get(env_var) 

3598 

3599 

3600def _get_bearer_env_var_name(signing_name): 

3601 bearer_name = signing_name.replace('-', '_').replace(' ', '_').upper() 

3602 return f"AWS_BEARER_TOKEN_{bearer_name}" 

3603 

3604 

3605# This parameter is not part of the public interface and is subject to abrupt 

3606# breaking changes or removal without prior announcement. 

3607# Mapping of services that have been renamed for backwards compatibility reasons. 

3608# Keys are the previous name that should be allowed, values are the documented 

3609# and preferred client name. 

3610SERVICE_NAME_ALIASES = {'runtime.sagemaker': 'sagemaker-runtime'} 

3611 

3612 

3613# This parameter is not part of the public interface and is subject to abrupt 

3614# breaking changes or removal without prior announcement. 

3615# Mapping to determine the service ID for services that do not use it as the 

3616# model data directory name. The keys are the data directory name and the 

3617# values are the transformed service IDs (lower case and hyphenated). 

3618CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES = { 

3619 # Actual service name we use -> Allowed computed service name. 

3620 'apigateway': 'api-gateway', 

3621 'application-autoscaling': 'application-auto-scaling', 

3622 'appmesh': 'app-mesh', 

3623 'autoscaling': 'auto-scaling', 

3624 'autoscaling-plans': 'auto-scaling-plans', 

3625 'ce': 'cost-explorer', 

3626 'cloudhsmv2': 'cloudhsm-v2', 

3627 'cloudsearchdomain': 'cloudsearch-domain', 

3628 'cognito-idp': 'cognito-identity-provider', 

3629 'config': 'config-service', 

3630 'cur': 'cost-and-usage-report-service', 

3631 'datapipeline': 'data-pipeline', 

3632 'directconnect': 'direct-connect', 

3633 'devicefarm': 'device-farm', 

3634 'discovery': 'application-discovery-service', 

3635 'dms': 'database-migration-service', 

3636 'ds': 'directory-service', 

3637 'ds-data': 'directory-service-data', 

3638 'dynamodbstreams': 'dynamodb-streams', 

3639 'elasticbeanstalk': 'elastic-beanstalk', 

3640 'elastictranscoder': 'elastic-transcoder', 

3641 'elb': 'elastic-load-balancing', 

3642 'elbv2': 'elastic-load-balancing-v2', 

3643 'es': 'elasticsearch-service', 

3644 'events': 'eventbridge', 

3645 'globalaccelerator': 'global-accelerator', 

3646 'iot-data': 'iot-data-plane', 

3647 'iot-jobs-data': 'iot-jobs-data-plane', 

3648 'iotevents-data': 'iot-events-data', 

3649 'iotevents': 'iot-events', 

3650 'iotwireless': 'iot-wireless', 

3651 'kinesisanalytics': 'kinesis-analytics', 

3652 'kinesisanalyticsv2': 'kinesis-analytics-v2', 

3653 'kinesisvideo': 'kinesis-video', 

3654 'lex-models': 'lex-model-building-service', 

3655 'lexv2-models': 'lex-models-v2', 

3656 'lex-runtime': 'lex-runtime-service', 

3657 'lexv2-runtime': 'lex-runtime-v2', 

3658 'logs': 'cloudwatch-logs', 

3659 'machinelearning': 'machine-learning', 

3660 'marketplacecommerceanalytics': 'marketplace-commerce-analytics', 

3661 'marketplace-entitlement': 'marketplace-entitlement-service', 

3662 'meteringmarketplace': 'marketplace-metering', 

3663 'mgh': 'migration-hub', 

3664 'sms-voice': 'pinpoint-sms-voice', 

3665 'resourcegroupstaggingapi': 'resource-groups-tagging-api', 

3666 'route53': 'route-53', 

3667 'route53domains': 'route-53-domains', 

3668 's3control': 's3-control', 

3669 'sdb': 'simpledb', 

3670 'secretsmanager': 'secrets-manager', 

3671 'serverlessrepo': 'serverlessapplicationrepository', 

3672 'servicecatalog': 'service-catalog', 

3673 'servicecatalog-appregistry': 'service-catalog-appregistry', 

3674 'stepfunctions': 'sfn', 

3675 'storagegateway': 'storage-gateway', 

3676}