1"""Base class to manage the interaction with a running kernel"""
2# Copyright (c) Jupyter Development Team.
3# Distributed under the terms of the Modified BSD License.
4import asyncio
5import inspect
6import sys
7import time
8import typing as t
9from functools import partial
10from getpass import getpass
11from queue import Empty
12
13import zmq.asyncio
14from jupyter_core.utils import ensure_async
15from traitlets import Any, Bool, Instance, Type
16
17from .channels import major_protocol_version
18from .channelsabc import ChannelABC, HBChannelABC
19from .clientabc import KernelClientABC
20from .connect import ConnectionFileMixin
21from .session import Session
22
23# some utilities to validate message structure, these might get moved elsewhere
24# if they prove to have more generic utility
25
26
def validate_string_dict(dct: t.Dict[str, str]) -> None:
    """Validate that the input is a dict with string keys and values.

    Parameters
    ----------
    dct : dict
        The mapping to check. Every key and every value must be a ``str``.

    Raises
    ------
    ValueError
        If any key or value is not a string. The ``repr`` of the offending
        object is included in the message.
    """
    for k, v in dct.items():
        if not isinstance(k, str):
            # Use an f-string with ``!r`` rather than ``"%r" % k``: the
            # %-operator treats a tuple operand as an argument list, so a
            # tuple key would raise TypeError (or be silently unpacked)
            # instead of producing the documented ValueError.
            raise ValueError(f"key {k!r} in dict must be a string")
        if not isinstance(v, str):
            raise ValueError(f"value {v!r} in dict must be a string")
36
37
def reqrep(wrapped: t.Callable, meth: t.Callable, channel: str = "shell") -> t.Callable:
    """Wrap a request method and document the ``reply``/``timeout`` options.

    Parameters
    ----------
    wrapped : callable
        Factory invoked as ``wrapped(meth, channel)``; whatever it returns is
        the object returned by this function.
    meth : callable
        The request method being wrapped. Its docstring (if any) is used as
        the basis for the wrapper's docstring.
    channel : str (default: "shell")
        Passed through unchanged to ``wrapped``.

    Returns
    -------
    callable
        The result of ``wrapped(meth, channel)``. When ``meth`` has a
        docstring, the result's ``__doc__`` is set to that docstring with its
        "Returns" section replaced by one describing the ``reply`` and
        ``timeout`` parameters and the two possible return values.
    """
    wrapper = wrapped(meth, channel)
    source_doc = meth.__doc__
    if not source_doc:
        # python -OO removes docstrings,
        # so don't bother building the wrapped docstring
        return wrapper

    # Keep everything before the "Returns" section.  ``partition`` (unlike
    # ``split`` + tuple unpacking) does not raise when the marker is absent;
    # in that case the whole docstring is kept as the base.
    basedoc = source_doc.partition("Returns\n")[0]
    parts = [basedoc.strip()]
    if "Parameters" not in basedoc:
        parts.append(
            """
        Parameters
        ----------
        """
        )
    parts.append(
        """
        reply: bool (default: False)
            Whether to wait for and return reply
        timeout: float or None (default: None)
            Timeout to use when waiting for a reply

        Returns
        -------
        msg_id: str
            The msg_id of the request sent, if reply=False (default)
        reply: dict
            The reply message for this request, if reply=True
    """
    )
    wrapper.__doc__ = "\n".join(parts)
    return wrapper
71
72
class KernelClient(ConnectionFileMixin):
    """Communicates with a single kernel on any host via zmq channels.

    There are five channels associated with each kernel:

    * shell: for request/reply calls to the kernel.
    * iopub: for the kernel to publish results to frontends.
    * hb: for monitoring the kernel's heartbeat.
    * stdin: for frontends to reply to raw_input calls in the kernel.
    * control: for kernel management calls to the kernel.

    The messages that can be sent on these channels are exposed as methods of the
    client (KernelClient.execute, complete, history, etc.). These methods only
    send the message, they don't wait for a reply. To get results, use e.g.
    :meth:`get_shell_msg` to fetch messages from the shell channel.
    """

    # The PyZMQ Context to use for communication with the kernel.
    context = Instance(zmq.Context)

    # True only when this client created ``context`` itself (see
    # ``_context_default``); ``__del__`` destroys the context only in that case.
    _created_context = Bool(False)

    def _context_default(self) -> zmq.Context:
        """Create a new zmq context and record that this client created it."""
        self._created_context = True
        return zmq.Context()

    # The classes to use for the various channels
    shell_channel_class = Type(ChannelABC)
    iopub_channel_class = Type(ChannelABC)
    stdin_channel_class = Type(ChannelABC)
    hb_channel_class = Type(HBChannelABC)
    control_channel_class = Type(ChannelABC)

    # Protected traits
    # Lazily-created channel objects; populated by the matching properties below.
    _shell_channel = Any()
    _iopub_channel = Any()
    _stdin_channel = Any()
    _hb_channel = Any()
    _control_channel = Any()

    # flag for whether execute requests should be allowed to call raw_input:
    allow_stdin: bool = True

    def __del__(self) -> None:
        """Handle garbage collection.  Destroy context if applicable."""
        if (
            self._created_context
            and self.context is not None  # type:ignore[redundant-expr]
            and not self.context.closed
        ):
            if self.channels_running:
                # Channels are still alive, so the context is left alone.
                if self.log:
                    self.log.warning("Could not destroy zmq context for %s", self)
            else:
                if self.log:
                    self.log.debug("Destroying zmq context for %s", self)
                self.context.destroy()
        # Chain to a parent ``__del__`` only if one exists in the MRO.
        try:
            super_del = super().__del__  # type:ignore[misc]
        except AttributeError:
            pass
        else:
            super_del()

    # --------------------------------------------------------------------------
    # Channel proxy methods
    # --------------------------------------------------------------------------

    async def _async_get_shell_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]:
        """Get a message from the shell channel"""
        return await ensure_async(self.shell_channel.get_msg(*args, **kwargs))

    async def _async_get_iopub_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]:
        """Get a message from the iopub channel"""
        return await ensure_async(self.iopub_channel.get_msg(*args, **kwargs))

    async def _async_get_stdin_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]:
        """Get a message from the stdin channel"""
        return await ensure_async(self.stdin_channel.get_msg(*args, **kwargs))

    async def _async_get_control_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]:
        """Get a message from the control channel"""
        return await ensure_async(self.control_channel.get_msg(*args, **kwargs))

    async def _async_wait_for_ready(self, timeout: t.Optional[float] = None) -> None:
        """Waits for a response when a client is blocked

        - Sets future time for timeout
        - Blocks on shell channel until a message is received
        - Exit if the kernel has died
        - If client times out before receiving a message from the kernel, send RuntimeError
        - Flush the IOPub channel

        Parameters
        ----------
        timeout : float or None (default: None)
            Maximum number of seconds to wait; ``None`` means wait indefinitely.

        Raises
        ------
        RuntimeError
            If the kernel does not respond before the timeout expires, or if
            it is found not to be alive while waiting for the kernel_info reply.
        """
        if timeout is None:
            timeout = float("inf")
        # Use the monotonic clock (as _async_recv_reply and
        # _async_execute_interactive do) so that wall-clock adjustments cannot
        # shorten or extend the wait.
        deadline = time.monotonic() + timeout

        from .manager import KernelManager

        if not isinstance(self.parent, KernelManager):
            # This Client was not created by a KernelManager,
            # so wait for kernel to become responsive to heartbeats
            # before checking for kernel_info reply
            while not await self._async_is_alive():
                if time.monotonic() > deadline:
                    raise RuntimeError(
                        "Kernel didn't respond to heartbeats in %d seconds and timed out" % timeout
                    )
                await asyncio.sleep(0.2)

        # Wait for kernel info reply on shell channel
        while True:
            self.kernel_info()
            try:
                msg = await ensure_async(self.shell_channel.get_msg(timeout=1))
            except Empty:
                pass
            else:
                if msg["msg_type"] == "kernel_info_reply":
                    # Checking that IOPub is connected. If it is not connected, start over.
                    try:
                        await ensure_async(self.iopub_channel.get_msg(timeout=0.2))
                    except Empty:
                        pass
                    else:
                        self._handle_kernel_info_reply(msg)
                        break

            if not await self._async_is_alive():
                emsg = "Kernel died before replying to kernel_info"
                raise RuntimeError(emsg)

            # Check if current time is ready check time plus timeout
            if time.monotonic() > deadline:
                raise RuntimeError("Kernel didn't respond in %d seconds" % timeout)

        # Flush IOPub channel: discard messages until none arrives within 0.2s
        while True:
            try:
                await ensure_async(self.iopub_channel.get_msg(timeout=0.2))
            except Empty:
                break

    async def _async_recv_reply(
        self, msg_id: str, timeout: t.Optional[float] = None, channel: str = "shell"
    ) -> t.Dict[str, t.Any]:
        """Receive and return the reply for a given request

        Parameters
        ----------
        msg_id : str
            The msg_id of the request whose reply is wanted.
        timeout : float or None (default: None)
            Total number of seconds to wait; ``None`` means no deadline.
        channel : str (default: "shell")
            ``"control"`` reads from the control channel; any other value
            reads from the shell channel.

        Raises
        ------
        TimeoutError
            If the channel raises ``Empty`` before a matching reply arrives.
        """
        if timeout is not None:
            deadline = time.monotonic() + timeout
        while True:
            if timeout is not None:
                # Remaining budget, so skipped replies don't extend the wait.
                timeout = max(0, deadline - time.monotonic())
            try:
                if channel == "control":
                    reply = await self._async_get_control_msg(timeout=timeout)
                else:
                    reply = await self._async_get_shell_msg(timeout=timeout)
            except Empty as e:
                msg = "Timeout waiting for reply"
                raise TimeoutError(msg) from e
            if reply["parent_header"].get("msg_id") != msg_id:
                # not my reply, someone may have forgotten to retrieve theirs
                continue
            return reply

    async def _stdin_hook_default(self, msg: t.Dict[str, t.Any]) -> None:
        """Handle an input request

        Prompts with ``getpass`` when the request's content has a truthy
        ``password`` entry, otherwise with ``input``, and sends the result
        back with :meth:`input`.
        """
        content = msg["content"]
        prompt = getpass if content.get("password", False) else input

        try:
            raw_data = prompt(content["prompt"])  # type:ignore[operator]
        except EOFError:
            # turn EOFError into EOF character
            raw_data = "\x04"
        except KeyboardInterrupt:
            sys.stdout.write("\n")
            return

        # only send stdin reply if there *was not* another request
        # or execution finished while we were reading.
        # ``msg_ready`` goes through ensure_async, like every other channel
        # call in this class, so that both coroutine and plain (synchronous)
        # channel implementations are handled.
        if not (
            await ensure_async(self.stdin_channel.msg_ready())
            or await ensure_async(self.shell_channel.msg_ready())
        ):
            self.input(raw_data)

    def _output_hook_default(self, msg: t.Dict[str, t.Any]) -> None:
        """Default hook for redisplaying plain-text output"""
        msg_type = msg["header"]["msg_type"]
        content = msg["content"]
        if msg_type == "stream":
            # content["name"] selects the attribute of ``sys`` to write to
            stream = getattr(sys, content["name"])
            stream.write(content["text"])
        elif msg_type in ("display_data", "execute_result"):
            sys.stdout.write(content["data"].get("text/plain", ""))
        elif msg_type == "error":
            sys.stderr.write("\n".join(content["traceback"]))

    def _output_hook_kernel(
        self,
        session: Session,
        socket: zmq.sugar.socket.Socket,
        parent_header: t.Any,
        msg: t.Dict[str, t.Any],
    ) -> None:
        """Output hook when running inside an IPython kernel

        adds rich output support.

        ``display_data``, ``execute_result`` and ``error`` messages are re-sent
        on ``socket`` via ``session`` with ``parent_header`` as parent; all
        other messages go to :meth:`_output_hook_default`.
        """
        msg_type = msg["header"]["msg_type"]
        if msg_type in ("display_data", "execute_result", "error"):
            session.send(socket, msg_type, msg["content"], parent=parent_header)
        else:
            self._output_hook_default(msg)

    # --------------------------------------------------------------------------
    # Channel management methods
    # --------------------------------------------------------------------------

    def start_channels(
        self,
        shell: bool = True,
        iopub: bool = True,
        stdin: bool = True,
        hb: bool = True,
        control: bool = True,
    ) -> None:
        """Starts the channels for this kernel.

        This will create the channels if they do not exist and then start
        them (their activity runs in a thread). If port numbers of 0 are
        being used (random ports) then you must first call
        :meth:`start_kernel`. If the channels have been stopped and you
        call this, :class:`RuntimeError` will be raised.

        Each boolean parameter selects whether the channel of the same name is
        started.  ``allow_stdin`` is set to the value of ``stdin``.
        """
        if iopub:
            self.iopub_channel.start()
        if shell:
            self.shell_channel.start()
        if stdin:
            self.stdin_channel.start()
            self.allow_stdin = True
        else:
            self.allow_stdin = False
        if hb:
            self.hb_channel.start()
        if control:
            self.control_channel.start()

    def stop_channels(self) -> None:
        """Stops all the running channels for this kernel.

        This stops their event loops and joins their threads.
        """
        if self.shell_channel.is_alive():
            self.shell_channel.stop()
        if self.iopub_channel.is_alive():
            self.iopub_channel.stop()
        if self.stdin_channel.is_alive():
            self.stdin_channel.stop()
        if self.hb_channel.is_alive():
            self.hb_channel.stop()
        if self.control_channel.is_alive():
            self.control_channel.stop()

    @property
    def channels_running(self) -> bool:
        """Are any of the channels created and running?"""
        # The private ``_x_channel`` attribute is tested first so that this
        # check never creates a channel as a side effect.  ``bool()`` makes
        # the result match the annotation: the bare ``and``/``or`` chain
        # could otherwise evaluate to ``None`` or a channel object.
        return bool(
            (self._shell_channel and self.shell_channel.is_alive())
            or (self._iopub_channel and self.iopub_channel.is_alive())
            or (self._stdin_channel and self.stdin_channel.is_alive())
            or (self._hb_channel and self.hb_channel.is_alive())
            or (self._control_channel and self.control_channel.is_alive())
        )

    ioloop = None  # Overridden in subclasses that use pyzmq event loop

    @property
    def shell_channel(self) -> t.Any:
        """Get the shell channel object for this kernel."""
        if self._shell_channel is None:
            url = self._make_url("shell")
            self.log.debug("connecting shell channel to %s", url)
            socket = self.connect_shell(identity=self.session.bsession)
            self._shell_channel = self.shell_channel_class(  # type:ignore[call-arg,abstract]
                socket, self.session, self.ioloop
            )
        return self._shell_channel

    @property
    def iopub_channel(self) -> t.Any:
        """Get the iopub channel object for this kernel."""
        if self._iopub_channel is None:
            url = self._make_url("iopub")
            self.log.debug("connecting iopub channel to %s", url)
            socket = self.connect_iopub()
            self._iopub_channel = self.iopub_channel_class(  # type:ignore[call-arg,abstract]
                socket, self.session, self.ioloop
            )
        return self._iopub_channel

    @property
    def stdin_channel(self) -> t.Any:
        """Get the stdin channel object for this kernel."""
        if self._stdin_channel is None:
            url = self._make_url("stdin")
            self.log.debug("connecting stdin channel to %s", url)
            socket = self.connect_stdin(identity=self.session.bsession)
            self._stdin_channel = self.stdin_channel_class(  # type:ignore[call-arg,abstract]
                socket, self.session, self.ioloop
            )
        return self._stdin_channel

    @property
    def hb_channel(self) -> t.Any:
        """Get the hb channel object for this kernel."""
        if self._hb_channel is None:
            url = self._make_url("hb")
            self.log.debug("connecting heartbeat channel to %s", url)
            # Unlike the other channels, the heartbeat channel is constructed
            # from the context and url rather than from a connected socket.
            self._hb_channel = self.hb_channel_class(  # type:ignore[call-arg,abstract]
                self.context, self.session, url
            )
        return self._hb_channel

    @property
    def control_channel(self) -> t.Any:
        """Get the control channel object for this kernel."""
        if self._control_channel is None:
            url = self._make_url("control")
            self.log.debug("connecting control channel to %s", url)
            socket = self.connect_control(identity=self.session.bsession)
            self._control_channel = self.control_channel_class(  # type:ignore[call-arg,abstract]
                socket, self.session, self.ioloop
            )
        return self._control_channel

    async def _async_is_alive(self) -> bool:
        """Is the kernel process still running?"""
        from .manager import KernelManager

        if isinstance(self.parent, KernelManager):
            # This KernelClient was created by a KernelManager,
            # we can ask the parent KernelManager:
            return await self.parent._async_is_alive()
        if self._hb_channel is not None:
            # We don't have access to the KernelManager,
            # so we use the heartbeat.
            return self._hb_channel.is_beating()
        # no heartbeat and not local, we can't tell if it's running,
        # so naively return True
        return True

    async def _async_execute_interactive(
        self,
        code: str,
        silent: bool = False,
        store_history: bool = True,
        user_expressions: t.Optional[t.Dict[str, t.Any]] = None,
        allow_stdin: t.Optional[bool] = None,
        stop_on_error: bool = True,
        timeout: t.Optional[float] = None,
        output_hook: t.Optional[t.Callable] = None,
        stdin_hook: t.Optional[t.Callable] = None,
    ) -> t.Dict[str, t.Any]:
        """Execute code in the kernel interactively

        Output will be redisplayed, and stdin prompts will be relayed as well.
        If an IPython kernel is detected, rich output will be displayed.

        You can pass a custom output_hook callable that will be called
        with every IOPub message that is produced instead of the default redisplay.

        .. versionadded:: 5.0

        Parameters
        ----------
        code : str
            A string of code in the kernel's language.

        silent : bool, optional (default False)
            If set, the kernel will execute the code as quietly possible, and
            will force store_history to be False.

        store_history : bool, optional (default True)
            If set, the kernel will store command history.  This is forced
            to be False if silent is True.

        user_expressions : dict, optional
            A dict mapping names to expressions to be evaluated in the user's
            dict. The expression values are returned as strings formatted using
            :func:`repr`.

        allow_stdin : bool, optional (default self.allow_stdin)
            Flag for whether the kernel can send stdin requests to frontends.

            Some frontends (e.g. the Notebook) do not support stdin requests.
            If raw_input is called from code executed from such a frontend, a
            StdinNotImplementedError will be raised.

        stop_on_error: bool, optional (default True)
            Flag whether to abort the execution queue, if an exception is encountered.

        timeout: float or None (default: None)
            Timeout to use when waiting for a reply

        output_hook: callable(msg)
            Function to be called with output messages.
            If not specified, output will be redisplayed.

        stdin_hook: callable(msg)
            Function or awaitable to be called with stdin_request messages.
            If not specified, input/getpass will be called.

        Returns
        -------
        reply: dict
            The reply message for this request

        Raises
        ------
        RuntimeError
            If the IOPub channel is not running, or if stdin is allowed but
            the stdin channel is not running.
        TimeoutError
            If no output arrives, or no reply is received, before the timeout
            expires.
        """
        if not self.iopub_channel.is_alive():
            emsg = "IOPub channel must be running to receive output"
            raise RuntimeError(emsg)
        if allow_stdin is None:
            allow_stdin = self.allow_stdin
        if allow_stdin and not self.stdin_channel.is_alive():
            emsg = "stdin channel must be running to allow input"
            raise RuntimeError(emsg)
        msg_id = await ensure_async(
            self.execute(
                code,
                silent=silent,
                store_history=store_history,
                user_expressions=user_expressions,
                allow_stdin=allow_stdin,
                stop_on_error=stop_on_error,
            )
        )
        if stdin_hook is None:
            stdin_hook = self._stdin_hook_default
        # detect IPython kernel
        if output_hook is None and "IPython" in sys.modules:
            from IPython import get_ipython

            ip = get_ipython()  # type:ignore[no-untyped-call]
            in_kernel = getattr(ip, "kernel", False)
            if in_kernel:
                output_hook = partial(
                    self._output_hook_kernel,
                    ip.display_pub.session,
                    ip.display_pub.pub_socket,
                    ip.display_pub.parent_header,
                )
        if output_hook is None:
            # default: redisplay plain-text outputs
            output_hook = self._output_hook_default

        # set deadline based on timeout
        # (timeout_ms stays None, i.e. poll without a time limit, when no
        # timeout was given)
        timeout_ms: t.Optional[int] = None
        if timeout is not None:
            deadline = time.monotonic() + timeout

        poller = zmq.asyncio.Poller()
        iopub_socket = self.iopub_channel.socket
        poller.register(iopub_socket, zmq.POLLIN)
        if allow_stdin:
            stdin_socket = self.stdin_channel.socket
            poller.register(stdin_socket, zmq.POLLIN)
        else:
            stdin_socket = None

        # wait for output and redisplay it
        while True:
            if timeout is not None:
                timeout = max(0, deadline - time.monotonic())
                timeout_ms = int(1000 * timeout)
            events = dict(await poller.poll(timeout_ms))
            if not events:
                emsg = "Timeout waiting for output"
                raise TimeoutError(emsg)
            if stdin_socket in events:
                req = await ensure_async(self.stdin_channel.get_msg(timeout=0))
                # The hook may be a plain function or return an awaitable.
                res = stdin_hook(req)
                if inspect.isawaitable(res):
                    await res
                continue
            if iopub_socket not in events:
                continue

            msg = await ensure_async(self.iopub_channel.get_msg(timeout=0))

            if msg["parent_header"].get("msg_id") != msg_id:
                # not from my request
                continue
            output_hook(msg)

            # stop on idle
            if (
                msg["header"]["msg_type"] == "status"
                and msg["content"]["execution_state"] == "idle"
            ):
                break

        # output is done, get the reply
        if timeout is not None:
            timeout = max(0, deadline - time.monotonic())
        return await self._async_recv_reply(msg_id, timeout=timeout)

    # Methods to send specific messages on channels
    def execute(
        self,
        code: str,
        silent: bool = False,
        store_history: bool = True,
        user_expressions: t.Optional[t.Dict[str, t.Any]] = None,
        allow_stdin: t.Optional[bool] = None,
        stop_on_error: bool = True,
    ) -> str:
        """Execute code in the kernel.

        Parameters
        ----------
        code : str
            A string of code in the kernel's language.

        silent : bool, optional (default False)
            If set, the kernel will execute the code as quietly possible, and
            will force store_history to be False.

        store_history : bool, optional (default True)
            If set, the kernel will store command history.  This is forced
            to be False if silent is True.

        user_expressions : dict, optional
            A dict mapping names to expressions to be evaluated in the user's
            dict. The expression values are returned as strings formatted using
            :func:`repr`.

        allow_stdin : bool, optional (default self.allow_stdin)
            Flag for whether the kernel can send stdin requests to frontends.

            Some frontends (e.g. the Notebook) do not support stdin requests.
            If raw_input is called from code executed from such a frontend, a
            StdinNotImplementedError will be raised.

        stop_on_error: bool, optional (default True)
            Flag whether to abort the execution queue, if an exception is encountered.

        Returns
        -------
        The msg_id of the message sent.

        Raises
        ------
        ValueError
            If ``code`` is not a string, or ``user_expressions`` contains a
            non-string key or value.
        """
        if user_expressions is None:
            user_expressions = {}
        if allow_stdin is None:
            allow_stdin = self.allow_stdin

        # Don't waste network traffic if inputs are invalid
        if not isinstance(code, str):
            # ``!r`` in an f-string instead of ``"%r" % code``: with the
            # %-operator a tuple ``code`` would be unpacked as the argument
            # list and raise TypeError instead of the intended ValueError.
            raise ValueError(f"code {code!r} must be a string")
        validate_string_dict(user_expressions)

        # Create class for content/msg creation. Related to, but possibly
        # not in Session.
        content = {
            "code": code,
            "silent": silent,
            "store_history": store_history,
            "user_expressions": user_expressions,
            "allow_stdin": allow_stdin,
            "stop_on_error": stop_on_error,
        }
        msg = self.session.msg("execute_request", content)
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def complete(self, code: str, cursor_pos: t.Optional[int] = None) -> str:
        """Tab complete text in the kernel's namespace.

        Parameters
        ----------
        code : str
            The context in which completion is requested.
            Can be anything between a variable name and an entire cell.
        cursor_pos : int, optional
            The position of the cursor in the block of code where the completion was requested.
            Default: ``len(code)``

        Returns
        -------
        The msg_id of the message sent.
        """
        if cursor_pos is None:
            cursor_pos = len(code)
        content = {"code": code, "cursor_pos": cursor_pos}
        msg = self.session.msg("complete_request", content)
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def inspect(self, code: str, cursor_pos: t.Optional[int] = None, detail_level: int = 0) -> str:
        """Get metadata information about an object in the kernel's namespace.

        It is up to the kernel to determine the appropriate object to inspect.

        Parameters
        ----------
        code : str
            The context in which info is requested.
            Can be anything between a variable name and an entire cell.
        cursor_pos : int, optional
            The position of the cursor in the block of code where the info was requested.
            Default: ``len(code)``
        detail_level : int, optional
            The level of detail for the introspection (0-2)

        Returns
        -------
        The msg_id of the message sent.
        """
        if cursor_pos is None:
            cursor_pos = len(code)
        content = {
            "code": code,
            "cursor_pos": cursor_pos,
            "detail_level": detail_level,
        }
        msg = self.session.msg("inspect_request", content)
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def history(
        self,
        raw: bool = True,
        output: bool = False,
        hist_access_type: str = "range",
        **kwargs: t.Any,
    ) -> str:
        """Get entries from the kernel's history list.

        Parameters
        ----------
        raw : bool
            If True, return the raw input.
        output : bool
            If True, then return the output as well.
        hist_access_type : str
            'range' (fill in session, start and stop params), 'tail' (fill in n)
             or 'search' (fill in pattern param).

        session : int
            For a range request, the session from which to get lines. Session
            numbers are positive integers; negative ones count back from the
            current session.
        start : int
            The first line number of a history range.
        stop : int
            The final (excluded) line number of a history range.

        n : int
            The number of lines of history to get for a tail request.

        pattern : str
            The glob-syntax pattern for a search request.

        Returns
        -------
        The ID of the message sent.
        """
        if hist_access_type == "range":
            # Range requests default to session 0, starting at line 0.
            kwargs.setdefault("session", 0)
            kwargs.setdefault("start", 0)
        content = dict(raw=raw, output=output, hist_access_type=hist_access_type, **kwargs)
        msg = self.session.msg("history_request", content)
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def kernel_info(self) -> str:
        """Request kernel info

        Returns
        -------
        The msg_id of the message sent
        """
        msg = self.session.msg("kernel_info_request")
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def comm_info(self, target_name: t.Optional[str] = None) -> str:
        """Request comm info

        Parameters
        ----------
        target_name : str, optional
            If given, it is sent as ``target_name`` in the request content;
            otherwise the content is empty.

        Returns
        -------
        The msg_id of the message sent
        """
        content = {} if target_name is None else {"target_name": target_name}
        msg = self.session.msg("comm_info_request", content)
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def _handle_kernel_info_reply(self, msg: t.Dict[str, t.Any]) -> None:
        """handle kernel info reply

        sets protocol adaptation version. This might
        be run from a separate thread.
        """
        # Major version is the part of ``protocol_version`` before the first dot.
        adapt_version = int(msg["content"]["protocol_version"].split(".")[0])
        if adapt_version != major_protocol_version:
            self.session.adapt_version = adapt_version

    def is_complete(self, code: str) -> str:
        """Ask the kernel whether some code is complete and ready to execute.

        Returns
        -------
        The ID of the message sent.
        """
        msg = self.session.msg("is_complete_request", {"code": code})
        self.shell_channel.send(msg)
        return msg["header"]["msg_id"]

    def input(self, string: str) -> None:
        """Send a string of raw input to the kernel.

        This should only be called in response to the kernel sending an
        ``input_request`` message on the stdin channel.

        Parameters
        ----------
        string : str
            The text to send as the ``value`` of the ``input_reply`` message.

        Returns
        -------
        None
            Unlike the request methods of this class, no msg_id is returned.
        """
        content = {"value": string}
        msg = self.session.msg("input_reply", content)
        self.stdin_channel.send(msg)

    def shutdown(self, restart: bool = False) -> str:
        """Request an immediate kernel shutdown on the control channel.

        Upon receipt of the (empty) reply, client code can safely assume that
        the kernel has shut down and it's safe to forcefully terminate it if
        it's still alive.

        The kernel will send the reply via a function registered with Python's
        atexit module, ensuring it's truly done as the kernel is done with all
        normal operation.

        Parameters
        ----------
        restart : bool (default: False)
            Sent as the ``restart`` field of the ``shutdown_request`` content.

        Returns
        -------
        The msg_id of the message sent
        """
        # Send quit message to kernel. Once we implement kernel-side setattr,
        # this should probably be done that way, but for now this will do.
        msg = self.session.msg("shutdown_request", {"restart": restart})
        self.control_channel.send(msg)
        return msg["header"]["msg_id"]
825
826
# Register KernelClient with KernelClientABC so that it is recognised as an
# implementation of that interface (virtual-subclass registration, assuming
# KernelClientABC is an ``abc``-style abstract base class).
KernelClientABC.register(KernelClient)