1# Copyright 2023 Google LLC All rights reserved. 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Classes for representing Async aggregation queries for the Google Cloud Firestore API. 
    16 
    17A :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery` can be created directly from 
    18a :class:`~google.cloud.firestore_v1.async_collection.AsyncCollection` and that can be 
    19a more common way to create an aggregation query than direct usage of the constructor. 
    20""" 
    21from __future__ import annotations 
    22 
    23from typing import TYPE_CHECKING, Any, AsyncGenerator, List, Optional, Union 
    24 
    25from google.api_core import gapic_v1 
    26from google.api_core import retry_async as retries 
    27 
    28from google.cloud.firestore_v1 import transaction 
    29from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator 
    30from google.cloud.firestore_v1.base_aggregation import ( 
    31    BaseAggregationQuery, 
    32    _query_response_to_result, 
    33) 
    34from google.cloud.firestore_v1.query_results import QueryResultsList 
    35 
    36if TYPE_CHECKING:  # pragma: NO COVER 
    37    from google.cloud.firestore_v1.base_aggregation import AggregationResult 
    38    from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions 
    39    import google.cloud.firestore_v1.types.query_profile as query_profile_pb 
    40    import datetime 
    41 
    42 
    43class AsyncAggregationQuery(BaseAggregationQuery): 
    44    """Represents an aggregation query to the Firestore API.""" 
    45 
    46    def __init__( 
    47        self, 
    48        nested_query, 
    49    ) -> None: 
    50        super(AsyncAggregationQuery, self).__init__(nested_query) 
    51 
    52    async def get( 
    53        self, 
    54        transaction=None, 
    55        retry: Union[retries.AsyncRetry, None, object] = gapic_v1.method.DEFAULT, 
    56        timeout: float | None = None, 
    57        *, 
    58        explain_options: Optional[ExplainOptions] = None, 
    59        read_time: Optional[datetime.datetime] = None, 
    60    ) -> QueryResultsList[List[AggregationResult]]: 
    61        """Runs the aggregation query. 
    62 
    63        This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages. 
    64 
    65        Args: 
    66            transaction 
    67                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 
    68                An existing transaction that this query will run in. 
    69                If a ``transaction`` is used and it already has write operations 
    70                added, this method cannot be used (i.e. read-after-write is not 
    71                allowed). 
    72            retry (google.api_core.retry.Retry): Designation of what errors, if any, 
    73                should be retried.  Defaults to a system-specified policy. 
    74            timeout (float): The timeout for this request.  Defaults to a 
    75                system-specified value. 
    76            explain_options 
    77                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 
    78                Options to enable query profiling for this query. When set, 
    79                explain_metrics will be available on the returned generator. 
    80            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 
    81                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 
    82                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 
    83                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 
    84 
    85        Returns: 
    86            QueryResultsList[List[AggregationResult]]: The aggregation query results. 
    87 
    88        """ 
    89        explain_metrics: ExplainMetrics | None = None 
    90 
    91        stream_result = self.stream( 
    92            transaction=transaction, 
    93            retry=retry, 
    94            timeout=timeout, 
    95            explain_options=explain_options, 
    96            read_time=read_time, 
    97        ) 
    98        try: 
    99            result = [aggregation async for aggregation in stream_result] 
    100 
    101            if explain_options is None: 
    102                explain_metrics = None 
    103            else: 
    104                explain_metrics = await stream_result.get_explain_metrics() 
    105        finally: 
    106            await stream_result.aclose() 
    107 
    108        return QueryResultsList(result, explain_options, explain_metrics) 
    109 
    110    async def _make_stream( 
    111        self, 
    112        transaction: Optional[transaction.Transaction] = None, 
    113        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT, 
    114        timeout: Optional[float] = None, 
    115        explain_options: Optional[ExplainOptions] = None, 
    116        read_time: Optional[datetime.datetime] = None, 
    117    ) -> AsyncGenerator[List[AggregationResult] | query_profile_pb.ExplainMetrics, Any]: 
    118        """Internal method for stream(). Runs the aggregation query. 
    119 
    120        This sends a ``RunAggregationQuery`` RPC and then returns a generator which 
    121        consumes each document returned in the stream of ``RunAggregationQueryResponse`` 
    122        messages. 
    123 
    124        If a ``transaction`` is used and it already has write operations 
    125        added, this method cannot be used (i.e. read-after-write is not 
    126        allowed). 
    127 
    128        Args: 
    129            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\ 
    130                Transaction`]): 
    131                An existing transaction that the query will run in. 
    132            retry (Optional[google.api_core.retry.Retry]): Designation of what 
    133                errors, if any, should be retried.  Defaults to a 
    134                system-specified policy. 
    135            timeout (Optional[float]): The timeout for this request. Defaults 
    136                to a system-specified value. 
    137            explain_options 
    138                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 
    139                Options to enable query profiling for this query. When set, 
    140                explain_metrics will be available on the returned generator. 
    141            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 
    142                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 
    143                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 
    144                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 
    145 
    146        Yields: 
    147            List[AggregationResult] | query_profile_pb.ExplainMetrics: 
    148            The result of aggregations of this query. Query results will be 
    149            yielded as `List[AggregationResult]`. When the result contains 
    150            returned explain metrics, yield `query_profile_pb.ExplainMetrics` 
    151            individually. 
    152        """ 
    153        request, kwargs = self._prep_stream( 
    154            transaction, 
    155            retry, 
    156            timeout, 
    157            explain_options, 
    158            read_time, 
    159        ) 
    160 
    161        response_iterator = await self._client._firestore_api.run_aggregation_query( 
    162            request=request, 
    163            metadata=self._client._rpc_metadata, 
    164            **kwargs, 
    165        ) 
    166 
    167        async for response in response_iterator: 
    168            result = _query_response_to_result(response) 
    169            if result: 
    170                yield result 
    171 
    172            if response.explain_metrics: 
    173                metrics = response.explain_metrics 
    174                yield metrics 
    175 
    176    def stream( 
    177        self, 
    178        transaction: Optional[transaction.Transaction] = None, 
    179        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT, 
    180        timeout: Optional[float] = None, 
    181        *, 
    182        explain_options: Optional[ExplainOptions] = None, 
    183        read_time: Optional[datetime.datetime] = None, 
    184    ) -> AsyncStreamGenerator[List[AggregationResult]]: 
    185        """Runs the aggregation query. 
    186 
    187        This sends a ``RunAggregationQuery`` RPC and then returns a generator 
    188        which consumes each document returned in the stream of 
    189        ``RunAggregationQueryResponse`` messages. 
    190 
    191        If a ``transaction`` is used and it already has write operations added, 
    192        this method cannot be used (i.e. read-after-write is not allowed). 
    193 
    194        Args: 
    195            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\ 
    196                Transaction`]): 
    197                An existing transaction that the query will run in. 
    198            retry (Optional[google.api_core.retry.Retry]): Designation of what 
    199                errors, if any, should be retried.  Defaults to a 
    200                system-specified policy. 
    201            timeout (Optional[float]): The timeout for this request. Defaults 
    202                to a system-specified value. 
    203            explain_options 
    204                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 
    205                Options to enable query profiling for this query. When set, 
    206                explain_metrics will be available on the returned generator. 
    207            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 
    208                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 
    209                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 
    210                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 
    211 
    212        Returns: 
    213            `AsyncStreamGenerator[List[AggregationResult]]`: 
    214                A generator of the query results. 
    215        """ 
    216 
    217        inner_generator = self._make_stream( 
    218            transaction=transaction, 
    219            retry=retry, 
    220            timeout=timeout, 
    221            explain_options=explain_options, 
    222            read_time=read_time, 
    223        ) 
    224        return AsyncStreamGenerator(inner_generator, explain_options)