Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/elb.py: 43%
337 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4Elastic Load Balancers
5"""
6from concurrent.futures import as_completed
7import re
9from botocore.exceptions import ClientError
11from c7n.actions import ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction
12from c7n.exceptions import PolicyValidationError
13from c7n.filters import Filter, FilterRegistry, ValueFilter, ShieldMetrics
14import c7n.filters.vpc as net_filters
15from datetime import datetime
16from c7n import tags
17from c7n.manager import resources
18from c7n.query import ConfigSource, QueryResourceManager, DescribeSource, TypeInfo
19from c7n.utils import local_session, chunks, type_schema
21from c7n.resources.shield import IsShieldProtected, SetShieldProtection
# Registries that collect the filters and actions exposed on the ``elb`` resource.
24filters = FilterRegistry('elb.filters')
25actions = ActionRegistry('elb.actions')
# Filters implemented in other modules (c7n.tags, c7n.resources.shield,
# c7n.filters) registered here under their ``elb`` policy names.
27filters.register('tag-count', tags.TagCountFilter)
28filters.register('marked-for-op', tags.TagActionFilter)
29filters.register('shield-enabled', IsShieldProtected)
30filters.register('shield-metrics', ShieldMetrics)
class DescribeELB(DescribeSource):
    """Describe source for classic ELBs."""

    def augment(self, resources):
        """Return the result of ``tags.universal_augment`` for ``resources``."""
        manager = self.manager
        return tags.universal_augment(manager, resources)
@resources.register('elb')
class ELB(QueryResourceManager):
    """Resource manager for classic Elastic Load Balancers (``elb``)."""

    class resource_type(TypeInfo):
        # Service and ARN identification.
        service = 'elb'
        arn_type = 'loadbalancer'
        arn_service = 'elasticloadbalancing'
        permission_prefix = 'elasticloadbalancing'

        # Enumeration spec and the name/type of its filter parameter.
        enum_spec = (
            'describe_load_balancers', 'LoadBalancerDescriptions', None)
        filter_name = 'LoadBalancerNames'
        filter_type = 'list'

        # Resource keys used as identifier, name, date and dimension.
        id = 'LoadBalancerName'
        name = 'DNSName'
        date = 'CreatedTime'
        dimension = 'LoadBalancerName'

        # The same type string serves both CloudFormation and Config.
        cfn_type = "AWS::ElasticLoadBalancing::LoadBalancer"
        config_type = "AWS::ElasticLoadBalancing::LoadBalancer"

        default_report_fields = (
            'LoadBalancerName',
            'DNSName',
            'VPCId',
            'count:Instances',
            'list:ListenerDescriptions[].Listener.LoadBalancerPort',
        )

    filter_registry = filters
    action_registry = actions
    source_mapping = {'describe': DescribeELB, 'config': ConfigSource}

    @classmethod
    def get_permissions(cls):
        """Return the IAM permissions listed for this resource."""
        operations = (
            'DescribeLoadBalancers',
            'DescribeLoadBalancerAttributes',
            'DescribeTags',
        )
        return tuple('elasticloadbalancing:' + op for op in operations)
@actions.register('set-shield')
class SetELBShieldProtection(SetShieldProtection):
    """``set-shield`` action registered for classic ELBs."""

    def clear_stale(self, client, protections):
        """Pass only classic-ELB protections on to the parent ``clear_stale``.

        ELB ARNs need extra discrimination to be told apart from app load
        balancer ARNs: only ARNs containing exactly one ``/`` are kept. See
        https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#arn-syntax-elb-application
        """
        classic_only = []
        for protection in protections:
            if protection['ResourceArn'].count('/') == 1:
                classic_only.append(protection)
        super().clear_stale(client, classic_only)
@actions.register('mark-for-op')
class TagDelayedAction(tags.TagDelayedAction):
    """Mark an ELB with a tag naming an operation to run at a later date.

    :example:

    .. code-block:: yaml

            policies:
              - name: mark-elb-delete-unused
                resource: elb
                filters:
                  - "tag:custodian_cleanup": absent
                  - Instances: []
                actions:
                  - type: mark-for-op
                    tag: custodian_cleanup
                    msg: "Unused ELB - No Instances: {op}@{action_date}"
                    op: delete
                    days: 7
    """
@actions.register('tag')
class Tag(tags.Tag):
    """Action to add tag(s) to ELB(s)

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-add-owner-tag
                resource: elb
                filters:
                  - "tag:OwnerName": missing
                actions:
                  - type: tag
                    key: OwnerName
                    value: OwnerName
    """

    batch_size = 1
    permissions = ('elasticloadbalancing:AddTags',)

    def process_resource_set(self, client, resource_set, tags):
        """Apply ``tags`` to every load balancer in ``resource_set``.

        :param client: ELB client exposing ``add_tags``.
        :param resource_set: load balancer dicts carrying ``LoadBalancerName``.
        :param tags: tag list forwarded unchanged as the ``Tags`` argument.
        """
        # A single call already names every balancer in the set; the call
        # must not be repeated once per resource.
        client.add_tags(
            LoadBalancerNames=[r['LoadBalancerName'] for r in resource_set],
            Tags=tags)
@actions.register('remove-tag')
class RemoveTag(tags.RemoveTag):
    """Strip one or more tags from ELB(s).

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-remove-old-tag
                resource: elb
                filters:
                  - "tag:OldTagKey": present
                actions:
                  - type: remove-tag
                    tags: [OldTagKey1, OldTagKey2]
    """

    batch_size = 1
    permissions = ('elasticloadbalancing:RemoveTags',)

    def process_resource_set(self, client, resource_set, tag_keys):
        """Remove ``tag_keys`` from every load balancer in ``resource_set``."""
        balancer_names = [
            resource['LoadBalancerName'] for resource in resource_set]
        key_specs = [{'Key': key} for key in tag_keys]
        client.remove_tags(LoadBalancerNames=balancer_names, Tags=key_specs)
@actions.register('delete')
class Delete(BaseAction):
    """Delete the matched ELB(s).

    It is recommended to apply a filter to the delete policy to avoid unwanted
    deletion of any load balancers.

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-delete-unused
                resource: elb
                filters:
                  - Instances: []
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('elasticloadbalancing:DeleteLoadBalancer',)

    def process(self, load_balancers):
        """Call ``delete_load_balancer`` for each balancer via the manager's retry."""
        session = local_session(self.manager.session_factory)
        client = session.client('elb')
        for balancer in load_balancers:
            balancer_name = balancer['LoadBalancerName']
            self.manager.retry(
                client.delete_load_balancer, LoadBalancerName=balancer_name)
@actions.register('set-ssl-listener-policy')
class SetSslListenerPolicy(BaseAction):
    """Create an SSL negotiation policy and attach it to the ELB's SSL listeners.

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-set-listener-custom-policy
                resource: elb
                actions:
                  - type: set-ssl-listener-policy
                    name: SSLNegotiation-Custom-Policy-01
                    attributes:
                      - Protocol-SSLv3
                      - Protocol-TLSv1.1
                      - DHE-RSA-AES256-SHA256


    Alternatively, you can specify one of AWS recommended policies
    (https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/elb-security-policy-table.html)
    by specifying an attribute where key=Reference-Security-Policy
    and value=name of the predefined policy. For example:

    .. code-block:: yaml

            policies:
              - name: elb-set-listener-predefined-policy
                resource: elb
                actions:
                  - type: set-ssl-listener-policy
                    name: SSLNegotiation-Predefined-Policy-01
                    attributes:
                      Reference-Security-Policy: ELBSecurityPolicy-TLS-1-2-2017-01
    """

    schema = type_schema(
        'set-ssl-listener-policy',
        name={'type': 'string'},
        attributes={'anyOf': [{'type': 'object'}, {'type': 'array', 'items': {'type': 'string'}}]},
        required=['name', 'attributes'])

    permissions = (
        'elasticloadbalancing:CreateLoadBalancerPolicy',
        'elasticloadbalancing:SetLoadBalancerPoliciesOfListener')

    def process(self, load_balancers):
        """Run ``process_elb`` for every balancer on a two-worker executor.

        Each failure is logged; once all futures have completed, the most
        recently observed exception (if any) is re-raised.
        """
        client = local_session(self.manager.session_factory).client('elb')
        id_key = self.manager.resource_type.id
        last_error = None

        with self.executor_factory(max_workers=2) as pool:
            pending = {
                pool.submit(self.process_elb, client, balancer): balancer
                for balancer in load_balancers}

            for done in as_completed(pending):
                if not done.exception():
                    continue
                self.log.error(
                    "set-ssl-listener-policy error on lb:%s error:%s",
                    pending[done][id_key], done.exception())
                last_error = done.exception()

        if last_error is not None:
            raise last_error

    def process_elb(self, client, elb):
        """Create the timestamped policy on ``elb`` and set it on SSL listeners.

        Balancers without an HTTPS/SSL listener are left untouched.
        """
        if not is_ssl(elb):
            return

        # Create a custom policy with epoch timestamp.
        # to make it unique within the
        # set of policies for this load balancer.
        stamp = int(datetime.utcnow().timestamp() * 1000)
        policy_name = self.data.get('name') + '-' + str(stamp)
        lb_name = elb['LoadBalancerName']
        attrs = self.data.get('attributes')

        policy_attributes = []
        if isinstance(attrs, dict):
            for attr_name, attr_value in attrs.items():
                policy_attributes.append(
                    {'AttributeName': attr_name, 'AttributeValue': attr_value})
        else:
            # A plain list of attribute names: each one is switched on.
            for attr_name in attrs:
                policy_attributes.append(
                    {'AttributeName': attr_name, 'AttributeValue': 'true'})

        duplicate_codes = (
            'DuplicatePolicyName', 'DuplicatePolicyNameException',
            'DuplicationPolicyNameException')
        try:
            client.create_load_balancer_policy(
                LoadBalancerName=lb_name,
                PolicyName=policy_name,
                PolicyTypeName='SSLNegotiationPolicyType',
                PolicyAttributes=policy_attributes)
        except ClientError as e:
            if e.response['Error']['Code'] not in duplicate_codes:
                raise

        # Apply it to all SSL listeners.
        ssl_policies = elb.get('c7n.ssl-policies', ())

        for description in elb['ListenerDescriptions']:
            listener = description['Listener']
            if listener['Protocol'] not in ('HTTPS', 'SSL'):
                continue
            # Preserve extant non-ssl listener policies
            policy_names = [policy_name, *description.get('PolicyNames', ())]
            # Remove extant ssl listener policy
            if ssl_policies:
                policy_names = list(set(policy_names).difference(ssl_policies))
            client.set_load_balancer_policies_of_listener(
                LoadBalancerName=lb_name,
                LoadBalancerPort=listener['LoadBalancerPort'],
                PolicyNames=policy_names)
@actions.register('modify-security-groups')
class ELBModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify VPC security groups on an ELB."""

    permissions = ('elasticloadbalancing:ApplySecurityGroupsToLoadBalancer',)

    def process(self, load_balancers):
        """Apply the group list resolved by ``get_groups`` to each balancer.

        ``get_groups`` results are consumed by position: entry ``i`` is
        applied to ``load_balancers[i]``.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('elb')
        resolved_groups = super().get_groups(load_balancers)
        for position, balancer in enumerate(load_balancers):
            client.apply_security_groups_to_load_balancer(
                LoadBalancerName=balancer['LoadBalancerName'],
                SecurityGroups=resolved_groups[position])
@actions.register('enable-s3-logging')
class EnableS3Logging(BaseAction):
    """Turn on S3 access logging for Elastic Load Balancers.

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-test
                resource: elb
                filters:
                  - type: is-not-logging
                actions:
                  - type: enable-s3-logging
                    bucket: elblogtest
                    prefix: dahlogs
                    emit_interval: 5
    """
    schema = type_schema(
        'enable-s3-logging',
        bucket={'type': 'string'},
        prefix={'type': 'string'},
        emit_interval={'type': 'integer'},
    )
    permissions = ("elasticloadbalancing:ModifyLoadBalancerAttributes",)

    def process(self, resources):
        """Enable the ``AccessLog`` attribute on each ELB and return ``resources``.

        ``bucket``, ``prefix`` and ``emit_interval`` are copied from the
        policy data only when they are present there.
        """
        # Policy data key -> AccessLog attribute name.
        optional_settings = (
            ('bucket', 'S3BucketName'),
            ('prefix', 'S3BucketPrefix'),
            ('emit_interval', 'EmitInterval'),
        )
        client = local_session(self.manager.session_factory).client('elb')
        for balancer in resources:
            balancer_name = balancer['LoadBalancerName']
            access_log = {'Enabled': True}
            for data_key, attribute_name in optional_settings:
                if data_key in self.data:
                    access_log[attribute_name] = self.data[data_key]

            client.modify_load_balancer_attributes(
                LoadBalancerName=balancer_name,
                LoadBalancerAttributes={'AccessLog': access_log})
        return resources
@actions.register('disable-s3-logging')
class DisableS3Logging(BaseAction):
    """Turn off S3 access logging for ElasticLoadBalancers.

    :example:

    .. code-block:: yaml

            policies:
              - name: turn-off-elb-logs
                resource: elb
                filters:
                  - type: is-logging
                    bucket: prodbucket
                actions:
                  - type: disable-s3-logging
    """
    schema = type_schema('disable-s3-logging')
    permissions = ("elasticloadbalancing:ModifyLoadBalancerAttributes",)

    def process(self, resources):
        """Set ``AccessLog.Enabled`` to False on each ELB and return ``resources``."""
        session = local_session(self.manager.session_factory)
        client = session.client('elb')
        for balancer in resources:
            balancer_name = balancer['LoadBalancerName']
            client.modify_load_balancer_attributes(
                LoadBalancerName=balancer_name,
                LoadBalancerAttributes={'AccessLog': {'Enabled': False}})
        return resources
def is_ssl(b):
    """Return True when any listener of balancer ``b`` speaks HTTPS or SSL."""
    return any(
        description['Listener']['Protocol'] in ('HTTPS', 'SSL')
        for description in b['ListenerDescriptions'])
@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Security group filter for ELBs; related ids come from ``SecurityGroups``."""

    RelatedIdsExpression = "SecurityGroups[]"
@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Subnet filter for ELBs; related ids come from ``Subnets``."""

    RelatedIdsExpression = "Subnets[]"
@filters.register('vpc')
class VpcFilter(net_filters.VpcFilter):
    """VPC filter for ELBs; the related id comes from ``VPCId``."""

    RelatedIdsExpression = "VPCId"
# Expose the shared NetworkLocation filter from c7n.filters.vpc on ``elb``.
435filters.register('network-location', net_filters.NetworkLocation)
@filters.register('instance')
class Instance(ValueFilter):
    """Filter ELB by an associated instance value(s)

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-image-filter
                resource: elb
                filters:
                  - type: instance
                    key: ImageId
                    value: ami-01ab23cd
    """

    schema = type_schema('instance', rinherit=ValueFilter.schema)
    schema_alias = False
    annotate = False

    def get_permissions(self):
        """Return the permissions reported by the ``ec2`` resource manager."""
        return self.manager.get_resource_manager('ec2').get_permissions()

    def process(self, resources, event=None):
        """Look up the instances behind ``resources``, then run the value filter.

        The instances are cached on ``self.elb_instances`` keyed by
        ``InstanceId`` so that ``__call__`` can resolve them per balancer.
        """
        self.elb_instances = {}
        instance_ids = [
            i['InstanceId'] for r in resources for i in r['Instances']]
        ec2 = self.manager.get_resource_manager('ec2')
        for instance in ec2.get_resources(instance_ids):
            self.elb_instances[instance['InstanceId']] = instance
        return super().process(resources, event)

    def __call__(self, elb):
        """Return True when at least one instance of ``elb`` matches.

        Matching instances are recorded on the balancer under
        ``c7n:MatchedInstances``.
        """
        matched = []
        for i in elb['Instances']:
            # An id listed on the balancer may be missing from the EC2 lookup
            # result; skip it rather than failing the whole filter with a
            # KeyError.
            instance = self.elb_instances.get(i['InstanceId'])
            if instance is None:
                continue
            if self.match(instance):
                matched.append(instance)
        if not matched:
            return False
        elb['c7n:MatchedInstances'] = matched
        return True
@filters.register('is-ssl')
class IsSSLFilter(Filter):
    """Keep only ELBs that have at least one HTTPS or SSL listener.

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-using-ssl
                resource: elb
                filters:
                  - type: is-ssl
    """

    schema = type_schema('is-ssl')

    def process(self, balancers, event=None):
        """Return the balancers for which ``is_ssl`` is true."""
        return list(filter(is_ssl, balancers))
@filters.register('ssl-policy')
class SSLPolicyFilter(Filter):
    """Filter ELBs on the properties of SSLNegotiation policies.

    TODO: Only works on custom policies at the moment.

    whitelist: filter all policies containing permitted protocols
    blacklist: filter all policies containing forbidden protocols

    Cannot specify both whitelist & blacklist in the same policy. These must
    be done separately (separate policy statements).

    Likewise, if you want to reduce the consideration set such that we only
    compare certain keys (e.g. you only want to compare the `Protocol-` keys),
    you can use the `matching` option with a regular expression:

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-ssl-policies
                resource: elb
                filters:
                  - type: ssl-policy
                    blacklist:
                        - "Protocol-SSLv2"
                        - "Protocol-SSLv3"
              - name: elb-modern-tls
                resource: elb
                filters:
                  - type: ssl-policy
                    matching: "^Protocol-"
                    whitelist:
                        - "Protocol-TLSv1.1"
                        - "Protocol-TLSv1.2"
    """

    schema = {
        'type': 'object',
        'additionalProperties': False,
        'oneOf': [
            {'required': ['type', 'whitelist']},
            {'required': ['type', 'blacklist']}
        ],
        'properties': {
            'type': {'enum': ['ssl-policy']},
            'matching': {'type': 'string'},
            'whitelist': {'type': 'array', 'items': {'type': 'string'}},
            'blacklist': {'type': 'array', 'items': {'type': 'string'}}
        }
    }
    permissions = ("elasticloadbalancing:DescribeLoadBalancerPolicies",)

    def validate(self):
        """Check the filter configuration and return ``self``.

        :raises PolicyValidationError: when both or neither of
            ``whitelist``/``blacklist`` are given, when either is not a
            list, or when ``matching`` is not a valid regular expression.
        """
        if 'whitelist' in self.data and 'blacklist' in self.data:
            raise PolicyValidationError(
                "cannot specify whitelist and black list on %s" % (
                    self.manager.data,))
        if 'whitelist' not in self.data and 'blacklist' not in self.data:
            raise PolicyValidationError(
                "must specify either policy blacklist or whitelist on %s" % (
                    self.manager.data,))
        # Both options are turned into sets by process(), so apply the same
        # type check to each of them.
        for option in ('blacklist', 'whitelist'):
            if (option in self.data and
                    not isinstance(self.data[option], list)):
                raise PolicyValidationError("%s must be a list on %s" % (
                    option, self.manager.data,))

        if 'matching' in self.data:
            # Sanity check that we can compile
            try:
                re.compile(self.data['matching'])
            except re.error as e:
                raise PolicyValidationError(
                    "Invalid regex: %s %s" % (e, self.manager.data))

        return self

    def process(self, balancers, event=None):
        """Return the SSL balancers whose active attributes violate the lists.

        With ``blacklist`` a balancer is returned when any active attribute
        is blacklisted; with ``whitelist`` it is returned when any active
        attribute is not whitelisted. The offending attributes are stored on
        the balancer under ``ProhibitedPolicies``.
        """
        balancers = [b for b in balancers if is_ssl(b)]
        active_policy_attribute_tuples = (
            self.create_elb_active_policy_attribute_tuples(balancers))

        whitelist = set(self.data.get('whitelist', []))
        blacklist = set(self.data.get('blacklist', []))

        invalid_elbs = []

        if 'matching' in self.data:
            # Compile once rather than handing the raw pattern to re.match
            # for every attribute of every balancer.
            pattern = re.compile(self.data['matching'], flags=re.IGNORECASE)
            filtered_pairs = []
            for elb, active_policies in active_policy_attribute_tuples:
                filtered_policies = [
                    policy for policy in active_policies
                    if pattern.match(policy)]
                if filtered_policies:
                    filtered_pairs.append((elb, filtered_policies))
            active_policy_attribute_tuples = filtered_pairs

        if blacklist:
            for elb, active_policies in active_policy_attribute_tuples:
                prohibited = blacklist.intersection(active_policies)
                if prohibited:
                    elb["ProhibitedPolicies"] = list(prohibited)
                    invalid_elbs.append(elb)
        elif whitelist:
            for elb, active_policies in active_policy_attribute_tuples:
                prohibited = set(active_policies).difference(whitelist)
                if prohibited:
                    elb["ProhibitedPolicies"] = list(prohibited)
                    invalid_elbs.append(elb)
        return invalid_elbs

    def create_elb_active_policy_attribute_tuples(self, elbs):
        """
        Returns a list of tuples of active SSL policies attributes
        for each elb [(elb, ['Protocol-SSLv1', 'Protocol-SSLv2', ...])]
        """
        elb_custom_policy_tuples = self.create_elb_custom_policy_tuples(elbs)

        active_policy_attribute_tuples = (
            self.create_elb_active_attributes_tuples(elb_custom_policy_tuples))

        return active_policy_attribute_tuples

    def create_elb_custom_policy_tuples(self, balancers):
        """
        creates a list of tuples (elb,[sslpolicy1,sslpolicy2...])
        for all custom policies on the ELB
        """
        elb_policy_tuples = []
        for b in balancers:
            policies = []
            for ld in b['ListenerDescriptions']:
                # Treat a listener without a PolicyNames key as having no
                # policies instead of raising KeyError.
                policies.extend(ld.get('PolicyNames', ()))
            elb_policy_tuples.append((b, policies))

        return elb_policy_tuples

    def create_elb_active_attributes_tuples(self, elb_policy_tuples):
        """
        creates a list of tuples for all attributes that are marked
        as "true" in the load balancer's polices, e.g.
        (myelb,['Protocol-SSLv1','Protocol-SSLv2'])

        The work is submitted in chunks of 50 balancers. A chunk whose
        future raised is logged and left out of the result.
        """
        active_policy_attribute_tuples = []
        client = local_session(self.manager.session_factory).client('elb')
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for elb_policy_set in chunks(elb_policy_tuples, 50):
                futures.append(
                    w.submit(self.process_elb_policy_set, client, elb_policy_set))

            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception processing elb policies \n %s",
                        f.exception())
                    continue
                active_policy_attribute_tuples.extend(f.result())

        return active_policy_attribute_tuples

    def process_elb_policy_set(self, client, elb_policy_set):
        """Describe each balancer's policies and collect attributes set to "true".

        Only ``SSLNegotiationPolicyType`` policies are considered; their
        names are stored on the balancer under ``c7n.ssl-policies``.
        Balancers or policies that no longer exist are skipped.

        :returns: list of ``(elb, [attribute_name, ...])`` tuples.
        """
        results = []

        for (elb, policy_names) in elb_policy_set:
            elb_name = elb['LoadBalancerName']
            try:
                policies = client.describe_load_balancer_policies(
                    LoadBalancerName=elb_name,
                    PolicyNames=policy_names)['PolicyDescriptions']
            except ClientError as e:
                if e.response['Error']['Code'] in [
                        'LoadBalancerNotFound', 'PolicyNotFound']:
                    continue
                raise
            active_lb_policies = []
            ssl_policies = []
            for p in policies:
                if p['PolicyTypeName'] != 'SSLNegotiationPolicyType':
                    continue
                ssl_policies.append(p['PolicyName'])
                active_lb_policies.extend(
                    [policy_description['AttributeName']
                     for policy_description in
                     p['PolicyAttributeDescriptions']
                     if policy_description['AttributeValue'] == 'true']
                )
            elb['c7n.ssl-policies'] = ssl_policies
            results.append((elb, active_lb_policies))

        return results
@filters.register('healthcheck-protocol-mismatch')
class HealthCheckProtocolMismatch(Filter):
    """Filters ELB that have a health check protocol mismatch

    The mismatch occurs if the ELB has a different protocol to check than
    the associated instances allow to determine health status.

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-healthcheck-mismatch
                resource: elb
                filters:
                  - type: healthcheck-protocol-mismatch
    """

    schema = type_schema('healthcheck-protocol-mismatch')

    def __call__(self, load_balancer):
        """Compare the health-check protocol with the listeners' instance protocols.

        Returns True for a balancer without listeners; otherwise returns
        whether the health-check protocol is among the listeners'
        ``InstanceProtocol`` values.
        """
        target = load_balancer['HealthCheck']['Target']
        check_protocol = target.split(':')[0]
        listeners = load_balancer['ListenerDescriptions']

        if len(listeners) == 0:
            return True

        # There is only 1 health check, so with multiple listeners it is
        # enough for one of them to use the health check's protocol.
        # NOTE(review): this yields True when the protocols DO match, which
        # reads as the opposite of "mismatch" - confirm the intended
        # semantics before changing; existing policies may rely on it.
        instance_protocols = tuple(
            entry['Listener']['InstanceProtocol'] for entry in listeners)
        return check_protocol in instance_protocols
@filters.register('default-vpc')
class DefaultVpc(net_filters.DefaultVpcBase):
    """Matches if an ELB is in the default VPC.

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-default-vpc
                resource: elb
                filters:
                  - type: default-vpc
    """

    schema = type_schema('default-vpc')

    def __call__(self, elb):
        """Return the ``match`` result for the ELB's ``VPCId``, or False."""
        vpc_id = elb.get('VPCId')
        if not vpc_id:
            # No VPC id recorded on the balancer: nothing to compare.
            return False
        return self.match(vpc_id) or False
class ELBAttributeFilterBase:
    """Mixin for filters that need each ELB's ``Attributes`` populated."""

    def initialize(self, elbs):
        """Fetch and store ``Attributes`` on every ELB that lacks the key.

        Lookups run on a two-worker executor obtained from the manager.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('elb')

        def fetch_attributes(balancer):
            if 'Attributes' in balancer:
                # Already populated; avoid a second describe call.
                return
            response = client.describe_load_balancer_attributes(
                LoadBalancerName=balancer['LoadBalancerName'])
            balancer['Attributes'] = response['LoadBalancerAttributes']

        with self.manager.executor_factory(max_workers=2) as pool:
            # Drain the iterator so every lookup runs and errors surface here.
            for _ in pool.map(fetch_attributes, elbs):
                pass
@filters.register('is-logging')
class IsLoggingFilter(Filter, ELBAttributeFilterBase):
    """Matches ELBs that are logging to S3.
    bucket and prefix are optional

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-is-logging-test
                resource: elb
                filters:
                  - type: is-logging

              - name: elb-is-logging-bucket-and-prefix-test
                resource: elb
                filters:
                  - type: is-logging
                    bucket: prodlogs
                    prefix: elblogs
    """

    permissions = ("elasticloadbalancing:DescribeLoadBalancerAttributes",)
    schema = type_schema(
        'is-logging',
        bucket={'type': 'string'},
        prefix={'type': 'string'},
    )

    def process(self, resources, event=None):
        """Return ELBs with access logging enabled that match bucket/prefix.

        ``bucket`` and ``prefix`` are only compared when configured.
        """
        self.initialize(resources)
        wanted_bucket = self.data.get('bucket', None)
        wanted_prefix = self.data.get('prefix', None)

        def is_logging(elb):
            access_log = elb['Attributes']['AccessLog']
            if not access_log['Enabled']:
                return False
            if (wanted_bucket and
                    wanted_bucket != access_log.get('S3BucketName', None)):
                return False
            if (wanted_prefix and
                    wanted_prefix != access_log.get('S3BucketPrefix', None)):
                return False
            return True

        return [elb for elb in resources if is_logging(elb)]
@filters.register('is-not-logging')
class IsNotLoggingFilter(Filter, ELBAttributeFilterBase):
    """Matches ELBs that are NOT logging to S3,
    or that do not match the optional bucket and/or prefix.

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-is-not-logging-test
                resource: elb
                filters:
                  - type: is-not-logging

              - name: is-not-logging-bucket-and-prefix-test
                resource: elb
                filters:
                  - type: is-not-logging
                    bucket: prodlogs
                    prefix: elblogs
    """
    permissions = ("elasticloadbalancing:DescribeLoadBalancerAttributes",)
    schema = type_schema(
        'is-not-logging',
        bucket={'type': 'string'},
        prefix={'type': 'string'},
    )

    def process(self, resources, event=None):
        """Return ELBs with logging disabled or a differing bucket/prefix.

        ``bucket`` and ``prefix`` are only compared when configured.
        """
        self.initialize(resources)
        wanted_bucket = self.data.get('bucket', None)
        wanted_prefix = self.data.get('prefix', None)

        def is_not_logging(elb):
            access_log = elb['Attributes']['AccessLog']
            if not access_log['Enabled']:
                return True
            if (wanted_bucket and
                    wanted_bucket != access_log.get('S3BucketName', None)):
                return True
            if (wanted_prefix and
                    wanted_prefix != access_log.get('S3BucketPrefix', None)):
                return True
            return False

        return [elb for elb in resources if is_not_logging(elb)]
@filters.register('attributes')
class CheckAttributes(ValueFilter, ELBAttributeFilterBase):
    """Value filter evaluated against an ELB's attributes.

    :example:

    .. code-block:: yaml

            policies:
              - name: elb-is-connection-draining
                resource: elb
                filters:
                  - type: attributes
                    key: ConnectionDraining.Enabled
                    value: true
                    op: eq
    """
    annotate = False  # no annotation from value filter
    permissions = ("elasticloadbalancing:DescribeLoadBalancerAttributes",)
    schema = type_schema('attributes', rinherit=ValueFilter.schema)
    schema_alias = False

    def process(self, resources, event=None):
        """Populate attributes on ``resources``, then run the value filter."""
        self.augment(resources)
        filtered = super().process(resources, event)
        return filtered

    def augment(self, resources):
        """Ensure every resource carries its ``Attributes`` entry."""
        self.initialize(resources)

    def __call__(self, r):
        """Evaluate the value filter on ``r['Attributes']`` instead of ``r``."""
        attributes = r['Attributes']
        return super().__call__(attributes)