Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/cwe.py: 19%

58 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from c7n.utils import jmespath_search, jmespath_compile 

4 

5 

class CloudWatchEvents:
    """A mapping of events to resource types.

    Provides shortcut definitions for common CloudTrail API calls and
    helpers for pulling resource ids out of an incoming event.
    """

    # **These are just shortcuts**, you can use the policy definition to
    # subscribe to any arbitrary cloud trail event that corresponds to
    # a custodian resource.

    # For common events that we want to match, just keep a short mapping.
    # Users can specify arbitrary cloud watch events by specifying these
    # values in their config, but keep the common case simple.

    trail_events = {
        # Keyed by shortcut name. Each entry carries:
        #   'source' - the event source the api call is emitted by
        #   'ids'    - jmespath expression (relative to the event's
        #              'detail') selecting the resource id(s)
        #   'event'  - optional, the actual api event name when it differs
        #              from the shortcut name
        'ConsoleLogin': {
            'ids': 'userIdentity.arn',
            'source': 'signin.amazonaws.com'},

        'CreateAutoScalingGroup': {
            'ids': 'requestParameters.autoScalingGroupName',
            'source': 'autoscaling.amazonaws.com'},

        'UpdateAutoScalingGroup': {
            'ids': 'requestParameters.autoScalingGroupName',
            'source': 'autoscaling.amazonaws.com'},

        'CreateBucket': {
            'ids': 'requestParameters.bucketName',
            'source': 's3.amazonaws.com'},

        'CreateCluster': {
            'ids': 'requestParameters.clusterIdentifier',
            'source': 'redshift.amazonaws.com'},

        'CreateLoadBalancer': {
            'ids': 'requestParameters.loadBalancerName',
            'source': 'elasticloadbalancing.amazonaws.com'},

        'CreateLoadBalancerPolicy': {
            'ids': 'requestParameters.loadBalancerName',
            'source': 'elasticloadbalancing.amazonaws.com'},

        'CreateDBInstance': {
            'ids': 'requestParameters.dBInstanceIdentifier',
            'source': 'rds.amazonaws.com'},

        'CreateVolume': {
            'ids': 'responseElements.volumeId',
            'source': 'ec2.amazonaws.com'},

        'SetLoadBalancerPoliciesOfListener': {
            'ids': 'requestParameters.loadBalancerName',
            'source': 'elasticloadbalancing.amazonaws.com'},

        'CreateElasticsearchDomain': {
            'ids': 'requestParameters.domainName',
            'source': 'es.amazonaws.com'},

        'CreateTable': {
            'ids': 'requestParameters.tableName',
            'source': 'dynamodb.amazonaws.com'},

        'CreateFunction': {
            'event': 'CreateFunction20150331',
            'source': 'lambda.amazonaws.com',
            'ids': 'requestParameters.functionName'},

        'RunInstances': {
            'ids': 'responseElements.instancesSet.items[].instanceId',
            'source': 'ec2.amazonaws.com'}}

    @classmethod
    def get(cls, event_name):
        """Return the shortcut entry for ``event_name``, or None if unknown.

        The returned dict is the shared table entry itself, not a copy.
        Its 'ids' value is a string until :meth:`match` has handled that
        event name, after which it is the compiled expression that
        ``match`` cached there.
        """
        return cls.trail_events.get(event_name)

    @classmethod
    def match(cls, event):
        """Match a cwe event against the cloudtrail shortcut table.

        Looks up ``event['detail']['eventName']`` in ``trail_events``.

        Returns a copy of the matching entry whose 'ids' value is a
        compiled jmespath expression rooted at the full event (i.e.
        prefixed with ``detail.``), or False when the event has no
        'detail' / 'eventName' or the name is not a known shortcut.
        """
        if 'detail' not in event:
            return False
        if 'eventName' not in event['detail']:
            return False
        k = event['detail']['eventName']

        # We want callers to use a compiled expression, but want to avoid
        # initialization cost of doing it without cause. Not thread safe,
        # but usage context is lambda entry.
        if k in cls.trail_events:
            # Shallow copy so the caller gets its own dict to hold onto.
            v = dict(cls.trail_events[k])
            if isinstance(v['ids'], str):
                v['ids'] = e = jmespath_compile('detail.%s' % v['ids'])
                # Cache the compiled form in the shared table so later
                # calls for the same event name skip compilation.
                cls.trail_events[k]['ids'] = e
            return v

        return False

    @classmethod
    def get_trail_ids(cls, event, mode):
        """Extract resource ids from a cloud trail event.

        ``mode['events']`` entries are either shortcut names (any
        non-dict value) or dicts with 'event', 'source' and 'ids' keys.

        Returns whatever the id expression yields for the first entry
        that produces a truthy result, or an empty tuple when nothing
        matches. Events that carry no ``detail.eventName`` cannot match
        any entry and also yield the empty tuple instead of raising
        ``KeyError``.

        Raises ValueError if a matching dict entry has no 'ids' query.
        """
        resource_ids = ()
        detail = event.get('detail') or {}
        # Mirror match(): an event without an event name is simply not a
        # cloud trail api call event, so there is nothing to extract.
        if 'eventName' not in detail:
            return resource_ids
        event_name = detail['eventName']
        event_source = detail.get('eventSource')
        for e in mode.get('events', []):
            if not isinstance(e, dict):
                # Check if we have a short cut / alias
                info = CloudWatchEvents.match(event)
                if info:
                    # Compiled shortcut expressions are rooted at the
                    # full event, so search the event, not its detail.
                    return info['ids'].search(event)
                continue
            if event_name != e.get('event'):
                continue
            if event_source != e.get('source'):
                continue

            id_query = e.get('ids')
            if not id_query:
                raise ValueError("No id query configured")
            evt = event
            # be forgiving for users specifying with details or without
            if not id_query.startswith('detail.'):
                evt = detail
            resource_ids = jmespath_search(id_query, evt)
            if resource_ids:
                break
        return resource_ids

    @classmethod
    def get_ids(cls, event, mode):
        """Return the list of resource ids ``event`` refers to.

        Dispatches on ``mode['type']``:

        * 'ec2-instance-state' reads ``detail['instance-id']``
        * 'asg-instance-state' reads ``detail['AutoScalingGroupName']``
        * 'cloudtrail' delegates to :meth:`get_trail_ids`

        Any other mode type returns None. Otherwise the result is always
        a list, with falsy entries (None, empty strings) dropped.
        """
        mode_type = mode.get('type')
        if mode_type == 'ec2-instance-state':
            resource_ids = [event.get('detail', {}).get('instance-id')]
        elif mode_type == 'asg-instance-state':
            resource_ids = [event.get('detail', {}).get('AutoScalingGroupName')]
        elif mode_type != 'cloudtrail':
            return None
        else:
            resource_ids = cls.get_trail_ids(event, mode)

        # A jmespath query may yield a scalar; normalise to a sequence.
        if not isinstance(resource_ids, (tuple, list)):
            resource_ids = [resource_ids]

        return list(filter(None, resource_ids))

148 resource_ids = [resource_ids] 

149 

150 return list(filter(None, resource_ids))