1# Copyright 2017, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import typing
18from typing import Any
19from typing import Union
20
21from google.cloud.pubsub_v1 import futures
22from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus
23
24if typing.TYPE_CHECKING: # pragma: NO COVER
25 from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import (
26 StreamingPullManager,
27 )
28
29
30class StreamingPullFuture(futures.Future):
31 """Represents a process that asynchronously performs streaming pull and
32 schedules messages to be processed.
33
34 This future is resolved when the process is stopped (via :meth:`cancel`) or
35 if it encounters an unrecoverable error. Calling `.result()` will cause
36 the calling thread to block indefinitely.
37 """
38
39 def __init__(self, manager: "StreamingPullManager"):
40 super(StreamingPullFuture, self).__init__()
41 self.__manager = manager
42 self.__manager.add_close_callback(self._on_close_callback)
43 self.__cancelled = False
44
45 def _on_close_callback(self, manager: "StreamingPullManager", result: Any):
46 if self.done():
47 # The future has already been resolved in a different thread,
48 # nothing to do on the streaming pull manager shutdown.
49 return
50
51 if result is None:
52 self.set_result(True)
53 else:
54 self.set_exception(result)
55
56 def cancel(self) -> bool:
57 """Stops pulling messages and shutdowns the background thread consuming
58 messages.
59
60 The method always returns ``True``, as the shutdown is always initiated.
61 However, if the background stream is already being shut down or the shutdown
62 has completed, this method is a no-op.
63
64 .. versionchanged:: 2.4.1
65 The method does not block anymore, it just triggers the shutdown and returns
66 immediately. To block until the background stream is terminated, call
67 :meth:`result()` after cancelling the future.
68
69 .. versionchanged:: 2.10.0
70 The method always returns ``True`` instead of ``None``.
71 """
72 # NOTE: We circumvent the base future's self._state to track the cancellation
73 # state, as this state has different meaning with streaming pull futures.
74 self.__cancelled = True
75 self.__manager.close()
76 return True
77
78 def cancelled(self) -> bool:
79 """
80 Returns:
81 ``True`` if the subscription has been cancelled.
82 """
83 return self.__cancelled
84
85
86class Future(futures.Future):
87 """This future object is for subscribe-side calls.
88
89 Calling :meth:`result` will resolve the future by returning the message
90 ID, unless an error occurs.
91 """
92
93 def cancel(self) -> bool:
94 """Actions in Pub/Sub generally may not be canceled.
95
96 This method always returns ``False``.
97 """
98 return False
99
100 def cancelled(self) -> bool:
101 """Actions in Pub/Sub generally may not be canceled.
102
103 This method always returns ``False``.
104 """
105 return False
106
107 def result(self, timeout: Union[int, float, None] = None) -> AcknowledgeStatus:
108 """Return a success code or raise an exception.
109
110 This blocks until the operation completes successfully and
111 returns the error code unless an exception is raised.
112
113 Args:
114 timeout: The number of seconds before this call
115 times out and raises TimeoutError.
116
117 Returns:
118 AcknowledgeStatus.SUCCESS if the operation succeeded.
119
120 Raises:
121 concurrent.futures.TimeoutError: If the request times out.
122 AcknowledgeError: If the operation did not succeed for another
123 reason.
124 """
125 return super().result(timeout=timeout)