Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/credentials.py: 45%

62 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4Authentication utilities 

5""" 

6import os 

7 

8from botocore.credentials import RefreshableCredentials 

9from botocore.session import get_session 

10from boto3 import Session 

11 

12from c7n.version import version 

13from c7n.utils import get_retry 

14 

15 

# Regional STS endpoints are opt-in for now: there are still open issues
# (see #5023) to resolve before they can become the default. Set the
# C7N_USE_STS_REGIONAL environment variable to "yes" or "true" (any case)
# to enable them.
USE_STS_REGIONAL = os.getenv('C7N_USE_STS_REGIONAL', '').lower() in ('yes', 'true')

20 

21 

class SessionFactory:
    """Callable factory producing configured boto3 sessions.

    Calling an instance returns a ``boto3.Session`` for the factory's
    profile and region, going through ``assumed_session`` when an
    ``assume_role`` is configured. Every session returned is passed
    through :meth:`update`, which sets the c7n user agent fields and
    invokes the registered subscribers.
    """

    def __init__(self, region, profile=None, assume_role=None, external_id=None):
        """Record the settings used to build sessions.

        Args:
          region: default region name for sessions built by the factory
          profile: optional profile name passed to ``Session``
          assume_role: optional role arn; when set, sessions are built
             via ``assumed_session``
          external_id: optional external id forwarded to ``assumed_session``
        """
        self.region = region
        self.profile = profile
        self.assume_role = assume_role
        self.external_id = external_id
        # The role session name is "CloudCustodian", with an "@<suffix>"
        # appended when C7N_SESSION_SUFFIX is present in the environment.
        self.session_name = "CloudCustodian"
        if 'C7N_SESSION_SUFFIX' in os.environ:
            self.session_name = "%s@%s" % (
                self.session_name, os.environ['C7N_SESSION_SUFFIX'])
        self._subscribers = []
        self._policy_name = ""

    def _get_policy_name(self):
        """Return the policy name currently recorded on the factory."""
        return self._policy_name

    def _set_policy_name(self, name):
        """Record the policy name used for the user agent extra field."""
        self._policy_name = name

    # Readable as well as writable: a setter-only property raises
    # AttributeError on any read of ``factory.policy_name``.
    policy_name = property(_get_policy_name, _set_policy_name)

    def __call__(self, assume=True, region=None):
        """Build and return a session.

        Args:
          assume: when false, the role is not assumed even if the factory
             has an ``assume_role``
          region: optional region taking precedence over the factory's
             default region

        :return: the session after it has been passed through ``update``
        """
        if self.assume_role and assume:
            session = Session(profile_name=self.profile)
            session = assumed_session(
                self.assume_role, self.session_name, session,
                region or self.region, self.external_id)
        else:
            session = Session(
                region_name=region or self.region, profile_name=self.profile)

        return self.update(session)

    def update(self, session):
        """Set the c7n user agent on ``session`` and call the subscribers.

        The user agent name and version are always set; the extra field
        is only set when a policy name has been recorded. Each subscriber
        is called with the session, in order.

        :return: the same session object that was passed in
        """
        session._session.user_agent_name = "c7n"
        session._session.user_agent_version = version
        if self._policy_name:
            session._session.user_agent_extra = f"c7n/policy#{self._policy_name}"

        for s in self._subscribers:
            s(session)

        return session

    def set_subscribers(self, subscribers):
        """Replace the callables that ``update`` invokes with each session."""
        self._subscribers = subscribers

66 

67 

def assumed_session(role_arn, session_name, session=None, region=None, external_id=None):
    """Build a boto3 session backed by an STS assumed role.

    The returned session uses refreshable credentials that call assume
    role again when they need renewing.

    Args:
      role_arn: iam role arn to assume
      session_name: client session identifier
      session: an optional extant session used for the sts calls; it is
         captured in the refresh closure. A default ``Session()`` is
         created when omitted.
      region: optional region handed to ``get_sts_client`` and set on the
         returned session; when None the botocore session's configured
         region is used, falling back to 'us-east-1'.
      external_id: optional external id, included in the assume role
         request when it is not None.

    :return: a boto3 session using the sts assumed role credentials

    Notes: We have to poke at botocore internals a few times
    """
    if session is None:
        session = Session()

    retry = get_retry(('Throttling',))

    def refresh():
        # `region` is looked up each time refresh() runs: the initial call
        # below sees the caller's value, while later renewals see whatever
        # default was assigned to it further down in this function.
        assume_args = {"RoleArn": role_arn, "RoleSessionName": session_name}
        if external_id is not None:
            assume_args['ExternalId'] = external_id

        sts = get_sts_client(session, region)
        response = retry(sts.assume_role, **assume_args)
        creds = response['Credentials']
        return {
            'access_key': creds['AccessKeyId'],
            'secret_key': creds['SecretAccessKey'],
            'token': creds['SessionToken'],
            # The expiration is turned back into a string only so that
            # botocore can parse it again.
            'expiry_time': creds['Expiration'].isoformat(),
        }

    refreshable = RefreshableCredentials.create_from_metadata(
        metadata=refresh(),
        refresh_using=refresh,
        method='sts-assume-role')

    # There is no clean public way to install these credentials, so we
    # set the private attribute on the botocore session directly.
    # Upstream discussion:
    # https://github.com/boto/boto3/issues/443
    # https://github.com/boto/botocore/issues/761
    botocore_session = get_session()
    botocore_session._credentials = refreshable
    if region is None:
        region = botocore_session.get_config_variable('region') or 'us-east-1'
    botocore_session.set_config_variable('region', region)
    return Session(botocore_session=botocore_session)

122 

123 

def get_sts_client(session, region):
    """Create an STS client from ``session`` for the given region.

    When a region is given and USE_STS_REGIONAL is enabled, the client
    is pinned to that region's STS endpoint. Otherwise no endpoint or
    region is passed, leaving the choice to the session.

    For the list of regional endpoints, see https://amzn.to/2ohJgtR
    """
    if not (region and USE_STS_REGIONAL):
        return session.client('sts', endpoint_url=None, region_name=None)

    regional_endpoint = f"https://sts.{region}.amazonaws.com"
    return session.client(
        'sts', endpoint_url=regional_endpoint, region_name=region)

138 'sts', endpoint_url=endpoint_url, region_name=region_name)