Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/utils.py: 20%

1682 statements  

1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import base64 

14import binascii 

15import datetime 

16import email.message 

17import functools 

18import hashlib 

19import io 

20import logging 

21import os 

22import random 

23import re 

24import socket 

25import time 

26import warnings 

27import weakref 

28from datetime import datetime as _DatetimeClass 

29from ipaddress import ip_address 

30from pathlib import Path 

31from urllib.request import getproxies, proxy_bypass 

32 

33import dateutil.parser 

34from dateutil.tz import tzutc 

35from urllib3.exceptions import LocationParseError 

36 

37import botocore 

38import botocore.awsrequest 

39import botocore.httpsession 

40 

41# IP Regexes retained for backwards compatibility 

42from botocore.compat import ( 

43 HAS_CRT, 

44 HEX_PAT, # noqa: F401 

45 IPV4_PAT, # noqa: F401 

46 IPV4_RE, 

47 IPV6_ADDRZ_PAT, # noqa: F401 

48 IPV6_ADDRZ_RE, 

49 IPV6_PAT, # noqa: F401 

50 LS32_PAT, # noqa: F401 

51 MD5_AVAILABLE, 

52 UNRESERVED_PAT, # noqa: F401 

53 UNSAFE_URL_CHARS, 

54 ZONE_ID_PAT, # noqa: F401 

55 OrderedDict, 

56 get_md5, 

57 get_tzinfo_options, 

58 json, 

59 quote, 

60 urlparse, 

61 urlsplit, 

62 urlunsplit, 

63 zip_longest, 

64) 

65from botocore.exceptions import ( 

66 ClientError, 

67 ConfigNotFound, 

68 ConnectionClosedError, 

69 ConnectTimeoutError, 

70 EndpointConnectionError, 

71 HTTPClientError, 

72 InvalidDNSNameError, 

73 InvalidEndpointConfigurationError, 

74 InvalidExpressionError, 

75 InvalidHostLabelError, 

76 InvalidIMDSEndpointError, 

77 InvalidIMDSEndpointModeError, 

78 InvalidRegionError, 

79 MetadataRetrievalError, 

80 MissingDependencyException, 

81 ReadTimeoutError, 

82 SSOTokenLoadError, 

83 UnsupportedOutpostResourceError, 

84 UnsupportedS3AccesspointConfigurationError, 

85 UnsupportedS3ArnError, 

86 UnsupportedS3ConfigurationError, 

87 UnsupportedS3ControlArnError, 

88 UnsupportedS3ControlConfigurationError, 

89) 

90 

91logger = logging.getLogger(__name__) 

92DEFAULT_METADATA_SERVICE_TIMEOUT = 1 

93METADATA_BASE_URL = 'http://169.254.169.254/' 

94METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/' 

95METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6') 

96 

97# These are chars that do not need to be urlencoded. 

98# Based on rfc2986, section 2.3 

99SAFE_CHARS = '-._~' 

100LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]') 

101RETRYABLE_HTTP_ERRORS = ( 

102 ReadTimeoutError, 

103 EndpointConnectionError, 

104 ConnectionClosedError, 

105 ConnectTimeoutError, 

106) 

107S3_ACCELERATE_WHITELIST = ['dualstack'] 

108# In switching events from using service name / endpoint prefix to service 

109# id, we have to preserve compatibility. This maps the instances where either 

110# is different than the transformed service id. 

111EVENT_ALIASES = { 

112 "api.mediatailor": "mediatailor", 

113 "api.pricing": "pricing", 

114 "api.sagemaker": "sagemaker", 

115 "apigateway": "api-gateway", 

116 "application-autoscaling": "application-auto-scaling", 

117 "appstream2": "appstream", 

118 "autoscaling": "auto-scaling", 

119 "autoscaling-plans": "auto-scaling-plans", 

120 "ce": "cost-explorer", 

121 "cloudhsmv2": "cloudhsm-v2", 

122 "cloudsearchdomain": "cloudsearch-domain", 

123 "cognito-idp": "cognito-identity-provider", 

124 "config": "config-service", 

125 "cur": "cost-and-usage-report-service", 

126 "data.iot": "iot-data-plane", 

127 "data.jobs.iot": "iot-jobs-data-plane", 

128 "data.mediastore": "mediastore-data", 

129 "datapipeline": "data-pipeline", 

130 "devicefarm": "device-farm", 

131 "directconnect": "direct-connect", 

132 "discovery": "application-discovery-service", 

133 "dms": "database-migration-service", 

134 "ds": "directory-service", 

135 "dynamodbstreams": "dynamodb-streams", 

136 "elasticbeanstalk": "elastic-beanstalk", 

137 "elasticfilesystem": "efs", 

138 "elasticloadbalancing": "elastic-load-balancing", 

139 "elasticmapreduce": "emr", 

140 "elastictranscoder": "elastic-transcoder", 

141 "elb": "elastic-load-balancing", 

142 "elbv2": "elastic-load-balancing-v2", 

143 "email": "ses", 

144 "entitlement.marketplace": "marketplace-entitlement-service", 

145 "es": "elasticsearch-service", 

146 "events": "eventbridge", 

147 "cloudwatch-events": "eventbridge", 

148 "iot-data": "iot-data-plane", 

149 "iot-jobs-data": "iot-jobs-data-plane", 

150 "kinesisanalytics": "kinesis-analytics", 

151 "kinesisvideo": "kinesis-video", 

152 "lex-models": "lex-model-building-service", 

153 "lex-runtime": "lex-runtime-service", 

154 "logs": "cloudwatch-logs", 

155 "machinelearning": "machine-learning", 

156 "marketplace-entitlement": "marketplace-entitlement-service", 

157 "marketplacecommerceanalytics": "marketplace-commerce-analytics", 

158 "metering.marketplace": "marketplace-metering", 

159 "meteringmarketplace": "marketplace-metering", 

160 "mgh": "migration-hub", 

161 "models.lex": "lex-model-building-service", 

162 "monitoring": "cloudwatch", 

163 "mturk-requester": "mturk", 

164 "opsworks-cm": "opsworkscm", 

165 "resourcegroupstaggingapi": "resource-groups-tagging-api", 

166 "route53": "route-53", 

167 "route53domains": "route-53-domains", 

168 "runtime.lex": "lex-runtime-service", 

169 "runtime.sagemaker": "sagemaker-runtime", 

170 "sdb": "simpledb", 

171 "secretsmanager": "secrets-manager", 

172 "serverlessrepo": "serverlessapplicationrepository", 

173 "servicecatalog": "service-catalog", 

174 "states": "sfn", 

175 "stepfunctions": "sfn", 

176 "storagegateway": "storage-gateway", 

177 "streams.dynamodb": "dynamodb-streams", 

178 "tagging": "resource-groups-tagging-api", 

179} 

180 

181 

182# This pattern can be used to detect if a header is a flexible checksum header 

183CHECKSUM_HEADER_PATTERN = re.compile( 

184 r'^X-Amz-Checksum-([a-z0-9]*)$', 

185 flags=re.IGNORECASE, 

186) 

187 

188PRIORITY_ORDERED_SUPPORTED_PROTOCOLS = ( 

189 'json', 

190 'rest-json', 

191 'rest-xml', 

192 'smithy-rpc-v2-cbor', 

193 'query', 

194 'ec2', 

195) 

196 

197 

198def ensure_boolean(val): 

199 """Ensures a boolean value if a string or boolean is provided 

200 

201 For strings, the value for True/False is case insensitive 

202 """ 

203 if isinstance(val, bool): 

204 return val 

205 elif isinstance(val, str): 

206 return val.lower() == 'true' 

207 else: 

208 return False 

209 

210 
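A minimal usage sketch (illustrative, not part of the module): only the boolean ``True`` or the case-insensitive string ``'true'`` map to ``True``; every other value, including non-string types, maps to ``False``::

    from botocore.utils import ensure_boolean

    assert ensure_boolean(True) is True
    assert ensure_boolean('TRUE') is True
    assert ensure_boolean('false') is False
    assert ensure_boolean('yes') is False  # unrecognized strings are False
    assert ensure_boolean(1) is False      # non-bool, non-str values are False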

211def resolve_imds_endpoint_mode(session): 

212 """Resolving IMDS endpoint mode to either IPv6 or IPv4. 

213 

214 ec2_metadata_service_endpoint_mode takes precedence over imds_use_ipv6. 

215 """ 

216 endpoint_mode = session.get_config_variable( 

217 'ec2_metadata_service_endpoint_mode' 

218 ) 

219 if endpoint_mode is not None: 

220 lendpoint_mode = endpoint_mode.lower() 

221 if lendpoint_mode not in METADATA_ENDPOINT_MODES: 

222 error_msg_kwargs = { 

223 'mode': endpoint_mode, 

224 'valid_modes': METADATA_ENDPOINT_MODES, 

225 } 

226 raise InvalidIMDSEndpointModeError(**error_msg_kwargs) 

227 return lendpoint_mode 

228 elif session.get_config_variable('imds_use_ipv6'): 

229 return 'ipv6' 

230 return 'ipv4' 

231 

232 

233def is_json_value_header(shape): 

234 """Determines if the provided shape is the special header type jsonvalue. 

235 

236 :type shape: botocore.shape 

237 :param shape: Shape to be inspected for the jsonvalue trait. 

238 

239 :return: True if this type is a jsonvalue, False otherwise 

240 :rtype: Bool 

241 """ 

242 return ( 

243 hasattr(shape, 'serialization') 

244 and shape.serialization.get('jsonvalue', False) 

245 and shape.serialization.get('location') == 'header' 

246 and shape.type_name == 'string' 

247 ) 

248 

249 

250def has_header(header_name, headers): 

251 """Case-insensitive check for header key.""" 

252 if header_name is None: 

253 return False 

254 elif isinstance(headers, botocore.awsrequest.HeadersDict): 

255 return header_name in headers 

256 else: 

257 return header_name.lower() in [key.lower() for key in headers.keys()] 

258 

259 

260def get_service_module_name(service_model): 

261 """Returns the module name for a service 

262 

263 This is the value used in both the documentation and client class name 

264 """ 

265 name = service_model.metadata.get( 

266 'serviceAbbreviation', 

267 service_model.metadata.get( 

268 'serviceFullName', service_model.service_name 

269 ), 

270 ) 

271 name = name.replace('Amazon', '') 

272 name = name.replace('AWS', '') 

273 name = re.sub(r'\W+', '', name) 

274 return name 

275 

276 

277def normalize_url_path(path): 

278 if not path: 

279 return '/' 

280 return remove_dot_segments(path) 

281 

282 

283def normalize_boolean(val): 

284 """Returns None if val is None, otherwise ensure value 

285 converted to boolean""" 

286 if val is None: 

287 return val 

288 else: 

289 return ensure_boolean(val) 

290 

291 

292def remove_dot_segments(url): 

293 # RFC 3986, section 5.2.4 "Remove Dot Segments" 

294 # Also, AWS services require consecutive slashes to be removed, 

295 # so that's done here as well 

296 if not url: 

297 return '' 

298 input_url = url.split('/') 

299 output_list = [] 

300 for x in input_url: 

301 if x and x != '.': 

302 if x == '..': 

303 if output_list: 

304 output_list.pop() 

305 else: 

306 output_list.append(x) 

307 

308 if url[0] == '/': 

309 first = '/' 

310 else: 

311 first = '' 

312 if url[-1] == '/' and output_list: 

313 last = '/' 

314 else: 

315 last = '' 

316 return first + '/'.join(output_list) + last 

317 

318 
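An illustrative sketch of the behavior above (not part of the module): dot segments are resolved per RFC 3986 and consecutive slashes are collapsed, which is what ``normalize_url_path`` relies on::

    from botocore.utils import normalize_url_path, remove_dot_segments

    assert remove_dot_segments('/foo/bar/../baz') == '/foo/baz'
    assert remove_dot_segments('/foo//bar/./baz/') == '/foo/bar/baz/'
    assert normalize_url_path('') == '/'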

319def validate_jmespath_for_set(expression): 

320 # Validates a limited jmespath expression to determine if we can set a 

321 # value based on it. Only works with dotted paths. 

322 if not expression or expression == '.': 

323 raise InvalidExpressionError(expression=expression) 

324 

325 for invalid in ['[', ']', '*']: 

326 if invalid in expression: 

327 raise InvalidExpressionError(expression=expression) 

328 

329 

330def set_value_from_jmespath(source, expression, value, is_first=True): 

331 # This takes a (limited) jmespath-like expression & can set a value based 

332 # on it. 

333 # Limitations: 

334 # * Only handles dotted lookups 

335 # * No offsets/wildcards/slices/etc. 

336 if is_first: 

337 validate_jmespath_for_set(expression) 

338 

339 bits = expression.split('.', 1) 

340 current_key, remainder = bits[0], bits[1] if len(bits) > 1 else '' 

341 

342 if not current_key: 

343 raise InvalidExpressionError(expression=expression) 

344 

345 if remainder: 

346 if current_key not in source: 

347 # We've got something in the expression that's not present in the 

348 # source (new key). If there's any more bits, we'll set the key 

349 # with an empty dictionary. 

350 source[current_key] = {} 

351 

352 return set_value_from_jmespath( 

353 source[current_key], remainder, value, is_first=False 

354 ) 

355 

356 # If we're down to a single key, set it. 

357 source[current_key] = value 

358 

359 
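An illustrative sketch of the limited dotted-path semantics (the keys below are arbitrary examples): intermediate keys are created as empty dicts on demand, while wildcards, brackets, and slices raise ``InvalidExpressionError``::

    from botocore.utils import set_value_from_jmespath

    source = {}
    set_value_from_jmespath(source, 'ResponseMetadata.HTTPHeaders.etag', 'abc123')
    assert source == {'ResponseMetadata': {'HTTPHeaders': {'etag': 'abc123'}}}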

360def is_global_accesspoint(context): 

361 """Determine if request is intended for an MRAP accesspoint.""" 

362 s3_accesspoint = context.get('s3_accesspoint', {}) 

363 is_global = s3_accesspoint.get('region') == '' 

364 return is_global 

365 

366 

367class _RetriesExceededError(Exception): 

368 """Internal exception used when the number of retries are exceeded.""" 

369 

370 pass 

371 

372 

373class BadIMDSRequestError(Exception): 

374 def __init__(self, request): 

375 self.request = request 

376 

377 

378class IMDSFetcher: 

379 _RETRIES_EXCEEDED_ERROR_CLS = _RetriesExceededError 

380 _TOKEN_PATH = 'latest/api/token' 

381 _TOKEN_TTL = '21600' 

382 

383 def __init__( 

384 self, 

385 timeout=DEFAULT_METADATA_SERVICE_TIMEOUT, 

386 num_attempts=1, 

387 base_url=METADATA_BASE_URL, 

388 env=None, 

389 user_agent=None, 

390 config=None, 

391 ): 

392 self._timeout = timeout 

393 self._num_attempts = num_attempts 

394 if config is None: 

395 config = {} 

396 self._base_url = self._select_base_url(base_url, config) 

397 self._config = config 

398 

399 if env is None: 

400 env = os.environ.copy() 

401 self._disabled = ( 

402 env.get('AWS_EC2_METADATA_DISABLED', 'false').lower() == 'true' 

403 ) 

404 self._imds_v1_disabled = config.get('ec2_metadata_v1_disabled') 

405 self._user_agent = user_agent 

406 self._session = botocore.httpsession.URLLib3Session( 

407 timeout=self._timeout, 

408 proxies=get_environ_proxies(self._base_url), 

409 ) 

410 

411 def get_base_url(self): 

412 return self._base_url 

413 

414 def _select_base_url(self, base_url, config): 

415 if config is None: 

416 config = {} 

417 

418 requires_ipv6 = ( 

419 config.get('ec2_metadata_service_endpoint_mode') == 'ipv6' 

420 ) 

421 custom_metadata_endpoint = config.get('ec2_metadata_service_endpoint') 

422 

423 if requires_ipv6 and custom_metadata_endpoint: 

424 logger.warning( 

425 "Custom endpoint and IMDS_USE_IPV6 are both set. Using custom endpoint." 

426 ) 

427 

428 chosen_base_url = None 

429 

430 if base_url != METADATA_BASE_URL: 

431 chosen_base_url = base_url 

432 elif custom_metadata_endpoint: 

433 chosen_base_url = custom_metadata_endpoint 

434 elif requires_ipv6: 

435 chosen_base_url = METADATA_BASE_URL_IPv6 

436 else: 

437 chosen_base_url = METADATA_BASE_URL 

438 

439 logger.debug(f"IMDS ENDPOINT: {chosen_base_url}") 

440 if not is_valid_uri(chosen_base_url): 

441 raise InvalidIMDSEndpointError(endpoint=chosen_base_url) 

442 

443 return chosen_base_url 

444 

445 def _construct_url(self, path): 

446 sep = '' 

447 if self._base_url and not self._base_url.endswith('/'): 

448 sep = '/' 

449 return f'{self._base_url}{sep}{path}' 

450 

451 def _fetch_metadata_token(self): 

452 self._assert_enabled() 

453 url = self._construct_url(self._TOKEN_PATH) 

454 headers = { 

455 'x-aws-ec2-metadata-token-ttl-seconds': self._TOKEN_TTL, 

456 } 

457 self._add_user_agent(headers) 

458 request = botocore.awsrequest.AWSRequest( 

459 method='PUT', url=url, headers=headers 

460 ) 

461 for i in range(self._num_attempts): 

462 try: 

463 response = self._session.send(request.prepare()) 

464 if response.status_code == 200: 

465 return response.text 

466 elif response.status_code in (404, 403, 405): 

467 return None 

468 elif response.status_code in (400,): 

469 raise BadIMDSRequestError(request) 

470 except ReadTimeoutError: 

471 return None 

472 except RETRYABLE_HTTP_ERRORS as e: 

473 logger.debug( 

474 "Caught retryable HTTP exception while making metadata " 

475 "service request to %s: %s", 

476 url, 

477 e, 

478 exc_info=True, 

479 ) 

480 except HTTPClientError as e: 

481 if isinstance(e.kwargs.get('error'), LocationParseError): 

482 raise InvalidIMDSEndpointError(endpoint=url, error=e) 

483 else: 

484 raise 

485 return None 

486 

487 def _get_request(self, url_path, retry_func, token=None): 

488 """Make a get request to the Instance Metadata Service. 

489 

490 :type url_path: str 

491 :param url_path: The path component of the URL to make a get request. 

492 This arg is appended to the base_url that was provided in the 

493 initializer. 

494 

495 :type retry_func: callable 

496 :param retry_func: A function that takes the response as an argument 

497 and determines if it needs to retry. By default, empty and 

498 non-200 responses are retried. 

499 

500 :type token: str 

501 :param token: Metadata token to send along with GET requests to IMDS. 

502 """ 

503 self._assert_enabled() 

504 if not token: 

505 self._assert_v1_enabled() 

506 if retry_func is None: 

507 retry_func = self._default_retry 

508 url = self._construct_url(url_path) 

509 headers = {} 

510 if token is not None: 

511 headers['x-aws-ec2-metadata-token'] = token 

512 self._add_user_agent(headers) 

513 for i in range(self._num_attempts): 

514 try: 

515 request = botocore.awsrequest.AWSRequest( 

516 method='GET', url=url, headers=headers 

517 ) 

518 response = self._session.send(request.prepare()) 

519 if not retry_func(response): 

520 return response 

521 except RETRYABLE_HTTP_ERRORS as e: 

522 logger.debug( 

523 "Caught retryable HTTP exception while making metadata " 

524 "service request to %s: %s", 

525 url, 

526 e, 

527 exc_info=True, 

528 ) 

529 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

530 

531 def _add_user_agent(self, headers): 

532 if self._user_agent is not None: 

533 headers['User-Agent'] = self._user_agent 

534 

535 def _assert_enabled(self): 

536 if self._disabled: 

537 logger.debug("Access to EC2 metadata has been disabled.") 

538 raise self._RETRIES_EXCEEDED_ERROR_CLS() 

539 

540 def _assert_v1_enabled(self): 

541 if self._imds_v1_disabled: 

542 raise MetadataRetrievalError( 

543 error_msg="Unable to retrieve token for use in IMDSv2 call and IMDSv1 has been disabled" 

544 ) 

545 

546 def _default_retry(self, response): 

547 return self._is_non_ok_response(response) or self._is_empty(response) 

548 

549 def _is_non_ok_response(self, response): 

550 if response.status_code != 200: 

551 self._log_imds_response(response, 'non-200', log_body=True) 

552 return True 

553 return False 

554 

555 def _is_empty(self, response): 

556 if not response.content: 

557 self._log_imds_response(response, 'no body', log_body=True) 

558 return True 

559 return False 

560 

561 def _log_imds_response(self, response, reason_to_log, log_body=False): 

562 statement = ( 

563 "Metadata service returned %s response " 

564 "with status code of %s for url: %s" 

565 ) 

566 logger_args = [reason_to_log, response.status_code, response.url] 

567 if log_body: 

568 statement += ", content body: %s" 

569 logger_args.append(response.content) 

570 logger.debug(statement, *logger_args) 

571 

572 

573class InstanceMetadataFetcher(IMDSFetcher): 

574 _URL_PATH = 'latest/meta-data/iam/security-credentials/' 

575 _REQUIRED_CREDENTIAL_FIELDS = [ 

576 'AccessKeyId', 

577 'SecretAccessKey', 

578 'Token', 

579 'Expiration', 

580 ] 

581 

582 def retrieve_iam_role_credentials(self): 

583 try: 

584 token = self._fetch_metadata_token() 

585 role_name = self._get_iam_role(token) 

586 credentials = self._get_credentials(role_name, token) 

587 if self._contains_all_credential_fields(credentials): 

588 credentials = { 

589 'role_name': role_name, 

590 'access_key': credentials['AccessKeyId'], 

591 'secret_key': credentials['SecretAccessKey'], 

592 'token': credentials['Token'], 

593 'expiry_time': credentials['Expiration'], 

594 } 

595 self._evaluate_expiration(credentials) 

596 return credentials 

597 else: 

598 # IMDS can return a 200 response that has a JSON formatted 

599 # error message (i.e. if ec2 is not trusted entity for the 

600 # attached role). We do not necessarily want to retry for 

601 # these and we also do not necessarily want to raise a key 

602 # error. So at least log the problematic response and return 

603 # an empty dictionary to signal that it was not able to 

604 # retrieve credentials. These errors will contain both a 

605 # Code and Message key. 

606 if 'Code' in credentials and 'Message' in credentials: 

607 logger.debug( 

608 'Error response received when retrieving ' 

609 'credentials: %s.', 

610 credentials, 

611 ) 

612 return {} 

613 except self._RETRIES_EXCEEDED_ERROR_CLS: 

614 logger.debug( 

615 "Max number of attempts exceeded (%s) when " 

616 "attempting to retrieve data from metadata service.", 

617 self._num_attempts, 

618 ) 

619 except BadIMDSRequestError as e: 

620 logger.debug("Bad IMDS request: %s", e.request) 

621 return {} 

622 

623 def _get_iam_role(self, token=None): 

624 return self._get_request( 

625 url_path=self._URL_PATH, 

626 retry_func=self._needs_retry_for_role_name, 

627 token=token, 

628 ).text 

629 

630 def _get_credentials(self, role_name, token=None): 

631 r = self._get_request( 

632 url_path=self._URL_PATH + role_name, 

633 retry_func=self._needs_retry_for_credentials, 

634 token=token, 

635 ) 

636 return json.loads(r.text) 

637 

638 def _is_invalid_json(self, response): 

639 try: 

640 json.loads(response.text) 

641 return False 

642 except ValueError: 

643 self._log_imds_response(response, 'invalid json') 

644 return True 

645 

646 def _needs_retry_for_role_name(self, response): 

647 return self._is_non_ok_response(response) or self._is_empty(response) 

648 

649 def _needs_retry_for_credentials(self, response): 

650 return ( 

651 self._is_non_ok_response(response) 

652 or self._is_empty(response) 

653 or self._is_invalid_json(response) 

654 ) 

655 

656 def _contains_all_credential_fields(self, credentials): 

657 for field in self._REQUIRED_CREDENTIAL_FIELDS: 

658 if field not in credentials: 

659 logger.debug( 

660 'Retrieved credentials is missing required field: %s', 

661 field, 

662 ) 

663 return False 

664 return True 

665 

666 def _evaluate_expiration(self, credentials): 

667 expiration = credentials.get("expiry_time") 

668 if expiration is None: 

669 return 

670 try: 

671 expiration = datetime.datetime.strptime( 

672 expiration, "%Y-%m-%dT%H:%M:%SZ" 

673 ) 

674 refresh_interval = self._config.get( 

675 "ec2_credential_refresh_window", 60 * 10 

676 ) 

677 jitter = random.randint(120, 600) # Between 2 to 10 minutes 

678 refresh_interval_with_jitter = refresh_interval + jitter 

679 current_time = datetime.datetime.utcnow() 

680 refresh_offset = datetime.timedelta( 

681 seconds=refresh_interval_with_jitter 

682 ) 

683 extension_time = expiration - refresh_offset 

684 if current_time >= extension_time: 

685 new_time = current_time + refresh_offset 

686 credentials["expiry_time"] = new_time.strftime( 

687 "%Y-%m-%dT%H:%M:%SZ" 

688 ) 

689 logger.info( 

690 f"Attempting credential expiration extension due to a " 

691 f"credential service availability issue. A refresh of " 

692 f"these credentials will be attempted again within " 

693 f"the next {refresh_interval_with_jitter / 60:.0f} minutes." 

694 ) 

695 except ValueError: 

696 logger.debug( 

697 f"Unable to parse expiry_time in {credentials['expiry_time']}" 

698 ) 

699 

700 

701class IMDSRegionProvider: 

702 def __init__(self, session, environ=None, fetcher=None): 

703 """Initialize IMDSRegionProvider. 

704 :type session: :class:`botocore.session.Session` 

705 :param session: The session is needed to look up configuration for 

706 how to contact the instance metadata service. Specifically, 

707 whether or not it should use the IMDS region at all, and if so how 

708 to configure the timeout and number of attempts to reach the 

709 service. 

710 :type environ: None or dict 

711 :param environ: A dictionary of environment variables to use. If 

712 ``None`` is the argument then ``os.environ`` will be used by 

713 default. 

714 :type fetcher: :class:`botocore.utils.InstanceMetadataRegionFetcher` 

715 :param fetcher: The class to actually handle the fetching of the region 

716 from the IMDS. If not provided a default one will be created. 

717 """ 

718 self._session = session 

719 if environ is None: 

720 environ = os.environ 

721 self._environ = environ 

722 self._fetcher = fetcher 

723 

724 def provide(self): 

725 """Provide the region value from IMDS.""" 

726 instance_region = self._get_instance_metadata_region() 

727 return instance_region 

728 

729 def _get_instance_metadata_region(self): 

730 fetcher = self._get_fetcher() 

731 region = fetcher.retrieve_region() 

732 return region 

733 

734 def _get_fetcher(self): 

735 if self._fetcher is None: 

736 self._fetcher = self._create_fetcher() 

737 return self._fetcher 

738 

739 def _create_fetcher(self): 

740 metadata_timeout = self._session.get_config_variable( 

741 'metadata_service_timeout' 

742 ) 

743 metadata_num_attempts = self._session.get_config_variable( 

744 'metadata_service_num_attempts' 

745 ) 

746 imds_config = { 

747 'ec2_metadata_service_endpoint': self._session.get_config_variable( 

748 'ec2_metadata_service_endpoint' 

749 ), 

750 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode( 

751 self._session 

752 ), 

753 'ec2_metadata_v1_disabled': self._session.get_config_variable( 

754 'ec2_metadata_v1_disabled' 

755 ), 

756 } 

757 fetcher = InstanceMetadataRegionFetcher( 

758 timeout=metadata_timeout, 

759 num_attempts=metadata_num_attempts, 

760 env=self._environ, 

761 user_agent=self._session.user_agent(), 

762 config=imds_config, 

763 ) 

764 return fetcher 

765 

766 

767class InstanceMetadataRegionFetcher(IMDSFetcher): 

768 _URL_PATH = 'latest/meta-data/placement/availability-zone/' 

769 

770 def retrieve_region(self): 

771 """Get the current region from the instance metadata service. 

772 :rtype: str or None 

773 :returns: The region the current instance is running in, or ``None`` 

774 if the instance metadata service cannot be contacted, does not 

775 give a valid response, or the fetcher exhausts its retries 

776 without being able to connect. 

781 """ 

782 try: 

783 region = self._get_region() 

784 return region 

785 except self._RETRIES_EXCEEDED_ERROR_CLS: 

786 logger.debug( 

787 "Max number of attempts exceeded (%s) when " 

788 "attempting to retrieve data from metadata service.", 

789 self._num_attempts, 

790 ) 

791 return None 

792 

793 def _get_region(self): 

794 token = self._fetch_metadata_token() 

795 response = self._get_request( 

796 url_path=self._URL_PATH, 

797 retry_func=self._default_retry, 

798 token=token, 

799 ) 

800 availability_zone = response.text 

801 region = availability_zone[:-1] 

802 return region 

803 

804 

805def merge_dicts(dict1, dict2, append_lists=False): 

806 """Given two dict, merge the second dict into the first. 

807 

808 The dicts can have arbitrary nesting. 

809 

810 :param append_lists: If true, instead of clobbering a list with the new 

811 value, append all of the new values onto the original list. 

812 """ 

813 for key in dict2: 

814 if isinstance(dict2[key], dict): 

815 if key in dict1 and key in dict2: 

816 merge_dicts(dict1[key], dict2[key]) 

817 else: 

818 dict1[key] = dict2[key] 

819 # If the value is a list and the ``append_lists`` flag is set, 

820 # append the new values onto the original list 

821 elif isinstance(dict2[key], list) and append_lists: 

822 # The value in dict1 must be a list in order to append new 

823 # values onto it. 

824 if key in dict1 and isinstance(dict1[key], list): 

825 dict1[key].extend(dict2[key]) 

826 else: 

827 dict1[key] = dict2[key] 

828 else: 

829 # At scalar types, we iterate and merge the 

830 # current dict that we're on. 

831 dict1[key] = dict2[key] 

832 

833 
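An illustrative sketch (the keys are made-up examples): nested dicts present in both arguments are merged key by key, and with ``append_lists=True`` list values at the current level are extended rather than replaced::

    from botocore.utils import merge_dicts

    first = {'s3': {'addressing_style': 'auto'}, 'tags': ['a']}
    second = {'s3': {'use_accelerate_endpoint': True}, 'tags': ['b']}
    merge_dicts(first, second, append_lists=True)
    assert first['s3'] == {'addressing_style': 'auto', 'use_accelerate_endpoint': True}
    assert first['tags'] == ['a', 'b']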

834def lowercase_dict(original): 

835 """Copies the given dictionary ensuring all keys are lowercase strings.""" 

836 copy = {} 

837 for key in original: 

838 copy[key.lower()] = original[key] 

839 return copy 

840 

841 

842def parse_key_val_file(filename, _open=open): 

843 try: 

844 with _open(filename) as f: 

845 contents = f.read() 

846 return parse_key_val_file_contents(contents) 

847 except OSError: 

848 raise ConfigNotFound(path=filename) 

849 

850 

851def parse_key_val_file_contents(contents): 

852 # This was originally extracted from the EC2 credential provider, which was 

853 # fairly lenient in its parsing. We only try to parse key/val pairs if 

854 # there's a '=' in the line. 

855 final = {} 

856 for line in contents.splitlines(): 

857 if '=' not in line: 

858 continue 

859 key, val = line.split('=', 1) 

860 key = key.strip() 

861 val = val.strip() 

862 final[key] = val 

863 return final 

864 

865 
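A short sketch of the lenient parsing described above (illustrative values only): lines without an ``=`` are ignored and surrounding whitespace is stripped::

    from botocore.utils import parse_key_val_file_contents

    contents = (
        "AWSAccessKeyId = AKIDEXAMPLE\n"
        "AWSSecretKey=secretexample\n"
        "no equals sign here, so this line is skipped\n"
    )
    parsed = parse_key_val_file_contents(contents)
    assert parsed == {'AWSAccessKeyId': 'AKIDEXAMPLE', 'AWSSecretKey': 'secretexample'}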

866def percent_encode_sequence(mapping, safe=SAFE_CHARS): 

867 """Urlencode a dict or list into a string. 

868 

869 This is similar to urllib.urlencode except that: 

870 

871 * It uses quote, and not quote_plus 

872 * It has a default list of safe chars that don't need 

873 to be encoded, which matches what AWS services expect. 

874 

875 If any value in the input ``mapping`` is a list type, 

876 then each list element will be serialized. This is equivalent 

877 to ``urlencode``'s ``doseq=True`` argument. 

878 

879 This function should be preferred over the stdlib 

880 ``urlencode()`` function. 

881 

882 :param mapping: Either a dict to urlencode or a list of 

883 ``(key, value)`` pairs. 

884 

885 """ 

886 encoded_pairs = [] 

887 if hasattr(mapping, 'items'): 

888 pairs = mapping.items() 

889 else: 

890 pairs = mapping 

891 for key, value in pairs: 

892 if isinstance(value, list): 

893 for element in value: 

894 encoded_pairs.append( 

895 f'{percent_encode(key)}={percent_encode(element)}' 

896 ) 

897 else: 

898 encoded_pairs.append( 

899 f'{percent_encode(key)}={percent_encode(value)}' 

900 ) 

901 return '&'.join(encoded_pairs) 

902 

903 

904def percent_encode(input_str, safe=SAFE_CHARS): 

905 """Urlencodes a string. 

906 

907 Whereas percent_encode_sequence handles taking a dict/sequence and 

908 producing a percent encoded string, this function deals only with 

909 taking a string (not a dict/sequence) and percent encoding it. 

910 

911 If given the binary type, will simply URL encode it. If given the 

912 text type, will produce the binary type by UTF-8 encoding the 

913 text. If given something else, will convert it to the text type 

914 first. 

915 """ 

916 # If its not a binary or text string, make it a text string. 

917 if not isinstance(input_str, (bytes, str)): 

918 input_str = str(input_str) 

919 # If it's not bytes, make it bytes by UTF-8 encoding it. 

920 if not isinstance(input_str, bytes): 

921 input_str = input_str.encode('utf-8') 

922 return quote(input_str, safe=safe) 

923 

924 
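An illustrative sketch of the two encoders above (example data only): ``percent_encode`` leaves only unreserved characters unescaped, and ``percent_encode_sequence`` expands list values the way ``urlencode(..., doseq=True)`` would::

    from botocore.utils import percent_encode, percent_encode_sequence

    assert percent_encode('a b/c') == 'a%20b%2Fc'
    assert percent_encode_sequence(
        {'prefix': 'photos/2024', 'tag': ['blue', 'red']}
    ) == 'prefix=photos%2F2024&tag=blue&tag=red'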

925def _epoch_seconds_to_datetime(value, tzinfo): 

926 """Parse numerical epoch timestamps (seconds since 1970) into a 

927 ``datetime.datetime`` in UTC using ``datetime.timedelta``. This is intended 

928 as a fallback when ``fromtimestamp`` raises ``OverflowError`` or ``OSError``. 

929 

930 :type value: float or int 

931 :param value: The Unix timestamp as a number. 

932 

933 :type tzinfo: callable 

934 :param tzinfo: A ``datetime.tzinfo`` class or compatible callable. 

935 """ 

936 epoch_zero = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=tzutc()) 

937 epoch_zero_localized = epoch_zero.astimezone(tzinfo()) 

938 return epoch_zero_localized + datetime.timedelta(seconds=value) 

939 

940 

941def _parse_timestamp_with_tzinfo(value, tzinfo): 

942 """Parse timestamp with pluggable tzinfo options.""" 

943 if isinstance(value, (int, float)): 

944 # Possibly an epoch time. 

945 return datetime.datetime.fromtimestamp(value, tzinfo()) 

946 else: 

947 try: 

948 return datetime.datetime.fromtimestamp(float(value), tzinfo()) 

949 except (TypeError, ValueError): 

950 pass 

951 try: 

952 # In certain cases, a timestamp marked with GMT can be parsed into a 

953 # different time zone, so here we provide a context which will 

954 # enforce that GMT == UTC. 

955 return dateutil.parser.parse(value, tzinfos={'GMT': tzutc()}) 

956 except (TypeError, ValueError) as e: 

957 raise ValueError(f'Invalid timestamp "{value}": {e}') 

958 

959 

960def parse_timestamp(value): 

961 """Parse a timestamp into a datetime object. 

962 

963 Supported formats: 

964 

965 * iso8601 

966 * rfc822 

967 * epoch (value is an integer) 

968 

969 This will return a ``datetime.datetime`` object. 

970 

971 """ 

972 tzinfo_options = get_tzinfo_options() 

973 for tzinfo in tzinfo_options: 

974 try: 

975 return _parse_timestamp_with_tzinfo(value, tzinfo) 

976 except (OSError, OverflowError) as e: 

977 logger.debug( 

978 'Unable to parse timestamp with "%s" timezone info.', 

979 tzinfo.__name__, 

980 exc_info=e, 

981 ) 

982 # For numeric values attempt fallback to using fromtimestamp-free method. 

983 # From Python's ``datetime.datetime.fromtimestamp`` documentation: "This 

984 # may raise ``OverflowError``, if the timestamp is out of the range of 

985 # values supported by the platform C localtime() function, and ``OSError`` 

986 # on localtime() failure. It's common for this to be restricted to years 

987 # from 1970 through 2038." 

988 try: 

989 numeric_value = float(value) 

990 except (TypeError, ValueError): 

991 pass 

992 else: 

993 try: 

994 for tzinfo in tzinfo_options: 

995 return _epoch_seconds_to_datetime(numeric_value, tzinfo=tzinfo) 

996 except (OSError, OverflowError) as e: 

997 logger.debug( 

998 'Unable to parse timestamp using fallback method with "%s" ' 

999 'timezone info.', 

1000 tzinfo.__name__, 

1001 exc_info=e, 

1002 ) 

1003 raise RuntimeError( 

1004 f'Unable to calculate correct timezone offset for "{value}"' 

1005 ) 

1006 

1007 
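A brief sketch (illustrative inputs): ISO 8601, RFC 822, and numeric epoch values all resolve to timezone-aware datetimes for the same instant, regardless of which tzinfo option ends up being used::

    from botocore.utils import parse_timestamp

    iso = parse_timestamp('2023-01-01T12:30:00Z')
    rfc822 = parse_timestamp('Sun, 01 Jan 2023 12:30:00 GMT')
    epoch = parse_timestamp(1672576200)
    assert iso == rfc822 == epoch  # aware datetimes compare by instant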

1008def parse_to_aware_datetime(value): 

1009 """Converted the passed in value to a datetime object with tzinfo. 

1010 

1011 This function can be used to normalize all timestamp inputs. This 

1012 function accepts a number of different types of inputs, but 

1013 will always return a datetime.datetime object with time zone 

1014 information. 

1015 

1016 The input param ``value`` can be one of several types: 

1017 

1018 * A datetime object (both naive and aware) 

1019 * An integer representing the epoch time (can also be a string 

1020 of the integer, i.e '0', instead of 0). The epoch time is 

1021 considered to be UTC. 

1022 * An iso8601 formatted timestamp. This does not need to be 

1023 a complete timestamp, it can contain just the date portion 

1024 without the time component. 

1025 

1026 The returned value will be a datetime object that will have tzinfo. 

1027 If no timezone info was provided in the input value, then UTC is 

1028 assumed, not local time. 

1029 

1030 """ 

1031 # This is a general purpose method that handles several cases of 

1032 # converting the provided value to a string timestamp suitable to be 

1033 # serialized to an http request. It can handle: 

1034 # 1) A datetime.datetime object. 

1035 if isinstance(value, _DatetimeClass): 

1036 datetime_obj = value 

1037 else: 

1038 # 2) A string object that's formatted as a timestamp. 

1039 # We document this as being an iso8601 timestamp, although 

1040 # parse_timestamp is a bit more flexible. 

1041 datetime_obj = parse_timestamp(value) 

1042 if datetime_obj.tzinfo is None: 

1043 # I think a case could be made that if no time zone is provided, 

1044 # we should use the local time. However, to restore backwards 

1045 # compat, the previous behavior was to assume UTC, which is 

1046 # what we're going to do here. 

1047 datetime_obj = datetime_obj.replace(tzinfo=tzutc()) 

1048 else: 

1049 datetime_obj = datetime_obj.astimezone(tzutc()) 

1050 return datetime_obj 

1051 

1052 

1053def datetime2timestamp(dt, default_timezone=None): 

1054 """Calculate the timestamp based on the given datetime instance. 

1055 

1056 :type dt: datetime 

1057 :param dt: A datetime object to be converted into timestamp 

1058 :type default_timezone: tzinfo 

1059 :param default_timezone: If it is provided as None, we treat it as tzutc(). 

1060 But it is only used when dt is a naive datetime. 

1061 :returns: The timestamp 

1062 """ 

1063 epoch = datetime.datetime(1970, 1, 1) 

1064 if dt.tzinfo is None: 

1065 if default_timezone is None: 

1066 default_timezone = tzutc() 

1067 dt = dt.replace(tzinfo=default_timezone) 

1068 d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch 

1069 return d.total_seconds() 

1070 

1071 
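A small sketch (illustrative only): naive datetimes are interpreted in ``default_timezone`` (UTC when omitted), so the epoch itself maps to ``0.0``::

    import datetime
    from dateutil.tz import tzutc
    from botocore.utils import datetime2timestamp

    assert datetime2timestamp(datetime.datetime(1970, 1, 1)) == 0.0
    aware = datetime.datetime(2023, 1, 1, tzinfo=tzutc())
    assert datetime2timestamp(aware) == 1672531200.0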

1072def calculate_sha256(body, as_hex=False): 

1073 """Calculate a sha256 checksum. 

1074 

1075 This method will calculate the sha256 checksum of a file like 

1076 object. Note that this method will iterate through the entire 

1077 file contents. The caller is responsible for ensuring the proper 

1078 starting position of the file and ``seek()``'ing the file back 

1079 to its starting location if other consumers need to read from 

1080 the file like object. 

1081 

1082 :param body: Any file like object. The file must be opened 

1083 in binary mode such that a ``.read()`` call returns bytes. 

1084 :param as_hex: If True, then the hex digest is returned. 

1085 If False, then the digest (as binary bytes) is returned. 

1086 

1087 :returns: The sha256 checksum 

1088 

1089 """ 

1090 checksum = hashlib.sha256() 

1091 for chunk in iter(lambda: body.read(1024 * 1024), b''): 

1092 checksum.update(chunk) 

1093 if as_hex: 

1094 return checksum.hexdigest() 

1095 else: 

1096 return checksum.digest() 

1097 

1098 
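An illustrative check (not part of the module): for an in-memory file object the result matches ``hashlib.sha256`` over the same bytes, with ``as_hex`` selecting between the hex and raw digests::

    import hashlib
    import io
    from botocore.utils import calculate_sha256

    data = b'hello botocore'
    assert calculate_sha256(io.BytesIO(data), as_hex=True) == hashlib.sha256(data).hexdigest()
    assert calculate_sha256(io.BytesIO(data)) == hashlib.sha256(data).digest()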

1099def calculate_tree_hash(body): 

1100 """Calculate a tree hash checksum. 

1101 

1102 For more information see: 

1103 

1104 http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations.html 

1105 

1106 :param body: Any file like object. This has the same constraints as 

1107 the ``body`` param in calculate_sha256 

1108 

1109 :rtype: str 

1110 :returns: The hex version of the calculated tree hash 

1111 

1112 """ 

1113 chunks = [] 

1114 required_chunk_size = 1024 * 1024 

1115 sha256 = hashlib.sha256 

1116 for chunk in iter(lambda: body.read(required_chunk_size), b''): 

1117 chunks.append(sha256(chunk).digest()) 

1118 if not chunks: 

1119 return sha256(b'').hexdigest() 

1120 while len(chunks) > 1: 

1121 new_chunks = [] 

1122 for first, second in _in_pairs(chunks): 

1123 if second is not None: 

1124 new_chunks.append(sha256(first + second).digest()) 

1125 else: 

1126 # We're at the end of the list and there's no pair left. 

1127 new_chunks.append(first) 

1128 chunks = new_chunks 

1129 return binascii.hexlify(chunks[0]).decode('ascii') 

1130 

1131 
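A small sketch (illustrative only): a body of one 1 MiB chunk or less has a single leaf, so its tree hash collapses to the plain SHA-256 hex digest; larger bodies hash pairs of leaf digests upward until one root remains::

    import hashlib
    import io
    from botocore.utils import calculate_tree_hash

    data = b'x' * 1024  # well under the 1 MiB chunk size
    assert calculate_tree_hash(io.BytesIO(data)) == hashlib.sha256(data).hexdigest()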

1132def _in_pairs(iterable): 

1133 # Creates iterator that iterates over the list in pairs: 

1134 # for a, b in _in_pairs([0, 1, 2, 3, 4]): 

1135 # print(a, b) 

1136 # 

1137 # will print: 

1138 # 0, 1 

1139 # 2, 3 

1140 # 4, None 

1141 shared_iter = iter(iterable) 

1142 # Note that zip_longest is a compat import that uses 

1143 # the itertools izip_longest. This creates an iterator, 

1144 # this call below does _not_ immediately create the list 

1145 # of pairs. 

1146 return zip_longest(shared_iter, shared_iter) 

1147 

1148 

1149class CachedProperty: 

1150 """A read only property that caches the initially computed value. 

1151 

1152 This descriptor will only call the provided ``fget`` function once. 

1153 Subsequent access to this property will return the cached value. 

1154 

1155 """ 

1156 

1157 def __init__(self, fget): 

1158 self._fget = fget 

1159 

1160 def __get__(self, obj, cls): 

1161 if obj is None: 

1162 return self 

1163 else: 

1164 computed_value = self._fget(obj) 

1165 obj.__dict__[self._fget.__name__] = computed_value 

1166 return computed_value 

1167 

1168 
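An illustrative sketch (the ``Catalog`` class below is hypothetical): because the computed value is stored in the instance ``__dict__`` under the property name, the wrapped function runs only on first access::

    from botocore.utils import CachedProperty

    class Catalog:
        calls = 0

        @CachedProperty
        def services(self):
            Catalog.calls += 1
            return ['s3', 'ec2']

    catalog = Catalog()
    assert catalog.services == ['s3', 'ec2']
    assert catalog.services == ['s3', 'ec2']
    assert Catalog.calls == 1  # later reads hit __dict__, not the function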

1169class ArgumentGenerator: 

1170 """Generate sample input based on a shape model. 

1171 

1172 This class contains a ``generate_skeleton`` method that will take 

1173 an input/output shape (created from ``botocore.model``) and generate 

1174 a sample dictionary corresponding to the input/output shape. 

1175 

1176 The specific values used are placeholder values. For strings, either an 

1177 empty string or the member name can be used, for numbers 0 or 0.0 is used. 

1178 The intended usage of this class is to generate the *shape* of the input 

1179 structure. 

1180 

1181 This can be useful for operations that have complex input shapes. 

1182 This allows a user to just fill in the necessary data instead of 

1183 worrying about the specific structure of the input arguments. 

1184 

1185 Example usage:: 

1186 

1187 s = botocore.session.get_session() 

1188 ddb = s.get_service_model('dynamodb') 

1189 arg_gen = ArgumentGenerator() 

1190 sample_input = arg_gen.generate_skeleton( 

1191 ddb.operation_model('CreateTable').input_shape) 

1192 print("Sample input for dynamodb.CreateTable: %s" % sample_input) 

1193 

1194 """ 

1195 

1196 def __init__(self, use_member_names=False): 

1197 self._use_member_names = use_member_names 

1198 

1199 def generate_skeleton(self, shape): 

1200 """Generate a sample input. 

1201 

1202 :type shape: ``botocore.model.Shape`` 

1203 :param shape: The input shape. 

1204 

1205 :return: The generated skeleton input corresponding to the 

1206 provided input shape. 

1207 

1208 """ 

1209 stack = [] 

1210 return self._generate_skeleton(shape, stack) 

1211 

1212 def _generate_skeleton(self, shape, stack, name=''): 

1213 stack.append(shape.name) 

1214 try: 

1215 if shape.type_name == 'structure': 

1216 return self._generate_type_structure(shape, stack) 

1217 elif shape.type_name == 'list': 

1218 return self._generate_type_list(shape, stack) 

1219 elif shape.type_name == 'map': 

1220 return self._generate_type_map(shape, stack) 

1221 elif shape.type_name == 'string': 

1222 if self._use_member_names: 

1223 return name 

1224 if shape.enum: 

1225 return random.choice(shape.enum) 

1226 return '' 

1227 elif shape.type_name in ['integer', 'long']: 

1228 return 0 

1229 elif shape.type_name in ['float', 'double']: 

1230 return 0.0 

1231 elif shape.type_name == 'boolean': 

1232 return True 

1233 elif shape.type_name == 'timestamp': 

1234 return datetime.datetime(1970, 1, 1, 0, 0, 0) 

1235 finally: 

1236 stack.pop() 

1237 

1238 def _generate_type_structure(self, shape, stack): 

1239 if stack.count(shape.name) > 1: 

1240 return {} 

1241 skeleton = OrderedDict() 

1242 for member_name, member_shape in shape.members.items(): 

1243 skeleton[member_name] = self._generate_skeleton( 

1244 member_shape, stack, name=member_name 

1245 ) 

1246 return skeleton 

1247 

1248 def _generate_type_list(self, shape, stack): 

1249 # For list elements we've arbitrarily decided to 

1250 # return two elements for the skeleton list. 

1251 name = '' 

1252 if self._use_member_names: 

1253 name = shape.member.name 

1254 return [ 

1255 self._generate_skeleton(shape.member, stack, name), 

1256 ] 

1257 

1258 def _generate_type_map(self, shape, stack): 

1259 key_shape = shape.key 

1260 value_shape = shape.value 

1261 assert key_shape.type_name == 'string' 

1262 return OrderedDict( 

1263 [ 

1264 ('KeyName', self._generate_skeleton(value_shape, stack)), 

1265 ] 

1266 ) 

1267 

1268 

1269def is_valid_ipv6_endpoint_url(endpoint_url): 

1270 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1271 return False 

1272 hostname = f'[{urlparse(endpoint_url).hostname}]' 

1273 return IPV6_ADDRZ_RE.match(hostname) is not None 

1274 

1275 

1276def is_valid_ipv4_endpoint_url(endpoint_url): 

1277 hostname = urlparse(endpoint_url).hostname 

1278 return IPV4_RE.match(hostname) is not None 

1279 

1280 

1281def is_valid_endpoint_url(endpoint_url): 

1282 """Verify the endpoint_url is valid. 

1283 

1284 :type endpoint_url: string 

1285 :param endpoint_url: An endpoint_url. Must have at least a scheme 

1286 and a hostname. 

1287 

1288 :return: True if the endpoint url is valid. False otherwise. 

1289 

1290 """ 

1291 # post-bpo-43882 urlsplit() strips unsafe characters from URL, causing 

1292 # it to pass hostname validation below. Detect them early to fix that. 

1293 if UNSAFE_URL_CHARS.intersection(endpoint_url): 

1294 return False 

1295 parts = urlsplit(endpoint_url) 

1296 hostname = parts.hostname 

1297 if hostname is None: 

1298 return False 

1299 if len(hostname) > 255: 

1300 return False 

1301 if hostname[-1] == ".": 

1302 hostname = hostname[:-1] 

1303 allowed = re.compile( 

1304 r"^((?!-)[A-Z\d-]{1,63}(?<!-)\.)*((?!-)[A-Z\d-]{1,63}(?<!-))$", 

1305 re.IGNORECASE, 

1306 ) 

1307 return allowed.match(hostname) 

1308 

1309 

1310def is_valid_uri(endpoint_url): 

1311 return is_valid_endpoint_url(endpoint_url) or is_valid_ipv6_endpoint_url( 

1312 endpoint_url 

1313 ) 

1314 

1315 

1316def validate_region_name(region_name): 

1317 """Provided region_name must be a valid host label.""" 

1318 if region_name is None: 

1319 return 

1320 valid_host_label = re.compile(r'^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$') 

1321 valid = valid_host_label.match(region_name) 

1322 if not valid: 

1323 raise InvalidRegionError(region_name=region_name) 

1324 

1325 

1326def check_dns_name(bucket_name): 

1327 """ 

1328 Check to see if the ``bucket_name`` complies with the 

1329 restricted DNS naming conventions necessary to allow 

1330 access via virtual-hosting style. 

1331 

1332 Even though "." characters are perfectly valid in this DNS 

1333 naming scheme, we are going to punt on any name containing a 

1334 "." character because these will cause SSL cert validation 

1335 problems if we try to use virtual-hosting style addressing. 

1336 """ 

1337 if '.' in bucket_name: 

1338 return False 

1339 n = len(bucket_name) 

1340 if n < 3 or n > 63: 

1341 # Wrong length 

1342 return False 

1343 match = LABEL_RE.match(bucket_name) 

1344 if match is None or match.end() != len(bucket_name): 

1345 return False 

1346 return True 

1347 

1348 
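An illustrative sketch (example bucket names): names must be 3-63 characters of lowercase letters, digits, and interior hyphens, and anything containing a dot is rejected to avoid the SSL problems noted above::

    from botocore.utils import check_dns_name

    assert check_dns_name('my-example-bucket') is True
    assert check_dns_name('my.example.bucket') is False  # dots are punted on
    assert check_dns_name('My-Bucket') is False           # uppercase not allowed
    assert check_dns_name('ab') is False                  # too short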

1349def fix_s3_host( 

1350 request, 

1351 signature_version, 

1352 region_name, 

1353 default_endpoint_url=None, 

1354 **kwargs, 

1355): 

1356 """ 

1357 This handler looks at S3 requests just before they are signed. 

1358 If there is a bucket name on the path (true for everything except 

1359 ListAllBuckets) it checks to see if that bucket name conforms to 

1360 the DNS naming conventions. If it does, it alters the request to 

1361 use ``virtual hosting`` style addressing rather than ``path-style`` 

1362 addressing. 

1363 

1364 """ 

1365 if request.context.get('use_global_endpoint', False): 

1366 default_endpoint_url = 's3.amazonaws.com' 

1367 try: 

1368 switch_to_virtual_host_style( 

1369 request, signature_version, default_endpoint_url 

1370 ) 

1371 except InvalidDNSNameError as e: 

1372 bucket_name = e.kwargs['bucket_name'] 

1373 logger.debug( 

1374 'Not changing URI, bucket is not DNS compatible: %s', bucket_name 

1375 ) 

1376 

1377 

1378def switch_to_virtual_host_style( 

1379 request, signature_version, default_endpoint_url=None, **kwargs 

1380): 

1381 """ 

1382 This is a handler to force virtual host style s3 addressing no matter 

1383 the signature version (which is taken into consideration for the default 

1384 case). If the bucket is not DNS compatible, an InvalidDNSNameError is raised. 

1385 

1386 :param request: An AWSRequest object that is about to be sent. 

1387 :param signature_version: The signature version to sign with 

1388 :param default_endpoint_url: The endpoint to use when switching to a 

1389 virtual style. If None is supplied, the virtual host will be 

1390 constructed from the url of the request. 

1391 """ 

1392 if request.auth_path is not None: 

1393 # The auth_path has already been applied (this may be a 

1394 # retried request). We don't need to perform this 

1395 # customization again. 

1396 return 

1397 elif _is_get_bucket_location_request(request): 

1398 # For the GetBucketLocation response, we should not be using 

1399 # the virtual host style addressing so we can avoid any sigv4 

1400 # issues. 

1401 logger.debug( 

1402 "Request is GetBucketLocation operation, not checking " 

1403 "for DNS compatibility." 

1404 ) 

1405 return 

1406 parts = urlsplit(request.url) 

1407 request.auth_path = parts.path 

1408 path_parts = parts.path.split('/') 

1409 

1410 # Retrieve the endpoint we will be prepending the bucket name to. 

1411 if default_endpoint_url is None: 

1412 default_endpoint_url = parts.netloc 

1413 

1414 if len(path_parts) > 1: 

1415 bucket_name = path_parts[1] 

1416 if not bucket_name: 

1417 # If the bucket name is empty we should not be checking for 

1418 # dns compatibility. 

1419 return 

1420 logger.debug('Checking for DNS compatible bucket for: %s', request.url) 

1421 if check_dns_name(bucket_name): 

1422 # If the operation is on a bucket, the auth_path must be 

1423 # terminated with a '/' character. 

1424 if len(path_parts) == 2: 

1425 if request.auth_path[-1] != '/': 

1426 request.auth_path += '/' 

1427 path_parts.remove(bucket_name) 

1428 # At the very least the path must be a '/', such as with the 

1429 # CreateBucket operation when DNS style is being used. If this 

1430 # is not used you will get an empty path which is incorrect. 

1431 path = '/'.join(path_parts) or '/' 

1432 global_endpoint = default_endpoint_url 

1433 host = bucket_name + '.' + global_endpoint 

1434 new_tuple = (parts.scheme, host, path, parts.query, '') 

1435 new_uri = urlunsplit(new_tuple) 

1436 request.url = new_uri 

1437 logger.debug('URI updated to: %s', new_uri) 

1438 else: 

1439 raise InvalidDNSNameError(bucket_name=bucket_name) 

1440 

1441 

1442def _is_get_bucket_location_request(request): 

1443 return request.url.endswith('?location') 

1444 

1445 

1446def instance_cache(func): 

1447 """Method decorator for caching method calls to a single instance. 

1448 

1449 **This is not a general purpose caching decorator.** 

1450 

1451 In order to use this, you *must* provide an ``_instance_cache`` 

1452 attribute on the instance. 

1453 

1454 This decorator is used to cache method calls. The cache is only 

1455 scoped to a single instance though such that multiple instances 

1456 will maintain their own cache. In order to keep things simple, 

1457 this decorator requires that you provide an ``_instance_cache`` 

1458 attribute on your instance. 

1459 

1460 """ 

1461 func_name = func.__name__ 

1462 

1463 @functools.wraps(func) 

1464 def _cache_guard(self, *args, **kwargs): 

1465 cache_key = (func_name, args) 

1466 if kwargs: 

1467 kwarg_items = tuple(sorted(kwargs.items())) 

1468 cache_key = (func_name, args, kwarg_items) 

1469 result = self._instance_cache.get(cache_key) 

1470 if result is not None: 

1471 return result 

1472 result = func(self, *args, **kwargs) 

1473 self._instance_cache[cache_key] = result 

1474 return result 

1475 

1476 return _cache_guard 

1477 

1478 
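A hypothetical sketch of the contract described above (``EndpointStore`` is made up): the decorated class must define ``_instance_cache`` itself, and entries are keyed per instance by method name plus hashable arguments::

    from botocore.utils import instance_cache

    class EndpointStore:
        def __init__(self):
            self._instance_cache = {}
            self.computations = 0

        @instance_cache
        def endpoint_for(self, service, region):
            self.computations += 1
            return f'https://{service}.{region}.amazonaws.com'

    store = EndpointStore()
    store.endpoint_for('s3', 'us-east-1')
    store.endpoint_for('s3', 'us-east-1')  # served from _instance_cache
    assert store.computations == 1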

1479def lru_cache_weakref(*cache_args, **cache_kwargs): 

1480 """ 

1481 Version of functools.lru_cache that stores a weak reference to ``self``. 

1482 

1483 Serves the same purpose as :py:func:`instance_cache` but uses Python's 

1484 functools implementation, which offers the ``maxsize`` and ``typed`` options. 

1485 

1486 lru_cache is a global cache even when used on a method. The cache's 

1487 reference to ``self`` will prevent garbage collection of the object. This 

1488 wrapper around functools.lru_cache replaces the reference to ``self`` with 

1489 a weak reference to not interfere with garbage collection. 

1490 """ 

1491 

1492 def wrapper(func): 

1493 @functools.lru_cache(*cache_args, **cache_kwargs) 

1494 def func_with_weakref(weakref_to_self, *args, **kwargs): 

1495 return func(weakref_to_self(), *args, **kwargs) 

1496 

1497 @functools.wraps(func) 

1498 def inner(self, *args, **kwargs): 

1499 for kwarg_key, kwarg_value in kwargs.items(): 

1500 if isinstance(kwarg_value, list): 

1501 kwargs[kwarg_key] = tuple(kwarg_value) 

1502 return func_with_weakref(weakref.ref(self), *args, **kwargs) 

1503 

1504 inner.cache_info = func_with_weakref.cache_info 

1505 return inner 

1506 

1507 return wrapper 

1508 

1509 
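A hypothetical sketch (``Resolver`` is made up): the underlying ``functools.lru_cache`` keys on ``weakref.ref(self)`` plus the call arguments, so cached entries do not keep the instance alive, and list-valued keyword arguments are converted to tuples so they stay hashable::

    from botocore.utils import lru_cache_weakref

    class Resolver:
        def __init__(self):
            self.lookups = 0

        @lru_cache_weakref(maxsize=32)
        def resolve(self, service, region):
            self.lookups += 1
            return f'{service}.{region}.amazonaws.com'

    resolver = Resolver()
    resolver.resolve('sts', 'eu-west-1')
    resolver.resolve('sts', 'eu-west-1')  # cache hit
    assert resolver.lookups == 1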

1510def switch_host_s3_accelerate(request, operation_name, **kwargs): 

1511 """Switches the current s3 endpoint with an S3 Accelerate endpoint""" 

1512 

1513 # Note that when registered the switching of the s3 host happens 

1514 # before it gets changed to virtual. So we are not concerned with ensuring 

1515 # that the bucket name is translated to the virtual style here and we 

1516 # can hard code the Accelerate endpoint. 

1517 parts = urlsplit(request.url).netloc.split('.') 

1518 parts = [p for p in parts if p in S3_ACCELERATE_WHITELIST] 

1519 endpoint = 'https://s3-accelerate.' 

1520 if len(parts) > 0: 

1521 endpoint += '.'.join(parts) + '.' 

1522 endpoint += 'amazonaws.com' 

1523 

1524 if operation_name in ['ListBuckets', 'CreateBucket', 'DeleteBucket']: 

1525 return 

1526 _switch_hosts(request, endpoint, use_new_scheme=False) 

1527 

1528 

1529def switch_host_with_param(request, param_name): 

1530 """Switches the host using a parameter value from a JSON request body""" 

1531 request_json = json.loads(request.data.decode('utf-8')) 

1532 if request_json.get(param_name): 

1533 new_endpoint = request_json[param_name] 

1534 _switch_hosts(request, new_endpoint) 

1535 

1536 

1537def _switch_hosts(request, new_endpoint, use_new_scheme=True): 

1538 final_endpoint = _get_new_endpoint( 

1539 request.url, new_endpoint, use_new_scheme 

1540 ) 

1541 request.url = final_endpoint 

1542 

1543 

1544def _get_new_endpoint(original_endpoint, new_endpoint, use_new_scheme=True): 

1545 new_endpoint_components = urlsplit(new_endpoint) 

1546 original_endpoint_components = urlsplit(original_endpoint) 

1547 scheme = original_endpoint_components.scheme 

1548 if use_new_scheme: 

1549 scheme = new_endpoint_components.scheme 

1550 final_endpoint_components = ( 

1551 scheme, 

1552 new_endpoint_components.netloc, 

1553 original_endpoint_components.path, 

1554 original_endpoint_components.query, 

1555 '', 

1556 ) 

1557 final_endpoint = urlunsplit(final_endpoint_components) 

1558 logger.debug(f'Updating URI from {original_endpoint} to {final_endpoint}') 

1559 return final_endpoint 

1560 

1561 

1562def deep_merge(base, extra): 

1563 """Deeply two dictionaries, overriding existing keys in the base. 

1564 

1565 :param base: The base dictionary which will be merged into. 

1566 :param extra: The dictionary to merge into the base. Keys from this 

1567 dictionary will take precedence. 

1568 """ 

1569 for key in extra: 

1570 # If the key represents a dict on both given dicts, merge the sub-dicts 

1571 if ( 

1572 key in base 

1573 and isinstance(base[key], dict) 

1574 and isinstance(extra[key], dict) 

1575 ): 

1576 deep_merge(base[key], extra[key]) 

1577 continue 

1578 

1579 # Otherwise, set the key on the base to be the value of the extra. 

1580 base[key] = extra[key] 

1581 

1582 
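An illustrative sketch (example config keys): sub-dicts present in both arguments are merged recursively, while every other key from ``extra`` overrides the value in ``base``::

    from botocore.utils import deep_merge

    base = {'retries': {'max_attempts': 3}, 'region': 'us-east-1'}
    extra = {'retries': {'mode': 'standard'}, 'region': 'us-west-2'}
    deep_merge(base, extra)
    assert base == {
        'retries': {'max_attempts': 3, 'mode': 'standard'},
        'region': 'us-west-2',
    }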

1583def hyphenize_service_id(service_id): 

1584 """Translate the form used for event emitters. 

1585 

1586 :param service_id: The service_id to convert. 

1587 """ 

1588 return service_id.replace(' ', '-').lower() 

1589 

1590 
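A short sketch (illustrative inputs): service IDs are lowercased with spaces replaced by hyphens, which is the form used in event names and in the ``EVENT_ALIASES`` values above::

    from botocore.utils import hyphenize_service_id

    assert hyphenize_service_id('Auto Scaling') == 'auto-scaling'
    assert hyphenize_service_id('S3') == 's3'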

1591class IdentityCache: 

1592 """Base IdentityCache implementation for storing and retrieving 

1593 highly accessed credentials. 

1594 

1595 This class is not intended to be instantiated in user code. 

1596 """ 

1597 

1598 METHOD = "base_identity_cache" 

1599 

1600 def __init__(self, client, credential_cls): 

1601 self._client = client 

1602 self._credential_cls = credential_cls 

1603 

1604 def get_credentials(self, **kwargs): 

1605 callback = self.build_refresh_callback(**kwargs) 

1606 metadata = callback() 

1607 credential_entry = self._credential_cls.create_from_metadata( 

1608 metadata=metadata, 

1609 refresh_using=callback, 

1610 method=self.METHOD, 

1611 advisory_timeout=45, 

1612 mandatory_timeout=10, 

1613 ) 

1614 return credential_entry 

1615 

1616 def build_refresh_callback(self, **kwargs): 

1617 """Callback to be implemented by subclasses. 

1618 

1619 Returns a set of metadata to be converted into a new 

1620 credential instance. 

1621 """ 

1622 raise NotImplementedError() 

1623 

1624 

1625class S3ExpressIdentityCache(IdentityCache): 

1626 """S3Express IdentityCache for retrieving and storing 

1627 credentials from CreateSession calls. 

1628 

1629 This class is not intended to be instantiated in user code. 

1630 """ 

1631 

1632 METHOD = "s3express" 

1633 

1634 def __init__(self, client, credential_cls): 

1635 self._client = client 

1636 self._credential_cls = credential_cls 

1637 

1638 @functools.lru_cache(maxsize=100) 

1639 def get_credentials(self, bucket): 

1640 return super().get_credentials(bucket=bucket) 

1641 

1642 def build_refresh_callback(self, bucket): 

1643 def refresher(): 

1644 response = self._client.create_session(Bucket=bucket) 

1645 creds = response['Credentials'] 

1646 expiration = self._serialize_if_needed( 

1647 creds['Expiration'], iso=True 

1648 ) 

1649 return { 

1650 "access_key": creds['AccessKeyId'], 

1651 "secret_key": creds['SecretAccessKey'], 

1652 "token": creds['SessionToken'], 

1653 "expiry_time": expiration, 

1654 } 

1655 

1656 return refresher 

1657 

1658 def _serialize_if_needed(self, value, iso=False): 

1659 if isinstance(value, _DatetimeClass): 

1660 if iso: 

1661 return value.isoformat() 

1662 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

1663 return value 

1664 
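# Editor's note: a minimal sketch (illustration only, not part of botocore)
# of how S3ExpressIdentityCache turns a CreateSession response into the
# refresh metadata dict. The stub client below is hypothetical.
def _example_s3express_refresh():  # hypothetical helper, for illustration only
    class _StubS3Client:  # stands in for a real S3 client
        def create_session(self, Bucket):
            return {
                'Credentials': {
                    'AccessKeyId': 'AKIDEXAMPLE',
                    'SecretAccessKey': 'secret',
                    'SessionToken': 'token',
                    'Expiration': datetime.datetime(2030, 1, 1, tzinfo=tzutc()),
                }
            }

    cache = S3ExpressIdentityCache(_StubS3Client(), credential_cls=None)
    refresher = cache.build_refresh_callback(bucket='my-bucket--usw2-az1--x-s3')
    metadata = refresher()
    # Expiration is serialized to an ISO 8601 string; the rest map directly.
    assert metadata['access_key'] == 'AKIDEXAMPLE'
    assert metadata['expiry_time'] == '2030-01-01T00:00:00+00:00'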

1665 

1666class S3ExpressIdentityResolver: 

1667 def __init__(self, client, credential_cls, cache=None): 

1668 self._client = weakref.proxy(client) 

1669 

1670 if cache is None: 

1671 cache = S3ExpressIdentityCache(self._client, credential_cls) 

1672 self._cache = cache 

1673 

1674 def register(self, event_emitter=None): 

1675 logger.debug('Registering S3Express Identity Resolver') 

1676 emitter = event_emitter or self._client.meta.events 

1677 emitter.register('before-call.s3', self.apply_signing_cache_key) 

1678 emitter.register('before-sign.s3', self.resolve_s3express_identity) 

1679 

1680 def apply_signing_cache_key(self, params, context, **kwargs): 

1681 endpoint_properties = context.get('endpoint_properties', {}) 

1682 backend = endpoint_properties.get('backend', None) 

1683 

1684 # Add cache key if Bucket supplied for s3express request 

1685 bucket_name = context.get('input_params', {}).get('Bucket') 

1686 if backend == 'S3Express' and bucket_name is not None: 

1687 context.setdefault('signing', {}) 

1688 context['signing']['cache_key'] = bucket_name 

1689 

1690 def resolve_s3express_identity( 

1691 self, 

1692 request, 

1693 signing_name, 

1694 region_name, 

1695 signature_version, 

1696 request_signer, 

1697 operation_name, 

1698 **kwargs, 

1699 ): 

1700 signing_context = request.context.get('signing', {}) 

1701 signing_name = signing_context.get('signing_name') 

1702 if signing_name == 's3express' and signature_version.startswith( 

1703 'v4-s3express' 

1704 ): 

1705 signing_context['identity_cache'] = self._cache 

1706 if 'cache_key' not in signing_context: 

1707 signing_context['cache_key'] = ( 

1708 request.context.get('s3_redirect', {}) 

1709 .get('params', {}) 

1710 .get('Bucket') 

1711 ) 

1712 
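# Editor's note: a small sketch (illustration only, not part of botocore) of
# apply_signing_cache_key(): for an S3Express-backed endpoint, the bucket name
# becomes the per-request signing cache key. The stub client is hypothetical.
def _example_apply_signing_cache_key():  # hypothetical helper, for illustration only
    class _StubClient:
        pass

    resolver = S3ExpressIdentityResolver(_StubClient(), credential_cls=None)
    context = {
        'endpoint_properties': {'backend': 'S3Express'},
        'input_params': {'Bucket': 'my-bucket--usw2-az1--x-s3'},
    }
    resolver.apply_signing_cache_key(params={}, context=context)
    assert context['signing']['cache_key'] == 'my-bucket--usw2-az1--x-s3'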

1713 

1714class S3RegionRedirectorv2: 

1715 """Updated version of S3RegionRedirector for use when 

1716 EndpointRulesetResolver is in use for endpoint resolution. 

1717 

1718 This class is considered private and subject to abrupt breaking changes or 

1719 removal without prior announcement. Please do not use it directly. 

1720 """ 

1721 

1722 def __init__(self, endpoint_bridge, client, cache=None): 

1723 self._cache = cache or {} 

1724 self._client = weakref.proxy(client) 

1725 

1726 def register(self, event_emitter=None): 

1727 logger.debug('Registering S3 region redirector handler') 

1728 emitter = event_emitter or self._client.meta.events 

1729 emitter.register('needs-retry.s3', self.redirect_from_error) 

1730 emitter.register( 

1731 'before-parameter-build.s3', self.annotate_request_context 

1732 ) 

1733 emitter.register( 

1734 'before-endpoint-resolution.s3', self.redirect_from_cache 

1735 ) 

1736 

1737 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1738 """ 

1739 An S3 request sent to the wrong region will return an error that 

1740 contains the endpoint the request should be sent to. This handler 

1741 will add the redirect information to the signing context and then 

1742 redirect the request. 

1743 """ 

1744 if response is None: 

1745 # This could be none if there was a ConnectionError or other 

1746 # transport error. 

1747 return 

1748 

1749 redirect_ctx = request_dict.get('context', {}).get('s3_redirect', {}) 

1750 if ArnParser.is_arn(redirect_ctx.get('bucket')): 

1751 logger.debug( 

1752 'S3 request was previously for an Accesspoint ARN, not ' 

1753 'redirecting.' 

1754 ) 

1755 return 

1756 

1757 if redirect_ctx.get('redirected'): 

1758 logger.debug( 

1759 'S3 request was previously redirected, not redirecting.' 

1760 ) 

1761 return 

1762 

1763 error = response[1].get('Error', {}) 

1764 error_code = error.get('Code') 

1765 response_metadata = response[1].get('ResponseMetadata', {}) 

1766 

1767 # We have to account for 400 responses because 

1768 # if we sign a Head* request with the wrong region, 

1769 # we'll get a 400 Bad Request but we won't get a 

1770 # body saying it's an "AuthorizationHeaderMalformed". 

1771 is_special_head_object = ( 

1772 error_code in ('301', '400') and operation.name == 'HeadObject' 

1773 ) 

1774 is_special_head_bucket = ( 

1775 error_code in ('301', '400') 

1776 and operation.name == 'HeadBucket' 

1777 and 'x-amz-bucket-region' 

1778 in response_metadata.get('HTTPHeaders', {}) 

1779 ) 

1780 is_wrong_signing_region = ( 

1781 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

1782 ) 

1783 is_redirect_status = response[0] is not None and response[ 

1784 0 

1785 ].status_code in (301, 302, 307) 

1786 is_permanent_redirect = error_code == 'PermanentRedirect' 

1787 is_opt_in_region_redirect = ( 

1788 error_code == 'IllegalLocationConstraintException' 

1789 and operation.name != 'CreateBucket' 

1790 ) 

1791 if not any( 

1792 [ 

1793 is_special_head_object, 

1794 is_wrong_signing_region, 

1795 is_permanent_redirect, 

1796 is_special_head_bucket, 

1797 is_redirect_status, 

1798 is_opt_in_region_redirect, 

1799 ] 

1800 ): 

1801 return 

1802 

1803 bucket = request_dict['context']['s3_redirect']['bucket'] 

1804 client_region = request_dict['context'].get('client_region') 

1805 new_region = self.get_bucket_region(bucket, response) 

1806 

1807 if new_region is None: 

1808 logger.debug( 

1809 f"S3 client configured for region {client_region} but the " 

1810 f"bucket {bucket} is not in that region and the proper region " 

1811 "could not be automatically determined." 

1812 ) 

1813 return 

1814 

1815 logger.debug( 

1816 f"S3 client configured for region {client_region} but the bucket {bucket} " 

1817 f"is in region {new_region}; Please configure the proper region to " 

1818 f"avoid multiple unnecessary redirects and signing attempts." 

1819 ) 

1820 # Adding the new region to _cache will make construct_endpoint() 

1821 # use the new region as the value for the AWS::Region builtin parameter. 

1822 self._cache[bucket] = new_region 

1823 

1824 # Re-resolve endpoint with new region and modify request_dict with 

1825 # the new URL, auth scheme, and signing context. 

1826 ep_resolver = self._client._ruleset_resolver 

1827 ep_info = ep_resolver.construct_endpoint( 

1828 operation_model=operation, 

1829 call_args=request_dict['context']['s3_redirect']['params'], 

1830 request_context=request_dict['context'], 

1831 ) 

1832 request_dict['url'] = self.set_request_url( 

1833 request_dict['url'], ep_info.url 

1834 ) 

1835 request_dict['context']['s3_redirect']['redirected'] = True 

1836 auth_schemes = ep_info.properties.get('authSchemes') 

1837 if auth_schemes is not None: 

1838 auth_info = ep_resolver.auth_schemes_to_signing_ctx(auth_schemes) 

1839 auth_type, signing_context = auth_info 

1840 request_dict['context']['auth_type'] = auth_type 

1841 request_dict['context']['signing'] = { 

1842 **request_dict['context'].get('signing', {}), 

1843 **signing_context, 

1844 } 

1845 

1846 # Return 0 so it doesn't wait to retry 

1847 return 0 

1848 

1849 def get_bucket_region(self, bucket, response): 

1850 """ 

1851 There are multiple potential sources for the new region to redirect to, 

1852 but they aren't all universally available for use. This will try to 

1853 find region from response elements, but will fall back to calling 

1854 HEAD on the bucket if all else fails. 

1855 

1856 :param bucket: The bucket to find the region for. This is necessary if 

1857 the region is not available in the error response. 

1858 :param response: A response representing a service request that failed 

1859 due to incorrect region configuration. 

1860 """ 

1861 # First try to source the region from the headers. 

1862 service_response = response[1] 

1863 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

1864 if 'x-amz-bucket-region' in response_headers: 

1865 return response_headers['x-amz-bucket-region'] 

1866 

1867 # Next, check the error body 

1868 region = service_response.get('Error', {}).get('Region', None) 

1869 if region is not None: 

1870 return region 

1871 

1872 # Finally, HEAD the bucket. No other choice sadly. 

1873 try: 

1874 response = self._client.head_bucket(Bucket=bucket) 

1875 headers = response['ResponseMetadata']['HTTPHeaders'] 

1876 except ClientError as e: 

1877 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

1878 

1879 region = headers.get('x-amz-bucket-region', None) 

1880 return region 

1881 

1882 def set_request_url(self, old_url, new_endpoint, **kwargs): 

1883 """ 

1884 Splice a new endpoint into an existing URL. Note that some endpoints 

1885 from the endpoint provider have a path component which will be 

1886 discarded by this function. 

1887 """ 

1888 return _get_new_endpoint(old_url, new_endpoint, False) 

1889 

1890 def redirect_from_cache(self, builtins, params, **kwargs): 

1891 """ 

1892 If a bucket name has been redirected before, it is in the cache. This 

1893 handler will update the AWS::Region endpoint resolver builtin param 

1894 to use the region from cache instead of the client region to avoid the 

1895 redirect. 

1896 """ 

1897 bucket = params.get('Bucket') 

1898 if bucket is not None and bucket in self._cache: 

1899 new_region = self._cache.get(bucket) 

1900 builtins['AWS::Region'] = new_region 

1901 

1902 def annotate_request_context(self, params, context, **kwargs): 

1903 """Store the bucket name in context for later use when redirecting. 

1904 The bucket name may be an access point ARN or alias. 

1905 """ 

1906 bucket = params.get('Bucket') 

1907 context['s3_redirect'] = { 

1908 'redirected': False, 

1909 'bucket': bucket, 

1910 'params': params, 

1911 } 

1912 
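# Editor's note: a small sketch (illustration only, not part of botocore) of
# the cache-driven path: once a bucket's region is cached, the AWS::Region
# endpoint builtin is overridden before endpoint resolution. The stub client
# is hypothetical; it only needs to be weak-referenceable.
def _example_redirector_cache():  # hypothetical helper, for illustration only
    class _StubClient:
        pass

    client = _StubClient()
    redirector = S3RegionRedirectorv2(
        endpoint_bridge=None, client=client, cache={'my-bucket': 'eu-west-1'}
    )
    builtins = {'AWS::Region': 'us-east-1'}
    redirector.redirect_from_cache(builtins, params={'Bucket': 'my-bucket'})
    # The cached bucket region replaces the client region for this request.
    assert builtins['AWS::Region'] == 'eu-west-1'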

1913 

1914class S3RegionRedirector: 

1915 """This handler has been replaced by S3RegionRedirectorv2. The original 

1916 version remains in place for any third-party libraries that import it. 

1917 """ 

1918 

1919 def __init__(self, endpoint_bridge, client, cache=None): 

1920 self._endpoint_resolver = endpoint_bridge 

1921 self._cache = cache 

1922 if self._cache is None: 

1923 self._cache = {} 

1924 

1925 # This needs to be a weak ref in order to prevent memory leaks on 

1926 # python 2.6 

1927 self._client = weakref.proxy(client) 

1928 

1929 warnings.warn( 

1930 'The S3RegionRedirector class has been deprecated for a new ' 

1931 'internal replacement. A future version of botocore may remove ' 

1932 'this class.', 

1933 category=FutureWarning, 

1934 ) 

1935 

1936 def register(self, event_emitter=None): 

1937 emitter = event_emitter or self._client.meta.events 

1938 emitter.register('needs-retry.s3', self.redirect_from_error) 

1939 emitter.register('before-call.s3', self.set_request_url) 

1940 emitter.register('before-parameter-build.s3', self.redirect_from_cache) 

1941 

1942 def redirect_from_error(self, request_dict, response, operation, **kwargs): 

1943 """ 

1944 An S3 request sent to the wrong region will return an error that 

1945 contains the endpoint the request should be sent to. This handler 

1946 will add the redirect information to the signing context and then 

1947 redirect the request. 

1948 """ 

1949 if response is None: 

1950 # This could be none if there was a ConnectionError or other 

1951 # transport error. 

1952 return 

1953 

1954 if self._is_s3_accesspoint(request_dict.get('context', {})): 

1955 logger.debug( 

1956 'S3 request was previously to an accesspoint, not redirecting.' 

1957 ) 

1958 return 

1959 

1960 if request_dict.get('context', {}).get('s3_redirected'): 

1961 logger.debug( 

1962 'S3 request was previously redirected, not redirecting.' 

1963 ) 

1964 return 

1965 

1966 error = response[1].get('Error', {}) 

1967 error_code = error.get('Code') 

1968 response_metadata = response[1].get('ResponseMetadata', {}) 

1969 

1970 # We have to account for 400 responses because 

1971 # if we sign a Head* request with the wrong region, 

1972 # we'll get a 400 Bad Request but we won't get a 

1973 # body saying it's an "AuthorizationHeaderMalformed". 

1974 is_special_head_object = ( 

1975 error_code in ('301', '400') and operation.name == 'HeadObject' 

1976 ) 

1977 is_special_head_bucket = ( 

1978 error_code in ('301', '400') 

1979 and operation.name == 'HeadBucket' 

1980 and 'x-amz-bucket-region' 

1981 in response_metadata.get('HTTPHeaders', {}) 

1982 ) 

1983 is_wrong_signing_region = ( 

1984 error_code == 'AuthorizationHeaderMalformed' and 'Region' in error 

1985 ) 

1986 is_redirect_status = response[0] is not None and response[ 

1987 0 

1988 ].status_code in (301, 302, 307) 

1989 is_permanent_redirect = error_code == 'PermanentRedirect' 

1990 if not any( 

1991 [ 

1992 is_special_head_object, 

1993 is_wrong_signing_region, 

1994 is_permanent_redirect, 

1995 is_special_head_bucket, 

1996 is_redirect_status, 

1997 ] 

1998 ): 

1999 return 

2000 

2001 bucket = request_dict['context']['signing']['bucket'] 

2002 client_region = request_dict['context'].get('client_region') 

2003 new_region = self.get_bucket_region(bucket, response) 

2004 

2005 if new_region is None: 

2006 logger.debug( 

2007 f"S3 client configured for region {client_region} but the bucket {bucket} is not " 

2008 "in that region and the proper region could not be " 

2009 "automatically determined." 

2010 ) 

2011 return 

2012 

2013 logger.debug( 

2014 f"S3 client configured for region {client_region} but the bucket {bucket} is in region" 

2015 f" {new_region}; Please configure the proper region to avoid multiple " 

2016 "unnecessary redirects and signing attempts." 

2017 ) 

2018 endpoint = self._endpoint_resolver.resolve('s3', new_region) 

2019 endpoint = endpoint['endpoint_url'] 

2020 

2021 signing_context = { 

2022 'region': new_region, 

2023 'bucket': bucket, 

2024 'endpoint': endpoint, 

2025 } 

2026 request_dict['context']['signing'] = signing_context 

2027 

2028 self._cache[bucket] = signing_context 

2029 self.set_request_url(request_dict, request_dict['context']) 

2030 

2031 request_dict['context']['s3_redirected'] = True 

2032 

2033 # Return 0 so it doesn't wait to retry 

2034 return 0 

2035 

2036 def get_bucket_region(self, bucket, response): 

2037 """ 

2038 There are multiple potential sources for the new region to redirect to, 

2039 but they aren't all universally available for use. This will try to 

2040 find region from response elements, but will fall back to calling 

2041 HEAD on the bucket if all else fails. 

2042 

2043 :param bucket: The bucket to find the region for. This is necessary if 

2044 the region is not available in the error response. 

2045 :param response: A response representing a service request that failed 

2046 due to incorrect region configuration. 

2047 """ 

2048 # First try to source the region from the headers. 

2049 service_response = response[1] 

2050 response_headers = service_response['ResponseMetadata']['HTTPHeaders'] 

2051 if 'x-amz-bucket-region' in response_headers: 

2052 return response_headers['x-amz-bucket-region'] 

2053 

2054 # Next, check the error body 

2055 region = service_response.get('Error', {}).get('Region', None) 

2056 if region is not None: 

2057 return region 

2058 

2059 # Finally, HEAD the bucket. No other choice sadly. 

2060 try: 

2061 response = self._client.head_bucket(Bucket=bucket) 

2062 headers = response['ResponseMetadata']['HTTPHeaders'] 

2063 except ClientError as e: 

2064 headers = e.response['ResponseMetadata']['HTTPHeaders'] 

2065 

2066 region = headers.get('x-amz-bucket-region', None) 

2067 return region 

2068 

2069 def set_request_url(self, params, context, **kwargs): 

2070 endpoint = context.get('signing', {}).get('endpoint', None) 

2071 if endpoint is not None: 

2072 params['url'] = _get_new_endpoint(params['url'], endpoint, False) 

2073 

2074 def redirect_from_cache(self, params, context, **kwargs): 

2075 """ 

2076 This handler retrieves a given bucket's signing context from the cache 

2077 and adds it into the request context. 

2078 """ 

2079 if self._is_s3_accesspoint(context): 

2080 return 

2081 bucket = params.get('Bucket') 

2082 signing_context = self._cache.get(bucket) 

2083 if signing_context is not None: 

2084 context['signing'] = signing_context 

2085 else: 

2086 context['signing'] = {'bucket': bucket} 

2087 

2088 def _is_s3_accesspoint(self, context): 

2089 return 's3_accesspoint' in context 

2090 

2091 

2092class InvalidArnException(ValueError): 

2093 pass 

2094 

2095 

2096class ArnParser: 

2097 def parse_arn(self, arn): 

2098 arn_parts = arn.split(':', 5) 

2099 if len(arn_parts) < 6: 

2100 raise InvalidArnException( 

2101 f'Provided ARN: {arn} must be of the format: ' 

2102 'arn:partition:service:region:account:resource' 

2103 ) 

2104 return { 

2105 'partition': arn_parts[1], 

2106 'service': arn_parts[2], 

2107 'region': arn_parts[3], 

2108 'account': arn_parts[4], 

2109 'resource': arn_parts[5], 

2110 } 

2111 

2112 @staticmethod 

2113 def is_arn(value): 

2114 if not isinstance(value, str) or not value.startswith('arn:'): 

2115 return False 

2116 arn_parser = ArnParser() 

2117 try: 

2118 arn_parser.parse_arn(value) 

2119 return True 

2120 except InvalidArnException: 

2121 return False 

2122 
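# Editor's note: a minimal sketch (illustration only) of the six-part ARN
# split performed by ArnParser and the is_arn() convenience check.
def _example_arn_parser():  # hypothetical helper, for illustration only
    parser = ArnParser()
    details = parser.parse_arn('arn:aws:s3:us-west-2:123456789012:accesspoint/my-ap')
    assert details == {
        'partition': 'aws',
        'service': 's3',
        'region': 'us-west-2',
        'account': '123456789012',
        'resource': 'accesspoint/my-ap',
    }
    # Strings that do not look like ARNs are rejected without raising.
    assert ArnParser.is_arn('my-regular-bucket') is False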

2123 

2124class S3ArnParamHandler: 

2125 _RESOURCE_REGEX = re.compile( 

2126 r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$' 

2127 ) 

2128 _OUTPOST_RESOURCE_REGEX = re.compile( 

2129 r'^(?P<outpost_name>[a-zA-Z0-9\-]{1,63})[/:]accesspoint[/:]' 

2130 r'(?P<accesspoint_name>[a-zA-Z0-9\-]{1,63}$)' 

2131 ) 

2132 _BLACKLISTED_OPERATIONS = ['CreateBucket'] 

2133 

2134 def __init__(self, arn_parser=None): 

2135 self._arn_parser = arn_parser 

2136 if arn_parser is None: 

2137 self._arn_parser = ArnParser() 

2138 

2139 def register(self, event_emitter): 

2140 event_emitter.register('before-parameter-build.s3', self.handle_arn) 

2141 

2142 def handle_arn(self, params, model, context, **kwargs): 

2143 if model.name in self._BLACKLISTED_OPERATIONS: 

2144 return 

2145 arn_details = self._get_arn_details_from_bucket_param(params) 

2146 if arn_details is None: 

2147 return 

2148 if arn_details['resource_type'] == 'accesspoint': 

2149 self._store_accesspoint(params, context, arn_details) 

2150 elif arn_details['resource_type'] == 'outpost': 

2151 self._store_outpost(params, context, arn_details) 

2152 

2153 def _get_arn_details_from_bucket_param(self, params): 

2154 if 'Bucket' in params: 

2155 try: 

2156 arn = params['Bucket'] 

2157 arn_details = self._arn_parser.parse_arn(arn) 

2158 self._add_resource_type_and_name(arn, arn_details) 

2159 return arn_details 

2160 except InvalidArnException: 

2161 pass 

2162 return None 

2163 

2164 def _add_resource_type_and_name(self, arn, arn_details): 

2165 match = self._RESOURCE_REGEX.match(arn_details['resource']) 

2166 if match: 

2167 arn_details['resource_type'] = match.group('resource_type') 

2168 arn_details['resource_name'] = match.group('resource_name') 

2169 else: 

2170 raise UnsupportedS3ArnError(arn=arn) 

2171 

2172 def _store_accesspoint(self, params, context, arn_details): 

2173 # Ideally the access-point would be stored as a parameter in the 

2174 # request where the serializer would then know how to serialize it, 

2175 # but access-points are not modeled in S3 operations so it would fail 

2176 # validation. Instead, we set the bucket parameter to the access-point 

2177 # name so that some value is set when serializing the request, and we 

2178 # store additional information from the arn on the context for use in 

2179 # forming the access-point endpoint. 

2180 params['Bucket'] = arn_details['resource_name'] 

2181 context['s3_accesspoint'] = { 

2182 'name': arn_details['resource_name'], 

2183 'account': arn_details['account'], 

2184 'partition': arn_details['partition'], 

2185 'region': arn_details['region'], 

2186 'service': arn_details['service'], 

2187 } 

2188 

2189 def _store_outpost(self, params, context, arn_details): 

2190 resource_name = arn_details['resource_name'] 

2191 match = self._OUTPOST_RESOURCE_REGEX.match(resource_name) 

2192 if not match: 

2193 raise UnsupportedOutpostResourceError(resource_name=resource_name) 

2194 # Because we need to set the bucket name to something to pass 

2195 # validation we're going to use the access point name to be consistent 

2196 # with normal access point arns. 

2197 accesspoint_name = match.group('accesspoint_name') 

2198 params['Bucket'] = accesspoint_name 

2199 context['s3_accesspoint'] = { 

2200 'outpost_name': match.group('outpost_name'), 

2201 'name': accesspoint_name, 

2202 'account': arn_details['account'], 

2203 'partition': arn_details['partition'], 

2204 'region': arn_details['region'], 

2205 'service': arn_details['service'], 

2206 } 

2207 
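# Editor's note: a small sketch (illustration only, not part of botocore) of
# handle_arn() rewriting an access-point ARN passed as the Bucket parameter.
# The stub operation model is hypothetical; only its `name` attribute is used.
def _example_s3_arn_param_handler():  # hypothetical helper, for illustration only
    class _StubOperationModel:
        name = 'GetObject'

    params = {'Bucket': 'arn:aws:s3:us-west-2:123456789012:accesspoint/my-ap'}
    context = {}
    S3ArnParamHandler().handle_arn(params, _StubOperationModel(), context)
    # The Bucket param is replaced by the access-point name and the ARN
    # details are stashed on the context for endpoint construction.
    assert params['Bucket'] == 'my-ap'
    assert context['s3_accesspoint']['account'] == '123456789012'
    assert context['s3_accesspoint']['region'] == 'us-west-2'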

2208 

2209class S3EndpointSetter: 

2210 _DEFAULT_PARTITION = 'aws' 

2211 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2212 

2213 def __init__( 

2214 self, 

2215 endpoint_resolver, 

2216 region=None, 

2217 s3_config=None, 

2218 endpoint_url=None, 

2219 partition=None, 

2220 use_fips_endpoint=False, 

2221 ): 

2222 # This is calling the endpoint_resolver in regions.py 

2223 self._endpoint_resolver = endpoint_resolver 

2224 self._region = region 

2225 self._s3_config = s3_config 

2226 self._use_fips_endpoint = use_fips_endpoint 

2227 if s3_config is None: 

2228 self._s3_config = {} 

2229 self._endpoint_url = endpoint_url 

2230 self._partition = partition 

2231 if partition is None: 

2232 self._partition = self._DEFAULT_PARTITION 

2233 

2234 def register(self, event_emitter): 

2235 event_emitter.register('before-sign.s3', self.set_endpoint) 

2236 event_emitter.register('choose-signer.s3', self.set_signer) 

2237 event_emitter.register( 

2238 'before-call.s3.WriteGetObjectResponse', 

2239 self.update_endpoint_to_s3_object_lambda, 

2240 ) 

2241 

2242 def update_endpoint_to_s3_object_lambda(self, params, context, **kwargs): 

2243 if self._use_accelerate_endpoint: 

2244 raise UnsupportedS3ConfigurationError( 

2245 msg='S3 client does not support accelerate endpoints for S3 Object Lambda operations', 

2246 ) 

2247 

2248 self._override_signing_name(context, 's3-object-lambda') 

2249 if self._endpoint_url: 

2250 # Only update the url if an explicit url was not provided 

2251 return 

2252 

2253 resolver = self._endpoint_resolver 

2254 # Construct the endpoint for the s3-object-lambda service in the configured region 

2255 resolved = resolver.construct_endpoint( 

2256 's3-object-lambda', self._region 

2257 ) 

2258 

2259 # Ideally we would be able to replace the endpoint before 

2260 # serialization but there's no event to do that currently 

2261 # host_prefix is all the arn/bucket specs 

2262 new_endpoint = 'https://{host_prefix}{hostname}'.format( 

2263 host_prefix=params['host_prefix'], 

2264 hostname=resolved['hostname'], 

2265 ) 

2266 

2267 params['url'] = _get_new_endpoint(params['url'], new_endpoint, False) 

2268 

2269 def set_endpoint(self, request, **kwargs): 

2270 if self._use_accesspoint_endpoint(request): 

2271 self._validate_accesspoint_supported(request) 

2272 self._validate_fips_supported(request) 

2273 self._validate_global_regions(request) 

2274 region_name = self._resolve_region_for_accesspoint_endpoint( 

2275 request 

2276 ) 

2277 self._resolve_signing_name_for_accesspoint_endpoint(request) 

2278 self._switch_to_accesspoint_endpoint(request, region_name) 

2279 return 

2280 if self._use_accelerate_endpoint: 

2281 if self._use_fips_endpoint: 

2282 raise UnsupportedS3ConfigurationError( 

2283 msg=( 

2284 'Client is configured to use the FIPS pseudo region ' 

2285 f'for "{self._region}", but S3 Accelerate does not have any FIPS ' 

2286 'compatible endpoints.' 

2287 ) 

2288 ) 

2289 switch_host_s3_accelerate(request=request, **kwargs) 

2290 if self._s3_addressing_handler: 

2291 self._s3_addressing_handler(request=request, **kwargs) 

2292 

2293 def _use_accesspoint_endpoint(self, request): 

2294 return 's3_accesspoint' in request.context 

2295 

2296 def _validate_fips_supported(self, request): 

2297 if not self._use_fips_endpoint: 

2298 return 

2299 if 'fips' in request.context['s3_accesspoint']['region']: 

2300 raise UnsupportedS3AccesspointConfigurationError( 

2301 msg='Invalid ARN, FIPS region not allowed in ARN.' 

2302 ) 

2303 if 'outpost_name' in request.context['s3_accesspoint']: 

2304 raise UnsupportedS3AccesspointConfigurationError( 

2305 msg=( 

2306 f'Client is configured to use the FIPS pseudo-region "{self._region}", ' 

2307 'but outpost ARNs do not support FIPS endpoints.' 

2308 ) 

2309 ) 

2310 # Transforming pseudo region to actual region 

2311 accesspoint_region = request.context['s3_accesspoint']['region'] 

2312 if accesspoint_region != self._region: 

2313 if not self._s3_config.get('use_arn_region', True): 

2314 # TODO: Update message to reflect use_arn_region 

2315 # is not set 

2316 raise UnsupportedS3AccesspointConfigurationError( 

2317 msg=( 

2318 'Client is configured to use the FIPS pseudo-region ' 

2319 f'for "{self._region}", but the access-point ARN provided is for ' 

2320 f'the "{accesspoint_region}" region. For clients using a FIPS ' 

2321 'pseudo-region, calls to access-point ARNs in another ' 

2322 'region are not allowed.' 

2323 ) 

2324 ) 

2325 

2326 def _validate_global_regions(self, request): 

2327 if self._s3_config.get('use_arn_region', True): 

2328 return 

2329 if self._region in ['aws-global', 's3-external-1']: 

2330 raise UnsupportedS3AccesspointConfigurationError( 

2331 msg=( 

2332 'Client is configured to use the global pseudo-region ' 

2333 f'"{self._region}". When providing access-point ARNs a regional ' 

2334 'endpoint must be specified.' 

2335 ) 

2336 ) 

2337 

2338 def _validate_accesspoint_supported(self, request): 

2339 if self._use_accelerate_endpoint: 

2340 raise UnsupportedS3AccesspointConfigurationError( 

2341 msg=( 

2342 'Client does not support s3 accelerate configuration ' 

2343 'when an access-point ARN is specified.' 

2344 ) 

2345 ) 

2346 request_partition = request.context['s3_accesspoint']['partition'] 

2347 if request_partition != self._partition: 

2348 raise UnsupportedS3AccesspointConfigurationError( 

2349 msg=( 

2350 f'Client is configured for "{self._partition}" partition, but access-point' 

2351 f' ARN provided is for "{request_partition}" partition. The client and ' 

2352 'access-point partition must be the same.' 

2353 ) 

2354 ) 

2355 s3_service = request.context['s3_accesspoint'].get('service') 

2356 if s3_service == 's3-object-lambda' and self._s3_config.get( 

2357 'use_dualstack_endpoint' 

2358 ): 

2359 raise UnsupportedS3AccesspointConfigurationError( 

2360 msg=( 

2361 'Client does not support s3 dualstack configuration ' 

2362 'when an S3 Object Lambda access point ARN is specified.' 

2363 ) 

2364 ) 

2365 outpost_name = request.context['s3_accesspoint'].get('outpost_name') 

2366 if outpost_name and self._s3_config.get('use_dualstack_endpoint'): 

2367 raise UnsupportedS3AccesspointConfigurationError( 

2368 msg=( 

2369 'Client does not support s3 dualstack configuration ' 

2370 'when an outpost ARN is specified.' 

2371 ) 

2372 ) 

2373 self._validate_mrap_s3_config(request) 

2374 

2375 def _validate_mrap_s3_config(self, request): 

2376 if not is_global_accesspoint(request.context): 

2377 return 

2378 if self._s3_config.get('s3_disable_multiregion_access_points'): 

2379 raise UnsupportedS3AccesspointConfigurationError( 

2380 msg=( 

2381 'Invalid configuration, Multi-Region Access Point ' 

2382 'ARNs are disabled.' 

2383 ) 

2384 ) 

2385 elif self._s3_config.get('use_dualstack_endpoint'): 

2386 raise UnsupportedS3AccesspointConfigurationError( 

2387 msg=( 

2388 'Client does not support s3 dualstack configuration ' 

2389 'when a Multi-Region Access Point ARN is specified.' 

2390 ) 

2391 ) 

2392 

2393 def _resolve_region_for_accesspoint_endpoint(self, request): 

2394 if is_global_accesspoint(request.context): 

2395 # Requests going to MRAP endpoints MUST be set to any (*) region. 

2396 self._override_signing_region(request, '*') 

2397 elif self._s3_config.get('use_arn_region', True): 

2398 accesspoint_region = request.context['s3_accesspoint']['region'] 

2399 # If we are using the region from the access point, 

2400 # we will also want to make sure that we set it as the 

2401 # signing region as well 

2402 self._override_signing_region(request, accesspoint_region) 

2403 return accesspoint_region 

2404 return self._region 

2405 

2406 def set_signer(self, context, **kwargs): 

2407 if is_global_accesspoint(context): 

2408 if HAS_CRT: 

2409 return 's3v4a' 

2410 else: 

2411 raise MissingDependencyException( 

2412 msg="Using S3 with an MRAP arn requires an additional " 

2413 "dependency. You will need to pip install " 

2414 "botocore[crt] before proceeding." 

2415 ) 

2416 

2417 def _resolve_signing_name_for_accesspoint_endpoint(self, request): 

2418 accesspoint_service = request.context['s3_accesspoint']['service'] 

2419 self._override_signing_name(request.context, accesspoint_service) 

2420 

2421 def _switch_to_accesspoint_endpoint(self, request, region_name): 

2422 original_components = urlsplit(request.url) 

2423 accesspoint_endpoint = urlunsplit( 

2424 ( 

2425 original_components.scheme, 

2426 self._get_netloc(request.context, region_name), 

2427 self._get_accesspoint_path( 

2428 original_components.path, request.context 

2429 ), 

2430 original_components.query, 

2431 '', 

2432 ) 

2433 ) 

2434 logger.debug( 

2435 f'Updating URI from {request.url} to {accesspoint_endpoint}' 

2436 ) 

2437 request.url = accesspoint_endpoint 

2438 

2439 def _get_netloc(self, request_context, region_name): 

2440 if is_global_accesspoint(request_context): 

2441 return self._get_mrap_netloc(request_context) 

2442 else: 

2443 return self._get_accesspoint_netloc(request_context, region_name) 

2444 

2445 def _get_mrap_netloc(self, request_context): 

2446 s3_accesspoint = request_context['s3_accesspoint'] 

2447 region_name = 's3-global' 

2448 mrap_netloc_components = [s3_accesspoint['name']] 

2449 if self._endpoint_url: 

2450 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2451 mrap_netloc_components.append(endpoint_url_netloc) 

2452 else: 

2453 partition = s3_accesspoint['partition'] 

2454 mrap_netloc_components.extend( 

2455 [ 

2456 'accesspoint', 

2457 region_name, 

2458 self._get_partition_dns_suffix(partition), 

2459 ] 

2460 ) 

2461 return '.'.join(mrap_netloc_components) 

2462 

2463 def _get_accesspoint_netloc(self, request_context, region_name): 

2464 s3_accesspoint = request_context['s3_accesspoint'] 

2465 accesspoint_netloc_components = [ 

2466 '{}-{}'.format(s3_accesspoint['name'], s3_accesspoint['account']), 

2467 ] 

2468 outpost_name = s3_accesspoint.get('outpost_name') 

2469 if self._endpoint_url: 

2470 if outpost_name: 

2471 accesspoint_netloc_components.append(outpost_name) 

2472 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2473 accesspoint_netloc_components.append(endpoint_url_netloc) 

2474 else: 

2475 if outpost_name: 

2476 outpost_host = [outpost_name, 's3-outposts'] 

2477 accesspoint_netloc_components.extend(outpost_host) 

2478 elif s3_accesspoint['service'] == 's3-object-lambda': 

2479 component = self._inject_fips_if_needed( 

2480 's3-object-lambda', request_context 

2481 ) 

2482 accesspoint_netloc_components.append(component) 

2483 else: 

2484 component = self._inject_fips_if_needed( 

2485 's3-accesspoint', request_context 

2486 ) 

2487 accesspoint_netloc_components.append(component) 

2488 if self._s3_config.get('use_dualstack_endpoint'): 

2489 accesspoint_netloc_components.append('dualstack') 

2490 accesspoint_netloc_components.extend( 

2491 [region_name, self._get_dns_suffix(region_name)] 

2492 ) 

2493 return '.'.join(accesspoint_netloc_components) 

2494 

2495 def _inject_fips_if_needed(self, component, request_context): 

2496 if self._use_fips_endpoint: 

2497 return f'{component}-fips' 

2498 return component 

2499 

2500 def _get_accesspoint_path(self, original_path, request_context): 

2501 # The Bucket parameter was substituted with the access-point name as 

2502 # some value was required in serializing the bucket name. Now that 

2503 # we are making the request directly to the access point, we will 

2504 # want to remove that access-point name from the path. 

2505 name = request_context['s3_accesspoint']['name'] 

2506 # All S3 operations require at least a / in their path. 

2507 return original_path.replace('/' + name, '', 1) or '/' 

2508 

2509 def _get_partition_dns_suffix(self, partition_name): 

2510 dns_suffix = self._endpoint_resolver.get_partition_dns_suffix( 

2511 partition_name 

2512 ) 

2513 if dns_suffix is None: 

2514 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2515 return dns_suffix 

2516 

2517 def _get_dns_suffix(self, region_name): 

2518 resolved = self._endpoint_resolver.construct_endpoint( 

2519 's3', region_name 

2520 ) 

2521 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2522 if resolved and 'dnsSuffix' in resolved: 

2523 dns_suffix = resolved['dnsSuffix'] 

2524 return dns_suffix 

2525 

2526 def _override_signing_region(self, request, region_name): 

2527 signing_context = request.context.get('signing', {}) 

2528 # S3SigV4Auth will use the context['signing']['region'] value to 

2529 # sign with if present. This is used by the Bucket redirector 

2530 # as well but we should be fine because the redirector is never 

2531 # used in combination with the accesspoint setting logic. 

2532 signing_context['region'] = region_name 

2533 request.context['signing'] = signing_context 

2534 

2535 def _override_signing_name(self, context, signing_name): 

2536 signing_context = context.get('signing', {}) 

2537 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2538 # sign with if present. This is used by the Bucket redirector 

2539 # as well but we should be fine because the redirector is never 

2540 # used in combination with the accesspoint setting logic. 

2541 signing_context['signing_name'] = signing_name 

2542 context['signing'] = signing_context 

2543 

2544 @CachedProperty 

2545 def _use_accelerate_endpoint(self): 

2546 # Enable accelerate if the configuration is set to true or the 

2547 # endpoint being used matches one of the accelerate endpoints. 

2548 

2549 # Accelerate has been explicitly configured. 

2550 if self._s3_config.get('use_accelerate_endpoint'): 

2551 return True 

2552 

2553 # Accelerate mode is turned on automatically if an endpoint url is 

2554 # provided that matches the accelerate scheme. 

2555 if self._endpoint_url is None: 

2556 return False 

2557 

2558 # Accelerate is only valid for Amazon endpoints. 

2559 netloc = urlsplit(self._endpoint_url).netloc 

2560 if not netloc.endswith('amazonaws.com'): 

2561 return False 

2562 

2563 # The first part of the url should always be s3-accelerate. 

2564 parts = netloc.split('.') 

2565 if parts[0] != 's3-accelerate': 

2566 return False 

2567 

2568 # Url parts between 's3-accelerate' and 'amazonaws.com' which 

2569 # represent different url features. 

2570 feature_parts = parts[1:-2] 

2571 

2572 # There should be no duplicate url parts. 

2573 if len(feature_parts) != len(set(feature_parts)): 

2574 return False 

2575 

2576 # Remaining parts must all be in the whitelist. 

2577 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

2578 

2579 @CachedProperty 

2580 def _addressing_style(self): 

2581 # Use virtual host style addressing if accelerate is enabled or if 

2582 # the given endpoint url is an accelerate endpoint. 

2583 if self._use_accelerate_endpoint: 

2584 return 'virtual' 

2585 

2586 # If a particular addressing style is configured, use it. 

2587 configured_addressing_style = self._s3_config.get('addressing_style') 

2588 if configured_addressing_style: 

2589 return configured_addressing_style 

2590 

2591 @CachedProperty 

2592 def _s3_addressing_handler(self): 

2593 # If virtual host style was configured, use it regardless of whether 

2594 # or not the bucket looks dns compatible. 

2595 if self._addressing_style == 'virtual': 

2596 logger.debug("Using S3 virtual host style addressing.") 

2597 return switch_to_virtual_host_style 

2598 

2599 # If path style is configured, no additional steps are needed. If 

2600 # endpoint_url was specified, don't default to virtual. We could 

2601 # potentially default provided endpoint urls to virtual hosted 

2602 # style, but for now it is avoided. 

2603 if self._addressing_style == 'path' or self._endpoint_url is not None: 

2604 logger.debug("Using S3 path style addressing.") 

2605 return None 

2606 

2607 logger.debug( 

2608 "Defaulting to S3 virtual host style addressing with " 

2609 "path style addressing fallback." 

2610 ) 

2611 

2612 # By default, try to use virtual style with path fallback. 

2613 return fix_s3_host 

2614 
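# Editor's note: a brief sketch (illustration only, not part of botocore) of
# the accelerate-endpoint detection in S3EndpointSetter. The resolver is not
# consulted for this property, so None is passed purely for illustration.
def _example_accelerate_detection():  # hypothetical helper, for illustration only
    setter = S3EndpointSetter(
        endpoint_resolver=None,
        endpoint_url='https://s3-accelerate.dualstack.amazonaws.com',
    )
    # The netloc starts with 's3-accelerate', ends with 'amazonaws.com', and
    # the middle labels ('dualstack') are all whitelisted, so accelerate is on.
    assert setter._use_accelerate_endpoint is True

    setter = S3EndpointSetter(
        endpoint_resolver=None,
        endpoint_url='https://s3.us-east-1.amazonaws.com',
    )
    assert setter._use_accelerate_endpoint is False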

2615 

2616class S3ControlEndpointSetter: 

2617 _DEFAULT_PARTITION = 'aws' 

2618 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

2619 _HOST_LABEL_REGEX = re.compile(r'^[a-zA-Z0-9\-]{1,63}$') 

2620 

2621 def __init__( 

2622 self, 

2623 endpoint_resolver, 

2624 region=None, 

2625 s3_config=None, 

2626 endpoint_url=None, 

2627 partition=None, 

2628 use_fips_endpoint=False, 

2629 ): 

2630 self._endpoint_resolver = endpoint_resolver 

2631 self._region = region 

2632 self._s3_config = s3_config 

2633 self._use_fips_endpoint = use_fips_endpoint 

2634 if s3_config is None: 

2635 self._s3_config = {} 

2636 self._endpoint_url = endpoint_url 

2637 self._partition = partition 

2638 if partition is None: 

2639 self._partition = self._DEFAULT_PARTITION 

2640 

2641 def register(self, event_emitter): 

2642 event_emitter.register('before-sign.s3-control', self.set_endpoint) 

2643 

2644 def set_endpoint(self, request, **kwargs): 

2645 if self._use_endpoint_from_arn_details(request): 

2646 self._validate_endpoint_from_arn_details_supported(request) 

2647 region_name = self._resolve_region_from_arn_details(request) 

2648 self._resolve_signing_name_from_arn_details(request) 

2649 self._resolve_endpoint_from_arn_details(request, region_name) 

2650 self._add_headers_from_arn_details(request) 

2651 elif self._use_endpoint_from_outpost_id(request): 

2652 self._validate_outpost_redirection_valid(request) 

2653 self._override_signing_name(request, 's3-outposts') 

2654 new_netloc = self._construct_outpost_endpoint(self._region) 

2655 self._update_request_netloc(request, new_netloc) 

2656 

2657 def _use_endpoint_from_arn_details(self, request): 

2658 return 'arn_details' in request.context 

2659 

2660 def _use_endpoint_from_outpost_id(self, request): 

2661 return 'outpost_id' in request.context 

2662 

2663 def _validate_endpoint_from_arn_details_supported(self, request): 

2664 if 'fips' in request.context['arn_details']['region']: 

2665 raise UnsupportedS3ControlArnError( 

2666 arn=request.context['arn_details']['original'], 

2667 msg='Invalid ARN, FIPS region not allowed in ARN.', 

2668 ) 

2669 if not self._s3_config.get('use_arn_region', False): 

2670 arn_region = request.context['arn_details']['region'] 

2671 if arn_region != self._region: 

2672 error_msg = ( 

2673 'The use_arn_region configuration is disabled but ' 

2674 f'received arn for "{arn_region}" when the client is configured ' 

2675 f'to use "{self._region}"' 

2676 ) 

2677 raise UnsupportedS3ControlConfigurationError(msg=error_msg) 

2678 request_partition = request.context['arn_details']['partition'] 

2679 if request_partition != self._partition: 

2680 raise UnsupportedS3ControlConfigurationError( 

2681 msg=( 

2682 f'Client is configured for "{self._partition}" partition, but arn ' 

2683 f'provided is for "{request_partition}" partition. The client and ' 

2684 'arn partition must be the same.' 

2685 ) 

2686 ) 

2687 if self._s3_config.get('use_accelerate_endpoint'): 

2688 raise UnsupportedS3ControlConfigurationError( 

2689 msg='S3 control client does not support accelerate endpoints', 

2690 ) 

2691 if 'outpost_name' in request.context['arn_details']: 

2692 self._validate_outpost_redirection_valid(request) 

2693 

2694 def _validate_outpost_redirection_valid(self, request): 

2695 if self._s3_config.get('use_dualstack_endpoint'): 

2696 raise UnsupportedS3ControlConfigurationError( 

2697 msg=( 

2698 'Client does not support s3 dualstack configuration ' 

2699 'when an outpost is specified.' 

2700 ) 

2701 ) 

2702 

2703 def _resolve_region_from_arn_details(self, request): 

2704 if self._s3_config.get('use_arn_region', False): 

2705 arn_region = request.context['arn_details']['region'] 

2706 # If we are using the region from the expanded arn, we will also 

2707 # want to make sure that we set it as the signing region as well 

2708 self._override_signing_region(request, arn_region) 

2709 return arn_region 

2710 return self._region 

2711 

2712 def _resolve_signing_name_from_arn_details(self, request): 

2713 arn_service = request.context['arn_details']['service'] 

2714 self._override_signing_name(request, arn_service) 

2715 return arn_service 

2716 

2717 def _resolve_endpoint_from_arn_details(self, request, region_name): 

2718 new_netloc = self._resolve_netloc_from_arn_details( 

2719 request, region_name 

2720 ) 

2721 self._update_request_netloc(request, new_netloc) 

2722 

2723 def _update_request_netloc(self, request, new_netloc): 

2724 original_components = urlsplit(request.url) 

2725 arn_details_endpoint = urlunsplit( 

2726 ( 

2727 original_components.scheme, 

2728 new_netloc, 

2729 original_components.path, 

2730 original_components.query, 

2731 '', 

2732 ) 

2733 ) 

2734 logger.debug( 

2735 f'Updating URI from {request.url} to {arn_details_endpoint}' 

2736 ) 

2737 request.url = arn_details_endpoint 

2738 

2739 def _resolve_netloc_from_arn_details(self, request, region_name): 

2740 arn_details = request.context['arn_details'] 

2741 if 'outpost_name' in arn_details: 

2742 return self._construct_outpost_endpoint(region_name) 

2743 account = arn_details['account'] 

2744 return self._construct_s3_control_endpoint(region_name, account) 

2745 

2746 def _is_valid_host_label(self, label): 

2747 return self._HOST_LABEL_REGEX.match(label) 

2748 

2749 def _validate_host_labels(self, *labels): 

2750 for label in labels: 

2751 if not self._is_valid_host_label(label): 

2752 raise InvalidHostLabelError(label=label) 

2753 

2754 def _construct_s3_control_endpoint(self, region_name, account): 

2755 self._validate_host_labels(region_name, account) 

2756 if self._endpoint_url: 

2757 endpoint_url_netloc = urlsplit(self._endpoint_url).netloc 

2758 netloc = [account, endpoint_url_netloc] 

2759 else: 

2760 netloc = [ 

2761 account, 

2762 's3-control', 

2763 ] 

2764 self._add_dualstack(netloc) 

2765 dns_suffix = self._get_dns_suffix(region_name) 

2766 netloc.extend([region_name, dns_suffix]) 

2767 return self._construct_netloc(netloc) 

2768 

2769 def _construct_outpost_endpoint(self, region_name): 

2770 self._validate_host_labels(region_name) 

2771 if self._endpoint_url: 

2772 return urlsplit(self._endpoint_url).netloc 

2773 else: 

2774 netloc = [ 

2775 's3-outposts', 

2776 region_name, 

2777 self._get_dns_suffix(region_name), 

2778 ] 

2779 self._add_fips(netloc) 

2780 return self._construct_netloc(netloc) 

2781 

2782 def _construct_netloc(self, netloc): 

2783 return '.'.join(netloc) 

2784 

2785 def _add_fips(self, netloc): 

2786 if self._use_fips_endpoint: 

2787 netloc[0] = netloc[0] + '-fips' 

2788 

2789 def _add_dualstack(self, netloc): 

2790 if self._s3_config.get('use_dualstack_endpoint'): 

2791 netloc.append('dualstack') 

2792 

2793 def _get_dns_suffix(self, region_name): 

2794 resolved = self._endpoint_resolver.construct_endpoint( 

2795 's3', region_name 

2796 ) 

2797 dns_suffix = self._DEFAULT_DNS_SUFFIX 

2798 if resolved and 'dnsSuffix' in resolved: 

2799 dns_suffix = resolved['dnsSuffix'] 

2800 return dns_suffix 

2801 

2802 def _override_signing_region(self, request, region_name): 

2803 signing_context = request.context.get('signing', {}) 

2804 # S3SigV4Auth will use the context['signing']['region'] value to 

2805 # sign with if present. This is used by the Bucket redirector 

2806 # as well but we should be fine because the redirector is never 

2807 # used in combination with the accesspoint setting logic. 

2808 signing_context['region'] = region_name 

2809 request.context['signing'] = signing_context 

2810 

2811 def _override_signing_name(self, request, signing_name): 

2812 signing_context = request.context.get('signing', {}) 

2813 # S3SigV4Auth will use the context['signing']['signing_name'] value to 

2814 # sign with if present. This is used by the Bucket redirector 

2815 # as well but we should be fine because the redirector is never 

2816 # used in combination with the accesspoint setting logic. 

2817 signing_context['signing_name'] = signing_name 

2818 request.context['signing'] = signing_context 

2819 

2820 def _add_headers_from_arn_details(self, request): 

2821 arn_details = request.context['arn_details'] 

2822 outpost_name = arn_details.get('outpost_name') 

2823 if outpost_name: 

2824 self._add_outpost_id_header(request, outpost_name) 

2825 

2826 def _add_outpost_id_header(self, request, outpost_name): 

2827 request.headers['x-amz-outpost-id'] = outpost_name 

2828 
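# Editor's note: a compact sketch (illustration only, not part of botocore)
# of the s3-control netloc construction. The stub resolver below is
# hypothetical and simply reports no endpoint metadata, so the default DNS
# suffix ('amazonaws.com') is used.
def _example_s3_control_netloc():  # hypothetical helper, for illustration only
    class _StubResolver:
        def construct_endpoint(self, service_name, region_name):
            return None  # fall back to the default DNS suffix

    setter = S3ControlEndpointSetter(_StubResolver(), region='us-west-2')
    netloc = setter._construct_s3_control_endpoint('us-west-2', '123456789012')
    # Account-id label first, then the s3-control service and region labels.
    assert netloc == '123456789012.s3-control.us-west-2.amazonaws.com'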

2829 

2830class S3ControlArnParamHandler: 

2831 """This handler has been replaced by S3ControlArnParamHandlerv2. The 

2832 original version remains in place for any third-party importers. 

2833 """ 

2834 

2835 _RESOURCE_SPLIT_REGEX = re.compile(r'[/:]') 

2836 

2837 def __init__(self, arn_parser=None): 

2838 self._arn_parser = arn_parser 

2839 if arn_parser is None: 

2840 self._arn_parser = ArnParser() 

2841 warnings.warn( 

2842 'The S3ControlArnParamHandler class has been deprecated for a new ' 

2843 'internal replacement. A future version of botocore may remove ' 

2844 'this class.', 

2845 category=FutureWarning, 

2846 ) 

2847 

2848 def register(self, event_emitter): 

2849 event_emitter.register( 

2850 'before-parameter-build.s3-control', 

2851 self.handle_arn, 

2852 ) 

2853 

2854 def handle_arn(self, params, model, context, **kwargs): 

2855 if model.name in ('CreateBucket', 'ListRegionalBuckets'): 

2856 # CreateBucket and ListRegionalBuckets are special cases that do 

2857 # not obey ARN based redirection but will redirect based off of the 

2858 # presence of the OutpostId parameter 

2859 self._handle_outpost_id_param(params, model, context) 

2860 else: 

2861 self._handle_name_param(params, model, context) 

2862 self._handle_bucket_param(params, model, context) 

2863 

2864 def _get_arn_details_from_param(self, params, param_name): 

2865 if param_name not in params: 

2866 return None 

2867 try: 

2868 arn = params[param_name] 

2869 arn_details = self._arn_parser.parse_arn(arn) 

2870 arn_details['original'] = arn 

2871 arn_details['resources'] = self._split_resource(arn_details) 

2872 return arn_details 

2873 except InvalidArnException: 

2874 return None 

2875 

2876 def _split_resource(self, arn_details): 

2877 return self._RESOURCE_SPLIT_REGEX.split(arn_details['resource']) 

2878 

2879 def _override_account_id_param(self, params, arn_details): 

2880 account_id = arn_details['account'] 

2881 if 'AccountId' in params and params['AccountId'] != account_id: 

2882 error_msg = ( 

2883 'Account ID in arn does not match the AccountId parameter ' 

2884 'provided: "{}"' 

2885 ).format(params['AccountId']) 

2886 raise UnsupportedS3ControlArnError( 

2887 arn=arn_details['original'], 

2888 msg=error_msg, 

2889 ) 

2890 params['AccountId'] = account_id 

2891 

2892 def _handle_outpost_id_param(self, params, model, context): 

2893 if 'OutpostId' not in params: 

2894 return 

2895 context['outpost_id'] = params['OutpostId'] 

2896 

2897 def _handle_name_param(self, params, model, context): 

2898 # CreateAccessPoint is a special case that does not expand Name 

2899 if model.name == 'CreateAccessPoint': 

2900 return 

2901 arn_details = self._get_arn_details_from_param(params, 'Name') 

2902 if arn_details is None: 

2903 return 

2904 if self._is_outpost_accesspoint(arn_details): 

2905 self._store_outpost_accesspoint(params, context, arn_details) 

2906 else: 

2907 error_msg = 'The Name parameter does not support the provided ARN' 

2908 raise UnsupportedS3ControlArnError( 

2909 arn=arn_details['original'], 

2910 msg=error_msg, 

2911 ) 

2912 

2913 def _is_outpost_accesspoint(self, arn_details): 

2914 if arn_details['service'] != 's3-outposts': 

2915 return False 

2916 resources = arn_details['resources'] 

2917 if len(resources) != 4: 

2918 return False 

2919 # Resource must be of the form outpost/op-123/accesspoint/name 

2920 return resources[0] == 'outpost' and resources[2] == 'accesspoint' 

2921 

2922 def _store_outpost_accesspoint(self, params, context, arn_details): 

2923 self._override_account_id_param(params, arn_details) 

2924 accesspoint_name = arn_details['resources'][3] 

2925 params['Name'] = accesspoint_name 

2926 arn_details['accesspoint_name'] = accesspoint_name 

2927 arn_details['outpost_name'] = arn_details['resources'][1] 

2928 context['arn_details'] = arn_details 

2929 

2930 def _handle_bucket_param(self, params, model, context): 

2931 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

2932 if arn_details is None: 

2933 return 

2934 if self._is_outpost_bucket(arn_details): 

2935 self._store_outpost_bucket(params, context, arn_details) 

2936 else: 

2937 error_msg = ( 

2938 'The Bucket parameter does not support the provided ARN' 

2939 ) 

2940 raise UnsupportedS3ControlArnError( 

2941 arn=arn_details['original'], 

2942 msg=error_msg, 

2943 ) 

2944 

2945 def _is_outpost_bucket(self, arn_details): 

2946 if arn_details['service'] != 's3-outposts': 

2947 return False 

2948 resources = arn_details['resources'] 

2949 if len(resources) != 4: 

2950 return False 

2951 # Resource must be of the form outpost/op-123/bucket/name 

2952 return resources[0] == 'outpost' and resources[2] == 'bucket' 

2953 

2954 def _store_outpost_bucket(self, params, context, arn_details): 

2955 self._override_account_id_param(params, arn_details) 

2956 bucket_name = arn_details['resources'][3] 

2957 params['Bucket'] = bucket_name 

2958 arn_details['bucket_name'] = bucket_name 

2959 arn_details['outpost_name'] = arn_details['resources'][1] 

2960 context['arn_details'] = arn_details 

2961 

2962 

2963class S3ControlArnParamHandlerv2(S3ControlArnParamHandler): 

2964 """Updated version of S3ControlArnParamHandler for use when 

2965 EndpointRulesetResolver is in use for endpoint resolution. 

2966 

2967 This class is considered private and subject to abrupt breaking changes or 

2968 removal without prior announcement. Please do not use it directly. 

2969 """ 

2970 

2971 def __init__(self, arn_parser=None): 

2972 self._arn_parser = arn_parser 

2973 if arn_parser is None: 

2974 self._arn_parser = ArnParser() 

2975 

2976 def register(self, event_emitter): 

2977 event_emitter.register( 

2978 'before-endpoint-resolution.s3-control', 

2979 self.handle_arn, 

2980 ) 

2981 

2982 def _handle_name_param(self, params, model, context): 

2983 # CreateAccessPoint is a special case that does not expand Name 

2984 if model.name == 'CreateAccessPoint': 

2985 return 

2986 arn_details = self._get_arn_details_from_param(params, 'Name') 

2987 if arn_details is None: 

2988 return 

2989 self._raise_for_fips_pseudo_region(arn_details) 

2990 self._raise_for_accelerate_endpoint(context) 

2991 if self._is_outpost_accesspoint(arn_details): 

2992 self._store_outpost_accesspoint(params, context, arn_details) 

2993 else: 

2994 error_msg = 'The Name parameter does not support the provided ARN' 

2995 raise UnsupportedS3ControlArnError( 

2996 arn=arn_details['original'], 

2997 msg=error_msg, 

2998 ) 

2999 

3000 def _store_outpost_accesspoint(self, params, context, arn_details): 

3001 self._override_account_id_param(params, arn_details) 

3002 

3003 def _handle_bucket_param(self, params, model, context): 

3004 arn_details = self._get_arn_details_from_param(params, 'Bucket') 

3005 if arn_details is None: 

3006 return 

3007 self._raise_for_fips_pseudo_region(arn_details) 

3008 self._raise_for_accelerate_endpoint(context) 

3009 if self._is_outpost_bucket(arn_details): 

3010 self._store_outpost_bucket(params, context, arn_details) 

3011 else: 

3012 error_msg = ( 

3013 'The Bucket parameter does not support the provided ARN' 

3014 ) 

3015 raise UnsupportedS3ControlArnError( 

3016 arn=arn_details['original'], 

3017 msg=error_msg, 

3018 ) 

3019 

3020 def _store_outpost_bucket(self, params, context, arn_details): 

3021 self._override_account_id_param(params, arn_details) 

3022 

3023 def _raise_for_fips_pseudo_region(self, arn_details): 

3024 # FIPS pseudo region names cannot be used in ARNs 

3025 arn_region = arn_details['region'] 

3026 if arn_region.startswith('fips-') or arn_region.endswith('-fips'): 

3027 raise UnsupportedS3ControlArnError( 

3028 arn=arn_details['original'], 

3029 msg='Invalid ARN, FIPS region not allowed in ARN.', 

3030 ) 

3031 

3032 def _raise_for_accelerate_endpoint(self, context): 

3033 s3_config = context['client_config'].s3 or {} 

3034 if s3_config.get('use_accelerate_endpoint'): 

3035 raise UnsupportedS3ControlConfigurationError( 

3036 msg='S3 control client does not support accelerate endpoints', 

3037 ) 

3038 

3039 

3040class ContainerMetadataFetcher: 

3041 TIMEOUT_SECONDS = 2 

3042 RETRY_ATTEMPTS = 3 

3043 SLEEP_TIME = 1 

3044 IP_ADDRESS = '169.254.170.2' 

3045 _ALLOWED_HOSTS = [ 

3046 IP_ADDRESS, 

3047 '169.254.170.23', 

3048 'fd00:ec2::23', 

3049 'localhost', 

3050 ] 

3051 

3052 def __init__(self, session=None, sleep=time.sleep): 

3053 if session is None: 

3054 session = botocore.httpsession.URLLib3Session( 

3055 timeout=self.TIMEOUT_SECONDS 

3056 ) 

3057 self._session = session 

3058 self._sleep = sleep 

3059 

3060 def retrieve_full_uri(self, full_url, headers=None): 

3061 """Retrieve JSON metadata from container metadata. 

3062 

3063 :type full_url: str 

3064 :param full_url: The full URL of the metadata service. 

3065 This should include the scheme as well, e.g. 

3066 "http://localhost:123/foo" 

3067 

3068 """ 

3069 self._validate_allowed_url(full_url) 

3070 return self._retrieve_credentials(full_url, headers) 

3071 

3072 def _validate_allowed_url(self, full_url): 

3073 parsed = botocore.compat.urlparse(full_url) 

3074 if self._is_loopback_address(parsed.hostname): 

3075 return 

3076 is_whitelisted_host = self._check_if_whitelisted_host(parsed.hostname) 

3077 if not is_whitelisted_host: 

3078 raise ValueError( 

3079 f"Unsupported host '{parsed.hostname}'. Can only retrieve metadata " 

3080 f"from a loopback address or one of these hosts: {', '.join(self._ALLOWED_HOSTS)}" 

3081 ) 

3082 

3083 def _is_loopback_address(self, hostname): 

3084 try: 

3085 ip = ip_address(hostname) 

3086 return ip.is_loopback 

3087 except ValueError: 

3088 return False 

3089 

3090 def _check_if_whitelisted_host(self, host): 

3091 if host in self._ALLOWED_HOSTS: 

3092 return True 

3093 return False 

3094 

3095 def retrieve_uri(self, relative_uri): 

3096 """Retrieve JSON metadata from container metadata. 

3097 

3098 :type relative_uri: str 

3099 :param relative_uri: A relative URI, e.g. "/foo/bar?id=123" 

3100 

3101 :return: The parsed JSON response. 

3102 

3103 """ 

3104 full_url = self.full_url(relative_uri) 

3105 return self._retrieve_credentials(full_url) 

3106 

3107 def _retrieve_credentials(self, full_url, extra_headers=None): 

3108 headers = {'Accept': 'application/json'} 

3109 if extra_headers is not None: 

3110 headers.update(extra_headers) 

3111 attempts = 0 

3112 while True: 

3113 try: 

3114 return self._get_response( 

3115 full_url, headers, self.TIMEOUT_SECONDS 

3116 ) 

3117 except MetadataRetrievalError as e: 

3118 logger.debug( 

3119 "Received error when attempting to retrieve " 

3120 "container metadata: %s", 

3121 e, 

3122 exc_info=True, 

3123 ) 

3124 self._sleep(self.SLEEP_TIME) 

3125 attempts += 1 

3126 if attempts >= self.RETRY_ATTEMPTS: 

3127 raise 

3128 

3129 def _get_response(self, full_url, headers, timeout): 

3130 try: 

3131 AWSRequest = botocore.awsrequest.AWSRequest 

3132 request = AWSRequest(method='GET', url=full_url, headers=headers) 

3133 response = self._session.send(request.prepare()) 

3134 response_text = response.content.decode('utf-8') 

3135 if response.status_code != 200: 

3136 raise MetadataRetrievalError( 

3137 error_msg=( 

3138 f"Received non 200 response {response.status_code} " 

3139 f"from container metadata: {response_text}" 

3140 ) 

3141 ) 

3142 try: 

3143 return json.loads(response_text) 

3144 except ValueError: 

3145 error_msg = "Unable to parse JSON returned from container metadata services" 

3146 logger.debug('%s:%s', error_msg, response_text) 

3147 raise MetadataRetrievalError(error_msg=error_msg) 

3148 except RETRYABLE_HTTP_ERRORS as e: 

3149 error_msg = ( 

3150 "Received error when attempting to retrieve " 

3151 f"container metadata: {e}" 

3152 ) 

3153 raise MetadataRetrievalError(error_msg=error_msg) 

3154 

3155 def full_url(self, relative_uri): 

3156 return f'http://{self.IP_ADDRESS}{relative_uri}' 

3157 
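# Editor's note: a minimal usage sketch, not part of botocore. The relative
# URI is illustrative (in ECS it is normally supplied through the
# AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable), and no real
# request is made here because host validation fails before any network call.
from botocore.utils import ContainerMetadataFetcher

fetcher = ContainerMetadataFetcher()
print(fetcher.full_url('/v2/credentials/guid'))  # http://169.254.170.2/v2/credentials/guid
try:
    # Hosts outside the loopback range or _ALLOWED_HOSTS are rejected:
    fetcher.retrieve_full_uri('http://example.com/creds')
except ValueError as exc:
    print(exc)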

3158 

3159def get_environ_proxies(url): 

3160 if should_bypass_proxies(url): 

3161 return {} 

3162 else: 

3163 return getproxies() 

3164 

3165 

3166def should_bypass_proxies(url): 

3167 """ 

3168 Returns whether we should bypass proxies or not. 

3169 """ 

3170 # NOTE: the requests library allowed ip/cidr entries in the no_proxy env var; 

3171 # we don't currently support that, as urllib only checks the DNS suffix. 

3172 # If the system proxy settings indicate that this URL should be bypassed, 

3173 # don't proxy. 

3174 # The proxy_bypass function is incredibly buggy on OS X in early versions 

3175 # of Python 2.6, so allow this call to fail. Only catch the specific 

3176 # exceptions we've seen, though: this call failing in other ways can reveal 

3177 # legitimate problems. 

3178 try: 

3179 if proxy_bypass(urlparse(url).netloc): 

3180 return True 

3181 except (TypeError, socket.gaierror): 

3182 pass 

3183 

3184 return False 

3185 
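# Editor's note: a minimal usage sketch, not part of botocore. The result is
# environment-dependent: it reflects HTTP_PROXY/HTTPS_PROXY/NO_PROXY and
# platform proxy settings at runtime.
from botocore.utils import get_environ_proxies, should_bypass_proxies

url = 'https://s3.us-west-2.amazonaws.com'
if should_bypass_proxies(url):
    print('bypassing proxies for', url)
else:
    print('proxy mapping:', get_environ_proxies(url))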

3186 

3187def determine_content_length(body): 

3188 # No body, content length of 0 

3189 if not body: 

3190 return 0 

3191 

3192 # Try asking the body for its length 

3193 try: 

3194 return len(body) 

3195 except (AttributeError, TypeError): 

3196 pass 

3197 

3198 # Try getting the length from a seekable stream 

3199 if hasattr(body, 'seek') and hasattr(body, 'tell'): 

3200 try: 

3201 orig_pos = body.tell() 

3202 body.seek(0, 2) 

3203 end_file_pos = body.tell() 

3204 body.seek(orig_pos) 

3205 return end_file_pos - orig_pos 

3206 except io.UnsupportedOperation: 

3207 # If the body is, for example, an io.BufferedIOBase object, its 

3208 # "seek" method raises an "UnsupportedOperation" exception; in 

3209 # that case we want to fall back to "chunked" 

3210 # encoding. 

3211 pass 

3212 # Failed to determine the length 

3213 return None 

3214 
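# Editor's note: a minimal usage sketch, not part of botocore, covering the
# three branches above: len()-sized bodies, seekable streams, and bodies whose
# length cannot be determined (None means callers fall back to chunked
# transfer encoding).
import io
from botocore.utils import determine_content_length

print(determine_content_length(b'hello'))              # 5 via len()
print(determine_content_length(io.BytesIO(b'hello')))  # 5 via seek()/tell()

class _Unsized:  # hypothetical body with no length and no seek support
    def read(self, size=-1):
        return b''

print(determine_content_length(_Unsized()))            # None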

3215 

3216def get_encoding_from_headers(headers, default='ISO-8859-1'): 

3217 """Returns encodings from given HTTP Header Dict. 

3218 

3219 :param headers: dictionary to extract encoding from. 

3220 :param default: default encoding if the content-type is text 

3221 """ 

3222 

3223 content_type = headers.get('content-type') 

3224 

3225 if not content_type: 

3226 return None 

3227 

3228 message = email.message.Message() 

3229 message['content-type'] = content_type 

3230 charset = message.get_param("charset") 

3231 

3232 if charset is not None: 

3233 return charset 

3234 

3235 if 'text' in content_type: 

3236 return default 

3237 
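# Editor's note: a minimal usage sketch, not part of botocore; the header
# values are illustrative.
from botocore.utils import get_encoding_from_headers

print(get_encoding_from_headers({'content-type': 'application/json; charset=utf-8'}))  # 'utf-8'
print(get_encoding_from_headers({'content-type': 'text/html'}))  # 'ISO-8859-1' (default)
print(get_encoding_from_headers({}))  # None (no content-type header)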

3238 

3239def calculate_md5(body, **kwargs): 

3240 """This function has been deprecated, but is kept for backwards compatibility.""" 

3241 if isinstance(body, (bytes, bytearray)): 

3242 binary_md5 = _calculate_md5_from_bytes(body) 

3243 else: 

3244 binary_md5 = _calculate_md5_from_file(body) 

3245 return base64.b64encode(binary_md5).decode('ascii') 

3246 

3247 

3248def _calculate_md5_from_bytes(body_bytes): 

3249 """This function has been deprecated, but is kept for backwards compatibility.""" 

3250 md5 = get_md5(body_bytes, usedforsecurity=False) 

3251 return md5.digest() 

3252 

3253 

3254def _calculate_md5_from_file(fileobj): 

3255 """This function has been deprecated, but is kept for backwards compatibility.""" 

3256 start_position = fileobj.tell() 

3257 md5 = get_md5(usedforsecurity=False) 

3258 for chunk in iter(lambda: fileobj.read(1024 * 1024), b''): 

3259 md5.update(chunk) 

3260 fileobj.seek(start_position) 

3261 return md5.digest() 

3262 
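# Editor's note: a minimal usage sketch, not part of botocore, for the
# deprecated MD5 helpers above. It assumes MD5 is available on the platform;
# the output is the base64-encoded digest used for the Content-MD5 header.
import io
from botocore.utils import calculate_md5

print(calculate_md5(b'hello world'))              # XrY7u+Ae7tCTyyK7j1rNww==
print(calculate_md5(io.BytesIO(b'hello world')))  # same value; the stream position is restored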

3263 

3264def _is_s3express_request(params): 

3265 endpoint_properties = params.get('context', {}).get( 

3266 'endpoint_properties', {} 

3267 ) 

3268 return endpoint_properties.get('backend') == 'S3Express' 

3269 

3270 

3271def has_checksum_header(params): 

3272 """ 

3273 Checks if a header starting with "x-amz-checksum-" is provided in a request. 

3274 

3275 This function is considered private and subject to abrupt breaking changes or 

3276 removal without prior announcement. Please do not use it directly. 

3277 """ 

3278 headers = params['headers'] 

3279 

3280 # If a header matching the x-amz-checksum-* pattern is present, we 

3281 # assume a checksum has already been provided by the user. 

3282 for header in headers: 

3283 if CHECKSUM_HEADER_PATTERN.match(header): 

3284 return True 

3285 

3286 return False 

3287 
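# Editor's note: a minimal usage sketch, not part of botocore. This helper and
# the 'params' shape are internal details (see the docstring above), so the
# example is purely illustrative.
from botocore.utils import has_checksum_header

print(has_checksum_header({'headers': {'x-amz-checksum-crc32': 'AAAAAA=='}}))  # True
print(has_checksum_header({'headers': {'Content-Type': 'text/plain'}}))        # False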

3288 

3289def conditionally_calculate_checksum(params, **kwargs): 

3290 """This function has been deprecated, but is kept for backwards compatibility.""" 

3291 if not has_checksum_header(params): 

3292 conditionally_calculate_md5(params, **kwargs) 

3293 conditionally_enable_crc32(params, **kwargs) 

3294 

3295 

3296def conditionally_enable_crc32(params, **kwargs): 

3297 """This function has been deprecated, but is kept for backwards compatibility.""" 

3298 checksum_context = params.get('context', {}).get('checksum', {}) 

3299 checksum_algorithm = checksum_context.get('request_algorithm') 

3300 if ( 

3301 _is_s3express_request(params) 

3302 and params['body'] is not None 

3303 and checksum_algorithm in (None, "conditional-md5") 

3304 ): 

3305 params['context']['checksum'] = { 

3306 'request_algorithm': { 

3307 'algorithm': 'crc32', 

3308 'in': 'header', 

3309 'name': 'x-amz-checksum-crc32', 

3310 } 

3311 } 

3312 

3313 

3314def conditionally_calculate_md5(params, **kwargs): 

3315 """Only add a Content-MD5 if the system supports it. 

3316 

3317 This function has been deprecated, but is kept for backwards compatibility. 

3318 """ 

3319 body = params['body'] 

3320 checksum_context = params.get('context', {}).get('checksum', {}) 

3321 checksum_algorithm = checksum_context.get('request_algorithm') 

3322 if checksum_algorithm and checksum_algorithm != 'conditional-md5': 

3323 # Skip for requests that will have a flexible checksum applied 

3324 return 

3325 

3326 if has_checksum_header(params): 

3327 # Don't add a new header if one is already available. 

3328 return 

3329 

3330 if _is_s3express_request(params): 

3331 # S3Express doesn't support MD5 

3332 return 

3333 

3334 if MD5_AVAILABLE and body is not None: 

3335 md5_digest = calculate_md5(body, **kwargs) 

3336 params['headers']['Content-MD5'] = md5_digest 

3337 
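# Editor's note: a minimal usage sketch, not part of botocore, for the
# deprecated conditional-MD5 path above: no flexible checksum was requested,
# no x-amz-checksum-* header is present, and the request is not S3 Express,
# so a Content-MD5 header is added when MD5 is available.
from botocore.utils import conditionally_calculate_md5

params = {'body': b'hello world', 'headers': {}, 'context': {}}
conditionally_calculate_md5(params)
print(params['headers'].get('Content-MD5'))  # XrY7u+Ae7tCTyyK7j1rNww== (None if MD5 unavailable)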

3338 

3339class FileWebIdentityTokenLoader: 

3340 def __init__(self, web_identity_token_path, _open=open): 

3341 self._web_identity_token_path = web_identity_token_path 

3342 self._open = _open 

3343 

3344 def __call__(self): 

3345 with self._open(self._web_identity_token_path) as token_file: 

3346 return token_file.read() 

3347 
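# Editor's note: a minimal usage sketch, not part of botocore; the token file
# and its contents are illustrative.
import tempfile
from botocore.utils import FileWebIdentityTokenLoader

with tempfile.NamedTemporaryFile('w', suffix='.token', delete=False) as f:
    f.write('example-oidc-token')

load_token = FileWebIdentityTokenLoader(f.name)
print(load_token())  # example-oidc-token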

3348 

3349class SSOTokenLoader: 

3350 def __init__(self, cache=None): 

3351 if cache is None: 

3352 cache = {} 

3353 self._cache = cache 

3354 

3355 def _generate_cache_key(self, start_url, session_name): 

3356 input_str = start_url 

3357 if session_name is not None: 

3358 input_str = session_name 

3359 return hashlib.sha1(input_str.encode('utf-8')).hexdigest() 

3360 

3361 def save_token(self, start_url, token, session_name=None): 

3362 cache_key = self._generate_cache_key(start_url, session_name) 

3363 self._cache[cache_key] = token 

3364 

3365 def __call__(self, start_url, session_name=None): 

3366 cache_key = self._generate_cache_key(start_url, session_name) 

3367 logger.debug(f'Checking for cached token at: {cache_key}') 

3368 if cache_key not in self._cache: 

3369 name = start_url 

3370 if session_name is not None: 

3371 name = session_name 

3372 error_msg = f'Token for {name} does not exist' 

3373 raise SSOTokenLoadError(error_msg=error_msg) 

3374 

3375 token = self._cache[cache_key] 

3376 if 'accessToken' not in token or 'expiresAt' not in token: 

3377 error_msg = f'Token for {start_url} is invalid' 

3378 raise SSOTokenLoadError(error_msg=error_msg) 

3379 return token 

3380 
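# Editor's note: a minimal usage sketch, not part of botocore; the start URL
# and token fields are illustrative. Tokens are cached under a SHA-1 of the
# session name, or of the start URL when no session name is given.
from botocore.utils import SSOTokenLoader

start_url = 'https://example.awsapps.com/start'
loader = SSOTokenLoader()
loader.save_token(
    start_url,
    {'accessToken': 'example-token', 'expiresAt': '2030-01-01T00:00:00Z'},
)
print(loader(start_url)['accessToken'])  # example-token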

3381 

3382class EventbridgeSignerSetter: 

3383 _DEFAULT_PARTITION = 'aws' 

3384 _DEFAULT_DNS_SUFFIX = 'amazonaws.com' 

3385 

3386 def __init__(self, endpoint_resolver, region=None, endpoint_url=None): 

3387 self._endpoint_resolver = endpoint_resolver 

3388 self._region = region 

3389 self._endpoint_url = endpoint_url 

3390 

3391 def register(self, event_emitter): 

3392 event_emitter.register( 

3393 'before-parameter-build.events.PutEvents', 

3394 self.check_for_global_endpoint, 

3395 ) 

3396 event_emitter.register( 

3397 'before-call.events.PutEvents', self.set_endpoint_url 

3398 ) 

3399 

3400 def set_endpoint_url(self, params, context, **kwargs): 

3401 if 'eventbridge_endpoint' in context: 

3402 endpoint = context['eventbridge_endpoint'] 

3403 logger.debug(f"Rewriting URL from {params['url']} to {endpoint}") 

3404 params['url'] = endpoint 

3405 

3406 def check_for_global_endpoint(self, params, context, **kwargs): 

3407 endpoint = params.get('EndpointId') 

3408 if endpoint is None: 

3409 return 

3410 

3411 if len(endpoint) == 0: 

3412 raise InvalidEndpointConfigurationError( 

3413 msg='EndpointId must not be a zero length string' 

3414 ) 

3415 

3416 if not HAS_CRT: 

3417 raise MissingDependencyException( 

3418 msg="Using EndpointId requires an additional " 

3419 "dependency. You will need to pip install " 

3420 "botocore[crt] before proceeding." 

3421 ) 

3422 

3423 config = context.get('client_config') 

3424 endpoint_variant_tags = None 

3425 if config is not None: 

3426 if config.use_fips_endpoint: 

3427 raise InvalidEndpointConfigurationError( 

3428 msg="FIPS is not supported with EventBridge " 

3429 "multi-region endpoints." 

3430 ) 

3431 if config.use_dualstack_endpoint: 

3432 endpoint_variant_tags = ['dualstack'] 

3433 

3434 if self._endpoint_url is None: 

3435 # Validate endpoint is a valid hostname component 

3436 parts = urlparse(f'https://{endpoint}') 

3437 if parts.hostname != endpoint: 

3438 raise InvalidEndpointConfigurationError( 

3439 msg='EndpointId is not a valid hostname component.' 

3440 ) 

3441 resolved_endpoint = self._get_global_endpoint( 

3442 endpoint, endpoint_variant_tags=endpoint_variant_tags 

3443 ) 

3444 else: 

3445 resolved_endpoint = self._endpoint_url 

3446 

3447 context['eventbridge_endpoint'] = resolved_endpoint 

3448 context['auth_type'] = 'v4a' 

3449 

3450 def _get_global_endpoint(self, endpoint, endpoint_variant_tags=None): 

3451 resolver = self._endpoint_resolver 

3452 

3453 partition = resolver.get_partition_for_region(self._region) 

3454 if partition is None: 

3455 partition = self._DEFAULT_PARTITION 

3456 dns_suffix = resolver.get_partition_dns_suffix( 

3457 partition, endpoint_variant_tags=endpoint_variant_tags 

3458 ) 

3459 if dns_suffix is None: 

3460 dns_suffix = self._DEFAULT_DNS_SUFFIX 

3461 

3462 return f"https://{endpoint}.endpoint.events.{dns_suffix}/" 

3463 
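# Editor's note: a minimal sketch, not part of botocore. A stub resolver stands
# in for botocore's real endpoint resolver, and _get_global_endpoint is a
# private helper, so this only illustrates the multi-region endpoint shape
# produced above; the EndpointId value is made up.
from botocore.utils import EventbridgeSignerSetter

class _StubResolver:
    def get_partition_for_region(self, region):
        return 'aws'

    def get_partition_dns_suffix(self, partition, endpoint_variant_tags=None):
        return 'amazonaws.com'

setter = EventbridgeSignerSetter(_StubResolver(), region='us-east-1')
print(setter._get_global_endpoint('abc123.xyz'))
# https://abc123.xyz.endpoint.events.amazonaws.com/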

3464 

3465def is_s3_accelerate_url(url): 

3466 """Does the URL match the S3 Accelerate endpoint scheme? 

3467 

3468 The virtual host naming style, with bucket names in the netloc part of the 

3469 URL, is not allowed by this function. 

3470 """ 

3471 if url is None: 

3472 return False 

3473 

3474 # Accelerate is only valid for Amazon endpoints. 

3475 url_parts = urlsplit(url) 

3476 if not url_parts.netloc.endswith( 

3477 'amazonaws.com' 

3478 ) or url_parts.scheme not in ['https', 'http']: 

3479 return False 

3480 

3481 # The first part of the URL must be s3-accelerate. 

3482 parts = url_parts.netloc.split('.') 

3483 if parts[0] != 's3-accelerate': 

3484 return False 

3485 

3486 # URL parts between 's3-accelerate' and 'amazonaws.com' 

3487 # represent different URL features. 

3488 feature_parts = parts[1:-2] 

3489 

3490 # There should be no duplicate URL parts. 

3491 if len(feature_parts) != len(set(feature_parts)): 

3492 return False 

3493 

3494 # Remaining parts must all be in the whitelist. 

3495 return all(p in S3_ACCELERATE_WHITELIST for p in feature_parts) 

3496 
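# Editor's note: a minimal usage sketch, not part of botocore; the URLs are
# illustrative.
from botocore.utils import is_s3_accelerate_url

print(is_s3_accelerate_url('https://s3-accelerate.amazonaws.com'))            # True
print(is_s3_accelerate_url('https://s3-accelerate.dualstack.amazonaws.com'))  # True
print(is_s3_accelerate_url('https://bucket.s3-accelerate.amazonaws.com'))     # False, bucket in netloc
print(is_s3_accelerate_url('https://s3.us-west-2.amazonaws.com'))             # False, not accelerate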

3497 

3498class JSONFileCache: 

3499 """JSON file cache. 

3500 This provides a dict like interface that stores JSON serializable 

3501 objects. 

3502 The objects are serialized to JSON and stored in a file. These 

3503 values can be retrieved at a later time. 

3504 """ 

3505 

3506 CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'boto', 'cache')) 

3507 

3508 def __init__(self, working_dir=CACHE_DIR, dumps_func=None): 

3509 self._working_dir = working_dir 

3510 if dumps_func is None: 

3511 dumps_func = self._default_dumps 

3512 self._dumps = dumps_func 

3513 

3514 def _default_dumps(self, obj): 

3515 return json.dumps(obj, default=self._serialize_if_needed) 

3516 

3517 def __contains__(self, cache_key): 

3518 actual_key = self._convert_cache_key(cache_key) 

3519 return os.path.isfile(actual_key) 

3520 

3521 def __getitem__(self, cache_key): 

3522 """Retrieve value from a cache key.""" 

3523 actual_key = self._convert_cache_key(cache_key) 

3524 try: 

3525 with open(actual_key) as f: 

3526 return json.load(f) 

3527 except (OSError, ValueError): 

3528 raise KeyError(cache_key) 

3529 

3530 def __delitem__(self, cache_key): 

3531 actual_key = self._convert_cache_key(cache_key) 

3532 try: 

3533 key_path = Path(actual_key) 

3534 key_path.unlink() 

3535 except FileNotFoundError: 

3536 raise KeyError(cache_key) 

3537 

3538 def __setitem__(self, cache_key, value): 

3539 full_key = self._convert_cache_key(cache_key) 

3540 try: 

3541 file_content = self._dumps(value) 

3542 except (TypeError, ValueError): 

3543 raise ValueError( 

3544 f"Value cannot be cached, must be JSON serializable: {value}" 

3545 ) 

3546 if not os.path.isdir(self._working_dir): 

3547 os.makedirs(self._working_dir) 

3548 with os.fdopen( 

3549 os.open(full_key, os.O_WRONLY | os.O_CREAT, 0o600), 'w' 

3550 ) as f: 

3551 f.truncate() 

3552 f.write(file_content) 

3553 

3554 def _convert_cache_key(self, cache_key): 

3555 full_path = os.path.join(self._working_dir, cache_key + '.json') 

3556 return full_path 

3557 

3558 def _serialize_if_needed(self, value, iso=False): 

3559 if isinstance(value, _DatetimeClass): 

3560 if iso: 

3561 return value.isoformat() 

3562 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

3563 return value 

3564 
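# Editor's note: a minimal usage sketch, not part of botocore, using a
# temporary directory instead of the default ~/.aws/boto/cache; the cached
# payload is illustrative.
import tempfile
from botocore.utils import JSONFileCache

cache = JSONFileCache(working_dir=tempfile.mkdtemp())
cache['example-key'] = {'AccessKeyId': 'EXAMPLEKEY', 'Expiration': '2030-01-01T00:00:00Z'}
print('example-key' in cache)  # True
print(cache['example-key'])    # the dict round-trips through JSON on disk
del cache['example-key']
print('example-key' in cache)  # False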

3565 

3566def is_s3express_bucket(bucket): 

3567 if bucket is None: 

3568 return False 

3569 return bucket.endswith('--x-s3') 

3570 
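# Editor's note: a minimal usage sketch, not part of botocore; the bucket
# names are illustrative S3 Express One Zone directory-bucket names.
from botocore.utils import is_s3express_bucket

print(is_s3express_bucket('my-bucket--usw2-az1--x-s3'))  # True
print(is_s3express_bucket('my-bucket'))                  # False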

3571 

3572# This parameter is not part of the public interface and is subject to abrupt 

3573# breaking changes or removal without prior announcement. 

3574# Mapping of services that have been renamed for backwards compatibility reasons. 

3575# Keys are the previous name that should be allowed, values are the documented 

3576# and preferred client name. 

3577SERVICE_NAME_ALIASES = {'runtime.sagemaker': 'sagemaker-runtime'} 

3578 

3579 

3580# This parameter is not part of the public interface and is subject to abrupt 

3581# breaking changes or removal without prior announcement. 

3582# Mapping to determine the service ID for services that do not use it as the 

3583# model data directory name. The keys are the data directory name and the 

3584# values are the transformed service IDs (lower case and hyphenated). 

3585CLIENT_NAME_TO_HYPHENIZED_SERVICE_ID_OVERRIDES = { 

3586 # Actual service name we use -> Allowed computed service name. 

3587 'apigateway': 'api-gateway', 

3588 'application-autoscaling': 'application-auto-scaling', 

3589 'appmesh': 'app-mesh', 

3590 'autoscaling': 'auto-scaling', 

3591 'autoscaling-plans': 'auto-scaling-plans', 

3592 'ce': 'cost-explorer', 

3593 'cloudhsmv2': 'cloudhsm-v2', 

3594 'cloudsearchdomain': 'cloudsearch-domain', 

3595 'cognito-idp': 'cognito-identity-provider', 

3596 'config': 'config-service', 

3597 'cur': 'cost-and-usage-report-service', 

3598 'datapipeline': 'data-pipeline', 

3599 'directconnect': 'direct-connect', 

3600 'devicefarm': 'device-farm', 

3601 'discovery': 'application-discovery-service', 

3602 'dms': 'database-migration-service', 

3603 'ds': 'directory-service', 

3604 'ds-data': 'directory-service-data', 

3605 'dynamodbstreams': 'dynamodb-streams', 

3606 'elasticbeanstalk': 'elastic-beanstalk', 

3607 'elastictranscoder': 'elastic-transcoder', 

3608 'elb': 'elastic-load-balancing', 

3609 'elbv2': 'elastic-load-balancing-v2', 

3610 'es': 'elasticsearch-service', 

3611 'events': 'eventbridge', 

3612 'globalaccelerator': 'global-accelerator', 

3613 'iot-data': 'iot-data-plane', 

3614 'iot-jobs-data': 'iot-jobs-data-plane', 

3615 'iotevents-data': 'iot-events-data', 

3616 'iotevents': 'iot-events', 

3617 'iotwireless': 'iot-wireless', 

3618 'kinesisanalytics': 'kinesis-analytics', 

3619 'kinesisanalyticsv2': 'kinesis-analytics-v2', 

3620 'kinesisvideo': 'kinesis-video', 

3621 'lex-models': 'lex-model-building-service', 

3622 'lexv2-models': 'lex-models-v2', 

3623 'lex-runtime': 'lex-runtime-service', 

3624 'lexv2-runtime': 'lex-runtime-v2', 

3625 'logs': 'cloudwatch-logs', 

3626 'machinelearning': 'machine-learning', 

3627 'marketplacecommerceanalytics': 'marketplace-commerce-analytics', 

3628 'marketplace-entitlement': 'marketplace-entitlement-service', 

3629 'meteringmarketplace': 'marketplace-metering', 

3630 'mgh': 'migration-hub', 

3631 'sms-voice': 'pinpoint-sms-voice', 

3632 'resourcegroupstaggingapi': 'resource-groups-tagging-api', 

3633 'route53': 'route-53', 

3634 'route53domains': 'route-53-domains', 

3635 's3control': 's3-control', 

3636 'sdb': 'simpledb', 

3637 'secretsmanager': 'secrets-manager', 

3638 'serverlessrepo': 'serverlessapplicationrepository', 

3639 'servicecatalog': 'service-catalog', 

3640 'servicecatalog-appregistry': 'service-catalog-appregistry', 

3641 'stepfunctions': 'sfn', 

3642 'storagegateway': 'storage-gateway', 

3643}