1# Copyright 2020 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""AsyncIO iterators for paging through paged API methods. 
    16 
    17These iterators simplify the process of paging through API responses 
    18where the request takes a page token and the response is a list of results with 
    19a token for the next page. See `list pagination`_ in the Google API Style Guide 
    20for more details. 
    21 
    22.. _list pagination: 
    23    https://cloud.google.com/apis/design/design_patterns#list_pagination 
    24 
    25API clients that have methods that follow the list pagination pattern can 
    26return an :class:`.AsyncIterator`: 
    27 
    28    >>> results_iterator = await client.list_resources() 
    29 
You can then walk through the items with ``async for`` and stop early once
you find what you're looking for (possibly resulting in fewer requests)::
    32 
    33    >>> async for resource in results_iterator: 
    34    ...     print(resource.name) 
    35    ...     if not resource.is_valid: 
    36    ...         break 
    37 
    38At any point, you may check the number of items consumed by referencing the 
    39``num_results`` property of the iterator:: 
    40 
    41    >>> async for my_item in results_iterator: 
    42    ...     if results_iterator.num_results >= 10: 
    43    ...         break 
    44 
    45When iterating, not every new item will send a request to the server. 
    46To iterate based on each page of items (where a page corresponds to 
    47a request):: 
    48 
    49    >>> async for page in results_iterator.pages: 
    50    ...     print('=' * 20) 
    ...     print('    Page number: {:d}'.format(results_iterator.page_number))
    ...     print('  Items in page: {:d}'.format(page.num_items))
    ...     print('     First item: {!r}'.format(next(page)))
    ...     print('Items remaining: {:d}'.format(page.remaining))
    ...     print('Next page token: {}'.format(results_iterator.next_page_token))
    56    ==================== 
    57        Page number: 1 
    58      Items in page: 1 
    59         First item: <MyItemClass at 0x7f1d3cccf690> 
    60    Items remaining: 0 
    61    Next page token: eav1OzQB0OM8rLdGXOEsyQWSG 
    62    ==================== 
    63        Page number: 2 
    64      Items in page: 19 
    65         First item: <MyItemClass at 0x7f1d3cccffd0> 
    66    Items remaining: 18 
    67    Next page token: None 
    68""" 
    69 
    70import abc 
    71 
    72from google.api_core.page_iterator import Page 
    73 
    74 
    75def _item_to_value_identity(iterator, item): 
    76    """An item to value transformer that returns the item un-changed.""" 
    77    # pylint: disable=unused-argument 
    78    # We are conforming to the interface defined by Iterator. 
    79    return item 
    80 
    81 
    82class AsyncIterator(abc.ABC): 
    83    """A generic class for iterating through API list responses. 
    84 
    85    Args: 
    86        client(google.cloud.client.Client): The API client. 
    87        item_to_value (Callable[google.api_core.page_iterator_async.AsyncIterator, Any]): 
    88            Callable to convert an item from the type in the raw API response 
    89            into the native object. Will be called with the iterator and a 
    90            single item. 
    91        page_token (str): A token identifying a page in a result set to start 
    92            fetching results from. 
    93        max_results (int): The maximum number of results to fetch. 
    94    """ 
    95 
    96    def __init__( 
    97        self, 
    98        client, 
    99        item_to_value=_item_to_value_identity, 
    100        page_token=None, 
    101        max_results=None, 
    102    ): 
    103        self._started = False 
    104        self.__active_aiterator = None 
    105 
    106        self.client = client 
    107        """Optional[Any]: The client that created this iterator.""" 
    108        self.item_to_value = item_to_value 
    109        """Callable[Iterator, Any]: Callable to convert an item from the type 
    110            in the raw API response into the native object. Will be called with 
    111            the iterator and a 
    112            single item. 
    113        """ 
    114        self.max_results = max_results 
    115        """int: The maximum number of results to fetch.""" 
    116 
    117        # The attributes below will change over the life of the iterator. 
    118        self.page_number = 0 
    119        """int: The current page of results.""" 
    120        self.next_page_token = page_token 
    121        """str: The token for the next page of results. If this is set before 
    122            the iterator starts, it effectively offsets the iterator to a 
    123            specific starting point.""" 
    124        self.num_results = 0 
    125        """int: The total number of results fetched so far.""" 
    126 
    127    @property 
    128    def pages(self): 
    129        """Iterator of pages in the response. 
    130 
    131        returns: 
    132            types.GeneratorType[google.api_core.page_iterator.Page]: A 
    133                generator of page instances. 
    134 
    135        raises: 
    136            ValueError: If the iterator has already been started. 
    137        """ 
    138        if self._started: 
    139            raise ValueError("Iterator has already started", self) 
    140        self._started = True 
    141        return self._page_aiter(increment=True) 
    142 
    143    async def _items_aiter(self): 
    144        """Iterator for each item returned.""" 
    145        async for page in self._page_aiter(increment=False): 
    146            for item in page: 
    147                self.num_results += 1 
    148                yield item 
    149 
    150    def __aiter__(self): 
    151        """Iterator for each item returned. 
    152 
    153        Returns: 
    154            types.GeneratorType[Any]: A generator of items from the API. 
    155 
    156        Raises: 
    157            ValueError: If the iterator has already been started. 
    158        """ 
    159        if self._started: 
    160            raise ValueError("Iterator has already started", self) 
    161        self._started = True 
    162        return self._items_aiter() 
    163 
    164    async def __anext__(self): 
    165        if self.__active_aiterator is None: 
    166            self.__active_aiterator = self.__aiter__() 
    167        return await self.__active_aiterator.__anext__() 
    168 
    169    async def _page_aiter(self, increment): 
    170        """Generator of pages of API responses. 
    171 
    172        Args: 
    173            increment (bool): Flag indicating if the total number of results 
    174                should be incremented on each page. This is useful since a page 
    175                iterator will want to increment by results per page while an 
    176                items iterator will want to increment per item. 
    177 
    178        Yields: 
    179            Page: each page of items from the API. 
    180        """ 
    181        page = await self._next_page() 
    182        while page is not None: 
    183            self.page_number += 1 
    184            if increment: 
    185                self.num_results += page.num_items 
    186            yield page 
    187            page = await self._next_page() 
    188 
    189    @abc.abstractmethod 
    190    async def _next_page(self): 
    191        """Get the next page in the iterator. 
    192 
    193        This does nothing and is intended to be over-ridden by subclasses 
    194        to return the next :class:`Page`. 
    195 
    196        Raises: 
    197            NotImplementedError: Always, this method is abstract. 
    198        """ 
    199        raise NotImplementedError 
    200 
    201 
    202class AsyncGRPCIterator(AsyncIterator): 
    203    """A generic class for iterating through gRPC list responses. 
    204 
    205    .. note:: The class does not take a ``page_token`` argument because it can 
    206        just be specified in the ``request``. 
    207 
    208    Args: 
    209        client (google.cloud.client.Client): The API client. This unused by 
    210            this class, but kept to satisfy the :class:`Iterator` interface. 
    211        method (Callable[protobuf.Message]): A bound gRPC method that should 
    212            take a single message for the request. 
    213        request (protobuf.Message): The request message. 
    214        items_field (str): The field in the response message that has the 
    215            items for the page. 
    216        item_to_value (Callable[GRPCIterator, Any]): Callable to convert an 
    217            item from the type in the JSON response into a native object. Will 
    218            be called with the iterator and a single item. 
    219        request_token_field (str): The field in the request message used to 
    220            specify the page token. 
    221        response_token_field (str): The field in the response message that has 
    222            the token for the next page. 
    223        max_results (int): The maximum number of results to fetch. 
    224 
    225    .. autoattribute:: pages 
    226    """ 
    227 
    228    _DEFAULT_REQUEST_TOKEN_FIELD = "page_token" 
    229    _DEFAULT_RESPONSE_TOKEN_FIELD = "next_page_token" 
    230 
    231    def __init__( 
    232        self, 
    233        client, 
    234        method, 
    235        request, 
    236        items_field, 
    237        item_to_value=_item_to_value_identity, 
    238        request_token_field=_DEFAULT_REQUEST_TOKEN_FIELD, 
    239        response_token_field=_DEFAULT_RESPONSE_TOKEN_FIELD, 
    240        max_results=None, 
    241    ): 
    242        super().__init__(client, item_to_value, max_results=max_results) 
    243        self._method = method 
    244        self._request = request 
    245        self._items_field = items_field 
    246        self._request_token_field = request_token_field 
    247        self._response_token_field = response_token_field 
    248 
    249    async def _next_page(self): 
    250        """Get the next page in the iterator. 
    251 
    252        Returns: 
    253            Page: The next page in the iterator or :data:`None` if 
    254                there are no pages left. 
    255        """ 
    256        if not self._has_next_page(): 
    257            return None 
    258 
    259        if self.next_page_token is not None: 
    260            setattr(self._request, self._request_token_field, self.next_page_token) 
    261 
    262        response = await self._method(self._request) 
    263 
    264        self.next_page_token = getattr(response, self._response_token_field) 
    265        items = getattr(response, self._items_field) 
    266        page = Page(self, items, self.item_to_value, raw_page=response) 
    267 
    268        return page 
    269 
    270    def _has_next_page(self): 
    271        """Determines whether or not there are more pages with results. 
    272 
    273        Returns: 
    274            bool: Whether the iterator has more pages. 
    275        """ 
    276        if self.page_number == 0: 
    277            return True 
    278 
    279        # Note: intentionally a falsy check instead of a None check. The RPC 
    280        # can return an empty string indicating no more pages. 
    281        if self.max_results is not None: 
    282            if self.num_results >= self.max_results: 
    283                return False 
    284 
    285        return True if self.next_page_token else False