Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/emr.py: 62%
239 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import logging
4import time
5import json
7from c7n.actions import ActionRegistry, BaseAction
8from c7n.exceptions import PolicyValidationError
9from c7n.filters import FilterRegistry, MetricsFilter, ValueFilter
10from c7n.manager import resources
11from c7n.query import QueryResourceManager, TypeInfo, ConfigSource, DescribeSource
12from c7n.tags import universal_augment
13from c7n.utils import (
14 local_session, type_schema, get_retry, jmespath_search)
15from c7n.tags import (
16 TagDelayedAction, RemoveTag, TagActionFilter, Tag)
17import c7n.filters.vpc as net_filters
# Module-level logger for the EMR resources defined in this file.
log = logging.getLogger('custodian.emr')

# Registries holding the filters and actions for the ``emr`` resource;
# the EMRCluster manager below uses them as its filter/action registries.
actions = ActionRegistry('emr.actions')
filters = FilterRegistry('emr.filters')

filters.register('marked-for-op', TagActionFilter)
@resources.register('emr')
class EMRCluster(QueryResourceManager):
    """Resource manager for Elastic MapReduce clusters.

    A policy may carry a ``query`` list; its entries are parsed with
    ``QueryFilter`` and merged into the query used for enumeration. When no
    ``ClusterStates`` entry is supplied, ``default_cluster_states`` is used.
    """

    class resource_type(TypeInfo):
        service = 'emr'
        arn_type = 'emr'
        permission_prefix = 'elasticmapreduce'
        default_cluster_states = ['WAITING', 'BOOTSTRAPPING', 'RUNNING', 'STARTING']
        enum_spec = ('list_clusters', 'Clusters', None)
        name = 'Name'
        id = 'Id'
        date = "Status.Timeline.CreationDateTime"
        cfn_type = 'AWS::EMR::Cluster'

    action_registry = actions
    filter_registry = filters
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def __init__(self, ctx, data):
        super().__init__(ctx, data)
        # Parsed and validated ``query`` entries from the policy.
        self.queries = QueryFilter.parse(
            self.data.get('query', []))

    @classmethod
    def get_permissions(cls):
        """Return the IAM permissions needed to list and describe clusters."""
        return ("elasticmapreduce:ListClusters",
                "elasticmapreduce:DescribeCluster")

    def get_resources(self, ids):
        """Describe each cluster in ``ids`` and return the ``Cluster`` payloads."""
        # no filtering by id set supported at the api
        client = local_session(self.session_factory).client('emr')
        # Use the same throttling retry as augment(); both make one
        # describe_cluster call per cluster.
        return [
            self.retry(client.describe_cluster, ClusterId=cluster_id)['Cluster']
            for cluster_id in ids]

    def resources(self, query=None):
        """Enumerate clusters, applying the policy's consolidated query filters.

        Each consolidated filter is stored in ``query`` under its name,
        overriding any same-named entry the caller passed in.
        """
        query = query or {}
        for query_filter in self.consolidate_query_filter():
            query[query_filter['Name']] = query_filter['Values']
        return super().resources(query=query)

    def consolidate_query_filter(self):
        """Merge the parsed query filters into one entry per filter name.

        Returns a list of ``{'Name': ..., 'Values': ...}`` dicts in first-seen
        order, with a default ``ClusterStates`` entry appended when the policy
        did not specify one.
        """
        # allow same name to be specified multiple times and append the queries
        # under the same name
        merged = {}
        for q in self.queries:
            query_filter = q.query()
            name = query_filter['Name']
            values = query_filter['Values']
            if name in merged:
                merged[name]['Values'].extend(values)
                continue
            # Copy list values: extending the list handed back by the filter
            # would otherwise modify the policy data itself, so values would
            # pile up on every call.
            if isinstance(values, list):
                values = list(values)
            merged[name] = {'Name': name, 'Values': values}
        if 'ClusterStates' not in merged:
            # include default query (copied so the class-level default list
            # is never shared with callers)
            merged['ClusterStates'] = {
                'Name': 'ClusterStates',
                'Values': list(self.resource_type.default_cluster_states)
            }
        return list(merged.values())

    def augment(self, resources):
        """Replace each enumerated entry with its ``describe_cluster`` payload."""
        client = local_session(
            self.get_resource_manager('emr').session_factory).client('emr')
        # remap for cwmetrics
        return [
            self.retry(client.describe_cluster, ClusterId=r['Id'])['Cluster']
            for r in resources]
@EMRCluster.filter_registry.register('metrics')
class EMRMetrics(MetricsFilter):
    """Metrics filter for EMR clusters, keyed on the ``JobFlowId`` dimension."""

    def get_dimensions(self, resource):
        """Return the metric dimensions identifying ``resource``."""
        # Job flow id is legacy name for cluster id
        dimension = {'Name': 'JobFlowId', 'Value': resource['Id']}
        return [dimension]
# NOTE: this class deliberately reuses the name of its imported base class;
# after this definition the module-level name refers to this subclass.
@actions.register('mark-for-op')
class TagDelayedAction(TagDelayedAction):
    """Tag EMR clusters so that an operation is applied at a later date

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-mark-for-op
            resource: emr
            filters:
              - "tag:Name": absent
            actions:
              - type: mark-for-op
                tag: custodian_cleanup
                op: terminate
                days: 4
                msg: "Cluster does not have required tags"
    """
@actions.register('tag')
class TagTable(Tag):
    """Add tag(s) to an EMR cluster

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-tag-table
            resource: emr
            filters:
              - "tag:target-tag": absent
            actions:
              - type: tag
                key: target-tag
                value: target-tag-value
    """

    batch_size = 1
    permissions = ('elasticmapreduce:AddTags',)
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def process_resource_set(self, client, resources, tags):
        """Apply ``tags`` to every cluster in ``resources``, retrying on throttling."""
        for cluster in resources:
            self.retry(client.add_tags, ResourceId=cluster['Id'], Tags=tags)
@actions.register('remove-tag')
class UntagTable(RemoveTag):
    """Action to remove tag(s) on a resource

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-remove-tag
            resource: emr
            filters:
              - "tag:target-tag": present
            actions:
              - type: remove-tag
                tags: ["target-tag"]
    """

    concurrency = 2
    batch_size = 5
    permissions = ('elasticmapreduce:RemoveTags',)
    # Same throttling retry the sibling ``tag`` action uses; this action
    # makes one remove_tags call per cluster.
    retry = staticmethod(get_retry(('ThrottlingException',)))

    def process_resource_set(self, client, resources, tag_keys):
        """Remove ``tag_keys`` from every cluster in ``resources``."""
        for r in resources:
            self.retry(
                client.remove_tags, ResourceId=r['Id'], TagKeys=tag_keys)
@actions.register('terminate')
class Terminate(BaseAction):
    """Terminate EMR cluster(s)

    Pair this action with a filter; without one, every EMR cluster the
    policy selects is terminated.

    With ``force: true`` termination protection is switched off first, and
    the action waits ``delay`` seconds before terminating.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-terminate
            resource: emr
            query:
              - ClusterStates: [STARTING, BOOTSTRAPPING, RUNNING, WAITING]
            actions:
              - terminate
    """

    schema = type_schema('terminate', force={'type': 'boolean'})
    permissions = ("elasticmapreduce:TerminateJobFlows",)
    # Seconds to wait after disabling termination protection.
    delay = 5

    def process(self, emrs):
        """Terminate all clusters in ``emrs`` with one API call and return them."""
        emr_client = local_session(self.manager.session_factory).client('emr')
        cluster_ids = [cluster['Id'] for cluster in emrs]
        force = self.data.get('force')
        if force:
            emr_client.set_termination_protection(
                JobFlowIds=cluster_ids, TerminationProtected=False)
            time.sleep(self.delay)
        emr_client.terminate_job_flows(JobFlowIds=cluster_ids)
        self.log.info("Deleted emrs: %s", cluster_ids)
        return emrs
# Valid EMR Query Filters
EMR_VALID_FILTERS = {'CreatedAfter', 'CreatedBefore', 'ClusterStates'}


class QueryFilter:
    """One entry of an EMR policy's ``query`` list.

    Each entry is a single-key mapping whose key is either a name from
    ``EMR_VALID_FILTERS`` or starts with ``tag:``, and whose value is not
    ``None``.
    """

    @classmethod
    def parse(cls, data):
        """Validate every entry of ``data`` and return the ``QueryFilter`` list.

        Raises:
            PolicyValidationError: if an entry is not a dict or fails
                ``validate()``.
        """
        results = []
        for d in data:
            if not isinstance(d, dict):
                # Wrap in a 1-tuple so a tuple-valued entry formats instead
                # of raising TypeError from the % operator.
                raise PolicyValidationError(
                    "EMR Query Filter Invalid structure %s" % (d,))
            results.append(cls(d).validate())
        return results

    def __init__(self, data):
        self.data = data
        # Populated by validate().
        self.key = None
        self.value = None

    def validate(self):
        """Check the entry's shape, set ``key``/``value`` and return ``self``.

        Raises:
            PolicyValidationError: if the entry does not have exactly one
                key, the key is not a supported filter name, or the value
                is ``None``.
        """
        if len(self.data) != 1:
            raise PolicyValidationError(
                "EMR Query Filter Invalid %s" % self.data)
        self.key, self.value = next(iter(self.data.items()))

        # A non-string key can never be a valid filter name; reject it here
        # instead of failing with AttributeError on ``.startswith``.
        is_tag_key = isinstance(self.key, str) and self.key.startswith('tag:')
        if self.key not in EMR_VALID_FILTERS and not is_tag_key:
            raise PolicyValidationError(
                "EMR Query Filter invalid filter name %s" % (self.data))

        if self.value is None:
            raise PolicyValidationError(
                "EMR Query Filters must have a value, use tag-key"
                " w/ tag name as value for tag present checks"
                " %s" % self.data)
        return self

    def query(self):
        """Return the entry as a ``{'Name': key, 'Values': values}`` dict.

        A string value is wrapped in a one-element list. A list value is
        copied, so callers that extend the result do not modify the policy
        data this filter was built from. Any other value is passed through
        unchanged.
        """
        value = self.value
        if isinstance(value, str):
            value = [value]
        elif isinstance(value, list):
            value = list(value)
        return {'Name': self.key, 'Values': value}
@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Subnet filter for EMR clusters."""

    # JMESPath expression selecting the subnet ids on a cluster resource.
    RelatedIdsExpression = "Ec2InstanceAttributes.RequestedEc2SubnetIds[]"
@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Security group filter for EMR clusters.

    Group ids are gathered from several ``Ec2InstanceAttributes`` fields
    rather than from a single ``RelatedIdsExpression``.
    """

    RelatedIdsExpression = ""
    # JMESPath expressions that each yield one group id or a list of them.
    expressions = ('Ec2InstanceAttributes.EmrManagedMasterSecurityGroup',
                   'Ec2InstanceAttributes.EmrManagedSlaveSecurityGroup',
                   'Ec2InstanceAttributes.ServiceAccessSecurityGroup',
                   'Ec2InstanceAttributes.AdditionalMasterSecurityGroups[]',
                   'Ec2InstanceAttributes.AdditionalSlaveSecurityGroups[]')

    def get_related_ids(self, resources):
        """Return the distinct security group ids referenced by ``resources``."""
        group_ids = set()
        for cluster in resources:
            for expression in self.expressions:
                found = jmespath_search(expression, cluster)
                if isinstance(found, str):
                    group_ids.add(found)
                elif isinstance(found, list):
                    group_ids.update(found)
        return list(group_ids)
# Expose the shared network-location filter on the ``emr`` resource.
filters.register('network-location', net_filters.NetworkLocation)
@filters.register('security-configuration')
class EMRSecurityConfigurationFilter(ValueFilter):
    """Match clusters on attributes of their security configuration.

    Clusters that match are annotated with the configuration under
    ``c7n:SecurityConfiguration``. Clusters with no ``SecurityConfiguration``
    key are never returned.

    :example:

    .. code-block:: yaml

        policies:
          - name: emr-security-configuration
            resource: emr
            filters:
              - type: security-configuration
                key: EnableAtRestEncryption
                value: true

    """
    annotation_key = 'c7n:SecurityConfiguration'
    permissions = ("elasticmapreduce:ListSecurityConfigurations",
                   "elasticmapreduce:DescribeSecurityConfiguration",)
    schema = type_schema('security-configuration', rinherit=ValueFilter.schema)
    schema_alias = False

    def process(self, resources, event=None):
        """Return the clusters whose security configuration matches the filter."""
        matched = []
        config_manager = self.manager.get_resource_manager(
            'emr-security-configuration')
        # Index the account's security configurations by name for lookup.
        configs_by_name = {
            cfg['Name']: cfg for cfg in config_manager.resources()}
        for cluster in resources:
            if 'SecurityConfiguration' not in cluster:
                continue
            entry = configs_by_name.get(cluster['SecurityConfiguration'], {})
            cfg = entry.get('SecurityConfiguration', {})
            if not self.match(cfg):
                continue
            cluster[self.annotation_key] = cfg
            matched.append(cluster)
        return matched
@resources.register('emr-security-configuration')
class EMRSecurityConfiguration(QueryResourceManager):
    """Resource manager for EMR security configurations."""

    class resource_type(TypeInfo):
        service = 'emr'
        arn_type = 'emr'
        permission_prefix = 'elasticmapreduce'
        enum_spec = ('list_security_configurations', 'SecurityConfigurations', None)
        detail_spec = ('describe_security_configuration', 'Name', 'Name', None)
        id = name = 'Name'
        cfn_type = 'AWS::EMR::SecurityConfiguration'

    permissions = ('elasticmapreduce:ListSecurityConfigurations',
                   'elasticmapreduce:DescribeSecurityConfiguration',)

    def augment(self, resources):
        """Run the base augmentation, then decode each JSON configuration.

        The ``SecurityConfiguration`` value of every resource is replaced
        by the result of ``json.loads`` on it.
        """
        described = super().augment(resources)
        for config in described:
            raw = config['SecurityConfiguration']
            config['SecurityConfiguration'] = json.loads(raw)
        return described
@EMRSecurityConfiguration.action_registry.register('delete')
class DeleteEMRSecurityConfiguration(BaseAction):
    """Delete EMR security configurations by name."""

    schema = type_schema('delete')
    permissions = ('elasticmapreduce:DeleteSecurityConfiguration',)

    def process(self, resources):
        """Delete each configuration in ``resources``, skipping missing ones."""
        emr_client = local_session(self.manager.session_factory).client('emr')
        for config in resources:
            try:
                emr_client.delete_security_configuration(Name=config['Name'])
            # NOTE(review): confirm the EMR client really models an
            # ``EntityNotFoundException``; if it does not, looking up this
            # attribute would itself fail when an error is being handled.
            except emr_client.exceptions.EntityNotFoundException:
                # Already gone; nothing left to delete for this entry.
                pass
class DescribeEMRServerlessApp(DescribeSource):
    """Describe source for EMR Serverless applications."""

    def augment(self, resources):
        """Run the base augmentation, then pass the result to ``universal_augment``."""
        described = super().augment(resources)
        return universal_augment(self.manager, described)
@resources.register('emr-serverless-app')
class EMRServerless(QueryResourceManager):
    """Resource manager for Elastic MapReduce Serverless applications."""

    class resource_type(TypeInfo):
        service = 'emr-serverless'
        enum_spec = ('list_applications', 'applications', None)
        arn = 'arn'
        arn_type = '/applications'
        name = 'name'
        id = 'id'
        date = "createdAt"
        cfn_type = 'AWS::EMRServerless::Application'

    # Source implementations selectable for this resource.
    source_mapping = {
        'config': ConfigSource,
        'describe': DescribeEMRServerlessApp,
    }
# Delayed-operation tagging support for EMR Serverless applications.
# ``TagDelayedAction`` here is the class defined in this module, which
# shadows the one imported from c7n.tags.
EMRServerless.filter_registry.register('marked-for-op', TagActionFilter)
EMRServerless.action_registry.register('mark-for-op', TagDelayedAction)
@EMRServerless.action_registry.register('tag')
class EMRServerlessTag(Tag):
    """Add tag(s) to an EMR Serverless application

    :example:

    .. code-block:: yaml

        policies:
          - name: tag-emr-serverless
            resource: emr-serverless-app
            filters:
              - "tag:target-tag": absent
            actions:
              - type: tag
                key: target-tag
                value: target-tag-value
    """

    permissions = ('emr-serverless:TagResource',)

    def process_resource_set(self, client, resource_set, tags):
        """Apply ``tags`` to every application in ``resource_set``."""
        # Convert the Key/Value pair list into the mapping form that is
        # passed to tag_resource.
        tag_map = {pair['Key']: pair['Value'] for pair in tags}
        for app in resource_set:
            client.tag_resource(resourceArn=app['arn'], tags=tag_map)
@EMRServerless.action_registry.register("remove-tag")
class EMRServerlessRemoveTag(RemoveTag):
    """Action to remove tag(s) on EMR-Serverless

    :example:

    .. code-block:: yaml

        policies:
          - name: untag-emr-serverless
            resource: emr-serverless-app
            filters:
              - "tag:target-tag": present
            actions:
              - type: remove-tag
                tags: ["target-tag"]
    """
    permissions = ('emr-serverless:UntagResource',)

    def process_resource_set(self, client, resource_set, tags):
        """Remove the tag keys in ``tags`` from every application in ``resource_set``."""
        for r in resource_set:
            client.untag_resource(resourceArn=r['arn'], tagKeys=tags)
@EMRServerless.action_registry.register("delete")
class EMRServerlessDelete(BaseAction):
    """Delete an EMR Serverless application

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-emr-serverless-app
            resource: emr-serverless-app
            actions:
              - type: delete
    """
    schema = type_schema('delete')
    permissions = ('emr-serverless:DeleteApplication',)

    def process(self, resources):
        """Delete each application in ``resources``, skipping missing ones."""
        serverless = local_session(
            self.manager.session_factory).client('emr-serverless')
        for app in resources:
            try:
                serverless.delete_application(applicationId=app['id'])
            except serverless.exceptions.ResourceNotFoundException:
                # Already deleted; move on to the next application.
                pass