1# Copyright 2024 Google LLC All rights reserved. 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Classes for iterating over stream results async for the Google Cloud 
    16Firestore API. 
    17""" 
    18from __future__ import annotations 
    19 
    20from typing import TYPE_CHECKING, Any, AsyncGenerator, Coroutine, Optional, TypeVar 
    21 
    22from google.cloud.firestore_v1.query_profile import ( 
    23    ExplainMetrics, 
    24    QueryExplainError, 
    25) 
    26import google.cloud.firestore_v1.types.query_profile as query_profile_pb 
    27 
    28if TYPE_CHECKING:  # pragma: NO COVER 
    29    from google.cloud.firestore_v1.query_profile import ExplainOptions 
    30 
    31 
    32T = TypeVar("T") 
    33 
    34 
    35class AsyncStreamGenerator(AsyncGenerator[T, Any]): 
    36    """Asynchronous Generator for the streamed results. 
    37 
    38    Args: 
    39        response_generator (AsyncGenerator): 
    40            The inner generator that yields the returned results in the stream. 
    41        explain_options 
    42            (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 
    43            Query profiling options for this stream request. 
    44    """ 
    45 
    46    def __init__( 
    47        self, 
    48        response_generator: AsyncGenerator[T | query_profile_pb.ExplainMetrics, Any], 
    49        explain_options: Optional[ExplainOptions] = None, 
    50    ): 
    51        self._generator = response_generator 
    52        self._explain_options = explain_options 
    53        self._explain_metrics = None 
    54 
    55    def __aiter__(self) -> AsyncGenerator[T, Any]: 
    56        return self 
    57 
    58    async def __anext__(self) -> T: 
    59        try: 
    60            next_value = await self._generator.__anext__() 
    61            if type(next_value) is query_profile_pb.ExplainMetrics: 
    62                self._explain_metrics = ExplainMetrics._from_pb(next_value) 
    63                raise StopAsyncIteration 
    64            else: 
    65                return next_value 
    66        except StopAsyncIteration: 
    67            raise 
    68 
    69    def asend(self, value: Any = None) -> Coroutine[Any, Any, T]: 
    70        return self._generator.asend(value) 
    71 
    72    def athrow(self, *args, **kwargs) -> Coroutine[Any, Any, T]: 
    73        return self._generator.athrow(*args, **kwargs) 
    74 
    75    def aclose(self): 
    76        return self._generator.aclose() 
    77 
    78    @property 
    79    def explain_options(self) -> ExplainOptions | None: 
    80        """Query profiling options for this stream request.""" 
    81        return self._explain_options 
    82 
    83    async def get_explain_metrics(self) -> ExplainMetrics: 
    84        """ 
    85        Get the metrics associated with the query execution. 
    86        Metrics are only available when explain_options is set on the query. If 
    87        ExplainOptions.analyze is False, only plan_summary is available. If it is 
    88        True, execution_stats is also available. 
    89        :rtype: :class:`~google.cloud.firestore_v1.query_profile.ExplainMetrics` 
    90        :returns: The metrics associated with the query execution. 
    91        :raises: :class:`~google.cloud.firestore_v1.query_profile.QueryExplainError` 
    92            if explain_metrics is not available on the query. 
    93        """ 
    94        if self._explain_metrics is not None: 
    95            return self._explain_metrics 
    96        elif self._explain_options is None: 
    97            raise QueryExplainError("explain_options not set on query.") 
    98        elif self._explain_options.analyze is False: 
    99            # We need to run the query to get the explain_metrics. Since no 
    100            # query results are returned, it's ok to discard the returned value. 
    101            try: 
    102                await self.__anext__() 
    103            except StopAsyncIteration: 
    104                pass 
    105 
    106            if self._explain_metrics is None: 
    107                raise QueryExplainError( 
    108                    "Did not receive explain_metrics for this query, despite " 
    109                    "explain_options is set and analyze = False." 
    110                ) 
    111            else: 
    112                return self._explain_metrics 
    113        raise QueryExplainError( 
    114            "explain_metrics not available until query is complete." 
    115        )