Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/asyncio.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

86 statements  

1"""AsyncIO support for zmq 

2 

3Requires asyncio and Python 3. 

4""" 

5 

6# Copyright (c) PyZMQ Developers. 

7# Distributed under the terms of the Modified BSD License. 

8from __future__ import annotations 

9 

10import asyncio 

11import selectors 

12import sys 

13import warnings 

14from asyncio import Future, SelectorEventLoop 

15from weakref import WeakKeyDictionary 

16 

17import zmq as _zmq 

18from zmq import _future 

19 

# registry of asyncio loop : selector loop created for it
# (keys are held weakly, via WeakKeyDictionary)
_selectors: WeakKeyDictionary = WeakKeyDictionary()

22 

23 

class ProactorSelectorThreadWarning(RuntimeWarning):
    """Warning category about the additional thread spawned by tornado.

    Proactor event loops are supported automatically through tornado's
    ``AddThreadSelectorEventLoop``.
    """

28 

29 

def _get_selector_windows(
    asyncio_loop,
) -> asyncio.AbstractEventLoop:
    """Get selector-compatible loop

    Returns an object with the ``add_reader`` family of methods:
    either ``asyncio_loop`` itself, or the tornado
    ``AddThreadSelectorEventLoop`` wrapping it (cached in ``_selectors``).

    Works around the Windows proactor loop lacking the
    *reader methods, which we need for zmq sockets.

    Raises RuntimeError if ``asyncio_loop`` is a proactor loop and
    tornado's ``AddThreadSelectorEventLoop`` cannot be imported.
    """

    # reuse the selector already registered for this loop, if any
    if asyncio_loop in _selectors:
        return _selectors[asyncio_loop]

    # detect add_reader instead of checking for proactor?
    if not (
        hasattr(asyncio, "ProactorEventLoop")
        and isinstance(asyncio_loop, asyncio.ProactorEventLoop)  # type: ignore
    ):
        # not a proactor loop: use the loop directly
        return asyncio_loop

    try:
        from tornado.platform.asyncio import AddThreadSelectorEventLoop
    except ImportError as err:
        raise RuntimeError(
            "Proactor event loop does not implement add_reader family of methods required for zmq."
            " zmq will work with proactor if tornado >= 6.1 can be found."
            " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`"
            " or install 'tornado>=6.1' to avoid this error."
        ) from err

    warnings.warn(
        "Proactor event loop does not implement add_reader family of methods required for zmq."
        " Registering an additional selector thread for add_reader support via tornado."
        " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`"
        " to avoid this warning.",
        # dedicated RuntimeWarning subclass: existing RuntimeWarning filters
        # still match, and this specific warning can be filtered on its own
        ProactorSelectorThreadWarning,
        # stacklevel 5 matches most likely zmq.asyncio.Context().socket()
        stacklevel=5,
    )

    selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop(
        asyncio_loop
    )  # type: ignore

    # patch loop.close to also close the selector thread
    loop_close = asyncio_loop.close

    def _close_selector_and_loop():
        # restore original before calling selector.close,
        # which in turn calls eventloop.close!
        asyncio_loop.close = loop_close
        _selectors.pop(asyncio_loop, None)
        selector_loop.close()

    asyncio_loop.close = _close_selector_and_loop  # type: ignore # mypy bug - assign a function to method
    return selector_loop

88 

89 

90def _get_selector_noop(loop) -> asyncio.AbstractEventLoop: 

91 """no-op on non-Windows""" 

92 return loop 

93 

94 

# pick the selector lookup for this platform once, at import time
_get_selector = (
    _get_selector_windows if sys.platform == "win32" else _get_selector_noop
)

99 

100 

101class _AsyncIO: 

102 _Future = Future 

103 _WRITE = selectors.EVENT_WRITE 

104 _READ = selectors.EVENT_READ 

105 

106 def _default_loop(self): 

107 try: 

108 return asyncio.get_running_loop() 

109 except RuntimeError: 

110 warnings.warn( 

111 "No running event loop. zmq.asyncio should be used from within an asyncio loop.", 

112 RuntimeWarning, 

113 stacklevel=4, 

114 ) 

115 # get_event_loop deprecated in 3.10: 

116 return asyncio.get_event_loop() 

117 

118 

class Poller(_AsyncIO, _future._AsyncPoller):
    """Poller whose poll results are returned as asyncio.Future."""

    def _watch_raw_socket(self, loop, socket, evt, f):
        """Register ``f`` as the callback for the events in ``evt`` on a raw socket"""
        selector = _get_selector(loop)

        def _fire(*_ignored):
            # whatever arguments the loop passes are dropped; f takes none
            return f()

        if evt & self._READ:
            selector.add_reader(socket, _fire)
        if evt & self._WRITE:
            selector.add_writer(socket, _fire)

    def _unwatch_raw_sockets(self, loop, *sockets):
        """Remove reader and writer callbacks for each raw socket"""
        selector = _get_selector(loop)
        for raw_socket in sockets:
            selector.remove_reader(raw_socket)
            selector.remove_writer(raw_socket)

136 

137 

class Socket(_AsyncIO, _future._AsyncSocket):
    """Socket whose send/recv/poll methods return asyncio Futures."""

    _poller_class = Poller

    def _get_selector(self, io_loop=None):
        """Return the selector-compatible loop for ``io_loop``.

        When ``io_loop`` is None, the loop from ``self._get_loop()`` is used.
        """
        loop = self._get_loop() if io_loop is None else io_loop
        return _get_selector(loop)

    def _init_io_state(self, io_loop=None):
        """register the reader callback for this socket's fd"""
        selector = self._get_selector(io_loop)
        selector.add_reader(self._fd, lambda: self._handle_events(0, 0))

    def _clear_io_state(self):
        """remove the reader callback for this socket's fd

        called once at close
        """
        loop = self._current_loop
        # nothing to unregister without a live loop and a valid fd
        if not loop or loop.is_closed() or self._fd == -1:
            return
        self._get_selector(loop).remove_reader(self._fd)

162 

163 

# Poller is defined before Socket, so its socket class is attached here,
# once Socket exists.
Poller._socket_class = Socket

165 

166 

class Context(_zmq.Context[Socket]):
    """Context that creates asyncio-compatible Sockets"""

    # own slot, so the instance is not shared with the base Context class
    _instance = None

    _socket_class = Socket

174 

175 

class ZMQEventLoop(SelectorEventLoop):
    """DEPRECATED: AsyncIO eventloop using zmq_poll.

    As of pyzmq 17, pyzmq sockets should work with any asyncio event loop.
    """

    def __init__(self, selector=None):
        # warn about the deprecation, then defer to SelectorEventLoop
        _deprecated()
        super().__init__(selector)

185 

186 

# NOTE(review): not referenced anywhere else in the visible module;
# presumably kept for backward compatibility - confirm before removing.
_loop = None

188 

189 

190def _deprecated(): 

191 if _deprecated.called: # type: ignore 

192 return 

193 _deprecated.called = True # type: ignore 

194 

195 warnings.warn( 

196 "ZMQEventLoop and zmq.asyncio.install are deprecated in pyzmq 17. Special eventloop integration is no longer needed.", 

197 DeprecationWarning, 

198 stacklevel=3, 

199 ) 

200 

201 

202_deprecated.called = False # type: ignore 

203 

204 

def install():
    """DEPRECATED: No longer needed in pyzmq 17

    Only triggers the one-time deprecation warning.
    """
    _deprecated()

208 

209 

# names exported by ``from zmq.asyncio import *``
__all__ = ["Context", "Socket", "Poller", "ZMQEventLoop", "install"]