Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/asyncio.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

89 statements  

1"""AsyncIO support for zmq 

2 

3Requires asyncio and Python 3. 

4""" 

5 

6# Copyright (c) PyZMQ Developers. 

7# Distributed under the terms of the Modified BSD License. 

8from __future__ import annotations 

9 

10import asyncio 

11import selectors 

12import sys 

13import warnings 

14from asyncio import Future, SelectorEventLoop 

15from weakref import WeakKeyDictionary 

16 

17import zmq as _zmq 

18from zmq import _future 

19 

20# registry of asyncio loop : selector thread 

21_selectors: WeakKeyDictionary = WeakKeyDictionary() 

22 

23 

24class ProactorSelectorThreadWarning(RuntimeWarning): 

25 """Warning class for notifying about the extra thread spawned by tornado 

26 

27 We automatically support proactor via tornado's AddThreadSelectorEventLoop""" 

28 

29 

30def _get_selector_windows( 

31 asyncio_loop, 

32) -> asyncio.AbstractEventLoop: 

33 """Get selector-compatible loop 

34 

35 Returns an object with ``add_reader`` family of methods, 

36 either the loop itself or a SelectorThread instance. 

37 

38 Workaround Windows proactor removal of 

39 *reader methods, which we need for zmq sockets. 

40 """ 

41 

42 if asyncio_loop in _selectors: 

43 return _selectors[asyncio_loop] 

44 

45 # detect add_reader instead of checking for proactor? 

46 if hasattr(asyncio, "ProactorEventLoop") and isinstance( 

47 asyncio_loop, 

48 asyncio.ProactorEventLoop, # type: ignore 

49 ): 

50 try: 

51 from tornado.platform.asyncio import AddThreadSelectorEventLoop 

52 except ImportError: 

53 raise RuntimeError( 

54 "Proactor event loop does not implement add_reader family of methods required for zmq." 

55 " zmq will work with proactor if tornado >= 6.1 can be found." 

56 " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`" 

57 " or install 'tornado>=6.1' to avoid this error." 

58 ) 

59 

60 warnings.warn( 

61 "Proactor event loop does not implement add_reader family of methods required for zmq." 

62 " Registering an additional selector thread for add_reader support via tornado." 

63 " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`" 

64 " to avoid this warning.", 

65 RuntimeWarning, 

66 # stacklevel 5 matches most likely zmq.asyncio.Context().socket() 

67 stacklevel=5, 

68 ) 

69 

70 selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop( 

71 asyncio_loop 

72 ) # type: ignore 

73 

74 # patch loop.close to also close the selector thread 

75 loop_close = asyncio_loop.close 

76 

77 def _close_selector_and_loop(): 

78 # restore original before calling selector.close, 

79 # which in turn calls eventloop.close! 

80 asyncio_loop.close = loop_close 

81 _selectors.pop(asyncio_loop, None) 

82 selector_loop.close() 

83 

84 asyncio_loop.close = _close_selector_and_loop # type: ignore # mypy bug - assign a function to method 

85 return selector_loop 

86 else: 

87 return asyncio_loop 

88 

89 

90def _get_selector_noop(loop) -> asyncio.AbstractEventLoop: 

91 """no-op on non-Windows""" 

92 return loop 

93 

94 

95if sys.platform == "win32": 

96 _get_selector = _get_selector_windows 

97else: 

98 _get_selector = _get_selector_noop 

99 

100 

101class _AsyncIO: 

102 _Future = Future 

103 _WRITE = selectors.EVENT_WRITE 

104 _READ = selectors.EVENT_READ 

105 

106 def _default_loop(self): 

107 try: 

108 return asyncio.get_running_loop() 

109 except RuntimeError: 

110 warnings.warn( 

111 "No running event loop. zmq.asyncio should be used from within an asyncio loop.", 

112 RuntimeWarning, 

113 stacklevel=4, 

114 ) 

115 # get_event_loop deprecated in 3.10: 

116 return asyncio.get_event_loop() 

117 

118 

119class Poller(_AsyncIO, _future._AsyncPoller): 

120 """Poller returning asyncio.Future for poll results.""" 

121 

122 def _watch_raw_socket(self, loop, socket, evt, f): 

123 """Schedule callback for a raw socket""" 

124 selector = _get_selector(loop) 

125 if evt & self._READ: 

126 selector.add_reader(socket, lambda *args: f()) 

127 if evt & self._WRITE: 

128 selector.add_writer(socket, lambda *args: f()) 

129 

130 def _unwatch_raw_sockets(self, loop, *sockets): 

131 """Unschedule callback for a raw socket""" 

132 selector = _get_selector(loop) 

133 for socket in sockets: 

134 selector.remove_reader(socket) 

135 selector.remove_writer(socket) 

136 

137 

138class Socket(_AsyncIO, _future._AsyncSocket): 

139 """Socket returning asyncio Futures for send/recv/poll methods.""" 

140 

141 _poller_class = Poller 

142 

143 def _get_selector(self, io_loop=None): 

144 if io_loop is None: 

145 io_loop = self._get_loop() 

146 return _get_selector(io_loop) 

147 

148 def _init_io_state(self, io_loop=None): 

149 """initialize the ioloop event handler""" 

150 self._get_selector(io_loop).add_reader( 

151 self._fd, lambda: self._handle_events(0, 0) 

152 ) 

153 

154 def _clear_io_state(self): 

155 """clear any ioloop event handler 

156 

157 called once at close 

158 """ 

159 loop = self._current_loop 

160 if loop and not loop.is_closed() and self._fd != -1: 

161 self._get_selector(loop).remove_reader(self._fd) 

162 

163 

164Poller._socket_class = Socket 

165 

166 

167class Context(_zmq.Context[Socket]): 

168 """Context for creating asyncio-compatible Sockets""" 

169 

170 _socket_class = Socket 

171 

172 # avoid sharing instance with base Context class 

173 _instance = None 

174 

175 # overload with no changes to satisfy pyright 

176 def __init__( 

177 self: Context, 

178 io_threads: int | _zmq.Context = 1, 

179 shadow: _zmq.Context | int = 0, 

180 ) -> None: 

181 super().__init__(io_threads, shadow) # type: ignore 

182 

183 

184class ZMQEventLoop(SelectorEventLoop): 

185 """DEPRECATED: AsyncIO eventloop using zmq_poll. 

186 

187 pyzmq sockets should work with any asyncio event loop as of pyzmq 17. 

188 """ 

189 

190 def __init__(self, selector=None): 

191 _deprecated() 

192 return super().__init__(selector) 

193 

194 

195_loop = None 

196 

197 

198def _deprecated(): 

199 if _deprecated.called: # type: ignore 

200 return 

201 _deprecated.called = True # type: ignore 

202 

203 warnings.warn( 

204 "ZMQEventLoop and zmq.asyncio.install are deprecated in pyzmq 17. Special eventloop integration is no longer needed.", 

205 DeprecationWarning, 

206 stacklevel=3, 

207 ) 

208 

209 

210_deprecated.called = False # type: ignore 

211 

212 

213def install(): 

214 """DEPRECATED: No longer needed in pyzmq 17""" 

215 _deprecated() 

216 

217 

218__all__ = [ 

219 "Context", 

220 "Socket", 

221 "Poller", 

222 "ZMQEventLoop", 

223 "install", 

224]