# Copyright 2020 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Classes for representing queries for the Google Cloud Firestore API.

A :class:`~google.cloud.firestore_v1.query.Query` can be created directly from
a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
a more common way to create a query than direct usage of the constructor.
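
For example, a query is commonly built from an
:class:`~google.cloud.firestore_v1.async_collection.AsyncCollectionReference`
rather than constructed directly. A minimal sketch (the ``client``, collection,
and field names below are illustrative only, and the calls must run inside a
coroutine)::

    from google.cloud.firestore_v1.base_query import FieldFilter

    query = client.collection("users").where(filter=FieldFilter("age", ">=", 18))
    snapshots = await query.get()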
20"""
21from __future__ import annotations
22
23from typing import (
24 TYPE_CHECKING,
25 Any,
26 AsyncGenerator,
27 List,
28 Optional,
29 Type,
30 Union,
31 Sequence,
32)
33
34from google.api_core import gapic_v1
35from google.api_core import retry_async as retries
36
37from google.cloud import firestore_v1
38from google.cloud.firestore_v1.async_aggregation import AsyncAggregationQuery
39from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
40from google.cloud.firestore_v1.async_vector_query import AsyncVectorQuery
41from google.cloud.firestore_v1.base_query import (
42 BaseCollectionGroup,
43 BaseQuery,
44 QueryPartition,
45 _collection_group_query_response_to_snapshot,
46 _enum_from_direction,
47 _query_response_to_snapshot,
48)
49from google.cloud.firestore_v1.query_results import QueryResultsList
50
51if TYPE_CHECKING: # pragma: NO COVER
52 import datetime
53
54 # Types needed only for Type Hints
55 from google.cloud.firestore_v1.async_transaction import AsyncTransaction
56 from google.cloud.firestore_v1.base_document import DocumentSnapshot
57 from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
58 from google.cloud.firestore_v1.field_path import FieldPath
59 from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions
60 import google.cloud.firestore_v1.types.query_profile as query_profile_pb
61 from google.cloud.firestore_v1.vector import Vector
62
63
class AsyncQuery(BaseQuery):
    """Represents a query to the Firestore API.

    Instances of this class are considered immutable: all methods that
    would modify an instance instead return a new instance.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
        projection (Optional[:class:`google.cloud.firestore_v1.\
            query.StructuredQuery.Projection`]):
            A projection of document fields to limit the query results to.
        field_filters (Optional[Tuple[:class:`google.cloud.firestore_v1.\
            query.StructuredQuery.FieldFilter`, ...]]):
            The filters to be applied in the query.
        orders (Optional[Tuple[:class:`google.cloud.firestore_v1.\
            query.StructuredQuery.Order`, ...]]):
            The "order by" entries to use in the query.
        limit (Optional[int]):
            The maximum number of documents the query is allowed to return.
        offset (Optional[int]):
            The number of results to skip.
        start_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * an ``after`` flag

            The fields and the flag combine to form a cursor used as
            a starting point in a query result set. If the ``after``
            flag is :data:`True`, the results will start just after any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        end_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * a ``before`` flag

            The fields and the flag combine to form a cursor used as
            an ending point in a query result set. If the ``before``
            flag is :data:`True`, the results will end just before any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        all_descendants (Optional[bool]):
            When false, selects only collections that are immediate children
            of the `parent` specified in the containing `RunQueryRequest`.
            When true, selects all descendant collections.
        recursive (Optional[bool]):
            When true, returns all documents and all documents in any subcollections
            below them. Defaults to false.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=False,
        recursive=False,
    ) -> None:
        super(AsyncQuery, self).__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
            recursive=recursive,
        )

    async def _chunkify(
        self, chunk_size: int
    ) -> AsyncGenerator[List[DocumentSnapshot], None]:
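        """Yield this query's results in chunks of at most ``chunk_size``.

        Each chunk is fetched with a separate paged query, using
        ``start_after`` on the last document of the previous chunk as a
        cursor and trimming the page size so that any limit previously set
        via :meth:`limit` is still honored.
        """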
        max_to_return: Optional[int] = self._limit
        num_returned: int = 0
        original: AsyncQuery = self._copy()
        last_document: Optional[DocumentSnapshot] = None

        while True:
            # Optionally trim the `chunk_size` down to honor a previously
            # applied limit as set by `self.limit()`
            _chunk_size: int = original._resolve_chunk_size(num_returned, chunk_size)

            # Apply the optionally pruned limit and the cursor, if we are past
            # the first page.
            _q = original.limit(_chunk_size)

            if last_document:
                _q = _q.start_after(last_document)

            snapshots = await _q.get()

            if snapshots:
                last_document = snapshots[-1]

            num_returned += len(snapshots)

            yield snapshots

            # Terminate the iterator if we have reached either of two end
            # conditions:
            #   1. There are no more documents, or
            #   2. We have reached the desired overall limit
            if len(snapshots) < _chunk_size or (
                max_to_return and num_returned >= max_to_return
            ):
                return

    async def get(
        self,
        transaction: Optional[AsyncTransaction] = None,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> QueryResultsList[DocumentSnapshot]:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and returns a list of documents
        returned in the stream of ``RunQueryResponse`` messages.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned results.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Returns:
            QueryResultsList[DocumentSnapshot]: The documents in the collection
                that match this query.
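
        Example:
            A minimal usage sketch (the collection and field names are
            illustrative only and assume an existing
            :class:`~google.cloud.firestore_v1.async_client.AsyncClient`
            named ``client``)::

                from google.cloud.firestore_v1.base_query import FieldFilter

                query = (
                    client.collection("cities")
                    .where(filter=FieldFilter("population", ">", 1000000))
                    .limit(5)
                )
                snapshots = await query.get()
                for snapshot in snapshots:
                    print(snapshot.id, snapshot.to_dict())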
        """
        explain_metrics: ExplainMetrics | None = None

        is_limited_to_last = self._limit_to_last

        if self._limit_to_last:
            # In order to fetch up to `self._limit` results from the end of the
            # query flip the defined ordering on the query to start from the
            # end, retrieving up to `self._limit` results from the backend.
            for order in self._orders:
                order.direction = _enum_from_direction(
                    self.DESCENDING
                    if order.direction == self.ASCENDING
                    else self.ASCENDING
                )
            self._limit_to_last = False
        result = self.stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        try:
            result_list = [d async for d in result]
            if is_limited_to_last:
                result_list = list(reversed(result_list))

            if explain_options is None:
                explain_metrics = None
            else:
                explain_metrics = await result.get_explain_metrics()
        finally:
            await result.aclose()

        return QueryResultsList(result_list, explain_options, explain_metrics)

    def find_nearest(
        self,
        vector_field: str,
        query_vector: Union[Vector, Sequence[float]],
        limit: int,
        distance_measure: DistanceMeasure,
        *,
        distance_result_field: Optional[str] = None,
        distance_threshold: Optional[float] = None,
    ) -> AsyncVectorQuery:
        """
        Finds the closest vector embeddings to the given query vector.

        Args:
            vector_field (str): An indexed vector field to search upon. Only documents which contain
                vectors whose dimensionality match the query_vector can be returned.
            query_vector (Vector | Sequence[float]): The query vector that we are searching on. Must be a vector of no more
                than 2048 dimensions.
            limit (int): The number of nearest neighbors to return. Must be a positive integer of no more than 1000.
            distance_measure (:class:`DistanceMeasure`): The distance measure to use.
            distance_result_field (Optional[str]):
                Name of the field to output the result of the vector distance
                calculation. If unset then the distance will not be returned.
            distance_threshold (Optional[float]):
                A threshold for which no less similar documents will be returned.

        Returns:
            :class:`~google.cloud.firestore_v1.async_vector_query.AsyncVectorQuery`: the vector query.
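
        Example:
            A minimal usage sketch (assumes ``collection`` is an existing
            :class:`~google.cloud.firestore_v1.async_collection.AsyncCollectionReference`
            over documents with a vector field named ``embedding``; all names
            are illustrative only)::

                from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
                from google.cloud.firestore_v1.vector import Vector

                vector_query = collection.find_nearest(
                    vector_field="embedding",
                    query_vector=Vector([0.1, 0.2, 0.3]),
                    distance_measure=DistanceMeasure.EUCLIDEAN,
                    limit=5,
                )
                snapshots = await vector_query.get()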
        """
        return AsyncVectorQuery(self).find_nearest(
            vector_field=vector_field,
            query_vector=query_vector,
            limit=limit,
            distance_measure=distance_measure,
            distance_result_field=distance_result_field,
            distance_threshold=distance_threshold,
        )

    def count(
        self, alias: str | None = None
    ) -> AsyncAggregationQuery:
        """Adds a count over the nested query.

        Args:
            alias(Optional[str]): Optional name of the field to store the result of the aggregation into.
                If not provided, Firestore will pick a default name following the format field_<incremental_id++>.

        Returns:
            :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`:
                An instance of an AsyncAggregationQuery object
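
        Example:
            A minimal usage sketch (assumes ``query`` is an existing
            :class:`AsyncQuery`; the alias is illustrative only)::

                aggregation_query = query.count(alias="total")
                results = await aggregation_query.get()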
        """
        return AsyncAggregationQuery(self).count(alias=alias)

    def sum(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> AsyncAggregationQuery:
        """Adds a sum over the nested query.

        Args:
            field_ref(Union[str, google.cloud.firestore_v1.field_path.FieldPath]): The field to aggregate across.
            alias(Optional[str]): Optional name of the field to store the result of the aggregation into.
                If not provided, Firestore will pick a default name following the format field_<incremental_id++>.

        Returns:
            :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`:
                An instance of an AsyncAggregationQuery object
        """
        return AsyncAggregationQuery(self).sum(field_ref, alias=alias)

    def avg(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> AsyncAggregationQuery:
        """Adds an avg over the nested query.

        Args:
            field_ref(Union[str, google.cloud.firestore_v1.field_path.FieldPath]): The field to aggregate across.
            alias(Optional[str]): Optional name of the field to store the result of the aggregation into.
                If not provided, Firestore will pick a default name following the format field_<incremental_id++>.

        Returns:
            :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`:
                An instance of an AsyncAggregationQuery object
        """
        return AsyncAggregationQuery(self).avg(field_ref, alias=alias)

    async def _make_stream(
        self,
        transaction: Optional[AsyncTransaction] = None,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> AsyncGenerator[DocumentSnapshot | query_profile_pb.ExplainMetrics, Any]:
        """Internal method for stream(). Read the documents in the collection
        that match this query.

        This sends a ``RunQuery`` RPC and then returns a generator which
        consumes each document returned in the stream of ``RunQueryResponse``
        messages.

        .. note::

            The underlying stream of responses will time out after
            the ``max_rpc_timeout_millis`` value set in the GAPIC
            client configuration for the ``RunQuery`` API. Snapshots
            not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
                Transaction`]):
                An existing transaction that the query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.

        Yields:
            [:class:`~google.cloud.firestore_v1.base_document.DocumentSnapshot` \
                | google.cloud.firestore_v1.types.query_profile.ExplainMetrics]:
                The next document that fulfills the query. Query results are
                yielded as `DocumentSnapshot`. When the response stream includes
                explain metrics, a `query_profile_pb.ExplainMetrics` message is
                also yielded.
        """
        request, expected_prefix, kwargs = self._prep_stream(
            transaction,
            retry,
            timeout,
            explain_options,
            read_time,
        )

        response_iterator = await self._client._firestore_api.run_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

        async for response in response_iterator:
            if self._all_descendants:
                snapshot = _collection_group_query_response_to_snapshot(
                    response, self._parent
                )
            else:
                snapshot = _query_response_to_snapshot(
                    response, self._parent, expected_prefix
                )
            if snapshot is not None:
                yield snapshot

            if response.explain_metrics:
                metrics = response.explain_metrics
                yield metrics

    def stream(
        self,
        transaction: Optional[AsyncTransaction] = None,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> AsyncStreamGenerator[DocumentSnapshot]:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and then returns a generator which
        consumes each document returned in the stream of ``RunQueryResponse``
        messages.

        .. note::

            The underlying stream of responses will time out after
            the ``max_rpc_timeout_millis`` value set in the GAPIC
            client configuration for the ``RunQuery`` API. Snapshots
            not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
                Transaction`]):
                An existing transaction that the query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.

        Returns:
            `AsyncStreamGenerator[DocumentSnapshot]`:
                An asynchronous generator of the query results.
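
        Example:
            A minimal usage sketch (assumes ``query`` is an existing
            :class:`AsyncQuery`)::

                async for snapshot in query.stream():
                    print(snapshot.id, snapshot.to_dict())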
        """
        inner_generator = self._make_stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        return AsyncStreamGenerator(inner_generator, explain_options)

    @staticmethod
    def _get_collection_reference_class() -> (
        Type["firestore_v1.async_collection.AsyncCollectionReference"]
    ):
        from google.cloud.firestore_v1.async_collection import AsyncCollectionReference

        return AsyncCollectionReference


class AsyncCollectionGroup(AsyncQuery, BaseCollectionGroup):
    """Represents a Collection Group in the Firestore API.

    This is a specialization of :class:`.AsyncQuery` that includes all documents in the
    database that are contained in a collection or subcollection of the given
    parent.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
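
    Example:
        A collection group is typically obtained from an
        :class:`~google.cloud.firestore_v1.async_client.AsyncClient` rather
        than constructed directly. A minimal sketch (the ``client`` and group
        id are illustrative only)::

            group = client.collection_group("landmarks")
            async for snapshot in group.stream():
                print(snapshot.reference.path)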
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=True,
        recursive=False,
    ) -> None:
        super(AsyncCollectionGroup, self).__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
            recursive=recursive,
        )

    @staticmethod
    def _get_query_class():
        return AsyncQuery

    async def get_partitions(
        self,
        partition_count,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        read_time: Optional[datetime.datetime] = None,
    ) -> AsyncGenerator[QueryPartition, None]:
        """Partition a query for parallelization.

        Partitions a query by returning partition cursors that can be used to run the
        query in parallel. The returned partition cursors are split points that can be
        used as starting/end points for the query results.

        Args:
            partition_count (int): The desired maximum number of partition points. The
                number must be strictly positive. The actual number of partitions
                returned may be fewer.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.
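
        Example:
            A minimal usage sketch (assumes ``group`` is an existing
            :class:`AsyncCollectionGroup`)::

                async for partition in group.get_partitions(3):
                    partition_query = partition.query()
                    async for snapshot in partition_query.stream():
                        print(snapshot.id)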
        """
        request, kwargs = self._prep_get_partitions(
            partition_count, retry, timeout, read_time
        )

        pager = await self._client._firestore_api.partition_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

        start_at = None
        async for cursor_pb in pager:
            cursor = self._client.document(cursor_pb.values[0].reference_value)
            yield QueryPartition(self, start_at, cursor)
            start_at = cursor

        yield QueryPartition(self, start_at, None)