Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/filters/sccfindings.py: 35%

51 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4Security Command Center Findings support for GCP resources 

5""" 

6from c7n.filters.core import ValueFilter 

7from c7n.utils import local_session, type_schema 

8from c7n_gcp.provider import resources as gcp_resources 

9 

10 

class SecurityComandCenterFindingsFilter(ValueFilter):
    """Filters resources based on their Security Command Center (SCC) findings.

    Findings are listed for the configured organization, grouped by the last
    path segment of each finding's ``resourceName`` and stored on every
    resource under ``annotation_key``. When the policy supplies a ``key``, only
    findings matching the value filter are kept. A resource passes the filter
    when at least one finding remains.

    .. code-block:: yaml

      - name: bucket-contains-high-finding
        resource: gcp.bucket
        filters:
        - type: scc-findings
          org: 11111111111111
          key: severity
          value: HIGH
    """

    schema = type_schema(
        'scc-findings', rinherit=ValueFilter.schema,
        org={'type': 'integer'}, required=['org'])
    # Overrides the inherited required keys: only ``org`` (enforced by the
    # schema above) is mandatory, ``key``/``value`` are optional here.
    required_keys = {}
    permissions = ("securitycenter.findings.list",)
    annotation_key = 'c7n:matched-findings'

    def process(self, resources, event=None):
        """Return the resources that have at least one (matching) finding.

        :param resources: list of resource dicts to evaluate.
        :param event: unused; kept for the filter interface.
        :returns: a new list with the resources that passed the filter.
        """
        # Nothing to look up, and ``resources[0]`` below would raise.
        if not resources:
            return []

        # The first resource decides whether findings still have to be
        # fetched: a non-empty annotation is taken to mean an earlier pass
        # already annotated this batch.
        if not resources[0].get(self.annotation_key):
            self.split_by_resource(self.get_findings(resources))
        return [r for r in resources if self.process_resource(r)]

    def get_findings(self, resources):
        """List the SCC findings that reference any of ``resources``.

        Resets ``self.findings_by_resource`` as a side effect.

        :param resources: list of resource dicts used to build the query filter.
        :returns: flat list of the ``listFindingsResults`` entries of every page.
        """
        self.findings_by_resource = {}
        query_params = {
            # '-' is passed as the source id; see the SCC findings.list API
            # reference for its wildcard meaning.
            'parent': 'organizations/{}/sources/-'.format(self.data['org']),
            'filter': self.get_resource_filter(resources),
            'pageSize': 1000,
        }
        session = local_session(self.manager.session_factory)
        client = session.client("securitycenter", "v1", "organizations.sources.findings")

        findings_list = []
        for findings_page in client.execute_paged_query('list', query_params):
            # Pages without results may omit the key entirely.
            page_results = findings_page.get('listFindingsResults')
            if page_results:
                findings_list.extend(page_results)
        return findings_list

    def get_resource_filter(self, resources):
        """Build the findings filter expression for ``resources``.

        :param resources: list of resource dicts.
        :returns: ``resourceName:"<name>"`` terms joined with ``' OR '``;
            an empty string when ``resources`` is empty.
        """
        name_key = self.manager.resource_type.name
        return ' OR '.join(
            'resourceName:"{}"'.format(r[name_key]) for r in resources)

    def split_by_resource(self, finding_list):
        """Group findings into ``self.findings_by_resource``.

        The grouping key is the last ``/``-separated segment of each
        finding's ``resourceName``.

        :param finding_list: list of dicts each holding a ``finding`` entry.
        """
        for f in finding_list:
            finding = f["finding"]
            resource_name = finding["resourceName"].split('/')[-1]
            self.findings_by_resource.setdefault(resource_name, []).append(finding)

    def process_resource(self, resource):
        """Annotate ``resource`` with its findings and report whether any remain.

        :param resource: resource dict; updated in place under ``annotation_key``.
        :returns: ``True`` when the resource has at least one finding left
            after the optional value filter is applied.
        """
        if not resource.get(self.annotation_key):
            resource_name = resource[self.manager.resource_type.name]
            resource[self.annotation_key] = self.findings_by_resource.get(resource_name, [])

        # The value filter is only applied when the policy specifies a key;
        # otherwise any finding at all is enough for a match.
        if self.data.get('key'):
            resource[self.annotation_key] = [
                finding for finding in resource[self.annotation_key] if self.match(finding)]
        return len(resource[self.annotation_key]) > 0

    @classmethod
    def register_resources(klass, registry, resource_class):
        """Register this filter as ``scc-findings`` on ``resource_class``.

        Resource classes with a falsy ``filter_registry`` are skipped.

        :param registry: unused; kept for the subscriber callback signature.
        :param resource_class: resource class whose filter registry is extended.
        """
        if resource_class.filter_registry:
            resource_class.filter_registry.register('scc-findings', klass)

84 

85 

# Subscribe the filter's registration hook to the GCP resource registry so
# that 'scc-findings' is added to resource classes exposing a filter registry.
# NOTE(review): presumably the hook is invoked once per resource class - confirm.
gcp_resources.subscribe(SecurityComandCenterFindingsFilter.register_resources)