Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/filters/iamaccess.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

229 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4IAM Resource Policy Checker 

5--------------------------- 

6 

7When securing resources with iam policies, we want to parse and evaluate 

8the resource's policy for any cross account or public access grants that 

9are not intended. 

10 

11In general, iam policies can be complex, and where possible using iam 

12simulate is preferable, but requires passing the caller's arn, which 

13is not feasible when we're evaluating who the valid set of callers 

14are. 

15 

16 

17References 

18 

19- IAM Policy Evaluation 

20 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_evaluation-logic.html 

21 

22- IAM Policy Reference 

23 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html 

24 

25- IAM Global Condition Context Keys 

26 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html 

27 

28""" 

29import fnmatch 

30import logging 

31import json 

32 

33from c7n.filters import Filter 

34from c7n.resolver import ValuesFrom 

35from c7n.utils import type_schema 

36 

# Module-level logger; used below to warn about condition keys that have no handler.
log = logging.getLogger('custodian.iamaccess')

38 

39 

40def _account(arn): 

41 # we could try except but some minor runtime cost, basically flag 

42 # invalids values 

43 if arn.count(":") < 4: 

44 return arn 

45 return arn.split(':', 5)[4] 

46 

47 

class PolicyChecker:
    """Evaluate the statements of an IAM resource policy for unintended grants.

    checker_config:
      - check_actions: only check one of the specified actions
      - everyone_only: only check for wildcard permission grants
      - return_allowed: if true, return the statements that are allowed
      - allowed_accounts: permission grants to these accounts are okay
      - allowed_vpc: values accepted for the aws:SourceVpc condition
      - allowed_vpce: values accepted for the aws:SourceVpce condition
      - allowed_orgid: values accepted for the org id conditions
      - whitelist_conditions: a list of conditions that are considered
        sufficient enough to whitelist the statement.
    """

    # Prefixes of the multi-valued ("set") condition operators. The IAM
    # qualifier is spelled ``ForAnyValue``; that prefix also still matches the
    # previously accepted (misspelled) ``ForAnyValues``.
    SET_CONDITION_PREFIXES = ('ForAllValues', 'ForAnyValue')

    # Condition operators that are evaluated; any other operator is skipped.
    CONDITION_OPERATORS = (
        'StringEquals',
        'StringEqualsIgnoreCase',
        'StringLike',
        'ArnEquals',
        'ArnLike',
        'IpAddress',
        'NotIpAddress')

    def __init__(self, checker_config):
        self.checker_config = checker_config

    # Config properties
    @property
    def return_allowed(self):
        return self.checker_config.get('return_allowed', False)

    @property
    def allowed_accounts(self):
        return self.checker_config.get('allowed_accounts', ())

    @property
    def everyone_only(self):
        return self.checker_config.get('everyone_only', False)

    @property
    def check_actions(self):
        return self.checker_config.get('check_actions', ())

    @property
    def whitelist_conditions(self):
        # Condition keys are lower-cased by normalize_conditions, so the
        # configured names are lower-cased here to compare like with like.
        return {v.lower() for v in self.checker_config.get('whitelist_conditions', ())}

    @property
    def allowed_vpce(self):
        return self.checker_config.get('allowed_vpce', ())

    @property
    def allowed_vpc(self):
        return self.checker_config.get('allowed_vpc', ())

    @property
    def allowed_orgid(self):
        return self.checker_config.get('allowed_orgid', ())

    # Policy statement handling
    def check(self, policy_text):
        """Partition the policy's statements and return one of the partitions.

        :param policy_text: the policy, either as a JSON string or as an
            already decoded mapping.
        :returns: the list of violating statements, or the list of allowed
            statements when ``return_allowed`` is configured.
        """
        if isinstance(policy_text, str):
            policy = json.loads(policy_text)
        else:
            policy = policy_text

        allowlist_statements, violations = [], []
        for s in policy.get('Statement', ()):
            if self.handle_statement(s):
                violations.append(s)
            else:
                allowlist_statements.append(s)
        return allowlist_statements if self.return_allowed else violations

    def handle_statement(self, s):
        """Return the statement when it is a violation, otherwise ``None``.

        A statement is a violation when its principal, effect and action
        checks all flag it and its conditions do not whitelist it.
        """
        # All three checks are evaluated (no short-circuit), as before.
        flagged = all((
            self.handle_principal(s),
            self.handle_effect(s),
            self.handle_action(s)))
        if flagged and not self.handle_conditions(s):
            return s

    def handle_action(self, s):
        """Return True when the statement is relevant to ``check_actions``.

        Without configured ``check_actions`` every statement is relevant.
        Statement actions are treated as glob patterns matched against the
        configured action names.

        Statements without an ``Action`` element (i.e. using ``NotAction``)
        previously raised ``TypeError``; they are now relevant when at least
        one configured action is not excluded by the ``NotAction`` patterns.
        """
        checked = self.check_actions
        if not checked:
            return True

        actions = s.get('Action')
        if actions is None:
            # NotAction grants everything except the listed patterns. With
            # neither element present nothing is excluded, so the statement
            # is conservatively treated as relevant.
            excluded = s.get('NotAction') or ()
            if isinstance(excluded, str):
                excluded = (excluded,)
            return any(
                not any(fnmatch.filter((name,), pattern) for pattern in excluded)
                for name in checked)

        if isinstance(actions, str):
            actions = (actions,)
        return any(fnmatch.filter(checked, a) for a in actions)

    def handle_effect(self, s):
        """Return True only for ``Allow`` statements."""
        return s['Effect'] == 'Allow'

    def handle_principal(self, s):
        """Return True when the statement's principals need to be flagged.

        Statements using ``NotPrincipal`` or lacking a principal are flagged.
        Service principals are ignored; a statement naming only service
        principals is not flagged.
        """
        if 'NotPrincipal' in s:
            return True

        principals = s.get('Principal')
        if not principals:
            return True
        if not isinstance(principals, dict):
            principals = {'AWS': principals}

        # Ignore service principals, merge the rest into a single set
        non_service_principals = set()
        for principal_type in set(principals) - {'Service'}:
            p = principals[principal_type]
            non_service_principals.update({p} if isinstance(p, str) else p)

        if not non_service_principals:
            return False

        principal_ok = True
        for pid in non_service_principals:
            if pid == '*':
                principal_ok = False
            elif self.everyone_only:
                # Only wildcard grants matter in this mode.
                continue
            elif pid.startswith('arn:aws:iam::cloudfront:user'):
                continue
            elif _account(pid) not in self.allowed_accounts:
                principal_ok = False
        return not principal_ok

    def handle_conditions(self, s):
        """Return True when the statement's conditions whitelist it.

        Each normalized condition is evaluated with ``handle_condition``:
        True means the condition whitelists, False means it is a violation
        and None means no handler exists for the condition key.
        """
        conditions = self.normalize_conditions(s)
        if not conditions:
            return False

        results = []
        has_whitelisted_org = False
        for c in conditions:
            result = self.handle_condition(s, c)

            # Unknown handler - be conservative and reject immediately
            if result is None:
                return False

            # Track if we have a whitelisted org condition
            if result is True and c['key'] in ('aws:principalorgid', 'aws:resourceorgid'):
                has_whitelisted_org = True
            results.append(result)

        # If all conditions whitelist, the statement is whitelisted.
        if all(results):
            return True

        # Some conditions failed; only a whitelisted org id can still save it.
        if not has_whitelisted_org:
            return False

        # Special case: org ID whitelisted + only wildcard principal conditions fail
        principal_conditions = {
            'aws:principalarn', 'aws:principalaccount',
            'aws:sourceaccount', 'aws:sourcearn',
            's3:dataaccesspointaccount'
        }
        for c, whitelisted in zip(conditions, results):
            if whitelisted:
                continue
            if c['key'] not in principal_conditions:
                # Non-principal condition failed, can't be saved by org ID
                return False
            if not all('*' in str(v) for v in c['values']):
                # Not a wildcard, it's a real violation
                return False
        # All failures are wildcard principals, org ID saves it
        return True

    def handle_condition(self, s, c):
        """Evaluate one normalized condition.

        :returns: True when the condition whitelists the statement, False
            when it does not, and None when no ``handle_<key>`` method exists
            for the condition key (a warning is logged in that case).
        """
        if not c['op']:
            return False
        if c['key'] in self.whitelist_conditions:
            return True
        handler_name = "handle_%s" % c['key'].replace('-', '_').replace(':', '_')
        handler = getattr(self, handler_name, None)
        if handler is None:
            log.warning(
                "no handler:%s op:%s key:%s values:%s",
                handler_name, c['op'], c['key'], c['values'])
            return None
        # Handlers return True for a violation, hence the inversion.
        return not handler(s, c)

    def normalize_conditions(self, s):
        """Flatten the statement's ``Condition`` element into a list of dicts.

        Each entry has the keys ``op`` (the operator as written), ``key``
        (the lower-cased condition key) and ``values`` (a single string is
        wrapped in a tuple, anything else is passed through). Operators that
        are neither in ``CONDITION_OPERATORS`` nor start with one of
        ``SET_CONDITION_PREFIXES`` are skipped.
        """
        s_cond = []
        if 'Condition' not in s:
            return s_cond

        for s_cond_op in list(s['Condition'].keys()):
            if (s_cond_op not in self.CONDITION_OPERATORS
                    and not s_cond_op.startswith(self.SET_CONDITION_PREFIXES)):
                continue

            # Loop over all keys under each operator
            for key, value in s['Condition'][s_cond_op].items():
                s_cond.append({
                    'op': s_cond_op,
                    'key': key.lower(),
                    'values': (value,) if isinstance(value, str) else value,
                })

        return s_cond

    # Condition handlers
    #
    # Each handler returns True when the condition is a violation, i.e. when
    # at least one of its values falls outside the relevant allowed set.

    def _outside_allowed(self, c, allowed):
        """Return True if any condition value (reduced via _account) is not in *allowed*."""
        return bool(set(map(_account, c['values'])).difference(allowed))

    # sns default policy
    def handle_aws_sourceowner(self, s, c):
        return self._outside_allowed(c, self.allowed_accounts)

    # AWS Connect default policy on Lex
    def handle_aws_sourceaccount(self, s, c):
        return self._outside_allowed(c, self.allowed_accounts)

    # s3 logging
    def handle_aws_sourcearn(self, s, c):
        return self._outside_allowed(c, self.allowed_accounts)

    def handle_aws_sourceip(self, s, c):
        # Source IP conditions are never reported as violations.
        return False

    def handle_aws_sourcevpce(self, s, c):
        # Without a configured allow list, any vpce condition is accepted.
        if not self.allowed_vpce:
            return False
        return self._outside_allowed(c, self.allowed_vpce)

    def handle_aws_sourcevpc(self, s, c):
        # Without a configured allow list, any vpc condition is accepted.
        if not self.allowed_vpc:
            return False
        return self._outside_allowed(c, self.allowed_vpc)

    def handle_aws_principalorgid(self, s, c):
        # Without a configured allow list, no org id can be trusted.
        if not self.allowed_orgid:
            return True
        return self._outside_allowed(c, self.allowed_orgid)

    def handle_aws_principalarn(self, s, c):
        """Handle the aws:PrincipalArn condition key."""
        return self._outside_allowed(c, self.allowed_accounts)

    def handle_aws_resourceorgid(self, s, c):
        """Handle the aws:resourceOrgID condition key."""
        return self._outside_allowed(c, self.allowed_orgid)

    def handle_aws_principalaccount(self, s, c):
        """Handle the aws:PrincipalAccount condition key."""
        return self._outside_allowed(c, self.allowed_accounts)

    def handle_s3_dataaccesspointaccount(self, s, c):
        """Handle the s3:DataAccessPointAccount condition key."""
        return self._outside_allowed(c, self.allowed_accounts)

309 

310 

class CrossAccountAccessFilter(Filter):
    """Check a resource's embedded iam policy for cross account access.

    The policy document is read from the resource key named by
    ``policy_attribute`` and evaluated by an instance of ``checker_factory``.
    Matching resources are annotated under ``annotation_key`` with the
    violating statements, or under ``allowlist_key`` with the allowed
    statements when ``return_allowed`` is set.
    """

    schema = type_schema(
        'cross-account',
        # only consider policies that grant one of the given actions.
        actions={'type': 'array', 'items': {'type': 'string'}},
        # only consider policies which grant to *
        everyone_only={'type': 'boolean'},
        # match on (and annotate with) the allowed statements instead of
        # the violating ones.
        return_allowed={'type': 'boolean'},
        # disregard statements using these conditions.
        whitelist_conditions={'type': 'array', 'items': {'type': 'string'}},
        # white list accounts
        whitelist_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist={'type': 'array', 'items': {'type': 'string'}},
        whitelist_orgids_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_orgids={'type': 'array', 'items': {'type': 'string'}},
        whitelist_vpce_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_vpce={'type': 'array', 'items': {'type': 'string'}},
        whitelist_vpc_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_vpc={'type': 'array', 'items': {'type': 'string'}})

    policy_attribute = 'Policy'
    annotation_key = 'CrossAccountViolations'
    allowlist_key = 'CrossAccountAllowlists'

    checker_factory = PolicyChecker

    def process(self, resources, event=None):
        """Build the checker from the filter configuration, then filter *resources*."""
        self.everyone_only = self.data.get('everyone_only', False)
        self.return_allowed = self.data.get('return_allowed', False)
        self.conditions = set(self.data.get(
            'whitelist_conditions',
            ("aws:userid", "aws:username")))
        self.actions = self.data.get('actions', ())
        self.accounts = self.get_accounts()
        self.vpcs = self.get_vpcs()
        self.vpces = self.get_vpces()
        self.orgid = self.get_orgids()
        # Work on a copy: if ``checker_config`` resolves to a dict shared at
        # class level, updating it in place would leak this filter's settings
        # into every other instance.
        self.checker_config = dict(getattr(self, 'checker_config', None) or {})
        self.checker_config.update(
            {'allowed_accounts': self.accounts,
             'allowed_vpc': self.vpcs,
             'allowed_vpce': self.vpces,
             'allowed_orgid': self.orgid,
             'check_actions': self.actions,
             'everyone_only': self.everyone_only,
             'whitelist_conditions': self.conditions,
             'return_allowed': self.return_allowed})
        self.checker = self.checker_factory(self.checker_config)
        return super().process(resources, event)

    def _configured_values(self, key):
        """Return the union of the inline list *key* and its ``<key>_from`` source."""
        values = set(self.data.get(key, ()))
        from_key = '%s_from' % key
        if from_key in self.data:
            resolver = ValuesFrom(self.data[from_key], self.manager)
            values = values.union(resolver.get_values())
        return values

    def get_accounts(self):
        """Return the whitelisted account ids, always including the owning account."""
        owner_id = self.manager.config.account_id
        accounts = self._configured_values('whitelist')
        accounts.add(owner_id)
        return accounts

    def get_vpcs(self):
        """Return the values configured via whitelist_vpc / whitelist_vpc_from."""
        return self._configured_values('whitelist_vpc')

    def get_vpces(self):
        """Return the values configured via whitelist_vpce / whitelist_vpce_from."""
        return self._configured_values('whitelist_vpce')

    def get_orgids(self):
        """Return the values configured via whitelist_orgids / whitelist_orgids_from."""
        return self._configured_values('whitelist_orgids')

    def get_resource_policy(self, r):
        """Return the resource's policy document, or None when it has none."""
        return r.get(self.policy_attribute, None)

    def __call__(self, r):
        """Return True (and annotate *r*) when the checker reports statements."""
        p = self.get_resource_policy(r)
        if p is None:
            return False
        results = self.checker.check(p)
        if not results:
            # Previously an implicit None; callers only rely on truthiness.
            return False
        key = self.allowlist_key if self.return_allowed else self.annotation_key
        r[key] = results
        return True