Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/rdscluster.py: 45%


# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
import logging
import itertools
from concurrent.futures import as_completed
from datetime import datetime, timedelta
from itertools import chain

from c7n.actions import BaseAction
from c7n.filters import AgeFilter, CrossAccountAccessFilter, Filter, ValueFilter
from c7n.filters.offhours import OffHour, OnHour
import c7n.filters.vpc as net_filters
from c7n.manager import resources
from c7n.query import (
    ConfigSource, QueryResourceManager, TypeInfo, DescribeSource, RetryPageIterator)
from c7n.resources import rds
from c7n.filters.kms import KmsRelatedFilter
from .aws import shape_validate
from c7n.exceptions import PolicyValidationError
from c7n.utils import (
    type_schema, local_session, snapshot_identifier, chunks)

from c7n.resources.rds import ParameterFilter
from c7n.filters.backup import ConsecutiveAwsBackupsFilter

log = logging.getLogger('custodian.rds-cluster')


class DescribeCluster(DescribeSource):

    def get_resources(self, ids):
        resources = chain.from_iterable(
            self.query.filter(
                self.manager,
                Filters=[
                    {'Name': 'db-cluster-id', 'Values': ids_chunk}
                ]
            )
            for ids_chunk in chunks(ids, 100)  # DescribeCluster filter length limit
        )
        return list(resources)

    def augment(self, resources):
        for r in resources:
            r['Tags'] = r.pop('TagList', ())
        return resources


class ConfigCluster(ConfigSource):

    def load_resource(self, item):
        resource = super().load_resource(item)
        resource.pop('TagList', None)  # we pull tags from supplementary config
        for k in list(resource.keys()):
            if k.startswith('Dbc'):
                resource["DBC%s" % (k[3:])] = resource.pop(k)
            elif k.startswith('Iamd'):
                resource['IAMD%s' % (k[4:])] = resource.pop(k)
            elif k.startswith('Dbs'):
                resource["DBS%s" % (k[3:])] = resource.pop(k)
        return resource


@resources.register('rds-cluster')
class RDSCluster(QueryResourceManager):
    """Resource manager for RDS clusters.
    """

    class resource_type(TypeInfo):

        service = 'rds'
        arn = 'DBClusterArn'
        arn_type = 'cluster'
        arn_separator = ":"
        enum_spec = ('describe_db_clusters', 'DBClusters', None)
        name = id = 'DBClusterIdentifier'
        config_id = 'DbClusterResourceId'
        dimension = 'DBClusterIdentifier'
        universal_taggable = True
        permissions_enum = ('rds:DescribeDBClusters',)
        cfn_type = config_type = 'AWS::RDS::DBCluster'

    source_mapping = {
        'config': ConfigCluster,
        'describe': DescribeCluster
    }


RDSCluster.filter_registry.register('offhour', OffHour)
RDSCluster.filter_registry.register('onhour', OnHour)
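
# The offhour/onhour filters registered above pair naturally with the stop and
# start actions defined later in this module. A minimal sketch; the timezone
# and hours below are illustrative assumptions, not defaults of this module:
#
#   policies:
#     - name: rds-cluster-offhours-stop
#       resource: rds-cluster
#       filters:
#         - type: offhour
#           default_tz: et
#           offhour: 20
#       actions:
#         - stop
#     - name: rds-cluster-onhours-start
#       resource: rds-cluster
#       filters:
#         - type: onhour
#           default_tz: et
#           onhour: 8
#       actions:
#         - start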


@RDSCluster.filter_registry.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):

    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"
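
# A minimal sketch of the security-group filter above, matching clusters whose
# VPC security group is named "default" (the key/value pair is an illustrative
# assumption, not something this module mandates):
#
#   policies:
#     - name: rds-cluster-default-sg
#       resource: rds-cluster
#       filters:
#         - type: security-group
#           key: GroupName
#           value: default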


@RDSCluster.filter_registry.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):

    RelatedIdsExpression = ""
    groups = None

    def get_permissions(self):
        return self.manager.get_resource_manager(
            'rds-subnet-group').get_permissions()

    def get_subnet_groups(self):
        return {
            r['DBSubnetGroupName']: r for r in
            self.manager.get_resource_manager('rds-subnet-group').resources()}

    def get_related_ids(self, resources):
        if not self.groups:
            self.groups = self.get_subnet_groups()
        group_ids = set()
        for r in resources:
            group_ids.update(
                [s['SubnetIdentifier'] for s in
                 self.groups[r['DBSubnetGroup']]['Subnets']])
        return group_ids

    def process(self, resources, event=None):
        if not self.groups:
            self.groups = self.get_subnet_groups()
        return super(SubnetFilter, self).process(resources, event)
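
# A minimal sketch of the subnet filter above, matching clusters whose subnet
# group contains subnets that auto-assign public IPs (the key and value are
# illustrative assumptions):
#
#   policies:
#     - name: rds-cluster-public-subnets
#       resource: rds-cluster
#       filters:
#         - type: subnet
#           key: MapPublicIpOnLaunch
#           value: true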


RDSCluster.filter_registry.register('network-location', net_filters.NetworkLocation)


@RDSCluster.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):

    RelatedIdsExpression = 'KmsKeyId'
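
# A minimal sketch of the kms-key filter above, matching clusters encrypted
# with the AWS-managed RDS key (the alias value is an illustrative assumption):
#
#   policies:
#     - name: rds-cluster-aws-managed-kms-key
#       resource: rds-cluster
#       filters:
#         - type: kms-key
#           key: c7n:AliasName
#           value: alias/aws/rds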


@RDSCluster.action_registry.register('delete')
class Delete(BaseAction):
    """Action to delete an RDS cluster

    To prevent unwanted deletion of clusters, it is recommended to apply a
    filter to the policy

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-delete-unused
                resource: rds-cluster
                filters:
                  - type: metrics
                    name: CPUUtilization
                    days: 21
                    value: 1.0
                    op: le
                actions:
                  - type: delete
                    skip-snapshot: false
                    delete-instances: true
    """

    schema = type_schema(
        'delete', **{'skip-snapshot': {'type': 'boolean'},
                     'delete-instances': {'type': 'boolean'}})

    permissions = ('rds:DeleteDBCluster',)

    def process(self, clusters):
        skip = self.data.get('skip-snapshot', False)
        delete_instances = self.data.get('delete-instances', True)
        client = local_session(self.manager.session_factory).client('rds')

        for cluster in clusters:
            if delete_instances:
                for instance in cluster.get('DBClusterMembers', []):
                    client.delete_db_instance(
                        DBInstanceIdentifier=instance['DBInstanceIdentifier'],
                        SkipFinalSnapshot=True)
                    self.log.info(
                        'Deleted RDS instance: %s',
                        instance['DBInstanceIdentifier'])

            params = {'DBClusterIdentifier': cluster['DBClusterIdentifier']}
            if skip:
                params['SkipFinalSnapshot'] = True
            else:
                params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                    'Final', cluster['DBClusterIdentifier'])

            _run_cluster_method(
                client.delete_db_cluster, params,
                (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)


@RDSCluster.action_registry.register('retention')
class RetentionWindow(BaseAction):
    """
    Action to set the retention period on rds cluster snapshots;
    enforce (min, max, exact) sets the retention days accordingly.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-backup-retention
                resource: rds-cluster
                filters:
                  - type: value
                    key: BackupRetentionPeriod
                    value: 21
                    op: ne
                actions:
                  - type: retention
                    days: 21
                    enforce: min
    """

    date_attribute = "BackupRetentionPeriod"
    # Tag copy not yet available for Aurora:
    # https://forums.aws.amazon.com/thread.jspa?threadID=225812
    schema = type_schema(
        'retention', **{'days': {'type': 'number'},
                        'enforce': {'type': 'string', 'enum': [
                            'min', 'max', 'exact']}})
    permissions = ('rds:ModifyDBCluster',)

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('rds')

        for cluster in clusters:
            self.process_snapshot_retention(client, cluster)

    def process_snapshot_retention(self, client, cluster):
        current_retention = int(cluster.get('BackupRetentionPeriod', 0))
        new_retention = self.data['days']
        retention_type = self.data.get('enforce', 'min').lower()
        if retention_type == 'min':
            self.set_retention_window(
                client, cluster, max(current_retention, new_retention))
        elif retention_type == 'max':
            self.set_retention_window(
                client, cluster, min(current_retention, new_retention))
        elif retention_type == 'exact':
            self.set_retention_window(client, cluster, new_retention)

    def set_retention_window(self, client, cluster, retention):
        params = dict(
            DBClusterIdentifier=cluster['DBClusterIdentifier'],
            BackupRetentionPeriod=retention
        )
        if cluster.get('EngineMode') != 'serverless':
            params.update(
                dict(
                    PreferredBackupWindow=cluster['PreferredBackupWindow'],
                    PreferredMaintenanceWindow=cluster['PreferredMaintenanceWindow'])
            )
        _run_cluster_method(
            client.modify_db_cluster,
            params,
            (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault),
            client.exceptions.InvalidDBClusterStateFault
        )


@RDSCluster.action_registry.register('stop')
class Stop(BaseAction):
    """Stop a running db cluster
    """

    schema = type_schema('stop')
    permissions = ('rds:StopDBCluster',)

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('rds')
        for c in clusters:
            _run_cluster_method(
                client.stop_db_cluster, dict(DBClusterIdentifier=c['DBClusterIdentifier']),
                (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)


@RDSCluster.action_registry.register('start')
class Start(BaseAction):
    """Start a stopped db cluster
    """

    schema = type_schema('start')
    permissions = ('rds:StartDBCluster',)

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('rds')
        for c in clusters:
            _run_cluster_method(
                client.start_db_cluster, dict(DBClusterIdentifier=c['DBClusterIdentifier']),
                (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)


def _run_cluster_method(method, params, ignore=(), warn=(), method_name=""):
    try:
        method(**params)
    except ignore:
        pass
    except warn as e:
        log.warning(
            "error %s on cluster %s error %s",
            method_name or method.__name__, params['DBClusterIdentifier'], e)
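
# Illustrative call of the helper above, given a boto3 rds client (the cluster
# identifier is a placeholder): exceptions in `ignore` are swallowed, while
# exceptions in `warn` are logged as warnings rather than raised.
#
#   _run_cluster_method(
#       client.stop_db_cluster,
#       dict(DBClusterIdentifier='example-cluster'),
#       ignore=(client.exceptions.DBClusterNotFoundFault,),
#       warn=(client.exceptions.InvalidDBClusterStateFault,))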


@RDSCluster.action_registry.register('snapshot')
class Snapshot(BaseAction):
    """Action to create a snapshot of an RDS cluster

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-snapshot
                resource: rds-cluster
                actions:
                  - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('rds:CreateDBClusterSnapshot',)

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            _run_cluster_method(
                client.create_db_cluster_snapshot,
                dict(
                    DBClusterSnapshotIdentifier=snapshot_identifier(
                        'Backup', cluster['DBClusterIdentifier']),
                    DBClusterIdentifier=cluster['DBClusterIdentifier']),
                (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)


@RDSCluster.action_registry.register('modify-db-cluster')
class ModifyDbCluster(BaseAction):
    """Modifies an RDS cluster based on specified attributes
    using ModifyDBCluster.

    'ApplyImmediately' in the attributes determines whether the modification
    is applied immediately or deferred to the maintenance window; if it is
    not specified, the default is false.

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-db-cluster-deletion-protection
                resource: rds-cluster
                filters:
                  - DeletionProtection: true
                  - PubliclyAccessible: true
                actions:
                  - type: modify-db-cluster
                    attributes:
                        CopyTagsToSnapshot: true
                        DeletionProtection: false
    """

    schema = type_schema(
        'modify-db-cluster',
        attributes={'type': 'object'},
        required=('attributes',))

    permissions = ('rds:ModifyDBCluster',)
    shape = 'ModifyDBClusterMessage'

    def validate(self):
        attrs = dict(self.data['attributes'])
        if 'DBClusterIdentifier' in attrs:
            raise PolicyValidationError(
                "Can't include DBClusterIdentifier in modify-db-cluster action")
        attrs['DBClusterIdentifier'] = 'PolicyValidation'
        return shape_validate(attrs, self.shape, 'rds')

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('rds')
        for c in clusters:
            client.modify_db_cluster(
                DBClusterIdentifier=c['DBClusterIdentifier'],
                **self.data['attributes'])


class DescribeClusterSnapshot(DescribeSource):

    def get_resources(self, resource_ids, cache=True):
        client = local_session(self.manager.session_factory).client('rds')
        return self.manager.retry(
            client.describe_db_cluster_snapshots,
            Filters=[{
                'Name': 'db-cluster-snapshot-id',
                'Values': resource_ids}]).get('DBClusterSnapshots', ())

    def augment(self, resources):
        for r in resources:
            r['Tags'] = r.pop('TagList', ())
        return resources


class ConfigClusterSnapshot(ConfigSource):

    def load_resource(self, item):
        resource = super(ConfigClusterSnapshot, self).load_resource(item)
        # db cluster snapshots are particularly mangled on keys
        for k, v in list(resource.items()):
            if k.startswith('Dbcl'):
                resource.pop(k)
                k = 'DBCl%s' % k[4:]
                resource[k] = v
            elif k.startswith('Iamd'):
                resource.pop(k)
                k = 'IAMD%s' % k[4:]
                resource[k] = v
        return resource


@resources.register('rds-cluster-snapshot')
class RDSClusterSnapshot(QueryResourceManager):
    """Resource manager for RDS cluster snapshots.
    """

    class resource_type(TypeInfo):
        service = 'rds'
        arn_type = 'cluster-snapshot'
        arn_separator = ':'
        arn = 'DBClusterSnapshotArn'
        enum_spec = (
            'describe_db_cluster_snapshots', 'DBClusterSnapshots', None)
        name = id = 'DBClusterSnapshotIdentifier'
        date = 'SnapshotCreateTime'
        universal_taggable = object()
        config_type = 'AWS::RDS::DBClusterSnapshot'
        permissions_enum = ('rds:DescribeDBClusterSnapshots',)

    source_mapping = {
        'describe': DescribeClusterSnapshot,
        'config': ConfigClusterSnapshot
    }


@RDSClusterSnapshot.filter_registry.register('cross-account')
class CrossAccountSnapshot(CrossAccountAccessFilter):

    permissions = ('rds:DescribeDBClusterSnapshotAttributes',)
    attributes_key = 'c7n:attributes'
    annotation_key = 'c7n:CrossAccountViolations'

    def process(self, resources, event=None):
        self.accounts = self.get_accounts()
        self.everyone_only = self.data.get("everyone_only", False)
        results = []
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for resource_set in chunks(resources, 20):
                futures.append(w.submit(
                    self.process_resource_set, resource_set))
            for f in as_completed(futures):
                results.extend(f.result())
        return results

    def process_resource_set(self, resource_set):
        client = local_session(self.manager.session_factory).client('rds')
        results = []
        for r in resource_set:
            attrs = {t['AttributeName']: t['AttributeValues']
                     for t in self.manager.retry(
                         client.describe_db_cluster_snapshot_attributes,
                         DBClusterSnapshotIdentifier=r['DBClusterSnapshotIdentifier'])[
                         'DBClusterSnapshotAttributesResult']['DBClusterSnapshotAttributes']}
            r[self.attributes_key] = attrs
            shared_accounts = set(attrs.get('restore', []))
            if self.everyone_only:
                shared_accounts = {a for a in shared_accounts if a == 'all'}
            delta_accounts = shared_accounts.difference(self.accounts)
            if delta_accounts:
                r[self.annotation_key] = list(delta_accounts)
                results.append(r)
        return results
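
# A minimal sketch of the cross-account filter above, reporting snapshots whose
# restore access is shared outside a set of known accounts (the whitelisted
# account id is illustrative; compare the set-permissions example further down):
#
#   policies:
#     - name: rds-cluster-snapshot-cross-account
#       resource: rds-cluster-snapshot
#       filters:
#         - type: cross-account
#           whitelist:
#             - '112233445566'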


@RDSClusterSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filters rds cluster snapshots based on age (in days)

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-snapshots-expired
                resource: rds-cluster-snapshot
                filters:
                  - type: age
                    days: 30
                    op: gt
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    date_attribute = 'SnapshotCreateTime'


@RDSClusterSnapshot.action_registry.register('set-permissions')
class SetPermissions(rds.SetPermissions):
    """Set permissions for copying or restoring an RDS cluster snapshot

    Use the 'add' and 'remove' parameters to control which accounts to
    add or remove, respectively. The default is to remove any
    permissions granted to other AWS accounts.

    Use `remove: matched` in combination with the `cross-account` filter
    for more flexible removal options such as preserving access for
    a set of whitelisted accounts:

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-snapshot-prune-permissions
                resource: rds-cluster-snapshot
                filters:
                  - type: cross-account
                    whitelist:
                      - '112233445566'
                actions:
                  - type: set-permissions
                    remove: matched
    """
    permissions = ('rds:ModifyDBClusterSnapshotAttribute',)

    def process_snapshot(self, client, snapshot):
        add_accounts = self.data.get('add', [])
        remove_accounts = self.data.get('remove', [])

        if not (add_accounts or remove_accounts):
            if CrossAccountSnapshot.attributes_key not in snapshot:
                attrs = {
                    t['AttributeName']: t['AttributeValues']
                    for t in self.manager.retry(
                        client.describe_db_cluster_snapshot_attributes,
                        DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier']
                    )['DBClusterSnapshotAttributesResult']['DBClusterSnapshotAttributes']
                }
                snapshot[CrossAccountSnapshot.attributes_key] = attrs
            remove_accounts = snapshot[CrossAccountSnapshot.attributes_key].get('restore', [])
        elif remove_accounts == 'matched':
            remove_accounts = snapshot.get(CrossAccountSnapshot.annotation_key, [])

        if add_accounts or remove_accounts:
            client.modify_db_cluster_snapshot_attribute(
                DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier'],
                AttributeName='restore',
                ValuesToRemove=remove_accounts,
                ValuesToAdd=add_accounts)


@RDSClusterSnapshot.action_registry.register('delete')
class RDSClusterSnapshotDelete(BaseAction):
    """Action to delete rds cluster snapshots

    To prevent unwanted deletion of rds cluster snapshots, it is recommended
    to apply a filter to the policy

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-snapshots-expired-delete
                resource: rds-cluster-snapshot
                filters:
                  - type: age
                    days: 30
                    op: gt
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBClusterSnapshot',)

    def process(self, snapshots):
        self.log.info("Deleting %d RDS cluster snapshots", len(snapshots))
        client = local_session(self.manager.session_factory).client('rds')
        error = None
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for snapshot_set in chunks(reversed(snapshots), size=50):
                futures.append(
                    w.submit(self.process_snapshot_set, client, snapshot_set))
            for f in as_completed(futures):
                if f.exception():
                    error = f.exception()
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        f.exception())
        if error:
            raise error
        return snapshots

    def process_snapshot_set(self, client, snapshots_set):
        for s in snapshots_set:
            try:
                client.delete_db_cluster_snapshot(
                    DBClusterSnapshotIdentifier=s['DBClusterSnapshotIdentifier'])
            except (client.exceptions.DBSnapshotNotFoundFault,
                    client.exceptions.InvalidDBSnapshotStateFault):
                continue


RDSCluster.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter)


@RDSCluster.filter_registry.register('consecutive-snapshots')
class ConsecutiveSnapshots(Filter):
    """Returns RDS clusters where the number of consecutive daily snapshots is
    equal to or greater than n days.

    :example:

    .. code-block:: yaml

            policies:
              - name: rdscluster-daily-snapshot-count
                resource: rds-cluster
                filters:
                  - type: consecutive-snapshots
                    days: 7
    """
    schema = type_schema('consecutive-snapshots', days={'type': 'number', 'minimum': 1},
                         required=['days'])
    permissions = ('rds:DescribeDBClusterSnapshots', 'rds:DescribeDBClusters')
    annotation = 'c7n:DBClusterSnapshots'

    def process_resource_set(self, client, resources):
        rds_clusters = [r['DBClusterIdentifier'] for r in resources]
        paginator = client.get_paginator('describe_db_cluster_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        cluster_snapshots = paginator.paginate(
            Filters=[{'Name': 'db-cluster-id', 'Values': rds_clusters}]
        ).build_full_result().get('DBClusterSnapshots', [])

        cluster_map = {}
        for snapshot in cluster_snapshots:
            cluster_map.setdefault(snapshot['DBClusterIdentifier'], []).append(snapshot)
        for r in resources:
            r[self.annotation] = cluster_map.get(r['DBClusterIdentifier'], [])

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')
        results = []
        retention = self.data.get('days')
        utcnow = datetime.utcnow()
        expected_dates = set()
        for days in range(1, retention + 1):
            expected_dates.add((utcnow - timedelta(days=days)).strftime('%Y-%m-%d'))

        for resource_set in chunks(
                [r for r in resources if self.annotation not in r], 50):
            self.process_resource_set(client, resource_set)

        for r in resources:
            snapshot_dates = set()
            for snapshot in r[self.annotation]:
                if snapshot['Status'] == 'available':
                    snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d'))
            if expected_dates.issubset(snapshot_dates):
                results.append(r)
        return results


@RDSCluster.filter_registry.register('db-cluster-parameter')
class ClusterParameterFilter(ParameterFilter):
    """
    Applies a value type filter on db cluster parameter values that have been
    set in the cluster's DB cluster parameter group.

    :example:

    .. code-block:: yaml

            policies:
              - name: rdscluster-pg
                resource: rds-cluster
                filters:
                  - type: db-cluster-parameter
                    key: someparam
                    op: eq
                    value: someval
    """
    schema = type_schema('db-cluster-parameter', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters',)
    policy_annotation = 'c7n:MatchedDBClusterParameter'
    param_group_attribute = 'DBClusterParameterGroup'

    def _get_param_list(self, pg):
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_db_cluster_parameters')
        param_list = list(itertools.chain(*[
            p['Parameters']
            for p in paginator.paginate(DBClusterParameterGroupName=pg)]))
        return param_list

    def process(self, resources, event=None):
        results = []
        parameter_group_list = {db.get(self.param_group_attribute) for db in resources}
        paramcache = self.handle_paramgroup_cache(parameter_group_list)
        for resource in resources:
            pg_values = paramcache[resource['DBClusterParameterGroup']]
            if self.match(pg_values):
                resource.setdefault(self.policy_annotation, []).append(
                    self.data.get('key'))
                results.append(resource)
        return results


@RDSCluster.filter_registry.register('pending-maintenance')
class PendingMaintenance(Filter):
    """
    Scan DB Clusters for those with pending maintenance

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-pending-maintenance
                resource: rds-cluster
                filters:
                  - pending-maintenance
                  - type: value
                    key: '"c7n:PendingMaintenance".PendingMaintenanceActionDetails[].Action'
                    op: intersect
                    value:
                      - system-update
    """

    annotation_key = 'c7n:PendingMaintenance'
    schema = type_schema('pending-maintenance')
    permissions = ('rds:DescribePendingMaintenanceActions',)

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')

        results = []
        resource_maintenances = {}
        paginator = client.get_paginator('describe_pending_maintenance_actions')
        for page in paginator.paginate():
            for action in page['PendingMaintenanceActions']:
                resource_maintenances.setdefault(action['ResourceIdentifier'], []).append(action)

        for r in resources:
            pending_maintenances = resource_maintenances.get(r['DBClusterArn'], [])
            if len(pending_maintenances) > 0:
                r[self.annotation_key] = pending_maintenances
                results.append(r)

        return results


class DescribeDbShardGroup(DescribeSource):
    def augment(self, resources):
        for r in resources:
            r['Tags'] = r.pop('TagList', ())
        return resources


@resources.register('rds-db-shard-group')
class RDSDbShardGroup(QueryResourceManager):
    class resource_type(TypeInfo):
        service = 'rds'
        arn = 'DBShardGroupArn'
        name = 'DBShardGroupIdentifier'
        id = 'DBShardGroupResourceId'
        enum_spec = ('describe_db_shard_groups', 'DBShardGroups', None)
        cfn_type = 'AWS::RDS::DBShardGroup'
        permissions_enum = ("rds:DescribeDBShardGroups",)
        universal_taggable = object()

    source_mapping = {
        'describe': DescribeDbShardGroup
    }
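
# A minimal sketch of a policy targeting the rds-db-shard-group resource
# registered above; with no filters or actions it simply enumerates and tags
# nothing, which can be useful as an inventory check:
#
#   policies:
#     - name: rds-db-shard-groups
#       resource: rds-db-shard-group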