1"""Implements a fully blocking kernel client.
2
3Useful for test suites and blocking terminal interfaces.
4"""
5
6# Copyright (c) Jupyter Development Team.
7# Distributed under the terms of the Modified BSD License.
8from __future__ import annotations
9
10import typing as t
11
12from traitlets import Type
13
14from ..channels import HBChannel, ZMQSocketChannel
15from ..client import KernelClient, reqrep
16from ..utils import run_sync
17
18
def wrapped(meth: t.Callable, channel: str) -> t.Callable:
    """Build a request method that can optionally block for its reply.

    Parameters
    ----------
    meth
        The underlying request method. It is invoked as
        ``meth(self, *args, **kwargs)`` and its return value is treated as
        the id of the message that was sent.
    channel
        Name of the channel handed to ``self._recv_reply`` when the caller
        asks to wait for the reply.

    Returns
    -------
    A function accepting the same arguments as ``meth`` plus two extra
    keyword arguments, ``reply`` (default ``False``) and ``timeout``
    (default ``None``). Neither of them is forwarded to ``meth``.
    """

    def _(self: BlockingKernelClient, *args: t.Any, **kwargs: t.Any) -> t.Any:
        # Strip the wrapper-only options so ``meth`` never sees them.
        wait_for_reply = kwargs.pop("reply", False)
        reply_timeout = kwargs.pop("timeout", None)

        request_id = meth(self, *args, **kwargs)
        if wait_for_reply:
            # Caller opted in: hand back the reply instead of the message id.
            return self._recv_reply(request_id, timeout=reply_timeout, channel=channel)
        return request_id

    return _
31
32
class BlockingKernelClient(KernelClient):
    """Kernel client whose channel and request APIs block the caller.

    The ``get_[channel]_msg()`` methods wait for a message on the named
    channel and return it; :exc:`queue.Empty` is raised when no message
    arrives within ``timeout`` seconds.

    The request methods (``execute``, ``history``, ``complete``, ``inspect``,
    ``kernel_info``, ``comm_info`` and ``shutdown``) additionally accept
    ``reply`` and ``timeout`` keyword arguments: with ``reply=True`` the call
    returns the reply message instead of the request's message id.
    """

    # ----------------------------------------------------------------------
    # Channel classes
    # ----------------------------------------------------------------------

    # Which class backs each channel of this client.
    shell_channel_class = Type(ZMQSocketChannel)  # type:ignore[assignment]
    iopub_channel_class = Type(ZMQSocketChannel)  # type:ignore[assignment]
    stdin_channel_class = Type(ZMQSocketChannel)  # type:ignore[assignment]
    control_channel_class = Type(ZMQSocketChannel)  # type:ignore[assignment]
    hb_channel_class = Type(HBChannel)  # type:ignore[assignment]

    # ----------------------------------------------------------------------
    # Blocking counterparts of the ``_async_*`` methods on KernelClient,
    # produced with ``run_sync``
    # ----------------------------------------------------------------------

    # Per-channel message getters.
    get_shell_msg = run_sync(KernelClient._async_get_shell_msg)
    get_iopub_msg = run_sync(KernelClient._async_get_iopub_msg)
    get_stdin_msg = run_sync(KernelClient._async_get_stdin_msg)
    get_control_msg = run_sync(KernelClient._async_get_control_msg)

    # Kernel state and interactive execution.
    wait_for_ready = run_sync(KernelClient._async_wait_for_ready)
    is_alive = run_sync(KernelClient._async_is_alive)
    execute_interactive = run_sync(KernelClient._async_execute_interactive)

    # Used by the request methods below when called with ``reply=True``.
    _recv_reply = run_sync(KernelClient._async_recv_reply)

    # ----------------------------------------------------------------------
    # Request methods with optional reply handling
    # ----------------------------------------------------------------------

    # These requests are answered on the shell channel.
    execute = reqrep(wrapped, KernelClient.execute)
    history = reqrep(wrapped, KernelClient.history)
    complete = reqrep(wrapped, KernelClient.complete)
    inspect = reqrep(wrapped, KernelClient.inspect)
    kernel_info = reqrep(wrapped, KernelClient.kernel_info)
    comm_info = reqrep(wrapped, KernelClient.comm_info)

    # Shutdown is answered on the control channel.
    shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")