Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/waf.py: 35%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import re
2from abc import abstractmethod, ABCMeta
4from c7n.deprecated import DeprecatedField
5from c7n.filters.core import ValueFilter, type_schema
8class WafClassicRegionalFilterBase(ValueFilter, metaclass=ABCMeta):
9 """Filter a resource based on an associated WAF Classic WebACL using the generic value
10 filter. The value passed to the filter will be an instance of WebACL from AWS or an empty
11 object ({}) if no ACL is associated. WAF Classic can be associated with an Application
12 Load Balancer or an API Gateway REST API Stage.
14 https://docs.aws.amazon.com/waf/latest/APIReference/API_wafRegional_WebACL.html
16 :example:
18 Find all API Gateway Rest stages that don't have waf enabled with at least one rule
20 .. code-block:: yaml
22 policies:
23 - name: filter-waf-value
24 resource: aws.rest-stage
25 filters:
26 - type: waf-enabled
27 key: Rules
28 value: empty
30 """
32 associated_cache_key = 'c7n:AssociatedResources'
34 schema = type_schema(
35 'waf-enabled',
36 rinherit=ValueFilter.schema,
37 **{
38 'web-acl': {'type': 'string'},
39 'state': {'type': 'boolean'}
40 }
41 )
43 permissions = (
44 'waf-regional:ListWebACLs',
45 'waf-regional:GetWebACL', # for augment
46 'waf-regional:ListResourcesForWebACL' # for finding associated resources
47 )
49 def __init__(self, data, manager=None):
50 super().__init__(data, manager)
52 # "legacy" mode matches previous WAF based filters for backwards compatability and is
53 # enabled when one of the legacy properties is provided or when no value filter properties
54 # are given (none of the legacy properties are required)
55 self._is_legacy = (
56 'web-acl' in self.data
57 or 'state' in self.data
58 or len(self.data.keys()) == 1 # only filter "type" is given
59 )
60 self._cached_web_acls = None
62 def _legacy_match(self, resource):
63 target_acl = self.data.get('web-acl')
64 r_acl = self.get_associated_web_acl(resource)
65 state = self.data.get('state', False)
67 # WAF is considered enabled if there is an associated WebACL AND that ACL matches the
68 # specified target in the filter IF provided
69 return (bool(r_acl) and (target_acl is None or target_acl == r_acl['Name'])) == state
71 # get the set of web acls we should look through by asking the resource manager for the set
72 # based on the scope
73 def _get_web_acls(self):
74 if self._cached_web_acls is None:
75 self._cached_web_acls = self.manager.get_resource_manager('waf-regional').resources(
76 # required to get the additional detail needed for this filter (e.g. Rules), but
77 # the legacy mode does not require additional detail
78 augment=True
79 )
81 return self._cached_web_acls
83 # load the resources the web_acl is attached to and cache them with the web acl
84 def _load_associated_resources(self, web_acl, resource_type):
85 cache_key = f'{self.associated_cache_key}:{resource_type}'
87 if cache_key in web_acl:
88 return web_acl[cache_key]
90 client = self.manager.session_factory().client('waf-regional')
92 resource_arns = client.list_resources_for_web_acl(
93 WebACLId=web_acl['WebACLId'],
94 ResourceType=resource_type
95 ).get('ResourceArns', [])
97 web_acl[cache_key] = resource_arns
99 return resource_arns
101 def get_deprecations(self):
102 filter_name = self.data["type"]
103 return [
104 DeprecatedField(f"{filter_name}.{k}", "Use the value filter attributes instead")
105 for k in {'web-acl', 'state'}.intersection(self.data)
106 ]
108 def get_web_acl_from_associations(self, resource_type, resource_arn):
109 for web_acl in self._get_web_acls():
110 associated_arns = self._load_associated_resources(web_acl, resource_type)
111 if resource_arn in associated_arns:
112 return web_acl
114 # default empty so we can actually match where no web acl is present
115 return {}
117 def get_web_acl_by_arn(self, arn):
118 web_acls = self._get_web_acls()
120 return next(
121 filter(lambda acl: acl['WebACLArn'] == arn, web_acls),
122 # default empty so we can actually match where no web acl is present
123 {}
124 )
126 def validate(self):
127 # only allow legacy behavior or new ValueFilter behavior, not both
128 if not self._is_legacy:
129 # only validate value filter when not in "legacy" mode
130 super(WafClassicRegionalFilterBase, self).validate()
132 def process(self, resources, event=None):
133 if self._is_legacy:
134 return [
135 resource for resource in resources
136 # call value filter on associated WebACL
137 if self._legacy_match(resource)
138 ]
140 return [
141 resource for resource in resources
142 # call value filter on associated WebACL
143 if self(self.get_associated_web_acl(resource))
144 ]
146 # Main method used to determine the web acl associated with the given resource - must
147 # be overriden in a base class as each resource has a slightly unigue way of getting the
148 # associated web acl
149 @abstractmethod
150 def get_associated_web_acl(self, resource):
151 raise NotImplementedError('"get_associated_web_acl" must be overriden')
154class WafV2FilterBase(ValueFilter, metaclass=ABCMeta):
155 """Filter a resource based on an associated WAFv2 WebACL using the generic value filter. The
156 value passed to the filter will be an instance of WebACL from AWS or an empty object ({}) if
157 no ACL is associated with the rest stage. WAFv2 can be associated with an Application
158 Load Balancer, API Gateway REST API Stage, AppSync GraphQL API, Cognito User Pool, Cloudfront
159 Distribution, or App Runner Service.
161 https://docs.aws.amazon.com/waf/latest/APIReference/API_WebACL.html
163 :example:
165 Ensure an API Gateway Rest stage has waf enabled with at least one rule
167 .. code-block:: yaml
169 policies:
170 - name: filter-wafv2-value
171 resource: aws.rest-stage
172 filters:
173 - type: wafv2-enabled
174 key: Rules
175 value: empty
177 """
179 cache_key = 'c7n:WebACL'
180 associated_cache_key = 'c7n:AssociatedResources'
182 schema = type_schema(
183 'wafv2-enabled',
184 rinherit=ValueFilter.schema,
185 **{
186 'web-acl': {'type': 'string'},
187 'state': {'type': 'boolean'}
188 }
189 )
191 permissions = (
192 'wafv2:ListWebACLs',
193 'wafv2:GetWebACL', # for augment
194 'wafv2:ListResourcesForWebACL' # for finding associated regional resources
195 )
197 def __init__(self, data, manager=None):
198 super().__init__(data, manager)
200 # "legacy" mode matches previous WAF based filters for backwards compatability and is
201 # enabled when one of the legacy properties is provided or when no value filter properties
202 # are given (none of the legacy properties are required)
203 self._is_legacy = (
204 'web-acl' in self.data
205 or 'state' in self.data
206 or len(self.data.keys()) == 1 # only filter "type" is given
207 )
208 self._cached_web_acls = None
210 def _legacy_match(self, r_acl):
211 target_acl = self.data.get('web-acl')
212 state = self.data.get('state', False)
214 return (
215 bool(r_acl)
216 and (
217 target_acl is None
218 or bool(re.match(target_acl, r_acl['Name']))
219 )
220 ) == state
222 # get the set of web acls we should look through by asking the resource manager for the set
223 # based on the scope
224 def _get_web_acls(self, scope):
225 if self._cached_web_acls is None:
226 self._cached_web_acls = self.manager.get_resource_manager('wafv2').resources(
227 query=dict(Scope=scope),
228 # required to get the additional detail needed for this filter (e.g. Rules), but
229 # the legacy mode does not require additional detail
230 augment=(not self._is_legacy)
231 )
233 return self._cached_web_acls
235 # simple search over the list of web acls matching on the specified attribute, returns
236 # None if no match
237 def _get_associated_web_acl_by_attr(self, attr_name, attr_value, scope):
238 web_acls = self._get_web_acls(scope)
240 return next(
241 filter(lambda acl: acl[attr_name] == attr_value, web_acls),
242 # default empty so we can actually match where no web acl is present
243 {}
244 )
246 # load the resources the web_acl is attached to and cache them with the web acl
247 # we only need to do this for REGIONAL web acls as cloudfront holds a reference to
248 # web acl
249 def _load_associated_resources(self, web_acl, resource_type):
250 cache_key = f'{self.associated_cache_key}:{resource_type}'
252 if cache_key not in web_acl:
253 client = self.manager.session_factory().client('wafv2')
255 web_acl[cache_key] = client.list_resources_for_web_acl(
256 WebACLArn=web_acl['ARN'],
257 ResourceType=resource_type
258 ).get('ResourceArns', [])
260 return web_acl[cache_key]
262 def _get_associated_web_acl_cached(self, resource):
263 if self.cache_key not in resource:
264 resource[self.cache_key] = self.get_associated_web_acl(resource)
266 return resource[self.cache_key]
268 def get_deprecations(self):
269 filter_name = self.data["type"]
270 return [
271 DeprecatedField(f"{filter_name}.{k}", "Use the value filter attributes instead")
272 for k in {'web-acl', 'state'}.intersection(self.data)
273 ]
275 # only needed for REGIONAL resources so no scope used as regional is default
276 def get_web_acl_from_associations(self, resource_type, resource_arn):
277 for web_acl in self._get_web_acls(scope='REGIONAL'):
278 associated_arns = self._load_associated_resources(web_acl, resource_type)
279 if resource_arn in associated_arns:
280 return web_acl
282 # default empty so we can actually match where no web acl is present
283 return {}
285 def get_web_acl_by_arn(self, arn, scope='REGIONAL'):
286 return self._get_associated_web_acl_by_attr('ARN', arn, scope)
288 def get_web_acl_by_id(self, id, scope='REGIONAL'):
289 return self._get_associated_web_acl_by_attr('Id', id, scope)
291 def validate(self):
292 # only allow legacy behavior or new ValueFilter behavior, not both
293 if not self._is_legacy:
294 # only validate value filter when not in "legacy" mode
295 super(WafV2FilterBase, self).validate()
297 def process(self, resources, event=None):
298 matched = []
299 for resource in resources:
300 r_web_acl = self._get_associated_web_acl_cached(resource)
302 if self._is_legacy:
303 if self._legacy_match(r_web_acl):
304 matched.append(resource)
305 # call value filter on associated WebACL
306 elif self(r_web_acl):
307 matched.append(resource)
309 return matched
311 # Main method used to determine the web acl associated with the given resource - must
312 # be overriden in a base class as each resource has a slightly unigue way of getting the
313 # associated web acl
314 @abstractmethod
315 def get_associated_web_acl(self, resource):
316 raise NotImplementedError('"get_associated_web_acl" must be overriden')