Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/rdscluster.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

419 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import logging 

4import itertools 

5from concurrent.futures import as_completed 

6from datetime import datetime, timedelta 

7from itertools import chain 

8 

9from c7n.actions import BaseAction 

10from c7n.filters import AgeFilter, CrossAccountAccessFilter, Filter, ValueFilter 

11from c7n.filters.offhours import OffHour, OnHour 

12import c7n.filters.vpc as net_filters 

13from c7n.manager import resources 

14from c7n.query import ( 

15 ConfigSource, QueryResourceManager, TypeInfo, DescribeSource, RetryPageIterator) 

16from c7n.resources import rds 

17from c7n.filters.kms import KmsRelatedFilter 

18from .aws import shape_validate 

19from c7n.exceptions import PolicyValidationError 

20from botocore.exceptions import ClientError 

21from c7n.utils import ( 

22 type_schema, local_session, get_retry, snapshot_identifier, chunks) 

23 

24from c7n.resources.rds import ParameterFilter 

25from c7n.filters.backup import ConsecutiveAwsBackupsFilter 

26 

# Module-level logger; _run_cluster_method reports tolerated API errors through it.
log = logging.getLogger('custodian.rds-cluster')

28 

29 

class DescribeCluster(DescribeSource):
    # Describe-API backed source for rds-cluster resources.

    def get_resources(self, ids):
        """Fetch the clusters named in ``ids``.

        The identifiers are sent as a ``db-cluster-id`` filter in batches of
        at most 100 values, and the per-batch results are concatenated in
        order into a single list.
        """
        found = []
        for id_batch in chunks(ids, 100):
            found.extend(
                self.query.filter(
                    self.manager,
                    Filters=[{'Name': 'db-cluster-id', 'Values': id_batch}]))
        return found

    def augment(self, resources):
        """Move each cluster's ``TagList`` entry to ``Tags`` (in place).

        Clusters without a ``TagList`` get an empty tuple. The same list
        object that was passed in is returned.
        """
        for cluster in resources:
            cluster['Tags'] = cluster.pop('TagList', ())
        return resources

48 

49 

class ConfigCluster(ConfigSource):
    # AWS Config backed source for rds-cluster resources.

    def load_resource(self, item):
        """Load a config item and restore the upper-case key prefixes.

        Keys starting with ``Dbc``, ``Iamd`` or ``Dbs`` are renamed so the
        prefix reads ``DBC``, ``IAMD`` or ``DBS`` respectively. ``TagList``
        is dropped because tags are pulled from supplementary config.
        """
        resource = super().load_resource(item)
        resource.pop('TagList', None)

        # (prefix as found, prefix as wanted); the first matching entry wins.
        prefix_fixes = (('Dbc', 'DBC'), ('Iamd', 'IAMD'), ('Dbs', 'DBS'))
        # Iterate over a snapshot of the keys since the dict is mutated.
        for key in tuple(resource):
            for found, wanted in prefix_fixes:
                if key.startswith(found):
                    resource[wanted + key[len(found):]] = resource.pop(key)
                    break
        return resource

63 

64 

@resources.register('rds-cluster')
class RDSCluster(QueryResourceManager):
    """Resource manager for RDS DB clusters."""

    class resource_type(TypeInfo):
        # API used to enumerate clusters.
        service = 'rds'
        enum_spec = ('describe_db_clusters', 'DBClusters', None)
        permissions_enum = ('rds:DescribeDBClusters',)

        # The cluster identifier doubles as id and name.
        id = 'DBClusterIdentifier'
        name = 'DBClusterIdentifier'
        dimension = 'DBClusterIdentifier'
        config_id = 'DbClusterResourceId'

        # ARN handling.
        arn = 'DBClusterArn'
        arn_type = 'cluster'
        arn_separator = ':'

        universal_taggable = True

        # CloudFormation and AWS Config share the same type name.
        cfn_type = 'AWS::RDS::DBCluster'
        config_type = 'AWS::RDS::DBCluster'

    # Source name -> implementing class.
    source_mapping = {'config': ConfigCluster, 'describe': DescribeCluster}

88 

89 

# Make the shared OffHour / OnHour schedule filters available on rds-cluster.
RDSCluster.filter_registry.register('offhour', OffHour)
RDSCluster.filter_registry.register('onhour', OnHour)

92 

93 

@RDSCluster.filter_registry.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    # Expression (presumably JMESPath, evaluated by the base filter) that
    # selects the security group ids attached to a cluster.
    RelatedIdsExpression = 'VpcSecurityGroups[].VpcSecurityGroupId'

98 

99 

@RDSCluster.filter_registry.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    # Subnet ids are resolved through the cluster's DB subnet group rather
    # than through an expression on the cluster itself.
    RelatedIdsExpression = ""

    # Lazily populated mapping of DBSubnetGroupName -> subnet group resource.
    groups = None

    def get_permissions(self):
        """Return the permissions of the ``rds-subnet-group`` manager."""
        subnet_group_manager = self.manager.get_resource_manager('rds-subnet-group')
        return subnet_group_manager.get_permissions()

    def get_subnet_groups(self):
        """Return all DB subnet groups keyed by ``DBSubnetGroupName``."""
        subnet_group_manager = self.manager.get_resource_manager('rds-subnet-group')
        by_name = {}
        for group in subnet_group_manager.resources():
            by_name[group['DBSubnetGroupName']] = group
        return by_name

    def get_related_ids(self, resources):
        """Return the set of subnet identifiers used by ``resources``.

        Each cluster's ``DBSubnetGroup`` is looked up in the cached subnet
        groups; the cache is filled on first use.
        """
        if not self.groups:
            self.groups = self.get_subnet_groups()
        return {
            subnet['SubnetIdentifier']
            for cluster in resources
            for subnet in self.groups[cluster['DBSubnetGroup']]['Subnets']}

    def process(self, resources, event=None):
        """Ensure the subnet group cache is loaded, then run the base filter."""
        if not self.groups:
            self.groups = self.get_subnet_groups()
        return super(SubnetFilter, self).process(resources, event)

129 

130 

# Register the shared network-location filter for rds-cluster resources.
RDSCluster.filter_registry.register('network-location', net_filters.NetworkLocation)

132 

133 

@RDSCluster.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    # Cluster attribute that holds the related KMS key reference.
    RelatedIdsExpression = "KmsKeyId"

138 

139 

@RDSCluster.action_registry.register('delete')
class Delete(BaseAction):
    """Delete an RDS cluster.

    Deletion is destructive, so it is recommended to pair this action with
    a filter that narrows the policy to the clusters you intend to remove.

    ``delete-instances`` (default true) deletes the cluster's member
    instances first; ``skip-snapshot`` (default false) skips the final
    cluster snapshot.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-delete-unused
                resource: rds-cluster
                filters:
                  - type: metrics
                    name: CPUUtilization
                    days: 21
                    value: 1.0
                    op: le
                actions:
                  - type: delete
                    skip-snapshot: false
                    delete-instances: true
    """

    schema = type_schema(
        'delete',
        **{
            'skip-snapshot': {'type': 'boolean'},
            'delete-instances': {'type': 'boolean'},
        })

    permissions = ('rds:DeleteDBCluster',)

    def process(self, clusters):
        """Delete each cluster, optionally removing its instances first."""
        skip_snapshot = self.data.get('skip-snapshot', False)
        remove_members = self.data.get('delete-instances', True)
        client = local_session(self.manager.session_factory).client('rds')

        for cluster in clusters:
            if remove_members:
                self._delete_members(client, cluster)
            self._delete_cluster(client, cluster, skip_snapshot)

    def _delete_members(self, client, cluster):
        # Delete every member instance of the cluster without a final snapshot.
        for member in cluster.get('DBClusterMembers', []):
            client.delete_db_instance(
                DBInstanceIdentifier=member['DBInstanceIdentifier'],
                SkipFinalSnapshot=True)
            self.log.info(
                'Deleted RDS instance: %s', member['DBInstanceIdentifier'])

    def _delete_cluster(self, client, cluster, skip_snapshot):
        # Delete the cluster itself; not-found is ignored, bad state is logged.
        params = {'DBClusterIdentifier': cluster['DBClusterIdentifier']}
        if skip_snapshot:
            params['SkipFinalSnapshot'] = True
        else:
            params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                'Final', cluster['DBClusterIdentifier'])

        _run_cluster_method(
            client.delete_db_cluster,
            params,
            ignore=(
                client.exceptions.DBClusterNotFoundFault,
                client.exceptions.ResourceNotFoundFault),
            warn=client.exceptions.InvalidDBClusterStateFault)

198 

199 

@RDSCluster.action_registry.register('retention')
class RetentionWindow(BaseAction):
    """
    Set the backup retention period on an RDS cluster.

    ``enforce`` selects how ``days`` is applied relative to the current
    retention: ``min`` keeps the larger value, ``max`` keeps the smaller
    value and ``exact`` always uses ``days``. The default is ``min``.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-backup-retention
                resource: rds-cluster
                filters:
                  - type: value
                    key: BackupRetentionPeriod
                    value: 21
                    op: ne
                actions:
                  - type: retention
                    days: 21
                    enforce: min
    """

    date_attribute = "BackupRetentionPeriod"
    # Tag copy not yet available for Aurora:
    # https://forums.aws.amazon.com/thread.jspa?threadID=225812
    schema = type_schema(
        'retention',
        **{
            'days': {'type': 'number'},
            'enforce': {'type': 'string', 'enum': ['min', 'max', 'exact']},
        })
    permissions = ('rds:ModifyDBCluster',)

    def process(self, clusters):
        """Apply the configured retention to every cluster."""
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            self.process_snapshot_retention(client, cluster)

    def process_snapshot_retention(self, client, cluster):
        """Work out the retention to apply to ``cluster`` and set it."""
        current = int(cluster.get('BackupRetentionPeriod', 0))
        requested = self.data['days']
        mode = self.data.get('enforce', 'min').lower()

        # One resolver per enforcement mode; an unknown mode does nothing.
        resolvers = {
            'min': lambda: max(current, requested),
            'max': lambda: min(current, requested),
            'exact': lambda: requested,
        }
        resolve = resolvers.get(mode)
        if resolve is not None:
            self.set_retention_window(client, cluster, resolve())

    def set_retention_window(self, client, cluster, retention):
        """Call ModifyDBCluster with the new ``retention`` value.

        For clusters whose ``EngineMode`` is not ``serverless`` the current
        backup and maintenance windows are passed along unchanged.
        """
        params = {
            'DBClusterIdentifier': cluster['DBClusterIdentifier'],
            'BackupRetentionPeriod': retention,
        }
        is_serverless = cluster.get('EngineMode') == 'serverless'
        if not is_serverless:
            params.update({
                'PreferredBackupWindow': cluster['PreferredBackupWindow'],
                'PreferredMaintenanceWindow': cluster['PreferredMaintenanceWindow'],
            })
        _run_cluster_method(
            client.modify_db_cluster,
            params,
            ignore=(
                client.exceptions.DBClusterNotFoundFault,
                client.exceptions.ResourceNotFoundFault),
            warn=client.exceptions.InvalidDBClusterStateFault)

269 

270 

@RDSCluster.action_registry.register('stop')
class Stop(BaseAction):
    """Stop a running DB cluster."""

    schema = type_schema('stop')
    permissions = ('rds:StopDBCluster',)

    def process(self, clusters):
        """Issue StopDBCluster for every cluster.

        Missing clusters are ignored; clusters in an invalid state are
        logged as warnings.
        """
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            _run_cluster_method(
                client.stop_db_cluster,
                {'DBClusterIdentifier': cluster['DBClusterIdentifier']},
                ignore=(
                    client.exceptions.DBClusterNotFoundFault,
                    client.exceptions.ResourceNotFoundFault),
                warn=client.exceptions.InvalidDBClusterStateFault)

286 

287 

@RDSCluster.action_registry.register('start')
class Start(BaseAction):
    """Start a stopped DB cluster."""

    schema = type_schema('start')
    permissions = ('rds:StartDBCluster',)

    def process(self, clusters):
        """Issue StartDBCluster for every cluster.

        Missing clusters are ignored; clusters in an invalid state are
        logged as warnings.
        """
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            _run_cluster_method(
                client.start_db_cluster,
                {'DBClusterIdentifier': cluster['DBClusterIdentifier']},
                ignore=(
                    client.exceptions.DBClusterNotFoundFault,
                    client.exceptions.ResourceNotFoundFault),
                warn=client.exceptions.InvalidDBClusterStateFault)

303 

304 

305def _run_cluster_method(method, params, ignore=(), warn=(), method_name=""): 

306 try: 

307 method(**params) 

308 except ignore: 

309 pass 

310 except warn as e: 

311 log.warning( 

312 "error %s on cluster %s error %s", 

313 method_name or method.__name__, params['DBClusterIdentifier'], e) 

314 

315 

@RDSCluster.action_registry.register('snapshot')
class Snapshot(BaseAction):
    """Create a snapshot of an RDS cluster.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-snapshot
                resource: rds-cluster
                actions:
                  - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('rds:CreateDBClusterSnapshot',)

    def process(self, clusters):
        """Request a ``Backup`` snapshot for every cluster.

        Missing clusters are ignored; clusters in an invalid state are
        logged as warnings.
        """
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            request = {
                'DBClusterSnapshotIdentifier': snapshot_identifier(
                    'Backup', cluster['DBClusterIdentifier']),
                'DBClusterIdentifier': cluster['DBClusterIdentifier'],
            }
            _run_cluster_method(
                client.create_db_cluster_snapshot,
                request,
                ignore=(
                    client.exceptions.DBClusterNotFoundFault,
                    client.exceptions.ResourceNotFoundFault),
                warn=client.exceptions.InvalidDBClusterStateFault)

345 

346 

@RDSCluster.action_registry.register('modify-db-cluster')
class ModifyDbCluster(BaseAction):
    """Modify an RDS cluster with the ModifyDBCluster API.

    Everything under ``attributes`` is passed to ModifyDBCluster as request
    parameters and is validated against the ``ModifyDBClusterMessage``
    shape. ``DBClusterIdentifier`` must not be listed; it is filled in from
    each matched cluster.

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-db-cluster-deletion-protection
                resource: rds-cluster
                filters:
                  - DeletionProtection: true
                  - PubliclyAccessible: true
                actions:
                  - type: modify-db-cluster
                    attributes:
                        CopyTagsToSnapshot: true
                        DeletionProtection: false
    """

    schema = type_schema(
        'modify-db-cluster',
        required=('attributes',),
        attributes={'type': 'object'})

    permissions = ('rds:ModifyDBCluster',)
    shape = 'ModifyDBClusterMessage'

    def validate(self):
        """Check ``attributes`` against the ModifyDBCluster request shape.

        :raises PolicyValidationError: if ``DBClusterIdentifier`` is given.
        """
        requested = self.data['attributes']
        if 'DBClusterIdentifier' in requested:
            raise PolicyValidationError(
                "Can't include DBClusterIdentifier in modify-db-cluster action")
        # Validate a copy that carries a placeholder identifier.
        candidate = {**requested, 'DBClusterIdentifier': 'PolicyValidation'}
        return shape_validate(candidate, self.shape, 'rds')

    def process(self, clusters):
        """Apply the configured attributes to every cluster."""
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            attributes = self.data['attributes']
            client.modify_db_cluster(
                DBClusterIdentifier=cluster['DBClusterIdentifier'],
                **attributes)

394 

395 

class DescribeClusterSnapshot(DescribeSource):
    # Describe-API backed source for rds-cluster-snapshot resources.

    def get_resources(self, resource_ids, cache=True):
        """Fetch the cluster snapshots named in ``resource_ids``.

        The identifiers are sent as a ``db-cluster-snapshot-id`` filter in
        batches of at most 100. The call is not paginated, so a single
        request carrying more identifiers than fit in one response page
        would silently lose the remainder; batching keeps every response
        within one page and mirrors how clusters are looked up by id in this
        module.

        :param resource_ids: snapshot identifiers to look up.
        :param cache: accepted for interface compatibility; not used here.
        :return: list of snapshot dicts (empty when no ids are given).
        """
        client = local_session(self.manager.session_factory).client('rds')
        snapshots = []
        for id_batch in chunks(resource_ids, 100):
            response = self.manager.retry(
                client.describe_db_cluster_snapshots,
                Filters=[{
                    'Name': 'db-cluster-snapshot-id',
                    'Values': id_batch}])
            snapshots.extend(response.get('DBClusterSnapshots', ()))
        return snapshots

    def augment(self, resources):
        """Move each snapshot's ``TagList`` entry to ``Tags`` (in place)."""
        for r in resources:
            r['Tags'] = r.pop('TagList', ())
        return resources

410 

411 

class ConfigClusterSnapshot(ConfigSource):
    # AWS Config backed source for rds-cluster-snapshot resources.

    def load_resource(self, item):
        """Load a config item and restore the upper-case key prefixes.

        Keys starting with ``Dbcl`` or ``Iamd`` are renamed so the prefix
        reads ``DBCl`` or ``IAMD`` respectively.
        """
        resource = super(ConfigClusterSnapshot, self).load_resource(item)

        # db cluster snapshots are particularly mangled on keys
        prefix_fixes = (('Dbcl', 'DBCl'), ('Iamd', 'IAMD'))
        # Iterate over a snapshot of the items since the dict is mutated.
        for key, value in tuple(resource.items()):
            for found, wanted in prefix_fixes:
                if key.startswith(found):
                    del resource[key]
                    resource[wanted + key[len(found):]] = value
                    break
        return resource

428 

429 

@resources.register('rds-cluster-snapshot')
class RDSClusterSnapshot(QueryResourceManager):
    """Resource manager for RDS DB cluster snapshots."""

    class resource_type(TypeInfo):
        # API used to enumerate cluster snapshots.
        service = 'rds'
        enum_spec = (
            'describe_db_cluster_snapshots', 'DBClusterSnapshots', None)
        permissions_enum = ('rds:DescribeDBClusterSnapshots',)

        # The snapshot identifier doubles as id and name.
        id = 'DBClusterSnapshotIdentifier'
        name = 'DBClusterSnapshotIdentifier'
        date = 'SnapshotCreateTime'

        # ARN handling.
        arn = 'DBClusterSnapshotArn'
        arn_type = 'cluster-snapshot'
        arn_separator = ":"

        universal_taggable = object()
        config_type = 'AWS::RDS::DBClusterSnapshot'

    # Source name -> implementing class.
    source_mapping = {
        'describe': DescribeClusterSnapshot,
        'config': ConfigClusterSnapshot,
    }

452 

453 

@RDSClusterSnapshot.filter_registry.register('cross-account')
class CrossAccountSnapshot(CrossAccountAccessFilter):
    # Matches cluster snapshots whose ``restore`` attribute lists accounts
    # outside the allowed set returned by get_accounts().

    permissions = ('rds:DescribeDBClusterSnapshotAttributes',)
    # Key under which the fetched snapshot attributes are stored.
    attributes_key = 'c7n:attributes'
    # Key under which the offending accounts are stored.
    annotation_key = 'c7n:CrossAccountViolations'

    def process(self, resources, event=None):
        """Check snapshots in batches of 20 on a two-worker executor.

        Returns the snapshots that share with at least one account that is
        not allowed, in completion order of the batches.
        """
        self.accounts = self.get_accounts()
        self.everyone_only = self.data.get("everyone_only", False)
        matched = []
        with self.executor_factory(max_workers=2) as pool:
            pending = [
                pool.submit(self.process_resource_set, batch)
                for batch in chunks(resources, 20)]
            for finished in as_completed(pending):
                matched.extend(finished.result())
        return matched

    def process_resource_set(self, resource_set):
        """Annotate one batch of snapshots and return the violating ones."""
        client = local_session(self.manager.session_factory).client('rds')
        flagged = []
        for snapshot in resource_set:
            response = self.manager.retry(
                client.describe_db_cluster_snapshot_attributes,
                DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier'])
            attribute_entries = response[
                'DBClusterSnapshotAttributesResult']['DBClusterSnapshotAttributes']
            attrs = {}
            for entry in attribute_entries:
                attrs[entry['AttributeName']] = entry['AttributeValues']
            snapshot[self.attributes_key] = attrs

            shared = set(attrs.get('restore', []))
            if self.everyone_only:
                # Only public sharing ("all") is of interest in this mode.
                shared &= {'all'}
            violations = shared.difference(self.accounts)
            if not violations:
                continue
            snapshot[self.annotation_key] = list(violations)
            flagged.append(snapshot)
        return flagged

492 

493 

@RDSClusterSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filter RDS cluster snapshots by their age in days.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-snapshots-expired
                resource: rds-cluster-snapshot
                filters:
                  - type: age
                    days: 30
                    op: gt
    """

    # Snapshot attribute the age is measured from.
    date_attribute = 'SnapshotCreateTime'

    schema = type_schema(
        'age',
        **{
            'days': {'type': 'number'},
            'op': {'$ref': '#/definitions/filters_common/comparison_operators'},
        })

516 

517 

@RDSClusterSnapshot.action_registry.register('set-permissions')
class SetPermissions(rds.SetPermissions):
    """Set the accounts allowed to copy or restore an RDS cluster snapshot.

    ``add`` and ``remove`` name the accounts to grant or revoke. When
    neither is given, every account currently listed in the snapshot's
    ``restore`` attribute is removed.

    ``remove: matched`` removes only the accounts flagged by a preceding
    ``cross-account`` filter, which allows access to be preserved for a
    set of whitelisted accounts:

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-snapshot-prune-permissions
                resource: rds-cluster-snapshot
                filters:
                  - type: cross-account
                    whitelist:
                      - '112233445566'
                actions:
                  - type: set-permissions
                    remove: matched
    """
    permissions = ('rds:ModifyDBClusterSnapshotAttribute',)

    def process_snapshot(self, client, snapshot):
        """Update the ``restore`` attribute of a single snapshot."""
        to_add = self.data.get('add', [])
        to_remove = self.data.get('remove', [])
        attrs_key = CrossAccountSnapshot.attributes_key

        if not to_add and not to_remove:
            # Default: revoke everything currently shared. Reuse attributes
            # stored on the snapshot if present, otherwise fetch them.
            if attrs_key not in snapshot:
                response = self.manager.retry(
                    client.describe_db_cluster_snapshot_attributes,
                    DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier'])
                entries = response[
                    'DBClusterSnapshotAttributesResult']['DBClusterSnapshotAttributes']
                snapshot[attrs_key] = {
                    entry['AttributeName']: entry['AttributeValues']
                    for entry in entries}
            to_remove = snapshot[attrs_key].get('restore', [])
        elif to_remove == 'matched':
            to_remove = snapshot.get(CrossAccountSnapshot.annotation_key, [])

        if not (to_add or to_remove):
            return
        client.modify_db_cluster_snapshot_attribute(
            DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier'],
            AttributeName='restore',
            ValuesToRemove=to_remove,
            ValuesToAdd=to_add)

571 

572 

@RDSClusterSnapshot.action_registry.register('delete')
class RDSClusterSnapshotDelete(BaseAction):
    """Action to delete rds cluster snapshots

    To prevent unwanted deletion of rds cluster snapshots, it is recommended
    to apply a filter to the rule

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-snapshots-expired-delete
                resource: rds-cluster-snapshot
                filters:
                  - type: age
                    days: 30
                    op: gt
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBClusterSnapshot',)

    def process(self, snapshots):
        """Delete ``snapshots`` in batches of 50 on a two-worker executor.

        The snapshots are walked in reverse order. Every batch is allowed
        to finish; failures are logged and the last one seen is re-raised.

        :return: the ``snapshots`` list that was passed in.
        """
        self.log.info("Deleting %d RDS cluster snapshots", len(snapshots))
        client = local_session(self.manager.session_factory).client('rds')
        error = None
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_snapshot_set, client, snapshot_set)
                for snapshot_set in chunks(reversed(snapshots), size=50)]
            for f in as_completed(futures):
                failure = f.exception()
                if failure:
                    error = failure
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        failure)
        if error:
            raise error
        return snapshots

    def process_snapshot_set(self, client, snapshots_set):
        """Delete one batch, skipping snapshots that are gone or busy."""
        faults = client.exceptions
        # DeleteDBClusterSnapshot reports the *cluster* snapshot faults; the
        # instance-snapshot faults handled previously are kept so nothing
        # that used to be tolerated starts failing.
        skippable = (
            faults.DBClusterSnapshotNotFoundFault,
            faults.InvalidDBClusterSnapshotStateFault,
            faults.DBSnapshotNotFoundFault,
            faults.InvalidDBSnapshotStateFault,
        )
        for s in snapshots_set:
            try:
                client.delete_db_cluster_snapshot(
                    DBClusterSnapshotIdentifier=s['DBClusterSnapshotIdentifier'])
            except skippable:
                continue

624 continue 

625 

626 

@RDSClusterSnapshot.action_registry.register("region-copy")
class RDSClusterSnapshotRegionCopy(BaseAction):
    """Copy a cluster snapshot to another region.

    ``target_region`` is required. ``target_key`` optionally names the KMS
    key for the copy, ``copy_tags`` (default true) carries the source tags
    over and ``tags`` adds extra tags to the copy.

    Example::

      - name: copy-encrypted-cluster-snapshots
        description: |
          copy cluster snapshots under 1 day old to dr region with kms
        resource: rds-cluster-snapshot
        region: us-east-1
        filters:
          - Status: available
          - type: value
            key: SnapshotCreateTime
            value_type: age
            value: 1
            op: less-than
        actions:
          - type: region-copy
            target_region: us-east-2
            target_key: arn:aws:kms:us-east-2:644160558196:key/b10f842a-feb7-4318-92d5-0640a75b7688
            copy_tags: true
            tags:
              OriginRegion: us-east-1
    """

    schema = type_schema(
        "region-copy",
        required=("target_region",),
        **{
            "target_region": {"type": "string"},
            "target_key": {"type": "string"},
            "copy_tags": {"type": "boolean"},
            "tags": {"type": "object"},
        }
    )

    permissions = ("rds:CopyDBClusterSnapshot",)
    # Retry settings used when the snapshot quota is exceeded.
    min_delay = 120
    max_attempts = 30

    def validate(self):
        """Reject policies that declare an execution ``mode``.

        :raises PolicyValidationError: when ``target_region`` is set and the
            policy has a ``mode``.
        """
        has_target = self.data.get('target_region')
        has_mode = self.manager.data.get('mode')
        if has_target and has_mode:
            raise PolicyValidationError(
                "cross region snapshot may require waiting for "
                "longer then lambda runtime allows %s" % (self.manager.data,))
        return self

    def process(self, resources):
        """Copy ``resources`` to the target region in batches of 20.

        Nothing is copied when the target region equals the source region.
        """
        same_region = self.data['target_region'] == self.manager.config.region
        if same_region:
            self.log.warning(
                "Source and destination region are the same, skipping copy")
            return
        for batch in chunks(resources, 20):
            self.process_resource_set(batch)

    def process_resource(self, target, key, tags, snapshot):
        """Copy one snapshot using the target-region client ``target``.

        On success the ARN of the copy is stored on the snapshot under
        ``c7n:CopiedClusterSnapshot``. A copy that already exists in the
        target region is logged and skipped; other client errors propagate.
        """
        request = {}
        if key:
            request['KmsKeyId'] = key
        request.update({
            # ':' in the source identifier is replaced by '-' for the copy.
            'TargetDBClusterSnapshotIdentifier': snapshot[
                'DBClusterSnapshotIdentifier'].replace(':', '-'),
            'SourceRegion': self.manager.config.region,
            'SourceDBClusterSnapshotIdentifier': snapshot['DBClusterSnapshotArn'],
        })

        if self.data.get('copy_tags', True):
            request['CopyTags'] = True
        if tags:
            request['Tags'] = tags

        retry = get_retry(
            ('SnapshotQuotaExceeded',),
            # TODO make this configurable, class defaults to 1hr
            min_delay=self.min_delay,
            max_attempts=self.max_attempts,
            log_retries=logging.DEBUG)

        try:
            result = retry(target.copy_db_cluster_snapshot, **request)
        except ClientError as err:
            if err.response['Error']['Code'] != 'DBClusterSnapshotAlreadyExists':
                raise
            self.log.warning(
                "Cluster snapshot %s already exists in target region",
                snapshot['DBClusterSnapshotIdentifier'])
        else:
            snapshot['c7n:CopiedClusterSnapshot'] = result[
                'DBClusterSnapshot']['DBClusterSnapshotArn']

    def process_resource_set(self, resource_set):
        """Copy every snapshot in ``resource_set`` to the target region."""
        target_client = self.manager.session_factory(
            region=self.data['target_region']).client('rds')
        target_key = self.data.get('target_key')
        extra_tags = [
            {'Key': tag_key, 'Value': tag_value}
            for tag_key, tag_value in self.data.get('tags', {}).items()]

        for group in chunks(resource_set, 5):
            for snapshot in group:
                # If tags are supplied, copy tags are ignored, and
                # we need to augment the tag set with the original
                # resource tags to preserve the common case.
                if not extra_tags:
                    snapshot_tags = None
                else:
                    snapshot_tags = list(extra_tags)
                    if self.data.get('copy_tags', True):
                        snapshot_tags.extend(snapshot['Tags'])
                self.process_resource(
                    target_client, target_key, snapshot_tags, snapshot)

732 

733 

# Register the shared consecutive-aws-backups filter for rds-cluster resources.
RDSCluster.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter)

735 

736 

@RDSCluster.filter_registry.register('consecutive-snapshots')
class ConsecutiveSnapshots(Filter):
    """Match RDS clusters that have an available snapshot for each of the
    previous ``days`` days.

    :example:

    .. code-block:: yaml

            policies:
              - name: rdscluster-daily-snapshot-count
                resource: rds-cluster
                filters:
                  - type: consecutive-snapshots
                    days: 7
    """
    schema = type_schema(
        'consecutive-snapshots',
        required=['days'],
        **{'days': {'type': 'number', 'minimum': 1}})
    permissions = ('rds:DescribeDBClusterSnapshots', 'rds:DescribeDBClusters')
    # Key under which a cluster's snapshots are stored on the resource.
    annotation = 'c7n:DBClusterSnapshots'

    def process_resource_set(self, client, resources):
        """Fetch the snapshots of ``resources`` and attach them per cluster."""
        cluster_ids = [cluster['DBClusterIdentifier'] for cluster in resources]
        paginator = client.get_paginator('describe_db_cluster_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        pages = paginator.paginate(
            Filters=[{'Name': 'db-cluster-id', 'Values': cluster_ids}])
        snapshots = pages.build_full_result().get('DBClusterSnapshots', [])

        # Group the snapshots by the cluster they belong to.
        by_cluster = {}
        for snapshot in snapshots:
            owner = snapshot['DBClusterIdentifier']
            if owner not in by_cluster:
                by_cluster[owner] = []
            by_cluster[owner].append(snapshot)

        for cluster in resources:
            cluster[self.annotation] = by_cluster.get(
                cluster['DBClusterIdentifier'], [])

    def process(self, resources, event=None):
        """Return the clusters with a snapshot on every expected date.

        Expected dates are the ``days`` calendar dates before the current
        UTC time, formatted ``YYYY-MM-DD``. Only snapshots whose ``Status``
        is ``available`` count.
        """
        client = local_session(self.manager.session_factory).client('rds')
        retention = self.data.get('days')
        utcnow = datetime.utcnow()
        expected_dates = {
            (utcnow - timedelta(days=offset)).strftime('%Y-%m-%d')
            for offset in range(1, retention + 1)}

        # Only look up clusters that do not carry their snapshots yet.
        unannotated = [
            cluster for cluster in resources if self.annotation not in cluster]
        for batch in chunks(unannotated, 50):
            self.process_resource_set(client, batch)

        matched = []
        for cluster in resources:
            available_dates = {
                snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d')
                for snapshot in cluster[self.annotation]
                if snapshot['Status'] == 'available'}
            if expected_dates <= available_dates:
                matched.append(cluster)
        return matched

792 

793 

@RDSCluster.filter_registry.register('db-cluster-parameter')
class ClusterParameterFilter(ParameterFilter):
    """
    Applies value type filter on set db cluster parameter values.

    :example:

    .. code-block:: yaml

            policies:
              - name: rdscluster-pg
                resource: rds-cluster
                filters:
                  - type: db-cluster-parameter
                    key: someparam
                    op: eq
                    value: someval
    """
    schema = type_schema('db-cluster-parameter', rinherit=ValueFilter.schema)
    schema_alias = False
    # _get_param_list calls DescribeDBClusterParameters, so that action is
    # declared here; the previously declared actions are kept as they were.
    permissions = (
        'rds:DescribeDBInstances',
        'rds:DescribeDBParameters',
        'rds:DescribeDBClusterParameters',
    )
    # Key under which the matched parameter name is recorded on a cluster.
    policy_annotation = 'c7n:MatchedDBClusterParameter'
    # Cluster attribute naming its parameter group.
    param_group_attribute = 'DBClusterParameterGroup'

    def _get_param_list(self, pg):
        """Return every parameter of cluster parameter group ``pg``."""
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_db_cluster_parameters')
        pages = paginator.paginate(DBClusterParameterGroupName=pg)
        return list(itertools.chain.from_iterable(
            page['Parameters'] for page in pages))

    def process(self, resources, event=None):
        """Return the clusters whose parameter group values match.

        Matching clusters get the filter's ``key`` appended to their
        ``c7n:MatchedDBClusterParameter`` annotation.
        """
        results = []
        parameter_group_list = {
            db.get(self.param_group_attribute) for db in resources}
        paramcache = self.handle_paramgroup_cache(parameter_group_list)
        for resource in resources:
            pg_values = paramcache[resource[self.param_group_attribute]]
            if self.match(pg_values):
                resource.setdefault(self.policy_annotation, []).append(
                    self.data.get('key'))
                results.append(resource)
        return results

836 

837 

@RDSCluster.filter_registry.register('pending-maintenance')
class PendingMaintenance(Filter):
    """
    Match DB clusters that have pending maintenance actions.

    Matching clusters are annotated with their pending actions under
    ``c7n:PendingMaintenance``, which later filters can inspect.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-cluster-pending-maintenance
                resource: rds-cluster
                filters:
                  - pending-maintenance
                  - type: value
                    key: '"c7n:PendingMaintenance".PendingMaintenanceActionDetails[].Action'
                    op: intersect
                    value:
                      - system-update
    """

    annotation_key = 'c7n:PendingMaintenance'
    schema = type_schema('pending-maintenance')
    permissions = ('rds:DescribePendingMaintenanceActions',)

    def process(self, resources, event=None):
        """Return the clusters that have at least one pending action."""
        client = local_session(self.manager.session_factory).client('rds')

        # Collect every pending action, grouped by ResourceIdentifier.
        actions_by_resource = {}
        paginator = client.get_paginator('describe_pending_maintenance_actions')
        for page in paginator.paginate():
            for action in page['PendingMaintenanceActions']:
                identifier = action['ResourceIdentifier']
                if identifier not in actions_by_resource:
                    actions_by_resource[identifier] = []
                actions_by_resource[identifier].append(action)

        matched = []
        for cluster in resources:
            # Actions are looked up by the cluster's ARN.
            pending = actions_by_resource.get(cluster['DBClusterArn'], [])
            if not pending:
                continue
            cluster[self.annotation_key] = pending
            matched.append(cluster)

        return matched

880 

881 

class DescribeDbShardGroup(DescribeSource):
    # Describe-API backed source for rds-db-shard-group resources.

    def augment(self, resources):
        """Move each shard group's ``TagList`` entry to ``Tags`` (in place).

        Shard groups without a ``TagList`` get an empty tuple. The same
        list object that was passed in is returned.
        """
        for shard_group in resources:
            tag_list = shard_group.pop('TagList', ())
            shard_group['Tags'] = tag_list
        return resources

887 

888 

@resources.register('rds-db-shard-group')
class RDSDbShardGroup(QueryResourceManager):
    # Resource manager for RDS DB shard groups.

    class resource_type(TypeInfo):
        # API used to enumerate shard groups.
        service = "rds"
        enum_spec = ("describe_db_shard_groups", "DBShardGroups", None)
        permissions_enum = ("rds:DescribeDBShardGroups",)

        # Identity and naming attributes.
        id = "DBShardGroupResourceId"
        name = "DBShardGroupIdentifier"
        arn = "DBShardGroupArn"

        cfn_type = "AWS::RDS::DBShardGroup"
        universal_taggable = object()

    # Source name -> implementing class.
    source_mapping = {"describe": DescribeDbShardGroup}