Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/query.py: 41%

297 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4import json 

5import itertools 

6import logging 

7import re 

8import jmespath 

9 

10from googleapiclient.errors import HttpError 

11 

12from c7n.actions import ActionRegistry 

13from c7n.filters import FilterRegistry 

14from c7n.manager import ResourceManager 

15from c7n.query import sources, MaxResourceLimit 

16from c7n.utils import local_session, chunks, jmespath_search, jmespath_compile 

17 

18 

# Module-level logger used by the resource managers defined in this file.
log = logging.getLogger('c7n_gcp.query')

20 

21 

class ResourceQuery:
    """Enumerate resources through the API client described by a resource type."""

    def __init__(self, session_factory):
        self.session_factory = session_factory

    def filter(self, resource_manager, **params):
        """Run the enumeration call declared by the manager's resource type.

        ``params`` are passed to the enumeration call after being extended
        with the scope arguments (project and, for zonal types, the session's
        default zone when one is set) and any extra arguments carried by
        ``enum_spec``.
        """
        model = resource_manager.resource_type
        session = local_session(self.session_factory)
        client = session.client(model.service, model.version, model.component)

        # Scope arguments depend on the resource scope.
        scope = model.scope
        if scope in ('project', 'zone'):
            project = session.get_default_project()
            if model.scope_template:
                project = model.scope_template.format(project)
            # Some APIs take the project under another key (e.g. as a parent).
            params[model.scope_key or 'project'] = project

        if scope == 'zone' and session.get_default_zone():
            params['zone'] = session.get_default_zone()

        enum_op, path, extra_args = model.enum_spec
        if extra_args:
            params.update(extra_args)
        return self._invoke_client_enum(client, enum_op, params, path)

    def _invoke_client_enum(self, client, enum_op, params, path):
        """Execute ``enum_op`` and pull the items found at ``path``.

        Paginated operations return a list accumulated over all pages; other
        operations return whatever ``jmespath_search`` yields for the single
        response.
        """
        if not client.supports_pagination(enum_op):
            response = client.execute_query(enum_op, verb_arguments=params)
            return jmespath_search(path, response)

        collected = []
        for page in client.execute_paged_query(enum_op, params):
            # Pages with nothing at ``path`` contribute no items.
            collected.extend(jmespath_search(path, page) or ())
        return collected

64 

65 

@sources.register('describe-gcp')
class DescribeSource:
    """Source that lists resources via the resource type's enumeration call."""

    def __init__(self, manager):
        self.manager = manager
        self.query = ResourceQuery(manager.session_factory)

    def get_resources(self, query):
        """Enumerate resources, passing ``query`` entries as keyword parameters."""
        params = {} if query is None else query
        return self.query.filter(self.manager, **params)

    def get_permissions(self):
        """Return the permissions needed to enumerate this resource type.

        Explicit ``permissions`` on the resource type win; otherwise a single
        ``<service>.<component>.<method>`` permission is derived from the
        enumeration spec.
        """
        model = self.manager.resource_type
        if model.permissions:
            return model.permissions

        method = model.enum_spec[0]
        if method == 'aggregatedList':
            method = 'list'

        # Only the last dotted segment of the component is used.
        component = model.component
        if '.' in component:
            component = component.rsplit('.', 1)[-1]

        service = model.perm_service or model.service
        return (f"{service}.{component}.{method}",)

    def augment(self, resources):
        """Return ``resources`` unchanged."""
        return resources

93 

94 

@sources.register('inventory')
class AssetInventory:
    """Source that loads resources through the Cloud Asset API clients."""

    permissions = ("cloudasset.assets.searchAllResources",
                   "cloudasset.assets.exportResource")

    def __init__(self, manager):
        self.manager = manager

    def get_resources(self, query):
        """Search for assets and return their resource data.

        :param query: optional dict of ``searchAll`` arguments. ``scope``
            defaults to the session's default project and ``assetTypes`` to
            the resource type's ``asset_type``. The dict is not modified.
        :return: list of resource dicts, each annotated under ``c7n:history``
            with the ``window`` and ``ancestors`` of its history entry.
        """
        session = local_session(self.manager.session_factory)
        # Work on a copy: the caller's dict is also part of the cache key
        # built in QueryResourceManager.resources, so filling in defaults in
        # place would change that key between cache load and cache save.
        query = dict(query) if query else {}
        if 'scope' not in query:
            query['scope'] = 'projects/%s' % session.get_default_project()
        if 'assetTypes' not in query:
            query['assetTypes'] = [self.manager.resource_type.asset_type]

        search_client = session.client('cloudasset', 'v1p1beta1', 'resources')
        resource_client = session.client('cloudasset', 'v1', 'v1')
        resources = []

        pages = list(search_client.execute_paged_query('searchAll', query))
        # A page without matches may have no 'results' key at all.
        found = itertools.chain.from_iterable(
            page.get('results', ()) for page in pages)

        # Asset history is fetched in batches of 100 names.
        for resource_set in chunks(found, 100):
            rquery = {
                'parent': query['scope'],
                'contentType': 'RESOURCE',
                'assetNames': [r['name'] for r in resource_set]}
            history = resource_client.execute_query(
                'batchGetAssetsHistory', rquery).get('assets', ())
            for history_result in history:
                resource = history_result['asset']['resource']['data']
                resource['c7n:history'] = {
                    'window': history_result['window'],
                    'ancestors': history_result['asset']['ancestors']}
                resources.append(resource)
        return resources

    def get_permissions(self):
        """Return the fixed Cloud Asset permissions used by this source."""
        return self.permissions

    def augment(self, resources):
        """Return ``resources`` unchanged."""
        return resources

137 

138 

class QueryMeta(type):
    """Metaclass that gives each new resource class its own registries.

    A class body that already defines ``filter_registry`` or
    ``action_registry`` keeps its own value.
    """

    def __new__(cls, name, parents, attrs):
        prefix = name.lower()
        if 'filter_registry' not in attrs:
            attrs['filter_registry'] = FilterRegistry(f'{prefix}.filters')
        if 'action_registry' not in attrs:
            attrs['action_registry'] = ActionRegistry(f'{prefix}.actions')
        return super().__new__(cls, name, parents, attrs)

150 

151 

class QueryResourceManager(ResourceManager, metaclass=QueryMeta):
    # Resource manager that loads its resources through a pluggable source
    # looked up in the ``sources`` registry.

    # The resource manager type is injected by the PluginRegistry.register
    # decorator.
    type: str
    resource_type: 'TypeInfo'

    def __init__(self, ctx, data):
        super(QueryResourceManager, self).__init__(ctx, data)
        self.source = self.get_source(self.source_type)

    def get_permissions(self):
        """Return the permissions reported by the active source."""
        return self.source.get_permissions()

    def get_source(self, source_type):
        """Instantiate the source registered under ``source_type``."""
        return sources.get(source_type)(self)

    def get_client(self):
        """Return an API client for this resource type's service/version/component."""
        return local_session(self.session_factory).client(
            self.resource_type.service,
            self.resource_type.version,
            self.resource_type.component)

    def get_model(self):
        """Return the resource type description."""
        return self.resource_type

    def get_cache_key(self, query):
        """Build the cache key for ``query``; the query object is stored as-is."""
        return {'source_type': self.source_type, 'query': query,
                'service': self.resource_type.service,
                'version': self.resource_type.version,
                'component': self.resource_type.component}

    def get_resource(self, resource_info):
        """Fetch a single resource via the resource type's ``get`` method."""
        return self.resource_type.get(self.get_client(), resource_info)

    @property
    def source_type(self):
        """Name of the source to use; ``describe-gcp`` unless the policy sets ``source``."""
        return self.data.get('source', 'describe-gcp')

    def get_resource_query(self):
        """Return ``{'filter': <query>}`` when the policy has a ``query``, else None."""
        if 'query' in self.data:
            return {'filter': self.data.get('query')}
        return None

    def resources(self, query=None):
        """Return the filtered resources, using the cache when it has them.

        :param query: optional query; falls back to ``get_resource_query()``.
        :return: the resources left after ``filter_resources``.
        """
        q = query or self.get_resource_query()
        cache_key = self.get_cache_key(q)
        resources = None

        if self._cache.load():
            resources = self._cache.get(cache_key)
            if resources is not None:
                # Arguments are handed to the logger so the message is only
                # formatted when debug logging is enabled.
                self.log.debug(
                    "Using cached %s: %d",
                    "%s.%s" % (self.__class__.__module__,
                               self.__class__.__name__),
                    len(resources))

        if resources is None:
            with self.ctx.tracer.subsegment('resource-fetch'):
                resources = self._fetch_resources(q)
            self._cache.save(cache_key, resources)

        self._cache.close()
        resource_count = len(resources)
        with self.ctx.tracer.subsegment('filter'):
            resources = self.filter_resources(resources)

        # Check resource limits if we're the current policy execution.
        if self.data == self.ctx.policy.data:
            self.check_resource_limit(len(resources), resource_count)
        return resources

    def check_resource_limit(self, selection_count, population_count):
        """Check if policy's execution affects more resources than its limit.
        """
        p = self.ctx.policy
        max_resource_limits = MaxResourceLimit(p, selection_count, population_count)
        return max_resource_limits.check_resource_limits()

    def _fetch_resources(self, query):
        """Load and augment resources from the source.

        Returns an empty list, after logging a warning, when the API answers
        403 with a message containing ``disabled`` or with the reason
        ``accessNotConfigured``. Any other ``HttpError`` is re-raised.
        """
        try:
            return self.augment(self.source.get_resources(query)) or []
        except HttpError as e:
            error_reason, error_code, error_message = extract_errors(e)

            if error_reason is None and error_code is None:
                raise
            # The payload can carry a code without a message; a missing
            # message must not turn the HttpError into a TypeError here.
            if error_code == 403 and 'disabled' in (error_message or ''):
                log.warning(error_message)
                return []
            if error_reason == 'accessNotConfigured':
                log.warning(
                    "Resource:%s not available -> Service:%s not enabled on %s",
                    self.type,
                    self.resource_type.service,
                    local_session(self.session_factory).get_default_project())
                return []
            raise

    def augment(self, resources):
        """Return ``resources`` unchanged; a hook subclasses may override."""
        return resources

    def get_urns(self, resources):
        """Generate URNs for the resources.

        A Uniform Resource Name (URN) is a URI that identifies a resource by
        name in a particular namespace. A URN may be used to talk about a
        resource without implying its location or how to access it.

        The generated URNs can uniquely identify any given resource.

        The generated URN is intended to follow a similar pattern to ARN, but be
        specific to GCP.

            gcp:<service>:<region>:<project>:<resource-type>/<resource-id>

        If the region is "global" then it is omitted from the URN.
        """
        return self.resource_type.get_urns(
            resources, local_session(self.session_factory).project_id)

270 

271 

class ChildResourceManager(QueryResourceManager):
    # Manager for resources that are enumerated per parent resource, as
    # described by the resource type's ``parent_spec``.

    def get_resource(self, resource_info):
        """Fetch one child resource and annotate it with its parent."""
        child_instance = super(ChildResourceManager, self).get_resource(resource_info)

        parent_resource = self.resource_type.parent_spec['resource']
        parent_instance = self.get_resource_manager(parent_resource).get_resource(
            self._get_parent_resource_info(child_instance)
        )

        annotation_key = self.resource_type.get_parent_annotation_key()
        child_instance[annotation_key] = parent_instance

        return child_instance

    def _fetch_resources(self, query):
        """Enumerate children of every parent resource.

        :param query: optional dict of enumeration arguments; it is copied,
            never modified.
        :return: list of child resources, each annotated with its parent
            under the parent annotation key.
        """
        # Copy before adding per-parent arguments: the caller's dict is also
        # embedded in the cache key computed by resources(), so updating it in
        # place would change that key between cache load and cache save.
        query = dict(query) if query else {}

        resources = []
        annotation_key = self.resource_type.get_parent_annotation_key()
        parent_query = self.get_parent_resource_query()
        parent_resource_manager = self.get_resource_manager(
            resource_type=self.resource_type.parent_spec['resource'],
            data=({'query': parent_query} if parent_query else {})
        )

        for parent_instance in parent_resource_manager.resources():
            query.update(self._get_child_enum_args(parent_instance))
            children = super(ChildResourceManager, self)._fetch_resources(query)

            for child_instance in children:
                child_instance[annotation_key] = parent_instance

            resources.extend(children)

        return resources

    def _get_parent_resource_info(self, child_instance):
        """Map child fields to the arguments needed to get its parent."""
        mappings = self.resource_type.parent_spec['parent_get_params']
        return self._extract_fields(child_instance, mappings)

    def _get_child_enum_args(self, parent_instance):
        """Map parent fields to the arguments needed to enumerate its children."""
        mappings = self.resource_type.parent_spec['child_enum_params']
        return self._extract_fields(parent_instance, mappings)

    def get_parent_resource_query(self):
        """Return the policy ``query`` for the parent when ``use_child_query`` is set."""
        parent_spec = self.resource_type.parent_spec
        enabled = parent_spec.get('use_child_query', False)
        if enabled and 'query' in self.data:
            return self.data.get('query')
        return None

    @staticmethod
    def _extract_fields(source, mappings):
        """Build a dict from ``source`` according to ``mappings``.

        Each mapping is a sequence whose first item is a jmespath expression
        evaluated against ``source`` and whose second item is the key of the
        result. A mapping that contains ``'regex'`` additionally carries a
        pattern at index 3; the first capture group of its match replaces the
        extracted value.
        """
        result = {}

        for mapping in mappings:
            source_path, target_key = mapping[0], mapping[1]
            value = jmespath.search(source_path, source)
            # Support for regex in child_enum_params.
            # Without this support you could only map parent-child elements with the raw data
            # they hold, but with regex you could regex that data as well while you map.
            if 'regex' in mapping:
                value = re.search(mapping[3], value).group(1)
            result[target_key] = value

        return result

337 

338 

class RegionalResourceManager(ChildResourceManager):
    # Child manager whose parent query is derived from the configured regions.

    def get_parent_resource_query(self):
        """Return a list of ``{'name': <region>}`` entries, or None.

        The configured ``regions`` are used unless they are empty or contain
        ``'all'``; otherwise the single configured ``region`` is used when set.
        """
        regions = self.config.regions
        if regions and 'all' not in regions:
            return [{'name': region} for region in regions]

        region = self.config.region
        if region:
            return [{'name': region}]
        return None

348 

349 

class TypeMeta(type):
    """Metaclass providing a readable ``repr`` for resource type classes."""

    def __repr__(cls):
        return (
            f"<TypeInfo service:{cls.service} component:{cls.component} "
            f"scope:{cls.scope} version:{cls.version}>"
        )

358 

359 

class TypeInfo(metaclass=TypeMeta):
    # Declarative description of a resource type: how to build its API
    # client, how to enumerate it, and how to report and name its resources.

    # api client construction information
    service = None
    version = None
    component = None

    # resource enumeration parameters

    scope = 'project'
    enum_spec = ('list', 'items[]', None)
    # ie. when project is passed instead as parent
    scope_key = None
    # custom formatting for scope key
    scope_template = None

    # individual resource retrieval method, for serverless policies.
    get = None
    # for get methods that require the full event payload
    get_requires_event = False
    perm_service = None
    permissions = ()

    labels = False
    labels_op = 'setLabels'

    # required for reporting
    id = None
    name = None
    default_report_fields = ()

    # cloud asset inventory type
    asset_type = None

    # URN generation
    urn_region_key = 'region'
    # A jmespath into the resource object to find the id element of the URN.
    # If unset, it uses the value for id.
    urn_id_path = None
    # It is frequent enough that the id we want for the URN is made up of one or more
    # path segments from the id. Ids are frequently '/' delimited strings.
    # If set, this should be an iterable of integer indices into the segments.
    urn_id_segments = None
    # By default the component is taken for the URN. Can be overridden by specifying
    # a specific urn_component.
    urn_component = None
    # Truly global resources should override this with a falsy value; the
    # project is then left out of the URN.
    urn_has_project = True
    # The location element is a zone, not a region.
    urn_zonal = False

    # If the type supports refreshing an individual resource
    refresh = None

    @classmethod
    def get_metric_resource_name(cls, resource):
        """Return the value of the resource's ``name`` field."""
        return resource.get(cls.name)

    @classmethod
    def get_urns(cls, resources, project_id):
        """Generate URNs for the resources.

        A Uniform Resource Name (URN) is a URI that identifies a resource by
        name in a particular namespace. A URN may be used to talk about a
        resource without implying its location or how to access it.

        The generated URNs can uniquely identify any given resource.

        The generated URN is intended to follow a similar pattern to ARN, but be
        specific to GCP.

            gcp:<service>:<location>:<project>:<resource-type>/<resource-id>

        If the region is "global" then it is omitted from the URN.
        """
        return [cls._get_urn(r, project_id) for r in resources]

    @classmethod
    def _get_urn(cls, resource, project_id) -> str:
        """Generate an URN for the resource.

        The location is left empty for global resources and the project is
        left empty when ``urn_has_project`` is falsy.
        """
        location = cls._get_location(resource)
        if location == "global":
            location = ""
        urn_id = cls._get_urn_id(resource)
        if not cls.urn_has_project:
            project_id = ""
        # Fall back to the component when no urn_component is declared, so an
        # unset value never ends up as the literal "None" in the URN.
        # NOTE: not sure whether to use `component` or just the last part of
        # `component` (split on '.') for the part after project
        component = cls.urn_component
        if component is None:
            component = cls.component
        return f"gcp:{cls.service}:{location}:{project_id}:{component}/{urn_id}"

    @classmethod
    def _get_urn_id(cls, resource):
        """Return the id element of the URN for ``resource``.

        The value is looked up at ``urn_id_path`` (or ``id`` when that is
        unset). When ``urn_id_segments`` is set, only those '/'-separated
        segments of the value are kept, joined by '/'.
        """
        path = cls.urn_id_path
        if path is None:
            path = cls.id
        urn_id = jmespath_search(path, resource)
        if cls.urn_id_segments:
            parts = urn_id.split('/')
            urn_id = '/'.join([parts[index] for index in cls.urn_id_segments])
        return urn_id

    @classmethod
    def _get_location(cls, resource):
        """Get the location for a single resource.

        Resources are either global, regional, or zonal. For a zonal type
        with a ``zone`` field the zone name is returned; otherwise the value
        under ``urn_region_key`` is used, and ``"global"`` when neither is
        present. Only the part after the last '/' of the field is kept.
        """
        if cls.urn_zonal and "zone" in resource:
            return resource["zone"].rsplit("/", 1)[-1]

        if cls.urn_region_key in resource:
            return resource[cls.urn_region_key].rsplit("/", 1)[-1]

        return "global"

476 

477 

class ChildTypeInfo(TypeInfo):
    # Resource type description for resources that hang off a parent
    # resource; ``parent_spec`` names the parent and the field mappings.

    parent_spec = None

    @classmethod
    def get_parent_annotation_key(cls):
        """Return the key under which the parent is stored on a child resource."""
        return f"c7n:{cls.parent_spec['resource']}"

    @classmethod
    def get_parent(cls, resource):
        """Return the annotated parent resource."""
        annotation_key = cls.get_parent_annotation_key()
        return resource[annotation_key]

491 

492 

# Pre-compiled jmespath expressions for the fields read from an error payload.
ERROR_REASON = jmespath_compile('error.errors[0].reason')
ERROR_CODE = jmespath_compile('error.code')
ERROR_MESSAGE = jmespath_compile('error.message')


def extract_errors(e):
    """Return ``(reason, code, message)`` read from the JSON in ``e.content``.

    When the content cannot be parsed as JSON, the expressions are searched
    against ``None`` instead of a parsed payload.
    """
    try:
        payload = json.loads(e.content)
    except Exception:
        # Best effort: an unreadable body is treated as having no payload.
        payload = None

    return tuple(
        expression.search(payload)
        for expression in (ERROR_REASON, ERROR_CODE, ERROR_MESSAGE))

505 

506 

class GcpLocation:
    """
    Static table of locations and the services available in each of them.

    The keys of `_locations` are location names taken from
    `KMS <https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations/list>`_ and
    `App Engine <https://cloud.google.com/appengine/docs/admin-api/reference/rest/v1/apps.locations/list>`_;
    each value lists the names of the services the location is available for.
    """
    _locations = {
        'eur4': ['kms'],
        'global': ['kms'],
        'europe-west4': ['kms'],
        'asia-east2': ['appengine', 'kms'],
        'asia-east1': ['kms'],
        'asia': ['kms'],
        'europe-north1': ['kms'],
        'us-central1': ['kms'],
        'nam4': ['kms'],
        'asia-southeast1': ['kms'],
        'europe': ['kms'],
        'australia-southeast1': ['appengine', 'kms'],
        'us-central': ['appengine'],
        'asia-south1': ['appengine', 'kms'],
        'us-west1': ['kms'],
        'us-west2': ['appengine', 'kms'],
        'asia-northeast2': ['appengine', 'kms'],
        'asia-northeast1': ['appengine', 'kms'],
        'europe-west2': ['appengine', 'kms'],
        'europe-west3': ['appengine', 'kms'],
        'us-east4': ['appengine', 'kms'],
        'europe-west1': ['kms'],
        'europe-west6': ['appengine', 'kms'],
        'us': ['kms'],
        'us-east1': ['appengine', 'kms'],
        'northamerica-northeast1': ['appengine', 'kms'],
        'europe-west': ['appengine'],
        'southamerica-east1': ['appengine', 'kms'],
    }

    @classmethod
    def get_service_locations(cls, service):
        """
        Return the locations whose service list contains the given service.

        :param service: a string naming the service to look up locations for
        """
        return [
            location
            for location, services in GcpLocation._locations.items()
            if service in services
        ]