1# -*- coding: utf-8 -*-
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import json
18from typing import Any, Callable, Coroutine, Dict, Optional, Sequence, Tuple
19
20from google.auth import __version__ as auth_version
21
22try:
23 from google.auth.aio.transport.sessions import AsyncAuthorizedSession # type: ignore
24except ImportError as e: # pragma: NO COVER
25 raise ImportError(
26 "The `async_rest` extra of `google-api-core` is required to use long-running operations. Install it by running "
27 "`pip install google-api-core[async_rest]`."
28 ) from e
29
30from google.api_core import exceptions as core_exceptions # type: ignore
31from google.api_core import gapic_v1 # type: ignore
32from google.api_core import path_template # type: ignore
33from google.api_core import rest_helpers # type: ignore
34from google.api_core import retry_async as retries_async # type: ignore
35from google.auth.aio import credentials as ga_credentials_async # type: ignore
36from google.longrunning import operations_pb2 # type: ignore
37from google.protobuf import empty_pb2 # type: ignore
38from google.protobuf import json_format # type: ignore
39
40from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport
41
# Client info advertised by this transport: it reuses the gapic version of the
# base transport's default client info, reports no gRPC version, and reports
# the installed google-auth version as the REST version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    rest_version="google-auth@{}".format(auth_version),
    grpc_version=None,
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
)
47
48
class AsyncOperationsRestTransport(OperationsTransport):
    """Asynchronous REST backend transport for Operations.

    Manages async long-running operations with an API service.

    When an API method normally takes a long time to complete, it can be
    designed to return [Operation][google.api_core.operations_v1.Operation] to the
    client, and the client can use this interface to receive the real
    response asynchronously by polling the operation resource, or pass
    the operation resource to another API (such as Google Cloud Pub/Sub
    API) to receive the response. Any API service that returns
    long-running operations should implement the ``Operations``
    interface so developers can have a consistent client experience.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1.
    """

    def __init__(
        self,
        *,
        host: str = "longrunning.googleapis.com",
        credentials: Optional[ga_credentials_async.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        http_options: Optional[Dict] = None,
        path_prefix: str = "v1",
        # TODO(https://github.com/googleapis/python-api-core/issues/715): Add docstring for `credentials_file` to async REST transport.
        # TODO(https://github.com/googleapis/python-api-core/issues/716): Add docstring for `scopes` to async REST transport.
        # TODO(https://github.com/googleapis/python-api-core/issues/717): Add docstring for `quota_project_id` to async REST transport.
        # TODO(https://github.com/googleapis/python-api-core/issues/718): Add docstring for `client_cert_source` to async REST transport.
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to.
            credentials (Optional[google.auth.aio.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): Not supported by this transport
                yet; must be left as ``None``.
            scopes (Optional[Sequence[str]]): Not supported by this transport
                yet; must be left as ``None``.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Not supported by this transport yet; must be left as ``None``.
            quota_project_id (Optional[str]): Not supported by this transport
                yet; must be left as ``None``.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials. Currently ignored:
                ``False`` is always forwarded to the base transport.
            url_scheme: the protocol scheme for the API endpoint.  Normally
                "https", but for testing or local servers,
                "http" can be specified.
                NOTE(review): this constructor neither uses this value nor
                forwards it to the base class; confirm whether that is intended.
            http_options: a dictionary of http_options for transcoding, to override
                the defaults from operations.proto.  Each method has an entry
                with the corresponding http rules as value.
            path_prefix: path prefix (usually represents API version). Set to
                "v1" by default.

        Raises:
            google.api_core.exceptions.AsyncRestUnsupportedParameterError: If
                any of ``credentials_file``, ``scopes``, ``quota_project_id``
                or ``client_cert_source_for_mtls`` is not ``None``.
        """
        # Each key is the public option name reported back to the caller when
        # the corresponding (not yet supported) argument was supplied.
        unsupported_params = {
            # TODO(https://github.com/googleapis/python-api-core/issues/715): Add support for `credentials_file` to async REST transport.
            "google.api_core.client_options.ClientOptions.credentials_file": credentials_file,
            # TODO(https://github.com/googleapis/python-api-core/issues/716): Add support for `scopes` to async REST transport.
            "google.api_core.client_options.ClientOptions.scopes": scopes,
            # TODO(https://github.com/googleapis/python-api-core/issues/717): Add support for `quota_project_id` to async REST transport.
            "google.api_core.client_options.ClientOptions.quota_project_id": quota_project_id,
            # TODO(https://github.com/googleapis/python-api-core/issues/718): Add support for `client_cert_source` to async REST transport.
            "google.api_core.client_options.ClientOptions.client_cert_source": client_cert_source_for_mtls,
        }
        provided_unsupported_params = [
            name for name, value in unsupported_params.items() if value is not None
        ]
        if provided_unsupported_params:
            raise core_exceptions.AsyncRestUnsupportedParameterError(
                f"The following provided parameters are not supported for `transport=rest_asyncio`: {', '.join(provided_unsupported_params)}"
            )

        super().__init__(
            host=host,
            # TODO(https://github.com/googleapis/python-api-core/issues/709): Remove `type: ignore` when the linked issue is resolved.
            credentials=credentials,  # type: ignore
            client_info=client_info,
            # TODO(https://github.com/googleapis/python-api-core/issues/725): Set always_use_jwt_access token when supported.
            always_use_jwt_access=False,
        )
        # TODO(https://github.com/googleapis/python-api-core/issues/708): add support for
        # `default_host` in AsyncAuthorizedSession for feature parity with the synchronous
        # code.
        # TODO(https://github.com/googleapis/python-api-core/issues/709): Remove `type: ignore` when the linked issue is resolved.
        self._session = AsyncAuthorizedSession(self._credentials)  # type: ignore
        # TODO(https://github.com/googleapis/python-api-core/issues/720): Add wrap logic directly to the property methods for callables.
        self._prep_wrapped_messages(client_info)
        self._http_options = http_options or {}
        self._path_prefix = path_prefix

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped callable for every RPC of this transport.

        Each RPC callable is wrapped with the same defaults: a 10 second
        timeout and an exponential-backoff retry on ``ServiceUnavailable``.
        The result is stored in ``self._wrapped_methods``, keyed by the
        unwrapped callable.

        Args:
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Client info handed to every wrapped method.
        """

        def _default_retry():
            # A fresh retry object per RPC, so the wrapped methods do not
            # share a single instance.
            return retries_async.AsyncRetry(
                initial=0.5,
                maximum=10.0,
                multiplier=2.0,
                predicate=retries_async.if_exception_type(
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=10.0,
            )

        rpc_callables = (
            self.list_operations,
            self.get_operation,
            self.delete_operation,
            self.cancel_operation,
        )
        self._wrapped_methods = {
            rpc: gapic_v1.method_async.wrap_method(
                rpc,
                default_retry=_default_retry(),
                default_timeout=10.0,
                client_info=client_info,
                kind="rest_asyncio",
            )
            for rpc in rpc_callables
        }

    @staticmethod
    def _to_json_dict(message_type: Callable[[], Any], fields: Any) -> Dict[str, Any]:
        """Round-trip ``fields`` through ``message_type`` into a JSON-style dict.

        ``fields`` is parsed into a new ``message_type`` instance, which is
        then converted back to a dict with JSON (non-proto) field names and
        enum names instead of integers.
        """
        message = message_type()
        json_format.ParseDict(fields, message)
        return json_format.MessageToDict(
            message,
            preserving_proto_field_name=False,
            use_integers_for_enums=False,
        )

    def _transcode(self, rpc_name: str, default_http_options: Sequence[Dict[str, str]], request: Any) -> Dict[str, Any]:
        """Transcode ``request`` using the HTTP rules configured for ``rpc_name``.

        Rules supplied through the constructor's ``http_options`` take
        precedence over ``default_http_options``.
        """
        http_options = self._http_options.get(rpc_name, default_http_options)
        request_kwargs = self._convert_protobuf_message_to_dict(request)
        return path_template.transcode(http_options, **request_kwargs)

    async def _send_request(
        self,
        transcoded_request: Dict[str, Any],
        query_params_type: Callable[[], Any],
        *,
        timeout: Optional[float],
        metadata: Sequence[Tuple[str, str]],
        **session_kwargs: Any,
    ) -> Tuple[Any, str, str]:
        """Send a transcoded request through the authorized session.

        Args:
            transcoded_request: Transcoding result; its ``uri``, ``method``
                and ``query_params`` entries are read.
            query_params_type: Request message class used to jsonify the
                query parameters.
            timeout: The timeout forwarded to the session call.
            metadata: Key/value pairs sent as HTTP headers.
            **session_kwargs: Extra keyword arguments for the session call
                (for example ``data``).

        Returns:
            A ``(response, method, request_url)`` tuple, where ``method`` is
            the HTTP method name and ``request_url`` the URL that was called.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]

        # Jsonify the query params
        query_params = self._to_json_dict(
            query_params_type, transcoded_request["query_params"]
        )

        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name.
        request_url = "{host}{uri}".format(host=self._host, uri=uri)
        # The session exposes one coroutine per HTTP verb; pick it by name.
        response = await getattr(self._session, method)(
            request_url,
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params),
            **session_kwargs,
        )
        return response, method, request_url

    @staticmethod
    def _http_error(response: Any, method: str, request_url: str, content: bytes) -> Exception:
        """Build the exception for a failed response from its JSON error body."""
        payload = json.loads(content.decode("utf-8"))
        return core_exceptions.format_http_response_error(response, method, request_url, payload)  # type: ignore

    async def _list_operations(
        self,
        request: operations_pb2.ListOperationsRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
        # to allow configuring retryable error codes.
        retry=gapic_v1.method_async.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operations_pb2.ListOperationsResponse:
        r"""Asynchronously call the list operations method over HTTP.

        Args:
            request (~.operations_pb2.ListOperationsRequest):
                The request object. The request message for
                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].
            retry: Accepted for interface compatibility; not used by this
                method (see the TODO on the parameter).
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.operations_pb2.ListOperationsResponse:
                The response message for
                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].

        Raises:
            google.api_core.exceptions.GoogleAPICallError: The appropriate
                subclass when the response status code is 400 or above.
        """
        transcoded_request = self._transcode(
            "google.longrunning.Operations.ListOperations",
            [
                {
                    "method": "get",
                    "uri": "/{}/{{name=**}}/operations".format(self._path_prefix),
                },
            ],
            request,
        )

        # Send the request
        response, method, request_url = await self._send_request(
            transcoded_request,
            operations_pb2.ListOperationsRequest,
            timeout=timeout,
            metadata=metadata,
        )
        content = await response.read()

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise self._http_error(response, method, request_url, content)

        # Return the response
        api_response = operations_pb2.ListOperationsResponse()
        json_format.Parse(content, api_response, ignore_unknown_fields=False)
        return api_response

    async def _get_operation(
        self,
        request: operations_pb2.GetOperationRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
        # to allow configuring retryable error codes.
        retry=gapic_v1.method_async.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operations_pb2.Operation:
        r"""Asynchronously call the get operation method over HTTP.

        Args:
            request (~.operations_pb2.GetOperationRequest):
                The request object. The request message for
                [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation].
            retry: Accepted for interface compatibility; not used by this
                method (see the TODO on the parameter).
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.operations_pb2.Operation:
                This resource represents a long-
                running operation that is the result of a
                network API call.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: The appropriate
                subclass when the response status code is 400 or above.
        """
        transcoded_request = self._transcode(
            "google.longrunning.Operations.GetOperation",
            [
                {
                    "method": "get",
                    "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
                },
            ],
            request,
        )

        # Send the request
        response, method, request_url = await self._send_request(
            transcoded_request,
            operations_pb2.GetOperationRequest,
            timeout=timeout,
            metadata=metadata,
        )
        content = await response.read()

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise self._http_error(response, method, request_url, content)

        # Return the response
        api_response = operations_pb2.Operation()
        json_format.Parse(content, api_response, ignore_unknown_fields=False)
        return api_response

    async def _delete_operation(
        self,
        request: operations_pb2.DeleteOperationRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
        # to allow configuring retryable error codes.
        retry=gapic_v1.method_async.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> empty_pb2.Empty:
        r"""Asynchronously call the delete operation method over HTTP.

        Args:
            request (~.operations_pb2.DeleteOperationRequest):
                The request object. The request message for
                [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation].
            retry: Accepted for interface compatibility; not used by this
                method (see the TODO on the parameter).
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.empty_pb2.Empty: An empty message on success.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: The appropriate
                subclass when the response status code is 400 or above.
        """
        transcoded_request = self._transcode(
            "google.longrunning.Operations.DeleteOperation",
            [
                {
                    "method": "delete",
                    "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
                },
            ],
            request,
        )

        # Send the request
        response, method, request_url = await self._send_request(
            transcoded_request,
            operations_pb2.DeleteOperationRequest,
            timeout=timeout,
            metadata=metadata,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass. The body is only read when it is needed for the error.
        if response.status_code >= 400:
            content = await response.read()
            raise self._http_error(response, method, request_url, content)

        return empty_pb2.Empty()

    async def _cancel_operation(
        self,
        request: operations_pb2.CancelOperationRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
        # to allow configuring retryable error codes.
        retry=gapic_v1.method_async.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> empty_pb2.Empty:
        r"""Asynchronously call the cancel operation method over HTTP.

        Args:
            request (~.operations_pb2.CancelOperationRequest):
                The request object. The request message for
                [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation].
            retry: Accepted for interface compatibility; not used by this
                method (see the TODO on the parameter).
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.empty_pb2.Empty: An empty message on success.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: The appropriate
                subclass when the response status code is 400 or above.
        """
        transcoded_request = self._transcode(
            "google.longrunning.Operations.CancelOperation",
            [
                {
                    "method": "post",
                    "uri": "/{}/{{name=**/operations/*}}:cancel".format(self._path_prefix),
                    "body": "*",
                },
            ],
            request,
        )

        # Jsonify the request body
        body = self._to_json_dict(
            operations_pb2.CancelOperationRequest, transcoded_request["body"]
        )

        # Send the request
        response, method, request_url = await self._send_request(
            transcoded_request,
            operations_pb2.CancelOperationRequest,
            timeout=timeout,
            metadata=metadata,
            data=body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass. The body is only read when it is needed for the error.
        if response.status_code >= 400:
            content = await response.read()
            raise self._http_error(response, method, request_url, content)

        return empty_pb2.Empty()

    @property
    def list_operations(
        self,
    ) -> Callable[
        [operations_pb2.ListOperationsRequest],
        Coroutine[Any, Any, operations_pb2.ListOperationsResponse],
    ]:
        """Return the coroutine function implementing ``ListOperations``."""
        return self._list_operations

    @property
    def get_operation(
        self,
    ) -> Callable[
        [operations_pb2.GetOperationRequest],
        Coroutine[Any, Any, operations_pb2.Operation],
    ]:
        """Return the coroutine function implementing ``GetOperation``."""
        return self._get_operation

    @property
    def delete_operation(
        self,
    ) -> Callable[
        [operations_pb2.DeleteOperationRequest], Coroutine[Any, Any, empty_pb2.Empty]
    ]:
        """Return the coroutine function implementing ``DeleteOperation``."""
        return self._delete_operation

    @property
    def cancel_operation(
        self,
    ) -> Callable[
        [operations_pb2.CancelOperationRequest], Coroutine[Any, Any, empty_pb2.Empty]
    ]:
        """Return the coroutine function implementing ``CancelOperation``."""
        return self._cancel_operation
558
559
# Names exported by a star-import of this module: only the transport class.
__all__ = (
    "AsyncOperationsRestTransport",
)