1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import logging
4import time
5import json
6
7from c7n.actions import ActionRegistry, BaseAction
8from c7n.exceptions import PolicyValidationError
9from c7n.filters import FilterRegistry, MetricsFilter, ValueFilter
10from c7n.manager import resources
11from c7n.query import QueryResourceManager, TypeInfo, ConfigSource, DescribeSource
12from c7n.tags import universal_augment
13from c7n.utils import (
14 local_session, type_schema, get_retry, jmespath_search)
15from c7n.tags import (
16 TagDelayedAction, RemoveTag, TagActionFilter, Tag)
17import c7n.filters.vpc as net_filters
18
# Registries holding the filters and actions registered for the `emr` resource.
filters = FilterRegistry('emr.filters')
actions = ActionRegistry('emr.actions')
log = logging.getLogger('custodian.emr')

# Make the shared tag-based `marked-for-op` filter available on EMR clusters.
filters.register('marked-for-op', TagActionFilter)
24
25
@resources.register('emr')
class EMRCluster(QueryResourceManager):
    """Resource manager for Elastic MapReduce clusters.

    Clusters are enumerated with ``list_clusters`` and each entry is then
    expanded with ``describe_cluster``. The optional policy ``query`` block is
    parsed into ``QueryFilter`` objects; when it does not name
    ``ClusterStates`` the ``default_cluster_states`` are queried instead.
    """

    class resource_type(TypeInfo):
        service = 'emr'
        arn_type = 'emr'
        permission_prefix = 'elasticmapreduce'
        # Cluster states queried when the policy does not specify ClusterStates.
        default_cluster_states = ['WAITING', 'BOOTSTRAPPING', 'RUNNING', 'STARTING']
        enum_spec = ('list_clusters', 'Clusters', None)
        name = 'Name'
        id = 'Id'
        date = "Status.Timeline.CreationDateTime"
        cfn_type = 'AWS::EMR::Cluster'

    action_registry = actions
    filter_registry = filters
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def __init__(self, ctx, data):
        super().__init__(ctx, data)
        self.queries = QueryFilter.parse(
            self.data.get('query', []))

    @classmethod
    def get_permissions(cls):
        """Return the IAM permissions needed to list and describe clusters."""
        return ("elasticmapreduce:ListClusters",
                "elasticmapreduce:DescribeCluster")

    def get_resources(self, ids):
        """Describe each cluster in ``ids`` and return the cluster documents."""
        # no filtering by id set supported at the api, so describe one by one
        client = local_session(self.session_factory).client('emr')
        return [
            # retry throttled calls the same way augment() does
            self.retry(client.describe_cluster, ClusterId=cluster_id)['Cluster']
            for cluster_id in ids]

    def resources(self, query=None):
        """Return clusters, merging the policy's query filters into ``query``."""
        query_filters = self.consolidate_query_filter()
        if query_filters is not None:
            # Work on a copy so the caller's dict is left untouched.
            query = dict(query or {})
            for query_filter in query_filters:
                query[query_filter['Name']] = query_filter['Values']
        return super().resources(query=query)

    def consolidate_query_filter(self):
        """Merge the parsed query filters into one entry per filter name.

        The same name may be given several times; the values are appended
        under that name in order of appearance. A default ``ClusterStates``
        entry is added last when the policy did not supply one.

        The returned value lists are fresh copies, so calling this method
        repeatedly never grows the policy data or the class-level defaults.
        """
        merged = {}
        for q in self.queries:
            query_filter = q.query()
            name = query_filter['Name']
            values = query_filter['Values']
            if name in merged:
                merged[name]['Values'].extend(values)
                continue
            if isinstance(values, list):
                # Copy: extending in place would otherwise mutate the policy's own list.
                values = list(values)
            merged[name] = {'Name': name, 'Values': values}
        if 'ClusterStates' not in merged:
            # include default query
            merged['ClusterStates'] = {
                'Name': 'ClusterStates',
                'Values': list(self.resource_type.default_cluster_states),
            }
        # dicts keep insertion order, so first-seen order is preserved
        return list(merged.values())

    def augment(self, resources):
        """Replace each listed cluster with its full ``describe_cluster`` document."""
        client = local_session(
            self.get_resource_manager('emr').session_factory).client('emr')
        return [
            self.retry(client.describe_cluster, ClusterId=r['Id'])['Cluster']
            for r in resources]
107
108
@EMRCluster.filter_registry.register('metrics')
class EMRMetrics(MetricsFilter):
    """Metrics filter for EMR clusters."""

    def get_dimensions(self, resource):
        """Return the metric dimensions that identify ``resource``."""
        # "Job flow id" is the legacy name for the cluster id.
        cluster_dimension = {'Name': 'JobFlowId', 'Value': resource['Id']}
        return [cluster_dimension]
115
116
# NOTE: this class deliberately reuses the name of the base class imported from
# c7n.tags, so any later reference to `TagDelayedAction` in this module resolves
# to this EMR-registered subclass rather than the imported base.
@actions.register('mark-for-op')
class TagDelayedAction(TagDelayedAction):
    """Action to specify an action to occur at a later date

    The cluster is tagged now and the named operation is meant to be carried
    out by a later policy that matches on that tag.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-mark-for-op
            resource: emr
            filters:
              - "tag:Name": absent
            actions:
              - type: mark-for-op
                tag: custodian_cleanup
                op: terminate
                days: 4
                msg: "Cluster does not have required tags"
    """
137
138
@actions.register('tag')
class TagTable(Tag):
    """Add one or more tags to EMR clusters.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-tag-table
            resource: emr
            filters:
              - "tag:target-tag": absent
            actions:
              - type: tag
                key: target-tag
                value: target-tag-value
    """

    retry = staticmethod(get_retry(('ThrottlingException',)))
    batch_size = 1
    permissions = ('elasticmapreduce:AddTags',)

    def process_resource_set(self, client, resources, tags):
        """Call ``add_tags`` once per cluster, retrying throttled requests."""
        for cluster in resources:
            cluster_id = cluster['Id']
            self.retry(client.add_tags, ResourceId=cluster_id, Tags=tags)
165
166
@actions.register('remove-tag')
class UntagTable(RemoveTag):
    """Action to remove tag(s) on a resource

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-remove-tag
            resource: emr
            filters:
              - "tag:target-tag": present
            actions:
              - type: remove-tag
                tags: ["target-tag"]
    """

    concurrency = 2
    batch_size = 5
    permissions = ('elasticmapreduce:RemoveTags',)
    # Retry throttled requests, matching the EMR `tag` action; batches of
    # clusters are processed concurrently, which makes throttling more likely.
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def process_resource_set(self, client, resources, tag_keys):
        """Call ``remove_tags`` with ``tag_keys`` for every cluster in ``resources``."""
        for r in resources:
            self.retry(client.remove_tags, ResourceId=r['Id'], TagKeys=tag_keys)
192
193
@actions.register('terminate')
class Terminate(BaseAction):
    """Terminate the matched EMR cluster(s).

    Pair this action with filters or a query; without them every cluster
    returned for the policy is terminated. With ``force: true`` termination
    protection is switched off on the clusters before they are terminated.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-terminate
            resource: emr
            query:
              - ClusterStates: [STARTING, BOOTSTRAPPING, RUNNING, WAITING]
            actions:
              - terminate
    """

    schema = type_schema('terminate', force={'type': 'boolean'})
    permissions = ("elasticmapreduce:TerminateJobFlows",)
    # Seconds to pause after turning termination protection off.
    delay = 5

    def process(self, emrs):
        """Terminate all clusters in ``emrs`` with one call and return ``emrs``."""
        emr_client = local_session(self.manager.session_factory).client('emr')
        cluster_ids = []
        for cluster in emrs:
            cluster_ids.append(cluster['Id'])

        force = self.data.get('force')
        if force:
            # Turn protection off first, then wait `delay` seconds before terminating.
            emr_client.set_termination_protection(
                JobFlowIds=cluster_ids, TerminationProtected=False)
            time.sleep(self.delay)

        emr_client.terminate_job_flows(JobFlowIds=cluster_ids)
        self.log.info("Deleted emrs: %s", cluster_ids)
        return emrs
228
229
# Valid EMR Query Filters: names accepted by QueryFilter.validate, which
# additionally accepts any name starting with `tag:`.
EMR_VALID_FILTERS = {'CreatedAfter', 'CreatedBefore', 'ClusterStates'}
232
233
class QueryFilter:
    """One ``{name: value}`` entry from the ``query`` block of an EMR policy."""

    @classmethod
    def parse(cls, data):
        """Build validated filters from a list of single-key dicts.

        ``data`` may be empty or ``None``, in which case an empty list is
        returned.

        Raises:
            PolicyValidationError: if an entry is not a dict or fails
                ``validate``.
        """
        results = []
        for d in data or ():
            if not isinstance(d, dict):
                # Wrap in a tuple so a tuple-valued entry cannot break %-formatting.
                raise PolicyValidationError(
                    "EMR Query Filter Invalid structure %s" % (d,))
            results.append(cls(d).validate())
        return results

    def __init__(self, data):
        self.data = data
        # Populated by validate().
        self.key = None
        self.value = None

    def validate(self):
        """Check the entry and record its ``key`` and ``value``; return ``self``.

        Raises:
            PolicyValidationError: if the dict does not hold exactly one
                item, the name is neither in ``EMR_VALID_FILTERS`` nor
                prefixed with ``tag:``, or the value is ``None``.
        """
        if len(self.data) != 1:
            raise PolicyValidationError(
                "EMR Query Filter Invalid %s" % self.data)
        self.key, self.value = next(iter(self.data.items()))

        # Non-string keys are rejected here instead of failing on .startswith().
        is_tag_filter = isinstance(self.key, str) and self.key.startswith('tag:')
        if self.key not in EMR_VALID_FILTERS and not is_tag_filter:
            raise PolicyValidationError(
                "EMR Query Filter invalid filter name %s" % (self.data))

        if self.value is None:
            raise PolicyValidationError(
                "EMR Query Filters must have a value, use tag-key"
                " w/ tag name as value for tag present checks"
                " %s" % self.data)
        return self

    def query(self):
        """Return the filter as ``{'Name': key, 'Values': value}``.

        A string value is wrapped in a one-element list. A list value is
        copied so callers can extend the result without altering the policy
        data. Any other value is passed through unchanged.
        """
        value = self.value
        if isinstance(value, str):
            value = [value]
        elif isinstance(value, list):
            value = list(value)

        return {'Name': self.key, 'Values': value}
276
277
@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Subnet filter for EMR clusters."""

    # Location of the subnet ids within a cluster document.
    # NOTE(review): presumably evaluated as a JMESPath expression by the base
    # filter - confirm against net_filters.
    RelatedIdsExpression = "Ec2InstanceAttributes.RequestedEc2SubnetIds[]"
282
283
@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Security group filter for EMR clusters.

    A cluster references security groups in several places, so the ids are
    gathered from each entry in ``expressions`` instead of a single
    ``RelatedIdsExpression``.
    """

    RelatedIdsExpression = ""
    expressions = (
        'Ec2InstanceAttributes.EmrManagedMasterSecurityGroup',
        'Ec2InstanceAttributes.EmrManagedSlaveSecurityGroup',
        'Ec2InstanceAttributes.ServiceAccessSecurityGroup',
        'Ec2InstanceAttributes.AdditionalMasterSecurityGroups[]',
        'Ec2InstanceAttributes.AdditionalSlaveSecurityGroups[]',
    )

    def get_related_ids(self, resources):
        """Return the distinct security group ids referenced by ``resources``."""
        found = set()
        for cluster in resources:
            for expression in self.expressions:
                match = jmespath_search(expression, cluster)
                # An expression yields one id (str), several (list), or nothing.
                if isinstance(match, str):
                    found.add(match)
                elif isinstance(match, list):
                    found.update(match)
        return list(found)
304
305
# Register the generic network-location filter for EMR clusters as-is.
filters.register('network-location', net_filters.NetworkLocation)
307
308
@filters.register('security-configuration')
class EMRSecurityConfigurationFilter(ValueFilter):
    """Filter clusters on attributes of their security configuration.

    Matching clusters are annotated with the configuration under
    ``c7n:SecurityConfiguration``. Clusters with no ``SecurityConfiguration``
    are never matched.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-security-configuration
            resource: emr
            filters:
              - type: security-configuration
                key: EnableAtRestEncryption
                value: true

    """
    annotation_key = 'c7n:SecurityConfiguration'
    permissions = ("elasticmapreduce:ListSecurityConfigurations",
                   "elasticmapreduce:DescribeSecurityConfiguration",)
    schema = type_schema('security-configuration', rinherit=ValueFilter.schema)
    schema_alias = False

    def process(self, resources, event=None):
        """Return the clusters whose security configuration matches the filter."""
        cfg_manager = self.manager.get_resource_manager('emr-security-configuration')
        configs_by_name = {}
        for config in cfg_manager.resources():
            configs_by_name[config['Name']] = config

        matched = []
        for cluster in resources:
            if 'SecurityConfiguration' not in cluster:
                continue
            config_name = cluster['SecurityConfiguration']
            # An unknown configuration name is evaluated as an empty configuration.
            entry = configs_by_name.get(config_name, {})
            cfg = entry.get('SecurityConfiguration', {})
            if self.match(cfg):
                cluster[self.annotation_key] = cfg
                matched.append(cluster)
        return matched
346
347
@resources.register('emr-security-configuration')
class EMRSecurityConfiguration(QueryResourceManager):
    """Resource manager for EMR security configurations."""

    class resource_type(TypeInfo):
        service = 'emr'
        arn_type = 'emr'
        permission_prefix = 'elasticmapreduce'
        enum_spec = ('list_security_configurations', 'SecurityConfigurations', None)
        detail_spec = ('describe_security_configuration', 'Name', 'Name', None)
        name = 'Name'
        id = 'Name'
        cfn_type = 'AWS::EMR::SecurityConfiguration'

    permissions = ('elasticmapreduce:ListSecurityConfigurations',
                   'elasticmapreduce:DescribeSecurityConfiguration',)

    def augment(self, resources):
        """Augment via the base class, then decode each JSON ``SecurityConfiguration``."""
        described = super().augment(resources)
        for config in described:
            raw_document = config['SecurityConfiguration']
            config['SecurityConfiguration'] = json.loads(raw_document)
        return described
370
371
@EMRSecurityConfiguration.action_registry.register('delete')
class DeleteEMRSecurityConfiguration(BaseAction):
    """Delete the matched EMR security configurations."""

    schema = type_schema('delete')
    permissions = ('elasticmapreduce:DeleteSecurityConfiguration',)

    def process(self, resources):
        """Delete each configuration by name, skipping ones reported as not found."""
        emr_client = local_session(self.manager.session_factory).client('emr')
        for config in resources:
            config_name = config['Name']
            try:
                emr_client.delete_security_configuration(Name=config_name)
            # NOTE(review): confirm the EMR client model really defines
            # EntityNotFoundException; if it does not, evaluating this except
            # clause would raise AttributeError whenever the delete call fails.
            except emr_client.exceptions.EntityNotFoundException:
                # Treated as already deleted; move on to the next one.
                continue
385
386
class DescribeEMRServerlessApp(DescribeSource):
    """Describe source that passes augmented resources through ``universal_augment``."""

    def augment(self, resources):
        """Run the base augmentation, then ``universal_augment`` on its result."""
        described = super().augment(resources)
        return universal_augment(self.manager, described)
393
394
@resources.register('emr-serverless-app')
class EMRServerless(QueryResourceManager):
    """Resource manager for Elastic MapReduce Serverless Application
    """

    class resource_type(TypeInfo):
        service = 'emr-serverless'
        enum_spec = ('list_applications', 'applications', None)
        # Keys below are lower-case, unlike the `Name`/`Id` keys used for EMR clusters.
        arn = 'arn'
        arn_type = '/applications'
        name = 'name'
        id = 'id'
        date = "createdAt"
        cfn_type = 'AWS::EMRServerless::Application'

    # The 'describe' source is the subclass that also runs universal_augment
    # on the described resources.
    source_mapping = {
        'describe': DescribeEMRServerlessApp,
        'config': ConfigSource
    }
414
415
# Tag-based scheduling for serverless applications. `TagDelayedAction` here is
# the class defined earlier in this module under that name (it shadows the
# c7n.tags import), not the imported base class.
EMRServerless.action_registry.register('mark-for-op', TagDelayedAction)
EMRServerless.filter_registry.register('marked-for-op', TagActionFilter)
418
419
@EMRServerless.action_registry.register('tag')
class EMRServerlessTag(Tag):
    """Add tag(s) to EMR Serverless applications.

    :example:

    .. code-block:: yaml

        policies:
          - name: tag-emr-serverless
            resource: emr-serverless-app
            filters:
              - "tag:target-tag": absent
            actions:
              - type: tag
                key: target-tag
                value: target-tag-value
    """

    permissions = ('emr-serverless:TagResource',)

    def process_resource_set(self, client, resource_set, tags):
        """Tag every application in ``resource_set`` with ``tags``."""
        # tag_resource is given a plain mapping, so convert the Key/Value pairs.
        tag_map = {}
        for tag in tags:
            tag_map[tag['Key']] = tag['Value']
        for app in resource_set:
            client.tag_resource(resourceArn=app['arn'], tags=tag_map)
445
446
@EMRServerless.action_registry.register("remove-tag")
class EMRServerlessRemoveTag(RemoveTag):
    """Remove tag(s) from EMR Serverless applications.

    :example:

    .. code-block:: yaml

        policies:
          - name: untag-emr-serverless
            resource: emr-serverless-app
            filters:
              - "tag:target-tag": present
            actions:
              - type: remove-tag
                tags: ["target-tag"]
    """
    permissions = ('emr-serverless:UntagResource',)

    def process_resource_set(self, client, resource_set, tags):
        """Remove the tag keys in ``tags`` from every application in ``resource_set``."""
        for app in resource_set:
            app_arn = app['arn']
            client.untag_resource(resourceArn=app_arn, tagKeys=tags)
469
470
@EMRServerless.action_registry.register("delete")
class EMRServerlessDelete(BaseAction):
    """Delete EMR Serverless applications.

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-emr-serverless-app
            resource: emr-serverless-app
            actions:
              - type: delete
    """
    schema = type_schema('delete')
    permissions = ('emr-serverless:DeleteApplication',)

    def process(self, resources):
        """Delete each application by id, skipping ones that no longer exist."""
        serverless = local_session(self.manager.session_factory).client('emr-serverless')
        for app in resources:
            app_id = app['id']
            try:
                serverless.delete_application(applicationId=app_id)
            except serverless.exceptions.ResourceNotFoundException:
                # Already gone; carry on with the remaining applications.
                continue