Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_client/adapter.py: 18%

257 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1"""Adapters for Jupyter msg spec versions.""" 

2# Copyright (c) Jupyter Development Team. 

3# Distributed under the terms of the Modified BSD License. 

4import json 

5import re 

6from typing import Any, Dict, List, Tuple 

7 

8from ._version import protocol_version_info 

9 

10 

11def code_to_line(code: str, cursor_pos: int) -> Tuple[str, int]: 

12 """Turn a multiline code block and cursor position into a single line 

13 and new cursor position. 

14 

15 For adapting ``complete_`` and ``object_info_request``. 

16 """ 

17 if not code: 

18 return "", 0 

19 for line in code.splitlines(True): 

20 n = len(line) 

21 if cursor_pos > n: 

22 cursor_pos -= n 

23 else: 

24 break 

25 return line, cursor_pos 

26 

27 

28_match_bracket = re.compile(r"\([^\(\)]+\)", re.UNICODE) 

29_end_bracket = re.compile(r"\([^\(]*$", re.UNICODE) 

30_identifier = re.compile(r"[a-z_][0-9a-z._]*", re.I | re.UNICODE) 

31 

32 

33def extract_oname_v4(code: str, cursor_pos: int) -> str: 

34 """Reimplement token-finding logic from IPython 2.x javascript 

35 

36 for adapting object_info_request from v5 to v4 

37 """ 

38 

39 line, _ = code_to_line(code, cursor_pos) 

40 

41 oldline = line 

42 line = _match_bracket.sub("", line) 

43 while oldline != line: 

44 oldline = line 

45 line = _match_bracket.sub("", line) 

46 

47 # remove everything after last open bracket 

48 line = _end_bracket.sub("", line) 

49 matches = _identifier.findall(line) 

50 if matches: 

51 return matches[-1] 

52 else: 

53 return "" 

54 

55 

56class Adapter: 

57 """Base class for adapting messages 

58 

59 Override message_type(msg) methods to create adapters. 

60 """ 

61 

62 msg_type_map: Dict[str, str] = {} 

63 

64 def update_header(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

65 """Update the header.""" 

66 return msg 

67 

68 def update_metadata(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

69 """Update the metadata.""" 

70 return msg 

71 

72 def update_msg_type(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

73 """Update the message type.""" 

74 header = msg["header"] 

75 msg_type = header["msg_type"] 

76 if msg_type in self.msg_type_map: 

77 msg["msg_type"] = header["msg_type"] = self.msg_type_map[msg_type] 

78 return msg 

79 

80 def handle_reply_status_error(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

81 """This will be called *instead of* the regular handler 

82 

83 on any reply with status != ok 

84 """ 

85 return msg 

86 

87 def __call__(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

88 msg = self.update_header(msg) 

89 msg = self.update_metadata(msg) 

90 msg = self.update_msg_type(msg) 

91 header = msg["header"] 

92 

93 handler = getattr(self, header["msg_type"], None) 

94 if handler is None: 

95 return msg 

96 

97 # handle status=error replies separately (no change, at present) 

98 if msg["content"].get("status", None) in {"error", "aborted"}: 

99 return self.handle_reply_status_error(msg) 

100 return handler(msg) 

101 

102 

103def _version_str_to_list(version: str) -> List[int]: 

104 """convert a version string to a list of ints 

105 

106 non-int segments are excluded 

107 """ 

108 v = [] 

109 for part in version.split("."): 

110 try: 

111 v.append(int(part)) 

112 except ValueError: 

113 pass 

114 return v 

115 

116 

117class V5toV4(Adapter): 

118 """Adapt msg protocol v5 to v4""" 

119 

120 version = "4.1" 

121 

122 msg_type_map = { 

123 "execute_result": "pyout", 

124 "execute_input": "pyin", 

125 "error": "pyerr", 

126 "inspect_request": "object_info_request", 

127 "inspect_reply": "object_info_reply", 

128 } 

129 

130 def update_header(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

131 """Update the header.""" 

132 msg["header"].pop("version", None) 

133 msg["parent_header"].pop("version", None) 

134 return msg 

135 

136 # shell channel 

137 

138 def kernel_info_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

139 """Handle a kernel info reply.""" 

140 v4c = {} 

141 content = msg["content"] 

142 for key in ("language_version", "protocol_version"): 

143 if key in content: 

144 v4c[key] = _version_str_to_list(content[key]) 

145 if content.get("implementation", "") == "ipython" and "implementation_version" in content: 

146 v4c["ipython_version"] = _version_str_to_list(content["implementation_version"]) 

147 language_info = content.get("language_info", {}) 

148 language = language_info.get("name", "") 

149 v4c.setdefault("language", language) 

150 if "version" in language_info: 

151 v4c.setdefault("language_version", _version_str_to_list(language_info["version"])) 

152 msg["content"] = v4c 

153 return msg 

154 

155 def execute_request(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

156 """Handle an execute request.""" 

157 content = msg["content"] 

158 content.setdefault("user_variables", []) 

159 return msg 

160 

161 def execute_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

162 """Handle an execute reply.""" 

163 content = msg["content"] 

164 content.setdefault("user_variables", {}) 

165 # TODO: handle payloads 

166 return msg 

167 

168 def complete_request(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

169 """Handle a complete request.""" 

170 content = msg["content"] 

171 code = content["code"] 

172 cursor_pos = content["cursor_pos"] 

173 line, cursor_pos = code_to_line(code, cursor_pos) 

174 

175 new_content = msg["content"] = {} 

176 new_content["text"] = "" 

177 new_content["line"] = line 

178 new_content["block"] = None 

179 new_content["cursor_pos"] = cursor_pos 

180 return msg 

181 

182 def complete_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

183 """Handle a complete reply.""" 

184 content = msg["content"] 

185 cursor_start = content.pop("cursor_start") 

186 cursor_end = content.pop("cursor_end") 

187 match_len = cursor_end - cursor_start 

188 content["matched_text"] = content["matches"][0][:match_len] 

189 content.pop("metadata", None) 

190 return msg 

191 

192 def object_info_request(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

193 """Handle an object info request.""" 

194 content = msg["content"] 

195 code = content["code"] 

196 cursor_pos = content["cursor_pos"] 

197 line, _ = code_to_line(code, cursor_pos) 

198 

199 new_content = msg["content"] = {} 

200 new_content["oname"] = extract_oname_v4(code, cursor_pos) 

201 new_content["detail_level"] = content["detail_level"] 

202 return msg 

203 

204 def object_info_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

205 """inspect_reply can't be easily backward compatible""" 

206 msg["content"] = {"found": False, "oname": "unknown"} 

207 return msg 

208 

209 # iopub channel 

210 

211 def stream(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

212 """Handle a stream message.""" 

213 content = msg["content"] 

214 content["data"] = content.pop("text") 

215 return msg 

216 

217 def display_data(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

218 """Handle a display data message.""" 

219 content = msg["content"] 

220 content.setdefault("source", "display") 

221 data = content["data"] 

222 if "application/json" in data: 

223 try: 

224 data["application/json"] = json.dumps(data["application/json"]) 

225 except Exception: 

226 # warn? 

227 pass 

228 return msg 

229 

230 # stdin channel 

231 

232 def input_request(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

233 """Handle an input request.""" 

234 msg["content"].pop("password", None) 

235 return msg 

236 

237 

238class V4toV5(Adapter): 

239 """Convert msg spec V4 to V5""" 

240 

241 version = "5.0" 

242 

243 # invert message renames above 

244 msg_type_map = {v: k for k, v in V5toV4.msg_type_map.items()} 

245 

246 def update_header(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

247 """Update the header.""" 

248 msg["header"]["version"] = self.version 

249 if msg["parent_header"]: 

250 msg["parent_header"]["version"] = self.version 

251 return msg 

252 

253 # shell channel 

254 

255 def kernel_info_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

256 """Handle a kernel info reply.""" 

257 content = msg["content"] 

258 for key in ("protocol_version", "ipython_version"): 

259 if key in content: 

260 content[key] = ".".join(map(str, content[key])) 

261 

262 content.setdefault("protocol_version", "4.1") 

263 

264 if content["language"].startswith("python") and "ipython_version" in content: 

265 content["implementation"] = "ipython" 

266 content["implementation_version"] = content.pop("ipython_version") 

267 

268 language = content.pop("language") 

269 language_info = content.setdefault("language_info", {}) 

270 language_info.setdefault("name", language) 

271 if "language_version" in content: 

272 language_version = ".".join(map(str, content.pop("language_version"))) 

273 language_info.setdefault("version", language_version) 

274 

275 content["banner"] = "" 

276 return msg 

277 

278 def execute_request(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

279 """Handle an execute request.""" 

280 content = msg["content"] 

281 user_variables = content.pop("user_variables", []) 

282 user_expressions = content.setdefault("user_expressions", {}) 

283 for v in user_variables: 

284 user_expressions[v] = v 

285 return msg 

286 

287 def execute_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

288 """Handle an execute reply.""" 

289 content = msg["content"] 

290 user_expressions = content.setdefault("user_expressions", {}) 

291 user_variables = content.pop("user_variables", {}) 

292 if user_variables: 

293 user_expressions.update(user_variables) 

294 

295 # Pager payloads became a mime bundle 

296 for payload in content.get("payload", []): 

297 if payload.get("source", None) == "page" and ("text" in payload): 

298 if "data" not in payload: 

299 payload["data"] = {} 

300 payload["data"]["text/plain"] = payload.pop("text") 

301 

302 return msg 

303 

304 def complete_request(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

305 """Handle a complete request.""" 

306 old_content = msg["content"] 

307 

308 new_content = msg["content"] = {} 

309 new_content["code"] = old_content["line"] 

310 new_content["cursor_pos"] = old_content["cursor_pos"] 

311 return msg 

312 

313 def complete_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

314 """Handle a complete reply.""" 

315 # complete_reply needs more context than we have to get cursor_start and end. 

316 # use special end=null to indicate current cursor position and negative offset 

317 # for start relative to the cursor. 

318 # start=None indicates that start == end (accounts for no -0). 

319 content = msg["content"] 

320 new_content = msg["content"] = {"status": "ok"} 

321 new_content["matches"] = content["matches"] 

322 if content["matched_text"]: 

323 new_content["cursor_start"] = -len(content["matched_text"]) 

324 else: 

325 # no -0, use None to indicate that start == end 

326 new_content["cursor_start"] = None 

327 new_content["cursor_end"] = None 

328 new_content["metadata"] = {} 

329 return msg 

330 

331 def inspect_request(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

332 """Handle an inspect request.""" 

333 content = msg["content"] 

334 name = content["oname"] 

335 

336 new_content = msg["content"] = {} 

337 new_content["code"] = name 

338 new_content["cursor_pos"] = len(name) 

339 new_content["detail_level"] = content["detail_level"] 

340 return msg 

341 

342 def inspect_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

343 """inspect_reply can't be easily backward compatible""" 

344 content = msg["content"] 

345 new_content = msg["content"] = {"status": "ok"} 

346 found = new_content["found"] = content["found"] 

347 new_content["data"] = data = {} 

348 new_content["metadata"] = {} 

349 if found: 

350 lines = [] 

351 for key in ("call_def", "init_definition", "definition"): 

352 if content.get(key, False): 

353 lines.append(content[key]) 

354 break 

355 for key in ("call_docstring", "init_docstring", "docstring"): 

356 if content.get(key, False): 

357 lines.append(content[key]) 

358 break 

359 if not lines: 

360 lines.append("<empty docstring>") 

361 data["text/plain"] = "\n".join(lines) 

362 return msg 

363 

364 # iopub channel 

365 

366 def stream(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

367 """Handle a stream message.""" 

368 content = msg["content"] 

369 content["text"] = content.pop("data") 

370 return msg 

371 

372 def display_data(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

373 """Handle display data.""" 

374 content = msg["content"] 

375 content.pop("source", None) 

376 data = content["data"] 

377 if "application/json" in data: 

378 try: 

379 data["application/json"] = json.loads(data["application/json"]) 

380 except Exception: 

381 # warn? 

382 pass 

383 return msg 

384 

385 # stdin channel 

386 

387 def input_request(self, msg: Dict[str, Any]) -> Dict[str, Any]: 

388 """Handle an input request.""" 

389 msg["content"].setdefault("password", False) 

390 return msg 

391 

392 

393def adapt(msg: Dict[str, Any], to_version: int = protocol_version_info[0]) -> Dict[str, Any]: 

394 """Adapt a single message to a target version 

395 

396 Parameters 

397 ---------- 

398 

399 msg : dict 

400 A Jupyter message. 

401 to_version : int, optional 

402 The target major version. 

403 If unspecified, adapt to the current version. 

404 

405 Returns 

406 ------- 

407 

408 msg : dict 

409 A Jupyter message appropriate in the new version. 

410 """ 

411 from .session import utcnow 

412 

413 header = msg["header"] 

414 if "date" not in header: 

415 header["date"] = utcnow() 

416 if "version" in header: 

417 from_version = int(header["version"].split(".")[0]) 

418 else: 

419 # assume last version before adding the key to the header 

420 from_version = 4 

421 adapter = adapters.get((from_version, to_version), None) 

422 if adapter is None: 

423 return msg 

424 return adapter(msg) 

425 

426 

427# one adapter per major version from,to 

428adapters = { 

429 (5, 4): V5toV4(), 

430 (4, 5): V4toV5(), 

431}