Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_client/blocking/client.py: 84%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

37 statements  

1"""Implements a fully blocking kernel client. 

2 

3Useful for test suites and blocking terminal interfaces. 

4""" 

5# Copyright (c) Jupyter Development Team. 

6# Distributed under the terms of the Modified BSD License. 

7from __future__ import annotations 

8 

9import typing as t 

10 

11from traitlets import Type 

12 

13from ..channels import HBChannel, ZMQSocketChannel 

14from ..client import KernelClient, reqrep 

15from ..utils import run_sync 

16 

17 

def wrapped(meth: t.Callable, channel: str) -> t.Callable:
    """Return a variant of ``meth`` that can optionally hand back the reply.

    The returned function accepts two extra keyword arguments, which are
    removed before ``meth`` is invoked:

    ``reply``
        When false (the default) the value returned by ``meth`` is passed
        straight back to the caller.  When true, that value is given to
        ``self._recv_reply`` and its result is returned instead.
    ``timeout``
        Forwarded to ``self._recv_reply``; defaults to ``None``.

    ``channel`` is forwarded unchanged to ``self._recv_reply``.
    """

    def _(self: BlockingKernelClient, *args: t.Any, **kwargs: t.Any) -> t.Any:
        # Strip the wrapper-only keywords so ``meth`` never sees them.
        wait_timeout = kwargs.pop("timeout", None)
        want_reply = kwargs.pop("reply", False)

        request_id = meth(self, *args, **kwargs)
        if want_reply:
            return self._recv_reply(request_id, timeout=wait_timeout, channel=channel)
        return request_id

    return _

30 

31 

class BlockingKernelClient(KernelClient):
    """Kernel client whose channel and request APIs block the caller.

    The ``get_[channel]_msg()`` methods wait for a message on the named
    channel and return it; :exc:`queue.Empty` is raised when no message
    arrives within ``timeout`` seconds.
    """

    # --------------------------------------------------------------------------
    # Channel classes
    # --------------------------------------------------------------------------

    # Traits naming the class used for each of the client's channels.
    shell_channel_class = Type(ZMQSocketChannel)  # type:ignore[arg-type]
    iopub_channel_class = Type(ZMQSocketChannel)  # type:ignore[arg-type]
    stdin_channel_class = Type(ZMQSocketChannel)  # type:ignore[arg-type]
    hb_channel_class = Type(HBChannel)  # type:ignore[arg-type]
    control_channel_class = Type(ZMQSocketChannel)  # type:ignore[arg-type]

    # --------------------------------------------------------------------------
    # Channel proxy methods
    # --------------------------------------------------------------------------

    # Each name below is the ``run_sync``-wrapped form of the matching
    # ``KernelClient._async_*`` method.
    get_shell_msg = run_sync(KernelClient._async_get_shell_msg)
    get_iopub_msg = run_sync(KernelClient._async_get_iopub_msg)
    get_stdin_msg = run_sync(KernelClient._async_get_stdin_msg)
    get_control_msg = run_sync(KernelClient._async_get_control_msg)

    wait_for_ready = run_sync(KernelClient._async_wait_for_ready)
    is_alive = run_sync(KernelClient._async_is_alive)
    execute_interactive = run_sync(KernelClient._async_execute_interactive)

    # Called by the functions that ``wrapped`` builds when ``reply=True``.
    _recv_reply = run_sync(KernelClient._async_recv_reply)

    # --------------------------------------------------------------------------
    # Request methods
    # --------------------------------------------------------------------------

    # replies come on the shell channel
    execute = reqrep(wrapped, KernelClient.execute)
    history = reqrep(wrapped, KernelClient.history)
    complete = reqrep(wrapped, KernelClient.complete)
    inspect = reqrep(wrapped, KernelClient.inspect)
    kernel_info = reqrep(wrapped, KernelClient.kernel_info)
    comm_info = reqrep(wrapped, KernelClient.comm_info)

    # replies come on the control channel
    shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")