Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n_gcp/client.py: 67%

171 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3# Copyright 2017 The Forseti Security Authors. All rights reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16"""Base GCP client which uses the discovery API. 

17""" 

18# modifications (c7n) 

19# - flight recorder support 

20# - env creds sourcing 

21# - various minor bug fixes 

22 

23# todo: 

24# - consider forking googleapiclient to get rid of httplib2 

25 

26import http.client 

27import logging 

28import threading 

29import os 

30import socket 

31import ssl 

32from contextlib import nullcontext as no_rate_limiter 

33from urllib.error import URLError 

34 

35from googleapiclient import discovery, errors # NOQA 

36from googleapiclient.http import set_user_agent 

37from google.auth.credentials import with_scopes_if_required 

38import google.auth.impersonated_credentials 

39import google.oauth2.credentials 

40import google_auth_httplib2 

41 

42import httplib2 

43from pyrate_limiter import Limiter, RequestRate 

44 

45from retrying import retry 

46 

47 

# Optional CA bundle path for httplib2 (None when the variable is unset).
HTTPLIB_CA_BUNDLE = os.getenv('HTTPLIB_CA_BUNDLE')
# Optional service account to impersonate for every Session that does not
# pass its own ``impersonate_service``.
GOOGLE_IMPERSONATE_SERVICE_ACCOUNT = os.getenv('GOOGLE_IMPERSONATE_SERVICE_ACCOUNT')

# OAuth scopes requested for API access.
CLOUD_SCOPES = frozenset({'https://www.googleapis.com/auth/cloud-platform'})

# Per request max wait timeout, passed as httplib2.Http(timeout=...).
HTTP_REQUEST_TIMEOUT = 30.0

# Per thread storage (holds the shared authorized http object).
LOCAL_THREAD = threading.local()

log = logging.getLogger('c7n_gcp.client')

# Default value num_retries within HttpRequest execute method
NUM_HTTP_RETRIES = 5

# Exception types treated as transient by is_retryable_exception.
# Note: on Python 3 ``socket.error`` is an alias of OSError, so any OSError
# (ssl.SSLError and URLError included) matches this tuple.
RETRYABLE_EXCEPTIONS = (
    http.client.ResponseNotReady,
    http.client.IncompleteRead,
    httplib2.ServerNotFoundError,
    socket.error,
    ssl.SSLError,
    URLError,  # include "no network connection"
)

72 

73 

def get_default_project():
    """Return the first GCP project id found in the environment, else None.

    Variables are checked in this order: GCP_PROJECT, GOOGLE_PROJECT,
    GCLOUD_PROJECT, GOOGLE_CLOUD_PROJECT, CLOUDSDK_CORE_PROJECT. A variable
    that is present but empty still wins and its empty value is returned.
    """
    candidates = ('GCP_PROJECT', 'GOOGLE_PROJECT', 'GCLOUD_PROJECT',
                  'GOOGLE_CLOUD_PROJECT', 'CLOUDSDK_CORE_PROJECT')
    return next(
        (os.environ[name] for name in candidates if name in os.environ),
        None)

79 

80 

class PaginationNotSupported(Exception):
    """Raised when a paged query is requested for an API method that has no
    ``<verb>_next`` pagination helper."""

83 

84 

def is_retryable_exception(e):
    """Decide whether an exception is a transient error worth retrying.

    This is the ``retry_on_exception`` predicate of the ``@retry``
    decorators in this module.

    Args:
        e (Exception): Exception object.

    Returns:
        bool: True when ``e`` is an instance of one of RETRYABLE_EXCEPTIONS,
        False otherwise.
    """
    retryable_types = RETRYABLE_EXCEPTIONS
    return isinstance(e, retryable_types)

95 

96 

@retry(retry_on_exception=is_retryable_exception,
       wait_exponential_multiplier=1000,
       wait_exponential_max=10000,
       stop_max_attempt_number=5)
def _create_service_api(credentials, service_name, version, developer_key=None,
                        cache_discovery=False, http=None):
    """Build a googleapiclient discovery Resource for one API.

    Retryable transport errors (see is_retryable_exception) are retried with
    exponential backoff, up to five attempts in total.

    Args:
        credentials (OAuth2Credentials): Credentials used to authenticate the
            API calls; only used when ``http`` is falsy.
        service_name (str): The name of the API.
        version (str): The version of the API to use.
        developer_key (str): The api key to use to determine the project
            associated with the API call, most API services do not require
            this to be set.
        cache_discovery (bool): Whether or not to cache the discovery doc.
        http (object): Optional http object; when truthy it is handed to
            discovery.build instead of ``credentials``.

    Returns:
        object: A Resource object with methods for interacting with the service.
    """
    # The discovery module logs very noisily; keep it at WARNING unless this
    # module's logger is at DEBUG (or lower).
    quiet = log.getEffectiveLevel() > logging.DEBUG
    if quiet:
        logging.getLogger(discovery.__name__).setLevel(logging.WARNING)

    # googleapiclient treats http and credentials as mutually exclusive, so
    # pass exactly one of them.
    auth = {'http': http} if http else {'credentials': credentials}
    return discovery.build(
        serviceName=service_name,
        version=version,
        developerKey=developer_key,
        cache_discovery=cache_discovery,
        **auth)

137 

138 

def _build_http(http=None):
    """Return an http client for googleapiclient tagged with our user agent.

    Args:
        http (object): Optional existing http object. When falsy, a new
            httplib2.Http is created with HTTP_REQUEST_TIMEOUT and
            HTTPLIB_CA_BUNDLE.

    Returns:
        object: The result of googleapiclient's set_user_agent on that client.
    """
    base_http = http or httplib2.Http(
        timeout=HTTP_REQUEST_TIMEOUT, ca_certs=HTTPLIB_CA_BUNDLE)
    agent = 'Python-httplib2/{} (gzip), {}/{}'.format(
        httplib2.__version__, 'custodian-gcp', '0.1')
    return set_user_agent(base_http, agent)

151 

152 

class Session:
    """Factory for ServiceClient objects that share credentials, an optional
    rate limiter and an optional http object for a GCP project."""

    def __init__(self,
                 credentials=None,
                 quota_max_calls=None,
                 quota_period=None,
                 use_rate_limiter=False,
                 http=None,
                 project_id=None,
                 impersonate_service=None,
                 **kwargs):
        """Constructor.

        Args:
            credentials (object): google.auth credentials. When falsy, the
                application default credentials from google.auth.default()
                are used.
            quota_max_calls (int): Allowed requests per <quota_period> for the
                API; only used when ``use_rate_limiter`` is true.
            quota_period (float): The time period (in seconds) to track
                requests over; only used when ``use_rate_limiter`` is true.
            use_rate_limiter (bool): Set to true to rate limit every request
                made by clients created from this session.
            http (object): Optional http object (e.g. a recording/replay
                http) passed to discovery and to every ServiceClient.
            project_id (str): Project id returned by get_default_project().
            impersonate_service (str): Service account to impersonate. Falls
                back to the GOOGLE_IMPERSONATE_SERVICE_ACCOUNT environment
                variable.
            **kwargs (dict): Accepted for compatibility; ignored.
        """
        default_credentials = None
        if not credentials:
            # quota_project_id is intentionally not passed to
            # google.auth.default(); doing so causes
            # https://github.com/cloud-custodian/cloud-custodian/issues/7155
            default_credentials, _ = google.auth.default()

        # The authorized http cache (LOCAL_THREAD) is shared by every caching
        # ServiceClient on a thread regardless of its credentials. Only share
        # it when using the default credentials: an explicit per-session
        # impersonation target yields different credentials and must neither
        # reuse nor populate that cache.
        self._use_cached_http = not credentials and not impersonate_service

        impersonate_target = impersonate_service or GOOGLE_IMPERSONATE_SERVICE_ACCOUNT
        if impersonate_target:
            log.info('using impersonated service account %s', impersonate_target)
            # impersonated credentials are created with the scopes already.
            self._credentials = google.auth.impersonated_credentials.Credentials(
                source_credentials=credentials or default_credentials,
                target_principal=impersonate_target,
                target_scopes=list(CLOUD_SCOPES))
        else:
            # get token with scopes if necessary
            self._credentials = with_scopes_if_required(
                credentials or default_credentials, list(CLOUD_SCOPES))

        if use_rate_limiter:
            limiter = Limiter(RequestRate(quota_max_calls, quota_period))
            self._rate_limiter = limiter.ratelimit('gcp_session', delay=True)
        else:
            # No-op context manager so callers can always use ``with``.
            self._rate_limiter = no_rate_limiter()
        self._http = http

        self.project_id = project_id

    def __repr__(self):
        """The object representation.

        Returns:
            str: The object representation.
        """
        return '<gcp-session: http=%s>' % (self._http,)

    def get_default_project(self):
        """Return the session project id, else the one from the environment.

        Raises:
            ValueError: If no project id is configured anywhere.
        """
        if self.project_id:
            return self.project_id
        # Resolves to the module-level helper, not this method.
        default_project = get_default_project()
        if default_project:
            return default_project

        raise ValueError("No GCP Project ID set - set CLOUDSDK_CORE_PROJECT")

    def get_default_region(self):
        """Return the first region found in the environment, else None."""
        for k in ('GOOGLE_REGION', 'GCLOUD_REGION', 'CLOUDSDK_COMPUTE_REGION'):
            if k in os.environ:
                return os.environ[k]

    def get_default_zone(self):
        """Return the first zone found in the environment, else None."""
        for k in ('GOOGLE_ZONE', 'GCLOUD_ZONE', 'CLOUDSDK_COMPUTE_ZONE'):
            if k in os.environ:
                return os.environ[k]

    def client(self, service_name, version, component, **kw):
        """Create a ServiceClient for one component of a GCP API.

        Args:
            service_name (str): The discovery name of the API (e.g. 'compute').
            version (str): The API version (e.g. 'v1').
            component (str): Dotted path of the resource collection within
                the service (e.g. 'instances').
            **kw: Optional ``developer_key`` and ``cache_discovery``
                (default False) forwarded to service discovery.

        Returns:
            ServiceClient: A client sharing this session's credentials,
            rate limiter and http object.
        """
        service = _create_service_api(
            self._credentials,
            service_name,
            version,
            kw.get('developer_key'),
            kw.get('cache_discovery', False),
            self._http or _build_http())

        return ServiceClient(
            gcp_service=service,
            component=component,
            credentials=self._credentials,
            rate_limiter=self._rate_limiter,
            use_cached_http=self._use_cached_http,
            http=self._http)

264 

265 

# pylint: disable=too-many-instance-attributes, too-many-arguments
class ServiceClient:
    """Base class for GCP APIs.

    Wraps one component of a googleapiclient discovery service and adds
    request building, pagination, rate limiting and retries.
    """

    def __init__(self, gcp_service, credentials, component=None,
                 num_retries=NUM_HTTP_RETRIES, key_field='project',
                 entity_field=None, list_key_field=None, get_key_field=None,
                 max_results_field='maxResults', search_query_field='query',
                 rate_limiter=None, use_cached_http=True, http=None):
        """Constructor.

        Args:
            gcp_service (object): A Resource object with methods for interacting
                with the service.
            credentials (OAuth2Credentials): A Credentials object
            component (str): The subcomponent of the gcp service for this
                repository instance. E.g. 'instances' for compute.instances().*
                APIs. A dotted path such as 'a.b' resolves to
                gcp_service.a().b().
            num_retries (int): The number of http retriable errors to retry on
                before hard failing.
            key_field (str): The field name representing the project to
                query in the API.
            entity_field (str): The API entity returned generally by the .get()
                api. E.g. 'instance' for compute.instances().get()
            list_key_field (str): Optional override of key field for calls to
                list methods.
            get_key_field (str): Optional override of key field for calls to
                get methods.
            max_results_field (str): The field name that represents the maximum
                number of results to return in one page.
            search_query_field (str): The field name used to filter search
                results.
            rate_limiter (object): A context manager entered around every
                request to manage API quota. When None, a no-op is used.
            use_cached_http (bool): If set to true, calls to the API will use
                a thread local shared http object. When false a new http object
                is used for each request.
            http (object): Optional http object (e.g. for recording/replay)
                used instead of building a new one.
        """
        self.gcp_service = gcp_service
        self._credentials = credentials
        self._component = None

        if component:
            component_api = gcp_service
            for c in component.split('.'):
                component_api = getattr(component_api, c)()

            self._component = component_api

        self._entity_field = entity_field
        self._num_retries = num_retries
        self._list_key_field = list_key_field or key_field
        self._get_key_field = get_key_field or key_field
        self._max_results_field = max_results_field
        self._search_query_field = search_query_field
        # _execute always does ``with self._rate_limiter``; fall back to a
        # no-op so a client built without a limiter can still execute.
        self._rate_limiter = (
            rate_limiter if rate_limiter is not None else no_rate_limiter())

        self._use_cached_http = use_cached_http
        self._local = LOCAL_THREAD
        self._http_replay = http

    @property
    def http(self):
        """An http object authorized with this client's credentials.

        When ``use_cached_http`` is set, the authorized http is stored on the
        module-wide thread local and reused by every caching client on the
        same thread.

        Returns:
            google_auth_httplib2.AuthorizedHttp: An Http instance authorized
            by the credentials.
        """
        if self._use_cached_http and hasattr(self._local, 'http'):
            return self._local.http
        if self._http_replay is not None:
            # httplib2 instance is not thread safe
            http = self._http_replay
        else:
            http = _build_http()
        authorized_http = google_auth_httplib2.AuthorizedHttp(
            self._credentials, http=http)
        if self._use_cached_http:
            self._local.http = authorized_http
        return authorized_http

    def get_http(self):
        """Return an http instance sans credentials"""
        if self._http_replay:
            return self._http_replay
        return _build_http()

    def _build_request(self, verb, verb_arguments):
        """Builds HttpRequest object.

        Args:
            verb (str): Request verb (ex. insert, update, delete).
            verb_arguments (dict): Arguments to be passed with the request.

        Returns:
            httplib2.HttpRequest: HttpRequest to be sent to the API.
        """
        method = getattr(self._component, verb)

        # Keyword argument names must be str; normalize the keys in case any
        # were built from non-str values.
        method_args = {str(k): v for k, v in verb_arguments.items()}
        return method(**method_args)

    def _build_next_request(self, verb, prior_request, prior_response):
        """Builds pagination-aware request object.

        More details:
            https://developers.google.com/api-client-library/python/guide/pagination

        Args:
            verb (str): Request verb (ex. insert, update, delete).
            prior_request (httplib2.HttpRequest): Request that may trigger
                paging.
            prior_response (dict): Potentially partial response.

        Returns:
            httplib2.HttpRequest: HttpRequest or None. None is returned when
                there is nothing more to fetch - request completed.
        """
        method = getattr(self._component, verb + '_next')
        return method(prior_request, prior_response)

    def supports_pagination(self, verb):
        """Determines if the API action supports pagination.

        Args:
            verb (str): Request verb (ex. insert, update, delete).

        Returns:
            bool: True when the component has a ``<verb>_next`` method,
            False otherwise.
        """
        return getattr(self._component, verb + '_next', None) is not None

    def execute_command(self, verb, verb_arguments):
        """Executes command (ex. add) via a dedicated http object.

        Async APIs may take minutes to complete. Therefore, callers are
        encouraged to leverage concurrent.futures (or similar) to place long
        running commands on a separate threads.

        Args:
            verb (str): Method to execute on the component (ex. get, list).
            verb_arguments (dict): key-value pairs to be passed to _build_request.

        Returns:
            dict: An async operation Service Response.
        """
        request = self._build_request(verb, verb_arguments)
        return self._execute(request)

    def execute_paged_query(self, verb, verb_arguments):
        """Executes query (ex. list) via a dedicated http object.

        Args:
            verb (str): Method to execute on the component (ex. get, list).
            verb_arguments (dict): key-value pairs to be passed to _build_request.

        Yields:
            dict: Service Response.

        Raises:
            PaginationNotSupported: When an API does not support paging.
        """
        if not self.supports_pagination(verb=verb):
            raise PaginationNotSupported(
                '{} does not support pagination'.format(verb))

        request = self._build_request(verb, verb_arguments)

        number_of_pages_processed = 0
        while request is not None:
            response = self._execute(request)
            number_of_pages_processed += 1
            log.debug('Executing paged request #%s', number_of_pages_processed)
            request = self._build_next_request(verb, request, response)
            yield response

    def execute_search_query(self, verb, verb_arguments):
        """Executes query (ex. search) via a dedicated http object.

        Search does not follow the standard pagination pattern: the page
        token must be sent inside the request ``body``. Each page is built
        from a copy of ``verb_arguments`` so the caller's dicts are never
        mutated, and the token is delivered even when no ``body`` was given.

        Args:
            verb (str): Method to execute on the component (ex. search).
            verb_arguments (dict): key-value pairs to be passed to _build_request.

        Yields:
            dict: Service Response.
        """
        next_page_token = None
        number_of_pages_processed = 0
        while True:
            page_arguments = dict(verb_arguments)
            if next_page_token:
                body = dict(verb_arguments.get('body') or {})
                body['pageToken'] = next_page_token
                page_arguments['body'] = body
            request = self._build_request(verb, page_arguments)
            response = self._execute(request)
            number_of_pages_processed += 1
            log.debug('Executing paged request #%s', number_of_pages_processed)
            next_page_token = response.get('nextPageToken')
            yield response

            if not next_page_token:
                break

    def execute_query(self, verb, verb_arguments):
        """Executes query (ex. get) via a dedicated http object.

        Args:
            verb (str): Method to execute on the component (ex. get, list).
            verb_arguments (dict): key-value pairs to be passed to _build_request.

        Returns:
            dict: Service Response.
        """
        request = self._build_request(verb, verb_arguments)
        return self._execute(request)

    @retry(retry_on_exception=is_retryable_exception,
           wait_exponential_multiplier=1000,
           wait_exponential_max=10000,
           stop_max_attempt_number=5)
    def _execute(self, request):
        """Run execute with retries and rate limiting.

        Exceptions accepted by is_retryable_exception are retried with
        exponential backoff (up to five attempts); ``num_retries`` is also
        passed through to ``request.execute``.

        Args:
            request (object): The HttpRequest object to execute.

        Returns:
            dict: The response from the API.
        """
        with self._rate_limiter:
            return request.execute(http=self.http, num_retries=self._num_retries)