Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/errorreporting_v1beta1/services/report_errors_service/transports/rest.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

81 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import logging 

17import json # type: ignore 

18 

19from google.auth.transport.requests import AuthorizedSession # type: ignore 

20from google.auth import credentials as ga_credentials # type: ignore 

21from google.api_core import exceptions as core_exceptions 

22from google.api_core import retry as retries 

23from google.api_core import rest_helpers 

24from google.api_core import rest_streaming 

25from google.api_core import gapic_v1 

26import google.protobuf 

27 

28from google.protobuf import json_format 

29 

30from requests import __version__ as requests_version 

31import dataclasses 

32from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

33import warnings 

34 

35 

36from google.cloud.errorreporting_v1beta1.types import report_errors_service 

37 

38 

39from .rest_base import _BaseReportErrorsServiceRestTransport 

40from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO 

41 

42try: 

43 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None] 

44except AttributeError: # pragma: NO COVER 

45 OptionalRetry = Union[retries.Retry, object, None] # type: ignore 

46 

47try: 

48 from google.api_core import client_logging # type: ignore 

49 

50 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER 

51except ImportError: # pragma: NO COVER 

52 CLIENT_LOGGING_SUPPORTED = False 

53 

54_LOGGER = logging.getLogger(__name__) 

55 

56DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

57 gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version, 

58 grpc_version=None, 

59 rest_version=f"requests@{requests_version}", 

60) 

61 

62if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER 

63 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__ 

64 

65 

66class ReportErrorsServiceRestInterceptor: 

67 """Interceptor for ReportErrorsService. 

68 

69 Interceptors are used to manipulate requests, request metadata, and responses 

70 in arbitrary ways. 

71 Example use cases include: 

72 * Logging 

73 * Verifying requests according to service or custom semantics 

74 * Stripping extraneous information from responses 

75 

76 These use cases and more can be enabled by injecting an 

77 instance of a custom subclass when constructing the ReportErrorsServiceRestTransport. 

78 

79 .. code-block:: python 

80 class MyCustomReportErrorsServiceInterceptor(ReportErrorsServiceRestInterceptor): 

81 def pre_report_error_event(self, request, metadata): 

82 logging.log(f"Received request: {request}") 

83 return request, metadata 

84 

85 def post_report_error_event(self, response): 

86 logging.log(f"Received response: {response}") 

87 return response 

88 

89 transport = ReportErrorsServiceRestTransport(interceptor=MyCustomReportErrorsServiceInterceptor()) 

90 client = ReportErrorsServiceClient(transport=transport) 

91 

92 

93 """ 

94 

95 def pre_report_error_event( 

96 self, 

97 request: report_errors_service.ReportErrorEventRequest, 

98 metadata: Sequence[Tuple[str, Union[str, bytes]]], 

99 ) -> Tuple[ 

100 report_errors_service.ReportErrorEventRequest, 

101 Sequence[Tuple[str, Union[str, bytes]]], 

102 ]: 

103 """Pre-rpc interceptor for report_error_event 

104 

105 Override in a subclass to manipulate the request or metadata 

106 before they are sent to the ReportErrorsService server. 

107 """ 

108 return request, metadata 

109 

110 def post_report_error_event( 

111 self, response: report_errors_service.ReportErrorEventResponse 

112 ) -> report_errors_service.ReportErrorEventResponse: 

113 """Post-rpc interceptor for report_error_event 

114 

115 DEPRECATED. Please use the `post_report_error_event_with_metadata` 

116 interceptor instead. 

117 

118 Override in a subclass to read or manipulate the response 

119 after it is returned by the ReportErrorsService server but before 

120 it is returned to user code. This `post_report_error_event` interceptor runs 

121 before the `post_report_error_event_with_metadata` interceptor. 

122 """ 

123 return response 

124 

125 def post_report_error_event_with_metadata( 

126 self, 

127 response: report_errors_service.ReportErrorEventResponse, 

128 metadata: Sequence[Tuple[str, Union[str, bytes]]], 

129 ) -> Tuple[ 

130 report_errors_service.ReportErrorEventResponse, 

131 Sequence[Tuple[str, Union[str, bytes]]], 

132 ]: 

133 """Post-rpc interceptor for report_error_event 

134 

135 Override in a subclass to read or manipulate the response or metadata after it 

136 is returned by the ReportErrorsService server but before it is returned to user code. 

137 

138 We recommend only using this `post_report_error_event_with_metadata` 

139 interceptor in new development instead of the `post_report_error_event` interceptor. 

140 When both interceptors are used, this `post_report_error_event_with_metadata` interceptor runs after the 

141 `post_report_error_event` interceptor. The (possibly modified) response returned by 

142 `post_report_error_event` will be passed to 

143 `post_report_error_event_with_metadata`. 

144 """ 

145 return response, metadata 

146 

147 

148@dataclasses.dataclass 

149class ReportErrorsServiceRestStub: 

150 _session: AuthorizedSession 

151 _host: str 

152 _interceptor: ReportErrorsServiceRestInterceptor 

153 

154 

155class ReportErrorsServiceRestTransport(_BaseReportErrorsServiceRestTransport): 

156 """REST backend synchronous transport for ReportErrorsService. 

157 

158 An API for reporting error events. 

159 

160 This class defines the same methods as the primary client, so the 

161 primary client can load the underlying transport implementation 

162 and call it. 

163 

164 It sends JSON representations of protocol buffers over HTTP/1.1 

165 """ 

166 

167 def __init__( 

168 self, 

169 *, 

170 host: str = "clouderrorreporting.googleapis.com", 

171 credentials: Optional[ga_credentials.Credentials] = None, 

172 credentials_file: Optional[str] = None, 

173 scopes: Optional[Sequence[str]] = None, 

174 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

175 quota_project_id: Optional[str] = None, 

176 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

177 always_use_jwt_access: Optional[bool] = False, 

178 url_scheme: str = "https", 

179 interceptor: Optional[ReportErrorsServiceRestInterceptor] = None, 

180 api_audience: Optional[str] = None, 

181 ) -> None: 

182 """Instantiate the transport. 

183 

184 Args: 

185 host (Optional[str]): 

186 The hostname to connect to (default: 'clouderrorreporting.googleapis.com'). 

187 credentials (Optional[google.auth.credentials.Credentials]): The 

188 authorization credentials to attach to requests. These 

189 credentials identify the application to the service; if none 

190 are specified, the client will attempt to ascertain the 

191 credentials from the environment. 

192 

193 credentials_file (Optional[str]): Deprecated. A file with credentials that can 

194 be loaded with :func:`google.auth.load_credentials_from_file`. 

195 This argument is ignored if ``channel`` is provided. This argument will be 

196 removed in the next major version of this library. 

197 scopes (Optional(Sequence[str])): A list of scopes. This argument is 

198 ignored if ``channel`` is provided. 

199 client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client 

200 certificate to configure mutual TLS HTTP channel. It is ignored 

201 if ``channel`` is provided. 

202 quota_project_id (Optional[str]): An optional project to use for billing 

203 and quota. 

204 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

205 The client info used to send a user-agent string along with 

206 API requests. If ``None``, then default info will be used. 

207 Generally, you only need to set this if you are developing 

208 your own client library. 

209 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

210 be used for service account credentials. 

211 url_scheme: the protocol scheme for the API endpoint. Normally 

212 "https", but for testing or local servers, 

213 "http" can be specified. 

214 """ 

215 # Run the base constructor 

216 # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc. 

217 # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the 

218 # credentials object 

219 super().__init__( 

220 host=host, 

221 credentials=credentials, 

222 client_info=client_info, 

223 always_use_jwt_access=always_use_jwt_access, 

224 url_scheme=url_scheme, 

225 api_audience=api_audience, 

226 ) 

227 self._session = AuthorizedSession( 

228 self._credentials, default_host=self.DEFAULT_HOST 

229 ) 

230 if client_cert_source_for_mtls: 

231 self._session.configure_mtls_channel(client_cert_source_for_mtls) 

232 self._interceptor = interceptor or ReportErrorsServiceRestInterceptor() 

233 self._prep_wrapped_messages(client_info) 

234 

235 class _ReportErrorEvent( 

236 _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent, 

237 ReportErrorsServiceRestStub, 

238 ): 

239 def __hash__(self): 

240 return hash("ReportErrorsServiceRestTransport.ReportErrorEvent") 

241 

242 @staticmethod 

243 def _get_response( 

244 host, 

245 metadata, 

246 query_params, 

247 session, 

248 timeout, 

249 transcoded_request, 

250 body=None, 

251 ): 

252 uri = transcoded_request["uri"] 

253 method = transcoded_request["method"] 

254 headers = dict(metadata) 

255 headers["Content-Type"] = "application/json" 

256 response = getattr(session, method)( 

257 "{host}{uri}".format(host=host, uri=uri), 

258 timeout=timeout, 

259 headers=headers, 

260 params=rest_helpers.flatten_query_params(query_params, strict=True), 

261 data=body, 

262 ) 

263 return response 

264 

265 def __call__( 

266 self, 

267 request: report_errors_service.ReportErrorEventRequest, 

268 *, 

269 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

270 timeout: Optional[float] = None, 

271 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

272 ) -> report_errors_service.ReportErrorEventResponse: 

273 r"""Call the report error event method over HTTP. 

274 

275 Args: 

276 request (~.report_errors_service.ReportErrorEventRequest): 

277 The request object. A request for reporting an individual 

278 error event. 

279 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

280 should be retried. 

281 timeout (float): The timeout for this request. 

282 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

283 sent along with the request as metadata. Normally, each value must be of type `str`, 

284 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

285 be of type `bytes`. 

286 

287 Returns: 

288 ~.report_errors_service.ReportErrorEventResponse: 

289 Response for reporting an individual 

290 error event. Data may be added to this 

291 message in the future. 

292 

293 """ 

294 

295 http_options = ( 

296 _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_http_options() 

297 ) 

298 

299 request, metadata = self._interceptor.pre_report_error_event( 

300 request, metadata 

301 ) 

302 transcoded_request = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_transcoded_request( 

303 http_options, request 

304 ) 

305 

306 body = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_request_body_json( 

307 transcoded_request 

308 ) 

309 

310 # Jsonify the query params 

311 query_params = _BaseReportErrorsServiceRestTransport._BaseReportErrorEvent._get_query_params_json( 

312 transcoded_request 

313 ) 

314 

315 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

316 logging.DEBUG 

317 ): # pragma: NO COVER 

318 request_url = "{host}{uri}".format( 

319 host=self._host, uri=transcoded_request["uri"] 

320 ) 

321 method = transcoded_request["method"] 

322 try: 

323 request_payload = type(request).to_json(request) 

324 except: 

325 request_payload = None 

326 http_request = { 

327 "payload": request_payload, 

328 "requestMethod": method, 

329 "requestUrl": request_url, 

330 "headers": dict(metadata), 

331 } 

332 _LOGGER.debug( 

333 f"Sending request for google.devtools.clouderrorreporting_v1beta1.ReportErrorsServiceClient.ReportErrorEvent", 

334 extra={ 

335 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ReportErrorsService", 

336 "rpcName": "ReportErrorEvent", 

337 "httpRequest": http_request, 

338 "metadata": http_request["headers"], 

339 }, 

340 ) 

341 

342 # Send the request 

343 response = ReportErrorsServiceRestTransport._ReportErrorEvent._get_response( 

344 self._host, 

345 metadata, 

346 query_params, 

347 self._session, 

348 timeout, 

349 transcoded_request, 

350 body, 

351 ) 

352 

353 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

354 # subclass. 

355 if response.status_code >= 400: 

356 raise core_exceptions.from_http_response(response) 

357 

358 # Return the response 

359 resp = report_errors_service.ReportErrorEventResponse() 

360 pb_resp = report_errors_service.ReportErrorEventResponse.pb(resp) 

361 

362 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True) 

363 

364 resp = self._interceptor.post_report_error_event(resp) 

365 response_metadata = [(k, str(v)) for k, v in response.headers.items()] 

366 resp, _ = self._interceptor.post_report_error_event_with_metadata( 

367 resp, response_metadata 

368 ) 

369 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

370 logging.DEBUG 

371 ): # pragma: NO COVER 

372 try: 

373 response_payload = ( 

374 report_errors_service.ReportErrorEventResponse.to_json(response) 

375 ) 

376 except: 

377 response_payload = None 

378 http_response = { 

379 "payload": response_payload, 

380 "headers": dict(response.headers), 

381 "status": response.status_code, 

382 } 

383 _LOGGER.debug( 

384 "Received response for google.devtools.clouderrorreporting_v1beta1.ReportErrorsServiceClient.report_error_event", 

385 extra={ 

386 "serviceName": "google.devtools.clouderrorreporting.v1beta1.ReportErrorsService", 

387 "rpcName": "ReportErrorEvent", 

388 "metadata": http_response["headers"], 

389 "httpResponse": http_response, 

390 }, 

391 ) 

392 return resp 

393 

394 @property 

395 def report_error_event( 

396 self, 

397 ) -> Callable[ 

398 [report_errors_service.ReportErrorEventRequest], 

399 report_errors_service.ReportErrorEventResponse, 

400 ]: 

401 # The return type is fine, but mypy isn't sophisticated enough to determine what's going on here. 

402 # In C++ this would require a dynamic_cast 

403 return self._ReportErrorEvent(self._session, self._host, self._interceptor) # type: ignore 

404 

405 @property 

406 def kind(self) -> str: 

407 return "rest" 

408 

409 def close(self): 

410 self._session.close() 

411 

412 

413__all__ = ("ReportErrorsServiceRestTransport",)