Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/scheduler.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

41 statements  

1# Copyright 2018, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Schedulers provide means to *schedule* callbacks asynchronously. 

16 

17These are used by the subscriber to call the user-provided callback to process 

18each message. 

19""" 

20 

21import abc 

22import concurrent.futures 

23import queue 

24import typing 

25from typing import Callable, List, Optional 

26import warnings 

27 

28if typing.TYPE_CHECKING: # pragma: NO COVER 

29 from google.cloud import pubsub_v1 

30 

31 

32class Scheduler(metaclass=abc.ABCMeta): 

33 """Abstract base class for schedulers. 

34 

35 Schedulers are used to schedule callbacks asynchronously. 

36 """ 

37 

38 @property 

39 @abc.abstractmethod 

40 def queue(self) -> queue.Queue: # pragma: NO COVER 

41 """Queue: A concurrency-safe queue specific to the underlying 

42 concurrency implementation. 

43 

44 This queue is used to send messages *back* to the scheduling actor. 

45 """ 

46 raise NotImplementedError 

47 

48 @abc.abstractmethod 

49 def schedule(self, callback: Callable, *args, **kwargs) -> None: # pragma: NO COVER 

50 """Schedule the callback to be called asynchronously. 

51 

52 Args: 

53 callback: The function to call. 

54 args: Positional arguments passed to the callback. 

55 kwargs: Key-word arguments passed to the callback. 

56 

57 Returns: 

58 None 

59 """ 

60 raise NotImplementedError 

61 

62 @abc.abstractmethod 

63 def shutdown( 

64 self, await_msg_callbacks: bool = False 

65 ) -> List["pubsub_v1.subscriber.message.Message"]: # pragma: NO COVER 

66 """Shuts down the scheduler and immediately end all pending callbacks. 

67 

68 Args: 

69 await_msg_callbacks: 

70 If ``True``, the method will block until all currently executing 

71 callbacks are done processing. If ``False`` (default), the 

72 method will not wait for the currently running callbacks to complete. 

73 

74 Returns: 

75 The messages submitted to the scheduler that were not yet dispatched 

76 to their callbacks. 

77 It is assumed that each message was submitted to the scheduler as the 

78 first positional argument to the provided callback. 

79 """ 

80 raise NotImplementedError 

81 

82 

83def _make_default_thread_pool_executor() -> concurrent.futures.ThreadPoolExecutor: 

84 return concurrent.futures.ThreadPoolExecutor( 

85 max_workers=10, thread_name_prefix="ThreadPoolExecutor-ThreadScheduler" 

86 ) 

87 

88 

89class ThreadScheduler(Scheduler): 

90 """A thread pool-based scheduler. It must not be shared across 

91 SubscriberClients. 

92 

93 This scheduler is useful in typical I/O-bound message processing. 

94 

95 Args: 

96 executor: 

97 An optional executor to use. If not specified, a default one 

98 will be created. 

99 """ 

100 

101 def __init__( 

102 self, executor: Optional[concurrent.futures.ThreadPoolExecutor] = None 

103 ): 

104 self._queue: queue.Queue = queue.Queue() 

105 if executor is None: 

106 self._executor = _make_default_thread_pool_executor() 

107 else: 

108 self._executor = executor 

109 

110 @property 

111 def queue(self): 

112 """Queue: A thread-safe queue used for communication between callbacks 

113 and the scheduling thread.""" 

114 return self._queue 

115 

116 def schedule(self, callback: Callable, *args, **kwargs) -> None: 

117 """Schedule the callback to be called asynchronously in a thread pool. 

118 

119 Args: 

120 callback: The function to call. 

121 args: Positional arguments passed to the callback. 

122 kwargs: Key-word arguments passed to the callback. 

123 

124 Returns: 

125 None 

126 """ 

127 try: 

128 self._executor.submit(callback, *args, **kwargs) 

129 except RuntimeError: 

130 warnings.warn( 

131 "Scheduling a callback after executor shutdown.", 

132 category=RuntimeWarning, 

133 stacklevel=2, 

134 ) 

135 

136 def shutdown( 

137 self, await_msg_callbacks: bool = False 

138 ) -> List["pubsub_v1.subscriber.message.Message"]: 

139 """Shut down the scheduler and immediately end all pending callbacks. 

140 

141 Args: 

142 await_msg_callbacks: 

143 If ``True``, the method will block until all currently executing 

144 executor threads are done processing. If ``False`` (default), the 

145 method will not wait for the currently running threads to complete. 

146 

147 Returns: 

148 The messages submitted to the scheduler that were not yet dispatched 

149 to their callbacks. 

150 It is assumed that each message was submitted to the scheduler as the 

151 first positional argument to the provided callback. 

152 """ 

153 dropped_messages = [] 

154 

155 # Drop all pending item from the executor. Without this, the executor will also 

156 # try to process any pending work items before termination, which is undesirable. 

157 # 

158 # TODO: Replace the logic below by passing `cancel_futures=True` to shutdown() 

159 # once we only need to support Python 3.9+. 

160 try: 

161 while True: 

162 work_item = self._executor._work_queue.get(block=False) 

163 if work_item is None: # Exceutor in shutdown mode. 

164 continue 

165 dropped_messages.append(work_item.args[0]) # type: ignore[index] 

166 except queue.Empty: 

167 pass 

168 

169 self._executor.shutdown(wait=await_msg_callbacks) 

170 return dropped_messages