Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/base_aggregation.py: 43%
99 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
1# Copyright 2023 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Classes for representing aggregation queries for the Google Cloud Firestore API.
17A :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created directly from
18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
19a more common way to create an aggregation query than direct usage of the constructor.
20"""
23from __future__ import annotations
25import abc
28from abc import ABC
30from typing import List, Coroutine, Union, Tuple, Generator, Any, AsyncGenerator
32from google.api_core import gapic_v1
33from google.api_core import retry as retries
36from google.cloud.firestore_v1.field_path import FieldPath
37from google.cloud.firestore_v1.types import RunAggregationQueryResponse
38from google.cloud.firestore_v1.types import StructuredAggregationQuery
39from google.cloud.firestore_v1 import _helpers
42class AggregationResult(object):
43 """
44 A class representing result from Aggregation Query
45 :type alias: str
46 :param alias: The alias for the aggregation.
47 :type value: int
48 :param value: The resulting value from the aggregation.
49 :type read_time:
50 :param value: The resulting read_time
51 """
53 def __init__(self, alias: str, value: int, read_time=None):
54 self.alias = alias
55 self.value = value
56 self.read_time = read_time
58 def __repr__(self):
59 return f"<Aggregation alias={self.alias}, value={self.value}, readtime={self.read_time}>"
62class BaseAggregation(ABC):
63 def __init__(self, alias: str | None = None):
64 self.alias = alias
66 @abc.abstractmethod
67 def _to_protobuf(self):
68 """Convert this instance to the protobuf representation"""
71class CountAggregation(BaseAggregation):
72 def __init__(self, alias: str | None = None):
73 super(CountAggregation, self).__init__(alias=alias)
75 def _to_protobuf(self):
76 """Convert this instance to the protobuf representation"""
77 aggregation_pb = StructuredAggregationQuery.Aggregation()
78 aggregation_pb.alias = self.alias
79 aggregation_pb.count = StructuredAggregationQuery.Aggregation.Count()
80 return aggregation_pb
83class SumAggregation(BaseAggregation):
84 def __init__(self, field_ref: str | FieldPath, alias: str | None = None):
85 if isinstance(field_ref, FieldPath):
86 # convert field path to string
87 field_ref = field_ref.to_api_repr()
88 self.field_ref = field_ref
89 super(SumAggregation, self).__init__(alias=alias)
91 def _to_protobuf(self):
92 """Convert this instance to the protobuf representation"""
93 aggregation_pb = StructuredAggregationQuery.Aggregation()
94 aggregation_pb.alias = self.alias
95 aggregation_pb.sum = StructuredAggregationQuery.Aggregation.Sum()
96 aggregation_pb.sum.field.field_path = self.field_ref
97 return aggregation_pb
100class AvgAggregation(BaseAggregation):
101 def __init__(self, field_ref: str | FieldPath, alias: str | None = None):
102 if isinstance(field_ref, FieldPath):
103 # convert field path to string
104 field_ref = field_ref.to_api_repr()
105 self.field_ref = field_ref
106 super(AvgAggregation, self).__init__(alias=alias)
108 def _to_protobuf(self):
109 """Convert this instance to the protobuf representation"""
110 aggregation_pb = StructuredAggregationQuery.Aggregation()
111 aggregation_pb.alias = self.alias
112 aggregation_pb.avg = StructuredAggregationQuery.Aggregation.Avg()
113 aggregation_pb.avg.field.field_path = self.field_ref
114 return aggregation_pb
117def _query_response_to_result(
118 response_pb: RunAggregationQueryResponse,
119) -> List[AggregationResult]:
120 results = [
121 AggregationResult(
122 alias=key,
123 value=response_pb.result.aggregate_fields[key].integer_value
124 or response_pb.result.aggregate_fields[key].double_value,
125 read_time=response_pb.read_time,
126 )
127 for key in response_pb.result.aggregate_fields.pb.keys()
128 ]
130 return results
133class BaseAggregationQuery(ABC):
134 """Represents an aggregation query to the Firestore API."""
136 def __init__(self, nested_query, alias: str | None = None) -> None:
137 self._nested_query = nested_query
138 self._alias = alias
139 self._collection_ref = nested_query._parent
140 self._aggregations: List[BaseAggregation] = []
142 @property
143 def _client(self):
144 return self._collection_ref._client
146 def count(self, alias: str | None = None):
147 """
148 Adds a count over the nested query
149 """
150 count_aggregation = CountAggregation(alias=alias)
151 self._aggregations.append(count_aggregation)
152 return self
154 def sum(self, field_ref: str | FieldPath, alias: str | None = None):
155 """
156 Adds a sum over the nested query
157 """
158 sum_aggregation = SumAggregation(field_ref, alias=alias)
159 self._aggregations.append(sum_aggregation)
160 return self
162 def avg(self, field_ref: str | FieldPath, alias: str | None = None):
163 """
164 Adds an avg over the nested query
165 """
166 avg_aggregation = AvgAggregation(field_ref, alias=alias)
167 self._aggregations.append(avg_aggregation)
168 return self
170 def add_aggregation(self, aggregation: BaseAggregation) -> None:
171 """
172 Adds an aggregation operation to the nested query
174 :type aggregation: :class:`google.cloud.firestore_v1.aggregation.BaseAggregation`
175 :param aggregation: An aggregation operation, e.g. a CountAggregation
176 """
177 self._aggregations.append(aggregation)
179 def add_aggregations(self, aggregations: List[BaseAggregation]) -> None:
180 """
181 Adds a list of aggregations to the nested query
183 :type aggregations: list
184 :param aggregations: a list of aggregation operations
185 """
186 self._aggregations.extend(aggregations)
188 def _to_protobuf(self) -> StructuredAggregationQuery:
189 pb = StructuredAggregationQuery()
190 pb.structured_query = self._nested_query._to_protobuf()
192 for aggregation in self._aggregations:
193 aggregation_pb = aggregation._to_protobuf()
194 pb.aggregations.append(aggregation_pb)
195 return pb
197 def _prep_stream(
198 self,
199 transaction=None,
200 retry: Union[retries.Retry, None, gapic_v1.method._MethodDefault] = None,
201 timeout: float | None = None,
202 ) -> Tuple[dict, dict]:
203 parent_path, expected_prefix = self._collection_ref._parent_info()
204 request = {
205 "parent": parent_path,
206 "structured_aggregation_query": self._to_protobuf(),
207 "transaction": _helpers.get_transaction_id(transaction),
208 }
209 kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
211 return request, kwargs
213 @abc.abstractmethod
214 def get(
215 self,
216 transaction=None,
217 retry: Union[
218 retries.Retry, None, gapic_v1.method._MethodDefault
219 ] = gapic_v1.method.DEFAULT,
220 timeout: float | None = None,
221 ) -> List[AggregationResult] | Coroutine[Any, Any, List[AggregationResult]]:
222 """Runs the aggregation query.
224 This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
226 Args:
227 transaction
228 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
229 An existing transaction that this query will run in.
230 If a ``transaction`` is used and it already has write operations
231 added, this method cannot be used (i.e. read-after-write is not
232 allowed).
233 retry (google.api_core.retry.Retry): Designation of what errors, if any,
234 should be retried. Defaults to a system-specified policy.
235 timeout (float): The timeout for this request. Defaults to a
236 system-specified value.
238 Returns:
239 list: The aggregation query results
241 """
243 @abc.abstractmethod
244 def stream(
245 self,
246 transaction=None,
247 retry: Union[
248 retries.Retry, None, gapic_v1.method._MethodDefault
249 ] = gapic_v1.method.DEFAULT,
250 timeout: float | None = None,
251 ) -> (
252 Generator[List[AggregationResult], Any, None]
253 | AsyncGenerator[List[AggregationResult], None]
254 ):
255 """Runs the aggregation query.
257 This sends a``RunAggregationQuery`` RPC and returns an iterator in the stream of ``RunAggregationQueryResponse`` messages.
259 Args:
260 transaction
261 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
262 An existing transaction that this query will run in.
263 If a ``transaction`` is used and it already has write operations
264 added, this method cannot be used (i.e. read-after-write is not
265 allowed).
266 retry (google.api_core.retry.Retry): Designation of what errors, if any,
267 should be retried. Defaults to a system-specified policy.
268 timeout (float): The timeout for this request. Defaults to a
269 system-specified value.
271 Returns:
272 list: The aggregation query results
274 """