Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/transaction.py: 32%
90 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
1# Copyright 2017 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helpers for applying Google Cloud Firestore changes in a transaction."""
18import random
19import time
21from google.api_core import gapic_v1
22from google.api_core import retry as retries
24from google.cloud.firestore_v1.base_transaction import (
25 _BaseTransactional,
26 BaseTransaction,
27 MAX_ATTEMPTS,
28 _CANT_BEGIN,
29 _CANT_ROLLBACK,
30 _CANT_COMMIT,
31 _WRITE_READ_ONLY,
32 _INITIAL_SLEEP,
33 _MAX_SLEEP,
34 _MULTIPLIER,
35 _EXCEED_ATTEMPTS_TEMPLATE,
36)
38from google.api_core import exceptions
39from google.cloud.firestore_v1 import batch
40from google.cloud.firestore_v1.document import DocumentReference
41from google.cloud.firestore_v1 import _helpers
42from google.cloud.firestore_v1.query import Query
44# Types needed only for Type Hints
45from google.cloud.firestore_v1.base_document import DocumentSnapshot
46from google.cloud.firestore_v1.types import CommitResponse
47from typing import Any, Callable, Generator
50class Transaction(batch.WriteBatch, BaseTransaction):
51 """Accumulate read-and-write operations to be sent in a transaction.
53 Args:
54 client (:class:`~google.cloud.firestore_v1.client.Client`):
55 The client that created this transaction.
56 max_attempts (Optional[int]): The maximum number of attempts for
57 the transaction (i.e. allowing retries). Defaults to
58 :attr:`~google.cloud.firestore_v1.transaction.MAX_ATTEMPTS`.
59 read_only (Optional[bool]): Flag indicating if the transaction
60 should be read-only or should allow writes. Defaults to
61 :data:`False`.
62 """
64 def __init__(self, client, max_attempts=MAX_ATTEMPTS, read_only=False) -> None:
65 super(Transaction, self).__init__(client)
66 BaseTransaction.__init__(self, max_attempts, read_only)
68 def _add_write_pbs(self, write_pbs: list) -> None:
69 """Add `Write`` protobufs to this transaction.
71 Args:
72 write_pbs (List[google.cloud.proto.firestore.v1.\
73 write.Write]): A list of write protobufs to be added.
75 Raises:
76 ValueError: If this transaction is read-only.
77 """
78 if self._read_only:
79 raise ValueError(_WRITE_READ_ONLY)
81 super(Transaction, self)._add_write_pbs(write_pbs)
83 def _begin(self, retry_id: bytes = None) -> None:
84 """Begin the transaction.
86 Args:
87 retry_id (Optional[bytes]): Transaction ID of a transaction to be
88 retried.
90 Raises:
91 ValueError: If the current transaction has already begun.
92 """
93 if self.in_progress:
94 msg = _CANT_BEGIN.format(self._id)
95 raise ValueError(msg)
97 transaction_response = self._client._firestore_api.begin_transaction(
98 request={
99 "database": self._client._database_string,
100 "options": self._options_protobuf(retry_id),
101 },
102 metadata=self._client._rpc_metadata,
103 )
104 self._id = transaction_response.transaction
106 def _rollback(self) -> None:
107 """Roll back the transaction.
109 Raises:
110 ValueError: If no transaction is in progress.
111 google.api_core.exceptions.GoogleAPICallError: If the rollback fails.
112 """
113 if not self.in_progress:
114 raise ValueError(_CANT_ROLLBACK)
116 try:
117 # NOTE: The response is just ``google.protobuf.Empty``.
118 self._client._firestore_api.rollback(
119 request={
120 "database": self._client._database_string,
121 "transaction": self._id,
122 },
123 metadata=self._client._rpc_metadata,
124 )
125 finally:
126 # clean up, even if rollback fails
127 self._clean_up()
129 def _commit(self) -> list:
130 """Transactionally commit the changes accumulated.
132 Returns:
133 List[:class:`google.cloud.proto.firestore.v1.write.WriteResult`, ...]:
134 The write results corresponding to the changes committed, returned
135 in the same order as the changes were applied to this transaction.
136 A write result contains an ``update_time`` field.
138 Raises:
139 ValueError: If no transaction is in progress.
140 """
141 if not self.in_progress:
142 raise ValueError(_CANT_COMMIT)
144 commit_response = _commit_with_retry(self._client, self._write_pbs, self._id)
146 self._clean_up()
147 return list(commit_response.write_results)
149 def get_all(
150 self,
151 references: list,
152 retry: retries.Retry = gapic_v1.method.DEFAULT,
153 timeout: float = None,
154 ) -> Generator[DocumentSnapshot, Any, None]:
155 """Retrieves multiple documents from Firestore.
157 Args:
158 references (List[.DocumentReference, ...]): Iterable of document
159 references to be retrieved.
160 retry (google.api_core.retry.Retry): Designation of what errors, if any,
161 should be retried. Defaults to a system-specified policy.
162 timeout (float): The timeout for this request. Defaults to a
163 system-specified value.
165 Yields:
166 .DocumentSnapshot: The next document snapshot that fulfills the
167 query, or :data:`None` if the document does not exist.
168 """
169 kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
170 return self._client.get_all(references, transaction=self, **kwargs)
172 def get(
173 self,
174 ref_or_query,
175 retry: retries.Retry = gapic_v1.method.DEFAULT,
176 timeout: float = None,
177 ) -> Generator[DocumentSnapshot, Any, None]:
178 """Retrieve a document or a query result from the database.
180 Args:
181 ref_or_query: The document references or query object to return.
182 retry (google.api_core.retry.Retry): Designation of what errors, if any,
183 should be retried. Defaults to a system-specified policy.
184 timeout (float): The timeout for this request. Defaults to a
185 system-specified value.
187 Yields:
188 .DocumentSnapshot: The next document snapshot that fulfills the
189 query, or :data:`None` if the document does not exist.
190 """
191 kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
192 if isinstance(ref_or_query, DocumentReference):
193 return self._client.get_all([ref_or_query], transaction=self, **kwargs)
194 elif isinstance(ref_or_query, Query):
195 return ref_or_query.stream(transaction=self, **kwargs)
196 else:
197 raise ValueError(
198 'Value for argument "ref_or_query" must be a DocumentReference or a Query.'
199 )
202class _Transactional(_BaseTransactional):
203 """Provide a callable object to use as a transactional decorater.
205 This is surfaced via
206 :func:`~google.cloud.firestore_v1.transaction.transactional`.
208 Args:
209 to_wrap (Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]):
210 A callable that should be run (and retried) in a transaction.
211 """
213 def __init__(self, to_wrap) -> None:
214 super(_Transactional, self).__init__(to_wrap)
216 def _pre_commit(self, transaction: Transaction, *args, **kwargs) -> Any:
217 """Begin transaction and call the wrapped callable.
219 Args:
220 transaction
221 (:class:`~google.cloud.firestore_v1.transaction.Transaction`):
222 A transaction to execute the callable within.
223 args (Tuple[Any, ...]): The extra positional arguments to pass
224 along to the wrapped callable.
225 kwargs (Dict[str, Any]): The extra keyword arguments to pass
226 along to the wrapped callable.
228 Returns:
229 Any: result of the wrapped callable.
231 Raises:
232 Exception: Any failure caused by ``to_wrap``.
233 """
234 # Force the ``transaction`` to be not "in progress".
235 transaction._clean_up()
236 transaction._begin(retry_id=self.retry_id)
238 # Update the stored transaction IDs.
239 self.current_id = transaction._id
240 if self.retry_id is None:
241 self.retry_id = self.current_id
242 return self.to_wrap(transaction, *args, **kwargs)
244 def __call__(self, transaction: Transaction, *args, **kwargs):
245 """Execute the wrapped callable within a transaction.
247 Args:
248 transaction
249 (:class:`~google.cloud.firestore_v1.transaction.Transaction`):
250 A transaction to execute the callable within.
251 args (Tuple[Any, ...]): The extra positional arguments to pass
252 along to the wrapped callable.
253 kwargs (Dict[str, Any]): The extra keyword arguments to pass
254 along to the wrapped callable.
256 Returns:
257 Any: The result of the wrapped callable.
259 Raises:
260 ValueError: If the transaction does not succeed in
261 ``max_attempts``.
262 """
263 self._reset()
264 retryable_exceptions = (
265 (exceptions.Aborted) if not transaction._read_only else ()
266 )
267 last_exc = None
269 try:
270 for attempt in range(transaction._max_attempts):
271 result = self._pre_commit(transaction, *args, **kwargs)
272 try:
273 transaction._commit()
274 return result
275 except retryable_exceptions as exc:
276 last_exc = exc
277 # Retry attempts that result in retryable exceptions
278 # Subsequent requests will use the failed transaction ID as part of
279 # the ``BeginTransactionRequest`` when restarting this transaction
280 # (via ``options.retry_transaction``). This preserves the "spot in
281 # line" of the transaction, so exponential backoff is not required
282 # in this case.
283 # retries exhausted
284 # wrap the last exception in a ValueError before raising
285 msg = _EXCEED_ATTEMPTS_TEMPLATE.format(transaction._max_attempts)
286 raise ValueError(msg) from last_exc
287 except BaseException: # noqa: B901
288 # rollback the transaction on any error
289 # errors raised during _rollback will be chained to the original error through __context__
290 transaction._rollback()
291 raise
294def transactional(to_wrap: Callable) -> _Transactional:
295 """Decorate a callable so that it runs in a transaction.
297 Args:
298 to_wrap
299 (Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]):
300 A callable that should be run (and retried) in a transaction.
302 Returns:
303 Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]:
304 the wrapped callable.
305 """
306 return _Transactional(to_wrap)
309def _commit_with_retry(
310 client, write_pbs: list, transaction_id: bytes
311) -> CommitResponse:
312 """Call ``Commit`` on the GAPIC client with retry / sleep.
314 Retries the ``Commit`` RPC on Unavailable. Usually this RPC-level
315 retry is handled by the underlying GAPICd client, but in this case it
316 doesn't because ``Commit`` is not always idempotent. But here we know it
317 is "idempotent"-like because it has a transaction ID. We also need to do
318 our own retry to special-case the ``INVALID_ARGUMENT`` error.
320 Args:
321 client (:class:`~google.cloud.firestore_v1.client.Client`):
322 A client with GAPIC client and configuration details.
323 write_pbs (List[:class:`google.cloud.proto.firestore.v1.write.Write`, ...]):
324 A ``Write`` protobuf instance to be committed.
325 transaction_id (bytes):
326 ID of an existing transaction that this commit will run in.
328 Returns:
329 :class:`google.cloud.firestore_v1.types.CommitResponse`:
330 The protobuf response from ``Commit``.
332 Raises:
333 ~google.api_core.exceptions.GoogleAPICallError: If a non-retryable
334 exception is encountered.
335 """
336 current_sleep = _INITIAL_SLEEP
337 while True:
338 try:
339 return client._firestore_api.commit(
340 request={
341 "database": client._database_string,
342 "writes": write_pbs,
343 "transaction": transaction_id,
344 },
345 metadata=client._rpc_metadata,
346 )
347 except exceptions.ServiceUnavailable:
348 # Retry
349 pass
351 current_sleep = _sleep(current_sleep)
354def _sleep(
355 current_sleep: float, max_sleep: float = _MAX_SLEEP, multiplier: float = _MULTIPLIER
356) -> float:
357 """Sleep and produce a new sleep time.
359 .. _Exponential Backoff And Jitter: https://www.awsarchitectureblog.com/\
360 2015/03/backoff.html
362 Select a duration between zero and ``current_sleep``. It might seem
363 counterintuitive to have so much jitter, but
364 `Exponential Backoff And Jitter`_ argues that "full jitter" is
365 the best strategy.
367 Args:
368 current_sleep (float): The current "max" for sleep interval.
369 max_sleep (Optional[float]): Eventual "max" sleep time
370 multiplier (Optional[float]): Multiplier for exponential backoff.
372 Returns:
373 float: Newly doubled ``current_sleep`` or ``max_sleep`` (whichever
374 is smaller)
375 """
376 actual_sleep = random.uniform(0.0, current_sleep)
377 time.sleep(actual_sleep)
378 return min(multiplier * current_sleep, max_sleep)