1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import logging
17import json # type: ignore
18
19from google.auth.transport.requests import AuthorizedSession # type: ignore
20from google.auth import credentials as ga_credentials # type: ignore
21from google.api_core import exceptions as core_exceptions
22from google.api_core import retry as retries
23from google.api_core import rest_helpers
24from google.api_core import rest_streaming
25from google.api_core import gapic_v1
26import google.protobuf
27
28from google.protobuf import json_format
29
30from requests import __version__ as requests_version
31import dataclasses
32from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
33import warnings
34
35
36from google.cloud.errorreporting_v1beta1.types import report_errors_service
37
38
39from .rest_base import _BaseReportErrorsServiceRestTransport
40from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO
41
# Module-level logger used for the optional DEBUG request/response tracing.
_LOGGER = logging.getLogger(__name__)

# Annotation alias for the ``retry`` argument of the RPC callables. The
# fallback covers api-core builds that do not expose ``_MethodDefault``.
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

# Feature probe: structured client logging is only enabled when the
# ``client_logging`` module can be imported from google.api_core.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Client info for the REST flavour: same gapic version as the base transport,
# no gRPC version, and the installed ``requests`` version as the REST version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    rest_version=f"requests@{requests_version}",
    grpc_version=None,
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
)

# Only set the protobuf runtime version when ClientInfo has that attribute.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
64
65
class ReportErrorsServiceRestInterceptor:
    """Interceptor for ReportErrorsService.

    Interceptors are used to manipulate requests, request metadata, and responses
    in arbitrary ways.
    Example use cases include:

    * Logging
    * Verifying requests according to service or custom semantics
    * Stripping extraneous information from responses

    These use cases and more can be enabled by injecting an
    instance of a custom subclass when constructing the ReportErrorsServiceRestTransport.

    .. code-block:: python

        class MyCustomReportErrorsServiceInterceptor(ReportErrorsServiceRestInterceptor):
            def pre_report_error_event(self, request, metadata):
                logging.info("Received request: %s", request)
                return request, metadata

            def post_report_error_event(self, response):
                logging.info("Received response: %s", response)
                return response

        transport = ReportErrorsServiceRestTransport(interceptor=MyCustomReportErrorsServiceInterceptor())
        client = ReportErrorsServiceClient(transport=transport)

    Every hook in this base class is a pass-through that returns its
    arguments unchanged.
    """

    def pre_report_error_event(
        self,
        request: report_errors_service.ReportErrorEventRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        report_errors_service.ReportErrorEventRequest,
        Sequence[Tuple[str, Union[str, bytes]]],
    ]:
        """Pre-rpc interceptor for report_error_event

        Override in a subclass to manipulate the request or metadata
        before they are sent to the ReportErrorsService server.

        Args:
            request: The request about to be transcoded and sent.
            metadata: Key/value pairs that accompany the request.

        Returns:
            The ``(request, metadata)`` pair the transport should use; this
            default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_report_error_event(
        self, response: report_errors_service.ReportErrorEventResponse
    ) -> report_errors_service.ReportErrorEventResponse:
        """Post-rpc interceptor for report_error_event

        DEPRECATED. Please use the `post_report_error_event_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the ReportErrorsService server but before
        it is returned to user code. This `post_report_error_event` interceptor runs
        before the `post_report_error_event_with_metadata` interceptor.

        Args:
            response: The parsed response message.

        Returns:
            The response to hand on; this default implementation returns the
            argument unchanged.
        """
        return response

    def post_report_error_event_with_metadata(
        self,
        response: report_errors_service.ReportErrorEventResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        report_errors_service.ReportErrorEventResponse,
        Sequence[Tuple[str, Union[str, bytes]]],
    ]:
        """Post-rpc interceptor for report_error_event

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the ReportErrorsService server but before it is returned to user code.

        We recommend only using this `post_report_error_event_with_metadata`
        interceptor in new development instead of the `post_report_error_event` interceptor.
        When both interceptors are used, this `post_report_error_event_with_metadata` interceptor runs after the
        `post_report_error_event` interceptor. The (possibly modified) response returned by
        `post_report_error_event` will be passed to
        `post_report_error_event_with_metadata`.

        Args:
            response: The response as returned by `post_report_error_event`.
            metadata: ``(name, value)`` pairs built by the transport from the
                HTTP response headers.

        Returns:
            The ``(response, metadata)`` pair; this default implementation
            returns both arguments unchanged. Note that the REST transport in
            this module keeps only the response element of the returned pair.
        """
        return response, metadata
146
147
# Plain state holder shared by the per-RPC callables of the REST transport.
# Field order is significant: the generated __init__ takes the values
# positionally as (session, host, interceptor).
@dataclasses.dataclass
class ReportErrorsServiceRestStub:
    # Session object used to issue the HTTP calls.
    _session: AuthorizedSession
    # Host prefix that is concatenated with each request URI.
    _host: str
    # Hooks invoked around each RPC.
    _interceptor: ReportErrorsServiceRestInterceptor
153
154
class ReportErrorsServiceRestTransport(_BaseReportErrorsServiceRestTransport):
    """REST backend synchronous transport for ReportErrorsService.

    An API for reporting error events.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1
    """

    def __init__(
        self,
        *,
        host: str = "clouderrorreporting.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        interceptor: Optional[ReportErrorsServiceRestInterceptor] = None,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'clouderrorreporting.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                Accepted by this constructor but not forwarded to the base
                transport constructor.
            scopes (Optional(Sequence[str])): A list of scopes. Accepted by
                this constructor but not forwarded to the base transport
                constructor.
            client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
                certificate source used to configure mutual TLS on the HTTP
                session. When falsy, mTLS is not configured.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota. Accepted by this constructor but not forwarded to
                the base transport constructor.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. Defaults to the module-level
                ``DEFAULT_CLIENT_INFO``. Generally, you only need to set this
                if you are developing your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            url_scheme: the protocol scheme for the API endpoint.  Normally
                "https", but for testing or local servers,
                "http" can be specified.
            interceptor (Optional[ReportErrorsServiceRestInterceptor]): Hooks
                invoked around each RPC. When omitted (or falsy), a default
                pass-through ``ReportErrorsServiceRestInterceptor`` is used.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport constructor.
        """
        # Run the base constructor
        # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
        # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
        # credentials object
        super().__init__(
            host=host,
            credentials=credentials,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            url_scheme=url_scheme,
            api_audience=api_audience,
        )
        self._session = AuthorizedSession(
            self._credentials, default_host=self.DEFAULT_HOST
        )
        if client_cert_source_for_mtls:
            self._session.configure_mtls_channel(client_cert_source_for_mtls)
        self._interceptor = interceptor or ReportErrorsServiceRestInterceptor()
        self._prep_wrapped_messages(client_info)

    class _ReportErrorEvent(
        _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent,
        ReportErrorsServiceRestStub,
    ):
        """Callable that performs the ReportErrorEvent RPC over HTTP."""

        def __hash__(self):
            # Hash on a fixed string: every instance of this callable hashes
            # the same. The dataclass stub base does not supply a usable
            # __hash__ (default eq=True), so one is defined explicitly.
            return hash("ReportErrorsServiceRestTransport.ReportErrorEvent")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Issue the HTTP call described by ``transcoded_request``.

            Args:
                host: Prefix that is concatenated with the transcoded URI.
                metadata: Iterable of ``(name, value)`` pairs sent as headers.
                query_params: Query parameters; flattened before sending.
                session: Object exposing one method per HTTP verb
                    (looked up by the transcoded ``method`` name).
                timeout: Timeout forwarded to the session call.
                transcoded_request: Mapping with at least ``"uri"`` and
                    ``"method"`` keys.
                body: Optional request payload sent as ``data``.

            Returns:
                Whatever the session's verb method returns.
            """
            uri = transcoded_request["uri"]
            method = transcoded_request["method"]
            headers = dict(metadata)
            # Always overrides any caller-supplied Content-Type header.
            headers["Content-Type"] = "application/json"
            response = getattr(session, method)(
                "{host}{uri}".format(host=host, uri=uri),
                timeout=timeout,
                headers=headers,
                params=rest_helpers.flatten_query_params(query_params, strict=True),
                data=body,
            )
            return response

        def __call__(
            self,
            request: report_errors_service.ReportErrorEventRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> report_errors_service.ReportErrorEventResponse:
            r"""Call the report error event method over HTTP.

            Args:
                request (~.report_errors_service.ReportErrorEventRequest):
                    The request object. A request for reporting an individual
                    error event.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not referenced inside this method body.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.report_errors_service.ReportErrorEventResponse:
                    Response for reporting an individual
                    error event. Data may be added to this
                    message in the future.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: If the HTTP
                    status code of the response is 400 or greater.
            """

            http_options = (
                _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_http_options()
            )

            # Give the interceptor a chance to rewrite the request/metadata
            # before anything is derived from them.
            request, metadata = self._interceptor.pre_report_error_event(
                request, metadata
            )
            transcoded_request = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_transcoded_request(
                http_options, request
            )

            body = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_request_body_json(
                transcoded_request
            )

            # Jsonify the query params
            query_params = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_query_params_json(
                transcoded_request
            )

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                request_url = "{host}{uri}".format(
                    host=self._host, uri=transcoded_request["uri"]
                )
                method = transcoded_request["method"]
                # Serialising the payload is best-effort: a failure must not
                # break the RPC, but interpreter-exit signals are not swallowed.
                try:
                    request_payload = type(request).to_json(request)
                except Exception:
                    request_payload = None
                http_request = {
                    "payload": request_payload,
                    "requestMethod": method,
                    "requestUrl": request_url,
                    "headers": dict(metadata),
                }
                _LOGGER.debug(
                    "Sending request for google.devtools.clouderrorreporting_v1beta1.ReportErrorsServiceClient.ReportErrorEvent",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ReportErrorsService",
                        "rpcName": "ReportErrorEvent",
                        "httpRequest": http_request,
                        "metadata": http_request["headers"],
                    },
                )

            # Send the request
            response = ReportErrorsServiceRestTransport._ReportErrorEvent._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Return the response
            resp = report_errors_service.ReportErrorEventResponse()
            pb_resp = report_errors_service.ReportErrorEventResponse.pb(resp)

            # Parse the JSON body into the protobuf obtained from ``resp``.
            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            # Deprecated hook first, then the metadata-aware hook; the
            # metadata returned by the latter is intentionally not used here.
            resp = self._interceptor.post_report_error_event(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            resp, _ = self._interceptor.post_report_error_event_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                # Serialise the parsed message (``resp``), not the raw HTTP
                # response object, so the logged payload is actually populated.
                try:
                    response_payload = (
                        report_errors_service.ReportErrorEventResponse.to_json(resp)
                    )
                except Exception:
                    response_payload = None
                http_response = {
                    "payload": response_payload,
                    "headers": dict(response.headers),
                    "status": response.status_code,
                }
                _LOGGER.debug(
                    "Received response for google.devtools.clouderrorreporting_v1beta1.ReportErrorsServiceClient.report_error_event",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ReportErrorsService",
                        "rpcName": "ReportErrorEvent",
                        "metadata": http_response["headers"],
                        "httpResponse": http_response,
                    },
                )
            return resp

    @property
    def report_error_event(
        self,
    ) -> Callable[
        [report_errors_service.ReportErrorEventRequest],
        report_errors_service.ReportErrorEventResponse,
    ]:
        """Return a new callable for the ReportErrorEvent RPC.

        A fresh ``_ReportErrorEvent`` is built on every access, bound to this
        transport's session, host and interceptor.
        """
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._ReportErrorEvent(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def kind(self) -> str:
        """Return the transport kind identifier, always ``"rest"``."""
        return "rest"

    def close(self):
        """Close the underlying HTTP session."""
        self._session.close()
410
411
# Public API of this module: only the concrete REST transport is exported.
__all__ = (
    "ReportErrorsServiceRestTransport",
)