1import json
2from typing import IO, List, Tuple, Any
3
4from .parser import Parser
5from .symbols import (
6 Root,
7 Boolean,
8 Int,
9 RecordStart,
10 RecordEnd,
11 FieldStart,
12 FieldEnd,
13 Null,
14 String,
15 Union,
16 UnionEnd,
17 Long,
18 Float,
19 Double,
20 Bytes,
21 MapStart,
22 MapEnd,
23 MapKeyMarker,
24 Enum,
25 Fixed,
26 ArrayStart,
27 ArrayEnd,
28 ItemEnd,
29)
30
31
class AvroJSONEncoder:
    """Encoder for the avro JSON format.

    NOTE: All attributes and methods on this class should be considered
    private.

    Values handed to the ``write_*`` methods are accumulated in memory as
    plain Python objects (dicts for records/maps/union wrappers, lists for
    arrays, scalars otherwise). Completed top-level values are buffered and
    written to ``fo`` as newline-separated JSON by :meth:`write_buffer`,
    which :meth:`do_action` calls when the parser reports its ``Root``
    action (presumably during :meth:`flush` — confirm against ``Parser``).

    Parameters
    ----------
    fo
        Output stream the JSON text is written to (must accept ``str``)
    write_union_type
        Determine whether to write the union type in the json message.

    """

    def __init__(self, fo: IO, *, write_union_type: bool = True):
        self._fo = fo
        # Saved (container, key) pairs for every open record/map/array/union
        # wrapper. The key is None when the outer context had no pending key.
        self._stack: List[Tuple[Any, Any]] = []
        # Container currently being filled (dict or list); None at top level.
        self._current = None
        # Key the next value is stored under when _current is a dict.
        self._key = None
        # Completed top-level values waiting to be written by write_buffer.
        self._records: List[Any] = []
        self._write_union_type = write_union_type
        # True once write_buffer has emitted at least one record, so a later
        # batch is newline-separated from the earlier output.
        self._wrote_records = False

    def write_value(self, value):
        """Store ``value`` in the container currently being built.

        Inside a dict the value is stored under the pending key; inside a
        list it is appended; at the top level (a schema that is just a single
        basic type) it becomes a complete record of its own.

        Raises
        ------
        Exception
            If the current container is a dict and no key has been set.
        """
        if isinstance(self._current, dict):
            # Compare against None explicitly: "" is a valid Avro map key
            # and must not be mistaken for "no key set".
            if self._key is None:
                raise Exception("No key was set")
            self._current[self._key] = value
        elif isinstance(self._current, list):
            self._current.append(value)
        else:
            # If we aren't in a dict or a list then this must be a schema which
            # just has a single basic type
            self._records.append(value)

    def _push(self):
        """Save the current container and key before opening a nested one."""
        self._stack.append((self._current, self._key))

    def _pop(self):
        """Close the current container and attach it to its parent.

        When the parent is a dict the finished container is stored under the
        saved key, when it is a list it is appended, and when there is no
        parent the finished container is buffered as a complete record.
        """
        prev_current, prev_key = self._stack.pop()
        if isinstance(prev_current, dict):
            prev_current[prev_key] = self._current
            self._current = prev_current
        elif isinstance(prev_current, list):
            prev_current.append(self._current)
            self._current = prev_current
        else:
            assert prev_current is None
            assert prev_key is None
            # Back at None, we should have a full record in self._current
            self._records.append(self._current)
            self._current = prev_current
        self._key = prev_key

    def write_buffer(self):
        """Write all buffered records to ``fo`` as newline-separated JSON.

        The buffer is cleared afterwards so records are never emitted twice.
        If records were already written by an earlier call, the new batch is
        preceded by a newline to keep the output one record per line.
        """
        if not self._records:
            return
        # Newline separated
        json_data = "\n".join(json.dumps(record) for record in self._records)
        if self._wrote_records:
            json_data = "\n" + json_data
        self._fo.write(json_data)
        self._wrote_records = True
        self._records.clear()

    def configure(self, schema, named_schemas):
        """Create the grammar parser that drives this encoder for ``schema``."""
        self._parser = Parser(schema, named_schemas, self.do_action)

    def flush(self):
        """Flush the parser, which delivers any remaining actions."""
        self._parser.flush()

    def do_action(self, action):
        """Handle an action symbol emitted by the parser.

        Raises
        ------
        Exception
            For any action type this encoder does not know how to handle.
        """
        if isinstance(action, RecordStart):
            self.write_object_start()
        elif isinstance(action, (RecordEnd, UnionEnd)):
            # Both records and union-type wrappers are JSON objects.
            self.write_object_end()
        elif isinstance(action, FieldStart):
            self.write_object_key(action.field_name)
        elif isinstance(action, FieldEnd):
            # TODO: Do we need a FieldEnd symbol?
            pass
        elif isinstance(action, Root):
            self.write_buffer()
        else:
            raise Exception(f"Internal Exception: {action}")

    def write_null(self):
        """Write a JSON ``null``."""
        self._parser.advance(Null())
        self.write_value(None)

    def write_boolean(self, value):
        """Write a boolean value."""
        self._parser.advance(Boolean())
        self.write_value(value)

    def write_utf8(self, value):
        """Write a string, or record it as the next map key.

        If the parser's next expected symbol is a map-key marker, the string
        becomes the key for the following map value instead of a value.
        """
        self._parser.advance(String())
        if self._parser.stack[-1] == MapKeyMarker():
            self._parser.advance(MapKeyMarker())
            self.write_object_key(value)
        else:
            self.write_value(value)

    def write_int(self, value):
        """Write an Avro ``int``."""
        self._parser.advance(Int())
        self.write_value(value)

    def write_long(self, value):
        """Write an Avro ``long``."""
        self._parser.advance(Long())
        self.write_value(value)

    def write_float(self, value):
        """Write an Avro ``float``."""
        self._parser.advance(Float())
        self.write_value(value)

    def write_double(self, value):
        """Write an Avro ``double``."""
        self._parser.advance(Double())
        self.write_value(value)

    def write_bytes(self, value):
        """Write ``bytes`` as a string whose code points equal the byte values."""
        self._parser.advance(Bytes())
        # ISO-8859-1 maps each byte 0-255 to the code point of the same value.
        self.write_value(value.decode("iso-8859-1"))

    def write_enum(self, index):
        """Write the enum label at position ``index``."""
        self._parser.advance(Enum())
        # The next parser symbol carries the enum's labels.
        enum_labels = self._parser.pop_symbol()
        # TODO: Check symbols?
        self.write_value(enum_labels.labels[index])

    def write_fixed(self, value):
        """Write a ``fixed`` value as a string of byte-valued code points."""
        self._parser.advance(Fixed())
        self.write_value(value.decode("iso-8859-1"))

    def write_array_start(self):
        """Open a new JSON array nested in the current container."""
        self._parser.advance(ArrayStart())
        self._push()
        self._current = []

    def write_item_count(self, length):
        """Ignore the block item count; JSON output does not use it."""
        pass

    def end_item(self):
        """Mark the end of one array/map item in the parser."""
        self._parser.advance(ItemEnd())

    def write_array_end(self):
        """Close the current JSON array."""
        self._parser.advance(ArrayEnd())
        self._pop()

    def write_object_start(self):
        """Open a new JSON object nested in the current container."""
        self._push()
        self._current = {}
        # The new object has no key yet. The outer key was saved by _push
        # and is restored by _pop.
        self._key = None

    def write_object_key(self, key):
        """Set the key the next value in the current object is stored under."""
        self._key = key

    def write_object_end(self):
        """Close the current JSON object."""
        self._pop()

    def write_map_start(self):
        """Open a JSON object for an Avro map."""
        self._parser.advance(MapStart())
        self.write_object_start()

    def write_map_end(self):
        """Close the JSON object of an Avro map."""
        self._parser.advance(MapEnd())
        self.write_object_end()

    def write_index(self, index, schema):
        """Select union branch ``index`` for the next value.

        When the chosen branch is not null and ``write_union_type`` is
        enabled, the value is wrapped in a single-key object keyed by the
        branch label. The wrapper is closed when the parser later emits the
        pushed ``UnionEnd`` action. ``schema`` is not used here.
        """
        self._parser.advance(Union())
        alternative_symbol = self._parser.pop_symbol()

        symbol = alternative_symbol.get_symbol(index)

        if symbol != Null() and self._write_union_type:
            self.write_object_start()
            self.write_object_key(alternative_symbol.get_label(index))
            # TODO: Do we need this symbol?
            self._parser.push_symbol(UnionEnd())

        self._parser.push_symbol(symbol)