1"""Adapters for Jupyter msg spec versions."""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5import json
6import re
7from typing import Any
8
9from ._version import protocol_version_info
10
11
12def code_to_line(code: str, cursor_pos: int) -> tuple[str, int]:
13 """Turn a multiline code block and cursor position into a single line
14 and new cursor position.
15
16 For adapting ``complete_`` and ``object_info_request``.
17 """
18 if not code:
19 return "", 0
20 for line in code.splitlines(True):
21 n = len(line)
22 if cursor_pos > n:
23 cursor_pos -= n
24 else:
25 break
26 return line, cursor_pos
27
28
29_match_bracket = re.compile(r"\([^\(\)]+\)", re.UNICODE)
30_end_bracket = re.compile(r"\([^\(]*$", re.UNICODE)
31_identifier = re.compile(r"[a-z_][0-9a-z._]*", re.I | re.UNICODE)
32
33
34def extract_oname_v4(code: str, cursor_pos: int) -> str:
35 """Reimplement token-finding logic from IPython 2.x javascript
36
37 for adapting object_info_request from v5 to v4
38 """
39
40 line, _ = code_to_line(code, cursor_pos)
41
42 oldline = line
43 line = _match_bracket.sub("", line)
44 while oldline != line:
45 oldline = line
46 line = _match_bracket.sub("", line)
47
48 # remove everything after last open bracket
49 line = _end_bracket.sub("", line)
50 matches = _identifier.findall(line)
51 if matches:
52 return matches[-1]
53 else:
54 return ""
55
56
57class Adapter:
58 """Base class for adapting messages
59
60 Override message_type(msg) methods to create adapters.
61 """
62
63 msg_type_map: dict[str, str] = {}
64
65 def update_header(self, msg: dict[str, Any]) -> dict[str, Any]:
66 """Update the header."""
67 return msg
68
69 def update_metadata(self, msg: dict[str, Any]) -> dict[str, Any]:
70 """Update the metadata."""
71 return msg
72
73 def update_msg_type(self, msg: dict[str, Any]) -> dict[str, Any]:
74 """Update the message type."""
75 header = msg["header"]
76 msg_type = header["msg_type"]
77 if msg_type in self.msg_type_map:
78 msg["msg_type"] = header["msg_type"] = self.msg_type_map[msg_type]
79 return msg
80
81 def handle_reply_status_error(self, msg: dict[str, Any]) -> dict[str, Any]:
82 """This will be called *instead of* the regular handler
83
84 on any reply with status != ok
85 """
86 return msg
87
88 def __call__(self, msg: dict[str, Any]) -> dict[str, Any]:
89 msg = self.update_header(msg)
90 msg = self.update_metadata(msg)
91 msg = self.update_msg_type(msg)
92 header = msg["header"]
93
94 handler = getattr(self, header["msg_type"], None)
95 if handler is None:
96 return msg
97
98 # handle status=error replies separately (no change, at present)
99 if msg["content"].get("status", None) in {"error", "aborted"}:
100 return self.handle_reply_status_error(msg)
101 return handler(msg)
102
103
104def _version_str_to_list(version: str) -> list[int]:
105 """convert a version string to a list of ints
106
107 non-int segments are excluded
108 """
109 v = []
110 for part in version.split("."):
111 try:
112 v.append(int(part))
113 except ValueError:
114 pass
115 return v
116
117
118class V5toV4(Adapter):
119 """Adapt msg protocol v5 to v4"""
120
121 version = "4.1"
122
123 msg_type_map = {
124 "execute_result": "pyout",
125 "execute_input": "pyin",
126 "error": "pyerr",
127 "inspect_request": "object_info_request",
128 "inspect_reply": "object_info_reply",
129 }
130
131 def update_header(self, msg: dict[str, Any]) -> dict[str, Any]:
132 """Update the header."""
133 msg["header"].pop("version", None)
134 msg["parent_header"].pop("version", None)
135 return msg
136
137 # shell channel
138
139 def kernel_info_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
140 """Handle a kernel info reply."""
141 v4c = {}
142 content = msg["content"]
143 for key in ("language_version", "protocol_version"):
144 if key in content:
145 v4c[key] = _version_str_to_list(content[key])
146 if content.get("implementation", "") == "ipython" and "implementation_version" in content:
147 v4c["ipython_version"] = _version_str_to_list(content["implementation_version"])
148 language_info = content.get("language_info", {})
149 language = language_info.get("name", "")
150 v4c.setdefault("language", language)
151 if "version" in language_info:
152 v4c.setdefault("language_version", _version_str_to_list(language_info["version"]))
153 msg["content"] = v4c
154 return msg
155
156 def execute_request(self, msg: dict[str, Any]) -> dict[str, Any]:
157 """Handle an execute request."""
158 content = msg["content"]
159 content.setdefault("user_variables", [])
160 return msg
161
162 def execute_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
163 """Handle an execute reply."""
164 content = msg["content"]
165 content.setdefault("user_variables", {})
166 # TODO: handle payloads
167 return msg
168
169 def complete_request(self, msg: dict[str, Any]) -> dict[str, Any]:
170 """Handle a complete request."""
171 content = msg["content"]
172 code = content["code"]
173 cursor_pos = content["cursor_pos"]
174 line, cursor_pos = code_to_line(code, cursor_pos)
175
176 new_content = msg["content"] = {}
177 new_content["text"] = ""
178 new_content["line"] = line
179 new_content["block"] = None
180 new_content["cursor_pos"] = cursor_pos
181 return msg
182
183 def complete_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
184 """Handle a complete reply."""
185 content = msg["content"]
186 cursor_start = content.pop("cursor_start")
187 cursor_end = content.pop("cursor_end")
188 match_len = cursor_end - cursor_start
189 content["matched_text"] = content["matches"][0][:match_len]
190 content.pop("metadata", None)
191 return msg
192
193 def object_info_request(self, msg: dict[str, Any]) -> dict[str, Any]:
194 """Handle an object info request."""
195 content = msg["content"]
196 code = content["code"]
197 cursor_pos = content["cursor_pos"]
198 _line, _ = code_to_line(code, cursor_pos)
199
200 new_content = msg["content"] = {}
201 new_content["oname"] = extract_oname_v4(code, cursor_pos)
202 new_content["detail_level"] = content["detail_level"]
203 return msg
204
205 def object_info_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
206 """inspect_reply can't be easily backward compatible"""
207 msg["content"] = {"found": False, "oname": "unknown"}
208 return msg
209
210 # iopub channel
211
212 def stream(self, msg: dict[str, Any]) -> dict[str, Any]:
213 """Handle a stream message."""
214 content = msg["content"]
215 content["data"] = content.pop("text")
216 return msg
217
218 def display_data(self, msg: dict[str, Any]) -> dict[str, Any]:
219 """Handle a display data message."""
220 content = msg["content"]
221 content.setdefault("source", "display")
222 data = content["data"]
223 if "application/json" in data:
224 try:
225 data["application/json"] = json.dumps(data["application/json"])
226 except Exception:
227 # warn?
228 pass
229 return msg
230
231 # stdin channel
232
233 def input_request(self, msg: dict[str, Any]) -> dict[str, Any]:
234 """Handle an input request."""
235 msg["content"].pop("password", None)
236 return msg
237
238
239class V4toV5(Adapter):
240 """Convert msg spec V4 to V5"""
241
242 version = "5.0"
243
244 # invert message renames above
245 msg_type_map = {v: k for k, v in V5toV4.msg_type_map.items()}
246
247 def update_header(self, msg: dict[str, Any]) -> dict[str, Any]:
248 """Update the header."""
249 msg["header"]["version"] = self.version
250 if msg["parent_header"]:
251 msg["parent_header"]["version"] = self.version
252 return msg
253
254 # shell channel
255
256 def kernel_info_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
257 """Handle a kernel info reply."""
258 content = msg["content"]
259 for key in ("protocol_version", "ipython_version"):
260 if key in content:
261 content[key] = ".".join(map(str, content[key]))
262
263 content.setdefault("protocol_version", "4.1")
264
265 if content["language"].startswith("python") and "ipython_version" in content:
266 content["implementation"] = "ipython"
267 content["implementation_version"] = content.pop("ipython_version")
268
269 language = content.pop("language")
270 language_info = content.setdefault("language_info", {})
271 language_info.setdefault("name", language)
272 if "language_version" in content:
273 language_version = ".".join(map(str, content.pop("language_version")))
274 language_info.setdefault("version", language_version)
275
276 content["banner"] = ""
277 return msg
278
279 def execute_request(self, msg: dict[str, Any]) -> dict[str, Any]:
280 """Handle an execute request."""
281 content = msg["content"]
282 user_variables = content.pop("user_variables", [])
283 user_expressions = content.setdefault("user_expressions", {})
284 for v in user_variables:
285 user_expressions[v] = v
286 return msg
287
288 def execute_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
289 """Handle an execute reply."""
290 content = msg["content"]
291 user_expressions = content.setdefault("user_expressions", {})
292 user_variables = content.pop("user_variables", {})
293 if user_variables:
294 user_expressions.update(user_variables)
295
296 # Pager payloads became a mime bundle
297 for payload in content.get("payload", []):
298 if payload.get("source", None) == "page" and ("text" in payload):
299 if "data" not in payload:
300 payload["data"] = {}
301 payload["data"]["text/plain"] = payload.pop("text")
302
303 return msg
304
305 def complete_request(self, msg: dict[str, Any]) -> dict[str, Any]:
306 """Handle a complete request."""
307 old_content = msg["content"]
308
309 new_content = msg["content"] = {}
310 new_content["code"] = old_content["line"]
311 new_content["cursor_pos"] = old_content["cursor_pos"]
312 return msg
313
314 def complete_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
315 """Handle a complete reply."""
316 # complete_reply needs more context than we have to get cursor_start and end.
317 # use special end=null to indicate current cursor position and negative offset
318 # for start relative to the cursor.
319 # start=None indicates that start == end (accounts for no -0).
320 content = msg["content"]
321 new_content = msg["content"] = {"status": "ok"}
322 new_content["matches"] = content["matches"]
323 if content["matched_text"]:
324 new_content["cursor_start"] = -len(content["matched_text"])
325 else:
326 # no -0, use None to indicate that start == end
327 new_content["cursor_start"] = None
328 new_content["cursor_end"] = None
329 new_content["metadata"] = {}
330 return msg
331
332 def inspect_request(self, msg: dict[str, Any]) -> dict[str, Any]:
333 """Handle an inspect request."""
334 content = msg["content"]
335 name = content["oname"]
336
337 new_content = msg["content"] = {}
338 new_content["code"] = name
339 new_content["cursor_pos"] = len(name)
340 new_content["detail_level"] = content["detail_level"]
341 return msg
342
343 def inspect_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
344 """inspect_reply can't be easily backward compatible"""
345 content = msg["content"]
346 new_content = msg["content"] = {"status": "ok"}
347 found = new_content["found"] = content["found"]
348 new_content["data"] = data = {}
349 new_content["metadata"] = {}
350 if found:
351 lines = []
352 for key in ("call_def", "init_definition", "definition"):
353 if content.get(key, False):
354 lines.append(content[key])
355 break
356 for key in ("call_docstring", "init_docstring", "docstring"):
357 if content.get(key, False):
358 lines.append(content[key])
359 break
360 if not lines:
361 lines.append("<empty docstring>")
362 data["text/plain"] = "\n".join(lines)
363 return msg
364
365 # iopub channel
366
367 def stream(self, msg: dict[str, Any]) -> dict[str, Any]:
368 """Handle a stream message."""
369 content = msg["content"]
370 content["text"] = content.pop("data")
371 return msg
372
373 def display_data(self, msg: dict[str, Any]) -> dict[str, Any]:
374 """Handle display data."""
375 content = msg["content"]
376 content.pop("source", None)
377 data = content["data"]
378 if "application/json" in data:
379 try:
380 data["application/json"] = json.loads(data["application/json"])
381 except Exception:
382 # warn?
383 pass
384 return msg
385
386 # stdin channel
387
388 def input_request(self, msg: dict[str, Any]) -> dict[str, Any]:
389 """Handle an input request."""
390 msg["content"].setdefault("password", False)
391 return msg
392
393
394def adapt(msg: dict[str, Any], to_version: int = protocol_version_info[0]) -> dict[str, Any]:
395 """Adapt a single message to a target version
396
397 Parameters
398 ----------
399
400 msg : dict
401 A Jupyter message.
402 to_version : int, optional
403 The target major version.
404 If unspecified, adapt to the current version.
405
406 Returns
407 -------
408
409 msg : dict
410 A Jupyter message appropriate in the new version.
411 """
412 from .session import utcnow
413
414 header = msg["header"]
415 if "date" not in header:
416 header["date"] = utcnow()
417 if "version" in header:
418 from_version = int(header["version"].split(".")[0])
419 else:
420 # assume last version before adding the key to the header
421 from_version = 4
422 adapter = adapters.get((from_version, to_version), None)
423 if adapter is None:
424 return msg
425 return adapter(msg)
426
427
428# one adapter per major version from,to
429adapters = {
430 (5, 4): V5toV4(),
431 (4, 5): V4toV5(),
432}