1# Copyright 2023 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for representing aggregation queries for the Google Cloud Firestore API.
16
17A :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created directly from
18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
19a more common way to create an aggregation query than direct usage of the constructor.
20"""
21from __future__ import annotations
22
23from typing import TYPE_CHECKING, Any, Generator, List, Optional, Union
24
25from google.api_core import exceptions, gapic_v1
26from google.api_core import retry as retries
27
28from google.cloud.firestore_v1.base_aggregation import (
29 AggregationResult,
30 BaseAggregationQuery,
31 _query_response_to_result,
32)
33from google.cloud.firestore_v1.query_results import QueryResultsList
34from google.cloud.firestore_v1.stream_generator import StreamGenerator
35
36# Types needed only for Type Hints
37if TYPE_CHECKING: # pragma: NO COVER
38 from google.cloud.firestore_v1 import transaction
39 from google.cloud.firestore_v1.query_profile import ExplainMetrics
40 from google.cloud.firestore_v1.query_profile import ExplainOptions
41
42 import datetime
43
44
45class AggregationQuery(BaseAggregationQuery):
46 """Represents an aggregation query to the Firestore API."""
47
48 def __init__(
49 self,
50 nested_query,
51 ) -> None:
52 super(AggregationQuery, self).__init__(nested_query)
53
54 def get(
55 self,
56 transaction=None,
57 retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
58 timeout: float | None = None,
59 *,
60 explain_options: Optional[ExplainOptions] = None,
61 read_time: Optional[datetime.datetime] = None,
62 ) -> QueryResultsList[AggregationResult]:
63 """Runs the aggregation query.
64
65 This sends a ``RunAggregationQuery`` RPC and returns a list of
66 aggregation results in the stream of ``RunAggregationQueryResponse``
67 messages.
68
69 Args:
70 transaction
71 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
72 An existing transaction that this query will run in.
73 If a ``transaction`` is used and it already has write operations
74 added, this method cannot be used (i.e. read-after-write is not
75 allowed).
76 retry (google.api_core.retry.Retry): Designation of what errors, if any,
77 should be retried. Defaults to a system-specified policy.
78 timeout (float): The timeout for this request. Defaults to a
79 system-specified value.
80 explain_options
81 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
82 Options to enable query profiling for this query. When set,
83 explain_metrics will be available on the returned generator.
84 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
85 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
86 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
87 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.
88
89 Returns:
90 QueryResultsList[AggregationResult]: The aggregation query results.
91
92 """
93 explain_metrics: ExplainMetrics | None = None
94
95 result = self.stream(
96 transaction=transaction,
97 retry=retry,
98 timeout=timeout,
99 explain_options=explain_options,
100 read_time=read_time,
101 )
102 result_list = list(result)
103
104 if explain_options is None:
105 explain_metrics = None
106 else:
107 explain_metrics = result.get_explain_metrics()
108
109 return QueryResultsList(result_list, explain_options, explain_metrics)
110
111 def _get_stream_iterator(
112 self, transaction, retry, timeout, explain_options=None, read_time=None
113 ):
114 """Helper method for :meth:`stream`."""
115 request, kwargs = self._prep_stream(
116 transaction,
117 retry,
118 timeout,
119 explain_options,
120 read_time,
121 )
122
123 return self._client._firestore_api.run_aggregation_query(
124 request=request,
125 metadata=self._client._rpc_metadata,
126 **kwargs,
127 )
128
129 def _retry_query_after_exception(self, exc, retry, transaction):
130 """Helper method for :meth:`stream`."""
131 if transaction is None: # no snapshot-based retry inside transaction
132 if retry is gapic_v1.method.DEFAULT:
133 transport = self._client._firestore_api._transport
134 gapic_callable = transport.run_aggregation_query
135 retry = gapic_callable._retry
136 return retry._predicate(exc)
137
138 return False
139
140 def _make_stream(
141 self,
142 transaction: Optional[transaction.Transaction] = None,
143 retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
144 timeout: Optional[float] = None,
145 explain_options: Optional[ExplainOptions] = None,
146 read_time: Optional[datetime.datetime] = None,
147 ) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]:
148 """Internal method for stream(). Runs the aggregation query.
149
150 This sends a ``RunAggregationQuery`` RPC and then returns a generator
151 which consumes each document returned in the stream of
152 ``RunAggregationQueryResponse`` messages.
153
154 If a ``transaction`` is used and it already has write operations added,
155 this method cannot be used (i.e. read-after-write is not allowed).
156
157 Args:
158 transaction
159 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
160 An existing transaction that this query will run in.
161 retry (Optional[google.api_core.retry.Retry]): Designation of what
162 errors, if any, should be retried. Defaults to a
163 system-specified policy.
164 timeout (Optional[float]): The timeout for this request. Defaults
165 to a system-specified value.
166 explain_options
167 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
168 Options to enable query profiling for this query. When set,
169 explain_metrics will be available on the returned generator.
170 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
171 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
172 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
173 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.
174
175 Yields:
176 List[AggregationResult]:
177 The result of aggregations of this query.
178
179 Returns:
180 (Optional[google.cloud.firestore_v1.types.query_profile.ExplainMetrtics]):
181 The results of query profiling, if received from the service.
182
183 """
184 metrics: ExplainMetrics | None = None
185
186 response_iterator = self._get_stream_iterator(
187 transaction,
188 retry,
189 timeout,
190 explain_options,
191 read_time,
192 )
193 while True:
194 try:
195 response = next(response_iterator, None)
196 except exceptions.GoogleAPICallError as exc:
197 if self._retry_query_after_exception(exc, retry, transaction):
198 response_iterator = self._get_stream_iterator(
199 transaction,
200 retry,
201 timeout,
202 explain_options,
203 read_time,
204 )
205 continue
206 else:
207 raise
208
209 if response is None: # EOI
210 break
211
212 if metrics is None and response.explain_metrics:
213 metrics = response.explain_metrics
214
215 result = _query_response_to_result(response)
216 if result:
217 yield result
218
219 return metrics
220
221 def stream(
222 self,
223 transaction: Optional["transaction.Transaction"] = None,
224 retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
225 timeout: Optional[float] = None,
226 *,
227 explain_options: Optional[ExplainOptions] = None,
228 read_time: Optional[datetime.datetime] = None,
229 ) -> StreamGenerator[List[AggregationResult]]:
230 """Runs the aggregation query.
231
232 This sends a ``RunAggregationQuery`` RPC and then returns a generator
233 which consumes each document returned in the stream of
234 ``RunAggregationQueryResponse`` messages.
235
236 If a ``transaction`` is used and it already has write operations added,
237 this method cannot be used (i.e. read-after-write is not allowed).
238
239 Args:
240 transaction
241 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
242 An existing transaction that this query will run in.
243 retry (Optional[google.api_core.retry.Retry]): Designation of what
244 errors, if any, should be retried. Defaults to a
245 system-specified policy.
246 timeout (Optinal[float]): The timeout for this request. Defaults
247 to a system-specified value.
248 explain_options
249 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
250 Options to enable query profiling for this query. When set,
251 explain_metrics will be available on the returned generator.
252 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
253 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
254 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
255 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.
256
257 Returns:
258 `StreamGenerator[List[AggregationResult]]`:
259 A generator of the query results.
260 """
261 inner_generator = self._make_stream(
262 transaction=transaction,
263 retry=retry,
264 timeout=timeout,
265 explain_options=explain_options,
266 read_time=read_time,
267 )
268 return StreamGenerator(inner_generator, explain_options)