Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/aggregation.py: 28%



# Copyright 2023 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Classes for representing aggregation queries for the Google Cloud Firestore API.

An :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created
directly from a :class:`~google.cloud.firestore_v1.collection.Collection`, which is
a more common way to build an aggregation query than calling the constructor directly.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Generator, List, Optional, Union

from google.api_core import exceptions, gapic_v1
from google.api_core import retry as retries

from google.cloud.firestore_v1.base_aggregation import (
    AggregationResult,
    BaseAggregationQuery,
    _query_response_to_result,
)
from google.cloud.firestore_v1.query_results import QueryResultsList
from google.cloud.firestore_v1.stream_generator import StreamGenerator

# Types needed only for Type Hints
if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.firestore_v1 import transaction
    from google.cloud.firestore_v1.query_profile import ExplainMetrics
    from google.cloud.firestore_v1.query_profile import ExplainOptions

    import datetime


class AggregationQuery(BaseAggregationQuery):
    """Represents an aggregation query to the Firestore API."""

    def __init__(
        self,
        nested_query,
    ) -> None:
        super(AggregationQuery, self).__init__(nested_query)

    def get(
        self,
        transaction=None,
        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> QueryResultsList[AggregationResult]:
        """Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and returns a list of
        aggregation results from the stream of ``RunAggregationQueryResponse``
        messages.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned results.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Returns:
            QueryResultsList[AggregationResult]: The aggregation query results.
        """
        explain_metrics: ExplainMetrics | None = None

        result = self.stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        result_list = list(result)

        if explain_options is None:
            explain_metrics = None
        else:
            explain_metrics = result.get_explain_metrics()

        return QueryResultsList(result_list, explain_options, explain_metrics)
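    # Illustrative sketch of ``get()`` (editorial addition, not part of the
    # original module): assuming ``aggregation_query`` was built via
    # ``collection.count(alias="total")``, each item of the returned
    # QueryResultsList is itself a list of AggregationResult objects.
    #
    #     results = aggregation_query.get()
    #     for result in results:
    #         print(result[0].alias, result[0].value)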

    def _get_stream_iterator(
        self, transaction, retry, timeout, explain_options=None, read_time=None
    ):
        """Helper method for :meth:`stream`."""
        request, kwargs = self._prep_stream(
            transaction,
            retry,
            timeout,
            explain_options,
            read_time,
        )

        return self._client._firestore_api.run_aggregation_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

    def _retry_query_after_exception(self, exc, retry, transaction):
        """Helper method for :meth:`stream`."""
        if transaction is None:  # no snapshot-based retry inside transaction
            if retry is gapic_v1.method.DEFAULT:
                transport = self._client._firestore_api._transport
                gapic_callable = transport.run_aggregation_query
                retry = gapic_callable._retry
            return retry._predicate(exc)

        return False
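    # Editorial note: with the DEFAULT retry sentinel, the predicate above comes
    # from the retry policy configured on the GAPIC transport for
    # ``run_aggregation_query``; inside a transaction the stream is never
    # restarted here, presumably because the transaction machinery is expected
    # to handle retries itself.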

    def _make_stream(
        self,
        transaction: Optional[transaction.Transaction] = None,
        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]:
        """Internal method for stream(). Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and then returns a generator
        which consumes each document returned in the stream of
        ``RunAggregationQueryResponse`` messages.

        If a ``transaction`` is used and it already has write operations added,
        this method cannot be used (i.e. read-after-write is not allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Yields:
            List[AggregationResult]:
                The result of aggregations of this query.

        Returns:
            (Optional[google.cloud.firestore_v1.types.query_profile.ExplainMetrics]):
                The results of query profiling, if received from the service.
        """
        metrics: ExplainMetrics | None = None

        response_iterator = self._get_stream_iterator(
            transaction,
            retry,
            timeout,
            explain_options,
            read_time,
        )
        while True:
            try:
                response = next(response_iterator, None)
            except exceptions.GoogleAPICallError as exc:
                if self._retry_query_after_exception(exc, retry, transaction):
                    response_iterator = self._get_stream_iterator(
                        transaction,
                        retry,
                        timeout,
                        explain_options,
                        read_time,
                    )
                    continue
                else:
                    raise

            if response is None:  # EOI
                break

            if metrics is None and response.explain_metrics:
                metrics = response.explain_metrics

            result = _query_response_to_result(response)
            if result:
                yield result

        return metrics

    def stream(
        self,
        transaction: Optional["transaction.Transaction"] = None,
        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> StreamGenerator[List[AggregationResult]]:
        """Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and then returns a generator
        which consumes each document returned in the stream of
        ``RunAggregationQueryResponse`` messages.

        If a ``transaction`` is used and it already has write operations added,
        this method cannot be used (i.e. read-after-write is not allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Returns:
            `StreamGenerator[List[AggregationResult]]`:
                A generator of the query results.
        """
        inner_generator = self._make_stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        return StreamGenerator(inner_generator, explain_options)
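    # Illustrative sketch of ``stream()`` with query profiling (editorial
    # addition, not part of the original module): it assumes an
    # ``aggregation_query`` built from a collection and that ``ExplainOptions``
    # accepts an ``analyze`` flag, as defined in
    # ``google.cloud.firestore_v1.query_profile``.
    #
    #     from google.cloud.firestore_v1.query_profile import ExplainOptions
    #
    #     stream = aggregation_query.stream(
    #         explain_options=ExplainOptions(analyze=True)
    #     )
    #     for aggregation_results in stream:
    #         for aggregation in aggregation_results:
    #             print(aggregation.alias, aggregation.value)
    #     metrics = stream.get_explain_metrics()  # available once the stream is consumed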