Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_client/blocking/client.py: 82%

34 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 06:10 +0000

1"""Implements a fully blocking kernel client. 

2 

3Useful for test suites and blocking terminal interfaces. 

4""" 

5# Copyright (c) Jupyter Development Team. 

6# Distributed under the terms of the Modified BSD License. 

7from traitlets import Type 

8 

9from ..channels import HBChannel, ZMQSocketChannel 

10from ..client import KernelClient, reqrep 

11from ..utils import run_sync 

12 

13 

14def wrapped(meth, channel): 

15 """Wrap a method on a channel and handle replies.""" 

16 

17 def _(self, *args, **kwargs): 

18 reply = kwargs.pop("reply", False) 

19 timeout = kwargs.pop("timeout", None) 

20 msg_id = meth(self, *args, **kwargs) 

21 if not reply: 

22 return msg_id 

23 return self._recv_reply(msg_id, timeout=timeout, channel=channel) 

24 

25 return _ 

26 

27 

28class BlockingKernelClient(KernelClient): 

29 """A KernelClient with blocking APIs 

30 

31 ``get_[channel]_msg()`` methods wait for and return messages on channels, 

32 raising :exc:`queue.Empty` if no message arrives within ``timeout`` seconds. 

33 """ 

34 

35 # -------------------------------------------------------------------------- 

36 # Channel proxy methods 

37 # -------------------------------------------------------------------------- 

38 

39 get_shell_msg = run_sync(KernelClient._async_get_shell_msg) 

40 get_iopub_msg = run_sync(KernelClient._async_get_iopub_msg) 

41 get_stdin_msg = run_sync(KernelClient._async_get_stdin_msg) 

42 get_control_msg = run_sync(KernelClient._async_get_control_msg) 

43 

44 wait_for_ready = run_sync(KernelClient._async_wait_for_ready) 

45 

46 # The classes to use for the various channels 

47 shell_channel_class = Type(ZMQSocketChannel) 

48 iopub_channel_class = Type(ZMQSocketChannel) 

49 stdin_channel_class = Type(ZMQSocketChannel) 

50 hb_channel_class = Type(HBChannel) 

51 control_channel_class = Type(ZMQSocketChannel) 

52 

53 _recv_reply = run_sync(KernelClient._async_recv_reply) 

54 

55 # replies come on the shell channel 

56 execute = reqrep(wrapped, KernelClient.execute) 

57 history = reqrep(wrapped, KernelClient.history) 

58 complete = reqrep(wrapped, KernelClient.complete) 

59 inspect = reqrep(wrapped, KernelClient.inspect) 

60 kernel_info = reqrep(wrapped, KernelClient.kernel_info) 

61 comm_info = reqrep(wrapped, KernelClient.comm_info) 

62 

63 is_alive = run_sync(KernelClient._async_is_alive) 

64 execute_interactive = run_sync(KernelClient._async_execute_interactive) 

65 

66 # replies come on the control channel 

67 shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")