Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/aggregation.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

53 statements  

1# Copyright 2023 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Classes for representing aggregation queries for the Google Cloud Firestore API. 

16 

17A :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created directly from 

18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be 

19a more common way to create an aggregation query than direct usage of the constructor. 

20""" 

21from __future__ import annotations 

22 

23from typing import TYPE_CHECKING, Any, Generator, List, Optional, Union 

24 

25from google.api_core import exceptions, gapic_v1 

26from google.api_core import retry as retries 

27 

28from google.cloud.firestore_v1.base_aggregation import ( 

29 AggregationResult, 

30 BaseAggregationQuery, 

31 _query_response_to_result, 

32) 

33from google.cloud.firestore_v1.query_results import QueryResultsList 

34from google.cloud.firestore_v1.stream_generator import StreamGenerator 

35 

36# Types needed only for Type Hints 

37if TYPE_CHECKING: # pragma: NO COVER 

38 from google.cloud.firestore_v1 import transaction 

39 from google.cloud.firestore_v1.query_profile import ExplainMetrics 

40 from google.cloud.firestore_v1.query_profile import ExplainOptions 

41 

42 import datetime 

43 

44 

class AggregationQuery(BaseAggregationQuery):
    """Represents an aggregation query to the Firestore API."""

    def __init__(
        self,
        nested_query,
    ) -> None:
        """Wrap ``nested_query`` by delegating to the base class initializer.

        Args:
            nested_query: The query object the aggregation is built on; it
                is passed unchanged to ``BaseAggregationQuery.__init__``.
        """
        super().__init__(nested_query)

    def get(
        self,
        transaction=None,
        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> QueryResultsList[AggregationResult]:
        """Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and returns a list of
        aggregation results in the stream of ``RunAggregationQueryResponse``
        messages.

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                the explain metrics read from the stream are passed to the
                returned ``QueryResultsList``.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Returns:
            QueryResultsList[AggregationResult]: The aggregation query results.

        """
        result = self.stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        # Drain the stream completely before asking it for metrics.
        result_list = list(result)

        # Metrics are only fetched when profiling was requested; otherwise
        # the result list is built with ``None`` metrics.
        explain_metrics: ExplainMetrics | None = None
        if explain_options is not None:
            explain_metrics = result.get_explain_metrics()

        return QueryResultsList(result_list, explain_options, explain_metrics)

    def _get_stream_iterator(
        self, transaction, retry, timeout, explain_options=None, read_time=None
    ):
        """Helper method for :meth:`stream`.

        Builds the request and call keyword arguments with ``_prep_stream``
        and returns whatever the ``run_aggregation_query`` API call returns.
        """
        request, kwargs = self._prep_stream(
            transaction,
            retry,
            timeout,
            explain_options,
            read_time,
        )

        return self._client._firestore_api.run_aggregation_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

    def _retry_query_after_exception(self, exc, retry, transaction):
        """Helper method for :meth:`stream`.

        Decides whether the stream should be re-opened after ``exc``.

        Args:
            exc: The exception raised while reading the response stream.
            retry: The retry setting given by the caller: a retry object,
                ``None``, or ``gapic_v1.method.DEFAULT``.
            transaction: The transaction the query runs in, if any.

        Returns:
            bool: ``False`` inside a transaction or when no retry policy
            applies; otherwise the result of the retry policy's predicate
            for ``exc``.
        """
        if transaction is not None:  # no snapshot-based retry inside transaction
            return False

        if retry is gapic_v1.method.DEFAULT:
            # Fall back to the retry configured on the transport method.
            transport = self._client._firestore_api._transport
            gapic_callable = transport.run_aggregation_query
            retry = gapic_callable._retry

        # ``retry=None`` (passed explicitly, or resolved from the transport)
        # means there is no policy to consult.  Returning False lets the
        # caller re-raise the original error instead of failing with an
        # AttributeError on ``None._predicate``.
        if retry is None:
            return False

        return retry._predicate(exc)

    def _make_stream(
        self,
        transaction: Optional[transaction.Transaction] = None,
        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]:
        """Internal method for stream(). Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and then returns a generator
        which consumes each document returned in the stream of
        ``RunAggregationQueryResponse`` messages.

        If a ``transaction`` is used and it already has write operations added,
        this method cannot be used (i.e. read-after-write is not allowed).

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Yields:
            List[AggregationResult]:
            The result of aggregations of this query.

        Returns:
            (Optional[google.cloud.firestore_v1.types.query_profile.ExplainMetrics]):
            The results of query profiling, if received from the service.

        """
        metrics: ExplainMetrics | None = None

        response_iterator = self._get_stream_iterator(
            transaction,
            retry,
            timeout,
            explain_options,
            read_time,
        )
        # NOTE(review): a retryable failure re-issues the whole query and the
        # loop has no attempt limit; results yielded before the failure are
        # not tracked, so they may be yielded again -- confirm this is intended.
        while True:
            try:
                # A ``None`` default signals exhaustion without StopIteration.
                response = next(response_iterator, None)
            except exceptions.GoogleAPICallError as exc:
                if self._retry_query_after_exception(exc, retry, transaction):
                    response_iterator = self._get_stream_iterator(
                        transaction,
                        retry,
                        timeout,
                        explain_options,
                        read_time,
                    )
                    continue
                else:
                    raise

            if response is None:  # EOI
                break

            # Keep the first non-empty metrics payload seen in the stream.
            if metrics is None and response.explain_metrics:
                metrics = response.explain_metrics

            result = _query_response_to_result(response)
            if result:
                yield result

        return metrics

    def stream(
        self,
        transaction: Optional["transaction.Transaction"] = None,
        retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> StreamGenerator[List[AggregationResult]]:
        """Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and then returns a generator
        which consumes each document returned in the stream of
        ``RunAggregationQueryResponse`` messages.

        If a ``transaction`` is used and it already has write operations added,
        this method cannot be used (i.e. read-after-write is not allowed).

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Returns:
            `StreamGenerator[List[AggregationResult]]`:
            A generator of the query results.
        """
        inner_generator = self._make_stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        return StreamGenerator(inner_generator, explain_options)