Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/errorreporting_v1beta1/services/report_errors_service/transports/rest.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

81 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import logging 

17import json # type: ignore 

18 

19from google.auth.transport.requests import AuthorizedSession # type: ignore 

20from google.auth import credentials as ga_credentials # type: ignore 

21from google.api_core import exceptions as core_exceptions 

22from google.api_core import retry as retries 

23from google.api_core import rest_helpers 

24from google.api_core import rest_streaming 

25from google.api_core import gapic_v1 

26import google.protobuf 

27 

28from google.protobuf import json_format 

29 

30from requests import __version__ as requests_version 

31import dataclasses 

32from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

33import warnings 

34 

35 

36from google.cloud.errorreporting_v1beta1.types import report_errors_service 

37 

38 

39from .rest_base import _BaseReportErrorsServiceRestTransport 

40from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO 

41 

# ``gapic_v1.method._MethodDefault`` is looked up defensively: if the attribute
# is missing, fall back to plain ``object`` for the "use the default" sentinel.
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

# Structured debug logging is only enabled when ``client_logging`` is importable
# from the installed google-api-core.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

_LOGGER = logging.getLogger(__name__)

# Client info for the REST transport: same GAPIC version as the base default,
# no gRPC version, and the ``requests`` version as the REST runtime.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
    grpc_version=None,
    rest_version=f"requests@{requests_version}",
)

# Only set the protobuf runtime version when ClientInfo exposes that attribute.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__

64 

65 

class ReportErrorsServiceRestInterceptor:
    """Interceptor for ReportErrorsService.

    Interceptors are used to manipulate requests, request metadata, and responses
    in arbitrary ways.
    Example use cases include:
    * Logging
    * Verifying requests according to service or custom semantics
    * Stripping extraneous information from responses

    These use cases and more can be enabled by injecting an
    instance of a custom subclass when constructing the ReportErrorsServiceRestTransport.

    .. code-block:: python

        class MyCustomReportErrorsServiceInterceptor(ReportErrorsServiceRestInterceptor):
            def pre_report_error_event(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_report_error_event_with_metadata(self, response, metadata):
                logging.info(f"Received response: {response}")
                return response, metadata

        transport = ReportErrorsServiceRestTransport(interceptor=MyCustomReportErrorsServiceInterceptor())
        client = ReportErrorsServiceClient(transport=transport)

    Every hook in this base class is a pass-through that returns its
    arguments unchanged.
    """

    def pre_report_error_event(
        self,
        request: report_errors_service.ReportErrorEventRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        report_errors_service.ReportErrorEventRequest,
        Sequence[Tuple[str, Union[str, bytes]]],
    ]:
        """Pre-rpc interceptor for report_error_event

        Override in a subclass to manipulate the request or metadata
        before they are sent to the ReportErrorsService server.

        Args:
            request: The request about to be transcoded and sent.
            metadata: Key/value pairs that accompany the request.

        Returns:
            The ``(request, metadata)`` pair to use for the call; this base
            implementation returns both unchanged.
        """
        return request, metadata

    def post_report_error_event(
        self, response: report_errors_service.ReportErrorEventResponse
    ) -> report_errors_service.ReportErrorEventResponse:
        """Post-rpc interceptor for report_error_event

        DEPRECATED. Please use the `post_report_error_event_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the ReportErrorsService server but before
        it is returned to user code. This `post_report_error_event` interceptor runs
        before the `post_report_error_event_with_metadata` interceptor.

        Args:
            response: The parsed response message.

        Returns:
            The response to hand on; this base implementation returns it
            unchanged.
        """
        return response

    def post_report_error_event_with_metadata(
        self,
        response: report_errors_service.ReportErrorEventResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        report_errors_service.ReportErrorEventResponse,
        Sequence[Tuple[str, Union[str, bytes]]],
    ]:
        """Post-rpc interceptor for report_error_event

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the ReportErrorsService server but before it is returned to user code.

        We recommend only using this `post_report_error_event_with_metadata`
        interceptor in new development instead of the `post_report_error_event` interceptor.
        When both interceptors are used, this `post_report_error_event_with_metadata` interceptor runs after the
        `post_report_error_event` interceptor. The (possibly modified) response returned by
        `post_report_error_event` will be passed to
        `post_report_error_event_with_metadata`.

        Args:
            response: The response message, as returned by
                `post_report_error_event`.
            metadata: Key/value pairs associated with the response.

        Returns:
            The ``(response, metadata)`` pair; this base implementation
            returns both unchanged.
        """
        return response, metadata

146 

147 

@dataclasses.dataclass
class ReportErrorsServiceRestStub:
    """State shared by the per-RPC callables of the REST transport.

    The transport's nested ``_ReportErrorEvent`` class inherits from this
    dataclass and is constructed positionally as
    ``(session, host, interceptor)``, so the field order below is part of
    the constructor interface.
    """

    # Session object used to issue the HTTP request.
    _session: AuthorizedSession
    # Host string prepended to the transcoded request URI.
    _host: str
    # Interceptor whose pre/post hooks wrap each call.
    _interceptor: ReportErrorsServiceRestInterceptor

153 

154 

class ReportErrorsServiceRestTransport(_BaseReportErrorsServiceRestTransport):
    """REST backend synchronous transport for ReportErrorsService.

    An API for reporting error events.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1
    """

    def __init__(
        self,
        *,
        host: str = "clouderrorreporting.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        interceptor: Optional[ReportErrorsServiceRestInterceptor] = None,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'clouderrorreporting.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                Accepted for signature compatibility; this constructor does
                not currently forward it to the base transport.
            scopes (Optional(Sequence[str])): A list of scopes. Accepted for
                signature compatibility; this constructor does not currently
                forward it to the base transport.
            client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
                certificate to configure mutual TLS HTTP channel. When
                provided, it is passed to the session's
                ``configure_mtls_channel``.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota. Accepted for signature compatibility; this
                constructor does not currently forward it to the base
                transport.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you are developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            url_scheme: the protocol scheme for the API endpoint.  Normally
                "https", but for testing or local servers,
                "http" can be specified.
            interceptor (Optional[ReportErrorsServiceRestInterceptor]): Hooks
                run before and after each call. When omitted, a default
                ``ReportErrorsServiceRestInterceptor`` (pass-through) is used.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport constructor.
        """
        # Run the base constructor
        # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
        # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
        # credentials object
        super().__init__(
            host=host,
            credentials=credentials,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            url_scheme=url_scheme,
            api_audience=api_audience,
        )
        self._session = AuthorizedSession(
            self._credentials, default_host=self.DEFAULT_HOST
        )
        if client_cert_source_for_mtls:
            self._session.configure_mtls_channel(client_cert_source_for_mtls)
        self._interceptor = interceptor or ReportErrorsServiceRestInterceptor()
        self._prep_wrapped_messages(client_info)

    class _ReportErrorEvent(
        _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent,
        ReportErrorsServiceRestStub,
    ):
        """Callable that performs the ReportErrorEvent RPC over HTTP."""

        def __hash__(self):
            # Hash on a fixed string so every instance of this callable
            # hashes identically.
            return hash("ReportErrorsServiceRestTransport.ReportErrorEvent")

        @staticmethod
        def _get_response(
            host,
            metadata,
            query_params,
            session,
            timeout,
            transcoded_request,
            body=None,
        ):
            """Issue the HTTP request described by ``transcoded_request``.

            The session method named by ``transcoded_request["method"]`` is
            called with ``host`` + ``transcoded_request["uri"]`` as the URL,
            ``metadata`` converted to headers (plus a JSON content type),
            the flattened ``query_params`` and ``body`` as the payload.

            Returns:
                Whatever the session method returns (the HTTP response).
            """
            uri = transcoded_request["uri"]
            method = transcoded_request["method"]
            headers = dict(metadata)
            headers["Content-Type"] = "application/json"
            response = getattr(session, method)(
                "{host}{uri}".format(host=host, uri=uri),
                timeout=timeout,
                headers=headers,
                params=rest_helpers.flatten_query_params(query_params, strict=True),
                data=body,
            )
            return response

        def __call__(
            self,
            request: report_errors_service.ReportErrorEventRequest,
            *,
            retry: OptionalRetry = gapic_v1.method.DEFAULT,
            timeout: Optional[float] = None,
            metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
        ) -> report_errors_service.ReportErrorEventResponse:
            r"""Call the report error event method over HTTP.

            Args:
                request (~.report_errors_service.ReportErrorEventRequest):
                    The request object. A request for reporting an individual
                    error event.
                retry (google.api_core.retry.Retry): Designation of what errors, if any,
                    should be retried. Not referenced inside this method.
                timeout (float): The timeout for this request.
                metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                    sent along with the request as metadata. Normally, each value must be of type `str`,
                    but for metadata keys ending with the suffix `-bin`, the corresponding values must
                    be of type `bytes`.

            Returns:
                ~.report_errors_service.ReportErrorEventResponse:
                    Response for reporting an individual
                    error event. Data may be added to this
                    message in the future.

            Raises:
                google.api_core.exceptions.GoogleAPICallError: A subclass
                    built from the HTTP response when its status code is
                    400 or greater.
            """

            http_options = (
                _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_http_options()
            )

            request, metadata = self._interceptor.pre_report_error_event(
                request, metadata
            )
            transcoded_request = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_transcoded_request(
                http_options, request
            )

            body = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_request_body_json(
                transcoded_request
            )

            # Jsonify the query params
            query_params = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_query_params_json(
                transcoded_request
            )

            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                request_url = "{host}{uri}".format(
                    host=self._host, uri=transcoded_request["uri"]
                )
                method = transcoded_request["method"]
                try:
                    request_payload = type(request).to_json(request)
                except Exception:
                    # Logging is best-effort: a payload that cannot be
                    # serialized must not fail the call. Interpreter-exit
                    # signals (KeyboardInterrupt, SystemExit) still propagate.
                    request_payload = None
                http_request = {
                    "payload": request_payload,
                    "requestMethod": method,
                    "requestUrl": request_url,
                    "headers": dict(metadata),
                }
                _LOGGER.debug(
                    "Sending request for google.devtools.clouderrorreporting_v1beta1.ReportErrorsServiceClient.ReportErrorEvent",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ReportErrorsService",
                        "rpcName": "ReportErrorEvent",
                        "httpRequest": http_request,
                        "metadata": http_request["headers"],
                    },
                )

            # Send the request
            response = ReportErrorsServiceRestTransport._ReportErrorEvent._get_response(
                self._host,
                metadata,
                query_params,
                self._session,
                timeout,
                transcoded_request,
                body,
            )

            # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
            # subclass.
            if response.status_code >= 400:
                raise core_exceptions.from_http_response(response)

            # Return the response
            resp = report_errors_service.ReportErrorEventResponse()
            pb_resp = report_errors_service.ReportErrorEventResponse.pb(resp)

            json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

            resp = self._interceptor.post_report_error_event(resp)
            response_metadata = [(k, str(v)) for k, v in response.headers.items()]
            resp, _ = self._interceptor.post_report_error_event_with_metadata(
                resp, response_metadata
            )
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                logging.DEBUG
            ):  # pragma: NO COVER
                try:
                    # Serialize the parsed message (`resp`), not the raw HTTP
                    # `response`: the HTTP object is not a
                    # ReportErrorEventResponse, so passing it here meant the
                    # payload could never be logged.
                    response_payload = (
                        report_errors_service.ReportErrorEventResponse.to_json(resp)
                    )
                except Exception:
                    # Best-effort, as for the request payload above.
                    response_payload = None
                http_response = {
                    "payload": response_payload,
                    "headers": dict(response.headers),
                    "status": response.status_code,
                }
                _LOGGER.debug(
                    "Received response for google.devtools.clouderrorreporting_v1beta1.ReportErrorsServiceClient.report_error_event",
                    extra={
                        "serviceName": "google.devtools.clouderrorreporting.v1beta1.ReportErrorsService",
                        "rpcName": "ReportErrorEvent",
                        "metadata": http_response["headers"],
                        "httpResponse": http_response,
                    },
                )
            return resp

    @property
    def report_error_event(
        self,
    ) -> Callable[
        [report_errors_service.ReportErrorEventRequest],
        report_errors_service.ReportErrorEventResponse,
    ]:
        """Return a new callable bound to this transport's session, host and interceptor."""
        # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here.
        # In C++ this would require a dynamic_cast
        return self._ReportErrorEvent(self._session, self._host, self._interceptor)  # type: ignore

    @property
    def kind(self) -> str:
        """Return the transport kind identifier, ``"rest"``."""
        return "rest"

    def close(self):
        """Close the underlying HTTP session."""
        self._session.close()

410 

411 

# Public API of this module: only the transport class is exported by
# ``from ... import *``; the interceptor and stub must be imported by name.
__all__ = ("ReportErrorsServiceRestTransport",)