Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/utils.py: 20%

1562 statements  

coverage.py v7.2.2, created at 2023-03-26 06:03 +0000

1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import base64 

14import binascii 

15import datetime 

16import email.message 

17import functools 

18import hashlib 

19import io 

20import logging 

21import os 

22import random 

23import re 

24import socket 

25import time 

26import warnings 

27import weakref 

28from pathlib import Path 

29from urllib.request import getproxies, proxy_bypass 

30 

31import dateutil.parser 

32from dateutil.tz import tzutc 

33from urllib3.exceptions import LocationParseError 

34 

35import botocore 

36import botocore.awsrequest 

37import botocore.httpsession 

38 

39# IP Regexes retained for backwards compatibility 

40from botocore.compat import HEX_PAT # noqa: F401 

41from botocore.compat import IPV4_PAT # noqa: F401 

42from botocore.compat import IPV6_ADDRZ_PAT # noqa: F401 

43from botocore.compat import IPV6_PAT # noqa: F401 

44from botocore.compat import LS32_PAT # noqa: F401 

45from botocore.compat import UNRESERVED_PAT # noqa: F401 

46from botocore.compat import ZONE_ID_PAT # noqa: F401 

47from botocore.compat import ( 

48 HAS_CRT, 

49 IPV4_RE, 

50 IPV6_ADDRZ_RE, 

51 MD5_AVAILABLE, 

52 UNSAFE_URL_CHARS, 

53 OrderedDict, 

54 get_md5, 

55 get_tzinfo_options, 

56 json, 

57 quote, 

58 urlparse, 

59 urlsplit, 

60 urlunsplit, 

61 zip_longest, 

62) 

63from botocore.exceptions import ( 

64 ClientError, 

65 ConfigNotFound, 

66 ConnectionClosedError, 

67 ConnectTimeoutError, 

68 EndpointConnectionError, 

69 HTTPClientError, 

70 InvalidDNSNameError, 

71 InvalidEndpointConfigurationError, 

72 InvalidExpressionError, 

73 InvalidHostLabelError, 

74 InvalidIMDSEndpointError, 

75 InvalidIMDSEndpointModeError, 

76 InvalidRegionError, 

77 MetadataRetrievalError, 

78 MissingDependencyException, 

79 ReadTimeoutError, 

80 SSOTokenLoadError, 

81 UnsupportedOutpostResourceError, 

82 UnsupportedS3AccesspointConfigurationError, 

83 UnsupportedS3ArnError, 

84 UnsupportedS3ConfigurationError, 

85 UnsupportedS3ControlArnError, 

86 UnsupportedS3ControlConfigurationError, 

87) 

88 

89logger = logging.getLogger(__name__) 

90DEFAULT_METADATA_SERVICE_TIMEOUT = 1 

91METADATA_BASE_URL = 'http://169.254.169.254/' 

92METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/' 

93METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6') 

94 

95# These are chars that do not need to be urlencoded. 

96# Based on rfc2986, section 2.3 

97SAFE_CHARS = '-._~' 

98LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]') 

99RETRYABLE_HTTP_ERRORS = ( 

100 ReadTimeoutError, 

101 EndpointConnectionError, 

102 ConnectionClosedError, 

103 ConnectTimeoutError, 

104) 

105S3_ACCELERATE_WHITELIST = ['dualstack'] 

106# In switching events from using service name / endpoint prefix to service 

107# id, we have to preserve compatibility. This maps the instances where either 

108# is different than the transformed service id. 

109EVENT_ALIASES = { 

110 "a4b": "alexa-for-business", 

111 "alexaforbusiness": "alexa-for-business", 

112 "api.mediatailor": "mediatailor", 

113 "api.pricing": "pricing", 

114 "api.sagemaker": "sagemaker", 

115 "apigateway": "api-gateway", 

116 "application-autoscaling": "application-auto-scaling", 

117 "appstream2": "appstream", 

118 "autoscaling": "auto-scaling", 

119 "autoscaling-plans": "auto-scaling-plans", 

120 "ce": "cost-explorer", 

121 "cloudhsmv2": "cloudhsm-v2", 

122 "cloudsearchdomain": "cloudsearch-domain", 

123 "cognito-idp": "cognito-identity-provider", 

124 "config": "config-service", 

125 "cur": "cost-and-usage-report-service", 

126 "data.iot": "iot-data-plane", 

127 "data.jobs.iot": "iot-jobs-data-plane", 

128 "data.mediastore": "mediastore-data", 

129 "datapipeline": "data-pipeline", 

130 "devicefarm": "device-farm", 

131 "devices.iot1click": "iot-1click-devices-service", 

132 "directconnect": "direct-connect", 

133 "discovery": "application-discovery-service", 

134 "dms": "database-migration-service", 

135 "ds": "directory-service", 

136 "dynamodbstreams": "dynamodb-streams", 

137 "elasticbeanstalk": "elastic-beanstalk", 

138 "elasticfilesystem": "efs", 

139 "elasticloadbalancing": "elastic-load-balancing", 

140 "elasticmapreduce": "emr", 

141 "elastictranscoder": "elastic-transcoder", 

142 "elb": "elastic-load-balancing", 

143 "elbv2": "elastic-load-balancing-v2", 

144 "email": "ses", 

145 "entitlement.marketplace": "marketplace-entitlement-service", 

146 "es": "elasticsearch-service", 

147 "events": "eventbridge", 

148 "cloudwatch-events": "eventbridge", 

149 "iot-data": "iot-data-plane", 

150 "iot-jobs-data": "iot-jobs-data-plane", 

151 "iot1click-devices": "iot-1click-devices-service", 

152 "iot1click-projects": "iot-1click-projects", 

153 "kinesisanalytics": "kinesis-analytics", 

154 "kinesisvideo": "kinesis-video", 

155 "lex-models": "lex-model-building-service", 

156 "lex-runtime": "lex-runtime-service", 

157 "logs": "cloudwatch-logs", 

158 "machinelearning": "machine-learning", 

159 "marketplace-entitlement": "marketplace-entitlement-service", 

160 "marketplacecommerceanalytics": "marketplace-commerce-analytics", 

161 "metering.marketplace": "marketplace-metering", 

162 "meteringmarketplace": "marketplace-metering", 

163 "mgh": "migration-hub", 

164 "models.lex": "lex-model-building-service", 

165 "monitoring": "cloudwatch", 

166 "mturk-requester": "mturk", 

167 "opsworks-cm": "opsworkscm", 

168 "projects.iot1click": "iot-1click-projects", 

169 "resourcegroupstaggingapi": "resource-groups-tagging-api", 

170 "route53": "route-53", 

171 "route53domains": "route-53-domains", 

172 "runtime.lex": "lex-runtime-service", 

173 "runtime.sagemaker": "sagemaker-runtime", 

174 "sdb": "simpledb", 

175 "secretsmanager": "secrets-manager", 

176 "serverlessrepo": "serverlessapplicationrepository", 

177 "servicecatalog": "service-catalog", 

178 "states": "sfn", 

179 "stepfunctions": "sfn", 

180 "storagegateway": "storage-gateway", 

181 "streams.dynamodb": "dynamodb-streams", 

182 "tagging": "resource-groups-tagging-api", 

183} 

184 

185 

186# This pattern can be used to detect if a header is a flexible checksum header 

187CHECKSUM_HEADER_PATTERN = re.compile( 

188 r'^X-Amz-Checksum-([a-z0-9]*)$', 

189 flags=re.IGNORECASE, 

190) 

191 

192 
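# Illustrative sketch (added for clarity; not part of the original module):
# CHECKSUM_HEADER_PATTERN matches the flexible-checksum headers and nothing
# else, as the doctest-style checks below suggest.
#
#     >>> bool(CHECKSUM_HEADER_PATTERN.match('x-amz-checksum-crc32'))
#     True
#     >>> bool(CHECKSUM_HEADER_PATTERN.match('x-amz-checksum-sha256'))
#     True
#     >>> bool(CHECKSUM_HEADER_PATTERN.match('x-amz-content-sha256'))
#     False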

193def ensure_boolean(val): 

194 """Ensures a boolean value if a string or boolean is provided 

195 

196 For strings, the value for True/False is case insensitive 

197 """ 

198 if isinstance(val, bool): 

199 return val 

200 elif isinstance(val, str): 

201 return val.lower() == 'true' 

202 else: 

203 return False 

204 

205 
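# Illustrative usage sketch (added for clarity; not part of the original module):
# ensure_boolean() only treats the case-insensitive string 'true' (or a real
# bool) as True; every other value falls through to False.
#
#     >>> ensure_boolean('TRUE')
#     True
#     >>> ensure_boolean('no')
#     False
#     >>> ensure_boolean(1)
#     False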

206def resolve_imds_endpoint_mode(session): 

207 """Resolving IMDS endpoint mode to either IPv6 or IPv4. 

208 

209 ec2_metadata_service_endpoint_mode takes precedence over imds_use_ipv6. 

210 """ 

211 endpoint_mode = session.get_config_variable( 

212 'ec2_metadata_service_endpoint_mode' 

213 ) 

214 if endpoint_mode is not None: 

215 lendpoint_mode = endpoint_mode.lower() 

216 if lendpoint_mode not in METADATA_ENDPOINT_MODES: 

217 error_msg_kwargs = { 

218 'mode': endpoint_mode, 

219 'valid_modes': METADATA_ENDPOINT_MODES, 

220 } 

221 raise InvalidIMDSEndpointModeError(**error_msg_kwargs) 

222 return lendpoint_mode 

223 elif session.get_config_variable('imds_use_ipv6'): 

224 return 'ipv6' 

225 return 'ipv4' 

226 

227 

228def is_json_value_header(shape): 

229 """Determines if the provided shape is the special header type jsonvalue. 

230 

231 :type shape: botocore.shape 

232 :param shape: Shape to be inspected for the jsonvalue trait. 

233 

234 :return: True if this type is a jsonvalue, False otherwise 

235 :rtype: Bool 

236 """ 

237 return ( 

238 hasattr(shape, 'serialization') 

239 and shape.serialization.get('jsonvalue', False) 

240 and shape.serialization.get('location') == 'header' 

241 and shape.type_name == 'string' 

242 ) 

243 

244 

245def has_header(header_name, headers): 

246 """Case-insensitive check for header key.""" 

247 if header_name is None: 

248 return False 

249 elif isinstance(headers, botocore.awsrequest.HeadersDict): 

250 return header_name in headers 

251 else: 

252 return header_name.lower() in [key.lower() for key in headers.keys()] 

253 

254 

255def get_service_module_name(service_model): 

256 """Returns the module name for a service 

257 

258 This is the value used in both the documentation and client class name 

259 """ 

260 name = service_model.metadata.get( 

261 'serviceAbbreviation', 

262 service_model.metadata.get( 

263 'serviceFullName', service_model.service_name 

264 ), 

265 ) 

266 name = name.replace('Amazon', '') 

267 name = name.replace('AWS', '') 

268 name = re.sub(r'\W+', '', name) 

269 return name 

270 

271 

272def normalize_url_path(path): 

273 if not path: 

274 return '/' 

275 return remove_dot_segments(path) 

276 

277 

278def normalize_boolean(val): 

279 """Returns None if val is None, otherwise ensure value 

280 converted to boolean""" 

281 if val is None: 

282 return val 

283 else: 

284 return ensure_boolean(val) 

285 

286 

287def remove_dot_segments(url): 

288 # RFC 3986, section 5.2.4 "Remove Dot Segments" 

289 # Also, AWS services require consecutive slashes to be removed, 

290 # so that's done here as well 

291 if not url: 

292 return '' 

293 input_url = url.split('/') 

294 output_list = [] 

295 for x in input_url: 

296 if x and x != '.': 

297 if x == '..': 

298 if output_list: 

299 output_list.pop() 

300 else: 

301 output_list.append(x) 

302 

303 if url[0] == '/': 

304 first = '/' 

305 else: 

306 first = '' 

307 if url[-1] == '/' and output_list: 

308 last = '/' 

309 else: 

310 last = '' 

311 return first + '/'.join(output_list) + last 

312 

313 
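# Illustrative usage sketch (added for clarity; not part of the original module):
# remove_dot_segments() applies RFC 3986 section 5.2.4 and also collapses
# consecutive slashes, which normalize_url_path() relies on.
#
#     >>> remove_dot_segments('/a/b/../c/./d/')
#     '/a/c/d/'
#     >>> remove_dot_segments('a//b')
#     'a/b'
#     >>> normalize_url_path('')
#     '/'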

314def validate_jmespath_for_set(expression): 

315 # Validates a limited jmespath expression to determine if we can set a 

316 # value based on it. Only works with dotted paths. 

317 if not expression or expression == '.': 

318 raise InvalidExpressionError(expression=expression) 

319 

320 for invalid in ['[', ']', '*']: 

321 if invalid in expression: 

322 raise InvalidExpressionError(expression=expression) 

323 

324 

325def set_value_from_jmespath(source, expression, value, is_first=True): 

326 # This takes a (limited) jmespath-like expression & can set a value based 

327 # on it. 

328 # Limitations: 

329 # * Only handles dotted lookups 

330 # * No offsets/wildcards/slices/etc. 

331 if is_first: 

332 validate_jmespath_for_set(expression) 

333 

334 bits = expression.split('.', 1) 

335 current_key, remainder = bits[0], bits[1] if len(bits) > 1 else '' 

336 

337 if not current_key: 

338 raise InvalidExpressionError(expression=expression) 

339 

340 if remainder: 

341 if current_key not in source: 

342 # We've got something in the expression that's not present in the 

343 # source (new key). If there's any more bits, we'll set the key 

344 # with an empty dictionary. 

345 source[current_key] = {} 

346 

347 return set_value_from_jmespath( 

348 source[current_key], remainder, value, is_first=False 

349 ) 

350 

351 # If we're down to a single key, set it. 

352 source[current_key] = value 

353 

354 
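# Illustrative usage sketch (added for clarity; not part of the original module):
# set_value_from_jmespath() creates intermediate dictionaries for each dotted
# segment and assigns the value at the leaf.
#
#     >>> params = {}
#     >>> set_value_from_jmespath(params, 'Foo.Bar.Baz', 42)
#     >>> params
#     {'Foo': {'Bar': {'Baz': 42}}}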

355def is_global_accesspoint(context): 

356 """Determine if request is intended for an MRAP accesspoint.""" 

357 s3_accesspoint = context.get('s3_accesspoint', {}) 

358 is_global = s3_accesspoint.get('region') == '' 

359 return is_global 

360 

361 

362class _RetriesExceededError(Exception): 

363 """Internal exception used when the number of retries are exceeded.""" 

364 

365 pass 

366 

367 

368class BadIMDSRequestError(Exception): 

369 def __init__(self, request): 

370 self.request = request 

371 

372 

373class IMDSFetcher: 

374 

375 _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError 

376 _TOKEN_PATH = 'latest/api/token' 

377 _TOKEN_TTL = '21600' 

378 

379 def __init__( 

380 self, 

381 timeout=DEFAULT_METADATA_SERVICE_TIMEOUT, 

382 num_attempts=1, 

383 base_url=METADATA_BASE_URL, 

384 env=None, 

385 user_agent=None, 

386 config=None, 

387 ): 

388 self._timeout = timeout 

389 self._num_attempts = num_attempts 

390 if config is None: 

391 config = {} 

392 self._base_url = self._select_base_url(base_url, config) 

393 self._config = config 

394 

395 if env is None: 

396 env = os.environ.copy() 

397 self._disabled = env.get('AWS_EC2_METADATA_DISABLED', 'false').lower() 

398 self._disabled = self._disabled == 'true' 

399 self._user_agent = user_agent 

400 self._session = botocore.httpsession.URLLib3Session( 

401 timeout=self._timeout, 

402 proxies=get_environ_proxies(self._base_url), 

403 ) 

404 

405 def get_base_url(self): 

406 return self._base_url 

407 

408 def _select_base_url(self, base_url, config): 

409 if config is None: 

410 config = {} 

411 

412 requires_ipv6 = ( 

413 config.get('ec2_metadata_service_endpoint_mode') == 'ipv6' 

414 ) 

415 custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint') 

416 

417 if requires_ipv6 and custom_metadata_endpoint: 

418 logger.warning( 

419 "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint." 

420 ) 

421 

422 chosen_base_url = None 

423 

424 if base_url != METADATA_BASE_URL: 

425 chosen_base_url = base_url 

426 elif custom_metadata_endpoint: 

427 chosen_base_url = custom_metadata_endpoint 

428 elif requires_ipv6: 

429 chosen_base_url = METADATA_BASE_URL_IPv6 

430 else: 

431 chosen_base_url = METADATA_BASE_URL 

432 

433 logger.debug("IMDS ENDPOINT: %s" % chosen_base_url) 

434 if not is_valid_uri(chosen_base_url): 

435 raise InvalidIMDSEndpointError(endpoint=chosen_base_url) 

436 

437 return chosen_base_url 

438 

439 def _construct_url(self, path): 

440 sep = '' 

441 if self._base_url and not self._base_url.endswith('/'): 

442 sep = '/' 

443 return f'{self._base_url}{sep}{path}' 

444 

445 def _fetch_metadata_token(self): 

446 self._assert_enabled() 

447 url = self._construct_url(self._TOKEN_PATH) 

448 headers = { 

449 'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL, 

450 } 

451 self._add_user_agent(headers) 

452 request = botocore.awsrequest.AWSRequest( 

453 method='PUT', url=url, headers=headers 

454 ) 

455 for i in range(self._num_attempts): 

456 try: 

457 response = self._session.send(request.prepare()) 

458 if response.status_code == 200: 

459 return response.text 

460 elif response.status_code in (404, 403, 405): 

461 return None 

462 elif response.status_code in (400,): 

463 raise BadIMDSRequestError(request) 

464 except ReadTimeoutError: 

465 return None 

466 except RETRYABLE_HTTP_ERRORS as e: 

467 logger.debug( 

468 "Caught retryable HTTP exception while making metadata " 

469 "service request to %s: %s", 

470 url, 

471 e, 

472 exc_info=True, 

473 ) 

474 except HTTPClientError as e: 

475 if isinstance(e.kwargs.get('error'), LocationParseError): 

476 raise InvalidIMDSEndpointError(endpoint=url, error=e) 

477 else: 

478 raise 

479 return None 

480 

481 def _get_request(self, url_path, retry_func, token=None): 

482 """Make a get request to the Instance Metadata Service. 

483 

484 :type url_path: str 

485 :param url_path: The path component of the URL to make a get request. 

486 This arg is appended to the base_url that was provided in the 

487 initializer. 

488 

489 :type retry_func: callable 

490 :param retry_func: A function that takes the response as an argument 

491 and determines if it needs to retry. By default, empty and

492 non-200 responses are retried.

493 

494 :type token: str 

495 :param token: Metadata token to send along with GET requests to IMDS. 

496 """ 

497 self._assert_enabled() 

498 if retry_func is None: 

499 retry_func = self._default_retry 

500 url = self._construct_url(url_path) 

501 headers = {} 

502 if token is not None: 

503 headers['x-aws-ec2-metadata-token'] = token 

504 self._add_user_agent(headers) 

505 for i in range(self._num_attempts): 

506 try: 

507 request = botocore.awsrequest.AWSRequest( 

508 method='GET', url=url, headers=headers 

509 ) 

510 response = self._session.send(request.prepare()) 

511 if not retry_func(response): 

512 return response 

513 except RETRYABLE_HTTP_ERRORS as e: 

514 logger.debug( 

515 "Caught retryable HTTP exception while making metadata " 

516 "service request to %s: %s", 

517 url, 

518 e, 

519 exc_info=True, 

520 ) 

521 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

522 

523 def _add_user_agent(self, headers): 

524 if self._user_agent is not None: 

525 headers['User-Agent'] = self._user_agent 

526 

527 def _assert_enabled(self): 

528 if self._disabled: 

529 logger.debug("Access to EC2 metadata has been disabled.") 

530 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

531 

532 def _default_retry(self, response): 

533 return self._is_non_ok_response(response) or self._is_empty(response) 

534 

535 def _is_non_ok_response(self, response): 

536 if response.status_code != 200: 

537 self._log_imds_response(response, 'non-200', log_body=True) 

538 return True 

539 return False 

540 

541 def _is_empty(self, response): 

542 if not response.content: 

543 self._log_imds_response(response, 'no body', log_body=True) 

544 return True 

545 return False 

546 

547 def _log_imds_response(self, response, reason_to_log, log_body=False): 

548 statement = ( 

549 "Metadata service returned %s response " 

550 "with status code of %s for url: %s" 

551 ) 

552 logger_args = [reason_to_log, response.status_code, response.url] 

553 if log_body: 

554 statement += ", content body: %s" 

555 logger_args.append(response.content) 

556 logger.debug(statement, *logger_args) 

557 

558 

559class InstanceMetadataFetcher(IMDSFetcher): 

560 _URL_PATH = 'latest/meta-data/iam/security-credentials/' 

561 _REQUIRED_CREDENTIAL_FIELDS = [ 

562 'AccessKeyId', 

563 'SecretAccessKey', 

564 'Token', 

565 'Expiration', 

566 ] 

567 

568 def retrieve_iam_role_credentials(self): 

569 try: 

570 token = self._fetch_metadata_token() 

571 role_name = self._get_iam_role(token) 

572 credentials = self._get_credentials(role_name, token) 

573 if self._contains_all_credential_fields(credentials): 

574 credentials = { 

575 'role_name': role_name, 

576 'access_key': credentials['AccessKeyId'], 

577 'secret_key': credentials['SecretAccessKey'], 

578 'token': credentials['Token'], 

579 'expiry_time': credentials['Expiration'], 

580 } 

581 self._evaluate_expiration(credentials) 

582 return credentials 

583 else: 

584 # IMDS can return a 200 response that has a JSON formatted 

585 # error message (i.e. if ec2 is not a trusted entity for the

586 # attached role). We do not necessarily want to retry for 

587 # these and we also do not necessarily want to raise a key 

588 # error. So at least log the problematic response and return 

589 # an empty dictionary to signal that it was not able to 

590 # retrieve credentials. These errors will contain both a

591 # Code and Message key. 

592 if 'Code' in credentials and 'Message' in credentials: 

593 logger.debug( 

594 'Error response received when retrieving' 

595 'credentials: %s.', 

596 credentials, 

597 ) 

598 return {} 

599 except self._RETRIES_EXCEEDED_ERROR_CLS: 

600 logger.debug( 

601 "Max number of attempts exceeded (%s) when " 

602 "attempting to retrieve data from metadata service.", 

603 self._num_attempts, 

604 ) 

605 except BadIMDSRequestError as e: 

606 logger.debug("Bad IMDS request: %s", e.request) 

607 return {} 

608 

609 def _get_iam_role(self, token=None): 

610 return self._get_request( 

611 url_path=self._URL_PATH, 

612 retry_func=self._needs_retry_for_role_name, 

613 token=token, 

614 ).text 

615 

616 def _get_credentials(self, role_name, token=None): 

617 r = self._get_request( 

618 url_path=self._URL_PATH + role_name, 

619 retry_func=self._needs_retry_for_credentials, 

620 token=token, 

621 ) 

622 return json.loads(r.text) 

623 

624 def _is_invalid_json(self, response): 

625 try: 

626 json.loads(response.text) 

627 return False 

628 except ValueError: 

629 self._log_imds_response(response, 'invalid json') 

630 return True 

631 

632 def _needs_retry_for_role_name(self, response): 

633 return self._is_non_ok_response(response) or self._is_empty(response) 

634 

635 def _needs_retry_for_credentials(self, response): 

636 return ( 

637 self._is_non_ok_response(response) 

638 or self._is_empty(response) 

639 or self._is_invalid_json(response) 

640 ) 

641 

642 def _contains_all_credential_fields(self, credentials): 

643 for field in self._REQUIRED_CREDENTIAL_FIELDS: 

644 if field not in credentials: 

645 logger.debug( 

646 'Retrieved credentials is missing required field: %s', 

647 field, 

648 ) 

649 return False 

650 return True 

651 

652 def _evaluate_expiration(self, credentials): 

653 expiration = credentials.get("expiry_time") 

654 if expiration is None: 

655 return 

656 try: 

657 expiration = datetime.datetime.strptime( 

658 expiration, "%Y-%m-%dT%H:%M:%SZ" 

659 ) 

660 refresh_interval = self._config.get( 

661 "ec2_credential_refresh_window", 60 * 10 

662 ) 

663 jitter = random.randint(120, 600) # Between 2 to 10 minutes 

664 refresh_interval_with_jitter = refresh_interval + jitter 

665 current_time = datetime.datetime.utcnow() 

666 refresh_offset = datetime.timedelta( 

667 seconds=refresh_interval_with_jitter 

668 ) 

669 extension_time = expiration - refresh_offset 

670 if current_time >= extension_time: 

671 new_time = current_time + refresh_offset 

672 credentials["expiry_time"] = new_time.strftime( 

673 "%Y-%m-%dT%H:%M:%SZ" 

674 ) 

675 logger.info( 

676 f"Attempting credential expiration extension due to a " 

677 f"credential service availability issue. A refresh of " 

678 f"these credentials will be attempted again within " 

679 f"the next {refresh_interval_with_jitter/60:.0f} minutes." 

680 ) 

681 except ValueError: 

682 logger.debug( 

683 f"Unable to parse expiry_time in {credentials['expiry_time']}" 

684 ) 

685 

686 

687class IMDSRegionProvider: 

688 def __init__(self, session, environ=None, fetcher=None): 

689 """Initialize IMDSRegionProvider. 

690 :type session: :class:`botocore.session.Session` 

691 :param session: The session is needed to look up configuration for 

692 how to contact the instance metadata service. Specifically the 

693 whether or not it should use the IMDS region at all, and if so how 

694 to configure the timeout and number of attempts to reach the 

695 service. 

696 :type environ: None or dict 

697 :param environ: A dictionary of environment variables to use. If 

698 ``None`` is the argument then ``os.environ`` will be used by 

699 default. 

700 :type fetcher: :class:`botocore.utils.InstanceMetadataRegionFetcher`

701 :param fetcher: The class to actually handle the fetching of the region 

702 from the IMDS. If not provided a default one will be created. 

703 """ 

704 self._session = session 

705 if environ is None: 

706 environ = os.environ 

707 self._environ = environ 

708 self._fetcher = fetcher 

709 

710 def provide(self): 

711 """Provide the region value from IMDS.""" 

712 instance_region = self._get_instance_metadata_region() 

713 return instance_region 

714 

715 def _get_instance_metadata_region(self): 

716 fetcher = self._get_fetcher() 

717 region = fetcher.retrieve_region() 

718 return region 

719 

720 def _get_fetcher(self): 

721 if self._fetcher is None: 

722 self._fetcher = self._create_fetcher() 

723 return self._fetcher 

724 

725 def _create_fetcher(self): 

726 metadata_timeout = self._session.get_config_variable( 

727 'metadata_service_timeout' 

728 ) 

729 metadata_num_attempts = self._session.get_config_variable( 

730 'metadata_service_num_attempts' 

731 ) 

732 imds_config = { 

733 'ec2_metadata_service_endpoint': self._session.get_config_variable( 

734 'ec2_metadata_service_endpoint' 

735 ), 

736 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode( 

737 self._session 

738 ), 

739 } 

740 fetcher = InstanceMetadataRegionFetcher( 

741 timeout=metadata_timeout, 

742 num_attempts=metadata_num_attempts, 

743 env=self._environ, 

744 user_agent=self._session.user_agent(), 

745 config=imds_config, 

746 ) 

747 return fetcher 

748 

749 

750class InstanceMetadataRegionFetcher(IMDSFetcher): 

751 _URL_PATH = 'latest/meta-data/placement/availability-zone/' 

752 

753 def retrieve_region(self): 

754 """Get the current region from the instance metadata service. 

755 :rtype: None or str

756 :returns: The region the current instance is running in, or ``None``

757 if the instance metadata service cannot be contacted or does not

758 give a valid response.

759 

760 A region is only returned when the client is configured to use

761 IMDS as a region source. ``None`` is also returned if fetching

762 the region from IMDS fails due to exhausting its retries or not

763 being able to connect.

764 """ 

765 try: 

766 region = self._get_region() 

767 return region 

768 except self._RETRIES_EXCEEDED_ERROR_CLS: 

769 logger.debug( 

770 "Max number of attempts exceeded (%s) when " 

771 "attempting to retrieve data from metadata service.", 

772 self._num_attempts, 

773 ) 

774 return None 

775 

776 def _get_region(self): 

777 token = self._fetch_metadata_token() 

778 response = self._get_request( 

779 url_path=self._URL_PATH, 

780 retry_func=self._default_retry, 

781 token=token, 

782 ) 

783 availability_zone = response.text 

784 region = availability_zone[:-1] 

785 return region 

786 

787 

788def merge_dicts(dict1, dict2, append_lists=False): 

789 """Given two dict, merge the second dict into the first. 

790 

791 The dicts can have arbitrary nesting. 

792 

793 :param append_lists: If true, instead of clobbering a list with the new 

794 value, append all of the new values onto the original list. 

795 """ 

796 for key in dict2: 

797 if isinstance(dict2[key], dict): 

798 if key in dict1 and key in dict2: 

799 merge_dicts(dict1[key], dict2[key]) 

800 else: 

801 dict1[key] = dict2[key] 

802 # If the value is a list and the ``append_lists`` flag is set, 

803 # append the new values onto the original list 

804 elif isinstance(dict2[key], list) and append_lists: 

805 # The value in dict1 must be a list in order to append new 

806 # values onto it. 

807 if key in dict1 and isinstance(dict1[key], list): 

808 dict1[key].extend(dict2[key]) 

809 else: 

810 dict1[key] = dict2[key] 

811 else: 

812 # For scalar values, the value from dict2 simply

813 # overwrites the value in dict1.

814 dict1[key] = dict2[key] 

815 

816 
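# Illustrative usage sketch (added for clarity; not part of the original module):
# merge_dicts() mutates the first dict in place; with append_lists=True,
# top-level lists are extended instead of clobbered.
#
#     >>> d1 = {'a': {'x': 1}, 'b': [1]}
#     >>> merge_dicts(d1, {'a': {'y': 2}, 'b': [2]}, append_lists=True)
#     >>> d1
#     {'a': {'x': 1, 'y': 2}, 'b': [1, 2]}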

817def lowercase_dict(original): 

818 """Copies the given dictionary ensuring all keys are lowercase strings.""" 

819 copy = {} 

820 for key in original: 

821 copy[key.lower()] = original[key] 

822 return copy 

823 

824 

825def parse_key_val_file(filename, _open=open): 

826 try: 

827 with _open(filename) as f: 

828 contents = f.read() 

829 return parse_key_val_file_contents(contents) 

830 except OSError: 

831 raise ConfigNotFound(path=filename) 

832 

833 

834def parse_key_val_file_contents(contents): 

835 # This was originally extracted from the EC2 credential provider, which was 

836 # fairly lenient in its parsing. We only try to parse key/val pairs if 

837 # there's a '=' in the line. 

838 final = {} 

839 for line in contents.splitlines(): 

840 if '=' not in line: 

841 continue 

842 key, val = line.split('=', 1) 

843 key = key.strip() 

844 val = val.strip() 

845 final[key] = val 

846 return final 

847 

848 
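# Illustrative usage sketch (added for clarity; not part of the original module):
# parse_key_val_file_contents() ignores lines without '=' and strips
# whitespace around keys and values.
#
#     >>> parse_key_val_file_contents('AWSAccessKeyId = foo\nignored line\n')
#     {'AWSAccessKeyId': 'foo'}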

849def percent_encode_sequence(mapping, safe=SAFE_CHARS): 

850 """Urlencode a dict or list into a string. 

851 

852 This is similar to urllib.urlencode except that: 

853 

854 * It uses quote, and not quote_plus 

855 * It has a default list of safe chars that don't need 

856 to be encoded, which matches what AWS services expect. 

857 

858 If any value in the input ``mapping`` is a list type, 

859 then each list element will be serialized. This is equivalent

860 to ``urlencode``'s ``doseq=True`` argument.

861 

862 This function should be preferred over the stdlib 

863 ``urlencode()`` function. 

864 

865 :param mapping: Either a dict to urlencode or a list of 

866 ``(key, value)`` pairs. 

867 

868 """ 

869 encoded_pairs = [] 

870 if hasattr(mapping, 'items'): 

871 pairs = mapping.items() 

872 else: 

873 pairs = mapping 

874 for key, value in pairs: 

875 if isinstance(value, list): 

876 for element in value: 

877 encoded_pairs.append( 

878 f'{percent_encode(key)}={percent_encode(element)}' 

879 ) 

880 else: 

881 encoded_pairs.append( 

882 f'{percent_encode(key)}={percent_encode(value)}' 

883 ) 

884 return '&'.join(encoded_pairs) 

885 

886 

887def percent_encode(input_str, safe=SAFE_CHARS): 

888 """Urlencodes a string. 

889 

890 Whereas percent_encode_sequence handles taking a dict/sequence and 

891 producing a percent encoded string, this function deals only with 

892 taking a string (not a dict/sequence) and percent encoding it. 

893 

894 If given the binary type, will simply URL encode it. If given the 

895 text type, will produce the binary type by UTF-8 encoding the 

896 text. If given something else, will convert it to the text type 

897 first. 

898 """ 

899 # If its not a binary or text string, make it a text string. 

900 if not isinstance(input_str, (bytes, str)): 

901 input_str = str(input_str) 

902 # If it's not bytes, make it bytes by UTF-8 encoding it. 

903 if not isinstance(input_str, bytes): 

904 input_str = input_str.encode('utf-8') 

905 return quote(input_str, safe=safe) 

906 

907 
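# Illustrative usage sketch (added for clarity; not part of the original module):
# percent_encode() quotes everything outside the RFC 3986 unreserved set
# (including '/'), and percent_encode_sequence() expands list values the way
# urlencode(..., doseq=True) would.
#
#     >>> percent_encode('a b/c')
#     'a%20b%2Fc'
#     >>> percent_encode_sequence({'k1': 'v 1', 'k2': ['a', 'b']})
#     'k1=v%201&k2=a&k2=b'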

908def _parse_timestamp_with_tzinfo(value, tzinfo): 

909 """Parse timestamp with pluggable tzinfo options.""" 

910 if isinstance(value, (int, float)): 

911 # Possibly an epoch time. 

912 return datetime.datetime.fromtimestamp(value, tzinfo()) 

913 else: 

914 try: 

915 return datetime.datetime.fromtimestamp(float(value), tzinfo()) 

916 except (TypeError, ValueError): 

917 pass 

918 try: 

919 # In certain cases, a timestamp marked with GMT can be parsed into a 

920 # different time zone, so here we provide a context which will 

921 # enforce that GMT == UTC. 

922 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()}) 

923 except (TypeError, ValueError) as e: 

924 raise ValueError(f'Invalid timestamp "{value}": {e}') 

925 

926 

927def parse_timestamp(value): 

928 """Parse a timestamp into a datetime object. 

929 

930 Supported formats: 

931 

932 * iso8601 

933 * rfc822 

934 * epoch (value is an integer) 

935 

936 This will return a ``datetime.datetime`` object. 

937 

938 """ 

939 for tzinfo in get_tzinfo_options(): 

940 try: 

941 return _parse_timestamp_with_tzinfo(value, tzinfo) 

942 except OSError as e: 

943 logger.debug( 

944 'Unable to parse timestamp with "%s" timezone info.', 

945 tzinfo.__name__, 

946 exc_info=e, 

947 ) 

948 raise RuntimeError( 

949 'Unable to calculate correct timezone offset for "%s"' % value 

950 ) 

951 

952 

953def parse_to_aware_datetime(value): 

954 """Converted the passed in value to a datetime object with tzinfo. 

955 

956 This function can be used to normalize all timestamp inputs. This 

957 function accepts a number of different types of inputs, but 

958 will always return a datetime.datetime object with time zone 

959 information. 

960 

961 The input param ``value`` can be one of several types: 

962 

963 * A datetime object (both naive and aware) 

964 * An integer representing the epoch time (can also be a string 

965 of the integer, i.e. '0', instead of 0). The epoch time is

966 considered to be UTC. 

967 * An iso8601 formatted timestamp. This does not need to be 

968 a complete timestamp, it can contain just the date portion 

969 without the time component. 

970 

971 The returned value will be a datetime object that will have tzinfo. 

972 If no timezone info was provided in the input value, then UTC is 

973 assumed, not local time. 

974 

975 """ 

976 # This is a general purpose method that handles several cases of 

977 # converting the provided value to a string timestamp suitable to be 

978 # serialized to an http request. It can handle: 

979 # 1) A datetime.datetime object. 

980 if isinstance(value, datetime.datetime): 

981 datetime_obj = value 

982 else: 

983 # 2) A string object that's formatted as a timestamp. 

984 # We document this as being an iso8601 timestamp, although 

985 # parse_timestamp is a bit more flexible. 

986 datetime_obj = parse_timestamp(value) 

987 if datetime_obj.tzinfo is None: 

988 # I think a case would be made that if no time zone is provided, 

989 # we should use the local time. However, to restore backwards 

990 # compat, the previous behavior was to assume UTC, which is 

991 # what we're going to do here. 

992 datetime_obj = datetime_obj.replace(tzinfo=tzutc()) 

993 else: 

994 datetime_obj = datetime_obj.astimezone(tzutc()) 

995 return datetime_obj 

996 

997 
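# Illustrative usage sketch (added for clarity; not part of the original module):
# parse_to_aware_datetime() accepts epoch integers and ISO 8601 strings and
# always returns a timezone-aware datetime normalized to UTC.
#
#     >>> parse_to_aware_datetime(0)
#     datetime.datetime(1970, 1, 1, 0, 0, tzinfo=tzutc())
#     >>> parse_to_aware_datetime('2023-03-26T06:03:00Z')
#     datetime.datetime(2023, 3, 26, 6, 3, tzinfo=tzutc())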

998def datetime2timestamp(dt, default_timezone=None): 

999 """Calculate the timestamp based on the given datetime instance. 

1000 

1001 :type dt: datetime 

1002 :param dt: A datetime object to be converted into timestamp 

1003 :type default_timezone: tzinfo 

1004 :param default_timezone: If it is provided as None, tzutc() is assumed.

1005 It is only used when ``dt`` is a naive (timezone-unaware) datetime.

1006 :returns: The timestamp 

1007 """ 

1008 epoch = datetime.datetime(1970, 1, 1) 

1009 if dt.tzinfo is None: 

1010 if default_timezone is None: 

1011 default_timezone = tzutc() 

1012 dt = dt.replace(tzinfo=default_timezone) 

1013 d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch 

1014 if hasattr(d, "total_seconds"): 

1015 return d.total_seconds() # Works in Python 3.6+ 

1016 return ( 

1017 d.microseconds + (d.seconds + d.days * 24 * 3600) * 10**6 

1018 ) / 10**6 

1019 

1020 
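# Illustrative usage sketch (added for clarity; not part of the original module):
# datetime2timestamp() treats naive datetimes as UTC unless a default_timezone
# is supplied.
#
#     >>> datetime2timestamp(datetime.datetime(1970, 1, 2))
#     86400.0
#     >>> datetime2timestamp(datetime.datetime(1970, 1, 1, tzinfo=tzutc()))
#     0.0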

1021def calculate_sha256(body, as_hex=False): 

1022 """Calculate a sha256 checksum. 

1023 

1024 This method will calculate the sha256 checksum of a file like 

1025 object. Note that this method will iterate through the entire 

1026 file contents. The caller is responsible for ensuring the proper 

1027 starting position of the file and ``seek()``'ing the file back 

1028 to its starting location if other consumers need to read from 

1029 the file like object. 

1030 

1031 :param body: Any file like object. The file must be opened 

1032 in binary mode such that a ``.read()`` call returns bytes. 

1033 :param as_hex: If True, then the hex digest is returned. 

1034 If False, then the digest (as binary bytes) is returned. 

1035 

1036 :returns: The sha256 checksum 

1037 

1038 """ 

1039 checksum = hashlib.sha256() 

1040 for chunk in iter(lambda: body.read(1024 * 1024), b''): 

1041 checksum.update(chunk) 

1042 if as_hex: 

1043 return checksum.hexdigest() 

1044 else: 

1045 return checksum.digest() 

1046 

1047 
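# Illustrative usage sketch (added for clarity; not part of the original module):
# calculate_sha256() consumes the file-like object in 1 MB chunks; callers
# must seek() back themselves if the body needs to be re-read.
#
#     >>> calculate_sha256(io.BytesIO(b'hello'), as_hex=True)
#     '2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824'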

1048def calculate_tree_hash(body): 

1049 """Calculate a tree hash checksum. 

1050 

1051 For more information see: 

1052 

1053 http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html 

1054 

1055 :param body: Any file like object. This has the same constraints as 

1056 the ``body`` param in calculate_sha256 

1057 

1058 :rtype: str 

1059 :returns: The hex version of the calculated tree hash 

1060 

1061 """ 

1062 chunks = [] 

1063 required_chunk_size = 1024 * 1024 

1064 sha256 = hashlib.sha256 

1065 for chunk in iter(lambda: body.read(required_chunk_size), b''): 

1066 chunks.append(sha256(chunk).digest()) 

1067 if not chunks: 

1068 return sha256(b'').hexdigest() 

1069 while len(chunks) > 1: 

1070 new_chunks = [] 

1071 for first, second in _in_pairs(chunks): 

1072 if second is not None: 

1073 new_chunks.append(sha256(first + second).digest()) 

1074 else: 

1075 # We're at the end of the list and there's no pair left. 

1076 new_chunks.append(first) 

1077 chunks = new_chunks 

1078 return binascii.hexlify(chunks[0]).decode('ascii') 

1079 

1080 

1081def _in_pairs(iterable): 

1082 # Creates iterator that iterates over the list in pairs: 

1083 # for a, b in _in_pairs([0, 1, 2, 3, 4]): 

1084 # print(a, b) 

1085 # 

1086 # will print: 

1087 # 0, 1 

1088 # 2, 3 

1089 # 4, None 

1090 shared_iter = iter(iterable) 

1091 # Note that zip_longest is a compat import that uses 

1092 # the itertools izip_longest. This creates an iterator, 

1093 # this call below does _not_ immediately create the list 

1094 # of pairs. 

1095 return zip_longest(shared_iter, shared_iter) 

1096 

1097 
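# Illustrative usage sketch (added for clarity; not part of the original module):
# calculate_tree_hash() (used for Glacier) hashes 1 MB chunks and folds them
# pairwise via the _in_pairs() helper above; for bodies under 1 MB the result
# equals the plain sha256 hex digest.
#
#     >>> calculate_tree_hash(io.BytesIO(b''))
#     'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
#     >>> calculate_tree_hash(io.BytesIO(b'hi')) == calculate_sha256(io.BytesIO(b'hi'), as_hex=True)
#     True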

1098class CachedProperty: 

1099 """A read only property that caches the initially computed value. 

1100 

1101 This descriptor will only call the provided ``fget`` function once. 

1102 Subsequent access to this property will return the cached value. 

1103 

1104 """ 

1105 

1106 def __init__(self, fget): 

1107 self._fget = fget 

1108 

1109 def __get__(self, obj, cls): 

1110 if obj is None: 

1111 return self 

1112 else: 

1113 computed_value = self._fget(obj) 

1114 obj.__dict__[self._fget.__name__] = computed_value 

1115 return computed_value 

1116 

1117 
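# Illustrative usage sketch (added for clarity; not part of the original module;
# the Example class below is hypothetical): CachedProperty is a non-data
# descriptor, so the first access stores the result in the instance __dict__
# and later lookups never call fget again.
#
#     >>> class Example:
#     ...     @CachedProperty
#     ...     def value(self):
#     ...         print('computing')
#     ...         return 42
#     >>> e = Example()
#     >>> e.value
#     computing
#     42
#     >>> e.value
#     42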

1118class ArgumentGenerator: 

1119 """Generate sample input based on a shape model. 

1120 

1121 This class contains a ``generate_skeleton`` method that will take 

1122 an input/output shape (created from ``botocore.model``) and generate 

1123 a sample dictionary corresponding to the input/output shape. 

1124 

1125 The specific values used are placeholder values. For strings, either an

1126 empty string or the member name is used; for numbers, 0 or 0.0 is used.

1127 The intended usage of this class is to generate the *shape* of the input 

1128 structure. 

1129 

1130 This can be useful for operations that have complex input shapes. 

1131 This allows a user to just fill in the necessary data instead of 

1132 worrying about the specific structure of the input arguments. 

1133 

1134 Example usage:: 

1135 

1136 s = botocore.session.get_session() 

1137 ddb = s.get_service_model('dynamodb') 

1138 arg_gen = ArgumentGenerator() 

1139 sample_input = arg_gen.generate_skeleton( 

1140 ddb.operation_model('CreateTable').input_shape) 

1141 print("Sample input for dynamodb.CreateTable: %s" % sample_input) 

1142 

1143 """ 

1144 

1145 def __init__(self, use_member_names=False): 

1146 self._use_member_names = use_member_names 

1147 

1148 def generate_skeleton(self, shape): 

1149 """Generate a sample input. 

1150 

1151 :type shape: ``botocore.model.Shape`` 

1152 :param shape: The input shape. 

1153 

1154 :return: The generated skeleton input corresponding to the 

1155 provided input shape. 

1156 

1157 """ 

1158 stack = [] 

1159 return self._generate_skeleton(shape, stack) 

1160 

1161 def _generate_skeleton(self, shape, stack, name=''): 

1162 stack.append(shape.name) 

1163 try: 

1164 if shape.type_name == 'structure': 

1165 return self._generate_type_structure(shape, stack) 

1166 elif shape.type_name == 'list': 

1167 return self._generate_type_list(shape, stack) 

1168 elif shape.type_name == 'map': 

1169 return self._generate_type_map(shape, stack) 

1170 elif shape.type_name == 'string': 

1171 if self._use_member_names: 

1172 return name 

1173 if shape.enum: 

1174 return random.choice(shape.enum) 

1175 return '' 

1176 elif shape.type_name in ['integer', 'long']: 

1177 return 0 

1178 elif shape.type_name in ['float', 'double']: 

1179 return 0.0 

1180 elif shape.type_name == 'boolean': 

1181 return True 

1182 elif shape.type_name == 'timestamp': 

1183 return datetime.datetime(1970, 1, 1, 0, 0, 0) 

1184 finally: 

1185 stack.pop() 

1186 

1187 def _generate_type_structure(self, shape, stack): 

1188 if stack.count(shape.name) > 1: 

1189 return {} 

1190 skeleton = OrderedDict() 

1191 for member_name, member_shape in shape.members.items(): 

1192 skeleton[member_name] = self._generate_skeleton( 

1193 member_shape, stack, name=member_name 

1194 ) 

1195 return skeleton 

1196 

1197 def _generate_type_list(self, shape, stack): 

1198 # For list elements we've arbitrarily decided to 

1199 # return a single element for the skeleton list.

1200 name = '' 

1201 if self._use_member_names: 

1202 name = shape.member.name 

1203 return [ 

1204 self._generate_skeleton(shape.member, stack, name), 

1205 ] 

1206 

1207 def _generate_type_map(self, shape, stack): 

1208 key_shape = shape.key 

1209 value_shape = shape.value 

1210 assert key_shape.type_name == 'string' 

1211 return OrderedDict( 

1212 [ 

1213 ('KeyName', self._generate_skeleton(value_shape, stack)), 

1214 ] 

1215 ) 

1216 

1217 

1218def is_valid_ipv6_endpoint_url(endpoint_url): 

1219 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1220 return False 

1221 hostname = f'[{urlparse(endpoint_url).hostname}]' 

1222 return IPV6_ADDRZ_RE.match(hostname) is not None 

1223 

1224 

1225def is_valid_ipv4_endpoint_url(endpoint_url): 

1226 hostname = urlparse(endpoint_url).hostname 

1227 return IPV4_RE.match(hostname) is not None 

1228 

1229 

1230def is_valid_endpoint_url(endpoint_url): 

1231 """Verify the endpoint_url is valid. 

1232 

1233 :type endpoint_url: string 

1234 :param endpoint_url: An endpoint_url. Must have at least a scheme 

1235 and a hostname. 

1236 

1237 :return: True if the endpoint url is valid. False otherwise. 

1238 

1239 """ 

1240 # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing 

1241 # it to pass hostname validation below. Detect them early to fix that. 

1242 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1243 return False 

1244 parts = urlsplit(endpoint_url) 

1245 hostname = parts.hostname 

1246 if hostname is None: 

1247 return False 

1248 if len(hostname) > 255: 

1249 return False 

1250 if hostname[-1] == ".": 

1251 hostname = hostname[:-1] 

1252 allowed = re.compile( 

1253 r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$", 

1254 re.IGNORECASE, 

1255 ) 

1256 return allowed.match(hostname) 

1257 

1258 

1259def is_valid_uri(endpoint_url): 

1260 return is_valid_endpoint_url(endpoint_url) or is_valid_ipv6_endpoint_url( 

1261 endpoint_url 

1262 ) 

1263 

1264 
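# Illustrative usage sketch (added for clarity; not part of the original module):
# is_valid_endpoint_url() returns a truthy regex match (or False/None), so it
# is typically used in a boolean context.
#
#     >>> bool(is_valid_endpoint_url('https://s3.us-west-2.amazonaws.com'))
#     True
#     >>> bool(is_valid_endpoint_url('not a url'))
#     False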

1265def validate_region_name(region_name): 

1266 """Provided region_name must be a valid host label.""" 

1267 if region_name is None: 

1268 return 

1269 valid_host_label = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$') 

1270 valid = valid_host_label.match(region_name) 

1271 if not valid: 

1272 raise InvalidRegionError(region_name=region_name) 

1273 

1274 

1275def check_dns_name(bucket_name): 

1276 """ 

1277 Check to see if the ``bucket_name`` complies with the 

1278 restricted DNS naming conventions necessary to allow 

1279 access via virtual-hosting style. 

1280 

1281 Even though "." characters are perfectly valid in this DNS 

1282 naming scheme, we are going to punt on any name containing a 

1283 "." character because these will cause SSL cert validation 

1284 problems if we try to use virtual-hosting style addressing. 

1285 """ 

1286 if '.' in bucket_name: 

1287 return False 

1288 n = len(bucket_name) 

1289 if n < 3 or n > 63: 

1290 # Wrong length 

1291 return False 

1292 match = LABEL_RE.match(bucket_name) 

1293 if match is None or match.end() != len(bucket_name): 

1294 return False 

1295 return True 

1296 

1297 
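# Illustrative usage sketch (added for clarity; not part of the original module):
# check_dns_name() rejects any bucket name containing a '.', uppercase
# characters, or a length outside 3-63, since those cannot safely be used
# with virtual-hosted style addressing here.
#
#     >>> check_dns_name('my-bucket')
#     True
#     >>> check_dns_name('My.Bucket')
#     False
#     >>> check_dns_name('ab')
#     False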

1298def fix_s3_host( 

1299 request, 

1300 signature_version, 

1301 region_name, 

1302 default_endpoint_url=None, 

1303 **kwargs, 

1304): 

1305 """ 

1306 This handler looks at S3 requests just before they are signed. 

1307 If there is a bucket name on the path (true for everything except 

1308 ListAllBuckets) it checks to see if that bucket name conforms to 

1309 the DNS naming conventions. If it does, it alters the request to 

1310 use ``virtual hosting`` style addressing rather than ``path-style`` 

1311 addressing. 

1312 

1313 """ 

1314 if request.context.get('use_global_endpoint', False): 

1315 default_endpoint_url = 's3.amazonaws.com' 

1316 try: 

1317 switch_to_virtual_host_style( 

1318 request, signature_version, default_endpoint_url 

1319 ) 

1320 except InvalidDNSNameError as e: 

1321 bucket_name = e.kwargs['bucket_name'] 

1322 logger.debug( 

1323 'Not changing URI, bucket is not DNS compatible: %s', bucket_name 

1324 ) 

1325 

1326 

1327def switch_to_virtual_host_style( 

1328 request, signature_version, default_endpoint_url=None, **kwargs 

1329): 

1330 """ 

1331 This is a handler to force virtual host style s3 addressing no matter 

1332 the signature version (which is taken in consideration for the default 

1333 case). If the bucket is not DNS compatible an InvalidDNSName is thrown. 

1334 

1335 :param request: A AWSRequest object that is about to be sent. 

1336 :param signature_version: The signature version to sign with 

1337 :param default_endpoint_url: The endpoint to use when switching to a 

1338 virtual style. If None is supplied, the virtual host will be 

1339 constructed from the url of the request. 

1340 """ 

1341 if request.auth_path is not None: 

1342 # The auth_path has already been applied (this may be a 

1343 # retried request). We don't need to perform this 

1344 # customization again. 

1345 return 

1346 elif _is_get_bucket_location_request(request): 

1347 # For the GetBucketLocation response, we should not be using 

1348 # the virtual host style addressing so we can avoid any sigv4 

1349 # issues. 

1350 logger.debug( 

1351 "Request is GetBucketLocation operation, not checking " 

1352 "for DNS compatibility." 

1353 ) 

1354 return 

1355 parts = urlsplit(request.url) 

1356 request.auth_path = parts.path 

1357 path_parts = parts.path.split('/') 

1358 

1359 # Retrieve the endpoint that we will be prepending the bucket name to.

1360 if default_endpoint_url is None: 

1361 default_endpoint_url = parts.netloc 

1362 

1363 if len(path_parts) > 1: 

1364 bucket_name = path_parts[1] 

1365 if not bucket_name: 

1366 # If the bucket name is empty we should not be checking for 

1367 # dns compatibility. 

1368 return 

1369 logger.debug('Checking for DNS compatible bucket for: %s', request.url) 

1370 if check_dns_name(bucket_name): 

1371 # If the operation is on a bucket, the auth_path must be 

1372 # terminated with a '/' character. 

1373 if len(path_parts) == 2: 

1374 if request.auth_path[-1] != '/': 

1375 request.auth_path += '/' 

1376 path_parts.remove(bucket_name) 

1377 # At the very least the path must be a '/', such as with the 

1378 # CreateBucket operation when DNS style is being used. If this 

1379 # is not used you will get an empty path which is incorrect. 

1380 path = '/'.join(path_parts) or '/' 

1381 global_endpoint = default_endpoint_url 

1382 host = bucket_name + '.' + global_endpoint 

1383 new_tuple = (parts.scheme, host, path, parts.query, '') 

1384 new_uri = urlunsplit(new_tuple) 

1385 request.url = new_uri 

1386 logger.debug('URI updated to: %s', new_uri) 

1387 else: 

1388 raise InvalidDNSNameError(bucket_name=bucket_name) 

1389 

1390 

1391def _is_get_bucket_location_request(request): 

1392 return request.url.endswith('?location') 

1393 

1394 

1395def instance_cache(func): 

1396 """Method decorator for caching method calls to a single instance. 

1397 

1398 **This is not a general purpose caching decorator.** 

1399 

1400 In order to use this, you *must* provide an ``_instance_cache`` 

1401 attribute on the instance. 

1402 

1403 This decorator is used to cache method calls. The cache is only 

1404 scoped to a single instance though such that multiple instances 

1405 will maintain their own cache. In order to keep things simple, 

1406 this decorator requires that you provide an ``_instance_cache`` 

1407 attribute on your instance. 

1408 

1409 """ 

1410 func_name = func.__name__ 

1411 

1412 @functools.wraps(func) 

1413 def _cache_guard(self, *args, **kwargs): 

1414 cache_key = (func_name, args) 

1415 if kwargs: 

1416 kwarg_items = tuple(sorted(kwargs.items())) 

1417 cache_key = (func_name, args, kwarg_items) 

1418 result = self._instance_cache.get(cache_key) 

1419 if result is not None: 

1420 return result 

1421 result = func(self, *args, **kwargs) 

1422 self._instance_cache[cache_key] = result 

1423 return result 

1424 

1425 return _cache_guard 

1426 

1427 
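# Illustrative usage sketch (added for clarity; not part of the original module;
# the Client class below is hypothetical): instance_cache keys the cache on the
# method name plus its arguments, and requires the instance to provide an
# _instance_cache dict.
#
#     >>> class Client:
#     ...     def __init__(self):
#     ...         self._instance_cache = {}
#     ...     @instance_cache
#     ...     def describe(self, name):
#     ...         print('calling service')
#     ...         return {'Name': name}
#     >>> c = Client()
#     >>> c.describe('foo')
#     calling service
#     {'Name': 'foo'}
#     >>> c.describe('foo')
#     {'Name': 'foo'}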

1428def switch_host_s3_accelerate(request, operation_name, **kwargs): 

1429 """Switches the current s3 endpoint with an S3 Accelerate endpoint""" 

1430 

1431 # Note that, when registered, the switching of the s3 host happens

1432 # before it gets changed to virtual. So we are not concerned with ensuring 

1433 # that the bucket name is translated to the virtual style here and we 

1434 # can hard code the Accelerate endpoint. 

1435 parts = urlsplit(request.url).netloc.split('.') 

1436 parts = [p for p in parts if p in S3_ACCELERATE_WHITELIST] 

1437 endpoint = 'https://s3-accelerate.' 

1438 if len(parts) > 0: 

1439 endpoint += '.'.join(parts) + '.' 

1440 endpoint += 'amazonaws.com' 

1441 

1442 if operation_name in ['ListBuckets', 'CreateBucket', 'DeleteBucket']: 

1443 return 

1444 _switch_hosts(request, endpoint, use_new_scheme=False) 

1445 

1446 

1447def switch_host_with_param(request, param_name): 

1448 """Switches the host using a parameter value from a JSON request body""" 

1449 request_json = json.loads(request.data.decode('utf-8')) 

1450 if request_json.get(param_name): 

1451 new_endpoint = request_json[param_name] 

1452 _switch_hosts(request, new_endpoint) 

1453 

1454 

1455def _switch_hosts(request, new_endpoint, use_new_scheme=True): 

1456 final_endpoint = _get_new_endpoint( 

1457 request.url, new_endpoint, use_new_scheme 

1458 ) 

1459 request.url = final_endpoint 

1460 

1461 

1462def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True): 

1463 new_endpoint_components = urlsplit(new_endpoint) 

1464 original_endpoint_components = urlsplit(original_endpoint) 

1465 scheme = original_endpoint_components.scheme 

1466 if use_new_scheme: 

1467 scheme = new_endpoint_components.scheme 

1468 final_endpoint_components = ( 

1469 scheme, 

1470 new_endpoint_components.netloc, 

1471 original_endpoint_components.path, 

1472 original_endpoint_components.query, 

1473 '', 

1474 ) 

1475 final_endpoint = urlunsplit(final_endpoint_components) 

1476 logger.debug(f'Updating URI from {original_endpoint} to {final_endpoint}') 

1477 return final_endpoint 

1478 

1479 

1480def deep_merge(base, extra): 

1481 """Deeply two dictionaries, overriding existing keys in the base. 

1482 

1483 :param base: The base dictionary which will be merged into. 

1484 :param extra: The dictionary to merge into the base. Keys from this 

1485 dictionary will take precedence. 

1486 """ 

1487 for key in extra: 

1488 # If the key represents a dict on both given dicts, merge the sub-dicts 

1489 if ( 

1490 key in base 

1491 and isinstance(base[key], dict) 

1492 and isinstance(extra[key], dict) 

1493 ): 

1494 deep_merge(base[key], extra[key]) 

1495 continue 

1496 

1497 # Otherwise, set the key on the base to be the value of the extra. 

1498 base[key] = extra[key] 

1499 

1500 

1501def hyphenize_service_id(service_id): 

1502 """Translate the form used for event emitters. 

1503 

1504 :param service_id: The service_id to convert. 

1505 """ 

1506 return service_id.replace(' ', '-').lower() 

1507 

1508 

1509class S3RegionRedirectorv2: 

1510 """Updated version of S3RegionRedirector for use when 

1511 EndpointRulesetResolver is in use for endpoint resolution. 

1512 

1513 This class is considered private and subject to abrupt breaking changes or 

1514 removal without prior announcement. Please do not use it directly. 

1515 """ 

1516 

1517 def __init__(self, endpoint_bridge, client, cache=None): 

1518 self._cache = cache or {} 

1519 self._client = weakref.proxy(client) 

1520 

1521 def register(self, event_emitter=None): 

1522 logger.debug('Registering S3 region redirector handler') 

1523 emitter = event_emitter or self._client.meta.events 

1524 emitter.register('needs-retry.s3', self.redirect_from_error) 

1525 emitter.register( 

1526 'before-parameter-build.s3', self.annotate_request_context 

1527 ) 

1528 emitter.register( 

1529 'before-endpoint-resolution.s3', self.redirect_from_cache 

1530 ) 

1531 

1532 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1533 """ 

1534 An S3 request sent to the wrong region will return an error that 

1535 contains the endpoint the request should be sent to. This handler 

1536 will add the redirect information to the signing context and then 

1537 redirect the request. 

1538 """ 

1539 if response is None: 

1540 # This could be none if there was a ConnectionError or other 

1541 # transport error. 

1542 return 

1543 

1544 redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {}) 

1545 if ArnParser.is_arn(redirect_ctx.get('bucket')): 

1546 logger.debug( 

1547 'S3 request was previously for an Accesspoint ARN, not ' 

1548 'redirecting.' 

1549 ) 

1550 return 

1551 

1552 if redirect_ctx.get('redirected'): 

1553 logger.debug( 

1554 'S3 request was previously redirected, not redirecting.' 

1555 ) 

1556 return 

1557 

1558 error = response[1].get('Error', {}) 

1559 error_code = error.get('Code') 

1560 response_metadata = response[1].get('ResponseMetadata', {}) 

1561 

1562 # We have to account for 400 responses because 

1563 # if we sign a Head* request with the wrong region, 

1564 # we'll get a 400 Bad Request but we won't get a 

1565 # body saying it's an "AuthorizationHeaderMalformed". 

1566 is_special_head_object = ( 

1567 error_code in ('301', '400') and operation.name == 'HeadObject' 

1568 ) 

1569 is_special_head_bucket = ( 

1570 error_code in ('301', '400') 

1571 and operation.name == 'HeadBucket' 

1572 and 'x-amz-bucket-region' 

1573 in response_metadata.get('HTTPHeaders', {}) 

1574 ) 

1575 is_wrong_signing_region = ( 

1576 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

1577 ) 

1578 is_redirect_status = response[0] is not None and response[ 

1579 0 

1580 ].status_code in (301, 302, 307) 

1581 is_permanent_redirect = error_code == 'PermanentRedirect' 

1582 if not any( 

1583 [ 

1584 is_special_head_object, 

1585 is_wrong_signing_region, 

1586 is_permanent_redirect, 

1587 is_special_head_bucket, 

1588 is_redirect_status, 

1589 ] 

1590 ): 

1591 return 

1592 

1593 bucket = request_dict['context']['s3_redirect']['bucket'] 

1594 client_region = request_dict['context'].get('client_region') 

1595 new_region = self.get_bucket_region(bucket, response) 

1596 

1597 if new_region is None: 

1598 logger.debug( 

1599 "S3 client configured for region %s but the bucket %s is not " 

1600 "in that region and the proper region could not be " 

1601 "automatically determined." % (client_region, bucket) 

1602 ) 

1603 return 

1604 

1605 logger.debug( 

1606 "S3 client configured for region %s but the bucket %s is in region" 

1607 " %s; Please configure the proper region to avoid multiple " 

1608 "unnecessary redirects and signing attempts." 

1609 % (client_region, bucket, new_region) 

1610 ) 

1611 # Adding the new region to _cache will make construct_endpoint()

1612 # use the new region as the value for the AWS::Region builtin parameter.

1613 self._cache[bucket] = new_region 

1614 

1615 # Re-resolve endpoint with new region and modify request_dict with 

1616 # the new URL, auth scheme, and signing context. 

1617 ep_resolver = self._client._ruleset_resolver 

1618 ep_info = ep_resolver.construct_endpoint( 

1619 operation_model=operation, 

1620 call_args=request_dict['context']['s3_redirect']['params'], 

1621 request_context=request_dict['context'], 

1622 ) 

1623 request_dict['url'] = self.set_request_url( 

1624 request_dict['url'], ep_info.url 

1625 ) 

1626 request_dict['context']['s3_redirect']['redirected'] = True 

1627 auth_schemes = ep_info.properties.get('authSchemes') 

1628 if auth_schemes is not None: 

1629 auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes) 

1630 auth_type, signing_context = auth_info 

1631 request_dict['context']['auth_type'] = auth_type 

1632 request_dict['context']['signing'] = { 

1633 **request_dict['context'].get('signing', {}), 

1634 **signing_context, 

1635 } 

1636 

1637 # Return 0 so it doesn't wait to retry 

1638 return 0 

1639 

1640 def get_bucket_region(self, bucket, response): 

1641 """ 

1642 There are multiple potential sources for the new region to redirect to, 

1643 but they aren't all universally available for use. This will try to 

1644 find region from response elements, but will fall back to calling 

1645 HEAD on the bucket if all else fails. 

1646 

1647 :param bucket: The bucket to find the region for. This is necessary if 

1648 the region is not available in the error response. 

1649 :param response: A response representing a service request that failed 

1650 due to incorrect region configuration. 

1651 """ 

1652 # First try to source the region from the headers. 

1653 service_response = response[1] 

1654 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

1655 if 'x-amz-bucket-region' in response_headers: 

1656 return response_headers['x-amz-bucket-region'] 

1657 

1658 # Next, check the error body 

1659 region = service_response.get('Error', {}).get('Region', None) 

1660 if region is not None: 

1661 return region 

1662 

1663 # Finally, HEAD the bucket. No other choice sadly. 

1664 try: 

1665 response = self._client.head_bucket(Bucket=bucket) 

1666 headers = response['ResponseMetadata']['HTTPHeaders'] 

1667 except ClientError as e: 

1668 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

1669 

1670 region = headers.get('x-amz-bucket-region', None) 

1671 return region 

1672 
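A minimal sketch of the lookup order used by get_bucket_region above, assuming the usual (http_response, parsed) tuple passed to needs-retry handlers; the parsed response below is invented purely for illustration:

    # Hypothetical parsed response; values are illustrative only.
    parsed = {
        'Error': {'Code': '301', 'Message': 'Moved Permanently'},
        'ResponseMetadata': {
            'HTTPHeaders': {'x-amz-bucket-region': 'eu-west-1'},
        },
    }
    # 1. The x-amz-bucket-region header wins if present ('eu-west-1' here).
    # 2. Otherwise parsed['Error']['Region'] is used when the body carries it.
    # 3. Otherwise the client issues HeadBucket and reads the same header from
    #    that response (or from the ClientError it raises).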

1673 def set_request_url(self, old_url, new_endpoint, **kwargs): 

1674 """ 

1675 Splice a new endpoint into an existing URL. Note that some endpoints 

1676 from the endpoint provider have a path component which will be 

1677 discarded by this function. 

1678 """ 

1679 return _get_new_endpoint(old_url, new_endpoint, False) 

1680 

1681 def redirect_from_cache(self, builtins, params, **kwargs): 

1682 """ 

1683 If a bucket name has been redirected before, it is in the cache. This 

1684 handler will update the AWS::Region endpoint resolver builtin param 

1685 to use the region from cache instead of the client region to avoid the 

1686 redirect. 

1687 """ 

1688 bucket = params.get('Bucket') 

1689 if bucket is not None and bucket in self._cache: 

1690 new_region = self._cache.get(bucket) 

1691 builtins['AWS::Region'] = new_region 

1692 

1693 def annotate_request_context(self, params, context, **kwargs): 

1694 """Store the bucket name in context for later use when redirecting. 

1695 The bucket name may be an access point ARN or alias. 

1696 """ 

1697 bucket = params.get('Bucket') 

1698 context['s3_redirect'] = { 

1699 'redirected': False, 

1700 'bucket': bucket, 

1701 'params': params, 

1702 } 

1703 

1704 

1705class S3RegionRedirector: 

1706 """This handler has been replaced by S3RegionRedirectorv2. The original 

1707 version remains in place for any third-party libraries that import it. 

1708 """ 

1709 

1710 def __init__(self, endpoint_bridge, client, cache=None): 

1711 self._endpoint_resolver = endpoint_bridge 

1712 self._cache = cache 

1713 if self._cache is None: 

1714 self._cache = {} 

1715 

1716 # This needs to be a weak ref in order to prevent memory leaks on 

1717 # python 2.6 

1718 self._client = weakref.proxy(client) 

1719 

1720 warnings.warn( 

1721 'The S3RegionRedirector class has been deprecated for a new ' 

1722 'internal replacement. A future version of botocore may remove ' 

1723 'this class.', 

1724 category=FutureWarning, 

1725 ) 

1726 

1727 def register(self, event_emitter=None): 

1728 emitter = event_emitter or self._client.meta.events 

1729 emitter.register('needs-retry.s3', self.redirect_from_error) 

1730 emitter.register('before-call.s3', self.set_request_url) 

1731 emitter.register('before-parameter-build.s3', self.redirect_from_cache) 

1732 

1733 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1734 """ 

1735 An S3 request sent to the wrong region will return an error that 

1736 contains the endpoint the request should be sent to. This handler 

1737 will add the redirect information to the signing context and then 

1738 redirect the request. 

1739 """ 

1740 if response is None: 

1741 # This could be None if there was a ConnectionError or other 

1742 # transport error. 

1743 return 

1744 

1745 if self._is_s3_accesspoint(request_dict.get('context', {})): 

1746 logger.debug( 

1747 'S3 request was previously to an accesspoint, not redirecting.' 

1748 ) 

1749 return 

1750 

1751 if request_dict.get('context', {}).get('s3_redirected'): 

1752 logger.debug( 

1753 'S3 request was previously redirected, not redirecting.' 

1754 ) 

1755 return 

1756 

1757 error = response[1].get('Error', {}) 

1758 error_code = error.get('Code') 

1759 response_metadata = response[1].get('ResponseMetadata', {}) 

1760 

1761 # We have to account for 400 responses because 

1762 # if we sign a Head* request with the wrong region, 

1763 # we'll get a 400 Bad Request but we won't get a 

1764 # body saying it's an "AuthorizationHeaderMalformed". 

1765 is_special_head_object = ( 

1766 error_code in ('301', '400') and operation.name == 'HeadObject' 

1767 ) 

1768 is_special_head_bucket = ( 

1769 error_code in ('301', '400') 

1770 and operation.name == 'HeadBucket' 

1771 and 'x-amz-bucket-region' 

1772 in response_metadata.get('HTTPHeaders', {}) 

1773 ) 

1774 is_wrong_signing_region = ( 

1775 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

1776 ) 

1777 is_redirect_status = response[0] is not None and response[ 

1778 0 

1779 ].status_code in (301, 302, 307) 

1780 is_permanent_redirect = error_code == 'PermanentRedirect' 

1781 if not any( 

1782 [ 

1783 is_special_head_object, 

1784 is_wrong_signing_region, 

1785 is_permanent_redirect, 

1786 is_special_head_bucket, 

1787 is_redirect_status, 

1788 ] 

1789 ): 

1790 return 

1791 

1792 bucket = request_dict['context']['signing']['bucket'] 

1793 client_region = request_dict['context'].get('client_region') 

1794 new_region = self.get_bucket_region(bucket, response) 

1795 

1796 if new_region is None: 

1797 logger.debug( 

1798 "S3 client configured for region %s but the bucket %s is not " 

1799 "in that region and the proper region could not be " 

1800 "automatically determined." % (client_region, bucket) 

1801 ) 

1802 return 

1803 

1804 logger.debug( 

1805 "S3 client configured for region %s but the bucket %s is in region" 

1806 " %s; Please configure the proper region to avoid multiple " 

1807 "unnecessary redirects and signing attempts." 

1808 % (client_region, bucket, new_region) 

1809 ) 

1810 endpoint = self._endpoint_resolver.resolve('s3', new_region) 

1811 endpoint = endpoint['endpoint_url'] 

1812 

1813 signing_context = { 

1814 'region': new_region, 

1815 'bucket': bucket, 

1816 'endpoint': endpoint, 

1817 } 

1818 request_dict['context']['signing'] = signing_context 

1819 

1820 self._cache[bucket] = signing_context 

1821 self.set_request_url(request_dict, request_dict['context']) 

1822 

1823 request_dict['context']['s3_redirected'] = True 

1824 

1825 # Return 0 so it doesn't wait to retry 

1826 return 0 

1827 

1828 def get_bucket_region(self, bucket, response): 

1829 """ 

1830 There are multiple potential sources for the new region to redirect to, 

1831 but they aren't all universally available for use. This will try to 

1832 find region from response elements, but will fall back to calling 

1833 HEAD on the bucket if all else fails. 

1834 

1835 :param bucket: The bucket to find the region for. This is necessary if 

1836 the region is not available in the error response. 

1837 :param response: A response representing a service request that failed 

1838 due to incorrect region configuration. 

1839 """ 

1840 # First try to source the region from the headers. 

1841 service_response = response[1] 

1842 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

1843 if 'x-amz-bucket-region' in response_headers: 

1844 return response_headers['x-amz-bucket-region'] 

1845 

1846 # Next, check the error body 

1847 region = service_response.get('Error', {}).get('Region', None) 

1848 if region is not None: 

1849 return region 

1850 

1851 # Finally, HEAD the bucket. No other choice sadly. 

1852 try: 

1853 response = self._client.head_bucket(Bucket=bucket) 

1854 headers = response['ResponseMetadata']['HTTPHeaders'] 

1855 except ClientError as e: 

1856 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

1857 

1858 region = headers.get('x-amz-bucket-region', None) 

1859 return region 

1860 

1861 def set_request_url(self, params, context, **kwargs): 

1862 endpoint = context.get('signing', {}).get('endpoint', None) 

1863 if endpoint is not None: 

1864 params['url'] = _get_new_endpoint(params['url'], endpoint, False) 

1865 

1866 def redirect_from_cache(self, params, context, **kwargs): 

1867 """ 

1868 This handler retrieves a given bucket's signing context from the cache 

1869 and adds it into the request context. 

1870 """ 

1871 if self._is_s3_accesspoint(context): 

1872 return 

1873 bucket = params.get('Bucket') 

1874 signing_context = self._cache.get(bucket) 

1875 if signing_context is not None: 

1876 context['signing'] = signing_context 

1877 else: 

1878 context['signing'] = {'bucket': bucket} 

1879 

1880 def _is_s3_accesspoint(self, context): 

1881 return 's3_accesspoint' in context 

1882 

1883 

1884class InvalidArnException(ValueError): 

1885 pass 

1886 

1887 

1888class ArnParser: 

1889 def parse_arn(self, arn): 

1890 arn_parts = arn.split(':', 5) 

1891 if len(arn_parts) < 6: 

1892 raise InvalidArnException( 

1893 'Provided ARN: %s must be of the format: ' 

1894 'arn:partition:service:region:account:resource' % arn 

1895 ) 

1896 return { 

1897 'partition': arn_parts[1], 

1898 'service': arn_parts[2], 

1899 'region': arn_parts[3], 

1900 'account': arn_parts[4], 

1901 'resource': arn_parts[5], 

1902 } 

1903 

1904 @staticmethod 

1905 def is_arn(value): 

1906 if not isinstance(value, str) or not value.startswith('arn:'): 

1907 return False 

1908 arn_parser = ArnParser() 

1909 try: 

1910 arn_parser.parse_arn(value) 

1911 return True 

1912 except InvalidArnException: 

1913 return False 

1914 
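As a quick illustration of the parser above, splitting on at most five ':' separators yields the six standard ARN fields; the ARN below is a made-up example:

    # Example usage of ArnParser; values are illustrative only.
    parser = ArnParser()
    details = parser.parse_arn(
        'arn:aws:s3:us-west-2:123456789012:accesspoint/myendpoint'
    )
    assert details['service'] == 's3'
    assert details['resource'] == 'accesspoint/myendpoint'
    # is_arn() returns False for plain bucket names or malformed strings.
    assert ArnParser.is_arn('my-regular-bucket') is False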

1915 

1916class S3ArnParamHandler: 

1917 _RESOURCE_REGEX = re.compile( 

1918 r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$' 

1919 ) 

1920 _OUTPOST_RESOURCE_REGEX = re.compile( 

1921 r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]' 

1922 r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)' 

1923 ) 

1924 _BLACKLISTED_OPERATIONS = ['CreateBucket'] 

1925 

1926 def __init__(self, arn_parser=None): 

1927 self._arn_parser = arn_parser 

1928 if arn_parser is None: 

1929 self._arn_parser = ArnParser() 

1930 

1931 def register(self, event_emitter): 

1932 event_emitter.register('before-parameter-build.s3', self.handle_arn) 

1933 

1934 def handle_arn(self, params, model, context, **kwargs): 

1935 if model.name in self._BLACKLISTED_OPERATIONS: 

1936 return 

1937 arn_details = self._get_arn_details_from_bucket_param(params) 

1938 if arn_details is None: 

1939 return 

1940 if arn_details['resource_type'] == 'accesspoint': 

1941 self._store_accesspoint(params, context, arn_details) 

1942 elif arn_details['resource_type'] == 'outpost': 

1943 self._store_outpost(params, context, arn_details) 

1944 

1945 def _get_arn_details_from_bucket_param(self, params): 

1946 if 'Bucket' in params: 

1947 try: 

1948 arn = params['Bucket'] 

1949 arn_details = self._arn_parser.parse_arn(arn) 

1950 self._add_resource_type_and_name(arn, arn_details) 

1951 return arn_details 

1952 except InvalidArnException: 

1953 pass 

1954 return None 

1955 

1956 def _add_resource_type_and_name(self, arn, arn_details): 

1957 match = self._RESOURCE_REGEX.match(arn_details['resource']) 

1958 if match: 

1959 arn_details['resource_type'] = match.group('resource_type') 

1960 arn_details['resource_name'] = match.group('resource_name') 

1961 else: 

1962 raise UnsupportedS3ArnError(arn=arn) 

1963 

1964 def _store_accesspoint(self, params, context, arn_details): 

1965 # Ideally the access-point would be stored as a parameter in the 

1966 # request where the serializer would then know how to serialize it, 

1967 # but access-points are not modeled in S3 operations so it would fail 

1968 # validation. Instead, we set the bucket parameter to the access-point 

1969 # name so the request serializes with some value, and store the 

1970 # additional information from the arn on the context for use when 

1971 # forming the access-point endpoint. 

1972 params['Bucket'] = arn_details['resource_name'] 

1973 context['s3_accesspoint'] = { 

1974 'name': arn_details['resource_name'], 

1975 'account': arn_details['account'], 

1976 'partition': arn_details['partition'], 

1977 'region': arn_details['region'], 

1978 'service': arn_details['service'], 

1979 } 

1980 

1981 def _store_outpost(self, params, context, arn_details): 

1982 resource_name = arn_details['resource_name'] 

1983 match = self._OUTPOST_RESOURCE_REGEX.match(resource_name) 

1984 if not match: 

1985 raise UnsupportedOutpostResourceError(resource_name=resource_name) 

1986 # Because we need to set the bucket name to something to pass 

1987 # validation we're going to use the access point name to be consistent 

1988 # with normal access point arns. 

1989 accesspoint_name = match.group('accesspoint_name') 

1990 params['Bucket'] = accesspoint_name 

1991 context['s3_accesspoint'] = { 

1992 'outpost_name': match.group('outpost_name'), 

1993 'name': accesspoint_name, 

1994 'account': arn_details['account'], 

1995 'partition': arn_details['partition'], 

1996 'region': arn_details['region'], 

1997 'service': arn_details['service'], 

1998 } 

1999 

2000 
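A hedged sketch of what handle_arn leaves behind for an access-point ARN; the ARN and names are invented, and FakeModel stands in for the operation model the event system normally supplies:

    class FakeModel:
        name = 'GetObject'  # not in _BLACKLISTED_OPERATIONS

    params = {
        'Bucket': 'arn:aws:s3:us-west-2:123456789012:accesspoint/my-ap',
        'Key': 'photo.jpg',
    }
    context = {}
    S3ArnParamHandler().handle_arn(params, FakeModel(), context)
    # The bucket parameter now holds the access-point name, and the parsed
    # ARN details are stored on the context for endpoint construction.
    assert params['Bucket'] == 'my-ap'
    assert context['s3_accesspoint']['region'] == 'us-west-2'
    assert context['s3_accesspoint']['account'] == '123456789012'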

2001class S3EndpointSetter: 

2002 _DEFAULT_PARTITION = 'aws' 

2003 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2004 

2005 def __init__( 

2006 self, 

2007 endpoint_resolver, 

2008 region=None, 

2009 s3_config=None, 

2010 endpoint_url=None, 

2011 partition=None, 

2012 use_fips_endpoint=False, 

2013 ): 

2014 # This is calling the endpoint_resolver in regions.py 

2015 self._endpoint_resolver = endpoint_resolver 

2016 self._region = region 

2017 self._s3_config = s3_config 

2018 self._use_fips_endpoint = use_fips_endpoint 

2019 if s3_config is None: 

2020 self._s3_config = {} 

2021 self._endpoint_url = endpoint_url 

2022 self._partition = partition 

2023 if partition is None: 

2024 self._partition = self._DEFAULT_PARTITION 

2025 

2026 def register(self, event_emitter): 

2027 event_emitter.register('before-sign.s3', self.set_endpoint) 

2028 event_emitter.register('choose-signer.s3', self.set_signer) 

2029 event_emitter.register( 

2030 'before-call.s3.WriteGetObjectResponse', 

2031 self.update_endpoint_to_s3_object_lambda, 

2032 ) 

2033 

2034 def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs): 

2035 if self._use_accelerate_endpoint: 

2036 raise UnsupportedS3ConfigurationError( 

2037 msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations', 

2038 ) 

2039 

2040 self._override_signing_name(context, 's3-object-lambda') 

2041 if self._endpoint_url: 

2042 # Only update the url if an explicit url was not provided 

2043 return 

2044 

2045 resolver = self._endpoint_resolver 

2046 # Construct the endpoint using the s3-object-lambda service for the client region 

2047 resolved = resolver.construct_endpoint( 

2048 's3-object-lambda', self._region 

2049 ) 

2050 

2051 # Ideally we would be able to replace the endpoint before 

2052 # serialization but there's no event to do that currently 

2053 # host_prefix is all the arn/bucket specs 

2054 new_endpoint = 'https://{host_prefix}{hostname}'.format( 

2055 host_prefix=params['host_prefix'], 

2056 hostname=resolved['hostname'], 

2057 ) 

2058 

2059 params['url'] = _get_new_endpoint(params['url'], new_endpoint, False) 

2060 

2061 def set_endpoint(self, request, **kwargs): 

2062 if self._use_accesspoint_endpoint(request): 

2063 self._validate_accesspoint_supported(request) 

2064 self._validate_fips_supported(request) 

2065 self._validate_global_regions(request) 

2066 region_name = self._resolve_region_for_accesspoint_endpoint( 

2067 request 

2068 ) 

2069 self._resolve_signing_name_for_accesspoint_endpoint(request) 

2070 self._switch_to_accesspoint_endpoint(request, region_name) 

2071 return 

2072 if self._use_accelerate_endpoint: 

2073 if self._use_fips_endpoint: 

2074 raise UnsupportedS3ConfigurationError( 

2075 msg=( 

2076 'Client is configured to use the FIPS pseudo region ' 

2077 'for "%s", but S3 Accelerate does not have any FIPS ' 

2078 'compatible endpoints.' % (self._region) 

2079 ) 

2080 ) 

2081 switch_host_s3_accelerate(request=request, **kwargs) 

2082 if self._s3_addressing_handler: 

2083 self._s3_addressing_handler(request=request, **kwargs) 

2084 

2085 def _use_accesspoint_endpoint(self, request): 

2086 return 's3_accesspoint' in request.context 

2087 

2088 def _validate_fips_supported(self, request): 

2089 if not self._use_fips_endpoint: 

2090 return 

2091 if 'fips' in request.context['s3_accesspoint']['region']: 

2092 raise UnsupportedS3AccesspointConfigurationError( 

2093 msg='Invalid ARN, FIPS region not allowed in ARN.' 

2094 ) 

2095 if 'outpost_name' in request.context['s3_accesspoint']: 

2096 raise UnsupportedS3AccesspointConfigurationError( 

2097 msg=( 

2098 'Client is configured to use the FIPS pseudo-region "%s", ' 

2099 'but outpost ARNs do not support FIPS endpoints.' 

2100 % (self._region) 

2101 ) 

2102 ) 

2103 # Transforming pseudo region to actual region 

2104 accesspoint_region = request.context['s3_accesspoint']['region'] 

2105 if accesspoint_region != self._region: 

2106 if not self._s3_config.get('use_arn_region', True): 

2107 # TODO: Update message to reflect use_arn_region 

2108 # is not set 

2109 raise UnsupportedS3AccesspointConfigurationError( 

2110 msg=( 

2111 'Client is configured to use the FIPS pseudo-region ' 

2112 'for "%s", but the access-point ARN provided is for ' 

2113 'the "%s" region. For clients using a FIPS ' 

2114 'pseudo-region, calls to access-point ARNs in another ' 

2115 'region are not allowed.' 

2116 % (self._region, accesspoint_region) 

2117 ) 

2118 ) 

2119 

2120 def _validate_global_regions(self, request): 

2121 if self._s3_config.get('use_arn_region', True): 

2122 return 

2123 if self._region in ['aws-global', 's3-external-1']: 

2124 raise UnsupportedS3AccesspointConfigurationError( 

2125 msg=( 

2126 'Client is configured to use the global pseudo-region ' 

2127 '"%s". When providing access-point ARNs a regional ' 

2128 'endpoint must be specified.' % self._region 

2129 ) 

2130 ) 

2131 

2132 def _validate_accesspoint_supported(self, request): 

2133 if self._use_accelerate_endpoint: 

2134 raise UnsupportedS3AccesspointConfigurationError( 

2135 msg=( 

2136 'Client does not support s3 accelerate configuration ' 

2137 'when an access-point ARN is specified.' 

2138 ) 

2139 ) 

2140 request_partition = request.context['s3_accesspoint']['partition'] 

2141 if request_partition != self._partition: 

2142 raise UnsupportedS3AccesspointConfigurationError( 

2143 msg=( 

2144 'Client is configured for "%s" partition, but access-point' 

2145 ' ARN provided is for "%s" partition. The client and ' 

2146 'access-point partition must be the same.' 

2147 % (self._partition, request_partition) 

2148 ) 

2149 ) 

2150 s3_service = request.context['s3_accesspoint'].get('service') 

2151 if s3_service == 's3-object-lambda' and self._s3_config.get( 

2152 'use_dualstack_endpoint' 

2153 ): 

2154 raise UnsupportedS3AccesspointConfigurationError( 

2155 msg=( 

2156 'Client does not support s3 dualstack configuration ' 

2157 'when an S3 Object Lambda access point ARN is specified.' 

2158 ) 

2159 ) 

2160 outpost_name = request.context['s3_accesspoint'].get('outpost_name') 

2161 if outpost_name and self._s3_config.get('use_dualstack_endpoint'): 

2162 raise UnsupportedS3AccesspointConfigurationError( 

2163 msg=( 

2164 'Client does not support s3 dualstack configuration ' 

2165 'when an outpost ARN is specified.' 

2166 ) 

2167 ) 

2168 self._validate_mrap_s3_config(request) 

2169 

2170 def _validate_mrap_s3_config(self, request): 

2171 if not is_global_accesspoint(request.context): 

2172 return 

2173 if self._s3_config.get('s3_disable_multiregion_access_points'): 

2174 raise UnsupportedS3AccesspointConfigurationError( 

2175 msg=( 

2176 'Invalid configuration, Multi-Region Access Point ' 

2177 'ARNs are disabled.' 

2178 ) 

2179 ) 

2180 elif self._s3_config.get('use_dualstack_endpoint'): 

2181 raise UnsupportedS3AccesspointConfigurationError( 

2182 msg=( 

2183 'Client does not support s3 dualstack configuration ' 

2184 'when a Multi-Region Access Point ARN is specified.' 

2185 ) 

2186 ) 

2187 

2188 def _resolve_region_for_accesspoint_endpoint(self, request): 

2189 if is_global_accesspoint(request.context): 

2190 # Requests going to MRAP endpoints MUST be set to any (*) region. 

2191 self._override_signing_region(request, '*') 

2192 elif self._s3_config.get('use_arn_region', True): 

2193 accesspoint_region = request.context['s3_accesspoint']['region'] 

2194 # If we are using the region from the access point, 

2195 # we will also want to make sure that we set it as the 

2196 # signing region as well 

2197 self._override_signing_region(request, accesspoint_region) 

2198 return accesspoint_region 

2199 return self._region 

2200 

2201 def set_signer(self, context, **kwargs): 

2202 if is_global_accesspoint(context): 

2203 if HAS_CRT: 

2204 return 's3v4a' 

2205 else: 

2206 raise MissingDependencyException( 

2207 msg="Using S3 with an MRAP arn requires an additional " 

2208 "dependency. You will need to pip install " 

2209 "botocore[crt] before proceeding." 

2210 ) 

2211 

2212 def _resolve_signing_name_for_accesspoint_endpoint(self, request): 

2213 accesspoint_service = request.context['s3_accesspoint']['service'] 

2214 self._override_signing_name(request.context, accesspoint_service) 

2215 

2216 def _switch_to_accesspoint_endpoint(self, request, region_name): 

2217 original_components = urlsplit(request.url) 

2218 accesspoint_endpoint = urlunsplit( 

2219 ( 

2220 original_components.scheme, 

2221 self._get_netloc(request.context, region_name), 

2222 self._get_accesspoint_path( 

2223 original_components.path, request.context 

2224 ), 

2225 original_components.query, 

2226 '', 

2227 ) 

2228 ) 

2229 logger.debug( 

2230 f'Updating URI from {request.url} to {accesspoint_endpoint}' 

2231 ) 

2232 request.url = accesspoint_endpoint 

2233 

2234 def _get_netloc(self, request_context, region_name): 

2235 if is_global_accesspoint(request_context): 

2236 return self._get_mrap_netloc(request_context) 

2237 else: 

2238 return self._get_accesspoint_netloc(request_context, region_name) 

2239 

2240 def _get_mrap_netloc(self, request_context): 

2241 s3_accesspoint = request_context['s3_accesspoint'] 

2242 region_name = 's3-global' 

2243 mrap_netloc_components = [s3_accesspoint['name']] 

2244 if self._endpoint_url: 

2245 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2246 mrap_netloc_components.append(endpoint_url_netloc) 

2247 else: 

2248 partition = s3_accesspoint['partition'] 

2249 mrap_netloc_components.extend( 

2250 [ 

2251 'accesspoint', 

2252 region_name, 

2253 self._get_partition_dns_suffix(partition), 

2254 ] 

2255 ) 

2256 return '.'.join(mrap_netloc_components) 

2257 

2258 def _get_accesspoint_netloc(self, request_context, region_name): 

2259 s3_accesspoint = request_context['s3_accesspoint'] 

2260 accesspoint_netloc_components = [ 

2261 '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']), 

2262 ] 

2263 outpost_name = s3_accesspoint.get('outpost_name') 

2264 if self._endpoint_url: 

2265 if outpost_name: 

2266 accesspoint_netloc_components.append(outpost_name) 

2267 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2268 accesspoint_netloc_components.append(endpoint_url_netloc) 

2269 else: 

2270 if outpost_name: 

2271 outpost_host = [outpost_name, 's3-outposts'] 

2272 accesspoint_netloc_components.extend(outpost_host) 

2273 elif s3_accesspoint['service'] == 's3-object-lambda': 

2274 component = self._inject_fips_if_needed( 

2275 's3-object-lambda', request_context 

2276 ) 

2277 accesspoint_netloc_components.append(component) 

2278 else: 

2279 component = self._inject_fips_if_needed( 

2280 's3-accesspoint', request_context 

2281 ) 

2282 accesspoint_netloc_components.append(component) 

2283 if self._s3_config.get('use_dualstack_endpoint'): 

2284 accesspoint_netloc_components.append('dualstack') 

2285 accesspoint_netloc_components.extend( 

2286 [region_name, self._get_dns_suffix(region_name)] 

2287 ) 

2288 return '.'.join(accesspoint_netloc_components) 

2289 

2290 def _inject_fips_if_needed(self, component, request_context): 

2291 if self._use_fips_endpoint: 

2292 return '%s-fips' % component 

2293 return component 

2294 

2295 def _get_accesspoint_path(self, original_path, request_context): 

2296 # The Bucket parameter was substituted with the access-point name as 

2297 # some value was required in serializing the bucket name. Now that 

2298 # we are making the request directly to the access point, we will 

2299 # want to remove that access-point name from the path. 

2300 name = request_context['s3_accesspoint']['name'] 

2301 # All S3 operations require at least a / in their path. 

2302 return original_path.replace('/' + name, '', 1) or '/' 

2303 
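For instance, under the substitution described in the comment above, the path rewrite behaves roughly like this (names are illustrative):

    # Drop the first occurrence of the substituted access-point name and
    # keep at least '/'.
    original_path = '/my-ap/photos/cat.jpg'
    name = 'my-ap'
    rewritten = original_path.replace('/' + name, '', 1) or '/'
    assert rewritten == '/photos/cat.jpg'
    # A bare access-point request ('/my-ap') collapses to just '/'.
    assert ('/my-ap'.replace('/my-ap', '', 1) or '/') == '/'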

2304 def _get_partition_dns_suffix(self, partition_name): 

2305 dns_suffix = self._endpoint_resolver.get_partition_dns_suffix( 

2306 partition_name 

2307 ) 

2308 if dns_suffix is None: 

2309 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2310 return dns_suffix 

2311 

2312 def _get_dns_suffix(self, region_name): 

2313 resolved = self._endpoint_resolver.construct_endpoint( 

2314 's3', region_name 

2315 ) 

2316 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2317 if resolved and 'dnsSuffix' in resolved: 

2318 dns_suffix = resolved['dnsSuffix'] 

2319 return dns_suffix 

2320 

2321 def _override_signing_region(self, request, region_name): 

2322 signing_context = request.context.get('signing', {}) 

2323 # S3SigV4Auth will use the context['signing']['region'] value to 

2324 # sign with if present. This is used by the Bucket redirector 

2325 # as well but we should be fine because the redirector is never 

2326 # used in combination with the accesspoint setting logic. 

2327 signing_context['region'] = region_name 

2328 request.context['signing'] = signing_context 

2329 

2330 def _override_signing_name(self, context, signing_name): 

2331 signing_context = context.get('signing', {}) 

2332 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2333 # sign with if present. This is used by the Bucket redirector 

2334 # as well but we should be fine because the redirector is never 

2335 # used in combination with the accesspoint setting logic. 

2336 signing_context['signing_name'] = signing_name 

2337 context['signing'] = signing_context 

2338 

2339 @CachedProperty 

2340 def _use_accelerate_endpoint(self): 

2341 # Enable accelerate if the configuration is set to true or the 

2342 # endpoint being used matches one of the accelerate endpoints. 

2343 

2344 # Accelerate has been explicitly configured. 

2345 if self._s3_config.get('use_accelerate_endpoint'): 

2346 return True 

2347 

2348 # Accelerate mode is turned on automatically if an endpoint url is 

2349 # provided that matches the accelerate scheme. 

2350 if self._endpoint_url is None: 

2351 return False 

2352 

2353 # Accelerate is only valid for Amazon endpoints. 

2354 netloc = urlsplit(self._endpoint_url).netloc 

2355 if not netloc.endswith('amazonaws.com'): 

2356 return False 

2357 

2358 # The first part of the url should always be s3-accelerate. 

2359 parts = netloc.split('.') 

2360 if parts[0] != 's3-accelerate': 

2361 return False 

2362 

2363 # Url parts between 's3-accelerate' and 'amazonaws.com' which 

2364 # represent different url features. 

2365 feature_parts = parts[1:-2] 

2366 

2367 # There should be no duplicate url parts. 

2368 if len(feature_parts) != len(set(feature_parts)): 

2369 return False 

2370 

2371 # Remaining parts must all be in the whitelist. 

2372 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

2373 

2374 @CachedProperty 

2375 def _addressing_style(self): 

2376 # Use virtual host style addressing if accelerate is enabled or if 

2377 # the given endpoint url is an accelerate endpoint. 

2378 if self._use_accelerate_endpoint: 

2379 return 'virtual' 

2380 

2381 # If a particular addressing style is configured, use it. 

2382 configured_addressing_style = self._s3_config.get('addressing_style') 

2383 if configured_addressing_style: 

2384 return configured_addressing_style 

2385 

2386 @CachedProperty 

2387 def _s3_addressing_handler(self): 

2388 # If virtual host style was configured, use it regardless of whether 

2389 # or not the bucket looks dns compatible. 

2390 if self._addressing_style == 'virtual': 

2391 logger.debug("Using S3 virtual host style addressing.") 

2392 return switch_to_virtual_host_style 

2393 

2394 # If path style is configured, no additional steps are needed. If 

2395 # endpoint_url was specified, don't default to virtual. We could 

2396 # potentially default provided endpoint urls to virtual hosted 

2397 # style, but for now it is avoided. 

2398 if self._addressing_style == 'path' or self._endpoint_url is not None: 

2399 logger.debug("Using S3 path style addressing.") 

2400 return None 

2401 

2402 logger.debug( 

2403 "Defaulting to S3 virtual host style addressing with " 

2404 "path style addressing fallback." 

2405 ) 

2406 

2407 # By default, try to use virtual style with path fallback. 

2408 return fix_s3_host 

2409 

2410 

2411class S3ControlEndpointSetter: 

2412 _DEFAULT_PARTITION = 'aws' 

2413 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2414 _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$') 

2415 

2416 def __init__( 

2417 self, 

2418 endpoint_resolver, 

2419 region=None, 

2420 s3_config=None, 

2421 endpoint_url=None, 

2422 partition=None, 

2423 use_fips_endpoint=False, 

2424 ): 

2425 self._endpoint_resolver = endpoint_resolver 

2426 self._region = region 

2427 self._s3_config = s3_config 

2428 self._use_fips_endpoint = use_fips_endpoint 

2429 if s3_config is None: 

2430 self._s3_config = {} 

2431 self._endpoint_url = endpoint_url 

2432 self._partition = partition 

2433 if partition is None: 

2434 self._partition = self._DEFAULT_PARTITION 

2435 

2436 def register(self, event_emitter): 

2437 event_emitter.register('before-sign.s3-control', self.set_endpoint) 

2438 

2439 def set_endpoint(self, request, **kwargs): 

2440 if self._use_endpoint_from_arn_details(request): 

2441 self._validate_endpoint_from_arn_details_supported(request) 

2442 region_name = self._resolve_region_from_arn_details(request) 

2443 self._resolve_signing_name_from_arn_details(request) 

2444 self._resolve_endpoint_from_arn_details(request, region_name) 

2445 self._add_headers_from_arn_details(request) 

2446 elif self._use_endpoint_from_outpost_id(request): 

2447 self._validate_outpost_redirection_valid(request) 

2448 self._override_signing_name(request, 's3-outposts') 

2449 new_netloc = self._construct_outpost_endpoint(self._region) 

2450 self._update_request_netloc(request, new_netloc) 

2451 

2452 def _use_endpoint_from_arn_details(self, request): 

2453 return 'arn_details' in request.context 

2454 

2455 def _use_endpoint_from_outpost_id(self, request): 

2456 return 'outpost_id' in request.context 

2457 

2458 def _validate_endpoint_from_arn_details_supported(self, request): 

2459 if 'fips' in request.context['arn_details']['region']: 

2460 raise UnsupportedS3ControlArnError( 

2461 arn=request.context['arn_details']['original'], 

2462 msg='Invalid ARN, FIPS region not allowed in ARN.', 

2463 ) 

2464 if not self._s3_config.get('use_arn_region', False): 

2465 arn_region = request.context['arn_details']['region'] 

2466 if arn_region != self._region: 

2467 error_msg = ( 

2468 'The use_arn_region configuration is disabled but ' 

2469 'received arn for "%s" when the client is configured ' 

2470 'to use "%s"' 

2471 ) % (arn_region, self._region) 

2472 raise UnsupportedS3ControlConfigurationError(msg=error_msg) 

2473 request_partition = request.context['arn_details']['partition'] 

2474 if request_partition != self._partition: 

2475 raise UnsupportedS3ControlConfigurationError( 

2476 msg=( 

2477 'Client is configured for "%s" partition, but arn ' 

2478 'provided is for "%s" partition. The client and ' 

2479 'arn partition must be the same.' 

2480 % (self._partition, request_partition) 

2481 ) 

2482 ) 

2483 if self._s3_config.get('use_accelerate_endpoint'): 

2484 raise UnsupportedS3ControlConfigurationError( 

2485 msg='S3 control client does not support accelerate endpoints', 

2486 ) 

2487 if 'outpost_name' in request.context['arn_details']: 

2488 self._validate_outpost_redirection_valid(request) 

2489 

2490 def _validate_outpost_redirection_valid(self, request): 

2491 if self._s3_config.get('use_dualstack_endpoint'): 

2492 raise UnsupportedS3ControlConfigurationError( 

2493 msg=( 

2494 'Client does not support s3 dualstack configuration ' 

2495 'when an outpost is specified.' 

2496 ) 

2497 ) 

2498 

2499 def _resolve_region_from_arn_details(self, request): 

2500 if self._s3_config.get('use_arn_region', False): 

2501 arn_region = request.context['arn_details']['region'] 

2502 # If we are using the region from the expanded arn, we will also 

2503 # want to make sure that we set it as the signing region as well 

2504 self._override_signing_region(request, arn_region) 

2505 return arn_region 

2506 return self._region 

2507 

2508 def _resolve_signing_name_from_arn_details(self, request): 

2509 arn_service = request.context['arn_details']['service'] 

2510 self._override_signing_name(request, arn_service) 

2511 return arn_service 

2512 

2513 def _resolve_endpoint_from_arn_details(self, request, region_name): 

2514 new_netloc = self._resolve_netloc_from_arn_details( 

2515 request, region_name 

2516 ) 

2517 self._update_request_netloc(request, new_netloc) 

2518 

2519 def _update_request_netloc(self, request, new_netloc): 

2520 original_components = urlsplit(request.url) 

2521 arn_details_endpoint = urlunsplit( 

2522 ( 

2523 original_components.scheme, 

2524 new_netloc, 

2525 original_components.path, 

2526 original_components.query, 

2527 '', 

2528 ) 

2529 ) 

2530 logger.debug( 

2531 f'Updating URI from {request.url} to {arn_details_endpoint}' 

2532 ) 

2533 request.url = arn_details_endpoint 

2534 

2535 def _resolve_netloc_from_arn_details(self, request, region_name): 

2536 arn_details = request.context['arn_details'] 

2537 if 'outpost_name' in arn_details: 

2538 return self._construct_outpost_endpoint(region_name) 

2539 account = arn_details['account'] 

2540 return self._construct_s3_control_endpoint(region_name, account) 

2541 

2542 def _is_valid_host_label(self, label): 

2543 return self._HOST_LABEL_REGEX.match(label) 

2544 

2545 def _validate_host_labels(self, *labels): 

2546 for label in labels: 

2547 if not self._is_valid_host_label(label): 

2548 raise InvalidHostLabelError(label=label) 

2549 

2550 def _construct_s3_control_endpoint(self, region_name, account): 

2551 self._validate_host_labels(region_name, account) 

2552 if self._endpoint_url: 

2553 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2554 netloc = [account, endpoint_url_netloc] 

2555 else: 

2556 netloc = [ 

2557 account, 

2558 's3-control', 

2559 ] 

2560 self._add_dualstack(netloc) 

2561 dns_suffix = self._get_dns_suffix(region_name) 

2562 netloc.extend([region_name, dns_suffix]) 

2563 return self._construct_netloc(netloc) 

2564 

2565 def _construct_outpost_endpoint(self, region_name): 

2566 self._validate_host_labels(region_name) 

2567 if self._endpoint_url: 

2568 return urlsplit(self._endpoint_url).netloc 

2569 else: 

2570 netloc = [ 

2571 's3-outposts', 

2572 region_name, 

2573 self._get_dns_suffix(region_name), 

2574 ] 

2575 self._add_fips(netloc) 

2576 return self._construct_netloc(netloc) 

2577 

2578 def _construct_netloc(self, netloc): 

2579 return '.'.join(netloc) 

2580 

2581 def _add_fips(self, netloc): 

2582 if self._use_fips_endpoint: 

2583 netloc[0] = netloc[0] + '-fips' 

2584 

2585 def _add_dualstack(self, netloc): 

2586 if self._s3_config.get('use_dualstack_endpoint'): 

2587 netloc.append('dualstack') 

2588 

2589 def _get_dns_suffix(self, region_name): 

2590 resolved = self._endpoint_resolver.construct_endpoint( 

2591 's3', region_name 

2592 ) 

2593 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2594 if resolved and 'dnsSuffix' in resolved: 

2595 dns_suffix = resolved['dnsSuffix'] 

2596 return dns_suffix 

2597 
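Putting the helpers above together, a hedged sketch of the hostnames they assemble under default settings (the account and region values are placeholders, and the default DNS suffix is assumed):

    account, region, suffix = '123456789012', 'us-west-2', 'amazonaws.com'
    # _construct_s3_control_endpoint without dualstack:
    s3_control = '.'.join([account, 's3-control', region, suffix])
    # -> '123456789012.s3-control.us-west-2.amazonaws.com'
    # _construct_outpost_endpoint with use_fips_endpoint set, where _add_fips
    # suffixes the first label:
    outposts_fips = '.'.join(['s3-outposts-fips', region, suffix])
    # -> 's3-outposts-fips.us-west-2.amazonaws.com'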

2598 def _override_signing_region(self, request, region_name): 

2599 signing_context = request.context.get('signing', {}) 

2600 # S3SigV4Auth will use the context['signing']['region'] value to 

2601 # sign with if present. This is used by the Bucket redirector 

2602 # as well but we should be fine because the redirector is never 

2603 # used in combination with the accesspoint setting logic. 

2604 signing_context['region'] = region_name 

2605 request.context['signing'] = signing_context 

2606 

2607 def _override_signing_name(self, request, signing_name): 

2608 signing_context = request.context.get('signing', {}) 

2609 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2610 # sign with if present. This is used by the Bucket redirector 

2611 # as well but we should be fine because the redirector is never 

2612 # used in combination with the accesspoint setting logic. 

2613 signing_context['signing_name'] = signing_name 

2614 request.context['signing'] = signing_context 

2615 

2616 def _add_headers_from_arn_details(self, request): 

2617 arn_details = request.context['arn_details'] 

2618 outpost_name = arn_details.get('outpost_name') 

2619 if outpost_name: 

2620 self._add_outpost_id_header(request, outpost_name) 

2621 

2622 def _add_outpost_id_header(self, request, outpost_name): 

2623 request.headers['x-amz-outpost-id'] = outpost_name 

2624 

2625 

2626class S3ControlArnParamHandler: 

2627 """This handler has been replaced by S3ControlArnParamHandlerv2. The 

2628 original version remains in place for any third-party importers. 

2629 """ 

2630 

2631 _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]') 

2632 

2633 def __init__(self, arn_parser=None): 

2634 self._arn_parser = arn_parser 

2635 if arn_parser is None: 

2636 self._arn_parser = ArnParser() 

2637 warnings.warn( 

2638 'The S3ControlArnParamHandler class has been deprecated for a new ' 

2639 'internal replacement. A future version of botocore may remove ' 

2640 'this class.', 

2641 category=FutureWarning, 

2642 ) 

2643 

2644 def register(self, event_emitter): 

2645 event_emitter.register( 

2646 'before-parameter-build.s3-control', 

2647 self.handle_arn, 

2648 ) 

2649 

2650 def handle_arn(self, params, model, context, **kwargs): 

2651 if model.name in ('CreateBucket', 'ListRegionalBuckets'): 

2652 # CreateBucket and ListRegionalBuckets are special cases that do 

2653 # not obey ARN-based redirection but will redirect based on the 

2654 # presence of the OutpostId parameter 

2655 self._handle_outpost_id_param(params, model, context) 

2656 else: 

2657 self._handle_name_param(params, model, context) 

2658 self._handle_bucket_param(params, model, context) 

2659 

2660 def _get_arn_details_from_param(self, params, param_name): 

2661 if param_name not in params: 

2662 return None 

2663 try: 

2664 arn = params[param_name] 

2665 arn_details = self._arn_parser.parse_arn(arn) 

2666 arn_details['original'] = arn 

2667 arn_details['resources'] = self._split_resource(arn_details) 

2668 return arn_details 

2669 except InvalidArnException: 

2670 return None 

2671 

2672 def _split_resource(self, arn_details): 

2673 return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource']) 

2674 

2675 def _override_account_id_param(self, params, arn_details): 

2676 account_id = arn_details['account'] 

2677 if 'AccountId' in params and params['AccountId'] != account_id: 

2678 error_msg = ( 

2679 'Account ID in arn does not match the AccountId parameter ' 

2680 'provided: "%s"' 

2681 ) % params['AccountId'] 

2682 raise UnsupportedS3ControlArnError( 

2683 arn=arn_details['original'], 

2684 msg=error_msg, 

2685 ) 

2686 params['AccountId'] = account_id 

2687 

2688 def _handle_outpost_id_param(self, params, model, context): 

2689 if 'OutpostId' not in params: 

2690 return 

2691 context['outpost_id'] = params['OutpostId'] 

2692 

2693 def _handle_name_param(self, params, model, context): 

2694 # CreateAccessPoint is a special case that does not expand Name 

2695 if model.name == 'CreateAccessPoint': 

2696 return 

2697 arn_details = self._get_arn_details_from_param(params, 'Name') 

2698 if arn_details is None: 

2699 return 

2700 if self._is_outpost_accesspoint(arn_details): 

2701 self._store_outpost_accesspoint(params, context, arn_details) 

2702 else: 

2703 error_msg = 'The Name parameter does not support the provided ARN' 

2704 raise UnsupportedS3ControlArnError( 

2705 arn=arn_details['original'], 

2706 msg=error_msg, 

2707 ) 

2708 

2709 def _is_outpost_accesspoint(self, arn_details): 

2710 if arn_details['service'] != 's3-outposts': 

2711 return False 

2712 resources = arn_details['resources'] 

2713 if len(resources) != 4: 

2714 return False 

2715 # Resource must be of the form outpost/op-123/accesspoint/name 

2716 return resources[0] == 'outpost' and resources[2] == 'accesspoint' 

2717 

2718 def _store_outpost_accesspoint(self, params, context, arn_details): 

2719 self._override_account_id_param(params, arn_details) 

2720 accesspoint_name = arn_details['resources'][3] 

2721 params['Name'] = accesspoint_name 

2722 arn_details['accesspoint_name'] = accesspoint_name 

2723 arn_details['outpost_name'] = arn_details['resources'][1] 

2724 context['arn_details'] = arn_details 

2725 

2726 def _handle_bucket_param(self, params, model, context): 

2727 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

2728 if arn_details is None: 

2729 return 

2730 if self._is_outpost_bucket(arn_details): 

2731 self._store_outpost_bucket(params, context, arn_details) 

2732 else: 

2733 error_msg = ( 

2734 'The Bucket parameter does not support the provided ARN' 

2735 ) 

2736 raise UnsupportedS3ControlArnError( 

2737 arn=arn_details['original'], 

2738 msg=error_msg, 

2739 ) 

2740 

2741 def _is_outpost_bucket(self, arn_details): 

2742 if arn_details['service'] != 's3-outposts': 

2743 return False 

2744 resources = arn_details['resources'] 

2745 if len(resources) != 4: 

2746 return False 

2747 # Resource must be of the form outpost/op-123/bucket/name 

2748 return resources[0] == 'outpost' and resources[2] == 'bucket' 

2749 

2750 def _store_outpost_bucket(self, params, context, arn_details): 

2751 self._override_account_id_param(params, arn_details) 

2752 bucket_name = arn_details['resources'][3] 

2753 params['Bucket'] = bucket_name 

2754 arn_details['bucket_name'] = bucket_name 

2755 arn_details['outpost_name'] = arn_details['resources'][1] 

2756 context['arn_details'] = arn_details 

2757 

2758 

2759class S3ControlArnParamHandlerv2(S3ControlArnParamHandler): 

2760 """Updated version of S3ControlArnParamHandler for use when 

2761 EndpointRulesetResolver is in use for endpoint resolution. 

2762 

2763 This class is considered private and subject to abrupt breaking changes or 

2764 removal without prior announcement. Please do not use it directly. 

2765 """ 

2766 

2767 def __init__(self, arn_parser=None): 

2768 self._arn_parser = arn_parser 

2769 if arn_parser is None: 

2770 self._arn_parser = ArnParser() 

2771 

2772 def register(self, event_emitter): 

2773 event_emitter.register( 

2774 'before-endpoint-resolution.s3-control', 

2775 self.handle_arn, 

2776 ) 

2777 

2778 def _handle_name_param(self, params, model, context): 

2779 # CreateAccessPoint is a special case that does not expand Name 

2780 if model.name == 'CreateAccessPoint': 

2781 return 

2782 arn_details = self._get_arn_details_from_param(params, 'Name') 

2783 if arn_details is None: 

2784 return 

2785 self._raise_for_fips_pseudo_region(arn_details) 

2786 self._raise_for_accelerate_endpoint(context) 

2787 if self._is_outpost_accesspoint(arn_details): 

2788 self._store_outpost_accesspoint(params, context, arn_details) 

2789 else: 

2790 error_msg = 'The Name parameter does not support the provided ARN' 

2791 raise UnsupportedS3ControlArnError( 

2792 arn=arn_details['original'], 

2793 msg=error_msg, 

2794 ) 

2795 

2796 def _store_outpost_accesspoint(self, params, context, arn_details): 

2797 self._override_account_id_param(params, arn_details) 

2798 

2799 def _handle_bucket_param(self, params, model, context): 

2800 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

2801 if arn_details is None: 

2802 return 

2803 self._raise_for_fips_pseudo_region(arn_details) 

2804 self._raise_for_accelerate_endpoint(context) 

2805 if self._is_outpost_bucket(arn_details): 

2806 self._store_outpost_bucket(params, context, arn_details) 

2807 else: 

2808 error_msg = ( 

2809 'The Bucket parameter does not support the provided ARN' 

2810 ) 

2811 raise UnsupportedS3ControlArnError( 

2812 arn=arn_details['original'], 

2813 msg=error_msg, 

2814 ) 

2815 

2816 def _store_outpost_bucket(self, params, context, arn_details): 

2817 self._override_account_id_param(params, arn_details) 

2818 

2819 def _raise_for_fips_pseudo_region(self, arn_details): 

2820 # FIPS pseudo region names cannot be used in ARNs 

2821 arn_region = arn_details['region'] 

2822 if arn_region.startswith('fips-') or arn_region.endswith('-fips'): 

2823 raise UnsupportedS3ControlArnError( 

2824 arn=arn_details['original'], 

2825 msg='Invalid ARN, FIPS region not allowed in ARN.', 

2826 ) 

2827 

2828 def _raise_for_accelerate_endpoint(self, context): 

2829 s3_config = context['client_config'].s3 or {} 

2830 if s3_config.get('use_accelerate_endpoint'): 

2831 raise UnsupportedS3ControlConfigurationError( 

2832 msg='S3 control client does not support accelerate endpoints', 

2833 ) 

2834 

2835 

2836class ContainerMetadataFetcher: 

2837 

2838 TIMEOUT_SECONDS = 2 

2839 RETRY_ATTEMPTS = 3 

2840 SLEEP_TIME = 1 

2841 IP_ADDRESS = '169.254.170.2' 

2842 _ALLOWED_HOSTS = [IP_ADDRESS, 'localhost', '127.0.0.1'] 

2843 

2844 def __init__(self, session=None, sleep=time.sleep): 

2845 if session is None: 

2846 session = botocore.httpsession.URLLib3Session( 

2847 timeout=self.TIMEOUT_SECONDS 

2848 ) 

2849 self._session = session 

2850 self._sleep = sleep 

2851 

2852 def retrieve_full_uri(self, full_url, headers=None): 

2853 """Retrieve JSON metadata from container metadata. 

2854 

2855 :type full_url: str 

2856 :param full_url: The full URL of the metadata service. 

2857 This should include the scheme as well, e.g 

2858 "http://localhost:123/foo" 

2859 

2860 """ 

2861 self._validate_allowed_url(full_url) 

2862 return self._retrieve_credentials(full_url, headers) 

2863 

2864 def _validate_allowed_url(self, full_url): 

2865 parsed = botocore.compat.urlparse(full_url) 

2866 is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname) 

2867 if not is_whitelisted_host: 

2868 raise ValueError( 

2869 "Unsupported host '%s'. Can only " 

2870 "retrieve metadata from these hosts: %s" 

2871 % (parsed.hostname, ', '.join(self._ALLOWED_HOSTS)) 

2872 ) 

2873 

2874 def _check_if_whitelisted_host(self, host): 

2875 if host in self._ALLOWED_HOSTS: 

2876 return True 

2877 return False 

2878 

2879 def retrieve_uri(self, relative_uri): 

2880 """Retrieve JSON metadata from ECS metadata. 

2881 

2882 :type relative_uri: str 

2883 :param relative_uri: A relative URI, e.g "/foo/bar?id=123" 

2884 

2885 :return: The parsed JSON response. 

2886 

2887 """ 

2888 full_url = self.full_url(relative_uri) 

2889 return self._retrieve_credentials(full_url) 

2890 

2891 def _retrieve_credentials(self, full_url, extra_headers=None): 

2892 headers = {'Accept': 'application/json'} 

2893 if extra_headers is not None: 

2894 headers.update(extra_headers) 

2895 attempts = 0 

2896 while True: 

2897 try: 

2898 return self._get_response( 

2899 full_url, headers, self.TIMEOUT_SECONDS 

2900 ) 

2901 except MetadataRetrievalError as e: 

2902 logger.debug( 

2903 "Received error when attempting to retrieve " 

2904 "container metadata: %s", 

2905 e, 

2906 exc_info=True, 

2907 ) 

2908 self._sleep(self.SLEEP_TIME) 

2909 attempts += 1 

2910 if attempts >= self.RETRY_ATTEMPTS: 

2911 raise 

2912 

2913 def _get_response(self, full_url, headers, timeout): 

2914 try: 

2915 AWSRequest = botocore.awsrequest.AWSRequest 

2916 request = AWSRequest(method='GET', url=full_url, headers=headers) 

2917 response = self._session.send(request.prepare()) 

2918 response_text = response.content.decode('utf-8') 

2919 if response.status_code != 200: 

2920 raise MetadataRetrievalError( 

2921 error_msg=( 

2922 "Received non 200 response (%s) from ECS metadata: %s" 

2923 ) 

2924 % (response.status_code, response_text) 

2925 ) 

2926 try: 

2927 return json.loads(response_text) 

2928 except ValueError: 

2929 error_msg = ( 

2930 "Unable to parse JSON returned from ECS metadata services" 

2931 ) 

2932 logger.debug('%s:%s', error_msg, response_text) 

2933 raise MetadataRetrievalError(error_msg=error_msg) 

2934 except RETRYABLE_HTTP_ERRORS as e: 

2935 error_msg = ( 

2936 "Received error when attempting to retrieve " 

2937 "ECS metadata: %s" % e 

2938 ) 

2939 raise MetadataRetrievalError(error_msg=error_msg) 

2940 

2941 def full_url(self, relative_uri): 

2942 return f'http://{self.IP_ADDRESS}{relative_uri}' 

2943 
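A minimal usage sketch of the fetcher above; the relative URI is illustrative and, in practice, typically comes from the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable:

    fetcher = ContainerMetadataFetcher()
    url = fetcher.full_url('/v2/credentials/uuid')
    # -> 'http://169.254.170.2/v2/credentials/uuid'
    # retrieve_uri('/v2/credentials/uuid') GETs that URL with an
    # 'Accept: application/json' header and retries up to RETRY_ATTEMPTS
    # times before letting MetadataRetrievalError propagate.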

2944 

2945def get_environ_proxies(url): 

2946 if should_bypass_proxies(url): 

2947 return {} 

2948 else: 

2949 return getproxies() 

2950 

2951 

2952def should_bypass_proxies(url): 

2953 """ 

2954 Returns whether we should bypass proxies or not. 

2955 """ 

2956 # NOTE: requests allows ip/cidr entries in the no_proxy env var that we 

2957 # don't currently support, as urllib only checks the DNS suffix 

2958 # If the system proxy settings indicate that this URL should be bypassed, 

2959 # don't proxy. 

2960 # The proxy_bypass function is incredibly buggy on OS X in early versions 

2961 # of Python 2.6, so allow this call to fail. Only catch the specific 

2962 # exceptions we've seen, though: this call failing in other ways can reveal 

2963 # legitimate problems. 

2964 try: 

2965 if proxy_bypass(urlparse(url).netloc): 

2966 return True 

2967 except (TypeError, socket.gaierror): 

2968 pass 

2969 

2970 return False 

2971 

2972 

2973def determine_content_length(body): 

2974 # No body, content length of 0 

2975 if not body: 

2976 return 0 

2977 

2978 # Try asking the body for its length 

2979 try: 

2980 return len(body) 

2981 except (AttributeError, TypeError): 

2982 pass 

2983 

2984 # Try getting the length from a seekable stream 

2985 if hasattr(body, 'seek') and hasattr(body, 'tell'): 

2986 try: 

2987 orig_pos = body.tell() 

2988 body.seek(0, 2) 

2989 end_file_pos = body.tell() 

2990 body.seek(orig_pos) 

2991 return end_file_pos - orig_pos 

2992 except io.UnsupportedOperation: 

2993 # Some bodies (for example, io.BufferedIOBase objects) have a 

2994 # "seek" method that raises "UnsupportedOperation"; in that 

2995 # case we want to fall back to "chunked" 

2996 # encoding 

2997 pass 

2998 # Failed to determine the length 

2999 return None 

3000 
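For example, with a seekable stream the remaining length is derived from tell()/seek() without consuming the body (a small sketch):

    import io

    body = io.BytesIO(b'hello world')
    body.seek(3)  # pretend part of the stream has already been consumed
    assert determine_content_length(body) == 8  # bytes left from position 3
    assert body.tell() == 3                     # position is restored
    assert determine_content_length(b'hello') == 5
    assert determine_content_length(None) == 0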

3001 

3002def get_encoding_from_headers(headers, default='ISO-8859-1'): 

3003 """Returns encodings from given HTTP Header Dict. 

3004 

3005 :param headers: dictionary to extract encoding from. 

3006 :param default: default encoding if the content-type is text 

3007 """ 

3008 

3009 content_type = headers.get('content-type') 

3010 

3011 if not content_type: 

3012 return None 

3013 

3014 message = email.message.Message() 

3015 message['content-type'] = content_type 

3016 charset = message.get_param("charset") 

3017 

3018 if charset is not None: 

3019 return charset 

3020 

3021 if 'text' in content_type: 

3022 return default 

3023 

3024 
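A quick sketch of the three outcomes of get_encoding_from_headers:

    assert get_encoding_from_headers(
        {'content-type': 'text/html; charset=utf-8'}) == 'utf-8'
    # text/* without a charset falls back to the default encoding.
    assert get_encoding_from_headers(
        {'content-type': 'text/plain'}) == 'ISO-8859-1'
    # non-text content without a charset yields None.
    assert get_encoding_from_headers(
        {'content-type': 'application/json'}) is None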

3025def calculate_md5(body, **kwargs): 

3026 if isinstance(body, (bytes, bytearray)): 

3027 binary_md5 = _calculate_md5_from_bytes(body) 

3028 else: 

3029 binary_md5 = _calculate_md5_from_file(body) 

3030 return base64.b64encode(binary_md5).decode('ascii') 

3031 

3032 

3033def _calculate_md5_from_bytes(body_bytes): 

3034 md5 = get_md5(body_bytes) 

3035 return md5.digest() 

3036 

3037 

3038def _calculate_md5_from_file(fileobj): 

3039 start_position = fileobj.tell() 

3040 md5 = get_md5() 

3041 for chunk in iter(lambda: fileobj.read(1024 * 1024), b''): 

3042 md5.update(chunk) 

3043 fileobj.seek(start_position) 

3044 return md5.digest() 

3045 
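The two private helpers feed calculate_md5, which returns the digest base64-encoded for use as a Content-MD5 header value. A small sketch, assuming MD5 is available on the platform:

    import base64
    import hashlib
    import io

    body = b'hello world'
    expected = base64.b64encode(hashlib.md5(body).digest()).decode('ascii')
    assert calculate_md5(body) == expected
    # File-like bodies are read in 1 MB chunks and the position is restored.
    assert calculate_md5(io.BytesIO(body)) == expected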

3046 

3047def conditionally_calculate_md5(params, **kwargs): 

3048 """Only add a Content-MD5 if the system supports it.""" 

3049 headers = params['headers'] 

3050 body = params['body'] 

3051 checksum_context = params.get('context', {}).get('checksum', {}) 

3052 checksum_algorithm = checksum_context.get('request_algorithm') 

3053 if checksum_algorithm and checksum_algorithm != 'conditional-md5': 

3054 # Skip for requests that will have a flexible checksum applied 

3055 return 

3056 # If a header matching the x-amz-checksum-* pattern is present, we 

3057 # assume a checksum has already been provided and an md5 is not needed 

3058 for header in headers: 

3059 if CHECKSUM_HEADER_PATTERN.match(header): 

3060 return 

3061 if MD5_AVAILABLE and body is not None and 'Content-MD5' not in headers: 

3062 md5_digest = calculate_md5(body, **kwargs) 

3063 params['headers']['Content-MD5'] = md5_digest 

3064 

3065 

3066class FileWebIdentityTokenLoader: 

3067 def __init__(self, web_identity_token_path, _open=open): 

3068 self._web_identity_token_path = web_identity_token_path 

3069 self._open = _open 

3070 

3071 def __call__(self): 

3072 with self._open(self._web_identity_token_path) as token_file: 

3073 return token_file.read() 

3074 

3075 

3076class SSOTokenLoader: 

3077 def __init__(self, cache=None): 

3078 if cache is None: 

3079 cache = {} 

3080 self._cache = cache 

3081 

3082 def _generate_cache_key(self, start_url, session_name): 

3083 input_str = start_url 

3084 if session_name is not None: 

3085 input_str = session_name 

3086 return hashlib.sha1(input_str.encode('utf-8')).hexdigest() 

3087 

3088 def save_token(self, start_url, token, session_name=None): 

3089 cache_key = self._generate_cache_key(start_url, session_name) 

3090 self._cache[cache_key] = token 

3091 

3092 def __call__(self, start_url, session_name=None): 

3093 cache_key = self._generate_cache_key(start_url, session_name) 

3094 logger.debug(f'Checking for cached token at: {cache_key}') 

3095 if cache_key not in self._cache: 

3096 name = start_url 

3097 if session_name is not None: 

3098 name = session_name 

3099 error_msg = f'Token for {name} does not exist' 

3100 raise SSOTokenLoadError(error_msg=error_msg) 

3101 

3102 token = self._cache[cache_key] 

3103 if 'accessToken' not in token or 'expiresAt' not in token: 

3104 error_msg = f'Token for {start_url} is invalid' 

3105 raise SSOTokenLoadError(error_msg=error_msg) 

3106 return token 

3107 

3108 
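
A short sketch of the SSOTokenLoader round trip; the start URL and token fields are illustrative values only. Note that a session_name, when given, replaces the start URL as the cache-key input.

    from botocore.utils import SSOTokenLoader

    loader = SSOTokenLoader()
    token = {'accessToken': 'example-token', 'expiresAt': '2030-01-01T00:00:00Z'}  # made-up token
    loader.save_token('https://example.awsapps.com/start', token)
    print(loader('https://example.awsapps.com/start'))   # returns the cached token
    # Loading an unknown start URL (or a token missing accessToken/expiresAt)
    # raises SSOTokenLoadError instead.
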

3109class EventbridgeSignerSetter: 

3110 _DEFAULT_PARTITION = 'aws' 

3111 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

3112 

3113 def __init__(self, endpoint_resolver, region=None, endpoint_url=None): 

3114 self._endpoint_resolver = endpoint_resolver 

3115 self._region = region 

3116 self._endpoint_url = endpoint_url 

3117 

3118 def register(self, event_emitter): 

3119 event_emitter.register( 

3120 'before-parameter-build.events.PutEvents', 

3121 self.check_for_global_endpoint, 

3122 ) 

3123 event_emitter.register( 

3124 'before-call.events.PutEvents', self.set_endpoint_url 

3125 ) 

3126 

3127 def set_endpoint_url(self, params, context, **kwargs): 

3128 if 'eventbridge_endpoint' in context: 

3129 endpoint = context['eventbridge_endpoint'] 

3130 logger.debug(f"Rewriting URL from {params['url']} to {endpoint}") 

3131 params['url'] = endpoint 

3132 

3133 def check_for_global_endpoint(self, params, context, **kwargs): 

3134 endpoint = params.get('EndpointId') 

3135 if endpoint is None: 

3136 return 

3137 

3138 if len(endpoint) == 0: 

3139 raise InvalidEndpointConfigurationError( 

3140 msg='EndpointId must not be a zero length string' 

3141 ) 

3142 

3143 if not HAS_CRT: 

3144 raise MissingDependencyException( 

3145 msg="Using EndpointId requires an additional " 

3146 "dependency. You will need to pip install " 

3147 "botocore[crt] before proceeding." 

3148 ) 

3149 

3150 config = context.get('client_config') 

3151 endpoint_variant_tags = None 

3152 if config is not None: 

3153 if config.use_fips_endpoint: 

3154 raise InvalidEndpointConfigurationError( 

3155 msg="FIPS is not supported with EventBridge " 

3156 "multi-region endpoints." 

3157 ) 

3158 if config.use_dualstack_endpoint: 

3159 endpoint_variant_tags = ['dualstack'] 

3160 

3161 if self._endpoint_url is None: 

3162 # Validate endpoint is a valid hostname component 

3163 parts = urlparse(f'https://{endpoint}') 

3164 if parts.hostname != endpoint: 

3165 raise InvalidEndpointConfigurationError( 

3166 msg='EndpointId is not a valid hostname component.' 

3167 ) 

3168 resolved_endpoint = self._get_global_endpoint( 

3169 endpoint, endpoint_variant_tags=endpoint_variant_tags 

3170 ) 

3171 else: 

3172 resolved_endpoint = self._endpoint_url 

3173 

3174 context['eventbridge_endpoint'] = resolved_endpoint 

3175 context['auth_type'] = 'v4a' 

3176 

3177 def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None): 

3178 resolver = self._endpoint_resolver 

3179 

3180 partition = resolver.get_partition_for_region(self._region) 

3181 if partition is None: 

3182 partition = self._DEFAULT_PARTITION 

3183 dns_suffix = resolver.get_partition_dns_suffix( 

3184 partition, endpoint_variant_tags=endpoint_variant_tags 

3185 ) 

3186 if dns_suffix is None: 

3187 dns_suffix = self._DEFAULT_DNS_SUFFIX 

3188 

3189 return f"https://{endpoint}.endpoint.events.{dns_suffix}/" 

3190 

3191 
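
For orientation, the resolved multi-region endpoint follows the pattern built in _get_global_endpoint above; the EndpointId and suffix below are assumptions chosen for illustration, not values produced by a real resolver.

    endpoint_id = 'abc123.456'       # hypothetical EndpointId from PutEvents params
    dns_suffix = 'amazonaws.com'     # default suffix for the default 'aws' partition
    print(f"https://{endpoint_id}.endpoint.events.{dns_suffix}/")
    # -> https://abc123.456.endpoint.events.amazonaws.com/
    # Requests routed this way are then signed with SigV4a (context['auth_type'] = 'v4a').
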

3192def is_s3_accelerate_url(url): 

3193 """Does the URL match the S3 Accelerate endpoint scheme? 

3194 

3195 Virtual host naming style with bucket names in the netloc part of the URL 

3196 is not allowed by this function. 

3197 """ 

3198 if url is None: 

3199 return False 

3200 

3201 # Accelerate is only valid for Amazon endpoints. 

3202 url_parts = urlsplit(url) 

3203 if not url_parts.netloc.endswith( 

3204 'amazonaws.com' 

3205 ) or url_parts.scheme not in ['https', 'http']: 

3206 return False 

3207 

3208 # The first part of the URL must be s3-accelerate. 

3209 parts = url_parts.netloc.split('.') 

3210 if parts[0] != 's3-accelerate': 

3211 return False 

3212 

3213 # URL parts between 's3-accelerate' and 'amazonaws.com' 

3214 # represent different URL features. 

3215 feature_parts = parts[1:-2] 

3216 

3217 # There should be no duplicate URL parts. 

3218 if len(feature_parts) != len(set(feature_parts)): 

3219 return False 

3220 

3221 # Remaining parts must all be in the whitelist. 

3222 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

3223 

3224 
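
A few spot checks of is_s3_accelerate_url, following directly from the rules above (accelerate host first, optional whitelisted features, no duplicates):

    from botocore.utils import is_s3_accelerate_url

    print(is_s3_accelerate_url('https://s3-accelerate.amazonaws.com'))            # True
    print(is_s3_accelerate_url('https://s3-accelerate.dualstack.amazonaws.com'))  # True
    print(is_s3_accelerate_url('https://bucket.s3-accelerate.amazonaws.com'))     # False (bucket in netloc)
    print(is_s3_accelerate_url('https://s3-accelerate.dualstack.dualstack.amazonaws.com'))  # False (duplicate part)
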

3225class JSONFileCache: 

3226 """JSON file cache. 

3227 This provides a dict-like interface that stores JSON-serializable 

3228 objects. 

3229 The objects are serialized to JSON and stored in a file. These 

3230 values can be retrieved at a later time. 

3231 """ 

3232 

3233 CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache')) 

3234 

3235 def __init__(self, working_dir=CACHE_DIR, dumps_func=None): 

3236 self._working_dir = working_dir 

3237 if dumps_func is None: 

3238 dumps_func = self._default_dumps 

3239 self._dumps = dumps_func 

3240 

3241 def _default_dumps(self, obj): 

3242 return json.dumps(obj, default=self._serialize_if_needed) 

3243 

3244 def __contains__(self, cache_key): 

3245 actual_key = self._convert_cache_key(cache_key) 

3246 return os.path.isfile(actual_key) 

3247 

3248 def __getitem__(self, cache_key): 

3249 """Retrieve value from a cache key.""" 

3250 actual_key = self._convert_cache_key(cache_key) 

3251 try: 

3252 with open(actual_key) as f: 

3253 return json.load(f) 

3254 except (OSError, ValueError): 

3255 raise KeyError(cache_key) 

3256 

3257 def __delitem__(self, cache_key): 

3258 actual_key = self._convert_cache_key(cache_key) 

3259 try: 

3260 key_path = Path(actual_key) 

3261 key_path.unlink() 

3262 except FileNotFoundError: 

3263 raise KeyError(cache_key) 

3264 

3265 def __setitem__(self, cache_key, value): 

3266 full_key = self._convert_cache_key(cache_key) 

3267 try: 

3268 file_content = self._dumps(value) 

3269 except (TypeError, ValueError): 

3270 raise ValueError( 

3271 f"Value cannot be cached, must be " 

3272 f"JSON serializable: {value}" 

3273 ) 

3274 if not os.path.isdir(self._working_dir): 

3275 os.makedirs(self._working_dir) 

3276 with os.fdopen( 

3277 os.open(full_key, os.O_WRONLY | os.O_CREAT, 0o600), 'w' 

3278 ) as f: 

3279 f.truncate() 

3280 f.write(file_content) 

3281 

3282 def _convert_cache_key(self, cache_key): 

3283 full_path = os.path.join(self._working_dir, cache_key + '.json') 

3284 return full_path 

3285 

3286 def _serialize_if_needed(self, value, iso=False): 

3287 if isinstance(value, datetime.datetime): 

3288 if iso: 

3289 return value.isoformat() 

3290 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

3291 return value
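
Finally, a small usage sketch of JSONFileCache; the working directory and cached payload are invented for the example (the default directory is ~/.aws/boto/cache).

    from botocore.utils import JSONFileCache

    cache = JSONFileCache(working_dir='/tmp/boto-demo-cache')   # hypothetical directory
    cache['sso-token'] = {'accessToken': 'example', 'expiresAt': '2030-01-01T00:00:00Z'}
    if 'sso-token' in cache:
        print(cache['sso-token'])    # reads back the JSON file as a dict
    del cache['sso-token']           # removes /tmp/boto-demo-cache/sso-token.json
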