1# -*- coding: utf-8 -*-
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import json
18from typing import Any, Callable, Coroutine, Dict, Optional, Sequence, Tuple
19import warnings
20
21from google.auth import __version__ as auth_version
22
23try:
24 from google.auth.aio.transport.sessions import AsyncAuthorizedSession # type: ignore
25except ImportError as e: # pragma: NO COVER
26 raise ImportError(
27 "The `async_rest` extra of `google-api-core` is required to use long-running operations. Install it by running "
28 "`pip install google-api-core[async_rest]`."
29 ) from e
30
31from google.api_core import exceptions as core_exceptions # type: ignore
32from google.api_core import gapic_v1 # type: ignore
33from google.api_core import general_helpers
34from google.api_core import path_template # type: ignore
35from google.api_core import rest_helpers # type: ignore
36from google.api_core import retry_async as retries_async # type: ignore
37from google.auth.aio import credentials as ga_credentials_async # type: ignore
38from google.longrunning import operations_pb2 # type: ignore
39from google.protobuf import empty_pb2 # type: ignore
40from google.protobuf import json_format # type: ignore
41
42from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport
43
# Client info used by default for this transport: it carries over the GAPIC
# version from the base transport's default client info, sets no gRPC version,
# and reports the imported google-auth version as the REST version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    rest_version="google-auth@{}".format(auth_version),
    grpc_version=None,
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
)
49
50
class AsyncOperationsRestTransport(OperationsTransport):
    """Asynchronous REST backend transport for Operations.

    Manages async long-running operations with an API service.

    When an API method normally takes long time to complete, it can be
    designed to return [Operation][google.api_core.operations_v1.Operation] to the
    client, and the client can use this interface to receive the real
    response asynchronously by polling the operation resource, or pass
    the operation resource to another API (such as Google Cloud Pub/Sub
    API) to receive the response. Any API service that returns
    long-running operations should implement the ``Operations``
    interface so developers can have a consistent client experience.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1.
    """

    def __init__(
        self,
        *,
        host: str = "longrunning.googleapis.com",
        credentials: Optional[ga_credentials_async.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        http_options: Optional[Dict] = None,
        path_prefix: str = "v1",
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to.
            credentials (Optional[google.auth.aio.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): Deprecated and not supported by
                this transport. Any value other than ``None`` emits a
                ``DeprecationWarning`` and then raises
                ``AsyncRestUnsupportedParameterError``. This argument will be
                removed in the next major version of `google-api-core`.

                .. warning::
                    Important: If you accept a credential configuration (credential JSON/File/Stream)
                    from an external source for authentication to Google Cloud Platform, you must
                    validate it before providing it to any Google API or client library. Providing an
                    unvalidated credential configuration to Google APIs or libraries can compromise
                    the security of your systems and data. For more information, refer to
                    `Validate credential configurations from external sources`_.

                .. _Validate credential configurations from external sources:

                https://cloud.google.com/docs/authentication/external/externally-sourced-credentials
            scopes (Optional[Sequence[str]]): Not supported by this transport;
                any value other than ``None`` raises
                ``AsyncRestUnsupportedParameterError``.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Not supported by this transport; any value other than ``None``
                raises ``AsyncRestUnsupportedParameterError``.
            quota_project_id (Optional[str]): Not supported by this transport;
                any value other than ``None`` raises
                ``AsyncRestUnsupportedParameterError``.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials. Currently ignored:
                ``False`` is always passed to the base transport.
            url_scheme: the protocol scheme for the API endpoint.  Normally
                "https", but for testing or local servers,
                "http" can be specified.
            http_options: a dictionary of http_options for transcoding, to override
                the defaults from operations.proto.  Each method has an entry
                with the corresponding http rules as value.
            path_prefix: path prefix (usually represents API version). Set to
                "v1" by default.

        Raises:
            google.api_core.exceptions.AsyncRestUnsupportedParameterError: If
                any of ``credentials_file``, ``scopes``, ``quota_project_id``
                or ``client_cert_source_for_mtls`` is not ``None``.
        """
        if credentials_file is not None:
            warnings.warn(general_helpers._CREDENTIALS_FILE_WARNING, DeprecationWarning)

        # Maps the user-facing option name (used in the error message) to the
        # value received for each parameter this transport cannot honour yet.
        unsupported_params = {
            # TODO(https://github.com/googleapis/python-api-core/issues/715): Add support for `credentials_file` to async REST transport.
            "google.api_core.client_options.ClientOptions.credentials_file": credentials_file,
            # TODO(https://github.com/googleapis/python-api-core/issues/716): Add support for `scopes` to async REST transport.
            "google.api_core.client_options.ClientOptions.scopes": scopes,
            # TODO(https://github.com/googleapis/python-api-core/issues/717): Add support for `quota_project_id` to async REST transport.
            "google.api_core.client_options.ClientOptions.quota_project_id": quota_project_id,
            # TODO(https://github.com/googleapis/python-api-core/issues/718): Add support for `client_cert_source` to async REST transport.
            "google.api_core.client_options.ClientOptions.client_cert_source": client_cert_source_for_mtls,
        }
        provided_unsupported_params = [
            name for name, value in unsupported_params.items() if value is not None
        ]
        if provided_unsupported_params:
            raise core_exceptions.AsyncRestUnsupportedParameterError(
                f"The following provided parameters are not supported for `transport=rest_asyncio`: {', '.join(provided_unsupported_params)}"
            )

        # NOTE(review): `url_scheme` is accepted but is not referenced anywhere
        # in this constructor -- confirm whether it should be forwarded to the
        # base transport.
        super().__init__(
            host=host,
            # TODO(https://github.com/googleapis/python-api-core/issues/709): Remove `type: ignore` when the linked issue is resolved.
            credentials=credentials,  # type: ignore
            client_info=client_info,
            # TODO(https://github.com/googleapis/python-api-core/issues/725): Set always_use_jwt_access token when supported.
            always_use_jwt_access=False,
        )
        # TODO(https://github.com/googleapis/python-api-core/issues/708): add support for
        # `default_host` in AsyncAuthorizedSession for feature parity with the synchronous
        # code.
        # TODO(https://github.com/googleapis/python-api-core/issues/709): Remove `type: ignore` when the linked issue is resolved.
        self._session = AsyncAuthorizedSession(self._credentials)  # type: ignore
        # TODO(https://github.com/googleapis/python-api-core/issues/720): Add wrap logic directly to the property methods for callables.
        self._prep_wrapped_messages(client_info)
        self._http_options = http_options or {}
        self._path_prefix = path_prefix

    def _prep_wrapped_messages(self, client_info):
        """Precompute the retry/timeout-wrapped version of every RPC callable.

        Each of the four operations methods is wrapped with the same defaults:
        an exponential-backoff ``AsyncRetry`` on ``ServiceUnavailable`` and a
        10 second default timeout. The result is stored in
        ``self._wrapped_methods``, keyed by the unwrapped callable.

        Args:
            client_info: Client info forwarded to ``wrap_method``.
        """

        def _wrap(method):
            # A fresh AsyncRetry is built per method so that no retry object
            # is shared between the wrapped callables.
            return gapic_v1.method_async.wrap_method(
                method,
                default_retry=retries_async.AsyncRetry(
                    initial=0.5,
                    maximum=10.0,
                    multiplier=2.0,
                    predicate=retries_async.if_exception_type(
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=10.0,
                ),
                default_timeout=10.0,
                client_info=client_info,
                kind="rest_asyncio",
            )

        self._wrapped_methods = {
            method: _wrap(method)
            for method in (
                self.list_operations,
                self.get_operation,
                self.delete_operation,
                self.cancel_operation,
            )
        }

    def _transcode_request(self, request, rpc_name: str, default_http_options):
        """Transcode ``request`` using the HTTP rules configured for an RPC.

        Args:
            request: The protobuf request message.
            rpc_name (str): Key looked up in ``self._http_options`` to find a
                caller-supplied override of the HTTP rules.
            default_http_options: The HTTP rules used when ``self._http_options``
                has no entry for ``rpc_name``.

        Returns:
            The value returned by ``path_template.transcode``. The methods of
            this class read its "uri", "method", "query_params" and (for
            cancel) "body" entries.
        """
        if rpc_name in self._http_options:
            http_options = self._http_options[rpc_name]
        else:
            http_options = default_http_options

        request_kwargs = self._convert_protobuf_message_to_dict(request)
        return path_template.transcode(http_options, **request_kwargs)

    @staticmethod
    def _to_json_dict(message_type, fields) -> Dict[str, Any]:
        """Round-trip ``fields`` through ``message_type`` into a JSON-style dict.

        The values are parsed into a new ``message_type`` instance and dumped
        back with JSON (non-proto) field names and enum names rather than
        integers.
        """
        message = message_type()
        json_format.ParseDict(fields, message)
        return json_format.MessageToDict(
            message,
            preserving_proto_field_name=False,
            use_integers_for_enums=False,
        )

    async def _send_request(
        self,
        transcoded_request,
        request_type,
        *,
        timeout: Optional[float],
        metadata: Sequence[Tuple[str, str]],
        **session_kwargs,
    ) -> Tuple[Any, str, str]:
        """Send a transcoded request through the authorized session.

        Args:
            transcoded_request: Output of :meth:`_transcode_request`.
            request_type: Protobuf request class used to jsonify the query
                params.
            timeout (Optional[float]): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings sent along with the
                request as headers.
            **session_kwargs: Extra keyword arguments forwarded verbatim to the
                session call (``data=...`` for requests with a body).

        Returns:
            Tuple of ``(response, method, request_url)``.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]

        # Jsonify the query params
        query_params = self._to_json_dict(
            request_type, transcoded_request["query_params"]
        )

        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        # TODO(https://github.com/googleapis/python-api-core/issues/721): Update incorrect use of `uri`` variable name.
        request_url = "{host}{uri}".format(host=self._host, uri=uri)
        # The session exposes one coroutine per HTTP verb; pick it by name.
        response = await getattr(self._session, method)(
            request_url,
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params),
            **session_kwargs,
        )
        return response, method, request_url

    @staticmethod
    def _error_from_response(response, method: str, request_url: str, content: bytes):
        """Build the ``GoogleAPICallError`` subclass matching an error response.

        A body that is not a JSON object (for example an HTML error page
        returned by an intermediary) must not hide the HTTP error behind a
        decoding exception, so it is reported as the error message instead.

        Args:
            response: The HTTP response with a status code >= 400.
            method (str): The HTTP method that was used.
            request_url (str): The URL that was requested.
            content (bytes): The raw response body.

        Returns:
            The exception produced by
            ``core_exceptions.format_http_response_error``.
        """
        text = content.decode("utf-8", errors="replace")
        try:
            payload = json.loads(text)
        except ValueError:
            payload = None
        if not isinstance(payload, dict):
            payload = {"error": {"message": text or "unknown error"}}
        return core_exceptions.format_http_response_error(response, method, request_url, payload)  # type: ignore

    async def _list_operations(
        self,
        request: operations_pb2.ListOperationsRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
        # to allow configuring retryable error codes.
        retry=gapic_v1.method_async.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operations_pb2.ListOperationsResponse:
        r"""Asynchronously call the list operations method over HTTP.

        Args:
            request (~.operations_pb2.ListOperationsRequest):
                The request object. The request message for
                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].
            retry: Currently unused by this method.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.operations_pb2.ListOperationsResponse:
                The response message for
                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the server
                responds with an HTTP status code >= 400.
        """
        transcoded_request = self._transcode_request(
            request,
            "google.longrunning.Operations.ListOperations",
            [
                {
                    "method": "get",
                    "uri": "/{}/{{name=**}}/operations".format(self._path_prefix),
                },
            ],
        )

        # Send the request
        response, method, request_url = await self._send_request(
            transcoded_request,
            operations_pb2.ListOperationsRequest,
            timeout=timeout,
            metadata=metadata,
        )
        content = await response.read()

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise self._error_from_response(response, method, request_url, content)

        # Return the response
        api_response = operations_pb2.ListOperationsResponse()
        json_format.Parse(content, api_response, ignore_unknown_fields=False)
        return api_response

    async def _get_operation(
        self,
        request: operations_pb2.GetOperationRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
        # to allow configuring retryable error codes.
        retry=gapic_v1.method_async.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operations_pb2.Operation:
        r"""Asynchronously call the get operation method over HTTP.

        Args:
            request (~.operations_pb2.GetOperationRequest):
                The request object. The request message for
                [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation].
            retry: Currently unused by this method.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.operations_pb2.Operation:
                This resource represents a long-
                running operation that is the result of a
                network API call.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the server
                responds with an HTTP status code >= 400.
        """
        transcoded_request = self._transcode_request(
            request,
            "google.longrunning.Operations.GetOperation",
            [
                {
                    "method": "get",
                    "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
                },
            ],
        )

        # Send the request
        response, method, request_url = await self._send_request(
            transcoded_request,
            operations_pb2.GetOperationRequest,
            timeout=timeout,
            metadata=metadata,
        )
        content = await response.read()

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise self._error_from_response(response, method, request_url, content)

        # Return the response
        api_response = operations_pb2.Operation()
        json_format.Parse(content, api_response, ignore_unknown_fields=False)
        return api_response

    async def _delete_operation(
        self,
        request: operations_pb2.DeleteOperationRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
        # to allow configuring retryable error codes.
        retry=gapic_v1.method_async.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> empty_pb2.Empty:
        r"""Asynchronously call the delete operation method over HTTP.

        Args:
            request (~.operations_pb2.DeleteOperationRequest):
                The request object. The request message for
                [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation].
            retry: Currently unused by this method.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.empty_pb2.Empty: An empty message; the response body is only
                read when the request fails.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the server
                responds with an HTTP status code >= 400.
        """
        transcoded_request = self._transcode_request(
            request,
            "google.longrunning.Operations.DeleteOperation",
            [
                {
                    "method": "delete",
                    "uri": "/{}/{{name=**/operations/*}}".format(self._path_prefix),
                },
            ],
        )

        # Send the request
        response, method, request_url = await self._send_request(
            transcoded_request,
            operations_pb2.DeleteOperationRequest,
            timeout=timeout,
            metadata=metadata,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            content = await response.read()
            raise self._error_from_response(response, method, request_url, content)

        return empty_pb2.Empty()

    async def _cancel_operation(
        self,
        request: operations_pb2.CancelOperationRequest,
        *,
        # TODO(https://github.com/googleapis/python-api-core/issues/722): Leverage `retry`
        # to allow configuring retryable error codes.
        retry=gapic_v1.method_async.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> empty_pb2.Empty:
        r"""Asynchronously call the cancel operation method over HTTP.

        Args:
            request (~.operations_pb2.CancelOperationRequest):
                The request object. The request message for
                [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation].
            retry: Currently unused by this method.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            ~.empty_pb2.Empty: An empty message; the response body is only
                read when the request fails.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the server
                responds with an HTTP status code >= 400.
        """
        transcoded_request = self._transcode_request(
            request,
            "google.longrunning.Operations.CancelOperation",
            [
                {
                    "method": "post",
                    "uri": "/{}/{{name=**/operations/*}}:cancel".format(self._path_prefix),
                    "body": "*",
                },
            ],
        )

        # Jsonify the request body
        body = self._to_json_dict(
            operations_pb2.CancelOperationRequest, transcoded_request["body"]
        )

        # Send the request
        response, method, request_url = await self._send_request(
            transcoded_request,
            operations_pb2.CancelOperationRequest,
            timeout=timeout,
            metadata=metadata,
            data=body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            content = await response.read()
            raise self._error_from_response(response, method, request_url, content)

        return empty_pb2.Empty()

    @property
    def list_operations(
        self,
    ) -> Callable[
        [operations_pb2.ListOperationsRequest],
        Coroutine[Any, Any, operations_pb2.ListOperationsResponse],
    ]:
        """Return the coroutine function implementing ``ListOperations``."""
        return self._list_operations

    @property
    def get_operation(
        self,
    ) -> Callable[
        [operations_pb2.GetOperationRequest],
        Coroutine[Any, Any, operations_pb2.Operation],
    ]:
        """Return the coroutine function implementing ``GetOperation``."""
        return self._get_operation

    @property
    def delete_operation(
        self,
    ) -> Callable[
        [operations_pb2.DeleteOperationRequest], Coroutine[Any, Any, empty_pb2.Empty]
    ]:
        """Return the coroutine function implementing ``DeleteOperation``."""
        return self._delete_operation

    @property
    def cancel_operation(
        self,
    ) -> Callable[
        [operations_pb2.CancelOperationRequest], Coroutine[Any, Any, empty_pb2.Empty]
    ]:
        """Return the coroutine function implementing ``CancelOperation``."""
        return self._cancel_operation
579
580
# Names exported by `from <this module> import *`.
__all__ = ("AsyncOperationsRestTransport",)