Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/async_aggregation.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

36 statements  

1# Copyright 2023 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Classes for representing Async aggregation queries for the Google Cloud Firestore API. 

16 

17A :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery` can be created directly from 

18a :class:`~google.cloud.firestore_v1.async_collection.AsyncCollection` and that can be 

19a more common way to create an aggregation query than direct usage of the constructor. 

20""" 

21from __future__ import annotations 

22 

23from typing import TYPE_CHECKING, Any, AsyncGenerator, List, Optional, Union 

24 

25from google.api_core import gapic_v1 

26from google.api_core import retry_async as retries 

27 

28from google.cloud.firestore_v1 import transaction 

29from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator 

30from google.cloud.firestore_v1.base_aggregation import ( 

31 BaseAggregationQuery, 

32 _query_response_to_result, 

33) 

34from google.cloud.firestore_v1.query_results import QueryResultsList 

35 

36if TYPE_CHECKING: # pragma: NO COVER 

37 from google.cloud.firestore_v1.base_aggregation import AggregationResult 

38 from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions 

39 import google.cloud.firestore_v1.types.query_profile as query_profile_pb 

40 import datetime 

41 

42 

43class AsyncAggregationQuery(BaseAggregationQuery): 

44 """Represents an aggregation query to the Firestore API.""" 

45 

46 def __init__( 

47 self, 

48 nested_query, 

49 ) -> None: 

50 super(AsyncAggregationQuery, self).__init__(nested_query) 

51 

52 async def get( 

53 self, 

54 transaction=None, 

55 retry: Union[retries.AsyncRetry, None, object] = gapic_v1.method.DEFAULT, 

56 timeout: float | None = None, 

57 *, 

58 explain_options: Optional[ExplainOptions] = None, 

59 read_time: Optional[datetime.datetime] = None, 

60 ) -> QueryResultsList[List[AggregationResult]]: 

61 """Runs the aggregation query. 

62 

63 This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages. 

64 

65 Args: 

66 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 

67 An existing transaction that this query will run in. 

68 If a ``transaction`` is used and it already has write operations 

69 added, this method cannot be used (i.e. read-after-write is not 

70 allowed). 

71 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

72 should be retried. Defaults to a system-specified policy. 

73 timeout (float): The timeout for this request. Defaults to a 

74 system-specified value. 

75 explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 

76 Options to enable query profiling for this query. When set, 

77 explain_metrics will be available on the returned generator. 

78 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 

79 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 

80 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 

81 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 

82 

83 Returns: 

84 QueryResultsList[List[AggregationResult]]: The aggregation query results. 

85 

86 """ 

87 explain_metrics: ExplainMetrics | None = None 

88 

89 stream_result = self.stream( 

90 transaction=transaction, 

91 retry=retry, 

92 timeout=timeout, 

93 explain_options=explain_options, 

94 read_time=read_time, 

95 ) 

96 try: 

97 result = [aggregation async for aggregation in stream_result] 

98 

99 if explain_options is None: 

100 explain_metrics = None 

101 else: 

102 explain_metrics = await stream_result.get_explain_metrics() 

103 finally: 

104 await stream_result.aclose() 

105 

106 return QueryResultsList(result, explain_options, explain_metrics) 

107 

108 async def _make_stream( 

109 self, 

110 transaction: Optional[transaction.Transaction] = None, 

111 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT, 

112 timeout: Optional[float] = None, 

113 explain_options: Optional[ExplainOptions] = None, 

114 read_time: Optional[datetime.datetime] = None, 

115 ) -> AsyncGenerator[List[AggregationResult] | query_profile_pb.ExplainMetrics, Any]: 

116 """Internal method for stream(). Runs the aggregation query. 

117 

118 This sends a ``RunAggregationQuery`` RPC and then returns a generator which 

119 consumes each document returned in the stream of ``RunAggregationQueryResponse`` 

120 messages. 

121 

122 If a ``transaction`` is used and it already has write operations 

123 added, this method cannot be used (i.e. read-after-write is not 

124 allowed). 

125 

126 Args: 

127 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\ 

128 Transaction`]): 

129 An existing transaction that the query will run in. 

130 retry (Optional[google.api_core.retry.Retry]): Designation of what 

131 errors, if any, should be retried. Defaults to a 

132 system-specified policy. 

133 timeout (Optional[float]): The timeout for this request. Defaults 

134 to a system-specified value. 

135 explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 

136 Options to enable query profiling for this query. When set, 

137 explain_metrics will be available on the returned generator. 

138 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 

139 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 

140 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 

141 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 

142 

143 Yields: 

144 List[AggregationResult] | query_profile_pb.ExplainMetrics: 

145 The result of aggregations of this query. Query results will be 

146 yielded as `List[AggregationResult]`. When the result contains 

147 returned explain metrics, yield `query_profile_pb.ExplainMetrics` 

148 individually. 

149 """ 

150 request, kwargs = self._prep_stream( 

151 transaction, 

152 retry, 

153 timeout, 

154 explain_options, 

155 read_time, 

156 ) 

157 

158 response_iterator = await self._client._firestore_api.run_aggregation_query( 

159 request=request, 

160 metadata=self._client._rpc_metadata, 

161 **kwargs, 

162 ) 

163 

164 async for response in response_iterator: 

165 result = _query_response_to_result(response) 

166 if result: 

167 yield result 

168 

169 if response.explain_metrics: 

170 metrics = response.explain_metrics 

171 yield metrics 

172 

173 def stream( 

174 self, 

175 transaction: Optional[transaction.Transaction] = None, 

176 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT, 

177 timeout: Optional[float] = None, 

178 *, 

179 explain_options: Optional[ExplainOptions] = None, 

180 read_time: Optional[datetime.datetime] = None, 

181 ) -> AsyncStreamGenerator[List[AggregationResult]]: 

182 """Runs the aggregation query. 

183 

184 This sends a ``RunAggregationQuery`` RPC and then returns a generator 

185 which consumes each document returned in the stream of 

186 ``RunAggregationQueryResponse`` messages. 

187 

188 If a ``transaction`` is used and it already has write operations added, 

189 this method cannot be used (i.e. read-after-write is not allowed). 

190 

191 Args: 

192 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\ 

193 Transaction`]): 

194 An existing transaction that the query will run in. 

195 retry (Optional[google.api_core.retry.Retry]): Designation of what 

196 errors, if any, should be retried. Defaults to a 

197 system-specified policy. 

198 timeout (Optional[float]): The timeout for this request. Defaults 

199 to a system-specified value. 

200 explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 

201 Options to enable query profiling for this query. When set, 

202 explain_metrics will be available on the returned generator. 

203 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 

204 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery 

205 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no 

206 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC. 

207 

208 Returns: 

209 `AsyncStreamGenerator[List[AggregationResult]]`: 

210 A generator of the query results. 

211 """ 

212 

213 inner_generator = self._make_stream( 

214 transaction=transaction, 

215 retry=retry, 

216 timeout=timeout, 

217 explain_options=explain_options, 

218 read_time=read_time, 

219 ) 

220 return AsyncStreamGenerator(inner_generator, explain_options)