1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
CloudWatch Metrics support for resources
5"""
6import re
7
8from collections import namedtuple
9from concurrent.futures import as_completed
10from datetime import datetime, timedelta
11
12from c7n.exceptions import PolicyValidationError
13from c7n.filters.core import Filter, OPERATORS
14from c7n.utils import local_session, type_schema, chunks
15
16
class MetricsFilter(Filter):
    """Supports cloud watch metrics filters on resources.

    All resources that have cloud watch metrics are supported.

    Docs on cloud watch metrics

    - GetMetricStatistics
      https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_GetMetricStatistics.html

    - Supported Metrics
      https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aws-services-cloudwatch-metrics.html

    .. code-block:: yaml

      - name: ec2-underutilized
        resource: ec2
        filters:
          - type: metrics
            name: CPUUtilization
            days: 4
            period: 86400
            value: 30
            op: less-than

    Note periods when a resource is not sending metrics are not part
    of calculated statistics as in the case of a stopped ec2 instance,
    nor for resources too new to have existed the entire
    period. ie. being stopped for an ec2 instance wouldn't lower the
    average cpu utilization.

    The "missing-value" key allows a policy to specify a default
    value when CloudWatch has no data to report:

    .. code-block:: yaml

      - name: elb-low-request-count
        resource: elb
        filters:
          - type: metrics
            name: RequestCount
            statistics: Sum
            days: 7
            value: 7
            missing-value: 0
            op: less-than

    This policy matches any ELB with fewer than 7 requests for the past week.
    ELBs with no requests during that time will have an empty set of metrics.
    Rather than skipping those resources, "missing-value: 0" causes the
    policy to treat their request counts as 0.

    Note the default statistic for metrics is Average.
    """

    schema = type_schema(
        'metrics',
        **{'namespace': {'type': 'string'},
           'name': {'type': 'string'},
           'dimensions': {
               'type': 'object',
               'patternProperties': {
                   '^.*$': {'type': 'string'}}},
           # Type choices
           'statistics': {'type': 'string'},
           'days': {'type': 'number'},
           'op': {'type': 'string', 'enum': list(OPERATORS.keys())},
           'value': {'type': 'number'},
           'period': {'type': 'number'},
           'attr-multiplier': {'type': 'number'},
           'percent-attr': {'type': 'string'},
           'missing-value': {'type': 'number'},
           'required': ('value', 'name')})
    schema_alias = True
    permissions = ("cloudwatch:GetMetricStatistics",)

    MAX_QUERY_POINTS = 50850
    MAX_RESULT_POINTS = 1440

    # Default per service, for overloaded services like ec2
    # we do type specific default namespace annotation
    # specifically AWS/EBS and AWS/EC2Spot

    # ditto for spot fleet
    DEFAULT_NAMESPACE = {
        'apigateway': 'AWS/ApiGateway',
        'cloudfront': 'AWS/CloudFront',
        'cloudsearch': 'AWS/CloudSearch',
        'dynamodb': 'AWS/DynamoDB',
        'ecs': 'AWS/ECS',
        'ecr': 'AWS/ECR',
        'efs': 'AWS/EFS',
        'elasticache': 'AWS/ElastiCache',
        'ec2': 'AWS/EC2',
        'elb': 'AWS/ELB',
        'elbv2': 'AWS/ApplicationELB',
        'emr': 'AWS/ElasticMapReduce',
        'es': 'AWS/ES',
        'events': 'AWS/Events',
        'firehose': 'AWS/Firehose',
        'kinesis': 'AWS/Kinesis',
        'lambda': 'AWS/Lambda',
        'logs': 'AWS/Logs',
        'redshift': 'AWS/Redshift',
        'rds': 'AWS/RDS',
        'route53': 'AWS/Route53',
        's3': 'AWS/S3',
        'sns': 'AWS/SNS',
        'sqs': 'AWS/SQS',
        'workspaces': 'AWS/WorkSpaces',
    }

    # Statistics requested through the 'Statistics' request parameter.
    standard_stats = {'Average', 'Sum', 'Maximum', 'Minimum', 'SampleCount'}
    # Anything matching this pattern (e.g. p99, p99.9) is requested through
    # the 'ExtendedStatistics' request parameter instead.
    extended_stats_re = re.compile(r'^p\d{1,3}\.{0,1}\d{0,1}$')

    def __init__(self, data, manager=None):
        """Initialize the filter and record the lookback window in days.

        ``days`` defaults to 14 when the policy does not specify it.
        """
        super(MetricsFilter, self).__init__(data, manager)
        self.days = self.data.get('days', 14)

    def validate(self):
        """Check the configured statistic and lookback window.

        Raises:
            PolicyValidationError: if ``statistics`` is neither one of
                ``standard_stats`` nor a match for ``extended_stats_re``,
                or if ``days`` exceeds 455.
        """
        stats = self.data.get('statistics', 'Average')
        if stats not in self.standard_stats and not self.extended_stats_re.match(stats):
            raise PolicyValidationError(
                "metrics filter statistics method %s not supported" % stats)

        if self.days > 455:
            raise PolicyValidationError(
                "metrics filter days value (%s) cannot exceed 455" % self.days)

    def get_metric_window(self):
        """Determine start and end times for the CloudWatch metric window

        Ensure that the window aligns with time segments based on CloudWatch's retention
        schedule defined here:

        https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#Metric

        Returns a ``MetricWindow(start, end)`` namedtuple of naive UTC
        datetimes where ``end - start`` equals ``self.days`` days.
        """  # noqa: E501

        duration = timedelta(self.days)
        now = datetime.utcnow()
        MetricWindow = namedtuple('MetricWindow', 'start end')

        if duration <= timedelta(days=(1 / 8.0)):
            # Align period with the start of the next second
            # CloudWatch retention: 3 hours
            end = now.replace(microsecond=0) + timedelta(seconds=1)
        elif duration <= timedelta(days=15):
            # Align period with the start of the next minute
            # CloudWatch retention: 15 days
            end = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
        elif duration <= timedelta(days=63):
            # Align period with the start of the next five-minute block
            # CloudWatch retention: 63 days
            end = (now.replace(minute=(now.minute // 5) * 5, second=0, microsecond=0)
                   + timedelta(minutes=5))
        else:
            # Align period with the start of the next hour
            # CloudWatch retention: 455 days
            end = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)

        return MetricWindow((end - duration), end)

    def process(self, resources, event=None):
        """Return the resources whose metric datapoints all satisfy the filter.

        Resolves the query parameters (window, period, statistic, operator,
        value and namespace) onto ``self``, then evaluates the resources in
        chunks of 50 via ``process_resource_set`` on a 3 worker executor.

        The namespace is taken from the filter data, then from the resource
        model's ``metrics_namespace`` attribute, then from
        ``DEFAULT_NAMESPACE`` keyed by the model's service.

        A chunk whose worker raised is logged as a warning and skipped, so
        none of its resources are returned.
        """
        self.start, self.end = self.get_metric_window()
        self.metric = self.data['name']
        # Without an explicit period, request a single datapoint spanning
        # the whole window.
        self.period = int(self.data.get('period', (self.end - self.start).total_seconds()))
        self.statistics = self.data.get('statistics', 'Average')
        self.model = self.manager.get_model()
        self.op = OPERATORS[self.data.get('op', 'less-than')]
        self.value = self.data['value']

        ns = self.data.get('namespace')
        if not ns:
            ns = getattr(self.model, 'metrics_namespace', None)
        if not ns:
            ns = self.DEFAULT_NAMESPACE[self.model.service]
        self.namespace = ns

        self.log.debug("Querying metrics for %d", len(resources))
        matched = []
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_resource_set, resource_set)
                for resource_set in chunks(resources, 50)]

            for f in as_completed(futures):
                if f.exception():
                    self.log.warning(
                        "CW Retrieval error: %s", f.exception())
                    continue
                matched.extend(f.result())
        return matched

    def get_dimensions(self, resource):
        """Return the CloudWatch dimensions identifying ``resource``.

        Uses the resource model's ``dimension`` attribute as both the
        dimension name and the key read from the resource.
        """
        return [{'Name': self.model.dimension,
                 'Value': resource[self.model.dimension]}]

    def get_user_dimensions(self):
        """Return the policy supplied ``dimensions`` as CloudWatch dimensions.

        Returns an empty list when the filter has no ``dimensions`` key.
        """
        return [
            {'Name': k, 'Value': v}
            for k, v in self.data.get('dimensions', {}).items()]

    def _datapoint_value(self, data_point):
        """Return the value of ``self.statistics`` from a datapoint.

        CloudWatch reports percentile statistics nested under the
        datapoint's 'ExtendedStatistics' mapping rather than as a top
        level key. Standard statistics, and the fill datapoint synthesized
        for 'missing-value', carry the value at the top level.
        """
        extended = data_point.get('ExtendedStatistics')
        if extended and self.statistics in extended:
            return extended[self.statistics]
        return data_point[self.statistics]

    def process_resource_set(self, resource_set):
        """Fetch metrics for each resource and return those that match.

        Datapoints are cached on each resource under the 'c7n.metrics'
        annotation. A resource matches when every one of its datapoints
        satisfies ``self.op(datapoint_value, self.value)``. When
        'percent-attr' is set, the datapoint value is first expressed as a
        percentage of that resource attribute (optionally scaled by
        'attr-multiplier').

        Resources with no datapoints are skipped unless 'missing-value' is
        configured, in which case a single fill datapoint is evaluated.
        """
        client = local_session(
            self.manager.session_factory).client('cloudwatch')

        # These do not vary per resource, so resolve them once.
        stats_key = (
            'Statistics' if self.statistics in self.standard_stats
            else 'ExtendedStatistics')
        # Note this annotation cache is policy scoped, not across
        # policies, still the lack of full qualification on the key
        # means multiple filters within a policy using the same metric
        # across different periods or dimensions would be problematic.
        key = "%s.%s.%s.%s" % (
            self.namespace, self.metric, self.statistics, str(self.days))
        percent_attr = self.data.get('percent-attr')

        matched = []
        for r in resource_set:
            # if we overload dimensions with multiple resources we get
            # the statistics/average over those resources.
            dimensions = self.get_dimensions(r)
            # Merge in any filter specified metrics, get_dimensions is
            # commonly overridden so we can't do it there.
            dimensions.extend(self.get_user_dimensions())

            collected_metrics = r.setdefault('c7n.metrics', {})

            if key not in collected_metrics:
                params = dict(
                    Namespace=self.namespace,
                    MetricName=self.metric,
                    StartTime=self.start,
                    EndTime=self.end,
                    Period=self.period,
                    Dimensions=dimensions
                )
                params[stats_key] = [self.statistics]
                collected_metrics[key] = client.get_metric_statistics(
                    **params)['Datapoints']

            datapoints = collected_metrics[key]

            # In certain cases CloudWatch reports no data for a metric.
            # If the policy specifies a fill value for missing data, add
            # that here before testing for matches. Otherwise, skip
            # matching entirely.
            if len(datapoints) == 0:
                if 'missing-value' not in self.data:
                    continue
                datapoints.append({
                    'Timestamp': self.start,
                    self.statistics: self.data['missing-value'],
                    'c7n:detail': 'Fill value for missing data'
                })

            if percent_attr:
                rvalue = r[percent_attr]
                if self.data.get('attr-multiplier'):
                    rvalue = rvalue * self.data['attr-multiplier']
                values = (
                    self._datapoint_value(dp) / rvalue * 100
                    for dp in datapoints)
            else:
                values = (self._datapoint_value(dp) for dp in datapoints)

            # all() stops at the first datapoint that fails the comparison.
            if all(self.op(v, self.value) for v in values):
                matched.append(r)
        return matched
294
295
class ShieldMetrics(MetricsFilter):
    """Specialized metrics filter for shield

    Queries are always issued against the ``AWS/DDoSProtection`` namespace
    and keyed on the resource's ARN via the ``ResourceArn`` dimension.
    """
    schema = type_schema('shield-metrics', rinherit=MetricsFilter.schema)

    namespace = "AWS/DDoSProtection"
    # Metric names accepted by validate().
    metrics = (
        'DDoSAttackBitsPerSecond',
        'DDoSAttackRequestsPerSecond',
        'DDoSDetected')

    attack_vectors = (
        'ACKFlood',
        'ChargenReflection',
        'DNSReflection',
        'GenericUDPReflection',
        'MSSQLReflection',
        'NetBIOSReflection',
        'NTPReflection',
        'PortMapper',
        'RequestFlood',
        'RIPReflection',
        'SNMPReflection',
        'SYNFlood',
        'SSDPReflection',
        'UDPTraffic',
        'UDPFragment')

    def validate(self):
        """Check that the configured metric name is a known shield metric.

        Raises:
            PolicyValidationError: if ``name`` is absent or not one of
                ``metrics``.
        """
        # NOTE(review): the parent class validation of 'statistics' and
        # 'days' is not invoked from here - confirm whether intentional.
        name = self.data.get('name')
        if name not in self.metrics:
            # Reuse the fetched name so a missing key is reported as a
            # validation error rather than surfacing as a KeyError.
            raise PolicyValidationError(
                "invalid shield metric %s valid:%s on %s" % (
                    name,
                    ", ".join(self.metrics),
                    self.manager.data))

    def get_dimensions(self, resource):
        """Return the ``ResourceArn`` dimension for ``resource``."""
        return [{
            'Name': 'ResourceArn',
            'Value': self.manager.get_arns([resource])[0]}]

    def process(self, resources, event=None):
        """Force the shield namespace, then run the standard metrics filter.

        Writes the namespace into the filter data, overriding any
        policy supplied value.
        """
        self.data['namespace'] = self.namespace
        return super(ShieldMetrics, self).process(resources, event)