1# -------------------------------------------------------------------------- 
    2# 
    3# Copyright (c) Microsoft Corporation. All rights reserved. 
    4# 
    5# The MIT License (MIT) 
    6# 
    7# Permission is hereby granted, free of charge, to any person obtaining a copy 
    8# of this software and associated documentation files (the ""Software""), to 
    9# deal in the Software without restriction, including without limitation the 
    10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 
    11# sell copies of the Software, and to permit persons to whom the Software is 
    12# furnished to do so, subject to the following conditions: 
    13# 
    14# The above copyright notice and this permission notice shall be included in 
    15# all copies or substantial portions of the Software. 
    16# 
    17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
    18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
    19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 
    20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
    21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
    22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 
    23# IN THE SOFTWARE. 
    24# 
    25# -------------------------------------------------------------------------- 
    26import logging 
    27from typing import Callable, Any, Tuple, Generic, TypeVar, Generator, Awaitable 
    28 
    29from ..exceptions import AzureError 
    30from ._poller import _SansIONoPolling 
    31 
    32 
    33PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True) 
    34DeserializationCallbackType = Any 
    35 
    36_LOGGER = logging.getLogger(__name__) 
    37 
    38 
    39class AsyncPollingMethod(Generic[PollingReturnType_co]): 
    40    """ABC class for polling method.""" 
    41 
    42    def initialize( 
    43        self, 
    44        client: Any, 
    45        initial_response: Any, 
    46        deserialization_callback: DeserializationCallbackType, 
    47    ) -> None: 
    48        """Initialize the polling method with the client, initial response, and deserialization callback. 
    49 
    50        :param client: A pipeline service client. 
    51        :type client: ~azure.core.PipelineClient 
    52        :param initial_response: The initial call response. 
    53        :type initial_response: ~azure.core.pipeline.PipelineResponse 
    54        :param deserialization_callback: A callback that takes a Response and returns a deserialized object. 
    55                                         If a subclass of Model is given, this passes "deserialize" as callback. 
    56        :type deserialization_callback: callable or msrest.serialization.Model 
    57        :return: None 
    58        :rtype: None 
    59        """ 
    60        raise NotImplementedError("This method needs to be implemented") 
    61 
    62    async def run(self) -> None: 
    63        """Run the polling method. 
    64        This method should be overridden to implement the polling logic. 
    65 
    66        :return: None 
    67        :rtype: None 
    68        """ 
    69        raise NotImplementedError("This method needs to be implemented") 
    70 
    71    def status(self) -> str: 
    72        """Return the current status of the polling operation. 
    73 
    74        :returns: The current status string. 
    75        :rtype: str 
    76        """ 
    77        raise NotImplementedError("This method needs to be implemented") 
    78 
    79    def finished(self) -> bool: 
    80        """Check if the polling operation is finished. 
    81 
    82        :returns: True if the polling operation is finished, False otherwise. 
    83        :rtype: bool 
    84        """ 
    85        raise NotImplementedError("This method needs to be implemented") 
    86 
    87    def resource(self) -> PollingReturnType_co: 
    88        """Return the resource of the long running operation. 
    89 
    90        :returns: The deserialized resource of the long running operation, if one is available. 
    91        :rtype: any 
    92        """ 
    93        raise NotImplementedError("This method needs to be implemented") 
    94 
    95    def get_continuation_token(self) -> str: 
    96        """Return a continuation token that allows to restart the poller later. 
    97 
    98        :returns: An opaque continuation token 
    99        :rtype: str 
    100        """ 
    101        raise TypeError("Polling method '{}' doesn't support get_continuation_token".format(self.__class__.__name__)) 
    102 
    103    @classmethod 
    104    def from_continuation_token( 
    105        cls, continuation_token: str, **kwargs: Any 
    106    ) -> Tuple[Any, Any, DeserializationCallbackType]: 
    107        """Create a poller from a continuation token. 
    108 
    109        :param continuation_token: An opaque continuation token 
    110        :type continuation_token: str 
    111        :return: A tuple containing the client, initial response, and deserialization callback. 
    112        :rtype: Tuple[Any, Any, DeserializationCallbackType] 
    113        """ 
    114        raise TypeError("Polling method '{}' doesn't support from_continuation_token".format(cls.__name__)) 
    115 
    116 
    117class AsyncNoPolling(_SansIONoPolling[PollingReturnType_co], AsyncPollingMethod[PollingReturnType_co]): 
    118    """An empty async poller that returns the deserialized initial response.""" 
    119 
    120    async def run(self) -> None: 
    121        """Empty run, no polling. 
    122        Just override initial run to add "async" 
    123        """ 
    124 
    125 
    126async def async_poller( 
    127    client: Any, 
    128    initial_response: Any, 
    129    deserialization_callback: Callable[[Any], PollingReturnType_co], 
    130    polling_method: AsyncPollingMethod[PollingReturnType_co], 
    131) -> PollingReturnType_co: 
    132    """Async Poller for long running operations. 
    133 
    134    .. deprecated:: 1.5.0 
    135       Use :class:`AsyncLROPoller` instead. 
    136 
    137    :param client: A pipeline service client. 
    138    :type client: ~azure.core.PipelineClient 
    139    :param initial_response: The initial call response 
    140    :type initial_response: ~azure.core.pipeline.PipelineResponse 
    141    :param deserialization_callback: A callback that takes a Response and return a deserialized object. 
    142                                     If a subclass of Model is given, this passes "deserialize" as callback. 
    143    :type deserialization_callback: callable or msrest.serialization.Model 
    144    :param polling_method: The polling strategy to adopt 
    145    :type polling_method: ~azure.core.polling.PollingMethod 
    146    :return: The final resource at the end of the polling. 
    147    :rtype: any or None 
    148    """ 
    149    poller = AsyncLROPoller(client, initial_response, deserialization_callback, polling_method) 
    150    return await poller 
    151 
    152 
    153class AsyncLROPoller(Generic[PollingReturnType_co], Awaitable[PollingReturnType_co]): 
    154    """Async poller for long running operations. 
    155 
    156    :param client: A pipeline service client 
    157    :type client: ~azure.core.PipelineClient 
    158    :param initial_response: The initial call response 
    159    :type initial_response: ~azure.core.pipeline.PipelineResponse 
    160    :param deserialization_callback: A callback that takes a Response and return a deserialized object. 
    161                                     If a subclass of Model is given, this passes "deserialize" as callback. 
    162    :type deserialization_callback: callable or msrest.serialization.Model 
    163    :param polling_method: The polling strategy to adopt 
    164    :type polling_method: ~azure.core.polling.AsyncPollingMethod 
    165    """ 
    166 
    167    def __init__( 
    168        self, 
    169        client: Any, 
    170        initial_response: Any, 
    171        deserialization_callback: Callable[[Any], PollingReturnType_co], 
    172        polling_method: AsyncPollingMethod[PollingReturnType_co], 
    173    ): 
    174        self._polling_method = polling_method 
    175        self._done = False 
    176 
    177        # This implicit test avoids bringing in an explicit dependency on Model directly 
    178        try: 
    179            deserialization_callback = deserialization_callback.deserialize  # type: ignore 
    180        except AttributeError: 
    181            pass 
    182 
    183        self._polling_method.initialize(client, initial_response, deserialization_callback) 
    184 
    185    def polling_method(self) -> AsyncPollingMethod[PollingReturnType_co]: 
    186        """Return the polling method associated to this poller. 
    187 
    188        :return: The polling method associated to this poller. 
    189        :rtype: ~azure.core.polling.AsyncPollingMethod 
    190        """ 
    191        return self._polling_method 
    192 
    193    def continuation_token(self) -> str: 
    194        """Return a continuation token that allows to restart the poller later. 
    195 
    196        :returns: An opaque continuation token 
    197        :rtype: str 
    198        """ 
    199        return self._polling_method.get_continuation_token() 
    200 
    201    @classmethod 
    202    def from_continuation_token( 
    203        cls, polling_method: AsyncPollingMethod[PollingReturnType_co], continuation_token: str, **kwargs: Any 
    204    ) -> "AsyncLROPoller[PollingReturnType_co]": 
    205        """Create a poller from a continuation token. 
    206 
    207        :param polling_method: The polling strategy to adopt 
    208        :type polling_method: ~azure.core.polling.AsyncPollingMethod 
    209        :param continuation_token: An opaque continuation token 
    210        :type continuation_token: str 
    211        :return: An instance of AsyncLROPoller 
    212        :rtype: ~azure.core.polling.AsyncLROPoller 
    213        :raises ~azure.core.exceptions.HttpResponseError: If the continuation token is invalid. 
    214        """ 
    215        ( 
    216            client, 
    217            initial_response, 
    218            deserialization_callback, 
    219        ) = polling_method.from_continuation_token(continuation_token, **kwargs) 
    220        return cls(client, initial_response, deserialization_callback, polling_method) 
    221 
    222    def status(self) -> str: 
    223        """Returns the current status string. 
    224 
    225        :returns: The current status string 
    226        :rtype: str 
    227        """ 
    228        return self._polling_method.status() 
    229 
    230    async def result(self) -> PollingReturnType_co: 
    231        """Return the result of the long running operation. 
    232 
    233        :returns: The deserialized resource of the long running operation, if one is available. 
    234        :rtype: any or None 
    235        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query. 
    236        """ 
    237        await self.wait() 
    238        return self._polling_method.resource() 
    239 
    240    def __await__(self) -> Generator[Any, None, PollingReturnType_co]: 
    241        return self.result().__await__() 
    242 
    243    async def wait(self) -> None: 
    244        """Wait on the long running operation. 
    245 
    246        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query. 
    247        """ 
    248        try: 
    249            await self._polling_method.run() 
    250        except AzureError as error: 
    251            if not error.continuation_token: 
    252                try: 
    253                    error.continuation_token = self.continuation_token() 
    254                except Exception:  # pylint: disable=broad-except 
    255                    _LOGGER.warning("Unable to retrieve continuation token.") 
    256                    error.continuation_token = None 
    257            raise 
    258        self._done = True 
    259 
    260    def done(self) -> bool: 
    261        """Check status of the long running operation. 
    262 
    263        :returns: 'True' if the process has completed, else 'False'. 
    264        :rtype: bool 
    265        """ 
    266        return self._done