Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/services/kernels/connection/base.py: 67%
90 statements
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
1"""Kernel connection helpers."""
2import json
3import struct
4from typing import Any, List
6from jupyter_client.session import Session
7from tornado.websocket import WebSocketHandler
8from traitlets import Float, Instance, Unicode, default
9from traitlets.config import LoggingConfigurable
11try:
12 from jupyter_client.jsonutil import json_default
13except ImportError:
14 from jupyter_client.jsonutil import date_default as json_default
16from jupyter_client.jsonutil import extract_dates
18from jupyter_server.transutils import _i18n
20from .abc import KernelWebsocketConnectionABC
def serialize_binary_message(msg):
    """Serialize a message dict and its binary buffers into one binary blob.

    Layout of the result:

    - 4 bytes: number of parts (``nbufs``) as a big-endian unsigned 32-bit int
    - 4 * nbufs bytes: one big-endian unsigned 32-bit offset per part
    - the parts themselves: the JSON-encoded message first, then each buffer

    Offsets are from the start of the blob, including the header.

    Parameters
    ----------
    msg : dict
        Message dictionary. It must have a ``"buffers"`` entry holding an
        iterable of bytes-like objects; every other entry is JSON-encoded.
        Neither the dict nor the buffer list is modified.

    Returns
    -------
    bytes
        The message serialized to bytes.

    Raises
    ------
    KeyError
        If ``msg`` has no ``"buffers"`` entry.
    """
    # don't modify msg or buffer list in-place
    msg = msg.copy()
    buffers = list(msg.pop("buffers"))
    bmsg = json.dumps(msg, default=json_default).encode("utf8")
    buffers.insert(0, bmsg)
    nbufs = len(buffers)
    # The first part starts right after the header: the count plus nbufs offsets.
    offsets = [4 * (nbufs + 1)]
    for buf in buffers[:-1]:
        # Use the byte size, not len(): for a memoryview whose items are wider
        # than one byte, len() counts items while join() below writes bytes.
        offsets.append(offsets[-1] + memoryview(buf).nbytes)
    offsets_buf = struct.pack(f"!{nbufs + 1}I", nbufs, *offsets)
    return b"".join([offsets_buf, *buffers])
def deserialize_binary_message(bmsg):
    """Deserialize a message from a binary blob.

    Expected layout:

    - 4 bytes: number of parts (``nbufs``) as a big-endian unsigned 32-bit int
    - 4 * nbufs bytes: one big-endian unsigned 32-bit offset per part
    - the parts themselves: the JSON-encoded message first, then each buffer

    Offsets are from the start of the blob, including the header.

    Parameters
    ----------
    bmsg : bytes
        The binary blob to decode.

    Returns
    -------
    dict
        The message dictionary, with ``"header"`` and ``"parent_header"``
        passed through ``extract_dates`` and the trailing parts stored under
        ``"buffers"``.

    Raises
    ------
    ValueError
        If the header announces zero parts (there is no JSON part to decode).
    struct.error
        If the blob is too short to hold the header it announces.
    """
    # Read the count unsigned, the same way serialize_binary_message packs it.
    (nbufs,) = struct.unpack_from("!I", bmsg)
    if nbufs < 1:
        msg = "binary message must contain at least one part"
        raise ValueError(msg)
    # A counted format ("!<n>I") keeps the format string tiny no matter what
    # count the (untrusted) header claims; an oversized count just fails the
    # buffer-size check inside struct.
    offsets = struct.unpack_from(f"!{nbufs}I", bmsg, 4)
    # Each part runs up to the next offset; the last one runs to the end.
    stops = (*offsets[1:], None)
    bufs = [bmsg[start:stop] for start, stop in zip(offsets, stops)]
    msg = json.loads(bufs[0].decode("utf8"))
    msg["header"] = extract_dates(msg["header"])
    msg["parent_header"] = extract_dates(msg["parent_header"])
    msg["buffers"] = bufs[1:]
    return msg
def serialize_msg_to_ws_v1(msg_or_list, channel, pack=None):
    """Build a v1-protocol websocket frame for a message.

    The frame is: an 8-byte little-endian count of offsets, that many 8-byte
    little-endian offsets, the UTF-8 channel name, then the message parts.
    The first offset marks where the channel name starts and the last one
    marks the end of the frame.

    Parameters
    ----------
    msg_or_list
        When ``pack`` is given, a mapping whose ``"header"``,
        ``"parent_header"``, ``"metadata"`` and ``"content"`` entries are
        packed (in that order) to form the parts. Otherwise, the already
        serialized parts themselves.
    channel : str
        Name of the channel the message belongs to.
    pack : callable, optional
        Serializer applied to each of the four message sections.

    Returns
    -------
    bytes
        The assembled frame.
    """
    if pack:
        parts = [
            pack(msg_or_list[section])
            for section in ("header", "parent_header", "metadata", "content")
        ]
    else:
        parts = msg_or_list
    channel = channel.encode("utf-8")
    # Header = the count field + one offset per part + the channel's start
    # offset + the end-of-frame offset, 8 bytes each.
    positions: List[Any] = [8 * (1 + 1 + len(parts) + 1)]
    for chunk in (channel, *parts):
        positions.append(positions[-1] + len(chunk))
    count_field = len(positions).to_bytes(8, byteorder="little")
    offset_fields = [pos.to_bytes(8, byteorder="little") for pos in positions]
    return b"".join([count_field, *offset_fields, channel, *parts])
def deserialize_msg_from_ws_v1(ws_msg):
    """Split a v1-protocol websocket frame into its channel and message parts.

    The frame starts with an 8-byte little-endian count of offsets followed by
    that many 8-byte little-endian offsets. The channel name lies between the
    first two offsets and each message part lies between the following
    consecutive pairs of offsets.

    Parameters
    ----------
    ws_msg : bytes
        The raw frame received from the websocket.

    Returns
    -------
    tuple
        ``(channel, msg_list)``: the decoded channel name and the list of raw
        message parts.

    Raises
    ------
    ValueError
        If the announced offset count is below 2 (no channel can be
        delimited) or the frame is too short to contain the announced offset
        table.
    """
    offset_number = int.from_bytes(ws_msg[:8], "little")
    # The count comes straight from the peer. Check it against the frame size
    # before looping on it, so a forged count cannot drive an unbounded loop
    # and a truncated offset table is not silently read as zeros.
    if offset_number < 2 or 8 * (offset_number + 1) > len(ws_msg):
        msg = "malformed v1 websocket message: inconsistent offset table"
        raise ValueError(msg)
    offsets = [
        int.from_bytes(ws_msg[8 * (i + 1) : 8 * (i + 2)], "little") for i in range(offset_number)
    ]
    channel = ws_msg[offsets[0] : offsets[1]].decode("utf-8")
    msg_list = [ws_msg[start:stop] for start, stop in zip(offsets[1:-1], offsets[2:])]
    return channel, msg_list
class BaseKernelWebsocketConnection(LoggingConfigurable):
    """Configurable base class for bridging kernel WebSockets and ZMQ sockets.

    The connection hooks (``connect``, ``disconnect``,
    ``handle_incoming_message`` and ``handle_outgoing_message``) all raise
    ``NotImplementedError`` here; concrete connections must override them.
    """

    kernel_ws_protocol = Unicode(
        None,
        allow_none=True,
        config=True,
        help=_i18n(
            "Preferred kernel message protocol over websocket to use (default: None). "
            "If an empty string is passed, select the legacy protocol. If None, "
            "the selected protocol will depend on what the front-end supports "
            "(usually the most recent protocol supported by the back-end and the "
            "front-end)."
        ),
    )

    @property
    def kernel_manager(self):
        """Return the kernel manager, which is this object's ``parent``."""
        return self.parent

    @property
    def multi_kernel_manager(self):
        """Return the multi kernel manager, i.e. the kernel manager's ``parent``."""
        manager = self.kernel_manager
        return manager.parent

    @property
    def kernel_id(self):
        """Return the kernel id reported by the kernel manager."""
        manager = self.kernel_manager
        return manager.kernel_id

    @property
    def session_id(self):
        """Return the session id, read from the ``session`` trait."""
        current = self.session
        return current.session

    kernel_info_timeout = Float()

    @default("kernel_info_timeout")
    def _default_kernel_info_timeout(self):
        # Default to whatever timeout the multi kernel manager uses.
        owner = self.multi_kernel_manager
        return owner.kernel_info_timeout

    session = Instance(klass=Session, config=True)

    @default("session")
    def _default_session(self):
        # Build the session from this object's own config.
        return Session(config=self.config)

    websocket_handler = Instance(WebSocketHandler)

    async def connect(self):
        """Handle a connect; must be overridden."""
        raise NotImplementedError

    async def disconnect(self):
        """Handle a disconnect; must be overridden."""
        raise NotImplementedError

    def handle_incoming_message(self, incoming_msg: str) -> None:
        """Handle an incoming message; must be overridden."""
        raise NotImplementedError

    def handle_outgoing_message(self, stream: str, outgoing_msg: List[Any]) -> None:
        """Handle an outgoing message; must be overridden."""
        raise NotImplementedError


# Register the base class with the connection ABC.
# NOTE(review): presumably this makes it a virtual subclass of an abc.ABC;
# confirm against the definition in .abc.
KernelWebsocketConnectionABC.register(BaseKernelWebsocketConnection)