Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/rds.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

931 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4RDS Resource Manager 

5==================== 

6 

7Example Policies 

8---------------- 

9 

10Find rds instances that are publicly available 

11 

12.. code-block:: yaml 

13 

14 policies: 

15 - name: rds-public 

16 resource: rds 

17 filters: 

18 - PubliclyAccessible: true 

19 

20Find rds instances that are not encrypted 

21 

22.. code-block:: yaml 

23 

24 policies: 

25 - name: rds-non-encrypted 

26 resource: rds 

27 filters: 

28 - type: value 

29 key: StorageEncrypted 

30 value: true 

31 op: ne 

32 

33""" 

34import functools 

35import itertools 

36import logging 

37import operator 

38import re 

39import datetime 

40 

41from datetime import timedelta 

42 

43from decimal import Decimal as D, ROUND_HALF_UP 

44 

45from c7n.vendored.distutils.version import LooseVersion 

46from botocore.exceptions import ClientError 

47from concurrent.futures import as_completed 

48 

49from c7n.actions import ( 

50 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction) 

51 

52from c7n.exceptions import PolicyValidationError 

53from c7n.filters import ( 

54 CrossAccountAccessFilter, FilterRegistry, Filter, ValueFilter, AgeFilter) 

55from c7n.filters.offhours import OffHour, OnHour 

56from c7n.filters import related 

57import c7n.filters.vpc as net_filters 

58from c7n.manager import resources 

59from c7n.query import ( 

60 QueryResourceManager, DescribeSource, ConfigSource, TypeInfo, RetryPageIterator) 

61from c7n import deprecated, tags 

62from c7n.tags import universal_augment 

63 

64from c7n.utils import ( 

65 local_session, type_schema, get_retry, chunks, snapshot_identifier, 

66 merge_dict_list, filter_empty, jmespath_search) 

67from c7n.resources.kms import ResourceKmsKeyAlias 

68from c7n.resources.securityhub import PostFinding 

69from c7n.filters.backup import ConsecutiveAwsBackupsFilter 

70 

71log = logging.getLogger('custodian.rds') 

72 

73filters = FilterRegistry('rds.filters') 

74actions = ActionRegistry('rds.actions') 

75 

76 

77class DescribeRDS(DescribeSource): 

78 

79 def augment(self, dbs): 

80 for d in dbs: 

81 d['Tags'] = d.pop('TagList', ()) 

82 return dbs 

83 

84 

85class ConfigRDS(ConfigSource): 

86 

87 def load_resource(self, item): 

88 resource = super().load_resource(item) 

89 for k in list(resource.keys()): 

90 if k.startswith('Db'): 

91 resource["DB%s" % k[2:]] = resource[k] 

92 return resource 

93 

94 

95@resources.register('rds') 

96class RDS(QueryResourceManager): 

97 """Resource manager for RDS DB instances. 

98 """ 

99 

100 class resource_type(TypeInfo): 

101 service = 'rds' 

102 arn_type = 'db' 

103 arn_separator = ':' 

104 enum_spec = ('describe_db_instances', 'DBInstances', None) 

105 id = 'DBInstanceIdentifier' 

106 config_id = 'DbiResourceId' 

107 name = 'Endpoint.Address' 

108 filter_name = 'DBInstanceIdentifier' 

109 filter_type = 'scalar' 

110 date = 'InstanceCreateTime' 

111 dimension = 'DBInstanceIdentifier' 

112 cfn_type = config_type = 'AWS::RDS::DBInstance' 

113 arn = 'DBInstanceArn' 

114 universal_taggable = True 

115 default_report_fields = ( 

116 'DBInstanceIdentifier', 

117 'DBName', 

118 'Engine', 

119 'EngineVersion', 

120 'MultiAZ', 

121 'AllocatedStorage', 

122 'StorageEncrypted', 

123 'PubliclyAccessible', 

124 'InstanceCreateTime', 

125 ) 

126 permissions_enum = ('rds:DescribeDBInstances',) 

127 

128 filter_registry = filters 

129 action_registry = actions 

130 

131 def resources(self, query=None): 

132 if query is None and 'query' in self.data: 

133 query = merge_dict_list(self.data['query']) 

134 elif query is None: 

135 query = {} 

136 return super(RDS, self).resources(query=query) 

137 

138 source_mapping = { 

139 'describe': DescribeRDS, 

140 'config': ConfigRDS 

141 } 

142 

143 

144def _db_instance_eligible_for_backup(resource): 

145 db_instance_id = resource['DBInstanceIdentifier'] 

146 

147 # Database instance is not in available state 

148 if resource.get('DBInstanceStatus', '') != 'available': 

149 log.debug( 

150 "DB instance %s is not in available state", 

151 db_instance_id) 

152 return False 

153 # The specified DB Instance is a member of a cluster and its 

154 # backup retention should not be modified directly. Instead, 

155 # modify the backup retention of the cluster using the 

156 # ModifyDbCluster API 

157 if resource.get('DBClusterIdentifier', ''): 

158 log.debug( 

159 "DB instance %s is a cluster member", 

160 db_instance_id) 

161 return False 

162 # DB Backups not supported on a read replica for engine postgres 

163 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and 

164 resource.get('Engine', '') == 'postgres'): 

165 log.debug( 

166 "DB instance %s is a postgres read-replica", 

167 db_instance_id) 

168 return False 

169 # DB Backups not supported on a read replica running a mysql 

170 # version before 5.6 

171 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and 

172 resource.get('Engine', '') == 'mysql'): 

173 engine_version = resource.get('EngineVersion', '') 

174 # Assume "<major>.<minor>.<whatever>" 

175 match = re.match(r'(?P<major>\d+)\.(?P<minor>\d+)\..*', engine_version) 

176 if (match and int(match.group('major')) < 5 or 

177 (int(match.group('major')) == 5 and int(match.group('minor')) < 6)): 

178 log.debug( 

179 "DB instance %s is a version %s mysql read-replica", 

180 db_instance_id, 

181 engine_version) 

182 return False 

183 return True 

184 

185 

186def _db_instance_eligible_for_final_snapshot(resource): 

187 status = resource.get('DBInstanceStatus', '') 

188 # If the DB instance you are deleting has a status of "Creating," 

189 # you will not be able to have a final DB snapshot taken 

190 # If the DB instance is in a failure state with a status of "failed," 

191 # "incompatible-restore," or "incompatible-network," you can only delete 

192 # the instance when the SkipFinalSnapshot parameter is set to "true." 

193 eligible_for_final_snapshot = True 

194 if status in ['creating', 'failed', 'incompatible-restore', 'incompatible-network']: 

195 eligible_for_final_snapshot = False 

196 

197 # FinalDBSnapshotIdentifier can not be specified when deleting a 

198 # replica instance 

199 if resource.get('ReadReplicaSourceDBInstanceIdentifier', ''): 

200 eligible_for_final_snapshot = False 

201 

202 # if it's a rds-cluster, don't try to run the rds instance snapshot api call 

203 if resource.get('DBClusterIdentifier', False): 

204 eligible_for_final_snapshot = False 

205 

206 if not eligible_for_final_snapshot: 

207 log.debug('DB instance is not eligible for a snapshot:/n %s', resource) 

208 return eligible_for_final_snapshot 

209 

210 

211def _get_available_engine_upgrades(client, major=False): 

212 """Returns all extant rds engine upgrades. 

213 

214 As a nested mapping of engine type to known versions 

215 and their upgrades. 

216 

217 Defaults to minor upgrades, but configurable to major. 

218 

219 Example:: 

220 

221 >>> _get_available_engine_upgrades(client) 

222 { 

223 'oracle-se2': {'12.1.0.2.v2': '12.1.0.2.v5', 

224 '12.1.0.2.v3': '12.1.0.2.v5'}, 

225 'postgres': {'9.3.1': '9.3.14', 

226 '9.3.10': '9.3.14', 

227 '9.3.12': '9.3.14', 

228 '9.3.2': '9.3.14'} 

229 } 

230 """ 

231 results = {} 

232 paginator = client.get_paginator('describe_db_engine_versions') 

233 for page in paginator.paginate(): 

234 engine_versions = page['DBEngineVersions'] 

235 for v in engine_versions: 

236 if v['Engine'] not in results: 

237 results[v['Engine']] = {} 

238 if 'ValidUpgradeTarget' not in v or len(v['ValidUpgradeTarget']) == 0: 

239 continue 

240 for t in v['ValidUpgradeTarget']: 

241 if not major and t['IsMajorVersionUpgrade']: 

242 continue 

243 if LooseVersion(t['EngineVersion']) > LooseVersion( 

244 results[v['Engine']].get(v['EngineVersion'], '0.0.0')): 

245 results[v['Engine']][v['EngineVersion']] = t['EngineVersion'] 

246 return results 

247 

248 

249filters.register('offhour', OffHour) 

250filters.register('onhour', OnHour) 

251 

252 

253@filters.register('default-vpc') 

254class DefaultVpc(net_filters.DefaultVpcBase): 

255 """ Matches if an rds database is in the default vpc 

256 

257 :example: 

258 

259 .. code-block:: yaml 

260 

261 policies: 

262 - name: default-vpc-rds 

263 resource: rds 

264 filters: 

265 - type: default-vpc 

266 """ 

267 schema = type_schema('default-vpc') 

268 

269 def __call__(self, rdb): 

270 return self.match(rdb['DBSubnetGroup']['VpcId']) 

271 

272 

273@filters.register('security-group') 

274class SecurityGroupFilter(net_filters.SecurityGroupFilter): 

275 

276 RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId" 

277 

278 

279@filters.register('subnet') 

280class SubnetFilter(net_filters.SubnetFilter): 

281 

282 RelatedIdsExpression = "DBSubnetGroup.Subnets[].SubnetIdentifier" 

283 

284 

285@filters.register('vpc') 

286class VpcFilter(net_filters.VpcFilter): 

287 

288 RelatedIdsExpression = "DBSubnetGroup.VpcId" 

289 

290 

291filters.register('network-location', net_filters.NetworkLocation) 

292 

293 

294@filters.register('kms-alias') 

295class KmsKeyAlias(ResourceKmsKeyAlias): 

296 

297 def process(self, dbs, event=None): 

298 return self.get_matching_aliases(dbs) 

299 

300 

301@actions.register('auto-patch') 

302class AutoPatch(BaseAction): 

303 """Toggle AutoMinorUpgrade flag on RDS instance 

304 

305 'window' parameter needs to be in the format 'ddd:hh:mm-ddd:hh:mm' and 

306 have at least 30 minutes between start & end time. 

307 If 'window' is not specified, AWS will assign a random maintenance window 

308 to each instance selected. 

309 

310 :example: 

311 

312 .. code-block:: yaml 

313 

314 policies: 

315 - name: enable-rds-autopatch 

316 resource: rds 

317 filters: 

318 - AutoMinorVersionUpgrade: false 

319 actions: 

320 - type: auto-patch 

321 minor: true 

322 window: Mon:23:00-Tue:01:00 

323 """ 

324 

325 schema = type_schema( 

326 'auto-patch', 

327 minor={'type': 'boolean'}, window={'type': 'string'}) 

328 permissions = ('rds:ModifyDBInstance',) 

329 

330 def process(self, dbs): 

331 client = local_session( 

332 self.manager.session_factory).client('rds') 

333 

334 params = {'AutoMinorVersionUpgrade': self.data.get('minor', True)} 

335 if self.data.get('window'): 

336 params['PreferredMaintenanceWindow'] = self.data['window'] 

337 

338 for db in dbs: 

339 client.modify_db_instance( 

340 DBInstanceIdentifier=db['DBInstanceIdentifier'], 

341 **params) 

342 

343 

344@filters.register('upgrade-available') 

345class UpgradeAvailable(Filter): 

346 """ Scan DB instances for available engine upgrades 

347 

348 This will pull DB instances & check their specific engine for any 

349 engine version with higher release numbers than the current one 

350 

351 This will also annotate the rds instance with 'target_engine' which is 

352 the most recent version of the engine available 

353 

354 :example: 

355 

356 .. code-block:: yaml 

357 

358 policies: 

359 - name: rds-upgrade-available 

360 resource: rds 

361 filters: 

362 - type: upgrade-available 

363 major: False 

364 

365 """ 

366 

367 schema = type_schema('upgrade-available', 

368 major={'type': 'boolean'}, 

369 value={'type': 'boolean'}) 

370 permissions = ('rds:DescribeDBEngineVersions',) 

371 

372 def process(self, resources, event=None): 

373 client = local_session(self.manager.session_factory).client('rds') 

374 check_upgrade_extant = self.data.get('value', True) 

375 check_major = self.data.get('major', False) 

376 engine_upgrades = _get_available_engine_upgrades( 

377 client, major=check_major) 

378 results = [] 

379 

380 for r in resources: 

381 target_upgrade = engine_upgrades.get( 

382 r['Engine'], {}).get(r['EngineVersion']) 

383 if target_upgrade is None: 

384 if check_upgrade_extant is False: 

385 results.append(r) 

386 continue 

387 r['c7n-rds-engine-upgrade'] = target_upgrade 

388 results.append(r) 

389 return results 

390 

391 

392@actions.register('upgrade') 

393class UpgradeMinor(BaseAction): 

394 """Upgrades a RDS instance to the latest major/minor version available 

395 

396 Use of the 'immediate' flag (default False) will automatically upgrade 

397 the RDS engine disregarding the existing maintenance window. 

398 

399 :example: 

400 

401 .. code-block:: yaml 

402 

403 policies: 

404 - name: upgrade-rds-minor 

405 resource: rds 

406 actions: 

407 - type: upgrade 

408 major: False 

409 immediate: False 

410 

411 """ 

412 

413 schema = type_schema( 

414 'upgrade', 

415 major={'type': 'boolean'}, 

416 immediate={'type': 'boolean'}) 

417 permissions = ('rds:ModifyDBInstance',) 

418 

419 def process(self, resources): 

420 client = local_session(self.manager.session_factory).client('rds') 

421 engine_upgrades = None 

422 for r in resources: 

423 if 'EngineVersion' in r['PendingModifiedValues']: 

424 # Upgrade has already been scheduled 

425 continue 

426 if 'c7n-rds-engine-upgrade' not in r: 

427 if engine_upgrades is None: 

428 engine_upgrades = _get_available_engine_upgrades( 

429 client, major=self.data.get('major', False)) 

430 target = engine_upgrades.get( 

431 r['Engine'], {}).get(r['EngineVersion']) 

432 if target is None: 

433 log.debug( 

434 "implicit filter no upgrade on %s", 

435 r['DBInstanceIdentifier']) 

436 continue 

437 r['c7n-rds-engine-upgrade'] = target 

438 client.modify_db_instance( 

439 DBInstanceIdentifier=r['DBInstanceIdentifier'], 

440 EngineVersion=r['c7n-rds-engine-upgrade'], 

441 ApplyImmediately=self.data.get('immediate', False)) 

442 

443 

444@actions.register('tag-trim') 

445class TagTrim(tags.TagTrim): 

446 

447 permissions = ('rds:RemoveTagsFromResource',) 

448 

449 def process_tag_removal(self, client, resource, candidates): 

450 client.remove_tags_from_resource(ResourceName=resource['DBInstanceArn'], TagKeys=candidates) 

451 

452 

453START_STOP_ELIGIBLE_ENGINES = { 

454 'postgres', 'sqlserver-ee', 

455 'oracle-se2', 'mariadb', 'oracle-ee', 

456 'sqlserver-ex', 'sqlserver-se', 'oracle-se', 

457 'mysql', 'oracle-se1', 'sqlserver-web', 

458 'db2-ae', 'db2-se', 'oracle-ee-cdb', 

459 'sqlserver-ee', 'oracle-se2-cdb'} 

460 

461 

462def _eligible_start_stop(db, state="available"): 

463 # See conditions noted here 

464 # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html 

465 # Note that this doesn't really specify what happens for all the nosql engines 

466 # that are available as rds engines. 

467 if db.get('DBInstanceStatus') != state: 

468 return False 

469 

470 if db.get('MultiAZ') and db['Engine'].startswith('sqlserver-'): 

471 return False 

472 

473 if db['Engine'] not in START_STOP_ELIGIBLE_ENGINES: 

474 return False 

475 

476 if db.get('ReadReplicaDBInstanceIdentifiers'): 

477 return False 

478 

479 if db.get('ReadReplicaSourceDBInstanceIdentifier'): 

480 return False 

481 

482 # TODO is SQL Server mirror is detectable. 

483 return True 

484 

485 

486@actions.register('stop') 

487class Stop(BaseAction): 

488 """Stop an rds instance. 

489 

490 https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html 

491 """ 

492 

493 schema = type_schema('stop') 

494 

495 permissions = ("rds:StopDBInstance",) 

496 

497 def process(self, resources): 

498 client = local_session(self.manager.session_factory).client('rds') 

499 for r in filter(_eligible_start_stop, resources): 

500 try: 

501 client.stop_db_instance( 

502 DBInstanceIdentifier=r['DBInstanceIdentifier']) 

503 except ClientError as e: 

504 log.exception( 

505 "Error stopping db instance:%s err:%s", 

506 r['DBInstanceIdentifier'], e) 

507 

508 

509@actions.register('start') 

510class Start(BaseAction): 

511 """Start an rds instance. 

512 """ 

513 

514 schema = type_schema('start') 

515 

516 permissions = ("rds:StartDBInstance",) 

517 

518 def process(self, resources): 

519 client = local_session(self.manager.session_factory).client('rds') 

520 start_filter = functools.partial(_eligible_start_stop, state='stopped') 

521 for r in filter(start_filter, resources): 

522 try: 

523 client.start_db_instance( 

524 DBInstanceIdentifier=r['DBInstanceIdentifier']) 

525 except ClientError as e: 

526 log.exception( 

527 "Error starting db instance:%s err:%s", 

528 r['DBInstanceIdentifier'], e) 

529 

530 

531@actions.register('delete') 

532class Delete(BaseAction): 

533 """Deletes selected RDS instances 

534 

535 This will delete RDS instances. It is recommended to apply with a filter 

536 to avoid deleting all RDS instances in the account. 

537 

538 :example: 

539 

540 .. code-block:: yaml 

541 

542 policies: 

543 - name: rds-delete 

544 resource: rds 

545 filters: 

546 - default-vpc 

547 actions: 

548 - type: delete 

549 skip-snapshot: true 

550 """ 

551 

552 schema = type_schema('delete', **{ 

553 'skip-snapshot': {'type': 'boolean'}, 

554 'copy-restore-info': {'type': 'boolean'} 

555 }) 

556 

557 permissions = ('rds:DeleteDBInstance', 'rds:AddTagsToResource') 

558 

559 def validate(self): 

560 if self.data.get('skip-snapshot', False) and self.data.get( 

561 'copy-restore-info'): 

562 raise PolicyValidationError( 

563 "skip-snapshot cannot be specified with copy-restore-info on %s" % ( 

564 self.manager.data,)) 

565 return self 

566 

567 def process(self, dbs): 

568 skip = self.data.get('skip-snapshot', False) 

569 # Can't delete an instance in an aurora cluster, use a policy on the cluster 

570 dbs = [r for r in dbs if not r.get('DBClusterIdentifier')] 

571 # Concurrency feels like overkill here. 

572 client = local_session(self.manager.session_factory).client('rds') 

573 for db in dbs: 

574 params = dict( 

575 DBInstanceIdentifier=db['DBInstanceIdentifier']) 

576 if skip or not _db_instance_eligible_for_final_snapshot(db): 

577 params['SkipFinalSnapshot'] = True 

578 else: 

579 params['FinalDBSnapshotIdentifier'] = snapshot_identifier( 

580 'Final', db['DBInstanceIdentifier']) 

581 if self.data.get('copy-restore-info', False): 

582 self.copy_restore_info(client, db) 

583 if not db['CopyTagsToSnapshot']: 

584 client.modify_db_instance( 

585 DBInstanceIdentifier=db['DBInstanceIdentifier'], 

586 CopyTagsToSnapshot=True) 

587 self.log.info( 

588 "Deleting rds: %s snapshot: %s", 

589 db['DBInstanceIdentifier'], 

590 params.get('FinalDBSnapshotIdentifier', False)) 

591 

592 try: 

593 client.delete_db_instance(**params) 

594 except ClientError as e: 

595 if e.response['Error']['Code'] == "InvalidDBInstanceState": 

596 continue 

597 raise 

598 

599 return dbs 

600 

601 def copy_restore_info(self, client, instance): 

602 tags = [] 

603 tags.append({ 

604 'Key': 'VPCSecurityGroups', 

605 'Value': ''.join([ 

606 g['VpcSecurityGroupId'] for g in instance['VpcSecurityGroups'] 

607 ])}) 

608 tags.append({ 

609 'Key': 'OptionGroupName', 

610 'Value': instance['OptionGroupMemberships'][0]['OptionGroupName']}) 

611 tags.append({ 

612 'Key': 'ParameterGroupName', 

613 'Value': instance['DBParameterGroups'][0]['DBParameterGroupName']}) 

614 tags.append({ 

615 'Key': 'InstanceClass', 

616 'Value': instance['DBInstanceClass']}) 

617 tags.append({ 

618 'Key': 'StorageType', 

619 'Value': instance['StorageType']}) 

620 tags.append({ 

621 'Key': 'MultiAZ', 

622 'Value': str(instance['MultiAZ'])}) 

623 tags.append({ 

624 'Key': 'DBSubnetGroupName', 

625 'Value': instance['DBSubnetGroup']['DBSubnetGroupName']}) 

626 client.add_tags_to_resource( 

627 ResourceName=self.manager.generate_arn( 

628 instance['DBInstanceIdentifier']), 

629 Tags=tags) 

630 

631 

632@actions.register('set-snapshot-copy-tags') 

633class CopySnapshotTags(BaseAction): 

634 """Enables copying tags from rds instance to snapshot 

635 

636 DEPRECATED - use modify-db instead with `CopyTagsToSnapshot` 

637 

638 :example: 

639 

640 .. code-block:: yaml 

641 

642 policies: 

643 - name: enable-rds-snapshot-tags 

644 resource: rds 

645 filters: 

646 - type: value 

647 key: Engine 

648 value: aurora 

649 op: eq 

650 actions: 

651 - type: set-snapshot-copy-tags 

652 enable: True 

653 """ 

654 deprecations = ( 

655 deprecated.action("use modify-db instead with `CopyTagsToSnapshot`"), 

656 ) 

657 

658 schema = type_schema( 

659 'set-snapshot-copy-tags', 

660 enable={'type': 'boolean'}) 

661 permissions = ('rds:ModifyDBInstance',) 

662 

663 def process(self, resources): 

664 error = None 

665 with self.executor_factory(max_workers=2) as w: 

666 futures = {} 

667 client = local_session(self.manager.session_factory).client('rds') 

668 resources = [r for r in resources 

669 if r['CopyTagsToSnapshot'] != self.data.get('enable', True)] 

670 for r in resources: 

671 futures[w.submit(self.set_snapshot_tags, client, r)] = r 

672 for f in as_completed(futures): 

673 if f.exception(): 

674 error = f.exception() 

675 self.log.error( 

676 'error updating rds:%s CopyTagsToSnapshot \n %s', 

677 futures[f]['DBInstanceIdentifier'], error) 

678 if error: 

679 raise error 

680 return resources 

681 

682 def set_snapshot_tags(self, client, r): 

683 self.manager.retry( 

684 client.modify_db_instance, 

685 DBInstanceIdentifier=r['DBInstanceIdentifier'], 

686 CopyTagsToSnapshot=self.data.get('enable', True)) 

687 

688 

689@RDS.action_registry.register('post-finding') 

690class DbInstanceFinding(PostFinding): 

691 

692 resource_type = 'AwsRdsDbInstance' 

693 

694 def format_resource(self, r): 

695 

696 fields = [ 

697 'AssociatedRoles', 'CACertificateIdentifier', 'DBClusterIdentifier', 

698 'DBInstanceIdentifier', 'DBInstanceClass', 'DbInstancePort', 'DbiResourceId', 

699 'DBName', 'DeletionProtection', 'Endpoint', 'Engine', 'EngineVersion', 

700 'IAMDatabaseAuthenticationEnabled', 'InstanceCreateTime', 'KmsKeyId', 

701 'PubliclyAccessible', 'StorageEncrypted', 

702 'TdeCredentialArn', 'VpcSecurityGroups', 'MultiAz', 'EnhancedMonitoringResourceArn', 

703 'DbInstanceStatus', 'MasterUsername', 

704 'AllocatedStorage', 'PreferredBackupWindow', 'BackupRetentionPeriod', 

705 'DbSecurityGroups', 'DbParameterGroups', 

706 'AvailabilityZone', 'DbSubnetGroup', 'PreferredMaintenanceWindow', 

707 'PendingModifiedValues', 'LatestRestorableTime', 

708 'AutoMinorVersionUpgrade', 'ReadReplicaSourceDBInstanceIdentifier', 

709 'ReadReplicaDBInstanceIdentifiers', 

710 'ReadReplicaDBClusterIdentifiers', 'LicenseModel', 'Iops', 'OptionGroupMemberships', 

711 'CharacterSetName', 

712 'SecondaryAvailabilityZone', 'StatusInfos', 'StorageType', 'DomainMemberships', 

713 'CopyTagsToSnapshot', 

714 'MonitoringInterval', 'MonitoringRoleArn', 'PromotionTier', 'Timezone', 

715 'PerformanceInsightsEnabled', 

716 'PerformanceInsightsKmsKeyId', 'PerformanceInsightsRetentionPeriod', 

717 'EnabledCloudWatchLogsExports', 

718 'ProcessorFeatures', 'ListenerEndpoint', 'MaxAllocatedStorage' 

719 ] 

720 details = {} 

721 for f in fields: 

722 if r.get(f): 

723 value = r[f] 

724 if isinstance(r[f], datetime.datetime): 

725 value = r[f].isoformat() 

726 details.setdefault(f, value) 

727 

728 db_instance = { 

729 'Type': self.resource_type, 

730 'Id': r['DBInstanceArn'], 

731 'Region': self.manager.config.region, 

732 'Tags': {t['Key']: t['Value'] for t in r.get('Tags', [])}, 

733 'Details': {self.resource_type: filter_empty(details)}, 

734 } 

735 db_instance = filter_empty(db_instance) 

736 return db_instance 

737 

738 

739@actions.register('snapshot') 

740class Snapshot(BaseAction): 

741 """Creates a manual snapshot of a RDS instance 

742 

743 :example: 

744 

745 .. code-block:: yaml 

746 

747 policies: 

748 - name: rds-snapshot 

749 resource: rds 

750 actions: 

751 - snapshot 

752 """ 

753 

754 schema = type_schema('snapshot') 

755 permissions = ('rds:CreateDBSnapshot',) 

756 

757 def process(self, dbs): 

758 with self.executor_factory(max_workers=3) as w: 

759 futures = [] 

760 for db in dbs: 

761 futures.append(w.submit( 

762 self.process_rds_snapshot, 

763 db)) 

764 for f in as_completed(futures): 

765 if f.exception(): 

766 self.log.error( 

767 "Exception creating rds snapshot \n %s", 

768 f.exception()) 

769 return dbs 

770 

771 def process_rds_snapshot(self, resource): 

772 if not _db_instance_eligible_for_backup(resource): 

773 return 

774 

775 c = local_session(self.manager.session_factory).client('rds') 

776 c.create_db_snapshot( 

777 DBSnapshotIdentifier=snapshot_identifier( 

778 self.data.get('snapshot-prefix', 'Backup'), 

779 resource['DBInstanceIdentifier']), 

780 DBInstanceIdentifier=resource['DBInstanceIdentifier']) 

781 

782 

783@actions.register('resize') 

784class ResizeInstance(BaseAction): 

785 """Change the allocated storage of an rds instance. 

786 

787 :example: 

788 

789 This will find databases using over 85% of their allocated 

790 storage, and resize them to have an additional 30% storage 

791 the resize here is async during the next maintenance. 

792 

793 .. code-block:: yaml 

794 

795 policies: 

796 - name: rds-resize-up 

797 resource: rds 

798 filters: 

799 - type: metrics 

800 name: FreeStorageSpace 

801 percent-attr: AllocatedStorage 

802 attr-multiplier: 1073741824 

803 value: 90 

804 op: greater-than 

805 actions: 

806 - type: resize 

807 percent: 30 

808 

809 

810 This will find databases using under 20% of their allocated 

811 storage, and resize them to be 30% smaller, the resize here 

812 is configured to be immediate. 

813 

814 .. code-block:: yaml 

815 

816 policies: 

817 - name: rds-resize-down 

818 resource: rds 

819 filters: 

820 - type: metrics 

821 name: FreeStorageSpace 

822 percent-attr: AllocatedStorage 

823 attr-multiplier: 1073741824 

824 value: 90 

825 op: greater-than 

826 actions: 

827 - type: resize 

828 percent: -30 

829 immediate: true 

830 """ 

831 schema = type_schema( 

832 'resize', 

833 percent={'type': 'number'}, 

834 immediate={'type': 'boolean'}) 

835 

836 permissions = ('rds:ModifyDBInstance',) 

837 

838 def process(self, resources): 

839 c = local_session(self.manager.session_factory).client('rds') 

840 for r in resources: 

841 old_val = D(r['AllocatedStorage']) 

842 _100 = D(100) 

843 new_val = ((_100 + D(self.data['percent'])) / _100) * old_val 

844 rounded = int(new_val.quantize(D('0'), ROUND_HALF_UP)) 

845 c.modify_db_instance( 

846 DBInstanceIdentifier=r['DBInstanceIdentifier'], 

847 AllocatedStorage=rounded, 

848 ApplyImmediately=self.data.get('immediate', False)) 

849 

850 

851@actions.register('retention') 

852class RetentionWindow(BaseAction): 

853 """ 

854 Sets the 'BackupRetentionPeriod' value for automated snapshots, 

855 enforce (min, max, exact) sets retention days occordingly. 

856 :example: 

857 

858 .. code-block:: yaml 

859 

860 policies: 

861 - name: rds-snapshot-retention 

862 resource: rds 

863 filters: 

864 - type: value 

865 key: BackupRetentionPeriod 

866 value: 7 

867 op: lt 

868 actions: 

869 - type: retention 

870 days: 7 

871 copy-tags: true 

872 enforce: exact 

873 """ 

874 

875 date_attribute = "BackupRetentionPeriod" 

876 schema = type_schema( 

877 'retention', **{'days': {'type': 'number'}, 

878 'copy-tags': {'type': 'boolean'}, 

879 'enforce': {'type': 'string', 'enum': [ 

880 'min', 'max', 'exact']}}) 

881 permissions = ('rds:ModifyDBInstance',) 

882 

883 def process(self, dbs): 

884 with self.executor_factory(max_workers=3) as w: 

885 futures = [] 

886 for db in dbs: 

887 futures.append(w.submit( 

888 self.process_snapshot_retention, 

889 db)) 

890 for f in as_completed(futures): 

891 if f.exception(): 

892 self.log.error( 

893 "Exception setting rds retention \n %s", 

894 f.exception()) 

895 return dbs 

896 

897 def process_snapshot_retention(self, resource): 

898 current_retention = int(resource.get('BackupRetentionPeriod', 0)) 

899 current_copy_tags = resource['CopyTagsToSnapshot'] 

900 new_retention = self.data['days'] 

901 new_copy_tags = self.data.get('copy-tags', True) 

902 retention_type = self.data.get('enforce', 'min').lower() 

903 

904 if ((retention_type == 'min' or 

905 current_copy_tags != new_copy_tags) and 

906 _db_instance_eligible_for_backup(resource)): 

907 self.set_retention_window( 

908 resource, 

909 max(current_retention, new_retention), 

910 new_copy_tags) 

911 return resource 

912 

913 if ((retention_type == 'max' or 

914 current_copy_tags != new_copy_tags) and 

915 _db_instance_eligible_for_backup(resource)): 

916 self.set_retention_window( 

917 resource, 

918 min(current_retention, new_retention), 

919 new_copy_tags) 

920 return resource 

921 

922 if ((retention_type == 'exact' or 

923 current_copy_tags != new_copy_tags) and 

924 _db_instance_eligible_for_backup(resource)): 

925 self.set_retention_window(resource, new_retention, new_copy_tags) 

926 return resource 

927 

928 def set_retention_window(self, resource, retention, copy_tags): 

929 c = local_session(self.manager.session_factory).client('rds') 

930 c.modify_db_instance( 

931 DBInstanceIdentifier=resource['DBInstanceIdentifier'], 

932 BackupRetentionPeriod=retention, 

933 CopyTagsToSnapshot=copy_tags) 

934 

935 

936@actions.register('set-public-access') 

937class RDSSetPublicAvailability(BaseAction): 

938 """ 

939 This action allows for toggling an RDS instance 

940 'PubliclyAccessible' flag to true or false 

941 

942 :example: 

943 

944 .. code-block:: yaml 

945 

946 policies: 

947 - name: disable-rds-public-accessibility 

948 resource: rds 

949 filters: 

950 - PubliclyAccessible: true 

951 actions: 

952 - type: set-public-access 

953 state: false 

954 """ 

955 

956 schema = type_schema( 

957 "set-public-access", 

958 state={'type': 'boolean'}) 

959 permissions = ('rds:ModifyDBInstance',) 

960 

961 def set_accessibility(self, r): 

962 client = local_session(self.manager.session_factory).client('rds') 

963 client.modify_db_instance( 

964 DBInstanceIdentifier=r['DBInstanceIdentifier'], 

965 PubliclyAccessible=self.data.get('state', False)) 

966 

967 def process(self, rds): 

968 with self.executor_factory(max_workers=2) as w: 

969 futures = {w.submit(self.set_accessibility, r): r for r in rds} 

970 for f in as_completed(futures): 

971 if f.exception(): 

972 self.log.error( 

973 "Exception setting public access on %s \n %s", 

974 futures[f]['DBInstanceIdentifier'], f.exception()) 

975 return rds 

976 

977 

978@resources.register('rds-subscription') 

979class RDSSubscription(QueryResourceManager): 

980 

981 class resource_type(TypeInfo): 

982 service = 'rds' 

983 arn_type = 'es' 

984 cfn_type = 'AWS::RDS::EventSubscription' 

985 enum_spec = ( 

986 'describe_event_subscriptions', 'EventSubscriptionsList', None) 

987 name = id = "CustSubscriptionId" 

988 arn = 'EventSubscriptionArn' 

989 date = "SubscriptionCreateTime" 

990 permissions_enum = ('rds:DescribeEventSubscriptions',) 

991 universal_taggable = object() 

992 

993 augment = universal_augment 

994 

995 

996@RDSSubscription.action_registry.register('delete') 

997class RDSSubscriptionDelete(BaseAction): 

998 """Deletes a RDS snapshot resource 

999 

1000 :example: 

1001 

1002 .. code-block:: yaml 

1003 

1004 policies: 

1005 - name: rds-subscription-delete 

1006 resource: rds-subscription 

1007 filters: 

1008 - type: value 

1009 key: CustSubscriptionId 

1010 value: xyz 

1011 actions: 

1012 - delete 

1013 """ 

1014 

1015 schema = type_schema('delete') 

1016 permissions = ('rds:DeleteEventSubscription',) 

1017 

1018 def process(self, resources): 

1019 client = local_session(self.manager.session_factory).client('rds') 

1020 for r in resources: 

1021 self.manager.retry( 

1022 client.delete_event_subscription, SubscriptionName=r['CustSubscriptionId'], 

1023 ignore_err_codes=('SubscriptionNotFoundFault', 

1024 'InvalidEventSubscriptionStateFault')) 

1025 

1026 

1027class DescribeRDSSnapshot(DescribeSource): 

1028 

1029 def get_resources(self, ids, cache=True): 

1030 super_get = super().get_resources 

1031 return list(itertools.chain(*[super_get((i,)) for i in ids])) 

1032 

1033 def augment(self, snaps): 

1034 for s in snaps: 

1035 s['Tags'] = s.pop('TagList', ()) 

1036 return snaps 

1037 

1038 

1039@resources.register('rds-snapshot') 

1040class RDSSnapshot(QueryResourceManager): 

1041 """Resource manager for RDS DB snapshots. 

1042 """ 

1043 

1044 class resource_type(TypeInfo): 

1045 service = 'rds' 

1046 arn_type = 'snapshot' 

1047 arn_separator = ':' 

1048 enum_spec = ('describe_db_snapshots', 'DBSnapshots', None) 

1049 name = id = 'DBSnapshotIdentifier' 

1050 date = 'SnapshotCreateTime' 

1051 config_type = "AWS::RDS::DBSnapshot" 

1052 filter_name = "DBSnapshotIdentifier" 

1053 filter_type = "scalar" 

1054 universal_taggable = True 

1055 permissions_enum = ('rds:DescribeDBSnapshots',) 

1056 

1057 source_mapping = { 

1058 'describe': DescribeRDSSnapshot, 

1059 'config': ConfigSource 

1060 } 

1061 

1062 

1063@RDSSnapshot.filter_registry.register('onhour') 

1064class RDSSnapshotOnHour(OnHour): 

1065 """Scheduled action on rds snapshot.""" 

1066 

1067 

1068@RDSSnapshot.filter_registry.register('instance') 

1069class SnapshotInstance(related.RelatedResourceFilter): 

1070 """Filter snapshots by their database attributes. 

1071 

1072 :example: 

1073 

1074 Find snapshots without an extant database 

1075 

1076 .. code-block:: yaml 

1077 

1078 policies: 

1079 - name: rds-snapshot-orphan 

1080 resource: aws.rds-snapshot 

1081 filters: 

1082 - type: instance 

1083 value: 0 

1084 value_type: resource_count 

1085 """ 

1086 schema = type_schema( 

1087 'instance', rinherit=ValueFilter.schema 

1088 ) 

1089 

1090 RelatedResource = "c7n.resources.rds.RDS" 

1091 RelatedIdsExpression = "DBInstanceIdentifier" 

1092 FetchThreshold = 5 

1093 

1094 

1095@RDSSnapshot.filter_registry.register('latest') 

1096class LatestSnapshot(Filter): 

1097 """Return the latest snapshot for each database. 

1098 """ 

1099 schema = type_schema('latest', automatic={'type': 'boolean'}) 

1100 permissions = ('rds:DescribeDBSnapshots',) 

1101 

1102 def process(self, resources, event=None): 

1103 results = [] 

1104 if not self.data.get('automatic', True): 

1105 resources = [r for r in resources if r['SnapshotType'] == 'manual'] 

1106 for db_identifier, snapshots in itertools.groupby( 

1107 resources, operator.itemgetter('DBInstanceIdentifier')): 

1108 results.append( 

1109 sorted(snapshots, 

1110 key=operator.itemgetter('SnapshotCreateTime'))[-1]) 

1111 return results 

1112 

1113 

1114@RDSSnapshot.filter_registry.register('age') 

1115class RDSSnapshotAge(AgeFilter): 

1116 """Filters RDS snapshots based on age (in days) 

1117 

1118 :example: 

1119 

1120 .. code-block:: yaml 

1121 

1122 policies: 

1123 - name: rds-snapshot-expired 

1124 resource: rds-snapshot 

1125 filters: 

1126 - type: age 

1127 days: 28 

1128 op: ge 

1129 actions: 

1130 - delete 

1131 """ 

1132 

1133 schema = type_schema( 

1134 'age', days={'type': 'number'}, 

1135 op={'$ref': '#/definitions/filters_common/comparison_operators'}) 

1136 

1137 date_attribute = 'SnapshotCreateTime' 

1138 

1139 def get_resource_date(self, i): 

1140 return i.get('SnapshotCreateTime') 

1141 

1142 

1143@RDSSnapshot.action_registry.register('restore') 

1144class RestoreInstance(BaseAction): 

1145 """Restore an rds instance from a snapshot. 

1146 

1147 Note this requires the snapshot or db deletion be taken 

1148 with the `copy-restore-info` boolean flag set to true, as 

1149 various instance metadata is stored on the snapshot as tags. 

1150 

1151 additional parameters to restore db instance api call be overriden 

1152 via `restore_options` settings. various modify db instance parameters 

1153 can be specified via `modify_options` settings. 

1154 """ 

1155 

1156 schema = type_schema( 

1157 'restore', 

1158 restore_options={'type': 'object'}, 

1159 modify_options={'type': 'object'}) 

1160 

1161 permissions = ( 

1162 'rds:ModifyDBInstance', 

1163 'rds:ModifyDBParameterGroup', 

1164 'rds:ModifyOptionGroup', 

1165 'rds:RebootDBInstance', 

1166 'rds:RestoreDBInstanceFromDBSnapshot') 

1167 

1168 poll_period = 60 

1169 restore_keys = { 

1170 'VPCSecurityGroups', 'MultiAZ', 'DBSubnetGroupName', 

1171 'InstanceClass', 'StorageType', 'ParameterGroupName', 

1172 'OptionGroupName'} 

1173 

1174 def validate(self): 

1175 found = False 

1176 for f in self.manager.iter_filters(): 

1177 if isinstance(f, LatestSnapshot): 

1178 found = True 

1179 if not found: 

1180 # do we really need this... 

1181 raise PolicyValidationError( 

1182 "must filter by latest to use restore action %s" % ( 

1183 self.manager.data,)) 

1184 return self 

1185 

1186 def process(self, resources): 

1187 client = local_session(self.manager.session_factory).client('rds') 

1188 # restore up to 10 in parallel, we have to wait on each. 

1189 with self.executor_factory( 

1190 max_workers=min(10, len(resources) or 1)) as w: 

1191 futures = {} 

1192 for r in resources: 

1193 tags = {t['Key']: t['Value'] for t in r['Tags']} 

1194 if not set(tags).issuperset(self.restore_keys): 

1195 self.log.warning( 

1196 "snapshot:%s missing restore tags", 

1197 r['DBSnapshotIdentifier']) 

1198 continue 

1199 futures[w.submit(self.process_instance, client, r)] = r 

1200 for f in as_completed(futures): 

1201 r = futures[f] 

1202 if f.exception(): 

1203 self.log.warning( 

1204 "Error restoring db:%s from:%s error:\n%s", 

1205 r['DBInstanceIdentifier'], r['DBSnapshotIdentifier'], 

1206 f.exception()) 

1207 continue 

1208 

1209 def process_instance(self, client, r): 

1210 params, post_modify = self.get_restore_from_tags(r) 

1211 self.manager.retry( 

1212 client.restore_db_instance_from_db_snapshot, **params) 

1213 waiter = client.get_waiter('db_instance_available') 

1214 # wait up to 40m 

1215 waiter.config.delay = self.poll_period 

1216 waiter.wait(DBInstanceIdentifier=params['DBInstanceIdentifier']) 

1217 self.manager.retry( 

1218 client.modify_db_instance, 

1219 DBInstanceIdentifier=params['DBInstanceIdentifier'], 

1220 ApplyImmediately=True, 

1221 **post_modify) 

1222 self.manager.retry( 

1223 client.reboot_db_instance, 

1224 DBInstanceIdentifier=params['DBInstanceIdentifier'], 

1225 ForceFailover=False) 

1226 

1227 def get_restore_from_tags(self, snapshot): 

1228 params, post_modify = {}, {} 

1229 tags = {t['Key']: t['Value'] for t in snapshot['Tags']} 

1230 

1231 params['DBInstanceIdentifier'] = snapshot['DBInstanceIdentifier'] 

1232 params['DBSnapshotIdentifier'] = snapshot['DBSnapshotIdentifier'] 

1233 params['MultiAZ'] = tags['MultiAZ'] == 'True' and True or False 

1234 params['DBSubnetGroupName'] = tags['DBSubnetGroupName'] 

1235 params['DBInstanceClass'] = tags['InstanceClass'] 

1236 params['CopyTagsToSnapshot'] = True 

1237 params['StorageType'] = tags['StorageType'] 

1238 params['OptionGroupName'] = tags['OptionGroupName'] 

1239 

1240 post_modify['DBParameterGroupName'] = tags['ParameterGroupName'] 

1241 post_modify['VpcSecurityGroupIds'] = tags['VPCSecurityGroups'].split(',') 

1242 

1243 params['Tags'] = [ 

1244 {'Key': k, 'Value': v} for k, v in tags.items() 

1245 if k not in self.restore_keys] 

1246 

1247 params.update(self.data.get('restore_options', {})) 

1248 post_modify.update(self.data.get('modify_options', {})) 

1249 return params, post_modify 

1250 

1251 

1252@RDSSnapshot.filter_registry.register('cross-account') 

1253class CrossAccountAccess(CrossAccountAccessFilter): 

1254 

1255 permissions = ('rds:DescribeDBSnapshotAttributes',) 

1256 attributes_key = 'c7n:attributes' 

1257 annotation_key = 'c7n:CrossAccountViolations' 

1258 

1259 def process(self, resources, event=None): 

1260 self.accounts = self.get_accounts() 

1261 results = [] 

1262 with self.executor_factory(max_workers=2) as w: 

1263 futures = [] 

1264 for resource_set in chunks(resources, 20): 

1265 futures.append(w.submit( 

1266 self.process_resource_set, resource_set)) 

1267 for f in as_completed(futures): 

1268 if f.exception(): 

1269 self.log.error( 

1270 "Exception checking cross account access\n %s" % ( 

1271 f.exception())) 

1272 continue 

1273 results.extend(f.result()) 

1274 return results 

1275 

1276 def process_resource_set(self, resource_set): 

1277 client = local_session(self.manager.session_factory).client('rds') 

1278 results = [] 

1279 for r in resource_set: 

1280 attrs = {t['AttributeName']: t['AttributeValues'] 

1281 for t in self.manager.retry( 

1282 client.describe_db_snapshot_attributes, 

1283 DBSnapshotIdentifier=r['DBSnapshotIdentifier'])[ 

1284 'DBSnapshotAttributesResult']['DBSnapshotAttributes']} 

1285 r[self.attributes_key] = attrs 

1286 shared_accounts = set(attrs.get('restore', [])) 

1287 delta_accounts = shared_accounts.difference(self.accounts) 

1288 if delta_accounts: 

1289 r[self.annotation_key] = list(delta_accounts) 

1290 results.append(r) 

1291 return results 

1292 

1293 

1294@RDSSnapshot.action_registry.register('set-permissions') 

1295class SetPermissions(BaseAction): 

1296 """Set permissions for copying or restoring an RDS snapshot 

1297 

1298 Use the 'add' and 'remove' parameters to control which accounts to 

1299 add or remove, respectively. The default is to remove any 

1300 permissions granted to other AWS accounts. 

1301 

1302 Use `remove: matched` in combination with the `cross-account` filter 

1303 for more flexible removal options such as preserving access for 

1304 a set of whitelisted accounts: 

1305 

1306 :example: 

1307 

1308 .. code-block:: yaml 

1309 

1310 policies: 

1311 - name: rds-snapshot-remove-cross-account 

1312 resource: rds-snapshot 

1313 filters: 

1314 - type: cross-account 

1315 whitelist: 

1316 - '112233445566' 

1317 actions: 

1318 - type: set-permissions 

1319 remove: matched 

1320 """ 

1321 schema = type_schema( 

1322 'set-permissions', 

1323 remove={'oneOf': [ 

1324 {'enum': ['matched']}, 

1325 {'type': 'array', 'items': { 

1326 'oneOf': [ 

1327 {'type': 'string', 'minLength': 12, 'maxLength': 12}, 

1328 {'enum': ['all']}, 

1329 ], 

1330 }} 

1331 ]}, 

1332 add={ 

1333 'type': 'array', 'items': { 

1334 'oneOf': [ 

1335 {'type': 'string', 'minLength': 12, 'maxLength': 12}, 

1336 {'enum': ['all']}, 

1337 ] 

1338 } 

1339 } 

1340 ) 

1341 

1342 permissions = ('rds:ModifyDBSnapshotAttribute',) 

1343 

1344 def validate(self): 

1345 if self.data.get('remove') == 'matched': 

1346 found = False 

1347 for f in self.manager.iter_filters(): 

1348 if isinstance(f, CrossAccountAccessFilter): 

1349 found = True 

1350 break 

1351 if not found: 

1352 raise PolicyValidationError( 

1353 "policy:%s filter:%s with matched requires cross-account filter" % ( 

1354 self.manager.ctx.policy.name, self.type)) 

1355 

1356 def process(self, snapshots): 

1357 client = local_session(self.manager.session_factory).client('rds') 

1358 for s in snapshots: 

1359 self.process_snapshot(client, s) 

1360 

1361 def process_snapshot(self, client, snapshot): 

1362 add_accounts = self.data.get('add', []) 

1363 remove_accounts = self.data.get('remove', []) 

1364 

1365 if not (add_accounts or remove_accounts): 

1366 if CrossAccountAccess.attributes_key not in snapshot: 

1367 attrs = { 

1368 t['AttributeName']: t['AttributeValues'] 

1369 for t in self.manager.retry( 

1370 client.describe_db_snapshot_attributes, 

1371 DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'] 

1372 )['DBSnapshotAttributesResult']['DBSnapshotAttributes'] 

1373 } 

1374 snapshot[CrossAccountAccess.attributes_key] = attrs 

1375 remove_accounts = snapshot[CrossAccountAccess.attributes_key].get('restore', []) 

1376 elif remove_accounts == 'matched': 

1377 remove_accounts = snapshot.get(CrossAccountAccess.annotation_key, []) 

1378 

1379 if add_accounts or remove_accounts: 

1380 client.modify_db_snapshot_attribute( 

1381 DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'], 

1382 AttributeName='restore', 

1383 ValuesToRemove=remove_accounts, 

1384 ValuesToAdd=add_accounts) 

1385 

1386 

1387@RDSSnapshot.action_registry.register('region-copy') 

1388class RegionCopySnapshot(BaseAction): 

1389 """Copy a snapshot across regions. 

1390 

1391 Note there is a max in flight for cross region rds snapshots 

1392 of 5 per region. This action will attempt to retry automatically 

1393 for an hr. 

1394 

1395 Example:: 

1396 

1397 - name: copy-encrypted-snapshots 

1398 description: | 

1399 copy snapshots under 1 day old to dr region with kms 

1400 resource: rds-snapshot 

1401 region: us-east-1 

1402 filters: 

1403 - Status: available 

1404 - type: value 

1405 key: SnapshotCreateTime 

1406 value_type: age 

1407 value: 1 

1408 op: less-than 

1409 actions: 

1410 - type: region-copy 

1411 target_region: us-east-2 

1412 target_key: arn:aws:kms:us-east-2:0000:key/cb291f53-c9cf61 

1413 copy_tags: true 

1414 tags: 

1415 OriginRegion: us-east-1 

1416 """ 

1417 

1418 schema = type_schema( 

1419 'region-copy', 

1420 target_region={'type': 'string'}, 

1421 target_key={'type': 'string'}, 

1422 copy_tags={'type': 'boolean'}, 

1423 tags={'type': 'object'}, 

1424 required=('target_region',)) 

1425 

1426 permissions = ('rds:CopyDBSnapshot',) 

1427 min_delay = 120 

1428 max_attempts = 30 

1429 

1430 def validate(self): 

1431 if self.data.get('target_region') and self.manager.data.get('mode'): 

1432 raise PolicyValidationError( 

1433 "cross region snapshot may require waiting for " 

1434 "longer then lambda runtime allows %s" % (self.manager.data,)) 

1435 return self 

1436 

1437 def process(self, resources): 

1438 if self.data['target_region'] == self.manager.config.region: 

1439 self.log.warning( 

1440 "Source and destination region are the same, skipping copy") 

1441 return 

1442 for resource_set in chunks(resources, 20): 

1443 self.process_resource_set(resource_set) 

1444 

1445 def process_resource(self, target, key, tags, snapshot): 

1446 p = {} 

1447 if key: 

1448 p['KmsKeyId'] = key 

1449 p['TargetDBSnapshotIdentifier'] = snapshot[ 

1450 'DBSnapshotIdentifier'].replace(':', '-') 

1451 p['SourceRegion'] = self.manager.config.region 

1452 p['SourceDBSnapshotIdentifier'] = snapshot['DBSnapshotArn'] 

1453 

1454 if self.data.get('copy_tags', True): 

1455 p['CopyTags'] = True 

1456 if tags: 

1457 p['Tags'] = tags 

1458 

1459 retry = get_retry( 

1460 ('SnapshotQuotaExceeded',), 

1461 # TODO make this configurable, class defaults to 1hr 

1462 min_delay=self.min_delay, 

1463 max_attempts=self.max_attempts, 

1464 log_retries=logging.DEBUG) 

1465 try: 

1466 result = retry(target.copy_db_snapshot, **p) 

1467 except ClientError as e: 

1468 if e.response['Error']['Code'] == 'DBSnapshotAlreadyExists': 

1469 self.log.warning( 

1470 "Snapshot %s already exists in target region", 

1471 snapshot['DBSnapshotIdentifier']) 

1472 return 

1473 raise 

1474 snapshot['c7n:CopiedSnapshot'] = result[ 

1475 'DBSnapshot']['DBSnapshotArn'] 

1476 

1477 def process_resource_set(self, resource_set): 

1478 target_client = self.manager.session_factory( 

1479 region=self.data['target_region']).client('rds') 

1480 target_key = self.data.get('target_key') 

1481 tags = [{'Key': k, 'Value': v} for k, v 

1482 in self.data.get('tags', {}).items()] 

1483 

1484 for snapshot_set in chunks(resource_set, 5): 

1485 for r in snapshot_set: 

1486 # If tags are supplied, copy tags are ignored, and 

1487 # we need to augment the tag set with the original 

1488 # resource tags to preserve the common case. 

1489 rtags = tags and list(tags) or None 

1490 if tags and self.data.get('copy_tags', True): 

1491 rtags.extend(r['Tags']) 

1492 self.process_resource(target_client, target_key, rtags, r) 

1493 

1494 

1495@RDSSnapshot.action_registry.register('delete') 

1496class RDSSnapshotDelete(BaseAction): 

1497 """Deletes a RDS snapshot resource 

1498 

1499 :example: 

1500 

1501 .. code-block:: yaml 

1502 

1503 policies: 

1504 - name: rds-snapshot-delete-stale 

1505 resource: rds-snapshot 

1506 filters: 

1507 - type: age 

1508 days: 28 

1509 op: ge 

1510 actions: 

1511 - delete 

1512 """ 

1513 

1514 schema = type_schema('delete') 

1515 permissions = ('rds:DeleteDBSnapshot',) 

1516 

1517 def process(self, snapshots): 

1518 snapshots = self.filter_resources(snapshots, 'SnapshotType', ('manual',)) 

1519 if not snapshots: 

1520 return [] 

1521 log.info("Deleting %d rds snapshots", len(snapshots)) 

1522 with self.executor_factory(max_workers=3) as w: 

1523 futures = [] 

1524 for snapshot_set in chunks(reversed(snapshots), size=50): 

1525 futures.append( 

1526 w.submit(self.process_snapshot_set, snapshot_set)) 

1527 for f in as_completed(futures): 

1528 if f.exception(): 

1529 self.log.error( 

1530 "Exception deleting snapshot set \n %s", 

1531 f.exception()) 

1532 return snapshots 

1533 

1534 def process_snapshot_set(self, snapshots_set): 

1535 c = local_session(self.manager.session_factory).client('rds') 

1536 for s in snapshots_set: 

1537 c.delete_db_snapshot( 

1538 DBSnapshotIdentifier=s['DBSnapshotIdentifier']) 

1539 

1540 

1541@actions.register('modify-security-groups') 

1542class RDSModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction): 

1543 

1544 permissions = ('rds:ModifyDBInstance', 'rds:ModifyDBCluster') 

1545 vpc_expr = 'DBSubnetGroup.VpcId' 

1546 

1547 def process(self, rds_instances): 

1548 replication_group_map = {} 

1549 client = local_session(self.manager.session_factory).client('rds') 

1550 groups = super(RDSModifyVpcSecurityGroups, self).get_groups( 

1551 rds_instances) 

1552 

1553 # either build map for DB cluster or modify DB instance directly 

1554 for idx, i in enumerate(rds_instances): 

1555 if i.get('DBClusterIdentifier'): 

1556 # build map of Replication Groups to Security Groups 

1557 replication_group_map[i['DBClusterIdentifier']] = groups[idx] 

1558 else: 

1559 client.modify_db_instance( 

1560 DBInstanceIdentifier=i['DBInstanceIdentifier'], 

1561 VpcSecurityGroupIds=groups[idx]) 

1562 

1563 # handle DB cluster, if necessary 

1564 for idx, r in enumerate(replication_group_map.keys()): 

1565 client.modify_db_cluster( 

1566 DBClusterIdentifier=r, 

1567 VpcSecurityGroupIds=replication_group_map[r] 

1568 ) 

1569 

1570 

1571class DescribeSubnetGroup(DescribeSource): 

1572 

1573 def augment(self, resources): 

1574 _db_subnet_group_tags( 

1575 resources, self.manager.session_factory, 

1576 self.manager.executor_factory, self.manager.retry) 

1577 return resources 

1578 

1579 

1580@resources.register('rds-subnet-group') 

1581class RDSSubnetGroup(QueryResourceManager): 

1582 """RDS subnet group.""" 

1583 

1584 class resource_type(TypeInfo): 

1585 service = 'rds' 

1586 arn_type = 'subgrp' 

1587 id = name = 'DBSubnetGroupName' 

1588 arn_separator = ':' 

1589 enum_spec = ( 

1590 'describe_db_subnet_groups', 'DBSubnetGroups', None) 

1591 filter_name = 'DBSubnetGroupName' 

1592 filter_type = 'scalar' 

1593 permissions_enum = ('rds:DescribeDBSubnetGroups',) 

1594 cfn_type = config_type = 'AWS::RDS::DBSubnetGroup' 

1595 universal_taggable = object() 

1596 

1597 source_mapping = { 

1598 'config': ConfigSource, 

1599 'describe': DescribeSubnetGroup 

1600 } 

1601 

1602 

1603def _db_subnet_group_tags(subnet_groups, session_factory, executor_factory, retry): 

1604 client = local_session(session_factory).client('rds') 

1605 

1606 def process_tags(g): 

1607 try: 

1608 g['Tags'] = client.list_tags_for_resource( 

1609 ResourceName=g['DBSubnetGroupArn'])['TagList'] 

1610 return g 

1611 except client.exceptions.DBSubnetGroupNotFoundFault: 

1612 return None 

1613 

1614 return list(filter(None, map(process_tags, subnet_groups))) 

1615 

1616 

1617@RDSSubnetGroup.action_registry.register('delete') 

1618class RDSSubnetGroupDeleteAction(BaseAction): 

1619 """Action to delete RDS Subnet Group 

1620 

1621 It is recommended to apply a filter to the delete policy to avoid unwanted 

1622 deletion of any rds subnet groups. 

1623 

1624 :example: 

1625 

1626 .. code-block:: yaml 

1627 

1628 policies: 

1629 - name: rds-subnet-group-delete 

1630 resource: rds-subnet-group 

1631 filters: 

1632 - Instances: [] 

1633 actions: 

1634 - delete 

1635 """ 

1636 

1637 schema = type_schema('delete') 

1638 permissions = ('rds:DeleteDBSubnetGroup',) 

1639 

1640 def process(self, subnet_group): 

1641 with self.executor_factory(max_workers=2) as w: 

1642 list(w.map(self.process_subnetgroup, subnet_group)) 

1643 

1644 def process_subnetgroup(self, subnet_group): 

1645 client = local_session(self.manager.session_factory).client('rds') 

1646 client.delete_db_subnet_group(DBSubnetGroupName=subnet_group['DBSubnetGroupName']) 

1647 

1648 

1649@RDSSubnetGroup.filter_registry.register('unused') 

1650class UnusedRDSSubnetGroup(Filter): 

1651 """Filters all launch rds subnet groups that are not in use but exist 

1652 

1653 :example: 

1654 

1655 .. code-block:: yaml 

1656 

1657 policies: 

1658 - name: rds-subnet-group-delete-unused 

1659 resource: rds-subnet-group 

1660 filters: 

1661 - unused 

1662 """ 

1663 

1664 schema = type_schema('unused') 

1665 

1666 def get_permissions(self): 

1667 return self.manager.get_resource_manager('rds').get_permissions() 

1668 

1669 def process(self, configs, event=None): 

1670 rds = self.manager.get_resource_manager('rds').resources() 

1671 self.used = set(jmespath_search('[].DBSubnetGroup.DBSubnetGroupName', rds)) 

1672 self.used.update(set(jmespath_search('[].DBSubnetGroup.DBSubnetGroupName', 

1673 self.manager.get_resource_manager('rds-cluster').resources(augment=False)))) 

1674 return super(UnusedRDSSubnetGroup, self).process(configs) 

1675 

1676 def __call__(self, config): 

1677 return config['DBSubnetGroupName'] not in self.used 

1678 

1679 

1680@filters.register('db-parameter') 

1681class ParameterFilter(ValueFilter): 

1682 """ 

1683 Applies value type filter on set db parameter values. 

1684 :example: 

1685 

1686 .. code-block:: yaml 

1687 

1688 policies: 

1689 - name: rds-pg 

1690 resource: rds 

1691 filters: 

1692 - type: db-parameter 

1693 key: someparam 

1694 op: eq 

1695 value: someval 

1696 """ 

1697 

1698 schema = type_schema('db-parameter', rinherit=ValueFilter.schema) 

1699 schema_alias = False 

1700 permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters', ) 

1701 policy_annotation = 'c7n:MatchedDBParameter' 

1702 

1703 @staticmethod 

1704 def recast(val, datatype): 

1705 """ Re-cast the value based upon an AWS supplied datatype 

1706 and treat nulls sensibly. 

1707 """ 

1708 ret_val = val 

1709 if datatype == 'string': 

1710 ret_val = str(val) 

1711 elif datatype == 'boolean': 

1712 # AWS returns 1s and 0s for boolean for most of the cases 

1713 if val.isdigit(): 

1714 ret_val = bool(int(val)) 

1715 # AWS returns 'TRUE,FALSE' for Oracle engine 

1716 elif val == 'TRUE': 

1717 ret_val = True 

1718 elif val == 'FALSE': 

1719 ret_val = False 

1720 elif datatype == 'integer': 

1721 if val.isdigit(): 

1722 ret_val = int(val) 

1723 elif datatype == 'float': 

1724 ret_val = float(val) if val else 0.0 

1725 

1726 return ret_val 

1727 

1728 # Private method for 'DBParameterGroupName' paginator 

1729 def _get_param_list(self, pg): 

1730 client = local_session(self.manager.session_factory).client('rds') 

1731 paginator = client.get_paginator('describe_db_parameters') 

1732 param_list = list(itertools.chain(*[p['Parameters'] 

1733 for p in paginator.paginate(DBParameterGroupName=pg)])) 

1734 return param_list 

1735 

1736 def handle_paramgroup_cache(self, param_groups): 

1737 pgcache = {} 

1738 cache = self.manager._cache 

1739 

1740 with cache: 

1741 for pg in param_groups: 

1742 cache_key = { 

1743 'region': self.manager.config.region, 

1744 'account_id': self.manager.config.account_id, 

1745 'rds-pg': pg} 

1746 pg_values = cache.get(cache_key) 

1747 if pg_values is not None: 

1748 pgcache[pg] = pg_values 

1749 continue 

1750 param_list = self._get_param_list(pg) 

1751 pgcache[pg] = { 

1752 p['ParameterName']: self.recast(p['ParameterValue'], p['DataType']) 

1753 for p in param_list if 'ParameterValue' in p} 

1754 cache.save(cache_key, pgcache[pg]) 

1755 return pgcache 

1756 

1757 def process(self, resources, event=None): 

1758 results = [] 

1759 parameter_group_list = {db['DBParameterGroups'][0]['DBParameterGroupName'] 

1760 for db in resources} 

1761 paramcache = self.handle_paramgroup_cache(parameter_group_list) 

1762 for resource in resources: 

1763 for pg in resource['DBParameterGroups']: 

1764 pg_values = paramcache[pg['DBParameterGroupName']] 

1765 if self.match(pg_values): 

1766 resource.setdefault(self.policy_annotation, []).append( 

1767 self.data.get('key')) 

1768 results.append(resource) 

1769 break 

1770 return results 

1771 

1772 

1773@actions.register('modify-db') 

1774class ModifyDb(BaseAction): 

1775 """Modifies an RDS instance based on specified parameter 

1776 using ModifyDbInstance. 

1777 

1778 'Update' is an array with with key value pairs that should be set to 

1779 the property and value you wish to modify. 

1780 'Immediate" determines whether the modification is applied immediately 

1781 or not. If 'immediate' is not specified, default is false. 

1782 

1783 :example: 

1784 

1785 .. code-block:: yaml 

1786 

1787 policies: 

1788 - name: disable-rds-deletion-protection 

1789 resource: rds 

1790 filters: 

1791 - DeletionProtection: true 

1792 - PubliclyAccessible: true 

1793 actions: 

1794 - type: modify-db 

1795 update: 

1796 - property: 'DeletionProtection' 

1797 value: false 

1798 - property: 'PubliclyAccessible' 

1799 value: false 

1800 immediate: true 

1801 """ 

1802 

1803 schema = type_schema( 

1804 'modify-db', 

1805 immediate={"type": 'boolean'}, 

1806 update={ 

1807 'type': 'array', 

1808 'items': { 

1809 'type': 'object', 

1810 'properties': { 

1811 'property': {'type': 'string', 'enum': [ 

1812 'AllocatedStorage', 

1813 'DBInstanceClass', 

1814 'DBSubnetGroupName', 

1815 'DBSecurityGroups', 

1816 'VpcSecurityGroupIds', 

1817 'MasterUserPassword', 

1818 'DBParameterGroupName', 

1819 'BackupRetentionPeriod', 

1820 'PreferredBackupWindow', 

1821 'PreferredMaintenanceWindow', 

1822 'MultiAZ', 

1823 'EngineVersion', 

1824 'AllowMajorVersionUpgrade', 

1825 'AutoMinorVersionUpgrade', 

1826 'LicenseModel', 

1827 'Iops', 

1828 'OptionGroupName', 

1829 'NewDBInstanceIdentifier', 

1830 'StorageType', 

1831 'TdeCredentialArn', 

1832 'TdeCredentialPassword', 

1833 'CACertificateIdentifier', 

1834 'Domain', 

1835 'CopyTagsToSnapshot', 

1836 'MonitoringInterval', 

1837 'MonitoringRoleARN', 

1838 'DBPortNumber', 

1839 'PubliclyAccessible', 

1840 'DomainIAMRoleName', 

1841 'PromotionTier', 

1842 'EnableIAMDatabaseAuthentication', 

1843 'EnablePerformanceInsights', 

1844 'PerformanceInsightsKMSKeyId', 

1845 'PerformanceInsightsRetentionPeriod', 

1846 'CloudwatchLogsExportConfiguration', 

1847 'ProcessorFeatures', 

1848 'UseDefaultProcessorFeatures', 

1849 'DeletionProtection', 

1850 'MaxAllocatedStorage', 

1851 'CertificateRotationRestart']}, 

1852 'value': {} 

1853 }, 

1854 }, 

1855 }, 

1856 required=('update',)) 

1857 

1858 permissions = ('rds:ModifyDBInstance',) 

1859 conversion_map = { 

1860 'DBSubnetGroupName': 'DBSubnetGroup.DBSubnetGroupName', 

1861 'VpcSecurityGroupIds': 'VpcSecurityGroups[].VpcSecurityGroupId', 

1862 'DBParameterGroupName': 'DBParameterGroups[].DBParameterGroupName', 

1863 'OptionGroupName': 'OptionGroupMemberships[].OptionGroupName', 

1864 'NewDBInstanceIdentifier': 'DBInstanceIdentifier', 

1865 'Domain': 'DomainMemberships[].DomainName', 

1866 'DBPortNumber': 'Endpoint.Port', 

1867 'EnablePerformanceInsights': 'PerformanceInsightsEnabled', 

1868 'CloudwatchLogsExportConfiguration': 'EnabledCloudwatchLogsExports' 

1869 } 

1870 

1871 def validate(self): 

1872 if self.data.get('update'): 

1873 update_dict = dict((i['property'], i['value']) for i in self.data.get('update')) 

1874 if ('MonitoringInterval' in update_dict and update_dict['MonitoringInterval'] > 0 and 

1875 'MonitoringRoleARN' not in update_dict): 

1876 raise PolicyValidationError( 

1877 "A MonitoringRoleARN value is required \ 

1878 if you specify a MonitoringInterval value other than 0") 

1879 if ('CloudwatchLogsExportConfiguration' in update_dict 

1880 and all( 

1881 k not in update_dict.get('CloudwatchLogsExportConfiguration') 

1882 for k in ('EnableLogTypes', 'DisableLogTypes'))): 

1883 raise PolicyValidationError( 

1884 "A EnableLogTypes or DisableLogTypes input list is required\ 

1885 for setting CloudwatchLogsExportConfiguration") 

1886 return self 

1887 

1888 def process(self, resources): 

1889 c = local_session(self.manager.session_factory).client('rds') 

1890 for r in resources: 

1891 param = { 

1892 u['property']: u['value'] for u in self.data.get('update') 

1893 if r.get( 

1894 u['property'], 

1895 jmespath_search( 

1896 self.conversion_map.get(u['property'], 'None'), r)) 

1897 != u['value']} 

1898 if not param: 

1899 continue 

1900 param['ApplyImmediately'] = self.data.get('immediate', False) 

1901 param['DBInstanceIdentifier'] = r['DBInstanceIdentifier'] 

1902 try: 

1903 c.modify_db_instance(**param) 

1904 except c.exceptions.DBInstanceNotFoundFault: 

1905 raise 

1906 

1907 

1908@resources.register('rds-reserved') 

1909class ReservedRDS(QueryResourceManager): 

1910 """Lists all active rds reservations 

1911 

1912 :example: 

1913 

1914 .. code-block:: yaml 

1915 

1916 policies: 

1917 - name: existing-rds-reservations 

1918 resource: rds-reserved 

1919 filters: 

1920 - State: active 

1921 """ 

1922 

1923 class resource_type(TypeInfo): 

1924 service = 'rds' 

1925 name = id = 'ReservedDBInstanceId' 

1926 date = 'StartTime' 

1927 enum_spec = ( 

1928 'describe_reserved_db_instances', 'ReservedDBInstances', None) 

1929 filter_name = 'ReservedDBInstances' 

1930 filter_type = 'list' 

1931 arn_type = "ri" 

1932 arn = "ReservedDBInstanceArn" 

1933 permissions_enum = ('rds:DescribeReservedDBInstances',) 

1934 universal_taggable = object() 

1935 

1936 augment = universal_augment 

1937 

1938 

1939RDS.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter) 

1940 

1941 

1942@filters.register('consecutive-snapshots') 

1943class ConsecutiveSnapshots(Filter): 

1944 """Returns instances where number of consective daily snapshots is 

1945 equal to/or greater than n days. 

1946 

1947 :example: 

1948 

1949 .. code-block:: yaml 

1950 

1951 policies: 

1952 - name: rds-daily-snapshot-count 

1953 resource: rds 

1954 filters: 

1955 - type: consecutive-snapshots 

1956 days: 7 

1957 """ 

1958 schema = type_schema('consecutive-snapshots', days={'type': 'number', 'minimum': 1}, 

1959 required=['days']) 

1960 permissions = ('rds:DescribeDBSnapshots', 'rds:DescribeDBInstances') 

1961 annotation = 'c7n:DBSnapshots' 

1962 

1963 def process_resource_set(self, client, resources): 

1964 rds_instances = [r['DBInstanceIdentifier'] for r in resources] 

1965 paginator = client.get_paginator('describe_db_snapshots') 

1966 paginator.PAGE_ITERATOR_CLS = RetryPageIterator 

1967 db_snapshots = paginator.paginate(Filters=[{'Name': 'db-instance-id', 

1968 'Values': rds_instances}]).build_full_result().get('DBSnapshots', []) 

1969 

1970 inst_map = {} 

1971 for snapshot in db_snapshots: 

1972 inst_map.setdefault(snapshot['DBInstanceIdentifier'], []).append(snapshot) 

1973 for r in resources: 

1974 r[self.annotation] = inst_map.get(r['DBInstanceIdentifier'], []) 

1975 

1976 def process(self, resources, event=None): 

1977 client = local_session(self.manager.session_factory).client('rds') 

1978 results = [] 

1979 retention = self.data.get('days') 

1980 utcnow = datetime.datetime.utcnow() 

1981 expected_dates = set() 

1982 for days in range(1, retention + 1): 

1983 expected_dates.add((utcnow - timedelta(days=days)).strftime('%Y-%m-%d')) 

1984 

1985 for resource_set in chunks( 

1986 [r for r in resources if self.annotation not in r], 50): 

1987 self.process_resource_set(client, resource_set) 

1988 

1989 for r in resources: 

1990 snapshot_dates = set() 

1991 for snapshot in r[self.annotation]: 

1992 if snapshot['Status'] == 'available': 

1993 snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d')) 

1994 if expected_dates.issubset(snapshot_dates): 

1995 results.append(r) 

1996 return results 

1997 

1998 

1999@filters.register('engine') 

2000class EngineFilter(ValueFilter): 

2001 """ 

2002 Filter a rds resource based on its Engine Metadata 

2003 

2004 :example: 

2005 

2006 .. code-block:: yaml 

2007 

2008 policies: 

2009 - name: find-deprecated-versions 

2010 resource: aws.rds 

2011 filters: 

2012 - type: engine 

2013 key: Status 

2014 value: deprecated 

2015 """ 

2016 

2017 schema = type_schema('engine', rinherit=ValueFilter.schema) 

2018 

2019 permissions = ("rds:DescribeDBEngineVersions", ) 

2020 

2021 def process(self, resources, event=None): 

2022 client = local_session(self.manager.session_factory).client('rds') 

2023 

2024 engines = set() 

2025 engine_versions = set() 

2026 for r in resources: 

2027 engines.add(r['Engine']) 

2028 engine_versions.add(r['EngineVersion']) 

2029 

2030 paginator = client.get_paginator('describe_db_engine_versions') 

2031 response = paginator.paginate( 

2032 Filters=[ 

2033 {'Name': 'engine', 'Values': list(engines)}, 

2034 {'Name': 'engine-version', 'Values': list(engine_versions)} 

2035 ], 

2036 IncludeAll=True, 

2037 ) 

2038 all_versions = {} 

2039 matched = [] 

2040 for page in response: 

2041 for e in page['DBEngineVersions']: 

2042 all_versions.setdefault(e['Engine'], {}) 

2043 all_versions[e['Engine']][e['EngineVersion']] = e 

2044 for r in resources: 

2045 v = all_versions[r['Engine']][r['EngineVersion']] 

2046 if self.match(v): 

2047 r['c7n:Engine'] = v 

2048 matched.append(r) 

2049 return matched 

2050 

2051 

2052class DescribeDBProxy(DescribeSource): 

2053 def augment(self, resources): 

2054 return universal_augment(self.manager, resources) 

2055 

2056 

2057@resources.register('rds-proxy') 

2058class RDSProxy(QueryResourceManager): 

2059 """Resource Manager for RDS DB Proxies 

2060 

2061 :example: 

2062 

2063 .. code-block:: yaml 

2064 

2065 policies: 

2066 - name: rds-proxy-tls-check 

2067 resource: rds-proxy 

2068 filters: 

2069 - type: value 

2070 key: RequireTLS 

2071 value: false 

2072 """ 

2073 

2074 class resource_type(TypeInfo): 

2075 service = 'rds' 

2076 name = id = 'DBProxyName' 

2077 date = 'CreatedDate' 

2078 enum_spec = ('describe_db_proxies', 'DBProxies', None) 

2079 arn = 'DBProxyArn' 

2080 arn_type = 'db-proxy' 

2081 cfn_type = 'AWS::RDS::DBProxy' 

2082 permissions_enum = ('rds:DescribeDBProxies',) 

2083 universal_taggable = object() 

2084 

2085 source_mapping = { 

2086 'describe': DescribeDBProxy, 

2087 'config': ConfigSource 

2088 } 

2089 

2090 

2091@RDSProxy.action_registry.register('delete') 

2092class DeleteRDSProxy(BaseAction): 

2093 """ 

2094 Deletes a RDS Proxy 

2095 

2096 :example: 

2097 

2098 .. code-block:: yaml 

2099 

2100 policies: 

2101 - name: delete-rds-proxy 

2102 resource: aws.rds-proxy 

2103 filters: 

2104 - type: value 

2105 key: "DBProxyName" 

2106 op: eq 

2107 value: "proxy-test-1" 

2108 actions: 

2109 - type: delete 

2110 """ 

2111 

2112 schema = type_schema('delete') 

2113 

2114 permissions = ('rds:DeleteDBProxy',) 

2115 

2116 def process(self, resources): 

2117 client = local_session(self.manager.session_factory).client('rds') 

2118 for r in resources: 

2119 self.manager.retry( 

2120 client.delete_db_proxy, DBProxyName=r['DBProxyName'], 

2121 ignore_err_codes=('DBProxyNotFoundFault', 

2122 'InvalidDBProxyStateFault')) 

2123 

2124 

2125@RDSProxy.filter_registry.register('subnet') 

2126class RDSProxySubnetFilter(net_filters.SubnetFilter): 

2127 

2128 RelatedIdsExpression = "VpcSubnetIds[]" 

2129 

2130 

2131@RDSProxy.filter_registry.register('security-group') 

2132class RDSProxySecurityGroupFilter(net_filters.SecurityGroupFilter): 

2133 

2134 RelatedIdsExpression = "VpcSecurityGroupIds[]" 

2135 

2136 

2137@RDSProxy.filter_registry.register('vpc') 

2138class RDSProxyVpcFilter(net_filters.VpcFilter): 

2139 

2140 RelatedIdsExpression = "VpcId" 

2141 

2142 

2143@filters.register('db-option-groups') 

2144class DbOptionGroups(ValueFilter): 

2145 """This filter describes RDS option groups for associated RDS instances. 

2146 Use this filter in conjunction with jmespath and value filter operators 

2147 to filter RDS instance based on their option groups 

2148 

2149 :example: 

2150 

2151 .. code-block:: yaml 

2152 

2153 policies: 

2154 - name: rds-data-in-transit-encrypted 

2155 resource: aws.rds 

2156 filters: 

2157 - type: db-option-groups 

2158 key: Options[].OptionName 

2159 op: intersect 

2160 value: 

2161 - SSL 

2162 - NATIVE_NETWORK_ENCRYPTION 

2163 

2164 :example: 

2165 

2166 .. code-block:: yaml 

2167 

2168 policies: 

2169 - name: rds-oracle-encryption-in-transit 

2170 resource: aws.rds 

2171 filters: 

2172 - Engine: oracle-ee 

2173 - type: db-option-groups 

2174 key: Options[].OptionSettings[?Name == 'SQLNET.ENCRYPTION_SERVER'].Value[] 

2175 value: 

2176 - REQUIRED 

2177 """ 

2178 

2179 schema = type_schema('db-option-groups', rinherit=ValueFilter.schema) 

2180 schema_alias = False 

2181 permissions = ('rds:DescribeDBInstances', 'rds:DescribeOptionGroups', ) 

2182 policy_annotation = 'c7n:MatchedDBOptionGroups' 

2183 

2184 def handle_optiongroup_cache(self, client, paginator, option_groups): 

2185 ogcache = {} 

2186 cache = self.manager._cache 

2187 

2188 with cache: 

2189 for og in option_groups: 

2190 cache_key = { 

2191 'region': self.manager.config.region, 

2192 'account_id': self.manager.config.account_id, 

2193 'rds-pg': og} 

2194 og_values = cache.get(cache_key) 

2195 if og_values is not None: 

2196 ogcache[og] = og_values 

2197 continue 

2198 option_groups_list = list(itertools.chain(*[p['OptionGroupsList'] 

2199 for p in paginator.paginate(OptionGroupName=og)])) 

2200 

2201 ogcache[og] = {} 

2202 for option_group in option_groups_list: 

2203 ogcache[og] = option_group 

2204 

2205 cache.save(cache_key, ogcache[og]) 

2206 

2207 return ogcache 

2208 

2209 def process(self, resources, event=None): 

2210 results = [] 

2211 client = local_session(self.manager.session_factory).client('rds') 

2212 paginator = client.get_paginator('describe_option_groups') 

2213 option_groups = [db['OptionGroupMemberships'][0]['OptionGroupName'] 

2214 for db in resources] 

2215 optioncache = self.handle_optiongroup_cache(client, paginator, option_groups) 

2216 

2217 for resource in resources: 

2218 for og in resource['OptionGroupMemberships']: 

2219 og_values = optioncache[og['OptionGroupName']] 

2220 if self.match(og_values): 

2221 resource.setdefault(self.policy_annotation, []).append({ 

2222 k: jmespath_search(k, og_values) 

2223 for k in {'OptionGroupName', self.data.get('key')} 

2224 }) 

2225 results.append(resource) 

2226 break 

2227 

2228 return results 

2229 

2230 

2231@filters.register('pending-maintenance') 

2232class PendingMaintenance(Filter): 

2233 """Scan DB instances for those with pending maintenance 

2234 

2235 :example: 

2236 

2237 .. code-block:: yaml 

2238 

2239 policies: 

2240 - name: rds-pending-maintenance 

2241 resource: aws.rds 

2242 filters: 

2243 - pending-maintenance 

2244 - type: value 

2245 key: '"c7n:PendingMaintenance"[].PendingMaintenanceActionDetails[].Action' 

2246 op: intersect 

2247 value: 

2248 - system-update 

2249 """ 

2250 

2251 annotation_key = 'c7n:PendingMaintenance' 

2252 schema = type_schema('pending-maintenance') 

2253 permissions = ('rds:DescribePendingMaintenanceActions',) 

2254 

2255 def process(self, resources, event=None): 

2256 client = local_session(self.manager.session_factory).client('rds') 

2257 

2258 results = [] 

2259 resource_maintenances = {} 

2260 paginator = client.get_paginator('describe_pending_maintenance_actions') 

2261 for page in paginator.paginate(): 

2262 for action in page['PendingMaintenanceActions']: 

2263 resource_maintenances.setdefault(action['ResourceIdentifier'], []).append(action) 

2264 

2265 for r in resources: 

2266 pending_maintenances = resource_maintenances.get(r['DBInstanceArn'], []) 

2267 if len(pending_maintenances) > 0: 

2268 r[self.annotation_key] = pending_maintenances 

2269 results.append(r) 

2270 

2271 return results