Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/actions/cscc.py: 43%
92 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import datetime
4import json
5import hashlib
6from urllib.parse import urlparse
8from c7n.exceptions import PolicyExecutionError, PolicyValidationError
9from c7n.utils import local_session, type_schema
10from .core import MethodAction
12from c7n_gcp.provider import resources as gcp_resources
# Values allowed for the ``severity`` option of the post-finding action schema.
SEVERITIES = [
    'LOW',
    'MEDIUM',
    'HIGH',
    'SEVERITY_UNSPECIFIED',
]
class PostFinding(MethodAction):
    """Post finding for matched resources to Cloud Security Command Center.

    :Example:

    .. code-block:: yaml

      policies:
        - name: gcp-instances-with-label
          resource: gcp.instance
          filters:
            - "tag:name": "bad-instance"
          actions:
            - type: post-finding
              org-domain: example.io
              category: MEDIUM_INTERNET_SECURITY

    The source for custodian can either be specified inline to the policy, or
    custodian can generate one at runtime if it doesn't exist given an
    org-domain or org-id.

    Finding updates are not currently supported, due to upstream api issues.
    """

    schema = type_schema(
        'post-finding',
        **{
            'source': {
                'type': 'string',
                'description': 'qualified name of source to post to CSCC as'},
            'org-domain': {'type': 'string'},
            'org-id': {'type': 'integer'},
            'category': {'type': 'string'},
            'severity': {'type': 'string', 'enum': SEVERITIES}})
    schema_alias = True
    method_spec = {'op': 'create', 'result': 'name', 'annotation_key': 'c7n:Finding'}

    # create throws error if already exists, patch method has bad docs.
    ignore_error_codes = (409,)

    CustodianSourceName = 'CloudCustodian'
    DefaultCategory = 'Custodian'
    DefaultSeverity = 'SEVERITY_UNSPECIFIED'
    Service = 'securitycenter'
    ServiceVersion = 'v1'

    # Cached qualified source name, filled in by initialize_source().
    _source = None

    # security center permission model is pretty obtuse to correct
    permissions = (
        'securitycenter.findings.list',
        'securitycenter.findings.update',
        'resourcemanager.organizations.get',
        'securitycenter.assetsecuritymarks.update',
        'securitycenter.sources.update',
        'securitycenter.sources.list'
    )

    def validate(self):
        """Require at least one way to locate the CSCC source.

        Raises PolicyValidationError when none of ``source``, ``org-domain``
        or ``org-id`` carries a truthy value in the action configuration.
        """
        if not any(self.data.get(k) for k in ('source', 'org-domain', 'org-id')):
            raise PolicyValidationError(
                "policy:%s CSCC post-finding requires one of source, org-domain, org-id" % (
                    self.manager.ctx.policy.name))

    def process(self, resources):
        """Resolve the source once, then delegate to the parent ``process``."""
        self.initialize_source()
        return super().process(resources)

    def get_client(self, session, model):
        """Return the session's client for the findings collection."""
        return session.client(
            self.Service, self.ServiceVersion, 'organizations.sources.findings')

    def get_resource_params(self, model, resource):
        """Return the create-request parameters for ``resource``."""
        return self.get_finding(resource)

    def initialize_source(self):
        """Resolve, cache and return the qualified CSCC source name.

        Resolution order: the cached value, then the ``source`` given in the
        policy, then a lookup (and creation if absent) of a source whose
        display name is ``CustodianSourceName`` under the organization
        identified by ``org-id`` or found through ``org-domain``.

        Raises PolicyExecutionError when the organization search for
        ``org-domain`` yields nothing.
        """
        # Ideally we'll be given a source, but we'll attempt to auto create it
        # if given an org_domain or org_id.
        if self._source:
            return self._source
        elif 'source' in self.data:
            self._source = self.data['source']
            return self._source

        session = local_session(self.manager.session_factory)

        # Resolve Organization Id
        if 'org-id' in self.data:
            org_id = self.data['org-id']
        else:
            orgs = session.client('cloudresourcemanager', 'v1', 'organizations')
            res = orgs.execute_query(
                'search', {'body': {
                    'filter': 'domain:%s' % self.data['org-domain']}}).get(
                        'organizations')
            if not res:
                raise PolicyExecutionError("Could not determine organization id")
            org_id = res[0]['name'].rsplit('/', 1)[-1]

        # Resolve Source
        client = session.client(self.Service, self.ServiceVersion, 'organizations.sources')
        listing = client.execute_query(
            'list', {'parent': 'organizations/{}'.format(org_id)})
        source = None
        # The listing may carry no 'sources' entry at all (e.g. nothing has
        # been created under the organization yet); treat that as empty
        # instead of iterating None, so we fall through to the create below.
        for candidate in listing.get('sources') or []:
            if candidate['displayName'] == self.CustodianSourceName:
                source = candidate['name']
                break

        if source is None:
            source = client.execute_command(
                'create',
                {'parent': 'organizations/{}'.format(org_id),
                 'body': {
                     'displayName': self.CustodianSourceName,
                     'description': 'Cloud Management Rules Engine'}}).get('name')
        self.log.info(
            "policy:%s resolved cscc source: %s, update policy with this source value",
            self.manager.ctx.policy.name,
            source)
        self._source = source
        return self._source

    def get_name(self, r):
        """Given an arbitrary resource attempt to resolve back to a qualified name."""
        namer = ResourceNameAdapters[self.manager.resource_type.service]
        return namer(r)

    def get_finding(self, resource):
        """Build the findings ``create`` request for ``resource``.

        Returns a dict with ``parent`` (the resolved source), ``findingId``
        and ``body`` (the finding document). The finding id is derived from
        the policy name and the resource's qualified name, so the same
        policy/resource pair always maps to the same id. Expects
        initialize_source() to have populated ``self._source``.
        """
        policy = self.manager.ctx.policy
        resource_name = self.get_name(resource)
        digest = hashlib.sha256(
            b"%s%s" % (
                policy.name.encode('utf8'),
                resource_name.encode('utf8'))).hexdigest()
        # Use one id for both the request's findingId and the name embedded
        # in the body; previously the body name used 32 hex chars while the
        # findingId used 31, so the two never matched.
        finding_id = digest[:31]

        # Naive UTC timestamp with an explicit 'Z' suffix; equivalent to the
        # deprecated datetime.utcnow().
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

        finding = {
            'name': '{}/findings/{}'.format(self._source, finding_id),
            'resourceName': resource_name,
            'state': 'ACTIVE',
            'category': self.data.get('category', self.DefaultCategory),
            'severity': self.data.get('severity', self.DefaultSeverity),
            'eventTime': now.isoformat('T') + 'Z',
            'sourceProperties': {
                'resource_type': self.manager.type,
                'title': policy.data.get('title', policy.name),
                'policy_name': policy.name,
                'policy': json.dumps(policy.data)
            }
        }

        request = {
            'parent': self._source,
            'findingId': finding_id,
            'body': finding}
        return request

    @classmethod
    def register_resource(klass, registry, resource_class):
        """Register ``post-finding`` on resource classes we can name.

        Skips resource classes whose service has no entry in
        ``ResourceNameAdapters`` and those that already have a
        ``post-finding`` action registered.
        """
        if resource_class.resource_type.service not in ResourceNameAdapters:
            return
        if 'post-finding' in resource_class.action_registry:
            return
        resource_class.action_registry.register('post-finding', klass)
184# CSCC uses its own notion of resource id, if we want our findings on
185# a resource to be linked from the asset view we need to post w/ the
186# same resource name. If this conceptualization of resource name is
187# standard, then we should move these to resource types with
188# appropriate hierarchies by service.
def name_compute(r):
    """Build the CSCC resource name for a compute resource.

    Takes the path of ``r['selfLink']``, drops its first two segments and
    its last one, and appends ``r['id']`` in place of the dropped tail.
    """
    segments = urlparse(r['selfLink']).path.strip('/').split('/')
    parent_path = "/".join(segments[2:-1])
    return "//compute.googleapis.com/{}/{}".format(parent_path, r['id'])
def name_iam(r):
    """Build the CSCC resource name for a service account.

    Uses the resource's ``projectId`` and ``uniqueId`` fields.
    """
    project, unique_id = r['projectId'], r['uniqueId']
    return f"//iam.googleapis.com/projects/{project}/serviceAccounts/{unique_id}"
def name_resourcemanager(r):
    """Build the CSCC resource name for a project or organization.

    A resource with a non-None ``projectNumber`` is named as a project;
    anything else is named as an organization using ``organizationId``.
    """
    project_number = r.get('projectNumber')
    if project_number is None:
        kind, ident = 'organizations', r.get('organizationId')
    else:
        kind, ident = 'projects', project_number
    return "//cloudresourcemanager.googleapis.com/{}/{}".format(kind, ident)
def name_container(r):
    """Build the CSCC resource name for a container resource.

    Uses the path of ``r['selfLink']`` with its first segment removed.
    """
    segments = urlparse(r['selfLink']).path.strip('/').split('/')
    relative_path = "/".join(segments[1:])
    return "//container.googleapis.com/{}".format(relative_path)
def name_storage(r):
    """Build the CSCC resource name for a storage resource from ``r['name']``."""
    resource_name = r['name']
    return f"//storage.googleapis.com/{resource_name}"
def name_appengine(r):
    """Build the CSCC resource name for an App Engine resource from ``r['name']``."""
    resource_name = r['name']
    return f"//appengine.googleapis.com/{resource_name}"
# Maps a resource type's service to the function that turns one of its
# resources into a CSCC resource name; PostFinding looks names up here.
ResourceNameAdapters = dict(
    appengine=name_appengine,
    cloudresourcemanager=name_resourcemanager,
    compute=name_compute,
    container=name_container,
    iam=name_iam,
    storage=name_storage,
)

# Hook PostFinding into the gcp resource registry; presumably the callback
# runs for each registered resource class -- confirm against the registry.
gcp_resources.subscribe(PostFinding.register_resource)