1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""AsyncIO futures for long-running operations returned from Google Cloud APIs.
16
17These futures can be used to await for the result of a long-running operation
18using :meth:`AsyncOperation.result`:
19
20
21.. code-block:: python
22
23 operation = my_api_client.long_running_method()
24 result = await operation.result()
25
26Or asynchronously using callbacks and :meth:`Operation.add_done_callback`:
27
28.. code-block:: python
29
30 operation = my_api_client.long_running_method()
31
32 def my_callback(future):
33 result = await future.result()
34
35 operation.add_done_callback(my_callback)
36
37"""
38
39import functools
40import threading
41
42from google.api_core import exceptions
43from google.api_core import protobuf_helpers
44from google.api_core.future import async_future
45from google.longrunning import operations_pb2
46from google.rpc import code_pb2
47
48
49class AsyncOperation(async_future.AsyncFuture):
50 """A Future for interacting with a Google API Long-Running Operation.
51
52 Args:
53 operation (google.longrunning.operations_pb2.Operation): The
54 initial operation.
55 refresh (Callable[[], ~.api_core.operation.Operation]): A callable that
56 returns the latest state of the operation.
57 cancel (Callable[[], None]): A callable that tries to cancel
58 the operation.
59 result_type (func:`type`): The protobuf type for the operation's
60 result.
61 metadata_type (func:`type`): The protobuf type for the operation's
62 metadata.
63 retry (google.api_core.retry.Retry): The retry configuration used
64 when polling. This can be used to control how often :meth:`done`
65 is polled. Regardless of the retry's ``deadline``, it will be
66 overridden by the ``timeout`` argument to :meth:`result`.
67 """
68
69 def __init__(
70 self,
71 operation,
72 refresh,
73 cancel,
74 result_type,
75 metadata_type=None,
76 retry=async_future.DEFAULT_RETRY,
77 ):
78 super().__init__(retry=retry)
79 self._operation = operation
80 self._refresh = refresh
81 self._cancel = cancel
82 self._result_type = result_type
83 self._metadata_type = metadata_type
84 self._completion_lock = threading.Lock()
85 # Invoke this in case the operation came back already complete.
86 self._set_result_from_operation()
87
88 @property
89 def operation(self):
90 """google.longrunning.Operation: The current long-running operation."""
91 return self._operation
92
93 @property
94 def metadata(self):
95 """google.protobuf.Message: the current operation metadata."""
96 if not self._operation.HasField("metadata"):
97 return None
98
99 return protobuf_helpers.from_any_pb(
100 self._metadata_type, self._operation.metadata
101 )
102
103 @classmethod
104 def deserialize(cls, payload):
105 """Deserialize a ``google.longrunning.Operation`` protocol buffer.
106
107 Args:
108 payload (bytes): A serialized operation protocol buffer.
109
110 Returns:
111 ~.operations_pb2.Operation: An Operation protobuf object.
112 """
113 return operations_pb2.Operation.FromString(payload)
114
115 def _set_result_from_operation(self):
116 """Set the result or exception from the operation if it is complete."""
117 # This must be done in a lock to prevent the async_future thread
118 # and main thread from both executing the completion logic
119 # at the same time.
120 with self._completion_lock:
121 # If the operation isn't complete or if the result has already been
122 # set, do not call set_result/set_exception again.
123 if not self._operation.done or self._future.done():
124 return
125
126 if self._operation.HasField("response"):
127 response = protobuf_helpers.from_any_pb(
128 self._result_type, self._operation.response
129 )
130 self.set_result(response)
131 elif self._operation.HasField("error"):
132 exception = exceptions.GoogleAPICallError(
133 self._operation.error.message,
134 errors=(self._operation.error,),
135 response=self._operation,
136 )
137 self.set_exception(exception)
138 else:
139 exception = exceptions.GoogleAPICallError(
140 "Unexpected state: Long-running operation had neither "
141 "response nor error set."
142 )
143 self.set_exception(exception)
144
145 async def _refresh_and_update(self, retry=async_future.DEFAULT_RETRY):
146 """Refresh the operation and update the result if needed.
147
148 Args:
149 retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
150 """
151 # If the currently cached operation is done, no need to make another
152 # RPC as it will not change once done.
153 if not self._operation.done:
154 self._operation = await self._refresh(retry=retry)
155 self._set_result_from_operation()
156
157 async def done(self, retry=async_future.DEFAULT_RETRY):
158 """Checks to see if the operation is complete.
159
160 Args:
161 retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
162
163 Returns:
164 bool: True if the operation is complete, False otherwise.
165 """
166 await self._refresh_and_update(retry)
167 return self._operation.done
168
169 async def cancel(self):
170 """Attempt to cancel the operation.
171
172 Returns:
173 bool: True if the cancel RPC was made, False if the operation is
174 already complete.
175 """
176 result = await self.done()
177 if result:
178 return False
179 else:
180 await self._cancel()
181 return True
182
183 async def cancelled(self):
184 """True if the operation was cancelled."""
185 await self._refresh_and_update()
186 return (
187 self._operation.HasField("error")
188 and self._operation.error.code == code_pb2.CANCELLED
189 )
190
191
192def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **kwargs):
193 """Create an operation future from a gapic client.
194
195 This interacts with the long-running operations `service`_ (specific
196 to a given API) via a gapic client.
197
198 .. _service: https://github.com/googleapis/googleapis/blob/\
199 050400df0fdb16f63b63e9dee53819044bffc857/\
200 google/longrunning/operations.proto#L38
201
202 Args:
203 operation (google.longrunning.operations_pb2.Operation): The operation.
204 operations_client (google.api_core.operations_v1.OperationsClient):
205 The operations client.
206 result_type (:func:`type`): The protobuf result type.
207 grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass
208 to the rpc.
209 kwargs: Keyword args passed into the :class:`Operation` constructor.
210
211 Returns:
212 ~.api_core.operation.Operation: The operation future to track the given
213 operation.
214 """
215 refresh = functools.partial(
216 operations_client.get_operation,
217 operation.name,
218 metadata=grpc_metadata,
219 )
220 cancel = functools.partial(
221 operations_client.cancel_operation,
222 operation.name,
223 metadata=grpc_metadata,
224 )
225 return AsyncOperation(operation, refresh, cancel, result_type, **kwargs)