Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/elasticache.py: 48%
254 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from datetime import datetime
4import re
6from concurrent.futures import as_completed
7from dateutil.tz import tzutc
8from dateutil.parser import parse
10from c7n.actions import (
11 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction)
12from c7n.filters import FilterRegistry, AgeFilter
13import c7n.filters.vpc as net_filters
14from c7n.filters.kms import KmsRelatedFilter
15from c7n.manager import resources
16from c7n.query import QueryResourceManager, TypeInfo
17from c7n.tags import universal_augment
18from c7n.utils import (
19 local_session, chunks, snapshot_identifier, type_schema, jmespath_search)
# Filter registry attached to the `cache-cluster` resource manager below.
filters = FilterRegistry('elasticache.filters')

# Action registry attached to the `cache-cluster` resource manager below.
actions = ActionRegistry('elasticache.actions')
# Matches cache node types in the `cache.t1` family.  Used by
# `_cluster_eligible_for_snapshot` to exclude those node types from snapshots.
# The dot is escaped (raw string) so it only matches a literal '.', not any
# character.
TTYPE = re.compile(r'cache\.t1')
@resources.register('cache-cluster')
class ElastiCacheCluster(QueryResourceManager):
    """Resource manager for ElastiCache cache clusters (``cache-cluster``)."""

    class resource_type(TypeInfo):
        # Service and enumeration call used to list clusters.
        service = 'elasticache'
        enum_spec = (
            'describe_cache_clusters', 'CacheClusters[]', None)

        # ARN construction.
        arn_type = 'cluster'
        arn_separator = ":"

        # Identity and lookup keys; all keyed on the cluster id.
        id = 'CacheClusterId'
        name = 'CacheClusterId'
        dimension = 'CacheClusterId'
        filter_name = 'CacheClusterId'
        filter_type = 'scalar'

        # Creation timestamp attribute.
        date = 'CacheClusterCreateTime'

        universal_taggable = True
        cfn_type = 'AWS::ElastiCache::CacheCluster'

    # Module-level registries shared with the filters/actions defined below.
    filter_registry = filters
    action_registry = actions

    permissions = ('elasticache:ListTagsForResource',)

    # Tags are attached via the shared universal tagging augment.
    augment = universal_augment
@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Filter cache clusters by their associated security groups."""

    # JMESPath expression yielding the security group ids on a cluster.
    RelatedIdsExpression = "SecurityGroups[].SecurityGroupId"
@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Filters elasticache clusters based on their associated subnet

    :example:

    .. code-block:: yaml

            policies:
              - name: elasticache-in-subnet-x
                resource: cache-cluster
                filters:
                  - type: subnet
                    key: SubnetId
                    value: subnet-12ab34cd
    """

    # Unused: related ids are resolved through the cluster's subnet group
    # in `get_related_ids` rather than by an expression on the cluster.
    RelatedIdsExpression = ""

    def get_related_ids(self, resources):
        """Return the set of subnet ids used by the given clusters.

        Each cluster's ``CacheSubnetGroupName`` is looked up in
        ``self.groups`` (populated by `process`) and the group's subnet
        identifiers are collected.
        """
        subnet_ids = set()
        for resource in resources:
            subnet_group = self.groups[resource['CacheSubnetGroupName']]
            subnet_ids.update(
                subnet['SubnetIdentifier']
                for subnet in subnet_group['Subnets'])
        return subnet_ids

    def process(self, resources, event=None):
        """Index all cache subnet groups by name, then run the base filter."""
        subnet_groups = self.manager.get_resource_manager(
            'cache-subnet-group').resources()
        self.groups = {
            group['CacheSubnetGroupName']: group for group in subnet_groups}
        return super().process(resources, event)
# Expose the shared network-location filter on cache clusters.
filters.register(
    'network-location',
    net_filters.NetworkLocation,
)
@actions.register('delete')
class DeleteElastiCacheCluster(BaseAction):
    """Action to delete an elasticache cluster

    To prevent unwanted deletion of elasticache clusters, it is recommended
    to include a filter

    :example:

    .. code-block:: yaml

            policies:
              - name: elasticache-delete-stale-clusters
                resource: cache-cluster
                filters:
                  - type: value
                    value_type: age
                    key: CacheClusterCreateTime
                    op: ge
                    value: 90
                actions:
                  - type: delete
                    skip-snapshot: false
    """

    schema = type_schema(
        'delete', **{'skip-snapshot': {'type': 'boolean'}})
    permissions = ('elasticache:DeleteCacheCluster',
                   'elasticache:DeleteReplicationGroup')

    def process(self, clusters):
        """Delete the given clusters and any replication groups they fill.

        Clusters belonging to a replication group associated with a global
        datastore are skipped.  Standalone clusters are deleted directly;
        clusters in a replication group cause the whole group to be deleted,
        but only when every member cluster of that group is in ``clusters``.
        A final snapshot is requested unless ``skip-snapshot`` is set (and,
        for standalone clusters, only when the cluster supports snapshots).
        """
        skip = self.data.get('skip-snapshot', False)
        client = local_session(
            self.manager.session_factory).client('elasticache')

        # The set of global-datastore replication groups does not depend on
        # the cluster being inspected, so resolve it once instead of once per
        # cluster.  Nothing is fetched when there is nothing to process.
        global_rpgs = self.fetch_global_ds_rpgs() if clusters else set()

        clusters_to_delete = []
        replication_groups_to_delete = set()
        for cluster in clusters:
            rpg_id = cluster.get('ReplicationGroupId')
            if rpg_id in global_rpgs:
                self.log.info(
                    f"Skipping {cluster['CacheClusterId']}: associated with a global datastore")
                continue
            if rpg_id:
                replication_groups_to_delete.add(rpg_id)
            else:
                clusters_to_delete.append(cluster)

        # The delete call only takes FinalSnapshotIdentifier when a final
        # snapshot is both wanted and supported by the cluster.
        for cluster in clusters_to_delete:
            params = {'CacheClusterId': cluster['CacheClusterId']}
            if _cluster_eligible_for_snapshot(cluster) and not skip:
                params['FinalSnapshotIdentifier'] = snapshot_identifier(
                    'Final', cluster['CacheClusterId'])
                self.log.debug(
                    "Taking final snapshot of %s", cluster['CacheClusterId'])
            else:
                self.log.debug(
                    "Skipping final snapshot of %s", cluster['CacheClusterId'])
            client.delete_cache_cluster(**params)
            self.log.info(
                'Deleted ElastiCache cluster: %s',
                cluster['CacheClusterId'])

        cacheClusterIds = {c["CacheClusterId"] for c in clusters}
        for replication_group in replication_groups_to_delete:
            # NOTE don't delete the group if it's not empty
            rg = client.describe_replication_groups(
                ReplicationGroupId=replication_group)["ReplicationGroups"][0]
            if not all(cluster in cacheClusterIds for cluster in rg["MemberClusters"]):
                # NOTE mark members for better presentation on notifications
                for c in clusters:
                    if c.get("ReplicationGroupId") == replication_group:
                        c["MemberClusters"] = rg["MemberClusters"]
                self.log.info(f'{replication_group} is not empty: {rg["MemberClusters"]}')
                continue

            params = {'ReplicationGroupId': replication_group,
                      'RetainPrimaryCluster': False}
            if not skip:
                params['FinalSnapshotIdentifier'] = snapshot_identifier(
                    'Final', replication_group)
            client.delete_replication_group(**params)

            self.log.info(
                'Deleted ElastiCache replication group: %s',
                replication_group)

    def fetch_global_ds_rpgs(self):
        """Return ids of replication groups associated with a global datastore."""
        rpgs = self.manager.get_resource_manager('elasticache-group').resources(augment=False)
        global_rpgs = get_global_datastore_association(rpgs)
        return global_rpgs
@actions.register('snapshot')
class SnapshotElastiCacheCluster(BaseAction):
    """Action to snapshot an elasticache cluster

    :example:

    .. code-block:: yaml

            policies:
              - name: elasticache-cluster-snapshot
                resource: cache-cluster
                filters:
                  - type: value
                    key: CacheClusterStatus
                    op: not-in
                    value: ["deleted","deleting","creating"]
                actions:
                  - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('elasticache:CreateSnapshot',)

    def process(self, clusters):
        """Snapshot every cluster that supports it and return those clusters.

        Clusters that `_cluster_eligible_for_snapshot` rejects are dropped
        (with an info log).  Snapshot failures are logged, not raised.
        """
        eligible = [
            cluster for cluster in clusters
            if _cluster_eligible_for_snapshot(cluster)]
        if len(clusters) != len(eligible):
            self.log.info(
                "action:snapshot implicitly filtered from %d to %d clusters for snapshot support",
                len(clusters), len(eligible))

        with self.executor_factory(max_workers=2) as w:
            client = local_session(self.manager.session_factory).client('elasticache')
            futures = [
                w.submit(self.process_cluster_snapshot, client, cluster)
                for cluster in eligible]

            for future in as_completed(futures):
                error = future.exception()
                if error:
                    self.log.error(
                        "Exception creating cache cluster snapshot \n %s",
                        error)
        return eligible

    def process_cluster_snapshot(self, client, cluster):
        """Create a 'Backup' snapshot named after the cluster id."""
        cluster_id = cluster['CacheClusterId']
        client.create_snapshot(
            SnapshotName=snapshot_identifier('Backup', cluster_id),
            CacheClusterId=cluster_id)
@actions.register('modify-security-groups')
class ElasticacheClusterModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify security groups on an Elasticache cluster.

    Looks at the individual clusters and modifies the Replication
    Group's configuration for Security groups so all nodes get
    affected equally

    Clusters that are not part of a replication group have their
    security groups modified on the cluster itself.
    """
    permissions = ('elasticache:ModifyReplicationGroup',
                   'elasticache:ModifyCacheCluster')

    def process(self, clusters):
        """Apply the resolved security groups to each cluster.

        Security groups are computed per cluster by the base class's
        ``get_groups``.  Clusters sharing a replication group are collapsed
        into a single ``modify_replication_group`` call (the last cluster
        seen for a group wins); clusters with no ``ReplicationGroupId`` are
        updated directly via ``modify_cache_cluster`` instead of failing
        with a ``KeyError``.
        """
        client = local_session(
            self.manager.session_factory).client('elasticache')
        groups = super().get_groups(clusters)

        # Map of replication group id -> security groups, plus the
        # (cluster id, security groups) pairs for standalone clusters.
        replication_group_map = {}
        standalone_clusters = []
        for cluster, security_groups in zip(clusters, groups):
            replication_group_id = cluster.get('ReplicationGroupId')
            if replication_group_id:
                replication_group_map[replication_group_id] = security_groups
            else:
                standalone_clusters.append(
                    (cluster['CacheClusterId'], security_groups))

        for replication_group_id, security_groups in replication_group_map.items():
            client.modify_replication_group(
                ReplicationGroupId=replication_group_id,
                SecurityGroupIds=security_groups)

        for cluster_id, security_groups in standalone_clusters:
            client.modify_cache_cluster(
                CacheClusterId=cluster_id,
                SecurityGroupIds=security_groups)
@resources.register('cache-subnet-group')
class ElastiCacheSubnetGroup(QueryResourceManager):
    """Resource manager for ElastiCache subnet groups (``cache-subnet-group``)."""

    class resource_type(TypeInfo):
        # Service and enumeration call used to list subnet groups.
        service = 'elasticache'
        enum_spec = (
            'describe_cache_subnet_groups', 'CacheSubnetGroups', None)

        arn_type = 'subnetgroup'

        # Identity and lookup keys; all keyed on the subnet group name.
        id = 'CacheSubnetGroupName'
        name = 'CacheSubnetGroupName'
        filter_name = 'CacheSubnetGroupName'
        filter_type = 'scalar'

        cfn_type = 'AWS::ElastiCache::SubnetGroup'
@resources.register('cache-snapshot')
class ElastiCacheSnapshot(QueryResourceManager):
    """Resource manager for ElastiCache snapshots (``cache-snapshot``)."""

    class resource_type(TypeInfo):
        # Service and enumeration call used to list snapshots.
        service = 'elasticache'
        enum_spec = ('describe_snapshots', 'Snapshots', None)

        # ARN construction.
        arn_type = 'snapshot'
        arn_separator = ":"

        # Identity and lookup keys; all keyed on the snapshot name.
        id = 'SnapshotName'
        name = 'SnapshotName'
        filter_name = 'SnapshotName'
        filter_type = 'scalar'

        date = 'StartTime'
        universal_taggable = True

    permissions = ('elasticache:ListTagsForResource',)

    def augment(self, resources):
        """Attach tags to the snapshots via the shared universal augment."""
        return universal_augment(self, resources)
@ElastiCacheSnapshot.filter_registry.register('age')
class ElastiCacheSnapshotAge(AgeFilter):
    """Filters elasticache snapshots based on their age (in days)

    :example:

    .. code-block:: yaml

            policies:
              - name: elasticache-stale-snapshots
                resource: cache-snapshot
                filters:
                  - type: age
                    days: 30
                    op: ge
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    # Placeholder only: the date comes from `get_resource_date` below.
    date_attribute = 'dummy'

    def get_resource_date(self, snapshot):
        """Return the earliest node snapshot creation time.

        Overrides the superclass method because a snapshot has no single
        date attribute; each entry in ``NodeSnapshots`` carries its own
        ``SnapshotCreateTime``.  Non-datetime values are parsed, and naive
        datetimes are given a UTC timezone.
        """
        created_times = []
        for node_snapshot in snapshot['NodeSnapshots']:
            created = node_snapshot['SnapshotCreateTime']
            if not isinstance(created, datetime):
                created = parse(created)
            if not created.tzinfo:
                created = created.replace(tzinfo=tzutc())
            created_times.append(created)

        # Return the earliest of the node snapshot creation times.
        return min(created_times)
@ElastiCacheSnapshot.action_registry.register('delete')
class DeleteElastiCacheSnapshot(BaseAction):
    """Action to delete elasticache snapshots

    To prevent unwanted deletion of elasticache snapshots, it is recommended to
    apply a filter

    :example:

    .. code-block:: yaml

            policies:
              - name: delete-elasticache-stale-snapshots
                resource: cache-snapshot
                filters:
                  - type: age
                    days: 30
                    op: ge
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('elasticache:DeleteSnapshot',)

    def process(self, snapshots):
        """Delete the snapshots in batches of 50 and return the input list.

        Batches are taken from the reversed snapshot list and submitted to
        a small worker pool; a failing batch is logged, not raised.
        """
        self.log.info("Deleting %d ElastiCache snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            futures = []
            client = local_session(self.manager.session_factory).client('elasticache')
            for batch in chunks(reversed(snapshots), size=50):
                futures.append(
                    w.submit(self.process_snapshot_set, client, batch))
            for future in as_completed(futures):
                error = future.exception()
                if error:
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        error)
        return snapshots

    def process_snapshot_set(self, client, snapshots_set):
        """Delete each snapshot in the batch by name."""
        for snapshot in snapshots_set:
            client.delete_snapshot(SnapshotName=snapshot['SnapshotName'])
@ElastiCacheSnapshot.action_registry.register('copy-cluster-tags')
class CopyClusterTags(BaseAction):
    """
    Copy specified tags from Elasticache cluster to Snapshot

    :example:

    .. code-block:: yaml

            - name: elasticache-test
              resource: cache-snapshot
              filters:
                 - type: value
                   key: SnapshotName
                   op: in
                   value:
                    - test-tags-backup
              actions:
                - type: copy-cluster-tags
                  tags:
                    - tag1
                    - tag2
    """

    schema = type_schema(
        'copy-cluster-tags',
        tags={'type': 'array', 'items': {'type': 'string'}, 'minItems': 1},
        required=('tags',))

    def get_permissions(self):
        """Return the cache-cluster manager's permissions plus tag-write."""
        perms = self.manager.get_resource_manager('cache-cluster').get_permissions()
        perms.append('elasticache:AddTagsToResource')
        return perms

    def process(self, snapshots):
        """Copy the configured tags from source clusters onto each snapshot.

        Tags whose key starts with ``aws:`` and tags already present on the
        snapshot with the same value are not copied.  A snapshot is skipped
        (with an error log) when the combined tag count would exceed 50.
        """
        client = local_session(self.manager.session_factory).client('elasticache')
        cluster_manager = self.manager.get_resource_manager('cache-cluster')
        clusters = {c['CacheClusterId']: c for c in cluster_manager.resources()}
        copyable_tags = self.data.get('tags')

        for snapshot in snapshots:
            # For replicated/sharded clusters it is possible for each
            # shard to have separate tags, we go ahead and tag the
            # snap with the union of tags with overlaps getting the
            # last value (arbitrary if mismatched).
            if 'CacheClusterId' in snapshot:
                cluster_ids = [snapshot['CacheClusterId']]
            else:
                cluster_ids = [
                    ns['CacheClusterId'] for ns in snapshot['NodeSnapshots']]

            copy_tags = {}
            for cluster_id in sorted(cluster_ids):
                if cluster_id not in clusters:
                    continue

                cluster_tags = {
                    t['Key']: t['Value'] for t in clusters[cluster_id]['Tags']}
                snap_tags = {
                    t['Key']: t['Value'] for t in snapshot.get('Tags', ())}

                for key, value in cluster_tags.items():
                    selected = not copyable_tags or key in copyable_tags
                    if not selected or key.startswith('aws:'):
                        continue
                    if snap_tags.get(key, '') != value:
                        copy_tags[key] = value

            if not copy_tags:
                continue

            if len(set(copy_tags) | set(snap_tags)) > 50:
                self.log.error(
                    "Cant copy tags, max tag limit hit on snapshot:%s",
                    snapshot['SnapshotName'])
                continue

            arn = self.manager.generate_arn(snapshot['SnapshotName'])
            tag_list = [{'Key': k, 'Value': v} for k, v in copy_tags.items()]
            self.manager.retry(
                client.add_tags_to_resource,
                ResourceName=arn,
                Tags=tag_list)
def _cluster_eligible_for_snapshot(cluster):
    """Return True when the cluster can be snapshotted.

    Memcached clusters are rejected, as are clusters whose
    ``CacheNodeType`` matches the module-level ``TTYPE`` pattern.
    """
    if cluster['Engine'] == 'memcached':
        return False
    # Regex match filters out the unsupported cache node types.
    return TTYPE.match(cluster['CacheNodeType']) is None
@resources.register('elasticache-group')
class ElastiCacheReplicationGroup(QueryResourceManager):
    """Resource manager for ElastiCache replication groups (``elasticache-group``)."""

    class resource_type(TypeInfo):
        # Service and enumeration call used to list replication groups.
        service = "elasticache"
        enum_spec = (
            'describe_replication_groups', 'ReplicationGroups[]', None)

        # ARN construction.
        arn_type = 'replicationgroup'
        arn_separator = ":"

        # Identity keys; all keyed on the replication group id.
        id = 'ReplicationGroupId'
        name = 'ReplicationGroupId'
        dimension = 'ReplicationGroupId'

        cfn_type = 'AWS::ElastiCache::ReplicationGroup'

        # NOTE(review): a bare object() rather than True; presumably a truthy
        # sentinel interpreted by the tagging machinery -- confirm in c7n.tags.
        universal_taggable = object()

    augment = universal_augment
    permissions = ('elasticache:DescribeReplicationGroups',)
@ElastiCacheReplicationGroup.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    """Filter replication groups by their associated KMS key."""

    # Attribute on the replication group holding the KMS key id.
    RelatedIdsExpression = 'KmsKeyId'
def get_global_datastore_association(resources):
    """Return the ids of replication groups tied to a global datastore.

    A replication group counts as associated when its
    ``GlobalReplicationGroupInfo.GlobalReplicationGroupId`` is truthy.
    """
    return {
        group['ReplicationGroupId']
        for group in resources
        if jmespath_search(
            'GlobalReplicationGroupInfo.GlobalReplicationGroupId', group)
    }
@ElastiCacheReplicationGroup.action_registry.register('delete')
class DeleteReplicationGroup(BaseAction):
    """Action to delete a cache replication group

    :example:

    .. code-block:: yaml

            policies:
              - name: elasticache-delete-replication-group
                resource: aws.elasticache-group
                filters:
                  - type: value
                    key: AtRestEncryptionEnabled
                    value: False
                actions:
                  - type: delete
                    snapshot: False

    """
    schema = type_schema(
        'delete', **{'snapshot': {'type': 'boolean'}})

    # Only groups whose Status is one of these are deleted.
    valid_origin_states = ('available',)
    permissions = ('elasticache:DeleteReplicationGroup',)

    def process(self, resources):
        """Delete each eligible replication group.

        Groups not in a valid origin state are filtered out first, and
        groups associated with a global datastore are skipped with an info
        log.  When ``snapshot`` is set, a final snapshot named
        ``<ReplicationGroupId>-snapshot`` is requested.  A
        ``ReplicationGroupNotFoundFault`` error code is passed to the retry
        helper as one to ignore.
        """
        resources = self.filter_resources(resources, 'Status', self.valid_origin_states)
        client = local_session(self.manager.session_factory).client('elasticache')
        global_datastore_association_map = get_global_datastore_association(resources)
        take_snapshot = self.data.get('snapshot', False)

        for group in resources:
            group_id = group['ReplicationGroupId']
            if group_id in global_datastore_association_map:
                self.log.info(
                    f"Skipping {group['ReplicationGroupId']}: associated with a global datastore")
                continue

            params = {'ReplicationGroupId': group_id}
            if take_snapshot:
                params['FinalSnapshotIdentifier'] = group_id + '-snapshot'
            self.manager.retry(
                client.delete_replication_group,
                ignore_err_codes=('ReplicationGroupNotFoundFault',),
                **params)