Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/filters/iamaccess.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

212 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4IAM Resource Policy Checker 

5--------------------------- 

6 

7When securing resources with iam policies, we want to parse and evaluate 

8the resource's policy for any cross account or public access grants that 

9are not intended. 

10 

11In general, iam policies can be complex, and where possible using iam 

12simulate is preferable, but requires passing the caller's arn, which 

13is not feasible when we're evaluating who the valid set of callers 

14are. 

15 

16 

17References 

18 

19- IAM Policy Evaluation 

20 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_evaluation-logic.html 

21 

22- IAM Policy Reference 

23 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html 

24 

25- IAM Global Condition Context Keys 

26 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html 

27 

28""" 

29import fnmatch 

30import logging 

31import json 

32 

33from c7n.filters import Filter 

34from c7n.resolver import ValuesFrom 

35from c7n.utils import type_schema 

36 

37log = logging.getLogger('custodian.iamaccess') 

38 

39 

40def _account(arn): 

41 # we could try except but some minor runtime cost, basically flag 

42 # invalids values 

43 if arn.count(":") < 4: 

44 return arn 

45 return arn.split(':', 5)[4] 

46 

47 

class PolicyChecker:
    """Evaluate IAM resource-policy statements for unintended access grants.

    checker_config:
      - check_actions: only check one of the specified actions
      - everyone_only: only check for wildcard permission grants
      - return_allowed: if true, return the statements that are allowed
      - allowed_accounts: permission grants to these accounts are okay
      - allowed_vpce: values accepted for the aws:SourceVpce condition key
      - allowed_vpc: values accepted for the aws:SourceVpc condition key
      - allowed_orgid: values accepted for the organization-id condition keys
      - whitelist_conditions: a list of conditions that are considered
        sufficient enough to whitelist the statement.
    """

    def __init__(self, checker_config):
        self.checker_config = checker_config

    # Config properties
    @property
    def return_allowed(self):
        return self.checker_config.get('return_allowed', False)

    @property
    def allowed_accounts(self):
        return self.checker_config.get('allowed_accounts', ())

    @property
    def everyone_only(self):
        return self.checker_config.get('everyone_only', False)

    @property
    def check_actions(self):
        return self.checker_config.get('check_actions', ())

    @property
    def whitelist_conditions(self):
        # Lower-cased because normalize_conditions lower-cases condition keys.
        return {v.lower() for v in self.checker_config.get('whitelist_conditions', ())}

    @property
    def allowed_vpce(self):
        return self.checker_config.get('allowed_vpce', ())

    @property
    def allowed_vpc(self):
        return self.checker_config.get('allowed_vpc', ())

    @property
    def allowed_orgid(self):
        return self.checker_config.get('allowed_orgid', ())

    # Policy statement handling
    def check(self, policy_text):
        """Classify the statements of a policy.

        :param policy_text: the policy as a JSON string or an already
            parsed mapping.
        :returns: the list of violating statements, or the list of
            non-violating statements when ``return_allowed`` is set.
        """
        if isinstance(policy_text, str):
            policy = json.loads(policy_text)
        else:
            policy = policy_text

        statements = policy.get('Statement', ())
        # The policy grammar allows a single statement object in place of
        # a list; iterating the dict directly would walk its keys.
        if isinstance(statements, dict):
            statements = [statements]

        allowlist_statements, violations = [], []
        for s in statements:
            if self.handle_statement(s):
                violations.append(s)
            else:
                allowlist_statements.append(s)
        return allowlist_statements if self.return_allowed else violations

    def handle_statement(self, s):
        """Return the statement if it is a violation, otherwise None.

        A statement is a violation when its principal, effect and action
        checks all match and its conditions do not whitelist it.
        """
        if (all((self.handle_principal(s),
                 self.handle_effect(s),
                 self.handle_action(s))) and not self.handle_conditions(s)):
            return s

    def handle_action(self, s):
        """Return True when the statement is relevant to ``check_actions``.

        With no ``check_actions`` configured every statement is relevant.
        Otherwise the statement's ``Action`` entries are used as patterns
        against the configured actions. A statement without ``Action`` is
        evaluated through its ``NotAction`` element instead.
        """
        check_actions = self.check_actions
        if not check_actions:
            return True

        actions = s.get('Action')
        if actions is None:
            return self._not_action_grants(s.get('NotAction'), check_actions)
        if isinstance(actions, str):
            actions = (actions,)
        return any(fnmatch.filter(check_actions, a) for a in actions)

    def _not_action_grants(self, not_actions, check_actions):
        """Return True if a checked action escapes every NotAction pattern."""
        if not not_actions:
            # Neither Action nor NotAction narrows the statement, so it
            # is conservatively treated as relevant.
            return True
        if isinstance(not_actions, str):
            not_actions = (not_actions,)
        excluded = set()
        for pattern in not_actions:
            excluded.update(fnmatch.filter(check_actions, pattern))
        return bool(set(check_actions) - excluded)

    def handle_effect(self, s):
        """Return True only for ``Allow`` statements."""
        return s['Effect'] == 'Allow'

    def handle_principal(self, s):
        """Return True when the statement's principals need further checks.

        Statements using ``NotPrincipal`` or lacking ``Principal`` always
        match. Statements naming only service principals never match.
        Otherwise the statement matches if any principal is ``*`` or,
        unless ``everyone_only`` is set, belongs to an account outside
        ``allowed_accounts``.
        """
        if 'NotPrincipal' in s:
            return True

        principals = s.get('Principal')
        if not principals:
            return True
        if not isinstance(principals, dict):
            principals = {'AWS': principals}

        # Ignore service principals, merge the rest into a single set
        non_service_principals = set()
        for principal_type in set(principals) - {'Service'}:
            p = principals[principal_type]
            non_service_principals.update({p} if isinstance(p, str) else p)

        if not non_service_principals:
            return False

        principal_ok = True
        for pid in non_service_principals:
            if pid == '*':
                principal_ok = False
            elif self.everyone_only:
                continue
            elif pid.startswith('arn:aws:iam::cloudfront:user'):
                continue
            else:
                account_id = _account(pid)
                if account_id not in self.allowed_accounts:
                    principal_ok = False
        return not principal_ok

    def handle_conditions(self, s):
        """Return True when every recognised condition whitelists the statement.

        A statement with no recognised conditions is never whitelisted.
        """
        conditions = self.normalize_conditions(s)
        if not conditions:
            return False
        return all([self.handle_condition(s, c) for c in conditions])

    def handle_condition(self, s, c):
        """Return True when a single normalized condition is acceptable.

        Keys listed in ``whitelist_conditions`` are accepted outright.
        Other keys are dispatched to a ``handle_<key>`` method whose
        truthy result marks the condition as a violation. Keys without a
        handler are logged and treated as not acceptable.
        """
        if not c['op']:
            return False
        if c['key'] in self.whitelist_conditions:
            return True
        handler_name = "handle_%s" % c['key'].replace('-', '_').replace(':', '_')
        handler = getattr(self, handler_name, None)
        if handler is None:
            log.warning(
                "no handler:%s op:%s key:%s values:%s",
                handler_name, c['op'], c['key'], c['values'])
            return False
        return not handler(s, c)

    def normalize_conditions(self, s):
        """Flatten a statement's ``Condition`` element.

        :returns: a list of ``{'op', 'key', 'values'}`` dicts, one per
            key under each recognised operator. Keys are lower-cased and
            a lone string value is wrapped in a tuple.
        """
        s_cond = []
        if 'Condition' not in s:
            return s_cond

        conditions = (
            'StringEquals',
            'StringEqualsIgnoreCase',
            'StringLike',
            'ArnEquals',
            'ArnLike',
            'IpAddress',
            'NotIpAddress')
        # IAM's set-operator prefixes are ForAllValues and ForAnyValue.
        # 'ForAnyValue' is also a prefix of the previously accepted
        # 'ForAnyValues' spelling, so nothing that matched before is lost.
        set_conditions = ('ForAllValues', 'ForAnyValue')

        for s_cond_op, keyed_values in s['Condition'].items():
            if s_cond_op not in conditions and not s_cond_op.startswith(set_conditions):
                continue

            # Loop over all keys under each operator
            for key, value in keyed_values.items():
                s_cond.append({
                    'op': s_cond_op,
                    'key': key.lower(),
                    'values': (value,) if isinstance(value, str) else value,
                })

        return s_cond

    # Condition handlers
    #
    # Each handler returns True when the condition's values include
    # something outside the relevant allowed set (i.e. a violation).

    def _has_disallowed_values(self, c, allowed):
        """Return True if any condition value falls outside *allowed*.

        Values are passed through ``_account`` first, which maps ARNs to
        their account field and leaves other values unchanged.
        """
        return bool(set(map(_account, c['values'])).difference(allowed))

    # sns default policy
    def handle_aws_sourceowner(self, s, c):
        return self._has_disallowed_values(c, self.allowed_accounts)

    # AWS Connect default policy on Lex
    def handle_aws_sourceaccount(self, s, c):
        return self._has_disallowed_values(c, self.allowed_accounts)

    # s3 logging
    def handle_aws_sourcearn(self, s, c):
        return self._has_disallowed_values(c, self.allowed_accounts)

    def handle_aws_sourceip(self, s, c):
        # Source-ip conditions are never reported as a violation.
        return False

    def handle_aws_sourcevpce(self, s, c):
        # With no allowed endpoints configured, any value is accepted.
        if not self.allowed_vpce:
            return False
        return self._has_disallowed_values(c, self.allowed_vpce)

    def handle_aws_sourcevpc(self, s, c):
        # With no allowed vpcs configured, any value is accepted.
        if not self.allowed_vpc:
            return False
        return self._has_disallowed_values(c, self.allowed_vpc)

    def handle_aws_principalorgid(self, s, c):
        # With no allowed org ids configured, every value is a violation.
        if not self.allowed_orgid:
            return True
        return self._has_disallowed_values(c, self.allowed_orgid)

    def handle_aws_principalarn(self, s, c):
        """Handle the aws:PrincipalArn condition key."""
        return self._has_disallowed_values(c, self.allowed_accounts)

    def handle_aws_resourceorgid(self, s, c):
        """Handle the aws:resourceOrgID condition key."""
        return self._has_disallowed_values(c, self.allowed_orgid)

    def handle_aws_principalaccount(self, s, c):
        """Handle the aws:PrincipalAccount condition key."""
        return self._has_disallowed_values(c, self.allowed_accounts)

    def handle_s3_dataaccesspointaccount(self, s, c):
        """Handle the s3:DataAccessPointAccount condition key."""
        return self._has_disallowed_values(c, self.allowed_accounts)

267 

268 

class CrossAccountAccessFilter(Filter):
    """Check a resource's embedded iam policy for cross account access.

    The resource's policy document is read from ``policy_attribute`` and
    evaluated by an instance of ``checker_factory``. Matching resources
    are annotated with the resulting statements under ``annotation_key``
    (violations) or ``allowlist_key`` (when ``return_allowed`` is set).
    """

    schema = type_schema(
        'cross-account',
        # only consider policies that grant one of the given actions.
        actions={'type': 'array', 'items': {'type': 'string'}},
        # only consider policies which grant to *
        everyone_only={'type': 'boolean'},
        # match on (and annotate) the allowed statements instead of
        # the violating ones.
        return_allowed={'type': 'boolean'},
        # disregard statements using these conditions.
        whitelist_conditions={'type': 'array', 'items': {'type': 'string'}},
        # white list accounts
        whitelist_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist={'type': 'array', 'items': {'type': 'string'}},
        whitelist_orgids_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_orgids={'type': 'array', 'items': {'type': 'string'}},
        whitelist_vpce_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_vpce={'type': 'array', 'items': {'type': 'string'}},
        whitelist_vpc_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_vpc={'type': 'array', 'items': {'type': 'string'}})

    policy_attribute = 'Policy'
    annotation_key = 'CrossAccountViolations'
    allowlist_key = 'CrossAccountAllowlists'

    checker_factory = PolicyChecker

    def process(self, resources, event=None):
        """Build the policy checker from the filter data, then filter."""
        self.everyone_only = self.data.get('everyone_only', False)
        self.return_allowed = self.data.get('return_allowed', False)
        self.conditions = set(self.data.get(
            'whitelist_conditions',
            ("aws:userid", "aws:username")))
        self.actions = self.data.get('actions', ())
        self.accounts = self.get_accounts()
        self.vpcs = self.get_vpcs()
        self.vpces = self.get_vpces()
        self.orgid = self.get_orgids()
        # Copy before updating: the attribute may resolve to a dict
        # defined on the class, which must not be mutated in place and
        # thereby shared between filter instances.
        self.checker_config = dict(getattr(self, 'checker_config', None) or {})
        self.checker_config.update(
            {'allowed_accounts': self.accounts,
             'allowed_vpc': self.vpcs,
             'allowed_vpce': self.vpces,
             'allowed_orgid': self.orgid,
             'check_actions': self.actions,
             'everyone_only': self.everyone_only,
             'whitelist_conditions': self.conditions,
             'return_allowed': self.return_allowed})
        self.checker = self.checker_factory(self.checker_config)
        return super().process(resources, event)

    def _get_whitelist(self, key):
        """Return the set of values from ``key`` and its ``<key>_from`` source."""
        values = set(self.data.get(key, ()))
        from_key = key + '_from'
        if from_key in self.data:
            resolver = ValuesFrom(self.data[from_key], self.manager)
            values = values.union(resolver.get_values())
        return values

    def get_accounts(self):
        """Return the allowed account ids, always including the owning account."""
        owner_id = self.manager.config.account_id
        accounts = self._get_whitelist('whitelist')
        accounts.add(owner_id)
        return accounts

    def get_vpcs(self):
        """Return the allowed vpc ids."""
        return self._get_whitelist('whitelist_vpc')

    def get_vpces(self):
        """Return the allowed vpc endpoint ids."""
        return self._get_whitelist('whitelist_vpce')

    def get_orgids(self):
        """Return the allowed organization ids."""
        return self._get_whitelist('whitelist_orgids')

    def get_resource_policy(self, r):
        """Return the resource's policy document, or None if it has none."""
        return r.get(self.policy_attribute, None)

    def __call__(self, r):
        """Return True (and annotate ``r``) when the checker reports statements."""
        p = self.get_resource_policy(r)
        if p is None:
            return False
        results = self.checker.check(p)
        if not results:
            return False
        # The checker already returns either allowed statements or
        # violations depending on return_allowed; only the key differs.
        key = self.allowlist_key if self.return_allowed else self.annotation_key
        r[key] = results
        return True