Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/asyncio.py: 46%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""AsyncIO support for zmq
3Requires asyncio and Python 3.
4"""
6# Copyright (c) PyZMQ Developers.
7# Distributed under the terms of the Modified BSD License.
8from __future__ import annotations
10import asyncio
11import selectors
12import sys
13import warnings
14from asyncio import Future, SelectorEventLoop
15from weakref import WeakKeyDictionary
17import zmq as _zmq
18from zmq import _future
20# registry of asyncio loop : selector thread
21_selectors: WeakKeyDictionary = WeakKeyDictionary()
24class ProactorSelectorThreadWarning(RuntimeWarning):
25 """Warning class for notifying about the extra thread spawned by tornado
27 We automatically support proactor via tornado's AddThreadSelectorEventLoop"""
30def _get_selector_windows(
31 asyncio_loop,
32) -> asyncio.AbstractEventLoop:
33 """Get selector-compatible loop
35 Returns an object with ``add_reader`` family of methods,
36 either the loop itself or a SelectorThread instance.
38 Workaround Windows proactor removal of
39 *reader methods, which we need for zmq sockets.
40 """
42 if asyncio_loop in _selectors:
43 return _selectors[asyncio_loop]
45 # detect add_reader instead of checking for proactor?
46 if hasattr(asyncio, "ProactorEventLoop") and isinstance(
47 asyncio_loop,
48 asyncio.ProactorEventLoop, # type: ignore
49 ):
50 try:
51 from tornado.platform.asyncio import AddThreadSelectorEventLoop
52 except ImportError:
53 raise RuntimeError(
54 "Proactor event loop does not implement add_reader family of methods required for zmq."
55 " zmq will work with proactor if tornado >= 6.1 can be found."
56 " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`"
57 " or install 'tornado>=6.1' to avoid this error."
58 )
60 warnings.warn(
61 "Proactor event loop does not implement add_reader family of methods required for zmq."
62 " Registering an additional selector thread for add_reader support via tornado."
63 " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`"
64 " to avoid this warning.",
65 RuntimeWarning,
66 # stacklevel 5 matches most likely zmq.asyncio.Context().socket()
67 stacklevel=5,
68 )
70 selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop(
71 asyncio_loop
72 ) # type: ignore
74 # patch loop.close to also close the selector thread
75 loop_close = asyncio_loop.close
77 def _close_selector_and_loop():
78 # restore original before calling selector.close,
79 # which in turn calls eventloop.close!
80 asyncio_loop.close = loop_close
81 _selectors.pop(asyncio_loop, None)
82 selector_loop.close()
84 asyncio_loop.close = _close_selector_and_loop # type: ignore # mypy bug - assign a function to method
85 return selector_loop
86 else:
87 return asyncio_loop
90def _get_selector_noop(loop) -> asyncio.AbstractEventLoop:
91 """no-op on non-Windows"""
92 return loop
95if sys.platform == "win32":
96 _get_selector = _get_selector_windows
97else:
98 _get_selector = _get_selector_noop
101class _AsyncIO:
102 _Future = Future
103 _WRITE = selectors.EVENT_WRITE
104 _READ = selectors.EVENT_READ
106 def _default_loop(self):
107 try:
108 return asyncio.get_running_loop()
109 except RuntimeError:
110 warnings.warn(
111 "No running event loop. zmq.asyncio should be used from within an asyncio loop.",
112 RuntimeWarning,
113 stacklevel=4,
114 )
115 # get_event_loop deprecated in 3.10:
116 return asyncio.get_event_loop()
119class Poller(_AsyncIO, _future._AsyncPoller):
120 """Poller returning asyncio.Future for poll results."""
122 def _watch_raw_socket(self, loop, socket, evt, f):
123 """Schedule callback for a raw socket"""
124 selector = _get_selector(loop)
125 if evt & self._READ:
126 selector.add_reader(socket, lambda *args: f())
127 if evt & self._WRITE:
128 selector.add_writer(socket, lambda *args: f())
130 def _unwatch_raw_sockets(self, loop, *sockets):
131 """Unschedule callback for a raw socket"""
132 selector = _get_selector(loop)
133 for socket in sockets:
134 selector.remove_reader(socket)
135 selector.remove_writer(socket)
138class Socket(_AsyncIO, _future._AsyncSocket):
139 """Socket returning asyncio Futures for send/recv/poll methods."""
141 _poller_class = Poller
143 def _get_selector(self, io_loop=None):
144 if io_loop is None:
145 io_loop = self._get_loop()
146 return _get_selector(io_loop)
148 def _init_io_state(self, io_loop=None):
149 """initialize the ioloop event handler"""
150 self._get_selector(io_loop).add_reader(
151 self._fd, lambda: self._handle_events(0, 0)
152 )
154 def _clear_io_state(self):
155 """clear any ioloop event handler
157 called once at close
158 """
159 loop = self._current_loop
160 if loop and not loop.is_closed() and self._fd != -1:
161 self._get_selector(loop).remove_reader(self._fd)
164Poller._socket_class = Socket
167class Context(_zmq.Context[Socket]):
168 """Context for creating asyncio-compatible Sockets"""
170 _socket_class = Socket
172 # avoid sharing instance with base Context class
173 _instance = None
175 # overload with no changes to satisfy pyright
176 def __init__(
177 self: Context,
178 io_threads: int | _zmq.Context = 1,
179 shadow: _zmq.Context | int = 0,
180 ) -> None:
181 super().__init__(io_threads, shadow) # type: ignore
184class ZMQEventLoop(SelectorEventLoop):
185 """DEPRECATED: AsyncIO eventloop using zmq_poll.
187 pyzmq sockets should work with any asyncio event loop as of pyzmq 17.
188 """
190 def __init__(self, selector=None):
191 _deprecated()
192 return super().__init__(selector)
195_loop = None
198def _deprecated():
199 if _deprecated.called: # type: ignore
200 return
201 _deprecated.called = True # type: ignore
203 warnings.warn(
204 "ZMQEventLoop and zmq.asyncio.install are deprecated in pyzmq 17. Special eventloop integration is no longer needed.",
205 DeprecationWarning,
206 stacklevel=3,
207 )
210_deprecated.called = False # type: ignore
213def install():
214 """DEPRECATED: No longer needed in pyzmq 17"""
215 _deprecated()
218__all__ = [
219 "Context",
220 "Socket",
221 "Poller",
222 "ZMQEventLoop",
223 "install",
224]