Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/ecs.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

626 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import copy 

4from botocore.exceptions import ClientError 

5 

6from c7n.actions import AutoTagUser, AutoscalingBase, BaseAction 

7from c7n.exceptions import PolicyExecutionError, PolicyValidationError 

8from c7n.filters import MetricsFilter, ValueFilter, Filter 

9from c7n.filters.costhub import CostHubRecommendation 

10from c7n.filters.offhours import OffHour, OnHour 

11import c7n.filters.vpc as net_filters 

12from c7n.manager import resources 

13from c7n.utils import local_session, chunks, get_retry, type_schema, group_by, jmespath_compile 

14from c7n import query, utils 

15from c7n.query import DescribeSource, ConfigSource 

16from c7n.resources.aws import Arn 

17from c7n.tags import Tag, TagDelayedAction, RemoveTag, TagActionFilter 

18 

19 

def ecs_tag_normalize(resources):
    """Convert ecs style tags to the common aws ``Tags`` layout, in place.

    Every resource that has a lowercase ``tags`` key (a list of
    ``key``/``value`` dicts) gains a ``Tags`` list of ``Key``/``Value``
    dicts and loses the original ``tags`` entry. Resources without a
    ``tags`` key are left untouched. Nothing is returned.
    """
    for resource in resources:
        if 'tags' not in resource:
            continue
        resource['Tags'] = [
            {'Key': tag['key'], 'Value': tag['value']}
            for tag in resource['tags']]
        del resource['tags']

26 

27 

# Resource path prefixes for which only the longer, new style arn is taggable.
NEW_ARN_STYLE = ('container-instance', 'service', 'task')


def ecs_taggable(model, r):
    """Return whether resource ``r`` has an arn format that supports tags.

    ``model.id`` names the key of ``r`` holding the resource arn. Resource
    types outside ``NEW_ARN_STYLE`` are always reported as taggable; for
    the listed types the arn's resource path must have more than two
    ``/`` separated segments.
    """
    # Tag support requires new arn format
    # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-using-tags.html
    #
    # New arn format details
    # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-resource-ids.html
    #
    resource_path = r[model.id].rsplit(':', 1)[-1]
    segments = resource_path.split('/')
    if segments[0] in NEW_ARN_STYLE:
        return len(segments) > 2
    return True

42 

43 

class ContainerConfigSource(ConfigSource):
    # Config source shared by the ecs resources in this module. Loaded
    # items get the first character of their keys lower-cased and,
    # optionally, selected keys renamed.

    # lowered key names whose empty list/dict values are kept
    preserve_empty = ()
    # keys left exactly as loaded (their values are not descended into)
    preserve_case = {'Tags'}
    # loaded key name -> replacement key name
    mapped_keys = {}

    @classmethod
    def remap_keys(cls, resource):
        """Rename keys of ``resource`` in place according to ``mapped_keys``.

        A rename is skipped when the source key is missing or the target
        key already exists. Returns the same ``resource`` object.
        """
        for source_key, target_key in cls.mapped_keys.items():
            if target_key not in resource and source_key in resource:
                resource[target_key] = resource.pop(source_key)
        return resource

    @classmethod
    def lower_keys(cls, data):
        """Recursively lower-case the first character of dict keys.

        Dicts are modified in place and returned, lists are rebuilt with
        every element processed, and any other value is returned as is.
        Keys listed in ``preserve_case`` are skipped. Empty list/dict
        values are removed unless their lowered key is in
        ``preserve_empty``.
        """
        if isinstance(data, list):
            return [cls.lower_keys(element) for element in data]
        if not isinstance(data, dict):
            return data
        # iterate over a snapshot, the dict is mutated while walking it
        for key, value in list(data.items()):
            if key in cls.preserve_case:
                continue
            lowered = key[0].lower() + key[1:]
            data[lowered] = data.pop(key)
            if not isinstance(value, (list, dict)):
                continue
            if value or lowered in cls.preserve_empty:
                data[lowered] = cls.lower_keys(value)
            else:
                # describe doesn't return empty list/dict by default
                data.pop(lowered)
        return data

    def load_resource(self, item):
        """Load ``item`` via the parent source, then normalize its keys."""
        loaded = super().load_resource(item)
        resource = self.lower_keys(loaded)
        if not self.mapped_keys:
            return resource
        return self.remap_keys(resource)

82 

83 

class ClusterDescribe(query.DescribeSource):
    # Describe source for clusters that also normalizes the ecs tag format.

    def augment(self, resources):
        """Run the parent augmentation, then normalize tags in place."""
        augmented = super().augment(resources)
        ecs_tag_normalize(augmented)
        return augmented

90 

91 

@resources.register('ecs')
class ECSCluster(query.QueryResourceManager):
    # Resource manager for ecs clusters.

    class resource_type(query.TypeInfo):
        service = 'ecs'
        arn_type = 'cluster'
        name = 'clusterName'
        id = 'clusterArn'
        arn = 'clusterArn'
        cfn_type = 'AWS::ECS::Cluster'
        config_type = 'AWS::ECS::Cluster'
        # enumerate cluster arns, then describe them with the extra
        # tags/settings/configurations detail included
        enum_spec = ('list_clusters', 'clusterArns', None)
        batch_detail_spec = (
            'describe_clusters', 'clusters', None, 'clusters',
            {'include': ['TAGS', 'SETTINGS', 'CONFIGURATIONS']})

    source_mapping = {
        'describe': ClusterDescribe,
        'config': query.ConfigSource,
    }

111 

112 

@ECSCluster.filter_registry.register('metrics')
class ECSMetrics(MetricsFilter):
    # Metrics filter for clusters, keyed on the cluster name.

    def get_dimensions(self, resource):
        """Return the single ``ClusterName`` dimension for ``resource``."""
        cluster_dimension = {
            'Name': 'ClusterName', 'Value': resource['clusterName']}
        return [cluster_dimension]

118 

119 

class ECSClusterResourceDescribeSource(query.ChildDescribeSource):

    # We need an additional subclass of describe for ecs cluster.
    #
    # - Default child query just returns the child resources from
    #   enumeration op, for ecs clusters, enumeration just returns
    #   resources ids, we also need to retain the parent id for
    #   augmentation.
    #
    # - The default augmentation detail_spec/batch_detail_spec need additional
    #   handling for the string resources with parent id.
    #
    # Subclasses supply ``process_cluster_resources(client, cluster_id, ids)``
    # which describes the given ids within one cluster.

    def __init__(self, manager):
        self.manager = manager
        # capture_parent_id makes enumeration yield (parent id, child) pairs,
        # which is the shape augment() below unpacks
        self.query = query.ChildResourceQuery(
            manager.session_factory, manager, capture_parent_id=True)

    def get_resources(self, ids, cache=True):
        """Retrieve ecs resources for serverless policies or related resources

        Requires arns in new format.
        https://docs.aws.amazon.com/AmazonECS/latest/userguide/ecs-resource-ids.html

        Raises PolicyExecutionError when an arn's resource path does not
        have the three ``type/cluster/id`` segments.
        """
        ids_by_cluster = {}
        for arn in ids:
            _prefix, resource_path = arn.rsplit(':', 1)
            segments = resource_path.split('/', 2)
            if len(segments) != 3:
                raise PolicyExecutionError("New format ecs arn required")
            _kind, cluster_name, resource_id = segments
            ids_by_cluster.setdefault(cluster_name, []).append(resource_id)

        client = local_session(self.manager.session_factory).client('ecs')
        found = []
        for cluster_name, resource_ids in ids_by_cluster.items():
            found += self.process_cluster_resources(
                client, cluster_name, resource_ids)
        return found

    def augment(self, resources):
        """Describe enumerated ``(parent id, child id)`` pairs per cluster.

        Children are grouped by parent and each group is described on the
        manager's executor. A group whose describe call raised is logged
        as a warning and left out of the result.
        """
        children_by_parent = {}
        for parent_id, child in resources:
            children_by_parent.setdefault(parent_id, []).append(child)

        described = []
        with self.manager.executor_factory(
                max_workers=self.manager.max_workers) as pool:
            client = local_session(self.manager.session_factory).client('ecs')
            pending = {}
            for parent_id, children in children_by_parent.items():
                future = pool.submit(
                    self.process_cluster_resources, client, parent_id, children)
                pending[future] = parent_id
            for future, parent_id in pending.items():
                error = future.exception()
                if error:
                    self.manager.log.warning(
                        'error fetching ecs resources for cluster %s: %s',
                        parent_id, error)
                    continue
                described.extend(future.result())
        return described

182 

183 

@query.sources.register('describe-ecs-service')
class ECSServiceDescribeSource(ECSClusterResourceDescribeSource):
    # Describe source for services within a cluster.

    def process_cluster_resources(self, client, cluster_id, services):
        """Describe ``services`` of one cluster in manager sized batches.

        Tags are requested with each batch and normalized to the common
        format before the combined list is returned.
        """
        described = []
        for batch in chunks(services, self.manager.chunk_size):
            response = client.describe_services(
                cluster=cluster_id,
                include=['TAGS'],
                services=batch)
            described.extend(response.get('services', []))
        ecs_tag_normalize(described)
        return described

197 

198 

class ECSServiceConfigSource(ContainerConfigSource):
    # Config source for services.

    # Lowered key names whose empty list/dict values are kept when keys are
    # normalized. The attribute has to be spelled ``preserve_empty``: that is
    # the name ContainerConfigSource.lower_keys reads, so a differently
    # spelled attribute is silently ignored.
    preserve_empty = {
        'placementConstraints', 'placementStrategy',
        'serviceRegistries', 'Tags', 'loadBalancers'}

    # config key name -> key name used by the rest of this module
    mapped_keys = {
        'role': 'roleArn', 'cluster': 'clusterArn'}

206 

207 

@resources.register('ecs-service')
class Service(query.ChildResourceManager):
    # Resource manager for ecs services, enumerated per parent cluster.

    # batch size used by the describe source when describing services
    chunk_size = 10

    class resource_type(query.TypeInfo):
        service = 'ecs'
        name = 'serviceName'
        id = 'serviceArn'
        arn = 'serviceArn'
        cfn_type = 'AWS::ECS::Service'
        config_type = 'AWS::ECS::Service'
        supports_trailevents = True
        parent_spec = ('ecs', 'cluster', None)
        enum_spec = ('list_services', 'serviceArns', None)

    source_mapping = {
        'describe': ECSServiceDescribeSource,
        'describe-child': ECSServiceDescribeSource,
        'config': ECSServiceConfigSource,
    }

    def get_resources(self, ids, cache=True, augment=True):
        """Fetch services by id.

        The ``augment`` argument is accepted for interface compatibility
        but the parent implementation is always called with
        ``augment=False``.
        """
        return super().get_resources(ids, cache, augment=False)

230 

231 

@Service.filter_registry.register('metrics')
class ServiceMetrics(MetricsFilter):
    # Metrics filter for services, keyed on cluster and service name.

    def get_dimensions(self, resource):
        """Return the ``ClusterName`` and ``ServiceName`` dimensions.

        The cluster name is the last ``/`` separated segment of the
        service's ``clusterArn``.
        """
        cluster_name = resource['clusterArn'].rsplit('/')[-1]
        service_name = resource['serviceName']
        return [
            {'Name': 'ClusterName', 'Value': cluster_name},
            {'Name': 'ServiceName', 'Value': service_name}]

239 

240 

class RelatedTaskDefinitionFilter(ValueFilter):
    # Base value filter that matches a resource against the task definition
    # it references. ``related_key`` names the resource key holding the task
    # definition arn; subclasses override it as needed.

    schema = type_schema('task-definition', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('ecs:DescribeTaskDefinition',
                   'ecs:ListTaskDefinitions')
    related_key = 'taskDefinition'

    def process(self, resources, event=None):
        """Index the referenced task definitions by arn, then filter."""
        self.task_defs = {
            task_def['taskDefinitionArn']: task_def
            for task_def in self.get_task_defs(resources)}
        return super().process(resources)

    def get_task_defs(self, resources):
        """Return the task definitions referenced by ``resources``."""
        task_def_ids = list(set(r[self.related_key] for r in resources))
        task_def_manager = self.manager.get_resource_manager(
            'ecs-task-definition')

        # due to model difference (multi-level containment with
        # multi-step resource iteration) and potential volume of
        # resources, we break our abstractions a little in the name of
        # efficiency wrt api usage.

        cache_key = task_def_manager.get_cache_key(None)
        if self.manager._cache.get(cache_key):
            # task def cache is already populated, resolve through it
            return task_def_manager.get_resources(task_def_ids)
        # otherwise just augment the ids
        return task_def_manager.augment(task_def_ids)

    def __call__(self, i):
        """Match the task definition referenced by resource ``i``."""
        related_task_def = self.task_defs[i[self.related_key]]
        return self.match(related_task_def)

275 

276 

@Service.filter_registry.register('task-definition')
class ServiceTaskDefinitionFilter(RelatedTaskDefinitionFilter):
    """Filter services by their task definitions.

    :Example:

     Find any fargate services that are running with a particular
     image in the task and scale them down to zero.

    .. code-block:: yaml

       policies:
         - name: fargate-find-stop-image
           resource: ecs-service
           filters:
             - launchType: FARGATE
             - type: task-definition
               key: "containerDefinitions[].image"
               value: "elasticsearch/elasticsearch:6.4.3"
               value_type: swap
               op: contains
           actions:
             - type: modify
               update:
                 desiredCount: 0
    """

301 

302 

@ECSCluster.filter_registry.register('ebs-storage')
class Storage(ValueFilter):
    """Filter clusters by configured EBS storage parameters.

    :Example:

    Find any ECS clusters that have instances that are using unencrypted EBS volumes.

    .. code-block:: yaml

        policies:
          - name: encrypted-ebs-volumes
            resource: ecs
            filters:
              - type: ebs-storage
                key: Encrypted
                value: true
    """

    schema = type_schema(
        'ebs-storage', rinherit=ValueFilter.schema,
        operator={'type': 'string', 'enum': ['or', 'and']},
    )
    schema_alias = False

    def get_permissions(self):
        """Return the combined permissions of the ebs, ec2 and ecs managers."""
        return (self.manager.get_resource_manager('ebs').get_permissions() +
                self.manager.get_resource_manager('ec2').get_permissions() +
                self.manager.get_resource_manager('ecs').get_permissions()
                )

    def process(self, resources, event=None):
        """Return the clusters whose attached volumes match the filter.

        ``operator`` selects whether any (``or``, the default) or all
        (``and``) of a cluster's volumes have to match.
        """
        self.storage = self.get_storage(resources)
        self.skip = []
        self.operator = any if self.data.get('operator', 'or') == 'or' else all
        return list(filter(self, resources))

    def get_storage(self, resources):
        """Map each cluster arn to the EBS volumes of its container instances."""
        manager = self.manager.get_resource_manager('ecs-container-instance')

        storage = {}
        for cluster in resources:
            cluster_arn = cluster['clusterArn']
            instances = manager.resources({
                "cluster": cluster_arn,
            }, augment=False)
            instances = manager.get_resources(instances, augment=False)
            volumes = storage[cluster_arn] = []

            for instance in instances:
                volumes.extend(self.get_ebs_volumes([instance["ec2InstanceId"]]))

        return storage

    def get_ebs_volumes(self, resources):
        """Return the attached EBS volumes of the given ec2 instance ids.

        Block device mappings without an ``Ebs`` entry and volumes with no
        attachments are skipped.
        """
        volumes = []
        ec2_manager = self.manager.get_resource_manager('ec2')
        ebs_manager = self.manager.get_resource_manager('ebs')
        for instance_set in utils.chunks(resources, 200):
            instance_set = ec2_manager.get_resources(instance_set)
            volume_ids = []
            for i in instance_set:
                for bd in i.get('BlockDeviceMappings', ()):
                    if 'Ebs' not in bd:
                        continue
                    volume_ids.append(bd['Ebs']['VolumeId'])
            for v in ebs_manager.get_resources(volume_ids):
                if not v['Attachments']:
                    continue
                volumes.append(v)
        return volumes

    def __call__(self, i):
        """Match cluster ``i``; clusters with no recorded volumes never match."""
        storage = self.storage.get(i["clusterArn"])

        if not storage:
            return False
        return self.operator(map(self.match, storage))

383 

384 

@Service.filter_registry.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    # Subnet filter for services. Subnet ids are collected from the task
    # set, deployment and service level network configurations.

    RelatedIdsExpression = ""
    expressions = ('taskSets[].networkConfiguration.awsvpcConfiguration.subnets[]',
                   'deployments[].networkConfiguration.awsvpcConfiguration.subnets[]',
                   'networkConfiguration.awsvpcConfiguration.subnets[]')

    def get_related_ids(self, resources):
        """Return the distinct subnet ids referenced by ``resources``."""
        found = set()
        for expression in self.expressions:
            compiled = jmespath_compile(expression)
            for resource in resources:
                # a search without matches adds nothing
                found.update(compiled.search(resource) or ())
        return list(found)

402 

403 

@Service.filter_registry.register('security-group')
class SGFilter(net_filters.SecurityGroupFilter):
    # Security group filter for services. Group ids are collected from the
    # task set, deployment and service level network configurations.

    RelatedIdsExpression = ""
    expressions = ('taskSets[].networkConfiguration.awsvpcConfiguration.securityGroups[]',
                   'deployments[].networkConfiguration.awsvpcConfiguration.securityGroups[]',
                   'networkConfiguration.awsvpcConfiguration.securityGroups[]')

    def get_related_ids(self, resources):
        """Return the distinct security group ids referenced by ``resources``."""
        found = set()
        for expression in self.expressions:
            compiled = jmespath_compile(expression)
            for resource in resources:
                # a search without matches adds nothing
                found.update(compiled.search(resource) or ())
        return list(found)

421 

422 

# Registered with a plain call rather than stacked as a decorator on the
# class below: a decorator's result is called with, and replaces, the
# decorated class, which is not what registering an unrelated filter intends.
Service.filter_registry.register('network-location', net_filters.NetworkLocation)


@Service.action_registry.register('modify-definition')
class UpdateTemplate(BaseAction):
    """Register a modified task definition and move the service onto it.

    The new definition is a copy of the service's current one with the
    action's ``properties`` applied and, when the resource carries a
    ``Rightsize`` cost optimization annotation, the recommended cpu and
    memory. Services whose deployment controller is not ``ECS`` are
    skipped, as are services for which no change results.
    """

    schema = type_schema(
        'modify-definition',
        properties={'type': 'object'},
    )

    permissions = ("ecs:RegisterTaskDefinition", "ecs:UpdateService")

    def validate(self):
        """Require ``properties`` or a cost optimization filter on the policy.

        Raises PolicyValidationError when neither is present.
        """
        if self.data.get('properties'):
            return
        has_recommendation_filter = any(
            isinstance(f, CostHubRecommendation)
            for f in self.manager.iter_filters())
        if not has_recommendation_filter:
            raise PolicyValidationError(
                "modify-definition: either properties specified or am optimization filter used"
            )

    def process(self, resources):
        """Register and roll out the target task definition per service."""
        task_def_filter = ServiceTaskDefinitionFilter({}, self.manager)
        task_defs = {t['taskDefinitionArn']: t for t in task_def_filter.get_task_defs(resources)}
        client = local_session(self.manager.session_factory).client('ecs')

        # we can only modify task definition when ecs is controlling the deployment.
        resources = self.filter_resources(resources, "deploymentController.type", ("ECS",))

        # count of services left untouched because no target definition resulted
        nack = 0
        for r in resources:
            r_task_def = task_defs[r[task_def_filter.related_key]]
            m_task_def = self.get_target_task_def(r, r_task_def)
            if m_task_def is None:
                nack += 1
                continue
            response = client.register_task_definition(**m_task_def)
            task_arn = response['taskDefinition']['taskDefinitionArn']
            # first path segment of the service arn resource is used as the cluster
            cluster, _ = Arn.parse(r['serviceArn']).resource.split('/', 1)
            client.update_service(
                cluster=cluster,
                service=r['serviceName'],
                taskDefinition=task_arn
            )
        if nack:
            self.log.warning("modify-definition %d services not modified", nack)

    # keys present on a described task definition that are removed before
    # the definition is passed to register_task_definition
    task_def_normalized = [
        "taskDefinitionArn", "revision", "status",
        "registeredAt", "registeredBy", "requiresAttributes",
    ]

    # described key name -> key name used when registering
    task_def_remap = {
        "compatibilities": "requiresCompatibilities",
    }

    def get_target_task_def(self, resource, current_task_def):
        """Build the register payload for ``resource``'s new task definition.

        Returns None when the target equals the current definition or when
        container sizes could not be updated for a rightsize recommendation.
        ``current_task_def`` itself is never modified.
        """
        target_task_def = copy.deepcopy(current_task_def)
        target_task_def.update(self.data.get('properties', {}))
        cost_optimization = resource.get(CostHubRecommendation.annotation_key)

        if cost_optimization and cost_optimization['actionType'] == 'Rightsize':
            # the summary is parsed as "<cpu> <unit>/<memory> <unit>"
            cpu, mem = cost_optimization["recommendedResourceSummary"].split("/")
            cpu = int(float(cpu.split(" ")[0]))
            mem = int(mem.split(" ")[0])
            target_task_def["cpu"] = str(cpu)
            target_task_def["memory"] = str(mem)
            target_task_def = self.update_target_containers_size(current_task_def, target_task_def)

        if target_task_def == current_task_def or target_task_def is None:
            return

        # normalize from describe to register formats
        for k in self.task_def_normalized:
            target_task_def.pop(k, None)
        for ck, dk in self.task_def_remap.items():
            if ck in target_task_def:
                target_task_def[dk] = target_task_def.pop(ck)
        # convert the common tag format back to the ecs key/value format
        tags = []
        for t in target_task_def.pop('Tags', []):
            tags.append({'key': t['Key'], 'value': t['Value']})
        if tags:
            target_task_def['tags'] = tags
        return target_task_def

    def update_target_containers_size(self, current_task_def, target_task_def):
        """Update container memory/size targets.

        We need to update memory/cpu requirements of the containers within
        the task, so the total of the containers in the task def
        matches the definition.

        for a task w/ a single container this is simple, make the container match
        the definition.

        for a multi container task, we need select a heuristic
        (proportional, largest) with some notion of a floor / min for
        proportional. for now we punt on multi-container tasks.

        Returns the updated ``target_task_def``, or None for a task with
        more than one container.
        """
        if len(target_task_def['containerDefinitions']) > 1:
            return
        container_def = target_task_def['containerDefinitions'][0]
        container_def['memory'] = int(target_task_def['memory'])
        container_def['cpu'] = int(target_task_def['cpu'])
        return target_task_def

530 

531 

@Service.action_registry.register('modify')
class UpdateService(BaseAction):
    """Action to update service

    :example:

    .. code-block:: yaml

        policies:
          - name: no-public-ips-services
            resource: ecs-service
            filters:
              - 'networkConfiguration.awsvpcConfiguration.assignPublicIp': 'ENABLED'
            actions:
              - type: modify
                update:
                  networkConfiguration:
                    awsvpcConfiguration:
                      assignPublicIp: DISABLED
    """

    schema = type_schema('modify',
        update={
            'desiredCount': {'type': 'integer'},
            'taskDefinition': {'type': 'string'},
            'deploymentConfiguration': {
                'type': 'object',
                'properties': {
                    'maximumPercent': {'type': 'integer'},
                    'minimumHealthyPercent': {'type': 'integer'},
                }
            },
            'networkConfiguration': {
                'type': 'object',
                'properties': {
                    'awsvpcConfiguration': {
                        'type': 'object',
                        'properties': {
                            'subnets': {
                                'type': 'array',
                                'items': {
                                    'type': 'string',
                                },
                                'minItems': 1
                            },
                            'securityGroups': {
                                'items': {
                                    'type': 'string',
                                },
                            },
                            'assignPublicIp': {
                                'type': 'string',
                                'enum': ['ENABLED', 'DISABLED'],
                            }
                        }
                    }
                }
            },
            'platformVersion': {'type': 'string'},
            'forceNewDeployment': {'type': 'boolean', 'default': False},
            'healthCheckGracePeriodSeconds': {'type': 'integer'},
        }
    )

    permissions = ('ecs:UpdateService',)

    def process(self, resources):
        """Apply the configured ``update`` to each service that differs.

        Only keys whose value differs from the service's current value are
        sent. A service for which nothing differs is left untouched, as is
        every service when no ``update`` is configured.
        """
        client = local_session(self.manager.session_factory).client('ecs')
        # 'update' is not required by the schema, treat its absence as no changes
        update = self.data.get('update') or {}
        net_update = (
            update.get('networkConfiguration') or {}).get('awsvpcConfiguration')

        for r in resources:
            param = {}

            # Handle network separately as it requires atomic updating, and populating
            # defaults from the resource.
            if net_update:
                net_param = dict(r['networkConfiguration']['awsvpcConfiguration'])
                net_param.update(net_update)
                param['networkConfiguration'] = {'awsvpcConfiguration': net_param}

            for k, v in update.items():
                if k != 'networkConfiguration' and r.get(k) != v:
                    param[k] = v

            if not param:
                continue

            client.update_service(
                cluster=r['clusterArn'], service=r['serviceName'], **param)

625 

626 

@Service.action_registry.register('delete')
class DeleteService(BaseAction):
    """Delete service(s)."""

    schema = type_schema('delete')
    permissions = ('ecs:DeleteService',)

    @staticmethod
    def _primary(entries):
        """Return the last entry with status PRIMARY, or {} when none has it."""
        matches = [e for e in entries if e['status'] == 'PRIMARY']
        return matches[-1] if matches else {}

    def process(self, resources):
        """Scale each service to zero if needed, then delete it.

        A service that no longer exists (``ServiceNotFoundException``) is
        ignored; any other client error is raised.
        """
        client = local_session(self.manager.session_factory).client('ecs')
        retry = get_retry(('Throttling',))
        for r in resources:
            try:
                desiredCount = 0

                # Two different types of responses:
                # Deployments would appear for normal services
                # TaskSets would show for Blue/Green deployment
                #
                # Truthiness (rather than key presence) is checked so that
                # an empty deployments list falls through to the task sets
                # instead of failing on a missing primary entry.
                if r.get('deployments'):
                    primary = self._primary(r['deployments'])
                    desiredCount = primary.get('desiredCount', 0)
                elif r.get('taskSets'):
                    primary = self._primary(r['taskSets'])
                    desiredCount = primary.get('computedDesiredCount', 0)

                if desiredCount > 0:
                    retry(client.update_service,
                          cluster=r['clusterArn'], service=r['serviceName'], desiredCount=0)

                retry(client.delete_service,
                      cluster=r['clusterArn'], service=r['serviceName'])
            except ClientError as e:
                if e.response['Error']['Code'] != 'ServiceNotFoundException':
                    raise

660 

661 

@query.sources.register('describe-ecs-task')
class ECSTaskDescribeSource(ECSClusterResourceDescribeSource):
    # Describe source for tasks within a cluster.

    def process_cluster_resources(self, client, cluster_id, tasks):
        """Describe ``tasks`` of one cluster in manager sized batches.

        Each describe call goes through the manager's retry helper, tags
        are requested with it, and the combined result has its tags
        normalized to the common format.
        """
        described = []
        for batch in chunks(tasks, self.manager.chunk_size):
            response = self.manager.retry(
                client.describe_tasks,
                cluster=cluster_id,
                include=['TAGS'],
                tasks=batch)
            described.extend(response.get('tasks', []))
        ecs_tag_normalize(described)
        return described

676 

677 

@resources.register('ecs-task')
class Task(query.ChildResourceManager):
    # Resource manager for ecs tasks, enumerated per parent cluster.

    # batch size used by the describe source when describing tasks
    chunk_size = 100

    class resource_type(query.TypeInfo):
        service = 'ecs'
        arn_type = 'task'
        id = 'taskArn'
        name = 'taskArn'
        arn = 'taskArn'
        cfn_type = 'AWS::ECS::TaskSet'
        supports_trailevents = True
        parent_spec = ('ecs', 'cluster', None)
        enum_spec = ('list_tasks', 'taskArns', None)

    @property
    def source_type(self):
        """Name of the source to use for this policy.

        Both ``describe`` (the default) and ``describe-child`` resolve to
        the task specific describe source; any other configured value is
        returned unchanged.
        """
        configured = self.data.get('source', 'describe')
        if configured in ('describe', 'describe-child'):
            return 'describe-ecs-task'
        return configured

    def get_resources(self, ids, cache=True, augment=True):
        """Fetch tasks by id.

        The ``augment`` argument is accepted for interface compatibility
        but the parent implementation is always called with
        ``augment=False``.
        """
        return super().get_resources(ids, cache, augment=False)

701 

702 

@Task.filter_registry.register('subnet')
class TaskSubnetFilter(net_filters.SubnetFilter):
    # Subnet filter for tasks: the subnet id is taken from the ``subnetId``
    # detail of the task's attachments.

    RelatedIdsExpression = (
        "attachments[].details[?name == 'subnetId'].value[]")

707 

708 

@Task.filter_registry.register('security-group')
class TaskSGFilter(net_filters.SecurityGroupFilter):
    # Security group filter for tasks. A task only records the ids of its
    # network interfaces, so the interfaces are described to find the
    # security groups attached to them.

    # network interface id -> list of security group ids, filled on first use
    ecs_group_cache = None

    RelatedIdsExpression = ""
    eni_expression = "attachments[].details[?name == 'networkInterfaceId'].value[]"
    sg_expression = "Groups[].GroupId[]"

    def _get_related_ids(self, resources):
        """Map the network interface ids of ``resources`` to their group ids.

        The result is also stored on ``ecs_group_cache`` when the describe
        call returned interfaces. Returns an empty dict when no resource
        references a network interface.
        """
        groups = dict()
        eni_ids = set()

        cexp = jmespath_compile(self.eni_expression)
        for r in resources:
            ids = cexp.search(r)
            if ids:
                eni_ids.update(ids)

        if eni_ids:
            client = local_session(self.manager.session_factory).client('ec2')
            response = client.describe_network_interfaces(
                NetworkInterfaceIds=list(eni_ids)
            )
            if response["NetworkInterfaces"]:
                cexp = jmespath_compile(self.sg_expression)
                for r in response["NetworkInterfaces"]:
                    ids = cexp.search(r)
                    if ids:
                        groups[r["NetworkInterfaceId"]] = ids
                self.ecs_group_cache = groups

        return groups

    def get_related_ids(self, resources):
        """Return the distinct security group ids used by ``resources``."""
        if not self.ecs_group_cache:
            self.ecs_group_cache = self._get_related_ids(resources)

        group_ids = set()
        cexp = jmespath_compile(self.eni_expression)
        for r in resources:
            # The search result can be empty or None for a task without a
            # network interface attachment (the lookup above guards for the
            # same case), so fall back to an empty sequence.
            for eni_id in cexp.search(r) or ():
                group_ids.update(self.ecs_group_cache.get(eni_id, ()))
        return list(group_ids)

754 

755 

# Registered with a plain call rather than stacked as a decorator on the
# class below: a decorator's result is called with, and replaces, the
# decorated class, which is not what registering an unrelated filter intends.
Task.filter_registry.register('network-location', net_filters.NetworkLocation)


@Task.filter_registry.register('task-definition')
class TaskTaskDefinitionFilter(RelatedTaskDefinitionFilter):
    """Filter tasks by their task definition.

    :Example:

     Find any fargate tasks that are running without read only root
     and stop them.

    .. code-block:: yaml

       policies:
         - name: fargate-readonly-tasks
           resource: ecs-task
           filters:
             - launchType: FARGATE
             - type: task-definition
               key: "containerDefinitions[].readonlyRootFilesystem"
               value: None
               value_type: swap
               op: contains
           actions:
             - type: stop

    """
    # tasks reference their task definition under this key
    related_key = 'taskDefinitionArn'

783 

784 

@Task.action_registry.register('stop')
class StopTask(BaseAction):
    """Stop/Delete a currently running task."""

    schema = type_schema('stop', reason={"type": "string"})
    permissions = ('ecs:StopTask',)

    def process(self, resources):
        """Stop every task, passing along the configured ``reason``.

        A client error whose message says the task was not found is
        ignored; any other client error is raised.
        """
        client = local_session(self.manager.session_factory).client('ecs')
        retry = get_retry(('Throttling',))
        reason = self.data.get('reason', 'custodian policy')
        # No error code for not found, so the message text is compared.
        not_found_message = "The referenced task was not found."

        for task in resources:
            try:
                retry(client.stop_task,
                      cluster=task['clusterArn'],
                      task=task['taskArn'],
                      reason=reason)
            except ClientError as e:
                if e.response['Error']['Message'] == not_found_message:
                    continue
                raise

807 

808 

class DescribeTaskDefinition(DescribeSource):
    # Describe source for task definitions; each definition is described
    # individually together with its tags.

    def get_resources(self, ids, cache=True):
        """Return the task definitions for ``ids``.

        Cached resources are used when ``cache`` is set and the manager has
        them. Otherwise the ids are described; a client error while doing
        so is logged as a warning and an empty list is returned.
        """
        if cache:
            cached = self.manager._get_cached_resources(ids)
            if cached is not None:
                return cached
        try:
            return self.augment(ids)
        except ClientError as e:
            self.manager.log.warning("event ids not resolved: %s error:%s" % (ids, e))
            return []

    def augment(self, resources):
        """Describe each task definition id and normalize its tags."""
        client = local_session(self.manager.session_factory).client('ecs')
        described = []
        for task_def_id in resources:
            response = self.manager.retry(
                client.describe_task_definition,
                taskDefinition=task_def_id,
                include=['TAGS'])
            task_def = response['taskDefinition']
            # tags come back next to the definition, fold them into it
            task_def['tags'] = response.get('tags', [])
            described.append(task_def)
        ecs_tag_normalize(described)
        return described

836 

837 

class ConfigECSTaskDefinition(ContainerConfigSource):
    # Config source for task definitions.

    # lowered key names whose empty list/dict values are kept
    preserve_empty = {'volumesFrom', 'portMappings', 'mountPoints'}

841 

842 

@resources.register('ecs-task-definition')
class TaskDefinition(query.QueryResourceManager):
    # Resource manager for ecs task definitions.

    class resource_type(query.TypeInfo):
        service = 'ecs'
        arn_type = 'task-definition'
        id = 'taskDefinitionArn'
        name = 'taskDefinitionArn'
        arn = 'taskDefinitionArn'
        config_type = 'AWS::ECS::TaskDefinition'
        cfn_type = 'AWS::ECS::TaskDefinition'
        enum_spec = ('list_task_definitions', 'taskDefinitionArns', None)

    source_mapping = {
        'describe': DescribeTaskDefinition,
        'config': ConfigECSTaskDefinition,
    }

    def get_resources(self, ids, cache=True, augment=True):
        """Fetch task definitions by id.

        The ``augment`` argument is accepted for interface compatibility
        but the parent implementation is always called with
        ``augment=False``.
        """
        return super().get_resources(ids, cache, augment=False)

860 

861 

@TaskDefinition.action_registry.register('delete')
class DeleteTaskDefinition(BaseAction):
    """Delete/DeRegister a task definition.

    Deregistering marks the definition as InActive. Services and tasks
    that are already running can keep referencing it, new services and
    tasks can't.

    force defaults to False. When set to True the task definition is
    additionally deleted permanently.

    .. code-block:: yaml

       policies:
         - name: deregister-task-definition
           resource: ecs-task-definition
           filters:
             - family: test-task-def
           actions:
             - type: delete

         - name: delete-task-definition
           resource: ecs-task-definition
           filters:
             - family: test-task-def
           actions:
             - type: delete
               force: True
    """

    schema = type_schema('delete', force={'type': 'boolean'})
    permissions = ('ecs:DeregisterTaskDefinition', 'ecs:DeleteTaskDefinitions',)

    def process(self, resources):
        """Deregister the definitions and, with ``force``, delete them.

        Definitions already INACTIVE are not deregistered again, and a
        client error saying the definition does not exist is ignored.
        With ``force`` every given definition is then deleted in batches
        of ten.
        """
        client = local_session(self.manager.session_factory).client('ecs')
        retry = get_retry(('Throttling',))
        force = self.data.get('force', False)
        missing_message = 'The specified task definition does not exist.'

        for task_def in resources:
            if task_def['status'] == 'INACTIVE':
                continue
            try:
                retry(client.deregister_task_definition,
                      taskDefinition=task_def['taskDefinitionArn'])
            except ClientError as e:
                if e.response['Error']['Message'] != missing_message:
                    raise

        if not force:
            return

        arns = [task_def['taskDefinitionArn'] for task_def in resources]
        for batch in chunks(arns, size=10):
            retry(client.delete_task_definitions, taskDefinitions=batch)

918 

919 

@resources.register('ecs-container-instance')
class ContainerInstance(query.ChildResourceManager):
    # Resource manager for ecs container instances, enumerated per parent
    # cluster.

    # batch size used by the describe source when describing instances
    chunk_size = 100

    class resource_type(query.TypeInfo):
        service = 'ecs'
        id = 'containerInstanceArn'
        name = 'containerInstanceArn'
        arn = 'containerInstanceArn'
        parent_spec = ('ecs', 'cluster', None)
        enum_spec = ('list_container_instances', 'containerInstanceArns', None)

    @property
    def source_type(self):
        """Name of the source to use for this policy.

        Both ``describe`` (the default) and ``describe-child`` resolve to
        the container instance specific describe source; any other
        configured value is returned unchanged.
        """
        configured = self.data.get('source', 'describe')
        if configured in ('describe', 'describe-child'):
            return 'describe-ecs-container-instance'
        return configured

938 

939 

@query.sources.register('describe-ecs-container-instance')
class ECSContainerInstanceDescribeSource(ECSClusterResourceDescribeSource):
    # Describe source for container instances within a cluster.

    def process_cluster_resources(self, client, cluster_id, container_instances):
        """Describe ``container_instances`` of one cluster in batches.

        Every described instance is annotated with its cluster id under
        ``c7n:cluster`` and the combined result has its tags normalized to
        the common format.
        """
        described = []
        for batch in chunks(container_instances, self.manager.chunk_size):
            response = client.describe_container_instances(
                cluster=cluster_id,
                include=['TAGS'],
                containerInstances=batch)
            instances = response.get('containerInstances', [])
            # Many Container Instance API calls require the cluster_id, adding as a
            # custodian specific key in the resource
            for instance in instances:
                instance['c7n:cluster'] = cluster_id
            described.extend(instances)
        ecs_tag_normalize(described)
        return described

957 

958 

@ContainerInstance.filter_registry.register('subnet')
class ContainerInstanceSubnetFilter(net_filters.SubnetFilter):
    # Subnet filter for container instances: the subnet id is taken from
    # the instance's ``ecs.subnet-id`` attribute.

    RelatedIdsExpression = (
        "attributes[?name == 'ecs.subnet-id'].value[]")

963 

964 

@ContainerInstance.action_registry.register('set-state')
class SetState(BaseAction):
    """Updates a container instance to either ACTIVE or DRAINING

    :example:

    .. code-block:: yaml

        policies:
            - name: drain-container-instances
              resource: ecs-container-instance
              actions:
                - type: set-state
                  state: DRAINING
    """
    schema = type_schema(
        'set-state',
        state={"type": "string", 'enum': ['DRAINING', 'ACTIVE']})
    permissions = ('ecs:UpdateContainerInstancesState',)

    def process(self, resources):
        """Set the configured state on instances not already in it.

        Instances are grouped by the ``c7n:cluster`` annotation and updated
        one cluster at a time. Nothing is returned; an empty ``resources``
        is a no-op.
        """
        target_state = self.data.get('state')
        cluster_map = group_by(resources, 'c7n:cluster')
        for cluster, instances in cluster_map.items():
            c_instances = [i['containerInstanceArn'] for i in instances
                           if i['status'] != target_state]
            self.process_cluster(cluster, c_instances)

    def process_cluster(self, cluster, c_instances):
        """Update the state of ``c_instances`` within ``cluster``.

        A client error is logged as a warning together with the affected
        batch and then raised again.
        """
        # Limit on number of container instance that can be updated in a single
        # update_container_instances_state call is 10
        chunk_size = 10
        client = local_session(self.manager.session_factory).client('ecs')
        for service_set in chunks(c_instances, chunk_size):
            try:
                client.update_container_instances_state(
                    cluster=cluster,
                    containerInstances=service_set,
                    status=self.data.get('state'))
            except ClientError:
                self.manager.log.warning(
                    'Failed to update Container Instances State: %s, cluster %s' %
                    (service_set, cluster))
                raise

1009 

1010 

@ContainerInstance.action_registry.register('update-agent')
class UpdateAgent(BaseAction):
    """Updates the agent on a container instance
    """

    schema = type_schema('update-agent')
    permissions = ('ecs:UpdateContainerAgent',)

    def process(self, resources):
        """Request an agent update for each container instance in turn."""
        ecs = local_session(self.manager.session_factory).client('ecs')
        for container_instance in resources:
            cluster = container_instance.get('c7n:cluster')
            arn = container_instance.get('containerInstanceArn')
            self.process_instance(ecs, cluster, arn)

    def process_instance(self, client, cluster, instance):
        """Request an agent update for one instance.

        "No update available" and "update in progress" responses are ignored.
        """
        try:
            client.update_container_agent(
                cluster=cluster,
                containerInstance=instance)
        except (client.exceptions.NoUpdateAvailableException,
                client.exceptions.UpdateInProgressException):
            # Nothing to do: the agent is current or already being updated.
            pass

1032 

1033 

@ECSCluster.action_registry.register('tag')
@TaskDefinition.action_registry.register('tag')
@Service.action_registry.register('tag')
@Task.action_registry.register('tag')
@ContainerInstance.action_registry.register('tag')
class TagEcsResource(Tag):
    """Action to create tag(s) on an ECS resource
    (ecs, ecs-task-definition, ecs-service, ecs-task, ecs-container-instance)

    Requires arns in new format for tasks, services, and container-instances.
    https://docs.aws.amazon.com/AmazonECS/latest/userguide/ecs-resource-ids.html

    :example:

    .. code-block:: yaml

        policies:
            - name: tag-ecs-service
              resource: ecs-service
              filters:
                - "tag:target-tag": absent
                - type: taggable
                  state: true
              actions:
                - type: tag
                  key: target-tag
                  value: target-value
    """
    permissions = ('ecs:TagResource',)
    batch_size = 1

    def process_resource_set(self, client, resources, tags):
        """Tag each resource that ``ecs_taggable`` accepts.

        ``tags`` arrives as ``Key``/``Value`` pairs and is converted to the
        lower-case ``key``/``value`` form passed to ``tag_resource``.
        Resources rejected by ``ecs_taggable`` are skipped and counted, and
        a single warning reports how many were skipped.
        """
        mid = self.manager.resource_type.id
        tags = [{'key': t['Key'], 'value': t['Value']} for t in tags]
        old_arns = 0
        for r in resources:
            if not ecs_taggable(self.manager.resource_type, r):
                old_arns += 1
                continue
            client.tag_resource(resourceArn=r[mid], tags=tags)
        if old_arns:
            # Logger.warn is a deprecated alias; use Logger.warning.
            self.log.warning(
                "Couldn't tag %d resource(s). Needs new ARN format", old_arns)

1076 

1077 

@ECSCluster.action_registry.register('remove-tag')
@TaskDefinition.action_registry.register('remove-tag')
@Service.action_registry.register('remove-tag')
@Task.action_registry.register('remove-tag')
@ContainerInstance.action_registry.register('remove-tag')
class RemoveTagEcsResource(RemoveTag):
    """Remove tag(s) from ECS resources
    (ecs, ecs-task-definition, ecs-service, ecs-task, ecs-container-instance)

    :example:

    .. code-block:: yaml

        policies:
            - name: ecs-service-remove-tag
              resource: ecs-service
              filters:
                - type: taggable
                  state: true
              actions:
                - type: remove-tag
                  tags: ["BadTag"]
    """
    permissions = ('ecs:UntagResource',)
    batch_size = 1

    def process_resource_set(self, client, resources, keys):
        """Remove the tag ``keys`` from each resource ``ecs_taggable`` accepts.

        Resources rejected by ``ecs_taggable`` are skipped and counted, and
        a single warning reports how many were skipped.
        """
        old_arns = 0
        for r in resources:
            if not ecs_taggable(self.manager.resource_type, r):
                old_arns += 1
                continue
            client.untag_resource(resourceArn=r[self.id_key], tagKeys=keys)
        if old_arns != 0:
            # Logger.warn is a deprecated alias; use Logger.warning.
            self.log.warning(
                "Couldn't untag %d resource(s). Needs new ARN format", old_arns)

1113 

1114 

@ECSCluster.action_registry.register('mark-for-op')
@TaskDefinition.action_registry.register('mark-for-op')
@Service.action_registry.register('mark-for-op')
@Task.action_registry.register('mark-for-op')
@ContainerInstance.action_registry.register('mark-for-op')
class MarkEcsResourceForOp(TagDelayedAction):
    """Mark ECS resources for deferred action
    (ecs, ecs-task-definition, ecs-service, ecs-task, ecs-container-instance)

    Requires arns in new format for tasks, services, and container-instances.
    https://docs.aws.amazon.com/AmazonECS/latest/userguide/ecs-resource-ids.html

    This class adds no behavior of its own; everything is inherited from
    ``TagDelayedAction``.

    :example:

    .. code-block:: yaml

        policies:
            - name: ecs-service-invalid-tag-stop
              resource: ecs-service
              filters:
                - "tag:InvalidTag": present
                - type: taggable
                  state: true
              actions:
                - type: mark-for-op
                  op: delete
                  days: 1
    """

1143 

1144 

@Service.filter_registry.register('taggable')
@Task.filter_registry.register('taggable')
@ContainerInstance.filter_registry.register('taggable')
class ECSTaggable(Filter):
    """
    Filter ECS resources on arn-format
    https://docs.aws.amazon.com/AmazonECS/latest/userguide/ecs-resource-ids.html

    With ``state: true`` the resources accepted by ``ecs_taggable`` are kept;
    with ``state`` false or omitted the rejected ones are kept instead.

    :example:

    .. code-block:: yaml

        policies:
            - name: taggable
              resource: ecs-service
              filters:
                - type: taggable
                  state: True
    """

    schema = type_schema('taggable', state={'type': 'boolean'})

    def get_permissions(self):
        return self.manager.get_permissions()

    def process(self, resources, event=None):
        """Return the resources whose taggability matches the ``state`` flag."""
        want_taggable = bool(self.data.get('state'))
        model = self.manager.resource_type
        return [
            resource for resource in resources
            if ecs_taggable(model, resource) == want_taggable
        ]

1174 

1175 

# Shared tag-based filter/action registrations for every ECS resource type.
# Two separate passes keep the original registration order: all
# 'marked-for-op' filters first, then all 'auto-tag-user' actions.
_ecs_resource_types = (ECSCluster, TaskDefinition, Service, Task, ContainerInstance)

for _ecs_resource in _ecs_resource_types:
    _ecs_resource.filter_registry.register('marked-for-op', TagActionFilter)

for _ecs_resource in _ecs_resource_types:
    _ecs_resource.action_registry.register('auto-tag-user', AutoTagUser)

# Do not leave the loop helpers behind as module attributes.
del _ecs_resource, _ecs_resource_types

# Off-hours / on-hours scheduling filters are only offered for services.
Service.filter_registry.register('offhour', OffHour)
Service.filter_registry.register('onhour', OnHour)

1190 

1191 

# 'resize' action for ECS services, built on the shared AutoscalingBase.
@Service.action_registry.register('resize')
class AutoscalingECSService(AutoscalingBase):
    permissions = (
        'ecs:UpdateService',
        'ecs:TagResource',
        'ecs:UntagResource',
    )

    service_namespace = 'ecs'
    scalable_dimension = 'ecs:service:DesiredCount'

    def get_resource_id(self, resource):
        """Return the part of the service ARN after its last colon."""
        return resource['serviceArn'].rsplit(':', 1)[-1]

    def get_resource_tag(self, resource, key):
        """Return the value of tag ``key`` on the resource, or None if absent."""
        for entry in resource.get('Tags', ()):
            if entry['Key'] == key:
                return entry['Value']
        return None

    def get_resource_desired(self, resource):
        """Return the service's current desired task count as an int."""
        desired = resource['desiredCount']
        return int(desired)

    def set_resource_desired(self, resource, desired):
        """Set the service's desired task count through ``update_service``."""
        ecs = local_session(self.manager.session_factory).client('ecs')
        update_params = {
            'cluster': resource['clusterArn'],
            'service': resource['serviceName'],
            'desiredCount': desired,
        }
        ecs.update_service(**update_params)