Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fastavro/io/parser.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

109 statements  

1from .symbols import ( 

2 Root, 

3 Terminal, 

4 Boolean, 

5 Sequence, 

6 Repeater, 

7 Action, 

8 RecordStart, 

9 RecordEnd, 

10 FieldStart, 

11 FieldEnd, 

12 Int, 

13 Null, 

14 String, 

15 Alternative, 

16 Union, 

17 Long, 

18 Float, 

19 Double, 

20 Bytes, 

21 MapEnd, 

22 MapStart, 

23 MapKeyMarker, 

24 Enum, 

25 EnumLabels, 

26 Fixed, 

27 ArrayStart, 

28 ArrayEnd, 

29 ItemEnd, 

30 NO_DEFAULT, 

31) 

32from ..schema import extract_record_type 

33 

34 

class Parser:
    """Translate an Avro schema into a stack of grammar symbols.

    The stack built at construction time is consumed by ``advance``,
    ``drain_actions`` and ``flush``; whenever one of those pops an
    ``Action`` symbol, it is passed to ``action_function``.
    """

    def __init__(self, schema, named_schemas, action_function):
        """Build the parser and its initial symbol stack.

        Args:
            schema: The Avro schema to parse, in whatever form
                ``extract_record_type`` accepts.
            named_schemas: Mapping from a type name to its schema definition,
                used to resolve type names that are not built-in types.
            action_function: Callable invoked with each ``Action`` symbol
                popped off the stack (``flush`` also passes it the ``Root``).
        """
        self.schema = schema
        # Names of record schemas already expanded once; a later expansion of
        # the same name is treated as a possible recursive reference.
        self._processed_records = []
        self.named_schemas = named_schemas
        self.action_function = action_function
        self.stack = self.parse()

    def parse(self):
        """Return the initial stack ``[root, symbol]`` for ``self.schema``.

        The root is also prepended to its own production, so when ``advance``
        expands it, the root is pushed back beneath the schema's symbol.
        """
        symbol = self._parse(self.schema)
        root = Root([symbol])
        root.production.insert(0, root)
        return [root, symbol]

    @staticmethod
    def _is_self_reference(field_type, schema_name):
        """Return True if ``field_type`` refers to the record ``schema_name``.

        Names are compared exactly. A plain ``in`` on a string type would be a
        substring test (``"ns.Node" in "ns.NodeList"`` is True), and on a dict
        type it would test the dict's keys rather than its type.

        Args:
            field_type: A field's ``"type"`` value: a type name string, a
                dict schema, or a list (union) of those.
            schema_name: Name of the record currently being re-expanded, or
                None when no recursion check is wanted.
        """
        if schema_name is None:
            return False
        if isinstance(field_type, str):
            return field_type == schema_name
        if isinstance(field_type, dict):
            return field_type.get("type") == schema_name
        if isinstance(field_type, list):
            return any(
                Parser._is_self_reference(candidate, schema_name)
                for candidate in field_type
            )
        return False

    def _process_record(self, schema, default, schema_name=None):
        """Return the list of symbols making up a record's production.

        The list is built back to front: ``RecordEnd`` first, ``RecordStart``
        last. This is presumably because productions are pushed with
        ``stack.extend`` and popped from the end, so the first field is
        consumed first.

        Args:
            schema: Record schema dict with a ``"fields"`` list.
            default: Default value attached to the ``RecordStart`` symbol.
            schema_name: When given, the record is being expanded again, and
                any field whose type refers to this name becomes a null-only
                union instead of being expanded.
        """
        production = [RecordStart(default=default)]
        for field in schema["fields"]:
            production.insert(0, FieldStart(field["name"]))

            if self._is_self_reference(field["type"], schema_name):
                # This means a recursive relationship, so we force a `null`
                # instead of expanding the record into itself again.
                field_symbol = Sequence(
                    Alternative([Null()], ["null"], default=None), Union()
                )
            else:
                field_symbol = self._parse(
                    field["type"], field.get("default", NO_DEFAULT)
                )

            production.insert(0, field_symbol)
            production.insert(0, FieldEnd())
        production.insert(0, RecordEnd())

        return production

    def _parse(self, schema, default=NO_DEFAULT):
        """Return the grammar symbol for ``schema``.

        Args:
            schema: Any schema form understood by ``extract_record_type``.
            default: Default value to attach to the produced symbol. It is
                ``NO_DEFAULT`` when the schema position has no default.

        Raises:
            Exception: If the type is neither built-in nor a key of
                ``named_schemas``.
        """
        record_type = extract_record_type(schema)

        if record_type == "record":
            schema_name = schema["name"]
            if schema_name in self._processed_records:
                production = self._process_record(
                    schema, default, schema_name=schema_name
                )
            else:
                self._processed_records.append(schema_name)
                production = self._process_record(schema, default)
            return Sequence(*production)

        elif record_type == "union":
            symbols = []
            labels = []
            for candidate_schema in schema:
                symbols.append(self._parse(candidate_schema))
                if isinstance(candidate_schema, dict):
                    labels.append(
                        candidate_schema.get("name", candidate_schema.get("type"))
                    )
                else:
                    labels.append(candidate_schema)

            return Sequence(Alternative(symbols, labels, default=default), Union())

        elif record_type == "map":
            repeat = Repeater(
                MapEnd(),
                # ItemEnd(),  # TODO: Maybe need this?
                self._parse(schema["values"]),
                MapKeyMarker(),
                String(),
            )
            return Sequence(repeat, MapStart(default=default))

        elif record_type == "array":
            repeat = Repeater(
                ArrayEnd(),
                ItemEnd(),
                self._parse(schema["items"]),
            )
            return Sequence(repeat, ArrayStart(default=default))

        elif record_type == "enum":
            return Sequence(EnumLabels(schema["symbols"]), Enum(default=default))

        elif record_type == "null":
            return Null()
        elif record_type == "boolean":
            return Boolean(default=default)
        elif record_type == "string":
            return String(default=default)
        elif record_type == "bytes":
            return Bytes(default=default)
        elif record_type == "int":
            return Int(default=default)
        elif record_type == "long":
            return Long(default=default)
        elif record_type == "float":
            return Float(default=default)
        elif record_type == "double":
            return Double(default=default)
        elif record_type == "fixed":
            return Fixed(default=default)
        elif record_type in self.named_schemas:
            # Carry the default through the reference. A field typed by name
            # then gets the same default an inline definition would.
            return self._parse(self.named_schemas[record_type], default)
        else:
            raise Exception(f"Unhandled type: {record_type}")

    def advance(self, symbol):
        """Pop symbols until ``symbol`` is reached, and return it.

        While popping:
        - ``Action`` symbols are passed to ``action_function``.
        - A ``Repeater`` whose ``end`` equals ``symbol`` stops the loop; that
          repeater is not pushed back.
        - Any other non-terminal is expanded by pushing its production.

        Raises:
            Exception: If a ``Terminal`` other than ``symbol`` is popped.
            IndexError: If the stack runs empty (``list.pop``).
        """
        while True:
            top = self.stack.pop()

            if top == symbol:
                return top
            elif isinstance(top, Action):
                self.action_function(top)
            elif isinstance(top, Terminal):
                raise Exception(f"Internal Parser Exception: {top}")
            elif isinstance(top, Repeater) and top.end == symbol:
                return symbol
            else:
                self.stack.extend(top.production)

    def drain_actions(self):
        """Dispatch pending actions until the ``Root`` is reached.

        The ``Root`` is pushed back onto the stack. Other non-terminals are
        expanded along the way.

        Raises:
            Exception: If a ``Terminal`` is popped.
        """
        while True:
            top = self.stack.pop()

            if isinstance(top, Root):
                self.push_symbol(top)
                break
            elif isinstance(top, Action):
                self.action_function(top)
            elif not isinstance(top, Terminal):
                self.stack.extend(top.production)
            else:
                raise Exception(f"Internal Parser Exception: {top}")

    def pop_symbol(self):
        """Remove and return the top symbol of the stack."""
        return self.stack.pop()

    def push_symbol(self, symbol):
        """Push ``symbol`` onto the top of the stack."""
        self.stack.append(symbol)

    def flush(self):
        """Empty the stack, passing each ``Action`` and ``Root`` to ``action_function``.

        Raises:
            Exception: If any other kind of symbol is still on the stack.
        """
        while self.stack:
            top = self.stack.pop()

            if isinstance(top, (Action, Root)):
                self.action_function(top)
            else:
                raise Exception(f"Internal Parser Exception: {top}")

190 else: 

191 raise Exception(f"Internal Parser Exception: {top}")