Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastavro/io/parser.py: 12%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

94 statements  

1from .symbols import ( 

2 Root, 

3 Terminal, 

4 Boolean, 

5 Sequence, 

6 Repeater, 

7 Action, 

8 RecordStart, 

9 RecordEnd, 

10 FieldStart, 

11 FieldEnd, 

12 Int, 

13 Null, 

14 String, 

15 Alternative, 

16 Union, 

17 Long, 

18 Float, 

19 Double, 

20 Bytes, 

21 MapEnd, 

22 MapStart, 

23 MapKeyMarker, 

24 Enum, 

25 EnumLabels, 

26 Fixed, 

27 ArrayStart, 

28 ArrayEnd, 

29 ItemEnd, 

30 NO_DEFAULT, 

31) 

32from ..schema import extract_record_type 

33 

34 

35class Parser: 

36 def __init__(self, schema, named_schemas, action_function): 

37 self.schema = schema 

38 self.named_schemas = named_schemas 

39 self.action_function = action_function 

40 self.stack = self.parse() 

41 

42 def parse(self): 

43 symbol = self._parse(self.schema) 

44 root = Root([symbol]) 

45 root.production.insert(0, root) 

46 return [root, symbol] 

47 

48 def _parse(self, schema, default=NO_DEFAULT): 

49 record_type = extract_record_type(schema) 

50 

51 if record_type == "record": 

52 production = [] 

53 

54 production.append(RecordStart(default=default)) 

55 for field in schema["fields"]: 

56 production.insert(0, FieldStart(field["name"])) 

57 production.insert( 

58 0, self._parse(field["type"], field.get("default", NO_DEFAULT)) 

59 ) 

60 production.insert(0, FieldEnd()) 

61 production.insert(0, RecordEnd()) 

62 

63 seq = Sequence(*production) 

64 return seq 

65 

66 elif record_type == "union": 

67 symbols = [] 

68 labels = [] 

69 for candidate_schema in schema: 

70 symbols.append(self._parse(candidate_schema)) 

71 if isinstance(candidate_schema, dict): 

72 labels.append( 

73 candidate_schema.get("name", candidate_schema.get("type")) 

74 ) 

75 else: 

76 labels.append(candidate_schema) 

77 

78 return Sequence(Alternative(symbols, labels, default=default), Union()) 

79 

80 elif record_type == "map": 

81 repeat = Repeater( 

82 MapEnd(), 

83 # ItemEnd(), # TODO: Maybe need this? 

84 self._parse(schema["values"]), 

85 MapKeyMarker(), 

86 String(), 

87 ) 

88 return Sequence(repeat, MapStart(default=default)) 

89 

90 elif record_type == "array": 

91 repeat = Repeater( 

92 ArrayEnd(), 

93 ItemEnd(), 

94 self._parse(schema["items"]), 

95 ) 

96 return Sequence(repeat, ArrayStart(default=default)) 

97 

98 elif record_type == "enum": 

99 return Sequence(EnumLabels(schema["symbols"]), Enum(default=default)) 

100 

101 elif record_type == "null": 

102 return Null() 

103 elif record_type == "boolean": 

104 return Boolean(default=default) 

105 elif record_type == "string": 

106 return String(default=default) 

107 elif record_type == "bytes": 

108 return Bytes(default=default) 

109 elif record_type == "int": 

110 return Int(default=default) 

111 elif record_type == "long": 

112 return Long(default=default) 

113 elif record_type == "float": 

114 return Float(default=default) 

115 elif record_type == "double": 

116 return Double(default=default) 

117 elif record_type == "fixed": 

118 return Fixed(default=default) 

119 elif record_type in self.named_schemas: 

120 return self._parse(self.named_schemas[record_type]) 

121 else: 

122 raise Exception(f"Unhandled type: {record_type}") 

123 

124 def advance(self, symbol): 

125 while True: 

126 top = self.stack.pop() 

127 

128 if top == symbol: 

129 return top 

130 elif isinstance(top, Action): 

131 self.action_function(top) 

132 elif isinstance(top, Terminal): 

133 raise Exception(f"Internal Parser Exception: {top}") 

134 elif isinstance(top, Repeater) and top.end == symbol: 

135 return symbol 

136 else: 

137 self.stack.extend(top.production) 

138 

139 def drain_actions(self): 

140 while True: 

141 top = self.stack.pop() 

142 

143 if isinstance(top, Root): 

144 self.push_symbol(top) 

145 break 

146 elif isinstance(top, Action): 

147 self.action_function(top) 

148 elif not isinstance(top, Terminal): 

149 self.stack.extend(top.production) 

150 else: 

151 raise Exception(f"Internal Parser Exception: {top}") 

152 

153 def pop_symbol(self): 

154 return self.stack.pop() 

155 

156 def push_symbol(self, symbol): 

157 self.stack.append(symbol) 

158 

159 def flush(self): 

160 while len(self.stack) > 0: 

161 top = self.stack.pop() 

162 

163 if isinstance(top, Action) or isinstance(top, Root): 

164 self.action_function(top) 

165 else: 

166 raise Exception(f"Internal Parser Exception: {top}")