Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/health.py: 51%
83 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import itertools
5from c7n.exceptions import PolicyValidationError
6from c7n.query import QueryResourceManager, TypeInfo
7from c7n.manager import resources
8from c7n.utils import local_session, chunks
11@resources.register('health-event')
12class HealthEvents(QueryResourceManager):
13 """Query resource manager for AWS health events
14 """
16 class resource_type(TypeInfo):
17 service = 'health'
18 arn = 'arn'
19 arn_type = 'event'
20 enum_spec = ('describe_events', 'events', None)
21 name = 'eventTypeCode'
22 global_resource = True
23 id = 'arn'
24 date = 'startTime'
26 permissions = (
27 'health:DescribeEvents',
28 'health:DescribeEventDetails',
29 'health:DescribeAffectedEntities')
31 def __init__(self, ctx, data):
32 super(HealthEvents, self).__init__(ctx, data)
33 self.queries = QueryFilter.parse(
34 self.data.get('query', [
35 {'eventStatusCodes': 'open'},
36 {'eventTypeCategories': ['issue', 'accountNotification']}]))
38 def resource_query(self):
39 qf = {}
40 for q in self.queries:
41 qd = q.query()
42 if qd['Name'] in qf:
43 for qv in qf[qd['Name']]:
44 if qv in qf[qd['Name']]:
45 continue
46 qf[qd['Name']].append(qv)
47 else:
48 qf[qd['Name']] = []
49 for qv in qd['Values']:
50 qf[qd['Name']].append(qv)
51 return qf
53 def resources(self, query=None):
54 q = self.resource_query()
55 if q is not None:
56 query = query or {}
57 query['filter'] = q
58 return super(HealthEvents, self).resources(query=query)
60 def augment(self, resources):
61 client = local_session(self.session_factory).client('health')
62 for resource_set in chunks(resources, 10):
63 event_map = {r['arn']: r for r in resource_set}
64 event_details = client.describe_event_details(
65 eventArns=list(event_map.keys()))['successfulSet']
66 for d in event_details:
67 event_map[d['event']['arn']][
68 'Description'] = d['eventDescription']['latestDescription']
70 event_arns = [r['arn'] for r in resource_set
71 if r['eventTypeCategory'] != 'accountNotification']
73 if not event_arns:
74 continue
75 paginator = client.get_paginator('describe_affected_entities')
76 entities = list(itertools.chain(
77 *[p['entities']for p in paginator.paginate(
78 filter={'eventArns': event_arns})]))
80 for e in entities:
81 event_map[e.pop('eventArn')].setdefault(
82 'AffectedEntities', []).append(e)
84 return resources
87HEALTH_VALID_FILTERS = {
88 'availability-zone': str,
89 'eventTypeCategories': {'issue', 'accountNotification', 'scheduledChange'},
90 'regions': str,
91 'services': str,
92 'eventStatusCodes': {'open', 'closed', 'upcoming'},
93 'eventTypeCodes': str
94}
97class QueryFilter:
99 @classmethod
100 def parse(cls, data):
101 results = []
102 for d in data:
103 if not isinstance(d, dict):
104 raise PolicyValidationError(
105 "Health Query Filter Invalid structure %s" % d)
106 results.append(cls(d).validate())
107 return results
109 def __init__(self, data):
110 self.data = data
111 self.key = None
112 self.value = None
114 def validate(self):
115 if not len(list(self.data.keys())) == 1:
116 raise ValueError(
117 "Health Query Filter Invalid %s" % self.data)
118 self.key = list(self.data.keys())[0]
119 self.value = list(self.data.values())[0]
121 if self.key not in HEALTH_VALID_FILTERS:
122 raise PolicyValidationError(
123 "Health Query Filter invalid filter name %s" % (self.data))
125 if self.value is None:
126 raise PolicyValidationError(
127 "Health Query Filters must have a value, use tag-key"
128 " w/ tag name as value for tag present checks"
129 " %s" % self.data)
130 return self
132 def query(self):
133 value = self.value
134 if isinstance(self.value, str):
135 value = [self.value]
136 return {'Name': self.key, 'Values': value}