Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/async_transaction.py: 33%

91 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 06:27 +0000

1# Copyright 2020 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for applying Google Cloud Firestore changes in a transaction.""" 

16 

17 

18import asyncio 

19import random 

20 

21from google.api_core import gapic_v1 

22from google.api_core import retry as retries 

23 

24from google.cloud.firestore_v1.base_transaction import ( 

25 _BaseTransactional, 

26 BaseTransaction, 

27 MAX_ATTEMPTS, 

28 _CANT_BEGIN, 

29 _CANT_ROLLBACK, 

30 _CANT_COMMIT, 

31 _WRITE_READ_ONLY, 

32 _INITIAL_SLEEP, 

33 _MAX_SLEEP, 

34 _MULTIPLIER, 

35 _EXCEED_ATTEMPTS_TEMPLATE, 

36) 

37 

38from google.api_core import exceptions 

39from google.cloud.firestore_v1 import async_batch 

40from google.cloud.firestore_v1 import _helpers 

41from google.cloud.firestore_v1 import types 

42 

43from google.cloud.firestore_v1.async_document import AsyncDocumentReference 

44from google.cloud.firestore_v1.async_document import DocumentSnapshot 

45from google.cloud.firestore_v1.async_query import AsyncQuery 

46from typing import Any, AsyncGenerator, Callable, Coroutine 

47 

48# Types needed only for Type Hints 

49from google.cloud.firestore_v1.client import Client 

50 

51 

class AsyncTransaction(async_batch.AsyncWriteBatch, BaseTransaction):
    """Collect reads and writes that are sent to Firestore as one transaction.

    Args:
        client (:class:`~google.cloud.firestore_v1.client.Client`):
            The client that created this transaction.
        max_attempts (Optional[int]): Upper bound on the number of attempts
            for the transaction (i.e. allowing retries). Defaults to
            :attr:`~google.cloud.firestore_v1.transaction.MAX_ATTEMPTS`.
        read_only (Optional[bool]): When true the transaction refuses
            writes. Defaults to :data:`False`.
    """

    def __init__(self, client, max_attempts=MAX_ATTEMPTS, read_only=False) -> None:
        # The next class in the MRO receives the client; the transaction
        # base is initialised explicitly with the retry / read-only settings.
        super().__init__(client)
        BaseTransaction.__init__(self, max_attempts, read_only)

    def _add_write_pbs(self, write_pbs: list) -> None:
        """Queue ``Write`` protobufs on this transaction.

        Args:
            write_pbs (List[google.cloud.proto.firestore.v1.write.Write]):
                The write protobufs to add.

        Raises:
            ValueError: If this transaction is read-only.
        """
        if self._read_only:
            raise ValueError(_WRITE_READ_ONLY)

        super()._add_write_pbs(write_pbs)

    async def _begin(self, retry_id: bytes = None) -> None:
        """Start the transaction and remember the ID the backend assigns.

        Args:
            retry_id (Optional[bytes]): ID of an earlier transaction that is
                being retried; forwarded to ``_options_protobuf``.

        Raises:
            ValueError: If the transaction is already in progress.
        """
        if self.in_progress:
            raise ValueError(_CANT_BEGIN.format(self._id))

        firestore_api = self._client._firestore_api
        response = await firestore_api.begin_transaction(
            request={
                "database": self._client._database_string,
                "options": self._options_protobuf(retry_id),
            },
            metadata=self._client._rpc_metadata,
        )
        # The ID is only stored once the RPC has succeeded.
        self._id = response.transaction

    async def _rollback(self) -> None:
        """Roll the transaction back and reset local state.

        Raises:
            ValueError: If no transaction is in progress.
            google.api_core.exceptions.GoogleAPICallError: If the rollback
                RPC fails.
        """
        if not self.in_progress:
            raise ValueError(_CANT_ROLLBACK)

        try:
            firestore_api = self._client._firestore_api
            # NOTE: The response is just ``google.protobuf.Empty``, so it is
            # not kept.
            await firestore_api.rollback(
                request={
                    "database": self._client._database_string,
                    "transaction": self._id,
                },
                metadata=self._client._rpc_metadata,
            )
        finally:
            # Local state is cleared whether or not the RPC succeeded.
            self._clean_up()

    async def _commit(self) -> list:
        """Commit the accumulated writes within this transaction.

        Returns:
            List[:class:`google.cloud.proto.firestore.v1.write.WriteResult`, ...]:
            The ``write_results`` of the commit response, as a list.

        Raises:
            ValueError: If no transaction is in progress.
        """
        if not self.in_progress:
            raise ValueError(_CANT_COMMIT)

        response = await _commit_with_retry(self._client, self._write_pbs, self._id)

        # Clean-up runs only after a successful commit; if the commit raised,
        # the transaction state is left untouched.
        self._clean_up()
        return list(response.write_results)

    async def get_all(
        self,
        references: list,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> AsyncGenerator[DocumentSnapshot, Any]:
        """Fetch several documents through the client, inside this transaction.

        Args:
            references (List[.AsyncDocumentReference, ...]): The document
                references to retrieve.
            retry (google.api_core.retry.Retry): Designation of what errors,
                if any, should be retried. Defaults to a system-specified
                policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            The awaited result of ``client.get_all`` called with this
            transaction (annotated as an async generator of
            ``DocumentSnapshot``).
        """
        call_kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
        snapshots = await self._client.get_all(
            references, transaction=self, **call_kwargs
        )
        return snapshots

    async def get(
        self,
        ref_or_query,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> AsyncGenerator[DocumentSnapshot, Any]:
        """Read a single document or run a query inside this transaction.

        Args:
            ref_or_query: An ``AsyncDocumentReference`` or an ``AsyncQuery``.
            retry (google.api_core.retry.Retry): Designation of what errors,
                if any, should be retried. Defaults to a system-specified
                policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            For a document reference, the awaited result of
            ``client.get_all([ref_or_query], ...)``; for a query, the awaited
            result of ``ref_or_query.stream(...)``.

        Raises:
            ValueError: If ``ref_or_query`` is neither of the supported types.
        """
        call_kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        if isinstance(ref_or_query, AsyncDocumentReference):
            return await self._client.get_all(
                [ref_or_query], transaction=self, **call_kwargs
            )

        if isinstance(ref_or_query, AsyncQuery):
            return await ref_or_query.stream(transaction=self, **call_kwargs)

        raise ValueError(
            'Value for argument "ref_or_query" must be a AsyncDocumentReference or a AsyncQuery.'
        )

207 

208 

class _AsyncTransactional(_BaseTransactional):
    """Callable object used as the transactional decorator for coroutines.

    This is surfaced via
    :func:`~google.cloud.firestore_v1.async_transaction.async_transactional`.

    Args:
        to_wrap (Coroutine[[:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`, ...], Any]):
            A coroutine that should be run (and retried) in a transaction.
    """

    def __init__(self, to_wrap) -> None:
        super().__init__(to_wrap)

    async def _pre_commit(
        self, transaction: AsyncTransaction, *args, **kwargs
    ) -> Coroutine:
        """Begin the transaction, then await the wrapped coroutine.

        Args:
            transaction
                (:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`):
                The transaction the coroutine runs in.
            args (Tuple[Any, ...]): Extra positional arguments forwarded to
                the wrapped coroutine.
            kwargs (Dict[str, Any]): Extra keyword arguments forwarded to
                the wrapped coroutine.

        Returns:
            Any: Whatever the wrapped coroutine returns.

        Raises:
            Exception: Any failure raised by ``to_wrap`` (or by beginning
                the transaction).
        """
        # Reset first so that ``_begin`` does not refuse to start because a
        # previous attempt left the transaction "in progress".
        transaction._clean_up()
        await transaction._begin(retry_id=self.retry_id)

        # Record the new transaction ID; the first ID ever seen also becomes
        # the retry ID passed to later ``_begin`` calls.
        self.current_id = transaction._id
        if self.retry_id is None:
            self.retry_id = self.current_id

        wrapped_result = await self.to_wrap(transaction, *args, **kwargs)
        return wrapped_result

    async def __call__(self, transaction, *args, **kwargs):
        """Run the wrapped coroutine in ``transaction``, retrying the commit.

        Args:
            transaction
                (:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`):
                The transaction to execute the coroutine within.
            args (Tuple[Any, ...]): Extra positional arguments forwarded to
                the wrapped coroutine.
            kwargs (Dict[str, Any]): Extra keyword arguments forwarded to
                the wrapped coroutine.

        Returns:
            Any: The result of the wrapped coroutine from the attempt whose
            commit succeeded.

        Raises:
            ValueError: If the transaction does not succeed in
                ``max_attempts``.
        """
        self._reset()

        # Only read-write transactions retry on ``Aborted``; an empty tuple
        # makes the ``except`` clause below match nothing.
        if transaction._read_only:
            retryable_exceptions = ()
        else:
            retryable_exceptions = exceptions.Aborted
        last_exc = None

        try:
            for _ in range(transaction._max_attempts):
                result = await self._pre_commit(transaction, *args, **kwargs)
                try:
                    await transaction._commit()
                except retryable_exceptions as exc:
                    # Retry on the next iteration. ``_pre_commit`` passes the
                    # stored retry ID to ``_begin``; per the original note
                    # this keeps the transaction's "spot in line", so no
                    # backoff is applied here.
                    last_exc = exc
                else:
                    return result

            # Attempts exhausted: surface a ValueError chained to the last
            # retryable failure.
            msg = _EXCEED_ATTEMPTS_TEMPLATE.format(transaction._max_attempts)
            raise ValueError(msg) from last_exc

        except BaseException:
            # Roll back on any error. An error raised by ``_rollback`` itself
            # replaces the original one, which stays reachable through
            # ``__context__``.
            # NOTE(review): if ``_begin`` failed, the transaction is
            # presumably not in progress, so ``_rollback`` would raise
            # ValueError here and mask the begin failure - confirm intended.
            await transaction._rollback()
            raise

302 

303 

def async_transactional(
    to_wrap: Callable[[AsyncTransaction], Any]
) -> _AsyncTransactional:
    """Wrap a coroutine function so that it runs inside a transaction.

    Args:
        to_wrap
            (Callable[[:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`, ...], Any]):
            The callable that should be run (and retried) in a transaction.

    Returns:
        _AsyncTransactional: A callable object wrapping ``to_wrap``.
    """
    wrapper = _AsyncTransactional(to_wrap)
    return wrapper

319 

320 

321# TODO(crwilcox): this was 'coroutine' from pytype merge-pyi... 

async def _commit_with_retry(
    client: Client, write_pbs: list, transaction_id: bytes
) -> types.CommitResponse:
    """Call ``Commit`` on the GAPIC client, retrying while it is unavailable.

    The ``Commit`` RPC is re-issued every time it raises
    ``ServiceUnavailable``, with a jittered, growing sleep between attempts
    (see ``_sleep``). This function places no cap on the number of attempts.
    Per the original note, the retry is done here rather than in the GAPIC
    layer because ``Commit`` is not idempotent in general, whereas a commit
    carrying a transaction ID behaves as if it were.

    Args:
        client (:class:`~google.cloud.firestore_v1.client.Client`):
            A client with GAPIC client and configuration details.
        write_pbs (List[:class:`google.cloud.proto.firestore.v1.write.Write`, ...]):
            The ``Write`` protobufs to be committed.
        transaction_id (bytes):
            ID of an existing transaction that this commit will run in.

    Returns:
        :class:`google.cloud.firestore_v1.types.CommitResponse`:
        The protobuf response from ``Commit``.

    Raises:
        ~google.api_core.exceptions.GoogleAPICallError: If a non-retryable
            exception is encountered.
    """
    backoff = _INITIAL_SLEEP
    while True:
        try:
            firestore_api = client._firestore_api
            response = await firestore_api.commit(
                request={
                    "database": client._database_string,
                    "writes": write_pbs,
                    "transaction": transaction_id,
                },
                metadata=client._rpc_metadata,
            )
        except exceptions.ServiceUnavailable:
            # Retryable: fall through to the sleep below and try again.
            pass
        else:
            return response

        # Sleep outside the handler, then continue with the next sleep bound.
        backoff = await _sleep(backoff)

365 

366 

async def _sleep(
    current_sleep: float, max_sleep: float = _MAX_SLEEP, multiplier: float = _MULTIPLIER
) -> float:
    """Sleep for a random duration and return the next sleep bound.

    .. _Exponential Backoff And Jitter:
        https://www.awsarchitectureblog.com/2015/03/backoff.html

    The actual pause is drawn uniformly between zero and ``current_sleep``.
    That much jitter may look excessive, but `Exponential Backoff And
    Jitter`_ argues that "full jitter" is the best strategy.

    Args:
        current_sleep (float): The current "max" for the sleep interval.
        max_sleep (Optional[float]): Ceiling for the returned bound.
        multiplier (Optional[float]): Growth factor applied to
            ``current_sleep``.

    Returns:
        float: ``multiplier * current_sleep`` or ``max_sleep``, whichever
        is smaller.
    """
    jittered_delay = random.uniform(0.0, current_sleep)
    await asyncio.sleep(jittered_delay)

    next_bound = multiplier * current_sleep
    return min(next_bound, max_sleep)