Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/elb.py: 43%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4Elastic Load Balancers
5"""
6from concurrent.futures import as_completed
7import re
9from botocore.exceptions import ClientError
11from c7n.actions import ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction
12from c7n.exceptions import PolicyValidationError
13from c7n.filters import Filter, FilterRegistry, ValueFilter, ShieldMetrics
14import c7n.filters.vpc as net_filters
15from datetime import datetime
16from c7n import tags
17from c7n.manager import resources
18from c7n.query import ConfigSource, QueryResourceManager, DescribeSource, TypeInfo
19from c7n.utils import local_session, chunks, type_schema
21from c7n.resources.shield import IsShieldProtected, SetShieldProtection
24filters = FilterRegistry('elb.filters')
25actions = ActionRegistry('elb.actions')
27filters.register('tag-count', tags.TagCountFilter)
28filters.register('marked-for-op', tags.TagActionFilter)
29filters.register('shield-enabled', IsShieldProtected)
30filters.register('shield-metrics', ShieldMetrics)
33class DescribeELB(DescribeSource):
35 def augment(self, resources):
36 return tags.universal_augment(self.manager, resources)
39@resources.register('elb')
40class ELB(QueryResourceManager):
42 class resource_type(TypeInfo):
43 service = 'elb'
44 arn_type = 'loadbalancer'
45 permission_prefix = arn_service = 'elasticloadbalancing'
46 enum_spec = ('describe_load_balancers',
47 'LoadBalancerDescriptions', None)
48 id = 'LoadBalancerName'
49 filter_name = 'LoadBalancerNames'
50 filter_type = 'list'
51 name = 'DNSName'
52 date = 'CreatedTime'
53 dimension = 'LoadBalancerName'
54 cfn_type = config_type = "AWS::ElasticLoadBalancing::LoadBalancer"
55 permissions_augment = ("elasticloadbalancing:DescribeTags",)
56 default_report_fields = (
57 'LoadBalancerName',
58 'DNSName',
59 'VPCId',
60 'count:Instances',
61 'list:ListenerDescriptions[].Listener.LoadBalancerPort')
63 filter_registry = filters
64 action_registry = actions
65 source_mapping = {
66 'describe': DescribeELB,
67 'config': ConfigSource
68 }
70 @classmethod
71 def get_permissions(cls):
72 return ('elasticloadbalancing:DescribeLoadBalancers',
73 'elasticloadbalancing:DescribeLoadBalancerAttributes',
74 'elasticloadbalancing:DescribeTags')
77@actions.register('set-shield')
78class SetELBShieldProtection(SetShieldProtection):
80 def clear_stale(self, client, protections):
81 # elbs arns need extra discrimination to distinguish
82 # from app load balancer arns. See
83 # https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#arn-syntax-elb-application
84 super(SetELBShieldProtection, self).clear_stale(
85 client,
86 [p for p in protections if p['ResourceArn'].count('/') == 1])
89@actions.register('mark-for-op')
90class TagDelayedAction(tags.TagDelayedAction):
91 """Action to specify an action to occur at a later date
93 :example:
95 .. code-block:: yaml
97 policies:
98 - name: mark-elb-delete-unused
99 resource: elb
100 filters:
101 - "tag:custodian_cleanup": absent
102 - Instances: []
103 actions:
104 - type: mark-for-op
105 tag: custodian_cleanup
106 msg: "Unused ELB - No Instances: {op}@{action_date}"
107 op: delete
108 days: 7
109 """
112@actions.register('tag')
113class Tag(tags.Tag):
114 """Action to add tag(s) to ELB(s)
116 :example:
118 .. code-block:: yaml
120 policies:
121 - name: elb-add-owner-tag
122 resource: elb
123 filters:
124 - "tag:OwnerName": missing
125 actions:
126 - type: tag
127 key: OwnerName
128 value: OwnerName
129 """
131 batch_size = 1
132 permissions = ('elasticloadbalancing:AddTags',)
134 def process_resource_set(self, client, resource_set, tags):
135 for r in resource_set:
136 client.add_tags(
137 LoadBalancerNames=[r['LoadBalancerName'] for r in resource_set],
138 Tags=tags)
141@actions.register('remove-tag')
142class RemoveTag(tags.RemoveTag):
143 """Action to remove tag(s) from ELB(s)
145 :example:
147 .. code-block:: yaml
149 policies:
150 - name: elb-remove-old-tag
151 resource: elb
152 filters:
153 - "tag:OldTagKey": present
154 actions:
155 - type: remove-tag
156 tags: [OldTagKey1, OldTagKey2]
157 """
159 batch_size = 1
160 permissions = ('elasticloadbalancing:RemoveTags',)
162 def process_resource_set(self, client, resource_set, tag_keys):
163 client.remove_tags(
164 LoadBalancerNames=[r['LoadBalancerName'] for r in resource_set],
165 Tags=[{'Key': k} for k in tag_keys])
168@actions.register('delete')
169class Delete(BaseAction):
170 """Action to delete ELB(s)
172 It is recommended to apply a filter to the delete policy to avoid unwanted
173 deletion of any load balancers.
175 :example:
177 .. code-block:: yaml
179 policies:
180 - name: elb-delete-unused
181 resource: elb
182 filters:
183 - Instances: []
184 actions:
185 - delete
186 """
188 schema = type_schema('delete')
189 permissions = ('elasticloadbalancing:DeleteLoadBalancer',)
191 def process(self, load_balancers):
192 client = local_session(self.manager.session_factory).client('elb')
193 for elb in load_balancers:
194 self.manager.retry(
195 client.delete_load_balancer, LoadBalancerName=elb['LoadBalancerName'])
198@actions.register('set-ssl-listener-policy')
199class SetSslListenerPolicy(BaseAction):
200 """Action to set the ELB SSL listener policy
202 :example:
204 .. code-block:: yaml
206 policies:
207 - name: elb-set-listener-custom-policy
208 resource: elb
209 actions:
210 - type: set-ssl-listener-policy
211 name: SSLNegotiation-Custom-Policy-01
212 attributes:
213 - Protocol-SSLv3
214 - Protocol-TLSv1.1
215 - DHE-RSA-AES256-SHA256
218 Alternatively, you can specify one of AWS recommended policies
219 (https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/elb-security-policy-table.html)
220 by specifying an attribute where key=Reference-Security-Policy
221 and value=name of the predefined policy. For example:
223 .. code-block:: yaml
225 policies:
226 - name: elb-set-listener-predefined-policy
227 resource: elb
228 actions:
229 - type: set-ssl-listener-policy
230 name: SSLNegotiation-Predefined-Policy-01
231 attributes:
232 Reference-Security-Policy: ELBSecurityPolicy-TLS-1-2-2017-01
233 """
235 schema = type_schema(
236 'set-ssl-listener-policy',
237 name={'type': 'string'},
238 attributes={'anyOf': [{'type': 'object'}, {'type': 'array', 'items': {'type': 'string'}}]},
239 required=['name', 'attributes'])
241 permissions = (
242 'elasticloadbalancing:CreateLoadBalancerPolicy',
243 'elasticloadbalancing:SetLoadBalancerPoliciesOfListener')
245 def process(self, load_balancers):
246 client = local_session(self.manager.session_factory).client('elb')
247 rid = self.manager.resource_type.id
248 error = None
250 with self.executor_factory(max_workers=2) as w:
251 futures = {}
252 for lb in load_balancers:
253 futures[w.submit(self.process_elb, client, lb)] = lb
255 for f in as_completed(futures):
256 if f.exception():
257 self.log.error(
258 "set-ssl-listener-policy error on lb:%s error:%s",
259 futures[f][rid], f.exception())
260 error = f.exception()
262 if error is not None:
263 raise error
265 def process_elb(self, client, elb):
266 if not is_ssl(elb):
267 return
269 # Create a custom policy with epoch timestamp.
270 # to make it unique within the
271 # set of policies for this load balancer.
272 policy_name = self.data.get('name') + '-' + \
273 str(int(datetime.utcnow().timestamp() * 1000))
274 lb_name = elb['LoadBalancerName']
275 attrs = self.data.get('attributes')
277 if isinstance(attrs, dict):
278 policy_attributes = [{'AttributeName': name, 'AttributeValue': value}
279 for name, value in attrs.items()]
280 else:
281 policy_attributes = [{'AttributeName': attr, 'AttributeValue': 'true'}
282 for attr in attrs]
284 try:
285 client.create_load_balancer_policy(
286 LoadBalancerName=lb_name,
287 PolicyName=policy_name,
288 PolicyTypeName='SSLNegotiationPolicyType',
289 PolicyAttributes=policy_attributes)
290 except ClientError as e:
291 if e.response['Error']['Code'] not in (
292 'DuplicatePolicyName', 'DuplicatePolicyNameException',
293 'DuplicationPolicyNameException'):
294 raise
296 # Apply it to all SSL listeners.
297 ssl_policies = ()
298 if 'c7n.ssl-policies' in elb:
299 ssl_policies = elb['c7n.ssl-policies']
301 for ld in elb['ListenerDescriptions']:
302 if ld['Listener']['Protocol'] in ('HTTPS', 'SSL'):
303 policy_names = [policy_name]
304 # Preserve extant non-ssl listener policies
305 policy_names.extend(ld.get('PolicyNames', ()))
306 # Remove extant ssl listener policy
307 if ssl_policies:
308 policy_names = list(set(policy_names).difference(ssl_policies))
309 client.set_load_balancer_policies_of_listener(
310 LoadBalancerName=lb_name,
311 LoadBalancerPort=ld['Listener']['LoadBalancerPort'],
312 PolicyNames=policy_names)
315@actions.register('modify-security-groups')
316class ELBModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
317 """Modify VPC security groups on an ELB."""
319 permissions = ('elasticloadbalancing:ApplySecurityGroupsToLoadBalancer',)
321 def process(self, load_balancers):
322 client = local_session(self.manager.session_factory).client('elb')
323 groups = super(ELBModifyVpcSecurityGroups, self).get_groups(
324 load_balancers)
325 for idx, lb in enumerate(load_balancers):
326 client.apply_security_groups_to_load_balancer(
327 LoadBalancerName=lb['LoadBalancerName'],
328 SecurityGroups=groups[idx])
331@actions.register('enable-s3-logging')
332class EnableS3Logging(BaseAction):
333 """Action to enable S3 logging for Elastic Load Balancers.
335 :example:
337 .. code-block:: yaml
339 policies:
340 - name: elb-test
341 resource: elb
342 filters:
343 - type: is-not-logging
344 actions:
345 - type: enable-s3-logging
346 bucket: elblogtest
347 prefix: dahlogs
348 emit_interval: 5
349 """
350 schema = type_schema('enable-s3-logging',
351 bucket={'type': 'string'},
352 prefix={'type': 'string'},
353 emit_interval={'type': 'integer'},
354 )
355 permissions = ("elasticloadbalancing:ModifyLoadBalancerAttributes",)
357 def process(self, resources):
358 client = local_session(self.manager.session_factory).client('elb')
359 for elb in resources:
360 elb_name = elb['LoadBalancerName']
361 log_attrs = {'Enabled': True}
362 if 'bucket' in self.data:
363 log_attrs['S3BucketName'] = self.data['bucket']
364 if 'prefix' in self.data:
365 log_attrs['S3BucketPrefix'] = self.data['prefix']
366 if 'emit_interval' in self.data:
367 log_attrs['EmitInterval'] = self.data['emit_interval']
369 client.modify_load_balancer_attributes(LoadBalancerName=elb_name,
370 LoadBalancerAttributes={
371 'AccessLog': log_attrs
372 })
373 return resources
376@actions.register('disable-s3-logging')
377class DisableS3Logging(BaseAction):
378 """Disable s3 logging for ElasticLoadBalancers.
380 :example:
382 .. code-block:: yaml
384 policies:
385 - name: turn-off-elb-logs
386 resource: elb
387 filters:
388 - type: is-logging
389 bucket: prodbucket
390 actions:
391 - type: disable-s3-logging
392 """
393 schema = type_schema('disable-s3-logging')
394 permissions = ("elasticloadbalancing:ModifyLoadBalancerAttributes",)
396 def process(self, resources):
397 client = local_session(self.manager.session_factory).client('elb')
398 for elb in resources:
399 elb_name = elb['LoadBalancerName']
400 client.modify_load_balancer_attributes(LoadBalancerName=elb_name,
401 LoadBalancerAttributes={
402 'AccessLog': {
403 'Enabled': False}
404 })
405 return resources
408def is_ssl(b):
409 for ld in b['ListenerDescriptions']:
410 if ld['Listener']['Protocol'] in ('HTTPS', 'SSL'):
411 return True
412 return False
415@filters.register('security-group')
416class SecurityGroupFilter(net_filters.SecurityGroupFilter):
417 """ELB security group filter"""
419 RelatedIdsExpression = "SecurityGroups[]"
422@filters.register('subnet')
423class SubnetFilter(net_filters.SubnetFilter):
424 """ELB subnet filter"""
426 RelatedIdsExpression = "Subnets[]"
429@filters.register('vpc')
430class VpcFilter(net_filters.VpcFilter):
431 """ELB vpc filter"""
433 RelatedIdsExpression = "VPCId"
436filters.register('network-location', net_filters.NetworkLocation)
439@filters.register('instance')
440class Instance(ValueFilter):
441 """Filter ELB by an associated instance value(s)
443 :example:
445 .. code-block:: yaml
447 policies:
448 - name: elb-image-filter
449 resource: elb
450 filters:
451 - type: instance
452 key: ImageId
453 value: ami-01ab23cd
454 """
456 schema = type_schema('instance', rinherit=ValueFilter.schema)
457 schema_alias = False
458 annotate = False
460 def get_permissions(self):
461 return self.manager.get_resource_manager('ec2').get_permissions()
463 def process(self, resources, event=None):
464 self.elb_instances = {}
465 instances = []
466 for r in resources:
467 instances.extend([i['InstanceId'] for i in r['Instances']])
468 for i in self.manager.get_resource_manager(
469 'ec2').get_resources(list(instances)):
470 self.elb_instances[i['InstanceId']] = i
471 return super(Instance, self).process(resources, event)
473 def __call__(self, elb):
474 matched = []
475 for i in elb['Instances']:
476 instance = self.elb_instances[i['InstanceId']]
477 if self.match(instance):
478 matched.append(instance)
479 if not matched:
480 return False
481 elb['c7n:MatchedInstances'] = matched
482 return True
485@filters.register('is-ssl')
486class IsSSLFilter(Filter):
487 """Filters ELB that are using a SSL policy
489 :example:
491 .. code-block:: yaml
493 policies:
494 - name: elb-using-ssl
495 resource: elb
496 filters:
497 - type: is-ssl
498 """
500 schema = type_schema('is-ssl')
502 def process(self, balancers, event=None):
503 return [b for b in balancers if is_ssl(b)]
506@filters.register('ssl-policy')
507class SSLPolicyFilter(Filter):
508 """Filter ELBs on the properties of SSLNegotation policies.
509 TODO: Only works on custom policies at the moment.
511 whitelist: filter all policies containing permitted protocols
512 blacklist: filter all policies containing forbidden protocols
514 Cannot specify both whitelist & blacklist in the same policy. These must
515 be done seperately (seperate policy statements).
517 Likewise, if you want to reduce the consideration set such that we only
518 compare certain keys (e.g. you only want to compare the `Protocol-` keys),
519 you can use the `matching` option with a regular expression:
521 :example:
523 .. code-block:: yaml
525 policies:
526 - name: elb-ssl-policies
527 resource: elb
528 filters:
529 - type: ssl-policy
530 blacklist:
531 - "Protocol-SSLv2"
532 - "Protocol-SSLv3"
533 - name: elb-modern-tls
534 resource: elb
535 filters:
536 - type: ssl-policy
537 matching: "^Protocol-"
538 whitelist:
539 - "Protocol-TLSv1.1"
540 - "Protocol-TLSv1.2"
541 """
543 schema = {
544 'type': 'object',
545 'additionalProperties': False,
546 'oneOf': [
547 {'required': ['type', 'whitelist']},
548 {'required': ['type', 'blacklist']}
549 ],
550 'properties': {
551 'type': {'enum': ['ssl-policy']},
552 'matching': {'type': 'string'},
553 'whitelist': {'type': 'array', 'items': {'type': 'string'}},
554 'blacklist': {'type': 'array', 'items': {'type': 'string'}}
555 }
556 }
557 permissions = ("elasticloadbalancing:DescribeLoadBalancerPolicies",)
559 def validate(self):
560 if 'whitelist' in self.data and 'blacklist' in self.data:
561 raise PolicyValidationError(
562 "cannot specify whitelist and black list on %s" % (
563 self.manager.data,))
564 if 'whitelist' not in self.data and 'blacklist' not in self.data:
565 raise PolicyValidationError(
566 "must specify either policy blacklist or whitelist on %s" % (
567 self.manager.data,))
568 if ('blacklist' in self.data and
569 not isinstance(self.data['blacklist'], list)):
570 raise PolicyValidationError("blacklist must be a list on %s" % (
571 self.manager.data,))
573 if 'matching' in self.data:
574 # Sanity check that we can compile
575 try:
576 re.compile(self.data['matching'])
577 except re.error as e:
578 raise PolicyValidationError(
579 "Invalid regex: %s %s" % (e, self.manager.data))
581 return self
583 def process(self, balancers, event=None):
584 balancers = [b for b in balancers if is_ssl(b)]
585 active_policy_attribute_tuples = (
586 self.create_elb_active_policy_attribute_tuples(balancers))
588 whitelist = set(self.data.get('whitelist', []))
589 blacklist = set(self.data.get('blacklist', []))
591 invalid_elbs = []
593 if 'matching' in self.data:
594 regex = self.data.get('matching')
595 filtered_pairs = []
596 for (elb, active_policies) in active_policy_attribute_tuples:
597 filtered_policies = [policy for policy in active_policies if
598 bool(re.match(regex, policy, flags=re.IGNORECASE))]
599 if filtered_policies:
600 filtered_pairs.append((elb, filtered_policies))
601 active_policy_attribute_tuples = filtered_pairs
603 if blacklist:
604 for elb, active_policies in active_policy_attribute_tuples:
605 if len(blacklist.intersection(active_policies)) > 0:
606 elb["ProhibitedPolicies"] = list(
607 blacklist.intersection(active_policies))
608 invalid_elbs.append(elb)
609 elif whitelist:
610 for elb, active_policies in active_policy_attribute_tuples:
611 if len(set(active_policies).difference(whitelist)) > 0:
612 elb["ProhibitedPolicies"] = list(
613 set(active_policies).difference(whitelist))
614 invalid_elbs.append(elb)
615 return invalid_elbs
617 def create_elb_active_policy_attribute_tuples(self, elbs):
618 """
619 Returns a list of tuples of active SSL policies attributes
620 for each elb [(elb['Protocol-SSLv1','Protocol-SSLv2',...])]
621 """
623 elb_custom_policy_tuples = self.create_elb_custom_policy_tuples(elbs)
625 active_policy_attribute_tuples = (
626 self.create_elb_active_attributes_tuples(elb_custom_policy_tuples))
628 return active_policy_attribute_tuples
630 def create_elb_custom_policy_tuples(self, balancers):
631 """
632 creates a list of tuples (elb,[sslpolicy1,sslpolicy2...])
633 for all custom policies on the ELB
634 """
635 elb_policy_tuples = []
636 for b in balancers:
637 policies = []
638 for ld in b['ListenerDescriptions']:
639 for p in ld['PolicyNames']:
640 policies.append(p)
641 elb_policy_tuples.append((b, policies))
643 return elb_policy_tuples
645 def create_elb_active_attributes_tuples(self, elb_policy_tuples):
646 """
647 creates a list of tuples for all attributes that are marked
648 as "true" in the load balancer's polices, e.g.
649 (myelb,['Protocol-SSLv1','Protocol-SSLv2'])
650 """
651 active_policy_attribute_tuples = []
652 client = local_session(self.manager.session_factory).client('elb')
653 with self.executor_factory(max_workers=2) as w:
654 futures = []
655 for elb_policy_set in chunks(elb_policy_tuples, 50):
656 futures.append(
657 w.submit(self.process_elb_policy_set, client, elb_policy_set))
659 for f in as_completed(futures):
660 if f.exception():
661 self.log.error(
662 "Exception processing elb policies \n %s" % (
663 f.exception()))
664 continue
665 for elb_policies in f.result():
666 active_policy_attribute_tuples.append(elb_policies)
668 return active_policy_attribute_tuples
670 def process_elb_policy_set(self, client, elb_policy_set):
671 results = []
673 for (elb, policy_names) in elb_policy_set:
674 elb_name = elb['LoadBalancerName']
675 try:
676 policies = client.describe_load_balancer_policies(
677 LoadBalancerName=elb_name,
678 PolicyNames=policy_names)['PolicyDescriptions']
679 except ClientError as e:
680 if e.response['Error']['Code'] in [
681 'LoadBalancerNotFound', 'PolicyNotFound']:
682 continue
683 raise
684 active_lb_policies = []
685 ssl_policies = []
686 for p in policies:
687 if p['PolicyTypeName'] != 'SSLNegotiationPolicyType':
688 continue
689 ssl_policies.append(p['PolicyName'])
690 active_lb_policies.extend(
691 [policy_description['AttributeName']
692 for policy_description in
693 p['PolicyAttributeDescriptions']
694 if policy_description['AttributeValue'] == 'true']
695 )
696 elb['c7n.ssl-policies'] = ssl_policies
697 results.append((elb, active_lb_policies))
699 return results
702@filters.register('healthcheck-protocol-mismatch')
703class HealthCheckProtocolMismatch(Filter):
704 """Filters ELB that have a health check protocol mismatch
706 The mismatch occurs if the ELB has a different protocol to check than
707 the associated instances allow to determine health status.
709 :example:
711 .. code-block:: yaml
713 policies:
714 - name: elb-healthcheck-mismatch
715 resource: elb
716 filters:
717 - type: healthcheck-protocol-mismatch
718 """
720 schema = type_schema('healthcheck-protocol-mismatch')
722 def __call__(self, load_balancer):
723 health_check_protocol = (
724 load_balancer['HealthCheck']['Target'].split(':')[0])
725 listener_descriptions = load_balancer['ListenerDescriptions']
727 if len(listener_descriptions) == 0:
728 return True
730 # check if any of the protocols in the ELB match the health
731 # check. There is only 1 health check, so if there are
732 # multiple listeners, we only check if at least one of them
733 # matches
734 protocols = [listener['Listener']['InstanceProtocol']
735 for listener in listener_descriptions]
736 return health_check_protocol in protocols
739@filters.register('default-vpc')
740class DefaultVpc(net_filters.DefaultVpcBase):
741 """ Matches if an elb database is in the default vpc
743 :example:
745 .. code-block:: yaml
747 policies:
748 - name: elb-default-vpc
749 resource: elb
750 filters:
751 - type: default-vpc
752 """
754 schema = type_schema('default-vpc')
756 def __call__(self, elb):
757 return elb.get('VPCId') and self.match(elb.get('VPCId')) or False
760class ELBAttributeFilterBase:
761 """ Mixin base class for filters that query LB attributes.
762 """
764 def initialize(self, elbs):
765 client = local_session(
766 self.manager.session_factory).client('elb')
768 def _process_attributes(elb):
769 if 'Attributes' not in elb:
770 results = client.describe_load_balancer_attributes(
771 LoadBalancerName=elb['LoadBalancerName'])
772 elb['Attributes'] = results['LoadBalancerAttributes']
774 with self.manager.executor_factory(max_workers=2) as w:
775 list(w.map(_process_attributes, elbs))
778@filters.register('is-logging')
779class IsLoggingFilter(Filter, ELBAttributeFilterBase):
780 """Matches ELBs that are logging to S3.
781 bucket and prefix are optional
783 :example:
785 .. code-block:: yaml
787 policies:
788 - name: elb-is-logging-test
789 resource: elb
790 filters:
791 - type: is-logging
793 - name: elb-is-logging-bucket-and-prefix-test
794 resource: elb
795 filters:
796 - type: is-logging
797 bucket: prodlogs
798 prefix: elblogs
799 """
801 permissions = ("elasticloadbalancing:DescribeLoadBalancerAttributes",)
802 schema = type_schema('is-logging',
803 bucket={'type': 'string'},
804 prefix={'type': 'string'}
805 )
807 def process(self, resources, event=None):
808 self.initialize(resources)
809 bucket_name = self.data.get('bucket', None)
810 bucket_prefix = self.data.get('prefix', None)
812 return [elb for elb in resources
813 if elb['Attributes']['AccessLog']['Enabled'] and
814 (not bucket_name or bucket_name == elb['Attributes'][
815 'AccessLog'].get('S3BucketName', None)) and
816 (not bucket_prefix or bucket_prefix == elb['Attributes'][
817 'AccessLog'].get('S3BucketPrefix', None))
818 ]
821@filters.register('is-not-logging')
822class IsNotLoggingFilter(Filter, ELBAttributeFilterBase):
823 """ Matches ELBs that are NOT logging to S3.
824 or do not match the optional bucket and/or prefix.
826 :example:
828 .. code-block:: yaml
830 policies:
831 - name: elb-is-not-logging-test
832 resource: elb
833 filters:
834 - type: is-not-logging
836 - name: is-not-logging-bucket-and-prefix-test
837 resource: app-elb
838 filters:
839 - type: is-not-logging
840 bucket: prodlogs
841 prefix: alblogs
843 """
844 permissions = ("elasticloadbalancing:DescribeLoadBalancerAttributes",)
845 schema = type_schema('is-not-logging',
846 bucket={'type': 'string'},
847 prefix={'type': 'string'}
848 )
850 def process(self, resources, event=None):
851 self.initialize(resources)
852 bucket_name = self.data.get('bucket', None)
853 bucket_prefix = self.data.get('prefix', None)
855 return [elb for elb in resources
856 if not elb['Attributes']['AccessLog']['Enabled'] or
857 (bucket_name and bucket_name != elb['Attributes'][
858 'AccessLog'].get(
859 'S3BucketName', None)) or
860 (bucket_prefix and bucket_prefix != elb['Attributes'][
861 'AccessLog'].get(
862 'S3BucketPrefix', None))
863 ]
866@filters.register('attributes')
867class CheckAttributes(ValueFilter, ELBAttributeFilterBase):
868 """Value Filter that allows filtering on ELB attributes
870 :example:
872 .. code-block:: yaml
874 policies:
875 - name: elb-is-connection-draining
876 resource: elb
877 filters:
878 - type: attributes
879 key: ConnectionDraining.Enabled
880 value: true
881 op: eq
883 """
884 annotate = False # no annotation from value filter
885 permissions = ("elasticloadbalancing:DescribeLoadBalancerAttributes",)
886 schema = type_schema('attributes', rinherit=ValueFilter.schema)
887 schema_alias = False
889 def process(self, resources, event=None):
890 self.augment(resources)
891 return super().process(resources, event)
893 def augment(self, resources):
894 self.initialize(resources)
896 def __call__(self, r):
897 return super().__call__(r['Attributes'])