Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/filters/iamaccess.py: 40%

195 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4IAM Resource Policy Checker 

5--------------------------- 

6 

7When securing resources with iam policies, we want to parse and evaluate 

8the resource's policy for any cross account or public access grants that 

9are not intended. 

10 

11In general, iam policies can be complex, and where possible using iam 

12simulate is preferable, but requires passing the caller's arn, which 

13is not feasible when we're evaluating who the valid set of callers 

14are. 

15 

16 

17References 

18 

19- IAM Policy Evaluation 

20 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_evaluation-logic.html 

21 

22- IAM Policy Reference 

23 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html 

24 

25- IAM Global Condition Context Keys 

26 https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html 

27 

28""" 

29import fnmatch 

30import logging 

31import json 

32 

33from c7n.filters import Filter 

34from c7n.resolver import ValuesFrom 

35from c7n.utils import type_schema 

36 

37log = logging.getLogger('custodian.iamaccess') 

38 

39 

40def _account(arn): 

41 # we could try except but some minor runtime cost, basically flag 

42 # invalids values 

43 if arn.count(":") < 4: 

44 return arn 

45 return arn.split(':', 5)[4] 

46 

47 

class PolicyChecker:
    """Evaluate IAM resource policy statements for unintended access grants.

    checker_config:
      - check_actions: only check one of the specified actions
      - everyone_only: only check for wildcard permission grants
      - allowed_accounts: permission grants to these accounts are okay
      - allowed_vpc: values accepted for ``aws:SourceVpc`` conditions
      - allowed_vpce: values accepted for ``aws:SourceVpce`` conditions
      - allowed_orgid: values accepted for ``aws:PrincipalOrgID`` conditions
      - whitelist_conditions: a list of conditions that are considered
        sufficient enough to whitelist the statement.
    """

    def __init__(self, checker_config):
        # The mapping is kept by reference and read lazily through the
        # properties below, so later changes to it are visible here.
        self.checker_config = checker_config

    # Config properties
    @property
    def allowed_accounts(self):
        return self.checker_config.get('allowed_accounts', ())

    @property
    def everyone_only(self):
        return self.checker_config.get('everyone_only', False)

    @property
    def check_actions(self):
        return self.checker_config.get('check_actions', ())

    @property
    def whitelist_conditions(self):
        # Condition keys are compared lower-cased (see normalize_conditions).
        return {v.lower() for v in self.checker_config.get('whitelist_conditions', ())}

    @property
    def allowed_vpce(self):
        return self.checker_config.get('allowed_vpce', ())

    @property
    def allowed_vpc(self):
        return self.checker_config.get('allowed_vpc', ())

    @property
    def allowed_orgid(self):
        return self.checker_config.get('allowed_orgid', ())

    # Policy statement handling
    def check(self, policy_text):
        """Return the list of statements in a policy that are violations.

        ``policy_text`` is either a JSON string or an already parsed policy
        mapping. A statement is reported when ``handle_statement`` returns a
        truthy value for it.
        """
        if isinstance(policy_text, str):
            policy = json.loads(policy_text)
        else:
            policy = policy_text

        statements = policy.get('Statement', ())
        # A policy may carry a single statement object instead of a list;
        # iterating the dict directly would walk its keys.
        if isinstance(statements, dict):
            statements = (statements,)

        return [s for s in statements if self.handle_statement(s)]

    def handle_statement(self, s):
        """Return the statement when it is a violation, otherwise ``None``.

        A violation needs the principal, effect and action checks to all
        match while the statement's conditions do not sufficiently restrict
        it. The three checks are evaluated eagerly, before ``all`` runs.
        """
        matched = all((
            self.handle_principal(s),
            self.handle_effect(s),
            self.handle_action(s)))
        if matched and not self.handle_conditions(s):
            return s
        return None

    def handle_action(self, s):
        """Return True when the statement covers one of ``check_actions``.

        With no ``check_actions`` configured every statement matches. The
        statement's action strings are used as glob patterns against the
        configured action names.
        """
        wanted_actions = self.check_actions
        if not wanted_actions:
            return True

        if 'Action' not in s and 'NotAction' in s:
            # NotAction covers everything except the listed patterns, so the
            # statement matches when some checked action is not excluded.
            excluded = s['NotAction']
            if isinstance(excluded, str):
                excluded = (excluded,)
            return any(
                not any(fnmatch.filter((wanted,), pattern) for pattern in excluded)
                for wanted in wanted_actions)

        # A missing Action is treated as "no actions" instead of iterating None.
        actions = s.get('Action') or ()
        if isinstance(actions, str):
            actions = (actions,)
        return any(fnmatch.filter(wanted_actions, a) for a in actions)

    def handle_effect(self, s):
        """Return True only for ``Allow`` statements."""
        return s['Effect'] == 'Allow'

    def handle_principal(self, s):
        """Return True when the statement's principal needs further checks.

        True is returned for ``NotPrincipal`` statements, for statements
        without a principal, for the ``*`` principal and, unless
        ``everyone_only`` is set, for principals whose account is not in
        ``allowed_accounts``. Statements naming only service principals
        return False.
        """
        if 'NotPrincipal' in s:
            return True

        principals = s.get('Principal')
        if not principals:
            return True
        if not isinstance(principals, dict):
            principals = {'AWS': principals}

        # Ignore service principals, merge the rest into a single set
        non_service_principals = set()
        for principal_type in set(principals) - {'Service'}:
            p = principals[principal_type]
            non_service_principals.update({p} if isinstance(p, str) else p)

        if not non_service_principals:
            return False

        principal_ok = True
        for pid in non_service_principals:
            if pid == '*':
                principal_ok = False
            elif self.everyone_only:
                continue
            elif pid.startswith('arn:aws:iam::cloudfront:user'):
                # CloudFront user ARNs are never treated as cross account.
                continue
            else:
                account_id = _account(pid)
                if account_id not in self.allowed_accounts:
                    principal_ok = False
        return not principal_ok

    def handle_conditions(self, s):
        """Return True when every recognized condition restricts the statement.

        Statements without any recognized condition return False.
        """
        conditions = self.normalize_conditions(s)
        if not conditions:
            return False

        # Evaluate every condition (no short circuit) so that each condition
        # without a handler is logged.
        results = [self.handle_condition(s, c) for c in conditions]
        return all(results)

    def handle_condition(self, s, c):
        """Return True when a single normalized condition is acceptable.

        Whitelisted keys are accepted outright. Otherwise the key is mapped
        to a ``handle_<key>`` method (``-`` and ``:`` become ``_``); handlers
        return True for a violation, so their result is negated. Keys without
        a handler are logged and treated as not acceptable.
        """
        if not c['op']:
            return False
        if c['key'] in self.whitelist_conditions:
            return True
        handler_name = "handle_%s" % c['key'].replace('-', '_').replace(':', '_')
        handler = getattr(self, handler_name, None)
        if handler is None:
            log.warning(
                "no handler:%s op:%s key:%s values:%s",
                handler_name, c['op'], c['key'], c['values'])
            return False
        return not handler(s, c)

    def normalize_conditions(self, s):
        """Flatten a statement's ``Condition`` block into a list of dicts.

        Each entry has ``op`` (the operator as written), ``key`` (the
        lower-cased condition key) and ``values`` (always a sequence).
        Operators outside the supported set are skipped, unless they carry a
        ``ForAllValues`` / ``ForAnyValue`` set qualifier.
        """
        s_cond = []
        condition_block = s.get('Condition') or {}
        if not condition_block:
            return s_cond

        conditions = (
            'StringEquals',
            'StringEqualsIgnoreCase',
            'StringLike',
            'ArnEquals',
            'ArnLike',
            'IpAddress',
            'NotIpAddress')
        # IAM spells the qualifier "ForAnyValue" (singular); this prefix also
        # still matches the previously listed "ForAnyValues" spelling.
        set_conditions = ('ForAllValues', 'ForAnyValue')

        for s_cond_op in list(condition_block.keys()):
            if s_cond_op not in conditions:
                if not any(s_cond_op.startswith(c) for c in set_conditions):
                    continue

            operands = condition_block[s_cond_op]
            if not operands:
                # Nothing to evaluate under this operator.
                continue

            # NOTE(review): only the first key under each operator is
            # examined; additional keys are ignored. Confirm this is intended.
            key = list(operands.keys())[0]
            values = operands[key]
            if isinstance(values, str):
                values = (values,)
            s_cond.append({'op': s_cond_op, 'key': key.lower(), 'values': values})

        return s_cond

    # Condition handlers
    #
    # Each handler returns True when the condition still permits access
    # outside of what the checker is configured to allow.

    def _has_disallowed(self, c, allowed):
        """Return True when any condition value falls outside ``allowed``.

        Values are reduced with ``_account`` first, so ARNs are compared by
        account id while non-ARN values are compared as-is.
        """
        return bool(set(map(_account, c['values'])).difference(allowed))

    # sns default policy
    def handle_aws_sourceowner(self, s, c):
        return self._has_disallowed(c, self.allowed_accounts)

    # AWS Connect default policy on Lex
    def handle_aws_sourceaccount(self, s, c):
        return self._has_disallowed(c, self.allowed_accounts)

    # s3 logging
    def handle_aws_sourcearn(self, s, c):
        return self._has_disallowed(c, self.allowed_accounts)

    def handle_aws_sourceip(self, s, c):
        # Source IP conditions are always accepted.
        return False

    def handle_aws_sourcevpce(self, s, c):
        # Without a configured allow list any vpc endpoint is accepted.
        if not self.allowed_vpce:
            return False
        return self._has_disallowed(c, self.allowed_vpce)

    def handle_aws_sourcevpc(self, s, c):
        # Without a configured allow list any vpc is accepted.
        if not self.allowed_vpc:
            return False
        return self._has_disallowed(c, self.allowed_vpc)

    def handle_aws_principalorgid(self, s, c):
        # Without a configured allow list no organization is accepted.
        if not self.allowed_orgid:
            return True
        return self._has_disallowed(c, self.allowed_orgid)

242 

243 

class CrossAccountAccessFilter(Filter):
    """Check a resource's embedded iam policy for cross account access.

    The policy document is read from the resource attribute named by
    ``policy_attribute`` and evaluated with ``checker_factory``. Statements
    reported by the checker are stored on the resource under
    ``annotation_key``.
    """

    schema = type_schema(
        'cross-account',
        # only consider policies that grant one of the given actions.
        actions={'type': 'array', 'items': {'type': 'string'}},
        # only consider policies which grant to *
        everyone_only={'type': 'boolean'},
        # disregard statements using these conditions.
        whitelist_conditions={'type': 'array', 'items': {'type': 'string'}},
        # white list accounts
        whitelist_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist={'type': 'array', 'items': {'type': 'string'}},
        whitelist_orgids_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_orgids={'type': 'array', 'items': {'type': 'string'}},
        whitelist_vpce_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_vpce={'type': 'array', 'items': {'type': 'string'}},
        whitelist_vpc_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist_vpc={'type': 'array', 'items': {'type': 'string'}})

    policy_attribute = 'Policy'
    annotation_key = 'CrossAccountViolations'

    checker_factory = PolicyChecker

    def process(self, resources, event=None):
        """Build the checker from the filter data, then filter ``resources``."""
        self.everyone_only = self.data.get('everyone_only', False)
        self.conditions = set(self.data.get(
            'whitelist_conditions',
            ("aws:userid", "aws:username")))
        self.actions = self.data.get('actions', ())
        self.accounts = self.get_accounts()
        self.vpcs = self.get_vpcs()
        self.vpces = self.get_vpces()
        self.orgid = self.get_orgids()
        # Copy any pre-existing config before updating it: when the attribute
        # comes from a class-level dict, updating it in place would leak this
        # filter's settings into every other instance sharing that dict.
        self.checker_config = dict(getattr(self, 'checker_config', None) or {})
        self.checker_config.update(
            {'allowed_accounts': self.accounts,
             'allowed_vpc': self.vpcs,
             'allowed_vpce': self.vpces,
             'allowed_orgid': self.orgid,
             'check_actions': self.actions,
             'everyone_only': self.everyone_only,
             'whitelist_conditions': self.conditions})
        self.checker = self.checker_factory(self.checker_config)
        return super().process(resources, event)

    def _whitelist_values(self, key):
        """Return the set of values listed under ``key`` in the filter data,
        merged with the values resolved from ``<key>_from`` when present."""
        values = set(self.data.get(key, ()))
        source_key = key + '_from'
        if source_key in self.data:
            resolver = ValuesFrom(self.data[source_key], self.manager)
            values = values.union(resolver.get_values())
        return values

    def get_accounts(self):
        """Return the whitelisted account ids; the owning account is always included."""
        owner_id = self.manager.config.account_id
        accounts = self._whitelist_values('whitelist')
        accounts.add(owner_id)
        return accounts

    def get_vpcs(self):
        """Return the whitelisted vpc values."""
        return self._whitelist_values('whitelist_vpc')

    def get_vpces(self):
        """Return the whitelisted vpc endpoint values."""
        return self._whitelist_values('whitelist_vpce')

    def get_orgids(self):
        """Return the whitelisted organization ids."""
        return self._whitelist_values('whitelist_orgids')

    def get_resource_policy(self, r):
        """Return the resource's policy document, or ``None`` when absent."""
        return r.get(self.policy_attribute, None)

    def __call__(self, r):
        """Return True when the resource's policy has violations.

        Matching resources are annotated with the violating statements under
        ``annotation_key``. Resources without a policy never match.
        """
        p = self.get_resource_policy(r)
        if p is None:
            return False
        violations = self.checker.check(p)
        if violations:
            r[self.annotation_key] = violations
            return True
        return False