1# Copyright 2017, Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Abstract and helper bases for Future implementations.""" 
    16 
    17import abc 
    18import concurrent.futures 
    19 
    20from google.api_core import exceptions 
    21from google.api_core import retry as retries 
    22from google.api_core.future import _helpers 
    23from google.api_core.future import base 
    24 
    25 
    26class _OperationNotComplete(Exception): 
    27    """Private exception used for polling via retry.""" 
    28 
    29    pass 
    30 
    31 
    32# DEPRECATED as it conflates RPC retry and polling concepts into one. 
    33# Use POLLING_PREDICATE instead to configure polling. 
    34RETRY_PREDICATE = retries.if_exception_type( 
    35    _OperationNotComplete, 
    36    exceptions.TooManyRequests, 
    37    exceptions.InternalServerError, 
    38    exceptions.BadGateway, 
    39    exceptions.ServiceUnavailable, 
    40) 
    41 
    42# DEPRECATED: use DEFAULT_POLLING to configure LRO polling logic. Construct 
    43# Retry object using its default values as a baseline for any custom retry logic 
    44# (not to be confused with polling logic). 
    45DEFAULT_RETRY = retries.Retry(predicate=RETRY_PREDICATE) 
    46 
    47# POLLING_PREDICATE is supposed to poll only on _OperationNotComplete. 
    48# Any RPC-specific errors (like ServiceUnavailable) will be handled 
    49# by retry logic (not to be confused with polling logic) which is triggered for 
    50# every polling RPC independently of polling logic but within its context. 
    51POLLING_PREDICATE = retries.if_exception_type( 
    52    _OperationNotComplete, 
    53) 
    54 
    55# Default polling configuration 
    56DEFAULT_POLLING = retries.Retry( 
    57    predicate=POLLING_PREDICATE, 
    58    initial=1.0,  # seconds 
    59    maximum=20.0,  # seconds 
    60    multiplier=1.5, 
    61    timeout=900,  # seconds 
    62) 
    63 
    64 
    65class PollingFuture(base.Future): 
    66    """A Future that needs to poll some service to check its status. 
    67 
    68    The :meth:`done` method should be implemented by subclasses. The polling 
    69    behavior will repeatedly call ``done`` until it returns True. 
    70 
    71    The actual polling logic is encapsulated in :meth:`result` method. See 
    72    documentation for that method for details on how polling works. 
    73 
    74    .. note:: 
    75 
    76        Privacy here is intended to prevent the final class from 
    77        overexposing, not to prevent subclasses from accessing methods. 
    78 
    79    Args: 
    80        polling (google.api_core.retry.Retry): The configuration used for polling. 
    81            This parameter controls how often :meth:`done` is polled. If the 
    82            ``timeout`` argument is specified in :meth:`result` method it will 
    83            override the ``polling.timeout`` property. 
    84        retry (google.api_core.retry.Retry): DEPRECATED use ``polling`` instead. 
    85            If set, it will override ``polling`` parameter for backward 
    86            compatibility. 
    87    """ 
    88 
    89    _DEFAULT_VALUE = object() 
    90 
    91    def __init__(self, polling=DEFAULT_POLLING, **kwargs): 
    92        super(PollingFuture, self).__init__() 
    93        self._polling = kwargs.get("retry", polling) 
    94        self._result = None 
    95        self._exception = None 
    96        self._result_set = False 
    97        """bool: Set to True when the result has been set via set_result or 
    98        set_exception.""" 
    99        self._polling_thread = None 
    100        self._done_callbacks = [] 
    101 
    102    @abc.abstractmethod 
    103    def done(self, retry=None): 
    104        """Checks to see if the operation is complete. 
    105 
    106        Args: 
    107            retry (google.api_core.retry.Retry): (Optional) How to retry the 
    108                polling RPC (to not be confused with polling configuration. See 
    109                the documentation for :meth:`result` for details). 
    110 
    111        Returns: 
    112            bool: True if the operation is complete, False otherwise. 
    113        """ 
    114        # pylint: disable=redundant-returns-doc, missing-raises-doc 
    115        raise NotImplementedError() 
    116 
    117    def _done_or_raise(self, retry=None): 
    118        """Check if the future is done and raise if it's not.""" 
    119        if not self.done(retry=retry): 
    120            raise _OperationNotComplete() 
    121 
    122    def running(self): 
    123        """True if the operation is currently running.""" 
    124        return not self.done() 
    125 
    126    def _blocking_poll(self, timeout=_DEFAULT_VALUE, retry=None, polling=None): 
    127        """Poll and wait for the Future to be resolved.""" 
    128 
    129        if self._result_set: 
    130            return 
    131 
    132        polling = polling or self._polling 
    133        if timeout is not PollingFuture._DEFAULT_VALUE: 
    134            polling = polling.with_timeout(timeout) 
    135 
    136        try: 
    137            polling(self._done_or_raise)(retry=retry) 
    138        except exceptions.RetryError: 
    139            raise concurrent.futures.TimeoutError( 
    140                f"Operation did not complete within the designated timeout of " 
    141                f"{polling.timeout} seconds." 
    142            ) 
    143 
    144    def result(self, timeout=_DEFAULT_VALUE, retry=None, polling=None): 
    145        """Get the result of the operation. 
    146 
    147        This method will poll for operation status periodically, blocking if 
    148        necessary. If you just want to make sure that this method does not block 
    149        for more than X seconds and you do not care about the nitty-gritty of 
    150        how this method operates, just call it with ``result(timeout=X)``. The 
    151        other parameters are for advanced use only. 
    152 
    153        Every call to this method is controlled by the following three 
    154        parameters, each of which has a specific, distinct role, even though all three 
    155        may look very similar: ``timeout``, ``retry`` and ``polling``. In most 
    156        cases users do not need to specify any custom values for any of these 
    157        parameters and may simply rely on default ones instead. 
    158 
    159        If you choose to specify custom parameters, please make sure you've 
    160        read the documentation below carefully. 
    161 
    162        First, please check :class:`google.api_core.retry.Retry` 
    163        class documentation for the proper definition of timeout and deadline 
    164        terms and for the definition the three different types of timeouts. 
    165        This class operates in terms of Retry Timeout and Polling Timeout. It 
    166        does not let customizing RPC timeout and the user is expected to rely on 
    167        default behavior for it. 
    168 
    169        The roles of each argument of this method are as follows: 
    170 
    171        ``timeout`` (int): (Optional) The Polling Timeout as defined in 
    172        :class:`google.api_core.retry.Retry`. If the operation does not complete 
    173        within this timeout an exception will be thrown. This parameter affects 
    174        neither Retry Timeout nor RPC Timeout. 
    175 
    176        ``retry`` (google.api_core.retry.Retry): (Optional) How to retry the 
    177        polling RPC. The ``retry.timeout`` property of this parameter is the 
    178        Retry Timeout as defined in :class:`google.api_core.retry.Retry`. 
    179        This parameter defines ONLY how the polling RPC call is retried 
    180        (i.e. what to do if the RPC we used for polling returned an error). It 
    181        does NOT define how the polling is done (i.e. how frequently and for 
    182        how long to call the polling RPC); use the ``polling`` parameter for that. 
    183        If a polling RPC throws and error and retrying it fails, the whole 
    184        future fails with the corresponding exception. If you want to tune which 
    185        server response error codes are not fatal for operation polling, use this 
    186        parameter to control that (``retry.predicate`` in particular). 
    187 
    188        ``polling`` (google.api_core.retry.Retry): (Optional) How often and 
    189        for how long to call the polling RPC periodically (i.e. what to do if 
    190        a polling rpc returned successfully but its returned result indicates 
    191        that the long running operation is not completed yet, so we need to 
    192        check it again at some point in future). This parameter does NOT define 
    193        how to retry each individual polling RPC in case of an error; use the 
    194        ``retry`` parameter for that. The ``polling.timeout`` of this parameter 
    195        is Polling Timeout as defined in as defined in 
    196        :class:`google.api_core.retry.Retry`. 
    197 
    198        For each of the arguments, there are also default values in place, which 
    199        will be used if a user does not specify their own. The default values 
    200        for the three parameters are not to be confused with the default values 
    201        for the corresponding arguments in this method (those serve as "not set" 
    202        markers for the resolution logic). 
    203 
    204        If ``timeout`` is provided (i.e.``timeout is not _DEFAULT VALUE``; note 
    205        the ``None`` value means "infinite timeout"), it will be used to control 
    206        the actual Polling Timeout. Otherwise, the ``polling.timeout`` value 
    207        will be used instead (see below for how the ``polling`` config itself 
    208        gets resolved). In other words, this parameter  effectively overrides 
    209        the ``polling.timeout`` value if specified. This is so to preserve 
    210        backward compatibility. 
    211 
    212        If ``retry`` is provided (i.e. ``retry is not None``) it will be used to 
    213        control retry behavior for the polling RPC and the ``retry.timeout`` 
    214        will determine the Retry Timeout. If not provided, the 
    215        polling RPC will be called with whichever default retry config was 
    216        specified for the polling RPC at the moment of the construction of the 
    217        polling RPC's client. For example, if the polling RPC is 
    218        ``operations_client.get_operation()``, the ``retry`` parameter will be 
    219        controlling its retry behavior (not polling  behavior) and, if not 
    220        specified, that specific method (``operations_client.get_operation()``) 
    221        will be retried according to the default retry config provided during 
    222        creation of ``operations_client`` client instead. This argument exists 
    223        mainly for backward compatibility; users are very unlikely to ever need 
    224        to set this parameter explicitly. 
    225 
    226        If ``polling`` is provided (i.e. ``polling is not None``), it will be used 
    227        to control the overall polling behavior and ``polling.timeout`` will 
    228        control Polling Timeout unless it is overridden by ``timeout`` parameter 
    229        as described above. If not provided, the``polling`` parameter specified 
    230        during construction of this future (the ``polling`` argument in the 
    231        constructor) will be used instead. Note: since the ``timeout`` argument may 
    232        override ``polling.timeout`` value, this parameter should be viewed as 
    233        coupled with the ``timeout`` parameter as described above. 
    234 
    235        Args: 
    236            timeout (int): (Optional) How long (in seconds) to wait for the 
    237                operation to complete. If None, wait indefinitely. 
    238            retry (google.api_core.retry.Retry): (Optional) How to retry the 
    239                polling RPC. This defines ONLY how the polling RPC call is 
    240                retried (i.e. what to do if the RPC we used for polling returned 
    241                an error). It does  NOT define how the polling is done (i.e. how 
    242                frequently and for how long to call the polling RPC). 
    243            polling (google.api_core.retry.Retry): (Optional) How often and 
    244                for how long to call polling RPC periodically. This parameter 
    245                does NOT define how to retry each individual polling RPC call 
    246                (use the ``retry`` parameter for that). 
    247 
    248        Returns: 
    249            google.protobuf.Message: The Operation's result. 
    250 
    251        Raises: 
    252            google.api_core.GoogleAPICallError: If the operation errors or if 
    253                the timeout is reached before the operation completes. 
    254        """ 
    255 
    256        self._blocking_poll(timeout=timeout, retry=retry, polling=polling) 
    257 
    258        if self._exception is not None: 
    259            # pylint: disable=raising-bad-type 
    260            # Pylint doesn't recognize that this is valid in this case. 
    261            raise self._exception 
    262 
    263        return self._result 
    264 
    265    def exception(self, timeout=_DEFAULT_VALUE): 
    266        """Get the exception from the operation, blocking if necessary. 
    267 
    268        See the documentation for the :meth:`result` method for details on how 
    269        this method operates, as both ``result`` and this method rely on the 
    270        exact same polling logic. The only difference is that this method does 
    271        not accept ``retry`` and ``polling`` arguments but relies on the default ones 
    272        instead. 
    273 
    274        Args: 
    275            timeout (int): How long to wait for the operation to complete. 
    276            If None, wait indefinitely. 
    277 
    278        Returns: 
    279            Optional[google.api_core.GoogleAPICallError]: The operation's 
    280                error. 
    281        """ 
    282        self._blocking_poll(timeout=timeout) 
    283        return self._exception 
    284 
    285    def add_done_callback(self, fn): 
    286        """Add a callback to be executed when the operation is complete. 
    287 
    288        If the operation is not already complete, this will start a helper 
    289        thread to poll for the status of the operation in the background. 
    290 
    291        Args: 
    292            fn (Callable[Future]): The callback to execute when the operation 
    293                is complete. 
    294        """ 
    295        if self._result_set: 
    296            _helpers.safe_invoke_callback(fn, self) 
    297            return 
    298 
    299        self._done_callbacks.append(fn) 
    300 
    301        if self._polling_thread is None: 
    302            # The polling thread will exit on its own as soon as the operation 
    303            # is done. 
    304            self._polling_thread = _helpers.start_daemon_thread( 
    305                target=self._blocking_poll 
    306            ) 
    307 
    308    def _invoke_callbacks(self, *args, **kwargs): 
    309        """Invoke all done callbacks.""" 
    310        for callback in self._done_callbacks: 
    311            _helpers.safe_invoke_callback(callback, *args, **kwargs) 
    312 
    313    def set_result(self, result): 
    314        """Set the Future's result.""" 
    315        self._result = result 
    316        self._result_set = True 
    317        self._invoke_callbacks(self) 
    318 
    319    def set_exception(self, exception): 
    320        """Set the Future's exception.""" 
    321        self._exception = exception 
    322        self._result_set = True 
    323        self._invoke_callbacks(self)