Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/rdscluster.py: 44%

345 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import logging 

4import itertools 

5from concurrent.futures import as_completed 

6from datetime import datetime, timedelta 

7 

8from c7n.actions import BaseAction 

9from c7n.filters import AgeFilter, CrossAccountAccessFilter, Filter, ValueFilter 

10from c7n.filters.offhours import OffHour, OnHour 

11import c7n.filters.vpc as net_filters 

12from c7n.manager import resources 

13from c7n.query import ( 

14 ConfigSource, QueryResourceManager, TypeInfo, DescribeSource, RetryPageIterator) 

15from c7n.resources import rds 

16from c7n.filters.kms import KmsRelatedFilter 

17from .aws import shape_validate 

18from c7n.exceptions import PolicyValidationError 

19from c7n.utils import ( 

20 type_schema, local_session, snapshot_identifier, chunks) 

21 

22from c7n.resources.rds import ParameterFilter 

23from c7n.filters.backup import ConsecutiveAwsBackupsFilter 

24 

25log = logging.getLogger('custodian.rds-cluster') 

26 

27 

28class DescribeCluster(DescribeSource): 

29 

30 def get_resources(self, ids): 

31 return self.query.filter( 

32 self.manager, 

33 **{ 

34 'Filters': [ 

35 {'Name': 'db-cluster-id', 'Values': ids}]}) 

36 

37 def augment(self, resources): 

38 for r in resources: 

39 r['Tags'] = r.pop('TagList', ()) 

40 return resources 

41 

42 

43class ConfigCluster(ConfigSource): 

44 

45 def load_resource(self, item): 

46 resource = super().load_resource(item) 

47 resource.pop('TagList', None) # we pull tags from supplementary config 

48 for k in list(resource.keys()): 

49 if k.startswith('Dbc'): 

50 resource["DBC%s" % (k[3:])] = resource.pop(k) 

51 elif k.startswith('Iamd'): 

52 resource['IAMD%s' % (k[4:])] = resource.pop(k) 

53 elif k.startswith('Dbs'): 

54 resource["DBS%s" % (k[3:])] = resource.pop(k) 

55 return resource 

56 

57 

58@resources.register('rds-cluster') 

59class RDSCluster(QueryResourceManager): 

60 """Resource manager for RDS clusters. 

61 """ 

62 

63 class resource_type(TypeInfo): 

64 

65 service = 'rds' 

66 arn = 'DBClusterArn' 

67 arn_type = 'cluster' 

68 arn_separator = ":" 

69 enum_spec = ('describe_db_clusters', 'DBClusters', None) 

70 name = id = 'DBClusterIdentifier' 

71 config_id = 'DbClusterResourceId' 

72 dimension = 'DBClusterIdentifier' 

73 universal_taggable = True 

74 permissions_enum = ('rds:DescribeDBClusters',) 

75 cfn_type = config_type = 'AWS::RDS::DBCluster' 

76 

77 source_mapping = { 

78 'config': ConfigCluster, 

79 'describe': DescribeCluster 

80 } 

81 

82 

83RDSCluster.filter_registry.register('offhour', OffHour) 

84RDSCluster.filter_registry.register('onhour', OnHour) 

85 

86 

87@RDSCluster.filter_registry.register('security-group') 

88class SecurityGroupFilter(net_filters.SecurityGroupFilter): 

89 

90 RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId" 

91 

92 

93@RDSCluster.filter_registry.register('subnet') 

94class SubnetFilter(net_filters.SubnetFilter): 

95 

96 RelatedIdsExpression = "" 

97 groups = None 

98 

99 def get_permissions(self): 

100 return self.manager.get_resource_manager( 

101 'rds-subnet-group').get_permissions() 

102 

103 def get_subnet_groups(self): 

104 return { 

105 r['DBSubnetGroupName']: r for r in 

106 self.manager.get_resource_manager('rds-subnet-group').resources()} 

107 

108 def get_related_ids(self, resources): 

109 if not self.groups: 

110 self.groups = self.get_subnet_groups() 

111 group_ids = set() 

112 for r in resources: 

113 group_ids.update( 

114 [s['SubnetIdentifier'] for s in 

115 self.groups[r['DBSubnetGroup']]['Subnets']]) 

116 return group_ids 

117 

118 def process(self, resources, event=None): 

119 if not self.groups: 

120 self.groups = self.get_subnet_groups() 

121 return super(SubnetFilter, self).process(resources, event) 

122 

123 

124RDSCluster.filter_registry.register('network-location', net_filters.NetworkLocation) 

125 

126 

127@RDSCluster.filter_registry.register('kms-key') 

128class KmsFilter(KmsRelatedFilter): 

129 

130 RelatedIdsExpression = 'KmsKeyId' 

131 

132 

133@RDSCluster.action_registry.register('delete') 

134class Delete(BaseAction): 

135 """Action to delete a RDS cluster 

136 

137 To prevent unwanted deletion of clusters, it is recommended to apply a 

138 filter to the rule 

139 

140 :example: 

141 

142 .. code-block:: yaml 

143 

144 policies: 

145 - name: rds-cluster-delete-unused 

146 resource: rds-cluster 

147 filters: 

148 - type: metrics 

149 name: CPUUtilization 

150 days: 21 

151 value: 1.0 

152 op: le 

153 actions: 

154 - type: delete 

155 skip-snapshot: false 

156 delete-instances: true 

157 """ 

158 

159 schema = type_schema( 

160 'delete', **{'skip-snapshot': {'type': 'boolean'}, 

161 'delete-instances': {'type': 'boolean'}}) 

162 

163 permissions = ('rds:DeleteDBCluster',) 

164 

165 def process(self, clusters): 

166 skip = self.data.get('skip-snapshot', False) 

167 delete_instances = self.data.get('delete-instances', True) 

168 client = local_session(self.manager.session_factory).client('rds') 

169 

170 for cluster in clusters: 

171 if delete_instances: 

172 for instance in cluster.get('DBClusterMembers', []): 

173 client.delete_db_instance( 

174 DBInstanceIdentifier=instance['DBInstanceIdentifier'], 

175 SkipFinalSnapshot=True) 

176 self.log.info( 

177 'Deleted RDS instance: %s', 

178 instance['DBInstanceIdentifier']) 

179 

180 params = {'DBClusterIdentifier': cluster['DBClusterIdentifier']} 

181 if skip: 

182 params['SkipFinalSnapshot'] = True 

183 else: 

184 params['FinalDBSnapshotIdentifier'] = snapshot_identifier( 

185 'Final', cluster['DBClusterIdentifier']) 

186 

187 _run_cluster_method( 

188 client.delete_db_cluster, params, 

189 (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault), 

190 client.exceptions.InvalidDBClusterStateFault) 

191 

192 

193@RDSCluster.action_registry.register('retention') 

194class RetentionWindow(BaseAction): 

195 """ 

196 Action to set the retention period on rds cluster snapshots, 

197 enforce (min, max, exact) sets retention days occordingly. 

198 

199 :example: 

200 

201 .. code-block:: yaml 

202 

203 policies: 

204 - name: rds-cluster-backup-retention 

205 resource: rds-cluster 

206 filters: 

207 - type: value 

208 key: BackupRetentionPeriod 

209 value: 21 

210 op: ne 

211 actions: 

212 - type: retention 

213 days: 21 

214 enforce: min 

215 """ 

216 

217 date_attribute = "BackupRetentionPeriod" 

218 # Tag copy not yet available for Aurora: 

219 # https://forums.aws.amazon.com/thread.jspa?threadID=225812 

220 schema = type_schema( 

221 'retention', **{'days': {'type': 'number'}, 

222 'enforce': {'type': 'string', 'enum': [ 

223 'min', 'max', 'exact']}}) 

224 permissions = ('rds:ModifyDBCluster',) 

225 

226 def process(self, clusters): 

227 client = local_session(self.manager.session_factory).client('rds') 

228 

229 for cluster in clusters: 

230 self.process_snapshot_retention(client, cluster) 

231 

232 def process_snapshot_retention(self, client, cluster): 

233 current_retention = int(cluster.get('BackupRetentionPeriod', 0)) 

234 new_retention = self.data['days'] 

235 retention_type = self.data.get('enforce', 'min').lower() 

236 if retention_type == 'min': 

237 self.set_retention_window( 

238 client, cluster, max(current_retention, new_retention)) 

239 elif retention_type == 'max': 

240 self.set_retention_window( 

241 client, cluster, min(current_retention, new_retention)) 

242 elif retention_type == 'exact': 

243 self.set_retention_window(client, cluster, new_retention) 

244 

245 def set_retention_window(self, client, cluster, retention): 

246 params = dict( 

247 DBClusterIdentifier=cluster['DBClusterIdentifier'], 

248 BackupRetentionPeriod=retention 

249 ) 

250 if cluster.get('EngineMode') != 'serverless': 

251 params.update( 

252 dict( 

253 PreferredBackupWindow=cluster['PreferredBackupWindow'], 

254 PreferredMaintenanceWindow=cluster['PreferredMaintenanceWindow']) 

255 ) 

256 _run_cluster_method( 

257 client.modify_db_cluster, 

258 params, 

259 (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault), 

260 client.exceptions.InvalidDBClusterStateFault 

261 ) 

262 

263 

264@RDSCluster.action_registry.register('stop') 

265class Stop(BaseAction): 

266 """Stop a running db cluster 

267 """ 

268 

269 schema = type_schema('stop') 

270 permissions = ('rds:StopDBCluster',) 

271 

272 def process(self, clusters): 

273 client = local_session(self.manager.session_factory).client('rds') 

274 for c in clusters: 

275 _run_cluster_method( 

276 client.stop_db_cluster, dict(DBClusterIdentifier=c['DBClusterIdentifier']), 

277 (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault), 

278 client.exceptions.InvalidDBClusterStateFault) 

279 

280 

281@RDSCluster.action_registry.register('start') 

282class Start(BaseAction): 

283 """Start a stopped db cluster 

284 """ 

285 

286 schema = type_schema('start') 

287 permissions = ('rds:StartDBCluster',) 

288 

289 def process(self, clusters): 

290 client = local_session(self.manager.session_factory).client('rds') 

291 for c in clusters: 

292 _run_cluster_method( 

293 client.start_db_cluster, dict(DBClusterIdentifier=c['DBClusterIdentifier']), 

294 (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault), 

295 client.exceptions.InvalidDBClusterStateFault) 

296 

297 

298def _run_cluster_method(method, params, ignore=(), warn=(), method_name=""): 

299 try: 

300 method(**params) 

301 except ignore: 

302 pass 

303 except warn as e: 

304 log.warning( 

305 "error %s on cluster %s error %s", 

306 method_name or method.__name__, params['DBClusterIdentifier'], e) 

307 

308 

309@RDSCluster.action_registry.register('snapshot') 

310class Snapshot(BaseAction): 

311 """Action to create a snapshot of a rds cluster 

312 

313 :example: 

314 

315 .. code-block:: yaml 

316 

317 policies: 

318 - name: rds-cluster-snapshot 

319 resource: rds-cluster 

320 actions: 

321 - snapshot 

322 """ 

323 

324 schema = type_schema('snapshot') 

325 permissions = ('rds:CreateDBClusterSnapshot',) 

326 

327 def process(self, clusters): 

328 client = local_session(self.manager.session_factory).client('rds') 

329 for cluster in clusters: 

330 _run_cluster_method( 

331 client.create_db_cluster_snapshot, 

332 dict( 

333 DBClusterSnapshotIdentifier=snapshot_identifier( 

334 'Backup', cluster['DBClusterIdentifier']), 

335 DBClusterIdentifier=cluster['DBClusterIdentifier']), 

336 (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault), 

337 client.exceptions.InvalidDBClusterStateFault) 

338 

339 

340@RDSCluster.action_registry.register('modify-db-cluster') 

341class ModifyDbCluster(BaseAction): 

342 """Modifies an RDS instance based on specified parameter 

343 using ModifyDbInstance. 

344 

345 'Immediate" determines whether the modification is applied immediately 

346 or not. If 'immediate' is not specified, default is false. 

347 

348 :example: 

349 

350 .. code-block:: yaml 

351 

352 policies: 

353 - name: disable-db-cluster-deletion-protection 

354 resource: rds-cluster 

355 filters: 

356 - DeletionProtection: true 

357 - PubliclyAccessible: true 

358 actions: 

359 - type: modify-db-cluster 

360 attributes: 

361 CopyTagsToSnapshot: true 

362 DeletionProtection: false 

363 """ 

364 

365 schema = type_schema( 

366 'modify-db-cluster', 

367 attributes={'type': 'object'}, 

368 required=('attributes',)) 

369 

370 permissions = ('rds:ModifyDBCluster',) 

371 shape = 'ModifyDBClusterMessage' 

372 

373 def validate(self): 

374 attrs = dict(self.data['attributes']) 

375 if 'DBClusterIdentifier' in attrs: 

376 raise PolicyValidationError( 

377 "Can't include DBClusterIdentifier in modify-db-cluster action") 

378 attrs['DBClusterIdentifier'] = 'PolicyValidation' 

379 return shape_validate(attrs, self.shape, 'rds') 

380 

381 def process(self, clusters): 

382 client = local_session(self.manager.session_factory).client('rds') 

383 for c in clusters: 

384 client.modify_db_cluster( 

385 DBClusterIdentifier=c['DBClusterIdentifier'], 

386 **self.data['attributes']) 

387 

388 

389class DescribeClusterSnapshot(DescribeSource): 

390 

391 def get_resources(self, resource_ids, cache=True): 

392 client = local_session(self.manager.session_factory).client('rds') 

393 return self.manager.retry( 

394 client.describe_db_cluster_snapshots, 

395 Filters=[{ 

396 'Name': 'db-cluster-snapshot-id', 

397 'Values': resource_ids}]).get('DBClusterSnapshots', ()) 

398 

399 def augment(self, resources): 

400 for r in resources: 

401 r['Tags'] = r.pop('TagList', ()) 

402 return resources 

403 

404 

405class ConfigClusterSnapshot(ConfigSource): 

406 

407 def load_resource(self, item): 

408 

409 resource = super(ConfigClusterSnapshot, self).load_resource(item) 

410 # db cluster snapshots are particularly mangled on keys 

411 for k, v in list(resource.items()): 

412 if k.startswith('Dbcl'): 

413 resource.pop(k) 

414 k = 'DBCl%s' % k[4:] 

415 resource[k] = v 

416 elif k.startswith('Iamd'): 

417 resource.pop(k) 

418 k = 'IAMD%s' % k[4:] 

419 resource[k] = v 

420 return resource 

421 

422 

423@resources.register('rds-cluster-snapshot') 

424class RDSClusterSnapshot(QueryResourceManager): 

425 """Resource manager for RDS cluster snapshots. 

426 """ 

427 

428 class resource_type(TypeInfo): 

429 service = 'rds' 

430 arn_type = 'cluster-snapshot' 

431 arn_separator = ':' 

432 arn = 'DBClusterSnapshotArn' 

433 enum_spec = ( 

434 'describe_db_cluster_snapshots', 'DBClusterSnapshots', None) 

435 name = id = 'DBClusterSnapshotIdentifier' 

436 date = 'SnapshotCreateTime' 

437 universal_taggable = object() 

438 config_type = 'AWS::RDS::DBClusterSnapshot' 

439 permissions_enum = ('rds:DescribeDBClusterSnapshots',) 

440 

441 source_mapping = { 

442 'describe': DescribeClusterSnapshot, 

443 'config': ConfigClusterSnapshot 

444 } 

445 

446 

447@RDSClusterSnapshot.filter_registry.register('cross-account') 

448class CrossAccountSnapshot(CrossAccountAccessFilter): 

449 

450 permissions = ('rds:DescribeDBClusterSnapshotAttributes',) 

451 attributes_key = 'c7n:attributes' 

452 annotation_key = 'c7n:CrossAccountViolations' 

453 

454 def process(self, resources, event=None): 

455 self.accounts = self.get_accounts() 

456 results = [] 

457 with self.executor_factory(max_workers=2) as w: 

458 futures = [] 

459 for resource_set in chunks(resources, 20): 

460 futures.append(w.submit( 

461 self.process_resource_set, resource_set)) 

462 for f in as_completed(futures): 

463 results.extend(f.result()) 

464 return results 

465 

466 def process_resource_set(self, resource_set): 

467 client = local_session(self.manager.session_factory).client('rds') 

468 results = [] 

469 for r in resource_set: 

470 attrs = {t['AttributeName']: t['AttributeValues'] 

471 for t in self.manager.retry( 

472 client.describe_db_cluster_snapshot_attributes, 

473 DBClusterSnapshotIdentifier=r['DBClusterSnapshotIdentifier'])[ 

474 'DBClusterSnapshotAttributesResult']['DBClusterSnapshotAttributes']} 

475 r[self.attributes_key] = attrs 

476 shared_accounts = set(attrs.get('restore', [])) 

477 delta_accounts = shared_accounts.difference(self.accounts) 

478 if delta_accounts: 

479 r[self.annotation_key] = list(delta_accounts) 

480 results.append(r) 

481 return results 

482 

483 

484@RDSClusterSnapshot.filter_registry.register('age') 

485class RDSSnapshotAge(AgeFilter): 

486 """Filters rds cluster snapshots based on age (in days) 

487 

488 :example: 

489 

490 .. code-block:: yaml 

491 

492 policies: 

493 - name: rds-cluster-snapshots-expired 

494 resource: rds-cluster-snapshot 

495 filters: 

496 - type: age 

497 days: 30 

498 op: gt 

499 """ 

500 

501 schema = type_schema( 

502 'age', days={'type': 'number'}, 

503 op={'$ref': '#/definitions/filters_common/comparison_operators'}) 

504 

505 date_attribute = 'SnapshotCreateTime' 

506 

507 

508@RDSClusterSnapshot.action_registry.register('set-permissions') 

509class SetPermissions(rds.SetPermissions): 

510 """Set permissions for copying or restoring an RDS cluster snapshot 

511 

512 Use the 'add' and 'remove' parameters to control which accounts to 

513 add or remove, respectively. The default is to remove any 

514 permissions granted to other AWS accounts. 

515 

516 Use `remove: matched` in combination with the `cross-account` filter 

517 for more flexible removal options such as preserving access for 

518 a set of whitelisted accounts: 

519 

520 :example: 

521 

522 .. code-block:: yaml 

523 

524 policies: 

525 - name: rds-cluster-snapshot-prune-permissions 

526 resource: rds-cluster-snapshot 

527 filters: 

528 - type: cross-account 

529 whitelist: 

530 - '112233445566' 

531 actions: 

532 - type: set-permissions 

533 remove: matched 

534 """ 

535 permissions = ('rds:ModifyDBClusterSnapshotAttribute',) 

536 

537 def process_snapshot(self, client, snapshot): 

538 add_accounts = self.data.get('add', []) 

539 remove_accounts = self.data.get('remove', []) 

540 

541 if not (add_accounts or remove_accounts): 

542 if CrossAccountSnapshot.attributes_key not in snapshot: 

543 attrs = { 

544 t['AttributeName']: t['AttributeValues'] 

545 for t in self.manager.retry( 

546 client.describe_db_cluster_snapshot_attributes, 

547 DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier'] 

548 )['DBClusterSnapshotAttributesResult']['DBClusterSnapshotAttributes'] 

549 } 

550 snapshot[CrossAccountSnapshot.attributes_key] = attrs 

551 remove_accounts = snapshot[CrossAccountSnapshot.attributes_key].get('restore', []) 

552 elif remove_accounts == 'matched': 

553 remove_accounts = snapshot.get(CrossAccountSnapshot.annotation_key, []) 

554 

555 if add_accounts or remove_accounts: 

556 client.modify_db_cluster_snapshot_attribute( 

557 DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier'], 

558 AttributeName='restore', 

559 ValuesToRemove=remove_accounts, 

560 ValuesToAdd=add_accounts) 

561 

562 

563@RDSClusterSnapshot.action_registry.register('delete') 

564class RDSClusterSnapshotDelete(BaseAction): 

565 """Action to delete rds cluster snapshots 

566 

567 To prevent unwanted deletion of rds cluster snapshots, it is recommended 

568 to apply a filter to the rule 

569 

570 :example: 

571 

572 .. code-block:: yaml 

573 

574 policies: 

575 - name: rds-cluster-snapshots-expired-delete 

576 resource: rds-cluster-snapshot 

577 filters: 

578 - type: age 

579 days: 30 

580 op: gt 

581 actions: 

582 - delete 

583 """ 

584 

585 schema = type_schema('delete') 

586 permissions = ('rds:DeleteDBClusterSnapshot',) 

587 

588 def process(self, snapshots): 

589 self.log.info("Deleting %d RDS cluster snapshots", len(snapshots)) 

590 client = local_session(self.manager.session_factory).client('rds') 

591 error = None 

592 with self.executor_factory(max_workers=2) as w: 

593 futures = [] 

594 for snapshot_set in chunks(reversed(snapshots), size=50): 

595 futures.append( 

596 w.submit(self.process_snapshot_set, client, snapshot_set)) 

597 for f in as_completed(futures): 

598 if f.exception(): 

599 error = f.exception() 

600 self.log.error( 

601 "Exception deleting snapshot set \n %s", 

602 f.exception()) 

603 if error: 

604 raise error 

605 return snapshots 

606 

607 def process_snapshot_set(self, client, snapshots_set): 

608 for s in snapshots_set: 

609 try: 

610 client.delete_db_cluster_snapshot( 

611 DBClusterSnapshotIdentifier=s['DBClusterSnapshotIdentifier']) 

612 except (client.exceptions.DBSnapshotNotFoundFault, 

613 client.exceptions.InvalidDBSnapshotStateFault): 

614 continue 

615 

616 

617RDSCluster.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter) 

618 

619 

620@RDSCluster.filter_registry.register('consecutive-snapshots') 

621class ConsecutiveSnapshots(Filter): 

622 """Returns RDS clusters where number of consective daily snapshots is equal to/or greater 

623 than n days. 

624 

625 :example: 

626 

627 .. code-block:: yaml 

628 

629 policies: 

630 - name: rdscluster-daily-snapshot-count 

631 resource: rds-cluster 

632 filters: 

633 - type: consecutive-snapshots 

634 days: 7 

635 """ 

636 schema = type_schema('consecutive-snapshots', days={'type': 'number', 'minimum': 1}, 

637 required=['days']) 

638 permissions = ('rds:DescribeDBClusterSnapshots', 'rds:DescribeDBClusters') 

639 annotation = 'c7n:DBClusterSnapshots' 

640 

641 def process_resource_set(self, client, resources): 

642 rds_clusters = [r['DBClusterIdentifier'] for r in resources] 

643 paginator = client.get_paginator('describe_db_cluster_snapshots') 

644 paginator.PAGE_ITERATOR_CLS = RetryPageIterator 

645 cluster_snapshots = paginator.paginate(Filters=[{'Name': 'db-cluster-id', 

646 'Values': rds_clusters}]).build_full_result().get('DBClusterSnapshots', []) 

647 

648 cluster_map = {} 

649 for snapshot in cluster_snapshots: 

650 cluster_map.setdefault(snapshot['DBClusterIdentifier'], []).append(snapshot) 

651 for r in resources: 

652 r[self.annotation] = cluster_map.get(r['DBClusterIdentifier'], []) 

653 

654 def process(self, resources, event=None): 

655 client = local_session(self.manager.session_factory).client('rds') 

656 results = [] 

657 retention = self.data.get('days') 

658 utcnow = datetime.utcnow() 

659 expected_dates = set() 

660 for days in range(1, retention + 1): 

661 expected_dates.add((utcnow - timedelta(days=days)).strftime('%Y-%m-%d')) 

662 

663 for resource_set in chunks( 

664 [r for r in resources if self.annotation not in r], 50): 

665 self.process_resource_set(client, resource_set) 

666 

667 for r in resources: 

668 snapshot_dates = set() 

669 for snapshot in r[self.annotation]: 

670 if snapshot['Status'] == 'available': 

671 snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d')) 

672 if expected_dates.issubset(snapshot_dates): 

673 results.append(r) 

674 return results 

675 

676 

677@RDSCluster.filter_registry.register('db-cluster-parameter') 

678class ClusterParameterFilter(ParameterFilter): 

679 """ 

680 Applies value type filter on set db cluster parameter values. 

681 

682 :example: 

683 

684 .. code-block:: yaml 

685 

686 policies: 

687 - name: rdscluster-pg 

688 resource: rds-cluster 

689 filters: 

690 - type: db-cluster-parameter 

691 key: someparam 

692 op: eq 

693 value: someval 

694 """ 

695 schema = type_schema('db-cluster-parameter', rinherit=ValueFilter.schema) 

696 schema_alias = False 

697 permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters',) 

698 policy_annotation = 'c7n:MatchedDBClusterParameter' 

699 param_group_attribute = 'DBClusterParameterGroup' 

700 

701 def _get_param_list(self, pg): 

702 client = local_session(self.manager.session_factory).client('rds') 

703 paginator = client.get_paginator('describe_db_cluster_parameters') 

704 param_list = list(itertools.chain(*[p['Parameters'] 

705 for p in paginator.paginate(DBClusterParameterGroupName=pg)])) 

706 return param_list 

707 

708 def process(self, resources, event=None): 

709 results = [] 

710 parameter_group_list = {db.get(self.param_group_attribute) for db in resources} 

711 paramcache = self.handle_paramgroup_cache(parameter_group_list) 

712 for resource in resources: 

713 pg_values = paramcache[resource['DBClusterParameterGroup']] 

714 if self.match(pg_values): 

715 resource.setdefault(self.policy_annotation, []).append( 

716 self.data.get('key')) 

717 results.append(resource) 

718 return results 

719 

720 

721@RDSCluster.filter_registry.register('pending-maintenance') 

722class PendingMaintenance(Filter): 

723 """ 

724 Scan DB Clusters for those with pending maintenance 

725 

726 :example: 

727 

728 .. code-block:: yaml 

729 

730 policies: 

731 - name: rds-cluster-pending-maintenance 

732 resource: rds-cluster 

733 filters: 

734 - pending-maintenance 

735 """ 

736 

737 schema = type_schema('pending-maintenance') 

738 permissions = ('rds:DescribePendingMaintenanceActions',) 

739 

740 def process(self, resources, event=None): 

741 client = local_session(self.manager.session_factory).client('rds') 

742 

743 results = [] 

744 pending_maintenance = set() 

745 paginator = client.get_paginator('describe_pending_maintenance_actions') 

746 for page in paginator.paginate(): 

747 pending_maintenance.update( 

748 {action['ResourceIdentifier'] for action in page['PendingMaintenanceActions']} 

749 ) 

750 

751 for r in resources: 

752 if r['DBClusterArn'] in pending_maintenance: 

753 results.append(r) 

754 

755 return results