Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/watch.py: 22%

319 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 06:27 +0000

1# Copyright 2017 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import collections 

16from enum import Enum 

17import functools 

18import logging 

19import threading 

20 

21from google.api_core.bidi import ResumableBidiRpc 

22from google.api_core.bidi import BackgroundConsumer 

23from google.api_core import exceptions 

24import grpc # type: ignore 

25 

26from google.cloud.firestore_v1.types.firestore import ListenRequest 

27from google.cloud.firestore_v1.types.firestore import Target 

28from google.cloud.firestore_v1.types.firestore import TargetChange 

29from google.cloud.firestore_v1 import _helpers 

30 

31 

# Short alias for the enum nested inside the TargetChange message type.
TargetChangeType = TargetChange.TargetChangeType

_LOGGER = logging.getLogger(__name__)

# Target id attached to every target this module registers; the ADD handler
# rejects any other id reported by the server.
WATCH_TARGET_ID = 0x5079  # "Py"

# Status names mapped to their integer codes.
GRPC_STATUS_CODE = {
    "OK": 0,
    "CANCELLED": 1,
    "UNKNOWN": 2,
    "INVALID_ARGUMENT": 3,
    "DEADLINE_EXCEEDED": 4,
    "NOT_FOUND": 5,
    "ALREADY_EXISTS": 6,
    "PERMISSION_DENIED": 7,
    "UNAUTHENTICATED": 16,
    "RESOURCE_EXHAUSTED": 8,
    "FAILED_PRECONDITION": 9,
    "ABORTED": 10,
    "OUT_OF_RANGE": 11,
    "UNIMPLEMENTED": 12,
    "INTERNAL": 13,
    "UNAVAILABLE": 14,
    "DATA_LOSS": 15,
    "DO_NOT_USE": -1,
}
# Name given to the helper threads that run ``Watch.close``.
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
# Exception types for which ``_should_recover`` answers True.
_RECOVERABLE_STREAM_EXCEPTIONS = (
    exceptions.Aborted,
    exceptions.Cancelled,
    exceptions.Unknown,
    exceptions.DeadlineExceeded,
    exceptions.ResourceExhausted,
    exceptions.InternalServerError,
    exceptions.ServiceUnavailable,
    exceptions.Unauthenticated,
)
# Exception types for which ``_should_terminate`` answers True.
# NOTE(review): ``Cancelled`` appears in both tuples; which predicate wins is
# decided by the RPC wrapper, not by this module -- confirm against api_core.
_TERMINATING_STREAM_EXCEPTIONS = (exceptions.Cancelled,)

# Value stored per key in ``WatchDocTree``: the caller's value plus the
# insertion counter at the time the key was inserted.
DocTreeEntry = collections.namedtuple("DocTreeEntry", ["value", "index"])

72 

73 

class WatchDocTree(object):
    """Dict-backed container mapping keys to ``DocTreeEntry`` records.

    ``insert`` and ``remove`` never modify the receiver; each returns a new
    ``WatchDocTree`` holding a shallow copy of the entries with the change
    applied. ``_index`` is a counter that only grows: it is bumped on every
    insert and is not adjusted when a key is removed.
    """

    # TODO: Currently this uses a dict. Other implementations use a rbtree.
    # The performance of this implementation should be investigated and may
    # require modifying the underlying datastructure to a rbtree.
    def __init__(self):
        self._dict = {}
        self._index = 0

    def keys(self):
        """Return the keys as a new list."""
        return list(self._dict)

    def _copy(self):
        """Return a new tree with a shallow copy of the entries and counter."""
        clone = WatchDocTree()
        clone._dict = dict(self._dict)
        clone._index = self._index
        return clone

    def insert(self, key, value):
        """Return a copy of this tree in which ``key`` maps to a new entry."""
        clone = self._copy()
        clone._dict[key] = DocTreeEntry(value, clone._index)
        clone._index += 1
        return clone

    def find(self, key):
        """Return the entry stored for ``key``; raises ``KeyError`` if absent."""
        return self._dict[key]

    def remove(self, key):
        """Return a copy of this tree without ``key`` (``KeyError`` if absent)."""
        clone = self._copy()
        del clone._dict[key]
        return clone

    def __iter__(self):
        yield from self._dict

    def __len__(self):
        return len(self._dict)

    def __contains__(self, k):
        return k in self._dict

115 

116 

class ChangeType(Enum):
    """Kinds of change reported through ``DocumentChange.type``.

    ``REMOVED`` is additionally used by ``Watch`` as a marker value in its
    pending change map for documents that should be dropped.
    """

    ADDED = 1
    REMOVED = 2
    MODIFIED = 3

121 

122 

class DocumentChange(object):
    """A single change to one document within a pushed snapshot."""

    def __init__(self, type, document, old_index, new_index):
        """Store the details of the change.

        Args:
            type (ChangeType): What happened to the document.
            document (document.DocumentSnapshot): The snapshot the change
                refers to.
            old_index (int): Index of the document before the change.
            new_index (int): Index of the document after the change.
        """
        # TODO: spec indicated an isEqual param also
        self.type, self.document = type, document
        self.old_index, self.new_index = old_index, new_index

138 

139 

class WatchResult(object):
    """Plain record grouping a snapshot, a name and a change type."""

    def __init__(self, snapshot, name, change_type):
        """Store the three values unchanged as same-named attributes."""
        self.snapshot, self.name, self.change_type = snapshot, name, change_type

145 

146 

def _maybe_wrap_exception(exception):
    """Convert a ``grpc.RpcError`` via ``exceptions.from_grpc_error``.

    Any other object is returned unchanged.
    """
    if not isinstance(exception, grpc.RpcError):
        return exception
    return exceptions.from_grpc_error(exception)

152 

153 

def document_watch_comparator(doc1, doc2):
    """Comparator passed to ``Watch`` by ``Watch.for_document``.

    A document watch covers a single document, so the two arguments are
    expected to compare equal.

    Returns:
        int: Always ``0``.

    Raises:
        AssertionError: If ``doc1 == doc2`` is false. The check is an
            ``assert`` statement, so it is skipped when Python runs with
            assertions disabled (``-O``).
    """
    assert doc1 == doc2, "Document watches only support one document."
    return 0

157 

158 

def _should_recover(exception):
    """Tell whether ``exception`` is one of the recoverable stream errors.

    The exception is first passed through ``_maybe_wrap_exception`` and then
    checked against ``_RECOVERABLE_STREAM_EXCEPTIONS``.
    """
    return isinstance(
        _maybe_wrap_exception(exception), _RECOVERABLE_STREAM_EXCEPTIONS
    )

162 

163 

def _should_terminate(exception):
    """Tell whether ``exception`` is one of the terminating stream errors.

    The exception is first passed through ``_maybe_wrap_exception`` and then
    checked against ``_TERMINATING_STREAM_EXCEPTIONS``.
    """
    return isinstance(
        _maybe_wrap_exception(exception), _TERMINATING_STREAM_EXCEPTIONS
    )

167 

168 

169class Watch(object): 

    def __init__(
        self,
        document_reference,
        firestore,
        target,
        comparator,
        snapshot_callback,
        document_snapshot_cls,
    ):
        """Initialise the watch state and immediately open the listen stream.

        Args:
            document_reference: The object being watched. ``for_document``
                passes a document reference here and ``for_query`` passes
                the query.
            firestore: Client object. Its ``_firestore_api``,
                ``_database_string`` and ``_rpc_metadata`` attributes are
                read by this class.
            target (dict): Target description sent as ``add_target`` in the
                listen request. The dict is kept and updated in place: a
                ``"resume_token"`` key is added or removed each time a
                request is built.
            comparator: Two-argument comparison function used to order
                document snapshots.
            snapshot_callback: Callback invoked with three positional
                arguments whenever a snapshot is pushed:
                Args:
                    docs (List(DocumentSnapshot)): The document snapshots
                        currently in the result set, sorted with
                        ``comparator``.
                    changes (List(DocumentChange)): The changes applied
                        since the last snapshot delivered for this watch.
                    read_time: The read time taken from the target change
                        that triggered the push.

            document_snapshot_cls: factory for instances of DocumentSnapshot
        """
        self._document_reference = document_reference
        self._firestore = firestore
        self._targets = target
        self._comparator = comparator
        self._document_snapshot_cls = document_snapshot_cls
        self._snapshot_callback = snapshot_callback
        self._api = firestore._firestore_api
        # Lock serialising ``close``; ``_closed`` records that shutdown ran.
        self._closing = threading.Lock()
        self._closed = False
        self._set_documents_pfx(firestore._database_string)

        # Sent back to the server when the stream is (re)opened; updated on
        # every push and cleared by ``_reset_docs``.
        self.resume_token = None

        # Initialize state for on_snapshot
        # The sorted tree of QueryDocumentSnapshots as sent in the last
        # snapshot. We only look at the keys.
        self.doc_tree = WatchDocTree()

        # A map of document names to QueryDocumentSnapshots for the last sent
        # snapshot.
        self.doc_map = {}

        # The accumulated map of document changes (keyed by document name) for
        # the current snapshot.
        self.change_map = {}

        # The current state of the query results.
        self.current = False

        # We need this to track whether we've pushed an initial set of changes,
        # since we should push those even when there are no changes, if there
        # aren't docs.
        self.has_pushed = False

        # Must come last: this starts the background consumer, which may call
        # ``on_snapshot`` and therefore needs all of the state above.
        self._init_stream()

231 

232 def _init_stream(self): 

233 rpc_request = self._get_rpc_request 

234 

235 self._rpc = ResumableBidiRpc( 

236 start_rpc=self._api._transport.listen, 

237 should_recover=_should_recover, 

238 should_terminate=_should_terminate, 

239 initial_request=rpc_request, 

240 metadata=self._firestore._rpc_metadata, 

241 ) 

242 

243 self._rpc.add_done_callback(self._on_rpc_done) 

244 

245 # The server assigns and updates the resume token. 

246 self._consumer = BackgroundConsumer(self._rpc, self.on_snapshot) 

247 self._consumer.start() 

248 

249 @classmethod 

250 def for_document( 

251 cls, 

252 document_ref, 

253 snapshot_callback, 

254 document_snapshot_cls, 

255 ): 

256 """ 

257 Creates a watch snapshot listener for a document. snapshot_callback 

258 receives a DocumentChange object, but may also start to get 

259 targetChange and such soon 

260 

261 Args: 

262 document_ref: Reference to Document 

263 snapshot_callback: callback to be called on snapshot 

264 document_snapshot_cls: class to make snapshots with 

265 reference_class_instance: class make references 

266 

267 """ 

268 return cls( 

269 document_ref, 

270 document_ref._client, 

271 { 

272 "documents": {"documents": [document_ref._document_path]}, 

273 "target_id": WATCH_TARGET_ID, 

274 }, 

275 document_watch_comparator, 

276 snapshot_callback, 

277 document_snapshot_cls, 

278 ) 

279 

280 @classmethod 

281 def for_query(cls, query, snapshot_callback, document_snapshot_cls): 

282 parent_path, _ = query._parent._parent_info() 

283 query_target = Target.QueryTarget( 

284 parent=parent_path, structured_query=query._to_protobuf() 

285 ) 

286 

287 return cls( 

288 query, 

289 query._client, 

290 {"query": query_target._pb, "target_id": WATCH_TARGET_ID}, 

291 query._comparator, 

292 snapshot_callback, 

293 document_snapshot_cls, 

294 ) 

295 

296 def _get_rpc_request(self): 

297 if self.resume_token is not None: 

298 self._targets["resume_token"] = self.resume_token 

299 else: 

300 self._targets.pop("resume_token", None) 

301 

302 return ListenRequest( 

303 database=self._firestore._database_string, add_target=self._targets 

304 ) 

305 

306 def _set_documents_pfx(self, database_string): 

307 self._documents_pfx = f"{database_string}/documents/" 

308 self._documents_pfx_len = len(self._documents_pfx) 

309 

310 @property 

311 def is_active(self): 

312 """bool: True if this manager is actively streaming. 

313 

314 Note that ``False`` does not indicate this is complete shut down, 

315 just that it stopped getting new messages. 

316 """ 

317 return self._consumer is not None and self._consumer.is_active 

318 

319 def close(self, reason=None): 

320 """Stop consuming messages and shutdown all helper threads. 

321 

322 This method is idempotent. Additional calls will have no effect. 

323 

324 Args: 

325 reason (Any): The reason to close this. If None, this is considered 

326 an "intentional" shutdown. 

327 """ 

328 with self._closing: 

329 if self._closed: 

330 return 

331 

332 # Stop consuming messages. 

333 if self.is_active: 

334 _LOGGER.debug("Stopping consumer.") 

335 self._consumer.stop() 

336 self._consumer = None 

337 

338 self._rpc.close() 

339 self._rpc = None 

340 self._closed = True 

341 _LOGGER.debug("Finished stopping manager.") 

342 

343 if reason: 

344 # Raise an exception if a reason is provided 

345 _LOGGER.debug("reason for closing: %s" % reason) 

346 if isinstance(reason, Exception): 

347 raise reason 

348 raise RuntimeError(reason) 

349 

350 def _on_rpc_done(self, future): 

351 """Triggered whenever the underlying RPC terminates without recovery. 

352 

353 This is typically triggered from one of two threads: the background 

354 consumer thread (when calling ``recv()`` produces a non-recoverable 

355 error) or the grpc management thread (when cancelling the RPC). 

356 

357 This method is *non-blocking*. It will start another thread to deal 

358 with shutting everything down. This is to prevent blocking in the 

359 background consumer and preventing it from being ``joined()``. 

360 """ 

361 _LOGGER.info("RPC termination has signaled manager shutdown.") 

362 future = _maybe_wrap_exception(future) 

363 thread = threading.Thread( 

364 name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": future} 

365 ) 

366 thread.daemon = True 

367 thread.start() 

368 

369 def unsubscribe(self): 

370 self.close() 

371 

372 def _on_snapshot_target_change_no_change(self, target_change): 

373 _LOGGER.debug("on_snapshot: target change: NO_CHANGE") 

374 

375 no_target_ids = ( 

376 target_change.target_ids is None or len(target_change.target_ids) == 0 

377 ) 

378 if no_target_ids and target_change.read_time and self.current: 

379 # TargetChange.TargetChangeType.CURRENT followed by 

380 # TargetChange.TargetChangeType.NO_CHANGE 

381 # signals a consistent state. Invoke the onSnapshot 

382 # callback as specified by the user. 

383 self.push(target_change.read_time, target_change.resume_token) 

384 

385 def _on_snapshot_target_change_add(self, target_change): 

386 _LOGGER.debug("on_snapshot: target change: ADD") 

387 target_id = target_change.target_ids[0] 

388 if target_id != WATCH_TARGET_ID: 

389 raise RuntimeError("Unexpected target ID %s sent by server" % target_id) 

390 

391 def _on_snapshot_target_change_remove(self, target_change): 

392 _LOGGER.debug("on_snapshot: target change: REMOVE") 

393 

394 if target_change.cause.code: 

395 code = target_change.cause.code 

396 message = target_change.cause.message 

397 else: 

398 code = 13 

399 message = "internal error" 

400 

401 error_message = "Error %s: %s" % (code, message) 

402 

403 raise RuntimeError(error_message) from exceptions.from_grpc_status( 

404 code, message 

405 ) 

406 

    def _on_snapshot_target_change_reset(self, target_change):
        """Handle a RESET target change by discarding pending state.

        ``target_change`` is not inspected; the work is done by
        ``_reset_docs``.
        """
        # Whatever changes have happened so far no longer matter.
        _LOGGER.debug("on_snapshot: target change: RESET")
        self._reset_docs()

411 

    def _on_snapshot_target_change_current(self, target_change):
        """Handle a CURRENT target change by marking the watch as current.

        ``target_change`` is not inspected. The flag is later checked by the
        NO_CHANGE handler before it pushes a snapshot.
        """
        _LOGGER.debug("on_snapshot: target change: CURRENT")
        self.current = True

415 

    # Maps each target change type to its handler. The values are the plain
    # functions defined above (not bound methods), so ``on_snapshot`` calls
    # them with ``self`` passed explicitly.
    _target_changetype_dispatch = {
        TargetChangeType.NO_CHANGE: _on_snapshot_target_change_no_change,
        TargetChangeType.ADD: _on_snapshot_target_change_add,
        TargetChangeType.REMOVE: _on_snapshot_target_change_remove,
        TargetChangeType.RESET: _on_snapshot_target_change_reset,
        TargetChangeType.CURRENT: _on_snapshot_target_change_current,
    }

423 

424 def _strip_document_pfx(self, document_name): 

425 if document_name.startswith(self._documents_pfx): 

426 document_name = document_name[self._documents_pfx_len :] 

427 return document_name 

428 

429 def on_snapshot(self, proto): 

430 """Process a response from the bi-directional gRPC stream. 

431 

432 Collect changes and push the changes in a batch to the customer 

433 when we receive 'current' from the listen response. 

434 

435 Args: 

436 proto(`google.cloud.firestore_v1.types.ListenResponse`): 

437 Callback method that receives a object to 

438 """ 

439 if proto is None: 

440 self.close() 

441 return 

442 

443 pb = proto._pb 

444 which = pb.WhichOneof("response_type") 

445 

446 if which == "target_change": 

447 target_change_type = pb.target_change.target_change_type 

448 _LOGGER.debug(f"on_snapshot: target change: {target_change_type}") 

449 

450 meth = self._target_changetype_dispatch.get(target_change_type) 

451 

452 if meth is None: 

453 message = f"Unknown target change type: {target_change_type}" 

454 _LOGGER.info(f"on_snapshot: {message}") 

455 self.close(reason=ValueError(message)) 

456 

457 try: 

458 # Use 'proto' vs 'pb' for datetime handling 

459 meth(self, proto.target_change) 

460 except Exception as exc2: 

461 _LOGGER.debug(f"meth(proto) exc: {exc2}") 

462 raise 

463 

464 # NOTE: 

465 # in other implementations, such as node, the backoff is reset here 

466 # in this version bidi rpc is just used and will control this. 

467 

468 elif which == "document_change": 

469 _LOGGER.debug("on_snapshot: document change") 

470 

471 # No other target_ids can show up here, but we still need to see 

472 # if the targetId was in the added list or removed list. 

473 changed = WATCH_TARGET_ID in pb.document_change.target_ids 

474 removed = WATCH_TARGET_ID in pb.document_change.removed_target_ids 

475 

476 # google.cloud.firestore_v1.types.Document 

477 # Use 'proto' vs 'pb' for datetime handling 

478 document = proto.document_change.document 

479 

480 if changed: 

481 _LOGGER.debug("on_snapshot: document change: CHANGED") 

482 

483 data = _helpers.decode_dict(document.fields, self._firestore) 

484 

485 # Create a snapshot. As Document and Query objects can be 

486 # passed we need to get a Document Reference in a more manual 

487 # fashion than self._document_reference 

488 document_name = self._strip_document_pfx(document.name) 

489 document_ref = self._firestore.document(document_name) 

490 

491 snapshot = self._document_snapshot_cls( 

492 reference=document_ref, 

493 data=data, 

494 exists=True, 

495 read_time=None, 

496 create_time=document.create_time, 

497 update_time=document.update_time, 

498 ) 

499 self.change_map[document.name] = snapshot 

500 

501 elif removed: 

502 _LOGGER.debug("on_snapshot: document change: REMOVED") 

503 self.change_map[document.name] = ChangeType.REMOVED 

504 

505 # NB: document_delete and document_remove (as far as we, the client, 

506 # are concerned) are functionally equivalent 

507 

508 elif which == "document_delete": 

509 _LOGGER.debug("on_snapshot: document change: DELETE") 

510 name = pb.document_delete.document 

511 self.change_map[name] = ChangeType.REMOVED 

512 

513 elif which == "document_remove": 

514 _LOGGER.debug("on_snapshot: document change: REMOVE") 

515 name = pb.document_remove.document 

516 self.change_map[name] = ChangeType.REMOVED 

517 

518 elif which == "filter": 

519 _LOGGER.debug("on_snapshot: filter update") 

520 if pb.filter.count != self._current_size(): 

521 # First, shut down current stream 

522 _LOGGER.info("Filter mismatch -- restarting stream.") 

523 thread = threading.Thread( 

524 name=_RPC_ERROR_THREAD_NAME, 

525 target=self.close, 

526 ) 

527 thread.start() 

528 thread.join() # wait for shutdown to complete 

529 # Then, remove all the current results. 

530 self._reset_docs() 

531 # Finally, restart stream. 

532 self._init_stream() 

533 

534 else: 

535 _LOGGER.debug("UNKNOWN TYPE. UHOH") 

536 message = f"Unknown listen response type: {proto}" 

537 self.close(reason=ValueError(message)) 

538 

539 def push(self, read_time, next_resume_token): 

540 """Invoke the callback with a new snapshot 

541 

542 Build the sntapshot from the current set of changes. 

543 

544 Clear the current changes on completion. 

545 """ 

546 deletes, adds, updates = self._extract_changes( 

547 self.doc_map, self.change_map, read_time 

548 ) 

549 

550 updated_tree, updated_map, appliedChanges = self._compute_snapshot( 

551 self.doc_tree, self.doc_map, deletes, adds, updates 

552 ) 

553 

554 if not self.has_pushed or len(appliedChanges): 

555 # TODO: It is possible in the future we will have the tree order 

556 # on insert. For now, we sort here. 

557 key = functools.cmp_to_key(self._comparator) 

558 keys = sorted(updated_tree.keys(), key=key) 

559 

560 self._snapshot_callback(keys, appliedChanges, read_time) 

561 self.has_pushed = True 

562 

563 self.doc_tree = updated_tree 

564 self.doc_map = updated_map 

565 self.change_map.clear() 

566 self.resume_token = next_resume_token 

567 

568 @staticmethod 

569 def _extract_changes(doc_map, changes, read_time): 

570 deletes = [] 

571 adds = [] 

572 updates = [] 

573 

574 for name, value in changes.items(): 

575 if value == ChangeType.REMOVED: 

576 if name in doc_map: 

577 deletes.append(name) 

578 elif name in doc_map: 

579 if read_time is not None: 

580 value.read_time = read_time 

581 updates.append(value) 

582 else: 

583 if read_time is not None: 

584 value.read_time = read_time 

585 adds.append(value) 

586 

587 return (deletes, adds, updates) 

588 

    def _compute_snapshot(
        self, doc_tree, doc_map, delete_changes, add_changes, update_changes
    ):
        """Apply the given changes and return the resulting state.

        Args:
            doc_tree (WatchDocTree): Tree whose keys are the snapshots last
                sent.
            doc_map (dict): Document name to snapshot for the last sent
                snapshot. It is updated in place.
            delete_changes: Names of documents to delete.
            add_changes: Snapshots of documents to add.
            update_changes: Snapshots of documents that may have changed.

        Returns:
            tuple: ``(updated_tree, updated_map, appliedChanges)`` where
            ``appliedChanges`` is the list of ``DocumentChange`` objects in
            the order removals, additions, modifications.
        """
        updated_tree = doc_tree
        # This is an alias, not a copy: the helpers below mutate ``doc_map``
        # itself, whereas the tree is replaced by a new object on each change.
        updated_map = doc_map

        assert len(doc_tree) == len(doc_map), (
            "The document tree and document map should have the same "
            + "number of entries."
        )

        def delete_doc(name, updated_tree, updated_map):
            """
            Applies a document delete to the document tree and document map.
            Returns the corresponding DocumentChange event.
            """
            assert name in updated_map, "Document to delete does not exist"
            old_document = updated_map.get(name)
            # TODO: If the document is missing from the tree, ``find`` raises
            # KeyError (it is a plain dict lookup). Handle?
            existing = updated_tree.find(old_document)
            old_index = existing.index
            updated_tree = updated_tree.remove(old_document)
            del updated_map[name]
            return (
                DocumentChange(ChangeType.REMOVED, old_document, old_index, -1),
                updated_tree,
                updated_map,
            )

        def add_doc(new_document, updated_tree, updated_map):
            """
            Applies a document add to the document tree and the document map.
            Returns the corresponding DocumentChange event.
            """
            name = new_document.reference._document_path
            assert name not in updated_map, "Document to add already exists"
            updated_tree = updated_tree.insert(new_document, None)
            new_index = updated_tree.find(new_document).index
            updated_map[name] = new_document
            return (
                DocumentChange(ChangeType.ADDED, new_document, -1, new_index),
                updated_tree,
                updated_map,
            )

        def modify_doc(new_document, updated_tree, updated_map):
            """
            Applies a document modification to the document tree and the
            document map.
            Returns the DocumentChange event for successful modifications.
            """
            name = new_document.reference._document_path
            assert name in updated_map, "Document to modify does not exist"
            old_document = updated_map.get(name)
            # Only a different update_time counts as a modification; it is
            # applied as a delete of the old snapshot followed by an add.
            if old_document.update_time != new_document.update_time:
                remove_change, updated_tree, updated_map = delete_doc(
                    name, updated_tree, updated_map
                )
                add_change, updated_tree, updated_map = add_doc(
                    new_document, updated_tree, updated_map
                )
                return (
                    DocumentChange(
                        ChangeType.MODIFIED,
                        new_document,
                        remove_change.old_index,
                        add_change.new_index,
                    ),
                    updated_tree,
                    updated_map,
                )

            # Unchanged document: no event is reported.
            return None, updated_tree, updated_map

        # Process the sorted changes in the order that is expected by our
        # clients (removals, additions, and then modifications). We also need
        # to sort the individual changes to assure that old_index/new_index
        # keep incrementing.
        appliedChanges = []

        key = functools.cmp_to_key(self._comparator)

        # ``delete_changes`` holds document names, so deletes are applied in
        # sorted order of name (the comparator is not used here).
        delete_changes = sorted(delete_changes)
        for name in delete_changes:
            change, updated_tree, updated_map = delete_doc(
                name, updated_tree, updated_map
            )
            appliedChanges.append(change)

        add_changes = sorted(add_changes, key=key)
        _LOGGER.debug("walk over add_changes")
        for snapshot in add_changes:
            _LOGGER.debug("in add_changes")
            change, updated_tree, updated_map = add_doc(
                snapshot, updated_tree, updated_map
            )
            appliedChanges.append(change)

        update_changes = sorted(update_changes, key=key)
        for snapshot in update_changes:
            change, updated_tree, updated_map = modify_doc(
                snapshot, updated_tree, updated_map
            )
            if change is not None:
                appliedChanges.append(change)

        assert len(updated_tree) == len(updated_map), (
            "The update document tree and document map "
            "should have the same number of entries."
        )
        return (updated_tree, updated_map, appliedChanges)

701 

702 def _current_size(self): 

703 """Return the current count of all documents. 

704 

705 Count includes the changes from the current changeMap. 

706 """ 

707 deletes, adds, _ = self._extract_changes(self.doc_map, self.change_map, None) 

708 return len(self.doc_map) + len(adds) - len(deletes) 

709 

710 def _reset_docs(self): 

711 """ 

712 Helper to clear the docs on RESET or filter mismatch. 

713 """ 

714 _LOGGER.debug("resetting documents") 

715 self.change_map.clear() 

716 self.resume_token = None 

717 

718 # Mark each document as deleted. If documents are not deleted 

719 # they will be sent again by the server. 

720 for snapshot in self.doc_tree.keys(): 

721 name = snapshot.reference._document_path 

722 self.change_map[name] = ChangeType.REMOVED 

723 

724 self.current = False