1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import copy
4from botocore.exceptions import ClientError
5
6from c7n.actions import AutoTagUser, AutoscalingBase, BaseAction
7from c7n.exceptions import PolicyExecutionError, PolicyValidationError
8from c7n.filters import MetricsFilter, ValueFilter, Filter
9from c7n.filters.costhub import CostHubRecommendation
10from c7n.filters.offhours import OffHour, OnHour
11import c7n.filters.vpc as net_filters
12from c7n.manager import resources
13from c7n.utils import local_session, chunks, get_retry, type_schema, group_by, jmespath_compile
14from c7n import query, utils
15from c7n.query import DescribeSource, ConfigSource
16from c7n.resources.aws import Arn
17from c7n.tags import Tag, TagDelayedAction, RemoveTag, TagActionFilter
18
19
20def ecs_tag_normalize(resources):
21 """normalize tag format on ecs resources to match common aws format."""
22 for r in resources:
23 if 'tags' in r:
24 r['Tags'] = [{'Key': t['key'], 'Value': t['value']} for t in r['tags']]
25 r.pop('tags')
26
27
28NEW_ARN_STYLE = ('container-instance', 'service', 'task')
29
30
31def ecs_taggable(model, r):
32 # Tag support requires new arn format
33 # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-using-tags.html
34 #
35 # New arn format details
36 # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-resource-ids.html
37 #
38 path_parts = r[model.id].rsplit(':', 1)[-1].split('/')
39 if path_parts[0] not in NEW_ARN_STYLE:
40 return True
41 return len(path_parts) > 2
42
43
44class ContainerConfigSource(ConfigSource):
45
46 preserve_empty = ()
47 preserve_case = {'Tags'}
48 mapped_keys = {}
49
50 @classmethod
51 def remap_keys(cls, resource):
52 for k, v in cls.mapped_keys.items():
53 if v in resource:
54 continue
55 if k not in resource:
56 continue
57 resource[v] = resource.pop(k)
58 return resource
59
60 @classmethod
61 def lower_keys(cls, data):
62 if isinstance(data, dict):
63 for k, v in list(data.items()):
64 if k in cls.preserve_case:
65 continue
66 lk = k[0].lower() + k[1:]
67 data[lk] = data.pop(k)
68 # describe doesn't return empty list/dict by default
69 if isinstance(v, (list, dict)) and not v and lk not in cls.preserve_empty:
70 data.pop(lk)
71 elif isinstance(v, (dict, list)):
72 data[lk] = cls.lower_keys(v)
73 elif isinstance(data, list):
74 return list(map(cls.lower_keys, data))
75 return data
76
77 def load_resource(self, item):
78 resource = self.lower_keys(super().load_resource(item))
79 if self.mapped_keys:
80 return self.remap_keys(resource)
81 return resource
82
83
84class ClusterDescribe(query.DescribeSource):
85
86 def augment(self, resources):
87 resources = super(ClusterDescribe, self).augment(resources)
88 ecs_tag_normalize(resources)
89 return resources
90
91
92@resources.register('ecs')
93class ECSCluster(query.QueryResourceManager):
94
95 class resource_type(query.TypeInfo):
96 service = 'ecs'
97 enum_spec = ('list_clusters', 'clusterArns', None)
98 batch_detail_spec = (
99 'describe_clusters', 'clusters', None, 'clusters', {
100 'include': ['TAGS', 'SETTINGS', 'CONFIGURATIONS']
101 })
102 name = "clusterName"
103 arn = id = "clusterArn"
104 arn_type = 'cluster'
105 config_type = cfn_type = 'AWS::ECS::Cluster'
106
107 source_mapping = {
108 'describe': ClusterDescribe,
109 'config': query.ConfigSource
110 }
111
112
113@ECSCluster.filter_registry.register('metrics')
114class ECSMetrics(MetricsFilter):
115
116 def get_dimensions(self, resource):
117 return [{'Name': 'ClusterName', 'Value': resource['clusterName']}]
118
119
120class ECSClusterResourceDescribeSource(query.ChildDescribeSource):
121
122 # We need an additional subclass of describe for ecs cluster.
123 #
124 # - Default child query just returns the child resources from
125 # enumeration op, for ecs clusters, enumeration just returns
126 # resources ids, we also need to retain the parent id for
127 # augmentation.
128 #
129 # - The default augmentation detail_spec/batch_detail_spec need additional
130 # handling for the string resources with parent id.
131 #
132
133 def __init__(self, manager):
134 self.manager = manager
135 self.query = query.ChildResourceQuery(
136 self.manager.session_factory, self.manager, capture_parent_id=True)
137
138 def get_resources(self, ids, cache=True):
139 """Retrieve ecs resources for serverless policies or related resources
140
141 Requires arns in new format.
142 https://docs.aws.amazon.com/AmazonECS/latest/userguide/ecs-resource-ids.html
143 """
144 cluster_resources = {}
145 for i in ids:
146 _, ident = i.rsplit(':', 1)
147 parts = ident.split('/', 2)
148 if len(parts) != 3:
149 raise PolicyExecutionError("New format ecs arn required")
150 cluster_resources.setdefault(parts[1], []).append(parts[2])
151
152 results = []
153 client = local_session(self.manager.session_factory).client('ecs')
154 for cid, resource_ids in cluster_resources.items():
155 results.extend(
156 self.process_cluster_resources(client, cid, resource_ids))
157 return results
158
159 def augment(self, resources):
160 parent_child_map = {}
161 for pid, r in resources:
162 parent_child_map.setdefault(pid, []).append(r)
163 results = []
164 with self.manager.executor_factory(
165 max_workers=self.manager.max_workers) as w:
166 client = local_session(self.manager.session_factory).client('ecs')
167 futures = {}
168 for pid, services in parent_child_map.items():
169 futures[
170 w.submit(
171 self.process_cluster_resources, client, pid, services)
172 ] = (pid, services)
173 for f in futures:
174 pid, services = futures[f]
175 if f.exception():
176 self.manager.log.warning(
177 'error fetching ecs resources for cluster %s: %s',
178 pid, f.exception())
179 continue
180 results.extend(f.result())
181 return results
182
183
184@query.sources.register('describe-ecs-service')
185class ECSServiceDescribeSource(ECSClusterResourceDescribeSource):
186
187 def process_cluster_resources(self, client, cluster_id, services):
188 results = []
189 for service_set in chunks(services, self.manager.chunk_size):
190 results.extend(
191 client.describe_services(
192 cluster=cluster_id,
193 include=['TAGS'],
194 services=service_set).get('services', []))
195 ecs_tag_normalize(results)
196 return results
197
198
199class ECSServiceConfigSource(ContainerConfigSource):
200 perserve_empty = {
201 'placementConstraints', 'placementStrategy',
202 'serviceRegistries', 'Tags', 'loadBalancers'}
203
204 mapped_keys = {
205 'role': 'roleArn', 'cluster': 'clusterArn'}
206
207
208@resources.register('ecs-service')
209class Service(query.ChildResourceManager):
210
211 chunk_size = 10
212
213 class resource_type(query.TypeInfo):
214 service = 'ecs'
215 name = 'serviceName'
216 arn = id = 'serviceArn'
217 enum_spec = ('list_services', 'serviceArns', None)
218 parent_spec = ('ecs', 'cluster', None)
219 supports_trailevents = True
220 config_type = cfn_type = 'AWS::ECS::Service'
221
222 source_mapping = {
223 'config': ECSServiceConfigSource,
224 'describe-child': ECSServiceDescribeSource,
225 'describe': ECSServiceDescribeSource,
226 }
227
228 def get_resources(self, ids, cache=True, augment=True):
229 return super(Service, self).get_resources(ids, cache, augment=False)
230
231
232@Service.filter_registry.register('metrics')
233class ServiceMetrics(MetricsFilter):
234
235 def get_dimensions(self, resource):
236 return [
237 {'Name': 'ClusterName', 'Value': resource['clusterArn'].rsplit('/')[-1]},
238 {'Name': 'ServiceName', 'Value': resource['serviceName']}]
239
240
241class RelatedTaskDefinitionFilter(ValueFilter):
242
243 schema = type_schema('task-definition', rinherit=ValueFilter.schema)
244 schema_alias = False
245 permissions = ('ecs:DescribeTaskDefinition',
246 'ecs:ListTaskDefinitions')
247 related_key = 'taskDefinition'
248
249 def process(self, resources, event=None):
250 self.task_defs = {t['taskDefinitionArn']: t for t in self.get_task_defs(resources)}
251 return super(RelatedTaskDefinitionFilter, self).process(resources)
252
253 def get_task_defs(self, resources):
254 task_def_ids = list({s[self.related_key] for s in resources})
255 task_def_manager = self.manager.get_resource_manager(
256 'ecs-task-definition')
257
258 # due to model difference (multi-level containment with
259 # multi-step resource iteration) and potential volume of
260 # resources, we break our abstractions a little in the name of
261 # efficiency wrt api usage.
262
263 # check to see if task def cache is already populated
264 key = task_def_manager.get_cache_key(None)
265 if self.manager._cache.get(key):
266 task_defs = task_def_manager.get_resources(task_def_ids)
267 # else just augment the ids
268 else:
269 task_defs = task_def_manager.augment(task_def_ids)
270 return task_defs
271
272 def __call__(self, i):
273 task = self.task_defs[i[self.related_key]]
274 return self.match(task)
275
276
277@Service.filter_registry.register('task-definition')
278class ServiceTaskDefinitionFilter(RelatedTaskDefinitionFilter):
279 """Filter services by their task definitions.
280
281 :Example:
282
283 Find any fargate services that are running with a particular
284 image in the task and stop them.
285
286 .. code-block:: yaml
287
288 policies:
289 - name: fargate-find-stop-image
290 resource: ecs-task
291 filters:
292 - launchType: FARGATE
293 - type: task-definition
294 key: "containerDefinitions[].image"
295 value: "elasticsearch/elasticsearch:6.4.3"
296 value_type: swap
297 op: contains
298 actions:
299 - type: stop
300 """
301
302
303@ECSCluster.filter_registry.register('ebs-storage')
304class Storage(ValueFilter):
305 """Filter clusters by configured EBS storage parameters.
306
307 :Example:
308
309 Find any ECS clusters that have instances that are using unencrypted EBS volumes.
310
311 .. code-block:: yaml
312
313 policies:
314 - name: encrypted-ebs-volumes
315 resource: ecs
316 filters:
317 - type: ebs-storage
318 key: Encrypted
319 value: true
320 """
321
322 schema = type_schema(
323 'ebs-storage', rinherit=ValueFilter.schema,
324 operator={'type': 'string', 'enum': ['or', 'and']},
325 )
326 schema_alias = False
327
328 def get_permissions(self):
329 return (self.manager.get_resource_manager('ebs').get_permissions() +
330 self.manager.get_resource_manager('ec2').get_permissions() +
331 self.manager.get_resource_manager('ecs').get_permissions()
332 )
333
334 def process(self, resources, event=None):
335 self.storage = self.get_storage(resources)
336 self.skip = []
337 self.operator = self.data.get(
338 'operator', 'or') == 'or' and any or all
339 return list(filter(self, resources))
340
341 def get_storage(self, resources):
342 manager = self.manager.get_resource_manager('ecs-container-instance')
343
344 storage = {}
345 for cluster_set in utils.chunks(resources, 200):
346 for cluster in cluster_set:
347 cluster["clusterArn"]
348 instances = manager.resources({
349 "cluster": cluster['clusterArn'],
350 }, augment=False)
351 instances = manager.get_resources(instances, augment=False)
352 storage[cluster["clusterArn"]] = []
353
354 for instance in instances:
355 storage[cluster["clusterArn"]].extend(self.get_ebs_volumes([instance["ec2InstanceId"]]))
356
357 return storage
358
359 def get_ebs_volumes(self, resources):
360 volumes = []
361 ec2_manager = self.manager.get_resource_manager('ec2')
362 ebs_manager = self.manager.get_resource_manager('ebs')
363 for instance_set in utils.chunks(resources, 200):
364 instance_set = ec2_manager.get_resources(instance_set)
365 volume_ids = []
366 for i in instance_set:
367 for bd in i.get('BlockDeviceMappings', ()):
368 if 'Ebs' not in bd:
369 continue
370 volume_ids.append(bd['Ebs']['VolumeId'])
371 for v in ebs_manager.get_resources(volume_ids):
372 if not v['Attachments']:
373 continue
374 volumes.append(v)
375 return volumes
376
377 def __call__(self, i):
378 storage = self.storage.get(i["clusterArn"])
379
380 if not storage:
381 return False
382 return self.operator(map(self.match, storage))
383
384
385@Service.filter_registry.register('subnet')
386class SubnetFilter(net_filters.SubnetFilter):
387
388 RelatedIdsExpression = ""
389 expressions = ('taskSets[].networkConfiguration.awsvpcConfiguration.subnets[]',
390 'deployments[].networkConfiguration.awsvpcConfiguration.subnets[]',
391 'networkConfiguration.awsvpcConfiguration.subnets[]')
392
393 def get_related_ids(self, resources):
394 subnet_ids = set()
395 for exp in self.expressions:
396 cexp = jmespath_compile(exp)
397 for r in resources:
398 ids = cexp.search(r)
399 if ids:
400 subnet_ids.update(ids)
401 return list(subnet_ids)
402
403
404@Service.filter_registry.register('security-group')
405class SGFilter(net_filters.SecurityGroupFilter):
406
407 RelatedIdsExpression = ""
408 expressions = ('taskSets[].networkConfiguration.awsvpcConfiguration.securityGroups[]',
409 'deployments[].networkConfiguration.awsvpcConfiguration.securityGroups[]',
410 'networkConfiguration.awsvpcConfiguration.securityGroups[]')
411
412 def get_related_ids(self, resources):
413 sg_ids = set()
414 for exp in self.expressions:
415 cexp = jmespath_compile(exp)
416 for r in resources:
417 ids = cexp.search(r)
418 if ids:
419 sg_ids.update(ids)
420 return list(sg_ids)
421
422
423@Service.filter_registry.register('network-location', net_filters.NetworkLocation)
424@Service.action_registry.register('modify-definition')
425class UpdateTemplate(BaseAction):
426
427 schema = type_schema(
428 'modify-definition',
429 properties={'type': 'object'},
430 )
431
432 permissions = ("ecs:RegisterTaskDefinition", "ecs:UpdateService")
433
434 def validate(self):
435 if self.data.get('properties'):
436 return
437 found = False
438 for f in self.manager.iter_filters():
439 if isinstance(f, CostHubRecommendation):
440 found = True
441 if not found:
442 raise PolicyValidationError(
443 "modify-definition: either properties specified or am optimization filter used"
444 )
445
446 def process(self, resources):
447 task_def_filter = ServiceTaskDefinitionFilter({}, self.manager)
448 task_defs = {t['taskDefinitionArn']: t for t in task_def_filter.get_task_defs(resources)}
449 client = local_session(self.manager.session_factory).client('ecs')
450
451 # we can only modify task definition when ecs is controlling the deployment.
452 resources = self.filter_resources(resources, "deploymentController.type", ("ECS",))
453
454 nack = 0
455 for r in resources:
456 r_task_def = task_defs[r[task_def_filter.related_key]]
457 m_task_def = self.get_target_task_def(r, r_task_def)
458 if m_task_def is None:
459 nack += 1
460 continue
461 response = client.register_task_definition(**m_task_def)
462 task_arn = response['taskDefinition']['taskDefinitionArn']
463 cluster, _ = Arn.parse(r['serviceArn']).resource.split('/', 1)
464 client.update_service(
465 cluster=cluster,
466 service=r['serviceName'],
467 taskDefinition=task_arn
468 )
469 if nack:
470 self.log.warning("modify-definition %d services not modified", nack)
471
472 task_def_normalized = [
473 "taskDefinitionArn", "revision", "status",
474 "registeredAt", "registeredBy", "requiresAttributes",
475 ]
476
477 task_def_remap = {
478 "compatibilities": "requiresCompatibilities",
479 }
480
481 def get_target_task_def(self, resource, current_task_def):
482 target_task_def = copy.deepcopy(current_task_def)
483 target_task_def.update(self.data.get('properties', {}))
484 cost_optimization = resource.get(CostHubRecommendation.annotation_key)
485
486 if cost_optimization and cost_optimization['actionType'] == 'Rightsize':
487 cpu, mem = cost_optimization["recommendedResourceSummary"].split("/")
488 cpu = int(float(cpu.split(" ")[0]))
489 mem = int(mem.split(" ")[0])
490 target_task_def["cpu"] = str(cpu)
491 target_task_def["memory"] = str(mem)
492 target_task_def = self.update_target_containers_size(current_task_def, target_task_def)
493
494 if target_task_def == current_task_def or target_task_def is None:
495 return
496
497 # normalize from describe to register formats
498 for k in self.task_def_normalized:
499 target_task_def.pop(k, None)
500 for ck, dk in self.task_def_remap.items():
501 if ck in target_task_def:
502 target_task_def[dk] = target_task_def.pop(ck)
503 tags = []
504 for t in target_task_def.pop('Tags', []):
505 tags.append({'key': t['Key'], 'value': t['Value']})
506 if tags:
507 target_task_def['tags'] = tags
508 return target_task_def
509
510 def update_target_containers_size(self, current_task_def, target_task_def):
511 """Update container memory/size targets.
512
513 We need to update memory/cpu requirements of the containers within
514 the task, so the total of the containers in the task def
515 matches the definition.
516
517 for a task w/ a single container this is simple, make the container match
518 the definition.
519
520 for a multi container task, we need select a heuristic
521 (proportional, largest) with some notion of a floor / min for
522 proportional. for now we punt on multi-container tasks.
523 """
524 if len(target_task_def['containerDefinitions']) > 1:
525 return
526 container_def = target_task_def['containerDefinitions'][0]
527 container_def['memory'] = int(target_task_def['memory'])
528 container_def['cpu'] = int(target_task_def['cpu'])
529 return target_task_def
530
531
532@Service.action_registry.register('modify')
533class UpdateService(BaseAction):
534 """Action to update service
535
536 :example:
537
538 .. code-block:: yaml
539
540 policies:
541 - name: no-public-ips-services
542 resource: ecs-service
543 filters:
544 - 'networkConfiguration.awsvpcConfiguration.assignPublicIp': 'ENABLED'
545 actions:
546 - type: modify
547 update:
548 networkConfiguration:
549 awsvpcConfiguration:
550 assignPublicIp: DISABLED
551 """
552
553 schema = type_schema('modify',
554 update={
555 'desiredCount': {'type': 'integer'},
556 'taskDefinition': {'type': 'string'},
557 'deploymentConfiguration': {
558 'type': 'object',
559 'properties': {
560 'maximumPercent': {'type': 'integer'},
561 'minimumHealthyPercent': {'type': 'integer'},
562 }
563 },
564 'networkConfiguration': {
565 'type': 'object',
566 'properties': {
567 'awsvpcConfiguration': {
568 'type': 'object',
569 'properties': {
570 'subnets': {
571 'type': 'array',
572 'items': {
573 'type': 'string',
574 },
575 'minItems': 1
576 },
577 'securityGroups': {
578 'items': {
579 'type': 'string',
580 },
581 },
582 'assignPublicIp': {
583 'type': 'string',
584 'enum': ['ENABLED', 'DISABLED'],
585 }
586 }
587 }
588 }
589 },
590 'platformVersion': {'type': 'string'},
591 'forceNewDeployment': {'type': 'boolean', 'default': False},
592 'healthCheckGracePeriodSeconds': {'type': 'integer'},
593 }
594 )
595
596 permissions = ('ecs:UpdateService',)
597
598 def process(self, resources):
599 client = local_session(self.manager.session_factory).client('ecs')
600 update = self.data.get('update')
601
602 for r in resources:
603 param = {}
604
605 # Handle network separately as it requires atomic updating, and populating
606 # defaults from the resource.
607 net_update = update.get('networkConfiguration', {}).get('awsvpcConfiguration')
608 if net_update:
609 net_param = dict(r['networkConfiguration']['awsvpcConfiguration'])
610 param['networkConfiguration'] = {'awsvpcConfiguration': net_param}
611 for k, v in net_update.items():
612 net_param[k] = v
613
614 for k, v in update.items():
615 if k == 'networkConfiguration':
616 continue
617 elif r.get(k) != v:
618 param[k] = v
619
620 if not param:
621 continue
622
623 client.update_service(
624 cluster=r['clusterArn'], service=r['serviceName'], **param)
625
626
627@Service.action_registry.register('delete')
628class DeleteService(BaseAction):
629 """Delete service(s)."""
630
631 schema = type_schema('delete')
632 permissions = ('ecs:DeleteService',)
633
634 def process(self, resources):
635 client = local_session(self.manager.session_factory).client('ecs')
636 retry = get_retry(('Throttling',))
637 for r in resources:
638 try:
639 desiredCount = 0
640
641 # Two different types of responses:
642 # Deployments would appear for normal services
643 # TaskSets would show for Blue/Green deployment
644 if 'deployments' in r:
645 primary = [d for d in r['deployments'] if d['status'] == 'PRIMARY'].pop()
646 desiredCount = primary.get('desiredCount', 0)
647 elif 'taskSets' in r:
648 primary = [t for t in r['taskSets'] if t['status'] == 'PRIMARY'].pop()
649 desiredCount = primary.get('computedDesiredCount', 0)
650
651 if desiredCount > 0:
652 retry(client.update_service,
653 cluster=r['clusterArn'], service=r['serviceName'], desiredCount=0)
654
655 retry(client.delete_service,
656 cluster=r['clusterArn'], service=r['serviceName'])
657 except ClientError as e:
658 if e.response['Error']['Code'] != 'ServiceNotFoundException':
659 raise
660
661
662@query.sources.register('describe-ecs-task')
663class ECSTaskDescribeSource(ECSClusterResourceDescribeSource):
664
665 def process_cluster_resources(self, client, cluster_id, tasks):
666 results = []
667 for task_set in chunks(tasks, self.manager.chunk_size):
668 results.extend(
669 self.manager.retry(
670 client.describe_tasks,
671 cluster=cluster_id,
672 include=['TAGS'],
673 tasks=task_set).get('tasks', []))
674 ecs_tag_normalize(results)
675 return results
676
677
678@resources.register('ecs-task')
679class Task(query.ChildResourceManager):
680
681 chunk_size = 100
682
683 class resource_type(query.TypeInfo):
684 service = 'ecs'
685 arn = id = name = 'taskArn'
686 arn_type = 'task'
687 enum_spec = ('list_tasks', 'taskArns', None)
688 parent_spec = ('ecs', 'cluster', None)
689 supports_trailevents = True
690 cfn_type = 'AWS::ECS::TaskSet'
691
692 @property
693 def source_type(self):
694 source = self.data.get('source', 'describe')
695 if source in ('describe', 'describe-child'):
696 source = 'describe-ecs-task'
697 return source
698
699 def get_resources(self, ids, cache=True, augment=True):
700 return super(Task, self).get_resources(ids, cache, augment=False)
701
702
703@Task.filter_registry.register('subnet')
704class TaskSubnetFilter(net_filters.SubnetFilter):
705
706 RelatedIdsExpression = "attachments[].details[?name == 'subnetId'].value[]"
707
708
709@Task.filter_registry.register('security-group')
710class TaskSGFilter(net_filters.SecurityGroupFilter):
711
712 ecs_group_cache = None
713
714 RelatedIdsExpression = ""
715 eni_expression = "attachments[].details[?name == 'networkInterfaceId'].value[]"
716 sg_expression = "Groups[].GroupId[]"
717
718 def _get_related_ids(self, resources):
719 groups = dict()
720 eni_ids = set()
721
722 cexp = jmespath_compile(self.eni_expression)
723 for r in resources:
724 ids = cexp.search(r)
725 if ids:
726 eni_ids.update(ids)
727
728 if eni_ids:
729 client = local_session(self.manager.session_factory).client('ec2')
730 response = client.describe_network_interfaces(
731 NetworkInterfaceIds=list(eni_ids)
732 )
733 if response["NetworkInterfaces"]:
734 cexp = jmespath_compile(self.sg_expression)
735 for r in response["NetworkInterfaces"]:
736 ids = cexp.search(r)
737 if ids:
738 groups[r["NetworkInterfaceId"]] = ids
739 self.ecs_group_cache = groups
740
741 return groups
742
743 def get_related_ids(self, resources):
744 if not self.ecs_group_cache:
745 self.ecs_group_cache = self._get_related_ids(resources)
746
747 group_ids = set()
748 cexp = jmespath_compile(self.eni_expression)
749 for r in resources:
750 ids = cexp.search(r)
751 for group_id in ids:
752 group_ids.update(self.ecs_group_cache.get(group_id, ()))
753 return list(group_ids)
754
755
756@Task.filter_registry.register('network-location', net_filters.NetworkLocation)
757@Task.filter_registry.register('task-definition')
758class TaskTaskDefinitionFilter(RelatedTaskDefinitionFilter):
759 """Filter tasks by their task definition.
760
761 :Example:
762
763 Find any fargate tasks that are running without read only root
764 and stop them.
765
766 .. code-block:: yaml
767
768 policies:
769 - name: fargate-readonly-tasks
770 resource: ecs-task
771 filters:
772 - launchType: FARGATE
773 - type: task-definition
774 key: "containerDefinitions[].readonlyRootFilesystem"
775 value: None
776 value_type: swap
777 op: contains
778 actions:
779 - type: stop
780
781 """
782 related_key = 'taskDefinitionArn'
783
784
785@Task.action_registry.register('stop')
786class StopTask(BaseAction):
787 """Stop/Delete a currently running task."""
788
789 schema = type_schema('stop', reason={"type": "string"})
790 permissions = ('ecs:StopTask',)
791
792 def process(self, resources):
793 client = local_session(self.manager.session_factory).client('ecs')
794 retry = get_retry(('Throttling',))
795 reason = self.data.get('reason', 'custodian policy')
796
797 for r in resources:
798 try:
799 retry(client.stop_task,
800 cluster=r['clusterArn'],
801 task=r['taskArn'],
802 reason=reason)
803 except ClientError as e:
804 # No error code for not found.
805 if e.response['Error']['Message'] != "The referenced task was not found.":
806 raise
807
808
809class DescribeTaskDefinition(DescribeSource):
810
811 def get_resources(self, ids, cache=True):
812 if cache:
813 resources = self.manager._get_cached_resources(ids)
814 if resources is not None:
815 return resources
816 try:
817 resources = self.augment(ids)
818 return resources
819 except ClientError as e:
820 self.manager.log.warning("event ids not resolved: %s error:%s" % (ids, e))
821 return []
822
823 def augment(self, resources):
824 results = []
825 client = local_session(self.manager.session_factory).client('ecs')
826 for task_def_set in resources:
827 response = self.manager.retry(
828 client.describe_task_definition,
829 taskDefinition=task_def_set,
830 include=['TAGS'])
831 r = response['taskDefinition']
832 r['tags'] = response.get('tags', [])
833 results.append(r)
834 ecs_tag_normalize(results)
835 return results
836
837
838class ConfigECSTaskDefinition(ContainerConfigSource):
839
840 preserve_empty = {'mountPoints', 'portMappings', 'volumesFrom'}
841
842
843@resources.register('ecs-task-definition')
844class TaskDefinition(query.QueryResourceManager):
845
846 class resource_type(query.TypeInfo):
847 service = 'ecs'
848 arn = id = name = 'taskDefinitionArn'
849 enum_spec = ('list_task_definitions', 'taskDefinitionArns', None)
850 cfn_type = config_type = 'AWS::ECS::TaskDefinition'
851 arn_type = 'task-definition'
852
853 source_mapping = {
854 'config': ConfigECSTaskDefinition,
855 'describe': DescribeTaskDefinition
856 }
857
858 def get_resources(self, ids, cache=True, augment=True):
859 return super(TaskDefinition, self).get_resources(ids, cache, augment=False)
860
861
862@TaskDefinition.action_registry.register('delete')
863class DeleteTaskDefinition(BaseAction):
864 """Delete/DeRegister a task definition.
865
866 The definition will be marked as InActive. Currently running
867 services and task can still reference, new services & tasks
868 can't.
869
870 force is False by default. When given as True, the task definition will
871 be permanently deleted.
872
873 .. code-block:: yaml
874
875 policies:
876 - name: deregister-task-definition
877 resource: ecs-task-definition
878 filters:
879 - family: test-task-def
880 actions:
881 - type: delete
882
883 - name: delete-task-definition
884 resource: ecs-task-definition
885 filters:
886 - family: test-task-def
887 actions:
888 - type: delete
889 force: True
890 """
891
892 schema = type_schema('delete', force={'type': 'boolean'})
893 permissions = ('ecs:DeregisterTaskDefinition', 'ecs:DeleteTaskDefinitions',)
894
895 def process(self, resources):
896 client = local_session(self.manager.session_factory).client('ecs')
897 retry = get_retry(('Throttling',))
898 force = self.data.get('force', False)
899
900 for r in resources:
901 if r['status'] == 'INACTIVE':
902 continue
903 try:
904 retry(client.deregister_task_definition,
905 taskDefinition=r['taskDefinitionArn'])
906 except ClientError as e:
907 if e.response['Error'][
908 'Message'] != 'The specified task definition does not exist.':
909 raise
910
911 if force:
912 task_definitions_arns = [
913 r['taskDefinitionArn']
914 for r in resources
915 ]
916 for chunk in chunks(task_definitions_arns, size=10):
917 retry(client.delete_task_definitions, taskDefinitions=chunk)
918
919
920@resources.register('ecs-container-instance')
921class ContainerInstance(query.ChildResourceManager):
922
923 chunk_size = 100
924
925 class resource_type(query.TypeInfo):
926 service = 'ecs'
927 id = name = 'containerInstanceArn'
928 enum_spec = ('list_container_instances', 'containerInstanceArns', None)
929 parent_spec = ('ecs', 'cluster', None)
930 arn = "containerInstanceArn"
931
932 @property
933 def source_type(self):
934 source = self.data.get('source', 'describe')
935 if source in ('describe', 'describe-child'):
936 source = 'describe-ecs-container-instance'
937 return source
938
939
940@query.sources.register('describe-ecs-container-instance')
941class ECSContainerInstanceDescribeSource(ECSClusterResourceDescribeSource):
942
943 def process_cluster_resources(self, client, cluster_id, container_instances):
944 results = []
945 for service_set in chunks(container_instances, self.manager.chunk_size):
946 r = client.describe_container_instances(
947 cluster=cluster_id,
948 include=['TAGS'],
949 containerInstances=service_set).get('containerInstances', [])
950 # Many Container Instance API calls require the cluster_id, adding as a
951 # custodian specific key in the resource
952 for i in r:
953 i['c7n:cluster'] = cluster_id
954 results.extend(r)
955 ecs_tag_normalize(results)
956 return results
957
958
959@ContainerInstance.filter_registry.register('subnet')
960class ContainerInstanceSubnetFilter(net_filters.SubnetFilter):
961
962 RelatedIdsExpression = "attributes[?name == 'ecs.subnet-id'].value[]"
963
964
965@ContainerInstance.action_registry.register('set-state')
966class SetState(BaseAction):
967 """Updates a container instance to either ACTIVE or DRAINING
968
969 :example:
970
971 .. code-block:: yaml
972
973 policies:
974 - name: drain-container-instances
975 resource: ecs-container-instance
976 actions:
977 - type: set-state
978 state: DRAINING
979 """
980 schema = type_schema(
981 'set-state',
982 state={"type": "string", 'enum': ['DRAINING', 'ACTIVE']})
983 permissions = ('ecs:UpdateContainerInstancesState',)
984
985 def process(self, resources):
986 cluster_map = group_by(resources, 'c7n:cluster')
987 for cluster in cluster_map:
988 c_instances = [i['containerInstanceArn'] for i in cluster_map[cluster]
989 if i['status'] != self.data.get('state')]
990 results = self.process_cluster(cluster, c_instances)
991 return results
992
993 def process_cluster(self, cluster, c_instances):
994 # Limit on number of container instance that can be updated in a single
995 # update_container_instances_state call is 10
996 chunk_size = 10
997 client = local_session(self.manager.session_factory).client('ecs')
998 for service_set in chunks(c_instances, chunk_size):
999 try:
1000 client.update_container_instances_state(
1001 cluster=cluster,
1002 containerInstances=service_set,
1003 status=self.data.get('state'))
1004 except ClientError:
1005 self.manager.log.warning(
1006 'Failed to update Container Instances State: %s, cluster %s' %
1007 (service_set, cluster))
1008 raise
1009
1010
1011@ContainerInstance.action_registry.register('update-agent')
1012class UpdateAgent(BaseAction):
1013 """Updates the agent on a container instance
1014 """
1015
1016 schema = type_schema('update-agent')
1017 permissions = ('ecs:UpdateContainerAgent',)
1018
1019 def process(self, resources):
1020 client = local_session(self.manager.session_factory).client('ecs')
1021 for r in resources:
1022 self.process_instance(
1023 client, r.get('c7n:cluster'), r.get('containerInstanceArn'))
1024
1025 def process_instance(self, client, cluster, instance):
1026 try:
1027 client.update_container_agent(
1028 cluster=cluster, containerInstance=instance)
1029 except (client.exceptions.NoUpdateAvailableException,
1030 client.exceptions.UpdateInProgressException):
1031 return
1032
1033
1034@ECSCluster.action_registry.register('tag')
1035@TaskDefinition.action_registry.register('tag')
1036@Service.action_registry.register('tag')
1037@Task.action_registry.register('tag')
1038@ContainerInstance.action_registry.register('tag')
1039class TagEcsResource(Tag):
1040 """Action to create tag(s) on an ECS resource
1041 (ecs, ecs-task-definition, ecs-service, ecs-task, ecs-container-instance)
1042
1043 Requires arns in new format for tasks, services, and container-instances.
1044 https://docs.aws.amazon.com/AmazonECS/latest/userguide/ecs-resource-ids.html
1045
1046 :example:
1047
1048 .. code-block:: yaml
1049
1050 policies:
1051 - name: tag-ecs-service
1052 resource: ecs-service
1053 filters:
1054 - "tag:target-tag": absent
1055 - type: taggable
1056 state: true
1057 actions:
1058 - type: tag
1059 key: target-tag
1060 value: target-value
1061 """
1062 permissions = ('ecs:TagResource',)
1063 batch_size = 1
1064
1065 def process_resource_set(self, client, resources, tags):
1066 mid = self.manager.resource_type.id
1067 tags = [{'key': t['Key'], 'value': t['Value']} for t in tags]
1068 old_arns = 0
1069 for r in resources:
1070 if not ecs_taggable(self.manager.resource_type, r):
1071 old_arns += 1
1072 continue
1073 client.tag_resource(resourceArn=r[mid], tags=tags)
1074 if old_arns:
1075 self.log.warn("Couldn't tag %d resource(s). Needs new ARN format", old_arns)
1076
1077
1078@ECSCluster.action_registry.register('remove-tag')
1079@TaskDefinition.action_registry.register('remove-tag')
1080@Service.action_registry.register('remove-tag')
1081@Task.action_registry.register('remove-tag')
1082@ContainerInstance.action_registry.register('remove-tag')
1083class RemoveTagEcsResource(RemoveTag):
1084 """Remove tag(s) from ECS resources
1085 (ecs, ecs-task-definition, ecs-service, ecs-task, ecs-container-instance)
1086
1087 :example:
1088
1089 .. code-block:: yaml
1090
1091 policies:
1092 - name: ecs-service-remove-tag
1093 resource: ecs-service
1094 filters:
1095 - type: taggable
1096 state: true
1097 actions:
1098 - type: remove-tag
1099 tags: ["BadTag"]
1100 """
1101 permissions = ('ecs:UntagResource',)
1102 batch_size = 1
1103
1104 def process_resource_set(self, client, resources, keys):
1105 old_arns = 0
1106 for r in resources:
1107 if not ecs_taggable(self.manager.resource_type, r):
1108 old_arns += 1
1109 continue
1110 client.untag_resource(resourceArn=r[self.id_key], tagKeys=keys)
1111 if old_arns != 0:
1112 self.log.warn("Couldn't untag %d resource(s). Needs new ARN format", old_arns)
1113
1114
1115@ECSCluster.action_registry.register('mark-for-op')
1116@TaskDefinition.action_registry.register('mark-for-op')
1117@Service.action_registry.register('mark-for-op')
1118@Task.action_registry.register('mark-for-op')
1119@ContainerInstance.action_registry.register('mark-for-op')
1120class MarkEcsResourceForOp(TagDelayedAction):
1121 """Mark ECS resources for deferred action
1122 (ecs, ecs-task-definition, ecs-service, ecs-task, ecs-container-instance)
1123
1124 Requires arns in new format for tasks, services, and container-instances.
1125 https://docs.aws.amazon.com/AmazonECS/latest/userguide/ecs-resource-ids.html
1126
1127 :example:
1128
1129 .. code-block:: yaml
1130
1131 policies:
1132 - name: ecs-service-invalid-tag-stop
1133 resource: ecs-service
1134 filters:
1135 - "tag:InvalidTag": present
1136 - type: taggable
1137 state: true
1138 actions:
1139 - type: mark-for-op
1140 op: delete
1141 days: 1
1142 """
1143
1144
1145@Service.filter_registry.register('taggable')
1146@Task.filter_registry.register('taggable')
1147@ContainerInstance.filter_registry.register('taggable')
1148class ECSTaggable(Filter):
1149 """
1150 Filter ECS resources on arn-format
1151 https://docs.aws.amazon.com/AmazonECS/latest/userguide/ecs-resource-ids.html
1152 :example:
1153
1154 .. code-block:: yaml
1155
1156 policies:
1157 - name: taggable
1158 resource: ecs-service
1159 filters:
1160 - type: taggable
1161 state: True
1162 """
1163
1164 schema = type_schema('taggable', state={'type': 'boolean'})
1165
1166 def get_permissions(self):
1167 return self.manager.get_permissions()
1168
1169 def process(self, resources, event=None):
1170 if not self.data.get('state'):
1171 return [r for r in resources if not ecs_taggable(self.manager.resource_type, r)]
1172 else:
1173 return [r for r in resources if ecs_taggable(self.manager.resource_type, r)]
1174
1175
1176ECSCluster.filter_registry.register('marked-for-op', TagActionFilter)
1177TaskDefinition.filter_registry.register('marked-for-op', TagActionFilter)
1178Service.filter_registry.register('marked-for-op', TagActionFilter)
1179Task.filter_registry.register('marked-for-op', TagActionFilter)
1180ContainerInstance.filter_registry.register('marked-for-op', TagActionFilter)
1181
1182ECSCluster.action_registry.register('auto-tag-user', AutoTagUser)
1183TaskDefinition.action_registry.register('auto-tag-user', AutoTagUser)
1184Service.action_registry.register('auto-tag-user', AutoTagUser)
1185Task.action_registry.register('auto-tag-user', AutoTagUser)
1186ContainerInstance.action_registry.register('auto-tag-user', AutoTagUser)
1187
1188Service.filter_registry.register('offhour', OffHour)
1189Service.filter_registry.register('onhour', OnHour)
1190
1191
1192@Service.action_registry.register('resize')
1193class AutoscalingECSService(AutoscalingBase):
1194 permissions = (
1195 'ecs:UpdateService',
1196 'ecs:TagResource',
1197 'ecs:UntagResource',
1198 )
1199
1200 service_namespace = 'ecs'
1201 scalable_dimension = 'ecs:service:DesiredCount'
1202
1203 def get_resource_id(self, resource):
1204 return resource['serviceArn'].split(':')[-1]
1205
1206 def get_resource_tag(self, resource, key):
1207 if 'Tags' in resource:
1208 for tag in resource['Tags']:
1209 if tag['Key'] == key:
1210 return tag['Value']
1211 return None
1212
1213 def get_resource_desired(self, resource):
1214 return int(resource['desiredCount'])
1215
1216 def set_resource_desired(self, resource, desired):
1217 client = local_session(self.manager.session_factory).client('ecs')
1218 client.update_service(
1219 cluster=resource['clusterArn'],
1220 service=resource['serviceName'],
1221 desiredCount=desired,
1222 )