1# Copyright 2017 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for representing collections for the Google Cloud Firestore API."""
16from __future__ import annotations
17
18from typing import TYPE_CHECKING, Any, Callable, Generator, Optional, Tuple, Union
19
20from google.api_core import gapic_v1
21from google.api_core import retry as retries
22
23from google.cloud.firestore_v1 import aggregation, document
24from google.cloud.firestore_v1 import query as query_mod
25from google.cloud.firestore_v1 import transaction, vector_query
26from google.cloud.firestore_v1.base_collection import (
27 BaseCollectionReference,
28 _item_to_document_ref,
29)
30from google.cloud.firestore_v1.query_results import QueryResultsList
31from google.cloud.firestore_v1.watch import Watch
32
33if TYPE_CHECKING: # pragma: NO COVER
34 from google.cloud.firestore_v1.base_document import DocumentSnapshot
35 from google.cloud.firestore_v1.query_profile import ExplainOptions
36 from google.cloud.firestore_v1.stream_generator import StreamGenerator
37
38 import datetime
39
40
class CollectionReference(BaseCollectionReference[query_mod.Query]):
    """A reference to a collection in a Firestore database.

    The collection may already exist or this class can facilitate creation
    of documents within the collection.

    Args:
        path (Tuple[str, ...]): The components in the collection path.
            This is a series of strings representing each collection and
            sub-collection ID, as well as the document IDs for any documents
            that contain a sub-collection.
        kwargs (dict): The keyword arguments for the constructor. The only
            supported keyword is ``client`` and it must be a
            :class:`~google.cloud.firestore_v1.client.Client` if provided. It
            represents the client that created this collection reference.

    Raises:
        ValueError: if

            * the ``path`` is empty
            * there are an even number of elements
            * a collection ID in ``path`` is not a string
            * a document ID in ``path`` is not a string
        TypeError: If a keyword other than ``client`` is used.
    """

    def __init__(self, *path, **kwargs) -> None:
        """Forward ``path`` and ``kwargs`` unchanged to the base class."""
        super().__init__(*path, **kwargs)

    def _query(self) -> query_mod.Query:
        """Query factory.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`
        """
        return query_mod.Query(self)

    def _aggregation_query(self) -> aggregation.AggregationQuery:
        """AggregationQuery factory.

        The aggregation query wraps a fresh query built by :meth:`_query`.

        Returns:
            :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery`
        """
        return aggregation.AggregationQuery(self._query())

    def _vector_query(self) -> vector_query.VectorQuery:
        """VectorQuery factory.

        The vector query wraps a fresh query built by :meth:`_query`.

        Returns:
            :class:`~google.cloud.firestore_v1.vector_query.VectorQuery`
        """
        return vector_query.VectorQuery(self._query())

    def add(
        self,
        document_data: dict,
        document_id: Union[str, None] = None,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: Union[float, None] = None,
    ) -> Tuple[Any, Any]:
        """Create a document in the Firestore database with the provided data.

        Args:
            document_data (dict): Property names and values to use for
                creating the document.
            document_id (Optional[str]): The document identifier within the
                current collection. If not provided, an ID will be
                automatically assigned by the server (the assigned ID will be
                a random 20 character string composed of digits,
                uppercase and lowercase letters).
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            Tuple[:class:`google.protobuf.timestamp_pb2.Timestamp`, \
                :class:`~google.cloud.firestore_v1.document.DocumentReference`]:
                Pair of

                * The ``update_time`` when the document was created/overwritten.
                * A document reference for the created document.

        Raises:
            :class:`google.cloud.exceptions.Conflict`:
                If ``document_id`` is provided and the document already exists.
        """
        # The base class resolves the target document reference and turns
        # retry/timeout into the keyword arguments passed on to ``create``.
        document_ref, kwargs = self._prep_add(
            document_data,
            document_id,
            retry,
            timeout,
        )
        write_result = document_ref.create(document_data, **kwargs)
        return write_result.update_time, document_ref

    def list_documents(
        self,
        page_size: Union[int, None] = None,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: Union[float, None] = None,
        *,
        read_time: Optional[datetime.datetime] = None,
    ) -> Generator[Any, Any, None]:
        """List all subdocuments of the current collection.

        Args:
            page_size (Optional[int]): The maximum number of documents
                in each page of results from this request. Non-positive values
                are ignored. Defaults to a sensible value set by the API.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Returns:
            Sequence[:class:`~google.cloud.firestore_v1.document.DocumentReference`]:
                iterator of subdocuments of the current collection. If the
                collection does not exist, the iterator will be empty.
        """
        request, kwargs = self._prep_list_documents(
            page_size, retry, timeout, read_time
        )

        iterator = self._client._firestore_api.list_documents(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )
        # Generator expression: each listed item is converted to a document
        # reference only as the caller iterates.
        return (_item_to_document_ref(self, i) for i in iterator)

    def _chunkify(self, chunk_size: int):
        """Delegate to ``_chunkify`` on a fresh query over this collection.

        Args:
            chunk_size (int): Passed through unchanged to the query's
                ``_chunkify``.

        Returns:
            Whatever the query's ``_chunkify`` returns.
        """
        return self._query()._chunkify(chunk_size)

    def get(
        self,
        transaction: Union[transaction.Transaction, None] = None,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: Union[float, None] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> QueryResultsList[DocumentSnapshot]:
        """Read the documents in this collection.

        This sends a ``RunQuery`` RPC and returns a list of documents
        returned in the stream of ``RunQueryResponse`` messages.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned results.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Returns:
            QueryResultsList[DocumentSnapshot]: The documents in this collection
            that match the query.
        """
        query, kwargs = self._prep_get_or_stream(retry, timeout)
        # Optional arguments are forwarded only when the caller supplied them.
        if explain_options is not None:
            kwargs["explain_options"] = explain_options
        if read_time is not None:
            kwargs["read_time"] = read_time

        return query.get(transaction=transaction, **kwargs)

    def stream(
        self,
        transaction: Optional[transaction.Transaction] = None,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> StreamGenerator[DocumentSnapshot]:
        """Read the documents in this collection.

        This sends a ``RunQuery`` RPC and then returns an iterator which
        consumes each document returned in the stream of ``RunQueryResponse``
        messages.

        .. note::

           The underlying stream of responses will time out after
           the ``max_rpc_timeout_millis`` value set in the GAPIC
           client configuration for the ``RunQuery`` API. Snapshots
           not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that the query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Returns:
            `StreamGenerator[DocumentSnapshot]`: A generator of the query results.
        """
        query, kwargs = self._prep_get_or_stream(retry, timeout)
        # Use an explicit ``None`` check (as ``get`` does) so the decision to
        # forward the options never depends on the object's truthiness.
        if explain_options is not None:
            kwargs["explain_options"] = explain_options
        if read_time is not None:
            kwargs["read_time"] = read_time

        return query.stream(transaction=transaction, **kwargs)

    def on_snapshot(self, callback: Callable) -> Watch:
        """Monitor the documents in this collection.

        This starts a watch on this collection using a background thread. The
        provided callback is run on the snapshot of the documents.

        Args:
            callback (Callable[[:class:`~google.cloud.firestore.collection.CollectionSnapshot`], NoneType]):
                a callback to run when a change occurs.

        Returns:
            :class:`~google.cloud.firestore_v1.watch.Watch`: The watch created
            for this collection's query.

        Example:
            from google.cloud import firestore_v1

            db = firestore_v1.Client()
            collection_ref = db.collection('users')

            def on_snapshot(collection_snapshot, changes, read_time):
                for doc in collection_snapshot.documents:
                    print(f'{doc.id} => {doc.to_dict()}')

            # Watch this collection
            collection_watch = collection_ref.on_snapshot(on_snapshot)

            # Terminate this watch
            collection_watch.unsubscribe()
        """
        # NOTE(review): the callback type in the docstring and the example's
        # use of ``collection_snapshot.documents`` are carried over from the
        # previous documentation; confirm both against the arguments that
        # Watch actually passes to the callback.
        query = self._query()
        # The DocumentSnapshot class itself (not an instance) is handed to Watch.
        return Watch.for_query(query, callback, document.DocumentSnapshot)