Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/redshift.py: 46%
465 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import json
4import itertools
6from botocore.exceptions import ClientError
7from concurrent.futures import as_completed
9from c7n.actions import BaseAction, ModifyVpcSecurityGroupsAction
10from c7n.exceptions import PolicyValidationError
11from c7n.filters import (
12 ValueFilter, AgeFilter, CrossAccountAccessFilter, Filter)
13import c7n.filters.vpc as net_filters
14from c7n.filters.kms import KmsRelatedFilter
15from c7n.filters.offhours import OffHour, OnHour
16from c7n.manager import resources
17from c7n.resolver import ValuesFrom
18from c7n.query import QueryResourceManager, TypeInfo, RetryPageIterator
19from c7n import tags
20from c7n.utils import (
21 type_schema, local_session, chunks, snapshot_identifier, jmespath_search)
22from .aws import shape_validate
23from datetime import datetime, timedelta
24from c7n.filters.backup import ConsecutiveAwsBackupsFilter
@resources.register('redshift')
class Redshift(QueryResourceManager):
    """Resource manager for Redshift clusters (policy resource ``redshift``)."""

    class resource_type(TypeInfo):
        # Service and the enumeration call used to list clusters.
        service = 'redshift'
        enum_spec = ('describe_clusters', 'Clusters', None)

        # ARN components for a cluster.
        arn_type = 'cluster'
        arn_separator = ":"

        # The cluster identifier serves as id, name, filter key and dimension.
        id = 'ClusterIdentifier'
        name = 'ClusterIdentifier'
        filter_name = 'ClusterIdentifier'
        filter_type = 'scalar'
        dimension = 'ClusterIdentifier'

        date = 'ClusterCreateTime'

        # The same type name is used for both CloudFormation and Config.
        config_type = "AWS::Redshift::Cluster"
        cfn_type = "AWS::Redshift::Cluster"
# Attach the shared filter implementations to the Redshift resource under
# their policy-facing names (registration order is preserved).
for _filter_name, _filter_class in (
        ('marked-for-op', tags.TagActionFilter),
        ('network-location', net_filters.NetworkLocation),
        ('offhour', OffHour),
        ('onhour', OnHour),
        ('consecutive-aws-backups', ConsecutiveAwsBackupsFilter)):
    Redshift.filter_registry.register(_filter_name, _filter_class)
# Do not leave the loop variables behind as module attributes.
del _filter_name, _filter_class
@Redshift.filter_registry.register('default-vpc')
class DefaultVpc(net_filters.DefaultVpcBase):
    """Matches redshift clusters that are located in the default vpc.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-default-vpc
                resource: redshift
                filters:
                  - default-vpc
    """

    schema = type_schema('default-vpc')

    def __call__(self, redshift):
        """Return the match result for the cluster's ``VpcId``.

        A cluster without a (truthy) ``VpcId`` never matches; a falsy match
        result is normalised to ``False``.
        """
        vpc_id = redshift.get('VpcId')
        if not vpc_id:
            return False
        return self.match(vpc_id) or False
@Redshift.filter_registry.register('logging')
class LoggingFilter(ValueFilter):
    """Checks Redshift logging status and attributes.

    The logging status of each cluster is fetched once and cached on the
    resource under the ``c7n:logging`` key; the value filter is then matched
    against that cached status.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-logging-bucket-and-prefix-test
                resource: redshift
                filters:
                  - type: logging
                    key: LoggingEnabled
                    value: true
                  - type: logging
                    key: S3KeyPrefix
                    value: "accounts/{account_id}"
                  - type: logging
                    key: BucketName
                    value: "redshiftlogs"
    """
    permissions = ("redshift:DescribeLoggingStatus",)
    schema = type_schema('logging', rinherit=ValueFilter.schema)
    annotation_key = 'c7n:logging'

    def process(self, clusters, event=None):
        """Return the clusters whose logging status matches this filter.

        Clusters that no longer exist when their logging status is queried
        are skipped instead of failing the whole policy run.
        """
        client = local_session(self.manager.session_factory).client('redshift')
        results = []
        for cluster in clusters:
            if self.annotation_key not in cluster:
                try:
                    result = client.describe_logging_status(
                        ClusterIdentifier=cluster['ClusterIdentifier'])
                # The client exposes this error as ClusterNotFoundFault (the
                # name used by the other actions in this module); the previous
                # ``ClusterNotFound`` attribute lookup would itself fail.
                except client.exceptions.ClusterNotFoundFault:
                    continue
                # Drop transport metadata so only logging attributes remain.
                result.pop('ResponseMetadata', None)
                cluster[self.annotation_key] = result

            if self.match(cluster[self.annotation_key]):
                results.append(cluster)
        return results
@Redshift.action_registry.register('pause')
class Pause(BaseAction):
    """Action to pause redshift clusters.

    Only resources selected by ``filter_resources`` for a ``ClusterStatus``
    of ``available`` are paused. Errors raised by the service call propagate
    to the caller.
    """

    schema = type_schema('pause')
    permissions = ('redshift:PauseCluster',)

    def process(self, resources):
        client = local_session(
            self.manager.session_factory).client('redshift')
        # The former try/except only re-raised the caught faults, so the call
        # is made directly; the propagated exceptions are unchanged.
        for r in self.filter_resources(resources, 'ClusterStatus', ('available',)):
            client.pause_cluster(ClusterIdentifier=r['ClusterIdentifier'])
@Redshift.action_registry.register('resume')
class Resume(BaseAction):
    """Action to resume redshift clusters.

    Only resources selected by ``filter_resources`` for a ``ClusterStatus``
    of ``paused`` are resumed. Errors raised by the service call propagate
    to the caller.
    """

    schema = type_schema('resume')
    permissions = ('redshift:ResumeCluster',)

    def process(self, resources):
        client = local_session(
            self.manager.session_factory).client('redshift')
        # The former try/except only re-raised the caught faults, so the call
        # is made directly; the propagated exceptions are unchanged.
        for r in self.filter_resources(resources, 'ClusterStatus', ('paused',)):
            client.resume_cluster(ClusterIdentifier=r['ClusterIdentifier'])
@Redshift.action_registry.register('set-logging')
class SetRedshiftLogging(BaseAction):
    """Action to enable/disable Redshift logging for a Redshift Cluster.

    When enabling, ``bucket`` is required; ``prefix`` is only sent to the
    service when it is configured.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-test
                resource: redshift
                filters:
                  - type: logging
                    key: LoggingEnabled
                    value: false
                actions:
                  - type: set-logging
                    bucket: redshiftlogtest
                    prefix: redshiftlogs
                    state: enabled
    """
    schema = type_schema(
        'set-logging',
        state={'enum': ['enabled', 'disabled']},
        bucket={'type': 'string'},
        prefix={'type': 'string'},
        required=('state',))

    def get_permissions(self):
        """Return the single IAM permission needed for the configured state."""
        if self.data.get('state') == 'disabled':
            return ('redshift:DisableLogging',)
        return ('redshift:EnableLogging',)

    def validate(self):
        """Require a ``bucket`` when logging is being enabled.

        :raises PolicyValidationError: if state is ``enabled`` and no bucket
            is configured.
        """
        if self.data.get('state') == 'enabled':
            if 'bucket' not in self.data:
                raise PolicyValidationError((
                    "redshift logging enablement requires `bucket` "
                    "and `prefix` specification on %s" % (self.manager.data,)))
        return self

    def process(self, resources):
        """Enable or disable logging on every cluster in ``resources``."""
        client = local_session(self.manager.session_factory).client('redshift')

        # The requested state is the same for every cluster, so resolve the
        # operation and its fixed arguments once.
        state = self.data.get('state')
        if state == 'enabled':
            operation = client.enable_logging
            params = {'BucketName': self.data.get('bucket')}
            prefix = self.data.get('prefix')
            # An unset prefix must be omitted: sending S3KeyPrefix=None is
            # rejected by the client's parameter validation.
            if prefix is not None:
                params['S3KeyPrefix'] = prefix
        elif state == 'disabled':
            operation = client.disable_logging
            params = {}
        else:
            return

        for redshift in resources:
            self.manager.retry(
                operation,
                ClusterIdentifier=redshift['ClusterIdentifier'],
                **params)
@Redshift.filter_registry.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Security group filter for redshift clusters."""

    # Expression selecting the vpc security group ids from a cluster record.
    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"
@Redshift.filter_registry.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Subnet filter for redshift clusters.

    The subnets of a cluster are looked up through its cluster subnet group,
    so the related ids are computed in ``get_related_ids`` instead of via an
    expression on the cluster record.
    """

    RelatedIdsExpression = ""

    def get_permissions(self):
        """Return the permissions of the subnet group resource manager."""
        subnet_groups = RedshiftSubnetGroup(self.manager.ctx, {})
        return subnet_groups.get_permissions()

    def get_related_ids(self, resources):
        """Return the set of subnet identifiers used by ``resources``."""
        return {
            subnet['SubnetIdentifier']
            for cluster in resources
            for subnet in self.groups[cluster['ClusterSubnetGroupName']]['Subnets']}

    def process(self, resources, event=None):
        """Index the subnet groups by name, then run the base filter."""
        subnet_groups = RedshiftSubnetGroup(self.manager.ctx, {}).resources()
        by_name = {}
        for group in subnet_groups:
            by_name[group['ClusterSubnetGroupName']] = group
        self.groups = by_name
        return super().process(resources, event)
@Redshift.filter_registry.register('param')
class Parameter(ValueFilter):
    """Filter redshift clusters based on parameter values

    The value filter is matched against the merged parameters of all
    parameter groups attached to a cluster.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-param-ssl
                resource: redshift
                filters:
                  - type: param
                    key: require_ssl
                    value: false
                    op: eq
    """

    schema = type_schema('param', rinherit=ValueFilter.schema)
    schema_alias = False
    # Parameter group name -> {parameter name: value}; filled in by process().
    group_params = ()

    permissions = ("redshift:DescribeClusterParameters",)

    def process(self, clusters, event=None):
        """Load the parameters of every referenced group, then filter."""
        # Parameter group name -> identifiers of the clusters using it.
        groups = {}
        for cluster in clusters:
            for attached in cluster['ClusterParameterGroups']:
                users = groups.setdefault(attached['ParameterGroupName'], [])
                users.append(cluster['ClusterIdentifier'])

        def get_params(group_name):
            client = local_session(self.manager.session_factory).client('redshift')
            pages = client.get_paginator('describe_cluster_parameters').paginate(
                ParameterGroupName=group_name)
            # Collect every page before decoding any value.
            entries = []
            for page in pages:
                entries.extend(page['Parameters'])

            params = {}
            for entry in entries:
                value = entry['ParameterValue']
                is_typed = entry['DataType'] in ('integer', 'boolean')
                if value != 'default' and is_typed:
                    # Decode the textual value into a python int/bool.
                    value = json.loads(value)
                params[entry['ParameterName']] = value
            return params

        with self.executor_factory(max_workers=3) as w:
            group_names = groups.keys()
            fetched = w.map(get_params, group_names)
            self.group_params = dict(zip(group_names, fetched))
        return super().process(clusters, event)

    def __call__(self, db):
        """Match against the parameters of all groups attached to ``db``."""
        merged = {}
        for attached in db['ClusterParameterGroups']:
            merged.update(self.group_params[attached['ParameterGroupName']])
        return self.match(merged)
@Redshift.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    """KMS key filter for redshift clusters."""

    # Attribute of the cluster record holding the related kms key id.
    RelatedIdsExpression = 'KmsKeyId'
@Redshift.action_registry.register('delete')
class Delete(BaseAction):
    """Action to delete a redshift cluster

    To prevent unwanted deletion of redshift clusters, it is recommended to
    apply a filter to the rule

    Unless ``skip-snapshot`` is set, a final snapshot is requested for each
    deleted cluster.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-no-ssl
                resource: redshift
                filters:
                  - type: param
                    key: require_ssl
                    value: false
                    op: eq
                actions:
                  - type: delete
    """

    schema = type_schema(
        'delete', **{'skip-snapshot': {'type': 'boolean'}})

    permissions = ('redshift:DeleteCluster',)

    def process(self, clusters):
        """Delete ``clusters`` in batches of five, logging failed batches."""
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_db_set, db_set)
                for db_set in chunks(clusters, size=5)]
            for f in as_completed(futures):
                error = f.exception()
                if error:
                    self.log.error(
                        "Exception deleting redshift set \n %s", error)

    def process_db_set(self, db_set):
        """Delete every cluster of one batch.

        A cluster rejected with ``InvalidClusterState`` is logged and
        skipped; any other client error is re-raised.
        """
        skip_snapshot = self.data.get('skip-snapshot', False)
        client = local_session(self.manager.session_factory).client('redshift')
        for db in db_set:
            cluster_id = db['ClusterIdentifier']
            params = {'ClusterIdentifier': cluster_id}
            if skip_snapshot:
                params['SkipFinalClusterSnapshot'] = True
            else:
                params['FinalClusterSnapshotIdentifier'] = snapshot_identifier(
                    'Final', cluster_id)

            try:
                client.delete_cluster(**params)
            except ClientError as e:
                if e.response['Error']['Code'] != "InvalidClusterState":
                    raise
                self.log.warning(
                    "Cannot delete cluster when not 'Available' state: %s",
                    db['ClusterIdentifier'])
@Redshift.action_registry.register('retention')
class RetentionWindow(BaseAction):
    """Action to set the snapshot retention period (in days)

    The retention period is only ever raised: clusters whose current period
    is already at least ``days`` are left untouched.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-snapshot-retention
                resource: redshift
                filters:
                  - type: value
                    key: AutomatedSnapshotRetentionPeriod
                    value: 21
                    op: ne
                actions:
                  - type: retention
                    days: 21
    """

    date_attribute = 'AutomatedSnapshotRetentionPeriod'
    schema = type_schema(
        'retention',
        **{'days': {'type': 'number'}})
    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        """Update each cluster on a small worker pool, logging failures."""
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_snapshot_retention, cluster)
                for cluster in clusters]
            for f in as_completed(futures):
                error = f.exception()
                if error:
                    self.log.error(
                        "Exception setting Redshift retention \n %s", error)

    def process_snapshot_retention(self, cluster):
        """Raise the cluster's retention period to ``days`` when it is lower.

        Returns the cluster only when a modification was requested.
        """
        current_retention = int(cluster.get(self.date_attribute, 0))
        new_retention = self.data['days']

        if not current_retention < new_retention:
            return None
        # Under the guard above the larger of the two is always the new value.
        self.set_retention_window(cluster, new_retention)
        return cluster

    def set_retention_window(self, cluster, retention):
        """Apply ``retention`` as the automated snapshot retention period."""
        client = local_session(self.manager.session_factory).client('redshift')
        client.modify_cluster(
            ClusterIdentifier=cluster['ClusterIdentifier'],
            AutomatedSnapshotRetentionPeriod=retention)
@Redshift.action_registry.register('snapshot')
class Snapshot(BaseAction):
    """Action to take a snapshot of a redshift cluster

    The cluster's tags, when present on the resource, are copied onto the
    snapshot.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-snapshot
                resource: redshift
                filters:
                  - type: value
                    key: ClusterStatus
                    value: available
                    op: eq
                actions:
                  - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('redshift:CreateClusterSnapshot',)

    def process(self, clusters):
        """Snapshot each cluster on a small worker pool, logging failures."""
        client = local_session(self.manager.session_factory).client('redshift')
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_cluster_snapshot, client, cluster)
                for cluster in clusters]
            for f in as_completed(futures):
                error = f.exception()
                if error:
                    self.log.error(
                        "Exception creating Redshift snapshot \n %s", error)
        return clusters

    def process_cluster_snapshot(self, client, cluster):
        """Create a ``Backup`` snapshot of a single cluster."""
        cluster_id = cluster['ClusterIdentifier']
        params = {
            'SnapshotIdentifier': snapshot_identifier('Backup', cluster_id),
            'ClusterIdentifier': cluster_id,
        }
        # Only forward tags that exist on the record: Tags=None is rejected
        # by the client's parameter validation.
        cluster_tags = cluster.get('Tags')
        if cluster_tags is not None:
            params['Tags'] = cluster_tags
        client.create_cluster_snapshot(**params)
@Redshift.action_registry.register('enable-vpc-routing')
class EnhancedVpcRoutine(BaseAction):
    """Action to enable enhanced vpc routing on a redshift cluster

    ``value`` defaults to ``true``; clusters that already have the requested
    setting are not modified.

    More: https://docs.aws.amazon.com/redshift/latest/mgmt/enhanced-vpc-routing.html

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-enable-enhanced-routing
                resource: redshift
                filters:
                  - type: value
                    key: EnhancedVpcRouting
                    value: false
                    op: eq
                actions:
                  - type: enable-vpc-routing
                    value: true
    """

    schema = type_schema(
        'enable-vpc-routing',
        value={'type': 'boolean'})
    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        """Update each cluster on a small worker pool, logging failures."""
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_vpc_routing, cluster)
                for cluster in clusters]
            for f in as_completed(futures):
                error = f.exception()
                if error:
                    self.log.error(
                        "Exception changing Redshift VPC routing \n %s", error)
        return clusters

    def process_vpc_routing(self, cluster):
        """Switch enhanced vpc routing on one cluster if it differs."""
        wanted = self.data.get('value', True)
        if bool(cluster.get('EnhancedVpcRouting', False)) == wanted:
            return

        client = local_session(self.manager.session_factory).client('redshift')
        client.modify_cluster(
            ClusterIdentifier=cluster['ClusterIdentifier'],
            EnhancedVpcRouting=wanted)
@Redshift.action_registry.register('set-public-access')
class RedshiftSetPublicAccess(BaseAction):
    """
    Action to set the 'PubliclyAccessible' setting on a redshift cluster

    ``state`` defaults to ``false`` when it is not given.

    :example:

    .. code-block:: yaml

            policies:
                - name: redshift-set-public-access
                  resource: redshift
                  filters:
                    - PubliclyAccessible: true
                  actions:
                    - type: set-public-access
                      state: false
    """

    schema = type_schema(
        'set-public-access',
        state={'type': 'boolean'})
    permissions = ('redshift:ModifyCluster',)

    def set_access(self, c):
        """Apply the configured ``PubliclyAccessible`` state to cluster ``c``."""
        publicly_accessible = self.data.get('state', False)
        session = local_session(self.manager.session_factory)
        session.client('redshift').modify_cluster(
            ClusterIdentifier=c['ClusterIdentifier'],
            PubliclyAccessible=publicly_accessible)

    def process(self, clusters):
        """Update each cluster on a small worker pool, logging failures."""
        with self.executor_factory(max_workers=2) as w:
            # Map each future back to its cluster for error reporting.
            futures = {}
            for cluster in clusters:
                futures[w.submit(self.set_access, cluster)] = cluster
            for f in as_completed(futures):
                error = f.exception()
                if error:
                    self.log.error(
                        "Exception setting Redshift public access on %s \n %s",
                        futures[f]['ClusterIdentifier'], error)
        return clusters
@Redshift.action_registry.register('set-attributes')
class RedshiftSetAttributes(BaseAction):
    """
    Action to modify Redshift clusters

    Only attributes whose requested value differs from the cluster's current
    (or pending) value are sent to the service; if nothing differs no call is
    made.

    :example:

    .. code-block:: yaml

            policies:
                - name: redshift-modify-cluster
                  resource: redshift
                  filters:
                    - type: value
                      key: AllowVersionUpgrade
                      value: false
                  actions:
                    - type: set-attributes
                      attributes:
                        AllowVersionUpgrade: true
    """

    schema = type_schema('set-attributes',
                        attributes={"type": "object"},
                        required=('attributes',))

    permissions = ('redshift:ModifyCluster',)
    # Modify-cluster argument -> expression locating the current value in the
    # cluster record, for arguments that are not top-level record keys.
    cluster_mapping = {
        'ElasticIp': 'ElasticIpStatus.ElasticIp',
        'ClusterSecurityGroups': 'ClusterSecurityGroups[].ClusterSecurityGroupName',
        # Vpc security groups are identified by VpcSecurityGroupId (the same
        # expression the security-group filter in this module uses).
        'VpcSecurityGroupIds': 'VpcSecurityGroups[].VpcSecurityGroupId',
        'HsmClientCertificateIdentifier': 'HsmStatus.HsmClientCertificateIdentifier',
        'HsmConfigurationIdentifier': 'HsmStatus.HsmConfigurationIdentifier'
    }

    shape = 'ModifyClusterMessage'

    def validate(self):
        """Validate the attributes against the modify-cluster request shape.

        :raises PolicyValidationError: if ``ClusterIdentifier`` is among the
            attributes to update.
        """
        attrs = dict(self.data.get('attributes'))
        if attrs.get('ClusterIdentifier'):
            raise PolicyValidationError('ClusterIdentifier field cannot be updated')
        # Placeholder so the identifier is present during shape validation.
        attrs["ClusterIdentifier"] = ""
        return shape_validate(attrs, self.shape, 'redshift')

    def process(self, clusters):
        """Apply the configured attributes to every cluster."""
        client = local_session(self.manager.session_factory).client(
            self.manager.get_model().service)
        for cluster in clusters:
            self.process_cluster(client, cluster)

    def process_cluster(self, client, cluster):
        """Modify one cluster with the attributes that actually changed.

        A cluster that no longer exists is ignored; any other client error is
        logged and re-raised.
        """
        pending = cluster.get('PendingModifiedValues', {})
        modify = {}
        for k, v in dict(self.data.get('attributes')).items():
            # Mapped attributes are compared against their mapped location
            # only. Previously they also fell through to a top-level lookup
            # that does not exist for them, so they always looked changed.
            if k in self.cluster_mapping:
                current = jmespath_search(self.cluster_mapping[k], cluster)
            else:
                current = pending.get(k, cluster.get(k))
            if v != current:
                modify[k] = v
        if not modify:
            return

        # Prefer a pending identifier over the current one, if there is one.
        modify['ClusterIdentifier'] = (
            pending.get('ClusterIdentifier') or cluster.get('ClusterIdentifier'))
        try:
            client.modify_cluster(**modify)
        except client.exceptions.ClusterNotFoundFault:
            return
        except ClientError as e:
            self.log.warning(
                "Exception trying to modify cluster: %s error: %s",
                cluster['ClusterIdentifier'], e)
            raise
@Redshift.action_registry.register('mark-for-op')
class TagDelayedAction(tags.TagDelayedAction):
    """Action to tag a redshift cluster for an operation to run at a later time

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-terminate-unencrypted
                resource: redshift
                filters:
                  - "tag:custodian_cleanup": absent
                  - type: value
                    key: Encrypted
                    value: false
                    op: eq
                actions:
                  - type: mark-for-op
                    tag: custodian_cleanup
                    op: delete
                    days: 5
                    msg: "Unencrypted Redshift cluster: {op}@{action_date}"
    """
@Redshift.action_registry.register('tag')
class Tag(tags.Tag):
    """Action to add tag/tags to a redshift cluster

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-tag
                resource: redshift
                filters:
                  - "tag:RedshiftTag": absent
                actions:
                  - type: tag
                    key: RedshiftTag
                    value: "Redshift Tag Value"
    """

    concurrency = 2
    batch_size = 5
    permissions = ('redshift:CreateTags',)

    def process_resource_set(self, client, resources, tags):
        """Create ``tags`` on each resource, addressed by its ARN."""
        arns = self.manager.get_arns(resources)
        for arn, _resource in zip(arns, resources):
            client.create_tags(ResourceName=arn, Tags=tags)
@Redshift.action_registry.register('unmark')
@Redshift.action_registry.register('remove-tag')
class RemoveTag(tags.RemoveTag):
    """Action to remove tag/tags from a redshift cluster

    Registered under both ``remove-tag`` and ``unmark``.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-remove-tag
                resource: redshift
                filters:
                  - "tag:RedshiftTag": present
                actions:
                  - type: remove-tag
                    tags: ["RedshiftTag"]
    """

    concurrency = 2
    batch_size = 5
    permissions = ('redshift:DeleteTags',)

    def process_resource_set(self, client, resources, tag_keys):
        """Delete ``tag_keys`` from each resource, addressed by its ARN."""
        arns = self.manager.get_arns(resources)
        for arn, _resource in zip(arns, resources):
            client.delete_tags(ResourceName=arn, TagKeys=tag_keys)
@Redshift.action_registry.register('tag-trim')
class TagTrim(tags.TagTrim):
    """Action to remove tags from a redshift cluster

    This can be used to prevent reaching the ceiling limit of tags on a
    resource

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-tag-trim
                resource: redshift
                filters:
                  - type: value
                    key: "length(Tags)"
                    op: ge
                    value: 10
                actions:
                  - type: tag-trim
                    space: 1
                    preserve:
                      - RequiredTag1
                      - RequiredTag2
    """

    max_tag_count = 10
    permissions = ('redshift:DeleteTags',)

    def process_tag_removal(self, client, resource, candidates):
        """Delete the ``candidates`` tag keys from one cluster."""
        # Redshift clusters are keyed by ClusterIdentifier (the id declared on
        # the Redshift resource type); the record has no DBInstanceIdentifier,
        # so the previous lookup raised KeyError.
        arn = self.manager.generate_arn(resource['ClusterIdentifier'])
        client.delete_tags(ResourceName=arn, TagKeys=candidates)
@Redshift.action_registry.register('modify-security-groups')
class RedshiftModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify security groups on a Redshift cluster"""

    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        """Set the resolved security group ids on each cluster.

        ``get_groups`` yields one entry per cluster, in cluster order.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('redshift')
        groups = super().get_groups(clusters)

        for position, cluster in enumerate(clusters):
            security_group_ids = groups[position]
            client.modify_cluster(
                ClusterIdentifier=cluster['ClusterIdentifier'],
                VpcSecurityGroupIds=security_group_ids)
@resources.register('redshift-subnet-group')
class RedshiftSubnetGroup(QueryResourceManager):
    """Resource manager for Redshift cluster subnet groups."""

    class resource_type(TypeInfo):
        # Service and the enumeration call used to list subnet groups.
        service = 'redshift'
        enum_spec = (
            'describe_cluster_subnet_groups', 'ClusterSubnetGroups', None)

        # ARN components for a subnet group.
        arn_type = 'subnetgroup'
        arn_separator = ':'

        # The group name serves as id, name and filter key.
        name = 'ClusterSubnetGroupName'
        id = 'ClusterSubnetGroupName'
        filter_name = 'ClusterSubnetGroupName'
        filter_type = 'scalar'

        # The same type name is used for both CloudFormation and Config.
        config_type = "AWS::Redshift::ClusterSubnetGroup"
        cfn_type = "AWS::Redshift::ClusterSubnetGroup"
        universal_taggable = object()
@resources.register('redshift-snapshot')
class RedshiftSnapshot(QueryResourceManager):
    """Resource manager for Redshift cluster snapshots."""

    class resource_type(TypeInfo):
        # Service and the enumeration call used to list snapshots.
        service = 'redshift'
        enum_spec = ('describe_cluster_snapshots', 'Snapshots', None)

        # ARN components for a snapshot.
        arn_type = 'snapshot'
        arn_separator = ':'

        id = 'SnapshotIdentifier'
        name = 'SnapshotIdentifier'
        date = 'SnapshotCreateTime'
        config_type = "AWS::Redshift::ClusterSnapshot"
        universal_taggable = True

    def get_arns(self, resources):
        """Return one ARN per snapshot.

        The ARN resource part is ``<cluster identifier>/<snapshot id>``.
        """
        id_key = self.get_model().id
        return [
            self.generate_arn(snapshot['ClusterIdentifier'] + '/' + snapshot[id_key])
            for snapshot in resources]
@RedshiftSnapshot.filter_registry.register('age')
class RedshiftSnapshotAge(AgeFilter):
    """Filters redshift snapshots based on age (in days)

    The age is measured from the snapshot's ``SnapshotCreateTime``.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-old-snapshots
                resource: redshift-snapshot
                filters:
                  - type: age
                    days: 21
                    op: gt
    """

    # Snapshot attribute the age is computed from.
    date_attribute = 'SnapshotCreateTime'

    schema = type_schema(
        'age',
        days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})
@RedshiftSnapshot.filter_registry.register('cross-account')
class RedshiftSnapshotCrossAccount(CrossAccountAccessFilter):
    """Filter snapshots that grant restore access to non-whitelisted accounts

    Matching snapshots are annotated with the offending account ids under
    ``c7n:CrossAccountViolations``.
    """
    permissions = ('redshift:DescribeClusterSnapshots',)
    schema = type_schema(
        'cross-account',
        whitelist={'type': 'array', 'items': {'type': 'string'}},
        whitelist_from=ValuesFrom.schema)

    def process(self, snapshots, event=None):
        """Return the snapshots shared with accounts outside the allowed set."""
        accounts = self.get_accounts()
        results = []
        for snapshot in snapshots:
            grants = snapshot.get('AccountsWithRestoreAccess')
            if not grants:
                # Not shared with anyone, nothing to check.
                continue
            granted_accounts = {grant.get('AccountId') for grant in grants}
            violations = granted_accounts.difference(accounts)
            if violations:
                snapshot['c7n:CrossAccountViolations'] = list(violations)
                results.append(snapshot)
        return results
@RedshiftSnapshot.action_registry.register('delete')
class RedshiftSnapshotDelete(BaseAction):
    """Action to delete redshift snapshots

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-delete-old-snapshots
                resource: redshift-snapshot
                filters:
                  - type: age
                    days: 21
                    op: gt
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('redshift:DeleteClusterSnapshot',)

    def process(self, snapshots):
        """Delete ``snapshots`` in batches of fifty, logging failed batches.

        Batches are built from the snapshots in reverse order.
        """
        self.log.info("Deleting %d Redshift snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            futures = [
                w.submit(self.process_snapshot_set, snapshot_set)
                for snapshot_set in chunks(reversed(snapshots), size=50)]
            for f in as_completed(futures):
                error = f.exception()
                if error:
                    self.log.error(
                        "Exception deleting snapshot set \n %s", error)
        return snapshots

    def process_snapshot_set(self, snapshots_set):
        """Delete every snapshot of one batch."""
        client = local_session(self.manager.session_factory).client('redshift')
        for snapshot in snapshots_set:
            client.delete_cluster_snapshot(
                SnapshotIdentifier=snapshot['SnapshotIdentifier'],
                SnapshotClusterIdentifier=snapshot['ClusterIdentifier'])
@RedshiftSnapshot.action_registry.register('revoke-access')
class RedshiftSnapshotRevokeAccess(BaseAction):
    """Revokes ability of accounts to restore a snapshot

    Access is revoked for the accounts recorded under
    ``c7n:CrossAccountViolations``, so the policy must also use the
    ``cross-account`` filter.

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-snapshot-revoke-access
                resource: redshift-snapshot
                filters:
                  - type: cross-account
                    whitelist:
                      - "012345678910"
                actions:
                  - type: revoke-access
    """
    permissions = ('redshift:RevokeSnapshotAccess',)
    schema = type_schema('revoke-access')

    def validate(self):
        """Require the ``cross-account`` filter on the policy.

        :raises PolicyValidationError: if the policy has no such filter.
        """
        for f in self.manager.iter_filters():
            if isinstance(f, RedshiftSnapshotCrossAccount):
                return self
        raise PolicyValidationError(
            '`revoke-access` may only be used in '
            'conjunction with `cross-account` filter on %s' % (self.manager.data,))

    def process_snapshot_set(self, client, snapshot_set):
        """Revoke restore access for every violating account of a batch.

        A snapshot that no longer exists is skipped; any other client error
        is re-raised.
        """
        for s in snapshot_set:
            for a in s.get('c7n:CrossAccountViolations', []):
                try:
                    self.manager.retry(
                        client.revoke_snapshot_access,
                        SnapshotIdentifier=s['SnapshotIdentifier'],
                        AccountWithRestoreAccess=a)
                except ClientError as e:
                    if e.response['Error']['Code'] == 'ClusterSnapshotNotFound':
                        continue
                    raise

    def process(self, snapshots):
        """Revoke access in batches of 25 snapshots, logging failed batches."""
        client = local_session(self.manager.session_factory).client('redshift')
        with self.executor_factory(max_workers=2) as w:
            # Map each future back to its batch for error reporting.
            futures = {}
            for snapshot_set in chunks(snapshots, 25):
                future = w.submit(
                    self.process_snapshot_set, client, snapshot_set)
                futures[future] = snapshot_set
            for f in as_completed(futures):
                error = f.exception()
                if error:
                    # No exception is being handled at this point, so use
                    # error() (with lazy formatting) rather than exception(),
                    # which would append an empty traceback.
                    self.log.error(
                        'Exception while revoking access on %s: %s',
                        ', '.join(
                            [s['SnapshotIdentifier'] for s in futures[f]]),
                        error)
@resources.register('redshift-reserved')
class ReservedNode(QueryResourceManager):
    """Resource manager for Redshift reserved nodes."""

    class resource_type(TypeInfo):
        # Service and the enumeration call used to list reserved nodes.
        service = 'redshift'
        enum_spec = (
            'describe_reserved_nodes', 'ReservedNodes', None)
        permissions_enum = ('redshift:DescribeReservedNodes',)

        arn_type = "reserved-nodes"

        id = 'ReservedNodeId'
        name = 'ReservedNodeId'
        date = 'StartTime'

        filter_name = 'ReservedNodes'
        filter_type = 'list'
@Redshift.filter_registry.register('consecutive-snapshots')
class ClusterConsecutiveSnapshots(Filter):
    """Returns clusters that have a snapshot with the given status for each
    of the last ``count`` consecutive periods (hours, days or weeks).

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-daily-snapshot-count
                resource: redshift
                filters:
                  - type: consecutive-snapshots
                    count: 7
                    period: days
                    status: available
    """
    schema = type_schema(
        'consecutive-snapshots',
        count={'type': 'number', 'minimum': 1},
        period={'enum': ['hours', 'days', 'weeks']},
        status={'enum': ['available', 'creating', 'final snapshot', 'failed']},
        required=['count', 'period', 'status'])
    permissions = ('redshift:DescribeClusterSnapshots', 'redshift:DescribeClusters', )
    annotation = 'c7n:RedshiftSnapshots'

    def process_resource_set(self, client, resources, lbdate):
        """Annotate ``resources`` with their snapshots taken since ``lbdate``.

        Clusters without any such snapshot are annotated with an empty list.
        """
        paginator = client.get_paginator('describe_cluster_snapshots')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        # NOTE(review): for period 'hours' lbdate is a '%Y-%m-%d-%H' string;
        # confirm the client accepts that form as StartTime.
        response = paginator.paginate(StartTime=lbdate).build_full_result()

        snapshots_by_cluster = {}
        for snap in response.get('Snapshots', []):
            snapshots_by_cluster.setdefault(
                snap['ClusterIdentifier'], []).append(snap)
        for r in resources:
            r[self.annotation] = snapshots_by_cluster.get(
                r['ClusterIdentifier'], [])

    def get_date(self, time):
        """Return the UTC date stamp ``time`` periods before now.

        Hourly periods use a ``%Y-%m-%d-%H`` stamp, daily and weekly periods
        use ``%Y-%m-%d``.
        """
        period = self.data.get('period')
        now = datetime.utcnow()
        if period == 'weeks':
            offset, stamp_format = timedelta(weeks=time), '%Y-%m-%d'
        elif period == 'hours':
            offset, stamp_format = timedelta(hours=time), '%Y-%m-%d-%H'
        else:
            offset, stamp_format = timedelta(days=time), '%Y-%m-%d'
        return (now - offset).strftime(stamp_format)

    def process(self, resources, event=None):
        """Return the clusters that have a snapshot for every expected stamp."""
        client = local_session(self.manager.session_factory).client('redshift')
        retention = self.data.get('count')
        # Oldest stamp of interest; used as the snapshot query start time.
        lbdate = self.get_date(retention)
        expected_dates = {
            self.get_date(periods_back)
            for periods_back in range(1, retention + 1)}

        # Only fetch snapshots for resources not annotated by an earlier run.
        unannotated = [r for r in resources if self.annotation not in r]
        for resource_set in chunks(unannotated, 50):
            self.process_resource_set(client, resource_set, lbdate)

        if self.data.get('period') == 'hours':
            stamp_format = '%Y-%m-%d-%H'
        else:
            stamp_format = '%Y-%m-%d'

        results = []
        for r in resources:
            snapshot_dates = {
                snapshot['SnapshotCreateTime'].strftime(stamp_format)
                for snapshot in r[self.annotation]
                if snapshot['Status'] == self.data.get('status')}
            if expected_dates.issubset(snapshot_dates):
                results.append(r)
        return results