1# Copyright 2018, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import logging
18import threading
19import typing
20from typing import Optional
21
22if typing.TYPE_CHECKING: # pragma: NO COVER
23 from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import (
24 StreamingPullManager,
25 )
26
27
# Module-level logger shared by everything in this file.
_LOGGER = logging.getLogger(__name__)

# Default number of seconds between two heartbeats. The Pub/Sub server closes
# a stream as inactive after 60 seconds, so the default is half of that.
_DEFAULT_PERIOD = 30

# Name assigned to the helper thread that sends the heartbeats.
_HEARTBEAT_WORKER_NAME = "Thread-Heartbeater"
34
35
class Heartbeater(object):
    """Send periodic heartbeats for a streaming pull from a helper thread.

    After :meth:`start`, a daemon thread repeatedly calls
    ``manager.heartbeat()`` and then waits ``period`` seconds, until
    :meth:`stop` is called.

    Args:
        manager: Object whose ``heartbeat()`` method is invoked on every cycle.
        period: Seconds to wait between two consecutive heartbeat attempts.
    """

    def __init__(
        self,
        manager: "StreamingPullManager",
        period: int = _DEFAULT_PERIOD,
    ):
        self._manager = manager
        self._period = period
        # Set by stop() to ask the helper thread to leave its loop.
        self._stop_event = threading.Event()
        # Serializes start() and stop() so they never interleave.
        self._operational_lock = threading.Lock()
        # The running helper thread, or None while not started.
        self._thread: Optional[threading.Thread] = None

    def heartbeat(self) -> None:
        """Loop that sends heartbeats until the stop event is set.

        This is the target of the helper thread created by :meth:`start`.
        """
        while True:
            if self._stop_event.is_set():
                break

            # NOTE(review): presumably truthy only when a heartbeat was
            # actually sent -- confirm against the manager implementation.
            heartbeat_sent = self._manager.heartbeat()
            if heartbeat_sent:
                _LOGGER.debug("Sent heartbeat.")

            # Wait on the event rather than sleeping, so that stop() can
            # interrupt the pause immediately.
            self._stop_event.wait(timeout=self._period)

        _LOGGER.debug("%s exiting.", _HEARTBEAT_WORKER_NAME)

    def start(self) -> None:
        """Start the helper thread.

        Raises:
            ValueError: If a helper thread has already been started and not
                yet stopped.
        """
        with self._operational_lock:
            already_running = self._thread is not None
            if already_running:
                raise ValueError("Heartbeater is already running.")

            # Reset the event so that a start() following a stop() works.
            self._stop_event.clear()

            worker = threading.Thread(
                name=_HEARTBEAT_WORKER_NAME, target=self.heartbeat
            )
            # Daemon thread: it must not keep the interpreter alive on exit.
            worker.daemon = True
            worker.start()
            _LOGGER.debug("Started helper thread %s", worker.name)

            # Record the thread only once it has actually been started.
            self._thread = worker

    def stop(self) -> None:
        """Signal the helper thread to exit and block until it has done so.

        Safe to call when the heartbeater was never started; in that case
        only the stop event is set.
        """
        with self._operational_lock:
            # Wakes the helper thread out of its wait and ends its loop.
            self._stop_event.set()

            worker = self._thread
            if worker is not None:
                # Blocks until the heartbeat loop has returned.
                worker.join()

            self._thread = None