Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/async_query.py: 34%
74 statements
coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
# Copyright 2020 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15"""Classes for representing queries for the Google Cloud Firestore API.
17A :class:`~google.cloud.firestore_v1.query.Query` can be created directly from
18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
19a more common way to create a query than direct usage of the constructor.
20"""

from __future__ import annotations

from google.api_core import gapic_v1
from google.api_core import retry as retries

from google.cloud import firestore_v1
from google.cloud.firestore_v1.base_query import (
    BaseCollectionGroup,
    BaseQuery,
    QueryPartition,
    _query_response_to_snapshot,
    _collection_group_query_response_to_snapshot,
    _enum_from_direction,
)

from google.cloud.firestore_v1 import async_document
from google.cloud.firestore_v1.async_aggregation import AsyncAggregationQuery
from google.cloud.firestore_v1.base_document import DocumentSnapshot
from typing import AsyncGenerator, List, Optional, Type, TYPE_CHECKING

if TYPE_CHECKING:  # pragma: NO COVER
    # Types needed only for Type Hints
    from google.cloud.firestore_v1.transaction import Transaction
    from google.cloud.firestore_v1.field_path import FieldPath


class AsyncQuery(BaseQuery):
    """Represents a query to the Firestore API.

    Instances of this class are considered immutable: all methods that
    would modify an instance instead return a new instance.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
        projection (Optional[:class:`google.cloud.proto.firestore.v1.\
            query.StructuredQuery.Projection`]):
            A projection of document fields to limit the query results to.
        field_filters (Optional[Tuple[:class:`google.cloud.proto.firestore.v1.\
            query.StructuredQuery.FieldFilter`, ...]]):
            The filters to be applied in the query.
        orders (Optional[Tuple[:class:`google.cloud.proto.firestore.v1.\
            query.StructuredQuery.Order`, ...]]):
            The "order by" entries to use in the query.
        limit (Optional[int]):
            The maximum number of documents the query is allowed to return.
        offset (Optional[int]):
            The number of results to skip.
        start_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * an ``after`` flag

            The fields and the flag combine to form a cursor used as
            a starting point in a query result set. If the ``after``
            flag is :data:`True`, the results will start just after any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        end_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * a ``before`` flag

            The fields and the flag combine to form a cursor used as
            an ending point in a query result set. If the ``before``
            flag is :data:`True`, the results will end just before any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        all_descendants (Optional[bool]):
            When false, selects only collections that are immediate children
            of the `parent` specified in the containing `RunQueryRequest`.
            When true, selects all descendant collections.
        recursive (Optional[bool]):
            When true, returns all documents and all documents in any subcollections
            below them. Defaults to false.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=False,
        recursive=False,
    ) -> None:
        super(AsyncQuery, self).__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
            recursive=recursive,
        )

    async def _chunkify(
        self, chunk_size: int
    ) -> AsyncGenerator[List[DocumentSnapshot], None]:
        max_to_return: Optional[int] = self._limit
        num_returned: int = 0
        original: AsyncQuery = self._copy()
        last_document: Optional[DocumentSnapshot] = None

        while True:
            # Optionally trim the `chunk_size` down to honor a previously
            # applied limit as set by `self.limit()`
            _chunk_size: int = original._resolve_chunk_size(num_returned, chunk_size)

            # Apply the optionally pruned limit and the cursor, if we are past
            # the first page.
            _q = original.limit(_chunk_size)

            if last_document:
                _q = _q.start_after(last_document)

            snapshots = await _q.get()

            if snapshots:
                last_document = snapshots[-1]

            num_returned += len(snapshots)

            yield snapshots

            # Terminate the iterator if we have reached either of two end
            # conditions:
            #   1. There are no more documents, or
            #   2. We have reached the desired overall limit
            if len(snapshots) < _chunk_size or (
                max_to_return and num_returned >= max_to_return
            ):
                return
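
    # `_chunkify` is an internal helper; a rough sketch of how it could be
    # consumed (`query` and `process` below are illustrative, not part of
    # this module):
    #
    #     async for chunk in query._chunkify(chunk_size=100):
    #         process(chunk)  # each `chunk` is a List[DocumentSnapshot]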

    async def get(
        self,
        transaction: Transaction = None,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> list:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and returns a list of documents
        returned in the stream of ``RunQueryResponse`` messages.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Returns:
            list: The documents in the collection that match this query.
        """
        is_limited_to_last = self._limit_to_last

        if self._limit_to_last:
            # In order to fetch up to `self._limit` results from the end of the
            # query, flip the defined ordering on the query to start from the
            # end, retrieving up to `self._limit` results from the backend.
            for order in self._orders:
                order.direction = _enum_from_direction(
                    self.DESCENDING
                    if order.direction == self.ASCENDING
                    else self.ASCENDING
                )
            self._limit_to_last = False

        result = self.stream(transaction=transaction, retry=retry, timeout=timeout)
        result = [d async for d in result]
        if is_limited_to_last:
            result = list(reversed(result))

        return result
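
    # Hedged usage sketch for `get` (the `query` name is illustrative):
    #
    #     snapshots = await query.get()
    #     for snapshot in snapshots:
    #         print(snapshot.id, snapshot.to_dict())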

    def count(
        self, alias: str | None = None
    ) -> Type["firestore_v1.async_aggregation.AsyncAggregationQuery"]:
        """Adds a count over the nested query.

        Args:
            alias (Optional[str]): Optional name of the field to store the result of the aggregation into.
                If not provided, Firestore will pick a default name following the format field_<incremental_id++>.

        Returns:
            :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`:
                An instance of an AsyncAggregationQuery object.
        """
        return AsyncAggregationQuery(self).count(alias=alias)

    def sum(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> Type["firestore_v1.async_aggregation.AsyncAggregationQuery"]:
        """Adds a sum over the nested query.

        Args:
            field_ref (Union[str, google.cloud.firestore_v1.field_path.FieldPath]): The field to aggregate across.
            alias (Optional[str]): Optional name of the field to store the result of the aggregation into.
                If not provided, Firestore will pick a default name following the format field_<incremental_id++>.

        Returns:
            :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`:
                An instance of an AsyncAggregationQuery object.
        """
        return AsyncAggregationQuery(self).sum(field_ref, alias=alias)

    def avg(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> Type["firestore_v1.async_aggregation.AsyncAggregationQuery"]:
        """Adds an avg over the nested query.

        Args:
            field_ref (Union[str, google.cloud.firestore_v1.field_path.FieldPath]): The field to aggregate across.
            alias (Optional[str]): Optional name of the field to store the result of the aggregation into.
                If not provided, Firestore will pick a default name following the format field_<incremental_id++>.

        Returns:
            :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`:
                An instance of an AsyncAggregationQuery object.
        """
        return AsyncAggregationQuery(self).avg(field_ref, alias=alias)
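
    # Hedged sketch of composing the aggregation helpers above; `query` and
    # the field name "population" are illustrative only:
    #
    #     agg_query = query.count(alias="total").avg("population", alias="mean")
    #     results = await agg_query.get()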

    async def stream(
        self,
        transaction=None,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> AsyncGenerator[async_document.DocumentSnapshot, None]:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and then returns an iterator which
        consumes each document returned in the stream of ``RunQueryResponse``
        messages.

        .. note::

            The underlying stream of responses will time out after
            the ``max_rpc_timeout_millis`` value set in the GAPIC
            client configuration for the ``RunQuery`` API. Snapshots
            not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Yields:
            :class:`~google.cloud.firestore_v1.async_document.DocumentSnapshot`:
                The next document that fulfills the query.
        """
        request, expected_prefix, kwargs = self._prep_stream(
            transaction,
            retry,
            timeout,
        )

        response_iterator = await self._client._firestore_api.run_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

        async for response in response_iterator:
            if self._all_descendants:
                snapshot = _collection_group_query_response_to_snapshot(
                    response, self._parent
                )
            else:
                snapshot = _query_response_to_snapshot(
                    response, self._parent, expected_prefix
                )
            if snapshot is not None:
                yield snapshot
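
    # Hedged usage sketch for `stream` (the `query` name is illustrative):
    #
    #     async for snapshot in query.stream():
    #         print(snapshot.id)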

    @staticmethod
    def _get_collection_reference_class() -> (
        Type["firestore_v1.async_collection.AsyncCollectionReference"]
    ):
        from google.cloud.firestore_v1.async_collection import AsyncCollectionReference

        return AsyncCollectionReference


class AsyncCollectionGroup(AsyncQuery, BaseCollectionGroup):
    """Represents a Collection Group in the Firestore API.

    This is a specialization of :class:`.AsyncQuery` that includes all documents in the
    database that are contained in a collection or subcollection of the given
    parent.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=True,
        recursive=False,
    ) -> None:
        super(AsyncCollectionGroup, self).__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
            recursive=recursive,
        )

    @staticmethod
    def _get_query_class():
        return AsyncQuery

    async def get_partitions(
        self,
        partition_count,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> AsyncGenerator[QueryPartition, None]:
        """Partition a query for parallelization.

        Partitions a query by returning partition cursors that can be used to run the
        query in parallel. The returned partition cursors are split points that can be
        used as starting/end points for the query results.

        Args:
            partition_count (int): The desired maximum number of partition points. The
                number must be strictly positive. The actual number of partitions
                returned may be fewer.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
        """
        request, kwargs = self._prep_get_partitions(partition_count, retry, timeout)
        pager = await self._client._firestore_api.partition_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

        start_at = None
        async for cursor_pb in pager:
            cursor = self._client.document(cursor_pb.values[0].reference_value)
            yield QueryPartition(self, start_at, cursor)
            start_at = cursor

        yield QueryPartition(self, start_at, None)
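
    # Hedged sketch of consuming `get_partitions`; `group` stands in for an
    # AsyncCollectionGroup (e.g. from `client.collection_group("cities")`):
    #
    #     async for partition in group.get_partitions(4):
    #         async for snapshot in partition.query().stream():
    #             print(snapshot.id)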