Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/async_vector_query.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

38 statements  

1# Copyright 2024 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import annotations 

16 

17from typing import TYPE_CHECKING, Any, AsyncGenerator, Optional, TypeVar, Union 

18 

19from google.api_core import gapic_v1 

20from google.api_core import retry as retries 

21 

22from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator 

23from google.cloud.firestore_v1.base_query import ( 

24 BaseQuery, 

25 _collection_group_query_response_to_snapshot, 

26 _query_response_to_snapshot, 

27) 

28from google.cloud.firestore_v1.base_vector_query import BaseVectorQuery 

29from google.cloud.firestore_v1.query_results import QueryResultsList 

30 

31# Types needed only for Type Hints 

32if TYPE_CHECKING: # pragma: NO COVER 

33 from google.cloud.firestore_v1.base_document import DocumentSnapshot 

34 from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions 

35 from google.cloud.firestore_v1 import transaction 

36 import google.cloud.firestore_v1.types.query_profile as query_profile_pb 

37 

38TAsyncVectorQuery = TypeVar("TAsyncVectorQuery", bound="AsyncVectorQuery") 

39 

40 

41class AsyncVectorQuery(BaseVectorQuery): 

42 """Represents an async vector query to the Firestore API.""" 

43 

44 def __init__( 

45 self, 

46 nested_query: Union[BaseQuery, TAsyncVectorQuery], 

47 ) -> None: 

48 """Presents the vector query. 

49 Args: 

50 nested_query (BaseQuery | VectorQuery): the base query to apply as the prefilter. 

51 """ 

52 super(AsyncVectorQuery, self).__init__(nested_query) 

53 

54 async def get( 

55 self, 

56 transaction=None, 

57 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT, 

58 timeout: Optional[float] = None, 

59 *, 

60 explain_options: Optional[ExplainOptions] = None, 

61 ) -> QueryResultsList[DocumentSnapshot]: 

62 """Runs the vector query. 

63 

64 This sends a ``RunQuery`` RPC and returns a list of document messages. 

65 

66 Args: 

67 transaction 

68 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 

69 An existing transaction that this query will run in. 

70 If a ``transaction`` is used and it already has write operations 

71 added, this method cannot be used (i.e. read-after-write is not 

72 allowed). 

73 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

74 should be retried. Defaults to a system-specified policy. 

75 timeout (float): The timeout for this request. Defaults to a 

76 system-specified value. 

77 explain_options 

78 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 

79 Options to enable query profiling for this query. When set, 

80 explain_metrics will be available on the returned generator. 

81 

82 Returns: 

83 QueryResultsList[DocumentSnapshot]: The documents in the collection 

84 that match this query. 

85 """ 

86 explain_metrics: ExplainMetrics | None = None 

87 

88 stream_result = self.stream( 

89 transaction=transaction, 

90 retry=retry, 

91 timeout=timeout, 

92 explain_options=explain_options, 

93 ) 

94 try: 

95 result = [snapshot async for snapshot in stream_result] 

96 

97 if explain_options is None: 

98 explain_metrics = None 

99 else: 

100 explain_metrics = await stream_result.get_explain_metrics() 

101 finally: 

102 await stream_result.aclose() 

103 

104 return QueryResultsList(result, explain_options, explain_metrics) 

105 

106 async def _make_stream( 

107 self, 

108 transaction: Optional[transaction.Transaction] = None, 

109 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT, 

110 timeout: Optional[float] = None, 

111 explain_options: Optional[ExplainOptions] = None, 

112 ) -> AsyncGenerator[DocumentSnapshot | query_profile_pb.ExplainMetrics, Any]: 

113 """Internal method for stream(). Read the documents in the collection 

114 that match this query. 

115 

116 This sends a ``RunQuery`` RPC and then returns a generator which 

117 consumes each document returned in the stream of ``RunQueryResponse`` 

118 messages. 

119 

120 If a ``transaction`` is used and it already has write operations 

121 added, this method cannot be used (i.e. read-after-write is not 

122 allowed). 

123 

124 Args: 

125 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\ 

126 Transaction`]): 

127 An existing transaction that the query will run in. 

128 retry (Optional[google.api_core.retry.Retry]): Designation of what 

129 errors, if any, should be retried. Defaults to a 

130 system-specified policy. 

131 timeout (Optional[float]): The timeout for this request. Defaults 

132 to a system-specified value. 

133 explain_options 

134 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 

135 Options to enable query profiling for this query. When set, 

136 explain_metrics will be available on the returned generator. 

137 

138 Yields: 

139 [:class:`~google.cloud.firestore_v1.base_document.DocumentSnapshot` \ 

140 | google.cloud.firestore_v1.types.query_profile.ExplainMetrtics]: 

141 The next document that fulfills the query. Query results will be 

142 yielded as `DocumentSnapshot`. When the result contains returned 

143 explain metrics, yield `query_profile_pb.ExplainMetrics` individually. 

144 """ 

145 request, expected_prefix, kwargs = self._prep_stream( 

146 transaction, 

147 retry, 

148 timeout, 

149 explain_options, 

150 ) 

151 

152 response_iterator = await self._client._firestore_api.run_query( 

153 request=request, 

154 metadata=self._client._rpc_metadata, 

155 **kwargs, 

156 ) 

157 async for response in response_iterator: 

158 if self._nested_query._all_descendants: 

159 snapshot = _collection_group_query_response_to_snapshot( 

160 response, self._nested_query._parent 

161 ) 

162 else: 

163 snapshot = _query_response_to_snapshot( 

164 response, self._nested_query._parent, expected_prefix 

165 ) 

166 if snapshot is not None: 

167 yield snapshot 

168 

169 if response.explain_metrics: 

170 metrics = response.explain_metrics 

171 yield metrics 

172 

173 def stream( 

174 self, 

175 transaction=None, 

176 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT, 

177 timeout: Optional[float] = None, 

178 *, 

179 explain_options: Optional[ExplainOptions] = None, 

180 ) -> AsyncStreamGenerator[DocumentSnapshot]: 

181 """Reads the documents in the collection that match this query. 

182 

183 This sends a ``RunQuery`` RPC and then returns an iterator which 

184 consumes each document returned in the stream of ``RunQueryResponse`` 

185 messages. 

186 

187 If a ``transaction`` is used and it already has write operations 

188 added, this method cannot be used (i.e. read-after-write is not 

189 allowed). 

190 

191 Args: 

192 transaction 

193 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 

194 An existing transaction that this query will run in. 

195 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

196 should be retried. Defaults to a system-specified policy. 

197 timeout (float): The timeout for this request. Defaults to a 

198 system-specified value. 

199 explain_options 

200 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): 

201 Options to enable query profiling for this query. When set, 

202 explain_metrics will be available on the returned generator. 

203 

204 Returns: 

205 `AsyncStreamGenerator[DocumentSnapshot]`: 

206 An asynchronous generator of the queryresults. 

207 """ 

208 

209 inner_generator = self._make_stream( 

210 transaction=transaction, 

211 retry=retry, 

212 timeout=timeout, 

213 explain_options=explain_options, 

214 ) 

215 return AsyncStreamGenerator(inner_generator, explain_options)