Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/rds.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

949 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4RDS Resource Manager 

5==================== 

6 

7Example Policies 

8---------------- 

9 

10Find rds instances that are publicly available 

11 

12.. code-block:: yaml 

13 

14 policies: 

15 - name: rds-public 

16 resource: rds 

17 filters: 

18 - PubliclyAccessible: true 

19 

20Find rds instances that are not encrypted 

21 

22.. code-block:: yaml 

23 

24 policies: 

25 - name: rds-non-encrypted 

26 resource: rds 

27 filters: 

28 - type: value 

29 key: StorageEncrypted 

30 value: true 

31 op: ne 

32 

33""" 

34import functools 

35import itertools 

36import logging 

37import operator 

38import re 

39import datetime 

40 

41from datetime import timedelta 

42 

43from decimal import Decimal as D, ROUND_HALF_UP 

44 

45from c7n.vendored.distutils.version import LooseVersion 

46from botocore.exceptions import ClientError 

47from concurrent.futures import as_completed 

48 

49from c7n.actions import ( 

50 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction) 

51 

52from c7n.exceptions import PolicyValidationError 

53from c7n.filters import ( 

54 CrossAccountAccessFilter, FilterRegistry, Filter, ValueFilter, AgeFilter) 

55from c7n.filters.offhours import OffHour, OnHour 

56from c7n.filters import related 

57import c7n.filters.vpc as net_filters 

58from c7n.manager import resources 

59from c7n.query import ( 

60 QueryResourceManager, DescribeSource, ConfigSource, TypeInfo, RetryPageIterator) 

61from c7n import deprecated, tags 

62from c7n.tags import universal_augment 

63 

64from c7n.utils import ( 

65 local_session, type_schema, get_retry, chunks, snapshot_identifier, 

66 merge_dict_list, filter_empty, jmespath_search) 

67from c7n.resources.kms import ResourceKmsKeyAlias 

68from c7n.resources.securityhub import PostFinding 

69from c7n.filters.backup import ConsecutiveAwsBackupsFilter 

70 

71log = logging.getLogger('custodian.rds') 

72 

73filters = FilterRegistry('rds.filters') 

74actions = ActionRegistry('rds.actions') 

75 

76 

class DescribeRDS(DescribeSource):
    # Describe-API backed source for RDS DB instances.

    def augment(self, dbs):
        """Move each instance's ``TagList`` entry to the ``Tags`` key.

        The instances are modified in place; an instance without a
        ``TagList`` ends up with an empty tuple under ``Tags``.  The same
        ``dbs`` collection that was passed in is returned.
        """
        for instance in dbs:
            tag_list = instance.pop('TagList', ())
            instance['Tags'] = tag_list
        return dbs

83 

84 

class ConfigRDS(ConfigSource):
    # AWS Config backed source for RDS DB instances.

    def load_resource(self, item):
        """Load a config item and alias its ``Db*`` keys as ``DB*``.

        Every key starting with ``Db`` is copied (not moved) to a key with
        the prefix upper-cased to ``DB``; the original keys are kept.
        """
        resource = super().load_resource(item)
        # Snapshot the matching keys first: the dict grows while we alias.
        db_prefixed = [key for key in resource.keys() if key.startswith('Db')]
        for key in db_prefixed:
            alias = "DB%s" % key[2:]
            resource[alias] = resource[key]
        return resource

93 

94 

@resources.register('rds')
class RDS(QueryResourceManager):
    """Resource manager for RDS DB instances.
    """

    class resource_type(TypeInfo):
        # Service and enumeration call.
        service = 'rds'
        enum_spec = ('describe_db_instances', 'DBInstances', None)
        permissions_enum = ('rds:DescribeDBInstances',)

        # ARN construction.
        arn = 'DBInstanceArn'
        arn_type = 'db'
        arn_separator = ':'

        # Identity, naming and lookup keys.
        id = 'DBInstanceIdentifier'
        config_id = 'DbiResourceId'
        name = 'Endpoint.Address'
        filter_name = 'DBInstanceIdentifier'
        filter_type = 'scalar'
        date = 'InstanceCreateTime'
        dimension = 'DBInstanceIdentifier'

        # The CloudFormation and AWS Config type names are the same.
        config_type = 'AWS::RDS::DBInstance'
        cfn_type = config_type

        universal_taggable = True
        default_report_fields = (
            'DBInstanceIdentifier',
            'DBName',
            'Engine',
            'EngineVersion',
            'MultiAZ',
            'AllocatedStorage',
            'StorageEncrypted',
            'PubliclyAccessible',
            'InstanceCreateTime',
        )

    filter_registry = filters
    action_registry = actions

    def resources(self, query=None):
        """Return the managed resources, defaulting ``query`` from policy data.

        When no ``query`` is passed, the policy's ``query`` list (if any) is
        merged into a single dict; otherwise an empty dict is used.
        """
        if query is None:
            if 'query' in self.data:
                query = merge_dict_list(self.data['query'])
            else:
                query = {}
        return super().resources(query=query)

    source_mapping = {
        'describe': DescribeRDS,
        'config': ConfigRDS,
    }

142 

143 

def _db_instance_eligible_for_backup(resource):
    """Return True if backups/snapshots can be managed on this DB instance.

    An instance is rejected (and the reason logged at debug level) when:

    * its ``DBInstanceStatus`` is not ``available``;
    * it is a member of a DB cluster;
    * it is a postgres read replica;
    * it is a mysql read replica running a version older than 5.6.

    A mysql read replica whose ``EngineVersion`` cannot be parsed as
    ``<major>.<minor>[...]`` is treated as eligible rather than raising.

    :param resource: DB instance description; must contain
        ``DBInstanceIdentifier``.
    :returns: bool
    """
    db_instance_id = resource['DBInstanceIdentifier']

    # Database instance is not in available state
    if resource.get('DBInstanceStatus', '') != 'available':
        log.debug(
            "DB instance %s is not in available state",
            db_instance_id)
        return False

    # The specified DB Instance is a member of a cluster and its
    # backup retention should not be modified directly.  Instead,
    # modify the backup retention of the cluster using the
    # ModifyDbCluster API
    if resource.get('DBClusterIdentifier', ''):
        log.debug(
            "DB instance %s is a cluster member",
            db_instance_id)
        return False

    is_read_replica = bool(
        resource.get('ReadReplicaSourceDBInstanceIdentifier', ''))
    engine = resource.get('Engine', '')

    # DB Backups not supported on a read replica for engine postgres
    if is_read_replica and engine == 'postgres':
        log.debug(
            "DB instance %s is a postgres read-replica",
            db_instance_id)
        return False

    # DB Backups not supported on a read replica running a mysql
    # version before 5.6
    if is_read_replica and engine == 'mysql':
        engine_version = resource.get('EngineVersion', '')
        # Assume "<major>.<minor>[.<whatever>]"
        match = re.match(r'(?P<major>\d+)\.(?P<minor>\d+)', engine_version)
        # Only compare when the version parsed; the previous
        # ``match and a or b`` expression dereferenced ``match`` even when
        # it was None.
        if match:
            version = (int(match.group('major')), int(match.group('minor')))
            if version < (5, 6):
                log.debug(
                    "DB instance %s is a version %s mysql read-replica",
                    db_instance_id,
                    engine_version)
                return False
    return True

184 

185 

def _db_instance_eligible_for_final_snapshot(resource):
    """Return True if a final snapshot can be requested when deleting.

    A final snapshot is not possible when the instance:

    * has status ``creating``, ``failed``, ``incompatible-restore`` or
      ``incompatible-network`` (such instances can only be deleted with
      ``SkipFinalSnapshot`` set);
    * is a read replica (``FinalDBSnapshotIdentifier`` cannot be given);
    * belongs to a DB cluster (the instance snapshot API must not be used).

    Ineligible instances are logged at debug level.

    :param resource: DB instance description.
    :returns: bool
    """
    status = resource.get('DBInstanceStatus', '')
    blocking_statuses = (
        'creating', 'failed', 'incompatible-restore', 'incompatible-network')

    ineligible = (
        status in blocking_statuses
        or resource.get('ReadReplicaSourceDBInstanceIdentifier', '')
        or resource.get('DBClusterIdentifier', False)
    )
    if ineligible:
        # The separator was previously the literal text "/n".
        log.debug('DB instance is not eligible for a snapshot:\n %s', resource)
        return False
    return True

209 

210 

def _get_available_engine_upgrades(client, major=False):
    """Collect the best known upgrade target for every engine version.

    Pages through ``describe_db_engine_versions`` and builds a nested
    mapping of engine name -> current version -> highest upgrade target
    (compared with ``LooseVersion``).  Every engine seen gets an entry,
    even when none of its versions has an upgrade.

    Major-version upgrade targets are skipped unless ``major`` is true.

    Example::

        >>> _get_available_engine_upgrades(client)
        {
           'oracle-se2': {'12.1.0.2.v2': '12.1.0.2.v5',
                          '12.1.0.2.v3': '12.1.0.2.v5'},
           'postgres': {'9.3.1': '9.3.14',
                        '9.3.10': '9.3.14',
                        '9.3.12': '9.3.14',
                        '9.3.2': '9.3.14'}
        }
    """
    upgrades = {}
    paginator = client.get_paginator('describe_db_engine_versions')
    for page in paginator.paginate():
        for version in page['DBEngineVersions']:
            per_engine = upgrades.setdefault(version['Engine'], {})
            for target in version.get('ValidUpgradeTarget', ()):
                if not major and target['IsMajorVersionUpgrade']:
                    continue
                best_so_far = per_engine.get(version['EngineVersion'], '0.0.0')
                if LooseVersion(target['EngineVersion']) > LooseVersion(best_so_far):
                    per_engine[version['EngineVersion']] = target['EngineVersion']
    return upgrades

247 

248 

249filters.register('offhour', OffHour) 

250filters.register('onhour', OnHour) 

251 

252 

@filters.register('default-vpc')
class DefaultVpc(net_filters.DefaultVpcBase):
    """Matches RDS databases that live in the default VPC.

    :example:

    .. code-block:: yaml

        policies:
          - name: default-vpc-rds
            resource: rds
            filters:
              - type: default-vpc
    """
    schema = type_schema('default-vpc')

    def __call__(self, rdb):
        # The VPC is taken from the instance's DB subnet group.
        vpc_id = rdb['DBSubnetGroup']['VpcId']
        return self.match(vpc_id)

271 

272 

@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    # Expression selecting the ids of the VPC security groups attached to
    # a DB instance.
    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"

277 

278 

@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    # Expression selecting the subnet ids listed in a DB instance's
    # subnet group.
    RelatedIdsExpression = "DBSubnetGroup.Subnets[].SubnetIdentifier"

283 

284 

@filters.register('vpc')
class VpcFilter(net_filters.VpcFilter):
    # Expression selecting the VPC id recorded on a DB instance's subnet
    # group.
    RelatedIdsExpression = "DBSubnetGroup.VpcId"

289 

290 

291filters.register('network-location', net_filters.NetworkLocation) 

292 

293 

@filters.register('kms-alias')
class KmsKeyAlias(ResourceKmsKeyAlias):
    # Filter entry point: delegates to the inherited alias matcher.

    def process(self, dbs, event=None):
        matching = self.get_matching_aliases(dbs)
        return matching

299 

300 

@actions.register('auto-patch')
class AutoPatch(BaseAction):
    """Toggle the AutoMinorVersionUpgrade flag on RDS instances.

    The 'window' parameter must be in the format 'ddd:hh:mm-ddd:hh:mm' and
    leave at least 30 minutes between start & end time.
    If 'window' is not specified, AWS will assign a random maintenance window
    to each instance selected.

    :example:

    .. code-block:: yaml

        policies:
          - name: enable-rds-autopatch
            resource: rds
            filters:
              - AutoMinorVersionUpgrade: false
            actions:
              - type: auto-patch
                minor: true
                window: Mon:23:00-Tue:01:00
    """

    schema = type_schema(
        'auto-patch',
        minor={'type': 'boolean'},
        window={'type': 'string'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        """Apply the configured flag (and optional window) to every instance."""
        session = local_session(self.manager.session_factory)
        client = session.client('rds')

        modify_args = {'AutoMinorVersionUpgrade': self.data.get('minor', True)}
        window = self.data.get('window')
        if window:
            modify_args['PreferredMaintenanceWindow'] = window

        for instance in dbs:
            client.modify_db_instance(
                DBInstanceIdentifier=instance['DBInstanceIdentifier'],
                **modify_args)

342 

343 

@filters.register('upgrade-available')
class UpgradeAvailable(Filter):
    """Scan DB instances for available engine upgrades.

    Looks up, for each instance's engine and version, the highest
    available upgrade target (minor upgrades only unless ``major`` is set).

    Instances with an upgrade are annotated under the
    'c7n-rds-engine-upgrade' key with that target version.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-upgrade-available
            resource: rds
            filters:
              - type: upgrade-available
                major: False

    """

    schema = type_schema(
        'upgrade-available',
        major={'type': 'boolean'},
        value={'type': 'boolean'})
    permissions = ('rds:DescribeDBEngineVersions',)

    def process(self, resources, event=None):
        """Return matching instances, annotating those that have an upgrade.

        Instances with an upgrade target are always returned; instances
        without one are returned only when ``value`` is explicitly False.
        """
        client = local_session(self.manager.session_factory).client('rds')
        include_without_upgrade = self.data.get('value', True) is False
        engine_upgrades = _get_available_engine_upgrades(
            client, major=self.data.get('major', False))

        matched = []
        for r in resources:
            by_version = engine_upgrades.get(r['Engine'], {})
            target_upgrade = by_version.get(r['EngineVersion'])
            if target_upgrade is not None:
                r['c7n-rds-engine-upgrade'] = target_upgrade
                matched.append(r)
            elif include_without_upgrade:
                matched.append(r)
        return matched

390 

391 

@actions.register('upgrade')
class UpgradeMinor(BaseAction):
    """Upgrades a RDS instance to the latest major/minor version available

    With the 'immediate' flag (default False) the engine upgrade is applied
    right away instead of waiting for the existing maintenance window.

    :example:

    .. code-block:: yaml

        policies:
          - name: upgrade-rds-minor
            resource: rds
            actions:
              - type: upgrade
                major: False
                immediate: False

    """

    schema = type_schema(
        'upgrade',
        major={'type': 'boolean'},
        immediate={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        """Request an engine upgrade for each instance that has a target.

        Instances already annotated with 'c7n-rds-engine-upgrade' use that
        target; otherwise the upgrade table is fetched (once, lazily) and
        instances without a target are skipped.
        """
        client = local_session(self.manager.session_factory).client('rds')
        upgrade_table = None  # fetched on first use only

        for r in resources:
            # Upgrade has already been scheduled
            if 'EngineVersion' in r['PendingModifiedValues']:
                continue

            if 'c7n-rds-engine-upgrade' not in r:
                if upgrade_table is None:
                    upgrade_table = _get_available_engine_upgrades(
                        client, major=self.data.get('major', False))
                candidate = upgrade_table.get(r['Engine'], {}).get(r['EngineVersion'])
                if candidate is None:
                    log.debug(
                        "implicit filter no upgrade on %s",
                        r['DBInstanceIdentifier'])
                    continue
                r['c7n-rds-engine-upgrade'] = candidate

            client.modify_db_instance(
                DBInstanceIdentifier=r['DBInstanceIdentifier'],
                EngineVersion=r['c7n-rds-engine-upgrade'],
                ApplyImmediately=self.data.get('immediate', False))

442 

443 

@actions.register('tag-trim')
class TagTrim(tags.TagTrim):
    # RDS-specific tag removal: tags are removed by instance ARN.

    permissions = ('rds:RemoveTagsFromResource',)

    def process_tag_removal(self, client, resource, candidates):
        instance_arn = resource['DBInstanceArn']
        client.remove_tags_from_resource(
            ResourceName=instance_arn,
            TagKeys=candidates)

451 

452 

# Engines for which the start/stop actions are attempted.
START_STOP_ELIGIBLE_ENGINES = {
    'db2-ae',
    'db2-se',
    'mariadb',
    'mysql',
    'oracle-ee',
    'oracle-ee-cdb',
    'oracle-se',
    'oracle-se1',
    'oracle-se2',
    'oracle-se2-cdb',
    'postgres',
    'sqlserver-ee',
    'sqlserver-ex',
    'sqlserver-se',
    'sqlserver-web',
}

460 

461 

def _eligible_start_stop(db, state="available"):
    """Return True if the instance can be started/stopped from ``state``.

    See the conditions noted here:
    https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
    Note that this doesn't really specify what happens for all the nosql
    engines that are available as rds engines.

    The instance must currently be in ``state``, must not be a Multi-AZ
    sqlserver instance, must use an engine listed in
    ``START_STOP_ELIGIBLE_ENGINES``, and must neither have read replicas
    nor be one.
    """
    in_expected_state = db.get('DBInstanceStatus') == state
    if not in_expected_state:
        return False

    if db.get('MultiAZ') and db['Engine'].startswith('sqlserver-'):
        return False

    if db['Engine'] not in START_STOP_ELIGIBLE_ENGINES:
        return False

    # Neither a replication source nor a replica may be stopped/started.
    has_replicas = db.get('ReadReplicaDBInstanceIdentifiers')
    is_replica = db.get('ReadReplicaSourceDBInstanceIdentifier')
    if has_replicas or is_replica:
        return False

    # TODO: check whether a SQL Server mirror is detectable.
    return True

484 

485 

@actions.register('stop')
class Stop(BaseAction):
    """Stop an rds instance.

    https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
    """

    schema = type_schema('stop')

    permissions = ("rds:StopDBInstance",)

    def process(self, resources):
        """Stop every eligible instance; client errors are logged, not raised."""
        client = local_session(self.manager.session_factory).client('rds')
        for r in resources:
            if not _eligible_start_stop(r):
                continue
            instance_id = r['DBInstanceIdentifier']
            try:
                client.stop_db_instance(DBInstanceIdentifier=instance_id)
            except ClientError as e:
                log.exception(
                    "Error stopping db instance:%s err:%s",
                    r['DBInstanceIdentifier'], e)

507 

508 

@actions.register('start')
class Start(BaseAction):
    """Start an rds instance.
    """

    schema = type_schema('start')

    permissions = ("rds:StartDBInstance",)

    def process(self, resources):
        """Start every eligible stopped instance; client errors are logged."""
        client = local_session(self.manager.session_factory).client('rds')
        for r in resources:
            # Only instances currently in the 'stopped' state qualify.
            if not _eligible_start_stop(r, state='stopped'):
                continue
            instance_id = r['DBInstanceIdentifier']
            try:
                client.start_db_instance(DBInstanceIdentifier=instance_id)
            except ClientError as e:
                log.exception(
                    "Error starting db instance:%s err:%s",
                    r['DBInstanceIdentifier'], e)

529 

530 

@actions.register('delete')
class Delete(BaseAction):
    """Deletes selected RDS instances

    This action deletes RDS instances. Pair it with a filter, otherwise
    every RDS instance in the account is selected for deletion.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-delete
            resource: rds
            filters:
              - default-vpc
            actions:
              - type: delete
                skip-snapshot: true
    """

    schema = type_schema('delete', **{
        'skip-snapshot': {'type': 'boolean'},
        'copy-restore-info': {'type': 'boolean'},
    })

    permissions = ('rds:DeleteDBInstance', 'rds:AddTagsToResource')

    def validate(self):
        """Reject policies combining skip-snapshot with copy-restore-info."""
        skip_snapshot = self.data.get('skip-snapshot', False)
        if skip_snapshot and self.data.get('copy-restore-info'):
            raise PolicyValidationError(
                "skip-snapshot cannot be specified with copy-restore-info on %s" % (
                    self.manager.data,))
        return self

    def process(self, dbs):
        """Delete each non-cluster instance, with a final snapshot if possible.

        Returns the list of instances that were considered for deletion
        (cluster members are dropped up front).
        """
        skip = self.data.get('skip-snapshot', False)
        # Can't delete an instance in an aurora cluster, use a policy on the cluster
        dbs = [r for r in dbs if not r.get('DBClusterIdentifier')]
        # Concurrency feels like overkill here.
        client = local_session(self.manager.session_factory).client('rds')

        for db in dbs:
            instance_id = db['DBInstanceIdentifier']
            params = {'DBInstanceIdentifier': instance_id}

            if skip or not _db_instance_eligible_for_final_snapshot(db):
                params['SkipFinalSnapshot'] = True
            else:
                params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                    'Final', instance_id)

            if self.data.get('copy-restore-info', False):
                self.copy_restore_info(client, db)
                # Make sure the restore tags end up on the snapshot.
                if not db['CopyTagsToSnapshot']:
                    client.modify_db_instance(
                        DBInstanceIdentifier=instance_id,
                        CopyTagsToSnapshot=True)

            self.log.info(
                "Deleting rds: %s snapshot: %s",
                instance_id,
                params.get('FinalDBSnapshotIdentifier', False))

            try:
                client.delete_db_instance(**params)
            except ClientError as e:
                # An instance in a state that does not allow deletion is skipped.
                if e.response['Error']['Code'] != "InvalidDBInstanceState":
                    raise

        return dbs

    def copy_restore_info(self, client, instance):
        """Tag the instance with the settings needed to restore it later."""
        security_group_ids = ''.join(
            [g['VpcSecurityGroupId'] for g in instance['VpcSecurityGroups']])
        tags = [
            {'Key': 'VPCSecurityGroups',
             'Value': security_group_ids},
            {'Key': 'OptionGroupName',
             'Value': instance['OptionGroupMemberships'][0]['OptionGroupName']},
            {'Key': 'ParameterGroupName',
             'Value': instance['DBParameterGroups'][0]['DBParameterGroupName']},
            {'Key': 'InstanceClass',
             'Value': instance['DBInstanceClass']},
            {'Key': 'StorageType',
             'Value': instance['StorageType']},
            {'Key': 'MultiAZ',
             'Value': str(instance['MultiAZ'])},
            {'Key': 'DBSubnetGroupName',
             'Value': instance['DBSubnetGroup']['DBSubnetGroupName']},
        ]
        instance_arn = self.manager.generate_arn(instance['DBInstanceIdentifier'])
        client.add_tags_to_resource(ResourceName=instance_arn, Tags=tags)

630 

631 

@actions.register('set-snapshot-copy-tags')
class CopySnapshotTags(BaseAction):
    """Enables copying tags from rds instance to snapshot

    DEPRECATED - use modify-db instead with `CopyTagsToSnapshot`

    :example:

    .. code-block:: yaml

        policies:
          - name: enable-rds-snapshot-tags
            resource: rds
            filters:
              - type: value
                key: Engine
                value: aurora
                op: eq
            actions:
              - type: set-snapshot-copy-tags
                enable: True
    """
    deprecations = (
        deprecated.action("use modify-db instead with `CopyTagsToSnapshot`"),
    )

    schema = type_schema(
        'set-snapshot-copy-tags',
        enable={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        """Update instances whose CopyTagsToSnapshot differs from ``enable``.

        Failures are logged per instance; after all updates have finished
        the last failure seen (if any) is re-raised.  Returns the instances
        that needed updating.
        """
        last_error = None
        with self.executor_factory(max_workers=2) as w:
            client = local_session(self.manager.session_factory).client('rds')
            resources = [
                r for r in resources
                if r['CopyTagsToSnapshot'] != self.data.get('enable', True)]
            futures = {
                w.submit(self.set_snapshot_tags, client, r): r
                for r in resources}
            for f in as_completed(futures):
                if not f.exception():
                    continue
                last_error = f.exception()
                self.log.error(
                    'error updating rds:%s CopyTagsToSnapshot \n %s',
                    futures[f]['DBInstanceIdentifier'], last_error)
        if last_error:
            raise last_error
        return resources

    def set_snapshot_tags(self, client, r):
        """Set CopyTagsToSnapshot on one instance via the manager's retry."""
        enable = self.data.get('enable', True)
        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            CopyTagsToSnapshot=enable)

687 

688 

@RDS.action_registry.register('post-finding')
class DbInstanceFinding(PostFinding):
    # Formats RDS DB instances as finding resources.

    resource_type = 'AwsRdsDbInstance'

    def format_resource(self, r):
        """Build the finding resource dict for one DB instance.

        Only the listed fields with truthy values are copied into the
        details; datetime values are converted to ISO-8601 strings.  Empty
        entries are stripped from the result with ``filter_empty``.
        """
        fields = (
            'AssociatedRoles', 'CACertificateIdentifier', 'DBClusterIdentifier',
            'DBInstanceIdentifier', 'DBInstanceClass', 'DbInstancePort', 'DbiResourceId',
            'DBName', 'DeletionProtection', 'Endpoint', 'Engine', 'EngineVersion',
            'IAMDatabaseAuthenticationEnabled', 'InstanceCreateTime', 'KmsKeyId',
            'PubliclyAccessible', 'StorageEncrypted',
            'TdeCredentialArn', 'VpcSecurityGroups', 'MultiAz', 'EnhancedMonitoringResourceArn',
            'DbInstanceStatus', 'MasterUsername',
            'AllocatedStorage', 'PreferredBackupWindow', 'BackupRetentionPeriod',
            'DbSecurityGroups', 'DbParameterGroups',
            'AvailabilityZone', 'DbSubnetGroup', 'PreferredMaintenanceWindow',
            'PendingModifiedValues', 'LatestRestorableTime',
            'AutoMinorVersionUpgrade', 'ReadReplicaSourceDBInstanceIdentifier',
            'ReadReplicaDBInstanceIdentifiers',
            'ReadReplicaDBClusterIdentifiers', 'LicenseModel', 'Iops', 'OptionGroupMemberships',
            'CharacterSetName',
            'SecondaryAvailabilityZone', 'StatusInfos', 'StorageType', 'DomainMemberships',
            'CopyTagsToSnapshot',
            'MonitoringInterval', 'MonitoringRoleArn', 'PromotionTier', 'Timezone',
            'PerformanceInsightsEnabled',
            'PerformanceInsightsKmsKeyId', 'PerformanceInsightsRetentionPeriod',
            'EnabledCloudWatchLogsExports',
            'ProcessorFeatures', 'ListenerEndpoint', 'MaxAllocatedStorage',
        )

        details = {}
        for field_name in fields:
            if not r.get(field_name):
                continue
            value = r[field_name]
            if isinstance(value, datetime.datetime):
                value = value.isoformat()
            details.setdefault(field_name, value)

        tag_map = {t['Key']: t['Value'] for t in r.get('Tags', [])}
        db_instance = {
            'Type': self.resource_type,
            'Id': r['DBInstanceArn'],
            'Region': self.manager.config.region,
            'Tags': tag_map,
            'Details': {self.resource_type: filter_empty(details)},
        }
        return filter_empty(db_instance)

737 

738 

@actions.register('snapshot')
class Snapshot(BaseAction):
    """Creates a manual snapshot of a RDS instance

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-snapshot
            resource: rds
            actions:
              - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('rds:CreateDBSnapshot',)

    def process(self, dbs):
        """Snapshot all instances concurrently; failures are only logged."""
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_rds_snapshot, db) for db in dbs]
            for f in as_completed(futures):
                failure = f.exception()
                if failure:
                    self.log.error(
                        "Exception creating rds snapshot \n %s",
                        failure)
        return dbs

    def process_rds_snapshot(self, resource):
        """Create a snapshot for one instance if it is eligible for backup."""
        if not _db_instance_eligible_for_backup(resource):
            return

        instance_id = resource['DBInstanceIdentifier']
        # NOTE(review): 'snapshot-prefix' is read here but is not declared in
        # this action's schema above.
        prefix = self.data.get('snapshot-prefix', 'Backup')
        c = local_session(self.manager.session_factory).client('rds')
        c.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_identifier(prefix, instance_id),
            DBInstanceIdentifier=instance_id)

781 

782 

@actions.register('resize')
class ResizeInstance(BaseAction):
    """Change the allocated storage of an rds instance.

    :example:

    This will find databases using over 85% of their allocated
    storage (less than 15% free), and resize them to have an additional
    30% storage. The resize here is async during the next maintenance.

    .. code-block:: yaml

        policies:
          - name: rds-resize-up
            resource: rds
            filters:
              - type: metrics
                name: FreeStorageSpace
                percent-attr: AllocatedStorage
                attr-multiplier: 1073741824
                value: 15
                op: less-than
            actions:
              - type: resize
                percent: 30


    This will find databases with more than 90% of their allocated
    storage free, and resize them to be 30% smaller. The resize here
    is configured to be immediate.

    .. code-block:: yaml

        policies:
          - name: rds-resize-down
            resource: rds
            filters:
              - type: metrics
                name: FreeStorageSpace
                percent-attr: AllocatedStorage
                attr-multiplier: 1073741824
                value: 90
                op: greater-than
            actions:
              - type: resize
                percent: -30
                immediate: true
    """
    schema = type_schema(
        'resize',
        percent={'type': 'number'},
        immediate={'type': 'boolean'})

    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        """Scale each instance's AllocatedStorage by the configured percent.

        The new size is rounded half-up to a whole number before the
        modify call.
        """
        c = local_session(self.manager.session_factory).client('rds')
        hundred = D(100)
        for r in resources:
            current_size = D(r['AllocatedStorage'])
            scale = (hundred + D(self.data['percent'])) / hundred
            target_size = (scale * current_size).quantize(D('0'), ROUND_HALF_UP)
            c.modify_db_instance(
                DBInstanceIdentifier=r['DBInstanceIdentifier'],
                AllocatedStorage=int(target_size),
                ApplyImmediately=self.data.get('immediate', False))

849 

850 

@actions.register('retention')
class RetentionWindow(BaseAction):
    """
    Sets the 'BackupRetentionPeriod' value for automated snapshots,
    enforce (min, max, exact) sets retention days accordingly.

    * ``min`` (default): keep the larger of the current and configured days.
    * ``max``: keep the smaller of the current and configured days.
    * ``exact``: use the configured days.

    ``copy-tags`` (default true) is applied as CopyTagsToSnapshot in the
    same modification.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-snapshot-retention
            resource: rds
            filters:
              - type: value
                key: BackupRetentionPeriod
                value: 7
                op: lt
            actions:
              - type: retention
                days: 7
                copy-tags: true
                enforce: exact
    """

    date_attribute = "BackupRetentionPeriod"
    schema = type_schema(
        'retention', **{'days': {'type': 'number'},
                        'copy-tags': {'type': 'boolean'},
                        'enforce': {'type': 'string', 'enum': [
                            'min', 'max', 'exact']}})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        """Apply the retention settings concurrently; failures are logged."""
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for db in dbs:
                futures.append(w.submit(
                    self.process_snapshot_retention,
                    db))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting rds retention \n %s",
                        f.exception())
        return dbs

    def process_snapshot_retention(self, resource):
        """Set retention and copy-tags on one instance if it is eligible.

        The retention value is chosen solely from the ``enforce`` mode.
        Previously a difference in the copy-tags setting made the ``min``
        branch run even for ``max``/``exact`` policies, applying
        ``max(current, days)`` instead of the requested value.

        :returns: the resource when a modification was issued, else None.
        """
        current_retention = int(resource.get('BackupRetentionPeriod', 0))
        new_retention = self.data['days']
        new_copy_tags = self.data.get('copy-tags', True)
        retention_type = self.data.get('enforce', 'min').lower()

        if retention_type == 'min':
            retention = max(current_retention, new_retention)
        elif retention_type == 'max':
            retention = min(current_retention, new_retention)
        elif retention_type == 'exact':
            retention = new_retention
        else:
            # The schema restricts 'enforce' to the three modes above.
            return None

        if not _db_instance_eligible_for_backup(resource):
            return None

        self.set_retention_window(resource, retention, new_copy_tags)
        return resource

    def set_retention_window(self, resource, retention, copy_tags):
        """Issue the modify call with the given retention and copy-tags."""
        c = local_session(self.manager.session_factory).client('rds')
        c.modify_db_instance(
            DBInstanceIdentifier=resource['DBInstanceIdentifier'],
            BackupRetentionPeriod=retention,
            CopyTagsToSnapshot=copy_tags)

934 

935 

@actions.register('set-public-access')
class RDSSetPublicAvailability(BaseAction):
    """
    Toggles the 'PubliclyAccessible' flag of an RDS instance
    to true or false

    :example:

    .. code-block:: yaml

        policies:
          - name: disable-rds-public-accessibility
            resource: rds
            filters:
              - PubliclyAccessible: true
            actions:
              - type: set-public-access
                state: false
    """

    schema = type_schema(
        "set-public-access",
        state={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def set_accessibility(self, r):
        """Set PubliclyAccessible on one instance (``state`` defaults to False)."""
        desired_state = self.data.get('state', False)
        client = local_session(self.manager.session_factory).client('rds')
        client.modify_db_instance(
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            PubliclyAccessible=desired_state)

    def process(self, rds):
        """Update all instances concurrently; failures are only logged."""
        with self.executor_factory(max_workers=2) as w:
            futures = {}
            for r in rds:
                futures[w.submit(self.set_accessibility, r)] = r
            for f in as_completed(futures):
                failure = f.exception()
                if failure:
                    self.log.error(
                        "Exception setting public access on %s  \n %s",
                        futures[f]['DBInstanceIdentifier'], failure)
        return rds

976 

977 

@resources.register('rds-subscription')
class RDSSubscription(QueryResourceManager):
    # Resource manager for RDS event subscriptions.

    class resource_type(TypeInfo):
        # Service and enumeration call.
        service = 'rds'
        enum_spec = (
            'describe_event_subscriptions', 'EventSubscriptionsList', None)
        permissions_enum = ('rds:DescribeEventSubscriptions',)

        # Identity: the same attribute serves as both id and name.
        id = "CustSubscriptionId"
        name = id
        date = "SubscriptionCreateTime"

        # ARN and type names.
        arn = 'EventSubscriptionArn'
        arn_type = 'es'
        cfn_type = 'AWS::RDS::EventSubscription'

        universal_taggable = object()

    augment = universal_augment

994 

995 

@RDSSubscription.filter_registry.register('topic')
class RDSSubscriptionSNSTopic(related.RelatedResourceFilter):
    """
    Filters RDS event subscriptions by attributes of their SNS topic

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-subscriptions-no-confirmed-topics
            resource: aws.rds-subscription
            filters:
              - type: topic
                key: SubscriptionsConfirmed
                value: 0
                value_type: integer
    """
    schema = type_schema('topic', rinherit=ValueFilter.schema)
    RelatedResource = 'c7n.resources.sns.SNS'
    RelatedIdsExpression = 'SnsTopicArn'
    annotation_key = 'c7n:SnsTopic'

    def process(self, resources, event=None):
        """Return matching subscriptions, annotated with their topic details."""
        related_topics = self.get_related(resources)
        matched = []
        for resource in resources:
            if not self.process_resource(resource, related_topics):
                continue
            # adding full topic details
            topic_arn = resource[self.RelatedIdsExpression]
            # The lookup can come back None, e.g. if value is "absent".
            resource[self.annotation_key] = related_topics.get(topic_arn, None)
            matched.append(resource)
        return matched

1031 

1032 

@RDSSubscription.action_registry.register('delete')
class RDSSubscriptionDelete(BaseAction):
    """Deletes an RDS event subscription

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-subscription-delete
            resource: rds-subscription
            filters:
              - type: value
                key: CustSubscriptionId
                value: xyz
            actions:
              - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteEventSubscription',)

    def process(self, resources):
        """Delete each subscription by its ``CustSubscriptionId``."""
        # Error codes handed to the manager's retry helper as ignorable.
        ignorable = ('SubscriptionNotFoundFault',
                     'InvalidEventSubscriptionStateFault')
        client = local_session(self.manager.session_factory).client('rds')
        for subscription in resources:
            self.manager.retry(
                client.delete_event_subscription,
                SubscriptionName=subscription['CustSubscriptionId'],
                ignore_err_codes=ignorable)

1062 

1063 

class DescribeRDSSnapshot(DescribeSource):
    """Describe source for RDS snapshots."""

    def get_resources(self, ids, cache=True):
        """Look up snapshots one identifier at a time and concatenate results.

        ``cache`` is accepted for interface compatibility but not consulted.
        """
        fetch_one = super().get_resources
        per_id_results = [fetch_one((snapshot_id,)) for snapshot_id in ids]
        found = []
        for batch in per_id_results:
            found.extend(batch)
        return found

    def augment(self, snaps):
        """Move each snapshot's ``TagList`` to ``Tags`` (empty tuple if absent)."""
        for snapshot in snaps:
            snapshot['Tags'] = snapshot.pop('TagList', ())
        return snaps

1074 

1075 

@resources.register('rds-snapshot')
class RDSSnapshot(QueryResourceManager):
    """Resource manager for RDS DB snapshots.
    """

    class resource_type(TypeInfo):
        # How the resources are enumerated and looked up by id.
        service = 'rds'
        enum_spec = ('describe_db_snapshots', 'DBSnapshots', None)
        filter_name = "DBSnapshotIdentifier"
        filter_type = "scalar"
        permissions_enum = ('rds:DescribeDBSnapshots',)

        # Fields on each snapshot record used for identity and dating.
        id = name = 'DBSnapshotIdentifier'
        date = 'SnapshotCreateTime'
        arn_type = 'snapshot'
        arn_separator = ':'

        config_type = "AWS::RDS::DBSnapshot"
        universal_taggable = True

    source_mapping = {
        'config': ConfigSource,
        'describe': DescribeRDSSnapshot,
    }

1098 

1099 

@RDSSnapshot.filter_registry.register('onhour')
class RDSSnapshotOnHour(OnHour):
    """On-hour schedule filter for RDS snapshots.

    Registered as ``onhour`` on the snapshot filter registry; defines nothing
    beyond what ``OnHour`` provides.
    """

1103 

1104 

@RDSSnapshot.filter_registry.register('instance')
class SnapshotInstance(related.RelatedResourceFilter):
    """Filter snapshots by attributes of the database they were taken from.

    :example:

    Find snapshots without an extant database

    .. code-block:: yaml

        policies:
          - name: rds-snapshot-orphan
            resource: aws.rds-snapshot
            filters:
              - type: instance
                value: 0
                value_type: resource_count
    """
    schema = type_schema('instance', rinherit=ValueFilter.schema)

    # The related resource is the RDS instance named on the snapshot.
    RelatedResource = "c7n.resources.rds.RDS"
    RelatedIdsExpression = "DBInstanceIdentifier"
    FetchThreshold = 5

1130 

1131 

@RDSSnapshot.filter_registry.register('latest')
class LatestSnapshot(Filter):
    """Return the latest snapshot for each database.

    Set ``automatic: false`` to consider only snapshots whose
    ``SnapshotType`` is ``manual``.
    """
    schema = type_schema('latest', automatic={'type': 'boolean'})
    permissions = ('rds:DescribeDBSnapshots',)

    def process(self, resources, event=None):
        """Pick, per ``DBInstanceIdentifier``, the newest snapshot.

        Snapshots are grouped with a dict rather than ``itertools.groupby``:
        ``groupby`` only merges *adjacent* items, so unsorted input would
        yield several "latest" snapshots for the same database.

        Returns one snapshot per database, ordered by the first appearance
        of each database in ``resources``.
        """
        if not self.data.get('automatic', True):
            resources = [r for r in resources if r['SnapshotType'] == 'manual']

        latest = {}
        for snapshot in resources:
            db_identifier = snapshot['DBInstanceIdentifier']
            current = latest.get(db_identifier)
            # ">=" keeps the later-listed snapshot on equal timestamps, the
            # same tie-break a stable sort followed by [-1] would give.
            if (current is None or
                    snapshot['SnapshotCreateTime'] >= current['SnapshotCreateTime']):
                latest[db_identifier] = snapshot
        return list(latest.values())

1149 

1150 

@RDSSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filters RDS snapshots based on age (in days)

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-snapshot-expired
            resource: rds-snapshot
            filters:
              - type: age
                days: 28
                op: ge
            actions:
              - delete
    """

    schema = type_schema(
        'age',
        days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    date_attribute = 'SnapshotCreateTime'

    def get_resource_date(self, i):
        """Return the snapshot's creation time, or None when it has none."""
        created = i.get('SnapshotCreateTime')
        return created

1178 

1179 

@RDSSnapshot.action_registry.register('restore')
class RestoreInstance(BaseAction):
    """Restore an rds instance from a snapshot.

    Note this requires the snapshot or db deletion be taken
    with the `copy-restore-info` boolean flag set to true, as
    various instance metadata is stored on the snapshot as tags.

    Additional parameters to the restore db instance api call can be
    overridden via `restore_options` settings. Various modify db instance
    parameters can be specified via `modify_options` settings.
    """

    schema = type_schema(
        'restore',
        restore_options={'type': 'object'},
        modify_options={'type': 'object'})

    permissions = (
        'rds:ModifyDBInstance',
        'rds:ModifyDBParameterGroup',
        'rds:ModifyOptionGroup',
        'rds:RebootDBInstance',
        'rds:RestoreDBInstanceFromDBSnapshot')

    # Seconds between polls while waiting for the restored instance.
    poll_period = 60
    # Tag keys that must be present on a snapshot for it to be restorable.
    restore_keys = {
        'VPCSecurityGroups', 'MultiAZ', 'DBSubnetGroupName',
        'InstanceClass', 'StorageType', 'ParameterGroupName',
        'OptionGroupName'}

    def validate(self):
        """Require the policy to also use the ``latest`` snapshot filter."""
        uses_latest = any(
            isinstance(f, LatestSnapshot)
            for f in self.manager.iter_filters())
        if uses_latest:
            return self
        # do we really need this...
        raise PolicyValidationError(
            "must filter by latest to use restore action %s" % (
                self.manager.data,))

    def process(self, resources):
        """Restore each snapshot that carries the full set of restore tags.

        Snapshots missing any restore tag are skipped with a warning.
        Failures of individual restores are logged, not raised.
        """
        client = local_session(self.manager.session_factory).client('rds')
        # restore up to 10 in parallel, we have to wait on each.
        worker_count = min(10, len(resources) or 1)
        with self.executor_factory(max_workers=worker_count) as w:
            futures = {}
            for snapshot in resources:
                tags = {t['Key']: t['Value'] for t in snapshot['Tags']}
                if self.restore_keys.difference(tags):
                    self.log.warning(
                        "snapshot:%s missing restore tags",
                        snapshot['DBSnapshotIdentifier'])
                    continue
                futures[w.submit(self.process_instance, client, snapshot)] = snapshot
            for done in as_completed(futures):
                snapshot = futures[done]
                error = done.exception()
                if error:
                    self.log.warning(
                        "Error restoring db:%s from:%s error:\n%s",
                        snapshot['DBInstanceIdentifier'],
                        snapshot['DBSnapshotIdentifier'],
                        error)

    def process_instance(self, client, r):
        """Restore one instance, wait for it, then modify and reboot it."""
        params, post_modify = self.get_restore_from_tags(r)
        self.manager.retry(
            client.restore_db_instance_from_db_snapshot, **params)

        db_id = params['DBInstanceIdentifier']
        waiter = client.get_waiter('db_instance_available')
        # Original note says "wait up to 40m"; the actual ceiling is
        # poll_period times the waiter's max attempts -- TODO confirm.
        waiter.config.delay = self.poll_period
        waiter.wait(DBInstanceIdentifier=db_id)

        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=db_id,
            ApplyImmediately=True,
            **post_modify)
        self.manager.retry(
            client.reboot_db_instance,
            DBInstanceIdentifier=db_id,
            ForceFailover=False)

    def get_restore_from_tags(self, snapshot):
        """Build the restore and post-restore modify parameters.

        Returns a ``(params, post_modify)`` pair derived from the snapshot's
        tags, with the policy's ``restore_options`` / ``modify_options``
        applied last so they override the tag-derived values.
        """
        tags = {t['Key']: t['Value'] for t in snapshot['Tags']}

        params = {
            'DBInstanceIdentifier': snapshot['DBInstanceIdentifier'],
            'DBSnapshotIdentifier': snapshot['DBSnapshotIdentifier'],
            'MultiAZ': tags['MultiAZ'] == 'True',
            'DBSubnetGroupName': tags['DBSubnetGroupName'],
            'DBInstanceClass': tags['InstanceClass'],
            'CopyTagsToSnapshot': True,
            'StorageType': tags['StorageType'],
            'OptionGroupName': tags['OptionGroupName'],
        }
        post_modify = {
            'DBParameterGroupName': tags['ParameterGroupName'],
            'VpcSecurityGroupIds': tags['VPCSecurityGroups'].split(','),
        }

        # Carry over every tag that is not restore bookkeeping.
        params['Tags'] = [
            {'Key': key, 'Value': value}
            for key, value in tags.items()
            if key not in self.restore_keys]

        params.update(self.data.get('restore_options', {}))
        post_modify.update(self.data.get('modify_options', {}))
        return params, post_modify

1287 

1288 

@RDSSnapshot.filter_registry.register('cross-account')
class CrossAccountAccess(CrossAccountAccessFilter):
    """Match snapshots whose restore attribute is shared with other accounts.

    Each inspected snapshot is annotated with its attributes under
    ``c7n:attributes``; matched snapshots also get the offending account
    list under ``c7n:CrossAccountViolations``.
    """

    permissions = ('rds:DescribeDBSnapshotAttributes',)
    attributes_key = 'c7n:attributes'
    annotation_key = 'c7n:CrossAccountViolations'

    def process(self, resources, event=None):
        """Check snapshots in batches of 20 on two workers."""
        self.accounts = self.get_accounts()
        self.everyone_only = self.data.get("everyone_only", False)
        matched = []
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_resource_set, batch)
                for batch in chunks(resources, 20)]
            for done in as_completed(futures):
                error = done.exception()
                if error:
                    # A failed batch is logged and contributes no matches.
                    self.log.error(
                        "Exception checking cross account access\n %s" % (
                            error))
                    continue
                matched.extend(done.result())
        return matched

    def process_resource_set(self, resource_set):
        """Return the snapshots in ``resource_set`` shared outside the allowed accounts."""
        client = local_session(self.manager.session_factory).client('rds')
        flagged = []
        for snapshot in resource_set:
            response = self.manager.retry(
                client.describe_db_snapshot_attributes,
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'])
            attribute_list = response[
                'DBSnapshotAttributesResult']['DBSnapshotAttributes']
            attrs = {
                a['AttributeName']: a['AttributeValues']
                for a in attribute_list}
            snapshot[self.attributes_key] = attrs

            shared_accounts = set(attrs.get('restore', []))
            if self.everyone_only:
                # Only public sharing ("all") counts in this mode.
                shared_accounts = shared_accounts & {'all'}
            violations = shared_accounts.difference(self.accounts)
            if violations:
                snapshot[self.annotation_key] = list(violations)
                flagged.append(snapshot)
        return flagged

1332 

1333 

@RDSSnapshot.action_registry.register('set-permissions')
class SetPermissions(BaseAction):
    """Set permissions for copying or restoring an RDS snapshot

    Use the 'add' and 'remove' parameters to control which accounts to
    add or remove, respectively.  The default is to remove any
    permissions granted to other AWS accounts.

    Use `remove: matched` in combination with the `cross-account` filter
    for more flexible removal options such as preserving access for
    a set of whitelisted accounts:

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-snapshot-remove-cross-account
            resource: rds-snapshot
            filters:
              - type: cross-account
                whitelist:
                  - '112233445566'
            actions:
              - type: set-permissions
                remove: matched
    """
    schema = type_schema(
        'set-permissions',
        remove={'oneOf': [
            {'enum': ['matched']},
            {'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ],
            }}
        ]},
        add={
            'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ]
            }
        }
    )

    permissions = ('rds:ModifyDBSnapshotAttribute',)

    def validate(self):
        """Require a cross-account filter when ``remove: matched`` is used.

        Returns ``self``, like the other ``validate`` methods in this module.

        Raises:
            PolicyValidationError: ``remove`` is ``matched`` but the policy
                has no cross-account filter to supply the matched accounts.
        """
        if self.data.get('remove') == 'matched':
            has_cross_account = any(
                isinstance(f, CrossAccountAccessFilter)
                for f in self.manager.iter_filters())
            if not has_cross_account:
                raise PolicyValidationError(
                    "policy:%s filter:%s with matched requires cross-account filter" % (
                        self.manager.ctx.policy.name, self.type))
        return self

    def process(self, snapshots):
        """Apply the configured permission changes to each snapshot in turn."""
        client = local_session(self.manager.session_factory).client('rds')
        for s in snapshots:
            self.process_snapshot(client, s)

    def process_snapshot(self, client, snapshot):
        """Add and/or remove accounts on one snapshot's ``restore`` attribute.

        With neither ``add`` nor ``remove`` configured, every account
        currently listed on the attribute is removed; the attribute is
        fetched first if the cross-account filter has not already cached it
        on the snapshot. With ``remove: matched``, the accounts annotated by
        the cross-account filter are removed. No API call is made when there
        is nothing to add or remove.
        """
        add_accounts = self.data.get('add', [])
        remove_accounts = self.data.get('remove', [])

        if not (add_accounts or remove_accounts):
            if CrossAccountAccess.attributes_key not in snapshot:
                response = self.manager.retry(
                    client.describe_db_snapshot_attributes,
                    DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'])
                snapshot[CrossAccountAccess.attributes_key] = {
                    t['AttributeName']: t['AttributeValues']
                    for t in response[
                        'DBSnapshotAttributesResult']['DBSnapshotAttributes']}
            remove_accounts = snapshot[
                CrossAccountAccess.attributes_key].get('restore', [])
        elif remove_accounts == 'matched':
            remove_accounts = snapshot.get(CrossAccountAccess.annotation_key, [])

        if add_accounts or remove_accounts:
            client.modify_db_snapshot_attribute(
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'],
                AttributeName='restore',
                ValuesToRemove=remove_accounts,
                ValuesToAdd=add_accounts)

1425 

1426 

@RDSSnapshot.action_registry.register('region-copy')
class RegionCopySnapshot(BaseAction):
    """Copy a snapshot across regions.

    Note there is a max in flight for cross region rds snapshots
    of 5 per region. This action will attempt to retry automatically
    for an hr.

    Example::

      - name: copy-encrypted-snapshots
        description: |
          copy snapshots under 1 day old to dr region with kms
        resource: rds-snapshot
        region: us-east-1
        filters:
         - Status: available
         - type: value
           key: SnapshotCreateTime
           value_type: age
           value: 1
           op: less-than
        actions:
          - type: region-copy
            target_region: us-east-2
            target_key: arn:aws:kms:us-east-2:0000:key/cb291f53-c9cf61
            copy_tags: true
            tags:
              OriginRegion: us-east-1
    """

    schema = type_schema(
        'region-copy',
        target_region={'type': 'string'},
        target_key={'type': 'string'},
        copy_tags={'type': 'boolean'},
        tags={'type': 'object'},
        required=('target_region',))

    permissions = ('rds:CopyDBSnapshot',)
    # Retry settings used when the copy hits SnapshotQuotaExceeded.
    min_delay = 120
    max_attempts = 30

    def validate(self):
        """Reject use of this action in policies that declare a ``mode``."""
        has_target = self.data.get('target_region')
        if has_target and self.manager.data.get('mode'):
            raise PolicyValidationError(
                "cross region snapshot may require waiting for "
                "longer then lambda runtime allows %s" % (self.manager.data,))
        return self

    def process(self, resources):
        """Copy the snapshots in batches of 20, unless the target is this region."""
        same_region = self.data['target_region'] == self.manager.config.region
        if same_region:
            self.log.warning(
                "Source and destination region are the same, skipping copy")
            return
        for batch in chunks(resources, 20):
            self.process_resource_set(batch)

    def process_resource(self, target, key, tags, snapshot):
        """Copy one snapshot using the target-region client ``target``.

        On success the new snapshot's ARN is recorded on ``snapshot`` under
        ``c7n:CopiedSnapshot``. A copy that already exists in the target
        region is logged and skipped; other client errors propagate.
        """
        params = {}
        if key:
            params['KmsKeyId'] = key
        # ':' in the source identifier is replaced with '-' for the target.
        params['TargetDBSnapshotIdentifier'] = snapshot[
            'DBSnapshotIdentifier'].replace(':', '-')
        params['SourceRegion'] = self.manager.config.region
        params['SourceDBSnapshotIdentifier'] = snapshot['DBSnapshotArn']

        if self.data.get('copy_tags', True):
            params['CopyTags'] = True
        if tags:
            params['Tags'] = tags

        retry = get_retry(
            ('SnapshotQuotaExceeded',),
            # TODO make this configurable, class defaults to 1hr
            min_delay=self.min_delay,
            max_attempts=self.max_attempts,
            log_retries=logging.DEBUG)
        try:
            result = retry(target.copy_db_snapshot, **params)
        except ClientError as e:
            if e.response['Error']['Code'] != 'DBSnapshotAlreadyExists':
                raise
            self.log.warning(
                "Snapshot %s already exists in target region",
                snapshot['DBSnapshotIdentifier'])
            return
        snapshot['c7n:CopiedSnapshot'] = result['DBSnapshot']['DBSnapshotArn']

    def process_resource_set(self, resource_set):
        """Copy every snapshot in ``resource_set`` to the target region."""
        target_client = self.manager.session_factory(
            region=self.data['target_region']).client('rds')
        target_key = self.data.get('target_key')
        tags = [
            {'Key': k, 'Value': v}
            for k, v in self.data.get('tags', {}).items()]
        merge_resource_tags = bool(tags) and self.data.get('copy_tags', True)

        for snapshot_set in chunks(resource_set, 5):
            for r in snapshot_set:
                # If tags are supplied, copy tags are ignored, and
                # we need to augment the tag set with the original
                # resource tags to preserve the common case.
                rtags = list(tags) if tags else None
                if merge_resource_tags:
                    rtags.extend(r['Tags'])
                self.process_resource(target_client, target_key, rtags, r)

1533 

1534 

@RDSSnapshot.action_registry.register('delete')
class RDSSnapshotDelete(BaseAction):
    """Deletes a RDS snapshot resource

    Only snapshots whose ``SnapshotType`` is ``manual`` are deleted.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-snapshot-delete-stale
            resource: rds-snapshot
            filters:
              - type: age
                days: 28
                op: ge
            actions:
              - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBSnapshot',)

    def process(self, snapshots):
        """Delete the manual snapshots in sets of 50 on three workers.

        Returns the manual snapshots that were submitted for deletion, or an
        empty list when there were none. Failures are logged per set.
        """
        snapshots = self.filter_resources(snapshots, 'SnapshotType', ('manual',))
        if not snapshots:
            return []
        log.info("Deleting %d rds snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_snapshot_set, snapshot_set)
                for snapshot_set in chunks(reversed(snapshots), size=50)]
            for done in as_completed(futures):
                error = done.exception()
                if error:
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        error)
        return snapshots

    def process_snapshot_set(self, snapshots_set):
        """Delete each snapshot in the set by its identifier."""
        rds_client = local_session(self.manager.session_factory).client('rds')
        for snapshot in snapshots_set:
            rds_client.delete_db_snapshot(
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'])

1579 

1580 

@actions.register('modify-security-groups')
class RDSModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify the VPC security groups of RDS instances or their clusters."""

    permissions = ('rds:ModifyDBInstance', 'rds:ModifyDBCluster')
    vpc_expr = 'DBSubnetGroup.VpcId'

    def process(self, rds_instances):
        """Apply the resolved security groups to each instance.

        Instances with a ``DBClusterIdentifier`` are not modified directly;
        their groups are applied once per cluster with ``modify_db_cluster``.
        When several instances share a cluster, the groups resolved for the
        last such instance are the ones applied.
        """
        client = local_session(self.manager.session_factory).client('rds')
        groups = super().get_groups(rds_instances)

        # either build map for DB cluster or modify DB instance directly
        cluster_groups = {}
        for idx, instance in enumerate(rds_instances):
            security_group_ids = groups[idx]
            cluster_id = instance.get('DBClusterIdentifier')
            if cluster_id:
                cluster_groups[instance['DBClusterIdentifier']] = security_group_ids
                continue
            client.modify_db_instance(
                DBInstanceIdentifier=instance['DBInstanceIdentifier'],
                VpcSecurityGroupIds=security_group_ids)

        # handle DB cluster, if necessary
        for cluster_id, security_group_ids in cluster_groups.items():
            client.modify_db_cluster(
                DBClusterIdentifier=cluster_id,
                VpcSecurityGroupIds=security_group_ids
            )

1609 

1610 

class DescribeSubnetGroup(DescribeSource):
    """Describe source that attaches tags to RDS subnet groups."""

    def augment(self, resources):
        """Populate ``Tags`` on the subnet groups in place and return the input."""
        manager = self.manager
        # The helper mutates each group; its own return value is not used.
        _db_subnet_group_tags(
            resources,
            manager.session_factory,
            manager.executor_factory,
            manager.retry)
        return resources

1618 

1619 

@resources.register('rds-subnet-group')
class RDSSubnetGroup(QueryResourceManager):
    """RDS subnet group."""

    class resource_type(TypeInfo):
        # How the resources are enumerated and looked up by name.
        service = 'rds'
        enum_spec = (
            'describe_db_subnet_groups', 'DBSubnetGroups', None)
        filter_name = 'DBSubnetGroupName'
        filter_type = 'scalar'
        permissions_enum = ('rds:DescribeDBSubnetGroups',)

        # Identity fields on each subnet group record.
        name = id = 'DBSubnetGroupName'
        arn_type = 'subgrp'
        arn_separator = ':'

        config_type = cfn_type = 'AWS::RDS::DBSubnetGroup'
        universal_taggable = object()

    source_mapping = {
        'describe': DescribeSubnetGroup,
        'config': ConfigSource,
    }

1641 

1642 

def _db_subnet_group_tags(subnet_groups, session_factory, executor_factory, retry):
    """Fetch tags for each subnet group and store them under ``Tags``.

    Groups are tagged in place, one ``list_tags_for_resource`` call each.
    A group the API reports as not found is left untagged and omitted from
    the returned list. ``executor_factory`` and ``retry`` are accepted but
    not used by this implementation.
    """
    client = local_session(session_factory).client('rds')
    tagged = []
    for group in subnet_groups:
        try:
            group['Tags'] = client.list_tags_for_resource(
                ResourceName=group['DBSubnetGroupArn'])['TagList']
        except client.exceptions.DBSubnetGroupNotFoundFault:
            continue
        tagged.append(group)
    return tagged

1655 

1656 

@RDSSubnetGroup.action_registry.register('delete')
class RDSSubnetGroupDeleteAction(BaseAction):
    """Action to delete RDS Subnet Group

    It is recommended to apply a filter to the delete policy to avoid unwanted
    deletion of any rds subnet groups.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-subnet-group-delete
            resource: rds-subnet-group
            filters:
              - Instances: []
            actions:
              - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBSubnetGroup',)

    def process(self, subnet_group):
        """Delete every subnet group in the list using two workers."""
        with self.executor_factory(max_workers=2) as w:
            # Drain the result iterator so any worker exception is raised here.
            for _ in w.map(self.process_subnetgroup, subnet_group):
                pass

    def process_subnetgroup(self, subnet_group):
        """Delete a single subnet group by name."""
        rds_client = local_session(self.manager.session_factory).client('rds')
        group_name = subnet_group['DBSubnetGroupName']
        rds_client.delete_db_subnet_group(DBSubnetGroupName=group_name)

1687 

1688 

@RDSSubnetGroup.filter_registry.register('unused')
class UnusedRDSSubnetGroup(Filter):
    """Filters rds subnet groups that exist but are not in use

    A group counts as used when an rds instance or rds cluster references it.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-subnet-group-delete-unused
            resource: rds-subnet-group
            filters:
              - unused
    """

    schema = type_schema('unused')

    def get_permissions(self):
        """Reuse the permissions of the ``rds`` resource manager."""
        rds_manager = self.manager.get_resource_manager('rds')
        return rds_manager.get_permissions()

    def process(self, configs, event=None):
        """Collect the subnet group names in use, then filter ``configs``."""
        name_expr = '[].DBSubnetGroup.DBSubnetGroupName'
        instances = self.manager.get_resource_manager('rds').resources()
        self.used = set(jmespath_search(name_expr, instances))
        clusters = self.manager.get_resource_manager(
            'rds-cluster').resources(augment=False)
        self.used.update(set(jmespath_search(name_expr, clusters)))
        return super(UnusedRDSSubnetGroup, self).process(configs)

    def __call__(self, config):
        """Return True when the subnet group is not referenced by any database."""
        group_name = config['DBSubnetGroupName']
        return group_name not in self.used

1718 

1719 

@filters.register('db-parameter')
class ParameterFilter(ValueFilter):
    """
    Applies value type filter on set db parameter values.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-pg
            resource: rds
            filters:
              - type: db-parameter
                key: someparam
                op: eq
                value: someval
    """

    schema = type_schema('db-parameter', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters', )
    policy_annotation = 'c7n:MatchedDBParameter'

    @staticmethod
    def recast(val, datatype):
        """ Re-cast the value based upon an AWS supplied datatype
            and treat nulls sensibly.

        Values that cannot be interpreted for the given datatype (for
        example a non-numeric string declared as ``integer``) are returned
        unchanged.
        """
        ret_val = val
        if datatype == 'string':
            ret_val = str(val)
        elif datatype == 'boolean':
            # AWS returns 1s and 0s for boolean for most of the cases
            if val.isdigit():
                ret_val = bool(int(val))
            # AWS returns 'TRUE,FALSE' for Oracle engine
            elif val == 'TRUE':
                ret_val = True
            elif val == 'FALSE':
                ret_val = False
        elif datatype == 'integer':
            if val.isdigit():
                ret_val = int(val)
        elif datatype == 'float':
            ret_val = float(val) if val else 0.0

        return ret_val

    # Private method for 'DBParameterGroupName' paginator
    def _get_param_list(self, pg):
        """Return every parameter record of parameter group ``pg``."""
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_db_parameters')
        pages = paginator.paginate(DBParameterGroupName=pg)
        return list(itertools.chain.from_iterable(
            page['Parameters'] for page in pages))

    def handle_paramgroup_cache(self, param_groups):
        """Map each parameter group name to its ``{name: value}`` settings.

        Values come from the manager's cache when present; otherwise they
        are fetched, recast to their declared datatype, and saved to the
        cache. Parameters without a ``ParameterValue`` are left out.
        """
        pgcache = {}
        cache = self.manager._cache

        with cache:
            for pg in param_groups:
                cache_key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-pg': pg}
                pg_values = cache.get(cache_key)
                if pg_values is not None:
                    pgcache[pg] = pg_values
                    continue
                param_list = self._get_param_list(pg)
                pgcache[pg] = {
                    p['ParameterName']: self.recast(p['ParameterValue'], p['DataType'])
                    for p in param_list if 'ParameterValue' in p}
                cache.save(cache_key, pgcache[pg])
        return pgcache

    def process(self, resources, event=None):
        """Return the instances with a parameter group matching the filter.

        Every parameter group attached to every instance is loaded, not just
        the first one per instance, so the per-group lookup below cannot hit
        a group that was never cached. Instances with no parameter groups
        simply do not match. Matching instances are annotated under
        ``c7n:MatchedDBParameter`` with the filter's key.
        """
        results = []
        parameter_group_list = {
            pg['DBParameterGroupName']
            for db in resources
            for pg in db.get('DBParameterGroups', ())}
        paramcache = self.handle_paramgroup_cache(parameter_group_list)
        for resource in resources:
            for pg in resource.get('DBParameterGroups', ()):
                pg_values = paramcache[pg['DBParameterGroupName']]
                if self.match(pg_values):
                    resource.setdefault(self.policy_annotation, []).append(
                        self.data.get('key'))
                    results.append(resource)
                    # One matching group is enough; avoid duplicate results.
                    break
        return results

1811 

1812 

@actions.register('modify-db')
class ModifyDb(BaseAction):
    """Modifies an RDS instance based on specified parameter
    using ModifyDbInstance.

    'update' is an array of key value pairs that should be set to
    the property and value you wish to modify.
    'immediate' determines whether the modification is applied immediately
    or not. If 'immediate' is not specified, default is false.

    :example:

    .. code-block:: yaml

        policies:
          - name: disable-rds-deletion-protection
            resource: rds
            filters:
              - DeletionProtection: true
              - PubliclyAccessible: true
            actions:
              - type: modify-db
                update:
                  - property: 'DeletionProtection'
                    value: false
                  - property: 'PubliclyAccessible'
                    value: false
                immediate: true
    """

    schema = type_schema(
        'modify-db',
        immediate={"type": 'boolean'},
        update={
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'property': {'type': 'string', 'enum': [
                        'AllocatedStorage',
                        'DBInstanceClass',
                        'DBSubnetGroupName',
                        'DBSecurityGroups',
                        'VpcSecurityGroupIds',
                        'MasterUserPassword',
                        'DBParameterGroupName',
                        'BackupRetentionPeriod',
                        'PreferredBackupWindow',
                        'PreferredMaintenanceWindow',
                        'MultiAZ',
                        'EngineVersion',
                        'AllowMajorVersionUpgrade',
                        'AutoMinorVersionUpgrade',
                        'LicenseModel',
                        'Iops',
                        'OptionGroupName',
                        'NewDBInstanceIdentifier',
                        'StorageType',
                        'TdeCredentialArn',
                        'TdeCredentialPassword',
                        'CACertificateIdentifier',
                        'Domain',
                        'CopyTagsToSnapshot',
                        'MonitoringInterval',
                        'MonitoringRoleARN',
                        'DBPortNumber',
                        'PubliclyAccessible',
                        'DomainIAMRoleName',
                        'PromotionTier',
                        'EnableIAMDatabaseAuthentication',
                        'EnablePerformanceInsights',
                        'PerformanceInsightsKMSKeyId',
                        'PerformanceInsightsRetentionPeriod',
                        'CloudwatchLogsExportConfiguration',
                        'ProcessorFeatures',
                        'UseDefaultProcessorFeatures',
                        'DeletionProtection',
                        'MaxAllocatedStorage',
                        'CertificateRotationRestart']},
                    'value': {}
                },
            },
        },
        required=('update',))

    permissions = ('rds:ModifyDBInstance',)
    # ModifyDBInstance parameter name -> JMESPath of the matching value on a
    # described instance, for parameters whose names differ between the two.
    conversion_map = {
        'DBSubnetGroupName': 'DBSubnetGroup.DBSubnetGroupName',
        'VpcSecurityGroupIds': 'VpcSecurityGroups[].VpcSecurityGroupId',
        'DBParameterGroupName': 'DBParameterGroups[].DBParameterGroupName',
        'OptionGroupName': 'OptionGroupMemberships[].OptionGroupName',
        'NewDBInstanceIdentifier': 'DBInstanceIdentifier',
        'Domain': 'DomainMemberships[].DomainName',
        'DBPortNumber': 'Endpoint.Port',
        'EnablePerformanceInsights': 'PerformanceInsightsEnabled',
        'CloudwatchLogsExportConfiguration': 'EnabledCloudwatchLogsExports'
    }

    def validate(self):
        """Check cross-property requirements of the ``update`` list.

        Raises:
            PolicyValidationError: a positive ``MonitoringInterval`` is given
                without ``MonitoringRoleARN``, or
                ``CloudwatchLogsExportConfiguration`` has neither
                ``EnableLogTypes`` nor ``DisableLogTypes``.
        """
        updates = self.data.get('update')
        if updates:
            update_dict = {u['property']: u['value'] for u in updates}
            if ('MonitoringInterval' in update_dict and
                    update_dict['MonitoringInterval'] > 0 and
                    'MonitoringRoleARN' not in update_dict):
                # Adjacent literals instead of a backslash continuation, which
                # embedded the next line's indentation in the message.
                raise PolicyValidationError(
                    "A MonitoringRoleARN value is required "
                    "if you specify a MonitoringInterval value other than 0")
            if 'CloudwatchLogsExportConfiguration' in update_dict:
                export_config = update_dict.get('CloudwatchLogsExportConfiguration')
                if not any(k in export_config
                           for k in ('EnableLogTypes', 'DisableLogTypes')):
                    raise PolicyValidationError(
                        "A EnableLogTypes or DisableLogTypes input list is required "
                        "for setting CloudwatchLogsExportConfiguration")
        return self

    def process(self, resources):
        """Send to ModifyDBInstance only the properties that differ.

        For each instance, a requested property is included when its current
        value (read directly, or through ``conversion_map`` when the
        described name differs) is not equal to the requested value.
        Instances with nothing to change are skipped. API errors propagate.
        """
        c = local_session(self.manager.session_factory).client('rds')
        for r in resources:
            param = {}
            for u in self.data.get('update'):
                prop, value = u['property'], u['value']
                # 'None' is a JMESPath key that is not expected on the
                # resource, so unmapped properties fall back to None.
                current = r.get(
                    prop,
                    jmespath_search(self.conversion_map.get(prop, 'None'), r))
                if current != value:
                    param[prop] = value
            if not param:
                continue
            param['ApplyImmediately'] = self.data.get('immediate', False)
            param['DBInstanceIdentifier'] = r['DBInstanceIdentifier']
            c.modify_db_instance(**param)

1946 

1947 

@resources.register('rds-reserved')
class ReservedRDS(QueryResourceManager):
    """Lists all active rds reservations

    :example:

    .. code-block:: yaml

        policies:
          - name: existing-rds-reservations
            resource: rds-reserved
            filters:
              - State: active
    """

    class resource_type(TypeInfo):
        # How the resources are enumerated and looked up.
        service = 'rds'
        enum_spec = (
            'describe_reserved_db_instances', 'ReservedDBInstances', None)
        filter_name = 'ReservedDBInstances'
        filter_type = 'list'
        permissions_enum = ('rds:DescribeReservedDBInstances',)

        # Identity fields on each reservation record.
        id = name = 'ReservedDBInstanceId'
        arn = "ReservedDBInstanceArn"
        arn_type = "ri"
        date = 'StartTime'

        universal_taggable = object()

    augment = universal_augment

1977 

1978 

# Make ConsecutiveAwsBackupsFilter available on the RDS resource under the
# 'consecutive-aws-backups' filter name.
RDS.filter_registry.register(
    'consecutive-aws-backups', ConsecutiveAwsBackupsFilter)

1980 

1981 

@filters.register('consecutive-snapshots')
class ConsecutiveSnapshots(Filter):
    """Returns instances where the number of consecutive daily snapshots is
    equal to or greater than n days.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-daily-snapshot-count
                resource: rds
                filters:
                  - type: consecutive-snapshots
                    days: 7
    """
    schema = type_schema(
        'consecutive-snapshots',
        days={'type': 'number', 'minimum': 1},
        required=['days'])
    permissions = ('rds:DescribeDBSnapshots', 'rds:DescribeDBInstances')
    annotation = 'c7n:DBSnapshots'

    def process_resource_set(self, client, resources):
        """Annotate each resource with the snapshots of its DB instance.

        Snapshots for all given instances are fetched with one paginated
        ``describe_db_snapshots`` query and stored on every resource under
        ``self.annotation`` (an empty list when none were returned).
        """
        instance_ids = [r['DBInstanceIdentifier'] for r in resources]

        paginator = client.get_paginator('describe_db_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        pages = paginator.paginate(
            Filters=[{'Name': 'db-instance-id', 'Values': instance_ids}])
        snapshots = pages.build_full_result().get('DBSnapshots', [])

        by_instance = {}
        for snap in snapshots:
            by_instance.setdefault(
                snap['DBInstanceIdentifier'], []).append(snap)

        for r in resources:
            r[self.annotation] = by_instance.get(r['DBInstanceIdentifier'], [])

    def process(self, resources, event=None):
        """Return resources with an available snapshot on each expected day.

        The expected days are the ``days`` calendar dates preceding the
        current UTC date (today itself is not included).
        """
        client = local_session(self.manager.session_factory).client('rds')
        retention = self.data.get('days')
        utcnow = datetime.datetime.utcnow()
        expected_dates = {
            (utcnow - timedelta(days=offset)).strftime('%Y-%m-%d')
            for offset in range(1, retention + 1)
        }

        # Only look up snapshots for resources not already annotated,
        # 50 instances per query.
        unannotated = [r for r in resources if self.annotation not in r]
        for resource_set in chunks(unannotated, 50):
            self.process_resource_set(client, resource_set)

        results = []
        for r in resources:
            snapshot_dates = {
                snap['SnapshotCreateTime'].strftime('%Y-%m-%d')
                for snap in r[self.annotation]
                if snap['Status'] == 'available'
            }
            if expected_dates <= snapshot_dates:
                results.append(r)
        return results

2037 

2038 

@filters.register('engine')
class EngineFilter(ValueFilter):
    """
    Filter a rds resource based on its Engine Metadata

    :example:

    .. code-block:: yaml

        policies:
          - name: find-deprecated-versions
            resource: aws.rds
            filters:
              - type: engine
                key: Status
                value: deprecated
    """

    schema = type_schema('engine', rinherit=ValueFilter.schema)

    permissions = ("rds:DescribeDBEngineVersions", )

    def process(self, resources, event=None):
        """Return the resources whose engine version metadata matches.

        The metadata for every engine/version pair in ``resources`` is
        fetched with ``describe_db_engine_versions`` and the value filter
        is evaluated against that metadata. Matching resources are
        annotated with it under ``c7n:Engine``.

        Resources for which the API returned no metadata cannot be
        evaluated; they are skipped with a warning instead of raising
        ``KeyError``.
        """
        if not resources:
            # Nothing to look up; avoid a query with empty filter values.
            return []

        client = local_session(self.manager.session_factory).client('rds')

        engines = {r['Engine'] for r in resources}
        engine_versions = {r['EngineVersion'] for r in resources}

        paginator = client.get_paginator('describe_db_engine_versions')
        pages = paginator.paginate(
            Filters=[
                {'Name': 'engine', 'Values': list(engines)},
                {'Name': 'engine-version', 'Values': list(engine_versions)}
            ],
            IncludeAll=True,
        )

        # engine name -> engine version -> metadata returned by the API
        all_versions = {}
        for page in pages:
            for e in page['DBEngineVersions']:
                all_versions.setdefault(e['Engine'], {})[e['EngineVersion']] = e

        matched = []
        for r in resources:
            v = all_versions.get(r['Engine'], {}).get(r['EngineVersion'])
            if v is None:
                logging.getLogger('custodian.rds').warning(
                    "no engine metadata found for %s %s, skipping %s",
                    r['Engine'], r['EngineVersion'],
                    r.get('DBInstanceIdentifier'))
                continue
            if self.match(v):
                r['c7n:Engine'] = v
                matched.append(r)
        return matched

2090 

2091 

class DescribeDBProxy(DescribeSource):
    # Describe source whose augment step delegates to universal_augment.

    def augment(self, resources):
        manager = self.manager
        return universal_augment(manager, resources)

2095 

2096 

@resources.register('rds-proxy')
class RDSProxy(QueryResourceManager):
    """Resource Manager for RDS DB Proxies

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-proxy-tls-check
            resource: rds-proxy
            filters:
              - type: value
                key: RequireTLS
                value: false
    """

    class resource_type(TypeInfo):
        service = 'rds'
        # The proxy name serves as both the identifier and the name.
        id = 'DBProxyName'
        name = id
        date = 'CreatedDate'
        enum_spec = (
            'describe_db_proxies',
            'DBProxies',
            None,
        )
        arn = 'DBProxyArn'
        arn_type = 'db-proxy'
        cfn_type = 'AWS::RDS::DBProxy'
        permissions_enum = ('rds:DescribeDBProxies',)
        universal_taggable = object()

    # Resource sources selectable for this manager, keyed by source name.
    source_mapping = {
        'describe': DescribeDBProxy,
        'config': ConfigSource,
    }

2129 

2130 

@RDSProxy.action_registry.register('delete')
class DeleteRDSProxy(BaseAction):
    """
    Deletes a RDS Proxy

    :example:

    .. code-block:: yaml

      policies:
        - name: delete-rds-proxy
          resource: aws.rds-proxy
          filters:
            - type: value
              key: "DBProxyName"
              op: eq
              value: "proxy-test-1"
          actions:
            - type: delete
    """

    schema = type_schema('delete')

    permissions = ('rds:DeleteDBProxy',)

    def process(self, resources):
        """Delete every proxy in ``resources`` by name.

        Each call goes through the manager's retry helper, which is told
        to ignore the not-found and invalid-state error codes.
        """
        client = local_session(self.manager.session_factory).client('rds')
        ignorable = ('DBProxyNotFoundFault', 'InvalidDBProxyStateFault')
        for proxy in resources:
            self.manager.retry(
                client.delete_db_proxy,
                DBProxyName=proxy['DBProxyName'],
                ignore_err_codes=ignorable)

2163 

2164 

@RDSProxy.filter_registry.register('subnet')
class RDSProxySubnetFilter(net_filters.SubnetFilter):
    # Subnet filter for RDS proxies; the related subnet ids are read from
    # the proxy's VpcSubnetIds list.

    RelatedIdsExpression = "VpcSubnetIds[]"

2169 

2170 

@RDSProxy.filter_registry.register('security-group')
class RDSProxySecurityGroupFilter(net_filters.SecurityGroupFilter):
    # Security group filter for RDS proxies; the related group ids are read
    # from the proxy's VpcSecurityGroupIds list.

    RelatedIdsExpression = "VpcSecurityGroupIds[]"

2175 

2176 

@RDSProxy.filter_registry.register('vpc')
class RDSProxyVpcFilter(net_filters.VpcFilter):
    # VPC filter for RDS proxies; the related VPC id is read from the
    # proxy's VpcId field.

    RelatedIdsExpression = "VpcId"

2181 

2182 

@filters.register('db-option-groups')
class DbOptionGroups(ValueFilter):
    """This filter describes RDS option groups for associated RDS instances.
    Use this filter in conjunction with jmespath and value filter operators
    to filter RDS instance based on their option groups

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-data-in-transit-encrypted
            resource: aws.rds
            filters:
              - type: db-option-groups
                key: Options[].OptionName
                op: intersect
                value:
                  - SSL
                  - NATIVE_NETWORK_ENCRYPTION

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-oracle-encryption-in-transit
            resource: aws.rds
            filters:
              - Engine: oracle-ee
              - type: db-option-groups
                key: Options[].OptionSettings[?Name == 'SQLNET.ENCRYPTION_SERVER'].Value[]
                value:
                  - REQUIRED
    """

    schema = type_schema('db-option-groups', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeOptionGroups', )
    policy_annotation = 'c7n:MatchedDBOptionGroups'

    def handle_optiongroup_cache(self, client, paginator, option_groups):
        """Return a mapping of option group name to its description.

        Each name is first looked up in the manager's cache; on a miss the
        group is described through ``paginator`` and the result is saved
        back to the cache. A name that yields no description maps to an
        empty dict. Names repeated in ``option_groups`` are resolved once.

        ``client`` is accepted for interface compatibility and is not used.
        """
        ogcache = {}
        cache = self.manager._cache

        with cache:
            for og in option_groups:
                if og in ogcache:
                    # Already resolved earlier in this call.
                    continue
                # NOTE(review): the 'rds-pg' key name is kept as-is; confirm
                # no other filter stores different data under the same key
                # for an identically named group.
                cache_key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-pg': og}
                og_values = cache.get(cache_key)
                if og_values is not None:
                    ogcache[og] = og_values
                    continue

                # Keep the last description returned for this name, or an
                # empty dict when the API returned none.
                described = {}
                for page in paginator.paginate(OptionGroupName=og):
                    for option_group in page['OptionGroupsList']:
                        described = option_group

                ogcache[og] = described
                cache.save(cache_key, described)

        return ogcache

    def process(self, resources, event=None):
        """Return the instances with an option group matching the filter.

        Every option group referenced by any membership of any instance is
        resolved up front, so the per-membership lookup below cannot miss.
        An instance is returned (and annotated under
        ``self.policy_annotation``) for the first of its option groups that
        matches; instances without memberships never match.
        """
        results = []
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_option_groups')

        # Collect names from all memberships, not only the first one, and
        # de-duplicate so each group is described at most once.
        option_groups = sorted({
            og['OptionGroupName']
            for db in resources
            for og in db.get('OptionGroupMemberships', ())
        })
        optioncache = self.handle_optiongroup_cache(client, paginator, option_groups)

        for resource in resources:
            for og in resource.get('OptionGroupMemberships', ()):
                og_values = optioncache[og['OptionGroupName']]
                if self.match(og_values):
                    resource.setdefault(self.policy_annotation, []).append({
                        k: jmespath_search(k, og_values)
                        for k in {'OptionGroupName', self.data.get('key')}
                    })
                    results.append(resource)
                    break

        return results

2269 

2270 

@filters.register('pending-maintenance')
class PendingMaintenance(Filter):
    """Scan DB instances for those with pending maintenance

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-pending-maintenance
            resource: aws.rds
            filters:
              - pending-maintenance
              - type: value
                key: '"c7n:PendingMaintenance"[].PendingMaintenanceActionDetails[].Action'
                op: intersect
                value:
                  - system-update
    """

    annotation_key = 'c7n:PendingMaintenance'
    schema = type_schema('pending-maintenance')
    permissions = ('rds:DescribePendingMaintenanceActions',)

    def process(self, resources, event=None):
        """Return the instances that have pending maintenance actions.

        All pending maintenance actions are listed once and grouped by
        their ``ResourceIdentifier``. Instances whose ``DBInstanceArn`` has
        at least one entry are annotated with those entries under
        ``self.annotation_key`` and returned.
        """
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_pending_maintenance_actions')

        by_identifier = {}
        for page in paginator.paginate():
            for action in page['PendingMaintenanceActions']:
                by_identifier.setdefault(
                    action['ResourceIdentifier'], []).append(action)

        results = []
        for r in resources:
            pending = by_identifier.get(r['DBInstanceArn'])
            if not pending:
                continue
            r[self.annotation_key] = pending
            results.append(r)

        return results