Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/poll.py: 23%
62 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""0MQ polling related functions and classes."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
6from typing import Any, Dict, List, Optional, Tuple
8from zmq.backend import zmq_poll
9from zmq.constants import POLLERR, POLLIN, POLLOUT
11# -----------------------------------------------------------------------------
12# Polling related methods
13# -----------------------------------------------------------------------------
class Poller:
    """A stateful poll interface that mirrors Python's built-in poll.

    Registered sockets are kept as ``(socket, flags)`` pairs in ``sockets``;
    ``_map`` records the position of each socket inside that list so that an
    existing registration can be found without scanning the list.
    """

    sockets: List[Tuple[Any, int]]
    _map: Dict

    def __init__(self) -> None:
        self.sockets = []
        self._map = {}

    def __contains__(self, socket: Any) -> bool:
        return socket in self._map

    def register(self, socket: Any, flags: int = POLLIN | POLLOUT):
        """p.register(socket, flags=POLLIN|POLLOUT)

        Start watching a 0MQ socket or native fd for I/O events, or update
        the events watched for a socket that is already registered.

        ``register(s, 0)`` is equivalent to ``unregister(s)``.

        Parameters
        ----------
        socket : zmq.Socket or native socket
            A zmq.Socket or any Python object having a ``fileno()``
            method that returns a valid file descriptor.
        flags : int
            The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
            If `flags=0`, socket will be unregistered.
        """
        if not flags:
            # An empty event mask means "stop watching". A socket that was
            # never registered is silently ignored.
            if socket in self._map:
                self.unregister(socket)
            return

        entry = (socket, flags)
        if socket in self._map:
            # Already known: overwrite the stored pair in place.
            self.sockets[self._map[socket]] = entry
            return

        # New socket: it goes at the end, so its index is the current length.
        position = len(self.sockets)
        self.sockets.append(entry)
        self._map[socket] = position

    def modify(self, socket, flags=POLLIN | POLLOUT):
        """Change the watched events of a 0MQ socket or native fd.

        This simply delegates to :meth:`register` with the same arguments.
        """
        self.register(socket, flags)

    def unregister(self, socket: Any):
        """Stop watching a 0MQ socket or native fd.

        Parameters
        ----------
        socket : Socket
            The socket instance to stop polling.

        Raises
        ------
        KeyError
            If `socket` is not currently registered.
        """
        removed_at = self._map.pop(socket)
        del self.sockets[removed_at]
        # Everything that sat after the removed entry moved one slot to the
        # left, so the recorded positions have to follow.
        for later_socket, _ in self.sockets[removed_at:]:
            self._map[later_socket] -= 1

    def poll(self, timeout: Optional[int] = None) -> List[Tuple[Any, int]]:
        """Poll the registered 0MQ or native fds for I/O.

        Returns immediately when events are already pending. Otherwise
        returns as soon as the first event is available, or once `timeout`
        milliseconds have elapsed.

        Parameters
        ----------
        timeout : int
            The timeout in milliseconds. ``None`` or a negative value means
            no timeout (wait indefinitely). Milliseconds are used to be
            compatible with ``select.poll()``. A float is truncated to an int.

        Returns
        -------
        events : list of tuples
            The events that are ready to be processed, as a list of
            ``(socket, event_mask)`` tuples: the 0MQ Socket or integer fd
            first, the poll event mask (POLLIN, POLLOUT) second.
            It is common to call ``events = dict(poller.poll())``,
            which turns the list into a mapping of ``socket : event_mask``.
        """
        if timeout is None or timeout < 0:
            millis = -1
        elif isinstance(timeout, float):
            millis = int(timeout)
        else:
            millis = timeout
        return zmq_poll(self.sockets, timeout=millis)
def select(rlist: List, wlist: List, xlist: List, timeout: Optional[float] = None):
    """select(rlist, wlist, xlist, timeout=None) -> (rlist, wlist, xlist)

    Return the result of poll as lists of sockets ready for r/w/exception.

    This has the same interface as Python's built-in ``select.select()`` function.

    Parameters
    ----------
    timeout : float, int, optional
        The timeout in seconds. If None, no timeout (infinite). This is in seconds to be
        compatible with ``select.select()``.
    rlist : list of sockets/FDs
        sockets/FDs to be polled for read events
    wlist : list of sockets/FDs
        sockets/FDs to be polled for write events
    xlist : list of sockets/FDs
        sockets/FDs to be polled for error events

    Each of the three inputs may be any iterable of hashable sockets/FDs;
    they do not have to be of the same sequence type.

    Returns
    -------
    (rlist, wlist, xlist) : tuple of lists of sockets (length 3)
        Lists correspond to sockets available for read/write/error events respectively.
    """
    if timeout is None:
        timeout = -1
    # Convert from sec -> ms for zmq_poll.
    # zmq_poll accepts 3.x style timeout in ms
    timeout = int(timeout * 1000.0)
    if timeout < 0:
        timeout = -1

    # Build one combined event mask per distinct socket. Walking each input
    # once avoids concatenating the inputs (which fails when their types
    # differ) and avoids a linear membership scan per socket; the dict keeps
    # first-seen order, so zmq_poll always receives a deterministic list.
    requested = {}
    for candidates, event in ((rlist, POLLIN), (wlist, POLLOUT), (xlist, POLLERR)):
        for s in candidates:
            requested[s] = requested.get(s, 0) | event

    return_sockets = zmq_poll(list(requested.items()), timeout)

    # Split the ready sockets back out by the kind of event reported.
    rlist, wlist, xlist = [], [], []
    for s, flags in return_sockets:
        if flags & POLLIN:
            rlist.append(s)
        if flags & POLLOUT:
            wlist.append(s)
        if flags & POLLERR:
            xlist.append(s)
    return rlist, wlist, xlist
160# -----------------------------------------------------------------------------
161# Symbols to export
162# -----------------------------------------------------------------------------
164__all__ = ['Poller', 'select']