Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastavro/io/json_decoder.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

136 statements  

1import json 

2from typing import IO, Any, Tuple, List 

3 

4from .parser import Parser 

5from .symbols import ( 

6 RecordStart, 

7 FieldStart, 

8 Boolean, 

9 Int, 

10 Null, 

11 String, 

12 Long, 

13 Float, 

14 Double, 

15 Bytes, 

16 FieldEnd, 

17 RecordEnd, 

18 Union, 

19 UnionEnd, 

20 MapStart, 

21 MapEnd, 

22 MapKeyMarker, 

23 Fixed, 

24 ArrayStart, 

25 ArrayEnd, 

26 Enum, 

27 ItemEnd, 

28) 

29 

30 

class AvroJSONDecoder:
    """Decoder for the avro JSON format.

    NOTE: All attributes and methods on this class should be considered
    private.

    The whole input is parsed eagerly in ``__init__`` (one JSON document per
    line). ``done`` is ``True`` once there are no more records to decode.

    Parameters
    ----------
    fo
        File-like object to read from

    """

    def __init__(self, fo: IO):
        self._fo = fo
        # Stack of (current JSON value, current key) pairs saved while
        # descending into nested records, maps, arrays and their items.
        self._stack: List[Tuple[Any, str]] = []
        self._key = None
        # Always define _current so that an empty input leaves the decoder in
        # a well-defined state instead of missing the attribute entirely.
        self._current = None

        # One JSON document per line. Blank/whitespace-only lines (e.g. a
        # trailing newline at the end of the file) are skipped rather than
        # handed to json.loads, which would raise on an empty document.
        self._json_data = []
        for line in fo:
            stripped = line.strip()
            if stripped:
                self._json_data.append(json.loads(stripped))

        if self._json_data:
            self._current = self._json_data.pop(0)
            self.done = False
        else:
            self.done = True

    def read_value(self, symbol):
        """Return the value at the current position.

        When the current value is a dict, the value stored under the current
        key is returned, falling back to ``symbol.get_default()`` if the key
        is absent. Otherwise the current value itself is returned.
        """
        if isinstance(self._current, dict):
            if self._key not in self._current:
                # Use the default value
                return symbol.get_default()
            return self._current[self._key]
        # If we aren't in a dict or a list then this must be a schema which
        # just has a single basic type
        return self._current

    def _push(self):
        """Save the current value and key on the stack."""
        self._stack.append((self._current, self._key))

    def _push_and_adjust(self, symbol=None):
        """Save the current position, then descend into the current key.

        If the current value is a dict and a key is set, the current value
        becomes the value under that key, or ``symbol.get_default()`` when
        the key is missing. In every other case only the push happens.
        """
        self._push()
        if isinstance(self._current, dict) and self._key is not None:
            if self._key not in self._current:
                self._current = symbol.get_default()
            else:
                self._current = self._current[self._key]

    def _pop(self):
        """Restore the most recently saved value and key."""
        self._current, self._key = self._stack.pop()

    def configure(self, schema, named_schemas):
        """Create the parser for ``schema``, using ``do_action`` as its
        action callback."""
        self._parser = Parser(schema, named_schemas, self.do_action)

    def do_action(self, action):
        """Handle an action symbol passed in by the parser.

        Raises
        ------
        Exception
            If ``action`` is not one of the handled symbol types.
        """
        if isinstance(action, RecordStart):
            self._push_and_adjust(action)
        elif isinstance(action, RecordEnd):
            self._pop()
        elif isinstance(action, FieldStart):
            self.read_object_key(action.field_name)
        elif isinstance(action, (FieldEnd, UnionEnd)):
            # TODO: Do we need a FieldEnd and UnionEnd symbol?
            pass
        else:
            raise Exception(f"cannot handle: {action}")

    def drain(self):
        """Drain the parser's pending actions and move to the next record.

        Sets ``done`` to ``True`` when no records remain.
        """
        self._parser.drain_actions()
        if self._json_data:
            self._current = self._json_data.pop(0)
            self._key = None
        else:
            self.done = True

    def read_null(self):
        """Advance the parser past a Null symbol and return the value."""
        symbol = self._parser.advance(Null())
        return self.read_value(symbol)

    def read_boolean(self):
        """Advance the parser past a Boolean symbol and return the value."""
        symbol = self._parser.advance(Boolean())
        return self.read_value(symbol)

    def read_utf8(self, handle_unicode_errors="strict"):
        """Advance the parser past a String symbol and return the value.

        If the parser is positioned on a map key marker, the first key of the
        current value becomes the current key and is returned instead.
        ``handle_unicode_errors`` is accepted but not used by this decoder.
        """
        symbol = self._parser.advance(String())
        if self._parser.stack[-1] == MapKeyMarker():
            self._parser.advance(MapKeyMarker())
            # Take the first key if there is one; otherwise leave the current
            # key untouched.
            self._key = next(iter(self._current), self._key)
            return self._key
        return self.read_value(symbol)

    def read_bytes(self):
        """Advance the parser past a Bytes symbol and return the value
        encoded as ISO-8859-1 bytes."""
        symbol = self._parser.advance(Bytes())
        return self.read_value(symbol).encode("iso-8859-1")

    def read_int(self):
        """Advance the parser past an Int symbol and return the value."""
        symbol = self._parser.advance(Int())
        return self.read_value(symbol)

    def read_long(self):
        """Advance the parser past a Long symbol and return the value."""
        symbol = self._parser.advance(Long())
        return self.read_value(symbol)

    def read_float(self):
        """Advance the parser past a Float symbol and return the value."""
        symbol = self._parser.advance(Float())
        return self.read_value(symbol)

    def read_double(self):
        """Advance the parser past a Double symbol and return the value."""
        symbol = self._parser.advance(Double())
        return self.read_value(symbol)

    def read_enum(self):
        """Advance the parser past an Enum symbol and return the index of the
        read label within the enum's labels.

        Raises
        ------
        ValueError
            If the label read is not one of the enum's labels.
        """
        symbol = self._parser.advance(Enum())
        enum_labels = self._parser.pop_symbol()  # pop the enumlabels
        # TODO: Should we verify the value is one of the symbols?
        label = self.read_value(symbol)
        return enum_labels.labels.index(label)

    def read_fixed(self, size):
        """Advance the parser past a Fixed symbol and return the value
        encoded as ISO-8859-1 bytes.

        ``size`` is accepted but not used by this decoder.
        """
        symbol = self._parser.advance(Fixed())
        return self.read_value(symbol).encode("iso-8859-1")

    def read_map_start(self):
        """Advance the parser past MapStart and descend into the map."""
        symbol = self._parser.advance(MapStart())
        self._push_and_adjust(symbol)

    def read_object_key(self, key):
        """Set the key that subsequent reads will look up."""
        self._key = key

    def iter_map(self):
        """Yield once per entry of the current map.

        The position is saved before each yield and restored afterwards, and
        the first key of the map is then deleted, so the map is consumed (and
        mutated in place) as iteration proceeds.
        """
        while len(self._current) > 0:
            self._push()
            # Remember which entry this iteration covers before yielding; it
            # is removed once the consumer has finished with it.
            key = next(iter(self._current))
            yield
            self._pop()
            del self._current[key]

    def read_map_end(self):
        """Advance the parser past MapEnd and restore the saved position."""
        self._parser.advance(MapEnd())
        self._pop()

    def read_array_start(self):
        """Advance the parser past ArrayStart and descend into the array.

        The current key is cleared after descending.
        """
        symbol = self._parser.advance(ArrayStart())
        self._push_and_adjust(symbol)
        self._key = None

    def read_array_end(self):
        """Advance the parser past ArrayEnd and restore the saved position."""
        self._parser.advance(ArrayEnd())
        self._pop()

    def iter_array(self):
        """Yield once per item of the current array.

        Each item is popped from the front of the array and made the current
        value for the duration of the yield; afterwards the saved position is
        restored and the parser is advanced past ItemEnd. The array is
        consumed (mutated in place) as iteration proceeds.
        """
        while len(self._current) > 0:
            self._push()
            self._current = self._current.pop(0)
            yield
            self._pop()
            self._parser.advance(ItemEnd())

    def read_index(self):
        """Resolve which branch of a union is present and return its index.

        The selected branch's symbol is pushed onto the parser, and the
        wrapping ``{label: value}`` object is replaced in place by ``value``.
        """
        self._parser.advance(Union())
        alternative_symbol = self._parser.pop_symbol()

        # TODO: Try to clean this up.
        # A JSON union is encoded like this: {"union_field": {int: 32}} and so
        # what we are doing is trying to change that into {"union_field": 32}
        # before eventually reading the value of "union_field"
        if self._key is None:
            # If self._key is None, self._current is an item in an array
            if self._current is None:
                label = "null"
            else:
                label, data = self._current.popitem()
                self._current = data
                # TODO: Do we need to do this?
                self._parser.push_symbol(UnionEnd())
        else:
            # self._current is a JSON object and self._key should be the name
            # of the union field
            if self._key not in self._current:
                # Field is absent: synthesize the wrapped form from the first
                # label and the symbol's default so it is unwrapped below.
                self._current[self._key] = {
                    alternative_symbol.labels[0]: alternative_symbol.get_default()
                }

            if self._current[self._key] is None:
                label = "null"
            else:
                label, data = self._current[self._key].popitem()
                self._current[self._key] = data
                # TODO: Do we need to do this?
                self._parser.push_symbol(UnionEnd())

        index = alternative_symbol.labels.index(label)
        symbol = alternative_symbol.get_symbol(index)
        self._parser.push_symbol(symbol)
        return index