Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_client/blocking/client.py: 84%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

37 statements  

1"""Implements a fully blocking kernel client. 

2 

3Useful for test suites and blocking terminal interfaces. 

4""" 

5 

6# Copyright (c) Jupyter Development Team. 

7# Distributed under the terms of the Modified BSD License. 

8from __future__ import annotations 

9 

10import typing as t 

11 

12from traitlets import Type 

13 

14from ..channels import HBChannel, ZMQSocketChannel 

15from ..client import KernelClient, reqrep 

16from ..utils import run_sync 

17 

18 

def wrapped(meth: t.Callable, channel: str) -> t.Callable:
    """Return a variant of ``meth`` that can optionally wait for its reply.

    The returned function accepts the same arguments as ``meth`` plus two
    keyword-only extras that are stripped before ``meth`` is called:

    ``reply``
        When truthy, the result of ``self._recv_reply(...)`` for the sent
        request is returned instead of the message id. Defaults to ``False``.
    ``timeout``
        Forwarded to ``self._recv_reply`` as ``timeout``. Defaults to ``None``.

    ``channel`` is passed through to ``self._recv_reply`` unchanged.
    """

    def _(self: BlockingKernelClient, *args: t.Any, **kwargs: t.Any) -> t.Any:
        # Remove the wrapper-only options so ``meth`` never sees them.
        wait_for_reply = kwargs.pop("reply", False)
        reply_timeout = kwargs.pop("timeout", None)

        request_id = meth(self, *args, **kwargs)
        if wait_for_reply:
            return self._recv_reply(request_id, timeout=reply_timeout, channel=channel)
        return request_id

    return _

31 

32 

class BlockingKernelClient(KernelClient):
    """Kernel client whose API calls block until they complete.

    The ``get_[channel]_msg()`` methods wait for a message on the named
    channel and return it; :exc:`queue.Empty` is raised when no message
    arrives within ``timeout`` seconds.
    """

    # --------------------------------------------------------------------------
    # ``run_sync`` wrappers around the matching ``KernelClient._async_*`` methods
    # --------------------------------------------------------------------------

    # Channel proxy methods, one per channel.
    get_shell_msg = run_sync(KernelClient._async_get_shell_msg)
    get_iopub_msg = run_sync(KernelClient._async_get_iopub_msg)
    get_stdin_msg = run_sync(KernelClient._async_get_stdin_msg)
    get_control_msg = run_sync(KernelClient._async_get_control_msg)

    wait_for_ready = run_sync(KernelClient._async_wait_for_ready)
    is_alive = run_sync(KernelClient._async_is_alive)
    execute_interactive = run_sync(KernelClient._async_execute_interactive)

    # Called by the functions that ``wrapped`` builds when ``reply`` is truthy.
    _recv_reply = run_sync(KernelClient._async_recv_reply)

    # --------------------------------------------------------------------------
    # Channel classes
    # --------------------------------------------------------------------------

    # The classes to use for the various channels.
    shell_channel_class = Type(ZMQSocketChannel)  # type:ignore[assignment]
    iopub_channel_class = Type(ZMQSocketChannel)  # type:ignore[assignment]
    stdin_channel_class = Type(ZMQSocketChannel)  # type:ignore[assignment]
    hb_channel_class = Type(HBChannel)  # type:ignore[assignment]
    control_channel_class = Type(ZMQSocketChannel)  # type:ignore[assignment]

    # --------------------------------------------------------------------------
    # Request methods built from ``KernelClient`` methods via ``reqrep``/``wrapped``
    # --------------------------------------------------------------------------

    # Replies to these requests come on the shell channel.
    execute = reqrep(wrapped, KernelClient.execute)
    history = reqrep(wrapped, KernelClient.history)
    complete = reqrep(wrapped, KernelClient.complete)
    inspect = reqrep(wrapped, KernelClient.inspect)
    kernel_info = reqrep(wrapped, KernelClient.kernel_info)
    comm_info = reqrep(wrapped, KernelClient.comm_info)

    # Replies to shutdown come on the control channel.
    shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")

66 comm_info = reqrep(wrapped, KernelClient.comm_info) 

67 

68 is_alive = run_sync(KernelClient._async_is_alive) 

69 execute_interactive = run_sync(KernelClient._async_execute_interactive) 

70 

71 # replies come on the control channel 

72 shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")