Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/errorreporting_v1beta1/services/error_group_service/transports/rest.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

117 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import logging 

17import json # type: ignore 

18 

19from google.auth.transport.requests import AuthorizedSession # type: ignore 

20from google.auth import credentials as ga_credentials # type: ignore 

21from google.api_core import exceptions as core_exceptions 

22from google.api_core import retry as retries 

23from google.api_core import rest_helpers 

24from google.api_core import rest_streaming 

25from google.api_core import gapic_v1 

26import google.protobuf 

27 

28from google.protobuf import json_format 

29 

30from requests import __version__ as requests_version 

31import dataclasses 

32from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

33import warnings 

34 

35 

36from google.cloud.errorreporting_v1beta1.types import common 

37from google.cloud.errorreporting_v1beta1.types import error_group_service 

38 

39 

40from .rest_base import _BaseErrorGroupServiceRestTransport 

41from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO 

42 

43try: 

44 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None] 

45except AttributeError: # pragma: NO COVER 

46 OptionalRetry = Union[retries.Retry, object, None] # type: ignore 

47 

48try: 

49 from google.api_core import client_logging # type: ignore 

50 

51 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER 

52except ImportError: # pragma: NO COVER 

53 CLIENT_LOGGING_SUPPORTED = False 

54 

55_LOGGER = logging.getLogger(__name__) 

56 

57DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

58 gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version, 

59 grpc_version=None, 

60 rest_version=f"requests@{requests_version}", 

61) 

62 

63if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER 

64 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__ 

65 

66 

67class ErrorGroupServiceRestInterceptor: 

68 """Interceptor for ErrorGroupService. 

69 

70 Interceptors are used to manipulate requests, request metadata, and responses 

71 in arbitrary ways. 

72 Example use cases include: 

73 * Logging 

74 * Verifying requests according to service or custom semantics 

75 * Stripping extraneous information from responses 

76 

77 These use cases and more can be enabled by injecting an 

78 instance of a custom subclass when constructing the ErrorGroupServiceRestTransport. 

79 

80 .. code-block:: python 

81 class MyCustomErrorGroupServiceInterceptor(ErrorGroupServiceRestInterceptor): 

82 def pre_get_group(self, request, metadata): 

83 logging.log(f"Received request: {request}") 

84 return request, metadata 

85 

86 def post_get_group(self, response): 

87 logging.log(f"Received response: {response}") 

88 return response 

89 

90 def pre_update_group(self, request, metadata): 

91 logging.log(f"Received request: {request}") 

92 return request, metadata 

93 

94 def post_update_group(self, response): 

95 logging.log(f"Received response: {response}") 

96 return response 

97 

98 transport = ErrorGroupServiceRestTransport(interceptor=MyCustomErrorGroupServiceInterceptor()) 

99 client = ErrorGroupServiceClient(transport=transport) 

100 

101 

102 """ 

103 

104 def pre_get_group( 

105 self, 

106 request: error_group_service.GetGroupRequest, 

107 metadata: Sequence[Tuple[str, Union[str, bytes]]], 

108 ) -> Tuple[ 

109 error_group_service.GetGroupRequest, Sequence[Tuple[str, Union[str, bytes]]] 

110 ]: 

111 """Pre-rpc interceptor for get_group 

112 

113 Override in a subclass to manipulate the request or metadata 

114 before they are sent to the ErrorGroupService server. 

115 """ 

116 return request, metadata 

117 

118 def post_get_group(self, response: common.ErrorGroup) -> common.ErrorGroup: 

119 """Post-rpc interceptor for get_group 

120 

121 DEPRECATED. Please use the `post_get_group_with_metadata` 

122 interceptor instead. 

123 

124 Override in a subclass to read or manipulate the response 

125 after it is returned by the ErrorGroupService server but before 

126 it is returned to user code. This `post_get_group` interceptor runs 

127 before the `post_get_group_with_metadata` interceptor. 

128 """ 

129 return response 

130 

131 def post_get_group_with_metadata( 

132 self, 

133 response: common.ErrorGroup, 

134 metadata: Sequence[Tuple[str, Union[str, bytes]]], 

135 ) -> Tuple[common.ErrorGroup, Sequence[Tuple[str, Union[str, bytes]]]]: 

136 """Post-rpc interceptor for get_group 

137 

138 Override in a subclass to read or manipulate the response or metadata after it 

139 is returned by the ErrorGroupService server but before it is returned to user code. 

140 

141 We recommend only using this `post_get_group_with_metadata` 

142 interceptor in new development instead of the `post_get_group` interceptor. 

143 When both interceptors are used, this `post_get_group_with_metadata` interceptor runs after the 

144 `post_get_group` interceptor. The (possibly modified) response returned by 

145 `post_get_group` will be passed to 

146 `post_get_group_with_metadata`. 

147 """ 

148 return response, metadata 

149 

150 def pre_update_group( 

151 self, 

152 request: error_group_service.UpdateGroupRequest, 

153 metadata: Sequence[Tuple[str, Union[str, bytes]]], 

154 ) -> Tuple[ 

155 error_group_service.UpdateGroupRequest, Sequence[Tuple[str, Union[str, bytes]]] 

156 ]: 

157 """Pre-rpc interceptor for update_group 

158 

159 Override in a subclass to manipulate the request or metadata 

160 before they are sent to the ErrorGroupService server. 

161 """ 

162 return request, metadata 

163 

164 def post_update_group(self, response: common.ErrorGroup) -> common.ErrorGroup: 

165 """Post-rpc interceptor for update_group 

166 

167 DEPRECATED. Please use the `post_update_group_with_metadata` 

168 interceptor instead. 

169 

170 Override in a subclass to read or manipulate the response 

171 after it is returned by the ErrorGroupService server but before 

172 it is returned to user code. This `post_update_group` interceptor runs 

173 before the `post_update_group_with_metadata` interceptor. 

174 """ 

175 return response 

176 

177 def post_update_group_with_metadata( 

178 self, 

179 response: common.ErrorGroup, 

180 metadata: Sequence[Tuple[str, Union[str, bytes]]], 

181 ) -> Tuple[common.ErrorGroup, Sequence[Tuple[str, Union[str, bytes]]]]: 

182 """Post-rpc interceptor for update_group 

183 

184 Override in a subclass to read or manipulate the response or metadata after it 

185 is returned by the ErrorGroupService server but before it is returned to user code. 

186 

187 We recommend only using this `post_update_group_with_metadata` 

188 interceptor in new development instead of the `post_update_group` interceptor. 

189 When both interceptors are used, this `post_update_group_with_metadata` interceptor runs after the 

190 `post_update_group` interceptor. The (possibly modified) response returned by 

191 `post_update_group` will be passed to 

192 `post_update_group_with_metadata`. 

193 """ 

194 return response, metadata 

195 

196 

197@dataclasses.dataclass 

198class ErrorGroupServiceRestStub: 

199 _session: AuthorizedSession 

200 _host: str 

201 _interceptor: ErrorGroupServiceRestInterceptor 

202 

203 

204class ErrorGroupServiceRestTransport(_BaseErrorGroupServiceRestTransport): 

205 """REST backend synchronous transport for ErrorGroupService. 

206 

207 Service for retrieving and updating individual error groups. 

208 

209 This class defines the same methods as the primary client, so the 

210 primary client can load the underlying transport implementation 

211 and call it. 

212 

213 It sends JSON representations of protocol buffers over HTTP/1.1 

214 """ 

215 

216 def __init__( 

217 self, 

218 *, 

219 host: str = "clouderrorreporting.googleapis.com", 

220 credentials: Optional[ga_credentials.Credentials] = None, 

221 credentials_file: Optional[str] = None, 

222 scopes: Optional[Sequence[str]] = None, 

223 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

224 quota_project_id: Optional[str] = None, 

225 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

226 always_use_jwt_access: Optional[bool] = False, 

227 url_scheme: str = "https", 

228 interceptor: Optional[ErrorGroupServiceRestInterceptor] = None, 

229 api_audience: Optional[str] = None, 

230 ) -> None: 

231 """Instantiate the transport. 

232 

233 Args: 

234 host (Optional[str]): 

235 The hostname to connect to (default: 'clouderrorreporting.googleapis.com'). 

236 credentials (Optional[google.auth.credentials.Credentials]): The 

237 authorization credentials to attach to requests. These 

238 credentials identify the application to the service; if none 

239 are specified, the client will attempt to ascertain the 

240 credentials from the environment. 

241 

242 credentials_file (Optional[str]): Deprecated. A file with credentials that can 

243 be loaded with :func:`google.auth.load_credentials_from_file`. 

244 This argument is ignored if ``channel`` is provided. This argument will be 

245 removed in the next major version of this library. 

246 scopes (Optional(Sequence[str])): A list of scopes. This argument is 

247 ignored if ``channel`` is provided. 

248 client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client 

249 certificate to configure mutual TLS HTTP channel. It is ignored 

250 if ``channel`` is provided. 

251 quota_project_id (Optional[str]): An optional project to use for billing 

252 and quota. 

253 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

254 The client info used to send a user-agent string along with 

255 API requests. If ``None``, then default info will be used. 

256 Generally, you only need to set this if you are developing 

257 your own client library. 

258 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

259 be used for service account credentials. 

260 url_scheme: the protocol scheme for the API endpoint. Normally 

261 "https", but for testing or local servers, 

262 "http" can be specified. 

263 """ 

264 # Run the base constructor 

265 # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc. 

266 # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the 

267 # credentials object 

268 super().__init__( 

269 host=host, 

270 credentials=credentials, 

271 client_info=client_info, 

272 always_use_jwt_access=always_use_jwt_access, 

273 url_scheme=url_scheme, 

274 api_audience=api_audience, 

275 ) 

276 self._session = AuthorizedSession( 

277 self._credentials, default_host=self.DEFAULT_HOST 

278 ) 

279 if client_cert_source_for_mtls: 

280 self._session.configure_mtls_channel(client_cert_source_for_mtls) 

281 self._interceptor = interceptor or ErrorGroupServiceRestInterceptor() 

282 self._prep_wrapped_messages(client_info) 

283 

284 class _GetGroup( 

285 _BaseErrorGroupServiceRestTransport._BaseGetGroup, ErrorGroupServiceRestStub 

286 ): 

287 def __hash__(self): 

288 return hash("ErrorGroupServiceRestTransport.GetGroup") 

289 

290 @staticmethod 

291 def _get_response( 

292 host, 

293 metadata, 

294 query_params, 

295 session, 

296 timeout, 

297 transcoded_request, 

298 body=None, 

299 ): 

300 uri = transcoded_request["uri"] 

301 method = transcoded_request["method"] 

302 headers = dict(metadata) 

303 headers["Content-Type"] = "application/json" 

304 response = getattr(session, method)( 

305 "{host}{uri}".format(host=host, uri=uri), 

306 timeout=timeout, 

307 headers=headers, 

308 params=rest_helpers.flatten_query_params(query_params, strict=True), 

309 ) 

310 return response 

311 

312 def __call__( 

313 self, 

314 request: error_group_service.GetGroupRequest, 

315 *, 

316 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

317 timeout: Optional[float] = None, 

318 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

319 ) -> common.ErrorGroup: 

320 r"""Call the get group method over HTTP. 

321 

322 Args: 

323 request (~.error_group_service.GetGroupRequest): 

324 The request object. A request to return an individual 

325 group. 

326 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

327 should be retried. 

328 timeout (float): The timeout for this request. 

329 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

330 sent along with the request as metadata. Normally, each value must be of type `str`, 

331 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

332 be of type `bytes`. 

333 

334 Returns: 

335 ~.common.ErrorGroup: 

336 Description of a group of similar 

337 error events. 

338 

339 """ 

340 

341 http_options = ( 

342 _BaseErrorGroupServiceRestTransport._BaseGetGroup._get_http_options() 

343 ) 

344 

345 request, metadata = self._interceptor.pre_get_group(request, metadata) 

346 transcoded_request = _BaseErrorGroupServiceRestTransport._BaseGetGroup._get_transcoded_request( 

347 http_options, request 

348 ) 

349 

350 # Jsonify the query params 

351 query_params = _BaseErrorGroupServiceRestTransport._BaseGetGroup._get_query_params_json( 

352 transcoded_request 

353 ) 

354 

355 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

356 logging.DEBUG 

357 ): # pragma: NO COVER 

358 request_url = "{host}{uri}".format( 

359 host=self._host, uri=transcoded_request["uri"] 

360 ) 

361 method = transcoded_request["method"] 

362 try: 

363 request_payload = type(request).to_json(request) 

364 except: 

365 request_payload = None 

366 http_request = { 

367 "payload": request_payload, 

368 "requestMethod": method, 

369 "requestUrl": request_url, 

370 "headers": dict(metadata), 

371 } 

372 _LOGGER.debug( 

373 f"Sending request for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.GetGroup", 

374 extra={ 

375 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService", 

376 "rpcName": "GetGroup", 

377 "httpRequest": http_request, 

378 "metadata": http_request["headers"], 

379 }, 

380 ) 

381 

382 # Send the request 

383 response = ErrorGroupServiceRestTransport._GetGroup._get_response( 

384 self._host, 

385 metadata, 

386 query_params, 

387 self._session, 

388 timeout, 

389 transcoded_request, 

390 ) 

391 

392 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

393 # subclass. 

394 if response.status_code >= 400: 

395 raise core_exceptions.from_http_response(response) 

396 

397 # Return the response 

398 resp = common.ErrorGroup() 

399 pb_resp = common.ErrorGroup.pb(resp) 

400 

401 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True) 

402 

403 resp = self._interceptor.post_get_group(resp) 

404 response_metadata = [(k, str(v)) for k, v in response.headers.items()] 

405 resp, _ = self._interceptor.post_get_group_with_metadata( 

406 resp, response_metadata 

407 ) 

408 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

409 logging.DEBUG 

410 ): # pragma: NO COVER 

411 try: 

412 response_payload = common.ErrorGroup.to_json(response) 

413 except: 

414 response_payload = None 

415 http_response = { 

416 "payload": response_payload, 

417 "headers": dict(response.headers), 

418 "status": response.status_code, 

419 } 

420 _LOGGER.debug( 

421 "Received response for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.get_group", 

422 extra={ 

423 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService", 

424 "rpcName": "GetGroup", 

425 "metadata": http_response["headers"], 

426 "httpResponse": http_response, 

427 }, 

428 ) 

429 return resp 

430 

431 class _UpdateGroup( 

432 _BaseErrorGroupServiceRestTransport._BaseUpdateGroup, ErrorGroupServiceRestStub 

433 ): 

434 def __hash__(self): 

435 return hash("ErrorGroupServiceRestTransport.UpdateGroup") 

436 

437 @staticmethod 

438 def _get_response( 

439 host, 

440 metadata, 

441 query_params, 

442 session, 

443 timeout, 

444 transcoded_request, 

445 body=None, 

446 ): 

447 uri = transcoded_request["uri"] 

448 method = transcoded_request["method"] 

449 headers = dict(metadata) 

450 headers["Content-Type"] = "application/json" 

451 response = getattr(session, method)( 

452 "{host}{uri}".format(host=host, uri=uri), 

453 timeout=timeout, 

454 headers=headers, 

455 params=rest_helpers.flatten_query_params(query_params, strict=True), 

456 data=body, 

457 ) 

458 return response 

459 

460 def __call__( 

461 self, 

462 request: error_group_service.UpdateGroupRequest, 

463 *, 

464 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

465 timeout: Optional[float] = None, 

466 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

467 ) -> common.ErrorGroup: 

468 r"""Call the update group method over HTTP. 

469 

470 Args: 

471 request (~.error_group_service.UpdateGroupRequest): 

472 The request object. A request to replace the existing 

473 data for the given group. 

474 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

475 should be retried. 

476 timeout (float): The timeout for this request. 

477 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

478 sent along with the request as metadata. Normally, each value must be of type `str`, 

479 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

480 be of type `bytes`. 

481 

482 Returns: 

483 ~.common.ErrorGroup: 

484 Description of a group of similar 

485 error events. 

486 

487 """ 

488 

489 http_options = ( 

490 _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_http_options() 

491 ) 

492 

493 request, metadata = self._interceptor.pre_update_group(request, metadata) 

494 transcoded_request = _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_transcoded_request( 

495 http_options, request 

496 ) 

497 

498 body = _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_request_body_json( 

499 transcoded_request 

500 ) 

501 

502 # Jsonify the query params 

503 query_params = _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_query_params_json( 

504 transcoded_request 

505 ) 

506 

507 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

508 logging.DEBUG 

509 ): # pragma: NO COVER 

510 request_url = "{host}{uri}".format( 

511 host=self._host, uri=transcoded_request["uri"] 

512 ) 

513 method = transcoded_request["method"] 

514 try: 

515 request_payload = type(request).to_json(request) 

516 except: 

517 request_payload = None 

518 http_request = { 

519 "payload": request_payload, 

520 "requestMethod": method, 

521 "requestUrl": request_url, 

522 "headers": dict(metadata), 

523 } 

524 _LOGGER.debug( 

525 f"Sending request for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.UpdateGroup", 

526 extra={ 

527 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService", 

528 "rpcName": "UpdateGroup", 

529 "httpRequest": http_request, 

530 "metadata": http_request["headers"], 

531 }, 

532 ) 

533 

534 # Send the request 

535 response = ErrorGroupServiceRestTransport._UpdateGroup._get_response( 

536 self._host, 

537 metadata, 

538 query_params, 

539 self._session, 

540 timeout, 

541 transcoded_request, 

542 body, 

543 ) 

544 

545 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

546 # subclass. 

547 if response.status_code >= 400: 

548 raise core_exceptions.from_http_response(response) 

549 

550 # Return the response 

551 resp = common.ErrorGroup() 

552 pb_resp = common.ErrorGroup.pb(resp) 

553 

554 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True) 

555 

556 resp = self._interceptor.post_update_group(resp) 

557 response_metadata = [(k, str(v)) for k, v in response.headers.items()] 

558 resp, _ = self._interceptor.post_update_group_with_metadata( 

559 resp, response_metadata 

560 ) 

561 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

562 logging.DEBUG 

563 ): # pragma: NO COVER 

564 try: 

565 response_payload = common.ErrorGroup.to_json(response) 

566 except: 

567 response_payload = None 

568 http_response = { 

569 "payload": response_payload, 

570 "headers": dict(response.headers), 

571 "status": response.status_code, 

572 } 

573 _LOGGER.debug( 

574 "Received response for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.update_group", 

575 extra={ 

576 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService", 

577 "rpcName": "UpdateGroup", 

578 "metadata": http_response["headers"], 

579 "httpResponse": http_response, 

580 }, 

581 ) 

582 return resp 

583 

584 @property 

585 def get_group( 

586 self, 

587 ) -> Callable[[error_group_service.GetGroupRequest], common.ErrorGroup]: 

588 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here. 

589 # In C++ this would require a dynamic_cast 

590 return self._GetGroup(self._session, self._host, self._interceptor) # type: ignore 

591 

592 @property 

593 def update_group( 

594 self, 

595 ) -> Callable[[error_group_service.UpdateGroupRequest], common.ErrorGroup]: 

596 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here. 

597 # In C++ this would require a dynamic_cast 

598 return self._UpdateGroup(self._session, self._host, self._interceptor) # type: ignore 

599 

600 @property 

601 def kind(self) -> str: 

602 return "rest" 

603 

604 def close(self): 

605 self._session.close() 

606 

607 

608__all__ = ("ErrorGroupServiceRestTransport",)