1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4RDS Resource Manager
5====================
6
7Example Policies
8----------------
9
10Find rds instances that are publicly available
11
12.. code-block:: yaml
13
14 policies:
15 - name: rds-public
16 resource: rds
17 filters:
18 - PubliclyAccessible: true
19
20Find rds instances that are not encrypted
21
22.. code-block:: yaml
23
24 policies:
25 - name: rds-non-encrypted
26 resource: rds
27 filters:
28 - type: value
29 key: StorageEncrypted
30 value: true
31 op: ne
32
33"""
34import functools
35import itertools
36import logging
37import operator
38import re
39import datetime
40
41from datetime import timedelta
42
43from decimal import Decimal as D, ROUND_HALF_UP
44
45from c7n.vendored.distutils.version import LooseVersion
46from botocore.exceptions import ClientError
47from concurrent.futures import as_completed
48
49from c7n.actions import (
50 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction)
51
52from c7n.exceptions import PolicyValidationError
53from c7n.filters import (
54 CrossAccountAccessFilter, FilterRegistry, Filter, ValueFilter, AgeFilter)
55from c7n.filters.offhours import OffHour, OnHour
56from c7n.filters import related
57import c7n.filters.vpc as net_filters
58from c7n.manager import resources
59from c7n.query import (
60 QueryResourceManager, DescribeSource, ConfigSource, TypeInfo, RetryPageIterator)
61from c7n import deprecated, tags
62from c7n.tags import universal_augment
63
64from c7n.utils import (
65 local_session, type_schema, get_retry, chunks, snapshot_identifier,
66 merge_dict_list, filter_empty, jmespath_search)
67from c7n.resources.kms import ResourceKmsKeyAlias
68from c7n.resources.securityhub import PostFinding
69from c7n.filters.backup import ConsecutiveAwsBackupsFilter
70
71log = logging.getLogger('custodian.rds')
72
73filters = FilterRegistry('rds.filters')
74actions = ActionRegistry('rds.actions')
75
76
77class DescribeRDS(DescribeSource):
78
79 def augment(self, dbs):
80 for d in dbs:
81 d['Tags'] = d.pop('TagList', ())
82 return dbs
83
84
85class ConfigRDS(ConfigSource):
86
87 def load_resource(self, item):
88 resource = super().load_resource(item)
89 for k in list(resource.keys()):
90 if k.startswith('Db'):
91 resource["DB%s" % k[2:]] = resource[k]
92 return resource
93
94
95@resources.register('rds')
96class RDS(QueryResourceManager):
97 """Resource manager for RDS DB instances.
98 """
99
100 class resource_type(TypeInfo):
101 service = 'rds'
102 arn_type = 'db'
103 arn_separator = ':'
104 enum_spec = ('describe_db_instances', 'DBInstances', None)
105 id = 'DBInstanceIdentifier'
106 config_id = 'DbiResourceId'
107 name = 'Endpoint.Address'
108 filter_name = 'DBInstanceIdentifier'
109 filter_type = 'scalar'
110 date = 'InstanceCreateTime'
111 dimension = 'DBInstanceIdentifier'
112 cfn_type = config_type = 'AWS::RDS::DBInstance'
113 arn = 'DBInstanceArn'
114 universal_taggable = True
115 default_report_fields = (
116 'DBInstanceIdentifier',
117 'DBName',
118 'Engine',
119 'EngineVersion',
120 'MultiAZ',
121 'AllocatedStorage',
122 'StorageEncrypted',
123 'PubliclyAccessible',
124 'InstanceCreateTime',
125 )
126 permissions_enum = ('rds:DescribeDBInstances',)
127
128 filter_registry = filters
129 action_registry = actions
130
131 def resources(self, query=None):
132 if query is None and 'query' in self.data:
133 query = merge_dict_list(self.data['query'])
134 elif query is None:
135 query = {}
136 return super(RDS, self).resources(query=query)
137
138 source_mapping = {
139 'describe': DescribeRDS,
140 'config': ConfigRDS
141 }
142
143
144def _db_instance_eligible_for_backup(resource):
145 db_instance_id = resource['DBInstanceIdentifier']
146
147 # Database instance is not in available state
148 if resource.get('DBInstanceStatus', '') != 'available':
149 log.debug(
150 "DB instance %s is not in available state",
151 db_instance_id)
152 return False
153 # The specified DB Instance is a member of a cluster and its
154 # backup retention should not be modified directly. Instead,
155 # modify the backup retention of the cluster using the
156 # ModifyDbCluster API
157 if resource.get('DBClusterIdentifier', ''):
158 log.debug(
159 "DB instance %s is a cluster member",
160 db_instance_id)
161 return False
162 # DB Backups not supported on a read replica for engine postgres
163 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
164 resource.get('Engine', '') == 'postgres'):
165 log.debug(
166 "DB instance %s is a postgres read-replica",
167 db_instance_id)
168 return False
169 # DB Backups not supported on a read replica running a mysql
170 # version before 5.6
171 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
172 resource.get('Engine', '') == 'mysql'):
173 engine_version = resource.get('EngineVersion', '')
174 # Assume "<major>.<minor>.<whatever>"
175 match = re.match(r'(?P<major>\d+)\.(?P<minor>\d+)\..*', engine_version)
176 if (match and int(match.group('major')) < 5 or
177 (int(match.group('major')) == 5 and int(match.group('minor')) < 6)):
178 log.debug(
179 "DB instance %s is a version %s mysql read-replica",
180 db_instance_id,
181 engine_version)
182 return False
183 return True
184
185
186def _db_instance_eligible_for_final_snapshot(resource):
187 status = resource.get('DBInstanceStatus', '')
188 # If the DB instance you are deleting has a status of "Creating,"
189 # you will not be able to have a final DB snapshot taken
190 # If the DB instance is in a failure state with a status of "failed,"
191 # "incompatible-restore," or "incompatible-network," you can only delete
192 # the instance when the SkipFinalSnapshot parameter is set to "true."
193 eligible_for_final_snapshot = True
194 if status in ['creating', 'failed', 'incompatible-restore', 'incompatible-network']:
195 eligible_for_final_snapshot = False
196
197 # FinalDBSnapshotIdentifier can not be specified when deleting a
198 # replica instance
199 if resource.get('ReadReplicaSourceDBInstanceIdentifier', ''):
200 eligible_for_final_snapshot = False
201
202 # if it's a rds-cluster, don't try to run the rds instance snapshot api call
203 if resource.get('DBClusterIdentifier', False):
204 eligible_for_final_snapshot = False
205
206 if not eligible_for_final_snapshot:
207 log.debug('DB instance is not eligible for a snapshot:/n %s', resource)
208 return eligible_for_final_snapshot
209
210
211def _get_available_engine_upgrades(client, major=False):
212 """Returns all extant rds engine upgrades.
213
214 As a nested mapping of engine type to known versions
215 and their upgrades.
216
217 Defaults to minor upgrades, but configurable to major.
218
219 Example::
220
221 >>> _get_available_engine_upgrades(client)
222 {
223 'oracle-se2': {'12.1.0.2.v2': '12.1.0.2.v5',
224 '12.1.0.2.v3': '12.1.0.2.v5'},
225 'postgres': {'9.3.1': '9.3.14',
226 '9.3.10': '9.3.14',
227 '9.3.12': '9.3.14',
228 '9.3.2': '9.3.14'}
229 }
230 """
231 results = {}
232 paginator = client.get_paginator('describe_db_engine_versions')
233 for page in paginator.paginate():
234 engine_versions = page['DBEngineVersions']
235 for v in engine_versions:
236 if v['Engine'] not in results:
237 results[v['Engine']] = {}
238 if 'ValidUpgradeTarget' not in v or len(v['ValidUpgradeTarget']) == 0:
239 continue
240 for t in v['ValidUpgradeTarget']:
241 if not major and t['IsMajorVersionUpgrade']:
242 continue
243 if LooseVersion(t['EngineVersion']) > LooseVersion(
244 results[v['Engine']].get(v['EngineVersion'], '0.0.0')):
245 results[v['Engine']][v['EngineVersion']] = t['EngineVersion']
246 return results
247
248
249filters.register('offhour', OffHour)
250filters.register('onhour', OnHour)
251
252
253@filters.register('default-vpc')
254class DefaultVpc(net_filters.DefaultVpcBase):
255 """ Matches if an rds database is in the default vpc
256
257 :example:
258
259 .. code-block:: yaml
260
261 policies:
262 - name: default-vpc-rds
263 resource: rds
264 filters:
265 - type: default-vpc
266 """
267 schema = type_schema('default-vpc')
268
269 def __call__(self, rdb):
270 return self.match(rdb['DBSubnetGroup']['VpcId'])
271
272
273@filters.register('security-group')
274class SecurityGroupFilter(net_filters.SecurityGroupFilter):
275
276 RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"
277
278
279@filters.register('subnet')
280class SubnetFilter(net_filters.SubnetFilter):
281
282 RelatedIdsExpression = "DBSubnetGroup.Subnets[].SubnetIdentifier"
283
284
285@filters.register('vpc')
286class VpcFilter(net_filters.VpcFilter):
287
288 RelatedIdsExpression = "DBSubnetGroup.VpcId"
289
290
291filters.register('network-location', net_filters.NetworkLocation)
292
293
294@filters.register('kms-alias')
295class KmsKeyAlias(ResourceKmsKeyAlias):
296
297 def process(self, dbs, event=None):
298 return self.get_matching_aliases(dbs)
299
300
301@actions.register('auto-patch')
302class AutoPatch(BaseAction):
303 """Toggle AutoMinorUpgrade flag on RDS instance
304
305 'window' parameter needs to be in the format 'ddd:hh:mm-ddd:hh:mm' and
306 have at least 30 minutes between start & end time.
307 If 'window' is not specified, AWS will assign a random maintenance window
308 to each instance selected.
309
310 :example:
311
312 .. code-block:: yaml
313
314 policies:
315 - name: enable-rds-autopatch
316 resource: rds
317 filters:
318 - AutoMinorVersionUpgrade: false
319 actions:
320 - type: auto-patch
321 minor: true
322 window: Mon:23:00-Tue:01:00
323 """
324
325 schema = type_schema(
326 'auto-patch',
327 minor={'type': 'boolean'}, window={'type': 'string'})
328 permissions = ('rds:ModifyDBInstance',)
329
330 def process(self, dbs):
331 client = local_session(
332 self.manager.session_factory).client('rds')
333
334 params = {'AutoMinorVersionUpgrade': self.data.get('minor', True)}
335 if self.data.get('window'):
336 params['PreferredMaintenanceWindow'] = self.data['window']
337
338 for db in dbs:
339 client.modify_db_instance(
340 DBInstanceIdentifier=db['DBInstanceIdentifier'],
341 **params)
342
343
344@filters.register('upgrade-available')
345class UpgradeAvailable(Filter):
346 """ Scan DB instances for available engine upgrades
347
348 This will pull DB instances & check their specific engine for any
349 engine version with higher release numbers than the current one
350
351 This will also annotate the rds instance with 'target_engine' which is
352 the most recent version of the engine available
353
354 :example:
355
356 .. code-block:: yaml
357
358 policies:
359 - name: rds-upgrade-available
360 resource: rds
361 filters:
362 - type: upgrade-available
363 major: False
364
365 """
366
367 schema = type_schema('upgrade-available',
368 major={'type': 'boolean'},
369 value={'type': 'boolean'})
370 permissions = ('rds:DescribeDBEngineVersions',)
371
372 def process(self, resources, event=None):
373 client = local_session(self.manager.session_factory).client('rds')
374 check_upgrade_extant = self.data.get('value', True)
375 check_major = self.data.get('major', False)
376 engine_upgrades = _get_available_engine_upgrades(
377 client, major=check_major)
378 results = []
379
380 for r in resources:
381 target_upgrade = engine_upgrades.get(
382 r['Engine'], {}).get(r['EngineVersion'])
383 if target_upgrade is None:
384 if check_upgrade_extant is False:
385 results.append(r)
386 continue
387 r['c7n-rds-engine-upgrade'] = target_upgrade
388 results.append(r)
389 return results
390
391
392@actions.register('upgrade')
393class UpgradeMinor(BaseAction):
394 """Upgrades a RDS instance to the latest major/minor version available
395
396 Use of the 'immediate' flag (default False) will automatically upgrade
397 the RDS engine disregarding the existing maintenance window.
398
399 :example:
400
401 .. code-block:: yaml
402
403 policies:
404 - name: upgrade-rds-minor
405 resource: rds
406 actions:
407 - type: upgrade
408 major: False
409 immediate: False
410
411 """
412
413 schema = type_schema(
414 'upgrade',
415 major={'type': 'boolean'},
416 immediate={'type': 'boolean'})
417 permissions = ('rds:ModifyDBInstance',)
418
419 def process(self, resources):
420 client = local_session(self.manager.session_factory).client('rds')
421 engine_upgrades = None
422 for r in resources:
423 if 'EngineVersion' in r['PendingModifiedValues']:
424 # Upgrade has already been scheduled
425 continue
426 if 'c7n-rds-engine-upgrade' not in r:
427 if engine_upgrades is None:
428 engine_upgrades = _get_available_engine_upgrades(
429 client, major=self.data.get('major', False))
430 target = engine_upgrades.get(
431 r['Engine'], {}).get(r['EngineVersion'])
432 if target is None:
433 log.debug(
434 "implicit filter no upgrade on %s",
435 r['DBInstanceIdentifier'])
436 continue
437 r['c7n-rds-engine-upgrade'] = target
438 client.modify_db_instance(
439 DBInstanceIdentifier=r['DBInstanceIdentifier'],
440 EngineVersion=r['c7n-rds-engine-upgrade'],
441 ApplyImmediately=self.data.get('immediate', False))
442
443
444@actions.register('tag-trim')
445class TagTrim(tags.TagTrim):
446
447 permissions = ('rds:RemoveTagsFromResource',)
448
449 def process_tag_removal(self, client, resource, candidates):
450 client.remove_tags_from_resource(ResourceName=resource['DBInstanceArn'], TagKeys=candidates)
451
452
453START_STOP_ELIGIBLE_ENGINES = {
454 'postgres', 'sqlserver-ee',
455 'oracle-se2', 'mariadb', 'oracle-ee',
456 'sqlserver-ex', 'sqlserver-se', 'oracle-se',
457 'mysql', 'oracle-se1', 'sqlserver-web',
458 'db2-ae', 'db2-se', 'oracle-ee-cdb',
459 'sqlserver-ee', 'oracle-se2-cdb'}
460
461
462def _eligible_start_stop(db, state="available"):
463 # See conditions noted here
464 # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
465 # Note that this doesn't really specify what happens for all the nosql engines
466 # that are available as rds engines.
467 if db.get('DBInstanceStatus') != state:
468 return False
469
470 if db.get('MultiAZ') and db['Engine'].startswith('sqlserver-'):
471 return False
472
473 if db['Engine'] not in START_STOP_ELIGIBLE_ENGINES:
474 return False
475
476 if db.get('ReadReplicaDBInstanceIdentifiers'):
477 return False
478
479 if db.get('ReadReplicaSourceDBInstanceIdentifier'):
480 return False
481
482 # TODO is SQL Server mirror is detectable.
483 return True
484
485
486@actions.register('stop')
487class Stop(BaseAction):
488 """Stop an rds instance.
489
490 https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
491 """
492
493 schema = type_schema('stop')
494
495 permissions = ("rds:StopDBInstance",)
496
497 def process(self, resources):
498 client = local_session(self.manager.session_factory).client('rds')
499 for r in filter(_eligible_start_stop, resources):
500 try:
501 client.stop_db_instance(
502 DBInstanceIdentifier=r['DBInstanceIdentifier'])
503 except ClientError as e:
504 log.exception(
505 "Error stopping db instance:%s err:%s",
506 r['DBInstanceIdentifier'], e)
507
508
509@actions.register('start')
510class Start(BaseAction):
511 """Start an rds instance.
512 """
513
514 schema = type_schema('start')
515
516 permissions = ("rds:StartDBInstance",)
517
518 def process(self, resources):
519 client = local_session(self.manager.session_factory).client('rds')
520 start_filter = functools.partial(_eligible_start_stop, state='stopped')
521 for r in filter(start_filter, resources):
522 try:
523 client.start_db_instance(
524 DBInstanceIdentifier=r['DBInstanceIdentifier'])
525 except ClientError as e:
526 log.exception(
527 "Error starting db instance:%s err:%s",
528 r['DBInstanceIdentifier'], e)
529
530
531@actions.register('delete')
532class Delete(BaseAction):
533 """Deletes selected RDS instances
534
535 This will delete RDS instances. It is recommended to apply with a filter
536 to avoid deleting all RDS instances in the account.
537
538 :example:
539
540 .. code-block:: yaml
541
542 policies:
543 - name: rds-delete
544 resource: rds
545 filters:
546 - default-vpc
547 actions:
548 - type: delete
549 skip-snapshot: true
550 """
551
552 schema = type_schema('delete', **{
553 'skip-snapshot': {'type': 'boolean'},
554 'copy-restore-info': {'type': 'boolean'}
555 })
556
557 permissions = ('rds:DeleteDBInstance', 'rds:AddTagsToResource')
558
559 def validate(self):
560 if self.data.get('skip-snapshot', False) and self.data.get(
561 'copy-restore-info'):
562 raise PolicyValidationError(
563 "skip-snapshot cannot be specified with copy-restore-info on %s" % (
564 self.manager.data,))
565 return self
566
567 def process(self, dbs):
568 skip = self.data.get('skip-snapshot', False)
569 # Can't delete an instance in an aurora cluster, use a policy on the cluster
570 dbs = [r for r in dbs if not r.get('DBClusterIdentifier')]
571 # Concurrency feels like overkill here.
572 client = local_session(self.manager.session_factory).client('rds')
573 for db in dbs:
574 params = dict(
575 DBInstanceIdentifier=db['DBInstanceIdentifier'])
576 if skip or not _db_instance_eligible_for_final_snapshot(db):
577 params['SkipFinalSnapshot'] = True
578 else:
579 params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
580 'Final', db['DBInstanceIdentifier'])
581 if self.data.get('copy-restore-info', False):
582 self.copy_restore_info(client, db)
583 if not db['CopyTagsToSnapshot']:
584 client.modify_db_instance(
585 DBInstanceIdentifier=db['DBInstanceIdentifier'],
586 CopyTagsToSnapshot=True)
587 self.log.info(
588 "Deleting rds: %s snapshot: %s",
589 db['DBInstanceIdentifier'],
590 params.get('FinalDBSnapshotIdentifier', False))
591
592 try:
593 client.delete_db_instance(**params)
594 except ClientError as e:
595 if e.response['Error']['Code'] == "InvalidDBInstanceState":
596 continue
597 raise
598
599 return dbs
600
601 def copy_restore_info(self, client, instance):
602 tags = []
603 tags.append({
604 'Key': 'VPCSecurityGroups',
605 'Value': ''.join([
606 g['VpcSecurityGroupId'] for g in instance['VpcSecurityGroups']
607 ])})
608 tags.append({
609 'Key': 'OptionGroupName',
610 'Value': instance['OptionGroupMemberships'][0]['OptionGroupName']})
611 tags.append({
612 'Key': 'ParameterGroupName',
613 'Value': instance['DBParameterGroups'][0]['DBParameterGroupName']})
614 tags.append({
615 'Key': 'InstanceClass',
616 'Value': instance['DBInstanceClass']})
617 tags.append({
618 'Key': 'StorageType',
619 'Value': instance['StorageType']})
620 tags.append({
621 'Key': 'MultiAZ',
622 'Value': str(instance['MultiAZ'])})
623 tags.append({
624 'Key': 'DBSubnetGroupName',
625 'Value': instance['DBSubnetGroup']['DBSubnetGroupName']})
626 client.add_tags_to_resource(
627 ResourceName=self.manager.generate_arn(
628 instance['DBInstanceIdentifier']),
629 Tags=tags)
630
631
632@actions.register('set-snapshot-copy-tags')
633class CopySnapshotTags(BaseAction):
634 """Enables copying tags from rds instance to snapshot
635
636 DEPRECATED - use modify-db instead with `CopyTagsToSnapshot`
637
638 :example:
639
640 .. code-block:: yaml
641
642 policies:
643 - name: enable-rds-snapshot-tags
644 resource: rds
645 filters:
646 - type: value
647 key: Engine
648 value: aurora
649 op: eq
650 actions:
651 - type: set-snapshot-copy-tags
652 enable: True
653 """
654 deprecations = (
655 deprecated.action("use modify-db instead with `CopyTagsToSnapshot`"),
656 )
657
658 schema = type_schema(
659 'set-snapshot-copy-tags',
660 enable={'type': 'boolean'})
661 permissions = ('rds:ModifyDBInstance',)
662
663 def process(self, resources):
664 error = None
665 with self.executor_factory(max_workers=2) as w:
666 futures = {}
667 client = local_session(self.manager.session_factory).client('rds')
668 resources = [r for r in resources
669 if r['CopyTagsToSnapshot'] != self.data.get('enable', True)]
670 for r in resources:
671 futures[w.submit(self.set_snapshot_tags, client, r)] = r
672 for f in as_completed(futures):
673 if f.exception():
674 error = f.exception()
675 self.log.error(
676 'error updating rds:%s CopyTagsToSnapshot \n %s',
677 futures[f]['DBInstanceIdentifier'], error)
678 if error:
679 raise error
680 return resources
681
682 def set_snapshot_tags(self, client, r):
683 self.manager.retry(
684 client.modify_db_instance,
685 DBInstanceIdentifier=r['DBInstanceIdentifier'],
686 CopyTagsToSnapshot=self.data.get('enable', True))
687
688
689@RDS.action_registry.register('post-finding')
690class DbInstanceFinding(PostFinding):
691
692 resource_type = 'AwsRdsDbInstance'
693
694 def format_resource(self, r):
695
696 fields = [
697 'AssociatedRoles', 'CACertificateIdentifier', 'DBClusterIdentifier',
698 'DBInstanceIdentifier', 'DBInstanceClass', 'DbInstancePort', 'DbiResourceId',
699 'DBName', 'DeletionProtection', 'Endpoint', 'Engine', 'EngineVersion',
700 'IAMDatabaseAuthenticationEnabled', 'InstanceCreateTime', 'KmsKeyId',
701 'PubliclyAccessible', 'StorageEncrypted',
702 'TdeCredentialArn', 'VpcSecurityGroups', 'MultiAz', 'EnhancedMonitoringResourceArn',
703 'DbInstanceStatus', 'MasterUsername',
704 'AllocatedStorage', 'PreferredBackupWindow', 'BackupRetentionPeriod',
705 'DbSecurityGroups', 'DbParameterGroups',
706 'AvailabilityZone', 'DbSubnetGroup', 'PreferredMaintenanceWindow',
707 'PendingModifiedValues', 'LatestRestorableTime',
708 'AutoMinorVersionUpgrade', 'ReadReplicaSourceDBInstanceIdentifier',
709 'ReadReplicaDBInstanceIdentifiers',
710 'ReadReplicaDBClusterIdentifiers', 'LicenseModel', 'Iops', 'OptionGroupMemberships',
711 'CharacterSetName',
712 'SecondaryAvailabilityZone', 'StatusInfos', 'StorageType', 'DomainMemberships',
713 'CopyTagsToSnapshot',
714 'MonitoringInterval', 'MonitoringRoleArn', 'PromotionTier', 'Timezone',
715 'PerformanceInsightsEnabled',
716 'PerformanceInsightsKmsKeyId', 'PerformanceInsightsRetentionPeriod',
717 'EnabledCloudWatchLogsExports',
718 'ProcessorFeatures', 'ListenerEndpoint', 'MaxAllocatedStorage'
719 ]
720 details = {}
721 for f in fields:
722 if r.get(f):
723 value = r[f]
724 if isinstance(r[f], datetime.datetime):
725 value = r[f].isoformat()
726 details.setdefault(f, value)
727
728 db_instance = {
729 'Type': self.resource_type,
730 'Id': r['DBInstanceArn'],
731 'Region': self.manager.config.region,
732 'Tags': {t['Key']: t['Value'] for t in r.get('Tags', [])},
733 'Details': {self.resource_type: filter_empty(details)},
734 }
735 db_instance = filter_empty(db_instance)
736 return db_instance
737
738
739@actions.register('snapshot')
740class Snapshot(BaseAction):
741 """Creates a manual snapshot of a RDS instance
742
743 :example:
744
745 .. code-block:: yaml
746
747 policies:
748 - name: rds-snapshot
749 resource: rds
750 actions:
751 - snapshot
752 """
753
754 schema = type_schema('snapshot')
755 permissions = ('rds:CreateDBSnapshot',)
756
757 def process(self, dbs):
758 with self.executor_factory(max_workers=3) as w:
759 futures = []
760 for db in dbs:
761 futures.append(w.submit(
762 self.process_rds_snapshot,
763 db))
764 for f in as_completed(futures):
765 if f.exception():
766 self.log.error(
767 "Exception creating rds snapshot \n %s",
768 f.exception())
769 return dbs
770
771 def process_rds_snapshot(self, resource):
772 if not _db_instance_eligible_for_backup(resource):
773 return
774
775 c = local_session(self.manager.session_factory).client('rds')
776 c.create_db_snapshot(
777 DBSnapshotIdentifier=snapshot_identifier(
778 self.data.get('snapshot-prefix', 'Backup'),
779 resource['DBInstanceIdentifier']),
780 DBInstanceIdentifier=resource['DBInstanceIdentifier'])
781
782
783@actions.register('resize')
784class ResizeInstance(BaseAction):
785 """Change the allocated storage of an rds instance.
786
787 :example:
788
789 This will find databases using over 85% of their allocated
790 storage, and resize them to have an additional 30% storage
791 the resize here is async during the next maintenance.
792
793 .. code-block:: yaml
794
795 policies:
796 - name: rds-resize-up
797 resource: rds
798 filters:
799 - type: metrics
800 name: FreeStorageSpace
801 percent-attr: AllocatedStorage
802 attr-multiplier: 1073741824
803 value: 90
804 op: greater-than
805 actions:
806 - type: resize
807 percent: 30
808
809
810 This will find databases using under 20% of their allocated
811 storage, and resize them to be 30% smaller, the resize here
812 is configured to be immediate.
813
814 .. code-block:: yaml
815
816 policies:
817 - name: rds-resize-down
818 resource: rds
819 filters:
820 - type: metrics
821 name: FreeStorageSpace
822 percent-attr: AllocatedStorage
823 attr-multiplier: 1073741824
824 value: 90
825 op: greater-than
826 actions:
827 - type: resize
828 percent: -30
829 immediate: true
830 """
831 schema = type_schema(
832 'resize',
833 percent={'type': 'number'},
834 immediate={'type': 'boolean'})
835
836 permissions = ('rds:ModifyDBInstance',)
837
838 def process(self, resources):
839 c = local_session(self.manager.session_factory).client('rds')
840 for r in resources:
841 old_val = D(r['AllocatedStorage'])
842 _100 = D(100)
843 new_val = ((_100 + D(self.data['percent'])) / _100) * old_val
844 rounded = int(new_val.quantize(D('0'), ROUND_HALF_UP))
845 c.modify_db_instance(
846 DBInstanceIdentifier=r['DBInstanceIdentifier'],
847 AllocatedStorage=rounded,
848 ApplyImmediately=self.data.get('immediate', False))
849
850
851@actions.register('retention')
852class RetentionWindow(BaseAction):
853 """
854 Sets the 'BackupRetentionPeriod' value for automated snapshots,
855 enforce (min, max, exact) sets retention days occordingly.
856 :example:
857
858 .. code-block:: yaml
859
860 policies:
861 - name: rds-snapshot-retention
862 resource: rds
863 filters:
864 - type: value
865 key: BackupRetentionPeriod
866 value: 7
867 op: lt
868 actions:
869 - type: retention
870 days: 7
871 copy-tags: true
872 enforce: exact
873 """
874
875 date_attribute = "BackupRetentionPeriod"
876 schema = type_schema(
877 'retention', **{'days': {'type': 'number'},
878 'copy-tags': {'type': 'boolean'},
879 'enforce': {'type': 'string', 'enum': [
880 'min', 'max', 'exact']}})
881 permissions = ('rds:ModifyDBInstance',)
882
883 def process(self, dbs):
884 with self.executor_factory(max_workers=3) as w:
885 futures = []
886 for db in dbs:
887 futures.append(w.submit(
888 self.process_snapshot_retention,
889 db))
890 for f in as_completed(futures):
891 if f.exception():
892 self.log.error(
893 "Exception setting rds retention \n %s",
894 f.exception())
895 return dbs
896
897 def process_snapshot_retention(self, resource):
898 current_retention = int(resource.get('BackupRetentionPeriod', 0))
899 current_copy_tags = resource['CopyTagsToSnapshot']
900 new_retention = self.data['days']
901 new_copy_tags = self.data.get('copy-tags', True)
902 retention_type = self.data.get('enforce', 'min').lower()
903
904 if ((retention_type == 'min' or
905 current_copy_tags != new_copy_tags) and
906 _db_instance_eligible_for_backup(resource)):
907 self.set_retention_window(
908 resource,
909 max(current_retention, new_retention),
910 new_copy_tags)
911 return resource
912
913 if ((retention_type == 'max' or
914 current_copy_tags != new_copy_tags) and
915 _db_instance_eligible_for_backup(resource)):
916 self.set_retention_window(
917 resource,
918 min(current_retention, new_retention),
919 new_copy_tags)
920 return resource
921
922 if ((retention_type == 'exact' or
923 current_copy_tags != new_copy_tags) and
924 _db_instance_eligible_for_backup(resource)):
925 self.set_retention_window(resource, new_retention, new_copy_tags)
926 return resource
927
928 def set_retention_window(self, resource, retention, copy_tags):
929 c = local_session(self.manager.session_factory).client('rds')
930 c.modify_db_instance(
931 DBInstanceIdentifier=resource['DBInstanceIdentifier'],
932 BackupRetentionPeriod=retention,
933 CopyTagsToSnapshot=copy_tags)
934
935
936@actions.register('set-public-access')
937class RDSSetPublicAvailability(BaseAction):
938 """
939 This action allows for toggling an RDS instance
940 'PubliclyAccessible' flag to true or false
941
942 :example:
943
944 .. code-block:: yaml
945
946 policies:
947 - name: disable-rds-public-accessibility
948 resource: rds
949 filters:
950 - PubliclyAccessible: true
951 actions:
952 - type: set-public-access
953 state: false
954 """
955
956 schema = type_schema(
957 "set-public-access",
958 state={'type': 'boolean'})
959 permissions = ('rds:ModifyDBInstance',)
960
961 def set_accessibility(self, r):
962 client = local_session(self.manager.session_factory).client('rds')
963 client.modify_db_instance(
964 DBInstanceIdentifier=r['DBInstanceIdentifier'],
965 PubliclyAccessible=self.data.get('state', False))
966
967 def process(self, rds):
968 with self.executor_factory(max_workers=2) as w:
969 futures = {w.submit(self.set_accessibility, r): r for r in rds}
970 for f in as_completed(futures):
971 if f.exception():
972 self.log.error(
973 "Exception setting public access on %s \n %s",
974 futures[f]['DBInstanceIdentifier'], f.exception())
975 return rds
976
977
978@resources.register('rds-subscription')
979class RDSSubscription(QueryResourceManager):
980
981 class resource_type(TypeInfo):
982 service = 'rds'
983 arn_type = 'es'
984 cfn_type = 'AWS::RDS::EventSubscription'
985 enum_spec = (
986 'describe_event_subscriptions', 'EventSubscriptionsList', None)
987 name = id = "CustSubscriptionId"
988 arn = 'EventSubscriptionArn'
989 date = "SubscriptionCreateTime"
990 permissions_enum = ('rds:DescribeEventSubscriptions',)
991 universal_taggable = object()
992
993 augment = universal_augment
994
995
996@RDSSubscription.filter_registry.register('topic')
997class RDSSubscriptionSNSTopic(related.RelatedResourceFilter):
998 """
999 Retrieves topics related to RDS event subscriptions
1000
1001 :example:
1002
1003 .. code-block:: yaml
1004
1005 policies:
1006 - name: rds-subscriptions-no-confirmed-topics
1007 resource: aws.rds-subscription
1008 filters:
1009 - type: topic
1010 key: SubscriptionsConfirmed
1011 value: 0
1012 value_type: integer
1013 """
1014 schema = type_schema('topic', rinherit=ValueFilter.schema)
1015 RelatedResource = 'c7n.resources.sns.SNS'
1016 RelatedIdsExpression = 'SnsTopicArn'
1017 annotation_key = 'c7n:SnsTopic'
1018
1019 def process(self, resources, event=None):
1020 rel = self.get_related(resources)
1021 matched = []
1022 for resource in resources:
1023 if self.process_resource(resource, rel):
1024 # adding full topic details
1025 resource[self.annotation_key] = rel.get(
1026 resource[self.RelatedIdsExpression],
1027 None # can be if value is "absent"
1028 )
1029 matched.append(resource)
1030 return matched
1031
1032
1033@RDSSubscription.action_registry.register('delete')
1034class RDSSubscriptionDelete(BaseAction):
1035 """Deletes a RDS snapshot resource
1036
1037 :example:
1038
1039 .. code-block:: yaml
1040
1041 policies:
1042 - name: rds-subscription-delete
1043 resource: rds-subscription
1044 filters:
1045 - type: value
1046 key: CustSubscriptionId
1047 value: xyz
1048 actions:
1049 - delete
1050 """
1051
1052 schema = type_schema('delete')
1053 permissions = ('rds:DeleteEventSubscription',)
1054
1055 def process(self, resources):
1056 client = local_session(self.manager.session_factory).client('rds')
1057 for r in resources:
1058 self.manager.retry(
1059 client.delete_event_subscription, SubscriptionName=r['CustSubscriptionId'],
1060 ignore_err_codes=('SubscriptionNotFoundFault',
1061 'InvalidEventSubscriptionStateFault'))
1062
1063
1064class DescribeRDSSnapshot(DescribeSource):
1065
1066 def get_resources(self, ids, cache=True):
1067 super_get = super().get_resources
1068 return list(itertools.chain(*[super_get((i,)) for i in ids]))
1069
1070 def augment(self, snaps):
1071 for s in snaps:
1072 s['Tags'] = s.pop('TagList', ())
1073 return snaps
1074
1075
1076@resources.register('rds-snapshot')
1077class RDSSnapshot(QueryResourceManager):
1078 """Resource manager for RDS DB snapshots.
1079 """
1080
1081 class resource_type(TypeInfo):
1082 service = 'rds'
1083 arn_type = 'snapshot'
1084 arn_separator = ':'
1085 enum_spec = ('describe_db_snapshots', 'DBSnapshots', None)
1086 name = id = 'DBSnapshotIdentifier'
1087 date = 'SnapshotCreateTime'
1088 config_type = "AWS::RDS::DBSnapshot"
1089 filter_name = "DBSnapshotIdentifier"
1090 filter_type = "scalar"
1091 universal_taggable = True
1092 permissions_enum = ('rds:DescribeDBSnapshots',)
1093
1094 source_mapping = {
1095 'describe': DescribeRDSSnapshot,
1096 'config': ConfigSource
1097 }
1098
1099
1100@RDSSnapshot.filter_registry.register('onhour')
1101class RDSSnapshotOnHour(OnHour):
1102 """Scheduled action on rds snapshot."""
1103
1104
1105@RDSSnapshot.filter_registry.register('instance')
1106class SnapshotInstance(related.RelatedResourceFilter):
1107 """Filter snapshots by their database attributes.
1108
1109 :example:
1110
1111 Find snapshots without an extant database
1112
1113 .. code-block:: yaml
1114
1115 policies:
1116 - name: rds-snapshot-orphan
1117 resource: aws.rds-snapshot
1118 filters:
1119 - type: instance
1120 value: 0
1121 value_type: resource_count
1122 """
1123 schema = type_schema(
1124 'instance', rinherit=ValueFilter.schema
1125 )
1126
1127 RelatedResource = "c7n.resources.rds.RDS"
1128 RelatedIdsExpression = "DBInstanceIdentifier"
1129 FetchThreshold = 5
1130
1131
1132@RDSSnapshot.filter_registry.register('latest')
1133class LatestSnapshot(Filter):
1134 """Return the latest snapshot for each database.
1135 """
1136 schema = type_schema('latest', automatic={'type': 'boolean'})
1137 permissions = ('rds:DescribeDBSnapshots',)
1138
1139 def process(self, resources, event=None):
1140 results = []
1141 if not self.data.get('automatic', True):
1142 resources = [r for r in resources if r['SnapshotType'] == 'manual']
1143 for db_identifier, snapshots in itertools.groupby(
1144 resources, operator.itemgetter('DBInstanceIdentifier')):
1145 results.append(
1146 sorted(snapshots,
1147 key=operator.itemgetter('SnapshotCreateTime'))[-1])
1148 return results
1149
1150
1151@RDSSnapshot.filter_registry.register('age')
1152class RDSSnapshotAge(AgeFilter):
1153 """Filters RDS snapshots based on age (in days)
1154
1155 :example:
1156
1157 .. code-block:: yaml
1158
1159 policies:
1160 - name: rds-snapshot-expired
1161 resource: rds-snapshot
1162 filters:
1163 - type: age
1164 days: 28
1165 op: ge
1166 actions:
1167 - delete
1168 """
1169
1170 schema = type_schema(
1171 'age', days={'type': 'number'},
1172 op={'$ref': '#/definitions/filters_common/comparison_operators'})
1173
1174 date_attribute = 'SnapshotCreateTime'
1175
1176 def get_resource_date(self, i):
1177 return i.get('SnapshotCreateTime')
1178
1179
1180@RDSSnapshot.action_registry.register('restore')
1181class RestoreInstance(BaseAction):
1182 """Restore an rds instance from a snapshot.
1183
1184 Note this requires the snapshot or db deletion be taken
1185 with the `copy-restore-info` boolean flag set to true, as
1186 various instance metadata is stored on the snapshot as tags.
1187
1188 additional parameters to restore db instance api call be overriden
1189 via `restore_options` settings. various modify db instance parameters
1190 can be specified via `modify_options` settings.
1191 """
1192
1193 schema = type_schema(
1194 'restore',
1195 restore_options={'type': 'object'},
1196 modify_options={'type': 'object'})
1197
1198 permissions = (
1199 'rds:ModifyDBInstance',
1200 'rds:ModifyDBParameterGroup',
1201 'rds:ModifyOptionGroup',
1202 'rds:RebootDBInstance',
1203 'rds:RestoreDBInstanceFromDBSnapshot')
1204
1205 poll_period = 60
1206 restore_keys = {
1207 'VPCSecurityGroups', 'MultiAZ', 'DBSubnetGroupName',
1208 'InstanceClass', 'StorageType', 'ParameterGroupName',
1209 'OptionGroupName'}
1210
1211 def validate(self):
1212 found = False
1213 for f in self.manager.iter_filters():
1214 if isinstance(f, LatestSnapshot):
1215 found = True
1216 if not found:
1217 # do we really need this...
1218 raise PolicyValidationError(
1219 "must filter by latest to use restore action %s" % (
1220 self.manager.data,))
1221 return self
1222
1223 def process(self, resources):
1224 client = local_session(self.manager.session_factory).client('rds')
1225 # restore up to 10 in parallel, we have to wait on each.
1226 with self.executor_factory(
1227 max_workers=min(10, len(resources) or 1)) as w:
1228 futures = {}
1229 for r in resources:
1230 tags = {t['Key']: t['Value'] for t in r['Tags']}
1231 if not set(tags).issuperset(self.restore_keys):
1232 self.log.warning(
1233 "snapshot:%s missing restore tags",
1234 r['DBSnapshotIdentifier'])
1235 continue
1236 futures[w.submit(self.process_instance, client, r)] = r
1237 for f in as_completed(futures):
1238 r = futures[f]
1239 if f.exception():
1240 self.log.warning(
1241 "Error restoring db:%s from:%s error:\n%s",
1242 r['DBInstanceIdentifier'], r['DBSnapshotIdentifier'],
1243 f.exception())
1244 continue
1245
1246 def process_instance(self, client, r):
1247 params, post_modify = self.get_restore_from_tags(r)
1248 self.manager.retry(
1249 client.restore_db_instance_from_db_snapshot, **params)
1250 waiter = client.get_waiter('db_instance_available')
1251 # wait up to 40m
1252 waiter.config.delay = self.poll_period
1253 waiter.wait(DBInstanceIdentifier=params['DBInstanceIdentifier'])
1254 self.manager.retry(
1255 client.modify_db_instance,
1256 DBInstanceIdentifier=params['DBInstanceIdentifier'],
1257 ApplyImmediately=True,
1258 **post_modify)
1259 self.manager.retry(
1260 client.reboot_db_instance,
1261 DBInstanceIdentifier=params['DBInstanceIdentifier'],
1262 ForceFailover=False)
1263
1264 def get_restore_from_tags(self, snapshot):
1265 params, post_modify = {}, {}
1266 tags = {t['Key']: t['Value'] for t in snapshot['Tags']}
1267
1268 params['DBInstanceIdentifier'] = snapshot['DBInstanceIdentifier']
1269 params['DBSnapshotIdentifier'] = snapshot['DBSnapshotIdentifier']
1270 params['MultiAZ'] = tags['MultiAZ'] == 'True' and True or False
1271 params['DBSubnetGroupName'] = tags['DBSubnetGroupName']
1272 params['DBInstanceClass'] = tags['InstanceClass']
1273 params['CopyTagsToSnapshot'] = True
1274 params['StorageType'] = tags['StorageType']
1275 params['OptionGroupName'] = tags['OptionGroupName']
1276
1277 post_modify['DBParameterGroupName'] = tags['ParameterGroupName']
1278 post_modify['VpcSecurityGroupIds'] = tags['VPCSecurityGroups'].split(',')
1279
1280 params['Tags'] = [
1281 {'Key': k, 'Value': v} for k, v in tags.items()
1282 if k not in self.restore_keys]
1283
1284 params.update(self.data.get('restore_options', {}))
1285 post_modify.update(self.data.get('modify_options', {}))
1286 return params, post_modify
1287
1288
1289@RDSSnapshot.filter_registry.register('cross-account')
1290class CrossAccountAccess(CrossAccountAccessFilter):
1291
1292 permissions = ('rds:DescribeDBSnapshotAttributes',)
1293 attributes_key = 'c7n:attributes'
1294 annotation_key = 'c7n:CrossAccountViolations'
1295
1296 def process(self, resources, event=None):
1297 self.accounts = self.get_accounts()
1298 self.everyone_only = self.data.get("everyone_only", False)
1299 results = []
1300 with self.executor_factory(max_workers=2) as w:
1301 futures = []
1302 for resource_set in chunks(resources, 20):
1303 futures.append(w.submit(
1304 self.process_resource_set, resource_set))
1305 for f in as_completed(futures):
1306 if f.exception():
1307 self.log.error(
1308 "Exception checking cross account access\n %s" % (
1309 f.exception()))
1310 continue
1311 results.extend(f.result())
1312 return results
1313
1314 def process_resource_set(self, resource_set):
1315 client = local_session(self.manager.session_factory).client('rds')
1316 results = []
1317 for r in resource_set:
1318 attrs = {t['AttributeName']: t['AttributeValues']
1319 for t in self.manager.retry(
1320 client.describe_db_snapshot_attributes,
1321 DBSnapshotIdentifier=r['DBSnapshotIdentifier'])[
1322 'DBSnapshotAttributesResult']['DBSnapshotAttributes']}
1323 r[self.attributes_key] = attrs
1324 shared_accounts = set(attrs.get('restore', []))
1325 if self.everyone_only:
1326 shared_accounts = {a for a in shared_accounts if a == 'all'}
1327 delta_accounts = shared_accounts.difference(self.accounts)
1328 if delta_accounts:
1329 r[self.annotation_key] = list(delta_accounts)
1330 results.append(r)
1331 return results
1332
1333
1334@RDSSnapshot.action_registry.register('set-permissions')
1335class SetPermissions(BaseAction):
1336 """Set permissions for copying or restoring an RDS snapshot
1337
1338 Use the 'add' and 'remove' parameters to control which accounts to
1339 add or remove, respectively. The default is to remove any
1340 permissions granted to other AWS accounts.
1341
1342 Use `remove: matched` in combination with the `cross-account` filter
1343 for more flexible removal options such as preserving access for
1344 a set of whitelisted accounts:
1345
1346 :example:
1347
1348 .. code-block:: yaml
1349
1350 policies:
1351 - name: rds-snapshot-remove-cross-account
1352 resource: rds-snapshot
1353 filters:
1354 - type: cross-account
1355 whitelist:
1356 - '112233445566'
1357 actions:
1358 - type: set-permissions
1359 remove: matched
1360 """
1361 schema = type_schema(
1362 'set-permissions',
1363 remove={'oneOf': [
1364 {'enum': ['matched']},
1365 {'type': 'array', 'items': {
1366 'oneOf': [
1367 {'type': 'string', 'minLength': 12, 'maxLength': 12},
1368 {'enum': ['all']},
1369 ],
1370 }}
1371 ]},
1372 add={
1373 'type': 'array', 'items': {
1374 'oneOf': [
1375 {'type': 'string', 'minLength': 12, 'maxLength': 12},
1376 {'enum': ['all']},
1377 ]
1378 }
1379 }
1380 )
1381
1382 permissions = ('rds:ModifyDBSnapshotAttribute',)
1383
1384 def validate(self):
1385 if self.data.get('remove') == 'matched':
1386 found = False
1387 for f in self.manager.iter_filters():
1388 if isinstance(f, CrossAccountAccessFilter):
1389 found = True
1390 break
1391 if not found:
1392 raise PolicyValidationError(
1393 "policy:%s filter:%s with matched requires cross-account filter" % (
1394 self.manager.ctx.policy.name, self.type))
1395
1396 def process(self, snapshots):
1397 client = local_session(self.manager.session_factory).client('rds')
1398 for s in snapshots:
1399 self.process_snapshot(client, s)
1400
1401 def process_snapshot(self, client, snapshot):
1402 add_accounts = self.data.get('add', [])
1403 remove_accounts = self.data.get('remove', [])
1404
1405 if not (add_accounts or remove_accounts):
1406 if CrossAccountAccess.attributes_key not in snapshot:
1407 attrs = {
1408 t['AttributeName']: t['AttributeValues']
1409 for t in self.manager.retry(
1410 client.describe_db_snapshot_attributes,
1411 DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier']
1412 )['DBSnapshotAttributesResult']['DBSnapshotAttributes']
1413 }
1414 snapshot[CrossAccountAccess.attributes_key] = attrs
1415 remove_accounts = snapshot[CrossAccountAccess.attributes_key].get('restore', [])
1416 elif remove_accounts == 'matched':
1417 remove_accounts = snapshot.get(CrossAccountAccess.annotation_key, [])
1418
1419 if add_accounts or remove_accounts:
1420 client.modify_db_snapshot_attribute(
1421 DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'],
1422 AttributeName='restore',
1423 ValuesToRemove=remove_accounts,
1424 ValuesToAdd=add_accounts)
1425
1426
1427@RDSSnapshot.action_registry.register('region-copy')
1428class RegionCopySnapshot(BaseAction):
1429 """Copy a snapshot across regions.
1430
1431 Note there is a max in flight for cross region rds snapshots
1432 of 5 per region. This action will attempt to retry automatically
1433 for an hr.
1434
1435 Example::
1436
1437 - name: copy-encrypted-snapshots
1438 description: |
1439 copy snapshots under 1 day old to dr region with kms
1440 resource: rds-snapshot
1441 region: us-east-1
1442 filters:
1443 - Status: available
1444 - type: value
1445 key: SnapshotCreateTime
1446 value_type: age
1447 value: 1
1448 op: less-than
1449 actions:
1450 - type: region-copy
1451 target_region: us-east-2
1452 target_key: arn:aws:kms:us-east-2:0000:key/cb291f53-c9cf61
1453 copy_tags: true
1454 tags:
1455 OriginRegion: us-east-1
1456 """
1457
1458 schema = type_schema(
1459 'region-copy',
1460 target_region={'type': 'string'},
1461 target_key={'type': 'string'},
1462 copy_tags={'type': 'boolean'},
1463 tags={'type': 'object'},
1464 required=('target_region',))
1465
1466 permissions = ('rds:CopyDBSnapshot',)
1467 min_delay = 120
1468 max_attempts = 30
1469
1470 def validate(self):
1471 if self.data.get('target_region') and self.manager.data.get('mode'):
1472 raise PolicyValidationError(
1473 "cross region snapshot may require waiting for "
1474 "longer then lambda runtime allows %s" % (self.manager.data,))
1475 return self
1476
1477 def process(self, resources):
1478 if self.data['target_region'] == self.manager.config.region:
1479 self.log.warning(
1480 "Source and destination region are the same, skipping copy")
1481 return
1482 for resource_set in chunks(resources, 20):
1483 self.process_resource_set(resource_set)
1484
1485 def process_resource(self, target, key, tags, snapshot):
1486 p = {}
1487 if key:
1488 p['KmsKeyId'] = key
1489 p['TargetDBSnapshotIdentifier'] = snapshot[
1490 'DBSnapshotIdentifier'].replace(':', '-')
1491 p['SourceRegion'] = self.manager.config.region
1492 p['SourceDBSnapshotIdentifier'] = snapshot['DBSnapshotArn']
1493
1494 if self.data.get('copy_tags', True):
1495 p['CopyTags'] = True
1496 if tags:
1497 p['Tags'] = tags
1498
1499 retry = get_retry(
1500 ('SnapshotQuotaExceeded',),
1501 # TODO make this configurable, class defaults to 1hr
1502 min_delay=self.min_delay,
1503 max_attempts=self.max_attempts,
1504 log_retries=logging.DEBUG)
1505 try:
1506 result = retry(target.copy_db_snapshot, **p)
1507 except ClientError as e:
1508 if e.response['Error']['Code'] == 'DBSnapshotAlreadyExists':
1509 self.log.warning(
1510 "Snapshot %s already exists in target region",
1511 snapshot['DBSnapshotIdentifier'])
1512 return
1513 raise
1514 snapshot['c7n:CopiedSnapshot'] = result[
1515 'DBSnapshot']['DBSnapshotArn']
1516
1517 def process_resource_set(self, resource_set):
1518 target_client = self.manager.session_factory(
1519 region=self.data['target_region']).client('rds')
1520 target_key = self.data.get('target_key')
1521 tags = [{'Key': k, 'Value': v} for k, v
1522 in self.data.get('tags', {}).items()]
1523
1524 for snapshot_set in chunks(resource_set, 5):
1525 for r in snapshot_set:
1526 # If tags are supplied, copy tags are ignored, and
1527 # we need to augment the tag set with the original
1528 # resource tags to preserve the common case.
1529 rtags = tags and list(tags) or None
1530 if tags and self.data.get('copy_tags', True):
1531 rtags.extend(r['Tags'])
1532 self.process_resource(target_client, target_key, rtags, r)
1533
1534
1535@RDSSnapshot.action_registry.register('delete')
1536class RDSSnapshotDelete(BaseAction):
1537 """Deletes a RDS snapshot resource
1538
1539 :example:
1540
1541 .. code-block:: yaml
1542
1543 policies:
1544 - name: rds-snapshot-delete-stale
1545 resource: rds-snapshot
1546 filters:
1547 - type: age
1548 days: 28
1549 op: ge
1550 actions:
1551 - delete
1552 """
1553
1554 schema = type_schema('delete')
1555 permissions = ('rds:DeleteDBSnapshot',)
1556
1557 def process(self, snapshots):
1558 snapshots = self.filter_resources(snapshots, 'SnapshotType', ('manual',))
1559 if not snapshots:
1560 return []
1561 log.info("Deleting %d rds snapshots", len(snapshots))
1562 with self.executor_factory(max_workers=3) as w:
1563 futures = []
1564 for snapshot_set in chunks(reversed(snapshots), size=50):
1565 futures.append(
1566 w.submit(self.process_snapshot_set, snapshot_set))
1567 for f in as_completed(futures):
1568 if f.exception():
1569 self.log.error(
1570 "Exception deleting snapshot set \n %s",
1571 f.exception())
1572 return snapshots
1573
1574 def process_snapshot_set(self, snapshots_set):
1575 c = local_session(self.manager.session_factory).client('rds')
1576 for s in snapshots_set:
1577 c.delete_db_snapshot(
1578 DBSnapshotIdentifier=s['DBSnapshotIdentifier'])
1579
1580
1581@actions.register('modify-security-groups')
1582class RDSModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
1583
1584 permissions = ('rds:ModifyDBInstance', 'rds:ModifyDBCluster')
1585 vpc_expr = 'DBSubnetGroup.VpcId'
1586
1587 def process(self, rds_instances):
1588 replication_group_map = {}
1589 client = local_session(self.manager.session_factory).client('rds')
1590 groups = super(RDSModifyVpcSecurityGroups, self).get_groups(
1591 rds_instances)
1592
1593 # either build map for DB cluster or modify DB instance directly
1594 for idx, i in enumerate(rds_instances):
1595 if i.get('DBClusterIdentifier'):
1596 # build map of Replication Groups to Security Groups
1597 replication_group_map[i['DBClusterIdentifier']] = groups[idx]
1598 else:
1599 client.modify_db_instance(
1600 DBInstanceIdentifier=i['DBInstanceIdentifier'],
1601 VpcSecurityGroupIds=groups[idx])
1602
1603 # handle DB cluster, if necessary
1604 for idx, r in enumerate(replication_group_map.keys()):
1605 client.modify_db_cluster(
1606 DBClusterIdentifier=r,
1607 VpcSecurityGroupIds=replication_group_map[r]
1608 )
1609
1610
1611class DescribeSubnetGroup(DescribeSource):
1612
1613 def augment(self, resources):
1614 _db_subnet_group_tags(
1615 resources, self.manager.session_factory,
1616 self.manager.executor_factory, self.manager.retry)
1617 return resources
1618
1619
1620@resources.register('rds-subnet-group')
1621class RDSSubnetGroup(QueryResourceManager):
1622 """RDS subnet group."""
1623
1624 class resource_type(TypeInfo):
1625 service = 'rds'
1626 arn_type = 'subgrp'
1627 id = name = 'DBSubnetGroupName'
1628 arn_separator = ':'
1629 enum_spec = (
1630 'describe_db_subnet_groups', 'DBSubnetGroups', None)
1631 filter_name = 'DBSubnetGroupName'
1632 filter_type = 'scalar'
1633 permissions_enum = ('rds:DescribeDBSubnetGroups',)
1634 cfn_type = config_type = 'AWS::RDS::DBSubnetGroup'
1635 universal_taggable = object()
1636
1637 source_mapping = {
1638 'config': ConfigSource,
1639 'describe': DescribeSubnetGroup
1640 }
1641
1642
1643def _db_subnet_group_tags(subnet_groups, session_factory, executor_factory, retry):
1644 client = local_session(session_factory).client('rds')
1645
1646 def process_tags(g):
1647 try:
1648 g['Tags'] = client.list_tags_for_resource(
1649 ResourceName=g['DBSubnetGroupArn'])['TagList']
1650 return g
1651 except client.exceptions.DBSubnetGroupNotFoundFault:
1652 return None
1653
1654 return list(filter(None, map(process_tags, subnet_groups)))
1655
1656
1657@RDSSubnetGroup.action_registry.register('delete')
1658class RDSSubnetGroupDeleteAction(BaseAction):
1659 """Action to delete RDS Subnet Group
1660
1661 It is recommended to apply a filter to the delete policy to avoid unwanted
1662 deletion of any rds subnet groups.
1663
1664 :example:
1665
1666 .. code-block:: yaml
1667
1668 policies:
1669 - name: rds-subnet-group-delete
1670 resource: rds-subnet-group
1671 filters:
1672 - Instances: []
1673 actions:
1674 - delete
1675 """
1676
1677 schema = type_schema('delete')
1678 permissions = ('rds:DeleteDBSubnetGroup',)
1679
1680 def process(self, subnet_group):
1681 with self.executor_factory(max_workers=2) as w:
1682 list(w.map(self.process_subnetgroup, subnet_group))
1683
1684 def process_subnetgroup(self, subnet_group):
1685 client = local_session(self.manager.session_factory).client('rds')
1686 client.delete_db_subnet_group(DBSubnetGroupName=subnet_group['DBSubnetGroupName'])
1687
1688
1689@RDSSubnetGroup.filter_registry.register('unused')
1690class UnusedRDSSubnetGroup(Filter):
1691 """Filters all launch rds subnet groups that are not in use but exist
1692
1693 :example:
1694
1695 .. code-block:: yaml
1696
1697 policies:
1698 - name: rds-subnet-group-delete-unused
1699 resource: rds-subnet-group
1700 filters:
1701 - unused
1702 """
1703
1704 schema = type_schema('unused')
1705
1706 def get_permissions(self):
1707 return self.manager.get_resource_manager('rds').get_permissions()
1708
1709 def process(self, configs, event=None):
1710 rds = self.manager.get_resource_manager('rds').resources()
1711 self.used = set(jmespath_search('[].DBSubnetGroup.DBSubnetGroupName', rds))
1712 self.used.update(set(jmespath_search('[].DBSubnetGroup.DBSubnetGroupName',
1713 self.manager.get_resource_manager('rds-cluster').resources(augment=False))))
1714 return super(UnusedRDSSubnetGroup, self).process(configs)
1715
1716 def __call__(self, config):
1717 return config['DBSubnetGroupName'] not in self.used
1718
1719
1720@filters.register('db-parameter')
1721class ParameterFilter(ValueFilter):
1722 """
1723 Applies value type filter on set db parameter values.
1724 :example:
1725
1726 .. code-block:: yaml
1727
1728 policies:
1729 - name: rds-pg
1730 resource: rds
1731 filters:
1732 - type: db-parameter
1733 key: someparam
1734 op: eq
1735 value: someval
1736 """
1737
1738 schema = type_schema('db-parameter', rinherit=ValueFilter.schema)
1739 schema_alias = False
1740 permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters', )
1741 policy_annotation = 'c7n:MatchedDBParameter'
1742
1743 @staticmethod
1744 def recast(val, datatype):
1745 """ Re-cast the value based upon an AWS supplied datatype
1746 and treat nulls sensibly.
1747 """
1748 ret_val = val
1749 if datatype == 'string':
1750 ret_val = str(val)
1751 elif datatype == 'boolean':
1752 # AWS returns 1s and 0s for boolean for most of the cases
1753 if val.isdigit():
1754 ret_val = bool(int(val))
1755 # AWS returns 'TRUE,FALSE' for Oracle engine
1756 elif val == 'TRUE':
1757 ret_val = True
1758 elif val == 'FALSE':
1759 ret_val = False
1760 elif datatype == 'integer':
1761 if val.isdigit():
1762 ret_val = int(val)
1763 elif datatype == 'float':
1764 ret_val = float(val) if val else 0.0
1765
1766 return ret_val
1767
1768 # Private method for 'DBParameterGroupName' paginator
1769 def _get_param_list(self, pg):
1770 client = local_session(self.manager.session_factory).client('rds')
1771 paginator = client.get_paginator('describe_db_parameters')
1772 param_list = list(itertools.chain(*[p['Parameters']
1773 for p in paginator.paginate(DBParameterGroupName=pg)]))
1774 return param_list
1775
1776 def handle_paramgroup_cache(self, param_groups):
1777 pgcache = {}
1778 cache = self.manager._cache
1779
1780 with cache:
1781 for pg in param_groups:
1782 cache_key = {
1783 'region': self.manager.config.region,
1784 'account_id': self.manager.config.account_id,
1785 'rds-pg': pg}
1786 pg_values = cache.get(cache_key)
1787 if pg_values is not None:
1788 pgcache[pg] = pg_values
1789 continue
1790 param_list = self._get_param_list(pg)
1791 pgcache[pg] = {
1792 p['ParameterName']: self.recast(p['ParameterValue'], p['DataType'])
1793 for p in param_list if 'ParameterValue' in p}
1794 cache.save(cache_key, pgcache[pg])
1795 return pgcache
1796
1797 def process(self, resources, event=None):
1798 results = []
1799 parameter_group_list = {db['DBParameterGroups'][0]['DBParameterGroupName']
1800 for db in resources}
1801 paramcache = self.handle_paramgroup_cache(parameter_group_list)
1802 for resource in resources:
1803 for pg in resource['DBParameterGroups']:
1804 pg_values = paramcache[pg['DBParameterGroupName']]
1805 if self.match(pg_values):
1806 resource.setdefault(self.policy_annotation, []).append(
1807 self.data.get('key'))
1808 results.append(resource)
1809 break
1810 return results
1811
1812
1813@actions.register('modify-db')
1814class ModifyDb(BaseAction):
1815 """Modifies an RDS instance based on specified parameter
1816 using ModifyDbInstance.
1817
1818 'Update' is an array with with key value pairs that should be set to
1819 the property and value you wish to modify.
1820 'Immediate" determines whether the modification is applied immediately
1821 or not. If 'immediate' is not specified, default is false.
1822
1823 :example:
1824
1825 .. code-block:: yaml
1826
1827 policies:
1828 - name: disable-rds-deletion-protection
1829 resource: rds
1830 filters:
1831 - DeletionProtection: true
1832 - PubliclyAccessible: true
1833 actions:
1834 - type: modify-db
1835 update:
1836 - property: 'DeletionProtection'
1837 value: false
1838 - property: 'PubliclyAccessible'
1839 value: false
1840 immediate: true
1841 """
1842
1843 schema = type_schema(
1844 'modify-db',
1845 immediate={"type": 'boolean'},
1846 update={
1847 'type': 'array',
1848 'items': {
1849 'type': 'object',
1850 'properties': {
1851 'property': {'type': 'string', 'enum': [
1852 'AllocatedStorage',
1853 'DBInstanceClass',
1854 'DBSubnetGroupName',
1855 'DBSecurityGroups',
1856 'VpcSecurityGroupIds',
1857 'MasterUserPassword',
1858 'DBParameterGroupName',
1859 'BackupRetentionPeriod',
1860 'PreferredBackupWindow',
1861 'PreferredMaintenanceWindow',
1862 'MultiAZ',
1863 'EngineVersion',
1864 'AllowMajorVersionUpgrade',
1865 'AutoMinorVersionUpgrade',
1866 'LicenseModel',
1867 'Iops',
1868 'OptionGroupName',
1869 'NewDBInstanceIdentifier',
1870 'StorageType',
1871 'TdeCredentialArn',
1872 'TdeCredentialPassword',
1873 'CACertificateIdentifier',
1874 'Domain',
1875 'CopyTagsToSnapshot',
1876 'MonitoringInterval',
1877 'MonitoringRoleARN',
1878 'DBPortNumber',
1879 'PubliclyAccessible',
1880 'DomainIAMRoleName',
1881 'PromotionTier',
1882 'EnableIAMDatabaseAuthentication',
1883 'EnablePerformanceInsights',
1884 'PerformanceInsightsKMSKeyId',
1885 'PerformanceInsightsRetentionPeriod',
1886 'CloudwatchLogsExportConfiguration',
1887 'ProcessorFeatures',
1888 'UseDefaultProcessorFeatures',
1889 'DeletionProtection',
1890 'MaxAllocatedStorage',
1891 'CertificateRotationRestart']},
1892 'value': {}
1893 },
1894 },
1895 },
1896 required=('update',))
1897
1898 permissions = ('rds:ModifyDBInstance',)
1899 conversion_map = {
1900 'DBSubnetGroupName': 'DBSubnetGroup.DBSubnetGroupName',
1901 'VpcSecurityGroupIds': 'VpcSecurityGroups[].VpcSecurityGroupId',
1902 'DBParameterGroupName': 'DBParameterGroups[].DBParameterGroupName',
1903 'OptionGroupName': 'OptionGroupMemberships[].OptionGroupName',
1904 'NewDBInstanceIdentifier': 'DBInstanceIdentifier',
1905 'Domain': 'DomainMemberships[].DomainName',
1906 'DBPortNumber': 'Endpoint.Port',
1907 'EnablePerformanceInsights': 'PerformanceInsightsEnabled',
1908 'CloudwatchLogsExportConfiguration': 'EnabledCloudwatchLogsExports'
1909 }
1910
1911 def validate(self):
1912 if self.data.get('update'):
1913 update_dict = dict((i['property'], i['value']) for i in self.data.get('update'))
1914 if ('MonitoringInterval' in update_dict and update_dict['MonitoringInterval'] > 0 and
1915 'MonitoringRoleARN' not in update_dict):
1916 raise PolicyValidationError(
1917 "A MonitoringRoleARN value is required \
1918 if you specify a MonitoringInterval value other than 0")
1919 if ('CloudwatchLogsExportConfiguration' in update_dict
1920 and all(
1921 k not in update_dict.get('CloudwatchLogsExportConfiguration')
1922 for k in ('EnableLogTypes', 'DisableLogTypes'))):
1923 raise PolicyValidationError(
1924 "A EnableLogTypes or DisableLogTypes input list is required\
1925 for setting CloudwatchLogsExportConfiguration")
1926 return self
1927
1928 def process(self, resources):
1929 c = local_session(self.manager.session_factory).client('rds')
1930 for r in resources:
1931 param = {
1932 u['property']: u['value'] for u in self.data.get('update')
1933 if r.get(
1934 u['property'],
1935 jmespath_search(
1936 self.conversion_map.get(u['property'], 'None'), r))
1937 != u['value']}
1938 if not param:
1939 continue
1940 param['ApplyImmediately'] = self.data.get('immediate', False)
1941 param['DBInstanceIdentifier'] = r['DBInstanceIdentifier']
1942 try:
1943 c.modify_db_instance(**param)
1944 except c.exceptions.DBInstanceNotFoundFault:
1945 raise
1946
1947
1948@resources.register('rds-reserved')
1949class ReservedRDS(QueryResourceManager):
1950 """Lists all active rds reservations
1951
1952 :example:
1953
1954 .. code-block:: yaml
1955
1956 policies:
1957 - name: existing-rds-reservations
1958 resource: rds-reserved
1959 filters:
1960 - State: active
1961 """
1962
1963 class resource_type(TypeInfo):
1964 service = 'rds'
1965 name = id = 'ReservedDBInstanceId'
1966 date = 'StartTime'
1967 enum_spec = (
1968 'describe_reserved_db_instances', 'ReservedDBInstances', None)
1969 filter_name = 'ReservedDBInstances'
1970 filter_type = 'list'
1971 arn_type = "ri"
1972 arn = "ReservedDBInstanceArn"
1973 permissions_enum = ('rds:DescribeReservedDBInstances',)
1974 universal_taggable = object()
1975
1976 augment = universal_augment
1977
1978
1979RDS.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter)
1980
1981
1982@filters.register('consecutive-snapshots')
1983class ConsecutiveSnapshots(Filter):
1984 """Returns instances where number of consective daily snapshots is
1985 equal to/or greater than n days.
1986
1987 :example:
1988
1989 .. code-block:: yaml
1990
1991 policies:
1992 - name: rds-daily-snapshot-count
1993 resource: rds
1994 filters:
1995 - type: consecutive-snapshots
1996 days: 7
1997 """
1998 schema = type_schema('consecutive-snapshots', days={'type': 'number', 'minimum': 1},
1999 required=['days'])
2000 permissions = ('rds:DescribeDBSnapshots', 'rds:DescribeDBInstances')
2001 annotation = 'c7n:DBSnapshots'
2002
2003 def process_resource_set(self, client, resources):
2004 rds_instances = [r['DBInstanceIdentifier'] for r in resources]
2005 paginator = client.get_paginator('describe_db_snapshots')
2006 paginator.PAGE_ITERATOR_CLS = RetryPageIterator
2007 db_snapshots = paginator.paginate(Filters=[{'Name': 'db-instance-id',
2008 'Values': rds_instances}]).build_full_result().get('DBSnapshots', [])
2009
2010 inst_map = {}
2011 for snapshot in db_snapshots:
2012 inst_map.setdefault(snapshot['DBInstanceIdentifier'], []).append(snapshot)
2013 for r in resources:
2014 r[self.annotation] = inst_map.get(r['DBInstanceIdentifier'], [])
2015
2016 def process(self, resources, event=None):
2017 client = local_session(self.manager.session_factory).client('rds')
2018 results = []
2019 retention = self.data.get('days')
2020 utcnow = datetime.datetime.utcnow()
2021 expected_dates = set()
2022 for days in range(1, retention + 1):
2023 expected_dates.add((utcnow - timedelta(days=days)).strftime('%Y-%m-%d'))
2024
2025 for resource_set in chunks(
2026 [r for r in resources if self.annotation not in r], 50):
2027 self.process_resource_set(client, resource_set)
2028
2029 for r in resources:
2030 snapshot_dates = set()
2031 for snapshot in r[self.annotation]:
2032 if snapshot['Status'] == 'available':
2033 snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d'))
2034 if expected_dates.issubset(snapshot_dates):
2035 results.append(r)
2036 return results
2037
2038
2039@filters.register('engine')
2040class EngineFilter(ValueFilter):
2041 """
2042 Filter a rds resource based on its Engine Metadata
2043
2044 :example:
2045
2046 .. code-block:: yaml
2047
2048 policies:
2049 - name: find-deprecated-versions
2050 resource: aws.rds
2051 filters:
2052 - type: engine
2053 key: Status
2054 value: deprecated
2055 """
2056
2057 schema = type_schema('engine', rinherit=ValueFilter.schema)
2058
2059 permissions = ("rds:DescribeDBEngineVersions", )
2060
2061 def process(self, resources, event=None):
2062 client = local_session(self.manager.session_factory).client('rds')
2063
2064 engines = set()
2065 engine_versions = set()
2066 for r in resources:
2067 engines.add(r['Engine'])
2068 engine_versions.add(r['EngineVersion'])
2069
2070 paginator = client.get_paginator('describe_db_engine_versions')
2071 response = paginator.paginate(
2072 Filters=[
2073 {'Name': 'engine', 'Values': list(engines)},
2074 {'Name': 'engine-version', 'Values': list(engine_versions)}
2075 ],
2076 IncludeAll=True,
2077 )
2078 all_versions = {}
2079 matched = []
2080 for page in response:
2081 for e in page['DBEngineVersions']:
2082 all_versions.setdefault(e['Engine'], {})
2083 all_versions[e['Engine']][e['EngineVersion']] = e
2084 for r in resources:
2085 v = all_versions[r['Engine']][r['EngineVersion']]
2086 if self.match(v):
2087 r['c7n:Engine'] = v
2088 matched.append(r)
2089 return matched
2090
2091
2092class DescribeDBProxy(DescribeSource):
2093 def augment(self, resources):
2094 return universal_augment(self.manager, resources)
2095
2096
2097@resources.register('rds-proxy')
2098class RDSProxy(QueryResourceManager):
2099 """Resource Manager for RDS DB Proxies
2100
2101 :example:
2102
2103 .. code-block:: yaml
2104
2105 policies:
2106 - name: rds-proxy-tls-check
2107 resource: rds-proxy
2108 filters:
2109 - type: value
2110 key: RequireTLS
2111 value: false
2112 """
2113
2114 class resource_type(TypeInfo):
2115 service = 'rds'
2116 name = id = 'DBProxyName'
2117 date = 'CreatedDate'
2118 enum_spec = ('describe_db_proxies', 'DBProxies', None)
2119 arn = 'DBProxyArn'
2120 arn_type = 'db-proxy'
2121 cfn_type = 'AWS::RDS::DBProxy'
2122 permissions_enum = ('rds:DescribeDBProxies',)
2123 universal_taggable = object()
2124
2125 source_mapping = {
2126 'describe': DescribeDBProxy,
2127 'config': ConfigSource
2128 }
2129
2130
2131@RDSProxy.action_registry.register('delete')
2132class DeleteRDSProxy(BaseAction):
2133 """
2134 Deletes a RDS Proxy
2135
2136 :example:
2137
2138 .. code-block:: yaml
2139
2140 policies:
2141 - name: delete-rds-proxy
2142 resource: aws.rds-proxy
2143 filters:
2144 - type: value
2145 key: "DBProxyName"
2146 op: eq
2147 value: "proxy-test-1"
2148 actions:
2149 - type: delete
2150 """
2151
2152 schema = type_schema('delete')
2153
2154 permissions = ('rds:DeleteDBProxy',)
2155
2156 def process(self, resources):
2157 client = local_session(self.manager.session_factory).client('rds')
2158 for r in resources:
2159 self.manager.retry(
2160 client.delete_db_proxy, DBProxyName=r['DBProxyName'],
2161 ignore_err_codes=('DBProxyNotFoundFault',
2162 'InvalidDBProxyStateFault'))
2163
2164
2165@RDSProxy.filter_registry.register('subnet')
2166class RDSProxySubnetFilter(net_filters.SubnetFilter):
2167
2168 RelatedIdsExpression = "VpcSubnetIds[]"
2169
2170
2171@RDSProxy.filter_registry.register('security-group')
2172class RDSProxySecurityGroupFilter(net_filters.SecurityGroupFilter):
2173
2174 RelatedIdsExpression = "VpcSecurityGroupIds[]"
2175
2176
2177@RDSProxy.filter_registry.register('vpc')
2178class RDSProxyVpcFilter(net_filters.VpcFilter):
2179
2180 RelatedIdsExpression = "VpcId"
2181
2182
2183@filters.register('db-option-groups')
2184class DbOptionGroups(ValueFilter):
2185 """This filter describes RDS option groups for associated RDS instances.
2186 Use this filter in conjunction with jmespath and value filter operators
2187 to filter RDS instance based on their option groups
2188
2189 :example:
2190
2191 .. code-block:: yaml
2192
2193 policies:
2194 - name: rds-data-in-transit-encrypted
2195 resource: aws.rds
2196 filters:
2197 - type: db-option-groups
2198 key: Options[].OptionName
2199 op: intersect
2200 value:
2201 - SSL
2202 - NATIVE_NETWORK_ENCRYPTION
2203
2204 :example:
2205
2206 .. code-block:: yaml
2207
2208 policies:
2209 - name: rds-oracle-encryption-in-transit
2210 resource: aws.rds
2211 filters:
2212 - Engine: oracle-ee
2213 - type: db-option-groups
2214 key: Options[].OptionSettings[?Name == 'SQLNET.ENCRYPTION_SERVER'].Value[]
2215 value:
2216 - REQUIRED
2217 """
2218
2219 schema = type_schema('db-option-groups', rinherit=ValueFilter.schema)
2220 schema_alias = False
2221 permissions = ('rds:DescribeDBInstances', 'rds:DescribeOptionGroups', )
2222 policy_annotation = 'c7n:MatchedDBOptionGroups'
2223
2224 def handle_optiongroup_cache(self, client, paginator, option_groups):
2225 ogcache = {}
2226 cache = self.manager._cache
2227
2228 with cache:
2229 for og in option_groups:
2230 cache_key = {
2231 'region': self.manager.config.region,
2232 'account_id': self.manager.config.account_id,
2233 'rds-pg': og}
2234 og_values = cache.get(cache_key)
2235 if og_values is not None:
2236 ogcache[og] = og_values
2237 continue
2238 option_groups_list = list(itertools.chain(*[p['OptionGroupsList']
2239 for p in paginator.paginate(OptionGroupName=og)]))
2240
2241 ogcache[og] = {}
2242 for option_group in option_groups_list:
2243 ogcache[og] = option_group
2244
2245 cache.save(cache_key, ogcache[og])
2246
2247 return ogcache
2248
2249 def process(self, resources, event=None):
2250 results = []
2251 client = local_session(self.manager.session_factory).client('rds')
2252 paginator = client.get_paginator('describe_option_groups')
2253 option_groups = [db['OptionGroupMemberships'][0]['OptionGroupName']
2254 for db in resources]
2255 optioncache = self.handle_optiongroup_cache(client, paginator, option_groups)
2256
2257 for resource in resources:
2258 for og in resource['OptionGroupMemberships']:
2259 og_values = optioncache[og['OptionGroupName']]
2260 if self.match(og_values):
2261 resource.setdefault(self.policy_annotation, []).append({
2262 k: jmespath_search(k, og_values)
2263 for k in {'OptionGroupName', self.data.get('key')}
2264 })
2265 results.append(resource)
2266 break
2267
2268 return results
2269
2270
2271@filters.register('pending-maintenance')
2272class PendingMaintenance(Filter):
2273 """Scan DB instances for those with pending maintenance
2274
2275 :example:
2276
2277 .. code-block:: yaml
2278
2279 policies:
2280 - name: rds-pending-maintenance
2281 resource: aws.rds
2282 filters:
2283 - pending-maintenance
2284 - type: value
2285 key: '"c7n:PendingMaintenance"[].PendingMaintenanceActionDetails[].Action'
2286 op: intersect
2287 value:
2288 - system-update
2289 """
2290
2291 annotation_key = 'c7n:PendingMaintenance'
2292 schema = type_schema('pending-maintenance')
2293 permissions = ('rds:DescribePendingMaintenanceActions',)
2294
2295 def process(self, resources, event=None):
2296 client = local_session(self.manager.session_factory).client('rds')
2297
2298 results = []
2299 resource_maintenances = {}
2300 paginator = client.get_paginator('describe_pending_maintenance_actions')
2301 for page in paginator.paginate():
2302 for action in page['PendingMaintenanceActions']:
2303 resource_maintenances.setdefault(action['ResourceIdentifier'], []).append(action)
2304
2305 for r in resources:
2306 pending_maintenances = resource_maintenances.get(r['DBInstanceArn'], [])
2307 if len(pending_maintenances) > 0:
2308 r[self.annotation_key] = pending_maintenances
2309 results.append(r)
2310
2311 return results