1# Copyright 2024 Google LLC All rights reserved. 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Classes for iterating over stream results for the Google Cloud Firestore API. 
    16""" 
    17from __future__ import annotations 
    18 
    19from typing import TYPE_CHECKING, Any, Generator, Optional, TypeVar 
    20 
    21from google.cloud.firestore_v1.query_profile import ( 
    22    ExplainMetrics, 
    23    QueryExplainError, 
    24) 
    25 
    26if TYPE_CHECKING:  # pragma: NO COVER 
    27    from google.cloud.firestore_v1.query_profile import ExplainOptions 
    28 
    29 
    30T = TypeVar("T") 
    31 
    32 
    33class StreamGenerator(Generator[T, Any, Optional[ExplainMetrics]]): 
    34    """Generator for the streamed results. 
    35 
    36    Args: 
    37        response_generator (Generator[T, Any, Optional[ExplainMetrics]]): 
    38            The inner generator that yields the returned document in the stream. 
    39        explain_options 
    40            (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 
    41            Query profiling options for this stream request. 
    42    """ 
    43 
    44    def __init__( 
    45        self, 
    46        response_generator: Generator[T, Any, Optional[ExplainMetrics]], 
    47        explain_options: Optional[ExplainOptions] = None, 
    48    ): 
    49        self._generator = response_generator 
    50        self._explain_options = explain_options 
    51        self._explain_metrics = None 
    52 
    53    def __iter__(self) -> StreamGenerator: 
    54        return self 
    55 
    56    def __next__(self) -> T: 
    57        try: 
    58            return self._generator.__next__() 
    59        except StopIteration as e: 
    60            # If explain_metrics is available, it would be returned. 
    61            if e.value: 
    62                self._explain_metrics = ExplainMetrics._from_pb(e.value) 
    63            raise 
    64 
    65    def send(self, value: Any = None) -> T: 
    66        return self._generator.send(value) 
    67 
    68    def throw(self, *args, **kwargs) -> T: 
    69        return self._generator.throw(*args, **kwargs) 
    70 
    71    def close(self): 
    72        return self._generator.close() 
    73 
    74    @property 
    75    def explain_options(self) -> ExplainOptions | None: 
    76        """Query profiling options for this stream request.""" 
    77        return self._explain_options 
    78 
    79    def get_explain_metrics(self) -> ExplainMetrics: 
    80        """ 
    81        Get the metrics associated with the query execution. 
    82        Metrics are only available when explain_options is set on the query. If 
    83        ExplainOptions.analyze is False, only plan_summary is available. If it is 
    84        True, execution_stats is also available. 
    85        :rtype: :class:`~google.cloud.firestore_v1.query_profile.ExplainMetrics` 
    86        :returns: The metrics associated with the query execution. 
    87        :raises: :class:`~google.cloud.firestore_v1.query_profile.QueryExplainError` 
    88            if explain_metrics is not available on the query. 
    89        """ 
    90        if self._explain_metrics is not None: 
    91            return self._explain_metrics 
    92        elif self._explain_options is None: 
    93            raise QueryExplainError("explain_options not set on query.") 
    94        elif self._explain_options.analyze is False: 
    95            # We need to run the query to get the explain_metrics. Since no 
    96            # query results are returned, it's ok to discard the returned value. 
    97            try: 
    98                next(self) 
    99            except StopIteration: 
    100                pass 
    101 
    102            if self._explain_metrics is None: 
    103                raise QueryExplainError( 
    104                    "Did not receive explain_metrics for this query, despite " 
    105                    "explain_options is set and analyze = False." 
    106                ) 
    107            else: 
    108                return self._explain_metrics 
    109        raise QueryExplainError( 
    110            "explain_metrics not available until query is complete." 
    111        )