1# Copyright 2023 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for representing Async aggregation queries for the Google Cloud Firestore API.
16
17A :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery` can be created directly from
18a :class:`~google.cloud.firestore_v1.async_collection.AsyncCollection` and that can be
19a more common way to create an aggregation query than direct usage of the constructor.
20"""
21from __future__ import annotations
22
23from typing import TYPE_CHECKING, Any, AsyncGenerator, List, Optional, Union
24
25from google.api_core import gapic_v1
26from google.api_core import retry_async as retries
27
28from google.cloud.firestore_v1 import transaction
29from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
30from google.cloud.firestore_v1.base_aggregation import (
31 BaseAggregationQuery,
32 _query_response_to_result,
33)
34from google.cloud.firestore_v1.query_results import QueryResultsList
35
36if TYPE_CHECKING: # pragma: NO COVER
37 from google.cloud.firestore_v1.base_aggregation import AggregationResult
38 from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions
39 import google.cloud.firestore_v1.types.query_profile as query_profile_pb
40 import datetime
41
42
43class AsyncAggregationQuery(BaseAggregationQuery):
44 """Represents an aggregation query to the Firestore API."""
45
46 def __init__(
47 self,
48 nested_query,
49 ) -> None:
50 super(AsyncAggregationQuery, self).__init__(nested_query)
51
52 async def get(
53 self,
54 transaction=None,
55 retry: Union[retries.AsyncRetry, None, object] = gapic_v1.method.DEFAULT,
56 timeout: float | None = None,
57 *,
58 explain_options: Optional[ExplainOptions] = None,
59 read_time: Optional[datetime.datetime] = None,
60 ) -> QueryResultsList[List[AggregationResult]]:
61 """Runs the aggregation query.
62
63 This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
64
65 Args:
66 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
67 An existing transaction that this query will run in.
68 If a ``transaction`` is used and it already has write operations
69 added, this method cannot be used (i.e. read-after-write is not
70 allowed).
71 retry (google.api_core.retry.Retry): Designation of what errors, if any,
72 should be retried. Defaults to a system-specified policy.
73 timeout (float): The timeout for this request. Defaults to a
74 system-specified value.
75 explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
76 Options to enable query profiling for this query. When set,
77 explain_metrics will be available on the returned generator.
78 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
79 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
80 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
81 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.
82
83 Returns:
84 QueryResultsList[List[AggregationResult]]: The aggregation query results.
85
86 """
87 explain_metrics: ExplainMetrics | None = None
88
89 stream_result = self.stream(
90 transaction=transaction,
91 retry=retry,
92 timeout=timeout,
93 explain_options=explain_options,
94 read_time=read_time,
95 )
96 try:
97 result = [aggregation async for aggregation in stream_result]
98
99 if explain_options is None:
100 explain_metrics = None
101 else:
102 explain_metrics = await stream_result.get_explain_metrics()
103 finally:
104 await stream_result.aclose()
105
106 return QueryResultsList(result, explain_options, explain_metrics)
107
108 async def _make_stream(
109 self,
110 transaction: Optional[transaction.Transaction] = None,
111 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
112 timeout: Optional[float] = None,
113 explain_options: Optional[ExplainOptions] = None,
114 read_time: Optional[datetime.datetime] = None,
115 ) -> AsyncGenerator[List[AggregationResult] | query_profile_pb.ExplainMetrics, Any]:
116 """Internal method for stream(). Runs the aggregation query.
117
118 This sends a ``RunAggregationQuery`` RPC and then returns a generator which
119 consumes each document returned in the stream of ``RunAggregationQueryResponse``
120 messages.
121
122 If a ``transaction`` is used and it already has write operations
123 added, this method cannot be used (i.e. read-after-write is not
124 allowed).
125
126 Args:
127 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
128 Transaction`]):
129 An existing transaction that the query will run in.
130 retry (Optional[google.api_core.retry.Retry]): Designation of what
131 errors, if any, should be retried. Defaults to a
132 system-specified policy.
133 timeout (Optional[float]): The timeout for this request. Defaults
134 to a system-specified value.
135 explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
136 Options to enable query profiling for this query. When set,
137 explain_metrics will be available on the returned generator.
138 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
139 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
140 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
141 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.
142
143 Yields:
144 List[AggregationResult] | query_profile_pb.ExplainMetrics:
145 The result of aggregations of this query. Query results will be
146 yielded as `List[AggregationResult]`. When the result contains
147 returned explain metrics, yield `query_profile_pb.ExplainMetrics`
148 individually.
149 """
150 request, kwargs = self._prep_stream(
151 transaction,
152 retry,
153 timeout,
154 explain_options,
155 read_time,
156 )
157
158 response_iterator = await self._client._firestore_api.run_aggregation_query(
159 request=request,
160 metadata=self._client._rpc_metadata,
161 **kwargs,
162 )
163
164 async for response in response_iterator:
165 result = _query_response_to_result(response)
166 if result:
167 yield result
168
169 if response.explain_metrics:
170 metrics = response.explain_metrics
171 yield metrics
172
173 def stream(
174 self,
175 transaction: Optional[transaction.Transaction] = None,
176 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
177 timeout: Optional[float] = None,
178 *,
179 explain_options: Optional[ExplainOptions] = None,
180 read_time: Optional[datetime.datetime] = None,
181 ) -> AsyncStreamGenerator[List[AggregationResult]]:
182 """Runs the aggregation query.
183
184 This sends a ``RunAggregationQuery`` RPC and then returns a generator
185 which consumes each document returned in the stream of
186 ``RunAggregationQueryResponse`` messages.
187
188 If a ``transaction`` is used and it already has write operations added,
189 this method cannot be used (i.e. read-after-write is not allowed).
190
191 Args:
192 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
193 Transaction`]):
194 An existing transaction that the query will run in.
195 retry (Optional[google.api_core.retry.Retry]): Designation of what
196 errors, if any, should be retried. Defaults to a
197 system-specified policy.
198 timeout (Optional[float]): The timeout for this request. Defaults
199 to a system-specified value.
200 explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
201 Options to enable query profiling for this query. When set,
202 explain_metrics will be available on the returned generator.
203 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
204 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
205 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
206 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.
207
208 Returns:
209 `AsyncStreamGenerator[List[AggregationResult]]`:
210 A generator of the query results.
211 """
212
213 inner_generator = self._make_stream(
214 transaction=transaction,
215 retry=retry,
216 timeout=timeout,
217 explain_options=explain_options,
218 read_time=read_time,
219 )
220 return AsyncStreamGenerator(inner_generator, explain_options)