Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/actions/iampolicy.py: 23%
52 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
4from c7n.utils import local_session, type_schema
5from c7n_gcp.actions import MethodAction
8class SetIamPolicy(MethodAction):
9 """ Sets IAM policy. It works with bindings only.
11 The action supports two lists for modifying the existing IAM policy: `add-bindings` and
12 `remove-bindings`. The `add-bindings` records are merged with the existing bindings, hereby
13 no changes are made if all the required bindings are already present in the applicable
14 resource. The `remove-bindings` records are used to filter out the existing bindings,
15 so the action will take no effect if there are no matches. For more information,
16 please refer to the `_add_bindings` and `_remove_bindings` methods respectively.
18 Considering a record added both to the `add-bindings` and `remove-bindings` lists, which
19 though is not a recommended thing to do in general, the latter is designed to be a more
20 restrictive one, so the record will be removed from the existing IAM bindings in the end.
22 There following member types are available to work with:
23 - allUsers,
24 - allAuthenticatedUsers,
25 - user,
26 - group,
27 - domain,
28 - serviceAccount.
30 Note the `resource` field in the example that could be changed to another resource that has
31 both `setIamPolicy` and `getIamPolicy` methods (such as gcp.spanner-database-instance).
33 Example:
35 .. code-block:: yaml
37 policies:
38 - name: gcp-spanner-instance-set-iam-policy
39 resource: gcp.spanner-instance
40 actions:
41 - type: set-iam-policy
42 add-bindings:
43 - members:
44 - user:user1@test.com
45 - user:user2@test.com
46 role: roles/owner
47 - members:
48 - user:user3@gmail.com
49 role: roles/viewer
50 remove-bindings:
51 - members:
52 - user:user4@test.com
53 role: roles/owner
54 - members:
55 - user:user5@gmail.com
56 - user:user6@gmail.com
57 role: roles/viewer
58 """
59 schema = type_schema('set-iam-policy',
60 **{
61 'minProperties': 1,
62 'additionalProperties': False,
63 'add-bindings': {
64 'type': 'array',
65 'minItems': 1,
66 'items': {'role': {'type': 'string'},
67 'members': {'type': 'array',
68 'items': {
69 'type': 'string'},
70 'minItems': 1}}
71 },
72 'remove-bindings': {
73 'type': 'array',
74 'minItems': 1,
75 'items': {'role': {'type': 'string'},
76 'members': {'oneOf': [
77 {'type': 'array',
78 'items': {'type': 'string'},
79 'minItems': 1},
80 {'enum': ['*']}]}}},
81 })
82 method_spec = {'op': 'setIamPolicy'}
83 schema_alias = True
85 def get_resource_params(self, model, resource):
86 """
87 Collects `existing_bindings` with the `_get_existing_bindings` method, `add_bindings` and
88 `remove_bindings` from a policy, then calls `_remove_bindings` with the result of
89 `_add_bindings` being applied to the `existing_bindings`, and finally sets the resulting
90 list at the 'bindings' key if there is at least a single record there, or assigns an empty
91 object to the 'policy' key in order to avoid errors produced by the API.
93 :param model: the parameters that are defined in a resource manager
94 :param resource: the resource the action is applied to
95 """
96 params = self._verb_arguments(resource)
97 existing_bindings = self._get_existing_bindings(model, resource)
98 add_bindings = self.data['add-bindings'] if 'add-bindings' in self.data else []
99 remove_bindings = self.data['remove-bindings'] if 'remove-bindings' in self.data else []
100 bindings_to_set = self._add_bindings(existing_bindings, add_bindings)
101 bindings_to_set = self._remove_bindings(bindings_to_set, remove_bindings)
102 params['body'] = {
103 'policy': {'bindings': bindings_to_set} if len(bindings_to_set) > 0 else {}}
104 return params
106 def _get_existing_bindings(self, model, resource):
107 """
108 Calls the `getIamPolicy` method on the resource the action is applied to and returns
109 either a list of existing bindings or an empty one if there is no 'bindings' key.
111 :param model: the same as in `get_resource_params` (needed to take `component` from)
112 :param resource: the same as in `get_resource_params` (passed into `_verb_arguments`)
113 """
114 existing_bindings = local_session(self.manager.session_factory).client(
115 self.manager.resource_type.service,
116 self.manager.resource_type.version,
117 model.component).execute_query(
118 'getIamPolicy', verb_arguments=self._verb_arguments(resource))
119 return existing_bindings['bindings'] if 'bindings' in existing_bindings else []
121 def _verb_arguments(self, resource):
122 """
123 Returns a dictionary passed when making the `getIamPolicy` and 'setIamPolicy' API calls.
125 :param resource: the same as in `get_resource_params`
126 """
127 return {'resource': resource[self.manager.resource_type.id]}
129 def _add_bindings(self, existing_bindings, bindings_to_add):
130 """
131 Converts the provided lists using `_get_roles_to_bindings_dict`, then iterates through
132 them so that the returned list combines:
133 - among the roles mentioned in a policy, the existing members merged with the ones to add
134 so that there are no duplicates,
135 - as for the other roles, all their members.
137 The roles or members that are mentioned in the policy and already present
138 in the existing bindings are simply ignored with no errors produced.
140 An empty list could be returned only if both `existing_bindings` and `bindings_to_remove`
141 are empty, the possibility of which is defined by the caller of the method.
143 For additional information on how the method works, please refer to the tests
144 (e.g. test_spanner).
146 :param existing_bindings: a list of dictionaries containing the 'role' and 'members' keys
147 taken from the resource the action is applied to
148 :param bindings_to_add: a list of dictionaries containing the 'role' and 'members' keys
149 taken from the policy
150 """
151 bindings = []
152 roles_to_existing_bindings = self._get_roles_to_bindings_dict(existing_bindings)
153 roles_to_bindings_to_add = self._get_roles_to_bindings_dict(bindings_to_add)
154 for role in roles_to_bindings_to_add:
155 updated_members = dict(roles_to_bindings_to_add[role])
156 if role in roles_to_existing_bindings:
157 existing_members = roles_to_existing_bindings[role]['members']
158 members_to_add = list(filter(lambda member: member not in existing_members,
159 updated_members['members']))
160 updated_members['members'] = existing_members + members_to_add
161 bindings.append(updated_members)
163 for role in roles_to_existing_bindings:
164 if role not in roles_to_bindings_to_add:
165 bindings.append(roles_to_existing_bindings[role])
166 return bindings
168 def _remove_bindings(self, existing_bindings, bindings_to_remove):
169 """
170 Converts the provided lists using `_get_roles_to_bindings_dict`, then iterates through
171 them so that the returned list combines:
172 - among the roles mentioned in a policy, only the members that are not marked for removal,
173 - as for the other roles, all their members.
175 The roles or members that are mentioned in the policy but are absent
176 in the existing bindings are simply ignored with no errors produced.
178 As can be observed, it is possible to have an empty list returned either if
179 `existing_bindings` is already empty or `bindings_to_remove` filters everything out.
181 In addition, a star wildcard could be used as the `members` key value (members: '*')
182 in order to remove all members from a role.
184 For additional information on how the method works, please refer to the tests
185 (e.g. test_spanner).
187 :param existing_bindings: a list of dictionaries containing the 'role' and 'members' keys
188 taken from the resource the action is applied to
189 :param bindings_to_remove: a list of dictionaries containing the 'role' and 'members' keys
190 taken from the policy
191 """
192 bindings = []
193 roles_to_existing_bindings = self._get_roles_to_bindings_dict(existing_bindings)
194 roles_to_bindings_to_remove = self._get_roles_to_bindings_dict(bindings_to_remove)
195 for role in roles_to_bindings_to_remove:
196 if (role in roles_to_existing_bindings and
197 roles_to_bindings_to_remove[role]['members'] != '*'):
198 updated_members = dict(roles_to_existing_bindings[role])
199 members_to_remove = roles_to_bindings_to_remove[role]
200 updated_members['members'] = list(filter(
201 lambda member: member not in members_to_remove['members'],
202 updated_members['members']))
203 if len(updated_members['members']) > 0:
204 bindings.append(updated_members)
206 for role in roles_to_existing_bindings:
207 if role not in roles_to_bindings_to_remove:
208 bindings.append(roles_to_existing_bindings[role])
209 return bindings
211 def _get_roles_to_bindings_dict(self, bindings_list):
212 """
213 Converts a given list to a dictionary, values under the 'role' key in elements of whose
214 become keys in the resulting dictionary while the elements themselves become values
215 associated with these keys.
217 :param bindings_list: a list whose elements are expected to have the 'role' key
218 """
219 return {binding['role']: binding for binding in bindings_list}