# Copyright 2021 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for efficiently writing large amounts of data to the Google Cloud
Firestore API."""

import bisect
import collections
import concurrent.futures
import datetime
import enum
import functools
import logging
import time

from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Union, TYPE_CHECKING

from google.rpc import status_pb2  # type: ignore

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.base_document import BaseDocumentReference
from google.cloud.firestore_v1.bulk_batch import BulkWriteBatch
from google.cloud.firestore_v1.rate_limiter import RateLimiter
from google.cloud.firestore_v1.types.firestore import BatchWriteResponse
from google.cloud.firestore_v1.types.write import WriteResult

if TYPE_CHECKING:
    from google.cloud.firestore_v1.base_client import BaseClient  # pragma: NO COVER


logger = logging.getLogger(__name__)


class BulkRetry(enum.Enum):
    """Indicator for what retry strategy the BulkWriter should use."""

    # Common exponential backoff algorithm. This strategy is largely incompatible
    # with the default retry limit of 15, so use with caution.
    exponential = enum.auto()

    # Default strategy that adds 1 second of delay per retry.
    linear = enum.auto()

    # Immediate retries with no growing delays.
    immediate = enum.auto()
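
# Illustrative sketch (not part of the library API): with the default limit of
# 15 attempts enforced by `BulkWriter._default_on_error`, the two growing
# strategies accumulate very different total wait times, which is why
# `BulkRetry.exponential` is discouraged at that limit:
#
#     linear_delays = [n for n in range(1, 16)]            # sums to ~2 minutes
#     exponential_delays = [n ** 2 for n in range(1, 16)]  # sums to ~20 minutes
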

class SendMode(enum.Enum):
    """Indicator for whether a BulkWriter should commit batches in the main
    thread or hand that work off to an executor."""

    # Default strategy that parallelizes network I/O on an executor. You almost
    # certainly want this.
    parallel = enum.auto()

    # Alternate strategy which blocks during all network I/O. Much slower, but
    # assures all batches are sent to the server in order. Note that
    # `SendMode.serial` is extremely susceptible to slowdowns from retries if
    # there are a lot of errors.
    serial = enum.auto()


class AsyncBulkWriterMixin:
    """
    Mixin which contains the methods on `BulkWriter` which must only be
    submitted to the executor (or called by functions submitted to the executor).
    This mixin exists purely for organization and clarity of implementation
    (e.g., there is no metaclass magic).

    The entrypoint to the parallelizable code path is `_send_batch()`, which is
    wrapped in a decorator which ensures that the `SendMode` is honored.
    """

    def _with_send_mode(fn):
        """Decorates a method to ensure it is only called via the executor
        (IFF the SendMode value is SendMode.parallel!).

        Usage:

            @_with_send_mode
            def my_method(self):
                parallel_stuff()

            def something_else(self):
                # Because of the decorator around `my_method`, the following
                # method invocation:
                self.my_method()
                # becomes equivalent to `self._executor.submit(self.my_method)`
                # when the send mode is `SendMode.parallel`.

        Use on entrypoint methods for code paths that *must* be parallelized.
        """

        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            if self._send_mode == SendMode.parallel:
                return self._executor.submit(lambda: fn(self, *args, **kwargs))
            else:
                # For code parity, even `SendMode.serial` scenarios should return
                # a future here. Anything else would badly complicate calling code.
                result = fn(self, *args, **kwargs)
                future = concurrent.futures.Future()
                future.set_result(result)
                return future

        return wrapper

    @_with_send_mode
    def _send_batch(
        self, batch: BulkWriteBatch, operations: List["BulkWriterOperation"]
    ):
124 """Sends a batch without regard to rate limits, meaning limits must have 

125 already been checked. To that end, do not call this directly; instead, 

126 call `_send_until_queue_is_empty`. 

127 

128 Args: 

129 batch(:class:`~google.cloud.firestore_v1.base_batch.BulkWriteBatch`) 

130 """ 

        _len_batch: int = len(batch)
        self._in_flight_documents += _len_batch
        response: BatchWriteResponse = self._send(batch)
        self._in_flight_documents -= _len_batch

        # Update bookkeeping totals
        self._total_batches_sent += 1
        self._total_write_operations += _len_batch

        self._process_response(batch, response, operations)

    def _process_response(
        self,
        batch: BulkWriteBatch,
        response: BatchWriteResponse,
        operations: List["BulkWriterOperation"],
    ) -> None:
        """Invokes submitted callbacks for each batch and each operation within
        each batch. As this is called from `_send_batch()`, this is parallelized
        if we are in that mode.
        """
        batch_references: List[BaseDocumentReference] = list(
            batch._document_references.values(),
        )
        self._batch_callback(batch, response, self)

        status: status_pb2.Status
        for index, status in enumerate(response.status):
            if status.code == 0:
                self._success_callback(
                    # DocumentReference
                    batch_references[index],
                    # WriteResult
                    response.write_results[index],
                    # BulkWriter
                    self,
                )
            else:
                operation: BulkWriterOperation = operations[index]
                should_retry: bool = self._error_callback(
                    # BulkWriteFailure
                    BulkWriteFailure(
                        operation=operation,
                        code=status.code,
                        message=status.message,
                    ),
                    # BulkWriter
                    self,
                )
                if should_retry:
                    operation.attempts += 1
                    self._retry_operation(operation)

    def _retry_operation(
        self,
        operation: "BulkWriterOperation",
    ) -> concurrent.futures.Future:
        delay: int = 0
        if self._options.retry == BulkRetry.exponential:
            delay = operation.attempts**2  # pragma: NO COVER
        elif self._options.retry == BulkRetry.linear:
            delay = operation.attempts

        run_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(
            seconds=delay
        )

        # Use of `bisect.insort` maintains the requirement that `self._retries`
        # always remain sorted by each object's `run_at` time. Note that it is
        # able to do this because `OperationRetry` instances are entirely sortable
        # by their `run_at` value.
        bisect.insort(
            self._retries,
            OperationRetry(operation=operation, run_at=run_at),
        )

    def _send(self, batch: BulkWriteBatch) -> BatchWriteResponse:
        """Hook for overwriting the sending of batches. As this is only called
        from `_send_batch()`, this is parallelized if we are in that mode.
        """
        return batch.commit()  # pragma: NO COVER
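
# Illustrative sketch (hypothetical subclass, not part of the library): because
# `_send` is the single hook through which batches reach the network, tests can
# override it to exercise batching, callbacks, and retry bookkeeping without
# making any RPCs.
#
#     class NoSendBulkWriter(BulkWriter):
#         def _send(self, batch: BulkWriteBatch) -> BatchWriteResponse:
#             return BatchWriteResponse(
#                 write_results=[WriteResult() for _ in range(len(batch))],
#                 status=[status_pb2.Status(code=0) for _ in range(len(batch))],
#             )
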

class BulkWriter(AsyncBulkWriterMixin):
    """
    Accumulate and efficiently save large amounts of document write operations
    to the server.

    BulkWriter can handle large data migrations or updates, buffering records
    in memory and submitting them to the server in batches of 20.

    The submission of batches is internally parallelized with a ThreadPoolExecutor,
    meaning end developers do not need to manage an event loop or worry about asyncio
    to see parallelization speed ups (which can easily 10x throughput). Because
    of this, there is no companion `AsyncBulkWriter` class, as is usually seen
    with other utility classes.

    Usage:

        # Instantiate the BulkWriter. This works from either `Client` or
        # `AsyncClient`.
        db = firestore.Client()
        bulk_writer = db.bulk_writer()

        # Attach an optional success listener to be called once per document.
        bulk_writer.on_write_result(
            lambda reference, result, bulk_writer: print(f'Saved {reference._document_path}')
        )

        # Queue an arbitrary number of write operations.
        # Assume `my_new_records` is a list of (DocumentReference, dict,)
        # tuple-pairs that you supply.

        reference: DocumentReference
        data: dict
        for reference, data in my_new_records:
            bulk_writer.create(reference, data)

        # Block until all pooled writes are complete.
        bulk_writer.flush()

    Args:
        client(:class:`~google.cloud.firestore_v1.client.Client`):
            The client that created this BulkWriter.
    """

    batch_size: int = 20

    def __init__(
        self,
        client: "BaseClient" = None,
        options: Optional["BulkWriterOptions"] = None,
    ):
        # Because `BulkWriter` instances are all synchronous/blocking on the
        # main thread (instead using other threads for asynchrony), it is
        # incompatible with AsyncClient's various methods that return Futures.
        # `BulkWriter` parallelizes all of its network I/O without the developer
        # having to worry about awaiting async methods, so we must convert an
        # AsyncClient instance into a plain Client instance.
        self._client = (
            client._to_sync_copy() if type(client).__name__ == "AsyncClient" else client
        )
        self._options = options or BulkWriterOptions()
        self._send_mode = self._options.mode

        self._operations: List[BulkWriterOperation]
        # List of the `_document_path` attribute for each DocumentReference
        # contained in the current `self._operations`. This is reset every time
        # `self._operations` is reset.
        self._operations_document_paths: List[BaseDocumentReference]
        self._reset_operations()

        # List of all `BulkWriterOperation` objects that are waiting to be retried.
        # Each such object is wrapped in an `OperationRetry` object which pairs
        # the raw operation with the `datetime` of its next scheduled attempt.
        # `self._retries` must always remain sorted for efficient reads, so it is
        # required to only ever add elements via `bisect.insort`.
        self._retries: collections.deque["OperationRetry"] = collections.deque([])

        self._queued_batches = collections.deque([])
        self._is_open: bool = True

        # This list will go on to store the future returned from each submission
        # to the executor, for the purpose of awaiting all of those futures'
        # completions in the `flush` method.
        self._pending_batch_futures: List[concurrent.futures.Future] = []

        self._success_callback: Callable[
            [BaseDocumentReference, WriteResult, "BulkWriter"], None
        ] = BulkWriter._default_on_success
        self._batch_callback: Callable[
            [BulkWriteBatch, BatchWriteResponse, "BulkWriter"], None
        ] = BulkWriter._default_on_batch
        self._error_callback: Callable[
            [BulkWriteFailure, BulkWriter], bool
        ] = BulkWriter._default_on_error

        self._in_flight_documents: int = 0
        self._rate_limiter = RateLimiter(
            initial_tokens=self._options.initial_ops_per_second,
            global_max_tokens=self._options.max_ops_per_second,
        )

        # Keep track of progress as batches and write operations are completed
        self._total_batches_sent: int = 0
        self._total_write_operations: int = 0

        self._ensure_executor()

    @staticmethod
    def _default_on_batch(
        batch: BulkWriteBatch,
        response: BatchWriteResponse,
        bulk_writer: "BulkWriter",
    ) -> None:
        pass

    @staticmethod
    def _default_on_success(
        reference: BaseDocumentReference,
        result: WriteResult,
        bulk_writer: "BulkWriter",
    ) -> None:
        pass

    @staticmethod
    def _default_on_error(error: "BulkWriteFailure", bulk_writer: "BulkWriter") -> bool:
        # Default number of retries for each operation is 15. This is a scary
        # number to combine with an exponential backoff, and as such, our default
        # backoff strategy is linear instead of exponential.
        return error.attempts < 15

    def _reset_operations(self) -> None:
        self._operations = []
        self._operations_document_paths = []

    def _ensure_executor(self):
        """Reboots the executor used to send batches if it has been shut down."""
        if getattr(self, "_executor", None) is None or self._executor._shutdown:
            self._executor = self._instantiate_executor()

    def _ensure_sending(self):
        self._ensure_executor()
        self._send_until_queue_is_empty()

    def _instantiate_executor(self):
        return concurrent.futures.ThreadPoolExecutor()

    def flush(self):
        """
        Block until all pooled write operations are complete and then resume
        accepting new write operations.
        """
        # Calling `flush` consecutively is a no-op.
        if self._executor._shutdown:
            return

        while True:
            # Queue any waiting operations and try our luck again.
            # This can happen if users add a number of records not divisible by
            # 20 and then call flush (which should be ~19 out of 20 use cases).
            # Execution will arrive here and find the leftover operations that
            # never filled up a batch organically, and so we must send them here.
            if self._operations:
                self._enqueue_current_batch()
                continue

            # If we find queued but unsent batches or pending retries, begin
            # sending immediately. Note that if we are waiting on retries, but
            # they have longer to wait as specified by the retry backoff strategy,
            # we may have to make several passes through this part of the loop.
            # (This is related to the sleep and its explanation below.)
            if self._queued_batches or self._retries:
                self._ensure_sending()

                # This sleep prevents max-speed laps through this loop, which can
                # and will happen if the BulkWriter is doing nothing except waiting
                # on retries to be ready to re-send. Removing this sleep will cause
                # whatever thread is running this code to sit near 100% CPU until
                # all retries are abandoned or successfully resolved.
                time.sleep(0.1)
                continue

            # We store the executor's Future from each batch send operation, so
            # the first pass through here, we are guaranteed to find "pending"
            # batch futures and have to wait. However, the second pass through
            # will be fast unless the last batch introduced more retries.
            if self._pending_batch_futures:
                _batches = self._pending_batch_futures
                self._pending_batch_futures = []
                concurrent.futures.wait(_batches)

                # Continuing is critical here (as opposed to breaking) because
                # the final batch may have introduced retries which is most
                # straightforwardly verified by heading back to the top of the loop.
                continue

            break

        # We no longer expect to have any queued batches or pending futures,
        # so the executor can be shut down.
        self._executor.shutdown()

    def close(self):
        """
        Block until all pooled write operations are complete and then reject
        any further write operations.
        """
        self._is_open = False
        self.flush()
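
    # Illustrative sketch (hypothetical references/data, not part of the
    # library): `flush()` is a checkpoint that leaves the BulkWriter usable
    # afterwards, while `close()` is terminal.
    #
    #     bulk_writer.flush()            # blocks until everything queued is sent
    #     bulk_writer.create(ref, data)  # still accepted after a flush
    #     bulk_writer.close()            # flushes, then rejects further writes
    #     bulk_writer.create(ref, data)  # raises: "BulkWriter is closed ..."
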

    def _maybe_enqueue_current_batch(self):
        """
        Checks to see whether the in-progress batch is full and, if it is,
        adds it to the sending queue.
        """
        if len(self._operations) >= self.batch_size:
            self._enqueue_current_batch()

    def _enqueue_current_batch(self):
        """Adds the current batch to the back of the sending line, resets the
        list of queued ops, and begins the process of actually sending whatever
        batch is in the front of the line, which will often be a different batch.
        """
        # Put our batch in the back of the sending line
        self._queued_batches.append(self._operations)

        # Reset the local store of operations
        self._reset_operations()

        # The sending loop powers off upon reaching the end of the queue, so
        # here we make sure that is running.
        self._ensure_sending()

    def _send_until_queue_is_empty(self):
        """First domino in the sending codepath. This does not need to be
        parallelized for two reasons:

        1) Putting this on a worker thread could lead to two running in parallel
           and thus unpredictable commit ordering or failure to adhere to
           rate limits.
        2) This method only blocks when `self._request_send()` does not immediately
           return, and in that case, the BulkWriter's ramp-up / throttling logic
           has determined that it is attempting to exceed the maximum write speed,
           and so parallelizing this method would not increase performance anyway.

        Once `self._request_send()` returns, this method calls `self._send_batch()`,
        which parallelizes itself if that is our SendMode value.

        And once `self._send_batch()` is called (which does not block if we are
        sending in parallel), this method jumps back to the top and re-checks for
        any queued batches.

        Note that for sufficiently large data migrations, this can block the
        submission of additional write operations (e.g., the CRUD methods);
        but again, that is only if the maximum write speed is being exceeded,
        and thus this scenario does not actually further reduce performance.
        """
        self._schedule_ready_retries()

        while self._queued_batches:
            # For FIFO order, add to the right of this deque (via `append`) and take
            # from the left (via `popleft`).
            operations: List[BulkWriterOperation] = self._queued_batches.popleft()

            # Block until we are cleared for takeoff, which is fine because this
            # returns instantly unless the rate limiting logic determines that we
            # are attempting to exceed the maximum write speed.
            self._request_send(len(operations))

            # Handle some bookkeeping, and ultimately put these bits on the wire.
            batch = BulkWriteBatch(client=self._client)
            op: BulkWriterOperation
            for op in operations:
                op.add_to_batch(batch)

            # `_send_batch` is optionally parallelized by `@_with_send_mode`.
            future = self._send_batch(batch=batch, operations=operations)
            self._pending_batch_futures.append(future)

            self._schedule_ready_retries()

    def _schedule_ready_retries(self):
        """Grabs all ready retries and re-queues them."""

        # Because `self._retries` always exists in a sorted state (thanks to only
        # ever adding to it via `bisect.insort`), and because `OperationRetry`
        # objects are comparable against `datetime` objects, this bisect functionally
        # returns the number of retries that are ready for immediate reenlistment.
        take_until_index = bisect.bisect(
            self._retries, datetime.datetime.now(tz=datetime.timezone.utc)
        )

        for _ in range(take_until_index):
            retry: OperationRetry = self._retries.popleft()
            retry.retry(self)
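
    # Illustrative sketch of the bisect bookkeeping above (hypothetical values,
    # not part of the library): because `OperationRetry` compares against
    # `datetime` objects, bisecting the sorted deque with "now" yields the count
    # of retries whose `run_at` has already passed.
    #
    #     now = datetime.datetime.now(tz=datetime.timezone.utc)
    #     # self._retries is kept sorted by each element's `run_at`
    #     ready_count = bisect.bisect(self._retries, now)
    #     # elements [0:ready_count] have run_at <= now; they are popped and re-queued
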

    def _request_send(self, batch_size: int) -> bool:
        # Set up this boolean to avoid repeatedly taking tokens if we're only
        # waiting on the `max_in_flight` limit.
        have_received_tokens: bool = False

        while True:
            # To avoid bottlenecks on the server, an additional limit is that no
            # more write operations can be "in flight" (sent but still awaiting
            # response) at any given point than the maximum number of writes per
            # second.
            under_threshold: bool = (
                self._in_flight_documents <= self._rate_limiter._maximum_tokens
            )
            # Ask for tokens each pass through this loop until they are granted,
            # and then stop.
            have_received_tokens = (
                have_received_tokens or self._rate_limiter.take_tokens(batch_size)
            )
            if not under_threshold or not have_received_tokens:
                # Try again until both checks are true.
                # Note that this sleep is helpful to prevent the main BulkWriter
                # thread from spinning through this loop as fast as possible and
                # pointlessly burning CPU while we wait for the arrival of a
                # fixed moment in the future.
                time.sleep(0.01)
                continue

            return True
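
    # Worked example of the throttling above (hypothetical numbers, not part of
    # the library API): with the default of 500 max ops per second and full
    # batches of 20 writes, at most 25 batches' worth of writes (500 / 20) can
    # be awaiting a server response before `_request_send` starts sleeping.
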

    def create(
        self,
        reference: BaseDocumentReference,
        document_data: Dict,
        attempts: int = 0,
    ) -> None:
        """Adds a `create` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this create operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be created.
            document_data (dict):
                Raw data to save to the server.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterCreateOperation(
                reference=reference,
                document_data=document_data,
                attempts=attempts,
            ),
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def delete(
        self,
        reference: BaseDocumentReference,
        option: Optional[_helpers.WriteOption] = None,
        attempts: int = 0,
    ) -> None:
        """Adds a `delete` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this delete operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be deleted.
            option (:class:`~google.cloud.firestore_v1._helpers.WriteOption`):
                Optional flag to modify the nature of this write.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterDeleteOperation(
                reference=reference,
                option=option,
                attempts=attempts,
            ),
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def set(
        self,
        reference: BaseDocumentReference,
        document_data: Dict,
        merge: Union[bool, list] = False,
        attempts: int = 0,
    ) -> None:
        """Adds a `set` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this set operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be written.
            document_data (dict):
                Raw data to save to the server.
            merge (bool):
                If True (or a list of field paths), merge the supplied data into
                the existing document instead of completely overwriting it.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterSetOperation(
                reference=reference,
                document_data=document_data,
                merge=merge,
                attempts=attempts,
            )
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def update(
        self,
        reference: BaseDocumentReference,
        field_updates: dict,
        option: Optional[_helpers.WriteOption] = None,
        attempts: int = 0,
    ) -> None:
        """Adds an `update` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this update operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be updated.
            field_updates (dict):
                Key paths to specific nested data that should be updated.
            option (:class:`~google.cloud.firestore_v1._helpers.WriteOption`):
                Optional flag to modify the nature of this write.
        """
        # This check is copied from other Firestore classes for the purposes of
        # surfacing the error immediately.
        if option.__class__.__name__ == "ExistsOption":
            raise ValueError("you must not pass an explicit write option to update.")

        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterUpdateOperation(
                reference=reference,
                field_updates=field_updates,
                option=option,
                attempts=attempts,
            )
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()
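
    # Illustrative sketch (hypothetical collection/fields, not part of the
    # library): like `DocumentReference.update`, `field_updates` keys may be
    # dotted paths that target nested fields without overwriting their siblings.
    #
    #     bulk_writer.update(
    #         db.collection("users").document("alovelace"),
    #         {"stats.login_count": 42, "verified": True},
    #     )
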

    def on_write_result(
        self,
        callback: Callable[[BaseDocumentReference, WriteResult, "BulkWriter"], None],
    ) -> None:
        """Sets a callback that will be invoked once for every successful operation."""
        self._success_callback = callback or BulkWriter._default_on_success

    def on_batch_result(
        self,
        callback: Callable[[BulkWriteBatch, BatchWriteResponse, "BulkWriter"], None],
    ) -> None:
        """Sets a callback that will be invoked once for every successful batch."""
        self._batch_callback = callback or BulkWriter._default_on_batch

    def on_write_error(
        self, callback: Callable[["BulkWriteFailure", "BulkWriter"], bool]
    ) -> None:
        """Sets a callback that will be invoked once for every write operation
        that fails."""
        self._error_callback = callback or BulkWriter._default_on_error
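
    # Illustrative sketch (hypothetical callback, not part of the library): the
    # registered error callback receives each failed operation and returns
    # whether the BulkWriter should retry it.
    #
    #     def on_error(failure: BulkWriteFailure, writer: BulkWriter) -> bool:
    #         logger.warning("write failed (%s): %s", failure.code, failure.message)
    #         return failure.attempts < 5  # give up after five attempts
    #
    #     bulk_writer.on_write_error(on_error)
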

    def _verify_not_closed(self):
        if not self._is_open:
            raise Exception("BulkWriter is closed and cannot accept new operations")


class BulkWriterOperation:
    """Parent class for all operation container classes.

    `BulkWriterOperation` exists to house all the necessary information for a
    specific write task, including meta information like the current number of
    attempts. If a write fails, it is its wrapper `BulkWriterOperation` class
    that ferries it into its next retry without getting confused with other
    similar writes to the same document.
    """

    def add_to_batch(self, batch: BulkWriteBatch):
        """Adds `self` to the supplied batch."""
        assert isinstance(batch, BulkWriteBatch)
        if isinstance(self, BulkWriterCreateOperation):
            return batch.create(
                reference=self.reference,
                document_data=self.document_data,
            )

        if isinstance(self, BulkWriterDeleteOperation):
            return batch.delete(
                reference=self.reference,
                option=self.option,
            )

        if isinstance(self, BulkWriterSetOperation):
            return batch.set(
                reference=self.reference,
                document_data=self.document_data,
                merge=self.merge,
            )

        if isinstance(self, BulkWriterUpdateOperation):
            return batch.update(
                reference=self.reference,
                field_updates=self.field_updates,
                option=self.option,
            )
        raise TypeError(
            f"Unexpected type of {self.__class__.__name__} for batch"
        )  # pragma: NO COVER


@functools.total_ordering
class BaseOperationRetry:
    """Parent class for both the @dataclass and old-style `OperationRetry`
    classes.

    Methods on this class can be moved directly to `OperationRetry` when support
    for Python 3.6 is dropped and `dataclasses` becomes universal.
    """

    def __lt__(self, other: "OperationRetry"):
        """Allows use of `bisect` to maintain a sorted list of `OperationRetry`
        instances, which in turn allows us to cheaply grab all that are ready to
        run."""
        if isinstance(other, OperationRetry):
            return self.run_at < other.run_at
        elif isinstance(other, datetime.datetime):
            return self.run_at < other
        return NotImplemented  # pragma: NO COVER

    def retry(self, bulk_writer: BulkWriter) -> None:
        """Call this after waiting any necessary time to re-add the enclosed
        operation to the supplied BulkWriter's internal queue."""
        if isinstance(self.operation, BulkWriterCreateOperation):
            bulk_writer.create(
                reference=self.operation.reference,
                document_data=self.operation.document_data,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterDeleteOperation):
            bulk_writer.delete(
                reference=self.operation.reference,
                option=self.operation.option,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterSetOperation):
            bulk_writer.set(
                reference=self.operation.reference,
                document_data=self.operation.document_data,
                merge=self.operation.merge,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterUpdateOperation):
            bulk_writer.update(
                reference=self.operation.reference,
                field_updates=self.operation.field_updates,
                option=self.operation.option,
                attempts=self.operation.attempts,
            )
        else:
            raise TypeError(
                f"Unexpected type of {self.operation.__class__.__name__} for OperationRetry.retry"
            )  # pragma: NO COVER


@dataclass
class BulkWriterOptions:
    initial_ops_per_second: int = 500
    max_ops_per_second: int = 500
    mode: SendMode = SendMode.parallel
    retry: BulkRetry = BulkRetry.linear
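
# Illustrative sketch (hypothetical values, and `db` is assumed to be an
# existing `firestore.Client()`; not part of the library API): options are
# supplied at construction time and control throughput ramping, send mode,
# and the retry backoff strategy.
#
#     options = BulkWriterOptions(
#         initial_ops_per_second=100,
#         max_ops_per_second=500,
#         mode=SendMode.serial,
#         retry=BulkRetry.exponential,
#     )
#     bulk_writer = BulkWriter(client=db, options=options)
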

@dataclass
class BulkWriteFailure:
    operation: BulkWriterOperation
    # https://grpc.github.io/grpc/core/md_doc_statuscodes.html
    code: int
    message: str

    @property
    def attempts(self) -> int:
        return self.operation.attempts


@dataclass
class OperationRetry(BaseOperationRetry):
    """Container for an additional attempt at an operation, scheduled for
    the future."""

    operation: BulkWriterOperation
    run_at: datetime.datetime


@dataclass
class BulkWriterCreateOperation(BulkWriterOperation):
    """Container for BulkWriter.create() operations."""

    reference: BaseDocumentReference
    document_data: Dict
    attempts: int = 0


@dataclass
class BulkWriterUpdateOperation(BulkWriterOperation):
    """Container for BulkWriter.update() operations."""

    reference: BaseDocumentReference
    field_updates: Dict
    option: Optional[_helpers.WriteOption]
    attempts: int = 0


@dataclass
class BulkWriterSetOperation(BulkWriterOperation):
    """Container for BulkWriter.set() operations."""

    reference: BaseDocumentReference
    document_data: Dict
    merge: Union[bool, list] = False
    attempts: int = 0


@dataclass
class BulkWriterDeleteOperation(BulkWriterOperation):
    """Container for BulkWriter.delete() operations."""

    reference: BaseDocumentReference
    option: Optional[_helpers.WriteOption]
    attempts: int = 0