Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/zmq/sugar/poll.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""0MQ polling related functions and classes."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
6from __future__ import annotations
8from typing import Any
10from zmq.backend import zmq_poll
11from zmq.constants import POLLERR, POLLIN, POLLOUT
13# -----------------------------------------------------------------------------
14# Polling related methods
15# -----------------------------------------------------------------------------
18class Poller:
19 """A stateful poll interface that mirrors Python's built-in poll."""
21 sockets: list[tuple[Any, int]]
22 _map: dict
24 def __init__(self) -> None:
25 self.sockets = []
26 self._map = {}
28 def __contains__(self, socket: Any) -> bool:
29 return socket in self._map
31 def register(self, socket: Any, flags: int = POLLIN | POLLOUT):
32 """p.register(socket, flags=POLLIN|POLLOUT)
34 Register a 0MQ socket or native fd for I/O monitoring.
36 register(s,0) is equivalent to unregister(s).
38 Parameters
39 ----------
40 socket : zmq.Socket or native socket
41 A zmq.Socket or any Python object having a ``fileno()``
42 method that returns a valid file descriptor.
43 flags : int
44 The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
45 If `flags=0`, socket will be unregistered.
46 """
47 if flags:
48 if socket in self._map:
49 idx = self._map[socket]
50 self.sockets[idx] = (socket, flags)
51 else:
52 idx = len(self.sockets)
53 self.sockets.append((socket, flags))
54 self._map[socket] = idx
55 elif socket in self._map:
56 # uregister sockets registered with no events
57 self.unregister(socket)
58 else:
59 # ignore new sockets with no events
60 pass
62 def modify(self, socket, flags=POLLIN | POLLOUT):
63 """Modify the flags for an already registered 0MQ socket or native fd."""
64 self.register(socket, flags)
66 def unregister(self, socket: Any):
67 """Remove a 0MQ socket or native fd for I/O monitoring.
69 Parameters
70 ----------
71 socket : Socket
72 The socket instance to stop polling.
73 """
74 idx = self._map.pop(socket)
75 self.sockets.pop(idx)
76 # shift indices after deletion
77 for socket, flags in self.sockets[idx:]:
78 self._map[socket] -= 1
80 def poll(self, timeout: int | None = None) -> list[tuple[Any, int]]:
81 """Poll the registered 0MQ or native fds for I/O.
83 If there are currently events ready to be processed, this function will return immediately.
84 Otherwise, this function will return as soon the first event is available or after timeout
85 milliseconds have elapsed.
87 Parameters
88 ----------
89 timeout : int
90 The timeout in milliseconds. If None, no `timeout` (infinite). This
91 is in milliseconds to be compatible with ``select.poll()``.
93 Returns
94 -------
95 events : list
96 The list of events that are ready to be processed.
97 This is a list of tuples of the form ``(socket, event_mask)``, where the 0MQ Socket
98 or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second.
99 It is common to call ``events = dict(poller.poll())``,
100 which turns the list of tuples into a mapping of ``socket : event_mask``.
101 """
102 if timeout is None or timeout < 0:
103 timeout = -1
104 elif isinstance(timeout, float):
105 timeout = int(timeout)
106 return zmq_poll(self.sockets, timeout=timeout)
109def select(
110 rlist: list, wlist: list, xlist: list, timeout: float | None = None
111) -> tuple[list, list, list]:
112 """select(rlist, wlist, xlist, timeout=None) -> (rlist, wlist, xlist)
114 Return the result of poll as a lists of sockets ready for r/w/exception.
116 This has the same interface as Python's built-in ``select.select()`` function.
118 Parameters
119 ----------
120 timeout : float, optional
121 The timeout in seconds. If None, no timeout (infinite). This is in seconds to be
122 compatible with ``select.select()``.
123 rlist : list
124 sockets/FDs to be polled for read events
125 wlist : list
126 sockets/FDs to be polled for write events
127 xlist : list
128 sockets/FDs to be polled for error events
130 Returns
131 -------
132 rlist: list
133 list of sockets or FDs that are readable
134 wlist: list
135 list of sockets or FDs that are writable
136 xlist: list
137 list of sockets or FDs that had error events (rare)
138 """
139 if timeout is None:
140 timeout = -1
141 # Convert from sec -> ms for zmq_poll.
142 # zmq_poll accepts 3.x style timeout in ms
143 timeout = int(timeout * 1000.0)
144 if timeout < 0:
145 timeout = -1
146 sockets = []
147 for s in set(rlist + wlist + xlist):
148 flags = 0
149 if s in rlist:
150 flags |= POLLIN
151 if s in wlist:
152 flags |= POLLOUT
153 if s in xlist:
154 flags |= POLLERR
155 sockets.append((s, flags))
156 return_sockets = zmq_poll(sockets, timeout)
157 rlist, wlist, xlist = [], [], []
158 for s, flags in return_sockets:
159 if flags & POLLIN:
160 rlist.append(s)
161 if flags & POLLOUT:
162 wlist.append(s)
163 if flags & POLLERR:
164 xlist.append(s)
165 return rlist, wlist, xlist
168# -----------------------------------------------------------------------------
169# Symbols to export
170# -----------------------------------------------------------------------------
172__all__ = ['Poller', 'select']