Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/elasticache.py: 48%

254 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from datetime import datetime 

4import re 

5 

6from concurrent.futures import as_completed 

7from dateutil.tz import tzutc 

8from dateutil.parser import parse 

9 

10from c7n.actions import ( 

11 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction) 

12from c7n.filters import FilterRegistry, AgeFilter 

13import c7n.filters.vpc as net_filters 

14from c7n.filters.kms import KmsRelatedFilter 

15from c7n.manager import resources 

16from c7n.query import QueryResourceManager, TypeInfo 

17from c7n.tags import universal_augment 

18from c7n.utils import ( 

19 local_session, chunks, snapshot_identifier, type_schema, jmespath_search) 

20 

21filters = FilterRegistry('elasticache.filters') 

22actions = ActionRegistry('elasticache.actions') 

23 

24TTYPE = re.compile('cache.t1') 

25 

26 

27@resources.register('cache-cluster') 

28class ElastiCacheCluster(QueryResourceManager): 

29 

30 class resource_type(TypeInfo): 

31 service = 'elasticache' 

32 arn_type = 'cluster' 

33 arn_separator = ":" 

34 enum_spec = ('describe_cache_clusters', 

35 'CacheClusters[]', None) 

36 name = id = 'CacheClusterId' 

37 filter_name = 'CacheClusterId' 

38 filter_type = 'scalar' 

39 date = 'CacheClusterCreateTime' 

40 dimension = 'CacheClusterId' 

41 universal_taggable = True 

42 cfn_type = 'AWS::ElastiCache::CacheCluster' 

43 

44 filter_registry = filters 

45 action_registry = actions 

46 permissions = ('elasticache:ListTagsForResource',) 

47 augment = universal_augment 

48 

49 

50@filters.register('security-group') 

51class SecurityGroupFilter(net_filters.SecurityGroupFilter): 

52 

53 RelatedIdsExpression = "SecurityGroups[].SecurityGroupId" 

54 

55 

56@filters.register('subnet') 

57class SubnetFilter(net_filters.SubnetFilter): 

58 """Filters elasticache clusters based on their associated subnet 

59 

60 :example: 

61 

62 .. code-block:: yaml 

63 

64 policies: 

65 - name: elasticache-in-subnet-x 

66 resource: cache-cluster 

67 filters: 

68 - type: subnet 

69 key: SubnetId 

70 value: subnet-12ab34cd 

71 """ 

72 

73 RelatedIdsExpression = "" 

74 

75 def get_related_ids(self, resources): 

76 group_ids = set() 

77 for r in resources: 

78 group_ids.update( 

79 [s['SubnetIdentifier'] for s in 

80 self.groups[r['CacheSubnetGroupName']]['Subnets']]) 

81 return group_ids 

82 

83 def process(self, resources, event=None): 

84 self.groups = { 

85 r['CacheSubnetGroupName']: r for r in 

86 self.manager.get_resource_manager( 

87 'cache-subnet-group').resources()} 

88 return super(SubnetFilter, self).process(resources, event) 

89 

90 

91filters.register('network-location', net_filters.NetworkLocation) 

92 

93 

94@actions.register('delete') 

95class DeleteElastiCacheCluster(BaseAction): 

96 """Action to delete an elasticache cluster 

97 

98 To prevent unwanted deletion of elasticache clusters, it is recommended 

99 to include a filter 

100 

101 :example: 

102 

103 .. code-block:: yaml 

104 

105 policies: 

106 - name: elasticache-delete-stale-clusters 

107 resource: cache-cluster 

108 filters: 

109 - type: value 

110 value_type: age 

111 key: CacheClusterCreateTime 

112 op: ge 

113 value: 90 

114 actions: 

115 - type: delete 

116 skip-snapshot: false 

117 """ 

118 

119 schema = type_schema( 

120 'delete', **{'skip-snapshot': {'type': 'boolean'}}) 

121 permissions = ('elasticache:DeleteCacheCluster', 

122 'elasticache:DeleteReplicationGroup') 

123 

124 def process(self, clusters): 

125 skip = self.data.get('skip-snapshot', False) 

126 client = local_session( 

127 self.manager.session_factory).client('elasticache') 

128 

129 clusters_to_delete = [] 

130 replication_groups_to_delete = set() 

131 for cluster in clusters: 

132 if cluster.get('ReplicationGroupId') in self.fetch_global_ds_rpgs(): 

133 self.log.info( 

134 f"Skipping {cluster['CacheClusterId']}: associated with a global datastore") 

135 continue 

136 if cluster.get('ReplicationGroupId', ''): 

137 replication_groups_to_delete.add(cluster['ReplicationGroupId']) 

138 else: 

139 clusters_to_delete.append(cluster) 

140 # added if statement to handle differences in parameters if snapshot is skipped 

141 for cluster in clusters_to_delete: 

142 params = {'CacheClusterId': cluster['CacheClusterId']} 

143 if _cluster_eligible_for_snapshot(cluster) and not skip: 

144 params['FinalSnapshotIdentifier'] = snapshot_identifier( 

145 'Final', cluster['CacheClusterId']) 

146 self.log.debug( 

147 "Taking final snapshot of %s", cluster['CacheClusterId']) 

148 else: 

149 self.log.debug( 

150 "Skipping final snapshot of %s", cluster['CacheClusterId']) 

151 client.delete_cache_cluster(**params) 

152 self.log.info( 

153 'Deleted ElastiCache cluster: %s', 

154 cluster['CacheClusterId']) 

155 

156 cacheClusterIds = set([c["CacheClusterId"] for c in clusters]) 

157 for replication_group in replication_groups_to_delete: 

158 # NOTE don't delete the group if it's not empty 

159 rg = client.describe_replication_groups( 

160 ReplicationGroupId=replication_group)["ReplicationGroups"][0] 

161 if not all(cluster in cacheClusterIds for cluster in rg["MemberClusters"]): 

162 # NOTE mark members for better presentation on notifications 

163 for c in clusters: 

164 if c.get("ReplicationGroupId") == replication_group: 

165 c["MemberClusters"] = rg["MemberClusters"] 

166 self.log.info(f'{replication_group} is not empty: {rg["MemberClusters"]}') 

167 continue 

168 

169 params = {'ReplicationGroupId': replication_group, 

170 'RetainPrimaryCluster': False} 

171 if not skip: 

172 params['FinalSnapshotIdentifier'] = snapshot_identifier( 

173 'Final', replication_group) 

174 client.delete_replication_group(**params) 

175 

176 self.log.info( 

177 'Deleted ElastiCache replication group: %s', 

178 replication_group) 

179 

180 def fetch_global_ds_rpgs(self): 

181 rpgs = self.manager.get_resource_manager('elasticache-group').resources(augment=False) 

182 global_rpgs = get_global_datastore_association(rpgs) 

183 return global_rpgs 

184 

185 

186@actions.register('snapshot') 

187class SnapshotElastiCacheCluster(BaseAction): 

188 """Action to snapshot an elasticache cluster 

189 

190 :example: 

191 

192 .. code-block:: yaml 

193 

194 policies: 

195 - name: elasticache-cluster-snapshot 

196 resource: cache-cluster 

197 filters: 

198 - type: value 

199 key: CacheClusterStatus 

200 op: not-in 

201 value: ["deleted","deleting","creating"] 

202 actions: 

203 - snapshot 

204 """ 

205 

206 schema = type_schema('snapshot') 

207 permissions = ('elasticache:CreateSnapshot',) 

208 

209 def process(self, clusters): 

210 set_size = len(clusters) 

211 clusters = [c for c in clusters if _cluster_eligible_for_snapshot(c)] 

212 if set_size != len(clusters): 

213 self.log.info( 

214 "action:snapshot implicitly filtered from %d to %d clusters for snapshot support", 

215 set_size, len(clusters)) 

216 

217 with self.executor_factory(max_workers=2) as w: 

218 futures = [] 

219 client = local_session(self.manager.session_factory).client('elasticache') 

220 for cluster in clusters: 

221 futures.append( 

222 w.submit(self.process_cluster_snapshot, client, cluster)) 

223 

224 for f in as_completed(futures): 

225 if f.exception(): 

226 self.log.error( 

227 "Exception creating cache cluster snapshot \n %s", 

228 f.exception()) 

229 return clusters 

230 

231 def process_cluster_snapshot(self, client, cluster): 

232 client.create_snapshot( 

233 SnapshotName=snapshot_identifier( 

234 'Backup', 

235 cluster['CacheClusterId']), 

236 CacheClusterId=cluster['CacheClusterId']) 

237 

238 

239@actions.register('modify-security-groups') 

240class ElasticacheClusterModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction): 

241 """Modify security groups on an Elasticache cluster. 

242 

243 Looks at the individual clusters and modifies the Replication 

244 Group's configuration for Security groups so all nodes get 

245 affected equally 

246 

247 """ 

248 permissions = ('elasticache:ModifyReplicationGroup',) 

249 

250 def process(self, clusters): 

251 replication_group_map = {} 

252 client = local_session( 

253 self.manager.session_factory).client('elasticache') 

254 groups = super( 

255 ElasticacheClusterModifyVpcSecurityGroups, self).get_groups( 

256 clusters) 

257 for idx, c in enumerate(clusters): 

258 # build map of Replication Groups to Security Groups 

259 replication_group_map[c['ReplicationGroupId']] = groups[idx] 

260 

261 for idx, r in enumerate(replication_group_map.keys()): 

262 client.modify_replication_group( 

263 ReplicationGroupId=r, 

264 SecurityGroupIds=replication_group_map[r]) 

265 

266 

267@resources.register('cache-subnet-group') 

268class ElastiCacheSubnetGroup(QueryResourceManager): 

269 

270 class resource_type(TypeInfo): 

271 service = 'elasticache' 

272 arn_type = 'subnetgroup' 

273 enum_spec = ('describe_cache_subnet_groups', 

274 'CacheSubnetGroups', None) 

275 name = id = 'CacheSubnetGroupName' 

276 filter_name = 'CacheSubnetGroupName' 

277 filter_type = 'scalar' 

278 cfn_type = 'AWS::ElastiCache::SubnetGroup' 

279 

280 

281@resources.register('cache-snapshot') 

282class ElastiCacheSnapshot(QueryResourceManager): 

283 

284 class resource_type(TypeInfo): 

285 service = 'elasticache' 

286 arn_type = 'snapshot' 

287 arn_separator = ":" 

288 enum_spec = ('describe_snapshots', 'Snapshots', None) 

289 name = id = 'SnapshotName' 

290 filter_name = 'SnapshotName' 

291 filter_type = 'scalar' 

292 date = 'StartTime' 

293 universal_taggable = True 

294 

295 permissions = ('elasticache:ListTagsForResource',) 

296 

297 def augment(self, resources): 

298 return universal_augment(self, resources) 

299 

300 

301@ElastiCacheSnapshot.filter_registry.register('age') 

302class ElastiCacheSnapshotAge(AgeFilter): 

303 """Filters elasticache snapshots based on their age (in days) 

304 

305 :example: 

306 

307 .. code-block:: yaml 

308 

309 policies: 

310 - name: elasticache-stale-snapshots 

311 resource: cache-snapshot 

312 filters: 

313 - type: age 

314 days: 30 

315 op: ge 

316 """ 

317 

318 schema = type_schema( 

319 'age', days={'type': 'number'}, 

320 op={'$ref': '#/definitions/filters_common/comparison_operators'}) 

321 

322 date_attribute = 'dummy' 

323 

324 def get_resource_date(self, snapshot): 

325 """ Override superclass method as there is no single snapshot date attribute. 

326 """ 

327 def to_datetime(v): 

328 if not isinstance(v, datetime): 

329 v = parse(v) 

330 if not v.tzinfo: 

331 v = v.replace(tzinfo=tzutc()) 

332 return v 

333 

334 # Return the earliest of the node snaphot creation times. 

335 return min([to_datetime(ns['SnapshotCreateTime']) 

336 for ns in snapshot['NodeSnapshots']]) 

337 

338 

339@ElastiCacheSnapshot.action_registry.register('delete') 

340class DeleteElastiCacheSnapshot(BaseAction): 

341 """Action to delete elasticache snapshots 

342 

343 To prevent unwanted deletion of elasticache snapshots, it is recommended to 

344 apply a filter 

345 

346 :example: 

347 

348 .. code-block:: yaml 

349 

350 policies: 

351 - name: delete-elasticache-stale-snapshots 

352 resource: cache-snapshot 

353 filters: 

354 - type: age 

355 days: 30 

356 op: ge 

357 actions: 

358 - delete 

359 """ 

360 

361 schema = type_schema('delete') 

362 permissions = ('elasticache:DeleteSnapshot',) 

363 

364 def process(self, snapshots): 

365 self.log.info("Deleting %d ElastiCache snapshots", len(snapshots)) 

366 with self.executor_factory(max_workers=3) as w: 

367 futures = [] 

368 client = local_session(self.manager.session_factory).client('elasticache') 

369 for snapshot_set in chunks(reversed(snapshots), size=50): 

370 futures.append( 

371 w.submit(self.process_snapshot_set, client, snapshot_set)) 

372 for f in as_completed(futures): 

373 if f.exception(): 

374 self.log.error( 

375 "Exception deleting snapshot set \n %s", 

376 f.exception()) 

377 return snapshots 

378 

379 def process_snapshot_set(self, client, snapshots_set): 

380 for s in snapshots_set: 

381 client.delete_snapshot(SnapshotName=s['SnapshotName']) 

382 

383 

384@ElastiCacheSnapshot.action_registry.register('copy-cluster-tags') 

385class CopyClusterTags(BaseAction): 

386 """ 

387 Copy specified tags from Elasticache cluster to Snapshot 

388 :example: 

389 

390 .. code-block:: yaml 

391 

392 - name: elasticache-test 

393 resource: cache-snapshot 

394 filters: 

395 - type: value 

396 key: SnapshotName 

397 op: in 

398 value: 

399 - test-tags-backup 

400 actions: 

401 - type: copy-cluster-tags 

402 tags: 

403 - tag1 

404 - tag2 

405 """ 

406 

407 schema = type_schema( 

408 'copy-cluster-tags', 

409 tags={'type': 'array', 'items': {'type': 'string'}, 'minItems': 1}, 

410 required=('tags',)) 

411 

412 def get_permissions(self): 

413 perms = self.manager.get_resource_manager('cache-cluster').get_permissions() 

414 perms.append('elasticache:AddTagsToResource') 

415 return perms 

416 

417 def process(self, snapshots): 

418 client = local_session(self.manager.session_factory).client('elasticache') 

419 clusters = {r['CacheClusterId']: r for r in 

420 self.manager.get_resource_manager('cache-cluster').resources()} 

421 copyable_tags = self.data.get('tags') 

422 

423 for s in snapshots: 

424 # For replicated/sharded clusters it is possible for each 

425 # shard to have separate tags, we go ahead and tag the 

426 # snap with the union of tags with overlaps getting the 

427 # last value (arbitrary if mismatched). 

428 if 'CacheClusterId' not in s: 

429 cluster_ids = [ns['CacheClusterId'] for ns in s['NodeSnapshots']] 

430 else: 

431 cluster_ids = [s['CacheClusterId']] 

432 

433 copy_tags = {} 

434 for cid in sorted(cluster_ids): 

435 if cid not in clusters: 

436 continue 

437 

438 cluster_tags = {t['Key']: t['Value'] for t in clusters[cid]['Tags']} 

439 snap_tags = {t['Key']: t['Value'] for t in s.get('Tags', ())} 

440 

441 for k, v in cluster_tags.items(): 

442 if copyable_tags and k not in copyable_tags: 

443 continue 

444 if k.startswith('aws:'): 

445 continue 

446 if snap_tags.get(k, '') == v: 

447 continue 

448 copy_tags[k] = v 

449 

450 if not copy_tags: 

451 continue 

452 

453 if len(set(copy_tags).union(set(snap_tags))) > 50: 

454 self.log.error( 

455 "Cant copy tags, max tag limit hit on snapshot:%s", 

456 s['SnapshotName']) 

457 continue 

458 

459 arn = self.manager.generate_arn(s['SnapshotName']) 

460 self.manager.retry( 

461 client.add_tags_to_resource, 

462 ResourceName=arn, 

463 Tags=[{'Key': k, 'Value': v} for k, v in copy_tags.items()]) 

464 

465 

466def _cluster_eligible_for_snapshot(cluster): 

467 # added regex search to filter unsupported cachenode types 

468 return ( 

469 cluster['Engine'] != 'memcached' and not 

470 TTYPE.match(cluster['CacheNodeType']) 

471 ) 

472 

473 

474@resources.register('elasticache-group') 

475class ElastiCacheReplicationGroup(QueryResourceManager): 

476 

477 class resource_type(TypeInfo): 

478 service = "elasticache" 

479 enum_spec = ('describe_replication_groups', 

480 'ReplicationGroups[]', None) 

481 arn_type = 'replicationgroup' 

482 id = name = dimension = 'ReplicationGroupId' 

483 cfn_type = 'AWS::ElastiCache::ReplicationGroup' 

484 arn_separator = ":" 

485 universal_taggable = object() 

486 

487 augment = universal_augment 

488 permissions = ('elasticache:DescribeReplicationGroups',) 

489 

490 

491@ElastiCacheReplicationGroup.filter_registry.register('kms-key') 

492class KmsFilter(KmsRelatedFilter): 

493 

494 RelatedIdsExpression = 'KmsKeyId' 

495 

496 

497def get_global_datastore_association(resources): 

498 global_ds_rpgs = set() 

499 for r in resources: 

500 global_ds_association = jmespath_search( 

501 'GlobalReplicationGroupInfo.GlobalReplicationGroupId', r) 

502 if global_ds_association: 

503 global_ds_rpgs.add(r['ReplicationGroupId']) 

504 return global_ds_rpgs 

505 

506 

507@ElastiCacheReplicationGroup.action_registry.register('delete') 

508class DeleteReplicationGroup(BaseAction): 

509 """Action to delete a cache replication group 

510 

511 :example: 

512 

513 .. code-block:: yaml 

514 

515 policies: 

516 - name: elasticache-delete-replication-group 

517 resource: aws.elasticache-group 

518 filters: 

519 - type: value 

520 key: AtRestEncryptionEnabled 

521 value: False 

522 actions: 

523 - type: delete 

524 snapshot: False 

525 

526 """ 

527 schema = type_schema( 

528 'delete', **{'snapshot': {'type': 'boolean'}}) 

529 

530 valid_origin_states = ('available',) 

531 permissions = ('elasticache:DeleteReplicationGroup',) 

532 

533 def process(self, resources): 

534 resources = self.filter_resources(resources, 'Status', self.valid_origin_states) 

535 client = local_session(self.manager.session_factory).client('elasticache') 

536 global_datastore_association_map = get_global_datastore_association(resources) 

537 for r in resources: 

538 if r['ReplicationGroupId'] in global_datastore_association_map: 

539 self.log.info( 

540 f"Skipping {r['ReplicationGroupId']}: associated with a global datastore") 

541 continue 

542 params = {'ReplicationGroupId': r['ReplicationGroupId']} 

543 if self.data.get('snapshot', False): 

544 params.update({'FinalSnapshotIdentifier': r['ReplicationGroupId'] + '-snapshot'}) 

545 self.manager.retry(client.delete_replication_group, **params, ignore_err_codes=( 

546 'ReplicationGroupNotFoundFault',))