Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/tracker.py: 32%
53 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""Tracker for zero-copy messages with 0MQ."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
6import time
7from threading import Event
8from typing import Set, Tuple, Union
10from zmq.backend import Frame
11from zmq.error import NotDone
class MessageTracker:
    """Track whether 0MQ is done using one or more messages.

    When you send a 0MQ message, it is not sent immediately. The 0MQ IO thread
    sends the message at some later time. Often you want to know when 0MQ has
    actually sent the message though. This is complicated by the fact that
    a single 0MQ message can be sent multiple times using different sockets.
    This class allows you to track all of the 0MQ usages of a message.

    Parameters
    ----------
    *towatch : Event, MessageTracker, or Frame instances
        The objects to track. This class can track the low-level
        Events used by the Frame class, other MessageTrackers, or
        actual (tracked) Frames.
    """

    events: Set[Event]
    peers: Set["MessageTracker"]

    # Each positional argument is a single trackable object, so the var-args
    # annotation names the element type. It is a string so that it is not
    # evaluated when the class body is executed.
    def __init__(self, *towatch: "Union[MessageTracker, Event, Frame]"):
        """Create a message tracker to track a set of messages.

        Parameters
        ----------
        *towatch : Event, MessageTracker, or Frame instances
            The objects to track.

        Raises
        ------
        ValueError
            If a Frame is passed whose ``tracker`` attribute is falsy.
        TypeError
            If an argument is not an Event, MessageTracker, or Frame.
        """
        self.events = set()
        self.peers = set()
        for obj in towatch:
            if isinstance(obj, Event):
                self.events.add(obj)
            elif isinstance(obj, MessageTracker):
                self.peers.add(obj)
            elif isinstance(obj, Frame):
                if not obj.tracker:
                    raise ValueError("Not a tracked message")
                # A tracked Frame exposes its own tracker; watch that.
                self.peers.add(obj.tracker)
            else:
                raise TypeError(f"Require Events or Message Frames, not {type(obj)}")

    @property
    def done(self) -> bool:
        """Is 0MQ completely done with the message(s) being tracked?

        True only when every watched Event is set and every peer tracker
        reports done. A tracker watching nothing is trivially done.
        """
        return all(evt.is_set() for evt in self.events) and all(
            peer.done for peer in self.peers
        )

    def wait(self, timeout: Union[float, int, None] = -1):
        """Wait for 0MQ to be done with the message or until `timeout`.

        Parameters
        ----------
        timeout : float [default: -1, wait forever]
            Maximum time in (s) to wait before raising NotDone.
            ``None``, ``False`` or any negative value means "wait forever"
            (implemented as a one-week limit).

        Returns
        -------
        None
            if done before `timeout`

        Raises
        ------
        NotDone
            if `timeout` reached before I am done.
        """
        if timeout is None or timeout is False or timeout < 0:
            timeout = 3600 * 24 * 7  # a week
        # Use a monotonic deadline so wall-clock adjustments cannot stretch
        # or shrink the wait.
        deadline = time.monotonic() + timeout

        def remaining() -> float:
            # Clamp at zero: once the deadline has passed, the remaining
            # events/peers are still polled (non-blocking) instead of being
            # reported as NotDone without ever being checked. A negative
            # value must also never reach a peer, which would interpret it
            # as "wait forever".
            return max(0.0, deadline - time.monotonic())

        for evt in self.events:
            evt.wait(timeout=remaining())
            if not evt.is_set():
                raise NotDone

        for peer in self.peers:
            # Peers raise NotDone themselves if they are not finished in time.
            peer.wait(timeout=remaining())
# Shared tracker that watches no events and no peers, so it is always done.
_FINISHED_TRACKER = MessageTracker()

__all__ = ["MessageTracker", "_FINISHED_TRACKER"]