Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/actions/iampolicy.py: 23%

52 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4from c7n.utils import local_session, type_schema 

5from c7n_gcp.actions import MethodAction 

6 

7 

8class SetIamPolicy(MethodAction): 

9 """ Sets IAM policy. It works with bindings only. 

10 

11 The action supports two lists for modifying the existing IAM policy: `add-bindings` and 

12 `remove-bindings`. The `add-bindings` records are merged with the existing bindings, hereby 

13 no changes are made if all the required bindings are already present in the applicable 

14 resource. The `remove-bindings` records are used to filter out the existing bindings, 

15 so the action will take no effect if there are no matches. For more information, 

16 please refer to the `_add_bindings` and `_remove_bindings` methods respectively. 

17 

18 Considering a record added both to the `add-bindings` and `remove-bindings` lists, which 

19 though is not a recommended thing to do in general, the latter is designed to be a more 

20 restrictive one, so the record will be removed from the existing IAM bindings in the end. 

21 

22 There following member types are available to work with: 

23 - allUsers, 

24 - allAuthenticatedUsers, 

25 - user, 

26 - group, 

27 - domain, 

28 - serviceAccount. 

29 

30 Note the `resource` field in the example that could be changed to another resource that has 

31 both `setIamPolicy` and `getIamPolicy` methods (such as gcp.spanner-database-instance). 

32 

33 Example: 

34 

35 .. code-block:: yaml 

36 

37 policies: 

38 - name: gcp-spanner-instance-set-iam-policy 

39 resource: gcp.spanner-instance 

40 actions: 

41 - type: set-iam-policy 

42 add-bindings: 

43 - members: 

44 - user:user1@test.com 

45 - user:user2@test.com 

46 role: roles/owner 

47 - members: 

48 - user:user3@gmail.com 

49 role: roles/viewer 

50 remove-bindings: 

51 - members: 

52 - user:user4@test.com 

53 role: roles/owner 

54 - members: 

55 - user:user5@gmail.com 

56 - user:user6@gmail.com 

57 role: roles/viewer 

58 """ 

59 schema = type_schema('set-iam-policy', 

60 **{ 

61 'minProperties': 1, 

62 'additionalProperties': False, 

63 'add-bindings': { 

64 'type': 'array', 

65 'minItems': 1, 

66 'items': {'role': {'type': 'string'}, 

67 'members': {'type': 'array', 

68 'items': { 

69 'type': 'string'}, 

70 'minItems': 1}} 

71 }, 

72 'remove-bindings': { 

73 'type': 'array', 

74 'minItems': 1, 

75 'items': {'role': {'type': 'string'}, 

76 'members': {'oneOf': [ 

77 {'type': 'array', 

78 'items': {'type': 'string'}, 

79 'minItems': 1}, 

80 {'enum': ['*']}]}}}, 

81 }) 

82 method_spec = {'op': 'setIamPolicy'} 

83 schema_alias = True 

84 

85 def get_resource_params(self, model, resource): 

86 """ 

87 Collects `existing_bindings` with the `_get_existing_bindings` method, `add_bindings` and 

88 `remove_bindings` from a policy, then calls `_remove_bindings` with the result of 

89 `_add_bindings` being applied to the `existing_bindings`, and finally sets the resulting 

90 list at the 'bindings' key if there is at least a single record there, or assigns an empty 

91 object to the 'policy' key in order to avoid errors produced by the API. 

92 

93 :param model: the parameters that are defined in a resource manager 

94 :param resource: the resource the action is applied to 

95 """ 

96 params = self._verb_arguments(resource) 

97 existing_bindings = self._get_existing_bindings(model, resource) 

98 add_bindings = self.data['add-bindings'] if 'add-bindings' in self.data else [] 

99 remove_bindings = self.data['remove-bindings'] if 'remove-bindings' in self.data else [] 

100 bindings_to_set = self._add_bindings(existing_bindings, add_bindings) 

101 bindings_to_set = self._remove_bindings(bindings_to_set, remove_bindings) 

102 params['body'] = { 

103 'policy': {'bindings': bindings_to_set} if len(bindings_to_set) > 0 else {}} 

104 return params 

105 

106 def _get_existing_bindings(self, model, resource): 

107 """ 

108 Calls the `getIamPolicy` method on the resource the action is applied to and returns 

109 either a list of existing bindings or an empty one if there is no 'bindings' key. 

110 

111 :param model: the same as in `get_resource_params` (needed to take `component` from) 

112 :param resource: the same as in `get_resource_params` (passed into `_verb_arguments`) 

113 """ 

114 existing_bindings = local_session(self.manager.session_factory).client( 

115 self.manager.resource_type.service, 

116 self.manager.resource_type.version, 

117 model.component).execute_query( 

118 'getIamPolicy', verb_arguments=self._verb_arguments(resource)) 

119 return existing_bindings['bindings'] if 'bindings' in existing_bindings else [] 

120 

121 def _verb_arguments(self, resource): 

122 """ 

123 Returns a dictionary passed when making the `getIamPolicy` and 'setIamPolicy' API calls. 

124 

125 :param resource: the same as in `get_resource_params` 

126 """ 

127 return {'resource': resource[self.manager.resource_type.id]} 

128 

129 def _add_bindings(self, existing_bindings, bindings_to_add): 

130 """ 

131 Converts the provided lists using `_get_roles_to_bindings_dict`, then iterates through 

132 them so that the returned list combines: 

133 - among the roles mentioned in a policy, the existing members merged with the ones to add 

134 so that there are no duplicates, 

135 - as for the other roles, all their members. 

136 

137 The roles or members that are mentioned in the policy and already present 

138 in the existing bindings are simply ignored with no errors produced. 

139 

140 An empty list could be returned only if both `existing_bindings` and `bindings_to_remove` 

141 are empty, the possibility of which is defined by the caller of the method. 

142 

143 For additional information on how the method works, please refer to the tests 

144 (e.g. test_spanner). 

145 

146 :param existing_bindings: a list of dictionaries containing the 'role' and 'members' keys 

147 taken from the resource the action is applied to 

148 :param bindings_to_add: a list of dictionaries containing the 'role' and 'members' keys 

149 taken from the policy 

150 """ 

151 bindings = [] 

152 roles_to_existing_bindings = self._get_roles_to_bindings_dict(existing_bindings) 

153 roles_to_bindings_to_add = self._get_roles_to_bindings_dict(bindings_to_add) 

154 for role in roles_to_bindings_to_add: 

155 updated_members = dict(roles_to_bindings_to_add[role]) 

156 if role in roles_to_existing_bindings: 

157 existing_members = roles_to_existing_bindings[role]['members'] 

158 members_to_add = list(filter(lambda member: member not in existing_members, 

159 updated_members['members'])) 

160 updated_members['members'] = existing_members + members_to_add 

161 bindings.append(updated_members) 

162 

163 for role in roles_to_existing_bindings: 

164 if role not in roles_to_bindings_to_add: 

165 bindings.append(roles_to_existing_bindings[role]) 

166 return bindings 

167 

168 def _remove_bindings(self, existing_bindings, bindings_to_remove): 

169 """ 

170 Converts the provided lists using `_get_roles_to_bindings_dict`, then iterates through 

171 them so that the returned list combines: 

172 - among the roles mentioned in a policy, only the members that are not marked for removal, 

173 - as for the other roles, all their members. 

174 

175 The roles or members that are mentioned in the policy but are absent 

176 in the existing bindings are simply ignored with no errors produced. 

177 

178 As can be observed, it is possible to have an empty list returned either if 

179 `existing_bindings` is already empty or `bindings_to_remove` filters everything out. 

180 

181 In addition, a star wildcard could be used as the `members` key value (members: '*') 

182 in order to remove all members from a role. 

183 

184 For additional information on how the method works, please refer to the tests 

185 (e.g. test_spanner). 

186 

187 :param existing_bindings: a list of dictionaries containing the 'role' and 'members' keys 

188 taken from the resource the action is applied to 

189 :param bindings_to_remove: a list of dictionaries containing the 'role' and 'members' keys 

190 taken from the policy 

191 """ 

192 bindings = [] 

193 roles_to_existing_bindings = self._get_roles_to_bindings_dict(existing_bindings) 

194 roles_to_bindings_to_remove = self._get_roles_to_bindings_dict(bindings_to_remove) 

195 for role in roles_to_bindings_to_remove: 

196 if (role in roles_to_existing_bindings and 

197 roles_to_bindings_to_remove[role]['members'] != '*'): 

198 updated_members = dict(roles_to_existing_bindings[role]) 

199 members_to_remove = roles_to_bindings_to_remove[role] 

200 updated_members['members'] = list(filter( 

201 lambda member: member not in members_to_remove['members'], 

202 updated_members['members'])) 

203 if len(updated_members['members']) > 0: 

204 bindings.append(updated_members) 

205 

206 for role in roles_to_existing_bindings: 

207 if role not in roles_to_bindings_to_remove: 

208 bindings.append(roles_to_existing_bindings[role]) 

209 return bindings 

210 

211 def _get_roles_to_bindings_dict(self, bindings_list): 

212 """ 

213 Converts a given list to a dictionary, values under the 'role' key in elements of whose 

214 become keys in the resulting dictionary while the elements themselves become values 

215 associated with these keys. 

216 

217 :param bindings_list: a list whose elements are expected to have the 'role' key 

218 """ 

219 return {binding['role']: binding for binding in bindings_list}