Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/asyncio.py: 44%

86 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1"""AsyncIO support for zmq 

2 

3Requires asyncio and Python 3. 

4""" 

5 

6# Copyright (c) PyZMQ Developers. 

7# Distributed under the terms of the Modified BSD License. 

8 

9import asyncio 

10import selectors 

11import sys 

12import warnings 

13from asyncio import Future, SelectorEventLoop 

14from weakref import WeakKeyDictionary 

15 

16import zmq as _zmq 

17from zmq import _future 

18 

19# registry of asyncio loop : selector thread 

20_selectors: WeakKeyDictionary = WeakKeyDictionary() 

21 

22 

23class ProactorSelectorThreadWarning(RuntimeWarning): 

24 """Warning class for notifying about the extra thread spawned by tornado 

25 

26 We automatically support proactor via tornado's AddThreadSelectorEventLoop""" 

27 

28 

29def _get_selector_windows( 

30 asyncio_loop, 

31) -> asyncio.AbstractEventLoop: 

32 """Get selector-compatible loop 

33 

34 Returns an object with ``add_reader`` family of methods, 

35 either the loop itself or a SelectorThread instance. 

36 

37 Workaround Windows proactor removal of 

38 *reader methods, which we need for zmq sockets. 

39 """ 

40 

41 if asyncio_loop in _selectors: 

42 return _selectors[asyncio_loop] 

43 

44 # detect add_reader instead of checking for proactor? 

45 if hasattr(asyncio, "ProactorEventLoop") and isinstance( 

46 asyncio_loop, asyncio.ProactorEventLoop # type: ignore 

47 ): 

48 try: 

49 from tornado.platform.asyncio import AddThreadSelectorEventLoop 

50 except ImportError: 

51 raise RuntimeError( 

52 "Proactor event loop does not implement add_reader family of methods required for zmq." 

53 " zmq will work with proactor if tornado >= 6.1 can be found." 

54 " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`" 

55 " or install 'tornado>=6.1' to avoid this error." 

56 ) 

57 

58 warnings.warn( 

59 "Proactor event loop does not implement add_reader family of methods required for zmq." 

60 " Registering an additional selector thread for add_reader support via tornado." 

61 " Use `asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())`" 

62 " to avoid this warning.", 

63 RuntimeWarning, 

64 # stacklevel 5 matches most likely zmq.asyncio.Context().socket() 

65 stacklevel=5, 

66 ) 

67 

68 selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop(asyncio_loop) # type: ignore 

69 

70 # patch loop.close to also close the selector thread 

71 loop_close = asyncio_loop.close 

72 

73 def _close_selector_and_loop(): 

74 # restore original before calling selector.close, 

75 # which in turn calls eventloop.close! 

76 asyncio_loop.close = loop_close 

77 _selectors.pop(asyncio_loop, None) 

78 selector_loop.close() 

79 

80 asyncio_loop.close = _close_selector_and_loop # type: ignore # mypy bug - assign a function to method 

81 return selector_loop 

82 else: 

83 return asyncio_loop 

84 

85 

86def _get_selector_noop(loop) -> asyncio.AbstractEventLoop: 

87 """no-op on non-Windows""" 

88 return loop 

89 

90 

91if sys.platform == "win32": 

92 _get_selector = _get_selector_windows 

93else: 

94 _get_selector = _get_selector_noop 

95 

96 

97class _AsyncIO: 

98 _Future = Future 

99 _WRITE = selectors.EVENT_WRITE 

100 _READ = selectors.EVENT_READ 

101 

102 def _default_loop(self): 

103 if sys.version_info >= (3, 7): 

104 try: 

105 return asyncio.get_running_loop() 

106 except RuntimeError: 

107 warnings.warn( 

108 "No running event loop. zmq.asyncio should be used from within an asyncio loop.", 

109 RuntimeWarning, 

110 stacklevel=4, 

111 ) 

112 # get_event_loop deprecated in 3.10: 

113 return asyncio.get_event_loop() 

114 

115 

116class Poller(_AsyncIO, _future._AsyncPoller): 

117 """Poller returning asyncio.Future for poll results.""" 

118 

119 def _watch_raw_socket(self, loop, socket, evt, f): 

120 """Schedule callback for a raw socket""" 

121 selector = _get_selector(loop) 

122 if evt & self._READ: 

123 selector.add_reader(socket, lambda *args: f()) 

124 if evt & self._WRITE: 

125 selector.add_writer(socket, lambda *args: f()) 

126 

127 def _unwatch_raw_sockets(self, loop, *sockets): 

128 """Unschedule callback for a raw socket""" 

129 selector = _get_selector(loop) 

130 for socket in sockets: 

131 selector.remove_reader(socket) 

132 selector.remove_writer(socket) 

133 

134 

135class Socket(_AsyncIO, _future._AsyncSocket): 

136 """Socket returning asyncio Futures for send/recv/poll methods.""" 

137 

138 _poller_class = Poller 

139 

140 def _get_selector(self, io_loop=None): 

141 if io_loop is None: 

142 io_loop = self._get_loop() 

143 return _get_selector(io_loop) 

144 

145 def _init_io_state(self, io_loop=None): 

146 """initialize the ioloop event handler""" 

147 self._get_selector(io_loop).add_reader( 

148 self._fd, lambda: self._handle_events(0, 0) 

149 ) 

150 

151 def _clear_io_state(self): 

152 """clear any ioloop event handler 

153 

154 called once at close 

155 """ 

156 loop = self._current_loop 

157 if loop and not loop.is_closed() and self._fd != -1: 

158 self._get_selector(loop).remove_reader(self._fd) 

159 

160 

161Poller._socket_class = Socket 

162 

163 

164class Context(_zmq.Context[Socket]): 

165 """Context for creating asyncio-compatible Sockets""" 

166 

167 _socket_class = Socket 

168 

169 # avoid sharing instance with base Context class 

170 _instance = None 

171 

172 

173class ZMQEventLoop(SelectorEventLoop): 

174 """DEPRECATED: AsyncIO eventloop using zmq_poll. 

175 

176 pyzmq sockets should work with any asyncio event loop as of pyzmq 17. 

177 """ 

178 

179 def __init__(self, selector=None): 

180 _deprecated() 

181 return super().__init__(selector) 

182 

183 

184_loop = None 

185 

186 

187def _deprecated(): 

188 if _deprecated.called: # type: ignore 

189 return 

190 _deprecated.called = True # type: ignore 

191 

192 warnings.warn( 

193 "ZMQEventLoop and zmq.asyncio.install are deprecated in pyzmq 17. Special eventloop integration is no longer needed.", 

194 DeprecationWarning, 

195 stacklevel=3, 

196 ) 

197 

198 

199_deprecated.called = False # type: ignore 

200 

201 

202def install(): 

203 """DEPRECATED: No longer needed in pyzmq 17""" 

204 _deprecated() 

205 

206 

207__all__ = [ 

208 "Context", 

209 "Socket", 

210 "Poller", 

211 "ZMQEventLoop", 

212 "install", 

213]