Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n_gcp/query.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

299 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4import json 

5import itertools 

6import logging 

7import re 

8import jmespath 

9 

10from googleapiclient.errors import HttpError 

11 

12from c7n.actions import ActionRegistry 

13from c7n.filters import FilterRegistry 

14from c7n.manager import ResourceManager 

15from c7n.query import sources, MaxResourceLimit 

16from c7n.utils import local_session, chunks, jmespath_search, jmespath_compile 

17 

18 

19log = logging.getLogger('c7n_gcp.query') 

20 

21 

22class ResourceQuery: 

23 

24 def __init__(self, session_factory): 

25 self.session_factory = session_factory 

26 

27 def filter(self, resource_manager, **params): 

28 m = resource_manager.resource_type 

29 session = local_session(self.session_factory) 

30 client = session.client( 

31 m.service, m.version, m.component) 

32 

33 # depends on resource scope 

34 if m.scope in ('project', 'zone'): 

35 project = session.get_default_project() 

36 if m.scope_template: 

37 project = m.scope_template.format(project) 

38 if m.scope_key: 

39 params[m.scope_key] = project 

40 else: 

41 params['project'] = project 

42 

43 if m.scope == 'zone': 

44 if session.get_default_zone(): 

45 params['zone'] = session.get_default_zone() 

46 

47 enum_op, path, extra_args = m.enum_spec 

48 if extra_args: 

49 params.update(extra_args) 

50 return self._invoke_client_enum( 

51 client, enum_op, params, path) 

52 

53 def _invoke_client_enum(self, client, enum_op, params, path): 

54 if client.supports_pagination(enum_op): 

55 results = [] 

56 for page in client.execute_paged_query(enum_op, params): 

57 page_items = jmespath_search(path, page) 

58 if page_items: 

59 results.extend(page_items) 

60 return results 

61 else: 

62 return jmespath_search(path, 

63 client.execute_query(enum_op, verb_arguments=params)) 

64 

65 

66@sources.register('describe-gcp') 

67class DescribeSource: 

68 

69 def __init__(self, manager): 

70 self.manager = manager 

71 self.query = ResourceQuery(manager.session_factory) 

72 

73 def get_resources(self, query): 

74 if query is None: 

75 query = {} 

76 return self.query.filter(self.manager, **query) 

77 

78 def get_permissions(self): 

79 m = self.manager.resource_type 

80 if m.permissions: 

81 return m.permissions 

82 method = m.enum_spec[0] 

83 if method == 'aggregatedList': 

84 method = 'list' 

85 component = m.component 

86 if '.' in component: 

87 component = component.split('.')[-1] 

88 return ("%s.%s.%s" % ( 

89 m.perm_service or m.service, component, method),) 

90 

91 def augment(self, resources): 

92 return resources 

93 

94 

95@sources.register('inventory') 

96class AssetInventory: 

97 

98 permissions = ("cloudasset.assets.searchAllResources", 

99 "cloudasset.assets.exportResource") 

100 

101 def __init__(self, manager): 

102 self.manager = manager 

103 

104 def get_resources(self, query): 

105 session = local_session(self.manager.session_factory) 

106 if query is None: 

107 query = {} 

108 if 'scope' not in query: 

109 query['scope'] = 'projects/%s' % session.get_default_project() 

110 if 'assetTypes' not in query: 

111 query['assetTypes'] = [self.manager.resource_type.asset_type] 

112 

113 search_client = session.client('cloudasset', 'v1p1beta1', 'resources') 

114 resource_client = session.client('cloudasset', 'v1', 'v1') 

115 resources = [] 

116 

117 results = list(search_client.execute_paged_query('searchAll', query)) 

118 for resource_set in chunks(itertools.chain(*[rs['results'] for rs in results]), 100): 

119 rquery = { 

120 'parent': query['scope'], 

121 'contentType': 'RESOURCE', 

122 'assetNames': [r['name'] for r in resource_set]} 

123 for history_result in resource_client.execute_query( 

124 'batchGetAssetsHistory', rquery).get('assets', ()): 

125 resource = history_result['asset']['resource']['data'] 

126 resource['c7n:history'] = { 

127 'window': history_result['window'], 

128 'ancestors': history_result['asset']['ancestors']} 

129 resources.append(resource) 

130 return resources 

131 

132 def get_permissions(self): 

133 return self.permissions 

134 

135 def augment(self, resources): 

136 return resources 

137 

138 

139class QueryMeta(type): 

140 """metaclass to have consistent action/filter registry for new resources.""" 

141 def __new__(cls, name, parents, attrs): 

142 if 'filter_registry' not in attrs: 

143 attrs['filter_registry'] = FilterRegistry( 

144 '%s.filters' % name.lower()) 

145 if 'action_registry' not in attrs: 

146 attrs['action_registry'] = ActionRegistry( 

147 '%s.actions' % name.lower()) 

148 

149 return super(QueryMeta, cls).__new__(cls, name, parents, attrs) 

150 

151 

152class QueryResourceManager(ResourceManager, metaclass=QueryMeta): 

153 # The resource manager type is injected by the PluginRegistry.register 

154 # decorator. 

155 type: str 

156 resource_type: 'TypeInfo' 

157 

158 def __init__(self, ctx, data): 

159 super(QueryResourceManager, self).__init__(ctx, data) 

160 self.source = self.get_source(self.source_type) 

161 

162 def get_permissions(self): 

163 return self.source.get_permissions() 

164 

165 def get_source(self, source_type): 

166 return sources.get(source_type)(self) 

167 

168 def get_client(self): 

169 return local_session(self.session_factory).client( 

170 self.resource_type.service, 

171 self.resource_type.version, 

172 self.resource_type.component) 

173 

174 def get_model(self): 

175 return self.resource_type 

176 

177 def get_cache_key(self, query): 

178 return {'source_type': self.source_type, 'query': query, 

179 'service': self.resource_type.service, 

180 'version': self.resource_type.version, 

181 'component': self.resource_type.component} 

182 

183 def get_resource(self, resource_info): 

184 return self.resource_type.get(self.get_client(), resource_info) 

185 

186 @property 

187 def source_type(self): 

188 return self.data.get('source', 'describe-gcp') 

189 

190 def get_resource_query(self): 

191 if 'query' in self.data: 

192 return {'filter': self.data.get('query')} 

193 

194 def resources(self, query=None): 

195 q = query or self.get_resource_query() 

196 cache_key = self.get_cache_key(q) 

197 resources = None 

198 

199 if self._cache.load(): 

200 resources = self._cache.get(cache_key) 

201 if resources is not None: 

202 self.log.debug("Using cached %s: %d" % ( 

203 "%s.%s" % (self.__class__.__module__, 

204 self.__class__.__name__), 

205 len(resources))) 

206 

207 if resources is None: 

208 with self.ctx.tracer.subsegment('resource-fetch'): 

209 resources = self._fetch_resources(q) 

210 self._cache.save(cache_key, resources) 

211 

212 self._cache.close() 

213 resource_count = len(resources) 

214 with self.ctx.tracer.subsegment('filter'): 

215 resources = self.filter_resources(resources) 

216 

217 # Check resource limits if we're the current policy execution. 

218 if self.data == self.ctx.policy.data: 

219 self.check_resource_limit(len(resources), resource_count) 

220 return resources 

221 

222 def check_resource_limit(self, selection_count, population_count): 

223 """Check if policy's execution affects more resources then its limit. 

224 """ 

225 p = self.ctx.policy 

226 max_resource_limits = MaxResourceLimit(p, selection_count, population_count) 

227 return max_resource_limits.check_resource_limits() 

228 

229 def _fetch_resources(self, query): 

230 try: 

231 return self.augment(self.source.get_resources(query)) or [] 

232 except HttpError as e: 

233 error_reason, error_code, error_message = extract_errors(e) 

234 

235 if error_reason is None and error_code is None: 

236 raise 

237 if error_code == 403 and 'disabled' in error_message: 

238 log.warning(error_message) 

239 return [] 

240 elif error_reason == 'accessNotConfigured': 

241 log.warning( 

242 "Resource:%s not available -> Service:%s not enabled on %s", 

243 self.type, 

244 self.resource_type.service, 

245 local_session(self.session_factory).get_default_project()) 

246 return [] 

247 raise 

248 

249 def augment(self, resources): 

250 return resources 

251 

252 def get_urns(self, resources): 

253 """Generate URNs for the resources. 

254 

255 A Uniform Resource Name (URN) is a URI that identifies a resource by 

256 name in a particular namespace. A URN may be used to talk about a 

257 resource without implying its location or how to access it. 

258 

259 The generated URNs can uniquely identify any given resource. 

260 

261 The generated URN is intended to follow a similar pattern to ARN, but be 

262 specific to GCP. 

263 

264 gcp:<service>:<region>:<project>:<resource-type>/<resource-id> 

265 

266 If the region is "global" then it is omitted from the URN. 

267 """ 

268 return self.resource_type.get_urns( 

269 resources, local_session(self.session_factory).project_id) 

270 

271 

272class ChildResourceManager(QueryResourceManager): 

273 

274 def get_resource(self, resource_info): 

275 child_instance = super(ChildResourceManager, self).get_resource(resource_info) 

276 

277 parent_resource = self.resource_type.parent_spec['resource'] 

278 parent_instance = self.get_resource_manager(parent_resource).get_resource( 

279 self._get_parent_resource_info(child_instance) 

280 ) 

281 

282 annotation_key = self.resource_type.get_parent_annotation_key() 

283 child_instance[annotation_key] = parent_instance 

284 

285 return child_instance 

286 

287 def _fetch_resources(self, query): 

288 if not query: 

289 query = {} 

290 

291 resources = [] 

292 annotation_key = self.resource_type.get_parent_annotation_key() 

293 parent_query = self.get_parent_resource_query() 

294 parent_resource_manager = self.get_resource_manager( 

295 resource_type=self.resource_type.parent_spec['resource'], 

296 data=({'query': parent_query} if parent_query else {}) 

297 ) 

298 

299 for parent_instance in parent_resource_manager.resources(): 

300 query.update(self._get_child_enum_args(parent_instance)) 

301 children = super(ChildResourceManager, self)._fetch_resources(query) 

302 

303 for child_instance in children: 

304 child_instance[annotation_key] = parent_instance 

305 

306 resources.extend(children) 

307 

308 return resources 

309 

310 def _get_parent_resource_info(self, child_instance): 

311 mappings = self.resource_type.parent_spec['parent_get_params'] 

312 return self._extract_fields(child_instance, mappings) 

313 

314 def _get_child_enum_args(self, parent_instance): 

315 mappings = self.resource_type.parent_spec['child_enum_params'] 

316 return self._extract_fields(parent_instance, mappings) 

317 

318 def get_parent_resource_query(self): 

319 parent_spec = self.resource_type.parent_spec 

320 enabled = parent_spec['use_child_query'] if 'use_child_query' in parent_spec else False 

321 if enabled and 'query' in self.data: 

322 return self.data.get('query') 

323 

324 @staticmethod 

325 def _extract_fields(source, mappings): 

326 result = {} 

327 

328 for mapping in mappings: 

329 result[mapping[1]] = jmespath.search(mapping[0], source) 

330 # Support for regex in child_enum_params. 

331 # Without this support you could only map parent-child elements with the raw data 

332 # they hold, but with regex you could regex that data as well while you map. 

333 if 'regex' in mapping: 

334 result[mapping[1]] = re.search(mapping[3], result[mapping[1]]).group(1) 

335 

336 return result 

337 

338 

339class RegionalResourceManager(ChildResourceManager): 

340 

341 def get_parent_resource_query(self): 

342 query = None 

343 if self.config.regions and 'all' not in self.config.regions: 

344 query = [{'name': r} for r in self.config.regions] 

345 elif self.config.region: 

346 query = [{'name': self.config.region}] 

347 return query 

348 

349 

350class TypeMeta(type): 

351 

352 def __repr__(cls): 

353 return "<TypeInfo service:%s component:%s scope:%s version:%s>" % ( 

354 cls.service, 

355 cls.component, 

356 cls.scope, 

357 cls.version) 

358 

359 

360class TypeInfo(metaclass=TypeMeta): 

361 

362 # api client construction information 

363 service = None 

364 version = None 

365 component = None 

366 

367 # resource enumeration parameters 

368 

369 scope = 'project' 

370 enum_spec = ('list', 'items[]', None) 

371 # ie. when project is passed instead as parent 

372 scope_key = None 

373 # custom formatting for scope key 

374 scope_template = None 

375 

376 # individual resource retrieval method, for serverless policies. 

377 get = None 

378 # for get methods that require the full event payload 

379 get_requires_event = False 

380 perm_service = None 

381 permissions = () 

382 

383 labels = False 

384 labels_op = 'setLabels' 

385 

386 # required for reporting 

387 id = None 

388 name = None 

389 default_report_fields = () 

390 

391 # cloud asset inventory type 

392 asset_type = None 

393 

394 # URN generation 

395 urn_region_key = 'region' 

396 # A jmespath into the resource object to find the id element of the URN. 

397 # If unset, it uses the value for id. 

398 urn_id_path = None 

399 # It is frequent enough that the id we want for the URN is made up of one or more 

400 # path segments from the id. Ids are frequently '/' delimited strings. 

401 # If set, this should be an iterable of integer indices into the segments. 

402 urn_id_segments = None 

403 # By default the component is taken for the URN. Can be overridden by specifying 

404 # a specific urn_component. 

405 urn_component = None 

406 # Truly global resources should override this to the empty string. 

407 urn_has_project = True 

408 # The location element is a zone, not a region. 

409 urn_zonal = False 

410 

411 # If the type supports refreshing an individual resource 

412 refresh = None 

413 

414 # Some resources don't support monitoring (and hence, no filtering). 

415 # Assume they generally can, and flip this to `False` for those that don't. 

416 allow_metrics_filters = True 

417 

418 @classmethod 

419 def get_metric_resource_name(cls, resource): 

420 return resource.get(cls.name) 

421 

422 @classmethod 

423 def get_urns(cls, resources, project_id): 

424 """Generate URNs for the resources. 

425 

426 A Uniform Resource Name (URN) is a URI that identifies a resource by 

427 name in a particular namespace. A URN may be used to talk about a 

428 resource without implying its location or how to access it. 

429 

430 The generated URNs can uniquely identify any given resource. 

431 

432 The generated URN is intended to follow a similar pattern to ARN, but be 

433 specific to GCP. 

434 

435 gcp:<service>:<location>:<project>:<resource-type>/<resource-id> 

436 

437 If the region is "global" then it is omitted from the URN. 

438 """ 

439 return [cls._get_urn(r, project_id) for r in resources] 

440 

441 @classmethod 

442 def _get_urn(cls, resource, project_id) -> str: 

443 "Generate an URN for the resource." 

444 location = cls._get_location(resource) 

445 if location == "global": 

446 location = "" 

447 id = cls._get_urn_id(resource) 

448 if not cls.urn_has_project: 

449 project_id = "" 

450 # NOTE: not sure whether to use `component` or just the last part of 

451 # `component` (split on '.') for the part after project 

452 return f"gcp:{cls.service}:{location}:{project_id}:{cls.urn_component}/{id}" 

453 

454 @classmethod 

455 def _get_urn_id(cls, resource): 

456 path = cls.urn_id_path 

457 if path is None: 

458 path = cls.id 

459 id = jmespath_search(path, resource) 

460 if cls.urn_id_segments: 

461 parts = id.split('/') 

462 id = '/'.join([parts[index] for index in cls.urn_id_segments]) 

463 return id 

464 

465 @classmethod 

466 def _get_location(cls, resource): 

467 """Get the region for a single resource. 

468 

469 Resources are either global, regional, or zonal. When a resource is 

470 is zonal, the region is determined from the zone. 

471 """ 

472 if cls.urn_zonal and "zone" in resource: 

473 zone = resource["zone"].rsplit("/", 1)[-1] 

474 return zone 

475 

476 if cls.urn_region_key in resource: 

477 return resource[cls.urn_region_key].rsplit("/", 1)[-1] 

478 

479 return "global" 

480 

481 

482class ChildTypeInfo(TypeInfo): 

483 

484 parent_spec = None 

485 

486 @classmethod 

487 def get_parent_annotation_key(cls): 

488 parent_resource = cls.parent_spec['resource'] 

489 return 'c7n:{}'.format(parent_resource) 

490 

491 @classmethod 

492 def get_parent(cls, resource): 

493 "Return the annotated parent resource." 

494 return resource[cls.get_parent_annotation_key()] 

495 

496 

497ERROR_REASON = jmespath_compile('error.errors[0].reason') 

498ERROR_CODE = jmespath_compile('error.code') 

499ERROR_MESSAGE = jmespath_compile('error.message') 

500 

501 

502def extract_errors(e): 

503 try: 

504 edata = json.loads(e.content) 

505 except Exception: 

506 edata = None 

507 

508 return ERROR_REASON.search(edata), ERROR_CODE.search(edata), ERROR_MESSAGE.search(edata) 

509 

510 

511class GcpLocation: 

512 """ 

513 The `_locations` dict is formed by the string keys representing locations taken from 

514 `KMS <https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations/list>`_ and 

515 `App Engine <https://cloud.google.com/appengine/docs/admin-api/reference/rest/v1 

516 /apps.locations/list>`_ and list values containing the string names of the services 

517 the locations are available for. 

518 """ 

519 _locations = {'eur4': ['kms'], 

520 'global': ['kms'], 

521 'europe-west4': ['kms'], 

522 'asia-east2': ['appengine', 'kms'], 

523 'asia-east1': ['kms'], 

524 'asia': ['kms'], 

525 'europe-north1': ['kms'], 

526 'us-central1': ['kms'], 

527 'nam4': ['kms'], 

528 'asia-southeast1': ['kms'], 

529 'europe': ['kms'], 

530 'australia-southeast1': ['appengine', 'kms'], 

531 'us-central': ['appengine'], 

532 'asia-south1': ['appengine', 'kms'], 

533 'us-west1': ['kms'], 

534 'us-west2': ['appengine', 'kms'], 

535 'asia-northeast2': ['appengine', 'kms'], 

536 'asia-northeast1': ['appengine', 'kms'], 

537 'europe-west2': ['appengine', 'kms'], 

538 'europe-west3': ['appengine', 'kms'], 

539 'us-east4': ['appengine', 'kms'], 

540 'europe-west1': ['kms'], 

541 'europe-west6': ['appengine', 'kms'], 

542 'us': ['kms'], 

543 'us-east1': ['appengine', 'kms'], 

544 'northamerica-northeast1': ['appengine', 'kms'], 

545 'europe-west': ['appengine'], 

546 'southamerica-east1': ['appengine', 'kms']} 

547 

548 @classmethod 

549 def get_service_locations(cls, service): 

550 """ 

551 Returns a list of the locations that have a given service in associated value lists. 

552 

553 :param service: a string representing the name of a service locations are queried for 

554 """ 

555 return [location for location in GcpLocation._locations 

556 if service in GcpLocation._locations[location]]