1"""Base classes and utilities for readers and writers."""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
6
7
def _is_json_mime(mime):
    """Return True when *mime* is a JSON mime-type that should be left alone.

    A mime-type counts as JSON when it is exactly ``application/json`` or
    when it starts with ``application/`` and ends with ``+json``.
    """
    if mime == "application/json":
        return True
    has_application_prefix = mime.startswith("application/")
    return has_application_prefix and mime.endswith("+json")
13
14
def _rejoin_mimebundle(data):
    """Join list-of-lines values of a mimebundle back into strings (in-place).

    Entries stored under a JSON mime-type are skipped, as are values that
    are not lists consisting entirely of strings.  The same ``data`` mapping
    is returned.
    """
    # Walk a snapshot of the entries because values are reassigned below.
    for mime, payload in tuple(data.items()):
        if _is_json_mime(mime):
            continue
        if not isinstance(payload, list):
            continue
        if all(isinstance(piece, str) for piece in payload):
            data[mime] = "".join(payload)
    return data
25
26
def rejoin_lines(nb):
    """Collapse line-split text in a notebook back into single strings.

    This is the inverse of ``split_lines(nb)``.

    Only values that are currently lists are joined, so text that was never
    split passes through unchanged.

    Used when reading JSON files that may have been passed through
    ``split_lines``.  The notebook is modified in place and returned.
    """
    mimebundle_output_types = {"execute_result", "display_data"}

    for cell in nb.cells:
        # Cell source: join only when present and stored as a list.
        if "source" in cell and isinstance(cell.source, list):
            cell.source = "".join(cell.source)

        # Every attachment is handled as a mimebundle.
        for bundle in cell.get("attachments", {}).values():
            _rejoin_mimebundle(bundle)

        if cell.get("cell_type", None) != "code":
            continue

        for output in cell.get("outputs", []):
            kind = output.get("output_type", "")
            if kind in mimebundle_output_types:
                _rejoin_mimebundle(output.get("data", {}))
                continue
            # Any other output with a non-empty type may carry list-valued text.
            if kind and isinstance(output.get("text", ""), list):
                output.text = "".join(output.text)
    return nb
53
54
# Mime-types outside ``text/*`` whose string payloads are nevertheless split
# into lists of lines by ``_split_mimebundle``.
_non_text_split_mimes = {"image/svg+xml", "application/javascript"}
59
60
def _split_mimebundle(data):
    """Break string values of a mimebundle into lists of lines (in-place).

    Only string values are touched, and only when their mime-type starts
    with ``text/`` or is listed in ``_non_text_split_mimes``.  Line endings
    are kept on each resulting line.  The same ``data`` mapping is returned.
    """
    # Walk a snapshot of the entries because values are reassigned below.
    for mime, payload in tuple(data.items()):
        if not isinstance(payload, str):
            continue
        if mime.startswith("text/") or mime in _non_text_split_mimes:
            data[mime] = payload.splitlines(keepends=True)
    return data
67
68
def split_lines(nb):
    """Split likely multiline text into lists of strings.

    For file output more friendly to line-based VCS. ``rejoin_lines(nb)``
    will reverse the effects of ``split_lines(nb)``.

    Used when writing JSON files.

    Cell and output fields are looked up with ``.get`` (as ``rejoin_lines``
    does), so a cell without ``cell_type``/``outputs`` or an output without
    ``output_type``/``text`` is passed through unchanged instead of raising
    from inside the splitter.

    Parameters
    ----------
    nb
        Notebook object exposing ``cells``; each cell and output must offer
        mapping-style ``get`` and item assignment.

    Returns
    -------
    The same ``nb`` object, modified in place.
    """
    for cell in nb.cells:
        source = cell.get("source", None)
        if isinstance(source, str):
            # keepends=True so that joining the pieces restores the text.
            cell["source"] = source.splitlines(True)

        # Only the attachment bundles are needed, not their names.
        for attachment in cell.get("attachments", {}).values():
            _split_mimebundle(attachment)

        if cell.get("cell_type", None) != "code":
            continue

        for output in cell.get("outputs", []):
            output_type = output.get("output_type", "")
            if output_type in {"execute_result", "display_data"}:
                _split_mimebundle(output.get("data", {}))
            elif output_type == "stream":
                text = output.get("text", None)
                if isinstance(text, str):
                    output["text"] = text.splitlines(True)
    return nb
93
94
def strip_transient(nb):
    """Remove transient values that shouldn't be stored in files.

    This should be called in *both* read and write.

    Drops ``orig_nbformat``, ``orig_nbformat_minor`` and ``signature`` from
    the notebook metadata and ``trusted`` from every cell's metadata; keys
    that are absent are ignored.  The notebook is modified in place and
    returned.
    """
    notebook_meta = nb.metadata
    for transient_key in ("orig_nbformat", "orig_nbformat_minor", "signature"):
        notebook_meta.pop(transient_key, None)

    for cell in nb.cells:
        cell.metadata.pop("trusted", None)
    return nb
106
107
class NotebookReader:
    """Base class for notebook readers.

    Subclasses provide ``reads``; ``read`` is implemented on top of it.
    """

    def reads(self, s, **kwargs):
        """Read a notebook from the string *s*.

        Always raises ``NotImplementedError`` here; subclasses must override.
        """
        raise NotImplementedError("reads must be implemented in a subclass")

    def read(self, fp, **kwargs):
        """Read a notebook from the file-like object *fp*.

        The whole content of ``fp`` is read and passed to ``reads`` together
        with any keyword arguments.
        """
        return self.reads(fp.read(), **kwargs)
120
121
class NotebookWriter:
    """Base class for notebook writers.

    Subclasses provide ``writes``; ``write`` is implemented on top of it.
    """

    def writes(self, nb, **kwargs):
        """Write the notebook *nb* to a string.

        Always raises ``NotImplementedError`` here; subclasses must override.
        """
        raise NotImplementedError("writes must be implemented in a subclass")

    def write(self, nb, fp, **kwargs):
        """Write the notebook *nb* to the file-like object *fp*.

        The notebook is serialized with ``writes`` (keyword arguments are
        forwarded) and the result of ``fp.write`` is returned.
        """
        serialized = self.writes(nb, **kwargs)
        return fp.write(serialized)