1# Copyright 2018, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Schedulers provide means to *schedule* callbacks asynchronously.
16
17These are used by the subscriber to call the user-provided callback to process
18each message.
19"""
20
21import abc
22import concurrent.futures
23import queue
24import typing
25from typing import Callable, List, Optional
26import warnings
27
28if typing.TYPE_CHECKING: # pragma: NO COVER
29 from google.cloud import pubsub_v1
30
31
class Scheduler(metaclass=abc.ABCMeta):
    """Interface that every scheduler implementation must provide.

    A scheduler accepts callbacks and arranges for them to run
    asynchronously, using whatever concurrency model the concrete
    subclass is built on.
    """

    @property
    @abc.abstractmethod
    def queue(self) -> queue.Queue:  # pragma: NO COVER
        """Queue: Concurrency-safe queue matching the subclass's concurrency model.

        Callbacks put messages on this queue to hand them *back* to the
        scheduling actor.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def schedule(self, callback: Callable, *args, **kwargs) -> None:  # pragma: NO COVER
        """Arrange for ``callback(*args, **kwargs)`` to run asynchronously.

        Args:
            callback: The callable to invoke.
            args: Positional arguments forwarded to ``callback``.
            kwargs: Keyword arguments forwarded to ``callback``.

        Returns:
            None
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def shutdown(
        self, await_msg_callbacks: bool = False
    ) -> List["pubsub_v1.subscriber.message.Message"]:  # pragma: NO COVER
        """Shut down the scheduler, immediately ending all pending callbacks.

        Args:
            await_msg_callbacks:
                When ``True``, block until every callback that is already
                executing has finished. When ``False`` (the default), return
                without waiting for running callbacks to complete.

        Returns:
            The messages that were handed to the scheduler but not yet
            dispatched to their callbacks. Each message is assumed to have
            been passed to the scheduler as the first positional argument
            of its callback.
        """
        raise NotImplementedError()
81
82
def _make_default_thread_pool_executor() -> concurrent.futures.ThreadPoolExecutor:
    """Build the executor that ThreadScheduler uses when none is supplied.

    Returns:
        A new ThreadPoolExecutor capped at 10 worker threads, whose threads
        are named with the ``ThreadPoolExecutor-ThreadScheduler`` prefix.
    """
    worker_limit = 10
    name_prefix = "ThreadPoolExecutor-ThreadScheduler"
    return concurrent.futures.ThreadPoolExecutor(
        max_workers=worker_limit, thread_name_prefix=name_prefix
    )
87
88
class ThreadScheduler(Scheduler):
    """A thread pool-based scheduler. It must not be shared across
    SubscriberClients.

    This scheduler is useful in typical I/O-bound message processing.

    Args:
        executor:
            An optional executor to use. If not specified, a default one
            will be created.
    """

    def __init__(
        self, executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
    ):
        # Channel used by callbacks to send messages back to the scheduling thread.
        self._queue: queue.Queue = queue.Queue()
        if executor is None:
            self._executor = _make_default_thread_pool_executor()
        else:
            self._executor = executor

    @property
    def queue(self):
        """Queue: A thread-safe queue used for communication between callbacks
        and the scheduling thread."""
        return self._queue

    def schedule(self, callback: Callable, *args, **kwargs) -> None:
        """Schedule the callback to be called asynchronously in a thread pool.

        Args:
            callback: The function to call.
            args: Positional arguments passed to the callback.
            kwargs: Key-word arguments passed to the callback.

        Returns:
            None
        """
        try:
            self._executor.submit(callback, *args, **kwargs)
        except RuntimeError:
            # ThreadPoolExecutor.submit() raises RuntimeError once the executor
            # has been shut down; downgrade that to a warning for the caller.
            warnings.warn(
                "Scheduling a callback after executor shutdown.",
                category=RuntimeWarning,
                stacklevel=2,
            )

    def shutdown(
        self, await_msg_callbacks: bool = False
    ) -> List["pubsub_v1.subscriber.message.Message"]:
        """Shut down the scheduler and immediately end all pending callbacks.

        Args:
            await_msg_callbacks:
                If ``True``, the method will block until all currently executing
                executor threads are done processing. If ``False`` (default), the
                method will not wait for the currently running threads to complete.

        Returns:
            The messages submitted to the scheduler that were not yet dispatched
            to their callbacks.
            It is assumed that each message was submitted to the scheduler as the
            first positional argument to the provided callback. Pending callbacks
            that were scheduled without any positional arguments are still
            dropped, but contribute nothing to the returned list.
        """
        dropped_messages = []

        # Drop all pending items from the executor. Without this, the executor will
        # also try to process any pending work items before termination, which is
        # undesirable.
        #
        # TODO: Replace the logic below by passing `cancel_futures=True` to shutdown()
        # once we only need to support Python 3.9+.
        try:
            while True:
                work_item = self._executor._work_queue.get(block=False)
                if work_item is None:  # Executor in shutdown mode.
                    continue

                # Mirror shutdown(cancel_futures=True): the dropped item will never
                # run, so don't leave its future stuck in the PENDING state.
                future = getattr(work_item, "future", None)
                if future is not None:
                    future.cancel()

                args = self._work_item_args(work_item)
                if args:
                    dropped_messages.append(args[0])
        except queue.Empty:
            pass

        self._executor.shutdown(wait=await_msg_callbacks)
        return dropped_messages

    @staticmethod
    def _work_item_args(work_item: typing.Any) -> tuple:
        """Return the positional arguments a queued work item was submitted with.

        This reads private ``concurrent.futures.thread._WorkItem`` internals,
        whose layout differs between Python versions. An empty tuple is
        returned when the arguments cannot be located or there were none.
        """
        # Python < 3.14: the work item keeps ``fn``, ``args`` and ``kwargs``
        # as separate attributes.
        args = getattr(work_item, "args", None)
        if args is None:
            # Python 3.14+: the work item keeps a single ``task`` attribute
            # holding a ``(fn, args, kwargs)`` tuple.
            task = getattr(work_item, "task", None)
            if isinstance(task, tuple) and len(task) == 3:
                args = task[1]
        return tuple(args) if args else ()