Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/rds.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4RDS Resource Manager
5====================
7Example Policies
8----------------
10Find rds instances that are publicly available
12.. code-block:: yaml
14 policies:
15 - name: rds-public
16 resource: rds
17 filters:
18 - PubliclyAccessible: true
20Find rds instances that are not encrypted
22.. code-block:: yaml
24 policies:
25 - name: rds-non-encrypted
26 resource: rds
27 filters:
28 - type: value
29 key: StorageEncrypted
30 value: true
31 op: ne
33"""
34import functools
35import itertools
36import logging
37import operator
38import re
39import datetime
41from datetime import timedelta
43from decimal import Decimal as D, ROUND_HALF_UP
45from c7n.vendored.distutils.version import LooseVersion
46from botocore.exceptions import ClientError
47from concurrent.futures import as_completed
49from c7n.actions import (
50 ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction)
52from c7n.exceptions import PolicyValidationError
53from c7n.filters import (
54 CrossAccountAccessFilter, FilterRegistry, Filter, ValueFilter, AgeFilter)
55from c7n.filters.offhours import OffHour, OnHour
56from c7n.filters import related
57import c7n.filters.vpc as net_filters
58from c7n.manager import resources
59from c7n.query import (
60 QueryResourceManager, DescribeSource, ConfigSource, TypeInfo, RetryPageIterator)
61from c7n import deprecated, tags
62from c7n.tags import universal_augment
64from c7n.utils import (
65 local_session, type_schema, get_retry, chunks, snapshot_identifier,
66 merge_dict_list, filter_empty, jmespath_search)
67from c7n.resources.kms import ResourceKmsKeyAlias
68from c7n.resources.securityhub import PostFinding
69from c7n.filters.backup import ConsecutiveAwsBackupsFilter
# Module-level logger plus the shared filter/action registries that the
# 'rds' resource type below is wired to via filter_registry/action_registry.
log = logging.getLogger('custodian.rds')

filters = FilterRegistry('rds.filters')
actions = ActionRegistry('rds.actions')
class DescribeRDS(DescribeSource):
    """Describe-api source for rds db instances.

    Normalizes each record's ``TagList`` attribute into the ``Tags`` key
    that the rest of custodian expects.
    """

    def augment(self, dbs):
        for record in dbs:
            record['Tags'] = record.pop('TagList', ())
        return dbs
class ConfigRDS(ConfigSource):
    """Config-api source for rds db instances.

    Config records use ``Db``-prefixed attribute names (e.g. ``DbInstanceArn``)
    where the describe api uses ``DB``; mirror every such key under its
    describe-style ``DB`` spelling so policies work against either source.
    """

    def load_resource(self, item):
        resource = super().load_resource(item)
        db_prefixed = [k for k in resource if k.startswith('Db')]
        for key in db_prefixed:
            resource["DB%s" % key[2:]] = resource[key]
        return resource
@resources.register('rds')
class RDS(QueryResourceManager):
    """Resource manager for RDS DB instances.
    """

    class resource_type(TypeInfo):
        service = 'rds'
        arn_type = 'db'
        arn_separator = ':'
        # API call / result key used to enumerate instances.
        enum_spec = ('describe_db_instances', 'DBInstances', None)
        id = 'DBInstanceIdentifier'
        config_id = 'DbiResourceId'
        name = 'Endpoint.Address'
        # Server-side filter parameter for targeted describes.
        filter_name = 'DBInstanceIdentifier'
        filter_type = 'scalar'
        date = 'InstanceCreateTime'
        # Cloudwatch metrics dimension for the metrics filter.
        dimension = 'DBInstanceIdentifier'
        cfn_type = config_type = 'AWS::RDS::DBInstance'
        arn = 'DBInstanceArn'
        universal_taggable = True
        default_report_fields = (
            'DBInstanceIdentifier',
            'DBName',
            'Engine',
            'EngineVersion',
            'MultiAZ',
            'AllocatedStorage',
            'StorageEncrypted',
            'PubliclyAccessible',
            'InstanceCreateTime',
        )
        permissions_enum = ('rds:DescribeDBInstances',)

    filter_registry = filters
    action_registry = actions

    def resources(self, query=None):
        # Merge any policy-specified query parameters into the describe call
        # when the caller did not pass an explicit query.
        if query is None and 'query' in self.data:
            query = merge_dict_list(self.data['query'])
        elif query is None:
            query = {}
        return super(RDS, self).resources(query=query)

    source_mapping = {
        'describe': DescribeRDS,
        'config': ConfigRDS
    }
144def _db_instance_eligible_for_backup(resource):
145 db_instance_id = resource['DBInstanceIdentifier']
147 # Database instance is not in available state
148 if resource.get('DBInstanceStatus', '') != 'available':
149 log.debug(
150 "DB instance %s is not in available state",
151 db_instance_id)
152 return False
153 # The specified DB Instance is a member of a cluster and its
154 # backup retention should not be modified directly. Instead,
155 # modify the backup retention of the cluster using the
156 # ModifyDbCluster API
157 if resource.get('DBClusterIdentifier', ''):
158 log.debug(
159 "DB instance %s is a cluster member",
160 db_instance_id)
161 return False
162 # DB Backups not supported on a read replica for engine postgres
163 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
164 resource.get('Engine', '') == 'postgres'):
165 log.debug(
166 "DB instance %s is a postgres read-replica",
167 db_instance_id)
168 return False
169 # DB Backups not supported on a read replica running a mysql
170 # version before 5.6
171 if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
172 resource.get('Engine', '') == 'mysql'):
173 engine_version = resource.get('EngineVersion', '')
174 # Assume "<major>.<minor>.<whatever>"
175 match = re.match(r'(?P<major>\d+)\.(?P<minor>\d+)\..*', engine_version)
176 if (match and int(match.group('major')) < 5 or
177 (int(match.group('major')) == 5 and int(match.group('minor')) < 6)):
178 log.debug(
179 "DB instance %s is a version %s mysql read-replica",
180 db_instance_id,
181 engine_version)
182 return False
183 return True
186def _db_instance_eligible_for_final_snapshot(resource):
187 status = resource.get('DBInstanceStatus', '')
188 # If the DB instance you are deleting has a status of "Creating,"
189 # you will not be able to have a final DB snapshot taken
190 # If the DB instance is in a failure state with a status of "failed,"
191 # "incompatible-restore," or "incompatible-network," you can only delete
192 # the instance when the SkipFinalSnapshot parameter is set to "true."
193 eligible_for_final_snapshot = True
194 if status in ['creating', 'failed', 'incompatible-restore', 'incompatible-network']:
195 eligible_for_final_snapshot = False
197 # FinalDBSnapshotIdentifier can not be specified when deleting a
198 # replica instance
199 if resource.get('ReadReplicaSourceDBInstanceIdentifier', ''):
200 eligible_for_final_snapshot = False
202 # if it's a rds-cluster, don't try to run the rds instance snapshot api call
203 if resource.get('DBClusterIdentifier', False):
204 eligible_for_final_snapshot = False
206 if not eligible_for_final_snapshot:
207 log.debug('DB instance is not eligible for a snapshot:/n %s', resource)
208 return eligible_for_final_snapshot
def _get_available_engine_upgrades(client, major=False):
    """Returns all extant rds engine upgrades.

    As a nested mapping of engine type to known versions
    and their upgrades.

    Defaults to minor upgrades, but configurable to major.

    Example::

      >>> _get_available_engine_upgrades(client)
      {
         'oracle-se2': {'12.1.0.2.v2': '12.1.0.2.v5',
                        '12.1.0.2.v3': '12.1.0.2.v5'},
         'postgres': {'9.3.1': '9.3.14',
                      '9.3.10': '9.3.14',
                      '9.3.12': '9.3.14',
                      '9.3.2': '9.3.14'}
      }
    """
    upgrades = {}
    paginator = client.get_paginator('describe_db_engine_versions')
    for page in paginator.paginate():
        for version_info in page['DBEngineVersions']:
            engine = version_info['Engine']
            version = version_info['EngineVersion']
            # ensure every engine appears in the result, even with no targets
            engine_map = upgrades.setdefault(engine, {})
            for target in version_info.get('ValidUpgradeTarget', ()):
                if target['IsMajorVersionUpgrade'] and not major:
                    continue
                # keep only the highest upgrade target seen per version
                best_so_far = engine_map.get(version, '0.0.0')
                if LooseVersion(target['EngineVersion']) > LooseVersion(best_so_far):
                    engine_map[version] = target['EngineVersion']
    return upgrades
# Scheduling filters shared across resource types; register them for rds.
filters.register('offhour', OffHour)
filters.register('onhour', OnHour)
@filters.register('default-vpc')
class DefaultVpc(net_filters.DefaultVpcBase):
    """Matches rds databases that live in the account's default vpc.

    :example:

    .. code-block:: yaml

            policies:
              - name: default-vpc-rds
                resource: rds
                filters:
                  - type: default-vpc
    """
    schema = type_schema('default-vpc')

    def __call__(self, rdb):
        vpc_id = rdb['DBSubnetGroup']['VpcId']
        return self.match(vpc_id)
@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Filter rds instances by attributes of their vpc security groups."""

    # jmespath to the security group ids on the instance record
    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"
@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Filter rds instances by attributes of their subnets."""

    # jmespath to the subnet ids in the instance's db subnet group
    RelatedIdsExpression = "DBSubnetGroup.Subnets[].SubnetIdentifier"
@filters.register('vpc')
class VpcFilter(net_filters.VpcFilter):
    """Filter rds instances by attributes of their vpc."""

    # jmespath to the vpc id on the instance record
    RelatedIdsExpression = "DBSubnetGroup.VpcId"
291filters.register('network-location', net_filters.NetworkLocation)
@filters.register('kms-alias')
class KmsKeyAlias(ResourceKmsKeyAlias):
    """Filter rds instances by the alias of their kms encryption key."""

    def process(self, dbs, event=None):
        # Matching logic lives on the shared ResourceKmsKeyAlias base.
        return self.get_matching_aliases(dbs)
@actions.register('auto-patch')
class AutoPatch(BaseAction):
    """Toggle AutoMinorUpgrade flag on RDS instance

    'window' parameter needs to be in the format 'ddd:hh:mm-ddd:hh:mm' and
    have at least 30 minutes between start & end time.
    If 'window' is not specified, AWS will assign a random maintenance window
    to each instance selected.

    :example:

    .. code-block:: yaml

            policies:
              - name: enable-rds-autopatch
                resource: rds
                filters:
                  - AutoMinorVersionUpgrade: false
                actions:
                  - type: auto-patch
                    minor: true
                    window: Mon:23:00-Tue:01:00
    """

    schema = type_schema(
        'auto-patch',
        minor={'type': 'boolean'}, window={'type': 'string'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        client = local_session(
            self.manager.session_factory).client('rds')

        # Default to enabling minor version upgrades when unspecified.
        params = {'AutoMinorVersionUpgrade': self.data.get('minor', True)}
        window = self.data.get('window')
        if window:
            params['PreferredMaintenanceWindow'] = self.data['window']

        for db in dbs:
            client.modify_db_instance(
                DBInstanceIdentifier=db['DBInstanceIdentifier'], **params)
@filters.register('upgrade-available')
class UpgradeAvailable(Filter):
    """ Scan DB instances for available engine upgrades

    This will pull DB instances & check their specific engine for any
    engine version with higher release numbers than the current one

    This will also annotate the rds instance with 'target_engine' which is
    the most recent version of the engine available

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-upgrade-available
                resource: rds
                filters:
                  - type: upgrade-available
                    major: False

    """

    schema = type_schema('upgrade-available',
                         major={'type': 'boolean'},
                         value={'type': 'boolean'})
    permissions = ('rds:DescribeDBEngineVersions',)

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')
        # value: false inverts the filter to match instances *without*
        # an available upgrade.
        require_upgrade = self.data.get('value', True)
        upgrades = _get_available_engine_upgrades(
            client, major=self.data.get('major', False))

        matched = []
        for resource in resources:
            engine_map = upgrades.get(resource['Engine'], {})
            target = engine_map.get(resource['EngineVersion'])
            if target is None:
                if require_upgrade is False:
                    matched.append(resource)
                continue
            # annotate the newest engine version for the upgrade action
            resource['c7n-rds-engine-upgrade'] = target
            matched.append(resource)
        return matched
@actions.register('upgrade')
class UpgradeMinor(BaseAction):
    """Upgrades a RDS instance to the latest major/minor version available

    Use of the 'immediate' flag (default False) will automatically upgrade
    the RDS engine disregarding the existing maintenance window.

    :example:

    .. code-block:: yaml

            policies:
              - name: upgrade-rds-minor
                resource: rds
                actions:
                  - type: upgrade
                    major: False
                    immediate: False

    """

    schema = type_schema(
        'upgrade',
        major={'type': 'boolean'},
        immediate={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # Lazily populated only if some resource lacks the annotation set
        # by the upgrade-available filter.
        engine_upgrades = None
        for r in resources:
            if 'EngineVersion' in r['PendingModifiedValues']:
                # Upgrade has already been scheduled
                continue
            if 'c7n-rds-engine-upgrade' not in r:
                if engine_upgrades is None:
                    engine_upgrades = _get_available_engine_upgrades(
                        client, major=self.data.get('major', False))
                target = engine_upgrades.get(
                    r['Engine'], {}).get(r['EngineVersion'])
                if target is None:
                    # No newer version exists; acts as an implicit filter.
                    log.debug(
                        "implicit filter no upgrade on %s",
                        r['DBInstanceIdentifier'])
                    continue
                r['c7n-rds-engine-upgrade'] = target
            client.modify_db_instance(
                DBInstanceIdentifier=r['DBInstanceIdentifier'],
                EngineVersion=r['c7n-rds-engine-upgrade'],
                ApplyImmediately=self.data.get('immediate', False))
@actions.register('tag-trim')
class TagTrim(tags.TagTrim):
    """Trim rds instance tags to stay under the per-resource tag limit."""

    permissions = ('rds:RemoveTagsFromResource',)

    def process_tag_removal(self, client, resource, candidates):
        # The base class selects candidate keys; we only issue the removal.
        client.remove_tags_from_resource(ResourceName=resource['DBInstanceArn'], TagKeys=candidates)
# Engines whose instances support the rds stop/start apis.
# FIX: the original literal listed 'sqlserver-ee' twice; deduplicated and
# grouped by engine family (the resulting set value is unchanged).
START_STOP_ELIGIBLE_ENGINES = {
    'mariadb', 'mysql', 'postgres',
    'oracle-ee', 'oracle-ee-cdb', 'oracle-se', 'oracle-se1',
    'oracle-se2', 'oracle-se2-cdb',
    'sqlserver-ee', 'sqlserver-ex', 'sqlserver-se', 'sqlserver-web',
    'db2-ae', 'db2-se'}
462def _eligible_start_stop(db, state="available"):
463 # See conditions noted here
464 # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
465 # Note that this doesn't really specify what happens for all the nosql engines
466 # that are available as rds engines.
467 if db.get('DBInstanceStatus') != state:
468 return False
470 if db.get('MultiAZ') and db['Engine'].startswith('sqlserver-'):
471 return False
473 if db['Engine'] not in START_STOP_ELIGIBLE_ENGINES:
474 return False
476 if db.get('ReadReplicaDBInstanceIdentifiers'):
477 return False
479 if db.get('ReadReplicaSourceDBInstanceIdentifier'):
480 return False
482 # TODO is SQL Server mirror is detectable.
483 return True
@actions.register('stop')
class Stop(BaseAction):
    """Stop an rds instance.

    https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
    """

    schema = type_schema('stop')

    permissions = ("rds:StopDBInstance",)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        for resource in resources:
            # skip instances the service will not allow us to stop
            if not _eligible_start_stop(resource):
                continue
            try:
                client.stop_db_instance(
                    DBInstanceIdentifier=resource['DBInstanceIdentifier'])
            except ClientError as e:
                log.exception(
                    "Error stopping db instance:%s err:%s",
                    resource['DBInstanceIdentifier'], e)
@actions.register('start')
class Start(BaseAction):
    """Start an rds instance.
    """

    schema = type_schema('start')

    permissions = ("rds:StartDBInstance",)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        for resource in resources:
            # only stopped instances are eligible to be started
            if not _eligible_start_stop(resource, state='stopped'):
                continue
            try:
                client.start_db_instance(
                    DBInstanceIdentifier=resource['DBInstanceIdentifier'])
            except ClientError as e:
                log.exception(
                    "Error starting db instance:%s err:%s",
                    resource['DBInstanceIdentifier'], e)
@actions.register('delete')
class Delete(BaseAction):
    """Deletes selected RDS instances

    This will delete RDS instances. It is recommended to apply with a filter
    to avoid deleting all RDS instances in the account.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-delete
                resource: rds
                filters:
                  - default-vpc
                actions:
                  - type: delete
                    skip-snapshot: true
    """

    schema = type_schema('delete', **{
        'skip-snapshot': {'type': 'boolean'},
        'copy-restore-info': {'type': 'boolean'}
    })

    permissions = ('rds:DeleteDBInstance', 'rds:AddTagsToResource')

    def validate(self):
        # copy-restore-info stores restore metadata as tags on the final
        # snapshot, so it cannot be combined with skip-snapshot.
        if self.data.get('skip-snapshot', False) and self.data.get(
                'copy-restore-info'):
            raise PolicyValidationError(
                "skip-snapshot cannot be specified with copy-restore-info on %s" % (
                    self.manager.data,))
        return self

    def process(self, dbs):
        skip = self.data.get('skip-snapshot', False)

        # Can't delete an instance in an aurora cluster, use a policy on the cluster
        dbs = [r for r in dbs if not r.get('DBClusterIdentifier')]

        # Concurrency feels like overkill here.
        client = local_session(self.manager.session_factory).client('rds')

        for db in dbs:
            params = dict(
                DBInstanceIdentifier=db['DBInstanceIdentifier'])
            # Skip the final snapshot when asked to, or when the instance's
            # state makes one impossible (see the eligibility helper).
            if skip or not _db_instance_eligible_for_final_snapshot(db):
                params['SkipFinalSnapshot'] = True
            else:
                params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                    'Final', db['DBInstanceIdentifier'])
            if self.data.get('copy-restore-info', False):
                # Record restore metadata as instance tags (copied onto the
                # final snapshot) before deleting.
                self.copy_restore_info(client, db)
                if not db['CopyTagsToSnapshot']:
                    client.modify_db_instance(
                        DBInstanceIdentifier=db['DBInstanceIdentifier'],
                        CopyTagsToSnapshot=True)
            self.log.info(
                "Deleting rds: %s snapshot: %s",
                db['DBInstanceIdentifier'],
                params.get('FinalDBSnapshotIdentifier', False))

            try:
                client.delete_db_instance(**params)
            except ClientError as e:
                # Already deleting / being modified elsewhere; best effort.
                if e.response['Error']['Code'] == "InvalidDBInstanceState":
                    continue
                raise

        return dbs

    def copy_restore_info(self, client, instance):
        # Tag the instance with the settings needed to recreate it from the
        # final snapshot (security groups, option/parameter groups, class,
        # storage, multi-az, subnet group).
        tags = []
        tags.append({
            'Key': 'VPCSecurityGroups',
            'Value': ''.join([
                g['VpcSecurityGroupId'] for g in instance['VpcSecurityGroups']
            ])})
        tags.append({
            'Key': 'OptionGroupName',
            'Value': instance['OptionGroupMemberships'][0]['OptionGroupName']})
        tags.append({
            'Key': 'ParameterGroupName',
            'Value': instance['DBParameterGroups'][0]['DBParameterGroupName']})
        tags.append({
            'Key': 'InstanceClass',
            'Value': instance['DBInstanceClass']})
        tags.append({
            'Key': 'StorageType',
            'Value': instance['StorageType']})
        tags.append({
            'Key': 'MultiAZ',
            'Value': str(instance['MultiAZ'])})
        tags.append({
            'Key': 'DBSubnetGroupName',
            'Value': instance['DBSubnetGroup']['DBSubnetGroupName']})
        client.add_tags_to_resource(
            ResourceName=self.manager.generate_arn(
                instance['DBInstanceIdentifier']),
            Tags=tags)
@actions.register('set-snapshot-copy-tags')
class CopySnapshotTags(BaseAction):
    """Enables copying tags from rds instance to snapshot

    DEPRECATED - use modify-db instead with `CopyTagsToSnapshot`

    :example:

    .. code-block:: yaml

            policies:
              - name: enable-rds-snapshot-tags
                resource: rds
                filters:
                  - type: value
                    key: Engine
                    value: aurora
                    op: eq
                actions:
                  - type: set-snapshot-copy-tags
                    enable: True
    """
    deprecations = (
        deprecated.action("use modify-db instead with `CopyTagsToSnapshot`"),
    )

    schema = type_schema(
        'set-snapshot-copy-tags',
        enable={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        # Track the last failure so it can be re-raised after all workers
        # finish; every failure is still logged individually.
        error = None
        with self.executor_factory(max_workers=2) as w:
            futures = {}
            client = local_session(self.manager.session_factory).client('rds')
            # Only touch instances whose flag differs from the desired state.
            resources = [r for r in resources
                         if r['CopyTagsToSnapshot'] != self.data.get('enable', True)]
            for r in resources:
                futures[w.submit(self.set_snapshot_tags, client, r)] = r
            for f in as_completed(futures):
                if f.exception():
                    error = f.exception()
                    self.log.error(
                        'error updating rds:%s CopyTagsToSnapshot \n %s',
                        futures[f]['DBInstanceIdentifier'], error)
        if error:
            raise error
        return resources

    def set_snapshot_tags(self, client, r):
        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            CopyTagsToSnapshot=self.data.get('enable', True))
@RDS.action_registry.register('post-finding')
class DbInstanceFinding(PostFinding):
    """Format an rds instance as an AwsRdsDbInstance security hub finding."""

    resource_type = 'AwsRdsDbInstance'

    def format_resource(self, r):
        # Attributes copied verbatim into the finding details when present
        # and truthy on the instance record.
        fields = [
            'AssociatedRoles', 'CACertificateIdentifier', 'DBClusterIdentifier',
            'DBInstanceIdentifier', 'DBInstanceClass', 'DbInstancePort', 'DbiResourceId',
            'DBName', 'DeletionProtection', 'Endpoint', 'Engine', 'EngineVersion',
            'IAMDatabaseAuthenticationEnabled', 'InstanceCreateTime', 'KmsKeyId',
            'PubliclyAccessible', 'StorageEncrypted',
            'TdeCredentialArn', 'VpcSecurityGroups', 'MultiAz', 'EnhancedMonitoringResourceArn',
            'DbInstanceStatus', 'MasterUsername',
            'AllocatedStorage', 'PreferredBackupWindow', 'BackupRetentionPeriod',
            'DbSecurityGroups', 'DbParameterGroups',
            'AvailabilityZone', 'DbSubnetGroup', 'PreferredMaintenanceWindow',
            'PendingModifiedValues', 'LatestRestorableTime',
            'AutoMinorVersionUpgrade', 'ReadReplicaSourceDBInstanceIdentifier',
            'ReadReplicaDBInstanceIdentifiers',
            'ReadReplicaDBClusterIdentifiers', 'LicenseModel', 'Iops', 'OptionGroupMemberships',
            'CharacterSetName',
            'SecondaryAvailabilityZone', 'StatusInfos', 'StorageType', 'DomainMemberships',
            'CopyTagsToSnapshot',
            'MonitoringInterval', 'MonitoringRoleArn', 'PromotionTier', 'Timezone',
            'PerformanceInsightsEnabled',
            'PerformanceInsightsKmsKeyId', 'PerformanceInsightsRetentionPeriod',
            'EnabledCloudWatchLogsExports',
            'ProcessorFeatures', 'ListenerEndpoint', 'MaxAllocatedStorage'
        ]
        details = {}
        for field in fields:
            value = r.get(field)
            if not value:
                continue
            # security hub requires ISO strings for timestamps
            if isinstance(value, datetime.datetime):
                value = value.isoformat()
            details[field] = value

        finding = {
            'Type': self.resource_type,
            'Id': r['DBInstanceArn'],
            'Region': self.manager.config.region,
            'Tags': {t['Key']: t['Value'] for t in r.get('Tags', [])},
            'Details': {self.resource_type: filter_empty(details)},
        }
        return filter_empty(finding)
@actions.register('snapshot')
class Snapshot(BaseAction):
    """Creates a manual snapshot of a RDS instance

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot
                resource: rds
                actions:
                  - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('rds:CreateDBSnapshot',)

    def process(self, dbs):
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_rds_snapshot, db) for db in dbs]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception creating rds snapshot \n %s",
                        f.exception())
        return dbs

    def process_rds_snapshot(self, resource):
        # instances in clusters / ineligible states are silently skipped
        if not _db_instance_eligible_for_backup(resource):
            return

        client = local_session(self.manager.session_factory).client('rds')
        snapshot_id = snapshot_identifier(
            self.data.get('snapshot-prefix', 'Backup'),
            resource['DBInstanceIdentifier'])
        client.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_id,
            DBInstanceIdentifier=resource['DBInstanceIdentifier'])
@actions.register('resize')
class ResizeInstance(BaseAction):
    """Change the allocated storage of an rds instance.

    :example:

    This will find databases using over 85% of their allocated
    storage, and resize them to have an additional 30% storage
    the resize here is async during the next maintenance.

    .. code-block:: yaml

            policies:
              - name: rds-resize-up
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: 30


    This will find databases using under 20% of their allocated
    storage, and resize them to be 30% smaller, the resize here
    is configured to be immediate.

    .. code-block:: yaml

            policies:
              - name: rds-resize-down
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: -30
                    immediate: true
    """
    schema = type_schema(
        'resize',
        percent={'type': 'number'},
        immediate={'type': 'boolean'})

    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        for resource in resources:
            current = D(resource['AllocatedStorage'])
            # scale by (100 + percent) / 100, rounding half-up to whole GB
            scale = (D(100) + D(self.data['percent'])) / D(100)
            new_size = int((scale * current).quantize(D('0'), ROUND_HALF_UP))
            client.modify_db_instance(
                DBInstanceIdentifier=resource['DBInstanceIdentifier'],
                AllocatedStorage=new_size,
                ApplyImmediately=self.data.get('immediate', False))
@actions.register('retention')
class RetentionWindow(BaseAction):
    """
    Sets the 'BackupRetentionPeriod' value for automated snapshots,
    enforce (min, max, exact) sets retention days occordingly.
    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-retention
                resource: rds
                filters:
                  - type: value
                    key: BackupRetentionPeriod
                    value: 7
                    op: lt
                actions:
                  - type: retention
                    days: 7
                    copy-tags: true
                    enforce: exact
    """

    date_attribute = "BackupRetentionPeriod"
    schema = type_schema(
        'retention', **{'days': {'type': 'number'},
                        'copy-tags': {'type': 'boolean'},
                        'enforce': {'type': 'string', 'enum': [
                            'min', 'max', 'exact']}})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for db in dbs:
                futures.append(w.submit(
                    self.process_snapshot_retention,
                    db))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting rds retention  \n %s",
                        f.exception())
        return dbs

    def process_snapshot_retention(self, resource):
        current_retention = int(resource.get('BackupRetentionPeriod', 0))
        current_copy_tags = resource['CopyTagsToSnapshot']
        new_retention = self.data['days']
        new_copy_tags = self.data.get('copy-tags', True)
        retention_type = self.data.get('enforce', 'min').lower()

        # NOTE(review): each branch below also fires when only the
        # copy-tags flag differs, which means e.g. an 'exact' policy whose
        # copy-tags differs takes the first ('min') branch and applies
        # max(current, new) rather than the exact value — looks
        # intentional-by-accident upstream; confirm before changing.
        if ((retention_type == 'min' or
             current_copy_tags != new_copy_tags) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(
                resource,
                max(current_retention, new_retention),
                new_copy_tags)
            return resource

        if ((retention_type == 'max' or
             current_copy_tags != new_copy_tags) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(
                resource,
                min(current_retention, new_retention),
                new_copy_tags)
            return resource

        if ((retention_type == 'exact' or
             current_copy_tags != new_copy_tags) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(resource, new_retention, new_copy_tags)
            return resource

    def set_retention_window(self, resource, retention, copy_tags):
        c = local_session(self.manager.session_factory).client('rds')
        c.modify_db_instance(
            DBInstanceIdentifier=resource['DBInstanceIdentifier'],
            BackupRetentionPeriod=retention,
            CopyTagsToSnapshot=copy_tags)
@actions.register('set-public-access')
class RDSSetPublicAvailability(BaseAction):
    """
    This action allows for toggling an RDS instance
    'PubliclyAccessible' flag to true or false

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-rds-public-accessibility
                resource: rds
                filters:
                  - PubliclyAccessible: true
                actions:
                  - type: set-public-access
                    state: false
    """

    schema = type_schema(
        "set-public-access",
        state={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def set_accessibility(self, r):
        client = local_session(self.manager.session_factory).client('rds')
        # default to disabling public access when no state given
        client.modify_db_instance(
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            PubliclyAccessible=self.data.get('state', False))

    def process(self, rds):
        with self.executor_factory(max_workers=2) as w:
            futures = {}
            for r in rds:
                futures[w.submit(self.set_accessibility, r)] = r
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting public access on %s  \n %s",
                        futures[f]['DBInstanceIdentifier'], f.exception())
        return rds
@resources.register('rds-subscription')
class RDSSubscription(QueryResourceManager):
    """Resource manager for rds event subscriptions."""

    class resource_type(TypeInfo):
        service = 'rds'
        arn_type = 'es'
        cfn_type = 'AWS::RDS::EventSubscription'
        enum_spec = (
            'describe_event_subscriptions', 'EventSubscriptionsList', None)
        name = id = "CustSubscriptionId"
        arn = 'EventSubscriptionArn'
        date = "SubscriptionCreateTime"
        permissions_enum = ('rds:DescribeEventSubscriptions',)
        universal_taggable = object()

    # Resolve tags via the universal resource-groups tagging api.
    augment = universal_augment
@RDSSubscription.filter_registry.register('topic')
class RDSSubscriptionSNSTopic(related.RelatedResourceFilter):
    """
    Retrieves topics related to RDS event subscriptions

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subscriptions-no-confirmed-topics
                resource: aws.rds-subscription
                filters:
                  - type: topic
                    key: SubscriptionsConfirmed
                    value: 0
                    value_type: integer
    """
    schema = type_schema('topic', rinherit=ValueFilter.schema)
    RelatedResource = 'c7n.resources.sns.SNS'
    RelatedIdsExpression = 'SnsTopicArn'
    annotation_key = 'c7n:SnsTopic'

    def process(self, resources, event=None):
        related_topics = self.get_related(resources)
        matched = []
        for resource in resources:
            if not self.process_resource(resource, related_topics):
                continue
            # annotate the subscription with the full topic details
            # (None when matching on an absent topic)
            resource[self.annotation_key] = related_topics.get(
                resource[self.RelatedIdsExpression])
            matched.append(resource)
        return matched
@RDSSubscription.action_registry.register('delete')
class RDSSubscriptionDelete(BaseAction):
    """Deletes a RDS snapshot resource

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subscription-delete
                resource: rds-subscription
                filters:
                  - type: value
                    key: CustSubscriptionId
                    value: xyz
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteEventSubscription',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        for resource in resources:
            # tolerate races: already-deleted or state-transitioning
            # subscriptions are ignored by the retry wrapper
            self.manager.retry(
                client.delete_event_subscription,
                SubscriptionName=resource['CustSubscriptionId'],
                ignore_err_codes=(
                    'SubscriptionNotFoundFault',
                    'InvalidEventSubscriptionStateFault'))
class DescribeRDSSnapshot(DescribeSource):
    """Describe-api source for rds snapshots."""

    def get_resources(self, ids, cache=True):
        # Fan out one describe call per snapshot id and flatten the
        # results (presumably the api filter only accepts a single
        # identifier per call — confirm against describe_db_snapshots).
        fetch = super().get_resources
        results = []
        for snapshot_id in ids:
            results.extend(fetch((snapshot_id,)))
        return results

    def augment(self, snaps):
        # normalize TagList into the Tags key custodian expects
        for snap in snaps:
            snap['Tags'] = snap.pop('TagList', ())
        return snaps
@resources.register('rds-snapshot')
class RDSSnapshot(QueryResourceManager):
    """Resource manager for RDS DB snapshots.
    """

    class resource_type(TypeInfo):
        service = 'rds'
        arn_type = 'snapshot'
        arn_separator = ':'
        # API call / result key used to enumerate snapshots.
        enum_spec = ('describe_db_snapshots', 'DBSnapshots', None)
        name = id = 'DBSnapshotIdentifier'
        date = 'SnapshotCreateTime'
        config_type = "AWS::RDS::DBSnapshot"
        # Server-side filter parameter for targeted describes.
        filter_name = "DBSnapshotIdentifier"
        filter_type = "scalar"
        universal_taggable = True
        permissions_enum = ('rds:DescribeDBSnapshots',)

    source_mapping = {
        'describe': DescribeRDSSnapshot,
        'config': ConfigSource
    }
@RDSSnapshot.filter_registry.register('onhour')
class RDSSnapshotOnHour(OnHour):
    """Scheduled action on rds snapshot."""
    # Behavior is inherited unchanged from the shared OnHour filter.
@RDSSnapshot.filter_registry.register('instance')
class SnapshotInstance(related.RelatedResourceFilter):
    """Filter snapshots by their database attributes.

    :example:

      Find snapshots without an extant database

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-orphan
                resource: aws.rds-snapshot
                filters:
                  - type: instance
                    value: 0
                    value_type: resource_count
    """
    schema = type_schema(
        'instance', rinherit=ValueFilter.schema
    )

    # The related resource is the db instance the snapshot was taken from.
    RelatedResource = "c7n.resources.rds.RDS"
    RelatedIdsExpression = "DBInstanceIdentifier"
    # NOTE(review): threshold consumed by RelatedResourceFilter — presumably
    # switches between per-id fetch and full enumeration; confirm in base.
    FetchThreshold = 5
@RDSSnapshot.filter_registry.register('latest')
class LatestSnapshot(Filter):
    """Return the latest snapshot for each database.
    """
    schema = type_schema('latest', automatic={'type': 'boolean'})
    permissions = ('rds:DescribeDBSnapshots',)

    def process(self, resources, event=None):
        """Reduce resources to the newest snapshot per database.

        With ``automatic: false`` only manual snapshots are considered.
        """
        results = []
        if not self.data.get('automatic', True):
            resources = [r for r in resources if r['SnapshotType'] == 'manual']
        # itertools.groupby only groups *consecutive* equal keys, so the
        # input must be sorted on the group key first; otherwise a database
        # whose snapshots are interleaved with another's would contribute
        # more than one "latest" entry.
        group_key = operator.itemgetter('DBInstanceIdentifier')
        for db_identifier, snapshots in itertools.groupby(
                sorted(resources, key=group_key), group_key):
            results.append(
                max(snapshots, key=operator.itemgetter('SnapshotCreateTime')))
        return results
@RDSSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filters RDS snapshots based on age (in days)

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-expired
                resource: rds-snapshot
                filters:
                  - type: age
                    days: 28
                    op: ge
                actions:
                  - delete
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    # attribute on the snapshot holding its creation timestamp
    date_attribute = 'SnapshotCreateTime'

    def get_resource_date(self, resource):
        """Return the snapshot's creation time (None when absent)."""
        return resource.get(self.date_attribute)
@RDSSnapshot.action_registry.register('restore')
class RestoreInstance(BaseAction):
    """Restore an rds instance from a snapshot.

    Note this requires the snapshot or db deletion be taken
    with the `copy-restore-info` boolean flag set to true, as
    various instance metadata is stored on the snapshot as tags.

    Additional parameters to the restore db instance api call can be
    overridden via `restore_options` settings. Various modify db instance
    parameters can be specified via `modify_options` settings.
    """

    schema = type_schema(
        'restore',
        restore_options={'type': 'object'},
        modify_options={'type': 'object'})

    permissions = (
        'rds:ModifyDBInstance',
        'rds:ModifyDBParameterGroup',
        'rds:ModifyOptionGroup',
        'rds:RebootDBInstance',
        'rds:RestoreDBInstanceFromDBSnapshot')

    # seconds between waiter polls while a restored instance comes up
    poll_period = 60
    # tag keys that must be present on the snapshot to drive the restore
    restore_keys = {
        'VPCSecurityGroups', 'MultiAZ', 'DBSubnetGroupName',
        'InstanceClass', 'StorageType', 'ParameterGroupName',
        'OptionGroupName'}

    def validate(self):
        """Require the `latest` filter so each database restores at most once."""
        found = False
        for f in self.manager.iter_filters():
            if isinstance(f, LatestSnapshot):
                found = True
        if not found:
            # do we really need this...
            raise PolicyValidationError(
                "must filter by latest to use restore action %s" % (
                    self.manager.data,))
        return self

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # restore up to 10 in parallel, we have to wait on each.
        with self.executor_factory(
                max_workers=min(10, len(resources) or 1)) as w:
            futures = {}
            for r in resources:
                tags = {t['Key']: t['Value'] for t in r['Tags']}
                # snapshots lacking the restore metadata tags are skipped
                if not set(tags).issuperset(self.restore_keys):
                    self.log.warning(
                        "snapshot:%s missing restore tags",
                        r['DBSnapshotIdentifier'])
                    continue
                futures[w.submit(self.process_instance, client, r)] = r
            for f in as_completed(futures):
                r = futures[f]
                if f.exception():
                    self.log.warning(
                        "Error restoring db:%s from:%s error:\n%s",
                        r['DBInstanceIdentifier'], r['DBSnapshotIdentifier'],
                        f.exception())
                    continue

    def process_instance(self, client, r):
        """Restore one instance, wait for it, apply post-restore modify, reboot."""
        params, post_modify = self.get_restore_from_tags(r)
        self.manager.retry(
            client.restore_db_instance_from_db_snapshot, **params)
        waiter = client.get_waiter('db_instance_available')
        # wait up to 40m
        waiter.config.delay = self.poll_period
        waiter.wait(DBInstanceIdentifier=params['DBInstanceIdentifier'])
        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=params['DBInstanceIdentifier'],
            ApplyImmediately=True,
            **post_modify)
        self.manager.retry(
            client.reboot_db_instance,
            DBInstanceIdentifier=params['DBInstanceIdentifier'],
            ForceFailover=False)

    def get_restore_from_tags(self, snapshot):
        """Build (restore_params, post_modify_params) from snapshot tags.

        Restore metadata tags become api parameters; all other tags are
        reapplied to the restored instance. Values given in the policy's
        `restore_options` / `modify_options` override the tag-derived ones.
        """
        params, post_modify = {}, {}
        tags = {t['Key']: t['Value'] for t in snapshot['Tags']}

        params['DBInstanceIdentifier'] = snapshot['DBInstanceIdentifier']
        params['DBSnapshotIdentifier'] = snapshot['DBSnapshotIdentifier']
        # tag values are strings; 'True' means multi-az was enabled
        params['MultiAZ'] = tags['MultiAZ'] == 'True'
        params['DBSubnetGroupName'] = tags['DBSubnetGroupName']
        params['DBInstanceClass'] = tags['InstanceClass']
        params['CopyTagsToSnapshot'] = True
        params['StorageType'] = tags['StorageType']
        params['OptionGroupName'] = tags['OptionGroupName']

        # applied after restore via modify_db_instance (see process_instance)
        post_modify['DBParameterGroupName'] = tags['ParameterGroupName']
        post_modify['VpcSecurityGroupIds'] = tags['VPCSecurityGroups'].split(',')

        params['Tags'] = [
            {'Key': k, 'Value': v} for k, v in tags.items()
            if k not in self.restore_keys]

        params.update(self.data.get('restore_options', {}))
        post_modify.update(self.data.get('modify_options', {}))
        return params, post_modify
@RDSSnapshot.filter_registry.register('cross-account')
class CrossAccountAccess(CrossAccountAccessFilter):
    """Filter snapshots whose restore attribute grants accounts
    outside the configured whitelist.
    """

    permissions = ('rds:DescribeDBSnapshotAttributes',)
    attributes_key = 'c7n:attributes'
    annotation_key = 'c7n:CrossAccountViolations'

    def process(self, resources, event=None):
        self.accounts = self.get_accounts()
        self.everyone_only = self.data.get("everyone_only", False)
        results = []
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_resource_set, resource_set)
                for resource_set in chunks(resources, 20)]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception checking cross account access\n %s" % (
                            f.exception()))
                    continue
                results.extend(f.result())
        return results

    def process_resource_set(self, resource_set):
        client = local_session(self.manager.session_factory).client('rds')
        matched = []
        for snapshot in resource_set:
            response = self.manager.retry(
                client.describe_db_snapshot_attributes,
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'])
            attrs = {
                attr['AttributeName']: attr['AttributeValues']
                for attr in response[
                    'DBSnapshotAttributesResult']['DBSnapshotAttributes']}
            snapshot[self.attributes_key] = attrs
            shared_accounts = set(attrs.get('restore', []))
            if self.everyone_only:
                # only the public 'all' grant counts in everyone_only mode
                shared_accounts = {a for a in shared_accounts if a == 'all'}
            delta_accounts = shared_accounts.difference(self.accounts)
            if delta_accounts:
                snapshot[self.annotation_key] = list(delta_accounts)
                matched.append(snapshot)
        return matched
@RDSSnapshot.action_registry.register('set-permissions')
class SetPermissions(BaseAction):
    """Set permissions for copying or restoring an RDS snapshot

    Use the 'add' and 'remove' parameters to control which accounts to
    add or remove, respectively. The default is to remove any
    permissions granted to other AWS accounts.

    Use `remove: matched` in combination with the `cross-account` filter
    for more flexible removal options such as preserving access for
    a set of whitelisted accounts:

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-remove-cross-account
                resource: rds-snapshot
                filters:
                  - type: cross-account
                    whitelist:
                      - '112233445566'
                actions:
                  - type: set-permissions
                    remove: matched
    """
    schema = type_schema(
        'set-permissions',
        remove={'oneOf': [
            {'enum': ['matched']},
            {'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ],
            }}
        ]},
        add={
            'type': 'array', 'items': {
                'oneOf': [
                    {'type': 'string', 'minLength': 12, 'maxLength': 12},
                    {'enum': ['all']},
                ]
            }
        }
    )

    permissions = ('rds:ModifyDBSnapshotAttribute',)

    def validate(self):
        # `remove: matched` depends on the cross-account filter's
        # annotation being present on each snapshot, so require it.
        if self.data.get('remove') == 'matched':
            found = False
            for f in self.manager.iter_filters():
                if isinstance(f, CrossAccountAccessFilter):
                    found = True
                    break
            if not found:
                raise PolicyValidationError(
                    "policy:%s filter:%s with matched requires cross-account filter" % (
                        self.manager.ctx.policy.name, self.type))

    def process(self, snapshots):
        client = local_session(self.manager.session_factory).client('rds')
        for s in snapshots:
            self.process_snapshot(client, s)

    def process_snapshot(self, client, snapshot):
        # Resolve which accounts to grant/revoke on the snapshot's
        # 'restore' attribute for this one snapshot.
        add_accounts = self.data.get('add', [])
        remove_accounts = self.data.get('remove', [])

        # Default (no add/remove configured): revoke every account
        # currently granted restore access, fetching the attributes on
        # demand when the cross-account filter hasn't cached them already.
        if not (add_accounts or remove_accounts):
            if CrossAccountAccess.attributes_key not in snapshot:
                attrs = {
                    t['AttributeName']: t['AttributeValues']
                    for t in self.manager.retry(
                        client.describe_db_snapshot_attributes,
                        DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier']
                    )['DBSnapshotAttributesResult']['DBSnapshotAttributes']
                }
                snapshot[CrossAccountAccess.attributes_key] = attrs
            remove_accounts = snapshot[CrossAccountAccess.attributes_key].get('restore', [])
        # 'matched' pulls the violating accounts recorded by the
        # cross-account filter's annotation.
        elif remove_accounts == 'matched':
            remove_accounts = snapshot.get(CrossAccountAccess.annotation_key, [])

        if add_accounts or remove_accounts:
            client.modify_db_snapshot_attribute(
                DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier'],
                AttributeName='restore',
                ValuesToRemove=remove_accounts,
                ValuesToAdd=add_accounts)
@RDSSnapshot.action_registry.register('region-copy')
class RegionCopySnapshot(BaseAction):
    """Copy a snapshot across regions.

    Note there is a max in flight for cross region rds snapshots
    of 5 per region. This action will attempt to retry automatically
    for an hr.

    Example::

      - name: copy-encrypted-snapshots
        description: |
          copy snapshots under 1 day old to dr region with kms
        resource: rds-snapshot
        region: us-east-1
        filters:
         - Status: available
         - type: value
           key: SnapshotCreateTime
           value_type: age
           value: 1
           op: less-than
        actions:
          - type: region-copy
            target_region: us-east-2
            target_key: arn:aws:kms:us-east-2:0000:key/cb291f53-c9cf61
            copy_tags: true
            tags:
              OriginRegion: us-east-1
    """

    schema = type_schema(
        'region-copy',
        target_region={'type': 'string'},
        target_key={'type': 'string'},
        copy_tags={'type': 'boolean'},
        tags={'type': 'object'},
        required=('target_region',))

    permissions = ('rds:CopyDBSnapshot',)
    # retry pacing when the target region's snapshot quota is exceeded
    min_delay = 120
    max_attempts = 30

    def validate(self):
        if self.data.get('target_region') and self.manager.data.get('mode'):
            raise PolicyValidationError(
                "cross region snapshot may require waiting for "
                "longer then lambda runtime allows %s" % (self.manager.data,))
        return self

    def process(self, resources):
        if self.data['target_region'] == self.manager.config.region:
            self.log.warning(
                "Source and destination region are the same, skipping copy")
            return
        for resource_set in chunks(resources, 20):
            self.process_resource_set(resource_set)

    def process_resource(self, target, key, tags, snapshot):
        """Copy one snapshot to the target region client, retrying on quota."""
        p = {}
        if key:
            p['KmsKeyId'] = key
        # ':' (used by automated snapshot names) is replaced in the target id
        p['TargetDBSnapshotIdentifier'] = snapshot[
            'DBSnapshotIdentifier'].replace(':', '-')
        p['SourceRegion'] = self.manager.config.region
        p['SourceDBSnapshotIdentifier'] = snapshot['DBSnapshotArn']

        if self.data.get('copy_tags', True):
            p['CopyTags'] = True
        if tags:
            p['Tags'] = tags

        retry = get_retry(
            ('SnapshotQuotaExceeded',),
            # TODO make this configurable, class defaults to 1hr
            min_delay=self.min_delay,
            max_attempts=self.max_attempts,
            log_retries=logging.DEBUG)
        try:
            result = retry(target.copy_db_snapshot, **p)
        except ClientError as e:
            # idempotency: a snapshot already copied is not an error
            if e.response['Error']['Code'] == 'DBSnapshotAlreadyExists':
                self.log.warning(
                    "Snapshot %s already exists in target region",
                    snapshot['DBSnapshotIdentifier'])
                return
            raise
        snapshot['c7n:CopiedSnapshot'] = result[
            'DBSnapshot']['DBSnapshotArn']

    def process_resource_set(self, resource_set):
        target_client = self.manager.session_factory(
            region=self.data['target_region']).client('rds')
        target_key = self.data.get('target_key')
        tags = [{'Key': k, 'Value': v} for k, v
                in self.data.get('tags', {}).items()]

        for snapshot_set in chunks(resource_set, 5):
            for r in snapshot_set:
                # If tags are supplied, copy tags are ignored, and
                # we need to augment the tag set with the original
                # resource tags to preserve the common case.
                rtags = list(tags) if tags else None
                if tags and self.data.get('copy_tags', True):
                    rtags.extend(r['Tags'])
                self.process_resource(target_client, target_key, rtags, r)
@RDSSnapshot.action_registry.register('delete')
class RDSSnapshotDelete(BaseAction):
    """Deletes a RDS snapshot resource

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-delete-stale
                resource: rds-snapshot
                filters:
                  - type: age
                    days: 28
                    op: ge
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBSnapshot',)

    def process(self, snapshots):
        # restrict deletion to manual snapshots; others are skipped
        snapshots = self.filter_resources(snapshots, 'SnapshotType', ('manual',))
        if not snapshots:
            return []
        # use the action's policy-scoped logger for consistency with
        # the other actions in this module (was the module-level logger)
        self.log.info("Deleting %d rds snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for snapshot_set in chunks(reversed(snapshots), size=50):
                futures.append(
                    w.submit(self.process_snapshot_set, snapshot_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        f.exception())
        return snapshots

    def process_snapshot_set(self, snapshots_set):
        client = local_session(self.manager.session_factory).client('rds')
        for s in snapshots_set:
            client.delete_db_snapshot(
                DBSnapshotIdentifier=s['DBSnapshotIdentifier'])
@actions.register('modify-security-groups')
class RDSModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify vpc security groups on an rds instance, or on its
    cluster when the instance is a cluster member.
    """

    permissions = ('rds:ModifyDBInstance', 'rds:ModifyDBCluster')
    vpc_expr = 'DBSubnetGroup.VpcId'

    def process(self, rds_instances):
        replication_group_map = {}
        client = local_session(self.manager.session_factory).client('rds')
        # resolved group ids, positionally aligned with rds_instances
        groups = super(RDSModifyVpcSecurityGroups, self).get_groups(
            rds_instances)

        # either build map for DB cluster or modify DB instance directly
        for idx, i in enumerate(rds_instances):
            if i.get('DBClusterIdentifier'):
                # build map of Replication Groups to Security Groups
                replication_group_map[i['DBClusterIdentifier']] = groups[idx]
            else:
                client.modify_db_instance(
                    DBInstanceIdentifier=i['DBInstanceIdentifier'],
                    VpcSecurityGroupIds=groups[idx])

        # handle DB cluster, if necessary (the unused enumerate index
        # and per-iteration key lookup are replaced by items())
        for cluster_id, group_ids in replication_group_map.items():
            client.modify_db_cluster(
                DBClusterIdentifier=cluster_id,
                VpcSecurityGroupIds=group_ids
            )
class DescribeSubnetGroup(DescribeSource):
    """Describe source that attaches rds tags to subnet groups."""

    def augment(self, resources):
        # _db_subnet_group_tags sets each group's 'Tags' in place; its
        # return value (which drops groups deleted mid-flight) is not
        # used here — the original resource list is returned as-is.
        _db_subnet_group_tags(
            resources, self.manager.session_factory,
            self.manager.executor_factory, self.manager.retry)
        return resources
@resources.register('rds-subnet-group')
class RDSSubnetGroup(QueryResourceManager):
    """RDS subnet group."""

    class resource_type(TypeInfo):
        # Query/arn/tagging metadata consumed by QueryResourceManager.
        service = 'rds'
        arn_type = 'subgrp'
        id = name = 'DBSubnetGroupName'
        arn_separator = ':'
        enum_spec = (
            'describe_db_subnet_groups', 'DBSubnetGroups', None)
        filter_name = 'DBSubnetGroupName'
        filter_type = 'scalar'
        permissions_enum = ('rds:DescribeDBSubnetGroups',)
        cfn_type = config_type = 'AWS::RDS::DBSubnetGroup'
        universal_taggable = object()

    source_mapping = {
        'config': ConfigSource,
        'describe': DescribeSubnetGroup
    }
def _db_subnet_group_tags(subnet_groups, session_factory, executor_factory, retry):
    """Attach rds tags to each subnet group in place.

    Returns the groups that still exist; groups deleted between
    enumeration and the tag lookup are omitted.
    """
    client = local_session(session_factory).client('rds')

    def annotate(group):
        try:
            group['Tags'] = client.list_tags_for_resource(
                ResourceName=group['DBSubnetGroupArn'])['TagList']
        except client.exceptions.DBSubnetGroupNotFoundFault:
            return None
        return group

    return [g for g in (annotate(sg) for sg in subnet_groups) if g]
@RDSSubnetGroup.action_registry.register('delete')
class RDSSubnetGroupDeleteAction(BaseAction):
    """Action to delete RDS Subnet Group

    It is recommended to apply a filter to the delete policy to avoid unwanted
    deletion of any rds subnet groups.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subnet-group-delete
                resource: rds-subnet-group
                filters:
                  - Instances: []
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBSubnetGroup',)

    def process(self, subnet_group):
        # fan deletions out over a small worker pool; consume the map
        # iterator so every submitted deletion actually executes.
        with self.executor_factory(max_workers=2) as pool:
            for _ in pool.map(self.process_subnetgroup, subnet_group):
                pass

    def process_subnetgroup(self, subnet_group):
        client = local_session(self.manager.session_factory).client('rds')
        client.delete_db_subnet_group(
            DBSubnetGroupName=subnet_group['DBSubnetGroupName'])
@RDSSubnetGroup.filter_registry.register('unused')
class UnusedRDSSubnetGroup(Filter):
    """Filters all launch rds subnet groups that are not in use but exist

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subnet-group-delete-unused
                resource: rds-subnet-group
                filters:
                  - unused
    """

    schema = type_schema('unused')

    def get_permissions(self):
        return self.manager.get_resource_manager('rds').get_permissions()

    def process(self, configs, event=None):
        # collect the subnet group names referenced by both rds
        # instances and rds clusters in the account/region
        instances = self.manager.get_resource_manager('rds').resources()
        clusters = self.manager.get_resource_manager(
            'rds-cluster').resources(augment=False)
        expr = '[].DBSubnetGroup.DBSubnetGroupName'
        self.used = set(jmespath_search(expr, instances))
        self.used.update(jmespath_search(expr, clusters))
        return super().process(configs)

    def __call__(self, config):
        # a group is "unused" when nothing references its name
        return config['DBSubnetGroupName'] not in self.used
@filters.register('db-parameter')
class ParameterFilter(ValueFilter):
    """
    Applies value type filter on set db parameter values.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-pg
                resource: rds
                filters:
                  - type: db-parameter
                    key: someparam
                    op: eq
                    value: someval
    """

    schema = type_schema('db-parameter', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters', )
    policy_annotation = 'c7n:MatchedDBParameter'

    @staticmethod
    def recast(val, datatype):
        """ Re-cast the value based upon an AWS supplied datatype
            and treat nulls sensibly.
        """
        ret_val = val
        if datatype == 'string':
            ret_val = str(val)
        elif datatype == 'boolean':
            # AWS returns 1s and 0s for boolean for most of the cases
            if val.isdigit():
                ret_val = bool(int(val))
            # AWS returns 'TRUE,FALSE' for Oracle engine
            elif val == 'TRUE':
                ret_val = True
            elif val == 'FALSE':
                ret_val = False
        elif datatype == 'integer':
            if val.isdigit():
                ret_val = int(val)
        elif datatype == 'float':
            ret_val = float(val) if val else 0.0

        return ret_val

    # Private method for 'DBParameterGroupName' paginator
    def _get_param_list(self, pg):
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_db_parameters')
        param_list = list(itertools.chain(*[p['Parameters']
            for p in paginator.paginate(DBParameterGroupName=pg)]))
        return param_list

    def handle_paramgroup_cache(self, param_groups):
        """Resolve {group: {param: value}} maps, using the policy cache."""
        pgcache = {}
        cache = self.manager._cache

        with cache:
            for pg in param_groups:
                cache_key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-pg': pg}
                pg_values = cache.get(cache_key)
                if pg_values is not None:
                    pgcache[pg] = pg_values
                    continue
                param_list = self._get_param_list(pg)
                # parameters without an explicit value are omitted
                pgcache[pg] = {
                    p['ParameterName']: self.recast(p['ParameterValue'], p['DataType'])
                    for p in param_list if 'ParameterValue' in p}
                cache.save(cache_key, pgcache[pg])
        return pgcache

    def process(self, resources, event=None):
        results = []
        # Prime the cache with *every* attached parameter group, not just
        # the first group of each instance — instances may carry several
        # groups (or none), and the match loop below walks all of them,
        # which previously raised KeyError/IndexError in those cases.
        parameter_group_list = {
            pg['DBParameterGroupName']
            for db in resources
            for pg in db['DBParameterGroups']}
        paramcache = self.handle_paramgroup_cache(parameter_group_list)
        for resource in resources:
            for pg in resource['DBParameterGroups']:
                pg_values = paramcache[pg['DBParameterGroupName']]
                if self.match(pg_values):
                    resource.setdefault(self.policy_annotation, []).append(
                        self.data.get('key'))
                    results.append(resource)
                    break
        return results
@actions.register('modify-db')
class ModifyDb(BaseAction):
    """Modifies an RDS instance based on specified parameter
    using ModifyDbInstance.

    'update' is an array of key value pairs that should be set to
    the property and value you wish to modify.
    'immediate' determines whether the modification is applied immediately
    or not. If 'immediate' is not specified, default is false.

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-rds-deletion-protection
                resource: rds
                filters:
                  - DeletionProtection: true
                  - PubliclyAccessible: true
                actions:
                  - type: modify-db
                    update:
                      - property: 'DeletionProtection'
                        value: false
                      - property: 'PubliclyAccessible'
                        value: false
                    immediate: true
    """

    schema = type_schema(
        'modify-db',
        immediate={"type": 'boolean'},
        update={
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'property': {'type': 'string', 'enum': [
                        'AllocatedStorage',
                        'DBInstanceClass',
                        'DBSubnetGroupName',
                        'DBSecurityGroups',
                        'VpcSecurityGroupIds',
                        'MasterUserPassword',
                        'DBParameterGroupName',
                        'BackupRetentionPeriod',
                        'PreferredBackupWindow',
                        'PreferredMaintenanceWindow',
                        'MultiAZ',
                        'EngineVersion',
                        'AllowMajorVersionUpgrade',
                        'AutoMinorVersionUpgrade',
                        'LicenseModel',
                        'Iops',
                        'OptionGroupName',
                        'NewDBInstanceIdentifier',
                        'StorageType',
                        'TdeCredentialArn',
                        'TdeCredentialPassword',
                        'CACertificateIdentifier',
                        'Domain',
                        'CopyTagsToSnapshot',
                        'MonitoringInterval',
                        'MonitoringRoleARN',
                        'DBPortNumber',
                        'PubliclyAccessible',
                        'DomainIAMRoleName',
                        'PromotionTier',
                        'EnableIAMDatabaseAuthentication',
                        'EnablePerformanceInsights',
                        'PerformanceInsightsKMSKeyId',
                        'PerformanceInsightsRetentionPeriod',
                        'CloudwatchLogsExportConfiguration',
                        'ProcessorFeatures',
                        'UseDefaultProcessorFeatures',
                        'DeletionProtection',
                        'MaxAllocatedStorage',
                        'CertificateRotationRestart']},
                    'value': {}
                },
            },
        },
        required=('update',))

    permissions = ('rds:ModifyDBInstance',)
    # Maps a modify-api property name to the describe-api jmespath
    # expression holding its current value (for change detection).
    conversion_map = {
        'DBSubnetGroupName': 'DBSubnetGroup.DBSubnetGroupName',
        'VpcSecurityGroupIds': 'VpcSecurityGroups[].VpcSecurityGroupId',
        'DBParameterGroupName': 'DBParameterGroups[].DBParameterGroupName',
        'OptionGroupName': 'OptionGroupMemberships[].OptionGroupName',
        'NewDBInstanceIdentifier': 'DBInstanceIdentifier',
        'Domain': 'DomainMemberships[].DomainName',
        'DBPortNumber': 'Endpoint.Port',
        'EnablePerformanceInsights': 'PerformanceInsightsEnabled',
        'CloudwatchLogsExportConfiguration': 'EnabledCloudwatchLogsExports'
    }

    def validate(self):
        """Cross-field constraints the json schema cannot express."""
        if self.data.get('update'):
            update_dict = dict((i['property'], i['value']) for i in self.data.get('update'))
            if ('MonitoringInterval' in update_dict and update_dict['MonitoringInterval'] > 0 and
                    'MonitoringRoleARN' not in update_dict):
                raise PolicyValidationError(
                    "A MonitoringRoleARN value is required \
                    if you specify a MonitoringInterval value other than 0")
            if ('CloudwatchLogsExportConfiguration' in update_dict
                and all(
                    k not in update_dict.get('CloudwatchLogsExportConfiguration')
                    for k in ('EnableLogTypes', 'DisableLogTypes'))):
                raise PolicyValidationError(
                    "A EnableLogTypes or DisableLogTypes input list is required\
                    for setting CloudwatchLogsExportConfiguration")
        return self

    def process(self, resources):
        c = local_session(self.manager.session_factory).client('rds')
        for r in resources:
            # Only send properties whose current value differs from the
            # requested one. The current value is read directly off the
            # resource, falling back to the conversion_map expression
            # ('None' is a jmespath lookup of a key that never exists,
            # i.e. the current value resolves to None).
            param = {
                u['property']: u['value'] for u in self.data.get('update')
                if r.get(
                    u['property'],
                    jmespath_search(
                        self.conversion_map.get(u['property'], 'None'), r))
                != u['value']}
            if not param:
                continue
            param['ApplyImmediately'] = self.data.get('immediate', False)
            param['DBInstanceIdentifier'] = r['DBInstanceIdentifier']
            # the former try/except around this call caught
            # DBInstanceNotFoundFault only to immediately re-raise it —
            # a no-op handler, removed.
            c.modify_db_instance(**param)
@resources.register('rds-reserved')
class ReservedRDS(QueryResourceManager):
    """Lists all active rds reservations

    :example:

    .. code-block:: yaml

            policies:
              - name: existing-rds-reservations
                resource: rds-reserved
                filters:
                  - State: active
    """

    class resource_type(TypeInfo):
        # Query/arn/tagging metadata consumed by QueryResourceManager.
        service = 'rds'
        name = id = 'ReservedDBInstanceId'
        date = 'StartTime'
        enum_spec = (
            'describe_reserved_db_instances', 'ReservedDBInstances', None)
        filter_name = 'ReservedDBInstances'
        filter_type = 'list'
        arn_type = "ri"
        arn = "ReservedDBInstanceArn"
        permissions_enum = ('rds:DescribeReservedDBInstances',)
        universal_taggable = object()

    # attach universal (resourcegroupstaggingapi) tags on fetch
    augment = universal_augment
# Attach the shared consecutive-aws-backups filter implementation to rds.
RDS.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter)
@filters.register('consecutive-snapshots')
class ConsecutiveSnapshots(Filter):
    """Returns instances where number of consective daily snapshots is
    equal to/or greater than n days.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-daily-snapshot-count
                resource: rds
                filters:
                  - type: consecutive-snapshots
                    days: 7
    """
    schema = type_schema('consecutive-snapshots', days={'type': 'number', 'minimum': 1},
        required=['days'])
    permissions = ('rds:DescribeDBSnapshots', 'rds:DescribeDBInstances')
    annotation = 'c7n:DBSnapshots'

    def process_resource_set(self, client, resources):
        """Annotate each instance with its snapshots, fetched in one batch."""
        instance_ids = [r['DBInstanceIdentifier'] for r in resources]
        paginator = client.get_paginator('describe_db_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        db_snapshots = paginator.paginate(Filters=[{'Name': 'db-instance-id',
            'Values': instance_ids}]).build_full_result().get('DBSnapshots', [])

        by_instance = {}
        for snapshot in db_snapshots:
            by_instance.setdefault(snapshot['DBInstanceIdentifier'], []).append(snapshot)
        for r in resources:
            r[self.annotation] = by_instance.get(r['DBInstanceIdentifier'], [])

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')
        retention = self.data.get('days')
        now = datetime.datetime.utcnow()
        # the dates a compliant instance must have a snapshot for:
        # each of the last `retention` days, excluding today
        expected_dates = {
            (now - timedelta(days=offset)).strftime('%Y-%m-%d')
            for offset in range(1, retention + 1)}

        pending = [r for r in resources if self.annotation not in r]
        for resource_set in chunks(pending, 50):
            self.process_resource_set(client, resource_set)

        results = []
        for r in resources:
            available_dates = {
                snap['SnapshotCreateTime'].strftime('%Y-%m-%d')
                for snap in r[self.annotation]
                if snap['Status'] == 'available'}
            if expected_dates.issubset(available_dates):
                results.append(r)
        return results
@filters.register('engine')
class EngineFilter(ValueFilter):
    """
    Filter a rds resource based on its Engine Metadata

    :example:

    .. code-block:: yaml

            policies:
              - name: find-deprecated-versions
                resource: aws.rds
                filters:
                  - type: engine
                    key: Status
                    value: deprecated
    """

    schema = type_schema('engine', rinherit=ValueFilter.schema)

    permissions = ("rds:DescribeDBEngineVersions", )

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')

        engines = set()
        engine_versions = set()
        for r in resources:
            engines.add(r['Engine'])
            engine_versions.add(r['EngineVersion'])

        paginator = client.get_paginator('describe_db_engine_versions')
        response = paginator.paginate(
            Filters=[
                {'Name': 'engine', 'Values': list(engines)},
                {'Name': 'engine-version', 'Values': list(engine_versions)}
            ],
            IncludeAll=True,
        )
        # index the returned metadata as {engine: {version: metadata}}
        all_versions = {}
        matched = []
        for page in response:
            for e in page['DBEngineVersions']:
                all_versions.setdefault(e['Engine'], {})
                all_versions[e['Engine']][e['EngineVersion']] = e
        for r in resources:
            # the api may not return metadata for every engine/version
            # pair in use; treat those resources as non-matching rather
            # than raising a KeyError.
            v = all_versions.get(r['Engine'], {}).get(r['EngineVersion'])
            if v is None:
                continue
            if self.match(v):
                r['c7n:Engine'] = v
                matched.append(r)
        return matched
class DescribeDBProxy(DescribeSource):
    """Describe source that attaches universal tags to db proxies."""

    def augment(self, resources):
        return universal_augment(self.manager, resources)
@resources.register('rds-proxy')
class RDSProxy(QueryResourceManager):
    """Resource Manager for RDS DB Proxies

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-proxy-tls-check
                resource: rds-proxy
                filters:
                  - type: value
                    key: RequireTLS
                    value: false
    """

    class resource_type(TypeInfo):
        # Query/arn/tagging metadata consumed by QueryResourceManager.
        service = 'rds'
        name = id = 'DBProxyName'
        date = 'CreatedDate'
        enum_spec = ('describe_db_proxies', 'DBProxies', None)
        arn = 'DBProxyArn'
        arn_type = 'db-proxy'
        cfn_type = 'AWS::RDS::DBProxy'
        permissions_enum = ('rds:DescribeDBProxies',)
        universal_taggable = object()

    source_mapping = {
        'describe': DescribeDBProxy,
        'config': ConfigSource
    }
@RDSProxy.action_registry.register('delete')
class DeleteRDSProxy(BaseAction):
    """
    Deletes a RDS Proxy

    :example:

    .. code-block:: yaml

      policies:
        - name: delete-rds-proxy
          resource: aws.rds-proxy
          filters:
            - type: value
              key: "DBProxyName"
              op: eq
              value: "proxy-test-1"
          actions:
            - type: delete
    """

    schema = type_schema('delete')

    permissions = ('rds:DeleteDBProxy',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # already-removed or in-transition proxies are tolerated via
        # the retry helper's ignored error codes.
        for proxy in resources:
            self.manager.retry(
                client.delete_db_proxy,
                DBProxyName=proxy['DBProxyName'],
                ignore_err_codes=(
                    'DBProxyNotFoundFault', 'InvalidDBProxyStateFault'))
@RDSProxy.filter_registry.register('subnet')
class RDSProxySubnetFilter(net_filters.SubnetFilter):
    """Filter rds-proxy resources by attributes of their associated subnets."""

    # Subnet ids are read from the proxy's VpcSubnetIds list.
    RelatedIdsExpression = "VpcSubnetIds[]"
@RDSProxy.filter_registry.register('security-group')
class RDSProxySecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Filter rds-proxy resources by attributes of their security groups."""

    # Security group ids are read from the proxy's VpcSecurityGroupIds list.
    RelatedIdsExpression = "VpcSecurityGroupIds[]"
@RDSProxy.filter_registry.register('vpc')
class RDSProxyVpcFilter(net_filters.VpcFilter):
    """Filter rds-proxy resources by attributes of their VPC."""

    # The owning VPC id is read directly off the proxy resource.
    RelatedIdsExpression = "VpcId"
@filters.register('db-option-groups')
class DbOptionGroups(ValueFilter):
    """This filter describes RDS option groups for associated RDS instances.
    Use this filter in conjunction with jmespath and value filter operators
    to filter RDS instance based on their option groups

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-data-in-transit-encrypted
            resource: aws.rds
            filters:
              - type: db-option-groups
                key: Options[].OptionName
                op: intersect
                value:
                  - SSL
                  - NATIVE_NETWORK_ENCRYPTION

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-oracle-encryption-in-transit
            resource: aws.rds
            filters:
              - Engine: oracle-ee
              - type: db-option-groups
                key: Options[].OptionSettings[?Name == 'SQLNET.ENCRYPTION_SERVER'].Value[]
                value:
                  - REQUIRED
    """

    schema = type_schema('db-option-groups', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeOptionGroups', )
    policy_annotation = 'c7n:MatchedDBOptionGroups'

    def handle_optiongroup_cache(self, client, paginator, option_groups):
        """Return a mapping of option group name -> option group description.

        Results are served from the manager cache when present; on a miss the
        option group is described via the supplied paginator and the result
        saved back to the cache.  `client` is kept for interface
        compatibility; only `paginator` is used here.
        """
        ogcache = {}
        cache = self.manager._cache

        with cache:
            for og in option_groups:
                cache_key = {
                    'region': self.manager.config.region,
                    'account_id': self.manager.config.account_id,
                    'rds-pg': og}
                og_values = cache.get(cache_key)
                if og_values is not None:
                    ogcache[og] = og_values
                    continue
                option_groups_list = list(itertools.chain(*[p['OptionGroupsList']
                    for p in paginator.paginate(OptionGroupName=og)]))

                # Option group names are unique per account/region, so this
                # listing is expected to hold at most one entry; an empty
                # dict is cached when the group no longer exists.
                ogcache[og] = {}
                for option_group in option_groups_list:
                    ogcache[og] = option_group

                cache.save(cache_key, ogcache[og])

        return ogcache

    def process(self, resources, event=None):
        results = []
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_option_groups')
        # Collect every membership's option group name, not just the first
        # one per instance: the match loop below walks ALL memberships and a
        # partial cache would raise KeyError for instances with more than one
        # option group.  The set also de-duplicates names shared across
        # instances (fewer API calls) and tolerates empty membership lists,
        # where indexing [0] previously raised IndexError.
        option_groups = list({
            og['OptionGroupName']
            for db in resources
            for og in db.get('OptionGroupMemberships', ())})
        optioncache = self.handle_optiongroup_cache(client, paginator, option_groups)

        for resource in resources:
            for og in resource.get('OptionGroupMemberships', ()):
                og_values = optioncache[og['OptionGroupName']]
                if self.match(og_values):
                    # Record which group (and which filter key values)
                    # matched, for use by downstream actions / reporting.
                    resource.setdefault(self.policy_annotation, []).append({
                        k: jmespath_search(k, og_values)
                        for k in {'OptionGroupName', self.data.get('key')}
                    })
                    results.append(resource)
                    break

        return results
@filters.register('pending-maintenance')
class PendingMaintenance(Filter):
    """Scan DB instances for those with pending maintenance

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-pending-maintenance
            resource: aws.rds
            filters:
              - pending-maintenance
              - type: value
                key: '"c7n:PendingMaintenance"[].PendingMaintenanceActionDetails[].Action'
                op: intersect
                value:
                  - system-update
    """

    annotation_key = 'c7n:PendingMaintenance'
    schema = type_schema('pending-maintenance')
    permissions = ('rds:DescribePendingMaintenanceActions',)

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')

        # Build a map of resource ARN -> pending maintenance action records,
        # walking every page of the account-wide listing once.
        pending_by_arn = {}
        paginator = client.get_paginator('describe_pending_maintenance_actions')
        for page in paginator.paginate():
            for entry in page['PendingMaintenanceActions']:
                pending_by_arn.setdefault(
                    entry['ResourceIdentifier'], []).append(entry)

        # Keep only instances with at least one pending action, annotating
        # each with its action records for downstream filters/actions.
        matched = []
        for r in resources:
            actions = pending_by_arn.get(r['DBInstanceArn'], [])
            if actions:
                r[self.annotation_key] = actions
                matched.append(r)

        return matched
2311 return results