1# Copyright 2016 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Futures for long-running operations returned from Google Cloud APIs. 
    16 
    17These futures can be used to synchronously wait for the result of a 
    18long-running operation using :meth:`Operation.result`: 
    19 
    20 
    21.. code-block:: python 
    22 
    23    operation = my_api_client.long_running_method() 
    24    result = operation.result() 
    25 
    26Or asynchronously using callbacks and :meth:`Operation.add_done_callback`: 
    27 
    28.. code-block:: python 
    29 
    30    operation = my_api_client.long_running_method() 
    31 
    32    def my_callback(future): 
    33        result = future.result() 
    34 
    35    operation.add_done_callback(my_callback) 
    36 
    37""" 
    38 
    39import functools 
    40import threading 
    41 
    42from google.api_core import exceptions 
    43from google.api_core import protobuf_helpers 
    44from google.api_core.future import polling 
    45from google.longrunning import operations_pb2 
    46from google.protobuf import json_format 
    47from google.rpc import code_pb2 
    48 
    49 
    50class Operation(polling.PollingFuture): 
    51    """A Future for interacting with a Google API Long-Running Operation. 
    52 
    53    Args: 
    54        operation (google.longrunning.operations_pb2.Operation): The 
    55            initial operation. 
    56        refresh (Callable[[], ~.api_core.operation.Operation]): A callable that 
    57            returns the latest state of the operation. 
    58        cancel (Callable[[], None]): A callable that tries to cancel 
    59            the operation. 
    60        result_type (func:`type`): The protobuf type for the operation's 
    61            result. 
    62        metadata_type (func:`type`): The protobuf type for the operation's 
    63            metadata. 
    64        polling (google.api_core.retry.Retry): The configuration used for polling. 
    65            This parameter controls how often :meth:`done` is polled. If the 
    66            ``timeout`` argument is specified in the :meth:`result` method, it will 
    67            override the ``polling.timeout`` property. 
    68        retry (google.api_core.retry.Retry): DEPRECATED: use ``polling`` instead. 
    69            If specified it will override ``polling`` parameter to maintain 
    70            backward compatibility. 
    71    """ 
    72 
    73    def __init__( 
    74        self, 
    75        operation, 
    76        refresh, 
    77        cancel, 
    78        result_type, 
    79        metadata_type=None, 
    80        polling=polling.DEFAULT_POLLING, 
    81        **kwargs 
    82    ): 
    83        super(Operation, self).__init__(polling=polling, **kwargs) 
    84        self._operation = operation 
    85        self._refresh = refresh 
    86        self._cancel = cancel 
    87        self._result_type = result_type 
    88        self._metadata_type = metadata_type 
    89        self._completion_lock = threading.Lock() 
    90        # Invoke this in case the operation came back already complete. 
    91        self._set_result_from_operation() 
    92 
    93    @property 
    94    def operation(self): 
    95        """google.longrunning.Operation: The current long-running operation.""" 
    96        return self._operation 
    97 
    98    @property 
    99    def metadata(self): 
    100        """google.protobuf.Message: the current operation metadata.""" 
    101        if not self._operation.HasField("metadata"): 
    102            return None 
    103 
    104        return protobuf_helpers.from_any_pb( 
    105            self._metadata_type, self._operation.metadata 
    106        ) 
    107 
    108    @classmethod 
    109    def deserialize(self, payload): 
    110        """Deserialize a ``google.longrunning.Operation`` protocol buffer. 
    111 
    112        Args: 
    113            payload (bytes): A serialized operation protocol buffer. 
    114 
    115        Returns: 
    116            ~.operations_pb2.Operation: An Operation protobuf object. 
    117        """ 
    118        return operations_pb2.Operation.FromString(payload) 
    119 
    120    def _set_result_from_operation(self): 
    121        """Set the result or exception from the operation if it is complete.""" 
    122        # This must be done in a lock to prevent the polling thread 
    123        # and main thread from both executing the completion logic 
    124        # at the same time. 
    125        with self._completion_lock: 
    126            # If the operation isn't complete or if the result has already been 
    127            # set, do not call set_result/set_exception again. 
    128            # Note: self._result_set is set to True in set_result and 
    129            # set_exception, in case those methods are invoked directly. 
    130            if not self._operation.done or self._result_set: 
    131                return 
    132 
    133            if self._operation.HasField("response"): 
    134                response = protobuf_helpers.from_any_pb( 
    135                    self._result_type, self._operation.response 
    136                ) 
    137                self.set_result(response) 
    138            elif self._operation.HasField("error"): 
    139                exception = exceptions.from_grpc_status( 
    140                    status_code=self._operation.error.code, 
    141                    message=self._operation.error.message, 
    142                    errors=(self._operation.error,), 
    143                    response=self._operation, 
    144                ) 
    145                self.set_exception(exception) 
    146            else: 
    147                exception = exceptions.GoogleAPICallError( 
    148                    "Unexpected state: Long-running operation had neither " 
    149                    "response nor error set." 
    150                ) 
    151                self.set_exception(exception) 
    152 
    153    def _refresh_and_update(self, retry=None): 
    154        """Refresh the operation and update the result if needed. 
    155 
    156        Args: 
    157            retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. 
    158        """ 
    159        # If the currently cached operation is done, no need to make another 
    160        # RPC as it will not change once done. 
    161        if not self._operation.done: 
    162            self._operation = self._refresh(retry=retry) if retry else self._refresh() 
    163            self._set_result_from_operation() 
    164 
    165    def done(self, retry=None): 
    166        """Checks to see if the operation is complete. 
    167 
    168        Args: 
    169            retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. 
    170 
    171        Returns: 
    172            bool: True if the operation is complete, False otherwise. 
    173        """ 
    174        self._refresh_and_update(retry) 
    175        return self._operation.done 
    176 
    177    def cancel(self): 
    178        """Attempt to cancel the operation. 
    179 
    180        Returns: 
    181            bool: True if the cancel RPC was made, False if the operation is 
    182                already complete. 
    183        """ 
    184        if self.done(): 
    185            return False 
    186 
    187        self._cancel() 
    188        return True 
    189 
    190    def cancelled(self): 
    191        """True if the operation was cancelled.""" 
    192        self._refresh_and_update() 
    193        return ( 
    194            self._operation.HasField("error") 
    195            and self._operation.error.code == code_pb2.CANCELLED 
    196        ) 
    197 
    198 
    199def _refresh_http(api_request, operation_name, retry=None): 
    200    """Refresh an operation using a JSON/HTTP client. 
    201 
    202    Args: 
    203        api_request (Callable): A callable used to make an API request. This 
    204            should generally be 
    205            :meth:`google.cloud._http.Connection.api_request`. 
    206        operation_name (str): The name of the operation. 
    207        retry (google.api_core.retry.Retry): (Optional) retry policy 
    208 
    209    Returns: 
    210        google.longrunning.operations_pb2.Operation: The operation. 
    211    """ 
    212    path = "operations/{}".format(operation_name) 
    213 
    214    if retry is not None: 
    215        api_request = retry(api_request) 
    216 
    217    api_response = api_request(method="GET", path=path) 
    218    return json_format.ParseDict(api_response, operations_pb2.Operation()) 
    219 
    220 
    221def _cancel_http(api_request, operation_name): 
    222    """Cancel an operation using a JSON/HTTP client. 
    223 
    224    Args: 
    225        api_request (Callable): A callable used to make an API request. This 
    226            should generally be 
    227            :meth:`google.cloud._http.Connection.api_request`. 
    228        operation_name (str): The name of the operation. 
    229    """ 
    230    path = "operations/{}:cancel".format(operation_name) 
    231    api_request(method="POST", path=path) 
    232 
    233 
    234def from_http_json(operation, api_request, result_type, **kwargs): 
    235    """Create an operation future using a HTTP/JSON client. 
    236 
    237    This interacts with the long-running operations `service`_ (specific 
    238    to a given API) via `HTTP/JSON`_. 
    239 
    240    .. _HTTP/JSON: https://cloud.google.com/speech/reference/rest/\ 
    241            v1beta1/operations#Operation 
    242 
    243    Args: 
    244        operation (dict): Operation as a dictionary. 
    245        api_request (Callable): A callable used to make an API request. This 
    246            should generally be 
    247            :meth:`google.cloud._http.Connection.api_request`. 
    248        result_type (:func:`type`): The protobuf result type. 
    249        kwargs: Keyword args passed into the :class:`Operation` constructor. 
    250 
    251    Returns: 
    252        ~.api_core.operation.Operation: The operation future to track the given 
    253            operation. 
    254    """ 
    255    operation_proto = json_format.ParseDict(operation, operations_pb2.Operation()) 
    256    refresh = functools.partial(_refresh_http, api_request, operation_proto.name) 
    257    cancel = functools.partial(_cancel_http, api_request, operation_proto.name) 
    258    return Operation(operation_proto, refresh, cancel, result_type, **kwargs) 
    259 
    260 
    261def _refresh_grpc(operations_stub, operation_name, retry=None): 
    262    """Refresh an operation using a gRPC client. 
    263 
    264    Args: 
    265        operations_stub (google.longrunning.operations_pb2.OperationsStub): 
    266            The gRPC operations stub. 
    267        operation_name (str): The name of the operation. 
    268        retry (google.api_core.retry.Retry): (Optional) retry policy 
    269 
    270    Returns: 
    271        google.longrunning.operations_pb2.Operation: The operation. 
    272    """ 
    273    request_pb = operations_pb2.GetOperationRequest(name=operation_name) 
    274 
    275    rpc = operations_stub.GetOperation 
    276    if retry is not None: 
    277        rpc = retry(rpc) 
    278 
    279    return rpc(request_pb) 
    280 
    281 
    282def _cancel_grpc(operations_stub, operation_name): 
    283    """Cancel an operation using a gRPC client. 
    284 
    285    Args: 
    286        operations_stub (google.longrunning.operations_pb2.OperationsStub): 
    287            The gRPC operations stub. 
    288        operation_name (str): The name of the operation. 
    289    """ 
    290    request_pb = operations_pb2.CancelOperationRequest(name=operation_name) 
    291    operations_stub.CancelOperation(request_pb) 
    292 
    293 
    294def from_grpc(operation, operations_stub, result_type, grpc_metadata=None, **kwargs): 
    295    """Create an operation future using a gRPC client. 
    296 
    297    This interacts with the long-running operations `service`_ (specific 
    298    to a given API) via gRPC. 
    299 
    300    .. _service: https://github.com/googleapis/googleapis/blob/\ 
    301                 050400df0fdb16f63b63e9dee53819044bffc857/\ 
    302                 google/longrunning/operations.proto#L38 
    303 
    304    Args: 
    305        operation (google.longrunning.operations_pb2.Operation): The operation. 
    306        operations_stub (google.longrunning.operations_pb2.OperationsStub): 
    307            The operations stub. 
    308        result_type (:func:`type`): The protobuf result type. 
    309        grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass 
    310            to the rpc. 
    311        kwargs: Keyword args passed into the :class:`Operation` constructor. 
    312 
    313    Returns: 
    314        ~.api_core.operation.Operation: The operation future to track the given 
    315            operation. 
    316    """ 
    317    refresh = functools.partial( 
    318        _refresh_grpc, 
    319        operations_stub, 
    320        operation.name, 
    321        metadata=grpc_metadata, 
    322    ) 
    323    cancel = functools.partial( 
    324        _cancel_grpc, 
    325        operations_stub, 
    326        operation.name, 
    327        metadata=grpc_metadata, 
    328    ) 
    329    return Operation(operation, refresh, cancel, result_type, **kwargs) 
    330 
    331 
    332def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **kwargs): 
    333    """Create an operation future from a gapic client. 
    334 
    335    This interacts with the long-running operations `service`_ (specific 
    336    to a given API) via a gapic client. 
    337 
    338    .. _service: https://github.com/googleapis/googleapis/blob/\ 
    339                 050400df0fdb16f63b63e9dee53819044bffc857/\ 
    340                 google/longrunning/operations.proto#L38 
    341 
    342    Args: 
    343        operation (google.longrunning.operations_pb2.Operation): The operation. 
    344        operations_client (google.api_core.operations_v1.OperationsClient): 
    345            The operations client. 
    346        result_type (:func:`type`): The protobuf result type. 
    347        grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass 
    348            to the rpc. 
    349        kwargs: Keyword args passed into the :class:`Operation` constructor. 
    350 
    351    Returns: 
    352        ~.api_core.operation.Operation: The operation future to track the given 
    353            operation. 
    354    """ 
    355    refresh = functools.partial( 
    356        operations_client.get_operation, 
    357        operation.name, 
    358        metadata=grpc_metadata, 
    359    ) 
    360    cancel = functools.partial( 
    361        operations_client.cancel_operation, 
    362        operation.name, 
    363        metadata=grpc_metadata, 
    364    ) 
    365    return Operation(operation, refresh, cancel, result_type, **kwargs)