Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/services/kernels/connection/base.py: 67%

90 statements  

« prev     ^ index     » next       coverage.py v7.3.3, created at 2023-12-15 06:13 +0000

1"""Kernel connection helpers.""" 

2import json 

3import struct 

4from typing import Any, List 

5 

6from jupyter_client.session import Session 

7from tornado.websocket import WebSocketHandler 

8from traitlets import Float, Instance, Unicode, default 

9from traitlets.config import LoggingConfigurable 

10 

11try: 

12 from jupyter_client.jsonutil import json_default 

13except ImportError: 

14 from jupyter_client.jsonutil import date_default as json_default 

15 

16from jupyter_client.jsonutil import extract_dates 

17 

18from jupyter_server.transutils import _i18n 

19 

20from .abc import KernelWebsocketConnectionABC 

21 

22 

def serialize_binary_message(msg):
    """Pack a message dict and its binary buffers into a single bytes blob.

    Layout of the result (all integers are 32-bit, network byte order):

    * 4 bytes: the number of parts ``nbufs`` (the JSON part plus every buffer)
    * ``4 * nbufs`` bytes: the start offset of each part
    * the parts themselves, back to back

    Offsets are measured from the very start of the blob, header included.
    Part 0 is the message (minus its ``"buffers"`` entry) dumped as JSON with
    ``json_default`` as the fallback encoder and encoded as UTF-8.

    Returns
    -------
    The message serialized to bytes.
    """
    # Work on a shallow copy and a fresh list so neither the caller's dict
    # nor its buffer list is modified.
    fields = msg.copy()
    extra_buffers = list(fields.pop("buffers"))
    encoded_json = json.dumps(fields, default=json_default).encode("utf8")
    parts = [encoded_json, *extra_buffers]
    count = len(parts)

    # The first part begins right after the header: one count word plus one
    # offset word per part.
    cursor = 4 * (count + 1)
    starts = [cursor]
    # The final part needs no length: it simply runs to the end of the blob.
    for part in parts[:-1]:
        cursor += len(part)
        starts.append(cursor)

    header = struct.pack("!%dI" % (count + 1), count, *starts)
    return b"".join([header, *parts])

50 

51 

def deserialize_binary_message(bmsg):
    """Rebuild a message dict from a binary blob.

    Expected layout (network byte order):

    * 4 bytes: the number of parts ``nbufs``, read as a signed 32-bit int
    * ``4 * nbufs`` bytes: the start offset of each part, unsigned 32-bit ints
    * the parts themselves

    Offsets are measured from the very start of the blob, header included.
    Part 0 is decoded as UTF-8 JSON; its ``"header"`` and ``"parent_header"``
    entries are passed through ``extract_dates``. The remaining parts are
    stored, in order, under ``"buffers"``.

    Returns
    -------
    message dictionary
    """
    (count,) = struct.unpack("!i", bmsg[:4])
    header_end = 4 * (count + 1)
    starts = struct.unpack("!" + "I" * count, bmsg[4:header_end])

    # Each part ends where the next one starts; the last one runs to the end
    # of the blob, which a ``None`` slice bound expresses.
    stops = [*starts[1:], None]
    parts = [bmsg[begin:end] for begin, end in zip(starts, stops)]

    msg = json.loads(parts[0].decode("utf8"))
    for key in ("header", "parent_header"):
        msg[key] = extract_dates(msg[key])
    msg["buffers"] = parts[1:]
    return msg

77 

78 

def serialize_msg_to_ws_v1(msg_or_list, channel, pack=None):
    """Serialize a message for the v1 websocket protocol.

    When ``pack`` is truthy, ``msg_or_list`` is treated as a message dict and
    its ``"header"``, ``"parent_header"``, ``"metadata"`` and ``"content"``
    entries are each passed through ``pack`` to form the frames. Otherwise
    ``msg_or_list`` is used directly as the list of frames.

    The result is: an 8-byte little-endian count of offsets, that many 8-byte
    little-endian offsets, the UTF-8 encoded channel name, then the frames.
    The offsets mark the start of the channel name, the start of every frame
    and finally the end of the message, all counted from the start of the
    result.
    """
    if pack:
        frames = [
            pack(msg_or_list[key])
            for key in ("header", "parent_header", "metadata", "content")
        ]
    else:
        frames = msg_or_list

    encoded_channel = channel.encode("utf-8")

    # Header size: one word for the count, plus one offset for the channel,
    # one per frame, and one closing offset.
    boundary = 8 * (len(frames) + 3)
    boundaries = [boundary]
    for segment in (encoded_channel, *frames):
        boundary += len(segment)
        boundaries.append(boundary)

    count_word = len(boundaries).to_bytes(8, "little")
    offset_words = [value.to_bytes(8, "little") for value in boundaries]
    return b"".join([count_word, *offset_words, encoded_channel, *frames])

100 

101 

def deserialize_msg_from_ws_v1(ws_msg):
    """Deserialize a message using the v1 protocol.

    The message starts with an 8-byte little-endian count of offsets,
    followed by that many 8-byte little-endian offsets. The first two offsets
    delimit the UTF-8 encoded channel name; every following pair of
    consecutive offsets delimits one frame.

    Returns
    -------
    A ``(channel, msg_list)`` tuple: the decoded channel name and the list of
    frames sliced out of ``ws_msg``.

    Raises
    ------
    ValueError
        If the declared offset table does not fit inside ``ws_msg``.
    IndexError
        If the offset table fits but holds fewer than two offsets.
    """
    offset_number = int.from_bytes(ws_msg[:8], "little")

    # The count comes straight off the wire. Without this check a tiny frame
    # could declare an enormous table and force a huge list to be built, and
    # a truncated table would silently decode as zeros.
    header_size = 8 * (offset_number + 1)
    if header_size > len(ws_msg):
        msg = (
            f"Malformed v1 websocket message: offset table needs {header_size} "
            f"bytes but the message has only {len(ws_msg)}"
        )
        raise ValueError(msg)

    offsets = [
        int.from_bytes(ws_msg[8 * (i + 1) : 8 * (i + 2)], "little")
        for i in range(offset_number)
    ]
    channel = ws_msg[offsets[0] : offsets[1]].decode("utf-8")
    # Frames sit between consecutive offsets after the channel name.
    msg_list = [ws_msg[start:stop] for start, stop in zip(offsets[1:-1], offsets[2:])]
    return channel, msg_list

111 

112 

class BaseKernelWebsocketConnection(LoggingConfigurable):
    """Configurable base class for connecting Kernel WebSockets to ZMQ sockets.

    The class only declares configuration traits and convenience accessors.
    The connection hooks (``connect``, ``disconnect``,
    ``handle_incoming_message`` and ``handle_outgoing_message``) all raise
    ``NotImplementedError`` and must be overridden by subclasses.
    """

    kernel_ws_protocol = Unicode(
        default_value=None,
        allow_none=True,
        config=True,
        help=_i18n(
            "Preferred kernel message protocol over websocket to use (default: None). "
            "If an empty string is passed, select the legacy protocol. "
            "If None, the selected protocol will depend on what the front-end "
            "supports (usually the most recent protocol supported by the back-end "
            "and the front-end)."
        ),
    )

    @property
    def kernel_manager(self):
        """Return the kernel manager, i.e. this object's ``parent``."""
        return self.parent

    @property
    def multi_kernel_manager(self):
        """Return the multi kernel manager, i.e. the kernel manager's ``parent``."""
        return self.kernel_manager.parent

    @property
    def kernel_id(self):
        """Return the kernel id reported by the kernel manager."""
        return self.kernel_manager.kernel_id

    @property
    def session_id(self):
        """Return the id stored on the ``session`` trait."""
        return self.session.session

    # Falls back to the multi kernel manager's value when not set explicitly.
    kernel_info_timeout = Float()

    @default("kernel_info_timeout")
    def _default_kernel_info_timeout(self):
        return self.multi_kernel_manager.kernel_info_timeout

    # Built from this object's own config when not supplied.
    session = Instance(Session, config=True)

    @default("session")
    def _default_session(self):
        return Session(config=self.config)

    websocket_handler = Instance(klass=WebSocketHandler)

    async def connect(self):
        """Handle a connect; subclasses must override."""
        raise NotImplementedError

    async def disconnect(self):
        """Handle a disconnect; subclasses must override."""
        raise NotImplementedError

    def handle_incoming_message(self, incoming_msg: str) -> None:
        """Handle an incoming message; subclasses must override."""
        raise NotImplementedError

    def handle_outgoing_message(self, stream: str, outgoing_msg: List[Any]) -> None:
        """Handle an outgoing message; subclasses must override."""
        raise NotImplementedError

178 

179 

# Register BaseKernelWebsocketConnection with KernelWebsocketConnectionABC
# through its ``register`` method, without inheriting from it.
# NOTE(review): presumably this is abc-style virtual-subclass registration,
# so isinstance/issubclass checks against the ABC accept the base class and
# its subclasses -- confirm against the definition in ``.abc``.
180KernelWebsocketConnectionABC.register(BaseKernelWebsocketConnection)