Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/base_transaction.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

63 statements  

1# Copyright 2017 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for applying Google Cloud Firestore changes in a transaction.""" 

16from __future__ import annotations 

17 

18from typing import ( 

19 TYPE_CHECKING, 

20 Any, 

21 AsyncGenerator, 

22 Coroutine, 

23 Generator, 

24 Optional, 

25 Union, 

26) 

27 

28from google.api_core import retry as retries 

29 

30from google.cloud.firestore_v1 import types 

31 

32# Types needed only for Type Hints 

33if TYPE_CHECKING: # pragma: NO COVER 

34 from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator 

35 from google.cloud.firestore_v1.document import DocumentSnapshot 

36 from google.cloud.firestore_v1.query_profile import ExplainOptions 

37 from google.cloud.firestore_v1.stream_generator import StreamGenerator 

38 from google.cloud.firestore_v1.types import write as write_pb 

39 

40 import datetime 

41 

42 

# --- Retry budget -----------------------------------------------------------
MAX_ATTEMPTS = 5
"""int: Default number of transaction attempts (with retries)."""

# --- Error-message templates ------------------------------------------------
# Templates containing ``{...}`` placeholders are filled in via ``str.format``
# at the point where the error is raised.

# Placeholder ``{!r}`` receives the ID of the transaction already in progress.
_CANT_BEGIN: str = (
    "The transaction has already begun. Current transaction ID: {!r}."
)

# Shared stem for the "no transaction ID" errors; ``{}`` is the attempted
# action, pre-filled for the two concrete messages below.
_MISSING_ID_TEMPLATE: str = (
    "The transaction has no transaction ID, so it cannot be {}."
)
_CANT_ROLLBACK: str = _MISSING_ID_TEMPLATE.format("rolled back")
_CANT_COMMIT: str = _MISSING_ID_TEMPLATE.format("committed")

# Message for a write attempted on a read-only transaction.
_WRITE_READ_ONLY: str = (
    "Cannot perform write operation in read-only transaction."
)

# Placeholder ``{:d}`` receives the integer number of attempts that were made.
_EXCEED_ATTEMPTS_TEMPLATE: str = (
    "Failed to commit transaction in {:d} attempts."
)

# Message for a retry requested on a read-only transaction.
_CANT_RETRY_READ_ONLY: str = "Only read-write transactions can be retried."

52 

53 

class BaseTransaction:
    """Accumulate read-and-write operations to be sent in a transaction.

    This base class only tracks the transaction's configuration and its
    current ID; every RPC-facing method raises :class:`NotImplementedError`
    here.

    Args:
        max_attempts (Optional[int]): The maximum number of attempts for
            the transaction (i.e. allowing retries). Defaults to
            :attr:`~google.cloud.firestore_v1.transaction.MAX_ATTEMPTS`.
        read_only (Optional[bool]): Flag indicating if the transaction
            should be read-only or should allow writes. Defaults to
            :data:`False`.
    """

    def __init__(self, max_attempts=MAX_ATTEMPTS, read_only=False) -> None:
        # No ID until the transaction has begun; ``in_progress`` keys off this.
        self._id = None
        self._read_only = read_only
        self._max_attempts = max_attempts

    def _add_write_pbs(self, write_pbs: list[write_pb.Write]):
        """Hook for queuing write protobufs; not implemented on the base."""
        raise NotImplementedError

    def _options_protobuf(
        self, retry_id: Union[bytes, None]
    ) -> Optional[types.common.TransactionOptions]:
        """Build the ``TransactionOptions`` protobuf for this transaction.

        ``retry_id`` is supplied when re-running a transaction that failed
        (e.g. due to contention). It is intended to be the ID of the "first"
        failed transaction when several retries are needed.

        Args:
            retry_id (Union[bytes, NoneType]): Transaction ID of a transaction
                to be retried.

        Returns:
            Optional[google.cloud.firestore_v1.types.TransactionOptions]:
            Read-write options carrying ``retry_id`` when one is given,
            read-only options when the transaction is read-only, and
            :data:`None` otherwise.

        Raises:
            ValueError: If ``retry_id`` is not :data:`None` but the
                transaction is read-only.
        """
        if retry_id is None:
            # Nothing to retry: only a read-only transaction needs options.
            if not self._read_only:
                return None
            return types.TransactionOptions(
                read_only=types.TransactionOptions.ReadOnly()
            )

        # A retry was requested; that is only legal for read-write mode.
        if self._read_only:
            raise ValueError(_CANT_RETRY_READ_ONLY)

        read_write = types.TransactionOptions.ReadWrite(retry_transaction=retry_id)
        return types.TransactionOptions(read_write=read_write)

    @property
    def in_progress(self):
        """Report whether this transaction currently holds an ID.

        Returns:
            bool: :data:`True` once an ID has been assigned, i.e. the
            transaction has started.
        """
        return self._id is not None

    @property
    def id(self):
        """Expose the current transaction ID.

        Returns:
            Optional[bytes]: The transaction ID (or :data:`None` if the
            current transaction is not in progress).
        """
        return self._id

    def _clean_up(self) -> None:
        """Reset the instance after :meth:`_rollback` or :meth:`_commit`.

        This is intended to run whether the associated RPC succeeded or
        failed: the transaction ID is cleared and the pending writes are
        replaced with an empty list.
        """
        self._id = None
        self._write_pbs: list[write_pb.Write] = []

    def _begin(self, retry_id=None):
        """Hook for starting the transaction; not implemented on the base."""
        raise NotImplementedError

    def _rollback(self):
        """Hook for rolling the transaction back; not implemented on the base."""
        raise NotImplementedError

    def _commit(self) -> Union[list, Coroutine[Any, Any, list]]:
        """Hook for committing the transaction; not implemented on the base."""
        raise NotImplementedError

    def get_all(
        self,
        references: list,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        *,
        read_time: datetime.datetime | None = None,
    ) -> (
        Generator[DocumentSnapshot, Any, None]
        | Coroutine[Any, Any, AsyncGenerator[DocumentSnapshot, Any]]
    ):
        """Hook for fetching several references; not implemented on the base."""
        raise NotImplementedError

    def get(
        self,
        ref_or_query,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> (
        StreamGenerator[DocumentSnapshot]
        | Generator[DocumentSnapshot, Any, None]
        | Coroutine[Any, Any, AsyncGenerator[DocumentSnapshot, Any]]
        | Coroutine[Any, Any, AsyncStreamGenerator[DocumentSnapshot]]
    ):
        """Hook for reading a reference or query; not implemented on the base."""
        raise NotImplementedError

176 

177 

class _BaseTransactional:
    """Provide a callable object to use as a transactional decorator.

    This is surfaced via
    :func:`~google.cloud.firestore_v1.transaction.transactional`.

    On this base class both :meth:`_pre_commit` and :meth:`__call__` raise
    :class:`NotImplementedError`; only the ID bookkeeping is provided.

    Args:
        to_wrap (Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]):
            A callable that should be run (and retried) in a transaction.
    """

    def __init__(self, to_wrap) -> None:
        self.to_wrap = to_wrap
        #: Optional[bytes]: The current transaction ID.
        self.current_id = None
        #: Optional[bytes]: The ID of the first attempted transaction.
        self.retry_id = None

    def _reset(self) -> None:
        """Clear both tracked transaction IDs back to :data:`None`."""
        self.retry_id = None
        self.current_id = None

    def _pre_commit(self, transaction, *args, **kwargs):
        """Hook run ahead of the commit; not implemented on the base."""
        raise NotImplementedError

    def __call__(self, transaction, *args, **kwargs):
        """Hook that runs the wrapped callable; not implemented on the base."""
        raise NotImplementedError