Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/futures.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

32 statements  

1# Copyright 2017, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import typing 

18from typing import Any 

19from typing import Union 

20 

21from google.cloud.pubsub_v1 import futures 

22from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus 

23 

24if typing.TYPE_CHECKING: # pragma: NO COVER 

25 from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import ( 

26 StreamingPullManager, 

27 ) 

28 

29 

class StreamingPullFuture(futures.Future):
    """Future tied to the lifetime of a background streaming pull.

    The future stays pending while the streaming pull manager is running. It
    is resolved once the manager shuts down: with ``True`` after a clean
    close (e.g. one triggered through :meth:`cancel`), or with the exception
    reported by the manager otherwise. Consequently, a call to ``.result()``
    blocks the calling thread until that shutdown happens.
    """

    def __init__(self, manager: "StreamingPullManager"):
        super().__init__()
        self.__manager = manager
        # Have the manager notify this future when it closes, so that the
        # future can be resolved at that point.
        self.__manager.add_close_callback(self._on_close_callback)
        self.__cancelled = False

    def _on_close_callback(self, manager: "StreamingPullManager", result: Any):
        """Resolve this future when the streaming pull manager shuts down.

        Args:
            manager: The manager that has been closed (unused here).
            result: ``None`` for a clean shutdown, otherwise the exception
                to set on this future.
        """
        if not self.done():
            if result is not None:
                self.set_exception(result)
            else:
                self.set_result(True)
        # Otherwise the future has already been resolved (possibly from a
        # different thread) and there is nothing left to do.

    def cancel(self) -> bool:
        """Stop pulling messages and shut down the background thread consuming
        them.

        The shutdown is always initiated, thus the method always returns
        ``True``. If the background stream is already shutting down, or has
        finished doing so, calling this method is a no-op.

        .. versionchanged:: 2.4.1
           The method does not block anymore, it only triggers the shutdown and
           returns right away. To wait for the background stream to terminate,
           call :meth:`result()` after cancelling the future.

        .. versionchanged:: 2.10.0
           The method always returns ``True`` instead of ``None``.
        """
        # NOTE: Cancellation is tracked in a dedicated flag rather than in the
        # base future's ``self._state``, because for streaming pull futures
        # that state carries a different meaning.
        self.__cancelled = True
        self.__manager.close()
        return True

    def cancelled(self) -> bool:
        """
        Returns:
            ``True`` if :meth:`cancel` has been called on this future,
            ``False`` otherwise.
        """
        return self.__cancelled

84 

85 

class Future(futures.Future):
    """Future returned by subscribe-side calls.

    Calling :meth:`result` blocks until the operation completes, and then
    either returns its status or raises the error it failed with. Such
    futures cannot be cancelled.
    """

    def cancel(self) -> bool:
        """Refuse to cancel; actions in Pub/Sub generally may not be canceled.

        Returns:
            Always ``False``.
        """
        return False

    def cancelled(self) -> bool:
        """Report whether the future was cancelled, which can never happen.

        Returns:
            Always ``False``.
        """
        return False

    def result(self, timeout: Union[int, float, None] = None) -> AcknowledgeStatus:
        """Wait for the operation to finish and return its status.

        The call blocks until the operation completes. The lookup is
        delegated to the parent future class.

        Args:
            timeout: The number of seconds to wait before giving up and
                raising a timeout error. ``None`` (the default) passes no
                time limit on to the parent class.

        Returns:
            AcknowledgeStatus.SUCCESS if the operation succeeded.

        Raises:
            concurrent.futures.TimeoutError: If the request times out.
            AcknowledgeError: If the operation did not succeed for another
                reason.
        """
        status = super().result(timeout=timeout)
        return status