Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/rds.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

952 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4RDS Resource Manager 

5==================== 

6 

7Example Policies 

8---------------- 

9 

10Find rds instances that are publicly available 

11 

12.. code-block:: yaml 

13 

14 policies: 

15 - name: rds-public 

16 resource: rds 

17 filters: 

18 - PubliclyAccessible: true 

19 

20Find rds instances that are not encrypted 

21 

22.. code-block:: yaml 

23 

24 policies: 

25 - name: rds-non-encrypted 

26 resource: rds 

27 filters: 

28 - type: value 

29 key: StorageEncrypted 

30 value: true 

31 op: ne 

32 

33""" 

34import functools 

35import itertools 

36import logging 

37import operator 

38import re 

39import datetime 

40 

41from datetime import timedelta 

42 

43from decimal import Decimal as D, ROUND_HALF_UP 

44 

45from c7n.vendored.distutils.version import LooseVersion 

46from botocore.exceptions import ClientError 

47from concurrent.futures import as_completed 

48 

49from c7n.actions import ( 

50 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction) 

51 

52from c7n.exceptions import PolicyValidationError 

53from c7n.filters import ( 

54 CrossAccountAccessFilter, FilterRegistry, Filter, ValueFilter, AgeFilter) 

55from c7n.filters.offhours import OffHour, OnHour 

56from c7n.filters import related 

57import c7n.filters.vpc as net_filters 

58from c7n.manager import resources 

59from c7n.query import ( 

60 QueryResourceManager, DescribeSource, ConfigSource, TypeInfo, RetryPageIterator) 

61from c7n import deprecated, tags 

62from c7n.tags import universal_augment 

63 

64from c7n.utils import ( 

65 local_session, type_schema, get_retry, chunks, snapshot_identifier, 

66 merge_dict_list, filter_empty, jmespath_search) 

67from c7n.resources.kms import ResourceKmsKeyAlias 

68from c7n.resources.securityhub import PostFinding 

69from c7n.filters.backup import ConsecutiveAwsBackupsFilter 

70 

# Module-level logger used by the helper functions and actions in this file.
log = logging.getLogger('custodian.rds')

# Registries that the RDS resource manager exposes as its filter and action
# registries; filters/actions below attach themselves via ``.register``.
filters = FilterRegistry('rds.filters')
actions = ActionRegistry('rds.actions')

75 

76 

class DescribeRDS(DescribeSource):

    def augment(self, dbs):
        """Rename each instance's ``TagList`` key to ``Tags`` and return ``dbs``.

        An instance that has no ``TagList`` key receives an empty tuple.
        """
        for instance in dbs:
            tag_list = instance.pop('TagList', ())
            instance['Tags'] = tag_list
        return dbs

83 

84 

class ConfigRDS(ConfigSource):

    def load_resource(self, item):
        """Load ``item`` via the parent source and alias ``Db*`` keys as ``DB*``.

        Every key starting with ``Db`` is copied under a name where that
        prefix is replaced by ``DB``; the original key is kept as well.
        """
        resource = super().load_resource(item)
        aliases = {
            "DB%s" % key[2:]: value
            for key, value in resource.items()
            if key.startswith('Db')
        }
        resource.update(aliases)
        return resource

93 

94 

@resources.register('rds')
class RDS(QueryResourceManager):
    """Resource manager for RDS DB instances.
    """

    class resource_type(TypeInfo):
        service = 'rds'
        arn_type = 'db'
        arn_separator = ':'
        enum_spec = ('describe_db_instances', 'DBInstances', None)
        id = 'DBInstanceIdentifier'
        config_id = 'DbiResourceId'
        name = 'Endpoint.Address'
        filter_name = 'DBInstanceIdentifier'
        filter_type = 'scalar'
        date = 'InstanceCreateTime'
        dimension = 'DBInstanceIdentifier'
        cfn_type = config_type = 'AWS::RDS::DBInstance'
        arn = 'DBInstanceArn'
        universal_taggable = True
        default_report_fields = (
            'DBInstanceIdentifier',
            'DBName',
            'Engine',
            'EngineVersion',
            'MultiAZ',
            'AllocatedStorage',
            'StorageEncrypted',
            'PubliclyAccessible',
            'InstanceCreateTime',
        )
        permissions_enum = ('rds:DescribeDBInstances',)

    filter_registry = filters
    action_registry = actions

    def resources(self, query=None):
        """Return resources, defaulting ``query`` from the policy data.

        When no explicit ``query`` is passed, the policy's ``query`` list (if
        present) is merged into a single dict; otherwise an empty dict is used.
        """
        if query is None:
            if 'query' in self.data:
                query = merge_dict_list(self.data['query'])
            else:
                query = {}
        return super().resources(query=query)

    # Maps the policy ``source`` name to the class that loads resources.
    source_mapping = {
        'describe': DescribeRDS,
        'config': ConfigRDS
    }

142 

143 

def _db_instance_eligible_for_backup(resource):
    """Return True when backups/snapshots can be managed on the instance itself.

    An instance is rejected (with a debug log line naming the reason) when it:

    * is not in the ``available`` state,
    * is a member of a DB cluster (backups are managed on the cluster),
    * is a postgres read replica, or
    * is a mysql read replica running a version before 5.6.

    A mysql replica whose ``EngineVersion`` is missing or does not look like
    ``<major>.<minor>.<rest>`` is treated as eligible, since its version
    cannot be shown to be older than 5.6.

    :param resource: DB instance description; must contain
        ``DBInstanceIdentifier``.
    :returns: ``True`` if eligible, ``False`` otherwise.
    """
    db_instance_id = resource['DBInstanceIdentifier']

    # Database instance is not in available state
    if resource.get('DBInstanceStatus', '') != 'available':
        log.debug(
            "DB instance %s is not in available state",
            db_instance_id)
        return False

    # The specified DB Instance is a member of a cluster and its
    # backup retention should not be modified directly. Instead,
    # modify the backup retention of the cluster using the
    # ModifyDbCluster API
    if resource.get('DBClusterIdentifier', ''):
        log.debug(
            "DB instance %s is a cluster member",
            db_instance_id)
        return False

    is_read_replica = bool(
        resource.get('ReadReplicaSourceDBInstanceIdentifier', ''))
    engine = resource.get('Engine', '')

    # DB Backups not supported on a read replica for engine postgres
    if is_read_replica and engine == 'postgres':
        log.debug(
            "DB instance %s is a postgres read-replica",
            db_instance_id)
        return False

    # DB Backups not supported on a read replica running a mysql
    # version before 5.6
    if is_read_replica and engine == 'mysql':
        engine_version = resource.get('EngineVersion', '')
        # Assume "<major>.<minor>.<whatever>"
        match = re.match(r'(?P<major>\d+)\.(?P<minor>\d+)\..*', engine_version)
        # Only compare when the version parsed; the previous
        # ``match and a or b`` form dereferenced ``match`` even when it was
        # None and raised AttributeError for unparseable versions.
        if match:
            version = (int(match.group('major')), int(match.group('minor')))
            if version < (5, 6):
                log.debug(
                    "DB instance %s is a version %s mysql read-replica",
                    db_instance_id,
                    engine_version)
                return False
    return True

184 

185 

def _db_instance_eligible_for_final_snapshot(resource):
    """Return True when a final snapshot can be requested on instance delete.

    A final snapshot is not possible when:

    * the instance status is ``creating`` (no snapshot can be taken yet) or a
      failure state (``failed``, ``incompatible-restore``,
      ``incompatible-network``), where deletion requires SkipFinalSnapshot;
    * the instance is a read replica (FinalDBSnapshotIdentifier can not be
      specified when deleting a replica instance);
    * the instance belongs to a DB cluster (the instance snapshot API call
      must not be used for cluster members).

    :param resource: DB instance description.
    :returns: ``True`` if a final snapshot may be taken, ``False`` otherwise.
    """
    status = resource.get('DBInstanceStatus', '')
    ineligible = (
        status in ('creating', 'failed', 'incompatible-restore', 'incompatible-network')
        or bool(resource.get('ReadReplicaSourceDBInstanceIdentifier', ''))
        or bool(resource.get('DBClusterIdentifier', False))
    )
    if ineligible:
        # The message previously contained a literal "/n" instead of a newline.
        log.debug('DB instance is not eligible for a snapshot:\n %s', resource)
    return not ineligible

209 

210 

def _get_available_engine_upgrades(client, major=False):
    """Returns all extant rds engine upgrades.

    As a nested mapping of engine type to known versions
    and their upgrades.

    Defaults to minor upgrades, but configurable to major.

    Every engine seen gets an entry, even if none of its versions has an
    upgrade target. For each version, the highest target (by
    ``LooseVersion`` ordering) is kept.

    Example::

      >>> _get_available_engine_upgrades(client)
      {
         'oracle-se2': {'12.1.0.2.v2': '12.1.0.2.v5',
                        '12.1.0.2.v3': '12.1.0.2.v5'},
         'postgres': {'9.3.1': '9.3.14',
                      '9.3.10': '9.3.14',
                      '9.3.12': '9.3.14',
                      '9.3.2': '9.3.14'}
      }
    """
    upgrades = {}
    pages = client.get_paginator('describe_db_engine_versions').paginate()
    for page in pages:
        for version in page['DBEngineVersions']:
            engine_targets = upgrades.setdefault(version['Engine'], {})
            # A missing or empty target list simply yields no iterations.
            for target in version.get('ValidUpgradeTarget', ()):
                if not major and target['IsMajorVersionUpgrade']:
                    continue
                if LooseVersion(target['EngineVersion']) > LooseVersion(
                        engine_targets.get(version['EngineVersion'], '0.0.0')):
                    engine_targets[version['EngineVersion']] = target['EngineVersion']
    return upgrades

247 

248 

# Register the shared OffHour/OnHour filters (imported from
# c7n.filters.offhours) under the names 'offhour' and 'onhour'.
filters.register('offhour', OffHour)
filters.register('onhour', OnHour)

251 

252 

@filters.register('default-vpc')
class DefaultVpc(net_filters.DefaultVpcBase):
    """ Matches if an rds database is in the default vpc

    :example:

    .. code-block:: yaml

            policies:
              - name: default-vpc-rds
                resource: rds
                filters:
                  - type: default-vpc
    """
    schema = type_schema('default-vpc')

    def __call__(self, rdb):
        # The VPC of an instance is read from its subnet group.
        vpc_id = rdb['DBSubnetGroup']['VpcId']
        return self.match(vpc_id)

271 

272 

@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):

    # Expression selecting the ids of the VPC security groups listed on a
    # DB instance description.
    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"

277 

278 

@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):

    # Expression selecting the subnet identifiers of the instance's
    # DB subnet group.
    RelatedIdsExpression = "DBSubnetGroup.Subnets[].SubnetIdentifier"

283 

284 

@filters.register('vpc')
class VpcFilter(net_filters.VpcFilter):

    # Expression selecting the VPC id recorded on the instance's
    # DB subnet group.
    RelatedIdsExpression = "DBSubnetGroup.VpcId"

289 

290 

# Register the shared NetworkLocation filter from c7n.filters.vpc as-is.
filters.register('network-location', net_filters.NetworkLocation)

292 

293 

@filters.register('kms-alias')
class KmsKeyAlias(ResourceKmsKeyAlias):

    def process(self, dbs, event=None):
        """Return the instances selected by the inherited alias matching."""
        matched = self.get_matching_aliases(dbs)
        return matched

299 

300 

@actions.register('auto-patch')
class AutoPatch(BaseAction):
    """Toggle AutoMinorUpgrade flag on RDS instance

    'window' parameter needs to be in the format 'ddd:hh:mm-ddd:hh:mm' and
    have at least 30 minutes between start & end time.
    If 'window' is not specified, AWS will assign a random maintenance window
    to each instance selected.

    :example:

    .. code-block:: yaml

            policies:
              - name: enable-rds-autopatch
                resource: rds
                filters:
                  - AutoMinorVersionUpgrade: false
                actions:
                  - type: auto-patch
                    minor: true
                    window: Mon:23:00-Tue:01:00
    """

    schema = type_schema(
        'auto-patch',
        minor={'type': 'boolean'}, window={'type': 'string'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        """Apply the configured auto-minor-upgrade setting to every instance.

        ``minor`` defaults to True; the maintenance window is only sent when
        the policy provides a non-empty ``window``.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('rds')

        modify_args = {'AutoMinorVersionUpgrade': self.data.get('minor', True)}
        window = self.data.get('window')
        if window:
            modify_args['PreferredMaintenanceWindow'] = window

        for instance in dbs:
            client.modify_db_instance(
                DBInstanceIdentifier=instance['DBInstanceIdentifier'],
                **modify_args)

342 

343 

@filters.register('upgrade-available')
class UpgradeAvailable(Filter):
    """ Scan DB instances for available engine upgrades

    This will pull DB instances & check their specific engine for any
    engine version with higher release numbers than the current one

    This will also annotate the rds instance with 'target_engine' which is
    the most recent version of the engine available

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-upgrade-available
                resource: rds
                filters:
                  - type: upgrade-available
                    major: False

    """

    schema = type_schema('upgrade-available',
                         major={'type': 'boolean'},
                         value={'type': 'boolean'})
    permissions = ('rds:DescribeDBEngineVersions',)

    def process(self, resources, event=None):
        """Return instances with an upgrade target, annotating each with it.

        Instances that have a known upgrade target are annotated under
        ``c7n-rds-engine-upgrade`` and always included. Instances without a
        target are included only when the policy sets ``value: false``.
        """
        client = local_session(self.manager.session_factory).client('rds')
        want_upgradable = self.data.get('value', True)
        include_major = self.data.get('major', False)
        upgrades = _get_available_engine_upgrades(client, major=include_major)

        matched = []
        for resource in resources:
            per_engine = upgrades.get(resource['Engine'], {})
            target = per_engine.get(resource['EngineVersion'])
            if target is not None:
                resource['c7n-rds-engine-upgrade'] = target
                matched.append(resource)
            elif want_upgradable is False:
                matched.append(resource)
        return matched

390 

391 

@actions.register('upgrade')
class UpgradeMinor(BaseAction):
    """Upgrades a RDS instance to the latest major/minor version available

    Use of the 'immediate' flag (default False) will automatically upgrade
    the RDS engine disregarding the existing maintenance window.

    :example:

    .. code-block:: yaml

            policies:
              - name: upgrade-rds-minor
                resource: rds
                actions:
                  - type: upgrade
                    major: False
                    immediate: False

    """

    schema = type_schema(
        'upgrade',
        major={'type': 'boolean'},
        immediate={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        """Request an engine upgrade for each instance that has a target.

        Instances that already have a pending ``EngineVersion`` change are
        skipped. The upgrade catalogue is fetched at most once, and only if
        some instance lacks the ``c7n-rds-engine-upgrade`` annotation.
        """
        client = local_session(self.manager.session_factory).client('rds')
        catalogue = None  # fetched lazily, on first un-annotated instance
        for db in resources:
            # Upgrade has already been scheduled
            if 'EngineVersion' in db['PendingModifiedValues']:
                continue

            if 'c7n-rds-engine-upgrade' not in db:
                if catalogue is None:
                    catalogue = _get_available_engine_upgrades(
                        client, major=self.data.get('major', False))
                per_engine = catalogue.get(db['Engine'], {})
                target = per_engine.get(db['EngineVersion'])
                if target is None:
                    log.debug(
                        "implicit filter no upgrade on %s",
                        db['DBInstanceIdentifier'])
                    continue
                db['c7n-rds-engine-upgrade'] = target

            client.modify_db_instance(
                DBInstanceIdentifier=db['DBInstanceIdentifier'],
                EngineVersion=db['c7n-rds-engine-upgrade'],
                ApplyImmediately=self.data.get('immediate', False))

442 

443 

@actions.register('tag-trim')
class TagTrim(tags.TagTrim):

    permissions = ('rds:RemoveTagsFromResource',)

    def process_tag_removal(self, client, resource, candidates):
        """Remove the ``candidates`` tag keys from the instance, by its ARN."""
        instance_arn = resource['DBInstanceArn']
        client.remove_tags_from_resource(
            ResourceName=instance_arn,
            TagKeys=candidates)

451 

452 

# Engines for which the start/stop actions below will attempt an operation.
START_STOP_ELIGIBLE_ENGINES = {
    'db2-ae',
    'db2-se',
    'mariadb',
    'mysql',
    'oracle-ee',
    'oracle-ee-cdb',
    'oracle-se',
    'oracle-se1',
    'oracle-se2',
    'oracle-se2-cdb',
    'postgres',
    'sqlserver-ee',
    'sqlserver-ex',
    'sqlserver-se',
    'sqlserver-web',
}

460 

461 

def _eligible_start_stop(db, state="available"):
    """Return True when ``db`` may be started/stopped from status ``state``.

    See conditions noted here
    https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
    Note that this doesn't really specify what happens for all the nosql
    engines that are available as rds engines.

    :param db: DB instance description.
    :param state: status the instance must currently have
        (``available`` to stop, ``stopped`` to start).
    """
    if db.get('DBInstanceStatus') != state:
        return False

    engine = db['Engine']

    if db.get('MultiAZ') and engine.startswith('sqlserver-'):
        return False

    if engine not in START_STOP_ELIGIBLE_ENGINES:
        return False

    # Neither a replica source nor a replica itself is eligible.
    if (db.get('ReadReplicaDBInstanceIdentifiers') or
            db.get('ReadReplicaSourceDBInstanceIdentifier')):
        return False

    # If the instance is part of an Aurora cluster, it can't be individually
    # stopped.
    if db.get('DBClusterIdentifier'):
        log.warning(
            "DB instance %s could not be started/stopped because it's part "
            "of an Aurora cluster.",
            db['DBInstanceIdentifier'],
        )
        return False

    # TODO is SQL Server mirror is detectable.
    return True

496 

497 

@actions.register('stop')
class Stop(BaseAction):
    """Stop an rds instance.

    https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
    """

    schema = type_schema('stop')

    permissions = ("rds:StopDBInstance",)

    def process(self, resources):
        """Stop every eligible instance; API client errors are logged only."""
        client = local_session(self.manager.session_factory).client('rds')
        for db in resources:
            if not _eligible_start_stop(db):
                continue
            try:
                client.stop_db_instance(
                    DBInstanceIdentifier=db['DBInstanceIdentifier'])
            except ClientError as err:
                log.exception(
                    "Error stopping db instance:%s err:%s",
                    db['DBInstanceIdentifier'], err)

519 

520 

@actions.register('start')
class Start(BaseAction):
    """Start an rds instance.
    """

    schema = type_schema('start')

    permissions = ("rds:StartDBInstance",)

    def process(self, resources):
        """Start every eligible stopped instance; client errors are logged only."""
        client = local_session(self.manager.session_factory).client('rds')
        for db in resources:
            # Same eligibility rules as stop, but from the 'stopped' status.
            if not _eligible_start_stop(db, state='stopped'):
                continue
            try:
                client.start_db_instance(
                    DBInstanceIdentifier=db['DBInstanceIdentifier'])
            except ClientError as err:
                log.exception(
                    "Error starting db instance:%s err:%s",
                    db['DBInstanceIdentifier'], err)

541 

542 

@actions.register('delete')
class Delete(BaseAction):
    """Deletes selected RDS instances

    This will delete RDS instances. It is recommended to apply with a filter
    to avoid deleting all RDS instances in the account.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-delete
                resource: rds
                filters:
                  - default-vpc
                actions:
                  - type: delete
                    skip-snapshot: true
    """

    schema = type_schema('delete', **{
        'skip-snapshot': {'type': 'boolean'},
        'copy-restore-info': {'type': 'boolean'}
    })

    # 'rds:ModifyDBInstance' is required because the copy-restore-info path
    # calls modify_db_instance to turn on CopyTagsToSnapshot before deleting.
    permissions = (
        'rds:DeleteDBInstance', 'rds:AddTagsToResource', 'rds:ModifyDBInstance')

    def validate(self):
        """Reject policies combining ``skip-snapshot`` and ``copy-restore-info``.

        :raises PolicyValidationError: when both options are enabled.
        :returns: ``self``.
        """
        if self.data.get('skip-snapshot', False) and self.data.get(
                'copy-restore-info'):
            raise PolicyValidationError(
                "skip-snapshot cannot be specified with copy-restore-info on %s" % (
                    self.manager.data,))
        return self

    def process(self, dbs):
        """Delete each non-cluster instance, optionally with a final snapshot.

        A final snapshot is taken unless ``skip-snapshot`` is set or the
        instance is not eligible for one. With ``copy-restore-info`` the
        instance is first tagged with its restore settings and, if needed,
        CopyTagsToSnapshot is enabled so the tags reach the snapshot.
        Instances in an invalid state for deletion are skipped; other client
        errors propagate.

        :returns: the instances considered (cluster members filtered out).
        """
        skip = self.data.get('skip-snapshot', False)
        # Can't delete an instance in an aurora cluster, use a policy on the cluster
        dbs = self.filter_resources(dbs, 'DBClusterIdentifier', (None,))
        # Concurrency feels like overkill here.
        client = local_session(self.manager.session_factory).client('rds')
        for db in dbs:
            params = dict(
                DBInstanceIdentifier=db['DBInstanceIdentifier'])
            if skip or not _db_instance_eligible_for_final_snapshot(db):
                params['SkipFinalSnapshot'] = True
            else:
                params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                    'Final', db['DBInstanceIdentifier'])
            if self.data.get('copy-restore-info', False):
                self.copy_restore_info(client, db)
                if not db['CopyTagsToSnapshot']:
                    client.modify_db_instance(
                        DBInstanceIdentifier=db['DBInstanceIdentifier'],
                        CopyTagsToSnapshot=True)
            self.log.info(
                "Deleting rds: %s snapshot: %s",
                db['DBInstanceIdentifier'],
                params.get('FinalDBSnapshotIdentifier', False))

            try:
                client.delete_db_instance(**params)
            except ClientError as e:
                if e.response['Error']['Code'] == "InvalidDBInstanceState":
                    continue
                raise

        return dbs

    def copy_restore_info(self, client, instance):
        """Tag ``instance`` with the settings recorded for a later restore.

        The tags capture security groups, option/parameter group names,
        instance class, storage type, MultiAZ flag and subnet group name.
        """
        restore_info = (
            ('VPCSecurityGroups', ''.join([
                g['VpcSecurityGroupId'] for g in instance['VpcSecurityGroups']
            ])),
            ('OptionGroupName',
             instance['OptionGroupMemberships'][0]['OptionGroupName']),
            ('ParameterGroupName',
             instance['DBParameterGroups'][0]['DBParameterGroupName']),
            ('InstanceClass', instance['DBInstanceClass']),
            ('StorageType', instance['StorageType']),
            ('MultiAZ', str(instance['MultiAZ'])),
            ('DBSubnetGroupName', instance['DBSubnetGroup']['DBSubnetGroupName']),
        )
        tags = [{'Key': key, 'Value': value} for key, value in restore_info]
        client.add_tags_to_resource(
            ResourceName=self.manager.generate_arn(
                instance['DBInstanceIdentifier']),
            Tags=tags)

642 

643 

@actions.register('set-snapshot-copy-tags')
class CopySnapshotTags(BaseAction):
    """Enables copying tags from rds instance to snapshot

    DEPRECATED - use modify-db instead with `CopyTagsToSnapshot`

    :example:

    .. code-block:: yaml

            policies:
              - name: enable-rds-snapshot-tags
                resource: rds
                filters:
                  - type: value
                    key: Engine
                    value: aurora
                    op: eq
                actions:
                  - type: set-snapshot-copy-tags
                    enable: True
    """
    deprecations = (
        deprecated.action("use modify-db instead with `CopyTagsToSnapshot`"),
    )

    schema = type_schema(
        'set-snapshot-copy-tags',
        enable={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        """Set CopyTagsToSnapshot on instances whose value differs from ``enable``.

        Every failure is logged; after all updates finish, the last failure
        seen (if any) is re-raised.

        :returns: the instances that needed the change.
        """
        last_error = None
        with self.executor_factory(max_workers=2) as w:
            client = local_session(self.manager.session_factory).client('rds')
            resources = [
                r for r in resources
                if r['CopyTagsToSnapshot'] != self.data.get('enable', True)]
            futures = {
                w.submit(self.set_snapshot_tags, client, r): r
                for r in resources}
            for done in as_completed(futures):
                if done.exception():
                    last_error = done.exception()
                    self.log.error(
                        'error updating rds:%s CopyTagsToSnapshot \n %s',
                        futures[done]['DBInstanceIdentifier'], last_error)
        if last_error:
            raise last_error
        return resources

    def set_snapshot_tags(self, client, r):
        """Apply the configured CopyTagsToSnapshot value through the manager's retry."""
        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            CopyTagsToSnapshot=self.data.get('enable', True))

699 

700 

@RDS.action_registry.register('post-finding')
class DbInstanceFinding(PostFinding):

    resource_type = 'AwsRdsDbInstance'

    def format_resource(self, r):
        """Build the finding resource payload for DB instance ``r``.

        Only truthy values of the listed fields are copied into the details;
        datetime values are converted to ISO-8601 strings. Empty entries are
        dropped from both the details and the outer payload.
        """
        fields = [
            'AssociatedRoles', 'CACertificateIdentifier', 'DBClusterIdentifier',
            'DBInstanceIdentifier', 'DBInstanceClass', 'DbInstancePort', 'DbiResourceId',
            'DBName', 'DeletionProtection', 'Endpoint', 'Engine', 'EngineVersion',
            'IAMDatabaseAuthenticationEnabled', 'InstanceCreateTime', 'KmsKeyId',
            'PubliclyAccessible', 'StorageEncrypted',
            'TdeCredentialArn', 'VpcSecurityGroups', 'MultiAz', 'EnhancedMonitoringResourceArn',
            'DbInstanceStatus', 'MasterUsername',
            'AllocatedStorage', 'PreferredBackupWindow', 'BackupRetentionPeriod',
            'DbSecurityGroups', 'DbParameterGroups',
            'AvailabilityZone', 'DbSubnetGroup', 'PreferredMaintenanceWindow',
            'PendingModifiedValues', 'LatestRestorableTime',
            'AutoMinorVersionUpgrade', 'ReadReplicaSourceDBInstanceIdentifier',
            'ReadReplicaDBInstanceIdentifiers',
            'ReadReplicaDBClusterIdentifiers', 'LicenseModel', 'Iops', 'OptionGroupMemberships',
            'CharacterSetName',
            'SecondaryAvailabilityZone', 'StatusInfos', 'StorageType', 'DomainMemberships',
            'CopyTagsToSnapshot',
            'MonitoringInterval', 'MonitoringRoleArn', 'PromotionTier', 'Timezone',
            'PerformanceInsightsEnabled',
            'PerformanceInsightsKmsKeyId', 'PerformanceInsightsRetentionPeriod',
            'EnabledCloudWatchLogsExports',
            'ProcessorFeatures', 'ListenerEndpoint', 'MaxAllocatedStorage'
        ]
        details = {}
        for field in fields:
            if not r.get(field):
                continue
            value = r[field]
            if isinstance(value, datetime.datetime):
                value = value.isoformat()
            details.setdefault(field, value)

        tag_map = {t['Key']: t['Value'] for t in r.get('Tags', [])}
        payload = {
            'Type': self.resource_type,
            'Id': r['DBInstanceArn'],
            'Region': self.manager.config.region,
            'Tags': tag_map,
            'Details': {self.resource_type: filter_empty(details)},
        }
        return filter_empty(payload)

749 

750 

@actions.register('snapshot')
class Snapshot(BaseAction):
    """Creates a manual snapshot of a RDS instance

    The optional 'snapshot-prefix' sets the prefix used when generating the
    snapshot identifier (default 'Backup').

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot
                resource: rds
                actions:
                  - snapshot
    """

    # 'snapshot-prefix' is read in process_rds_snapshot, so it has to be
    # declared here for a policy to be able to set it.
    schema = type_schema(
        'snapshot', **{'snapshot-prefix': {'type': 'string'}})
    permissions = ('rds:CreateDBSnapshot',)

    def process(self, dbs):
        """Snapshot every instance concurrently; failures are logged only.

        :returns: ``dbs`` unchanged.
        """
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_rds_snapshot, db) for db in dbs]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception creating rds snapshot  \n %s",
                        f.exception())
        return dbs

    def process_rds_snapshot(self, resource):
        """Create a manual snapshot of ``resource`` if it is backup-eligible."""
        if not _db_instance_eligible_for_backup(resource):
            return

        c = local_session(self.manager.session_factory).client('rds')
        c.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_identifier(
                self.data.get('snapshot-prefix', 'Backup'),
                resource['DBInstanceIdentifier']),
            DBInstanceIdentifier=resource['DBInstanceIdentifier'])

793 

794 

@actions.register('resize')
class ResizeInstance(BaseAction):
    """Change the allocated storage of an rds instance.

    :example:

    This will find databases using over 85% of their allocated
    storage, and resize them to have an additional 30% storage
    the resize here is async during the next maintenance.

    .. code-block:: yaml

            policies:
              - name: rds-resize-up
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: 30


    This will find databases using under 20% of their allocated
    storage, and resize them to be 30% smaller, the resize here
    is configured to be immediate.

    .. code-block:: yaml

            policies:
              - name: rds-resize-down
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: -30
                    immediate: true
    """
    schema = type_schema(
        'resize',
        percent={'type': 'number'},
        immediate={'type': 'boolean'})

    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        """Scale each instance's AllocatedStorage by the configured percent.

        The new size is ``current * (100 + percent) / 100`` rounded half-up
        to a whole number; ``immediate`` (default False) is passed through as
        ApplyImmediately.
        """
        client = local_session(self.manager.session_factory).client('rds')
        for db in resources:
            hundred = D(100)
            current_size = D(db['AllocatedStorage'])
            scale = (hundred + D(self.data['percent'])) / hundred
            target_size = scale * current_size
            client.modify_db_instance(
                DBInstanceIdentifier=db['DBInstanceIdentifier'],
                AllocatedStorage=int(target_size.quantize(D('0'), ROUND_HALF_UP)),
                ApplyImmediately=self.data.get('immediate', False))

861 

862 

@actions.register('retention')
class RetentionWindow(BaseAction):
    """
    Sets the 'BackupRetentionPeriod' value for automated snapshots,
    enforce (min, max, exact) sets retention days accordingly.

    - ``min`` (default): retention becomes the larger of current and ``days``
    - ``max``: retention becomes the smaller of current and ``days``
    - ``exact``: retention becomes ``days``

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-retention
                resource: rds
                filters:
                  - type: value
                    key: BackupRetentionPeriod
                    value: 7
                    op: lt
                actions:
                  - type: retention
                    days: 7
                    copy-tags: true
                    enforce: exact
    """

    date_attribute = "BackupRetentionPeriod"
    schema = type_schema(
        'retention', **{'days': {'type': 'number'},
                        'copy-tags': {'type': 'boolean'},
                        'enforce': {'type': 'string', 'enum': [
                            'min', 'max', 'exact']}})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        """Apply the retention policy to every instance concurrently.

        Failures are logged, not raised.

        :returns: ``dbs`` unchanged.
        """
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for db in dbs:
                futures.append(w.submit(
                    self.process_snapshot_retention,
                    db))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting rds retention  \n %s",
                        f.exception())
        return dbs

    def process_snapshot_retention(self, resource):
        """Set retention and copy-tags on one instance per the enforce mode.

        The enforce mode alone decides the retention value. Previously a
        difference in the copy-tags setting caused the ``min`` branch to run
        regardless of the configured mode, so ``max``/``exact`` policies could
        end up with ``max(current, days)``.

        :returns: ``resource`` when a modification was requested, otherwise
            ``None`` (instance not backup-eligible or unknown enforce mode).
        """
        current_retention = int(resource.get('BackupRetentionPeriod', 0))
        new_retention = self.data['days']
        new_copy_tags = self.data.get('copy-tags', True)
        retention_type = self.data.get('enforce', 'min').lower()

        if retention_type == 'min':
            target_retention = max(current_retention, new_retention)
        elif retention_type == 'max':
            target_retention = min(current_retention, new_retention)
        elif retention_type == 'exact':
            target_retention = new_retention
        else:
            # Not reachable through the schema enum; change nothing.
            return None

        if not _db_instance_eligible_for_backup(resource):
            return None

        self.set_retention_window(resource, target_retention, new_copy_tags)
        return resource

    def set_retention_window(self, resource, retention, copy_tags):
        """Send BackupRetentionPeriod and CopyTagsToSnapshot for ``resource``."""
        c = local_session(self.manager.session_factory).client('rds')
        c.modify_db_instance(
            DBInstanceIdentifier=resource['DBInstanceIdentifier'],
            BackupRetentionPeriod=retention,
            CopyTagsToSnapshot=copy_tags)

946 

947 

@actions.register('set-public-access')
class RDSSetPublicAvailability(BaseAction):
    """
    This action allows for toggling an RDS instance
    'PubliclyAccessible' flag to true or false

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-rds-public-accessibility
                resource: rds
                filters:
                  - PubliclyAccessible: true
                actions:
                  - type: set-public-access
                    state: false
    """

    schema = type_schema(
        "set-public-access",
        state={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def set_accessibility(self, r):
        """Set PubliclyAccessible on ``r`` to the policy ``state`` (default False)."""
        session = local_session(self.manager.session_factory)
        rds_client = session.client('rds')
        rds_client.modify_db_instance(
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            PubliclyAccessible=self.data.get('state', False))

    def process(self, rds):
        """Update all instances concurrently; failures are logged only.

        :returns: ``rds`` unchanged.
        """
        with self.executor_factory(max_workers=2) as w:
            futures = {}
            for instance in rds:
                futures[w.submit(self.set_accessibility, instance)] = instance
            for done in as_completed(futures):
                if done.exception():
                    self.log.error(
                        "Exception setting public access on %s  \n %s",
                        futures[done]['DBInstanceIdentifier'], done.exception())
        return rds

988 

989 

# Resource manager for RDS event subscriptions, registered as
# 'rds-subscription'.
@resources.register('rds-subscription')
class RDSSubscription(QueryResourceManager):

    class resource_type(TypeInfo):
        service = 'rds'
        arn_type = 'es'
        cfn_type = 'AWS::RDS::EventSubscription'
        # Enumerated through describe_event_subscriptions, reading the
        # 'EventSubscriptionsList' key of the response.
        enum_spec = (
            'describe_event_subscriptions', 'EventSubscriptionsList', None)
        # The subscription id doubles as the resource name.
        name = id = "CustSubscriptionId"
        arn = 'EventSubscriptionArn'
        date = "SubscriptionCreateTime"
        permissions_enum = ('rds:DescribeEventSubscriptions',)
        # NOTE(review): a bare object() rather than True (as used by the rds
        # resource type in this file); presumably a truthy marker that is
        # distinguished from True elsewhere -- confirm against c7n.query/tags.
        universal_taggable = object()

    # Resource augmentation is delegated to c7n.tags.universal_augment.
    augment = universal_augment

1006 

1007 

@RDSSubscription.filter_registry.register('topic')
class RDSSubscriptionSNSTopic(related.RelatedResourceFilter):
    """
    Filter RDS event subscriptions by attributes of their SNS topic.

    Matched subscriptions are annotated with the related topic under
    ``c7n:SnsTopic``.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subscriptions-no-confirmed-topics
                resource: aws.rds-subscription
                filters:
                  - type: topic
                    key: SubscriptionsConfirmed
                    value: 0
                    value_type: integer
    """
    schema = type_schema('topic', rinherit=ValueFilter.schema)
    RelatedResource = 'c7n.resources.sns.SNS'
    RelatedIdsExpression = 'SnsTopicArn'
    annotation_key = 'c7n:SnsTopic'

    def process(self, resources, event=None):
        """Return the subscriptions whose related topic satisfies the filter."""
        topics = self.get_related(resources)
        selected = []
        for subscription in resources:
            if not self.process_resource(subscription, topics):
                continue
            # Attach the full topic details; this is None when the topic is
            # not among the related resources (e.g. value is "absent").
            topic_arn = subscription[self.RelatedIdsExpression]
            subscription[self.annotation_key] = topics.get(topic_arn, None)
            selected.append(subscription)
        return selected

1043 

1044 

@RDSSubscription.action_registry.register('delete')
class RDSSubscriptionDelete(BaseAction):
    """Deletes an RDS event subscription

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subscription-delete
                resource: rds-subscription
                filters:
                  - type: value
                    key: CustSubscriptionId
                    value: xyz
                actions:
                  - delete
    """

    permissions = ('rds:DeleteEventSubscription',)
    schema = type_schema('delete')

    def process(self, resources):
        """Delete each subscription in ``resources`` through the retry helper."""
        rds_client = local_session(self.manager.session_factory).client('rds')
        # Error codes handed to the retry helper as ones to ignore.
        tolerated = (
            'SubscriptionNotFoundFault',
            'InvalidEventSubscriptionStateFault')
        for subscription in resources:
            self.manager.retry(
                rds_client.delete_event_subscription,
                SubscriptionName=subscription['CustSubscriptionId'],
                ignore_err_codes=tolerated)

1074 

1075 

class DescribeRDSSnapshot(DescribeSource):
    """Describe source for RDS snapshots."""

    def get_resources(self, ids, cache=True):
        """Fetch snapshots one identifier at a time and return them as a list.

        The ``cache`` argument is accepted but not forwarded to the parent
        lookup.
        """
        fetch_one = super().get_resources
        collected = []
        for snapshot_id in ids:
            collected.extend(fetch_one((snapshot_id,)))
        return collected

    def augment(self, snaps):
        """Move each snapshot's ``TagList`` to ``Tags`` (``()`` when absent)."""
        for snapshot in snaps:
            tag_list = snapshot.pop('TagList', ())
            snapshot['Tags'] = tag_list
        return snaps

1086 

1087 

@resources.register('rds-snapshot')
class RDSSnapshot(QueryResourceManager):
    """Resource manager for RDS DB snapshots.
    """

    class resource_type(TypeInfo):
        # Service / API enumeration.
        service = 'rds'
        enum_spec = ('describe_db_snapshots', 'DBSnapshots', None)
        permissions_enum = ('rds:DescribeDBSnapshots',)

        # Identity: the snapshot identifier doubles as the name.
        id = 'DBSnapshotIdentifier'
        name = 'DBSnapshotIdentifier'
        date = 'SnapshotCreateTime'

        # Lookup of individual snapshots by identifier.
        filter_name = "DBSnapshotIdentifier"
        filter_type = "scalar"

        # ARN, config and tagging metadata.
        arn_type = 'snapshot'
        arn_separator = ':'
        config_type = "AWS::RDS::DBSnapshot"
        universal_taggable = True

    source_mapping = {
        'config': ConfigSource,
        'describe': DescribeRDSSnapshot,
    }

1110 

1111 

@RDSSnapshot.filter_registry.register('onhour')
class RDSSnapshotOnHour(OnHour):
    """``onhour`` schedule filter registered for RDS snapshots."""

1115 

1116 

@RDSSnapshot.filter_registry.register('instance')
class SnapshotInstance(related.RelatedResourceFilter):
    """Filter snapshots by attributes of the database they were taken from.

    :example:

    Find snapshots without an extant database

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-orphan
                resource: aws.rds-snapshot
                filters:
                  - type: instance
                    value: 0
                    value_type: resource_count
    """
    schema = type_schema('instance', rinherit=ValueFilter.schema)

    # Snapshots are related to RDS instances through their instance id.
    RelatedResource = "c7n.resources.rds.RDS"
    RelatedIdsExpression = "DBInstanceIdentifier"
    FetchThreshold = 5

1142 

1143 

@RDSSnapshot.filter_registry.register('latest')
class LatestSnapshot(Filter):
    """Return the latest snapshot for each database.

    Set ``automatic: false`` to consider only snapshots whose
    ``SnapshotType`` is ``manual``.
    """
    schema = type_schema('latest', automatic={'type': 'boolean'})
    permissions = ('rds:DescribeDBSnapshots',)

    def process(self, resources, event=None):
        """Pick, per ``DBInstanceIdentifier``, the newest snapshot.

        :param resources: snapshot dicts, in any order.
        :param event: unused.
        :returns: one snapshot per database, ordered by first appearance
            of the database in ``resources``.
        """
        if not self.data.get('automatic', True):
            resources = [r for r in resources if r['SnapshotType'] == 'manual']

        # Group with a dict rather than itertools.groupby: groupby only
        # merges *adjacent* items, so unsorted input would produce several
        # "latest" snapshots for the same database.
        created = operator.itemgetter('SnapshotCreateTime')
        latest = {}
        for snapshot in resources:
            db_identifier = snapshot['DBInstanceIdentifier']
            current = latest.get(db_identifier)
            # ``>=`` keeps the later item on ties, matching a stable sort
            # followed by taking the last element.
            if current is None or created(snapshot) >= created(current):
                latest[db_identifier] = snapshot
        return list(latest.values())

1161 

1162 

@RDSSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filter RDS snapshots by their age in days

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-expired
                resource: rds-snapshot
                filters:
                  - type: age
                    days: 28
                    op: ge
                actions:
                  - delete
    """

    date_attribute = 'SnapshotCreateTime'

    schema = type_schema(
        'age',
        days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    def get_resource_date(self, i):
        """Return the snapshot's creation time, or None when it is missing."""
        creation_time = i.get('SnapshotCreateTime')
        return creation_time

1190 

1191 

@RDSSnapshot.action_registry.register('restore')
class RestoreInstance(BaseAction):
    """Restore an rds instance from a snapshot.

    Note this requires the snapshot or db deletion be taken
    with the `copy-restore-info` boolean flag set to true, as
    various instance metadata is stored on the snapshot as tags.

    Additional parameters to the restore db instance api call can be
    overridden via `restore_options` settings. Various modify db instance
    parameters can be specified via `modify_options` settings.
    """

    schema = type_schema(
        'restore',
        restore_options={'type': 'object'},
        modify_options={'type': 'object'})

    permissions = (
        'rds:ModifyDBInstance',
        'rds:ModifyDBParameterGroup',
        'rds:ModifyOptionGroup',
        'rds:RebootDBInstance',
        'rds:RestoreDBInstanceFromDBSnapshot')

    # Seconds between waiter polls while the restored instance comes up.
    poll_period = 60

    # Tag keys that must be present on a snapshot for it to be restorable.
    restore_keys = {
        'VPCSecurityGroups', 'MultiAZ', 'DBSubnetGroupName',
        'InstanceClass', 'StorageType', 'ParameterGroupName',
        'OptionGroupName'}

    def validate(self):
        """Require that the policy also uses the ``latest`` snapshot filter."""
        uses_latest = any(
            isinstance(f, LatestSnapshot)
            for f in self.manager.iter_filters())
        if not uses_latest:
            # do we really need this...
            raise PolicyValidationError(
                "must filter by latest to use restore action %s" % (
                    self.manager.data,))
        return self

    def process(self, resources):
        """Restore every snapshot that carries the required restore tags."""
        client = local_session(self.manager.session_factory).client('rds')
        # restore up to 10 in parallel, we have to wait on each.
        worker_count = min(10, len(resources) or 1)
        with self.executor_factory(max_workers=worker_count) as pool:
            pending = {}
            for snapshot in resources:
                snapshot_tags = {t['Key']: t['Value'] for t in snapshot['Tags']}
                if not self.restore_keys.issubset(snapshot_tags):
                    self.log.warning(
                        "snapshot:%s missing restore tags",
                        snapshot['DBSnapshotIdentifier'])
                    continue
                job = pool.submit(self.process_instance, client, snapshot)
                pending[job] = snapshot
            for finished in as_completed(pending):
                error = finished.exception()
                if not error:
                    continue
                snapshot = pending[finished]
                self.log.warning(
                    "Error restoring db:%s from:%s error:\n%s",
                    snapshot['DBInstanceIdentifier'],
                    snapshot['DBSnapshotIdentifier'],
                    error)

    def process_instance(self, client, r):
        """Restore one snapshot, wait for the instance, then modify and reboot it."""
        params, post_modify = self.get_restore_from_tags(r)
        db_id = params['DBInstanceIdentifier']

        self.manager.retry(
            client.restore_db_instance_from_db_snapshot, **params)

        # Poll every ``poll_period`` seconds until the instance is available
        # (the original note here says this waits up to 40m).
        waiter = client.get_waiter('db_instance_available')
        waiter.config.delay = self.poll_period
        waiter.wait(DBInstanceIdentifier=db_id)

        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=db_id,
            ApplyImmediately=True,
            **post_modify)
        self.manager.retry(
            client.reboot_db_instance,
            DBInstanceIdentifier=db_id,
            ForceFailover=False)

    def get_restore_from_tags(self, snapshot):
        """Build the restore and post-restore modify parameters from tags.

        Returns a ``(params, post_modify)`` tuple; policy supplied
        ``restore_options`` / ``modify_options`` override the tag derived
        values.
        """
        tags = {t['Key']: t['Value'] for t in snapshot['Tags']}

        params = {
            'DBInstanceIdentifier': snapshot['DBInstanceIdentifier'],
            'DBSnapshotIdentifier': snapshot['DBSnapshotIdentifier'],
            'MultiAZ': tags['MultiAZ'] == 'True',
            'DBSubnetGroupName': tags['DBSubnetGroupName'],
            'DBInstanceClass': tags['InstanceClass'],
            'CopyTagsToSnapshot': True,
            'StorageType': tags['StorageType'],
            'OptionGroupName': tags['OptionGroupName'],
        }
        post_modify = {
            'DBParameterGroupName': tags['ParameterGroupName'],
            'VpcSecurityGroupIds': tags['VPCSecurityGroups'].split(','),
        }

        # Carry over every tag that is not restore metadata.
        params['Tags'] = [
            {'Key': key, 'Value': value}
            for key, value in tags.items()
            if key not in self.restore_keys]

        params.update(self.data.get('restore_options', {}))
        post_modify.update(self.data.get('modify_options', {}))
        return params, post_modify

1299 

1300 

@RDSSnapshot.filter_registry.register('cross-account')
class CrossAccountAccess(CrossAccountAccessFilter):
    """Match snapshots whose ``restore`` attribute lists accounts outside
    the set returned by ``get_accounts()``."""

    permissions = ('rds:DescribeDBSnapshotAttributes',)
    attributes_key = 'c7n:attributes'
    annotation_key = 'c7n:CrossAccountViolations'

    def process(self, resources, event=None):
        """Check snapshots in batches of 20 and return the violating ones."""
        self.accounts = self.get_accounts()
        self.everyone_only = self.data.get("everyone_only", False)
        matched = []
        with self.executor_factory(max_workers=2) as pool:
            pending = [
                pool.submit(self.process_resource_set, batch)
                for batch in chunks(resources, 20)]
            for finished in as_completed(pending):
                error = finished.exception()
                if error:
                    self.log.error(
                        "Exception checking cross account access\n %s" % (
                            error))
                    continue
                matched.extend(finished.result())
        return matched

    def process_resource_set(self, resource_set):
        """Annotate each snapshot with its attributes; return the violators."""
        client = local_session(self.manager.session_factory).client('rds')
        violators = []
        for snapshot in resource_set:
            response = self.manager.retry(
                client.describe_db_snapshot_attributes,
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'])
            attribute_list = response[
                'DBSnapshotAttributesResult']['DBSnapshotAttributes']
            attrs = {}
            for attribute in attribute_list:
                attrs[attribute['AttributeName']] = attribute['AttributeValues']
            snapshot[self.attributes_key] = attrs

            shared_with = set(attrs.get('restore', []))
            if self.everyone_only:
                # Only the public ("all") grant is of interest.
                shared_with = shared_with & {'all'}
            unexpected = shared_with.difference(self.accounts)
            if not unexpected:
                continue
            snapshot[self.annotation_key] = list(unexpected)
            violators.append(snapshot)
        return violators

1344 

1345 

@RDSSnapshot.action_registry.register('set-permissions')
class SetPermissions(BaseAction):
    """Set permissions for copying or restoring an RDS snapshot

    Use the 'add' and 'remove' parameters to control which accounts to
    add or remove, respectively.  The default is to remove any
    permissions granted to other AWS accounts.

    Use `remove: matched` in combination with the `cross-account` filter
    for more flexible removal options such as preserving access for
    a set of whitelisted accounts:

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-remove-cross-account
                resource: rds-snapshot
                filters:
                  - type: cross-account
                    whitelist:
                      - '112233445566'
                actions:
                  - type: set-permissions
                    remove: matched
    """
    schema = type_schema(
        'set-permissions',
        remove={'oneOf': [
            {'enum': ['matched']},
            {'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ],
            }}
        ]},
        add={
            'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ]
            }
        }
    )

    permissions = ('rds:ModifyDBSnapshotAttribute',)

    def validate(self):
        """Ensure ``remove: matched`` is paired with a cross-account filter.

        :returns: ``self``, like the other ``validate`` implementations in
            this module.
        :raises PolicyValidationError: when ``remove`` is ``matched`` and the
            policy has no cross-account filter.
        """
        if self.data.get('remove') == 'matched':
            found = any(
                isinstance(f, CrossAccountAccessFilter)
                for f in self.manager.iter_filters())
            if not found:
                raise PolicyValidationError(
                    "policy:%s filter:%s with matched requires cross-account filter" % (
                        self.manager.ctx.policy.name, self.type))
        return self

    def process(self, snapshots):
        """Apply the configured permission changes to every snapshot."""
        client = local_session(self.manager.session_factory).client('rds')
        for s in snapshots:
            self.process_snapshot(client, s)

    def process_snapshot(self, client, snapshot):
        """Add and/or remove ``restore`` grants on a single snapshot.

        With neither ``add`` nor ``remove`` configured, every account
        currently listed in the snapshot's ``restore`` attribute is removed.
        With ``remove: matched`` the accounts annotated by the cross-account
        filter are removed.
        """
        add_accounts = self.data.get('add', [])
        remove_accounts = self.data.get('remove', [])

        if not (add_accounts or remove_accounts):
            # Reuse attributes fetched by the cross-account filter if present.
            if CrossAccountAccess.attributes_key not in snapshot:
                attrs = {
                    t['AttributeName']: t['AttributeValues']
                    for t in self.manager.retry(
                        client.describe_db_snapshot_attributes,
                        DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier']
                    )['DBSnapshotAttributesResult']['DBSnapshotAttributes']
                }
                snapshot[CrossAccountAccess.attributes_key] = attrs
            remove_accounts = snapshot[CrossAccountAccess.attributes_key].get('restore', [])
        elif remove_accounts == 'matched':
            remove_accounts = snapshot.get(CrossAccountAccess.annotation_key, [])

        # Skip the API call entirely when there is nothing to change.
        if add_accounts or remove_accounts:
            client.modify_db_snapshot_attribute(
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'],
                AttributeName='restore',
                ValuesToRemove=remove_accounts,
                ValuesToAdd=add_accounts)

1437 

1438 

@RDSSnapshot.action_registry.register('region-copy')
class RegionCopySnapshot(BaseAction):
    """Copy a snapshot across regions.

    Note there is a max in flight for cross region rds snapshots
    of 5 per region. This action will attempt to retry automatically
    for an hour.

    Example::

      - name: copy-encrypted-snapshots
        description: |
          copy snapshots under 1 day old to dr region with kms
        resource: rds-snapshot
        region: us-east-1
        filters:
         - Status: available
         - type: value
           key: SnapshotCreateTime
           value_type: age
           value: 1
           op: less-than
        actions:
          - type: region-copy
            target_region: us-east-2
            target_key: arn:aws:kms:us-east-2:0000:key/cb291f53-c9cf61
            copy_tags: true
            tags:
              OriginRegion: us-east-1
    """

    schema = type_schema(
        'region-copy',
        target_region={'type': 'string'},
        target_key={'type': 'string'},
        copy_tags={'type': 'boolean'},
        tags={'type': 'object'},
        required=('target_region',))

    permissions = ('rds:CopyDBSnapshot',)

    # Retry settings used when the copy quota is exceeded.
    min_delay = 120
    max_attempts = 30

    def validate(self):
        """Reject use of this action in a policy that has a ``mode``."""
        has_target = self.data.get('target_region')
        if has_target and self.manager.data.get('mode'):
            raise PolicyValidationError(
                "cross region snapshot may require waiting for "
                "longer then lambda runtime allows %s" % (self.manager.data,))
        return self

    def process(self, resources):
        """Copy ``resources`` to the target region in batches of 20."""
        same_region = self.data['target_region'] == self.manager.config.region
        if same_region:
            self.log.warning(
                "Source and destination region are the same, skipping copy")
            return
        for batch in chunks(resources, 20):
            self.process_resource_set(batch)

    def process_resource(self, target, key, tags, snapshot):
        """Copy one snapshot using the target region client ``target``.

        On success the new snapshot's ARN is recorded on ``snapshot`` under
        ``c7n:CopiedSnapshot``. An already existing target snapshot is logged
        and skipped.
        """
        request = {'KmsKeyId': key} if key else {}
        request['TargetDBSnapshotIdentifier'] = snapshot[
            'DBSnapshotIdentifier'].replace(':', '-')
        request['SourceRegion'] = self.manager.config.region
        request['SourceDBSnapshotIdentifier'] = snapshot['DBSnapshotArn']

        if self.data.get('copy_tags', True):
            request['CopyTags'] = True
        if tags:
            request['Tags'] = tags

        # TODO make this configurable, class defaults to 1hr
        retry = get_retry(
            ('SnapshotQuotaExceeded',),
            min_delay=self.min_delay,
            max_attempts=self.max_attempts,
            log_retries=logging.DEBUG)
        try:
            result = retry(target.copy_db_snapshot, **request)
        except ClientError as e:
            if e.response['Error']['Code'] != 'DBSnapshotAlreadyExists':
                raise
            self.log.warning(
                "Snapshot %s already exists in target region",
                snapshot['DBSnapshotIdentifier'])
            return
        snapshot['c7n:CopiedSnapshot'] = result['DBSnapshot']['DBSnapshotArn']

    def process_resource_set(self, resource_set):
        """Copy every snapshot of ``resource_set``, five at a time."""
        target_client = self.manager.session_factory(
            region=self.data['target_region']).client('rds')
        target_key = self.data.get('target_key')
        configured_tags = [
            {'Key': name, 'Value': value}
            for name, value in self.data.get('tags', {}).items()]
        merge_original_tags = bool(
            configured_tags and self.data.get('copy_tags', True))

        for snapshot_set in chunks(resource_set, 5):
            for snapshot in snapshot_set:
                # If tags are supplied, copy tags are ignored, and
                # we need to augment the tag set with the original
                # resource tags to preserve the common case.
                snapshot_tags = list(configured_tags) if configured_tags else None
                if merge_original_tags:
                    snapshot_tags.extend(snapshot['Tags'])
                self.process_resource(
                    target_client, target_key, snapshot_tags, snapshot)

1545 

1546 

@RDSSnapshot.action_registry.register('delete')
class RDSSnapshotDelete(BaseAction):
    """Deletes an RDS snapshot resource

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-delete-stale
                resource: rds-snapshot
                filters:
                  - type: age
                    days: 28
                    op: ge
                actions:
                  - delete
    """

    permissions = ('rds:DeleteDBSnapshot',)
    schema = type_schema('delete')

    def process(self, snapshots):
        """Delete snapshots in batches of 50 and return the ones submitted.

        The input is first narrowed through ``filter_resources`` on
        ``SnapshotType`` with the allowed value ``manual``.
        """
        snapshots = self.filter_resources(snapshots, 'SnapshotType', ('manual',))
        if not snapshots:
            return []
        log.info("Deleting %d rds snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as pool:
            pending = [
                pool.submit(self.process_snapshot_set, batch)
                for batch in chunks(reversed(snapshots), size=50)]
            for finished in as_completed(pending):
                error = finished.exception()
                if error:
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        error)
        return snapshots

    def process_snapshot_set(self, snapshots_set):
        """Delete every snapshot of one batch, sequentially."""
        rds_client = local_session(self.manager.session_factory).client('rds')
        for snapshot in snapshots_set:
            rds_client.delete_db_snapshot(
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'])

1591 

1592 

@actions.register('modify-security-groups')
class RDSModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify the VPC security groups of RDS instances.

    Instances that belong to a DB cluster are changed through the cluster;
    all others are modified directly.
    """

    permissions = ('rds:ModifyDBInstance', 'rds:ModifyDBCluster')
    vpc_expr = 'DBSubnetGroup.VpcId'

    def process(self, rds_instances):
        """Apply the resolved security groups to each instance or its cluster."""
        client = local_session(self.manager.session_factory).client('rds')
        groups = super().get_groups(rds_instances)

        # Cluster id -> security groups; a later member overrides an earlier
        # one of the same cluster.
        cluster_groups = {}
        for position, instance in enumerate(rds_instances):
            cluster_id = instance.get('DBClusterIdentifier')
            if cluster_id:
                cluster_groups[instance['DBClusterIdentifier']] = groups[position]
                continue
            client.modify_db_instance(
                DBInstanceIdentifier=instance['DBInstanceIdentifier'],
                VpcSecurityGroupIds=groups[position])

        # handle DB cluster, if necessary
        for cluster_id, security_groups in cluster_groups.items():
            client.modify_db_cluster(
                DBClusterIdentifier=cluster_id,
                VpcSecurityGroupIds=security_groups)

1621 

1622 

class DescribeSubnetGroup(DescribeSource):
    """Describe source for RDS subnet groups that attaches their tags."""

    def augment(self, resources):
        """Attach ``Tags`` to each subnet group.

        Returns the list produced by ``_db_subnet_group_tags`` so that groups
        which raised ``DBSubnetGroupNotFoundFault`` while their tags were
        being fetched are dropped, rather than being returned without tags.
        """
        return _db_subnet_group_tags(
            resources, self.manager.session_factory,
            self.manager.executor_factory, self.manager.retry)

1630 

1631 

@resources.register('rds-subnet-group')
class RDSSubnetGroup(QueryResourceManager):
    """Resource manager for RDS DB subnet groups."""

    class resource_type(TypeInfo):
        # Service / API enumeration.
        service = 'rds'
        enum_spec = (
            'describe_db_subnet_groups', 'DBSubnetGroups', None)
        permissions_enum = ('rds:DescribeDBSubnetGroups',)

        # Identity: the subnet group name doubles as the id.
        id = 'DBSubnetGroupName'
        name = 'DBSubnetGroupName'

        # Lookup of individual groups by name.
        filter_name = 'DBSubnetGroupName'
        filter_type = 'scalar'

        # ARN, CloudFormation/config and tagging metadata.
        arn_type = 'subgrp'
        arn_separator = ':'
        cfn_type = 'AWS::RDS::DBSubnetGroup'
        config_type = 'AWS::RDS::DBSubnetGroup'
        universal_taggable = object()

    source_mapping = {
        'describe': DescribeSubnetGroup,
        'config': ConfigSource,
    }

1653 

1654 

def _db_subnet_group_tags(subnet_groups, session_factory, executor_factory, retry):
    """Fetch tags for each subnet group and store them under ``Tags``.

    Groups whose tag lookup raises ``DBSubnetGroupNotFoundFault`` are left
    out of the returned list. ``executor_factory`` and ``retry`` are accepted
    but not used here.
    """
    rds_client = local_session(session_factory).client('rds')
    not_found = rds_client.exceptions.DBSubnetGroupNotFoundFault

    tagged = []
    for group in subnet_groups:
        try:
            response = rds_client.list_tags_for_resource(
                ResourceName=group['DBSubnetGroupArn'])
            group['Tags'] = response['TagList']
        except not_found:
            continue
        tagged.append(group)
    return tagged

1667 

1668 

@RDSSubnetGroup.action_registry.register('delete')
class RDSSubnetGroupDeleteAction(BaseAction):
    """Action to delete RDS Subnet Group

    It is recommended to apply a filter to the delete policy to avoid unwanted
    deletion of any rds subnet groups.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subnet-group-delete
                resource: rds-subnet-group
                filters:
                  - Instances: []
                actions:
                  - delete
    """

    permissions = ('rds:DeleteDBSubnetGroup',)
    schema = type_schema('delete')

    def process(self, subnet_group):
        """Delete every subnet group in ``subnet_group`` using two workers."""
        with self.executor_factory(max_workers=2) as pool:
            # Drain the iterator so that worker exceptions surface here.
            for _ in pool.map(self.process_subnetgroup, subnet_group):
                pass

    def process_subnetgroup(self, subnet_group):
        """Delete a single subnet group by name."""
        rds_client = local_session(self.manager.session_factory).client('rds')
        group_name = subnet_group['DBSubnetGroupName']
        rds_client.delete_db_subnet_group(DBSubnetGroupName=group_name)

1699 

1700 

@RDSSubnetGroup.filter_registry.register('unused')
class UnusedRDSSubnetGroup(Filter):
    """Filters all rds subnet groups that are not in use but exist

    A subnet group counts as used when an RDS instance or an RDS cluster
    references it.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subnet-group-delete-unused
                resource: rds-subnet-group
                filters:
                  - unused
    """

    schema = type_schema('unused')

    def get_permissions(self):
        """Return the permissions of the ``rds`` resource manager."""
        return self.manager.get_resource_manager('rds').get_permissions()

    @staticmethod
    def _subnet_group_names(resources):
        """Collect the subnet group names referenced by ``resources``.

        Instances describe ``DBSubnetGroup`` as a mapping holding
        ``DBSubnetGroupName``, whereas clusters report it as a plain string
        (per the RDS API reference); both shapes are accepted.
        """
        names = set()
        for resource in resources:
            group = resource.get('DBSubnetGroup')
            if isinstance(group, dict):
                group = group.get('DBSubnetGroupName')
            if group:
                names.add(group)
        return names

    def process(self, configs, event=None):
        """Return the subnet groups referenced by no instance and no cluster."""
        rds = self.manager.get_resource_manager('rds').resources()
        self.used = self._subnet_group_names(rds)
        clusters = self.manager.get_resource_manager(
            'rds-cluster').resources(augment=False)
        self.used.update(self._subnet_group_names(clusters))
        return super().process(configs)

    def __call__(self, config):
        """Return True when the subnet group ``config`` is not in use."""
        return config['DBSubnetGroupName'] not in self.used

1730 

1731 

@filters.register('db-parameter')
class ParameterFilter(ValueFilter):
    """
    Applies value type filter on set db parameter values.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-pg
                resource: rds
                filters:
                  - type: db-parameter
                    key: someparam
                    op: eq
                    value: someval
    """

    schema = type_schema('db-parameter', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters', )
    policy_annotation = 'c7n:MatchedDBParameter'

    @staticmethod
    def recast(val, datatype):
        """ Re-cast the value based upon an AWS supplied datatype
            and treat nulls sensibly.

        Values that cannot be converted to the requested datatype are
        returned unchanged.
        """
        ret_val = val
        if datatype == 'string':
            ret_val = str(val)
        elif datatype == 'boolean':
            # AWS returns 1s and 0s for boolean for most of the cases
            if val.isdigit():
                ret_val = bool(int(val))
            # AWS returns 'TRUE,FALSE' for Oracle engine
            elif val == 'TRUE':
                ret_val = True
            elif val == 'FALSE':
                ret_val = False
        elif datatype == 'integer':
            # int() rather than str.isdigit() so that signed values such as
            # "-1" are converted too; anything non-numeric stays as is.
            try:
                ret_val = int(val)
            except (TypeError, ValueError):
                pass
        elif datatype == 'float':
            try:
                ret_val = float(val) if val else 0.0
            except (TypeError, ValueError):
                pass

        return ret_val

    # Private method for 'DBParameterGroupName' paginator
    def _get_param_list(self, pg):
        """Return every parameter of the parameter group named ``pg``."""
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_db_parameters')
        param_list = list(itertools.chain.from_iterable(
            p['Parameters']
            for p in paginator.paginate(DBParameterGroupName=pg)))
        return param_list

    def handle_paramgroup_cache(self, param_groups):
        """Return ``{group name: {parameter name: recast value}}``.

        Values are read from the manager cache when available; otherwise
        they are fetched from the API and saved to the cache.
        """
        pgcache = {}
        cache = self.manager._cache

        with cache:
            for pg in param_groups:
                cache_key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-pg': pg}
                pg_values = cache.get(cache_key)
                if pg_values is not None:
                    pgcache[pg] = pg_values
                    continue
                param_list = self._get_param_list(pg)
                # Parameters without a value are skipped.
                pgcache[pg] = {
                    p['ParameterName']: self.recast(p['ParameterValue'], p['DataType'])
                    for p in param_list if 'ParameterValue' in p}
                cache.save(cache_key, pgcache[pg])
        return pgcache

    def process(self, resources, event=None):
        """Return the instances with a parameter group matching the filter.

        Matching instances are annotated with the filter key under
        ``c7n:MatchedDBParameter``.
        """
        results = []
        # Load every parameter group referenced by any instance, not only
        # the first one of each, since the loop below looks all of them up.
        parameter_group_list = {
            pg['DBParameterGroupName']
            for db in resources
            for pg in db['DBParameterGroups']}
        paramcache = self.handle_paramgroup_cache(parameter_group_list)
        for resource in resources:
            for pg in resource['DBParameterGroups']:
                pg_values = paramcache[pg['DBParameterGroupName']]
                if self.match(pg_values):
                    resource.setdefault(self.policy_annotation, []).append(
                        self.data.get('key'))
                    results.append(resource)
                    break
        return results

1823 

1824 

@actions.register('modify-db')
class ModifyDb(BaseAction):
    """Modifies an RDS instance based on specified parameter
    using ModifyDbInstance.

    'Update' is an array with key value pairs that should be set to
    the property and value you wish to modify.
    'Immediate' determines whether the modification is applied immediately
    or not. If 'immediate' is not specified, default is false.

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-rds-deletion-protection
                resource: rds
                filters:
                  - DeletionProtection: true
                  - PubliclyAccessible: true
                actions:
                  - type: modify-db
                    update:
                      - property: 'DeletionProtection'
                        value: false
                      - property: 'PubliclyAccessible'
                        value: false
                    immediate: true
    """

    schema = type_schema(
        'modify-db',
        immediate={"type": 'boolean'},
        update={
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'property': {'type': 'string', 'enum': [
                        'AllocatedStorage',
                        'DBInstanceClass',
                        'DBSubnetGroupName',
                        'DBSecurityGroups',
                        'VpcSecurityGroupIds',
                        'MasterUserPassword',
                        'DBParameterGroupName',
                        'BackupRetentionPeriod',
                        'PreferredBackupWindow',
                        'PreferredMaintenanceWindow',
                        'MultiAZ',
                        'EngineVersion',
                        'AllowMajorVersionUpgrade',
                        'AutoMinorVersionUpgrade',
                        'LicenseModel',
                        'Iops',
                        'OptionGroupName',
                        'NewDBInstanceIdentifier',
                        'StorageType',
                        'TdeCredentialArn',
                        'TdeCredentialPassword',
                        'CACertificateIdentifier',
                        'Domain',
                        'CopyTagsToSnapshot',
                        'MonitoringInterval',
                        'MonitoringRoleARN',
                        'DBPortNumber',
                        'PubliclyAccessible',
                        'DomainIAMRoleName',
                        'PromotionTier',
                        'EnableIAMDatabaseAuthentication',
                        'EnablePerformanceInsights',
                        'PerformanceInsightsKMSKeyId',
                        'PerformanceInsightsRetentionPeriod',
                        'CloudwatchLogsExportConfiguration',
                        'ProcessorFeatures',
                        'UseDefaultProcessorFeatures',
                        'DeletionProtection',
                        'MaxAllocatedStorage',
                        'CertificateRotationRestart']},
                    'value': {}
                },
            },
        },
        required=('update',))

    permissions = ('rds:ModifyDBInstance',)

    # ModifyDBInstance parameter name -> expression locating the current
    # value on a described instance, for parameters whose name differs from
    # the describe output.
    conversion_map = {
        'DBSubnetGroupName': 'DBSubnetGroup.DBSubnetGroupName',
        'VpcSecurityGroupIds': 'VpcSecurityGroups[].VpcSecurityGroupId',
        'DBParameterGroupName': 'DBParameterGroups[].DBParameterGroupName',
        'OptionGroupName': 'OptionGroupMemberships[].OptionGroupName',
        'NewDBInstanceIdentifier': 'DBInstanceIdentifier',
        'Domain': 'DomainMemberships[].DomainName',
        'DBPortNumber': 'Endpoint.Port',
        'EnablePerformanceInsights': 'PerformanceInsightsEnabled',
        'CloudwatchLogsExportConfiguration': 'EnabledCloudwatchLogsExports'
    }

    def validate(self):
        """Check cross-property requirements of the ``update`` list.

        :returns: ``self``
        :raises PolicyValidationError: when a non-zero ``MonitoringInterval``
            is given without ``MonitoringRoleARN``, or when
            ``CloudwatchLogsExportConfiguration`` has neither
            ``EnableLogTypes`` nor ``DisableLogTypes``.
        """
        if self.data.get('update'):
            update_dict = {i['property']: i['value'] for i in self.data.get('update')}
            if ('MonitoringInterval' in update_dict and update_dict['MonitoringInterval'] > 0 and
                    'MonitoringRoleARN' not in update_dict):
                # Adjacent literals instead of a backslash continuation, which
                # embedded the source indentation in the message.
                raise PolicyValidationError(
                    "A MonitoringRoleARN value is required "
                    "if you specify a MonitoringInterval value other than 0")
            if 'CloudwatchLogsExportConfiguration' in update_dict:
                # Treat an empty/None configuration as having neither list.
                log_config = update_dict['CloudwatchLogsExportConfiguration'] or {}
                if all(k not in log_config
                       for k in ('EnableLogTypes', 'DisableLogTypes')):
                    raise PolicyValidationError(
                        "A EnableLogTypes or DisableLogTypes input list is required "
                        "for setting CloudwatchLogsExportConfiguration")
        return self

    def process(self, resources):
        """Call ModifyDBInstance for each instance that needs a change.

        Only properties whose current value differs from the requested one
        are sent; instances with nothing to change are skipped.
        """
        c = local_session(self.manager.session_factory).client('rds')
        updates = self.data.get('update')
        apply_immediately = self.data.get('immediate', False)
        for r in resources:
            param = {}
            for u in updates:
                prop = u['property']
                # Current value: the same-named key if the instance has it,
                # otherwise the conversion_map expression for the property.
                current = r.get(
                    prop,
                    jmespath_search(self.conversion_map.get(prop, 'None'), r))
                if current != u['value']:
                    param[prop] = u['value']
            if not param:
                continue
            param['ApplyImmediately'] = apply_immediately
            param['DBInstanceIdentifier'] = r['DBInstanceIdentifier']
            # Errors, including DBInstanceNotFoundFault, propagate to the caller.
            c.modify_db_instance(**param)

1958 

1959 

@resources.register('rds-reserved')
class ReservedRDS(QueryResourceManager):
    """Resource manager for RDS reserved DB instances.

    Resources are enumerated with ``describe_reserved_db_instances``.

    :example:

    .. code-block:: yaml

            policies:
              - name: existing-rds-reservations
                resource: rds-reserved
                filters:
                  - State: active
    """

    class resource_type(TypeInfo):
        # Service and enumeration call used to list the reservations.
        service = 'rds'
        enum_spec = (
            'describe_reserved_db_instances', 'ReservedDBInstances', None)
        permissions_enum = ('rds:DescribeReservedDBInstances',)

        # The reservation id serves as both identifier and display name.
        id = 'ReservedDBInstanceId'
        name = id
        date = 'StartTime'

        filter_name = 'ReservedDBInstances'
        filter_type = 'list'

        arn = "ReservedDBInstanceArn"
        arn_type = "ri"

        universal_taggable = object()

    augment = universal_augment

1989 

1990 

1991RDS.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter) 

1992 

1993 

@filters.register('consecutive-snapshots')
class ConsecutiveSnapshots(Filter):
    """Returns instances where number of consecutive daily snapshots is
    equal to/or greater than n days.

    An instance matches when, for each of the previous ``days`` calendar
    days (UTC, today excluded), it has at least one snapshot in the
    ``available`` state created on that day.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-daily-snapshot-count
                resource: rds
                filters:
                  - type: consecutive-snapshots
                    days: 7
    """
    schema = type_schema('consecutive-snapshots', days={'type': 'number', 'minimum': 1},
        required=['days'])
    permissions = ('rds:DescribeDBSnapshots', 'rds:DescribeDBInstances')
    annotation = 'c7n:DBSnapshots'

    def process_resource_set(self, client, resources):
        """Annotate each resource with the snapshots taken from it.

        All snapshots for the given instances are fetched in a single
        paginated ``describe_db_snapshots`` call and stored per resource
        under ``self.annotation`` (an empty list when there are none).
        """
        rds_instances = [r['DBInstanceIdentifier'] for r in resources]
        paginator = client.get_paginator('describe_db_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        db_snapshots = paginator.paginate(
            Filters=[{'Name': 'db-instance-id', 'Values': rds_instances}]
        ).build_full_result().get('DBSnapshots', [])

        inst_map = {}
        for snapshot in db_snapshots:
            inst_map.setdefault(snapshot['DBInstanceIdentifier'], []).append(snapshot)
        for r in resources:
            r[self.annotation] = inst_map.get(r['DBInstanceIdentifier'], [])

    def process(self, resources, event=None):
        """Return the resources that have a snapshot for each expected day."""
        client = local_session(self.manager.session_factory).client('rds')
        results = []
        # The schema type is ``number``, so a value such as 7.0 is valid
        # input; range() needs an int.
        retention = int(self.data.get('days'))
        # Aware UTC "now"; datetime.utcnow() is deprecated and returns a
        # naive value. The formatted calendar dates are the same.
        utcnow = datetime.datetime.now(datetime.timezone.utc)
        expected_dates = {
            (utcnow - timedelta(days=days)).strftime('%Y-%m-%d')
            for days in range(1, retention + 1)}

        # Only fetch snapshots for resources not already annotated.
        for resource_set in chunks(
                [r for r in resources if self.annotation not in r], 50):
            self.process_resource_set(client, resource_set)

        for r in resources:
            snapshot_dates = {
                snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d')
                for snapshot in r[self.annotation]
                if snapshot['Status'] == 'available'}
            if expected_dates.issubset(snapshot_dates):
                results.append(r)
        return results

2049 

2050 

@filters.register('engine')
class EngineFilter(ValueFilter):
    """
    Filter a rds resource based on its Engine Metadata

    The engine version record returned by ``describe_db_engine_versions``
    for the instance's ``Engine``/``EngineVersion`` pair is what the value
    filter is evaluated against. Matching resources are annotated with
    that record under ``c7n:Engine``.

    :example:

    .. code-block:: yaml

        policies:
          - name: find-deprecated-versions
            resource: aws.rds
            filters:
              - type: engine
                key: Status
                value: deprecated
    """

    schema = type_schema('engine', rinherit=ValueFilter.schema)

    permissions = ("rds:DescribeDBEngineVersions", )

    def process(self, resources, event=None):
        """Return the resources whose engine version metadata matches.

        Resources for which the API returned no record for their
        engine/version pair are treated as non-matching instead of
        raising ``KeyError``.
        """
        if not resources:
            # Nothing to look up; avoids a request with empty filter values.
            return []

        client = local_session(self.manager.session_factory).client('rds')

        engines = {r['Engine'] for r in resources}
        engine_versions = {r['EngineVersion'] for r in resources}

        paginator = client.get_paginator('describe_db_engine_versions')
        response = paginator.paginate(
            Filters=[
                {'Name': 'engine', 'Values': sorted(engines)},
                {'Name': 'engine-version', 'Values': sorted(engine_versions)}
            ],
            IncludeAll=True,
        )

        # engine name -> engine version -> version record
        all_versions = {}
        for page in response:
            for e in page['DBEngineVersions']:
                all_versions.setdefault(e['Engine'], {})[e['EngineVersion']] = e

        matched = []
        for r in resources:
            v = all_versions.get(r['Engine'], {}).get(r['EngineVersion'])
            if v is None:
                # No metadata came back for this pair, so there is nothing
                # to evaluate the filter against.
                continue
            if self.match(v):
                r['c7n:Engine'] = v
                matched.append(r)
        return matched

2102 

2103 

class DescribeDBProxy(DescribeSource):
    """Describe source for RDS proxies; augmentation is delegated to
    ``universal_augment``."""

    def augment(self, resources):
        augmented = universal_augment(self.manager, resources)
        return augmented

2107 

2108 

@resources.register('rds-proxy')
class RDSProxy(QueryResourceManager):
    """Resource manager for RDS DB proxies.

    Proxies are enumerated with ``describe_db_proxies``.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-proxy-tls-check
            resource: rds-proxy
            filters:
              - type: value
                key: RequireTLS
                value: false
    """

    class resource_type(TypeInfo):
        # Service and enumeration call used to list the proxies.
        service = 'rds'
        enum_spec = ('describe_db_proxies', 'DBProxies', None)
        permissions_enum = ('rds:DescribeDBProxies',)

        # The proxy name serves as both identifier and display name.
        id = 'DBProxyName'
        name = id
        date = 'CreatedDate'

        arn = 'DBProxyArn'
        arn_type = 'db-proxy'
        cfn_type = 'AWS::RDS::DBProxy'

        universal_taggable = object()

    # Source implementation selected per source name.
    source_mapping = {
        'describe': DescribeDBProxy,
        'config': ConfigSource,
    }

2141 

2142 

@RDSProxy.action_registry.register('delete')
class DeleteRDSProxy(BaseAction):
    """
    Delete the RDS proxies selected by the policy.

    :example:

    .. code-block:: yaml

      policies:
        - name: delete-rds-proxy
          resource: aws.rds-proxy
          filters:
            - type: value
              key: "DBProxyName"
              op: eq
              value: "proxy-test-1"
          actions:
            - type: delete
    """

    schema = type_schema('delete')

    permissions = ('rds:DeleteDBProxy',)

    def process(self, resources):
        """Call ``delete_db_proxy`` for each resource through the manager's
        retry helper."""
        rds = local_session(self.manager.session_factory).client('rds')
        # Error codes handed to the retry helper as ``ignore_err_codes``.
        tolerated = ('DBProxyNotFoundFault', 'InvalidDBProxyStateFault')
        for proxy in resources:
            self.manager.retry(
                rds.delete_db_proxy,
                DBProxyName=proxy['DBProxyName'],
                ignore_err_codes=tolerated)

2175 

2176 

@RDSProxy.filter_registry.register('subnet')
class RDSProxySubnetFilter(net_filters.SubnetFilter):

    # Expression locating the proxy's subnet ids; presumably evaluated by
    # the base SubnetFilter against each resource (confirm in net_filters).
    RelatedIdsExpression = 'VpcSubnetIds[]'

2181 

2182 

@RDSProxy.filter_registry.register('security-group')
class RDSProxySecurityGroupFilter(net_filters.SecurityGroupFilter):

    # Expression locating the proxy's security group ids; presumably
    # evaluated by the base SecurityGroupFilter against each resource
    # (confirm in net_filters).
    RelatedIdsExpression = 'VpcSecurityGroupIds[]'

2187 

2188 

@RDSProxy.filter_registry.register('vpc')
class RDSProxyVpcFilter(net_filters.VpcFilter):

    # Expression locating the proxy's VPC id; presumably evaluated by the
    # base VpcFilter against each resource (confirm in net_filters).
    RelatedIdsExpression = 'VpcId'

2193 

2194 

@filters.register('db-option-groups')
class DbOptionGroups(ValueFilter):
    """This filter describes RDS option groups for associated RDS instances.
    Use this filter in conjunction with jmespath and value filter operators
    to filter RDS instance based on their option groups

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-data-in-transit-encrypted
            resource: aws.rds
            filters:
              - type: db-option-groups
                key: Options[].OptionName
                op: intersect
                value:
                  - SSL
                  - NATIVE_NETWORK_ENCRYPTION

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-oracle-encryption-in-transit
            resource: aws.rds
            filters:
              - Engine: oracle-ee
              - type: db-option-groups
                key: Options[].OptionSettings[?Name == 'SQLNET.ENCRYPTION_SERVER'].Value[]
                value:
                  - REQUIRED
    """

    schema = type_schema('db-option-groups', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeOptionGroups', )
    policy_annotation = 'c7n:MatchedDBOptionGroups'

    def handle_optiongroup_cache(self, client, paginator, option_groups):
        """Return a mapping of option group name to its description.

        Each name is looked up in the manager cache first; on a miss the
        group is fetched through ``paginator`` and the result is saved
        back to the cache. A group for which the API returns nothing is
        stored as ``{}``.

        :param client: unused here; kept so existing callers keep working.
        :param paginator: ``describe_option_groups`` paginator.
        :param option_groups: iterable of option group names.
        """
        ogcache = {}
        cache = self.manager._cache

        with cache:
            for og in option_groups:
                # 'rds-og' gives option groups their own key space, so an
                # entry cannot be confused with one stored under 'rds-pg'
                # for a group of the same name.
                cache_key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-og': og}
                og_values = cache.get(cache_key)
                if og_values is not None:
                    ogcache[og] = og_values
                    continue

                option_groups_list = list(itertools.chain.from_iterable(
                    p['OptionGroupsList']
                    for p in paginator.paginate(OptionGroupName=og)))

                # When several entries come back the last one is kept.
                ogcache[og] = option_groups_list[-1] if option_groups_list else {}
                cache.save(cache_key, ogcache[og])

        return ogcache

    def process(self, resources, event=None):
        """Return resources having at least one matching option group.

        The first matching membership of a resource is recorded under
        ``self.policy_annotation`` (option group name plus the value of
        the filter's ``key`` expression).
        """
        results = []
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_option_groups')

        # Gather every membership of every instance (not only the first
        # one), de-duplicated in first-seen order, so the lookup in the
        # loop below always finds its entry.
        option_groups = list(dict.fromkeys(
            og['OptionGroupName']
            for db in resources
            for og in db.get('OptionGroupMemberships', ())))
        optioncache = self.handle_optiongroup_cache(client, paginator, option_groups)

        for resource in resources:
            for og in resource.get('OptionGroupMemberships', ()):
                og_values = optioncache[og['OptionGroupName']]
                if self.match(og_values):
                    resource.setdefault(self.policy_annotation, []).append({
                        k: jmespath_search(k, og_values)
                        for k in {'OptionGroupName', self.data.get('key')}
                    })
                    results.append(resource)
                    break

        return results

2281 

2282 

@filters.register('pending-maintenance')
class PendingMaintenance(Filter):
    """Select DB instances that have pending maintenance actions.

    Matching instances are annotated with their pending actions under
    ``c7n:PendingMaintenance``.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-pending-maintenance
            resource: aws.rds
            filters:
              - pending-maintenance
              - type: value
                key: '"c7n:PendingMaintenance"[].PendingMaintenanceActionDetails[].Action'
                op: intersect
                value:
                  - system-update
    """

    annotation_key = 'c7n:PendingMaintenance'
    schema = type_schema('pending-maintenance')
    permissions = ('rds:DescribePendingMaintenanceActions',)

    def process(self, resources, event=None):
        """Return the resources that have at least one pending action."""
        rds = local_session(self.manager.session_factory).client('rds')
        pager = rds.get_paginator('describe_pending_maintenance_actions')

        # Group every pending action by the identifier of its resource.
        actions_by_identifier = {}
        for page in pager.paginate():
            for entry in page['PendingMaintenanceActions']:
                bucket = actions_by_identifier.setdefault(
                    entry['ResourceIdentifier'], [])
                bucket.append(entry)

        matched = []
        for resource in resources:
            pending = actions_by_identifier.get(resource['DBInstanceArn'], [])
            if not pending:
                continue
            resource[self.annotation_key] = pending
            matched.append(resource)

        return matched