Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/futures.py: 48%
31 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2017, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from __future__ import absolute_import
17import typing
18from typing import Any
19from typing import Union
21from google.cloud.pubsub_v1 import futures
22from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus
24if typing.TYPE_CHECKING: # pragma: NO COVER
25 from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import (
26 StreamingPullManager,
27 )
30class StreamingPullFuture(futures.Future):
31 """Represents a process that asynchronously performs streaming pull and
32 schedules messages to be processed.
34 This future is resolved when the process is stopped (via :meth:`cancel`) or
35 if it encounters an unrecoverable error. Calling `.result()` will cause
36 the calling thread to block indefinitely.
37 """
39 def __init__(self, manager: "StreamingPullManager"):
40 super(StreamingPullFuture, self).__init__()
41 self.__manager = manager
42 self.__manager.add_close_callback(self._on_close_callback)
43 self.__cancelled = False
45 def _on_close_callback(self, manager: "StreamingPullManager", result: Any):
46 if self.done():
47 # The future has already been resolved in a different thread,
48 # nothing to do on the streaming pull manager shutdown.
49 return
51 if result is None:
52 self.set_result(True)
53 else:
54 self.set_exception(result)
56 def cancel(self) -> bool:
57 """Stops pulling messages and shutdowns the background thread consuming
58 messages.
60 The method always returns ``True``, as the shutdown is always initiated.
61 However, if the background stream is already being shut down or the shutdown
62 has completed, this method is a no-op.
64 .. versionchanged:: 2.4.1
65 The method does not block anymore, it just triggers the shutdown and returns
66 immediately. To block until the background stream is terminated, call
67 :meth:`result()` after cancelling the future.
69 .. versionchanged:: 2.10.0
70 The method always returns ``True`` instead of ``None``.
71 """
72 # NOTE: We circumvent the base future's self._state to track the cancellation
73 # state, as this state has different meaning with streaming pull futures.
74 self.__cancelled = True
75 self.__manager.close()
76 return True
78 def cancelled(self) -> bool:
79 """
80 Returns:
81 ``True`` if the subscription has been cancelled.
82 """
83 return self.__cancelled
86class Future(futures.Future):
87 """This future object is for subscribe-side calls.
89 Calling :meth:`result` will resolve the future by returning the message
90 ID, unless an error occurs.
91 """
93 def cancel(self) -> bool:
94 """Actions in Pub/Sub generally may not be canceled.
96 This method always returns ``False``.
97 """
98 return False
100 def cancelled(self) -> bool:
101 """Actions in Pub/Sub generally may not be canceled.
103 This method always returns ``False``.
104 """
105 return False
107 def result(self, timeout: Union[int, float] = None) -> AcknowledgeStatus:
108 """Return a success code or raise an exception.
110 This blocks until the operation completes successfully and
111 returns the error code unless an exception is raised.
113 Args:
114 timeout: The number of seconds before this call
115 times out and raises TimeoutError.
117 Returns:
118 AcknowledgeStatus.SUCCESS if the operation succeeded.
120 Raises:
121 concurrent.futures.TimeoutError: If the request times out.
122 AcknowledgeError: If the operation did not succeed for another
123 reason.
124 """
125 return super().result(timeout=timeout)