Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/utils.py: 21%

1690 statements  

coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import base64 

14import binascii 

15import datetime 

16import email.message 

17import functools 

18import hashlib 

19import io 

20import logging 

21import os 

22import random 

23import re 

24import socket 

25import time 

26import warnings 

27import weakref 

28from datetime import datetime as _DatetimeClass 

29from ipaddress import ip_address 

30from pathlib import Path 

31from urllib.request import getproxies, proxy_bypass 

32 

33import dateutil.parser 

34from dateutil.tz import tzutc 

35from urllib3.exceptions import LocationParseError 

36 

37import botocore 

38import botocore.awsrequest 

39import botocore.httpsession 

40 

41# IP Regexes retained for backwards compatibility 

42from botocore.compat import HEX_PAT # noqa: F401 

43from botocore.compat import IPV4_PAT # noqa: F401 

44from botocore.compat import IPV6_ADDRZ_PAT # noqa: F401 

45from botocore.compat import IPV6_PAT # noqa: F401 

46from botocore.compat import LS32_PAT # noqa: F401 

47from botocore.compat import UNRESERVED_PAT # noqa: F401 

48from botocore.compat import ZONE_ID_PAT # noqa: F401 

49from botocore.compat import ( 

50 HAS_CRT, 

51 IPV4_RE, 

52 IPV6_ADDRZ_RE, 

53 MD5_AVAILABLE, 

54 UNSAFE_URL_CHARS, 

55 OrderedDict, 

56 get_md5, 

57 get_tzinfo_options, 

58 json, 

59 quote, 

60 urlparse, 

61 urlsplit, 

62 urlunsplit, 

63 zip_longest, 

64) 

65from botocore.exceptions import ( 

66 ClientError, 

67 ConfigNotFound, 

68 ConnectionClosedError, 

69 ConnectTimeoutError, 

70 EndpointConnectionError, 

71 HTTPClientError, 

72 InvalidDNSNameError, 

73 InvalidEndpointConfigurationError, 

74 InvalidExpressionError, 

75 InvalidHostLabelError, 

76 InvalidIMDSEndpointError, 

77 InvalidIMDSEndpointModeError, 

78 InvalidRegionError, 

79 MetadataRetrievalError, 

80 MissingDependencyException, 

81 ReadTimeoutError, 

82 SSOTokenLoadError, 

83 UnsupportedOutpostResourceError, 

84 UnsupportedS3AccesspointConfigurationError, 

85 UnsupportedS3ArnError, 

86 UnsupportedS3ConfigurationError, 

87 UnsupportedS3ControlArnError, 

88 UnsupportedS3ControlConfigurationError, 

89) 

90 

91logger = logging.getLogger(__name__) 

92DEFAULT_METADATA_SERVICE_TIMEOUT = 1 

93METADATA_BASE_URL = 'http://169.254.169.254/' 

94METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/' 

95METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6') 

96 

97# These are chars that do not need to be urlencoded. 

98# Based on rfc2986, section 2.3 

99SAFE_CHARS = '-._~' 

100LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]') 

101RETRYABLE_HTTP_ERRORS = ( 

102 ReadTimeoutError, 

103 EndpointConnectionError, 

104 ConnectionClosedError, 

105 ConnectTimeoutError, 

106) 

107S3_ACCELERATE_WHITELIST = ['dualstack'] 

108# In switching events from using service name / endpoint prefix to service 

109# id, we have to preserve compatibility. This maps the instances where either 

110# is different than the transformed service id. 

111EVENT_ALIASES = { 

112 "a4b": "alexa-for-business", 

113 "alexaforbusiness": "alexa-for-business", 

114 "api.mediatailor": "mediatailor", 

115 "api.pricing": "pricing", 

116 "api.sagemaker": "sagemaker", 

117 "apigateway": "api-gateway", 

118 "application-autoscaling": "application-auto-scaling", 

119 "appstream2": "appstream", 

120 "autoscaling": "auto-scaling", 

121 "autoscaling-plans": "auto-scaling-plans", 

122 "ce": "cost-explorer", 

123 "cloudhsmv2": "cloudhsm-v2", 

124 "cloudsearchdomain": "cloudsearch-domain", 

125 "cognito-idp": "cognito-identity-provider", 

126 "config": "config-service", 

127 "cur": "cost-and-usage-report-service", 

128 "data.iot": "iot-data-plane", 

129 "data.jobs.iot": "iot-jobs-data-plane", 

130 "data.mediastore": "mediastore-data", 

131 "datapipeline": "data-pipeline", 

132 "devicefarm": "device-farm", 

133 "devices.iot1click": "iot-1click-devices-service", 

134 "directconnect": "direct-connect", 

135 "discovery": "application-discovery-service", 

136 "dms": "database-migration-service", 

137 "ds": "directory-service", 

138 "dynamodbstreams": "dynamodb-streams", 

139 "elasticbeanstalk": "elastic-beanstalk", 

140 "elasticfilesystem": "efs", 

141 "elasticloadbalancing": "elastic-load-balancing", 

142 "elasticmapreduce": "emr", 

143 "elastictranscoder": "elastic-transcoder", 

144 "elb": "elastic-load-balancing", 

145 "elbv2": "elastic-load-balancing-v2", 

146 "email": "ses", 

147 "entitlement.marketplace": "marketplace-entitlement-service", 

148 "es": "elasticsearch-service", 

149 "events": "eventbridge", 

150 "cloudwatch-events": "eventbridge", 

151 "iot-data": "iot-data-plane", 

152 "iot-jobs-data": "iot-jobs-data-plane", 

153 "iot1click-devices": "iot-1click-devices-service", 

154 "iot1click-projects": "iot-1click-projects", 

155 "kinesisanalytics": "kinesis-analytics", 

156 "kinesisvideo": "kinesis-video", 

157 "lex-models": "lex-model-building-service", 

158 "lex-runtime": "lex-runtime-service", 

159 "logs": "cloudwatch-logs", 

160 "machinelearning": "machine-learning", 

161 "marketplace-entitlement": "marketplace-entitlement-service", 

162 "marketplacecommerceanalytics": "marketplace-commerce-analytics", 

163 "metering.marketplace": "marketplace-metering", 

164 "meteringmarketplace": "marketplace-metering", 

165 "mgh": "migration-hub", 

166 "models.lex": "lex-model-building-service", 

167 "monitoring": "cloudwatch", 

168 "mturk-requester": "mturk", 

169 "opsworks-cm": "opsworkscm", 

170 "projects.iot1click": "iot-1click-projects", 

171 "resourcegroupstaggingapi": "resource-groups-tagging-api", 

172 "route53": "route-53", 

173 "route53domains": "route-53-domains", 

174 "runtime.lex": "lex-runtime-service", 

175 "runtime.sagemaker": "sagemaker-runtime", 

176 "sdb": "simpledb", 

177 "secretsmanager": "secrets-manager", 

178 "serverlessrepo": "serverlessapplicationrepository", 

179 "servicecatalog": "service-catalog", 

180 "states": "sfn", 

181 "stepfunctions": "sfn", 

182 "storagegateway": "storage-gateway", 

183 "streams.dynamodb": "dynamodb-streams", 

184 "tagging": "resource-groups-tagging-api", 

185} 

186 

187 

188# This pattern can be used to detect if a header is a flexible checksum header 

189CHECKSUM_HEADER_PATTERN = re.compile( 

190 r'^X-Amz-Checksum-([a-z0-9]*)$', 

191 flags=re.IGNORECASE, 

192) 

193 

194 

195def ensure_boolean(val): 

196 """Ensures a boolean value if a string or boolean is provided 

197 

198 For strings, the value for True/False is case insensitive 

199 """ 

200 if isinstance(val, bool): 

201 return val 

202 elif isinstance(val, str): 

203 return val.lower() == 'true' 

204 else: 

205 return False 

206 
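A small usage sketch (not part of the original module; the values are illustrative), showing the case-insensitive string handling and the fall-through to False for anything that is neither a bool nor a str:

from botocore.utils import ensure_boolean

assert ensure_boolean(True) is True
assert ensure_boolean('TRUE') is True   # string comparison is case insensitive
assert ensure_boolean('no') is False
assert ensure_boolean(1) is False       # not a bool or str, so False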

207 

208def resolve_imds_endpoint_mode(session): 

209    """Resolve the IMDS endpoint mode to either IPv6 or IPv4.

210 

211 ec2_metadata_service_endpoint_mode takes precedence over imds_use_ipv6. 

212 """ 

213 endpoint_mode = session.get_config_variable( 

214 'ec2_metadata_service_endpoint_mode' 

215 ) 

216 if endpoint_mode is not None: 

217 lendpoint_mode = endpoint_mode.lower() 

218 if lendpoint_mode not in METADATA_ENDPOINT_MODES: 

219 error_msg_kwargs = { 

220 'mode': endpoint_mode, 

221 'valid_modes': METADATA_ENDPOINT_MODES, 

222 } 

223 raise InvalidIMDSEndpointModeError(**error_msg_kwargs) 

224 return lendpoint_mode 

225 elif session.get_config_variable('imds_use_ipv6'): 

226 return 'ipv6' 

227 return 'ipv4' 

228 

229 

230def is_json_value_header(shape): 

231 """Determines if the provided shape is the special header type jsonvalue. 

232 

233 :type shape: botocore.shape 

234 :param shape: Shape to be inspected for the jsonvalue trait. 

235 

236 :return: True if this type is a jsonvalue, False otherwise 

237 :rtype: Bool 

238 """ 

239 return ( 

240 hasattr(shape, 'serialization') 

241 and shape.serialization.get('jsonvalue', False) 

242 and shape.serialization.get('location') == 'header' 

243 and shape.type_name == 'string' 

244 ) 

245 

246 

247def has_header(header_name, headers): 

248 """Case-insensitive check for header key.""" 

249 if header_name is None: 

250 return False 

251 elif isinstance(headers, botocore.awsrequest.HeadersDict): 

252 return header_name in headers 

253 else: 

254 return header_name.lower() in [key.lower() for key in headers.keys()] 

255 
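An illustrative sketch (not part of the module; the header names are arbitrary examples) of the case-insensitive header lookup:

from botocore.utils import has_header

headers = {'Content-Type': 'application/json'}
assert has_header('content-type', headers)        # plain dicts are compared case-insensitively
assert not has_header('x-amz-date', headers)
assert not has_header(None, headers)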

256 

257def get_service_module_name(service_model): 

258 """Returns the module name for a service 

259 

260 This is the value used in both the documentation and client class name 

261 """ 

262 name = service_model.metadata.get( 

263 'serviceAbbreviation', 

264 service_model.metadata.get( 

265 'serviceFullName', service_model.service_name 

266 ), 

267 ) 

268 name = name.replace('Amazon', '') 

269 name = name.replace('AWS', '') 

270 name = re.sub(r'\W+', '', name) 

271 return name 

272 

273 

274def normalize_url_path(path): 

275 if not path: 

276 return '/' 

277 return remove_dot_segments(path) 

278 

279 

280def normalize_boolean(val): 

281 """Returns None if val is None, otherwise ensure value 

282 converted to boolean""" 

283 if val is None: 

284 return val 

285 else: 

286 return ensure_boolean(val) 

287 

288 

289def remove_dot_segments(url): 

290 # RFC 3986, section 5.2.4 "Remove Dot Segments" 

291 # Also, AWS services require consecutive slashes to be removed, 

292 # so that's done here as well 

293 if not url: 

294 return '' 

295 input_url = url.split('/') 

296 output_list = [] 

297 for x in input_url: 

298 if x and x != '.': 

299 if x == '..': 

300 if output_list: 

301 output_list.pop() 

302 else: 

303 output_list.append(x) 

304 

305 if url[0] == '/': 

306 first = '/' 

307 else: 

308 first = '' 

309 if url[-1] == '/' and output_list: 

310 last = '/' 

311 else: 

312 last = '' 

313 return first + '/'.join(output_list) + last 

314 
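A brief sketch (not part of the module; the paths are made up) of the RFC 3986 dot-segment removal plus the AWS-specific collapsing of consecutive slashes:

from botocore.utils import normalize_url_path, remove_dot_segments

assert remove_dot_segments('/a/b/c/./../d') == '/a/b/d'
assert remove_dot_segments('a//b/') == 'a/b/'      # consecutive slashes are collapsed
assert normalize_url_path('') == '/'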

315 

316def validate_jmespath_for_set(expression): 

317 # Validates a limited jmespath expression to determine if we can set a 

318 # value based on it. Only works with dotted paths. 

319 if not expression or expression == '.': 

320 raise InvalidExpressionError(expression=expression) 

321 

322 for invalid in ['[', ']', '*']: 

323 if invalid in expression: 

324 raise InvalidExpressionError(expression=expression) 

325 

326 

327def set_value_from_jmespath(source, expression, value, is_first=True): 

328 # This takes a (limited) jmespath-like expression & can set a value based 

329 # on it. 

330 # Limitations: 

331 # * Only handles dotted lookups 

332 # * No offsets/wildcards/slices/etc. 

333 if is_first: 

334 validate_jmespath_for_set(expression) 

335 

336 bits = expression.split('.', 1) 

337 current_key, remainder = bits[0], bits[1] if len(bits) > 1 else '' 

338 

339 if not current_key: 

340 raise InvalidExpressionError(expression=expression) 

341 

342 if remainder: 

343 if current_key not in source: 

344 # We've got something in the expression that's not present in the 

345 # source (new key). If there are any more bits, we'll set the key

346 # with an empty dictionary. 

347 source[current_key] = {} 

348 

349 return set_value_from_jmespath( 

350 source[current_key], remainder, value, is_first=False 

351 ) 

352 

353 # If we're down to a single key, set it. 

354 source[current_key] = value 

355 
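A usage sketch (not part of the module; the keys are illustrative) of setting a nested value from a dotted expression, creating intermediate dictionaries as needed:

from botocore.utils import set_value_from_jmespath

config = {}
set_value_from_jmespath(config, 'retries.mode', 'standard')
assert config == {'retries': {'mode': 'standard'}}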

356 

357def is_global_accesspoint(context): 

358 """Determine if request is intended for an MRAP accesspoint.""" 

359 s3_accesspoint = context.get('s3_accesspoint', {}) 

360 is_global = s3_accesspoint.get('region') == '' 

361 return is_global 

362 

363 

364class _RetriesExceededError(Exception): 

365 """Internal exception used when the number of retries are exceeded.""" 

366 

367 pass 

368 

369 

370class BadIMDSRequestError(Exception): 

371 def __init__(self, request): 

372 self.request = request 

373 

374 

375class IMDSFetcher: 

376 _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError 

377 _TOKEN_PATH = 'latest/api/token' 

378 _TOKEN_TTL = '21600' 

379 

380 def __init__( 

381 self, 

382 timeout=DEFAULT_METADATA_SERVICE_TIMEOUT, 

383 num_attempts=1, 

384 base_url=METADATA_BASE_URL, 

385 env=None, 

386 user_agent=None, 

387 config=None, 

388 ): 

389 self._timeout = timeout 

390 self._num_attempts = num_attempts 

391 if config is None: 

392 config = {} 

393 self._base_url = self._select_base_url(base_url, config) 

394 self._config = config 

395 

396 if env is None: 

397 env = os.environ.copy() 

398 self._disabled = ( 

399 env.get('AWS_EC2_METADATA_DISABLED', 'false').lower() == 'true' 

400 ) 

401 self._imds_v1_disabled = config.get('ec2_metadata_v1_disabled') 

402 self._user_agent = user_agent 

403 self._session = botocore.httpsession.URLLib3Session( 

404 timeout=self._timeout, 

405 proxies=get_environ_proxies(self._base_url), 

406 ) 

407 

408 def get_base_url(self): 

409 return self._base_url 

410 

411 def _select_base_url(self, base_url, config): 

412 if config is None: 

413 config = {} 

414 

415 requires_ipv6 = ( 

416 config.get('ec2_metadata_service_endpoint_mode') == 'ipv6' 

417 ) 

418 custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint') 

419 

420 if requires_ipv6 and custom_metadata_endpoint: 

421 logger.warning( 

422 "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint." 

423 ) 

424 

425 chosen_base_url = None 

426 

427 if base_url != METADATA_BASE_URL: 

428 chosen_base_url = base_url 

429 elif custom_metadata_endpoint: 

430 chosen_base_url = custom_metadata_endpoint 

431 elif requires_ipv6: 

432 chosen_base_url = METADATA_BASE_URL_IPv6 

433 else: 

434 chosen_base_url = METADATA_BASE_URL 

435 

436 logger.debug("IMDS ENDPOINT: %s" % chosen_base_url) 

437 if not is_valid_uri(chosen_base_url): 

438 raise InvalidIMDSEndpointError(endpoint=chosen_base_url) 

439 

440 return chosen_base_url 

441 

442 def _construct_url(self, path): 

443 sep = '' 

444 if self._base_url and not self._base_url.endswith('/'): 

445 sep = '/' 

446 return f'{self._base_url}{sep}{path}' 

447 

448 def _fetch_metadata_token(self): 

449 self._assert_enabled() 

450 url = self._construct_url(self._TOKEN_PATH) 

451 headers = { 

452 'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL, 

453 } 

454 self._add_user_agent(headers) 

455 request = botocore.awsrequest.AWSRequest( 

456 method='PUT', url=url, headers=headers 

457 ) 

458 for i in range(self._num_attempts): 

459 try: 

460 response = self._session.send(request.prepare()) 

461 if response.status_code == 200: 

462 return response.text 

463 elif response.status_code in (404, 403, 405): 

464 return None 

465 elif response.status_code in (400,): 

466 raise BadIMDSRequestError(request) 

467 except ReadTimeoutError: 

468 return None 

469 except RETRYABLE_HTTP_ERRORS as e: 

470 logger.debug( 

471 "Caught retryable HTTP exception while making metadata " 

472 "service request to %s: %s", 

473 url, 

474 e, 

475 exc_info=True, 

476 ) 

477 except HTTPClientError as e: 

478 if isinstance(e.kwargs.get('error'), LocationParseError): 

479 raise InvalidIMDSEndpointError(endpoint=url, error=e) 

480 else: 

481 raise 

482 return None 

483 

484 def _get_request(self, url_path, retry_func, token=None): 

485 """Make a get request to the Instance Metadata Service. 

486 

487 :type url_path: str 

488 :param url_path: The path component of the URL to make a get request. 

489 This arg is appended to the base_url that was provided in the 

490 initializer. 

491 

492 :type retry_func: callable 

493 :param retry_func: A function that takes the response as an argument 

494 and determines if it needs to retry. By default empty and non 

495 200 OK responses are retried. 

496 

497 :type token: str 

498 :param token: Metadata token to send along with GET requests to IMDS. 

499 """ 

500 self._assert_enabled() 

501 if not token: 

502 self._assert_v1_enabled() 

503 if retry_func is None: 

504 retry_func = self._default_retry 

505 url = self._construct_url(url_path) 

506 headers = {} 

507 if token is not None: 

508 headers['x-aws-ec2-metadata-token'] = token 

509 self._add_user_agent(headers) 

510 for i in range(self._num_attempts): 

511 try: 

512 request = botocore.awsrequest.AWSRequest( 

513 method='GET', url=url, headers=headers 

514 ) 

515 response = self._session.send(request.prepare()) 

516 if not retry_func(response): 

517 return response 

518 except RETRYABLE_HTTP_ERRORS as e: 

519 logger.debug( 

520 "Caught retryable HTTP exception while making metadata " 

521 "service request to %s: %s", 

522 url, 

523 e, 

524 exc_info=True, 

525 ) 

526 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

527 

528 def _add_user_agent(self, headers): 

529 if self._user_agent is not None: 

530 headers['User-Agent'] = self._user_agent 

531 

532 def _assert_enabled(self): 

533 if self._disabled: 

534 logger.debug("Access to EC2 metadata has been disabled.") 

535 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

536 

537 def _assert_v1_enabled(self): 

538 if self._imds_v1_disabled: 

539 raise MetadataRetrievalError( 

540 error_msg="Unable to retrieve token for use in IMDSv2 call and IMDSv1 has been disabled" 

541 ) 

542 

543 def _default_retry(self, response): 

544 return self._is_non_ok_response(response) or self._is_empty(response) 

545 

546 def _is_non_ok_response(self, response): 

547 if response.status_code != 200: 

548 self._log_imds_response(response, 'non-200', log_body=True) 

549 return True 

550 return False 

551 

552 def _is_empty(self, response): 

553 if not response.content: 

554 self._log_imds_response(response, 'no body', log_body=True) 

555 return True 

556 return False 

557 

558 def _log_imds_response(self, response, reason_to_log, log_body=False): 

559 statement = ( 

560 "Metadata service returned %s response " 

561 "with status code of %s for url: %s" 

562 ) 

563 logger_args = [reason_to_log, response.status_code, response.url] 

564 if log_body: 

565 statement += ", content body: %s" 

566 logger_args.append(response.content) 

567 logger.debug(statement, *logger_args) 

568 

569 

570class InstanceMetadataFetcher(IMDSFetcher): 

571 _URL_PATH = 'latest/meta-data/iam/security-credentials/' 

572 _REQUIRED_CREDENTIAL_FIELDS = [ 

573 'AccessKeyId', 

574 'SecretAccessKey', 

575 'Token', 

576 'Expiration', 

577 ] 

578 

579 def retrieve_iam_role_credentials(self): 

580 try: 

581 token = self._fetch_metadata_token() 

582 role_name = self._get_iam_role(token) 

583 credentials = self._get_credentials(role_name, token) 

584 if self._contains_all_credential_fields(credentials): 

585 credentials = { 

586 'role_name': role_name, 

587 'access_key': credentials['AccessKeyId'], 

588 'secret_key': credentials['SecretAccessKey'], 

589 'token': credentials['Token'], 

590 'expiry_time': credentials['Expiration'], 

591 } 

592 self._evaluate_expiration(credentials) 

593 return credentials 

594 else: 

595 # IMDS can return a 200 response that has a JSON formatted 

596 # error message (i.e. if ec2 is not a trusted entity for the

597 # attached role). We do not necessarily want to retry for 

598 # these and we also do not necessarily want to raise a key 

599 # error. So at least log the problematic response and return 

600 # an empty dictionary to signal that it was not able to 

601 # retrieve credentials. These errors will contain both a

602 # Code and Message key. 

603 if 'Code' in credentials and 'Message' in credentials: 

604 logger.debug( 

605 'Error response received when retrieving' 

606 ' credentials: %s.',

607 credentials, 

608 ) 

609 return {} 

610 except self._RETRIES_EXCEEDED_ERROR_CLS: 

611 logger.debug( 

612 "Max number of attempts exceeded (%s) when " 

613 "attempting to retrieve data from metadata service.", 

614 self._num_attempts, 

615 ) 

616 except BadIMDSRequestError as e: 

617 logger.debug("Bad IMDS request: %s", e.request) 

618 return {} 

619 

620 def _get_iam_role(self, token=None): 

621 return self._get_request( 

622 url_path=self._URL_PATH, 

623 retry_func=self._needs_retry_for_role_name, 

624 token=token, 

625 ).text 

626 

627 def _get_credentials(self, role_name, token=None): 

628 r = self._get_request( 

629 url_path=self._URL_PATH + role_name, 

630 retry_func=self._needs_retry_for_credentials, 

631 token=token, 

632 ) 

633 return json.loads(r.text) 

634 

635 def _is_invalid_json(self, response): 

636 try: 

637 json.loads(response.text) 

638 return False 

639 except ValueError: 

640 self._log_imds_response(response, 'invalid json') 

641 return True 

642 

643 def _needs_retry_for_role_name(self, response): 

644 return self._is_non_ok_response(response) or self._is_empty(response) 

645 

646 def _needs_retry_for_credentials(self, response): 

647 return ( 

648 self._is_non_ok_response(response) 

649 or self._is_empty(response) 

650 or self._is_invalid_json(response) 

651 ) 

652 

653 def _contains_all_credential_fields(self, credentials): 

654 for field in self._REQUIRED_CREDENTIAL_FIELDS: 

655 if field not in credentials: 

656 logger.debug( 

657 'Retrieved credentials are missing required field: %s',

658 field, 

659 ) 

660 return False 

661 return True 

662 

663 def _evaluate_expiration(self, credentials): 

664 expiration = credentials.get("expiry_time") 

665 if expiration is None: 

666 return 

667 try: 

668 expiration = datetime.datetime.strptime( 

669 expiration, "%Y-%m-%dT%H:%M:%SZ" 

670 ) 

671 refresh_interval = self._config.get( 

672 "ec2_credential_refresh_window", 60 * 10 

673 ) 

674 jitter = random.randint(120, 600) # Between 2 to 10 minutes 

675 refresh_interval_with_jitter = refresh_interval + jitter 

676 current_time = datetime.datetime.utcnow() 

677 refresh_offset = datetime.timedelta( 

678 seconds=refresh_interval_with_jitter 

679 ) 

680 extension_time = expiration - refresh_offset 

681 if current_time >= extension_time: 

682 new_time = current_time + refresh_offset 

683 credentials["expiry_time"] = new_time.strftime( 

684 "%Y-%m-%dT%H:%M:%SZ" 

685 ) 

686 logger.info( 

687 f"Attempting credential expiration extension due to a " 

688 f"credential service availability issue. A refresh of " 

689 f"these credentials will be attempted again within " 

690 f"the next {refresh_interval_with_jitter/60:.0f} minutes." 

691 ) 

692 except ValueError: 

693 logger.debug( 

694 f"Unable to parse expiry_time in {credentials['expiry_time']}" 

695 ) 

696 

697 

698class IMDSRegionProvider: 

699 def __init__(self, session, environ=None, fetcher=None): 

700 """Initialize IMDSRegionProvider. 

701 :type session: :class:`botocore.session.Session` 

702 :param session: The session is needed to look up configuration for 

703 how to contact the instance metadata service. Specifically,

704 whether or not it should use the IMDS region at all, and if so how

705 to configure the timeout and number of attempts to reach the 

706 service. 

707 :type environ: None or dict 

708 :param environ: A dictionary of environment variables to use. If 

709 ``None`` is the argument then ``os.environ`` will be used by 

710 default. 

711 :type fetcher: :class:`botocore.utils.InstanceMetadataRegionFetcher`

712 :param fetcher: The class to actually handle the fetching of the region 

713 from the IMDS. If not provided a default one will be created. 

714 """ 

715 self._session = session 

716 if environ is None: 

717 environ = os.environ 

718 self._environ = environ 

719 self._fetcher = fetcher 

720 

721 def provide(self): 

722 """Provide the region value from IMDS.""" 

723 instance_region = self._get_instance_metadata_region() 

724 return instance_region 

725 

726 def _get_instance_metadata_region(self): 

727 fetcher = self._get_fetcher() 

728 region = fetcher.retrieve_region() 

729 return region 

730 

731 def _get_fetcher(self): 

732 if self._fetcher is None: 

733 self._fetcher = self._create_fetcher() 

734 return self._fetcher 

735 

736 def _create_fetcher(self): 

737 metadata_timeout = self._session.get_config_variable( 

738 'metadata_service_timeout' 

739 ) 

740 metadata_num_attempts = self._session.get_config_variable( 

741 'metadata_service_num_attempts' 

742 ) 

743 imds_config = { 

744 'ec2_metadata_service_endpoint': self._session.get_config_variable( 

745 'ec2_metadata_service_endpoint' 

746 ), 

747 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode( 

748 self._session 

749 ), 

750 'ec2_metadata_v1_disabled': self._session.get_config_variable( 

751 'ec2_metadata_v1_disabled' 

752 ), 

753 } 

754 fetcher = InstanceMetadataRegionFetcher( 

755 timeout=metadata_timeout, 

756 num_attempts=metadata_num_attempts, 

757 env=self._environ, 

758 user_agent=self._session.user_agent(), 

759 config=imds_config, 

760 ) 

761 return fetcher 

762 

763 

764class InstanceMetadataRegionFetcher(IMDSFetcher): 

765 _URL_PATH = 'latest/meta-data/placement/availability-zone/' 

766 

767 def retrieve_region(self): 

768 """Get the current region from the instance metadata service. 

769 :rtype: None or str

770 :returns: Returns the region as a string if it is configured to use

771 IMDS as a region source. Otherwise returns ``None``. It will also

772 return ``None`` if it fails to get the region from IMDS due to

773 exhausting its retries, not being able to connect, or receiving an

774 invalid response.

778 """ 

779 try: 

780 region = self._get_region() 

781 return region 

782 except self._RETRIES_EXCEEDED_ERROR_CLS: 

783 logger.debug( 

784 "Max number of attempts exceeded (%s) when " 

785 "attempting to retrieve data from metadata service.", 

786 self._num_attempts, 

787 ) 

788 return None 

789 

790 def _get_region(self): 

791 token = self._fetch_metadata_token() 

792 response = self._get_request( 

793 url_path=self._URL_PATH, 

794 retry_func=self._default_retry, 

795 token=token, 

796 ) 

797 availability_zone = response.text 

798 region = availability_zone[:-1] 

799 return region 

800 

801 

802def merge_dicts(dict1, dict2, append_lists=False): 

803 """Given two dict, merge the second dict into the first. 

804 

805 The dicts can have arbitrary nesting. 

806 

807 :param append_lists: If true, instead of clobbering a list with the new 

808 value, append all of the new values onto the original list. 

809 """ 

810 for key in dict2: 

811 if isinstance(dict2[key], dict): 

812 if key in dict1 and key in dict2: 

813 merge_dicts(dict1[key], dict2[key]) 

814 else: 

815 dict1[key] = dict2[key] 

816 # If the value is a list and the ``append_lists`` flag is set, 

817 # append the new values onto the original list 

818 elif isinstance(dict2[key], list) and append_lists: 

819 # The value in dict1 must be a list in order to append new 

820 # values onto it. 

821 if key in dict1 and isinstance(dict1[key], list): 

822 dict1[key].extend(dict2[key]) 

823 else: 

824 dict1[key] = dict2[key] 

825 else: 

826 # At scalar types, we iterate and merge the 

827 # current dict that we're on. 

828 dict1[key] = dict2[key] 

829 
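An illustrative sketch (not part of the module; the keys and values are made up) of the nested-merge behavior and the append_lists flag:

from botocore.utils import merge_dicts

base = {'s3': {'addressing_style': 'path'}, 'tags': ['a']}
extra = {'s3': {'use_accelerate_endpoint': True}, 'tags': ['b']}
merge_dicts(base, extra, append_lists=True)
assert base['s3'] == {'addressing_style': 'path', 'use_accelerate_endpoint': True}
assert base['tags'] == ['a', 'b']   # appended rather than clobbered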

830 

831def lowercase_dict(original): 

832 """Copies the given dictionary ensuring all keys are lowercase strings.""" 

833 copy = {} 

834 for key in original: 

835 copy[key.lower()] = original[key] 

836 return copy 

837 

838 

839def parse_key_val_file(filename, _open=open): 

840 try: 

841 with _open(filename) as f: 

842 contents = f.read() 

843 return parse_key_val_file_contents(contents) 

844 except OSError: 

845 raise ConfigNotFound(path=filename) 

846 

847 

848def parse_key_val_file_contents(contents): 

849 # This was originally extracted from the EC2 credential provider, which was 

850 # fairly lenient in its parsing. We only try to parse key/val pairs if 

851 # there's a '=' in the line. 

852 final = {} 

853 for line in contents.splitlines(): 

854 if '=' not in line: 

855 continue 

856 key, val = line.split('=', 1) 

857 key = key.strip() 

858 val = val.strip() 

859 final[key] = val 

860 return final 

861 
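A small sketch (not part of the module; the contents are invented) of the lenient key/value parsing; lines without an '=' are skipped and whitespace around keys and values is stripped:

from botocore.utils import parse_key_val_file_contents

contents = 'AWSAccessKeyId = example-key\nAWSSecretKey=example-secret\nmalformed line without equals'
assert parse_key_val_file_contents(contents) == {
    'AWSAccessKeyId': 'example-key',
    'AWSSecretKey': 'example-secret',
}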

862 

863def percent_encode_sequence(mapping, safe=SAFE_CHARS): 

864 """Urlencode a dict or list into a string. 

865 

866 This is similar to urllib.urlencode except that: 

867 

868 * It uses quote, and not quote_plus 

869 * It has a default list of safe chars that don't need 

870 to be encoded, which matches what AWS services expect. 

871 

872 If any value in the input ``mapping`` is a list type, 

873 then each list element will be serialized. This is equivalent

874 to ``urlencode``'s ``doseq=True`` argument. 

875 

876 This function should be preferred over the stdlib 

877 ``urlencode()`` function. 

878 

879 :param mapping: Either a dict to urlencode or a list of 

880 ``(key, value)`` pairs. 

881 

882 """ 

883 encoded_pairs = [] 

884 if hasattr(mapping, 'items'): 

885 pairs = mapping.items() 

886 else: 

887 pairs = mapping 

888 for key, value in pairs: 

889 if isinstance(value, list): 

890 for element in value: 

891 encoded_pairs.append( 

892 f'{percent_encode(key)}={percent_encode(element)}' 

893 ) 

894 else: 

895 encoded_pairs.append( 

896 f'{percent_encode(key)}={percent_encode(value)}' 

897 ) 

898 return '&'.join(encoded_pairs) 

899 
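An illustrative sketch (not part of the module; the parameter names are arbitrary) showing the doseq-style expansion of list values and that '/' is not in the default safe set:

from botocore.utils import percent_encode_sequence

params = {'prefix': 'photos/2023', 'tag': ['a b', 'c']}
assert percent_encode_sequence(params) == 'prefix=photos%2F2023&tag=a%20b&tag=c'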

900 

901def percent_encode(input_str, safe=SAFE_CHARS): 

902 """Urlencodes a string. 

903 

904 Whereas percent_encode_sequence handles taking a dict/sequence and 

905 producing a percent encoded string, this function deals only with 

906 taking a string (not a dict/sequence) and percent encoding it. 

907 

908 If given the binary type, will simply URL encode it. If given the 

909 text type, will produce the binary type by UTF-8 encoding the 

910 text. If given something else, will convert it to the text type 

911 first. 

912 """ 

913 # If its not a binary or text string, make it a text string. 

914 if not isinstance(input_str, (bytes, str)): 

915 input_str = str(input_str) 

916 # If it's not bytes, make it bytes by UTF-8 encoding it. 

917 if not isinstance(input_str, bytes): 

918 input_str = input_str.encode('utf-8') 

919 return quote(input_str, safe=safe) 

920 

921 

922def _epoch_seconds_to_datetime(value, tzinfo): 

923 """Parse numerical epoch timestamps (seconds since 1970) into a 

924 ``datetime.datetime`` in UTC using ``datetime.timedelta``. This is intended 

925 as a fallback when ``fromtimestamp`` raises ``OverflowError`` or ``OSError``.

926 

927 :type value: float or int 

928 :param value: The Unix timestamp as a number.

929 

930 :type tzinfo: callable 

931 :param tzinfo: A ``datetime.tzinfo`` class or compatible callable. 

932 """ 

933 epoch_zero = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=tzutc()) 

934 epoch_zero_localized = epoch_zero.astimezone(tzinfo()) 

935 return epoch_zero_localized + datetime.timedelta(seconds=value) 

936 

937 

938def _parse_timestamp_with_tzinfo(value, tzinfo): 

939 """Parse timestamp with pluggable tzinfo options.""" 

940 if isinstance(value, (int, float)): 

941 # Possibly an epoch time. 

942 return datetime.datetime.fromtimestamp(value, tzinfo()) 

943 else: 

944 try: 

945 return datetime.datetime.fromtimestamp(float(value), tzinfo()) 

946 except (TypeError, ValueError): 

947 pass 

948 try: 

949 # In certain cases, a timestamp marked with GMT can be parsed into a 

950 # different time zone, so here we provide a context which will 

951 # enforce that GMT == UTC. 

952 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()}) 

953 except (TypeError, ValueError) as e: 

954 raise ValueError(f'Invalid timestamp "{value}": {e}') 

955 

956 

957def parse_timestamp(value): 

958 """Parse a timestamp into a datetime object. 

959 

960 Supported formats: 

961 

962 * iso8601 

963 * rfc822 

964 * epoch (value is an integer) 

965 

966 This will return a ``datetime.datetime`` object. 

967 

968 """ 

969 tzinfo_options = get_tzinfo_options() 

970 for tzinfo in tzinfo_options: 

971 try: 

972 return _parse_timestamp_with_tzinfo(value, tzinfo) 

973 except (OSError, OverflowError) as e: 

974 logger.debug( 

975 'Unable to parse timestamp with "%s" timezone info.', 

976 tzinfo.__name__, 

977 exc_info=e, 

978 ) 

979 # For numeric values, attempt a fallback to the fromtimestamp-free method.

980 # From Python's ``datetime.datetime.fromtimestamp`` documentation: "This 

981 # may raise ``OverflowError``, if the timestamp is out of the range of 

982 # values supported by the platform C localtime() function, and ``OSError`` 

983 # on localtime() failure. It's common for this to be restricted to years 

984 # from 1970 through 2038." 

985 try: 

986 numeric_value = float(value) 

987 except (TypeError, ValueError): 

988 pass 

989 else: 

990 try: 

991 for tzinfo in tzinfo_options: 

992 return _epoch_seconds_to_datetime(numeric_value, tzinfo=tzinfo) 

993 except (OSError, OverflowError) as e: 

994 logger.debug( 

995 'Unable to parse timestamp using fallback method with "%s" ' 

996 'timezone info.', 

997 tzinfo.__name__, 

998 exc_info=e, 

999 ) 

1000 raise RuntimeError( 

1001 'Unable to calculate correct timezone offset for "%s"' % value 

1002 ) 

1003 

1004 

1005def parse_to_aware_datetime(value): 

1006 """Convert the passed in value to a datetime object with tzinfo.

1007 

1008 This function can be used to normalize all timestamp inputs. This 

1009 function accepts a number of different types of inputs, but 

1010 will always return a datetime.datetime object with time zone 

1011 information. 

1012 

1013 The input param ``value`` can be one of several types: 

1014 

1015 * A datetime object (both naive and aware) 

1016 * An integer representing the epoch time (can also be a string 

1017 of the integer, i.e '0', instead of 0). The epoch time is 

1018 considered to be UTC. 

1019 * An iso8601 formatted timestamp. This does not need to be 

1020 a complete timestamp, it can contain just the date portion 

1021 without the time component. 

1022 

1023 The returned value will be a datetime object that will have tzinfo. 

1024 If no timezone info was provided in the input value, then UTC is 

1025 assumed, not local time. 

1026 

1027 """ 

1028 # This is a general purpose method that handles several cases of 

1029 # converting the provided value to a string timestamp suitable to be 

1030 # serialized to an http request. It can handle: 

1031 # 1) A datetime.datetime object. 

1032 if isinstance(value, _DatetimeClass): 

1033 datetime_obj = value 

1034 else: 

1035 # 2) A string object that's formatted as a timestamp. 

1036 # We document this as being an iso8601 timestamp, although 

1037 # parse_timestamp is a bit more flexible. 

1038 datetime_obj = parse_timestamp(value) 

1039 if datetime_obj.tzinfo is None: 

1040 # A case could be made that if no time zone is provided,

1041 # we should use the local time. However, to restore backwards 

1042 # compat, the previous behavior was to assume UTC, which is 

1043 # what we're going to do here. 

1044 datetime_obj = datetime_obj.replace(tzinfo=tzutc()) 

1045 else: 

1046 datetime_obj = datetime_obj.astimezone(tzutc()) 

1047 return datetime_obj 

1048 
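A brief sketch (not part of the module; the inputs are arbitrary examples) of the normalization to timezone-aware UTC datetimes:

import datetime
from dateutil.tz import tzutc
from botocore.utils import parse_to_aware_datetime

assert parse_to_aware_datetime(0) == datetime.datetime(1970, 1, 1, tzinfo=tzutc())
assert parse_to_aware_datetime('2021-07-14T12:00:00Z') == datetime.datetime(
    2021, 7, 14, 12, 0, 0, tzinfo=tzutc()
)
# A naive datetime is assumed to be UTC rather than local time.
assert parse_to_aware_datetime(datetime.datetime(2021, 7, 14)) == datetime.datetime(
    2021, 7, 14, tzinfo=tzutc()
)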

1049 

1050def datetime2timestamp(dt, default_timezone=None): 

1051 """Calculate the timestamp based on the given datetime instance. 

1052 

1053 :type dt: datetime 

1054 :param dt: A datetime object to be converted into timestamp 

1055 :type default_timezone: tzinfo 

1056 :param default_timezone: If it is provided as None, we treat it as tzutc(). 

1057 But it is only used when dt is a naive datetime. 

1058 :returns: The timestamp 

1059 """ 

1060 epoch = datetime.datetime(1970, 1, 1) 

1061 if dt.tzinfo is None: 

1062 if default_timezone is None: 

1063 default_timezone = tzutc() 

1064 dt = dt.replace(tzinfo=default_timezone) 

1065 d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch 

1066 if hasattr(d, "total_seconds"): 

1067 return d.total_seconds() # Works in Python 3.6+ 

1068 return ( 

1069 d.microseconds + (d.seconds + d.days * 24 * 3600) * 10**6 

1070 ) / 10**6 

1071 
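A small sketch (not part of the module; the values are illustrative) of the epoch-seconds conversion; naive datetimes default to UTC and aware datetimes are shifted by their offset:

import datetime
from dateutil.tz import tzoffset
from botocore.utils import datetime2timestamp

assert datetime2timestamp(datetime.datetime(1970, 1, 1, 0, 0, 1)) == 1.0
assert datetime2timestamp(
    datetime.datetime(1970, 1, 1, 1, 0, 0, tzinfo=tzoffset(None, 3600))
) == 0.0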

1072 

1073def calculate_sha256(body, as_hex=False): 

1074 """Calculate a sha256 checksum. 

1075 

1076 This method will calculate the sha256 checksum of a file like 

1077 object. Note that this method will iterate through the entire 

1078 file contents. The caller is responsible for ensuring the proper 

1079 starting position of the file and ``seek()``'ing the file back 

1080 to its starting location if other consumers need to read from 

1081 the file like object. 

1082 

1083 :param body: Any file like object. The file must be opened 

1084 in binary mode such that a ``.read()`` call returns bytes. 

1085 :param as_hex: If True, then the hex digest is returned. 

1086 If False, then the digest (as binary bytes) is returned. 

1087 

1088 :returns: The sha256 checksum 

1089 

1090 """ 

1091 checksum = hashlib.sha256() 

1092 for chunk in iter(lambda: body.read(1024 * 1024), b''): 

1093 checksum.update(chunk) 

1094 if as_hex: 

1095 return checksum.hexdigest() 

1096 else: 

1097 return checksum.digest() 

1098 
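An illustrative sketch (not part of the module) using an in-memory binary stream; note the caller remains responsible for seeking the stream afterwards:

import hashlib
import io
from botocore.utils import calculate_sha256

body = io.BytesIO(b'hello world')
assert calculate_sha256(body, as_hex=True) == hashlib.sha256(b'hello world').hexdigest()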

1099 

1100def calculate_tree_hash(body): 

1101 """Calculate a tree hash checksum. 

1102 

1103 For more information see: 

1104 

1105 http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html 

1106 

1107 :param body: Any file like object. This has the same constraints as 

1108 the ``body`` param in calculate_sha256 

1109 

1110 :rtype: str 

1111 :returns: The hex version of the calculated tree hash 

1112 

1113 """ 

1114 chunks = [] 

1115 required_chunk_size = 1024 * 1024 

1116 sha256 = hashlib.sha256 

1117 for chunk in iter(lambda: body.read(required_chunk_size), b''): 

1118 chunks.append(sha256(chunk).digest()) 

1119 if not chunks: 

1120 return sha256(b'').hexdigest() 

1121 while len(chunks) > 1: 

1122 new_chunks = [] 

1123 for first, second in _in_pairs(chunks): 

1124 if second is not None: 

1125 new_chunks.append(sha256(first + second).digest()) 

1126 else: 

1127 # We're at the end of the list and there's no pair left. 

1128 new_chunks.append(first) 

1129 chunks = new_chunks 

1130 return binascii.hexlify(chunks[0]).decode('ascii') 

1131 

1132 

1133def _in_pairs(iterable): 

1134 # Creates iterator that iterates over the list in pairs: 

1135 # for a, b in _in_pairs([0, 1, 2, 3, 4]): 

1136 # print(a, b) 

1137 # 

1138 # will print: 

1139 # 0, 1 

1140 # 2, 3 

1141 # 4, None 

1142 shared_iter = iter(iterable) 

1143 # Note that zip_longest is a compat import that uses 

1144 # the itertools izip_longest. This creates an iterator, 

1145 # this call below does _not_ immediately create the list 

1146 # of pairs. 

1147 return zip_longest(shared_iter, shared_iter) 

1148 

1149 

1150class CachedProperty: 

1151 """A read only property that caches the initially computed value. 

1152 

1153 This descriptor will only call the provided ``fget`` function once. 

1154 Subsequent access to this property will return the cached value. 

1155 

1156 """ 

1157 

1158 def __init__(self, fget): 

1159 self._fget = fget 

1160 

1161 def __get__(self, obj, cls): 

1162 if obj is None: 

1163 return self 

1164 else: 

1165 computed_value = self._fget(obj) 

1166 obj.__dict__[self._fget.__name__] = computed_value 

1167 return computed_value 

1168 
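A usage sketch (not part of the module; the Example class is made up) showing that the wrapped fget runs once and the result is cached on the instance:

from botocore.utils import CachedProperty

class Example:
    def __init__(self):
        self.calls = 0

    @CachedProperty
    def value(self):
        self.calls += 1
        return self.calls

obj = Example()
assert obj.value == 1
assert obj.value == 1   # cached in obj.__dict__, so fget is not called again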

1169 

1170class ArgumentGenerator: 

1171 """Generate sample input based on a shape model. 

1172 

1173 This class contains a ``generate_skeleton`` method that will take 

1174 an input/output shape (created from ``botocore.model``) and generate 

1175 a sample dictionary corresponding to the input/output shape. 

1176 

1177 The specific values used are placeholder values. For strings, either an

1178 empty string or the member name can be used; for numbers, 0 or 0.0 is used.

1179 The intended usage of this class is to generate the *shape* of the input 

1180 structure. 

1181 

1182 This can be useful for operations that have complex input shapes. 

1183 This allows a user to just fill in the necessary data instead of 

1184 worrying about the specific structure of the input arguments. 

1185 

1186 Example usage:: 

1187 

1188 s = botocore.session.get_session() 

1189 ddb = s.get_service_model('dynamodb') 

1190 arg_gen = ArgumentGenerator() 

1191 sample_input = arg_gen.generate_skeleton( 

1192 ddb.operation_model('CreateTable').input_shape) 

1193 print("Sample input for dynamodb.CreateTable: %s" % sample_input) 

1194 

1195 """ 

1196 

1197 def __init__(self, use_member_names=False): 

1198 self._use_member_names = use_member_names 

1199 

1200 def generate_skeleton(self, shape): 

1201 """Generate a sample input. 

1202 

1203 :type shape: ``botocore.model.Shape`` 

1204 :param shape: The input shape. 

1205 

1206 :return: The generated skeleton input corresponding to the 

1207 provided input shape. 

1208 

1209 """ 

1210 stack = [] 

1211 return self._generate_skeleton(shape, stack) 

1212 

1213 def _generate_skeleton(self, shape, stack, name=''): 

1214 stack.append(shape.name) 

1215 try: 

1216 if shape.type_name == 'structure': 

1217 return self._generate_type_structure(shape, stack) 

1218 elif shape.type_name == 'list': 

1219 return self._generate_type_list(shape, stack) 

1220 elif shape.type_name == 'map': 

1221 return self._generate_type_map(shape, stack) 

1222 elif shape.type_name == 'string': 

1223 if self._use_member_names: 

1224 return name 

1225 if shape.enum: 

1226 return random.choice(shape.enum) 

1227 return '' 

1228 elif shape.type_name in ['integer', 'long']: 

1229 return 0 

1230 elif shape.type_name in ['float', 'double']: 

1231 return 0.0 

1232 elif shape.type_name == 'boolean': 

1233 return True 

1234 elif shape.type_name == 'timestamp': 

1235 return datetime.datetime(1970, 1, 1, 0, 0, 0) 

1236 finally: 

1237 stack.pop() 

1238 

1239 def _generate_type_structure(self, shape, stack): 

1240 if stack.count(shape.name) > 1: 

1241 return {} 

1242 skeleton = OrderedDict() 

1243 for member_name, member_shape in shape.members.items(): 

1244 skeleton[member_name] = self._generate_skeleton( 

1245 member_shape, stack, name=member_name 

1246 ) 

1247 return skeleton 

1248 

1249 def _generate_type_list(self, shape, stack): 

1250 # For list elements we've arbitrarily decided to

1251 # return a single element for the skeleton list.

1252 name = '' 

1253 if self._use_member_names: 

1254 name = shape.member.name 

1255 return [ 

1256 self._generate_skeleton(shape.member, stack, name), 

1257 ] 

1258 

1259 def _generate_type_map(self, shape, stack): 

1260 key_shape = shape.key 

1261 value_shape = shape.value 

1262 assert key_shape.type_name == 'string' 

1263 return OrderedDict( 

1264 [ 

1265 ('KeyName', self._generate_skeleton(value_shape, stack)), 

1266 ] 

1267 ) 

1268 

1269 

1270def is_valid_ipv6_endpoint_url(endpoint_url): 

1271 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1272 return False 

1273 hostname = f'[{urlparse(endpoint_url).hostname}]' 

1274 return IPV6_ADDRZ_RE.match(hostname) is not None 

1275 

1276 

1277def is_valid_ipv4_endpoint_url(endpoint_url): 

1278 hostname = urlparse(endpoint_url).hostname 

1279 return IPV4_RE.match(hostname) is not None 

1280 

1281 

1282def is_valid_endpoint_url(endpoint_url): 

1283 """Verify the endpoint_url is valid. 

1284 

1285 :type endpoint_url: string 

1286 :param endpoint_url: An endpoint_url. Must have at least a scheme 

1287 and a hostname. 

1288 

1289 :return: True if the endpoint url is valid. False otherwise. 

1290 

1291 """ 

1292 # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing 

1293 # it to pass hostname validation below. Detect them early to fix that. 

1294 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1295 return False 

1296 parts = urlsplit(endpoint_url) 

1297 hostname = parts.hostname 

1298 if hostname is None: 

1299 return False 

1300 if len(hostname) > 255: 

1301 return False 

1302 if hostname[-1] == ".": 

1303 hostname = hostname[:-1] 

1304 allowed = re.compile( 

1305 r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$", 

1306 re.IGNORECASE, 

1307 ) 

1308 return allowed.match(hostname) 

1309 

1310 

1311def is_valid_uri(endpoint_url): 

1312 return is_valid_endpoint_url(endpoint_url) or is_valid_ipv6_endpoint_url( 

1313 endpoint_url 

1314 ) 

1315 

1316 

1317def validate_region_name(region_name): 

1318 """Provided region_name must be a valid host label.""" 

1319 if region_name is None: 

1320 return 

1321 valid_host_label = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$') 

1322 valid = valid_host_label.match(region_name) 

1323 if not valid: 

1324 raise InvalidRegionError(region_name=region_name) 

1325 

1326 

1327def check_dns_name(bucket_name): 

1328 """ 

1329 Check to see if the ``bucket_name`` complies with the 

1330 restricted DNS naming conventions necessary to allow 

1331 access via virtual-hosting style. 

1332 

1333 Even though "." characters are perfectly valid in this DNS 

1334 naming scheme, we are going to punt on any name containing a 

1335 "." character because these will cause SSL cert validation 

1336 problems if we try to use virtual-hosting style addressing. 

1337 """ 

1338 if '.' in bucket_name: 

1339 return False 

1340 n = len(bucket_name) 

1341 if n < 3 or n > 63: 

1342 # Wrong length 

1343 return False 

1344 match = LABEL_RE.match(bucket_name) 

1345 if match is None or match.end() != len(bucket_name): 

1346 return False 

1347 return True 

1348 
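A brief sketch (not part of the module; the bucket names are invented) of the virtual-hosting DNS compatibility rules:

from botocore.utils import check_dns_name

assert check_dns_name('my-example-bucket') is True
assert check_dns_name('my.example.bucket') is False   # dots break SSL cert validation for virtual hosts
assert check_dns_name('ab') is False                  # shorter than 3 characters
assert check_dns_name('My-Bucket') is False           # uppercase fails LABEL_RE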

1349 

1350def fix_s3_host( 

1351 request, 

1352 signature_version, 

1353 region_name, 

1354 default_endpoint_url=None, 

1355 **kwargs, 

1356): 

1357 """ 

1358 This handler looks at S3 requests just before they are signed. 

1359 If there is a bucket name on the path (true for everything except 

1360 ListAllBuckets) it checks to see if that bucket name conforms to 

1361 the DNS naming conventions. If it does, it alters the request to 

1362 use ``virtual hosting`` style addressing rather than ``path-style`` 

1363 addressing. 

1364 

1365 """ 

1366 if request.context.get('use_global_endpoint', False): 

1367 default_endpoint_url = 's3.amazonaws.com' 

1368 try: 

1369 switch_to_virtual_host_style( 

1370 request, signature_version, default_endpoint_url 

1371 ) 

1372 except InvalidDNSNameError as e: 

1373 bucket_name = e.kwargs['bucket_name'] 

1374 logger.debug( 

1375 'Not changing URI, bucket is not DNS compatible: %s', bucket_name 

1376 ) 

1377 

1378 

1379def switch_to_virtual_host_style( 

1380 request, signature_version, default_endpoint_url=None, **kwargs 

1381): 

1382 """ 

1383 This is a handler to force virtual host style s3 addressing no matter 

1384 the signature version (which is taken into consideration for the default

1385 case). If the bucket is not DNS compatible, an InvalidDNSNameError is raised.

1386 

1387 :param request: An AWSRequest object that is about to be sent.

1388 :param signature_version: The signature version to sign with 

1389 :param default_endpoint_url: The endpoint to use when switching to a 

1390 virtual style. If None is supplied, the virtual host will be 

1391 constructed from the url of the request. 

1392 """ 

1393 if request.auth_path is not None: 

1394 # The auth_path has already been applied (this may be a 

1395 # retried request). We don't need to perform this 

1396 # customization again. 

1397 return 

1398 elif _is_get_bucket_location_request(request): 

1399 # For the GetBucketLocation response, we should not be using 

1400 # the virtual host style addressing so we can avoid any sigv4 

1401 # issues. 

1402 logger.debug( 

1403 "Request is GetBucketLocation operation, not checking " 

1404 "for DNS compatibility." 

1405 ) 

1406 return 

1407 parts = urlsplit(request.url) 

1408 request.auth_path = parts.path 

1409 path_parts = parts.path.split('/') 

1410 

1411 # Retrieve the endpoint we will be prepending the bucket name to.

1412 if default_endpoint_url is None: 

1413 default_endpoint_url = parts.netloc 

1414 

1415 if len(path_parts) > 1: 

1416 bucket_name = path_parts[1] 

1417 if not bucket_name: 

1418 # If the bucket name is empty we should not be checking for 

1419 # dns compatibility. 

1420 return 

1421 logger.debug('Checking for DNS compatible bucket for: %s', request.url) 

1422 if check_dns_name(bucket_name): 

1423 # If the operation is on a bucket, the auth_path must be 

1424 # terminated with a '/' character. 

1425 if len(path_parts) == 2: 

1426 if request.auth_path[-1] != '/': 

1427 request.auth_path += '/' 

1428 path_parts.remove(bucket_name) 

1429 # At the very least the path must be a '/', such as with the 

1430 # CreateBucket operation when DNS style is being used. If this 

1431 # is not used you will get an empty path which is incorrect. 

1432 path = '/'.join(path_parts) or '/' 

1433 global_endpoint = default_endpoint_url 

1434 host = bucket_name + '.' + global_endpoint 

1435 new_tuple = (parts.scheme, host, path, parts.query, '') 

1436 new_uri = urlunsplit(new_tuple) 

1437 request.url = new_uri 

1438 logger.debug('URI updated to: %s', new_uri) 

1439 else: 

1440 raise InvalidDNSNameError(bucket_name=bucket_name) 

1441 

1442 

1443def _is_get_bucket_location_request(request): 

1444 return request.url.endswith('?location') 

1445 

1446 

1447def instance_cache(func): 

1448 """Method decorator for caching method calls to a single instance. 

1449 

1450 **This is not a general purpose caching decorator.** 

1451 

1452 In order to use this, you *must* provide an ``_instance_cache`` 

1453 attribute on the instance. 

1454 

1455 This decorator is used to cache method calls. The cache is only 

1456 scoped to a single instance though such that multiple instances 

1457 will maintain their own cache. In order to keep things simple, 

1458 this decorator requires that you provide an ``_instance_cache`` 

1459 attribute on your instance. 

1460 

1461 """ 

1462 func_name = func.__name__ 

1463 

1464 @functools.wraps(func) 

1465 def _cache_guard(self, *args, **kwargs): 

1466 cache_key = (func_name, args) 

1467 if kwargs: 

1468 kwarg_items = tuple(sorted(kwargs.items())) 

1469 cache_key = (func_name, args, kwarg_items) 

1470 result = self._instance_cache.get(cache_key) 

1471 if result is not None: 

1472 return result 

1473 result = func(self, *args, **kwargs) 

1474 self._instance_cache[cache_key] = result 

1475 return result 

1476 

1477 return _cache_guard 

1478 

1479 

1480def lru_cache_weakref(*cache_args, **cache_kwargs): 

1481 """ 

1482 Version of functools.lru_cache that stores a weak reference to ``self``. 

1483 

1484 Serves the same purpose as :py:func:`instance_cache` but uses Python's 

1485 functools implementation, which offers ``maxsize`` and ``typed`` parameters.

1486 

1487 lru_cache is a global cache even when used on a method. The cache's 

1488 reference to ``self`` will prevent garbage collection of the object. This

1489 wrapper around functools.lru_cache replaces the reference to ``self`` with 

1490 a weak reference to not interfere with garbage collection. 

1491 """ 

1492 

1493 def wrapper(func): 

1494 @functools.lru_cache(*cache_args, **cache_kwargs) 

1495 def func_with_weakref(weakref_to_self, *args, **kwargs): 

1496 return func(weakref_to_self(), *args, **kwargs) 

1497 

1498 @functools.wraps(func) 

1499 def inner(self, *args, **kwargs): 

1500 return func_with_weakref(weakref.ref(self), *args, **kwargs) 

1501 

1502 inner.cache_info = func_with_weakref.cache_info 

1503 return inner 

1504 

1505 return wrapper 

1506 

1507 

1508def switch_host_s3_accelerate(request, operation_name, **kwargs): 

1509 """Switches the current s3 endpoint with an S3 Accelerate endpoint""" 

1510 

1511 # Note that when registered the switching of the s3 host happens 

1512 # before it gets changed to virtual. So we are not concerned with ensuring 

1513 # that the bucket name is translated to the virtual style here and we 

1514 # can hard code the Accelerate endpoint. 

1515 parts = urlsplit(request.url).netloc.split('.') 

1516 parts = [p for p in parts if p in S3_ACCELERATE_WHITELIST] 

1517 endpoint = 'https://s3-accelerate.' 

1518 if len(parts) > 0: 

1519 endpoint += '.'.join(parts) + '.' 

1520 endpoint += 'amazonaws.com' 

1521 

1522 if operation_name in ['ListBuckets', 'CreateBucket', 'DeleteBucket']: 

1523 return 

1524 _switch_hosts(request, endpoint, use_new_scheme=False) 

1525 

1526 

1527def switch_host_with_param(request, param_name): 

1528 """Switches the host using a parameter value from a JSON request body""" 

1529 request_json = json.loads(request.data.decode('utf-8')) 

1530 if request_json.get(param_name): 

1531 new_endpoint = request_json[param_name] 

1532 _switch_hosts(request, new_endpoint) 

1533 

1534 

1535def _switch_hosts(request, new_endpoint, use_new_scheme=True): 

1536 final_endpoint = _get_new_endpoint( 

1537 request.url, new_endpoint, use_new_scheme 

1538 ) 

1539 request.url = final_endpoint 

1540 

1541 

1542def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True): 

1543 new_endpoint_components = urlsplit(new_endpoint) 

1544 original_endpoint_components = urlsplit(original_endpoint) 

1545 scheme = original_endpoint_components.scheme 

1546 if use_new_scheme: 

1547 scheme = new_endpoint_components.scheme 

1548 final_endpoint_components = ( 

1549 scheme, 

1550 new_endpoint_components.netloc, 

1551 original_endpoint_components.path, 

1552 original_endpoint_components.query, 

1553 '', 

1554 ) 

1555 final_endpoint = urlunsplit(final_endpoint_components) 

1556 logger.debug(f'Updating URI from {original_endpoint} to {final_endpoint}') 

1557 return final_endpoint 

1558 

1559 

1560def deep_merge(base, extra): 

1561 """Deeply merge two dictionaries, overriding existing keys in the base.

1562 

1563 :param base: The base dictionary which will be merged into. 

1564 :param extra: The dictionary to merge into the base. Keys from this 

1565 dictionary will take precedence. 

1566 """ 

1567 for key in extra: 

1568 # If the key represents a dict on both given dicts, merge the sub-dicts 

1569 if ( 

1570 key in base 

1571 and isinstance(base[key], dict) 

1572 and isinstance(extra[key], dict) 

1573 ): 

1574 deep_merge(base[key], extra[key]) 

1575 continue 

1576 

1577 # Otherwise, set the key on the base to be the value of the extra. 

1578 base[key] = extra[key] 

1579 
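An illustrative sketch (not part of the module; the keys and values are made up) of the in-place deep merge where extra takes precedence:

from botocore.utils import deep_merge

base = {'retries': {'max_attempts': 3, 'mode': 'legacy'}, 'region': 'us-east-1'}
extra = {'retries': {'mode': 'standard'}, 'endpoint': 'https://example.com'}
deep_merge(base, extra)
assert base == {
    'retries': {'max_attempts': 3, 'mode': 'standard'},
    'region': 'us-east-1',
    'endpoint': 'https://example.com',
}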

1580 

1581def hyphenize_service_id(service_id): 

1582 """Translate the form used for event emitters. 

1583 

1584 :param service_id: The service_id to convert. 

1585 """ 

1586 return service_id.replace(' ', '-').lower() 

1587 

1588 

1589class IdentityCache: 

1590 """Base IdentityCache implementation for storing and retrieving 

1591 highly accessed credentials. 

1592 

1593 This class is not intended to be instantiated in user code. 

1594 """ 

1595 

1596 METHOD = "base_identity_cache" 

1597 

1598 def __init__(self, client, credential_cls): 

1599 self._client = client 

1600 self._credential_cls = credential_cls 

1601 

1602 def get_credentials(self, **kwargs): 

1603 callback = self.build_refresh_callback(**kwargs) 

1604 metadata = callback() 

1605 credential_entry = self._credential_cls.create_from_metadata( 

1606 metadata=metadata, 

1607 refresh_using=callback, 

1608 method=self.METHOD, 

1609 advisory_timeout=45, 

1610 mandatory_timeout=10, 

1611 ) 

1612 return credential_entry 

1613 

1614 def build_refresh_callback(self, **kwargs):

1615 """Callback to be implemented by subclasses. 

1616 

1617 Returns a set of metadata to be converted into a new 

1618 credential instance. 

1619 """ 

1620 raise NotImplementedError() 

1621 

1622 

1623class S3ExpressIdentityCache(IdentityCache): 

1624 """S3Express IdentityCache for retrieving and storing 

1625 credentials from CreateSession calls. 

1626 

1627 This class is not intended to be instantiated in user code. 

1628 """ 

1629 

1630 METHOD = "s3express" 

1631 

1632 def __init__(self, client, credential_cls): 

1633 self._client = client 

1634 self._credential_cls = credential_cls 

1635 

1636 @functools.lru_cache(maxsize=100) 

1637 def get_credentials(self, bucket): 

1638 return super().get_credentials(bucket=bucket) 

1639 

1640 def build_refresh_callback(self, bucket): 

1641 def refresher(): 

1642 response = self._client.create_session(Bucket=bucket) 

1643 creds = response['Credentials'] 

1644 expiration = self._serialize_if_needed( 

1645 creds['Expiration'], iso=True 

1646 ) 

1647 return { 

1648 "access_key": creds['AccessKeyId'], 

1649 "secret_key": creds['SecretAccessKey'], 

1650 "token": creds['SessionToken'], 

1651 "expiry_time": expiration, 

1652 } 

1653 

1654 return refresher 

1655 

1656 def _serialize_if_needed(self, value, iso=False): 

1657 if isinstance(value, _DatetimeClass): 

1658 if iso: 

1659 return value.isoformat() 

1660 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

1661 return value 

1662 

1663 
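# Illustrative sketch (editor-added): get_credentials above is memoized per
# bucket through functools.lru_cache, so repeated lookups for the same
# directory bucket reuse one CreateSession-backed credential entry. The
# wiring below is an assumption about how a caller might construct the
# cache; it is not the only supported usage.
#
#   import boto3  # assumption: boto3 is installed alongside botocore
#   from botocore.credentials import RefreshableCredentials
#
#   client = boto3.client('s3')
#   cache = S3ExpressIdentityCache(client, RefreshableCredentials)
#   creds_a = cache.get_credentials(bucket='my-bucket--usw2-az1--x-s3')
#   creds_b = cache.get_credentials(bucket='my-bucket--usw2-az1--x-s3')
#   assert creds_a is creds_b  # second call is served from the lru_cache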

1664class S3ExpressIdentityResolver: 

1665 def __init__(self, client, credential_cls, cache=None): 

1666 self._client = weakref.proxy(client) 

1667 

1668 if cache is None: 

1669 cache = S3ExpressIdentityCache(self._client, credential_cls) 

1670 self._cache = cache 

1671 

1672 def register(self, event_emitter=None): 

1673 logger.debug('Registering S3Express Identity Resolver') 

1674 emitter = event_emitter or self._client.meta.events 

1675 emitter.register( 

1676 'before-parameter-build.s3', self.inject_signing_cache_key 

1677 ) 

1678 emitter.register('before-call.s3', self.apply_signing_cache_key) 

1679 emitter.register('before-sign.s3', self.resolve_s3express_identity) 

1680 

1681 def inject_signing_cache_key(self, params, context, **kwargs): 

1682 if 'Bucket' in params: 

1683 context['S3Express'] = {'bucket_name': params['Bucket']} 

1684 

1685 def apply_signing_cache_key(self, params, context, **kwargs): 

1686 endpoint_properties = context.get('endpoint_properties', {}) 

1687 backend = endpoint_properties.get('backend', None) 

1688 

1689 # Add cache key if Bucket supplied for s3express request 

1690 bucket_name = context.get('S3Express', {}).get('bucket_name') 

1691 if backend == 'S3Express' and bucket_name is not None: 

1692 context.setdefault('signing', {}) 

1693 context['signing']['cache_key'] = bucket_name 

1694 

1695 def resolve_s3express_identity( 

1696 self, 

1697 request, 

1698 signing_name, 

1699 region_name, 

1700 signature_version, 

1701 request_signer, 

1702 operation_name, 

1703 **kwargs, 

1704 ): 

1705 signing_context = request.context.get('signing', {}) 

1706 signing_name = signing_context.get('signing_name') 

1707 if signing_name == 's3express' and signature_version.startswith( 

1708 'v4-s3express' 

1709 ): 

1710 signing_context['identity_cache'] = self._cache 

1711 if 'cache_key' not in signing_context: 

1712 signing_context['cache_key'] = ( 

1713 request.context.get('s3_redirect', {}) 

1714 .get('params', {}) 

1715 .get('Bucket') 

1716 ) 

1717 

1718 

1719class S3RegionRedirectorv2: 

1720 """Updated version of S3RegionRedirector for use when 

1721 EndpointRulesetResolver is in use for endpoint resolution. 

1722 

1723 This class is considered private and subject to abrupt breaking changes or 

1724 removal without prior announcement. Please do not use it directly. 

1725 """ 

1726 

1727 def __init__(self, endpoint_bridge, client, cache=None): 

1728 self._cache = cache or {} 

1729 self._client = weakref.proxy(client) 

1730 

1731 def register(self, event_emitter=None): 

1732 logger.debug('Registering S3 region redirector handler') 

1733 emitter = event_emitter or self._client.meta.events 

1734 emitter.register('needs-retry.s3', self.redirect_from_error) 

1735 emitter.register( 

1736 'before-parameter-build.s3', self.annotate_request_context 

1737 ) 

1738 emitter.register( 

1739 'before-endpoint-resolution.s3', self.redirect_from_cache 

1740 ) 

1741 

1742 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1743 """ 

1744 An S3 request sent to the wrong region will return an error that 

1745 contains the endpoint the request should be sent to. This handler 

1746 will add the redirect information to the signing context and then 

1747 redirect the request. 

1748 """ 

1749 if response is None: 

1750 # This could be none if there was a ConnectionError or other 

1751 # transport error. 

1752 return 

1753 

1754 redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {}) 

1755 if ArnParser.is_arn(redirect_ctx.get('bucket')): 

1756 logger.debug( 

1757 'S3 request was previously for an Accesspoint ARN, not ' 

1758 'redirecting.' 

1759 ) 

1760 return 

1761 

1762 if redirect_ctx.get('redirected'): 

1763 logger.debug( 

1764 'S3 request was previously redirected, not redirecting.' 

1765 ) 

1766 return 

1767 

1768 error = response[1].get('Error', {}) 

1769 error_code = error.get('Code') 

1770 response_metadata = response[1].get('ResponseMetadata', {}) 

1771 

1772 # We have to account for 400 responses because 

1773 # if we sign a Head* request with the wrong region, 

1774 # we'll get a 400 Bad Request but we won't get a 

1775 # body saying it's an "AuthorizationHeaderMalformed". 

1776 is_special_head_object = ( 

1777 error_code in ('301', '400') and operation.name == 'HeadObject' 

1778 ) 

1779 is_special_head_bucket = ( 

1780 error_code in ('301', '400') 

1781 and operation.name == 'HeadBucket' 

1782 and 'x-amz-bucket-region' 

1783 in response_metadata.get('HTTPHeaders', {}) 

1784 ) 

1785 is_wrong_signing_region = ( 

1786 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

1787 ) 

1788 is_redirect_status = response[0] is not None and response[ 

1789 0 

1790 ].status_code in (301, 302, 307) 

1791 is_permanent_redirect = error_code == 'PermanentRedirect' 

1792 if not any( 

1793 [ 

1794 is_special_head_object, 

1795 is_wrong_signing_region, 

1796 is_permanent_redirect, 

1797 is_special_head_bucket, 

1798 is_redirect_status, 

1799 ] 

1800 ): 

1801 return 

1802 

1803 bucket = request_dict['context']['s3_redirect']['bucket'] 

1804 client_region = request_dict['context'].get('client_region') 

1805 new_region = self.get_bucket_region(bucket, response) 

1806 

1807 if new_region is None: 

1808 logger.debug( 

1809 "S3 client configured for region %s but the bucket %s is not " 

1810 "in that region and the proper region could not be " 

1811 "automatically determined." % (client_region, bucket) 

1812 ) 

1813 return 

1814 

1815 logger.debug( 

1816 "S3 client configured for region %s but the bucket %s is in region" 

1817 " %s; Please configure the proper region to avoid multiple " 

1818 "unnecessary redirects and signing attempts." 

1819 % (client_region, bucket, new_region) 

1820 ) 

1821 # Adding the new region to _cache will make construct_endpoint() 

1822 # use the new region as the value for the AWS::Region builtin parameter. 

1823 self._cache[bucket] = new_region 

1824 

1825 # Re-resolve endpoint with new region and modify request_dict with 

1826 # the new URL, auth scheme, and signing context. 

1827 ep_resolver = self._client._ruleset_resolver 

1828 ep_info = ep_resolver.construct_endpoint( 

1829 operation_model=operation, 

1830 call_args=request_dict['context']['s3_redirect']['params'], 

1831 request_context=request_dict['context'], 

1832 ) 

1833 request_dict['url'] = self.set_request_url( 

1834 request_dict['url'], ep_info.url 

1835 ) 

1836 request_dict['context']['s3_redirect']['redirected'] = True 

1837 auth_schemes = ep_info.properties.get('authSchemes') 

1838 if auth_schemes is not None: 

1839 auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes) 

1840 auth_type, signing_context = auth_info 

1841 request_dict['context']['auth_type'] = auth_type 

1842 request_dict['context']['signing'] = { 

1843 **request_dict['context'].get('signing', {}), 

1844 **signing_context, 

1845 } 

1846 

1847 # Return 0 so it doesn't wait to retry 

1848 return 0 

1849 

1850 def get_bucket_region(self, bucket, response): 

1851 """ 

1852 There are multiple potential sources for the new region to redirect to, 

1853 but they aren't all universally available for use. This will try to 

1854 find region from response elements, but will fall back to calling 

1855 HEAD on the bucket if all else fails. 

1856 

1857 :param bucket: The bucket to find the region for. This is necessary if 

1858 the region is not available in the error response. 

1859 :param response: A response representing a service request that failed 

1860 due to incorrect region configuration. 

1861 """ 

1862 # First try to source the region from the headers. 

1863 service_response = response[1] 

1864 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

1865 if 'x-amz-bucket-region' in response_headers: 

1866 return response_headers['x-amz-bucket-region'] 

1867 

1868 # Next, check the error body 

1869 region = service_response.get('Error', {}).get('Region', None) 

1870 if region is not None: 

1871 return region 

1872 

1873 # Finally, HEAD the bucket. No other choice sadly. 

1874 try: 

1875 response = self._client.head_bucket(Bucket=bucket) 

1876 headers = response['ResponseMetadata']['HTTPHeaders'] 

1877 except ClientError as e: 

1878 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

1879 

1880 region = headers.get('x-amz-bucket-region', None) 

1881 return region 

1882 

1883 def set_request_url(self, old_url, new_endpoint, **kwargs): 

1884 """ 

1885 Splice a new endpoint into an existing URL. Note that some endpoints 

1886 from the endpoint provider have a path component which will be 

1887 discarded by this function. 

1888 """ 

1889 return _get_new_endpoint(old_url, new_endpoint, False) 

1890 

1891 def redirect_from_cache(self, builtins, params, **kwargs): 

1892 """ 

1893 If a bucket name has been redirected before, it is in the cache. This 

1894 handler will update the AWS::Region endpoint resolver builtin param 

1895 to use the region from cache instead of the client region to avoid the 

1896 redirect. 

1897 """ 

1898 bucket = params.get('Bucket') 

1899 if bucket is not None and bucket in self._cache: 

1900 new_region = self._cache.get(bucket) 

1901 builtins['AWS::Region'] = new_region 

1902 

1903 def annotate_request_context(self, params, context, **kwargs): 

1904 """Store the bucket name in context for later use when redirecting. 

1905 The bucket name may be an access point ARN or alias. 

1906 """ 

1907 bucket = params.get('Bucket') 

1908 context['s3_redirect'] = { 

1909 'redirected': False, 

1910 'bucket': bucket, 

1911 'params': params, 

1912 } 

1913 

1914 
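# Illustrative sketch (editor-added): the region discovery order used by
# get_bucket_region above, for a hypothetical (http_response, parsed) tuple
# and a hypothetical S3RegionRedirectorv2 instance named ``redirector``.
#
#   parsed = {
#       'Error': {'Code': 'PermanentRedirect'},
#       'ResponseMetadata': {
#           'HTTPHeaders': {'x-amz-bucket-region': 'eu-west-1'},
#       },
#   }
#   redirector.get_bucket_region('my-bucket', (None, parsed))
#   # -> 'eu-west-1' (the header wins; Error['Region'] and a HeadBucket call
#   # are only consulted when the header is absent)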

1915class S3RegionRedirector: 

1916 """This handler has been replaced by S3RegionRedirectorv2. The original 

1917 version remains in place for any third-party libraries that import it. 

1918 """ 

1919 

1920 def __init__(self, endpoint_bridge, client, cache=None): 

1921 self._endpoint_resolver = endpoint_bridge 

1922 self._cache = cache 

1923 if self._cache is None: 

1924 self._cache = {} 

1925 

1926 # This needs to be a weak ref in order to prevent memory leaks on 

1927 # python 2.6 

1928 self._client = weakref.proxy(client) 

1929 

1930 warnings.warn( 

1931 'The S3RegionRedirector class has been deprecated for a new ' 

1932 'internal replacement. A future version of botocore may remove ' 

1933 'this class.', 

1934 category=FutureWarning, 

1935 ) 

1936 

1937 def register(self, event_emitter=None): 

1938 emitter = event_emitter or self._client.meta.events 

1939 emitter.register('needs-retry.s3', self.redirect_from_error) 

1940 emitter.register('before-call.s3', self.set_request_url) 

1941 emitter.register('before-parameter-build.s3', self.redirect_from_cache) 

1942 

1943 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1944 """ 

1945 An S3 request sent to the wrong region will return an error that 

1946 contains the endpoint the request should be sent to. This handler 

1947 will add the redirect information to the signing context and then 

1948 redirect the request. 

1949 """ 

1950 if response is None: 

1951 # This could be none if there was a ConnectionError or other 

1952 # transport error. 

1953 return 

1954 

1955 if self._is_s3_accesspoint(request_dict.get('context', {})): 

1956 logger.debug( 

1957 'S3 request was previously to an accesspoint, not redirecting.' 

1958 ) 

1959 return 

1960 

1961 if request_dict.get('context', {}).get('s3_redirected'): 

1962 logger.debug( 

1963 'S3 request was previously redirected, not redirecting.' 

1964 ) 

1965 return 

1966 

1967 error = response[1].get('Error', {}) 

1968 error_code = error.get('Code') 

1969 response_metadata = response[1].get('ResponseMetadata', {}) 

1970 

1971 # We have to account for 400 responses because 

1972 # if we sign a Head* request with the wrong region, 

1973 # we'll get a 400 Bad Request but we won't get a 

1974 # body saying it's an "AuthorizationHeaderMalformed". 

1975 is_special_head_object = ( 

1976 error_code in ('301', '400') and operation.name == 'HeadObject' 

1977 ) 

1978 is_special_head_bucket = ( 

1979 error_code in ('301', '400') 

1980 and operation.name == 'HeadBucket' 

1981 and 'x-amz-bucket-region' 

1982 in response_metadata.get('HTTPHeaders', {}) 

1983 ) 

1984 is_wrong_signing_region = ( 

1985 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

1986 ) 

1987 is_redirect_status = response[0] is not None and response[ 

1988 0 

1989 ].status_code in (301, 302, 307) 

1990 is_permanent_redirect = error_code == 'PermanentRedirect' 

1991 if not any( 

1992 [ 

1993 is_special_head_object, 

1994 is_wrong_signing_region, 

1995 is_permanent_redirect, 

1996 is_special_head_bucket, 

1997 is_redirect_status, 

1998 ] 

1999 ): 

2000 return 

2001 

2002 bucket = request_dict['context']['signing']['bucket'] 

2003 client_region = request_dict['context'].get('client_region') 

2004 new_region = self.get_bucket_region(bucket, response) 

2005 

2006 if new_region is None: 

2007 logger.debug( 

2008 "S3 client configured for region %s but the bucket %s is not " 

2009 "in that region and the proper region could not be " 

2010 "automatically determined." % (client_region, bucket) 

2011 ) 

2012 return 

2013 

2014 logger.debug( 

2015 "S3 client configured for region %s but the bucket %s is in region" 

2016 " %s; Please configure the proper region to avoid multiple " 

2017 "unnecessary redirects and signing attempts." 

2018 % (client_region, bucket, new_region) 

2019 ) 

2020 endpoint = self._endpoint_resolver.resolve('s3', new_region) 

2021 endpoint = endpoint['endpoint_url'] 

2022 

2023 signing_context = { 

2024 'region': new_region, 

2025 'bucket': bucket, 

2026 'endpoint': endpoint, 

2027 } 

2028 request_dict['context']['signing'] = signing_context 

2029 

2030 self._cache[bucket] = signing_context 

2031 self.set_request_url(request_dict, request_dict['context']) 

2032 

2033 request_dict['context']['s3_redirected'] = True 

2034 

2035 # Return 0 so it doesn't wait to retry 

2036 return 0 

2037 

2038 def get_bucket_region(self, bucket, response): 

2039 """ 

2040 There are multiple potential sources for the new region to redirect to, 

2041 but they aren't all universally available for use. This will try to 

2042 find region from response elements, but will fall back to calling 

2043 HEAD on the bucket if all else fails. 

2044 

2045 :param bucket: The bucket to find the region for. This is necessary if 

2046 the region is not available in the error response. 

2047 :param response: A response representing a service request that failed 

2048 due to incorrect region configuration. 

2049 """ 

2050 # First try to source the region from the headers. 

2051 service_response = response[1] 

2052 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

2053 if 'x-amz-bucket-region' in response_headers: 

2054 return response_headers['x-amz-bucket-region'] 

2055 

2056 # Next, check the error body 

2057 region = service_response.get('Error', {}).get('Region', None) 

2058 if region is not None: 

2059 return region 

2060 

2061 # Finally, HEAD the bucket. No other choice sadly. 

2062 try: 

2063 response = self._client.head_bucket(Bucket=bucket) 

2064 headers = response['ResponseMetadata']['HTTPHeaders'] 

2065 except ClientError as e: 

2066 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

2067 

2068 region = headers.get('x-amz-bucket-region', None) 

2069 return region 

2070 

2071 def set_request_url(self, params, context, **kwargs): 

2072 endpoint = context.get('signing', {}).get('endpoint', None) 

2073 if endpoint is not None: 

2074 params['url'] = _get_new_endpoint(params['url'], endpoint, False) 

2075 

2076 def redirect_from_cache(self, params, context, **kwargs): 

2077 """ 

2078 This handler retrieves a given bucket's signing context from the cache 

2079 and adds it into the request context. 

2080 """ 

2081 if self._is_s3_accesspoint(context): 

2082 return 

2083 bucket = params.get('Bucket') 

2084 signing_context = self._cache.get(bucket) 

2085 if signing_context is not None: 

2086 context['signing'] = signing_context 

2087 else: 

2088 context['signing'] = {'bucket': bucket} 

2089 

2090 def _is_s3_accesspoint(self, context): 

2091 return 's3_accesspoint' in context 

2092 

2093 

2094class InvalidArnException(ValueError): 

2095 pass 

2096 

2097 

2098class ArnParser: 

2099 def parse_arn(self, arn): 

2100 arn_parts = arn.split(':', 5) 

2101 if len(arn_parts) < 6: 

2102 raise InvalidArnException( 

2103 'Provided ARN: %s must be of the format: ' 

2104 'arn:partition:service:region:account:resource' % arn 

2105 ) 

2106 return { 

2107 'partition': arn_parts[1], 

2108 'service': arn_parts[2], 

2109 'region': arn_parts[3], 

2110 'account': arn_parts[4], 

2111 'resource': arn_parts[5], 

2112 } 

2113 

2114 @staticmethod 

2115 def is_arn(value): 

2116 if not isinstance(value, str) or not value.startswith('arn:'): 

2117 return False 

2118 arn_parser = ArnParser() 

2119 try: 

2120 arn_parser.parse_arn(value) 

2121 return True 

2122 except InvalidArnException: 

2123 return False 

2124 

2125 
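# Illustrative sketch (editor-added): parse_arn splits on at most five colons,
# so the resource portion may itself contain ':' or '/'.
#
#   ArnParser().parse_arn('arn:aws:s3:us-west-2:123456789012:accesspoint/my-ap')
#   # -> {'partition': 'aws', 'service': 's3', 'region': 'us-west-2',
#   #     'account': '123456789012', 'resource': 'accesspoint/my-ap'}
#
#   ArnParser.is_arn('not-an-arn')  # -> False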

2126class S3ArnParamHandler: 

2127 _RESOURCE_REGEX = re.compile( 

2128 r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$' 

2129 ) 

2130 _OUTPOST_RESOURCE_REGEX = re.compile( 

2131 r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]' 

2132 r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)' 

2133 ) 

2134 _BLACKLISTED_OPERATIONS = ['CreateBucket'] 

2135 

2136 def __init__(self, arn_parser=None): 

2137 self._arn_parser = arn_parser 

2138 if arn_parser is None: 

2139 self._arn_parser = ArnParser() 

2140 

2141 def register(self, event_emitter): 

2142 event_emitter.register('before-parameter-build.s3', self.handle_arn) 

2143 

2144 def handle_arn(self, params, model, context, **kwargs): 

2145 if model.name in self._BLACKLISTED_OPERATIONS: 

2146 return 

2147 arn_details = self._get_arn_details_from_bucket_param(params) 

2148 if arn_details is None: 

2149 return 

2150 if arn_details['resource_type'] == 'accesspoint': 

2151 self._store_accesspoint(params, context, arn_details) 

2152 elif arn_details['resource_type'] == 'outpost': 

2153 self._store_outpost(params, context, arn_details) 

2154 

2155 def _get_arn_details_from_bucket_param(self, params): 

2156 if 'Bucket' in params: 

2157 try: 

2158 arn = params['Bucket'] 

2159 arn_details = self._arn_parser.parse_arn(arn) 

2160 self._add_resource_type_and_name(arn, arn_details) 

2161 return arn_details 

2162 except InvalidArnException: 

2163 pass 

2164 return None 

2165 

2166 def _add_resource_type_and_name(self, arn, arn_details): 

2167 match = self._RESOURCE_REGEX.match(arn_details['resource']) 

2168 if match: 

2169 arn_details['resource_type'] = match.group('resource_type') 

2170 arn_details['resource_name'] = match.group('resource_name') 

2171 else: 

2172 raise UnsupportedS3ArnError(arn=arn) 

2173 

2174 def _store_accesspoint(self, params, context, arn_details): 

2175 # Ideally the access-point would be stored as a parameter in the 

2176 # request where the serializer would then know how to serialize it, 

2177 # but access-points are not modeled in S3 operations so it would fail 

2178 # validation. Instead, we set the access-point to the bucket parameter 

2179 # to have some value set when serializing the request and additional 

2180 # information on the context from the arn to use in forming the 

2181 # access-point endpoint. 

2182 params['Bucket'] = arn_details['resource_name'] 

2183 context['s3_accesspoint'] = { 

2184 'name': arn_details['resource_name'], 

2185 'account': arn_details['account'], 

2186 'partition': arn_details['partition'], 

2187 'region': arn_details['region'], 

2188 'service': arn_details['service'], 

2189 } 

2190 

2191 def _store_outpost(self, params, context, arn_details): 

2192 resource_name = arn_details['resource_name'] 

2193 match = self._OUTPOST_RESOURCE_REGEX.match(resource_name) 

2194 if not match: 

2195 raise UnsupportedOutpostResourceError(resource_name=resource_name) 

2196 # Because we need to set the bucket name to something to pass 

2197 # validation we're going to use the access point name to be consistent 

2198 # with normal access point arns. 

2199 accesspoint_name = match.group('accesspoint_name') 

2200 params['Bucket'] = accesspoint_name 

2201 context['s3_accesspoint'] = { 

2202 'outpost_name': match.group('outpost_name'), 

2203 'name': accesspoint_name, 

2204 'account': arn_details['account'], 

2205 'partition': arn_details['partition'], 

2206 'region': arn_details['region'], 

2207 'service': arn_details['service'], 

2208 } 

2209 

2210 
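# Illustrative sketch (editor-added): for an access point ARN supplied as the
# Bucket parameter, handle_arn rewrites the parameter to the access point name
# and records the ARN components on the request context. ``_FakeModel`` is a
# hypothetical stand-in for an OperationModel exposing only a ``name``
# attribute.
#
#   params = {
#       'Bucket': 'arn:aws:s3:us-west-2:123456789012:accesspoint/my-ap',
#       'Key': 'obj',
#   }
#   context = {}
#   S3ArnParamHandler().handle_arn(params, _FakeModel('GetObject'), context)
#   # params['Bucket'] == 'my-ap'
#   # context['s3_accesspoint']['account'] == '123456789012'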

2211class S3EndpointSetter: 

2212 _DEFAULT_PARTITION = 'aws' 

2213 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2214 

2215 def __init__( 

2216 self, 

2217 endpoint_resolver, 

2218 region=None, 

2219 s3_config=None, 

2220 endpoint_url=None, 

2221 partition=None, 

2222 use_fips_endpoint=False, 

2223 ): 

2224 # This is calling the endpoint_resolver in regions.py 

2225 self._endpoint_resolver = endpoint_resolver 

2226 self._region = region 

2227 self._s3_config = s3_config 

2228 self._use_fips_endpoint = use_fips_endpoint 

2229 if s3_config is None: 

2230 self._s3_config = {} 

2231 self._endpoint_url = endpoint_url 

2232 self._partition = partition 

2233 if partition is None: 

2234 self._partition = self._DEFAULT_PARTITION 

2235 

2236 def register(self, event_emitter): 

2237 event_emitter.register('before-sign.s3', self.set_endpoint) 

2238 event_emitter.register('choose-signer.s3', self.set_signer) 

2239 event_emitter.register( 

2240 'before-call.s3.WriteGetObjectResponse', 

2241 self.update_endpoint_to_s3_object_lambda, 

2242 ) 

2243 

2244 def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs): 

2245 if self._use_accelerate_endpoint: 

2246 raise UnsupportedS3ConfigurationError( 

2247 msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations', 

2248 ) 

2249 

2250 self._override_signing_name(context, 's3-object-lambda') 

2251 if self._endpoint_url: 

2252 # Only update the url if an explicit url was not provided 

2253 return 

2254 

2255 resolver = self._endpoint_resolver 

2256 # Construct the endpoint using 's3-object-lambda' as the service name 

2257 resolved = resolver.construct_endpoint( 

2258 's3-object-lambda', self._region 

2259 ) 

2260 

2261 # Ideally we would be able to replace the endpoint before 

2262 # serialization but there's no event to do that currently 

2263 # host_prefix is all the arn/bucket specs 

2264 new_endpoint = 'https://{host_prefix}{hostname}'.format( 

2265 host_prefix=params['host_prefix'], 

2266 hostname=resolved['hostname'], 

2267 ) 

2268 

2269 params['url'] = _get_new_endpoint(params['url'], new_endpoint, False) 

2270 

2271 def set_endpoint(self, request, **kwargs): 

2272 if self._use_accesspoint_endpoint(request): 

2273 self._validate_accesspoint_supported(request) 

2274 self._validate_fips_supported(request) 

2275 self._validate_global_regions(request) 

2276 region_name = self._resolve_region_for_accesspoint_endpoint( 

2277 request 

2278 ) 

2279 self._resolve_signing_name_for_accesspoint_endpoint(request) 

2280 self._switch_to_accesspoint_endpoint(request, region_name) 

2281 return 

2282 if self._use_accelerate_endpoint: 

2283 if self._use_fips_endpoint: 

2284 raise UnsupportedS3ConfigurationError( 

2285 msg=( 

2286 'Client is configured to use the FIPS pseudo region ' 

2287 'for "%s", but S3 Accelerate does not have any FIPS ' 

2288 'compatible endpoints.' % (self._region) 

2289 ) 

2290 ) 

2291 switch_host_s3_accelerate(request=request, **kwargs) 

2292 if self._s3_addressing_handler: 

2293 self._s3_addressing_handler(request=request, **kwargs) 

2294 

2295 def _use_accesspoint_endpoint(self, request): 

2296 return 's3_accesspoint' in request.context 

2297 

2298 def _validate_fips_supported(self, request): 

2299 if not self._use_fips_endpoint: 

2300 return 

2301 if 'fips' in request.context['s3_accesspoint']['region']: 

2302 raise UnsupportedS3AccesspointConfigurationError( 

2303 msg='Invalid ARN, FIPS region not allowed in ARN.' 

2304 ) 

2305 if 'outpost_name' in request.context['s3_accesspoint']: 

2306 raise UnsupportedS3AccesspointConfigurationError( 

2307 msg=( 

2308 'Client is configured to use the FIPS pseudo-region "%s", ' 

2309 'but outpost ARNs do not support FIPS endpoints.' 

2310 % (self._region) 

2311 ) 

2312 ) 

2313 # Transforming pseudo region to actual region 

2314 accesspoint_region = request.context['s3_accesspoint']['region'] 

2315 if accesspoint_region != self._region: 

2316 if not self._s3_config.get('use_arn_region', True): 

2317 # TODO: Update message to reflect use_arn_region 

2318 # is not set 

2319 raise UnsupportedS3AccesspointConfigurationError( 

2320 msg=( 

2321 'Client is configured to use the FIPS pseudo-region ' 

2322 'for "%s", but the access-point ARN provided is for ' 

2323 'the "%s" region. For clients using a FIPS ' 

2324 'pseudo-region, calls to access-point ARNs in another ' 

2325 'region are not allowed.' 

2326 % (self._region, accesspoint_region) 

2327 ) 

2328 ) 

2329 

2330 def _validate_global_regions(self, request): 

2331 if self._s3_config.get('use_arn_region', True): 

2332 return 

2333 if self._region in ['aws-global', 's3-external-1']: 

2334 raise UnsupportedS3AccesspointConfigurationError( 

2335 msg=( 

2336 'Client is configured to use the global pseudo-region ' 

2337 '"%s". When providing access-point ARNs a regional ' 

2338 'endpoint must be specified.' % self._region 

2339 ) 

2340 ) 

2341 

2342 def _validate_accesspoint_supported(self, request): 

2343 if self._use_accelerate_endpoint: 

2344 raise UnsupportedS3AccesspointConfigurationError( 

2345 msg=( 

2346 'Client does not support s3 accelerate configuration ' 

2347 'when an access-point ARN is specified.' 

2348 ) 

2349 ) 

2350 request_partition = request.context['s3_accesspoint']['partition'] 

2351 if request_partition != self._partition: 

2352 raise UnsupportedS3AccesspointConfigurationError( 

2353 msg=( 

2354 'Client is configured for "%s" partition, but access-point' 

2355 ' ARN provided is for "%s" partition. The client and ' 

2356 'access-point partition must be the same.' 

2357 % (self._partition, request_partition) 

2358 ) 

2359 ) 

2360 s3_service = request.context['s3_accesspoint'].get('service') 

2361 if s3_service == 's3-object-lambda' and self._s3_config.get( 

2362 'use_dualstack_endpoint' 

2363 ): 

2364 raise UnsupportedS3AccesspointConfigurationError( 

2365 msg=( 

2366 'Client does not support s3 dualstack configuration ' 

2367 'when an S3 Object Lambda access point ARN is specified.' 

2368 ) 

2369 ) 

2370 outpost_name = request.context['s3_accesspoint'].get('outpost_name') 

2371 if outpost_name and self._s3_config.get('use_dualstack_endpoint'): 

2372 raise UnsupportedS3AccesspointConfigurationError( 

2373 msg=( 

2374 'Client does not support s3 dualstack configuration ' 

2375 'when an outpost ARN is specified.' 

2376 ) 

2377 ) 

2378 self._validate_mrap_s3_config(request) 

2379 

2380 def _validate_mrap_s3_config(self, request): 

2381 if not is_global_accesspoint(request.context): 

2382 return 

2383 if self._s3_config.get('s3_disable_multiregion_access_points'): 

2384 raise UnsupportedS3AccesspointConfigurationError( 

2385 msg=( 

2386 'Invalid configuration, Multi-Region Access Point ' 

2387 'ARNs are disabled.' 

2388 ) 

2389 ) 

2390 elif self._s3_config.get('use_dualstack_endpoint'): 

2391 raise UnsupportedS3AccesspointConfigurationError( 

2392 msg=( 

2393 'Client does not support s3 dualstack configuration ' 

2394 'when a Multi-Region Access Point ARN is specified.' 

2395 ) 

2396 ) 

2397 

2398 def _resolve_region_for_accesspoint_endpoint(self, request): 

2399 if is_global_accesspoint(request.context): 

2400 # Requests going to MRAP endpoints MUST be set to any (*) region. 

2401 self._override_signing_region(request, '*') 

2402 elif self._s3_config.get('use_arn_region', True): 

2403 accesspoint_region = request.context['s3_accesspoint']['region'] 

2404 # If we are using the region from the access point, 

2405 # we will also want to make sure that we set it as the 

2406 # signing region as well 

2407 self._override_signing_region(request, accesspoint_region) 

2408 return accesspoint_region 

2409 return self._region 

2410 

2411 def set_signer(self, context, **kwargs): 

2412 if is_global_accesspoint(context): 

2413 if HAS_CRT: 

2414 return 's3v4a' 

2415 else: 

2416 raise MissingDependencyException( 

2417 msg="Using S3 with an MRAP arn requires an additional " 

2418 "dependency. You will need to pip install " 

2419 "botocore[crt] before proceeding." 

2420 ) 

2421 

2422 def _resolve_signing_name_for_accesspoint_endpoint(self, request): 

2423 accesspoint_service = request.context['s3_accesspoint']['service'] 

2424 self._override_signing_name(request.context, accesspoint_service) 

2425 

2426 def _switch_to_accesspoint_endpoint(self, request, region_name): 

2427 original_components = urlsplit(request.url) 

2428 accesspoint_endpoint = urlunsplit( 

2429 ( 

2430 original_components.scheme, 

2431 self._get_netloc(request.context, region_name), 

2432 self._get_accesspoint_path( 

2433 original_components.path, request.context 

2434 ), 

2435 original_components.query, 

2436 '', 

2437 ) 

2438 ) 

2439 logger.debug( 

2440 f'Updating URI from {request.url} to {accesspoint_endpoint}' 

2441 ) 

2442 request.url = accesspoint_endpoint 

2443 

2444 def _get_netloc(self, request_context, region_name): 

2445 if is_global_accesspoint(request_context): 

2446 return self._get_mrap_netloc(request_context) 

2447 else: 

2448 return self._get_accesspoint_netloc(request_context, region_name) 

2449 

2450 def _get_mrap_netloc(self, request_context): 

2451 s3_accesspoint = request_context['s3_accesspoint'] 

2452 region_name = 's3-global' 

2453 mrap_netloc_components = [s3_accesspoint['name']] 

2454 if self._endpoint_url: 

2455 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2456 mrap_netloc_components.append(endpoint_url_netloc) 

2457 else: 

2458 partition = s3_accesspoint['partition'] 

2459 mrap_netloc_components.extend( 

2460 [ 

2461 'accesspoint', 

2462 region_name, 

2463 self._get_partition_dns_suffix(partition), 

2464 ] 

2465 ) 

2466 return '.'.join(mrap_netloc_components) 

2467 

2468 def _get_accesspoint_netloc(self, request_context, region_name): 

2469 s3_accesspoint = request_context['s3_accesspoint'] 

2470 accesspoint_netloc_components = [ 

2471 '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']), 

2472 ] 

2473 outpost_name = s3_accesspoint.get('outpost_name') 

2474 if self._endpoint_url: 

2475 if outpost_name: 

2476 accesspoint_netloc_components.append(outpost_name) 

2477 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2478 accesspoint_netloc_components.append(endpoint_url_netloc) 

2479 else: 

2480 if outpost_name: 

2481 outpost_host = [outpost_name, 's3-outposts'] 

2482 accesspoint_netloc_components.extend(outpost_host) 

2483 elif s3_accesspoint['service'] == 's3-object-lambda': 

2484 component = self._inject_fips_if_needed( 

2485 's3-object-lambda', request_context 

2486 ) 

2487 accesspoint_netloc_components.append(component) 

2488 else: 

2489 component = self._inject_fips_if_needed( 

2490 's3-accesspoint', request_context 

2491 ) 

2492 accesspoint_netloc_components.append(component) 

2493 if self._s3_config.get('use_dualstack_endpoint'): 

2494 accesspoint_netloc_components.append('dualstack') 

2495 accesspoint_netloc_components.extend( 

2496 [region_name, self._get_dns_suffix(region_name)] 

2497 ) 

2498 return '.'.join(accesspoint_netloc_components) 

2499 

2500 def _inject_fips_if_needed(self, component, request_context): 

2501 if self._use_fips_endpoint: 

2502 return '%s-fips' % component 

2503 return component 

2504 

2505 def _get_accesspoint_path(self, original_path, request_context): 

2506 # The Bucket parameter was substituted with the access-point name as 

2507 # some value was required in serializing the bucket name. Now that 

2508 # we are making the request directly to the access point, we will 

2509 # want to remove that access-point name from the path. 

2510 name = request_context['s3_accesspoint']['name'] 

2511 # All S3 operations require at least a / in their path. 

2512 return original_path.replace('/' + name, '', 1) or '/' 

2513 

2514 def _get_partition_dns_suffix(self, partition_name): 

2515 dns_suffix = self._endpoint_resolver.get_partition_dns_suffix( 

2516 partition_name 

2517 ) 

2518 if dns_suffix is None: 

2519 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2520 return dns_suffix 

2521 

2522 def _get_dns_suffix(self, region_name): 

2523 resolved = self._endpoint_resolver.construct_endpoint( 

2524 's3', region_name 

2525 ) 

2526 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2527 if resolved and 'dnsSuffix' in resolved: 

2528 dns_suffix = resolved['dnsSuffix'] 

2529 return dns_suffix 

2530 

2531 def _override_signing_region(self, request, region_name): 

2532 signing_context = request.context.get('signing', {}) 

2533 # S3SigV4Auth will use the context['signing']['region'] value to 

2534 # sign with if present. This is used by the Bucket redirector 

2535 # as well but we should be fine because the redirector is never 

2536 # used in combination with the accesspoint setting logic. 

2537 signing_context['region'] = region_name 

2538 request.context['signing'] = signing_context 

2539 

2540 def _override_signing_name(self, context, signing_name): 

2541 signing_context = context.get('signing', {}) 

2542 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2543 # sign with if present. This is used by the Bucket redirector 

2544 # as well but we should be fine because the redirector is never 

2545 # used in combination with the accesspoint setting logic. 

2546 signing_context['signing_name'] = signing_name 

2547 context['signing'] = signing_context 

2548 

2549 @CachedProperty 

2550 def _use_accelerate_endpoint(self): 

2551 # Enable accelerate if the configuration is set to true or the 

2552 # endpoint being used matches one of the accelerate endpoints. 

2553 

2554 # Accelerate has been explicitly configured. 

2555 if self._s3_config.get('use_accelerate_endpoint'): 

2556 return True 

2557 

2558 # Accelerate mode is turned on automatically if an endpoint url is 

2559 # provided that matches the accelerate scheme. 

2560 if self._endpoint_url is None: 

2561 return False 

2562 

2563 # Accelerate is only valid for Amazon endpoints. 

2564 netloc = urlsplit(self._endpoint_url).netloc 

2565 if not netloc.endswith('amazonaws.com'): 

2566 return False 

2567 

2568 # The first part of the url should always be s3-accelerate. 

2569 parts = netloc.split('.') 

2570 if parts[0] != 's3-accelerate': 

2571 return False 

2572 

2573 # Url parts between 's3-accelerate' and 'amazonaws.com' which 

2574 # represent different url features. 

2575 feature_parts = parts[1:-2] 

2576 

2577 # There should be no duplicate url parts. 

2578 if len(feature_parts) != len(set(feature_parts)): 

2579 return False 

2580 

2581 # Remaining parts must all be in the whitelist. 

2582 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

2583 

2584 @CachedProperty 

2585 def _addressing_style(self): 

2586 # Use virtual host style addressing if accelerate is enabled or if 

2587 # the given endpoint url is an accelerate endpoint. 

2588 if self._use_accelerate_endpoint: 

2589 return 'virtual' 

2590 

2591 # If a particular addressing style is configured, use it. 

2592 configured_addressing_style = self._s3_config.get('addressing_style') 

2593 if configured_addressing_style: 

2594 return configured_addressing_style 

2595 

2596 @CachedProperty 

2597 def _s3_addressing_handler(self): 

2598 # If virtual host style was configured, use it regardless of whether 

2599 # or not the bucket looks dns compatible. 

2600 if self._addressing_style == 'virtual': 

2601 logger.debug("Using S3 virtual host style addressing.") 

2602 return switch_to_virtual_host_style 

2603 

2604 # If path style is configured, no additional steps are needed. If 

2605 # endpoint_url was specified, don't default to virtual. We could 

2606 # potentially default provided endpoint urls to virtual hosted 

2607 # style, but for now it is avoided. 

2608 if self._addressing_style == 'path' or self._endpoint_url is not None: 

2609 logger.debug("Using S3 path style addressing.") 

2610 return None 

2611 

2612 logger.debug( 

2613 "Defaulting to S3 virtual host style addressing with " 

2614 "path style addressing fallback." 

2615 ) 

2616 

2617 # By default, try to use virtual style with path fallback. 

2618 return fix_s3_host 

2619 

2620 
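# Illustrative sketch (editor-added): endpoint URLs and how the
# _use_accelerate_endpoint property above classifies them, assuming
# use_accelerate_endpoint is not set explicitly in the s3 config:
#
#   https://s3-accelerate.amazonaws.com                      -> True
#   https://s3-accelerate.dualstack.amazonaws.com            -> True
#   https://s3-accelerate.dualstack.dualstack.amazonaws.com  -> False (duplicate part)
#   https://bucket.s3-accelerate.amazonaws.com               -> False (first label is not s3-accelerate)
#   https://s3.amazonaws.com                                 -> False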

2621class S3ControlEndpointSetter: 

2622 _DEFAULT_PARTITION = 'aws' 

2623 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2624 _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$') 

2625 

2626 def __init__( 

2627 self, 

2628 endpoint_resolver, 

2629 region=None, 

2630 s3_config=None, 

2631 endpoint_url=None, 

2632 partition=None, 

2633 use_fips_endpoint=False, 

2634 ): 

2635 self._endpoint_resolver = endpoint_resolver 

2636 self._region = region 

2637 self._s3_config = s3_config 

2638 self._use_fips_endpoint = use_fips_endpoint 

2639 if s3_config is None: 

2640 self._s3_config = {} 

2641 self._endpoint_url = endpoint_url 

2642 self._partition = partition 

2643 if partition is None: 

2644 self._partition = self._DEFAULT_PARTITION 

2645 

2646 def register(self, event_emitter): 

2647 event_emitter.register('before-sign.s3-control', self.set_endpoint) 

2648 

2649 def set_endpoint(self, request, **kwargs): 

2650 if self._use_endpoint_from_arn_details(request): 

2651 self._validate_endpoint_from_arn_details_supported(request) 

2652 region_name = self._resolve_region_from_arn_details(request) 

2653 self._resolve_signing_name_from_arn_details(request) 

2654 self._resolve_endpoint_from_arn_details(request, region_name) 

2655 self._add_headers_from_arn_details(request) 

2656 elif self._use_endpoint_from_outpost_id(request): 

2657 self._validate_outpost_redirection_valid(request) 

2658 self._override_signing_name(request, 's3-outposts') 

2659 new_netloc = self._construct_outpost_endpoint(self._region) 

2660 self._update_request_netloc(request, new_netloc) 

2661 

2662 def _use_endpoint_from_arn_details(self, request): 

2663 return 'arn_details' in request.context 

2664 

2665 def _use_endpoint_from_outpost_id(self, request): 

2666 return 'outpost_id' in request.context 

2667 

2668 def _validate_endpoint_from_arn_details_supported(self, request): 

2669 if 'fips' in request.context['arn_details']['region']: 

2670 raise UnsupportedS3ControlArnError( 

2671 arn=request.context['arn_details']['original'], 

2672 msg='Invalid ARN, FIPS region not allowed in ARN.', 

2673 ) 

2674 if not self._s3_config.get('use_arn_region', False): 

2675 arn_region = request.context['arn_details']['region'] 

2676 if arn_region != self._region: 

2677 error_msg = ( 

2678 'The use_arn_region configuration is disabled but ' 

2679 'received arn for "%s" when the client is configured ' 

2680 'to use "%s"' 

2681 ) % (arn_region, self._region) 

2682 raise UnsupportedS3ControlConfigurationError(msg=error_msg) 

2683 request_partition = request.context['arn_details']['partition'] 

2684 if request_partition != self._partition: 

2685 raise UnsupportedS3ControlConfigurationError( 

2686 msg=( 

2687 'Client is configured for "%s" partition, but arn ' 

2688 'provided is for "%s" partition. The client and ' 

2689 'arn partition must be the same.' 

2690 % (self._partition, request_partition) 

2691 ) 

2692 ) 

2693 if self._s3_config.get('use_accelerate_endpoint'): 

2694 raise UnsupportedS3ControlConfigurationError( 

2695 msg='S3 control client does not support accelerate endpoints', 

2696 ) 

2697 if 'outpost_name' in request.context['arn_details']: 

2698 self._validate_outpost_redirection_valid(request) 

2699 

2700 def _validate_outpost_redirection_valid(self, request): 

2701 if self._s3_config.get('use_dualstack_endpoint'): 

2702 raise UnsupportedS3ControlConfigurationError( 

2703 msg=( 

2704 'Client does not support s3 dualstack configuration ' 

2705 'when an outpost is specified.' 

2706 ) 

2707 ) 

2708 

2709 def _resolve_region_from_arn_details(self, request): 

2710 if self._s3_config.get('use_arn_region', False): 

2711 arn_region = request.context['arn_details']['region'] 

2712 # If we are using the region from the expanded arn, we will also 

2713 # want to make sure that we set it as the signing region as well 

2714 self._override_signing_region(request, arn_region) 

2715 return arn_region 

2716 return self._region 

2717 

2718 def _resolve_signing_name_from_arn_details(self, request): 

2719 arn_service = request.context['arn_details']['service'] 

2720 self._override_signing_name(request, arn_service) 

2721 return arn_service 

2722 

2723 def _resolve_endpoint_from_arn_details(self, request, region_name): 

2724 new_netloc = self._resolve_netloc_from_arn_details( 

2725 request, region_name 

2726 ) 

2727 self._update_request_netloc(request, new_netloc) 

2728 

2729 def _update_request_netloc(self, request, new_netloc): 

2730 original_components = urlsplit(request.url) 

2731 arn_details_endpoint = urlunsplit( 

2732 ( 

2733 original_components.scheme, 

2734 new_netloc, 

2735 original_components.path, 

2736 original_components.query, 

2737 '', 

2738 ) 

2739 ) 

2740 logger.debug( 

2741 f'Updating URI from {request.url} to {arn_details_endpoint}' 

2742 ) 

2743 request.url = arn_details_endpoint 

2744 

2745 def _resolve_netloc_from_arn_details(self, request, region_name): 

2746 arn_details = request.context['arn_details'] 

2747 if 'outpost_name' in arn_details: 

2748 return self._construct_outpost_endpoint(region_name) 

2749 account = arn_details['account'] 

2750 return self._construct_s3_control_endpoint(region_name, account) 

2751 

2752 def _is_valid_host_label(self, label): 

2753 return self._HOST_LABEL_REGEX.match(label) 

2754 

2755 def _validate_host_labels(self, *labels): 

2756 for label in labels: 

2757 if not self._is_valid_host_label(label): 

2758 raise InvalidHostLabelError(label=label) 

2759 

2760 def _construct_s3_control_endpoint(self, region_name, account): 

2761 self._validate_host_labels(region_name, account) 

2762 if self._endpoint_url: 

2763 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2764 netloc = [account, endpoint_url_netloc] 

2765 else: 

2766 netloc = [ 

2767 account, 

2768 's3-control', 

2769 ] 

2770 self._add_dualstack(netloc) 

2771 dns_suffix = self._get_dns_suffix(region_name) 

2772 netloc.extend([region_name, dns_suffix]) 

2773 return self._construct_netloc(netloc) 

2774 

2775 def _construct_outpost_endpoint(self, region_name): 

2776 self._validate_host_labels(region_name) 

2777 if self._endpoint_url: 

2778 return urlsplit(self._endpoint_url).netloc 

2779 else: 

2780 netloc = [ 

2781 's3-outposts', 

2782 region_name, 

2783 self._get_dns_suffix(region_name), 

2784 ] 

2785 self._add_fips(netloc) 

2786 return self._construct_netloc(netloc) 

2787 

2788 def _construct_netloc(self, netloc): 

2789 return '.'.join(netloc) 

2790 

2791 def _add_fips(self, netloc): 

2792 if self._use_fips_endpoint: 

2793 netloc[0] = netloc[0] + '-fips' 

2794 

2795 def _add_dualstack(self, netloc): 

2796 if self._s3_config.get('use_dualstack_endpoint'): 

2797 netloc.append('dualstack') 

2798 

2799 def _get_dns_suffix(self, region_name): 

2800 resolved = self._endpoint_resolver.construct_endpoint( 

2801 's3', region_name 

2802 ) 

2803 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2804 if resolved and 'dnsSuffix' in resolved: 

2805 dns_suffix = resolved['dnsSuffix'] 

2806 return dns_suffix 

2807 

2808 def _override_signing_region(self, request, region_name): 

2809 signing_context = request.context.get('signing', {}) 

2810 # S3SigV4Auth will use the context['signing']['region'] value to 

2811 # sign with if present. This is used by the Bucket redirector 

2812 # as well but we should be fine because the redirector is never 

2813 # used in combination with the accesspoint setting logic. 

2814 signing_context['region'] = region_name 

2815 request.context['signing'] = signing_context 

2816 

2817 def _override_signing_name(self, request, signing_name): 

2818 signing_context = request.context.get('signing', {}) 

2819 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2820 # sign with if present. This is used by the Bucket redirector 

2821 # as well but we should be fine because the redirector is never 

2822 # used in combination with the accesspoint setting logic. 

2823 signing_context['signing_name'] = signing_name 

2824 request.context['signing'] = signing_context 

2825 

2826 def _add_headers_from_arn_details(self, request): 

2827 arn_details = request.context['arn_details'] 

2828 outpost_name = arn_details.get('outpost_name') 

2829 if outpost_name: 

2830 self._add_outpost_id_header(request, outpost_name) 

2831 

2832 def _add_outpost_id_header(self, request, outpost_name): 

2833 request.headers['x-amz-outpost-id'] = outpost_name 

2834 

2835 

2836class S3ControlArnParamHandler: 

2837 """This handler has been replaced by S3ControlArnParamHandlerv2. The 

2838 original version remains in place for any third-party importers. 

2839 """ 

2840 

2841 _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]') 

2842 

2843 def __init__(self, arn_parser=None): 

2844 self._arn_parser = arn_parser 

2845 if arn_parser is None: 

2846 self._arn_parser = ArnParser() 

2847 warnings.warn( 

2848 'The S3ControlArnParamHandler class has been deprecated for a new ' 

2849 'internal replacement. A future version of botocore may remove ' 

2850 'this class.', 

2851 category=FutureWarning, 

2852 ) 

2853 

2854 def register(self, event_emitter): 

2855 event_emitter.register( 

2856 'before-parameter-build.s3-control', 

2857 self.handle_arn, 

2858 ) 

2859 

2860 def handle_arn(self, params, model, context, **kwargs): 

2861 if model.name in ('CreateBucket', 'ListRegionalBuckets'): 

2862 # CreateBucket and ListRegionalBuckets are special cases that do 

2863 # not obey ARN based redirection but will redirect based off of the 

2864 # presence of the OutpostId parameter 

2865 self._handle_outpost_id_param(params, model, context) 

2866 else: 

2867 self._handle_name_param(params, model, context) 

2868 self._handle_bucket_param(params, model, context) 

2869 

2870 def _get_arn_details_from_param(self, params, param_name): 

2871 if param_name not in params: 

2872 return None 

2873 try: 

2874 arn = params[param_name] 

2875 arn_details = self._arn_parser.parse_arn(arn) 

2876 arn_details['original'] = arn 

2877 arn_details['resources'] = self._split_resource(arn_details) 

2878 return arn_details 

2879 except InvalidArnException: 

2880 return None 

2881 

2882 def _split_resource(self, arn_details): 

2883 return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource']) 

2884 

2885 def _override_account_id_param(self, params, arn_details): 

2886 account_id = arn_details['account'] 

2887 if 'AccountId' in params and params['AccountId'] != account_id: 

2888 error_msg = ( 

2889 'Account ID in arn does not match the AccountId parameter ' 

2890 'provided: "%s"' 

2891 ) % params['AccountId'] 

2892 raise UnsupportedS3ControlArnError( 

2893 arn=arn_details['original'], 

2894 msg=error_msg, 

2895 ) 

2896 params['AccountId'] = account_id 

2897 

2898 def _handle_outpost_id_param(self, params, model, context): 

2899 if 'OutpostId' not in params: 

2900 return 

2901 context['outpost_id'] = params['OutpostId'] 

2902 

2903 def _handle_name_param(self, params, model, context): 

2904 # CreateAccessPoint is a special case that does not expand Name 

2905 if model.name == 'CreateAccessPoint': 

2906 return 

2907 arn_details = self._get_arn_details_from_param(params, 'Name') 

2908 if arn_details is None: 

2909 return 

2910 if self._is_outpost_accesspoint(arn_details): 

2911 self._store_outpost_accesspoint(params, context, arn_details) 

2912 else: 

2913 error_msg = 'The Name parameter does not support the provided ARN' 

2914 raise UnsupportedS3ControlArnError( 

2915 arn=arn_details['original'], 

2916 msg=error_msg, 

2917 ) 

2918 

2919 def _is_outpost_accesspoint(self, arn_details): 

2920 if arn_details['service'] != 's3-outposts': 

2921 return False 

2922 resources = arn_details['resources'] 

2923 if len(resources) != 4: 

2924 return False 

2925 # Resource must be of the form outpost/op-123/accesspoint/name 

2926 return resources[0] == 'outpost' and resources[2] == 'accesspoint' 

2927 

2928 def _store_outpost_accesspoint(self, params, context, arn_details): 

2929 self._override_account_id_param(params, arn_details) 

2930 accesspoint_name = arn_details['resources'][3] 

2931 params['Name'] = accesspoint_name 

2932 arn_details['accesspoint_name'] = accesspoint_name 

2933 arn_details['outpost_name'] = arn_details['resources'][1] 

2934 context['arn_details'] = arn_details 

2935 

2936 def _handle_bucket_param(self, params, model, context): 

2937 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

2938 if arn_details is None: 

2939 return 

2940 if self._is_outpost_bucket(arn_details): 

2941 self._store_outpost_bucket(params, context, arn_details) 

2942 else: 

2943 error_msg = ( 

2944 'The Bucket parameter does not support the provided ARN' 

2945 ) 

2946 raise UnsupportedS3ControlArnError( 

2947 arn=arn_details['original'], 

2948 msg=error_msg, 

2949 ) 

2950 

2951 def _is_outpost_bucket(self, arn_details): 

2952 if arn_details['service'] != 's3-outposts': 

2953 return False 

2954 resources = arn_details['resources'] 

2955 if len(resources) != 4: 

2956 return False 

2957 # Resource must be of the form outpost/op-123/bucket/name 

2958 return resources[0] == 'outpost' and resources[2] == 'bucket' 

2959 

2960 def _store_outpost_bucket(self, params, context, arn_details): 

2961 self._override_account_id_param(params, arn_details) 

2962 bucket_name = arn_details['resources'][3] 

2963 params['Bucket'] = bucket_name 

2964 arn_details['bucket_name'] = bucket_name 

2965 arn_details['outpost_name'] = arn_details['resources'][1] 

2966 context['arn_details'] = arn_details 

2967 

2968 

2969class S3ControlArnParamHandlerv2(S3ControlArnParamHandler): 

2970 """Updated version of S3ControlArnParamHandler for use when 

2971 EndpointRulesetResolver is in use for endpoint resolution. 

2972 

2973 This class is considered private and subject to abrupt breaking changes or 

2974 removal without prior announcement. Please do not use it directly. 

2975 """ 

2976 

2977 def __init__(self, arn_parser=None): 

2978 self._arn_parser = arn_parser 

2979 if arn_parser is None: 

2980 self._arn_parser = ArnParser() 

2981 

2982 def register(self, event_emitter): 

2983 event_emitter.register( 

2984 'before-endpoint-resolution.s3-control', 

2985 self.handle_arn, 

2986 ) 

2987 

2988 def _handle_name_param(self, params, model, context): 

2989 # CreateAccessPoint is a special case that does not expand Name 

2990 if model.name == 'CreateAccessPoint': 

2991 return 

2992 arn_details = self._get_arn_details_from_param(params, 'Name') 

2993 if arn_details is None: 

2994 return 

2995 self._raise_for_fips_pseudo_region(arn_details) 

2996 self._raise_for_accelerate_endpoint(context) 

2997 if self._is_outpost_accesspoint(arn_details): 

2998 self._store_outpost_accesspoint(params, context, arn_details) 

2999 else: 

3000 error_msg = 'The Name parameter does not support the provided ARN' 

3001 raise UnsupportedS3ControlArnError( 

3002 arn=arn_details['original'], 

3003 msg=error_msg, 

3004 ) 

3005 

3006 def _store_outpost_accesspoint(self, params, context, arn_details): 

3007 self._override_account_id_param(params, arn_details) 

3008 

3009 def _handle_bucket_param(self, params, model, context): 

3010 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

3011 if arn_details is None: 

3012 return 

3013 self._raise_for_fips_pseudo_region(arn_details) 

3014 self._raise_for_accelerate_endpoint(context) 

3015 if self._is_outpost_bucket(arn_details): 

3016 self._store_outpost_bucket(params, context, arn_details) 

3017 else: 

3018 error_msg = ( 

3019 'The Bucket parameter does not support the provided ARN' 

3020 ) 

3021 raise UnsupportedS3ControlArnError( 

3022 arn=arn_details['original'], 

3023 msg=error_msg, 

3024 ) 

3025 

3026 def _store_outpost_bucket(self, params, context, arn_details): 

3027 self._override_account_id_param(params, arn_details) 

3028 

3029 def _raise_for_fips_pseudo_region(self, arn_details): 

3030 # FIPS pseudo region names cannot be used in ARNs 

3031 arn_region = arn_details['region'] 

3032 if arn_region.startswith('fips-') or arn_region.endswith('-fips'): 

3033 raise UnsupportedS3ControlArnError( 

3034 arn=arn_details['original'], 

3035 msg='Invalid ARN, FIPS region not allowed in ARN.', 

3036 ) 

3037 

3038 def _raise_for_accelerate_endpoint(self, context): 

3039 s3_config = context['client_config'].s3 or {} 

3040 if s3_config.get('use_accelerate_endpoint'): 

3041 raise UnsupportedS3ControlConfigurationError( 

3042 msg='S3 control client does not support accelerate endpoints', 

3043 ) 

3044 

3045 

3046class ContainerMetadataFetcher: 

3047 TIMEOUT_SECONDS = 2 

3048 RETRY_ATTEMPTS = 3 

3049 SLEEP_TIME = 1 

3050 IP_ADDRESS = '169.254.170.2' 

3051 _ALLOWED_HOSTS = [ 

3052 IP_ADDRESS, 

3053 '169.254.170.23', 

3054 'fd00:ec2::23', 

3055 'localhost', 

3056 ] 

3057 

3058 def __init__(self, session=None, sleep=time.sleep): 

3059 if session is None: 

3060 session = botocore.httpsession.URLLib3Session( 

3061 timeout=self.TIMEOUT_SECONDS 

3062 ) 

3063 self._session = session 

3064 self._sleep = sleep 

3065 

3066 def retrieve_full_uri(self, full_url, headers=None): 

3067 """Retrieve JSON metadata from container metadata. 

3068 

3069 :type full_url: str 

3070 :param full_url: The full URL of the metadata service. 

3071 This should include the scheme as well, e.g. 

3072 "http://localhost:123/foo" 

3073 

3074 """ 

3075 self._validate_allowed_url(full_url) 

3076 return self._retrieve_credentials(full_url, headers) 

3077 

3078 def _validate_allowed_url(self, full_url): 

3079 parsed = botocore.compat.urlparse(full_url) 

3080 if self._is_loopback_address(parsed.hostname): 

3081 return 

3082 is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname) 

3083 if not is_whitelisted_host: 

3084 raise ValueError( 

3085 f"Unsupported host '{parsed.hostname}'. Can only retrieve metadata " 

3086 f"from a loopback address or one of these hosts: {', '.join(self._ALLOWED_HOSTS)}" 

3087 ) 

3088 

3089 def _is_loopback_address(self, hostname): 

3090 try: 

3091 ip = ip_address(hostname) 

3092 return ip.is_loopback 

3093 except ValueError: 

3094 return False 

3095 

3096 def _check_if_whitelisted_host(self, host): 

3097 if host in self._ALLOWED_HOSTS: 

3098 return True 

3099 return False 

3100 

3101 def retrieve_uri(self, relative_uri): 

3102 """Retrieve JSON metadata from container metadata. 

3103 

3104 :type relative_uri: str 

3105 :param relative_uri: A relative URI, e.g. "/foo/bar?id=123" 

3106 

3107 :return: The parsed JSON response. 

3108 

3109 """ 

3110 full_url = self.full_url(relative_uri) 

3111 return self._retrieve_credentials(full_url) 

3112 

3113 def _retrieve_credentials(self, full_url, extra_headers=None): 

3114 headers = {'Accept': 'application/json'} 

3115 if extra_headers is not None: 

3116 headers.update(extra_headers) 

3117 attempts = 0 

3118 while True: 

3119 try: 

3120 return self._get_response( 

3121 full_url, headers, self.TIMEOUT_SECONDS 

3122 ) 

3123 except MetadataRetrievalError as e: 

3124 logger.debug( 

3125 "Received error when attempting to retrieve " 

3126 "container metadata: %s", 

3127 e, 

3128 exc_info=True, 

3129 ) 

3130 self._sleep(self.SLEEP_TIME) 

3131 attempts += 1 

3132 if attempts >= self.RETRY_ATTEMPTS: 

3133 raise 

3134 

3135 def _get_response(self, full_url, headers, timeout): 

3136 try: 

3137 AWSRequest = botocore.awsrequest.AWSRequest 

3138 request = AWSRequest(method='GET', url=full_url, headers=headers) 

3139 response = self._session.send(request.prepare()) 

3140 response_text = response.content.decode('utf-8') 

3141 if response.status_code != 200: 

3142 raise MetadataRetrievalError( 

3143 error_msg=( 

3144 f"Received non 200 response {response.status_code} " 

3145 f"from container metadata: {response_text}" 

3146 ) 

3147 ) 

3148 try: 

3149 return json.loads(response_text) 

3150 except ValueError: 

3151 error_msg = "Unable to parse JSON returned from container metadata services" 

3152 logger.debug('%s:%s', error_msg, response_text) 

3153 raise MetadataRetrievalError(error_msg=error_msg) 

3154 except RETRYABLE_HTTP_ERRORS as e: 

3155 error_msg = ( 

3156 "Received error when attempting to retrieve " 

3157 f"container metadata: {e}" 

3158 ) 

3159 raise MetadataRetrievalError(error_msg=error_msg) 

3160 

3161 def full_url(self, relative_uri): 

3162 return f'http://{self.IP_ADDRESS}{relative_uri}' 

3163 
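A minimal usage sketch for ContainerMetadataFetcher, using a hypothetical relative URI: retrieve_uri resolves the path against the fixed link-local address and retries retryable HTTP errors up to RETRY_ATTEMPTS times before re-raising.

# Sketch only; the relative URI is a placeholder.
from botocore.utils import ContainerMetadataFetcher

fetcher = ContainerMetadataFetcher()
# Resolves to http://169.254.170.2/v2/credentials/example-id and returns
# the parsed JSON response.
metadata = fetcher.retrieve_uri('/v2/credentials/example-id')
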

3164 

3165def get_environ_proxies(url): 

3166 if should_bypass_proxies(url): 

3167 return {} 

3168 else: 

3169 return getproxies() 

3170 

3171 

3172def should_bypass_proxies(url): 

3173 """ 

3174 Returns whether we should bypass proxies or not. 

3175 """ 

3176 # NOTE: requests allows ip/cidr entries in the no_proxy env var, which we 

3177 # don't currently support, as urllib only checks the DNS suffix. 

3178 # If the system proxy settings indicate that this URL should be bypassed, 

3179 # don't proxy. 

3180 # The proxy_bypass function is incredibly buggy on OS X in early versions 

3181 # of Python 2.6, so allow this call to fail. Only catch the specific 

3182 # exceptions we've seen, though: this call failing in other ways can reveal 

3183 # legitimate problems. 

3184 try: 

3185 if proxy_bypass(urlparse(url).netloc): 

3186 return True 

3187 except (TypeError, socket.gaierror): 

3188 pass 

3189 

3190 return False 

3191 
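A small sketch of how get_environ_proxies behaves, with a placeholder proxy and URL: if should_bypass_proxies reports that the host is bypassed by the system proxy settings, an empty dict is returned; otherwise the proxies found in the environment are passed through unchanged.

# Sketch only; the proxy address and URL are placeholders.
import os
from botocore.utils import get_environ_proxies

os.environ['HTTPS_PROXY'] = 'http://proxy.internal:3128'
proxies = get_environ_proxies('https://example.amazonaws.com')
# {'https': 'http://proxy.internal:3128'} unless proxy_bypass matches the host
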

3192 

3193def determine_content_length(body): 

3194 # No body, content length of 0 

3195 if not body: 

3196 return 0 

3197 

3198 # Try asking the body for its length 

3199 try: 

3200 return len(body) 

3201 except (AttributeError, TypeError): 

3202 pass 

3203 

3204 # Try getting the length from a seekable stream 

3205 if hasattr(body, 'seek') and hasattr(body, 'tell'): 

3206 try: 

3207 orig_pos = body.tell() 

3208 body.seek(0, 2) 

3209 end_file_pos = body.tell() 

3210 body.seek(orig_pos) 

3211 return end_file_pos - orig_pos 

3212 except io.UnsupportedOperation: 

3213 # Some bodies, such as an io.BufferedIOBase object, expose a "seek" 

3214 # method that raises "UnsupportedOperation" when called. In that 

3215 # case we cannot determine the length, so fall back to "chunked" 

3216 # encoding. 

3217 pass 

3218 # Failed to determine the length 

3219 return None 

3220 
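The helper above tries len() first, then seek()/tell() for seekable streams, and returns None when neither works so callers can fall back to chunked encoding. A short sketch:

import io
from botocore.utils import determine_content_length

determine_content_length(b'hello')              # 5, via len()
determine_content_length(io.BytesIO(b'hello'))  # 5, via seek()/tell()
determine_content_length(None)                  # 0, empty body
determine_content_length(iter([b'a', b'b']))    # None, length unknown
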

3221 

3222def get_encoding_from_headers(headers, default='ISO-8859-1'): 

3223 """Returns encodings from given HTTP Header Dict. 

3224 

3225 :param headers: dictionary to extract encoding from. 

3226 :param default: default encoding if the content-type is text 

3227 """ 

3228 

3229 content_type = headers.get('content-type') 

3230 

3231 if not content_type: 

3232 return None 

3233 

3234 message = email.message.Message() 

3235 message['content-type'] = content_type 

3236 charset = message.get_param("charset") 

3237 

3238 if charset is not None: 

3239 return charset 

3240 

3241 if 'text' in content_type: 

3242 return default 

3243 
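A few illustrative calls against get_encoding_from_headers showing the three outcomes: an explicit charset parameter wins, a text/* content type without one falls back to the default, and anything else yields None.

from botocore.utils import get_encoding_from_headers

get_encoding_from_headers({'content-type': 'text/html; charset=utf-8'})  # 'utf-8'
get_encoding_from_headers({'content-type': 'text/plain'})                # 'ISO-8859-1'
get_encoding_from_headers({'content-type': 'application/octet-stream'})  # None
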

3244 

3245def calculate_md5(body, **kwargs): 

3246 if isinstance(body, (bytes, bytearray)): 

3247 binary_md5 = _calculate_md5_from_bytes(body) 

3248 else: 

3249 binary_md5 = _calculate_md5_from_file(body) 

3250 return base64.b64encode(binary_md5).decode('ascii') 

3251 

3252 

3253def _calculate_md5_from_bytes(body_bytes): 

3254 md5 = get_md5(body_bytes) 

3255 return md5.digest() 

3256 

3257 

3258def _calculate_md5_from_file(fileobj): 

3259 start_position = fileobj.tell() 

3260 md5 = get_md5() 

3261 for chunk in iter(lambda: fileobj.read(1024 * 1024), b''): 

3262 md5.update(chunk) 

3263 fileobj.seek(start_position) 

3264 return md5.digest() 

3265 
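calculate_md5 accepts either in-memory bytes or a readable, seekable file object and returns the base64-encoded digest that S3 expects in a Content-MD5 header; for file objects the original stream position is restored. A short sketch:

import io
from botocore.utils import calculate_md5

calculate_md5(b'object data')              # base64 string of the MD5 digest
calculate_md5(io.BytesIO(b'object data'))  # same value; stream position restored
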

3266 

3267def _is_s3express_request(params): 

3268 endpoint_properties = params.get('context', {}).get( 

3269 'endpoint_properties', {} 

3270 ) 

3271 return endpoint_properties.get('backend') == 'S3Express' 

3272 

3273 

3274def _has_checksum_header(params): 

3275 headers = params['headers'] 

3276 # If a user provided Content-MD5 is present, 

3277 # don't try to compute a new one. 

3278 if 'Content-MD5' in headers: 

3279 return True 

3280 

3281 # If a header matching the x-amz-checksum-* pattern is present, we 

3282 # assume a checksum has already been provided and an md5 is not needed 

3283 for header in headers: 

3284 if CHECKSUM_HEADER_PATTERN.match(header): 

3285 return True 

3286 

3287 return False 

3288 

3289 

3290def conditionally_calculate_checksum(params, **kwargs): 

3291 if not _has_checksum_header(params): 

3292 conditionally_calculate_md5(params, **kwargs) 

3293 conditionally_enable_crc32(params, **kwargs) 

3294 

3295 

3296def conditionally_enable_crc32(params, **kwargs): 

3297 checksum_context = params.get('context', {}).get('checksum', {}) 

3298 checksum_algorithm = checksum_context.get('request_algorithm') 

3299 if ( 

3300 _is_s3express_request(params) 

3301 and params['body'] is not None 

3302 and checksum_algorithm in (None, "conditional-md5") 

3303 ): 

3304 params['context']['checksum'] = { 

3305 'request_algorithm': { 

3306 'algorithm': 'crc32', 

3307 'in': 'header', 

3308 'name': 'x-amz-checksum-crc32', 

3309 } 

3310 } 

3311 

3312 

3313def conditionally_calculate_md5(params, **kwargs): 

3314 """Only add a Content-MD5 if the system supports it.""" 

3315 body = params['body'] 

3316 checksum_context = params.get('context', {}).get('checksum', {}) 

3317 checksum_algorithm = checksum_context.get('request_algorithm') 

3318 if checksum_algorithm and checksum_algorithm != 'conditional-md5': 

3319 # Skip for requests that will have a flexible checksum applied 

3320 return 

3321 

3322 if _has_checksum_header(params): 

3323 # Don't add a new header if one is already available. 

3324 return 

3325 

3326 if _is_s3express_request(params): 

3327 # S3Express doesn't support MD5 

3328 return 

3329 

3330 if MD5_AVAILABLE and body is not None: 

3331 md5_digest = calculate_md5(body, **kwargs) 

3332 params['headers']['Content-MD5'] = md5_digest 

3333 
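A sketch of the checksum flow using a hypothetical params dict shaped like the ones botocore's handlers receive: when no Content-MD5 or x-amz-checksum-* header is present, a Content-MD5 header is added in place (or, for S3 Express requests, the CRC32 flexible checksum is enabled instead).

from botocore.utils import conditionally_calculate_checksum

# Hypothetical request params; real values come from the serializer.
params = {'body': b'object data', 'headers': {}, 'context': {}}
conditionally_calculate_checksum(params)
params['headers'].get('Content-MD5')  # populated when MD5 is available
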

3334 

3335class FileWebIdentityTokenLoader: 

3336 def __init__(self, web_identity_token_path, _open=open): 

3337 self._web_identity_token_path = web_identity_token_path 

3338 self._open = _open 

3339 

3340 def __call__(self): 

3341 with self._open(self._web_identity_token_path) as token_file: 

3342 return token_file.read() 

3343 
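FileWebIdentityTokenLoader is a small callable that re-reads the token file on every invocation, so rotated tokens are picked up automatically; the path below is hypothetical.

from botocore.utils import FileWebIdentityTokenLoader

load_token = FileWebIdentityTokenLoader('/var/run/secrets/eks/token')
web_identity_token = load_token()  # returns the current file contents
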

3344 

3345class SSOTokenLoader: 

3346 def __init__(self, cache=None): 

3347 if cache is None: 

3348 cache = {} 

3349 self._cache = cache 

3350 

3351 def _generate_cache_key(self, start_url, session_name): 

3352 input_str = start_url 

3353 if session_name is not None: 

3354 input_str = session_name 

3355 return hashlib.sha1(input_str.encode('utf-8')).hexdigest() 

3356 

3357 def save_token(self, start_url, token, session_name=None): 

3358 cache_key = self._generate_cache_key(start_url, session_name) 

3359 self._cache[cache_key] = token 

3360 

3361 def __call__(self, start_url, session_name=None): 

3362 cache_key = self._generate_cache_key(start_url, session_name) 

3363 logger.debug(f'Checking for cached token at: {cache_key}') 

3364 if cache_key not in self._cache: 

3365 name = start_url 

3366 if session_name is not None: 

3367 name = session_name 

3368 error_msg = f'Token for {name} does not exist' 

3369 raise SSOTokenLoadError(error_msg=error_msg) 

3370 

3371 token = self._cache[cache_key] 

3372 if 'accessToken' not in token or 'expiresAt' not in token: 

3373 error_msg = f'Token for {start_url} is invalid' 

3374 raise SSOTokenLoadError(error_msg=error_msg) 

3375 return token 

3376 
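A usage sketch for SSOTokenLoader with an in-memory cache and illustrative values: tokens are keyed by the SHA-1 of the session name when one is given, otherwise of the start URL, and loading raises SSOTokenLoadError unless the cached entry carries both accessToken and expiresAt.

from botocore.utils import SSOTokenLoader

loader = SSOTokenLoader()
loader.save_token(
    'https://example.awsapps.com/start',
    {'accessToken': 'example-token', 'expiresAt': '2030-01-01T00:00:00Z'},
)
token = loader('https://example.awsapps.com/start')
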

3377 

3378class EventbridgeSignerSetter: 

3379 _DEFAULT_PARTITION = 'aws' 

3380 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

3381 

3382 def __init__(self, endpoint_resolver, region=None, endpoint_url=None): 

3383 self._endpoint_resolver = endpoint_resolver 

3384 self._region = region 

3385 self._endpoint_url = endpoint_url 

3386 

3387 def register(self, event_emitter): 

3388 event_emitter.register( 

3389 'before-parameter-build.events.PutEvents', 

3390 self.check_for_global_endpoint, 

3391 ) 

3392 event_emitter.register( 

3393 'before-call.events.PutEvents', self.set_endpoint_url 

3394 ) 

3395 

3396 def set_endpoint_url(self, params, context, **kwargs): 

3397 if 'eventbridge_endpoint' in context: 

3398 endpoint = context['eventbridge_endpoint'] 

3399 logger.debug(f"Rewriting URL from {params['url']} to {endpoint}") 

3400 params['url'] = endpoint 

3401 

3402 def check_for_global_endpoint(self, params, context, **kwargs): 

3403 endpoint = params.get('EndpointId') 

3404 if endpoint is None: 

3405 return 

3406 

3407 if len(endpoint) == 0: 

3408 raise InvalidEndpointConfigurationError( 

3409 msg='EndpointId must not be a zero length string' 

3410 ) 

3411 

3412 if not HAS_CRT: 

3413 raise MissingDependencyException( 

3414 msg="Using EndpointId requires an additional " 

3415 "dependency. You will need to pip install " 

3416 "botocore[crt] before proceeding." 

3417 ) 

3418 

3419 config = context.get('client_config') 

3420 endpoint_variant_tags = None 

3421 if config is not None: 

3422 if config.use_fips_endpoint: 

3423 raise InvalidEndpointConfigurationError( 

3424 msg="FIPS is not supported with EventBridge " 

3425 "multi-region endpoints." 

3426 ) 

3427 if config.use_dualstack_endpoint: 

3428 endpoint_variant_tags = ['dualstack'] 

3429 

3430 if self._endpoint_url is None: 

3431 # Validate endpoint is a valid hostname component 

3432 parts = urlparse(f'https://{endpoint}') 

3433 if parts.hostname != endpoint: 

3434 raise InvalidEndpointConfigurationError( 

3435 msg='EndpointId is not a valid hostname component.' 

3436 ) 

3437 resolved_endpoint = self._get_global_endpoint( 

3438 endpoint, endpoint_variant_tags=endpoint_variant_tags 

3439 ) 

3440 else: 

3441 resolved_endpoint = self._endpoint_url 

3442 

3443 context['eventbridge_endpoint'] = resolved_endpoint 

3444 context['auth_type'] = 'v4a' 

3445 

3446 def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None): 

3447 resolver = self._endpoint_resolver 

3448 

3449 partition = resolver.get_partition_for_region(self._region) 

3450 if partition is None: 

3451 partition = self._DEFAULT_PARTITION 

3452 dns_suffix = resolver.get_partition_dns_suffix( 

3453 partition, endpoint_variant_tags=endpoint_variant_tags 

3454 ) 

3455 if dns_suffix is None: 

3456 dns_suffix = self._DEFAULT_DNS_SUFFIX 

3457 

3458 return f"https://{endpoint}.endpoint.events.{dns_suffix}/" 

3459 
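An illustrative sketch of the multi-region endpoint shape _get_global_endpoint builds when no explicit endpoint_url is configured: the EndpointId becomes the left-most labels, the partition DNS suffix is appended, and the request is signed with SigV4a.

# Illustrative only; the EndpointId and DNS suffix are placeholders.
endpoint_id = 'abcde.veo'      # a hypothetical EventBridge EndpointId
dns_suffix = 'amazonaws.com'   # _DEFAULT_DNS_SUFFIX unless the resolver says otherwise
url = f"https://{endpoint_id}.endpoint.events.{dns_suffix}/"
# https://abcde.veo.endpoint.events.amazonaws.com/
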

3460 

3461def is_s3_accelerate_url(url): 

3462 """Does the URL match the S3 Accelerate endpoint scheme? 

3463 

3464 Virtual-hosted-style URLs, where the bucket name appears in the netloc, 

3465 are not matched by this function. 

3466 """ 

3467 if url is None: 

3468 return False 

3469 

3470 # Accelerate is only valid for Amazon endpoints. 

3471 url_parts = urlsplit(url) 

3472 if not url_parts.netloc.endswith( 

3473 'amazonaws.com' 

3474 ) or url_parts.scheme not in ['https', 'http']: 

3475 return False 

3476 

3477 # The first part of the URL must be s3-accelerate. 

3478 parts = url_parts.netloc.split('.') 

3479 if parts[0] != 's3-accelerate': 

3480 return False 

3481 

3482 # URL parts between 's3-accelerate' and 'amazonaws.com' represent 

3483 # different URL features. 

3484 feature_parts = parts[1:-2] 

3485 

3486 # There should be no duplicate URL parts. 

3487 if len(feature_parts) != len(set(feature_parts)): 

3488 return False 

3489 

3490 # Remaining parts must all be in the whitelist. 

3491 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

3492 
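A few illustrative inputs for is_s3_accelerate_url: only Amazon endpoints whose left-most label is s3-accelerate match, optionally followed by whitelisted feature labels such as dualstack.

from botocore.utils import is_s3_accelerate_url

is_s3_accelerate_url('https://s3-accelerate.amazonaws.com')            # True
is_s3_accelerate_url('https://s3-accelerate.dualstack.amazonaws.com')  # True
is_s3_accelerate_url('https://bucket.s3-accelerate.amazonaws.com')     # False
is_s3_accelerate_url('https://s3.amazonaws.com')                       # False
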

3493 

3494class JSONFileCache: 

3495 """JSON file cache. 

3496 This provides a dict like interface that stores JSON serializable 

3497 objects. 

3498 The objects are serialized to JSON and stored in a file. These 

3499 values can be retrieved at a later time. 

3500 """ 

3501 

3502 CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache')) 

3503 

3504 def __init__(self, working_dir=CACHE_DIR, dumps_func=None): 

3505 self._working_dir = working_dir 

3506 if dumps_func is None: 

3507 dumps_func = self._default_dumps 

3508 self._dumps = dumps_func 

3509 

3510 def _default_dumps(self, obj): 

3511 return json.dumps(obj, default=self._serialize_if_needed) 

3512 

3513 def __contains__(self, cache_key): 

3514 actual_key = self._convert_cache_key(cache_key) 

3515 return os.path.isfile(actual_key) 

3516 

3517 def __getitem__(self, cache_key): 

3518 """Retrieve value from a cache key.""" 

3519 actual_key = self._convert_cache_key(cache_key) 

3520 try: 

3521 with open(actual_key) as f: 

3522 return json.load(f) 

3523 except (OSError, ValueError): 

3524 raise KeyError(cache_key) 

3525 

3526 def __delitem__(self, cache_key): 

3527 actual_key = self._convert_cache_key(cache_key) 

3528 try: 

3529 key_path = Path(actual_key) 

3530 key_path.unlink() 

3531 except FileNotFoundError: 

3532 raise KeyError(cache_key) 

3533 

3534 def __setitem__(self, cache_key, value): 

3535 full_key = self._convert_cache_key(cache_key) 

3536 try: 

3537 file_content = self._dumps(value) 

3538 except (TypeError, ValueError): 

3539 raise ValueError( 

3540 f"Value cannot be cached, must be " 

3541 f"JSON serializable: {value}" 

3542 ) 

3543 if not os.path.isdir(self._working_dir): 

3544 os.makedirs(self._working_dir) 

3545 with os.fdopen( 

3546 os.open(full_key, os.O_WRONLY | os.O_CREAT, 0o600), 'w' 

3547 ) as f: 

3548 f.truncate() 

3549 f.write(file_content) 

3550 

3551 def _convert_cache_key(self, cache_key): 

3552 full_path = os.path.join(self._working_dir, cache_key + '.json') 

3553 return full_path 

3554 

3555 def _serialize_if_needed(self, value, iso=False): 

3556 if isinstance(value, _DatetimeClass): 

3557 if iso: 

3558 return value.isoformat() 

3559 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

3560 return value 

3561 
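A short sketch of JSONFileCache using a throwaway directory instead of the default ~/.aws/boto/cache: each key maps to a <key>.json file created with 0600 permissions, and values must be JSON serializable (datetimes are converted by _serialize_if_needed).

import tempfile
from botocore.utils import JSONFileCache

cache = JSONFileCache(working_dir=tempfile.mkdtemp())
cache['credentials'] = {'AccessKeyId': 'AKIAEXAMPLE', 'Expiration': '2030-01-01T00:00:00Z'}
'credentials' in cache   # True; backed by credentials.json on disk
cache['credentials']     # round-trips through JSON
del cache['credentials']
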

3562 

3563def is_s3express_bucket(bucket): 

3564 if bucket is None: 

3565 return False 

3566 return bucket.endswith('--x-s3') 

3567 
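is_s3express_bucket simply checks for the directory-bucket suffix; the bucket names below are illustrative.

from botocore.utils import is_s3express_bucket

is_s3express_bucket('my-bucket--usw2-az1--x-s3')  # True, S3 Express directory bucket
is_s3express_bucket('my-regular-bucket')          # False
is_s3express_bucket(None)                         # False
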

3568 

3569# This mapping is not part of the public interface and is subject to abrupt 

3570# breaking changes or removal without prior announcement. 

3571# Mapping of services that have been renamed for backwards compatibility reasons. 

3572# Keys are the previous name that should be allowed, values are the documented 

3573# and preferred client name. 

3574SERVICE_NAME_ALIASES = {'runtime.sagemaker': 'sagemaker-runtime'} 

3575 

3576 

3577# This mapping is not part of the public interface and is subject to abrupt 

3578# breaking changes or removal without prior announcement. 

3579# Mapping to determine the service ID for services that do not use it as the 

3580# model data directory name. The keys are the data directory name and the 

3581# values are the transformed service IDs (lower case and hyphenated). 

3582CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES = { 

3583 # Actual service name we use -> Allowed computed service name. 

3584 'alexaforbusiness': 'alexa-for-business', 

3585 'apigateway': 'api-gateway', 

3586 'application-autoscaling': 'application-auto-scaling', 

3587 'appmesh': 'app-mesh', 

3588 'autoscaling': 'auto-scaling', 

3589 'autoscaling-plans': 'auto-scaling-plans', 

3590 'ce': 'cost-explorer', 

3591 'cloudhsmv2': 'cloudhsm-v2', 

3592 'cloudsearchdomain': 'cloudsearch-domain', 

3593 'cognito-idp': 'cognito-identity-provider', 

3594 'config': 'config-service', 

3595 'cur': 'cost-and-usage-report-service', 

3596 'datapipeline': 'data-pipeline', 

3597 'directconnect': 'direct-connect', 

3598 'devicefarm': 'device-farm', 

3599 'discovery': 'application-discovery-service', 

3600 'dms': 'database-migration-service', 

3601 'ds': 'directory-service', 

3602 'dynamodbstreams': 'dynamodb-streams', 

3603 'elasticbeanstalk': 'elastic-beanstalk', 

3604 'elastictranscoder': 'elastic-transcoder', 

3605 'elb': 'elastic-load-balancing', 

3606 'elbv2': 'elastic-load-balancing-v2', 

3607 'es': 'elasticsearch-service', 

3608 'events': 'eventbridge', 

3609 'globalaccelerator': 'global-accelerator', 

3610 'iot-data': 'iot-data-plane', 

3611 'iot-jobs-data': 'iot-jobs-data-plane', 

3612 'iot1click-devices': 'iot-1click-devices-service', 

3613 'iot1click-projects': 'iot-1click-projects', 

3614 'iotevents-data': 'iot-events-data', 

3615 'iotevents': 'iot-events', 

3616 'iotwireless': 'iot-wireless', 

3617 'kinesisanalytics': 'kinesis-analytics', 

3618 'kinesisanalyticsv2': 'kinesis-analytics-v2', 

3619 'kinesisvideo': 'kinesis-video', 

3620 'lex-models': 'lex-model-building-service', 

3621 'lexv2-models': 'lex-models-v2', 

3622 'lex-runtime': 'lex-runtime-service', 

3623 'lexv2-runtime': 'lex-runtime-v2', 

3624 'logs': 'cloudwatch-logs', 

3625 'machinelearning': 'machine-learning', 

3626 'marketplacecommerceanalytics': 'marketplace-commerce-analytics', 

3627 'marketplace-entitlement': 'marketplace-entitlement-service', 

3628 'meteringmarketplace': 'marketplace-metering', 

3629 'mgh': 'migration-hub', 

3630 'sms-voice': 'pinpoint-sms-voice', 

3631 'resourcegroupstaggingapi': 'resource-groups-tagging-api', 

3632 'route53': 'route-53', 

3633 'route53domains': 'route-53-domains', 

3634 's3control': 's3-control', 

3635 'sdb': 'simpledb', 

3636 'secretsmanager': 'secrets-manager', 

3637 'serverlessrepo': 'serverlessapplicationrepository', 

3638 'servicecatalog': 'service-catalog', 

3639 'servicecatalog-appregistry': 'service-catalog-appregistry', 

3640 'stepfunctions': 'sfn', 

3641 'storagegateway': 'storage-gateway', 

3642}