Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/filters/sccfindings.py: 35%
51 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4Security Command Center Findings suppport for GCP resources
5"""
6from c7n.filters.core import ValueFilter
7from c7n.utils import local_session, type_schema
8from c7n_gcp.provider import resources as gcp_resources
11class SecurityComandCenterFindingsFilter(ValueFilter):
12 """Filters resources based on their Security Command Center (SCC) findings.
14 .. code-block:: yaml
16 - name: bucket-contains-high-finding
17 resource: gcp.bucket
18 filters:
19 - type: scc-findings
20 org: 11111111111111
21 key: severity
22 value: HIGH
23 """
25 schema = type_schema('scc-findings', rinherit=ValueFilter.schema,
26 org={'type': 'integer'}, required=['org'])
27 required_keys = {}
28 permissions = ("securitycenter.findings.list",)
29 annotation_key = 'c7n:matched-findings'
31 def process(self, resources, event=None):
32 if not resources[0].get(self.annotation_key):
33 findings_list = self.get_findings(resources)
34 self.split_by_resource(findings_list)
35 matched = [r for r in resources if self.process_resource(r)]
36 return matched
38 def get_findings(self, resources):
39 self.findings_by_resource = {}
40 query_params = {
41 'filter': self.get_resource_filter(resources),
42 'pageSize': 1000
43 }
44 session = local_session(self.manager.session_factory)
45 client = session.client("securitycenter", "v1", "organizations.sources.findings")
46 findings_paged_list = list(client.execute_paged_query('list',
47 {'parent': 'organizations/{}/sources/-'.format(self.data['org']), **query_params}))
48 findings_list = []
49 for findings_page in findings_paged_list:
50 if findings_page.get('listFindingsResults'):
51 findings_list.extend(findings_page['listFindingsResults'])
52 return findings_list
54 def get_resource_filter(self, resources):
55 resource_filter = []
56 for r in resources:
57 resource_filter.append('resourceName:"{}"'.format(r[self.manager.resource_type.name]))
58 resource_filter.append(' OR ')
59 resource_filter.pop()
61 return ''.join(resource_filter)
63 def split_by_resource(self, finding_list):
64 for f in finding_list:
65 resource_name = f["finding"]["resourceName"].split('/')[-1]
66 resource_findings = self.findings_by_resource.get(resource_name, [])
67 resource_findings.append(f['finding'])
68 self.findings_by_resource[resource_name] = resource_findings
70 def process_resource(self, resource):
71 if not resource.get(self.annotation_key):
72 resource_name = resource[self.manager.resource_type.name]
73 resource[self.annotation_key] = self.findings_by_resource.get(resource_name, [])
75 if self.data.get('key'):
76 resource[self.annotation_key] = [
77 finding for finding in resource[self.annotation_key] if self.match(finding)]
78 return len(resource[self.annotation_key]) > 0
80 @classmethod
81 def register_resources(klass, registry, resource_class):
82 if resource_class.filter_registry:
83 resource_class.filter_registry.register('scc-findings', klass)
86gcp_resources.subscribe(SecurityComandCenterFindingsFilter.register_resources)