Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/filters/iampolicy.py: 36%
75 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import copy
4from c7n.filters.core import Filter, ValueFilter
6from c7n.utils import local_session, type_schema
9class IamPolicyFilter(Filter):
10 """
11 Filters resources based on their IAM policy
12 """
14 value_filter_schema = copy.deepcopy(ValueFilter.schema)
15 del value_filter_schema['required']
17 user_role_schema = {
18 'type': 'object',
19 'additionalProperties': False,
20 'required': ['user', 'role'],
21 'properties': {
22 'user': {'type': 'string'},
23 'role': {'type': 'string'},
24 'has': {'type': 'boolean'}
25 }
26 }
28 schema = type_schema(
29 'iam-policy',
30 **{'doc': value_filter_schema,
31 'user-role': user_role_schema})
33 def get_client(self, session, model):
34 return session.client(
35 model.service, model.version, model.component)
37 def process(self, resources, event=None):
38 if 'doc' in self.data:
39 try:
40 resources = self.process_resources(resources)
41 except TypeError:
42 valueFilter = IamPolicyValueFilter(self.data['doc'], self.manager, "bucket")
43 resources = valueFilter.process(resources)
44 if 'user-role' in self.data:
45 user_role = self.data['user-role']
46 key = user_role['user']
47 val = user_role['role']
48 op = 'in' if user_role.get('has', True) else 'not-in'
49 value_type = 'swap'
50 userRolePairFilter = IamPolicyUserRolePairFilter({'key': key, 'value': val,
51 'op': op, 'value_type': value_type}, self.manager)
52 resources = userRolePairFilter.process(resources)
54 return resources
56 def process_resources(self, resources):
57 valueFilter = IamPolicyValueFilter(self.data['doc'], self.manager)
58 resources = valueFilter.process(resources)
59 return resources
62class IamPolicyValueFilter(ValueFilter):
63 """Generic value filter on resources' IAM policy bindings using jmespath
65 :example:
67 Filter all kms-cryptokeys accessible to all users
68 or all authenticated users
70 .. code-block :: yaml
72 policies:
73 - name: gcp-iam-policy-value
74 resource: gcp.kms-cryptokey
75 filters:
76 - type: iam-policy
77 doc:
78 key: "bindings[*].members[]"
79 op: intersect
80 value: ["allUsers", "allAuthenticatedUsers"]
81 """
83 schema = type_schema('iam-policy', rinherit=ValueFilter.schema,)
84# permissions = 'GCP_SERVICE.GCP_RESOURCE.getIamPolicy',)
86 def __init__(self, data, manager=None, identifier="resource"):
87 super(IamPolicyValueFilter, self).__init__(data, manager)
88 self.identifier = identifier
90 def get_client(self, session, model):
91 return session.client(
92 model.service, model.version, model.component)
94 def process(self, resources, event=None):
95 model = self.manager.get_model()
96 session = local_session(self.manager.session_factory)
97 client = self.get_client(session, model)
99 for r in resources:
100 iam_policy = client.execute_command('getIamPolicy', self._verb_arguments(r))
101 r["c7n:iamPolicy"] = iam_policy
103 return super(IamPolicyValueFilter, self).process(resources)
105 def __call__(self, r):
106 return self.match(r['c7n:iamPolicy'])
108 def _verb_arguments(self, resource):
109 """
110 Returns a dictionary passed when making the `getIamPolicy` and 'setIamPolicy' API calls.
112 :param resource: the same as in `get_resource_params`
113 """
114 return {self.identifier: resource[self.manager.resource_type.id]}
117class IamPolicyUserRolePairFilter(ValueFilter):
118 """Filters resources based on specified user-role pairs.
120 :example:
122 Filter all projects where the user test123@gmail.com does not have the owner role
124 .. code-block :: yaml
126 policies:
127 - name: gcp-iam-user-roles
128 resource: gcp.project
129 filters:
130 - type: iam-policy
131 user-role:
132 user: "user:test123@gmail.com"
133 has: false
134 role: "roles/owner"
135 """
137 schema = type_schema('iam-user-roles', rinherit=ValueFilter.schema)
138# permissions = ('resourcemanager.projects.getIamPolicy',)
140 def get_client(self, session, model):
141 return session.client(
142 model.service, model.version, model.component)
144 def process(self, resources, event=None):
145 model = self.manager.get_model()
146 session = local_session(self.manager.session_factory)
147 client = self.get_client(session, model)
149 for r in resources:
150 resource_key = 'projectId' if 'projectId' in r else 'name'
151 iam_policy = client.execute_command('getIamPolicy', {"resource": r[resource_key]})
152 r["c7n:iamPolicyUserRolePair"] = {}
153 userToRolesMap = {}
155 for b in iam_policy["bindings"]:
156 role, members = b["role"], b["members"]
157 for user in members:
158 if user in userToRolesMap:
159 userToRolesMap[user].append(role)
160 else:
161 userToRolesMap[user] = [role]
162 for user, roles in userToRolesMap.items():
163 r["c7n:iamPolicyUserRolePair"][user] = roles
165 return super(IamPolicyUserRolePairFilter, self).process(resources)
167 def __call__(self, r):
168 return self.match(r["c7n:iamPolicyUserRolePair"])
170 def _verb_arguments(self, resource, identifier="resource"):
171 """
172 Returns a dictionary passed when making the `getIamPolicy` and 'setIamPolicy' API calls.
174 :param resource: the same as in `get_resource_params`
175 """
176 return {identifier: resource[self.manager.resource_type.id]}