1"""Implements a fully blocking kernel client.
2
3Useful for test suites and blocking terminal interfaces.
4"""
5# Copyright (c) Jupyter Development Team.
6# Distributed under the terms of the Modified BSD License.
7from __future__ import annotations
8
9import typing as t
10
11from traitlets import Type
12
13from ..channels import HBChannel, ZMQSocketChannel
14from ..client import KernelClient, reqrep
15from ..utils import run_sync
16
17
def wrapped(meth: t.Callable, channel: str) -> t.Callable:
    """Return a variant of ``meth`` that can optionally wait for its reply.

    The returned function forwards its positional and keyword arguments to
    ``meth`` after removing two extra keyword arguments:

    ``reply``
        When falsy (the default), the value returned by ``meth`` is handed
        back unchanged. When truthy, that value is passed as the message id
        to ``self._recv_reply`` and the result of that call is returned.
    ``timeout``
        Forwarded to ``self._recv_reply`` as ``timeout``; defaults to ``None``.

    ``channel`` is forwarded to ``self._recv_reply`` as ``channel``.
    """

    def _(self: BlockingKernelClient, *args: t.Any, **kwargs: t.Any) -> t.Any:
        # Strip the wrapper-only options so ``meth`` never sees them.
        wait_for_reply = kwargs.pop("reply", False)
        reply_timeout = kwargs.pop("timeout", None)

        request_id = meth(self, *args, **kwargs)
        if wait_for_reply:
            return self._recv_reply(request_id, timeout=reply_timeout, channel=channel)
        return request_id

    return _
30
31
class BlockingKernelClient(KernelClient):
    """A synchronous flavour of :class:`KernelClient`.

    The ``get_[channel]_msg()`` methods block until a message is available on
    the corresponding channel and return it; :exc:`queue.Empty` is raised when
    nothing arrives within ``timeout`` seconds.
    """

    # --------------------------------------------------------------------------
    # Channel classes
    # --------------------------------------------------------------------------

    # Which class backs each channel of this client.
    shell_channel_class = Type(ZMQSocketChannel)  # type:ignore[arg-type]
    iopub_channel_class = Type(ZMQSocketChannel)  # type:ignore[arg-type]
    stdin_channel_class = Type(ZMQSocketChannel)  # type:ignore[arg-type]
    control_channel_class = Type(ZMQSocketChannel)  # type:ignore[arg-type]
    hb_channel_class = Type(HBChannel)  # type:ignore[arg-type]

    # --------------------------------------------------------------------------
    # Blocking versions of the async receive helpers
    # --------------------------------------------------------------------------

    get_shell_msg = run_sync(KernelClient._async_get_shell_msg)
    get_iopub_msg = run_sync(KernelClient._async_get_iopub_msg)
    get_stdin_msg = run_sync(KernelClient._async_get_stdin_msg)
    get_control_msg = run_sync(KernelClient._async_get_control_msg)

    # Used by the request methods below when called with ``reply=True``.
    _recv_reply = run_sync(KernelClient._async_recv_reply)

    # --------------------------------------------------------------------------
    # Readiness / liveness
    # --------------------------------------------------------------------------

    wait_for_ready = run_sync(KernelClient._async_wait_for_ready)
    is_alive = run_sync(KernelClient._async_is_alive)

    # --------------------------------------------------------------------------
    # Request methods
    # --------------------------------------------------------------------------

    execute_interactive = run_sync(KernelClient._async_execute_interactive)

    # These requests are answered on the shell channel.
    execute = reqrep(wrapped, KernelClient.execute)
    history = reqrep(wrapped, KernelClient.history)
    complete = reqrep(wrapped, KernelClient.complete)
    inspect = reqrep(wrapped, KernelClient.inspect)
    kernel_info = reqrep(wrapped, KernelClient.kernel_info)
    comm_info = reqrep(wrapped, KernelClient.comm_info)

    # Shutdown is answered on the control channel.
    shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")