Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/watch.py: 22%
319 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
1# Copyright 2017 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import collections
16from enum import Enum
17import functools
18import logging
19import threading
21from google.api_core.bidi import ResumableBidiRpc
22from google.api_core.bidi import BackgroundConsumer
23from google.api_core import exceptions
24import grpc # type: ignore
26from google.cloud.firestore_v1.types.firestore import ListenRequest
27from google.cloud.firestore_v1.types.firestore import Target
28from google.cloud.firestore_v1.types.firestore import TargetChange
29from google.cloud.firestore_v1 import _helpers
# Alias for the nested proto enum; used by Watch's dispatch table below.
TargetChangeType = TargetChange.TargetChangeType

_LOGGER = logging.getLogger(__name__)

# Target ID this client assigns to every watch target it creates
# (0x5079 is "Py" in ASCII).
WATCH_TARGET_ID = 0x5079  # "Py"

# Mapping of gRPC status-code names to their canonical numeric values.
# NOTE: entries follow the upstream declaration order, so UNAUTHENTICATED (16)
# appears out of numeric sequence.
GRPC_STATUS_CODE = {
    "OK": 0,
    "CANCELLED": 1,
    "UNKNOWN": 2,
    "INVALID_ARGUMENT": 3,
    "DEADLINE_EXCEEDED": 4,
    "NOT_FOUND": 5,
    "ALREADY_EXISTS": 6,
    "PERMISSION_DENIED": 7,
    "UNAUTHENTICATED": 16,
    "RESOURCE_EXHAUSTED": 8,
    "FAILED_PRECONDITION": 9,
    "ABORTED": 10,
    "OUT_OF_RANGE": 11,
    "UNIMPLEMENTED": 12,
    "INTERNAL": 13,
    "UNAVAILABLE": 14,
    "DATA_LOSS": 15,
    "DO_NOT_USE": -1,
}

# Name for the helper thread spawned to shut the watch down after the
# underlying RPC terminates (see Watch._on_rpc_done / Watch.on_snapshot).
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"

# Exceptions after which ResumableBidiRpc should transparently reconnect.
_RECOVERABLE_STREAM_EXCEPTIONS = (
    exceptions.Aborted,
    exceptions.Cancelled,
    exceptions.Unknown,
    exceptions.DeadlineExceeded,
    exceptions.ResourceExhausted,
    exceptions.InternalServerError,
    exceptions.ServiceUnavailable,
    exceptions.Unauthenticated,
)

# Exceptions that permanently terminate the stream (no reconnect).
_TERMINATING_STREAM_EXCEPTIONS = (exceptions.Cancelled,)

# Entry stored in WatchDocTree: the document snapshot plus its insertion index.
DocTreeEntry = collections.namedtuple("DocTreeEntry", ["value", "index"])
class WatchDocTree(object):
    """Copy-on-write mapping from document snapshots to insertion order.

    ``insert`` and ``remove`` return a *new* tree (built via :meth:`_copy`)
    rather than mutating in place, mimicking the immutable tree used by
    other Firestore client implementations.
    """

    # TODO: Currently this uses a dict. Other implementations use a rbtree.
    # The performance of this implementation should be investigated and may
    # require modifying the underlying datastructure to a rbtree.
    def __init__(self):
        self._dict = {}
        self._index = 0

    def keys(self):
        """Return the tracked document keys as a list."""
        return list(self._dict.keys())

    def _copy(self):
        """Return a new tree sharing no top-level state with this one.

        The previous implementation rebound ``self`` to the copy before
        returning, which misleadingly suggested in-place mutation; use a
        plain local instead.
        """
        wdt = WatchDocTree()
        wdt._dict = self._dict.copy()
        wdt._index = self._index
        return wdt

    def insert(self, key, value):
        """Return a new tree with ``key`` mapped to a ``DocTreeEntry``."""
        wdt = self._copy()
        wdt._dict[key] = DocTreeEntry(value, wdt._index)
        wdt._index += 1
        return wdt

    def find(self, key):
        """Return the ``DocTreeEntry`` stored under ``key``.

        Raises:
            KeyError: If ``key`` is not present.
        """
        return self._dict[key]

    def remove(self, key):
        """Return a new tree with ``key`` removed.

        Raises:
            KeyError: If ``key`` is not present.
        """
        wdt = self._copy()
        del wdt._dict[key]
        return wdt

    def __iter__(self):
        yield from self._dict

    def __len__(self):
        return len(self._dict)

    def __contains__(self, k):
        return k in self._dict
class ChangeType(Enum):
    """Kind of mutation a document underwent between two snapshots."""

    ADDED = 1  # document newly appeared in the result set
    REMOVED = 2  # document left the result set (deleted or filtered out)
    MODIFIED = 3  # document already present but its contents changed
class DocumentChange(object):
    """A single change to one document between consecutive snapshots.

    Args:
        type (ChangeType): Kind of change (added / removed / modified).
        document (document.DocumentSnapshot): The affected document.
        old_index (int): Position in the previous snapshot (-1 when added).
        new_index (int): Position in the new snapshot (-1 when removed).
    """

    # TODO: spec indicated an isEqual param also
    def __init__(self, type, document, old_index, new_index):
        self.type = type
        self.document = document
        self.old_index = old_index
        self.new_index = new_index
class WatchResult(object):
    """Simple value object pairing a snapshot with its name and change type.

    Args:
        snapshot: The document snapshot for this result.
        name: The document name the result refers to.
        change_type: The ``ChangeType`` describing what happened.
    """

    def __init__(self, snapshot, name, change_type):
        self.snapshot = snapshot
        self.name = name
        self.change_type = change_type
def _maybe_wrap_exception(exception):
    """Return ``exception``, converted to a ``google.api_core`` exception
    when it is a raw ``grpc.RpcError``; otherwise pass it through unchanged."""
    if not isinstance(exception, grpc.RpcError):
        return exception
    return exceptions.from_grpc_error(exception)
def document_watch_comparator(doc1, doc2):
    """Comparator used for single-document watches.

    A document watch only ever contains the one watched document, so any two
    entries must be equal and compare as such.

    Args:
        doc1: First document (must equal ``doc2``).
        doc2: Second document.

    Returns:
        int: Always ``0`` (equal).

    Raises:
        AssertionError: If the two documents differ, which would indicate a
            protocol violation for a single-document watch.
    """
    # A bare ``assert`` is stripped under ``python -O``; raise explicitly so
    # this invariant is always enforced (same exception type as before).
    if doc1 != doc2:
        raise AssertionError("Document watches only support one document.")
    return 0
def _should_recover(exception):
    """Predicate for ``ResumableBidiRpc``: True when the (wrapped) error is
    one of the transient failures after which the stream should reconnect."""
    return isinstance(
        _maybe_wrap_exception(exception), _RECOVERABLE_STREAM_EXCEPTIONS
    )
def _should_terminate(exception):
    """Predicate for ``ResumableBidiRpc``: True when the (wrapped) error is
    one that permanently terminates the stream (no reconnect attempted)."""
    return isinstance(
        _maybe_wrap_exception(exception), _TERMINATING_STREAM_EXCEPTIONS
    )
class Watch(object):
    """Client-side handler for one Firestore ``Listen`` (watch) stream.

    Owns a resumable bidi RPC plus a background consumer thread, accumulates
    per-document changes from the stream, and invokes ``snapshot_callback``
    with a consistent snapshot each time the server signals one.
    """

    def __init__(
        self,
        document_reference,
        firestore,
        target,
        comparator,
        snapshot_callback,
        document_snapshot_cls,
    ):
        """
        Args:
            document_reference: The document or query being watched.
            firestore: Client used for the RPC transport and decoding.
            target: dict payload for the ListenRequest ``add_target`` field.
            comparator: Orders document snapshots within a snapshot push.
            snapshot_callback: Callback method to process snapshots.
                Args:
                    docs (List(DocumentSnapshot)): A callback that returns the
                        ordered list of documents stored in this snapshot.
                    changes (List(str)): A callback that returns the list of
                        changed documents since the last snapshot delivered for
                        this watch.
                    read_time (string): The ISO 8601 time at which this
                        snapshot was obtained.
            document_snapshot_cls: factory for instances of DocumentSnapshot
        """
        self._document_reference = document_reference
        self._firestore = firestore
        self._targets = target
        self._comparator = comparator
        self._document_snapshot_cls = document_snapshot_cls
        self._snapshot_callback = snapshot_callback
        self._api = firestore._firestore_api
        # Lock serializing close(); makes shutdown idempotent across threads.
        self._closing = threading.Lock()
        self._closed = False
        self._set_documents_pfx(firestore._database_string)

        # Server-assigned token; re-sent on reconnect (see _get_rpc_request).
        self.resume_token = None

        # Initialize state for on_snapshot
        # The sorted tree of QueryDocumentSnapshots as sent in the last
        # snapshot. We only look at the keys.
        self.doc_tree = WatchDocTree()

        # A map of document names to QueryDocumentSnapshots for the last sent
        # snapshot.
        self.doc_map = {}

        # The accumulates map of document changes (keyed by document name) for
        # the current snapshot.
        self.change_map = {}

        # The current state of the query results.
        self.current = False

        # We need this to track whether we've pushed an initial set of changes,
        # since we should push those even when there are no changes, if there
        # aren't docs.
        self.has_pushed = False

        self._init_stream()

    def _init_stream(self):
        """Create the resumable bidi RPC and start its background consumer."""
        # Pass the bound method itself, not its result: ResumableBidiRpc
        # calls it to build the request for each (re)connect, so the latest
        # resume_token is picked up on retry.
        rpc_request = self._get_rpc_request

        self._rpc = ResumableBidiRpc(
            start_rpc=self._api._transport.listen,
            should_recover=_should_recover,
            should_terminate=_should_terminate,
            initial_request=rpc_request,
            metadata=self._firestore._rpc_metadata,
        )

        self._rpc.add_done_callback(self._on_rpc_done)

        # The server assigns and updates the resume token.
        self._consumer = BackgroundConsumer(self._rpc, self.on_snapshot)
        self._consumer.start()

    @classmethod
    def for_document(
        cls,
        document_ref,
        snapshot_callback,
        document_snapshot_cls,
    ):
        """
        Creates a watch snapshot listener for a document. snapshot_callback
        receives a DocumentChange object, but may also start to get
        targetChange and such soon

        Args:
            document_ref: Reference to Document
            snapshot_callback: callback to be called on snapshot
            document_snapshot_cls: class to make snapshots with
        """
        return cls(
            document_ref,
            document_ref._client,
            {
                "documents": {"documents": [document_ref._document_path]},
                "target_id": WATCH_TARGET_ID,
            },
            document_watch_comparator,
            snapshot_callback,
            document_snapshot_cls,
        )

    @classmethod
    def for_query(cls, query, snapshot_callback, document_snapshot_cls):
        """Create a watch snapshot listener for a query.

        Args:
            query: The query to watch.
            snapshot_callback: callback to be called on snapshot
            document_snapshot_cls: class to make snapshots with
        """
        parent_path, _ = query._parent._parent_info()
        query_target = Target.QueryTarget(
            parent=parent_path, structured_query=query._to_protobuf()
        )

        return cls(
            query,
            query._client,
            {"query": query_target._pb, "target_id": WATCH_TARGET_ID},
            query._comparator,
            snapshot_callback,
            document_snapshot_cls,
        )

    def _get_rpc_request(self):
        """Build the ListenRequest for a (re)connect.

        Includes the last seen resume token, if any, so the server can
        continue the stream instead of replaying from scratch.
        """
        if self.resume_token is not None:
            self._targets["resume_token"] = self.resume_token
        else:
            self._targets.pop("resume_token", None)

        return ListenRequest(
            database=self._firestore._database_string, add_target=self._targets
        )

    def _set_documents_pfx(self, database_string):
        # Cache the "<database>/documents/" prefix (and its length) used to
        # convert full resource names into relative document paths.
        self._documents_pfx = f"{database_string}/documents/"
        self._documents_pfx_len = len(self._documents_pfx)

    @property
    def is_active(self):
        """bool: True if this manager is actively streaming.

        Note that ``False`` does not indicate this is complete shut down,
        just that it stopped getting new messages.
        """
        return self._consumer is not None and self._consumer.is_active

    def close(self, reason=None):
        """Stop consuming messages and shutdown all helper threads.

        This method is idempotent. Additional calls will have no effect.

        Args:
            reason (Any): The reason to close this. If None, this is considered
                an "intentional" shutdown.

        Raises:
            Exception: ``reason`` itself when it is an exception instance;
                otherwise ``RuntimeError(reason)`` when a reason is given.
        """
        with self._closing:
            if self._closed:
                return

            # Stop consuming messages.
            if self.is_active:
                _LOGGER.debug("Stopping consumer.")
                self._consumer.stop()
            self._consumer = None

            self._rpc.close()
            self._rpc = None
            self._closed = True
            _LOGGER.debug("Finished stopping manager.")

            if reason:
                # Raise an exception if a reason is provided
                _LOGGER.debug("reason for closing: %s" % reason)
                if isinstance(reason, Exception):
                    raise reason
                raise RuntimeError(reason)

    def _on_rpc_done(self, future):
        """Triggered whenever the underlying RPC terminates without recovery.

        This is typically triggered from one of two threads: the background
        consumer thread (when calling ``recv()`` produces a non-recoverable
        error) or the grpc management thread (when cancelling the RPC).

        This method is *non-blocking*. It will start another thread to deal
        with shutting everything down. This is to prevent blocking in the
        background consumer and preventing it from being ``joined()``.
        """
        _LOGGER.info("RPC termination has signaled manager shutdown.")
        # Reuse the name: from here on, `future` is the (wrapped) exception
        # passed to close() as the shutdown reason.
        future = _maybe_wrap_exception(future)
        thread = threading.Thread(
            name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": future}
        )
        thread.daemon = True
        thread.start()

    def unsubscribe(self):
        """Public alias for :meth:`close` (intentional shutdown)."""
        self.close()

    def _on_snapshot_target_change_no_change(self, target_change):
        _LOGGER.debug("on_snapshot: target change: NO_CHANGE")

        no_target_ids = (
            target_change.target_ids is None or len(target_change.target_ids) == 0
        )
        if no_target_ids and target_change.read_time and self.current:
            # TargetChange.TargetChangeType.CURRENT followed by
            # TargetChange.TargetChangeType.NO_CHANGE
            # signals a consistent state. Invoke the onSnapshot
            # callback as specified by the user.
            self.push(target_change.read_time, target_change.resume_token)

    def _on_snapshot_target_change_add(self, target_change):
        _LOGGER.debug("on_snapshot: target change: ADD")
        # The server must echo back the one target ID this client registers.
        target_id = target_change.target_ids[0]
        if target_id != WATCH_TARGET_ID:
            raise RuntimeError("Unexpected target ID %s sent by server" % target_id)

    def _on_snapshot_target_change_remove(self, target_change):
        _LOGGER.debug("on_snapshot: target change: REMOVE")

        if target_change.cause.code:
            code = target_change.cause.code
            message = target_change.cause.message
        else:
            # No cause given: report as gRPC INTERNAL (13).
            code = 13
            message = "internal error"

        error_message = "Error %s: %s" % (code, message)

        # Chain the gRPC status as the cause for better diagnostics.
        raise RuntimeError(error_message) from exceptions.from_grpc_status(
            code, message
        )

    def _on_snapshot_target_change_reset(self, target_change):
        # Whatever changes have happened so far no longer matter.
        _LOGGER.debug("on_snapshot: target change: RESET")
        self._reset_docs()

    def _on_snapshot_target_change_current(self, target_change):
        _LOGGER.debug("on_snapshot: target change: CURRENT")
        self.current = True

    # Class-level dispatch table of *unbound* handlers; on_snapshot calls
    # them as ``meth(self, target_change)``.
    _target_changetype_dispatch = {
        TargetChangeType.NO_CHANGE: _on_snapshot_target_change_no_change,
        TargetChangeType.ADD: _on_snapshot_target_change_add,
        TargetChangeType.REMOVE: _on_snapshot_target_change_remove,
        TargetChangeType.RESET: _on_snapshot_target_change_reset,
        TargetChangeType.CURRENT: _on_snapshot_target_change_current,
    }

    def _strip_document_pfx(self, document_name):
        # Convert a full resource name into a path relative to
        # "<database>/documents/"; leave other names untouched.
        if document_name.startswith(self._documents_pfx):
            document_name = document_name[self._documents_pfx_len :]
        return document_name

    def on_snapshot(self, proto):
        """Process a response from the bi-directional gRPC stream.

        Collect changes and push the changes in a batch to the customer
        when we receive 'current' from the listen response.

        Args:
            proto(`google.cloud.firestore_v1.types.ListenResponse`):
                The streamed response, or ``None`` to close the watch.
        """
        if proto is None:
            self.close()
            return

        pb = proto._pb
        which = pb.WhichOneof("response_type")

        if which == "target_change":
            target_change_type = pb.target_change.target_change_type
            _LOGGER.debug(f"on_snapshot: target change: {target_change_type}")

            meth = self._target_changetype_dispatch.get(target_change_type)

            if meth is None:
                message = f"Unknown target change type: {target_change_type}"
                _LOGGER.info(f"on_snapshot: {message}")
                # close() raises the given reason, so control never reaches
                # the meth(...) call below in this case.
                self.close(reason=ValueError(message))

            try:
                # Use 'proto' vs 'pb' for datetime handling
                meth(self, proto.target_change)
            except Exception as exc2:
                _LOGGER.debug(f"meth(proto) exc: {exc2}")
                raise

            # NOTE:
            # in other implementations, such as node, the backoff is reset here
            # in this version bidi rpc is just used and will control this.

        elif which == "document_change":
            _LOGGER.debug("on_snapshot: document change")

            # No other target_ids can show up here, but we still need to see
            # if the targetId was in the added list or removed list.
            changed = WATCH_TARGET_ID in pb.document_change.target_ids
            removed = WATCH_TARGET_ID in pb.document_change.removed_target_ids

            # google.cloud.firestore_v1.types.Document
            # Use 'proto' vs 'pb' for datetime handling
            document = proto.document_change.document

            if changed:
                _LOGGER.debug("on_snapshot: document change: CHANGED")

                data = _helpers.decode_dict(document.fields, self._firestore)

                # Create a snapshot. As Document and Query objects can be
                # passed we need to get a Document Reference in a more manual
                # fashion than self._document_reference
                document_name = self._strip_document_pfx(document.name)
                document_ref = self._firestore.document(document_name)

                snapshot = self._document_snapshot_cls(
                    reference=document_ref,
                    data=data,
                    exists=True,
                    read_time=None,
                    create_time=document.create_time,
                    update_time=document.update_time,
                )
                # Keyed by the *full* resource name, matching doc_map keys.
                self.change_map[document.name] = snapshot

            elif removed:
                _LOGGER.debug("on_snapshot: document change: REMOVED")
                self.change_map[document.name] = ChangeType.REMOVED

        # NB: document_delete and document_remove (as far as we, the client,
        # are concerned) are functionally equivalent

        elif which == "document_delete":
            _LOGGER.debug("on_snapshot: document change: DELETE")
            name = pb.document_delete.document
            self.change_map[name] = ChangeType.REMOVED

        elif which == "document_remove":
            _LOGGER.debug("on_snapshot: document change: REMOVE")
            name = pb.document_remove.document
            self.change_map[name] = ChangeType.REMOVED

        elif which == "filter":
            _LOGGER.debug("on_snapshot: filter update")
            # Server-reported count disagreeing with ours means we missed
            # something; tear down and resync from scratch.
            if pb.filter.count != self._current_size():
                # First, shut down current stream
                _LOGGER.info("Filter mismatch -- restarting stream.")
                thread = threading.Thread(
                    name=_RPC_ERROR_THREAD_NAME,
                    target=self.close,
                )
                thread.start()
                thread.join()  # wait for shutdown to complete
                # Then, remove all the current results.
                self._reset_docs()
                # Finally, restart stream.
                self._init_stream()

        else:
            _LOGGER.debug("UNKNOWN TYPE. UHOH")
            message = f"Unknown listen response type: {proto}"
            self.close(reason=ValueError(message))

    def push(self, read_time, next_resume_token):
        """Invoke the callback with a new snapshot

        Build the snapshot from the current set of changes.

        Clear the current changes on completion.
        """
        deletes, adds, updates = self._extract_changes(
            self.doc_map, self.change_map, read_time
        )

        updated_tree, updated_map, appliedChanges = self._compute_snapshot(
            self.doc_tree, self.doc_map, deletes, adds, updates
        )

        # Push an empty change list once (the initial snapshot), then only
        # when something actually changed.
        if not self.has_pushed or len(appliedChanges):
            # TODO: It is possible in the future we will have the tree order
            # on insert. For now, we sort here.
            key = functools.cmp_to_key(self._comparator)
            keys = sorted(updated_tree.keys(), key=key)

            self._snapshot_callback(keys, appliedChanges, read_time)
            self.has_pushed = True

        self.doc_tree = updated_tree
        self.doc_map = updated_map
        self.change_map.clear()
        self.resume_token = next_resume_token

    @staticmethod
    def _extract_changes(doc_map, changes, read_time):
        """Partition raw changes into (deletes, adds, updates).

        Deletes are names already in ``doc_map``; anything else maps by
        presence in ``doc_map`` to update vs. add. ``read_time`` (when not
        None) is stamped onto each added/updated snapshot.
        """
        deletes = []
        adds = []
        updates = []

        for name, value in changes.items():
            if value == ChangeType.REMOVED:
                # A removal of a doc we never delivered is a no-op.
                if name in doc_map:
                    deletes.append(name)
            elif name in doc_map:
                if read_time is not None:
                    value.read_time = read_time
                updates.append(value)
            else:
                if read_time is not None:
                    value.read_time = read_time
                adds.append(value)

        return (deletes, adds, updates)

    def _compute_snapshot(
        self, doc_tree, doc_map, delete_changes, add_changes, update_changes
    ):
        """Apply deletes, adds, and updates to the tree/map pair.

        Returns:
            Tuple of (updated_tree, updated_map, appliedChanges) where
            appliedChanges is the ordered list of DocumentChange events.
        """
        updated_tree = doc_tree
        updated_map = doc_map

        assert len(doc_tree) == len(doc_map), (
            "The document tree and document map should have the same "
            + "number of entries."
        )

        def delete_doc(name, updated_tree, updated_map):
            """
            Applies a document delete to the document tree and document map.
            Returns the corresponding DocumentChange event.
            """
            assert name in updated_map, "Document to delete does not exist"
            old_document = updated_map.get(name)
            # TODO: If a document doesn't exist this raises IndexError. Handle?
            existing = updated_tree.find(old_document)
            old_index = existing.index
            updated_tree = updated_tree.remove(old_document)
            del updated_map[name]
            return (
                DocumentChange(ChangeType.REMOVED, old_document, old_index, -1),
                updated_tree,
                updated_map,
            )

        def add_doc(new_document, updated_tree, updated_map):
            """
            Applies a document add to the document tree and the document map.
            Returns the corresponding DocumentChange event.
            """
            name = new_document.reference._document_path
            assert name not in updated_map, "Document to add already exists"
            updated_tree = updated_tree.insert(new_document, None)
            new_index = updated_tree.find(new_document).index
            updated_map[name] = new_document
            return (
                DocumentChange(ChangeType.ADDED, new_document, -1, new_index),
                updated_tree,
                updated_map,
            )

        def modify_doc(new_document, updated_tree, updated_map):
            """
            Applies a document modification to the document tree and the
            document map.
            Returns the DocumentChange event for successful modifications.
            """
            name = new_document.reference._document_path
            assert name in updated_map, "Document to modify does not exist"
            old_document = updated_map.get(name)
            if old_document.update_time != new_document.update_time:
                # A modification is modeled as delete + re-add, then reported
                # as a single MODIFIED change with both indices.
                remove_change, updated_tree, updated_map = delete_doc(
                    name, updated_tree, updated_map
                )
                add_change, updated_tree, updated_map = add_doc(
                    new_document, updated_tree, updated_map
                )
                return (
                    DocumentChange(
                        ChangeType.MODIFIED,
                        new_document,
                        remove_change.old_index,
                        add_change.new_index,
                    ),
                    updated_tree,
                    updated_map,
                )

            # Unchanged update_time: nothing to report.
            return None, updated_tree, updated_map

        # Process the sorted changes in the order that is expected by our
        # clients (removals, additions, and then modifications). We also need
        # to sort the individual changes to assure that old_index/new_index
        # keep incrementing.
        appliedChanges = []

        key = functools.cmp_to_key(self._comparator)

        # Deletes are sorted based on the order of the existing document.
        delete_changes = sorted(delete_changes)
        for name in delete_changes:
            change, updated_tree, updated_map = delete_doc(
                name, updated_tree, updated_map
            )
            appliedChanges.append(change)

        add_changes = sorted(add_changes, key=key)
        _LOGGER.debug("walk over add_changes")
        for snapshot in add_changes:
            _LOGGER.debug("in add_changes")
            change, updated_tree, updated_map = add_doc(
                snapshot, updated_tree, updated_map
            )
            appliedChanges.append(change)

        update_changes = sorted(update_changes, key=key)
        for snapshot in update_changes:
            change, updated_tree, updated_map = modify_doc(
                snapshot, updated_tree, updated_map
            )
            if change is not None:
                appliedChanges.append(change)

        assert len(updated_tree) == len(updated_map), (
            "The update document tree and document map "
            "should have the same number of entries."
        )
        return (updated_tree, updated_map, appliedChanges)

    def _current_size(self):
        """Return the current count of all documents.

        Count includes the changes from the current changeMap.
        """
        deletes, adds, _ = self._extract_changes(self.doc_map, self.change_map, None)
        return len(self.doc_map) + len(adds) - len(deletes)

    def _reset_docs(self):
        """
        Helper to clear the docs on RESET or filter mismatch.
        """
        _LOGGER.debug("resetting documents")
        self.change_map.clear()
        self.resume_token = None

        # Mark each document as deleted. If documents are not deleted
        # they will be sent again by the server.
        for snapshot in self.doc_tree.keys():
            name = snapshot.reference._document_path
            self.change_map[name] = ChangeType.REMOVED

        self.current = False