1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from datetime import datetime
4import re
5
6from concurrent.futures import as_completed
7from dateutil.tz import tzutc
8from dateutil.parser import parse
9
10from c7n.actions import (
11 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction)
12from c7n.filters import FilterRegistry, AgeFilter, Filter
13from c7n.filters.core import ComparableVersion
14import c7n.filters.vpc as net_filters
15from c7n.filters.kms import KmsRelatedFilter
16from c7n.manager import resources
17from c7n.query import QueryResourceManager, TypeInfo, DescribeSource, ConfigSource
18from c7n.tags import universal_augment
19from c7n.utils import (
20 local_session, chunks, snapshot_identifier, type_schema, jmespath_search,
21 get_retry, QueryParser)
22
23from .aws import shape_validate
24
25filters = FilterRegistry('elasticache.filters')
26actions = ActionRegistry('elasticache.actions')
27
28TTYPE = re.compile('cache.t1')
29
30
31class ElastiCacheQueryParser(QueryParser):
32
33 QuerySchema = {
34 'ShowCacheNodeInfo': bool,
35 'ShowCacheClustersNotInReplicationGroups': bool,
36 }
37 type_name = "ElastiCache"
38 multi_value = False
39 value_key = 'Value'
40
41
42class DescribeElastiCache(DescribeSource):
43 """
44 Allows to use query to retrieve more information
45
46 :example
47
48 .. code-block:: yaml
49
50 policies:
51 - name: cache-node-with-default-port
52 resource: aws.cache-cluster
53 query:
54 - ShowCacheNodeInfo: true
55 filters:
56 - type: list-item
57 key: CacheNodes
58 attrs:
59 - type: value
60 key: Endpoint.Port
61 value: 11211
62 """
63
64 def get_query_params(self, query_params):
65 query_params = query_params or {}
66 queries = ElastiCacheQueryParser.parse(self.manager.data.get('query', []))
67 for q in queries:
68 query_params.update(q)
69 return query_params
70
71
72@resources.register('cache-cluster')
73class ElastiCacheCluster(QueryResourceManager):
74
75 class resource_type(TypeInfo):
76 service = 'elasticache'
77 arn_type = 'cluster'
78 arn_separator = ":"
79 enum_spec = ('describe_cache_clusters',
80 'CacheClusters[]', None)
81 name = id = 'CacheClusterId'
82 filter_name = 'CacheClusterId'
83 filter_type = 'scalar'
84 date = 'CacheClusterCreateTime'
85 dimension = 'CacheClusterId'
86 universal_taggable = True
87 cfn_type = 'AWS::ElastiCache::CacheCluster'
88 permissions_augment = ("elasticache:ListTagsForResource",)
89
90 filter_registry = filters
91 action_registry = actions
92 permissions = ('elasticache:ListTagsForResource',)
93 augment = universal_augment
94
95 source_mapping = {
96 'describe': DescribeElastiCache,
97 'config': ConfigSource
98 }
99
100
101@filters.register('security-group')
102class SecurityGroupFilter(net_filters.SecurityGroupFilter):
103
104 RelatedIdsExpression = "SecurityGroups[].SecurityGroupId"
105
106
107@filters.register('subnet')
108class SubnetFilter(net_filters.SubnetFilter):
109 """Filters elasticache clusters based on their associated subnet
110
111 :example:
112
113 .. code-block:: yaml
114
115 policies:
116 - name: elasticache-in-subnet-x
117 resource: cache-cluster
118 filters:
119 - type: subnet
120 key: SubnetId
121 value: subnet-12ab34cd
122 """
123
124 RelatedIdsExpression = ""
125
126 def get_related_ids(self, resources):
127 group_ids = set()
128 for r in resources:
129 group_ids.update(
130 [s['SubnetIdentifier'] for s in
131 self.groups[r['CacheSubnetGroupName']]['Subnets']])
132 return group_ids
133
134 def process(self, resources, event=None):
135 self.groups = {
136 r['CacheSubnetGroupName']: r for r in
137 self.manager.get_resource_manager(
138 'cache-subnet-group').resources()}
139 return super(SubnetFilter, self).process(resources, event)
140
141
142@filters.register('vpc')
143class VpcFilter(net_filters.VpcFilter):
144 """Filters elasticache clusters based on their associated VPCs
145
146 :example:
147
148 .. code-block:: yaml
149
150 policies:
151 - name: cache-node-with-default-vpc
152 resource: aws.cache-cluster
153 filters:
154 - type: vpc
155 key: IsDefault
156 value: false
157 """
158
159 RelatedIdsExpression = ""
160
161 def get_related_ids(self, resources):
162 return {
163 self.groups[res['CacheSubnetGroupName']]['VpcId'] for res in resources
164 }
165
166 def process(self, resources, event=None):
167 self.groups = {
168 r['CacheSubnetGroupName']: r
169 for r in self.manager.get_resource_manager('cache-subnet-group').resources()
170 }
171 return super().process(resources, event)
172
173
174filters.register('network-location', net_filters.NetworkLocation)
175
176
177@filters.register('upgrade-available')
178class UpgradeAvailable(Filter):
179 """Scans for ElastiCache clusters available upgrade versions
180
181 This will check all the ElastiCache clusters on the resources, and return
182 a list of viable upgrade options.
183
184 :example:
185
186 .. code-block:: yaml
187
188 policies:
189 - name: elasticache-upgrade-available
190 resource: cache-cluster
191 filters:
192 - type: upgrade-available
193 major: False
194
195 """
196
197 schema = type_schema(
198 'upgrade-available',
199 major={'type': 'boolean'},
200 value={'type': 'boolean'},
201 )
202 permissions = ('elasticache:DescribeServiceUpdates',)
203
204 def process(self, resources, event=None):
205 client = local_session(self.manager.session_factory).client('elasticache')
206 # If `major` is `True`, we should include major version upgrades if found.
207 check_major = self.data.get('major', False)
208 # If `value` is `True`, we should include even matching upgrade versions.
209 check_upgrade_extant = self.data.get('value', True)
210 results = []
211
212 # Get all available service updates
213 paginator = client.get_paginator('describe_service_updates')
214 page_iterator = paginator.paginate(
215 ServiceUpdateStatus=['available']
216 )
217
218 # Build a mapping of available upgrades:
219 # {
220 # engine: {
221 # full_version: ComparableVersion,
222 # }
223 # }
224 available_upgrades = {}
225
226 for page in page_iterator:
227 for service_update in page['ServiceUpdates']:
228 engine_name = service_update.get('Engine', '').lower()
229 engine_version = service_update.get('EngineVersion', '')
230 _, version = _parse_engine_version(engine_version)
231
232 if engine_name and version:
233 available_upgrades.setdefault(engine_name, {})
234 # Process the version information once, rather than N-times below.
235 cversion = ComparableVersion(version)
236 available_upgrades[engine_name].setdefault(version, cversion)
237
238 for resource in resources:
239 resource_engine = resource.get('Engine', '').lower()
240 resource_version = resource.get('EngineVersion', '')
241 resource_version_info = ComparableVersion(resource_version)
242
243 # Check if any available upgrades match this resource's engine
244 for engine_name, engine_versions in available_upgrades.items():
245 if engine_name != resource_engine:
246 continue
247
248 # Check the major version.
249 for full_version, upgrade_version in engine_versions.items():
250 if upgrade_version < resource_version_info:
251 # The resource is newer that the upgrade. Skip.
252 continue
253
254 # The upgrade either matches or is higher. Check the filter
255 # settings if we should include it.
256 if upgrade_version == resource_version_info:
257 if not check_upgrade_extant:
258 continue
259
260 results.append(resource)
261
262 # The parsed upgrade version is greater than resource version.
263 # Check the major version, as well as the filter settings, to
264 # see if we should include it.
265 if upgrade_version.version[0] == resource_version_info.version[0]:
266 results.append(resource)
267 elif check_major:
268 # It's a larger major version, but we're supposed to
269 # include it.
270 results.append(resource)
271
272 return results
273
274
275@actions.register('delete')
276class DeleteElastiCacheCluster(BaseAction):
277 """Action to delete an elasticache cluster
278
279 To prevent unwanted deletion of elasticache clusters, it is recommended
280 to include a filter
281
282 :example:
283
284 .. code-block:: yaml
285
286 policies:
287 - name: elasticache-delete-stale-clusters
288 resource: cache-cluster
289 filters:
290 - type: value
291 value_type: age
292 key: CacheClusterCreateTime
293 op: ge
294 value: 90
295 actions:
296 - type: delete
297 skip-snapshot: false
298 """
299
300 schema = type_schema(
301 'delete', **{'skip-snapshot': {'type': 'boolean'}})
302 permissions = ('elasticache:DeleteCacheCluster',
303 'elasticache:DeleteReplicationGroup')
304
305 def process(self, clusters):
306 skip = self.data.get('skip-snapshot', False)
307 client = local_session(
308 self.manager.session_factory).client('elasticache')
309
310 clusters_to_delete = []
311 replication_groups_to_delete = set()
312 for cluster in clusters:
313 if cluster.get('ReplicationGroupId') in self.fetch_global_ds_rpgs():
314 self.log.info(
315 f"Skipping {cluster['CacheClusterId']}: associated with a global datastore")
316 continue
317 if cluster.get('ReplicationGroupId', ''):
318 replication_groups_to_delete.add(cluster['ReplicationGroupId'])
319 else:
320 clusters_to_delete.append(cluster)
321 # added if statement to handle differences in parameters if snapshot is skipped
322 for cluster in clusters_to_delete:
323 params = {'CacheClusterId': cluster['CacheClusterId']}
324 if _cluster_eligible_for_snapshot(cluster) and not skip:
325 params['FinalSnapshotIdentifier'] = snapshot_identifier(
326 'Final', cluster['CacheClusterId'])
327 self.log.debug(
328 "Taking final snapshot of %s", cluster['CacheClusterId'])
329 else:
330 self.log.debug(
331 "Skipping final snapshot of %s", cluster['CacheClusterId'])
332 client.delete_cache_cluster(**params)
333 self.log.info(
334 'Deleted ElastiCache cluster: %s',
335 cluster['CacheClusterId'])
336
337 cacheClusterIds = set([c["CacheClusterId"] for c in clusters])
338 for replication_group in replication_groups_to_delete:
339 # NOTE don't delete the group if it's not empty
340 rg = client.describe_replication_groups(
341 ReplicationGroupId=replication_group)["ReplicationGroups"][0]
342 if not all(cluster in cacheClusterIds for cluster in rg["MemberClusters"]):
343 # NOTE mark members for better presentation on notifications
344 for c in clusters:
345 if c.get("ReplicationGroupId") == replication_group:
346 c["MemberClusters"] = rg["MemberClusters"]
347 self.log.info(f'{replication_group} is not empty: {rg["MemberClusters"]}')
348 continue
349
350 params = {'ReplicationGroupId': replication_group,
351 'RetainPrimaryCluster': False}
352 if not skip:
353 params['FinalSnapshotIdentifier'] = snapshot_identifier(
354 'Final', replication_group)
355 client.delete_replication_group(**params)
356
357 self.log.info(
358 'Deleted ElastiCache replication group: %s',
359 replication_group)
360
361 def fetch_global_ds_rpgs(self):
362 rpgs = self.manager.get_resource_manager('elasticache-group').resources(augment=False)
363 global_rpgs = get_global_datastore_association(rpgs)
364 return global_rpgs
365
366
367@actions.register('snapshot')
368class SnapshotElastiCacheCluster(BaseAction):
369 """Action to snapshot an elasticache cluster
370
371 :example:
372
373 .. code-block:: yaml
374
375 policies:
376 - name: elasticache-cluster-snapshot
377 resource: cache-cluster
378 filters:
379 - type: value
380 key: CacheClusterStatus
381 op: not-in
382 value: ["deleted","deleting","creating"]
383 actions:
384 - snapshot
385 """
386
387 schema = type_schema('snapshot')
388 permissions = ('elasticache:CreateSnapshot',)
389
390 def process(self, clusters):
391 set_size = len(clusters)
392 clusters = [c for c in clusters if _cluster_eligible_for_snapshot(c)]
393 if set_size != len(clusters):
394 self.log.info(
395 "action:snapshot implicitly filtered from %d to %d clusters for snapshot support",
396 set_size, len(clusters))
397
398 with self.executor_factory(max_workers=2) as w:
399 futures = []
400 client = local_session(self.manager.session_factory).client('elasticache')
401 for cluster in clusters:
402 futures.append(
403 w.submit(self.process_cluster_snapshot, client, cluster))
404
405 for f in as_completed(futures):
406 if f.exception():
407 self.log.error(
408 "Exception creating cache cluster snapshot \n %s",
409 f.exception())
410 return clusters
411
412 def process_cluster_snapshot(self, client, cluster):
413 client.create_snapshot(
414 SnapshotName=snapshot_identifier(
415 'Backup',
416 cluster['CacheClusterId']),
417 CacheClusterId=cluster['CacheClusterId'])
418
419
420@actions.register('modify-security-groups')
421class ElasticacheClusterModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
422 """Modify security groups on an Elasticache cluster.
423
424 Looks at the individual clusters and modifies the Replication
425 Group's configuration for Security groups so all nodes get
426 affected equally
427
428 """
429 permissions = ('elasticache:ModifyReplicationGroup',)
430
431 def process(self, clusters):
432 replication_group_map = {}
433 client = local_session(
434 self.manager.session_factory).client('elasticache')
435 groups = super(
436 ElasticacheClusterModifyVpcSecurityGroups, self).get_groups(
437 clusters)
438 for idx, c in enumerate(clusters):
439 # build map of Replication Groups to Security Groups
440 replication_group_map[c['ReplicationGroupId']] = groups[idx]
441
442 for idx, r in enumerate(replication_group_map.keys()):
443 client.modify_replication_group(
444 ReplicationGroupId=r,
445 SecurityGroupIds=replication_group_map[r])
446
447
448@resources.register('cache-subnet-group')
449class ElastiCacheSubnetGroup(QueryResourceManager):
450
451 class resource_type(TypeInfo):
452 service = 'elasticache'
453 arn_type = 'subnetgroup'
454 enum_spec = ('describe_cache_subnet_groups',
455 'CacheSubnetGroups', None)
456 name = id = 'CacheSubnetGroupName'
457 filter_name = 'CacheSubnetGroupName'
458 filter_type = 'scalar'
459 cfn_type = 'AWS::ElastiCache::SubnetGroup'
460
461
462@resources.register('cache-snapshot')
463class ElastiCacheSnapshot(QueryResourceManager):
464
465 class resource_type(TypeInfo):
466 service = 'elasticache'
467 arn_type = 'snapshot'
468 arn_separator = ":"
469 enum_spec = ('describe_snapshots', 'Snapshots', None)
470 name = id = 'SnapshotName'
471 filter_name = 'SnapshotName'
472 filter_type = 'scalar'
473 date = 'StartTime'
474 universal_taggable = True
475
476 permissions = ('elasticache:ListTagsForResource',)
477
478 def augment(self, resources):
479 return universal_augment(self, resources)
480
481
482@ElastiCacheSnapshot.filter_registry.register('age')
483class ElastiCacheSnapshotAge(AgeFilter):
484 """Filters elasticache snapshots based on their age (in days)
485
486 :example:
487
488 .. code-block:: yaml
489
490 policies:
491 - name: elasticache-stale-snapshots
492 resource: cache-snapshot
493 filters:
494 - type: age
495 days: 30
496 op: ge
497 """
498
499 schema = type_schema(
500 'age', days={'type': 'number'},
501 op={'$ref': '#/definitions/filters_common/comparison_operators'})
502
503 date_attribute = 'dummy'
504
505 def get_resource_date(self, snapshot):
506 """ Override superclass method as there is no single snapshot date attribute.
507 """
508 def to_datetime(v):
509 if not isinstance(v, datetime):
510 v = parse(v)
511 if not v.tzinfo:
512 v = v.replace(tzinfo=tzutc())
513 return v
514
515 # Return the earliest of the node snaphot creation times.
516 return min([to_datetime(ns['SnapshotCreateTime'])
517 for ns in snapshot['NodeSnapshots']])
518
519
520@ElastiCacheSnapshot.action_registry.register('delete')
521class DeleteElastiCacheSnapshot(BaseAction):
522 """Action to delete elasticache snapshots
523
524 To prevent unwanted deletion of elasticache snapshots, it is recommended to
525 apply a filter
526
527 :example:
528
529 .. code-block:: yaml
530
531 policies:
532 - name: delete-elasticache-stale-snapshots
533 resource: cache-snapshot
534 filters:
535 - type: age
536 days: 30
537 op: ge
538 actions:
539 - delete
540 """
541
542 schema = type_schema('delete')
543 permissions = ('elasticache:DeleteSnapshot',)
544
545 def process(self, snapshots):
546 self.log.info("Deleting %d ElastiCache snapshots", len(snapshots))
547 with self.executor_factory(max_workers=3) as w:
548 futures = []
549 client = local_session(self.manager.session_factory).client('elasticache')
550 for snapshot_set in chunks(reversed(snapshots), size=50):
551 futures.append(
552 w.submit(self.process_snapshot_set, client, snapshot_set))
553 for f in as_completed(futures):
554 if f.exception():
555 self.log.error(
556 "Exception deleting snapshot set \n %s",
557 f.exception())
558 return snapshots
559
560 def process_snapshot_set(self, client, snapshots_set):
561 for s in snapshots_set:
562 client.delete_snapshot(SnapshotName=s['SnapshotName'])
563
564
565@ElastiCacheSnapshot.action_registry.register('copy-cluster-tags')
566class CopyClusterTags(BaseAction):
567 """
568 Copy specified tags from Elasticache cluster to Snapshot
569 :example:
570
571 .. code-block:: yaml
572
573 - name: elasticache-test
574 resource: cache-snapshot
575 filters:
576 - type: value
577 key: SnapshotName
578 op: in
579 value:
580 - test-tags-backup
581 actions:
582 - type: copy-cluster-tags
583 tags:
584 - tag1
585 - tag2
586 """
587
588 schema = type_schema(
589 'copy-cluster-tags',
590 tags={'type': 'array', 'items': {'type': 'string'}, 'minItems': 1},
591 required=('tags',))
592
593 def get_permissions(self):
594 perms = self.manager.get_resource_manager('cache-cluster').get_permissions()
595 perms.append('elasticache:AddTagsToResource')
596 return perms
597
598 def process(self, snapshots):
599 client = local_session(self.manager.session_factory).client('elasticache')
600 clusters = {r['CacheClusterId']: r for r in
601 self.manager.get_resource_manager('cache-cluster').resources()}
602 copyable_tags = self.data.get('tags')
603
604 for s in snapshots:
605 # For replicated/sharded clusters it is possible for each
606 # shard to have separate tags, we go ahead and tag the
607 # snap with the union of tags with overlaps getting the
608 # last value (arbitrary if mismatched).
609 if 'CacheClusterId' not in s:
610 cluster_ids = [ns['CacheClusterId'] for ns in s['NodeSnapshots']]
611 else:
612 cluster_ids = [s['CacheClusterId']]
613
614 copy_tags = {}
615 for cid in sorted(cluster_ids):
616 if cid not in clusters:
617 continue
618
619 cluster_tags = {t['Key']: t['Value'] for t in clusters[cid]['Tags']}
620 snap_tags = {t['Key']: t['Value'] for t in s.get('Tags', ())}
621
622 for k, v in cluster_tags.items():
623 if copyable_tags and k not in copyable_tags:
624 continue
625 if k.startswith('aws:'):
626 continue
627 if snap_tags.get(k, '') == v:
628 continue
629 copy_tags[k] = v
630
631 if not copy_tags:
632 continue
633
634 if len(set(copy_tags).union(set(snap_tags))) > 50:
635 self.log.error(
636 "Cant copy tags, max tag limit hit on snapshot:%s",
637 s['SnapshotName'])
638 continue
639
640 arn = self.manager.generate_arn(s['SnapshotName'])
641 self.manager.retry(
642 client.add_tags_to_resource,
643 ResourceName=arn,
644 Tags=[{'Key': k, 'Value': v} for k, v in copy_tags.items()])
645
646
647def _parse_engine_version(engine_version):
648 """Parse EngineVersion string into (engine_name, version) tuple
649
650 Example inputs:
651 - "redis-7.0" -> ("redis", "7.0")
652 - "memcached-1.6.12" -> ("memcached", "1.6.12")
653 - "valkey-7.2" -> ("valkey", "7.2")
654 """
655 if not engine_version or '-' not in engine_version:
656 return None, None
657
658 # Handle the strange `and onwards` verbiage.
659 engine_version = engine_version.replace('and onwards', '').strip()
660
661 parts = engine_version.split('-', 1)
662 return parts[0].lower(), parts[1]
663
664
665def _cluster_eligible_for_snapshot(cluster):
666 # added regex search to filter unsupported cachenode types
667 return (
668 cluster['Engine'] != 'memcached' and not
669 TTYPE.match(cluster['CacheNodeType'])
670 )
671
672
673@resources.register('elasticache-group')
674class ElastiCacheReplicationGroup(QueryResourceManager):
675
676 class resource_type(TypeInfo):
677 service = "elasticache"
678 enum_spec = ('describe_replication_groups',
679 'ReplicationGroups[]', None)
680 arn_type = 'replicationgroup'
681 id = name = dimension = 'ReplicationGroupId'
682 cfn_type = 'AWS::ElastiCache::ReplicationGroup'
683 arn_separator = ":"
684 universal_taggable = object()
685
686 augment = universal_augment
687 permissions = ('elasticache:DescribeReplicationGroups',)
688
689
690@ElastiCacheReplicationGroup.filter_registry.register('kms-key')
691class KmsFilter(KmsRelatedFilter):
692
693 RelatedIdsExpression = 'KmsKeyId'
694
695
696def get_global_datastore_association(resources):
697 global_ds_rpgs = set()
698 for r in resources:
699 global_ds_association = jmespath_search(
700 'GlobalReplicationGroupInfo.GlobalReplicationGroupId', r)
701 if global_ds_association:
702 global_ds_rpgs.add(r['ReplicationGroupId'])
703 return global_ds_rpgs
704
705
706@ElastiCacheReplicationGroup.action_registry.register('delete')
707class DeleteReplicationGroup(BaseAction):
708 """Action to delete a cache replication group
709
710 :example:
711
712 .. code-block:: yaml
713
714 policies:
715 - name: elasticache-delete-replication-group
716 resource: aws.elasticache-group
717 filters:
718 - type: value
719 key: AtRestEncryptionEnabled
720 value: False
721 actions:
722 - type: delete
723 snapshot: False
724
725 """
726 schema = type_schema(
727 'delete', **{'snapshot': {'type': 'boolean'}})
728
729 valid_origin_states = ('available',)
730 permissions = ('elasticache:DeleteReplicationGroup',)
731
732 def process(self, resources):
733 resources = self.filter_resources(resources, 'Status', self.valid_origin_states)
734 client = local_session(self.manager.session_factory).client('elasticache')
735 global_datastore_association_map = get_global_datastore_association(resources)
736 for r in resources:
737 if r['ReplicationGroupId'] in global_datastore_association_map:
738 self.log.info(
739 f"Skipping {r['ReplicationGroupId']}: associated with a global datastore")
740 continue
741 params = {'ReplicationGroupId': r['ReplicationGroupId']}
742 if self.data.get('snapshot', False):
743 params.update({'FinalSnapshotIdentifier': r['ReplicationGroupId'] + '-snapshot'})
744 self.manager.retry(client.delete_replication_group, **params, ignore_err_codes=(
745 'ReplicationGroupNotFoundFault',))
746
747
748@resources.register('elasticache-user')
749class ElastiCacheUser(QueryResourceManager):
750
751 class resource_type(TypeInfo):
752 service = 'elasticache'
753 enum_spec = ('describe_users', 'Users[]', None)
754 arn_separator = ":"
755 arn_type = 'user'
756 arn = 'ARN'
757 id = 'UserId'
758 name = 'UserName'
759 cfn_type = 'AWS::ElastiCache::User'
760 universal_taggable = object()
761
762 augment = universal_augment
763 permissions = ('elasticache:DescribeUsers',)
764
765
766@ElastiCacheUser.action_registry.register('delete')
767class DeleteUser(BaseAction):
768 """Action to delete a cache user
769
770 :example:
771
772 .. code-block:: yaml
773
774 policies:
775 - name: elasticache-delete-user
776 resource: aws.elasticache-user
777 filters:
778 - type: value
779 key: Authentication.Type
780 value: no-password
781 actions:
782 - delete
783
784 """
785 schema = type_schema('delete')
786
787 permissions = ('elasticache:DeleteUser',)
788
789 def process(self, resources):
790 client = local_session(self.manager.session_factory).client('elasticache')
791 retry = get_retry(('ThrottlingException', 'InvalidUserStateFault'))
792 for r in resources:
793 retry(client.delete_user, UserId=r['UserId'], ignore_err_codes=('UserNotFoundFault',))
794
795
796@ElastiCacheUser.action_registry.register('modify')
797class ModifyUser(BaseAction):
798 """Action to modify a cache user
799
800 :example:
801
802 .. code-block:: yaml
803
804 policies:
805 - name: elasticache-modify-user
806 resource: aws.elasticache-user
807 filters:
808 - type: value
809 key: Authentication.Type
810 value: no-password
811 actions:
812 - type: modify
813 attributes:
814 AuthenticationMode:
815 Type: password
816 Passwords:
817 - "password"
818 """
819
820 permissions = ('elasticache:ModifyUser',)
821 schema = type_schema(
822 'modify',
823 attributes={'type:': 'object'},
824 required=('attributes',))
825 shape = 'ModifyUserMessage'
826
827 def validate(self):
828 req = dict(self.data["attributes"])
829 req["UserId"] = "validate"
830 return shape_validate(
831 req, self.shape, self.manager.resource_type.service
832 )
833
834 def process(self, resources):
835 client = local_session(self.manager.session_factory).client('elasticache')
836 retry = get_retry(('ThrottlingException', 'InvalidUserStateFault'))
837 for r in resources:
838 retry(client.modify_user, UserId=r['UserId'], **self.data["attributes"],
839 ignore_err_codes=('UserNotFoundFault',))
840
841
842@resources.register('elasticache-reserved')
843class ReservedCacheNodes(QueryResourceManager):
844 """Lists all the reserved cache nodes
845
846 :example:
847
848 .. code-block:: yaml
849
850 policies:
851 - name: reserved-cache-nodes-expiring
852 resource: aws.elasticache-reserved
853 filters:
854 - type: value
855 key: State
856 value: active
857
858 """
859
860 class resource_type(TypeInfo):
861 service = 'elasticache'
862 name = id = 'ReservedCacheNodeId'
863 date = 'StartTime'
864 enum_spec = (
865 'describe_reserved_cache_nodes',
866 'ReservedCacheNodes',
867 None,
868 )
869 filter_name = 'ReservedCacheNodeId'
870 filter_type = 'scalar'
871 arn_type = "reserved-instance"
872 permissions_enum = ('elasticache:DescribeReservedCacheNodes',)