1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from datetime import datetime
4import re
5
6from concurrent.futures import as_completed
7from dateutil.tz import tzutc
8from dateutil.parser import parse
9
10from c7n.actions import (
11 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction)
12from c7n.filters import FilterRegistry, AgeFilter
13import c7n.filters.vpc as net_filters
14from c7n.filters.kms import KmsRelatedFilter
15from c7n.manager import resources
16from c7n.query import QueryResourceManager, TypeInfo
17from c7n.tags import universal_augment
18from c7n.utils import (
19 local_session, chunks, snapshot_identifier, type_schema, jmespath_search)
20
# Module-level registries holding the filters and actions that the
# cache-cluster resource exposes (assigned to ElastiCacheCluster's
# filter_registry / action_registry).
filters = FilterRegistry('elasticache.filters')
actions = ActionRegistry('elasticache.actions')
23
# Matches node types in the ``cache.t1`` family, which are treated as not
# eligible for snapshots. Raw string with an escaped dot so that only a
# literal "cache.t1" prefix matches (an unescaped "." matches any character).
TTYPE = re.compile(r'cache\.t1')
25
26
@resources.register('cache-cluster')
class ElastiCacheCluster(QueryResourceManager):
    """Resource manager for ElastiCache cache clusters (``cache-cluster``).

    Clusters are enumerated through ``describe_cache_clusters`` and the
    ``augment`` step is delegated to ``universal_augment``.
    """

    class resource_type(TypeInfo):
        # --- API binding ---
        service = 'elasticache'
        enum_spec = ('describe_cache_clusters', 'CacheClusters[]', None)
        filter_name = 'CacheClusterId'
        filter_type = 'scalar'
        permissions_augment = ("elasticache:ListTagsForResource",)

        # --- identity / ARN construction ---
        id = 'CacheClusterId'
        name = 'CacheClusterId'
        arn_type = 'cluster'
        arn_separator = ":"
        cfn_type = 'AWS::ElastiCache::CacheCluster'

        # --- metadata ---
        date = 'CacheClusterCreateTime'
        dimension = 'CacheClusterId'
        universal_taggable = True

    # Filters and actions come from the module-level registries.
    filter_registry = filters
    action_registry = actions

    permissions = ('elasticache:ListTagsForResource',)
    augment = universal_augment
49
50
@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Security-group filter for cache clusters.

    Related security group ids are read from each cluster with the
    expression below.
    """

    RelatedIdsExpression = 'SecurityGroups[].SecurityGroupId'
55
56
@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Filters elasticache clusters based on their associated subnet

    Subnets are resolved indirectly: each cluster names a cache subnet
    group, and that group lists the subnets.

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-in-subnet-x
            resource: cache-cluster
            filters:
              - type: subnet
                key: SubnetId
                value: subnet-12ab34cd
    """

    # Unused here: related ids are computed in get_related_ids instead.
    RelatedIdsExpression = ""

    def get_related_ids(self, resources):
        """Return the set of subnet ids behind the given clusters.

        Relies on ``self.groups`` (subnet group name -> subnet group),
        which ``process`` populates before delegating to the parent.
        """
        subnet_ids = set()
        for cluster in resources:
            subnet_group = self.groups[cluster['CacheSubnetGroupName']]
            for subnet in subnet_group['Subnets']:
                subnet_ids.add(subnet['SubnetIdentifier'])
        return subnet_ids

    def process(self, resources, event=None):
        """Index all cache subnet groups by name, then run the parent filter."""
        subnet_groups = self.manager.get_resource_manager(
            'cache-subnet-group').resources()
        self.groups = {
            group['CacheSubnetGroupName']: group for group in subnet_groups}
        return super(SubnetFilter, self).process(resources, event)
90
91
# Expose the shared NetworkLocation filter on cache clusters under the
# 'network-location' filter type.
filters.register('network-location', net_filters.NetworkLocation)
93
94
@actions.register('delete')
class DeleteElastiCacheCluster(BaseAction):
    """Action to delete an elasticache cluster

    To prevent unwanted deletion of elasticache clusters, it is recommended
    to include a filter

    Clusters that belong to a replication group are removed by deleting the
    replication group, and only when every member cluster of that group is
    part of the matched resources. Clusters whose replication group is
    associated with a global datastore are skipped.

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-delete-stale-clusters
            resource: cache-cluster
            filters:
              - type: value
                value_type: age
                key: CacheClusterCreateTime
                op: ge
                value: 90
            actions:
              - type: delete
                skip-snapshot: false
    """

    schema = type_schema(
        'delete', **{'skip-snapshot': {'type': 'boolean'}})
    permissions = ('elasticache:DeleteCacheCluster',
                   'elasticache:DeleteReplicationGroup')

    def process(self, clusters):
        """Delete the given clusters and any fully-covered replication groups.

        :param clusters: list of cache cluster resource dicts.
        """
        skip = self.data.get('skip-snapshot', False)
        client = local_session(
            self.manager.session_factory).client('elasticache')

        # Resolve global-datastore membership once rather than once per
        # cluster: each call enumerates every replication group. Nothing is
        # deleted while clusters are being classified, so the answer cannot
        # change between iterations. Skipped entirely when there is no work.
        global_rpgs = self.fetch_global_ds_rpgs() if clusters else set()

        clusters_to_delete = []
        replication_groups_to_delete = set()
        for cluster in clusters:
            rpg_id = cluster.get('ReplicationGroupId')
            if rpg_id in global_rpgs:
                self.log.info(
                    f"Skipping {cluster['CacheClusterId']}: associated with a global datastore")
                continue
            if rpg_id:
                replication_groups_to_delete.add(rpg_id)
            else:
                clusters_to_delete.append(cluster)

        # Standalone clusters: a final snapshot is only requested when the
        # cluster supports snapshots and the policy did not opt out.
        for cluster in clusters_to_delete:
            params = {'CacheClusterId': cluster['CacheClusterId']}
            if _cluster_eligible_for_snapshot(cluster) and not skip:
                params['FinalSnapshotIdentifier'] = snapshot_identifier(
                    'Final', cluster['CacheClusterId'])
                self.log.debug(
                    "Taking final snapshot of %s", cluster['CacheClusterId'])
            else:
                self.log.debug(
                    "Skipping final snapshot of %s", cluster['CacheClusterId'])
            client.delete_cache_cluster(**params)
            self.log.info(
                'Deleted ElastiCache cluster: %s',
                cluster['CacheClusterId'])

        cache_cluster_ids = {c["CacheClusterId"] for c in clusters}
        for replication_group in replication_groups_to_delete:
            # NOTE don't delete the group if it's not empty
            rg = client.describe_replication_groups(
                ReplicationGroupId=replication_group)["ReplicationGroups"][0]
            if not all(member in cache_cluster_ids for member in rg["MemberClusters"]):
                # NOTE mark members for better presentation on notifications
                for c in clusters:
                    if c.get("ReplicationGroupId") == replication_group:
                        c["MemberClusters"] = rg["MemberClusters"]
                self.log.info(f'{replication_group} is not empty: {rg["MemberClusters"]}')
                continue

            params = {'ReplicationGroupId': replication_group,
                      'RetainPrimaryCluster': False}
            if not skip:
                params['FinalSnapshotIdentifier'] = snapshot_identifier(
                    'Final', replication_group)
            client.delete_replication_group(**params)

            self.log.info(
                'Deleted ElastiCache replication group: %s',
                replication_group)

    def fetch_global_ds_rpgs(self):
        """Return the ids of replication groups tied to a global datastore."""
        rpgs = self.manager.get_resource_manager('elasticache-group').resources(augment=False)
        global_rpgs = get_global_datastore_association(rpgs)
        return global_rpgs
185
186
@actions.register('snapshot')
class SnapshotElastiCacheCluster(BaseAction):
    """Action to snapshot an elasticache cluster

    Clusters that do not support snapshots are dropped before any snapshot
    is requested.

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-cluster-snapshot
            resource: cache-cluster
            filters:
              - type: value
                key: CacheClusterStatus
                op: not-in
                value: ["deleted","deleting","creating"]
            actions:
              - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('elasticache:CreateSnapshot',)

    def process(self, clusters):
        """Snapshot every eligible cluster and return the eligible subset."""
        requested = len(clusters)
        clusters = list(filter(_cluster_eligible_for_snapshot, clusters))
        if len(clusters) != requested:
            self.log.info(
                "action:snapshot implicitly filtered from %d to %d clusters for snapshot support",
                requested, len(clusters))

        with self.executor_factory(max_workers=2) as w:
            client = local_session(self.manager.session_factory).client('elasticache')
            futures = [
                w.submit(self.process_cluster_snapshot, client, cluster)
                for cluster in clusters]

            # Failures are logged, not raised, so one bad cluster does not
            # hide the outcome of the others.
            for done in as_completed(futures):
                error = done.exception()
                if error:
                    self.log.error(
                        "Exception creating cache cluster snapshot \n %s",
                        error)
        return clusters

    def process_cluster_snapshot(self, client, cluster):
        """Request a 'Backup' snapshot of a single cluster."""
        cluster_id = cluster['CacheClusterId']
        client.create_snapshot(
            SnapshotName=snapshot_identifier('Backup', cluster_id),
            CacheClusterId=cluster_id)
238
239
@actions.register('modify-security-groups')
class ElasticacheClusterModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify security groups on an Elasticache cluster.

    Looks at the individual clusters and modifies the Replication
    Group's configuration for Security groups so all nodes get
    affected equally

    """
    permissions = ('elasticache:ModifyReplicationGroup',)

    def process(self, clusters):
        """Apply the resolved security groups to each replication group.

        Every cluster is expected to carry a ``ReplicationGroupId``. When
        several clusters share a replication group, the groups resolved for
        the last such cluster are the ones applied.
        """
        client = local_session(
            self.manager.session_factory).client('elasticache')
        groups = super(
            ElasticacheClusterModifyVpcSecurityGroups, self).get_groups(
                clusters)

        # Replication group id -> security group ids; groups[i] belongs to
        # clusters[i].
        security_groups_by_rpg = {
            cluster['ReplicationGroupId']: groups[position]
            for position, cluster in enumerate(clusters)}

        for rpg_id, security_group_ids in security_groups_by_rpg.items():
            client.modify_replication_group(
                ReplicationGroupId=rpg_id,
                SecurityGroupIds=security_group_ids)
266
267
@resources.register('cache-subnet-group')
class ElastiCacheSubnetGroup(QueryResourceManager):
    """Resource manager for ElastiCache subnet groups (``cache-subnet-group``)."""

    class resource_type(TypeInfo):
        # --- API binding ---
        service = 'elasticache'
        enum_spec = (
            'describe_cache_subnet_groups', 'CacheSubnetGroups', None)
        filter_name = 'CacheSubnetGroupName'
        filter_type = 'scalar'

        # --- identity / ARN construction ---
        id = 'CacheSubnetGroupName'
        name = 'CacheSubnetGroupName'
        arn_type = 'subnetgroup'
        cfn_type = 'AWS::ElastiCache::SubnetGroup'
280
281
@resources.register('cache-snapshot')
class ElastiCacheSnapshot(QueryResourceManager):
    """Resource manager for ElastiCache snapshots (``cache-snapshot``)."""

    class resource_type(TypeInfo):
        # --- API binding ---
        service = 'elasticache'
        enum_spec = ('describe_snapshots', 'Snapshots', None)
        filter_name = 'SnapshotName'
        filter_type = 'scalar'

        # --- identity / ARN construction ---
        id = 'SnapshotName'
        name = 'SnapshotName'
        arn_type = 'snapshot'
        arn_separator = ":"

        # --- metadata ---
        date = 'StartTime'
        universal_taggable = True

    permissions = ('elasticache:ListTagsForResource',)

    def augment(self, resources):
        """Delegate resource augmentation to ``universal_augment``."""
        return universal_augment(self, resources)
300
301
@ElastiCacheSnapshot.filter_registry.register('age')
class ElastiCacheSnapshotAge(AgeFilter):
    """Filters elasticache snapshots based on their age (in days)

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-stale-snapshots
            resource: cache-snapshot
            filters:
              - type: age
                days: 30
                op: ge
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    # Placeholder: the date is derived in get_resource_date below rather
    # than read from a single attribute.
    date_attribute = 'dummy'

    def get_resource_date(self, snapshot):
        """Return the earliest creation time among the node snapshots.

        Overrides the superclass method because a snapshot has no single
        date attribute. Values that are not already ``datetime`` objects are
        parsed, and naive datetimes are treated as UTC.
        """
        created_times = []
        for node_snapshot in snapshot['NodeSnapshots']:
            created = node_snapshot['SnapshotCreateTime']
            if not isinstance(created, datetime):
                created = parse(created)
            if not created.tzinfo:
                created = created.replace(tzinfo=tzutc())
            created_times.append(created)

        # The snapshot's date is the earliest node snapshot creation time.
        return min(created_times)
338
339
@ElastiCacheSnapshot.action_registry.register('delete')
class DeleteElastiCacheSnapshot(BaseAction):
    """Action to delete elasticache snapshots

    To prevent unwanted deletion of elasticache snapshots, it is recommended to
    apply a filter

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-elasticache-stale-snapshots
            resource: cache-snapshot
            filters:
              - type: age
                days: 30
                op: ge
            actions:
              - delete
    """

    schema = type_schema('delete')
    permissions = ('elasticache:DeleteSnapshot',)

    def process(self, snapshots):
        """Delete the snapshots in batches of 50 and return the input list."""
        self.log.info("Deleting %d ElastiCache snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            client = local_session(self.manager.session_factory).client('elasticache')
            # Batches are taken from the reversed list.
            futures = [
                w.submit(self.process_snapshot_set, client, batch)
                for batch in chunks(reversed(snapshots), size=50)]

            # A failed batch is logged, not raised.
            for done in as_completed(futures):
                error = done.exception()
                if error:
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        error)
        return snapshots

    def process_snapshot_set(self, client, snapshots_set):
        """Delete each snapshot of one batch, in order."""
        for snapshot in snapshots_set:
            client.delete_snapshot(SnapshotName=snapshot['SnapshotName'])
383
384
@ElastiCacheSnapshot.action_registry.register('copy-cluster-tags')
class CopyClusterTags(BaseAction):
    """
    Copy specified tags from Elasticache cluster to Snapshot

    Only the tag keys listed under ``tags`` are copied; keys starting with
    ``aws:`` and tags the snapshot already carries with the same value are
    left out. A snapshot is skipped when the copy would push it past 50
    distinct tag keys.

    :example:

    .. code-block:: yaml

        - name: elasticache-test
          resource: cache-snapshot
          filters:
            - type: value
              key: SnapshotName
              op: in
              value:
                - test-tags-backup
          actions:
            - type: copy-cluster-tags
              tags:
                - tag1
                - tag2
    """

    schema = type_schema(
        'copy-cluster-tags',
        tags={'type': 'array', 'items': {'type': 'string'}, 'minItems': 1},
        required=('tags',))

    def get_permissions(self):
        """Return the cache-cluster permissions plus the tagging permission."""
        # Copy before appending so that a list owned by the cache-cluster
        # manager is never mutated (and a tuple return value still works).
        perms = list(
            self.manager.get_resource_manager('cache-cluster').get_permissions())
        perms.append('elasticache:AddTagsToResource')
        return perms

    def process(self, snapshots):
        """Copy the configured tags from source clusters onto each snapshot.

        :param snapshots: list of cache snapshot resource dicts.
        """
        client = local_session(self.manager.session_factory).client('elasticache')
        clusters = {r['CacheClusterId']: r for r in
                    self.manager.get_resource_manager('cache-cluster').resources()}
        # Set membership: the keys are tested once per cluster tag.
        copyable_tags = set(self.data.get('tags') or ())

        for s in snapshots:
            # For replicated/sharded clusters it is possible for each
            # shard to have separate tags, we go ahead and tag the
            # snap with the union of tags with overlaps getting the
            # last value (arbitrary if mismatched).
            if 'CacheClusterId' not in s:
                cluster_ids = [ns['CacheClusterId'] for ns in s['NodeSnapshots']]
            else:
                cluster_ids = [s['CacheClusterId']]

            # The snapshot's own tags do not depend on the cluster being
            # examined, so build them once per snapshot; this also keeps the
            # name bound for the tag-limit check after the loop.
            snap_tags = {t['Key']: t['Value'] for t in s.get('Tags', ())}

            copy_tags = {}
            for cid in sorted(cluster_ids):
                if cid not in clusters:
                    continue

                # A cluster without a Tags key simply has nothing to copy.
                cluster_tags = {
                    t['Key']: t['Value'] for t in clusters[cid].get('Tags', ())}

                for k, v in cluster_tags.items():
                    if copyable_tags and k not in copyable_tags:
                        continue
                    if k.startswith('aws:'):
                        continue
                    if snap_tags.get(k, '') == v:
                        continue
                    copy_tags[k] = v

            if not copy_tags:
                continue

            if len(set(copy_tags).union(snap_tags)) > 50:
                self.log.error(
                    "Cant copy tags, max tag limit hit on snapshot:%s",
                    s['SnapshotName'])
                continue

            arn = self.manager.generate_arn(s['SnapshotName'])
            self.manager.retry(
                client.add_tags_to_resource,
                ResourceName=arn,
                Tags=[{'Key': k, 'Value': v} for k, v in copy_tags.items()])
465
466
def _cluster_eligible_for_snapshot(cluster):
    """Return True when the cluster can be snapshotted.

    memcached clusters are never eligible; for other engines the cluster is
    eligible unless its node type matches the ``TTYPE`` pattern.
    """
    if cluster['Engine'] == 'memcached':
        return False
    # Regex check filters out the unsupported cache node types.
    return not TTYPE.match(cluster['CacheNodeType'])
473
474
@resources.register('elasticache-group')
class ElastiCacheReplicationGroup(QueryResourceManager):
    """Resource manager for ElastiCache replication groups (``elasticache-group``)."""

    class resource_type(TypeInfo):
        # --- API binding ---
        service = "elasticache"
        enum_spec = (
            'describe_replication_groups', 'ReplicationGroups[]', None)

        # --- identity / ARN construction ---
        id = 'ReplicationGroupId'
        name = 'ReplicationGroupId'
        dimension = 'ReplicationGroupId'
        arn_type = 'replicationgroup'
        arn_separator = ":"
        cfn_type = 'AWS::ElastiCache::ReplicationGroup'

        # --- metadata ---
        universal_taggable = object()

    permissions = ('elasticache:DescribeReplicationGroups',)
    augment = universal_augment
490
491
@ElastiCacheReplicationGroup.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    """KMS key filter for replication groups.

    The related key is read from each replication group's ``KmsKeyId``.
    """

    RelatedIdsExpression = "KmsKeyId"
496
497
def get_global_datastore_association(resources):
    """Return the ids of replication groups tied to a global datastore.

    A replication group counts as associated when its
    ``GlobalReplicationGroupInfo.GlobalReplicationGroupId`` is truthy.

    :param resources: iterable of replication group resource dicts.
    :returns: set of ``ReplicationGroupId`` values.
    """
    return {
        group['ReplicationGroupId']
        for group in resources
        if jmespath_search(
            'GlobalReplicationGroupInfo.GlobalReplicationGroupId', group)
    }
506
507
@ElastiCacheReplicationGroup.action_registry.register('delete')
class DeleteReplicationGroup(BaseAction):
    """Action to delete a cache replication group

    Only groups whose ``Status`` is in ``valid_origin_states`` are
    considered, and groups associated with a global datastore are skipped.

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-delete-replication-group
            resource: aws.elasticache-group
            filters:
              - type: value
                key: AtRestEncryptionEnabled
                value: False
            actions:
              - type: delete
                snapshot: False

    """
    schema = type_schema(
        'delete', **{'snapshot': {'type': 'boolean'}})

    valid_origin_states = ('available',)
    permissions = ('elasticache:DeleteReplicationGroup',)

    def process(self, resources):
        """Delete each eligible replication group, optionally with a final snapshot."""
        resources = self.filter_resources(resources, 'Status', self.valid_origin_states)
        client = local_session(self.manager.session_factory).client('elasticache')
        in_global_datastore = get_global_datastore_association(resources)
        take_snapshot = self.data.get('snapshot', False)

        for r in resources:
            group_id = r['ReplicationGroupId']
            if group_id in in_global_datastore:
                self.log.info(
                    f"Skipping {r['ReplicationGroupId']}: associated with a global datastore")
                continue

            params = {'ReplicationGroupId': group_id}
            if take_snapshot:
                params['FinalSnapshotIdentifier'] = group_id + '-snapshot'

            # A group that is already gone is not treated as an error.
            self.manager.retry(
                client.delete_replication_group,
                ignore_err_codes=('ReplicationGroupNotFoundFault',),
                **params)