Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/actions/cscc.py: 43%

92 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import datetime 

4import json 

5import hashlib 

6from urllib.parse import urlparse 

7 

8from c7n.exceptions import PolicyExecutionError, PolicyValidationError 

9from c7n.utils import local_session, type_schema 

10from .core import MethodAction 

11 

12from c7n_gcp.provider import resources as gcp_resources 

13 

# Severity values accepted by the `severity` option of the post-finding
# action. 'CRITICAL' is included so policies can use the highest severity
# level; the pre-existing values keep their relative order.
SEVERITIES = ['LOW', 'MEDIUM', 'HIGH', 'CRITICAL', 'SEVERITY_UNSPECIFIED']

15 

16 

class PostFinding(MethodAction):
    """Post finding for matched resources to Cloud Security Command Center.


    :Example:

    .. code-block:: yaml

        policies:
          - name: gcp-instances-with-label
            resource: gcp.instance
            filters:
              - "tag:name": "bad-instance"
            actions:
              - type: post-finding
                org-domain: example.io
                category: MEDIUM_INTERNET_SECURITY

    The source for custodian can either be specified inline to the policy, or
    custodian can generate one at runtime if it doesn't exist given an
    org-domain or org-id.

    Finding updates are not currently supported, due to upstream api issues.
    """
    schema = type_schema(
        'post-finding',
        **{
            'source': {
                'type': 'string',
                'description': 'qualified name of source to post to CSCC as'},
            'org-domain': {'type': 'string'},
            'org-id': {'type': 'integer'},
            'category': {'type': 'string'},
            'severity': {'type': 'string', 'enum': SEVERITIES}})
    schema_alias = True
    method_spec = {'op': 'create', 'result': 'name', 'annotation_key': 'c7n:Finding'}

    # create throws error if already exists, patch method has bad docs.
    ignore_error_codes = (409,)

    CustodianSourceName = 'CloudCustodian'
    DefaultCategory = 'Custodian'
    DefaultSeverity = 'SEVERITY_UNSPECIFIED'
    Service = 'securitycenter'
    ServiceVersion = 'v1'

    # Cached qualified source name; filled in by initialize_source().
    _source = None

    # security center permission model is pretty obtuse to correct
    permissions = (
        'securitycenter.findings.list',
        'securitycenter.findings.update',
        'resourcemanager.organizations.get',
        'securitycenter.assetsecuritymarks.update',
        'securitycenter.sources.update',
        'securitycenter.sources.list'
    )

    def validate(self):
        """Require at least one non-empty of source, org-domain or org-id.

        Raises:
            PolicyValidationError: when none of the three options is set.
        """
        if not any(self.data.get(k) for k in ('source', 'org-domain', 'org-id')):
            raise PolicyValidationError(
                "policy:%s CSCC post-finding requires one of source, org-domain, org-id" % (
                    self.manager.ctx.policy.name))

    def process(self, resources):
        """Resolve the CSCC source once, then post a finding per resource."""
        self.initialize_source()
        return super().process(resources)

    def get_client(self, session, model):
        """Return the client for the findings component of security center."""
        return session.client(
            self.Service, self.ServiceVersion, 'organizations.sources.findings')

    def get_resource_params(self, model, resource):
        """Return the create-finding request parameters for ``resource``."""
        return self.get_finding(resource)

    def initialize_source(self):
        """Resolve, cache and return the qualified name of the CSCC source.

        Resolution order: the cached value, the ``source`` policy option,
        then a lookup (creating it if absent) of the source whose display
        name is ``CustodianSourceName`` in the organization identified by
        ``org-id`` or, failing that, ``org-domain``.

        Raises:
            PolicyExecutionError: when the org-domain search returns no
                organization.
        """
        # Ideally we'll be given a source, but we'll attempt to auto create it
        # if given an org_domain or org_id.
        if self._source:
            return self._source

        # Truthiness (not key presence) mirrors validate(): an empty
        # `source` must not be used as the parent of findings.
        configured = self.data.get('source')
        if configured:
            self._source = configured
            return self._source

        session = local_session(self.manager.session_factory)

        # Resolve Organization Id
        org_id = self.data.get('org-id')
        if not org_id:
            orgs = session.client('cloudresourcemanager', 'v1', 'organizations')
            found = orgs.execute_query(
                'search', {'body': {
                    'filter': 'domain:%s' % self.data['org-domain']}}).get(
                        'organizations')
            if not found:
                raise PolicyExecutionError("Could not determine organization id")
            # organization names look like "organizations/<id>"
            org_id = found[0]['name'].rsplit('/', 1)[-1]

        # Resolve Source
        parent = 'organizations/{}'.format(org_id)
        client = session.client(self.Service, self.ServiceVersion, 'organizations.sources')
        # The 'sources' key can be absent from the response (no sources yet);
        # treat that as an empty listing instead of iterating None.
        existing = client.execute_query('list', {'parent': parent}).get('sources') or []
        source = next(
            (s['name'] for s in existing
             if s['displayName'] == self.CustodianSourceName),
            None)

        if source is None:
            source = client.execute_command(
                'create',
                {'parent': parent,
                 'body': {
                     'displayName': self.CustodianSourceName,
                     'description': 'Cloud Management Rules Engine'}}).get('name')
        self.log.info(
            "policy:%s resolved cscc source: %s, update policy with this source value",
            self.manager.ctx.policy.name,
            source)
        self._source = source
        return self._source

    def get_name(self, r):
        """Given an arbitrary resource attempt to resolve back to a qualified name."""
        namer = ResourceNameAdapters[self.manager.resource_type.service]
        return namer(r)

    def get_finding(self, resource):
        """Build the create request (parent, findingId, body) for ``resource``.

        The finding id is derived from the policy name and the resource's
        qualified name, so posting the same policy/resource pair again
        produces the same id.
        """
        policy = self.manager.ctx.policy
        resource_name = self.get_name(resource)
        # The digest input and truncation are kept as-is: the finding id is
        # derived from them, so any change would alter the ids being posted.
        finding_id = hashlib.sha256(
            policy.name.encode('utf8') + resource_name.encode('utf8')
        ).hexdigest()[:32]

        # Same string as the former utcnow().isoformat('T') + 'Z', without
        # relying on the deprecated naive-UTC constructor.
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

        finding = {
            'name': '{}/findings/{}'.format(self._source, finding_id),
            'resourceName': resource_name,
            'state': 'ACTIVE',
            'category': self.data.get('category', self.DefaultCategory),
            'severity': self.data.get('severity', self.DefaultSeverity),
            'eventTime': now_utc.isoformat('T') + 'Z',
            'sourceProperties': {
                'resource_type': self.manager.type,
                'title': policy.data.get('title', policy.name),
                'policy_name': policy.name,
                'policy': json.dumps(policy.data)
            }
        }

        # NOTE(review): findingId is one character shorter than the id used
        # in finding['name'] above; left untouched to keep existing finding
        # ids stable -- confirm whether the mismatch is intentional.
        return {
            'parent': self._source,
            'findingId': finding_id[:31],
            'body': finding}

    @classmethod
    def register_resource(klass, registry, resource_class):
        """Register 'post-finding' on resource types that have a name adapter.

        Does nothing when the resource's service has no entry in
        ResourceNameAdapters or when the action is already registered.
        """
        if resource_class.resource_type.service not in ResourceNameAdapters:
            return
        if 'post-finding' in resource_class.action_registry:
            return
        resource_class.action_registry.register('post-finding', klass)

182 

183 

184# CSCC uses its own notion of resource id, if we want our findings on 

185# a resource to be linked from the asset view we need to post w/ the 

186# same resource name. If this conceptualization of resource name is 

187# standard, then we should move these to resource types with 

188# appropriate hierarchies by service. 

189 

190 

def name_compute(r):
    """Return the CSCC resource name for a compute resource.

    Takes the path of ``selfLink``, drops its first two segments and its
    last one, and appends the resource's ``id`` in place of the latter.
    """
    segments = urlparse(r['selfLink']).path.strip('/').split('/')
    parent_path = "/".join(segments[2:-1])
    return f"//compute.googleapis.com/{parent_path}/{r['id']}"

196 

197 

def name_iam(r):
    """Return the CSCC resource name for a service account.

    Built from the resource's ``projectId`` and ``uniqueId`` fields.
    """
    project_id, unique_id = r['projectId'], r['uniqueId']
    return f"//iam.googleapis.com/projects/{project_id}/serviceAccounts/{unique_id}"

202 

203 

def name_resourcemanager(r):
    """Return the CSCC resource name for a resource manager resource.

    Resolution order:

    1. ``projectNumber`` -> ``projects/<number>``
    2. ``organizationId`` -> ``organizations/<id>``
    3. a ``name`` already of the form ``organizations/<id>`` or
       ``folders/<id>`` is used as given.

    When none of these apply the historical best-effort result
    (``organizations/None``) is still returned rather than raising.
    """
    rid = r.get('projectNumber')
    if rid is not None:
        rtype = 'projects'
    else:
        rid = r.get('organizationId')
        rtype = 'organizations'
        if rid is None:
            # No explicit id field: fall back to a qualified `name`
            # instead of emitting ".../organizations/None".
            kind, sep, ident = str(r.get('name') or '').partition('/')
            if sep and ident and kind in ('organizations', 'folders'):
                rtype, rid = kind, ident
    return "//cloudresourcemanager.googleapis.com/{}/{}".format(
        rtype, rid)

213 

214 

def name_container(r):
    """Return the CSCC resource name for a container resource.

    Uses the path of ``selfLink`` with its first segment removed.
    """
    trimmed_path = urlparse(r['selfLink']).path.strip('/')
    # Everything after the first '/' is the path minus its leading segment.
    _, _, remainder = trimmed_path.partition('/')
    return f"//container.googleapis.com/{remainder}"

218 

219 

def name_storage(r):
    """Return the CSCC resource name for a storage resource, from its ``name``."""
    bucket = r['name']
    return f"//storage.googleapis.com/{bucket}"

222 

223 

def name_appengine(r):
    """Return the CSCC resource name for an appengine resource, from its ``name``."""
    app_name = r['name']
    return f"//appengine.googleapis.com/{app_name}"

226 

227 

# Maps a resource type's service name to the function that builds the
# CSCC resource name for resources of that service.
ResourceNameAdapters = dict(
    appengine=name_appengine,
    cloudresourcemanager=name_resourcemanager,
    compute=name_compute,
    container=name_container,
    iam=name_iam,
    storage=name_storage,
)

# Hand PostFinding.register_resource to the gcp resource registry's
# subscribe hook.
gcp_resources.subscribe(PostFinding.register_resource)