Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/waf.py: 35%

110 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1import re 

2from abc import abstractmethod, ABCMeta 

3 

4from c7n.deprecated import DeprecatedField 

5from c7n.filters.core import ValueFilter, type_schema 

6 

7 

class WafClassicRegionalFilterBase(ValueFilter, metaclass=ABCMeta):
    """Filter a resource based on an associated WAF Classic WebACL using the generic value
    filter. The value passed to the filter will be an instance of WebACL from AWS or an empty
    object ({}) if no ACL is associated. WAF Classic can be associated with an Application
    Load Balancer or an API Gateway REST API Stage.

    The filter runs in one of two modes:

    * "legacy" mode, selected when ``web-acl`` and/or ``state`` is given or when only the
      filter ``type`` is given: a resource matches when "has an associated WebACL (whose
      name equals ``web-acl``, if provided)" equals ``state`` (default ``False``).
    * value filter mode otherwise: the associated WebACL (or ``{}``) is passed to the
      generic value filter.

    Subclasses must implement ``get_associated_web_acl``.

    https://docs.aws.amazon.com/waf/latest/APIReference/API_wafRegional_WebACL.html

    :example:

    Find all API Gateway Rest stages that don't have waf enabled with at least one rule

    .. code-block:: yaml

        policies:
          - name: filter-waf-value
            resource: aws.rest-stage
            filters:
              - type: waf-enabled
                key: Rules
                value: empty

    """

    # prefix of the key under which associated resource ARNs are cached on each web acl dict
    associated_cache_key = 'c7n:AssociatedResources'

    schema = type_schema(
        'waf-enabled',
        rinherit=ValueFilter.schema,
        **{
            'web-acl': {'type': 'string'},
            'state': {'type': 'boolean'}
        }
    )

    permissions = (
        'waf-regional:ListWebACLs',
        'waf-regional:GetWebACL',  # for augment
        'waf-regional:ListResourcesForWebACL'  # for finding associated resources
    )

    def __init__(self, data, manager=None):
        """Initialise the filter and decide whether it runs in "legacy" mode."""
        super().__init__(data, manager)

        # "legacy" mode matches previous WAF based filters for backwards compatibility and is
        # enabled when one of the legacy properties is provided or when no value filter
        # properties are given (none of the legacy properties are required)
        self._is_legacy = (
            'web-acl' in self.data
            or 'state' in self.data
            or len(self.data) == 1  # only filter "type" is given
        )
        # lazily populated by _get_web_acls()
        self._cached_web_acls = None

    def _legacy_match(self, resource):
        """Return True when the resource's WAF "enabled" status equals the ``state`` option.

        ``state`` defaults to False, so a filter given only its ``type`` matches resources
        that have no associated WebACL.
        """
        target_acl = self.data.get('web-acl')
        r_acl = self.get_associated_web_acl(resource)
        state = self.data.get('state', False)

        # WAF is considered enabled if there is an associated WebACL AND that ACL matches the
        # specified target in the filter IF provided (exact name comparison)
        waf_enabled = bool(r_acl) and (target_acl is None or target_acl == r_acl['Name'])
        return waf_enabled == state

    def _get_web_acls(self):
        """Return the web acls from the 'waf-regional' resource manager, fetched once per
        filter instance."""
        if self._cached_web_acls is None:
            self._cached_web_acls = self.manager.get_resource_manager('waf-regional').resources(
                # augment is always requested (legacy mode included) to get the additional
                # detail needed for value filtering (e.g. Rules).
                # NOTE(review): presumably also what makes 'WebACLArn' available for
                # get_web_acl_by_arn - confirm before making this conditional
                augment=True
            )

        return self._cached_web_acls

    def _load_associated_resources(self, web_acl, resource_type):
        """Return the ARNs of the resources of ``resource_type`` attached to ``web_acl``.

        The result is stored on the ``web_acl`` dict itself (keyed per resource type) so the
        API is called at most once per web acl and resource type.
        """
        cache_key = f'{self.associated_cache_key}:{resource_type}'

        if cache_key not in web_acl:
            client = self.manager.session_factory().client('waf-regional')

            web_acl[cache_key] = client.list_resources_for_web_acl(
                WebACLId=web_acl['WebACLId'],
                ResourceType=resource_type
            ).get('ResourceArns', [])

        return web_acl[cache_key]

    def get_deprecations(self):
        """Return a DeprecatedField entry for each legacy property present in the filter."""
        filter_name = self.data["type"]
        # sorted so the order of the reported deprecations is stable between runs (plain
        # set iteration order is not)
        return [
            DeprecatedField(f"{filter_name}.{k}", "Use the value filter attributes instead")
            for k in sorted({'web-acl', 'state'}.intersection(self.data))
        ]

    def get_web_acl_from_associations(self, resource_type, resource_arn):
        """Return the first web acl whose associated resources include ``resource_arn``,
        or ``{}`` when there is none."""
        for web_acl in self._get_web_acls():
            associated_arns = self._load_associated_resources(web_acl, resource_type)
            if resource_arn in associated_arns:
                return web_acl

        # default empty so we can actually match where no web acl is present
        return {}

    def get_web_acl_by_arn(self, arn):
        """Return the web acl whose 'WebACLArn' equals ``arn``, or ``{}`` when there is
        none."""
        return next(
            (acl for acl in self._get_web_acls() if acl['WebACLArn'] == arn),
            # default empty so we can actually match where no web acl is present
            {}
        )

    def validate(self):
        """Validate the value filter configuration, skipped entirely in "legacy" mode."""
        # only allow legacy behavior or new ValueFilter behavior, not both
        if not self._is_legacy:
            # only validate value filter when not in "legacy" mode
            super().validate()

    def process(self, resources, event=None):
        """Return the resources that match the filter in the active mode."""
        if self._is_legacy:
            # compare the WAF enabled status against the configured state
            return [
                resource for resource in resources
                if self._legacy_match(resource)
            ]

        return [
            resource for resource in resources
            # call value filter on associated WebACL
            if self(self.get_associated_web_acl(resource))
        ]

    # Main method used to determine the web acl associated with the given resource - must
    # be overridden in a subclass as each resource has a slightly unique way of getting the
    # associated web acl
    @abstractmethod
    def get_associated_web_acl(self, resource):
        """Return the WebACL associated with ``resource``; the other methods of this class
        treat a falsy value (e.g. ``{}``) as "no web acl associated"."""
        raise NotImplementedError('"get_associated_web_acl" must be overriden')

153 

154 

class WafV2FilterBase(ValueFilter, metaclass=ABCMeta):
    """Filter a resource based on an associated WAFv2 WebACL using the generic value filter. The
    value passed to the filter will be an instance of WebACL from AWS or an empty object ({}) if
    no ACL is associated with the resource. WAFv2 can be associated with an Application
    Load Balancer, API Gateway REST API Stage, AppSync GraphQL API, Cognito User Pool, Cloudfront
    Distribution, or App Runner Service.

    The filter runs in one of two modes:

    * "legacy" mode, selected when ``web-acl`` and/or ``state`` is given or when only the
      filter ``type`` is given: a resource matches when "has an associated WebACL (whose
      name matches the ``web-acl`` regular expression, if provided)" equals ``state``
      (default ``False``).
    * value filter mode otherwise: the associated WebACL (or ``{}``) is passed to the
      generic value filter.

    Subclasses must implement ``get_associated_web_acl``.

    https://docs.aws.amazon.com/waf/latest/APIReference/API_WebACL.html

    :example:

    Ensure an API Gateway Rest stage has waf enabled with at least one rule

    .. code-block:: yaml

        policies:
          - name: filter-wafv2-value
            resource: aws.rest-stage
            filters:
              - type: wafv2-enabled
                key: Rules
                value: empty

    """

    # key under which the associated web acl is cached on each resource dict
    cache_key = 'c7n:WebACL'
    # prefix of the key under which associated resource ARNs are cached on each web acl dict
    associated_cache_key = 'c7n:AssociatedResources'

    schema = type_schema(
        'wafv2-enabled',
        rinherit=ValueFilter.schema,
        **{
            'web-acl': {'type': 'string'},
            'state': {'type': 'boolean'}
        }
    )

    permissions = (
        'wafv2:ListWebACLs',
        'wafv2:GetWebACL',  # for augment
        'wafv2:ListResourcesForWebACL'  # for finding associated regional resources
    )

    def __init__(self, data, manager=None):
        """Initialise the filter and decide whether it runs in "legacy" mode."""
        super().__init__(data, manager)

        # "legacy" mode matches previous WAF based filters for backwards compatibility and is
        # enabled when one of the legacy properties is provided or when no value filter
        # properties are given (none of the legacy properties are required)
        self._is_legacy = (
            'web-acl' in self.data
            or 'state' in self.data
            or len(self.data) == 1  # only filter "type" is given
        )
        # web acls already fetched, keyed by scope so that a lookup for one scope can never
        # be answered with the web acls of another
        self._cached_web_acls = {}

    def _legacy_match(self, r_acl):
        """Return True when the WAF "enabled" status implied by ``r_acl`` equals the
        ``state`` option.

        ``r_acl`` is the associated web acl (falsy when there is none). ``web-acl`` is
        applied with ``re.match``, i.e. as a regular expression anchored at the start of the
        web acl name. ``state`` defaults to False.
        """
        target_acl = self.data.get('web-acl')
        state = self.data.get('state', False)

        waf_enabled = (
            bool(r_acl)
            and (
                target_acl is None
                or bool(re.match(target_acl, r_acl['Name']))
            )
        )
        return waf_enabled == state

    def _get_web_acls(self, scope):
        """Return the web acls for ``scope`` from the 'wafv2' resource manager, fetched once
        per scope for this filter instance."""
        if scope not in self._cached_web_acls:
            self._cached_web_acls[scope] = self.manager.get_resource_manager('wafv2').resources(
                query=dict(Scope=scope),
                # required to get the additional detail needed for this filter (e.g. Rules), but
                # the legacy mode does not require additional detail
                augment=(not self._is_legacy)
            )

        return self._cached_web_acls[scope]

    def _get_associated_web_acl_by_attr(self, attr_name, attr_value, scope):
        """Return the first web acl in ``scope`` whose ``attr_name`` equals ``attr_value``,
        or ``{}`` when there is none."""
        return next(
            (acl for acl in self._get_web_acls(scope) if acl[attr_name] == attr_value),
            # default empty so we can actually match where no web acl is present
            {}
        )

    # we only need to do this for REGIONAL web acls as cloudfront holds a reference to
    # web acl
    def _load_associated_resources(self, web_acl, resource_type):
        """Return the ARNs of the resources of ``resource_type`` attached to ``web_acl``.

        The result is stored on the ``web_acl`` dict itself (keyed per resource type) so the
        API is called at most once per web acl and resource type.
        """
        cache_key = f'{self.associated_cache_key}:{resource_type}'

        if cache_key not in web_acl:
            client = self.manager.session_factory().client('wafv2')

            web_acl[cache_key] = client.list_resources_for_web_acl(
                WebACLArn=web_acl['ARN'],
                ResourceType=resource_type
            ).get('ResourceArns', [])

        return web_acl[cache_key]

    def _get_associated_web_acl_cached(self, resource):
        """Return the web acl associated with ``resource``, resolving it on first use and
        storing it on the resource dict under ``cache_key``."""
        if self.cache_key not in resource:
            resource[self.cache_key] = self.get_associated_web_acl(resource)

        return resource[self.cache_key]

    def get_deprecations(self):
        """Return a DeprecatedField entry for each legacy property present in the filter."""
        filter_name = self.data["type"]
        # sorted so the order of the reported deprecations is stable between runs (plain
        # set iteration order is not)
        return [
            DeprecatedField(f"{filter_name}.{k}", "Use the value filter attributes instead")
            for k in sorted({'web-acl', 'state'}.intersection(self.data))
        ]

    # only needed for REGIONAL resources, so the REGIONAL scope is always used
    def get_web_acl_from_associations(self, resource_type, resource_arn):
        """Return the first REGIONAL web acl whose associated resources include
        ``resource_arn``, or ``{}`` when there is none."""
        for web_acl in self._get_web_acls(scope='REGIONAL'):
            associated_arns = self._load_associated_resources(web_acl, resource_type)
            if resource_arn in associated_arns:
                return web_acl

        # default empty so we can actually match where no web acl is present
        return {}

    def get_web_acl_by_arn(self, arn, scope='REGIONAL'):
        """Return the web acl in ``scope`` whose 'ARN' equals ``arn``, or ``{}``."""
        return self._get_associated_web_acl_by_attr('ARN', arn, scope)

    def get_web_acl_by_id(self, id, scope='REGIONAL'):
        """Return the web acl in ``scope`` whose 'Id' equals ``id``, or ``{}``."""
        return self._get_associated_web_acl_by_attr('Id', id, scope)

    def validate(self):
        """Validate the value filter configuration, skipped entirely in "legacy" mode."""
        # only allow legacy behavior or new ValueFilter behavior, not both
        if not self._is_legacy:
            # only validate value filter when not in "legacy" mode
            super().validate()

    def process(self, resources, event=None):
        """Return the resources whose associated web acl matches in the active mode."""
        # legacy mode compares the WAF enabled status against the configured state; otherwise
        # the value filter (this object is callable) is applied to the associated WebACL
        matches = self._legacy_match if self._is_legacy else self

        return [
            resource for resource in resources
            if matches(self._get_associated_web_acl_cached(resource))
        ]

    # Main method used to determine the web acl associated with the given resource - must
    # be overridden in a subclass as each resource has a slightly unique way of getting the
    # associated web acl
    @abstractmethod
    def get_associated_web_acl(self, resource):
        """Return the WebACL associated with ``resource``; the other methods of this class
        treat a falsy value (e.g. ``{}``) as "no web acl associated"."""
        raise NotImplementedError('"get_associated_web_acl" must be overriden')

317 raise NotImplementedError('"get_associated_web_acl" must be overriden')