Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/waf.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

110 statements  

1import re 

2from abc import abstractmethod, ABCMeta 

3 

4from c7n.deprecated import DeprecatedField 

5from c7n.filters.core import ValueFilter, type_schema 

6 

7 

class WafClassicRegionalFilterBase(ValueFilter, metaclass=ABCMeta):
    """Filter a resource based on an associated WAF Classic WebACL using the generic value
    filter. The value passed to the filter will be an instance of WebACL from AWS or an empty
    object ({}) if no ACL is associated. WAF Classic can be associated with an Application
    Load Balancer or an API Gateway REST API Stage.

    https://docs.aws.amazon.com/waf/latest/APIReference/API_wafRegional_WebACL.html

    :example:

    Find all API Gateway Rest stages that don't have waf enabled with at least one rule

    .. code-block:: yaml

        policies:
          - name: filter-waf-value
            resource: aws.rest-stage
            filters:
              - type: waf-enabled
                key: Rules
                value: empty

    """

    # prefix of the key under which associated resource arns are memoized on each web acl
    associated_cache_key = 'c7n:AssociatedResources'

    schema = type_schema(
        'waf-enabled',
        rinherit=ValueFilter.schema,
        **{
            'web-acl': {'type': 'string'},
            'state': {'type': 'boolean'}
        }
    )

    permissions = (
        'waf-regional:ListWebACLs',
        'waf-regional:GetWebACL',  # for augment
        'waf-regional:ListResourcesForWebACL'  # for finding associated resources
    )

    def __init__(self, data, manager=None):
        """Set up the filter and decide whether it runs in "legacy" mode.

        Legacy mode reproduces the behavior of the earlier WAF filters. It is selected when
        either legacy property (``web-acl`` / ``state``) is present, or when the policy
        supplies nothing besides the filter ``type`` (the legacy properties are optional).
        """
        super().__init__(data, manager)

        uses_legacy_property = 'web-acl' in self.data or 'state' in self.data
        only_type_given = len(self.data) == 1
        self._is_legacy = uses_legacy_property or only_type_given

        # populated on first use by _get_web_acls
        self._cached_web_acls = None

    def _legacy_match(self, resource):
        """Return True when the resource's WAF "enabled" status equals the requested ``state``.

        WAF counts as enabled when a WebACL is associated with the resource and, if the
        filter names a ``web-acl``, that name is exactly equal to the associated ACL's
        ``Name``. ``state`` defaults to False.
        """
        wanted_name = self.data.get('web-acl')
        attached_acl = self.get_associated_web_acl(resource)
        expected_state = self.data.get('state', False)

        if not attached_acl:
            enabled = False
        elif wanted_name is None:
            enabled = True
        else:
            enabled = wanted_name == attached_acl['Name']

        return enabled == expected_state

    def _get_web_acls(self):
        """Return the waf-regional web acls from the resource manager, fetching them once."""
        if self._cached_web_acls is not None:
            return self._cached_web_acls

        waf_manager = self.manager.get_resource_manager('waf-regional')
        # always augment: the value filter needs the additional detail (e.g. Rules)
        self._cached_web_acls = waf_manager.resources(augment=True)

        return self._cached_web_acls

    def _load_associated_resources(self, web_acl, resource_type):
        """Return the arns of ``resource_type`` resources the web acl is attached to.

        The answer is stored on the ``web_acl`` dict itself, under a key derived from
        ``associated_cache_key`` and the resource type, so the API is queried at most once
        per web acl and resource type.
        """
        memo_key = f'{self.associated_cache_key}:{resource_type}'

        if memo_key not in web_acl:
            client = self.manager.session_factory().client('waf-regional')
            response = client.list_resources_for_web_acl(
                WebACLId=web_acl['WebACLId'],
                ResourceType=resource_type
            )
            web_acl[memo_key] = response.get('ResourceArns', [])

        return web_acl[memo_key]

    def get_deprecations(self):
        """Report each legacy property (``web-acl`` / ``state``) used by the policy."""
        filter_name = self.data["type"]
        deprecations = []
        for legacy_key in {'web-acl', 'state'}.intersection(self.data):
            deprecations.append(
                DeprecatedField(
                    f"{filter_name}.{legacy_key}", "Use the value filter attributes instead"
                )
            )
        return deprecations

    def get_web_acl_from_associations(self, resource_type, resource_arn):
        """Return the first web acl whose associated resources include ``resource_arn``.

        Returns an empty dict when no web acl is associated, so that the value filter can
        still be evaluated (and match) for resources without a web acl.
        """
        candidates = (
            web_acl for web_acl in self._get_web_acls()
            if resource_arn in self._load_associated_resources(web_acl, resource_type)
        )
        return next(candidates, {})

    def get_web_acl_by_arn(self, arn):
        """Return the web acl whose ``WebACLArn`` equals ``arn``, or an empty dict.

        The empty default lets the value filter match resources without a web acl.
        """
        for web_acl in self._get_web_acls():
            if web_acl['WebACLArn'] == arn:
                return web_acl

        return {}

    def validate(self):
        """Validate the value filter configuration unless the filter is in legacy mode.

        Legacy and value filter behavior are mutually exclusive, and a legacy policy carries
        no value filter properties to validate.
        """
        if self._is_legacy:
            return

        super().validate()

    def process(self, resources, event=None):
        """Return the resources whose associated web acl satisfies the filter."""
        if self._is_legacy:
            # legacy mode: compare the enabled status / acl name against the requested state
            keep = self._legacy_match
        else:
            # value filter mode: apply the value filter to the associated web acl
            def keep(resource):
                return self(self.get_associated_web_acl(resource))

        return [resource for resource in resources if keep(resource)]

    # Main method used to determine the web acl associated with the given resource - must
    # be overridden in a subclass as each resource has a slightly unique way of getting the
    # associated web acl
    @abstractmethod
    def get_associated_web_acl(self, resource):
        raise NotImplementedError('"get_associated_web_acl" must be overriden')

152 

153 

class WafV2FilterBase(ValueFilter, metaclass=ABCMeta):
    """Filter a resource based on an associated WAFv2 WebACL using the generic value filter. The
    value passed to the filter will be an instance of WebACL from AWS or an empty object ({}) if
    no ACL is associated with the resource. WAFv2 can be associated with an Application
    Load Balancer, API Gateway REST API Stage, AppSync GraphQL API, Cognito User Pool, Cloudfront
    Distribution, or App Runner Service.

    https://docs.aws.amazon.com/waf/latest/APIReference/API_WebACL.html

    :example:

    Find all API Gateway Rest stages that don't have wafv2 enabled with at least one rule

    .. code-block:: yaml

        policies:
          - name: filter-wafv2-value
            resource: aws.rest-stage
            filters:
              - type: wafv2-enabled
                key: Rules
                value: empty

    """

    # key under which the resolved web acl is memoized on each resource
    cache_key = 'c7n:WebACL'
    # prefix of the key under which associated resource arns are memoized on each web acl
    associated_cache_key = 'c7n:AssociatedResources'

    schema = type_schema(
        'wafv2-enabled',
        rinherit=ValueFilter.schema,
        **{
            'web-acl': {'type': 'string'},
            'state': {'type': 'boolean'}
        }
    )

    permissions = (
        'wafv2:ListWebACLs',
        'wafv2:GetWebACL',  # for augment
        'wafv2:ListResourcesForWebACL'  # for finding associated regional resources
    )

    def __init__(self, data, manager=None):
        """Set up the filter and decide whether it runs in "legacy" mode.

        Legacy mode matches previous WAF based filters for backwards compatibility. It is
        enabled when one of the legacy properties (``web-acl`` / ``state``) is provided or
        when no value filter properties are given, i.e. only the filter ``type`` is present
        (none of the legacy properties are required).
        """
        super().__init__(data, manager)

        self._is_legacy = (
            'web-acl' in self.data
            or 'state' in self.data
            or len(self.data.keys()) == 1  # only filter "type" is given
        )
        # web acls already fetched, keyed by scope so that a lookup for one scope can never
        # be answered with the web acls of another scope
        self._cached_web_acls = {}

    def _legacy_match(self, r_acl):
        """Return True when the WAF "enabled" status of ``r_acl`` equals the requested ``state``.

        WAF counts as enabled when ``r_acl`` is non-empty and, if the filter specifies
        ``web-acl``, that value matches the ACL's ``Name`` as a regular expression via
        ``re.match`` (anchored at the start of the name). ``state`` defaults to False.
        """
        target_acl = self.data.get('web-acl')
        state = self.data.get('state', False)

        return (
            bool(r_acl)
            and (
                target_acl is None
                or bool(re.match(target_acl, r_acl['Name']))
            )
        ) == state

    def _get_web_acls(self, scope):
        """Return the wafv2 web acls for ``scope``, asking the resource manager once per scope.

        :param scope: scope passed as the ``Scope`` query value to the wafv2 resource manager
            (this class uses 'REGIONAL' as its default)
        """
        if self._cached_web_acls is None:
            # tolerate an externally reset cache
            self._cached_web_acls = {}

        if scope not in self._cached_web_acls:
            self._cached_web_acls[scope] = self.manager.get_resource_manager('wafv2').resources(
                query=dict(Scope=scope),
                # required to get the additional detail needed for this filter (e.g. Rules),
                # but the legacy mode does not require additional detail
                augment=(not self._is_legacy)
            )

        return self._cached_web_acls[scope]

    def _get_associated_web_acl_by_attr(self, attr_name, attr_value, scope):
        """Return the first web acl in ``scope`` whose ``attr_name`` equals ``attr_value``.

        Returns an empty dict when nothing matches so the value filter can still match
        resources that have no web acl.
        """
        web_acls = self._get_web_acls(scope)

        return next(
            filter(lambda acl: acl[attr_name] == attr_value, web_acls),
            # default empty so we can actually match where no web acl is present
            {}
        )

    def _load_associated_resources(self, web_acl, resource_type):
        """Return the arns of ``resource_type`` resources the web acl is attached to.

        The result is cached on the ``web_acl`` dict itself. This is only needed for
        REGIONAL web acls as cloudfront holds a reference to its web acl.
        """
        cache_key = f'{self.associated_cache_key}:{resource_type}'

        if cache_key not in web_acl:
            client = self.manager.session_factory().client('wafv2')

            web_acl[cache_key] = client.list_resources_for_web_acl(
                WebACLArn=web_acl['ARN'],
                ResourceType=resource_type
            ).get('ResourceArns', [])

        return web_acl[cache_key]

    def _get_associated_web_acl_cached(self, resource):
        """Return the web acl for ``resource``, memoizing it on the resource under ``cache_key``."""
        if self.cache_key not in resource:
            resource[self.cache_key] = self.get_associated_web_acl(resource)

        return resource[self.cache_key]

    def get_deprecations(self):
        """Report each legacy property (``web-acl`` / ``state``) used by the policy."""
        filter_name = self.data["type"]
        return [
            DeprecatedField(f"{filter_name}.{k}", "Use the value filter attributes instead")
            for k in {'web-acl', 'state'}.intersection(self.data)
        ]

    def get_web_acl_from_associations(self, resource_type, resource_arn):
        """Return the REGIONAL web acl whose associated resources include ``resource_arn``.

        Only needed for REGIONAL resources, so the scope is fixed. Returns an empty dict
        when no web acl is associated.
        """
        for web_acl in self._get_web_acls(scope='REGIONAL'):
            associated_arns = self._load_associated_resources(web_acl, resource_type)
            if resource_arn in associated_arns:
                return web_acl

        # default empty so we can actually match where no web acl is present
        return {}

    def get_web_acl_by_arn(self, arn, scope='REGIONAL'):
        """Return the web acl in ``scope`` whose ``ARN`` equals ``arn``, or an empty dict."""
        return self._get_associated_web_acl_by_attr('ARN', arn, scope)

    def get_web_acl_by_id(self, id, scope='REGIONAL'):
        """Return the web acl in ``scope`` whose ``Id`` equals ``id``, or an empty dict."""
        return self._get_associated_web_acl_by_attr('Id', id, scope)

    def validate(self):
        """Validate the value filter configuration unless the filter is in legacy mode.

        Only legacy behavior or the new ValueFilter behavior is allowed, not both, and a
        legacy policy carries no value filter properties to validate.
        """
        if not self._is_legacy:
            super(WafV2FilterBase, self).validate()

    def process(self, resources, event=None):
        """Return the resources whose associated web acl satisfies the filter."""
        matched = []
        for resource in resources:
            r_web_acl = self._get_associated_web_acl_cached(resource)

            if self._is_legacy:
                if self._legacy_match(r_web_acl):
                    matched.append(resource)
            # call value filter on associated WebACL
            elif self(r_web_acl):
                matched.append(resource)

        return matched

    # Main method used to determine the web acl associated with the given resource - must
    # be overridden in a subclass as each resource has a slightly unique way of getting the
    # associated web acl
    @abstractmethod
    def get_associated_web_acl(self, resource):
        raise NotImplementedError('"get_associated_web_acl" must be overriden')