Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/cwe.py: 19%
58 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from c7n.utils import jmespath_search, jmespath_compile
6class CloudWatchEvents:
7 """A mapping of events to resource types."""
9 # **These are just shortcuts**, you can use the policy definition to
10 # subscribe to any arbitrary cloud trail event that corresponds to
11 # a custodian resource.
13 # For common events that we want to match, just keep a short mapping.
14 # Users can specify arbitrary cloud watch events by specifying these
15 # values in their config, but keep the common case simple.
17 trail_events = {
18 # event source, resource type as keys, mapping to api call and
19 # jmespath expression
20 'ConsoleLogin': {
21 'ids': 'userIdentity.arn',
22 'source': 'signin.amazonaws.com'},
24 'CreateAutoScalingGroup': {
25 'ids': 'requestParameters.autoScalingGroupName',
26 'source': 'autoscaling.amazonaws.com'},
28 'UpdateAutoScalingGroup': {
29 'ids': 'requestParameters.autoScalingGroupName',
30 'source': 'autoscaling.amazonaws.com'},
32 'CreateBucket': {
33 'ids': 'requestParameters.bucketName',
34 'source': 's3.amazonaws.com'},
36 'CreateCluster': {
37 'ids': 'requestParameters.clusterIdentifier',
38 'source': 'redshift.amazonaws.com'},
40 'CreateLoadBalancer': {
41 'ids': 'requestParameters.loadBalancerName',
42 'source': 'elasticloadbalancing.amazonaws.com'},
44 'CreateLoadBalancerPolicy': {
45 'ids': 'requestParameters.loadBalancerName',
46 'source': 'elasticloadbalancing.amazonaws.com'},
48 'CreateDBInstance': {
49 'ids': 'requestParameters.dBInstanceIdentifier',
50 'source': 'rds.amazonaws.com'},
52 'CreateVolume': {
53 'ids': 'responseElements.volumeId',
54 'source': 'ec2.amazonaws.com'},
56 'SetLoadBalancerPoliciesOfListener': {
57 'ids': 'requestParameters.loadBalancerName',
58 'source': 'elasticloadbalancing.amazonaws.com'},
60 'CreateElasticsearchDomain': {
61 'ids': 'requestParameters.domainName',
62 'source': 'es.amazonaws.com'},
64 'CreateTable': {
65 'ids': 'requestParameters.tableName',
66 'source': 'dynamodb.amazonaws.com'},
68 'CreateFunction': {
69 'event': 'CreateFunction20150331',
70 'source': 'lambda.amazonaws.com',
71 'ids': 'requestParameters.functionName'},
73 'RunInstances': {
74 'ids': 'responseElements.instancesSet.items[].instanceId',
75 'source': 'ec2.amazonaws.com'}}
77 @classmethod
78 def get(cls, event_name):
79 return cls.trail_events.get(event_name)
81 @classmethod
82 def match(cls, event):
83 """Match a given cwe event as cloudtrail with an api call
85 That has its information filled out.
86 """
87 if 'detail' not in event:
88 return False
89 if 'eventName' not in event['detail']:
90 return False
91 k = event['detail']['eventName']
93 # We want callers to use a compiled expression, but want to avoid
94 # initialization cost of doing it without cause. Not thread safe,
95 # but usage context is lambda entry.
96 if k in cls.trail_events:
97 v = dict(cls.trail_events[k])
98 if isinstance(v['ids'], str):
99 v['ids'] = e = jmespath_compile('detail.%s' % v['ids'])
100 cls.trail_events[k]['ids'] = e
101 return v
103 return False
105 @classmethod
106 def get_trail_ids(cls, event, mode):
107 """extract resources ids from a cloud trail event."""
108 resource_ids = ()
109 event_name = event['detail']['eventName']
110 event_source = event['detail']['eventSource']
111 for e in mode.get('events', []):
112 if not isinstance(e, dict):
113 # Check if we have a short cut / alias
114 info = CloudWatchEvents.match(event)
115 if info:
116 return info['ids'].search(event)
117 continue
118 if event_name != e.get('event'):
119 continue
120 if event_source != e.get('source'):
121 continue
123 id_query = e.get('ids')
124 if not id_query:
125 raise ValueError("No id query configured")
126 evt = event
127 # be forgiving for users specifying with details or without
128 if not id_query.startswith('detail.'):
129 evt = event.get('detail', {})
130 resource_ids = jmespath_search(id_query, evt)
131 if resource_ids:
132 break
133 return resource_ids
135 @classmethod
136 def get_ids(cls, event, mode):
137 mode_type = mode.get('type')
138 if mode_type == 'ec2-instance-state':
139 resource_ids = [event.get('detail', {}).get('instance-id')]
140 elif mode_type == 'asg-instance-state':
141 resource_ids = [event.get('detail', {}).get('AutoScalingGroupName')]
142 elif mode_type != 'cloudtrail':
143 return None
144 else:
145 resource_ids = cls.get_trail_ids(event, mode)
147 if not isinstance(resource_ids, (tuple, list)):
148 resource_ids = [resource_ids]
150 return list(filter(None, resource_ids))