Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/resources/kms.py: 66%

140 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4import re 

5 

6from c7n.utils import local_session, type_schema 

7from c7n_gcp.provider import resources 

8from c7n_gcp.query import QueryResourceManager, TypeInfo, ChildResourceManager, ChildTypeInfo, \ 

9 GcpLocation 

10from c7n_gcp.actions import SetIamPolicy 

11from c7n_gcp.filters import IamPolicyFilter 

12from c7n.filters import Filter 

13 

14 

@resources.register('kms-keyring')
class KmsKeyRing(QueryResourceManager):
    """Cloud KMS key rings, listed one location at a time in the default project."""

    class resource_type(TypeInfo):
        service = 'cloudkms'
        version = 'v1'
        component = 'projects.locations.keyRings'
        enum_spec = ('list', 'keyRings[]', None)
        scope = None
        name = id = 'name'
        default_report_fields = ["name", "createTime"]
        asset_type = "cloudkms.googleapis.com/KeyRing"
        urn_component = "keyring"
        urn_id_segments = (-1,)  # Just use the last segment of the id in the URN

        @staticmethod
        def get(client, resource_info):
            """Fetch a single key ring.

            ``resource_info`` must provide ``project_id``, ``location`` and
            ``key_ring_id``; they are combined into the full resource name
            passed to the client's ``get`` command.
            """
            template = 'projects/{}/locations/{}/keyRings/{}'
            key_ring_name = template.format(
                resource_info['project_id'],
                resource_info['location'],
                resource_info['key_ring_id'])
            return client.execute_command('get', {'name': key_ring_name})

        @classmethod
        def _get_location(cls, resource):
            """Return the fourth ``/``-separated segment of the resource name."""
            segments = resource["name"].split('/')
            return segments[3]

    def get_resource_query(self):
        """Translate the policy's ``query`` block into a ``parent`` query.

        The first query entry containing ``location`` wins; its value is
        wrapped in a list unless it already is one. Returns ``None`` when the
        policy has no ``query`` key or no entry mentions ``location``.
        """
        if 'query' not in self.data:
            return None
        for clause in self.data.get('query'):
            if 'location' not in clause:
                continue
            requested = clause['location']
            if not isinstance(requested, list):
                requested = [requested]
            return {'parent': requested}
        return None

    def _fetch_resources(self, query):
        """List key rings for every selected location of the default project.

        Locations come from ``query['parent']`` when present, otherwise from
        ``GcpLocation.get_service_locations('kms')``. Each location is queried
        through the base ``QueryResourceManager._fetch_resources`` and the
        results are concatenated.
        """
        fetch_from_parent = QueryResourceManager._fetch_resources
        project = local_session(self.session_factory).get_default_project()

        if query and 'parent' in query:
            locations = query['parent']
        else:
            locations = GcpLocation.get_service_locations('kms')

        parents = []
        for location in locations:
            parents.append('projects/{}/locations/{}'.format(project, location))

        collected = []
        for parent in parents:
            collected.extend(fetch_from_parent(self, {'parent': parent}))
        return collected

63 

64 

@KmsKeyRing.filter_registry.register('iam-policy')
class KmsKeyRingIamPolicyFilter(IamPolicyFilter):
    """IAM policy filter registered for KMS key ring resources.

    Only the required permission is declared here; the filtering logic is
    inherited unchanged from ``IamPolicyFilter``.
    """

    permissions = ('cloudkms.keyRings.getIamPolicy',)

71 

72 

@resources.register('kms-cryptokey')
class KmsCryptoKey(ChildResourceManager):
    """GCP Resource
    https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys
    """

    def _get_parent_resource_info(self, child_instance):
        """Extract the parent key ring coordinates from a crypto key's name.

        Returns a dict with ``project_id``, ``location`` and ``key_ring_id``
        parsed out of ``child_instance['name']``.
        """
        matched = re.match(
            'projects/(.*?)/locations/(.*?)/keyRings/(.*?)/cryptoKeys/.*',
            child_instance['name'])
        project_id, location, key_ring_id = matched.groups()
        return {
            'project_id': project_id,
            'location': location,
            'key_ring_id': key_ring_id,
        }

    def get_resource_query(self):
        """Return no query.

        This manager takes no query values of its own; with the
        ``use_child_query`` flag they are received by its parent instead.
        """
        return None

    class resource_type(ChildTypeInfo):
        service = 'cloudkms'
        version = 'v1'
        component = 'projects.locations.keyRings.cryptoKeys'
        enum_spec = ('list', 'cryptoKeys[]', None)
        scope = None
        name = id = 'name'
        default_report_fields = [
            "name", "purpose", "createTime", "nextRotationTime", "rotationPeriod"]
        parent_spec = {
            'resource': 'kms-keyring',
            'child_enum_params': [
                ('name', 'parent'),
            ],
            'use_child_query': True,
        }
        asset_type = "cloudkms.googleapis.com/CryptoKey"
        scc_type = "google.cloud.kms.CryptoKey"
        urn_component = "cryptokey"
        urn_id_segments = (5, 7)

        @staticmethod
        def get(client, resource_info):
            """Fetch a single crypto key.

            ``resource_info`` must provide ``project_id``, ``location``,
            ``key_ring_id`` and ``crypto_key_id``.
            """
            template = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'
            crypto_key_name = template.format(
                resource_info['project_id'],
                resource_info['location'],
                resource_info['key_ring_id'],
                resource_info['crypto_key_id'])
            return client.execute_command('get', {'name': crypto_key_name})

        @classmethod
        def _get_location(cls, resource):
            """Return the fourth ``/``-separated segment of the resource name."""
            segments = resource["name"].split('/')
            return segments[3]

124 

125 

@KmsCryptoKey.filter_registry.register('iam-policy')
class KmsCryptokeyIamPolicyFilter(IamPolicyFilter):
    """IAM policy filter registered for KMS crypto key resources.

    Declares the crypto key permissions and builds the API call arguments
    the same way the ``SetIamPolicy`` action does.
    """

    permissions = (
        'cloudkms.cryptoKeys.get',
        'cloudkms.cryptoKeys.list',
        'cloudkms.cryptoKeys.update',
        'resourcemanager.projects.get',
    )

    def _verb_arguments(self, resource):
        """Return the arguments for the API call on ``resource``.

        Delegates to ``SetIamPolicy._verb_arguments``, invoked as a plain
        function with this filter standing in as ``self``.
        """
        return SetIamPolicy._verb_arguments(self, resource)

137 

138 

@resources.register('kms-cryptokey-version')
class KmsCryptoKeyVersion(ChildResourceManager):
    """GCP Resource
    https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys.cryptoKeyVersions
    """

    def _get_parent_resource_info(self, child_instance):
        """Extract the parent crypto key coordinates from a version's name.

        Returns a dict with ``project_id``, ``location``, ``key_ring_id`` and
        ``crypto_key_id`` parsed out of ``child_instance['name']``.
        """
        path = 'projects/(.*?)/locations/(.*?)/keyRings/(.*?)/cryptoKeys/(.*?)/cryptoKeyVersions/.*'
        project_id, location, key_ring_id, crypto_key_id = \
            re.match(path, child_instance['name']).groups()
        return {'project_id': project_id,
                'location': location,
                'key_ring_id': key_ring_id,
                'crypto_key_id': crypto_key_id}

    def get_resource_query(self):
        """Does nothing as self does not need query values unlike its parent
        which receives them with the use_child_query flag."""
        pass

    class resource_type(ChildTypeInfo):
        service = 'cloudkms'
        version = 'v1'
        component = 'projects.locations.keyRings.cryptoKeys.cryptoKeyVersions'
        enum_spec = ('list', 'cryptoKeyVersions[]', None)
        scope = None
        name = id = 'name'
        default_report_fields = [
            "name", "state", "protectionLevel", "algorithm", "createTime", "destroyTime"]
        parent_spec = {
            'resource': 'kms-cryptokey',
            'child_enum_params': [
                ('name', 'parent')
            ],
            'use_child_query': True
        }
        asset_type = "cloudkms.googleapis.com/CryptoKeyVersion"
        urn_component = "cryptokey-version"
        urn_id_segments = (5, 7, 9)

        @staticmethod
        def get(client, resource_info):
            """Fetch a single crypto key version.

            ``resource_info`` must provide ``project_id``, ``location``,
            ``key_ring_id``, ``crypto_key_id`` and ``crypto_key_version_id``.
            """
            name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/cryptoKeyVersions/{}'\
                .format(resource_info['project_id'],
                        resource_info['location'],
                        resource_info['key_ring_id'],
                        resource_info['crypto_key_id'],
                        resource_info['crypto_key_version_id'])
            return client.execute_command('get', {'name': name})

        @classmethod
        def _get_location(cls, resource):
            """Return the fourth ``/``-separated segment of the resource name."""
            return resource["name"].split('/')[3]

195 

196 

@resources.register('kms-location')
class KmsLocation(QueryResourceManager):
    """Cloud KMS locations listed under the session's default project."""

    class resource_type(TypeInfo):
        service = 'cloudkms'
        version = 'v1'
        component = 'projects.locations'
        enum_spec = ('list', 'locations[]', None)
        scope = None
        name = id = 'name'
        default_report_fields = ["name", "createTime"]
        asset_type = "cloudkms.googleapis.com/Location"

    def _fetch_resources(self, query):
        """List locations for the default project.

        The incoming ``query`` is not consulted; the base
        ``QueryResourceManager._fetch_resources`` is always called with a
        ``name`` of ``projects/<default project>``.
        """
        project = local_session(self.session_factory).get_default_project()
        project_query = {'name': 'projects/{}'.format(project)}
        return QueryResourceManager._fetch_resources(self, project_query)

216 

217 

@KmsLocation.filter_registry.register('keyring')
class KmsLocationKmsKeyringFilter(Filter):
    """Filter KMS locations by whether they contain any key rings.

    ``exist: true`` keeps locations whose key ring listing is non-empty,
    ``exist: false`` keeps those whose listing is empty. The schema does not
    require ``exist``; when it is omitted it is treated as ``true``.
    """

    schema = type_schema('keyring', **{
        'exist': {'type': 'boolean'}},)
    permissions = ('cloudkms.keyRings.list',)

    def process(self, resources, event=None):
        """Return the subset of ``resources`` matching the ``exist`` setting.

        One ``list`` query is issued per location, with the location's
        ``name`` as the ``parent``.
        """
        session = local_session(self.manager.session_factory)
        client = session.client(
            service_name='cloudkms', version='v1', component='projects.locations.keyRings')
        # 'exist' is optional in the schema, so indexing it directly would
        # raise KeyError for a bare "- type: keyring" filter.
        expecting_exist = self.data.get('exist', True)

        accepted_resources = []
        for resource in resources:
            kms_key_rings = client.execute_query('list', {'parent': resource['name']})
            # A missing or empty 'keyRings' entry both count as "no key rings".
            kms_key_rings_exist = bool(kms_key_rings.get('keyRings'))
            if kms_key_rings_exist == expecting_exist:
                accepted_resources.append(resource)
        return accepted_resources