1import json
2from typing import IO, Any, Tuple, List
3
4from .parser import Parser
5from .symbols import (
6 RecordStart,
7 FieldStart,
8 Boolean,
9 Int,
10 Null,
11 String,
12 Long,
13 Float,
14 Double,
15 Bytes,
16 FieldEnd,
17 RecordEnd,
18 Union,
19 UnionEnd,
20 MapStart,
21 MapEnd,
22 MapKeyMarker,
23 Fixed,
24 ArrayStart,
25 ArrayEnd,
26 Enum,
27 ItemEnd,
28)
29
30
class AvroJSONDecoder:
    """Decoder for the avro JSON format.

    NOTE: All attributes and methods on this class should be considered
    private.

    Parameters
    ----------
    fo
        File-like object to read from. It is consumed eagerly when the
        decoder is created: every non-blank line is parsed as one JSON
        document.

    """

    def __init__(self, fo: IO):
        # Local import: keeps this class self-contained without requiring a
        # new module-level import.
        from collections import deque

        self._fo = fo
        # Saved (current JSON node, current key) pairs; the key may be None.
        self._stack: List[Tuple[Any, Any]] = []
        # json.loads rejects empty input, so blank / whitespace-only lines
        # (for example a trailing newline at the end of the file) are skipped.
        stripped_lines = (line.strip() for line in fo)
        # A deque gives O(1) removal of the next record from the front.
        self._json_data = deque(
            json.loads(line) for line in stripped_lines if line
        )
        self._key = None
        # Always defined, even when the input holds no records at all.
        self._current = None
        self.done = not self._json_data
        if not self.done:
            self._current = self._json_data.popleft()

    def read_value(self, symbol):
        """Return the value at the current position.

        When the current node is a dict, the value stored under the current
        key is returned, or ``symbol.get_default()`` if that key is absent.
        Any other node is returned as-is.
        """
        if not isinstance(self._current, dict):
            # If we aren't in a dict then this must be a schema which just
            # has a single basic type (or an item inside an array)
            return self._current
        if self._key in self._current:
            return self._current[self._key]
        # Use the default value
        return symbol.get_default()

    def _push(self):
        """Save the current node and key on the stack."""
        self._stack.append((self._current, self._key))

    def _push_and_adjust(self, symbol=None):
        """Save the current position, then descend into the current key.

        The descent only happens when the current node is a dict and a key is
        set; if the key is missing, ``symbol.get_default()`` becomes the new
        current node.
        """
        self._push()
        if isinstance(self._current, dict) and self._key is not None:
            if self._key in self._current:
                self._current = self._current[self._key]
            else:
                self._current = symbol.get_default()

    def _pop(self):
        """Restore the most recently saved node and key."""
        self._current, self._key = self._stack.pop()

    def configure(self, schema, named_schemas):
        """Create the parser for ``schema``, with ``do_action`` as callback."""
        self._parser = Parser(schema, named_schemas, self.do_action)

    def do_action(self, action):
        """Handle an action symbol passed back by the parser.

        Raises
        ------
        Exception
            If ``action`` is not one of the handled symbol types.
        """
        if isinstance(action, RecordStart):
            self._push_and_adjust(action)
        elif isinstance(action, RecordEnd):
            self._pop()
        elif isinstance(action, FieldStart):
            self.read_object_key(action.field_name)
        elif isinstance(action, (FieldEnd, UnionEnd)):
            # TODO: Do we need a FieldEnd and UnionEnd symbol?
            pass
        else:
            raise Exception(f"cannot handle: {action}")

    def drain(self):
        """Drain pending parser actions and move on to the next record.

        Sets ``done`` to True when no records remain.
        """
        self._parser.drain_actions()
        if self._json_data:
            self._current = self._json_data.popleft()
            self._key = None
        else:
            self.done = True

    def read_null(self):
        """Advance the parser past a Null symbol and return the value."""
        symbol = self._parser.advance(Null())
        return self.read_value(symbol)

    def read_boolean(self):
        """Advance the parser past a Boolean symbol and return the value."""
        symbol = self._parser.advance(Boolean())
        return self.read_value(symbol)

    def read_utf8(self, handle_unicode_errors="strict"):
        """Advance the parser past a String symbol and return the string.

        If the parser is positioned on a map key marker, the first remaining
        key of the current map becomes the current key and is returned
        instead of a value. ``handle_unicode_errors`` is not used here.
        """
        symbol = self._parser.advance(String())
        if self._parser.stack[-1] == MapKeyMarker():
            self._parser.advance(MapKeyMarker())
            # Take the first key; an empty map leaves the key untouched.
            self._key = next(iter(self._current), self._key)
            return self._key
        return self.read_value(symbol)

    def read_bytes(self):
        """Advance past a Bytes symbol; return the value as ISO-8859-1 bytes."""
        symbol = self._parser.advance(Bytes())
        return self.read_value(symbol).encode("iso-8859-1")

    def read_int(self):
        """Advance the parser past an Int symbol and return the value."""
        symbol = self._parser.advance(Int())
        return self.read_value(symbol)

    def read_long(self):
        """Advance the parser past a Long symbol and return the value."""
        symbol = self._parser.advance(Long())
        return self.read_value(symbol)

    def read_float(self):
        """Advance the parser past a Float symbol and return the value."""
        symbol = self._parser.advance(Float())
        return self.read_value(symbol)

    def read_double(self):
        """Advance the parser past a Double symbol and return the value."""
        symbol = self._parser.advance(Double())
        return self.read_value(symbol)

    def read_enum(self):
        """Return the index of the current enum label.

        Raises
        ------
        ValueError
            If the label read is not among the enum's labels.
        """
        symbol = self._parser.advance(Enum())
        enum_labels = self._parser.pop_symbol()  # pop the enumlabels
        # TODO: Should we verify the value is one of the symbols?
        label = self.read_value(symbol)
        return enum_labels.labels.index(label)

    def read_fixed(self, size):
        """Advance past a Fixed symbol; return the value as ISO-8859-1 bytes.

        ``size`` is accepted but not used by this decoder.
        """
        symbol = self._parser.advance(Fixed())
        return self.read_value(symbol).encode("iso-8859-1")

    def read_map_start(self):
        """Advance past a MapStart symbol and descend into the map."""
        symbol = self._parser.advance(MapStart())
        self._push_and_adjust(symbol)

    def read_object_key(self, key):
        """Make ``key`` the current key."""
        self._key = key

    def iter_map(self):
        """Yield once per entry of the current map, consuming it as it goes.

        The position is saved before each yield and restored afterwards;
        the entry that was first at the time of the yield is then deleted
        from the map.
        """
        while len(self._current) > 0:
            self._push()
            key = next(iter(self._current))
            yield
            self._pop()
            del self._current[key]

    def read_map_end(self):
        """Advance past a MapEnd symbol and return to the saved position."""
        self._parser.advance(MapEnd())
        self._pop()

    def read_array_start(self):
        """Advance past an ArrayStart symbol and descend into the array."""
        symbol = self._parser.advance(ArrayStart())
        self._push_and_adjust(symbol)
        # Array items are not addressed by key
        self._key = None

    def read_array_end(self):
        """Advance past an ArrayEnd symbol and return to the saved position."""
        self._parser.advance(ArrayEnd())
        self._pop()

    def iter_array(self):
        """Yield once per item of the current array, consuming it as it goes.

        Each item is removed from the front of the array and made the current
        node for the duration of the yield.
        """
        while len(self._current) > 0:
            self._push()
            self._current = self._current.pop(0)
            yield
            self._pop()
            self._parser.advance(ItemEnd())

    def read_index(self):
        """Return the index of the union branch used by the current value.

        The wrapping ``{label: value}`` object is replaced in place by the
        bare value so that the subsequent read sees the value directly. The
        symbol for the selected branch is pushed onto the parser.
        """
        self._parser.advance(Union())
        alternative_symbol = self._parser.pop_symbol()

        # TODO: Try to clean this up.
        # A JSON union is encoded like this: {"union_field": {int: 32}} and so
        # what we are doing is trying to change that into {"union_field": 32}
        # before eventually reading the value of "union_field"
        if self._key is None:
            # If self._key is None, self._current is an item in an array
            if self._current is None:
                label = "null"
            else:
                label, data = self._current.popitem()
                self._current = data
                # TODO: Do we need to do this?
                self._parser.push_symbol(UnionEnd())
        else:
            # self._current is a JSON object and self._key should be the name
            # of the union field
            if self._key not in self._current:
                # Field is missing: fall back to the default, wrapped under
                # the first label of the union
                self._current[self._key] = {
                    alternative_symbol.labels[0]: alternative_symbol.get_default()
                }

            if self._current[self._key] is None:
                label = "null"
            else:
                label, data = self._current[self._key].popitem()
                self._current[self._key] = data
                # TODO: Do we need to do this?
                self._parser.push_symbol(UnionEnd())

        index = alternative_symbol.labels.index(label)
        symbol = alternative_symbol.get_symbol(index)
        self._parser.push_symbol(symbol)
        return index