# Copyright 2017 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Client for interacting with the Google Cloud Firestore API.

This is the base from which all interactions with the API occur.

In the hierarchy of API concepts

* a :class:`~google.cloud.firestore_v1.client.Client` owns a
  :class:`~google.cloud.firestore_v1.collection.CollectionReference`
* a :class:`~google.cloud.firestore_v1.client.Client` owns a
  :class:`~google.cloud.firestore_v1.document.DocumentReference`
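
For example, a minimal sketch of that hierarchy (illustrative only; it uses
the synchronous :class:`~google.cloud.firestore_v1.client.Client` subclass
and a hypothetical ``users/alice`` document):

.. code-block:: python

    from google.cloud import firestore

    client = firestore.Client()
    users = client.collection("users")  # a CollectionReference
    alice = users.document("alice")     # a DocumentReference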
25"""
26from __future__ import annotations
27
28import datetime
29import os
30from typing import (
31 Any,
32 AsyncGenerator,
33 Awaitable,
34 Generator,
35 Iterable,
36 List,
37 Optional,
38 Tuple,
39 Union,
40 Type,
41)
42
43import google.api_core.client_options
44import google.api_core.path_template
45import grpc # type: ignore
46from google.api_core import retry as retries
47from google.api_core.gapic_v1 import client_info
48from google.auth.credentials import AnonymousCredentials
49from google.cloud.client import ClientWithProject # type: ignore
50
51from google.cloud.firestore_v1 import __version__, _helpers, types
52from google.cloud.firestore_v1.base_batch import BaseWriteBatch
53
54# Types needed only for Type Hints
55from google.cloud.firestore_v1.base_collection import BaseCollectionReference
56from google.cloud.firestore_v1.base_document import (
57 BaseDocumentReference,
58 DocumentSnapshot,
59)
60from google.cloud.firestore_v1.base_query import BaseQuery
61from google.cloud.firestore_v1.base_transaction import MAX_ATTEMPTS, BaseTransaction
62from google.cloud.firestore_v1.bulk_writer import BulkWriter, BulkWriterOptions
63from google.cloud.firestore_v1.field_path import render_field_path
64from google.cloud.firestore_v1.services.firestore import client as firestore_client
65from google.cloud.firestore_v1.pipeline_source import PipelineSource
66from google.cloud.firestore_v1.base_pipeline import _BasePipeline
67
68DEFAULT_DATABASE = "(default)"
69"""str: The default database used in a :class:`~google.cloud.firestore_v1.client.Client`."""
70_DEFAULT_EMULATOR_PROJECT = "google-cloud-firestore-emulator"
71_BAD_OPTION_ERR = (
72 "Exactly one of ``last_update_time`` or ``exists`` " "must be provided."
73)
74_BAD_DOC_TEMPLATE: str = (
75 "Document {!r} appeared in response but was not present among references"
76)
77_ACTIVE_TXN: str = "There is already an active transaction."
78_INACTIVE_TXN: str = "There is no active transaction."
79_CLIENT_INFO: Any = client_info.ClientInfo(client_library_version=__version__)
80_FIRESTORE_EMULATOR_HOST: str = "FIRESTORE_EMULATOR_HOST"
81
82
83class BaseClient(ClientWithProject):
84 """Client for interacting with Google Cloud Firestore API.
85
86 .. note::
87
88 Since the Cloud Firestore API requires the gRPC transport, no
89 ``_http`` argument is accepted by this class.
90
91 Args:
92 project (Optional[str]): The project which the client acts on behalf
93 of. If not passed, falls back to the default inferred
94 from the environment.
95 credentials (Optional[~google.auth.credentials.Credentials]): The
96 OAuth2 Credentials to use for this client. If not passed, falls
97 back to the default inferred from the environment.
98 database (Optional[str]): The database name that the client targets.
99 For now, :attr:`DEFAULT_DATABASE` (the default value) is the
100 only valid database.
101 client_info (Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
102 The client info used to send a user-agent string along with API
103 requests. If ``None``, then default info will be used. Generally,
104 you only need to set this if you're developing your own library
105 or partner tool.
106 client_options (Union[dict, google.api_core.client_options.ClientOptions]):
107 Client options used to set user options on the client. API Endpoint
108 should be set through client_options.
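
    For example, a minimal sketch of pointing the client at a non-default
    endpoint through ``client_options`` (illustrative only; the endpoint and
    project below are placeholders):

    .. code-block:: python

        from google.api_core.client_options import ClientOptions
        from google.cloud import firestore

        options = ClientOptions(api_endpoint="my-firestore-endpoint.example.com")
        client = firestore.Client(project="my-project", client_options=options)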
    """

    SCOPE = (
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/datastore",
    )
    """The scopes required for authenticating with the Firestore service."""

    _firestore_api_internal = None
    _database_string_internal = None
    _rpc_metadata_internal = None

    def __init__(
        self,
        project=None,
        credentials=None,
        database=None,
        client_info=_CLIENT_INFO,
        client_options=None,
    ) -> None:
        database = database or DEFAULT_DATABASE
        # NOTE: This API has no use for the _http argument, but sending it
        #       will have no impact since the _http() @property only lazily
        #       creates a working HTTP object.
        self._emulator_host = os.getenv(_FIRESTORE_EMULATOR_HOST)

        if self._emulator_host is not None:
            if credentials is None:
                credentials = AnonymousCredentials()
            if project is None:
                # extract project from env var, or use system default
                project = (
                    os.getenv("GOOGLE_CLOUD_PROJECT")
                    or os.getenv("GCLOUD_PROJECT")
                    or _DEFAULT_EMULATOR_PROJECT
                )

        super(BaseClient, self).__init__(
            project=project,
            credentials=credentials,
            client_options=client_options,
            _http=None,
        )
        self._client_info = client_info
        if client_options:
            if isinstance(client_options, dict):
                client_options = google.api_core.client_options.from_dict(
                    client_options
                )
            self._client_options = client_options

        self._database = database

    def _firestore_api_helper(self, transport, client_class, client_module) -> Any:
        """Lazy-loading getter for the GAPIC Firestore API.

        Returns:
            The GAPIC client with the credentials of the current client.
        """
        if self._firestore_api_internal is None:
            # Use a custom channel.
            # We need this in order to set appropriate keepalive options.

            if self._emulator_host is not None:
                channel = self._emulator_channel(transport)
            else:
                channel = transport.create_channel(
                    self._target,
                    credentials=self._credentials,
                    options={"grpc.keepalive_time_ms": 30000}.items(),
                )

            self._transport = transport(host=self._target, channel=channel)

            self._firestore_api_internal = client_class(
                transport=self._transport, client_options=self._client_options
            )
            client_module._client_info = self._client_info

        return self._firestore_api_internal

    def _emulator_channel(self, transport):
        """
        Creates an insecure channel to communicate with the local emulator.
        If credentials are provided the token is extracted and added to the
        headers. This supports local testing of firestore rules if the credentials
        have been created from a signed custom token.

        :return: grpc.Channel or grpc.aio.Channel
        """
        # Insecure channels are used for the emulator as secure channels
        # cannot be used to communicate on some environments.
        # https://github.com/googleapis/python-firestore/issues/359
        # Default the token to a non-empty string, in this case "owner".
        token = "owner"
        if (
            self._credentials is not None
            and getattr(self._credentials, "id_token", None) is not None
        ):
            token = self._credentials.id_token
        options = [("Authorization", f"Bearer {token}")]

        if "GrpcAsyncIOTransport" in str(transport.__name__):
            return grpc.aio.insecure_channel(self._emulator_host, options=options)
        else:
            return grpc.insecure_channel(self._emulator_host, options=options)

    def _target_helper(self, client_class) -> str:
        """Return the target (where the API is).
        E.g. "firestore.googleapis.com"

        Returns:
            str: The location of the API.
        """
        if self._emulator_host is not None:
            return self._emulator_host
        elif self._client_options and self._client_options.api_endpoint:
            return self._client_options.api_endpoint
        else:
            return client_class.DEFAULT_ENDPOINT

    @property
    def _target(self):
        """Return the target (where the API is).
        E.g. "firestore.googleapis.com"

        Returns:
            str: The location of the API.
        """
        return self._target_helper(firestore_client.FirestoreClient)

    @property
    def _database_string(self):
        """The database string corresponding to this client's project.

        This value is lazy-loaded and cached.

        Will be of the form

            ``projects/{project_id}/databases/{database_id}``

        but ``database_id == '(default)'`` for the time being.

        Returns:
            str: The fully-qualified database string for the current
            project. (The default database is also in this string.)
        """
        if self._database_string_internal is None:
            db_str = google.api_core.path_template.expand(
                "projects/{project}/databases/{database}",
                project=self.project,
                database=self._database,
            )

            self._database_string_internal = db_str

        return self._database_string_internal

    @property
    def _rpc_metadata(self):
        """The RPC metadata for this client's associated database.

        Returns:
            Sequence[Tuple(str, str)]: RPC metadata with resource prefix
            for the database associated with this client.
        """
        if self._rpc_metadata_internal is None:
            self._rpc_metadata_internal = _helpers.metadata_with_prefix(
                self._database_string
            )

            if self._emulator_host is not None:
                # The emulator requires additional metadata to be set.
                self._rpc_metadata_internal.append(("authorization", "Bearer owner"))

        return self._rpc_metadata_internal

    def collection(self, *collection_path) -> BaseCollectionReference:
        raise NotImplementedError

    def collection_group(self, collection_id: str) -> BaseQuery:
        raise NotImplementedError

    def _get_collection_reference(
        self, collection_id: str
    ) -> BaseCollectionReference[BaseQuery]:
        """Checks the validity of ``collection_id`` and then uses the subclass's collection implementation.

        Args:
            collection_id (str): Identifies the collections to query over.

                Every collection or subcollection with this ID as the last segment of its
                path will be included. Cannot contain a slash.

        Returns:
            The created collection.
        """
        if "/" in collection_id:
            raise ValueError(
                "Invalid collection_id "
                + collection_id
                + ". Collection IDs must not contain '/'."
            )

        return self.collection(collection_id)

    def document(self, *document_path) -> BaseDocumentReference:
        raise NotImplementedError

    def bulk_writer(self, options: Optional[BulkWriterOptions] = None) -> BulkWriter:
        """Get a BulkWriter instance from this client.

        Args:
            options (Optional[:class:`~google.cloud.firestore_v1.bulk_writer.BulkWriterOptions`]):
                Optional control parameters for the
                :class:`~google.cloud.firestore_v1.bulk_writer.BulkWriter` returned.

        Returns:
            :class:`~google.cloud.firestore_v1.bulk_writer.BulkWriter`:
                A utility to efficiently create and save many `WriteBatch` instances
                to the server.
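
        For example, a minimal sketch of bulk-creating documents (illustrative
        only; the ``users`` collection and the field values are placeholders):

        .. code-block:: python

            bulk_writer = client.bulk_writer()
            for index in range(10):
                bulk_writer.create(
                    client.collection("users").document(f"user-{index}"),
                    {"index": index},
                )
            bulk_writer.close()  # flush pending writes and wait for them to finish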
        """
        return BulkWriter(client=self, options=options)

    def _document_path_helper(self, *document_path) -> List[str]:
        """Standardize the format of a path into a list of path segments and strip the database string from the path if present.

        Args:
            document_path (Tuple[str, ...]): Can either be

                * A single ``/``-delimited path to a document
                * A tuple of document path segments
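
        For example (illustrative; assumes a client whose
        :attr:`_database_string` is ``projects/my-project/databases/(default)``),
        both of the following return ``["users", "alice"]``:

        .. code-block:: python

            client._document_path_helper("users/alice")
            client._document_path_helper(
                "projects/my-project/databases/(default)/documents/users/alice"
            )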
        """
        path = _path_helper(document_path)
        base_path = self._database_string + "/documents/"
        joined_path = _helpers.DOCUMENT_PATH_DELIMITER.join(path)
        if joined_path.startswith(base_path):
            joined_path = joined_path[len(base_path) :]
        return joined_path.split(_helpers.DOCUMENT_PATH_DELIMITER)

    def recursive_delete(
        self,
        reference,
        *,
        bulk_writer: Optional["BulkWriter"] = None,
        chunk_size: int = 5000,
    ) -> int | Awaitable[int]:
        raise NotImplementedError

    @staticmethod
    def field_path(*field_names: str) -> str:
        """Create a **field path** from a list of nested field names.

        A **field path** is a ``.``-delimited concatenation of the field
        names. It is used to represent a nested field. For example,
        in the data

        .. code-block:: python

            data = {
                'aa': {
                    'bb': {
                        'cc': 10,
                    },
                },
            }

        the field path ``'aa.bb.cc'`` represents the data stored in
        ``data['aa']['bb']['cc']``.
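
        That path would be built as follows (illustrative; ``client`` may be
        any :class:`~google.cloud.firestore_v1.client.Client` instance, since
        this is a static method):

        .. code-block:: python

            field_path = client.field_path('aa', 'bb', 'cc')  # -> 'aa.bb.cc'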

        Args:
            field_names: The list of field names.

        Returns:
            str: The ``.``-delimited field path.
        """
        return render_field_path(field_names)

    @staticmethod
    def write_option(
        **kwargs,
    ) -> Union[_helpers.ExistsOption, _helpers.LastUpdateOption]:
        """Create a write option for write operations.

        Write operations include :meth:`~google.cloud.DocumentReference.set`,
        :meth:`~google.cloud.DocumentReference.update` and
        :meth:`~google.cloud.DocumentReference.delete`.

        One of the following keyword arguments must be provided:

        * ``last_update_time`` (:class:`google.protobuf.timestamp_pb2.\
          Timestamp`): A timestamp. When set, the target document must
          exist and have been last updated at that time. Protobuf
          ``update_time`` timestamps are typically returned from methods
          that perform write operations as part of a "write result"
          protobuf or directly.
        * ``exists`` (:class:`bool`): Indicates if the document being modified
          should already exist.

        Providing no argument would make the option have no effect (so
        it is not allowed). Providing multiple arguments would be an apparent
        contradiction, since ``last_update_time`` assumes that the
        document **was** updated (it can't have been updated if it
        doesn't exist) while ``exists`` indicates that it is unknown
        whether the document exists.
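
        For example, a minimal sketch of guarding a delete with a precondition
        (illustrative only; ``document_ref`` stands for any existing
        :class:`~google.cloud.firestore_v1.document.DocumentReference`):

        .. code-block:: python

            snapshot = document_ref.get()
            # Only delete the document if it has not changed since it was read.
            option = client.write_option(last_update_time=snapshot.update_time)
            document_ref.delete(option=option)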

        Args:
            kwargs (Dict[str, Any]): The keyword arguments described above.

        Raises:
            TypeError: If anything other than exactly one argument is
                provided by the caller.

        Returns:
            :class:`~google.cloud.firestore_v1.client.WriteOption`:
                The option to be used to configure a write message.
        """
        if len(kwargs) != 1:
            raise TypeError(_BAD_OPTION_ERR)

        name, value = kwargs.popitem()
        if name == "last_update_time":
            return _helpers.LastUpdateOption(value)
        elif name == "exists":
            return _helpers.ExistsOption(value)
        else:
            extra = "{!r} was provided".format(name)
            raise TypeError(_BAD_OPTION_ERR, extra)

    def _prep_get_all(
        self,
        references: list,
        field_paths: Iterable[str] | None = None,
        transaction: BaseTransaction | None = None,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        read_time: datetime.datetime | None = None,
    ) -> Tuple[dict, dict, dict]:
        """Shared setup for async/sync :meth:`get_all`."""
        document_paths, reference_map = _reference_info(references)
        mask = _get_doc_mask(field_paths)
        request = {
            "database": self._database_string,
            "documents": document_paths,
            "mask": mask,
            "transaction": _helpers.get_transaction_id(transaction),
        }
        if read_time is not None:
            request["read_time"] = read_time
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return request, reference_map, kwargs

    def get_all(
        self,
        references: list,
        field_paths: Iterable[str] | None = None,
        transaction=None,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        *,
        read_time: datetime.datetime | None = None,
    ) -> Union[
        AsyncGenerator[DocumentSnapshot, Any], Generator[DocumentSnapshot, Any, Any]
    ]:
        raise NotImplementedError

    def _prep_collections(
        self,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        read_time: datetime.datetime | None = None,
    ) -> Tuple[dict, dict]:
        """Shared setup for async/sync :meth:`collections`."""
        request: dict[str, Any] = {
            "parent": "{}/documents".format(self._database_string),
        }
        if read_time is not None:
            request["read_time"] = read_time
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return request, kwargs

    def collections(
        self,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        *,
        read_time: datetime.datetime | None = None,
    ):
        raise NotImplementedError

    def batch(self) -> BaseWriteBatch:
        raise NotImplementedError

    def transaction(
        self, max_attempts: int = MAX_ATTEMPTS, read_only: bool = False
    ) -> BaseTransaction:
        raise NotImplementedError

    def pipeline(self) -> PipelineSource:
        """
        Start a pipeline with this client.

        Returns:
            :class:`~google.cloud.firestore_v1.pipeline_source.PipelineSource`:
                A pipeline source that uses this client.
        """
        raise NotImplementedError

    @property
    def _pipeline_cls(self) -> Type["_BasePipeline"]:
        raise NotImplementedError


def _reference_info(references: list) -> Tuple[list, dict]:
    """Get information about document references.

    Helper for :meth:`~google.cloud.firestore_v1.client.Client.get_all`.

    Args:
        references (List[.DocumentReference, ...]): Iterable of document
            references.

    Returns:
        Tuple[List[str, ...], Dict[str, .DocumentReference]]: A two-tuple of

        * fully-qualified document paths for each reference in ``references``
        * a mapping from the paths to the original reference. (If
          ``references`` contains multiple references to the same document,
          that key will be overwritten in the result.)
    """
    document_paths = []
    reference_map = {}
    for reference in references:
        doc_path = reference._document_path
        document_paths.append(doc_path)
        reference_map[doc_path] = reference

    return document_paths, reference_map


def _get_reference(document_path: str, reference_map: dict) -> BaseDocumentReference:
    """Get a document reference from a dictionary.

    This just wraps a simple dictionary look-up with a helpful error that is
    specific to :meth:`~google.cloud.firestore.client.Client.get_all`, the
    **public** caller of this function.

    Args:
        document_path (str): A fully-qualified document path.
        reference_map (Dict[str, .DocumentReference]): A mapping (produced
            by :func:`_reference_info`) of fully-qualified document paths to
            document references.

    Returns:
        .DocumentReference: The matching reference.

    Raises:
        ValueError: If ``document_path`` has not been encountered.
    """
    try:
        return reference_map[document_path]
    except KeyError:
        msg = _BAD_DOC_TEMPLATE.format(document_path)
        raise ValueError(msg)


def _parse_batch_get(
    get_doc_response: types.BatchGetDocumentsResponse,
    reference_map: dict,
    client: BaseClient,
) -> DocumentSnapshot:
    """Parse a `BatchGetDocumentsResponse` protobuf.

    Args:
        get_doc_response (~google.cloud.firestore_v1.\
            firestore.BatchGetDocumentsResponse): A single response (from
            a stream) containing the "get" response for a document.
        reference_map (Dict[str, .DocumentReference]): A mapping (produced
            by :func:`_reference_info`) of fully-qualified document paths to
            document references.
        client (:class:`~google.cloud.firestore_v1.client.Client`):
            A client that has a document factory.

    Returns:
        .DocumentSnapshot: The retrieved snapshot.

    Raises:
        ValueError: If the response has a ``result`` field (a oneof) other
            than ``found`` or ``missing``.
    """
    result_type = get_doc_response._pb.WhichOneof("result")
    if result_type == "found":
        reference = _get_reference(get_doc_response.found.name, reference_map)
        data = _helpers.decode_dict(get_doc_response.found.fields, client)
        snapshot = DocumentSnapshot(
            reference,
            data,
            exists=True,
            read_time=get_doc_response.read_time,
            create_time=get_doc_response.found.create_time,
            update_time=get_doc_response.found.update_time,
        )
    elif result_type == "missing":
        reference = _get_reference(get_doc_response.missing, reference_map)
        snapshot = DocumentSnapshot(
            reference,
            None,
            exists=False,
            read_time=get_doc_response.read_time,
            create_time=None,
            update_time=None,
        )
    else:
        raise ValueError(
            "`BatchGetDocumentsResponse.result` (a oneof) had a field other "
            "than `found` or `missing` set, or was unset"
        )
    return snapshot


def _get_doc_mask(
    field_paths: Iterable[str] | None,
) -> Optional[types.common.DocumentMask]:
    """Get a document mask if field paths are provided.

    Args:
        field_paths (Optional[Iterable[str, ...]]): An iterable of field
            paths (``.``-delimited list of field names) to use as a
            projection of document fields in the returned results.

    Returns:
        Optional[google.cloud.firestore_v1.types.common.DocumentMask]: A mask
        to project documents to a restricted set of field paths.
    """
    if field_paths is None:
        return None
    else:
        return types.DocumentMask(field_paths=field_paths)


def _path_helper(path: tuple) -> Tuple[str]:
    """Standardize path into a tuple of path segments.

    Args:
        path (Tuple[str, ...]): Can either be

            * A single ``/``-delimited path
            * A tuple of path segments
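
    For example (illustrative):

    .. code-block:: python

        _path_helper(("users/alice",))    # -> ["users", "alice"]
        _path_helper(("users", "alice"))  # -> ("users", "alice")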
    """
    if len(path) == 1:
        return path[0].split(_helpers.DOCUMENT_PATH_DELIMITER)
    else:
        return path