1# Copyright 2017 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for representing collections for the Google Cloud Firestore API."""
16from __future__ import annotations
17
18import random
19
20from typing import (
21 TYPE_CHECKING,
22 Any,
23 AsyncGenerator,
24 AsyncIterator,
25 Coroutine,
26 Generator,
27 Generic,
28 Iterable,
29 Sequence,
30 Tuple,
31 Union,
32 Optional,
33)
34
35from google.api_core import retry as retries
36
37from google.cloud.firestore_v1 import _helpers
38from google.cloud.firestore_v1.base_document import BaseDocumentReference
39from google.cloud.firestore_v1.base_query import QueryType
40
41if TYPE_CHECKING: # pragma: NO COVER
42 # Types needed only for Type Hints
43 from google.cloud.firestore_v1.base_aggregation import BaseAggregationQuery
44 from google.cloud.firestore_v1.base_document import DocumentSnapshot
45 from google.cloud.firestore_v1.base_vector_query import (
46 BaseVectorQuery,
47 DistanceMeasure,
48 )
49 from google.cloud.firestore_v1.async_document import AsyncDocumentReference
50 from google.cloud.firestore_v1.document import DocumentReference
51 from google.cloud.firestore_v1.field_path import FieldPath
52 from google.cloud.firestore_v1.pipeline_source import PipelineSource
53 from google.cloud.firestore_v1.query_profile import ExplainOptions
54 from google.cloud.firestore_v1.query_results import QueryResultsList
55 from google.cloud.firestore_v1.stream_generator import StreamGenerator
56 from google.cloud.firestore_v1.transaction import Transaction
57 from google.cloud.firestore_v1.vector import Vector
58 from google.cloud.firestore_v1.vector_query import VectorQuery
59
60 import datetime
61
# Alphabet used for auto-generated document IDs: the 26 uppercase and 26
# lowercase ASCII letters followed by the 10 decimal digits (62 characters).
_AUTO_ID_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
# Module-wide ``random.SystemRandom`` instance. ``_auto_id`` draws from it first
# and only falls back to the plain ``random`` module on ``NotImplementedError``.
system_random = random.SystemRandom()
64
65
class BaseCollectionReference(Generic[QueryType]):
    """A reference to a collection in a Firestore database.

    The collection may already exist or this class can facilitate creation
    of documents within the collection.

    Args:
        path (Tuple[str, ...]): The components in the collection path.
            This is a series of strings representing each collection and
            sub-collection ID, as well as the document IDs for any documents
            that contain a sub-collection.
        kwargs (dict): The keyword arguments for the constructor. The only
            supported keyword is ``client`` and it must be a
            :class:`~google.cloud.firestore_v1.client.Client` if provided. It
            represents the client that created this collection reference.

    Raises:
        ValueError: if

            * the ``path`` is empty
            * there are an even number of elements
            * a collection ID in ``path`` is not a string
            * a document ID in ``path`` is not a string
        TypeError: If a keyword other than ``client`` is used.
    """

    def __init__(self, *path, **kwargs) -> None:
        _helpers.verify_path(path, is_collection=True)
        self._path = path
        self._client = kwargs.pop("client", None)
        # Anything left over after removing ``client`` is unsupported.
        if kwargs:
            raise TypeError(
                "Received unexpected arguments", kwargs, "Only `client` is supported"
            )

    def __eq__(self, other):
        """Compare two references by path and client.

        Returns:
            Union[bool, NotImplemented]: ``NotImplemented`` when ``other`` is
            not an instance of this object's class; otherwise whether both
            the path and the client are equal.
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        return self._path == other._path and self._client == other._client

    def __hash__(self) -> int:
        """Hash the same fields that :meth:`__eq__` compares.

        Defining ``__eq__`` alone would implicitly set ``__hash__`` to
        ``None`` and make references unusable in sets or as dictionary keys.

        Returns:
            int: A hash derived from the path and the client.
        """
        return hash((self._path, self._client))

    @property
    def id(self):
        """The collection identifier.

        Returns:
            str: The last component of the path.
        """
        return self._path[-1]

    @property
    def parent(self):
        """Document that owns the current collection.

        Returns:
            Optional[:class:`~google.cloud.firestore_v1.document.DocumentReference`]:
            The parent document, if the current collection is not a
            top-level collection.
        """
        # A single-component path is a top-level collection: no owning document.
        if len(self._path) == 1:
            return None
        return self._client.document(*self._path[:-1])

    def _query(self) -> QueryType:
        """Build the query object the query-style helpers delegate to.

        The base implementation only raises; concrete collection classes are
        expected to override it.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def _aggregation_query(self) -> BaseAggregationQuery:
        """Build the aggregation query used by ``count`` / ``sum`` / ``avg``.

        The base implementation only raises; concrete collection classes are
        expected to override it.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def _vector_query(self) -> BaseVectorQuery:
        """Build the vector query used by :meth:`find_nearest`.

        The base implementation only raises; concrete collection classes are
        expected to override it.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def document(self, document_id: Optional[str] = None) -> BaseDocumentReference:
        """Create a sub-document underneath the current collection.

        Args:
            document_id (Optional[str]): The document identifier
                within the current collection. If not provided, will default
                to a random 20 character string composed of digits,
                uppercase and lowercase letters.

        Returns:
            :class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`:
            The child document.
        """
        if document_id is None:
            document_id = _auto_id()

        # Append `self._path` and the passed document's ID as long as the first
        # element in the path is not an empty string, which comes from setting the
        # parent to "" for recursive queries.
        if self._path[0]:
            child_path = self._path + (document_id,)
        else:
            child_path = (document_id,)
        return self._client.document(*child_path)

    def _parent_info(self) -> Tuple[Any, str]:
        """Get fully-qualified parent path and prefix for this collection.

        Returns:
            Tuple[str, str]: Pair of

            * the fully-qualified (with database and project) path to the
              parent of this collection (will either be the database path
              or a document path).
            * the prefix to a document in this collection.
        """
        owner = self.parent
        if owner is None:
            # Top-level collection: the parent is the database's documents root.
            parent_path = _helpers.DOCUMENT_PATH_DELIMITER.join(
                (self._client._database_string, "documents")
            )
        else:
            parent_path = owner._document_path

        expected_prefix = _helpers.DOCUMENT_PATH_DELIMITER.join((parent_path, self.id))
        return parent_path, expected_prefix

    def _prep_add(
        self,
        document_data: dict,
        document_id: Optional[str] = None,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: Optional[float] = None,
    ) -> Tuple[BaseDocumentReference, dict]:
        """Shared setup for async / sync :meth:`add`.

        Args:
            document_data (dict): Accepted so the signature mirrors
                :meth:`add`; it is not read by this helper.
            document_id (Optional[str]): ID for the new document. A random
                ID is generated when omitted.
            retry: Retry setting forwarded to
                ``_helpers.make_retry_timeout_kwargs``.
            timeout (Optional[float]): Timeout forwarded to
                ``_helpers.make_retry_timeout_kwargs``.

        Returns:
            Tuple: The document reference for the new document and the
            keyword arguments built from ``retry`` and ``timeout``.
        """
        if document_id is None:
            document_id = _auto_id()

        document_ref = self.document(document_id)
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return document_ref, kwargs

    def add(
        self,
        document_data: dict,
        document_id: Optional[str] = None,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: Optional[float] = None,
    ) -> Union[Tuple[Any, Any], Coroutine[Any, Any, Tuple[Any, Any]]]:
        """Add a document to this collection; not implemented in the base class.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def _prep_list_documents(
        self,
        page_size: Optional[int] = None,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: Optional[float] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> Tuple[dict, dict]:
        """Shared setup for async / sync :meth:`list_documents`.

        Args:
            page_size (Optional[int]): Value placed in the request's
                ``page_size`` entry.
            retry: Retry setting forwarded to
                ``_helpers.make_retry_timeout_kwargs``.
            timeout (Optional[float]): Timeout forwarded to
                ``_helpers.make_retry_timeout_kwargs``.
            read_time (Optional[datetime.datetime]): When given, added to the
                request as ``read_time``.

        Returns:
            Tuple[dict, dict]: The request dictionary and the keyword
            arguments built from ``retry`` and ``timeout``.
        """
        parent, _ = self._parent_info()
        request = {
            "parent": parent,
            "collection_id": self.id,
            "page_size": page_size,
            "show_missing": True,
            # list_documents returns an iterator of document references, which do not
            # include any fields. To save on data transfer, we can set a field_path mask
            # to include no fields
            "mask": {"field_paths": None},
        }
        if read_time is not None:
            request["read_time"] = read_time
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return request, kwargs

    def list_documents(
        self,
        page_size: Optional[int] = None,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: Optional[float] = None,
        *,
        read_time: Optional[datetime.datetime] = None,
    ) -> Union[
        Generator[DocumentReference, Any, Any],
        AsyncGenerator[AsyncDocumentReference, Any],
    ]:
        """List the documents in this collection; not implemented in the base class.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def recursive(self) -> QueryType:
        """Create a query from this collection and call its ``recursive()``.

        Returns:
            The result of calling ``recursive()`` on this collection's query.
        """
        return self._query().recursive()

    def select(self, field_paths: Iterable[str]) -> QueryType:
        """Create a "select" query with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.select` for
        more information on this method.

        Args:
            field_paths (Iterable[str, ...]): An iterable of field paths
                (``.``-delimited list of field names) to use as a projection
                of document fields in the query results.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A "projected" query.
        """
        return self._query().select(field_paths)

    def where(
        self,
        field_path: Optional[str] = None,
        op_string: Optional[str] = None,
        value=None,
        *,
        filter=None,
    ) -> QueryType:
        """Create a "where" query with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.where` for
        more information on this method.

        Args:
            field_path (str): A field path (``.``-delimited list of
                field names) for the field to filter on. Optional.
            op_string (str): A comparison operation in the form of a string.
                Acceptable values are ``<``, ``<=``, ``==``, ``>=``, ``>``,
                and ``in``. Optional.
            value (Any): The value to compare the field against in the filter.
                If ``value`` is :data:`None` or a NaN, then ``==`` is the only
                allowed operation.  If ``op_string`` is ``in``, ``value``
                must be a sequence of values. Optional.
            filter (:class:`~google.cloud.firestore_v1.base_query.BaseFilter`):
                an instance of a Filter. Either a FieldFilter or a
                CompositeFilter.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A filtered query.

        Raises:
            ValueError: If both the positional arguments (field_path,
                op_string, value) and the filter keyword argument are passed
                at the same time.
        """
        query = self._query()

        # Without both a field path and an operator, only ``filter`` applies.
        if not (field_path and op_string):
            return query.where(filter=filter)

        if filter is not None:
            raise ValueError(
                "Can't pass in both the positional arguments and 'filter' at the same time"
            )

        if field_path == "__name__" and op_string == "in":
            # Plain string IDs are turned into document references within this
            # collection; any other element is passed through untouched.
            value = [
                self.document(name) if isinstance(name, str) else name
                for name in value
            ]
        return query.where(field_path, op_string, value)

    def order_by(self, field_path: str, **kwargs) -> QueryType:
        """Create an "order by" query with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.order_by` for
        more information on this method.

        Args:
            field_path (str): A field path (``.``-delimited list of
                field names) on which to order the query results.
            kwargs (Dict[str, Any]): The keyword arguments to pass along
                to the query. The only supported keyword is ``direction``,
                see :meth:`~google.cloud.firestore_v1.query.Query.order_by`
                for more information.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            An "order by" query.
        """
        return self._query().order_by(field_path, **kwargs)

    def limit(self, count: int) -> QueryType:
        """Create a limited query with this collection as parent.

        .. note::
           `limit` and `limit_to_last` are mutually exclusive.
           Setting `limit` will drop previously set `limit_to_last`.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.limit` for
        more information on this method.

        Args:
            count (int): Maximum number of documents to return that match
                the query.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A limited query.
        """
        return self._query().limit(count)

    def limit_to_last(self, count: int) -> QueryType:
        """Create a limited to last query with this collection as parent.

        .. note::
           `limit` and `limit_to_last` are mutually exclusive.
           Setting `limit_to_last` will drop previously set `limit`.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.limit_to_last`
        for more information on this method.

        Args:
            count (int): Maximum number of documents to return that
                match the query.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A limited to last query.
        """
        return self._query().limit_to_last(count)

    def offset(self, num_to_skip: int) -> QueryType:
        """Skip to an offset in a query with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.offset` for
        more information on this method.

        Args:
            num_to_skip (int): The number of results to skip at the beginning
                of query results. (Must be non-negative.)

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            An offset query.
        """
        return self._query().offset(num_to_skip)

    def start_at(
        self, document_fields: Union[DocumentSnapshot, dict, list, tuple]
    ) -> QueryType:
        """Start query at a cursor with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.start_at` for
        more information on this method.

        Args:
            document_fields (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
                A document snapshot or a dictionary/list/tuple of fields
                representing a query results cursor. A cursor is a collection
                of values that represent a position in a query result set.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A query with cursor.
        """
        return self._query().start_at(document_fields)

    def start_after(
        self, document_fields: Union[DocumentSnapshot, dict, list, tuple]
    ) -> QueryType:
        """Start query after a cursor with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.start_after` for
        more information on this method.

        Args:
            document_fields (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
                A document snapshot or a dictionary/list/tuple of fields
                representing a query results cursor. A cursor is a collection
                of values that represent a position in a query result set.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A query with cursor.
        """
        return self._query().start_after(document_fields)

    def end_before(
        self, document_fields: Union[DocumentSnapshot, dict, list, tuple]
    ) -> QueryType:
        """End query before a cursor with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.end_before` for
        more information on this method.

        Args:
            document_fields (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
                A document snapshot or a dictionary/list/tuple of fields
                representing a query results cursor. A cursor is a collection
                of values that represent a position in a query result set.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A query with cursor.
        """
        return self._query().end_before(document_fields)

    def end_at(
        self, document_fields: Union[DocumentSnapshot, dict, list, tuple]
    ) -> QueryType:
        """End query at a cursor with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.end_at` for
        more information on this method.

        Args:
            document_fields (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
                A document snapshot or a dictionary/list/tuple of fields
                representing a query results cursor. A cursor is a collection
                of values that represent a position in a query result set.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A query with cursor.
        """
        return self._query().end_at(document_fields)

    def _prep_get_or_stream(
        self,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: Optional[float] = None,
    ) -> Tuple[Any, dict]:
        """Shared setup for async / sync :meth:`get` / :meth:`stream`.

        Args:
            retry: Retry setting forwarded to
                ``_helpers.make_retry_timeout_kwargs``.
            timeout (Optional[float]): Timeout forwarded to
                ``_helpers.make_retry_timeout_kwargs``.

        Returns:
            Tuple[Any, dict]: This collection's query and the keyword
            arguments built from ``retry`` and ``timeout``.
        """
        query = self._query()
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return query, kwargs

    def get(
        self,
        transaction: Optional[Transaction] = None,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> (
        QueryResultsList[DocumentSnapshot]
        | Coroutine[Any, Any, QueryResultsList[DocumentSnapshot]]
    ):
        """Read the documents in this collection; not implemented in the base class.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def stream(
        self,
        transaction: Optional[Transaction] = None,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> StreamGenerator[DocumentSnapshot] | AsyncIterator[DocumentSnapshot]:
        """Stream the documents in this collection; not implemented in the base class.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def on_snapshot(self, callback):
        """Register a snapshot callback; not implemented in the base class.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def count(self, alias: Optional[str] = None):
        """Adds a count over the nested query.

        Args:
            alias (Optional[str]): The alias for the count.

        Returns:
            The result of calling ``count`` on this collection's aggregation
            query.
        """
        return self._aggregation_query().count(alias=alias)

    def sum(self, field_ref: str | FieldPath, alias: Optional[str] = None):
        """Adds a sum over the nested query.

        Args:
            field_ref (Union[str, google.cloud.firestore_v1.field_path.FieldPath]):
                The field to aggregate across.
            alias (Optional[str]): Optional name of the field to store the
                result of the aggregation into. If not provided, Firestore
                will pick a default name following the format
                field_<incremental_id++>.

        Returns:
            The result of calling ``sum`` on this collection's aggregation
            query.
        """
        return self._aggregation_query().sum(field_ref, alias=alias)

    def avg(self, field_ref: str | FieldPath, alias: Optional[str] = None):
        """Adds an avg over the nested query.

        Args:
            field_ref (Union[str, google.cloud.firestore_v1.field_path.FieldPath]):
                The field to aggregate across.
            alias (Optional[str]): Optional name of the field to store the
                result of the aggregation into. If not provided, Firestore
                will pick a default name following the format
                field_<incremental_id++>.

        Returns:
            The result of calling ``avg`` on this collection's aggregation
            query.
        """
        return self._aggregation_query().avg(field_ref, alias=alias)

    def find_nearest(
        self,
        vector_field: str,
        query_vector: Union[Vector, Sequence[float]],
        limit: int,
        distance_measure: DistanceMeasure,
        *,
        distance_result_field: Optional[str] = None,
        distance_threshold: Optional[float] = None,
    ) -> VectorQuery:
        """Finds the closest vector embeddings to the given query vector.

        Args:
            vector_field (str): An indexed vector field to search upon. Only
                documents which contain vectors whose dimensionality match
                the query_vector can be returned.
            query_vector (Union[Vector, Sequence[float]]): The query vector
                that we are searching on. Must be a vector of no more than
                2048 dimensions.
            limit (int): The number of nearest neighbors to return. Must be
                a positive integer of no more than 1000.
            distance_measure (:class:`DistanceMeasure`): The Distance Measure
                to use.
            distance_result_field (Optional[str]): Name of the field to
                output the result of the vector distance calculation.
            distance_threshold (Optional[float]): A threshold for which no
                less similar documents will be returned.

        Returns:
            :class:`~google.cloud.firestore_v1.vector_query.VectorQuery`:
            the vector query.
        """
        return self._vector_query().find_nearest(
            vector_field,
            query_vector,
            limit,
            distance_measure,
            distance_result_field=distance_result_field,
            distance_threshold=distance_threshold,
        )

    def _build_pipeline(self, source: "PipelineSource"):
        """Convert this collection's query into a Pipeline.

        Args:
            source: the PipelineSource to build the pipeline off of.

        Returns:
            a Pipeline representing the query.
        """
        return self._query()._build_pipeline(source)
619
def _auto_id() -> str:
    """Build a random identifier for a new document.

    Characters are drawn one at a time from ``_AUTO_ID_CHARS`` using the
    module's ``system_random`` generator.

    Returns:
        str: A 20 character string composed of digits, uppercase and
        lowercase letters.
    """
    id_length = 20
    try:
        picked = [system_random.choice(_AUTO_ID_CHARS) for _ in range(id_length)]
    except NotImplementedError:
        # Very old Unix systems don't have os.urandom (/dev/urandom); fall back
        # to the default generator from the ``random`` module.
        picked = [random.choice(_AUTO_ID_CHARS) for _ in range(id_length)]
    return "".join(picked)
632
633
def _item_to_document_ref(collection_reference, item):
    """Turn a listed document resource into a reference in a collection.

    Args:
        collection_reference: Object whose ``document(document_id)`` method
            is used to build the reference (presumably a collection
            reference -- confirm against callers).
        item: Document resource; its ``name`` attribute is read as a
            delimiter-separated path string.

    Returns:
        :class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`:
        The child document, as returned by ``collection_reference.document``.
    """
    # The document ID is whatever follows the final path delimiter in the name.
    _, _, document_id = item.name.rpartition(_helpers.DOCUMENT_PATH_DELIMITER)
    return collection_reference.document(document_id)