Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/filters/vpc.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

209 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from c7n.exceptions import PolicyValidationError 

4from c7n.utils import local_session, type_schema 

5 

6from .core import Filter, ValueFilter 

7from .related import RelatedResourceFilter 

8 

9 

10class MatchResourceValidator: 

11 

12 def validate(self): 

13 if self.data.get('match-resource'): 

14 self.required_keys = set('key',) 

15 return super(MatchResourceValidator, self).validate() 

16 

17 

18class SecurityGroupFilter(MatchResourceValidator, RelatedResourceFilter): 

19 """Filter a resource by its associated security groups.""" 

20 schema = type_schema( 

21 'security-group', rinherit=ValueFilter.schema, 

22 **{'match-resource': {'type': 'boolean'}, 

23 'operator': {'enum': ['and', 'or']}}) 

24 schema_alias = True 

25 

26 RelatedResource = "c7n.resources.vpc.SecurityGroup" 

27 AnnotationKey = "matched-security-groups" 

28 

29 

30class SubnetFilter(MatchResourceValidator, RelatedResourceFilter): 

31 """Filter a resource by its associated subnets attributes. 

32 

33 This filter is generally available for network attached resources. 

34 

35 ie. to find lambda functions that are vpc attached to subnets with 

36 a tag key Location and value Database. 

37 

38 :example: 

39 

40 .. code-block:: yaml 

41 

42 policies: 

43 - name: lambda 

44 resource: aws.lambda 

45 filters: 

46 - type: subnet 

47 key: tag:Location 

48 value: Database 

49 

50 It also supports finding resources on public or private subnets 

51 via route table introspection to determine if the subnet is 

52 associated to an internet gateway or a nat gateway. 

53 

54 :example: 

55 

56 .. code-block:: yaml 

57 

58 policies: 

59 - name: public-ec2 

60 resource: aws.ec2 

61 filters: 

62 - type: subnet 

63 operator: or 

64 igw: True 

65 nat: True 

66 

67 """ 

68 

69 schema = type_schema( 

70 'subnet', rinherit=ValueFilter.schema, 

71 **{'match-resource': {'type': 'boolean'}, 

72 'operator': {'enum': ['and', 'or']}, 

73 'igw': {'enum': [True, False]}, 

74 'nat': {'enum': [True, False]}, 

75 }) 

76 

77 schema_alias = True 

78 RelatedResource = "c7n.resources.vpc.Subnet" 

79 AnnotationKey = "matched-subnets" 

80 required_keys = set() 

81 

82 def get_permissions(self): 

83 perms = super().get_permissions() 

84 if self.data.get('igw') in (True, False) or self.data.get('nat') in (True, False): 

85 perms += self.manager.get_resource_manager( 

86 'aws.route-table').get_permissions() 

87 return perms 

88 

89 def validate(self): 

90 super().validate() 

91 self.check_igw = self.data.get('igw') 

92 self.check_nat = self.data.get('nat') 

93 

94 def match(self, related): 

95 op = all if self.data.get('operator', 'and') == 'and' else any 

96 

97 # If the policy doesn't define value filter keys, implicitly 

98 # pass and skip the value filter match. 

99 value_match = {'key', 'value'}.difference(self.data) or super().match(related) 

100 

101 return op( 

102 ( 

103 self.match_igw(related), 

104 self.match_nat(related), 

105 value_match, 

106 ) 

107 ) 

108 

109 def process(self, resources, event=None): 

110 related = self.get_related(resources) 

111 if self.check_igw in [True, False] or self.check_nat in [True, False]: 

112 self.route_tables = self.get_route_tables() 

113 return [r for r in resources if self.process_resource(r, related)] 

114 

115 def get_route_tables(self): 

116 rmanager = self.manager.get_resource_manager('aws.route-table') 

117 route_tables = {} 

118 for r in rmanager.resources(): 

119 for a in r['Associations']: 

120 if a['Main']: 

121 route_tables[r['VpcId']] = r 

122 elif 'SubnetId' in a: 

123 route_tables[a['SubnetId']] = r 

124 return route_tables 

125 

126 def match_igw(self, subnet): 

127 if self.check_igw is None: 

128 return True 

129 rtable = self.route_tables.get( 

130 subnet['SubnetId'], 

131 self.route_tables.get(subnet['VpcId'])) 

132 if rtable is None: 

133 self.log.debug('route table for %s not found', subnet['SubnetId']) 

134 return 

135 found_igw = False 

136 for route in rtable['Routes']: 

137 if route.get('GatewayId') and route['GatewayId'].startswith('igw-'): 

138 found_igw = True 

139 break 

140 if self.check_igw and found_igw: 

141 return True 

142 elif not self.check_igw and not found_igw: 

143 return True 

144 return False 

145 

146 def match_nat(self, subnet): 

147 if self.check_nat is None: 

148 return True 

149 rtable = self.route_tables.get( 

150 subnet['SubnetId'], 

151 self.route_tables.get(subnet['VpcId'])) 

152 if rtable is None: 

153 self.log.debug('route table for %s not found', subnet['SubnetId']) 

154 return 

155 found_nat = False 

156 for route in rtable['Routes']: 

157 if route.get('NatGatewayId'): 

158 found_nat = True 

159 break 

160 if self.check_nat and found_nat: 

161 return True 

162 elif not self.check_nat and not found_nat: 

163 return True 

164 return False 

165 

166 

167class VpcFilter(MatchResourceValidator, RelatedResourceFilter): 

168 """Filter a resource by its associated vpc.""" 

169 schema = type_schema( 

170 'vpc', rinherit=ValueFilter.schema, 

171 **{'match-resource': {'type': 'boolean'}, 

172 'operator': {'enum': ['and', 'or']}}) 

173 

174 schema_alias = True 

175 RelatedResource = "c7n.resources.vpc.Vpc" 

176 AnnotationKey = "matched-vpcs" 

177 

178 

179class DefaultVpcBase(Filter): 

180 """Filter to resources in a default vpc.""" 

181 vpcs = None 

182 default_vpc = None 

183 permissions = ('ec2:DescribeVpcs',) 

184 

185 def match(self, vpc_id): 

186 if self.default_vpc is None: 

187 self.log.debug("querying default vpc %s" % vpc_id) 

188 client = local_session(self.manager.session_factory).client('ec2') 

189 vpcs = [v['VpcId'] for v 

190 in client.describe_vpcs()['Vpcs'] 

191 if v['IsDefault']] 

192 if vpcs: 

193 self.default_vpc = vpcs.pop() 

194 return vpc_id == self.default_vpc and True or False 

195 

196 

197class NetworkLocation(Filter): 

198 """On a network attached resource, determine intersection of 

199 security-group attributes, subnet attributes, and resource attributes. 

200 

201 The use case is a bit specialized, for most use cases using `subnet` 

202 and `security-group` filters suffice. but say for example you wanted to 

203 verify that an ec2 instance was only using subnets and security groups 

204 with a given tag value, and that tag was not present on the resource. 

205 

206 :Example: 

207 

208 .. code-block:: yaml 

209 

210 policies: 

211 - name: ec2-mismatched-sg-remove 

212 resource: ec2 

213 filters: 

214 - type: network-location 

215 compare: ["resource","security-group"] 

216 key: "tag:TEAM_NAME" 

217 ignore: 

218 - "tag:TEAM_NAME": Enterprise 

219 actions: 

220 - type: modify-security-groups 

221 remove: network-location 

222 isolation-group: sg-xxxxxxxx 

223 """ 

224 

225 schema = type_schema( 

226 'network-location', 

227 **{'missing-ok': { 

228 'type': 'boolean', 

229 'default': False, 

230 'description': ( 

231 "How to handle missing keys on elements, by default this causes" 

232 "resources to be considered not-equal")}, 

233 'match': {'type': 'string', 'enum': ['equal', 'not-equal', 'in'], 

234 'default': 'non-equal'}, 

235 'compare': { 

236 'type': 'array', 

237 'description': ( 

238 'Which elements of network location should be considered when' 

239 ' matching.'), 

240 'default': ['resource', 'subnet', 'security-group'], 

241 'items': { 

242 'enum': ['resource', 'subnet', 'security-group']}}, 

243 'key': { 

244 'type': 'string', 

245 'description': 'The attribute expression that should be matched on'}, 

246 'max-cardinality': { 

247 'type': 'integer', 'default': 1, 

248 'title': ''}, 

249 'ignore': {'type': 'array', 'items': {'type': 'object'}}, 

250 'required': ['key'], 

251 'value': {'type': 'array', 'items': {'type': 'string'}} 

252 }) 

253 schema_alias = True 

254 permissions = ('ec2:DescribeSecurityGroups', 'ec2:DescribeSubnets') 

255 

256 def validate(self): 

257 rfilters = self.manager.filter_registry.keys() 

258 if 'subnet' not in rfilters: 

259 raise PolicyValidationError( 

260 "network-location requires resource subnet filter availability on %s" % ( 

261 self.manager.data)) 

262 

263 if 'security-group' not in rfilters: 

264 raise PolicyValidationError( 

265 "network-location requires resource security-group filter availability on %s" % ( 

266 self.manager.data)) 

267 return self 

268 

269 def process(self, resources, event=None): 

270 self.sg = self.manager.filter_registry.get('security-group')({}, self.manager) 

271 related_sg = self.sg.get_related(resources) 

272 

273 self.subnet = self.manager.filter_registry.get('subnet')({}, self.manager) 

274 related_subnet = self.subnet.get_related(resources) 

275 

276 self.sg_model = self.manager.get_resource_manager('security-group').get_model() 

277 self.subnet_model = self.manager.get_resource_manager('subnet').get_model() 

278 self.vf = self.manager.filter_registry.get('value')({}, self.manager) 

279 

280 # filter options 

281 key = self.data.get('key') 

282 self.compare = self.data.get('compare', ['subnet', 'security-group', 'resource']) 

283 self.max_cardinality = self.data.get('max-cardinality', 1) 

284 self.match = self.data.get('match', 'not-equal') 

285 self.missing_ok = self.data.get('missing-ok', False) 

286 

287 results = [] 

288 for r in resources: 

289 resource_sgs = self.filter_ignored( 

290 [related_sg[sid] for sid in self.sg.get_related_ids([r]) if sid in related_sg]) 

291 resource_subnets = self.filter_ignored( 

292 [related_subnet[sid] for sid in self.subnet.get_related_ids([r]) 

293 if sid in related_subnet]) 

294 found = self.process_resource(r, resource_sgs, resource_subnets, key) 

295 if found: 

296 results.append(found) 

297 

298 return results 

299 

300 def filter_ignored(self, resources): 

301 ignores = self.data.get('ignore', ()) 

302 results = [] 

303 

304 for r in resources: 

305 found = False 

306 for i in ignores: 

307 for k, v in i.items(): 

308 if self.vf.get_resource_value(k, r) == v: 

309 found = True 

310 if found is True: 

311 break 

312 if found is True: 

313 continue 

314 results.append(r) 

315 return results 

316 

317 def process_resource(self, r, resource_sgs, resource_subnets, key): 

318 evaluation = [] 

319 sg_space = set() 

320 subnet_space = set() 

321 

322 if self.match == 'in': 

323 return self.process_match_in(r, resource_sgs, resource_subnets, key) 

324 

325 if 'subnet' in self.compare: 

326 subnet_values = { 

327 rsub[self.subnet_model.id]: self.subnet.get_resource_value(key, rsub) 

328 for rsub in resource_subnets} 

329 

330 if not self.missing_ok and None in subnet_values.values(): 

331 evaluation.append({ 

332 'reason': 'SubnetLocationAbsent', 

333 'subnets': subnet_values}) 

334 subnet_space = set(filter(None, subnet_values.values())) 

335 

336 if len(subnet_space) > self.max_cardinality: 

337 evaluation.append({ 

338 'reason': 'SubnetLocationCardinality', 

339 'subnets': subnet_values}) 

340 

341 if 'security-group' in self.compare: 

342 sg_values = { 

343 rsg[self.sg_model.id]: self.sg.get_resource_value(key, rsg) 

344 for rsg in resource_sgs} 

345 if not self.missing_ok and None in sg_values.values(): 

346 evaluation.append({ 

347 'reason': 'SecurityGroupLocationAbsent', 

348 'security-groups': sg_values}) 

349 

350 sg_space = set(filter(None, sg_values.values())) 

351 

352 if len(sg_space) > self.max_cardinality: 

353 evaluation.append({ 

354 'reason': 'SecurityGroupLocationCardinality', 

355 'security-groups': sg_values}) 

356 

357 if ('subnet' in self.compare and 

358 'security-group' in self.compare and 

359 sg_space != subnet_space): 

360 evaluation.append({ 

361 'reason': 'LocationMismatch', 

362 'subnets': subnet_values, 

363 'security-groups': sg_values}) 

364 

365 if 'resource' in self.compare: 

366 r_value = self.vf.get_resource_value(key, r) 

367 if not self.missing_ok and r_value is None: 

368 evaluation.append({ 

369 'reason': 'ResourceLocationAbsent', 

370 'resource': r_value}) 

371 elif 'security-group' in self.compare and resource_sgs and r_value not in sg_space: 

372 evaluation.append({ 

373 'reason': 'ResourceLocationMismatch', 

374 'resource': r_value, 

375 'security-groups': sg_values}) 

376 elif 'subnet' in self.compare and resource_subnets and r_value not in subnet_space: 

377 evaluation.append({ 

378 'reason': 'ResourceLocationMismatch', 

379 'resource': r_value, 

380 'subnet': subnet_values}) 

381 if 'security-group' in self.compare and resource_sgs: 

382 mismatched_sgs = {sg_id: sg_value 

383 for sg_id, sg_value in sg_values.items() 

384 if sg_value != r_value} 

385 if mismatched_sgs: 

386 evaluation.append({ 

387 'reason': 'SecurityGroupMismatch', 

388 'resource': r_value, 

389 'security-groups': mismatched_sgs}) 

390 

391 if evaluation and self.match == 'not-equal': 

392 r['c7n:NetworkLocation'] = evaluation 

393 return r 

394 elif not evaluation and self.match == 'equal': 

395 return r 

396 

397 def process_match_in(self, r, resource_sgs, resource_subnets, key): 

398 network_location_vals = set(self.data.get('value', [])) 

399 

400 if 'subnet' in self.compare: 

401 subnet_values = { 

402 rsub[self.subnet_model.id]: self.subnet.get_resource_value(key, rsub) 

403 for rsub in resource_subnets} 

404 # import pdb; pdb.set_trace() 

405 if not self.missing_ok and None in subnet_values.values(): 

406 return 

407 

408 subnet_space = set(filter(None, subnet_values.values())) 

409 if not subnet_space.issubset(network_location_vals): 

410 return 

411 

412 if 'security-group' in self.compare: 

413 sg_values = { 

414 rsg[self.sg_model.id]: self.sg.get_resource_value(key, rsg) 

415 for rsg in resource_sgs} 

416 if not self.missing_ok and None in sg_values.values(): 

417 return 

418 

419 sg_space = set(filter(None, sg_values.values())) 

420 

421 if not sg_space.issubset(network_location_vals): 

422 return 

423 

424 if 'resource' in self.compare: 

425 r_value = self.vf.get_resource_value(key, r) 

426 if not self.missing_ok and r_value is None: 

427 return 

428 

429 if r_value not in network_location_vals: 

430 return 

431 

432 return r