1# Copyright 2020 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for batch requests to the Google Cloud Firestore API."""
16from __future__ import annotations
17import abc
18from typing import Dict, Union
19
20# Types needed only for Type Hints
21from google.api_core import retry as retries
22
23from google.cloud.firestore_v1 import _helpers
24from google.cloud.firestore_v1.base_document import BaseDocumentReference
25from google.cloud.firestore_v1.types import write as write_pb
26
27
class BaseBatch(metaclass=abc.ABCMeta):
    """Collect document writes so they can be submitted together.

    The write methods offered here mirror those of
    :class:`~google.cloud.firestore_v1.document.DocumentReference`, for
    example
    :meth:`~google.cloud.firestore_v1.document.DocumentReference.create`.
    Every write method queues ``Write`` protobufs and records the targeted
    document reference under its document path; actually sending the queued
    writes is left to :meth:`commit`, which subclasses must implement.

    Args:
        client (:class:`~google.cloud.firestore_v1.client.Client`):
            The client that created this batch.
    """

    def __init__(self, client) -> None:
        self._client = client
        # Queued ``Write`` protobufs, in the order they were added.
        self._write_pbs: list[write_pb.Write] = []
        # Document references touched by this batch, keyed by document path.
        self._document_references: Dict[str, BaseDocumentReference] = {}
        # Both start out unset; nothing in this base class assigns them again
        # (presumably a concrete ``commit`` fills them in -- verify there).
        self.write_results: list[write_pb.WriteResult] | None = None
        self.commit_time = None

    def __len__(self):
        """Return how many distinct document paths this batch has recorded."""
        tracked = self._document_references
        return len(tracked)

    def __contains__(self, reference: BaseDocumentReference):
        """Tell whether ``reference``'s document path was recorded here."""
        path = reference._document_path
        return path in self._document_references

    def _add_write_pbs(self, write_pbs: list[write_pb.Write]) -> None:
        """Append ``Write`` protobufs to this batch's queue.

        Subclasses are expected to override this hook when they need
        different bookkeeping.

        Args:
            write_pbs (List[google.cloud.firestore_v1.\
                write_pb2.Write]): The write protobufs to queue.
        """
        queued = self._write_pbs
        queued.extend(write_pbs)

    @abc.abstractmethod
    def commit(self):
        """Send every accumulated write operation to the server.

        How the writes are sent is up to the implementing class; this base
        implementation only raises :class:`NotImplementedError`.
        """
        raise NotImplementedError()

    def create(self, reference: BaseDocumentReference, document_data: dict) -> None:
        """Queue a "change" that creates a document.

        Should the document named by ``reference`` already exist, the batch
        will fail once it is :meth:`commit`-ed.

        Args:
            reference (:class:`~google.cloud.firestore_v1.document.DocumentReference`):
                The document reference to create as part of this batch.
            document_data (dict): Property names and values for the new
                document.
        """
        path = reference._document_path
        # Build the protobufs first so a failure leaves the batch untouched.
        new_writes = _helpers.pbs_for_create(path, document_data)
        self._document_references[path] = reference
        self._add_write_pbs(new_writes)

    def set(
        self,
        reference: BaseDocumentReference,
        document_data: dict,
        merge: Union[bool, list] = False,
    ) -> None:
        """Queue a "change" that replaces a document.

        Refer to
        :meth:`google.cloud.firestore_v1.document.DocumentReference.set` for
        details on how ``merge`` affects the way the change is applied.

        Args:
            reference (:class:`~google.cloud.firestore_v1.document.DocumentReference`):
                The document reference whose values are set by this batch.
            document_data (dict):
                Property names and values used to replace the document.
            merge (Union[bool, list]):
                ``False`` (the default) overwrites the document. ``True``
                merges into the existing state instead. Any other value is
                handed to the merge helper unchanged (presumably a list of
                field paths to merge -- see ``DocumentReference.set``).
        """
        path = reference._document_path
        # Only the literal ``False`` selects the overwrite path; every other
        # value, truthy or not, goes through the merge helper.
        if merge is False:
            new_writes = _helpers.pbs_for_set_no_merge(path, document_data)
        else:
            new_writes = _helpers.pbs_for_set_with_merge(path, document_data, merge)

        self._document_references[path] = reference
        self._add_write_pbs(new_writes)

    def update(
        self,
        reference: BaseDocumentReference,
        field_updates: dict,
        option: _helpers.WriteOption | None = None,
    ) -> None:
        """Queue a "change" that updates a document.

        Refer to
        :meth:`google.cloud.firestore_v1.document.DocumentReference.update`
        for details on ``field_updates`` and ``option``.

        Args:
            reference (:class:`~google.cloud.firestore_v1.document.DocumentReference`):
                The document reference updated by this batch.
            field_updates (dict):
                Field names or paths to update, with the values to store.
            option (Optional[:class:`~google.cloud.firestore_v1.client.WriteOption`]):
                A write option to make assertions / preconditions on the
                server state of the document before applying changes.

        Raises:
            ValueError: If the class of ``option`` is named ``ExistsOption``.
        """
        # The option is matched by class name rather than by type, and the
        # check runs before anything is recorded on the batch.
        option_kind = option.__class__.__name__
        if option_kind == "ExistsOption":
            raise ValueError("you must not pass an explicit write option to update.")

        path = reference._document_path
        new_writes = _helpers.pbs_for_update(path, field_updates, option)
        self._document_references[path] = reference
        self._add_write_pbs(new_writes)

    def delete(
        self,
        reference: BaseDocumentReference,
        option: _helpers.WriteOption | None = None,
    ) -> None:
        """Queue a "change" that deletes a document.

        Refer to
        :meth:`google.cloud.firestore_v1.document.DocumentReference.delete`
        for details on how ``option`` affects the way the change is applied.

        Args:
            reference (:class:`~google.cloud.firestore_v1.document.DocumentReference`):
                The document reference deleted by this batch.
            option (Optional[:class:`~google.cloud.firestore_v1.client.WriteOption`]):
                A write option to make assertions / preconditions on the
                server state of the document before applying changes.
        """
        path = reference._document_path
        # The delete helper yields a single protobuf; wrap it in a list
        # because ``_add_write_pbs`` takes a list of writes.
        delete_write = _helpers.pb_for_delete(path, option)
        self._document_references[path] = reference
        self._add_write_pbs([delete_write])
171
172
class BaseWriteBatch(BaseBatch):
    """Common base for the sync and async flavours of the `commit` RPC.

    `commit` is useful for lower volumes or when the order of write
    operations is important.
    """

    def _prep_commit(
        self,
        retry: retries.Retry | retries.AsyncRetry | object | None,
        timeout: float | None,
    ):
        """Build the request and call options shared by async/sync :meth:`commit`.

        Args:
            retry: Retry setting, passed straight to
                ``_helpers.make_retry_timeout_kwargs``.
            timeout: Timeout setting, passed straight to
                ``_helpers.make_retry_timeout_kwargs``.

        Returns:
            tuple: ``(request, kwargs)`` where ``request`` is a dict holding
            the client's database string, this batch's queued writes and a
            ``None`` transaction, and ``kwargs`` is whatever
            ``_helpers.make_retry_timeout_kwargs`` returned.
        """
        database = self._client._database_string
        # The queued list itself is placed in the request, not a copy.
        request = dict(database=database, writes=self._write_pbs, transaction=None)
        call_kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
        return request, call_kwargs