1"""An output widget mimic."""
2from __future__ import annotations
3
4from typing import Any
5
6from jupyter_client.client import KernelClient
7from nbformat import NotebookNode
8from nbformat.v4 import output_from_msg
9
10from .jsonutil import json_clean
11
12
class OutputWidget:
    """Mimic a front-end output widget.

    The widget collects the outputs routed to it by the executor, merges
    consecutive stream outputs of the same name, and sends the resulting
    ``outputs`` list back through the kernel client as a comm ``update``
    message.
    """

    def __init__(
        self, comm_id: str, state: dict[str, Any], kernel_client: KernelClient, executor: Any
    ) -> None:
        """Initialize the widget.

        Parameters
        ----------
        comm_id
            Identifier of the comm this widget is attached to.
        state
            Initial widget state; it must contain an ``"outputs"`` key.
        kernel_client
            Client used to build and send comm messages.
        executor
            Object providing ``register_output_hook`` / ``remove_output_hook``
            and, optionally, a ``widget_state`` mapping.

        Raises
        ------
        KeyError
            If ``state`` has no ``"outputs"`` entry.
        """
        self.comm_id: str = comm_id
        self.state: dict[str, Any] = state
        self.kernel_client: KernelClient = kernel_client
        self.executor = executor
        self.topic: bytes = f"comm-{self.comm_id}".encode("ascii")
        # Same list object as state["outputs"] until it is rebound by a clear.
        self.outputs: list[NotebookNode] = self.state["outputs"]
        self.clear_before_next_output: bool = False
        # Defined up front so that send()/sync_state() and set_state() never
        # hit an AttributeError when called before output()/clear_output()
        # or before a msg_id has been registered.
        self.parent_header: dict[str, Any] | None = None
        self.msg_id: str | None = None

    def _mirror_outputs_to_executor(self) -> None:
        """Copy the current outputs into ``executor.widget_state`` when present."""
        if hasattr(self.executor, "widget_state"):
            # sync the state to the nbconvert state as well, since that is used for testing
            self.executor.widget_state[self.comm_id]["outputs"] = self.outputs

    def clear_output(self, outs: list[NotebookNode], msg: dict[str, Any], cell_index: int) -> None:
        """Handle a clear-output message.

        When the message content has a truthy ``"wait"`` entry, the clear is
        deferred until the next call to :meth:`output`; otherwise the outputs
        are emptied immediately and the new state is synced.

        ``outs`` and ``cell_index`` are accepted for interface compatibility
        and are not used here.
        """
        self.parent_header = msg["parent_header"]
        if msg["content"].get("wait"):
            self.clear_before_next_output = True
            return
        self.outputs = []
        # sync back the state to the kernel
        self.sync_state()
        self._mirror_outputs_to_executor()

    def sync_state(self) -> None:
        """Send the current ``outputs`` list as a comm ``update`` message."""
        self.send({"method": "update", "state": {"outputs": self.outputs}, "buffer_paths": []})

    def _publish_msg(
        self,
        msg_type: str,
        data: dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        buffers: list[Any] | None = None,
        **keys: Any,
    ) -> None:
        """Build a comm message and send it on the kernel client's shell channel.

        Parameters
        ----------
        msg_type
            Message type passed to ``session.msg``.
        data
            Payload placed under ``content["data"]``; defaults to ``{}``.
        metadata
            Message metadata; defaults to ``{}``.
        buffers
            Accepted for interface compatibility; not used by this method.
        **keys
            Extra entries merged into the message content.
        """
        data = {} if data is None else data
        metadata = {} if metadata is None else metadata
        content = json_clean(dict(data=data, comm_id=self.comm_id, **keys))
        msg = self.kernel_client.session.msg(
            msg_type, content=content, parent=self.parent_header, metadata=metadata
        )
        self.kernel_client.shell_channel.send(msg)

    def send(
        self,
        data: dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        buffers: list[Any] | None = None,
    ) -> None:
        """Send a ``comm_msg`` carrying ``data`` and ``metadata``."""
        self._publish_msg("comm_msg", data=data, metadata=metadata, buffers=buffers)

    def output(
        self, outs: list[NotebookNode], msg: dict[str, Any], display_id: str, cell_index: int
    ) -> None:
        """Record the output carried by ``msg`` and sync the new state.

        A pending deferred clear is applied first. A stream output with the
        same ``name`` as the last recorded stream output is appended to that
        output's text; any other output is added as a new entry.

        ``outs``, ``display_id`` and ``cell_index`` are accepted for interface
        compatibility and are not used here.
        """
        if self.clear_before_next_output:
            self.outputs = []
            self.clear_before_next_output = False
        self.parent_header = msg["parent_header"]
        output = output_from_msg(msg)  # type:ignore[no-untyped-call]

        # try to coalesce/merge output text
        last_output = self.outputs[-1] if self.outputs else None
        if (
            last_output is not None
            and last_output["output_type"] == "stream"
            and output["output_type"] == "stream"
            and last_output["name"] == output["name"]
        ):
            last_output["text"] += output["text"]
        else:
            self.outputs.append(output)
        self.sync_state()
        self._mirror_outputs_to_executor()

    def set_state(self, state: dict[str, Any]) -> None:
        """Apply a state update, (un)registering the output hook on ``msg_id`` changes.

        A truthy ``msg_id`` registers this widget as an output hook for that
        message id. A falsy ``msg_id`` removes the hook for the previously
        stored id, if there is one. Updates without a ``"msg_id"`` key are
        ignored.
        """
        if "msg_id" not in state:
            return
        msg_id = state["msg_id"]
        if msg_id:
            self.executor.register_output_hook(msg_id, self)
        elif self.msg_id:
            # Only unregister a hook that was actually registered; a clear
            # arriving first (or twice in a row) has nothing to remove.
            self.executor.remove_output_hook(self.msg_id, self)
        self.msg_id = msg_id

    def handle_msg(self, msg: dict[str, Any]) -> None:
        """Handle a comm message addressed to this widget.

        Raises
        ------
        AssertionError
            If the message's ``comm_id`` differs from this widget's.
        """
        content = msg["content"]
        if content["comm_id"] != self.comm_id:
            raise AssertionError("Mismatched comm id")
        data = content["data"]
        if "state" in data:
            self.set_state(data["state"])