Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/errorreporting_v1beta1/services/error_group_service/transports/rest.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

117 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import logging 

17import json # type: ignore 

18 

19from google.auth.transport.requests import AuthorizedSession # type: ignore 

20from google.auth import credentials as ga_credentials # type: ignore 

21from google.api_core import exceptions as core_exceptions 

22from google.api_core import retry as retries 

23from google.api_core import rest_helpers 

24from google.api_core import rest_streaming 

25from google.api_core import gapic_v1 

26import google.protobuf 

27 

28from google.protobuf import json_format 

29 

30from requests import __version__ as requests_version 

31import dataclasses 

32from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

33import warnings 

34 

35 

36from google.cloud.errorreporting_v1beta1.types import common 

37from google.cloud.errorreporting_v1beta1.types import error_group_service 

38 

39 

40from .rest_base import _BaseErrorGroupServiceRestTransport 

41from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO 

42 

# Type alias for the ``retry`` argument of the per-RPC callables.  If
# ``gapic_v1.method._MethodDefault`` cannot be resolved (AttributeError), a
# looser alias built on ``object`` is used instead.
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

# Structured debug logging is emitted only when ``client_logging`` can be
# imported from google.api_core; the flag records the outcome of that import.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

_LOGGER = logging.getLogger(__name__)

# Client info for the REST transport: reuses the base transport's gapic
# version, reports no gRPC version, and records the ``requests`` version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
    grpc_version=None,
    rest_version=f"requests@{requests_version}",
)

# Only set the protobuf runtime version when ClientInfo exposes the attribute.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__

65 

66 

class ErrorGroupServiceRestInterceptor:
    """Hook points around the REST calls made for ErrorGroupService.

    Every method here is a pass-through.  Subclass and override the hooks
    you need in order to inspect or rewrite requests, request metadata and
    responses, for example to:

      * log traffic,
      * validate requests against service or custom rules,
      * strip unwanted information from responses.

    Pass an instance of the subclass to ``ErrorGroupServiceRestTransport``
    to activate it.

    .. code-block:: python

        class MyCustomErrorGroupServiceInterceptor(ErrorGroupServiceRestInterceptor):
            def pre_get_group(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_get_group(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_update_group(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_update_group(self, response):
                logging.info(f"Received response: {response}")
                return response

        transport = ErrorGroupServiceRestTransport(interceptor=MyCustomErrorGroupServiceInterceptor())
        client = ErrorGroupServiceClient(transport=transport)

    """

    def pre_get_group(
        self,
        request: error_group_service.GetGroupRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        error_group_service.GetGroupRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Hook invoked before a get_group request is transcoded and sent.

        The default implementation returns both arguments untouched.
        Override to adjust the request or its metadata before they reach
        the ErrorGroupService server.
        """
        return (request, metadata)

    def post_get_group(self, response: common.ErrorGroup) -> common.ErrorGroup:
        """Hook invoked on the parsed get_group response.

        DEPRECATED: prefer ``post_get_group_with_metadata``.

        The default implementation returns ``response`` untouched.
        Override to read or alter the response before user code sees it.
        This hook is called before ``post_get_group_with_metadata``.
        """
        return response

    def post_get_group_with_metadata(
        self,
        response: common.ErrorGroup,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.ErrorGroup, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Hook invoked on the get_group response together with its metadata.

        The default implementation returns both arguments untouched.
        Override to read or alter the response or the response metadata
        before user code sees them.

        This is the recommended post-RPC hook for new code.  It is called
        after ``post_get_group`` and receives whatever (possibly modified)
        response that hook returned.
        """
        return (response, metadata)

    def pre_update_group(
        self,
        request: error_group_service.UpdateGroupRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        error_group_service.UpdateGroupRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Hook invoked before an update_group request is transcoded and sent.

        The default implementation returns both arguments untouched.
        Override to adjust the request or its metadata before they reach
        the ErrorGroupService server.
        """
        return (request, metadata)

    def post_update_group(self, response: common.ErrorGroup) -> common.ErrorGroup:
        """Hook invoked on the parsed update_group response.

        DEPRECATED: prefer ``post_update_group_with_metadata``.

        The default implementation returns ``response`` untouched.
        Override to read or alter the response before user code sees it.
        This hook is called before ``post_update_group_with_metadata``.
        """
        return response

    def post_update_group_with_metadata(
        self,
        response: common.ErrorGroup,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.ErrorGroup, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Hook invoked on the update_group response together with its metadata.

        The default implementation returns both arguments untouched.
        Override to read or alter the response or the response metadata
        before user code sees them.

        This is the recommended post-RPC hook for new code.  It is called
        after ``post_update_group`` and receives whatever (possibly
        modified) response that hook returned.
        """
        return (response, metadata)

195 

196 

@dataclasses.dataclass
class ErrorGroupServiceRestStub:
    """State shared by the per-RPC callables of the REST transport."""

    # Session object whose HTTP-verb methods issue the requests.
    _session: AuthorizedSession
    # Host prefix that is concatenated with each transcoded request URI.
    _host: str
    # Interceptor whose pre_/post_ hooks wrap every call.
    _interceptor: ErrorGroupServiceRestInterceptor

202 

203 

class ErrorGroupServiceRestTransport(_BaseErrorGroupServiceRestTransport):
    """REST backend synchronous transport for ErrorGroupService.

    Service for retrieving and updating individual error groups.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1
    """

    def __init__(
        self,
        *,
        host: str = "clouderrorreporting.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        interceptor: Optional[ErrorGroupServiceRestInterceptor] = None,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'clouderrorreporting.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                Accepted for interface parity; this constructor does not
                forward it to the base class.
            scopes (Optional(Sequence[str])): A list of scopes. Accepted for
                interface parity; this constructor does not forward it to
                the base class.
            client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
                certificate to configure mutual TLS HTTP channel. When
                truthy it is passed to the session's
                ``configure_mtls_channel``.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota. Accepted for interface parity; this constructor
                does not forward it to the base class.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you are developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            url_scheme: the protocol scheme for the API endpoint.  Normally
                "https", but for testing or local servers,
                "http" can be specified.
            interceptor (Optional[ErrorGroupServiceRestInterceptor]): Hooks
                run around every call. A default pass-through
                ``ErrorGroupServiceRestInterceptor`` is used when omitted.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport constructor.
        """
        # Run the base constructor
        # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
        # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
        # credentials object
        super().__init__(
            host=host,
            credentials=credentials,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            url_scheme=url_scheme,
            api_audience=api_audience,
        )
        self._session = AuthorizedSession(
            self._credentials, default_host=self.DEFAULT_HOST
        )
        if client_cert_source_for_mtls:
            self._session.configure_mtls_channel(client_cert_source_for_mtls)
        self._interceptor = interceptor or ErrorGroupServiceRestInterceptor()
        self._prep_wrapped_messages(client_info)

    class _GetGroup(
        _BaseErrorGroupServiceRestTransport._BaseGetGroup, ErrorGroupServiceRestStub
    ):
        """Callable that performs the GetGroup RPC over HTTP."""

        def __hash__(self):
            # Hash on a fixed string so every instance of this callable
            # hashes identically.
            return hash("ErrorGroupServiceRestTransport.GetGroup")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Issue the HTTP request described by ``transcoded_request``.

            ``body`` is accepted for signature parity with the other RPC
            callables but is not sent for this method.
            """
            uri = transcoded_request["uri"]
            method = transcoded_request["method"]
            headers = dict(metadata)
            headers["Content-Type"] = "application/json"
            # The HTTP verb names the session method to invoke.
            response = getattr(session, method)(
                "{host}{uri}".format(host=host, uri=uri),
                timeout=timeout,
                headers=headers,
                params=rest_helpers.flatten_query_params(query_params, strict=True),
            )
            return response

        def __call__(
            self,
            request: error_group_service.GetGroupRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.ErrorGroup:
            r"""Call the get group method over HTTP.

            Args:
                request (~.error_group_service.GetGroupRequest):
                    The request object. A request to return an individual
                    group.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not consulted inside this method.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.ErrorGroup:
                    Description of a group of similar
                    error events.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: If the HTTP
                    status code of the response is 400 or greater.
            """

            http_options = (
                _BaseErrorGroupServiceRestTransport._BaseGetGroup._get_http_options()
            )

            request, metadata = self._interceptor.pre_get_group(request, metadata)
            transcoded_request = _BaseErrorGroupServiceRestTransport._BaseGetGroup._get_transcoded_request(
                http_options, request
            )

            # Jsonify the query params
            query_params = _BaseErrorGroupServiceRestTransport._BaseGetGroup._get_query_params_json(
                transcoded_request
            )

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                request_url = "{host}{uri}".format(
                    host=self._host, uri=transcoded_request["uri"]
                )
                method = transcoded_request["method"]
                # Logging is best-effort: a request that cannot be
                # serialized is logged without a payload.
                try:
                    request_payload = type(request).to_json(request)
                except Exception:
                    request_payload = None
                http_request = {
                    "payload": request_payload,
                    "requestMethod": method,
                    "requestUrl": request_url,
                    "headers": dict(metadata),
                }
                _LOGGER.debug(
                    "Sending request for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.GetGroup",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
                        "rpcName": "GetGroup",
                        "httpRequest": http_request,
                        "metadata": http_request["headers"],
                    },
                )

            # Send the request
            response = ErrorGroupServiceRestTransport._GetGroup._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Return the response
            resp = common.ErrorGroup()
            pb_resp = common.ErrorGroup.pb(resp)

            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            # The deprecated hook runs first; its result feeds the
            # metadata-aware hook.
            resp = self._interceptor.post_get_group(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            resp, _ = self._interceptor.post_get_group_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                # Serialize the parsed message (``resp``), not the raw HTTP
                # ``response`` object, so the payload is actually logged.
                try:
                    response_payload = common.ErrorGroup.to_json(resp)
                except Exception:
                    response_payload = None
                http_response = {
                    "payload": response_payload,
                    "headers": dict(response.headers),
                    "status": response.status_code,
                }
                _LOGGER.debug(
                    "Received response for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.get_group",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
                        "rpcName": "GetGroup",
                        "metadata": http_response["headers"],
                        "httpResponse": http_response,
                    },
                )
            return resp

    class _UpdateGroup(
        _BaseErrorGroupServiceRestTransport._BaseUpdateGroup, ErrorGroupServiceRestStub
    ):
        """Callable that performs the UpdateGroup RPC over HTTP."""

        def __hash__(self):
            # Hash on a fixed string so every instance of this callable
            # hashes identically.
            return hash("ErrorGroupServiceRestTransport.UpdateGroup")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Issue the HTTP request described by ``transcoded_request``.

            ``body`` is sent as the request payload.
            """
            uri = transcoded_request["uri"]
            method = transcoded_request["method"]
            headers = dict(metadata)
            headers["Content-Type"] = "application/json"
            # The HTTP verb names the session method to invoke.
            response = getattr(session, method)(
                "{host}{uri}".format(host=host, uri=uri),
                timeout=timeout,
                headers=headers,
                params=rest_helpers.flatten_query_params(query_params, strict=True),
                data=body,
            )
            return response

        def __call__(
            self,
            request: error_group_service.UpdateGroupRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.ErrorGroup:
            r"""Call the update group method over HTTP.

            Args:
                request (~.error_group_service.UpdateGroupRequest):
                    The request object. A request to replace the existing
                    data for the given group.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not consulted inside this method.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.ErrorGroup:
                    Description of a group of similar
                    error events.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: If the HTTP
                    status code of the response is 400 or greater.
            """

            http_options = (
                _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_http_options()
            )

            request, metadata = self._interceptor.pre_update_group(request, metadata)
            transcoded_request = _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_transcoded_request(
                http_options, request
            )

            body = _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_request_body_json(
                transcoded_request
            )

            # Jsonify the query params
            query_params = _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_query_params_json(
                transcoded_request
            )

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                request_url = "{host}{uri}".format(
                    host=self._host, uri=transcoded_request["uri"]
                )
                method = transcoded_request["method"]
                # Logging is best-effort: a request that cannot be
                # serialized is logged without a payload.
                try:
                    request_payload = type(request).to_json(request)
                except Exception:
                    request_payload = None
                http_request = {
                    "payload": request_payload,
                    "requestMethod": method,
                    "requestUrl": request_url,
                    "headers": dict(metadata),
                }
                _LOGGER.debug(
                    "Sending request for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.UpdateGroup",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
                        "rpcName": "UpdateGroup",
                        "httpRequest": http_request,
                        "metadata": http_request["headers"],
                    },
                )

            # Send the request
            response = ErrorGroupServiceRestTransport._UpdateGroup._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Return the response
            resp = common.ErrorGroup()
            pb_resp = common.ErrorGroup.pb(resp)

            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            # The deprecated hook runs first; its result feeds the
            # metadata-aware hook.
            resp = self._interceptor.post_update_group(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            resp, _ = self._interceptor.post_update_group_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                # Serialize the parsed message (``resp``), not the raw HTTP
                # ``response`` object, so the payload is actually logged.
                try:
                    response_payload = common.ErrorGroup.to_json(resp)
                except Exception:
                    response_payload = None
                http_response = {
                    "payload": response_payload,
                    "headers": dict(response.headers),
                    "status": response.status_code,
                }
                _LOGGER.debug(
                    "Received response for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.update_group",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
                        "rpcName": "UpdateGroup",
                        "metadata": http_response["headers"],
                        "httpResponse": http_response,
                    },
                )
            return resp

    @property
    def get_group(
        self,
    ) -> Callable[[error_group_service.GetGroupRequest], common.ErrorGroup]:
        """Return a new callable that performs the GetGroup RPC."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._GetGroup(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def update_group(
        self,
    ) -> Callable[[error_group_service.UpdateGroupRequest], common.ErrorGroup]:
        """Return a new callable that performs the UpdateGroup RPC."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._UpdateGroup(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def kind(self) -> str:
        """Return the transport kind identifier, ``"rest"``."""
        return "rest"

    def close(self):
        """Close the underlying HTTP session."""
        self._session.close()


__all__ = ("ErrorGroupServiceRestTransport",)