1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import logging
17import json # type: ignore
18
19from google.auth.transport.requests import AuthorizedSession # type: ignore
20from google.auth import credentials as ga_credentials # type: ignore
21from google.api_core import exceptions as core_exceptions
22from google.api_core import retry as retries
23from google.api_core import rest_helpers
24from google.api_core import rest_streaming
25from google.api_core import gapic_v1
26import google.protobuf
27
28from google.protobuf import json_format
29
30from requests import __version__ as requests_version
31import dataclasses
32from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
33import warnings
34
35
36from google.cloud.errorreporting_v1beta1.types import common
37from google.cloud.errorreporting_v1beta1.types import error_group_service
38
39
40from .rest_base import _BaseErrorGroupServiceRestTransport
41from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO
42
# Type alias for the ``retry`` argument accepted by every RPC stub.  The
# ``_MethodDefault`` sentinel type is looked up defensively; if the attribute is
# absent the alias degrades to ``object`` (presumably for older google-api-core
# releases -- TODO confirm which versions lack it).
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

# Structured client logging is optional: the flag records whether
# ``google.api_core.client_logging`` could be imported.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Module-level logger used for the DEBUG request/response records.
_LOGGER = logging.getLogger(__name__)

# Client info for the REST transport: same gapic version as the base transport,
# no gRPC version, and the installed ``requests`` version as the REST version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
    grpc_version=None,
    rest_version=f"requests@{requests_version}",
)

# Only set the protobuf runtime version when ClientInfo exposes that attribute.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
65
66
class ErrorGroupServiceRestInterceptor:
    """Interceptor for ErrorGroupService.

    Interceptors are used to manipulate requests, request metadata, and responses
    in arbitrary ways.
    Example use cases include:

    * Logging
    * Verifying requests according to service or custom semantics
    * Stripping extraneous information from responses

    These use cases and more can be enabled by injecting an
    instance of a custom subclass when constructing the ErrorGroupServiceRestTransport.

    .. code-block:: python

        class MyCustomErrorGroupServiceInterceptor(ErrorGroupServiceRestInterceptor):
            def pre_get_group(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_get_group(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_update_group(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_update_group(self, response):
                logging.info(f"Received response: {response}")
                return response

        transport = ErrorGroupServiceRestTransport(interceptor=MyCustomErrorGroupServiceInterceptor())
        client = ErrorGroupServiceClient(transport=transport)

    Every hook defined here is a pass-through that returns its arguments
    unchanged; subclasses override only the hooks they need.
    """

    def pre_get_group(
        self,
        request: error_group_service.GetGroupRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        error_group_service.GetGroupRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for get_group

        Override in a subclass to manipulate the request or metadata
        before they are sent to the ErrorGroupService server.

        Args:
            request: The outgoing get_group request.
            metadata: Key/value pairs that accompany the request.

        Returns:
            The ``(request, metadata)`` pair to send; unchanged by default.
        """
        return request, metadata

    def post_get_group(self, response: common.ErrorGroup) -> common.ErrorGroup:
        """Post-rpc interceptor for get_group

        DEPRECATED. Please use the `post_get_group_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the ErrorGroupService server but before
        it is returned to user code. This `post_get_group` interceptor runs
        before the `post_get_group_with_metadata` interceptor.

        Args:
            response: The parsed get_group response.

        Returns:
            The response to hand on; unchanged by default.
        """
        return response

    def post_get_group_with_metadata(
        self,
        response: common.ErrorGroup,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.ErrorGroup, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for get_group

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the ErrorGroupService server but before it is returned to user code.

        We recommend only using this `post_get_group_with_metadata`
        interceptor in new development instead of the `post_get_group` interceptor.
        When both interceptors are used, this `post_get_group_with_metadata` interceptor runs after the
        `post_get_group` interceptor. The (possibly modified) response returned by
        `post_get_group` will be passed to
        `post_get_group_with_metadata`.

        Args:
            response: The response as returned by `post_get_group`.
            metadata: Key/value pairs describing the response.

        Returns:
            The ``(response, metadata)`` pair; unchanged by default.
        """
        return response, metadata

    def pre_update_group(
        self,
        request: error_group_service.UpdateGroupRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        error_group_service.UpdateGroupRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for update_group

        Override in a subclass to manipulate the request or metadata
        before they are sent to the ErrorGroupService server.

        Args:
            request: The outgoing update_group request.
            metadata: Key/value pairs that accompany the request.

        Returns:
            The ``(request, metadata)`` pair to send; unchanged by default.
        """
        return request, metadata

    def post_update_group(self, response: common.ErrorGroup) -> common.ErrorGroup:
        """Post-rpc interceptor for update_group

        DEPRECATED. Please use the `post_update_group_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the ErrorGroupService server but before
        it is returned to user code. This `post_update_group` interceptor runs
        before the `post_update_group_with_metadata` interceptor.

        Args:
            response: The parsed update_group response.

        Returns:
            The response to hand on; unchanged by default.
        """
        return response

    def post_update_group_with_metadata(
        self,
        response: common.ErrorGroup,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.ErrorGroup, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for update_group

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the ErrorGroupService server but before it is returned to user code.

        We recommend only using this `post_update_group_with_metadata`
        interceptor in new development instead of the `post_update_group` interceptor.
        When both interceptors are used, this `post_update_group_with_metadata` interceptor runs after the
        `post_update_group` interceptor. The (possibly modified) response returned by
        `post_update_group` will be passed to
        `post_update_group_with_metadata`.

        Args:
            response: The response as returned by `post_update_group`.
            metadata: Key/value pairs describing the response.

        Returns:
            The ``(response, metadata)`` pair; unchanged by default.
        """
        return response, metadata
195
196
@dataclasses.dataclass
class ErrorGroupServiceRestStub:
    """Plain record holding what a per-RPC REST stub needs to make a call.

    Fields are positional, in the order: session, host, interceptor.
    """

    # Session object used to issue the HTTP requests.
    _session: AuthorizedSession
    # Host string for the service endpoint.
    _host: str
    # Interceptor carrying the pre-/post-RPC hooks.
    _interceptor: ErrorGroupServiceRestInterceptor
202
203
class ErrorGroupServiceRestTransport(_BaseErrorGroupServiceRestTransport):
    """REST backend synchronous transport for ErrorGroupService.

    Service for retrieving and updating individual error groups.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1
    """

    def __init__(
        self,
        *,
        host: str = "clouderrorreporting.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        interceptor: Optional[ErrorGroupServiceRestInterceptor] = None,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'clouderrorreporting.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): Deprecated. A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This constructor does not forward it to the base transport
                (see the TODO in the body). This argument will be
                removed in the next major version of this library.
            scopes (Optional(Sequence[str])): A list of scopes. This constructor
                does not forward it to the base transport (see the TODO in the body).
            client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
                certificate to configure mutual TLS HTTP channel. When falsy,
                no mTLS channel is configured on the session.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota. This constructor does not forward it to the base
                transport (see the TODO in the body).
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you are developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            url_scheme: the protocol scheme for the API endpoint.  Normally
                "https", but for testing or local servers,
                "http" can be specified.
            interceptor (Optional[ErrorGroupServiceRestInterceptor]): Hooks run
                before and after each RPC. When omitted (or falsy), a default
                pass-through ``ErrorGroupServiceRestInterceptor`` is created.
            api_audience (Optional[str]): Passed through unchanged to the base
                transport constructor.
        """
        # Run the base constructor
        # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
        # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
        # credentials object
        super().__init__(
            host=host,
            credentials=credentials,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            url_scheme=url_scheme,
            api_audience=api_audience,
        )
        # The session is built from the credentials resolved by the base class.
        self._session = AuthorizedSession(
            self._credentials, default_host=self.DEFAULT_HOST
        )
        if client_cert_source_for_mtls:
            self._session.configure_mtls_channel(client_cert_source_for_mtls)
        self._interceptor = interceptor or ErrorGroupServiceRestInterceptor()
        self._prep_wrapped_messages(client_info)

    class _GetGroup(
        _BaseErrorGroupServiceRestTransport._BaseGetGroup, ErrorGroupServiceRestStub
    ):
        """Callable stub that performs the GetGroup RPC over HTTP."""

        def __hash__(self):
            # Fixed, per-RPC hash: all _GetGroup stubs hash alike.
            return hash("ErrorGroupServiceRestTransport.GetGroup")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Issue the HTTP request described by ``transcoded_request``.

            ``body`` is accepted but not sent by this stub. Returns whatever
            the session's HTTP-method callable returns.
            """
            uri = transcoded_request["uri"]
            method = transcoded_request["method"]
            headers = dict(metadata)
            headers["Content-Type"] = "application/json"
            # ``method`` names the session attribute to call (e.g. ``session.get``).
            response = getattr(session, method)(
                "{host}{uri}".format(host=host, uri=uri),
                timeout=timeout,
                headers=headers,
                params=rest_helpers.flatten_query_params(query_params, strict=True),
            )
            return response

        def __call__(
            self,
            request: error_group_service.GetGroupRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.ErrorGroup:
            r"""Call the get group method over HTTP.

            Args:
                request (~.error_group_service.GetGroupRequest):
                    The request object. A request to return an individual
                    group.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not read by this method body.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.ErrorGroup:
                    Description of a group of similar
                    error events.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: If the HTTP
                    response status code is 400 or greater.
            """
            base = _BaseErrorGroupServiceRestTransport._BaseGetGroup

            http_options = base._get_http_options()

            request, metadata = self._interceptor.pre_get_group(request, metadata)
            transcoded_request = base._get_transcoded_request(http_options, request)

            # Jsonify the query params
            query_params = base._get_query_params_json(transcoded_request)

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                request_url = "{host}{uri}".format(
                    host=self._host, uri=transcoded_request["uri"]
                )
                method = transcoded_request["method"]
                # Best effort: a request that cannot be serialized is logged
                # with a ``None`` payload rather than failing the call.
                try:
                    request_payload = type(request).to_json(request)
                except Exception:
                    request_payload = None
                http_request = {
                    "payload": request_payload,
                    "requestMethod": method,
                    "requestUrl": request_url,
                    "headers": dict(metadata),
                }
                _LOGGER.debug(
                    "Sending request for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.GetGroup",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
                        "rpcName": "GetGroup",
                        "httpRequest": http_request,
                        "metadata": http_request["headers"],
                    },
                )

            # Send the request
            response = ErrorGroupServiceRestTransport._GetGroup._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Return the response
            resp = common.ErrorGroup()
            pb_resp = common.ErrorGroup.pb(resp)

            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            resp = self._interceptor.post_get_group(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            resp, _ = self._interceptor.post_get_group_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                # Serialize the parsed message (``resp``), not the raw HTTP
                # response object, so the logged payload is actually populated.
                try:
                    response_payload = common.ErrorGroup.to_json(resp)
                except Exception:
                    response_payload = None
                http_response = {
                    "payload": response_payload,
                    "headers": dict(response.headers),
                    "status": response.status_code,
                }
                _LOGGER.debug(
                    "Received response for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.get_group",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
                        "rpcName": "GetGroup",
                        "metadata": http_response["headers"],
                        "httpResponse": http_response,
                    },
                )
            return resp

    class _UpdateGroup(
        _BaseErrorGroupServiceRestTransport._BaseUpdateGroup, ErrorGroupServiceRestStub
    ):
        """Callable stub that performs the UpdateGroup RPC over HTTP."""

        def __hash__(self):
            # Fixed, per-RPC hash: all _UpdateGroup stubs hash alike.
            return hash("ErrorGroupServiceRestTransport.UpdateGroup")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Issue the HTTP request described by ``transcoded_request``.

            ``body`` is sent as the request data. Returns whatever the
            session's HTTP-method callable returns.
            """
            uri = transcoded_request["uri"]
            method = transcoded_request["method"]
            headers = dict(metadata)
            headers["Content-Type"] = "application/json"
            # ``method`` names the session attribute to call (e.g. ``session.put``).
            response = getattr(session, method)(
                "{host}{uri}".format(host=host, uri=uri),
                timeout=timeout,
                headers=headers,
                params=rest_helpers.flatten_query_params(query_params, strict=True),
                data=body,
            )
            return response

        def __call__(
            self,
            request: error_group_service.UpdateGroupRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> common.ErrorGroup:
            r"""Call the update group method over HTTP.

            Args:
                request (~.error_group_service.UpdateGroupRequest):
                    The request object. A request to replace the existing
                    data for the given group.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not read by this method body.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.common.ErrorGroup:
                    Description of a group of similar
                    error events.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: If the HTTP
                    response status code is 400 or greater.
            """
            base = _BaseErrorGroupServiceRestTransport._BaseUpdateGroup

            http_options = base._get_http_options()

            request, metadata = self._interceptor.pre_update_group(request, metadata)
            transcoded_request = base._get_transcoded_request(http_options, request)

            body = base._get_request_body_json(transcoded_request)

            # Jsonify the query params
            query_params = base._get_query_params_json(transcoded_request)

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                request_url = "{host}{uri}".format(
                    host=self._host, uri=transcoded_request["uri"]
                )
                method = transcoded_request["method"]
                # Best effort: a request that cannot be serialized is logged
                # with a ``None`` payload rather than failing the call.
                try:
                    request_payload = type(request).to_json(request)
                except Exception:
                    request_payload = None
                http_request = {
                    "payload": request_payload,
                    "requestMethod": method,
                    "requestUrl": request_url,
                    "headers": dict(metadata),
                }
                _LOGGER.debug(
                    "Sending request for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.UpdateGroup",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
                        "rpcName": "UpdateGroup",
                        "httpRequest": http_request,
                        "metadata": http_request["headers"],
                    },
                )

            # Send the request
            response = ErrorGroupServiceRestTransport._UpdateGroup._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Return the response
            resp = common.ErrorGroup()
            pb_resp = common.ErrorGroup.pb(resp)

            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            resp = self._interceptor.post_update_group(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            resp, _ = self._interceptor.post_update_group_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                # Serialize the parsed message (``resp``), not the raw HTTP
                # response object, so the logged payload is actually populated.
                try:
                    response_payload = common.ErrorGroup.to_json(resp)
                except Exception:
                    response_payload = None
                http_response = {
                    "payload": response_payload,
                    "headers": dict(response.headers),
                    "status": response.status_code,
                }
                _LOGGER.debug(
                    "Received response for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.update_group",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
                        "rpcName": "UpdateGroup",
                        "metadata": http_response["headers"],
                        "httpResponse": http_response,
                    },
                )
            return resp

    @property
    def get_group(
        self,
    ) -> Callable[[error_group_service.GetGroupRequest], common.ErrorGroup]:
        """Return a new ``_GetGroup`` stub bound to this transport's session, host and interceptor."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._GetGroup(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def update_group(
        self,
    ) -> Callable[[error_group_service.UpdateGroupRequest], common.ErrorGroup]:
        """Return a new ``_UpdateGroup`` stub bound to this transport's session, host and interceptor."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._UpdateGroup(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def kind(self) -> str:
        """Return the transport kind identifier, ``"rest"``."""
        return "rest"

    def close(self):
        """Close the underlying HTTP session."""
        self._session.close()
606
607
# Public API of this module: only the transport class is exported.
__all__ = (
    "ErrorGroupServiceRestTransport",
)