Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/heartbeater.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

38 statements  

1# Copyright 2018, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import logging 

18import threading 

19import typing 

20from typing import Optional 

21 

22if typing.TYPE_CHECKING: # pragma: NO COVER 

23 from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import ( 

24 StreamingPullManager, 

25 ) 

26 

27 

28_LOGGER = logging.getLogger(__name__) 

29_HEARTBEAT_WORKER_NAME = "Thread-Heartbeater" 

30# How often to send heartbeats in seconds. Determined as half the period of 

31# time where the Pub/Sub server will close the stream as inactive, which is 

32# 60 seconds. 

33_DEFAULT_PERIOD = 30 

34 

35 

36class Heartbeater(object): 

37 def __init__(self, manager: "StreamingPullManager", period: int = _DEFAULT_PERIOD): 

38 self._thread: Optional[threading.Thread] = None 

39 self._operational_lock = threading.Lock() 

40 self._manager = manager 

41 self._stop_event = threading.Event() 

42 self._period = period 

43 

44 def heartbeat(self) -> None: 

45 """Periodically send streaming pull heartbeats.""" 

46 while not self._stop_event.is_set(): 

47 if self._manager.heartbeat(): 

48 _LOGGER.debug("Sent heartbeat.") 

49 self._stop_event.wait(timeout=self._period) 

50 

51 _LOGGER.debug("%s exiting.", _HEARTBEAT_WORKER_NAME) 

52 

53 def start(self) -> None: 

54 with self._operational_lock: 

55 if self._thread is not None: 

56 raise ValueError("Heartbeater is already running.") 

57 

58 # Create and start the helper thread. 

59 self._stop_event.clear() 

60 thread = threading.Thread( 

61 name=_HEARTBEAT_WORKER_NAME, target=self.heartbeat 

62 ) 

63 thread.daemon = True 

64 thread.start() 

65 _LOGGER.debug("Started helper thread %s", thread.name) 

66 self._thread = thread 

67 

68 def stop(self) -> None: 

69 with self._operational_lock: 

70 self._stop_event.set() 

71 

72 if self._thread is not None: 

73 # The thread should automatically exit when the consumer is 

74 # inactive. 

75 self._thread.join() 

76 

77 self._thread = None