Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_client/blocking/client.py: 83%
36 statements
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
1"""Implements a fully blocking kernel client.
3Useful for test suites and blocking terminal interfaces.
4"""
5# Copyright (c) Jupyter Development Team.
6# Distributed under the terms of the Modified BSD License.
7from __future__ import annotations
9import typing as t
11from traitlets import Type
13from ..channels import HBChannel, ZMQSocketChannel
14from ..client import KernelClient, reqrep
15from ..utils import run_sync
18def wrapped(meth: t.Callable, channel: str) -> t.Callable:
19 """Wrap a method on a channel and handle replies."""
21 def _(self: BlockingKernelClient, *args: t.Any, **kwargs: t.Any) -> t.Any:
22 reply = kwargs.pop("reply", False)
23 timeout = kwargs.pop("timeout", None)
24 msg_id = meth(self, *args, **kwargs)
25 if not reply:
26 return msg_id
27 return self._recv_reply(msg_id, timeout=timeout, channel=channel)
29 return _
32class BlockingKernelClient(KernelClient):
33 """A KernelClient with blocking APIs
35 ``get_[channel]_msg()`` methods wait for and return messages on channels,
36 raising :exc:`queue.Empty` if no message arrives within ``timeout`` seconds.
37 """
39 # --------------------------------------------------------------------------
40 # Channel proxy methods
41 # --------------------------------------------------------------------------
43 get_shell_msg = run_sync(KernelClient._async_get_shell_msg)
44 get_iopub_msg = run_sync(KernelClient._async_get_iopub_msg)
45 get_stdin_msg = run_sync(KernelClient._async_get_stdin_msg)
46 get_control_msg = run_sync(KernelClient._async_get_control_msg)
48 wait_for_ready = run_sync(KernelClient._async_wait_for_ready)
50 # The classes to use for the various channels
51 shell_channel_class = Type(ZMQSocketChannel) # type:ignore[arg-type]
52 iopub_channel_class = Type(ZMQSocketChannel) # type:ignore[arg-type]
53 stdin_channel_class = Type(ZMQSocketChannel) # type:ignore[arg-type]
54 hb_channel_class = Type(HBChannel) # type:ignore[arg-type]
55 control_channel_class = Type(ZMQSocketChannel) # type:ignore[arg-type]
57 _recv_reply = run_sync(KernelClient._async_recv_reply)
59 # replies come on the shell channel
60 execute = reqrep(wrapped, KernelClient.execute)
61 history = reqrep(wrapped, KernelClient.history)
62 complete = reqrep(wrapped, KernelClient.complete)
63 inspect = reqrep(wrapped, KernelClient.inspect)
64 kernel_info = reqrep(wrapped, KernelClient.kernel_info)
65 comm_info = reqrep(wrapped, KernelClient.comm_info)
67 is_alive = run_sync(KernelClient._async_is_alive)
68 execute_interactive = run_sync(KernelClient._async_execute_interactive)
70 # replies come on the control channel
71 shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")