Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/async_aggregation.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

36 statements  

1# Copyright 2023 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Classes for representing Async aggregation queries for the Google Cloud Firestore API. 

16 

17An :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery` can be created directly from 

18an :class:`~google.cloud.firestore_v1.async_collection.AsyncCollection`, which can be 

19a more common way to create an aggregation query than calling the constructor directly. 

20""" 

21from __future__ import annotations 

22 

23from typing import TYPE_CHECKING, Any, AsyncGenerator, List, Optional, Union 

24 

25from google.api_core import gapic_v1 

26from google.api_core import retry_async as retries 

27 

28from google.cloud.firestore_v1 import transaction 

29from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator 

30from google.cloud.firestore_v1.base_aggregation import ( 

31 BaseAggregationQuery, 

32 _query_response_to_result, 

33) 

34from google.cloud.firestore_v1.query_results import QueryResultsList 

35 

36if TYPE_CHECKING: # pragma: NO COVER 

37 from google.cloud.firestore_v1.base_aggregation import AggregationResult 

38 from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions 

39 import google.cloud.firestore_v1.types.query_profile as query_profile_pb 

40 import datetime 

41 

42 

43class AsyncAggregationQuery(BaseAggregationQuery): 

44 """Represents an aggregation query to the Firestore API.""" 

45 

46 def __init__( 

47 self, 

48 nested_query, 

49 ) -> None: 

50 super(AsyncAggregationQuery, self).__init__(nested_query) 

51 

52 async def get( 

53 self, 

54 transaction=None, 

55 retry: Union[retries.AsyncRetry, None, object] = gapic_v1.method.DEFAULT, 

56 timeout: float | None = None, 

57 *, 

58 explain_options: Optional[ExplainOptions] = None, 

59 read_time: Optional[datetime.datetime] = None, 

60 ) -> QueryResultsList[List[AggregationResult]]: 

61 """Runs the aggregation query. 

62 

63 This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages. 

64 

65 Args: 

66 transaction 

67 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 

68 An existing transaction that this query will run in. 

69 If a ``transaction`` is used and it already has write operations 

70 added, this method cannot be used (i.e. read-after-write is not 

71 allowed). 

72 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

73 should be retried. Defaults to a system-specified policy. 

74 timeout (float): The timeout for this request. Defaults to a 

75 system-specified value. 

76 explain_options 

77 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 

78 Options to enable query profiling for this query. When set, 

79 explain_metrics will be available on the returned generator. 

80 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 

81 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 

82 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 

83 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 

84 

85 Returns: 

86 QueryResultsList[List[AggregationResult]]: The aggregation query results. 

87 

88 """ 

89 explain_metrics: ExplainMetrics | None = None 

90 

91 stream_result = self.stream( 

92 transaction=transaction, 

93 retry=retry, 

94 timeout=timeout, 

95 explain_options=explain_options, 

96 read_time=read_time, 

97 ) 

98 try: 

99 result = [aggregation async for aggregation in stream_result] 

100 

101 if explain_options is None: 

102 explain_metrics = None 

103 else: 

104 explain_metrics = await stream_result.get_explain_metrics() 

105 finally: 

106 await stream_result.aclose() 

107 

108 return QueryResultsList(result, explain_options, explain_metrics) 

109 

110 async def _make_stream( 

111 self, 

112 transaction: Optional[transaction.Transaction] = None, 

113 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT, 

114 timeout: Optional[float] = None, 

115 explain_options: Optional[ExplainOptions] = None, 

116 read_time: Optional[datetime.datetime] = None, 

117 ) -> AsyncGenerator[List[AggregationResult] | query_profile_pb.ExplainMetrics, Any]: 

118 """Internal method for stream(). Runs the aggregation query. 

119 

120 This sends a ``RunAggregationQuery`` RPC and then returns a generator which 

121 consumes each document returned in the stream of ``RunAggregationQueryResponse`` 

122 messages. 

123 

124 If a ``transaction`` is used and it already has write operations 

125 added, this method cannot be used (i.e. read-after-write is not 

126 allowed). 

127 

128 Args: 

129 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\ 

130 Transaction`]): 

131 An existing transaction that the query will run in. 

132 retry (Optional[google.api_core.retry.Retry]): Designation of what 

133 errors, if any, should be retried. Defaults to a 

134 system-specified policy. 

135 timeout (Optional[float]): The timeout for this request. Defaults 

136 to a system-specified value. 

137 explain_options 

138 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 

139 Options to enable query profiling for this query. When set, 

140 explain_metrics will be available on the returned generator. 

141 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 

142 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 

143 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 

144 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 

145 

146 Yields: 

147 List[AggregationResult] | query_profile_pb.ExplainMetrics: 

148 The result of aggregations of this query. Query results will be 

149 yielded as `List[AggregationResult]`. When the result contains 

150 returned explain metrics, yield `query_profile_pb.ExplainMetrics` 

151 individually. 

152 """ 

153 request, kwargs = self._prep_stream( 

154 transaction, 

155 retry, 

156 timeout, 

157 explain_options, 

158 read_time, 

159 ) 

160 

161 response_iterator = await self._client._firestore_api.run_aggregation_query( 

162 request=request, 

163 metadata=self._client._rpc_metadata, 

164 **kwargs, 

165 ) 

166 

167 async for response in response_iterator: 

168 result = _query_response_to_result(response) 

169 if result: 

170 yield result 

171 

172 if response.explain_metrics: 

173 metrics = response.explain_metrics 

174 yield metrics 

175 

176 def stream( 

177 self, 

178 transaction: Optional[transaction.Transaction] = None, 

179 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT, 

180 timeout: Optional[float] = None, 

181 *, 

182 explain_options: Optional[ExplainOptions] = None, 

183 read_time: Optional[datetime.datetime] = None, 

184 ) -> AsyncStreamGenerator[List[AggregationResult]]: 

185 """Runs the aggregation query. 

186 

187 This sends a ``RunAggregationQuery`` RPC and then returns a generator 

188 which consumes each document returned in the stream of 

189 ``RunAggregationQueryResponse`` messages. 

190 

191 If a ``transaction`` is used and it already has write operations added, 

192 this method cannot be used (i.e. read-after-write is not allowed). 

193 

194 Args: 

195 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\ 

196 Transaction`]): 

197 An existing transaction that the query will run in. 

198 retry (Optional[google.api_core.retry.Retry]): Designation of what 

199 errors, if any, should be retried. Defaults to a 

200 system-specified policy. 

201 timeout (Optional[float]): The timeout for this request. Defaults 

202 to a system-specified value. 

203 explain_options 

204 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 

205 Options to enable query profiling for this query. When set, 

206 explain_metrics will be available on the returned generator. 

207 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 

208 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 

209 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 

210 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 

211 

212 Returns: 

213 `AsyncStreamGenerator[List[AggregationResult]]`: 

214 A generator of the query results. 

215 """ 

216 

217 inner_generator = self._make_stream( 

218 transaction=transaction, 

219 retry=retry, 

220 timeout=timeout, 

221 explain_options=explain_options, 

222 read_time=read_time, 

223 ) 

224 return AsyncStreamGenerator(inner_generator, explain_options)