Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastavro/io/json_encoder.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

121 statements  

1import json 

2from typing import IO, List, Tuple, Any 

3 

4from .parser import Parser 

5from .symbols import ( 

6 Root, 

7 Boolean, 

8 Int, 

9 RecordStart, 

10 RecordEnd, 

11 FieldStart, 

12 FieldEnd, 

13 Null, 

14 String, 

15 Union, 

16 UnionEnd, 

17 Long, 

18 Float, 

19 Double, 

20 Bytes, 

21 MapStart, 

22 MapEnd, 

23 MapKeyMarker, 

24 Enum, 

25 Fixed, 

26 ArrayStart, 

27 ArrayEnd, 

28 ItemEnd, 

29) 

30 

31 

class AvroJSONEncoder:
    """Encoder for the avro JSON format.

    NOTE: All attributes and methods on this class should be considered
    private.

    Values are accumulated in memory as plain Python dicts/lists while the
    schema parser drives the ``write_*`` calls; ``write_buffer`` then
    serializes every finished top-level value with ``json.dumps``, one per
    line.

    Parameters
    ----------
    fo
        Output stream the JSON text is written to
    write_union_type
        Determine whether to write the union type in the json message.

    """

    def __init__(self, fo: IO, *, write_union_type: bool = True):
        self._fo = fo
        # Saved (container, key) pairs of the enclosing containers.
        self._stack: List[Tuple[Any, str]] = []
        # Container currently being filled: dict, list, or None at top level.
        self._current = None
        # Key under which the next value is stored when _current is a dict.
        self._key = None
        # Completed top-level values waiting to be serialized.
        self._records: List[Any] = []
        self._write_union_type = write_union_type

    def write_value(self, value):
        """Store ``value`` in the container currently being built.

        Inside a dict (record, map or union wrapper) the value is stored
        under the pending key; inside a list (array) it is appended; with no
        open container it is a complete top-level value.

        Raises
        ------
        Exception
            If a dict is open but no key has been set for it.
        """
        if isinstance(self._current, dict):
            # Compare against None: "" is a legal Avro map key and must not
            # be mistaken for "no key was set".
            if self._key is None:
                raise Exception("No key was set")
            self._current[self._key] = value
        elif isinstance(self._current, list):
            self._current.append(value)
        else:
            # If we aren't in a dict or a list then this must be a schema which
            # just has a single basic type
            self._records.append(value)

    def _push(self):
        """Save the current container and key before opening a nested one."""
        self._stack.append((self._current, self._key))

    def _pop(self):
        """Close the current container and attach it to its parent.

        When there is no parent (the saved container is None) the finished
        container is a complete top-level value and is queued in
        ``self._records``.
        """
        prev_current, prev_key = self._stack.pop()
        if isinstance(prev_current, dict):
            prev_current[prev_key] = self._current
            self._current = prev_current
        elif isinstance(prev_current, list):
            prev_current.append(self._current)
            self._current = prev_current
        else:
            assert prev_current is None
            assert prev_key is None
            # Back at None, we should have a full record in self._current
            self._records.append(self._current)
            self._current = prev_current
        self._key = prev_key

    def write_buffer(self):
        """Write every queued top-level value to ``fo`` as newline-separated JSON."""
        # Newline separated
        json_data = "\n".join([json.dumps(record) for record in self._records])
        self._fo.write(json_data)

    def configure(self, schema, named_schemas):
        """Create the schema parser that drives this encoder's actions."""
        self._parser = Parser(schema, named_schemas, self.do_action)

    def flush(self):
        """Ask the parser to flush any pending actions."""
        self._parser.flush()

    def do_action(self, action):
        """Handle a parser action symbol.

        Raises
        ------
        Exception
            For an action symbol this encoder does not handle.
        """
        if isinstance(action, RecordStart):
            self.write_object_start()
        elif isinstance(action, (RecordEnd, UnionEnd)):
            self.write_object_end()
        elif isinstance(action, FieldStart):
            self.write_object_key(action.field_name)
        elif isinstance(action, FieldEnd):
            # TODO: Do we need a FieldEnd symbol?
            pass
        elif isinstance(action, Root):
            self.write_buffer()
        else:
            raise Exception(f"Internal Exception: {action}")

    def write_null(self):
        self._parser.advance(Null())
        self.write_value(None)

    def write_boolean(self, value):
        self._parser.advance(Boolean())
        self.write_value(value)

    def write_utf8(self, value):
        """Write a string, either as a map key or as a plain value."""
        self._parser.advance(String())
        # When the parser expects a map key next, this string is the key of
        # the following map value rather than a value itself.
        if self._parser.stack[-1] == MapKeyMarker():
            self._parser.advance(MapKeyMarker())
            self.write_object_key(value)
        else:
            self.write_value(value)

    def write_int(self, value):
        self._parser.advance(Int())
        self.write_value(value)

    def write_long(self, value):
        self._parser.advance(Long())
        self.write_value(value)

    def write_float(self, value):
        self._parser.advance(Float())
        self.write_value(value)

    def write_double(self, value):
        self._parser.advance(Double())
        self.write_value(value)

    def write_bytes(self, value):
        """Write bytes as a string with one character per byte (ISO-8859-1)."""
        self._parser.advance(Bytes())
        self.write_value(value.decode("iso-8859-1"))

    def write_enum(self, index):
        """Write the enum label at position ``index`` of the schema's labels."""
        self._parser.advance(Enum())
        enum_labels = self._parser.pop_symbol()
        # TODO: Check symbols?
        self.write_value(enum_labels.labels[index])

    def write_fixed(self, value):
        """Write fixed data as a string with one character per byte (ISO-8859-1)."""
        self._parser.advance(Fixed())
        self.write_value(value.decode("iso-8859-1"))

    def write_array_start(self):
        self._parser.advance(ArrayStart())
        self._push()
        self._current = []

    def write_item_count(self, length):
        # JSON arrays/objects carry no explicit item count.
        pass

    def end_item(self):
        self._parser.advance(ItemEnd())

    def write_array_end(self):
        self._parser.advance(ArrayEnd())
        self._pop()

    def write_object_start(self):
        """Open a new dict (record, map or union wrapper)."""
        self._push()
        self._current = {}
        # The parent's key was saved by _push and is restored by _pop; clear
        # it so a value written before any key is reported instead of
        # silently landing under the parent's key.
        self._key = None

    def write_object_key(self, key):
        self._key = key

    def write_object_end(self):
        self._pop()

    def write_map_start(self):
        self._parser.advance(MapStart())
        self.write_object_start()

    def write_map_end(self):
        self._parser.advance(MapEnd())
        self.write_object_end()

    def write_index(self, index, schema):
        """Select union branch ``index`` and prepare to write its value.

        Unless the chosen branch is null or ``write_union_type`` is False, the
        value is wrapped in a single-key dict named by the branch label; a
        ``UnionEnd`` symbol is pushed so the parser closes that wrapper.
        """
        self._parser.advance(Union())
        alternative_symbol = self._parser.pop_symbol()

        symbol = alternative_symbol.get_symbol(index)

        if symbol != Null() and self._write_union_type:
            self.write_object_start()
            self.write_object_key(alternative_symbol.get_label(index))
            # TODO: Do we need this symbol?
            self._parser.push_symbol(UnionEnd())

        self._parser.push_symbol(symbol)