1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Futures for long-running operations returned from Google Cloud APIs.
16
17These futures can be used to synchronously wait for the result of a
18long-running operation using :meth:`Operation.result`:
19
20
21.. code-block:: python
22
23 operation = my_api_client.long_running_method()
24 result = operation.result()
25
26Or asynchronously using callbacks and :meth:`Operation.add_done_callback`:
27
28.. code-block:: python
29
30 operation = my_api_client.long_running_method()
31
32 def my_callback(future):
33 result = future.result()
34
35 operation.add_done_callback(my_callback)
36
37"""
38
39import functools
40import threading
41
42from google.api_core import exceptions
43from google.api_core import protobuf_helpers
44from google.api_core.future import polling
45from google.longrunning import operations_pb2
46from google.protobuf import json_format
47from google.rpc import code_pb2
48
49
50class Operation(polling.PollingFuture):
51 """A Future for interacting with a Google API Long-Running Operation.
52
53 Args:
54 operation (google.longrunning.operations_pb2.Operation): The
55 initial operation.
56 refresh (Callable[[], ~.api_core.operation.Operation]): A callable that
57 returns the latest state of the operation.
58 cancel (Callable[[], None]): A callable that tries to cancel
59 the operation.
60 result_type (func:`type`): The protobuf type for the operation's
61 result.
62 metadata_type (func:`type`): The protobuf type for the operation's
63 metadata.
64 polling (google.api_core.retry.Retry): The configuration used for polling.
65 This parameter controls how often :meth:`done` is polled. If the
66 ``timeout`` argument is specified in the :meth:`result` method, it will
67 override the ``polling.timeout`` property.
68 retry (google.api_core.retry.Retry): DEPRECATED: use ``polling`` instead.
69 If specified it will override ``polling`` parameter to maintain
70 backward compatibility.
71 """
72
73 def __init__(
74 self,
75 operation,
76 refresh,
77 cancel,
78 result_type,
79 metadata_type=None,
80 polling=polling.DEFAULT_POLLING,
81 **kwargs
82 ):
83 super(Operation, self).__init__(polling=polling, **kwargs)
84 self._operation = operation
85 self._refresh = refresh
86 self._cancel = cancel
87 self._result_type = result_type
88 self._metadata_type = metadata_type
89 self._completion_lock = threading.Lock()
90 # Invoke this in case the operation came back already complete.
91 self._set_result_from_operation()
92
93 @property
94 def operation(self):
95 """google.longrunning.Operation: The current long-running operation."""
96 return self._operation
97
98 @property
99 def metadata(self):
100 """google.protobuf.Message: the current operation metadata."""
101 if not self._operation.HasField("metadata"):
102 return None
103
104 return protobuf_helpers.from_any_pb(
105 self._metadata_type, self._operation.metadata
106 )
107
108 @classmethod
109 def deserialize(self, payload):
110 """Deserialize a ``google.longrunning.Operation`` protocol buffer.
111
112 Args:
113 payload (bytes): A serialized operation protocol buffer.
114
115 Returns:
116 ~.operations_pb2.Operation: An Operation protobuf object.
117 """
118 return operations_pb2.Operation.FromString(payload)
119
120 def _set_result_from_operation(self):
121 """Set the result or exception from the operation if it is complete."""
122 # This must be done in a lock to prevent the polling thread
123 # and main thread from both executing the completion logic
124 # at the same time.
125 with self._completion_lock:
126 # If the operation isn't complete or if the result has already been
127 # set, do not call set_result/set_exception again.
128 # Note: self._result_set is set to True in set_result and
129 # set_exception, in case those methods are invoked directly.
130 if not self._operation.done or self._result_set:
131 return
132
133 if self._operation.HasField("response"):
134 response = protobuf_helpers.from_any_pb(
135 self._result_type, self._operation.response
136 )
137 self.set_result(response)
138 elif self._operation.HasField("error"):
139 exception = exceptions.from_grpc_status(
140 status_code=self._operation.error.code,
141 message=self._operation.error.message,
142 errors=(self._operation.error,),
143 response=self._operation,
144 )
145 self.set_exception(exception)
146 else:
147 exception = exceptions.GoogleAPICallError(
148 "Unexpected state: Long-running operation had neither "
149 "response nor error set."
150 )
151 self.set_exception(exception)
152
153 def _refresh_and_update(self, retry=None):
154 """Refresh the operation and update the result if needed.
155
156 Args:
157 retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
158 """
159 # If the currently cached operation is done, no need to make another
160 # RPC as it will not change once done.
161 if not self._operation.done:
162 self._operation = self._refresh(retry=retry) if retry else self._refresh()
163 self._set_result_from_operation()
164
165 def done(self, retry=None):
166 """Checks to see if the operation is complete.
167
168 Args:
169 retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
170
171 Returns:
172 bool: True if the operation is complete, False otherwise.
173 """
174 self._refresh_and_update(retry)
175 return self._operation.done
176
177 def cancel(self):
178 """Attempt to cancel the operation.
179
180 Returns:
181 bool: True if the cancel RPC was made, False if the operation is
182 already complete.
183 """
184 if self.done():
185 return False
186
187 self._cancel()
188 return True
189
190 def cancelled(self):
191 """True if the operation was cancelled."""
192 self._refresh_and_update()
193 return (
194 self._operation.HasField("error")
195 and self._operation.error.code == code_pb2.CANCELLED
196 )
197
198
199def _refresh_http(api_request, operation_name, retry=None):
200 """Refresh an operation using a JSON/HTTP client.
201
202 Args:
203 api_request (Callable): A callable used to make an API request. This
204 should generally be
205 :meth:`google.cloud._http.Connection.api_request`.
206 operation_name (str): The name of the operation.
207 retry (google.api_core.retry.Retry): (Optional) retry policy
208
209 Returns:
210 google.longrunning.operations_pb2.Operation: The operation.
211 """
212 path = "operations/{}".format(operation_name)
213
214 if retry is not None:
215 api_request = retry(api_request)
216
217 api_response = api_request(method="GET", path=path)
218 return json_format.ParseDict(api_response, operations_pb2.Operation())
219
220
221def _cancel_http(api_request, operation_name):
222 """Cancel an operation using a JSON/HTTP client.
223
224 Args:
225 api_request (Callable): A callable used to make an API request. This
226 should generally be
227 :meth:`google.cloud._http.Connection.api_request`.
228 operation_name (str): The name of the operation.
229 """
230 path = "operations/{}:cancel".format(operation_name)
231 api_request(method="POST", path=path)
232
233
234def from_http_json(operation, api_request, result_type, **kwargs):
235 """Create an operation future using a HTTP/JSON client.
236
237 This interacts with the long-running operations `service`_ (specific
238 to a given API) via `HTTP/JSON`_.
239
240 .. _HTTP/JSON: https://cloud.google.com/speech/reference/rest/\
241 v1beta1/operations#Operation
242
243 Args:
244 operation (dict): Operation as a dictionary.
245 api_request (Callable): A callable used to make an API request. This
246 should generally be
247 :meth:`google.cloud._http.Connection.api_request`.
248 result_type (:func:`type`): The protobuf result type.
249 kwargs: Keyword args passed into the :class:`Operation` constructor.
250
251 Returns:
252 ~.api_core.operation.Operation: The operation future to track the given
253 operation.
254 """
255 operation_proto = json_format.ParseDict(operation, operations_pb2.Operation())
256 refresh = functools.partial(_refresh_http, api_request, operation_proto.name)
257 cancel = functools.partial(_cancel_http, api_request, operation_proto.name)
258 return Operation(operation_proto, refresh, cancel, result_type, **kwargs)
259
260
261def _refresh_grpc(operations_stub, operation_name, retry=None):
262 """Refresh an operation using a gRPC client.
263
264 Args:
265 operations_stub (google.longrunning.operations_pb2.OperationsStub):
266 The gRPC operations stub.
267 operation_name (str): The name of the operation.
268 retry (google.api_core.retry.Retry): (Optional) retry policy
269
270 Returns:
271 google.longrunning.operations_pb2.Operation: The operation.
272 """
273 request_pb = operations_pb2.GetOperationRequest(name=operation_name)
274
275 rpc = operations_stub.GetOperation
276 if retry is not None:
277 rpc = retry(rpc)
278
279 return rpc(request_pb)
280
281
282def _cancel_grpc(operations_stub, operation_name):
283 """Cancel an operation using a gRPC client.
284
285 Args:
286 operations_stub (google.longrunning.operations_pb2.OperationsStub):
287 The gRPC operations stub.
288 operation_name (str): The name of the operation.
289 """
290 request_pb = operations_pb2.CancelOperationRequest(name=operation_name)
291 operations_stub.CancelOperation(request_pb)
292
293
294def from_grpc(operation, operations_stub, result_type, grpc_metadata=None, **kwargs):
295 """Create an operation future using a gRPC client.
296
297 This interacts with the long-running operations `service`_ (specific
298 to a given API) via gRPC.
299
300 .. _service: https://github.com/googleapis/googleapis/blob/\
301 050400df0fdb16f63b63e9dee53819044bffc857/\
302 google/longrunning/operations.proto#L38
303
304 Args:
305 operation (google.longrunning.operations_pb2.Operation): The operation.
306 operations_stub (google.longrunning.operations_pb2.OperationsStub):
307 The operations stub.
308 result_type (:func:`type`): The protobuf result type.
309 grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass
310 to the rpc.
311 kwargs: Keyword args passed into the :class:`Operation` constructor.
312
313 Returns:
314 ~.api_core.operation.Operation: The operation future to track the given
315 operation.
316 """
317 refresh = functools.partial(
318 _refresh_grpc,
319 operations_stub,
320 operation.name,
321 metadata=grpc_metadata,
322 )
323 cancel = functools.partial(
324 _cancel_grpc,
325 operations_stub,
326 operation.name,
327 metadata=grpc_metadata,
328 )
329 return Operation(operation, refresh, cancel, result_type, **kwargs)
330
331
332def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **kwargs):
333 """Create an operation future from a gapic client.
334
335 This interacts with the long-running operations `service`_ (specific
336 to a given API) via a gapic client.
337
338 .. _service: https://github.com/googleapis/googleapis/blob/\
339 050400df0fdb16f63b63e9dee53819044bffc857/\
340 google/longrunning/operations.proto#L38
341
342 Args:
343 operation (google.longrunning.operations_pb2.Operation): The operation.
344 operations_client (google.api_core.operations_v1.OperationsClient):
345 The operations client.
346 result_type (:func:`type`): The protobuf result type.
347 grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass
348 to the rpc.
349 kwargs: Keyword args passed into the :class:`Operation` constructor.
350
351 Returns:
352 ~.api_core.operation.Operation: The operation future to track the given
353 operation.
354 """
355 refresh = functools.partial(
356 operations_client.get_operation,
357 operation.name,
358 metadata=grpc_metadata,
359 )
360 cancel = functools.partial(
361 operations_client.cancel_operation,
362 operation.name,
363 metadata=grpc_metadata,
364 )
365 return Operation(operation, refresh, cancel, result_type, **kwargs)