Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/transaction.py: 32%

90 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 06:27 +0000

1# Copyright 2017 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for applying Google Cloud Firestore changes in a transaction.""" 

16 

17 

18import random 

19import time 

20 

21from google.api_core import gapic_v1 

22from google.api_core import retry as retries 

23 

24from google.cloud.firestore_v1.base_transaction import ( 

25 _BaseTransactional, 

26 BaseTransaction, 

27 MAX_ATTEMPTS, 

28 _CANT_BEGIN, 

29 _CANT_ROLLBACK, 

30 _CANT_COMMIT, 

31 _WRITE_READ_ONLY, 

32 _INITIAL_SLEEP, 

33 _MAX_SLEEP, 

34 _MULTIPLIER, 

35 _EXCEED_ATTEMPTS_TEMPLATE, 

36) 

37 

38from google.api_core import exceptions 

39from google.cloud.firestore_v1 import batch 

40from google.cloud.firestore_v1.document import DocumentReference 

41from google.cloud.firestore_v1 import _helpers 

42from google.cloud.firestore_v1.query import Query 

43 

44# Types needed only for Type Hints 

45from google.cloud.firestore_v1.base_document import DocumentSnapshot 

46from google.cloud.firestore_v1.types import CommitResponse 

47from typing import Any, Callable, Generator 

48 

49 

class Transaction(batch.WriteBatch, BaseTransaction):
    """Collect reads and writes that are sent to Firestore as one transaction.

    Args:
        client (:class:`~google.cloud.firestore_v1.client.Client`):
            The client that created this transaction.
        max_attempts (Optional[int]): Upper bound on the number of times the
            transaction is attempted (i.e. allowing retries). Defaults to
            :attr:`~google.cloud.firestore_v1.transaction.MAX_ATTEMPTS`.
        read_only (Optional[bool]): Whether the transaction is read-only
            instead of allowing writes. Defaults to :data:`False`.
    """

    def __init__(self, client, max_attempts=MAX_ATTEMPTS, read_only=False) -> None:
        # Cooperative init (next class in the MRO) receives the client;
        # ``BaseTransaction`` is initialised explicitly with its own arguments.
        super().__init__(client)
        BaseTransaction.__init__(self, max_attempts, read_only)

    def _add_write_pbs(self, write_pbs: list) -> None:
        """Queue ``Write`` protobufs on this transaction.

        Args:
            write_pbs (List[google.cloud.proto.firestore.v1.\
                write.Write]): A list of write protobufs to be added.

        Raises:
            ValueError: If this transaction is read-only.
        """
        if self._read_only:
            raise ValueError(_WRITE_READ_ONLY)

        super()._add_write_pbs(write_pbs)

    def _begin(self, retry_id: bytes = None) -> None:
        """Start the transaction and remember the ID the backend returns.

        Args:
            retry_id (Optional[bytes]): Transaction ID of a transaction to be
                retried.

        Raises:
            ValueError: If the current transaction has already begun.
        """
        if self.in_progress:
            raise ValueError(_CANT_BEGIN.format(self._id))

        begin_rpc = self._client._firestore_api.begin_transaction
        request = {
            "database": self._client._database_string,
            "options": self._options_protobuf(retry_id),
        }
        response = begin_rpc(request=request, metadata=self._client._rpc_metadata)
        self._id = response.transaction

    def _rollback(self) -> None:
        """Ask the backend to roll this transaction back.

        Local clean-up runs whether or not the rollback RPC succeeds.

        Raises:
            ValueError: If no transaction is in progress.
            google.api_core.exceptions.GoogleAPICallError: If the rollback fails.
        """
        if not self.in_progress:
            raise ValueError(_CANT_ROLLBACK)

        try:
            rollback_rpc = self._client._firestore_api.rollback
            request = {
                "database": self._client._database_string,
                "transaction": self._id,
            }
            # NOTE: The response is just ``google.protobuf.Empty``, so it is
            # not captured.
            rollback_rpc(request=request, metadata=self._client._rpc_metadata)
        finally:
            # Clean up even when the rollback itself raised.
            self._clean_up()

    def _commit(self) -> list:
        """Commit the accumulated writes within this transaction.

        Returns:
            List[:class:`google.cloud.proto.firestore.v1.write.WriteResult`, ...]:
            The write results corresponding to the changes committed, returned
            in the same order as the changes were applied to this transaction.
            A write result contains an ``update_time`` field.

        Raises:
            ValueError: If no transaction is in progress.
        """
        if not self.in_progress:
            raise ValueError(_CANT_COMMIT)

        response = _commit_with_retry(self._client, self._write_pbs, self._id)

        # Only reached when the commit succeeded; a failed commit leaves the
        # transaction state untouched.
        self._clean_up()
        return list(response.write_results)

    def get_all(
        self,
        references: list,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> Generator[DocumentSnapshot, Any, None]:
        """Fetch several documents from Firestore as part of this transaction.

        Args:
            references (List[.DocumentReference, ...]): Iterable of document
                references to be retrieved.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            The result of the client's ``get_all`` call made with this
            transaction; annotated as a generator of ``DocumentSnapshot``.
        """
        rpc_options = _helpers.make_retry_timeout_kwargs(retry, timeout)
        return self._client.get_all(references, transaction=self, **rpc_options)

    def get(
        self,
        ref_or_query,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> Generator[DocumentSnapshot, Any, None]:
        """Read a single document or run a query inside this transaction.

        Args:
            ref_or_query: The document references or query object to return.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            For a ``DocumentReference``, the result of the client's
            ``get_all`` for that single reference; for a ``Query``, the result
            of its ``stream``. Both are made with this transaction.

        Raises:
            ValueError: If ``ref_or_query`` is neither a ``DocumentReference``
                nor a ``Query``.
        """
        rpc_options = _helpers.make_retry_timeout_kwargs(retry, timeout)

        if isinstance(ref_or_query, DocumentReference):
            return self._client.get_all(
                [ref_or_query], transaction=self, **rpc_options
            )

        if isinstance(ref_or_query, Query):
            return ref_or_query.stream(transaction=self, **rpc_options)

        raise ValueError(
            'Value for argument "ref_or_query" must be a DocumentReference or a Query.'
        )

200 

201 

class _Transactional(_BaseTransactional):
    """Provide a callable object to use as a transactional decorator.

    This is surfaced via
    :func:`~google.cloud.firestore_v1.transaction.transactional`.

    Args:
        to_wrap (Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]):
            A callable that should be run (and retried) in a transaction.
    """

    def __init__(self, to_wrap) -> None:
        super().__init__(to_wrap)

    def _pre_commit(self, transaction: Transaction, *args, **kwargs) -> Any:
        """Begin a fresh attempt and invoke the wrapped callable.

        Args:
            transaction
                (:class:`~google.cloud.firestore_v1.transaction.Transaction`):
                A transaction to execute the callable within.
            args (Tuple[Any, ...]): The extra positional arguments to pass
                along to the wrapped callable.
            kwargs (Dict[str, Any]): The extra keyword arguments to pass
                along to the wrapped callable.

        Returns:
            Any: result of the wrapped callable.

        Raises:
            Exception: Any failure caused by ``to_wrap``.
        """
        # Force the ``transaction`` to be not "in progress" so it can begin
        # again; the previous attempt's ID (if any) is passed as the retry ID.
        transaction._clean_up()
        transaction._begin(retry_id=self.retry_id)

        # Track the ID of this attempt; the first ID seen also becomes the
        # retry ID used by later attempts.
        self.current_id = transaction._id
        if self.retry_id is None:
            self.retry_id = self.current_id

        return self.to_wrap(transaction, *args, **kwargs)

    def __call__(self, transaction: Transaction, *args, **kwargs):
        """Run the wrapped callable in a transaction, retrying failed commits.

        Args:
            transaction
                (:class:`~google.cloud.firestore_v1.transaction.Transaction`):
                A transaction to execute the callable within.
            args (Tuple[Any, ...]): The extra positional arguments to pass
                along to the wrapped callable.
            kwargs (Dict[str, Any]): The extra keyword arguments to pass
                along to the wrapped callable.

        Returns:
            Any: The result of the wrapped callable.

        Raises:
            ValueError: If the transaction does not succeed in
                ``max_attempts``.
        """
        self._reset()

        # Only read-write transactions retry on ``Aborted``; an empty tuple in
        # an ``except`` clause matches nothing.
        if transaction._read_only:
            retryable = ()
        else:
            retryable = (exceptions.Aborted,)
        most_recent_failure = None

        try:
            for _ in range(transaction._max_attempts):
                outcome = self._pre_commit(transaction, *args, **kwargs)
                try:
                    transaction._commit()
                except retryable as exc:
                    # Retry attempts that result in retryable exceptions.
                    # Subsequent requests will use the failed transaction ID
                    # as part of the ``BeginTransactionRequest`` when
                    # restarting this transaction (via
                    # ``options.retry_transaction``). This preserves the
                    # "spot in line" of the transaction, so exponential
                    # backoff is not required in this case.
                    most_recent_failure = exc
                else:
                    return outcome

            # Retries exhausted: wrap the last exception in a ValueError.
            exhausted_msg = _EXCEED_ATTEMPTS_TEMPLATE.format(
                transaction._max_attempts
            )
            raise ValueError(exhausted_msg) from most_recent_failure
        except BaseException:  # noqa: B901
            # Roll back the transaction on any error. Errors raised during
            # ``_rollback`` are chained to the original error through
            # ``__context__``.
            transaction._rollback()
            raise

292 

293 

def transactional(to_wrap: Callable) -> _Transactional:
    """Wrap a callable so that calling it runs inside a transaction.

    Args:
        to_wrap
            (Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]):
            A callable that should be run (and retried) in a transaction.

    Returns:
        Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]:
        the wrapped callable.
    """
    wrapped = _Transactional(to_wrap)
    return wrapped

307 

308 

def _commit_with_retry(
    client, write_pbs: list, transaction_id: bytes
) -> CommitResponse:
    """Call ``Commit`` on the GAPIC client, retrying with sleeps in between.

    The ``Commit`` RPC is re-issued for as long as it raises
    ``ServiceUnavailable``; any other exception propagates. Usually this
    RPC-level retry is handled by the underlying GAPIC client, but in this
    case it doesn't because ``Commit`` is not always idempotent. Here it is
    "idempotent"-like because it carries a transaction ID.

    Args:
        client (:class:`~google.cloud.firestore_v1.client.Client`):
            A client with GAPIC client and configuration details.
        write_pbs (List[:class:`google.cloud.proto.firestore.v1.write.Write`, ...]):
            The ``Write`` protobufs to be committed.
        transaction_id (bytes):
            ID of an existing transaction that this commit will run in.

    Returns:
        :class:`google.cloud.firestore_v1.types.CommitResponse`:
        The protobuf response from ``Commit``.

    Raises:
        ~google.api_core.exceptions.GoogleAPICallError: If a non-retryable
            exception is encountered.
    """
    backoff_ceiling = _INITIAL_SLEEP
    while True:
        try:
            response = client._firestore_api.commit(
                request={
                    "database": client._database_string,
                    "writes": write_pbs,
                    "transaction": transaction_id,
                },
                metadata=client._rpc_metadata,
            )
        except exceptions.ServiceUnavailable:
            # Retryable: fall through to the sleep below and try again.
            pass
        else:
            return response

        # The sleep runs outside the handler; ``_sleep`` returns the ceiling
        # to use for the next pause.
        backoff_ceiling = _sleep(backoff_ceiling)

352 

353 

def _sleep(
    current_sleep: float, max_sleep: float = _MAX_SLEEP, multiplier: float = _MULTIPLIER
) -> float:
    """Pause for a random interval and return the next sleep ceiling.

    .. _Exponential Backoff And Jitter: https://www.awsarchitectureblog.com/\
                                        2015/03/backoff.html

    The pause is drawn uniformly between zero and ``current_sleep``. It might
    seem counterintuitive to have so much jitter, but
    `Exponential Backoff And Jitter`_ argues that "full jitter" is
    the best strategy.

    Args:
        current_sleep (float): The current "max" for sleep interval.
        max_sleep (Optional[float]): Eventual "max" sleep time
        multiplier (Optional[float]): Multiplier for exponential backoff.

    Returns:
        float: ``current_sleep`` multiplied by ``multiplier``, or
        ``max_sleep`` (whichever is smaller).
    """
    pause = random.uniform(0.0, current_sleep)
    time.sleep(pause)

    grown = multiplier * current_sleep
    return min(grown, max_sleep)