Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/aggregation.py: 35%

37 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 06:27 +0000

1# Copyright 2023 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Classes for representing aggregation queries for the Google Cloud Firestore API. 

16 

17A :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created directly from 

18a :class:`~google.cloud.firestore_v1.collection.Collection`, and that is often 

19a more common way to create an aggregation query than calling the constructor directly. 

20""" 

21from __future__ import annotations 

22 

23from google.api_core import exceptions 

24from google.api_core import gapic_v1 

25from google.api_core import retry as retries 

26 

27 

28from google.cloud.firestore_v1.base_aggregation import ( 

29 AggregationResult, 

30 BaseAggregationQuery, 

31 _query_response_to_result, 

32) 

33 

34from typing import Generator, Union, List, Any 

35 

36 

class AggregationQuery(BaseAggregationQuery):
    """Represents an aggregation query to the Firestore API."""

    def __init__(
        self,
        nested_query,
    ) -> None:
        """Initialize the aggregation query.

        Args:
            nested_query: The underlying query object; forwarded unchanged to
                the :class:`BaseAggregationQuery` constructor.
        """
        super(AggregationQuery, self).__init__(nested_query)

    def get(
        self,
        transaction=None,
        retry: Union[
            retries.Retry, None, gapic_v1.method._MethodDefault
        ] = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
    ) -> List[AggregationResult]:
        """Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and returns a list of
        aggregation results in the stream of ``RunAggregationQueryResponse``
        messages. It is a thin wrapper that fully consumes :meth:`stream`.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            list: The aggregation query results

        """
        result = self.stream(transaction=transaction, retry=retry, timeout=timeout)
        return list(result)  # type: ignore

    def _get_stream_iterator(self, transaction, retry, timeout):
        """Helper method for :meth:`stream`.

        Builds the request and call keyword arguments with ``_prep_stream``
        (defined outside this class body) and issues the
        ``run_aggregation_query`` call on the client's Firestore API,
        attaching the client's RPC metadata.

        Returns:
            The object returned by ``run_aggregation_query``; :meth:`stream`
            consumes it with ``next()``.
        """
        request, kwargs = self._prep_stream(
            transaction,
            retry,
            timeout,
        )

        return self._client._firestore_api.run_aggregation_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

    def _retry_query_after_exception(self, exc, retry, transaction):
        """Helper method for :meth:`stream`.

        Decides whether the query should be re-issued after ``exc`` was
        raised while reading the response stream.

        Args:
            exc: The exception caught by :meth:`stream`.
            retry: The ``retry`` argument passed to :meth:`stream`. When it is
                ``gapic_v1.method.DEFAULT`` the retry object is looked up on
                the transport's ``run_aggregation_query`` callable.
            transaction: The transaction passed to :meth:`stream`, or ``None``.

        Returns:
            bool: ``True`` if the retry predicate accepts ``exc``. ``False``
            when running inside a transaction or when no retry policy is in
            effect (``retry`` is ``None``).
        """
        if transaction is not None:  # no snapshot-based retry inside transaction
            return False

        if retry is gapic_v1.method.DEFAULT:
            transport = self._client._firestore_api._transport
            gapic_callable = transport.run_aggregation_query
            # NOTE(review): assumes the transport callable exposes ``_retry``
            # -- confirm against the generated transport.
            retry = gapic_callable._retry

        if retry is None:
            # ``None`` is an accepted ``retry`` value (see the ``get`` /
            # ``stream`` signatures) and means "do not retry". Without this
            # guard, ``None._predicate`` raised AttributeError and masked the
            # original API error.
            return False

        return retry._predicate(exc)

    def stream(
        self,
        transaction=None,
        retry: Union[
            retries.Retry, None, gapic_v1.method._MethodDefault
        ] = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
    ) -> Union[Generator[List[AggregationResult], Any, None]]:
        """Runs the aggregation query.

        This sends a ``RunAggregationQuery`` RPC and then returns an iterator which
        consumes each document returned in the stream of ``RunAggregationQueryResponse``
        messages.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
                ``None`` means a failed read is never re-issued.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Yields:
            :class:`~google.cloud.firestore_v1.base_aggregation.AggregationResult`:
            The result of aggregations of this query

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If reading the
                stream fails and the error is not retryable.
        """

        response_iterator = self._get_stream_iterator(
            transaction,
            retry,
            timeout,
        )
        while True:
            try:
                response = next(response_iterator, None)
            except exceptions.GoogleAPICallError as exc:
                if self._retry_query_after_exception(exc, retry, transaction):
                    # Re-issue the whole query with the same arguments.
                    # NOTE(review): results yielded before the failure are
                    # not tracked here -- confirm a restart cannot duplicate.
                    response_iterator = self._get_stream_iterator(
                        transaction,
                        retry,
                        timeout,
                    )
                    continue
                else:
                    raise

            if response is None:  # EOI
                break
            result = _query_response_to_result(response)
            yield result