Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n_gcp/query.py: 41%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
4import json
5import itertools
6import logging
7import re
8import jmespath
10from googleapiclient.errors import HttpError
12from c7n.actions import ActionRegistry
13from c7n.filters import FilterRegistry
14from c7n.manager import ResourceManager
15from c7n.query import sources, MaxResourceLimit
16from c7n.utils import local_session, chunks, jmespath_search, jmespath_compile
19log = logging.getLogger('c7n_gcp.query')
class ResourceQuery:
    """Enumerate resources of a resource type through its API client."""

    def __init__(self, session_factory):
        self.session_factory = session_factory

    def filter(self, resource_manager, **params):
        """Run the enumeration call described by the manager's resource type.

        ``params`` is extended with the scope parameters (project, and zone
        when one is configured) and with the extra arguments carried by the
        type's ``enum_spec`` before the client is invoked.
        """
        model = resource_manager.resource_type
        session = local_session(self.session_factory)
        client = session.client(model.service, model.version, model.component)

        # Project and zone scoped types both need the project parameter.
        if model.scope in ('project', 'zone'):
            project = session.get_default_project()
            if model.scope_template:
                project = model.scope_template.format(project)
            # Some APIs take the project under another name (e.g. a parent key).
            project_param = model.scope_key if model.scope_key else 'project'
            params[project_param] = project

        if model.scope == 'zone' and session.get_default_zone():
            params['zone'] = session.get_default_zone()

        enum_op, path, extra_args = model.enum_spec
        if extra_args:
            params.update(extra_args)
        return self._invoke_client_enum(client, enum_op, params, path)

    def _invoke_client_enum(self, client, enum_op, params, path):
        """Execute ``enum_op`` and extract the items found at ``path``.

        Paginated operations return a list accumulated over all pages;
        non-paginated operations return the raw search result.
        """
        if not client.supports_pagination(enum_op):
            response = client.execute_query(enum_op, verb_arguments=params)
            return jmespath_search(path, response)

        collected = []
        for page in client.execute_paged_query(enum_op, params):
            items = jmespath_search(path, page)
            if items:
                collected.extend(items)
        return collected
@sources.register('describe-gcp')
class DescribeSource:
    """Resource source that enumerates resources through ``ResourceQuery``."""

    def __init__(self, manager):
        self.manager = manager
        self.query = ResourceQuery(manager.session_factory)

    def get_resources(self, query):
        """Enumerate the manager's resources, passing ``query`` as parameters."""
        params = {} if query is None else query
        return self.query.filter(self.manager, **params)

    def get_permissions(self):
        """Return the type's declared permissions, or derive one from its enum call."""
        model = self.manager.resource_type
        if model.permissions:
            return model.permissions

        # aggregatedList is authorized by the plain list permission.
        method = model.enum_spec[0]
        if method == 'aggregatedList':
            method = 'list'

        # Only the last segment of a dotted component is used.
        component = model.component
        if '.' in component:
            component = component.split('.')[-1]

        service = model.perm_service or model.service
        return (f"{service}.{component}.{method}",)

    def augment(self, resources):
        return resources
@sources.register('inventory')
class AssetInventory:
    """Resource source backed by the ``cloudasset`` API.

    Resources are located with a ``searchAll`` query and their data is then
    fetched in batches through ``batchGetAssetsHistory``.
    """

    permissions = ("cloudasset.assets.searchAllResources",
                   "cloudasset.assets.exportResource")

    def __init__(self, manager):
        self.manager = manager

    def get_resources(self, query):
        """Return the resource data for every asset matching ``query``.

        ``scope`` defaults to the session's default project and ``assetTypes``
        to the manager's asset type. Each returned resource is annotated with
        a ``c7n:history`` entry holding the history window and ancestors.
        """
        session = local_session(self.manager.session_factory)
        # Work on a copy: the caller's dict may also be part of a cache key,
        # so filling in defaults must not alter it.
        query = dict(query) if query else {}
        if 'scope' not in query:
            query['scope'] = 'projects/%s' % session.get_default_project()
        if 'assetTypes' not in query:
            query['assetTypes'] = [self.manager.resource_type.asset_type]

        search_client = session.client('cloudasset', 'v1p1beta1', 'resources')
        resource_client = session.client('cloudasset', 'v1', 'v1')
        resources = []

        pages = list(search_client.execute_paged_query('searchAll', query))
        # A page without matches may not carry a 'results' key at all.
        found = itertools.chain.from_iterable(
            page.get('results', ()) for page in pages)
        for resource_set in chunks(found, 100):
            rquery = {
                'parent': query['scope'],
                'contentType': 'RESOURCE',
                'assetNames': [r['name'] for r in resource_set]}
            history = resource_client.execute_query('batchGetAssetsHistory', rquery)
            for history_result in history.get('assets', ()):
                resource = history_result['asset']['resource']['data']
                resource['c7n:history'] = {
                    'window': history_result['window'],
                    'ancestors': history_result['asset']['ancestors']}
                resources.append(resource)
        return resources

    def get_permissions(self):
        return self.permissions

    def augment(self, resources):
        return resources
class QueryMeta(type):
    """Metaclass giving each new resource class its own filter/action registries."""

    def __new__(cls, name, parents, attrs):
        # Registries declared explicitly on the class body are left alone.
        for attr_name, registry_cls, suffix in (
                ('filter_registry', FilterRegistry, 'filters'),
                ('action_registry', ActionRegistry, 'actions')):
            if attr_name not in attrs:
                attrs[attr_name] = registry_cls(
                    '%s.%s' % (name.lower(), suffix))

        return super().__new__(cls, name, parents, attrs)
class QueryResourceManager(ResourceManager, metaclass=QueryMeta):
    """Resource manager that loads its resources through a pluggable source."""

    # The resource manager type is injected by the PluginRegistry.register
    # decorator.
    type: str
    resource_type: 'TypeInfo'

    def __init__(self, ctx, data):
        super(QueryResourceManager, self).__init__(ctx, data)
        self.source = self.get_source(self.source_type)

    def get_permissions(self):
        return self.source.get_permissions()

    def get_source(self, source_type):
        """Instantiate the registered source named ``source_type`` for this manager."""
        return sources.get(source_type)(self)

    def get_client(self):
        """Return an API client for this manager's service/version/component."""
        return local_session(self.session_factory).client(
            self.resource_type.service,
            self.resource_type.version,
            self.resource_type.component)

    def get_model(self):
        return self.resource_type

    def get_cache_key(self, query):
        return {'source_type': self.source_type, 'query': query,
                'service': self.resource_type.service,
                'version': self.resource_type.version,
                'component': self.resource_type.component}

    def get_resource(self, resource_info):
        return self.resource_type.get(self.get_client(), resource_info)

    @property
    def source_type(self):
        """Name of the source to use; defaults to ``describe-gcp``."""
        return self.data.get('source', 'describe-gcp')

    def get_resource_query(self):
        """Return the policy's ``query`` wrapped as a filter, or None if unset."""
        if 'query' in self.data:
            return {'filter': self.data.get('query')}
        return None

    def resources(self, query=None):
        """Return the filtered resources, using the cache when it has them.

        Freshly fetched resources are saved to the cache before filtering.
        The resource limit is only checked when this manager belongs to the
        policy currently being executed.
        """
        q = query or self.get_resource_query()
        cache_key = self.get_cache_key(q)
        resources = None

        if self._cache.load():
            resources = self._cache.get(cache_key)
            if resources is not None:
                # Lazy logging arguments: nothing is formatted unless debug is on.
                self.log.debug(
                    "Using cached %s.%s: %d",
                    self.__class__.__module__,
                    self.__class__.__name__,
                    len(resources))

        if resources is None:
            with self.ctx.tracer.subsegment('resource-fetch'):
                resources = self._fetch_resources(q)
            self._cache.save(cache_key, resources)

        self._cache.close()
        resource_count = len(resources)
        with self.ctx.tracer.subsegment('filter'):
            resources = self.filter_resources(resources)

        # Check resource limits if we're the current policy execution.
        if self.data == self.ctx.policy.data:
            self.check_resource_limit(len(resources), resource_count)
        return resources

    def check_resource_limit(self, selection_count, population_count):
        """Check if policy's execution affects more resources than its limit.
        """
        p = self.ctx.policy
        max_resource_limits = MaxResourceLimit(p, selection_count, population_count)
        return max_resource_limits.check_resource_limits()

    def _fetch_resources(self, query):
        """Fetch and augment resources from the source.

        A 403 whose message mentions ``disabled`` and an ``accessNotConfigured``
        error are logged as warnings and yield an empty list; any other
        ``HttpError`` is re-raised.
        """
        try:
            return self.augment(self.source.get_resources(query)) or []
        except HttpError as e:
            error_reason, error_code, error_message = extract_errors(e)

            if error_reason is None and error_code is None:
                raise
            # The message can be missing from the error payload; guard the
            # membership test so the original HttpError is not masked by a
            # TypeError.
            if error_code == 403 and error_message and 'disabled' in error_message:
                log.warning(error_message)
                return []
            elif error_reason == 'accessNotConfigured':
                log.warning(
                    "Resource:%s not available -> Service:%s not enabled on %s",
                    self.type,
                    self.resource_type.service,
                    local_session(self.session_factory).get_default_project())
                return []
            raise

    def augment(self, resources):
        return resources

    def get_urns(self, resources):
        """Generate URNs for the resources.

        A Uniform Resource Name (URN) is a URI that identifies a resource by
        name in a particular namespace. A URN may be used to talk about a
        resource without implying its location or how to access it.

        The generated URNs can uniquely identify any given resource.

        The generated URN is intended to follow a similar pattern to ARN, but be
        specific to GCP.

            gcp:<service>:<region>:<project>:<resource-type>/<resource-id>

        If the region is "global" then it is omitted from the URN.
        """
        return self.resource_type.get_urns(
            resources, local_session(self.session_factory).project_id)
class ChildResourceManager(QueryResourceManager):
    """Resource manager for resources that are enumerated per parent resource."""

    def get_resource(self, resource_info):
        """Fetch a single child resource and annotate it with its parent."""
        child_instance = super(ChildResourceManager, self).get_resource(resource_info)

        parent_resource = self.resource_type.parent_spec['resource']
        parent_instance = self.get_resource_manager(parent_resource).get_resource(
            self._get_parent_resource_info(child_instance)
        )

        annotation_key = self.resource_type.get_parent_annotation_key()
        child_instance[annotation_key] = parent_instance

        return child_instance

    def _fetch_resources(self, query):
        """Enumerate the children of every parent resource.

        Each child is annotated with its parent under the type's parent
        annotation key.
        """
        # Copy the query: the per-parent enumeration arguments are merged into
        # it below, and the caller's dict (which may also be part of a cache
        # key) must stay unchanged.
        query = dict(query) if query else {}

        resources = []
        annotation_key = self.resource_type.get_parent_annotation_key()
        parent_query = self.get_parent_resource_query()
        parent_resource_manager = self.get_resource_manager(
            resource_type=self.resource_type.parent_spec['resource'],
            data=({'query': parent_query} if parent_query else {})
        )

        for parent_instance in parent_resource_manager.resources():
            query.update(self._get_child_enum_args(parent_instance))
            children = super(ChildResourceManager, self)._fetch_resources(query)

            for child_instance in children:
                child_instance[annotation_key] = parent_instance

            resources.extend(children)

        return resources

    def _get_parent_resource_info(self, child_instance):
        mappings = self.resource_type.parent_spec['parent_get_params']
        return self._extract_fields(child_instance, mappings)

    def _get_child_enum_args(self, parent_instance):
        mappings = self.resource_type.parent_spec['child_enum_params']
        return self._extract_fields(parent_instance, mappings)

    def get_parent_resource_query(self):
        """Return the policy query for the parent, if the parent spec enables it."""
        parent_spec = self.resource_type.parent_spec
        enabled = parent_spec.get('use_child_query', False)
        if enabled and 'query' in self.data:
            return self.data.get('query')
        return None

    @staticmethod
    def _extract_fields(source, mappings):
        """Build a dict from ``source`` according to ``mappings``.

        Each mapping is a sequence of ``(jmespath expression, result key)``.
        When the mapping also contains ``'regex'``, its fourth element is
        applied as a pattern to the extracted value and the first group of
        the match replaces it.
        """
        result = {}

        for mapping in mappings:
            result[mapping[1]] = jmespath.search(mapping[0], source)
            # Support for regex in child_enum_params.
            # Without this support you could only map parent-child elements with the raw data
            # they hold, but with regex you could regex that data as well while you map.
            if 'regex' in mapping:
                result[mapping[1]] = re.search(mapping[3], result[mapping[1]]).group(1)

        return result
class RegionalResourceManager(ChildResourceManager):
    """Child resource manager whose parent query is built from configured regions."""

    def get_parent_resource_query(self):
        """Return name filters for the configured region(s), or None."""
        # An explicit region list wins unless it contains the 'all' wildcard.
        if self.config.regions and 'all' not in self.config.regions:
            return [{'name': region} for region in self.config.regions]
        if self.config.region:
            return [{'name': self.config.region}]
        return None
class TypeMeta(type):
    """Metaclass providing a descriptive ``repr`` for resource type classes."""

    def __repr__(cls):
        return "<TypeInfo service:%s component:%s scope:%s version:%s>" % (
            cls.service,
            cls.component,
            cls.scope,
            cls.version)


class TypeInfo(metaclass=TypeMeta):
    """Declarative description of a resource type; subclasses override attributes."""

    # api client construction information
    service = None
    version = None
    component = None

    # resource enumeration parameters

    scope = 'project'
    enum_spec = ('list', 'items[]', None)
    # ie. when project is passed instead as parent
    scope_key = None
    # custom formatting for scope key
    scope_template = None

    # individual resource retrieval method, for serverless policies.
    get = None
    # for get methods that require the full event payload
    get_requires_event = False
    perm_service = None
    permissions = ()

    labels = False
    labels_op = 'setLabels'

    # required for reporting
    id = None
    name = None
    default_report_fields = ()

    # cloud asset inventory type
    asset_type = None

    # URN generation
    urn_region_key = 'region'
    # A jmespath into the resource object to find the id element of the URN.
    # If unset, it uses the value for id.
    urn_id_path = None
    # It is frequent enough that the id we want for the URN is made up of one or more
    # path segments from the id. Ids are frequently '/' delimited strings.
    # If set, this should be an iterable of integer indices into the segments.
    urn_id_segments = None
    # By default the component is taken for the URN. Can be overridden by specifying
    # a specific urn_component.
    urn_component = None
    # Truly global resources should override this to a falsy value so that the
    # project element of the URN is left empty.
    urn_has_project = True
    # The location element is a zone, not a region.
    urn_zonal = False

    # If the type supports refreshing an individual resource
    refresh = None

    # Some resources don't support monitoring (and hence, no filtering).
    # Assume they generally can, and flip this to `False` for those that don't.
    allow_metrics_filters = True

    @classmethod
    def get_metric_resource_name(cls, resource):
        return resource.get(cls.name)

    @classmethod
    def get_urns(cls, resources, project_id):
        """Generate URNs for the resources.

        A Uniform Resource Name (URN) is a URI that identifies a resource by
        name in a particular namespace. A URN may be used to talk about a
        resource without implying its location or how to access it.

        The generated URNs can uniquely identify any given resource.

        The generated URN is intended to follow a similar pattern to ARN, but be
        specific to GCP.

            gcp:<service>:<location>:<project>:<resource-type>/<resource-id>

        If the location is "global" then it is omitted from the URN.
        """
        return [cls._get_urn(r, project_id) for r in resources]

    @classmethod
    def _get_urn(cls, resource, project_id) -> str:
        "Generate an URN for the resource."
        location = cls._get_location(resource)
        if location == "global":
            location = ""
        urn_id = cls._get_urn_id(resource)
        if not cls.urn_has_project:
            project_id = ""
        # Fall back to the API component when no explicit urn_component is
        # declared, instead of emitting the literal "None".
        # NOTE: not sure whether to use `component` or just the last part of
        # `component` (split on '.') for the part after project
        component = cls.urn_component
        if component is None:
            component = cls.component
        return f"gcp:{cls.service}:{location}:{project_id}:{component}/{urn_id}"

    @classmethod
    def _get_urn_id(cls, resource):
        """Return the id element of the URN for ``resource``.

        The value is looked up with ``urn_id_path`` (or ``id`` when unset).
        When ``urn_id_segments`` is set, only those '/'-delimited segments of
        the value are kept.
        """
        path = cls.urn_id_path
        if path is None:
            path = cls.id
        urn_id = jmespath_search(path, resource)
        if cls.urn_id_segments:
            parts = urn_id.split('/')
            urn_id = '/'.join(parts[index] for index in cls.urn_id_segments)
        return urn_id

    @classmethod
    def _get_location(cls, resource):
        """Get the location for a single resource.

        Resources are either global, regional, or zonal. For a zonal type the
        zone name is returned; otherwise the value under ``urn_region_key`` is
        used. In both cases only the last '/'-delimited segment is kept.
        Resources with neither are reported as "global".
        """
        if cls.urn_zonal and "zone" in resource:
            return resource["zone"].rsplit("/", 1)[-1]

        if cls.urn_region_key in resource:
            return resource[cls.urn_region_key].rsplit("/", 1)[-1]

        return "global"
class ChildTypeInfo(TypeInfo):
    """Type description for resources that hang off a parent resource."""

    parent_spec = None

    @classmethod
    def get_parent_annotation_key(cls):
        """Return the key under which the parent is stored on a child resource."""
        return f"c7n:{cls.parent_spec['resource']}"

    @classmethod
    def get_parent(cls, resource):
        "Return the annotated parent resource."
        key = cls.get_parent_annotation_key()
        return resource[key]
# Pre-compiled lookups into the JSON body of an API error response.
ERROR_REASON = jmespath_compile('error.errors[0].reason')
ERROR_CODE = jmespath_compile('error.code')
ERROR_MESSAGE = jmespath_compile('error.message')


def extract_errors(e):
    """Return ``(reason, code, message)`` read from the error's JSON content.

    If the content cannot be decoded, the three lookups are run against None.
    """
    try:
        payload = json.loads(e.content)
    except Exception:
        payload = None

    reason = ERROR_REASON.search(payload)
    code = ERROR_CODE.search(payload)
    message = ERROR_MESSAGE.search(payload)
    return reason, code, message
class GcpLocation:
    """
    The `_locations` dict is formed by the string keys representing locations taken from
    `KMS <https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations/list>`_ and
    `App Engine <https://cloud.google.com/appengine/docs/admin-api/reference/rest/v1
    /apps.locations/list>`_ and list values containing the string names of the services
    the locations are available for.
    """
    _locations = {'eur4': ['kms'],
                  'global': ['kms'],
                  'europe-west4': ['kms'],
                  'asia-east2': ['appengine', 'kms'],
                  'asia-east1': ['kms'],
                  'asia': ['kms'],
                  'europe-north1': ['kms'],
                  'us-central1': ['kms'],
                  'nam4': ['kms'],
                  'asia-southeast1': ['kms'],
                  'europe': ['kms'],
                  'australia-southeast1': ['appengine', 'kms'],
                  'us-central': ['appengine'],
                  'asia-south1': ['appengine', 'kms'],
                  'us-west1': ['kms'],
                  'us-west2': ['appengine', 'kms'],
                  'asia-northeast2': ['appengine', 'kms'],
                  'asia-northeast1': ['appengine', 'kms'],
                  'europe-west2': ['appengine', 'kms'],
                  'europe-west3': ['appengine', 'kms'],
                  'us-east4': ['appengine', 'kms'],
                  'europe-west1': ['kms'],
                  'europe-west6': ['appengine', 'kms'],
                  'us': ['kms'],
                  'us-east1': ['appengine', 'kms'],
                  'northamerica-northeast1': ['appengine', 'kms'],
                  'europe-west': ['appengine'],
                  'southamerica-east1': ['appengine', 'kms']}

    @classmethod
    def get_service_locations(cls, service):
        """
        Returns a list of the locations that have a given service in associated value lists.

        The lookup goes through ``cls`` so that a subclass overriding ``_locations``
        is honoured. Locations are returned in the dict's definition order.

        :param service: a string representing the name of a service locations are queried for
        """
        return [location for location, services in cls._locations.items()
                if service in services]