1# Copyright 2023 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for representing aggregation queries for the Google Cloud Firestore API.
16
An :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created directly from
a :class:`~google.cloud.firestore_v1.collection.Collection`, and this can be a more common way
to create an aggregation query than calling the constructor directly.
20"""
21from __future__ import annotations
22
23from typing import TYPE_CHECKING, Any, Generator, List, Optional, Union
24
25from google.api_core import exceptions, gapic_v1
26from google.api_core import retry as retries
27
28from google.cloud.firestore_v1.base_aggregation import (
29 AggregationResult,
30 BaseAggregationQuery,
31 _query_response_to_result,
32)
33from google.cloud.firestore_v1.query_results import QueryResultsList
34from google.cloud.firestore_v1.stream_generator import StreamGenerator
35
36# Types needed only for Type Hints
37if TYPE_CHECKING: # pragma: NO COVER
38 from google.cloud.firestore_v1 import transaction
39 from google.cloud.firestore_v1.query_profile import ExplainMetrics
40 from google.cloud.firestore_v1.query_profile import ExplainOptions
41
42 import datetime
43
44
class AggregationQuery(BaseAggregationQuery):
    """Represents an aggregation query to the Firestore API.

    Args:
        nested_query: The underlying query object; it is passed unchanged to
            the :class:`BaseAggregationQuery` constructor.
    """

    def __init__(
        self,
        nested_query,
    ) -> None:
        super().__init__(nested_query)

    def get(
        self,
        transaction=None,
        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> QueryResultsList[AggregationResult]:
        """Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and returns a list of
        aggregation results in the stream of ``RunAggregationQueryResponse``
        messages.

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                the explain metrics obtained from the stream are passed to the
                returned ``QueryResultsList``.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Returns:
            QueryResultsList[AggregationResult]: The aggregation query results.

        """
        result_stream = self.stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        # Drain the stream before asking for metrics: ``_make_stream`` only
        # returns them once its generator has been exhausted.
        result_list = list(result_stream)

        # Metrics are only requested when profiling was asked for.
        explain_metrics: ExplainMetrics | None = (
            None if explain_options is None else result_stream.get_explain_metrics()
        )

        return QueryResultsList(result_list, explain_options, explain_metrics)

    def _get_stream_iterator(
        self, transaction, retry, timeout, explain_options=None, read_time=None
    ):
        """Helper method for :meth:`stream`.

        Builds the request and call keyword arguments with ``_prep_stream``
        and issues the ``RunAggregationQuery`` RPC.

        Returns:
            The response iterator returned by the Firestore API client's
            ``run_aggregation_query`` call.
        """
        request, kwargs = self._prep_stream(
            transaction,
            retry,
            timeout,
            explain_options,
            read_time,
        )

        return self._client._firestore_api.run_aggregation_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

    def _retry_query_after_exception(self, exc, retry, transaction):
        """Helper method for :meth:`stream`.

        Decides whether the query should be re-issued after ``exc`` was raised
        while reading the response stream.

        Args:
            exc: The exception raised while iterating over the responses.
            retry: The retry designation passed to :meth:`stream`. May be
                ``gapic_v1.method.DEFAULT``, a retry object, or ``None``.
            transaction: The transaction the query runs in, if any.

        Returns:
            bool: ``True`` if the query should be retried, ``False`` otherwise.
        """
        # No snapshot-based retry inside a transaction.
        if transaction is not None:
            return False

        if retry is gapic_v1.method.DEFAULT:
            # Resolve the sentinel to the retry configured on the transport's
            # wrapped ``run_aggregation_query`` method.
            transport = self._client._firestore_api._transport
            gapic_callable = transport.run_aggregation_query
            retry = gapic_callable._retry

        # ``None`` means retries are disabled (either explicitly by the caller
        # or because no default is configured). Returning False lets the
        # original error propagate instead of failing on ``None._predicate``.
        if retry is None:
            return False

        return retry._predicate(exc)

    def _make_stream(
        self,
        transaction: Optional[transaction.Transaction] = None,
        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]:
        """Internal method for stream(). Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and then returns a generator
        which consumes each document returned in the stream of
        ``RunAggregationQueryResponse`` messages.

        If a ``transaction`` is used and it already has write operations added,
        this method cannot be used (i.e. read-after-write is not allowed).

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Yields:
            List[AggregationResult]:
                The result of aggregations of this query.

        Returns:
            (Optional[google.cloud.firestore_v1.types.query_profile.ExplainMetrics]):
                The results of query profiling, if received from the service.

        """

        def _open_stream():
            # Issues the RPC with the arguments given to this call; used both
            # for the first attempt and for each retry.
            return self._get_stream_iterator(
                transaction,
                retry,
                timeout,
                explain_options,
                read_time,
            )

        metrics: ExplainMetrics | None = None
        response_iterator = _open_stream()

        while True:
            try:
                response = next(response_iterator, None)
            except exceptions.GoogleAPICallError as exc:
                if not self._retry_query_after_exception(exc, retry, transaction):
                    raise
                # Retryable error: re-issue the query from scratch and resume
                # reading from the new response iterator.
                response_iterator = _open_stream()
                continue

            if response is None:  # EOI
                break

            # Keep only the first non-empty explain metrics seen in the stream.
            if metrics is None and response.explain_metrics:
                metrics = response.explain_metrics

            result = _query_response_to_result(response)
            if result:
                yield result

        return metrics

    def stream(
        self,
        transaction: Optional[transaction.Transaction] = None,
        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> StreamGenerator[List[AggregationResult]]:
        """Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and then returns a generator
        which consumes each document returned in the stream of
        ``RunAggregationQueryResponse`` messages.

        If a ``transaction`` is used and it already has write operations added,
        this method cannot be used (i.e. read-after-write is not allowed).

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Returns:
            `StreamGenerator[List[AggregationResult]]`:
                A generator of the query results.
        """
        inner_generator = self._make_stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        return StreamGenerator(inner_generator, explain_options)