Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/health.py: 51%

83 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import itertools 

4 

5from c7n.exceptions import PolicyValidationError 

6from c7n.query import QueryResourceManager, TypeInfo 

7from c7n.manager import resources 

8from c7n.utils import local_session, chunks 

9 

10 

11@resources.register('health-event') 

12class HealthEvents(QueryResourceManager): 

13 """Query resource manager for AWS health events 

14 """ 

15 

16 class resource_type(TypeInfo): 

17 service = 'health' 

18 arn = 'arn' 

19 arn_type = 'event' 

20 enum_spec = ('describe_events', 'events', None) 

21 name = 'eventTypeCode' 

22 global_resource = True 

23 id = 'arn' 

24 date = 'startTime' 

25 

26 permissions = ( 

27 'health:DescribeEvents', 

28 'health:DescribeEventDetails', 

29 'health:DescribeAffectedEntities') 

30 

31 def __init__(self, ctx, data): 

32 super(HealthEvents, self).__init__(ctx, data) 

33 self.queries = QueryFilter.parse( 

34 self.data.get('query', [ 

35 {'eventStatusCodes': 'open'}, 

36 {'eventTypeCategories': ['issue', 'accountNotification']}])) 

37 

38 def resource_query(self): 

39 qf = {} 

40 for q in self.queries: 

41 qd = q.query() 

42 if qd['Name'] in qf: 

43 for qv in qf[qd['Name']]: 

44 if qv in qf[qd['Name']]: 

45 continue 

46 qf[qd['Name']].append(qv) 

47 else: 

48 qf[qd['Name']] = [] 

49 for qv in qd['Values']: 

50 qf[qd['Name']].append(qv) 

51 return qf 

52 

53 def resources(self, query=None): 

54 q = self.resource_query() 

55 if q is not None: 

56 query = query or {} 

57 query['filter'] = q 

58 return super(HealthEvents, self).resources(query=query) 

59 

60 def augment(self, resources): 

61 client = local_session(self.session_factory).client('health') 

62 for resource_set in chunks(resources, 10): 

63 event_map = {r['arn']: r for r in resource_set} 

64 event_details = client.describe_event_details( 

65 eventArns=list(event_map.keys()))['successfulSet'] 

66 for d in event_details: 

67 event_map[d['event']['arn']][ 

68 'Description'] = d['eventDescription']['latestDescription'] 

69 

70 event_arns = [r['arn'] for r in resource_set 

71 if r['eventTypeCategory'] != 'accountNotification'] 

72 

73 if not event_arns: 

74 continue 

75 paginator = client.get_paginator('describe_affected_entities') 

76 entities = list(itertools.chain( 

77 *[p['entities']for p in paginator.paginate( 

78 filter={'eventArns': event_arns})])) 

79 

80 for e in entities: 

81 event_map[e.pop('eventArn')].setdefault( 

82 'AffectedEntities', []).append(e) 

83 

84 return resources 

85 

86 

87HEALTH_VALID_FILTERS = { 

88 'availability-zone': str, 

89 'eventTypeCategories': {'issue', 'accountNotification', 'scheduledChange'}, 

90 'regions': str, 

91 'services': str, 

92 'eventStatusCodes': {'open', 'closed', 'upcoming'}, 

93 'eventTypeCodes': str 

94} 

95 

96 

97class QueryFilter: 

98 

99 @classmethod 

100 def parse(cls, data): 

101 results = [] 

102 for d in data: 

103 if not isinstance(d, dict): 

104 raise PolicyValidationError( 

105 "Health Query Filter Invalid structure %s" % d) 

106 results.append(cls(d).validate()) 

107 return results 

108 

109 def __init__(self, data): 

110 self.data = data 

111 self.key = None 

112 self.value = None 

113 

114 def validate(self): 

115 if not len(list(self.data.keys())) == 1: 

116 raise ValueError( 

117 "Health Query Filter Invalid %s" % self.data) 

118 self.key = list(self.data.keys())[0] 

119 self.value = list(self.data.values())[0] 

120 

121 if self.key not in HEALTH_VALID_FILTERS: 

122 raise PolicyValidationError( 

123 "Health Query Filter invalid filter name %s" % (self.data)) 

124 

125 if self.value is None: 

126 raise PolicyValidationError( 

127 "Health Query Filters must have a value, use tag-key" 

128 " w/ tag name as value for tag present checks" 

129 " %s" % self.data) 

130 return self 

131 

132 def query(self): 

133 value = self.value 

134 if isinstance(self.value, str): 

135 value = [self.value] 

136 return {'Name': self.key, 'Values': value}