Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/query.py: 41%
297 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
4import json
5import itertools
6import logging
7import re
8import jmespath
10from googleapiclient.errors import HttpError
12from c7n.actions import ActionRegistry
13from c7n.filters import FilterRegistry
14from c7n.manager import ResourceManager
15from c7n.query import sources, MaxResourceLimit
16from c7n.utils import local_session, chunks, jmespath_search, jmespath_compile
# Module-level logger shared by the query helpers defined in this file.
log = logging.getLogger(
    'c7n_gcp.query'
)
class ResourceQuery:
    """Enumerate resources of a given type through its API client."""

    def __init__(self, session_factory):
        self.session_factory = session_factory

    def filter(self, resource_manager, **params):
        """Build the enumeration parameters for the manager's type and run it.

        Scope parameters (project and, when available, zone) are merged into
        ``params`` before the enumeration operation from ``enum_spec`` is
        invoked.
        """
        model = resource_manager.resource_type
        session = local_session(self.session_factory)
        client = session.client(model.service, model.version, model.component)

        # The scope of the resource type decides which parameters are needed.
        if model.scope in ('project', 'zone'):
            scope_value = session.get_default_project()
            if model.scope_template:
                scope_value = model.scope_template.format(scope_value)
            # Some APIs expect the project under another key (e.g. a parent).
            params[model.scope_key or 'project'] = scope_value

        if model.scope == 'zone' and session.get_default_zone():
            params['zone'] = session.get_default_zone()

        operation, result_path, extra = model.enum_spec
        if extra:
            params.update(extra)
        return self._invoke_client_enum(client, operation, params, result_path)

    def _invoke_client_enum(self, client, enum_op, params, path):
        """Run ``enum_op`` and extract the items found at ``path``."""
        if not client.supports_pagination(enum_op):
            response = client.execute_query(enum_op, verb_arguments=params)
            return jmespath_search(path, response)

        collected = []
        for page in client.execute_paged_query(enum_op, params):
            # Pages that yield nothing at ``path`` contribute no items.
            collected.extend(jmespath_search(path, page) or ())
        return collected
@sources.register('describe-gcp')
class DescribeSource:
    """Resource source that enumerates through a ``ResourceQuery``."""

    def __init__(self, manager):
        self.manager = manager
        self.query = ResourceQuery(manager.session_factory)

    def get_resources(self, query):
        """Enumerate resources, passing ``query`` entries as parameters."""
        params = {} if query is None else query
        return self.query.filter(self.manager, **params)

    def get_permissions(self):
        """Return the permissions needed to enumerate the resource type.

        Explicit ``permissions`` on the type win; otherwise a single
        ``service.component.method`` permission is derived from the type.
        """
        model = self.manager.resource_type
        if model.permissions:
            return model.permissions

        verb = model.enum_spec[0]
        if verb == 'aggregatedList':
            verb = 'list'

        component = model.component
        if '.' in component:
            # Only the last dotted segment is used in the permission name.
            _, _, component = component.rpartition('.')

        service = model.perm_service or model.service
        return (f"{service}.{component}.{verb}",)

    def augment(self, resources):
        """Return ``resources`` unchanged."""
        return resources
@sources.register('inventory')
class AssetInventory:
    """Resource source backed by the ``cloudasset`` service.

    Resources are located with a paged ``searchAll`` call and their full
    data is then retrieved in batches through ``batchGetAssetsHistory``.
    """

    permissions = ("cloudasset.assets.searchAllResources",
                   "cloudasset.assets.exportResource")

    def __init__(self, manager):
        self.manager = manager

    def get_resources(self, query):
        """Return the resource data of every asset matching ``query``.

        :param query: optional dict of ``searchAll`` parameters. Missing
            ``scope`` and ``assetTypes`` entries default to the session's
            default project and the manager's asset type. The dict passed
            in is never modified.
        :returns: list of resource dicts, each annotated under
            ``c7n:history`` with the asset's ``window`` and ``ancestors``.
        """
        session = local_session(self.manager.session_factory)
        # Work on a copy: callers may reuse the dict they passed in (for
        # instance as part of a cache key), so the defaults added below
        # must not leak back into it.
        query = {} if query is None else dict(query)
        if 'scope' not in query:
            query['scope'] = 'projects/%s' % session.get_default_project()
        if 'assetTypes' not in query:
            query['assetTypes'] = [self.manager.resource_type.asset_type]

        search_client = session.client('cloudasset', 'v1p1beta1', 'resources')
        resource_client = session.client('cloudasset', 'v1', 'v1')
        resources = []

        pages = list(search_client.execute_paged_query('searchAll', query))
        # A page without a 'results' key is treated as contributing no
        # matches instead of raising KeyError.
        matches = itertools.chain.from_iterable(
            page.get('results', ()) for page in pages)

        # Asset history is requested in batches of 100 names.
        for resource_set in chunks(matches, 100):
            rquery = {
                'parent': query['scope'],
                'contentType': 'RESOURCE',
                'assetNames': [r['name'] for r in resource_set]}
            response = resource_client.execute_query(
                'batchGetAssetsHistory', rquery)
            for history_result in response.get('assets', ()):
                asset = history_result['asset']
                resource = asset['resource']['data']
                resource['c7n:history'] = {
                    'window': history_result['window'],
                    'ancestors': asset['ancestors']}
                resources.append(resource)
        return resources

    def get_permissions(self):
        """Return the permissions this source requires."""
        return self.permissions

    def augment(self, resources):
        """Return ``resources`` unchanged."""
        return resources
class QueryMeta(type):
    """Metaclass giving each new resource class its own registries.

    A class body that does not define ``filter_registry`` or
    ``action_registry`` receives a fresh registry named after the
    lower-cased class name.
    """

    def __new__(cls, name, parents, attrs):
        lowered = name.lower()
        if 'filter_registry' not in attrs:
            attrs['filter_registry'] = FilterRegistry(f"{lowered}.filters")
        if 'action_registry' not in attrs:
            attrs['action_registry'] = ActionRegistry(f"{lowered}.actions")
        return super().__new__(cls, name, parents, attrs)
class QueryResourceManager(ResourceManager, metaclass=QueryMeta):
    """Resource manager that loads its resources through a pluggable source.

    The source is looked up by name from the ``source`` key of the policy
    data, defaulting to ``describe-gcp``.
    """

    # The resource manager type is injected by the PluginRegistry.register
    # decorator.
    type: str
    resource_type: 'TypeInfo'

    def __init__(self, ctx, data):
        super(QueryResourceManager, self).__init__(ctx, data)
        self.source = self.get_source(self.source_type)

    def get_permissions(self):
        """Return the permissions required by the configured source."""
        return self.source.get_permissions()

    def get_source(self, source_type):
        """Instantiate the source registered under ``source_type``."""
        return sources.get(source_type)(self)

    def get_client(self):
        """Return an API client for this manager's resource type."""
        return local_session(self.session_factory).client(
            self.resource_type.service,
            self.resource_type.version,
            self.resource_type.component)

    def get_model(self):
        """Return the resource type description."""
        return self.resource_type

    def get_cache_key(self, query):
        """Build the cache key for ``query`` against this resource type."""
        return {'source_type': self.source_type, 'query': query,
                'service': self.resource_type.service,
                'version': self.resource_type.version,
                'component': self.resource_type.component}

    def get_resource(self, resource_info):
        """Fetch a single resource via the resource type's ``get``."""
        return self.resource_type.get(self.get_client(), resource_info)

    @property
    def source_type(self):
        """Name of the source to use; ``describe-gcp`` unless overridden."""
        return self.data.get('source', 'describe-gcp')

    def get_resource_query(self):
        """Return the policy's ``query`` wrapped as a filter, or ``None``."""
        if 'query' in self.data:
            return {'filter': self.data.get('query')}
        return None

    def resources(self, query=None):
        """Return the filtered resources, using the cache when possible.

        :param query: optional query; when falsy the policy's own query
            (see ``get_resource_query``) is used.
        """
        q = query or self.get_resource_query()
        cache_key = self.get_cache_key(q)
        resources = None

        if self._cache.load():
            resources = self._cache.get(cache_key)
            if resources is not None:
                # Lazy logging arguments: nothing is formatted unless the
                # debug level is enabled.
                self.log.debug(
                    "Using cached %s.%s: %d",
                    self.__class__.__module__,
                    self.__class__.__name__,
                    len(resources))

        if resources is None:
            with self.ctx.tracer.subsegment('resource-fetch'):
                resources = self._fetch_resources(q)
            self._cache.save(cache_key, resources)

        self._cache.close()
        resource_count = len(resources)
        with self.ctx.tracer.subsegment('filter'):
            resources = self.filter_resources(resources)

        # Check resource limits if we're the current policy execution.
        if self.data == self.ctx.policy.data:
            self.check_resource_limit(len(resources), resource_count)
        return resources

    def check_resource_limit(self, selection_count, population_count):
        """Check if policy's execution affects more resources than its limit.
        """
        p = self.ctx.policy
        max_resource_limits = MaxResourceLimit(p, selection_count, population_count)
        return max_resource_limits.check_resource_limits()

    def _fetch_resources(self, query):
        """Fetch and augment resources from the source.

        Two kinds of ``HttpError`` are logged as warnings and turned into
        an empty list: a 403 whose message mentions ``disabled`` and an
        ``accessNotConfigured`` reason. Any other ``HttpError`` is
        re-raised.
        """
        try:
            return self.augment(self.source.get_resources(query)) or []
        except HttpError as e:
            error_reason, error_code, error_message = extract_errors(e)

            if error_reason is None and error_code is None:
                raise
            # The message may be missing from the payload; treat that as
            # "no match" so the original HttpError is re-raised below
            # rather than being masked by a TypeError.
            if error_code == 403 and 'disabled' in (error_message or ''):
                log.warning(error_message)
                return []
            elif error_reason == 'accessNotConfigured':
                log.warning(
                    "Resource:%s not available -> Service:%s not enabled on %s",
                    self.type,
                    self.resource_type.service,
                    local_session(self.session_factory).get_default_project())
                return []
            raise

    def augment(self, resources):
        """Return ``resources`` unchanged; subclasses may enrich them."""
        return resources

    def get_urns(self, resources):
        """Generate URNs for the resources.

        A Uniform Resource Name (URN) is a URI that identifies a resource by
        name in a particular namespace. A URN may be used to talk about a
        resource without implying its location or how to access it.

        The generated URNs can uniquely identify any given resource.

        The generated URN is intended to follow a similar pattern to ARN, but be
        specific to GCP.

            gcp:<service>:<region>:<project>:<resource-type>/<resource-id>

        If the region is "global" then it is omitted from the URN.
        """
        return self.resource_type.get_urns(
            resources, local_session(self.session_factory).project_id)
class ChildResourceManager(QueryResourceManager):
    """Manager for resources that are enumerated per parent resource.

    The relationship is described by the resource type's ``parent_spec``.
    """

    def get_resource(self, resource_info):
        """Fetch a single child and annotate it with its parent resource."""
        child_instance = super(ChildResourceManager, self).get_resource(resource_info)

        parent_resource = self.resource_type.parent_spec['resource']
        parent_instance = self.get_resource_manager(parent_resource).get_resource(
            self._get_parent_resource_info(child_instance)
        )

        annotation_key = self.resource_type.get_parent_annotation_key()
        child_instance[annotation_key] = parent_instance

        return child_instance

    def _fetch_resources(self, query):
        """Enumerate the children of every parent resource.

        :param query: optional dict of enumeration parameters. It is
            copied, so the dict passed in is never modified.
        :returns: list of children, each annotated with its parent.
        """
        # Copy before adding per-parent arguments: the caller may still be
        # holding this dict (for instance inside a cache key).
        query = dict(query) if query else {}

        resources = []
        annotation_key = self.resource_type.get_parent_annotation_key()
        parent_query = self.get_parent_resource_query()
        parent_resource_manager = self.get_resource_manager(
            resource_type=self.resource_type.parent_spec['resource'],
            data=({'query': parent_query} if parent_query else {})
        )

        for parent_instance in parent_resource_manager.resources():
            query.update(self._get_child_enum_args(parent_instance))
            children = super(ChildResourceManager, self)._fetch_resources(query)

            for child_instance in children:
                child_instance[annotation_key] = parent_instance

            resources.extend(children)

        return resources

    def _get_parent_resource_info(self, child_instance):
        """Extract the parameters needed to fetch the parent of a child."""
        mappings = self.resource_type.parent_spec['parent_get_params']
        return self._extract_fields(child_instance, mappings)

    def _get_child_enum_args(self, parent_instance):
        """Extract the enumeration parameters for a parent's children."""
        mappings = self.resource_type.parent_spec['child_enum_params']
        return self._extract_fields(parent_instance, mappings)

    def get_parent_resource_query(self):
        """Return the policy's ``query`` for the parent, if enabled.

        The query is forwarded only when ``parent_spec`` sets a truthy
        ``use_child_query``; otherwise ``None`` is returned.
        """
        enabled = self.resource_type.parent_spec.get('use_child_query', False)
        if enabled and 'query' in self.data:
            return self.data.get('query')
        return None

    @staticmethod
    def _extract_fields(source, mappings):
        """Map values found in ``source`` onto new keys.

        Each mapping is a sequence whose first item is a jmespath
        expression evaluated against ``source`` and whose second item is
        the key under which the value is stored. A mapping that also
        contains the item ``'regex'`` supplies a pattern as its fourth
        item; the stored value is then the pattern's first group.

        :raises AttributeError: if such a pattern does not match.
        """
        result = {}

        for mapping in mappings:
            expression, target_key = mapping[0], mapping[1]
            result[target_key] = jmespath.search(expression, source)
            # Support for regex in child_enum_params.
            # Without this support you could only map parent-child elements with the raw data
            # they hold, but with regex you could regex that data as well while you map.
            if 'regex' in mapping:
                result[target_key] = re.search(mapping[3], result[target_key]).group(1)

        return result
class RegionalResourceManager(ChildResourceManager):
    """Child manager whose parent query is derived from configured regions."""

    def get_parent_resource_query(self):
        """Return one ``{'name': region}`` entry per selected region.

        An explicit region list that does not contain ``'all'`` wins over
        the single configured region; ``None`` is returned when neither
        applies.
        """
        regions = self.config.regions
        if regions and 'all' not in regions:
            return [{'name': region_name} for region_name in regions]
        if self.config.region:
            return [{'name': self.config.region}]
        return None
class TypeMeta(type):
    """Metaclass providing a descriptive ``repr`` for resource type classes."""

    def __repr__(cls):
        details = (cls.service, cls.component, cls.scope, cls.version)
        return "<TypeInfo service:%s component:%s scope:%s version:%s>" % details
class TypeInfo(metaclass=TypeMeta):
    """Declarative description of a resource type.

    Subclasses override the class attributes below; the classmethods
    derive metric names and URNs from them.
    """

    # api client construction information
    service = None
    version = None
    component = None

    # resource enumeration parameters

    scope = 'project'
    enum_spec = ('list', 'items[]', None)
    # ie. when project is passed instead as parent
    scope_key = None
    # custom formatting for scope key
    scope_template = None

    # individual resource retrieval method, for serverless policies.
    get = None
    # for get methods that require the full event payload
    get_requires_event = False
    perm_service = None
    permissions = ()

    labels = False
    labels_op = 'setLabels'

    # required for reporting
    id = None
    name = None
    default_report_fields = ()

    # cloud asset inventory type
    asset_type = None

    # URN generation
    urn_region_key = 'region'
    # A jmespath into the resource object to find the id element of the URN.
    # If unset, it uses the value for id.
    urn_id_path = None
    # It is frequent enough that the id we want for the URN is made up of one or more
    # path segments from the id. Ids are frequently '/' delimited strings.
    # If set, this should be an iterable of integer indices into the segments.
    urn_id_segments = None
    # By default the component is taken for the URN. Can be overridden by specifying
    # a specific urn_component.
    urn_component = None
    # Truly global resources should override this to a falsy value (e.g. the
    # empty string) so that the project is left out of the URN.
    urn_has_project = True
    # The location element is a zone, not a region.
    urn_zonal = False

    # If the type supports refreshing an individual resource
    refresh = None

    @classmethod
    def get_metric_resource_name(cls, resource):
        """Return the value stored under the type's ``name`` key."""
        return resource.get(cls.name)

    @classmethod
    def get_urns(cls, resources, project_id):
        """Generate URNs for the resources.

        A Uniform Resource Name (URN) is a URI that identifies a resource by
        name in a particular namespace. A URN may be used to talk about a
        resource without implying its location or how to access it.

        The generated URNs can uniquely identify any given resource.

        The generated URN is intended to follow a similar pattern to ARN, but be
        specific to GCP.

            gcp:<service>:<location>:<project>:<resource-type>/<resource-id>

        If the location is "global" then it is omitted from the URN.
        """
        return [cls._get_urn(r, project_id) for r in resources]

    @classmethod
    def _get_urn(cls, resource, project_id) -> str:
        """Generate a URN for the resource."""
        location = cls._get_location(resource)
        if location == "global":
            location = ""
        urn_id = cls._get_urn_id(resource)
        if not cls.urn_has_project:
            project_id = ""
        # Fall back to `component` when no explicit urn_component is set, so
        # an unset value never renders as the literal text "None".
        # NOTE: not sure whether to use `component` or just the last part of
        # `component` (split on '.') for the part after project
        component = cls.urn_component
        if component is None:
            component = cls.component
        return f"gcp:{cls.service}:{location}:{project_id}:{component}/{urn_id}"

    @classmethod
    def _get_urn_id(cls, resource):
        """Return the id element of the URN for ``resource``.

        The value is looked up with ``urn_id_path`` (or ``id`` when that is
        unset). When ``urn_id_segments`` is set, only those '/'-separated
        segments of the value are kept.
        """
        path = cls.urn_id_path
        if path is None:
            path = cls.id
        urn_id = jmespath_search(path, resource)
        if cls.urn_id_segments:
            parts = urn_id.split('/')
            urn_id = '/'.join([parts[index] for index in cls.urn_id_segments])
        return urn_id

    @classmethod
    def _get_location(cls, resource):
        """Get the location for a single resource.

        Resources are either global, regional, or zonal. For a zonal type
        the last path segment of the resource's ``zone`` is returned;
        otherwise the last path segment of the value under
        ``urn_region_key``; and "global" when neither is present.
        """
        if cls.urn_zonal and "zone" in resource:
            return resource["zone"].rsplit("/", 1)[-1]

        if cls.urn_region_key in resource:
            return resource[cls.urn_region_key].rsplit("/", 1)[-1]

        return "global"
class ChildTypeInfo(TypeInfo):
    """Type description for resources that belong to a parent resource."""

    parent_spec = None

    @classmethod
    def get_parent_annotation_key(cls):
        """Return the key under which a child stores its parent resource."""
        return f"c7n:{cls.parent_spec['resource']}"

    @classmethod
    def get_parent(cls, resource):
        """Return the annotated parent resource."""
        annotation_key = cls.get_parent_annotation_key()
        return resource[annotation_key]
# Pre-compiled jmespath expressions for the fields read from an error payload.
ERROR_REASON, ERROR_CODE, ERROR_MESSAGE = (
    jmespath_compile('error.errors[0].reason'),
    jmespath_compile('error.code'),
    jmespath_compile('error.message'),
)
def extract_errors(e):
    """Return the ``(reason, code, message)`` found in an error's content.

    The content is parsed as JSON; when parsing fails for any reason the
    expressions are evaluated against ``None`` instead.
    """
    try:
        payload = json.loads(e.content)
    except Exception:
        # Best effort: an unparseable body is searched as ``None``.
        payload = None

    expressions = (ERROR_REASON, ERROR_CODE, ERROR_MESSAGE)
    return tuple(expression.search(payload) for expression in expressions)
class GcpLocation:
    """
    Lookup table of locations and the services available in each of them.

    The keys of `_locations` are location names taken from
    `KMS <https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations/list>`_ and
    `App Engine <https://cloud.google.com/appengine/docs/admin-api/reference/rest/v1
    /apps.locations/list>`_; each value lists the names of the services
    the location is available for.
    """
    _locations = {
        'eur4': ['kms'],
        'global': ['kms'],
        'europe-west4': ['kms'],
        'asia-east2': ['appengine', 'kms'],
        'asia-east1': ['kms'],
        'asia': ['kms'],
        'europe-north1': ['kms'],
        'us-central1': ['kms'],
        'nam4': ['kms'],
        'asia-southeast1': ['kms'],
        'europe': ['kms'],
        'australia-southeast1': ['appengine', 'kms'],
        'us-central': ['appengine'],
        'asia-south1': ['appengine', 'kms'],
        'us-west1': ['kms'],
        'us-west2': ['appengine', 'kms'],
        'asia-northeast2': ['appengine', 'kms'],
        'asia-northeast1': ['appengine', 'kms'],
        'europe-west2': ['appengine', 'kms'],
        'europe-west3': ['appengine', 'kms'],
        'us-east4': ['appengine', 'kms'],
        'europe-west1': ['kms'],
        'europe-west6': ['appengine', 'kms'],
        'us': ['kms'],
        'us-east1': ['appengine', 'kms'],
        'northamerica-northeast1': ['appengine', 'kms'],
        'europe-west': ['appengine'],
        'southamerica-east1': ['appengine', 'kms'],
    }

    @classmethod
    def get_service_locations(cls, service):
        """
        Returns a list of the locations whose service list contains the given service.

        :param service: a string representing the name of a service locations are queried for
        """
        return [
            name
            for name, services in GcpLocation._locations.items()
            if service in services
        ]