Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/waf.py: 35%
110 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1import re
2from abc import abstractmethod, ABCMeta
4from c7n.deprecated import DeprecatedField
5from c7n.filters.core import ValueFilter, type_schema
class WafClassicRegionalFilterBase(ValueFilter, metaclass=ABCMeta):
    """Value filter evaluated against the WAF Classic (regional) WebACL tied to a resource.

    The object handed to the generic value filter is the WebACL record from AWS, or an
    empty object ({}) when the resource has no associated ACL. WAF Classic can be
    associated with an Application Load Balancer or an API Gateway REST API Stage.

    https://docs.aws.amazon.com/waf/latest/APIReference/API_wafRegional_WebACL.html

    :example:

    Find all API Gateway Rest stages that don't have waf enabled with at least one rule

    .. code-block:: yaml

        policies:
          - name: filter-waf-value
            resource: aws.rest-stage
            filters:
              - type: waf-enabled
                key: Rules
                value: empty

    """

    associated_cache_key = 'c7n:AssociatedResources'

    schema = type_schema(
        'waf-enabled',
        rinherit=ValueFilter.schema,
        **{
            'web-acl': {'type': 'string'},
            'state': {'type': 'boolean'}
        }
    )

    permissions = (
        'waf-regional:ListWebACLs',
        'waf-regional:GetWebACL',  # for augment
        'waf-regional:ListResourcesForWebACL'  # for finding associated resources
    )

    def __init__(self, data, manager=None):
        super().__init__(data, manager)

        # "Legacy" mode reproduces the older WAF filters for backwards compatibility.
        # It is selected when either legacy property is supplied, or when the policy
        # gives nothing except the filter "type" (neither legacy property is required).
        has_legacy_property = 'web-acl' in self.data or 'state' in self.data
        only_type_given = len(self.data.keys()) == 1
        self._is_legacy = has_legacy_property or only_type_given
        self._cached_web_acls = None

    def _legacy_match(self, resource):
        """Return True when the resource's WAF "enabled" status equals the `state` setting."""
        wanted_name = self.data.get('web-acl')
        acl = self.get_associated_web_acl(resource)
        expected_state = self.data.get('state', False)

        # WAF counts as enabled only when a WebACL is associated AND, if the filter names
        # a target ACL, the associated ACL's name is exactly that target.
        if not acl:
            enabled = False
        elif wanted_name is None:
            enabled = True
        else:
            enabled = wanted_name == acl['Name']

        return enabled == expected_state

    def _get_web_acls(self):
        """Return the regional WebACLs from the resource manager, fetching them only once."""
        if self._cached_web_acls is not None:
            return self._cached_web_acls

        waf_manager = self.manager.get_resource_manager('waf-regional')
        # augment is needed for the extra detail this filter inspects (e.g. Rules); legacy
        # mode does not need that detail but is served from the same call
        self._cached_web_acls = waf_manager.resources(augment=True)
        return self._cached_web_acls

    def _load_associated_resources(self, web_acl, resource_type):
        """Return the ARNs of `resource_type` resources attached to `web_acl`.

        The answer is memoized on the web_acl dict itself, under a key that includes the
        resource type.
        """
        cache_key = f'{self.associated_cache_key}:{resource_type}'

        if cache_key not in web_acl:
            waf_client = self.manager.session_factory().client('waf-regional')
            response = waf_client.list_resources_for_web_acl(
                WebACLId=web_acl['WebACLId'],
                ResourceType=resource_type
            )
            web_acl[cache_key] = response.get('ResourceArns', [])

        return web_acl[cache_key]

    def get_deprecations(self):
        """Report each legacy property ('web-acl', 'state') present in the filter data."""
        filter_name = self.data["type"]
        legacy_fields = {'web-acl', 'state'}.intersection(self.data)
        return [
            DeprecatedField(f"{filter_name}.{k}", "Use the value filter attributes instead")
            for k in legacy_fields
        ]

    def get_web_acl_from_associations(self, resource_type, resource_arn):
        """Return the first WebACL whose associated resources include `resource_arn`."""
        candidates = (
            web_acl for web_acl in self._get_web_acls()
            if resource_arn in self._load_associated_resources(web_acl, resource_type)
        )
        # fall back to an empty dict so "no web acl" is still something we can match on
        return next(candidates, {})

    def get_web_acl_by_arn(self, arn):
        """Return the WebACL whose 'WebACLArn' equals `arn`, or {} when none does."""
        for web_acl in self._get_web_acls():
            if web_acl['WebACLArn'] == arn:
                return web_acl

        # fall back to an empty dict so "no web acl" is still something we can match on
        return {}

    def validate(self):
        """Run the ValueFilter validation, but only outside of legacy mode."""
        # legacy properties and value-filter properties are mutually exclusive modes, so
        # there is nothing for the value filter to validate in legacy mode
        if self._is_legacy:
            return

        super().validate()

    def process(self, resources, event=None):
        """Return the resources whose associated WebACL satisfies this filter."""
        if self._is_legacy:
            keep = self._legacy_match
        else:
            def keep(resource):
                # apply the value filter to the associated WebACL, not to the resource
                return self(self.get_associated_web_acl(resource))

        return [resource for resource in resources if keep(resource)]

    # Central hook for resolving the WebACL associated with a resource. Each resource type
    # finds its WebACL in a slightly different way, so subclasses must provide this.
    @abstractmethod
    def get_associated_web_acl(self, resource):
        raise NotImplementedError('"get_associated_web_acl" must be overriden')
class WafV2FilterBase(ValueFilter, metaclass=ABCMeta):
    """Filter a resource based on an associated WAFv2 WebACL using the generic value filter. The
    value passed to the filter will be an instance of WebACL from AWS or an empty object ({}) if
    no ACL is associated with the resource. WAFv2 can be associated with an Application
    Load Balancer, API Gateway REST API Stage, AppSync GraphQL API, Cognito User Pool, Cloudfront
    Distribution, or App Runner Service.

    https://docs.aws.amazon.com/waf/latest/APIReference/API_WebACL.html

    :example:

    Find all API Gateway Rest stages that don't have wafv2 enabled with at least one rule

    .. code-block:: yaml

        policies:
          - name: filter-wafv2-value
            resource: aws.rest-stage
            filters:
              - type: wafv2-enabled
                key: Rules
                value: empty

    """

    # key under which the resolved WebACL is cached on each resource dict
    cache_key = 'c7n:WebACL'
    # key prefix under which associated resource ARNs are cached on each web acl dict
    associated_cache_key = 'c7n:AssociatedResources'

    schema = type_schema(
        'wafv2-enabled',
        rinherit=ValueFilter.schema,
        **{
            'web-acl': {'type': 'string'},
            'state': {'type': 'boolean'}
        }
    )

    permissions = (
        'wafv2:ListWebACLs',
        'wafv2:GetWebACL',  # for augment
        'wafv2:ListResourcesForWebACL'  # for finding associated regional resources
    )

    def __init__(self, data, manager=None):
        super().__init__(data, manager)

        # "legacy" mode matches previous WAF based filters for backwards compatibility and is
        # enabled when one of the legacy properties is provided or when no value filter
        # properties are given (none of the legacy properties are required)
        self._is_legacy = (
            'web-acl' in self.data
            or 'state' in self.data
            or len(self.data) == 1  # only filter "type" is given
        )
        # web acl listings keyed by scope, so a lookup for one scope can never be answered
        # with the web acls that were fetched for a different scope
        self._cached_web_acls = {}

    def _legacy_match(self, r_acl):
        """Return True when the WAF "enabled" status of `r_acl` equals the `state` setting.

        :param r_acl: the WebACL associated with a resource, or an empty dict if none
        """
        target_acl = self.data.get('web-acl')
        state = self.data.get('state', False)

        # WAF is considered enabled if there is an associated WebACL AND, when a target is
        # given, the target pattern matches the start of that ACL's name (re.match)
        return (
            bool(r_acl)
            and (
                target_acl is None
                or bool(re.match(target_acl, r_acl['Name']))
            )
        ) == state

    def _get_web_acls(self, scope):
        """Return the web acls for `scope` from the wafv2 resource manager.

        Results are fetched once per scope and reused on subsequent calls.

        :param scope: value passed as the ``Scope`` query parameter (callers in this class
            use 'REGIONAL' by default)
        """
        if scope not in self._cached_web_acls:
            self._cached_web_acls[scope] = self.manager.get_resource_manager('wafv2').resources(
                query=dict(Scope=scope),
                # required to get the additional detail needed for this filter (e.g. Rules),
                # but the legacy mode does not require additional detail
                augment=(not self._is_legacy)
            )

        return self._cached_web_acls[scope]

    def _get_associated_web_acl_by_attr(self, attr_name, attr_value, scope):
        """Return the first web acl in `scope` whose `attr_name` equals `attr_value`.

        Returns an empty dict when nothing matches.
        """
        web_acls = self._get_web_acls(scope)

        return next(
            (acl for acl in web_acls if acl[attr_name] == attr_value),
            # default empty so we can actually match where no web acl is present
            {}
        )

    def _load_associated_resources(self, web_acl, resource_type):
        """Return the ARNs of `resource_type` resources attached to `web_acl`.

        The list is cached on the web acl dict, keyed by resource type. This is only needed
        for REGIONAL web acls as cloudfront holds a reference to its web acl.
        """
        cache_key = f'{self.associated_cache_key}:{resource_type}'

        if cache_key not in web_acl:
            client = self.manager.session_factory().client('wafv2')

            web_acl[cache_key] = client.list_resources_for_web_acl(
                WebACLArn=web_acl['ARN'],
                ResourceType=resource_type
            ).get('ResourceArns', [])

        return web_acl[cache_key]

    def _get_associated_web_acl_cached(self, resource):
        """Resolve the resource's web acl once and cache it on the resource dict."""
        if self.cache_key not in resource:
            resource[self.cache_key] = self.get_associated_web_acl(resource)

        return resource[self.cache_key]

    def get_deprecations(self):
        """Report each legacy property ('web-acl', 'state') present in the filter data."""
        filter_name = self.data["type"]
        return [
            DeprecatedField(f"{filter_name}.{k}", "Use the value filter attributes instead")
            for k in {'web-acl', 'state'}.intersection(self.data)
        ]

    def get_web_acl_from_associations(self, resource_type, resource_arn):
        """Return the REGIONAL web acl that `resource_arn` is associated with, or {}.

        Association lookups are only needed for REGIONAL resources, so the scope is fixed.
        """
        for web_acl in self._get_web_acls(scope='REGIONAL'):
            associated_arns = self._load_associated_resources(web_acl, resource_type)
            if resource_arn in associated_arns:
                return web_acl

        # default empty so we can actually match where no web acl is present
        return {}

    def get_web_acl_by_arn(self, arn, scope='REGIONAL'):
        """Return the web acl in `scope` whose 'ARN' equals `arn`, or {} if none."""
        return self._get_associated_web_acl_by_attr('ARN', arn, scope)

    # NOTE: the parameter name `id` shadows the builtin but is kept so that existing
    # keyword callers continue to work
    def get_web_acl_by_id(self, id, scope='REGIONAL'):
        """Return the web acl in `scope` whose 'Id' equals `id`, or {} if none."""
        return self._get_associated_web_acl_by_attr('Id', id, scope)

    def validate(self):
        """Run the ValueFilter validation, but only outside of legacy mode."""
        # only allow legacy behavior or new ValueFilter behavior, not both
        if not self._is_legacy:
            # only validate value filter when not in "legacy" mode
            super().validate()

    def process(self, resources, event=None):
        """Return the resources whose associated web acl satisfies this filter."""
        matched = []
        for resource in resources:
            r_web_acl = self._get_associated_web_acl_cached(resource)

            if self._is_legacy:
                if self._legacy_match(r_web_acl):
                    matched.append(resource)
            # call value filter on associated WebACL
            elif self(r_web_acl):
                matched.append(resource)

        return matched

    # Main method used to determine the web acl associated with the given resource - must
    # be overridden by subclasses as each resource has a slightly unique way of getting the
    # associated web acl
    @abstractmethod
    def get_associated_web_acl(self, resource):
        raise NotImplementedError('"get_associated_web_acl" must be overriden')