Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/operations_v1/transports/rest.py: 26%
117 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
1# -*- coding: utf-8 -*-
2# Copyright 2020 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
17import re
18from typing import Callable, Dict, Optional, Sequence, Tuple, Union
20from requests import __version__ as requests_version
22from google.api_core import exceptions as core_exceptions # type: ignore
23from google.api_core import gapic_v1 # type: ignore
24from google.api_core import path_template # type: ignore
25from google.api_core import rest_helpers # type: ignore
26from google.api_core import retry as retries # type: ignore
27from google.auth import credentials as ga_credentials # type: ignore
28from google.auth.transport.requests import AuthorizedSession # type: ignore
29from google.longrunning import operations_pb2 # type: ignore
30from google.protobuf import empty_pb2 # type: ignore
31from google.protobuf import json_format # type: ignore
32from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport
34OptionalRetry = Union[retries.Retry, object]
36DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
37 gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
38 grpc_version=None,
39 rest_version=requests_version,
40)
43class OperationsRestTransport(OperationsTransport):
44 """REST backend transport for Operations.
46 Manages long-running operations with an API service.
48 When an API method normally takes long time to complete, it can be
49 designed to return [Operation][google.api_core.operations_v1.Operation] to the
50 client, and the client can use this interface to receive the real
51 response asynchronously by polling the operation resource, or pass
52 the operation resource to another API (such as Google Cloud Pub/Sub
53 API) to receive the response. Any API service that returns
54 long-running operations should implement the ``Operations``
55 interface so developers can have a consistent client experience.
57 This class defines the same methods as the primary client, so the
58 primary client can load the underlying transport implementation
59 and call it.
61 It sends JSON representations of protocol buffers over HTTP/1.1
62 """
64 def __init__(
65 self,
66 *,
67 host: str = "longrunning.googleapis.com",
68 credentials: ga_credentials.Credentials = None,
69 credentials_file: Optional[str] = None,
70 scopes: Optional[Sequence[str]] = None,
71 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
72 quota_project_id: Optional[str] = None,
73 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
74 always_use_jwt_access: Optional[bool] = False,
75 url_scheme: str = "https",
76 http_options: Optional[Dict] = None,
77 path_prefix: str = "v1",
78 ) -> None:
79 """Instantiate the transport.
81 Args:
82 host (Optional[str]):
83 The hostname to connect to.
84 credentials (Optional[google.auth.credentials.Credentials]): The
85 authorization credentials to attach to requests. These
86 credentials identify the application to the service; if none
87 are specified, the client will attempt to ascertain the
88 credentials from the environment.
90 credentials_file (Optional[str]): A file with credentials that can
91 be loaded with :func:`google.auth.load_credentials_from_file`.
92 This argument is ignored if ``channel`` is provided.
93 scopes (Optional(Sequence[str])): A list of scopes. This argument is
94 ignored if ``channel`` is provided.
95 client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
96 certificate to configure mutual TLS HTTP channel. It is ignored
97 if ``channel`` is provided.
98 quota_project_id (Optional[str]): An optional project to use for billing
99 and quota.
100 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
101 The client info used to send a user-agent string along with
102 API requests. If ``None``, then default info will be used.
103 Generally, you only need to set this if you're developing
104 your own client library.
105 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
106 be used for service account credentials.
107 url_scheme: the protocol scheme for the API endpoint. Normally
108 "https", but for testing or local servers,
109 "http" can be specified.
110 http_options: a dictionary of http_options for transcoding, to override
111 the defaults from operatons.proto. Each method has an entry
112 with the corresponding http rules as value.
113 path_prefix: path prefix (usually represents API version). Set to
114 "v1" by default.
116 """
117 # Run the base constructor
118 # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
119 # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
120 # credentials object
121 maybe_url_match = re.match("^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host)
122 if maybe_url_match is None:
123 raise ValueError(
124 f"Unexpected hostname structure: {host}"
125 ) # pragma: NO COVER
127 url_match_items = maybe_url_match.groupdict()
129 host = f"{url_scheme}://{host}" if not url_match_items["scheme"] else host
131 super().__init__(
132 host=host,
133 credentials=credentials,
134 client_info=client_info,
135 always_use_jwt_access=always_use_jwt_access,
136 )
137 self._session = AuthorizedSession(
138 self._credentials, default_host=self.DEFAULT_HOST
139 )
140 if client_cert_source_for_mtls:
141 self._session.configure_mtls_channel(client_cert_source_for_mtls)
142 self._prep_wrapped_messages(client_info)
143 self._http_options = http_options or {}
144 self._path_prefix = path_prefix
146 def _list_operations(
147 self,
148 request: operations_pb2.ListOperationsRequest,
149 *,
150 retry: OptionalRetry = gapic_v1.method.DEFAULT,
151 timeout: Optional[float] = None,
152 metadata: Sequence[Tuple[str, str]] = (),
153 ) -> operations_pb2.ListOperationsResponse:
154 r"""Call the list operations method over HTTP.
156 Args:
157 request (~.operations_pb2.ListOperationsRequest):
158 The request object. The request message for
159 [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].
161 retry (google.api_core.retry.Retry): Designation of what errors, if any,
162 should be retried.
163 timeout (float): The timeout for this request.
164 metadata (Sequence[Tuple[str, str]]): Strings which should be
165 sent along with the request as metadata.
167 Returns:
168 ~.operations_pb2.ListOperationsResponse:
169 The response message for
170 [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].
172 """
174 http_options = [
175 {
176 "method": "get",
177 "uri": "/{}/{{name=**}}/operations".format(self._path_prefix),
178 },
179 ]
180 if "google.longrunning.Operations.ListOperations" in self._http_options:
181 http_options = self._http_options[
182 "google.longrunning.Operations.ListOperations"
183 ]
185 request_kwargs = json_format.MessageToDict(
186 request,
187 preserving_proto_field_name=True,
188 including_default_value_fields=True,
189 )
190 transcoded_request = path_template.transcode(http_options, **request_kwargs)
192 uri = transcoded_request["uri"]
193 method = transcoded_request["method"]
195 # Jsonify the query params
196 query_params_request = operations_pb2.ListOperationsRequest()
197 json_format.ParseDict(transcoded_request["query_params"], query_params_request)
198 query_params = json_format.MessageToDict(
199 query_params_request,
200 including_default_value_fields=False,
201 preserving_proto_field_name=False,
202 use_integers_for_enums=False,
203 )
205 # Send the request
206 headers = dict(metadata)
207 headers["Content-Type"] = "application/json"
208 response = getattr(self._session, method)(
209 "{host}{uri}".format(host=self._host, uri=uri),
210 timeout=timeout,
211 headers=headers,
212 params=rest_helpers.flatten_query_params(query_params),
213 )
215 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
216 # subclass.
217 if response.status_code >= 400:
218 raise core_exceptions.from_http_response(response)
220 # Return the response
221 api_response = operations_pb2.ListOperationsResponse()
222 json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
223 return api_response
225 def _get_operation(
226 self,
227 request: operations_pb2.GetOperationRequest,
228 *,
229 retry: OptionalRetry = gapic_v1.method.DEFAULT,
230 timeout: Optional[float] = None,
231 metadata: Sequence[Tuple[str, str]] = (),
232 ) -> operations_pb2.Operation:
233 r"""Call the get operation method over HTTP.
235 Args:
236 request (~.operations_pb2.GetOperationRequest):
237 The request object. The request message for
238 [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation].
240 retry (google.api_core.retry.Retry): Designation of what errors, if any,
241 should be retried.
242 timeout (float): The timeout for this request.
243 metadata (Sequence[Tuple[str, str]]): Strings which should be
244 sent along with the request as metadata.
246 Returns:
247 ~.operations_pb2.Operation:
248 This resource represents a long-
249 unning operation that is the result of a
250 network API call.
252 """
254 http_options = [
255 {
256 "method": "get",
257 "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
258 },
259 ]
260 if "google.longrunning.Operations.GetOperation" in self._http_options:
261 http_options = self._http_options[
262 "google.longrunning.Operations.GetOperation"
263 ]
265 request_kwargs = json_format.MessageToDict(
266 request,
267 preserving_proto_field_name=True,
268 including_default_value_fields=True,
269 )
270 transcoded_request = path_template.transcode(http_options, **request_kwargs)
272 uri = transcoded_request["uri"]
273 method = transcoded_request["method"]
275 # Jsonify the query params
276 query_params_request = operations_pb2.GetOperationRequest()
277 json_format.ParseDict(transcoded_request["query_params"], query_params_request)
278 query_params = json_format.MessageToDict(
279 query_params_request,
280 including_default_value_fields=False,
281 preserving_proto_field_name=False,
282 use_integers_for_enums=False,
283 )
285 # Send the request
286 headers = dict(metadata)
287 headers["Content-Type"] = "application/json"
288 response = getattr(self._session, method)(
289 "{host}{uri}".format(host=self._host, uri=uri),
290 timeout=timeout,
291 headers=headers,
292 params=rest_helpers.flatten_query_params(query_params),
293 )
295 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
296 # subclass.
297 if response.status_code >= 400:
298 raise core_exceptions.from_http_response(response)
300 # Return the response
301 api_response = operations_pb2.Operation()
302 json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
303 return api_response
305 def _delete_operation(
306 self,
307 request: operations_pb2.DeleteOperationRequest,
308 *,
309 retry: OptionalRetry = gapic_v1.method.DEFAULT,
310 timeout: Optional[float] = None,
311 metadata: Sequence[Tuple[str, str]] = (),
312 ) -> empty_pb2.Empty:
313 r"""Call the delete operation method over HTTP.
315 Args:
316 request (~.operations_pb2.DeleteOperationRequest):
317 The request object. The request message for
318 [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation].
320 retry (google.api_core.retry.Retry): Designation of what errors, if any,
321 should be retried.
322 timeout (float): The timeout for this request.
323 metadata (Sequence[Tuple[str, str]]): Strings which should be
324 sent along with the request as metadata.
325 """
327 http_options = [
328 {
329 "method": "delete",
330 "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
331 },
332 ]
333 if "google.longrunning.Operations.DeleteOperation" in self._http_options:
334 http_options = self._http_options[
335 "google.longrunning.Operations.DeleteOperation"
336 ]
338 request_kwargs = json_format.MessageToDict(
339 request,
340 preserving_proto_field_name=True,
341 including_default_value_fields=True,
342 )
343 transcoded_request = path_template.transcode(http_options, **request_kwargs)
345 uri = transcoded_request["uri"]
346 method = transcoded_request["method"]
348 # Jsonify the query params
349 query_params_request = operations_pb2.DeleteOperationRequest()
350 json_format.ParseDict(transcoded_request["query_params"], query_params_request)
351 query_params = json_format.MessageToDict(
352 query_params_request,
353 including_default_value_fields=False,
354 preserving_proto_field_name=False,
355 use_integers_for_enums=False,
356 )
358 # Send the request
359 headers = dict(metadata)
360 headers["Content-Type"] = "application/json"
361 response = getattr(self._session, method)(
362 "{host}{uri}".format(host=self._host, uri=uri),
363 timeout=timeout,
364 headers=headers,
365 params=rest_helpers.flatten_query_params(query_params),
366 )
368 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
369 # subclass.
370 if response.status_code >= 400:
371 raise core_exceptions.from_http_response(response)
373 return empty_pb2.Empty()
375 def _cancel_operation(
376 self,
377 request: operations_pb2.CancelOperationRequest,
378 *,
379 retry: OptionalRetry = gapic_v1.method.DEFAULT,
380 timeout: Optional[float] = None,
381 metadata: Sequence[Tuple[str, str]] = (),
382 ) -> empty_pb2.Empty:
383 r"""Call the cancel operation method over HTTP.
385 Args:
386 request (~.operations_pb2.CancelOperationRequest):
387 The request object. The request message for
388 [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation].
390 retry (google.api_core.retry.Retry): Designation of what errors, if any,
391 should be retried.
392 timeout (float): The timeout for this request.
393 metadata (Sequence[Tuple[str, str]]): Strings which should be
394 sent along with the request as metadata.
395 """
397 http_options = [
398 {
399 "method": "post",
400 "uri": "/{}/{{name=**/operations/*}}:cancel".format(self._path_prefix),
401 "body": "*",
402 },
403 ]
404 if "google.longrunning.Operations.CancelOperation" in self._http_options:
405 http_options = self._http_options[
406 "google.longrunning.Operations.CancelOperation"
407 ]
409 request_kwargs = json_format.MessageToDict(
410 request,
411 preserving_proto_field_name=True,
412 including_default_value_fields=True,
413 )
414 transcoded_request = path_template.transcode(http_options, **request_kwargs)
416 # Jsonify the request body
417 body_request = operations_pb2.CancelOperationRequest()
418 json_format.ParseDict(transcoded_request["body"], body_request)
419 body = json_format.MessageToDict(
420 body_request,
421 including_default_value_fields=False,
422 preserving_proto_field_name=False,
423 use_integers_for_enums=False,
424 )
425 uri = transcoded_request["uri"]
426 method = transcoded_request["method"]
428 # Jsonify the query params
429 query_params_request = operations_pb2.CancelOperationRequest()
430 json_format.ParseDict(transcoded_request["query_params"], query_params_request)
431 query_params = json_format.MessageToDict(
432 query_params_request,
433 including_default_value_fields=False,
434 preserving_proto_field_name=False,
435 use_integers_for_enums=False,
436 )
438 # Send the request
439 headers = dict(metadata)
440 headers["Content-Type"] = "application/json"
441 response = getattr(self._session, method)(
442 "{host}{uri}".format(host=self._host, uri=uri),
443 timeout=timeout,
444 headers=headers,
445 params=rest_helpers.flatten_query_params(query_params),
446 data=body,
447 )
449 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
450 # subclass.
451 if response.status_code >= 400:
452 raise core_exceptions.from_http_response(response)
454 return empty_pb2.Empty()
456 @property
457 def list_operations(
458 self,
459 ) -> Callable[
460 [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse
461 ]:
462 return self._list_operations
464 @property
465 def get_operation(
466 self,
467 ) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]:
468 return self._get_operation
470 @property
471 def delete_operation(
472 self,
473 ) -> Callable[[operations_pb2.DeleteOperationRequest], empty_pb2.Empty]:
474 return self._delete_operation
476 @property
477 def cancel_operation(
478 self,
479 ) -> Callable[[operations_pb2.CancelOperationRequest], empty_pb2.Empty]:
480 return self._cancel_operation
483__all__ = ("OperationsRestTransport",)