Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/operations_v1/transports/rest.py: 27%
118 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
1# -*- coding: utf-8 -*-
2# Copyright 2020 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
17import re
18from typing import Callable, Dict, Optional, Sequence, Tuple, Union
20from requests import __version__ as requests_version
22from google.api_core import exceptions as core_exceptions # type: ignore
23from google.api_core import gapic_v1 # type: ignore
24from google.api_core import path_template # type: ignore
25from google.api_core import rest_helpers # type: ignore
26from google.api_core import retry as retries # type: ignore
27from google.auth import credentials as ga_credentials # type: ignore
28from google.auth.transport.requests import AuthorizedSession # type: ignore
29from google.longrunning import operations_pb2 # type: ignore
30from google.protobuf import empty_pb2 # type: ignore
31from google.protobuf import json_format # type: ignore
32import grpc
33from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport
35OptionalRetry = Union[retries.Retry, object]
37DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
38 gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
39 grpc_version=None,
40 rest_version=requests_version,
41)
44class OperationsRestTransport(OperationsTransport):
45 """REST backend transport for Operations.
47 Manages long-running operations with an API service.
49 When an API method normally takes long time to complete, it can be
50 designed to return [Operation][google.api_core.operations_v1.Operation] to the
51 client, and the client can use this interface to receive the real
52 response asynchronously by polling the operation resource, or pass
53 the operation resource to another API (such as Google Cloud Pub/Sub
54 API) to receive the response. Any API service that returns
55 long-running operations should implement the ``Operations``
56 interface so developers can have a consistent client experience.
58 This class defines the same methods as the primary client, so the
59 primary client can load the underlying transport implementation
60 and call it.
62 It sends JSON representations of protocol buffers over HTTP/1.1
63 """
65 def __init__(
66 self,
67 *,
68 host: str = "longrunning.googleapis.com",
69 credentials: ga_credentials.Credentials = None,
70 credentials_file: Optional[str] = None,
71 scopes: Optional[Sequence[str]] = None,
72 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
73 quota_project_id: Optional[str] = None,
74 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
75 always_use_jwt_access: Optional[bool] = False,
76 url_scheme: str = "https",
77 http_options: Optional[Dict] = None,
78 path_prefix: str = "v1",
79 ) -> None:
80 """Instantiate the transport.
82 Args:
83 host (Optional[str]):
84 The hostname to connect to.
85 credentials (Optional[google.auth.credentials.Credentials]): The
86 authorization credentials to attach to requests. These
87 credentials identify the application to the service; if none
88 are specified, the client will attempt to ascertain the
89 credentials from the environment.
91 credentials_file (Optional[str]): A file with credentials that can
92 be loaded with :func:`google.auth.load_credentials_from_file`.
93 This argument is ignored if ``channel`` is provided.
94 scopes (Optional(Sequence[str])): A list of scopes. This argument is
95 ignored if ``channel`` is provided.
96 client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
97 certificate to configure mutual TLS HTTP channel. It is ignored
98 if ``channel`` is provided.
99 quota_project_id (Optional[str]): An optional project to use for billing
100 and quota.
101 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
102 The client info used to send a user-agent string along with
103 API requests. If ``None``, then default info will be used.
104 Generally, you only need to set this if you're developing
105 your own client library.
106 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
107 be used for service account credentials.
108 url_scheme: the protocol scheme for the API endpoint. Normally
109 "https", but for testing or local servers,
110 "http" can be specified.
111 http_options: a dictionary of http_options for transcoding, to override
112 the defaults from operations.proto. Each method has an entry
113 with the corresponding http rules as value.
114 path_prefix: path prefix (usually represents API version). Set to
115 "v1" by default.
117 """
118 # Run the base constructor
119 # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
120 # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
121 # credentials object
122 maybe_url_match = re.match("^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host)
123 if maybe_url_match is None:
124 raise ValueError(
125 f"Unexpected hostname structure: {host}"
126 ) # pragma: NO COVER
128 url_match_items = maybe_url_match.groupdict()
130 host = f"{url_scheme}://{host}" if not url_match_items["scheme"] else host
132 super().__init__(
133 host=host,
134 credentials=credentials,
135 client_info=client_info,
136 always_use_jwt_access=always_use_jwt_access,
137 )
138 self._session = AuthorizedSession(
139 self._credentials, default_host=self.DEFAULT_HOST
140 )
141 if client_cert_source_for_mtls:
142 self._session.configure_mtls_channel(client_cert_source_for_mtls)
143 self._prep_wrapped_messages(client_info)
144 self._http_options = http_options or {}
145 self._path_prefix = path_prefix
147 def _list_operations(
148 self,
149 request: operations_pb2.ListOperationsRequest,
150 *,
151 retry: OptionalRetry = gapic_v1.method.DEFAULT,
152 timeout: Optional[float] = None,
153 compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
154 metadata: Sequence[Tuple[str, str]] = (),
155 ) -> operations_pb2.ListOperationsResponse:
156 r"""Call the list operations method over HTTP.
158 Args:
159 request (~.operations_pb2.ListOperationsRequest):
160 The request object. The request message for
161 [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].
163 retry (google.api_core.retry.Retry): Designation of what errors, if any,
164 should be retried.
165 timeout (float): The timeout for this request.
166 metadata (Sequence[Tuple[str, str]]): Strings which should be
167 sent along with the request as metadata.
169 Returns:
170 ~.operations_pb2.ListOperationsResponse:
171 The response message for
172 [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].
174 """
176 http_options = [
177 {
178 "method": "get",
179 "uri": "/{}/{{name=**}}/operations".format(self._path_prefix),
180 },
181 ]
182 if "google.longrunning.Operations.ListOperations" in self._http_options:
183 http_options = self._http_options[
184 "google.longrunning.Operations.ListOperations"
185 ]
187 request_kwargs = json_format.MessageToDict(
188 request,
189 preserving_proto_field_name=True,
190 including_default_value_fields=True,
191 )
192 transcoded_request = path_template.transcode(http_options, **request_kwargs)
194 uri = transcoded_request["uri"]
195 method = transcoded_request["method"]
197 # Jsonify the query params
198 query_params_request = operations_pb2.ListOperationsRequest()
199 json_format.ParseDict(transcoded_request["query_params"], query_params_request)
200 query_params = json_format.MessageToDict(
201 query_params_request,
202 including_default_value_fields=False,
203 preserving_proto_field_name=False,
204 use_integers_for_enums=False,
205 )
207 # Send the request
208 headers = dict(metadata)
209 headers["Content-Type"] = "application/json"
210 response = getattr(self._session, method)(
211 "{host}{uri}".format(host=self._host, uri=uri),
212 timeout=timeout,
213 headers=headers,
214 params=rest_helpers.flatten_query_params(query_params),
215 )
217 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
218 # subclass.
219 if response.status_code >= 400:
220 raise core_exceptions.from_http_response(response)
222 # Return the response
223 api_response = operations_pb2.ListOperationsResponse()
224 json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
225 return api_response
227 def _get_operation(
228 self,
229 request: operations_pb2.GetOperationRequest,
230 *,
231 retry: OptionalRetry = gapic_v1.method.DEFAULT,
232 timeout: Optional[float] = None,
233 compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
234 metadata: Sequence[Tuple[str, str]] = (),
235 ) -> operations_pb2.Operation:
236 r"""Call the get operation method over HTTP.
238 Args:
239 request (~.operations_pb2.GetOperationRequest):
240 The request object. The request message for
241 [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation].
243 retry (google.api_core.retry.Retry): Designation of what errors, if any,
244 should be retried.
245 timeout (float): The timeout for this request.
246 metadata (Sequence[Tuple[str, str]]): Strings which should be
247 sent along with the request as metadata.
249 Returns:
250 ~.operations_pb2.Operation:
251 This resource represents a long-
252 running operation that is the result of a
253 network API call.
255 """
257 http_options = [
258 {
259 "method": "get",
260 "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
261 },
262 ]
263 if "google.longrunning.Operations.GetOperation" in self._http_options:
264 http_options = self._http_options[
265 "google.longrunning.Operations.GetOperation"
266 ]
268 request_kwargs = json_format.MessageToDict(
269 request,
270 preserving_proto_field_name=True,
271 including_default_value_fields=True,
272 )
273 transcoded_request = path_template.transcode(http_options, **request_kwargs)
275 uri = transcoded_request["uri"]
276 method = transcoded_request["method"]
278 # Jsonify the query params
279 query_params_request = operations_pb2.GetOperationRequest()
280 json_format.ParseDict(transcoded_request["query_params"], query_params_request)
281 query_params = json_format.MessageToDict(
282 query_params_request,
283 including_default_value_fields=False,
284 preserving_proto_field_name=False,
285 use_integers_for_enums=False,
286 )
288 # Send the request
289 headers = dict(metadata)
290 headers["Content-Type"] = "application/json"
291 response = getattr(self._session, method)(
292 "{host}{uri}".format(host=self._host, uri=uri),
293 timeout=timeout,
294 headers=headers,
295 params=rest_helpers.flatten_query_params(query_params),
296 )
298 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
299 # subclass.
300 if response.status_code >= 400:
301 raise core_exceptions.from_http_response(response)
303 # Return the response
304 api_response = operations_pb2.Operation()
305 json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
306 return api_response
308 def _delete_operation(
309 self,
310 request: operations_pb2.DeleteOperationRequest,
311 *,
312 retry: OptionalRetry = gapic_v1.method.DEFAULT,
313 timeout: Optional[float] = None,
314 compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
315 metadata: Sequence[Tuple[str, str]] = (),
316 ) -> empty_pb2.Empty:
317 r"""Call the delete operation method over HTTP.
319 Args:
320 request (~.operations_pb2.DeleteOperationRequest):
321 The request object. The request message for
322 [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation].
324 retry (google.api_core.retry.Retry): Designation of what errors, if any,
325 should be retried.
326 timeout (float): The timeout for this request.
327 metadata (Sequence[Tuple[str, str]]): Strings which should be
328 sent along with the request as metadata.
329 """
331 http_options = [
332 {
333 "method": "delete",
334 "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
335 },
336 ]
337 if "google.longrunning.Operations.DeleteOperation" in self._http_options:
338 http_options = self._http_options[
339 "google.longrunning.Operations.DeleteOperation"
340 ]
342 request_kwargs = json_format.MessageToDict(
343 request,
344 preserving_proto_field_name=True,
345 including_default_value_fields=True,
346 )
347 transcoded_request = path_template.transcode(http_options, **request_kwargs)
349 uri = transcoded_request["uri"]
350 method = transcoded_request["method"]
352 # Jsonify the query params
353 query_params_request = operations_pb2.DeleteOperationRequest()
354 json_format.ParseDict(transcoded_request["query_params"], query_params_request)
355 query_params = json_format.MessageToDict(
356 query_params_request,
357 including_default_value_fields=False,
358 preserving_proto_field_name=False,
359 use_integers_for_enums=False,
360 )
362 # Send the request
363 headers = dict(metadata)
364 headers["Content-Type"] = "application/json"
365 response = getattr(self._session, method)(
366 "{host}{uri}".format(host=self._host, uri=uri),
367 timeout=timeout,
368 headers=headers,
369 params=rest_helpers.flatten_query_params(query_params),
370 )
372 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
373 # subclass.
374 if response.status_code >= 400:
375 raise core_exceptions.from_http_response(response)
377 return empty_pb2.Empty()
379 def _cancel_operation(
380 self,
381 request: operations_pb2.CancelOperationRequest,
382 *,
383 retry: OptionalRetry = gapic_v1.method.DEFAULT,
384 timeout: Optional[float] = None,
385 compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT,
386 metadata: Sequence[Tuple[str, str]] = (),
387 ) -> empty_pb2.Empty:
388 r"""Call the cancel operation method over HTTP.
390 Args:
391 request (~.operations_pb2.CancelOperationRequest):
392 The request object. The request message for
393 [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation].
395 retry (google.api_core.retry.Retry): Designation of what errors, if any,
396 should be retried.
397 timeout (float): The timeout for this request.
398 metadata (Sequence[Tuple[str, str]]): Strings which should be
399 sent along with the request as metadata.
400 """
402 http_options = [
403 {
404 "method": "post",
405 "uri": "/{}/{{name=**/operations/*}}:cancel".format(self._path_prefix),
406 "body": "*",
407 },
408 ]
409 if "google.longrunning.Operations.CancelOperation" in self._http_options:
410 http_options = self._http_options[
411 "google.longrunning.Operations.CancelOperation"
412 ]
414 request_kwargs = json_format.MessageToDict(
415 request,
416 preserving_proto_field_name=True,
417 including_default_value_fields=True,
418 )
419 transcoded_request = path_template.transcode(http_options, **request_kwargs)
421 # Jsonify the request body
422 body_request = operations_pb2.CancelOperationRequest()
423 json_format.ParseDict(transcoded_request["body"], body_request)
424 body = json_format.MessageToDict(
425 body_request,
426 including_default_value_fields=False,
427 preserving_proto_field_name=False,
428 use_integers_for_enums=False,
429 )
430 uri = transcoded_request["uri"]
431 method = transcoded_request["method"]
433 # Jsonify the query params
434 query_params_request = operations_pb2.CancelOperationRequest()
435 json_format.ParseDict(transcoded_request["query_params"], query_params_request)
436 query_params = json_format.MessageToDict(
437 query_params_request,
438 including_default_value_fields=False,
439 preserving_proto_field_name=False,
440 use_integers_for_enums=False,
441 )
443 # Send the request
444 headers = dict(metadata)
445 headers["Content-Type"] = "application/json"
446 response = getattr(self._session, method)(
447 "{host}{uri}".format(host=self._host, uri=uri),
448 timeout=timeout,
449 headers=headers,
450 params=rest_helpers.flatten_query_params(query_params),
451 data=body,
452 )
454 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
455 # subclass.
456 if response.status_code >= 400:
457 raise core_exceptions.from_http_response(response)
459 return empty_pb2.Empty()
461 @property
462 def list_operations(
463 self,
464 ) -> Callable[
465 [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse
466 ]:
467 return self._list_operations
469 @property
470 def get_operation(
471 self,
472 ) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]:
473 return self._get_operation
475 @property
476 def delete_operation(
477 self,
478 ) -> Callable[[operations_pb2.DeleteOperationRequest], empty_pb2.Empty]:
479 return self._delete_operation
481 @property
482 def cancel_operation(
483 self,
484 ) -> Callable[[operations_pb2.CancelOperationRequest], empty_pb2.Empty]:
485 return self._cancel_operation
488__all__ = ("OperationsRestTransport",)