Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/elasticache.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

255 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from datetime import datetime 

4import re 

5 

6from concurrent.futures import as_completed 

7from dateutil.tz import tzutc 

8from dateutil.parser import parse 

9 

10from c7n.actions import ( 

11 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction) 

12from c7n.filters import FilterRegistry, AgeFilter 

13import c7n.filters.vpc as net_filters 

14from c7n.filters.kms import KmsRelatedFilter 

15from c7n.manager import resources 

16from c7n.query import QueryResourceManager, TypeInfo 

17from c7n.tags import universal_augment 

18from c7n.utils import ( 

19 local_session, chunks, snapshot_identifier, type_schema, jmespath_search) 

20 

21filters = FilterRegistry('elasticache.filters') 

22actions = ActionRegistry('elasticache.actions') 

23 

24TTYPE = re.compile('cache.t1') 

25 

26 

27@resources.register('cache-cluster') 

28class ElastiCacheCluster(QueryResourceManager): 

29 

30 class resource_type(TypeInfo): 

31 service = 'elasticache' 

32 arn_type = 'cluster' 

33 arn_separator = ":" 

34 enum_spec = ('describe_cache_clusters', 

35 'CacheClusters[]', None) 

36 name = id = 'CacheClusterId' 

37 filter_name = 'CacheClusterId' 

38 filter_type = 'scalar' 

39 date = 'CacheClusterCreateTime' 

40 dimension = 'CacheClusterId' 

41 universal_taggable = True 

42 cfn_type = 'AWS::ElastiCache::CacheCluster' 

43 permissions_augment = ("elasticache:ListTagsForResource",) 

44 

45 filter_registry = filters 

46 action_registry = actions 

47 permissions = ('elasticache:ListTagsForResource',) 

48 augment = universal_augment 

49 

50 

51@filters.register('security-group') 

52class SecurityGroupFilter(net_filters.SecurityGroupFilter): 

53 

54 RelatedIdsExpression = "SecurityGroups[].SecurityGroupId" 

55 

56 

57@filters.register('subnet') 

58class SubnetFilter(net_filters.SubnetFilter): 

59 """Filters elasticache clusters based on their associated subnet 

60 

61 :example: 

62 

63 .. code-block:: yaml 

64 

65 policies: 

66 - name: elasticache-in-subnet-x 

67 resource: cache-cluster 

68 filters: 

69 - type: subnet 

70 key: SubnetId 

71 value: subnet-12ab34cd 

72 """ 

73 

74 RelatedIdsExpression = "" 

75 

76 def get_related_ids(self, resources): 

77 group_ids = set() 

78 for r in resources: 

79 group_ids.update( 

80 [s['SubnetIdentifier'] for s in 

81 self.groups[r['CacheSubnetGroupName']]['Subnets']]) 

82 return group_ids 

83 

84 def process(self, resources, event=None): 

85 self.groups = { 

86 r['CacheSubnetGroupName']: r for r in 

87 self.manager.get_resource_manager( 

88 'cache-subnet-group').resources()} 

89 return super(SubnetFilter, self).process(resources, event) 

90 

91 

92filters.register('network-location', net_filters.NetworkLocation) 

93 

94 

95@actions.register('delete') 

96class DeleteElastiCacheCluster(BaseAction): 

97 """Action to delete an elasticache cluster 

98 

99 To prevent unwanted deletion of elasticache clusters, it is recommended 

100 to include a filter 

101 

102 :example: 

103 

104 .. code-block:: yaml 

105 

106 policies: 

107 - name: elasticache-delete-stale-clusters 

108 resource: cache-cluster 

109 filters: 

110 - type: value 

111 value_type: age 

112 key: CacheClusterCreateTime 

113 op: ge 

114 value: 90 

115 actions: 

116 - type: delete 

117 skip-snapshot: false 

118 """ 

119 

120 schema = type_schema( 

121 'delete', **{'skip-snapshot': {'type': 'boolean'}}) 

122 permissions = ('elasticache:DeleteCacheCluster', 

123 'elasticache:DeleteReplicationGroup') 

124 

125 def process(self, clusters): 

126 skip = self.data.get('skip-snapshot', False) 

127 client = local_session( 

128 self.manager.session_factory).client('elasticache') 

129 

130 clusters_to_delete = [] 

131 replication_groups_to_delete = set() 

132 for cluster in clusters: 

133 if cluster.get('ReplicationGroupId') in self.fetch_global_ds_rpgs(): 

134 self.log.info( 

135 f"Skipping {cluster['CacheClusterId']}: associated with a global datastore") 

136 continue 

137 if cluster.get('ReplicationGroupId', ''): 

138 replication_groups_to_delete.add(cluster['ReplicationGroupId']) 

139 else: 

140 clusters_to_delete.append(cluster) 

141 # added if statement to handle differences in parameters if snapshot is skipped 

142 for cluster in clusters_to_delete: 

143 params = {'CacheClusterId': cluster['CacheClusterId']} 

144 if _cluster_eligible_for_snapshot(cluster) and not skip: 

145 params['FinalSnapshotIdentifier'] = snapshot_identifier( 

146 'Final', cluster['CacheClusterId']) 

147 self.log.debug( 

148 "Taking final snapshot of %s", cluster['CacheClusterId']) 

149 else: 

150 self.log.debug( 

151 "Skipping final snapshot of %s", cluster['CacheClusterId']) 

152 client.delete_cache_cluster(**params) 

153 self.log.info( 

154 'Deleted ElastiCache cluster: %s', 

155 cluster['CacheClusterId']) 

156 

157 cacheClusterIds = set([c["CacheClusterId"] for c in clusters]) 

158 for replication_group in replication_groups_to_delete: 

159 # NOTE don't delete the group if it's not empty 

160 rg = client.describe_replication_groups( 

161 ReplicationGroupId=replication_group)["ReplicationGroups"][0] 

162 if not all(cluster in cacheClusterIds for cluster in rg["MemberClusters"]): 

163 # NOTE mark members for better presentation on notifications 

164 for c in clusters: 

165 if c.get("ReplicationGroupId") == replication_group: 

166 c["MemberClusters"] = rg["MemberClusters"] 

167 self.log.info(f'{replication_group} is not empty: {rg["MemberClusters"]}') 

168 continue 

169 

170 params = {'ReplicationGroupId': replication_group, 

171 'RetainPrimaryCluster': False} 

172 if not skip: 

173 params['FinalSnapshotIdentifier'] = snapshot_identifier( 

174 'Final', replication_group) 

175 client.delete_replication_group(**params) 

176 

177 self.log.info( 

178 'Deleted ElastiCache replication group: %s', 

179 replication_group) 

180 

181 def fetch_global_ds_rpgs(self): 

182 rpgs = self.manager.get_resource_manager('elasticache-group').resources(augment=False) 

183 global_rpgs = get_global_datastore_association(rpgs) 

184 return global_rpgs 

185 

186 

187@actions.register('snapshot') 

188class SnapshotElastiCacheCluster(BaseAction): 

189 """Action to snapshot an elasticache cluster 

190 

191 :example: 

192 

193 .. code-block:: yaml 

194 

195 policies: 

196 - name: elasticache-cluster-snapshot 

197 resource: cache-cluster 

198 filters: 

199 - type: value 

200 key: CacheClusterStatus 

201 op: not-in 

202 value: ["deleted","deleting","creating"] 

203 actions: 

204 - snapshot 

205 """ 

206 

207 schema = type_schema('snapshot') 

208 permissions = ('elasticache:CreateSnapshot',) 

209 

210 def process(self, clusters): 

211 set_size = len(clusters) 

212 clusters = [c for c in clusters if _cluster_eligible_for_snapshot(c)] 

213 if set_size != len(clusters): 

214 self.log.info( 

215 "action:snapshot implicitly filtered from %d to %d clusters for snapshot support", 

216 set_size, len(clusters)) 

217 

218 with self.executor_factory(max_workers=2) as w: 

219 futures = [] 

220 client = local_session(self.manager.session_factory).client('elasticache') 

221 for cluster in clusters: 

222 futures.append( 

223 w.submit(self.process_cluster_snapshot, client, cluster)) 

224 

225 for f in as_completed(futures): 

226 if f.exception(): 

227 self.log.error( 

228 "Exception creating cache cluster snapshot \n %s", 

229 f.exception()) 

230 return clusters 

231 

232 def process_cluster_snapshot(self, client, cluster): 

233 client.create_snapshot( 

234 SnapshotName=snapshot_identifier( 

235 'Backup', 

236 cluster['CacheClusterId']), 

237 CacheClusterId=cluster['CacheClusterId']) 

238 

239 

240@actions.register('modify-security-groups') 

241class ElasticacheClusterModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction): 

242 """Modify security groups on an Elasticache cluster. 

243 

244 Looks at the individual clusters and modifies the Replication 

245 Group's configuration for Security groups so all nodes get 

246 affected equally 

247 

248 """ 

249 permissions = ('elasticache:ModifyReplicationGroup',) 

250 

251 def process(self, clusters): 

252 replication_group_map = {} 

253 client = local_session( 

254 self.manager.session_factory).client('elasticache') 

255 groups = super( 

256 ElasticacheClusterModifyVpcSecurityGroups, self).get_groups( 

257 clusters) 

258 for idx, c in enumerate(clusters): 

259 # build map of Replication Groups to Security Groups 

260 replication_group_map[c['ReplicationGroupId']] = groups[idx] 

261 

262 for idx, r in enumerate(replication_group_map.keys()): 

263 client.modify_replication_group( 

264 ReplicationGroupId=r, 

265 SecurityGroupIds=replication_group_map[r]) 

266 

267 

268@resources.register('cache-subnet-group') 

269class ElastiCacheSubnetGroup(QueryResourceManager): 

270 

271 class resource_type(TypeInfo): 

272 service = 'elasticache' 

273 arn_type = 'subnetgroup' 

274 enum_spec = ('describe_cache_subnet_groups', 

275 'CacheSubnetGroups', None) 

276 name = id = 'CacheSubnetGroupName' 

277 filter_name = 'CacheSubnetGroupName' 

278 filter_type = 'scalar' 

279 cfn_type = 'AWS::ElastiCache::SubnetGroup' 

280 

281 

282@resources.register('cache-snapshot') 

283class ElastiCacheSnapshot(QueryResourceManager): 

284 

285 class resource_type(TypeInfo): 

286 service = 'elasticache' 

287 arn_type = 'snapshot' 

288 arn_separator = ":" 

289 enum_spec = ('describe_snapshots', 'Snapshots', None) 

290 name = id = 'SnapshotName' 

291 filter_name = 'SnapshotName' 

292 filter_type = 'scalar' 

293 date = 'StartTime' 

294 universal_taggable = True 

295 

296 permissions = ('elasticache:ListTagsForResource',) 

297 

298 def augment(self, resources): 

299 return universal_augment(self, resources) 

300 

301 

302@ElastiCacheSnapshot.filter_registry.register('age') 

303class ElastiCacheSnapshotAge(AgeFilter): 

304 """Filters elasticache snapshots based on their age (in days) 

305 

306 :example: 

307 

308 .. code-block:: yaml 

309 

310 policies: 

311 - name: elasticache-stale-snapshots 

312 resource: cache-snapshot 

313 filters: 

314 - type: age 

315 days: 30 

316 op: ge 

317 """ 

318 

319 schema = type_schema( 

320 'age', days={'type': 'number'}, 

321 op={'$ref': '#/definitions/filters_common/comparison_operators'}) 

322 

323 date_attribute = 'dummy' 

324 

325 def get_resource_date(self, snapshot): 

326 """ Override superclass method as there is no single snapshot date attribute. 

327 """ 

328 def to_datetime(v): 

329 if not isinstance(v, datetime): 

330 v = parse(v) 

331 if not v.tzinfo: 

332 v = v.replace(tzinfo=tzutc()) 

333 return v 

334 

335 # Return the earliest of the node snaphot creation times. 

336 return min([to_datetime(ns['SnapshotCreateTime']) 

337 for ns in snapshot['NodeSnapshots']]) 

338 

339 

340@ElastiCacheSnapshot.action_registry.register('delete') 

341class DeleteElastiCacheSnapshot(BaseAction): 

342 """Action to delete elasticache snapshots 

343 

344 To prevent unwanted deletion of elasticache snapshots, it is recommended to 

345 apply a filter 

346 

347 :example: 

348 

349 .. code-block:: yaml 

350 

351 policies: 

352 - name: delete-elasticache-stale-snapshots 

353 resource: cache-snapshot 

354 filters: 

355 - type: age 

356 days: 30 

357 op: ge 

358 actions: 

359 - delete 

360 """ 

361 

362 schema = type_schema('delete') 

363 permissions = ('elasticache:DeleteSnapshot',) 

364 

365 def process(self, snapshots): 

366 self.log.info("Deleting %d ElastiCache snapshots", len(snapshots)) 

367 with self.executor_factory(max_workers=3) as w: 

368 futures = [] 

369 client = local_session(self.manager.session_factory).client('elasticache') 

370 for snapshot_set in chunks(reversed(snapshots), size=50): 

371 futures.append( 

372 w.submit(self.process_snapshot_set, client, snapshot_set)) 

373 for f in as_completed(futures): 

374 if f.exception(): 

375 self.log.error( 

376 "Exception deleting snapshot set \n %s", 

377 f.exception()) 

378 return snapshots 

379 

380 def process_snapshot_set(self, client, snapshots_set): 

381 for s in snapshots_set: 

382 client.delete_snapshot(SnapshotName=s['SnapshotName']) 

383 

384 

385@ElastiCacheSnapshot.action_registry.register('copy-cluster-tags') 

386class CopyClusterTags(BaseAction): 

387 """ 

388 Copy specified tags from Elasticache cluster to Snapshot 

389 :example: 

390 

391 .. code-block:: yaml 

392 

393 - name: elasticache-test 

394 resource: cache-snapshot 

395 filters: 

396 - type: value 

397 key: SnapshotName 

398 op: in 

399 value: 

400 - test-tags-backup 

401 actions: 

402 - type: copy-cluster-tags 

403 tags: 

404 - tag1 

405 - tag2 

406 """ 

407 

408 schema = type_schema( 

409 'copy-cluster-tags', 

410 tags={'type': 'array', 'items': {'type': 'string'}, 'minItems': 1}, 

411 required=('tags',)) 

412 

413 def get_permissions(self): 

414 perms = self.manager.get_resource_manager('cache-cluster').get_permissions() 

415 perms.append('elasticache:AddTagsToResource') 

416 return perms 

417 

418 def process(self, snapshots): 

419 client = local_session(self.manager.session_factory).client('elasticache') 

420 clusters = {r['CacheClusterId']: r for r in 

421 self.manager.get_resource_manager('cache-cluster').resources()} 

422 copyable_tags = self.data.get('tags') 

423 

424 for s in snapshots: 

425 # For replicated/sharded clusters it is possible for each 

426 # shard to have separate tags, we go ahead and tag the 

427 # snap with the union of tags with overlaps getting the 

428 # last value (arbitrary if mismatched). 

429 if 'CacheClusterId' not in s: 

430 cluster_ids = [ns['CacheClusterId'] for ns in s['NodeSnapshots']] 

431 else: 

432 cluster_ids = [s['CacheClusterId']] 

433 

434 copy_tags = {} 

435 for cid in sorted(cluster_ids): 

436 if cid not in clusters: 

437 continue 

438 

439 cluster_tags = {t['Key']: t['Value'] for t in clusters[cid]['Tags']} 

440 snap_tags = {t['Key']: t['Value'] for t in s.get('Tags', ())} 

441 

442 for k, v in cluster_tags.items(): 

443 if copyable_tags and k not in copyable_tags: 

444 continue 

445 if k.startswith('aws:'): 

446 continue 

447 if snap_tags.get(k, '') == v: 

448 continue 

449 copy_tags[k] = v 

450 

451 if not copy_tags: 

452 continue 

453 

454 if len(set(copy_tags).union(set(snap_tags))) > 50: 

455 self.log.error( 

456 "Cant copy tags, max tag limit hit on snapshot:%s", 

457 s['SnapshotName']) 

458 continue 

459 

460 arn = self.manager.generate_arn(s['SnapshotName']) 

461 self.manager.retry( 

462 client.add_tags_to_resource, 

463 ResourceName=arn, 

464 Tags=[{'Key': k, 'Value': v} for k, v in copy_tags.items()]) 

465 

466 

467def _cluster_eligible_for_snapshot(cluster): 

468 # added regex search to filter unsupported cachenode types 

469 return ( 

470 cluster['Engine'] != 'memcached' and not 

471 TTYPE.match(cluster['CacheNodeType']) 

472 ) 

473 

474 

475@resources.register('elasticache-group') 

476class ElastiCacheReplicationGroup(QueryResourceManager): 

477 

478 class resource_type(TypeInfo): 

479 service = "elasticache" 

480 enum_spec = ('describe_replication_groups', 

481 'ReplicationGroups[]', None) 

482 arn_type = 'replicationgroup' 

483 id = name = dimension = 'ReplicationGroupId' 

484 cfn_type = 'AWS::ElastiCache::ReplicationGroup' 

485 arn_separator = ":" 

486 universal_taggable = object() 

487 

488 augment = universal_augment 

489 permissions = ('elasticache:DescribeReplicationGroups',) 

490 

491 

492@ElastiCacheReplicationGroup.filter_registry.register('kms-key') 

493class KmsFilter(KmsRelatedFilter): 

494 

495 RelatedIdsExpression = 'KmsKeyId' 

496 

497 

498def get_global_datastore_association(resources): 

499 global_ds_rpgs = set() 

500 for r in resources: 

501 global_ds_association = jmespath_search( 

502 'GlobalReplicationGroupInfo.GlobalReplicationGroupId', r) 

503 if global_ds_association: 

504 global_ds_rpgs.add(r['ReplicationGroupId']) 

505 return global_ds_rpgs 

506 

507 

508@ElastiCacheReplicationGroup.action_registry.register('delete') 

509class DeleteReplicationGroup(BaseAction): 

510 """Action to delete a cache replication group 

511 

512 :example: 

513 

514 .. code-block:: yaml 

515 

516 policies: 

517 - name: elasticache-delete-replication-group 

518 resource: aws.elasticache-group 

519 filters: 

520 - type: value 

521 key: AtRestEncryptionEnabled 

522 value: False 

523 actions: 

524 - type: delete 

525 snapshot: False 

526 

527 """ 

528 schema = type_schema( 

529 'delete', **{'snapshot': {'type': 'boolean'}}) 

530 

531 valid_origin_states = ('available',) 

532 permissions = ('elasticache:DeleteReplicationGroup',) 

533 

534 def process(self, resources): 

535 resources = self.filter_resources(resources, 'Status', self.valid_origin_states) 

536 client = local_session(self.manager.session_factory).client('elasticache') 

537 global_datastore_association_map = get_global_datastore_association(resources) 

538 for r in resources: 

539 if r['ReplicationGroupId'] in global_datastore_association_map: 

540 self.log.info( 

541 f"Skipping {r['ReplicationGroupId']}: associated with a global datastore") 

542 continue 

543 params = {'ReplicationGroupId': r['ReplicationGroupId']} 

544 if self.data.get('snapshot', False): 

545 params.update({'FinalSnapshotIdentifier': r['ReplicationGroupId'] + '-snapshot'}) 

546 self.manager.retry(client.delete_replication_group, **params, ignore_err_codes=( 

547 'ReplicationGroupNotFoundFault',))