Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/base_aggregation.py: 43%

99 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 06:27 +0000

1# Copyright 2023 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Classes for representing aggregation queries for the Google Cloud Firestore API. 

16 

17A :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created directly from

18a :class:`~google.cloud.firestore_v1.collection.Collection`, which is typically

19a more common way to create an aggregation query than calling the constructor directly.

20""" 

21 

22 

23from __future__ import annotations 

24 

25import abc 

26 

27 

28from abc import ABC 

29 

30from typing import List, Coroutine, Union, Tuple, Generator, Any, AsyncGenerator 

31 

32from google.api_core import gapic_v1 

33from google.api_core import retry as retries 

34 

35 

36from google.cloud.firestore_v1.field_path import FieldPath 

37from google.cloud.firestore_v1.types import RunAggregationQueryResponse 

38from google.cloud.firestore_v1.types import StructuredAggregationQuery 

39from google.cloud.firestore_v1 import _helpers 

40 

41 

class AggregationResult(object):
    """
    A class representing result from Aggregation Query

    :type alias: str
    :param alias: The alias for the aggregation.
    :type value: int or float
    :param value: The resulting value from the aggregation.
    :type read_time:
    :param read_time: The resulting read_time
    """

    def __init__(self, alias: str, value: int | float, read_time=None):
        self.alias = alias
        # May be an integer or a double, depending on which value the
        # response carried for this alias.
        self.value = value
        self.read_time = read_time

    def __repr__(self):
        return f"<Aggregation alias={self.alias}, value={self.value}, readtime={self.read_time}>"

60 

61 

class BaseAggregation(ABC):
    """Abstract base class for one aggregation of an aggregation query.

    :type alias: str or None
    :param alias: Optional alias, stored on the instance as ``alias``.
    """

    def __init__(self, alias: str | None = None) -> None:
        self.alias = alias

    @abc.abstractmethod
    def _to_protobuf(self):
        """Convert this instance to the protobuf representation"""

69 

70 

class CountAggregation(BaseAggregation):
    """Aggregation whose protobuf form has the ``count`` field set.

    :type alias: str or None
    :param alias: Optional alias copied onto the protobuf message.
    """

    def __init__(self, alias: str | None = None):
        super().__init__(alias=alias)

    def _to_protobuf(self):
        """Build the ``StructuredAggregationQuery.Aggregation`` message for this count."""
        aggregation_cls = StructuredAggregationQuery.Aggregation
        message = aggregation_cls()
        message.alias = self.alias
        message.count = aggregation_cls.Count()
        return message

81 

82 

class SumAggregation(BaseAggregation):
    """Aggregation whose protobuf form has the ``sum`` field set for one field.

    :type field_ref: str or FieldPath
    :param field_ref: The field to aggregate; kept on the instance as a string.
    :type alias: str or None
    :param alias: Optional alias copied onto the protobuf message.
    """

    def __init__(self, field_ref: str | FieldPath, alias: str | None = None):
        # FieldPath instances are stored in their string (API) representation.
        self.field_ref = (
            field_ref.to_api_repr() if isinstance(field_ref, FieldPath) else field_ref
        )
        super().__init__(alias=alias)

    def _to_protobuf(self):
        """Build the ``StructuredAggregationQuery.Aggregation`` message for this sum."""
        aggregation_cls = StructuredAggregationQuery.Aggregation
        message = aggregation_cls()
        message.alias = self.alias
        message.sum = aggregation_cls.Sum()
        message.sum.field.field_path = self.field_ref
        return message

98 

99 

class AvgAggregation(BaseAggregation):
    """Aggregation whose protobuf form has the ``avg`` field set for one field.

    :type field_ref: str or FieldPath
    :param field_ref: The field to aggregate; kept on the instance as a string.
    :type alias: str or None
    :param alias: Optional alias copied onto the protobuf message.
    """

    def __init__(self, field_ref: str | FieldPath, alias: str | None = None):
        # FieldPath instances are stored in their string (API) representation.
        self.field_ref = (
            field_ref.to_api_repr() if isinstance(field_ref, FieldPath) else field_ref
        )
        super().__init__(alias=alias)

    def _to_protobuf(self):
        """Build the ``StructuredAggregationQuery.Aggregation`` message for this average."""
        aggregation_cls = StructuredAggregationQuery.Aggregation
        message = aggregation_cls()
        message.alias = self.alias
        message.avg = aggregation_cls.Avg()
        message.avg.field.field_path = self.field_ref
        return message

115 

116 

def _query_response_to_result(
    response_pb: RunAggregationQueryResponse,
) -> List[AggregationResult]:
    """Convert one aggregation response into a list of results.

    One :class:`AggregationResult` is produced per entry of
    ``response_pb.result.aggregate_fields``; each carries the entry's key as
    its alias and ``response_pb.read_time`` as its read time.

    The value is chosen by which member of the value's oneof is populated
    rather than by truthiness, so an integer result of ``0`` is returned as
    ``0`` instead of falling through to ``double_value`` (``0.0``). Any value
    that is not an integer yields ``double_value``, as before.

    :type response_pb: RunAggregationQueryResponse
    :param response_pb: The response message to convert.

    :rtype: list
    :returns: The aggregation results, one per aggregate field.
    """
    results = []
    # NOTE(review): ``.pb`` is expected to be the underlying protobuf map, so
    # its values are raw ``Value`` messages exposing ``WhichOneof`` -- confirm
    # against the proto-plus version in use.
    for key, value_pb in response_pb.result.aggregate_fields.pb.items():
        if value_pb.WhichOneof("value_type") == "integer_value":
            value = value_pb.integer_value
        else:
            value = value_pb.double_value
        results.append(
            AggregationResult(
                alias=key,
                value=value,
                read_time=response_pb.read_time,
            )
        )

    return results

131 

132 

class BaseAggregationQuery(ABC):
    """Represents an aggregation query to the Firestore API.

    :type nested_query: object
    :param nested_query: The query the aggregations run over. It must provide
        a ``_parent`` attribute and a ``_to_protobuf()`` method, both of which
        are used by this class.
    :type alias: str or None
    :param alias: Optional alias, stored on the instance as ``_alias``.
    """

    def __init__(self, nested_query, alias: str | None = None) -> None:
        self._nested_query = nested_query
        self._alias = alias
        self._collection_ref = nested_query._parent
        self._aggregations: List[BaseAggregation] = []

    @property
    def _client(self):
        """The client of the nested query's parent collection reference."""
        return self._collection_ref._client

    def count(self, alias: str | None = None):
        """
        Adds a count over the nested query

        :type alias: str or None
        :param alias: Optional alias for the count aggregation.

        :returns: This query, so that calls can be chained.
        """
        count_aggregation = CountAggregation(alias=alias)
        self._aggregations.append(count_aggregation)
        return self

    def sum(self, field_ref: str | FieldPath, alias: str | None = None):
        """
        Adds a sum over the nested query

        :type field_ref: str or FieldPath
        :param field_ref: The field to sum.
        :type alias: str or None
        :param alias: Optional alias for the sum aggregation.

        :returns: This query, so that calls can be chained.
        """
        sum_aggregation = SumAggregation(field_ref, alias=alias)
        self._aggregations.append(sum_aggregation)
        return self

    def avg(self, field_ref: str | FieldPath, alias: str | None = None):
        """
        Adds an avg over the nested query

        :type field_ref: str or FieldPath
        :param field_ref: The field to average.
        :type alias: str or None
        :param alias: Optional alias for the avg aggregation.

        :returns: This query, so that calls can be chained.
        """
        avg_aggregation = AvgAggregation(field_ref, alias=alias)
        self._aggregations.append(avg_aggregation)
        return self

    def add_aggregation(self, aggregation: BaseAggregation) -> None:
        """
        Adds an aggregation operation to the nested query

        :type aggregation: :class:`google.cloud.firestore_v1.aggregation.BaseAggregation`
        :param aggregation: An aggregation operation, e.g. a CountAggregation
        """
        self._aggregations.append(aggregation)

    def add_aggregations(self, aggregations: List[BaseAggregation]) -> None:
        """
        Adds a list of aggregations to the nested query

        :type aggregations: list
        :param aggregations: a list of aggregation operations
        """
        self._aggregations.extend(aggregations)

    def _to_protobuf(self) -> StructuredAggregationQuery:
        """Convert this query to its protobuf representation.

        The nested query's protobuf becomes ``structured_query`` and each
        registered aggregation is appended in the order it was added.
        """
        pb = StructuredAggregationQuery()
        pb.structured_query = self._nested_query._to_protobuf()

        for aggregation in self._aggregations:
            aggregation_pb = aggregation._to_protobuf()
            pb.aggregations.append(aggregation_pb)
        return pb

    def _prep_stream(
        self,
        transaction=None,
        retry: Union[retries.Retry, None, gapic_v1.method._MethodDefault] = None,
        timeout: float | None = None,
    ) -> Tuple[dict, dict]:
        """Build the request and keyword arguments for running this query.

        Args:
            transaction: Optional transaction; its ID is placed in the request.
            retry: Retry designation forwarded to the retry/timeout helper.
            timeout (float): Timeout forwarded to the retry/timeout helper.

        Returns:
            Tuple[dict, dict]: The request dictionary and the retry/timeout
            keyword arguments.
        """
        # Only the parent path is needed here; the second element is unused.
        parent_path, _ = self._collection_ref._parent_info()
        request = {
            "parent": parent_path,
            "structured_aggregation_query": self._to_protobuf(),
            "transaction": _helpers.get_transaction_id(transaction),
        }
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return request, kwargs

    @abc.abstractmethod
    def get(
        self,
        transaction=None,
        retry: Union[
            retries.Retry, None, gapic_v1.method._MethodDefault
        ] = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
    ) -> List[AggregationResult] | Coroutine[Any, Any, List[AggregationResult]]:
        """Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            list: The aggregation query results (or a coroutine resolving to
            that list, per the return annotation).

        """

    @abc.abstractmethod
    def stream(
        self,
        transaction=None,
        retry: Union[
            retries.Retry, None, gapic_v1.method._MethodDefault
        ] = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
    ) -> (
        Generator[List[AggregationResult], Any, None]
        | AsyncGenerator[List[AggregationResult], None]
    ):
        """Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and returns an iterator in the stream of ``RunAggregationQueryResponse`` messages.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            A generator (or async generator, per the return annotation)
            yielding lists of aggregation query results.

        """