Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/utils.py: 20%


1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import base64 

14import binascii 

15import datetime 

16import email.message 

17import functools 

18import hashlib 

19import io 

20import logging 

21import os 

22import random 

23import re 

24import socket 

25import time 

26import warnings 

27import weakref 

28from datetime import datetime as _DatetimeClass 

29from ipaddress import ip_address 

30from pathlib import Path 

31from urllib.request import getproxies, proxy_bypass 

32 

33import dateutil.parser 

34from dateutil.tz import tzutc 

35from urllib3.exceptions import LocationParseError 

36 

37import botocore 

38import botocore.awsrequest 

39import botocore.httpsession 

40 

41# IP Regexes retained for backwards compatibility 

42from botocore.compat import ( 

43 HAS_CRT, 

44 HEX_PAT, # noqa: F401 

45 IPV4_PAT, # noqa: F401 

46 IPV4_RE, 

47 IPV6_ADDRZ_PAT, # noqa: F401 

48 IPV6_ADDRZ_RE, 

49 IPV6_PAT, # noqa: F401 

50 LS32_PAT, # noqa: F401 

51 MD5_AVAILABLE, 

52 UNRESERVED_PAT, # noqa: F401 

53 UNSAFE_URL_CHARS, 

54 ZONE_ID_PAT, # noqa: F401 

55 OrderedDict, 

56 get_current_datetime, 

57 get_md5, 

58 get_tzinfo_options, 

59 json, 

60 quote, 

61 urlparse, 

62 urlsplit, 

63 urlunsplit, 

64 zip_longest, 

65) 

66from botocore.exceptions import ( 

67 ClientError, 

68 ConfigNotFound, 

69 ConnectionClosedError, 

70 ConnectTimeoutError, 

71 EndpointConnectionError, 

72 HTTPClientError, 

73 InvalidDNSNameError, 

74 InvalidEndpointConfigurationError, 

75 InvalidExpressionError, 

76 InvalidHostLabelError, 

77 InvalidIMDSEndpointError, 

78 InvalidIMDSEndpointModeError, 

79 InvalidRegionError, 

80 MetadataRetrievalError, 

81 MissingDependencyException, 

82 ReadTimeoutError, 

83 SSOTokenLoadError, 

84 UnsupportedOutpostResourceError, 

85 UnsupportedS3AccesspointConfigurationError, 

86 UnsupportedS3ArnError, 

87 UnsupportedS3ConfigurationError, 

88 UnsupportedS3ControlArnError, 

89 UnsupportedS3ControlConfigurationError, 

90) 

91from botocore.plugin import ( 

92 PluginContext, 

93 reset_plugin_context, 

94 set_plugin_context, 

95) 

96 

97logger = logging.getLogger(__name__) 

98DEFAULT_METADATA_SERVICE_TIMEOUT = 1 

99METADATA_BASE_URL = 'http://169.254.169.254/' 

100METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/' 

101METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6') 

102 

103# These are chars that do not need to be urlencoded. 

104# Based on RFC 3986, section 2.3

105SAFE_CHARS = '-._~' 

106LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]') 

107RETRYABLE_HTTP_ERRORS = ( 

108 ReadTimeoutError, 

109 EndpointConnectionError, 

110 ConnectionClosedError, 

111 ConnectTimeoutError, 

112) 

113S3_ACCELERATE_WHITELIST = ['dualstack'] 

114# In switching events from using service name / endpoint prefix to service 

115# id, we have to preserve compatibility. This maps the instances where either 

116# is different than the transformed service id. 

117EVENT_ALIASES = { 

118 "api.mediatailor": "mediatailor", 

119 "api.pricing": "pricing", 

120 "api.sagemaker": "sagemaker", 

121 "apigateway": "api-gateway", 

122 "application-autoscaling": "application-auto-scaling", 

123 "appstream2": "appstream", 

124 "autoscaling": "auto-scaling", 

125 "autoscaling-plans": "auto-scaling-plans", 

126 "ce": "cost-explorer", 

127 "cloudhsmv2": "cloudhsm-v2", 

128 "cloudsearchdomain": "cloudsearch-domain", 

129 "cognito-idp": "cognito-identity-provider", 

130 "config": "config-service", 

131 "cur": "cost-and-usage-report-service", 

132 "data.iot": "iot-data-plane", 

133 "data.jobs.iot": "iot-jobs-data-plane", 

134 "data.mediastore": "mediastore-data", 

135 "datapipeline": "data-pipeline", 

136 "devicefarm": "device-farm", 

137 "directconnect": "direct-connect", 

138 "discovery": "application-discovery-service", 

139 "dms": "database-migration-service", 

140 "ds": "directory-service", 

141 "dynamodbstreams": "dynamodb-streams", 

142 "elasticbeanstalk": "elastic-beanstalk", 

143 "elasticfilesystem": "efs", 

144 "elasticloadbalancing": "elastic-load-balancing", 

145 "elasticmapreduce": "emr", 

146 "elastictranscoder": "elastic-transcoder", 

147 "elb": "elastic-load-balancing", 

148 "elbv2": "elastic-load-balancing-v2", 

149 "email": "ses", 

150 "entitlement.marketplace": "marketplace-entitlement-service", 

151 "es": "elasticsearch-service", 

152 "events": "eventbridge", 

153 "cloudwatch-events": "eventbridge", 

154 "iot-data": "iot-data-plane", 

155 "iot-jobs-data": "iot-jobs-data-plane", 

156 "kinesisanalytics": "kinesis-analytics", 

157 "kinesisvideo": "kinesis-video", 

158 "lex-models": "lex-model-building-service", 

159 "lex-runtime": "lex-runtime-service", 

160 "logs": "cloudwatch-logs", 

161 "machinelearning": "machine-learning", 

162 "marketplace-entitlement": "marketplace-entitlement-service", 

163 "marketplacecommerceanalytics": "marketplace-commerce-analytics", 

164 "metering.marketplace": "marketplace-metering", 

165 "meteringmarketplace": "marketplace-metering", 

166 "mgh": "migration-hub", 

167 "models.lex": "lex-model-building-service", 

168 "monitoring": "cloudwatch", 

169 "mturk-requester": "mturk", 

170 "opsworks-cm": "opsworkscm", 

171 "resourcegroupstaggingapi": "resource-groups-tagging-api", 

172 "route53": "route-53", 

173 "route53domains": "route-53-domains", 

174 "runtime.lex": "lex-runtime-service", 

175 "runtime.sagemaker": "sagemaker-runtime", 

176 "sdb": "simpledb", 

177 "secretsmanager": "secrets-manager", 

178 "serverlessrepo": "serverlessapplicationrepository", 

179 "servicecatalog": "service-catalog", 

180 "states": "sfn", 

181 "stepfunctions": "sfn", 

182 "storagegateway": "storage-gateway", 

183 "streams.dynamodb": "dynamodb-streams", 

184 "tagging": "resource-groups-tagging-api", 

185} 

186 

187 

188# This pattern can be used to detect if a header is a flexible checksum header 

189CHECKSUM_HEADER_PATTERN = re.compile( 

190 r'^X-Amz-Checksum-([a-z0-9]*)$', 

191 flags=re.IGNORECASE, 

192) 

193 

194PRIORITY_ORDERED_SUPPORTED_PROTOCOLS = ( 

195 'json', 

196 'rest-json', 

197 'rest-xml', 

198 'smithy-rpc-v2-cbor', 

199 'query', 

200 'ec2', 

201) 

202 

203 

204def ensure_boolean(val): 

205 """Ensures a boolean value if a string or boolean is provided 

206 

207 For strings, the value for True/False is case insensitive 

208 """ 

209 if isinstance(val, bool): 

210 return val 

211 elif isinstance(val, str): 

212 return val.lower() == 'true' 

213 else: 

214 return False 

215 
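# Illustrative usage (editor's addition, not part of the original module):
# only the literal string "true", in any casing, maps to True; every other
# string, and every non-str non-bool value, maps to False.
#
#     >>> ensure_boolean(True), ensure_boolean("TRUE"), ensure_boolean("true")
#     (True, True, True)
#     >>> ensure_boolean("yes"), ensure_boolean(1), ensure_boolean(None)
#     (False, False, False)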

216 

217def resolve_imds_endpoint_mode(session): 

218 """Resolve the IMDS endpoint mode to either IPv6 or IPv4.

219 

220 ec2_metadata_service_endpoint_mode takes precedence over imds_use_ipv6. 

221 """ 

222 endpoint_mode = session.get_config_variable( 

223 'ec2_metadata_service_endpoint_mode' 

224 ) 

225 if endpoint_mode is not None: 

226 lendpoint_mode = endpoint_mode.lower() 

227 if lendpoint_mode not in METADATA_ENDPOINT_MODES: 

228 error_msg_kwargs = { 

229 'mode': endpoint_mode, 

230 'valid_modes': METADATA_ENDPOINT_MODES, 

231 } 

232 raise InvalidIMDSEndpointModeError(**error_msg_kwargs) 

233 return lendpoint_mode 

234 elif session.get_config_variable('imds_use_ipv6'): 

235 return 'ipv6' 

236 return 'ipv4' 

237 

238 

239def is_json_value_header(shape): 

240 """Determines if the provided shape is the special header type jsonvalue. 

241 

242 :type shape: botocore.shape 

243 :param shape: Shape to be inspected for the jsonvalue trait. 

244 

245 :return: True if this type is a jsonvalue, False otherwise 

246 :rtype: Bool 

247 """ 

248 return ( 

249 hasattr(shape, 'serialization') 

250 and shape.serialization.get('jsonvalue', False) 

251 and shape.serialization.get('location') == 'header' 

252 and shape.type_name == 'string' 

253 ) 

254 

255 

256def has_header(header_name, headers): 

257 """Case-insensitive check for header key.""" 

258 if header_name is None: 

259 return False 

260 elif isinstance(headers, botocore.awsrequest.HeadersDict): 

261 return header_name in headers 

262 else: 

263 return header_name.lower() in [key.lower() for key in headers.keys()] 

264 

265 

266def get_service_module_name(service_model): 

267 """Returns the module name for a service 

268 

269 This is the value used in both the documentation and client class name 

270 """ 

271 name = service_model.metadata.get( 

272 'serviceAbbreviation', 

273 service_model.metadata.get( 

274 'serviceFullName', service_model.service_name 

275 ), 

276 ) 

277 name = name.replace('Amazon', '') 

278 name = name.replace('AWS', '') 

279 name = re.sub(r'\W+', '', name) 

280 return name 

281 

282 

283def normalize_url_path(path): 

284 if not path: 

285 return '/' 

286 return remove_dot_segments(path) 

287 

288 

289def normalize_boolean(val): 

290 """Returns None if val is None, otherwise ensures the value is
291 converted to a boolean"""

292 if val is None: 

293 return val 

294 else: 

295 return ensure_boolean(val) 

296 

297 

298def remove_dot_segments(url): 

299 # RFC 3986, section 5.2.4 "Remove Dot Segments" 

300 # Also, AWS services require consecutive slashes to be removed, 

301 # so that's done here as well 

302 if not url: 

303 return '' 

304 input_url = url.split('/') 

305 output_list = [] 

306 for x in input_url: 

307 if x and x != '.': 

308 if x == '..': 

309 if output_list: 

310 output_list.pop() 

311 else: 

312 output_list.append(x) 

313 

314 if url[0] == '/': 

315 first = '/' 

316 else: 

317 first = '' 

318 if url[-1] == '/' and output_list: 

319 last = '/' 

320 else: 

321 last = '' 

322 return first + '/'.join(output_list) + last 

323 
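# Illustrative behavior (editor's addition, not part of the original module):
# dot segments are resolved per RFC 3986 section 5.2.4, consecutive slashes
# are collapsed, and leading/trailing slashes are preserved.
#
#     >>> remove_dot_segments('/a/b/../c//d')
#     '/a/c/d'
#     >>> remove_dot_segments('a/./b/')
#     'a/b/'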

324 

325def validate_jmespath_for_set(expression): 

326 # Validates a limited jmespath expression to determine if we can set a 

327 # value based on it. Only works with dotted paths. 

328 if not expression or expression == '.': 

329 raise InvalidExpressionError(expression=expression) 

330 

331 for invalid in ['[', ']', '*']: 

332 if invalid in expression: 

333 raise InvalidExpressionError(expression=expression) 

334 

335 

336def set_value_from_jmespath(source, expression, value, is_first=True): 

337 # This takes a (limited) jmespath-like expression & can set a value based 

338 # on it. 

339 # Limitations: 

340 # * Only handles dotted lookups 

341 # * No offsets/wildcards/slices/etc. 

342 if is_first: 

343 validate_jmespath_for_set(expression) 

344 

345 bits = expression.split('.', 1) 

346 current_key, remainder = bits[0], bits[1] if len(bits) > 1 else '' 

347 

348 if not current_key: 

349 raise InvalidExpressionError(expression=expression) 

350 

351 if remainder: 

352 if current_key not in source: 

353 # We've got something in the expression that's not present in the 

354 # source (new key). If there are any more bits, we'll set the key

355 # with an empty dictionary. 

356 source[current_key] = {} 

357 

358 return set_value_from_jmespath( 

359 source[current_key], remainder, value, is_first=False 

360 ) 

361 

362 # If we're down to a single key, set it. 

363 source[current_key] = value 

364 

365 
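# Illustrative usage (editor's addition, not part of the original module):
# intermediate dictionaries are created as needed for dotted paths.
#
#     >>> params = {}
#     >>> set_value_from_jmespath(params, 'Config.Retries.Max', 5)
#     >>> params
#     {'Config': {'Retries': {'Max': 5}}}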

366def is_global_accesspoint(context): 

367 """Determine if request is intended for an MRAP accesspoint.""" 

368 s3_accesspoint = context.get('s3_accesspoint', {}) 

369 is_global = s3_accesspoint.get('region') == '' 

370 return is_global 

371 

372 

373def create_nested_client(session, service_name, **kwargs): 

374 # If a client is created from within a plugin based on the environment variable, 

375 # an infinite loop could arise. Any clients created from within another client 

376 # must use this method to prevent infinite loops. 

377 ctx = PluginContext(plugins="DISABLED") 

378 token = set_plugin_context(ctx) 

379 try: 

380 return session.create_client(service_name, **kwargs) 

381 finally: 

382 reset_plugin_context(token) 

383 

384 

385class _RetriesExceededError(Exception): 

386 """Internal exception used when the number of retries is exceeded."""

387 

388 pass 

389 

390 

391class BadIMDSRequestError(Exception): 

392 def __init__(self, request): 

393 self.request = request 

394 

395 

396class IMDSFetcher: 

397 _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError 

398 _TOKEN_PATH = 'latest/api/token' 

399 _TOKEN_TTL = '21600' 

400 

401 def __init__( 

402 self, 

403 timeout=DEFAULT_METADATA_SERVICE_TIMEOUT, 

404 num_attempts=1, 

405 base_url=METADATA_BASE_URL, 

406 env=None, 

407 user_agent=None, 

408 config=None, 

409 ): 

410 self._timeout = timeout 

411 self._num_attempts = num_attempts 

412 if config is None: 

413 config = {} 

414 self._base_url = self._select_base_url(base_url, config) 

415 self._config = config 

416 

417 if env is None: 

418 env = os.environ.copy() 

419 self._disabled = ( 

420 env.get('AWS_EC2_METADATA_DISABLED', 'false').lower() == 'true' 

421 ) 

422 self._imds_v1_disabled = config.get('ec2_metadata_v1_disabled') 

423 self._user_agent = user_agent 

424 self._session = botocore.httpsession.URLLib3Session( 

425 timeout=self._timeout, 

426 proxies=get_environ_proxies(self._base_url), 

427 ) 

428 

429 def get_base_url(self): 

430 return self._base_url 

431 

432 def _select_base_url(self, base_url, config): 

433 if config is None: 

434 config = {} 

435 

436 requires_ipv6 = ( 

437 config.get('ec2_metadata_service_endpoint_mode') == 'ipv6' 

438 ) 

439 custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint') 

440 

441 if requires_ipv6 and custom_metadata_endpoint: 

442 logger.warning( 

443 "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint." 

444 ) 

445 

446 chosen_base_url = None 

447 

448 if base_url != METADATA_BASE_URL: 

449 chosen_base_url = base_url 

450 elif custom_metadata_endpoint: 

451 chosen_base_url = custom_metadata_endpoint 

452 elif requires_ipv6: 

453 chosen_base_url = METADATA_BASE_URL_IPv6 

454 else: 

455 chosen_base_url = METADATA_BASE_URL 

456 

457 logger.debug("IMDS ENDPOINT: %s", chosen_base_url) 

458 if not is_valid_uri(chosen_base_url): 

459 raise InvalidIMDSEndpointError(endpoint=chosen_base_url) 

460 

461 return chosen_base_url 

462 

463 def _construct_url(self, path): 

464 sep = '' 

465 if self._base_url and not self._base_url.endswith('/'): 

466 sep = '/' 

467 return f'{self._base_url}{sep}{path}' 

468 

469 def _fetch_metadata_token(self): 

470 self._assert_enabled() 

471 url = self._construct_url(self._TOKEN_PATH) 

472 headers = { 

473 'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL, 

474 } 

475 self._add_user_agent(headers) 

476 request = botocore.awsrequest.AWSRequest( 

477 method='PUT', url=url, headers=headers 

478 ) 

479 for i in range(self._num_attempts): 

480 try: 

481 response = self._session.send(request.prepare()) 

482 if response.status_code == 200: 

483 return response.text 

484 elif response.status_code in (404, 403, 405): 

485 return None 

486 elif response.status_code in (400,): 

487 raise BadIMDSRequestError(request) 

488 except ReadTimeoutError: 

489 return None 

490 except RETRYABLE_HTTP_ERRORS as e: 

491 logger.debug( 

492 "Caught retryable HTTP exception while making metadata " 

493 "service request to %s: %s", 

494 url, 

495 e, 

496 exc_info=True, 

497 ) 

498 except HTTPClientError as e: 

499 if isinstance(e.kwargs.get('error'), LocationParseError): 

500 raise InvalidIMDSEndpointError(endpoint=url, error=e) 

501 else: 

502 raise 

503 return None 

504 

505 def _get_request(self, url_path, retry_func, token=None): 

506 """Make a get request to the Instance Metadata Service. 

507 

508 :type url_path: str 

509 :param url_path: The path component of the URL to make a get request. 

510 This arg is appended to the base_url that was provided in the 

511 initializer. 

512 

513 :type retry_func: callable 

514 :param retry_func: A function that takes the response as an argument 

515 and determines if it needs to retry. By default, empty and

516 non-200 responses are retried.

517 

518 :type token: str 

519 :param token: Metadata token to send along with GET requests to IMDS. 

520 """ 

521 self._assert_enabled() 

522 if not token: 

523 self._assert_v1_enabled() 

524 if retry_func is None: 

525 retry_func = self._default_retry 

526 url = self._construct_url(url_path) 

527 headers = {} 

528 if token is not None: 

529 headers['x-aws-ec2-metadata-token'] = token 

530 self._add_user_agent(headers) 

531 for i in range(self._num_attempts): 

532 try: 

533 request = botocore.awsrequest.AWSRequest( 

534 method='GET', url=url, headers=headers 

535 ) 

536 response = self._session.send(request.prepare()) 

537 if not retry_func(response): 

538 return response 

539 except RETRYABLE_HTTP_ERRORS as e: 

540 logger.debug( 

541 "Caught retryable HTTP exception while making metadata " 

542 "service request to %s: %s", 

543 url, 

544 e, 

545 exc_info=True, 

546 ) 

547 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

548 

549 def _add_user_agent(self, headers): 

550 if self._user_agent is not None: 

551 headers['User-Agent'] = self._user_agent 

552 

553 def _assert_enabled(self): 

554 if self._disabled: 

555 logger.debug("Access to EC2 metadata has been disabled.") 

556 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

557 

558 def _assert_v1_enabled(self): 

559 if self._imds_v1_disabled: 

560 raise MetadataRetrievalError( 

561 error_msg="Unable to retrieve token for use in IMDSv2 call and IMDSv1 has been disabled" 

562 ) 

563 

564 def _default_retry(self, response): 

565 return self._is_non_ok_response(response) or self._is_empty(response) 

566 

567 def _is_non_ok_response(self, response): 

568 if response.status_code != 200: 

569 self._log_imds_response(response, 'non-200', log_body=True) 

570 return True 

571 return False 

572 

573 def _is_empty(self, response): 

574 if not response.content: 

575 self._log_imds_response(response, 'no body', log_body=True) 

576 return True 

577 return False 

578 

579 def _log_imds_response(self, response, reason_to_log, log_body=False): 

580 statement = ( 

581 "Metadata service returned %s response " 

582 "with status code of %s for url: %s" 

583 ) 

584 logger_args = [reason_to_log, response.status_code, response.url] 

585 if log_body: 

586 statement += ", content body: %s" 

587 logger_args.append(response.content) 

588 logger.debug(statement, *logger_args) 

589 

590 

591class InstanceMetadataFetcher(IMDSFetcher): 

592 _URL_PATH = 'latest/meta-data/iam/security-credentials/' 

593 _REQUIRED_CREDENTIAL_FIELDS = [ 

594 'AccessKeyId', 

595 'SecretAccessKey', 

596 'Token', 

597 'Expiration', 

598 ] 

599 

600 def retrieve_iam_role_credentials(self): 

601 try: 

602 token = self._fetch_metadata_token() 

603 role_name = self._get_iam_role(token) 

604 credentials = self._get_credentials(role_name, token) 

605 if self._contains_all_credential_fields(credentials): 

606 credentials = { 

607 'role_name': role_name, 

608 'access_key': credentials['AccessKeyId'], 

609 'secret_key': credentials['SecretAccessKey'], 

610 'token': credentials['Token'], 

611 'expiry_time': credentials['Expiration'], 

612 } 

613 self._evaluate_expiration(credentials) 

614 return credentials 

615 else: 

616 # IMDS can return a 200 response that has a JSON formatted 

617 # error message (i.e. if ec2 is not a trusted entity for the

618 # attached role). We do not necessarily want to retry for

619 # these and we also do not necessarily want to raise a key

620 # error. So at least log the problematic response and return

621 # an empty dictionary to signal that it was not able to

622 # retrieve credentials. These errors will contain both a

623 # Code and Message key. 

624 if 'Code' in credentials and 'Message' in credentials: 

625 logger.debug( 

626 'Error response received when retrieving '

627 'credentials: %s.', 

628 credentials, 

629 ) 

630 return {} 

631 except self._RETRIES_EXCEEDED_ERROR_CLS: 

632 logger.debug( 

633 "Max number of attempts exceeded (%s) when " 

634 "attempting to retrieve data from metadata service.", 

635 self._num_attempts, 

636 ) 

637 except BadIMDSRequestError as e: 

638 logger.debug("Bad IMDS request: %s", e.request) 

639 return {} 

640 

641 def _get_iam_role(self, token=None): 

642 return self._get_request( 

643 url_path=self._URL_PATH, 

644 retry_func=self._needs_retry_for_role_name, 

645 token=token, 

646 ).text 

647 

648 def _get_credentials(self, role_name, token=None): 

649 r = self._get_request( 

650 url_path=self._URL_PATH + role_name, 

651 retry_func=self._needs_retry_for_credentials, 

652 token=token, 

653 ) 

654 return json.loads(r.text) 

655 

656 def _is_invalid_json(self, response): 

657 try: 

658 json.loads(response.text) 

659 return False 

660 except ValueError: 

661 self._log_imds_response(response, 'invalid json') 

662 return True 

663 

664 def _needs_retry_for_role_name(self, response): 

665 return self._is_non_ok_response(response) or self._is_empty(response) 

666 

667 def _needs_retry_for_credentials(self, response): 

668 return ( 

669 self._is_non_ok_response(response) 

670 or self._is_empty(response) 

671 or self._is_invalid_json(response) 

672 ) 

673 

674 def _contains_all_credential_fields(self, credentials): 

675 for field in self._REQUIRED_CREDENTIAL_FIELDS: 

676 if field not in credentials: 

677 logger.debug( 

678 'Retrieved credentials are missing required field: %s',

679 field, 

680 ) 

681 return False 

682 return True 

683 

684 def _evaluate_expiration(self, credentials): 

685 expiration = credentials.get("expiry_time") 

686 if expiration is None: 

687 return 

688 try: 

689 expiration = datetime.datetime.strptime( 

690 expiration, "%Y-%m-%dT%H:%M:%SZ" 

691 ) 

692 refresh_interval = self._config.get( 

693 "ec2_credential_refresh_window", 60 * 10 

694 ) 

695 jitter = random.randint(120, 600) # Between 2 to 10 minutes 

696 refresh_interval_with_jitter = refresh_interval + jitter 

697 current_time = get_current_datetime() 

698 refresh_offset = datetime.timedelta( 

699 seconds=refresh_interval_with_jitter 

700 ) 

701 extension_time = expiration - refresh_offset 

702 if current_time >= extension_time: 

703 new_time = current_time + refresh_offset 

704 credentials["expiry_time"] = new_time.strftime( 

705 "%Y-%m-%dT%H:%M:%SZ" 

706 ) 

707 logger.info( 

708 "Attempting credential expiration extension due to a " 

709 "credential service availability issue. A refresh of " 

710 "these credentials will be attempted again within " 

711 "the next %.0f minutes.", 

712 refresh_interval_with_jitter / 60, 

713 ) 

714 except ValueError: 

715 logger.debug( 

716 "Unable to parse expiry_time in %s", credentials['expiry_time'] 

717 ) 

718 

719 

720class IMDSRegionProvider: 

721 def __init__(self, session, environ=None, fetcher=None): 

722 """Initialize IMDSRegionProvider. 

723 :type session: :class:`botocore.session.Session` 

724 :param session: The session is needed to look up configuration for 

725 how to contact the instance metadata service. Specifically,

726 whether or not it should use the IMDS region at all, and if so how

727 to configure the timeout and number of attempts to reach the 

728 service. 

729 :type environ: None or dict 

730 :param environ: A dictionary of environment variables to use. If 

731 ``None`` is the argument then ``os.environ`` will be used by 

732 default. 

733 :type fetcher: :class:`botocore.utils.InstanceMetadataRegionFetcher`

734 :param fetcher: The class to actually handle the fetching of the region 

735 from the IMDS. If not provided a default one will be created. 

736 """ 

737 self._session = session 

738 if environ is None: 

739 environ = os.environ 

740 self._environ = environ 

741 self._fetcher = fetcher 

742 

743 def provide(self): 

744 """Provide the region value from IMDS.""" 

745 instance_region = self._get_instance_metadata_region() 

746 return instance_region 

747 

748 def _get_instance_metadata_region(self): 

749 fetcher = self._get_fetcher() 

750 region = fetcher.retrieve_region() 

751 return region 

752 

753 def _get_fetcher(self): 

754 if self._fetcher is None: 

755 self._fetcher = self._create_fetcher() 

756 return self._fetcher 

757 

758 def _create_fetcher(self): 

759 metadata_timeout = self._session.get_config_variable( 

760 'metadata_service_timeout' 

761 ) 

762 metadata_num_attempts = self._session.get_config_variable( 

763 'metadata_service_num_attempts' 

764 ) 

765 imds_config = { 

766 'ec2_metadata_service_endpoint': self._session.get_config_variable( 

767 'ec2_metadata_service_endpoint' 

768 ), 

769 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode( 

770 self._session 

771 ), 

772 'ec2_metadata_v1_disabled': self._session.get_config_variable( 

773 'ec2_metadata_v1_disabled' 

774 ), 

775 } 

776 fetcher = InstanceMetadataRegionFetcher( 

777 timeout=metadata_timeout, 

778 num_attempts=metadata_num_attempts, 

779 env=self._environ, 

780 user_agent=self._session.user_agent(), 

781 config=imds_config, 

782 ) 

783 return fetcher 

784 

785 

786class InstanceMetadataRegionFetcher(IMDSFetcher): 

787 _URL_PATH = 'latest/meta-data/placement/availability-zone/' 

788 

789 def retrieve_region(self): 

790 """Get the current region from the instance metadata service. 

791 :rtype: str or None

792 :returns: The region the current instance is running in or None

793 if the instance metadata service cannot be contacted or does not

794 give a valid response.

795 

796 Returns the region as a string if it is configured to use

797 IMDS as a region source. Otherwise returns ``None``. It will also

798 return ``None`` if it fails to get the region from IMDS due to

799 exhausting its retries or not being able to connect.

800 """

801 try: 

802 region = self._get_region() 

803 return region 

804 except self._RETRIES_EXCEEDED_ERROR_CLS: 

805 logger.debug( 

806 "Max number of attempts exceeded (%s) when " 

807 "attempting to retrieve data from metadata service.", 

808 self._num_attempts, 

809 ) 

810 return None 

811 

812 def _get_region(self): 

813 token = self._fetch_metadata_token() 

814 response = self._get_request( 

815 url_path=self._URL_PATH, 

816 retry_func=self._default_retry, 

817 token=token, 

818 ) 

819 availability_zone = response.text 

820 region = availability_zone[:-1] 

821 return region 

822 

823 

824def merge_dicts(dict1, dict2, append_lists=False): 

825 """Given two dicts, merge the second dict into the first.

826 

827 The dicts can have arbitrary nesting. 

828 

829 :param append_lists: If true, instead of clobbering a list with the new 

830 value, append all of the new values onto the original list. 

831 """ 

832 for key in dict2: 

833 if isinstance(dict2[key], dict): 

834 if key in dict1 and key in dict2: 

835 merge_dicts(dict1[key], dict2[key]) 

836 else: 

837 dict1[key] = dict2[key] 

838 # If the value is a list and the ``append_lists`` flag is set, 

839 # append the new values onto the original list 

840 elif isinstance(dict2[key], list) and append_lists: 

841 # The value in dict1 must be a list in order to append new 

842 # values onto it. 

843 if key in dict1 and isinstance(dict1[key], list): 

844 dict1[key].extend(dict2[key]) 

845 else: 

846 dict1[key] = dict2[key] 

847 else: 

848 # For scalar values, the value from dict2 simply

849 # overwrites the value in dict1.

850 dict1[key] = dict2[key] 

851 
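# Illustrative behavior (editor's addition, not part of the original module):
# nested dictionaries are merged recursively; scalars are overwritten, and
# lists are only appended to when ``append_lists=True``.
#
#     >>> first = {'retries': {'max': 3}, 'tags': ['a']}
#     >>> merge_dicts(first, {'retries': {'mode': 'standard'}, 'tags': ['b']},
#     ...             append_lists=True)
#     >>> first
#     {'retries': {'max': 3, 'mode': 'standard'}, 'tags': ['a', 'b']}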

852 

853def lowercase_dict(original): 

854 """Copies the given dictionary ensuring all keys are lowercase strings.""" 

855 copy = {} 

856 for key in original: 

857 copy[key.lower()] = original[key] 

858 return copy 

859 

860 

861def parse_key_val_file(filename, _open=open): 

862 try: 

863 with _open(filename) as f: 

864 contents = f.read() 

865 return parse_key_val_file_contents(contents) 

866 except OSError: 

867 raise ConfigNotFound(path=filename) 

868 

869 

870def parse_key_val_file_contents(contents): 

871 # This was originally extracted from the EC2 credential provider, which was 

872 # fairly lenient in its parsing. We only try to parse key/val pairs if 

873 # there's a '=' in the line. 

874 final = {} 

875 for line in contents.splitlines(): 

876 if '=' not in line: 

877 continue 

878 key, val = line.split('=', 1) 

879 key = key.strip() 

880 val = val.strip() 

881 final[key] = val 

882 return final 

883 

884 

885def percent_encode_sequence(mapping, safe=SAFE_CHARS): 

886 """Urlencode a dict or list into a string. 

887 

888 This is similar to urllib.urlencode except that: 

889 

890 * It uses quote, and not quote_plus 

891 * It has a default list of safe chars that don't need 

892 to be encoded, which matches what AWS services expect. 

893 

894 If any value in the input ``mapping`` is a list type, 

895 then each list element will be serialized. This is equivalent

896 to ``urlencode``'s ``doseq=True`` argument. 

897 

898 This function should be preferred over the stdlib 

899 ``urlencode()`` function. 

900 

901 :param mapping: Either a dict to urlencode or a list of 

902 ``(key, value)`` pairs. 

903 

904 """ 

905 encoded_pairs = [] 

906 if hasattr(mapping, 'items'): 

907 pairs = mapping.items() 

908 else: 

909 pairs = mapping 

910 for key, value in pairs: 

911 if isinstance(value, list): 

912 for element in value: 

913 encoded_pairs.append( 

914 f'{percent_encode(key)}={percent_encode(element)}' 

915 ) 

916 else: 

917 encoded_pairs.append( 

918 f'{percent_encode(key)}={percent_encode(value)}' 

919 ) 

920 return '&'.join(encoded_pairs) 

921 
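# Illustrative behavior (editor's addition, not part of the original module):
# list values expand into repeated keys, and spaces are percent-encoded
# with quote (not quote_plus), matching what AWS query strings expect.
#
#     >>> percent_encode_sequence({'Action': 'Describe Things', 'Id': ['1', '2']})
#     'Action=Describe%20Things&Id=1&Id=2'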

922 

923def percent_encode(input_str, safe=SAFE_CHARS): 

924 """Urlencodes a string. 

925 

926 Whereas percent_encode_sequence handles taking a dict/sequence and 

927 producing a percent encoded string, this function deals only with 

928 taking a string (not a dict/sequence) and percent encoding it. 

929 

930 If given the binary type, will simply URL encode it. If given the 

931 text type, will produce the binary type by UTF-8 encoding the 

932 text. If given something else, will convert it to the text type 

933 first. 

934 """ 

935 # If it's not a binary or text string, make it a text string.

936 if not isinstance(input_str, (bytes, str)): 

937 input_str = str(input_str) 

938 # If it's not bytes, make it bytes by UTF-8 encoding it. 

939 if not isinstance(input_str, bytes): 

940 input_str = input_str.encode('utf-8') 

941 return quote(input_str, safe=safe) 

942 

943 

944def _epoch_seconds_to_datetime(value, tzinfo): 

945 """Parse numerical epoch timestamps (seconds since 1970) into a 

946 ``datetime.datetime`` in UTC using ``datetime.timedelta``. This is intended 

947 as a fallback when ``fromtimestamp`` raises ``OverflowError`` or ``OSError``.

948 

949 :type value: float or int 

950 :param value: The Unix timestamp as a number.

951 

952 :type tzinfo: callable 

953 :param tzinfo: A ``datetime.tzinfo`` class or compatible callable. 

954 """ 

955 epoch_zero = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=tzutc()) 

956 epoch_zero_localized = epoch_zero.astimezone(tzinfo()) 

957 return epoch_zero_localized + datetime.timedelta(seconds=value) 

958 

959 

960def _parse_timestamp_with_tzinfo(value, tzinfo): 

961 """Parse timestamp with pluggable tzinfo options.""" 

962 if isinstance(value, (int, float)): 

963 # Possibly an epoch time. 

964 return datetime.datetime.fromtimestamp(value, tzinfo()) 

965 else: 

966 try: 

967 return datetime.datetime.fromtimestamp(float(value), tzinfo()) 

968 except (TypeError, ValueError): 

969 pass 

970 try: 

971 # In certain cases, a timestamp marked with GMT can be parsed into a 

972 # different time zone, so here we provide a context which will 

973 # enforce that GMT == UTC. 

974 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()}) 

975 except (TypeError, ValueError) as e: 

976 raise ValueError(f'Invalid timestamp "{value}": {e}') 

977 

978 

979def parse_timestamp(value): 

980 """Parse a timestamp into a datetime object. 

981 

982 Supported formats: 

983 

984 * iso8601 

985 * rfc822 

986 * epoch (value is an integer) 

987 

988 This will return a ``datetime.datetime`` object. 

989 

990 """ 

991 tzinfo_options = get_tzinfo_options() 

992 for tzinfo in tzinfo_options: 

993 try: 

994 return _parse_timestamp_with_tzinfo(value, tzinfo) 

995 except (OSError, OverflowError) as e: 

996 logger.debug( 

997 'Unable to parse timestamp with "%s" timezone info.', 

998 tzinfo.__name__, 

999 exc_info=e, 

1000 ) 

1001 # For numeric values, attempt a fallback that avoids ``fromtimestamp``.

1002 # From Python's ``datetime.datetime.fromtimestamp`` documentation: "This 

1003 # may raise ``OverflowError``, if the timestamp is out of the range of 

1004 # values supported by the platform C localtime() function, and ``OSError`` 

1005 # on localtime() failure. It's common for this to be restricted to years 

1006 # from 1970 through 2038." 

1007 try: 

1008 numeric_value = float(value) 

1009 except (TypeError, ValueError): 

1010 pass 

1011 else: 

1012 try: 

1013 for tzinfo in tzinfo_options: 

1014 return _epoch_seconds_to_datetime(numeric_value, tzinfo=tzinfo) 

1015 except (OSError, OverflowError) as e: 

1016 logger.debug( 

1017 'Unable to parse timestamp using fallback method with "%s" ' 

1018 'timezone info.', 

1019 tzinfo.__name__, 

1020 exc_info=e, 

1021 ) 

1022 raise RuntimeError( 

1023 f'Unable to calculate correct timezone offset for "{value}"' 

1024 ) 

1025 

1026 

1027def parse_to_aware_datetime(value): 

1028 """Convert the passed-in value to a datetime object with tzinfo.

1029 

1030 This function can be used to normalize all timestamp inputs. This 

1031 function accepts a number of different types of inputs, but 

1032 will always return a datetime.datetime object with time zone 

1033 information. 

1034 

1035 The input param ``value`` can be one of several types: 

1036 

1037 * A datetime object (both naive and aware) 

1038 * An integer representing the epoch time (can also be a string 

1039 of the integer, i.e '0', instead of 0). The epoch time is 

1040 considered to be UTC. 

1041 * An iso8601 formatted timestamp. This does not need to be 

1042 a complete timestamp, it can contain just the date portion 

1043 without the time component. 

1044 

1045 The returned value will be a datetime object that will have tzinfo. 

1046 If no timezone info was provided in the input value, then UTC is 

1047 assumed, not local time. 

1048 

1049 """ 

1050 # This is a general purpose method that handles several cases of 

1051 # converting the provided value to a string timestamp suitable to be 

1052 # serialized to an http request. It can handle: 

1053 # 1) A datetime.datetime object. 

1054 if isinstance(value, _DatetimeClass): 

1055 datetime_obj = value 

1056 else: 

1057 # 2) A string object that's formatted as a timestamp. 

1058 # We document this as being an iso8601 timestamp, although 

1059 # parse_timestamp is a bit more flexible. 

1060 datetime_obj = parse_timestamp(value) 

1061 if datetime_obj.tzinfo is None: 

1062 # A case could be made that if no time zone is provided,

1063 # we should use the local time. However, to preserve backwards

1064 # compatibility, the previous behavior was to assume UTC, which is

1065 # what we're going to do here.

1066 datetime_obj = datetime_obj.replace(tzinfo=tzutc()) 

1067 else: 

1068 datetime_obj = datetime_obj.astimezone(tzutc()) 

1069 return datetime_obj 

1070 
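# Illustrative behavior (editor's addition, not part of the original module):
# epoch numbers and ISO 8601 strings both normalize to tz-aware UTC datetimes.
#
#     >>> parse_to_aware_datetime(0)
#     datetime.datetime(1970, 1, 1, 0, 0, tzinfo=tzutc())
#     >>> parse_to_aware_datetime('2015-01-01T12:00:00Z')
#     datetime.datetime(2015, 1, 1, 12, 0, tzinfo=tzutc())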

1071 

1072def datetime2timestamp(dt, default_timezone=None): 

1073 """Calculate the timestamp based on the given datetime instance. 

1074 

1075 :type dt: datetime 

1076 :param dt: A datetime object to be converted into timestamp 

1077 :type default_timezone: tzinfo 

1078 :param default_timezone: If None, it is treated as tzutc(). This value

1079 is only used when dt is a naive datetime.

1080 :returns: The timestamp 

1081 """ 

1082 epoch = datetime.datetime(1970, 1, 1) 

1083 if dt.tzinfo is None: 

1084 if default_timezone is None: 

1085 default_timezone = tzutc() 

1086 dt = dt.replace(tzinfo=default_timezone) 

1087 d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch 

1088 return d.total_seconds() 

1089 
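# Illustrative behavior (editor's addition, not part of the original module):
# naive datetimes are treated as UTC unless a default timezone is supplied.
#
#     >>> datetime2timestamp(datetime.datetime(1970, 1, 1, 0, 1, 0))
#     60.0
#     >>> datetime2timestamp(datetime.datetime(1970, 1, 1, tzinfo=tzutc()))
#     0.0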

1090 

1091def calculate_sha256(body, as_hex=False): 

1092 """Calculate a sha256 checksum. 

1093 

1094 This method will calculate the sha256 checksum of a file like 

1095 object. Note that this method will iterate through the entire 

1096 file contents. The caller is responsible for ensuring the proper 

1097 starting position of the file and ``seek()``'ing the file back 

1098 to its starting location if other consumers need to read from 

1099 the file like object. 

1100 

1101 :param body: Any file like object. The file must be opened 

1102 in binary mode such that a ``.read()`` call returns bytes. 

1103 :param as_hex: If True, then the hex digest is returned. 

1104 If False, then the digest (as binary bytes) is returned. 

1105 

1106 :returns: The sha256 checksum 

1107 

1108 """ 

1109 checksum = hashlib.sha256() 

1110 for chunk in iter(lambda: body.read(1024 * 1024), b''): 

1111 checksum.update(chunk) 

1112 if as_hex: 

1113 return checksum.hexdigest() 

1114 else: 

1115 return checksum.digest() 

1116 
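# Illustrative usage (editor's addition, not part of the original module):
# the body is read in 1 MB chunks, so the result matches hashing the full
# payload in one call; the stream is left at EOF afterwards.
#
#     >>> body = io.BytesIO(b'hello world')
#     >>> calculate_sha256(body, as_hex=True) == hashlib.sha256(b'hello world').hexdigest()
#     True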

1117 

1118def calculate_tree_hash(body): 

1119 """Calculate a tree hash checksum. 

1120 

1121 For more information see: 

1122 

1123 http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html 

1124 

1125 :param body: Any file like object. This has the same constraints as 

1126 the ``body`` param in calculate_sha256 

1127 

1128 :rtype: str 

1129 :returns: The hex version of the calculated tree hash 

1130 

1131 """ 

1132 chunks = [] 

1133 required_chunk_size = 1024 * 1024 

1134 sha256 = hashlib.sha256 

1135 for chunk in iter(lambda: body.read(required_chunk_size), b''): 

1136 chunks.append(sha256(chunk).digest()) 

1137 if not chunks: 

1138 return sha256(b'').hexdigest() 

1139 while len(chunks) > 1: 

1140 new_chunks = [] 

1141 for first, second in _in_pairs(chunks): 

1142 if second is not None: 

1143 new_chunks.append(sha256(first + second).digest()) 

1144 else: 

1145 # We're at the end of the list and there's no pair left. 

1146 new_chunks.append(first) 

1147 chunks = new_chunks 

1148 return binascii.hexlify(chunks[0]).decode('ascii') 

1149 

1150 

1151def _in_pairs(iterable): 

1152 # Creates iterator that iterates over the list in pairs: 

1153 # for a, b in _in_pairs([0, 1, 2, 3, 4]): 

1154 # print(a, b) 

1155 # 

1156 # will print: 

1157 # 0, 1 

1158 # 2, 3 

1159 # 4, None 

1160 shared_iter = iter(iterable) 

1161 # Note that zip_longest is a compat import that uses 

1162 # the itertools izip_longest. This creates an iterator, 

1163 # this call below does _not_ immediately create the list 

1164 # of pairs. 

1165 return zip_longest(shared_iter, shared_iter) 

1166 

1167 

1168class CachedProperty: 

1169 """A read only property that caches the initially computed value. 

1170 

1171 This descriptor will only call the provided ``fget`` function once. 

1172 Subsequent access to this property will return the cached value. 

1173 

1174 """ 

1175 

1176 def __init__(self, fget): 

1177 self._fget = fget 

1178 

1179 def __get__(self, obj, cls): 

1180 if obj is None: 

1181 return self 

1182 else: 

1183 computed_value = self._fget(obj) 

1184 obj.__dict__[self._fget.__name__] = computed_value 

1185 return computed_value 

1186 
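# Illustrative usage (editor's addition, not part of the original module):
# the first attribute access calls ``fget`` and stores the result in the
# instance ``__dict__``, so later accesses bypass the descriptor entirely.
#
#     class ServiceConfig:
#         @CachedProperty
#         def endpoints(self):
#             return load_expensive_endpoint_data()  # hypothetical loader
#
#     # config.endpoints  -> computed once, then served from __dict__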

1187 

1188class ArgumentGenerator: 

1189 """Generate sample input based on a shape model. 

1190 

1191 This class contains a ``generate_skeleton`` method that will take 

1192 an input/output shape (created from ``botocore.model``) and generate 

1193 a sample dictionary corresponding to the input/output shape. 

1194 

1195 The specific values used are placeholder values. For strings, either an

1196 empty string or the member name is used; for numbers, 0 or 0.0 is used.

1197 The intended usage of this class is to generate the *shape* of the input 

1198 structure. 

1199 

1200 This can be useful for operations that have complex input shapes. 

1201 This allows a user to just fill in the necessary data instead of 

1202 worrying about the specific structure of the input arguments. 

1203 

1204 Example usage:: 

1205 

1206 s = botocore.session.get_session() 

1207 ddb = s.get_service_model('dynamodb') 

1208 arg_gen = ArgumentGenerator() 

1209 sample_input = arg_gen.generate_skeleton( 

1210 ddb.operation_model('CreateTable').input_shape) 

1211 print("Sample input for dynamodb.CreateTable: %s" % sample_input) 

1212 

1213 """ 

1214 

1215 def __init__(self, use_member_names=False): 

1216 self._use_member_names = use_member_names 

1217 

1218 def generate_skeleton(self, shape): 

1219 """Generate a sample input. 

1220 

1221 :type shape: ``botocore.model.Shape`` 

1222 :param shape: The input shape. 

1223 

1224 :return: The generated skeleton input corresponding to the 

1225 provided input shape. 

1226 

1227 """ 

1228 stack = [] 

1229 return self._generate_skeleton(shape, stack) 

1230 

1231 def _generate_skeleton(self, shape, stack, name=''): 

1232 stack.append(shape.name) 

1233 try: 

1234 if shape.type_name == 'structure': 

1235 return self._generate_type_structure(shape, stack) 

1236 elif shape.type_name == 'list': 

1237 return self._generate_type_list(shape, stack) 

1238 elif shape.type_name == 'map': 

1239 return self._generate_type_map(shape, stack) 

1240 elif shape.type_name == 'string': 

1241 if self._use_member_names: 

1242 return name 

1243 if shape.enum: 

1244 return random.choice(shape.enum) 

1245 return '' 

1246 elif shape.type_name in ['integer', 'long']: 

1247 return 0 

1248 elif shape.type_name in ['float', 'double']: 

1249 return 0.0 

1250 elif shape.type_name == 'boolean': 

1251 return True 

1252 elif shape.type_name == 'timestamp': 

1253 return datetime.datetime(1970, 1, 1, 0, 0, 0) 

1254 finally: 

1255 stack.pop() 

1256 

1257 def _generate_type_structure(self, shape, stack): 

1258 if stack.count(shape.name) > 1: 

1259 return {} 

1260 skeleton = OrderedDict() 

1261 for member_name, member_shape in shape.members.items(): 

1262 skeleton[member_name] = self._generate_skeleton( 

1263 member_shape, stack, name=member_name 

1264 ) 

1265 return skeleton 

1266 

1267 def _generate_type_list(self, shape, stack): 

1268 # For list elements we've arbitrarily decided to

1269 # return a single element for the skeleton list.

1270 name = '' 

1271 if self._use_member_names: 

1272 name = shape.member.name 

1273 return [ 

1274 self._generate_skeleton(shape.member, stack, name), 

1275 ] 

1276 

1277 def _generate_type_map(self, shape, stack): 

1278 key_shape = shape.key 

1279 value_shape = shape.value 

1280 assert key_shape.type_name == 'string' 

1281 return OrderedDict( 

1282 [ 

1283 ('KeyName', self._generate_skeleton(value_shape, stack)), 

1284 ] 

1285 ) 

1286 

1287 

1288def is_valid_ipv6_endpoint_url(endpoint_url): 

1289 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1290 return False 

1291 hostname = f'[{urlparse(endpoint_url).hostname}]' 

1292 return IPV6_ADDRZ_RE.match(hostname) is not None 

1293 

1294 

1295def is_valid_ipv4_endpoint_url(endpoint_url): 

1296 hostname = urlparse(endpoint_url).hostname 

1297 return IPV4_RE.match(hostname) is not None 

1298 

1299 

1300def is_valid_endpoint_url(endpoint_url): 

1301 """Verify the endpoint_url is valid. 

1302 

1303 :type endpoint_url: string 

1304 :param endpoint_url: An endpoint_url. Must have at least a scheme 

1305 and a hostname. 

1306 

1307 :return: True if the endpoint url is valid. False otherwise. 

1308 

1309 """ 

1310 # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing 

1311 # it to pass hostname validation below. Detect them early to fix that. 

1312 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1313 return False 

1314 parts = urlsplit(endpoint_url) 

1315 hostname = parts.hostname 

1316 if hostname is None: 

1317 return False 

1318 if len(hostname) > 255: 

1319 return False 

1320 if hostname[-1] == ".": 

1321 hostname = hostname[:-1] 

1322 allowed = re.compile( 

1323 r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$", 

1324 re.IGNORECASE, 

1325 ) 

1326 return allowed.match(hostname) 

1327 

1328 

1329def is_valid_uri(endpoint_url): 

1330 return is_valid_endpoint_url(endpoint_url) or is_valid_ipv6_endpoint_url( 

1331 endpoint_url 

1332 ) 

1333 

1334 

1335def validate_region_name(region_name): 

1336 """Provided region_name must be a valid host label.""" 

1337 if region_name is None: 

1338 return 

1339 valid_host_label = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$') 

1340 valid = valid_host_label.match(region_name) 

1341 if not valid: 

1342 raise InvalidRegionError(region_name=region_name) 

1343 

1344 

1345def check_dns_name(bucket_name): 

1346 """ 

1347 Check to see if the ``bucket_name`` complies with the 

1348 restricted DNS naming conventions necessary to allow 

1349 access via virtual-hosting style. 

1350 

1351 Even though "." characters are perfectly valid in this DNS 

1352 naming scheme, we are going to punt on any name containing a 

1353 "." character because these will cause SSL cert validation 

1354 problems if we try to use virtual-hosting style addressing. 

1355 """ 

1356 if '.' in bucket_name: 

1357 return False 

1358 n = len(bucket_name) 

1359 if n < 3 or n > 63: 

1360 # Wrong length 

1361 return False 

1362 match = LABEL_RE.match(bucket_name) 

1363 if match is None or match.end() != len(bucket_name): 

1364 return False 

1365 return True 

1366 

1367 

1368def fix_s3_host( 

1369 request, 

1370 signature_version, 

1371 region_name, 

1372 default_endpoint_url=None, 

1373 **kwargs, 

1374): 

1375 """ 

1376 This handler looks at S3 requests just before they are signed. 

1377 If there is a bucket name on the path (true for everything except 

1378 ListAllBuckets) it checks to see if that bucket name conforms to 

1379 the DNS naming conventions. If it does, it alters the request to 

1380 use ``virtual hosting`` style addressing rather than ``path-style`` 

1381 addressing. 

1382 

1383 """ 

1384 if request.context.get('use_global_endpoint', False): 

1385 default_endpoint_url = 's3.amazonaws.com' 

1386 try: 

1387 switch_to_virtual_host_style( 

1388 request, signature_version, default_endpoint_url 

1389 ) 

1390 except InvalidDNSNameError as e: 

1391 bucket_name = e.kwargs['bucket_name'] 

1392 logger.debug( 

1393 'Not changing URI, bucket is not DNS compatible: %s', bucket_name 

1394 ) 

1395 

1396 

1397def switch_to_virtual_host_style( 

1398 request, signature_version, default_endpoint_url=None, **kwargs 

1399): 

1400 """ 

1401 This is a handler to force virtual host style s3 addressing no matter 

1402 the signature version (which is taken into consideration for the default

1403 case). If the bucket is not DNS compatible, an InvalidDNSNameError is raised.

1404 

1405 :param request: An AWSRequest object that is about to be sent.

1406 :param signature_version: The signature version to sign with 

1407 :param default_endpoint_url: The endpoint to use when switching to a 

1408 virtual style. If None is supplied, the virtual host will be 

1409 constructed from the url of the request. 

1410 """ 

1411 if request.auth_path is not None: 

1412 # The auth_path has already been applied (this may be a 

1413 # retried request). We don't need to perform this 

1414 # customization again. 

1415 return 

1416 elif _is_get_bucket_location_request(request): 

1417 # For the GetBucketLocation response, we should not be using 

1418 # the virtual host style addressing so we can avoid any sigv4 

1419 # issues. 

1420 logger.debug( 

1421 "Request is GetBucketLocation operation, not checking " 

1422 "for DNS compatibility." 

1423 ) 

1424 return 

1425 parts = urlsplit(request.url) 

1426 request.auth_path = parts.path 

1427 path_parts = parts.path.split('/') 

1428 

1429 # Retrieve the endpoint we will be prepending the bucket name to.

1430 if default_endpoint_url is None: 

1431 default_endpoint_url = parts.netloc 

1432 

1433 if len(path_parts) > 1: 

1434 bucket_name = path_parts[1] 

1435 if not bucket_name: 

1436 # If the bucket name is empty we should not be checking for 

1437 # dns compatibility. 

1438 return 

1439 logger.debug('Checking for DNS compatible bucket for: %s', request.url) 

1440 if check_dns_name(bucket_name): 

1441 # If the operation is on a bucket, the auth_path must be 

1442 # terminated with a '/' character. 

1443 if len(path_parts) == 2: 

1444 if request.auth_path[-1] != '/': 

1445 request.auth_path += '/' 

1446 path_parts.remove(bucket_name) 

1447 # At the very least the path must be a '/', such as with the 

1448 # CreateBucket operation when DNS style is being used. If this 

1449 # is not used you will get an empty path which is incorrect. 

1450 path = '/'.join(path_parts) or '/' 

1451 global_endpoint = default_endpoint_url 

1452 host = bucket_name + '.' + global_endpoint 

1453 new_tuple = (parts.scheme, host, path, parts.query, '') 

1454 new_uri = urlunsplit(new_tuple) 

1455 request.url = new_uri 

1456 logger.debug('URI updated to: %s', new_uri) 

1457 else: 

1458 raise InvalidDNSNameError(bucket_name=bucket_name) 

1459 
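# Illustrative effect (editor's addition, not part of the original module):
# for a DNS-compatible bucket, the bucket segment moves from the path into
# the host, e.g.:
#
#     https://s3.us-west-2.amazonaws.com/mybucket/mykey
#       -> https://mybucket.s3.us-west-2.amazonaws.com/mykey
#
# while ``request.auth_path`` keeps the original path for signing.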

1460 

1461def _is_get_bucket_location_request(request): 

1462 return request.url.endswith('?location') 

1463 

1464 

1465def instance_cache(func): 

1466 """Method decorator for caching method calls to a single instance. 

1467 

1468 **This is not a general purpose caching decorator.** 

1469 

1470 In order to use this, you *must* provide an ``_instance_cache`` 

1471 attribute on the instance. 

1472 

1473 This decorator is used to cache method calls. The cache is only 

1474 scoped to a single instance, so that multiple instances

1475 will maintain their own cache. In order to keep things simple, 

1476 this decorator requires that you provide an ``_instance_cache`` 

1477 attribute on your instance. 

1478 

1479 """ 

1480 func_name = func.__name__ 

1481 

1482 @functools.wraps(func) 

1483 def _cache_guard(self, *args, **kwargs): 

1484 cache_key = (func_name, args) 

1485 if kwargs: 

1486 kwarg_items = tuple(sorted(kwargs.items())) 

1487 cache_key = (func_name, args, kwarg_items) 

1488 result = self._instance_cache.get(cache_key) 

1489 if result is not None: 

1490 return result 

1491 result = func(self, *args, **kwargs) 

1492 self._instance_cache[cache_key] = result 

1493 return result 

1494 

1495 return _cache_guard 

1496 
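# Illustrative usage (editor's addition, not part of the original module):
# the decorated method's instance must define ``_instance_cache`` itself.
#
#     class RegionLookup:
#         def __init__(self):
#             self._instance_cache = {}
#
#         @instance_cache
#         def resolve(self, partition, service):
#             return expensive_lookup(partition, service)  # hypothetical helper
#
#     # lookup.resolve('aws', 's3') calls expensive_lookup only once per
#     # (partition, service) pair for that instance.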

1497 

1498def lru_cache_weakref(*cache_args, **cache_kwargs): 

1499 """ 

1500 Version of functools.lru_cache that stores a weak reference to ``self``. 

1501 

1502 Serves the same purpose as :py:func:`instance_cache` but uses Python's 

1503 functools implementation, which offers ``maxsize`` and ``typed`` arguments.

1504 

1505 lru_cache is a global cache even when used on a method. The cache's 

1506 reference to ``self`` will prevent garbage collection of the object. This 

1507 wrapper around functools.lru_cache replaces the reference to ``self`` with 

1508 a weak reference to not interfere with garbage collection. 

1509 """ 

1510 

1511 def wrapper(func): 

1512 @functools.lru_cache(*cache_args, **cache_kwargs) 

1513 def func_with_weakref(weakref_to_self, *args, **kwargs): 

1514 return func(weakref_to_self(), *args, **kwargs) 

1515 

1516 @functools.wraps(func) 

1517 def inner(self, *args, **kwargs): 

1518 for kwarg_key, kwarg_value in kwargs.items(): 

1519 if isinstance(kwarg_value, list): 

1520 kwargs[kwarg_key] = tuple(kwarg_value) 

1521 return func_with_weakref(weakref.ref(self), *args, **kwargs) 

1522 

1523 inner.cache_info = func_with_weakref.cache_info 

1524 return inner 

1525 

1526 return wrapper 

1527 

1528 

1529def switch_host_s3_accelerate(request, operation_name, **kwargs): 

1530 """Switches the current s3 endpoint to an S3 Accelerate endpoint"""

1531 

1532 # Note that, when registered, the switching of the s3 host happens

1533 # before it gets changed to virtual. So we are not concerned with ensuring 

1534 # that the bucket name is translated to the virtual style here and we 

1535 # can hard code the Accelerate endpoint. 

1536 parts = urlsplit(request.url).netloc.split('.') 

1537 parts = [p for p in parts if p in S3_ACCELERATE_WHITELIST] 

1538 endpoint = 'https://s3-accelerate.' 

1539 if len(parts) > 0: 

1540 endpoint += '.'.join(parts) + '.' 

1541 endpoint += 'amazonaws.com' 

1542 

1543 if operation_name in ['ListBuckets', 'CreateBucket', 'DeleteBucket']: 

1544 return 

1545 _switch_hosts(request, endpoint, use_new_scheme=False) 

1546 

1547 

1548def switch_host_with_param(request, param_name): 

1549 """Switches the host using a parameter value from a JSON request body""" 

1550 request_json = json.loads(request.data.decode('utf-8')) 

1551 if request_json.get(param_name): 

1552 new_endpoint = request_json[param_name] 

1553 _switch_hosts(request, new_endpoint) 

1554 

1555 

1556def _switch_hosts(request, new_endpoint, use_new_scheme=True): 

1557 final_endpoint = _get_new_endpoint( 

1558 request.url, new_endpoint, use_new_scheme 

1559 ) 

1560 request.url = final_endpoint 

1561 

1562 

1563def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True): 

1564 new_endpoint_components = urlsplit(new_endpoint) 

1565 original_endpoint_components = urlsplit(original_endpoint) 

1566 scheme = original_endpoint_components.scheme 

1567 if use_new_scheme: 

1568 scheme = new_endpoint_components.scheme 

1569 final_endpoint_components = ( 

1570 scheme, 

1571 new_endpoint_components.netloc, 

1572 original_endpoint_components.path, 

1573 original_endpoint_components.query, 

1574 '', 

1575 ) 

1576 final_endpoint = urlunsplit(final_endpoint_components) 

1577 logger.debug( 

1578 'Updating URI from %s to %s', original_endpoint, final_endpoint 

1579 ) 

1580 return final_endpoint 

1581 

1582 

1583def deep_merge(base, extra): 

1584 """Deeply merge two dictionaries, overriding existing keys in the base.

1585 

1586 :param base: The base dictionary which will be merged into. 

1587 :param extra: The dictionary to merge into the base. Keys from this 

1588 dictionary will take precedence. 

1589 """ 

1590 for key in extra: 

1591 # If the key represents a dict on both given dicts, merge the sub-dicts 

1592 if ( 

1593 key in base 

1594 and isinstance(base[key], dict) 

1595 and isinstance(extra[key], dict) 

1596 ): 

1597 deep_merge(base[key], extra[key]) 

1598 continue 

1599 

1600 # Otherwise, set the key on the base to be the value of the extra. 

1601 base[key] = extra[key] 

1602 
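# Illustrative behavior (editor's addition, not part of the original module):
# nested dicts are merged key by key; any other value in ``extra`` replaces
# the corresponding value in ``base``.
#
#     >>> base = {'s3': {'addressing_style': 'auto', 'use_accelerate': False}}
#     >>> deep_merge(base, {'s3': {'addressing_style': 'virtual'}})
#     >>> base
#     {'s3': {'addressing_style': 'virtual', 'use_accelerate': False}}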

1603 

1604def hyphenize_service_id(service_id): 

1605 """Translate a service ID into the hyphenated form used for event emitters.

1606 

1607 :param service_id: The service_id to convert. 

1608 """ 

1609 return service_id.replace(' ', '-').lower() 

1610 
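# Illustrative behavior (editor's addition, not part of the original module):
#
#     >>> hyphenize_service_id('API Gateway')
#     'api-gateway'
#     >>> hyphenize_service_id('S3')
#     's3'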

1611 

1612class IdentityCache: 

1613 """Base IdentityCache implementation for storing and retrieving 

1614 highly accessed credentials. 

1615 

1616 This class is not intended to be instantiated in user code. 

1617 """ 

1618 

1619 METHOD = "base_identity_cache" 

1620 

1621 def __init__(self, client, credential_cls): 

1622 self._client = client 

1623 self._credential_cls = credential_cls 

1624 

1625 def get_credentials(self, **kwargs): 

1626 callback = self.build_refresh_callback(**kwargs) 

1627 metadata = callback() 

1628 credential_entry = self._credential_cls.create_from_metadata( 

1629 metadata=metadata, 

1630 refresh_using=callback, 

1631 method=self.METHOD, 

1632 advisory_timeout=45, 

1633 mandatory_timeout=10, 

1634 ) 

1635 return credential_entry 

1636 

1637 def build_refresh_callback(self, **kwargs): 

1638 """Callback to be implemented by subclasses. 

1639 

1640 Returns a set of metadata to be converted into a new 

1641 credential instance. 

1642 """ 

1643 raise NotImplementedError() 

1644 

1645 

1646class S3ExpressIdentityCache(IdentityCache): 

1647 """S3Express IdentityCache for retrieving and storing 

1648 credentials from CreateSession calls. 

1649 

1650 This class is not intended to be instantiated in user code. 

1651 """ 

1652 

1653 METHOD = "s3express" 

1654 

1655 def __init__(self, client, credential_cls): 

1656 self._client = client 

1657 self._credential_cls = credential_cls 

1658 

1659 @functools.lru_cache(maxsize=100) 

1660 def get_credentials(self, bucket): 

1661 return super().get_credentials(bucket=bucket) 

1662 

1663 def build_refresh_callback(self, bucket): 

1664 def refresher(): 

1665 response = self._client.create_session(Bucket=bucket) 

1666 creds = response['Credentials'] 

1667 expiration = self._serialize_if_needed( 

1668 creds['Expiration'], iso=True 

1669 ) 

1670 return { 

1671 "access_key": creds['AccessKeyId'], 

1672 "secret_key": creds['SecretAccessKey'], 

1673 "token": creds['SessionToken'], 

1674 "expiry_time": expiration, 

1675 } 

1676 

1677 return refresher 

1678 

1679 def _serialize_if_needed(self, value, iso=False): 

1680 if isinstance(value, _DatetimeClass): 

1681 if iso: 

1682 return value.isoformat() 

1683 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

1684 return value 

1685 
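# Illustrative note (not part of botocore): because get_credentials() is wrapped
# in functools.lru_cache and keyed on the bucket name, repeated requests against
# the same directory bucket reuse a single CreateSession result rather than
# calling the API on every request, e.g. (client and credential_cls are
# hypothetical wiring):
#
#     >>> cache = S3ExpressIdentityCache(client, credential_cls)
#     >>> creds = cache.get_credentials(bucket='my-bucket--usw2-az1--x-s3')  # CreateSession call
#     >>> creds = cache.get_credentials(bucket='my-bucket--usw2-az1--x-s3')  # served from the cache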

1686 

1687class S3ExpressIdentityResolver: 

1688 def __init__(self, client, credential_cls, cache=None): 

1689 self._client = weakref.proxy(client) 

1690 

1691 if cache is None: 

1692 cache = S3ExpressIdentityCache(self._client, credential_cls) 

1693 self._cache = cache 

1694 

1695 def register(self, event_emitter=None): 

1696 logger.debug('Registering S3Express Identity Resolver') 

1697 emitter = event_emitter or self._client.meta.events 

1698 emitter.register('before-call.s3', self.apply_signing_cache_key) 

1699 emitter.register('before-sign.s3', self.resolve_s3express_identity) 

1700 

1701 def apply_signing_cache_key(self, params, context, **kwargs): 

1702 endpoint_properties = context.get('endpoint_properties', {}) 

1703 backend = endpoint_properties.get('backend', None) 

1704 

1705 # Add cache key if Bucket supplied for s3express request 

1706 bucket_name = context.get('input_params', {}).get('Bucket') 

1707 if backend == 'S3Express' and bucket_name is not None: 

1708 context.setdefault('signing', {}) 

1709 context['signing']['cache_key'] = bucket_name 

1710 

1711 def resolve_s3express_identity( 

1712 self, 

1713 request, 

1714 signing_name, 

1715 region_name, 

1716 signature_version, 

1717 request_signer, 

1718 operation_name, 

1719 **kwargs, 

1720 ): 

1721 signing_context = request.context.get('signing', {}) 

1722 signing_name = signing_context.get('signing_name') 

1723 if signing_name == 's3express' and signature_version.startswith( 

1724 'v4-s3express' 

1725 ): 

1726 signing_context['identity_cache'] = self._cache 

1727 if 'cache_key' not in signing_context: 

1728 signing_context['cache_key'] = ( 

1729 request.context.get('s3_redirect', {}) 

1730 .get('params', {}) 

1731 .get('Bucket') 

1732 ) 

1733 

1734 

1735class S3RegionRedirectorv2: 

1736 """Updated version of S3RegionRedirector for use when 

1737 EndpointRulesetResolver is in use for endpoint resolution. 

1738 

1739 This class is considered private and subject to abrupt breaking changes or 

1740 removal without prior announcement. Please do not use it directly. 

1741 """ 

1742 

1743 def __init__(self, endpoint_bridge, client, cache=None): 

1744 self._cache = cache or {} 

1745 self._client = weakref.proxy(client) 

1746 

1747 def register(self, event_emitter=None): 

1748 logger.debug('Registering S3 region redirector handler') 

1749 emitter = event_emitter or self._client.meta.events 

1750 emitter.register('needs-retry.s3', self.redirect_from_error) 

1751 emitter.register( 

1752 'before-parameter-build.s3', self.annotate_request_context 

1753 ) 

1754 emitter.register( 

1755 'before-endpoint-resolution.s3', self.redirect_from_cache 

1756 ) 

1757 

1758 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1759 """ 

1760 An S3 request sent to the wrong region will return an error that 

1761 contains the endpoint the request should be sent to. This handler 

1762 will add the redirect information to the signing context and then 

1763 redirect the request. 

1764 """ 

1765 if response is None: 

1766 # This could be none if there was a ConnectionError or other 

1767 # transport error. 

1768 return 

1769 

1770 redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {}) 

1771 if ArnParser.is_arn(redirect_ctx.get('bucket')): 

1772 logger.debug( 

1773 'S3 request was previously for an Accesspoint ARN, not ' 

1774 'redirecting.' 

1775 ) 

1776 return 

1777 

1778 if redirect_ctx.get('redirected'): 

1779 logger.debug( 

1780 'S3 request was previously redirected, not redirecting.' 

1781 ) 

1782 return 

1783 

1784 error = response[1].get('Error', {}) 

1785 error_code = error.get('Code') 

1786 response_metadata = response[1].get('ResponseMetadata', {}) 

1787 

1788 # We have to account for 400 responses because 

1789 # if we sign a Head* request with the wrong region, 

1790 # we'll get a 400 Bad Request but we won't get a 

1791 # body saying it's an "AuthorizationHeaderMalformed". 

1792 is_special_head_object = ( 

1793 error_code in ('301', '400') and operation.name == 'HeadObject' 

1794 ) 

1795 is_special_head_bucket = ( 

1796 error_code in ('301', '400') 

1797 and operation.name == 'HeadBucket' 

1798 and 'x-amz-bucket-region' 

1799 in response_metadata.get('HTTPHeaders', {}) 

1800 ) 

1801 is_wrong_signing_region = ( 

1802 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

1803 ) 

1804 is_redirect_status = response[0] is not None and response[ 

1805 0 

1806 ].status_code in (301, 302, 307) 

1807 is_permanent_redirect = error_code == 'PermanentRedirect' 

1808 is_opt_in_region_redirect = ( 

1809 error_code == 'IllegalLocationConstraintException' 

1810 and operation.name != 'CreateBucket' 

1811 ) 

1812 if not any( 

1813 [ 

1814 is_special_head_object, 

1815 is_wrong_signing_region, 

1816 is_permanent_redirect, 

1817 is_special_head_bucket, 

1818 is_redirect_status, 

1819 is_opt_in_region_redirect, 

1820 ] 

1821 ): 

1822 return 

1823 

1824 bucket = request_dict['context']['s3_redirect']['bucket'] 

1825 client_region = request_dict['context'].get('client_region') 

1826 new_region = self.get_bucket_region(bucket, response) 

1827 

1828 if new_region is None: 

1829 logger.debug( 

1830 "S3 client configured for region %s but the " 

1831 "bucket %s is not in that region and the proper region " 

1832 "could not be automatically determined.", 

1833 client_region, 

1834 bucket, 

1835 ) 

1836 return 

1837 

1838 logger.debug( 

1839 "S3 client configured for region %s but the bucket %s " 

1840 "is in region %s; Please configure the proper region to " 

1841 "avoid multiple unnecessary redirects and signing attempts.", 

1842 client_region, 

1843 bucket, 

1844 new_region, 

1845 ) 

1846 # Adding the new region to _cache will make construct_endpoint() use 

1847 # the new region as the value for the AWS::Region builtin parameter. 

1848 self._cache[bucket] = new_region 

1849 

1850 # Re-resolve endpoint with new region and modify request_dict with 

1851 # the new URL, auth scheme, and signing context. 

1852 ep_resolver = self._client._ruleset_resolver 

1853 ep_info = ep_resolver.construct_endpoint( 

1854 operation_model=operation, 

1855 call_args=request_dict['context']['s3_redirect']['params'], 

1856 request_context=request_dict['context'], 

1857 ) 

1858 request_dict['url'] = self.set_request_url( 

1859 request_dict['url'], ep_info.url 

1860 ) 

1861 request_dict['context']['s3_redirect']['redirected'] = True 

1862 auth_schemes = ep_info.properties.get('authSchemes') 

1863 if auth_schemes is not None: 

1864 auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes) 

1865 auth_type, signing_context = auth_info 

1866 request_dict['context']['auth_type'] = auth_type 

1867 request_dict['context']['signing'] = { 

1868 **request_dict['context'].get('signing', {}), 

1869 **signing_context, 

1870 } 

1871 

1872 # Return 0 so it doesn't wait to retry 

1873 return 0 

1874 

1875 def get_bucket_region(self, bucket, response): 

1876 """ 

1877 There are multiple potential sources for the new region to redirect to, 

1878 but they aren't all universally available for use. This will try to 

1879 find region from response elements, but will fall back to calling 

1880 HEAD on the bucket if all else fails. 

1881 

1882 :param bucket: The bucket to find the region for. This is necessary if 

1883 the region is not available in the error response. 

1884 :param response: A response representing a service request that failed 

1885 due to incorrect region configuration. 

1886 """ 

1887 # First try to source the region from the headers. 

1888 service_response = response[1] 

1889 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

1890 if 'x-amz-bucket-region' in response_headers: 

1891 return response_headers['x-amz-bucket-region'] 

1892 

1893 # Next, check the error body 

1894 region = service_response.get('Error', {}).get('Region', None) 

1895 if region is not None: 

1896 return region 

1897 

1898 # Finally, HEAD the bucket. No other choice sadly. 

1899 try: 

1900 response = self._client.head_bucket(Bucket=bucket) 

1901 headers = response['ResponseMetadata']['HTTPHeaders'] 

1902 except ClientError as e: 

1903 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

1904 

1905 region = headers.get('x-amz-bucket-region', None) 

1906 return region 

1907 

1908 def set_request_url(self, old_url, new_endpoint, **kwargs): 

1909 """ 

1910 Splice a new endpoint into an existing URL. Note that some endpoints 

1911 from the endpoint provider have a path component which will be 

1912 discarded by this function. 

1913 """ 

1914 return _get_new_endpoint(old_url, new_endpoint, False) 

1915 

1916 def redirect_from_cache(self, builtins, params, **kwargs): 

1917 """ 

1918 If a bucket name has been redirected before, it is in the cache. This 

1919 handler will update the AWS::Region endpoint resolver builtin param 

1920 to use the region from cache instead of the client region to avoid the 

1921 redirect. 

1922 """ 

1923 bucket = params.get('Bucket') 

1924 if bucket is not None and bucket in self._cache: 

1925 new_region = self._cache.get(bucket) 

1926 builtins['AWS::Region'] = new_region 

1927 

1928 def annotate_request_context(self, params, context, **kwargs): 

1929 """Store the bucket name in context for later use when redirecting. 

1930 The bucket name may be an access point ARN or alias. 

1931 """ 

1932 bucket = params.get('Bucket') 

1933 context['s3_redirect'] = { 

1934 'redirected': False, 

1935 'bucket': bucket, 

1936 'params': params, 

1937 } 

1938 

1939 

1940class S3RegionRedirector: 

1941 """This handler has been replaced by S3RegionRedirectorv2. The original 

1942 version remains in place for any third-party libraries that import it. 

1943 """ 

1944 

1945 def __init__(self, endpoint_bridge, client, cache=None): 

1946 self._endpoint_resolver = endpoint_bridge 

1947 self._cache = cache 

1948 if self._cache is None: 

1949 self._cache = {} 

1950 

1951 # This needs to be a weak ref in order to prevent memory leaks on 

1952 # python 2.6 

1953 self._client = weakref.proxy(client) 

1954 

1955 warnings.warn( 

1956 'The S3RegionRedirector class has been deprecated for a new ' 

1957 'internal replacement. A future version of botocore may remove ' 

1958 'this class.', 

1959 category=FutureWarning, 

1960 ) 

1961 

1962 def register(self, event_emitter=None): 

1963 emitter = event_emitter or self._client.meta.events 

1964 emitter.register('needs-retry.s3', self.redirect_from_error) 

1965 emitter.register('before-call.s3', self.set_request_url) 

1966 emitter.register('before-parameter-build.s3', self.redirect_from_cache) 

1967 

1968 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1969 """ 

1970 An S3 request sent to the wrong region will return an error that 

1971 contains the endpoint the request should be sent to. This handler 

1972 will add the redirect information to the signing context and then 

1973 redirect the request. 

1974 """ 

1975 if response is None: 

1976 # This could be none if there was a ConnectionError or other 

1977 # transport error. 

1978 return 

1979 

1980 if self._is_s3_accesspoint(request_dict.get('context', {})): 

1981 logger.debug( 

1982 'S3 request was previously to an accesspoint, not redirecting.' 

1983 ) 

1984 return 

1985 

1986 if request_dict.get('context', {}).get('s3_redirected'): 

1987 logger.debug( 

1988 'S3 request was previously redirected, not redirecting.' 

1989 ) 

1990 return 

1991 

1992 error = response[1].get('Error', {}) 

1993 error_code = error.get('Code') 

1994 response_metadata = response[1].get('ResponseMetadata', {}) 

1995 

1996 # We have to account for 400 responses because 

1997 # if we sign a Head* request with the wrong region, 

1998 # we'll get a 400 Bad Request but we won't get a 

1999 # body saying it's an "AuthorizationHeaderMalformed". 

2000 is_special_head_object = ( 

2001 error_code in ('301', '400') and operation.name == 'HeadObject' 

2002 ) 

2003 is_special_head_bucket = ( 

2004 error_code in ('301', '400') 

2005 and operation.name == 'HeadBucket' 

2006 and 'x-amz-bucket-region' 

2007 in response_metadata.get('HTTPHeaders', {}) 

2008 ) 

2009 is_wrong_signing_region = ( 

2010 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

2011 ) 

2012 is_redirect_status = response[0] is not None and response[ 

2013 0 

2014 ].status_code in (301, 302, 307) 

2015 is_permanent_redirect = error_code == 'PermanentRedirect' 

2016 if not any( 

2017 [ 

2018 is_special_head_object, 

2019 is_wrong_signing_region, 

2020 is_permanent_redirect, 

2021 is_special_head_bucket, 

2022 is_redirect_status, 

2023 ] 

2024 ): 

2025 return 

2026 

2027 bucket = request_dict['context']['signing']['bucket'] 

2028 client_region = request_dict['context'].get('client_region') 

2029 new_region = self.get_bucket_region(bucket, response) 

2030 

2031 if new_region is None: 

2032 logger.debug( 

2033 "S3 client configured for region %s but the bucket %s is not " 

2034 "in that region and the proper region could not be " 

2035 "automatically determined.", 

2036 client_region, 

2037 bucket, 

2038 ) 

2039 return 

2040 

2041 logger.debug( 

2042 "S3 client configured for region %s but the bucket %s is in region" 

2043 " %s; Please configure the proper region to avoid multiple " 

2044 "unnecessary redirects and signing attempts.", 

2045 client_region, 

2046 bucket, 

2047 new_region, 

2048 ) 

2049 endpoint = self._endpoint_resolver.resolve('s3', new_region) 

2050 endpoint = endpoint['endpoint_url'] 

2051 

2052 signing_context = { 

2053 'region': new_region, 

2054 'bucket': bucket, 

2055 'endpoint': endpoint, 

2056 } 

2057 request_dict['context']['signing'] = signing_context 

2058 

2059 self._cache[bucket] = signing_context 

2060 self.set_request_url(request_dict, request_dict['context']) 

2061 

2062 request_dict['context']['s3_redirected'] = True 

2063 

2064 # Return 0 so it doesn't wait to retry 

2065 return 0 

2066 

2067 def get_bucket_region(self, bucket, response): 

2068 """ 

2069 There are multiple potential sources for the new region to redirect to, 

2070 but they aren't all universally available for use. This will try to 

2071 find region from response elements, but will fall back to calling 

2072 HEAD on the bucket if all else fails. 

2073 

2074 :param bucket: The bucket to find the region for. This is necessary if 

2075 the region is not available in the error response. 

2076 :param response: A response representing a service request that failed 

2077 due to incorrect region configuration. 

2078 """ 

2079 # First try to source the region from the headers. 

2080 service_response = response[1] 

2081 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

2082 if 'x-amz-bucket-region' in response_headers: 

2083 return response_headers['x-amz-bucket-region'] 

2084 

2085 # Next, check the error body 

2086 region = service_response.get('Error', {}).get('Region', None) 

2087 if region is not None: 

2088 return region 

2089 

2090 # Finally, HEAD the bucket. No other choice sadly. 

2091 try: 

2092 response = self._client.head_bucket(Bucket=bucket) 

2093 headers = response['ResponseMetadata']['HTTPHeaders'] 

2094 except ClientError as e: 

2095 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

2096 

2097 region = headers.get('x-amz-bucket-region', None) 

2098 return region 

2099 

2100 def set_request_url(self, params, context, **kwargs): 

2101 endpoint = context.get('signing', {}).get('endpoint', None) 

2102 if endpoint is not None: 

2103 params['url'] = _get_new_endpoint(params['url'], endpoint, False) 

2104 

2105 def redirect_from_cache(self, params, context, **kwargs): 

2106 """ 

2107 This handler retrieves a given bucket's signing context from the cache 

2108 and adds it into the request context. 

2109 """ 

2110 if self._is_s3_accesspoint(context): 

2111 return 

2112 bucket = params.get('Bucket') 

2113 signing_context = self._cache.get(bucket) 

2114 if signing_context is not None: 

2115 context['signing'] = signing_context 

2116 else: 

2117 context['signing'] = {'bucket': bucket} 

2118 

2119 def _is_s3_accesspoint(self, context): 

2120 return 's3_accesspoint' in context 

2121 

2122 

2123class InvalidArnException(ValueError): 

2124 pass 

2125 

2126 

2127class ArnParser: 

2128 def parse_arn(self, arn): 

2129 arn_parts = arn.split(':', 5) 

2130 if len(arn_parts) < 6: 

2131 raise InvalidArnException( 

2132 f'Provided ARN: {arn} must be of the format: ' 

2133 'arn:partition:service:region:account:resource' 

2134 ) 

2135 return { 

2136 'partition': arn_parts[1], 

2137 'service': arn_parts[2], 

2138 'region': arn_parts[3], 

2139 'account': arn_parts[4], 

2140 'resource': arn_parts[5], 

2141 } 

2142 

2143 @staticmethod 

2144 def is_arn(value): 

2145 if not isinstance(value, str) or not value.startswith('arn:'): 

2146 return False 

2147 arn_parser = ArnParser() 

2148 try: 

2149 arn_parser.parse_arn(value) 

2150 return True 

2151 except InvalidArnException: 

2152 return False 

2153 
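# Illustrative sketch (not part of botocore): parse_arn() splits on the first
# five colons, so the resource segment may itself contain ':' or '/', e.g.
#
#     >>> details = ArnParser().parse_arn(
#     ...     'arn:aws:s3:us-west-2:123456789012:accesspoint/my-ap')
#     >>> details['service'], details['region'], details['resource']
#     ('s3', 'us-west-2', 'accesspoint/my-ap')
#     >>> ArnParser.is_arn('my-regular-bucket')
#     False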

2154 

2155class S3ArnParamHandler: 

2156 _RESOURCE_REGEX = re.compile( 

2157 r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$' 

2158 ) 

2159 _OUTPOST_RESOURCE_REGEX = re.compile( 

2160 r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]' 

2161 r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)' 

2162 ) 

2163 _BLACKLISTED_OPERATIONS = ['CreateBucket'] 

2164 

2165 def __init__(self, arn_parser=None): 

2166 self._arn_parser = arn_parser 

2167 if arn_parser is None: 

2168 self._arn_parser = ArnParser() 

2169 

2170 def register(self, event_emitter): 

2171 event_emitter.register('before-parameter-build.s3', self.handle_arn) 

2172 

2173 def handle_arn(self, params, model, context, **kwargs): 

2174 if model.name in self._BLACKLISTED_OPERATIONS: 

2175 return 

2176 arn_details = self._get_arn_details_from_bucket_param(params) 

2177 if arn_details is None: 

2178 return 

2179 if arn_details['resource_type'] == 'accesspoint': 

2180 self._store_accesspoint(params, context, arn_details) 

2181 elif arn_details['resource_type'] == 'outpost': 

2182 self._store_outpost(params, context, arn_details) 

2183 

2184 def _get_arn_details_from_bucket_param(self, params): 

2185 if 'Bucket' in params: 

2186 try: 

2187 arn = params['Bucket'] 

2188 arn_details = self._arn_parser.parse_arn(arn) 

2189 self._add_resource_type_and_name(arn, arn_details) 

2190 return arn_details 

2191 except InvalidArnException: 

2192 pass 

2193 return None 

2194 

2195 def _add_resource_type_and_name(self, arn, arn_details): 

2196 match = self._RESOURCE_REGEX.match(arn_details['resource']) 

2197 if match: 

2198 arn_details['resource_type'] = match.group('resource_type') 

2199 arn_details['resource_name'] = match.group('resource_name') 

2200 else: 

2201 raise UnsupportedS3ArnError(arn=arn) 

2202 

2203 def _store_accesspoint(self, params, context, arn_details): 

2204 # Ideally the access-point would be stored as a parameter in the 

2205 # request where the serializer would then know how to serialize it, 

2206 # but access-points are not modeled in S3 operations so it would fail 

2207 # validation. Instead, we set the access-point to the bucket parameter 

2208 # to have some value set when serializing the request and additional 

2209 # information on the context from the arn to use in forming the 

2210 # access-point endpoint. 

2211 params['Bucket'] = arn_details['resource_name'] 

2212 context['s3_accesspoint'] = { 

2213 'name': arn_details['resource_name'], 

2214 'account': arn_details['account'], 

2215 'partition': arn_details['partition'], 

2216 'region': arn_details['region'], 

2217 'service': arn_details['service'], 

2218 } 

2219 

2220 def _store_outpost(self, params, context, arn_details): 

2221 resource_name = arn_details['resource_name'] 

2222 match = self._OUTPOST_RESOURCE_REGEX.match(resource_name) 

2223 if not match: 

2224 raise UnsupportedOutpostResourceError(resource_name=resource_name) 

2225 # Because we need to set the bucket name to something to pass 

2226 # validation we're going to use the access point name to be consistent 

2227 # with normal access point arns. 

2228 accesspoint_name = match.group('accesspoint_name') 

2229 params['Bucket'] = accesspoint_name 

2230 context['s3_accesspoint'] = { 

2231 'outpost_name': match.group('outpost_name'), 

2232 'name': accesspoint_name, 

2233 'account': arn_details['account'], 

2234 'partition': arn_details['partition'], 

2235 'region': arn_details['region'], 

2236 'service': arn_details['service'], 

2237 } 

2238 

2239 

2240class S3EndpointSetter: 

2241 _DEFAULT_PARTITION = 'aws' 

2242 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2243 

2244 def __init__( 

2245 self, 

2246 endpoint_resolver, 

2247 region=None, 

2248 s3_config=None, 

2249 endpoint_url=None, 

2250 partition=None, 

2251 use_fips_endpoint=False, 

2252 ): 

2253 # This is calling the endpoint_resolver in regions.py 

2254 self._endpoint_resolver = endpoint_resolver 

2255 self._region = region 

2256 self._s3_config = s3_config 

2257 self._use_fips_endpoint = use_fips_endpoint 

2258 if s3_config is None: 

2259 self._s3_config = {} 

2260 self._endpoint_url = endpoint_url 

2261 self._partition = partition 

2262 if partition is None: 

2263 self._partition = self._DEFAULT_PARTITION 

2264 

2265 def register(self, event_emitter): 

2266 event_emitter.register('before-sign.s3', self.set_endpoint) 

2267 event_emitter.register('choose-signer.s3', self.set_signer) 

2268 event_emitter.register( 

2269 'before-call.s3.WriteGetObjectResponse', 

2270 self.update_endpoint_to_s3_object_lambda, 

2271 ) 

2272 

2273 def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs): 

2274 if self._use_accelerate_endpoint: 

2275 raise UnsupportedS3ConfigurationError( 

2276 msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations', 

2277 ) 

2278 

2279 self._override_signing_name(context, 's3-object-lambda') 

2280 if self._endpoint_url: 

2281 # Only update the url if an explicit url was not provided 

2282 return 

2283 

2284 resolver = self._endpoint_resolver 

2285 # Constructing endpoints as s3-object-lambda as region 

2286 resolved = resolver.construct_endpoint( 

2287 's3-object-lambda', self._region 

2288 ) 

2289 

2290 # Ideally we would be able to replace the endpoint before 

2291 # serialization but there's no event to do that currently 

2292 # host_prefix is all the arn/bucket specs 

2293 new_endpoint = 'https://{host_prefix}{hostname}'.format( 

2294 host_prefix=params['host_prefix'], 

2295 hostname=resolved['hostname'], 

2296 ) 

2297 

2298 params['url'] = _get_new_endpoint(params['url'], new_endpoint, False) 

2299 

2300 def set_endpoint(self, request, **kwargs): 

2301 if self._use_accesspoint_endpoint(request): 

2302 self._validate_accesspoint_supported(request) 

2303 self._validate_fips_supported(request) 

2304 self._validate_global_regions(request) 

2305 region_name = self._resolve_region_for_accesspoint_endpoint( 

2306 request 

2307 ) 

2308 self._resolve_signing_name_for_accesspoint_endpoint(request) 

2309 self._switch_to_accesspoint_endpoint(request, region_name) 

2310 return 

2311 if self._use_accelerate_endpoint: 

2312 if self._use_fips_endpoint: 

2313 raise UnsupportedS3ConfigurationError( 

2314 msg=( 

2315 'Client is configured to use the FIPS pseudo region ' 

2316 f'for "{self._region}", but S3 Accelerate does not have any FIPS ' 

2317 'compatible endpoints.' 

2318 ) 

2319 ) 

2320 switch_host_s3_accelerate(request=request, **kwargs) 

2321 if self._s3_addressing_handler: 

2322 self._s3_addressing_handler(request=request, **kwargs) 

2323 

2324 def _use_accesspoint_endpoint(self, request): 

2325 return 's3_accesspoint' in request.context 

2326 

2327 def _validate_fips_supported(self, request): 

2328 if not self._use_fips_endpoint: 

2329 return 

2330 if 'fips' in request.context['s3_accesspoint']['region']: 

2331 raise UnsupportedS3AccesspointConfigurationError( 

2332 msg='Invalid ARN, FIPS region not allowed in ARN.' 

2333 ) 

2334 if 'outpost_name' in request.context['s3_accesspoint']: 

2335 raise UnsupportedS3AccesspointConfigurationError( 

2336 msg=( 

2337 f'Client is configured to use the FIPS pseudo-region "{self._region}", ' 

2338 'but outpost ARNs do not support FIPS endpoints.' 

2339 ) 

2340 ) 

2341 # Transforming pseudo region to actual region 

2342 accesspoint_region = request.context['s3_accesspoint']['region'] 

2343 if accesspoint_region != self._region: 

2344 if not self._s3_config.get('use_arn_region', True): 

2345 # TODO: Update message to reflect use_arn_region 

2346 # is not set 

2347 raise UnsupportedS3AccesspointConfigurationError( 

2348 msg=( 

2349 'Client is configured to use the FIPS pseudo-region ' 

2350 f'for "{self._region}", but the access-point ARN provided is for ' 

2351 f'the "{accesspoint_region}" region. For clients using a FIPS ' 

2352 'pseudo-region calls to access-point ARNs in another ' 

2353 'region are not allowed.' 

2354 ) 

2355 ) 

2356 

2357 def _validate_global_regions(self, request): 

2358 if self._s3_config.get('use_arn_region', True): 

2359 return 

2360 if self._region in ['aws-global', 's3-external-1']: 

2361 raise UnsupportedS3AccesspointConfigurationError( 

2362 msg=( 

2363 'Client is configured to use the global pseudo-region ' 

2364 f'"{self._region}". When providing access-point ARNs a regional ' 

2365 'endpoint must be specified.' 

2366 ) 

2367 ) 

2368 

2369 def _validate_accesspoint_supported(self, request): 

2370 if self._use_accelerate_endpoint: 

2371 raise UnsupportedS3AccesspointConfigurationError( 

2372 msg=( 

2373 'Client does not support s3 accelerate configuration ' 

2374 'when an access-point ARN is specified.' 

2375 ) 

2376 ) 

2377 request_partition = request.context['s3_accesspoint']['partition'] 

2378 if request_partition != self._partition: 

2379 raise UnsupportedS3AccesspointConfigurationError( 

2380 msg=( 

2381 f'Client is configured for "{self._partition}" partition, but access-point' 

2382 f' ARN provided is for "{request_partition}" partition. The client and ' 

2383 'access-point partition must be the same.' 

2384 ) 

2385 ) 

2386 s3_service = request.context['s3_accesspoint'].get('service') 

2387 if s3_service == 's3-object-lambda' and self._s3_config.get( 

2388 'use_dualstack_endpoint' 

2389 ): 

2390 raise UnsupportedS3AccesspointConfigurationError( 

2391 msg=( 

2392 'Client does not support s3 dualstack configuration ' 

2393 'when an S3 Object Lambda access point ARN is specified.' 

2394 ) 

2395 ) 

2396 outpost_name = request.context['s3_accesspoint'].get('outpost_name') 

2397 if outpost_name and self._s3_config.get('use_dualstack_endpoint'): 

2398 raise UnsupportedS3AccesspointConfigurationError( 

2399 msg=( 

2400 'Client does not support s3 dualstack configuration ' 

2401 'when an outpost ARN is specified.' 

2402 ) 

2403 ) 

2404 self._validate_mrap_s3_config(request) 

2405 

2406 def _validate_mrap_s3_config(self, request): 

2407 if not is_global_accesspoint(request.context): 

2408 return 

2409 if self._s3_config.get('s3_disable_multiregion_access_points'): 

2410 raise UnsupportedS3AccesspointConfigurationError( 

2411 msg=( 

2412 'Invalid configuration, Multi-Region Access Point ' 

2413 'ARNs are disabled.' 

2414 ) 

2415 ) 

2416 elif self._s3_config.get('use_dualstack_endpoint'): 

2417 raise UnsupportedS3AccesspointConfigurationError( 

2418 msg=( 

2419 'Client does not support s3 dualstack configuration ' 

2420 'when a Multi-Region Access Point ARN is specified.' 

2421 ) 

2422 ) 

2423 

2424 def _resolve_region_for_accesspoint_endpoint(self, request): 

2425 if is_global_accesspoint(request.context): 

2426 # Requests going to MRAP endpoints MUST be set to any (*) region. 

2427 self._override_signing_region(request, '*') 

2428 elif self._s3_config.get('use_arn_region', True): 

2429 accesspoint_region = request.context['s3_accesspoint']['region'] 

2430 # If we are using the region from the access point, 

2431 # we will also want to make sure that we set it as the 

2432 # signing region as well 

2433 self._override_signing_region(request, accesspoint_region) 

2434 return accesspoint_region 

2435 return self._region 

2436 

2437 def set_signer(self, context, **kwargs): 

2438 if is_global_accesspoint(context): 

2439 if HAS_CRT: 

2440 return 's3v4a' 

2441 else: 

2442 raise MissingDependencyException( 

2443 msg="Using S3 with an MRAP arn requires an additional " 

2444 "dependency. You will need to pip install " 

2445 "botocore[crt] before proceeding." 

2446 ) 

2447 

2448 def _resolve_signing_name_for_accesspoint_endpoint(self, request): 

2449 accesspoint_service = request.context['s3_accesspoint']['service'] 

2450 self._override_signing_name(request.context, accesspoint_service) 

2451 

2452 def _switch_to_accesspoint_endpoint(self, request, region_name): 

2453 original_components = urlsplit(request.url) 

2454 accesspoint_endpoint = urlunsplit( 

2455 ( 

2456 original_components.scheme, 

2457 self._get_netloc(request.context, region_name), 

2458 self._get_accesspoint_path( 

2459 original_components.path, request.context 

2460 ), 

2461 original_components.query, 

2462 '', 

2463 ) 

2464 ) 

2465 logger.debug( 

2466 'Updating URI from %s to %s', request.url, accesspoint_endpoint 

2467 ) 

2468 request.url = accesspoint_endpoint 

2469 

2470 def _get_netloc(self, request_context, region_name): 

2471 if is_global_accesspoint(request_context): 

2472 return self._get_mrap_netloc(request_context) 

2473 else: 

2474 return self._get_accesspoint_netloc(request_context, region_name) 

2475 

2476 def _get_mrap_netloc(self, request_context): 

2477 s3_accesspoint = request_context['s3_accesspoint'] 

2478 region_name = 's3-global' 

2479 mrap_netloc_components = [s3_accesspoint['name']] 

2480 if self._endpoint_url: 

2481 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2482 mrap_netloc_components.append(endpoint_url_netloc) 

2483 else: 

2484 partition = s3_accesspoint['partition'] 

2485 mrap_netloc_components.extend( 

2486 [ 

2487 'accesspoint', 

2488 region_name, 

2489 self._get_partition_dns_suffix(partition), 

2490 ] 

2491 ) 

2492 return '.'.join(mrap_netloc_components) 

2493 

2494 def _get_accesspoint_netloc(self, request_context, region_name): 

2495 s3_accesspoint = request_context['s3_accesspoint'] 

2496 accesspoint_netloc_components = [ 

2497 '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']), 

2498 ] 

2499 outpost_name = s3_accesspoint.get('outpost_name') 

2500 if self._endpoint_url: 

2501 if outpost_name: 

2502 accesspoint_netloc_components.append(outpost_name) 

2503 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2504 accesspoint_netloc_components.append(endpoint_url_netloc) 

2505 else: 

2506 if outpost_name: 

2507 outpost_host = [outpost_name, 's3-outposts'] 

2508 accesspoint_netloc_components.extend(outpost_host) 

2509 elif s3_accesspoint['service'] == 's3-object-lambda': 

2510 component = self._inject_fips_if_needed( 

2511 's3-object-lambda', request_context 

2512 ) 

2513 accesspoint_netloc_components.append(component) 

2514 else: 

2515 component = self._inject_fips_if_needed( 

2516 's3-accesspoint', request_context 

2517 ) 

2518 accesspoint_netloc_components.append(component) 

2519 if self._s3_config.get('use_dualstack_endpoint'): 

2520 accesspoint_netloc_components.append('dualstack') 

2521 accesspoint_netloc_components.extend( 

2522 [region_name, self._get_dns_suffix(region_name)] 

2523 ) 

2524 return '.'.join(accesspoint_netloc_components) 

2525 
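# Illustrative note (not part of botocore): with no custom endpoint_url, no
# outpost, and no dualstack or FIPS configuration, an access point named
# "my-ap" owned by account 123456789012 in us-west-2 resolves to the netloc
# my-ap-123456789012.s3-accesspoint.us-west-2.amazonaws.com.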

2526 def _inject_fips_if_needed(self, component, request_context): 

2527 if self._use_fips_endpoint: 

2528 return f'{component}-fips' 

2529 return component 

2530 

2531 def _get_accesspoint_path(self, original_path, request_context): 

2532 # The Bucket parameter was substituted with the access-point name as 

2533 # some value was required in serializing the bucket name. Now that 

2534 # we are making the request directly to the access point, we will 

2535 # want to remove that access-point name from the path. 

2536 name = request_context['s3_accesspoint']['name'] 

2537 # All S3 operations require at least a / in their path. 

2538 return original_path.replace('/' + name, '', 1) or '/' 

2539 
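# Illustrative note (not part of botocore): only the first occurrence of the
# access point name is stripped from the path, so '/my-ap/key.txt' becomes
# '/key.txt' and a bare '/my-ap' collapses to '/'.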

2540 def _get_partition_dns_suffix(self, partition_name): 

2541 dns_suffix = self._endpoint_resolver.get_partition_dns_suffix( 

2542 partition_name 

2543 ) 

2544 if dns_suffix is None: 

2545 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2546 return dns_suffix 

2547 

2548 def _get_dns_suffix(self, region_name): 

2549 resolved = self._endpoint_resolver.construct_endpoint( 

2550 's3', region_name 

2551 ) 

2552 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2553 if resolved and 'dnsSuffix' in resolved: 

2554 dns_suffix = resolved['dnsSuffix'] 

2555 return dns_suffix 

2556 

2557 def _override_signing_region(self, request, region_name): 

2558 signing_context = request.context.get('signing', {}) 

2559 # S3SigV4Auth will use the context['signing']['region'] value to 

2560 # sign with if present. This is used by the Bucket redirector 

2561 # as well but we should be fine because the redirector is never 

2562 # used in combination with the accesspoint setting logic. 

2563 signing_context['region'] = region_name 

2564 request.context['signing'] = signing_context 

2565 

2566 def _override_signing_name(self, context, signing_name): 

2567 signing_context = context.get('signing', {}) 

2568 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2569 # sign with if present. This is used by the Bucket redirector 

2570 # as well but we should be fine because the redirector is never 

2571 # used in combination with the accesspoint setting logic. 

2572 signing_context['signing_name'] = signing_name 

2573 context['signing'] = signing_context 

2574 

2575 @CachedProperty 

2576 def _use_accelerate_endpoint(self): 

2577 # Enable accelerate if the configuration is set to true or the 

2578 # endpoint being used matches one of the accelerate endpoints. 

2579 

2580 # Accelerate has been explicitly configured. 

2581 if self._s3_config.get('use_accelerate_endpoint'): 

2582 return True 

2583 

2584 # Accelerate mode is turned on automatically if an endpoint url is 

2585 # provided that matches the accelerate scheme. 

2586 if self._endpoint_url is None: 

2587 return False 

2588 

2589 # Accelerate is only valid for Amazon endpoints. 

2590 netloc = urlsplit(self._endpoint_url).netloc 

2591 if not netloc.endswith('amazonaws.com'): 

2592 return False 

2593 

2594 # The first part of the url should always be s3-accelerate. 

2595 parts = netloc.split('.') 

2596 if parts[0] != 's3-accelerate': 

2597 return False 

2598 

2599 # Url parts between 's3-accelerate' and 'amazonaws.com' which 

2600 # represent different url features. 

2601 feature_parts = parts[1:-2] 

2602 

2603 # There should be no duplicate url parts. 

2604 if len(feature_parts) != len(set(feature_parts)): 

2605 return False 

2606 

2607 # Remaining parts must all be in the whitelist. 

2608 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

2609 
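# Illustrative note (not part of botocore): with use_accelerate_endpoint unset,
# accelerate is inferred purely from the endpoint_url netloc, e.g.
#
#     s3-accelerate.amazonaws.com                        -> accelerate
#     s3-accelerate.dualstack.amazonaws.com              -> accelerate (whitelisted feature)
#     s3-accelerate.dualstack.dualstack.amazonaws.com    -> rejected (duplicate feature)
#     s3.us-west-2.amazonaws.com                         -> rejected (host is not s3-accelerate)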

2610 @CachedProperty 

2611 def _addressing_style(self): 

2612 # Use virtual host style addressing if accelerate is enabled or if 

2613 # the given endpoint url is an accelerate endpoint. 

2614 if self._use_accelerate_endpoint: 

2615 return 'virtual' 

2616 

2617 # If a particular addressing style is configured, use it. 

2618 configured_addressing_style = self._s3_config.get('addressing_style') 

2619 if configured_addressing_style: 

2620 return configured_addressing_style 

2621 

2622 @CachedProperty 

2623 def _s3_addressing_handler(self): 

2624 # If virtual host style was configured, use it regardless of whether 

2625 # or not the bucket looks dns compatible. 

2626 if self._addressing_style == 'virtual': 

2627 logger.debug("Using S3 virtual host style addressing.") 

2628 return switch_to_virtual_host_style 

2629 

2630 # If path style is configured, no additional steps are needed. If 

2631 # endpoint_url was specified, don't default to virtual. We could 

2632 # potentially default provided endpoint urls to virtual hosted 

2633 # style, but for now it is avoided. 

2634 if self._addressing_style == 'path' or self._endpoint_url is not None: 

2635 logger.debug("Using S3 path style addressing.") 

2636 return None 

2637 

2638 logger.debug( 

2639 "Defaulting to S3 virtual host style addressing with " 

2640 "path style addressing fallback." 

2641 ) 

2642 

2643 # By default, try to use virtual style with path fallback. 

2644 return fix_s3_host 

2645 

2646 

2647class S3ControlEndpointSetter: 

2648 _DEFAULT_PARTITION = 'aws' 

2649 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2650 _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$') 

2651 

2652 def __init__( 

2653 self, 

2654 endpoint_resolver, 

2655 region=None, 

2656 s3_config=None, 

2657 endpoint_url=None, 

2658 partition=None, 

2659 use_fips_endpoint=False, 

2660 ): 

2661 self._endpoint_resolver = endpoint_resolver 

2662 self._region = region 

2663 self._s3_config = s3_config 

2664 self._use_fips_endpoint = use_fips_endpoint 

2665 if s3_config is None: 

2666 self._s3_config = {} 

2667 self._endpoint_url = endpoint_url 

2668 self._partition = partition 

2669 if partition is None: 

2670 self._partition = self._DEFAULT_PARTITION 

2671 

2672 def register(self, event_emitter): 

2673 event_emitter.register('before-sign.s3-control', self.set_endpoint) 

2674 

2675 def set_endpoint(self, request, **kwargs): 

2676 if self._use_endpoint_from_arn_details(request): 

2677 self._validate_endpoint_from_arn_details_supported(request) 

2678 region_name = self._resolve_region_from_arn_details(request) 

2679 self._resolve_signing_name_from_arn_details(request) 

2680 self._resolve_endpoint_from_arn_details(request, region_name) 

2681 self._add_headers_from_arn_details(request) 

2682 elif self._use_endpoint_from_outpost_id(request): 

2683 self._validate_outpost_redirection_valid(request) 

2684 self._override_signing_name(request, 's3-outposts') 

2685 new_netloc = self._construct_outpost_endpoint(self._region) 

2686 self._update_request_netloc(request, new_netloc) 

2687 

2688 def _use_endpoint_from_arn_details(self, request): 

2689 return 'arn_details' in request.context 

2690 

2691 def _use_endpoint_from_outpost_id(self, request): 

2692 return 'outpost_id' in request.context 

2693 

2694 def _validate_endpoint_from_arn_details_supported(self, request): 

2695 if 'fips' in request.context['arn_details']['region']: 

2696 raise UnsupportedS3ControlArnError( 

2697 arn=request.context['arn_details']['original'], 

2698 msg='Invalid ARN, FIPS region not allowed in ARN.', 

2699 ) 

2700 if not self._s3_config.get('use_arn_region', False): 

2701 arn_region = request.context['arn_details']['region'] 

2702 if arn_region != self._region: 

2703 error_msg = ( 

2704 'The use_arn_region configuration is disabled but ' 

2705 f'received arn for "{arn_region}" when the client is configured ' 

2706 f'to use "{self._region}"' 

2707 ) 

2708 raise UnsupportedS3ControlConfigurationError(msg=error_msg) 

2709 request_partition = request.context['arn_details']['partition'] 

2710 if request_partition != self._partition: 

2711 raise UnsupportedS3ControlConfigurationError( 

2712 msg=( 

2713 f'Client is configured for "{self._partition}" partition, but arn ' 

2714 f'provided is for "{request_partition}" partition. The client and ' 

2715 'arn partition must be the same.' 

2716 ) 

2717 ) 

2718 if self._s3_config.get('use_accelerate_endpoint'): 

2719 raise UnsupportedS3ControlConfigurationError( 

2720 msg='S3 control client does not support accelerate endpoints', 

2721 ) 

2722 if 'outpost_name' in request.context['arn_details']: 

2723 self._validate_outpost_redirection_valid(request) 

2724 

2725 def _validate_outpost_redirection_valid(self, request): 

2726 if self._s3_config.get('use_dualstack_endpoint'): 

2727 raise UnsupportedS3ControlConfigurationError( 

2728 msg=( 

2729 'Client does not support s3 dualstack configuration ' 

2730 'when an outpost is specified.' 

2731 ) 

2732 ) 

2733 

2734 def _resolve_region_from_arn_details(self, request): 

2735 if self._s3_config.get('use_arn_region', False): 

2736 arn_region = request.context['arn_details']['region'] 

2737 # If we are using the region from the expanded arn, we will also 

2738 # want to make sure that we set it as the signing region as well 

2739 self._override_signing_region(request, arn_region) 

2740 return arn_region 

2741 return self._region 

2742 

2743 def _resolve_signing_name_from_arn_details(self, request): 

2744 arn_service = request.context['arn_details']['service'] 

2745 self._override_signing_name(request, arn_service) 

2746 return arn_service 

2747 

2748 def _resolve_endpoint_from_arn_details(self, request, region_name): 

2749 new_netloc = self._resolve_netloc_from_arn_details( 

2750 request, region_name 

2751 ) 

2752 self._update_request_netloc(request, new_netloc) 

2753 

2754 def _update_request_netloc(self, request, new_netloc): 

2755 original_components = urlsplit(request.url) 

2756 arn_details_endpoint = urlunsplit( 

2757 ( 

2758 original_components.scheme, 

2759 new_netloc, 

2760 original_components.path, 

2761 original_components.query, 

2762 '', 

2763 ) 

2764 ) 

2765 logger.debug( 

2766 'Updating URI from %s to %s', request.url, arn_details_endpoint 

2767 ) 

2768 request.url = arn_details_endpoint 

2769 

2770 def _resolve_netloc_from_arn_details(self, request, region_name): 

2771 arn_details = request.context['arn_details'] 

2772 if 'outpost_name' in arn_details: 

2773 return self._construct_outpost_endpoint(region_name) 

2774 account = arn_details['account'] 

2775 return self._construct_s3_control_endpoint(region_name, account) 

2776 

2777 def _is_valid_host_label(self, label): 

2778 return self._HOST_LABEL_REGEX.match(label) 

2779 

2780 def _validate_host_labels(self, *labels): 

2781 for label in labels: 

2782 if not self._is_valid_host_label(label): 

2783 raise InvalidHostLabelError(label=label) 

2784 

2785 def _construct_s3_control_endpoint(self, region_name, account): 

2786 self._validate_host_labels(region_name, account) 

2787 if self._endpoint_url: 

2788 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2789 netloc = [account, endpoint_url_netloc] 

2790 else: 

2791 netloc = [ 

2792 account, 

2793 's3-control', 

2794 ] 

2795 self._add_dualstack(netloc) 

2796 dns_suffix = self._get_dns_suffix(region_name) 

2797 netloc.extend([region_name, dns_suffix]) 

2798 return self._construct_netloc(netloc) 

2799 

2800 def _construct_outpost_endpoint(self, region_name): 

2801 self._validate_host_labels(region_name) 

2802 if self._endpoint_url: 

2803 return urlsplit(self._endpoint_url).netloc 

2804 else: 

2805 netloc = [ 

2806 's3-outposts', 

2807 region_name, 

2808 self._get_dns_suffix(region_name), 

2809 ] 

2810 self._add_fips(netloc) 

2811 return self._construct_netloc(netloc) 

2812 

2813 def _construct_netloc(self, netloc): 

2814 return '.'.join(netloc) 

2815 

2816 def _add_fips(self, netloc): 

2817 if self._use_fips_endpoint: 

2818 netloc[0] = netloc[0] + '-fips' 

2819 

2820 def _add_dualstack(self, netloc): 

2821 if self._s3_config.get('use_dualstack_endpoint'): 

2822 netloc.append('dualstack') 

2823 

2824 def _get_dns_suffix(self, region_name): 

2825 resolved = self._endpoint_resolver.construct_endpoint( 

2826 's3', region_name 

2827 ) 

2828 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2829 if resolved and 'dnsSuffix' in resolved: 

2830 dns_suffix = resolved['dnsSuffix'] 

2831 return dns_suffix 

2832 

2833 def _override_signing_region(self, request, region_name): 

2834 signing_context = request.context.get('signing', {}) 

2835 # S3SigV4Auth will use the context['signing']['region'] value to 

2836 # sign with if present. This is used by the Bucket redirector 

2837 # as well but we should be fine because the redirector is never 

2838 # used in combination with the accesspoint setting logic. 

2839 signing_context['region'] = region_name 

2840 request.context['signing'] = signing_context 

2841 

2842 def _override_signing_name(self, request, signing_name): 

2843 signing_context = request.context.get('signing', {}) 

2844 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2845 # sign with if present. This is used by the Bucket redirector 

2846 # as well but we should be fine because the redirector is never 

2847 # used in combination with the accesspoint setting logic. 

2848 signing_context['signing_name'] = signing_name 

2849 request.context['signing'] = signing_context 

2850 

2851 def _add_headers_from_arn_details(self, request): 

2852 arn_details = request.context['arn_details'] 

2853 outpost_name = arn_details.get('outpost_name') 

2854 if outpost_name: 

2855 self._add_outpost_id_header(request, outpost_name) 

2856 

2857 def _add_outpost_id_header(self, request, outpost_name): 

2858 request.headers['x-amz-outpost-id'] = outpost_name 

2859 

2860 

2861class S3ControlArnParamHandler: 

2862 """This handler has been replaced by S3ControlArnParamHandlerv2. The 

2863 original version remains in place for any third-party importers. 

2864 """ 

2865 

2866 _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]') 

2867 

2868 def __init__(self, arn_parser=None): 

2869 self._arn_parser = arn_parser 

2870 if arn_parser is None: 

2871 self._arn_parser = ArnParser() 

2872 warnings.warn( 

2873 'The S3ControlArnParamHandler class has been deprecated for a new ' 

2874 'internal replacement. A future version of botocore may remove ' 

2875 'this class.', 

2876 category=FutureWarning, 

2877 ) 

2878 

2879 def register(self, event_emitter): 

2880 event_emitter.register( 

2881 'before-parameter-build.s3-control', 

2882 self.handle_arn, 

2883 ) 

2884 

2885 def handle_arn(self, params, model, context, **kwargs): 

2886 if model.name in ('CreateBucket', 'ListRegionalBuckets'): 

2887 # CreateBucket and ListRegionalBuckets are special cases that do 

2888 # not obey ARN based redirection but will redirect based off of the 

2889 # presence of the OutpostId parameter 

2890 self._handle_outpost_id_param(params, model, context) 

2891 else: 

2892 self._handle_name_param(params, model, context) 

2893 self._handle_bucket_param(params, model, context) 

2894 

2895 def _get_arn_details_from_param(self, params, param_name): 

2896 if param_name not in params: 

2897 return None 

2898 try: 

2899 arn = params[param_name] 

2900 arn_details = self._arn_parser.parse_arn(arn) 

2901 arn_details['original'] = arn 

2902 arn_details['resources'] = self._split_resource(arn_details) 

2903 return arn_details 

2904 except InvalidArnException: 

2905 return None 

2906 

2907 def _split_resource(self, arn_details): 

2908 return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource']) 

2909 
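# Illustrative note (not part of botocore): the resource portion of an outpost
# ARN is split on both '/' and ':', so for
# arn:aws:s3-outposts:us-west-2:123456789012:outpost/op-01234567890123456/accesspoint/my-ap
# the resulting resources list is
# ['outpost', 'op-01234567890123456', 'accesspoint', 'my-ap'].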

2910 def _override_account_id_param(self, params, arn_details): 

2911 account_id = arn_details['account'] 

2912 if 'AccountId' in params and params['AccountId'] != account_id: 

2913 error_msg = ( 

2914 'Account ID in arn does not match the AccountId parameter ' 

2915 'provided: "{}"' 

2916 ).format(params['AccountId']) 

2917 raise UnsupportedS3ControlArnError( 

2918 arn=arn_details['original'], 

2919 msg=error_msg, 

2920 ) 

2921 params['AccountId'] = account_id 

2922 

2923 def _handle_outpost_id_param(self, params, model, context): 

2924 if 'OutpostId' not in params: 

2925 return 

2926 context['outpost_id'] = params['OutpostId'] 

2927 

2928 def _handle_name_param(self, params, model, context): 

2929 # CreateAccessPoint is a special case that does not expand Name 

2930 if model.name == 'CreateAccessPoint': 

2931 return 

2932 arn_details = self._get_arn_details_from_param(params, 'Name') 

2933 if arn_details is None: 

2934 return 

2935 if self._is_outpost_accesspoint(arn_details): 

2936 self._store_outpost_accesspoint(params, context, arn_details) 

2937 else: 

2938 error_msg = 'The Name parameter does not support the provided ARN' 

2939 raise UnsupportedS3ControlArnError( 

2940 arn=arn_details['original'], 

2941 msg=error_msg, 

2942 ) 

2943 

2944 def _is_outpost_accesspoint(self, arn_details): 

2945 if arn_details['service'] != 's3-outposts': 

2946 return False 

2947 resources = arn_details['resources'] 

2948 if len(resources) != 4: 

2949 return False 

2950 # Resource must be of the form outpost/op-123/accesspoint/name 

2951 return resources[0] == 'outpost' and resources[2] == 'accesspoint' 

2952 

2953 def _store_outpost_accesspoint(self, params, context, arn_details): 

2954 self._override_account_id_param(params, arn_details) 

2955 accesspoint_name = arn_details['resources'][3] 

2956 params['Name'] = accesspoint_name 

2957 arn_details['accesspoint_name'] = accesspoint_name 

2958 arn_details['outpost_name'] = arn_details['resources'][1] 

2959 context['arn_details'] = arn_details 

2960 

2961 def _handle_bucket_param(self, params, model, context): 

2962 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

2963 if arn_details is None: 

2964 return 

2965 if self._is_outpost_bucket(arn_details): 

2966 self._store_outpost_bucket(params, context, arn_details) 

2967 else: 

2968 error_msg = ( 

2969 'The Bucket parameter does not support the provided ARN' 

2970 ) 

2971 raise UnsupportedS3ControlArnError( 

2972 arn=arn_details['original'], 

2973 msg=error_msg, 

2974 ) 

2975 

2976 def _is_outpost_bucket(self, arn_details): 

2977 if arn_details['service'] != 's3-outposts': 

2978 return False 

2979 resources = arn_details['resources'] 

2980 if len(resources) != 4: 

2981 return False 

2982 # Resource must be of the form outpost/op-123/bucket/name 

2983 return resources[0] == 'outpost' and resources[2] == 'bucket' 

2984 

2985 def _store_outpost_bucket(self, params, context, arn_details): 

2986 self._override_account_id_param(params, arn_details) 

2987 bucket_name = arn_details['resources'][3] 

2988 params['Bucket'] = bucket_name 

2989 arn_details['bucket_name'] = bucket_name 

2990 arn_details['outpost_name'] = arn_details['resources'][1] 

2991 context['arn_details'] = arn_details 

2992 

2993 

2994class S3ControlArnParamHandlerv2(S3ControlArnParamHandler): 

2995 """Updated version of S3ControlArnParamHandler for use when 

2996 EndpointRulesetResolver is in use for endpoint resolution. 

2997 

2998 This class is considered private and subject to abrupt breaking changes or 

2999 removal without prior announcement. Please do not use it directly. 

3000 """ 

3001 

3002 def __init__(self, arn_parser=None): 

3003 self._arn_parser = arn_parser 

3004 if arn_parser is None: 

3005 self._arn_parser = ArnParser() 

3006 

3007 def register(self, event_emitter): 

3008 event_emitter.register( 

3009 'before-endpoint-resolution.s3-control', 

3010 self.handle_arn, 

3011 ) 

3012 

3013 def _handle_name_param(self, params, model, context): 

3014 # CreateAccessPoint is a special case that does not expand Name 

3015 if model.name == 'CreateAccessPoint': 

3016 return 

3017 arn_details = self._get_arn_details_from_param(params, 'Name') 

3018 if arn_details is None: 

3019 return 

3020 self._raise_for_fips_pseudo_region(arn_details) 

3021 self._raise_for_accelerate_endpoint(context) 

3022 if self._is_outpost_accesspoint(arn_details): 

3023 self._store_outpost_accesspoint(params, context, arn_details) 

3024 else: 

3025 error_msg = 'The Name parameter does not support the provided ARN' 

3026 raise UnsupportedS3ControlArnError( 

3027 arn=arn_details['original'], 

3028 msg=error_msg, 

3029 ) 

3030 

3031 def _store_outpost_accesspoint(self, params, context, arn_details): 

3032 self._override_account_id_param(params, arn_details) 

3033 

3034 def _handle_bucket_param(self, params, model, context): 

3035 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

3036 if arn_details is None: 

3037 return 

3038 self._raise_for_fips_pseudo_region(arn_details) 

3039 self._raise_for_accelerate_endpoint(context) 

3040 if self._is_outpost_bucket(arn_details): 

3041 self._store_outpost_bucket(params, context, arn_details) 

3042 else: 

3043 error_msg = ( 

3044 'The Bucket parameter does not support the provided ARN' 

3045 ) 

3046 raise UnsupportedS3ControlArnError( 

3047 arn=arn_details['original'], 

3048 msg=error_msg, 

3049 ) 

3050 

3051 def _store_outpost_bucket(self, params, context, arn_details): 

3052 self._override_account_id_param(params, arn_details) 

3053 

3054 def _raise_for_fips_pseudo_region(self, arn_details): 

3055 # FIPS pseudo region names cannot be used in ARNs 

3056 arn_region = arn_details['region'] 

3057 if arn_region.startswith('fips-') or arn_region.endswith('-fips'): 

3058 raise UnsupportedS3ControlArnError( 

3059 arn=arn_details['original'], 

3060 msg='Invalid ARN, FIPS region not allowed in ARN.', 

3061 ) 

3062 

3063 def _raise_for_accelerate_endpoint(self, context): 

3064 s3_config = context['client_config'].s3 or {} 

3065 if s3_config.get('use_accelerate_endpoint'): 

3066 raise UnsupportedS3ControlConfigurationError( 

3067 msg='S3 control client does not support accelerate endpoints', 

3068 ) 

3069 

3070 

3071class ContainerMetadataFetcher: 

3072 TIMEOUT_SECONDS = 2 

3073 RETRY_ATTEMPTS = 3 

3074 SLEEP_TIME = 1 

3075 IP_ADDRESS = '169.254.170.2' 

3076 _ALLOWED_HOSTS = [ 

3077 IP_ADDRESS, 

3078 '169.254.170.23', 

3079 'fd00:ec2::23', 

3080 'localhost', 

3081 ] 

3082 

3083 def __init__(self, session=None, sleep=time.sleep): 

3084 if session is None: 

3085 session = botocore.httpsession.URLLib3Session( 

3086 timeout=self.TIMEOUT_SECONDS 

3087 ) 

3088 self._session = session 

3089 self._sleep = sleep 

3090 

3091 def retrieve_full_uri(self, full_url, headers=None): 

3092 """Retrieve JSON metadata from container metadata. 

3093 

3094 :type full_url: str 

3095 :param full_url: The full URL of the metadata service. 

3096 This should include the scheme as well, e.g. 

3097 "http://localhost:123/foo" 

3098 

3099 """ 

3100 self._validate_allowed_url(full_url) 

3101 return self._retrieve_credentials(full_url, headers) 

3102 

3103 def _validate_allowed_url(self, full_url): 

3104 parsed = botocore.compat.urlparse(full_url) 

3105 if self._is_loopback_address(parsed.hostname): 

3106 return 

3107 is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname) 

3108 if not is_whitelisted_host: 

3109 raise ValueError( 

3110 f"Unsupported host '{parsed.hostname}'. Can only retrieve metadata " 

3111 f"from a loopback address or one of these hosts: {', '.join(self._ALLOWED_HOSTS)}" 

3112 ) 

3113 

3114 def _is_loopback_address(self, hostname): 

3115 try: 

3116 ip = ip_address(hostname) 

3117 return ip.is_loopback 

3118 except ValueError: 

3119 return False 

3120 

3121 def _check_if_whitelisted_host(self, host): 

3122 if host in self._ALLOWED_HOSTS: 

3123 return True 

3124 return False 

3125 

3126 def retrieve_uri(self, relative_uri): 

3127 """Retrieve JSON metadata from container metadata. 

3128 

3129 :type relative_uri: str 

3130 :param relative_uri: A relative URI, e.g. "/foo/bar?id=123" 

3131 

3132 :return: The parsed JSON response. 

3133 

3134 """ 

3135 full_url = self.full_url(relative_uri) 

3136 return self._retrieve_credentials(full_url) 

3137 

3138 def _retrieve_credentials(self, full_url, extra_headers=None): 

3139 headers = {'Accept': 'application/json'} 

3140 if extra_headers is not None: 

3141 headers.update(extra_headers) 

3142 attempts = 0 

3143 while True: 

3144 try: 

3145 return self._get_response( 

3146 full_url, headers, self.TIMEOUT_SECONDS 

3147 ) 

3148 except MetadataRetrievalError as e: 

3149 logger.debug( 

3150 "Received error when attempting to retrieve " 

3151 "container metadata: %s", 

3152 e, 

3153 exc_info=True, 

3154 ) 

3155 self._sleep(self.SLEEP_TIME) 

3156 attempts += 1 

3157 if attempts >= self.RETRY_ATTEMPTS: 

3158 raise 

3159 

3160 def _get_response(self, full_url, headers, timeout): 

3161 try: 

3162 AWSRequest = botocore.awsrequest.AWSRequest 

3163 request = AWSRequest(method='GET', url=full_url, headers=headers) 

3164 response = self._session.send(request.prepare()) 

3165 response_text = response.content.decode('utf-8') 

3166 if response.status_code != 200: 

3167 raise MetadataRetrievalError( 

3168 error_msg=( 

3169 f"Received non 200 response {response.status_code} " 

3170 f"from container metadata: {response_text}" 

3171 ) 

3172 ) 

3173 try: 

3174 return json.loads(response_text) 

3175 except ValueError: 

3176 error_msg = "Unable to parse JSON returned from container metadata services" 

3177 logger.debug('%s:%s', error_msg, response_text) 

3178 raise MetadataRetrievalError(error_msg=error_msg) 

3179 except RETRYABLE_HTTP_ERRORS as e: 

3180 error_msg = ( 

3181 "Received error when attempting to retrieve " 

3182 f"container metadata: {e}" 

3183 ) 

3184 raise MetadataRetrievalError(error_msg=error_msg) 

3185 

3186 def full_url(self, relative_uri): 

3187 return f'http://{self.IP_ADDRESS}{relative_uri}' 

3188 

3189 
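# Editor's example (not part of botocore): a minimal sketch of how the fetcher
# above is typically driven. The relative URI is a made-up placeholder; in real
# deployments it comes from the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
# environment variable. Calling this helper performs a real HTTP request.
def _example_fetch_container_metadata():
    fetcher = ContainerMetadataFetcher()
    # retrieve_uri() prefixes the fixed container-metadata IP (169.254.170.2)
    # and handles retries and JSON parsing internally.
    return fetcher.retrieve_uri('/v2/credentials/example-guid')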

3190def get_environ_proxies(url): 

3191 if should_bypass_proxies(url): 

3192 return {} 

3193 else: 

3194 return getproxies() 

3195 

3196 

3197def should_bypass_proxies(url): 

3198 """ 

3199 Returns whether we should bypass proxies or not. 

3200 """ 

3201 # NOTE: the requests library allowed ip/cidr entries in the no_proxy env var, 

3202 # which we don't currently support, as urllib only checks the DNS suffix. 

3203 # If the system proxy settings indicate that this URL should be bypassed, 

3204 # don't proxy. 

3205 # The proxy_bypass function is incredibly buggy on OS X in early versions 

3206 # of Python 2.6, so allow this call to fail. Only catch the specific 

3207 # exceptions we've seen, though: this call failing in other ways can reveal 

3208 # legitimate problems. 

3209 try: 

3210 if proxy_bypass(urlparse(url).netloc): 

3211 return True 

3212 except (TypeError, socket.gaierror): 

3213 pass 

3214 

3215 return False 

3216 
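# Editor's example (not part of botocore): a hedged sketch of how the two
# helpers above compose. Real values come from the standard
# HTTP_PROXY/HTTPS_PROXY/NO_PROXY environment variables that urllib's
# getproxies() and proxy_bypass() consult; the URL here is just illustrative.
def _example_proxies_for(url='https://s3.us-west-2.amazonaws.com'):
    # Returns {} when should_bypass_proxies(url) is True, otherwise the
    # scheme-to-proxy mapping from getproxies().
    return get_environ_proxies(url)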

3217 

3218def determine_content_length(body): 

3219 # No body, content length of 0 

3220 if not body: 

3221 return 0 

3222 

3223 # Try asking the body for its length 

3224 try: 

3225 return len(body) 

3226 except (AttributeError, TypeError): 

3227 pass 

3228 

3229 # Try getting the length from a seekable stream 

3230 if hasattr(body, 'seek') and hasattr(body, 'tell'): 

3231 try: 

3232 orig_pos = body.tell() 

3233 body.seek(0, 2) 

3234 end_file_pos = body.tell() 

3235 body.seek(orig_pos) 

3236 return end_file_pos - orig_pos 

3237 except io.UnsupportedOperation: 

3238 # If the body is, for example, an io.BufferedIOBase object, its "seek" 

3239 # method raises UnsupportedOperation. In that case we want to fall 

3240 # back to "chunked" encoding rather than reporting a length we 

3241 # cannot determine. 

3242 pass 

3243 # Failed to determine the length 

3244 return None 

3245 
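# Editor's example (not part of botocore): a small sketch of the three outcomes
# of determine_content_length(): sized objects use len(), seekable streams are
# measured with tell()/seek() and restored, and unsized, unseekable bodies
# yield None (which signals chunked transfer encoding upstream).
def _example_content_lengths():
    sized = determine_content_length(b'hello')             # 5, via len()
    stream = io.BytesIO(b'hello world')
    stream.seek(6)
    remaining = determine_content_length(stream)           # 5, bytes left after the current position
    unknown = determine_content_length(iter([b'chunk']))   # None, length cannot be determined
    return sized, remaining, unknown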

3246 

3247def get_encoding_from_headers(headers, default='ISO-8859-1'): 

3248 """Returns encodings from given HTTP Header Dict. 

3249 

3250 :param headers: dictionary to extract encoding from. 

3251 :param default: default encoding if the content-type is text 

3252 """ 

3253 

3254 content_type = headers.get('content-type') 

3255 

3256 if not content_type: 

3257 return None 

3258 

3259 message = email.message.Message() 

3260 message['content-type'] = content_type 

3261 charset = message.get_param("charset") 

3262 

3263 if charset is not None: 

3264 return charset 

3265 

3266 if 'text' in content_type: 

3267 return default 

3268 
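# Editor's example (not part of botocore): a short sketch of the three cases
# handled by get_encoding_from_headers(). The header values are illustrative.
def _example_header_encodings():
    explicit = get_encoding_from_headers(
        {'content-type': 'text/html; charset=utf-8'}
    )  # 'utf-8', taken from the charset parameter
    fallback = get_encoding_from_headers(
        {'content-type': 'text/plain'}
    )  # 'ISO-8859-1', the default for text content types
    missing = get_encoding_from_headers({})  # None, no content-type header
    return explicit, fallback, missing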

3269 

3270def calculate_md5(body, **kwargs): 

3271 """This function has been deprecated, but is kept for backwards compatibility.""" 

3272 if isinstance(body, (bytes, bytearray)): 

3273 binary_md5 = _calculate_md5_from_bytes(body) 

3274 else: 

3275 binary_md5 = _calculate_md5_from_file(body) 

3276 return base64.b64encode(binary_md5).decode('ascii') 

3277 

3278 

3279def _calculate_md5_from_bytes(body_bytes): 

3280 """This function has been deprecated, but is kept for backwards compatibility.""" 

3281 md5 = get_md5(body_bytes, usedforsecurity=False) 

3282 return md5.digest() 

3283 

3284 

3285def _calculate_md5_from_file(fileobj): 

3286 """This function has been deprecated, but is kept for backwards compatibility.""" 

3287 start_position = fileobj.tell() 

3288 md5 = get_md5(usedforsecurity=False) 

3289 for chunk in iter(lambda: fileobj.read(1024 * 1024), b''): 

3290 md5.update(chunk) 

3291 fileobj.seek(start_position) 

3292 return md5.digest() 

3293 

3294 

3295def _is_s3express_request(params): 

3296 endpoint_properties = params.get('context', {}).get( 

3297 'endpoint_properties', {} 

3298 ) 

3299 return endpoint_properties.get('backend') == 'S3Express' 

3300 

3301 

3302def has_checksum_header(params): 

3303 """ 

3304 Checks if a header starting with "x-amz-checksum-" is provided in a request. 

3305 

3306 This function is considered private and subject to abrupt breaking changes or 

3307 removal without prior announcement. Please do not use it directly. 

3308 """ 

3309 headers = params['headers'] 

3310 

3311 # If a header matching the x-amz-checksum-* pattern is present, we 

3312 # assume a checksum has already been provided by the user. 

3313 for header in headers: 

3314 if CHECKSUM_HEADER_PATTERN.match(header): 

3315 return True 

3316 

3317 return False 

3318 
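# Editor's example (not part of botocore): a hedged sketch of the header check
# above. CHECKSUM_HEADER_PATTERN is defined earlier in this module; the request
# dicts below are trimmed-down placeholders, not full serialized requests.
def _example_checksum_header_check():
    with_checksum = {'headers': {'x-amz-checksum-crc32': 'AAAAAA=='}}
    without_checksum = {'headers': {'Content-Type': 'text/plain'}}
    # Expected to evaluate to (True, False).
    return has_checksum_header(with_checksum), has_checksum_header(without_checksum)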

3319 

3320def conditionally_calculate_checksum(params, **kwargs): 

3321 """This function has been deprecated, but is kept for backwards compatibility.""" 

3322 if not has_checksum_header(params): 

3323 conditionally_calculate_md5(params, **kwargs) 

3324 conditionally_enable_crc32(params, **kwargs) 

3325 

3326 

3327def conditionally_enable_crc32(params, **kwargs): 

3328 """This function has been deprecated, but is kept for backwards compatibility.""" 

3329 checksum_context = params.get('context', {}).get('checksum', {}) 

3330 checksum_algorithm = checksum_context.get('request_algorithm') 

3331 if ( 

3332 _is_s3express_request(params) 

3333 and params['body'] is not None 

3334 and checksum_algorithm in (None, "conditional-md5") 

3335 ): 

3336 params['context']['checksum'] = { 

3337 'request_algorithm': { 

3338 'algorithm': 'crc32', 

3339 'in': 'header', 

3340 'name': 'x-amz-checksum-crc32', 

3341 } 

3342 } 

3343 

3344 

3345def conditionally_calculate_md5(params, **kwargs): 

3346 """Only add a Content-MD5 if the system supports it. 

3347 

3348 This function has been deprecated, but is kept for backwards compatibility. 

3349 """ 

3350 body = params['body'] 

3351 checksum_context = params.get('context', {}).get('checksum', {}) 

3352 checksum_algorithm = checksum_context.get('request_algorithm') 

3353 if checksum_algorithm and checksum_algorithm != 'conditional-md5': 

3354 # Skip for requests that will have a flexible checksum applied 

3355 return 

3356 

3357 if has_checksum_header(params): 

3358 # Don't add a new header if one is already available. 

3359 return 

3360 

3361 if _is_s3express_request(params): 

3362 # S3Express doesn't support MD5 

3363 return 

3364 

3365 if MD5_AVAILABLE and body is not None: 

3366 md5_digest = calculate_md5(body, **kwargs) 

3367 params['headers']['Content-MD5'] = md5_digest 

3368 
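# Editor's example (not part of botocore): a hedged sketch of the request-dict
# shape the conditional checksum helpers above operate on. The values are
# placeholders; real serialized requests carry many more fields.
def _example_conditional_md5():
    params = {
        'body': b'example payload',
        'headers': {},
        'context': {'checksum': {}, 'endpoint_properties': {}},
    }
    conditionally_calculate_md5(params)
    # When MD5 is available and no x-amz-checksum-* header is present, a
    # base64-encoded Content-MD5 header is added to params in place.
    return params['headers'].get('Content-MD5')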

3369 

3370class FileWebIdentityTokenLoader: 

3371 def __init__(self, web_identity_token_path, _open=open): 

3372 self._web_identity_token_path = web_identity_token_path 

3373 self._open = _open 

3374 

3375 def __call__(self): 

3376 with self._open(self._web_identity_token_path) as token_file: 

3377 return token_file.read() 

3378 
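# Editor's example (not part of botocore): a minimal sketch of the token loader
# above. The path is a hypothetical placeholder; in practice it usually comes
# from the AWS_WEB_IDENTITY_TOKEN_FILE environment variable.
def _example_load_web_identity_token():
    loader = FileWebIdentityTokenLoader('/var/run/secrets/example/token')
    return loader()  # reads and returns the token file's contents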

3379 

3380class SSOTokenLoader: 

3381 def __init__(self, cache=None): 

3382 if cache is None: 

3383 cache = {} 

3384 self._cache = cache 

3385 

3386 def _generate_cache_key(self, start_url, session_name): 

3387 input_str = start_url 

3388 if session_name is not None: 

3389 input_str = session_name 

3390 return hashlib.sha1(input_str.encode('utf-8')).hexdigest() 

3391 

3392 def save_token(self, start_url, token, session_name=None): 

3393 cache_key = self._generate_cache_key(start_url, session_name) 

3394 self._cache[cache_key] = token 

3395 

3396 def __call__(self, start_url, session_name=None): 

3397 cache_key = self._generate_cache_key(start_url, session_name) 

3398 logger.debug('Checking for cached token at: %s', cache_key) 

3399 if cache_key not in self._cache: 

3400 name = start_url 

3401 if session_name is not None: 

3402 name = session_name 

3403 error_msg = f'Token for {name} does not exist' 

3404 raise SSOTokenLoadError(error_msg=error_msg) 

3405 

3406 token = self._cache[cache_key] 

3407 if 'accessToken' not in token or 'expiresAt' not in token: 

3408 error_msg = f'Token for {start_url} is invalid' 

3409 raise SSOTokenLoadError(error_msg=error_msg) 

3410 return token 

3411 
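# Editor's example (not part of botocore): a small sketch of the loader's
# save/load round trip using an in-memory cache. Only the two keys the loader
# validates are shown; real cached SSO tokens contain more fields.
def _example_sso_token_round_trip():
    loader = SSOTokenLoader(cache={})
    token = {'accessToken': 'example-token', 'expiresAt': '2030-01-01T00:00:00Z'}
    loader.save_token('https://example.awsapps.com/start', token)
    return loader('https://example.awsapps.com/start')  # returns the same dict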

3412 

3413class EventbridgeSignerSetter: 

3414 _DEFAULT_PARTITION = 'aws' 

3415 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

3416 

3417 def __init__(self, endpoint_resolver, region=None, endpoint_url=None): 

3418 self._endpoint_resolver = endpoint_resolver 

3419 self._region = region 

3420 self._endpoint_url = endpoint_url 

3421 

3422 def register(self, event_emitter): 

3423 event_emitter.register( 

3424 'before-parameter-build.events.PutEvents', 

3425 self.check_for_global_endpoint, 

3426 ) 

3427 event_emitter.register( 

3428 'before-call.events.PutEvents', self.set_endpoint_url 

3429 ) 

3430 

3431 def set_endpoint_url(self, params, context, **kwargs): 

3432 if 'eventbridge_endpoint' in context: 

3433 endpoint = context['eventbridge_endpoint'] 

3434 logger.debug( 

3435 "Rewriting URL from %s to %s", params['url'], endpoint 

3436 ) 

3437 params['url'] = endpoint 

3438 

3439 def check_for_global_endpoint(self, params, context, **kwargs): 

3440 endpoint = params.get('EndpointId') 

3441 if endpoint is None: 

3442 return 

3443 

3444 if len(endpoint) == 0: 

3445 raise InvalidEndpointConfigurationError( 

3446 msg='EndpointId must not be a zero length string' 

3447 ) 

3448 

3449 if not HAS_CRT: 

3450 raise MissingDependencyException( 

3451 msg="Using EndpointId requires an additional " 

3452 "dependency. You will need to pip install " 

3453 "botocore[crt] before proceeding." 

3454 ) 

3455 

3456 config = context.get('client_config') 

3457 endpoint_variant_tags = None 

3458 if config is not None: 

3459 if config.use_fips_endpoint: 

3460 raise InvalidEndpointConfigurationError( 

3461 msg="FIPS is not supported with EventBridge " 

3462 "multi-region endpoints." 

3463 ) 

3464 if config.use_dualstack_endpoint: 

3465 endpoint_variant_tags = ['dualstack'] 

3466 

3467 if self._endpoint_url is None: 

3468 # Validate endpoint is a valid hostname component 

3469 parts = urlparse(f'https://{endpoint}') 

3470 if parts.hostname != endpoint: 

3471 raise InvalidEndpointConfigurationError( 

3472 msg='EndpointId is not a valid hostname component.' 

3473 ) 

3474 resolved_endpoint = self._get_global_endpoint( 

3475 endpoint, endpoint_variant_tags=endpoint_variant_tags 

3476 ) 

3477 else: 

3478 resolved_endpoint = self._endpoint_url 

3479 

3480 context['eventbridge_endpoint'] = resolved_endpoint 

3481 context['auth_type'] = 'v4a' 

3482 

3483 def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None): 

3484 resolver = self._endpoint_resolver 

3485 

3486 partition = resolver.get_partition_for_region(self._region) 

3487 if partition is None: 

3488 partition = self._DEFAULT_PARTITION 

3489 dns_suffix = resolver.get_partition_dns_suffix( 

3490 partition, endpoint_variant_tags=endpoint_variant_tags 

3491 ) 

3492 if dns_suffix is None: 

3493 dns_suffix = self._DEFAULT_DNS_SUFFIX 

3494 

3495 return f"https://{endpoint}.endpoint.events.{dns_suffix}/" 

3496 
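# Editor's example (not part of botocore): a hedged illustration of the URL
# shape produced by _get_global_endpoint() above. 'abcde.veo' is a made-up
# EndpointId; this mirrors only the default fall-through where the resolver
# yields no partition-specific DNS suffix.
def _example_eventbridge_global_endpoint(endpoint_id='abcde.veo'):
    return f"https://{endpoint_id}.endpoint.events.amazonaws.com/"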

3497 

3498def is_s3_accelerate_url(url): 

3499 """Does the URL match the S3 Accelerate endpoint scheme? 

3500 

3501 Virtual-host-style URLs, with bucket names in the netloc part of the URL, 

3502 are not allowed by this function. 

3503 """ 

3504 if url is None: 

3505 return False 

3506 

3507 # Accelerate is only valid for Amazon endpoints. 

3508 url_parts = urlsplit(url) 

3509 if not url_parts.netloc.endswith( 

3510 'amazonaws.com' 

3511 ) or url_parts.scheme not in ['https', 'http']: 

3512 return False 

3513 

3514 # The first part of the URL must be s3-accelerate. 

3515 parts = url_parts.netloc.split('.') 

3516 if parts[0] != 's3-accelerate': 

3517 return False 

3518 

3519 # URL parts between 's3-accelerate' and 'amazonaws.com' represent 

3520 # different URL features. 

3521 feature_parts = parts[1:-2] 

3522 

3523 # There should be no duplicate URL parts. 

3524 if len(feature_parts) != len(set(feature_parts)): 

3525 return False 

3526 

3527 # Remaining parts must all be in the whitelist. 

3528 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

3529 
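# Editor's example (not part of botocore): a few hedged sample inputs for
# is_s3_accelerate_url(). Bucket names must not appear in the netloc, and only
# 'dualstack' is whitelisted between 's3-accelerate' and 'amazonaws.com'.
def _example_accelerate_urls():
    return (
        is_s3_accelerate_url('https://s3-accelerate.amazonaws.com'),            # True
        is_s3_accelerate_url('https://s3-accelerate.dualstack.amazonaws.com'),  # True
        is_s3_accelerate_url('https://bucket.s3-accelerate.amazonaws.com'),     # False
        is_s3_accelerate_url('https://s3.amazonaws.com'),                       # False
    )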

3530 

3531class JSONFileCache: 

3532 """JSON file cache. 

3533 This provides a dict-like interface that stores JSON-serializable 

3534 objects. 

3535 The objects are serialized to JSON and stored in a file. These 

3536 values can be retrieved at a later time. 

3537 """ 

3538 

3539 CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache')) 

3540 

3541 def __init__(self, working_dir=CACHE_DIR, dumps_func=None): 

3542 self._working_dir = working_dir 

3543 if dumps_func is None: 

3544 dumps_func = self._default_dumps 

3545 self._dumps = dumps_func 

3546 

3547 def _default_dumps(self, obj): 

3548 return json.dumps(obj, default=self._serialize_if_needed) 

3549 

3550 def __contains__(self, cache_key): 

3551 actual_key = self._convert_cache_key(cache_key) 

3552 return os.path.isfile(actual_key) 

3553 

3554 def __getitem__(self, cache_key): 

3555 """Retrieve value from a cache key.""" 

3556 actual_key = self._convert_cache_key(cache_key) 

3557 try: 

3558 with open(actual_key) as f: 

3559 return json.load(f) 

3560 except (OSError, ValueError): 

3561 raise KeyError(cache_key) 

3562 

3563 def __delitem__(self, cache_key): 

3564 actual_key = self._convert_cache_key(cache_key) 

3565 try: 

3566 key_path = Path(actual_key) 

3567 key_path.unlink() 

3568 except FileNotFoundError: 

3569 raise KeyError(cache_key) 

3570 

3571 def __setitem__(self, cache_key, value): 

3572 full_key = self._convert_cache_key(cache_key) 

3573 try: 

3574 file_content = self._dumps(value) 

3575 except (TypeError, ValueError): 

3576 raise ValueError( 

3577 f"Value cannot be cached, must be JSON serializable: {value}" 

3578 ) 

3579 if not os.path.isdir(self._working_dir): 

3580 os.makedirs(self._working_dir, exist_ok=True) 

3581 with os.fdopen( 

3582 os.open(full_key, os.O_WRONLY | os.O_CREAT, 0o600), 'w' 

3583 ) as f: 

3584 f.truncate() 

3585 f.write(file_content) 

3586 

3587 def _convert_cache_key(self, cache_key): 

3588 full_path = os.path.join(self._working_dir, cache_key + '.json') 

3589 return full_path 

3590 

3591 def _serialize_if_needed(self, value, iso=False): 

3592 if isinstance(value, _DatetimeClass): 

3593 if iso: 

3594 return value.isoformat() 

3595 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

3596 return value 

3597 
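# Editor's example (not part of botocore): a minimal sketch of the dict-like
# cache above, pointed at a temporary directory so nothing is written under
# ~/.aws/boto/cache.
def _example_json_file_cache():
    import tempfile
    cache = JSONFileCache(working_dir=tempfile.mkdtemp())
    cache['credentials'] = {'region': 'us-east-1'}   # serialized to credentials.json
    found = 'credentials' in cache                   # True, backed by an on-disk file
    return cache['credentials'], found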

3598 

3599def is_s3express_bucket(bucket): 

3600 if bucket is None: 

3601 return False 

3602 return bucket.endswith('--x-s3') 

3603 

3604 

3605def get_token_from_environment(signing_name, environ=None): 

3606 if not isinstance(signing_name, str) or not signing_name.strip(): 

3607 return None 

3608 

3609 if environ is None: 

3610 environ = os.environ 

3611 env_var = _get_bearer_env_var_name(signing_name) 

3612 return environ.get(env_var) 

3613 

3614 

3615def _get_bearer_env_var_name(signing_name): 

3616 bearer_name = signing_name.replace('-', '_').replace(' ', '_').upper() 

3617 return f"AWS_BEARER_TOKEN_{bearer_name}" 

3618 
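# Editor's example (not part of botocore): a hedged sketch of the environment
# lookup above. The signing name 'bedrock' is only illustrative; the derived
# variable name would be AWS_BEARER_TOKEN_BEDROCK.
def _example_bearer_token_lookup():
    environ = {'AWS_BEARER_TOKEN_BEDROCK': 'example-token'}  # stand-in for os.environ
    return get_token_from_environment('bedrock', environ=environ)  # 'example-token'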

3619 

3620# This parameter is not part of the public interface and is subject to abrupt 

3621# breaking changes or removal without prior announcement. 

3622# Mapping of services that have been renamed for backwards compatibility reasons. 

3623# Keys are the previous name that should be allowed, values are the documented 

3624# and preferred client name. 

3625SERVICE_NAME_ALIASES = {'runtime.sagemaker': 'sagemaker-runtime'} 

3626 

3627 

3628# This parameter is not part of the public interface and is subject to abrupt 

3629# breaking changes or removal without prior announcement. 

3630# Mapping to determine the service ID for services that do not use it as the 

3631# model data directory name. The keys are the data directory name and the 

3632# values are the transformed service IDs (lower case and hyphenated). 

3633CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES = { 

3634 # Actual service name we use -> Allowed computed service name. 

3635 'apigateway': 'api-gateway', 

3636 'application-autoscaling': 'application-auto-scaling', 

3637 'appmesh': 'app-mesh', 

3638 'autoscaling': 'auto-scaling', 

3639 'autoscaling-plans': 'auto-scaling-plans', 

3640 'ce': 'cost-explorer', 

3641 'cloudhsmv2': 'cloudhsm-v2', 

3642 'cloudsearchdomain': 'cloudsearch-domain', 

3643 'cognito-idp': 'cognito-identity-provider', 

3644 'config': 'config-service', 

3645 'cur': 'cost-and-usage-report-service', 

3646 'datapipeline': 'data-pipeline', 

3647 'directconnect': 'direct-connect', 

3648 'devicefarm': 'device-farm', 

3649 'discovery': 'application-discovery-service', 

3650 'dms': 'database-migration-service', 

3651 'ds': 'directory-service', 

3652 'ds-data': 'directory-service-data', 

3653 'dynamodbstreams': 'dynamodb-streams', 

3654 'elasticbeanstalk': 'elastic-beanstalk', 

3655 'elastictranscoder': 'elastic-transcoder', 

3656 'elb': 'elastic-load-balancing', 

3657 'elbv2': 'elastic-load-balancing-v2', 

3658 'es': 'elasticsearch-service', 

3659 'events': 'eventbridge', 

3660 'globalaccelerator': 'global-accelerator', 

3661 'iot-data': 'iot-data-plane', 

3662 'iot-jobs-data': 'iot-jobs-data-plane', 

3663 'iotevents-data': 'iot-events-data', 

3664 'iotevents': 'iot-events', 

3665 'iotwireless': 'iot-wireless', 

3666 'kinesisanalytics': 'kinesis-analytics', 

3667 'kinesisanalyticsv2': 'kinesis-analytics-v2', 

3668 'kinesisvideo': 'kinesis-video', 

3669 'lex-models': 'lex-model-building-service', 

3670 'lexv2-models': 'lex-models-v2', 

3671 'lex-runtime': 'lex-runtime-service', 

3672 'lexv2-runtime': 'lex-runtime-v2', 

3673 'logs': 'cloudwatch-logs', 

3674 'machinelearning': 'machine-learning', 

3675 'marketplacecommerceanalytics': 'marketplace-commerce-analytics', 

3676 'marketplace-entitlement': 'marketplace-entitlement-service', 

3677 'meteringmarketplace': 'marketplace-metering', 

3678 'mgh': 'migration-hub', 

3679 'sms-voice': 'pinpoint-sms-voice', 

3680 'resourcegroupstaggingapi': 'resource-groups-tagging-api', 

3681 'route53': 'route-53', 

3682 'route53domains': 'route-53-domains', 

3683 's3control': 's3-control', 

3684 'sdb': 'simpledb', 

3685 'secretsmanager': 'secrets-manager', 

3686 'serverlessrepo': 'serverlessapplicationrepository', 

3687 'servicecatalog': 'service-catalog', 

3688 'servicecatalog-appregistry': 'service-catalog-appregistry', 

3689 'stepfunctions': 'sfn', 

3690 'storagegateway': 'storage-gateway', 

3691}