Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/credentials.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1077 statements  

1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 

2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"). You 

5# may not use this file except in compliance with the License. A copy of 

6# the License is located at 

7# 

8# http://aws.amazon.com/apache2.0/ 

9# 

10# or in the "license" file accompanying this file. This file is 

11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

12# ANY KIND, either express or implied. See the License for the specific 

13# language governing permissions and limitations under the License. 

14import datetime 

15import getpass 

16import json 

17import logging 

18import os 

19import subprocess 

20import threading 

21import time 

22from collections import namedtuple 

23from copy import deepcopy 

24from hashlib import sha1 

25 

26import dateutil.parser 

27from dateutil.parser import parse 

28from dateutil.tz import tzlocal, tzutc 

29 

30import botocore.compat 

31import botocore.configloader 

32from botocore import UNSIGNED 

33from botocore.compat import compat_shell_split, total_seconds 

34from botocore.config import Config 

35from botocore.exceptions import ( 

36 ConfigNotFound, 

37 CredentialRetrievalError, 

38 InfiniteLoopConfigError, 

39 InvalidConfigError, 

40 MetadataRetrievalError, 

41 PartialCredentialsError, 

42 RefreshWithMFAUnsupportedError, 

43 UnauthorizedSSOTokenError, 

44 UnknownCredentialError, 

45) 

46from botocore.tokens import SSOTokenProvider 

47from botocore.useragent import register_feature_id, register_feature_ids 

48from botocore.utils import ( 

49 ArnParser, 

50 ContainerMetadataFetcher, 

51 FileWebIdentityTokenLoader, 

52 InstanceMetadataFetcher, 

53 JSONFileCache, 

54 SSOTokenLoader, 

55 create_nested_client, 

56 parse_key_val_file, 

57 resolve_imds_endpoint_mode, 

58) 

59 

60logger = logging.getLogger(__name__) 

61ReadOnlyCredentials = namedtuple( 

62 'ReadOnlyCredentials', 

63 ['access_key', 'secret_key', 'token', 'account_id'], 

64 defaults=(None,), 

65) 

66 

67_DEFAULT_MANDATORY_REFRESH_TIMEOUT = 10 * 60 # 10 min 

68_DEFAULT_ADVISORY_REFRESH_TIMEOUT = 15 * 60 # 15 min 

69 

70 

71def create_credential_resolver(session, cache=None, region_name=None): 

72 """Create a default credential resolver. 

73 

74 This creates a pre-configured credential resolver 

75 that includes the default lookup chain for 

76 credentials. 

77 

78 """ 

79 profile_name = session.get_config_variable('profile') or 'default' 

80 metadata_timeout = session.get_config_variable('metadata_service_timeout') 

81 num_attempts = session.get_config_variable('metadata_service_num_attempts') 

82 disable_env_vars = session.instance_variables().get('profile') is not None 

83 

84 imds_config = { 

85 'ec2_metadata_service_endpoint': session.get_config_variable( 

86 'ec2_metadata_service_endpoint' 

87 ), 

88 'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode( 

89 session 

90 ), 

91 'ec2_credential_refresh_window': _DEFAULT_ADVISORY_REFRESH_TIMEOUT, 

92 'ec2_metadata_v1_disabled': session.get_config_variable( 

93 'ec2_metadata_v1_disabled' 

94 ), 

95 } 

96 

97 if cache is None: 

98 cache = {} 

99 

100 env_provider = EnvProvider() 

101 container_provider = ContainerProvider() 

102 instance_metadata_provider = InstanceMetadataProvider( 

103 iam_role_fetcher=InstanceMetadataFetcher( 

104 timeout=metadata_timeout, 

105 num_attempts=num_attempts, 

106 user_agent=session.user_agent(), 

107 config=imds_config, 

108 ) 

109 ) 

110 

111 profile_provider_builder = ProfileProviderBuilder( 

112 session, cache=cache, region_name=region_name 

113 ) 

114 assume_role_provider = AssumeRoleProvider( 

115 load_config=lambda: session.full_config, 

116 client_creator=_get_client_creator(session, region_name), 

117 cache=cache, 

118 profile_name=profile_name, 

119 credential_sourcer=CanonicalNameCredentialSourcer( 

120 [env_provider, container_provider, instance_metadata_provider] 

121 ), 

122 profile_provider_builder=profile_provider_builder, 

123 ) 

124 

125 pre_profile = [ 

126 env_provider, 

127 assume_role_provider, 

128 ] 

129 profile_providers = profile_provider_builder.providers( 

130 profile_name=profile_name, 

131 disable_env_vars=disable_env_vars, 

132 ) 

133 post_profile = [ 

134 OriginalEC2Provider(), 

135 BotoProvider(), 

136 container_provider, 

137 instance_metadata_provider, 

138 ] 

139 providers = pre_profile + profile_providers + post_profile 

140 

141 if disable_env_vars: 

142 # An explicitly provided profile will negate an EnvProvider. 

143 # We will defer to providers that understand the "profile" 

144 # concept to retrieve credentials. 

145 # The one edge case if is all three values are provided via 

146 # env vars: 

147 # export AWS_ACCESS_KEY_ID=foo 

148 # export AWS_SECRET_ACCESS_KEY=bar 

149 # export AWS_PROFILE=baz 

150 # Then, just like our client() calls, the explicit credentials 

151 # will take precedence. 

152 # 

153 # This precedence is enforced by leaving the EnvProvider in the chain. 

154 # This means that the only way a "profile" would win is if the 

155 # EnvProvider does not return credentials, which is what we want 

156 # in this scenario. 

157 providers.remove(env_provider) 

158 logger.debug( 

159 'Skipping environment variable credential check' 

160 ' because profile name was explicitly set.' 

161 ) 

162 

163 resolver = CredentialResolver(providers=providers) 

164 return resolver 

165 

166 

167class ProfileProviderBuilder: 

168 """This class handles the creation of profile based providers. 

169 

170 NOTE: This class is only intended for internal use. 

171 

172 This class handles the creation and ordering of the various credential 

173 providers that primarly source their configuration from the shared config. 

174 This is needed to enable sharing between the default credential chain and 

175 the source profile chain created by the assume role provider. 

176 """ 

177 

178 def __init__( 

179 self, session, cache=None, region_name=None, sso_token_cache=None 

180 ): 

181 self._session = session 

182 self._cache = cache 

183 self._region_name = region_name 

184 self._sso_token_cache = sso_token_cache 

185 

186 def providers(self, profile_name, disable_env_vars=False): 

187 return [ 

188 self._create_web_identity_provider( 

189 profile_name, 

190 disable_env_vars, 

191 ), 

192 self._create_sso_provider(profile_name), 

193 self._create_shared_credential_provider(profile_name), 

194 self._create_process_provider(profile_name), 

195 self._create_config_provider(profile_name), 

196 ] 

197 

198 def _create_process_provider(self, profile_name): 

199 return ProcessProvider( 

200 profile_name=profile_name, 

201 load_config=lambda: self._session.full_config, 

202 ) 

203 

204 def _create_shared_credential_provider(self, profile_name): 

205 credential_file = self._session.get_config_variable('credentials_file') 

206 return SharedCredentialProvider( 

207 profile_name=profile_name, 

208 creds_filename=credential_file, 

209 ) 

210 

211 def _create_config_provider(self, profile_name): 

212 config_file = self._session.get_config_variable('config_file') 

213 return ConfigProvider( 

214 profile_name=profile_name, 

215 config_filename=config_file, 

216 ) 

217 

218 def _create_web_identity_provider(self, profile_name, disable_env_vars): 

219 return AssumeRoleWithWebIdentityProvider( 

220 load_config=lambda: self._session.full_config, 

221 client_creator=_get_client_creator( 

222 self._session, self._region_name 

223 ), 

224 cache=self._cache, 

225 profile_name=profile_name, 

226 disable_env_vars=disable_env_vars, 

227 ) 

228 

229 def _create_sso_provider(self, profile_name): 

230 return SSOProvider( 

231 load_config=lambda: self._session.full_config, 

232 client_creator=self._session.create_client, 

233 profile_name=profile_name, 

234 cache=self._cache, 

235 token_cache=self._sso_token_cache, 

236 token_provider=SSOTokenProvider( 

237 self._session, 

238 cache=self._sso_token_cache, 

239 profile_name=profile_name, 

240 ), 

241 ) 

242 

243 

244def get_credentials(session): 

245 resolver = create_credential_resolver(session) 

246 return resolver.load_credentials() 

247 

248 

249def _local_now(): 

250 return datetime.datetime.now(tzlocal()) 

251 

252 

253def _parse_if_needed(value): 

254 if isinstance(value, datetime.datetime): 

255 return value 

256 return parse(value) 

257 

258 

259def _serialize_if_needed(value, iso=False): 

260 if isinstance(value, datetime.datetime): 

261 if iso: 

262 return value.isoformat() 

263 return value.strftime('%Y-%m-%dT%H:%M:%S%Z') 

264 return value 

265 

266 

267def _get_client_creator(session, region_name): 

268 def client_creator(service_name, **kwargs): 

269 create_client_kwargs = {'region_name': region_name} 

270 create_client_kwargs.update(**kwargs) 

271 return create_nested_client( 

272 session, service_name, **create_client_kwargs 

273 ) 

274 

275 return client_creator 

276 

277 

278def create_assume_role_refresher(client, params): 

279 def refresh(): 

280 response = client.assume_role(**params) 

281 credentials = response['Credentials'] 

282 # We need to normalize the credential names to 

283 # the values expected by the refresh creds. 

284 return { 

285 'access_key': credentials['AccessKeyId'], 

286 'secret_key': credentials['SecretAccessKey'], 

287 'token': credentials['SessionToken'], 

288 'expiry_time': _serialize_if_needed(credentials['Expiration']), 

289 } 

290 

291 return refresh 

292 

293 

294def create_mfa_serial_refresher(actual_refresh): 

295 class _Refresher: 

296 def __init__(self, refresh): 

297 self._refresh = refresh 

298 self._has_been_called = False 

299 

300 def __call__(self): 

301 if self._has_been_called: 

302 # We can explore an option in the future to support 

303 # reprompting for MFA, but for now we just error out 

304 # when the temp creds expire. 

305 raise RefreshWithMFAUnsupportedError() 

306 self._has_been_called = True 

307 return self._refresh() 

308 

309 return _Refresher(actual_refresh) 

310 

311 

312class Credentials: 

313 """ 

314 Holds the credentials needed to authenticate requests. 

315 

316 :param str access_key: The access key part of the credentials. 

317 :param str secret_key: The secret key part of the credentials. 

318 :param str token: The security token, valid only for session credentials. 

319 :param str method: A string which identifies where the credentials 

320 were found. 

321 :param str account_id: (optional) An account ID associated with the credentials. 

322 """ 

323 

324 def __init__( 

325 self, access_key, secret_key, token=None, method=None, account_id=None 

326 ): 

327 self.access_key = access_key 

328 self.secret_key = secret_key 

329 self.token = token 

330 

331 if method is None: 

332 method = 'explicit' 

333 self.method = method 

334 self.account_id = account_id 

335 

336 self._normalize() 

337 

338 def _normalize(self): 

339 # Keys would sometimes (accidentally) contain non-ascii characters. 

340 # It would cause a confusing UnicodeDecodeError in Python 2. 

341 # We explicitly convert them into unicode to avoid such error. 

342 # 

343 # Eventually the service will decide whether to accept the credential. 

344 # This also complies with the behavior in Python 3. 

345 self.access_key = botocore.compat.ensure_unicode(self.access_key) 

346 self.secret_key = botocore.compat.ensure_unicode(self.secret_key) 

347 

348 def get_frozen_credentials(self): 

349 return ReadOnlyCredentials( 

350 self.access_key, self.secret_key, self.token, self.account_id 

351 ) 

352 

353 def get_deferred_property(self, property_name): 

354 def get_property(): 

355 return getattr(self, property_name, None) 

356 

357 return get_property 

358 

359 

360class RefreshableCredentials(Credentials): 

361 """ 

362 Holds the credentials needed to authenticate requests. In addition, it 

363 knows how to refresh itself. 

364 

365 :param str access_key: The access key part of the credentials. 

366 :param str secret_key: The secret key part of the credentials. 

367 :param str token: The security token, valid only for session credentials. 

368 :param datetime expiry_time: The expiration time of the credentials. 

369 :param function refresh_using: Callback function to refresh the credentials. 

370 :param str method: A string which identifies where the credentials 

371 were found. 

372 :param function time_fetcher: Callback function to retrieve current time. 

373 """ 

374 

375 # The time at which we'll attempt to refresh, but not 

376 # block if someone else is refreshing. 

377 _advisory_refresh_timeout = _DEFAULT_ADVISORY_REFRESH_TIMEOUT 

378 # The time at which all threads will block waiting for 

379 # refreshed credentials. 

380 _mandatory_refresh_timeout = _DEFAULT_MANDATORY_REFRESH_TIMEOUT 

381 

382 def __init__( 

383 self, 

384 access_key, 

385 secret_key, 

386 token, 

387 expiry_time, 

388 refresh_using, 

389 method, 

390 time_fetcher=_local_now, 

391 advisory_timeout=None, 

392 mandatory_timeout=None, 

393 account_id=None, 

394 ): 

395 self._refresh_using = refresh_using 

396 self._access_key = access_key 

397 self._secret_key = secret_key 

398 self._token = token 

399 self._account_id = account_id 

400 self._expiry_time = expiry_time 

401 self._time_fetcher = time_fetcher 

402 self._refresh_lock = threading.Lock() 

403 self.method = method 

404 self._frozen_credentials = ReadOnlyCredentials( 

405 access_key, secret_key, token, account_id 

406 ) 

407 self._normalize() 

408 if advisory_timeout is not None: 

409 self._advisory_refresh_timeout = advisory_timeout 

410 if mandatory_timeout is not None: 

411 self._mandatory_refresh_timeout = mandatory_timeout 

412 

413 def _normalize(self): 

414 self._access_key = botocore.compat.ensure_unicode(self._access_key) 

415 self._secret_key = botocore.compat.ensure_unicode(self._secret_key) 

416 

417 @classmethod 

418 def create_from_metadata( 

419 cls, 

420 metadata, 

421 refresh_using, 

422 method, 

423 advisory_timeout=None, 

424 mandatory_timeout=None, 

425 ): 

426 kwargs = {} 

427 if advisory_timeout is not None: 

428 kwargs['advisory_timeout'] = advisory_timeout 

429 if mandatory_timeout is not None: 

430 kwargs['mandatory_timeout'] = mandatory_timeout 

431 

432 instance = cls( 

433 access_key=metadata['access_key'], 

434 secret_key=metadata['secret_key'], 

435 token=metadata['token'], 

436 expiry_time=cls._expiry_datetime(metadata['expiry_time']), 

437 method=method, 

438 refresh_using=refresh_using, 

439 account_id=metadata.get('account_id'), 

440 **kwargs, 

441 ) 

442 return instance 

443 

444 @property 

445 def access_key(self): 

446 """Warning: Using this property can lead to race conditions if you 

447 access another property subsequently along the refresh boundary. 

448 Please use get_frozen_credentials instead. 

449 """ 

450 self._refresh() 

451 return self._access_key 

452 

453 @access_key.setter 

454 def access_key(self, value): 

455 self._access_key = value 

456 

457 @property 

458 def secret_key(self): 

459 """Warning: Using this property can lead to race conditions if you 

460 access another property subsequently along the refresh boundary. 

461 Please use get_frozen_credentials instead. 

462 """ 

463 self._refresh() 

464 return self._secret_key 

465 

466 @secret_key.setter 

467 def secret_key(self, value): 

468 self._secret_key = value 

469 

470 @property 

471 def token(self): 

472 """Warning: Using this property can lead to race conditions if you 

473 access another property subsequently along the refresh boundary. 

474 Please use get_frozen_credentials instead. 

475 """ 

476 self._refresh() 

477 return self._token 

478 

479 @token.setter 

480 def token(self, value): 

481 self._token = value 

482 

483 @property 

484 def account_id(self): 

485 """Warning: Using this property can lead to race conditions if you 

486 access another property subsequently along the refresh boundary. 

487 Please use get_frozen_credentials instead. 

488 """ 

489 self._refresh() 

490 return self._account_id 

491 

492 @account_id.setter 

493 def account_id(self, value): 

494 self._account_id = value 

495 

496 def _seconds_remaining(self): 

497 delta = self._expiry_time - self._time_fetcher() 

498 return total_seconds(delta) 

499 

500 def refresh_needed(self, refresh_in=None): 

501 """Check if a refresh is needed. 

502 

503 A refresh is needed if the expiry time associated 

504 with the temporary credentials is less than the 

505 provided ``refresh_in``. If ``time_delta`` is not 

506 provided, ``self.advisory_refresh_needed`` will be used. 

507 

508 For example, if your temporary credentials expire 

509 in 10 minutes and the provided ``refresh_in`` is 

510 ``15 * 60``, then this function will return ``True``. 

511 

512 :type refresh_in: int 

513 :param refresh_in: The number of seconds before the 

514 credentials expire in which refresh attempts should 

515 be made. 

516 

517 :return: True if refresh needed, False otherwise. 

518 

519 """ 

520 if self._expiry_time is None: 

521 # No expiration, so assume we don't need to refresh. 

522 return False 

523 

524 if refresh_in is None: 

525 refresh_in = self._advisory_refresh_timeout 

526 # The credentials should be refreshed if they're going to expire 

527 # in less than 5 minutes. 

528 if self._seconds_remaining() >= refresh_in: 

529 # There's enough time left. Don't refresh. 

530 return False 

531 logger.debug("Credentials need to be refreshed.") 

532 return True 

533 

534 def _is_expired(self): 

535 # Checks if the current credentials are expired. 

536 return self.refresh_needed(refresh_in=0) 

537 

538 def _refresh(self): 

539 # In the common case where we don't need a refresh, we 

540 # can immediately exit and not require acquiring the 

541 # refresh lock. 

542 if not self.refresh_needed(self._advisory_refresh_timeout): 

543 return 

544 

545 # acquire() doesn't accept kwargs, but False is indicating 

546 # that we should not block if we can't acquire the lock. 

547 # If we aren't able to acquire the lock, we'll trigger 

548 # the else clause. 

549 if self._refresh_lock.acquire(False): 

550 try: 

551 if not self.refresh_needed(self._advisory_refresh_timeout): 

552 return 

553 is_mandatory_refresh = self.refresh_needed( 

554 self._mandatory_refresh_timeout 

555 ) 

556 self._protected_refresh(is_mandatory=is_mandatory_refresh) 

557 return 

558 finally: 

559 self._refresh_lock.release() 

560 elif self.refresh_needed(self._mandatory_refresh_timeout): 

561 # If we're within the mandatory refresh window, 

562 # we must block until we get refreshed credentials. 

563 with self._refresh_lock: 

564 if not self.refresh_needed(self._mandatory_refresh_timeout): 

565 return 

566 self._protected_refresh(is_mandatory=True) 

567 

568 def _protected_refresh(self, is_mandatory): 

569 # precondition: this method should only be called if you've acquired 

570 # the self._refresh_lock. 

571 try: 

572 metadata = self._refresh_using() 

573 except Exception: 

574 period_name = 'mandatory' if is_mandatory else 'advisory' 

575 logger.warning( 

576 "Refreshing temporary credentials failed " 

577 "during %s refresh period.", 

578 period_name, 

579 exc_info=True, 

580 ) 

581 if is_mandatory: 

582 # If this is a mandatory refresh, then 

583 # all errors that occur when we attempt to refresh 

584 # credentials are propagated back to the user. 

585 raise 

586 # Otherwise we'll just return. 

587 # The end result will be that we'll use the current 

588 # set of temporary credentials we have. 

589 return 

590 self._set_from_data(metadata) 

591 self._frozen_credentials = ReadOnlyCredentials( 

592 self._access_key, self._secret_key, self._token, self._account_id 

593 ) 

594 if self._is_expired(): 

595 # We successfully refreshed credentials but for whatever 

596 # reason, our refreshing function returned credentials 

597 # that are still expired. In this scenario, the only 

598 # thing we can do is let the user know and raise 

599 # an exception. 

600 msg = ( 

601 "Credentials were refreshed, but the " 

602 "refreshed credentials are still expired." 

603 ) 

604 logger.warning(msg) 

605 raise RuntimeError(msg) 

606 

607 @staticmethod 

608 def _expiry_datetime(time_str): 

609 return parse(time_str) 

610 

611 def _set_from_data(self, data): 

612 expected_keys = ['access_key', 'secret_key', 'token', 'expiry_time'] 

613 if not data: 

614 missing_keys = expected_keys 

615 else: 

616 missing_keys = [k for k in expected_keys if k not in data] 

617 

618 if missing_keys: 

619 message = "Credential refresh failed, response did not contain: %s" 

620 raise CredentialRetrievalError( 

621 provider=self.method, 

622 error_msg=message % ', '.join(missing_keys), 

623 ) 

624 

625 self.access_key = data['access_key'] 

626 self.secret_key = data['secret_key'] 

627 self.token = data['token'] 

628 self._expiry_time = parse(data['expiry_time']) 

629 self.account_id = data.get('account_id') 

630 logger.debug( 

631 "Retrieved credentials will expire at: %s", self._expiry_time 

632 ) 

633 self._normalize() 

634 

635 def get_frozen_credentials(self): 

636 """Return immutable credentials. 

637 

638 The ``access_key``, ``secret_key``, and ``token`` properties 

639 on this class will always check and refresh credentials if 

640 needed before returning the particular credentials. 

641 

642 This has an edge case where you can get inconsistent 

643 credentials. Imagine this: 

644 

645 # Current creds are "t1" 

646 tmp.access_key ---> expired? no, so return t1.access_key 

647 # ---- time is now expired, creds need refreshing to "t2" ---- 

648 tmp.secret_key ---> expired? yes, refresh and return t2.secret_key 

649 

650 This means we're using the access key from t1 with the secret key 

651 from t2. To fix this issue, you can request a frozen credential object 

652 which is guaranteed not to change. 

653 

654 The frozen credentials returned from this method should be used 

655 immediately and then discarded. The typical usage pattern would 

656 be:: 

657 

658 creds = RefreshableCredentials(...) 

659 some_code = SomeSignerObject() 

660 # I'm about to sign the request. 

661 # The frozen credentials are only used for the 

662 # duration of generate_presigned_url and will be 

663 # immediately thrown away. 

664 request = some_code.sign_some_request( 

665 with_credentials=creds.get_frozen_credentials()) 

666 print("Signed request:", request) 

667 

668 """ 

669 self._refresh() 

670 return self._frozen_credentials 

671 

672 

673class DeferredRefreshableCredentials(RefreshableCredentials): 

674 """Refreshable credentials that don't require initial credentials. 

675 

676 refresh_using will be called upon first access. 

677 """ 

678 

679 def __init__(self, refresh_using, method, time_fetcher=_local_now): 

680 self._refresh_using = refresh_using 

681 self._access_key = None 

682 self._secret_key = None 

683 self._token = None 

684 self._account_id = None 

685 self._expiry_time = None 

686 self._time_fetcher = time_fetcher 

687 self._refresh_lock = threading.Lock() 

688 self.method = method 

689 self._frozen_credentials = None 

690 

691 def refresh_needed(self, refresh_in=None): 

692 if self._frozen_credentials is None: 

693 return True 

694 return super().refresh_needed(refresh_in) 

695 

696 

697class CachedCredentialFetcher: 

698 DEFAULT_EXPIRY_WINDOW_SECONDS = 60 * 15 

699 

700 def __init__(self, cache=None, expiry_window_seconds=None): 

701 if cache is None: 

702 cache = {} 

703 self._cache = cache 

704 self._cache_key = self._create_cache_key() 

705 if expiry_window_seconds is None: 

706 expiry_window_seconds = self.DEFAULT_EXPIRY_WINDOW_SECONDS 

707 self._expiry_window_seconds = expiry_window_seconds 

708 self.feature_ids = set() 

709 

710 def _create_cache_key(self): 

711 raise NotImplementedError('_create_cache_key()') 

712 

713 def _make_file_safe(self, filename): 

714 # Replace :, path sep, and / to make it the string filename safe. 

715 filename = filename.replace(':', '_').replace(os.sep, '_') 

716 return filename.replace('/', '_') 

717 

718 def _get_credentials(self): 

719 raise NotImplementedError('_get_credentials()') 

720 

721 def fetch_credentials(self): 

722 return self._get_cached_credentials() 

723 

724 def _get_cached_credentials(self): 

725 """Get up-to-date credentials. 

726 

727 This will check the cache for up-to-date credentials, calling assume 

728 role if none are available. 

729 """ 

730 response = self._load_from_cache() 

731 if response is None: 

732 response = self._get_credentials() 

733 self._write_to_cache(response) 

734 else: 

735 logger.debug("Credentials for role retrieved from cache.") 

736 

737 creds = response['Credentials'] 

738 expiration = _serialize_if_needed(creds['Expiration'], iso=True) 

739 credentials = { 

740 'access_key': creds['AccessKeyId'], 

741 'secret_key': creds['SecretAccessKey'], 

742 'token': creds['SessionToken'], 

743 'expiry_time': expiration, 

744 'account_id': creds.get('AccountId'), 

745 } 

746 

747 return credentials 

748 

749 def _load_from_cache(self): 

750 if self._cache_key in self._cache: 

751 creds = deepcopy(self._cache[self._cache_key]) 

752 if not self._is_expired(creds): 

753 return creds 

754 else: 

755 logger.debug( 

756 "Credentials were found in cache, but they are expired." 

757 ) 

758 return None 

759 

760 def _write_to_cache(self, response): 

761 self._cache[self._cache_key] = deepcopy(response) 

762 

763 def _is_expired(self, credentials): 

764 """Check if credentials are expired.""" 

765 end_time = _parse_if_needed(credentials['Credentials']['Expiration']) 

766 seconds = total_seconds(end_time - _local_now()) 

767 return seconds < self._expiry_window_seconds 

768 

769 

770class BaseAssumeRoleCredentialFetcher(CachedCredentialFetcher): 

771 def __init__( 

772 self, 

773 client_creator, 

774 role_arn, 

775 extra_args=None, 

776 cache=None, 

777 expiry_window_seconds=None, 

778 ): 

779 self._client_creator = client_creator 

780 self._role_arn = role_arn 

781 

782 if extra_args is None: 

783 self._assume_kwargs = {} 

784 else: 

785 self._assume_kwargs = deepcopy(extra_args) 

786 self._assume_kwargs['RoleArn'] = self._role_arn 

787 

788 self._role_session_name = self._assume_kwargs.get('RoleSessionName') 

789 self._using_default_session_name = False 

790 if not self._role_session_name: 

791 self._generate_assume_role_name() 

792 

793 super().__init__(cache, expiry_window_seconds) 

794 

795 def _generate_assume_role_name(self): 

796 self._role_session_name = f'botocore-session-{int(time.time())}' 

797 self._assume_kwargs['RoleSessionName'] = self._role_session_name 

798 self._using_default_session_name = True 

799 

800 def _create_cache_key(self): 

801 """Create a predictable cache key for the current configuration. 

802 

803 The cache key is intended to be compatible with file names. 

804 """ 

805 args = deepcopy(self._assume_kwargs) 

806 

807 # The role session name gets randomly generated, so we don't want it 

808 # in the hash. 

809 if self._using_default_session_name: 

810 del args['RoleSessionName'] 

811 

812 if 'Policy' in args: 

813 # To have a predictable hash, the keys of the policy must be 

814 # sorted, so we have to load it here to make sure it gets sorted 

815 # later on. 

816 args['Policy'] = json.loads(args['Policy']) 

817 

818 args = json.dumps(args, sort_keys=True) 

819 argument_hash = sha1(args.encode('utf-8')).hexdigest() 

820 return self._make_file_safe(argument_hash) 

821 

822 def _add_account_id_to_response(self, response): 

823 role_arn = response.get('AssumedRoleUser', {}).get('Arn') 

824 if ArnParser.is_arn(role_arn): 

825 arn_parser = ArnParser() 

826 account_id = arn_parser.parse_arn(role_arn)['account'] 

827 response['Credentials']['AccountId'] = account_id 

828 else: 

829 logger.debug("Unable to extract account ID from Arn: %s", role_arn) 

830 

831 

832class AssumeRoleCredentialFetcher(BaseAssumeRoleCredentialFetcher): 

833 def __init__( 

834 self, 

835 client_creator, 

836 source_credentials, 

837 role_arn, 

838 extra_args=None, 

839 mfa_prompter=None, 

840 cache=None, 

841 expiry_window_seconds=None, 

842 ): 

843 """ 

844 :type client_creator: callable 

845 :param client_creator: A callable that creates a client taking 

846 arguments like ``Session.create_client``. 

847 

848 :type source_credentials: Credentials 

849 :param source_credentials: The credentials to use to create the 

850 client for the call to AssumeRole. 

851 

852 :type role_arn: str 

853 :param role_arn: The ARN of the role to be assumed. 

854 

855 :type extra_args: dict 

856 :param extra_args: Any additional arguments to add to the assume 

857 role request using the format of the botocore operation. 

858 Possible keys include, but may not be limited to, 

859 DurationSeconds, Policy, SerialNumber, ExternalId and 

860 RoleSessionName. 

861 

862 :type mfa_prompter: callable 

863 :param mfa_prompter: A callable that returns input provided by the 

864 user (i.e raw_input, getpass.getpass, etc.). 

865 

866 :type cache: dict 

867 :param cache: An object that supports ``__getitem__``, 

868 ``__setitem__``, and ``__contains__``. An example of this is 

869 the ``JSONFileCache`` class in aws-cli. 

870 

871 :type expiry_window_seconds: int 

872 :param expiry_window_seconds: The amount of time, in seconds, 

873 """ 

874 self._source_credentials = source_credentials 

875 self._mfa_prompter = mfa_prompter 

876 if self._mfa_prompter is None: 

877 self._mfa_prompter = getpass.getpass 

878 

879 super().__init__( 

880 client_creator, 

881 role_arn, 

882 extra_args=extra_args, 

883 cache=cache, 

884 expiry_window_seconds=expiry_window_seconds, 

885 ) 

886 

887 def _get_credentials(self): 

888 """Get credentials by calling assume role.""" 

889 register_feature_ids(self.feature_ids) 

890 kwargs = self._assume_role_kwargs() 

891 client = self._create_client() 

892 response = client.assume_role(**kwargs) 

893 self._add_account_id_to_response(response) 

894 return response 

895 

896 def _assume_role_kwargs(self): 

897 """Get the arguments for assume role based on current configuration.""" 

898 assume_role_kwargs = deepcopy(self._assume_kwargs) 

899 

900 mfa_serial = assume_role_kwargs.get('SerialNumber') 

901 

902 if mfa_serial is not None: 

903 prompt = f'Enter MFA code for {mfa_serial}: ' 

904 token_code = self._mfa_prompter(prompt) 

905 assume_role_kwargs['TokenCode'] = token_code 

906 

907 duration_seconds = assume_role_kwargs.get('DurationSeconds') 

908 

909 if duration_seconds is not None: 

910 assume_role_kwargs['DurationSeconds'] = duration_seconds 

911 

912 return assume_role_kwargs 

913 

914 def _create_client(self): 

915 """Create an STS client using the source credentials.""" 

916 frozen_credentials = self._source_credentials.get_frozen_credentials() 

917 return self._client_creator( 

918 'sts', 

919 aws_access_key_id=frozen_credentials.access_key, 

920 aws_secret_access_key=frozen_credentials.secret_key, 

921 aws_session_token=frozen_credentials.token, 

922 ) 

923 

924 

925class AssumeRoleWithWebIdentityCredentialFetcher( 

926 BaseAssumeRoleCredentialFetcher 

927): 

928 def __init__( 

929 self, 

930 client_creator, 

931 web_identity_token_loader, 

932 role_arn, 

933 extra_args=None, 

934 cache=None, 

935 expiry_window_seconds=None, 

936 ): 

937 """ 

938 :type client_creator: callable 

939 :param client_creator: A callable that creates a client taking 

940 arguments like ``Session.create_client``. 

941 

942 :type web_identity_token_loader: callable 

943 :param web_identity_token_loader: A callable that takes no arguments 

944 and returns a web identity token str. 

945 

946 :type role_arn: str 

947 :param role_arn: The ARN of the role to be assumed. 

948 

949 :type extra_args: dict 

950 :param extra_args: Any additional arguments to add to the assume 

951 role request using the format of the botocore operation. 

952 Possible keys include, but may not be limited to, 

953 DurationSeconds, Policy, SerialNumber, ExternalId and 

954 RoleSessionName. 

955 

956 :type cache: dict 

957 :param cache: An object that supports ``__getitem__``, 

958 ``__setitem__``, and ``__contains__``. An example of this is 

959 the ``JSONFileCache`` class in aws-cli. 

960 

961 :type expiry_window_seconds: int 

962 :param expiry_window_seconds: The amount of time, in seconds, 

963 """ 

964 self._web_identity_token_loader = web_identity_token_loader 

965 

966 super().__init__( 

967 client_creator, 

968 role_arn, 

969 extra_args=extra_args, 

970 cache=cache, 

971 expiry_window_seconds=expiry_window_seconds, 

972 ) 

973 

974 def _get_credentials(self): 

975 """Get credentials by calling assume role.""" 

976 register_feature_ids(self.feature_ids) 

977 kwargs = self._assume_role_kwargs() 

978 # Assume role with web identity does not require credentials other than 

979 # the token, explicitly configure the client to not sign requests. 

980 config = Config(signature_version=UNSIGNED) 

981 client = self._client_creator('sts', config=config) 

982 response = client.assume_role_with_web_identity(**kwargs) 

983 self._add_account_id_to_response(response) 

984 return response 

985 

986 def _assume_role_kwargs(self): 

987 """Get the arguments for assume role based on current configuration.""" 

988 assume_role_kwargs = deepcopy(self._assume_kwargs) 

989 identity_token = self._web_identity_token_loader() 

990 assume_role_kwargs['WebIdentityToken'] = identity_token 

991 

992 return assume_role_kwargs 

993 

994 

995class CredentialProvider: 

996 # A short name to identify the provider within botocore. 

997 METHOD = None 

998 

999 # A name to identify the provider for use in cross-sdk features like 

1000 # assume role's `credential_source` configuration option. These names 

1001 # are to be treated in a case-insensitive way. NOTE: any providers not 

1002 # implemented in botocore MUST prefix their canonical names with 

1003 # 'custom' or we DO NOT guarantee that it will work with any features 

1004 # that this provides. 

1005 CANONICAL_NAME = None 

1006 

1007 def __init__(self, session=None): 

1008 self.session = session 

1009 

1010 def load(self): 

1011 """ 

1012 Loads the credentials from their source & sets them on the object. 

1013 

1014 Subclasses should implement this method (by reading from disk, the 

1015 environment, the network or wherever), returning ``True`` if they were 

1016 found & loaded. 

1017 

1018 If not found, this method should return ``False``, indictating that the 

1019 ``CredentialResolver`` should fall back to the next available method. 

1020 

1021 The default implementation does nothing, assuming the user has set the 

1022 ``access_key/secret_key/token`` themselves. 

1023 

1024 :returns: Whether credentials were found & set 

1025 :rtype: Credentials 

1026 """ 

1027 return True 

1028 

1029 def _extract_creds_from_mapping(self, mapping, *key_names): 

1030 found = [] 

1031 for key_name in key_names: 

1032 try: 

1033 found.append(mapping[key_name]) 

1034 except KeyError: 

1035 raise PartialCredentialsError( 

1036 provider=self.METHOD, cred_var=key_name 

1037 ) 

1038 return found 

1039 

1040 

1041class ProcessProvider(CredentialProvider): 

1042 METHOD = 'custom-process' 

1043 

1044 def __init__(self, profile_name, load_config, popen=subprocess.Popen): 

1045 self._profile_name = profile_name 

1046 self._load_config = load_config 

1047 self._loaded_config = None 

1048 self._popen = popen 

1049 

1050 def load(self): 

1051 credential_process = self._credential_process 

1052 if credential_process is None: 

1053 return 

1054 

1055 register_feature_id('CREDENTIALS_PROFILE_PROCESS') 

1056 creds_dict = self._retrieve_credentials_using(credential_process) 

1057 register_feature_id('CREDENTIALS_PROCESS') 

1058 if creds_dict.get('expiry_time') is not None: 

1059 return RefreshableCredentials.create_from_metadata( 

1060 creds_dict, 

1061 lambda: self._retrieve_credentials_using(credential_process), 

1062 self.METHOD, 

1063 ) 

1064 

1065 return Credentials( 

1066 access_key=creds_dict['access_key'], 

1067 secret_key=creds_dict['secret_key'], 

1068 token=creds_dict.get('token'), 

1069 method=self.METHOD, 

1070 account_id=creds_dict.get('account_id'), 

1071 ) 

1072 

1073 def _retrieve_credentials_using(self, credential_process): 

1074 # We're not using shell=True, so we need to pass the 

1075 # command and all arguments as a list. 

1076 process_list = compat_shell_split(credential_process) 

1077 p = self._popen( 

1078 process_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE 

1079 ) 

1080 stdout, stderr = p.communicate() 

1081 if p.returncode != 0: 

1082 raise CredentialRetrievalError( 

1083 provider=self.METHOD, error_msg=stderr.decode('utf-8') 

1084 ) 

1085 parsed = botocore.compat.json.loads(stdout.decode('utf-8')) 

1086 version = parsed.get('Version', '<Version key not provided>') 

1087 if version != 1: 

1088 raise CredentialRetrievalError( 

1089 provider=self.METHOD, 

1090 error_msg=( 

1091 f"Unsupported version '{version}' for credential process " 

1092 f"provider, supported versions: 1" 

1093 ), 

1094 ) 

1095 try: 

1096 return { 

1097 'access_key': parsed['AccessKeyId'], 

1098 'secret_key': parsed['SecretAccessKey'], 

1099 'token': parsed.get('SessionToken'), 

1100 'expiry_time': parsed.get('Expiration'), 

1101 'account_id': self._get_account_id(parsed), 

1102 } 

1103 except KeyError as e: 

1104 raise CredentialRetrievalError( 

1105 provider=self.METHOD, 

1106 error_msg=f"Missing required key in response: {e}", 

1107 ) 

1108 

1109 @property 

1110 def _credential_process(self): 

1111 return self.profile_config.get('credential_process') 

1112 

1113 @property 

1114 def profile_config(self): 

1115 if self._loaded_config is None: 

1116 self._loaded_config = self._load_config() 

1117 profile_config = self._loaded_config.get('profiles', {}).get( 

1118 self._profile_name, {} 

1119 ) 

1120 return profile_config 

1121 

1122 def _get_account_id(self, parsed): 

1123 account_id = parsed.get('AccountId') 

1124 return account_id or self.profile_config.get('aws_account_id') 

1125 

1126 

1127class InstanceMetadataProvider(CredentialProvider): 

1128 METHOD = 'iam-role' 

1129 CANONICAL_NAME = 'Ec2InstanceMetadata' 

1130 

1131 def __init__(self, iam_role_fetcher): 

1132 self._role_fetcher = iam_role_fetcher 

1133 

1134 def load(self): 

1135 fetcher = self._role_fetcher 

1136 # We do the first request, to see if we get useful data back. 

1137 # If not, we'll pass & move on to whatever's next in the credential 

1138 # chain. 

1139 metadata = fetcher.retrieve_iam_role_credentials() 

1140 if not metadata: 

1141 return None 

1142 register_feature_id('CREDENTIALS_IMDS') 

1143 logger.info( 

1144 'Found credentials from IAM Role: %s', metadata['role_name'] 

1145 ) 

1146 # We manually set the data here, since we already made the request & 

1147 # have it. When the expiry is hit, the credentials will auto-refresh 

1148 # themselves. 

1149 creds = RefreshableCredentials.create_from_metadata( 

1150 metadata, 

1151 method=self.METHOD, 

1152 refresh_using=fetcher.retrieve_iam_role_credentials, 

1153 ) 

1154 return creds 

1155 

1156 

1157class EnvProvider(CredentialProvider): 

1158 METHOD = 'env' 

1159 CANONICAL_NAME = 'Environment' 

1160 ACCESS_KEY = 'AWS_ACCESS_KEY_ID' 

1161 SECRET_KEY = 'AWS_SECRET_ACCESS_KEY' 

1162 # The token can come from either of these env var. 

1163 # AWS_SESSION_TOKEN is what other AWS SDKs have standardized on. 

1164 TOKENS = ['AWS_SECURITY_TOKEN', 'AWS_SESSION_TOKEN'] 

1165 EXPIRY_TIME = 'AWS_CREDENTIAL_EXPIRATION' 

1166 ACCOUNT_ID = 'AWS_ACCOUNT_ID' 

1167 

1168 def __init__(self, environ=None, mapping=None): 

1169 """ 

1170 

1171 :param environ: The environment variables (defaults to 

1172 ``os.environ`` if no value is provided). 

1173 :param mapping: An optional mapping of variable names to 

1174 environment variable names. Use this if you want to 

1175 change the mapping of access_key->AWS_ACCESS_KEY_ID, etc. 

1176 The dict can have up to 5 keys: 

1177 * ``access_key`` 

1178 * ``secret_key`` 

1179 * ``token`` 

1180 * ``expiry_time`` 

1181 * ``account_id`` 

1182 """ 

1183 if environ is None: 

1184 environ = os.environ 

1185 self.environ = environ 

1186 self._mapping = self._build_mapping(mapping) 

1187 

1188 def _build_mapping(self, mapping): 

1189 # Mapping of variable name to env var name. 

1190 var_mapping = {} 

1191 if mapping is None: 

1192 # Use the class var default. 

1193 var_mapping['access_key'] = self.ACCESS_KEY 

1194 var_mapping['secret_key'] = self.SECRET_KEY 

1195 var_mapping['token'] = self.TOKENS 

1196 var_mapping['expiry_time'] = self.EXPIRY_TIME 

1197 var_mapping['account_id'] = self.ACCOUNT_ID 

1198 else: 

1199 var_mapping['access_key'] = mapping.get( 

1200 'access_key', self.ACCESS_KEY 

1201 ) 

1202 var_mapping['secret_key'] = mapping.get( 

1203 'secret_key', self.SECRET_KEY 

1204 ) 

1205 var_mapping['token'] = mapping.get('token', self.TOKENS) 

1206 if not isinstance(var_mapping['token'], list): 

1207 var_mapping['token'] = [var_mapping['token']] 

1208 var_mapping['expiry_time'] = mapping.get( 

1209 'expiry_time', self.EXPIRY_TIME 

1210 ) 

1211 var_mapping['account_id'] = mapping.get( 

1212 'account_id', self.ACCOUNT_ID 

1213 ) 

1214 return var_mapping 

1215 

1216 def load(self): 

1217 """ 

1218 Search for credentials in explicit environment variables. 

1219 """ 

1220 

1221 access_key = self.environ.get(self._mapping['access_key'], '') 

1222 

1223 if access_key: 

1224 logger.info('Found credentials in environment variables.') 

1225 fetcher = self._create_credentials_fetcher() 

1226 credentials = fetcher(require_expiry=False) 

1227 register_feature_id('CREDENTIALS_ENV_VARS') 

1228 

1229 expiry_time = credentials['expiry_time'] 

1230 if expiry_time is not None: 

1231 expiry_time = parse(expiry_time) 

1232 return RefreshableCredentials( 

1233 credentials['access_key'], 

1234 credentials['secret_key'], 

1235 credentials['token'], 

1236 expiry_time, 

1237 refresh_using=fetcher, 

1238 method=self.METHOD, 

1239 account_id=credentials['account_id'], 

1240 ) 

1241 

1242 return Credentials( 

1243 credentials['access_key'], 

1244 credentials['secret_key'], 

1245 credentials['token'], 

1246 method=self.METHOD, 

1247 account_id=credentials['account_id'], 

1248 ) 

1249 else: 

1250 return None 

1251 

1252 def _create_credentials_fetcher(self): 

1253 mapping = self._mapping 

1254 method = self.METHOD 

1255 environ = self.environ 

1256 

1257 def fetch_credentials(require_expiry=True): 

1258 credentials = {} 

1259 

1260 access_key = environ.get(mapping['access_key'], '') 

1261 if not access_key: 

1262 raise PartialCredentialsError( 

1263 provider=method, cred_var=mapping['access_key'] 

1264 ) 

1265 credentials['access_key'] = access_key 

1266 

1267 secret_key = environ.get(mapping['secret_key'], '') 

1268 if not secret_key: 

1269 raise PartialCredentialsError( 

1270 provider=method, cred_var=mapping['secret_key'] 

1271 ) 

1272 credentials['secret_key'] = secret_key 

1273 

1274 credentials['token'] = None 

1275 for token_env_var in mapping['token']: 

1276 token = environ.get(token_env_var, '') 

1277 if token: 

1278 credentials['token'] = token 

1279 break 

1280 

1281 credentials['expiry_time'] = None 

1282 expiry_time = environ.get(mapping['expiry_time'], '') 

1283 if expiry_time: 

1284 credentials['expiry_time'] = expiry_time 

1285 if require_expiry and not expiry_time: 

1286 raise PartialCredentialsError( 

1287 provider=method, cred_var=mapping['expiry_time'] 

1288 ) 

1289 

1290 credentials['account_id'] = None 

1291 account_id = environ.get(mapping['account_id'], '') 

1292 if account_id: 

1293 credentials['account_id'] = account_id 

1294 

1295 return credentials 

1296 

1297 return fetch_credentials 

1298 

1299 

1300class OriginalEC2Provider(CredentialProvider): 

1301 METHOD = 'ec2-credentials-file' 

1302 CANONICAL_NAME = 'Ec2Config' 

1303 

1304 CRED_FILE_ENV = 'AWS_CREDENTIAL_FILE' 

1305 ACCESS_KEY = 'AWSAccessKeyId' 

1306 SECRET_KEY = 'AWSSecretKey' 

1307 

1308 def __init__(self, environ=None, parser=None): 

1309 if environ is None: 

1310 environ = os.environ 

1311 if parser is None: 

1312 parser = parse_key_val_file 

1313 self._environ = environ 

1314 self._parser = parser 

1315 

1316 def load(self): 

1317 """ 

1318 Search for a credential file used by original EC2 CLI tools. 

1319 """ 

1320 if 'AWS_CREDENTIAL_FILE' in self._environ: 

1321 full_path = os.path.expanduser( 

1322 self._environ['AWS_CREDENTIAL_FILE'] 

1323 ) 

1324 creds = self._parser(full_path) 

1325 if self.ACCESS_KEY in creds: 

1326 logger.info('Found credentials in AWS_CREDENTIAL_FILE.') 

1327 access_key = creds[self.ACCESS_KEY] 

1328 secret_key = creds[self.SECRET_KEY] 

1329 # EC2 creds file doesn't support session tokens. 

1330 return Credentials(access_key, secret_key, method=self.METHOD) 

1331 else: 

1332 return None 

1333 

1334 

1335class SharedCredentialProvider(CredentialProvider): 

1336 METHOD = 'shared-credentials-file' 

1337 CANONICAL_NAME = 'SharedCredentials' 

1338 

1339 ACCESS_KEY = 'aws_access_key_id' 

1340 SECRET_KEY = 'aws_secret_access_key' 

1341 # Same deal as the EnvProvider above. Botocore originally supported 

1342 # aws_security_token, but the SDKs are standardizing on aws_session_token 

1343 # so we support both. 

1344 TOKENS = ['aws_security_token', 'aws_session_token'] 

1345 ACCOUNT_ID = 'aws_account_id' 

1346 

1347 def __init__(self, creds_filename, profile_name=None, ini_parser=None): 

1348 self._creds_filename = creds_filename 

1349 if profile_name is None: 

1350 profile_name = 'default' 

1351 self._profile_name = profile_name 

1352 if ini_parser is None: 

1353 ini_parser = botocore.configloader.raw_config_parse 

1354 self._ini_parser = ini_parser 

1355 

1356 def load(self): 

1357 try: 

1358 available_creds = self._ini_parser(self._creds_filename) 

1359 except ConfigNotFound: 

1360 return None 

1361 if self._profile_name in available_creds: 

1362 config = available_creds[self._profile_name] 

1363 if self.ACCESS_KEY in config: 

1364 logger.info( 

1365 "Found credentials in shared credentials file: %s", 

1366 self._creds_filename, 

1367 ) 

1368 access_key, secret_key = self._extract_creds_from_mapping( 

1369 config, self.ACCESS_KEY, self.SECRET_KEY 

1370 ) 

1371 token = self._get_session_token(config) 

1372 account_id = self._get_account_id(config) 

1373 register_feature_id('CREDENTIALS_PROFILE') 

1374 return Credentials( 

1375 access_key, 

1376 secret_key, 

1377 token, 

1378 method=self.METHOD, 

1379 account_id=account_id, 

1380 ) 

1381 

1382 def _get_session_token(self, config): 

1383 for token_envvar in self.TOKENS: 

1384 if token_envvar in config: 

1385 return config[token_envvar] 

1386 

1387 def _get_account_id(self, config): 

1388 return config.get(self.ACCOUNT_ID) 

1389 

1390 

1391class ConfigProvider(CredentialProvider): 

1392 """INI based config provider with profile sections.""" 

1393 

1394 METHOD = 'config-file' 

1395 CANONICAL_NAME = 'SharedConfig' 

1396 

1397 ACCESS_KEY = 'aws_access_key_id' 

1398 SECRET_KEY = 'aws_secret_access_key' 

1399 # Same deal as the EnvProvider above. Botocore originally supported 

1400 # aws_security_token, but the SDKs are standardizing on aws_session_token 

1401 # so we support both. 

1402 TOKENS = ['aws_security_token', 'aws_session_token'] 

1403 ACCOUNT_ID = 'aws_account_id' 

1404 

1405 def __init__(self, config_filename, profile_name, config_parser=None): 

1406 """ 

1407 

1408 :param config_filename: The session configuration scoped to the current 

1409 profile. This is available via ``session.config``. 

1410 :param profile_name: The name of the current profile. 

1411 :param config_parser: A config parser callable. 

1412 

1413 """ 

1414 self._config_filename = config_filename 

1415 self._profile_name = profile_name 

1416 if config_parser is None: 

1417 config_parser = botocore.configloader.load_config 

1418 self._config_parser = config_parser 

1419 

1420 def load(self): 

1421 """ 

1422 If there is are credentials in the configuration associated with 

1423 the session, use those. 

1424 """ 

1425 try: 

1426 full_config = self._config_parser(self._config_filename) 

1427 except ConfigNotFound: 

1428 return None 

1429 if self._profile_name in full_config['profiles']: 

1430 profile_config = full_config['profiles'][self._profile_name] 

1431 if self.ACCESS_KEY in profile_config: 

1432 logger.info( 

1433 "Credentials found in config file: %s", 

1434 self._config_filename, 

1435 ) 

1436 access_key, secret_key = self._extract_creds_from_mapping( 

1437 profile_config, self.ACCESS_KEY, self.SECRET_KEY 

1438 ) 

1439 token = self._get_session_token(profile_config) 

1440 account_id = self._get_account_id(profile_config) 

1441 register_feature_id('CREDENTIALS_PROFILE') 

1442 return Credentials( 

1443 access_key, 

1444 secret_key, 

1445 token, 

1446 method=self.METHOD, 

1447 account_id=account_id, 

1448 ) 

1449 else: 

1450 return None 

1451 

1452 def _get_session_token(self, profile_config): 

1453 for token_name in self.TOKENS: 

1454 if token_name in profile_config: 

1455 return profile_config[token_name] 

1456 

1457 def _get_account_id(self, config): 

1458 return config.get(self.ACCOUNT_ID) 

1459 

1460 

1461class BotoProvider(CredentialProvider): 

1462 METHOD = 'boto-config' 

1463 CANONICAL_NAME = 'Boto2Config' 

1464 

1465 BOTO_CONFIG_ENV = 'BOTO_CONFIG' 

1466 DEFAULT_CONFIG_FILENAMES = ['/etc/boto.cfg', '~/.boto'] 

1467 ACCESS_KEY = 'aws_access_key_id' 

1468 SECRET_KEY = 'aws_secret_access_key' 

1469 

1470 def __init__(self, environ=None, ini_parser=None): 

1471 if environ is None: 

1472 environ = os.environ 

1473 if ini_parser is None: 

1474 ini_parser = botocore.configloader.raw_config_parse 

1475 self._environ = environ 

1476 self._ini_parser = ini_parser 

1477 

1478 def load(self): 

1479 """ 

1480 Look for credentials in boto config file. 

1481 """ 

1482 if self.BOTO_CONFIG_ENV in self._environ: 

1483 potential_locations = [self._environ[self.BOTO_CONFIG_ENV]] 

1484 else: 

1485 potential_locations = self.DEFAULT_CONFIG_FILENAMES 

1486 for filename in potential_locations: 

1487 try: 

1488 config = self._ini_parser(filename) 

1489 except ConfigNotFound: 

1490 # Move on to the next potential config file name. 

1491 continue 

1492 if 'Credentials' in config: 

1493 credentials = config['Credentials'] 

1494 if self.ACCESS_KEY in credentials: 

1495 logger.info( 

1496 "Found credentials in boto config file: %s", filename 

1497 ) 

1498 access_key, secret_key = self._extract_creds_from_mapping( 

1499 credentials, self.ACCESS_KEY, self.SECRET_KEY 

1500 ) 

1501 register_feature_id('CREDENTIALS_BOTO2_CONFIG_FILE') 

1502 return Credentials( 

1503 access_key, secret_key, method=self.METHOD 

1504 ) 

1505 

1506 

1507class AssumeRoleProvider(CredentialProvider): 

1508 METHOD = 'assume-role' 

1509 # The AssumeRole provider is logically part of the SharedConfig and 

1510 # SharedCredentials providers. Since the purpose of the canonical name 

1511 # is to provide cross-sdk compatibility, calling code will need to be 

1512 # aware that either of those providers should be tied to the AssumeRole 

1513 # provider as much as possible. 

1514 CANONICAL_NAME = None 

1515 ROLE_CONFIG_VAR = 'role_arn' 

1516 WEB_IDENTITY_TOKE_FILE_VAR = 'web_identity_token_file' 

1517 # Credentials are considered expired (and will be refreshed) once the total 

1518 # remaining time left until the credentials expires is less than the 

1519 # EXPIRY_WINDOW. 

1520 EXPIRY_WINDOW_SECONDS = 60 * 15 

1521 NAMED_PROVIDER_FEATURE_MAP = { 

1522 'Ec2InstanceMetadata': 'CREDENTIALS_IMDS', 

1523 'Environment': 'CREDENTIALS_ENV_VARS', 

1524 'EcsContainer': 'CREDENTIALS_HTTP', 

1525 } 

1526 

1527 def __init__( 

1528 self, 

1529 load_config, 

1530 client_creator, 

1531 cache, 

1532 profile_name, 

1533 prompter=getpass.getpass, 

1534 credential_sourcer=None, 

1535 profile_provider_builder=None, 

1536 ): 

1537 """ 

1538 :type load_config: callable 

1539 :param load_config: A function that accepts no arguments, and 

1540 when called, will return the full configuration dictionary 

1541 for the session (``session.full_config``). 

1542 

1543 :type client_creator: callable 

1544 :param client_creator: A factory function that will create 

1545 a client when called. Has the same interface as 

1546 ``botocore.session.Session.create_client``. 

1547 

1548 :type cache: dict 

1549 :param cache: An object that supports ``__getitem__``, 

1550 ``__setitem__``, and ``__contains__``. An example 

1551 of this is the ``JSONFileCache`` class in the CLI. 

1552 

1553 :type profile_name: str 

1554 :param profile_name: The name of the profile. 

1555 

1556 :type prompter: callable 

1557 :param prompter: A callable that returns input provided 

1558 by the user (i.e raw_input, getpass.getpass, etc.). 

1559 

1560 :type credential_sourcer: CanonicalNameCredentialSourcer 

1561 :param credential_sourcer: A credential provider that takes a 

1562 configuration, which is used to provide the source credentials 

1563 for the STS call. 

1564 """ 

1565 #: The cache used to first check for assumed credentials. 

1566 #: This is checked before making the AssumeRole API 

1567 #: calls and can be useful if you have short lived 

1568 #: scripts and you'd like to avoid calling AssumeRole 

1569 #: until the credentials are expired. 

1570 self.cache = cache 

1571 self._load_config = load_config 

1572 # client_creator is a callable that creates function. 

1573 # It's basically session.create_client 

1574 self._client_creator = client_creator 

1575 self._profile_name = profile_name 

1576 self._prompter = prompter 

1577 # The _loaded_config attribute will be populated from the 

1578 # load_config() function once the configuration is actually 

1579 # loaded. The reason we go through all this instead of just 

1580 # requiring that the loaded_config be passed to us is to that 

1581 # we can defer configuration loaded until we actually try 

1582 # to load credentials (as opposed to when the object is 

1583 # instantiated). 

1584 self._loaded_config = {} 

1585 self._credential_sourcer = credential_sourcer 

1586 self._profile_provider_builder = profile_provider_builder 

1587 self._visited_profiles = [self._profile_name] 

1588 self._feature_ids = set() 

1589 

1590 def load(self): 

1591 self._loaded_config = self._load_config() 

1592 profiles = self._loaded_config.get('profiles', {}) 

1593 profile = profiles.get(self._profile_name, {}) 

1594 if self._has_assume_role_config_vars(profile): 

1595 return self._load_creds_via_assume_role(self._profile_name) 

1596 

1597 def _has_assume_role_config_vars(self, profile): 

1598 return ( 

1599 self.ROLE_CONFIG_VAR in profile 

1600 and 

1601 # We need to ensure this provider doesn't look at a profile when 

1602 # the profile has configuration for web identity. Simply relying on 

1603 # the order in the credential chain is insufficient as it doesn't 

1604 # prevent the case when we're doing an assume role chain. 

1605 self.WEB_IDENTITY_TOKE_FILE_VAR not in profile 

1606 ) 

1607 

1608 def _load_creds_via_assume_role(self, profile_name): 

1609 role_config = self._get_role_config(profile_name) 

1610 source_credentials = self._resolve_source_credentials( 

1611 role_config, profile_name 

1612 ) 

1613 

1614 extra_args = {} 

1615 role_session_name = role_config.get('role_session_name') 

1616 if role_session_name is not None: 

1617 extra_args['RoleSessionName'] = role_session_name 

1618 

1619 external_id = role_config.get('external_id') 

1620 if external_id is not None: 

1621 extra_args['ExternalId'] = external_id 

1622 

1623 mfa_serial = role_config.get('mfa_serial') 

1624 if mfa_serial is not None: 

1625 extra_args['SerialNumber'] = mfa_serial 

1626 

1627 duration_seconds = role_config.get('duration_seconds') 

1628 if duration_seconds is not None: 

1629 extra_args['DurationSeconds'] = duration_seconds 

1630 

1631 fetcher = AssumeRoleCredentialFetcher( 

1632 client_creator=self._client_creator, 

1633 source_credentials=source_credentials, 

1634 role_arn=role_config['role_arn'], 

1635 extra_args=extra_args, 

1636 mfa_prompter=self._prompter, 

1637 cache=self.cache, 

1638 ) 

1639 fetcher.feature_ids = self._feature_ids.copy() 

1640 refresher = fetcher.fetch_credentials 

1641 if mfa_serial is not None: 

1642 refresher = create_mfa_serial_refresher(refresher) 

1643 

1644 self._feature_ids.add('CREDENTIALS_STS_ASSUME_ROLE') 

1645 register_feature_ids(self._feature_ids) 

1646 # The initial credentials are empty and the expiration time is set 

1647 # to now so that we can delay the call to assume role until it is 

1648 # strictly needed. 

1649 return DeferredRefreshableCredentials( 

1650 method=self.METHOD, 

1651 refresh_using=refresher, 

1652 time_fetcher=_local_now, 

1653 ) 

1654 

1655 def _get_role_config(self, profile_name): 

1656 """Retrieves and validates the role configuration for the profile.""" 

1657 profiles = self._loaded_config.get('profiles', {}) 

1658 

1659 profile = profiles[profile_name] 

1660 source_profile = profile.get('source_profile') 

1661 role_arn = profile['role_arn'] 

1662 credential_source = profile.get('credential_source') 

1663 mfa_serial = profile.get('mfa_serial') 

1664 external_id = profile.get('external_id') 

1665 role_session_name = profile.get('role_session_name') 

1666 duration_seconds = profile.get('duration_seconds') 

1667 

1668 role_config = { 

1669 'role_arn': role_arn, 

1670 'external_id': external_id, 

1671 'mfa_serial': mfa_serial, 

1672 'role_session_name': role_session_name, 

1673 'source_profile': source_profile, 

1674 'credential_source': credential_source, 

1675 } 

1676 

1677 if duration_seconds is not None: 

1678 try: 

1679 role_config['duration_seconds'] = int(duration_seconds) 

1680 except ValueError: 

1681 pass 

1682 

1683 # Either the credential source or the source profile must be 

1684 # specified, but not both. 

1685 if credential_source is not None and source_profile is not None: 

1686 raise InvalidConfigError( 

1687 error_msg=( 

1688 f'The profile "{profile_name}" contains both ' 

1689 'source_profile and credential_source.' 

1690 ) 

1691 ) 

1692 elif credential_source is None and source_profile is None: 

1693 raise PartialCredentialsError( 

1694 provider=self.METHOD, 

1695 cred_var='source_profile or credential_source', 

1696 ) 

1697 elif credential_source is not None: 

1698 self._validate_credential_source(profile_name, credential_source) 

1699 else: 

1700 self._validate_source_profile(profile_name, source_profile) 

1701 

1702 return role_config 

1703 

1704 def _validate_credential_source(self, parent_profile, credential_source): 

1705 if self._credential_sourcer is None: 

1706 raise InvalidConfigError( 

1707 error_msg=( 

1708 f"The credential_source \"{credential_source}\" is specified " 

1709 f"in profile \"{parent_profile}\", " 

1710 f"but no source provider was configured." 

1711 ) 

1712 ) 

1713 if not self._credential_sourcer.is_supported(credential_source): 

1714 raise InvalidConfigError( 

1715 error_msg=( 

1716 f"The credential source \"{credential_source}\" referenced " 

1717 f"in profile \"{parent_profile}\" is not valid." 

1718 ) 

1719 ) 

1720 

1721 def _source_profile_has_credentials(self, profile): 

1722 return any( 

1723 [ 

1724 self._has_static_credentials(profile), 

1725 self._has_assume_role_config_vars(profile), 

1726 ] 

1727 ) 

1728 

1729 def _validate_source_profile( 

1730 self, parent_profile_name, source_profile_name 

1731 ): 

1732 profiles = self._loaded_config.get('profiles', {}) 

1733 if source_profile_name not in profiles: 

1734 raise InvalidConfigError( 

1735 error_msg=( 

1736 f"The source_profile \"{source_profile_name}\" referenced in " 

1737 f"the profile \"{parent_profile_name}\" does not exist." 

1738 ) 

1739 ) 

1740 

1741 source_profile = profiles[source_profile_name] 

1742 

1743 # Make sure we aren't going into an infinite loop. If we haven't 

1744 # visited the profile yet, we're good. 

1745 if source_profile_name not in self._visited_profiles: 

1746 return 

1747 

1748 # If we have visited the profile and the profile isn't simply 

1749 # referencing itself, that's an infinite loop. 

1750 if source_profile_name != parent_profile_name: 

1751 raise InfiniteLoopConfigError( 

1752 source_profile=source_profile_name, 

1753 visited_profiles=self._visited_profiles, 

1754 ) 

1755 

1756 # A profile is allowed to reference itself so that it can source 

1757 # static credentials and have configuration all in the same 

1758 # profile. This will only ever work for the top level assume 

1759 # role because the static credentials will otherwise take 

1760 # precedence. 

1761 if not self._has_static_credentials(source_profile): 

1762 raise InfiniteLoopConfigError( 

1763 source_profile=source_profile_name, 

1764 visited_profiles=self._visited_profiles, 

1765 ) 

1766 

1767 def _has_static_credentials(self, profile): 

1768 static_keys = ['aws_secret_access_key', 'aws_access_key_id'] 

1769 return any(static_key in profile for static_key in static_keys) 

1770 

1771 def _resolve_source_credentials(self, role_config, profile_name): 

1772 credential_source = role_config.get('credential_source') 

1773 if credential_source is not None: 

1774 self._feature_ids.add('CREDENTIALS_PROFILE_NAMED_PROVIDER') 

1775 return self._resolve_credentials_from_source( 

1776 credential_source, profile_name 

1777 ) 

1778 

1779 source_profile = role_config['source_profile'] 

1780 self._visited_profiles.append(source_profile) 

1781 self._feature_ids.add('CREDENTIALS_PROFILE_SOURCE_PROFILE') 

1782 return self._resolve_credentials_from_profile(source_profile) 

1783 

1784 def _resolve_credentials_from_profile(self, profile_name): 

1785 profiles = self._loaded_config.get('profiles', {}) 

1786 profile = profiles[profile_name] 

1787 self._feature_ids.add('CREDENTIALS_PROFILE') 

1788 if ( 

1789 self._has_static_credentials(profile) 

1790 and not self._profile_provider_builder 

1791 ): 

1792 # This is only here for backwards compatibility. If this provider 

1793 # isn't given a profile provider builder we still want to be able 

1794 # to handle the basic static credential case as we would before the 

1795 # profile provider builder parameter was added. 

1796 return self._resolve_static_credentials_from_profile(profile) 

1797 elif self._has_static_credentials( 

1798 profile 

1799 ) or not self._has_assume_role_config_vars(profile): 

1800 profile_providers = self._profile_provider_builder.providers( 

1801 profile_name=profile_name, 

1802 disable_env_vars=True, 

1803 ) 

1804 profile_chain = CredentialResolver(profile_providers) 

1805 credentials = profile_chain.load_credentials() 

1806 if credentials is None: 

1807 error_message = ( 

1808 'The source profile "%s" must have credentials.' 

1809 ) 

1810 raise InvalidConfigError( 

1811 error_msg=error_message % profile_name, 

1812 ) 

1813 return credentials 

1814 

1815 return self._load_creds_via_assume_role(profile_name) 

1816 

1817 def _resolve_static_credentials_from_profile(self, profile): 

1818 try: 

1819 return Credentials( 

1820 access_key=profile['aws_access_key_id'], 

1821 secret_key=profile['aws_secret_access_key'], 

1822 token=profile.get('aws_session_token'), 

1823 ) 

1824 except KeyError as e: 

1825 raise PartialCredentialsError( 

1826 provider=self.METHOD, cred_var=str(e) 

1827 ) 

1828 

1829 def _resolve_credentials_from_source( 

1830 self, credential_source, profile_name 

1831 ): 

1832 credentials = self._credential_sourcer.source_credentials( 

1833 credential_source 

1834 ) 

1835 if credentials is None: 

1836 raise CredentialRetrievalError( 

1837 provider=credential_source, 

1838 error_msg=( 

1839 'No credentials found in credential_source referenced ' 

1840 f'in profile {profile_name}' 

1841 ), 

1842 ) 

1843 named_provider_feature_id = self.NAMED_PROVIDER_FEATURE_MAP.get( 

1844 credential_source 

1845 ) 

1846 if named_provider_feature_id: 

1847 self._feature_ids.add(named_provider_feature_id) 

1848 return credentials 

1849 

1850 

1851class AssumeRoleWithWebIdentityProvider(CredentialProvider): 

1852 METHOD = 'assume-role-with-web-identity' 

1853 CANONICAL_NAME = None 

1854 _CONFIG_TO_ENV_VAR = { 

1855 'web_identity_token_file': 'AWS_WEB_IDENTITY_TOKEN_FILE', 

1856 'role_session_name': 'AWS_ROLE_SESSION_NAME', 

1857 'role_arn': 'AWS_ROLE_ARN', 

1858 } 

1859 

1860 def __init__( 

1861 self, 

1862 load_config, 

1863 client_creator, 

1864 profile_name, 

1865 cache=None, 

1866 disable_env_vars=False, 

1867 token_loader_cls=None, 

1868 ): 

1869 self.cache = cache 

1870 self._load_config = load_config 

1871 self._client_creator = client_creator 

1872 self._profile_name = profile_name 

1873 self._profile_config = None 

1874 self._disable_env_vars = disable_env_vars 

1875 if token_loader_cls is None: 

1876 token_loader_cls = FileWebIdentityTokenLoader 

1877 self._token_loader_cls = token_loader_cls 

1878 self._feature_ids = set() 

1879 

1880 def load(self): 

1881 return self._assume_role_with_web_identity() 

1882 

1883 def _get_profile_config(self, key): 

1884 if self._profile_config is None: 

1885 loaded_config = self._load_config() 

1886 profiles = loaded_config.get('profiles', {}) 

1887 self._profile_config = profiles.get(self._profile_name, {}) 

1888 return self._profile_config.get(key) 

1889 

1890 def _get_env_config(self, key): 

1891 if self._disable_env_vars: 

1892 return None 

1893 env_key = self._CONFIG_TO_ENV_VAR.get(key) 

1894 if env_key and env_key in os.environ: 

1895 return os.environ[env_key] 

1896 return None 

1897 

1898 def _get_config(self, key): 

1899 env_value = self._get_env_config(key) 

1900 if env_value is not None: 

1901 self._feature_ids.add('CREDENTIALS_ENV_VARS_STS_WEB_ID_TOKEN') 

1902 return env_value 

1903 

1904 config_value = self._get_profile_config(key) 

1905 if config_value is not None: 

1906 self._feature_ids.add('CREDENTIALS_PROFILE_STS_WEB_ID_TOKEN') 

1907 return config_value 

1908 

1909 return None 

1910 

1911 def _assume_role_with_web_identity(self): 

1912 token_path = self._get_config('web_identity_token_file') 

1913 if not token_path: 

1914 return None 

1915 token_loader = self._token_loader_cls(token_path) 

1916 

1917 role_arn = self._get_config('role_arn') 

1918 if not role_arn: 

1919 error_msg = ( 

1920 'The provided profile or the current environment is ' 

1921 'configured to assume role with web identity but has no ' 

1922 'role ARN configured. Ensure that the profile has the role_arn' 

1923 'configuration set or the AWS_ROLE_ARN env var is set.' 

1924 ) 

1925 raise InvalidConfigError(error_msg=error_msg) 

1926 

1927 extra_args = {} 

1928 role_session_name = self._get_config('role_session_name') 

1929 if role_session_name is not None: 

1930 extra_args['RoleSessionName'] = role_session_name 

1931 

1932 fetcher = AssumeRoleWithWebIdentityCredentialFetcher( 

1933 client_creator=self._client_creator, 

1934 web_identity_token_loader=token_loader, 

1935 role_arn=role_arn, 

1936 extra_args=extra_args, 

1937 cache=self.cache, 

1938 ) 

1939 fetcher.feature_ids = self._feature_ids.copy() 

1940 

1941 self._feature_ids.add('CREDENTIALS_STS_ASSUME_ROLE_WEB_ID') 

1942 register_feature_ids(self._feature_ids) 

1943 # The initial credentials are empty and the expiration time is set 

1944 # to now so that we can delay the call to assume role until it is 

1945 # strictly needed. 

1946 return DeferredRefreshableCredentials( 

1947 method=self.METHOD, 

1948 refresh_using=fetcher.fetch_credentials, 

1949 ) 

1950 

1951 

1952class CanonicalNameCredentialSourcer: 

1953 def __init__(self, providers): 

1954 self._providers = providers 

1955 

1956 def is_supported(self, source_name): 

1957 """Validates a given source name. 

1958 

1959 :type source_name: str 

1960 :param source_name: The value of credential_source in the config 

1961 file. This is the canonical name of the credential provider. 

1962 

1963 :rtype: bool 

1964 :returns: True if the credential provider is supported, 

1965 False otherwise. 

1966 """ 

1967 return source_name in [p.CANONICAL_NAME for p in self._providers] 

1968 

1969 def source_credentials(self, source_name): 

1970 """Loads source credentials based on the provided configuration. 

1971 

1972 :type source_name: str 

1973 :param source_name: The value of credential_source in the config 

1974 file. This is the canonical name of the credential provider. 

1975 

1976 :rtype: Credentials 

1977 """ 

1978 source = self._get_provider(source_name) 

1979 if isinstance(source, CredentialResolver): 

1980 return source.load_credentials() 

1981 return source.load() 

1982 

1983 def _get_provider(self, canonical_name): 

1984 """Return a credential provider by its canonical name. 

1985 

1986 :type canonical_name: str 

1987 :param canonical_name: The canonical name of the provider. 

1988 

1989 :raises UnknownCredentialError: Raised if no 

1990 credential provider by the provided name 

1991 is found. 

1992 """ 

1993 provider = self._get_provider_by_canonical_name(canonical_name) 

1994 

1995 # The AssumeRole provider should really be part of the SharedConfig 

1996 # provider rather than being its own thing, but it is not. It is 

1997 # effectively part of both the SharedConfig provider and the 

1998 # SharedCredentials provider now due to the way it behaves. 

1999 # Therefore if we want either of those providers we should return 

2000 # the AssumeRole provider with it. 

2001 if canonical_name.lower() in ['sharedconfig', 'sharedcredentials']: 

2002 assume_role_provider = self._get_provider_by_method('assume-role') 

2003 if assume_role_provider is not None: 

2004 # The SharedConfig or SharedCredentials provider may not be 

2005 # present if it was removed for some reason, but the 

2006 # AssumeRole provider could still be present. In that case, 

2007 # return the assume role provider by itself. 

2008 if provider is None: 

2009 return assume_role_provider 

2010 

2011 # If both are present, return them both as a 

2012 # CredentialResolver so that calling code can treat them as 

2013 # a single entity. 

2014 return CredentialResolver([assume_role_provider, provider]) 

2015 

2016 if provider is None: 

2017 raise UnknownCredentialError(name=canonical_name) 

2018 

2019 return provider 

2020 

2021 def _get_provider_by_canonical_name(self, canonical_name): 

2022 """Return a credential provider by its canonical name. 

2023 

2024 This function is strict, it does not attempt to address 

2025 compatibility issues. 

2026 """ 

2027 for provider in self._providers: 

2028 name = provider.CANONICAL_NAME 

2029 # Canonical names are case-insensitive 

2030 if name and name.lower() == canonical_name.lower(): 

2031 return provider 

2032 

2033 def _get_provider_by_method(self, method): 

2034 """Return a credential provider by its METHOD name.""" 

2035 for provider in self._providers: 

2036 if provider.METHOD == method: 

2037 return provider 

2038 

2039 

2040class ContainerProvider(CredentialProvider): 

2041 METHOD = 'container-role' 

2042 CANONICAL_NAME = 'EcsContainer' 

2043 ENV_VAR = 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI' 

2044 ENV_VAR_FULL = 'AWS_CONTAINER_CREDENTIALS_FULL_URI' 

2045 ENV_VAR_AUTH_TOKEN = 'AWS_CONTAINER_AUTHORIZATION_TOKEN' 

2046 ENV_VAR_AUTH_TOKEN_FILE = 'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE' 

2047 

2048 def __init__(self, environ=None, fetcher=None): 

2049 if environ is None: 

2050 environ = os.environ 

2051 if fetcher is None: 

2052 fetcher = ContainerMetadataFetcher() 

2053 self._environ = environ 

2054 self._fetcher = fetcher 

2055 

2056 def load(self): 

2057 # This cred provider is only triggered if the self.ENV_VAR is set, 

2058 # which only happens if you opt into this feature. 

2059 if self.ENV_VAR in self._environ or self.ENV_VAR_FULL in self._environ: 

2060 return self._retrieve_or_fail() 

2061 

2062 def _retrieve_or_fail(self): 

2063 if self._provided_relative_uri(): 

2064 full_uri = self._fetcher.full_url(self._environ[self.ENV_VAR]) 

2065 else: 

2066 full_uri = self._environ[self.ENV_VAR_FULL] 

2067 fetcher = self._create_fetcher(full_uri) 

2068 creds = fetcher() 

2069 return RefreshableCredentials( 

2070 access_key=creds['access_key'], 

2071 secret_key=creds['secret_key'], 

2072 token=creds['token'], 

2073 method=self.METHOD, 

2074 expiry_time=_parse_if_needed(creds['expiry_time']), 

2075 refresh_using=fetcher, 

2076 account_id=creds.get('account_id'), 

2077 ) 

2078 

2079 def _build_headers(self): 

2080 auth_token = None 

2081 if self.ENV_VAR_AUTH_TOKEN_FILE in self._environ: 

2082 auth_token_file_path = self._environ[self.ENV_VAR_AUTH_TOKEN_FILE] 

2083 with open(auth_token_file_path) as token_file: 

2084 auth_token = token_file.read() 

2085 elif self.ENV_VAR_AUTH_TOKEN in self._environ: 

2086 auth_token = self._environ[self.ENV_VAR_AUTH_TOKEN] 

2087 if auth_token is not None: 

2088 self._validate_auth_token(auth_token) 

2089 return {'Authorization': auth_token} 

2090 

2091 def _validate_auth_token(self, auth_token): 

2092 if "\r" in auth_token or "\n" in auth_token: 

2093 raise ValueError("Auth token value is not a legal header value") 

2094 

2095 def _create_fetcher(self, full_uri, *args, **kwargs): 

2096 def fetch_creds(): 

2097 try: 

2098 headers = self._build_headers() 

2099 response = self._fetcher.retrieve_full_uri( 

2100 full_uri, headers=headers 

2101 ) 

2102 register_feature_id('CREDENTIALS_HTTP') 

2103 except MetadataRetrievalError as e: 

2104 logger.debug( 

2105 "Error retrieving container metadata: %s", e, exc_info=True 

2106 ) 

2107 raise CredentialRetrievalError( 

2108 provider=self.METHOD, error_msg=str(e) 

2109 ) 

2110 return { 

2111 'access_key': response['AccessKeyId'], 

2112 'secret_key': response['SecretAccessKey'], 

2113 'token': response['Token'], 

2114 'expiry_time': response['Expiration'], 

2115 'account_id': response.get('AccountId'), 

2116 } 

2117 

2118 return fetch_creds 

2119 

2120 def _provided_relative_uri(self): 

2121 return self.ENV_VAR in self._environ 

2122 

2123 

2124class CredentialResolver: 

2125 def __init__(self, providers): 

2126 """ 

2127 

2128 :param providers: A list of ``CredentialProvider`` instances. 

2129 

2130 """ 

2131 self.providers = providers 

2132 

2133 def insert_before(self, name, credential_provider): 

2134 """ 

2135 Inserts a new instance of ``CredentialProvider`` into the chain that 

2136 will be tried before an existing one. 

2137 

2138 :param name: The short name of the credentials you'd like to insert the 

2139 new credentials before. (ex. ``env`` or ``config``). Existing names 

2140 & ordering can be discovered via ``self.available_methods``. 

2141 :type name: string 

2142 

2143 :param cred_instance: An instance of the new ``Credentials`` object 

2144 you'd like to add to the chain. 

2145 :type cred_instance: A subclass of ``Credentials`` 

2146 """ 

2147 try: 

2148 offset = [p.METHOD for p in self.providers].index(name) 

2149 except ValueError: 

2150 raise UnknownCredentialError(name=name) 

2151 self.providers.insert(offset, credential_provider) 

2152 

2153 def insert_after(self, name, credential_provider): 

2154 """ 

2155 Inserts a new type of ``Credentials`` instance into the chain that will 

2156 be tried after an existing one. 

2157 

2158 :param name: The short name of the credentials you'd like to insert the 

2159 new credentials after. (ex. ``env`` or ``config``). Existing names 

2160 & ordering can be discovered via ``self.available_methods``. 

2161 :type name: string 

2162 

2163 :param cred_instance: An instance of the new ``Credentials`` object 

2164 you'd like to add to the chain. 

2165 :type cred_instance: A subclass of ``Credentials`` 

2166 """ 

2167 offset = self._get_provider_offset(name) 

2168 self.providers.insert(offset + 1, credential_provider) 

2169 

2170 def remove(self, name): 

2171 """ 

2172 Removes a given ``Credentials`` instance from the chain. 

2173 

2174 :param name: The short name of the credentials instance to remove. 

2175 :type name: string 

2176 """ 

2177 available_methods = [p.METHOD for p in self.providers] 

2178 if name not in available_methods: 

2179 # It's not present. Fail silently. 

2180 return 

2181 

2182 offset = available_methods.index(name) 

2183 self.providers.pop(offset) 

2184 

2185 def get_provider(self, name): 

2186 """Return a credential provider by name. 

2187 

2188 :type name: str 

2189 :param name: The name of the provider. 

2190 

2191 :raises UnknownCredentialError: Raised if no 

2192 credential provider by the provided name 

2193 is found. 

2194 """ 

2195 return self.providers[self._get_provider_offset(name)] 

2196 

2197 def _get_provider_offset(self, name): 

2198 try: 

2199 return [p.METHOD for p in self.providers].index(name) 

2200 except ValueError: 

2201 raise UnknownCredentialError(name=name) 

2202 

2203 def load_credentials(self): 

2204 """ 

2205 Goes through the credentials chain, returning the first ``Credentials`` 

2206 that could be loaded. 

2207 """ 

2208 # First provider to return a non-None response wins. 

2209 for provider in self.providers: 

2210 logger.debug("Looking for credentials via: %s", provider.METHOD) 

2211 creds = provider.load() 

2212 if creds is not None: 

2213 return creds 

2214 

2215 # If we got here, no credentials could be found. 

2216 # This feels like it should be an exception, but historically, ``None`` 

2217 # is returned. 

2218 # 

2219 # +1 

2220 # -js 

2221 return None 

2222 

2223 

2224class SSOCredentialFetcher(CachedCredentialFetcher): 

2225 _UTC_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' 

2226 

2227 def __init__( 

2228 self, 

2229 start_url, 

2230 sso_region, 

2231 role_name, 

2232 account_id, 

2233 client_creator, 

2234 token_loader=None, 

2235 cache=None, 

2236 expiry_window_seconds=None, 

2237 token_provider=None, 

2238 sso_session_name=None, 

2239 time_fetcher=_local_now, 

2240 ): 

2241 self._client_creator = client_creator 

2242 self._sso_region = sso_region 

2243 self._role_name = role_name 

2244 self._account_id = account_id 

2245 self._start_url = start_url 

2246 self._token_loader = token_loader 

2247 self._token_provider = token_provider 

2248 self._sso_session_name = sso_session_name 

2249 self._time_fetcher = time_fetcher 

2250 super().__init__(cache, expiry_window_seconds) 

2251 

2252 def _create_cache_key(self): 

2253 """Create a predictable cache key for the current configuration. 

2254 

2255 The cache key is intended to be compatible with file names. 

2256 """ 

2257 args = { 

2258 'roleName': self._role_name, 

2259 'accountId': self._account_id, 

2260 } 

2261 if self._sso_session_name: 

2262 args['sessionName'] = self._sso_session_name 

2263 else: 

2264 args['startUrl'] = self._start_url 

2265 # NOTE: It would be good to hoist this cache key construction logic 

2266 # into the CachedCredentialFetcher class as we should be consistent. 

2267 # Unfortunately, the current assume role fetchers that sub class don't 

2268 # pass separators resulting in non-minified JSON. In the long term, 

2269 # all fetchers should use the below caching scheme. 

2270 args = json.dumps(args, sort_keys=True, separators=(',', ':')) 

2271 argument_hash = sha1(args.encode('utf-8')).hexdigest() 

2272 return self._make_file_safe(argument_hash) 

2273 

2274 def _parse_timestamp(self, timestamp_ms): 

2275 # fromtimestamp expects seconds so: milliseconds / 1000 = seconds 

2276 timestamp_seconds = timestamp_ms / 1000.0 

2277 timestamp = datetime.datetime.fromtimestamp(timestamp_seconds, tzutc()) 

2278 return timestamp.strftime(self._UTC_DATE_FORMAT) 

2279 

2280 def _get_credentials(self): 

2281 """Get credentials by calling SSO get role credentials.""" 

2282 config = Config( 

2283 signature_version=UNSIGNED, 

2284 region_name=self._sso_region, 

2285 ) 

2286 client = self._client_creator('sso', config=config) 

2287 if self._token_provider: 

2288 initial_token_data = self._token_provider.load_token() 

2289 token = initial_token_data.get_frozen_token().token 

2290 else: 

2291 token_dict = self._token_loader(self._start_url) 

2292 token = token_dict['accessToken'] 

2293 

2294 # raise an UnauthorizedSSOTokenError if the loaded legacy token 

2295 # is expired to save a call to GetRoleCredentials with an 

2296 # expired token. 

2297 expiration = dateutil.parser.parse(token_dict['expiresAt']) 

2298 remaining = total_seconds(expiration - self._time_fetcher()) 

2299 if remaining <= 0: 

2300 raise UnauthorizedSSOTokenError() 

2301 

2302 kwargs = { 

2303 'roleName': self._role_name, 

2304 'accountId': self._account_id, 

2305 'accessToken': token, 

2306 } 

2307 try: 

2308 register_feature_ids(self.feature_ids) 

2309 response = client.get_role_credentials(**kwargs) 

2310 except client.exceptions.UnauthorizedException: 

2311 raise UnauthorizedSSOTokenError() 

2312 credentials = response['roleCredentials'] 

2313 

2314 credentials = { 

2315 'ProviderType': 'sso', 

2316 'Credentials': { 

2317 'AccessKeyId': credentials['accessKeyId'], 

2318 'SecretAccessKey': credentials['secretAccessKey'], 

2319 'SessionToken': credentials['sessionToken'], 

2320 'Expiration': self._parse_timestamp(credentials['expiration']), 

2321 'AccountId': self._account_id, 

2322 }, 

2323 } 

2324 return credentials 

2325 

2326 

2327class SSOProvider(CredentialProvider): 

2328 METHOD = 'sso' 

2329 

2330 _SSO_TOKEN_CACHE_DIR = os.path.expanduser( 

2331 os.path.join('~', '.aws', 'sso', 'cache') 

2332 ) 

2333 _PROFILE_REQUIRED_CONFIG_VARS = ( 

2334 'sso_role_name', 

2335 'sso_account_id', 

2336 ) 

2337 _SSO_REQUIRED_CONFIG_VARS = ( 

2338 'sso_start_url', 

2339 'sso_region', 

2340 ) 

2341 _ALL_REQUIRED_CONFIG_VARS = ( 

2342 _PROFILE_REQUIRED_CONFIG_VARS + _SSO_REQUIRED_CONFIG_VARS 

2343 ) 

2344 

2345 def __init__( 

2346 self, 

2347 load_config, 

2348 client_creator, 

2349 profile_name, 

2350 cache=None, 

2351 token_cache=None, 

2352 token_provider=None, 

2353 ): 

2354 if token_cache is None: 

2355 token_cache = JSONFileCache(self._SSO_TOKEN_CACHE_DIR) 

2356 self._token_cache = token_cache 

2357 self._token_provider = token_provider 

2358 if cache is None: 

2359 cache = {} 

2360 self.cache = cache 

2361 self._load_config = load_config 

2362 self._client_creator = client_creator 

2363 self._profile_name = profile_name 

2364 self._feature_ids = set() 

2365 

2366 def _load_sso_config(self): 

2367 loaded_config = self._load_config() 

2368 profiles = loaded_config.get('profiles', {}) 

2369 profile_name = self._profile_name 

2370 profile_config = profiles.get(self._profile_name, {}) 

2371 sso_sessions = loaded_config.get('sso_sessions', {}) 

2372 

2373 # Role name & Account ID indicate the cred provider should be used 

2374 if all( 

2375 c not in profile_config for c in self._PROFILE_REQUIRED_CONFIG_VARS 

2376 ): 

2377 return None 

2378 

2379 resolved_config, extra_reqs = self._resolve_sso_session_reference( 

2380 profile_config, sso_sessions 

2381 ) 

2382 

2383 config = {} 

2384 missing_config_vars = [] 

2385 all_required_configs = self._ALL_REQUIRED_CONFIG_VARS + extra_reqs 

2386 for config_var in all_required_configs: 

2387 if config_var in resolved_config: 

2388 config[config_var] = resolved_config[config_var] 

2389 else: 

2390 missing_config_vars.append(config_var) 

2391 

2392 if missing_config_vars: 

2393 missing = ', '.join(missing_config_vars) 

2394 raise InvalidConfigError( 

2395 error_msg=( 

2396 f'The profile "{profile_name}" is configured to use SSO ' 

2397 f'but is missing required configuration: {missing}' 

2398 ) 

2399 ) 

2400 return config 

2401 

2402 def _resolve_sso_session_reference(self, profile_config, sso_sessions): 

2403 sso_session_name = profile_config.get('sso_session') 

2404 if sso_session_name is None: 

2405 # No reference to resolve, proceed with legacy flow 

2406 return profile_config, () 

2407 

2408 if sso_session_name not in sso_sessions: 

2409 error_msg = f'The specified sso-session does not exist: "{sso_session_name}"' 

2410 raise InvalidConfigError(error_msg=error_msg) 

2411 

2412 config = profile_config.copy() 

2413 session = sso_sessions[sso_session_name] 

2414 for config_var, val in session.items(): 

2415 # Validate any keys referenced in both profile and sso_session match 

2416 if config.get(config_var, val) != val: 

2417 error_msg = ( 

2418 f"The value for {config_var} is inconsistent between " 

2419 f"profile ({config[config_var]}) and sso-session ({val})." 

2420 ) 

2421 raise InvalidConfigError(error_msg=error_msg) 

2422 config[config_var] = val 

2423 return config, ('sso_session',) 

2424 

2425 def load(self): 

2426 sso_config = self._load_sso_config() 

2427 if not sso_config: 

2428 return None 

2429 

2430 fetcher_kwargs = { 

2431 'start_url': sso_config['sso_start_url'], 

2432 'sso_region': sso_config['sso_region'], 

2433 'role_name': sso_config['sso_role_name'], 

2434 'account_id': sso_config['sso_account_id'], 

2435 'client_creator': self._client_creator, 

2436 'token_loader': SSOTokenLoader(cache=self._token_cache), 

2437 'cache': self.cache, 

2438 } 

2439 sso_session_in_config = 'sso_session' in sso_config 

2440 if sso_session_in_config: 

2441 fetcher_kwargs['sso_session_name'] = sso_config['sso_session'] 

2442 fetcher_kwargs['token_provider'] = self._token_provider 

2443 self._feature_ids.add('CREDENTIALS_PROFILE_SSO') 

2444 else: 

2445 self._feature_ids.add('CREDENTIALS_PROFILE_SSO_LEGACY') 

2446 

2447 sso_fetcher = SSOCredentialFetcher(**fetcher_kwargs) 

2448 sso_fetcher.feature_ids = self._feature_ids.copy() 

2449 

2450 if sso_session_in_config: 

2451 self._feature_ids.add('CREDENTIALS_SSO') 

2452 else: 

2453 self._feature_ids.add('CREDENTIALS_SSO_LEGACY') 

2454 

2455 register_feature_ids(self._feature_ids) 

2456 return DeferredRefreshableCredentials( 

2457 method=self.METHOD, 

2458 refresh_using=sso_fetcher.fetch_credentials, 

2459 )