1# Copyright 2017 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for applying Google Cloud Firestore changes in a transaction."""
16from __future__ import annotations
17
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 AsyncGenerator,
22 Coroutine,
23 Generator,
24 Optional,
25 Union,
26)
27
28from google.api_core import retry as retries
29
30from google.cloud.firestore_v1 import types
31
32# Types needed only for Type Hints
33if TYPE_CHECKING: # pragma: NO COVER
34 from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
35 from google.cloud.firestore_v1.document import DocumentSnapshot
36 from google.cloud.firestore_v1.query_profile import ExplainOptions
37 from google.cloud.firestore_v1.stream_generator import StreamGenerator
38 from google.cloud.firestore_v1.types import write as write_pb
39
40 import datetime
41
42
MAX_ATTEMPTS = 5
"""int: Default number of transaction attempts (with retries)."""

# Messages built from one shared template: an operation was requested on a
# transaction that has no transaction ID.
_MISSING_ID_TEMPLATE: str = "The transaction has no transaction ID, so it cannot be {}."
_CANT_COMMIT: str = _MISSING_ID_TEMPLATE.format("committed")
_CANT_ROLLBACK: str = _MISSING_ID_TEMPLATE.format("rolled back")

# Stand-alone messages; the ones containing a ``{...}`` placeholder are
# templates that still need ``str.format`` applied by the code using them.
_CANT_BEGIN: str = "The transaction has already begun. Current transaction ID: {!r}."
_CANT_RETRY_READ_ONLY: str = "Only read-write transactions can be retried."
_WRITE_READ_ONLY: str = "Cannot perform write operation in read-only transaction."
_EXCEED_ATTEMPTS_TEMPLATE: str = "Failed to commit transaction in {:d} attempts."
52
53
class BaseTransaction(object):
    """Collect the read and write operations that make up one transaction.

    Args:
        max_attempts (Optional[int]): Upper bound on the number of times
            the transaction may be attempted (i.e. allowing retries).
            Defaults to
            :attr:`~google.cloud.firestore_v1.transaction.MAX_ATTEMPTS`.
        read_only (Optional[bool]): :data:`True` if the transaction is
            restricted to reads, :data:`False` if writes are allowed.
            Defaults to :data:`False`.
    """

    def __init__(self, max_attempts=MAX_ATTEMPTS, read_only=False) -> None:
        # ``_id`` stays ``None`` for as long as the transaction is not in
        # progress (see ``in_progress`` / ``id``).
        self._id = None
        self._read_only = read_only
        self._max_attempts = max_attempts

    def _add_write_pbs(self, write_pbs: list[write_pb.Write]):
        """Hook receiving write protobufs; not implemented on this base class.

        Args:
            write_pbs (list[write_pb.Write]): The write protobufs.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def _options_protobuf(
        self, retry_id: Union[bytes, None]
    ) -> Optional[types.common.TransactionOptions]:
        """Build the protobuf options describing this transaction.

        ``retry_id`` is supplied when a previously failed transaction
        (e.g. one that lost to contention) is being retried. It is meant to
        be the ID of the "first" failed transaction, even when several
        retries are needed.

        Args:
            retry_id (Union[bytes, NoneType]): Transaction ID of a transaction
                to be retried.

        Returns:
            Optional[google.cloud.firestore_v1.types.TransactionOptions]:
            A ``TransactionOptions`` protobuf when the transaction is
            read-only or when there is a transaction ID to retry;
            otherwise :data:`None`.

        Raises:
            ValueError: If ``retry_id`` is not :data:`None` but the
                transaction is read-only.
        """
        if retry_id is None:
            # Nothing to retry: options are only needed for read-only mode.
            if not self._read_only:
                return None
            return types.TransactionOptions(
                read_only=types.TransactionOptions.ReadOnly()
            )

        # A retry was requested; that is rejected for read-only transactions.
        if self._read_only:
            raise ValueError(_CANT_RETRY_READ_ONLY)

        read_write = types.TransactionOptions.ReadWrite(retry_transaction=retry_id)
        return types.TransactionOptions(read_write=read_write)

    @property
    def in_progress(self):
        """Report whether this transaction has already begun.

        Returns:
            bool: :data:`True` when a transaction ID is currently set.
        """
        has_id = self._id is not None
        return has_id

    @property
    def id(self):
        """Return the current transaction ID.

        Returns:
            Optional[bytes]: The transaction ID (or :data:`None` if the
            current transaction is not in progress).
        """
        return self._id

    def _clean_up(self) -> None:
        """Reset the instance after :meth:`_rollback` or :meth:`_commit`.

        This is intended to occur on success or failure of the associated
        RPCs: the transaction ID is cleared and the list of pending write
        protobufs is replaced with a fresh empty list.
        """
        self._id = None
        self._write_pbs: list[write_pb.Write] = []

    def _begin(self, retry_id=None):
        """Hook for beginning the transaction; not implemented here.

        Args:
            retry_id: Optional value, :data:`None` by default.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def _rollback(self):
        """Hook for rolling the transaction back; not implemented here.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def _commit(self) -> Union[list, Coroutine[Any, Any, list]]:
        """Hook for committing the transaction; not implemented here.

        The return annotation admits either a list or a coroutine that
        resolves to a list.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def get_all(
        self,
        references: list,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        *,
        read_time: datetime.datetime | None = None,
    ) -> (
        Generator[DocumentSnapshot, Any, None]
        | Coroutine[Any, Any, AsyncGenerator[DocumentSnapshot, Any]]
    ):
        """Hook for reading several references; not implemented here.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def get(
        self,
        ref_or_query,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> (
        StreamGenerator[DocumentSnapshot]
        | Generator[DocumentSnapshot, Any, None]
        | Coroutine[Any, Any, AsyncGenerator[DocumentSnapshot, Any]]
        | Coroutine[Any, Any, AsyncStreamGenerator[DocumentSnapshot]]
    ):
        """Hook for reading a reference or a query; not implemented here.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
176
177
class _BaseTransactional(object):
    """Callable object meant to be used as a transactional decorator.

    This is surfaced via
    :func:`~google.cloud.firestore_v1.transaction.transactional`.

    Args:
        to_wrap (Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]):
            A callable that should be run (and retried) in a transaction.
    """

    def __init__(self, to_wrap) -> None:
        self.to_wrap = to_wrap
        #: Optional[bytes]: The current transaction ID.
        self.current_id = None
        #: Optional[bytes]: The ID of the first attempted transaction.
        self.retry_id = None

    def _reset(self) -> None:
        """Clear both stored transaction IDs."""
        self.current_id = self.retry_id = None

    def _pre_commit(self, transaction, *args, **kwargs):
        """Hook taking a transaction plus arbitrary arguments; not implemented here.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def __call__(self, transaction, *args, **kwargs):
        """Call hook taking a transaction plus arbitrary arguments; not implemented here.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError