# Copyright 2021 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for efficiently writing large amounts of data to the Google Cloud
Firestore API."""

import bisect
import collections
import concurrent.futures
import datetime
import enum
import functools
import logging
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Deque, Dict, List, Optional, Union

from google.rpc import status_pb2  # type: ignore

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.base_document import BaseDocumentReference
from google.cloud.firestore_v1.bulk_batch import BulkWriteBatch
from google.cloud.firestore_v1.rate_limiter import RateLimiter
from google.cloud.firestore_v1.types.firestore import BatchWriteResponse
from google.cloud.firestore_v1.types.write import WriteResult

if TYPE_CHECKING:
    from google.cloud.firestore_v1.base_client import BaseClient  # pragma: NO COVER


logger = logging.getLogger(__name__)


class BulkRetry(enum.Enum):
    """Indicator for what retry strategy the BulkWriter should use."""

    # Common exponential backoff algorithm. This strategy is largely incompatible
    # with the default retry limit of 15, so use with caution.
    exponential = enum.auto()

    # Default strategy that adds 1 second of delay per retry.
    linear = enum.auto()

    # Immediate retries with no growing delays.
    immediate = enum.auto()


class SendMode(enum.Enum):
    """Indicator for whether a BulkWriter should commit batches in the main
    thread or hand that work off to an executor."""

    # Default strategy that parallelizes network I/O on an executor. You almost
    # certainly want this.
    parallel = enum.auto()

    # Alternate strategy which blocks during all network I/O. Much slower, but
    # assures all batches are sent to the server in order. Note that
    # `SendMode.serial` is extremely susceptible to slowdowns from retries if
    # there are a lot of errors.
    serial = enum.auto()


class AsyncBulkWriterMixin:
    """
    Mixin which contains the methods on `BulkWriter` which must only be
    submitted to the executor (or called by functions submitted to the executor).
    This mixin exists purely for organization and clarity of implementation
    (e.g., there is no metaclass magic).

    The entrypoint to the parallelizable code path is `_send_batch()`, which is
    wrapped in a decorator which ensures that the `SendMode` is honored.
    """

    def _with_send_mode(fn: Callable):  # type: ignore
        """Decorates a method to ensure it is only called via the executor
        (IFF the SendMode value is SendMode.parallel!).

        Usage:

            @_with_send_mode
            def my_method(self):
                parallel_stuff()

            def something_else(self):
                # Because of the decorator around `my_method`, the following
                # method invocation:
                self.my_method()
                # becomes equivalent to `self._executor.submit(self.my_method)`
                # when the send mode is `SendMode.parallel`.

        Use on entrypoint methods for code paths that *must* be parallelized.
        """

        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            if self._send_mode == SendMode.parallel:
                return self._executor.submit(lambda: fn(self, *args, **kwargs))
            else:
                # For code parity, even `SendMode.serial` scenarios should return
                # a future here. Anything else would badly complicate calling code.
                result = fn(self, *args, **kwargs)
                future: concurrent.futures.Future = concurrent.futures.Future()
                future.set_result(result)
                return future

        return wrapper

    @_with_send_mode
    def _send_batch(  # type: ignore
        self: "BulkWriter",
        batch: BulkWriteBatch,
        operations: List["BulkWriterOperation"],
    ):
        """Sends a batch without regard to rate limits, meaning limits must have
        already been checked. To that end, do not call this directly; instead,
        call `_send_until_queue_is_empty`.

        Args:
            batch(:class:`~google.cloud.firestore_v1.base_batch.BulkWriteBatch`)
        """
        _len_batch: int = len(batch)
        self._in_flight_documents += _len_batch
        response: BatchWriteResponse = self._send(batch)
        self._in_flight_documents -= _len_batch

        # Update bookkeeping totals
        self._total_batches_sent += 1
        self._total_write_operations += _len_batch

        self._process_response(batch, response, operations)

    def _process_response(  # type: ignore
        self: "BulkWriter",
        batch: BulkWriteBatch,
        response: BatchWriteResponse,
        operations: List["BulkWriterOperation"],
    ):
        """Invokes submitted callbacks for each batch and each operation within
        each batch. As this is called from `_send_batch()`, this is parallelized
        if we are in that mode.
        """
        batch_references: List[BaseDocumentReference] = list(
            batch._document_references.values(),
        )
        self._batch_callback(batch, response, self)

        status: status_pb2.Status
        for index, status in enumerate(response.status):
            if status.code == 0:
                self._success_callback(
                    # DocumentReference
                    batch_references[index],
                    # WriteResult
                    response.write_results[index],
                    # BulkWriter
                    self,
                )
            else:
                operation: BulkWriterOperation = operations[index]
                should_retry: bool = self._error_callback(
                    # BulkWriteFailure
                    BulkWriteFailure(
                        operation=operation,
                        code=status.code,
                        message=status.message,
                    ),
                    # BulkWriter
                    self,
                )
                if should_retry:
                    operation.attempts += 1
                    self._retry_operation(operation)

    def _retry_operation(  # type: ignore
        self: "BulkWriter",
        operation: "BulkWriterOperation",
    ):
        delay: int = 0
        if self._options.retry == BulkRetry.exponential:
            delay = operation.attempts**2  # pragma: NO COVER
        elif self._options.retry == BulkRetry.linear:
            delay = operation.attempts

        run_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(
            seconds=delay
        )

        # Use of `bisect.insort` maintains the requirement that `self._retries`
        # always remain sorted by each object's `run_at` time. Note that it is
        # able to do this because `OperationRetry` instances are entirely sortable
        # by their `run_at` value.
        bisect.insort(
            self._retries,
            OperationRetry(operation=operation, run_at=run_at),
        )

    def _send(self, batch: BulkWriteBatch) -> BatchWriteResponse:
        """Hook for overriding the sending of batches. As this is only called
        from `_send_batch()`, this is parallelized if we are in that mode.
        """
        return batch.commit()  # pragma: NO COVER


class BulkWriter(AsyncBulkWriterMixin):
    """
    Accumulate and efficiently save large amounts of document write operations
    to the server.

    BulkWriter can handle large data migrations or updates, buffering records
    in memory and submitting them to the server in batches of 20.

    The submission of batches is internally parallelized with a ThreadPoolExecutor,
    meaning end developers do not need to manage an event loop or worry about asyncio
    to see parallelization speed ups (which can easily 10x throughput). Because
    of this, there is no companion `AsyncBulkWriter` class, as is usually seen
    with other utility classes.

    Usage:

    .. code-block:: python

        # Instantiate the BulkWriter. This works from either `Client` or
        # `AsyncClient`.
        db = firestore.Client()
        bulk_writer = db.bulk_writer()

        # Attach an optional success listener to be called once per document.
        bulk_writer.on_write_result(
            lambda reference, result, bulk_writer: print(f'Saved {reference._document_path}')
        )

        # Queue an arbitrary amount of write operations.
        # Assume `my_new_records` is a list of (DocumentReference, dict,)
        # tuple-pairs that you supply.

        reference: DocumentReference
        data: dict
        for reference, data in my_new_records:
            bulk_writer.create(reference, data)

        # Block until all pooled writes are complete.
        bulk_writer.flush()

    Args:
        client(:class:`~google.cloud.firestore_v1.client.Client`):
            The client that created this BulkWriter.
    """

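    # Maximum number of write operations to buffer before the in-progress batch
    # is sealed and queued for sending (see `_maybe_enqueue_current_batch()`).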
    batch_size: int = 20

    def __init__(
        self,
        client: Optional["BaseClient"] = None,
        options: Optional["BulkWriterOptions"] = None,
    ):
        # Because `BulkWriter` instances are all synchronous/blocking on the
        # main thread (instead using other threads for asynchrony), it is
        # incompatible with AsyncClient's various methods that return Futures.
        # `BulkWriter` parallelizes all of its network I/O without the developer
        # having to worry about awaiting async methods, so we must convert an
        # AsyncClient instance into a plain Client instance.
        if type(client).__name__ == "AsyncClient":
            self._client = client._to_sync_copy()  # type: ignore
        else:
            self._client = client
        self._options = options or BulkWriterOptions()
        self._send_mode = self._options.mode

        self._operations: List[BulkWriterOperation]
        # List of the `_document_path` attribute for each DocumentReference
        # contained in the current `self._operations`. This is reset every time
        # `self._operations` is reset.
        self._operations_document_paths: List[BaseDocumentReference]
        self._reset_operations()

        # List of all `BulkWriterOperation` objects that are waiting to be retried.
        # Each such object is wrapped in an `OperationRetry` object which pairs
        # the raw operation with the `datetime` of its next scheduled attempt.
        # `self._retries` must always remain sorted for efficient reads, so it is
        # required to only ever add elements via `bisect.insort`.
        self._retries: Deque["OperationRetry"] = collections.deque([])

        self._queued_batches: Deque[List[BulkWriterOperation]] = collections.deque([])
        self._is_open: bool = True

        # This list will go on to store the future returned from each submission
        # to the executor, for the purpose of awaiting all of those futures'
        # completions in the `flush` method.
        self._pending_batch_futures: List[concurrent.futures.Future] = []

        self._success_callback: Callable[
            [BaseDocumentReference, WriteResult, "BulkWriter"], None
        ] = BulkWriter._default_on_success
        self._batch_callback: Callable[
            [BulkWriteBatch, BatchWriteResponse, "BulkWriter"], None
        ] = BulkWriter._default_on_batch
        self._error_callback: Callable[
            [BulkWriteFailure, BulkWriter], bool
        ] = BulkWriter._default_on_error

        self._in_flight_documents: int = 0
        self._rate_limiter = RateLimiter(
            initial_tokens=self._options.initial_ops_per_second,
            global_max_tokens=self._options.max_ops_per_second,
        )

        # Keep track of progress as batches and write operations are completed
        self._total_batches_sent: int = 0
        self._total_write_operations: int = 0

        self._executor: concurrent.futures.ThreadPoolExecutor
        self._ensure_executor()

    @staticmethod
    def _default_on_batch(
        batch: BulkWriteBatch,
        response: BatchWriteResponse,
        bulk_writer: "BulkWriter",
    ) -> None:
        pass

    @staticmethod
    def _default_on_success(
        reference: BaseDocumentReference,
        result: WriteResult,
        bulk_writer: "BulkWriter",
    ) -> None:
        pass

    @staticmethod
    def _default_on_error(error: "BulkWriteFailure", bulk_writer: "BulkWriter") -> bool:
        # Default number of retries for each operation is 15. This is a scary
        # number to combine with an exponential backoff, and as such, our default
        # backoff strategy is linear instead of exponential.
        return error.attempts < 15

    def _reset_operations(self) -> None:
        self._operations = []
        self._operations_document_paths = []

    def _ensure_executor(self):
        """Reboots the executor used to send batches if it has been shut down."""
        if getattr(self, "_executor", None) is None or self._executor._shutdown:
            self._executor = self._instantiate_executor()

    def _ensure_sending(self):
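        """Makes sure a usable executor exists, then drains the sending queue."""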
        self._ensure_executor()
        self._send_until_queue_is_empty()

    def _instantiate_executor(self):
        return concurrent.futures.ThreadPoolExecutor()

    def flush(self):
        """
        Block until all pooled write operations are complete and then resume
        accepting new write operations.
        """
        # Calling `flush` consecutively is a no-op.
        if self._executor._shutdown:
            return

        while True:
            # Queue any waiting operations and try our luck again.
            # This can happen if users add a number of records not divisible by
            # 20 and then call flush (which should be ~19 out of 20 use cases).
            # Execution will arrive here and find the leftover operations that
            # never filled up a batch organically, and so we must send them here.
            if self._operations:
                self._enqueue_current_batch()
                continue

            # If we find queued but unsent batches or pending retries, begin
            # sending immediately. Note that if we are waiting on retries, but
            # they have longer to wait as specified by the retry backoff strategy,
            # we may have to make several passes through this part of the loop.
            # (This is related to the sleep and its explanation below.)
            if self._queued_batches or self._retries:
                self._ensure_sending()

                # This sleep prevents max-speed laps through this loop, which can
                # and will happen if the BulkWriter is doing nothing except waiting
                # on retries to be ready to re-send. Removing this sleep will cause
                # whatever thread is running this code to sit near 100% CPU until
                # all retries are abandoned or successfully resolved.
                time.sleep(0.1)
                continue

            # We store the executor's Future from each batch send operation, so
            # the first pass through here, we are guaranteed to find "pending"
            # batch futures and have to wait. However, the second pass through
            # will be fast unless the last batch introduced more retries.
            if self._pending_batch_futures:
                _batches = self._pending_batch_futures
                self._pending_batch_futures = []
                concurrent.futures.wait(_batches)

                # Continuing is critical here (as opposed to breaking) because
                # the final batch may have introduced retries which is most
                # straightforwardly verified by heading back to the top of the loop.
                continue

            break

        # We no longer expect to have any queued batches or pending futures,
        # so the executor can be shut down.
        self._executor.shutdown()

    def close(self):
        """
        Block until all pooled write operations are complete and then reject
        any further write operations.
        """
        self._is_open = False
        self.flush()

    def _maybe_enqueue_current_batch(self):
        """
        Checks to see whether the in-progress batch is full and, if it is,
        adds it to the sending queue.
        """
        if len(self._operations) >= self.batch_size:
            self._enqueue_current_batch()

    def _enqueue_current_batch(self):
        """Adds the current batch to the back of the sending line, resets the
        list of queued ops, and begins the process of actually sending whatever
        batch is in the front of the line, which will often be a different batch.
        """
        # Put our batch in the back of the sending line
        self._queued_batches.append(self._operations)

        # Reset the local store of operations
        self._reset_operations()

        # The sending loop powers off upon reaching the end of the queue, so
        # here we make sure that is running.
        self._ensure_sending()

    def _send_until_queue_is_empty(self) -> None:
        """First domino in the sending codepath. This does not need to be
        parallelized for two reasons:

            1) Putting this on a worker thread could lead to two running in parallel
               and thus unpredictable commit ordering or failure to adhere to
               rate limits.
            2) This method only blocks when `self._request_send()` does not immediately
               return, and in that case, the BulkWriter's ramp-up / throttling logic
               has determined that it is attempting to exceed the maximum write speed,
               and so parallelizing this method would not increase performance anyway.

        Once `self._request_send()` returns, this method calls `self._send_batch()`,
        which parallelizes itself if that is our SendMode value.

        And once `self._send_batch()` is called (which does not block if we are
        sending in parallel), this method jumps back to the top and re-checks for
        any queued batches.

        Note that for sufficiently large data migrations, this can block the
        submission of additional write operations (e.g., the CRUD methods);
        but again, that is only if the maximum write speed is being exceeded,
        and thus this scenario does not actually further reduce performance.
        """
        self._schedule_ready_retries()

        while self._queued_batches:
            # For FIFO order, add to the right of this deque (via `append`) and take
            # from the left (via `popleft`).
            operations: List[BulkWriterOperation] = self._queued_batches.popleft()

            # Block until we are cleared for takeoff, which is fine because this
            # returns instantly unless the rate limiting logic determines that we
            # are attempting to exceed the maximum write speed.
            self._request_send(len(operations))

            # Handle some bookkeeping, and ultimately put these bits on the wire.
            batch = BulkWriteBatch(client=self._client)
            op: BulkWriterOperation
            for op in operations:
                op.add_to_batch(batch)

            # `_send_batch` is optionally parallelized by `@_with_send_mode`.
            future = self._send_batch(batch=batch, operations=operations)
            self._pending_batch_futures.append(future)

            self._schedule_ready_retries()
        return None

    def _schedule_ready_retries(self) -> None:
        """Grabs all ready retries and re-queues them."""

        # Because `self._retries` always exists in a sorted state (thanks to only
        # ever adding to it via `bisect.insort`), and because `OperationRetry`
        # objects are comparable against `datetime` objects, this bisect functionally
        # returns the number of retries that are ready for immediate reenlistment.
        take_until_index = bisect.bisect(
            self._retries, datetime.datetime.now(tz=datetime.timezone.utc)
        )

        for _ in range(take_until_index):
            retry: OperationRetry = self._retries.popleft()
            retry.retry(self)
        return None

    def _request_send(self, batch_size: int) -> bool:
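        """Blocks until the rate limiter grants `batch_size` tokens and the
        number of in-flight documents is back under the per-second maximum,
        then returns True.
        """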
        # Set up this boolean to avoid repeatedly taking tokens if we're only
        # waiting on the `max_in_flight` limit.
        have_received_tokens: bool = False

        while True:
            # To avoid bottlenecks on the server, an additional limit is that no
            # more write operations can be "in flight" (sent but still awaiting
            # response) at any given point than the maximum number of writes per
            # second.
            under_threshold: bool = (
                self._in_flight_documents <= self._rate_limiter._maximum_tokens
            )
            # Ask for tokens each pass through this loop until they are granted,
            # and then stop.
            have_received_tokens = have_received_tokens or bool(
                self._rate_limiter.take_tokens(batch_size)
            )
            if not under_threshold or not have_received_tokens:
                # Try again until both checks are true.
                # Note that this sleep is helpful to prevent the main BulkWriter
                # thread from spinning through this loop as fast as possible and
                # pointlessly burning CPU while we wait for the arrival of a
                # fixed moment in the future.
                time.sleep(0.01)
                continue

            return True

    def create(
        self,
        reference: BaseDocumentReference,
        document_data: Dict,
        attempts: int = 0,
    ) -> None:
        """Adds a `create` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this create operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be created.
            document_data (dict):
                Raw data to save to the server.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterCreateOperation(
                reference=reference,
                document_data=document_data,
                attempts=attempts,
            ),
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def delete(
        self,
        reference: BaseDocumentReference,
        option: Optional[_helpers.WriteOption] = None,
        attempts: int = 0,
    ) -> None:
        """Adds a `delete` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this delete operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be deleted.
            option (:class:`~google.cloud.firestore_v1._helpers.WriteOption`):
                Optional flag to modify the nature of this write.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterDeleteOperation(
                reference=reference,
                option=option,
                attempts=attempts,
            ),
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def set(
        self,
        reference: BaseDocumentReference,
        document_data: Dict,
        merge: Union[bool, list] = False,
        attempts: int = 0,
    ) -> None:
        """Adds a `set` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this set operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be written.
            document_data (dict):
                Raw data to save to the server.
            merge (bool or list):
                If True (or a list of field paths), merge the supplied data into
                the existing document instead of completely overwriting it.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterSetOperation(
                reference=reference,
                document_data=document_data,
                merge=merge,
                attempts=attempts,
            )
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def update(
        self,
        reference: BaseDocumentReference,
        field_updates: dict,
        option: Optional[_helpers.WriteOption] = None,
        attempts: int = 0,
    ) -> None:
        """Adds an `update` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this update operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be updated.
            field_updates (dict):
                Key paths to specific nested data that should be updated.
            option (:class:`~google.cloud.firestore_v1._helpers.WriteOption`):
                Optional flag to modify the nature of this write.
        """
        # This check is copied from other Firestore classes for the purposes of
        # surfacing the error immediately.
        if option.__class__.__name__ == "ExistsOption":
            raise ValueError("you must not pass an explicit write option to update.")

        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterUpdateOperation(
                reference=reference,
                field_updates=field_updates,
                option=option,
                attempts=attempts,
            )
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def on_write_result(
        self,
        callback: Optional[
            Callable[[BaseDocumentReference, WriteResult, "BulkWriter"], None]
        ],
    ) -> None:
        """Sets a callback that will be invoked once for every successful operation."""
        self._success_callback = callback or BulkWriter._default_on_success

    def on_batch_result(
        self,
        callback: Optional[
            Callable[[BulkWriteBatch, BatchWriteResponse, "BulkWriter"], None]
        ],
    ) -> None:
        """Sets a callback that will be invoked once for every successful batch."""
        self._batch_callback = callback or BulkWriter._default_on_batch

    def on_write_error(
        self, callback: Optional[Callable[["BulkWriteFailure", "BulkWriter"], bool]]
    ) -> None:
        """Sets a callback that will be invoked once for every write operation that
        fails. The callback receives a `BulkWriteFailure` and this `BulkWriter`, and
        should return a bool indicating whether the failed operation should be
        retried."""
        self._error_callback = callback or BulkWriter._default_on_error

    def _verify_not_closed(self):
        if not self._is_open:
            raise Exception("BulkWriter is closed and cannot accept new operations")


class BulkWriterOperation:
    """Parent class for all operation container classes.

    `BulkWriterOperation` exists to house all the necessary information for a
    specific write task, including meta information like the current number of
    attempts. If a write fails, it is its wrapper `BulkWriterOperation` class
    that ferries it into its next retry without getting confused with other
    similar writes to the same document.
    """

    def __init__(self, attempts: int = 0):
        self.attempts = attempts

    def add_to_batch(self, batch: BulkWriteBatch):
        """Adds `self` to the supplied batch."""
        assert isinstance(batch, BulkWriteBatch)
        if isinstance(self, BulkWriterCreateOperation):
            return batch.create(
                reference=self.reference,
                document_data=self.document_data,
            )

        if isinstance(self, BulkWriterDeleteOperation):
            return batch.delete(
                reference=self.reference,
                option=self.option,
            )

        if isinstance(self, BulkWriterSetOperation):
            return batch.set(
                reference=self.reference,
                document_data=self.document_data,
                merge=self.merge,
            )

        if isinstance(self, BulkWriterUpdateOperation):
            return batch.update(
                reference=self.reference,
                field_updates=self.field_updates,
                option=self.option,
            )
        raise TypeError(
            f"Unexpected type of {self.__class__.__name__} for batch"
        )  # pragma: NO COVER


@functools.total_ordering
class BaseOperationRetry:
    """Parent class for both the @dataclass and old-style `OperationRetry`
    classes.

    Methods on this class can be moved directly to `OperationRetry` when support
    for Python 3.6 is dropped and `dataclasses` becomes universal.
    """

    def __lt__(self: "OperationRetry", other: "OperationRetry"):  # type: ignore
        """Allows use of `bisect` to maintain a sorted list of `OperationRetry`
        instances, which in turn allows us to cheaply grab all that are ready to
        run."""
        if isinstance(other, OperationRetry):
            return self.run_at < other.run_at
        elif isinstance(other, datetime.datetime):
            return self.run_at < other
        return NotImplemented  # pragma: NO COVER

    def retry(self: "OperationRetry", bulk_writer: BulkWriter) -> None:  # type: ignore
        """Call this after waiting any necessary time to re-add the enclosed
        operation to the supplied BulkWriter's internal queue."""
        if isinstance(self.operation, BulkWriterCreateOperation):
            bulk_writer.create(
                reference=self.operation.reference,
                document_data=self.operation.document_data,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterDeleteOperation):
            bulk_writer.delete(
                reference=self.operation.reference,
                option=self.operation.option,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterSetOperation):
            bulk_writer.set(
                reference=self.operation.reference,
                document_data=self.operation.document_data,
                merge=self.operation.merge,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterUpdateOperation):
            bulk_writer.update(
                reference=self.operation.reference,
                field_updates=self.operation.field_updates,
                option=self.operation.option,
                attempts=self.operation.attempts,
            )
        else:
            raise TypeError(
                f"Unexpected type of {self.operation.__class__.__name__} for OperationRetry.retry"
            )  # pragma: NO COVER


@dataclass
class BulkWriterOptions:
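    """Options that configure a `BulkWriter`.

    `initial_ops_per_second` and `max_ops_per_second` seed the internal
    `RateLimiter`, `mode` selects whether batches are committed serially or on
    an executor (see `SendMode`), and `retry` selects the backoff strategy used
    for failed operations (see `BulkRetry`).
    """
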
    initial_ops_per_second: int = 500
    max_ops_per_second: int = 500
    mode: SendMode = SendMode.parallel
    retry: BulkRetry = BulkRetry.linear


@dataclass
class BulkWriteFailure:
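    """Information about a single failed write, passed to the callback
    registered via `BulkWriter.on_write_error`."""
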
    operation: BulkWriterOperation
    # https://grpc.github.io/grpc/core/md_doc_statuscodes.html
    code: int
    message: str

    @property
    def attempts(self) -> int:
        return self.operation.attempts


@dataclass
class OperationRetry(BaseOperationRetry):
    """Container for an additional attempt at an operation, scheduled for
    the future."""

    operation: BulkWriterOperation
    run_at: datetime.datetime


@dataclass
class BulkWriterCreateOperation(BulkWriterOperation):
    """Container for BulkWriter.create() operations."""

    reference: BaseDocumentReference
    document_data: Dict
    attempts: int = 0


@dataclass
class BulkWriterUpdateOperation(BulkWriterOperation):
    """Container for BulkWriter.update() operations."""

    reference: BaseDocumentReference
    field_updates: Dict
    option: Optional[_helpers.WriteOption]
    attempts: int = 0


@dataclass
class BulkWriterSetOperation(BulkWriterOperation):
    """Container for BulkWriter.set() operations."""

    reference: BaseDocumentReference
    document_data: Dict
    merge: Union[bool, list] = False
    attempts: int = 0


@dataclass
class BulkWriterDeleteOperation(BulkWriterOperation):
    """Container for BulkWriter.delete() operations."""

    reference: BaseDocumentReference
    option: Optional[_helpers.WriteOption]
    attempts: int = 0