# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
import logging
import itertools
from concurrent.futures import as_completed
from datetime import datetime, timedelta
from itertools import chain

from c7n.actions import BaseAction
from c7n.filters import AgeFilter, CrossAccountAccessFilter, Filter, ValueFilter
from c7n.filters.offhours import OffHour, OnHour
import c7n.filters.vpc as net_filters
from c7n.manager import resources
from c7n.query import (
    ConfigSource, QueryResourceManager, TypeInfo, DescribeSource, RetryPageIterator)
from c7n.resources import rds
from c7n.filters.kms import KmsRelatedFilter
from .aws import shape_validate
from c7n.exceptions import PolicyValidationError
from botocore.exceptions import ClientError
from c7n.utils import (
    type_schema, local_session, get_retry, snapshot_identifier, chunks)

from c7n.resources.rds import ParameterFilter
from c7n.filters.backup import ConsecutiveAwsBackupsFilter

log = logging.getLogger('custodian.rds-cluster')


class DescribeCluster(DescribeSource):

    def get_resources(self, ids):
        resources = chain.from_iterable(
            self.query.filter(
                self.manager,
                Filters=[
                    {'Name': 'db-cluster-id', 'Values': ids_chunk}
                ]
            )
            for ids_chunk in chunks(ids, 100)  # DescribeCluster filter length limit
        )
        return list(resources)

    def augment(self, resources):
        for r in resources:
            r['Tags'] = r.pop('TagList', ())
        return resources


class ConfigCluster(ConfigSource):

    def load_resource(self, item):
        resource = super().load_resource(item)
        resource.pop('TagList', None)  # we pull tags from supplementary config
        for k in list(resource.keys()):
            if k.startswith('Dbc'):
                resource["DBC%s" % (k[3:])] = resource.pop(k)
            elif k.startswith('Iamd'):
                resource['IAMD%s' % (k[4:])] = resource.pop(k)
            elif k.startswith('Dbs'):
                resource["DBS%s" % (k[3:])] = resource.pop(k)
        return resource


@resources.register('rds-cluster')
class RDSCluster(QueryResourceManager):
    """Resource manager for RDS clusters.
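
    A minimal policy sketch reporting clusters without encryption at rest:

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-unencrypted
            resource: rds-cluster
            filters:
              - StorageEncrypted: false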
    """

    class resource_type(TypeInfo):

        service = 'rds'
        arn = 'DBClusterArn'
        arn_type = 'cluster'
        arn_separator = ":"
        enum_spec = ('describe_db_clusters', 'DBClusters', None)
        name = id = 'DBClusterIdentifier'
        config_id = 'DbClusterResourceId'
        dimension = 'DBClusterIdentifier'
        universal_taggable = True
        permissions_enum = ('rds:DescribeDBClusters',)
        cfn_type = config_type = 'AWS::RDS::DBCluster'

    source_mapping = {
        'config': ConfigCluster,
        'describe': DescribeCluster
    }


RDSCluster.filter_registry.register('offhour', OffHour)
RDSCluster.filter_registry.register('onhour', OnHour)


@RDSCluster.filter_registry.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
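    """Filter RDS clusters by their attached VPC security groups.

    A minimal policy sketch (the group name is illustrative):

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-default-security-group
            resource: rds-cluster
            filters:
              - type: security-group
                key: GroupName
                value: default
    """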

    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"


@RDSCluster.filter_registry.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
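    """Filter RDS clusters by the subnets of their DB subnet group.

    A minimal policy sketch matching clusters placed in subnets that
    auto-assign public IPs:

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-in-public-subnets
            resource: rds-cluster
            filters:
              - type: subnet
                key: MapPublicIpOnLaunch
                value: true
    """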

    RelatedIdsExpression = ""
    groups = None

    def get_permissions(self):
        return self.manager.get_resource_manager(
            'rds-subnet-group').get_permissions()

    def get_subnet_groups(self):
        return {
            r['DBSubnetGroupName']: r for r in
            self.manager.get_resource_manager('rds-subnet-group').resources()}

    def get_related_ids(self, resources):
        if not self.groups:
            self.groups = self.get_subnet_groups()
        group_ids = set()
        for r in resources:
            group_ids.update(
                [s['SubnetIdentifier'] for s in
                 self.groups[r['DBSubnetGroup']]['Subnets']])
        return group_ids

    def process(self, resources, event=None):
        if not self.groups:
            self.groups = self.get_subnet_groups()
        return super(SubnetFilter, self).process(resources, event)


RDSCluster.filter_registry.register('network-location', net_filters.NetworkLocation)


@RDSCluster.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
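    """Filter RDS clusters by their KMS encryption key.

    A minimal policy sketch matching clusters encrypted with the AWS managed
    RDS key (the alias pattern is illustrative):

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-aws-managed-kms-key
            resource: rds-cluster
            filters:
              - type: kms-key
                key: c7n:AliasName
                value: ^(alias/aws/rds)
                op: regex
    """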

    RelatedIdsExpression = 'KmsKeyId'


@RDSCluster.action_registry.register('delete')
class Delete(BaseAction):
    """Action to delete an RDS cluster

    To prevent unwanted deletion of clusters, it is recommended to apply a
    filter to the rule.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-delete-unused
            resource: rds-cluster
            filters:
              - type: metrics
                name: CPUUtilization
                days: 21
                value: 1.0
                op: le
            actions:
              - type: delete
                skip-snapshot: false
                delete-instances: true
    """

    schema = type_schema(
        'delete', **{'skip-snapshot': {'type': 'boolean'},
                     'delete-instances': {'type': 'boolean'}})

    permissions = ('rds:DeleteDBCluster',)

    def process(self, clusters):
        skip = self.data.get('skip-snapshot', False)
        delete_instances = self.data.get('delete-instances', True)
        client = local_session(self.manager.session_factory).client('rds')

        for cluster in clusters:
            if delete_instances:
                for instance in cluster.get('DBClusterMembers', []):
                    client.delete_db_instance(
                        DBInstanceIdentifier=instance['DBInstanceIdentifier'],
                        SkipFinalSnapshot=True)
                    self.log.info(
                        'Deleted RDS instance: %s',
                        instance['DBInstanceIdentifier'])

            params = {'DBClusterIdentifier': cluster['DBClusterIdentifier']}
            if skip:
                params['SkipFinalSnapshot'] = True
            else:
                params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                    'Final', cluster['DBClusterIdentifier'])

            _run_cluster_method(
                client.delete_db_cluster, params,
                (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)


@RDSCluster.action_registry.register('retention')
class RetentionWindow(BaseAction):
    """
    Action to set the automated backup retention period on RDS clusters;
    enforce (min, max, exact) sets the retention days accordingly.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-backup-retention
            resource: rds-cluster
            filters:
              - type: value
                key: BackupRetentionPeriod
                value: 21
                op: ne
            actions:
              - type: retention
                days: 21
                enforce: min
    """

    date_attribute = "BackupRetentionPeriod"
    # Tag copy not yet available for Aurora:
    # https://forums.aws.amazon.com/thread.jspa?threadID=225812
    schema = type_schema(
        'retention', **{'days': {'type': 'number'},
                        'enforce': {'type': 'string', 'enum': [
                            'min', 'max', 'exact']}})
    permissions = ('rds:ModifyDBCluster',)

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('rds')

        for cluster in clusters:
            self.process_snapshot_retention(client, cluster)

    def process_snapshot_retention(self, client, cluster):
        current_retention = int(cluster.get('BackupRetentionPeriod', 0))
        new_retention = self.data['days']
        retention_type = self.data.get('enforce', 'min').lower()
        if retention_type == 'min':
            self.set_retention_window(
                client, cluster, max(current_retention, new_retention))
        elif retention_type == 'max':
            self.set_retention_window(
                client, cluster, min(current_retention, new_retention))
        elif retention_type == 'exact':
            self.set_retention_window(client, cluster, new_retention)

    def set_retention_window(self, client, cluster, retention):
        params = dict(
            DBClusterIdentifier=cluster['DBClusterIdentifier'],
            BackupRetentionPeriod=retention
        )
        if cluster.get('EngineMode') != 'serverless':
            params.update(
                dict(
                    PreferredBackupWindow=cluster['PreferredBackupWindow'],
                    PreferredMaintenanceWindow=cluster['PreferredMaintenanceWindow'])
            )
        _run_cluster_method(
            client.modify_db_cluster,
            params,
            (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault),
            client.exceptions.InvalidDBClusterStateFault
        )


@RDSCluster.action_registry.register('stop')
class Stop(BaseAction):
    """Stop a running db cluster
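
    A minimal policy sketch that stops clusters marked for off-hours
    shutdown (the tag key is illustrative):

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-stop
            resource: rds-cluster
            filters:
              - "tag:DowntimeOk": present
            actions:
              - stop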
    """

    schema = type_schema('stop')
    permissions = ('rds:StopDBCluster',)

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('rds')
        for c in clusters:
            _run_cluster_method(
                client.stop_db_cluster, dict(DBClusterIdentifier=c['DBClusterIdentifier']),
                (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)


@RDSCluster.action_registry.register('start')
class Start(BaseAction):
    """Start a stopped db cluster
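
    A minimal policy sketch that starts clusters previously stopped for
    off-hours (the tag key is illustrative):

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-start
            resource: rds-cluster
            filters:
              - "tag:DowntimeOk": present
            actions:
              - start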
    """

    schema = type_schema('start')
    permissions = ('rds:StartDBCluster',)

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('rds')
        for c in clusters:
            _run_cluster_method(
                client.start_db_cluster, dict(DBClusterIdentifier=c['DBClusterIdentifier']),
                (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)


def _run_cluster_method(method, params, ignore=(), warn=(), method_name=""):
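    """Invoke an RDS cluster API call, tolerating expected faults.

    Exceptions listed in ``ignore`` are swallowed silently (e.g. the cluster
    is already gone); exceptions listed in ``warn`` are logged as warnings
    and otherwise ignored.
    """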
    try:
        method(**params)
    except ignore:
        pass
    except warn as e:
        log.warning(
            "error invoking %s on cluster %s: %s",
            method_name or method.__name__, params['DBClusterIdentifier'], e)


@RDSCluster.action_registry.register('snapshot')
class Snapshot(BaseAction):
    """Action to create a snapshot of an RDS cluster

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshot
            resource: rds-cluster
            actions:
              - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('rds:CreateDBClusterSnapshot',)

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            _run_cluster_method(
                client.create_db_cluster_snapshot,
                dict(
                    DBClusterSnapshotIdentifier=snapshot_identifier(
                        'Backup', cluster['DBClusterIdentifier']),
                    DBClusterIdentifier=cluster['DBClusterIdentifier']),
                (client.exceptions.DBClusterNotFoundFault, client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)


@RDSCluster.action_registry.register('modify-db-cluster')
class ModifyDbCluster(BaseAction):
    """Modifies an RDS cluster based on the specified attributes
    using ModifyDBCluster.

    Include ``ApplyImmediately: true`` in the attributes to apply the
    modification immediately; otherwise most changes are applied during
    the cluster's next maintenance window.

    :example:

    .. code-block:: yaml

        policies:
          - name: disable-db-cluster-deletion-protection
            resource: rds-cluster
            filters:
              - DeletionProtection: true
              - PubliclyAccessible: true
            actions:
              - type: modify-db-cluster
                attributes:
                  CopyTagsToSnapshot: true
                  DeletionProtection: false
    """

    schema = type_schema(
        'modify-db-cluster',
        attributes={'type': 'object'},
        required=('attributes',))

    permissions = ('rds:ModifyDBCluster',)
    shape = 'ModifyDBClusterMessage'

    def validate(self):
        attrs = dict(self.data['attributes'])
        if 'DBClusterIdentifier' in attrs:
            raise PolicyValidationError(
                "Can't include DBClusterIdentifier in modify-db-cluster action")
        attrs['DBClusterIdentifier'] = 'PolicyValidation'
        return shape_validate(attrs, self.shape, 'rds')

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('rds')
        for c in clusters:
            client.modify_db_cluster(
                DBClusterIdentifier=c['DBClusterIdentifier'],
                **self.data['attributes'])


class DescribeClusterSnapshot(DescribeSource):

    def get_resources(self, resource_ids, cache=True):
        client = local_session(self.manager.session_factory).client('rds')
        return self.manager.retry(
            client.describe_db_cluster_snapshots,
            Filters=[{
                'Name': 'db-cluster-snapshot-id',
                'Values': resource_ids}]).get('DBClusterSnapshots', ())

    def augment(self, resources):
        for r in resources:
            r['Tags'] = r.pop('TagList', ())
        return resources


class ConfigClusterSnapshot(ConfigSource):

    def load_resource(self, item):
        resource = super(ConfigClusterSnapshot, self).load_resource(item)
        # db cluster snapshots are particularly mangled on keys
        for k, v in list(resource.items()):
            if k.startswith('Dbcl'):
                resource.pop(k)
                k = 'DBCl%s' % k[4:]
                resource[k] = v
            elif k.startswith('Iamd'):
                resource.pop(k)
                k = 'IAMD%s' % k[4:]
                resource[k] = v
        return resource


@resources.register('rds-cluster-snapshot')
class RDSClusterSnapshot(QueryResourceManager):
    """Resource manager for RDS cluster snapshots.
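
    A minimal policy sketch reporting unencrypted cluster snapshots:

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshot-unencrypted
            resource: rds-cluster-snapshot
            filters:
              - StorageEncrypted: false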
    """

    class resource_type(TypeInfo):
        service = 'rds'
        arn_type = 'cluster-snapshot'
        arn_separator = ':'
        arn = 'DBClusterSnapshotArn'
        enum_spec = (
            'describe_db_cluster_snapshots', 'DBClusterSnapshots', None)
        name = id = 'DBClusterSnapshotIdentifier'
        date = 'SnapshotCreateTime'
        universal_taggable = object()
        config_type = 'AWS::RDS::DBClusterSnapshot'
        permissions_enum = ('rds:DescribeDBClusterSnapshots',)

    source_mapping = {
        'describe': DescribeClusterSnapshot,
        'config': ConfigClusterSnapshot
    }


@RDSClusterSnapshot.filter_registry.register('cross-account')
class CrossAccountSnapshot(CrossAccountAccessFilter):
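    """Filter cluster snapshots shared with accounts outside the allowed set.

    Snapshot restore attributes are fetched and annotated on each matched
    resource. A minimal policy sketch (the whitelisted account id is
    illustrative):

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshot-cross-account
            resource: rds-cluster-snapshot
            filters:
              - type: cross-account
                whitelist:
                  - "210987654321"
    """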

    permissions = ('rds:DescribeDBClusterSnapshotAttributes',)
    attributes_key = 'c7n:attributes'
    annotation_key = 'c7n:CrossAccountViolations'

    def process(self, resources, event=None):
        self.accounts = self.get_accounts()
        self.everyone_only = self.data.get("everyone_only", False)
        results = []
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for resource_set in chunks(resources, 20):
                futures.append(w.submit(
                    self.process_resource_set, resource_set))
            for f in as_completed(futures):
                results.extend(f.result())
        return results

    def process_resource_set(self, resource_set):
        client = local_session(self.manager.session_factory).client('rds')
        results = []
        for r in resource_set:
            attrs = {t['AttributeName']: t['AttributeValues']
                for t in self.manager.retry(
                    client.describe_db_cluster_snapshot_attributes,
                    DBClusterSnapshotIdentifier=r['DBClusterSnapshotIdentifier'])[
                    'DBClusterSnapshotAttributesResult']['DBClusterSnapshotAttributes']}
            r[self.attributes_key] = attrs
            shared_accounts = set(attrs.get('restore', []))
            if self.everyone_only:
                shared_accounts = {a for a in shared_accounts if a == 'all'}
            delta_accounts = shared_accounts.difference(self.accounts)
            if delta_accounts:
                r[self.annotation_key] = list(delta_accounts)
                results.append(r)
        return results


@RDSClusterSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filters RDS cluster snapshots based on age (in days)

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshots-expired
            resource: rds-cluster-snapshot
            filters:
              - type: age
                days: 30
                op: gt
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    date_attribute = 'SnapshotCreateTime'


@RDSClusterSnapshot.action_registry.register('set-permissions')
class SetPermissions(rds.SetPermissions):
    """Set permissions for copying or restoring an RDS cluster snapshot

    Use the 'add' and 'remove' parameters to control which accounts to
    add or remove, respectively. The default is to remove any
    permissions granted to other AWS accounts.

    Use `remove: matched` in combination with the `cross-account` filter
    for more flexible removal options such as preserving access for
    a set of whitelisted accounts:

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshot-prune-permissions
            resource: rds-cluster-snapshot
            filters:
              - type: cross-account
                whitelist:
                  - '112233445566'
            actions:
              - type: set-permissions
                remove: matched
    """
    permissions = ('rds:ModifyDBClusterSnapshotAttribute',)

    def process_snapshot(self, client, snapshot):
        add_accounts = self.data.get('add', [])
        remove_accounts = self.data.get('remove', [])

        if not (add_accounts or remove_accounts):
            if CrossAccountSnapshot.attributes_key not in snapshot:
                attrs = {
                    t['AttributeName']: t['AttributeValues']
                    for t in self.manager.retry(
                        client.describe_db_cluster_snapshot_attributes,
                        DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier']
                    )['DBClusterSnapshotAttributesResult']['DBClusterSnapshotAttributes']
                }
                snapshot[CrossAccountSnapshot.attributes_key] = attrs
            remove_accounts = snapshot[CrossAccountSnapshot.attributes_key].get('restore', [])
        elif remove_accounts == 'matched':
            remove_accounts = snapshot.get(CrossAccountSnapshot.annotation_key, [])

        if add_accounts or remove_accounts:
            client.modify_db_cluster_snapshot_attribute(
                DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier'],
                AttributeName='restore',
                ValuesToRemove=remove_accounts,
                ValuesToAdd=add_accounts)


@RDSClusterSnapshot.action_registry.register('delete')
class RDSClusterSnapshotDelete(BaseAction):
    """Action to delete RDS cluster snapshots

    To prevent unwanted deletion of RDS cluster snapshots, it is recommended
    to apply a filter to the rule.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshots-expired-delete
            resource: rds-cluster-snapshot
            filters:
              - type: age
                days: 30
                op: gt
            actions:
              - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBClusterSnapshot',)

    def process(self, snapshots):
        self.log.info("Deleting %d RDS cluster snapshots", len(snapshots))
        client = local_session(self.manager.session_factory).client('rds')
        error = None
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for snapshot_set in chunks(reversed(snapshots), size=50):
                futures.append(
                    w.submit(self.process_snapshot_set, client, snapshot_set))
            for f in as_completed(futures):
                if f.exception():
                    error = f.exception()
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        f.exception())
        if error:
            raise error
        return snapshots

    def process_snapshot_set(self, client, snapshots_set):
        for s in snapshots_set:
            try:
                client.delete_db_cluster_snapshot(
                    DBClusterSnapshotIdentifier=s['DBClusterSnapshotIdentifier'])
            except (client.exceptions.DBClusterSnapshotNotFoundFault,
                    client.exceptions.InvalidDBClusterSnapshotStateFault):
                continue


@RDSClusterSnapshot.action_registry.register("region-copy")
class RDSClusterSnapshotRegionCopy(BaseAction):
    """Copy a cluster snapshot across regions.

    Example::

      - name: copy-encrypted-cluster-snapshots
        description: |
          copy cluster snapshots under 1 day old to dr region with kms
        resource: rds-cluster-snapshot
        region: us-east-1
        filters:
          - Status: available
          - type: value
            key: SnapshotCreateTime
            value_type: age
            value: 1
            op: less-than
        actions:
          - type: region-copy
            target_region: us-east-2
            target_key: arn:aws:kms:us-east-2:644160558196:key/b10f842a-feb7-4318-92d5-0640a75b7688
            copy_tags: true
            tags:
              OriginRegion: us-east-1
    """

    schema = type_schema(
        "region-copy",
        target_region={"type": "string"},
        target_key={"type": "string"},
        copy_tags={"type": "boolean"},
        tags={"type": "object"},
        required=("target_region",),
    )

    permissions = ("rds:CopyDBClusterSnapshot",)
    min_delay = 120
    max_attempts = 30

    def validate(self):
        if self.data.get('target_region') and self.manager.data.get('mode'):
            raise PolicyValidationError(
                "cross region snapshot may require waiting for "
                "longer than lambda runtime allows %s" % (self.manager.data,))
        return self

    def process(self, resources):
        if self.data['target_region'] == self.manager.config.region:
            self.log.warning(
                "Source and destination region are the same, skipping copy")
            return
        for resource_set in chunks(resources, 20):
            self.process_resource_set(resource_set)

    def process_resource(self, target, key, tags, snapshot):
        p = {}
        if key:
            p['KmsKeyId'] = key
        p['TargetDBClusterSnapshotIdentifier'] = snapshot[
            'DBClusterSnapshotIdentifier'].replace(':', '-')
        p['SourceRegion'] = self.manager.config.region
        p['SourceDBClusterSnapshotIdentifier'] = snapshot['DBClusterSnapshotArn']

        if self.data.get('copy_tags', True):
            p['CopyTags'] = True
        if tags:
            p['Tags'] = tags

        retry = get_retry(
            ('SnapshotQuotaExceeded',),
            # TODO make this configurable, class defaults to 1hr
            min_delay=self.min_delay,
            max_attempts=self.max_attempts,
            log_retries=logging.DEBUG)

        try:
            result = retry(target.copy_db_cluster_snapshot, **p)
        except ClientError as e:
            if e.response['Error']['Code'] == 'DBClusterSnapshotAlreadyExists':
                self.log.warning(
                    "Cluster snapshot %s already exists in target region",
                    snapshot['DBClusterSnapshotIdentifier'])
                return
            raise
        snapshot['c7n:CopiedClusterSnapshot'] = result[
            'DBClusterSnapshot']['DBClusterSnapshotArn']

    def process_resource_set(self, resource_set):
        target_client = self.manager.session_factory(
            region=self.data['target_region']).client('rds')
        target_key = self.data.get('target_key')
        tags = [{'Key': k, 'Value': v} for k, v
                in self.data.get('tags', {}).items()]

        for snapshot_set in chunks(resource_set, 5):
            for r in snapshot_set:
                # If tags are supplied, copy tags are ignored, and
                # we need to augment the tag set with the original
                # resource tags to preserve the common case.
                rtags = tags and list(tags) or None
                if tags and self.data.get('copy_tags', True):
                    rtags.extend(r['Tags'])
                self.process_resource(target_client, target_key, rtags, r)


RDSCluster.filter_registry.register('consecutive-aws-backups', ConsecutiveAwsBackupsFilter)


@RDSCluster.filter_registry.register('consecutive-snapshots')
class ConsecutiveSnapshots(Filter):
    """Returns RDS clusters where the number of consecutive daily snapshots
    is equal to or greater than n days.

    :example:

    .. code-block:: yaml

        policies:
          - name: rdscluster-daily-snapshot-count
            resource: rds-cluster
            filters:
              - type: consecutive-snapshots
                days: 7
    """
    schema = type_schema('consecutive-snapshots', days={'type': 'number', 'minimum': 1},
                         required=['days'])
    permissions = ('rds:DescribeDBClusterSnapshots', 'rds:DescribeDBClusters')
    annotation = 'c7n:DBClusterSnapshots'

    def process_resource_set(self, client, resources):
        rds_clusters = [r['DBClusterIdentifier'] for r in resources]
        paginator = client.get_paginator('describe_db_cluster_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        cluster_snapshots = paginator.paginate(Filters=[{'Name': 'db-cluster-id',
            'Values': rds_clusters}]).build_full_result().get('DBClusterSnapshots', [])

        cluster_map = {}
        for snapshot in cluster_snapshots:
            cluster_map.setdefault(snapshot['DBClusterIdentifier'], []).append(snapshot)
        for r in resources:
            r[self.annotation] = cluster_map.get(r['DBClusterIdentifier'], [])

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')
        results = []
        retention = self.data.get('days')
        utcnow = datetime.utcnow()
        expected_dates = set()
        for days in range(1, retention + 1):
            expected_dates.add((utcnow - timedelta(days=days)).strftime('%Y-%m-%d'))

        for resource_set in chunks(
                [r for r in resources if self.annotation not in r], 50):
            self.process_resource_set(client, resource_set)

        for r in resources:
            snapshot_dates = set()
            for snapshot in r[self.annotation]:
                if snapshot['Status'] == 'available':
                    snapshot_dates.add(snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d'))
            if expected_dates.issubset(snapshot_dates):
                results.append(r)
        return results


@RDSCluster.filter_registry.register('db-cluster-parameter')
class ClusterParameterFilter(ParameterFilter):
    """
    Applies a value filter against the parameter values of a cluster's
    DB cluster parameter group.

    :example:

    .. code-block:: yaml

        policies:
          - name: rdscluster-pg
            resource: rds-cluster
            filters:
              - type: db-cluster-parameter
                key: someparam
                op: eq
                value: someval
    """
    schema = type_schema('db-cluster-parameter', rinherit=ValueFilter.schema)
    schema_alias = False
    permissions = ('rds:DescribeDBClusters', 'rds:DescribeDBClusterParameters',)
    policy_annotation = 'c7n:MatchedDBClusterParameter'
    param_group_attribute = 'DBClusterParameterGroup'

    def _get_param_list(self, pg):
        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_db_cluster_parameters')
        param_list = list(itertools.chain(*[p['Parameters']
            for p in paginator.paginate(DBClusterParameterGroupName=pg)]))
        return param_list

    def process(self, resources, event=None):
        results = []
        parameter_group_list = {db.get(self.param_group_attribute) for db in resources}
        paramcache = self.handle_paramgroup_cache(parameter_group_list)
        for resource in resources:
            pg_values = paramcache[resource['DBClusterParameterGroup']]
            if self.match(pg_values):
                resource.setdefault(self.policy_annotation, []).append(
                    self.data.get('key'))
                results.append(resource)
        return results


@RDSCluster.filter_registry.register('pending-maintenance')
class PendingMaintenance(Filter):
    """
    Scan DB Clusters for those with pending maintenance

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-pending-maintenance
            resource: rds-cluster
            filters:
              - pending-maintenance
              - type: value
                key: '"c7n:PendingMaintenance".PendingMaintenanceActionDetails[].Action'
                op: intersect
                value:
                  - system-update
    """

    annotation_key = 'c7n:PendingMaintenance'
    schema = type_schema('pending-maintenance')
    permissions = ('rds:DescribePendingMaintenanceActions',)

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')

        results = []
        resource_maintenances = {}
        paginator = client.get_paginator('describe_pending_maintenance_actions')
        for page in paginator.paginate():
            for action in page['PendingMaintenanceActions']:
                resource_maintenances.setdefault(action['ResourceIdentifier'], []).append(action)

        for r in resources:
            pending_maintenances = resource_maintenances.get(r['DBClusterArn'], [])
            if len(pending_maintenances) > 0:
                r[self.annotation_key] = pending_maintenances
                results.append(r)

        return results


class DescribeDbShardGroup(DescribeSource):
    def augment(self, resources):
        for r in resources:
            r['Tags'] = r.pop('TagList', ())
        return resources


@resources.register('rds-db-shard-group')
class RDSDbShardGroup(QueryResourceManager):
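    """Resource manager for Aurora DB shard groups.

    A minimal policy sketch (the tag key is illustrative):

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-db-shard-group-untagged
            resource: rds-db-shard-group
            filters:
              - "tag:Owner": absent
    """
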
    class resource_type(TypeInfo):
        service = 'rds'
        arn = 'DBShardGroupArn'
        name = 'DBShardGroupIdentifier'
        id = 'DBShardGroupResourceId'
        enum_spec = ('describe_db_shard_groups', 'DBShardGroups', None)
        cfn_type = 'AWS::RDS::DBShardGroup'
        permissions_enum = ("rds:DescribeDBShardGroups",)
        universal_taggable = object()

    source_mapping = {
        'describe': DescribeDbShardGroup
    }