Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/rds.py: 40%

927 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4RDS Resource Manager 

5==================== 

6 

7Example Policies 

8---------------- 

9 

10Find rds instances that are publicly available 

11 

12.. code-block:: yaml 

13 

14 policies: 

15 - name: rds-public 

16 resource: rds 

17 filters: 

18 - PubliclyAccessible: true 

19 

20Find rds instances that are not encrypted 

21 

22.. code-block:: yaml 

23 

24 policies: 

25 - name: rds-non-encrypted 

26 resource: rds 

27 filters: 

28 - type: value 

29 key: StorageEncrypted 

30 value: true 

31 op: ne 

32 

33""" 

34import functools 

35import itertools 

36import logging 

37import operator 

38import re 

39import datetime 

40 

41from datetime import timedelta 

42 

43from decimal import Decimal as D, ROUND_HALF_UP 

44 

45from c7n.vendored.distutils.version import LooseVersion 

46from botocore.exceptions import ClientError 

47from concurrent.futures import as_completed 

48 

49from c7n.actions import ( 

50 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction) 

51 

52from c7n.exceptions import PolicyValidationError 

53from c7n.filters import ( 

54 CrossAccountAccessFilter, FilterRegistry, Filter, ValueFilter, AgeFilter) 

55from c7n.filters.offhours import OffHour, OnHour 

56from c7n.filters import related 

57import c7n.filters.vpc as net_filters 

58from c7n.manager import resources 

59from c7n.query import ( 

60 QueryResourceManager, DescribeSource, ConfigSource, TypeInfo, RetryPageIterator) 

61from c7n import deprecated, tags 

62from c7n.tags import universal_augment 

63 

64from c7n.utils import ( 

65 local_session, type_schema, get_retry, chunks, snapshot_identifier, 

66 merge_dict_list, filter_empty, jmespath_search) 

67from c7n.resources.kms import ResourceKmsKeyAlias 

68from c7n.resources.securityhub import PostFinding 

69from c7n.filters.backup import ConsecutiveAwsBackupsFilter 

70 

# Module-level logger plus the shared filter/action registries that the
# `rds` resource type (below) binds via filter_registry / action_registry.
log = logging.getLogger('custodian.rds')

filters = FilterRegistry('rds.filters')
actions = ActionRegistry('rds.actions')

75 

76 

class DescribeRDS(DescribeSource):
    """Describe-API source for rds instances."""

    def augment(self, dbs):
        """Normalize tags: the describe API returns them under 'TagList',
        while the rest of c7n expects a 'Tags' key."""
        for record in dbs:
            record['Tags'] = record.pop('TagList', ())
        return dbs

83 

84 

class ConfigRDS(ConfigSource):
    """AWS Config source for rds instances."""

    def load_resource(self, item):
        """Mirror every 'Db*'-cased key under the 'DB*' casing used by the
        describe API, so policies work identically on either source."""
        resource = super().load_resource(item)
        for key in list(resource):
            if key.startswith('Db'):
                resource["DB%s" % key[2:]] = resource[key]
        return resource

93 

94 

@resources.register('rds')
class RDS(QueryResourceManager):
    """Resource manager for RDS DB instances.
    """

    class resource_type(TypeInfo):
        # Enumeration / identity metadata for the rds db instance type.
        service = 'rds'
        arn_type = 'db'
        arn_separator = ':'
        enum_spec = ('describe_db_instances', 'DBInstances', None)
        id = 'DBInstanceIdentifier'
        config_id = 'DbiResourceId'
        name = 'Endpoint.Address'
        filter_name = 'DBInstanceIdentifier'
        filter_type = 'scalar'
        date = 'InstanceCreateTime'
        dimension = 'DBInstanceIdentifier'
        cfn_type = config_type = 'AWS::RDS::DBInstance'
        arn = 'DBInstanceArn'
        universal_taggable = True
        default_report_fields = (
            'DBInstanceIdentifier',
            'DBName',
            'Engine',
            'EngineVersion',
            'MultiAZ',
            'AllocatedStorage',
            'StorageEncrypted',
            'PubliclyAccessible',
            'InstanceCreateTime',
        )
        permissions_enum = ('rds:DescribeDBInstances',)

    filter_registry = filters
    action_registry = actions

    def resources(self, query=None):
        """Fetch rds instances, merging any policy-level `query` clauses
        when the caller did not supply an explicit query."""
        if query is None:
            query = (
                merge_dict_list(self.data['query'])
                if 'query' in self.data else {})
        return super().resources(query=query)

    source_mapping = {
        'describe': DescribeRDS,
        'config': ConfigRDS,
    }

142 

143 

144def _db_instance_eligible_for_backup(resource): 

145 db_instance_id = resource['DBInstanceIdentifier'] 

146 

147 # Database instance is not in available state 

148 if resource.get('DBInstanceStatus', '') != 'available': 

149 log.debug( 

150 "DB instance %s is not in available state", 

151 db_instance_id) 

152 return False 

153 # The specified DB Instance is a member of a cluster and its 

154 # backup retention should not be modified directly. Instead, 

155 # modify the backup retention of the cluster using the 

156 # ModifyDbCluster API 

157 if resource.get('DBClusterIdentifier', ''): 

158 log.debug( 

159 "DB instance %s is a cluster member", 

160 db_instance_id) 

161 return False 

162 # DB Backups not supported on a read replica for engine postgres 

163 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and 

164 resource.get('Engine', '') == 'postgres'): 

165 log.debug( 

166 "DB instance %s is a postgres read-replica", 

167 db_instance_id) 

168 return False 

169 # DB Backups not supported on a read replica running a mysql 

170 # version before 5.6 

171 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and 

172 resource.get('Engine', '') == 'mysql'): 

173 engine_version = resource.get('EngineVersion', '') 

174 # Assume "<major>.<minor>.<whatever>" 

175 match = re.match(r'(?P<major>\d+)\.(?P<minor>\d+)\..*', engine_version) 

176 if (match and int(match.group('major')) < 5 or 

177 (int(match.group('major')) == 5 and int(match.group('minor')) < 6)): 

178 log.debug( 

179 "DB instance %s is a version %s mysql read-replica", 

180 db_instance_id, 

181 engine_version) 

182 return False 

183 return True 

184 

185 

186def _db_instance_eligible_for_final_snapshot(resource): 

187 status = resource.get('DBInstanceStatus', '') 

188 # If the DB instance you are deleting has a status of "Creating," 

189 # you will not be able to have a final DB snapshot taken 

190 # If the DB instance is in a failure state with a status of "failed," 

191 # "incompatible-restore," or "incompatible-network," you can only delete 

192 # the instance when the SkipFinalSnapshot parameter is set to "true." 

193 eligible_for_final_snapshot = True 

194 if status in ['creating', 'failed', 'incompatible-restore', 'incompatible-network']: 

195 eligible_for_final_snapshot = False 

196 

197 # FinalDBSnapshotIdentifier can not be specified when deleting a 

198 # replica instance 

199 if resource.get('ReadReplicaSourceDBInstanceIdentifier', ''): 

200 eligible_for_final_snapshot = False 

201 

202 # if it's a rds-cluster, don't try to run the rds instance snapshot api call 

203 if resource.get('DBClusterIdentifier', False): 

204 eligible_for_final_snapshot = False 

205 

206 if not eligible_for_final_snapshot: 

207 log.debug('DB instance is not eligible for a snapshot:/n %s', resource) 

208 return eligible_for_final_snapshot 

209 

210 

def _get_available_engine_upgrades(client, major=False):
    """Returns all extant rds engine upgrades.

    As a nested mapping of engine type to known versions
    and their upgrades.

    Defaults to minor upgrades, but configurable to major.

    Example::

      >>> _get_available_engine_upgrades(client)
      {
         'oracle-se2': {'12.1.0.2.v2': '12.1.0.2.v5',
                        '12.1.0.2.v3': '12.1.0.2.v5'},
         'postgres': {'9.3.1': '9.3.14',
                      '9.3.10': '9.3.14',
                      '9.3.12': '9.3.14',
                      '9.3.2': '9.3.14'}
      }
    """
    upgrades = {}
    paginator = client.get_paginator('describe_db_engine_versions')
    for page in paginator.paginate():
        for version in page['DBEngineVersions']:
            engine_map = upgrades.setdefault(version['Engine'], {})
            for target in version.get('ValidUpgradeTarget', ()):
                if not major and target['IsMajorVersionUpgrade']:
                    continue
                # Keep only the highest target seen for each source version.
                best_so_far = engine_map.get(version['EngineVersion'], '0.0.0')
                if LooseVersion(target['EngineVersion']) > LooseVersion(best_so_far):
                    engine_map[version['EngineVersion']] = target['EngineVersion']
    return upgrades

247 

248 

# Shared tag-driven scheduling filters (start/stop windows).
filters.register('offhour', OffHour)
filters.register('onhour', OnHour)

251 

252 

@filters.register('default-vpc')
class DefaultVpc(net_filters.DefaultVpcBase):
    """Matches if an rds database is in the default vpc

    :example:

    .. code-block:: yaml

            policies:
              - name: default-vpc-rds
                resource: rds
                filters:
                  - type: default-vpc
    """
    schema = type_schema('default-vpc')

    def __call__(self, rdb):
        # Compare the instance's subnet-group vpc against the default vpc.
        vpc_id = rdb['DBSubnetGroup']['VpcId']
        return self.match(vpc_id)

271 

272 

@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    # JMESPath to the vpc security group ids attached to the instance.
    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"

277 

278 

@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    # JMESPath to the subnet ids of the instance's db subnet group.
    RelatedIdsExpression = "DBSubnetGroup.Subnets[].SubnetIdentifier"

283 

284 

@filters.register('vpc')
class VpcFilter(net_filters.VpcFilter):
    # JMESPath to the vpc id of the instance's db subnet group.
    RelatedIdsExpression = "DBSubnetGroup.VpcId"

289 

290 

# Composite network-location filter (combines sg/subnet/vpc matching).
filters.register('network-location', net_filters.NetworkLocation)

292 

293 

@filters.register('kms-alias')
class KmsKeyAlias(ResourceKmsKeyAlias):
    # Filter rds instances by the alias of their kms encryption key.
    def process(self, dbs, event=None):
        return self.get_matching_aliases(dbs)

299 

300 

@actions.register('auto-patch')
class AutoPatch(BaseAction):
    """Toggle AutoMinorUpgrade flag on RDS instance

    'window' parameter needs to be in the format 'ddd:hh:mm-ddd:hh:mm' and
    have at least 30 minutes between start & end time.
    If 'window' is not specified, AWS will assign a random maintenance window
    to each instance selected.

    :example:

    .. code-block:: yaml

            policies:
              - name: enable-rds-autopatch
                resource: rds
                filters:
                  - AutoMinorVersionUpgrade: false
                actions:
                  - type: auto-patch
                    minor: true
                    window: Mon:23:00-Tue:01:00
    """

    schema = type_schema(
        'auto-patch',
        minor={'type': 'boolean'}, window={'type': 'string'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        client = local_session(
            self.manager.session_factory).client('rds')

        # Default is to enable minor version upgrades.
        modify_args = {'AutoMinorVersionUpgrade': self.data.get('minor', True)}
        window = self.data.get('window')
        if window:
            modify_args['PreferredMaintenanceWindow'] = window

        for db in dbs:
            client.modify_db_instance(
                DBInstanceIdentifier=db['DBInstanceIdentifier'],
                **modify_args)

342 

343 

@filters.register('upgrade-available')
class UpgradeAvailable(Filter):
    """ Scan DB instances for available engine upgrades

    This will pull DB instances & check their specific engine for any
    engine version with higher release numbers than the current one

    This will also annotate the rds instance with 'target_engine' which is
    the most recent version of the engine available

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-upgrade-available
                resource: rds
                filters:
                  - type: upgrade-available
                    major: False

    """

    schema = type_schema('upgrade-available',
                         major={'type': 'boolean'},
                         value={'type': 'boolean'})
    permissions = ('rds:DescribeDBEngineVersions',)

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')
        # value: True (default) selects instances with an upgrade available;
        # value: False selects instances that are already current.
        want_upgradeable = self.data.get('value', True)
        upgrades = _get_available_engine_upgrades(
            client, major=self.data.get('major', False))

        matched = []
        for resource in resources:
            target = upgrades.get(
                resource['Engine'], {}).get(resource['EngineVersion'])
            if target is None:
                if want_upgradeable is False:
                    matched.append(resource)
                continue
            resource['c7n-rds-engine-upgrade'] = target
            matched.append(resource)
        return matched

390 

391 

@actions.register('upgrade')
class UpgradeMinor(BaseAction):
    """Upgrades a RDS instance to the latest major/minor version available

    Use of the 'immediate' flag (default False) will automatically upgrade
    the RDS engine disregarding the existing maintenance window.

    :example:

    .. code-block:: yaml

            policies:
              - name: upgrade-rds-minor
                resource: rds
                actions:
                  - type: upgrade
                    major: False
                    immediate: False

    """

    schema = type_schema(
        'upgrade',
        major={'type': 'boolean'},
        immediate={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # Lazily fetched the first time an instance lacks the filter
        # annotation, so policies using the upgrade-available filter
        # never pay for the lookup here.
        upgrades = None
        for resource in resources:
            if 'EngineVersion' in resource['PendingModifiedValues']:
                # Upgrade has already been scheduled
                continue
            if 'c7n-rds-engine-upgrade' not in resource:
                if upgrades is None:
                    upgrades = _get_available_engine_upgrades(
                        client, major=self.data.get('major', False))
                target = upgrades.get(
                    resource['Engine'], {}).get(resource['EngineVersion'])
                if target is None:
                    log.debug(
                        "implicit filter no upgrade on %s",
                        resource['DBInstanceIdentifier'])
                    continue
                resource['c7n-rds-engine-upgrade'] = target
            client.modify_db_instance(
                DBInstanceIdentifier=resource['DBInstanceIdentifier'],
                EngineVersion=resource['c7n-rds-engine-upgrade'],
                ApplyImmediately=self.data.get('immediate', False))

442 

443 

@actions.register('tag-trim')
class TagTrim(tags.TagTrim):
    # Removes surplus tags to stay under the per-resource tag limit.

    permissions = ('rds:RemoveTagsFromResource',)

    def process_tag_removal(self, client, resource, candidates):
        client.remove_tags_from_resource(ResourceName=resource['DBInstanceArn'], TagKeys=candidates)

451 

452 

# Engines that support the rds StopDBInstance / StartDBInstance APIs.
START_STOP_ELIGIBLE_ENGINES = {
    'postgres', 'sqlserver-ee',
    'oracle-se2', 'mariadb', 'oracle-ee',
    'sqlserver-ex', 'sqlserver-se', 'oracle-se',
    'mysql', 'oracle-se1', 'sqlserver-web'}

458 

459 

def _eligible_start_stop(db, state="available"):
    """Return True when `db` may be stopped (or started, with state='stopped').

    See conditions noted here
    https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
    Note that this doesn't really specify what happens for all the nosql
    engines that are available as rds engines.
    """
    if db.get('DBInstanceStatus') != state:
        return False

    engine = db['Engine']
    # Multi-AZ SQL Server deployments can't be stopped.
    if db.get('MultiAZ') and engine.startswith('sqlserver-'):
        return False

    if engine not in START_STOP_ELIGIBLE_ENGINES:
        return False

    # Replication members (in either direction) are ineligible.
    if db.get('ReadReplicaDBInstanceIdentifiers'):
        return False

    if db.get('ReadReplicaSourceDBInstanceIdentifier'):
        return False

    # TODO is SQL Server mirror is detectable.
    return True

482 

483 

@actions.register('stop')
class Stop(BaseAction):
    """Stop an rds instance.

    https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
    """

    schema = type_schema('stop')

    permissions = ("rds:StopDBInstance",)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # Silently skip instances that aren't in a stoppable state/config.
        for resource in filter(_eligible_start_stop, resources):
            try:
                client.stop_db_instance(
                    DBInstanceIdentifier=resource['DBInstanceIdentifier'])
            except ClientError as e:
                log.exception(
                    "Error stopping db instance:%s err:%s",
                    resource['DBInstanceIdentifier'], e)

505 

506 

@actions.register('start')
class Start(BaseAction):
    """Start an rds instance.
    """

    schema = type_schema('start')

    permissions = ("rds:StartDBInstance",)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # Only stopped instances are eligible to be started.
        is_startable = functools.partial(_eligible_start_stop, state='stopped')
        for resource in filter(is_startable, resources):
            try:
                client.start_db_instance(
                    DBInstanceIdentifier=resource['DBInstanceIdentifier'])
            except ClientError as e:
                log.exception(
                    "Error starting db instance:%s err:%s",
                    resource['DBInstanceIdentifier'], e)

527 

528 

@actions.register('delete')
class Delete(BaseAction):
    """Deletes selected RDS instances

    This will delete RDS instances. It is recommended to apply with a filter
    to avoid deleting all RDS instances in the account.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-delete
                resource: rds
                filters:
                  - default-vpc
                actions:
                  - type: delete
                    skip-snapshot: true
    """

    schema = type_schema('delete', **{
        'skip-snapshot': {'type': 'boolean'},
        'copy-restore-info': {'type': 'boolean'}
    })

    permissions = ('rds:DeleteDBInstance', 'rds:AddTagsToResource')

    def validate(self):
        # copy-restore-info requires a final snapshot to attach info to.
        if self.data.get('skip-snapshot', False) and self.data.get(
                'copy-restore-info'):
            raise PolicyValidationError(
                "skip-snapshot cannot be specified with copy-restore-info on %s" % (
                    self.manager.data,))
        return self

    def process(self, dbs):
        skip = self.data.get('skip-snapshot', False)
        # Can't delete an instance in an aurora cluster, use a policy on the cluster
        dbs = [r for r in dbs if not r.get('DBClusterIdentifier')]
        # Concurrency feels like overkill here.
        client = local_session(self.manager.session_factory).client('rds')
        for db in dbs:
            delete_args = dict(
                DBInstanceIdentifier=db['DBInstanceIdentifier'])
            if skip or not _db_instance_eligible_for_final_snapshot(db):
                delete_args['SkipFinalSnapshot'] = True
            else:
                delete_args['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                    'Final', db['DBInstanceIdentifier'])
            if self.data.get('copy-restore-info', False):
                self.copy_restore_info(client, db)
                # Ensure the restore-info tags propagate to the snapshot.
                if not db['CopyTagsToSnapshot']:
                    client.modify_db_instance(
                        DBInstanceIdentifier=db['DBInstanceIdentifier'],
                        CopyTagsToSnapshot=True)
            self.log.info(
                "Deleting rds: %s snapshot: %s",
                db['DBInstanceIdentifier'],
                delete_args.get('FinalDBSnapshotIdentifier', False))

            try:
                client.delete_db_instance(**delete_args)
            except ClientError as e:
                # Already being deleted / in a transition state: skip it.
                if e.response['Error']['Code'] == "InvalidDBInstanceState":
                    continue
                raise

        return dbs

    def copy_restore_info(self, client, instance):
        """Tag the instance with the settings needed to recreate it from
        the final snapshot (security groups, option/parameter groups,
        instance class, storage type, multi-az, subnet group)."""
        restore_tags = [
            {'Key': 'VPCSecurityGroups',
             'Value': ''.join([
                 g['VpcSecurityGroupId'] for g in instance['VpcSecurityGroups']
             ])},
            {'Key': 'OptionGroupName',
             'Value': instance['OptionGroupMemberships'][0]['OptionGroupName']},
            {'Key': 'ParameterGroupName',
             'Value': instance['DBParameterGroups'][0]['DBParameterGroupName']},
            {'Key': 'InstanceClass',
             'Value': instance['DBInstanceClass']},
            {'Key': 'StorageType',
             'Value': instance['StorageType']},
            {'Key': 'MultiAZ',
             'Value': str(instance['MultiAZ'])},
            {'Key': 'DBSubnetGroupName',
             'Value': instance['DBSubnetGroup']['DBSubnetGroupName']},
        ]
        client.add_tags_to_resource(
            ResourceName=self.manager.generate_arn(
                instance['DBInstanceIdentifier']),
            Tags=restore_tags)

628 

629 

@actions.register('set-snapshot-copy-tags')
class CopySnapshotTags(BaseAction):
    """Enables copying tags from rds instance to snapshot

    DEPRECATED - use modify-db instead with `CopyTagsToSnapshot`

    :example:

    .. code-block:: yaml

            policies:
              - name: enable-rds-snapshot-tags
                resource: rds
                filters:
                  - type: value
                    key: Engine
                    value: aurora
                    op: eq
                actions:
                  - type: set-snapshot-copy-tags
                    enable: True
    """
    deprecations = (
        deprecated.action("use modify-db instead with `CopyTagsToSnapshot`"),
    )

    schema = type_schema(
        'set-snapshot-copy-tags',
        enable={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        error = None
        with self.executor_factory(max_workers=2) as w:
            futures = {}
            client = local_session(self.manager.session_factory).client('rds')
            # Only touch instances whose flag differs from the desired state.
            resources = [
                r for r in resources
                if r['CopyTagsToSnapshot'] != self.data.get('enable', True)]
            for resource in resources:
                futures[w.submit(self.set_snapshot_tags, client, resource)] = resource
            for f in as_completed(futures):
                if f.exception():
                    # Remember the last failure and surface it after the batch.
                    error = f.exception()
                    self.log.error(
                        'error updating rds:%s CopyTagsToSnapshot \n %s',
                        futures[f]['DBInstanceIdentifier'], error)
        if error:
            raise error
        return resources

    def set_snapshot_tags(self, client, r):
        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            CopyTagsToSnapshot=self.data.get('enable', True))

685 

686 

@RDS.action_registry.register('post-finding')
class DbInstanceFinding(PostFinding):
    """Format an rds instance as an AwsRdsDbInstance security hub finding."""

    resource_type = 'AwsRdsDbInstance'

    # Instance attributes copied verbatim into the finding's Details block.
    FIELDS = [
        'AssociatedRoles', 'CACertificateIdentifier', 'DBClusterIdentifier',
        'DBInstanceIdentifier', 'DBInstanceClass', 'DbInstancePort', 'DbiResourceId',
        'DBName', 'DeletionProtection', 'Endpoint', 'Engine', 'EngineVersion',
        'IAMDatabaseAuthenticationEnabled', 'InstanceCreateTime', 'KmsKeyId',
        'PubliclyAccessible', 'StorageEncrypted',
        'TdeCredentialArn', 'VpcSecurityGroups', 'MultiAz', 'EnhancedMonitoringResourceArn',
        'DbInstanceStatus', 'MasterUsername',
        'AllocatedStorage', 'PreferredBackupWindow', 'BackupRetentionPeriod',
        'DbSecurityGroups', 'DbParameterGroups',
        'AvailabilityZone', 'DbSubnetGroup', 'PreferredMaintenanceWindow',
        'PendingModifiedValues', 'LatestRestorableTime',
        'AutoMinorVersionUpgrade', 'ReadReplicaSourceDBInstanceIdentifier',
        'ReadReplicaDBInstanceIdentifiers',
        'ReadReplicaDBClusterIdentifiers', 'LicenseModel', 'Iops', 'OptionGroupMemberships',
        'CharacterSetName',
        'SecondaryAvailabilityZone', 'StatusInfos', 'StorageType', 'DomainMemberships',
        'CopyTagsToSnapshot',
        'MonitoringInterval', 'MonitoringRoleArn', 'PromotionTier', 'Timezone',
        'PerformanceInsightsEnabled',
        'PerformanceInsightsKmsKeyId', 'PerformanceInsightsRetentionPeriod',
        'EnabledCloudWatchLogsExports',
        'ProcessorFeatures', 'ListenerEndpoint', 'MaxAllocatedStorage'
    ]

    def format_resource(self, r):
        details = {}
        for field in self.FIELDS:
            value = r.get(field)
            # Falsy values are dropped (filter_empty would prune them anyway).
            if not value:
                continue
            if isinstance(value, datetime.datetime):
                # Security hub requires ISO-8601 strings for timestamps.
                value = value.isoformat()
            details.setdefault(field, value)

        db_instance = filter_empty({
            'Type': self.resource_type,
            'Id': r['DBInstanceArn'],
            'Region': self.manager.config.region,
            'Tags': {t['Key']: t['Value'] for t in r.get('Tags', [])},
            'Details': {self.resource_type: filter_empty(details)},
        })
        return db_instance

735 

736 

@actions.register('snapshot')
class Snapshot(BaseAction):
    """Creates a manual snapshot of a RDS instance

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot
                resource: rds
                actions:
                  - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('rds:CreateDBSnapshot',)

    def process(self, dbs):
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_rds_snapshot, db) for db in dbs]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception creating rds snapshot  \n %s",
                        f.exception())
        return dbs

    def process_rds_snapshot(self, resource):
        # Cluster members, replicas, etc. can't take instance snapshots.
        if not _db_instance_eligible_for_backup(resource):
            return

        c = local_session(self.manager.session_factory).client('rds')
        c.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_identifier(
                self.data.get('snapshot-prefix', 'Backup'),
                resource['DBInstanceIdentifier']),
            DBInstanceIdentifier=resource['DBInstanceIdentifier'])

779 

780 

@actions.register('resize')
class ResizeInstance(BaseAction):
    """Change the allocated storage of an rds instance.

    :example:

    This will find databases using over 85% of their allocated
    storage, and resize them to have an additional 30% storage
    the resize here is async during the next maintenance.

    .. code-block:: yaml

            policies:
              - name: rds-resize-up
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: 30


    This will find databases using under 20% of their allocated
    storage, and resize them to be 30% smaller, the resize here
    is configured to be immediate.

    .. code-block:: yaml

            policies:
              - name: rds-resize-down
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: -30
                    immediate: true
    """
    schema = type_schema(
        'resize',
        percent={'type': 'number'},
        immediate={'type': 'boolean'})

    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        hundred = D(100)
        for resource in resources:
            # Scale current storage by (100 + percent) / 100 using Decimal
            # arithmetic, then round half-up to a whole gigabyte.
            current = D(resource['AllocatedStorage'])
            scaled = ((hundred + D(self.data['percent'])) / hundred) * current
            target = int(scaled.quantize(D('0'), ROUND_HALF_UP))
            client.modify_db_instance(
                DBInstanceIdentifier=resource['DBInstanceIdentifier'],
                AllocatedStorage=target,
                ApplyImmediately=self.data.get('immediate', False))

847 

848 

@actions.register('retention')
class RetentionWindow(BaseAction):
    """
    Sets the 'BackupRetentionPeriod' value for automated snapshots,
    enforce (min, max, exact) sets retention days occordingly.
    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-retention
                resource: rds
                filters:
                  - type: value
                    key: BackupRetentionPeriod
                    value: 7
                    op: lt
                actions:
                  - type: retention
                    days: 7
                    copy-tags: true
                    enforce: exact
    """

    date_attribute = "BackupRetentionPeriod"
    schema = type_schema(
        'retention', **{'days': {'type': 'number'},
                        'copy-tags': {'type': 'boolean'},
                        'enforce': {'type': 'string', 'enum': [
                            'min', 'max', 'exact']}})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_snapshot_retention, db) for db in dbs]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting rds retention  \n %s",
                        f.exception())
        return dbs

    def process_snapshot_retention(self, resource):
        current_retention = int(resource.get('BackupRetentionPeriod', 0))
        current_copy_tags = resource['CopyTagsToSnapshot']
        new_retention = self.data['days']
        new_copy_tags = self.data.get('copy-tags', True)
        retention_type = self.data.get('enforce', 'min').lower()
        tags_differ = current_copy_tags != new_copy_tags

        # NOTE(review): when the copy-tags flag differs from the desired
        # value, the first branch below fires regardless of the configured
        # enforce mode, so 'max'/'exact' enforcement can end up applying
        # 'min' semantics -- confirm whether this ordering is intentional.
        if ((retention_type == 'min' or tags_differ) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(
                resource,
                max(current_retention, new_retention),
                new_copy_tags)
            return resource

        if ((retention_type == 'max' or tags_differ) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(
                resource,
                min(current_retention, new_retention),
                new_copy_tags)
            return resource

        if ((retention_type == 'exact' or tags_differ) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(resource, new_retention, new_copy_tags)
            return resource

    def set_retention_window(self, resource, retention, copy_tags):
        c = local_session(self.manager.session_factory).client('rds')
        c.modify_db_instance(
            DBInstanceIdentifier=resource['DBInstanceIdentifier'],
            BackupRetentionPeriod=retention,
            CopyTagsToSnapshot=copy_tags)

932 

933 

@actions.register('set-public-access')
class RDSSetPublicAvailability(BaseAction):
    """
    This action allows for toggling an RDS instance
    'PubliclyAccessible' flag to true or false

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-rds-public-accessibility
                resource: rds
                filters:
                  - PubliclyAccessible: true
                actions:
                  - type: set-public-access
                    state: false
    """

    schema = type_schema(
        "set-public-access",
        state={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def set_accessibility(self, r):
        client = local_session(self.manager.session_factory).client('rds')
        client.modify_db_instance(
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            # Default to disabling public access when no state is given.
            PubliclyAccessible=self.data.get('state', False))

    def process(self, rds):
        with self.executor_factory(max_workers=2) as w:
            futures = {w.submit(self.set_accessibility, r): r for r in rds}
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting public access on %s  \n %s",
                        futures[f]['DBInstanceIdentifier'], f.exception())
        return rds

974 

975 

@resources.register('rds-subscription')
class RDSSubscription(QueryResourceManager):
    # Resource manager for rds event subscriptions.

    class resource_type(TypeInfo):
        service = 'rds'
        arn_type = 'es'
        cfn_type = 'AWS::RDS::EventSubscription'
        enum_spec = (
            'describe_event_subscriptions', 'EventSubscriptionsList', None)
        name = id = "CustSubscriptionId"
        arn = 'EventSubscriptionArn'
        date = "SubscriptionCreateTime"
        permissions_enum = ('rds:DescribeEventSubscriptions',)
        universal_taggable = object()

    # Tags for event subscriptions come via the universal tagging api.
    augment = universal_augment

992 

993 

@RDSSubscription.action_registry.register('delete')
class RDSSubscriptionDelete(BaseAction):
    """Deletes a RDS snapshot resource

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subscription-delete
                resource: rds-subscription
                filters:
                  - type: value
                    key: CustSubscriptionId
                    value: xyz
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteEventSubscription',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        for resource in resources:
            # Tolerate subscriptions that vanished or are mid-transition.
            self.manager.retry(
                client.delete_event_subscription,
                SubscriptionName=resource['CustSubscriptionId'],
                ignore_err_codes=('SubscriptionNotFoundFault',
                                  'InvalidEventSubscriptionStateFault'))

1023 

1024 

class DescribeRDSSnapshot(DescribeSource):
    """Describe source for RDS snapshots: per-id fetch plus tag normalization."""

    def get_resources(self, ids, cache=True):
        # Fetch each snapshot identifier individually and flatten the
        # results into a single list.
        fetch = super().get_resources
        found = []
        for snapshot_id in ids:
            found.extend(fetch((snapshot_id,)))
        return found

    def augment(self, snaps):
        # Normalize the API's TagList field onto the conventional Tags key.
        for snapshot in snaps:
            snapshot['Tags'] = snapshot.pop('TagList', ())
        return snaps

1035 

1036 

@resources.register('rds-snapshot')
class RDSSnapshot(QueryResourceManager):
    """Resource manager for RDS DB snapshots.
    """

    class resource_type(TypeInfo):
        # Declarative metadata: how to enumerate, identify and tag
        # RDS DB snapshots via the AWS API / AWS Config.
        service = 'rds'
        arn_type = 'snapshot'
        arn_separator = ':'
        enum_spec = ('describe_db_snapshots', 'DBSnapshots', None)
        name = id = 'DBSnapshotIdentifier'
        date = 'SnapshotCreateTime'
        config_type = "AWS::RDS::DBSnapshot"
        filter_name = "DBSnapshotIdentifier"
        filter_type = "scalar"
        universal_taggable = True
        permissions_enum = ('rds:DescribeDBSnapshots',)

    source_mapping = {
        'describe': DescribeRDSSnapshot,
        'config': ConfigSource
    }

1059 

1060 

@RDSSnapshot.filter_registry.register('onhour')
class RDSSnapshotOnHour(OnHour):
    """Scheduled action on rds snapshot.

    Inherits all scheduling semantics from the generic ``OnHour`` filter.
    """

1064 

1065 

@RDSSnapshot.filter_registry.register('instance')
class SnapshotInstance(related.RelatedResourceFilter):
    """Filter snapshots by their database attributes.

    :example:

    Find snapshots without an extant database

    .. code-block:: yaml

       policies:
         - name: rds-snapshot-orphan
           resource: aws.rds-snapshot
           filters:
             - type: instance
               value: 0
               value_type: resource_count
    """
    schema = type_schema(
        'instance', rinherit=ValueFilter.schema
    )

    # Related resource resolved through the rds resource manager,
    # joined on the snapshot's DBInstanceIdentifier.
    RelatedResource = "c7n.resources.rds.RDS"
    RelatedIdsExpression = "DBInstanceIdentifier"
    FetchThreshold = 5

1091 

1092 

@RDSSnapshot.filter_registry.register('latest')
class LatestSnapshot(Filter):
    """Return the latest snapshot for each database.

    With ``automatic: false`` only manually created snapshots are
    considered; by default automatic snapshots are included as well.
    """
    schema = type_schema('latest', automatic={'type': 'boolean'})
    permissions = ('rds:DescribeDBSnapshots',)

    def process(self, resources, event=None):
        results = []
        if not self.data.get('automatic', True):
            resources = [r for r in resources if r['SnapshotType'] == 'manual']
        # itertools.groupby only groups *consecutive* equal keys, so the
        # input must be sorted on the grouping key first; otherwise an
        # unsorted result set would yield multiple "latest" snapshots for
        # the same database.
        resources = sorted(
            resources, key=operator.itemgetter('DBInstanceIdentifier'))
        for db_identifier, snapshots in itertools.groupby(
                resources, operator.itemgetter('DBInstanceIdentifier')):
            # Keep only the most recently created snapshot per database.
            results.append(
                sorted(snapshots,
                       key=operator.itemgetter('SnapshotCreateTime'))[-1])
        return results

1110 

1111 

@RDSSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filters RDS snapshots based on age (in days)

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-expired
                resource: rds-snapshot
                filters:
                  - type: age
                    days: 28
                    op: ge
                actions:
                  - delete
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    date_attribute = 'SnapshotCreateTime'

    def get_resource_date(self, i):
        # Age is computed from the snapshot's creation timestamp.
        return i.get('SnapshotCreateTime')

1139 

1140 

@RDSSnapshot.action_registry.register('restore')
class RestoreInstance(BaseAction):
    """Restore an rds instance from a snapshot.

    Note this requires the snapshot or db deletion be taken
    with the `copy-restore-info` boolean flag set to true, as
    various instance metadata is stored on the snapshot as tags.

    Additional parameters to the restore db instance api call can be
    overridden via `restore_options` settings. Various modify db instance
    parameters can be specified via `modify_options` settings.
    """

    schema = type_schema(
        'restore',
        restore_options={'type': 'object'},
        modify_options={'type': 'object'})

    permissions = (
        'rds:ModifyDBInstance',
        'rds:ModifyDBParameterGroup',
        'rds:ModifyOptionGroup',
        'rds:RebootDBInstance',
        'rds:RestoreDBInstanceFromDBSnapshot')

    # Seconds between waiter polls while the restored instance comes up.
    poll_period = 60
    # Tag keys (written at snapshot/delete time) required to reconstruct
    # the original instance's configuration.
    restore_keys = {
        'VPCSecurityGroups', 'MultiAZ', 'DBSubnetGroupName',
        'InstanceClass', 'StorageType', 'ParameterGroupName',
        'OptionGroupName'}

    def validate(self):
        """Require the `latest` filter so only one snapshot per db is restored."""
        found = False
        for f in self.manager.iter_filters():
            if isinstance(f, LatestSnapshot):
                found = True
        if not found:
            # do we really need this...
            raise PolicyValidationError(
                "must filter by latest to use restore action %s" % (
                    self.manager.data,))
        return self

    def process(self, resources):
        """Restore each eligible snapshot; per-snapshot errors are logged."""
        client = local_session(self.manager.session_factory).client('rds')
        # restore up to 10 in parallel, we have to wait on each.
        with self.executor_factory(
                max_workers=min(10, len(resources) or 1)) as w:
            futures = {}
            for r in resources:
                tags = {t['Key']: t['Value'] for t in r['Tags']}
                # Skip snapshots lacking the full restore-metadata tag set.
                if not set(tags).issuperset(self.restore_keys):
                    self.log.warning(
                        "snapshot:%s missing restore tags",
                        r['DBSnapshotIdentifier'])
                    continue
                futures[w.submit(self.process_instance, client, r)] = r
            for f in as_completed(futures):
                r = futures[f]
                if f.exception():
                    self.log.warning(
                        "Error restoring db:%s from:%s error:\n%s",
                        r['DBInstanceIdentifier'], r['DBSnapshotIdentifier'],
                        f.exception())
                    continue

    def process_instance(self, client, r):
        """Restore one instance, wait for availability, re-apply settings, reboot."""
        params, post_modify = self.get_restore_from_tags(r)
        self.manager.retry(
            client.restore_db_instance_from_db_snapshot, **params)
        waiter = client.get_waiter('db_instance_available')
        # wait up to 40m
        waiter.config.delay = self.poll_period
        waiter.wait(DBInstanceIdentifier=params['DBInstanceIdentifier'])
        # Parameter group / security group settings are applied after the
        # restore completes, then a reboot makes them take effect.
        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=params['DBInstanceIdentifier'],
            ApplyImmediately=True,
            **post_modify)
        self.manager.retry(
            client.reboot_db_instance,
            DBInstanceIdentifier=params['DBInstanceIdentifier'],
            ForceFailover=False)

    def get_restore_from_tags(self, snapshot):
        """Split snapshot tags into restore-call params and post-restore modify params.

        Returns a (params, post_modify) tuple; user-supplied
        `restore_options` / `modify_options` override the tag-derived values.
        """
        params, post_modify = {}, {}
        tags = {t['Key']: t['Value'] for t in snapshot['Tags']}

        params['DBInstanceIdentifier'] = snapshot['DBInstanceIdentifier']
        params['DBSnapshotIdentifier'] = snapshot['DBSnapshotIdentifier']
        # Tag values are strings; round-trip 'True'/'False' to a boolean.
        params['MultiAZ'] = tags['MultiAZ'] == 'True' and True or False
        params['DBSubnetGroupName'] = tags['DBSubnetGroupName']
        params['DBInstanceClass'] = tags['InstanceClass']
        params['CopyTagsToSnapshot'] = True
        params['StorageType'] = tags['StorageType']
        params['OptionGroupName'] = tags['OptionGroupName']

        post_modify['DBParameterGroupName'] = tags['ParameterGroupName']
        post_modify['VpcSecurityGroupIds'] = tags['VPCSecurityGroups'].split(',')

        # Carry the snapshot's remaining tags over to the restored
        # instance, minus the restore bookkeeping keys.
        params['Tags'] = [
            {'Key': k, 'Value': v} for k, v in tags.items()
            if k not in self.restore_keys]

        params.update(self.data.get('restore_options', {}))
        post_modify.update(self.data.get('modify_options', {}))
        return params, post_modify

1248 

1249 

@RDSSnapshot.filter_registry.register('cross-account')
class CrossAccountAccess(CrossAccountAccessFilter):
    """Filter snapshots shared (via restore attribute) with unapproved accounts."""

    permissions = ('rds:DescribeDBSnapshotAttributes',)
    # Annotation key holding the snapshot's attribute map.
    attributes_key = 'c7n:attributes'
    # Annotation key listing accounts outside the allowed set.
    annotation_key = 'c7n:CrossAccountViolations'

    def process(self, resources, event=None):
        self.accounts = self.get_accounts()
        results = []
        # Fetch snapshot attributes in parallel batches of 20.
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for resource_set in chunks(resources, 20):
                futures.append(w.submit(
                    self.process_resource_set, resource_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception checking cross account access\n %s" % (
                            f.exception()))
                    continue
                results.extend(f.result())
        return results

    def process_resource_set(self, resource_set):
        """Return the subset of resource_set shared with non-whitelisted accounts."""
        client = local_session(self.manager.session_factory).client('rds')
        results = []
        for r in resource_set:
            attrs = {t['AttributeName']: t['AttributeValues']
                for t in self.manager.retry(
                    client.describe_db_snapshot_attributes,
                    DBSnapshotIdentifier=r['DBSnapshotIdentifier'])[
                        'DBSnapshotAttributesResult']['DBSnapshotAttributes']}
            r[self.attributes_key] = attrs
            # The 'restore' attribute lists account ids allowed to restore.
            shared_accounts = set(attrs.get('restore', []))
            delta_accounts = shared_accounts.difference(self.accounts)
            if delta_accounts:
                r[self.annotation_key] = list(delta_accounts)
                results.append(r)
        return results

1290 

1291 

@RDSSnapshot.action_registry.register('set-permissions')
class SetPermissions(BaseAction):
    """Set permissions for copying or restoring an RDS snapshot

    Use the 'add' and 'remove' parameters to control which accounts to
    add or remove, respectively. The default is to remove any
    permissions granted to other AWS accounts.

    Use `remove: matched` in combination with the `cross-account` filter
    for more flexible removal options such as preserving access for
    a set of whitelisted accounts:

    :example:

        .. code-block:: yaml

            policies:
              - name: rds-snapshot-remove-cross-account
                resource: rds-snapshot
                filters:
                  - type: cross-account
                    whitelist:
                      - '112233445566'
                actions:
                  - type: set-permissions
                    remove: matched
    """
    schema = type_schema(
        'set-permissions',
        remove={'oneOf': [
            {'enum': ['matched']},
            {'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ],
            }}
        ]},
        add={
            'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ]
            }
        }
    )

    permissions = ('rds:ModifyDBSnapshotAttribute',)

    def validate(self):
        # `remove: matched` only makes sense when a cross-account filter
        # has annotated the resources with the violating accounts.
        if self.data.get('remove') == 'matched':
            found = False
            for f in self.manager.iter_filters():
                if isinstance(f, CrossAccountAccessFilter):
                    found = True
                    break
            if not found:
                raise PolicyValidationError(
                    "policy:%s filter:%s with matched requires cross-account filter" % (
                        self.manager.ctx.policy.name, self.type))

    def process(self, snapshots):
        client = local_session(self.manager.session_factory).client('rds')
        for s in snapshots:
            self.process_snapshot(client, s)

    def process_snapshot(self, client, snapshot):
        """Apply the configured add/remove account grants to one snapshot."""
        add_accounts = self.data.get('add', [])
        remove_accounts = self.data.get('remove', [])

        if not (add_accounts or remove_accounts):
            # Default behavior: revoke all existing restore grants. Fetch
            # the attributes if the cross-account filter hasn't already.
            if CrossAccountAccess.attributes_key not in snapshot:
                attrs = {
                    t['AttributeName']: t['AttributeValues']
                    for t in self.manager.retry(
                        client.describe_db_snapshot_attributes,
                        DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier']
                    )['DBSnapshotAttributesResult']['DBSnapshotAttributes']
                }
                snapshot[CrossAccountAccess.attributes_key] = attrs
            remove_accounts = snapshot[CrossAccountAccess.attributes_key].get('restore', [])
        elif remove_accounts == 'matched':
            # Remove only accounts flagged by the cross-account filter.
            remove_accounts = snapshot.get(CrossAccountAccess.annotation_key, [])

        if add_accounts or remove_accounts:
            client.modify_db_snapshot_attribute(
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'],
                AttributeName='restore',
                ValuesToRemove=remove_accounts,
                ValuesToAdd=add_accounts)

1383 

1384 

@RDSSnapshot.action_registry.register('region-copy')
class RegionCopySnapshot(BaseAction):
    """Copy a snapshot across regions.

    Note there is a max in flight for cross region rds snapshots
    of 5 per region. This action will attempt to retry automatically
    for an hr.

    Example::

      - name: copy-encrypted-snapshots
        description: |
          copy snapshots under 1 day old to dr region with kms
        resource: rds-snapshot
        region: us-east-1
        filters:
         - Status: available
         - type: value
           key: SnapshotCreateTime
           value_type: age
           value: 1
           op: less-than
        actions:
         - type: region-copy
           target_region: us-east-2
           target_key: arn:aws:kms:us-east-2:0000:key/cb291f53-c9cf61
           copy_tags: true
           tags:
             OriginRegion: us-east-1
    """

    schema = type_schema(
        'region-copy',
        target_region={'type': 'string'},
        target_key={'type': 'string'},
        copy_tags={'type': 'boolean'},
        tags={'type': 'object'},
        required=('target_region',))

    permissions = ('rds:CopyDBSnapshot',)
    # Retry pacing on SnapshotQuotaExceeded: 30 attempts x 120s ~ 1hr.
    min_delay = 120
    max_attempts = 30

    def validate(self):
        # Cross-region copies can take longer than a lambda execution
        # allows, so reject event/lambda modes up front.
        if self.data.get('target_region') and self.manager.data.get('mode'):
            raise PolicyValidationError(
                "cross region snapshot may require waiting for "
                "longer then lambda runtime allows %s" % (self.manager.data,))
        return self

    def process(self, resources):
        if self.data['target_region'] == self.manager.config.region:
            self.log.warning(
                "Source and destination region are the same, skipping copy")
            return
        for resource_set in chunks(resources, 20):
            self.process_resource_set(resource_set)

    def process_resource(self, target, key, tags, snapshot):
        """Copy one snapshot to the target region, retrying on quota errors."""
        p = {}
        if key:
            p['KmsKeyId'] = key
        # ':' is not valid in a target snapshot identifier.
        p['TargetDBSnapshotIdentifier'] = snapshot[
            'DBSnapshotIdentifier'].replace(':', '-')
        p['SourceRegion'] = self.manager.config.region
        p['SourceDBSnapshotIdentifier'] = snapshot['DBSnapshotArn']

        if self.data.get('copy_tags', True):
            p['CopyTags'] = True
        if tags:
            p['Tags'] = tags

        retry = get_retry(
            ('SnapshotQuotaExceeded',),
            # TODO make this configurable, class defaults to 1hr
            min_delay=self.min_delay,
            max_attempts=self.max_attempts,
            log_retries=logging.DEBUG)
        try:
            result = retry(target.copy_db_snapshot, **p)
        except ClientError as e:
            # A pre-existing copy in the target region is not an error.
            if e.response['Error']['Code'] == 'DBSnapshotAlreadyExists':
                self.log.warning(
                    "Snapshot %s already exists in target region",
                    snapshot['DBSnapshotIdentifier'])
                return
            raise
        snapshot['c7n:CopiedSnapshot'] = result[
            'DBSnapshot']['DBSnapshotArn']

    def process_resource_set(self, resource_set):
        # Client bound to the destination region.
        target_client = self.manager.session_factory(
            region=self.data['target_region']).client('rds')
        target_key = self.data.get('target_key')
        tags = [{'Key': k, 'Value': v} for k, v
                in self.data.get('tags', {}).items()]

        for snapshot_set in chunks(resource_set, 5):
            for r in snapshot_set:
                # If tags are supplied, copy tags are ignored, and
                # we need to augment the tag set with the original
                # resource tags to preserve the common case.
                rtags = tags and list(tags) or None
                if tags and self.data.get('copy_tags', True):
                    rtags.extend(r['Tags'])
                self.process_resource(target_client, target_key, rtags, r)

1491 

1492 

@RDSSnapshot.action_registry.register('delete')
class RDSSnapshotDelete(BaseAction):
    """Deletes a RDS snapshot resource

    Only ``manual`` snapshots are deleted; automatic snapshots are
    managed by RDS itself and are filtered out up front.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-delete-stale
                resource: rds-snapshot
                filters:
                  - type: age
                    days: 28
                    op: ge
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBSnapshot',)

    def process(self, snapshots):
        # Automated snapshots cannot be deleted directly; restrict to manual.
        snapshots = self.filter_resources(snapshots, 'SnapshotType', ('manual',))
        if not snapshots:
            return []
        # Use the action's own logger for consistency with the error
        # path below (was the module-level `log`).
        self.log.info("Deleting %d rds snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for snapshot_set in chunks(reversed(snapshots), size=50):
                futures.append(
                    w.submit(self.process_snapshot_set, snapshot_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        f.exception())
        return snapshots

    def process_snapshot_set(self, snapshots_set):
        c = local_session(self.manager.session_factory).client('rds')
        for s in snapshots_set:
            c.delete_db_snapshot(
                DBSnapshotIdentifier=s['DBSnapshotIdentifier'])

1537 

1538 

@actions.register('modify-security-groups')
class RDSModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify the VPC security groups of rds instances and their clusters."""

    permissions = ('rds:ModifyDBInstance', 'rds:ModifyDBCluster')
    vpc_expr = 'DBSubnetGroup.VpcId'

    def process(self, rds_instances):
        replication_group_map = {}
        client = local_session(self.manager.session_factory).client('rds')
        # Per-resource resolved security group lists, parallel to rds_instances.
        groups = super(RDSModifyVpcSecurityGroups, self).get_groups(
            rds_instances)

        # either build map for DB cluster or modify DB instance directly
        for idx, i in enumerate(rds_instances):
            if i.get('DBClusterIdentifier'):
                # build map of Replication Groups to Security Groups
                replication_group_map[i['DBClusterIdentifier']] = groups[idx]
            else:
                client.modify_db_instance(
                    DBInstanceIdentifier=i['DBInstanceIdentifier'],
                    VpcSecurityGroupIds=groups[idx])

        # handle DB cluster, if necessary (the previous enumerate over
        # .keys() left an unused index and required a second lookup).
        for cluster_id, group_ids in replication_group_map.items():
            client.modify_db_cluster(
                DBClusterIdentifier=cluster_id,
                VpcSecurityGroupIds=group_ids
            )

1567 

1568 

class DescribeSubnetGroup(DescribeSource):
    """Describe source that annotates subnet groups with their tags."""

    def augment(self, resources):
        # The tag helper drops groups deleted between enumeration and the
        # tag lookup (DBSubnetGroupNotFoundFault); return its filtered
        # result rather than the raw input list so downstream filters
        # never see a group without a Tags key.
        return _db_subnet_group_tags(
            resources, self.manager.session_factory,
            self.manager.executor_factory, self.manager.retry)

1576 

1577 

@resources.register('rds-subnet-group')
class RDSSubnetGroup(QueryResourceManager):
    """RDS subnet group."""

    class resource_type(TypeInfo):
        # Declarative metadata: how to enumerate, identify and tag
        # RDS subnet groups via the AWS API / AWS Config.
        service = 'rds'
        arn_type = 'subgrp'
        id = name = 'DBSubnetGroupName'
        arn_separator = ':'
        enum_spec = (
            'describe_db_subnet_groups', 'DBSubnetGroups', None)
        filter_name = 'DBSubnetGroupName'
        filter_type = 'scalar'
        permissions_enum = ('rds:DescribeDBSubnetGroups',)
        cfn_type = config_type = 'AWS::RDS::DBSubnetGroup'
        universal_taggable = object()

    source_mapping = {
        'config': ConfigSource,
        'describe': DescribeSubnetGroup
    }

1599 

1600 

def _db_subnet_group_tags(subnet_groups, session_factory, executor_factory, retry):
    """Annotate each subnet group with its Tags; drop groups no longer present.

    Returns the annotated groups, excluding any that raised
    DBSubnetGroupNotFoundFault during the tag lookup.
    """
    client = local_session(session_factory).client('rds')

    def annotate(group):
        try:
            group['Tags'] = client.list_tags_for_resource(
                ResourceName=group['DBSubnetGroupArn'])['TagList']
        except client.exceptions.DBSubnetGroupNotFoundFault:
            # Group deleted since enumeration; exclude it.
            return None
        return group

    return [g for g in (annotate(sg) for sg in subnet_groups) if g is not None]

1613 

1614 

@RDSSubnetGroup.action_registry.register('delete')
class RDSSubnetGroupDeleteAction(BaseAction):
    """Action to delete RDS Subnet Group

    It is recommended to apply a filter to the delete policy to avoid unwanted
    deletion of any rds subnet groups.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subnet-group-delete
                resource: rds-subnet-group
                filters:
                  - Instances: []
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBSubnetGroup',)

    def process(self, subnet_group):
        # Delete groups concurrently; drain the map iterator so every
        # deletion actually executes (and exceptions propagate).
        with self.executor_factory(max_workers=2) as pool:
            for _ in pool.map(self.process_subnetgroup, subnet_group):
                pass

    def process_subnetgroup(self, subnet_group):
        """Delete a single subnet group by name."""
        rds_client = local_session(self.manager.session_factory).client('rds')
        rds_client.delete_db_subnet_group(
            DBSubnetGroupName=subnet_group['DBSubnetGroupName'])

1645 

1646 

@RDSSubnetGroup.filter_registry.register('unused')
class UnusedRDSSubnetGroup(Filter):
    """Filters all launch rds subnet groups that are not in use but exist

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subnet-group-delete-unused
                resource: rds-subnet-group
                filters:
                  - unused
    """

    schema = type_schema('unused')

    def get_permissions(self):
        # Requires the rds manager's permissions to enumerate instances.
        return self.manager.get_resource_manager('rds').get_permissions()

    def process(self, configs, event=None):
        # Build the set of subnet group names referenced by any rds
        # instance or rds cluster in the account/region.
        rds = self.manager.get_resource_manager('rds').resources()
        self.used = set(jmespath_search('[].DBSubnetGroup.DBSubnetGroupName', rds))
        self.used.update(set(jmespath_search('[].DBSubnetGroup.DBSubnetGroupName',
            self.manager.get_resource_manager('rds-cluster').resources(augment=False))))
        return super(UnusedRDSSubnetGroup, self).process(configs)

    def __call__(self, config):
        # A group is "unused" when no instance/cluster references it.
        return config['DBSubnetGroupName'] not in self.used

1676 

1677 

@filters.register('db-parameter')
class ParameterFilter(ValueFilter):
    """
    Applies value type filter on set db parameter values.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-pg
                resource: rds
                filters:
                  - type: db-parameter
                    key: someparam
                    op: eq
                    value: someval
    """

    schema = type_schema('db-parameter', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters', )
    # Annotation listing which parameter keys matched per resource.
    policy_annotation = 'c7n:MatchedDBParameter'

    @staticmethod
    def recast(val, datatype):
        """ Re-cast the value based upon an AWS supplied datatype
        and treat nulls sensibly.
        """
        ret_val = val
        if datatype == 'string':
            ret_val = str(val)
        elif datatype == 'boolean':
            # AWS returns 1s and 0s for boolean for most of the cases
            if val.isdigit():
                ret_val = bool(int(val))
            # AWS returns 'TRUE,FALSE' for Oracle engine
            elif val == 'TRUE':
                ret_val = True
            elif val == 'FALSE':
                ret_val = False
        elif datatype == 'integer':
            if val.isdigit():
                ret_val = int(val)
        elif datatype == 'float':
            ret_val = float(val) if val else 0.0

        return ret_val

    # Private method for 'DBParameterGroupName' paginator
    def _get_param_list(self, pg):
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_db_parameters')
        param_list = list(itertools.chain(*[p['Parameters']
            for p in paginator.paginate(DBParameterGroupName=pg)]))
        return param_list

    def handle_paramgroup_cache(self, param_groups):
        """Return {group name: {param name: recast value}}, using the policy cache.

        Each parameter group is fetched at most once per cache lifetime;
        parameters without a set value are omitted.
        """
        pgcache = {}
        cache = self.manager._cache

        with cache:
            for pg in param_groups:
                cache_key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-pg': pg}
                pg_values = cache.get(cache_key)
                if pg_values is not None:
                    pgcache[pg] = pg_values
                    continue
                param_list = self._get_param_list(pg)
                pgcache[pg] = {
                    p['ParameterName']: self.recast(p['ParameterValue'], p['DataType'])
                    for p in param_list if 'ParameterValue' in p}
                cache.save(cache_key, pgcache[pg])
        return pgcache

    def process(self, resources, event=None):
        results = []
        # NOTE(review): only the first parameter group per db is used to
        # seed the cache, but all of the db's groups are matched below.
        parameter_group_list = {db['DBParameterGroups'][0]['DBParameterGroupName']
                    for db in resources}
        paramcache = self.handle_paramgroup_cache(parameter_group_list)
        for resource in resources:
            for pg in resource['DBParameterGroups']:
                pg_values = paramcache[pg['DBParameterGroupName']]
                if self.match(pg_values):
                    # Record the matched key and stop at the first match.
                    resource.setdefault(self.policy_annotation, []).append(
                        self.data.get('key'))
                    results.append(resource)
                    break
        return results

1769 

1770 

@actions.register('modify-db')
class ModifyDb(BaseAction):
    """Modifies an RDS instance based on specified parameter
    using ModifyDbInstance.

    'update' is an array of key value pairs that should be set to
    the property and value you wish to modify.
    'immediate' determines whether the modification is applied immediately
    or not. If 'immediate' is not specified, default is false.

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-rds-deletion-protection
                resource: rds
                filters:
                  - DeletionProtection: true
                  - PubliclyAccessible: true
                actions:
                  - type: modify-db
                    update:
                      - property: 'DeletionProtection'
                        value: false
                      - property: 'PubliclyAccessible'
                        value: false
                    immediate: true
    """

    schema = type_schema(
        'modify-db',
        immediate={"type": 'boolean'},
        update={
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'property': {'type': 'string', 'enum': [
                        'AllocatedStorage',
                        'DBInstanceClass',
                        'DBSubnetGroupName',
                        'DBSecurityGroups',
                        'VpcSecurityGroupIds',
                        'MasterUserPassword',
                        'DBParameterGroupName',
                        'BackupRetentionPeriod',
                        'PreferredBackupWindow',
                        'PreferredMaintenanceWindow',
                        'MultiAZ',
                        'EngineVersion',
                        'AllowMajorVersionUpgrade',
                        'AutoMinorVersionUpgrade',
                        'LicenseModel',
                        'Iops',
                        'OptionGroupName',
                        'NewDBInstanceIdentifier',
                        'StorageType',
                        'TdeCredentialArn',
                        'TdeCredentialPassword',
                        'CACertificateIdentifier',
                        'Domain',
                        'CopyTagsToSnapshot',
                        'MonitoringInterval',
                        'MonitoringRoleARN',
                        'DBPortNumber',
                        'PubliclyAccessible',
                        'DomainIAMRoleName',
                        'PromotionTier',
                        'EnableIAMDatabaseAuthentication',
                        'EnablePerformanceInsights',
                        'PerformanceInsightsKMSKeyId',
                        'PerformanceInsightsRetentionPeriod',
                        'CloudwatchLogsExportConfiguration',
                        'ProcessorFeatures',
                        'UseDefaultProcessorFeatures',
                        'DeletionProtection',
                        'MaxAllocatedStorage',
                        'CertificateRotationRestart']},
                    'value': {}
                },
            },
        },
        required=('update',))

    permissions = ('rds:ModifyDBInstance',)
    # Maps a ModifyDBInstance request parameter to the jmespath of the
    # corresponding field on the described resource, for parameters whose
    # request name differs from the describe output shape.
    conversion_map = {
        'DBSubnetGroupName': 'DBSubnetGroup.DBSubnetGroupName',
        'VpcSecurityGroupIds': 'VpcSecurityGroups[].VpcSecurityGroupId',
        'DBParameterGroupName': 'DBParameterGroups[].DBParameterGroupName',
        'OptionGroupName': 'OptionGroupMemberships[].OptionGroupName',
        'NewDBInstanceIdentifier': 'DBInstanceIdentifier',
        'Domain': 'DomainMemberships[].DomainName',
        'DBPortNumber': 'Endpoint.Port',
        'EnablePerformanceInsights': 'PerformanceInsightsEnabled',
        'CloudwatchLogsExportConfiguration': 'EnabledCloudwatchLogsExports'
    }

    def validate(self):
        """Enforce API parameter interdependencies at policy-load time."""
        if self.data.get('update'):
            update_dict = dict((i['property'], i['value']) for i in self.data.get('update'))
            if ('MonitoringInterval' in update_dict and update_dict['MonitoringInterval'] > 0 and
                    'MonitoringRoleARN' not in update_dict):
                raise PolicyValidationError(
                    "A MonitoringRoleARN value is required \
                    if you specify a MonitoringInterval value other than 0")
            if ('CloudwatchLogsExportConfiguration' in update_dict
                and all(
                    k not in update_dict.get('CloudwatchLogsExportConfiguration')
                    for k in ('EnableLogTypes', 'DisableLogTypes'))):
                raise PolicyValidationError(
                    "A EnableLogTypes or DisableLogTypes input list is required\
                    for setting CloudwatchLogsExportConfiguration")
        return self

    def process(self, resources):
        c = local_session(self.manager.session_factory).client('rds')
        for r in resources:
            # Include only updates whose desired value differs from the
            # resource's current value (resolved via conversion_map when
            # the request name differs from the describe output).
            param = {
                u['property']: u['value'] for u in self.data.get('update')
                if r.get(
                    u['property'],
                    jmespath_search(
                        self.conversion_map.get(u['property'], 'None'), r))
                != u['value']}
            if not param:
                continue
            param['ApplyImmediately'] = self.data.get('immediate', False)
            param['DBInstanceIdentifier'] = r['DBInstanceIdentifier']
            try:
                c.modify_db_instance(**param)
            except c.exceptions.DBInstanceNotFoundFault:
                raise

1904 

1905 

@resources.register('rds-reserved')
class ReservedRDS(QueryResourceManager):
    """Lists all active rds reservations

    :example:

    .. code-block:: yaml

            policies:
              - name: existing-rds-reservations
                resource: rds-reserved
                filters:
                  - State: active
    """

    class resource_type(TypeInfo):
        # Declarative metadata: how to enumerate, identify and tag
        # reserved RDS instances via the AWS API.
        service = 'rds'
        name = id = 'ReservedDBInstanceId'
        date = 'StartTime'
        enum_spec = (
            'describe_reserved_db_instances', 'ReservedDBInstances', None)
        filter_name = 'ReservedDBInstances'
        filter_type = 'list'
        arn_type = "ri"
        arn = "ReservedDBInstanceArn"
        permissions_enum = ('rds:DescribeReservedDBInstances',)
        universal_taggable = object()

    # Resolve tags through the universal tagging augment.
    augment = universal_augment

1935 

1936 

# Register the ConsecutiveAwsBackupsFilter (defined elsewhere) on rds
# instances under the 'consecutive-aws-backups' filter name.
RDS.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter)

1938 

1939 

@filters.register('consecutive-snapshots')
class ConsecutiveSnapshots(Filter):
    """Returns instances where number of consecutive daily snapshots is
    equal to/or greater than n days.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-daily-snapshot-count
                resource: rds
                filters:
                  - type: consecutive-snapshots
                    days: 7
    """
    schema = type_schema('consecutive-snapshots', days={'type': 'number', 'minimum': 1},
        required=['days'])
    permissions = ('rds:DescribeDBSnapshots', 'rds:DescribeDBInstances')
    # Annotation holding each instance's snapshot list.
    annotation = 'c7n:DBSnapshots'

    def process_resource_set(self, client, resources):
        """Annotate each instance with its snapshots (all pages, retried)."""
        rds_instances = [r['DBInstanceIdentifier'] for r in resources]
        paginator = client.get_paginator('describe_db_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        db_snapshots = paginator.paginate(Filters=[{'Name': 'db-instance-id',
          'Values': rds_instances}]).build_full_result().get('DBSnapshots', [])

        inst_map = {}
        for snapshot in db_snapshots:
            inst_map.setdefault(snapshot['DBInstanceIdentifier'], []).append(snapshot)
        for r in resources:
            r[self.annotation] = inst_map.get(r['DBInstanceIdentifier'], [])

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')
        results = []
        retention = self.data.get('days')
        utcnow = datetime.datetime.utcnow()
        # Dates (UTC, day granularity) that must each have at least one
        # available snapshot; today is excluded since its snapshot may
        # not have run yet.
        expected_dates = set()
        for days in range(1, retention + 1):
            expected_dates.add((utcnow - timedelta(days=days)).strftime('%Y-%m-%d'))

        for resource_set in chunks(
                [r for r in resources if self.annotation not in r], 50):
            self.process_resource_set(client, resource_set)

        for r in resources:
            snapshot_dates = set()
            for snapshot in r[self.annotation]:
                # Only completed snapshots count toward coverage.
                if snapshot['Status'] == 'available':
                    snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d'))
            if expected_dates.issubset(snapshot_dates):
                results.append(r)
        return results

1995 

1996 

@filters.register('engine')
class EngineFilter(ValueFilter):
    """
    Filter a rds resource based on its Engine Metadata

    :example:

    .. code-block:: yaml

            policies:
              - name: find-deprecated-versions
                resource: aws.rds
                filters:
                  - type: engine
                    key: Status
                    value: deprecated
    """

    schema = type_schema('engine', rinherit=ValueFilter.schema)

    permissions = ("rds:DescribeDBEngineVersions", )

    def process(self, resources, event=None):
        if not resources:
            # Avoid calling the API with empty filter value lists.
            return []

        client = local_session(self.manager.session_factory).client('rds')

        engines = set()
        engine_versions = set()
        for r in resources:
            engines.add(r['Engine'])
            engine_versions.add(r['EngineVersion'])

        paginator = client.get_paginator('describe_db_engine_versions')
        response = paginator.paginate(
            Filters=[
                {'Name': 'engine', 'Values': list(engines)},
                {'Name': 'engine-version', 'Values': list(engine_versions)}
            ],
            # IncludeAll also returns versions no longer offered for new
            # instances (e.g. deprecated ones), which existing databases
            # may still be running.
            IncludeAll=True,
        )
        # Index the engine metadata as {engine: {version: metadata}}.
        all_versions = {}
        matched = []
        for page in response:
            for e in page['DBEngineVersions']:
                all_versions.setdefault(e['Engine'], {})
                all_versions[e['Engine']][e['EngineVersion']] = e
        for r in resources:
            # A resource's exact engine/version pair may be missing from the
            # API response (e.g. a fully retired version); skip it instead of
            # raising a KeyError that would abort the whole policy.
            v = all_versions.get(r['Engine'], {}).get(r['EngineVersion'])
            if v is None:
                continue
            if self.match(v):
                r['c7n:Engine'] = v
                matched.append(r)
        return matched

2048 

2049 

class DescribeDBProxy(DescribeSource):
    """Describe source that decorates RDS proxies with their resource tags."""

    def augment(self, resources):
        tagged = universal_augment(self.manager, resources)
        return tagged

2053 

2054 

@resources.register('rds-proxy')
class RDSProxy(QueryResourceManager):
    """Resource Manager for RDS DB Proxies

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-proxy-tls-check
                resource: rds-proxy
                filters:
                  - type: value
                    key: RequireTLS
                    value: false
    """

    class resource_type(TypeInfo):
        service = 'rds'
        name = id = 'DBProxyName'
        date = 'CreatedDate'
        enum_spec = ('describe_db_proxies', 'DBProxies', None)
        arn = 'DBProxyArn'
        arn_type = 'db-proxy'
        # Fixed from 'AWS::RDS::DBInstance' (copy-paste from the rds
        # resource); the CloudFormation type for a proxy is
        # AWS::RDS::DBProxy.
        cfn_type = 'AWS::RDS::DBProxy'
        # NOTE(review): previously 'AWS::RDS::DBInstance', which would make
        # config-mode policies enumerate DB instances rather than proxies.
        # Left unchanged pending confirmation that AWS Config records the
        # AWS::RDS::DBProxy type.
        config_type = 'AWS::RDS::DBInstance'
        permissions_enum = ('rds:DescribeDBProxies',)
        universal_taggable = object()

    source_mapping = {
        'describe': DescribeDBProxy,
        'config': ConfigSource
    }

2087 

2088 

@RDSProxy.action_registry.register('delete')
class DeleteRDSProxy(BaseAction):
    """
    Deletes a RDS Proxy

    :example:

    .. code-block:: yaml

        policies:
          - name: delete-rds-proxy
            resource: aws.rds-proxy
            filters:
              - type: value
                key: "DBProxyName"
                op: eq
                value: "proxy-test-1"
            actions:
              - type: delete
    """

    schema = type_schema('delete')

    permissions = ('rds:DeleteDBProxy',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        for proxy in resources:
            # Tolerate proxies that are already gone or mid-transition.
            self.manager.retry(
                client.delete_db_proxy,
                DBProxyName=proxy['DBProxyName'],
                ignore_err_codes=(
                    'DBProxyNotFoundFault', 'InvalidDBProxyStateFault'))

2121 

2122 

@RDSProxy.filter_registry.register('subnet')
class RDSProxySubnetFilter(net_filters.SubnetFilter):
    """Filter RDS proxies by attributes of their associated subnets."""

    RelatedIdsExpression = "VpcSubnetIds[]"

2127 

2128 

@RDSProxy.filter_registry.register('security-group')
class RDSProxySecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Filter RDS proxies by attributes of their associated security groups."""

    RelatedIdsExpression = "VpcSecurityGroupIds[]"

2133 

@RDSProxy.filter_registry.register('vpc')
class RDSProxyVpcFilter(net_filters.VpcFilter):
    """Filter RDS proxies by attributes of their VPC."""

    RelatedIdsExpression = "VpcId"

2138 

2139 

@filters.register('db-option-groups')
class DbOptionGroups(ValueFilter):
    """Describes the option groups associated with RDS instances so they can
    be matched with value-filter semantics (jmespath keys, operators, etc).

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-data-in-transit-encrypted
                resource: aws.rds
                filters:
                  - type: db-option-groups
                    key: Options[].OptionName
                    op: intersect
                    value:
                      - SSL
                      - NATIVE_NETWORK_ENCRYPTION

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-oracle-encryption-in-transit
                resource: aws.rds
                filters:
                  - Engine: oracle-ee
                  - type: db-option-groups
                    key: Options[].OptionSettings[?Name == 'SQLNET.ENCRYPTION_SERVER'].Value[]
                    value:
                      - REQUIRED
    """

    schema = type_schema('db-option-groups', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeOptionGroups', )
    policy_annotation = 'c7n:MatchedDBOptionGroups'

    def handle_optiongroup_cache(self, client, paginator, option_groups):
        # Resolve each option-group name to its full description, consulting
        # the manager's cache before hitting the API.
        described = {}
        cache = self.manager._cache

        with cache:
            for name in option_groups:
                key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-pg': name}
                cached = cache.get(key)
                if cached is not None:
                    described[name] = cached
                    continue

                described[name] = {}
                for page in paginator.paginate(OptionGroupName=name):
                    for group in page['OptionGroupsList']:
                        # Last one wins; the API returns a single description
                        # per option-group name.
                        described[name] = group

                cache.save(key, described[name])

        return described

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_option_groups')
        # NOTE(review): only the first membership of each instance is
        # described here; instances normally carry exactly one membership,
        # additional ones assume their group was fetched for another resource.
        group_names = [db['OptionGroupMemberships'][0]['OptionGroupName']
                       for db in resources]
        described = self.handle_optiongroup_cache(client, paginator, group_names)

        matched = []
        for resource in resources:
            for membership in resource['OptionGroupMemberships']:
                og_values = described[membership['OptionGroupName']]
                if not self.match(og_values):
                    continue
                # Record what matched: the group name plus the filtered key.
                resource.setdefault(self.policy_annotation, []).append({
                    k: jmespath_search(k, og_values)
                    for k in {'OptionGroupName', self.data.get('key')}
                })
                matched.append(resource)
                break

        return matched

2226 

2227 

@filters.register('pending-maintenance')
class PendingMaintenance(Filter):
    """Matches DB instances with at least one pending maintenance action.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-pending-maintenance
                resource: aws.rds
                filters:
                  - pending-maintenance
    """

    schema = type_schema('pending-maintenance')
    permissions = ('rds:DescribePendingMaintenanceActions',)

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')

        # Gather the ARN of every resource that has maintenance pending.
        flagged = set()
        paginator = client.get_paginator('describe_pending_maintenance_actions')
        for page in paginator.paginate():
            for action in page['PendingMaintenanceActions']:
                flagged.add(action['ResourceIdentifier'])

        return [r for r in resources if r['DBInstanceArn'] in flagged]