1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import logging
17import json # type: ignore
18
19from google.auth.transport.requests import AuthorizedSession # type: ignore
20from google.auth import credentials as ga_credentials # type: ignore
21from google.api_core import exceptions as core_exceptions
22from google.api_core import retry as retries
23from google.api_core import rest_helpers
24from google.api_core import rest_streaming
25from google.api_core import gapic_v1
26import google.protobuf
27
28from google.protobuf import json_format
29
30from requests import __version__ as requests_version
31import dataclasses
32from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
33import warnings
34
35
36from google.cloud.errorreporting_v1beta1.types import common
37from google.cloud.errorreporting_v1beta1.types import error_group_service
38
39
40from .rest_base import _BaseErrorGroupServiceRestTransport
41from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO
42
43try:
44 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
45except AttributeError: # pragma: NO COVER
46 OptionalRetry = Union[retries.Retry, object, None] # type: ignore
47
48try:
49 from google.api_core import client_logging # type: ignore
50
51 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
52except ImportError: # pragma: NO COVER
53 CLIENT_LOGGING_SUPPORTED = False
54
55_LOGGER = logging.getLogger(__name__)
56
57DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
58 gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
59 grpc_version=None,
60 rest_version=f"requests@{requests_version}",
61)
62
63if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER
64 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
65
66
67class ErrorGroupServiceRestInterceptor:
68 """Interceptor for ErrorGroupService.
69
70 Interceptors are used to manipulate requests, request metadata, and responses
71 in arbitrary ways.
72 Example use cases include:
73 * Logging
74 * Verifying requests according to service or custom semantics
75 * Stripping extraneous information from responses
76
77 These use cases and more can be enabled by injecting an
78 instance of a custom subclass when constructing the ErrorGroupServiceRestTransport.
79
80 .. code-block:: python
81 class MyCustomErrorGroupServiceInterceptor(ErrorGroupServiceRestInterceptor):
82 def pre_get_group(self, request, metadata):
83 logging.log(f"Received request: {request}")
84 return request, metadata
85
86 def post_get_group(self, response):
87 logging.log(f"Received response: {response}")
88 return response
89
90 def pre_update_group(self, request, metadata):
91 logging.log(f"Received request: {request}")
92 return request, metadata
93
94 def post_update_group(self, response):
95 logging.log(f"Received response: {response}")
96 return response
97
98 transport = ErrorGroupServiceRestTransport(interceptor=MyCustomErrorGroupServiceInterceptor())
99 client = ErrorGroupServiceClient(transport=transport)
100
101
102 """
103
104 def pre_get_group(
105 self,
106 request: error_group_service.GetGroupRequest,
107 metadata: Sequence[Tuple[str, Union[str, bytes]]],
108 ) -> Tuple[
109 error_group_service.GetGroupRequest, Sequence[Tuple[str, Union[str, bytes]]]
110 ]:
111 """Pre-rpc interceptor for get_group
112
113 Override in a subclass to manipulate the request or metadata
114 before they are sent to the ErrorGroupService server.
115 """
116 return request, metadata
117
118 def post_get_group(self, response: common.ErrorGroup) -> common.ErrorGroup:
119 """Post-rpc interceptor for get_group
120
121 DEPRECATED. Please use the `post_get_group_with_metadata`
122 interceptor instead.
123
124 Override in a subclass to read or manipulate the response
125 after it is returned by the ErrorGroupService server but before
126 it is returned to user code. This `post_get_group` interceptor runs
127 before the `post_get_group_with_metadata` interceptor.
128 """
129 return response
130
131 def post_get_group_with_metadata(
132 self,
133 response: common.ErrorGroup,
134 metadata: Sequence[Tuple[str, Union[str, bytes]]],
135 ) -> Tuple[common.ErrorGroup, Sequence[Tuple[str, Union[str, bytes]]]]:
136 """Post-rpc interceptor for get_group
137
138 Override in a subclass to read or manipulate the response or metadata after it
139 is returned by the ErrorGroupService server but before it is returned to user code.
140
141 We recommend only using this `post_get_group_with_metadata`
142 interceptor in new development instead of the `post_get_group` interceptor.
143 When both interceptors are used, this `post_get_group_with_metadata` interceptor runs after the
144 `post_get_group` interceptor. The (possibly modified) response returned by
145 `post_get_group` will be passed to
146 `post_get_group_with_metadata`.
147 """
148 return response, metadata
149
150 def pre_update_group(
151 self,
152 request: error_group_service.UpdateGroupRequest,
153 metadata: Sequence[Tuple[str, Union[str, bytes]]],
154 ) -> Tuple[
155 error_group_service.UpdateGroupRequest, Sequence[Tuple[str, Union[str, bytes]]]
156 ]:
157 """Pre-rpc interceptor for update_group
158
159 Override in a subclass to manipulate the request or metadata
160 before they are sent to the ErrorGroupService server.
161 """
162 return request, metadata
163
164 def post_update_group(self, response: common.ErrorGroup) -> common.ErrorGroup:
165 """Post-rpc interceptor for update_group
166
167 DEPRECATED. Please use the `post_update_group_with_metadata`
168 interceptor instead.
169
170 Override in a subclass to read or manipulate the response
171 after it is returned by the ErrorGroupService server but before
172 it is returned to user code. This `post_update_group` interceptor runs
173 before the `post_update_group_with_metadata` interceptor.
174 """
175 return response
176
177 def post_update_group_with_metadata(
178 self,
179 response: common.ErrorGroup,
180 metadata: Sequence[Tuple[str, Union[str, bytes]]],
181 ) -> Tuple[common.ErrorGroup, Sequence[Tuple[str, Union[str, bytes]]]]:
182 """Post-rpc interceptor for update_group
183
184 Override in a subclass to read or manipulate the response or metadata after it
185 is returned by the ErrorGroupService server but before it is returned to user code.
186
187 We recommend only using this `post_update_group_with_metadata`
188 interceptor in new development instead of the `post_update_group` interceptor.
189 When both interceptors are used, this `post_update_group_with_metadata` interceptor runs after the
190 `post_update_group` interceptor. The (possibly modified) response returned by
191 `post_update_group` will be passed to
192 `post_update_group_with_metadata`.
193 """
194 return response, metadata
195
196
197@dataclasses.dataclass
198class ErrorGroupServiceRestStub:
199 _session: AuthorizedSession
200 _host: str
201 _interceptor: ErrorGroupServiceRestInterceptor
202
203
204class ErrorGroupServiceRestTransport(_BaseErrorGroupServiceRestTransport):
205 """REST backend synchronous transport for ErrorGroupService.
206
207 Service for retrieving and updating individual error groups.
208
209 This class defines the same methods as the primary client, so the
210 primary client can load the underlying transport implementation
211 and call it.
212
213 It sends JSON representations of protocol buffers over HTTP/1.1
214 """
215
216 def __init__(
217 self,
218 *,
219 host: str = "clouderrorreporting.googleapis.com",
220 credentials: Optional[ga_credentials.Credentials] = None,
221 credentials_file: Optional[str] = None,
222 scopes: Optional[Sequence[str]] = None,
223 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
224 quota_project_id: Optional[str] = None,
225 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
226 always_use_jwt_access: Optional[bool] = False,
227 url_scheme: str = "https",
228 interceptor: Optional[ErrorGroupServiceRestInterceptor] = None,
229 api_audience: Optional[str] = None,
230 ) -> None:
231 """Instantiate the transport.
232
233 Args:
234 host (Optional[str]):
235 The hostname to connect to (default: 'clouderrorreporting.googleapis.com').
236 credentials (Optional[google.auth.credentials.Credentials]): The
237 authorization credentials to attach to requests. These
238 credentials identify the application to the service; if none
239 are specified, the client will attempt to ascertain the
240 credentials from the environment.
241
242 credentials_file (Optional[str]): A file with credentials that can
243 be loaded with :func:`google.auth.load_credentials_from_file`.
244 This argument is ignored if ``channel`` is provided.
245 scopes (Optional(Sequence[str])): A list of scopes. This argument is
246 ignored if ``channel`` is provided.
247 client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
248 certificate to configure mutual TLS HTTP channel. It is ignored
249 if ``channel`` is provided.
250 quota_project_id (Optional[str]): An optional project to use for billing
251 and quota.
252 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
253 The client info used to send a user-agent string along with
254 API requests. If ``None``, then default info will be used.
255 Generally, you only need to set this if you are developing
256 your own client library.
257 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
258 be used for service account credentials.
259 url_scheme: the protocol scheme for the API endpoint. Normally
260 "https", but for testing or local servers,
261 "http" can be specified.
262 """
263 # Run the base constructor
264 # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
265 # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
266 # credentials object
267 super().__init__(
268 host=host,
269 credentials=credentials,
270 client_info=client_info,
271 always_use_jwt_access=always_use_jwt_access,
272 url_scheme=url_scheme,
273 api_audience=api_audience,
274 )
275 self._session = AuthorizedSession(
276 self._credentials, default_host=self.DEFAULT_HOST
277 )
278 if client_cert_source_for_mtls:
279 self._session.configure_mtls_channel(client_cert_source_for_mtls)
280 self._interceptor = interceptor or ErrorGroupServiceRestInterceptor()
281 self._prep_wrapped_messages(client_info)
282
283 class _GetGroup(
284 _BaseErrorGroupServiceRestTransport._BaseGetGroup, ErrorGroupServiceRestStub
285 ):
286 def __hash__(self):
287 return hash("ErrorGroupServiceRestTransport.GetGroup")
288
289 @staticmethod
290 def _get_response(
291 host,
292 metadata,
293 query_params,
294 session,
295 timeout,
296 transcoded_request,
297 body=None,
298 ):
299 uri = transcoded_request["uri"]
300 method = transcoded_request["method"]
301 headers = dict(metadata)
302 headers["Content-Type"] = "application/json"
303 response = getattr(session, method)(
304 "{host}{uri}".format(host=host, uri=uri),
305 timeout=timeout,
306 headers=headers,
307 params=rest_helpers.flatten_query_params(query_params, strict=True),
308 )
309 return response
310
311 def __call__(
312 self,
313 request: error_group_service.GetGroupRequest,
314 *,
315 retry: OptionalRetry = gapic_v1.method.DEFAULT,
316 timeout: Optional[float] = None,
317 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
318 ) -> common.ErrorGroup:
319 r"""Call the get group method over HTTP.
320
321 Args:
322 request (~.error_group_service.GetGroupRequest):
323 The request object. A request to return an individual
324 group.
325 retry (google.api_core.retry.Retry): Designation of what errors, if any,
326 should be retried.
327 timeout (float): The timeout for this request.
328 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
329 sent along with the request as metadata. Normally, each value must be of type `str`,
330 but for metadata keys ending with the suffix `-bin`, the corresponding values must
331 be of type `bytes`.
332
333 Returns:
334 ~.common.ErrorGroup:
335 Description of a group of similar
336 error events.
337
338 """
339
340 http_options = (
341 _BaseErrorGroupServiceRestTransport._BaseGetGroup._get_http_options()
342 )
343
344 request, metadata = self._interceptor.pre_get_group(request, metadata)
345 transcoded_request = _BaseErrorGroupServiceRestTransport._BaseGetGroup._get_transcoded_request(
346 http_options, request
347 )
348
349 # Jsonify the query params
350 query_params = _BaseErrorGroupServiceRestTransport._BaseGetGroup._get_query_params_json(
351 transcoded_request
352 )
353
354 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
355 logging.DEBUG
356 ): # pragma: NO COVER
357 request_url = "{host}{uri}".format(
358 host=self._host, uri=transcoded_request["uri"]
359 )
360 method = transcoded_request["method"]
361 try:
362 request_payload = type(request).to_json(request)
363 except:
364 request_payload = None
365 http_request = {
366 "payload": request_payload,
367 "requestMethod": method,
368 "requestUrl": request_url,
369 "headers": dict(metadata),
370 }
371 _LOGGER.debug(
372 f"Sending request for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.GetGroup",
373 extra={
374 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
375 "rpcName": "GetGroup",
376 "httpRequest": http_request,
377 "metadata": http_request["headers"],
378 },
379 )
380
381 # Send the request
382 response = ErrorGroupServiceRestTransport._GetGroup._get_response(
383 self._host,
384 metadata,
385 query_params,
386 self._session,
387 timeout,
388 transcoded_request,
389 )
390
391 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
392 # subclass.
393 if response.status_code >= 400:
394 raise core_exceptions.from_http_response(response)
395
396 # Return the response
397 resp = common.ErrorGroup()
398 pb_resp = common.ErrorGroup.pb(resp)
399
400 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)
401
402 resp = self._interceptor.post_get_group(resp)
403 response_metadata = [(k, str(v)) for k, v in response.headers.items()]
404 resp, _ = self._interceptor.post_get_group_with_metadata(
405 resp, response_metadata
406 )
407 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
408 logging.DEBUG
409 ): # pragma: NO COVER
410 try:
411 response_payload = common.ErrorGroup.to_json(response)
412 except:
413 response_payload = None
414 http_response = {
415 "payload": response_payload,
416 "headers": dict(response.headers),
417 "status": response.status_code,
418 }
419 _LOGGER.debug(
420 "Received response for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.get_group",
421 extra={
422 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
423 "rpcName": "GetGroup",
424 "metadata": http_response["headers"],
425 "httpResponse": http_response,
426 },
427 )
428 return resp
429
430 class _UpdateGroup(
431 _BaseErrorGroupServiceRestTransport._BaseUpdateGroup, ErrorGroupServiceRestStub
432 ):
433 def __hash__(self):
434 return hash("ErrorGroupServiceRestTransport.UpdateGroup")
435
436 @staticmethod
437 def _get_response(
438 host,
439 metadata,
440 query_params,
441 session,
442 timeout,
443 transcoded_request,
444 body=None,
445 ):
446 uri = transcoded_request["uri"]
447 method = transcoded_request["method"]
448 headers = dict(metadata)
449 headers["Content-Type"] = "application/json"
450 response = getattr(session, method)(
451 "{host}{uri}".format(host=host, uri=uri),
452 timeout=timeout,
453 headers=headers,
454 params=rest_helpers.flatten_query_params(query_params, strict=True),
455 data=body,
456 )
457 return response
458
459 def __call__(
460 self,
461 request: error_group_service.UpdateGroupRequest,
462 *,
463 retry: OptionalRetry = gapic_v1.method.DEFAULT,
464 timeout: Optional[float] = None,
465 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
466 ) -> common.ErrorGroup:
467 r"""Call the update group method over HTTP.
468
469 Args:
470 request (~.error_group_service.UpdateGroupRequest):
471 The request object. A request to replace the existing
472 data for the given group.
473 retry (google.api_core.retry.Retry): Designation of what errors, if any,
474 should be retried.
475 timeout (float): The timeout for this request.
476 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
477 sent along with the request as metadata. Normally, each value must be of type `str`,
478 but for metadata keys ending with the suffix `-bin`, the corresponding values must
479 be of type `bytes`.
480
481 Returns:
482 ~.common.ErrorGroup:
483 Description of a group of similar
484 error events.
485
486 """
487
488 http_options = (
489 _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_http_options()
490 )
491
492 request, metadata = self._interceptor.pre_update_group(request, metadata)
493 transcoded_request = _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_transcoded_request(
494 http_options, request
495 )
496
497 body = _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_request_body_json(
498 transcoded_request
499 )
500
501 # Jsonify the query params
502 query_params = _BaseErrorGroupServiceRestTransport._BaseUpdateGroup._get_query_params_json(
503 transcoded_request
504 )
505
506 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
507 logging.DEBUG
508 ): # pragma: NO COVER
509 request_url = "{host}{uri}".format(
510 host=self._host, uri=transcoded_request["uri"]
511 )
512 method = transcoded_request["method"]
513 try:
514 request_payload = type(request).to_json(request)
515 except:
516 request_payload = None
517 http_request = {
518 "payload": request_payload,
519 "requestMethod": method,
520 "requestUrl": request_url,
521 "headers": dict(metadata),
522 }
523 _LOGGER.debug(
524 f"Sending request for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.UpdateGroup",
525 extra={
526 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
527 "rpcName": "UpdateGroup",
528 "httpRequest": http_request,
529 "metadata": http_request["headers"],
530 },
531 )
532
533 # Send the request
534 response = ErrorGroupServiceRestTransport._UpdateGroup._get_response(
535 self._host,
536 metadata,
537 query_params,
538 self._session,
539 timeout,
540 transcoded_request,
541 body,
542 )
543
544 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
545 # subclass.
546 if response.status_code >= 400:
547 raise core_exceptions.from_http_response(response)
548
549 # Return the response
550 resp = common.ErrorGroup()
551 pb_resp = common.ErrorGroup.pb(resp)
552
553 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)
554
555 resp = self._interceptor.post_update_group(resp)
556 response_metadata = [(k, str(v)) for k, v in response.headers.items()]
557 resp, _ = self._interceptor.post_update_group_with_metadata(
558 resp, response_metadata
559 )
560 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
561 logging.DEBUG
562 ): # pragma: NO COVER
563 try:
564 response_payload = common.ErrorGroup.to_json(response)
565 except:
566 response_payload = None
567 http_response = {
568 "payload": response_payload,
569 "headers": dict(response.headers),
570 "status": response.status_code,
571 }
572 _LOGGER.debug(
573 "Received response for google.devtools.clouderrorreporting_v1beta1.ErrorGroupServiceClient.update_group",
574 extra={
575 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ErrorGroupService",
576 "rpcName": "UpdateGroup",
577 "metadata": http_response["headers"],
578 "httpResponse": http_response,
579 },
580 )
581 return resp
582
583 @property
584 def get_group(
585 self,
586 ) -> Callable[[error_group_service.GetGroupRequest], common.ErrorGroup]:
587 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
588 # In C++ this would require a dynamic_cast
589 return self._GetGroup(self._session, self._host, self._interceptor) # type: ignore
590
591 @property
592 def update_group(
593 self,
594 ) -> Callable[[error_group_service.UpdateGroupRequest], common.ErrorGroup]:
595 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
596 # In C++ this would require a dynamic_cast
597 return self._UpdateGroup(self._session, self._host, self._interceptor) # type: ignore
598
599 @property
600 def kind(self) -> str:
601 return "rest"
602
603 def close(self):
604 self._session.close()
605
606
607__all__ = ("ErrorGroupServiceRestTransport",)