1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
CloudWatch Metrics support for resources
5"""
6import re
7
8from collections import namedtuple
9from concurrent.futures import as_completed
10from datetime import datetime, timedelta
11
12from c7n.exceptions import PolicyValidationError
13from c7n.filters.core import Filter, OPERATORS
14from c7n.utils import local_session, type_schema, chunks
15
16
class MetricsFilter(Filter):
    """Supports cloud watch metrics filters on resources.

    All resources that have cloud watch metrics are supported.

    Docs on cloud watch metrics

    - GetMetricStatistics
      https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_GetMetricStatistics.html

    - Supported Metrics
      https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aws-services-cloudwatch-metrics.html

    .. code-block:: yaml

      - name: ec2-underutilized
        resource: ec2
        filters:
          - type: metrics
            name: CPUUtilization
            days: 4
            period: 86400
            value: 30
            op: less-than

    Note that periods when a resource is not sending metrics are not
    part of the calculated statistics, as in the case of a stopped ec2
    instance, nor are periods before a resource existed when it is too
    new to have existed for the entire window. ie. being stopped for
    an ec2 instance wouldn't lower the average cpu utilization.

    The "missing-value" key allows a policy to specify a default
    value when CloudWatch has no data to report:

    .. code-block:: yaml

      - name: elb-low-request-count
        resource: elb
        filters:
          - type: metrics
            name: RequestCount
            statistics: Sum
            days: 7
            value: 7
            missing-value: 0
            op: less-than

    This policy matches any ELB with fewer than 7 requests for the past week.
    ELBs with no requests during that time will have an empty set of metrics.
    Rather than skipping those resources, "missing-value: 0" causes the
    policy to treat their request counts as 0.

    The "percent-attr" key names a resource attribute; when set, every
    datapoint is expressed as a percentage of that attribute's value
    (optionally scaled by "attr-multiplier") before being compared to
    "value".

    A resource matches only when every returned datapoint satisfies
    the comparison.

    Note the default statistic for metrics is Average.
    """

    schema = type_schema(
        'metrics',
        **{'namespace': {'type': 'string'},
           'name': {'type': 'string'},
           'dimensions': {
               'type': 'object',
               'patternProperties': {
                   '^.*$': {'type': 'string'}}},
           # Type choices
           'statistics': {'type': 'string'},
           'days': {'type': 'number'},
           'op': {'type': 'string', 'enum': list(OPERATORS.keys())},
           'value': {'type': 'number'},
           'period': {'type': 'number'},
           'attr-multiplier': {'type': 'number'},
           'percent-attr': {'type': 'string'},
           'missing-value': {'type': 'number'},
           'required': ('value', 'name')})
    schema_alias = True
    permissions = ("cloudwatch:GetMetricStatistics",)

    # NOTE(review): these limits are not referenced anywhere in this class.
    MAX_QUERY_POINTS = 50850
    MAX_RESULT_POINTS = 1440

    # Default namespace per service. For overloaded services like ec2
    # (e.g. AWS/EBS, AWS/EC2Spot, spot fleet) the resource model supplies
    # a type specific ``metrics_namespace`` which takes precedence.
    DEFAULT_NAMESPACE = {
        'apigateway': 'AWS/ApiGateway',
        'cloudfront': 'AWS/CloudFront',
        'cloudsearch': 'AWS/CloudSearch',
        'dynamodb': 'AWS/DynamoDB',
        'ecs': 'AWS/ECS',
        'ecr': 'AWS/ECR',
        'efs': 'AWS/EFS',
        'elasticache': 'AWS/ElastiCache',
        'ec2': 'AWS/EC2',
        'elb': 'AWS/ELB',
        'elbv2': 'AWS/ApplicationELB',
        'emr': 'AWS/ElasticMapReduce',
        'es': 'AWS/ES',
        'events': 'AWS/Events',
        'firehose': 'AWS/Firehose',
        'fsx': 'AWS/FSx',
        'kinesis': 'AWS/Kinesis',
        'lambda': 'AWS/Lambda',
        'logs': 'AWS/Logs',
        'redshift': 'AWS/Redshift',
        'rds': 'AWS/RDS',
        'route53': 'AWS/Route53',
        's3': 'AWS/S3',
        'sns': 'AWS/SNS',
        'sqs': 'AWS/SQS',
        'workspaces': 'AWS/WorkSpaces',
    }

    # Statistics sent via the ``Statistics`` request parameter; anything
    # else must match ``extended_stats_re`` (percentiles such as p99 or
    # p99.9) and is sent via ``ExtendedStatistics``.
    standard_stats = {'Average', 'Sum', 'Maximum', 'Minimum', 'SampleCount'}
    extended_stats_re = re.compile(r'^p\d{1,3}\.{0,1}\d{0,1}$')

    def __init__(self, data, manager=None):
        """Initialize the filter; the metric window defaults to 14 days."""
        super(MetricsFilter, self).__init__(data, manager)
        self.days = self.data.get('days', 14)

    def validate(self):
        """Check the configured statistic and window length.

        Raises:
            PolicyValidationError: if ``statistics`` is neither a standard
                statistic nor a percentile expression, or if ``days``
                exceeds 455.
        """
        stats = self.data.get('statistics', 'Average')
        if stats not in self.standard_stats and not self.extended_stats_re.match(stats):
            raise PolicyValidationError(
                "metrics filter statistics method %s not supported" % stats)

        if self.days > 455:
            raise PolicyValidationError(
                "metrics filter days value (%s) cannot exceed 455" % self.days)

    def get_metric_window(self):
        """Determine start and end times for the CloudWatch metric window

        Ensure that the window aligns with time segments based on CloudWatch's retention
        schedule defined here:

        https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#Metric

        Returns a ``MetricWindow(start, end)`` namedtuple of naive UTC
        datetimes where ``end - start`` equals ``self.days`` days.
        """  # noqa: E501

        duration = timedelta(self.days)
        now = datetime.utcnow()
        MetricWindow = namedtuple('MetricWindow', 'start end')

        if duration <= timedelta(days=(1 / 8.0)):
            # Align period with the start of the next second
            # CloudWatch retention: 3 hours
            end = now.replace(microsecond=0) + timedelta(seconds=1)
        elif duration <= timedelta(days=15):
            # Align period with the start of the next minute
            # CloudWatch retention: 15 days
            end = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
        elif duration <= timedelta(days=63):
            # Align period with the start of the next five-minute block
            # CloudWatch retention: 63 days
            end = (now.replace(minute=(now.minute // 5) * 5, second=0, microsecond=0)
                   + timedelta(minutes=5))
        else:
            # Align period with the start of the next hour
            # CloudWatch retention: 455 days
            end = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)

        return MetricWindow((end - duration), end)

    def process(self, resources, event=None):
        """Return the resources whose metric datapoints all satisfy the filter.

        Resolves the query settings from the filter data (``period``
        defaults to the full window in seconds, ``statistics`` to Average,
        ``op`` to less-than), then queries CloudWatch for batches of 50
        resources on up to 3 workers. A batch whose retrieval raises is
        logged as a warning and contributes no matches.
        """
        self.start, self.end = self.get_metric_window()
        self.metric = self.data['name']
        self.period = int(self.data.get('period', (self.end - self.start).total_seconds()))
        self.statistics = self.data.get('statistics', 'Average')
        self.model = self.manager.get_model()
        self.op = OPERATORS[self.data.get('op', 'less-than')]
        self.value = self.data['value']

        # Namespace precedence: explicit filter value, then the resource
        # model's metrics_namespace, then the per-service default.
        ns = self.data.get('namespace')
        if not ns:
            ns = getattr(self.model, 'metrics_namespace', None)
        if not ns:
            ns = self.DEFAULT_NAMESPACE[self.model.service]
        self.namespace = ns

        self.log.debug("Querying metrics for %d", len(resources))
        matched = []
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_resource_set, resource_set)
                for resource_set in chunks(resources, 50)]

            for f in as_completed(futures):
                error = f.exception()
                if error:
                    self.log.warning("CW Retrieval error: %s", error)
                    continue
                matched.extend(f.result())
        return matched

    def get_dimensions(self, resource):
        """Return the CloudWatch dimension list identifying ``resource``.

        Uses the resource model's ``dimension`` both as the dimension name
        and as the key to read the value from the resource.
        """
        return [{'Name': self.model.dimension,
                 'Value': resource[self.model.dimension]}]

    def get_user_dimensions(self):
        """Return the filter's ``dimensions`` mapping as CloudWatch dimensions."""
        return [
            {'Name': k, 'Value': v}
            for k, v in self.data.get('dimensions', {}).items()]

    def _datapoint_value(self, data_point):
        """Return the configured statistic's value from one datapoint.

        Datapoints carrying an ``ExtendedStatistics`` mapping hold the
        value inside it; other datapoints (including the synthetic
        missing-value fill) hold it at the top level.
        """
        if 'ExtendedStatistics' in data_point:
            data_point = data_point['ExtendedStatistics']
        return data_point[self.statistics]

    def process_resource_set(self, resource_set):
        """Fetch metrics for a batch of resources and return those that match.

        Datapoints are cached on each resource under the ``c7n.metrics``
        annotation. A resource with no datapoints is skipped unless
        ``missing-value`` is configured, in which case a single fill
        datapoint is recorded and evaluated.
        """
        client = local_session(
            self.manager.session_factory).client('cloudwatch')

        # Note this annotation cache is policy scoped, not across
        # policies, still the lack of full qualification on the key
        # means multiple filters within a policy using the same metric
        # across different periods or dimensions would be problematic.
        key = "%s.%s.%s.%s" % (self.namespace, self.metric, self.statistics, str(self.days))
        stats_key = (
            'Statistics' if self.statistics in self.standard_stats
            else 'ExtendedStatistics')
        percent_attr = self.data.get('percent-attr')
        multiplier = self.data.get('attr-multiplier')

        matched = []
        for r in resource_set:
            # if we overload dimensions with multiple resources we get
            # the statistics/average over those resources.
            dimensions = self.get_dimensions(r)
            # Merge in any filter specified metrics, get_dimensions is
            # commonly overridden so we can't do it there.
            dimensions.extend(self.get_user_dimensions())

            collected_metrics = r.setdefault('c7n.metrics', {})
            if key not in collected_metrics:
                params = {
                    'Namespace': self.namespace,
                    'MetricName': self.metric,
                    'StartTime': self.start,
                    'EndTime': self.end,
                    'Period': self.period,
                    'Dimensions': dimensions,
                    stats_key: [self.statistics],
                }
                collected_metrics[key] = client.get_metric_statistics(
                    **params)['Datapoints']

            datapoints = collected_metrics[key]

            # In certain cases CloudWatch reports no data for a metric.
            # If the policy specifies a fill value for missing data, add
            # that here before testing for matches. Otherwise, skip
            # matching entirely.
            if len(datapoints) == 0:
                if 'missing-value' not in self.data:
                    continue
                datapoints.append({
                    'Timestamp': self.start,
                    self.statistics: self.data['missing-value'],
                    'c7n:detail': 'Fill value for missing data'
                })

            if percent_attr:
                # NOTE(review): a zero or missing attribute value raises
                # here and fails the whole batch; the right policy for
                # that case (skip vs. match) should be confirmed.
                rvalue = r[percent_attr]
                if multiplier:
                    rvalue = rvalue * multiplier
                values = (
                    self._datapoint_value(dp) / rvalue * 100
                    for dp in datapoints)
            else:
                values = (self._datapoint_value(dp) for dp in datapoints)

            # Every datapoint must satisfy the comparison; all() stops at
            # the first one that does not.
            if all(self.op(v, self.value) for v in values):
                matched.append(r)
        return matched
297
298
class ShieldMetrics(MetricsFilter):
    """Specialized metrics filter for shield

    Queries the ``AWS/DDoSProtection`` namespace, identifying each
    resource by its ARN through the ``ResourceArn`` dimension. Only the
    metric names listed in ``metrics`` are accepted.
    """
    schema = type_schema('shield-metrics', rinherit=MetricsFilter.schema)

    namespace = "AWS/DDoSProtection"
    metrics = (
        'DDoSAttackBitsPerSecond',
        'DDoSAttackRequestsPerSecond',
        'DDoSDetected')

    # NOTE(review): not referenced anywhere in this class.
    attack_vectors = (
        'ACKFlood',
        'ChargenReflection',
        'DNSReflection',
        'GenericUDPReflection',
        'MSSQLReflection',
        'NetBIOSReflection',
        'NTPReflection',
        'PortMapper',
        'RequestFlood',
        'RIPReflection',
        'SNMPReflection',
        'SYNFlood',
        'SSDPReflection',
        'UDPTraffic',
        'UDPFragment')

    def validate(self):
        """Ensure the configured metric name is a supported shield metric.

        Raises:
            PolicyValidationError: if ``name`` is absent or not one of
                ``metrics``.
        """
        # Read the name once with .get() so a missing name is reported as
        # a validation error rather than a KeyError while formatting.
        name = self.data.get('name')
        if name not in self.metrics:
            raise PolicyValidationError(
                "invalid shield metric %s valid:%s on %s" % (
                    name,
                    ", ".join(self.metrics),
                    self.manager.data))

    def get_dimensions(self, resource):
        """Return the ``ResourceArn`` dimension for ``resource``."""
        return [{
            'Name': 'ResourceArn',
            'Value': self.manager.get_arns([resource])[0]}]

    def process(self, resources, event=None):
        """Force the shield namespace, then run the standard metrics processing."""
        self.data['namespace'] = self.namespace
        return super(ShieldMetrics, self).process(resources, event)