1# Copyright 2020 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for representing collections for the Google Cloud Firestore API."""
16from __future__ import annotations
17
18from typing import TYPE_CHECKING, Any, AsyncGenerator, Optional, Tuple, cast
19
20from google.api_core import gapic_v1
21from google.api_core import retry_async as retries
22
23from google.cloud.firestore_v1 import (
24 async_aggregation,
25 async_query,
26 async_vector_query,
27 transaction,
28)
29from google.cloud.firestore_v1.base_collection import (
30 BaseCollectionReference,
31 _item_to_document_ref,
32)
33
34if TYPE_CHECKING: # pragma: NO COVER
35 import datetime
36 from google.cloud.firestore_v1.async_document import AsyncDocumentReference
37 from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
38 from google.cloud.firestore_v1.base_document import DocumentSnapshot
39 from google.cloud.firestore_v1.query_profile import ExplainOptions
40 from google.cloud.firestore_v1.query_results import QueryResultsList
41
42
43class AsyncCollectionReference(BaseCollectionReference[async_query.AsyncQuery]):
44 """A reference to a collection in a Firestore database.
45
46 The collection may already exist or this class can facilitate creation
47 of documents within the collection.
48
49 Args:
50 path (Tuple[str, ...]): The components in the collection path.
51 This is a series of strings representing each collection and
52 sub-collection ID, as well as the document IDs for any documents
53 that contain a sub-collection.
54 kwargs (dict): The keyword arguments for the constructor. The only
55 supported keyword is ``client`` and it must be a
56 :class:`~google.cloud.firestore_v1.client.Client` if provided. It
57 represents the client that created this collection reference.
58
59 Raises:
60 ValueError: if
61
62 * the ``path`` is empty
63 * there are an even number of elements
64 * a collection ID in ``path`` is not a string
65 * a document ID in ``path`` is not a string
66 TypeError: If a keyword other than ``client`` is used.
67 """
68
69 def __init__(self, *path, **kwargs) -> None:
70 super(AsyncCollectionReference, self).__init__(*path, **kwargs)
71
72 def _query(self) -> async_query.AsyncQuery:
73 """Query factory.
74
75 Returns:
76 :class:`~google.cloud.firestore_v1.query.Query`
77 """
78 return async_query.AsyncQuery(self)
79
80 def _aggregation_query(self) -> async_aggregation.AsyncAggregationQuery:
81 """AsyncAggregationQuery factory.
82
83 Returns:
84 :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery
85 """
86 return async_aggregation.AsyncAggregationQuery(self._query())
87
88 def _vector_query(self) -> async_vector_query.AsyncVectorQuery:
89 """AsyncVectorQuery factory.
90
91 Returns:
92 :class:`~google.cloud.firestore_v1.async_vector_query.AsyncVectorQuery`
93 """
94 return async_vector_query.AsyncVectorQuery(self._query())
95
96 async def _chunkify(self, chunk_size: int):
97 async for page in self._query()._chunkify(chunk_size):
98 yield page
99
100 async def add(
101 self,
102 document_data: dict,
103 document_id: str | None = None,
104 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
105 timeout: float | None = None,
106 ) -> Tuple[Any, Any]:
107 """Create a document in the Firestore database with the provided data.
108
109 Args:
110 document_data (dict): Property names and values to use for
111 creating the document.
112 document_id (Optional[str]): The document identifier within the
113 current collection. If not provided, an ID will be
114 automatically assigned by the server (the assigned ID will be
115 a random 20 character string composed of digits,
116 uppercase and lowercase letters).
117 retry (google.api_core.retry.Retry): Designation of what errors, if any,
118 should be retried. Defaults to a system-specified policy.
119 timeout (float): The timeout for this request. Defaults to a
120 system-specified value.
121
122 Returns:
123 Tuple[:class:`google.protobuf.timestamp_pb2.Timestamp`, \
124 :class:`~google.cloud.firestore_v1.async_document.AsyncDocumentReference`]:
125 Pair of
126
127 * The ``update_time`` when the document was created/overwritten.
128 * A document reference for the created document.
129
130 Raises:
131 :class:`google.cloud.exceptions.Conflict`:
132 If ``document_id`` is provided and the document already exists.
133 """
134 document_ref, kwargs = self._prep_add(
135 document_data,
136 document_id,
137 retry,
138 timeout,
139 )
140 write_result = await document_ref.create(document_data, **kwargs)
141 return write_result.update_time, document_ref
142
143 def document(self, document_id: str | None = None) -> AsyncDocumentReference:
144 """Create a sub-document underneath the current collection.
145
146 Args:
147 document_id (Optional[str]): The document identifier
148 within the current collection. If not provided, will default
149 to a random 20 character string composed of digits,
150 uppercase and lowercase and letters.
151
152 Returns:
153 :class:`~google.cloud.firestore_v1.document.async_document.AsyncDocumentReference`:
154 The child document.
155 """
156 doc = super(AsyncCollectionReference, self).document(document_id)
157 return cast("AsyncDocumentReference", doc)
158
159 async def list_documents(
160 self,
161 page_size: int | None = None,
162 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
163 timeout: float | None = None,
164 *,
165 read_time: datetime.datetime | None = None,
166 ) -> AsyncGenerator[AsyncDocumentReference, None]:
167 """List all subdocuments of the current collection.
168
169 Args:
170 page_size (Optional[int]]): The maximum number of documents
171 in each page of results from this request. Non-positive values
172 are ignored. Defaults to a sensible value set by the API.
173 retry (google.api_core.retry.Retry): Designation of what errors, if any,
174 should be retried. Defaults to a system-specified policy.
175 timeout (float): The timeout for this request. Defaults to a
176 system-specified value.
177 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
178 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
179 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
180 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.
181
182 Returns:
183 Sequence[:class:`~google.cloud.firestore_v1.collection.DocumentReference`]:
184 iterator of subdocuments of the current collection. If the
185 collection does not exist at the time of `snapshot`, the
186 iterator will be empty
187 """
188 request, kwargs = self._prep_list_documents(
189 page_size, retry, timeout, read_time
190 )
191
192 iterator = await self._client._firestore_api.list_documents(
193 request=request,
194 metadata=self._client._rpc_metadata,
195 **kwargs,
196 )
197 async for i in iterator:
198 yield _item_to_document_ref(self, i)
199
200 async def get(
201 self,
202 transaction: Optional[transaction.Transaction] = None,
203 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
204 timeout: Optional[float] = None,
205 *,
206 explain_options: Optional[ExplainOptions] = None,
207 read_time: Optional[datetime.datetime] = None,
208 ) -> QueryResultsList[DocumentSnapshot]:
209 """Read the documents in this collection.
210
211 This sends a ``RunQuery`` RPC and returns a list of documents
212 returned in the stream of ``RunQueryResponse`` messages.
213
214 Args:
215 transaction
216 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
217 An existing transaction that this query will run in.
218 retry (Optional[google.api_core.retry.Retry]): Designation of what
219 errors, if any, should be retried. Defaults to a
220 system-specified policy.
221 timeout (Otional[float]): The timeout for this request. Defaults
222 to a system-specified value.
223 explain_options
224 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
225 Options to enable query profiling for this query. When set,
226 explain_metrics will be available on the returned generator.
227 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
228 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
229 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
230 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.
231
232 If a ``transaction`` is used and it already has write operations added,
233 this method cannot be used (i.e. read-after-write is not allowed).
234
235 Returns:
236 QueryResultsList[DocumentSnapshot]:
237 The documents in this collection that match the query.
238 """
239 query, kwargs = self._prep_get_or_stream(retry, timeout)
240 if explain_options is not None:
241 kwargs["explain_options"] = explain_options
242 if read_time is not None:
243 kwargs["read_time"] = read_time
244
245 return await query.get(transaction=transaction, **kwargs)
246
247 def stream(
248 self,
249 transaction: Optional[transaction.Transaction] = None,
250 retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
251 timeout: Optional[float] = None,
252 *,
253 explain_options: Optional[ExplainOptions] = None,
254 read_time: Optional[datetime.datetime] = None,
255 ) -> AsyncStreamGenerator[DocumentSnapshot]:
256 """Read the documents in this collection.
257
258 This sends a ``RunQuery`` RPC and then returns a generator which
259 consumes each document returned in the stream of ``RunQueryResponse``
260 messages.
261
262 .. note::
263
264 The underlying stream of responses will time out after
265 the ``max_rpc_timeout_millis`` value set in the GAPIC
266 client configuration for the ``RunQuery`` API. Snapshots
267 not consumed from the iterator before that point will be lost.
268
269 If a ``transaction`` is used and it already has write operations
270 added, this method cannot be used (i.e. read-after-write is not
271 allowed).
272
273 Args:
274 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
275 Transaction`]):
276 An existing transaction that the query will run in.
277 retry (Optional[google.api_core.retry.Retry]): Designation of what
278 errors, if any, should be retried. Defaults to a
279 system-specified policy.
280 timeout (Optional[float]): The timeout for this request. Defaults
281 to a system-specified value.
282 explain_options
283 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
284 Options to enable query profiling for this query. When set,
285 explain_metrics will be available on the returned generator.
286 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
287 time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
288 is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
289 timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.
290
291 Returns:
292 `AsyncStreamGenerator[DocumentSnapshot]`: A generator of the query
293 results.
294 """
295 query, kwargs = self._prep_get_or_stream(retry, timeout)
296 if explain_options:
297 kwargs["explain_options"] = explain_options
298 if read_time is not None:
299 kwargs["read_time"] = read_time
300
301 return query.stream(transaction=transaction, **kwargs)