Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/query.py: 33%
99 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
1# Copyright 2017 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Classes for representing queries for the Google Cloud Firestore API.
17A :class:`~google.cloud.firestore_v1.query.Query` can be created directly from
18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
19a more common way to create a query than direct usage of the constructor.
20"""
21from __future__ import annotations
23from google.cloud import firestore_v1
24from google.cloud.firestore_v1.base_document import DocumentSnapshot
25from google.api_core import exceptions
26from google.api_core import gapic_v1
27from google.api_core import retry as retries
29from google.cloud.firestore_v1.base_query import (
30 BaseCollectionGroup,
31 BaseQuery,
32 QueryPartition,
33 _query_response_to_snapshot,
34 _collection_group_query_response_to_snapshot,
35 _enum_from_direction,
36)
37from google.cloud.firestore_v1 import aggregation
39from google.cloud.firestore_v1 import document
40from google.cloud.firestore_v1.watch import Watch
41from typing import Any, Callable, Generator, List, Optional, Type, TYPE_CHECKING
43if TYPE_CHECKING: # pragma: NO COVER
44 from google.cloud.firestore_v1.field_path import FieldPath
47class Query(BaseQuery):
48 """Represents a query to the Firestore API.
50 Instances of this class are considered immutable: all methods that
51 would modify an instance instead return a new instance.
53 Args:
54 parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
55 The collection that this query applies to.
56 projection (Optional[:class:`google.cloud.proto.firestore.v1.\
57 query.StructuredQuery.Projection`]):
58 A projection of document fields to limit the query results to.
59 field_filters (Optional[Tuple[:class:`google.cloud.proto.firestore.v1.\
60 query.StructuredQuery.FieldFilter`, ...]]):
61 The filters to be applied in the query.
62 orders (Optional[Tuple[:class:`google.cloud.proto.firestore.v1.\
63 query.StructuredQuery.Order`, ...]]):
64 The "order by" entries to use in the query.
65 limit (Optional[int]):
66 The maximum number of documents the query is allowed to return.
67 offset (Optional[int]):
68 The number of results to skip.
69 start_at (Optional[Tuple[dict, bool]]):
70 Two-tuple of :
72 * a mapping of fields. Any field that is present in this mapping
73 must also be present in ``orders``
74 * an ``after`` flag
76 The fields and the flag combine to form a cursor used as
77 a starting point in a query result set. If the ``after``
78 flag is :data:`True`, the results will start just after any
79 documents which have fields matching the cursor, otherwise
80 any matching documents will be included in the result set.
81 When the query is formed, the document values
82 will be used in the order given by ``orders``.
83 end_at (Optional[Tuple[dict, bool]]):
84 Two-tuple of:
86 * a mapping of fields. Any field that is present in this mapping
87 must also be present in ``orders``
88 * a ``before`` flag
90 The fields and the flag combine to form a cursor used as
91 an ending point in a query result set. If the ``before``
92 flag is :data:`True`, the results will end just before any
93 documents which have fields matching the cursor, otherwise
94 any matching documents will be included in the result set.
95 When the query is formed, the document values
96 will be used in the order given by ``orders``.
97 all_descendants (Optional[bool]):
98 When false, selects only collections that are immediate children
99 of the `parent` specified in the containing `RunQueryRequest`.
100 When true, selects all descendant collections.
101 """
103 def __init__(
104 self,
105 parent,
106 projection=None,
107 field_filters=(),
108 orders=(),
109 limit=None,
110 limit_to_last=False,
111 offset=None,
112 start_at=None,
113 end_at=None,
114 all_descendants=False,
115 recursive=False,
116 ) -> None:
117 super(Query, self).__init__(
118 parent=parent,
119 projection=projection,
120 field_filters=field_filters,
121 orders=orders,
122 limit=limit,
123 limit_to_last=limit_to_last,
124 offset=offset,
125 start_at=start_at,
126 end_at=end_at,
127 all_descendants=all_descendants,
128 recursive=recursive,
129 )
131 def get(
132 self,
133 transaction=None,
134 retry: retries.Retry = gapic_v1.method.DEFAULT,
135 timeout: float = None,
136 ) -> List[DocumentSnapshot]:
137 """Read the documents in the collection that match this query.
139 This sends a ``RunQuery`` RPC and returns a list of documents
140 returned in the stream of ``RunQueryResponse`` messages.
142 Args:
143 transaction
144 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
145 An existing transaction that this query will run in.
146 If a ``transaction`` is used and it already has write operations
147 added, this method cannot be used (i.e. read-after-write is not
148 allowed).
149 retry (google.api_core.retry.Retry): Designation of what errors, if any,
150 should be retried. Defaults to a system-specified policy.
151 timeout (float): The timeout for this request. Defaults to a
152 system-specified value.
154 Returns:
155 list: The documents in the collection that match this query.
156 """
157 is_limited_to_last = self._limit_to_last
159 if self._limit_to_last:
160 # In order to fetch up to `self._limit` results from the end of the
161 # query flip the defined ordering on the query to start from the
162 # end, retrieving up to `self._limit` results from the backend.
163 for order in self._orders:
164 order.direction = _enum_from_direction(
165 self.DESCENDING
166 if order.direction.name == self.ASCENDING
167 else self.ASCENDING
168 )
169 self._limit_to_last = False
171 result = self.stream(transaction=transaction, retry=retry, timeout=timeout)
172 if is_limited_to_last:
173 result = reversed(list(result))
175 return list(result)
177 def _chunkify(
178 self, chunk_size: int
179 ) -> Generator[List[DocumentSnapshot], None, None]:
180 max_to_return: Optional[int] = self._limit
181 num_returned: int = 0
182 original: Query = self._copy()
183 last_document: Optional[DocumentSnapshot] = None
185 while True:
186 # Optionally trim the `chunk_size` down to honor a previously
187 # applied limits as set by `self.limit()`
188 _chunk_size: int = original._resolve_chunk_size(num_returned, chunk_size)
190 # Apply the optionally pruned limit and the cursor, if we are past
191 # the first page.
192 _q = original.limit(_chunk_size)
194 if last_document:
195 _q = _q.start_after(last_document)
197 snapshots = _q.get()
199 if snapshots:
200 last_document = snapshots[-1]
202 num_returned += len(snapshots)
204 yield snapshots
206 # Terminate the iterator if we have reached either of two end
207 # conditions:
208 # 1. There are no more documents, or
209 # 2. We have reached the desired overall limit
210 if len(snapshots) < _chunk_size or (
211 max_to_return and num_returned >= max_to_return
212 ):
213 return
215 def _get_stream_iterator(self, transaction, retry, timeout):
216 """Helper method for :meth:`stream`."""
217 request, expected_prefix, kwargs = self._prep_stream(
218 transaction,
219 retry,
220 timeout,
221 )
223 response_iterator = self._client._firestore_api.run_query(
224 request=request,
225 metadata=self._client._rpc_metadata,
226 **kwargs,
227 )
229 return response_iterator, expected_prefix
231 def _retry_query_after_exception(self, exc, retry, transaction):
232 """Helper method for :meth:`stream`."""
233 if transaction is None: # no snapshot-based retry inside transaction
234 if retry is gapic_v1.method.DEFAULT:
235 transport = self._client._firestore_api._transport
236 gapic_callable = transport.run_query
237 retry = gapic_callable._retry
238 return retry._predicate(exc)
240 return False
242 def count(
243 self, alias: str | None = None
244 ) -> Type["firestore_v1.aggregation.AggregationQuery"]:
245 """
246 Adds a count over the query.
248 :type alias: Optional[str]
249 :param alias: Optional name of the field to store the result of the aggregation into.
250 If not provided, Firestore will pick a default name following the format field_<incremental_id++>.
251 """
252 return aggregation.AggregationQuery(self).count(alias=alias)
254 def sum(
255 self, field_ref: str | FieldPath, alias: str | None = None
256 ) -> Type["firestore_v1.aggregation.AggregationQuery"]:
257 """
258 Adds a sum over the query.
260 :type field_ref: Union[str, google.cloud.firestore_v1.field_path.FieldPath]
261 :param field_ref: The field to aggregate across.
263 :type alias: Optional[str]
264 :param alias: Optional name of the field to store the result of the aggregation into.
265 If not provided, Firestore will pick a default name following the format field_<incremental_id++>.
266 """
267 return aggregation.AggregationQuery(self).sum(field_ref, alias=alias)
269 def avg(
270 self, field_ref: str | FieldPath, alias: str | None = None
271 ) -> Type["firestore_v1.aggregation.AggregationQuery"]:
272 """
273 Adds an avg over the query.
275 :type field_ref: [Union[str, google.cloud.firestore_v1.field_path.FieldPath]
276 :param field_ref: The field to aggregate across.
278 :type alias: Optional[str]
279 :param alias: Optional name of the field to store the result of the aggregation into.
280 If not provided, Firestore will pick a default name following the format field_<incremental_id++>.
281 """
282 return aggregation.AggregationQuery(self).avg(field_ref, alias=alias)
284 def stream(
285 self,
286 transaction=None,
287 retry: retries.Retry = gapic_v1.method.DEFAULT,
288 timeout: float = None,
289 ) -> Generator[document.DocumentSnapshot, Any, None]:
290 """Read the documents in the collection that match this query.
292 This sends a ``RunQuery`` RPC and then returns an iterator which
293 consumes each document returned in the stream of ``RunQueryResponse``
294 messages.
296 .. note::
298 The underlying stream of responses will time out after
299 the ``max_rpc_timeout_millis`` value set in the GAPIC
300 client configuration for the ``RunQuery`` API. Snapshots
301 not consumed from the iterator before that point will be lost.
303 If a ``transaction`` is used and it already has write operations
304 added, this method cannot be used (i.e. read-after-write is not
305 allowed).
307 Args:
308 transaction
309 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
310 An existing transaction that this query will run in.
311 retry (google.api_core.retry.Retry): Designation of what errors, if any,
312 should be retried. Defaults to a system-specified policy.
313 timeout (float): The timeout for this request. Defaults to a
314 system-specified value.
316 Yields:
317 :class:`~google.cloud.firestore_v1.document.DocumentSnapshot`:
318 The next document that fulfills the query.
319 """
320 response_iterator, expected_prefix = self._get_stream_iterator(
321 transaction,
322 retry,
323 timeout,
324 )
326 last_snapshot = None
328 while True:
329 try:
330 response = next(response_iterator, None)
331 except exceptions.GoogleAPICallError as exc:
332 if self._retry_query_after_exception(exc, retry, transaction):
333 new_query = self.start_after(last_snapshot)
334 response_iterator, _ = new_query._get_stream_iterator(
335 transaction,
336 retry,
337 timeout,
338 )
339 continue
340 else:
341 raise
343 if response is None: # EOI
344 break
346 if self._all_descendants:
347 snapshot = _collection_group_query_response_to_snapshot(
348 response, self._parent
349 )
350 else:
351 snapshot = _query_response_to_snapshot(
352 response, self._parent, expected_prefix
353 )
354 if snapshot is not None:
355 last_snapshot = snapshot
356 yield snapshot
358 def on_snapshot(self, callback: Callable) -> Watch:
359 """Monitor the documents in this collection that match this query.
361 This starts a watch on this query using a background thread. The
362 provided callback is run on the snapshot of the documents.
364 Args:
365 callback(Callable[[:class:`~google.cloud.firestore.query.QuerySnapshot`], NoneType]):
366 a callback to run when a change occurs.
368 Example:
370 .. code-block:: python
372 from google.cloud import firestore_v1
374 db = firestore_v1.Client()
375 query_ref = db.collection(u'users').where("user", "==", u'Ada')
377 def on_snapshot(docs, changes, read_time):
378 for doc in docs:
379 print(u'{} => {}'.format(doc.id, doc.to_dict()))
381 # Watch this query
382 query_watch = query_ref.on_snapshot(on_snapshot)
384 # Terminate this watch
385 query_watch.unsubscribe()
386 """
387 return Watch.for_query(self, callback, document.DocumentSnapshot)
389 @staticmethod
390 def _get_collection_reference_class() -> (
391 Type["firestore_v1.collection.CollectionReference"]
392 ):
393 from google.cloud.firestore_v1.collection import CollectionReference
395 return CollectionReference
398class CollectionGroup(Query, BaseCollectionGroup):
399 """Represents a Collection Group in the Firestore API.
401 This is a specialization of :class:`.Query` that includes all documents in the
402 database that are contained in a collection or subcollection of the given
403 parent.
405 Args:
406 parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
407 The collection that this query applies to.
408 """
410 def __init__(
411 self,
412 parent,
413 projection=None,
414 field_filters=(),
415 orders=(),
416 limit=None,
417 limit_to_last=False,
418 offset=None,
419 start_at=None,
420 end_at=None,
421 all_descendants=True,
422 recursive=False,
423 ) -> None:
424 super(CollectionGroup, self).__init__(
425 parent=parent,
426 projection=projection,
427 field_filters=field_filters,
428 orders=orders,
429 limit=limit,
430 limit_to_last=limit_to_last,
431 offset=offset,
432 start_at=start_at,
433 end_at=end_at,
434 all_descendants=all_descendants,
435 recursive=recursive,
436 )
438 @staticmethod
439 def _get_query_class():
440 return Query
442 def get_partitions(
443 self,
444 partition_count,
445 retry: retries.Retry = gapic_v1.method.DEFAULT,
446 timeout: float = None,
447 ) -> Generator[QueryPartition, None, None]:
448 """Partition a query for parallelization.
450 Partitions a query by returning partition cursors that can be used to run the
451 query in parallel. The returned partition cursors are split points that can be
452 used as starting/end points for the query results.
454 Args:
455 partition_count (int): The desired maximum number of partition points. The
456 number must be strictly positive. The actual number of partitions
457 returned may be fewer.
458 retry (google.api_core.retry.Retry): Designation of what errors, if any,
459 should be retried. Defaults to a system-specified policy.
460 timeout (float): The timeout for this request. Defaults to a
461 system-specified value.
462 """
463 request, kwargs = self._prep_get_partitions(partition_count, retry, timeout)
465 pager = self._client._firestore_api.partition_query(
466 request=request,
467 metadata=self._client._rpc_metadata,
468 **kwargs,
469 )
471 start_at = None
472 for cursor_pb in pager:
473 cursor = self._client.document(cursor_pb.values[0].reference_value)
474 yield QueryPartition(self, start_at, cursor)
475 start_at = cursor
477 yield QueryPartition(self, start_at, None)