Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/bulk_writer.py: 41%

274 statements  

# Copyright 2021 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for efficiently writing large amounts of data to the Google Cloud
Firestore API."""

import bisect
import collections
import concurrent.futures
import datetime
import enum
import functools
import logging
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Deque, Dict, List, Optional, Union

from google.rpc import status_pb2  # type: ignore

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.base_document import BaseDocumentReference
from google.cloud.firestore_v1.bulk_batch import BulkWriteBatch
from google.cloud.firestore_v1.rate_limiter import RateLimiter
from google.cloud.firestore_v1.types.firestore import BatchWriteResponse
from google.cloud.firestore_v1.types.write import WriteResult

if TYPE_CHECKING:
    from google.cloud.firestore_v1.base_client import BaseClient  # pragma: NO COVER


logger = logging.getLogger(__name__)

class BulkRetry(enum.Enum):
    """Indicator for what retry strategy the BulkWriter should use."""

    # Common exponential backoff algorithm. This strategy is largely incompatible
    # with the default retry limit of 15, so use with caution.
    exponential = enum.auto()

    # Default strategy that adds 1 second of delay per retry.
    linear = enum.auto()

    # Immediate retries with no growing delays.
    immediate = enum.auto()


class SendMode(enum.Enum):
    """Indicator for whether a BulkWriter should commit batches in the main
    thread or hand that work off to an executor."""

    # Default strategy that parallelizes network I/O on an executor. You almost
    # certainly want this.
    parallel = enum.auto()

    # Alternate strategy which blocks during all network I/O. Much slower, but
    # assures all batches are sent to the server in order. Note that
    # `SendMode.serial` is extremely susceptible to slowdowns from retries if
    # there are a lot of errors.
    serial = enum.auto()

class AsyncBulkWriterMixin:
    """
    Mixin which contains the methods on `BulkWriter` which must only be
    submitted to the executor (or called by functions submitted to the executor).
    This mixin exists purely for organization and clarity of implementation
    (e.g., there is no metaclass magic).

    The entrypoint to the parallelizable code path is `_send_batch()`, which is
    wrapped in a decorator which ensures that the `SendMode` is honored.
    """

    def _with_send_mode(fn: Callable):  # type: ignore
        """Decorates a method to ensure it is only called via the executor
        (IFF the SendMode value is SendMode.parallel!).

        Usage:

            @_with_send_mode
            def my_method(self):
                parallel_stuff()

            def something_else(self):
                # Because of the decorator around `my_method`, the following
                # method invocation:
                self.my_method()
                # becomes equivalent to `self._executor.submit(self.my_method)`
                # when the send mode is `SendMode.parallel`.

        Use on entrypoint methods for code paths that *must* be parallelized.
        """

        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            if self._send_mode == SendMode.parallel:
                return self._executor.submit(lambda: fn(self, *args, **kwargs))
            else:
                # For code parity, even `SendMode.serial` scenarios should return
                # a future here. Anything else would badly complicate calling code.
                result = fn(self, *args, **kwargs)
                future: concurrent.futures.Future = concurrent.futures.Future()
                future.set_result(result)
                return future

        return wrapper

    @_with_send_mode
    def _send_batch(  # type: ignore
        self: "BulkWriter",
        batch: BulkWriteBatch,
        operations: List["BulkWriterOperation"],
    ):
        """Sends a batch without regard to rate limits, meaning limits must have
        already been checked. To that end, do not call this directly; instead,
        call `_send_until_queue_is_empty`.

        Args:
            batch(:class:`~google.cloud.firestore_v1.base_batch.BulkWriteBatch`)
        """
        _len_batch: int = len(batch)
        self._in_flight_documents += _len_batch
        response: BatchWriteResponse = self._send(batch)
        self._in_flight_documents -= _len_batch

        # Update bookkeeping totals
        self._total_batches_sent += 1
        self._total_write_operations += _len_batch

        self._process_response(batch, response, operations)

    def _process_response(  # type: ignore
        self: "BulkWriter",
        batch: BulkWriteBatch,
        response: BatchWriteResponse,
        operations: List["BulkWriterOperation"],
    ):
        """Invokes submitted callbacks for each batch and each operation within
        each batch. As this is called from `_send_batch()`, this is parallelized
        if we are in that mode.
        """
        batch_references: List[BaseDocumentReference] = list(
            batch._document_references.values(),
        )
        self._batch_callback(batch, response, self)

        status: status_pb2.Status
        for index, status in enumerate(response.status):
            if status.code == 0:
                self._success_callback(
                    # DocumentReference
                    batch_references[index],
                    # WriteResult
                    response.write_results[index],
                    # BulkWriter
                    self,
                )
            else:
                operation: BulkWriterOperation = operations[index]
                should_retry: bool = self._error_callback(
                    # BulkWriteFailure
                    BulkWriteFailure(
                        operation=operation,
                        code=status.code,
                        message=status.message,
                    ),
                    # BulkWriter
                    self,
                )
                if should_retry:
                    operation.attempts += 1
                    self._retry_operation(operation)

    def _retry_operation(  # type: ignore
        self: "BulkWriter",
        operation: "BulkWriterOperation",
    ):
        delay: int = 0
        if self._options.retry == BulkRetry.exponential:
            delay = operation.attempts**2  # pragma: NO COVER
        elif self._options.retry == BulkRetry.linear:
            delay = operation.attempts

        run_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(
            seconds=delay
        )

        # Use of `bisect.insort` maintains the requirement that `self._retries`
        # always remain sorted by each object's `run_at` time. Note that it is
        # able to do this because `OperationRetry` instances are entirely sortable
        # by their `run_at` value.
        bisect.insort(
            self._retries,
            OperationRetry(operation=operation, run_at=run_at),
        )
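
    # Illustrative sketch (editor's note, not part of the library): how the
    # computed `delay` above grows with `operation.attempts` under each
    # `BulkRetry` strategy, assuming the branches in `_retry_operation`:
    #
    #   attempts   BulkRetry.immediate   BulkRetry.linear   BulkRetry.exponential
    #      1               0s                  1s                  1s
    #      2               0s                  2s                  4s
    #      3               0s                  3s                  9s
    #
    # The resulting `run_at` is simply "now (UTC) + delay seconds".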

    def _send(self, batch: BulkWriteBatch) -> BatchWriteResponse:
        """Hook for overriding how batches are sent. As this is only called
        from `_send_batch()`, this is parallelized if we are in that mode.
        """
        return batch.commit()  # pragma: NO COVER
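
    # Illustrative sketch (editor's note, not part of the library): because
    # `_send` is the single hook through which batches reach the network, a
    # hypothetical subclass could stub it out, e.g. for offline testing:
    #
    #   class NoOpBulkWriter(BulkWriter):
    #       """Hypothetical subclass that never touches the network."""
    #
    #       def _send(self, batch: BulkWriteBatch) -> BatchWriteResponse:
    #           return BatchWriteResponse(
    #               write_results=[WriteResult() for _ in range(len(batch))],
    #               status=[status_pb2.Status(code=0) for _ in range(len(batch))],
    #           )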


class BulkWriter(AsyncBulkWriterMixin):
    """
    Accumulate and efficiently save large amounts of document write operations
    to the server.

    BulkWriter can handle large data migrations or updates, buffering records
    in memory and submitting them to the server in batches of 20.

    The submission of batches is internally parallelized with a ThreadPoolExecutor,
    meaning end developers do not need to manage an event loop or worry about asyncio
    to see parallelization speed ups (which can easily 10x throughput). Because
    of this, there is no companion `AsyncBulkWriter` class, as is usually seen
    with other utility classes.

    Usage:

    .. code-block:: python

        # Instantiate the BulkWriter. This works from either `Client` or
        # `AsyncClient`.
        db = firestore.Client()
        bulk_writer = db.bulk_writer()

        # Attach an optional success listener to be called once per document.
        bulk_writer.on_write_result(
            lambda reference, result, bulk_writer: print(f'Saved {reference._document_path}')
        )

        # Queue an arbitrary amount of write operations.
        # Assume `my_new_records` is a list of (DocumentReference, dict,)
        # tuple-pairs that you supply.

        reference: DocumentReference
        data: dict
        for reference, data in my_new_records:
            bulk_writer.create(reference, data)

        # Block until all pooled writes are complete.
        bulk_writer.flush()

    Args:
        client(:class:`~google.cloud.firestore_v1.client.Client`):
            The client that created this BulkWriter.
    """

    batch_size: int = 20

    def __init__(
        self,
        client: Optional["BaseClient"] = None,
        options: Optional["BulkWriterOptions"] = None,
    ):
        # Because `BulkWriter` instances are all synchronous/blocking on the
        # main thread (instead using other threads for asynchrony), it is
        # incompatible with AsyncClient's various methods that return Futures.
        # `BulkWriter` parallelizes all of its network I/O without the developer
        # having to worry about awaiting async methods, so we must convert an
        # AsyncClient instance into a plain Client instance.
        if type(client).__name__ == "AsyncClient":
            self._client = client._to_sync_copy()  # type: ignore
        else:
            self._client = client
        self._options = options or BulkWriterOptions()
        self._send_mode = self._options.mode

        self._operations: List[BulkWriterOperation]
        # List of the `_document_path` attribute for each DocumentReference
        # contained in the current `self._operations`. This is reset every time
        # `self._operations` is reset.
        self._operations_document_paths: List[BaseDocumentReference]
        self._reset_operations()

        # List of all `BulkWriterOperation` objects that are waiting to be retried.
        # Each such object is wrapped in an `OperationRetry` object which pairs
        # the raw operation with the `datetime` of its next scheduled attempt.
        # `self._retries` must always remain sorted for efficient reads, so it is
        # required to only ever add elements via `bisect.insort`.
        self._retries: Deque["OperationRetry"] = collections.deque([])

        self._queued_batches: Deque[List[BulkWriterOperation]] = collections.deque([])
        self._is_open: bool = True

        # This list will go on to store the future returned from each submission
        # to the executor, for the purpose of awaiting all of those futures'
        # completions in the `flush` method.
        self._pending_batch_futures: List[concurrent.futures.Future] = []

        self._success_callback: Callable[
            [BaseDocumentReference, WriteResult, "BulkWriter"], None
        ] = BulkWriter._default_on_success
        self._batch_callback: Callable[
            [BulkWriteBatch, BatchWriteResponse, "BulkWriter"], None
        ] = BulkWriter._default_on_batch
        self._error_callback: Callable[
            [BulkWriteFailure, BulkWriter], bool
        ] = BulkWriter._default_on_error

        self._in_flight_documents: int = 0
        self._rate_limiter = RateLimiter(
            initial_tokens=self._options.initial_ops_per_second,
            global_max_tokens=self._options.max_ops_per_second,
        )

        # Keep track of progress as batches and write operations are completed
        self._total_batches_sent: int = 0
        self._total_write_operations: int = 0

        self._executor: concurrent.futures.ThreadPoolExecutor
        self._ensure_executor()

    @staticmethod
    def _default_on_batch(
        batch: BulkWriteBatch,
        response: BatchWriteResponse,
        bulk_writer: "BulkWriter",
    ) -> None:
        pass

    @staticmethod
    def _default_on_success(
        reference: BaseDocumentReference,
        result: WriteResult,
        bulk_writer: "BulkWriter",
    ) -> None:
        pass

    @staticmethod
    def _default_on_error(error: "BulkWriteFailure", bulk_writer: "BulkWriter") -> bool:
        # Default number of retries for each operation is 15. This is a scary
        # number to combine with an exponential backoff, and as such, our default
        # backoff strategy is linear instead of exponential.
        return error.attempts < 15

    def _reset_operations(self) -> None:
        self._operations = []
        self._operations_document_paths = []

    def _ensure_executor(self):
        """Reboots the executor used to send batches if it has been shut down."""
        if getattr(self, "_executor", None) is None or self._executor._shutdown:
            self._executor = self._instantiate_executor()

    def _ensure_sending(self):
        self._ensure_executor()
        self._send_until_queue_is_empty()

    def _instantiate_executor(self):
        return concurrent.futures.ThreadPoolExecutor()

    def flush(self):
        """
        Block until all pooled write operations are complete and then resume
        accepting new write operations.
        """
        # Calling `flush` consecutively is a no-op.
        if self._executor._shutdown:
            return

        while True:
            # Queue any waiting operations and try our luck again.
            # This can happen if users add a number of records not divisible by
            # 20 and then call flush (which should be ~19 out of 20 use cases).
            # Execution will arrive here and find the leftover operations that
            # never filled up a batch organically, and so we must send them here.
            if self._operations:
                self._enqueue_current_batch()
                continue

            # If we find queued but unsent batches or pending retries, begin
            # sending immediately. Note that if we are waiting on retries, but
            # they have longer to wait as specified by the retry backoff strategy,
            # we may have to make several passes through this part of the loop.
            # (This is related to the sleep and its explanation below.)
            if self._queued_batches or self._retries:
                self._ensure_sending()

                # This sleep prevents max-speed laps through this loop, which can
                # and will happen if the BulkWriter is doing nothing except waiting
                # on retries to be ready to re-send. Removing this sleep will cause
                # whatever thread is running this code to sit near 100% CPU until
                # all retries are abandoned or successfully resolved.
                time.sleep(0.1)
                continue

            # We store the executor's Future from each batch send operation, so
            # the first pass through here, we are guaranteed to find "pending"
            # batch futures and have to wait. However, the second pass through
            # will be fast unless the last batch introduced more retries.
            if self._pending_batch_futures:
                _batches = self._pending_batch_futures
                self._pending_batch_futures = []
                concurrent.futures.wait(_batches)

                # Continuing is critical here (as opposed to breaking) because
                # the final batch may have introduced retries which is most
                # straightforwardly verified by heading back to the top of the loop.
                continue

            break

        # We no longer expect to have any queued batches or pending futures,
        # so the executor can be shut down.
        self._executor.shutdown()

    def close(self):
        """
        Block until all pooled write operations are complete and then reject
        any further write operations.
        """
        self._is_open = False
        self.flush()

    def _maybe_enqueue_current_batch(self):
        """
        Checks to see whether the in-progress batch is full and, if it is,
        adds it to the sending queue.
        """
        if len(self._operations) >= self.batch_size:
            self._enqueue_current_batch()

    def _enqueue_current_batch(self):
        """Adds the current batch to the back of the sending line, resets the
        list of queued ops, and begins the process of actually sending whatever
        batch is in the front of the line, which will often be a different batch.
        """
        # Put our batch in the back of the sending line
        self._queued_batches.append(self._operations)

        # Reset the local store of operations
        self._reset_operations()

        # The sending loop powers off upon reaching the end of the queue, so
        # here we make sure that is running.
        self._ensure_sending()

    def _send_until_queue_is_empty(self) -> None:
        """First domino in the sending codepath. This does not need to be
        parallelized for two reasons:

        1) Putting this on a worker thread could lead to two running in parallel
           and thus unpredictable commit ordering or failure to adhere to
           rate limits.
        2) This method only blocks when `self._request_send()` does not immediately
           return, and in that case, the BulkWriter's ramp-up / throttling logic
           has determined that it is attempting to exceed the maximum write speed,
           and so parallelizing this method would not increase performance anyway.

        Once `self._request_send()` returns, this method calls `self._send_batch()`,
        which parallelizes itself if that is our SendMode value.

        Once `self._send_batch()` has been called (which does not block if we are
        sending in parallel), this method jumps back to the top of the loop and
        re-checks for any queued batches.

        Note that for sufficiently large data migrations, this can block the
        submission of additional write operations (e.g., the CRUD methods);
        but again, that is only if the maximum write speed is being exceeded,
        and thus this scenario does not actually further reduce performance.
        """
        self._schedule_ready_retries()

        while self._queued_batches:
            # For FIFO order, add to the right of this deque (via `append`) and take
            # from the left (via `popleft`).
            operations: List[BulkWriterOperation] = self._queued_batches.popleft()

            # Block until we are cleared for takeoff, which is fine because this
            # returns instantly unless the rate limiting logic determines that we
            # are attempting to exceed the maximum write speed.
            self._request_send(len(operations))

            # Handle some bookkeeping, and ultimately put these bits on the wire.
            batch = BulkWriteBatch(client=self._client)
            op: BulkWriterOperation
            for op in operations:
                op.add_to_batch(batch)

            # `_send_batch` is optionally parallelized by `@_with_send_mode`.
            future = self._send_batch(batch=batch, operations=operations)
            self._pending_batch_futures.append(future)

        self._schedule_ready_retries()
        return None

    def _schedule_ready_retries(self) -> None:
        """Grabs all ready retries and re-queues them."""

        # Because `self._retries` always exists in a sorted state (thanks to only
        # ever adding to it via `bisect.insort`), and because `OperationRetry`
        # objects are comparable against `datetime` objects, this bisect functionally
        # returns the number of retries that are ready for immediate reenlistment.
        take_until_index = bisect.bisect(
            self._retries, datetime.datetime.now(tz=datetime.timezone.utc)
        )

        for _ in range(take_until_index):
            retry: OperationRetry = self._retries.popleft()
            retry.retry(self)
        return None
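
    # Illustrative sketch (editor's note, not part of the library): the call to
    # `bisect.bisect` above works because `OperationRetry` (via
    # `BaseOperationRetry.__lt__` below) compares against plain `datetime`
    # objects. For example, with retries scheduled at 10:00, 10:05, and 10:10,
    # and a current time of 10:06, `bisect.bisect(self._retries, now)` returns
    # 2, so exactly the first two (already-due) retries are popped and re-queued.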

    def _request_send(self, batch_size: int) -> bool:
        # Set up this boolean to avoid repeatedly taking tokens if we're only
        # waiting on the `max_in_flight` limit.
        have_received_tokens: bool = False

        while True:
            # To avoid bottlenecks on the server, an additional limit is that no
            # more write operations can be "in flight" (sent but still awaiting
            # response) at any given point than the maximum number of writes per
            # second.
            under_threshold: bool = (
                self._in_flight_documents <= self._rate_limiter._maximum_tokens
            )
            # Ask for tokens each pass through this loop until they are granted,
            # and then stop.
            have_received_tokens = have_received_tokens or bool(
                self._rate_limiter.take_tokens(batch_size)
            )
            if not under_threshold or not have_received_tokens:
                # Try again until both checks are true.
                # Note that this sleep is helpful to prevent the main BulkWriter
                # thread from spinning through this loop as fast as possible and
                # pointlessly burning CPU while we wait for the arrival of a
                # fixed moment in the future.
                time.sleep(0.01)
                continue

            return True
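
    # Illustrative sketch (editor's note, not part of the library): with the
    # default `BulkWriterOptions` (max_ops_per_second=500), a call to
    # `_request_send(20)` returns only once (a) no more writes are currently in
    # flight than the rate limiter's maximum and (b) the rate limiter has
    # granted 20 tokens; otherwise it sleeps in 10ms increments and re-checks
    # both conditions.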

    def create(
        self,
        reference: BaseDocumentReference,
        document_data: Dict,
        attempts: int = 0,
    ) -> None:
        """Adds a `create` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this create operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be created.
            document_data (dict):
                Raw data to save to the server.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterCreateOperation(
                reference=reference,
                document_data=document_data,
                attempts=attempts,
            ),
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def delete(
        self,
        reference: BaseDocumentReference,
        option: Optional[_helpers.WriteOption] = None,
        attempts: int = 0,
    ) -> None:
        """Adds a `delete` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this delete operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be deleted.
            option (:class:`~google.cloud.firestore_v1._helpers.WriteOption`):
                Optional flag to modify the nature of this write.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterDeleteOperation(
                reference=reference,
                option=option,
                attempts=attempts,
            ),
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def set(
        self,
        reference: BaseDocumentReference,
        document_data: Dict,
        merge: Union[bool, list] = False,
        attempts: int = 0,
    ) -> None:
        """Adds a `set` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this set operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be written.
            document_data (dict):
                Raw data to save to the server.
            merge (bool):
                If True (or a list of field paths), merge the supplied data into
                any existing document data instead of completely overwriting it.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterSetOperation(
                reference=reference,
                document_data=document_data,
                merge=merge,
                attempts=attempts,
            )
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def update(
        self,
        reference: BaseDocumentReference,
        field_updates: dict,
        option: Optional[_helpers.WriteOption] = None,
        attempts: int = 0,
    ) -> None:
        """Adds an `update` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this update operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be updated.
            field_updates (dict):
                Key paths to specific nested data that should be updated.
            option (:class:`~google.cloud.firestore_v1._helpers.WriteOption`):
                Optional flag to modify the nature of this write.
        """
        # This check is copied from other Firestore classes for the purposes of
        # surfacing the error immediately.
        if option.__class__.__name__ == "ExistsOption":
            raise ValueError("you must not pass an explicit write option to update.")

        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterUpdateOperation(
                reference=reference,
                field_updates=field_updates,
                option=option,
                attempts=attempts,
            )
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def on_write_result(
        self,
        callback: Optional[
            Callable[[BaseDocumentReference, WriteResult, "BulkWriter"], None]
        ],
    ) -> None:
        """Sets a callback that will be invoked once for every successful operation."""
        self._success_callback = callback or BulkWriter._default_on_success

    def on_batch_result(
        self,
        callback: Optional[
            Callable[[BulkWriteBatch, BatchWriteResponse, "BulkWriter"], None]
        ],
    ) -> None:
        """Sets a callback that will be invoked once for every successful batch."""
        self._batch_callback = callback or BulkWriter._default_on_batch

    def on_write_error(
        self, callback: Optional[Callable[["BulkWriteFailure", "BulkWriter"], bool]]
    ) -> None:
        """Sets a callback that will be invoked once for every batch that contains
        an error."""
        self._error_callback = callback or BulkWriter._default_on_error
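
    # Illustrative usage sketch (editor's note, not part of the library); the
    # callback names below are hypothetical:
    #
    #   def on_error(failure: BulkWriteFailure, bulk_writer: BulkWriter) -> bool:
    #       logger.warning("write failed (%s): %s", failure.code, failure.message)
    #       # Returning True re-queues the operation for another attempt.
    #       return failure.attempts < 5
    #
    #   bulk_writer.on_write_error(on_error)
    #   bulk_writer.on_batch_result(
    #       lambda batch, response, bw: logger.info("sent batch of %d", len(batch))
    #   )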

    def _verify_not_closed(self):
        if not self._is_open:
            raise Exception("BulkWriter is closed and cannot accept new operations")


class BulkWriterOperation:
    """Parent class for all operation container classes.

    `BulkWriterOperation` exists to house all the necessary information for a
    specific write task, including meta information like the current number of
    attempts. If a write fails, it is its wrapper `BulkWriterOperation` class
    that ferries it into its next retry without getting confused with other
    similar writes to the same document.
    """

    def __init__(self, attempts: int = 0):
        self.attempts = attempts

    def add_to_batch(self, batch: BulkWriteBatch):
        """Adds `self` to the supplied batch."""
        assert isinstance(batch, BulkWriteBatch)
        if isinstance(self, BulkWriterCreateOperation):
            return batch.create(
                reference=self.reference,
                document_data=self.document_data,
            )

        if isinstance(self, BulkWriterDeleteOperation):
            return batch.delete(
                reference=self.reference,
                option=self.option,
            )

        if isinstance(self, BulkWriterSetOperation):
            return batch.set(
                reference=self.reference,
                document_data=self.document_data,
                merge=self.merge,
            )

        if isinstance(self, BulkWriterUpdateOperation):
            return batch.update(
                reference=self.reference,
                field_updates=self.field_updates,
                option=self.option,
            )
        raise TypeError(
            f"Unexpected type of {self.__class__.__name__} for batch"
        )  # pragma: NO COVER


@functools.total_ordering
class BaseOperationRetry:
    """Parent class for both the @dataclass and old-style `OperationRetry`
    classes.

    Methods on this class can be moved directly to `OperationRetry` when support
    for Python 3.6 is dropped and `dataclasses` becomes universal.
    """

    def __lt__(self: "OperationRetry", other: "OperationRetry"):  # type: ignore
        """Allows use of `bisect` to maintain a sorted list of `OperationRetry`
        instances, which in turn allows us to cheaply grab all that are ready to
        run."""
        if isinstance(other, OperationRetry):
            return self.run_at < other.run_at
        elif isinstance(other, datetime.datetime):
            return self.run_at < other
        return NotImplemented  # pragma: NO COVER

    def retry(self: "OperationRetry", bulk_writer: BulkWriter) -> None:  # type: ignore
        """Call this after waiting any necessary time to re-add the enclosed
        operation to the supplied BulkWriter's internal queue."""
        if isinstance(self.operation, BulkWriterCreateOperation):
            bulk_writer.create(
                reference=self.operation.reference,
                document_data=self.operation.document_data,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterDeleteOperation):
            bulk_writer.delete(
                reference=self.operation.reference,
                option=self.operation.option,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterSetOperation):
            bulk_writer.set(
                reference=self.operation.reference,
                document_data=self.operation.document_data,
                merge=self.operation.merge,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterUpdateOperation):
            bulk_writer.update(
                reference=self.operation.reference,
                field_updates=self.operation.field_updates,
                option=self.operation.option,
                attempts=self.operation.attempts,
            )
        else:
            raise TypeError(
                f"Unexpected type of {self.operation.__class__.__name__} for OperationRetry.retry"
            )  # pragma: NO COVER


@dataclass
class BulkWriterOptions:
    initial_ops_per_second: int = 500
    max_ops_per_second: int = 500
    mode: SendMode = SendMode.parallel
    retry: BulkRetry = BulkRetry.linear
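
# Illustrative usage sketch (editor's note, not part of the library), assuming
# the client's `bulk_writer()` factory accepts these options:
#
#   options = BulkWriterOptions(
#       initial_ops_per_second=100,
#       max_ops_per_second=250,
#       mode=SendMode.parallel,
#       retry=BulkRetry.linear,
#   )
#   bulk_writer = db.bulk_writer(options)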


@dataclass
class BulkWriteFailure:
    operation: BulkWriterOperation
    # https://grpc.github.io/grpc/core/md_doc_statuscodes.html
    code: int
    message: str

    @property
    def attempts(self) -> int:
        return self.operation.attempts


@dataclass
class OperationRetry(BaseOperationRetry):
    """Container for an additional attempt at an operation, scheduled for
    the future."""

    operation: BulkWriterOperation
    run_at: datetime.datetime


@dataclass
class BulkWriterCreateOperation(BulkWriterOperation):
    """Container for BulkWriter.create() operations."""

    reference: BaseDocumentReference
    document_data: Dict
    attempts: int = 0


@dataclass
class BulkWriterUpdateOperation(BulkWriterOperation):
    """Container for BulkWriter.update() operations."""

    reference: BaseDocumentReference
    field_updates: Dict
    option: Optional[_helpers.WriteOption]
    attempts: int = 0


@dataclass
class BulkWriterSetOperation(BulkWriterOperation):
    """Container for BulkWriter.set() operations."""

    reference: BaseDocumentReference
    document_data: Dict
    merge: Union[bool, list] = False
    attempts: int = 0


@dataclass
class BulkWriterDeleteOperation(BulkWriterOperation):
    """Container for BulkWriter.delete() operations."""

    reference: BaseDocumentReference
    option: Optional[_helpers.WriteOption]
    attempts: int = 0