1# Copyright 2023 Google LLC All rights reserved. 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Classes for representing aggregation queries for the Google Cloud Firestore API. 
    16 
    17A :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created directly from 
    18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be 
    19a more common way to create an aggregation query than direct usage of the constructor. 
    20""" 
    21from __future__ import annotations 
    22 
    23from typing import TYPE_CHECKING, Any, Generator, List, Optional, Union 
    24 
    25from google.api_core import exceptions, gapic_v1 
    26from google.api_core import retry as retries 
    27 
    28from google.cloud.firestore_v1.base_aggregation import ( 
    29    AggregationResult, 
    30    BaseAggregationQuery, 
    31    _query_response_to_result, 
    32) 
    33from google.cloud.firestore_v1.query_results import QueryResultsList 
    34from google.cloud.firestore_v1.stream_generator import StreamGenerator 
    35 
    36# Types needed only for Type Hints 
    37if TYPE_CHECKING:  # pragma: NO COVER 
    38    from google.cloud.firestore_v1 import transaction 
    39    from google.cloud.firestore_v1.query_profile import ExplainMetrics 
    40    from google.cloud.firestore_v1.query_profile import ExplainOptions 
    41 
    42    import datetime 
    43 
    44 
    45class AggregationQuery(BaseAggregationQuery): 
    46    """Represents an aggregation query to the Firestore API.""" 
    47 
    48    def __init__( 
    49        self, 
    50        nested_query, 
    51    ) -> None: 
    52        super(AggregationQuery, self).__init__(nested_query) 
    53 
    54    def get( 
    55        self, 
    56        transaction=None, 
    57        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT, 
    58        timeout: float | None = None, 
    59        *, 
    60        explain_options: Optional[ExplainOptions] = None, 
    61        read_time: Optional[datetime.datetime] = None, 
    62    ) -> QueryResultsList[AggregationResult]: 
    63        """Runs the aggregation query. 
    64 
    65        This sends a ``RunAggregationQuery`` RPC and returns a list of 
    66        aggregation results in the stream of ``RunAggregationQueryResponse`` 
    67        messages. 
    68 
    69        Args: 
    70            transaction 
    71                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 
    72                An existing transaction that this query will run in. 
    73                If a ``transaction`` is used and it already has write operations 
    74                added, this method cannot be used (i.e. read-after-write is not 
    75                allowed). 
    76            retry (google.api_core.retry.Retry): Designation of what errors, if any, 
    77                should be retried.  Defaults to a system-specified policy. 
    78            timeout (float): The timeout for this request.  Defaults to a 
    79                system-specified value. 
    80            explain_options 
    81                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 
    82                Options to enable query profiling for this query. When set, 
    83                explain_metrics will be available on the returned generator. 
    84            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 
    85                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 
    86                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 
    87                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 
    88 
    89        Returns: 
    90            QueryResultsList[AggregationResult]: The aggregation query results. 
    91 
    92        """ 
    93        explain_metrics: ExplainMetrics | None = None 
    94 
    95        result = self.stream( 
    96            transaction=transaction, 
    97            retry=retry, 
    98            timeout=timeout, 
    99            explain_options=explain_options, 
    100            read_time=read_time, 
    101        ) 
    102        result_list = list(result) 
    103 
    104        if explain_options is None: 
    105            explain_metrics = None 
    106        else: 
    107            explain_metrics = result.get_explain_metrics() 
    108 
    109        return QueryResultsList(result_list, explain_options, explain_metrics) 
    110 
    111    def _get_stream_iterator( 
    112        self, transaction, retry, timeout, explain_options=None, read_time=None 
    113    ): 
    114        """Helper method for :meth:`stream`.""" 
    115        request, kwargs = self._prep_stream( 
    116            transaction, 
    117            retry, 
    118            timeout, 
    119            explain_options, 
    120            read_time, 
    121        ) 
    122 
    123        return self._client._firestore_api.run_aggregation_query( 
    124            request=request, 
    125            metadata=self._client._rpc_metadata, 
    126            **kwargs, 
    127        ) 
    128 
    129    def _retry_query_after_exception(self, exc, retry, transaction): 
    130        """Helper method for :meth:`stream`.""" 
    131        if transaction is None:  # no snapshot-based retry inside transaction 
    132            if retry is gapic_v1.method.DEFAULT: 
    133                transport = self._client._firestore_api._transport 
    134                gapic_callable = transport.run_aggregation_query 
    135                retry = gapic_callable._retry 
    136            return retry._predicate(exc) 
    137 
    138        return False 
    139 
    140    def _make_stream( 
    141        self, 
    142        transaction: Optional[transaction.Transaction] = None, 
    143        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT, 
    144        timeout: Optional[float] = None, 
    145        explain_options: Optional[ExplainOptions] = None, 
    146        read_time: Optional[datetime.datetime] = None, 
    147    ) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]: 
    148        """Internal method for stream(). Runs the aggregation query. 
    149 
    150        This sends a ``RunAggregationQuery`` RPC and then returns a generator 
    151        which consumes each document returned in the stream of 
    152        ``RunAggregationQueryResponse`` messages. 
    153 
    154        If a ``transaction`` is used and it already has write operations added, 
    155        this method cannot be used (i.e. read-after-write is not allowed). 
    156 
    157        Args: 
    158            transaction 
    159                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 
    160                An existing transaction that this query will run in. 
    161            retry (Optional[google.api_core.retry.Retry]): Designation of what 
    162                errors, if any, should be retried.  Defaults to a 
    163                system-specified policy. 
    164            timeout (Optional[float]): The timeout for this request.  Defaults 
    165                to a system-specified value. 
    166            explain_options 
    167                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 
    168                Options to enable query profiling for this query. When set, 
    169                explain_metrics will be available on the returned generator. 
    170            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 
    171                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 
    172                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 
    173                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 
    174 
    175        Yields: 
    176            List[AggregationResult]: 
    177            The result of aggregations of this query. 
    178 
    179        Returns: 
    180            (Optional[google.cloud.firestore_v1.types.query_profile.ExplainMetrtics]): 
    181            The results of query profiling, if received from the service. 
    182 
    183        """ 
    184        metrics: ExplainMetrics | None = None 
    185 
    186        response_iterator = self._get_stream_iterator( 
    187            transaction, 
    188            retry, 
    189            timeout, 
    190            explain_options, 
    191            read_time, 
    192        ) 
    193        while True: 
    194            try: 
    195                response = next(response_iterator, None) 
    196            except exceptions.GoogleAPICallError as exc: 
    197                if self._retry_query_after_exception(exc, retry, transaction): 
    198                    response_iterator = self._get_stream_iterator( 
    199                        transaction, 
    200                        retry, 
    201                        timeout, 
    202                        explain_options, 
    203                        read_time, 
    204                    ) 
    205                    continue 
    206                else: 
    207                    raise 
    208 
    209            if response is None:  # EOI 
    210                break 
    211 
    212            if metrics is None and response.explain_metrics: 
    213                metrics = response.explain_metrics 
    214 
    215            result = _query_response_to_result(response) 
    216            if result: 
    217                yield result 
    218 
    219        return metrics 
    220 
    221    def stream( 
    222        self, 
    223        transaction: Optional["transaction.Transaction"] = None, 
    224        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT, 
    225        timeout: Optional[float] = None, 
    226        *, 
    227        explain_options: Optional[ExplainOptions] = None, 
    228        read_time: Optional[datetime.datetime] = None, 
    229    ) -> StreamGenerator[List[AggregationResult]]: 
    230        """Runs the aggregation query. 
    231 
    232        This sends a ``RunAggregationQuery`` RPC and then returns a generator 
    233        which consumes each document returned in the stream of 
    234        ``RunAggregationQueryResponse`` messages. 
    235 
    236        If a ``transaction`` is used and it already has write operations added, 
    237        this method cannot be used (i.e. read-after-write is not allowed). 
    238 
    239        Args: 
    240            transaction 
    241                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 
    242                An existing transaction that this query will run in. 
    243            retry (Optional[google.api_core.retry.Retry]): Designation of what 
    244                errors, if any, should be retried.  Defaults to a 
    245                system-specified policy. 
    246            timeout (Optinal[float]): The timeout for this request.  Defaults 
    247            to a system-specified value. 
    248            explain_options 
    249                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 
    250                Options to enable query profiling for this query. When set, 
    251                explain_metrics will be available on the returned generator. 
    252            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 
    253                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 
    254                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 
    255                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 
    256 
    257        Returns: 
    258            `StreamGenerator[List[AggregationResult]]`: 
    259            A generator of the query results. 
    260        """ 
    261        inner_generator = self._make_stream( 
    262            transaction=transaction, 
    263            retry=retry, 
    264            timeout=timeout, 
    265            explain_options=explain_options, 
    266            read_time=read_time, 
    267        ) 
    268        return StreamGenerator(inner_generator, explain_options)