Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/errorreporting_v1beta1/services/error_stats_service/transports/rest.py: 43%

148 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:45 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2023 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16 

17from google.auth.transport.requests import AuthorizedSession # type: ignore 

18import json # type: ignore 

19import grpc # type: ignore 

20from google.auth.transport.grpc import SslCredentials # type: ignore 

21from google.auth import credentials as ga_credentials # type: ignore 

22from google.api_core import exceptions as core_exceptions 

23from google.api_core import retry as retries 

24from google.api_core import rest_helpers 

25from google.api_core import rest_streaming 

26from google.api_core import path_template 

27from google.api_core import gapic_v1 

28 

29from google.protobuf import json_format 

30from requests import __version__ as requests_version 

31import dataclasses 

32import re 

33from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

34import warnings 

35 

36try: 

37 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault] 

38except AttributeError: # pragma: NO COVER 

39 OptionalRetry = Union[retries.Retry, object] # type: ignore 

40 

41 

42from google.cloud.errorreporting_v1beta1.types import error_stats_service 

43 

44from .base import ( 

45 ErrorStatsServiceTransport, 

46 DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, 

47) 

48 

49 

50DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

51 gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version, 

52 grpc_version=None, 

53 rest_version=requests_version, 

54) 

55 

56 

57class ErrorStatsServiceRestInterceptor: 

58 """Interceptor for ErrorStatsService. 

59 

60 Interceptors are used to manipulate requests, request metadata, and responses 

61 in arbitrary ways. 

62 Example use cases include: 

63 * Logging 

64 * Verifying requests according to service or custom semantics 

65 * Stripping extraneous information from responses 

66 

67 These use cases and more can be enabled by injecting an 

68 instance of a custom subclass when constructing the ErrorStatsServiceRestTransport. 

69 

70 .. code-block:: python 

71 class MyCustomErrorStatsServiceInterceptor(ErrorStatsServiceRestInterceptor): 

72 def pre_delete_events(self, request, metadata): 

73 logging.log(f"Received request: {request}") 

74 return request, metadata 

75 

76 def post_delete_events(self, response): 

77 logging.log(f"Received response: {response}") 

78 return response 

79 

80 def pre_list_events(self, request, metadata): 

81 logging.log(f"Received request: {request}") 

82 return request, metadata 

83 

84 def post_list_events(self, response): 

85 logging.log(f"Received response: {response}") 

86 return response 

87 

88 def pre_list_group_stats(self, request, metadata): 

89 logging.log(f"Received request: {request}") 

90 return request, metadata 

91 

92 def post_list_group_stats(self, response): 

93 logging.log(f"Received response: {response}") 

94 return response 

95 

96 transport = ErrorStatsServiceRestTransport(interceptor=MyCustomErrorStatsServiceInterceptor()) 

97 client = ErrorStatsServiceClient(transport=transport) 

98 

99 

100 """ 

101 

102 def pre_delete_events( 

103 self, 

104 request: error_stats_service.DeleteEventsRequest, 

105 metadata: Sequence[Tuple[str, str]], 

106 ) -> Tuple[error_stats_service.DeleteEventsRequest, Sequence[Tuple[str, str]]]: 

107 """Pre-rpc interceptor for delete_events 

108 

109 Override in a subclass to manipulate the request or metadata 

110 before they are sent to the ErrorStatsService server. 

111 """ 

112 return request, metadata 

113 

114 def post_delete_events( 

115 self, response: error_stats_service.DeleteEventsResponse 

116 ) -> error_stats_service.DeleteEventsResponse: 

117 """Post-rpc interceptor for delete_events 

118 

119 Override in a subclass to manipulate the response 

120 after it is returned by the ErrorStatsService server but before 

121 it is returned to user code. 

122 """ 

123 return response 

124 

125 def pre_list_events( 

126 self, 

127 request: error_stats_service.ListEventsRequest, 

128 metadata: Sequence[Tuple[str, str]], 

129 ) -> Tuple[error_stats_service.ListEventsRequest, Sequence[Tuple[str, str]]]: 

130 """Pre-rpc interceptor for list_events 

131 

132 Override in a subclass to manipulate the request or metadata 

133 before they are sent to the ErrorStatsService server. 

134 """ 

135 return request, metadata 

136 

137 def post_list_events( 

138 self, response: error_stats_service.ListEventsResponse 

139 ) -> error_stats_service.ListEventsResponse: 

140 """Post-rpc interceptor for list_events 

141 

142 Override in a subclass to manipulate the response 

143 after it is returned by the ErrorStatsService server but before 

144 it is returned to user code. 

145 """ 

146 return response 

147 

148 def pre_list_group_stats( 

149 self, 

150 request: error_stats_service.ListGroupStatsRequest, 

151 metadata: Sequence[Tuple[str, str]], 

152 ) -> Tuple[error_stats_service.ListGroupStatsRequest, Sequence[Tuple[str, str]]]: 

153 """Pre-rpc interceptor for list_group_stats 

154 

155 Override in a subclass to manipulate the request or metadata 

156 before they are sent to the ErrorStatsService server. 

157 """ 

158 return request, metadata 

159 

160 def post_list_group_stats( 

161 self, response: error_stats_service.ListGroupStatsResponse 

162 ) -> error_stats_service.ListGroupStatsResponse: 

163 """Post-rpc interceptor for list_group_stats 

164 

165 Override in a subclass to manipulate the response 

166 after it is returned by the ErrorStatsService server but before 

167 it is returned to user code. 

168 """ 

169 return response 

170 

171 

172@dataclasses.dataclass 

173class ErrorStatsServiceRestStub: 

174 _session: AuthorizedSession 

175 _host: str 

176 _interceptor: ErrorStatsServiceRestInterceptor 

177 

178 

179class ErrorStatsServiceRestTransport(ErrorStatsServiceTransport): 

180 """REST backend transport for ErrorStatsService. 

181 

182 An API for retrieving and managing error statistics as well 

183 as data for individual events. 

184 

185 This class defines the same methods as the primary client, so the 

186 primary client can load the underlying transport implementation 

187 and call it. 

188 

189 It sends JSON representations of protocol buffers over HTTP/1.1 

190 

191 """ 

192 

193 def __init__( 

194 self, 

195 *, 

196 host: str = "clouderrorreporting.googleapis.com", 

197 credentials: Optional[ga_credentials.Credentials] = None, 

198 credentials_file: Optional[str] = None, 

199 scopes: Optional[Sequence[str]] = None, 

200 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

201 quota_project_id: Optional[str] = None, 

202 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

203 always_use_jwt_access: Optional[bool] = False, 

204 url_scheme: str = "https", 

205 interceptor: Optional[ErrorStatsServiceRestInterceptor] = None, 

206 api_audience: Optional[str] = None, 

207 ) -> None: 

208 """Instantiate the transport. 

209 

210 Args: 

211 host (Optional[str]): 

212 The hostname to connect to. 

213 credentials (Optional[google.auth.credentials.Credentials]): The 

214 authorization credentials to attach to requests. These 

215 credentials identify the application to the service; if none 

216 are specified, the client will attempt to ascertain the 

217 credentials from the environment. 

218 

219 credentials_file (Optional[str]): A file with credentials that can 

220 be loaded with :func:`google.auth.load_credentials_from_file`. 

221 This argument is ignored if ``channel`` is provided. 

222 scopes (Optional(Sequence[str])): A list of scopes. This argument is 

223 ignored if ``channel`` is provided. 

224 client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client 

225 certificate to configure mutual TLS HTTP channel. It is ignored 

226 if ``channel`` is provided. 

227 quota_project_id (Optional[str]): An optional project to use for billing 

228 and quota. 

229 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

230 The client info used to send a user-agent string along with 

231 API requests. If ``None``, then default info will be used. 

232 Generally, you only need to set this if you are developing 

233 your own client library. 

234 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

235 be used for service account credentials. 

236 url_scheme: the protocol scheme for the API endpoint. Normally 

237 "https", but for testing or local servers, 

238 "http" can be specified. 

239 """ 

240 # Run the base constructor 

241 # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc. 

242 # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the 

243 # credentials object 

244 maybe_url_match = re.match("^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host) 

245 if maybe_url_match is None: 

246 raise ValueError( 

247 f"Unexpected hostname structure: {host}" 

248 ) # pragma: NO COVER 

249 

250 url_match_items = maybe_url_match.groupdict() 

251 

252 host = f"{url_scheme}://{host}" if not url_match_items["scheme"] else host 

253 

254 super().__init__( 

255 host=host, 

256 credentials=credentials, 

257 client_info=client_info, 

258 always_use_jwt_access=always_use_jwt_access, 

259 api_audience=api_audience, 

260 ) 

261 self._session = AuthorizedSession( 

262 self._credentials, default_host=self.DEFAULT_HOST 

263 ) 

264 if client_cert_source_for_mtls: 

265 self._session.configure_mtls_channel(client_cert_source_for_mtls) 

266 self._interceptor = interceptor or ErrorStatsServiceRestInterceptor() 

267 self._prep_wrapped_messages(client_info) 

268 

269 class _DeleteEvents(ErrorStatsServiceRestStub): 

270 def __hash__(self): 

271 return hash("DeleteEvents") 

272 

273 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

274 

275 @classmethod 

276 def _get_unset_required_fields(cls, message_dict): 

277 return { 

278 k: v 

279 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

280 if k not in message_dict 

281 } 

282 

283 def __call__( 

284 self, 

285 request: error_stats_service.DeleteEventsRequest, 

286 *, 

287 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

288 timeout: Optional[float] = None, 

289 metadata: Sequence[Tuple[str, str]] = (), 

290 ) -> error_stats_service.DeleteEventsResponse: 

291 r"""Call the delete events method over HTTP. 

292 

293 Args: 

294 request (~.error_stats_service.DeleteEventsRequest): 

295 The request object. Deletes all events in the project. 

296 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

297 should be retried. 

298 timeout (float): The timeout for this request. 

299 metadata (Sequence[Tuple[str, str]]): Strings which should be 

300 sent along with the request as metadata. 

301 

302 Returns: 

303 ~.error_stats_service.DeleteEventsResponse: 

304 Response message for deleting error 

305 events. 

306 

307 """ 

308 

309 http_options: List[Dict[str, str]] = [ 

310 { 

311 "method": "delete", 

312 "uri": "/v1beta1/{project_name=projects/*}/events", 

313 }, 

314 ] 

315 request, metadata = self._interceptor.pre_delete_events(request, metadata) 

316 pb_request = error_stats_service.DeleteEventsRequest.pb(request) 

317 transcoded_request = path_template.transcode(http_options, pb_request) 

318 

319 uri = transcoded_request["uri"] 

320 method = transcoded_request["method"] 

321 

322 # Jsonify the query params 

323 query_params = json.loads( 

324 json_format.MessageToJson( 

325 transcoded_request["query_params"], 

326 including_default_value_fields=False, 

327 use_integers_for_enums=True, 

328 ) 

329 ) 

330 query_params.update(self._get_unset_required_fields(query_params)) 

331 

332 query_params["$alt"] = "json;enum-encoding=int" 

333 

334 # Send the request 

335 headers = dict(metadata) 

336 headers["Content-Type"] = "application/json" 

337 response = getattr(self._session, method)( 

338 "{host}{uri}".format(host=self._host, uri=uri), 

339 timeout=timeout, 

340 headers=headers, 

341 params=rest_helpers.flatten_query_params(query_params, strict=True), 

342 ) 

343 

344 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

345 # subclass. 

346 if response.status_code >= 400: 

347 raise core_exceptions.from_http_response(response) 

348 

349 # Return the response 

350 resp = error_stats_service.DeleteEventsResponse() 

351 pb_resp = error_stats_service.DeleteEventsResponse.pb(resp) 

352 

353 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True) 

354 resp = self._interceptor.post_delete_events(resp) 

355 return resp 

356 

357 class _ListEvents(ErrorStatsServiceRestStub): 

358 def __hash__(self): 

359 return hash("ListEvents") 

360 

361 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = { 

362 "groupId": "", 

363 } 

364 

365 @classmethod 

366 def _get_unset_required_fields(cls, message_dict): 

367 return { 

368 k: v 

369 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

370 if k not in message_dict 

371 } 

372 

373 def __call__( 

374 self, 

375 request: error_stats_service.ListEventsRequest, 

376 *, 

377 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

378 timeout: Optional[float] = None, 

379 metadata: Sequence[Tuple[str, str]] = (), 

380 ) -> error_stats_service.ListEventsResponse: 

381 r"""Call the list events method over HTTP. 

382 

383 Args: 

384 request (~.error_stats_service.ListEventsRequest): 

385 The request object. Specifies a set of error events to 

386 return. 

387 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

388 should be retried. 

389 timeout (float): The timeout for this request. 

390 metadata (Sequence[Tuple[str, str]]): Strings which should be 

391 sent along with the request as metadata. 

392 

393 Returns: 

394 ~.error_stats_service.ListEventsResponse: 

395 Contains a set of requested error 

396 events. 

397 

398 """ 

399 

400 http_options: List[Dict[str, str]] = [ 

401 { 

402 "method": "get", 

403 "uri": "/v1beta1/{project_name=projects/*}/events", 

404 }, 

405 ] 

406 request, metadata = self._interceptor.pre_list_events(request, metadata) 

407 pb_request = error_stats_service.ListEventsRequest.pb(request) 

408 transcoded_request = path_template.transcode(http_options, pb_request) 

409 

410 uri = transcoded_request["uri"] 

411 method = transcoded_request["method"] 

412 

413 # Jsonify the query params 

414 query_params = json.loads( 

415 json_format.MessageToJson( 

416 transcoded_request["query_params"], 

417 including_default_value_fields=False, 

418 use_integers_for_enums=True, 

419 ) 

420 ) 

421 query_params.update(self._get_unset_required_fields(query_params)) 

422 

423 query_params["$alt"] = "json;enum-encoding=int" 

424 

425 # Send the request 

426 headers = dict(metadata) 

427 headers["Content-Type"] = "application/json" 

428 response = getattr(self._session, method)( 

429 "{host}{uri}".format(host=self._host, uri=uri), 

430 timeout=timeout, 

431 headers=headers, 

432 params=rest_helpers.flatten_query_params(query_params, strict=True), 

433 ) 

434 

435 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

436 # subclass. 

437 if response.status_code >= 400: 

438 raise core_exceptions.from_http_response(response) 

439 

440 # Return the response 

441 resp = error_stats_service.ListEventsResponse() 

442 pb_resp = error_stats_service.ListEventsResponse.pb(resp) 

443 

444 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True) 

445 resp = self._interceptor.post_list_events(resp) 

446 return resp 

447 

448 class _ListGroupStats(ErrorStatsServiceRestStub): 

449 def __hash__(self): 

450 return hash("ListGroupStats") 

451 

452 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {} 

453 

454 @classmethod 

455 def _get_unset_required_fields(cls, message_dict): 

456 return { 

457 k: v 

458 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items() 

459 if k not in message_dict 

460 } 

461 

462 def __call__( 

463 self, 

464 request: error_stats_service.ListGroupStatsRequest, 

465 *, 

466 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

467 timeout: Optional[float] = None, 

468 metadata: Sequence[Tuple[str, str]] = (), 

469 ) -> error_stats_service.ListGroupStatsResponse: 

470 r"""Call the list group stats method over HTTP. 

471 

472 Args: 

473 request (~.error_stats_service.ListGroupStatsRequest): 

474 The request object. Specifies a set of ``ErrorGroupStats`` to return. 

475 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

476 should be retried. 

477 timeout (float): The timeout for this request. 

478 metadata (Sequence[Tuple[str, str]]): Strings which should be 

479 sent along with the request as metadata. 

480 

481 Returns: 

482 ~.error_stats_service.ListGroupStatsResponse: 

483 Contains a set of requested error 

484 group stats. 

485 

486 """ 

487 

488 http_options: List[Dict[str, str]] = [ 

489 { 

490 "method": "get", 

491 "uri": "/v1beta1/{project_name=projects/*}/groupStats", 

492 }, 

493 ] 

494 request, metadata = self._interceptor.pre_list_group_stats( 

495 request, metadata 

496 ) 

497 pb_request = error_stats_service.ListGroupStatsRequest.pb(request) 

498 transcoded_request = path_template.transcode(http_options, pb_request) 

499 

500 uri = transcoded_request["uri"] 

501 method = transcoded_request["method"] 

502 

503 # Jsonify the query params 

504 query_params = json.loads( 

505 json_format.MessageToJson( 

506 transcoded_request["query_params"], 

507 including_default_value_fields=False, 

508 use_integers_for_enums=True, 

509 ) 

510 ) 

511 query_params.update(self._get_unset_required_fields(query_params)) 

512 

513 query_params["$alt"] = "json;enum-encoding=int" 

514 

515 # Send the request 

516 headers = dict(metadata) 

517 headers["Content-Type"] = "application/json" 

518 response = getattr(self._session, method)( 

519 "{host}{uri}".format(host=self._host, uri=uri), 

520 timeout=timeout, 

521 headers=headers, 

522 params=rest_helpers.flatten_query_params(query_params, strict=True), 

523 ) 

524 

525 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

526 # subclass. 

527 if response.status_code >= 400: 

528 raise core_exceptions.from_http_response(response) 

529 

530 # Return the response 

531 resp = error_stats_service.ListGroupStatsResponse() 

532 pb_resp = error_stats_service.ListGroupStatsResponse.pb(resp) 

533 

534 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True) 

535 resp = self._interceptor.post_list_group_stats(resp) 

536 return resp 

537 

538 @property 

539 def delete_events( 

540 self, 

541 ) -> Callable[ 

542 [error_stats_service.DeleteEventsRequest], 

543 error_stats_service.DeleteEventsResponse, 

544 ]: 

545 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here. 

546 # In C++ this would require a dynamic_cast 

547 return self._DeleteEvents(self._session, self._host, self._interceptor) # type: ignore 

548 

549 @property 

550 def list_events( 

551 self, 

552 ) -> Callable[ 

553 [error_stats_service.ListEventsRequest], error_stats_service.ListEventsResponse 

554 ]: 

555 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here. 

556 # In C++ this would require a dynamic_cast 

557 return self._ListEvents(self._session, self._host, self._interceptor) # type: ignore 

558 

559 @property 

560 def list_group_stats( 

561 self, 

562 ) -> Callable[ 

563 [error_stats_service.ListGroupStatsRequest], 

564 error_stats_service.ListGroupStatsResponse, 

565 ]: 

566 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here. 

567 # In C++ this would require a dynamic_cast 

568 return self._ListGroupStats(self._session, self._host, self._interceptor) # type: ignore 

569 

570 @property 

571 def kind(self) -> str: 

572 return "rest" 

573 

574 def close(self): 

575 self._session.close() 

576 

577 

578__all__ = ("ErrorStatsServiceRestTransport",)