1"""Base class to manage the interaction with a running kernel"""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5import asyncio
6import inspect
7import sys
8import time
9import typing as t
10from functools import partial
11from getpass import getpass
12from queue import Empty
13
14import zmq.asyncio
15from jupyter_core.utils import ensure_async
16from traitlets import Any, Bool, Instance, Type
17
18from .channels import major_protocol_version
19from .channelsabc import ChannelABC, HBChannelABC
20from .clientabc import KernelClientABC
21from .connect import ConnectionFileMixin
22from .session import Session
23
# Some utilities to validate message structure. These might get moved elsewhere
# if they prove to have more generic utility.
26
27
def validate_string_dict(dct: t.Dict[str, str]) -> None:
    """Validate that the input is a dict with string keys and values.

    Parameters
    ----------
    dct : dict
        The mapping to check. Every key and every value must be a ``str``.

    Raises
    ------
    ValueError
        If any key or value is not a string. The message contains the
        ``repr`` of the first offending key or value.
    """
    for k, v in dct.items():
        # f-string ``!r`` instead of ``"%r" % k``: the %-operator unpacks a
        # tuple operand, so a tuple key/value would raise TypeError (or be
        # silently unpacked) instead of producing the intended ValueError.
        if not isinstance(k, str):
            raise ValueError(f"key {k!r} in dict must be a string")
        if not isinstance(v, str):
            raise ValueError(f"value {v!r} in dict must be a string")
37
38
def reqrep(wrapped: t.Callable, meth: t.Callable, channel: str = "shell") -> t.Callable:
    """Wrap a request method and document the added ``reply``/``timeout`` arguments.

    Parameters
    ----------
    wrapped : callable
        Factory called as ``wrapped(meth, channel)``; its result is what this
        function returns.
    meth : callable
        The request method being wrapped. Its docstring, if any, is used as
        the base of the new docstring.
    channel : str
        Channel name forwarded to ``wrapped`` (default ``"shell"``).

    Returns
    -------
    callable
        The object produced by ``wrapped(meth, channel)``. When ``meth`` has a
        docstring, the result's ``__doc__`` is replaced by the text of
        ``meth.__doc__`` preceding its ``Returns`` section, followed by the
        documentation of ``reply``, ``timeout`` and the new return values.
    """
    wrapped = wrapped(meth, channel)
    if not meth.__doc__:
        # python -OO removes docstrings,
        # so don't bother building the wrapped docstring
        return wrapped

    # partition() keeps the whole docstring when there is no "Returns" section;
    # unpacking the result of split(..., 1) would raise ValueError in that case.
    basedoc, _, _ = meth.__doc__.partition("Returns\n")
    parts = [basedoc.strip()]
    if "Parameters" not in basedoc:
        parts.append(
            """
        Parameters
        ----------
        """
        )
    parts.append(
        """
        reply: bool (default: False)
            Whether to wait for and return reply
        timeout: float or None (default: None)
            Timeout to use when waiting for a reply

        Returns
        -------
        msg_id: str
            The msg_id of the request sent, if reply=False (default)
        reply: dict
            The reply message for this request, if reply=True
    """
    )
    wrapped.__doc__ = "\n".join(parts)
    return wrapped
72
73
class KernelClient(ConnectionFileMixin):
    """Communicates with a single kernel on any host via zmq channels.

    There are five channels associated with each kernel:

    * shell: for request/reply calls to the kernel.
    * iopub: for the kernel to publish results to frontends.
    * hb: for monitoring the kernel's heartbeat.
    * stdin: for frontends to reply to raw_input calls in the kernel.
    * control: for kernel management calls to the kernel.

    The messages that can be sent on these channels are exposed as methods of the
    client (KernelClient.execute, complete, history, etc.). These methods only
    send the message, they don't wait for a reply. To get results, use e.g.
    :meth:`get_shell_msg` to fetch messages from the shell channel.
    """

    # The PyZMQ Context to use for communication with the kernel.
    context = Instance(zmq.Context)

    # Set to True by ``_context_default`` when this client builds its own
    # context; only such a context is destroyed by ``__del__``/``stop_channels``.
    _created_context = Bool(False)

    def _context_default(self) -> zmq.Context:
        """Build a new zmq context and flag it as created by this client.

        NOTE(review): presumably invoked by traitlets as the dynamic default
        of ``context`` -- confirm against the traitlets documentation.
        """
        self._created_context = True
        return zmq.Context()

    # The classes to use for the various channels
    shell_channel_class = Type(ChannelABC)
    iopub_channel_class = Type(ChannelABC)
    stdin_channel_class = Type(ChannelABC)
    hb_channel_class = Type(HBChannelABC)
    control_channel_class = Type(ChannelABC)

    # Protected traits: lazily created channel objects (None until the
    # matching ``*_channel`` property is first read).
    _shell_channel = Any()
    _iopub_channel = Any()
    _stdin_channel = Any()
    _hb_channel = Any()
    _control_channel = Any()

    # flag for whether execute requests should be allowed to call raw_input:
    allow_stdin: bool = True

    def __del__(self) -> None:
        """Handle garbage collection. Destroy context if applicable.

        The context is destroyed only when this client created it, it is not
        already closed, and no channel is still running; otherwise a warning
        is logged and the context is left alone.
        """
        if (
            self._created_context
            and self.context is not None  # type:ignore[redundant-expr]
            and not self.context.closed
        ):
            if self.channels_running:
                if self.log:
                    self.log.warning("Could not destroy zmq context for %s", self)
            else:
                if self.log:
                    self.log.debug("Destroying zmq context for %s", self)
                self.context.destroy(linger=100)
        # Chain to a parent ``__del__`` only if one exists in the MRO.
        try:
            super_del = super().__del__  # type:ignore[misc]
        except AttributeError:
            pass
        else:
            super_del()

    # --------------------------------------------------------------------------
    # Channel proxy methods
    # --------------------------------------------------------------------------

    async def _async_get_shell_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]:
        """Get a message from the shell channel.

        All arguments are forwarded to the channel's ``get_msg``.
        """
        return await ensure_async(self.shell_channel.get_msg(*args, **kwargs))

    async def _async_get_iopub_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]:
        """Get a message from the iopub channel.

        All arguments are forwarded to the channel's ``get_msg``.
        """
        return await ensure_async(self.iopub_channel.get_msg(*args, **kwargs))

    async def _async_get_stdin_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]:
        """Get a message from the stdin channel.

        All arguments are forwarded to the channel's ``get_msg``.
        """
        return await ensure_async(self.stdin_channel.get_msg(*args, **kwargs))

    async def _async_get_control_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]:
        """Get a message from the control channel.

        All arguments are forwarded to the channel's ``get_msg``.
        """
        return await ensure_async(self.control_channel.get_msg(*args, **kwargs))

    async def _async_wait_for_ready(self, timeout: float | None = None) -> None:
        """Waits for a response when a client is blocked

        - Sets future time for timeout
        - Blocks on shell channel until a message is received
        - Exit if the kernel has died
        - If client times out before receiving a message from the kernel, send RuntimeError
        - Flush the IOPub channel

        Parameters
        ----------
        timeout : float or None
            Maximum number of seconds to wait; ``None`` means no limit.

        Raises
        ------
        RuntimeError
            If the kernel does not respond before the timeout elapses, or if
            it is found dead before replying to ``kernel_info``.
        """
        if timeout is None:
            timeout = float("inf")
        # Monotonic clock, as in ``_async_recv_reply``: the deadline must not
        # move when the wall clock is adjusted.
        deadline = time.monotonic() + timeout

        from .manager import KernelManager

        if not isinstance(self.parent, KernelManager):
            # This Client was not created by a KernelManager,
            # so wait for kernel to become responsive to heartbeats
            # before checking for kernel_info reply
            while not await self._async_is_alive():
                if time.monotonic() > deadline:
                    raise RuntimeError(
                        f"Kernel didn't respond to heartbeats in {timeout} seconds and timed out"
                    )
                await asyncio.sleep(0.2)

        # Wait for kernel info reply on shell channel
        while True:
            self.kernel_info()
            try:
                msg = await ensure_async(self.shell_channel.get_msg(timeout=1))
            except Empty:
                pass
            else:
                if msg["msg_type"] == "kernel_info_reply":
                    # Checking that IOPub is connected. If it is not connected, start over.
                    try:
                        await ensure_async(self.iopub_channel.get_msg(timeout=0.2))
                    except Empty:
                        pass
                    else:
                        self._handle_kernel_info_reply(msg)
                        break

            if not await self._async_is_alive():
                emsg = "Kernel died before replying to kernel_info"
                raise RuntimeError(emsg)

            # Check if current time is ready check time plus timeout
            if time.monotonic() > deadline:
                raise RuntimeError(f"Kernel didn't respond in {timeout} seconds")

        # Flush IOPub channel: discard messages until none arrives within 0.2s
        while True:
            try:
                await ensure_async(self.iopub_channel.get_msg(timeout=0.2))
            except Empty:
                break

    async def _async_recv_reply(
        self, msg_id: str, timeout: float | None = None, channel: str = "shell"
    ) -> t.Dict[str, t.Any]:
        """Receive and return the reply for a given request

        Parameters
        ----------
        msg_id : str
            The msg_id of the request whose reply is awaited.
        timeout : float or None
            Overall number of seconds to wait; ``None`` waits without limit.
        channel : str
            ``"control"`` reads from the control channel; any other value
            reads from the shell channel.

        Raises
        ------
        TimeoutError
            If no matching reply arrives before the timeout elapses.
        """
        if timeout is not None:
            deadline = time.monotonic() + timeout
        while True:
            if timeout is not None:
                # Shrink the per-call timeout so the overall deadline holds
                # even when unrelated replies are skipped.
                timeout = max(0, deadline - time.monotonic())
            try:
                if channel == "control":
                    reply = await self._async_get_control_msg(timeout=timeout)
                else:
                    reply = await self._async_get_shell_msg(timeout=timeout)
            except Empty as e:
                msg = "Timeout waiting for reply"
                raise TimeoutError(msg) from e
            if reply["parent_header"].get("msg_id") != msg_id:
                # not my reply, someone may have forgotten to retrieve theirs
                continue
            return reply

    async def _stdin_hook_default(self, msg: t.Dict[str, t.Any]) -> None:
        """Handle an input request

        Prompts with ``getpass`` when the request content has a truthy
        ``password`` entry and with ``input`` otherwise, then sends the
        answer back via :meth:`input`.
        """
        content = msg["content"]
        prompt = getpass if content.get("password", False) else input

        try:
            raw_data = prompt(content["prompt"])
        except EOFError:
            # turn EOFError into EOF character
            raw_data = "\x04"
        except KeyboardInterrupt:
            sys.stdout.write("\n")
            return

        # only send stdin reply if there *was not* another request
        # or execution finished while we were reading.
        # ensure_async (used for every other channel call in this class) lets
        # this work whether ``msg_ready`` is a coroutine function or a plain one.
        if not (
            await ensure_async(self.stdin_channel.msg_ready())
            or await ensure_async(self.shell_channel.msg_ready())
        ):
            self.input(raw_data)

    def _output_hook_default(self, msg: t.Dict[str, t.Any]) -> None:
        """Default hook for redisplaying plain-text output

        ``stream`` messages are written to the ``sys`` attribute named by the
        message, ``display_data``/``execute_result`` write their ``text/plain``
        entry to stdout, and ``error`` writes the traceback to stderr. Other
        message types are ignored.
        """
        msg_type = msg["header"]["msg_type"]
        content = msg["content"]
        if msg_type == "stream":
            stream = getattr(sys, content["name"])
            stream.write(content["text"])
        elif msg_type in ("display_data", "execute_result"):
            sys.stdout.write(content["data"].get("text/plain", ""))
        elif msg_type == "error":
            sys.stderr.write("\n".join(content["traceback"]))

    def _output_hook_kernel(
        self,
        session: Session,
        socket: zmq.sugar.socket.Socket,
        parent_header: t.Any,
        msg: t.Dict[str, t.Any],
    ) -> None:
        """Output hook when running inside an IPython kernel

        adds rich output support: ``display_data``, ``execute_result`` and
        ``error`` messages are re-sent on ``socket`` through ``session`` with
        ``parent_header`` as parent; everything else goes to the default hook.
        """
        msg_type = msg["header"]["msg_type"]
        if msg_type in ("display_data", "execute_result", "error"):
            session.send(socket, msg_type, msg["content"], parent=parent_header)
        else:
            self._output_hook_default(msg)

    # --------------------------------------------------------------------------
    # Channel management methods
    # --------------------------------------------------------------------------

    def start_channels(
        self,
        shell: bool = True,
        iopub: bool = True,
        stdin: bool = True,
        hb: bool = True,
        control: bool = True,
    ) -> None:
        """Starts the channels for this kernel.

        This will create the channels if they do not exist and then start
        them (their activity runs in a thread). If port numbers of 0 are
        being used (random ports) then you must first call
        :meth:`start_kernel`. If the channels have been stopped and you
        call this, :class:`RuntimeError` will be raised.

        Each flag selects whether the channel of the same name is started.
        ``allow_stdin`` is set to the value of ``stdin``.
        """
        if iopub:
            self.iopub_channel.start()
        if shell:
            self.shell_channel.start()
        if stdin:
            self.stdin_channel.start()
            self.allow_stdin = True
        else:
            self.allow_stdin = False
        if hb:
            self.hb_channel.start()
        if control:
            self.control_channel.start()

    def stop_channels(self) -> None:
        """Stops all the running channels for this kernel.

        This stops their event loops and joins their threads.
        The zmq context is destroyed afterwards if this client created it
        and it is not already closed.
        """
        if self.shell_channel.is_alive():
            self.shell_channel.stop()
        if self.iopub_channel.is_alive():
            self.iopub_channel.stop()
        if self.stdin_channel.is_alive():
            self.stdin_channel.stop()
        if self.hb_channel.is_alive():
            self.hb_channel.stop()
        if self.control_channel.is_alive():
            self.control_channel.stop()

        if self._created_context and not self.context.closed:
            self.context.destroy(linger=100)

    @property
    def channels_running(self) -> bool:
        """Are any of the channels created and running?"""
        # bool(): the bare and/or chain would return None (or whatever the
        # last evaluated operand is) rather than the annotated bool.
        return bool(
            (self._shell_channel and self.shell_channel.is_alive())
            or (self._iopub_channel and self.iopub_channel.is_alive())
            or (self._stdin_channel and self.stdin_channel.is_alive())
            or (self._hb_channel and self.hb_channel.is_alive())
            or (self._control_channel and self.control_channel.is_alive())
        )

    ioloop = None  # Overridden in subclasses that use pyzmq event loop

    @property
    def shell_channel(self) -> t.Any:
        """Get the shell channel object for this kernel.

        The channel is created on first access and cached.
        """
        if self._shell_channel is None:
            url = self._make_url("shell")
            self.log.debug("connecting shell channel to %s", url)
            socket = self.connect_shell(identity=self.session.bsession)
            self._shell_channel = self.shell_channel_class(  # type:ignore[call-arg,abstract]
                socket, self.session, self.ioloop
            )
        return self._shell_channel

    @property
    def iopub_channel(self) -> t.Any:
        """Get the iopub channel object for this kernel.

        The channel is created on first access and cached.
        """
        if self._iopub_channel is None:
            url = self._make_url("iopub")
            self.log.debug("connecting iopub channel to %s", url)
            socket = self.connect_iopub()
            self._iopub_channel = self.iopub_channel_class(  # type:ignore[call-arg,abstract]
                socket, self.session, self.ioloop
            )
        return self._iopub_channel

    @property
    def stdin_channel(self) -> t.Any:
        """Get the stdin channel object for this kernel.

        The channel is created on first access and cached.
        """
        if self._stdin_channel is None:
            url = self._make_url("stdin")
            self.log.debug("connecting stdin channel to %s", url)
            socket = self.connect_stdin(identity=self.session.bsession)
            self._stdin_channel = self.stdin_channel_class(  # type:ignore[call-arg,abstract]
                socket, self.session, self.ioloop
            )
        return self._stdin_channel

    @property
    def hb_channel(self) -> t.Any:
        """Get the hb channel object for this kernel.

        The channel is created on first access and cached. Unlike the other
        channels it is built from the context and the url, not from a socket.
        """
        if self._hb_channel is None:
            url = self._make_url("hb")
            self.log.debug("connecting heartbeat channel to %s", url)
            self._hb_channel = self.hb_channel_class(  # type:ignore[call-arg,abstract]
                self.context, self.session, url
            )
        return self._hb_channel

    @property
    def control_channel(self) -> t.Any:
        """Get the control channel object for this kernel.

        The channel is created on first access and cached.
        """
        if self._control_channel is None:
            url = self._make_url("control")
            self.log.debug("connecting control channel to %s", url)
            socket = self.connect_control(identity=self.session.bsession)
            self._control_channel = self.control_channel_class(  # type:ignore[call-arg,abstract]
                socket, self.session, self.ioloop
            )
        return self._control_channel

    async def _async_is_alive(self) -> bool:
        """Is the kernel process still running?"""
        from .manager import KernelManager

        if isinstance(self.parent, KernelManager):
            # This KernelClient was created by a KernelManager,
            # we can ask the parent KernelManager:
            return await self.parent._async_is_alive()
        if self._hb_channel is not None:
            # We don't have access to the KernelManager,
            # so we use the heartbeat.
            return self._hb_channel.is_beating()
        # no heartbeat and not local, we can't tell if it's running,
        # so naively return True
        return True

    async def _async_execute_interactive(
        self,
        code: str,
        silent: bool = False,
        store_history: bool = True,
        user_expressions: t.Dict[str, t.Any] | None = None,
        allow_stdin: bool | None = None,
        stop_on_error: bool = True,
        timeout: float | None = None,
        output_hook: t.Callable | None = None,
        stdin_hook: t.Callable | None = None,
    ) -> t.Dict[str, t.Any]:
        """Execute code in the kernel interactively

        Output will be redisplayed, and stdin prompts will be relayed as well.
        If an IPython kernel is detected, rich output will be displayed.

        You can pass a custom output_hook callable that will be called
        with every IOPub message that is produced instead of the default redisplay.

        .. versionadded:: 5.0

        Parameters
        ----------
        code : str
            A string of code in the kernel's language.

        silent : bool, optional (default False)
            If set, the kernel will execute the code as quietly possible, and
            will force store_history to be False.

        store_history : bool, optional (default True)
            If set, the kernel will store command history. This is forced
            to be False if silent is True.

        user_expressions : dict, optional
            A dict mapping names to expressions to be evaluated in the user's
            dict. The expression values are returned as strings formatted using
            :func:`repr`.

        allow_stdin : bool, optional (default self.allow_stdin)
            Flag for whether the kernel can send stdin requests to frontends.

            Some frontends (e.g. the Notebook) do not support stdin requests.
            If raw_input is called from code executed from such a frontend, a
            StdinNotImplementedError will be raised.

        stop_on_error: bool, optional (default True)
            Flag whether to abort the execution queue, if an exception is encountered.

        timeout: float or None (default: None)
            Timeout to use when waiting for a reply

        output_hook: callable(msg)
            Function to be called with output messages.
            If not specified, output will be redisplayed.

        stdin_hook: callable(msg)
            Function or awaitable to be called with stdin_request messages.
            If not specified, input/getpass will be called.

        Returns
        -------
        reply: dict
            The reply message for this request

        Raises
        ------
        RuntimeError
            If the IOPub channel is not running, or if stdin is allowed but
            the stdin channel is not running.
        TimeoutError
            If no output or reply arrives before the timeout elapses.
        """
        if not self.iopub_channel.is_alive():
            emsg = "IOPub channel must be running to receive output"
            raise RuntimeError(emsg)
        if allow_stdin is None:
            allow_stdin = self.allow_stdin
        if allow_stdin and not self.stdin_channel.is_alive():
            emsg = "stdin channel must be running to allow input"
            raise RuntimeError(emsg)
        msg_id = await ensure_async(
            self.execute(
                code,
                silent=silent,
                store_history=store_history,
                user_expressions=user_expressions,
                allow_stdin=allow_stdin,
                stop_on_error=stop_on_error,
            )
        )
        if stdin_hook is None:
            stdin_hook = self._stdin_hook_default
        # detect IPython kernel
        if output_hook is None and "IPython" in sys.modules:
            from IPython import get_ipython

            ip = get_ipython()  # type:ignore[no-untyped-call]
            in_kernel = getattr(ip, "kernel", False)
            if in_kernel:
                output_hook = partial(
                    self._output_hook_kernel,
                    ip.display_pub.session,
                    ip.display_pub.pub_socket,
                    ip.display_pub.parent_header,
                )
        if output_hook is None:
            # default: redisplay plain-text outputs
            output_hook = self._output_hook_default

        # set deadline based on timeout
        if timeout is not None:
            deadline = time.monotonic() + timeout
        else:
            # no timeout: the poll timeout stays None for the whole loop
            timeout_ms = None

        poller = zmq.asyncio.Poller()
        iopub_socket = self.iopub_channel.socket
        poller.register(iopub_socket, zmq.POLLIN)
        if allow_stdin:
            stdin_socket = self.stdin_channel.socket
            poller.register(stdin_socket, zmq.POLLIN)
        else:
            stdin_socket = None

        # wait for output and redisplay it
        while True:
            if timeout is not None:
                # remaining time until the overall deadline, in milliseconds
                timeout = max(0, deadline - time.monotonic())
                timeout_ms = int(1000 * timeout)
            events = dict(await poller.poll(timeout_ms))
            if not events:
                emsg = "Timeout waiting for output"
                raise TimeoutError(emsg)
            if stdin_socket in events:
                req = await ensure_async(self.stdin_channel.get_msg(timeout=0))
                res = stdin_hook(req)
                # the hook may be a plain function or return an awaitable
                if inspect.isawaitable(res):
                    await res
                continue
            if iopub_socket not in events:
                continue

            msg = await ensure_async(self.iopub_channel.get_msg(timeout=0))

            if msg["parent_header"].get("msg_id") != msg_id:
                # not from my request
                continue
            output_hook(msg)

            # stop on idle
            if (
                msg["header"]["msg_type"] == "status"
                and msg["content"]["execution_state"] == "idle"
            ):
                break

        # output is done, get the reply
        if timeout is not None:
            timeout = max(0, deadline - time.monotonic())
        return await self._async_recv_reply(msg_id, timeout=timeout)

    # Methods to send specific messages on channels
    def execute(
        self,
        code: str,
        silent: bool = False,
        store_history: bool = True,
        user_expressions: t.Dict[str, t.Any] | None = None,
        allow_stdin: bool | None = None,
        stop_on_error: bool = True,
    ) -> str:
        """Execute code in the kernel.

        Parameters
        ----------
        code : str
            A string of code in the kernel's language.

        silent : bool, optional (default False)
            If set, the kernel will execute the code as quietly possible, and
            will force store_history to be False.

        store_history : bool, optional (default True)
            If set, the kernel will store command history. This is forced
            to be False if silent is True.

        user_expressions : dict, optional
            A dict mapping names to expressions to be evaluated in the user's
            dict. The expression values are returned as strings formatted using
            :func:`repr`.

        allow_stdin : bool, optional (default self.allow_stdin)
            Flag for whether the kernel can send stdin requests to frontends.

            Some frontends (e.g. the Notebook) do not support stdin requests.
            If raw_input is called from code executed from such a frontend, a
            StdinNotImplementedError will be raised.

        stop_on_error: bool, optional (default True)
            Flag whether to abort the execution queue, if an exception is encountered.

        Returns
        -------
        The msg_id of the message sent.

        Raises
        ------
        ValueError
            If ``code`` is not a string, or ``user_expressions`` contains a
            non-string key or value.
        """
        if user_expressions is None:
            user_expressions = {}
        if allow_stdin is None:
            allow_stdin = self.allow_stdin

        # Don't waste network traffic if inputs are invalid
        if not isinstance(code, str):
            # f-string ``!r`` rather than ``"%r" % code``: the %-operator would
            # unpack a tuple and raise TypeError instead of this ValueError.
            raise ValueError(f"code {code!r} must be a string")
        validate_string_dict(user_expressions)

        # Create class for content/msg creation. Related to, but possibly
        # not in Session.
        content = {
            "code": code,
            "silent": silent,
            "store_history": store_history,
            "user_expressions": user_expressions,
            "allow_stdin": allow_stdin,
            "stop_on_error": stop_on_error,
        }
        msg = self.session.msg("execute_request", content)
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def complete(self, code: str, cursor_pos: int | None = None) -> str:
        """Tab complete text in the kernel's namespace.

        Parameters
        ----------
        code : str
            The context in which completion is requested.
            Can be anything between a variable name and an entire cell.
        cursor_pos : int, optional
            The position of the cursor in the block of code where the completion was requested.
            Default: ``len(code)``

        Returns
        -------
        The msg_id of the message sent.
        """
        if cursor_pos is None:
            cursor_pos = len(code)
        content = {"code": code, "cursor_pos": cursor_pos}
        msg = self.session.msg("complete_request", content)
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def inspect(self, code: str, cursor_pos: int | None = None, detail_level: int = 0) -> str:
        """Get metadata information about an object in the kernel's namespace.

        It is up to the kernel to determine the appropriate object to inspect.

        Parameters
        ----------
        code : str
            The context in which info is requested.
            Can be anything between a variable name and an entire cell.
        cursor_pos : int, optional
            The position of the cursor in the block of code where the info was requested.
            Default: ``len(code)``
        detail_level : int, optional
            The level of detail for the introspection (0-2)

        Returns
        -------
        The msg_id of the message sent.
        """
        if cursor_pos is None:
            cursor_pos = len(code)
        content = {
            "code": code,
            "cursor_pos": cursor_pos,
            "detail_level": detail_level,
        }
        msg = self.session.msg("inspect_request", content)
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def history(
        self,
        raw: bool = True,
        output: bool = False,
        hist_access_type: str = "range",
        **kwargs: t.Any,
    ) -> str:
        """Get entries from the kernel's history list.

        Parameters
        ----------
        raw : bool
            If True, return the raw input.
        output : bool
            If True, then return the output as well.
        hist_access_type : str
            'range' (fill in session, start and stop params), 'tail' (fill in n)
            or 'search' (fill in pattern param).

        session : int
            For a range request, the session from which to get lines. Session
            numbers are positive integers; negative ones count back from the
            current session.
        start : int
            The first line number of a history range.
        stop : int
            The final (excluded) line number of a history range.

        n : int
            The number of lines of history to get for a tail request.

        pattern : str
            The glob-syntax pattern for a search request.

        Returns
        -------
        The ID of the message sent.
        """
        if hist_access_type == "range":
            # a range request defaults to session 0, starting at line 0
            kwargs.setdefault("session", 0)
            kwargs.setdefault("start", 0)
        content = dict(raw=raw, output=output, hist_access_type=hist_access_type, **kwargs)
        msg = self.session.msg("history_request", content)
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def kernel_info(self) -> str:
        """Request kernel info

        Returns
        -------
        The msg_id of the message sent
        """
        msg = self.session.msg("kernel_info_request")
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def comm_info(self, target_name: str | None = None) -> str:
        """Request comm info

        Parameters
        ----------
        target_name : str, optional
            If given, it is sent as ``target_name`` in the request content;
            otherwise the request content is empty.

        Returns
        -------
        The msg_id of the message sent
        """
        content = {} if target_name is None else {"target_name": target_name}
        msg = self.session.msg("comm_info_request", content)
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def _handle_kernel_info_reply(self, msg: t.Dict[str, t.Any]) -> None:
        """handle kernel info reply

        sets protocol adaptation version. This might
        be run from a separate thread.
        """
        # major component of the kernel's "X.Y[.Z]" protocol version string
        adapt_version = int(msg["content"]["protocol_version"].split(".")[0])
        if adapt_version != major_protocol_version:
            self.session.adapt_version = adapt_version

    def is_complete(self, code: str) -> str:
        """Ask the kernel whether some code is complete and ready to execute.

        Returns
        -------
        The ID of the message sent.
        """
        msg = self.session.msg("is_complete_request", {"code": code})
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def input(self, string: str) -> None:
        """Send a string of raw input to the kernel.

        This should only be called in response to the kernel sending an
        ``input_request`` message on the stdin channel.

        Parameters
        ----------
        string : str
            The text sent as the ``value`` of the ``input_reply`` message.

        Returns
        -------
        None
        """
        content = {"value": string}
        msg = self.session.msg("input_reply", content)
        self.stdin_channel.send(msg)

    def shutdown(self, restart: bool = False) -> str:
        """Request an immediate kernel shutdown on the control channel.

        Upon receipt of the (empty) reply, client code can safely assume that
        the kernel has shut down and it's safe to forcefully terminate it if
        it's still alive.

        The kernel will send the reply via a function registered with Python's
        atexit module, ensuring it's truly done as the kernel is done with all
        normal operation.

        Parameters
        ----------
        restart : bool
            Sent as the ``restart`` field of the ``shutdown_request``.

        Returns
        -------
        The msg_id of the message sent
        """
        # Send quit message to kernel. Once we implement kernel-side setattr,
        # this should probably be done that way, but for now this will do.
        msg = self.session.msg("shutdown_request", {"restart": restart})
        self.control_channel.send(msg)
        return msg["header"]["msg_id"]
829
830
# Declare KernelClient to KernelClientABC via its ``register`` hook.
# NOTE(review): presumably the standard ``abc`` virtual-subclass registration,
# making isinstance/issubclass checks against KernelClientABC succeed -- confirm.
KernelClientABC.register(KernelClient)