Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/vpc.py: 28%

163 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from c7n.exceptions import PolicyValidationError 

4from c7n.utils import local_session, type_schema 

5 

6from .core import Filter, ValueFilter 

7from .related import RelatedResourceFilter 

8 

9 

10class MatchResourceValidator: 

11 

12 def validate(self): 

13 if self.data.get('match-resource'): 

14 self.required_keys = set('key',) 

15 return super(MatchResourceValidator, self).validate() 

16 

17 

18class SecurityGroupFilter(MatchResourceValidator, RelatedResourceFilter): 

19 """Filter a resource by its associated security groups.""" 

20 schema = type_schema( 

21 'security-group', rinherit=ValueFilter.schema, 

22 **{'match-resource': {'type': 'boolean'}, 

23 'operator': {'enum': ['and', 'or']}}) 

24 schema_alias = True 

25 

26 RelatedResource = "c7n.resources.vpc.SecurityGroup" 

27 AnnotationKey = "matched-security-groups" 

28 

29 

30class SubnetFilter(MatchResourceValidator, RelatedResourceFilter): 

31 """Filter a resource by its associated subnets attributes. 

32 

33 This filter is generally available for network attached resources. 

34 

35 ie. to find lambda functions that are vpc attached to subnets with 

36 a tag key Location and value Database. 

37 

38 :example: 

39 

40 .. code-block:: yaml 

41 

42 policies: 

43 - name: lambda 

44 resource: aws.lambda 

45 filters: 

46 - type: subnet 

47 key: tag:Location 

48 value: Database 

49 

50 It also supports finding resources on public or private subnets 

51 via route table introspection to determine if the subnet is 

52 associated to an internet gateway. 

53 

54 :example: 

55 

56 .. code-block:: yaml 

57 

58 policies: 

59 - name: public-ec2 

60 resource: aws.ec2 

61 filters: 

62 - type: subnet 

63 igw: True 

64 key: SubnetId 

65 value: present 

66 

67 """ 

68 

69 schema = type_schema( 

70 'subnet', rinherit=ValueFilter.schema, 

71 **{'match-resource': {'type': 'boolean'}, 

72 'operator': {'enum': ['and', 'or']}, 

73 'igw': {'enum': [True, False]}, 

74 }) 

75 

76 schema_alias = True 

77 RelatedResource = "c7n.resources.vpc.Subnet" 

78 AnnotationKey = "matched-subnets" 

79 

80 def get_permissions(self): 

81 perms = super().get_permissions() 

82 if self.data.get('igw') in (True, False): 

83 perms += self.manager.get_resource_manager( 

84 'aws.route-table').get_permissions() 

85 return perms 

86 

87 def validate(self): 

88 super().validate() 

89 self.check_igw = self.data.get('igw') 

90 

91 def match(self, related): 

92 if self.check_igw in [True, False]: 

93 if not self.match_igw(related): 

94 return False 

95 return super().match(related) 

96 

97 def process(self, resources, event=None): 

98 related = self.get_related(resources) 

99 if self.check_igw in [True, False]: 

100 self.route_tables = self.get_route_tables() 

101 return [r for r in resources if self.process_resource(r, related)] 

102 

103 def get_route_tables(self): 

104 rmanager = self.manager.get_resource_manager('aws.route-table') 

105 route_tables = {} 

106 for r in rmanager.resources(): 

107 for a in r['Associations']: 

108 if a['Main']: 

109 route_tables[r['VpcId']] = r 

110 elif 'SubnetId' in a: 

111 route_tables[a['SubnetId']] = r 

112 return route_tables 

113 

114 def match_igw(self, subnet): 

115 rtable = self.route_tables.get( 

116 subnet['SubnetId'], 

117 self.route_tables.get(subnet['VpcId'])) 

118 if rtable is None: 

119 self.log.debug('route table for %s not found', subnet['SubnetId']) 

120 return 

121 found_igw = False 

122 for route in rtable['Routes']: 

123 if route.get('GatewayId') and route['GatewayId'].startswith('igw-'): 

124 found_igw = True 

125 break 

126 if self.check_igw and found_igw: 

127 return True 

128 elif not self.check_igw and not found_igw: 

129 return True 

130 return False 

131 

132 

133class VpcFilter(MatchResourceValidator, RelatedResourceFilter): 

134 """Filter a resource by its associated vpc.""" 

135 schema = type_schema( 

136 'vpc', rinherit=ValueFilter.schema, 

137 **{'match-resource': {'type': 'boolean'}, 

138 'operator': {'enum': ['and', 'or']}}) 

139 

140 schema_alias = True 

141 RelatedResource = "c7n.resources.vpc.Vpc" 

142 AnnotationKey = "matched-vpcs" 

143 

144 

145class DefaultVpcBase(Filter): 

146 """Filter to resources in a default vpc.""" 

147 vpcs = None 

148 default_vpc = None 

149 permissions = ('ec2:DescribeVpcs',) 

150 

151 def match(self, vpc_id): 

152 if self.default_vpc is None: 

153 self.log.debug("querying default vpc %s" % vpc_id) 

154 client = local_session(self.manager.session_factory).client('ec2') 

155 vpcs = [v['VpcId'] for v 

156 in client.describe_vpcs()['Vpcs'] 

157 if v['IsDefault']] 

158 if vpcs: 

159 self.default_vpc = vpcs.pop() 

160 return vpc_id == self.default_vpc and True or False 

161 

162 

163class NetworkLocation(Filter): 

164 """On a network attached resource, determine intersection of 

165 security-group attributes, subnet attributes, and resource attributes. 

166 

167 The use case is a bit specialized, for most use cases using `subnet` 

168 and `security-group` filters suffice. but say for example you wanted to 

169 verify that an ec2 instance was only using subnets and security groups 

170 with a given tag value, and that tag was not present on the resource. 

171 

172 :Example: 

173 

174 .. code-block:: yaml 

175 

176 policies: 

177 - name: ec2-mismatched-sg-remove 

178 resource: ec2 

179 filters: 

180 - type: network-location 

181 compare: ["resource","security-group"] 

182 key: "tag:TEAM_NAME" 

183 ignore: 

184 - "tag:TEAM_NAME": Enterprise 

185 actions: 

186 - type: modify-security-groups 

187 remove: network-location 

188 isolation-group: sg-xxxxxxxx 

189 """ 

190 

191 schema = type_schema( 

192 'network-location', 

193 **{'missing-ok': { 

194 'type': 'boolean', 

195 'default': False, 

196 'description': ( 

197 "How to handle missing keys on elements, by default this causes" 

198 "resources to be considered not-equal")}, 

199 'match': {'type': 'string', 'enum': ['equal', 'not-equal'], 

200 'default': 'non-equal'}, 

201 'compare': { 

202 'type': 'array', 

203 'description': ( 

204 'Which elements of network location should be considered when' 

205 ' matching.'), 

206 'default': ['resource', 'subnet', 'security-group'], 

207 'items': { 

208 'enum': ['resource', 'subnet', 'security-group']}}, 

209 'key': { 

210 'type': 'string', 

211 'description': 'The attribute expression that should be matched on'}, 

212 'max-cardinality': { 

213 'type': 'integer', 'default': 1, 

214 'title': ''}, 

215 'ignore': {'type': 'array', 'items': {'type': 'object'}}, 

216 'required': ['key'], 

217 

218 }) 

219 schema_alias = True 

220 permissions = ('ec2:DescribeSecurityGroups', 'ec2:DescribeSubnets') 

221 

222 def validate(self): 

223 rfilters = self.manager.filter_registry.keys() 

224 if 'subnet' not in rfilters: 

225 raise PolicyValidationError( 

226 "network-location requires resource subnet filter availability on %s" % ( 

227 self.manager.data)) 

228 

229 if 'security-group' not in rfilters: 

230 raise PolicyValidationError( 

231 "network-location requires resource security-group filter availability on %s" % ( 

232 self.manager.data)) 

233 return self 

234 

235 def process(self, resources, event=None): 

236 self.sg = self.manager.filter_registry.get('security-group')({}, self.manager) 

237 related_sg = self.sg.get_related(resources) 

238 

239 self.subnet = self.manager.filter_registry.get('subnet')({}, self.manager) 

240 related_subnet = self.subnet.get_related(resources) 

241 

242 self.sg_model = self.manager.get_resource_manager('security-group').get_model() 

243 self.subnet_model = self.manager.get_resource_manager('subnet').get_model() 

244 self.vf = self.manager.filter_registry.get('value')({}, self.manager) 

245 

246 # filter options 

247 key = self.data.get('key') 

248 self.compare = self.data.get('compare', ['subnet', 'security-group', 'resource']) 

249 self.max_cardinality = self.data.get('max-cardinality', 1) 

250 self.match = self.data.get('match', 'not-equal') 

251 self.missing_ok = self.data.get('missing-ok', False) 

252 

253 results = [] 

254 for r in resources: 

255 resource_sgs = self.filter_ignored( 

256 [related_sg[sid] for sid in self.sg.get_related_ids([r]) if sid in related_sg]) 

257 resource_subnets = self.filter_ignored( 

258 [related_subnet[sid] for sid in self.subnet.get_related_ids([r]) 

259 if sid in related_subnet]) 

260 found = self.process_resource(r, resource_sgs, resource_subnets, key) 

261 if found: 

262 results.append(found) 

263 

264 return results 

265 

266 def filter_ignored(self, resources): 

267 ignores = self.data.get('ignore', ()) 

268 results = [] 

269 

270 for r in resources: 

271 found = False 

272 for i in ignores: 

273 for k, v in i.items(): 

274 if self.vf.get_resource_value(k, r) == v: 

275 found = True 

276 if found is True: 

277 break 

278 if found is True: 

279 continue 

280 results.append(r) 

281 return results 

282 

283 def process_resource(self, r, resource_sgs, resource_subnets, key): 

284 evaluation = [] 

285 sg_space = set() 

286 subnet_space = set() 

287 

288 if 'subnet' in self.compare: 

289 subnet_values = { 

290 rsub[self.subnet_model.id]: self.subnet.get_resource_value(key, rsub) 

291 for rsub in resource_subnets} 

292 

293 if not self.missing_ok and None in subnet_values.values(): 

294 evaluation.append({ 

295 'reason': 'SubnetLocationAbsent', 

296 'subnets': subnet_values}) 

297 subnet_space = set(filter(None, subnet_values.values())) 

298 

299 if len(subnet_space) > self.max_cardinality: 

300 evaluation.append({ 

301 'reason': 'SubnetLocationCardinality', 

302 'subnets': subnet_values}) 

303 

304 if 'security-group' in self.compare: 

305 sg_values = { 

306 rsg[self.sg_model.id]: self.sg.get_resource_value(key, rsg) 

307 for rsg in resource_sgs} 

308 if not self.missing_ok and None in sg_values.values(): 

309 evaluation.append({ 

310 'reason': 'SecurityGroupLocationAbsent', 

311 'security-groups': sg_values}) 

312 

313 sg_space = set(filter(None, sg_values.values())) 

314 

315 if len(sg_space) > self.max_cardinality: 

316 evaluation.append({ 

317 'reason': 'SecurityGroupLocationCardinality', 

318 'security-groups': sg_values}) 

319 

320 if ('subnet' in self.compare and 

321 'security-group' in self.compare and 

322 sg_space != subnet_space): 

323 evaluation.append({ 

324 'reason': 'LocationMismatch', 

325 'subnets': subnet_values, 

326 'security-groups': sg_values}) 

327 

328 if 'resource' in self.compare: 

329 r_value = self.vf.get_resource_value(key, r) 

330 if not self.missing_ok and r_value is None: 

331 evaluation.append({ 

332 'reason': 'ResourceLocationAbsent', 

333 'resource': r_value}) 

334 elif 'security-group' in self.compare and resource_sgs and r_value not in sg_space: 

335 evaluation.append({ 

336 'reason': 'ResourceLocationMismatch', 

337 'resource': r_value, 

338 'security-groups': sg_values}) 

339 elif 'subnet' in self.compare and resource_subnets and r_value not in subnet_space: 

340 evaluation.append({ 

341 'reason': 'ResourceLocationMismatch', 

342 'resource': r_value, 

343 'subnet': subnet_values}) 

344 if 'security-group' in self.compare and resource_sgs: 

345 mismatched_sgs = {sg_id: sg_value 

346 for sg_id, sg_value in sg_values.items() 

347 if sg_value != r_value} 

348 if mismatched_sgs: 

349 evaluation.append({ 

350 'reason': 'SecurityGroupMismatch', 

351 'resource': r_value, 

352 'security-groups': mismatched_sgs}) 

353 

354 if evaluation and self.match == 'not-equal': 

355 r['c7n:NetworkLocation'] = evaluation 

356 return r 

357 elif not evaluation and self.match == 'equal': 

358 return r