Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/filters/iampolicy.py: 36%

75 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import copy 

4from c7n.filters.core import Filter, ValueFilter 

5 

6from c7n.utils import local_session, type_schema 

7 

8 

9class IamPolicyFilter(Filter): 

10 """ 

11 Filters resources based on their IAM policy 

12 """ 

13 

14 value_filter_schema = copy.deepcopy(ValueFilter.schema) 

15 del value_filter_schema['required'] 

16 

17 user_role_schema = { 

18 'type': 'object', 

19 'additionalProperties': False, 

20 'required': ['user', 'role'], 

21 'properties': { 

22 'user': {'type': 'string'}, 

23 'role': {'type': 'string'}, 

24 'has': {'type': 'boolean'} 

25 } 

26 } 

27 

28 schema = type_schema( 

29 'iam-policy', 

30 **{'doc': value_filter_schema, 

31 'user-role': user_role_schema}) 

32 

33 def get_client(self, session, model): 

34 return session.client( 

35 model.service, model.version, model.component) 

36 

37 def process(self, resources, event=None): 

38 if 'doc' in self.data: 

39 try: 

40 resources = self.process_resources(resources) 

41 except TypeError: 

42 valueFilter = IamPolicyValueFilter(self.data['doc'], self.manager, "bucket") 

43 resources = valueFilter.process(resources) 

44 if 'user-role' in self.data: 

45 user_role = self.data['user-role'] 

46 key = user_role['user'] 

47 val = user_role['role'] 

48 op = 'in' if user_role.get('has', True) else 'not-in' 

49 value_type = 'swap' 

50 userRolePairFilter = IamPolicyUserRolePairFilter({'key': key, 'value': val, 

51 'op': op, 'value_type': value_type}, self.manager) 

52 resources = userRolePairFilter.process(resources) 

53 

54 return resources 

55 

56 def process_resources(self, resources): 

57 valueFilter = IamPolicyValueFilter(self.data['doc'], self.manager) 

58 resources = valueFilter.process(resources) 

59 return resources 

60 

61 

62class IamPolicyValueFilter(ValueFilter): 

63 """Generic value filter on resources' IAM policy bindings using jmespath 

64 

65 :example: 

66 

67 Filter all kms-cryptokeys accessible to all users 

68 or all authenticated users 

69 

70 .. code-block :: yaml 

71 

72 policies: 

73 - name: gcp-iam-policy-value 

74 resource: gcp.kms-cryptokey 

75 filters: 

76 - type: iam-policy 

77 doc: 

78 key: "bindings[*].members[]" 

79 op: intersect 

80 value: ["allUsers", "allAuthenticatedUsers"] 

81 """ 

82 

83 schema = type_schema('iam-policy', rinherit=ValueFilter.schema,) 

84# permissions = 'GCP_SERVICE.GCP_RESOURCE.getIamPolicy',) 

85 

86 def __init__(self, data, manager=None, identifier="resource"): 

87 super(IamPolicyValueFilter, self).__init__(data, manager) 

88 self.identifier = identifier 

89 

90 def get_client(self, session, model): 

91 return session.client( 

92 model.service, model.version, model.component) 

93 

94 def process(self, resources, event=None): 

95 model = self.manager.get_model() 

96 session = local_session(self.manager.session_factory) 

97 client = self.get_client(session, model) 

98 

99 for r in resources: 

100 iam_policy = client.execute_command('getIamPolicy', self._verb_arguments(r)) 

101 r["c7n:iamPolicy"] = iam_policy 

102 

103 return super(IamPolicyValueFilter, self).process(resources) 

104 

105 def __call__(self, r): 

106 return self.match(r['c7n:iamPolicy']) 

107 

108 def _verb_arguments(self, resource): 

109 """ 

110 Returns a dictionary passed when making the `getIamPolicy` and 'setIamPolicy' API calls. 

111 

112 :param resource: the same as in `get_resource_params` 

113 """ 

114 return {self.identifier: resource[self.manager.resource_type.id]} 

115 

116 

117class IamPolicyUserRolePairFilter(ValueFilter): 

118 """Filters resources based on specified user-role pairs. 

119 

120 :example: 

121 

122 Filter all projects where the user test123@gmail.com does not have the owner role 

123 

124 .. code-block :: yaml 

125 

126 policies: 

127 - name: gcp-iam-user-roles 

128 resource: gcp.project 

129 filters: 

130 - type: iam-policy 

131 user-role: 

132 user: "user:test123@gmail.com" 

133 has: false 

134 role: "roles/owner" 

135 """ 

136 

137 schema = type_schema('iam-user-roles', rinherit=ValueFilter.schema) 

138# permissions = ('resourcemanager.projects.getIamPolicy',) 

139 

140 def get_client(self, session, model): 

141 return session.client( 

142 model.service, model.version, model.component) 

143 

144 def process(self, resources, event=None): 

145 model = self.manager.get_model() 

146 session = local_session(self.manager.session_factory) 

147 client = self.get_client(session, model) 

148 

149 for r in resources: 

150 resource_key = 'projectId' if 'projectId' in r else 'name' 

151 iam_policy = client.execute_command('getIamPolicy', {"resource": r[resource_key]}) 

152 r["c7n:iamPolicyUserRolePair"] = {} 

153 userToRolesMap = {} 

154 

155 for b in iam_policy["bindings"]: 

156 role, members = b["role"], b["members"] 

157 for user in members: 

158 if user in userToRolesMap: 

159 userToRolesMap[user].append(role) 

160 else: 

161 userToRolesMap[user] = [role] 

162 for user, roles in userToRolesMap.items(): 

163 r["c7n:iamPolicyUserRolePair"][user] = roles 

164 

165 return super(IamPolicyUserRolePairFilter, self).process(resources) 

166 

167 def __call__(self, r): 

168 return self.match(r["c7n:iamPolicyUserRolePair"]) 

169 

170 def _verb_arguments(self, resource, identifier="resource"): 

171 """ 

172 Returns a dictionary passed when making the `getIamPolicy` and 'setIamPolicy' API calls. 

173 

174 :param resource: the same as in `get_resource_params` 

175 """ 

176 return {identifier: resource[self.manager.resource_type.id]}