Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/credentials.py: 25%
979 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"). You
5# may not use this file except in compliance with the License. A copy of
6# the License is located at
7#
8# http://aws.amazon.com/apache2.0/
9#
10# or in the "license" file accompanying this file. This file is
11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
12# ANY KIND, either express or implied. See the License for the specific
13# language governing permissions and limitations under the License.
14import datetime
15import getpass
16import json
17import logging
18import os
19import subprocess
20import threading
21import time
22from collections import namedtuple
23from copy import deepcopy
24from hashlib import sha1
26from dateutil.parser import parse
27from dateutil.tz import tzlocal, tzutc
29import botocore.compat
30import botocore.configloader
31from botocore import UNSIGNED
32from botocore.compat import compat_shell_split, total_seconds
33from botocore.config import Config
34from botocore.exceptions import (
35 ConfigNotFound,
36 CredentialRetrievalError,
37 InfiniteLoopConfigError,
38 InvalidConfigError,
39 MetadataRetrievalError,
40 PartialCredentialsError,
41 RefreshWithMFAUnsupportedError,
42 UnauthorizedSSOTokenError,
43 UnknownCredentialError,
44)
45from botocore.tokens import SSOTokenProvider
46from botocore.utils import (
47 ContainerMetadataFetcher,
48 FileWebIdentityTokenLoader,
49 InstanceMetadataFetcher,
50 JSONFileCache,
51 SSOTokenLoader,
52 parse_key_val_file,
53 resolve_imds_endpoint_mode,
54)
56logger = logging.getLogger(__name__)
57ReadOnlyCredentials = namedtuple(
58 'ReadOnlyCredentials', ['access_key', 'secret_key', 'token']
59)
61_DEFAULT_MANDATORY_REFRESH_TIMEOUT = 10 * 60 # 10 min
62_DEFAULT_ADVISORY_REFRESH_TIMEOUT = 15 * 60 # 15 min
def create_credential_resolver(session, cache=None, region_name=None):
    """Build the default credential resolver for ``session``.

    The returned resolver is pre-configured with the default credential
    lookup chain: environment variables, assume-role, the profile based
    providers, and finally the EC2/Boto/container/instance-metadata
    providers.

    :param session: The session whose configuration variables drive the
        providers that are created.
    :param cache: Optional mapping shared by the providers that cache
        credentials. A fresh ``dict`` is used when omitted.
    :param region_name: Optional region handed to the client creators used
        by the assume-role style providers.
    :return: A ``CredentialResolver`` wrapping the ordered provider list.
    """
    profile_name = session.get_config_variable('profile') or 'default'
    metadata_timeout = session.get_config_variable('metadata_service_timeout')
    num_attempts = session.get_config_variable('metadata_service_num_attempts')
    # A profile found in the session's instance variables is treated as
    # explicitly chosen, which disables the environment variable provider.
    disable_env_vars = session.instance_variables().get('profile') is not None

    endpoint = session.get_config_variable('ec2_metadata_service_endpoint')
    endpoint_mode = resolve_imds_endpoint_mode(session)
    v1_disabled = session.get_config_variable('ec2_metadata_v1_disabled')
    imds_config = {
        'ec2_metadata_service_endpoint': endpoint,
        'ec2_metadata_service_endpoint_mode': endpoint_mode,
        'ec2_credential_refresh_window': _DEFAULT_ADVISORY_REFRESH_TIMEOUT,
        'ec2_metadata_v1_disabled': v1_disabled,
    }

    cache = {} if cache is None else cache

    env_provider = EnvProvider()
    container_provider = ContainerProvider()
    metadata_fetcher = InstanceMetadataFetcher(
        timeout=metadata_timeout,
        num_attempts=num_attempts,
        user_agent=session.user_agent(),
        config=imds_config,
    )
    instance_metadata_provider = InstanceMetadataProvider(
        iam_role_fetcher=metadata_fetcher
    )

    profile_provider_builder = ProfileProviderBuilder(
        session, cache=cache, region_name=region_name
    )
    credential_sourcer = CanonicalNameCredentialSourcer(
        [env_provider, container_provider, instance_metadata_provider]
    )
    assume_role_provider = AssumeRoleProvider(
        load_config=lambda: session.full_config,
        client_creator=_get_client_creator(session, region_name),
        cache=cache,
        profile_name=profile_name,
        credential_sourcer=credential_sourcer,
        profile_provider_builder=profile_provider_builder,
    )

    profile_providers = profile_provider_builder.providers(
        profile_name=profile_name,
        disable_env_vars=disable_env_vars,
    )
    providers = (
        [env_provider, assume_role_provider]
        + profile_providers
        + [
            OriginalEC2Provider(),
            BotoProvider(),
            container_provider,
            instance_metadata_provider,
        ]
    )

    if disable_env_vars:
        # An explicitly provided profile negates the EnvProvider; we defer
        # to the providers that understand the "profile" concept.
        # The one edge case is when all three values come from env vars:
        #   export AWS_ACCESS_KEY_ID=foo
        #   export AWS_SECRET_ACCESS_KEY=bar
        #   export AWS_PROFILE=baz
        # In that case, just like client() calls, the explicit credentials
        # take precedence: the EnvProvider is left in the chain, so a
        # profile only wins when the EnvProvider returns no credentials.
        providers.remove(env_provider)
        logger.debug(
            'Skipping environment variable credential check'
            ' because profile name was explicitly set.'
        )

    return CredentialResolver(providers=providers)
class ProfileProviderBuilder:
    """Factory for the profile based credential providers.

    NOTE: This class is only intended for internal use.

    It creates, in a fixed order, the credential providers that primarily
    source their configuration from the shared config. Keeping this in one
    place lets the default credential chain and the source profile chain
    built by the assume role provider share the same construction logic.
    """

    def __init__(
        self, session, cache=None, region_name=None, sso_token_cache=None
    ):
        """Remember the collaborators handed to every provider created.

        :param session: Session used for config lookups and client creation.
        :param cache: Optional credential cache passed to caching providers.
        :param region_name: Optional region used by created client creators.
        :param sso_token_cache: Optional cache passed to the SSO providers.
        """
        self._session = session
        self._cache = cache
        self._region_name = region_name
        self._sso_token_cache = sso_token_cache

    def providers(self, profile_name, disable_env_vars=False):
        """Return the profile providers for ``profile_name`` in lookup order.

        Order: web identity, SSO, shared credentials file, credential
        process, config file.
        """
        web_identity = self._create_web_identity_provider(
            profile_name,
            disable_env_vars,
        )
        sso = self._create_sso_provider(profile_name)
        shared_credentials = self._create_shared_credential_provider(
            profile_name
        )
        process = self._create_process_provider(profile_name)
        config = self._create_config_provider(profile_name)
        return [web_identity, sso, shared_credentials, process, config]

    def _create_process_provider(self, profile_name):
        """Create the provider that runs a configured credential process."""
        return ProcessProvider(
            profile_name=profile_name,
            load_config=lambda: self._session.full_config,
        )

    def _create_shared_credential_provider(self, profile_name):
        """Create the provider backed by the session's credentials file."""
        creds_filename = self._session.get_config_variable('credentials_file')
        return SharedCredentialProvider(
            profile_name=profile_name,
            creds_filename=creds_filename,
        )

    def _create_config_provider(self, profile_name):
        """Create the provider backed by the session's config file."""
        config_filename = self._session.get_config_variable('config_file')
        return ConfigProvider(
            profile_name=profile_name,
            config_filename=config_filename,
        )

    def _create_web_identity_provider(self, profile_name, disable_env_vars):
        """Create the assume-role-with-web-identity provider."""
        client_creator = _get_client_creator(self._session, self._region_name)
        return AssumeRoleWithWebIdentityProvider(
            load_config=lambda: self._session.full_config,
            client_creator=client_creator,
            cache=self._cache,
            profile_name=profile_name,
            disable_env_vars=disable_env_vars,
        )

    def _create_sso_provider(self, profile_name):
        """Create the SSO provider together with its token provider."""
        return SSOProvider(
            load_config=lambda: self._session.full_config,
            client_creator=self._session.create_client,
            profile_name=profile_name,
            cache=self._cache,
            token_cache=self._sso_token_cache,
            token_provider=SSOTokenProvider(
                self._session,
                cache=self._sso_token_cache,
                profile_name=profile_name,
            ),
        )
def get_credentials(session):
    """Resolve credentials for ``session`` using the default provider chain."""
    return create_credential_resolver(session).load_credentials()
def _local_now():
    """Return the current time as a datetime aware of the local timezone."""
    local_zone = tzlocal()
    return datetime.datetime.now(local_zone)
247def _parse_if_needed(value):
248 if isinstance(value, datetime.datetime):
249 return value
250 return parse(value)
253def _serialize_if_needed(value, iso=False):
254 if isinstance(value, datetime.datetime):
255 if iso:
256 return value.isoformat()
257 return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
258 return value
261def _get_client_creator(session, region_name):
262 def client_creator(service_name, **kwargs):
263 create_client_kwargs = {'region_name': region_name}
264 create_client_kwargs.update(**kwargs)
265 return session.create_client(service_name, **create_client_kwargs)
267 return client_creator
def create_assume_role_refresher(client, params):
    """Return a zero-argument callable that re-runs ``client.assume_role``.

    Each call invokes ``client.assume_role(**params)`` and converts the
    ``Credentials`` entry of the response into the key names expected by
    the refreshable credential classes.
    """

    def refresh():
        sts_creds = client.assume_role(**params)['Credentials']
        # Normalize the service's key names to the ones the refreshable
        # credentials expect.
        expiry_time = _serialize_if_needed(sts_creds['Expiration'])
        normalized = {
            'access_key': sts_creds['AccessKeyId'],
            'secret_key': sts_creds['SecretAccessKey'],
            'token': sts_creds['SessionToken'],
            'expiry_time': expiry_time,
        }
        return normalized

    return refresh
def create_mfa_serial_refresher(actual_refresh):
    """Wrap ``actual_refresh`` so that it may be invoked only once.

    The first call delegates to ``actual_refresh``; any later call raises
    ``RefreshWithMFAUnsupportedError``.
    """

    class _Refresher:
        def __init__(self, refresh):
            self._refresh = refresh
            self._has_been_called = False

        def __call__(self):
            if not self._has_been_called:
                self._has_been_called = True
                return self._refresh()
            # Re-prompting for MFA could be supported in the future, but
            # for now we simply error out once the temp creds expire.
            raise RefreshWithMFAUnsupportedError()

    return _Refresher(actual_refresh)
class Credentials:
    """
    Container for the credentials used to authenticate requests.

    :param str access_key: The access key part of the credentials.
    :param str secret_key: The secret key part of the credentials.
    :param str token: The security token, valid only for session credentials.
    :param str method: A string which identifies where the credentials
        were found. Defaults to ``'explicit'`` when not given.
    """

    def __init__(self, access_key, secret_key, token=None, method=None):
        self.access_key = access_key
        self.secret_key = secret_key
        self.token = token
        self.method = 'explicit' if method is None else method

        self._normalize()

    def _normalize(self):
        """Pass both keys through ``botocore.compat.ensure_unicode``."""
        # Keys would sometimes (accidentally) contain non-ascii characters,
        # which caused a confusing UnicodeDecodeError in Python 2, so they
        # are explicitly converted. Whether the credential is acceptable is
        # ultimately decided by the service; this also matches Python 3
        # behavior.
        to_unicode = botocore.compat.ensure_unicode
        self.access_key = to_unicode(self.access_key)
        self.secret_key = to_unicode(self.secret_key)

    def get_frozen_credentials(self):
        """Return the current values as a ``ReadOnlyCredentials`` tuple."""
        return ReadOnlyCredentials(
            access_key=self.access_key,
            secret_key=self.secret_key,
            token=self.token,
        )
class RefreshableCredentials(Credentials):
    """
    Credentials that can renew themselves through a refresh callback.

    :param str access_key: The access key part of the credentials.
    :param str secret_key: The secret key part of the credentials.
    :param str token: The security token, valid only for session credentials.
    :param datetime expiry_time: The expiration time of the credentials.
    :param function refresh_using: Callback function to refresh the
        credentials. It must return a mapping with the keys ``access_key``,
        ``secret_key``, ``token`` and ``expiry_time``.
    :param str method: A string which identifies where the credentials
        were found.
    :param function time_fetcher: Callback function to retrieve current time.
    :param advisory_timeout: Optional per-instance override, in seconds, of
        the advisory refresh window.
    :param mandatory_timeout: Optional per-instance override, in seconds, of
        the mandatory refresh window.
    """

    # Remaining lifetime (seconds) below which a refresh is attempted by
    # whichever caller can take the lock without blocking.
    _advisory_refresh_timeout = _DEFAULT_ADVISORY_REFRESH_TIMEOUT
    # Remaining lifetime (seconds) below which all callers block waiting
    # for refreshed credentials.
    _mandatory_refresh_timeout = _DEFAULT_MANDATORY_REFRESH_TIMEOUT

    def __init__(
        self,
        access_key,
        secret_key,
        token,
        expiry_time,
        refresh_using,
        method,
        time_fetcher=_local_now,
        advisory_timeout=None,
        mandatory_timeout=None,
    ):
        self._refresh_using = refresh_using
        self._time_fetcher = time_fetcher
        self._refresh_lock = threading.Lock()
        self.method = method
        self._access_key = access_key
        self._secret_key = secret_key
        self._token = token
        self._expiry_time = expiry_time
        # The snapshot is built from the values exactly as they were passed
        # in, before normalization runs.
        self._frozen_credentials = ReadOnlyCredentials(
            access_key, secret_key, token
        )
        self._normalize()
        # Only shadow the class-level defaults when an override was given.
        if advisory_timeout is not None:
            self._advisory_refresh_timeout = advisory_timeout
        if mandatory_timeout is not None:
            self._mandatory_refresh_timeout = mandatory_timeout

    def _normalize(self):
        """Pass both stored keys through ``ensure_unicode``."""
        to_unicode = botocore.compat.ensure_unicode
        self._access_key = to_unicode(self._access_key)
        self._secret_key = to_unicode(self._secret_key)

    @classmethod
    def create_from_metadata(
        cls,
        metadata,
        refresh_using,
        method,
        advisory_timeout=None,
        mandatory_timeout=None,
    ):
        """Alternate constructor taking a credential metadata mapping.

        :param metadata: Mapping with the keys ``access_key``,
            ``secret_key``, ``token`` and ``expiry_time``; the expiry is
            converted with ``_expiry_datetime``.
        :param refresh_using: Callback used for later refreshes.
        :param method: Identifies where the credentials were found.
        :param advisory_timeout: Optional advisory window override.
        :param mandatory_timeout: Optional mandatory window override.
        :return: A new instance of ``cls``.
        """
        overrides = {
            'advisory_timeout': advisory_timeout,
            'mandatory_timeout': mandatory_timeout,
        }
        # Timeouts left as None are not forwarded at all.
        kwargs = {
            name: value
            for name, value in overrides.items()
            if value is not None
        }

        return cls(
            access_key=metadata['access_key'],
            secret_key=metadata['secret_key'],
            token=metadata['token'],
            expiry_time=cls._expiry_datetime(metadata['expiry_time']),
            method=method,
            refresh_using=refresh_using,
            **kwargs,
        )

    @property
    def access_key(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._access_key

    @access_key.setter
    def access_key(self, value):
        self._access_key = value

    @property
    def secret_key(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._secret_key

    @secret_key.setter
    def secret_key(self, value):
        self._secret_key = value

    @property
    def token(self):
        """Warning: Using this property can lead to race conditions if you
        access another property subsequently along the refresh boundary.
        Please use get_frozen_credentials instead.
        """
        self._refresh()
        return self._token

    @token.setter
    def token(self, value):
        self._token = value

    def _seconds_remaining(self):
        """Return the seconds left until ``_expiry_time`` (may be negative)."""
        remaining = self._expiry_time - self._time_fetcher()
        return total_seconds(remaining)

    def refresh_needed(self, refresh_in=None):
        """Check if a refresh is needed.

        A refresh is needed if the time remaining before the temporary
        credentials expire is less than the provided ``refresh_in``. If
        ``refresh_in`` is not provided, the advisory refresh timeout of
        this instance is used.

        For example, if your temporary credentials expire
        in 10 minutes and the provided ``refresh_in`` is
        ``15 * 60``, then this function will return ``True``.

        :type refresh_in: int
        :param refresh_in: The number of seconds before the
            credentials expire in which refresh attempts should
            be made.

        :return: True if refresh needed, False otherwise.

        """
        if self._expiry_time is None:
            # No expiration, so assume we don't need to refresh.
            return False

        if refresh_in is None:
            refresh_in = self._advisory_refresh_timeout
        # A refresh is due only once less than ``refresh_in`` seconds of
        # lifetime remain.
        if self._seconds_remaining() < refresh_in:
            logger.debug("Credentials need to be refreshed.")
            return True
        # There's enough time left. Don't refresh.
        return False

    def _is_expired(self):
        """Return True when no lifetime at all is left on the credentials."""
        return self.refresh_needed(refresh_in=0)

    def _refresh(self):
        """Refresh the credentials if they are inside a refresh window."""
        # Common case: nothing to do, so exit before touching the lock.
        if not self.refresh_needed(self._advisory_refresh_timeout):
            return

        # acquire() doesn't accept kwargs; False means "do not block if the
        # lock cannot be taken".
        if self._refresh_lock.acquire(False):
            try:
                # Re-check now that we hold the lock: another caller may
                # have refreshed in the meantime.
                if self.refresh_needed(self._advisory_refresh_timeout):
                    is_mandatory_refresh = self.refresh_needed(
                        self._mandatory_refresh_timeout
                    )
                    self._protected_refresh(is_mandatory=is_mandatory_refresh)
            finally:
                self._refresh_lock.release()
            return

        if self.refresh_needed(self._mandatory_refresh_timeout):
            # Someone else holds the lock and we're within the mandatory
            # refresh window, so we must block until we get refreshed
            # credentials.
            with self._refresh_lock:
                if self.refresh_needed(self._mandatory_refresh_timeout):
                    self._protected_refresh(is_mandatory=True)

    def _protected_refresh(self, is_mandatory):
        """Invoke the refresh callback and store its result.

        Precondition: the caller holds ``self._refresh_lock``.

        :param is_mandatory: When true, errors raised by the refresh
            callback propagate; otherwise they are logged and the current
            credentials stay in use.
        :raises RuntimeError: If the refreshed credentials are already
            expired.
        """
        try:
            metadata = self._refresh_using()
        except Exception:
            if is_mandatory:
                period_name = 'mandatory'
            else:
                period_name = 'advisory'
            logger.warning(
                "Refreshing temporary credentials failed "
                "during %s refresh period.",
                period_name,
                exc_info=True,
            )
            if is_mandatory:
                # During a mandatory refresh every error that occurs while
                # refreshing is propagated back to the user.
                raise
            # Otherwise just return; the end result is that the current
            # set of temporary credentials keeps being used.
            return

        self._set_from_data(metadata)
        self._frozen_credentials = ReadOnlyCredentials(
            self._access_key, self._secret_key, self._token
        )
        if self._is_expired():
            # The refresh succeeded but, for whatever reason, the refreshing
            # function returned credentials that are still expired. All we
            # can do is let the user know and raise an exception.
            msg = (
                "Credentials were refreshed, but the "
                "refreshed credentials are still expired."
            )
            logger.warning(msg)
            raise RuntimeError(msg)

    @staticmethod
    def _expiry_datetime(time_str):
        """Parse an expiry timestamp string into a datetime."""
        return parse(time_str)

    def _set_from_data(self, data):
        """Store the credential values returned by a refresh.

        :param data: Mapping that must contain ``access_key``,
            ``secret_key``, ``token`` and ``expiry_time``.
        :raises CredentialRetrievalError: If ``data`` is empty or any of
            the expected keys is missing.
        """
        expected_keys = ['access_key', 'secret_key', 'token', 'expiry_time']
        # An empty/None response is treated as missing every key.
        missing_keys = [
            key for key in expected_keys if not data or key not in data
        ]

        if missing_keys:
            message = "Credential refresh failed, response did not contain: %s"
            raise CredentialRetrievalError(
                provider=self.method,
                error_msg=message % ', '.join(missing_keys),
            )

        self.access_key = data['access_key']
        self.secret_key = data['secret_key']
        self.token = data['token']
        self._expiry_time = parse(data['expiry_time'])
        logger.debug(
            "Retrieved credentials will expire at: %s", self._expiry_time
        )
        self._normalize()

    def get_frozen_credentials(self):
        """Return immutable credentials.

        The ``access_key``, ``secret_key``, and ``token`` properties
        on this class always check and refresh credentials if
        needed before returning the particular credentials.

        That has an edge case where you can get inconsistent
        credentials. Imagine this:

            # Current creds are "t1"
            tmp.access_key  --->  expired? no, so return t1.access_key
            # ---- time is now expired, creds need refreshing to "t2" ----
            tmp.secret_key  --->  expired? yes, refresh and return t2.secret_key

        This means we're using the access key from t1 with the secret key
        from t2. To avoid this, request a frozen credential object, which
        is guaranteed not to change.

        The frozen credentials returned from this method should be used
        immediately and then discarded. The typical usage pattern would
        be::

            creds = RefreshableCredentials(...)
            some_code = SomeSignerObject()
            # I'm about to sign the request.
            # The frozen credentials are only used for the
            # duration of generate_presigned_url and will be
            # immediately thrown away.
            request = some_code.sign_some_request(
                with_credentials=creds.get_frozen_credentials())
            print("Signed request:", request)

        """
        self._refresh()
        return self._frozen_credentials
class DeferredRefreshableCredentials(RefreshableCredentials):
    """Refreshable credentials that start out without any credentials.

    Because a refresh is reported as needed until a frozen credential
    snapshot exists, ``refresh_using`` is called upon first access.
    """

    def __init__(self, refresh_using, method, time_fetcher=_local_now):
        self._refresh_using = refresh_using
        self._time_fetcher = time_fetcher
        self._refresh_lock = threading.Lock()
        self.method = method
        # Nothing has been fetched yet, so every credential field is empty.
        self._access_key = None
        self._secret_key = None
        self._token = None
        self._expiry_time = None
        self._frozen_credentials = None

    def refresh_needed(self, refresh_in=None):
        """Report True until the first refresh, then defer to the parent."""
        never_refreshed = self._frozen_credentials is None
        if never_refreshed:
            return True
        return super().refresh_needed(refresh_in)
class CachedCredentialFetcher:
    """Base class for fetchers that keep their credential response cached.

    Subclasses must implement ``_create_cache_key`` and
    ``_get_credentials``.
    """

    DEFAULT_EXPIRY_WINDOW_SECONDS = 60 * 15

    def __init__(self, cache=None, expiry_window_seconds=None):
        """
        :param cache: Mapping used to store responses; a new ``dict`` is
            used when omitted.
        :param expiry_window_seconds: Cached credentials with fewer than
            this many seconds left are treated as expired. Defaults to
            ``DEFAULT_EXPIRY_WINDOW_SECONDS``.
        """
        self._cache = {} if cache is None else cache
        self._cache_key = self._create_cache_key()
        if expiry_window_seconds is None:
            expiry_window_seconds = self.DEFAULT_EXPIRY_WINDOW_SECONDS
        self._expiry_window_seconds = expiry_window_seconds

    def _create_cache_key(self):
        """Return the key under which the response is cached (abstract)."""
        raise NotImplementedError('_create_cache_key()')

    def _make_file_safe(self, filename):
        """Replace ``:``, the OS path separator and ``/`` with ``_``."""
        for unsafe in (':', os.sep, '/'):
            filename = filename.replace(unsafe, '_')
        return filename

    def _get_credentials(self):
        """Fetch a fresh credential response (abstract)."""
        raise NotImplementedError('_get_credentials()')

    def fetch_credentials(self):
        """Return up-to-date credentials, using the cache when possible."""
        return self._get_cached_credentials()

    def _get_cached_credentials(self):
        """Get up-to-date credentials.

        The cache is consulted first; when it holds nothing usable,
        ``_get_credentials`` is called and its response is cached.

        :return: A dict with the keys ``access_key``, ``secret_key``,
            ``token`` and ``expiry_time``.
        """
        cached = self._load_from_cache()
        if cached is not None:
            logger.debug("Credentials for role retrieved from cache.")
            response = cached
        else:
            response = self._get_credentials()
            self._write_to_cache(response)

        credentials = response['Credentials']
        return {
            'access_key': credentials['AccessKeyId'],
            'secret_key': credentials['SecretAccessKey'],
            'token': credentials['SessionToken'],
            'expiry_time': _serialize_if_needed(
                credentials['Expiration'], iso=True
            ),
        }

    def _load_from_cache(self):
        """Return a copy of the cached response, or None if absent/expired."""
        if self._cache_key not in self._cache:
            return None
        cached = deepcopy(self._cache[self._cache_key])
        if self._is_expired(cached):
            logger.debug(
                "Credentials were found in cache, but they are expired."
            )
            return None
        return cached

    def _write_to_cache(self, response):
        """Store a deep copy of ``response`` under this fetcher's key."""
        self._cache[self._cache_key] = deepcopy(response)

    def _is_expired(self, credentials):
        """Check if credentials are expired."""
        expiration = credentials['Credentials']['Expiration']
        time_left = _parse_if_needed(expiration) - _local_now()
        return total_seconds(time_left) < self._expiry_window_seconds
class BaseAssumeRoleCredentialFetcher(CachedCredentialFetcher):
    """Shared argument handling and cache-key logic for assume-role fetchers."""

    def __init__(
        self,
        client_creator,
        role_arn,
        extra_args=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :param client_creator: Callable used by subclasses to create clients.
        :param role_arn: The ARN of the role to be assumed.
        :param extra_args: Optional dict of additional request arguments;
            it is deep-copied, never mutated.
        :param cache: Optional cache forwarded to the parent class.
        :param expiry_window_seconds: Optional expiry window forwarded to
            the parent class.
        """
        self._client_creator = client_creator
        self._role_arn = role_arn

        request_args = {} if extra_args is None else deepcopy(extra_args)
        request_args['RoleArn'] = self._role_arn
        self._assume_kwargs = request_args

        self._role_session_name = self._assume_kwargs.get('RoleSessionName')
        self._using_default_session_name = False
        if not self._role_session_name:
            self._generate_assume_role_name()

        # The parent constructor computes the cache key, which depends on
        # the attributes set above, so it has to run last.
        super().__init__(cache, expiry_window_seconds)

    def _generate_assume_role_name(self):
        """Install a default, time-based ``RoleSessionName``."""
        session_name = 'botocore-session-%s' % (int(time.time()))
        self._role_session_name = session_name
        self._assume_kwargs['RoleSessionName'] = session_name
        self._using_default_session_name = True

    def _create_cache_key(self):
        """Create a predictable cache key for the current configuration.

        The cache key is intended to be compatible with file names.
        """
        key_args = deepcopy(self._assume_kwargs)

        # An auto-generated role session name differs between runs, so it
        # must not take part in the hash.
        if self._using_default_session_name:
            key_args.pop('RoleSessionName')

        if 'Policy' in key_args:
            # For a predictable hash the keys of the policy must be sorted,
            # so the policy is loaded here and gets sorted by the dump
            # below.
            key_args['Policy'] = json.loads(key_args['Policy'])

        serialized = json.dumps(key_args, sort_keys=True)
        digest = sha1(serialized.encode('utf-8')).hexdigest()
        return self._make_file_safe(digest)
class AssumeRoleCredentialFetcher(BaseAssumeRoleCredentialFetcher):
    """Fetch temporary credentials by calling ``assume_role`` on an STS client.

    The client is created with a frozen snapshot of the source credentials.
    """

    def __init__(
        self,
        client_creator,
        source_credentials,
        role_arn,
        extra_args=None,
        mfa_prompter=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client taking
            arguments like ``Session.create_client``.

        :type source_credentials: Credentials
        :param source_credentials: The credentials to use to create the
            client for the call to AssumeRole.

        :type role_arn: str
        :param role_arn: The ARN of the role to be assumed.

        :type extra_args: dict
        :param extra_args: Any additional arguments to add to the assume
            role request using the format of the botocore operation.
            Possible keys include, but may not be limited to,
            DurationSeconds, Policy, SerialNumber, ExternalId and
            RoleSessionName.

        :type mfa_prompter: callable
        :param mfa_prompter: A callable that returns input provided by the
            user (i.e raw_input, getpass.getpass, etc.). Defaults to
            ``getpass.getpass``.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``.  An example of this is
            the ``JSONFileCache`` class in aws-cli.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: The amount of time, in seconds,
            before their expiration at which cached credentials are
            treated as expired and fetched again.
        """
        self._source_credentials = source_credentials
        self._mfa_prompter = mfa_prompter
        if self._mfa_prompter is None:
            self._mfa_prompter = getpass.getpass

        super().__init__(
            client_creator,
            role_arn,
            extra_args=extra_args,
            cache=cache,
            expiry_window_seconds=expiry_window_seconds,
        )

    def _get_credentials(self):
        """Get credentials by calling assume role."""
        # Build the arguments first: this is where the MFA prompt happens.
        kwargs = self._assume_role_kwargs()
        client = self._create_client()
        return client.assume_role(**kwargs)

    def _assume_role_kwargs(self):
        """Get the arguments for assume role based on current configuration.

        :return: A deep copy of the configured arguments, with ``TokenCode``
            added (obtained from the MFA prompter) when a ``SerialNumber``
            is configured.
        """
        assume_role_kwargs = deepcopy(self._assume_kwargs)

        mfa_serial = assume_role_kwargs.get('SerialNumber')
        if mfa_serial is not None:
            prompt = 'Enter MFA code for %s: ' % mfa_serial
            assume_role_kwargs['TokenCode'] = self._mfa_prompter(prompt)

        # A configured ``DurationSeconds`` is already part of the copy, so
        # there is nothing further to set for it.
        return assume_role_kwargs

    def _create_client(self):
        """Create an STS client using the source credentials."""
        # Use one frozen snapshot so the three values are consistent.
        frozen_credentials = self._source_credentials.get_frozen_credentials()
        return self._client_creator(
            'sts',
            aws_access_key_id=frozen_credentials.access_key,
            aws_secret_access_key=frozen_credentials.secret_key,
            aws_session_token=frozen_credentials.token,
        )
class AssumeRoleWithWebIdentityCredentialFetcher(
    BaseAssumeRoleCredentialFetcher
):
    """Fetch temporary credentials via ``assume_role_with_web_identity``."""

    def __init__(
        self,
        client_creator,
        web_identity_token_loader,
        role_arn,
        extra_args=None,
        cache=None,
        expiry_window_seconds=None,
    ):
        """
        :type client_creator: callable
        :param client_creator: A callable that creates a client taking
            arguments like ``Session.create_client``.

        :type web_identity_token_loader: callable
        :param web_identity_token_loader: A callable that takes no arguments
            and returns a web identity token str.

        :type role_arn: str
        :param role_arn: The ARN of the role to be assumed.

        :type extra_args: dict
        :param extra_args: Any additional arguments to add to the assume
            role request using the format of the botocore operation.
            Possible keys include, but may not be limited to,
            DurationSeconds, Policy, SerialNumber, ExternalId and
            RoleSessionName.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``.  An example of this is
            the ``JSONFileCache`` class in aws-cli.

        :type expiry_window_seconds: int
        :param expiry_window_seconds: The amount of time, in seconds,
            before their expiration at which cached credentials are
            treated as expired and fetched again.
        """
        self._web_identity_token_loader = web_identity_token_loader

        super().__init__(
            client_creator,
            role_arn,
            extra_args=extra_args,
            cache=cache,
            expiry_window_seconds=expiry_window_seconds,
        )

    def _get_credentials(self):
        """Get credentials by calling assume role with web identity."""
        request_kwargs = self._assume_role_kwargs()
        # Assume role with web identity needs no credentials other than the
        # token, so the client is explicitly configured not to sign
        # requests.
        unsigned_config = Config(signature_version=UNSIGNED)
        sts_client = self._client_creator('sts', config=unsigned_config)
        return sts_client.assume_role_with_web_identity(**request_kwargs)

    def _assume_role_kwargs(self):
        """Get the arguments for assume role based on current configuration.

        The token loader is invoked on every call and its result is placed
        under ``WebIdentityToken`` in a deep copy of the configured
        arguments.
        """
        request_kwargs = deepcopy(self._assume_kwargs)
        request_kwargs['WebIdentityToken'] = self._web_identity_token_loader()

        return request_kwargs
class CredentialProvider:
    """Base class for the credential providers used by the resolver."""

    # A short name to identify the provider within botocore.
    METHOD = None

    # A name to identify the provider for use in cross-sdk features like
    # assume role's `credential_source` configuration option. These names
    # are to be treated in a case-insensitive way. NOTE: any providers not
    # implemented in botocore MUST prefix their canonical names with
    # 'custom' or we DO NOT guarantee that it will work with any features
    # that this provides.
    CANONICAL_NAME = None

    def __init__(self, session=None):
        self.session = session

    def load(self):
        """
        Loads the credentials from their source & sets them on the object.

        Subclasses should implement this method (by reading from disk, the
        environment, the network or wherever), returning ``True`` if they were
        found & loaded.

        If not found, this method should return ``False``, indicating that the
        ``CredentialResolver`` should fall back to the next available method.

        The default implementation does nothing, assuming the user has set the
        ``access_key/secret_key/token`` themselves.

        :returns: Whether credentials were found & set
        :rtype: Credentials
        """
        return True

    def _extract_creds_from_mapping(self, mapping, *key_names):
        """Return the values of ``key_names`` from ``mapping``, in order.

        :raises PartialCredentialsError: If any requested key is missing;
            the error names this provider's ``METHOD`` and the missing key.
        """
        collected = []
        for wanted in key_names:
            try:
                value = mapping[wanted]
            except KeyError:
                raise PartialCredentialsError(
                    provider=self.METHOD, cred_var=wanted
                )
            else:
                collected.append(value)
        return collected
class ProcessProvider(CredentialProvider):
    """Provider that obtains credentials by running an external command.

    The command is taken from the ``credential_process`` setting of the
    selected profile.
    """

    METHOD = 'custom-process'

    def __init__(self, profile_name, load_config, popen=subprocess.Popen):
        self._profile_name = profile_name
        self._load_config = load_config
        # Populated lazily the first time the profile is inspected.
        self._loaded_config = None
        self._popen = popen

    def load(self):
        """Run the configured credential process, if there is one.

        :returns: ``None`` when the profile has no ``credential_process``;
            refreshable credentials when the process reports an
            expiration; plain ``Credentials`` otherwise.
        """
        command = self._credential_process
        if command is None:
            return None

        initial = self._retrieve_credentials_using(command)
        if initial.get('expiry_time') is None:
            # No expiration reported: treat the credentials as static.
            return Credentials(
                access_key=initial['access_key'],
                secret_key=initial['secret_key'],
                token=initial.get('token'),
                method=self.METHOD,
            )

        # Expiring credentials are refreshed by re-running the same command.
        return RefreshableCredentials.create_from_metadata(
            initial,
            lambda: self._retrieve_credentials_using(command),
            self.METHOD,
        )

    def _retrieve_credentials_using(self, credential_process):
        """Execute ``credential_process`` and parse its JSON output.

        :raises CredentialRetrievalError: If the process exits with a
            non-zero status, reports a version other than 1, or omits a
            required key.
        """
        # shell=True is not used, so the command line has to be split
        # into an argument list first.
        argv = compat_shell_split(credential_process)
        proc = self._popen(
            argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        out, err = proc.communicate()
        if proc.returncode != 0:
            raise CredentialRetrievalError(
                provider=self.METHOD, error_msg=err.decode('utf-8')
            )

        payload = botocore.compat.json.loads(out.decode('utf-8'))
        version = payload.get('Version', '<Version key not provided>')
        if version != 1:
            raise CredentialRetrievalError(
                provider=self.METHOD,
                error_msg=(
                    f"Unsupported version '{version}' for credential process "
                    f"provider, supported versions: 1"
                ),
            )

        try:
            access_key = payload['AccessKeyId']
            secret_key = payload['SecretAccessKey']
        except KeyError as e:
            raise CredentialRetrievalError(
                provider=self.METHOD,
                error_msg=f"Missing required key in response: {e}",
            )
        return {
            'access_key': access_key,
            'secret_key': secret_key,
            'token': payload.get('SessionToken'),
            'expiry_time': payload.get('Expiration'),
        }

    @property
    def _credential_process(self):
        """The profile's ``credential_process`` value, or ``None``."""
        if self._loaded_config is None:
            self._loaded_config = self._load_config()
        profiles = self._loaded_config.get('profiles', {})
        return profiles.get(self._profile_name, {}).get('credential_process')
class InstanceMetadataProvider(CredentialProvider):
    """Provider backed by an IAM role credential fetcher."""

    METHOD = 'iam-role'
    CANONICAL_NAME = 'Ec2InstanceMetadata'

    def __init__(self, iam_role_fetcher):
        self._role_fetcher = iam_role_fetcher

    def load(self):
        """Fetch role credentials once and wrap them as refreshable.

        :returns: ``None`` when the fetcher yields nothing useful, so the
            credential chain can move on to the next provider; otherwise
            refreshable credentials built from the fetched metadata.
        """
        role_fetcher = self._role_fetcher
        # Probe with a first request; an empty result means this provider
        # does not apply.
        metadata = role_fetcher.retrieve_iam_role_credentials()
        if not metadata:
            return None

        logger.info(
            'Found credentials from IAM Role: %s', metadata['role_name']
        )
        # The data from the request above seeds the credentials directly.
        # The same fetcher method is handed over for later refreshes.
        return RefreshableCredentials.create_from_metadata(
            metadata,
            method=self.METHOD,
            refresh_using=role_fetcher.retrieve_iam_role_credentials,
        )
class EnvProvider(CredentialProvider):
    """Provider that reads credentials from environment variables."""

    METHOD = 'env'
    CANONICAL_NAME = 'Environment'
    ACCESS_KEY = 'AWS_ACCESS_KEY_ID'
    SECRET_KEY = 'AWS_SECRET_ACCESS_KEY'
    # Either of these variables may carry the token.
    # AWS_SESSION_TOKEN is the name other AWS SDKs have standardized on.
    TOKENS = ['AWS_SECURITY_TOKEN', 'AWS_SESSION_TOKEN']
    EXPIRY_TIME = 'AWS_CREDENTIAL_EXPIRATION'

    def __init__(self, environ=None, mapping=None):
        """

        :param environ: The environment variables (defaults to
            ``os.environ`` if no value is provided).

        :param mapping: An optional mapping of variable names to
            environment variable names.  Use this if you want to
            change the mapping of access_key->AWS_ACCESS_KEY_ID, etc.
            The keys consulted are ``access_key``, ``secret_key``,
            ``token`` and ``expiry_time``; any key left out falls back
            to the class default.

        """
        self.environ = os.environ if environ is None else environ
        self._mapping = self._build_mapping(mapping)

    def _build_mapping(self, mapping):
        """Return the credential-field -> env-var-name lookup table.

        The ``token`` entry is always a list of candidate variable names.
        """
        if mapping is None:
            # No overrides: use the class-level defaults throughout.
            return {
                'access_key': self.ACCESS_KEY,
                'secret_key': self.SECRET_KEY,
                'token': self.TOKENS,
                'expiry_time': self.EXPIRY_TIME,
            }

        token_vars = mapping.get('token', self.TOKENS)
        if not isinstance(token_vars, list):
            # A single name was supplied; normalise it to a list.
            token_vars = [token_vars]
        return {
            'access_key': mapping.get('access_key', self.ACCESS_KEY),
            'secret_key': mapping.get('secret_key', self.SECRET_KEY),
            'token': token_vars,
            'expiry_time': mapping.get('expiry_time', self.EXPIRY_TIME),
        }

    def load(self):
        """
        Search for credentials in explicit environment variables.

        :returns: ``None`` when the access key variable is unset or empty;
            ``RefreshableCredentials`` when an expiry time is present;
            plain ``Credentials`` otherwise.
        """
        if not self.environ.get(self._mapping['access_key'], ''):
            return None

        logger.info('Found credentials in environment variables.')
        fetcher = self._create_credentials_fetcher()
        # The expiry is optional for the initial load.
        credentials = fetcher(require_expiry=False)

        expiry_time = credentials['expiry_time']
        if expiry_time is None:
            return Credentials(
                credentials['access_key'],
                credentials['secret_key'],
                credentials['token'],
                method=self.METHOD,
            )

        return RefreshableCredentials(
            credentials['access_key'],
            credentials['secret_key'],
            credentials['token'],
            parse(expiry_time),
            refresh_using=fetcher,
            method=self.METHOD,
        )

    def _create_credentials_fetcher(self):
        """Build a callable that re-reads credentials from the environment.

        The callable raises ``PartialCredentialsError`` when the access
        key or secret key is missing, or when ``require_expiry`` is true
        and no expiry time is set.
        """
        mapping = self._mapping
        method = self.METHOD
        environ = self.environ

        def _required(field):
            # Read a mandatory variable; an empty value counts as missing.
            env_var = mapping[field]
            value = environ.get(env_var, '')
            if not value:
                raise PartialCredentialsError(
                    provider=method, cred_var=env_var
                )
            return value

        def fetch_credentials(require_expiry=True):
            access_key = _required('access_key')
            secret_key = _required('secret_key')

            # First non-empty token variable wins; None if none is set.
            candidates = (
                environ.get(env_var, '') for env_var in mapping['token']
            )
            token = next((value for value in candidates if value), None)

            expiry_time = environ.get(mapping['expiry_time'], '')
            if require_expiry and not expiry_time:
                raise PartialCredentialsError(
                    provider=method, cred_var=mapping['expiry_time']
                )

            return {
                'access_key': access_key,
                'secret_key': secret_key,
                'token': token,
                'expiry_time': expiry_time or None,
            }

        return fetch_credentials
class OriginalEC2Provider(CredentialProvider):
    """Provider for the credential file used by the original EC2 CLI tools."""

    METHOD = 'ec2-credentials-file'
    CANONICAL_NAME = 'Ec2Config'

    # Environment variable that holds the path of the credential file.
    CRED_FILE_ENV = 'AWS_CREDENTIAL_FILE'
    ACCESS_KEY = 'AWSAccessKeyId'
    SECRET_KEY = 'AWSSecretKey'

    def __init__(self, environ=None, parser=None):
        """
        :param environ: Mapping to read ``CRED_FILE_ENV`` from (defaults
            to ``os.environ``).
        :param parser: Callable that takes a file path and returns a
            mapping of the keys found in it (defaults to
            ``parse_key_val_file``).
        """
        if environ is None:
            environ = os.environ
        if parser is None:
            parser = parse_key_val_file
        self._environ = environ
        self._parser = parser

    def load(self):
        """
        Search for a credential file used by original EC2 CLI tools.

        :returns: ``Credentials`` when the file named by ``CRED_FILE_ENV``
            contains an access key, ``None`` otherwise.
        :raises PartialCredentialsError: If the file has an access key but
            no secret key.
        """
        # Use the class constant rather than a repeated literal so that
        # the lookup and the constant cannot drift apart.
        if self.CRED_FILE_ENV not in self._environ:
            return None

        full_path = os.path.expanduser(self._environ[self.CRED_FILE_ENV])
        creds = self._parser(full_path)
        if self.ACCESS_KEY not in creds:
            return None

        logger.info('Found credentials in %s.', self.CRED_FILE_ENV)
        # Report a half-configured file the same way the other file-based
        # providers do, instead of leaking a bare KeyError.
        access_key, secret_key = self._extract_creds_from_mapping(
            creds, self.ACCESS_KEY, self.SECRET_KEY
        )
        # EC2 creds file doesn't support session tokens.
        return Credentials(access_key, secret_key, method=self.METHOD)
class SharedCredentialProvider(CredentialProvider):
    """Provider that reads a profile section from a credentials file."""

    METHOD = 'shared-credentials-file'
    CANONICAL_NAME = 'SharedCredentials'

    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'
    # As with the EnvProvider, botocore originally supported
    # aws_security_token, but the SDKs are standardizing on
    # aws_session_token, so both are accepted.
    TOKENS = ['aws_security_token', 'aws_session_token']

    def __init__(self, creds_filename, profile_name=None, ini_parser=None):
        """
        :param creds_filename: Path handed to ``ini_parser``.
        :param profile_name: Section to read (defaults to ``'default'``).
        :param ini_parser: Callable that parses the file into a mapping of
            sections (defaults to ``botocore.configloader.raw_config_parse``).
        """
        self._creds_filename = creds_filename
        self._profile_name = 'default' if profile_name is None else profile_name
        if ini_parser is None:
            ini_parser = botocore.configloader.raw_config_parse
        self._ini_parser = ini_parser

    def load(self):
        """Return credentials from the profile's section, if present.

        :returns: ``None`` when the file is not found, the profile is
            absent, or the profile has no access key.
        :raises PartialCredentialsError: If the access key is present but
            the secret key is not.
        """
        try:
            sections = self._ini_parser(self._creds_filename)
        except ConfigNotFound:
            return None

        if self._profile_name not in sections:
            return None
        section = sections[self._profile_name]
        if self.ACCESS_KEY not in section:
            return None

        logger.info(
            "Found credentials in shared credentials file: %s",
            self._creds_filename,
        )
        access_key, secret_key = self._extract_creds_from_mapping(
            section, self.ACCESS_KEY, self.SECRET_KEY
        )
        session_token = self._get_session_token(section)
        return Credentials(
            access_key, secret_key, session_token, method=self.METHOD
        )

    def _get_session_token(self, config):
        """Return the value of the first ``TOKENS`` key found, else None."""
        return next(
            (config[name] for name in self.TOKENS if name in config), None
        )
class ConfigProvider(CredentialProvider):
    """INI based config provider with profile sections."""

    METHOD = 'config-file'
    CANONICAL_NAME = 'SharedConfig'

    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'
    # As with the EnvProvider, botocore originally supported
    # aws_security_token, but the SDKs are standardizing on
    # aws_session_token, so both are accepted.
    TOKENS = ['aws_security_token', 'aws_session_token']

    def __init__(self, config_filename, profile_name, config_parser=None):
        """

        :param config_filename: The value handed to ``config_parser`` to
            load the configuration; it is also reported in the log
            message as the config file.
        :param profile_name: The name of the current profile.
        :param config_parser: A config parser callable (defaults to
            ``botocore.configloader.load_config``).

        """
        self._config_filename = config_filename
        self._profile_name = profile_name
        if config_parser is None:
            config_parser = botocore.configloader.load_config
        self._config_parser = config_parser

    def load(self):
        """
        If there are credentials in the configuration associated with
        the profile, use those.

        :returns: ``None`` when the config is not found, the profile is
            absent, or the profile has no access key.
        :raises PartialCredentialsError: If the access key is present but
            the secret key is not.
        """
        try:
            full_config = self._config_parser(self._config_filename)
        except ConfigNotFound:
            return None

        profiles = full_config['profiles']
        if self._profile_name not in profiles:
            return None
        profile_config = profiles[self._profile_name]
        if self.ACCESS_KEY not in profile_config:
            return None

        logger.info(
            "Credentials found in config file: %s",
            self._config_filename,
        )
        access_key, secret_key = self._extract_creds_from_mapping(
            profile_config, self.ACCESS_KEY, self.SECRET_KEY
        )
        session_token = self._get_session_token(profile_config)
        return Credentials(
            access_key, secret_key, session_token, method=self.METHOD
        )

    def _get_session_token(self, profile_config):
        """Return the value of the first ``TOKENS`` key found, else None."""
        return next(
            (
                profile_config[name]
                for name in self.TOKENS
                if name in profile_config
            ),
            None,
        )
class BotoProvider(CredentialProvider):
    """Provider that reads the ``Credentials`` section of a boto config."""

    METHOD = 'boto-config'
    CANONICAL_NAME = 'Boto2Config'

    BOTO_CONFIG_ENV = 'BOTO_CONFIG'
    DEFAULT_CONFIG_FILENAMES = ['/etc/boto.cfg', '~/.boto']
    ACCESS_KEY = 'aws_access_key_id'
    SECRET_KEY = 'aws_secret_access_key'

    def __init__(self, environ=None, ini_parser=None):
        """
        :param environ: Mapping to read ``BOTO_CONFIG_ENV`` from (defaults
            to ``os.environ``).
        :param ini_parser: Callable that parses a file into a mapping of
            sections (defaults to ``botocore.configloader.raw_config_parse``).
        """
        self._environ = os.environ if environ is None else environ
        if ini_parser is None:
            ini_parser = botocore.configloader.raw_config_parse
        self._ini_parser = ini_parser

    def load(self):
        """
        Look for credentials in boto config file.

        The file named by ``BOTO_CONFIG_ENV`` is the only candidate when
        that variable is set; otherwise the default locations are tried
        in order.

        :returns: ``Credentials`` from the first file that has an access
            key in its ``Credentials`` section, else ``None``.
        """
        candidates = (
            [self._environ[self.BOTO_CONFIG_ENV]]
            if self.BOTO_CONFIG_ENV in self._environ
            else self.DEFAULT_CONFIG_FILENAMES
        )
        for path in candidates:
            try:
                parsed = self._ini_parser(path)
            except ConfigNotFound:
                # Nothing at this location; try the next one.
                continue
            if 'Credentials' not in parsed:
                continue
            section = parsed['Credentials']
            if self.ACCESS_KEY not in section:
                continue
            logger.info("Found credentials in boto config file: %s", path)
            access_key, secret_key = self._extract_creds_from_mapping(
                section, self.ACCESS_KEY, self.SECRET_KEY
            )
            return Credentials(access_key, secret_key, method=self.METHOD)
        return None
class AssumeRoleProvider(CredentialProvider):
    """Provider that resolves credentials for a profile with a ``role_arn``.

    The source credentials for the role come either from another profile
    (``source_profile``) or from a named provider (``credential_source``).
    """

    METHOD = 'assume-role'
    # The AssumeRole provider is logically part of the SharedConfig and
    # SharedCredentials providers. Since the purpose of the canonical name
    # is to provide cross-sdk compatibility, calling code will need to be
    # aware that either of those providers should be tied to the AssumeRole
    # provider as much as possible.
    CANONICAL_NAME = None
    ROLE_CONFIG_VAR = 'role_arn'
    WEB_IDENTITY_TOKE_FILE_VAR = 'web_identity_token_file'
    # Credentials are considered expired (and will be refreshed) once the total
    # remaining time left until the credentials expires is less than the
    # EXPIRY_WINDOW.
    EXPIRY_WINDOW_SECONDS = 60 * 15

    def __init__(
        self,
        load_config,
        client_creator,
        cache,
        profile_name,
        prompter=getpass.getpass,
        credential_sourcer=None,
        profile_provider_builder=None,
    ):
        """
        :type load_config: callable
        :param load_config: A function that accepts no arguments, and
            when called, will return the full configuration dictionary
            for the session (``session.full_config``).

        :type client_creator: callable
        :param client_creator: A factory function that will create
            a client when called.  Has the same interface as
            ``botocore.session.Session.create_client``.

        :type cache: dict
        :param cache: An object that supports ``__getitem__``,
            ``__setitem__``, and ``__contains__``.  An example
            of this is the ``JSONFileCache`` class in the CLI.

        :type profile_name: str
        :param profile_name: The name of the profile.

        :type prompter: callable
        :param prompter: A callable that returns input provided
            by the user (i.e raw_input, getpass.getpass, etc.).

        :type credential_sourcer: CanonicalNameCredentialSourcer
        :param credential_sourcer: A credential provider that takes a
            configuration, which is used to provide the source credentials
            for the STS call.

        :param profile_provider_builder: An object whose
            ``providers(profile_name=..., disable_env_vars=...)`` method
            returns the providers used to resolve credentials for a
            source profile. When omitted, a source profile must either
            hold static credentials or be an assume role profile itself.
        """
        #: The cache used to first check for assumed credentials.
        #: This is checked before making the AssumeRole API
        #: calls and can be useful if you have short lived
        #: scripts and you'd like to avoid calling AssumeRole
        #: until the credentials are expired.
        self.cache = cache
        self._load_config = load_config
        # client_creator is a callable that creates a client.
        # It's basically session.create_client
        self._client_creator = client_creator
        self._profile_name = profile_name
        self._prompter = prompter
        # The _loaded_config attribute will be populated from the
        # load_config() function once the configuration is actually
        # loaded.  The reason we go through all this instead of just
        # requiring that the loaded_config be passed to us is so that
        # we can defer configuration loading until we actually try
        # to load credentials (as opposed to when the object is
        # instantiated).
        self._loaded_config = {}
        self._credential_sourcer = credential_sourcer
        self._profile_provider_builder = profile_provider_builder
        # Profiles seen so far while following source_profile links;
        # used to detect reference cycles.
        self._visited_profiles = [self._profile_name]

    def load(self):
        """Return deferred credentials if the profile configures a role.

        :returns: ``None`` (implicitly) when the profile has no
            ``role_arn`` or is configured for web identity.
        """
        self._loaded_config = self._load_config()
        profiles = self._loaded_config.get('profiles', {})
        profile = profiles.get(self._profile_name, {})
        if self._has_assume_role_config_vars(profile):
            return self._load_creds_via_assume_role(self._profile_name)

    def _has_assume_role_config_vars(self, profile):
        """Whether ``profile`` is a plain (non web identity) role profile."""
        return (
            self.ROLE_CONFIG_VAR in profile
            and
            # We need to ensure this provider doesn't look at a profile when
            # the profile has configuration for web identity. Simply relying on
            # the order in the credential chain is insufficient as it doesn't
            # prevent the case when we're doing an assume role chain.
            self.WEB_IDENTITY_TOKE_FILE_VAR not in profile
        )

    def _load_creds_via_assume_role(self, profile_name):
        """Build deferred credentials that assume the profile's role."""
        role_config = self._get_role_config(profile_name)
        source_credentials = self._resolve_source_credentials(
            role_config, profile_name
        )

        # Only forward the optional settings that are actually configured.
        extra_args = {}
        role_session_name = role_config.get('role_session_name')
        if role_session_name is not None:
            extra_args['RoleSessionName'] = role_session_name

        external_id = role_config.get('external_id')
        if external_id is not None:
            extra_args['ExternalId'] = external_id

        mfa_serial = role_config.get('mfa_serial')
        if mfa_serial is not None:
            extra_args['SerialNumber'] = mfa_serial

        duration_seconds = role_config.get('duration_seconds')
        if duration_seconds is not None:
            extra_args['DurationSeconds'] = duration_seconds

        fetcher = AssumeRoleCredentialFetcher(
            client_creator=self._client_creator,
            source_credentials=source_credentials,
            role_arn=role_config['role_arn'],
            extra_args=extra_args,
            mfa_prompter=self._prompter,
            cache=self.cache,
        )
        refresher = fetcher.fetch_credentials
        if mfa_serial is not None:
            refresher = create_mfa_serial_refresher(refresher)

        # The initial credentials are empty and the expiration time is set
        # to now so that we can delay the call to assume role until it is
        # strictly needed.
        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=refresher,
            time_fetcher=_local_now,
        )

    def _get_role_config(self, profile_name):
        """Retrieves and validates the role configuration for the profile.

        :raises InvalidConfigError: If both ``source_profile`` and
            ``credential_source`` are set, or the one that is set fails
            validation.
        :raises PartialCredentialsError: If neither is set.
        """
        profiles = self._loaded_config.get('profiles', {})

        profile = profiles[profile_name]
        source_profile = profile.get('source_profile')
        role_arn = profile['role_arn']
        credential_source = profile.get('credential_source')
        mfa_serial = profile.get('mfa_serial')
        external_id = profile.get('external_id')
        role_session_name = profile.get('role_session_name')
        duration_seconds = profile.get('duration_seconds')

        role_config = {
            'role_arn': role_arn,
            'external_id': external_id,
            'mfa_serial': mfa_serial,
            'role_session_name': role_session_name,
            'source_profile': source_profile,
            'credential_source': credential_source,
        }

        if duration_seconds is not None:
            try:
                role_config['duration_seconds'] = int(duration_seconds)
            except ValueError:
                # Best effort: an unparsable value is left out of the
                # role config, but say so rather than dropping it
                # silently.
                logger.warning(
                    'Ignoring invalid duration_seconds value %r in '
                    'profile "%s".',
                    duration_seconds,
                    profile_name,
                )

        # Either the credential source or the source profile must be
        # specified, but not both.
        if credential_source is not None and source_profile is not None:
            raise InvalidConfigError(
                error_msg=(
                    'The profile "%s" contains both source_profile and '
                    'credential_source.' % profile_name
                )
            )
        elif credential_source is None and source_profile is None:
            raise PartialCredentialsError(
                provider=self.METHOD,
                cred_var='source_profile or credential_source',
            )
        elif credential_source is not None:
            self._validate_credential_source(profile_name, credential_source)
        else:
            self._validate_source_profile(profile_name, source_profile)

        return role_config

    def _validate_credential_source(self, parent_profile, credential_source):
        """Ensure ``credential_source`` can be served by the sourcer.

        :raises InvalidConfigError: If no sourcer was configured or the
            sourcer does not support the name.
        """
        if self._credential_sourcer is None:
            raise InvalidConfigError(
                error_msg=(
                    f"The credential_source \"{credential_source}\" is specified "
                    f"in profile \"{parent_profile}\", "
                    f"but no source provider was configured."
                )
            )
        if not self._credential_sourcer.is_supported(credential_source):
            raise InvalidConfigError(
                error_msg=(
                    f"The credential source \"{credential_source}\" referenced "
                    f"in profile \"{parent_profile}\" is not valid."
                )
            )

    def _source_profile_has_credentials(self, profile):
        """Whether ``profile`` has static credentials or role config."""
        return any(
            [
                self._has_static_credentials(profile),
                self._has_assume_role_config_vars(profile),
            ]
        )

    def _validate_source_profile(
        self, parent_profile_name, source_profile_name
    ):
        """Ensure the source profile exists and does not form a cycle.

        :raises InvalidConfigError: If the source profile does not exist.
        :raises InfiniteLoopConfigError: If following the reference would
            loop.
        """
        profiles = self._loaded_config.get('profiles', {})
        if source_profile_name not in profiles:
            raise InvalidConfigError(
                error_msg=(
                    f"The source_profile \"{source_profile_name}\" referenced in "
                    f"the profile \"{parent_profile_name}\" does not exist."
                )
            )

        source_profile = profiles[source_profile_name]

        # Make sure we aren't going into an infinite loop. If we haven't
        # visited the profile yet, we're good.
        if source_profile_name not in self._visited_profiles:
            return

        # If we have visited the profile and the profile isn't simply
        # referencing itself, that's an infinite loop.
        if source_profile_name != parent_profile_name:
            raise InfiniteLoopConfigError(
                source_profile=source_profile_name,
                visited_profiles=self._visited_profiles,
            )

        # A profile is allowed to reference itself so that it can source
        # static credentials and have configuration all in the same
        # profile. This will only ever work for the top level assume
        # role because the static credentials will otherwise take
        # precedence.
        if not self._has_static_credentials(source_profile):
            raise InfiniteLoopConfigError(
                source_profile=source_profile_name,
                visited_profiles=self._visited_profiles,
            )

    def _has_static_credentials(self, profile):
        """Whether ``profile`` contains an access key or a secret key."""
        static_keys = ['aws_secret_access_key', 'aws_access_key_id']
        return any(static_key in profile for static_key in static_keys)

    def _resolve_source_credentials(self, role_config, profile_name):
        """Return the credentials used to make the assume role call."""
        credential_source = role_config.get('credential_source')
        if credential_source is not None:
            return self._resolve_credentials_from_source(
                credential_source, profile_name
            )

        source_profile = role_config['source_profile']
        self._visited_profiles.append(source_profile)
        return self._resolve_credentials_from_profile(source_profile)

    def _resolve_credentials_from_profile(self, profile_name):
        """Return credentials for a source profile.

        :raises InvalidConfigError: If the source profile cannot provide
            any credentials.
        """
        profiles = self._loaded_config.get('profiles', {})
        profile = profiles[profile_name]
        has_static = self._has_static_credentials(profile)

        if has_static and not self._profile_provider_builder:
            # This is only here for backwards compatibility. If this provider
            # isn't given a profile provider builder we still want to be able
            # to handle the basic static credential case as we would before
            # the profile provider builder parameter was added.
            return self._resolve_static_credentials_from_profile(profile)
        elif has_static or not self._has_assume_role_config_vars(profile):
            error_message = 'The source profile "%s" must have credentials.'
            if self._profile_provider_builder is None:
                # Without a builder, a profile that has neither static
                # credentials nor role config has nothing to resolve.
                # Report that as a config error rather than failing with
                # an AttributeError on None.
                raise InvalidConfigError(
                    error_msg=error_message % profile_name,
                )
            profile_providers = self._profile_provider_builder.providers(
                profile_name=profile_name,
                disable_env_vars=True,
            )
            profile_chain = CredentialResolver(profile_providers)
            credentials = profile_chain.load_credentials()
            if credentials is None:
                raise InvalidConfigError(
                    error_msg=error_message % profile_name,
                )
            return credentials

        # The source profile is itself a role profile: follow the chain.
        return self._load_creds_via_assume_role(profile_name)

    def _resolve_static_credentials_from_profile(self, profile):
        """Build ``Credentials`` from the static keys in ``profile``.

        :raises PartialCredentialsError: If either key is missing.
        """
        try:
            return Credentials(
                access_key=profile['aws_access_key_id'],
                secret_key=profile['aws_secret_access_key'],
                token=profile.get('aws_session_token'),
            )
        except KeyError as e:
            raise PartialCredentialsError(
                provider=self.METHOD, cred_var=str(e)
            )

    def _resolve_credentials_from_source(
        self, credential_source, profile_name
    ):
        """Load credentials from the named credential source.

        :raises CredentialRetrievalError: If the source yields nothing.
        """
        credentials = self._credential_sourcer.source_credentials(
            credential_source
        )
        if credentials is None:
            raise CredentialRetrievalError(
                provider=credential_source,
                error_msg=(
                    'No credentials found in credential_source referenced '
                    'in profile %s' % profile_name
                ),
            )
        return credentials
class AssumeRoleWithWebIdentityProvider(CredentialProvider):
    """Provider that assumes a role using a web identity token file.

    Each setting is looked up in the environment first (unless that is
    disabled) and then in the selected profile.
    """

    METHOD = 'assume-role-with-web-identity'
    CANONICAL_NAME = None
    # Config key -> environment variable that overrides it.
    _CONFIG_TO_ENV_VAR = {
        'web_identity_token_file': 'AWS_WEB_IDENTITY_TOKEN_FILE',
        'role_session_name': 'AWS_ROLE_SESSION_NAME',
        'role_arn': 'AWS_ROLE_ARN',
    }

    def __init__(
        self,
        load_config,
        client_creator,
        profile_name,
        cache=None,
        disable_env_vars=False,
        token_loader_cls=None,
    ):
        """
        :param load_config: Callable taking no arguments that returns the
            full configuration dictionary.
        :param client_creator: Passed through to the credential fetcher.
        :param profile_name: The profile to read settings from.
        :param cache: Passed through to the credential fetcher.
        :param disable_env_vars: When true, environment variables are
            not consulted.
        :param token_loader_cls: Callable that takes the token file path
            and returns a token loader (defaults to
            ``FileWebIdentityTokenLoader``).
        """
        self.cache = cache
        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name
        # Loaded lazily on the first profile lookup.
        self._profile_config = None
        self._disable_env_vars = disable_env_vars
        if token_loader_cls is None:
            token_loader_cls = FileWebIdentityTokenLoader
        self._token_loader_cls = token_loader_cls

    def load(self):
        """Return deferred credentials, or ``None`` if not configured."""
        return self._assume_role_with_web_identity()

    def _get_profile_config(self, key):
        """Return ``key`` from the profile, loading the config on first use."""
        if self._profile_config is None:
            loaded_config = self._load_config()
            profiles = loaded_config.get('profiles', {})
            self._profile_config = profiles.get(self._profile_name, {})
        return self._profile_config.get(key)

    def _get_env_config(self, key):
        """Return the environment override for ``key``, or ``None``."""
        if self._disable_env_vars:
            return None
        env_key = self._CONFIG_TO_ENV_VAR.get(key)
        if env_key and env_key in os.environ:
            return os.environ[env_key]
        return None

    def _get_config(self, key):
        """Return ``key`` from the environment, falling back to the profile."""
        env_value = self._get_env_config(key)
        if env_value is not None:
            return env_value
        return self._get_profile_config(key)

    def _assume_role_with_web_identity(self):
        """Build deferred credentials from the web identity settings.

        :returns: ``None`` when no token file is configured.
        :raises InvalidConfigError: If a token file is configured but no
            role ARN is.
        """
        token_path = self._get_config('web_identity_token_file')
        if not token_path:
            return None
        token_loader = self._token_loader_cls(token_path)

        role_arn = self._get_config('role_arn')
        if not role_arn:
            # Each fragment ends with a space so the adjacent literals
            # join into readable text.
            error_msg = (
                'The provided profile or the current environment is '
                'configured to assume role with web identity but has no '
                'role ARN configured. Ensure that the profile has the role_arn '
                'configuration set or the AWS_ROLE_ARN env var is set.'
            )
            raise InvalidConfigError(error_msg=error_msg)

        extra_args = {}
        role_session_name = self._get_config('role_session_name')
        if role_session_name is not None:
            extra_args['RoleSessionName'] = role_session_name

        fetcher = AssumeRoleWithWebIdentityCredentialFetcher(
            client_creator=self._client_creator,
            web_identity_token_loader=token_loader,
            role_arn=role_arn,
            extra_args=extra_args,
            cache=self.cache,
        )
        # The initial credentials are empty and the expiration time is set
        # to now so that we can delay the call to assume role until it is
        # strictly needed.
        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=fetcher.fetch_credentials,
        )
class CanonicalNameCredentialSourcer:
    """Looks up credential providers by their canonical name."""

    def __init__(self, providers):
        """
        :param providers: Iterable of provider objects exposing
            ``CANONICAL_NAME`` and ``METHOD`` attributes.
        """
        self._providers = providers

    def is_supported(self, source_name):
        """Validates a given source name.

        Canonical names are matched case-insensitively, the same way
        the provider lookup matches them, so a name accepted here is
        one the lookup can find.

        :type source_name: str
        :param source_name: The value of credential_source in the config
            file. This is the canonical name of the credential provider.

        :rtype: bool
        :returns: True if the credential provider is supported,
            False otherwise.
        """
        if not source_name:
            # Providers without a canonical name carry ``None``; a
            # missing source name must not match those.
            return False
        return self._get_provider_by_canonical_name(source_name) is not None

    def source_credentials(self, source_name):
        """Loads source credentials based on the provided configuration.

        :type source_name: str
        :param source_name: The value of credential_source in the config
            file. This is the canonical name of the credential provider.

        :rtype: Credentials
        """
        source = self._get_provider(source_name)
        if isinstance(source, CredentialResolver):
            return source.load_credentials()
        return source.load()

    def _get_provider(self, canonical_name):
        """Return a credential provider by its canonical name.

        :type canonical_name: str
        :param canonical_name: The canonical name of the provider.

        :raises UnknownCredentialError: Raised if no
            credential provider by the provided name
            is found.
        """
        provider = self._get_provider_by_canonical_name(canonical_name)

        # The AssumeRole provider should really be part of the SharedConfig
        # provider rather than being its own thing, but it is not. It is
        # effectively part of both the SharedConfig provider and the
        # SharedCredentials provider now due to the way it behaves.
        # Therefore if we want either of those providers we should return
        # the AssumeRole provider with it.
        if canonical_name.lower() in ['sharedconfig', 'sharedcredentials']:
            assume_role_provider = self._get_provider_by_method('assume-role')
            if assume_role_provider is not None:
                # The SharedConfig or SharedCredentials provider may not be
                # present if it was removed for some reason, but the
                # AssumeRole provider could still be present. In that case,
                # return the assume role provider by itself.
                if provider is None:
                    return assume_role_provider

                # If both are present, return them both as a
                # CredentialResolver so that calling code can treat them as
                # a single entity.
                return CredentialResolver([assume_role_provider, provider])

        if provider is None:
            raise UnknownCredentialError(name=canonical_name)

        return provider

    def _get_provider_by_canonical_name(self, canonical_name):
        """Return a credential provider by its canonical name.

        This function is strict, it does not attempt to address
        compatibility issues. Returns ``None`` when nothing matches.
        """
        wanted = canonical_name.lower()
        for provider in self._providers:
            name = provider.CANONICAL_NAME
            # Canonical names are case-insensitive
            if name and name.lower() == wanted:
                return provider
        return None

    def _get_provider_by_method(self, method):
        """Return a credential provider by its METHOD name, else ``None``."""
        for provider in self._providers:
            if provider.METHOD == method:
                return provider
        return None
class ContainerProvider(CredentialProvider):
    """Provider that fetches credentials from a container metadata URI."""

    METHOD = 'container-role'
    CANONICAL_NAME = 'EcsContainer'
    ENV_VAR = 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI'
    ENV_VAR_FULL = 'AWS_CONTAINER_CREDENTIALS_FULL_URI'
    ENV_VAR_AUTH_TOKEN = 'AWS_CONTAINER_AUTHORIZATION_TOKEN'
    ENV_VAR_AUTH_TOKEN_FILE = 'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE'

    def __init__(self, environ=None, fetcher=None):
        """
        :param environ: Mapping of environment variables (defaults to
            ``os.environ``).
        :param fetcher: Object used to build and retrieve the metadata
            URI (defaults to a new ``ContainerMetadataFetcher``).
        """
        self._environ = os.environ if environ is None else environ
        self._fetcher = ContainerMetadataFetcher() if fetcher is None else fetcher

    def load(self):
        """Return refreshable credentials when a container URI is set.

        This provider only applies when one of the URI environment
        variables is present, which only happens if you opt into this
        feature; otherwise ``None`` is returned.
        """
        environ = self._environ
        if self.ENV_VAR not in environ and self.ENV_VAR_FULL not in environ:
            return None
        return self._retrieve_or_fail()

    def _retrieve_or_fail(self):
        """Fetch credentials from the configured URI.

        The relative URI takes precedence over the full URI when both
        are set.
        """
        if self._provided_relative_uri():
            target_uri = self._fetcher.full_url(self._environ[self.ENV_VAR])
        else:
            target_uri = self._environ[self.ENV_VAR_FULL]
        request_headers = self._build_headers()
        fetcher = self._create_fetcher(target_uri, request_headers)
        initial = fetcher()
        return RefreshableCredentials(
            access_key=initial['access_key'],
            secret_key=initial['secret_key'],
            token=initial['token'],
            method=self.METHOD,
            expiry_time=_parse_if_needed(initial['expiry_time']),
            refresh_using=fetcher,
        )

    def _build_headers(self):
        """Return the ``Authorization`` header dict, or ``None``.

        The token file variable takes precedence over the plain token
        variable.

        :raises ValueError: If the token fails validation.
        """
        environ = self._environ
        if self.ENV_VAR_AUTH_TOKEN_FILE in environ:
            with open(environ[self.ENV_VAR_AUTH_TOKEN_FILE]) as token_file:
                auth_token = token_file.read()
        elif self.ENV_VAR_AUTH_TOKEN in environ:
            auth_token = environ[self.ENV_VAR_AUTH_TOKEN]
        else:
            return None

        if auth_token is None:
            return None
        self._validate_auth_token(auth_token)
        return {'Authorization': auth_token}

    def _validate_auth_token(self, auth_token):
        """Reject tokens containing a carriage return or line feed."""
        if any(char in auth_token for char in ("\r", "\n")):
            raise ValueError("Auth token value is not a legal header value")

    def _create_fetcher(self, full_uri, headers):
        """Return a callable that retrieves credentials from ``full_uri``.

        The callable raises ``CredentialRetrievalError`` when the
        metadata request fails.
        """

        def fetch_creds():
            try:
                payload = self._fetcher.retrieve_full_uri(
                    full_uri, headers=headers
                )
            except MetadataRetrievalError as e:
                logger.debug(
                    "Error retrieving container metadata: %s", e, exc_info=True
                )
                raise CredentialRetrievalError(
                    provider=self.METHOD, error_msg=str(e)
                )
            # Translate the response keys into the names used by the
            # credentials classes.
            return {
                'access_key': payload['AccessKeyId'],
                'secret_key': payload['SecretAccessKey'],
                'token': payload['Token'],
                'expiry_time': payload['Expiration'],
            }

        return fetch_creds

    def _provided_relative_uri(self):
        """Whether the relative URI environment variable is set."""
        return self.ENV_VAR in self._environ
class CredentialResolver:
    """Ordered chain of credential providers.

    ``load_credentials`` consults the providers in list order and returns
    the first non-``None`` result. Providers are addressed by their
    ``METHOD`` attribute.
    """

    def __init__(self, providers):
        """
        :param providers: A list of ``CredentialProvider`` instances. The
            list is stored as-is (not copied) and is modified in place by
            ``insert_before``, ``insert_after`` and ``remove``.
        """
        self.providers = providers

    def insert_before(self, name, credential_provider):
        """
        Inserts a new instance of ``CredentialProvider`` into the chain that
        will be tried before an existing one.

        :param name: The short name of the credentials you'd like to insert
            the new credentials before. (ex. ``env`` or ``config``). Existing
            names & ordering can be discovered from the ``METHOD`` attribute
            of each entry in ``self.providers``.
        :type name: string

        :param credential_provider: The provider instance you'd like to add
            to the chain.
        :type credential_provider: An instance of ``CredentialProvider``

        :raises UnknownCredentialError: Raised if no provider named
            ``name`` is in the chain.
        """
        # Share the lookup (and its error handling) with insert_after and
        # get_provider instead of re-implementing it here.
        offset = self._get_provider_offset(name)
        self.providers.insert(offset, credential_provider)

    def insert_after(self, name, credential_provider):
        """
        Inserts a new instance of ``CredentialProvider`` into the chain that
        will be tried after an existing one.

        :param name: The short name of the credentials you'd like to insert
            the new credentials after. (ex. ``env`` or ``config``). Existing
            names & ordering can be discovered from the ``METHOD`` attribute
            of each entry in ``self.providers``.
        :type name: string

        :param credential_provider: The provider instance you'd like to add
            to the chain.
        :type credential_provider: An instance of ``CredentialProvider``

        :raises UnknownCredentialError: Raised if no provider named
            ``name`` is in the chain.
        """
        offset = self._get_provider_offset(name)
        self.providers.insert(offset + 1, credential_provider)

    def remove(self, name):
        """
        Removes a given provider from the chain.

        Only the first provider whose ``METHOD`` equals ``name`` is
        removed. A name that is not present is ignored.

        :param name: The short name of the provider to remove.
        :type name: string
        """
        for offset, provider in enumerate(self.providers):
            if provider.METHOD == name:
                self.providers.pop(offset)
                return
        # It's not present. Fail silently.

    def get_provider(self, name):
        """Return a credential provider by name.

        :type name: str
        :param name: The name of the provider.

        :raises UnknownCredentialError: Raised if no
            credential provider by the provided name
            is found.
        """
        return self.providers[self._get_provider_offset(name)]

    def _get_provider_offset(self, name):
        """Return the index of the first provider whose ``METHOD`` is ``name``.

        :raises UnknownCredentialError: Raised if no provider matches.
        """
        try:
            return [p.METHOD for p in self.providers].index(name)
        except ValueError:
            raise UnknownCredentialError(name=name)

    def load_credentials(self):
        """
        Goes through the credentials chain, returning the first credentials
        that could be loaded, or ``None`` if no provider returned any.
        """
        # First provider to return a non-None response wins.
        for provider in self.providers:
            logger.debug("Looking for credentials via: %s", provider.METHOD)
            creds = provider.load()
            if creds is not None:
                return creds

        # If we got here, no credentials could be found.
        # This feels like it should be an exception, but historically, ``None``
        # is returned.
        #
        # +1
        # -js
        return None
class SSOCredentialFetcher(CachedCredentialFetcher):
    """Fetches role credentials from the SSO service using an access token."""

    _UTC_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    def __init__(
        self,
        start_url,
        sso_region,
        role_name,
        account_id,
        client_creator,
        token_loader=None,
        cache=None,
        expiry_window_seconds=None,
        token_provider=None,
        sso_session_name=None,
    ):
        """Store the SSO settings, then initialise the caching base class.

        ``token_provider`` is preferred over ``token_loader`` when it is
        truthy; see ``_get_credentials``.
        """
        self._start_url = start_url
        self._sso_region = sso_region
        self._role_name = role_name
        self._account_id = account_id
        self._client_creator = client_creator
        self._token_loader = token_loader
        self._token_provider = token_provider
        self._sso_session_name = sso_session_name
        # Keep the base initialiser last so every attribute above is
        # already set when it runs.
        super().__init__(cache, expiry_window_seconds)

    def _create_cache_key(self):
        """Derive a deterministic, file-name-friendly cache key.

        The key is the SHA-1 hex digest of the minified, key-sorted JSON
        of the role name, account id and either the session name or, if
        no session name is set, the start URL.
        """
        key_fields = {
            'roleName': self._role_name,
            'accountId': self._account_id,
        }
        if self._sso_session_name:
            key_fields['sessionName'] = self._sso_session_name
        else:
            key_fields['startUrl'] = self._start_url
        # NOTE: Ideally this key construction would live in
        # CachedCredentialFetcher so all fetchers agree. The assume role
        # fetchers that subclass it do not pass separators, which yields
        # non-minified JSON. In the long term, all fetchers should use
        # the scheme below.
        serialized = json.dumps(
            key_fields, sort_keys=True, separators=(',', ':')
        )
        digest = sha1(serialized.encode('utf-8')).hexdigest()
        return self._make_file_safe(digest)

    def _parse_timestamp(self, timestamp_ms):
        """Format a millisecond epoch timestamp as a UTC date string."""
        # fromtimestamp works in seconds, so scale the milliseconds down.
        moment = datetime.datetime.fromtimestamp(
            timestamp_ms / 1000.0, tzutc()
        )
        return moment.strftime(self._UTC_DATE_FORMAT)

    def _get_credentials(self):
        """Call SSO ``get_role_credentials`` and reshape the response.

        :raises UnauthorizedSSOTokenError: If the service raises its
            ``UnauthorizedException``.
        """
        sso_client = self._client_creator(
            'sso',
            config=Config(
                signature_version=UNSIGNED,
                region_name=self._sso_region,
            ),
        )
        if self._token_provider:
            token_data = self._token_provider.load_token()
            access_token = token_data.get_frozen_token().token
        else:
            access_token = self._token_loader(self._start_url)['accessToken']

        try:
            response = sso_client.get_role_credentials(
                roleName=self._role_name,
                accountId=self._account_id,
                accessToken=access_token,
            )
        except sso_client.exceptions.UnauthorizedException:
            raise UnauthorizedSSOTokenError()

        role_creds = response['roleCredentials']
        return {
            'ProviderType': 'sso',
            'Credentials': {
                'AccessKeyId': role_creds['accessKeyId'],
                'SecretAccessKey': role_creds['secretAccessKey'],
                'SessionToken': role_creds['sessionToken'],
                'Expiration': self._parse_timestamp(role_creds['expiration']),
            },
        }
class SSOProvider(CredentialProvider):
    """Credential provider driven by the SSO settings of a config profile."""

    METHOD = 'sso'

    _SSO_TOKEN_CACHE_DIR = os.path.expanduser(
        os.path.join('~', '.aws', 'sso', 'cache')
    )
    _PROFILE_REQUIRED_CONFIG_VARS = (
        'sso_role_name',
        'sso_account_id',
    )
    _SSO_REQUIRED_CONFIG_VARS = (
        'sso_start_url',
        'sso_region',
    )
    _ALL_REQUIRED_CONFIG_VARS = (
        _PROFILE_REQUIRED_CONFIG_VARS + _SSO_REQUIRED_CONFIG_VARS
    )

    def __init__(
        self,
        load_config,
        client_creator,
        profile_name,
        cache=None,
        token_cache=None,
        token_provider=None,
    ):
        """
        :param load_config: Zero-argument callable returning the loaded
            configuration mapping.
        :param client_creator: Callable handed to the credential fetcher.
        :param profile_name: Name of the profile to read SSO settings from.
        :param cache: Credential cache; a new ``dict`` when ``None``.
        :param token_cache: Token cache; a ``JSONFileCache`` over
            ``_SSO_TOKEN_CACHE_DIR`` when ``None``.
        :param token_provider: Token provider passed to the fetcher when
            the profile references an sso-session.
        """
        self._load_config = load_config
        self._client_creator = client_creator
        self._profile_name = profile_name
        self._token_provider = token_provider
        self._token_cache = (
            JSONFileCache(self._SSO_TOKEN_CACHE_DIR)
            if token_cache is None
            else token_cache
        )
        self.cache = {} if cache is None else cache

    def _load_sso_config(self):
        """Return the required SSO settings for the profile, or ``None``.

        ``None`` means the profile contains neither the role name nor the
        account id setting, so this provider does not apply.

        :raises InvalidConfigError: If the profile uses SSO but a
            required setting is missing.
        """
        loaded_config = self._load_config()
        profile_name = self._profile_name
        profile_config = loaded_config.get('profiles', {}).get(
            profile_name, {}
        )
        sso_sessions = loaded_config.get('sso_sessions', {})

        # Role name & Account ID indicate the cred provider should be used
        if not any(
            c in profile_config for c in self._PROFILE_REQUIRED_CONFIG_VARS
        ):
            return None

        resolved_config, extra_reqs = self._resolve_sso_session_reference(
            profile_config, sso_sessions
        )

        all_required_configs = self._ALL_REQUIRED_CONFIG_VARS + extra_reqs
        missing_config_vars = [
            config_var
            for config_var in all_required_configs
            if config_var not in resolved_config
        ]
        if missing_config_vars:
            missing = ', '.join(missing_config_vars)
            raise InvalidConfigError(
                error_msg=(
                    'The profile "%s" is configured to use SSO but is missing '
                    'required configuration: %s' % (profile_name, missing)
                )
            )
        return {
            config_var: resolved_config[config_var]
            for config_var in all_required_configs
        }

    def _resolve_sso_session_reference(self, profile_config, sso_sessions):
        """Merge a referenced sso-session section into the profile config.

        :return: A ``(config, extra_required_vars)`` tuple. Without an
            ``sso_session`` reference the profile config is returned
            unchanged along with an empty tuple.
        :raises InvalidConfigError: If the referenced session does not
            exist, or a key set in both places has differing values.
        """
        sso_session_name = profile_config.get('sso_session')
        if sso_session_name is None:
            # No reference to resolve, proceed with legacy flow
            return profile_config, ()

        if sso_session_name not in sso_sessions:
            raise InvalidConfigError(
                error_msg=f'The specified sso-session does not exist: "{sso_session_name}"'
            )

        # Work on a copy so the caller's profile mapping is left untouched.
        config = profile_config.copy()
        for config_var, val in sso_sessions[sso_session_name].items():
            # Validate any keys referenced in both profile and sso_session match
            if config.get(config_var, val) != val:
                raise InvalidConfigError(
                    error_msg=(
                        f"The value for {config_var} is inconsistent between "
                        f"profile ({config[config_var]}) and sso-session ({val})."
                    )
                )
            config[config_var] = val
        return config, ('sso_session',)

    def load(self):
        """Return deferred refreshable SSO credentials, or ``None``.

        ``None`` is returned when the profile has no usable SSO config.
        """
        sso_config = self._load_sso_config()
        if not sso_config:
            return None

        fetcher_kwargs = {
            'start_url': sso_config['sso_start_url'],
            'sso_region': sso_config['sso_region'],
            'role_name': sso_config['sso_role_name'],
            'account_id': sso_config['sso_account_id'],
            'client_creator': self._client_creator,
            'token_loader': SSOTokenLoader(cache=self._token_cache),
            'cache': self.cache,
        }
        # The session name and token provider are only passed on when
        # the profile references an sso-session.
        if 'sso_session' in sso_config:
            fetcher_kwargs.update(
                sso_session_name=sso_config['sso_session'],
                token_provider=self._token_provider,
            )

        return DeferredRefreshableCredentials(
            method=self.METHOD,
            refresh_using=SSOCredentialFetcher(
                **fetcher_kwargs
            ).fetch_credentials,
        )