Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/asyncio.py: 44%
86 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""AsyncIO support for zmq
3Requires asyncio and Python 3.
4"""
6# Copyright (c) PyZMQ Developers.
7# Distributed under the terms of the Modified BSD License.
9import asyncio
10import selectors
11import sys
12import warnings
13from asyncio import Future, SelectorEventLoop
14from weakref import WeakKeyDictionary
16import zmq as _zmq
17from zmq import _future
19# registry of asyncio loop : selector thread
20_selectors: WeakKeyDictionary = WeakKeyDictionary()
23class ProactorSelectorThreadWarning(RuntimeWarning):
24 """Warning class for notifying about the extra thread spawned by tornado
26 We automatically support proactor via tornado's AddThreadSelectorEventLoop"""
29def _get_selector_windows(
30 asyncio_loop,
31) -> asyncio.AbstractEventLoop:
32 """Get selector-compatible loop
34 Returns an object with ``add_reader`` family of methods,
35 either the loop itself or a SelectorThread instance.
37 Workaround Windows proactor removal of
38 *reader methods, which we need for zmq sockets.
39 """
41 if asyncio_loop in _selectors:
42 return _selectors[asyncio_loop]
44 # detect add_reader instead of checking for proactor?
45 if hasattr(asyncio, "ProactorEventLoop") and isinstance(
46 asyncio_loop, asyncio.ProactorEventLoop # type: ignore
47 ):
48 try:
49 from tornado.platform.asyncio import AddThreadSelectorEventLoop
50 except ImportError:
51 raise RuntimeError(
52 "Proactor event loop does not implement add_reader family of methods required for zmq."
53 " zmq will work with proactor if tornado >= 6.1 can be found."
54 " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`"
55 " or install 'tornado>=6.1' to avoid this error."
56 )
58 warnings.warn(
59 "Proactor event loop does not implement add_reader family of methods required for zmq."
60 " Registering an additional selector thread for add_reader support via tornado."
61 " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`"
62 " to avoid this warning.",
63 RuntimeWarning,
64 # stacklevel 5 matches most likely zmq.asyncio.Context().socket()
65 stacklevel=5,
66 )
68 selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop(asyncio_loop) # type: ignore
70 # patch loop.close to also close the selector thread
71 loop_close = asyncio_loop.close
73 def _close_selector_and_loop():
74 # restore original before calling selector.close,
75 # which in turn calls eventloop.close!
76 asyncio_loop.close = loop_close
77 _selectors.pop(asyncio_loop, None)
78 selector_loop.close()
80 asyncio_loop.close = _close_selector_and_loop # type: ignore # mypy bug - assign a function to method
81 return selector_loop
82 else:
83 return asyncio_loop
86def _get_selector_noop(loop) -> asyncio.AbstractEventLoop:
87 """no-op on non-Windows"""
88 return loop
91if sys.platform == "win32":
92 _get_selector = _get_selector_windows
93else:
94 _get_selector = _get_selector_noop
97class _AsyncIO:
98 _Future = Future
99 _WRITE = selectors.EVENT_WRITE
100 _READ = selectors.EVENT_READ
102 def _default_loop(self):
103 if sys.version_info >= (3, 7):
104 try:
105 return asyncio.get_running_loop()
106 except RuntimeError:
107 warnings.warn(
108 "No running event loop. zmq.asyncio should be used from within an asyncio loop.",
109 RuntimeWarning,
110 stacklevel=4,
111 )
112 # get_event_loop deprecated in 3.10:
113 return asyncio.get_event_loop()
116class Poller(_AsyncIO, _future._AsyncPoller):
117 """Poller returning asyncio.Future for poll results."""
119 def _watch_raw_socket(self, loop, socket, evt, f):
120 """Schedule callback for a raw socket"""
121 selector = _get_selector(loop)
122 if evt & self._READ:
123 selector.add_reader(socket, lambda *args: f())
124 if evt & self._WRITE:
125 selector.add_writer(socket, lambda *args: f())
127 def _unwatch_raw_sockets(self, loop, *sockets):
128 """Unschedule callback for a raw socket"""
129 selector = _get_selector(loop)
130 for socket in sockets:
131 selector.remove_reader(socket)
132 selector.remove_writer(socket)
135class Socket(_AsyncIO, _future._AsyncSocket):
136 """Socket returning asyncio Futures for send/recv/poll methods."""
138 _poller_class = Poller
140 def _get_selector(self, io_loop=None):
141 if io_loop is None:
142 io_loop = self._get_loop()
143 return _get_selector(io_loop)
145 def _init_io_state(self, io_loop=None):
146 """initialize the ioloop event handler"""
147 self._get_selector(io_loop).add_reader(
148 self._fd, lambda: self._handle_events(0, 0)
149 )
151 def _clear_io_state(self):
152 """clear any ioloop event handler
154 called once at close
155 """
156 loop = self._current_loop
157 if loop and not loop.is_closed() and self._fd != -1:
158 self._get_selector(loop).remove_reader(self._fd)
161Poller._socket_class = Socket
164class Context(_zmq.Context[Socket]):
165 """Context for creating asyncio-compatible Sockets"""
167 _socket_class = Socket
169 # avoid sharing instance with base Context class
170 _instance = None
173class ZMQEventLoop(SelectorEventLoop):
174 """DEPRECATED: AsyncIO eventloop using zmq_poll.
176 pyzmq sockets should work with any asyncio event loop as of pyzmq 17.
177 """
179 def __init__(self, selector=None):
180 _deprecated()
181 return super().__init__(selector)
184_loop = None
187def _deprecated():
188 if _deprecated.called: # type: ignore
189 return
190 _deprecated.called = True # type: ignore
192 warnings.warn(
193 "ZMQEventLoop and zmq.asyncio.install are deprecated in pyzmq 17. Special eventloop integration is no longer needed.",
194 DeprecationWarning,
195 stacklevel=3,
196 )
199_deprecated.called = False # type: ignore
202def install():
203 """DEPRECATED: No longer needed in pyzmq 17"""
204 _deprecated()
207__all__ = [
208 "Context",
209 "Socket",
210 "Poller",
211 "ZMQEventLoop",
212 "install",
213]