Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/client.py: 67%
171 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3# Copyright 2017 The Forseti Security Authors. All rights reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""Base GCP client which uses the discovery API.
17"""
18# modifications (c7n)
19# - flight recorder support
20# - env creds sourcing
21# - various minor bug fixes
23# todo:
24# - consider forking googleapiclient to get rid of httplib2
26import http.client
27import logging
28import threading
29import os
30import socket
31import ssl
32from contextlib import nullcontext as no_rate_limiter
33from urllib.error import URLError
35from googleapiclient import discovery, errors # NOQA
36from googleapiclient.http import set_user_agent
37from google.auth.credentials import with_scopes_if_required
38import google.auth.impersonated_credentials
39import google.oauth2.credentials
40import google_auth_httplib2
42import httplib2
43from pyrate_limiter import Limiter, RequestRate
45from retrying import retry
# Optional CA bundle path, handed to httplib2 when an http client is built.
HTTPLIB_CA_BUNDLE = os.getenv('HTTPLIB_CA_BUNDLE')
# Service account to impersonate when a Session is not given one explicitly.
GOOGLE_IMPERSONATE_SERVICE_ACCOUNT = os.getenv('GOOGLE_IMPERSONATE_SERVICE_ACCOUNT')

# OAuth scopes requested for the credentials used by a Session.
CLOUD_SCOPES = frozenset(('https://www.googleapis.com/auth/cloud-platform',))

# Upper bound, in seconds, on how long a single http request may wait.
HTTP_REQUEST_TIMEOUT = 30.0

# Storage that is private to each thread.
LOCAL_THREAD = threading.local()

log = logging.getLogger('c7n_gcp.client')

# Default for the num_retries argument of HttpRequest.execute.
NUM_HTTP_RETRIES = 5
# Exception types that is_retryable_exception treats as worth retrying.
RETRYABLE_EXCEPTIONS = (
    # http level failures
    http.client.ResponseNotReady,
    http.client.IncompleteRead,
    httplib2.ServerNotFoundError,
    # socket / tls level failures
    socket.error,
    ssl.SSLError,
    # include "no network connection"
    URLError,
)
def get_default_project():
    """Return the GCP project id configured in the environment, if any.

    The variables are consulted in precedence order and the value of the
    first one that is present is returned, even if that value is empty.

    Returns:
        str: The project id, or None when none of the variables is set.
    """
    candidates = (
        'GCP_PROJECT', 'GOOGLE_PROJECT', 'GCLOUD_PROJECT',
        'GOOGLE_CLOUD_PROJECT', 'CLOUDSDK_CORE_PROJECT')
    return next(
        (os.environ[name] for name in candidates if name in os.environ),
        None)
class PaginationNotSupported(Exception):
    """Raised when a paged query is requested for a verb that cannot page."""
def is_retryable_exception(e):
    """Decide whether a failed call should be attempted again.

    Used as the ``retry_on_exception`` predicate of the retry decorators in
    this module.

    Args:
        e (Exception): The exception raised by the wrapped call.

    Returns:
        bool: True when ``e`` is an instance of one of the types listed in
            RETRYABLE_EXCEPTIONS, False otherwise.
    """
    retryable = isinstance(e, RETRYABLE_EXCEPTIONS)
    return retryable
@retry(retry_on_exception=is_retryable_exception,
       wait_exponential_multiplier=1000,
       wait_exponential_max=10000,
       stop_max_attempt_number=5)
def _create_service_api(credentials, service_name, version, developer_key=None,
                        cache_discovery=False, http=None):
    """Build a discovery based service object for a cloud API.

    The call is retried (at most 5 attempts, exponential backoff) when it
    fails with one of the retryable exceptions.

    Args:
        credentials (OAuth2Credentials): Credentials used to authenticate
            the API calls. Only forwarded when ``http`` is not given.
        service_name (str): The name of the API.
        version (str): The version of the API to use.
        developer_key (str): The api key used to determine the project
            associated with the API call; most API services do not require
            this to be set.
        cache_discovery (bool): Whether or not to cache the discovery doc.
        http (object): Optional http object. When truthy it is forwarded to
            the discovery build instead of ``credentials``.

    Returns:
        object: A Resource object with methods for interacting with the service.
    """
    # The discovery module logs very noisily in recent versions; unless this
    # module's logger runs at DEBUG, raise the discovery logger to WARNING.
    debug_enabled = log.getEffectiveLevel() <= logging.DEBUG
    if not debug_enabled:
        logging.getLogger(discovery.__name__).setLevel(logging.WARNING)

    # Exactly one of http / credentials is handed to the discovery build.
    transport = {'http': http} if http else {'credentials': credentials}

    return discovery.build(
        serviceName=service_name,
        version=version,
        developerKey=developer_key,
        cache_discovery=cache_discovery,
        **transport)
def _build_http(http=None):
    """Return an http client for googleapiclient with a custodian user agent.

    Args:
        http (object): Optional existing http object to decorate. When it is
            missing (or falsy) a new httplib2.Http is created using
            HTTP_REQUEST_TIMEOUT and HTTPLIB_CA_BUNDLE.

    Returns:
        object: The result of applying ``set_user_agent`` to the http object.
    """
    client = http or httplib2.Http(
        timeout=HTTP_REQUEST_TIMEOUT, ca_certs=HTTPLIB_CA_BUNDLE)
    agent = 'Python-httplib2/{} (gzip), {}/{}'.format(
        httplib2.__version__, 'custodian-gcp', '0.1')
    return set_user_agent(client, agent)
class Session:
    """Credentials, rate limiting and http settings shared by service clients."""

    def __init__(self,
                 credentials=None,
                 quota_max_calls=None,
                 quota_period=None,
                 use_rate_limiter=False,
                 http=None,
                 project_id=None,
                 impersonate_service=None,
                 **kwargs):
        """Constructor.

        Args:
            credentials (object): GoogleCredentials. When omitted the
                application default credentials are looked up.
            quota_max_calls (int): Allowed requests per <quota_period> for the
                API. Only used when ``use_rate_limiter`` is set.
            quota_period (float): The time period (in seconds) to track
                requests over. Only used when ``use_rate_limiter`` is set.
            use_rate_limiter (bool): Set to false to disable the use of a rate
                limiter for this service.
            http (object): Optional http object handed to every client
                created by this session.
            project_id (str): Optional project id; takes precedence over the
                environment in ``get_default_project``.
            impersonate_service (str): Optional service account to
                impersonate; falls back to GOOGLE_IMPERSONATE_SERVICE_ACCOUNT.
            **kwargs (dict): Accepted but not used by this constructor.
        """
        # Only share the http object when using the default credentials.
        self._use_cached_http = not credentials

        source_credentials = credentials
        if not source_credentials:
            # google.auth.default() is deliberately called without
            # quota_project_id: passing it caused errors, see
            # https://github.com/cloud-custodian/cloud-custodian/issues/7155
            source_credentials, _ = google.auth.default()

        impersonated = None
        impersonate_target = impersonate_service or GOOGLE_IMPERSONATE_SERVICE_ACCOUNT
        if impersonate_target:
            log.info('using impersonated service account %s', impersonate_target)
            impersonated = google.auth.impersonated_credentials.Credentials(
                source_credentials=source_credentials,
                target_principal=impersonate_target,
                target_scopes=list(CLOUD_SCOPES))

        if impersonated:
            # impersonated credentials are created with their scopes already
            self._credentials = impersonated
        else:
            # get token with scopes if necessary
            self._credentials = with_scopes_if_required(
                source_credentials, list(CLOUD_SCOPES))

        if not use_rate_limiter:
            self._rate_limiter = no_rate_limiter()
        else:
            limiter = Limiter(RequestRate(quota_max_calls, quota_period))
            self._rate_limiter = limiter.ratelimit('gcp_session', delay=True)

        self._http = http
        self.project_id = project_id

    def __repr__(self):
        """The object representation.

        Returns:
            str: The object representation, including the http object.
        """
        return '<gcp-session: http=%s>' % (self._http,)

    def get_default_project(self):
        """Return the project id for this session.

        The explicitly configured ``project_id`` wins; otherwise the module
        level environment lookup is used.

        Returns:
            str: The project id.

        Raises:
            ValueError: When no project id can be determined.
        """
        project = self.project_id or get_default_project()
        if not project:
            raise ValueError("No GCP Project ID set - set CLOUDSDK_CORE_PROJECT")
        return project

    def get_default_region(self):
        """Return the region from the environment, or None when unset."""
        candidates = ('GOOGLE_REGION', 'GCLOUD_REGION', 'CLOUDSDK_COMPUTE_REGION')
        return next(
            (os.environ[name] for name in candidates if name in os.environ),
            None)

    def get_default_zone(self):
        """Return the zone from the environment, or None when unset."""
        candidates = ('GOOGLE_ZONE', 'GCLOUD_ZONE', 'CLOUDSDK_COMPUTE_ZONE')
        return next(
            (os.environ[name] for name in candidates if name in os.environ),
            None)

    def client(self, service_name, version, component, **kw):
        """Create a ServiceClient for one component of a GCP service.

        Args:
            service_name (str): The name of the API.
            version (str): The gcp service version to use.
            component (str): Dotted path of the service sub-component the
                client operates on.
            **kw (dict): Optional ``developer_key`` and ``cache_discovery``
                values forwarded to the service construction.

        Returns:
            ServiceClient: A client bound to this session's credentials,
                rate limiter and http settings.
        """
        discovery_http = self._http or _build_http()
        service = _create_service_api(
            self._credentials,
            service_name,
            version,
            kw.get('developer_key'),
            kw.get('cache_discovery', False),
            discovery_http)

        return ServiceClient(
            gcp_service=service,
            component=component,
            credentials=self._credentials,
            rate_limiter=self._rate_limiter,
            use_cached_http=self._use_cached_http,
            http=self._http)
266# pylint: disable=too-many-instance-attributes, too-many-arguments
class ServiceClient:
    """Base class for GCP APIs."""

    def __init__(self, gcp_service, credentials, component=None,
                 num_retries=NUM_HTTP_RETRIES, key_field='project',
                 entity_field=None, list_key_field=None, get_key_field=None,
                 max_results_field='maxResults', search_query_field='query',
                 rate_limiter=None, use_cached_http=True, http=None):
        """Constructor.

        Args:
            gcp_service (object): A Resource object with methods for interacting
                with the service.
            credentials (OAuth2Credentials): A Credentials object
            component (str): The subcomponent of the gcp service for this
                repository instance. E.g. 'instances' for compute.instances().*
                APIs
            num_retries (int): The number of http retriable errors to retry on
                before hard failing.
            key_field (str): The field name representing the project to
                query in the API.
            entity_field (str): The API entity returned generally by the .get()
                api. E.g. 'instance' for compute.instances().get()
            list_key_field (str): Optional override of key field for calls to
                list methods.
            get_key_field (str): Optional override of key field for calls to
                get methods.
            max_results_field (str): The field name that represents the maximum
                number of results to return in one page.
            search_query_field (str): The field name used to filter search
                results.
            rate_limiter (object): A context manager entered around every
                request to manage API quota. When None, requests are not
                rate limited.
            use_cached_http (bool): If set to true, calls to the API will use
                a thread local shared http object. When false a new
                authorized http object is created on each access.
            http (object): Optional http object used in place of a newly
                built one (stored as the replay http object).
        """
        self.gcp_service = gcp_service
        self._credentials = credentials
        self._component = None

        if component:
            # Walk a dotted path such as 'projects.locations' by calling each
            # segment on the object returned by the previous one.
            component_api = gcp_service
            for c in component.split('.'):
                component_api = getattr(component_api, c)()

            self._component = component_api

        self._entity_field = entity_field
        self._num_retries = num_retries
        self._list_key_field = list_key_field or key_field
        self._get_key_field = get_key_field or key_field
        self._max_results_field = max_results_field
        self._search_query_field = search_query_field
        # _execute always enters the limiter as a context manager, so fall
        # back to a no-op one instead of failing on ``with None``.
        if rate_limiter is None:
            rate_limiter = no_rate_limiter()
        self._rate_limiter = rate_limiter

        self._use_cached_http = use_cached_http
        self._local = LOCAL_THREAD
        self._http_replay = http

    @property
    def http(self):
        """A thread local instance of httplib2.Http.

        Returns:
            httplib2.Http: An Http instance authorized by the credentials.
        """
        if self._use_cached_http and hasattr(self._local, 'http'):
            return self._local.http
        if self._http_replay is not None:
            # httplib2 instance is not thread safe
            http = self._http_replay
        else:
            http = _build_http()
        authorized_http = google_auth_httplib2.AuthorizedHttp(
            self._credentials, http=http)
        if self._use_cached_http:
            self._local.http = authorized_http
        return authorized_http

    def get_http(self):
        """Return an http instance sans credentials"""
        if self._http_replay:
            return self._http_replay
        return _build_http()

    def _build_request(self, verb, verb_arguments):
        """Builds HttpRequest object.

        Args:
            verb (str): Request verb (ex. insert, update, delete).
            verb_arguments (dict): Arguments to be passed with the request.

        Returns:
            httplib2.HttpRequest: HttpRequest to be sent to the API.
        """
        method = getattr(self._component, verb)

        # **kwargs requires string keys, so coerce every key with str()
        # before expanding the dictionary into the call.
        method_args = {str(k): v for k, v in verb_arguments.items()}
        return method(**method_args)

    def _build_next_request(self, verb, prior_request, prior_response):
        """Builds pagination-aware request object.

        More details:
          https://developers.google.com/api-client-library/python/guide/pagination

        Args:
            verb (str): Request verb (ex. insert, update, delete).
            prior_request (httplib2.HttpRequest): Request that may trigger
                paging.
            prior_response (dict): Potentially partial response.

        Returns:
            httplib2.HttpRequest: HttpRequest or None. None is returned when
                there is nothing more to fetch - request completed.
        """
        method = getattr(self._component, verb + '_next')
        return method(prior_request, prior_response)

    def supports_pagination(self, verb):
        """Determines if the API action supports pagination.

        Args:
            verb (str): Request verb (ex. insert, update, delete).

        Returns:
            bool: True when the component exposes a ``<verb>_next`` method,
                False otherwise.
        """
        return getattr(self._component, verb + '_next', None) is not None

    def execute_command(self, verb, verb_arguments):
        """Executes command (ex. add) via a dedicated http object.

        Async APIs may take minutes to complete. Therefore, callers are
        encouraged to leverage concurrent.futures (or similar) to place long
        running commands on a separate threads.

        Args:
            verb (str): Method to execute on the component (ex. get, list).
            verb_arguments (dict): key-value pairs to be passed to _build_request.

        Returns:
            dict: An async operation Service Response.
        """
        request = self._build_request(verb, verb_arguments)
        return self._execute(request)

    def execute_paged_query(self, verb, verb_arguments):
        """Executes query (ex. list) via a dedicated http object.

        This is a generator: nothing, including the pagination check, runs
        until iteration starts.

        Args:
            verb (str): Method to execute on the component (ex. get, list).
            verb_arguments (dict): key-value pairs to be passed to _build_request.

        Yields:
            dict: Service Response.

        Raises:
            PaginationNotSupported: When an API does not support paging.
        """
        if not self.supports_pagination(verb=verb):
            raise PaginationNotSupported(
                '{} does not support pagination'.format(verb))

        request = self._build_request(verb, verb_arguments)

        number_of_pages_processed = 0
        while request is not None:
            number_of_pages_processed += 1
            log.debug('Executing paged request #%s', number_of_pages_processed)
            response = self._execute(request)
            # The follow-up request is prepared before yielding; it is None
            # once the last page has been fetched.
            request = self._build_next_request(verb, request, response)
            yield response

    def execute_search_query(self, verb, verb_arguments):
        """Executes query (ex. search) via a dedicated http object.

        Args:
            verb (str): Method to execute on the component (ex. search).
            verb_arguments (dict): key-value pairs to be passed to
                _build_request. Neither this dict nor its 'body' entry is
                modified.

        Yields:
            dict: Service Response.
        """
        # Implementation of search does not follow the standard API pattern.
        # Fields need to be in the body rather than sent separately.
        request_args = dict(verb_arguments)
        next_page_token = None
        number_of_pages_processed = 0
        while True:
            if next_page_token:
                # Put the token into a body that is really part of the
                # request arguments; create the body when the caller passed
                # none so that follow-up pages are actually requested.
                body = dict(request_args.get('body') or {})
                body['pageToken'] = next_page_token
                request_args['body'] = body
            number_of_pages_processed += 1
            log.debug('Executing paged request #%s', number_of_pages_processed)
            request = self._build_request(verb, request_args)
            response = self._execute(request)
            next_page_token = response.get('nextPageToken')
            yield response

            if not next_page_token:
                break

    def execute_query(self, verb, verb_arguments):
        """Executes query (ex. get) via a dedicated http object.

        Args:
            verb (str): Method to execute on the component (ex. get, list).
            verb_arguments (dict): key-value pairs to be passed to _build_request.

        Returns:
            dict: Service Response.
        """
        request = self._build_request(verb, verb_arguments)
        return self._execute(request)

    @retry(retry_on_exception=is_retryable_exception,
           wait_exponential_multiplier=1000,
           wait_exponential_max=10000,
           stop_max_attempt_number=5)
    def _execute(self, request):
        """Run execute with retries and rate limiting.

        Args:
            request (object): The HttpRequest object to execute.

        Returns:
            dict: The response from the API.
        """
        with self._rate_limiter:
            return request.execute(http=self.http, num_retries=self._num_retries)