Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/services/kernels/connection/base.py: 66%
87 statements
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-10 06:20 +0000
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-10 06:20 +0000
1"""Kernel connection helpers."""
2import json
3import struct
5from jupyter_client.session import Session
6from tornado.websocket import WebSocketHandler
7from traitlets import Float, Instance, default
8from traitlets.config import LoggingConfigurable
10try:
11 from jupyter_client.jsonutil import json_default
12except ImportError:
13 from jupyter_client.jsonutil import date_default as json_default
15from jupyter_client.jsonutil import extract_dates
17from .abc import KernelWebsocketConnectionABC
20def serialize_binary_message(msg):
21 """serialize a message as a binary blob
23 Header:
25 4 bytes: number of msg parts (nbufs) as 32b int
26 4 * nbufs bytes: offset for each buffer as integer as 32b int
28 Offsets are from the start of the buffer, including the header.
30 Returns
31 -------
32 The message serialized to bytes.
34 """
35 # don't modify msg or buffer list in-place
36 msg = msg.copy()
37 buffers = list(msg.pop("buffers"))
38 bmsg = json.dumps(msg, default=json_default).encode("utf8")
39 buffers.insert(0, bmsg)
40 nbufs = len(buffers)
41 offsets = [4 * (nbufs + 1)]
42 for buf in buffers[:-1]:
43 offsets.append(offsets[-1] + len(buf))
44 offsets_buf = struct.pack("!" + "I" * (nbufs + 1), nbufs, *offsets)
45 buffers.insert(0, offsets_buf)
46 return b"".join(buffers)
49def deserialize_binary_message(bmsg):
50 """deserialize a message from a binary blog
52 Header:
54 4 bytes: number of msg parts (nbufs) as 32b int
55 4 * nbufs bytes: offset for each buffer as integer as 32b int
57 Offsets are from the start of the buffer, including the header.
59 Returns
60 -------
61 message dictionary
62 """
63 nbufs = struct.unpack("!i", bmsg[:4])[0]
64 offsets = list(struct.unpack("!" + "I" * nbufs, bmsg[4 : 4 * (nbufs + 1)]))
65 offsets.append(None)
66 bufs = []
67 for start, stop in zip(offsets[:-1], offsets[1:]):
68 bufs.append(bmsg[start:stop])
69 msg = json.loads(bufs[0].decode("utf8"))
70 msg["header"] = extract_dates(msg["header"])
71 msg["parent_header"] = extract_dates(msg["parent_header"])
72 msg["buffers"] = bufs[1:]
73 return msg
76def serialize_msg_to_ws_v1(msg_or_list, channel, pack=None):
77 """Serialize a message using the v1 protocol."""
78 if pack:
79 msg_list = [
80 pack(msg_or_list["header"]),
81 pack(msg_or_list["parent_header"]),
82 pack(msg_or_list["metadata"]),
83 pack(msg_or_list["content"]),
84 ]
85 else:
86 msg_list = msg_or_list
87 channel = channel.encode("utf-8")
88 offsets: list = []
89 offsets.append(8 * (1 + 1 + len(msg_list) + 1))
90 offsets.append(len(channel) + offsets[-1])
91 for msg in msg_list:
92 offsets.append(len(msg) + offsets[-1])
93 offset_number = len(offsets).to_bytes(8, byteorder="little")
94 offsets = [offset.to_bytes(8, byteorder="little") for offset in offsets]
95 bin_msg = b"".join([offset_number, *offsets] + [channel] + msg_list)
96 return bin_msg
99def deserialize_msg_from_ws_v1(ws_msg):
100 """Deserialize a message using the v1 protocol."""
101 offset_number = int.from_bytes(ws_msg[:8], "little")
102 offsets = [
103 int.from_bytes(ws_msg[8 * (i + 1) : 8 * (i + 2)], "little") for i in range(offset_number)
104 ]
105 channel = ws_msg[offsets[0] : offsets[1]].decode("utf-8")
106 msg_list = [ws_msg[offsets[i] : offsets[i + 1]] for i in range(1, offset_number - 1)]
107 return channel, msg_list
110class BaseKernelWebsocketConnection(LoggingConfigurable):
111 """A configurable base class for connecting Kernel WebSockets to ZMQ sockets."""
113 @property
114 def kernel_manager(self):
115 """The kernel manager."""
116 return self.parent
118 @property
119 def multi_kernel_manager(self):
120 """The multi kernel manager."""
121 return self.kernel_manager.parent
123 @property
124 def kernel_id(self):
125 """The kernel id."""
126 return self.kernel_manager.kernel_id
128 @property
129 def session_id(self):
130 """The session id."""
131 return self.session.session
133 kernel_info_timeout = Float()
135 @default("kernel_info_timeout")
136 def _default_kernel_info_timeout(self):
137 return self.multi_kernel_manager.kernel_info_timeout
139 session = Instance(klass=Session, config=True)
141 @default("session")
142 def _default_session(self):
143 return Session(config=self.config)
145 websocket_handler = Instance(WebSocketHandler)
147 async def connect(self):
148 """Handle a connect."""
149 raise NotImplementedError()
151 async def disconnect(self):
152 """Handle a disconnect."""
153 raise NotImplementedError()
155 def handle_incoming_message(self, incoming_msg: str) -> None:
156 """Handle an incoming message."""
157 raise NotImplementedError()
159 def handle_outgoing_message(self, stream: str, outgoing_msg: list) -> None:
160 """Handle an outgoing message."""
161 raise NotImplementedError()
164KernelWebsocketConnectionABC.register(BaseKernelWebsocketConnection)