Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/async_transaction.py: 33%
91 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
1# Copyright 2020 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helpers for applying Google Cloud Firestore changes in a transaction."""
18import asyncio
19import random
21from google.api_core import gapic_v1
22from google.api_core import retry as retries
24from google.cloud.firestore_v1.base_transaction import (
25 _BaseTransactional,
26 BaseTransaction,
27 MAX_ATTEMPTS,
28 _CANT_BEGIN,
29 _CANT_ROLLBACK,
30 _CANT_COMMIT,
31 _WRITE_READ_ONLY,
32 _INITIAL_SLEEP,
33 _MAX_SLEEP,
34 _MULTIPLIER,
35 _EXCEED_ATTEMPTS_TEMPLATE,
36)
38from google.api_core import exceptions
39from google.cloud.firestore_v1 import async_batch
40from google.cloud.firestore_v1 import _helpers
41from google.cloud.firestore_v1 import types
43from google.cloud.firestore_v1.async_document import AsyncDocumentReference
44from google.cloud.firestore_v1.async_document import DocumentSnapshot
45from google.cloud.firestore_v1.async_query import AsyncQuery
46from typing import Any, AsyncGenerator, Callable, Coroutine
48# Types needed only for Type Hints
49from google.cloud.firestore_v1.client import Client
52class AsyncTransaction(async_batch.AsyncWriteBatch, BaseTransaction):
53 """Accumulate read-and-write operations to be sent in a transaction.
55 Args:
56 client (:class:`~google.cloud.firestore_v1.client.Client`):
57 The client that created this transaction.
58 max_attempts (Optional[int]): The maximum number of attempts for
59 the transaction (i.e. allowing retries). Defaults to
60 :attr:`~google.cloud.firestore_v1.transaction.MAX_ATTEMPTS`.
61 read_only (Optional[bool]): Flag indicating if the transaction
62 should be read-only or should allow writes. Defaults to
63 :data:`False`.
64 """
66 def __init__(self, client, max_attempts=MAX_ATTEMPTS, read_only=False) -> None:
67 super(AsyncTransaction, self).__init__(client)
68 BaseTransaction.__init__(self, max_attempts, read_only)
70 def _add_write_pbs(self, write_pbs: list) -> None:
71 """Add `Write`` protobufs to this transaction.
73 Args:
74 write_pbs (List[google.cloud.proto.firestore.v1.\
75 write.Write]): A list of write protobufs to be added.
77 Raises:
78 ValueError: If this transaction is read-only.
79 """
80 if self._read_only:
81 raise ValueError(_WRITE_READ_ONLY)
83 super(AsyncTransaction, self)._add_write_pbs(write_pbs)
85 async def _begin(self, retry_id: bytes = None) -> None:
86 """Begin the transaction.
88 Args:
89 retry_id (Optional[bytes]): Transaction ID of a transaction to be
90 retried.
92 Raises:
93 ValueError: If the current transaction has already begun.
94 """
95 if self.in_progress:
96 msg = _CANT_BEGIN.format(self._id)
97 raise ValueError(msg)
99 transaction_response = await self._client._firestore_api.begin_transaction(
100 request={
101 "database": self._client._database_string,
102 "options": self._options_protobuf(retry_id),
103 },
104 metadata=self._client._rpc_metadata,
105 )
106 self._id = transaction_response.transaction
108 async def _rollback(self) -> None:
109 """Roll back the transaction.
111 Raises:
112 ValueError: If no transaction is in progress.
113 google.api_core.exceptions.GoogleAPICallError: If the rollback fails.
114 """
115 if not self.in_progress:
116 raise ValueError(_CANT_ROLLBACK)
118 try:
119 # NOTE: The response is just ``google.protobuf.Empty``.
120 await self._client._firestore_api.rollback(
121 request={
122 "database": self._client._database_string,
123 "transaction": self._id,
124 },
125 metadata=self._client._rpc_metadata,
126 )
127 finally:
128 # clean up, even if rollback fails
129 self._clean_up()
131 async def _commit(self) -> list:
132 """Transactionally commit the changes accumulated.
134 Returns:
135 List[:class:`google.cloud.proto.firestore.v1.write.WriteResult`, ...]:
136 The write results corresponding to the changes committed, returned
137 in the same order as the changes were applied to this transaction.
138 A write result contains an ``update_time`` field.
140 Raises:
141 ValueError: If no transaction is in progress.
142 """
143 if not self.in_progress:
144 raise ValueError(_CANT_COMMIT)
146 commit_response = await _commit_with_retry(
147 self._client, self._write_pbs, self._id
148 )
150 self._clean_up()
151 return list(commit_response.write_results)
153 async def get_all(
154 self,
155 references: list,
156 retry: retries.Retry = gapic_v1.method.DEFAULT,
157 timeout: float = None,
158 ) -> AsyncGenerator[DocumentSnapshot, Any]:
159 """Retrieves multiple documents from Firestore.
161 Args:
162 references (List[.AsyncDocumentReference, ...]): Iterable of document
163 references to be retrieved.
164 retry (google.api_core.retry.Retry): Designation of what errors, if any,
165 should be retried. Defaults to a system-specified policy.
166 timeout (float): The timeout for this request. Defaults to a
167 system-specified value.
169 Yields:
170 .DocumentSnapshot: The next document snapshot that fulfills the
171 query, or :data:`None` if the document does not exist.
172 """
173 kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
174 return await self._client.get_all(references, transaction=self, **kwargs)
176 async def get(
177 self,
178 ref_or_query,
179 retry: retries.Retry = gapic_v1.method.DEFAULT,
180 timeout: float = None,
181 ) -> AsyncGenerator[DocumentSnapshot, Any]:
182 """
183 Retrieve a document or a query result from the database.
185 Args:
186 ref_or_query The document references or query object to return.
187 retry (google.api_core.retry.Retry): Designation of what errors, if any,
188 should be retried. Defaults to a system-specified policy.
189 timeout (float): The timeout for this request. Defaults to a
190 system-specified value.
192 Yields:
193 .DocumentSnapshot: The next document snapshot that fulfills the
194 query, or :data:`None` if the document does not exist.
195 """
196 kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
197 if isinstance(ref_or_query, AsyncDocumentReference):
198 return await self._client.get_all(
199 [ref_or_query], transaction=self, **kwargs
200 )
201 elif isinstance(ref_or_query, AsyncQuery):
202 return await ref_or_query.stream(transaction=self, **kwargs)
203 else:
204 raise ValueError(
205 'Value for argument "ref_or_query" must be a AsyncDocumentReference or a AsyncQuery.'
206 )
209class _AsyncTransactional(_BaseTransactional):
210 """Provide a callable object to use as a transactional decorater.
212 This is surfaced via
213 :func:`~google.cloud.firestore_v1.async_transaction.transactional`.
215 Args:
216 to_wrap (Coroutine[[:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`, ...], Any]):
217 A coroutine that should be run (and retried) in a transaction.
218 """
220 def __init__(self, to_wrap) -> None:
221 super(_AsyncTransactional, self).__init__(to_wrap)
223 async def _pre_commit(
224 self, transaction: AsyncTransaction, *args, **kwargs
225 ) -> Coroutine:
226 """Begin transaction and call the wrapped coroutine.
228 Args:
229 transaction
230 (:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`):
231 A transaction to execute the coroutine within.
232 args (Tuple[Any, ...]): The extra positional arguments to pass
233 along to the wrapped coroutine.
234 kwargs (Dict[str, Any]): The extra keyword arguments to pass
235 along to the wrapped coroutine.
237 Returns:
238 Any: result of the wrapped coroutine.
240 Raises:
241 Exception: Any failure caused by ``to_wrap``.
242 """
243 # Force the ``transaction`` to be not "in progress".
244 transaction._clean_up()
245 await transaction._begin(retry_id=self.retry_id)
247 # Update the stored transaction IDs.
248 self.current_id = transaction._id
249 if self.retry_id is None:
250 self.retry_id = self.current_id
251 return await self.to_wrap(transaction, *args, **kwargs)
253 async def __call__(self, transaction, *args, **kwargs):
254 """Execute the wrapped callable within a transaction.
256 Args:
257 transaction
258 (:class:`~google.cloud.firestore_v1.transaction.Transaction`):
259 A transaction to execute the callable within.
260 args (Tuple[Any, ...]): The extra positional arguments to pass
261 along to the wrapped callable.
262 kwargs (Dict[str, Any]): The extra keyword arguments to pass
263 along to the wrapped callable.
265 Returns:
266 Any: The result of the wrapped callable.
268 Raises:
269 ValueError: If the transaction does not succeed in
270 ``max_attempts``.
271 """
272 self._reset()
273 retryable_exceptions = (
274 (exceptions.Aborted) if not transaction._read_only else ()
275 )
276 last_exc = None
278 try:
279 for attempt in range(transaction._max_attempts):
280 result = await self._pre_commit(transaction, *args, **kwargs)
281 try:
282 await transaction._commit()
283 return result
284 except retryable_exceptions as exc:
285 last_exc = exc
286 # Retry attempts that result in retryable exceptions
287 # Subsequent requests will use the failed transaction ID as part of
288 # the ``BeginTransactionRequest`` when restarting this transaction
289 # (via ``options.retry_transaction``). This preserves the "spot in
290 # line" of the transaction, so exponential backoff is not required
291 # in this case.
292 # retries exhausted
293 # wrap the last exception in a ValueError before raising
294 msg = _EXCEED_ATTEMPTS_TEMPLATE.format(transaction._max_attempts)
295 raise ValueError(msg) from last_exc
297 except BaseException:
298 # rollback the transaction on any error
299 # errors raised during _rollback will be chained to the original error through __context__
300 await transaction._rollback()
301 raise
304def async_transactional(
305 to_wrap: Callable[[AsyncTransaction], Any]
306) -> _AsyncTransactional:
307 """Decorate a callable so that it runs in a transaction.
309 Args:
310 to_wrap
311 (Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]):
312 A callable that should be run (and retried) in a transaction.
314 Returns:
315 Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]:
316 the wrapped callable.
317 """
318 return _AsyncTransactional(to_wrap)
321# TODO(crwilcox): this was 'coroutine' from pytype merge-pyi...
322async def _commit_with_retry(
323 client: Client, write_pbs: list, transaction_id: bytes
324) -> types.CommitResponse:
325 """Call ``Commit`` on the GAPIC client with retry / sleep.
327 Retries the ``Commit`` RPC on Unavailable. Usually this RPC-level
328 retry is handled by the underlying GAPICd client, but in this case it
329 doesn't because ``Commit`` is not always idempotent. But here we know it
330 is "idempotent"-like because it has a transaction ID. We also need to do
331 our own retry to special-case the ``INVALID_ARGUMENT`` error.
333 Args:
334 client (:class:`~google.cloud.firestore_v1.client.Client`):
335 A client with GAPIC client and configuration details.
336 write_pbs (List[:class:`google.cloud.proto.firestore.v1.write.Write`, ...]):
337 A ``Write`` protobuf instance to be committed.
338 transaction_id (bytes):
339 ID of an existing transaction that this commit will run in.
341 Returns:
342 :class:`google.cloud.firestore_v1.types.CommitResponse`:
343 The protobuf response from ``Commit``.
345 Raises:
346 ~google.api_core.exceptions.GoogleAPICallError: If a non-retryable
347 exception is encountered.
348 """
349 current_sleep = _INITIAL_SLEEP
350 while True:
351 try:
352 return await client._firestore_api.commit(
353 request={
354 "database": client._database_string,
355 "writes": write_pbs,
356 "transaction": transaction_id,
357 },
358 metadata=client._rpc_metadata,
359 )
360 except exceptions.ServiceUnavailable:
361 # Retry
362 pass
364 current_sleep = await _sleep(current_sleep)
367async def _sleep(
368 current_sleep: float, max_sleep: float = _MAX_SLEEP, multiplier: float = _MULTIPLIER
369) -> float:
370 """Sleep and produce a new sleep time.
372 .. _Exponential Backoff And Jitter: https://www.awsarchitectureblog.com/\
373 2015/03/backoff.html
375 Select a duration between zero and ``current_sleep``. It might seem
376 counterintuitive to have so much jitter, but
377 `Exponential Backoff And Jitter`_ argues that "full jitter" is
378 the best strategy.
380 Args:
381 current_sleep (float): The current "max" for sleep interval.
382 max_sleep (Optional[float]): Eventual "max" sleep time
383 multiplier (Optional[float]): Multiplier for exponential backoff.
385 Returns:
386 float: Newly doubled ``current_sleep`` or ``max_sleep`` (whichever
387 is smaller)
388 """
389 actual_sleep = random.uniform(0.0, current_sleep)
390 await asyncio.sleep(actual_sleep)
391 return min(multiplier * current_sleep, max_sleep)