Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/elasticache.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

373 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from datetime import datetime 

4import re 

5 

6from concurrent.futures import as_completed 

7from dateutil.tz import tzutc 

8from dateutil.parser import parse 

9 

10from c7n.actions import ( 

11 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction) 

12from c7n.filters import FilterRegistry, AgeFilter, Filter 

13from c7n.filters.core import ComparableVersion 

14import c7n.filters.vpc as net_filters 

15from c7n.filters.kms import KmsRelatedFilter 

16from c7n.manager import resources 

17from c7n.query import QueryResourceManager, TypeInfo, DescribeSource, ConfigSource 

18from c7n.tags import universal_augment 

19from c7n.utils import ( 

20 local_session, chunks, snapshot_identifier, type_schema, jmespath_search, 

21 get_retry, QueryParser) 

22 

23from .aws import shape_validate 

24 

# Registries collecting the filters and actions registered in this module
# for the ``cache-cluster`` resource.
filters = FilterRegistry('elasticache.filters')
actions = ActionRegistry('elasticache.actions')

# Prefix pattern for ``cache.t1`` node types; used by
# ``_cluster_eligible_for_snapshot`` to exclude them from snapshots.
# The dot is escaped so it only matches a literal ``.``.
TTYPE = re.compile(r'cache\.t1')

29 

30 

class ElastiCacheQueryParser(QueryParser):
    # Parser settings for the policy ``query`` block of cache clusters.
    # NOTE(review): how the base ``QueryParser`` consumes these attributes
    # is not visible here -- confirm against c7n.utils.QueryParser.

    type_name = "ElastiCache"
    value_key = 'Value'
    multi_value = False

    # Accepted query keys and the Python type required for their value.
    QuerySchema = dict(
        ShowCacheNodeInfo=bool,
        ShowCacheClustersNotInReplicationGroups=bool,
    )

40 

41 

class DescribeElastiCache(DescribeSource):
    """
    Describe source that merges the policy ``query`` block into the
    query parameters, allowing more information to be retrieved.

    :example:

    .. code-block:: yaml

        policies:
          - name: cache-node-with-default-port
            resource: aws.cache-cluster
            query:
              - ShowCacheNodeInfo: true
            filters:
              - type: list-item
                key: CacheNodes
                attrs:
                  - type: value
                    key: Endpoint.Port
                    value: 11211
    """

    def get_query_params(self, query_params):
        """Return ``query_params`` extended with the parsed policy query.

        A falsy ``query_params`` is replaced by a new dict; otherwise the
        dict that was passed in is updated in place and returned.
        """
        params = query_params or {}
        policy_query = self.manager.data.get('query', [])
        for entry in ElastiCacheQueryParser.parse(policy_query):
            params.update(entry)
        return params

70 

71 

@resources.register('cache-cluster')
class ElastiCacheCluster(QueryResourceManager):
    # Resource manager for ElastiCache cache clusters.

    class resource_type(TypeInfo):
        # API enumeration
        service = 'elasticache'
        enum_spec = ('describe_cache_clusters', 'CacheClusters[]', None)
        filter_name = 'CacheClusterId'
        filter_type = 'scalar'

        # Identity
        id = name = 'CacheClusterId'
        dimension = 'CacheClusterId'
        date = 'CacheClusterCreateTime'

        # ARN construction
        arn_type = 'cluster'
        arn_separator = ":"

        # Tagging / integrations
        universal_taggable = True
        cfn_type = 'AWS::ElastiCache::CacheCluster'
        permissions_augment = ("elasticache:ListTagsForResource",)

    filter_registry = filters
    action_registry = actions
    permissions = ('elasticache:ListTagsForResource',)

    # Available resource sources, keyed by source name.
    source_mapping = dict(
        describe=DescribeElastiCache,
        config=ConfigSource,
    )

    def augment(self, resources):
        # Delegates directly to the shared universal tag augmentation.
        return universal_augment(self, resources)

99 

100 

@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):

    # Expression selecting the security group ids on a cache cluster.
    # NOTE(review): presumably evaluated as JMESPath by the base filter.
    RelatedIdsExpression = "SecurityGroups[].SecurityGroupId"

105 

106 

@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Filters elasticache clusters based on their associated subnet

    Subnets are resolved through the cluster's cache subnet group.

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-in-subnet-x
            resource: cache-cluster
            filters:
              - type: subnet
                key: SubnetId
                value: subnet-12ab34cd
    """

    # Unused: related ids are computed in get_related_ids instead.
    RelatedIdsExpression = ""

    def get_related_ids(self, resources):
        """Return the set of subnet ids behind the clusters' subnet groups."""
        subnet_ids = set()
        for cluster in resources:
            subnet_group = self.groups[cluster['CacheSubnetGroupName']]
            subnet_ids.update(
                [subnet['SubnetIdentifier'] for subnet in subnet_group['Subnets']])
        return subnet_ids

    def process(self, resources, event=None):
        """Index subnet groups by name, then run the base subnet filter."""
        subnet_groups = self.manager.get_resource_manager(
            'cache-subnet-group').resources()
        self.groups = {g['CacheSubnetGroupName']: g for g in subnet_groups}
        return super().process(resources, event)

140 

141 

@filters.register('vpc')
class VpcFilter(net_filters.VpcFilter):
    """Filters elasticache clusters based on their associated VPCs

    The VPC is resolved through the cluster's cache subnet group.

    :example:

    .. code-block:: yaml

        policies:
          - name: cache-node-with-default-vpc
            resource: aws.cache-cluster
            filters:
              - type: vpc
                key: IsDefault
                value: false
    """

    # Unused: related ids are computed in get_related_ids instead.
    RelatedIdsExpression = ""

    def get_related_ids(self, resources):
        """Return the set of VPC ids of the clusters' subnet groups."""
        vpc_ids = set()
        for res in resources:
            vpc_ids.add(self.groups[res['CacheSubnetGroupName']]['VpcId'])
        return vpc_ids

    def process(self, resources, event=None):
        """Index subnet groups by name, then run the base VPC filter."""
        subnet_groups = self.manager.get_resource_manager(
            'cache-subnet-group').resources()
        by_name = {}
        for group in subnet_groups:
            by_name[group['CacheSubnetGroupName']] = group
        self.groups = by_name
        return super(VpcFilter, self).process(resources, event)

172 

173 

# Register the shared network-location filter on cache clusters as-is.
filters.register('network-location', net_filters.NetworkLocation)

175 

176 

@filters.register('upgrade-available')
class UpgradeAvailable(Filter):
    """Scans for ElastiCache clusters with available upgrade versions

    Lists the service updates currently in the ``available`` status and keeps
    each cluster whose engine has at least one applicable update. A matching
    cluster is returned once, no matter how many updates apply to it.

    - ``major``: when true, updates with a different major version than the
      cluster also count (default: false).
    - ``value``: when false, an update whose version equals the cluster's
      current version does not count (default: true).

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-upgrade-available
            resource: cache-cluster
            filters:
              - type: upgrade-available
                major: False

    """

    schema = type_schema(
        'upgrade-available',
        major={'type': 'boolean'},
        value={'type': 'boolean'},
    )
    permissions = ('elasticache:DescribeServiceUpdates',)

    def process(self, resources, event=None):
        """Return the resources that have at least one applicable upgrade."""
        client = local_session(self.manager.session_factory).client('elasticache')
        # If `major` is `True`, we should include major version upgrades if found.
        check_major = self.data.get('major', False)
        # If `value` is `True`, we should include even matching upgrade versions.
        check_upgrade_extant = self.data.get('value', True)

        available_upgrades = self._get_available_upgrades(client)

        results = []
        for resource in resources:
            resource_engine = resource.get('Engine', '').lower()
            current = ComparableVersion(resource.get('EngineVersion', ''))

            # Only upgrades published for this resource's engine are relevant.
            engine_versions = available_upgrades.get(resource_engine, {})

            # any() stops at the first applicable upgrade, so the resource is
            # appended at most once.
            if any(
                    self._is_applicable(
                        upgrade, current, check_major, check_upgrade_extant)
                    for upgrade in engine_versions.values()):
                results.append(resource)

        return results

    @staticmethod
    def _get_available_upgrades(client):
        """Build ``{engine: {version_string: ComparableVersion}}`` from the
        service updates in the ``available`` status."""
        paginator = client.get_paginator('describe_service_updates')
        page_iterator = paginator.paginate(ServiceUpdateStatus=['available'])

        available_upgrades = {}
        for page in page_iterator:
            for service_update in page['ServiceUpdates']:
                engine_name = service_update.get('Engine', '').lower()
                engine_version = service_update.get('EngineVersion', '')
                _, version = _parse_engine_version(engine_version)

                if engine_name and version:
                    # Parse the version once here rather than per resource.
                    available_upgrades.setdefault(engine_name, {}).setdefault(
                        version, ComparableVersion(version))
        return available_upgrades

    @staticmethod
    def _is_applicable(upgrade, current, check_major, check_upgrade_extant):
        """Tell whether ``upgrade`` counts as an upgrade for ``current``."""
        if upgrade < current:
            # The resource is newer than the upgrade.
            return False
        if upgrade == current:
            # Same version: only counts when the policy asks for it.
            return check_upgrade_extant
        # Strictly newer: same major always counts, other majors only when
        # the policy opted in.
        return upgrade.version[0] == current.version[0] or check_major

273 

274 

@actions.register('delete')
class DeleteElastiCacheCluster(BaseAction):
    """Action to delete an elasticache cluster

    Clusters without a replication group are deleted directly. For clusters
    in a replication group, the group itself is deleted, but only when all of
    its member clusters are among the matched resources. Clusters whose
    replication group is associated with a global datastore are skipped.

    Unless ``skip-snapshot`` is true, a final snapshot is requested for
    snapshot-eligible clusters and for deleted replication groups.

    To prevent unwanted deletion of elasticache clusters, it is recommended
    to include a filter

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-delete-stale-clusters
            resource: cache-cluster
            filters:
              - type: value
                value_type: age
                key: CacheClusterCreateTime
                op: ge
                value: 90
            actions:
              - type: delete
                skip-snapshot: false
    """

    schema = type_schema(
        'delete', **{'skip-snapshot': {'type': 'boolean'}})
    permissions = ('elasticache:DeleteCacheCluster',
                   'elasticache:DeleteReplicationGroup')

    def process(self, clusters):
        """Delete the given clusters and any fully-matched replication groups."""
        skip = self.data.get('skip-snapshot', False)
        client = local_session(
            self.manager.session_factory).client('elasticache')

        # Resolved once up front: fetching it lists every replication group,
        # which is wasteful to repeat for each cluster.
        global_ds_rpgs = self.fetch_global_ds_rpgs() if clusters else set()

        clusters_to_delete = []
        replication_groups_to_delete = set()
        for cluster in clusters:
            if cluster.get('ReplicationGroupId') in global_ds_rpgs:
                self.log.info(
                    f"Skipping {cluster['CacheClusterId']}: associated with a global datastore")
                continue
            if cluster.get('ReplicationGroupId', ''):
                replication_groups_to_delete.add(cluster['ReplicationGroupId'])
            else:
                clusters_to_delete.append(cluster)

        # The final snapshot parameter is only sent when a snapshot is both
        # supported by the cluster and wanted by the policy.
        for cluster in clusters_to_delete:
            params = {'CacheClusterId': cluster['CacheClusterId']}
            if _cluster_eligible_for_snapshot(cluster) and not skip:
                params['FinalSnapshotIdentifier'] = snapshot_identifier(
                    'Final', cluster['CacheClusterId'])
                self.log.debug(
                    "Taking final snapshot of %s", cluster['CacheClusterId'])
            else:
                self.log.debug(
                    "Skipping final snapshot of %s", cluster['CacheClusterId'])
            client.delete_cache_cluster(**params)
            self.log.info(
                'Deleted ElastiCache cluster: %s',
                cluster['CacheClusterId'])

        cache_cluster_ids = {c["CacheClusterId"] for c in clusters}
        for replication_group in replication_groups_to_delete:
            # NOTE don't delete the group if it's not empty
            rg = client.describe_replication_groups(
                ReplicationGroupId=replication_group)["ReplicationGroups"][0]
            if not all(member in cache_cluster_ids for member in rg["MemberClusters"]):
                # NOTE mark members for better presentation on notifications
                for c in clusters:
                    if c.get("ReplicationGroupId") == replication_group:
                        c["MemberClusters"] = rg["MemberClusters"]
                self.log.info(f'{replication_group} is not empty: {rg["MemberClusters"]}')
                continue

            params = {'ReplicationGroupId': replication_group,
                      'RetainPrimaryCluster': False}
            if not skip:
                params['FinalSnapshotIdentifier'] = snapshot_identifier(
                    'Final', replication_group)
            client.delete_replication_group(**params)

            self.log.info(
                'Deleted ElastiCache replication group: %s',
                replication_group)

    def fetch_global_ds_rpgs(self):
        """Return the ids of replication groups tied to a global datastore."""
        rpgs = self.manager.get_resource_manager('elasticache-group').resources(augment=False)
        global_rpgs = get_global_datastore_association(rpgs)
        return global_rpgs

365 

366 

@actions.register('snapshot')
class SnapshotElastiCacheCluster(BaseAction):
    """Action to snapshot an elasticache cluster

    Clusters that are not eligible for snapshots are dropped implicitly
    before any snapshot is requested.

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-cluster-snapshot
            resource: cache-cluster
            filters:
              - type: value
                key: CacheClusterStatus
                op: not-in
                value: ["deleted","deleting","creating"]
            actions:
              - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('elasticache:CreateSnapshot',)

    def process(self, clusters):
        """Snapshot every eligible cluster and return the eligible ones."""
        eligible = [c for c in clusters if _cluster_eligible_for_snapshot(c)]
        if len(clusters) != len(eligible):
            self.log.info(
                "action:snapshot implicitly filtered from %d to %d "
                "clusters for snapshot support",
                len(clusters), len(eligible))

        with self.executor_factory(max_workers=2) as w:
            client = local_session(self.manager.session_factory).client('elasticache')
            futures = [
                w.submit(self.process_cluster_snapshot, client, cluster)
                for cluster in eligible]

            # Failures are logged, not raised, so one bad cluster does not
            # abort the remaining snapshots.
            for f in as_completed(futures):
                error = f.exception()
                if error:
                    self.log.error(
                        "Exception creating cache cluster snapshot \n %s",
                        error)
        return eligible

    def process_cluster_snapshot(self, client, cluster):
        """Request a ``Backup`` snapshot for a single cluster."""
        cluster_id = cluster['CacheClusterId']
        client.create_snapshot(
            SnapshotName=snapshot_identifier('Backup', cluster_id),
            CacheClusterId=cluster_id)

418 

419 

@actions.register('modify-security-groups')
class ElasticacheClusterModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify security groups on an Elasticache cluster.

    Looks at the individual clusters and modifies the Replication
    Group's configuration for Security groups so all nodes get
    affected equally

    """
    permissions = ('elasticache:ModifyReplicationGroup',)

    def process(self, clusters):
        """Apply the resolved security groups to each replication group."""
        client = local_session(
            self.manager.session_factory).client('elasticache')
        groups = super().get_groups(clusters)

        # Map each replication group to its security groups; when several
        # clusters share a group, the last cluster's entry wins.
        replication_group_map = {
            c['ReplicationGroupId']: groups[idx]
            for idx, c in enumerate(clusters)}

        for group_id, security_group_ids in replication_group_map.items():
            client.modify_replication_group(
                ReplicationGroupId=group_id,
                SecurityGroupIds=security_group_ids)

446 

447 

@resources.register('cache-subnet-group')
class ElastiCacheSubnetGroup(QueryResourceManager):
    # Resource manager for ElastiCache cache subnet groups.

    class resource_type(TypeInfo):
        # API enumeration
        service = 'elasticache'
        enum_spec = ('describe_cache_subnet_groups', 'CacheSubnetGroups', None)
        filter_name = 'CacheSubnetGroupName'
        filter_type = 'scalar'

        # Identity
        id = name = 'CacheSubnetGroupName'
        arn_type = 'subnetgroup'
        cfn_type = 'AWS::ElastiCache::SubnetGroup'

460 

461 

@resources.register('cache-snapshot')
class ElastiCacheSnapshot(QueryResourceManager):
    # Resource manager for ElastiCache snapshots.

    class resource_type(TypeInfo):
        # API enumeration
        service = 'elasticache'
        enum_spec = ('describe_snapshots', 'Snapshots', None)
        filter_name = 'SnapshotName'
        filter_type = 'scalar'

        # Identity
        id = name = 'SnapshotName'
        date = 'StartTime'

        # ARN construction
        arn_type = 'snapshot'
        arn_separator = ":"

        universal_taggable = True

    permissions = ('elasticache:ListTagsForResource',)

    # Bound as a method: called as universal_augment(self, resources).
    augment = universal_augment

480 

481 

@ElastiCacheSnapshot.filter_registry.register('age')
class ElastiCacheSnapshotAge(AgeFilter):
    """Filters elasticache snapshots based on their age (in days)

    The age is measured from the earliest node snapshot creation time.

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-stale-snapshots
            resource: cache-snapshot
            filters:
              - type: age
                days: 30
                op: ge
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    # Placeholder only: the date comes from get_resource_date below.
    date_attribute = 'dummy'

    def get_resource_date(self, snapshot):
        """Return the earliest ``SnapshotCreateTime`` among the node snapshots.

        Overrides the superclass method as there is no single snapshot date
        attribute.
        """
        def to_datetime(v):
            # Accept datetimes or parseable strings; naive values are
            # treated as UTC.
            stamp = v if isinstance(v, datetime) else parse(v)
            if not stamp.tzinfo:
                stamp = stamp.replace(tzinfo=tzutc())
            return stamp

        created = [
            to_datetime(node['SnapshotCreateTime'])
            for node in snapshot['NodeSnapshots']]
        return min(created)

518 

519 

@ElastiCacheSnapshot.action_registry.register('delete')
class DeleteElastiCacheSnapshot(BaseAction):
    """Action to delete elasticache snapshots

    To prevent unwanted deletion of elasticache snapshots, it is recommended to
    apply a filter

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-elasticache-stale-snapshots
            resource: cache-snapshot
            filters:
              - type: age
                days: 30
                op: ge
            actions:
              - delete
    """

    schema = type_schema('delete')
    permissions = ('elasticache:DeleteSnapshot',)

    def process(self, snapshots):
        """Delete the snapshots in batches of 50 and return the input list."""
        self.log.info("Deleting %d ElastiCache snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            client = local_session(self.manager.session_factory).client('elasticache')
            futures = [
                w.submit(self.process_snapshot_set, client, snapshot_set)
                for snapshot_set in chunks(reversed(snapshots), size=50)]

            # Batch failures are logged rather than raised.
            for f in as_completed(futures):
                error = f.exception()
                if error:
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        error)
        return snapshots

    def process_snapshot_set(self, client, snapshots_set):
        """Delete each snapshot of one batch by name."""
        for snapshot in snapshots_set:
            client.delete_snapshot(SnapshotName=snapshot['SnapshotName'])

563 

564 

@ElastiCacheSnapshot.action_registry.register('copy-cluster-tags')
class CopyClusterTags(BaseAction):
    """
    Copy specified tags from Elasticache cluster to Snapshot

    Only tags listed in ``tags`` are copied. Tags whose key starts with
    ``aws:`` and tags already present on the snapshot with the same value
    are skipped. Nothing is copied when the snapshot would end up with more
    than 50 distinct tag keys.

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-test
            resource: cache-snapshot
            filters:
              - type: value
                key: SnapshotName
                op: in
                value:
                  - test-tags-backup
            actions:
              - type: copy-cluster-tags
                tags:
                  - tag1
                  - tag2
    """

    schema = type_schema(
        'copy-cluster-tags',
        tags={'type': 'array', 'items': {'type': 'string'}, 'minItems': 1},
        required=('tags',))

    def get_permissions(self):
        """Return the cache-cluster permissions plus the tagging permission."""
        # Copy so the sequence owned by the cluster manager is not mutated.
        perms = list(
            self.manager.get_resource_manager('cache-cluster').get_permissions())
        perms.append('elasticache:AddTagsToResource')
        return perms

    def process(self, snapshots):
        """Tag each snapshot with the selected tags of its source cluster(s)."""
        client = local_session(self.manager.session_factory).client('elasticache')
        clusters = {r['CacheClusterId']: r for r in
                    self.manager.get_resource_manager('cache-cluster').resources()}
        copyable_tags = self.data.get('tags')

        for s in snapshots:
            # For replicated/sharded clusters it is possible for each
            # shard to have separate tags, we go ahead and tag the
            # snap with the union of tags with overlaps getting the
            # last value (arbitrary if mismatched).
            if 'CacheClusterId' not in s:
                cluster_ids = [ns['CacheClusterId'] for ns in s['NodeSnapshots']]
            else:
                cluster_ids = [s['CacheClusterId']]

            # The snapshot's own tags do not depend on the cluster, so they
            # are computed once per snapshot.
            snap_tags = {t['Key']: t['Value'] for t in s.get('Tags', ())}

            copy_tags = {}
            for cid in sorted(cluster_ids):
                if cid not in clusters:
                    continue

                cluster_tags = {t['Key']: t['Value'] for t in clusters[cid]['Tags']}

                for k, v in cluster_tags.items():
                    if copyable_tags and k not in copyable_tags:
                        continue
                    if k.startswith('aws:'):
                        continue
                    if snap_tags.get(k, '') == v:
                        continue
                    copy_tags[k] = v

            if not copy_tags:
                continue

            if len(set(copy_tags).union(set(snap_tags))) > 50:
                self.log.error(
                    "Cant copy tags, max tag limit hit on snapshot:%s",
                    s['SnapshotName'])
                continue

            arn = self.manager.generate_arn(s['SnapshotName'])
            self.manager.retry(
                client.add_tags_to_resource,
                ResourceName=arn,
                Tags=[{'Key': k, 'Value': v} for k, v in copy_tags.items()])

645 

646 

647def _parse_engine_version(engine_version): 

648 """Parse EngineVersion string into (engine_name, version) tuple 

649 

650 Example inputs: 

651 - "redis-7.0" -> ("redis", "7.0") 

652 - "memcached-1.6.12" -> ("memcached", "1.6.12") 

653 - "valkey-7.2" -> ("valkey", "7.2") 

654 """ 

655 if not engine_version or '-' not in engine_version: 

656 return None, None 

657 

658 # Handle the strange `and onwards` verbiage. 

659 engine_version = engine_version.replace('and onwards', '').strip() 

660 

661 parts = engine_version.split('-', 1) 

662 return parts[0].lower(), parts[1] 

663 

664 

def _cluster_eligible_for_snapshot(cluster):
    """Tell whether a snapshot may be taken of ``cluster``.

    Memcached clusters and clusters whose node type matches ``TTYPE``
    are not eligible.
    """
    if cluster['Engine'] == 'memcached':
        return False
    # Regex match filters out the unsupported cache node types.
    return TTYPE.match(cluster['CacheNodeType']) is None

671 

672 

@resources.register('elasticache-group')
class ElastiCacheReplicationGroup(QueryResourceManager):
    # Resource manager for ElastiCache replication groups.

    class resource_type(TypeInfo):
        # API enumeration
        service = "elasticache"
        enum_spec = ('describe_replication_groups', 'ReplicationGroups[]', None)

        # Identity
        id = name = dimension = 'ReplicationGroupId'
        cfn_type = 'AWS::ElastiCache::ReplicationGroup'

        # ARN construction
        arn_type = 'replicationgroup'
        arn_separator = ":"

        # NOTE(review): a bare object() rather than True -- presumably a
        # truthy marker with special meaning to the tagging code; confirm.
        universal_taggable = object()

    permissions = ('elasticache:DescribeReplicationGroups',)

    def augment(self, resources):
        # Delegates directly to the shared universal tag augmentation.
        return universal_augment(self, resources)

688 

689 

@ElastiCacheReplicationGroup.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):

    # Attribute of the replication group holding the related KMS key id.
    RelatedIdsExpression = 'KmsKeyId'

694 

695 

def get_global_datastore_association(resources):
    """Return the ids of replication groups that belong to a global datastore.

    A replication group counts as associated when it carries a truthy
    ``GlobalReplicationGroupInfo.GlobalReplicationGroupId``.
    """
    return {
        group['ReplicationGroupId']
        for group in resources
        if jmespath_search(
            'GlobalReplicationGroupInfo.GlobalReplicationGroupId', group)
    }

704 

705 

@ElastiCacheReplicationGroup.action_registry.register('delete')
class DeleteReplicationGroup(BaseAction):
    """Action to delete a cache replication group

    Only groups in the ``available`` status are processed, and groups
    associated with a global datastore are skipped. With ``snapshot: true``
    a final snapshot named ``<ReplicationGroupId>-snapshot`` is requested.

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-delete-replication-group
            resource: aws.elasticache-group
            filters:
              - type: value
                key: AtRestEncryptionEnabled
                value: False
            actions:
              - type: delete
                snapshot: False

    """
    schema = type_schema(
        'delete', **{'snapshot': {'type': 'boolean'}})

    valid_origin_states = ('available',)
    permissions = ('elasticache:DeleteReplicationGroup',)

    def process(self, resources):
        """Delete every eligible replication group."""
        resources = self.filter_resources(resources, 'Status', self.valid_origin_states)
        client = local_session(self.manager.session_factory).client('elasticache')
        global_ds_groups = get_global_datastore_association(resources)

        for group in resources:
            group_id = group['ReplicationGroupId']
            if group_id in global_ds_groups:
                self.log.info(
                    f"Skipping {group_id}: associated with a global datastore")
                continue

            params = {'ReplicationGroupId': group_id}
            if self.data.get('snapshot', False):
                params['FinalSnapshotIdentifier'] = group_id + '-snapshot'

            # A group that is already gone is not an error.
            self.manager.retry(
                client.delete_replication_group,
                ignore_err_codes=('ReplicationGroupNotFoundFault',),
                **params)

746 

747 

@resources.register('elasticache-user')
class ElastiCacheUser(QueryResourceManager):
    # Resource manager for ElastiCache users.

    class resource_type(TypeInfo):
        # API enumeration
        service = 'elasticache'
        enum_spec = ('describe_users', 'Users[]', None)

        # Identity
        id = 'UserId'
        name = 'UserName'
        cfn_type = 'AWS::ElastiCache::User'

        # ARN handling
        arn = 'ARN'
        arn_type = 'user'
        arn_separator = ":"

        # NOTE(review): a bare object() rather than True -- presumably a
        # truthy marker with special meaning to the tagging code; confirm.
        universal_taggable = object()

    permissions = ('elasticache:DescribeUsers',)

    def augment(self, resources):
        # Delegates directly to the shared universal tag augmentation.
        return universal_augment(self, resources)

764 

765 

@ElastiCacheUser.action_registry.register('delete')
class DeleteUser(BaseAction):
    """Action to delete a cache user

    Users that no longer exist are ignored.

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-delete-user
            resource: aws.elasticache-user
            filters:
              - type: value
                key: Authentication.Type
                value: no-password
            actions:
              - delete

    """
    schema = type_schema('delete')

    permissions = ('elasticache:DeleteUser',)

    def process(self, resources):
        """Delete each user, retrying on throttling and invalid-state faults."""
        client = local_session(self.manager.session_factory).client('elasticache')
        retry = get_retry(('ThrottlingException', 'InvalidUserStateFault'))
        for user in resources:
            retry(
                client.delete_user,
                ignore_err_codes=('UserNotFoundFault',),
                UserId=user['UserId'])

794 

795 

@ElastiCacheUser.action_registry.register('modify')
class ModifyUser(BaseAction):
    """Action to modify a cache user

    ``attributes`` is passed as keyword arguments to the ``modify_user`` API
    call, together with the user's ``UserId``. It is validated against the
    ``ModifyUserMessage`` shape. Users that no longer exist are ignored.

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticache-modify-user
            resource: aws.elasticache-user
            filters:
              - type: value
                key: Authentication.Type
                value: no-password
            actions:
              - type: modify
                attributes:
                  AuthenticationMode:
                    Type: password
                    Passwords:
                      - "password"
    """

    permissions = ('elasticache:ModifyUser',)
    schema = type_schema(
        'modify',
        # The keyword must be exactly 'type' for the schema to enforce that
        # ``attributes`` is an object.
        attributes={'type': 'object'},
        required=('attributes',))
    shape = 'ModifyUserMessage'

    def validate(self):
        """Validate ``attributes`` against the ``ModifyUserMessage`` shape."""
        req = dict(self.data["attributes"])
        # UserId is supplied per resource at run time; a placeholder is
        # added only for shape validation.
        req["UserId"] = "validate"
        return shape_validate(
            req, self.shape, self.manager.resource_type.service
        )

    def process(self, resources):
        """Apply the configured attributes to each user."""
        client = local_session(self.manager.session_factory).client('elasticache')
        retry = get_retry(('ThrottlingException', 'InvalidUserStateFault'))
        for r in resources:
            retry(client.modify_user, UserId=r['UserId'], **self.data["attributes"],
                  ignore_err_codes=('UserNotFoundFault',))

840 

841 

842@resources.register('elasticache-reserved') 

843class ReservedCacheNodes(QueryResourceManager): 

844 """Lists all the reserved cache nodes 

845 

846 :example: 

847 

848 .. code-block:: yaml 

849 

850 policies: 

851 - name: reserved-cache-nodes-expiring 

852 resource: aws.elasticache-reserved 

853 filters: 

854 - type: value 

855 key: State 

856 value: active 

857 

858 """ 

859 

860 class resource_type(TypeInfo): 

861 service = 'elasticache' 

862 name = id = 'ReservedCacheNodeId' 

863 date = 'StartTime' 

864 enum_spec = ( 

865 'describe_reserved_cache_nodes', 

866 'ReservedCacheNodes', 

867 None, 

868 ) 

869 filter_name = 'ReservedCacheNodeId' 

870 filter_type = 'scalar' 

871 arn_type = "reserved-instance" 

872 permissions_enum = ('elasticache:DescribeReservedCacheNodes',)