Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_client/channels.py: 32%
179 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""Base classes to manage a Client's interaction with a running kernel"""
2# Copyright (c) Jupyter Development Team.
3# Distributed under the terms of the Modified BSD License.
4import asyncio
5import atexit
6import time
7import typing as t
8from queue import Empty
9from threading import Event, Thread
11import zmq.asyncio
12from jupyter_core.utils import ensure_async
14from ._version import protocol_version_info
15from .channelsabc import HBChannelABC
16from .session import Session
18# import ZMQError in top-level namespace, to avoid ugly attribute-error messages
19# during garbage collection of threads at exit
21# -----------------------------------------------------------------------------
22# Constants and exceptions
23# -----------------------------------------------------------------------------
25major_protocol_version = protocol_version_info[0]
28class InvalidPortNumber(Exception): # noqa
29 """An exception raised for an invalid port number."""
31 pass
34class HBChannel(Thread):
35 """The heartbeat channel which monitors the kernel heartbeat.
37 Note that the heartbeat channel is paused by default. As long as you start
38 this channel, the kernel manager will ensure that it is paused and un-paused
39 as appropriate.
40 """
42 session = None
43 socket = None
44 address = None
45 _exiting = False
47 time_to_dead: float = 1.0
48 _running = None
49 _pause = None
50 _beating = None
52 def __init__(
53 self,
54 context: t.Optional[zmq.Context] = None,
55 session: t.Optional[Session] = None,
56 address: t.Union[t.Tuple[str, int], str] = "",
57 ):
58 """Create the heartbeat monitor thread.
60 Parameters
61 ----------
62 context : :class:`zmq.Context`
63 The ZMQ context to use.
64 session : :class:`session.Session`
65 The session to use.
66 address : zmq url
67 Standard (ip, port) tuple that the kernel is listening on.
68 """
69 super().__init__()
70 self.daemon = True
72 self.context = context
73 self.session = session
74 if isinstance(address, tuple):
75 if address[1] == 0:
76 message = "The port number for a channel cannot be 0."
77 raise InvalidPortNumber(message)
78 address_str = "tcp://%s:%i" % address
79 else:
80 address_str = address
81 self.address = address_str
83 # running is False until `.start()` is called
84 self._running = False
85 self._exit = Event()
86 # don't start paused
87 self._pause = False
88 self.poller = zmq.Poller()
90 @staticmethod
91 @atexit.register
92 def _notice_exit() -> None:
93 # Class definitions can be torn down during interpreter shutdown.
94 # We only need to set _exiting flag if this hasn't happened.
95 if HBChannel is not None:
96 HBChannel._exiting = True
98 def _create_socket(self) -> None:
99 if self.socket is not None:
100 # close previous socket, before opening a new one
101 self.poller.unregister(self.socket)
102 self.socket.close()
103 assert self.context is not None
104 self.socket = self.context.socket(zmq.REQ)
105 self.socket.linger = 1000
106 assert self.address is not None
107 self.socket.connect(self.address)
109 self.poller.register(self.socket, zmq.POLLIN)
111 async def _async_run(self) -> None:
112 """The thread's main activity. Call start() instead."""
113 self._create_socket()
114 self._running = True
115 self._beating = True
116 assert self.socket is not None
118 while self._running:
119 if self._pause:
120 # just sleep, and skip the rest of the loop
121 self._exit.wait(self.time_to_dead)
122 continue
124 since_last_heartbeat = 0.0
125 # no need to catch EFSM here, because the previous event was
126 # either a recv or connect, which cannot be followed by EFSM)
127 await ensure_async(self.socket.send(b"ping"))
128 request_time = time.time()
129 # Wait until timeout
130 self._exit.wait(self.time_to_dead)
131 # poll(0) means return immediately (see http://api.zeromq.org/2-1:zmq-poll)
132 self._beating = bool(self.poller.poll(0))
133 if self._beating:
134 # the poll above guarantees we have something to recv
135 await ensure_async(self.socket.recv())
136 continue
137 elif self._running:
138 # nothing was received within the time limit, signal heart failure
139 since_last_heartbeat = time.time() - request_time
140 self.call_handlers(since_last_heartbeat)
141 # and close/reopen the socket, because the REQ/REP cycle has been broken
142 self._create_socket()
143 continue
145 def run(self) -> None:
146 """Run the heartbeat thread."""
147 loop = asyncio.new_event_loop()
148 asyncio.set_event_loop(loop)
149 loop.run_until_complete(self._async_run())
150 loop.close()
152 def pause(self) -> None:
153 """Pause the heartbeat."""
154 self._pause = True
156 def unpause(self) -> None:
157 """Unpause the heartbeat."""
158 self._pause = False
160 def is_beating(self) -> bool:
161 """Is the heartbeat running and responsive (and not paused)."""
162 if self.is_alive() and not self._pause and self._beating: # noqa
163 return True
164 else:
165 return False
167 def stop(self) -> None:
168 """Stop the channel's event loop and join its thread."""
169 self._running = False
170 self._exit.set()
171 self.join()
172 self.close()
174 def close(self) -> None:
175 """Close the heartbeat thread."""
176 if self.socket is not None:
177 try:
178 self.socket.close(linger=0)
179 except Exception:
180 pass
181 self.socket = None
183 def call_handlers(self, since_last_heartbeat: float) -> None:
184 """This method is called in the ioloop thread when a message arrives.
186 Subclasses should override this method to handle incoming messages.
187 It is important to remember that this method is called in the thread
188 so that some logic must be done to ensure that the application level
189 handlers are called in the application thread.
190 """
191 pass
194HBChannelABC.register(HBChannel)
197class ZMQSocketChannel:
198 """A ZMQ socket wrapper"""
200 def __init__(self, socket: zmq.Socket, session: Session, loop: t.Any = None) -> None:
201 """Create a channel.
203 Parameters
204 ----------
205 socket : :class:`zmq.Socket`
206 The ZMQ socket to use.
207 session : :class:`session.Session`
208 The session to use.
209 loop
210 Unused here, for other implementations
211 """
212 super().__init__()
214 self.socket: t.Optional[zmq.Socket] = socket
215 self.session = session
217 def _recv(self, **kwargs: t.Any) -> t.Dict[str, t.Any]:
218 assert self.socket is not None
219 msg = self.socket.recv_multipart(**kwargs)
220 ident, smsg = self.session.feed_identities(msg)
221 return self.session.deserialize(smsg)
223 def get_msg(self, timeout: t.Optional[float] = None) -> t.Dict[str, t.Any]:
224 """Gets a message if there is one that is ready."""
225 assert self.socket is not None
226 if timeout is not None:
227 timeout *= 1000 # seconds to ms
228 ready = self.socket.poll(timeout)
229 if ready:
230 res = self._recv()
231 return res
232 else:
233 raise Empty
235 def get_msgs(self) -> t.List[t.Dict[str, t.Any]]:
236 """Get all messages that are currently ready."""
237 msgs = []
238 while True:
239 try:
240 msgs.append(self.get_msg())
241 except Empty:
242 break
243 return msgs
245 def msg_ready(self) -> bool:
246 """Is there a message that has been received?"""
247 assert self.socket is not None
248 return bool(self.socket.poll(timeout=0))
250 def close(self) -> None:
251 """Close the socket channel."""
252 if self.socket is not None:
253 try:
254 self.socket.close(linger=0)
255 except Exception:
256 pass
257 self.socket = None
259 stop = close
261 def is_alive(self) -> bool:
262 """Test whether the channel is alive."""
263 return self.socket is not None
265 def send(self, msg: t.Dict[str, t.Any]) -> None:
266 """Pass a message to the ZMQ socket to send"""
267 assert self.socket is not None
268 self.session.send(self.socket, msg)
270 def start(self) -> None:
271 """Start the socket channel."""
272 pass
275class AsyncZMQSocketChannel(ZMQSocketChannel):
276 """A ZMQ socket in an async API"""
278 socket: zmq.asyncio.Socket
280 def __init__(self, socket: zmq.asyncio.Socket, session: Session, loop: t.Any = None) -> None:
281 """Create a channel.
283 Parameters
284 ----------
285 socket : :class:`zmq.asyncio.Socket`
286 The ZMQ socket to use.
287 session : :class:`session.Session`
288 The session to use.
289 loop
290 Unused here, for other implementations
291 """
292 if not isinstance(socket, zmq.asyncio.Socket):
293 msg = 'Socket must be asyncio'
294 raise ValueError(msg)
295 super().__init__(socket, session)
297 async def _recv(self, **kwargs: t.Any) -> t.Dict[str, t.Any]: # type:ignore[override]
298 assert self.socket is not None
299 msg = await self.socket.recv_multipart(**kwargs)
300 _, smsg = self.session.feed_identities(msg)
301 return self.session.deserialize(smsg)
303 async def get_msg( # type:ignore[override]
304 self, timeout: t.Optional[float] = None
305 ) -> t.Dict[str, t.Any]:
306 """Gets a message if there is one that is ready."""
307 assert self.socket is not None
308 if timeout is not None:
309 timeout *= 1000 # seconds to ms
310 ready = await self.socket.poll(timeout)
311 if ready:
312 res = await self._recv()
313 return res
314 else:
315 raise Empty
317 async def get_msgs(self) -> t.List[t.Dict[str, t.Any]]: # type:ignore[override]
318 """Get all messages that are currently ready."""
319 msgs = []
320 while True:
321 try:
322 msgs.append(await self.get_msg())
323 except Empty:
324 break
325 return msgs
327 async def msg_ready(self) -> bool: # type:ignore[override]
328 """Is there a message that has been received?"""
329 assert self.socket is not None
330 return bool(await self.socket.poll(timeout=0))