1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from datetime import datetime
4import re
5
6from concurrent.futures import as_completed
7from dateutil.tz import tzutc
8from dateutil.parser import parse
9
10from c7n.actions import (
11 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction)
12from c7n.filters import FilterRegistry, AgeFilter
13import c7n.filters.vpc as net_filters
14from c7n.filters.kms import KmsRelatedFilter
15from c7n.manager import resources
16from c7n.query import QueryResourceManager, TypeInfo, DescribeSource, ConfigSource
17from c7n.tags import universal_augment
18from c7n.utils import (
19 local_session, chunks, snapshot_identifier, type_schema, jmespath_search,
20 get_retry, QueryParser)
21
22from .aws import shape_validate
23
24filters = FilterRegistry('elasticache.filters')
25actions = ActionRegistry('elasticache.actions')
26
27TTYPE = re.compile('cache.t1')
28
29
30class ElastiCacheQueryParser(QueryParser):
31
32 QuerySchema = {
33 'ShowCacheNodeInfo': bool,
34 'ShowCacheClustersNotInReplicationGroups': bool,
35 }
36 multi_value = False
37 value_key = 'Value'
38
39
40class DescribeElastiCache(DescribeSource):
41 """
42 Allows to use query to retrieve more information
43
44 :example
45
46 .. code-block:: yaml
47
48 policies:
49 - name: cache-node-with-default-port
50 resource: aws.cache-cluster
51 query:
52 - Name: ShowCacheNodeInfo
53 Value: true
54 filters:
55 - type: list-item
56 key: CacheNodes
57 attrs:
58 - type: value
59 key: Endpoint.Port
60 value: 11211
61 """
62
63 def get_query_params(self, query_params):
64 query_params = query_params or {}
65 for q in ElastiCacheQueryParser.parse(self.manager.data.get('query', [])):
66 query_params[q['Name']] = q['Value']
67 return query_params
68
69
70@resources.register('cache-cluster')
71class ElastiCacheCluster(QueryResourceManager):
72
73 class resource_type(TypeInfo):
74 service = 'elasticache'
75 arn_type = 'cluster'
76 arn_separator = ":"
77 enum_spec = ('describe_cache_clusters',
78 'CacheClusters[]', None)
79 name = id = 'CacheClusterId'
80 filter_name = 'CacheClusterId'
81 filter_type = 'scalar'
82 date = 'CacheClusterCreateTime'
83 dimension = 'CacheClusterId'
84 universal_taggable = True
85 cfn_type = 'AWS::ElastiCache::CacheCluster'
86 permissions_augment = ("elasticache:ListTagsForResource",)
87
88 filter_registry = filters
89 action_registry = actions
90 permissions = ('elasticache:ListTagsForResource',)
91 augment = universal_augment
92
93 source_mapping = {
94 'describe': DescribeElastiCache,
95 'config': ConfigSource
96 }
97
98
99@filters.register('security-group')
100class SecurityGroupFilter(net_filters.SecurityGroupFilter):
101
102 RelatedIdsExpression = "SecurityGroups[].SecurityGroupId"
103
104
105@filters.register('subnet')
106class SubnetFilter(net_filters.SubnetFilter):
107 """Filters elasticache clusters based on their associated subnet
108
109 :example:
110
111 .. code-block:: yaml
112
113 policies:
114 - name: elasticache-in-subnet-x
115 resource: cache-cluster
116 filters:
117 - type: subnet
118 key: SubnetId
119 value: subnet-12ab34cd
120 """
121
122 RelatedIdsExpression = ""
123
124 def get_related_ids(self, resources):
125 group_ids = set()
126 for r in resources:
127 group_ids.update(
128 [s['SubnetIdentifier'] for s in
129 self.groups[r['CacheSubnetGroupName']]['Subnets']])
130 return group_ids
131
132 def process(self, resources, event=None):
133 self.groups = {
134 r['CacheSubnetGroupName']: r for r in
135 self.manager.get_resource_manager(
136 'cache-subnet-group').resources()}
137 return super(SubnetFilter, self).process(resources, event)
138
139
140@filters.register('vpc')
141class VpcFilter(net_filters.VpcFilter):
142 """Filters elasticache clusters based on their associated VPCs
143
144 :example:
145
146 .. code-block:: yaml
147
148 policies:
149 - name: cache-node-with-default-vpc
150 resource: aws.cache-cluster
151 filters:
152 - type: vpc
153 key: IsDefault
154 value: false
155 """
156
157 RelatedIdsExpression = ""
158
159 def get_related_ids(self, resources):
160 return {
161 self.groups[res['CacheSubnetGroupName']]['VpcId'] for res in resources
162 }
163
164 def process(self, resources, event=None):
165 self.groups = {
166 r['CacheSubnetGroupName']: r
167 for r in self.manager.get_resource_manager('cache-subnet-group').resources()
168 }
169 return super().process(resources, event)
170
171
172filters.register('network-location', net_filters.NetworkLocation)
173
174
175@actions.register('delete')
176class DeleteElastiCacheCluster(BaseAction):
177 """Action to delete an elasticache cluster
178
179 To prevent unwanted deletion of elasticache clusters, it is recommended
180 to include a filter
181
182 :example:
183
184 .. code-block:: yaml
185
186 policies:
187 - name: elasticache-delete-stale-clusters
188 resource: cache-cluster
189 filters:
190 - type: value
191 value_type: age
192 key: CacheClusterCreateTime
193 op: ge
194 value: 90
195 actions:
196 - type: delete
197 skip-snapshot: false
198 """
199
200 schema = type_schema(
201 'delete', **{'skip-snapshot': {'type': 'boolean'}})
202 permissions = ('elasticache:DeleteCacheCluster',
203 'elasticache:DeleteReplicationGroup')
204
205 def process(self, clusters):
206 skip = self.data.get('skip-snapshot', False)
207 client = local_session(
208 self.manager.session_factory).client('elasticache')
209
210 clusters_to_delete = []
211 replication_groups_to_delete = set()
212 for cluster in clusters:
213 if cluster.get('ReplicationGroupId') in self.fetch_global_ds_rpgs():
214 self.log.info(
215 f"Skipping {cluster['CacheClusterId']}: associated with a global datastore")
216 continue
217 if cluster.get('ReplicationGroupId', ''):
218 replication_groups_to_delete.add(cluster['ReplicationGroupId'])
219 else:
220 clusters_to_delete.append(cluster)
221 # added if statement to handle differences in parameters if snapshot is skipped
222 for cluster in clusters_to_delete:
223 params = {'CacheClusterId': cluster['CacheClusterId']}
224 if _cluster_eligible_for_snapshot(cluster) and not skip:
225 params['FinalSnapshotIdentifier'] = snapshot_identifier(
226 'Final', cluster['CacheClusterId'])
227 self.log.debug(
228 "Taking final snapshot of %s", cluster['CacheClusterId'])
229 else:
230 self.log.debug(
231 "Skipping final snapshot of %s", cluster['CacheClusterId'])
232 client.delete_cache_cluster(**params)
233 self.log.info(
234 'Deleted ElastiCache cluster: %s',
235 cluster['CacheClusterId'])
236
237 cacheClusterIds = set([c["CacheClusterId"] for c in clusters])
238 for replication_group in replication_groups_to_delete:
239 # NOTE don't delete the group if it's not empty
240 rg = client.describe_replication_groups(
241 ReplicationGroupId=replication_group)["ReplicationGroups"][0]
242 if not all(cluster in cacheClusterIds for cluster in rg["MemberClusters"]):
243 # NOTE mark members for better presentation on notifications
244 for c in clusters:
245 if c.get("ReplicationGroupId") == replication_group:
246 c["MemberClusters"] = rg["MemberClusters"]
247 self.log.info(f'{replication_group} is not empty: {rg["MemberClusters"]}')
248 continue
249
250 params = {'ReplicationGroupId': replication_group,
251 'RetainPrimaryCluster': False}
252 if not skip:
253 params['FinalSnapshotIdentifier'] = snapshot_identifier(
254 'Final', replication_group)
255 client.delete_replication_group(**params)
256
257 self.log.info(
258 'Deleted ElastiCache replication group: %s',
259 replication_group)
260
261 def fetch_global_ds_rpgs(self):
262 rpgs = self.manager.get_resource_manager('elasticache-group').resources(augment=False)
263 global_rpgs = get_global_datastore_association(rpgs)
264 return global_rpgs
265
266
267@actions.register('snapshot')
268class SnapshotElastiCacheCluster(BaseAction):
269 """Action to snapshot an elasticache cluster
270
271 :example:
272
273 .. code-block:: yaml
274
275 policies:
276 - name: elasticache-cluster-snapshot
277 resource: cache-cluster
278 filters:
279 - type: value
280 key: CacheClusterStatus
281 op: not-in
282 value: ["deleted","deleting","creating"]
283 actions:
284 - snapshot
285 """
286
287 schema = type_schema('snapshot')
288 permissions = ('elasticache:CreateSnapshot',)
289
290 def process(self, clusters):
291 set_size = len(clusters)
292 clusters = [c for c in clusters if _cluster_eligible_for_snapshot(c)]
293 if set_size != len(clusters):
294 self.log.info(
295 "action:snapshot implicitly filtered from %d to %d clusters for snapshot support",
296 set_size, len(clusters))
297
298 with self.executor_factory(max_workers=2) as w:
299 futures = []
300 client = local_session(self.manager.session_factory).client('elasticache')
301 for cluster in clusters:
302 futures.append(
303 w.submit(self.process_cluster_snapshot, client, cluster))
304
305 for f in as_completed(futures):
306 if f.exception():
307 self.log.error(
308 "Exception creating cache cluster snapshot \n %s",
309 f.exception())
310 return clusters
311
312 def process_cluster_snapshot(self, client, cluster):
313 client.create_snapshot(
314 SnapshotName=snapshot_identifier(
315 'Backup',
316 cluster['CacheClusterId']),
317 CacheClusterId=cluster['CacheClusterId'])
318
319
320@actions.register('modify-security-groups')
321class ElasticacheClusterModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
322 """Modify security groups on an Elasticache cluster.
323
324 Looks at the individual clusters and modifies the Replication
325 Group's configuration for Security groups so all nodes get
326 affected equally
327
328 """
329 permissions = ('elasticache:ModifyReplicationGroup',)
330
331 def process(self, clusters):
332 replication_group_map = {}
333 client = local_session(
334 self.manager.session_factory).client('elasticache')
335 groups = super(
336 ElasticacheClusterModifyVpcSecurityGroups, self).get_groups(
337 clusters)
338 for idx, c in enumerate(clusters):
339 # build map of Replication Groups to Security Groups
340 replication_group_map[c['ReplicationGroupId']] = groups[idx]
341
342 for idx, r in enumerate(replication_group_map.keys()):
343 client.modify_replication_group(
344 ReplicationGroupId=r,
345 SecurityGroupIds=replication_group_map[r])
346
347
348@resources.register('cache-subnet-group')
349class ElastiCacheSubnetGroup(QueryResourceManager):
350
351 class resource_type(TypeInfo):
352 service = 'elasticache'
353 arn_type = 'subnetgroup'
354 enum_spec = ('describe_cache_subnet_groups',
355 'CacheSubnetGroups', None)
356 name = id = 'CacheSubnetGroupName'
357 filter_name = 'CacheSubnetGroupName'
358 filter_type = 'scalar'
359 cfn_type = 'AWS::ElastiCache::SubnetGroup'
360
361
362@resources.register('cache-snapshot')
363class ElastiCacheSnapshot(QueryResourceManager):
364
365 class resource_type(TypeInfo):
366 service = 'elasticache'
367 arn_type = 'snapshot'
368 arn_separator = ":"
369 enum_spec = ('describe_snapshots', 'Snapshots', None)
370 name = id = 'SnapshotName'
371 filter_name = 'SnapshotName'
372 filter_type = 'scalar'
373 date = 'StartTime'
374 universal_taggable = True
375
376 permissions = ('elasticache:ListTagsForResource',)
377
378 def augment(self, resources):
379 return universal_augment(self, resources)
380
381
382@ElastiCacheSnapshot.filter_registry.register('age')
383class ElastiCacheSnapshotAge(AgeFilter):
384 """Filters elasticache snapshots based on their age (in days)
385
386 :example:
387
388 .. code-block:: yaml
389
390 policies:
391 - name: elasticache-stale-snapshots
392 resource: cache-snapshot
393 filters:
394 - type: age
395 days: 30
396 op: ge
397 """
398
399 schema = type_schema(
400 'age', days={'type': 'number'},
401 op={'$ref': '#/definitions/filters_common/comparison_operators'})
402
403 date_attribute = 'dummy'
404
405 def get_resource_date(self, snapshot):
406 """ Override superclass method as there is no single snapshot date attribute.
407 """
408 def to_datetime(v):
409 if not isinstance(v, datetime):
410 v = parse(v)
411 if not v.tzinfo:
412 v = v.replace(tzinfo=tzutc())
413 return v
414
415 # Return the earliest of the node snaphot creation times.
416 return min([to_datetime(ns['SnapshotCreateTime'])
417 for ns in snapshot['NodeSnapshots']])
418
419
420@ElastiCacheSnapshot.action_registry.register('delete')
421class DeleteElastiCacheSnapshot(BaseAction):
422 """Action to delete elasticache snapshots
423
424 To prevent unwanted deletion of elasticache snapshots, it is recommended to
425 apply a filter
426
427 :example:
428
429 .. code-block:: yaml
430
431 policies:
432 - name: delete-elasticache-stale-snapshots
433 resource: cache-snapshot
434 filters:
435 - type: age
436 days: 30
437 op: ge
438 actions:
439 - delete
440 """
441
442 schema = type_schema('delete')
443 permissions = ('elasticache:DeleteSnapshot',)
444
445 def process(self, snapshots):
446 self.log.info("Deleting %d ElastiCache snapshots", len(snapshots))
447 with self.executor_factory(max_workers=3) as w:
448 futures = []
449 client = local_session(self.manager.session_factory).client('elasticache')
450 for snapshot_set in chunks(reversed(snapshots), size=50):
451 futures.append(
452 w.submit(self.process_snapshot_set, client, snapshot_set))
453 for f in as_completed(futures):
454 if f.exception():
455 self.log.error(
456 "Exception deleting snapshot set \n %s",
457 f.exception())
458 return snapshots
459
460 def process_snapshot_set(self, client, snapshots_set):
461 for s in snapshots_set:
462 client.delete_snapshot(SnapshotName=s['SnapshotName'])
463
464
465@ElastiCacheSnapshot.action_registry.register('copy-cluster-tags')
466class CopyClusterTags(BaseAction):
467 """
468 Copy specified tags from Elasticache cluster to Snapshot
469 :example:
470
471 .. code-block:: yaml
472
473 - name: elasticache-test
474 resource: cache-snapshot
475 filters:
476 - type: value
477 key: SnapshotName
478 op: in
479 value:
480 - test-tags-backup
481 actions:
482 - type: copy-cluster-tags
483 tags:
484 - tag1
485 - tag2
486 """
487
488 schema = type_schema(
489 'copy-cluster-tags',
490 tags={'type': 'array', 'items': {'type': 'string'}, 'minItems': 1},
491 required=('tags',))
492
493 def get_permissions(self):
494 perms = self.manager.get_resource_manager('cache-cluster').get_permissions()
495 perms.append('elasticache:AddTagsToResource')
496 return perms
497
498 def process(self, snapshots):
499 client = local_session(self.manager.session_factory).client('elasticache')
500 clusters = {r['CacheClusterId']: r for r in
501 self.manager.get_resource_manager('cache-cluster').resources()}
502 copyable_tags = self.data.get('tags')
503
504 for s in snapshots:
505 # For replicated/sharded clusters it is possible for each
506 # shard to have separate tags, we go ahead and tag the
507 # snap with the union of tags with overlaps getting the
508 # last value (arbitrary if mismatched).
509 if 'CacheClusterId' not in s:
510 cluster_ids = [ns['CacheClusterId'] for ns in s['NodeSnapshots']]
511 else:
512 cluster_ids = [s['CacheClusterId']]
513
514 copy_tags = {}
515 for cid in sorted(cluster_ids):
516 if cid not in clusters:
517 continue
518
519 cluster_tags = {t['Key']: t['Value'] for t in clusters[cid]['Tags']}
520 snap_tags = {t['Key']: t['Value'] for t in s.get('Tags', ())}
521
522 for k, v in cluster_tags.items():
523 if copyable_tags and k not in copyable_tags:
524 continue
525 if k.startswith('aws:'):
526 continue
527 if snap_tags.get(k, '') == v:
528 continue
529 copy_tags[k] = v
530
531 if not copy_tags:
532 continue
533
534 if len(set(copy_tags).union(set(snap_tags))) > 50:
535 self.log.error(
536 "Cant copy tags, max tag limit hit on snapshot:%s",
537 s['SnapshotName'])
538 continue
539
540 arn = self.manager.generate_arn(s['SnapshotName'])
541 self.manager.retry(
542 client.add_tags_to_resource,
543 ResourceName=arn,
544 Tags=[{'Key': k, 'Value': v} for k, v in copy_tags.items()])
545
546
547def _cluster_eligible_for_snapshot(cluster):
548 # added regex search to filter unsupported cachenode types
549 return (
550 cluster['Engine'] != 'memcached' and not
551 TTYPE.match(cluster['CacheNodeType'])
552 )
553
554
555@resources.register('elasticache-group')
556class ElastiCacheReplicationGroup(QueryResourceManager):
557
558 class resource_type(TypeInfo):
559 service = "elasticache"
560 enum_spec = ('describe_replication_groups',
561 'ReplicationGroups[]', None)
562 arn_type = 'replicationgroup'
563 id = name = dimension = 'ReplicationGroupId'
564 cfn_type = 'AWS::ElastiCache::ReplicationGroup'
565 arn_separator = ":"
566 universal_taggable = object()
567
568 augment = universal_augment
569 permissions = ('elasticache:DescribeReplicationGroups',)
570
571
572@ElastiCacheReplicationGroup.filter_registry.register('kms-key')
573class KmsFilter(KmsRelatedFilter):
574
575 RelatedIdsExpression = 'KmsKeyId'
576
577
578def get_global_datastore_association(resources):
579 global_ds_rpgs = set()
580 for r in resources:
581 global_ds_association = jmespath_search(
582 'GlobalReplicationGroupInfo.GlobalReplicationGroupId', r)
583 if global_ds_association:
584 global_ds_rpgs.add(r['ReplicationGroupId'])
585 return global_ds_rpgs
586
587
588@ElastiCacheReplicationGroup.action_registry.register('delete')
589class DeleteReplicationGroup(BaseAction):
590 """Action to delete a cache replication group
591
592 :example:
593
594 .. code-block:: yaml
595
596 policies:
597 - name: elasticache-delete-replication-group
598 resource: aws.elasticache-group
599 filters:
600 - type: value
601 key: AtRestEncryptionEnabled
602 value: False
603 actions:
604 - type: delete
605 snapshot: False
606
607 """
608 schema = type_schema(
609 'delete', **{'snapshot': {'type': 'boolean'}})
610
611 valid_origin_states = ('available',)
612 permissions = ('elasticache:DeleteReplicationGroup',)
613
614 def process(self, resources):
615 resources = self.filter_resources(resources, 'Status', self.valid_origin_states)
616 client = local_session(self.manager.session_factory).client('elasticache')
617 global_datastore_association_map = get_global_datastore_association(resources)
618 for r in resources:
619 if r['ReplicationGroupId'] in global_datastore_association_map:
620 self.log.info(
621 f"Skipping {r['ReplicationGroupId']}: associated with a global datastore")
622 continue
623 params = {'ReplicationGroupId': r['ReplicationGroupId']}
624 if self.data.get('snapshot', False):
625 params.update({'FinalSnapshotIdentifier': r['ReplicationGroupId'] + '-snapshot'})
626 self.manager.retry(client.delete_replication_group, **params, ignore_err_codes=(
627 'ReplicationGroupNotFoundFault',))
628
629
630@resources.register('elasticache-user')
631class ElastiCacheUser(QueryResourceManager):
632
633 class resource_type(TypeInfo):
634 service = 'elasticache'
635 enum_spec = ('describe_users', 'Users[]', None)
636 arn_separator = ":"
637 arn_type = 'user'
638 arn = 'ARN'
639 id = 'UserId'
640 name = 'UserName'
641 cfn_type = 'AWS::ElastiCache::User'
642 universal_taggable = object()
643
644 augment = universal_augment
645 permissions = ('elasticache:DescribeUsers',)
646
647
648@ElastiCacheUser.action_registry.register('delete')
649class DeleteUser(BaseAction):
650 """Action to delete a cache user
651
652 :example:
653
654 .. code-block:: yaml
655
656 policies:
657 - name: elasticache-delete-user
658 resource: aws.elasticache-user
659 filters:
660 - type: value
661 key: Authentication.Type
662 value: no-password
663 actions:
664 - delete
665
666 """
667 schema = type_schema('delete')
668
669 permissions = ('elasticache:DeleteUser',)
670
671 def process(self, resources):
672 client = local_session(self.manager.session_factory).client('elasticache')
673 retry = get_retry(('ThrottlingException', 'InvalidUserStateFault'))
674 for r in resources:
675 retry(client.delete_user, UserId=r['UserId'], ignore_err_codes=('UserNotFoundFault',))
676
677
678@ElastiCacheUser.action_registry.register('modify')
679class ModifyUser(BaseAction):
680 """Action to modify a cache user
681
682 :example:
683
684 .. code-block:: yaml
685
686 policies:
687 - name: elasticache-modify-user
688 resource: aws.elasticache-user
689 filters:
690 - type: value
691 key: Authentication.Type
692 value: no-password
693 actions:
694 - type: modify
695 attributes:
696 AuthenticationMode:
697 Type: password
698 Passwords:
699 - "password"
700 """
701
702 permissions = ('elasticache:ModifyUser',)
703 schema = type_schema(
704 'modify',
705 attributes={'type:': 'object'},
706 required=('attributes',))
707 shape = 'ModifyUserMessage'
708
709 def validate(self):
710 req = dict(self.data["attributes"])
711 req["UserId"] = "validate"
712 return shape_validate(
713 req, self.shape, self.manager.resource_type.service
714 )
715
716 def process(self, resources):
717 client = local_session(self.manager.session_factory).client('elasticache')
718 retry = get_retry(('ThrottlingException', 'InvalidUserStateFault'))
719 for r in resources:
720 retry(client.modify_user, UserId=r['UserId'], **self.data["attributes"],
721 ignore_err_codes=('UserNotFoundFault',))