Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/poll.py: 23%

62 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1"""0MQ polling related functions and classes.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5 

6from typing import Any, Dict, List, Optional, Tuple 

7 

8from zmq.backend import zmq_poll 

9from zmq.constants import POLLERR, POLLIN, POLLOUT 

10 

11# ----------------------------------------------------------------------------- 

12# Polling related methods 

13# ----------------------------------------------------------------------------- 

14 

15 

16class Poller: 

17 """A stateful poll interface that mirrors Python's built-in poll.""" 

18 

19 sockets: List[Tuple[Any, int]] 

20 _map: Dict 

21 

22 def __init__(self) -> None: 

23 self.sockets = [] 

24 self._map = {} 

25 

26 def __contains__(self, socket: Any) -> bool: 

27 return socket in self._map 

28 

29 def register(self, socket: Any, flags: int = POLLIN | POLLOUT): 

30 """p.register(socket, flags=POLLIN|POLLOUT) 

31 

32 Register a 0MQ socket or native fd for I/O monitoring. 

33 

34 register(s,0) is equivalent to unregister(s). 

35 

36 Parameters 

37 ---------- 

38 socket : zmq.Socket or native socket 

39 A zmq.Socket or any Python object having a ``fileno()`` 

40 method that returns a valid file descriptor. 

41 flags : int 

42 The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT. 

43 If `flags=0`, socket will be unregistered. 

44 """ 

45 if flags: 

46 if socket in self._map: 

47 idx = self._map[socket] 

48 self.sockets[idx] = (socket, flags) 

49 else: 

50 idx = len(self.sockets) 

51 self.sockets.append((socket, flags)) 

52 self._map[socket] = idx 

53 elif socket in self._map: 

54 # uregister sockets registered with no events 

55 self.unregister(socket) 

56 else: 

57 # ignore new sockets with no events 

58 pass 

59 

60 def modify(self, socket, flags=POLLIN | POLLOUT): 

61 """Modify the flags for an already registered 0MQ socket or native fd.""" 

62 self.register(socket, flags) 

63 

64 def unregister(self, socket: Any): 

65 """Remove a 0MQ socket or native fd for I/O monitoring. 

66 

67 Parameters 

68 ---------- 

69 socket : Socket 

70 The socket instance to stop polling. 

71 """ 

72 idx = self._map.pop(socket) 

73 self.sockets.pop(idx) 

74 # shift indices after deletion 

75 for socket, flags in self.sockets[idx:]: 

76 self._map[socket] -= 1 

77 

78 def poll(self, timeout: Optional[int] = None) -> List[Tuple[Any, int]]: 

79 """Poll the registered 0MQ or native fds for I/O. 

80 

81 If there are currently events ready to be processed, this function will return immediately. 

82 Otherwise, this function will return as soon the first event is available or after timeout 

83 milliseconds have elapsed. 

84 

85 Parameters 

86 ---------- 

87 timeout : int 

88 The timeout in milliseconds. If None, no `timeout` (infinite). This 

89 is in milliseconds to be compatible with ``select.poll()``. 

90 

91 Returns 

92 ------- 

93 events : list of tuples 

94 The list of events that are ready to be processed. 

95 This is a list of tuples of the form ``(socket, event_mask)``, where the 0MQ Socket 

96 or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second. 

97 It is common to call ``events = dict(poller.poll())``, 

98 which turns the list of tuples into a mapping of ``socket : event_mask``. 

99 """ 

100 if timeout is None or timeout < 0: 

101 timeout = -1 

102 elif isinstance(timeout, float): 

103 timeout = int(timeout) 

104 return zmq_poll(self.sockets, timeout=timeout) 

105 

106 

107def select(rlist: List, wlist: List, xlist: List, timeout: Optional[float] = None): 

108 """select(rlist, wlist, xlist, timeout=None) -> (rlist, wlist, xlist) 

109 

110 Return the result of poll as a lists of sockets ready for r/w/exception. 

111 

112 This has the same interface as Python's built-in ``select.select()`` function. 

113 

114 Parameters 

115 ---------- 

116 timeout : float, int, optional 

117 The timeout in seconds. If None, no timeout (infinite). This is in seconds to be 

118 compatible with ``select.select()``. 

119 rlist : list of sockets/FDs 

120 sockets/FDs to be polled for read events 

121 wlist : list of sockets/FDs 

122 sockets/FDs to be polled for write events 

123 xlist : list of sockets/FDs 

124 sockets/FDs to be polled for error events 

125 

126 Returns 

127 ------- 

128 (rlist, wlist, xlist) : tuple of lists of sockets (length 3) 

129 Lists correspond to sockets available for read/write/error events respectively. 

130 """ 

131 if timeout is None: 

132 timeout = -1 

133 # Convert from sec -> ms for zmq_poll. 

134 # zmq_poll accepts 3.x style timeout in ms 

135 timeout = int(timeout * 1000.0) 

136 if timeout < 0: 

137 timeout = -1 

138 sockets = [] 

139 for s in set(rlist + wlist + xlist): 

140 flags = 0 

141 if s in rlist: 

142 flags |= POLLIN 

143 if s in wlist: 

144 flags |= POLLOUT 

145 if s in xlist: 

146 flags |= POLLERR 

147 sockets.append((s, flags)) 

148 return_sockets = zmq_poll(sockets, timeout) 

149 rlist, wlist, xlist = [], [], [] 

150 for s, flags in return_sockets: 

151 if flags & POLLIN: 

152 rlist.append(s) 

153 if flags & POLLOUT: 

154 wlist.append(s) 

155 if flags & POLLERR: 

156 xlist.append(s) 

157 return rlist, wlist, xlist 

158 

159 

160# ----------------------------------------------------------------------------- 

161# Symbols to export 

162# ----------------------------------------------------------------------------- 

163 

164__all__ = ['Poller', 'select']