Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/resources/kms.py: 66%
140 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
4import re
6from c7n.utils import local_session, type_schema
7from c7n_gcp.provider import resources
8from c7n_gcp.query import QueryResourceManager, TypeInfo, ChildResourceManager, ChildTypeInfo, \
9 GcpLocation
10from c7n_gcp.actions import SetIamPolicy
11from c7n_gcp.filters import IamPolicyFilter
12from c7n.filters import Filter
15@resources.register('kms-keyring')
16class KmsKeyRing(QueryResourceManager):
18 class resource_type(TypeInfo):
19 service = 'cloudkms'
20 version = 'v1'
21 component = 'projects.locations.keyRings'
22 enum_spec = ('list', 'keyRings[]', None)
23 scope = None
24 name = id = 'name'
25 default_report_fields = [
26 "name", "createTime"]
27 asset_type = "cloudkms.googleapis.com/KeyRing"
28 urn_component = "keyring"
29 urn_id_segments = (-1,) # Just use the last segment of the id in the URN
31 @staticmethod
32 def get(client, resource_info):
33 name = 'projects/{}/locations/{}/keyRings/{}' \
34 .format(resource_info['project_id'],
35 resource_info['location'],
36 resource_info['key_ring_id'])
37 return client.execute_command('get', {'name': name})
39 @classmethod
40 def _get_location(cls, resource):
41 return resource["name"].split('/')[3]
43 def get_resource_query(self):
44 if 'query' in self.data:
45 for child in self.data.get('query'):
46 if 'location' in child:
47 location_query = child['location']
48 return {'parent': location_query if isinstance(
49 location_query, list) else [location_query]}
51 def _fetch_resources(self, query):
52 super_fetch_resources = QueryResourceManager._fetch_resources
53 session = local_session(self.session_factory)
54 project = session.get_default_project()
55 locations = (query['parent'] if query and 'parent' in query
56 else GcpLocation.get_service_locations('kms'))
57 project_locations = ['projects/{}/locations/{}'.format(project, location)
58 for location in locations]
59 key_rings = []
60 for location in project_locations:
61 key_rings.extend(super_fetch_resources(self, {'parent': location}))
62 return key_rings
65@KmsKeyRing.filter_registry.register('iam-policy')
66class KmsKeyRingIamPolicyFilter(IamPolicyFilter):
67 """
68 Overrides the base implementation to process kms key ring resources correctly.
69 """
70 permissions = ('cloudkms.keyRings.getIamPolicy',)
73@resources.register('kms-cryptokey')
74class KmsCryptoKey(ChildResourceManager):
75 """GCP Resource
76 https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys
77 """
78 def _get_parent_resource_info(self, child_instance):
79 project_id, location, key_ring_id = re.match(
80 'projects/(.*?)/locations/(.*?)/keyRings/(.*?)/cryptoKeys/.*',
81 child_instance['name']).groups()
82 return {'project_id': project_id,
83 'location': location,
84 'key_ring_id': key_ring_id}
86 def get_resource_query(self):
87 """Does nothing as self does not need query values unlike its parent
88 which receives them with the use_child_query flag."""
89 pass
91 class resource_type(ChildTypeInfo):
92 service = 'cloudkms'
93 version = 'v1'
94 component = 'projects.locations.keyRings.cryptoKeys'
95 enum_spec = ('list', 'cryptoKeys[]', None)
96 scope = None
97 name = id = 'name'
98 default_report_fields = [
99 name, "purpose", "createTime", "nextRotationTime", "rotationPeriod"]
100 parent_spec = {
101 'resource': 'kms-keyring',
102 'child_enum_params': [
103 ('name', 'parent')
104 ],
105 'use_child_query': True
106 }
107 asset_type = "cloudkms.googleapis.com/CryptoKey"
108 scc_type = "google.cloud.kms.CryptoKey"
109 urn_component = "cryptokey"
110 urn_id_segments = (5, 7)
112 @staticmethod
113 def get(client, resource_info):
114 name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}' \
115 .format(resource_info['project_id'],
116 resource_info['location'],
117 resource_info['key_ring_id'],
118 resource_info['crypto_key_id'])
119 return client.execute_command('get', {'name': name})
121 @classmethod
122 def _get_location(cls, resource):
123 return resource["name"].split('/')[3]
126@KmsCryptoKey.filter_registry.register('iam-policy')
127class KmsCryptokeyIamPolicyFilter(IamPolicyFilter):
128 """
129 Overrides the base implementation to process KMS Cryptokey resources correctly.
130 """
131 permissions = ('cloudkms.cryptoKeys.get', 'cloudkms.cryptoKeys.list',
132 'cloudkms.cryptoKeys.update', 'resourcemanager.projects.get')
134 def _verb_arguments(self, resource):
135 verb_arguments = SetIamPolicy._verb_arguments(self, resource)
136 return verb_arguments
139@resources.register('kms-cryptokey-version')
140class KmsCryptoKeyVersion(ChildResourceManager):
141 """GCP Resource
142 https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys.cryptoKeyVersions
143 """
144 def _get_parent_resource_info(self, child_instance):
145 path = 'projects/(.*?)/locations/(.*?)/keyRings/(.*?)/cryptoKeys/(.*?)/cryptoKeyVersions/.*'
146 project_id, location, key_ring_id, crypto_key_id = \
147 re.match(path, child_instance['name']).groups()
148 return {'project_id': project_id,
149 'location': location,
150 'key_ring_id': key_ring_id,
151 'crypto_key_id': crypto_key_id}
153 def get_resource_query(self):
154 """Does nothing as self does not need query values unlike its parent
155 which receives them with the use_child_query flag."""
156 pass
158 class resource_type(ChildTypeInfo):
159 service = 'cloudkms'
160 version = 'v1'
161 component = 'projects.locations.keyRings.cryptoKeys.cryptoKeyVersions'
162 enum_spec = ('list', 'cryptoKeyVersions[]', None)
163 scope = None
164 name = id = 'name'
165 default_report_fields = [
166 "name", "state", "protectionLevel", "algorithm", "createTime", "destroyTime"]
167 parent_spec = {
168 'resource': 'kms-cryptokey',
169 'child_enum_params': [
170 ('name', 'parent')
171 ],
172 'use_child_query': True
173 }
174 asset_type = "cloudkms.googleapis.com/CryptoKeyVersion"
175 urn_component = "cryptokey-version"
176 urn_id_segments = (5, 7, 9)
178 @staticmethod
179 def get(client, resource_info):
180 name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/cryptoKeyVersions/{}'\
181 .format(resource_info['project_id'],
182 resource_info['location'],
183 resource_info['key_ring_id'],
184 resource_info['crypto_key_id'],
185 resource_info['crypto_key_version_id'])
186 return client.execute_command('get', {'name': name})
188 @classmethod
189 def _get_location(cls, resource):
190 return resource["name"].split('/')[3]
192 @classmethod
193 def _get_location(cls, resource):
194 return resource["name"].split('/')[3]
197@resources.register('kms-location')
198class KmsLocation(QueryResourceManager):
200 class resource_type(TypeInfo):
201 service = 'cloudkms'
202 version = 'v1'
203 component = 'projects.locations'
204 enum_spec = ('list', 'locations[]', None)
205 scope = None
206 name = id = 'name'
207 default_report_fields = [
208 "name", "createTime"]
209 asset_type = "cloudkms.googleapis.com/Location"
211 def _fetch_resources(self, query):
212 super_fetch_resources = QueryResourceManager._fetch_resources
213 session = local_session(self.session_factory)
214 project = session.get_default_project()
215 return super_fetch_resources(self, {'name': 'projects/{}'.format(project)})
218@KmsLocation.filter_registry.register('keyring')
219class KmsLocationKmsKeyringFilter(Filter):
221 schema = type_schema('keyring', **{
222 'exist': {'type': 'boolean'}},)
223 permissions = ('cloudkms.keyRings.list',)
225 def process(self, resources, event=None):
226 session = local_session(self.manager.session_factory)
227 client = session.client(
228 service_name='cloudkms', version='v1', component='projects.locations.keyRings')
229 accepted_resources = []
230 expecting_exist = self.data['exist']
232 for resource in resources:
233 kms_key_rings = client.execute_query('list', {'parent': resource['name']})
234 kms_key_rings_exist = bool(kms_key_rings.get('keyRings'))
235 if kms_key_rings_exist == expecting_exist:
236 accepted_resources.append(resource)
237 return accepted_resources