Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/zmq/sugar/poll.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

63 statements  

1"""0MQ polling related functions and classes.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5 

6from __future__ import annotations 

7 

8from typing import Any 

9 

10from zmq.backend import zmq_poll 

11from zmq.constants import POLLERR, POLLIN, POLLOUT 

12 

13# ----------------------------------------------------------------------------- 

14# Polling related methods 

15# ----------------------------------------------------------------------------- 

16 

17 

18class Poller: 

19 """A stateful poll interface that mirrors Python's built-in poll.""" 

20 

21 sockets: list[tuple[Any, int]] 

22 _map: dict 

23 

24 def __init__(self) -> None: 

25 self.sockets = [] 

26 self._map = {} 

27 

28 def __contains__(self, socket: Any) -> bool: 

29 return socket in self._map 

30 

31 def register(self, socket: Any, flags: int = POLLIN | POLLOUT): 

32 """p.register(socket, flags=POLLIN|POLLOUT) 

33 

34 Register a 0MQ socket or native fd for I/O monitoring. 

35 

36 register(s,0) is equivalent to unregister(s). 

37 

38 Parameters 

39 ---------- 

40 socket : zmq.Socket or native socket 

41 A zmq.Socket or any Python object having a ``fileno()`` 

42 method that returns a valid file descriptor. 

43 flags : int 

44 The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT. 

45 If `flags=0`, socket will be unregistered. 

46 """ 

47 if flags: 

48 if socket in self._map: 

49 idx = self._map[socket] 

50 self.sockets[idx] = (socket, flags) 

51 else: 

52 idx = len(self.sockets) 

53 self.sockets.append((socket, flags)) 

54 self._map[socket] = idx 

55 elif socket in self._map: 

56 # uregister sockets registered with no events 

57 self.unregister(socket) 

58 else: 

59 # ignore new sockets with no events 

60 pass 

61 

62 def modify(self, socket, flags=POLLIN | POLLOUT): 

63 """Modify the flags for an already registered 0MQ socket or native fd.""" 

64 self.register(socket, flags) 

65 

66 def unregister(self, socket: Any): 

67 """Remove a 0MQ socket or native fd for I/O monitoring. 

68 

69 Parameters 

70 ---------- 

71 socket : Socket 

72 The socket instance to stop polling. 

73 """ 

74 idx = self._map.pop(socket) 

75 self.sockets.pop(idx) 

76 # shift indices after deletion 

77 for socket, flags in self.sockets[idx:]: 

78 self._map[socket] -= 1 

79 

80 def poll(self, timeout: int | None = None) -> list[tuple[Any, int]]: 

81 """Poll the registered 0MQ or native fds for I/O. 

82 

83 If there are currently events ready to be processed, this function will return immediately. 

84 Otherwise, this function will return as soon the first event is available or after timeout 

85 milliseconds have elapsed. 

86 

87 Parameters 

88 ---------- 

89 timeout : int 

90 The timeout in milliseconds. If None, no `timeout` (infinite). This 

91 is in milliseconds to be compatible with ``select.poll()``. 

92 

93 Returns 

94 ------- 

95 events : list 

96 The list of events that are ready to be processed. 

97 This is a list of tuples of the form ``(socket, event_mask)``, where the 0MQ Socket 

98 or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second. 

99 It is common to call ``events = dict(poller.poll())``, 

100 which turns the list of tuples into a mapping of ``socket : event_mask``. 

101 """ 

102 if timeout is None or timeout < 0: 

103 timeout = -1 

104 elif isinstance(timeout, float): 

105 timeout = int(timeout) 

106 return zmq_poll(self.sockets, timeout=timeout) 

107 

108 

109def select( 

110 rlist: list, wlist: list, xlist: list, timeout: float | None = None 

111) -> tuple[list, list, list]: 

112 """select(rlist, wlist, xlist, timeout=None) -> (rlist, wlist, xlist) 

113 

114 Return the result of poll as a lists of sockets ready for r/w/exception. 

115 

116 This has the same interface as Python's built-in ``select.select()`` function. 

117 

118 Parameters 

119 ---------- 

120 timeout : float, optional 

121 The timeout in seconds. If None, no timeout (infinite). This is in seconds to be 

122 compatible with ``select.select()``. 

123 rlist : list 

124 sockets/FDs to be polled for read events 

125 wlist : list 

126 sockets/FDs to be polled for write events 

127 xlist : list 

128 sockets/FDs to be polled for error events 

129 

130 Returns 

131 ------- 

132 rlist: list 

133 list of sockets or FDs that are readable 

134 wlist: list 

135 list of sockets or FDs that are writable 

136 xlist: list 

137 list of sockets or FDs that had error events (rare) 

138 """ 

139 if timeout is None: 

140 timeout = -1 

141 # Convert from sec -> ms for zmq_poll. 

142 # zmq_poll accepts 3.x style timeout in ms 

143 timeout = int(timeout * 1000.0) 

144 if timeout < 0: 

145 timeout = -1 

146 sockets = [] 

147 for s in set(rlist + wlist + xlist): 

148 flags = 0 

149 if s in rlist: 

150 flags |= POLLIN 

151 if s in wlist: 

152 flags |= POLLOUT 

153 if s in xlist: 

154 flags |= POLLERR 

155 sockets.append((s, flags)) 

156 return_sockets = zmq_poll(sockets, timeout) 

157 rlist, wlist, xlist = [], [], [] 

158 for s, flags in return_sockets: 

159 if flags & POLLIN: 

160 rlist.append(s) 

161 if flags & POLLOUT: 

162 wlist.append(s) 

163 if flags & POLLERR: 

164 xlist.append(s) 

165 return rlist, wlist, xlist 

166 

167 

168# ----------------------------------------------------------------------------- 

169# Symbols to export 

170# ----------------------------------------------------------------------------- 

171 

172__all__ = ['Poller', 'select']