1"""Adapters for Jupyter msg spec versions."""
2# Copyright (c) Jupyter Development Team.
3# Distributed under the terms of the Modified BSD License.
4import json
5import re
6from typing import Any, Dict, List, Tuple
7
8from ._version import protocol_version_info
9
10
def code_to_line(code: str, cursor_pos: int) -> Tuple[str, int]:
    """Reduce a multiline code block to the single line holding the cursor.

    Used when adapting ``complete_`` and ``object_info_request`` messages.

    Parameters
    ----------
    code : str
        The code block; may span several lines.
    cursor_pos : int
        Cursor offset counted from the start of ``code``.

    Returns
    -------
    (line, offset) : tuple
        The selected line (its line ending is kept) and the cursor offset
        relative to the start of that line. Empty ``code`` yields ``("", 0)``.
        If the cursor lies beyond the end of ``code``, the last line is
        returned together with the leftover offset.
    """
    if not code:
        return "", 0

    offset = cursor_pos
    current = ""
    for current in code.splitlines(keepends=True):
        width = len(current)
        if offset <= width:
            # The cursor falls inside (or at the very end of) this line.
            break
        # Otherwise consume this line and keep looking further down.
        offset -= width
    return current, offset
26
27
# A "(" ... ")" pair whose contents are non-empty and contain no parentheses,
# i.e. an innermost bracket group such as "(a, b)". An empty "()" does not match.
_match_bracket = re.compile(r"\([^\(\)]+\)", flags=re.UNICODE)
# A "(" followed only by non-"(" characters up to the end of the string,
# i.e. everything from the last opening parenthesis onwards.
_end_bracket = re.compile(r"\([^\(]*$", flags=re.UNICODE)
# A letter or underscore followed by letters, digits, dots and underscores
# (case-insensitive), e.g. "np.array".
_identifier = re.compile(r"[a-z_][0-9a-z._]*", flags=re.IGNORECASE | re.UNICODE)
31
32
def extract_oname_v4(code: str, cursor_pos: int) -> str:
    """Pick the object name to inspect, the way IPython 2.x javascript did.

    Used when adapting object_info_request from v5 to v4.

    Parameters
    ----------
    code : str
        The code block from the request.
    cursor_pos : int
        Cursor offset within ``code``; it only selects which line is examined.

    Returns
    -------
    str
        The last identifier left on the line once bracketed groups have been
        stripped, or ``""`` if no identifier remains.
    """
    line, _ = code_to_line(code, cursor_pos)

    # Strip innermost "(...)" groups over and over until the line stops
    # changing, which unwinds nested calls from the inside out.
    while True:
        reduced = _match_bracket.sub("", line)
        if reduced == line:
            break
        line = reduced

    # remove everything after last open bracket
    line = _end_bracket.sub("", line)

    candidates = _identifier.findall(line)
    return candidates[-1] if candidates else ""
54
55
class Adapter:
    """Base class for adapting messages

    Override message_type(msg) methods to create adapters: a subclass defines
    one public method per message type, named after that type, and
    ``__call__`` dispatches to it by name. The hooks receive the message dict
    and return it; the base implementations return the same object they were
    given.
    """

    # Maps a message type of the source protocol to its name in the target
    # protocol; types that are not listed keep their name.
    msg_type_map: Dict[str, str] = {}

    # Public methods that belong to the adapter machinery itself. They must
    # never be selected as per-message-type handlers, even if a message
    # happens to carry one of these names as its ``msg_type``.
    _non_handler_names = frozenset(
        {
            "update_header",
            "update_metadata",
            "update_msg_type",
            "handle_reply_status_error",
        }
    )

    def update_header(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Update the header. The base implementation returns ``msg`` unchanged."""
        return msg

    def update_metadata(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Update the metadata. The base implementation returns ``msg`` unchanged."""
        return msg

    def update_msg_type(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Rename the message type according to ``msg_type_map``.

        When the current type is in the map, both ``msg["msg_type"]`` and
        ``msg["header"]["msg_type"]`` are set to the mapped name.
        """
        header = msg["header"]
        msg_type = header["msg_type"]
        if msg_type in self.msg_type_map:
            msg["msg_type"] = header["msg_type"] = self.msg_type_map[msg_type]
        return msg

    def handle_reply_status_error(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """This will be called *instead of* the regular handler

        on any message whose content status is ``"error"`` or ``"aborted"``.
        The base implementation returns ``msg`` unchanged.
        """
        return msg

    def _find_handler(self, msg_type: Any) -> Any:
        """Return the handler method for ``msg_type``, or None if there is none.

        Only public, callable attributes that are not adapter hooks qualify.
        The message type comes from the message itself, so a plain ``getattr``
        could otherwise resolve to a non-callable attribute (``version``,
        ``msg_type_map``), to a dunder such as ``__call__`` (endless
        recursion), or to one of the ``update_*`` hooks.
        """
        if not isinstance(msg_type, str):
            return None
        if msg_type.startswith("_") or msg_type in self._non_handler_names:
            return None
        handler = getattr(self, msg_type, None)
        return handler if callable(handler) else None

    def __call__(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Adapt ``msg`` and return it.

        Runs the header, metadata and message-type hooks in that order, then
        dispatches on the (possibly renamed) ``msg["header"]["msg_type"]``.
        Messages without a matching handler are returned as they are after
        the hooks have run.
        """
        msg = self.update_header(msg)
        msg = self.update_metadata(msg)
        msg = self.update_msg_type(msg)
        header = msg["header"]

        handler = self._find_handler(header["msg_type"])
        if handler is None:
            return msg

        # handle status=error replies separately (no change, at present)
        if msg["content"].get("status", None) in {"error", "aborted"}:
            return self.handle_reply_status_error(msg)
        return handler(msg)
101
102
def _version_str_to_list(version: str) -> List[int]:
    """Split a dotted version string into its integer components.

    Segments that ``int()`` cannot parse are left out, so ``"3.8.0rc1"``
    becomes ``[3, 8]``.
    """
    numbers: List[int] = []
    for segment in version.split("."):
        try:
            value = int(segment)
        except ValueError:
            # Not a plain integer segment: skip it.
            continue
        numbers.append(value)
    return numbers
115
116
class V5toV4(Adapter):
    """Adapt msg protocol v5 to v4

    Every handler edits the message it is given in place and returns it.
    """

    version = "4.1"

    # v5 message type -> v4 message type
    msg_type_map = {
        "execute_result": "pyout",
        "execute_input": "pyin",
        "error": "pyerr",
        "inspect_request": "object_info_request",
        "inspect_reply": "object_info_reply",
    }

    def update_header(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Remove the ``version`` key from the header and the parent header."""
        msg["header"].pop("version", None)
        msg["parent_header"].pop("version", None)
        return msg

    # shell channel

    def kernel_info_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a kernel info reply.

        The content is replaced by a new dict in which version strings are
        converted to lists of ints and the language name (and, as a fallback,
        the language version) is taken from ``language_info``.
        """
        v4c: Dict[str, Any] = {}
        content = msg["content"]
        for key in ("language_version", "protocol_version"):
            if key in content:
                v4c[key] = _version_str_to_list(content[key])
        if content.get("implementation", "") == "ipython" and "implementation_version" in content:
            v4c["ipython_version"] = _version_str_to_list(content["implementation_version"])
        language_info = content.get("language_info", {})
        language = language_info.get("name", "")
        v4c.setdefault("language", language)
        if "version" in language_info:
            # Only used when the content had no top-level language_version.
            v4c.setdefault("language_version", _version_str_to_list(language_info["version"]))
        msg["content"] = v4c
        return msg

    def execute_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Handle an execute request: default ``user_variables`` to an empty list."""
        content = msg["content"]
        content.setdefault("user_variables", [])
        return msg

    def execute_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Handle an execute reply: default ``user_variables`` to an empty dict."""
        content = msg["content"]
        content.setdefault("user_variables", {})
        # TODO: handle payloads
        return msg

    def complete_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a complete request.

        The content is replaced by ``text``/``line``/``block``/``cursor_pos``,
        where ``line`` is the line of ``code`` holding the cursor and
        ``cursor_pos`` is relative to that line.
        """
        content = msg["content"]
        code = content["code"]
        cursor_pos = content["cursor_pos"]
        line, cursor_pos = code_to_line(code, cursor_pos)

        new_content = msg["content"] = {}
        new_content["text"] = ""
        new_content["line"] = line
        new_content["block"] = None
        new_content["cursor_pos"] = cursor_pos
        return msg

    def complete_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a complete reply.

        ``cursor_start``/``cursor_end`` are removed and turned into
        ``matched_text``: the leading ``cursor_end - cursor_start`` characters
        of the first match, or ``""`` when there are no matches at all.
        """
        content = msg["content"]
        cursor_start = content.pop("cursor_start")
        cursor_end = content.pop("cursor_end")
        match_len = cursor_end - cursor_start
        matches = content["matches"]
        # A reply without completions has no first match to slice; report an
        # empty matched_text rather than failing with IndexError.
        content["matched_text"] = matches[0][:match_len] if matches else ""
        content.pop("metadata", None)
        return msg

    def object_info_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Handle an object info request.

        The content is replaced by ``oname`` (extracted from the code around
        the cursor) and the original ``detail_level``.
        """
        content = msg["content"]
        code = content["code"]
        cursor_pos = content["cursor_pos"]

        new_content = msg["content"] = {}
        new_content["oname"] = extract_oname_v4(code, cursor_pos)
        new_content["detail_level"] = content["detail_level"]
        return msg

    def object_info_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """inspect_reply can't be easily backward compatible

        The content is always replaced by a fixed "not found" reply.
        """
        msg["content"] = {"found": False, "oname": "unknown"}
        return msg

    # iopub channel

    def stream(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a stream message: move ``text`` to ``data``."""
        content = msg["content"]
        content["data"] = content.pop("text")
        return msg

    def display_data(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a display data message.

        Defaults ``source`` to ``"display"`` and serializes an
        ``application/json`` entry to a JSON string.
        """
        content = msg["content"]
        content.setdefault("source", "display")
        data = content["data"]
        if "application/json" in data:
            try:
                data["application/json"] = json.dumps(data["application/json"])
            except Exception:
                # Deliberately best-effort: a value that cannot be serialized
                # is left as it was.
                # warn?
                pass
        return msg

    # stdin channel

    def input_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Handle an input request: drop the ``password`` key."""
        msg["content"].pop("password", None)
        return msg
236
237
class V4toV5(Adapter):
    """Convert msg spec V4 to V5

    Every handler edits the message it is given in place and returns it.
    """

    version = "5.0"

    # invert message renames above
    msg_type_map = {v4_name: v5_name for v5_name, v4_name in V5toV4.msg_type_map.items()}

    def update_header(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Stamp the header, and a non-empty parent header, with this version."""
        msg["header"]["version"] = self.version
        parent = msg["parent_header"]
        if parent:
            parent["version"] = self.version
        return msg

    # shell channel

    def kernel_info_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a kernel info reply.

        Version lists become dotted strings, ``ipython_version`` turns into
        ``implementation``/``implementation_version`` for python languages,
        ``language`` and ``language_version`` move into ``language_info``,
        and an empty ``banner`` is set.
        """
        info = msg["content"]
        for field in ("protocol_version", "ipython_version"):
            if field in info:
                info[field] = ".".join(str(number) for number in info[field])

        info.setdefault("protocol_version", "4.1")

        is_python = info["language"].startswith("python")
        if is_python and "ipython_version" in info:
            info["implementation"] = "ipython"
            info["implementation_version"] = info.pop("ipython_version")

        language_name = info.pop("language")
        lang_info = info.setdefault("language_info", {})
        lang_info.setdefault("name", language_name)
        if "language_version" in info:
            dotted = ".".join(str(number) for number in info.pop("language_version"))
            lang_info.setdefault("version", dotted)

        info["banner"] = ""
        return msg

    def execute_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an execute request.

        Each name in ``user_variables`` becomes a ``user_expressions`` entry
        that maps the name to itself.
        """
        request = msg["content"]
        variable_names = request.pop("user_variables", [])
        expressions = request.setdefault("user_expressions", {})
        expressions.update((name, name) for name in variable_names)
        return msg

    def execute_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an execute reply.

        ``user_variables`` is merged into ``user_expressions``, and the
        ``text`` of each pager payload moves into its mime bundle.
        """
        reply = msg["content"]
        expressions = reply.setdefault("user_expressions", {})
        variables = reply.pop("user_variables", {})
        if variables:
            expressions.update(variables)

        # Pager payloads became a mime bundle
        for payload in reply.get("payload", []):
            if payload.get("source", None) != "page" or "text" not in payload:
                continue
            bundle = payload.setdefault("data", {})
            bundle["text/plain"] = payload.pop("text")

        return msg

    def complete_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a complete request: ``line`` becomes ``code``, ``cursor_pos`` is kept."""
        previous = msg["content"]

        request: Dict[str, Any] = {}
        msg["content"] = request
        request["code"] = previous["line"]
        request["cursor_pos"] = previous["cursor_pos"]
        return msg

    def complete_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a complete reply."""
        # complete_reply needs more context than we have to get cursor_start and end.
        # use special end=null to indicate current cursor position and negative offset
        # for start relative to the cursor.
        # start=None indicates that start == end (accounts for no -0).
        previous = msg["content"]
        reply: Dict[str, Any] = {"status": "ok"}
        msg["content"] = reply
        reply["matches"] = previous["matches"]
        matched = previous["matched_text"]
        # no -0, use None to indicate that start == end
        reply["cursor_start"] = -len(matched) if matched else None
        reply["cursor_end"] = None
        reply["metadata"] = {}
        return msg

    def inspect_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an inspect request.

        ``oname`` becomes the ``code``, with the cursor placed at its end.
        """
        previous = msg["content"]
        object_name = previous["oname"]

        request: Dict[str, Any] = {}
        msg["content"] = request
        request["code"] = object_name
        request["cursor_pos"] = len(object_name)
        request["detail_level"] = previous["detail_level"]
        return msg

    def inspect_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an inspect reply (can't be easily backward compatible).

        When the object was found, ``data["text/plain"]`` is built from the
        first non-empty definition field and the first non-empty docstring
        field; if neither exists it is ``"<empty docstring>"``.
        """
        previous = msg["content"]
        reply: Dict[str, Any] = {"status": "ok"}
        msg["content"] = reply
        found = previous["found"]
        reply["found"] = found
        data: Dict[str, Any] = {}
        reply["data"] = data
        reply["metadata"] = {}
        if found:
            parts = []
            key_groups = (
                ("call_def", "init_definition", "definition"),
                ("call_docstring", "init_docstring", "docstring"),
            )
            for group in key_groups:
                # Only truthy values pass the filter, so None can only mean "no hit".
                chosen = next((previous[k] for k in group if previous.get(k, False)), None)
                if chosen is not None:
                    parts.append(chosen)
            data["text/plain"] = "\n".join(parts or ["<empty docstring>"])
        return msg

    # iopub channel

    def stream(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a stream message: move ``data`` to ``text``."""
        body = msg["content"]
        stream_text = body.pop("data")
        body["text"] = stream_text
        return msg

    def display_data(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert display data.

        Drops ``source`` and parses an ``application/json`` entry from its
        JSON string; on failure the entry is left as it was.
        """
        body = msg["content"]
        body.pop("source", None)
        bundle = body["data"]
        json_mime = "application/json"
        if json_mime in bundle:
            try:
                bundle[json_mime] = json.loads(bundle[json_mime])
            except Exception:
                # warn?
                pass
        return msg

    # stdin channel

    def input_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an input request: default ``password`` to False."""
        body = msg["content"]
        body.setdefault("password", False)
        return msg
391
392
def adapt(msg: Dict[str, Any], to_version: int = protocol_version_info[0]) -> Dict[str, Any]:
    """Adapt a single message to a target version

    Parameters
    ----------

    msg : dict
        A Jupyter message. A ``date`` is added to its header if it has none.
    to_version : int, optional
        The target major version.
        If unspecified, adapt to the current version.

    Returns
    -------

    msg : dict
        A Jupyter message appropriate in the new version. When no adapter is
        registered for the (source, target) version pair, the message is
        returned without further changes.
    """
    from .session import utcnow

    header = msg["header"]
    if "date" not in header:
        header["date"] = utcnow()

    # A header without "version" is assumed to come from the last protocol
    # version before that key was added, i.e. 4.
    from_version = int(header["version"].partition(".")[0]) if "version" in header else 4

    adapter = adapters.get((from_version, to_version))
    return msg if adapter is None else adapter(msg)
425
426
# Adapter instances keyed by (from_major, to_major) protocol version;
# there is one adapter per major-version transition.
adapters: Dict[Tuple[int, int], Adapter] = {(5, 4): V5toV4(), (4, 5): V4toV5()}