1# Copyright 2017 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for representing queries for the Google Cloud Firestore API.
16
17A :class:`~google.cloud.firestore_v1.query.Query` can be created directly from
18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
19a more common way to create a query than direct usage of the constructor.
20"""
21from __future__ import annotations
22
23from typing import (
24 TYPE_CHECKING,
25 Any,
26 Callable,
27 Generator,
28 List,
29 Optional,
30 Sequence,
31 Type,
32 Union,
33)
34
35from google.api_core import exceptions, gapic_v1
36from google.api_core import retry as retries
37
38from google.cloud import firestore_v1
39from google.cloud.firestore_v1 import aggregation, transaction
40from google.cloud.firestore_v1.query_results import QueryResultsList
41from google.cloud.firestore_v1.base_document import (
42 DocumentSnapshot,
43)
44from google.cloud.firestore_v1.base_query import (
45 BaseCollectionGroup,
46 BaseQuery,
47 QueryPartition,
48 _collection_group_query_response_to_snapshot,
49 _enum_from_direction,
50 _query_response_to_snapshot,
51)
52from google.cloud.firestore_v1.stream_generator import StreamGenerator
53from google.cloud.firestore_v1.vector import Vector
54from google.cloud.firestore_v1.vector_query import VectorQuery
55from google.cloud.firestore_v1.watch import Watch
56
57if TYPE_CHECKING: # pragma: NO COVER
58 from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
59 from google.cloud.firestore_v1.field_path import FieldPath
60 from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions
61
62 import datetime
63
64
class Query(BaseQuery):
    """Represents a query to the Firestore API.

    Instances of this class are considered immutable: all methods that
    would modify an instance instead return a new instance.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
        projection (Optional[:class:`google.cloud.firestore_v1.query.StructuredQuery.Projection`]):
            A projection of document fields to limit the query results to.
        field_filters (Optional[Tuple[:class:`google.cloud.firestore_v1.query.StructuredQuery.FieldFilter`, ...]]):
            The filters to be applied in the query.
        orders (Optional[Tuple[:class:`google.cloud.firestore_v1.query.StructuredQuery.Order`, ...]]):
            The "order by" entries to use in the query.
        limit (Optional[int]):
            The maximum number of documents the query is allowed to return.
        limit_to_last (Optional[bool]):
            When true, :meth:`get` fetches up to ``limit`` results from the
            end of the ordered result set instead of from its start.
        offset (Optional[int]):
            The number of results to skip.
        start_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * an ``after`` flag

            The fields and the flag combine to form a cursor used as
            a starting point in a query result set. If the ``after``
            flag is :data:`True`, the results will start just after any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        end_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * a ``before`` flag

            The fields and the flag combine to form a cursor used as
            an ending point in a query result set. If the ``before``
            flag is :data:`True`, the results will end just before any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        all_descendants (Optional[bool]):
            When false, selects only collections that are immediate children
            of the `parent` specified in the containing `RunQueryRequest`.
            When true, selects all descendant collections.
        recursive (Optional[bool]):
            Passed through unchanged to the base query class.
            NOTE(review): its exact semantics are defined in ``BaseQuery``,
            not in this module -- confirm there.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=False,
        recursive=False,
    ) -> None:
        # All state lives on the base class; this constructor only pins the
        # synchronous class's signature and defaults.
        super().__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
            recursive=recursive,
        )

    def get(
        self,
        transaction=None,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> QueryResultsList[DocumentSnapshot]:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and returns a list of documents
        returned in the stream of ``RunQueryResponse`` messages.

        When the query was built with ``limit_to_last``, the order
        directions are reversed for the duration of the call so that the
        backend returns results from the end of the result set; the fetched
        documents are then reversed back into the requested order. The
        query's own state is restored before this method returns, so
        repeated calls behave identically.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned list.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.

        Returns:
            QueryResultsList[DocumentSnapshot]: The documents in the collection
            that match this query.
        """
        is_limited_to_last = self._limit_to_last

        def _flip_order_directions() -> None:
            # Swap ASCENDING <-> DESCENDING on every order entry. Applying
            # this twice returns every entry to its starting direction.
            for order in self._orders:
                order.direction = _enum_from_direction(
                    self.DESCENDING
                    if order.direction.name == self.ASCENDING
                    else self.ASCENDING
                )

        if is_limited_to_last:
            # In order to fetch up to `self._limit` results from the end of
            # the query, flip the defined ordering so the backend starts from
            # the end and returns up to `self._limit` results.
            _flip_order_directions()
            self._limit_to_last = False

        try:
            result = self.stream(
                transaction=transaction,
                retry=retry,
                timeout=timeout,
                explain_options=explain_options,
                read_time=read_time,
            )
            # Drain the stream while the flipped ordering is still in place.
            result_list = list(result)
        finally:
            if is_limited_to_last:
                # Undo the temporary flip so this (nominally immutable)
                # query is left exactly as the caller built it, even if the
                # RPC raised.
                _flip_order_directions()
                self._limit_to_last = is_limited_to_last

        if is_limited_to_last:
            # Results arrived in flipped order; put them back in the order
            # the caller asked for.
            result_list.reverse()

        explain_metrics: ExplainMetrics | None = None
        if explain_options is not None:
            explain_metrics = result.get_explain_metrics()

        return QueryResultsList(result_list, explain_options, explain_metrics)

    def _chunkify(
        self, chunk_size: int
    ) -> Generator[List[DocumentSnapshot], None, None]:
        """Yield the query results page by page.

        Each page is fetched with :meth:`get` on a copy of this query whose
        limit is the (possibly trimmed) chunk size and whose start cursor is
        placed after the last document of the previous page.

        Args:
            chunk_size (int): The desired number of documents per page.

        Yields:
            List[DocumentSnapshot]: The next page of results. The final page
            may be shorter than ``chunk_size`` (and may be empty).
        """
        max_to_return: Optional[int] = self._limit
        num_returned: int = 0
        original: Query = self._copy()
        last_document: Optional[DocumentSnapshot] = None

        while True:
            # Optionally trim `chunk_size` down to honor a previously
            # applied limit as set by `self.limit()`.
            page_size: int = original._resolve_chunk_size(num_returned, chunk_size)

            # Apply the optionally pruned limit and, when we are past the
            # first page, the cursor.
            page_query = original.limit(page_size)
            if last_document:
                page_query = page_query.start_after(last_document)

            snapshots = page_query.get()
            if snapshots:
                last_document = snapshots[-1]
            num_returned += len(snapshots)

            yield snapshots

            # Terminate the iterator if we have reached either of two end
            # conditions:
            #   1. There are no more documents, or
            #   2. We have reached the desired overall limit
            if len(snapshots) < page_size or (
                max_to_return and num_returned >= max_to_return
            ):
                return

    def _get_stream_iterator(
        self, transaction, retry, timeout, explain_options=None, read_time=None
    ):
        """Helper method for :meth:`stream`.

        Builds the ``RunQuery`` request via ``_prep_stream`` and issues it.

        Returns:
            tuple: ``(response_iterator, expected_prefix)`` where
            ``response_iterator`` is whatever ``run_query`` returned and
            ``expected_prefix`` is the prefix produced by ``_prep_stream``.
        """
        request, expected_prefix, kwargs = self._prep_stream(
            transaction, retry, timeout, explain_options, read_time
        )

        response_iterator = self._client._firestore_api.run_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

        return response_iterator, expected_prefix

    def _retry_query_after_exception(self, exc, retry, transaction):
        """Helper method for :meth:`stream`.

        Decide whether a stream that failed with ``exc`` should be resumed.

        Args:
            exc (Exception): The error raised while reading the stream.
            retry: The ``retry`` argument given to :meth:`stream`. May be a
                retry object, ``gapic_v1.method.DEFAULT`` or ``None``.
            transaction: The transaction the query runs in, if any.

        Returns:
            bool: ``True`` if the query should be re-issued, else ``False``.
        """
        if transaction is not None:
            # No snapshot-based retry inside a transaction.
            return False

        if retry is gapic_v1.method.DEFAULT:
            # Fall back to the retry policy configured on the transport's
            # wrapped ``run_query`` callable.
            transport = self._client._firestore_api._transport
            retry = transport.run_query._retry

        if retry is None:
            # Retries are disabled; calling ``_predicate`` on ``None`` would
            # raise AttributeError and hide the original API error.
            return False

        return retry._predicate(exc)

    def find_nearest(
        self,
        vector_field: str,
        query_vector: Union[Vector, Sequence[float]],
        limit: int,
        distance_measure: DistanceMeasure,
        *,
        distance_result_field: Optional[str] = None,
        distance_threshold: Optional[float] = None,
    ) -> VectorQuery:
        """
        Finds the closest vector embeddings to the given query vector.

        Args:
            vector_field (str): An indexed vector field to search upon. Only documents which contain
                vectors whose dimensionality match the query_vector can be returned.
            query_vector (Vector | Sequence[float]): The query vector that we are searching on. Must be a vector of no more
                than 2048 dimensions.
            limit (int): The number of nearest neighbors to return. Must be a positive integer of no more than 1000.
            distance_measure (:class:`DistanceMeasure`): The Distance Measure to use.
            distance_result_field (Optional[str]):
                Name of the field to output the result of the vector distance
                calculation. If unset then the distance will not be returned.
            distance_threshold (Optional[float]):
                A threshold for which no less similar documents will be returned.

        Returns:
            :class:`~firestore_v1.vector_query.VectorQuery`: the vector query.
        """
        return VectorQuery(self).find_nearest(
            vector_field=vector_field,
            query_vector=query_vector,
            limit=limit,
            distance_measure=distance_measure,
            distance_result_field=distance_result_field,
            distance_threshold=distance_threshold,
        )

    def count(self, alias: str | None = None) -> aggregation.AggregationQuery:
        """
        Adds a count over the query.

        :type alias: Optional[str]
        :param alias: Optional name of the field to store the result of the aggregation into.
            If not provided, Firestore will pick a default name following the format field_<incremental_id++>.
        """
        return aggregation.AggregationQuery(self).count(alias=alias)

    def sum(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> aggregation.AggregationQuery:
        """
        Adds a sum over the query.

        :type field_ref: Union[str, google.cloud.firestore_v1.field_path.FieldPath]
        :param field_ref: The field to aggregate across.

        :type alias: Optional[str]
        :param alias: Optional name of the field to store the result of the aggregation into.
            If not provided, Firestore will pick a default name following the format field_<incremental_id++>.
        """
        return aggregation.AggregationQuery(self).sum(field_ref, alias=alias)

    def avg(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> aggregation.AggregationQuery:
        """
        Adds an avg over the query.

        :type field_ref: Union[str, google.cloud.firestore_v1.field_path.FieldPath]
        :param field_ref: The field to aggregate across.

        :type alias: Optional[str]
        :param alias: Optional name of the field to store the result of the aggregation into.
            If not provided, Firestore will pick a default name following the format field_<incremental_id++>.
        """
        return aggregation.AggregationQuery(self).avg(field_ref, alias=alias)

    def _make_stream(
        self,
        transaction: Optional[transaction.Transaction] = None,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> Generator[DocumentSnapshot, Any, Optional[ExplainMetrics]]:
        """Internal method for stream(). Read the documents in the collection
        that match this query.

        This sends a ``RunQuery`` RPC and then returns a generator which
        consumes each document returned in the stream of ``RunQueryResponse``
        messages.

        If reading the stream fails with an API error that
        :meth:`_retry_query_after_exception` accepts, the query is re-issued
        starting after the last document already yielded (or from the
        beginning when nothing has been yielded yet).

        .. note::

           The underlying stream of responses will time out after
           the ``max_rpc_timeout_millis`` value set in the GAPIC
           client configuration for the ``RunQuery`` API. Snapshots
           not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that the query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.

        Yields:
            DocumentSnapshot:
                The next document that fulfills the query.

        Returns:
            ([google.cloud.firestore_v1.types.query_profile.ExplainMetrics | None]):
            The results of query profiling, if received from the service.
        """
        metrics: ExplainMetrics | None = None

        response_iterator, expected_prefix = self._get_stream_iterator(
            transaction,
            retry,
            timeout,
            explain_options,
            read_time,
        )

        last_snapshot = None

        while True:
            try:
                response = next(response_iterator, None)
            except exceptions.GoogleAPICallError as exc:
                if not self._retry_query_after_exception(exc, retry, transaction):
                    raise

                # Resume right after the last document handed to the caller.
                # If the failure happened before any document was yielded
                # there is no cursor to resume from, so re-run this query
                # unchanged instead of calling ``start_after(None)``.
                # NOTE(review): the resumed query keeps this query's limit
                # and offset as-is -- confirm that cannot over-return after
                # a mid-stream retry.
                if last_snapshot is None:
                    resume_query = self
                else:
                    resume_query = self.start_after(last_snapshot)

                # Forward ``explain_options`` too, so profiling metrics are
                # still requested on the resumed stream.
                response_iterator, _ = resume_query._get_stream_iterator(
                    transaction,
                    retry,
                    timeout,
                    explain_options,
                    read_time,
                )
                continue

            if response is None:  # EOI
                break

            # Keep the first non-empty metrics payload seen on the stream.
            if metrics is None and response.explain_metrics:
                metrics = response.explain_metrics

            if self._all_descendants:
                snapshot = _collection_group_query_response_to_snapshot(
                    response, self._parent
                )
            else:
                snapshot = _query_response_to_snapshot(
                    response, self._parent, expected_prefix
                )

            # Responses that do not convert to a snapshot are skipped.
            if snapshot is not None:
                last_snapshot = snapshot
                yield snapshot

        return metrics

    def stream(
        self,
        transaction: transaction.Transaction | None = None,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> StreamGenerator[DocumentSnapshot]:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and then returns a generator which
        consumes each document returned in the stream of ``RunQueryResponse``
        messages.

        .. note::

           The underlying stream of responses will time out after
           the ``max_rpc_timeout_millis`` value set in the GAPIC
           client configuration for the ``RunQuery`` API. Snapshots
           not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.

        Returns:
            `StreamGenerator[DocumentSnapshot]`: A generator of the query results.
        """
        inner_generator = self._make_stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        return StreamGenerator(inner_generator, explain_options)

    def on_snapshot(self, callback: Callable) -> Watch:
        """Monitor the documents in this collection that match this query.

        This starts a watch on this query using a background thread. The
        provided callback is run on the snapshot of the documents.

        Args:
            callback(Callable[[:class:`~google.cloud.firestore.query.QuerySnapshot`], NoneType]):
                a callback to run when a change occurs.

        Returns:
            Watch: The watch created by ``Watch.for_query`` for this query.

        Example:

        .. code-block:: python

            from google.cloud import firestore_v1

            db = firestore_v1.Client()
            query_ref = db.collection(u'users').where("user", "==", u'Ada')

            def on_snapshot(docs, changes, read_time):
                for doc in docs:
                    print(u'{} => {}'.format(doc.id, doc.to_dict()))

            # Watch this query
            query_watch = query_ref.on_snapshot(on_snapshot)

            # Terminate this watch
            query_watch.unsubscribe()
        """
        return Watch.for_query(self, callback, DocumentSnapshot)

    @staticmethod
    def _get_collection_reference_class() -> (
        Type["firestore_v1.collection.CollectionReference"]
    ):
        """Return the synchronous ``CollectionReference`` class."""
        # Imported here rather than at module top.
        # NOTE(review): presumably to avoid a circular import with the
        # collection module -- confirm.
        from google.cloud.firestore_v1.collection import CollectionReference

        return CollectionReference
562
563
class CollectionGroup(Query, BaseCollectionGroup):
    """Represents a Collection Group in the Firestore API.

    A specialization of :class:`.Query` covering every document in the
    database that lives in a collection or subcollection of the given
    parent. Unlike :class:`.Query`, ``all_descendants`` defaults to
    :data:`True`.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=True,
        recursive=False,
    ) -> None:
        # Nothing is stored here; every argument is handed to the parent
        # initializer by keyword.
        super().__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
            recursive=recursive,
        )

    @staticmethod
    def _get_query_class():
        """Return the plain :class:`Query` class."""
        return Query

    def get_partitions(
        self,
        partition_count,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        read_time: Optional[datetime.datetime] = None,
    ) -> Generator[QueryPartition, None, None]:
        """Partition a query for parallelization.

        Asks the service for partition cursors, i.e. split points that can
        serve as start/end bounds when running the query in parallel, and
        turns them into consecutive :class:`QueryPartition` objects.

        Args:
            partition_count (int): The desired maximum number of partition points. The
                number must be strictly positive. The actual number of partitions
                returned may be fewer.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.

        Yields:
            QueryPartition: One partition per returned cursor, bounded by the
            previous cursor and that cursor, followed by a final partition
            that starts at the last cursor and has no end bound.
        """
        request, kwargs = self._prep_get_partitions(
            partition_count, retry, timeout, read_time
        )
        pager = self._client._firestore_api.partition_query(
            request=request, metadata=self._client._rpc_metadata, **kwargs
        )

        # The first partition has no lower bound.
        lower_bound = None
        for cursor_pb in pager:
            # Each cursor's first value holds a document reference path; turn
            # it into a document reference to use as the split point.
            upper_bound = self._client.document(cursor_pb.values[0].reference_value)
            yield QueryPartition(self, lower_bound, upper_bound)
            # The next partition begins where this one ended.
            lower_bound = upper_bound

        # Trailing partition: open-ended on the upper side.
        yield QueryPartition(self, lower_bound, None)