Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/utils.py: 21%


1684 statements  

1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import base64 

14import binascii 

15import datetime 

16import email.message 

17import functools 

18import hashlib 

19import io 

20import logging 

21import os 

22import random 

23import re 

24import socket 

25import time 

26import warnings 

27import weakref 

28from datetime import datetime as _DatetimeClass 

29from ipaddress import ip_address 

30from pathlib import Path 

31from urllib.request import getproxies, proxy_bypass 

32 

33import dateutil.parser 

34from dateutil.tz import tzutc 

35from urllib3.exceptions import LocationParseError 

36 

37import botocore 

38import botocore.awsrequest 

39import botocore.httpsession 

40 

41# IP Regexes retained for backwards compatibility 

42from botocore.compat import HEX_PAT # noqa: F401 

43from botocore.compat import IPV4_PAT # noqa: F401 

44from botocore.compat import IPV6_ADDRZ_PAT # noqa: F401 

45from botocore.compat import IPV6_PAT # noqa: F401 

46from botocore.compat import LS32_PAT # noqa: F401 

47from botocore.compat import UNRESERVED_PAT # noqa: F401 

48from botocore.compat import ZONE_ID_PAT # noqa: F401 

49from botocore.compat import ( 

50 HAS_CRT, 

51 IPV4_RE, 

52 IPV6_ADDRZ_RE, 

53 MD5_AVAILABLE, 

54 UNSAFE_URL_CHARS, 

55 OrderedDict, 

56 get_md5, 

57 get_tzinfo_options, 

58 json, 

59 quote, 

60 urlparse, 

61 urlsplit, 

62 urlunsplit, 

63 zip_longest, 

64) 

65from botocore.exceptions import ( 

66 ClientError, 

67 ConfigNotFound, 

68 ConnectionClosedError, 

69 ConnectTimeoutError, 

70 EndpointConnectionError, 

71 HTTPClientError, 

72 InvalidDNSNameError, 

73 InvalidEndpointConfigurationError, 

74 InvalidExpressionError, 

75 InvalidHostLabelError, 

76 InvalidIMDSEndpointError, 

77 InvalidIMDSEndpointModeError, 

78 InvalidRegionError, 

79 MetadataRetrievalError, 

80 MissingDependencyException, 

81 ReadTimeoutError, 

82 SSOTokenLoadError, 

83 UnsupportedOutpostResourceError, 

84 UnsupportedS3AccesspointConfigurationError, 

85 UnsupportedS3ArnError, 

86 UnsupportedS3ConfigurationError, 

87 UnsupportedS3ControlArnError, 

88 UnsupportedS3ControlConfigurationError, 

89) 

90 

91logger = logging.getLogger(__name__) 

92DEFAULT_METADATA_SERVICE_TIMEOUT = 1 

93METADATA_BASE_URL = 'http://169.254.169.254/' 

94METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/' 

95METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6') 

96 

97# These are chars that do not need to be urlencoded. 

98# Based on rfc2986, section 2.3 

99SAFE_CHARS = '-._~' 

100LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]') 

101RETRYABLE_HTTP_ERRORS = ( 

102 ReadTimeoutError, 

103 EndpointConnectionError, 

104 ConnectionClosedError, 

105 ConnectTimeoutError, 

106) 

107S3_ACCELERATE_WHITELIST = ['dualstack'] 

108# In switching events from using service name / endpoint prefix to service 

109# id, we have to preserve compatibility. This maps the instances where either 

110# is different than the transformed service id. 

111EVENT_ALIASES = { 

112 "api.mediatailor": "mediatailor", 

113 "api.pricing": "pricing", 

114 "api.sagemaker": "sagemaker", 

115 "apigateway": "api-gateway", 

116 "application-autoscaling": "application-auto-scaling", 

117 "appstream2": "appstream", 

118 "autoscaling": "auto-scaling", 

119 "autoscaling-plans": "auto-scaling-plans", 

120 "ce": "cost-explorer", 

121 "cloudhsmv2": "cloudhsm-v2", 

122 "cloudsearchdomain": "cloudsearch-domain", 

123 "cognito-idp": "cognito-identity-provider", 

124 "config": "config-service", 

125 "cur": "cost-and-usage-report-service", 

126 "data.iot": "iot-data-plane", 

127 "data.jobs.iot": "iot-jobs-data-plane", 

128 "data.mediastore": "mediastore-data", 

129 "datapipeline": "data-pipeline", 

130 "devicefarm": "device-farm", 

131 "devices.iot1click": "iot-1click-devices-service", 

132 "directconnect": "direct-connect", 

133 "discovery": "application-discovery-service", 

134 "dms": "database-migration-service", 

135 "ds": "directory-service", 

136 "dynamodbstreams": "dynamodb-streams", 

137 "elasticbeanstalk": "elastic-beanstalk", 

138 "elasticfilesystem": "efs", 

139 "elasticloadbalancing": "elastic-load-balancing", 

140 "elasticmapreduce": "emr", 

141 "elastictranscoder": "elastic-transcoder", 

142 "elb": "elastic-load-balancing", 

143 "elbv2": "elastic-load-balancing-v2", 

144 "email": "ses", 

145 "entitlement.marketplace": "marketplace-entitlement-service", 

146 "es": "elasticsearch-service", 

147 "events": "eventbridge", 

148 "cloudwatch-events": "eventbridge", 

149 "iot-data": "iot-data-plane", 

150 "iot-jobs-data": "iot-jobs-data-plane", 

151 "iot1click-devices": "iot-1click-devices-service", 

152 "iot1click-projects": "iot-1click-projects", 

153 "kinesisanalytics": "kinesis-analytics", 

154 "kinesisvideo": "kinesis-video", 

155 "lex-models": "lex-model-building-service", 

156 "lex-runtime": "lex-runtime-service", 

157 "logs": "cloudwatch-logs", 

158 "machinelearning": "machine-learning", 

159 "marketplace-entitlement": "marketplace-entitlement-service", 

160 "marketplacecommerceanalytics": "marketplace-commerce-analytics", 

161 "metering.marketplace": "marketplace-metering", 

162 "meteringmarketplace": "marketplace-metering", 

163 "mgh": "migration-hub", 

164 "models.lex": "lex-model-building-service", 

165 "monitoring": "cloudwatch", 

166 "mturk-requester": "mturk", 

167 "opsworks-cm": "opsworkscm", 

168 "projects.iot1click": "iot-1click-projects", 

169 "resourcegroupstaggingapi": "resource-groups-tagging-api", 

170 "route53": "route-53", 

171 "route53domains": "route-53-domains", 

172 "runtime.lex": "lex-runtime-service", 

173 "runtime.sagemaker": "sagemaker-runtime", 

174 "sdb": "simpledb", 

175 "secretsmanager": "secrets-manager", 

176 "serverlessrepo": "serverlessapplicationrepository", 

177 "servicecatalog": "service-catalog", 

178 "states": "sfn", 

179 "stepfunctions": "sfn", 

180 "storagegateway": "storage-gateway", 

181 "streams.dynamodb": "dynamodb-streams", 

182 "tagging": "resource-groups-tagging-api", 

183} 

184 

185 

186# This pattern can be used to detect if a header is a flexible checksum header 

187CHECKSUM_HEADER_PATTERN = re.compile( 

188 r'^X-Amz-Checksum-([a-z0-9]*)$', 

189 flags=re.IGNORECASE, 

190) 

191 

192 

193def ensure_boolean(val): 

194 """Ensures a boolean value if a string or boolean is provided 

195 

196 For strings, the value for True/False is case insensitive 

197 """ 

198 if isinstance(val, bool): 

199 return val 

200 elif isinstance(val, str): 

201 return val.lower() == 'true' 

202 else: 

203 return False 

204 
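# Illustrative usage sketch (not part of the covered module): ensure_boolean
# maps only the boolean True and the string 'true' (any casing) to True;
# every other value, including truthy non-strings, yields False.
from botocore.utils import ensure_boolean

assert ensure_boolean('TRUE') is True
assert ensure_boolean('0') is False
assert ensure_boolean(1) is False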

205 

206def resolve_imds_endpoint_mode(session): 

207 """Resolving IMDS endpoint mode to either IPv6 or IPv4. 

208 

209 ec2_metadata_service_endpoint_mode takes precedence over imds_use_ipv6. 

210 """ 

211 endpoint_mode = session.get_config_variable( 

212 'ec2_metadata_service_endpoint_mode' 

213 ) 

214 if endpoint_mode is not None: 

215 lendpoint_mode = endpoint_mode.lower() 

216 if lendpoint_mode not in METADATA_ENDPOINT_MODES: 

217 error_msg_kwargs = { 

218 'mode': endpoint_mode, 

219 'valid_modes': METADATA_ENDPOINT_MODES, 

220 } 

221 raise InvalidIMDSEndpointModeError(**error_msg_kwargs) 

222 return lendpoint_mode 

223 elif session.get_config_variable('imds_use_ipv6'): 

224 return 'ipv6' 

225 return 'ipv4' 

226 

227 

228def is_json_value_header(shape): 

229 """Determines if the provided shape is the special header type jsonvalue. 

230 

231 :type shape: botocore.shape 

232 :param shape: Shape to be inspected for the jsonvalue trait. 

233 

234 :return: True if this type is a jsonvalue, False otherwise 

235 :rtype: Bool 

236 """ 

237 return ( 

238 hasattr(shape, 'serialization') 

239 and shape.serialization.get('jsonvalue', False) 

240 and shape.serialization.get('location') == 'header' 

241 and shape.type_name == 'string' 

242 ) 

243 

244 

245def has_header(header_name, headers): 

246 """Case-insensitive check for header key.""" 

247 if header_name is None: 

248 return False 

249 elif isinstance(headers, botocore.awsrequest.HeadersDict): 

250 return header_name in headers 

251 else: 

252 return header_name.lower() in [key.lower() for key in headers.keys()] 

253 

254 

255def get_service_module_name(service_model): 

256 """Returns the module name for a service 

257 

258 This is the value used in both the documentation and client class name 

259 """ 

260 name = service_model.metadata.get( 

261 'serviceAbbreviation', 

262 service_model.metadata.get( 

263 'serviceFullName', service_model.service_name 

264 ), 

265 ) 

266 name = name.replace('Amazon', '') 

267 name = name.replace('AWS', '') 

268 name = re.sub(r'\W+', '', name) 

269 return name 

270 

271 

272def normalize_url_path(path): 

273 if not path: 

274 return '/' 

275 return remove_dot_segments(path) 

276 

277 

278def normalize_boolean(val): 

279 """Returns None if val is None, otherwise ensure value 

280 converted to boolean""" 

281 if val is None: 

282 return val 

283 else: 

284 return ensure_boolean(val) 

285 

286 

287def remove_dot_segments(url): 

288 # RFC 3986, section 5.2.4 "Remove Dot Segments" 

289 # Also, AWS services require consecutive slashes to be removed, 

290 # so that's done here as well 

291 if not url: 

292 return '' 

293 input_url = url.split('/') 

294 output_list = [] 

295 for x in input_url: 

296 if x and x != '.': 

297 if x == '..': 

298 if output_list: 

299 output_list.pop() 

300 else: 

301 output_list.append(x) 

302 

303 if url[0] == '/': 

304 first = '/' 

305 else: 

306 first = '' 

307 if url[-1] == '/' and output_list: 

308 last = '/' 

309 else: 

310 last = '' 

311 return first + '/'.join(output_list) + last 

312 
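# Illustrative usage sketch (not part of the covered module): '.' and '..'
# segments are resolved per RFC 3986 section 5.2.4 and consecutive slashes
# collapse, while a leading or trailing slash on the input is preserved.
from botocore.utils import normalize_url_path, remove_dot_segments

assert normalize_url_path('') == '/'
assert remove_dot_segments('/a/b/c/./../../g') == '/a/g'
assert remove_dot_segments('mid/content=5/../6') == 'mid/6'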

313 

314def validate_jmespath_for_set(expression): 

315 # Validates a limited jmespath expression to determine if we can set a 

316 # value based on it. Only works with dotted paths. 

317 if not expression or expression == '.': 

318 raise InvalidExpressionError(expression=expression) 

319 

320 for invalid in ['[', ']', '*']: 

321 if invalid in expression: 

322 raise InvalidExpressionError(expression=expression) 

323 

324 

325def set_value_from_jmespath(source, expression, value, is_first=True): 

326 # This takes a (limited) jmespath-like expression & can set a value based 

327 # on it. 

328 # Limitations: 

329 # * Only handles dotted lookups 

330 # * No offsets/wildcards/slices/etc. 

331 if is_first: 

332 validate_jmespath_for_set(expression) 

333 

334 bits = expression.split('.', 1) 

335 current_key, remainder = bits[0], bits[1] if len(bits) > 1 else '' 

336 

337 if not current_key: 

338 raise InvalidExpressionError(expression=expression) 

339 

340 if remainder: 

341 if current_key not in source: 

342 # We've got something in the expression that's not present in the 

343 # source (new key). If there's any more bits, we'll set the key 

344 # with an empty dictionary. 

345 source[current_key] = {} 

346 

347 return set_value_from_jmespath( 

348 source[current_key], remainder, value, is_first=False 

349 ) 

350 

351 # If we're down to a single key, set it. 

352 source[current_key] = value 

353 
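# Illustrative usage sketch (not part of the covered module): a dotted path
# sets a nested value, creating intermediate dictionaries as needed; paths
# containing '[', ']' or '*' raise InvalidExpressionError.
from botocore.utils import set_value_from_jmespath

data = {}
set_value_from_jmespath(data, 'foo.bar.baz', 42)
assert data == {'foo': {'bar': {'baz': 42}}}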

354 

355def is_global_accesspoint(context): 

356 """Determine if request is intended for an MRAP accesspoint.""" 

357 s3_accesspoint = context.get('s3_accesspoint', {}) 

358 is_global = s3_accesspoint.get('region') == '' 

359 return is_global 

360 

361 

362class _RetriesExceededError(Exception): 

363 """Internal exception used when the number of retries are exceeded.""" 

364 

365 pass 

366 

367 

368class BadIMDSRequestError(Exception): 

369 def __init__(self, request): 

370 self.request = request 

371 

372 

373class IMDSFetcher: 

374 _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError 

375 _TOKEN_PATH = 'latest/api/token' 

376 _TOKEN_TTL = '21600' 

377 

378 def __init__( 

379 self, 

380 timeout=DEFAULT_METADATA_SERVICE_TIMEOUT, 

381 num_attempts=1, 

382 base_url=METADATA_BASE_URL, 

383 env=None, 

384 user_agent=None, 

385 config=None, 

386 ): 

387 self._timeout = timeout 

388 self._num_attempts = num_attempts 

389 if config is None: 

390 config = {} 

391 self._base_url = self._select_base_url(base_url, config) 

392 self._config = config 

393 

394 if env is None: 

395 env = os.environ.copy() 

396 self._disabled = ( 

397 env.get('AWS_EC2_METADATA_DISABLED', 'false').lower() == 'true' 

398 ) 

399 self._imds_v1_disabled = config.get('ec2_metadata_v1_disabled') 

400 self._user_agent = user_agent 

401 self._session = botocore.httpsession.URLLib3Session( 

402 timeout=self._timeout, 

403 proxies=get_environ_proxies(self._base_url), 

404 ) 

405 

406 def get_base_url(self): 

407 return self._base_url 

408 

409 def _select_base_url(self, base_url, config): 

410 if config is None: 

411 config = {} 

412 

413 requires_ipv6 = ( 

414 config.get('ec2_metadata_service_endpoint_mode') == 'ipv6' 

415 ) 

416 custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint') 

417 

418 if requires_ipv6 and custom_metadata_endpoint: 

419 logger.warning( 

420 "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint." 

421 ) 

422 

423 chosen_base_url = None 

424 

425 if base_url != METADATA_BASE_URL: 

426 chosen_base_url = base_url 

427 elif custom_metadata_endpoint: 

428 chosen_base_url = custom_metadata_endpoint 

429 elif requires_ipv6: 

430 chosen_base_url = METADATA_BASE_URL_IPv6 

431 else: 

432 chosen_base_url = METADATA_BASE_URL 

433 

434 logger.debug(f"IMDS ENDPOINT: {chosen_base_url}") 

435 if not is_valid_uri(chosen_base_url): 

436 raise InvalidIMDSEndpointError(endpoint=chosen_base_url) 

437 

438 return chosen_base_url 

439 

440 def _construct_url(self, path): 

441 sep = '' 

442 if self._base_url and not self._base_url.endswith('/'): 

443 sep = '/' 

444 return f'{self._base_url}{sep}{path}' 

445 

446 def _fetch_metadata_token(self): 

447 self._assert_enabled() 

448 url = self._construct_url(self._TOKEN_PATH) 

449 headers = { 

450 'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL, 

451 } 

452 self._add_user_agent(headers) 

453 request = botocore.awsrequest.AWSRequest( 

454 method='PUT', url=url, headers=headers 

455 ) 

456 for i in range(self._num_attempts): 

457 try: 

458 response = self._session.send(request.prepare()) 

459 if response.status_code == 200: 

460 return response.text 

461 elif response.status_code in (404, 403, 405): 

462 return None 

463 elif response.status_code in (400,): 

464 raise BadIMDSRequestError(request) 

465 except ReadTimeoutError: 

466 return None 

467 except RETRYABLE_HTTP_ERRORS as e: 

468 logger.debug( 

469 "Caught retryable HTTP exception while making metadata " 

470 "service request to %s: %s", 

471 url, 

472 e, 

473 exc_info=True, 

474 ) 

475 except HTTPClientError as e: 

476 if isinstance(e.kwargs.get('error'), LocationParseError): 

477 raise InvalidIMDSEndpointError(endpoint=url, error=e) 

478 else: 

479 raise 

480 return None 

481 

482 def _get_request(self, url_path, retry_func, token=None): 

483 """Make a get request to the Instance Metadata Service. 

484 

485 :type url_path: str 

486 :param url_path: The path component of the URL to make a get request. 

487 This arg is appended to the base_url that was provided in the 

488 initializer. 

489 

490 :type retry_func: callable 

491 :param retry_func: A function that takes the response as an argument 

492 and determines if it needs to retry. By default empty and non 

493 200 OK responses are retried. 

494 

495 :type token: str 

496 :param token: Metadata token to send along with GET requests to IMDS. 

497 """ 

498 self._assert_enabled() 

499 if not token: 

500 self._assert_v1_enabled() 

501 if retry_func is None: 

502 retry_func = self._default_retry 

503 url = self._construct_url(url_path) 

504 headers = {} 

505 if token is not None: 

506 headers['x-aws-ec2-metadata-token'] = token 

507 self._add_user_agent(headers) 

508 for i in range(self._num_attempts): 

509 try: 

510 request = botocore.awsrequest.AWSRequest( 

511 method='GET', url=url, headers=headers 

512 ) 

513 response = self._session.send(request.prepare()) 

514 if not retry_func(response): 

515 return response 

516 except RETRYABLE_HTTP_ERRORS as e: 

517 logger.debug( 

518 "Caught retryable HTTP exception while making metadata " 

519 "service request to %s: %s", 

520 url, 

521 e, 

522 exc_info=True, 

523 ) 

524 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

525 

526 def _add_user_agent(self, headers): 

527 if self._user_agent is not None: 

528 headers['User-Agent'] = self._user_agent 

529 

530 def _assert_enabled(self): 

531 if self._disabled: 

532 logger.debug("Access to EC2 metadata has been disabled.") 

533 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

534 

535 def _assert_v1_enabled(self): 

536 if self._imds_v1_disabled: 

537 raise MetadataRetrievalError( 

538 error_msg="Unable to retrieve token for use in IMDSv2 call and IMDSv1 has been disabled" 

539 ) 

540 

541 def _default_retry(self, response): 

542 return self._is_non_ok_response(response) or self._is_empty(response) 

543 

544 def _is_non_ok_response(self, response): 

545 if response.status_code != 200: 

546 self._log_imds_response(response, 'non-200', log_body=True) 

547 return True 

548 return False 

549 

550 def _is_empty(self, response): 

551 if not response.content: 

552 self._log_imds_response(response, 'no body', log_body=True) 

553 return True 

554 return False 

555 

556 def _log_imds_response(self, response, reason_to_log, log_body=False): 

557 statement = ( 

558 "Metadata service returned %s response " 

559 "with status code of %s for url: %s" 

560 ) 

561 logger_args = [reason_to_log, response.status_code, response.url] 

562 if log_body: 

563 statement += ", content body: %s" 

564 logger_args.append(response.content) 

565 logger.debug(statement, *logger_args) 

566 

567 

568class InstanceMetadataFetcher(IMDSFetcher): 

569 _URL_PATH = 'latest/meta-data/iam/security-credentials/' 

570 _REQUIRED_CREDENTIAL_FIELDS = [ 

571 'AccessKeyId', 

572 'SecretAccessKey', 

573 'Token', 

574 'Expiration', 

575 ] 

576 

577 def retrieve_iam_role_credentials(self): 

578 try: 

579 token = self._fetch_metadata_token() 

580 role_name = self._get_iam_role(token) 

581 credentials = self._get_credentials(role_name, token) 

582 if self._contains_all_credential_fields(credentials): 

583 credentials = { 

584 'role_name': role_name, 

585 'access_key': credentials['AccessKeyId'], 

586 'secret_key': credentials['SecretAccessKey'], 

587 'token': credentials['Token'], 

588 'expiry_time': credentials['Expiration'], 

589 } 

590 self._evaluate_expiration(credentials) 

591 return credentials 

592 else: 

593 # IMDS can return a 200 response that has a JSON formatted 

594 # error message (i.e. if ec2 is not a trusted entity for the

595 # attached role). We do not necessarily want to retry for 

596 # these and we also do not necessarily want to raise a key 

597 # error. So at least log the problematic response and return 

598 # an empty dictionary to signal that it was not able to 

599 # retrieve credentials. These errors will contain both a

600 # Code and Message key. 

601 if 'Code' in credentials and 'Message' in credentials: 

602 logger.debug( 

603 'Error response received when retrieving '

604 'credentials: %s.', 

605 credentials, 

606 ) 

607 return {} 

608 except self._RETRIES_EXCEEDED_ERROR_CLS: 

609 logger.debug( 

610 "Max number of attempts exceeded (%s) when " 

611 "attempting to retrieve data from metadata service.", 

612 self._num_attempts, 

613 ) 

614 except BadIMDSRequestError as e: 

615 logger.debug("Bad IMDS request: %s", e.request) 

616 return {} 

617 

618 def _get_iam_role(self, token=None): 

619 return self._get_request( 

620 url_path=self._URL_PATH, 

621 retry_func=self._needs_retry_for_role_name, 

622 token=token, 

623 ).text 

624 

625 def _get_credentials(self, role_name, token=None): 

626 r = self._get_request( 

627 url_path=self._URL_PATH + role_name, 

628 retry_func=self._needs_retry_for_credentials, 

629 token=token, 

630 ) 

631 return json.loads(r.text) 

632 

633 def _is_invalid_json(self, response): 

634 try: 

635 json.loads(response.text) 

636 return False 

637 except ValueError: 

638 self._log_imds_response(response, 'invalid json') 

639 return True 

640 

641 def _needs_retry_for_role_name(self, response): 

642 return self._is_non_ok_response(response) or self._is_empty(response) 

643 

644 def _needs_retry_for_credentials(self, response): 

645 return ( 

646 self._is_non_ok_response(response) 

647 or self._is_empty(response) 

648 or self._is_invalid_json(response) 

649 ) 

650 

651 def _contains_all_credential_fields(self, credentials): 

652 for field in self._REQUIRED_CREDENTIAL_FIELDS: 

653 if field not in credentials: 

654 logger.debug( 

655 'Retrieved credentials are missing required field: %s',

656 field, 

657 ) 

658 return False 

659 return True 

660 

661 def _evaluate_expiration(self, credentials): 

662 expiration = credentials.get("expiry_time") 

663 if expiration is None: 

664 return 

665 try: 

666 expiration = datetime.datetime.strptime( 

667 expiration, "%Y-%m-%dT%H:%M:%SZ" 

668 ) 

669 refresh_interval = self._config.get( 

670 "ec2_credential_refresh_window", 60 * 10 

671 ) 

672 jitter = random.randint(120, 600) # Between 2 to 10 minutes 

673 refresh_interval_with_jitter = refresh_interval + jitter 

674 current_time = datetime.datetime.utcnow() 

675 refresh_offset = datetime.timedelta( 

676 seconds=refresh_interval_with_jitter 

677 ) 

678 extension_time = expiration - refresh_offset 

679 if current_time >= extension_time: 

680 new_time = current_time + refresh_offset 

681 credentials["expiry_time"] = new_time.strftime( 

682 "%Y-%m-%dT%H:%M:%SZ" 

683 ) 

684 logger.info( 

685 f"Attempting credential expiration extension due to a " 

686 f"credential service availability issue. A refresh of " 

687 f"these credentials will be attempted again within " 

688 f"the next {refresh_interval_with_jitter/60:.0f} minutes." 

689 ) 

690 except ValueError: 

691 logger.debug( 

692 f"Unable to parse expiry_time in {credentials['expiry_time']}" 

693 ) 

694 

695 

696class IMDSRegionProvider: 

697 def __init__(self, session, environ=None, fetcher=None): 

698 """Initialize IMDSRegionProvider. 

699 :type session: :class:`botocore.session.Session` 

700 :param session: The session is needed to look up configuration for 

701 how to contact the instance metadata service. Specifically the 

702 whether or not it should use the IMDS region at all, and if so how 

703 to configure the timeout and number of attempts to reach the 

704 service. 

705 :type environ: None or dict 

706 :param environ: A dictionary of environment variables to use. If 

707 ``None`` is the argument then ``os.environ`` will be used by 

708 default. 

709 :type fetcher: :class:`botocore.utils.InstanceMetadataRegionFetcher`

710 :param fetcher: The class to actually handle the fetching of the region 

711 from the IMDS. If not provided a default one will be created. 

712 """ 

713 self._session = session 

714 if environ is None: 

715 environ = os.environ 

716 self._environ = environ 

717 self._fetcher = fetcher 

718 

719 def provide(self): 

720 """Provide the region value from IMDS.""" 

721 instance_region = self._get_instance_metadata_region() 

722 return instance_region 

723 

724 def _get_instance_metadata_region(self): 

725 fetcher = self._get_fetcher() 

726 region = fetcher.retrieve_region() 

727 return region 

728 

729 def _get_fetcher(self): 

730 if self._fetcher is None: 

731 self._fetcher = self._create_fetcher() 

732 return self._fetcher 

733 

734 def _create_fetcher(self): 

735 metadata_timeout = self._session.get_config_variable( 

736 'metadata_service_timeout' 

737 ) 

738 metadata_num_attempts = self._session.get_config_variable( 

739 'metadata_service_num_attempts' 

740 ) 

741 imds_config = { 

742 'ec2_metadata_service_endpoint': self._session.get_config_variable( 

743 'ec2_metadata_service_endpoint' 

744 ), 

745 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode( 

746 self._session 

747 ), 

748 'ec2_metadata_v1_disabled': self._session.get_config_variable( 

749 'ec2_metadata_v1_disabled' 

750 ), 

751 } 

752 fetcher = InstanceMetadataRegionFetcher( 

753 timeout=metadata_timeout, 

754 num_attempts=metadata_num_attempts, 

755 env=self._environ, 

756 user_agent=self._session.user_agent(), 

757 config=imds_config, 

758 ) 

759 return fetcher 

760 

761 

762class InstanceMetadataRegionFetcher(IMDSFetcher): 

763 _URL_PATH = 'latest/meta-data/placement/availability-zone/' 

764 

765 def retrieve_region(self): 

766 """Get the current region from the instance metadata service. 

767 :rtype: str or None

768 :returns: The region the current instance is running in, as a string,

769 when IMDS is configured as a region source and returns a valid

770 response. Otherwise returns ``None``, including when the instance

771 metadata service cannot be contacted, does not give a valid

772 response, or the fetcher exhausts its retries while connecting.

773 

774 

775 

776 """ 

777 try: 

778 region = self._get_region() 

779 return region 

780 except self._RETRIES_EXCEEDED_ERROR_CLS: 

781 logger.debug( 

782 "Max number of attempts exceeded (%s) when " 

783 "attempting to retrieve data from metadata service.", 

784 self._num_attempts, 

785 ) 

786 return None 

787 

788 def _get_region(self): 

789 token = self._fetch_metadata_token() 

790 response = self._get_request( 

791 url_path=self._URL_PATH, 

792 retry_func=self._default_retry, 

793 token=token, 

794 ) 

795 availability_zone = response.text 

796 region = availability_zone[:-1] 

797 return region 

798 

799 

800def merge_dicts(dict1, dict2, append_lists=False): 

801 """Given two dict, merge the second dict into the first. 

802 

803 The dicts can have arbitrary nesting. 

804 

805 :param append_lists: If true, instead of clobbering a list with the new 

806 value, append all of the new values onto the original list. 

807 """ 

808 for key in dict2: 

809 if isinstance(dict2[key], dict): 

810 if key in dict1 and key in dict2: 

811 merge_dicts(dict1[key], dict2[key]) 

812 else: 

813 dict1[key] = dict2[key] 

814 # If the value is a list and the ``append_lists`` flag is set, 

815 # append the new values onto the original list 

816 elif isinstance(dict2[key], list) and append_lists: 

817 # The value in dict1 must be a list in order to append new 

818 # values onto it. 

819 if key in dict1 and isinstance(dict1[key], list): 

820 dict1[key].extend(dict2[key]) 

821 else: 

822 dict1[key] = dict2[key] 

823 else: 

824 # For scalar types, the value from dict2 simply overwrites

825 # the value in dict1.

826 dict1[key] = dict2[key] 

827 
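# Illustrative usage sketch (not part of the covered module): dict2 is merged
# into dict1 in place, recursing into nested dicts; with append_lists=True,
# top-level list values are extended instead of replaced.
from botocore.utils import merge_dicts

dict1 = {'a': {'x': 1}, 'tags': ['one']}
dict2 = {'a': {'y': 2}, 'tags': ['two']}
merge_dicts(dict1, dict2, append_lists=True)
assert dict1 == {'a': {'x': 1, 'y': 2}, 'tags': ['one', 'two']}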

828 

829def lowercase_dict(original): 

830 """Copies the given dictionary ensuring all keys are lowercase strings.""" 

831 copy = {} 

832 for key in original: 

833 copy[key.lower()] = original[key] 

834 return copy 

835 

836 

837def parse_key_val_file(filename, _open=open): 

838 try: 

839 with _open(filename) as f: 

840 contents = f.read() 

841 return parse_key_val_file_contents(contents) 

842 except OSError: 

843 raise ConfigNotFound(path=filename) 

844 

845 

846def parse_key_val_file_contents(contents): 

847 # This was originally extracted from the EC2 credential provider, which was 

848 # fairly lenient in its parsing. We only try to parse key/val pairs if 

849 # there's a '=' in the line. 

850 final = {} 

851 for line in contents.splitlines(): 

852 if '=' not in line: 

853 continue 

854 key, val = line.split('=', 1) 

855 key = key.strip() 

856 val = val.strip() 

857 final[key] = val 

858 return final 

859 
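# Illustrative usage sketch (not part of the covered module): parsing is
# lenient -- lines without '=' are skipped and whitespace around keys and
# values is stripped.
from botocore.utils import parse_key_val_file_contents

contents = "AccessKeyId = AKID\nSecretAccessKey=secret\n# no equals sign here\n"
assert parse_key_val_file_contents(contents) == {
    'AccessKeyId': 'AKID',
    'SecretAccessKey': 'secret',
}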

860 

861def percent_encode_sequence(mapping, safe=SAFE_CHARS): 

862 """Urlencode a dict or list into a string. 

863 

864 This is similar to urllib.urlencode except that: 

865 

866 * It uses quote, and not quote_plus 

867 * It has a default list of safe chars that don't need 

868 to be encoded, which matches what AWS services expect. 

869 

870 If any value in the input ``mapping`` is a list type, 

871 then each list element will be serialized. This is equivalent

872 to ``urlencode``'s ``doseq=True`` argument. 

873 

874 This function should be preferred over the stdlib 

875 ``urlencode()`` function. 

876 

877 :param mapping: Either a dict to urlencode or a list of 

878 ``(key, value)`` pairs. 

879 

880 """ 

881 encoded_pairs = [] 

882 if hasattr(mapping, 'items'): 

883 pairs = mapping.items() 

884 else: 

885 pairs = mapping 

886 for key, value in pairs: 

887 if isinstance(value, list): 

888 for element in value: 

889 encoded_pairs.append( 

890 f'{percent_encode(key)}={percent_encode(element)}' 

891 ) 

892 else: 

893 encoded_pairs.append( 

894 f'{percent_encode(key)}={percent_encode(value)}' 

895 ) 

896 return '&'.join(encoded_pairs) 

897 

898 

899def percent_encode(input_str, safe=SAFE_CHARS): 

900 """Urlencodes a string. 

901 

902 Whereas percent_encode_sequence handles taking a dict/sequence and 

903 producing a percent encoded string, this function deals only with 

904 taking a string (not a dict/sequence) and percent encoding it. 

905 

906 If given the binary type, will simply URL encode it. If given the 

907 text type, will produce the binary type by UTF-8 encoding the 

908 text. If given something else, will convert it to the text type 

909 first. 

910 """ 

911 # If its not a binary or text string, make it a text string. 

912 if not isinstance(input_str, (bytes, str)): 

913 input_str = str(input_str) 

914 # If it's not bytes, make it bytes by UTF-8 encoding it. 

915 if not isinstance(input_str, bytes): 

916 input_str = input_str.encode('utf-8') 

917 return quote(input_str, safe=safe) 

918 
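# Illustrative usage sketch (not part of the covered module): only '-._~'
# (plus unreserved alphanumerics) are left unencoded, and list values are
# expanded into repeated key=value pairs.
from botocore.utils import percent_encode, percent_encode_sequence

assert percent_encode('a b/c') == 'a%20b%2Fc'
assert percent_encode_sequence({'k': 'v 1', 'tags': ['a', 'b']}) == 'k=v%201&tags=a&tags=b'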

919 

920def _epoch_seconds_to_datetime(value, tzinfo): 

921 """Parse numerical epoch timestamps (seconds since 1970) into a 

922 ``datetime.datetime`` in UTC using ``datetime.timedelta``. This is intended 

923 as a fallback when ``fromtimestamp`` raises ``OverflowError`` or ``OSError``.

924 

925 :type value: float or int 

926 :param value: The Unix timestamp as a number.

927 

928 :type tzinfo: callable 

929 :param tzinfo: A ``datetime.tzinfo`` class or compatible callable. 

930 """ 

931 epoch_zero = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=tzutc()) 

932 epoch_zero_localized = epoch_zero.astimezone(tzinfo()) 

933 return epoch_zero_localized + datetime.timedelta(seconds=value) 

934 

935 

936def _parse_timestamp_with_tzinfo(value, tzinfo): 

937 """Parse timestamp with pluggable tzinfo options.""" 

938 if isinstance(value, (int, float)): 

939 # Possibly an epoch time. 

940 return datetime.datetime.fromtimestamp(value, tzinfo()) 

941 else: 

942 try: 

943 return datetime.datetime.fromtimestamp(float(value), tzinfo()) 

944 except (TypeError, ValueError): 

945 pass 

946 try: 

947 # In certain cases, a timestamp marked with GMT can be parsed into a 

948 # different time zone, so here we provide a context which will 

949 # enforce that GMT == UTC. 

950 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()}) 

951 except (TypeError, ValueError) as e: 

952 raise ValueError(f'Invalid timestamp "{value}": {e}') 

953 

954 

955def parse_timestamp(value): 

956 """Parse a timestamp into a datetime object. 

957 

958 Supported formats: 

959 

960 * iso8601 

961 * rfc822 

962 * epoch (value is an integer) 

963 

964 This will return a ``datetime.datetime`` object. 

965 

966 """ 

967 tzinfo_options = get_tzinfo_options() 

968 for tzinfo in tzinfo_options: 

969 try: 

970 return _parse_timestamp_with_tzinfo(value, tzinfo) 

971 except (OSError, OverflowError) as e: 

972 logger.debug( 

973 'Unable to parse timestamp with "%s" timezone info.', 

974 tzinfo.__name__, 

975 exc_info=e, 

976 ) 

977 # For numeric values attempt fallback to using fromtimestamp-free method. 

978 # From Python's ``datetime.datetime.fromtimestamp`` documentation: "This 

979 # may raise ``OverflowError``, if the timestamp is out of the range of 

980 # values supported by the platform C localtime() function, and ``OSError`` 

981 # on localtime() failure. It's common for this to be restricted to years 

982 # from 1970 through 2038." 

983 try: 

984 numeric_value = float(value) 

985 except (TypeError, ValueError): 

986 pass 

987 else: 

988 try: 

989 for tzinfo in tzinfo_options: 

990 return _epoch_seconds_to_datetime(numeric_value, tzinfo=tzinfo) 

991 except (OSError, OverflowError) as e: 

992 logger.debug( 

993 'Unable to parse timestamp using fallback method with "%s" ' 

994 'timezone info.', 

995 tzinfo.__name__, 

996 exc_info=e, 

997 ) 

998 raise RuntimeError( 

999 f'Unable to calculate correct timezone offset for "{value}"' 

1000 ) 

1001 

1002 

1003def parse_to_aware_datetime(value): 

1004 """Converted the passed in value to a datetime object with tzinfo. 

1005 

1006 This function can be used to normalize all timestamp inputs. This 

1007 function accepts a number of different types of inputs, but 

1008 will always return a datetime.datetime object with time zone 

1009 information. 

1010 

1011 The input param ``value`` can be one of several types: 

1012 

1013 * A datetime object (both naive and aware) 

1014 * An integer representing the epoch time (can also be a string 

1015 of the integer, i.e '0', instead of 0). The epoch time is 

1016 considered to be UTC. 

1017 * An iso8601 formatted timestamp. This does not need to be 

1018 a complete timestamp, it can contain just the date portion 

1019 without the time component. 

1020 

1021 The returned value will be a datetime object that will have tzinfo. 

1022 If no timezone info was provided in the input value, then UTC is 

1023 assumed, not local time. 

1024 

1025 """ 

1026 # This is a general purpose method that handles several cases of 

1027 # converting the provided value to a string timestamp suitable to be 

1028 # serialized to an http request. It can handle: 

1029 # 1) A datetime.datetime object. 

1030 if isinstance(value, _DatetimeClass): 

1031 datetime_obj = value 

1032 else: 

1033 # 2) A string object that's formatted as a timestamp. 

1034 # We document this as being an iso8601 timestamp, although 

1035 # parse_timestamp is a bit more flexible. 

1036 datetime_obj = parse_timestamp(value) 

1037 if datetime_obj.tzinfo is None: 

1038 # I think a case could be made that if no time zone is provided,

1039 # we should use the local time. However, to restore backwards 

1040 # compat, the previous behavior was to assume UTC, which is 

1041 # what we're going to do here. 

1042 datetime_obj = datetime_obj.replace(tzinfo=tzutc()) 

1043 else: 

1044 datetime_obj = datetime_obj.astimezone(tzutc()) 

1045 return datetime_obj 

1046 
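# Illustrative usage sketch (not part of the covered module): every supported
# input form normalizes to a timezone-aware datetime in UTC; naive inputs are
# assumed to already be UTC.
from botocore.utils import parse_to_aware_datetime

dt = parse_to_aware_datetime('2021-07-15T12:00:00Z')
assert dt.utcoffset().total_seconds() == 0
assert parse_to_aware_datetime(0).year == 1970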

1047 

1048def datetime2timestamp(dt, default_timezone=None): 

1049 """Calculate the timestamp based on the given datetime instance. 

1050 

1051 :type dt: datetime 

1052 :param dt: A datetime object to be converted into timestamp 

1053 :type default_timezone: tzinfo 

1054 :param default_timezone: The timezone to assume when ``dt`` is naive;

1055 defaults to tzutc() if None. It is ignored for aware datetimes.

1056 :returns: The timestamp 

1057 """ 

1058 epoch = datetime.datetime(1970, 1, 1) 

1059 if dt.tzinfo is None: 

1060 if default_timezone is None: 

1061 default_timezone = tzutc() 

1062 dt = dt.replace(tzinfo=default_timezone) 

1063 d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch 

1064 return d.total_seconds() 

1065 
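# Illustrative usage sketch (not part of the covered module): naive datetimes
# are interpreted with default_timezone (tzutc() when None) before computing
# seconds since the epoch.
import datetime
from botocore.utils import datetime2timestamp

assert datetime2timestamp(datetime.datetime(1970, 1, 1)) == 0.0
assert datetime2timestamp(datetime.datetime(1970, 1, 2)) == 86400.0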

1066 

1067def calculate_sha256(body, as_hex=False): 

1068 """Calculate a sha256 checksum. 

1069 

1070 This method will calculate the sha256 checksum of a file like 

1071 object. Note that this method will iterate through the entire 

1072 file contents. The caller is responsible for ensuring the proper 

1073 starting position of the file and ``seek()``'ing the file back 

1074 to its starting location if other consumers need to read from 

1075 the file like object. 

1076 

1077 :param body: Any file like object. The file must be opened 

1078 in binary mode such that a ``.read()`` call returns bytes. 

1079 :param as_hex: If True, then the hex digest is returned. 

1080 If False, then the digest (as binary bytes) is returned. 

1081 

1082 :returns: The sha256 checksum 

1083 

1084 """ 

1085 checksum = hashlib.sha256() 

1086 for chunk in iter(lambda: body.read(1024 * 1024), b''): 

1087 checksum.update(chunk) 

1088 if as_hex: 

1089 return checksum.hexdigest() 

1090 else: 

1091 return checksum.digest() 

1092 
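# Illustrative usage sketch (not part of the covered module): the body is any
# binary file-like object and is read to the end in 1 MB chunks, so callers
# that need the data again must seek() back themselves.
import hashlib
import io
from botocore.utils import calculate_sha256

body = io.BytesIO(b'hello world')
expected = hashlib.sha256(b'hello world').hexdigest()
assert calculate_sha256(body, as_hex=True) == expected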

1093 

1094def calculate_tree_hash(body): 

1095 """Calculate a tree hash checksum. 

1096 

1097 For more information see: 

1098 

1099 http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html 

1100 

1101 :param body: Any file like object. This has the same constraints as 

1102 the ``body`` param in calculate_sha256 

1103 

1104 :rtype: str 

1105 :returns: The hex version of the calculated tree hash 

1106 

1107 """ 

1108 chunks = [] 

1109 required_chunk_size = 1024 * 1024 

1110 sha256 = hashlib.sha256 

1111 for chunk in iter(lambda: body.read(required_chunk_size), b''): 

1112 chunks.append(sha256(chunk).digest()) 

1113 if not chunks: 

1114 return sha256(b'').hexdigest() 

1115 while len(chunks) > 1: 

1116 new_chunks = [] 

1117 for first, second in _in_pairs(chunks): 

1118 if second is not None: 

1119 new_chunks.append(sha256(first + second).digest()) 

1120 else: 

1121 # We're at the end of the list and there's no pair left. 

1122 new_chunks.append(first) 

1123 chunks = new_chunks 

1124 return binascii.hexlify(chunks[0]).decode('ascii') 

1125 

1126 

1127def _in_pairs(iterable): 

1128 # Creates iterator that iterates over the list in pairs: 

1129 # for a, b in _in_pairs([0, 1, 2, 3, 4]): 

1130 # print(a, b) 

1131 # 

1132 # will print: 

1133 # 0, 1 

1134 # 2, 3 

1135 # 4, None 

1136 shared_iter = iter(iterable) 

1137 # Note that zip_longest is a compat import that uses 

1138 # the itertools izip_longest. This creates an iterator,

1139 # this call below does _not_ immediately create the list

1140 # of pairs.

1141 return zip_longest(shared_iter, shared_iter) 

1142 
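# Illustrative usage sketch (not part of the covered module): for a body of
# one megabyte or less there is a single leaf, so the tree hash equals the
# plain sha256 hex digest of the content.
import hashlib
import io
from botocore.utils import calculate_tree_hash

body = io.BytesIO(b'x' * 1024)
assert calculate_tree_hash(body) == hashlib.sha256(b'x' * 1024).hexdigest()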

1143 

1144class CachedProperty: 

1145 """A read only property that caches the initially computed value. 

1146 

1147 This descriptor will only call the provided ``fget`` function once. 

1148 Subsequent access to this property will return the cached value. 

1149 

1150 """ 

1151 

1152 def __init__(self, fget): 

1153 self._fget = fget 

1154 

1155 def __get__(self, obj, cls): 

1156 if obj is None: 

1157 return self 

1158 else: 

1159 computed_value = self._fget(obj) 

1160 obj.__dict__[self._fget.__name__] = computed_value 

1161 return computed_value 

1162 
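# Illustrative usage sketch (not part of the covered module): the wrapped
# method runs once per instance; the computed value is stored in the instance
# __dict__ so later attribute access bypasses the descriptor entirely.
# ``Report`` is a hypothetical class used only for this example.
from botocore.utils import CachedProperty

class Report:
    def __init__(self):
        self.calls = 0

    @CachedProperty
    def value(self):
        self.calls += 1
        return 42

report = Report()
assert report.value == 42 and report.value == 42
assert report.calls == 1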

1163 

1164class ArgumentGenerator: 

1165 """Generate sample input based on a shape model. 

1166 

1167 This class contains a ``generate_skeleton`` method that will take 

1168 an input/output shape (created from ``botocore.model``) and generate 

1169 a sample dictionary corresponding to the input/output shape. 

1170 

1171 The specific values used are placeholder values. For strings, either an

1172 empty string or the member name can be used, for numbers 0 or 0.0 is used. 

1173 The intended usage of this class is to generate the *shape* of the input 

1174 structure. 

1175 

1176 This can be useful for operations that have complex input shapes. 

1177 This allows a user to just fill in the necessary data instead of 

1178 worrying about the specific structure of the input arguments. 

1179 

1180 Example usage:: 

1181 

1182 s = botocore.session.get_session() 

1183 ddb = s.get_service_model('dynamodb') 

1184 arg_gen = ArgumentGenerator() 

1185 sample_input = arg_gen.generate_skeleton( 

1186 ddb.operation_model('CreateTable').input_shape) 

1187 print("Sample input for dynamodb.CreateTable: %s" % sample_input) 

1188 

1189 """ 

1190 

1191 def __init__(self, use_member_names=False): 

1192 self._use_member_names = use_member_names 

1193 

1194 def generate_skeleton(self, shape): 

1195 """Generate a sample input. 

1196 

1197 :type shape: ``botocore.model.Shape`` 

1198 :param shape: The input shape. 

1199 

1200 :return: The generated skeleton input corresponding to the 

1201 provided input shape. 

1202 

1203 """ 

1204 stack = [] 

1205 return self._generate_skeleton(shape, stack) 

1206 

1207 def _generate_skeleton(self, shape, stack, name=''): 

1208 stack.append(shape.name) 

1209 try: 

1210 if shape.type_name == 'structure': 

1211 return self._generate_type_structure(shape, stack) 

1212 elif shape.type_name == 'list': 

1213 return self._generate_type_list(shape, stack) 

1214 elif shape.type_name == 'map': 

1215 return self._generate_type_map(shape, stack) 

1216 elif shape.type_name == 'string': 

1217 if self._use_member_names: 

1218 return name 

1219 if shape.enum: 

1220 return random.choice(shape.enum) 

1221 return '' 

1222 elif shape.type_name in ['integer', 'long']: 

1223 return 0 

1224 elif shape.type_name in ['float', 'double']: 

1225 return 0.0 

1226 elif shape.type_name == 'boolean': 

1227 return True 

1228 elif shape.type_name == 'timestamp': 

1229 return datetime.datetime(1970, 1, 1, 0, 0, 0) 

1230 finally: 

1231 stack.pop() 

1232 

1233 def _generate_type_structure(self, shape, stack): 

1234 if stack.count(shape.name) > 1: 

1235 return {} 

1236 skeleton = OrderedDict() 

1237 for member_name, member_shape in shape.members.items(): 

1238 skeleton[member_name] = self._generate_skeleton( 

1239 member_shape, stack, name=member_name 

1240 ) 

1241 return skeleton 

1242 

1243 def _generate_type_list(self, shape, stack): 

1244 # For list elements we've arbitrarily decided to 

1245 # return a single element for the skeleton list.

1246 name = '' 

1247 if self._use_member_names: 

1248 name = shape.member.name 

1249 return [ 

1250 self._generate_skeleton(shape.member, stack, name), 

1251 ] 

1252 

1253 def _generate_type_map(self, shape, stack): 

1254 key_shape = shape.key 

1255 value_shape = shape.value 

1256 assert key_shape.type_name == 'string' 

1257 return OrderedDict( 

1258 [ 

1259 ('KeyName', self._generate_skeleton(value_shape, stack)), 

1260 ] 

1261 ) 

1262 

1263 

1264def is_valid_ipv6_endpoint_url(endpoint_url): 

1265 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1266 return False 

1267 hostname = f'[{urlparse(endpoint_url).hostname}]' 

1268 return IPV6_ADDRZ_RE.match(hostname) is not None 

1269 

1270 

1271def is_valid_ipv4_endpoint_url(endpoint_url): 

1272 hostname = urlparse(endpoint_url).hostname 

1273 return IPV4_RE.match(hostname) is not None 

1274 

1275 

1276def is_valid_endpoint_url(endpoint_url): 

1277 """Verify the endpoint_url is valid. 

1278 

1279 :type endpoint_url: string 

1280 :param endpoint_url: An endpoint_url. Must have at least a scheme 

1281 and a hostname. 

1282 

1283 :return: True if the endpoint url is valid. False otherwise. 

1284 

1285 """ 

1286 # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing 

1287 # it to pass hostname validation below. Detect them early to fix that. 

1288 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1289 return False 

1290 parts = urlsplit(endpoint_url) 

1291 hostname = parts.hostname 

1292 if hostname is None: 

1293 return False 

1294 if len(hostname) > 255: 

1295 return False 

1296 if hostname[-1] == ".": 

1297 hostname = hostname[:-1] 

1298 allowed = re.compile( 

1299 r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$", 

1300 re.IGNORECASE, 

1301 ) 

1302 return allowed.match(hostname) 

1303 
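# Illustrative usage sketch (not part of the covered module): the URL needs a
# scheme plus a hostname made of valid DNS labels; a missing hostname fails.
# Note the function returns a regex match object (truthy) rather than True.
from botocore.utils import is_valid_endpoint_url

assert is_valid_endpoint_url('https://s3.us-west-2.amazonaws.com')
assert not is_valid_endpoint_url('https://')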

1304 

1305def is_valid_uri(endpoint_url): 

1306 return is_valid_endpoint_url(endpoint_url) or is_valid_ipv6_endpoint_url( 

1307 endpoint_url 

1308 ) 

1309 

1310 

1311def validate_region_name(region_name): 

1312 """Provided region_name must be a valid host label.""" 

1313 if region_name is None: 

1314 return 

1315 valid_host_label = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$') 

1316 valid = valid_host_label.match(region_name) 

1317 if not valid: 

1318 raise InvalidRegionError(region_name=region_name) 

1319 

1320 

1321def check_dns_name(bucket_name): 

1322 """ 

1323 Check to see if the ``bucket_name`` complies with the 

1324 restricted DNS naming conventions necessary to allow 

1325 access via virtual-hosting style. 

1326 

1327 Even though "." characters are perfectly valid in this DNS 

1328 naming scheme, we are going to punt on any name containing a 

1329 "." character because these will cause SSL cert validation 

1330 problems if we try to use virtual-hosting style addressing. 

1331 """ 

1332 if '.' in bucket_name: 

1333 return False 

1334 n = len(bucket_name) 

1335 if n < 3 or n > 63: 

1336 # Wrong length 

1337 return False 

1338 match = LABEL_RE.match(bucket_name) 

1339 if match is None or match.end() != len(bucket_name): 

1340 return False 

1341 return True 

1342 
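# Illustrative usage sketch (not part of the covered module): names containing
# '.', names shorter than 3 or longer than 63 characters, and names with
# characters outside lowercase letters, digits, and hyphens are rejected.
from botocore.utils import check_dns_name

assert check_dns_name('my-bucket') is True
assert check_dns_name('my.bucket') is False
assert check_dns_name('ab') is False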

1343 

1344def fix_s3_host( 

1345 request, 

1346 signature_version, 

1347 region_name, 

1348 default_endpoint_url=None, 

1349 **kwargs, 

1350): 

1351 """ 

1352 This handler looks at S3 requests just before they are signed. 

1353 If there is a bucket name on the path (true for everything except 

1354 ListAllBuckets) it checks to see if that bucket name conforms to 

1355 the DNS naming conventions. If it does, it alters the request to 

1356 use ``virtual hosting`` style addressing rather than ``path-style`` 

1357 addressing. 

1358 

1359 """ 

1360 if request.context.get('use_global_endpoint', False): 

1361 default_endpoint_url = 's3.amazonaws.com' 

1362 try: 

1363 switch_to_virtual_host_style( 

1364 request, signature_version, default_endpoint_url 

1365 ) 

1366 except InvalidDNSNameError as e: 

1367 bucket_name = e.kwargs['bucket_name'] 

1368 logger.debug( 

1369 'Not changing URI, bucket is not DNS compatible: %s', bucket_name 

1370 ) 

1371 

1372 

1373def switch_to_virtual_host_style( 

1374 request, signature_version, default_endpoint_url=None, **kwargs 

1375): 

1376 """ 

1377 This is a handler to force virtual host style s3 addressing no matter 

1378 the signature version (which is taken into consideration for the default

1379 case). If the bucket is not DNS compatible an InvalidDNSName is thrown. 

1380 

1381 :param request: An AWSRequest object that is about to be sent.

1382 :param signature_version: The signature version to sign with 

1383 :param default_endpoint_url: The endpoint to use when switching to a 

1384 virtual style. If None is supplied, the virtual host will be 

1385 constructed from the url of the request. 

1386 """ 

1387 if request.auth_path is not None: 

1388 # The auth_path has already been applied (this may be a 

1389 # retried request). We don't need to perform this 

1390 # customization again. 

1391 return 

1392 elif _is_get_bucket_location_request(request): 

1393 # For the GetBucketLocation response, we should not be using 

1394 # the virtual host style addressing so we can avoid any sigv4 

1395 # issues. 

1396 logger.debug( 

1397 "Request is GetBucketLocation operation, not checking " 

1398 "for DNS compatibility." 

1399 ) 

1400 return 

1401 parts = urlsplit(request.url) 

1402 request.auth_path = parts.path 

1403 path_parts = parts.path.split('/') 

1404 

1405 # Retrieve the endpoint we will be prepending the bucket name to.

1406 if default_endpoint_url is None: 

1407 default_endpoint_url = parts.netloc 

1408 

1409 if len(path_parts) > 1: 

1410 bucket_name = path_parts[1] 

1411 if not bucket_name: 

1412 # If the bucket name is empty we should not be checking for 

1413 # dns compatibility. 

1414 return 

1415 logger.debug('Checking for DNS compatible bucket for: %s', request.url) 

1416 if check_dns_name(bucket_name): 

1417 # If the operation is on a bucket, the auth_path must be 

1418 # terminated with a '/' character. 

1419 if len(path_parts) == 2: 

1420 if request.auth_path[-1] != '/': 

1421 request.auth_path += '/' 

1422 path_parts.remove(bucket_name) 

1423 # At the very least the path must be a '/', such as with the 

1424 # CreateBucket operation when DNS style is being used. If this 

1425 # is not used you will get an empty path which is incorrect. 

1426 path = '/'.join(path_parts) or '/' 

1427 global_endpoint = default_endpoint_url 

1428 host = bucket_name + '.' + global_endpoint 

1429 new_tuple = (parts.scheme, host, path, parts.query, '') 

1430 new_uri = urlunsplit(new_tuple) 

1431 request.url = new_uri 

1432 logger.debug('URI updated to: %s', new_uri) 

1433 else: 

1434 raise InvalidDNSNameError(bucket_name=bucket_name) 

1435 

1436 

1437def _is_get_bucket_location_request(request): 

1438 return request.url.endswith('?location') 

1439 

1440 

1441def instance_cache(func): 

1442 """Method decorator for caching method calls to a single instance. 

1443 

1444 **This is not a general purpose caching decorator.** 

1445 

1446 In order to use this, you *must* provide an ``_instance_cache`` 

1447 attribute on the instance. 

1448 

1449 This decorator caches method calls. The cache is scoped to a

1450 single instance, so each instance maintains its own cache in

1451 the ``_instance_cache`` attribute that it is required to provide.

1452 

1453 

1454 

1455 """ 

1456 func_name = func.__name__ 

1457 

1458 @functools.wraps(func) 

1459 def _cache_guard(self, *args, **kwargs): 

1460 cache_key = (func_name, args) 

1461 if kwargs: 

1462 kwarg_items = tuple(sorted(kwargs.items())) 

1463 cache_key = (func_name, args, kwarg_items) 

1464 result = self._instance_cache.get(cache_key) 

1465 if result is not None: 

1466 return result 

1467 result = func(self, *args, **kwargs) 

1468 self._instance_cache[cache_key] = result 

1469 return result 

1470 

1471 return _cache_guard 

1472 
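# Illustrative usage sketch (not part of the covered module): the decorated
# method's results are stored per instance in the required ``_instance_cache``
# dict, keyed by method name and arguments. ``Adder`` is a hypothetical class
# used only for this example.
from botocore.utils import instance_cache

class Adder:
    def __init__(self):
        self._instance_cache = {}
        self.calls = 0

    @instance_cache
    def add(self, x, y):
        self.calls += 1
        return x + y

adder = Adder()
assert adder.add(1, 2) == 3 and adder.add(1, 2) == 3
assert adder.calls == 1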

1473 

1474def lru_cache_weakref(*cache_args, **cache_kwargs): 

1475 """ 

1476 Version of functools.lru_cache that stores a weak reference to ``self``. 

1477 

1478 Serves the same purpose as :py:func:`instance_cache` but uses Python's 

1479 functools implementation which offers ``maxsize`` and ``typed`` parameters.

1480 

1481 lru_cache is a global cache even when used on a method. The cache's 

1482 reference to ``self`` will prevent garbage collection of the object. This

1483 wrapper around functools.lru_cache replaces the reference to ``self`` with 

1484 a weak reference to not interfere with garbage collection. 

1485 """ 

1486 

1487 def wrapper(func): 

1488 @functools.lru_cache(*cache_args, **cache_kwargs) 

1489 def func_with_weakref(weakref_to_self, *args, **kwargs): 

1490 return func(weakref_to_self(), *args, **kwargs) 

1491 

1492 @functools.wraps(func) 

1493 def inner(self, *args, **kwargs): 

1494 return func_with_weakref(weakref.ref(self), *args, **kwargs) 

1495 

1496 inner.cache_info = func_with_weakref.cache_info 

1497 return inner 

1498 

1499 return wrapper 

1500 
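# Illustrative usage sketch (not part of the covered module): the method is
# memoized like functools.lru_cache, but the cache holds only a weak reference
# to the instance, so the instance can still be garbage collected. ``Squarer``
# is a hypothetical class used only for this example.
from botocore.utils import lru_cache_weakref

class Squarer:
    def __init__(self):
        self.calls = 0

    @lru_cache_weakref(maxsize=16)
    def square(self, x):
        self.calls += 1
        return x * x

squarer = Squarer()
assert squarer.square(3) == 9 and squarer.square(3) == 9
assert squarer.calls == 1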

1501 

1502def switch_host_s3_accelerate(request, operation_name, **kwargs): 

1503 """Switches the current s3 endpoint with an S3 Accelerate endpoint""" 

1504 

1505 # Note that when registered the switching of the s3 host happens 

1506 # before it gets changed to virtual. So we are not concerned with ensuring 

1507 # that the bucket name is translated to the virtual style here and we 

1508 # can hard code the Accelerate endpoint. 

1509 parts = urlsplit(request.url).netloc.split('.') 

1510 parts = [p for p in parts if p in S3_ACCELERATE_WHITELIST] 

1511 endpoint = 'https://s3-accelerate.' 

1512 if len(parts) > 0: 

1513 endpoint += '.'.join(parts) + '.' 

1514 endpoint += 'amazonaws.com' 

1515 

1516 if operation_name in ['ListBuckets', 'CreateBucket', 'DeleteBucket']: 

1517 return 

1518 _switch_hosts(request, endpoint, use_new_scheme=False) 

1519 

1520 

1521def switch_host_with_param(request, param_name): 

1522 """Switches the host using a parameter value from a JSON request body""" 

1523 request_json = json.loads(request.data.decode('utf-8')) 

1524 if request_json.get(param_name): 

1525 new_endpoint = request_json[param_name] 

1526 _switch_hosts(request, new_endpoint) 

1527 

1528 

1529def _switch_hosts(request, new_endpoint, use_new_scheme=True): 

1530 final_endpoint = _get_new_endpoint( 

1531 request.url, new_endpoint, use_new_scheme 

1532 ) 

1533 request.url = final_endpoint 

1534 

1535 

1536def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True): 

1537 new_endpoint_components = urlsplit(new_endpoint) 

1538 original_endpoint_components = urlsplit(original_endpoint) 

1539 scheme = original_endpoint_components.scheme 

1540 if use_new_scheme: 

1541 scheme = new_endpoint_components.scheme 

1542 final_endpoint_components = ( 

1543 scheme, 

1544 new_endpoint_components.netloc, 

1545 original_endpoint_components.path, 

1546 original_endpoint_components.query, 

1547 '', 

1548 ) 

1549 final_endpoint = urlunsplit(final_endpoint_components) 

1550 logger.debug(f'Updating URI from {original_endpoint} to {final_endpoint}') 

1551 return final_endpoint 

1552 

1553 

1554def deep_merge(base, extra): 

1555 """Deeply two dictionaries, overriding existing keys in the base. 

1556 

1557 :param base: The base dictionary which will be merged into. 

1558 :param extra: The dictionary to merge into the base. Keys from this 

1559 dictionary will take precedence. 

1560 """ 

1561 for key in extra: 

1562 # If the key represents a dict on both given dicts, merge the sub-dicts 

1563 if ( 

1564 key in base 

1565 and isinstance(base[key], dict) 

1566 and isinstance(extra[key], dict) 

1567 ): 

1568 deep_merge(base[key], extra[key]) 

1569 continue 

1570 

1571 # Otherwise, set the key on the base to be the value of the extra. 

1572 base[key] = extra[key] 

1573 

1574 

1575def hyphenize_service_id(service_id): 

1576 """Translate the form used for event emitters. 

1577 

1578 :param service_id: The service_id to convert. 

1579 """ 

1580 return service_id.replace(' ', '-').lower() 

1581 

1582 

1583class IdentityCache: 

1584 """Base IdentityCache implementation for storing and retrieving 

1585 highly accessed credentials. 

1586 

1587 This class is not intended to be instantiated in user code. 

1588 """ 

1589 

1590 METHOD = "base_identity_cache" 

1591 

1592 def __init__(self, client, credential_cls): 

1593 self._client = client 

1594 self._credential_cls = credential_cls 

1595 

1596 def get_credentials(self, **kwargs): 

1597 callback = self.build_refresh_callback(**kwargs) 

1598 metadata = callback() 

1599 credential_entry = self._credential_cls.create_from_metadata( 

1600 metadata=metadata, 

1601 refresh_using=callback, 

1602 method=self.METHOD, 

1603 advisory_timeout=45, 

1604 mandatory_timeout=10, 

1605 ) 

1606 return credential_entry 

1607 

1608 def build_refresh_callback(self, **kwargs):

1609 """Callback to be implemented by subclasses. 

1610 

1611 Returns a set of metadata to be converted into a new 

1612 credential instance. 

1613 """ 

1614 raise NotImplementedError() 

1615 

1616 

1617class S3ExpressIdentityCache(IdentityCache): 

1618 """S3Express IdentityCache for retrieving and storing 

1619 credentials from CreateSession calls. 

1620 

1621 This class is not intended to be instantiated in user code. 

1622 """ 

1623 

1624 METHOD = "s3express" 

1625 

1626 def __init__(self, client, credential_cls): 

1627 self._client = client 

1628 self._credential_cls = credential_cls 

1629 

1630 @functools.lru_cache(maxsize=100) 

1631 def get_credentials(self, bucket): 

1632 return super().get_credentials(bucket=bucket) 

1633 

1634 def build_refresh_callback(self, bucket): 

1635 def refresher(): 

1636 response = self._client.create_session(Bucket=bucket) 

1637 creds = response['Credentials'] 

1638 expiration = self._serialize_if_needed( 

1639 creds['Expiration'], iso=True 

1640 ) 

1641 return { 

1642 "access_key": creds['AccessKeyId'], 

1643 "secret_key": creds['SecretAccessKey'], 

1644 "token": creds['SessionToken'], 

1645 "expiry_time": expiration, 

1646 } 

1647 

1648 return refresher 

1649 

1650 def _serialize_if_needed(self, value, iso=False): 

1651 if isinstance(value, _DatetimeClass): 

1652 if iso: 

1653 return value.isoformat() 

1654 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

1655 return value 

1656 
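# Illustrative sketch (not part of botocore): the refresher returned by
# build_refresh_callback() calls CreateSession for the bucket and maps the
# response into the metadata shape consumed by the credential class:
#
#   {
#       'access_key': '<AccessKeyId>',
#       'secret_key': '<SecretAccessKey>',
#       'token': '<SessionToken>',
#       'expiry_time': '2024-01-01T00:00:00+00:00',  # ISO 8601 string (example)
#   }
#
# get_credentials() is memoized per bucket via functools.lru_cache, so repeated
# requests against the same directory bucket reuse one self-refreshing entry.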

1657 

1658class S3ExpressIdentityResolver: 

1659 def __init__(self, client, credential_cls, cache=None): 

1660 self._client = weakref.proxy(client) 

1661 

1662 if cache is None: 

1663 cache = S3ExpressIdentityCache(self._client, credential_cls) 

1664 self._cache = cache 

1665 

1666 def register(self, event_emitter=None): 

1667 logger.debug('Registering S3Express Identity Resolver') 

1668 emitter = event_emitter or self._client.meta.events 

1669 emitter.register('before-call.s3', self.apply_signing_cache_key) 

1670 emitter.register('before-sign.s3', self.resolve_s3express_identity) 

1671 

1672 def apply_signing_cache_key(self, params, context, **kwargs): 

1673 endpoint_properties = context.get('endpoint_properties', {}) 

1674 backend = endpoint_properties.get('backend', None) 

1675 

1676 # Add cache key if Bucket supplied for s3express request 

1677 bucket_name = context.get('input_params', {}).get('Bucket') 

1678 if backend == 'S3Express' and bucket_name is not None: 

1679 context.setdefault('signing', {}) 

1680 context['signing']['cache_key'] = bucket_name 

1681 

1682 def resolve_s3express_identity( 

1683 self, 

1684 request, 

1685 signing_name, 

1686 region_name, 

1687 signature_version, 

1688 request_signer, 

1689 operation_name, 

1690 **kwargs, 

1691 ): 

1692 signing_context = request.context.get('signing', {}) 

1693 signing_name = signing_context.get('signing_name') 

1694 if signing_name == 's3express' and signature_version.startswith( 

1695 'v4-s3express' 

1696 ): 

1697 signing_context['identity_cache'] = self._cache 

1698 if 'cache_key' not in signing_context: 

1699 signing_context['cache_key'] = ( 

1700 request.context.get('s3_redirect', {}) 

1701 .get('params', {}) 

1702 .get('Bucket') 

1703 ) 

1704 

1705 

1706class S3RegionRedirectorv2: 

1707 """Updated version of S3RegionRedirector for use when 

1708 EndpointRulesetResolver is in use for endpoint resolution. 

1709 

1710 This class is considered private and subject to abrupt breaking changes or 

1711 removal without prior announcement. Please do not use it directly. 

1712 """ 

1713 

1714 def __init__(self, endpoint_bridge, client, cache=None): 

1715 self._cache = cache or {} 

1716 self._client = weakref.proxy(client) 

1717 

1718 def register(self, event_emitter=None): 

1719 logger.debug('Registering S3 region redirector handler') 

1720 emitter = event_emitter or self._client.meta.events 

1721 emitter.register('needs-retry.s3', self.redirect_from_error) 

1722 emitter.register( 

1723 'before-parameter-build.s3', self.annotate_request_context 

1724 ) 

1725 emitter.register( 

1726 'before-endpoint-resolution.s3', self.redirect_from_cache 

1727 ) 

1728 

1729 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1730 """ 

1731 An S3 request sent to the wrong region will return an error that 

1732 contains the endpoint the request should be sent to. This handler 

1733 will add the redirect information to the signing context and then 

1734 redirect the request. 

1735 """ 

1736 if response is None: 

1737 # This could be none if there was a ConnectionError or other 

1738 # transport error. 

1739 return 

1740 

1741 redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {}) 

1742 if ArnParser.is_arn(redirect_ctx.get('bucket')): 

1743 logger.debug( 

1744 'S3 request was previously for an Accesspoint ARN, not ' 

1745 'redirecting.' 

1746 ) 

1747 return 

1748 

1749 if redirect_ctx.get('redirected'): 

1750 logger.debug( 

1751 'S3 request was previously redirected, not redirecting.' 

1752 ) 

1753 return 

1754 

1755 error = response[1].get('Error', {}) 

1756 error_code = error.get('Code') 

1757 response_metadata = response[1].get('ResponseMetadata', {}) 

1758 

1759 # We have to account for 400 responses because 

1760 # if we sign a Head* request with the wrong region, 

1761 # we'll get a 400 Bad Request but we won't get a 

1762 # body saying it's an "AuthorizationHeaderMalformed". 

1763 is_special_head_object = ( 

1764 error_code in ('301', '400') and operation.name == 'HeadObject' 

1765 ) 

1766 is_special_head_bucket = ( 

1767 error_code in ('301', '400') 

1768 and operation.name == 'HeadBucket' 

1769 and 'x-amz-bucket-region' 

1770 in response_metadata.get('HTTPHeaders', {}) 

1771 ) 

1772 is_wrong_signing_region = ( 

1773 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

1774 ) 

1775 is_redirect_status = response[0] is not None and response[ 

1776 0 

1777 ].status_code in (301, 302, 307) 

1778 is_permanent_redirect = error_code == 'PermanentRedirect' 

1779 if not any( 

1780 [ 

1781 is_special_head_object, 

1782 is_wrong_signing_region, 

1783 is_permanent_redirect, 

1784 is_special_head_bucket, 

1785 is_redirect_status, 

1786 ] 

1787 ): 

1788 return 

1789 

1790 bucket = request_dict['context']['s3_redirect']['bucket'] 

1791 client_region = request_dict['context'].get('client_region') 

1792 new_region = self.get_bucket_region(bucket, response) 

1793 

1794 if new_region is None: 

1795 logger.debug( 

1796 f"S3 client configured for region {client_region} but the " 

1797 f"bucket {bucket} is not in that region and the proper region " 

1798 "could not be automatically determined." 

1799 ) 

1800 return 

1801 

1802 logger.debug( 

1803 f"S3 client configured for region {client_region} but the bucket {bucket} " 

1804 f"is in region {new_region}; Please configure the proper region to " 

1805 f"avoid multiple unnecessary redirects and signing attempts." 

1806 ) 

1807 # Adding the new region to _cache will cause construct_endpoint() to

1808 # use the new region as the value for the AWS::Region builtin parameter.

1809 self._cache[bucket] = new_region 

1810 

1811 # Re-resolve endpoint with new region and modify request_dict with 

1812 # the new URL, auth scheme, and signing context. 

1813 ep_resolver = self._client._ruleset_resolver 

1814 ep_info = ep_resolver.construct_endpoint( 

1815 operation_model=operation, 

1816 call_args=request_dict['context']['s3_redirect']['params'], 

1817 request_context=request_dict['context'], 

1818 ) 

1819 request_dict['url'] = self.set_request_url( 

1820 request_dict['url'], ep_info.url 

1821 ) 

1822 request_dict['context']['s3_redirect']['redirected'] = True 

1823 auth_schemes = ep_info.properties.get('authSchemes') 

1824 if auth_schemes is not None: 

1825 auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes) 

1826 auth_type, signing_context = auth_info 

1827 request_dict['context']['auth_type'] = auth_type 

1828 request_dict['context']['signing'] = { 

1829 **request_dict['context'].get('signing', {}), 

1830 **signing_context, 

1831 } 

1832 

1833 # Return 0 so it doesn't wait to retry 

1834 return 0 

1835 
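# Illustrative sketch (not part of botocore): a wrong-region response that
# redirect_from_error treats as redirectable typically looks like
#
#   response = (
#       http_response,  # e.g. status_code 301
#       {
#           'Error': {'Code': 'PermanentRedirect', 'Message': '...'},
#           'ResponseMetadata': {
#               'HTTPHeaders': {'x-amz-bucket-region': 'eu-west-1'},
#           },
#       },
#   )
#
# get_bucket_region() below prefers the x-amz-bucket-region header, then
# Error.Region, and finally falls back to a HeadBucket call.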

1836 def get_bucket_region(self, bucket, response): 

1837 """ 

1838 There are multiple potential sources for the new region to redirect to, 

1839 but they aren't all universally available for use. This will try to 

1840 find region from response elements, but will fall back to calling 

1841 HEAD on the bucket if all else fails. 

1842 

1843 :param bucket: The bucket to find the region for. This is necessary if 

1844 the region is not available in the error response. 

1845 :param response: A response representing a service request that failed 

1846 due to incorrect region configuration. 

1847 """ 

1848 # First try to source the region from the headers. 

1849 service_response = response[1] 

1850 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

1851 if 'x-amz-bucket-region' in response_headers: 

1852 return response_headers['x-amz-bucket-region'] 

1853 

1854 # Next, check the error body 

1855 region = service_response.get('Error', {}).get('Region', None) 

1856 if region is not None: 

1857 return region 

1858 

1859 # Finally, HEAD the bucket. No other choice sadly. 

1860 try: 

1861 response = self._client.head_bucket(Bucket=bucket) 

1862 headers = response['ResponseMetadata']['HTTPHeaders'] 

1863 except ClientError as e: 

1864 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

1865 

1866 region = headers.get('x-amz-bucket-region', None) 

1867 return region 

1868 

1869 def set_request_url(self, old_url, new_endpoint, **kwargs): 

1870 """ 

1871 Splice a new endpoint into an existing URL. Note that some endpoints 

1872 from the endpoint provider have a path component which will be

1873 discarded by this function. 

1874 """ 

1875 return _get_new_endpoint(old_url, new_endpoint, False) 

1876 

1877 def redirect_from_cache(self, builtins, params, **kwargs): 

1878 """ 

1879 If a bucket name has been redirected before, it is in the cache. This 

1880 handler will update the AWS::Region endpoint resolver builtin param 

1881 to use the region from cache instead of the client region to avoid the 

1882 redirect. 

1883 """ 

1884 bucket = params.get('Bucket') 

1885 if bucket is not None and bucket in self._cache: 

1886 new_region = self._cache.get(bucket) 

1887 builtins['AWS::Region'] = new_region 

1888 

1889 def annotate_request_context(self, params, context, **kwargs): 

1890 """Store the bucket name in context for later use when redirecting. 

1891 The bucket name may be an access point ARN or alias. 

1892 """ 

1893 bucket = params.get('Bucket') 

1894 context['s3_redirect'] = { 

1895 'redirected': False, 

1896 'bucket': bucket, 

1897 'params': params, 

1898 } 

1899 

1900 

1901class S3RegionRedirector: 

1902 """This handler has been replaced by S3RegionRedirectorv2. The original 

1903 version remains in place for any third-party libraries that import it. 

1904 """ 

1905 

1906 def __init__(self, endpoint_bridge, client, cache=None): 

1907 self._endpoint_resolver = endpoint_bridge 

1908 self._cache = cache 

1909 if self._cache is None: 

1910 self._cache = {} 

1911 

1912 # This needs to be a weak ref in order to prevent memory leaks on 

1913 # python 2.6 

1914 self._client = weakref.proxy(client) 

1915 

1916 warnings.warn( 

1917 'The S3RegionRedirector class has been deprecated for a new ' 

1918 'internal replacement. A future version of botocore may remove ' 

1919 'this class.', 

1920 category=FutureWarning, 

1921 ) 

1922 

1923 def register(self, event_emitter=None): 

1924 emitter = event_emitter or self._client.meta.events 

1925 emitter.register('needs-retry.s3', self.redirect_from_error) 

1926 emitter.register('before-call.s3', self.set_request_url) 

1927 emitter.register('before-parameter-build.s3', self.redirect_from_cache) 

1928 

1929 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1930 """ 

1931 An S3 request sent to the wrong region will return an error that 

1932 contains the endpoint the request should be sent to. This handler 

1933 will add the redirect information to the signing context and then 

1934 redirect the request. 

1935 """ 

1936 if response is None: 

1937 # This could be none if there was a ConnectionError or other 

1938 # transport error. 

1939 return 

1940 

1941 if self._is_s3_accesspoint(request_dict.get('context', {})): 

1942 logger.debug( 

1943 'S3 request was previously to an accesspoint, not redirecting.' 

1944 ) 

1945 return 

1946 

1947 if request_dict.get('context', {}).get('s3_redirected'): 

1948 logger.debug( 

1949 'S3 request was previously redirected, not redirecting.' 

1950 ) 

1951 return 

1952 

1953 error = response[1].get('Error', {}) 

1954 error_code = error.get('Code') 

1955 response_metadata = response[1].get('ResponseMetadata', {}) 

1956 

1957 # We have to account for 400 responses because 

1958 # if we sign a Head* request with the wrong region, 

1959 # we'll get a 400 Bad Request but we won't get a 

1960 # body saying it's an "AuthorizationHeaderMalformed". 

1961 is_special_head_object = ( 

1962 error_code in ('301', '400') and operation.name == 'HeadObject' 

1963 ) 

1964 is_special_head_bucket = ( 

1965 error_code in ('301', '400') 

1966 and operation.name == 'HeadBucket' 

1967 and 'x-amz-bucket-region' 

1968 in response_metadata.get('HTTPHeaders', {}) 

1969 ) 

1970 is_wrong_signing_region = ( 

1971 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

1972 ) 

1973 is_redirect_status = response[0] is not None and response[ 

1974 0 

1975 ].status_code in (301, 302, 307) 

1976 is_permanent_redirect = error_code == 'PermanentRedirect' 

1977 if not any( 

1978 [ 

1979 is_special_head_object, 

1980 is_wrong_signing_region, 

1981 is_permanent_redirect, 

1982 is_special_head_bucket, 

1983 is_redirect_status, 

1984 ] 

1985 ): 

1986 return 

1987 

1988 bucket = request_dict['context']['signing']['bucket'] 

1989 client_region = request_dict['context'].get('client_region') 

1990 new_region = self.get_bucket_region(bucket, response) 

1991 

1992 if new_region is None: 

1993 logger.debug( 

1994 f"S3 client configured for region {client_region} but the bucket {bucket} is not " 

1995 "in that region and the proper region could not be " 

1996 "automatically determined." 

1997 ) 

1998 return 

1999 

2000 logger.debug( 

2001 f"S3 client configured for region {client_region} but the bucket {bucket} is in region" 

2002 f" {new_region}; Please configure the proper region to avoid multiple " 

2003 "unnecessary redirects and signing attempts." 

2004 ) 

2005 endpoint = self._endpoint_resolver.resolve('s3', new_region) 

2006 endpoint = endpoint['endpoint_url'] 

2007 

2008 signing_context = { 

2009 'region': new_region, 

2010 'bucket': bucket, 

2011 'endpoint': endpoint, 

2012 } 

2013 request_dict['context']['signing'] = signing_context 

2014 

2015 self._cache[bucket] = signing_context 

2016 self.set_request_url(request_dict, request_dict['context']) 

2017 

2018 request_dict['context']['s3_redirected'] = True 

2019 

2020 # Return 0 so it doesn't wait to retry 

2021 return 0 

2022 

2023 def get_bucket_region(self, bucket, response): 

2024 """ 

2025 There are multiple potential sources for the new region to redirect to, 

2026 but they aren't all universally available for use. This will try to 

2027 find region from response elements, but will fall back to calling 

2028 HEAD on the bucket if all else fails. 

2029 

2030 :param bucket: The bucket to find the region for. This is necessary if 

2031 the region is not available in the error response. 

2032 :param response: A response representing a service request that failed 

2033 due to incorrect region configuration. 

2034 """ 

2035 # First try to source the region from the headers. 

2036 service_response = response[1] 

2037 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

2038 if 'x-amz-bucket-region' in response_headers: 

2039 return response_headers['x-amz-bucket-region'] 

2040 

2041 # Next, check the error body 

2042 region = service_response.get('Error', {}).get('Region', None) 

2043 if region is not None: 

2044 return region 

2045 

2046 # Finally, HEAD the bucket. No other choice sadly. 

2047 try: 

2048 response = self._client.head_bucket(Bucket=bucket) 

2049 headers = response['ResponseMetadata']['HTTPHeaders'] 

2050 except ClientError as e: 

2051 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

2052 

2053 region = headers.get('x-amz-bucket-region', None) 

2054 return region 

2055 

2056 def set_request_url(self, params, context, **kwargs): 

2057 endpoint = context.get('signing', {}).get('endpoint', None) 

2058 if endpoint is not None: 

2059 params['url'] = _get_new_endpoint(params['url'], endpoint, False) 

2060 

2061 def redirect_from_cache(self, params, context, **kwargs): 

2062 """ 

2063 This handler retrieves a given bucket's signing context from the cache 

2064 and adds it into the request context. 

2065 """ 

2066 if self._is_s3_accesspoint(context): 

2067 return 

2068 bucket = params.get('Bucket') 

2069 signing_context = self._cache.get(bucket) 

2070 if signing_context is not None: 

2071 context['signing'] = signing_context 

2072 else: 

2073 context['signing'] = {'bucket': bucket} 

2074 

2075 def _is_s3_accesspoint(self, context): 

2076 return 's3_accesspoint' in context 

2077 

2078 

2079class InvalidArnException(ValueError): 

2080 pass 

2081 

2082 

2083class ArnParser: 

2084 def parse_arn(self, arn): 

2085 arn_parts = arn.split(':', 5) 

2086 if len(arn_parts) < 6: 

2087 raise InvalidArnException( 

2088 f'Provided ARN: {arn} must be of the format: ' 

2089 'arn:partition:service:region:account:resource' 

2090 ) 

2091 return { 

2092 'partition': arn_parts[1], 

2093 'service': arn_parts[2], 

2094 'region': arn_parts[3], 

2095 'account': arn_parts[4], 

2096 'resource': arn_parts[5], 

2097 } 

2098 

2099 @staticmethod 

2100 def is_arn(value): 

2101 if not isinstance(value, str) or not value.startswith('arn:'): 

2102 return False 

2103 arn_parser = ArnParser() 

2104 try: 

2105 arn_parser.parse_arn(value) 

2106 return True 

2107 except InvalidArnException: 

2108 return False 

2109 
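# Illustrative sketch (not part of botocore): parse_arn splits on the first five
# colons, so the resource portion may itself contain ':' or '/'.
#
#   >>> ArnParser().parse_arn(
#   ...     'arn:aws:s3:us-west-2:123456789012:accesspoint/myendpoint'
#   ... )
#   {'partition': 'aws', 'service': 's3', 'region': 'us-west-2',
#    'account': '123456789012', 'resource': 'accesspoint/myendpoint'}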

2110 

2111class S3ArnParamHandler: 

2112 _RESOURCE_REGEX = re.compile( 

2113 r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$' 

2114 ) 

2115 _OUTPOST_RESOURCE_REGEX = re.compile( 

2116 r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]' 

2117 r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)' 

2118 ) 

2119 _BLACKLISTED_OPERATIONS = ['CreateBucket'] 

2120 

2121 def __init__(self, arn_parser=None): 

2122 self._arn_parser = arn_parser 

2123 if arn_parser is None: 

2124 self._arn_parser = ArnParser() 

2125 

2126 def register(self, event_emitter): 

2127 event_emitter.register('before-parameter-build.s3', self.handle_arn) 

2128 

2129 def handle_arn(self, params, model, context, **kwargs): 

2130 if model.name in self._BLACKLISTED_OPERATIONS: 

2131 return 

2132 arn_details = self._get_arn_details_from_bucket_param(params) 

2133 if arn_details is None: 

2134 return 

2135 if arn_details['resource_type'] == 'accesspoint': 

2136 self._store_accesspoint(params, context, arn_details) 

2137 elif arn_details['resource_type'] == 'outpost': 

2138 self._store_outpost(params, context, arn_details) 

2139 

2140 def _get_arn_details_from_bucket_param(self, params): 

2141 if 'Bucket' in params: 

2142 try: 

2143 arn = params['Bucket'] 

2144 arn_details = self._arn_parser.parse_arn(arn) 

2145 self._add_resource_type_and_name(arn, arn_details) 

2146 return arn_details 

2147 except InvalidArnException: 

2148 pass 

2149 return None 

2150 

2151 def _add_resource_type_and_name(self, arn, arn_details): 

2152 match = self._RESOURCE_REGEX.match(arn_details['resource']) 

2153 if match: 

2154 arn_details['resource_type'] = match.group('resource_type') 

2155 arn_details['resource_name'] = match.group('resource_name') 

2156 else: 

2157 raise UnsupportedS3ArnError(arn=arn) 

2158 

2159 def _store_accesspoint(self, params, context, arn_details): 

2160 # Ideally the access-point would be stored as a parameter in the 

2161 # request where the serializer would then know how to serialize it, 

2162 # but access-points are not modeled in S3 operations so it would fail 

2163 # validation. Instead, we set the access-point name as the bucket parameter

2164 # so that some value is set when serializing the request, and we store

2165 # additional information from the ARN on the context for use when forming

2166 # the access-point endpoint.

2167 params['Bucket'] = arn_details['resource_name'] 

2168 context['s3_accesspoint'] = { 

2169 'name': arn_details['resource_name'], 

2170 'account': arn_details['account'], 

2171 'partition': arn_details['partition'], 

2172 'region': arn_details['region'], 

2173 'service': arn_details['service'], 

2174 } 

2175 

2176 def _store_outpost(self, params, context, arn_details): 

2177 resource_name = arn_details['resource_name'] 

2178 match = self._OUTPOST_RESOURCE_REGEX.match(resource_name) 

2179 if not match: 

2180 raise UnsupportedOutpostResourceError(resource_name=resource_name) 

2181 # Because we need to set the bucket name to something to pass 

2182 # validation we're going to use the access point name to be consistent 

2183 # with normal access point arns. 

2184 accesspoint_name = match.group('accesspoint_name') 

2185 params['Bucket'] = accesspoint_name 

2186 context['s3_accesspoint'] = { 

2187 'outpost_name': match.group('outpost_name'), 

2188 'name': accesspoint_name, 

2189 'account': arn_details['account'], 

2190 'partition': arn_details['partition'], 

2191 'region': arn_details['region'], 

2192 'service': arn_details['service'], 

2193 } 

2194 

2195 

2196class S3EndpointSetter: 

2197 _DEFAULT_PARTITION = 'aws' 

2198 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2199 

2200 def __init__( 

2201 self, 

2202 endpoint_resolver, 

2203 region=None, 

2204 s3_config=None, 

2205 endpoint_url=None, 

2206 partition=None, 

2207 use_fips_endpoint=False, 

2208 ): 

2209 # This is calling the endpoint_resolver in regions.py 

2210 self._endpoint_resolver = endpoint_resolver 

2211 self._region = region 

2212 self._s3_config = s3_config 

2213 self._use_fips_endpoint = use_fips_endpoint 

2214 if s3_config is None: 

2215 self._s3_config = {} 

2216 self._endpoint_url = endpoint_url 

2217 self._partition = partition 

2218 if partition is None: 

2219 self._partition = self._DEFAULT_PARTITION 

2220 

2221 def register(self, event_emitter): 

2222 event_emitter.register('before-sign.s3', self.set_endpoint) 

2223 event_emitter.register('choose-signer.s3', self.set_signer) 

2224 event_emitter.register( 

2225 'before-call.s3.WriteGetObjectResponse', 

2226 self.update_endpoint_to_s3_object_lambda, 

2227 ) 

2228 

2229 def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs): 

2230 if self._use_accelerate_endpoint: 

2231 raise UnsupportedS3ConfigurationError( 

2232 msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations', 

2233 ) 

2234 

2235 self._override_signing_name(context, 's3-object-lambda') 

2236 if self._endpoint_url: 

2237 # Only update the url if an explicit url was not provided 

2238 return 

2239 

2240 resolver = self._endpoint_resolver 

2241 # Construct the endpoint using the s3-object-lambda service for this region

2242 resolved = resolver.construct_endpoint( 

2243 's3-object-lambda', self._region 

2244 ) 

2245 

2246 # Ideally we would be able to replace the endpoint before 

2247 # serialization but there's no event to do that currently 

2248 # host_prefix carries the arn/bucket-specific host labels

2249 new_endpoint = 'https://{host_prefix}{hostname}'.format( 

2250 host_prefix=params['host_prefix'], 

2251 hostname=resolved['hostname'], 

2252 ) 

2253 

2254 params['url'] = _get_new_endpoint(params['url'], new_endpoint, False) 

2255 

2256 def set_endpoint(self, request, **kwargs): 

2257 if self._use_accesspoint_endpoint(request): 

2258 self._validate_accesspoint_supported(request) 

2259 self._validate_fips_supported(request) 

2260 self._validate_global_regions(request) 

2261 region_name = self._resolve_region_for_accesspoint_endpoint( 

2262 request 

2263 ) 

2264 self._resolve_signing_name_for_accesspoint_endpoint(request) 

2265 self._switch_to_accesspoint_endpoint(request, region_name) 

2266 return 

2267 if self._use_accelerate_endpoint: 

2268 if self._use_fips_endpoint: 

2269 raise UnsupportedS3ConfigurationError( 

2270 msg=( 

2271 'Client is configured to use the FIPS pseudo-region '

2272 f'for "{self._region}", but S3 Accelerate does not have any FIPS ' 

2273 'compatible endpoints.' 

2274 ) 

2275 ) 

2276 switch_host_s3_accelerate(request=request, **kwargs) 

2277 if self._s3_addressing_handler: 

2278 self._s3_addressing_handler(request=request, **kwargs) 

2279 

2280 def _use_accesspoint_endpoint(self, request): 

2281 return 's3_accesspoint' in request.context 

2282 

2283 def _validate_fips_supported(self, request): 

2284 if not self._use_fips_endpoint: 

2285 return 

2286 if 'fips' in request.context['s3_accesspoint']['region']: 

2287 raise UnsupportedS3AccesspointConfigurationError( 

2288 msg='Invalid ARN, FIPS region not allowed in ARN.'

2289 ) 

2290 if 'outpost_name' in request.context['s3_accesspoint']: 

2291 raise UnsupportedS3AccesspointConfigurationError( 

2292 msg=( 

2293 f'Client is configured to use the FIPS pseudo-region "{self._region}", '

2294 'but outpost ARNs do not support FIPS endpoints.' 

2295 ) 

2296 ) 

2297 # Transforming pseudo-region to actual region

2298 accesspoint_region = request.context['s3_accesspoint']['region'] 

2299 if accesspoint_region != self._region: 

2300 if not self._s3_config.get('use_arn_region', True): 

2301 # TODO: Update message to reflect use_arn_region 

2302 # is not set 

2303 raise UnsupportedS3AccesspointConfigurationError( 

2304 msg=( 

2305 'Client is configured to use the FIPS pseudo-region '

2306 f'for "{self._region}", but the access-point ARN provided is for ' 

2307 f'the "{accesspoint_region}" region. For clients using a FIPS ' 

2308 'pseudo-region, calls to access-point ARNs in another '

2309 'region are not allowed.' 

2310 ) 

2311 ) 

2312 

2313 def _validate_global_regions(self, request): 

2314 if self._s3_config.get('use_arn_region', True): 

2315 return 

2316 if self._region in ['aws-global', 's3-external-1']: 

2317 raise UnsupportedS3AccesspointConfigurationError( 

2318 msg=( 

2319 'Client is configured to use the global pseudo-region '

2320 f'"{self._region}". When providing access-point ARNs a regional ' 

2321 'endpoint must be specified.' 

2322 ) 

2323 ) 

2324 

2325 def _validate_accesspoint_supported(self, request): 

2326 if self._use_accelerate_endpoint: 

2327 raise UnsupportedS3AccesspointConfigurationError( 

2328 msg=( 

2329 'Client does not support s3 accelerate configuration ' 

2330 'when an access-point ARN is specified.' 

2331 ) 

2332 ) 

2333 request_partition = request.context['s3_accesspoint']['partition'] 

2334 if request_partition != self._partition: 

2335 raise UnsupportedS3AccesspointConfigurationError( 

2336 msg=( 

2337 f'Client is configured for "{self._partition}" partition, but access-point' 

2338 f' ARN provided is for "{request_partition}" partition. The client and ' 

2339 'access-point partition must be the same.'

2340 ) 

2341 ) 

2342 s3_service = request.context['s3_accesspoint'].get('service') 

2343 if s3_service == 's3-object-lambda' and self._s3_config.get( 

2344 'use_dualstack_endpoint' 

2345 ): 

2346 raise UnsupportedS3AccesspointConfigurationError( 

2347 msg=( 

2348 'Client does not support s3 dualstack configuration ' 

2349 'when an S3 Object Lambda access point ARN is specified.' 

2350 ) 

2351 ) 

2352 outpost_name = request.context['s3_accesspoint'].get('outpost_name') 

2353 if outpost_name and self._s3_config.get('use_dualstack_endpoint'): 

2354 raise UnsupportedS3AccesspointConfigurationError( 

2355 msg=( 

2356 'Client does not support s3 dualstack configuration ' 

2357 'when an outpost ARN is specified.' 

2358 ) 

2359 ) 

2360 self._validate_mrap_s3_config(request) 

2361 

2362 def _validate_mrap_s3_config(self, request): 

2363 if not is_global_accesspoint(request.context): 

2364 return 

2365 if self._s3_config.get('s3_disable_multiregion_access_points'): 

2366 raise UnsupportedS3AccesspointConfigurationError( 

2367 msg=( 

2368 'Invalid configuration, Multi-Region Access Point ' 

2369 'ARNs are disabled.' 

2370 ) 

2371 ) 

2372 elif self._s3_config.get('use_dualstack_endpoint'): 

2373 raise UnsupportedS3AccesspointConfigurationError( 

2374 msg=( 

2375 'Client does not support s3 dualstack configuration ' 

2376 'when a Multi-Region Access Point ARN is specified.' 

2377 ) 

2378 ) 

2379 

2380 def _resolve_region_for_accesspoint_endpoint(self, request): 

2381 if is_global_accesspoint(request.context): 

2382 # Requests going to MRAP endpoints MUST be set to any (*) region. 

2383 self._override_signing_region(request, '*') 

2384 elif self._s3_config.get('use_arn_region', True): 

2385 accesspoint_region = request.context['s3_accesspoint']['region'] 

2386 # If we are using the region from the access point, 

2387 # we will also want to make sure that we set it as the 

2388 # signing region as well 

2389 self._override_signing_region(request, accesspoint_region) 

2390 return accesspoint_region 

2391 return self._region 

2392 

2393 def set_signer(self, context, **kwargs): 

2394 if is_global_accesspoint(context): 

2395 if HAS_CRT: 

2396 return 's3v4a' 

2397 else: 

2398 raise MissingDependencyException( 

2399 msg="Using S3 with an MRAP arn requires an additional " 

2400 "dependency. You will need to pip install " 

2401 "botocore[crt] before proceeding." 

2402 ) 

2403 

2404 def _resolve_signing_name_for_accesspoint_endpoint(self, request): 

2405 accesspoint_service = request.context['s3_accesspoint']['service'] 

2406 self._override_signing_name(request.context, accesspoint_service) 

2407 

2408 def _switch_to_accesspoint_endpoint(self, request, region_name): 

2409 original_components = urlsplit(request.url) 

2410 accesspoint_endpoint = urlunsplit( 

2411 ( 

2412 original_components.scheme, 

2413 self._get_netloc(request.context, region_name), 

2414 self._get_accesspoint_path( 

2415 original_components.path, request.context 

2416 ), 

2417 original_components.query, 

2418 '', 

2419 ) 

2420 ) 

2421 logger.debug( 

2422 f'Updating URI from {request.url} to {accesspoint_endpoint}' 

2423 ) 

2424 request.url = accesspoint_endpoint 

2425 

2426 def _get_netloc(self, request_context, region_name): 

2427 if is_global_accesspoint(request_context): 

2428 return self._get_mrap_netloc(request_context) 

2429 else: 

2430 return self._get_accesspoint_netloc(request_context, region_name) 

2431 

2432 def _get_mrap_netloc(self, request_context): 

2433 s3_accesspoint = request_context['s3_accesspoint'] 

2434 region_name = 's3-global' 

2435 mrap_netloc_components = [s3_accesspoint['name']] 

2436 if self._endpoint_url: 

2437 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2438 mrap_netloc_components.append(endpoint_url_netloc) 

2439 else: 

2440 partition = s3_accesspoint['partition'] 

2441 mrap_netloc_components.extend( 

2442 [ 

2443 'accesspoint', 

2444 region_name, 

2445 self._get_partition_dns_suffix(partition), 

2446 ] 

2447 ) 

2448 return '.'.join(mrap_netloc_components) 

2449 

2450 def _get_accesspoint_netloc(self, request_context, region_name): 

2451 s3_accesspoint = request_context['s3_accesspoint'] 

2452 accesspoint_netloc_components = [ 

2453 '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']), 

2454 ] 

2455 outpost_name = s3_accesspoint.get('outpost_name') 

2456 if self._endpoint_url: 

2457 if outpost_name: 

2458 accesspoint_netloc_components.append(outpost_name) 

2459 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2460 accesspoint_netloc_components.append(endpoint_url_netloc) 

2461 else: 

2462 if outpost_name: 

2463 outpost_host = [outpost_name, 's3-outposts'] 

2464 accesspoint_netloc_components.extend(outpost_host) 

2465 elif s3_accesspoint['service'] == 's3-object-lambda': 

2466 component = self._inject_fips_if_needed( 

2467 's3-object-lambda', request_context 

2468 ) 

2469 accesspoint_netloc_components.append(component) 

2470 else: 

2471 component = self._inject_fips_if_needed( 

2472 's3-accesspoint', request_context 

2473 ) 

2474 accesspoint_netloc_components.append(component) 

2475 if self._s3_config.get('use_dualstack_endpoint'): 

2476 accesspoint_netloc_components.append('dualstack') 

2477 accesspoint_netloc_components.extend( 

2478 [region_name, self._get_dns_suffix(region_name)] 

2479 ) 

2480 return '.'.join(accesspoint_netloc_components) 

2481 
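# Illustrative sketch (not part of botocore): for a regional access point named
# "myendpoint" owned by account 123456789012 in us-west-2 (no custom
# endpoint_url, no outpost), the netloc built above is
#
#   myendpoint-123456789012.s3-accesspoint.us-west-2.amazonaws.com
#
# With use_dualstack_endpoint the "dualstack" label is inserted between the
# service label and the region, and with FIPS enabled the service label becomes
# "s3-accesspoint-fips".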

2482 def _inject_fips_if_needed(self, component, request_context): 

2483 if self._use_fips_endpoint: 

2484 return f'{component}-fips' 

2485 return component 

2486 

2487 def _get_accesspoint_path(self, original_path, request_context): 

2488 # The Bucket parameter was substituted with the access-point name as 

2489 # some value was required in serializing the bucket name. Now that 

2490 # we are making the request directly to the access point, we will 

2491 # want to remove that access-point name from the path. 

2492 name = request_context['s3_accesspoint']['name'] 

2493 # All S3 operations require at least a / in their path. 

2494 return original_path.replace('/' + name, '', 1) or '/' 

2495 

2496 def _get_partition_dns_suffix(self, partition_name): 

2497 dns_suffix = self._endpoint_resolver.get_partition_dns_suffix( 

2498 partition_name 

2499 ) 

2500 if dns_suffix is None: 

2501 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2502 return dns_suffix 

2503 

2504 def _get_dns_suffix(self, region_name): 

2505 resolved = self._endpoint_resolver.construct_endpoint( 

2506 's3', region_name 

2507 ) 

2508 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2509 if resolved and 'dnsSuffix' in resolved: 

2510 dns_suffix = resolved['dnsSuffix'] 

2511 return dns_suffix 

2512 

2513 def _override_signing_region(self, request, region_name): 

2514 signing_context = request.context.get('signing', {}) 

2515 # S3SigV4Auth will use the context['signing']['region'] value to 

2516 # sign with if present. This is used by the Bucket redirector 

2517 # as well but we should be fine because the redirector is never 

2518 # used in combination with the accesspoint setting logic. 

2519 signing_context['region'] = region_name 

2520 request.context['signing'] = signing_context 

2521 

2522 def _override_signing_name(self, context, signing_name): 

2523 signing_context = context.get('signing', {}) 

2524 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2525 # sign with if present. This is used by the Bucket redirector 

2526 # as well but we should be fine because the redirector is never 

2527 # used in combination with the accesspoint setting logic. 

2528 signing_context['signing_name'] = signing_name 

2529 context['signing'] = signing_context 

2530 

2531 @CachedProperty 

2532 def _use_accelerate_endpoint(self): 

2533 # Enable accelerate if the configuration is set to true or the

2534 # endpoint being used matches one of the accelerate endpoints. 

2535 

2536 # Accelerate has been explicitly configured. 

2537 if self._s3_config.get('use_accelerate_endpoint'): 

2538 return True 

2539 

2540 # Accelerate mode is turned on automatically if an endpoint url is 

2541 # provided that matches the accelerate scheme. 

2542 if self._endpoint_url is None: 

2543 return False 

2544 

2545 # Accelerate is only valid for Amazon endpoints. 

2546 netloc = urlsplit(self._endpoint_url).netloc 

2547 if not netloc.endswith('amazonaws.com'): 

2548 return False 

2549 

2550 # The first part of the url should always be s3-accelerate. 

2551 parts = netloc.split('.') 

2552 if parts[0] != 's3-accelerate': 

2553 return False 

2554 

2555 # Url parts between 's3-accelerate' and 'amazonaws.com' which 

2556 # represent different url features. 

2557 feature_parts = parts[1:-2] 

2558 

2559 # There should be no duplicate url parts. 

2560 if len(feature_parts) != len(set(feature_parts)): 

2561 return False 

2562 

2563 # Remaining parts must all be in the whitelist. 

2564 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

2565 
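# Illustrative sketch (not part of botocore): endpoint URLs that switch
# accelerate on automatically include, for example,
#
#   https://s3-accelerate.amazonaws.com
#   https://s3-accelerate.dualstack.amazonaws.com
#
# whereas a URL with extra labels outside S3_ACCELERATE_WHITELIST (a region
# label, for instance) would not, since only whitelisted labels are allowed
# between "s3-accelerate" and "amazonaws.com".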

2566 @CachedProperty 

2567 def _addressing_style(self): 

2568 # Use virtual host style addressing if accelerate is enabled or if 

2569 # the given endpoint url is an accelerate endpoint. 

2570 if self._use_accelerate_endpoint: 

2571 return 'virtual' 

2572 

2573 # If a particular addressing style is configured, use it. 

2574 configured_addressing_style = self._s3_config.get('addressing_style') 

2575 if configured_addressing_style: 

2576 return configured_addressing_style 

2577 

2578 @CachedProperty 

2579 def _s3_addressing_handler(self): 

2580 # If virtual host style was configured, use it regardless of whether 

2581 # or not the bucket looks dns compatible. 

2582 if self._addressing_style == 'virtual': 

2583 logger.debug("Using S3 virtual host style addressing.") 

2584 return switch_to_virtual_host_style 

2585 

2586 # If path style is configured, no additional steps are needed. If 

2587 # endpoint_url was specified, don't default to virtual. We could 

2588 # potentially default provided endpoint urls to virtual hosted 

2589 # style, but for now it is avoided. 

2590 if self._addressing_style == 'path' or self._endpoint_url is not None: 

2591 logger.debug("Using S3 path style addressing.") 

2592 return None 

2593 

2594 logger.debug( 

2595 "Defaulting to S3 virtual host style addressing with " 

2596 "path style addressing fallback." 

2597 ) 

2598 

2599 # By default, try to use virtual style with path fallback. 

2600 return fix_s3_host 

2601 

2602 

2603class S3ControlEndpointSetter: 

2604 _DEFAULT_PARTITION = 'aws' 

2605 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2606 _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$') 

2607 

2608 def __init__( 

2609 self, 

2610 endpoint_resolver, 

2611 region=None, 

2612 s3_config=None, 

2613 endpoint_url=None, 

2614 partition=None, 

2615 use_fips_endpoint=False, 

2616 ): 

2617 self._endpoint_resolver = endpoint_resolver 

2618 self._region = region 

2619 self._s3_config = s3_config 

2620 self._use_fips_endpoint = use_fips_endpoint 

2621 if s3_config is None: 

2622 self._s3_config = {} 

2623 self._endpoint_url = endpoint_url 

2624 self._partition = partition 

2625 if partition is None: 

2626 self._partition = self._DEFAULT_PARTITION 

2627 

2628 def register(self, event_emitter): 

2629 event_emitter.register('before-sign.s3-control', self.set_endpoint) 

2630 

2631 def set_endpoint(self, request, **kwargs): 

2632 if self._use_endpoint_from_arn_details(request): 

2633 self._validate_endpoint_from_arn_details_supported(request) 

2634 region_name = self._resolve_region_from_arn_details(request) 

2635 self._resolve_signing_name_from_arn_details(request) 

2636 self._resolve_endpoint_from_arn_details(request, region_name) 

2637 self._add_headers_from_arn_details(request) 

2638 elif self._use_endpoint_from_outpost_id(request): 

2639 self._validate_outpost_redirection_valid(request) 

2640 self._override_signing_name(request, 's3-outposts') 

2641 new_netloc = self._construct_outpost_endpoint(self._region) 

2642 self._update_request_netloc(request, new_netloc) 

2643 

2644 def _use_endpoint_from_arn_details(self, request): 

2645 return 'arn_details' in request.context 

2646 

2647 def _use_endpoint_from_outpost_id(self, request): 

2648 return 'outpost_id' in request.context 

2649 

2650 def _validate_endpoint_from_arn_details_supported(self, request): 

2651 if 'fips' in request.context['arn_details']['region']: 

2652 raise UnsupportedS3ControlArnError( 

2653 arn=request.context['arn_details']['original'], 

2654 msg='Invalid ARN, FIPS region not allowed in ARN.', 

2655 ) 

2656 if not self._s3_config.get('use_arn_region', False): 

2657 arn_region = request.context['arn_details']['region'] 

2658 if arn_region != self._region: 

2659 error_msg = ( 

2660 'The use_arn_region configuration is disabled but ' 

2661 f'received arn for "{arn_region}" when the client is configured ' 

2662 f'to use "{self._region}"' 

2663 ) 

2664 raise UnsupportedS3ControlConfigurationError(msg=error_msg) 

2665 request_partition = request.context['arn_details']['partition']

2666 if request_partition != self._partition:

2667 raise UnsupportedS3ControlConfigurationError(

2668 msg=(

2669 f'Client is configured for "{self._partition}" partition, but the ARN '

2670 f'provided is for "{request_partition}" partition. The client and '

2671 'ARN partition must be the same.'

2672 ) 

2673 ) 

2674 if self._s3_config.get('use_accelerate_endpoint'): 

2675 raise UnsupportedS3ControlConfigurationError( 

2676 msg='S3 control client does not support accelerate endpoints', 

2677 ) 

2678 if 'outpost_name' in request.context['arn_details']: 

2679 self._validate_outpost_redirection_valid(request) 

2680 

2681 def _validate_outpost_redirection_valid(self, request): 

2682 if self._s3_config.get('use_dualstack_endpoint'): 

2683 raise UnsupportedS3ControlConfigurationError( 

2684 msg=( 

2685 'Client does not support s3 dualstack configuration ' 

2686 'when an outpost is specified.' 

2687 ) 

2688 ) 

2689 

2690 def _resolve_region_from_arn_details(self, request): 

2691 if self._s3_config.get('use_arn_region', False): 

2692 arn_region = request.context['arn_details']['region'] 

2693 # If we are using the region from the expanded arn, we will also 

2694 # want to make sure that we set it as the signing region as well 

2695 self._override_signing_region(request, arn_region) 

2696 return arn_region 

2697 return self._region 

2698 

2699 def _resolve_signing_name_from_arn_details(self, request): 

2700 arn_service = request.context['arn_details']['service'] 

2701 self._override_signing_name(request, arn_service) 

2702 return arn_service 

2703 

2704 def _resolve_endpoint_from_arn_details(self, request, region_name): 

2705 new_netloc = self._resolve_netloc_from_arn_details( 

2706 request, region_name 

2707 ) 

2708 self._update_request_netloc(request, new_netloc) 

2709 

2710 def _update_request_netloc(self, request, new_netloc): 

2711 original_components = urlsplit(request.url) 

2712 arn_details_endpoint = urlunsplit( 

2713 ( 

2714 original_components.scheme, 

2715 new_netloc, 

2716 original_components.path, 

2717 original_components.query, 

2718 '', 

2719 ) 

2720 ) 

2721 logger.debug( 

2722 f'Updating URI from {request.url} to {arn_details_endpoint}' 

2723 ) 

2724 request.url = arn_details_endpoint 

2725 

2726 def _resolve_netloc_from_arn_details(self, request, region_name): 

2727 arn_details = request.context['arn_details'] 

2728 if 'outpost_name' in arn_details: 

2729 return self._construct_outpost_endpoint(region_name) 

2730 account = arn_details['account'] 

2731 return self._construct_s3_control_endpoint(region_name, account) 

2732 

2733 def _is_valid_host_label(self, label): 

2734 return self._HOST_LABEL_REGEX.match(label) 

2735 

2736 def _validate_host_labels(self, *labels): 

2737 for label in labels: 

2738 if not self._is_valid_host_label(label): 

2739 raise InvalidHostLabelError(label=label) 

2740 

2741 def _construct_s3_control_endpoint(self, region_name, account): 

2742 self._validate_host_labels(region_name, account) 

2743 if self._endpoint_url: 

2744 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2745 netloc = [account, endpoint_url_netloc] 

2746 else: 

2747 netloc = [ 

2748 account, 

2749 's3-control', 

2750 ] 

2751 self._add_dualstack(netloc) 

2752 dns_suffix = self._get_dns_suffix(region_name) 

2753 netloc.extend([region_name, dns_suffix]) 

2754 return self._construct_netloc(netloc) 

2755 

2756 def _construct_outpost_endpoint(self, region_name): 

2757 self._validate_host_labels(region_name) 

2758 if self._endpoint_url: 

2759 return urlsplit(self._endpoint_url).netloc 

2760 else: 

2761 netloc = [ 

2762 's3-outposts', 

2763 region_name, 

2764 self._get_dns_suffix(region_name), 

2765 ] 

2766 self._add_fips(netloc) 

2767 return self._construct_netloc(netloc) 

2768 

2769 def _construct_netloc(self, netloc): 

2770 return '.'.join(netloc) 

2771 

2772 def _add_fips(self, netloc): 

2773 if self._use_fips_endpoint: 

2774 netloc[0] = netloc[0] + '-fips' 

2775 

2776 def _add_dualstack(self, netloc): 

2777 if self._s3_config.get('use_dualstack_endpoint'): 

2778 netloc.append('dualstack') 

2779 

2780 def _get_dns_suffix(self, region_name): 

2781 resolved = self._endpoint_resolver.construct_endpoint( 

2782 's3', region_name 

2783 ) 

2784 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2785 if resolved and 'dnsSuffix' in resolved: 

2786 dns_suffix = resolved['dnsSuffix'] 

2787 return dns_suffix 

2788 

2789 def _override_signing_region(self, request, region_name): 

2790 signing_context = request.context.get('signing', {}) 

2791 # S3SigV4Auth will use the context['signing']['region'] value to 

2792 # sign with if present. This is used by the Bucket redirector 

2793 # as well but we should be fine because the redirector is never 

2794 # used in combination with the accesspoint setting logic. 

2795 signing_context['region'] = region_name 

2796 request.context['signing'] = signing_context 

2797 

2798 def _override_signing_name(self, request, signing_name): 

2799 signing_context = request.context.get('signing', {}) 

2800 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2801 # sign with if present. This is used by the Bucket redirector 

2802 # as well but we should be fine because the redirector is never 

2803 # used in combination with the accesspoint setting logic. 

2804 signing_context['signing_name'] = signing_name 

2805 request.context['signing'] = signing_context 

2806 

2807 def _add_headers_from_arn_details(self, request): 

2808 arn_details = request.context['arn_details'] 

2809 outpost_name = arn_details.get('outpost_name') 

2810 if outpost_name: 

2811 self._add_outpost_id_header(request, outpost_name) 

2812 

2813 def _add_outpost_id_header(self, request, outpost_name): 

2814 request.headers['x-amz-outpost-id'] = outpost_name 

2815 

2816 

2817class S3ControlArnParamHandler: 

2818 """This handler has been replaced by S3ControlArnParamHandlerv2. The 

2819 original version remains in place for any third-party importers. 

2820 """ 

2821 

2822 _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]') 

2823 

2824 def __init__(self, arn_parser=None): 

2825 self._arn_parser = arn_parser 

2826 if arn_parser is None: 

2827 self._arn_parser = ArnParser() 

2828 warnings.warn( 

2829 'The S3ControlArnParamHandler class has been deprecated for a new ' 

2830 'internal replacement. A future version of botocore may remove ' 

2831 'this class.', 

2832 category=FutureWarning, 

2833 ) 

2834 

2835 def register(self, event_emitter): 

2836 event_emitter.register( 

2837 'before-parameter-build.s3-control', 

2838 self.handle_arn, 

2839 ) 

2840 

2841 def handle_arn(self, params, model, context, **kwargs): 

2842 if model.name in ('CreateBucket', 'ListRegionalBuckets'): 

2843 # CreateBucket and ListRegionalBuckets are special cases that do 

2844 # not obey ARN based redirection but will redirect based off of the 

2845 # presence of the OutpostId parameter 

2846 self._handle_outpost_id_param(params, model, context) 

2847 else: 

2848 self._handle_name_param(params, model, context) 

2849 self._handle_bucket_param(params, model, context) 

2850 

2851 def _get_arn_details_from_param(self, params, param_name): 

2852 if param_name not in params: 

2853 return None 

2854 try: 

2855 arn = params[param_name] 

2856 arn_details = self._arn_parser.parse_arn(arn) 

2857 arn_details['original'] = arn 

2858 arn_details['resources'] = self._split_resource(arn_details) 

2859 return arn_details 

2860 except InvalidArnException: 

2861 return None 

2862 

2863 def _split_resource(self, arn_details): 

2864 return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource']) 

2865 

2866 def _override_account_id_param(self, params, arn_details): 

2867 account_id = arn_details['account'] 

2868 if 'AccountId' in params and params['AccountId'] != account_id: 

2869 error_msg = ( 

2870 'Account ID in arn does not match the AccountId parameter ' 

2871 'provided: "{}"' 

2872 ).format(params['AccountId']) 

2873 raise UnsupportedS3ControlArnError( 

2874 arn=arn_details['original'], 

2875 msg=error_msg, 

2876 ) 

2877 params['AccountId'] = account_id 

2878 

2879 def _handle_outpost_id_param(self, params, model, context): 

2880 if 'OutpostId' not in params: 

2881 return 

2882 context['outpost_id'] = params['OutpostId'] 

2883 

2884 def _handle_name_param(self, params, model, context): 

2885 # CreateAccessPoint is a special case that does not expand Name 

2886 if model.name == 'CreateAccessPoint': 

2887 return 

2888 arn_details = self._get_arn_details_from_param(params, 'Name') 

2889 if arn_details is None: 

2890 return 

2891 if self._is_outpost_accesspoint(arn_details): 

2892 self._store_outpost_accesspoint(params, context, arn_details) 

2893 else: 

2894 error_msg = 'The Name parameter does not support the provided ARN' 

2895 raise UnsupportedS3ControlArnError( 

2896 arn=arn_details['original'], 

2897 msg=error_msg, 

2898 ) 

2899 

2900 def _is_outpost_accesspoint(self, arn_details): 

2901 if arn_details['service'] != 's3-outposts': 

2902 return False 

2903 resources = arn_details['resources'] 

2904 if len(resources) != 4: 

2905 return False 

2906 # Resource must be of the form outpost/op-123/accesspoint/name 

2907 return resources[0] == 'outpost' and resources[2] == 'accesspoint' 

2908 

2909 def _store_outpost_accesspoint(self, params, context, arn_details): 

2910 self._override_account_id_param(params, arn_details) 

2911 accesspoint_name = arn_details['resources'][3] 

2912 params['Name'] = accesspoint_name 

2913 arn_details['accesspoint_name'] = accesspoint_name 

2914 arn_details['outpost_name'] = arn_details['resources'][1] 

2915 context['arn_details'] = arn_details 

2916 

2917 def _handle_bucket_param(self, params, model, context): 

2918 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

2919 if arn_details is None: 

2920 return 

2921 if self._is_outpost_bucket(arn_details): 

2922 self._store_outpost_bucket(params, context, arn_details) 

2923 else: 

2924 error_msg = ( 

2925 'The Bucket parameter does not support the provided ARN' 

2926 ) 

2927 raise UnsupportedS3ControlArnError( 

2928 arn=arn_details['original'], 

2929 msg=error_msg, 

2930 ) 

2931 

2932 def _is_outpost_bucket(self, arn_details): 

2933 if arn_details['service'] != 's3-outposts': 

2934 return False 

2935 resources = arn_details['resources'] 

2936 if len(resources) != 4: 

2937 return False 

2938 # Resource must be of the form outpost/op-123/bucket/name 

2939 return resources[0] == 'outpost' and resources[2] == 'bucket' 

2940 

2941 def _store_outpost_bucket(self, params, context, arn_details): 

2942 self._override_account_id_param(params, arn_details) 

2943 bucket_name = arn_details['resources'][3] 

2944 params['Bucket'] = bucket_name 

2945 arn_details['bucket_name'] = bucket_name 

2946 arn_details['outpost_name'] = arn_details['resources'][1] 

2947 context['arn_details'] = arn_details 

2948 

2949 

2950class S3ControlArnParamHandlerv2(S3ControlArnParamHandler): 

2951 """Updated version of S3ControlArnParamHandler for use when 

2952 EndpointRulesetResolver is in use for endpoint resolution. 

2953 

2954 This class is considered private and subject to abrupt breaking changes or 

2955 removal without prior announcement. Please do not use it directly. 

2956 """ 

2957 

2958 def __init__(self, arn_parser=None): 

2959 self._arn_parser = arn_parser 

2960 if arn_parser is None: 

2961 self._arn_parser = ArnParser() 

2962 

2963 def register(self, event_emitter): 

2964 event_emitter.register( 

2965 'before-endpoint-resolution.s3-control', 

2966 self.handle_arn, 

2967 ) 

2968 

2969 def _handle_name_param(self, params, model, context): 

2970 # CreateAccessPoint is a special case that does not expand Name 

2971 if model.name == 'CreateAccessPoint': 

2972 return 

2973 arn_details = self._get_arn_details_from_param(params, 'Name') 

2974 if arn_details is None: 

2975 return 

2976 self._raise_for_fips_pseudo_region(arn_details) 

2977 self._raise_for_accelerate_endpoint(context) 

2978 if self._is_outpost_accesspoint(arn_details): 

2979 self._store_outpost_accesspoint(params, context, arn_details) 

2980 else: 

2981 error_msg = 'The Name parameter does not support the provided ARN' 

2982 raise UnsupportedS3ControlArnError( 

2983 arn=arn_details['original'], 

2984 msg=error_msg, 

2985 ) 

2986 

2987 def _store_outpost_accesspoint(self, params, context, arn_details): 

2988 self._override_account_id_param(params, arn_details) 

2989 

2990 def _handle_bucket_param(self, params, model, context): 

2991 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

2992 if arn_details is None: 

2993 return 

2994 self._raise_for_fips_pseudo_region(arn_details) 

2995 self._raise_for_accelerate_endpoint(context) 

2996 if self._is_outpost_bucket(arn_details): 

2997 self._store_outpost_bucket(params, context, arn_details) 

2998 else: 

2999 error_msg = ( 

3000 'The Bucket parameter does not support the provided ARN' 

3001 ) 

3002 raise UnsupportedS3ControlArnError( 

3003 arn=arn_details['original'], 

3004 msg=error_msg, 

3005 ) 

3006 

3007 def _store_outpost_bucket(self, params, context, arn_details): 

3008 self._override_account_id_param(params, arn_details) 

3009 

3010 def _raise_for_fips_pseudo_region(self, arn_details): 

3011 # FIPS pseudo region names cannot be used in ARNs 

3012 arn_region = arn_details['region'] 

3013 if arn_region.startswith('fips-') or arn_region.endswith('-fips'):

3014 raise UnsupportedS3ControlArnError( 

3015 arn=arn_details['original'], 

3016 msg='Invalid ARN, FIPS region not allowed in ARN.', 

3017 ) 

3018 

3019 def _raise_for_accelerate_endpoint(self, context): 

3020 s3_config = context['client_config'].s3 or {} 

3021 if s3_config.get('use_accelerate_endpoint'): 

3022 raise UnsupportedS3ControlConfigurationError( 

3023 msg='S3 control client does not support accelerate endpoints', 

3024 ) 

3025 

3026 

3027class ContainerMetadataFetcher: 

3028 TIMEOUT_SECONDS = 2 

3029 RETRY_ATTEMPTS = 3 

3030 SLEEP_TIME = 1 

3031 IP_ADDRESS = '169.254.170.2' 

3032 _ALLOWED_HOSTS = [ 

3033 IP_ADDRESS, 

3034 '169.254.170.23', 

3035 'fd00:ec2::23', 

3036 'localhost', 

3037 ] 

3038 

3039 def __init__(self, session=None, sleep=time.sleep): 

3040 if session is None: 

3041 session = botocore.httpsession.URLLib3Session( 

3042 timeout=self.TIMEOUT_SECONDS 

3043 ) 

3044 self._session = session 

3045 self._sleep = sleep 

3046 

3047 def retrieve_full_uri(self, full_url, headers=None): 

3048 """Retrieve JSON metadata from container metadata. 

3049 

3050 :type full_url: str 

3051 :param full_url: The full URL of the metadata service. 

3052 This should include the scheme as well, e.g.

3053 "http://localhost:123/foo" 

3054 

3055 """ 

3056 self._validate_allowed_url(full_url) 

3057 return self._retrieve_credentials(full_url, headers) 

3058 

3059 def _validate_allowed_url(self, full_url): 

3060 parsed = botocore.compat.urlparse(full_url) 

3061 if self._is_loopback_address(parsed.hostname): 

3062 return 

3063 is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname) 

3064 if not is_whitelisted_host: 

3065 raise ValueError( 

3066 f"Unsupported host '{parsed.hostname}'. Can only retrieve metadata " 

3067 f"from a loopback address or one of these hosts: {', '.join(self._ALLOWED_HOSTS)}" 

3068 ) 

3069 

3070 def _is_loopback_address(self, hostname): 

3071 try: 

3072 ip = ip_address(hostname) 

3073 return ip.is_loopback 

3074 except ValueError: 

3075 return False 

3076 

3077 def _check_if_whitelisted_host(self, host): 

3078 if host in self._ALLOWED_HOSTS: 

3079 return True 

3080 return False 

3081 

3082 def retrieve_uri(self, relative_uri): 

3083 """Retrieve JSON metadata from container metadata. 

3084 

3085 :type relative_uri: str 

3086 :param relative_uri: A relative URI, e.g. "/foo/bar?id=123"

3087 

3088 :return: The parsed JSON response. 

3089 

3090 """ 

3091 full_url = self.full_url(relative_uri) 

3092 return self._retrieve_credentials(full_url) 

3093 

3094 def _retrieve_credentials(self, full_url, extra_headers=None): 

3095 headers = {'Accept': 'application/json'} 

3096 if extra_headers is not None: 

3097 headers.update(extra_headers) 

3098 attempts = 0 

3099 while True: 

3100 try: 

3101 return self._get_response( 

3102 full_url, headers, self.TIMEOUT_SECONDS 

3103 ) 

3104 except MetadataRetrievalError as e: 

3105 logger.debug( 

3106 "Received error when attempting to retrieve " 

3107 "container metadata: %s", 

3108 e, 

3109 exc_info=True, 

3110 ) 

3111 self._sleep(self.SLEEP_TIME) 

3112 attempts += 1 

3113 if attempts >= self.RETRY_ATTEMPTS: 

3114 raise 

3115 

3116 def _get_response(self, full_url, headers, timeout): 

3117 try: 

3118 AWSRequest = botocore.awsrequest.AWSRequest 

3119 request = AWSRequest(method='GET', url=full_url, headers=headers) 

3120 response = self._session.send(request.prepare()) 

3121 response_text = response.content.decode('utf-8') 

3122 if response.status_code != 200: 

3123 raise MetadataRetrievalError( 

3124 error_msg=( 

3125 f"Received non 200 response {response.status_code} " 

3126 f"from container metadata: {response_text}" 

3127 ) 

3128 ) 

3129 try: 

3130 return json.loads(response_text) 

3131 except ValueError: 

3132 error_msg = "Unable to parse JSON returned from container metadata services" 

3133 logger.debug('%s:%s', error_msg, response_text) 

3134 raise MetadataRetrievalError(error_msg=error_msg) 

3135 except RETRYABLE_HTTP_ERRORS as e: 

3136 error_msg = ( 

3137 "Received error when attempting to retrieve " 

3138 f"container metadata: {e}" 

3139 ) 

3140 raise MetadataRetrievalError(error_msg=error_msg) 

3141 

3142 def full_url(self, relative_uri): 

3143 return f'http://{self.IP_ADDRESS}{relative_uri}' 

3144 

3145 
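A minimal usage sketch for ContainerMetadataFetcher; the relative URI is hypothetical, and only full_url (pure string formatting) runs without a real container metadata endpoint present.

from botocore.utils import ContainerMetadataFetcher

fetcher = ContainerMetadataFetcher()
# Relative URIs resolve against the fixed link-local address.
assert fetcher.full_url('/v2/credentials') == 'http://169.254.170.2/v2/credentials'
# Full URLs must use a loopback address or an allow-listed host; anything
# else is rejected before a request is made, e.g.:
# fetcher.retrieve_full_uri('http://example.com/creds')  # raises ValueError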

3146def get_environ_proxies(url): 

3147 if should_bypass_proxies(url): 

3148 return {} 

3149 else: 

3150 return getproxies() 

3151 

3152 

3153def should_bypass_proxies(url): 

3154 """ 

3155 Returns whether we should bypass proxies or not. 

3156 """ 

3157 # NOTE: requests allowed ip/cidr entries in the no_proxy env var, which we 

3158 # don't currently support, as urllib only checks the DNS suffix 

3159 # If the system proxy settings indicate that this URL should be bypassed, 

3160 # don't proxy. 

3161 # The proxy_bypass function is incredibly buggy on OS X in early versions 

3162 # of Python 2.6, so allow this call to fail. Only catch the specific 

3163 # exceptions we've seen, though: this call failing in other ways can reveal 

3164 # legitimate problems. 

3165 try: 

3166 if proxy_bypass(urlparse(url).netloc): 

3167 return True 

3168 except (TypeError, socket.gaierror): 

3169 pass 

3170 

3171 return False 

3172 

3173 
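A sketch of how these two helpers behave, assuming a hypothetical proxy and NO_PROXY suffix set in the environment; exact results depend on the platform's proxy handling.

import os
from botocore.utils import get_environ_proxies

os.environ['HTTPS_PROXY'] = 'http://proxy.internal:3128'  # hypothetical proxy
os.environ['NO_PROXY'] = 'internal.example.com'           # hypothetical suffix

# URLs whose host matches the NO_PROXY suffix are typically bypassed ...
print(get_environ_proxies('https://internal.example.com/'))  # usually {}
# ... while other hosts pick up the proxies from the environment.
print(get_environ_proxies('https://aws.amazon.com/'))  # usually includes 'https'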

3174def determine_content_length(body): 

3175 # No body, content length of 0 

3176 if not body: 

3177 return 0 

3178 

3179 # Try asking the body for its length 

3180 try: 

3181 return len(body) 

3182 except (AttributeError, TypeError): 

3183 pass 

3184 

3185 # Try getting the length from a seekable stream 

3186 if hasattr(body, 'seek') and hasattr(body, 'tell'): 

3187 try: 

3188 orig_pos = body.tell() 

3189 body.seek(0, 2) 

3190 end_file_pos = body.tell() 

3191 body.seek(orig_pos) 

3192 return end_file_pos - orig_pos 

3193 except io.UnsupportedOperation: 

3194 # In some cases (for example an io.BufferedIOBase object) the body 

3195 # exposes a "seek" method that raises "UnsupportedOperation"; when 

3196 # that happens we want to fall back to "chunked" transfer 

3197 # encoding 

3198 pass 

3199 # Failed to determine the length 

3200 return None 

3201 

3202 
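A sketch of the three outcomes, using a hypothetical unsized stream class to show the chunked-encoding fallback.

import io
from botocore.utils import determine_content_length

print(determine_content_length(b'hello'))            # 5 -- len() works on bytes
print(determine_content_length(io.BytesIO(b'abc')))  # 3 -- measured via seek()/tell()
print(determine_content_length(None))                # 0 -- no body


class UnsizedStream:
    # Hypothetical body with no __len__ and no seek/tell, so the length
    # cannot be determined and the caller falls back to chunked encoding.
    def read(self, amt=-1):
        return b''


print(determine_content_length(UnsizedStream()))     # None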

3203def get_encoding_from_headers(headers, default='ISO-8859-1'): 

3204 """Returns encodings from given HTTP Header Dict. 

3205 

3206 :param headers: dictionary to extract encoding from. 

3207 :param default: default encoding if the content-type is text 

3208 """ 

3209 

3210 content_type = headers.get('content-type') 

3211 

3212 if not content_type: 

3213 return None 

3214 

3215 message = email.message.Message() 

3216 message['content-type'] = content_type 

3217 charset = message.get_param("charset") 

3218 

3219 if charset is not None: 

3220 return charset 

3221 

3222 if 'text' in content_type: 

3223 return default 

3224 

3225 
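A few illustrative calls showing the charset, text-default, and None cases.

from botocore.utils import get_encoding_from_headers

print(get_encoding_from_headers({'content-type': 'text/plain; charset=utf-8'}))
# 'utf-8' -- explicit charset parameter wins
print(get_encoding_from_headers({'content-type': 'text/html'}))
# 'ISO-8859-1' -- text content without a charset falls back to the default
print(get_encoding_from_headers({'content-type': 'application/octet-stream'}))
# None -- non-text content with no charset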

3226def calculate_md5(body, **kwargs): 

3227 if isinstance(body, (bytes, bytearray)): 

3228 binary_md5 = _calculate_md5_from_bytes(body) 

3229 else: 

3230 binary_md5 = _calculate_md5_from_file(body) 

3231 return base64.b64encode(binary_md5).decode('ascii') 

3232 

3233 

3234def _calculate_md5_from_bytes(body_bytes): 

3235 md5 = get_md5(body_bytes) 

3236 return md5.digest() 

3237 

3238 

3239def _calculate_md5_from_file(fileobj): 

3240 start_position = fileobj.tell() 

3241 md5 = get_md5() 

3242 for chunk in iter(lambda: fileobj.read(1024 * 1024), b''): 

3243 md5.update(chunk) 

3244 fileobj.seek(start_position) 

3245 return md5.digest() 

3246 

3247 
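A sketch of both code paths, assuming MD5 is available in this Python build; file-like bodies are read in 1 MB chunks and the stream position is restored afterwards.

import io
from botocore.utils import calculate_md5

digest_from_bytes = calculate_md5(b'hello world')
body = io.BytesIO(b'hello world')
digest_from_stream = calculate_md5(body)

print(digest_from_bytes == digest_from_stream)  # True -- same base64 MD5
print(body.tell())                              # 0 -- position restored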

3248def _is_s3express_request(params): 

3249 endpoint_properties = params.get('context', {}).get( 

3250 'endpoint_properties', {} 

3251 ) 

3252 return endpoint_properties.get('backend') == 'S3Express' 

3253 

3254 

3255def _has_checksum_header(params): 

3256 headers = params['headers'] 

3257 # If a user provided Content-MD5 is present, 

3258 # don't try to compute a new one. 

3259 if 'Content-MD5' in headers: 

3260 return True 

3261 

3262 # If a header matching the x-amz-checksum-* pattern is present, we 

3263 # assume a checksum has already been provided and an md5 is not needed 

3264 for header in headers: 

3265 if CHECKSUM_HEADER_PATTERN.match(header): 

3266 return True 

3267 

3268 return False 

3269 

3270 

3271def conditionally_calculate_checksum(params, **kwargs): 

3272 if not _has_checksum_header(params): 

3273 conditionally_calculate_md5(params, **kwargs) 

3274 conditionally_enable_crc32(params, **kwargs) 

3275 

3276 

3277def conditionally_enable_crc32(params, **kwargs): 

3278 checksum_context = params.get('context', {}).get('checksum', {}) 

3279 checksum_algorithm = checksum_context.get('request_algorithm') 

3280 if ( 

3281 _is_s3express_request(params) 

3282 and params['body'] is not None 

3283 and checksum_algorithm in (None, "conditional-md5") 

3284 ): 

3285 params['context']['checksum'] = { 

3286 'request_algorithm': { 

3287 'algorithm': 'crc32', 

3288 'in': 'header', 

3289 'name': 'x-amz-checksum-crc32', 

3290 } 

3291 } 

3292 

3293 

3294def conditionally_calculate_md5(params, **kwargs): 

3295 """Only add a Content-MD5 if the system supports it.""" 

3296 body = params['body'] 

3297 checksum_context = params.get('context', {}).get('checksum', {}) 

3298 checksum_algorithm = checksum_context.get('request_algorithm') 

3299 if checksum_algorithm and checksum_algorithm != 'conditional-md5': 

3300 # Skip for requests that will have a flexible checksum applied 

3301 return 

3302 

3303 if _has_checksum_header(params): 

3304 # Don't add a new header if one is already available. 

3305 return 

3306 

3307 if _is_s3express_request(params): 

3308 # S3Express doesn't support MD5 

3309 return 

3310 

3311 if MD5_AVAILABLE and body is not None: 

3312 md5_digest = calculate_md5(body, **kwargs) 

3313 params['headers']['Content-MD5'] = md5_digest 

3314 

3315 
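A sketch with hypothetical request dicts in the shape these handlers expect; the Content-MD5 header is only added when MD5 is available and no checksum header is already present.

from botocore.utils import conditionally_calculate_md5

params = {'body': b'example payload', 'headers': {}, 'context': {}}
conditionally_calculate_md5(params)
print('Content-MD5' in params['headers'])  # True on builds where MD5 is available

# A request already carrying an x-amz-checksum-* header is left untouched.
params = {'body': b'x', 'headers': {'x-amz-checksum-crc32': 'AAAAAA=='}, 'context': {}}
conditionally_calculate_md5(params)
print('Content-MD5' in params['headers'])  # False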

3316class FileWebIdentityTokenLoader: 

3317 def __init__(self, web_identity_token_path, _open=open): 

3318 self._web_identity_token_path = web_identity_token_path 

3319 self._open = _open 

3320 

3321 def __call__(self): 

3322 with self._open(self._web_identity_token_path) as token_file: 

3323 return token_file.read() 

3324 

3325 
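A minimal sketch using a temporary file as a stand-in for the web identity token file; the token contents are hypothetical.

import tempfile
from botocore.utils import FileWebIdentityTokenLoader

with tempfile.NamedTemporaryFile('w', suffix='.token', delete=False) as f:
    f.write('example-oidc-token')   # hypothetical token contents
    token_path = f.name

load_token = FileWebIdentityTokenLoader(token_path)
print(load_token())  # 'example-oidc-token' -- re-read from disk on every call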

3326class SSOTokenLoader: 

3327 def __init__(self, cache=None): 

3328 if cache is None: 

3329 cache = {} 

3330 self._cache = cache 

3331 

3332 def _generate_cache_key(self, start_url, session_name): 

3333 input_str = start_url 

3334 if session_name is not None: 

3335 input_str = session_name 

3336 return hashlib.sha1(input_str.encode('utf-8')).hexdigest() 

3337 

3338 def save_token(self, start_url, token, session_name=None): 

3339 cache_key = self._generate_cache_key(start_url, session_name) 

3340 self._cache[cache_key] = token 

3341 

3342 def __call__(self, start_url, session_name=None): 

3343 cache_key = self._generate_cache_key(start_url, session_name) 

3344 logger.debug(f'Checking for cached token at: {cache_key}') 

3345 if cache_key not in self._cache: 

3346 name = start_url 

3347 if session_name is not None: 

3348 name = session_name 

3349 error_msg = f'Token for {name} does not exist' 

3350 raise SSOTokenLoadError(error_msg=error_msg) 

3351 

3352 token = self._cache[cache_key] 

3353 if 'accessToken' not in token or 'expiresAt' not in token: 

3354 error_msg = f'Token for {start_url} is invalid' 

3355 raise SSOTokenLoadError(error_msg=error_msg) 

3356 return token 

3357 

3358 
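A sketch using a hypothetical in-memory token; real entries come from the SSO login flow and must contain accessToken and expiresAt.

from botocore.utils import SSOTokenLoader

loader = SSOTokenLoader()
token = {'accessToken': 'example-token', 'expiresAt': '2030-01-01T00:00:00Z'}
loader.save_token('https://example.awsapps.com/start', token)
print(loader('https://example.awsapps.com/start') == token)  # True

# An unknown start URL (or an entry missing accessToken/expiresAt) raises
# SSOTokenLoadError instead of returning a partial token.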

3359class EventbridgeSignerSetter: 

3360 _DEFAULT_PARTITION = 'aws' 

3361 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

3362 

3363 def __init__(self, endpoint_resolver, region=None, endpoint_url=None): 

3364 self._endpoint_resolver = endpoint_resolver 

3365 self._region = region 

3366 self._endpoint_url = endpoint_url 

3367 

3368 def register(self, event_emitter): 

3369 event_emitter.register( 

3370 'before-parameter-build.events.PutEvents', 

3371 self.check_for_global_endpoint, 

3372 ) 

3373 event_emitter.register( 

3374 'before-call.events.PutEvents', self.set_endpoint_url 

3375 ) 

3376 

3377 def set_endpoint_url(self, params, context, **kwargs): 

3378 if 'eventbridge_endpoint' in context: 

3379 endpoint = context['eventbridge_endpoint'] 

3380 logger.debug(f"Rewriting URL from {params['url']} to {endpoint}") 

3381 params['url'] = endpoint 

3382 

3383 def check_for_global_endpoint(self, params, context, **kwargs): 

3384 endpoint = params.get('EndpointId') 

3385 if endpoint is None: 

3386 return 

3387 

3388 if len(endpoint) == 0: 

3389 raise InvalidEndpointConfigurationError( 

3390 msg='EndpointId must not be a zero length string' 

3391 ) 

3392 

3393 if not HAS_CRT: 

3394 raise MissingDependencyException( 

3395 msg="Using EndpointId requires an additional " 

3396 "dependency. You will need to pip install " 

3397 "botocore[crt] before proceeding." 

3398 ) 

3399 

3400 config = context.get('client_config') 

3401 endpoint_variant_tags = None 

3402 if config is not None: 

3403 if config.use_fips_endpoint: 

3404 raise InvalidEndpointConfigurationError( 

3405 msg="FIPS is not supported with EventBridge " 

3406 "multi-region endpoints." 

3407 ) 

3408 if config.use_dualstack_endpoint: 

3409 endpoint_variant_tags = ['dualstack'] 

3410 

3411 if self._endpoint_url is None: 

3412 # Validate endpoint is a valid hostname component 

3413 parts = urlparse(f'https://{endpoint}') 

3414 if parts.hostname != endpoint: 

3415 raise InvalidEndpointConfigurationError( 

3416 msg='EndpointId is not a valid hostname component.' 

3417 ) 

3418 resolved_endpoint = self._get_global_endpoint( 

3419 endpoint, endpoint_variant_tags=endpoint_variant_tags 

3420 ) 

3421 else: 

3422 resolved_endpoint = self._endpoint_url 

3423 

3424 context['eventbridge_endpoint'] = resolved_endpoint 

3425 context['auth_type'] = 'v4a' 

3426 

3427 def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None): 

3428 resolver = self._endpoint_resolver 

3429 

3430 partition = resolver.get_partition_for_region(self._region) 

3431 if partition is None: 

3432 partition = self._DEFAULT_PARTITION 

3433 dns_suffix = resolver.get_partition_dns_suffix( 

3434 partition, endpoint_variant_tags=endpoint_variant_tags 

3435 ) 

3436 if dns_suffix is None: 

3437 dns_suffix = self._DEFAULT_DNS_SUFFIX 

3438 

3439 return f"https://{endpoint}.endpoint.events.{dns_suffix}/" 

3440 

3441 
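For illustration, with a hypothetical EndpointId and the default DNS suffix, the resolved multi-region endpoint follows the pattern built in _get_global_endpoint:

endpoint_id = 'abc0123.xyz'        # hypothetical EndpointId
dns_suffix = 'amazonaws.com'       # default partition DNS suffix
print(f"https://{endpoint_id}.endpoint.events.{dns_suffix}/")
# https://abc0123.xyz.endpoint.events.amazonaws.com/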

3442def is_s3_accelerate_url(url): 

3443 """Does the URL match the S3 Accelerate endpoint scheme? 

3444 

3445 Virtual host naming style with bucket names in the netloc part of the URL 

3446 is not allowed by this function. 

3447 """ 

3448 if url is None: 

3449 return False 

3450 

3451 # Accelerate is only valid for Amazon endpoints. 

3452 url_parts = urlsplit(url) 

3453 if not url_parts.netloc.endswith( 

3454 'amazonaws.com' 

3455 ) or url_parts.scheme not in ['https', 'http']: 

3456 return False 

3457 

3458 # The first part of the URL must be s3-accelerate. 

3459 parts = url_parts.netloc.split('.') 

3460 if parts[0] != 's3-accelerate': 

3461 return False 

3462 

3463 # URL parts between 's3-accelerate' and 'amazonaws.com' represent 

3464 # different URL features. 

3465 feature_parts = parts[1:-2] 

3466 

3467 # There should be no duplicate URL parts. 

3468 if len(feature_parts) != len(set(feature_parts)): 

3469 return False 

3470 

3471 # Remaining parts must all be in the whitelist. 

3472 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

3473 

3474 
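A few illustrative URLs against this check (the dualstack variant assumes it is in the module's accelerate whitelist):

from botocore.utils import is_s3_accelerate_url

print(is_s3_accelerate_url('https://s3-accelerate.amazonaws.com'))            # True
print(is_s3_accelerate_url('https://s3-accelerate.dualstack.amazonaws.com'))  # True
print(is_s3_accelerate_url('https://bucket.s3-accelerate.amazonaws.com'))     # False
print(is_s3_accelerate_url('https://s3.amazonaws.com'))                       # False
print(is_s3_accelerate_url(None))                                             # False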

3475class JSONFileCache: 

3476 """JSON file cache. 

3477 This provides a dict like interface that stores JSON serializable 

3478 objects. 

3479 The objects are serialized to JSON and stored in a file. These 

3480 values can be retrieved at a later time. 

3481 """ 

3482 

3483 CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache')) 

3484 

3485 def __init__(self, working_dir=CACHE_DIR, dumps_func=None): 

3486 self._working_dir = working_dir 

3487 if dumps_func is None: 

3488 dumps_func = self._default_dumps 

3489 self._dumps = dumps_func 

3490 

3491 def _default_dumps(self, obj): 

3492 return json.dumps(obj, default=self._serialize_if_needed) 

3493 

3494 def __contains__(self, cache_key): 

3495 actual_key = self._convert_cache_key(cache_key) 

3496 return os.path.isfile(actual_key) 

3497 

3498 def __getitem__(self, cache_key): 

3499 """Retrieve value from a cache key.""" 

3500 actual_key = self._convert_cache_key(cache_key) 

3501 try: 

3502 with open(actual_key) as f: 

3503 return json.load(f) 

3504 except (OSError, ValueError): 

3505 raise KeyError(cache_key) 

3506 

3507 def __delitem__(self, cache_key): 

3508 actual_key = self._convert_cache_key(cache_key) 

3509 try: 

3510 key_path = Path(actual_key) 

3511 key_path.unlink() 

3512 except FileNotFoundError: 

3513 raise KeyError(cache_key) 

3514 

3515 def __setitem__(self, cache_key, value): 

3516 full_key = self._convert_cache_key(cache_key) 

3517 try: 

3518 file_content = self._dumps(value) 

3519 except (TypeError, ValueError): 

3520 raise ValueError( 

3521 f"Value cannot be cached, must be " 

3522 f"JSON serializable: {value}" 

3523 ) 

3524 if not os.path.isdir(self._working_dir): 

3525 os.makedirs(self._working_dir) 

3526 with os.fdopen( 

3527 os.open(full_key, os.O_WRONLY | os.O_CREAT, 0o600), 'w' 

3528 ) as f: 

3529 f.truncate() 

3530 f.write(file_content) 

3531 

3532 def _convert_cache_key(self, cache_key): 

3533 full_path = os.path.join(self._working_dir, cache_key + '.json') 

3534 return full_path 

3535 

3536 def _serialize_if_needed(self, value, iso=False): 

3537 if isinstance(value, _DatetimeClass): 

3538 if iso: 

3539 return value.isoformat() 

3540 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

3541 return value 

3542 

3543 
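A minimal usage sketch with a temporary working directory; entries are written as <key>.json files with 0o600 permissions.

import tempfile
from botocore.utils import JSONFileCache

cache = JSONFileCache(working_dir=tempfile.mkdtemp())
cache['credentials'] = {'access_key': 'example-key', 'expiry': '2030-01-01'}
print('credentials' in cache)  # True
print(cache['credentials'])    # round-tripped dict
del cache['credentials']       # subsequent lookups raise KeyError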

3544def is_s3express_bucket(bucket): 

3545 if bucket is None: 

3546 return False 

3547 return bucket.endswith('--x-s3') 

3548 

3549 
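The bucket name below is hypothetical; only the '--x-s3' directory-bucket suffix matters to this check.

from botocore.utils import is_s3express_bucket

print(is_s3express_bucket('my-bucket--usw2-az1--x-s3'))  # True
print(is_s3express_bucket('my-regular-bucket'))          # False
print(is_s3express_bucket(None))                         # False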

3550# This parameter is not part of the public interface and is subject to abrupt 

3551# breaking changes or removal without prior announcement. 

3552# Mapping of services that have been renamed for backwards compatibility reasons. 

3553# Keys are the previous name that should be allowed, values are the documented 

3554# and preferred client name. 

3555SERVICE_NAME_ALIASES = {'runtime.sagemaker': 'sagemaker-runtime'} 

3556 

3557 

3558# This parameter is not part of the public interface and is subject to abrupt 

3559# breaking changes or removal without prior announcement. 

3560# Mapping to determine the service ID for services that do not use it as the 

3561# model data directory name. The keys are the data directory name and the 

3562# values are the transformed service IDs (lower case and hyphenated). 

3563CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES = { 

3564 # Actual service name we use -> Allowed computed service name. 

3565 'apigateway': 'api-gateway', 

3566 'application-autoscaling': 'application-auto-scaling', 

3567 'appmesh': 'app-mesh', 

3568 'autoscaling': 'auto-scaling', 

3569 'autoscaling-plans': 'auto-scaling-plans', 

3570 'ce': 'cost-explorer', 

3571 'cloudhsmv2': 'cloudhsm-v2', 

3572 'cloudsearchdomain': 'cloudsearch-domain', 

3573 'cognito-idp': 'cognito-identity-provider', 

3574 'config': 'config-service', 

3575 'cur': 'cost-and-usage-report-service', 

3576 'datapipeline': 'data-pipeline', 

3577 'directconnect': 'direct-connect', 

3578 'devicefarm': 'device-farm', 

3579 'discovery': 'application-discovery-service', 

3580 'dms': 'database-migration-service', 

3581 'ds': 'directory-service', 

3582 'dynamodbstreams': 'dynamodb-streams', 

3583 'elasticbeanstalk': 'elastic-beanstalk', 

3584 'elastictranscoder': 'elastic-transcoder', 

3585 'elb': 'elastic-load-balancing', 

3586 'elbv2': 'elastic-load-balancing-v2', 

3587 'es': 'elasticsearch-service', 

3588 'events': 'eventbridge', 

3589 'globalaccelerator': 'global-accelerator', 

3590 'iot-data': 'iot-data-plane', 

3591 'iot-jobs-data': 'iot-jobs-data-plane', 

3592 'iot1click-devices': 'iot-1click-devices-service', 

3593 'iot1click-projects': 'iot-1click-projects', 

3594 'iotevents-data': 'iot-events-data', 

3595 'iotevents': 'iot-events', 

3596 'iotwireless': 'iot-wireless', 

3597 'kinesisanalytics': 'kinesis-analytics', 

3598 'kinesisanalyticsv2': 'kinesis-analytics-v2', 

3599 'kinesisvideo': 'kinesis-video', 

3600 'lex-models': 'lex-model-building-service', 

3601 'lexv2-models': 'lex-models-v2', 

3602 'lex-runtime': 'lex-runtime-service', 

3603 'lexv2-runtime': 'lex-runtime-v2', 

3604 'logs': 'cloudwatch-logs', 

3605 'machinelearning': 'machine-learning', 

3606 'marketplacecommerceanalytics': 'marketplace-commerce-analytics', 

3607 'marketplace-entitlement': 'marketplace-entitlement-service', 

3608 'meteringmarketplace': 'marketplace-metering', 

3609 'mgh': 'migration-hub', 

3610 'sms-voice': 'pinpoint-sms-voice', 

3611 'resourcegroupstaggingapi': 'resource-groups-tagging-api', 

3612 'route53': 'route-53', 

3613 'route53domains': 'route-53-domains', 

3614 's3control': 's3-control', 

3615 'sdb': 'simpledb', 

3616 'secretsmanager': 'secrets-manager', 

3617 'serverlessrepo': 'serverlessapplicationrepository', 

3618 'servicecatalog': 'service-catalog', 

3619 'servicecatalog-appregistry': 'service-catalog-appregistry', 

3620 'stepfunctions': 'sfn', 

3621 'storagegateway': 'storage-gateway', 

3622}