Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/filters/alerts.py: 56%

32 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4GCP Filter to find Alerts for a Log Metric Filter 

5""" 

6from c7n.filters.core import ValueFilter 

7from c7n.utils import local_session, type_schema, jmespath_search 

8from c7n_gcp.resources.logging import LogProjectMetric 

9 

10 

11@LogProjectMetric.filter_registry.register('alerts') 

12class AlertsFilter(ValueFilter): 

13 """GCP Filter to find Alerts for a Log Metric Filter. 

14 

15 .. code-block:: yaml 

16 

17 - name: log-metric-filter-has-alert 

18 resource: gcp.log-project-metric 

19 filters: 

20 - type: alerts 

21 """ 

22 

23 schema = type_schema('alerts') 

24 required_keys = {} 

25 permissions = ("monitoring.alertPolicies.list",) 

26 

27 def process(self, resources, event=None): 

28 self.findings_list = self.get_findings() 

29 matched = [r for r in resources if self.process_resource(r)] 

30 return matched 

31 

32 def get_findings(self): 

33 query_params = { 

34 'pageSize': 1000 

35 } 

36 session = local_session(self.manager.session_factory) 

37 client = session.client("monitoring", "v3", "projects.alertPolicies") 

38 project = session.get_default_project() 

39 findings_paged_list = list(client.execute_paged_query('list', 

40 {'name': 'projects/' + project, **query_params})) 

41 findings_list = [] 

42 for findings_page in findings_paged_list: 

43 if findings_page.get('alertPolicies'): 

44 findings_list.extend(findings_page['alertPolicies']) 

45 return findings_list 

46 

47 def process_resource(self, resource): 

48 resource_name = resource['name'] 

49 enabled_alerts = jmespath_search("[?enabled]", self.findings_list) 

50 search_string = "[*].conditions[?contains(conditionThreshold.filter,\ 

51 'metric.type=\"logging.googleapis.com/user/" + resource_name + "\"')]" 

52 result = jmespath_search(search_string, enabled_alerts) 

53 

54 if not any(result): 

55 return False 

56 else: 

57 return True