1# Copyright 2017 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for representing queries for the Google Cloud Firestore API.
16
17A :class:`~google.cloud.firestore_v1.query.Query` can be created directly from
18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
19a more common way to create a query than direct usage of the constructor.
20"""
21from __future__ import annotations
22
23import abc
24import copy
25import math
26import warnings
27
28from typing import (
29 TYPE_CHECKING,
30 Any,
31 Coroutine,
32 Dict,
33 Iterable,
34 List,
35 Optional,
36 Sequence,
37 Tuple,
38 Type,
39 Union,
40 TypeVar,
41)
42
43from google.api_core import retry as retries
44from google.protobuf import wrappers_pb2
45
46from google.cloud import firestore_v1
47from google.cloud.firestore_v1 import _helpers, document
48from google.cloud.firestore_v1 import field_path as field_path_module
49from google.cloud.firestore_v1 import transforms
50
51# Types needed only for Type Hints
52from google.cloud.firestore_v1.base_document import DocumentSnapshot
53from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
54from google.cloud.firestore_v1.order import Order
55from google.cloud.firestore_v1.types import (
56 Cursor,
57 RunQueryResponse,
58 StructuredQuery,
59 query,
60)
61from google.cloud.firestore_v1.vector import Vector
62from google.cloud.firestore_v1 import pipeline_expressions
63
64if TYPE_CHECKING: # pragma: NO COVER
65 from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
66 from google.cloud.firestore_v1.field_path import FieldPath
67 from google.cloud.firestore_v1.query_profile import ExplainOptions
68 from google.cloud.firestore_v1.query_results import QueryResultsList
69 from google.cloud.firestore_v1.stream_generator import StreamGenerator
70 from google.cloud.firestore_v1.pipeline_source import PipelineSource
71
72 import datetime
73
74
75_BAD_DIR_STRING: str
76_BAD_OP_NAN: str
77_BAD_OP_NULL: str
78_BAD_OP_STRING: str
79_COMPARISON_OPERATORS: Dict[str, Any]
80_EQ_OP: str
81_NEQ_OP: str
82_INVALID_CURSOR_TRANSFORM: str
83_INVALID_WHERE_TRANSFORM: str
84_MISMATCH_CURSOR_W_ORDER_BY: str
85_MISSING_ORDER_BY: str
86_NO_ORDERS_FOR_CURSOR: str
87_operator_enum: Any
88
89
90_EQ_OP = "=="
91_NEQ_OP = "!="
92_operator_enum = StructuredQuery.FieldFilter.Operator
93_COMPARISON_OPERATORS = {
94 "<": _operator_enum.LESS_THAN,
95 "<=": _operator_enum.LESS_THAN_OR_EQUAL,
96 _EQ_OP: _operator_enum.EQUAL,
97 _NEQ_OP: _operator_enum.NOT_EQUAL,
98 ">=": _operator_enum.GREATER_THAN_OR_EQUAL,
99 ">": _operator_enum.GREATER_THAN,
100 "array_contains": _operator_enum.ARRAY_CONTAINS,
101 "in": _operator_enum.IN,
102 "not-in": _operator_enum.NOT_IN,
103 "array_contains_any": _operator_enum.ARRAY_CONTAINS_ANY,
104}
105# set of operators that don't involve equlity comparisons
106# will be used in query normalization
107_INEQUALITY_OPERATORS = (
108 _operator_enum.LESS_THAN,
109 _operator_enum.LESS_THAN_OR_EQUAL,
110 _operator_enum.GREATER_THAN_OR_EQUAL,
111 _operator_enum.GREATER_THAN,
112 _operator_enum.NOT_EQUAL,
113 _operator_enum.NOT_IN,
114)
115_BAD_OP_STRING = "Operator string {!r} is invalid. Valid choices are: {}."
116_BAD_OP_NAN_NULL = 'Only equality ("==") or not-equal ("!=") filters can be used with None or NaN values'
117_INVALID_WHERE_TRANSFORM = "Transforms cannot be used as where values."
118_BAD_DIR_STRING = "Invalid direction {!r}. Must be one of {!r} or {!r}."
119_INVALID_CURSOR_TRANSFORM = "Transforms cannot be used as cursor values."
120_MISSING_ORDER_BY = (
121 'The "order by" field path {!r} is not present in the cursor data {!r}. '
122 "All fields sent to ``order_by()`` must be present in the fields "
123 "if passed to one of ``start_at()`` / ``start_after()`` / "
124 "``end_before()`` / ``end_at()`` to define a cursor."
125)
126
127_NO_ORDERS_FOR_CURSOR = (
128 "Attempting to create a cursor with no fields to order on. "
129 "When defining a cursor with one of ``start_at()`` / ``start_after()`` / "
130 "``end_before()`` / ``end_at()``, all fields in the cursor must "
131 "come from fields set in ``order_by()``."
132)
133_MISMATCH_CURSOR_W_ORDER_BY = "The cursor {!r} does not match the order fields {!r}."
134
135_not_passed = object()
136
137QueryType = TypeVar("QueryType", bound="BaseQuery")
138
139
140class BaseFilter(abc.ABC):
141 """Base class for Filters"""
142
143 @abc.abstractmethod
144 def _to_pb(self):
145 """Build the protobuf representation based on values in the filter"""
146
147
148def _validate_opation(op_string, value):
149 """
150 Given an input operator string (e.g, '!='), and a value (e.g. None),
151 ensure that the operator and value combination is valid, and return
152 an approproate new operator value. A new operator will be used if
153 the operaion is a comparison against Null or NaN
154
155 Args:
156 op_string (Optional[str]): the requested operator
157 value (Any): the value the operator is acting on
158 Returns:
159 str | StructuredQuery.UnaryFilter.Operator: operator to use in requests
160 Raises:
161 ValueError: if the operator and value combination is invalid
162 """
163 if value is None:
164 if op_string == _EQ_OP:
165 return StructuredQuery.UnaryFilter.Operator.IS_NULL
166 elif op_string == _NEQ_OP:
167 return StructuredQuery.UnaryFilter.Operator.IS_NOT_NULL
168 else:
169 raise ValueError(_BAD_OP_NAN_NULL)
170
171 elif _isnan(value):
172 if op_string == _EQ_OP:
173 return StructuredQuery.UnaryFilter.Operator.IS_NAN
174 elif op_string == _NEQ_OP:
175 return StructuredQuery.UnaryFilter.Operator.IS_NOT_NAN
176 else:
177 raise ValueError(_BAD_OP_NAN_NULL)
178 elif isinstance(value, (transforms.Sentinel, transforms._ValueList)):
179 raise ValueError(_INVALID_WHERE_TRANSFORM)
180 else:
181 return op_string
182
183
184class FieldFilter(BaseFilter):
185 """Class representation of a Field Filter."""
186
187 def __init__(self, field_path: str, op_string: str, value: Any | None = None):
188 self.field_path = field_path
189 self.value = value
190 self.op_string = _validate_opation(op_string, value)
191
192 def _to_pb(self):
193 """Returns the protobuf representation, either a StructuredQuery.UnaryFilter or a StructuredQuery.FieldFilter"""
194 if self.value is None or _isnan(self.value):
195 filter_pb = query.StructuredQuery.UnaryFilter(
196 field=query.StructuredQuery.FieldReference(field_path=self.field_path),
197 op=self.op_string,
198 )
199 else:
200 filter_pb = query.StructuredQuery.FieldFilter(
201 field=query.StructuredQuery.FieldReference(field_path=self.field_path),
202 op=_enum_from_op_string(self.op_string),
203 value=_helpers.encode_value(self.value),
204 )
205 return filter_pb
206
207
208class BaseCompositeFilter(BaseFilter):
209 """Base class for a Composite Filter. (either OR or AND)."""
210
211 def __init__(
212 self,
213 operator: int = StructuredQuery.CompositeFilter.Operator.OPERATOR_UNSPECIFIED,
214 filters: list[BaseFilter] | None = None,
215 ):
216 self.operator = operator
217 if filters is None:
218 self.filters = []
219 else:
220 self.filters = filters
221
222 def __repr__(self):
223 repr = f"op: {self.operator}\nFilters:"
224 for filter in self.filters:
225 repr += f"\n\t{filter}"
226 return repr
227
228 def _to_pb(self):
229 """Build the protobuf representation based on values in the Composite Filter."""
230 filter_pb = StructuredQuery.CompositeFilter(
231 op=self.operator,
232 )
233 for filter in self.filters:
234 if isinstance(filter, BaseCompositeFilter):
235 fb = query.StructuredQuery.Filter(composite_filter=filter._to_pb())
236 else:
237 fb = _filter_pb(filter._to_pb())
238 filter_pb.filters.append(fb)
239
240 return filter_pb
241
242
243class Or(BaseCompositeFilter):
244 """Class representation of an OR Filter."""
245
246 def __init__(self, filters: list[BaseFilter]):
247 super().__init__(
248 operator=StructuredQuery.CompositeFilter.Operator.OR, filters=filters
249 )
250
251
252class And(BaseCompositeFilter):
253 """Class representation of an AND Filter."""
254
255 def __init__(self, filters: list[BaseFilter]):
256 super().__init__(
257 operator=StructuredQuery.CompositeFilter.Operator.AND, filters=filters
258 )
259
260
261class BaseQuery(object):
262 """Represents a query to the Firestore API.
263
264 Instances of this class are considered immutable: all methods that
265 would modify an instance instead return a new instance.
266
267 Args:
268 parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
269 The collection that this query applies to.
270 projection (Optional[:class:`google.cloud.firestore_v1.\
271 query.StructuredQuery.Projection`]):
272 A projection of document fields to limit the query results to.
273 field_filters (Optional[Tuple[:class:`google.cloud.firestore_v1.\
274 query.StructuredQuery.FieldFilter`, ...]]):
275 The filters to be applied in the query.
276 orders (Optional[Tuple[:class:`google.cloud.firestore_v1.\
277 query.StructuredQuery.Order`, ...]]):
278 The "order by" entries to use in the query.
279 limit (Optional[int]):
280 The maximum number of documents the query is allowed to return.
281 limit_to_last (Optional[bool]):
282 Denotes whether a provided limit is applied to the end of the result set.
283 offset (Optional[int]):
284 The number of results to skip.
285 start_at (Optional[Tuple[dict, bool]]):
286 Two-tuple of :
287
288 * a mapping of fields. Any field that is present in this mapping
289 must also be present in ``orders``
290 * an ``after`` flag
291
292 The fields and the flag combine to form a cursor used as
293 a starting point in a query result set. If the ``after``
294 flag is :data:`True`, the results will start just after any
295 documents which have fields matching the cursor, otherwise
296 any matching documents will be included in the result set.
297 When the query is formed, the document values
298 will be used in the order given by ``orders``.
299 end_at (Optional[Tuple[dict, bool]]):
300 Two-tuple of:
301
302 * a mapping of fields. Any field that is present in this mapping
303 must also be present in ``orders``
304 * a ``before`` flag
305
306 The fields and the flag combine to form a cursor used as
307 an ending point in a query result set. If the ``before``
308 flag is :data:`True`, the results will end just before any
309 documents which have fields matching the cursor, otherwise
310 any matching documents will be included in the result set.
311 When the query is formed, the document values
312 will be used in the order given by ``orders``.
313 all_descendants (Optional[bool]):
314 When false, selects only collections that are immediate children
315 of the `parent` specified in the containing `RunQueryRequest`.
316 When true, selects all descendant collections.
317 recursive (Optional[bool]):
318 When true, returns all documents and all documents in any subcollections
319 below them. Defaults to false.
320 """
321
322 ASCENDING = "ASCENDING"
323 """str: Sort query results in ascending order on a field."""
324 DESCENDING = "DESCENDING"
325 """str: Sort query results in descending order on a field."""
326
327 def __init__(
328 self,
329 parent,
330 projection=None,
331 field_filters=(),
332 orders=(),
333 limit=None,
334 limit_to_last=False,
335 offset=None,
336 start_at=None,
337 end_at=None,
338 all_descendants=False,
339 recursive=False,
340 ) -> None:
341 self._parent = parent
342 self._projection = projection
343 self._field_filters = field_filters
344 self._orders = orders
345 self._limit = limit
346 self._limit_to_last = limit_to_last
347 self._offset = offset
348 self._start_at = start_at
349 self._end_at = end_at
350 self._all_descendants = all_descendants
351 self._recursive = recursive
352
353 def __eq__(self, other):
354 if not isinstance(other, self.__class__):
355 return NotImplemented
356 return (
357 self._parent == other._parent
358 and self._projection == other._projection
359 and self._field_filters == other._field_filters
360 and self._orders == other._orders
361 and self._limit == other._limit
362 and self._limit_to_last == other._limit_to_last
363 and self._offset == other._offset
364 and self._start_at == other._start_at
365 and self._end_at == other._end_at
366 and self._all_descendants == other._all_descendants
367 )
368
369 @property
370 def _client(self):
371 """The client of the parent collection.
372
373 Returns:
374 :class:`~google.cloud.firestore_v1.client.Client`:
375 The client that owns this query.
376 """
377 return self._parent._client
378
379 def select(self: QueryType, field_paths: Iterable[str]) -> QueryType:
380 """Project documents matching query to a limited set of fields.
381
382 See :meth:`~google.cloud.firestore_v1.client.Client.field_path` for
383 more information on **field paths**.
384
385 If the current query already has a projection set (i.e. has already
386 called :meth:`~google.cloud.firestore_v1.query.Query.select`), this
387 will overwrite it.
388
389 Args:
390 field_paths (Iterable[str, ...]): An iterable of field paths
391 (``.``-delimited list of field names) to use as a projection
392 of document fields in the query results.
393
394 Returns:
395 :class:`~google.cloud.firestore_v1.query.Query`:
396 A "projected" query. Acts as a copy of the current query,
397 modified with the newly added projection.
398 Raises:
399 ValueError: If any ``field_path`` is invalid.
400 """
401 field_paths = list(field_paths)
402 for field_path in field_paths:
403 field_path_module.split_field_path(field_path)
404
405 new_projection = query.StructuredQuery.Projection(
406 fields=[
407 query.StructuredQuery.FieldReference(field_path=field_path)
408 for field_path in field_paths
409 ]
410 )
411 return self._copy(projection=new_projection)
412
413 def _copy(
414 self: QueryType,
415 *,
416 projection: Optional[query.StructuredQuery.Projection] | object = _not_passed,
417 field_filters: Optional[Tuple[query.StructuredQuery.FieldFilter]]
418 | object = _not_passed,
419 orders: Optional[Tuple[query.StructuredQuery.Order]] | object = _not_passed,
420 limit: Optional[int] | object = _not_passed,
421 limit_to_last: Optional[bool] | object = _not_passed,
422 offset: Optional[int] | object = _not_passed,
423 start_at: Optional[Tuple[dict, bool]] | object = _not_passed,
424 end_at: Optional[Tuple[dict, bool]] | object = _not_passed,
425 all_descendants: Optional[bool] | object = _not_passed,
426 recursive: Optional[bool] | object = _not_passed,
427 ) -> QueryType:
428 return self.__class__(
429 self._parent,
430 projection=self._evaluate_param(projection, self._projection),
431 field_filters=self._evaluate_param(field_filters, self._field_filters),
432 orders=self._evaluate_param(orders, self._orders),
433 limit=self._evaluate_param(limit, self._limit),
434 limit_to_last=self._evaluate_param(limit_to_last, self._limit_to_last),
435 offset=self._evaluate_param(offset, self._offset),
436 start_at=self._evaluate_param(start_at, self._start_at),
437 end_at=self._evaluate_param(end_at, self._end_at),
438 all_descendants=self._evaluate_param(
439 all_descendants, self._all_descendants
440 ),
441 recursive=self._evaluate_param(recursive, self._recursive),
442 )
443
444 def _evaluate_param(self, value, fallback_value):
445 """Helper which allows `None` to be passed into `copy` and be set on the
446 copy instead of being misinterpreted as an unpassed parameter."""
447 return value if value is not _not_passed else fallback_value
448
449 def where(
450 self: QueryType,
451 field_path: Optional[str] = None,
452 op_string: Optional[str] = None,
453 value=None,
454 *,
455 filter=None,
456 ) -> QueryType:
457 """Filter the query on a field.
458
459 See :meth:`~google.cloud.firestore_v1.client.Client.field_path` for
460 more information on **field paths**.
461
462 Returns a new :class:`~google.cloud.firestore_v1.query.Query` that
463 filters on a specific field path, according to an operation (e.g.
464 ``==`` or "equals") and a particular value to be paired with that
465 operation.
466
467 Args:
468 field_path (Optional[str]): A field path (``.``-delimited list of
469 field names) for the field to filter on.
470 op_string (Optional[str]): A comparison operation in the form of a string.
471 Acceptable values are ``<``, ``<=``, ``==``, ``!=``, ``>=``, ``>``,
472 ``in``, ``not-in``, ``array_contains`` and ``array_contains_any``.
473 value (Any): The value to compare the field against in the filter.
474 If ``value`` is :data:`None` or a NaN, then ``==`` is the only
475 allowed operation.
476
477 Returns:
478 :class:`~google.cloud.firestore_v1.query.Query`:
479 A filtered query. Acts as a copy of the current query,
480 modified with the newly added filter.
481
482 Raises:
483 ValueError: If
484 * ``field_path`` is invalid.
485 * If ``value`` is a NaN or :data:`None` and ``op_string`` is not ``==``.
486 * FieldFilter was passed without using the filter keyword argument.
487 * `And` or `Or` was passed without using the filter keyword argument .
488 * Both the positional arguments and the keyword argument `filter` were passed.
489 """
490
491 if isinstance(field_path, FieldFilter):
492 raise ValueError(
493 "FieldFilter object must be passed using keyword argument 'filter'"
494 )
495 if isinstance(field_path, BaseCompositeFilter):
496 raise ValueError(
497 "'Or' and 'And' objects must be passed using keyword argument 'filter'"
498 )
499
500 field_path_module.split_field_path(field_path)
501 new_filters = self._field_filters
502
503 if field_path is not None and op_string is not None:
504 if filter is not None:
505 raise ValueError(
506 "Can't pass in both the positional arguments and 'filter' at the same time"
507 )
508 warnings.warn(
509 "Detected filter using positional arguments. Prefer using the 'filter' keyword argument instead.",
510 UserWarning,
511 stacklevel=2,
512 )
513 op = _validate_opation(op_string, value)
514 if isinstance(op, StructuredQuery.UnaryFilter.Operator):
515 filter_pb = query.StructuredQuery.UnaryFilter(
516 field=query.StructuredQuery.FieldReference(field_path=field_path),
517 op=op,
518 )
519 else:
520 filter_pb = query.StructuredQuery.FieldFilter(
521 field=query.StructuredQuery.FieldReference(field_path=field_path),
522 op=_enum_from_op_string(op_string),
523 value=_helpers.encode_value(value),
524 )
525
526 new_filters += (filter_pb,)
527 elif isinstance(filter, BaseFilter):
528 new_filters += (filter._to_pb(),)
529 else:
530 raise ValueError(
531 "Filter must be provided through positional arguments or the 'filter' keyword argument."
532 )
533 return self._copy(field_filters=new_filters)
534
535 @staticmethod
536 def _make_order(field_path, direction) -> StructuredQuery.Order:
537 """Helper for :meth:`order_by`."""
538 return query.StructuredQuery.Order(
539 field=query.StructuredQuery.FieldReference(field_path=field_path),
540 direction=_enum_from_direction(direction),
541 )
542
543 def order_by(
544 self: QueryType, field_path: str, direction: str = ASCENDING
545 ) -> QueryType:
546 """Modify the query to add an order clause on a specific field.
547
548 See :meth:`~google.cloud.firestore_v1.client.Client.field_path` for
549 more information on **field paths**.
550
551 Successive :meth:`~google.cloud.firestore_v1.query.Query.order_by`
552 calls will further refine the ordering of results returned by the query
553 (i.e. the new "order by" fields will be added to existing ones).
554
555 Args:
556 field_path (str): A field path (``.``-delimited list of
557 field names) on which to order the query results.
558 direction (Optional[str]): The direction to order by. Must be one
559 of :attr:`ASCENDING` or :attr:`DESCENDING`, defaults to
560 :attr:`ASCENDING`.
561
562 Returns:
563 :class:`~google.cloud.firestore_v1.query.Query`:
564 An ordered query. Acts as a copy of the current query, modified
565 with the newly added "order by" constraint.
566
567 Raises:
568 ValueError: If ``field_path`` is invalid.
569 ValueError: If ``direction`` is not one of :attr:`ASCENDING` or
570 :attr:`DESCENDING`.
571 """
572 field_path_module.split_field_path(field_path) # raises
573
574 order_pb = self._make_order(field_path, direction)
575
576 new_orders = self._orders + (order_pb,)
577 return self._copy(orders=new_orders)
578
579 def limit(self: QueryType, count: int) -> QueryType:
580 """Limit a query to return at most `count` matching results.
581
582 If the current query already has a `limit` set, this will override it.
583
584 .. note::
585 `limit` and `limit_to_last` are mutually exclusive.
586 Setting `limit` will drop previously set `limit_to_last`.
587
588 Args:
589 count (int): Maximum number of documents to return that match
590 the query.
591 Returns:
592 :class:`~google.cloud.firestore_v1.query.Query`:
593 A limited query. Acts as a copy of the current query, modified
594 with the newly added "limit" filter.
595 """
596 return self._copy(limit=count, limit_to_last=False)
597
598 def limit_to_last(self: QueryType, count: int) -> QueryType:
599 """Limit a query to return the last `count` matching results.
600 If the current query already has a `limit_to_last`
601 set, this will override it.
602
603 .. note::
604 `limit` and `limit_to_last` are mutually exclusive.
605 Setting `limit_to_last` will drop previously set `limit`.
606
607 Args:
608 count (int): Maximum number of documents to return that match
609 the query.
610 Returns:
611 :class:`~google.cloud.firestore_v1.query.Query`:
612 A limited query. Acts as a copy of the current query, modified
613 with the newly added "limit" filter.
614 """
615 return self._copy(limit=count, limit_to_last=True)
616
617 def _resolve_chunk_size(self, num_loaded: int, chunk_size: int) -> int:
618 """Utility function for chunkify."""
619 if self._limit is not None and (num_loaded + chunk_size) > self._limit:
620 return max(self._limit - num_loaded, 0)
621 return chunk_size
622
623 def offset(self: QueryType, num_to_skip: int) -> QueryType:
624 """Skip to an offset in a query.
625
626 If the current query already has specified an offset, this will
627 overwrite it.
628
629 Args:
630 num_to_skip (int): The number of results to skip at the beginning
631 of query results. (Must be non-negative.)
632
633 Returns:
634 :class:`~google.cloud.firestore_v1.query.Query`:
635 An offset query. Acts as a copy of the current query, modified
636 with the newly added "offset" field.
637 """
638 return self._copy(offset=num_to_skip)
639
640 def _check_snapshot(self, document_snapshot) -> None:
641 """Validate local snapshots for non-collection-group queries.
642
643 Raises:
644 ValueError: for non-collection-group queries, if the snapshot
645 is from a different collection.
646 """
647 if self._all_descendants:
648 return
649
650 if document_snapshot.reference._path[:-1] != self._parent._path:
651 raise ValueError("Cannot use snapshot from another collection as a cursor.")
652
653 def _cursor_helper(
654 self: QueryType,
655 document_fields_or_snapshot: Union[DocumentSnapshot, dict, list, tuple, None],
656 before: bool,
657 start: bool,
658 ) -> QueryType:
659 """Set values to be used for a ``start_at`` or ``end_at`` cursor.
660
661 The values will later be used in a query protobuf.
662
663 When the query is sent to the server, the ``document_fields_or_snapshot`` will
664 be used in the order given by fields set by
665 :meth:`~google.cloud.firestore_v1.query.Query.order_by`.
666
667 Args:
668 document_fields_or_snapshot
669 (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
670 a document snapshot or a dictionary/list/tuple of fields
671 representing a query results cursor. A cursor is a collection
672 of values that represent a position in a query result set.
673 before (bool): Flag indicating if the document in
674 ``document_fields_or_snapshot`` should (:data:`False`) or
675 shouldn't (:data:`True`) be included in the result set.
676 start (Optional[bool]): determines if the cursor is a ``start_at``
677 cursor (:data:`True`) or an ``end_at`` cursor (:data:`False`).
678
679 Returns:
680 :class:`~google.cloud.firestore_v1.query.Query`:
681 A query with cursor. Acts as a copy of the current query, modified
682 with the newly added "start at" cursor.
683 """
684 if isinstance(document_fields_or_snapshot, tuple):
685 document_fields_or_snapshot = list(document_fields_or_snapshot)
686 elif isinstance(document_fields_or_snapshot, document.DocumentSnapshot):
687 self._check_snapshot(document_fields_or_snapshot)
688 else:
689 # NOTE: We copy so that the caller can't modify after calling.
690 document_fields_or_snapshot = copy.deepcopy(document_fields_or_snapshot)
691
692 cursor_pair = document_fields_or_snapshot, before
693 query_kwargs = {
694 "projection": self._projection,
695 "field_filters": self._field_filters,
696 "orders": self._orders,
697 "limit": self._limit,
698 "offset": self._offset,
699 "all_descendants": self._all_descendants,
700 }
701 if start:
702 query_kwargs["start_at"] = cursor_pair
703 query_kwargs["end_at"] = self._end_at
704 else:
705 query_kwargs["start_at"] = self._start_at
706 query_kwargs["end_at"] = cursor_pair
707
708 return self._copy(**query_kwargs)
709
710 def start_at(
711 self: QueryType,
712 document_fields_or_snapshot: Union[DocumentSnapshot, dict, list, tuple, None],
713 ) -> QueryType:
714 """Start query results at a particular document value.
715
716 The result set will **include** the document specified by
717 ``document_fields_or_snapshot``.
718
719 If the current query already has specified a start cursor -- either
720 via this method or
721 :meth:`~google.cloud.firestore_v1.query.Query.start_after` -- this
722 will overwrite it.
723
724 When the query is sent to the server, the ``document_fields`` will
725 be used in the order given by fields set by
726 :meth:`~google.cloud.firestore_v1.query.Query.order_by`.
727
728 Args:
729 document_fields_or_snapshot
730 (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
731 a document snapshot or a dictionary/list/tuple of fields
732 representing a query results cursor. A cursor is a collection
733 of values that represent a position in a query result set.
734
735 Returns:
736 :class:`~google.cloud.firestore_v1.query.Query`:
737 A query with cursor. Acts as
738 a copy of the current query, modified with the newly added
739 "start at" cursor.
740 """
741 return self._cursor_helper(document_fields_or_snapshot, before=True, start=True)
742
743 def start_after(
744 self: QueryType,
745 document_fields_or_snapshot: Union[DocumentSnapshot, dict, list, tuple, None],
746 ) -> QueryType:
747 """Start query results after a particular document value.
748
749 The result set will **exclude** the document specified by
750 ``document_fields_or_snapshot``.
751
752 If the current query already has specified a start cursor -- either
753 via this method or
754 :meth:`~google.cloud.firestore_v1.query.Query.start_at` -- this will
755 overwrite it.
756
757 When the query is sent to the server, the ``document_fields_or_snapshot`` will
758 be used in the order given by fields set by
759 :meth:`~google.cloud.firestore_v1.query.Query.order_by`.
760
761 Args:
762 document_fields_or_snapshot
763 (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
764 a document snapshot or a dictionary/list/tuple of fields
765 representing a query results cursor. A cursor is a collection
766 of values that represent a position in a query result set.
767
768 Returns:
769 :class:`~google.cloud.firestore_v1.query.Query`:
770 A query with cursor. Acts as a copy of the current query, modified
771 with the newly added "start after" cursor.
772 """
773 return self._cursor_helper(
774 document_fields_or_snapshot, before=False, start=True
775 )
776
777 def end_before(
778 self: QueryType,
779 document_fields_or_snapshot: Union[DocumentSnapshot, dict, list, tuple, None],
780 ) -> QueryType:
781 """End query results before a particular document value.
782
783 The result set will **exclude** the document specified by
784 ``document_fields_or_snapshot``.
785
786 If the current query already has specified an end cursor -- either
787 via this method or
788 :meth:`~google.cloud.firestore_v1.query.Query.end_at` -- this will
789 overwrite it.
790
791 When the query is sent to the server, the ``document_fields_or_snapshot`` will
792 be used in the order given by fields set by
793 :meth:`~google.cloud.firestore_v1.query.Query.order_by`.
794
795 Args:
796 document_fields_or_snapshot
797 (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
798 a document snapshot or a dictionary/list/tuple of fields
799 representing a query results cursor. A cursor is a collection
800 of values that represent a position in a query result set.
801
802 Returns:
803 :class:`~google.cloud.firestore_v1.query.Query`:
804 A query with cursor. Acts as a copy of the current query, modified
805 with the newly added "end before" cursor.
806 """
807 return self._cursor_helper(
808 document_fields_or_snapshot, before=True, start=False
809 )
810
811 def end_at(
812 self: QueryType,
813 document_fields_or_snapshot: Union[DocumentSnapshot, dict, list, tuple, None],
814 ) -> QueryType:
815 """End query results at a particular document value.
816
817 The result set will **include** the document specified by
818 ``document_fields_or_snapshot``.
819
820 If the current query already has specified an end cursor -- either
821 via this method or
822 :meth:`~google.cloud.firestore_v1.query.Query.end_before` -- this will
823 overwrite it.
824
825 When the query is sent to the server, the ``document_fields_or_snapshot`` will
826 be used in the order given by fields set by
827 :meth:`~google.cloud.firestore_v1.query.Query.order_by`.
828
829 Args:
830 document_fields_or_snapshot
831 (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
832 a document snapshot or a dictionary/list/tuple of fields
833 representing a query results cursor. A cursor is a collection
834 of values that represent a position in a query result set.
835
836 Returns:
837 :class:`~google.cloud.firestore_v1.query.Query`:
838 A query with cursor. Acts as a copy of the current query, modified
839 with the newly added "end at" cursor.
840 """
841 return self._cursor_helper(
842 document_fields_or_snapshot, before=False, start=False
843 )
844
845 def _filters_pb(self) -> Optional[StructuredQuery.Filter]:
846 """Convert all the filters into a single generic Filter protobuf.
847
848 This may be a lone field filter or unary filter, may be a composite
849 filter or may be :data:`None`.
850
851 Returns:
852 :class:`google.cloud.firestore_v1.types.StructuredQuery.Filter`:
853 A "generic" filter representing the current query's filters.
854 """
855 num_filters = len(self._field_filters)
856 if num_filters == 0:
857 return None
858 elif num_filters == 1:
859 filter = self._field_filters[0]
860 if isinstance(filter, query.StructuredQuery.CompositeFilter):
861 return query.StructuredQuery.Filter(composite_filter=filter)
862 else:
863 return _filter_pb(filter)
864 else:
865 composite_filter = query.StructuredQuery.CompositeFilter(
866 op=StructuredQuery.CompositeFilter.Operator.AND,
867 )
868 for filter_ in self._field_filters:
869 if isinstance(filter_, query.StructuredQuery.CompositeFilter):
870 composite_filter.filters.append(
871 query.StructuredQuery.Filter(composite_filter=filter_)
872 )
873 else:
874 composite_filter.filters.append(_filter_pb(filter_))
875
876 return query.StructuredQuery.Filter(composite_filter=composite_filter)
877
878 @staticmethod
879 def _normalize_projection(projection) -> StructuredQuery.Projection:
880 """Helper: convert field paths to message."""
881 if projection is not None:
882 fields = list(projection.fields)
883
884 if not fields:
885 field_ref = query.StructuredQuery.FieldReference(field_path="__name__")
886 return query.StructuredQuery.Projection(fields=[field_ref])
887
888 return projection
889
890 def _normalize_orders(self) -> list:
891 """Helper: adjust orders based on cursors, where clauses."""
892 orders = list(self._orders)
893 _has_snapshot_cursor = False
894
895 if self._start_at:
896 if isinstance(self._start_at[0], document.DocumentSnapshot):
897 _has_snapshot_cursor = True
898
899 if self._end_at:
900 if isinstance(self._end_at[0], document.DocumentSnapshot):
901 _has_snapshot_cursor = True
902 if _has_snapshot_cursor:
903 # added orders should use direction of last order
904 last_direction = orders[-1].direction if orders else BaseQuery.ASCENDING
905 order_keys = [order.field.field_path for order in orders]
906 for filter_ in self._field_filters:
907 # FieldFilter.Operator should not compare equal to
908 # UnaryFilter.Operator, but it does
909 if isinstance(filter_.op, StructuredQuery.FieldFilter.Operator):
910 field = filter_.field.field_path
911 # skip equality filters and filters on fields already ordered
912 if filter_.op in _INEQUALITY_OPERATORS and field not in order_keys:
913 orders.append(self._make_order(field, last_direction))
914 # add __name__ if not already in orders
915 if "__name__" not in [order.field.field_path for order in orders]:
916 orders.append(self._make_order("__name__", last_direction))
917
918 return orders
919
920 def _normalize_cursor(self, cursor, orders) -> Tuple[List, bool] | None:
921 """Helper: convert cursor to a list of values based on orders."""
922 if cursor is None:
923 return None
924
925 if not orders:
926 raise ValueError(_NO_ORDERS_FOR_CURSOR)
927
928 document_fields, before = cursor
929
930 order_keys = [order.field.field_path for order in orders]
931
932 if isinstance(document_fields, document.DocumentSnapshot):
933 snapshot = document_fields
934 document_fields = copy.deepcopy(snapshot._data)
935 document_fields["__name__"] = snapshot.reference
936
937 if isinstance(document_fields, dict):
938 # Transform to list using orders
939 values = []
940 data = document_fields
941
942 # It isn't required that all order by have a cursor.
943 # However, we need to be sure they are specified in order without gaps
944 for order_key in order_keys[: len(data)]:
945 try:
946 if order_key in data:
947 values.append(data[order_key])
948 else:
949 values.append(
950 field_path_module.get_nested_value(order_key, data)
951 )
952 except KeyError:
953 msg = _MISSING_ORDER_BY.format(order_key, data)
954 raise ValueError(msg)
955
956 document_fields = values
957
958 if document_fields and len(document_fields) > len(orders):
959 msg = _MISMATCH_CURSOR_W_ORDER_BY.format(document_fields, order_keys)
960 raise ValueError(msg)
961
962 _transform_bases = (transforms.Sentinel, transforms._ValueList)
963
964 for index, key_field in enumerate(zip(order_keys, document_fields)):
965 key, field = key_field
966
967 if isinstance(field, _transform_bases):
968 msg = _INVALID_CURSOR_TRANSFORM
969 raise ValueError(msg)
970
971 if key == "__name__" and isinstance(field, str):
972 document_fields[index] = self._parent.document(field)
973
974 return document_fields, before
975
976 def _to_protobuf(self) -> StructuredQuery:
977 """Convert the current query into the equivalent protobuf.
978
979 Returns:
980 :class:`google.cloud.firestore_v1.types.StructuredQuery`:
981 The query protobuf.
982 """
983 projection = self._normalize_projection(self._projection)
984 orders = self._normalize_orders()
985 start_at = self._normalize_cursor(self._start_at, orders)
986 end_at = self._normalize_cursor(self._end_at, orders)
987
988 query_kwargs = {
989 "select": projection,
990 "from_": [
991 query.StructuredQuery.CollectionSelector(
992 collection_id=self._parent.id, all_descendants=self._all_descendants
993 )
994 ],
995 "where": self._filters_pb(),
996 "order_by": orders,
997 "start_at": _cursor_pb(start_at),
998 "end_at": _cursor_pb(end_at),
999 }
1000 if self._offset is not None:
1001 query_kwargs["offset"] = self._offset
1002 if self._limit is not None:
1003 query_kwargs["limit"] = wrappers_pb2.Int32Value(value=self._limit)
1004 return query.StructuredQuery(**query_kwargs)
1005
1006 def find_nearest(
1007 self,
1008 vector_field: str,
1009 query_vector: Union[Vector, Sequence[float]],
1010 limit: int,
1011 distance_measure: DistanceMeasure,
1012 *,
1013 distance_result_field: Optional[str] = None,
1014 distance_threshold: Optional[float] = None,
1015 ):
1016 raise NotImplementedError
1017
1018 def count(
1019 self, alias: str | None = None
1020 ) -> Type["firestore_v1.base_aggregation.BaseAggregationQuery"]:
1021 raise NotImplementedError
1022
1023 def sum(
1024 self, field_ref: str | FieldPath, alias: str | None = None
1025 ) -> Type["firestore_v1.base_aggregation.BaseAggregationQuery"]:
1026 raise NotImplementedError
1027
1028 def avg(
1029 self, field_ref: str | FieldPath, alias: str | None = None
1030 ) -> Type["firestore_v1.base_aggregation.BaseAggregationQuery"]:
1031 raise NotImplementedError
1032
1033 def get(
1034 self,
1035 transaction=None,
1036 retry: Optional[retries.Retry] = None,
1037 timeout: Optional[float] = None,
1038 *,
1039 explain_options: Optional[ExplainOptions] = None,
1040 read_time: Optional[datetime.datetime] = None,
1041 ) -> (
1042 QueryResultsList[DocumentSnapshot]
1043 | Coroutine[Any, Any, QueryResultsList[DocumentSnapshot]]
1044 ):
1045 raise NotImplementedError
1046
1047 def _prep_stream(
1048 self,
1049 transaction=None,
1050 retry: retries.Retry | retries.AsyncRetry | object | None = None,
1051 timeout: Optional[float] = None,
1052 explain_options: Optional[ExplainOptions] = None,
1053 read_time: Optional[datetime.datetime] = None,
1054 ) -> Tuple[dict, str, dict]:
1055 """Shared setup for async / sync :meth:`stream`"""
1056 if self._limit_to_last:
1057 raise ValueError(
1058 "Query results for queries that include limit_to_last() "
1059 "constraints cannot be streamed. Use Query.get() instead."
1060 )
1061
1062 parent_path, expected_prefix = self._parent._parent_info()
1063 request = {
1064 "parent": parent_path,
1065 "structured_query": self._to_protobuf(),
1066 "transaction": _helpers.get_transaction_id(transaction),
1067 }
1068 if explain_options is not None:
1069 request["explain_options"] = explain_options._to_dict()
1070 if read_time is not None:
1071 request["read_time"] = read_time
1072 kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
1073
1074 return request, expected_prefix, kwargs
1075
1076 def stream(
1077 self,
1078 transaction=None,
1079 retry: Optional[retries.Retry] = None,
1080 timeout: Optional[float] = None,
1081 *,
1082 explain_options: Optional[ExplainOptions] = None,
1083 read_time: Optional[datetime.datetime] = None,
1084 ) -> (
1085 StreamGenerator[document.DocumentSnapshot]
1086 | AsyncStreamGenerator[DocumentSnapshot]
1087 ):
1088 raise NotImplementedError
1089
1090 def on_snapshot(self, callback):
1091 raise NotImplementedError
1092
1093 def recursive(self: QueryType) -> QueryType:
1094 """Returns a copy of this query whose iterator will yield all matching
1095 documents as well as each of their descendent subcollections and documents.
1096
1097 This differs from the `all_descendents` flag, which only returns descendents
1098 whose subcollection names match the parent collection's name. To return
1099 all descendents, regardless of their subcollection name, use this.
1100 """
1101 copied = self._copy(recursive=True, all_descendants=True)
1102 if copied._parent and copied._parent.id:
1103 original_collection_id = "/".join(copied._parent._path)
1104
1105 # Reset the parent to nothing so we can recurse through the entire
1106 # database. This is required to have
1107 # `CollectionSelector.collection_id` not override
1108 # `CollectionSelector.all_descendants`, which happens if both are
1109 # set.
1110 copied._parent = copied._get_collection_reference_class()("")
1111 copied._parent._client = self._parent._client
1112
1113 # But wait! We don't want to load the entire database; only the
1114 # collection the user originally specified. To accomplish that, we
1115 # add the following arcane filters.
1116
1117 REFERENCE_NAME_MIN_ID = "__id-9223372036854775808__"
1118 start_at = f"{original_collection_id}/{REFERENCE_NAME_MIN_ID}"
1119
1120 # The backend interprets this null character is flipping the filter
1121 # to mean the end of the range instead of the beginning.
1122 nullChar = "\0"
1123 end_at = f"{original_collection_id}{nullChar}/{REFERENCE_NAME_MIN_ID}"
1124
1125 copied = (
1126 copied.order_by(field_path_module.FieldPath.document_id())
1127 .start_at({field_path_module.FieldPath.document_id(): start_at})
1128 .end_at({field_path_module.FieldPath.document_id(): end_at})
1129 )
1130
1131 return copied
1132
1133 def _build_pipeline(self, source: "PipelineSource"):
1134 """
1135 Convert this query into a Pipeline
1136
1137 Queries containing a `cursor` or `limit_to_last` are not currently supported
1138
1139 Args:
1140 source: the PipelineSource to build the pipeline off of
1141 Raises:
1142 - NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
1143 Returns:
1144 a Pipeline representing the query
1145 """
1146 if self._all_descendants:
1147 ppl = source.collection_group(self._parent.id)
1148 else:
1149 ppl = source.collection(self._parent._path)
1150
1151 # Filters
1152 for filter_ in self._field_filters:
1153 ppl = ppl.where(
1154 pipeline_expressions.BooleanExpression._from_query_filter_pb(
1155 filter_, self._client
1156 )
1157 )
1158
1159 # Projections
1160 if self._projection and self._projection.fields:
1161 ppl = ppl.select(*[field.field_path for field in self._projection.fields])
1162
1163 # Orders
1164 orders = self._normalize_orders()
1165 if orders:
1166 exists = []
1167 orderings = []
1168 for order in orders:
1169 field = pipeline_expressions.Field.of(order.field.field_path)
1170 exists.append(field.exists())
1171 direction = (
1172 "ascending"
1173 if order.direction == StructuredQuery.Direction.ASCENDING
1174 else "descending"
1175 )
1176 orderings.append(pipeline_expressions.Ordering(field, direction))
1177
1178 # Add exists filters to match Query's implicit orderby semantics.
1179 if len(exists) == 1:
1180 ppl = ppl.where(exists[0])
1181 else:
1182 ppl = ppl.where(pipeline_expressions.And(*exists))
1183
1184 # Add sort orderings
1185 ppl = ppl.sort(*orderings)
1186
1187 # Cursors, Limit and Offset
1188 if self._start_at or self._end_at or self._limit_to_last:
1189 raise NotImplementedError(
1190 "Query to Pipeline conversion: cursors and limit_to_last is not supported yet."
1191 )
1192 else: # Limit & Offset without cursors
1193 if self._offset:
1194 ppl = ppl.offset(self._offset)
1195 if self._limit:
1196 ppl = ppl.limit(self._limit)
1197
1198 return ppl
1199
1200 def _comparator(self, doc1, doc2) -> int:
1201 _orders = self._orders
1202
1203 # Add implicit sorting by name, using the last specified direction.
1204 if len(_orders) == 0:
1205 lastDirection = BaseQuery.ASCENDING
1206 else:
1207 if _orders[-1].direction == 1:
1208 lastDirection = BaseQuery.ASCENDING
1209 else:
1210 lastDirection = BaseQuery.DESCENDING
1211
1212 orderBys = list(_orders)
1213
1214 order_pb = query.StructuredQuery.Order(
1215 field=query.StructuredQuery.FieldReference(field_path="id"),
1216 direction=_enum_from_direction(lastDirection),
1217 )
1218 orderBys.append(order_pb)
1219
1220 for orderBy in orderBys:
1221 if orderBy.field.field_path == "id":
1222 # If ordering by document id, compare resource paths.
1223 comp = Order()._compare_to(doc1.reference._path, doc2.reference._path)
1224 else:
1225 if (
1226 orderBy.field.field_path not in doc1._data
1227 or orderBy.field.field_path not in doc2._data
1228 ):
1229 raise ValueError(
1230 "Can only compare fields that exist in the "
1231 "DocumentSnapshot. Please include the fields you are "
1232 "ordering on in your select() call."
1233 )
1234 v1 = doc1._data[orderBy.field.field_path]
1235 v2 = doc2._data[orderBy.field.field_path]
1236 encoded_v1 = _helpers.encode_value(v1)
1237 encoded_v2 = _helpers.encode_value(v2)
1238 comp = Order().compare(encoded_v1, encoded_v2)
1239
1240 if comp != 0:
1241 # 1 == Ascending, -1 == Descending
1242 return orderBy.direction * comp
1243
1244 return 0
1245
1246 @staticmethod
1247 def _get_collection_reference_class():
1248 raise NotImplementedError
1249
1250
1251def _enum_from_op_string(op_string: str) -> int:
1252 """Convert a string representation of a binary operator to an enum.
1253
1254 These enums come from the protobuf message definition
1255 ``StructuredQuery.FieldFilter.Operator``.
1256
1257 Args:
1258 op_string (str): A comparison operation in the form of a string.
1259 Acceptable values are ``<``, ``<=``, ``==``, ``!=``, ``>=``
1260 and ``>``.
1261
1262 Returns:
1263 int: The enum corresponding to ``op_string``.
1264
1265 Raises:
1266 ValueError: If ``op_string`` is not a valid operator.
1267 """
1268 try:
1269 return _COMPARISON_OPERATORS[op_string]
1270 except KeyError:
1271 choices = ", ".join(sorted(_COMPARISON_OPERATORS.keys()))
1272 msg = _BAD_OP_STRING.format(op_string, choices)
1273 raise ValueError(msg)
1274
1275
1276def _isnan(value) -> bool:
1277 """Check if a value is NaN.
1278
1279 This differs from ``math.isnan`` in that **any** input type is
1280 allowed.
1281
1282 Args:
1283 value (Any): A value to check for NaN-ness.
1284
1285 Returns:
1286 bool: Indicates if the value is the NaN float.
1287 """
1288 if isinstance(value, float):
1289 return math.isnan(value)
1290 else:
1291 return False
1292
1293
1294def _enum_from_direction(direction: str) -> int:
1295 """Convert a string representation of a direction to an enum.
1296
1297 Args:
1298 direction (str): A direction to order by. Must be one of
1299 :attr:`~google.cloud.firestore.BaseQuery.ASCENDING` or
1300 :attr:`~google.cloud.firestore.BaseQuery.DESCENDING`.
1301
1302 Returns:
1303 int: The enum corresponding to ``direction``.
1304
1305 Raises:
1306 ValueError: If ``direction`` is not a valid direction.
1307 """
1308 if isinstance(direction, int):
1309 return direction
1310
1311 if direction == BaseQuery.ASCENDING:
1312 return StructuredQuery.Direction.ASCENDING
1313 elif direction == BaseQuery.DESCENDING:
1314 return StructuredQuery.Direction.DESCENDING
1315 else:
1316 msg = _BAD_DIR_STRING.format(
1317 direction, BaseQuery.ASCENDING, BaseQuery.DESCENDING
1318 )
1319 raise ValueError(msg)
1320
1321
1322def _filter_pb(field_or_unary) -> StructuredQuery.Filter:
1323 """Convert a specific protobuf filter to the generic filter type.
1324
1325 Args:
1326 field_or_unary (Union[google.cloud.firestore_v1.\
1327 query.StructuredQuery.FieldFilter, google.cloud.\
1328 firestore_v1.query.StructuredQuery.FieldFilter]): A
1329 field or unary filter to convert to a generic filter.
1330
1331 Returns:
1332 google.cloud.firestore_v1.types.\
1333 StructuredQuery.Filter: A "generic" filter.
1334
1335 Raises:
1336 ValueError: If ``field_or_unary`` is not a field or unary filter.
1337 """
1338 if isinstance(field_or_unary, query.StructuredQuery.FieldFilter):
1339 return query.StructuredQuery.Filter(field_filter=field_or_unary)
1340 elif isinstance(field_or_unary, query.StructuredQuery.UnaryFilter):
1341 return query.StructuredQuery.Filter(unary_filter=field_or_unary)
1342 else:
1343 raise ValueError("Unexpected filter type", type(field_or_unary), field_or_unary)
1344
1345
1346def _cursor_pb(cursor_pair: Optional[Tuple[list, bool]]) -> Optional[Cursor]:
1347 """Convert a cursor pair to a protobuf.
1348
1349 If ``cursor_pair`` is :data:`None`, just returns :data:`None`.
1350
1351 Args:
1352 cursor_pair (Optional[Tuple[list, bool]]): Two-tuple of
1353
1354 * a list of field values.
1355 * a ``before`` flag
1356
1357 Returns:
1358 Optional[google.cloud.firestore_v1.types.Cursor]: A
1359 protobuf cursor corresponding to the values.
1360 """
1361 if cursor_pair is not None:
1362 data, before = cursor_pair
1363 value_pbs = [_helpers.encode_value(value) for value in data]
1364 return query.Cursor(values=value_pbs, before=before)
1365 else:
1366 return None
1367
1368
1369def _query_response_to_snapshot(
1370 response_pb: RunQueryResponse, collection, expected_prefix: str
1371) -> Optional[document.DocumentSnapshot]:
1372 """Parse a query response protobuf to a document snapshot.
1373
1374 Args:
1375 response_pb (google.cloud.firestore_v1.\
1376 firestore.RunQueryResponse): A
1377 collection (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
1378 A reference to the collection that initiated the query.
1379 expected_prefix (str): The expected prefix for fully-qualified
1380 document names returned in the query results. This can be computed
1381 directly from ``collection`` via :meth:`_parent_info`.
1382
1383 Returns:
1384 Optional[:class:`~google.cloud.firestore.document.DocumentSnapshot`]:
1385 A snapshot of the data returned in the query. If
1386 ``response_pb.document`` is not set, the snapshot will be :data:`None`.
1387 """
1388 if not response_pb._pb.HasField("document"):
1389 return None
1390
1391 document_id = _helpers.get_doc_id(response_pb.document, expected_prefix)
1392 reference = collection.document(document_id)
1393 data = _helpers.decode_dict(response_pb.document.fields, collection._client)
1394 snapshot = document.DocumentSnapshot(
1395 reference,
1396 data,
1397 exists=True,
1398 read_time=response_pb.read_time,
1399 create_time=response_pb.document.create_time,
1400 update_time=response_pb.document.update_time,
1401 )
1402 return snapshot
1403
1404
1405def _collection_group_query_response_to_snapshot(
1406 response_pb: RunQueryResponse, collection
1407) -> Optional[document.DocumentSnapshot]:
1408 """Parse a query response protobuf to a document snapshot.
1409
1410 Args:
1411 response_pb (google.cloud.firestore_v1.\
1412 firestore.RunQueryResponse): A
1413 collection (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
1414 A reference to the collection that initiated the query.
1415
1416 Returns:
1417 Optional[:class:`~google.cloud.firestore.document.DocumentSnapshot`]:
1418 A snapshot of the data returned in the query. If
1419 ``response_pb.document`` is not set, the snapshot will be :data:`None`.
1420 """
1421 if not response_pb._pb.HasField("document"):
1422 return None
1423 reference = collection._client.document(response_pb.document.name)
1424 data = _helpers.decode_dict(response_pb.document.fields, collection._client)
1425 snapshot = document.DocumentSnapshot(
1426 reference,
1427 data,
1428 exists=True,
1429 read_time=response_pb._pb.read_time,
1430 create_time=response_pb._pb.document.create_time,
1431 update_time=response_pb._pb.document.update_time,
1432 )
1433 return snapshot
1434
1435
1436class BaseCollectionGroup(BaseQuery):
1437 """Represents a Collection Group in the Firestore API.
1438
1439 This is a specialization of :class:`.Query` that includes all documents in the
1440 database that are contained in a collection or subcollection of the given
1441 parent.
1442
1443 Args:
1444 parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
1445 The collection that this query applies to.
1446 """
1447
1448 _PARTITION_QUERY_ORDER = (
1449 BaseQuery._make_order(
1450 field_path_module.FieldPath.document_id(),
1451 BaseQuery.ASCENDING,
1452 ),
1453 )
1454
1455 def __init__(
1456 self,
1457 parent,
1458 projection=None,
1459 field_filters=(),
1460 orders=(),
1461 limit=None,
1462 limit_to_last=False,
1463 offset=None,
1464 start_at=None,
1465 end_at=None,
1466 all_descendants=True,
1467 recursive=False,
1468 ) -> None:
1469 if not all_descendants:
1470 raise ValueError("all_descendants must be True for collection group query.")
1471
1472 super(BaseCollectionGroup, self).__init__(
1473 parent=parent,
1474 projection=projection,
1475 field_filters=field_filters,
1476 orders=orders,
1477 limit=limit,
1478 limit_to_last=limit_to_last,
1479 offset=offset,
1480 start_at=start_at,
1481 end_at=end_at,
1482 all_descendants=all_descendants,
1483 recursive=recursive,
1484 )
1485
1486 def _validate_partition_query(self):
1487 if self._field_filters:
1488 raise ValueError("Can't partition query with filters.")
1489
1490 if self._projection:
1491 raise ValueError("Can't partition query with projection.")
1492
1493 if self._limit:
1494 raise ValueError("Can't partition query with limit.")
1495
1496 if self._offset:
1497 raise ValueError("Can't partition query with offset.")
1498
1499 def _get_query_class(self):
1500 raise NotImplementedError
1501
1502 def _prep_get_partitions(
1503 self,
1504 partition_count,
1505 retry: retries.Retry | object | None = None,
1506 timeout: float | None = None,
1507 read_time: datetime.datetime | None = None,
1508 ) -> Tuple[dict, dict]:
1509 self._validate_partition_query()
1510 parent_path, expected_prefix = self._parent._parent_info()
1511 klass = self._get_query_class()
1512 query = klass(
1513 self._parent,
1514 orders=self._PARTITION_QUERY_ORDER,
1515 start_at=self._start_at,
1516 end_at=self._end_at,
1517 all_descendants=self._all_descendants,
1518 )
1519 request = {
1520 "parent": parent_path,
1521 "structured_query": query._to_protobuf(),
1522 "partition_count": partition_count,
1523 }
1524 if read_time is not None:
1525 request["read_time"] = read_time
1526 kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
1527
1528 return request, kwargs
1529
1530 def get_partitions(
1531 self,
1532 partition_count,
1533 retry: Optional[retries.Retry] = None,
1534 timeout: Optional[float] = None,
1535 *,
1536 read_time: Optional[datetime.datetime] = None,
1537 ):
1538 raise NotImplementedError
1539
1540
1541class QueryPartition:
1542 """Represents a bounded partition of a collection group query.
1543
1544 Contains cursors that can be used in a query as a starting and/or end point for the
1545 collection group query. The cursors may only be used in a query that matches the
1546 constraints of the query that produced this partition.
1547
1548 Args:
1549 query (BaseQuery): The original query that this is a partition of.
1550 start_at (Optional[~google.cloud.firestore_v1.document.DocumentSnapshot]):
1551 Cursor for first query result to include. If `None`, the partition starts at
1552 the beginning of the result set.
1553 end_at (Optional[~google.cloud.firestore_v1.document.DocumentSnapshot]):
1554 Cursor for first query result after the last result included in the
1555 partition. If `None`, the partition runs to the end of the result set.
1556
1557 """
1558
1559 def __init__(self, query, start_at, end_at):
1560 self._query = query
1561 self._start_at = start_at
1562 self._end_at = end_at
1563
1564 @property
1565 def start_at(self):
1566 return self._start_at
1567
1568 @property
1569 def end_at(self):
1570 return self._end_at
1571
1572 def query(self):
1573 """Generate a new query using this partition's bounds.
1574
1575 Returns:
1576 BaseQuery: Copy of the original query with start and end bounds set by the
1577 cursors from this partition.
1578 """
1579 query = self._query
1580 start_at = ([self.start_at], True) if self.start_at else None
1581 end_at = ([self.end_at], True) if self.end_at else None
1582
1583 return type(query)(
1584 query._parent,
1585 all_descendants=query._all_descendants,
1586 orders=query._PARTITION_QUERY_ORDER,
1587 start_at=start_at,
1588 end_at=end_at,
1589 )