1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import logging
17import json # type: ignore
18
19from google.auth.transport.requests import AuthorizedSession # type: ignore
20from google.auth import credentials as ga_credentials # type: ignore
21from google.api_core import exceptions as core_exceptions
22from google.api_core import retry as retries
23from google.api_core import rest_helpers
24from google.api_core import rest_streaming
25from google.api_core import gapic_v1
26import google.protobuf
27
28from google.protobuf import json_format
29
30from requests import __version__ as requests_version
31import dataclasses
32from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
33import warnings
34
35
36from google.cloud.errorreporting_v1beta1.types import report_errors_service
37
38
39from .rest_base import _BaseReportErrorsServiceRestTransport
40from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO
41
42try:
43 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
44except AttributeError: # pragma: NO COVER
45 OptionalRetry = Union[retries.Retry, object, None] # type: ignore
46
47try:
48 from google.api_core import client_logging # type: ignore
49
50 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
51except ImportError: # pragma: NO COVER
52 CLIENT_LOGGING_SUPPORTED = False
53
54_LOGGER = logging.getLogger(__name__)
55
56DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
57 gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
58 grpc_version=None,
59 rest_version=f"requests@{requests_version}",
60)
61
62if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER
63 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
64
65
66class ReportErrorsServiceRestInterceptor:
67 """Interceptor for ReportErrorsService.
68
69 Interceptors are used to manipulate requests, request metadata, and responses
70 in arbitrary ways.
71 Example use cases include:
72 * Logging
73 * Verifying requests according to service or custom semantics
74 * Stripping extraneous information from responses
75
76 These use cases and more can be enabled by injecting an
77 instance of a custom subclass when constructing the ReportErrorsServiceRestTransport.
78
79 .. code-block:: python
80 class MyCustomReportErrorsServiceInterceptor(ReportErrorsServiceRestInterceptor):
81 def pre_report_error_event(self, request, metadata):
82 logging.log(f"Received request: {request}")
83 return request, metadata
84
85 def post_report_error_event(self, response):
86 logging.log(f"Received response: {response}")
87 return response
88
89 transport = ReportErrorsServiceRestTransport(interceptor=MyCustomReportErrorsServiceInterceptor())
90 client = ReportErrorsServiceClient(transport=transport)
91
92
93 """
94
95 def pre_report_error_event(
96 self,
97 request: report_errors_service.ReportErrorEventRequest,
98 metadata: Sequence[Tuple[str, Union[str, bytes]]],
99 ) -> Tuple[
100 report_errors_service.ReportErrorEventRequest,
101 Sequence[Tuple[str, Union[str, bytes]]],
102 ]:
103 """Pre-rpc interceptor for report_error_event
104
105 Override in a subclass to manipulate the request or metadata
106 before they are sent to the ReportErrorsService server.
107 """
108 return request, metadata
109
110 def post_report_error_event(
111 self, response: report_errors_service.ReportErrorEventResponse
112 ) -> report_errors_service.ReportErrorEventResponse:
113 """Post-rpc interceptor for report_error_event
114
115 DEPRECATED. Please use the `post_report_error_event_with_metadata`
116 interceptor instead.
117
118 Override in a subclass to read or manipulate the response
119 after it is returned by the ReportErrorsService server but before
120 it is returned to user code. This `post_report_error_event` interceptor runs
121 before the `post_report_error_event_with_metadata` interceptor.
122 """
123 return response
124
125 def post_report_error_event_with_metadata(
126 self,
127 response: report_errors_service.ReportErrorEventResponse,
128 metadata: Sequence[Tuple[str, Union[str, bytes]]],
129 ) -> Tuple[
130 report_errors_service.ReportErrorEventResponse,
131 Sequence[Tuple[str, Union[str, bytes]]],
132 ]:
133 """Post-rpc interceptor for report_error_event
134
135 Override in a subclass to read or manipulate the response or metadata after it
136 is returned by the ReportErrorsService server but before it is returned to user code.
137
138 We recommend only using this `post_report_error_event_with_metadata`
139 interceptor in new development instead of the `post_report_error_event` interceptor.
140 When both interceptors are used, this `post_report_error_event_with_metadata` interceptor runs after the
141 `post_report_error_event` interceptor. The (possibly modified) response returned by
142 `post_report_error_event` will be passed to
143 `post_report_error_event_with_metadata`.
144 """
145 return response, metadata
146
147
148@dataclasses.dataclass
149class ReportErrorsServiceRestStub:
150 _session: AuthorizedSession
151 _host: str
152 _interceptor: ReportErrorsServiceRestInterceptor
153
154
155class ReportErrorsServiceRestTransport(_BaseReportErrorsServiceRestTransport):
156 """REST backend synchronous transport for ReportErrorsService.
157
158 An API for reporting error events.
159
160 This class defines the same methods as the primary client, so the
161 primary client can load the underlying transport implementation
162 and call it.
163
164 It sends JSON representations of protocol buffers over HTTP/1.1
165 """
166
167 def __init__(
168 self,
169 *,
170 host: str = "clouderrorreporting.googleapis.com",
171 credentials: Optional[ga_credentials.Credentials] = None,
172 credentials_file: Optional[str] = None,
173 scopes: Optional[Sequence[str]] = None,
174 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
175 quota_project_id: Optional[str] = None,
176 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
177 always_use_jwt_access: Optional[bool] = False,
178 url_scheme: str = "https",
179 interceptor: Optional[ReportErrorsServiceRestInterceptor] = None,
180 api_audience: Optional[str] = None,
181 ) -> None:
182 """Instantiate the transport.
183
184 Args:
185 host (Optional[str]):
186 The hostname to connect to (default: 'clouderrorreporting.googleapis.com').
187 credentials (Optional[google.auth.credentials.Credentials]): The
188 authorization credentials to attach to requests. These
189 credentials identify the application to the service; if none
190 are specified, the client will attempt to ascertain the
191 credentials from the environment.
192
193 credentials_file (Optional[str]): Deprecated. A file with credentials that can
194 be loaded with :func:`google.auth.load_credentials_from_file`.
195 This argument is ignored if ``channel`` is provided. This argument will be
196 removed in the next major version of this library.
197 scopes (Optional(Sequence[str])): A list of scopes. This argument is
198 ignored if ``channel`` is provided.
199 client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
200 certificate to configure mutual TLS HTTP channel. It is ignored
201 if ``channel`` is provided.
202 quota_project_id (Optional[str]): An optional project to use for billing
203 and quota.
204 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
205 The client info used to send a user-agent string along with
206 API requests. If ``None``, then default info will be used.
207 Generally, you only need to set this if you are developing
208 your own client library.
209 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
210 be used for service account credentials.
211 url_scheme: the protocol scheme for the API endpoint. Normally
212 "https", but for testing or local servers,
213 "http" can be specified.
214 """
215 # Run the base constructor
216 # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
217 # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
218 # credentials object
219 super().__init__(
220 host=host,
221 credentials=credentials,
222 client_info=client_info,
223 always_use_jwt_access=always_use_jwt_access,
224 url_scheme=url_scheme,
225 api_audience=api_audience,
226 )
227 self._session = AuthorizedSession(
228 self._credentials, default_host=self.DEFAULT_HOST
229 )
230 if client_cert_source_for_mtls:
231 self._session.configure_mtls_channel(client_cert_source_for_mtls)
232 self._interceptor = interceptor or ReportErrorsServiceRestInterceptor()
233 self._prep_wrapped_messages(client_info)
234
235 class _ReportErrorEvent(
236 _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent,
237 ReportErrorsServiceRestStub,
238 ):
239 def __hash__(self):
240 return hash("ReportErrorsServiceRestTransport.ReportErrorEvent")
241
242 @staticmethod
243 def _get_response(
244 host,
245 metadata,
246 query_params,
247 session,
248 timeout,
249 transcoded_request,
250 body=None,
251 ):
252 uri = transcoded_request["uri"]
253 method = transcoded_request["method"]
254 headers = dict(metadata)
255 headers["Content-Type"] = "application/json"
256 response = getattr(session, method)(
257 "{host}{uri}".format(host=host, uri=uri),
258 timeout=timeout,
259 headers=headers,
260 params=rest_helpers.flatten_query_params(query_params, strict=True),
261 data=body,
262 )
263 return response
264
265 def __call__(
266 self,
267 request: report_errors_service.ReportErrorEventRequest,
268 *,
269 retry: OptionalRetry = gapic_v1.method.DEFAULT,
270 timeout: Optional[float] = None,
271 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
272 ) -> report_errors_service.ReportErrorEventResponse:
273 r"""Call the report error event method over HTTP.
274
275 Args:
276 request (~.report_errors_service.ReportErrorEventRequest):
277 The request object. A request for reporting an individual
278 error event.
279 retry (google.api_core.retry.Retry): Designation of what errors, if any,
280 should be retried.
281 timeout (float): The timeout for this request.
282 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
283 sent along with the request as metadata. Normally, each value must be of type `str`,
284 but for metadata keys ending with the suffix `-bin`, the corresponding values must
285 be of type `bytes`.
286
287 Returns:
288 ~.report_errors_service.ReportErrorEventResponse:
289 Response for reporting an individual
290 error event. Data may be added to this
291 message in the future.
292
293 """
294
295 http_options = (
296 _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_http_options()
297 )
298
299 request, metadata = self._interceptor.pre_report_error_event(
300 request, metadata
301 )
302 transcoded_request = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_transcoded_request(
303 http_options, request
304 )
305
306 body = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_request_body_json(
307 transcoded_request
308 )
309
310 # Jsonify the query params
311 query_params = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_query_params_json(
312 transcoded_request
313 )
314
315 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
316 logging.DEBUG
317 ): # pragma: NO COVER
318 request_url = "{host}{uri}".format(
319 host=self._host, uri=transcoded_request["uri"]
320 )
321 method = transcoded_request["method"]
322 try:
323 request_payload = type(request).to_json(request)
324 except:
325 request_payload = None
326 http_request = {
327 "payload": request_payload,
328 "requestMethod": method,
329 "requestUrl": request_url,
330 "headers": dict(metadata),
331 }
332 _LOGGER.debug(
333 f"Sending request for google.devtools.clouderrorreporting_v1beta1.ReportErrorsServiceClient.ReportErrorEvent",
334 extra={
335 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ReportErrorsService",
336 "rpcName": "ReportErrorEvent",
337 "httpRequest": http_request,
338 "metadata": http_request["headers"],
339 },
340 )
341
342 # Send the request
343 response = ReportErrorsServiceRestTransport._ReportErrorEvent._get_response(
344 self._host,
345 metadata,
346 query_params,
347 self._session,
348 timeout,
349 transcoded_request,
350 body,
351 )
352
353 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
354 # subclass.
355 if response.status_code >= 400:
356 raise core_exceptions.from_http_response(response)
357
358 # Return the response
359 resp = report_errors_service.ReportErrorEventResponse()
360 pb_resp = report_errors_service.ReportErrorEventResponse.pb(resp)
361
362 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)
363
364 resp = self._interceptor.post_report_error_event(resp)
365 response_metadata = [(k, str(v)) for k, v in response.headers.items()]
366 resp, _ = self._interceptor.post_report_error_event_with_metadata(
367 resp, response_metadata
368 )
369 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
370 logging.DEBUG
371 ): # pragma: NO COVER
372 try:
373 response_payload = (
374 report_errors_service.ReportErrorEventResponse.to_json(response)
375 )
376 except:
377 response_payload = None
378 http_response = {
379 "payload": response_payload,
380 "headers": dict(response.headers),
381 "status": response.status_code,
382 }
383 _LOGGER.debug(
384 "Received response for google.devtools.clouderrorreporting_v1beta1.ReportErrorsServiceClient.report_error_event",
385 extra={
386 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ReportErrorsService",
387 "rpcName": "ReportErrorEvent",
388 "metadata": http_response["headers"],
389 "httpResponse": http_response,
390 },
391 )
392 return resp
393
394 @property
395 def report_error_event(
396 self,
397 ) -> Callable[
398 [report_errors_service.ReportErrorEventRequest],
399 report_errors_service.ReportErrorEventResponse,
400 ]:
401 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
402 # In C++ this would require a dynamic_cast
403 return self._ReportErrorEvent(self._session, self._host, self._interceptor) # type: ignore
404
405 @property
406 def kind(self) -> str:
407 return "rest"
408
409 def close(self):
410 self._session.close()
411
412
413__all__ = ("ReportErrorsServiceRestTransport",)