# Copyright 2021 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for efficiently writing large amounts of data to the Google Cloud
Firestore API."""

import bisect
import collections
import concurrent.futures
import datetime
import enum
import functools
import logging
import time

from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Union, TYPE_CHECKING

from google.rpc import status_pb2  # type: ignore

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.base_document import BaseDocumentReference
from google.cloud.firestore_v1.bulk_batch import BulkWriteBatch
from google.cloud.firestore_v1.rate_limiter import RateLimiter
from google.cloud.firestore_v1.types.firestore import BatchWriteResponse
from google.cloud.firestore_v1.types.write import WriteResult

if TYPE_CHECKING:
    from google.cloud.firestore_v1.base_client import BaseClient  # pragma: NO COVER


logger = logging.getLogger(__name__)


class BulkRetry(enum.Enum):
    """Indicator for what retry strategy the BulkWriter should use."""

    # Common exponential backoff algorithm. This strategy is largely incompatible
    # with the default retry limit of 15, so use with caution.
    exponential = enum.auto()

    # Default strategy that adds 1 second of delay per retry.
    linear = enum.auto()

    # Immediate retries with no growing delays.
    immediate = enum.auto()


class SendMode(enum.Enum):
    """Indicator for whether a BulkWriter should commit batches in the main
    thread or hand that work off to an executor."""

    # Default strategy that parallelizes network I/O on an executor. You almost
    # certainly want this.
    parallel = enum.auto()

    # Alternate strategy which blocks during all network I/O. Much slower, but
    # assures all batches are sent to the server in order. Note that
    # `SendMode.serial` is extremely susceptible to slowdowns from retries if
    # there are a lot of errors.
    serial = enum.auto()


class AsyncBulkWriterMixin:
    """
    Mixin which contains the methods on `BulkWriter` which must only be
    submitted to the executor (or called by functions submitted to the executor).

    This mixin exists purely for organization and clarity of implementation
    (e.g., there is no metaclass magic).

    The entrypoint to the parallelizable code path is `_send_batch()`, which is
    wrapped in a decorator which ensures that the `SendMode` is honored.
    """

    def _with_send_mode(fn):
        """Decorates a method to ensure it is only called via the executor
        (IFF the SendMode value is SendMode.parallel!).

        Usage:

            @_with_send_mode
            def my_method(self):
                parallel_stuff()

            def something_else(self):
                # Because of the decorator around `my_method`, the following
                # method invocation:
                self.my_method()
                # becomes equivalent to `self._executor.submit(self.my_method)`
                # when the send mode is `SendMode.parallel`.

        Use on entrypoint methods for code paths that *must* be parallelized.
        """

        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            if self._send_mode == SendMode.parallel:
                return self._executor.submit(lambda: fn(self, *args, **kwargs))
            else:
                # For code parity, even `SendMode.serial` scenarios should return
                # a future here. Anything else would badly complicate calling code.
                result = fn(self, *args, **kwargs)
                future = concurrent.futures.Future()
                future.set_result(result)
                return future

        return wrapper

    @_with_send_mode
    def _send_batch(
        self, batch: BulkWriteBatch, operations: List["BulkWriterOperation"]
    ):
        """Sends a batch without regard to rate limits, meaning limits must have
        already been checked. To that end, do not call this directly; instead,
        call `_send_until_queue_is_empty`.

        Args:
            batch(:class:`~google.cloud.firestore_v1.base_batch.BulkWriteBatch`)
        """
        _len_batch: int = len(batch)
        self._in_flight_documents += _len_batch
        response: BatchWriteResponse = self._send(batch)
        self._in_flight_documents -= _len_batch

        # Update bookkeeping totals
        self._total_batches_sent += 1
        self._total_write_operations += _len_batch

        self._process_response(batch, response, operations)

    def _process_response(
        self,
        batch: BulkWriteBatch,
        response: BatchWriteResponse,
        operations: List["BulkWriterOperation"],
    ) -> None:
        """Invokes submitted callbacks for each batch and each operation within
        each batch. As this is called from `_send_batch()`, this is parallelized
        if we are in that mode.
        """
        batch_references: List[BaseDocumentReference] = list(
            batch._document_references.values(),
        )
        self._batch_callback(batch, response, self)

        status: status_pb2.Status
        for index, status in enumerate(response.status):
            if status.code == 0:
                self._success_callback(
                    # DocumentReference
                    batch_references[index],
                    # WriteResult
                    response.write_results[index],
                    # BulkWriter
                    self,
                )
            else:
                operation: BulkWriterOperation = operations[index]
                should_retry: bool = self._error_callback(
                    # BulkWriteFailure
                    BulkWriteFailure(
                        operation=operation,
                        code=status.code,
                        message=status.message,
                    ),
                    # BulkWriter
                    self,
                )
                if should_retry:
                    operation.attempts += 1
                    self._retry_operation(operation)

    def _retry_operation(
        self,
        operation: "BulkWriterOperation",
    ) -> concurrent.futures.Future:
        delay: int = 0
        if self._options.retry == BulkRetry.exponential:
            delay = operation.attempts**2  # pragma: NO COVER
        elif self._options.retry == BulkRetry.linear:
            delay = operation.attempts

        run_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(
            seconds=delay
        )

        # Use of `bisect.insort` maintains the requirement that `self._retries`
        # always remain sorted by each object's `run_at` time. Note that it is
        # able to do this because `OperationRetry` instances are entirely sortable
        # by their `run_at` value.
        bisect.insort(
            self._retries,
            OperationRetry(operation=operation, run_at=run_at),
        )

    def _send(self, batch: BulkWriteBatch) -> BatchWriteResponse:
        """Hook for overwriting the sending of batches. As this is only called
        from `_send_batch()`, this is parallelized if we are in that mode.
        """
        return batch.commit()  # pragma: NO COVER
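
# Illustrative sketch (not part of the library API): because `_send` is the hook
# for overwriting how batches are committed, a hypothetical subclass could replace
# it, e.g. to log each batch before sending. `LoggingBulkWriter` below is an
# assumption for illustration only, so it is left commented out.
#
#     class LoggingBulkWriter(BulkWriter):
#         def _send(self, batch: BulkWriteBatch) -> BatchWriteResponse:
#             logger.info("Committing batch of %d writes", len(batch))
#             return super()._send(batch)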


class BulkWriter(AsyncBulkWriterMixin):
    """
    Accumulate and efficiently save large amounts of document write operations
    to the server.

    BulkWriter can handle large data migrations or updates, buffering records
    in memory and submitting them to the server in batches of 20.

    The submission of batches is internally parallelized with a ThreadPoolExecutor,
    meaning end developers do not need to manage an event loop or worry about asyncio
    to see parallelization speed ups (which can easily 10x throughput). Because
    of this, there is no companion `AsyncBulkWriter` class, as is usually seen
    with other utility classes.

    Usage:

        # Instantiate the BulkWriter. This works from either `Client` or
        # `AsyncClient`.
        db = firestore.Client()
        bulk_writer = db.bulk_writer()

        # Attach an optional success listener to be called once per document.
        bulk_writer.on_write_result(
            lambda reference, result, bulk_writer: print(f'Saved {reference._document_path}')
        )

        # Queue an arbitrary amount of write operations.
        # Assume `my_new_records` is a list of (DocumentReference, dict,)
        # tuple-pairs that you supply.

        reference: DocumentReference
        data: dict
        for reference, data in my_new_records:
            bulk_writer.create(reference, data)

        # Block until all pooled writes are complete.
        bulk_writer.flush()

    Args:
        client(:class:`~google.cloud.firestore_v1.client.Client`):
            The client that created this BulkWriter.
    """

    batch_size: int = 20

    def __init__(
        self,
        client: "BaseClient" = None,
        options: Optional["BulkWriterOptions"] = None,
    ):
        # Because `BulkWriter` instances are all synchronous/blocking on the
        # main thread (instead using other threads for asynchrony), it is
        # incompatible with AsyncClient's various methods that return Futures.
        # `BulkWriter` parallelizes all of its network I/O without the developer
        # having to worry about awaiting async methods, so we must convert an
        # AsyncClient instance into a plain Client instance.
        self._client = (
            client._to_sync_copy() if type(client).__name__ == "AsyncClient" else client
        )
        self._options = options or BulkWriterOptions()
        self._send_mode = self._options.mode

        self._operations: List[BulkWriterOperation]
        # List of the `_document_path` attribute for each DocumentReference
        # contained in the current `self._operations`. This is reset every time
        # `self._operations` is reset.
        self._operations_document_paths: List[BaseDocumentReference]
        self._reset_operations()

        # List of all `BulkWriterOperation` objects that are waiting to be retried.
        # Each such object is wrapped in an `OperationRetry` object which pairs
        # the raw operation with the `datetime` of its next scheduled attempt.
        # `self._retries` must always remain sorted for efficient reads, so it is
        # required to only ever add elements via `bisect.insort`.
        self._retries: collections.deque["OperationRetry"] = collections.deque([])

        self._queued_batches = collections.deque([])
        self._is_open: bool = True

        # This list will go on to store the future returned from each submission
        # to the executor, for the purpose of awaiting all of those futures'
        # completions in the `flush` method.
        self._pending_batch_futures: List[concurrent.futures.Future] = []

        self._success_callback: Callable[
            [BaseDocumentReference, WriteResult, "BulkWriter"], None
        ] = BulkWriter._default_on_success
        self._batch_callback: Callable[
            [BulkWriteBatch, BatchWriteResponse, "BulkWriter"], None
        ] = BulkWriter._default_on_batch
        self._error_callback: Callable[
            [BulkWriteFailure, BulkWriter], bool
        ] = BulkWriter._default_on_error

        self._in_flight_documents: int = 0
        self._rate_limiter = RateLimiter(
            initial_tokens=self._options.initial_ops_per_second,
            global_max_tokens=self._options.max_ops_per_second,
        )

        # Keep track of progress as batches and write operations are completed
        self._total_batches_sent: int = 0
        self._total_write_operations: int = 0

        self._ensure_executor()

    @staticmethod
    def _default_on_batch(
        batch: BulkWriteBatch,
        response: BatchWriteResponse,
        bulk_writer: "BulkWriter",
    ) -> None:
        pass

    @staticmethod
    def _default_on_success(
        reference: BaseDocumentReference,
        result: WriteResult,
        bulk_writer: "BulkWriter",
    ) -> None:
        pass

    @staticmethod
    def _default_on_error(error: "BulkWriteFailure", bulk_writer: "BulkWriter") -> bool:
        # Default number of retries for each operation is 15. This is a scary
        # number to combine with an exponential backoff, and as such, our default
        # backoff strategy is linear instead of exponential.
        return error.attempts < 15

    def _reset_operations(self) -> None:
        self._operations = []
        self._operations_document_paths = []

    def _ensure_executor(self):
        """Reboots the executor used to send batches if it has been shutdown."""
        if getattr(self, "_executor", None) is None or self._executor._shutdown:
            self._executor = self._instantiate_executor()

    def _ensure_sending(self):
        self._ensure_executor()
        self._send_until_queue_is_empty()

    def _instantiate_executor(self):
        return concurrent.futures.ThreadPoolExecutor()

    def flush(self):
        """
        Block until all pooled write operations are complete and then resume
        accepting new write operations.
        """
        # Calling `flush` consecutively is a no-op.
        if self._executor._shutdown:
            return

        while True:
            # Queue any waiting operations and try our luck again.
            # This can happen if users add a number of records not divisible by
            # 20 and then call flush (which should be ~19 out of 20 use cases).
            # Execution will arrive here and find the leftover operations that
            # never filled up a batch organically, and so we must send them here.
            if self._operations:
                self._enqueue_current_batch()
                continue

            # If we find queued but unsent batches or pending retries, begin
            # sending immediately. Note that if we are waiting on retries, but
            # they have longer to wait as specified by the retry backoff strategy,
            # we may have to make several passes through this part of the loop.
            # (This is related to the sleep and its explanation below.)
            if self._queued_batches or self._retries:
                self._ensure_sending()

                # This sleep prevents max-speed laps through this loop, which can
                # and will happen if the BulkWriter is doing nothing except waiting
                # on retries to be ready to re-send. Removing this sleep will cause
                # whatever thread is running this code to sit near 100% CPU until
                # all retries are abandoned or successfully resolved.
                time.sleep(0.1)
                continue

            # We store the executor's Future from each batch send operation, so
            # the first pass through here, we are guaranteed to find "pending"
            # batch futures and have to wait. However, the second pass through
            # will be fast unless the last batch introduced more retries.
            if self._pending_batch_futures:
                _batches = self._pending_batch_futures
                self._pending_batch_futures = []
                concurrent.futures.wait(_batches)

                # Continuing is critical here (as opposed to breaking) because
                # the final batch may have introduced retries which is most
                # straightforwardly verified by heading back to the top of the loop.
                continue

            break

        # We no longer expect to have any queued batches or pending futures,
        # so the executor can be shutdown.
        self._executor.shutdown()

    def close(self):
        """
        Block until all pooled write operations are complete and then reject
        any further write operations.
        """
        self._is_open = False
        self.flush()

    def _maybe_enqueue_current_batch(self):
        """
        Checks to see whether the in-progress batch is full and, if it is,
        adds it to the sending queue.
        """
        if len(self._operations) >= self.batch_size:
            self._enqueue_current_batch()

    def _enqueue_current_batch(self):
        """Adds the current batch to the back of the sending line, resets the
        list of queued ops, and begins the process of actually sending whatever
        batch is in the front of the line, which will often be a different batch.
        """
        # Put our batch in the back of the sending line
        self._queued_batches.append(self._operations)

        # Reset the local store of operations
        self._reset_operations()

        # The sending loop powers off upon reaching the end of the queue, so
        # here we make sure that is running.
        self._ensure_sending()

    def _send_until_queue_is_empty(self):
        """First domino in the sending codepath. This does not need to be
        parallelized for two reasons:

        1) Putting this on a worker thread could lead to two running in parallel
           and thus unpredictable commit ordering or failure to adhere to
           rate limits.
        2) This method only blocks when `self._request_send()` does not immediately
           return, and in that case, the BulkWriter's ramp-up / throttling logic
           has determined that it is attempting to exceed the maximum write speed,
           and so parallelizing this method would not increase performance anyway.

        Once `self._request_send()` returns, this method calls `self._send_batch()`,
        which parallelizes itself if that is our SendMode value.

        And once `self._send_batch()` is called (which does not block if we are
        sending in parallel), control jumps back to the top and re-checks for any
        queued batches.

        Note that for sufficiently large data migrations, this can block the
        submission of additional write operations (e.g., the CRUD methods);
        but again, that is only if the maximum write speed is being exceeded,
        and thus this scenario does not actually further reduce performance.
        """
        self._schedule_ready_retries()

        while self._queued_batches:
            # For FIFO order, add to the right of this deque (via `append`) and take
            # from the left (via `popleft`).
            operations: List[BulkWriterOperation] = self._queued_batches.popleft()

            # Block until we are cleared for takeoff, which is fine because this
            # returns instantly unless the rate limiting logic determines that we
            # are attempting to exceed the maximum write speed.
            self._request_send(len(operations))

            # Handle some bookkeeping, and ultimately put these bits on the wire.
            batch = BulkWriteBatch(client=self._client)
            op: BulkWriterOperation
            for op in operations:
                op.add_to_batch(batch)

            # `_send_batch` is optionally parallelized by `@_with_send_mode`.
            future = self._send_batch(batch=batch, operations=operations)
            self._pending_batch_futures.append(future)

            self._schedule_ready_retries()

    def _schedule_ready_retries(self):
        """Grabs all ready retries and re-queues them."""

        # Because `self._retries` always exists in a sorted state (thanks to only
        # ever adding to it via `bisect.insort`), and because `OperationRetry`
        # objects are comparable against `datetime` objects, this bisect functionally
        # returns the number of retries that are ready for immediate reenlistment.
        take_until_index = bisect.bisect(
            self._retries, datetime.datetime.now(tz=datetime.timezone.utc)
        )

        for _ in range(take_until_index):
            retry: OperationRetry = self._retries.popleft()
            retry.retry(self)

    def _request_send(self, batch_size: int) -> bool:
        # Set up this boolean to avoid repeatedly taking tokens if we're only
        # waiting on the `max_in_flight` limit.
        have_received_tokens: bool = False

        while True:
            # To avoid bottlenecks on the server, an additional limit is that no
            # more write operations can be "in flight" (sent but still awaiting
            # response) at any given point than the maximum number of writes per
            # second.
            under_threshold: bool = (
                self._in_flight_documents <= self._rate_limiter._maximum_tokens
            )
            # Ask for tokens each pass through this loop until they are granted,
            # and then stop.
            have_received_tokens = (
                have_received_tokens or self._rate_limiter.take_tokens(batch_size)
            )
            if not under_threshold or not have_received_tokens:
                # Try again until both checks are true.
                # Note that this sleep is helpful to prevent the main BulkWriter
                # thread from spinning through this loop as fast as possible and
                # pointlessly burning CPU while we wait for the arrival of a
                # fixed moment in the future.
                time.sleep(0.01)
                continue

            return True

    def create(
        self,
        reference: BaseDocumentReference,
        document_data: Dict,
        attempts: int = 0,
    ) -> None:
        """Adds a `create` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this create operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be created.
            document_data (dict):
                Raw data to save to the server.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterCreateOperation(
                reference=reference,
                document_data=document_data,
                attempts=attempts,
            ),
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def delete(
        self,
        reference: BaseDocumentReference,
        option: Optional[_helpers.WriteOption] = None,
        attempts: int = 0,
    ) -> None:
        """Adds a `delete` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this delete operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be deleted.
            option (:class:`~google.cloud.firestore_v1._helpers.WriteOption`):
                Optional flag to modify the nature of this write.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterDeleteOperation(
                reference=reference,
                option=option,
                attempts=attempts,
            ),
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def set(
        self,
        reference: BaseDocumentReference,
        document_data: Dict,
        merge: Union[bool, list] = False,
        attempts: int = 0,
    ) -> None:
        """Adds a `set` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this set operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be set.
            document_data (dict):
                Raw data to save to the server.
            merge (bool):
                Whether to merge the supplied data into any existing document
                data, rather than completely overwriting it.
        """
        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterSetOperation(
                reference=reference,
                document_data=document_data,
                merge=merge,
                attempts=attempts,
            )
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def update(
        self,
        reference: BaseDocumentReference,
        field_updates: dict,
        option: Optional[_helpers.WriteOption] = None,
        attempts: int = 0,
    ) -> None:
        """Adds an `update` pb to the in-progress batch.

        If the in-progress batch already contains a write operation involving
        this document reference, the batch will be sealed and added to the commit
        queue, and a new batch will be created with this operation as its first
        entry.

        If this update operation results in the in-progress batch reaching full
        capacity, then the batch will be similarly added to the commit queue, and
        a new batch will be created for future operations.

        Args:
            reference (:class:`~google.cloud.firestore_v1.base_document.BaseDocumentReference`):
                Pointer to the document that should be updated.
            field_updates (dict):
                Key paths to specific nested data that should be updated.
            option (:class:`~google.cloud.firestore_v1._helpers.WriteOption`):
                Optional flag to modify the nature of this write.
        """
        # This check is copied from other Firestore classes for the purposes of
        # surfacing the error immediately.
        if option.__class__.__name__ == "ExistsOption":
            raise ValueError("you must not pass an explicit write option to update.")

        self._verify_not_closed()

        if reference._document_path in self._operations_document_paths:
            self._enqueue_current_batch()

        self._operations.append(
            BulkWriterUpdateOperation(
                reference=reference,
                field_updates=field_updates,
                option=option,
                attempts=attempts,
            )
        )
        self._operations_document_paths.append(reference._document_path)

        self._maybe_enqueue_current_batch()

    def on_write_result(
        self,
        callback: Callable[[BaseDocumentReference, WriteResult, "BulkWriter"], None],
    ) -> None:
        """Sets a callback that will be invoked once for every successful operation."""
        self._success_callback = callback or BulkWriter._default_on_success

    def on_batch_result(
        self,
        callback: Callable[[BulkWriteBatch, BatchWriteResponse, "BulkWriter"], None],
    ) -> None:
        """Sets a callback that will be invoked once for every successful batch."""
        self._batch_callback = callback or BulkWriter._default_on_batch

    def on_write_error(
        self, callback: Callable[["BulkWriteFailure", "BulkWriter"], bool]
    ) -> None:
        """Sets a callback that will be invoked once for every batch that contains
        an error."""
        self._error_callback = callback or BulkWriter._default_on_error

    def _verify_not_closed(self):
        if not self._is_open:
            raise Exception("BulkWriter is closed and cannot accept new operations")
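
# Illustrative sketch (not prescribed by the library): the three `on_*` hooks above
# accept plain callables, so a caller could wire them up roughly as follows. The
# names `db` and `handle_error` are assumptions for the example, so it is left
# commented out.
#
#     bulk_writer = db.bulk_writer()
#
#     def handle_error(failure: BulkWriteFailure, bw: BulkWriter) -> bool:
#         # Returning True asks the BulkWriter to retry the failed operation;
#         # returning False abandons it. gRPC status codes are documented at
#         # https://grpc.github.io/grpc/core/md_doc_statuscodes.html
#         return failure.code != 3 and failure.attempts < 5  # 3 == INVALID_ARGUMENT
#
#     bulk_writer.on_write_error(handle_error)
#     bulk_writer.on_batch_result(lambda batch, response, bw: None)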


class BulkWriterOperation:
    """Parent class for all operation container classes.

    `BulkWriterOperation` exists to house all the necessary information for a
    specific write task, including meta information like the current number of
    attempts. If a write fails, its wrapping `BulkWriterOperation` ferries it
    into its next retry without getting confused with other similar writes to
    the same document.
    """

    def add_to_batch(self, batch: BulkWriteBatch):
        """Adds `self` to the supplied batch."""
        assert isinstance(batch, BulkWriteBatch)
        if isinstance(self, BulkWriterCreateOperation):
            return batch.create(
                reference=self.reference,
                document_data=self.document_data,
            )

        if isinstance(self, BulkWriterDeleteOperation):
            return batch.delete(
                reference=self.reference,
                option=self.option,
            )

        if isinstance(self, BulkWriterSetOperation):
            return batch.set(
                reference=self.reference,
                document_data=self.document_data,
                merge=self.merge,
            )

        if isinstance(self, BulkWriterUpdateOperation):
            return batch.update(
                reference=self.reference,
                field_updates=self.field_updates,
                option=self.option,
            )
        raise TypeError(
            f"Unexpected type of {self.__class__.__name__} for batch"
        )  # pragma: NO COVER


@functools.total_ordering
class BaseOperationRetry:
    """Parent class for both the @dataclass and old-style `OperationRetry`
    classes.

    Methods on this class can be moved directly to `OperationRetry` when support
    for Python 3.6 is dropped and `dataclasses` becomes universal.
    """

    def __lt__(self, other: "OperationRetry"):
        """Allows use of `bisect` to maintain a sorted list of `OperationRetry`
        instances, which in turn allows us to cheaply grab all that are ready to
        run."""
        if isinstance(other, OperationRetry):
            return self.run_at < other.run_at
        elif isinstance(other, datetime.datetime):
            return self.run_at < other
        return NotImplemented  # pragma: NO COVER

    def retry(self, bulk_writer: BulkWriter) -> None:
        """Call this after waiting any necessary time to re-add the enclosed
        operation to the supplied BulkWriter's internal queue."""
        if isinstance(self.operation, BulkWriterCreateOperation):
            bulk_writer.create(
                reference=self.operation.reference,
                document_data=self.operation.document_data,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterDeleteOperation):
            bulk_writer.delete(
                reference=self.operation.reference,
                option=self.operation.option,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterSetOperation):
            bulk_writer.set(
                reference=self.operation.reference,
                document_data=self.operation.document_data,
                merge=self.operation.merge,
                attempts=self.operation.attempts,
            )

        elif isinstance(self.operation, BulkWriterUpdateOperation):
            bulk_writer.update(
                reference=self.operation.reference,
                field_updates=self.operation.field_updates,
                option=self.operation.option,
                attempts=self.operation.attempts,
            )
        else:
            raise TypeError(
                f"Unexpected type of {self.operation.__class__.__name__} for OperationRetry.retry"
            )  # pragma: NO COVER
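
# Illustrative sketch (not part of the library): the retry queue pairs `bisect.insort`
# in `_retry_operation` with `bisect.bisect` in `_schedule_ready_retries`, which works
# because `__lt__` above orders `OperationRetry` instances against both each other and
# plain `datetime` objects. `op` and `later` are placeholder names; roughly:
#
#     retries: collections.deque = collections.deque([])
#     bisect.insort(retries, OperationRetry(operation=op, run_at=later))  # stays sorted
#     ready = bisect.bisect(retries, datetime.datetime.now(tz=datetime.timezone.utc))
#     # `ready` is the count of retries whose `run_at` time has already passed.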


@dataclass
class BulkWriterOptions:
    initial_ops_per_second: int = 500
    max_ops_per_second: int = 500
    mode: SendMode = SendMode.parallel
    retry: BulkRetry = BulkRetry.linear
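
# Illustrative sketch: these defaults can be overridden by passing a
# `BulkWriterOptions` instance to `BulkWriter.__init__` (see above). `db` is an
# assumed, already-constructed Client, so the example is left commented out.
#
#     options = BulkWriterOptions(
#         initial_ops_per_second=100,   # ramp up from a gentler starting rate
#         max_ops_per_second=500,
#         mode=SendMode.serial,         # trade throughput for in-order commits
#         retry=BulkRetry.exponential,  # pair with a lower retry cap (see _default_on_error)
#     )
#     bulk_writer = BulkWriter(client=db, options=options)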


@dataclass
class BulkWriteFailure:
    operation: BulkWriterOperation
    # https://grpc.github.io/grpc/core/md_doc_statuscodes.html
    code: int
    message: str

    @property
    def attempts(self) -> int:
        return self.operation.attempts


@dataclass
class OperationRetry(BaseOperationRetry):
    """Container for an additional attempt at an operation, scheduled for
    the future."""

    operation: BulkWriterOperation
    run_at: datetime.datetime


@dataclass
class BulkWriterCreateOperation(BulkWriterOperation):
    """Container for BulkWriter.create() operations."""

    reference: BaseDocumentReference
    document_data: Dict
    attempts: int = 0


@dataclass
class BulkWriterUpdateOperation(BulkWriterOperation):
    """Container for BulkWriter.update() operations."""

    reference: BaseDocumentReference
    field_updates: Dict
    option: Optional[_helpers.WriteOption]
    attempts: int = 0


@dataclass
class BulkWriterSetOperation(BulkWriterOperation):
    """Container for BulkWriter.set() operations."""

    reference: BaseDocumentReference
    document_data: Dict
    merge: Union[bool, list] = False
    attempts: int = 0


@dataclass
class BulkWriterDeleteOperation(BulkWriterOperation):
    """Container for BulkWriter.delete() operations."""

    reference: BaseDocumentReference
    option: Optional[_helpers.WriteOption]
    attempts: int = 0