Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/emr.py: 62%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import logging
4import time
5import json
7from c7n.actions import ActionRegistry, BaseAction
8from c7n.filters import FilterRegistry, MetricsFilter, ValueFilter
9from c7n.manager import resources
10from c7n.query import QueryResourceManager, TypeInfo, ConfigSource, DescribeSource
11from c7n.tags import universal_augment
12from c7n.utils import (
13 local_session, type_schema, get_retry, jmespath_search, QueryParser)
14from c7n.tags import (
15 TagDelayedAction, RemoveTag, TagActionFilter, Tag)
16import c7n.filters.vpc as net_filters
log = logging.getLogger('custodian.emr')

# Filter and action registries attached to the ``emr`` resource manager.
filters = FilterRegistry('emr.filters')
actions = ActionRegistry('emr.actions')

# Expose the shared tag-schedule filter as ``marked-for-op``.
filters.register('marked-for-op', TagActionFilter)
@resources.register('emr')
class EMRCluster(QueryResourceManager):
    """Resource manager for Elastic MapReduce clusters.

    Clusters are enumerated with ``list_clusters`` and each summary is then
    replaced by the full ``describe_cluster`` payload. When the policy's
    ``query`` block does not set ``ClusterStates``, only clusters in
    ``resource_type.default_cluster_states`` are listed.
    """

    class resource_type(TypeInfo):
        service = 'emr'
        arn_type = 'emr'
        permission_prefix = 'elasticmapreduce'
        # States listed when the policy query does not specify ClusterStates.
        default_cluster_states = ['WAITING', 'BOOTSTRAPPING', 'RUNNING', 'STARTING']
        enum_spec = ('list_clusters', 'Clusters', None)
        name = 'Name'
        id = 'Id'
        date = "Status.Timeline.CreationDateTime"
        cfn_type = 'AWS::EMR::Cluster'

    action_registry = actions
    filter_registry = filters
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def __init__(self, ctx, data):
        super(EMRCluster, self).__init__(ctx, data)
        # Validate the policy's ``query`` block once, at construction time.
        self.queries = EMRQueryParser.parse(
            self.data.get('query', []))

    @classmethod
    def get_permissions(cls):
        return ("elasticmapreduce:ListClusters",
                "elasticmapreduce:DescribeCluster")

    def get_resources(self, ids):
        """Fetch clusters by id.

        ``list_clusters`` cannot filter by id, so each cluster is described
        individually, with the same throttling retry used by :meth:`augment`.

        :param ids: iterable of cluster ids.
        :returns: list of ``describe_cluster`` ``Cluster`` payloads.
        """
        client = local_session(self.session_factory).client('emr')
        return [
            self.retry(client.describe_cluster, ClusterId=cluster_id)['Cluster']
            for cluster_id in ids]

    def resources(self, query=None):
        """Return clusters matching the policy's query.

        :param query: optional extra ``list_clusters`` parameters. It is
            copied, not modified in place.
        """
        # Work on a copy so neither the caller's dict nor the class-level
        # default state list is mutated here or further down the pipeline.
        query = dict(query or {})
        for q in self.queries:
            query.update(q)
        if 'ClusterStates' not in query:
            query['ClusterStates'] = list(self.resource_type.default_cluster_states)
        return super(EMRCluster, self).resources(query=query)

    def augment(self, resources):
        """Replace each listed cluster summary with its full description."""
        client = local_session(self.session_factory).client('emr')
        return [
            self.retry(client.describe_cluster, ClusterId=r['Id'])['Cluster']
            for r in resources]
@EMRCluster.filter_registry.register('metrics')
class EMRMetrics(MetricsFilter):
    """Metrics filter for EMR clusters, keyed on the ``JobFlowId`` dimension."""

    def get_dimensions(self, resource):
        # "JobFlowId" is the legacy name for an EMR cluster id.
        dimension = {'Name': 'JobFlowId', 'Value': resource['Id']}
        return [dimension]
@actions.register('mark-for-op')
class TagDelayedAction(TagDelayedAction):  # NOTE: shadows the c7n.tags base class name
    """Tag resources so that an operation is performed on them at a later date.

    :example:

    .. code-block:: yaml

            policies:
              - name: emr-mark-for-op
                resource: emr
                filters:
                  - "tag:Name": absent
                actions:
                  - type: mark-for-op
                    tag: custodian_cleanup
                    op: terminate
                    days: 4
                    msg: "Cluster does not have required tags"
    """
@actions.register('tag')
class TagTable(Tag):
    """Add tag(s) to EMR clusters.

    :example:

    .. code-block:: yaml

            policies:
              - name: emr-tag-table
                resource: emr
                filters:
                  - "tag:target-tag": absent
                actions:
                  - type: tag
                    key: target-tag
                    value: target-tag-value
    """

    permissions = ('elasticmapreduce:AddTags',)
    # One cluster per resource set; add_tags is called per ResourceId.
    batch_size = 1
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def process_resource_set(self, client, resources, tags):
        for cluster in resources:
            self.retry(
                client.add_tags,
                ResourceId=cluster['Id'],
                Tags=tags,
            )
@actions.register('remove-tag')
class UntagTable(RemoveTag):
    """Action to remove tag(s) from EMR clusters.

    :example:

    .. code-block:: yaml

            policies:
              - name: emr-remove-tag
                resource: emr
                filters:
                  - "tag:target-tag": present
                actions:
                  - type: remove-tag
                    tags: ["target-tag"]
    """

    concurrency = 2
    batch_size = 5
    permissions = ('elasticmapreduce:RemoveTags',)
    # Retry throttled calls, matching the ``tag`` action and the EMR manager.
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def process_resource_set(self, client, resources, tag_keys):
        """Remove ``tag_keys`` from every cluster in ``resources``."""
        for r in resources:
            self.retry(client.remove_tags, ResourceId=r['Id'], TagKeys=tag_keys)
@actions.register('terminate')
class Terminate(BaseAction):
    """Action to terminate EMR cluster(s)

    It is recommended to apply a filter to the terminate action to avoid
    termination of all EMR clusters.

    Set ``force: true`` to disable termination protection on the clusters
    before terminating them.

    :example:

    .. code-block:: yaml

            policies:
              - name: emr-terminate
                resource: emr
                query:
                  - ClusterStates: [STARTING, BOOTSTRAPPING, RUNNING, WAITING]
                actions:
                  - terminate
    """

    schema = type_schema('terminate', force={'type': 'boolean'})
    permissions = ("elasticmapreduce:TerminateJobFlows",)
    # Seconds to wait after disabling termination protection (force mode).
    delay = 5

    def get_permissions(self):
        """Declare the extra IAM action that ``force`` mode calls."""
        if self.data.get('force'):
            return self.permissions + ("elasticmapreduce:SetTerminationProtection",)
        return self.permissions

    def process(self, emrs):
        """Terminate the given clusters.

        :param emrs: cluster resources; each must carry an ``Id``.
        :returns: the input resources.
        """
        if not emrs:
            # Avoid issuing API calls with an empty JobFlowIds list.
            return emrs
        client = local_session(self.manager.session_factory).client('emr')
        cluster_ids = [emr['Id'] for emr in emrs]
        if self.data.get('force'):
            client.set_termination_protection(
                JobFlowIds=cluster_ids, TerminationProtected=False)
            # Presumably lets the protection change take effect before
            # termination is requested.
            time.sleep(self.delay)
        client.terminate_job_flows(JobFlowIds=cluster_ids)
        self.log.info("Deleted emrs: %s", cluster_ids)
        return emrs
class EMRQueryParser(QueryParser):
    """Validator for the ``query`` block of ``emr`` policies.

    ``ClusterStates`` is restricted to the enumerated cluster states; the
    ``CreatedBefore``/``CreatedAfter`` bounds are declared as single-value
    date fields.
    """

    type_name = "EMR"

    # Allowed query keys mapped to their permitted values (or value type).
    QuerySchema = {
        'ClusterStates': (
            'STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING',
            'TERMINATING', 'TERMINATED', 'TERMINATED_WITH_ERRORS',
        ),
        'CreatedBefore': 'date',
        'CreatedAfter': 'date',
    }

    # Keys that take one value rather than a list.
    single_value_fields = ('CreatedBefore', 'CreatedAfter')
@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Filter EMR clusters by the EC2 subnets they requested."""

    # Flattened list of the cluster's requested EC2 subnet ids.
    RelatedIdsExpression = "Ec2InstanceAttributes.RequestedEc2SubnetIds[]"
@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Filter EMR clusters by any of their managed or additional security groups."""

    # Unused: ids are gathered from ``expressions`` in get_related_ids.
    RelatedIdsExpression = ""
    # Each expression yields either a single group id or a list of ids.
    expressions = ('Ec2InstanceAttributes.EmrManagedMasterSecurityGroup',
                   'Ec2InstanceAttributes.EmrManagedSlaveSecurityGroup',
                   'Ec2InstanceAttributes.ServiceAccessSecurityGroup',
                   'Ec2InstanceAttributes.AdditionalMasterSecurityGroups[]',
                   'Ec2InstanceAttributes.AdditionalSlaveSecurityGroups[]')

    def get_related_ids(self, resources):
        """Collect the distinct security group ids referenced by ``resources``."""
        found = set()
        for resource in resources:
            for expression in self.expressions:
                matched = jmespath_search(expression, resource)
                if isinstance(matched, list):
                    found.update(matched)
                elif isinstance(matched, str):
                    found.add(matched)
        return list(found)
# Reuse the generic network-location filter from c7n.filters.vpc.
filters.register(
    'network-location', net_filters.NetworkLocation)
@filters.register('security-configuration')
class EMRSecurityConfigurationFilter(ValueFilter):
    """Match clusters on the attributes of their EMR security configuration.

    Matching clusters are annotated with the decoded configuration document
    under ``c7n:SecurityConfiguration``. Clusters without a
    ``SecurityConfiguration`` are never matched.

    :example:

    .. code-block:: yaml

            policies:
              - name: emr-security-configuration
                resource: emr
                filters:
                  - type: security-configuration
                    key: EnableAtRestEncryption
                    value: true

    """
    annotation_key = 'c7n:SecurityConfiguration'
    permissions = ("elasticmapreduce:ListSecurityConfigurations",
                   "elasticmapreduce:DescribeSecurityConfiguration",)
    schema = type_schema('security-configuration', rinherit=ValueFilter.schema)
    schema_alias = False

    def process(self, resources, event=None):
        cfg_manager = self.manager.get_resource_manager('emr-security-configuration')
        by_name = {}
        for sec_cfg in cfg_manager.resources():
            by_name[sec_cfg['Name']] = sec_cfg

        matched = []
        for cluster in resources:
            if 'SecurityConfiguration' not in cluster:
                continue
            # An unknown configuration name is matched against an empty document.
            document = by_name.get(
                cluster['SecurityConfiguration'], {}).get('SecurityConfiguration', {})
            if not self.match(document):
                continue
            cluster[self.annotation_key] = document
            matched.append(cluster)
        return matched
@resources.register('emr-security-configuration')
class EMRSecurityConfiguration(QueryResourceManager):
    """Resource manager for EMR Security Configuration.

    Each resource carries its ``SecurityConfiguration`` document decoded
    from JSON into a dict.
    """

    class resource_type(TypeInfo):
        service = 'emr'
        arn_type = 'emr'
        permission_prefix = 'elasticmapreduce'
        enum_spec = ('list_security_configurations', 'SecurityConfigurations', None)
        detail_spec = ('describe_security_configuration', 'Name', 'Name', None)
        id = name = 'Name'
        cfn_type = 'AWS::EMR::SecurityConfiguration'

    permissions = ('elasticmapreduce:ListSecurityConfigurations',
                   'elasticmapreduce:DescribeSecurityConfiguration',)

    def augment(self, resources):
        """Decode each resource's JSON ``SecurityConfiguration`` document."""
        resources = super().augment(resources)
        for r in resources:
            raw = r.get('SecurityConfiguration')
            # Decode only JSON strings, so an absent or already-decoded value
            # (e.g. on a repeated augment) does not raise.
            if isinstance(raw, str):
                r['SecurityConfiguration'] = json.loads(raw)
        return resources
@EMRSecurityConfiguration.action_registry.register('delete')
class DeleteEMRSecurityConfiguration(BaseAction):
    """Delete EMR security configurations.

    Configurations that are no longer found are skipped.

    :example:

    .. code-block:: yaml

            policies:
              - name: emr-security-configuration-delete
                resource: emr-security-configuration
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('elasticmapreduce:DeleteSecurityConfiguration',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('emr')
        for sec_cfg in resources:
            try:
                client.delete_security_configuration(Name=sec_cfg['Name'])
            except client.exceptions.EntityNotFoundException:
                # Already gone; nothing to delete.
                pass
class DescribeEMRServerlessApp(DescribeSource):
    """Describe source for EMR Serverless applications that also attaches tags."""

    def augment(self, resources):
        described = super().augment(resources)
        # Tags are filled in by the shared universal_augment helper.
        return universal_augment(self.manager, described)
@resources.register('emr-serverless-app')
class EMRServerless(QueryResourceManager):
    """Resource manager for EMR Serverless applications."""

    source_mapping = {
        'describe': DescribeEMRServerlessApp,
        'config': ConfigSource,
    }

    class resource_type(TypeInfo):
        service = 'emr-serverless'
        cfn_type = 'AWS::EMRServerless::Application'
        enum_spec = ('list_applications', 'applications', None)
        id = 'id'
        name = 'name'
        date = "createdAt"
        # The API returns a ready-made ARN under the ``arn`` key.
        arn = 'arn'
        arn_type = '/applications'
# Delayed, tag-scheduled operations for serverless applications reuse the
# module's ``mark-for-op`` action class and the shared TagActionFilter.
EMRServerless.filter_registry.register('marked-for-op', TagActionFilter)
EMRServerless.action_registry.register('mark-for-op', TagDelayedAction)
@EMRServerless.action_registry.register('tag')
class EMRServerlessTag(Tag):
    """Add tag(s) to EMR Serverless applications.

    :example:

    .. code-block:: yaml

            policies:
              - name: tag-emr-serverless
                resource: emr-serverless-app
                filters:
                  - "tag:target-tag": absent
                actions:
                  - type: tag
                    key: target-tag
                    value: target-tag-value
    """

    permissions = ('emr-serverless:TagResource',)

    def process_resource_set(self, client, resource_set, tags):
        # tag_resource takes a {key: value} mapping, not a Key/Value list.
        tag_map = {}
        for tag in tags:
            tag_map[tag['Key']] = tag['Value']
        for app in resource_set:
            client.tag_resource(resourceArn=app['arn'], tags=tag_map)
@EMRServerless.action_registry.register("remove-tag")
class EMRServerlessRemoveTag(RemoveTag):
    """Action to remove tag(s) from EMR-Serverless applications

    :example:

    .. code-block:: yaml

            policies:
              - name: untag-emr-serverless
                resource: emr-serverless-app
                filters:
                  - "tag:target-tag": present
                actions:
                  - type: remove-tag
                    tags: ["target-tag"]
    """
    permissions = ('emr-serverless:UntagResource',)

    def process_resource_set(self, client, resource_set, tags):
        """Remove the tag keys in ``tags`` from each application in ``resource_set``."""
        for r in resource_set:
            client.untag_resource(resourceArn=r['arn'], tagKeys=tags)
@EMRServerless.action_registry.register("delete")
class EMRServerlessDelete(BaseAction):
    """Delete EMR Serverless applications.

    Applications that are no longer found are skipped.

    :example:

    .. code-block:: yaml

            policies:
              - name: delete-emr-serverless-app
                resource: emr-serverless-app
                actions:
                  - type: delete
    """
    schema = type_schema('delete')
    permissions = ('emr-serverless:DeleteApplication',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('emr-serverless')
        for app in resources:
            try:
                client.delete_application(applicationId=app['id'])
            except client.exceptions.ResourceNotFoundException:
                # Already gone; nothing to delete.
                pass