Coverage for /pythoncovmergedfiles/medio/medio/src/jupyter_server/jupyter_server/services/kernels/connection/base.py: 66%

87 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-10 06:20 +0000

1"""Kernel connection helpers.""" 

2import json 

3import struct 

4 

5from jupyter_client.session import Session 

6from tornado.websocket import WebSocketHandler 

7from traitlets import Float, Instance, default 

8from traitlets.config import LoggingConfigurable 

9 

10try: 

11 from jupyter_client.jsonutil import json_default 

12except ImportError: 

13 from jupyter_client.jsonutil import date_default as json_default 

14 

15from jupyter_client.jsonutil import extract_dates 

16 

17from .abc import KernelWebsocketConnectionABC 

18 

19 

20def serialize_binary_message(msg): 

21 """serialize a message as a binary blob 

22 

23 Header: 

24 

25 4 bytes: number of msg parts (nbufs) as 32b int 

26 4 * nbufs bytes: offset for each buffer as integer as 32b int 

27 

28 Offsets are from the start of the buffer, including the header. 

29 

30 Returns 

31 ------- 

32 The message serialized to bytes. 

33 

34 """ 

35 # don't modify msg or buffer list in-place 

36 msg = msg.copy() 

37 buffers = list(msg.pop("buffers")) 

38 bmsg = json.dumps(msg, default=json_default).encode("utf8") 

39 buffers.insert(0, bmsg) 

40 nbufs = len(buffers) 

41 offsets = [4 * (nbufs + 1)] 

42 for buf in buffers[:-1]: 

43 offsets.append(offsets[-1] + len(buf)) 

44 offsets_buf = struct.pack("!" + "I" * (nbufs + 1), nbufs, *offsets) 

45 buffers.insert(0, offsets_buf) 

46 return b"".join(buffers) 

47 

48 

49def deserialize_binary_message(bmsg): 

50 """deserialize a message from a binary blog 

51 

52 Header: 

53 

54 4 bytes: number of msg parts (nbufs) as 32b int 

55 4 * nbufs bytes: offset for each buffer as integer as 32b int 

56 

57 Offsets are from the start of the buffer, including the header. 

58 

59 Returns 

60 ------- 

61 message dictionary 

62 """ 

63 nbufs = struct.unpack("!i", bmsg[:4])[0] 

64 offsets = list(struct.unpack("!" + "I" * nbufs, bmsg[4 : 4 * (nbufs + 1)])) 

65 offsets.append(None) 

66 bufs = [] 

67 for start, stop in zip(offsets[:-1], offsets[1:]): 

68 bufs.append(bmsg[start:stop]) 

69 msg = json.loads(bufs[0].decode("utf8")) 

70 msg["header"] = extract_dates(msg["header"]) 

71 msg["parent_header"] = extract_dates(msg["parent_header"]) 

72 msg["buffers"] = bufs[1:] 

73 return msg 

74 

75 

76def serialize_msg_to_ws_v1(msg_or_list, channel, pack=None): 

77 """Serialize a message using the v1 protocol.""" 

78 if pack: 

79 msg_list = [ 

80 pack(msg_or_list["header"]), 

81 pack(msg_or_list["parent_header"]), 

82 pack(msg_or_list["metadata"]), 

83 pack(msg_or_list["content"]), 

84 ] 

85 else: 

86 msg_list = msg_or_list 

87 channel = channel.encode("utf-8") 

88 offsets: list = [] 

89 offsets.append(8 * (1 + 1 + len(msg_list) + 1)) 

90 offsets.append(len(channel) + offsets[-1]) 

91 for msg in msg_list: 

92 offsets.append(len(msg) + offsets[-1]) 

93 offset_number = len(offsets).to_bytes(8, byteorder="little") 

94 offsets = [offset.to_bytes(8, byteorder="little") for offset in offsets] 

95 bin_msg = b"".join([offset_number, *offsets] + [channel] + msg_list) 

96 return bin_msg 

97 

98 

99def deserialize_msg_from_ws_v1(ws_msg): 

100 """Deserialize a message using the v1 protocol.""" 

101 offset_number = int.from_bytes(ws_msg[:8], "little") 

102 offsets = [ 

103 int.from_bytes(ws_msg[8 * (i + 1) : 8 * (i + 2)], "little") for i in range(offset_number) 

104 ] 

105 channel = ws_msg[offsets[0] : offsets[1]].decode("utf-8") 

106 msg_list = [ws_msg[offsets[i] : offsets[i + 1]] for i in range(1, offset_number - 1)] 

107 return channel, msg_list 

108 

109 

110class BaseKernelWebsocketConnection(LoggingConfigurable): 

111 """A configurable base class for connecting Kernel WebSockets to ZMQ sockets.""" 

112 

113 @property 

114 def kernel_manager(self): 

115 """The kernel manager.""" 

116 return self.parent 

117 

118 @property 

119 def multi_kernel_manager(self): 

120 """The multi kernel manager.""" 

121 return self.kernel_manager.parent 

122 

123 @property 

124 def kernel_id(self): 

125 """The kernel id.""" 

126 return self.kernel_manager.kernel_id 

127 

128 @property 

129 def session_id(self): 

130 """The session id.""" 

131 return self.session.session 

132 

133 kernel_info_timeout = Float() 

134 

135 @default("kernel_info_timeout") 

136 def _default_kernel_info_timeout(self): 

137 return self.multi_kernel_manager.kernel_info_timeout 

138 

139 session = Instance(klass=Session, config=True) 

140 

141 @default("session") 

142 def _default_session(self): 

143 return Session(config=self.config) 

144 

145 websocket_handler = Instance(WebSocketHandler) 

146 

147 async def connect(self): 

148 """Handle a connect.""" 

149 raise NotImplementedError() 

150 

151 async def disconnect(self): 

152 """Handle a disconnect.""" 

153 raise NotImplementedError() 

154 

155 def handle_incoming_message(self, incoming_msg: str) -> None: 

156 """Handle an incoming message.""" 

157 raise NotImplementedError() 

158 

159 def handle_outgoing_message(self, stream: str, outgoing_msg: list) -> None: 

160 """Handle an outgoing message.""" 

161 raise NotImplementedError() 

162 

163 

164KernelWebsocketConnectionABC.register(BaseKernelWebsocketConnection)