Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/tracker.py: 32%

53 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1"""Tracker for zero-copy messages with 0MQ.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5 

6import time 

7from threading import Event 

8from typing import Set, Tuple, Union 

9 

10from zmq.backend import Frame 

11from zmq.error import NotDone 

12 

13 

class MessageTracker:
    """MessageTracker(*towatch)

    A class for tracking if 0MQ is done using one or more messages.

    When you send a 0MQ message, it is not sent immediately. The 0MQ IO thread
    sends the message at some later time. Often you want to know when 0MQ has
    actually sent the message though. This is complicated by the fact that
    a single 0MQ message can be sent multiple times using different sockets.
    This class allows you to track all of the 0MQ usages of a message.

    Parameters
    ----------
    towatch : Event, MessageTracker, Message instances.
        The objects to track. This class can track the low-level
        Events used by the Message class, other MessageTrackers or
        actual Messages.
    """

    # Events watched directly; each one must be set for the tracker to be done.
    events: Set[Event]
    # Nested trackers; each one must itself be done for this tracker to be done.
    peers: Set["MessageTracker"]

    def __init__(self, *towatch: Union["MessageTracker", Event, "Frame"]):
        """MessageTracker(*towatch)

        Create a message tracker to track a set of messages.

        Parameters
        ----------
        *towatch : tuple of Event, MessageTracker, Message instances.
            The objects to track. This class can track the low-level
            Events used by the Message class, other MessageTrackers or
            actual Messages.

        Raises
        ------
        ValueError
            if a Frame is passed whose ``tracker`` attribute is falsy.
        TypeError
            if an object is not an Event, MessageTracker or Frame.
        """
        self.events = set()
        self.peers = set()
        for obj in towatch:
            if isinstance(obj, Event):
                self.events.add(obj)
            elif isinstance(obj, MessageTracker):
                self.peers.add(obj)
            elif isinstance(obj, Frame):
                # A Frame is followed through its own tracker, not directly.
                if not obj.tracker:
                    raise ValueError("Not a tracked message")
                self.peers.add(obj.tracker)
            else:
                raise TypeError("Require Events or Message Frames, not %s" % type(obj))

    @property
    def done(self):
        """Is 0MQ completely done with the message(s) being tracked?"""
        # Events are checked first; peers are only consulted once every
        # directly-watched event is set.
        return all(evt.is_set() for evt in self.events) and all(
            peer.done for peer in self.peers
        )

    def wait(self, timeout: Union[float, int, None] = -1):
        """mt.wait(timeout=-1)

        Wait for 0MQ to be done with the message or until `timeout`.

        Parameters
        ----------
        timeout : float [default: -1, wait forever]
            Maximum time in (s) to wait before raising NotDone.
            ``None``, ``False`` and negative values all mean "wait forever",
            which is implemented as a budget of one week.

        Returns
        -------
        None
            if done before `timeout`

        Raises
        ------
        NotDone
            if `timeout` reached before I am done.
        """
        remaining: float
        if timeout is None or timeout is False or timeout < 0:
            remaining = 3600 * 24 * 7  # a week
        else:
            remaining = timeout

        # The budget is shared by every event and peer, so the time spent in
        # each wait is subtracted before moving on. A monotonic clock keeps
        # that bookkeeping immune to wall-clock adjustments.
        tic = time.monotonic()
        for evt in self.events:
            if remaining < 0:
                raise NotDone
            evt.wait(timeout=remaining)
            if not evt.is_set():
                raise NotDone
            toc = time.monotonic()
            remaining -= toc - tic
            tic = toc

        for peer in self.peers:
            if remaining < 0:
                raise NotDone
            # The peer raises NotDone itself if it runs out of time.
            peer.wait(timeout=remaining)
            toc = time.monotonic()
            remaining -= toc - tic
            tic = toc

116 

117 

# A tracker that watches nothing: with no events and no peers its `done`
# property is True and `wait()` returns without blocking.
_FINISHED_TRACKER = MessageTracker()

__all__ = [
    "MessageTracker",
    "_FINISHED_TRACKER",
]