Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/filters/iamaccess.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

205 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4IAM Resource Policy Checker 

5--------------------------- 

6 

7When securing resources with iam policies, we want to parse and evaluate 

8the resource's policy for any cross account or public access grants that 

9are not intended. 

10 

11In general, iam policies can be complex, and where possible using iam 

12simulate is preferable, but requires passing the caller's arn, which

13is not feasible when we're evaluating who the valid set of callers 

14are. 

15 

16 

17References 

18 

19- IAM Policy Evaluation 

20 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_evaluation-logic.html 

21 

22- IAM Policy Reference 

23 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html 

24 

25- IAM Global Condition Context Keys 

26 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html 

27 

28""" 

29import fnmatch 

30import logging 

31import json 

32 

33from c7n.filters import Filter 

34from c7n.resolver import ValuesFrom 

35from c7n.utils import type_schema 

36 

# Module-level logger; used below to warn about condition keys that have
# no matching handler.
log = logging.getLogger('custodian.iamaccess')

38 

39 

40def _account(arn): 

41 # we could try except but some minor runtime cost, basically flag 

42 # invalids values 

43 if arn.count(":") < 4: 

44 return arn 

45 return arn.split(':', 5)[4] 

46 

47 

class PolicyChecker:
    """Evaluate the statements of an IAM resource policy for access grants.

    A statement is treated as a violation when its principal check, effect
    check and action check all pass and its conditions are not all
    considered sufficient (see ``handle_statement``).

    checker_config:
      - check_actions: only check one of the specified actions
      - everyone_only: only check for wildcard permission grants
      - return_allowed: if true, return the statements that are allowed
      - allowed_accounts: permission grants to these accounts are okay
      - allowed_vpce: values accepted for an ``aws:sourcevpce`` condition
      - allowed_vpc: values accepted for an ``aws:sourcevpc`` condition
      - allowed_orgid: values accepted for an ``aws:principalorgid`` condition
      - whitelist_conditions: a list of conditions that are considered
        sufficient enough to whitelist the statement.
    """

    # Condition operators that are evaluated as-is.
    _CONDITION_OPERATORS = (
        'StringEquals',
        'StringEqualsIgnoreCase',
        'StringLike',
        'ArnEquals',
        'ArnLike',
        'IpAddress',
        'NotIpAddress')
    # Set-operator qualifiers that may prefix an operator, e.g.
    # ``ForAnyValue:StringLike``.  The IAM spelling is ``ForAnyValue``
    # (singular); the prefix match still accepts the previously used
    # ``ForAnyValues`` spelling.
    _SET_QUALIFIERS = ('ForAllValues', 'ForAnyValue')

    def __init__(self, checker_config):
        """Store the configuration mapping read by the properties below."""
        self.checker_config = checker_config

    # Config properties
    @property
    def return_allowed(self):
        return self.checker_config.get('return_allowed', False)

    @property
    def allowed_accounts(self):
        return self.checker_config.get('allowed_accounts', ())

    @property
    def everyone_only(self):
        return self.checker_config.get('everyone_only', False)

    @property
    def check_actions(self):
        return self.checker_config.get('check_actions', ())

    @property
    def whitelist_conditions(self):
        # Lower-cased because normalize_conditions lower-cases condition keys.
        return {v.lower() for v in self.checker_config.get('whitelist_conditions', ())}

    @property
    def allowed_vpce(self):
        return self.checker_config.get('allowed_vpce', ())

    @property
    def allowed_vpc(self):
        return self.checker_config.get('allowed_vpc', ())

    @property
    def allowed_orgid(self):
        return self.checker_config.get('allowed_orgid', ())

    # Internal helpers
    @staticmethod
    def _as_sequence(value):
        """Wrap a lone string in a tuple; return anything else unchanged."""
        return (value,) if isinstance(value, str) else value

    @staticmethod
    def _outside_allowed(c, allowed):
        """Return True if any condition value maps outside ``allowed``.

        Each value is passed through ``_account`` first, so ARNs are
        compared by their account id and plain ids are compared as given.
        """
        return bool(set(map(_account, c['values'])).difference(allowed))

    # Policy statement handling
    def check(self, policy_text):
        """Classify the statements of a policy.

        :param policy_text: the policy as a JSON string or an already
            parsed mapping.
        :returns: the list of allowed statements when ``return_allowed`` is
            set, otherwise the list of violating statements.
        """
        if isinstance(policy_text, str):
            policy = json.loads(policy_text)
        else:
            policy = policy_text

        statements = policy.get('Statement', ())
        # A policy may carry a single statement object instead of a list;
        # iterating the dict directly would walk its keys.
        if isinstance(statements, dict):
            statements = (statements,)

        allowlist_statements, violations = [], []
        for s in statements:
            if self.handle_statement(s):
                violations.append(s)
            else:
                allowlist_statements.append(s)
        return allowlist_statements if self.return_allowed else violations

    def handle_statement(self, s):
        """Return the statement if it is a violation, otherwise ``None``."""
        flagged = (
            self.handle_principal(s)
            and self.handle_effect(s)
            and self.handle_action(s))
        if flagged and not self.handle_conditions(s):
            return s
        return None

    def handle_action(self, s):
        """Return True if the statement covers one of ``check_actions``.

        With no ``check_actions`` configured every statement matches.  The
        statement's action entries are used as fnmatch patterns against the
        configured action names.  For a ``NotAction`` statement, a
        configured action counts as covered unless one of the
        ``NotAction`` patterns excludes it.
        """
        wanted = self.check_actions
        if not wanted:
            return True

        actions = self._as_sequence(s.get('Action'))
        if actions is not None:
            return any(fnmatch.filter(wanted, a) for a in actions)

        not_actions = self._as_sequence(s.get('NotAction'))
        if not_actions is None:
            # Neither Action nor NotAction: nothing to match against.
            return False
        excluded = [
            name
            for pattern in not_actions
            for name in fnmatch.filter(wanted, pattern)]
        return any(name not in excluded for name in wanted)

    def handle_effect(self, s):
        """Return True only for ``Allow`` statements."""
        return s.get('Effect') == 'Allow'

    def handle_principal(self, s):
        """Return True if the statement's principal needs further checks.

        Statements using ``NotPrincipal`` or lacking a principal are always
        flagged.  Service principals are ignored; every other principal is
        flagged when it is ``*`` or, unless ``everyone_only`` is set, when
        its account is not in ``allowed_accounts``.
        """
        if 'NotPrincipal' in s:
            return True

        principals = s.get('Principal')
        if not principals:
            return True
        if not isinstance(principals, dict):
            principals = {'AWS': principals}

        # Ignore service principals, merge the rest into a single set
        non_service_principals = set()
        for principal_type, p in principals.items():
            if principal_type == 'Service':
                continue
            non_service_principals.update({p} if isinstance(p, str) else p)

        if not non_service_principals:
            return False

        for pid in non_service_principals:
            if pid == '*':
                return True
            if self.everyone_only:
                continue
            if pid.startswith('arn:aws:iam::cloudfront:user'):
                continue
            if _account(pid) not in self.allowed_accounts:
                return True
        return False

    def handle_conditions(self, s):
        """Return True if every recognised condition is sufficient.

        A statement without recognised conditions returns False.
        """
        conditions = self.normalize_conditions(s)
        if not conditions:
            return False
        # Built as a list so each condition is evaluated (and unhandled
        # keys are logged) even after one has already failed.
        return all([self.handle_condition(s, c) for c in conditions])

    def handle_condition(self, s, c):
        """Return True if a single normalised condition is sufficient.

        Whitelisted keys are accepted outright.  Otherwise the key is
        mapped to a ``handle_<key>`` method (``-`` and ``:`` replaced by
        ``_``); that method returns True for a violation, so its result is
        negated.  Keys with no handler are logged and treated as not
        sufficient.
        """
        if not c['op']:
            return False
        key = c['key']
        if key in self.whitelist_conditions:
            return True
        handler_name = "handle_%s" % key.replace('-', '_').replace(':', '_')
        # Only dispatch on namespaced keys ("aws:sourceip").  A bare key
        # taken from the policy, such as "condition" or "action", would
        # otherwise resolve to one of the statement-handling methods above.
        handler = getattr(self, handler_name, None) if ':' in key else None
        if handler is None:
            log.warning(
                "no handler:%s op:%s key:%s values:%s",
                handler_name, c['op'], c['key'], c['values'])
            return False
        return not handler(s, c)

    def normalize_conditions(self, s):
        """Return the statement's conditions as ``op``/``key``/``values`` dicts.

        Only supported operators (optionally prefixed by a set qualifier)
        are kept.  Only the first key of each operator block is read; the
        key is lower-cased and a lone string value is wrapped in a tuple.
        """
        normalized = []
        condition_block = s.get('Condition')
        if not isinstance(condition_block, dict):
            return normalized

        for op, operands in condition_block.items():
            supported = (
                op in self._CONDITION_OPERATORS
                or op.startswith(self._SET_QUALIFIERS))
            if not supported:
                continue
            # An empty or malformed operator block has no key to read.
            if not isinstance(operands, dict) or not operands:
                continue
            key = next(iter(operands))
            normalized.append({
                'op': op,
                'key': key.lower(),
                'values': self._as_sequence(operands[key]),
            })

        return normalized

    # Condition handlers: each returns True when the condition does NOT
    # sufficiently restrict the statement.

    # sns default policy
    def handle_aws_sourceowner(self, s, c):
        return self._outside_allowed(c, self.allowed_accounts)

    # AWS Connect default policy on Lex
    def handle_aws_sourceaccount(self, s, c):
        return self._outside_allowed(c, self.allowed_accounts)

    # s3 logging
    def handle_aws_sourcearn(self, s, c):
        return self._outside_allowed(c, self.allowed_accounts)

    def handle_aws_sourceip(self, s, c):
        return False

    def handle_aws_sourcevpce(self, s, c):
        # With no configured list, any vpce condition is accepted.
        if not self.allowed_vpce:
            return False
        return self._outside_allowed(c, self.allowed_vpce)

    def handle_aws_sourcevpc(self, s, c):
        # With no configured list, any vpc condition is accepted.
        if not self.allowed_vpc:
            return False
        return self._outside_allowed(c, self.allowed_vpc)

    def handle_aws_principalorgid(self, s, c):
        # With no configured list, an org id condition is never sufficient.
        if not self.allowed_orgid:
            return True
        return self._outside_allowed(c, self.allowed_orgid)

250 

251 

class CrossAccountAccessFilter(Filter):
    """Check a resource's embedded iam policy for cross account access.

    The policy is read from ``policy_attribute`` on each resource and handed
    to a checker built by ``checker_factory``.  Matching resources are
    annotated under ``annotation_key`` (violating statements) or, when
    ``return_allowed`` is set, under ``allowlist_key`` (allowed statements).
    """

    schema = type_schema(
        'cross-account',
        # only consider policies that grant one of the given actions.
        actions={'type': 'array', 'items': {'type': 'string'}},
        # only consider policies which grant to *
        everyone_only={'type': 'boolean'},
        # match on and annotate the allowed statements instead of the
        # violating ones
        return_allowed={'type': 'boolean'},
        # disregard statements using these conditions.
        whitelist_conditions={'type': 'array', 'items': {'type': 'string'}},
        # white list accounts
        whitelist_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist={'type': 'array', 'items': {'type': 'string'}},
        # white list organization ids
        whitelist_orgids_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_orgids={'type': 'array', 'items': {'type': 'string'}},
        # white list vpc endpoints
        whitelist_vpce_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_vpce={'type': 'array', 'items': {'type': 'string'}},
        # white list vpcs
        whitelist_vpc_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_vpc={'type': 'array', 'items': {'type': 'string'}})

    policy_attribute = 'Policy'
    annotation_key = 'CrossAccountViolations'
    allowlist_key = 'CrossAccountAllowlists'

    checker_factory = PolicyChecker

    def process(self, resources, event=None):
        """Resolve the filter settings, build the checker, then filter.

        The resolved settings are stored on the instance and merged over any
        pre-existing ``checker_config`` before the checker is created.
        """
        self.everyone_only = self.data.get('everyone_only', False)
        self.return_allowed = self.data.get('return_allowed', False)
        self.conditions = set(self.data.get(
            'whitelist_conditions',
            ("aws:userid", "aws:username")))
        self.actions = self.data.get('actions', ())
        self.accounts = self.get_accounts()
        self.vpcs = self.get_vpcs()
        self.vpces = self.get_vpces()
        self.orgid = self.get_orgids()

        # Work on a copy: if ``checker_config`` is defined at class level,
        # updating it in place would leak this run's settings into every
        # other instance sharing that dict.
        checker_config = dict(getattr(self, 'checker_config', None) or {})
        checker_config.update({
            'allowed_accounts': self.accounts,
            'allowed_vpc': self.vpcs,
            'allowed_vpce': self.vpces,
            'allowed_orgid': self.orgid,
            'check_actions': self.actions,
            'everyone_only': self.everyone_only,
            'whitelist_conditions': self.conditions,
            'return_allowed': self.return_allowed,
        })
        self.checker_config = checker_config
        self.checker = self.checker_factory(self.checker_config)
        return super().process(resources, event)

    def _configured_values(self, key):
        """Return the set of values under ``key`` plus those from ``<key>_from``."""
        values = set(self.data.get(key, ()))
        from_key = key + '_from'
        if from_key in self.data:
            resolver = ValuesFrom(self.data[from_key], self.manager)
            values = values.union(resolver.get_values())
        return values

    def get_accounts(self):
        """Return the allowed account ids, always including the owner account."""
        accounts = self._configured_values('whitelist')
        accounts.add(self.manager.config.account_id)
        return accounts

    def get_vpcs(self):
        """Return the allowed vpc ids."""
        return self._configured_values('whitelist_vpc')

    def get_vpces(self):
        """Return the allowed vpc endpoint ids."""
        return self._configured_values('whitelist_vpce')

    def get_orgids(self):
        """Return the allowed organization ids."""
        return self._configured_values('whitelist_orgids')

    def get_resource_policy(self, r):
        """Return the resource's policy, or ``None`` if it has none."""
        return r.get(self.policy_attribute, None)

    def __call__(self, r):
        """Return True and annotate the resource if the checker reports statements."""
        p = self.get_resource_policy(r)
        if p is None:
            return False
        results = self.checker.check(p)
        if not results:
            return False
        key = self.allowlist_key if self.return_allowed else self.annotation_key
        r[key] = results
        return True

347 

348 if not self.return_allowed and results: 

349 r[self.annotation_key] = results 

350 return True