1# Copyright 2023 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for representing Async aggregation queries for the Google Cloud Firestore API.
16
An :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery` can be created directly from
an :class:`~google.cloud.firestore_v1.async_collection.AsyncCollection`, and doing so can be
a more common way to create an aggregation query than using the constructor directly.
20"""
21from __future__ import annotations
22
23from typing import TYPE_CHECKING, Any, AsyncGenerator, List, Optional, Union
24
25from google.api_core import gapic_v1
26from google.api_core import retry_async as retries
27
28from google.cloud.firestore_v1 import transaction
29from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
30from google.cloud.firestore_v1.base_aggregation import (
31 BaseAggregationQuery,
32 _query_response_to_result,
33)
34from google.cloud.firestore_v1.query_results import QueryResultsList
35
36if TYPE_CHECKING: # pragma: NO COVER
37 from google.cloud.firestore_v1.base_aggregation import AggregationResult
38 from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions
39 import google.cloud.firestore_v1.types.query_profile as query_profile_pb
40 import datetime
41
42
class AsyncAggregationQuery(BaseAggregationQuery):
    """Asynchronous aggregation query against the Firestore API.

    Wraps a nested query and exposes coroutine / async-generator based
    entry points (:meth:`get` and :meth:`stream`) for running it.
    """

    def __init__(
        self,
        nested_query,
    ) -> None:
        # All state is owned by the base class; nothing extra is stored here.
        super().__init__(nested_query)

    async def get(
        self,
        transaction=None,
        retry: Union[retries.AsyncRetry, None, object] = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> QueryResultsList[List[AggregationResult]]:
        """Run the aggregation query and collect every result.

        The query is executed through :meth:`stream` (which issues a
        ``RunAggregationQuery`` RPC); all items produced by the stream are
        gathered into a list before returning.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write
                operations added, this method cannot be used (i.e.
                read-after-write is not allowed).
            retry (google.api_core.retry_async.AsyncRetry): Designation of
                what errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                the explain metrics are fetched from the stream and handed
                to the returned ``QueryResultsList``.
            read_time (Optional[datetime.datetime]): If set, reads documents
                as they were at the given time. This must be a timestamp
                within the past one hour, or if Point-in-Time Recovery is
                enabled, can additionally be a whole minute timestamp within
                the past 7 days. If no timezone is specified in the
                :class:`datetime.datetime` object, it is assumed to be UTC.

        Returns:
            QueryResultsList[List[AggregationResult]]: The aggregation query
            results.
        """
        profile: ExplainMetrics | None = None

        results_stream = self.stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        try:
            aggregations = []
            async for item in results_stream:
                aggregations.append(item)

            # Metrics are only requested when profiling was asked for.
            if explain_options is not None:
                profile = await results_stream.get_explain_metrics()
        finally:
            # Always close the stream, even if iteration raised.
            await results_stream.aclose()

        return QueryResultsList(aggregations, explain_options, profile)

    async def _make_stream(
        self,
        transaction: Optional[transaction.Transaction] = None,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> AsyncGenerator[List[AggregationResult] | query_profile_pb.ExplainMetrics, Any]:
        """Internal async generator backing :meth:`stream`.

        Sends a ``RunAggregationQuery`` RPC and walks the resulting stream
        of ``RunAggregationQueryResponse`` messages, yielding what each one
        carries.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
                Transaction`]):
                An existing transaction that the query will run in.
            retry (Optional[google.api_core.retry_async.AsyncRetry]):
                Designation of what errors, if any, should be retried.
                Defaults to a system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query.
            read_time (Optional[datetime.datetime]): If set, reads documents
                as they were at the given time. This must be a timestamp
                within the past one hour, or if Point-in-Time Recovery is
                enabled, can additionally be a whole minute timestamp within
                the past 7 days. If no timezone is specified in the
                :class:`datetime.datetime` object, it is assumed to be UTC.

        Yields:
            List[AggregationResult] | query_profile_pb.ExplainMetrics:
                For each response, the converted aggregation results (when
                non-empty) as ``List[AggregationResult]``, followed by the
                response's ``query_profile_pb.ExplainMetrics`` (when
                present) as a separate item.
        """
        request, kwargs = self._prep_stream(
            transaction, retry, timeout, explain_options, read_time
        )

        firestore_api = self._client._firestore_api
        responses = await firestore_api.run_aggregation_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

        async for response in responses:
            aggregation_results = _query_response_to_result(response)
            if aggregation_results:
                yield aggregation_results

            # Explain metrics travel on the response alongside results and
            # are surfaced as their own item in the stream.
            if response.explain_metrics:
                yield response.explain_metrics

    def stream(
        self,
        transaction: Optional[transaction.Transaction] = None,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> AsyncStreamGenerator[List[AggregationResult]]:
        """Run the aggregation query as a stream.

        Builds the internal generator from :meth:`_make_stream` (which sends
        a ``RunAggregationQuery`` RPC once iterated) and wraps it in an
        :class:`AsyncStreamGenerator` together with ``explain_options``.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
                Transaction`]):
                An existing transaction that the query will run in.
            retry (Optional[google.api_core.retry_async.AsyncRetry]):
                Designation of what errors, if any, should be retried.
                Defaults to a system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents
                as they were at the given time. This must be a timestamp
                within the past one hour, or if Point-in-Time Recovery is
                enabled, can additionally be a whole minute timestamp within
                the past 7 days. If no timezone is specified in the
                :class:`datetime.datetime` object, it is assumed to be UTC.

        Returns:
            `AsyncStreamGenerator[List[AggregationResult]]`:
            A generator of the query results.
        """
        return AsyncStreamGenerator(
            self._make_stream(
                transaction=transaction,
                retry=retry,
                timeout=timeout,
                explain_options=explain_options,
                read_time=read_time,
            ),
            explain_options,
        )