1from .symbols import (
2 Root,
3 Terminal,
4 Boolean,
5 Sequence,
6 Repeater,
7 Action,
8 RecordStart,
9 RecordEnd,
10 FieldStart,
11 FieldEnd,
12 Int,
13 Null,
14 String,
15 Alternative,
16 Union,
17 Long,
18 Float,
19 Double,
20 Bytes,
21 MapEnd,
22 MapStart,
23 MapKeyMarker,
24 Enum,
25 EnumLabels,
26 Fixed,
27 ArrayStart,
28 ArrayEnd,
29 ItemEnd,
30 NO_DEFAULT,
31)
32from ..schema import extract_record_type
33
34
class Parser:
    """Compile a schema into grammar symbols and drive them through a stack.

    The schema is translated once, at construction time, into nested symbol
    productions.  ``self.stack`` is then consumed from its end: ``advance``
    pops symbols until a requested one is reached, expanding non-terminal
    symbols and reporting ``Action`` symbols to ``action_function`` on the
    way.  Because the stack is popped from the end, every production is
    stored in reverse reading order.
    """

    def __init__(self, schema, named_schemas, action_function):
        """Store the inputs and build the initial symbol stack.

        Args:
            schema: The schema to compile; handed to ``extract_record_type``.
            named_schemas: Mapping from a type name to its schema, used to
                resolve types that are referenced by name.
            action_function: Callable invoked with each ``Action`` symbol
                that is popped from the stack.
        """
        self.schema = schema
        # Names of the record schemas that have already been expanded once;
        # only membership is ever tested, hence a set.
        self._processed_records = set()
        self.named_schemas = named_schemas
        self.action_function = action_function
        self.stack = self.parse()

    def parse(self):
        """Compile ``self.schema`` and return the initial stack.

        Returns:
            A two-element list ``[root, symbol]`` where ``symbol`` is the
            compiled schema and ``root`` is a ``Root`` wrapping it.
        """
        symbol = self._parse(self.schema)
        root = Root([symbol])
        # The root is inserted at the front of its own production, so that
        # expanding the root puts the root itself back onto the stack.
        root.production.insert(0, root)
        return [root, symbol]

    def _process_record(self, schema, default, schema_name=None):
        """Build the (reversed) production for a record schema.

        Args:
            schema: Record schema; its ``"fields"`` entry is iterated and
                each field's ``"name"`` and ``"type"`` are read.
            default: Default value attached to the ``RecordStart`` symbol.
            schema_name: When not ``None``, the name of a record that is
                already being expanded.  Fields whose type contains this
                name are replaced by a null-only union instead of being
                expanded again.

        Returns:
            A list of symbols ordered from ``RecordEnd`` down to
            ``RecordStart``, i.e. reversed relative to reading order.
        """
        # Collect the symbols in reading order and reverse once at the end;
        # the result is the same list that repeated ``insert(0, ...)`` calls
        # would produce.
        symbols = [RecordStart(default=default)]
        for field in schema["fields"]:
            symbols.append(FieldStart(field["name"]))

            field_type = field["type"]
            # NOTE(review): ``in`` is a substring test when the field type
            # is a str and a key test when it is a dict -- confirm that this
            # is the intended way to detect a self reference.
            if schema_name is not None and schema_name in field_type:
                # This means a recursive relationship, so we force a `null`.
                field_symbol = Sequence(
                    Alternative([Null()], ["null"], default=None), Union()
                )
            else:
                field_symbol = self._parse(
                    field_type, field.get("default", NO_DEFAULT)
                )

            symbols.append(field_symbol)
            symbols.append(FieldEnd())
        symbols.append(RecordEnd())

        symbols.reverse()
        return symbols

    def _parse(self, schema, default=NO_DEFAULT):
        """Translate ``schema`` into a grammar symbol.

        Args:
            schema: The schema to translate; its kind is determined by
                ``extract_record_type``.
            default: Default value to attach to the resulting symbol, or
                ``NO_DEFAULT`` when none was declared.  It is not used for
                the ``"null"`` type.

        Returns:
            The symbol (a ``Sequence`` for composite types) for ``schema``.

        Raises:
            Exception: If the type is neither handled here nor present in
                ``self.named_schemas``.
        """
        record_type = extract_record_type(schema)

        if record_type == "record":
            schema_name = schema["name"]

            if schema_name in self._processed_records:
                # Already expanded once: pass the name along so that fields
                # referring back to this record are not expanded again.
                production = self._process_record(
                    schema, default, schema_name=schema_name
                )
            else:
                self._processed_records.add(schema_name)
                production = self._process_record(schema, default)

            return Sequence(*production)

        if record_type == "union":
            symbols = []
            labels = []
            for candidate_schema in schema:
                symbols.append(self._parse(candidate_schema))
                if isinstance(candidate_schema, dict):
                    # Label a dict branch by its name, else by its type.
                    label = candidate_schema.get(
                        "name", candidate_schema.get("type")
                    )
                else:
                    label = candidate_schema
                labels.append(label)

            return Sequence(Alternative(symbols, labels, default=default), Union())

        if record_type == "map":
            repeat = Repeater(
                MapEnd(),
                # ItemEnd(),  # TODO: Maybe need this?
                self._parse(schema["values"]),
                MapKeyMarker(),
                String(),
            )
            return Sequence(repeat, MapStart(default=default))

        if record_type == "array":
            repeat = Repeater(
                ArrayEnd(),
                ItemEnd(),
                self._parse(schema["items"]),
            )
            return Sequence(repeat, ArrayStart(default=default))

        if record_type == "enum":
            return Sequence(EnumLabels(schema["symbols"]), Enum(default=default))

        if record_type == "null":
            return Null()
        if record_type == "boolean":
            return Boolean(default=default)
        if record_type == "string":
            return String(default=default)
        if record_type == "bytes":
            return Bytes(default=default)
        if record_type == "int":
            return Int(default=default)
        if record_type == "long":
            return Long(default=default)
        if record_type == "float":
            return Float(default=default)
        if record_type == "double":
            return Double(default=default)
        if record_type == "fixed":
            return Fixed(default=default)

        if record_type in self.named_schemas:
            # Forward the default: a type referenced by name must end up
            # with the same default as the same type declared inline.
            return self._parse(self.named_schemas[record_type], default)

        raise Exception(f"Unhandled type: {record_type}")

    def advance(self, symbol):
        """Pop the stack until ``symbol`` is reached and return it.

        ``Action`` symbols met on the way are passed to
        ``self.action_function``; other non-terminal symbols are replaced by
        their production.

        Args:
            symbol: The symbol the caller expects next.

        Returns:
            The popped symbol equal to ``symbol``, or ``symbol`` itself when
            a ``Repeater`` whose ``end`` equals ``symbol`` was popped.

        Raises:
            Exception: If a ``Terminal`` other than ``symbol`` is popped.
        """
        while True:
            top = self.stack.pop()

            if top == symbol:
                return top
            if isinstance(top, Action):
                self.action_function(top)
            elif isinstance(top, Terminal):
                raise Exception(f"Internal Parser Exception: {top}")
            elif isinstance(top, Repeater) and top.end == symbol:
                # The repeater is dropped (not pushed back) when its end
                # symbol is the one requested.
                return symbol
            else:
                self.stack.extend(top.production)

    def drain_actions(self):
        """Run pending actions until the ``Root`` symbol is reached.

        The root is pushed back onto the stack before returning.

        Raises:
            Exception: If a ``Terminal`` that is neither a ``Root`` nor an
                ``Action`` is popped.
        """
        while True:
            top = self.stack.pop()

            if isinstance(top, Root):
                self.push_symbol(top)
                return
            if isinstance(top, Action):
                self.action_function(top)
            elif isinstance(top, Terminal):
                raise Exception(f"Internal Parser Exception: {top}")
            else:
                self.stack.extend(top.production)

    def pop_symbol(self):
        """Remove and return the symbol on top of the stack."""
        return self.stack.pop()

    def push_symbol(self, symbol):
        """Put ``symbol`` on top of the stack."""
        self.stack.append(symbol)

    def flush(self):
        """Empty the stack, passing every symbol to ``action_function``.

        Raises:
            Exception: If a symbol that is neither an ``Action`` nor a
                ``Root`` is still on the stack.
        """
        while self.stack:
            top = self.stack.pop()

            if not isinstance(top, (Action, Root)):
                raise Exception(f"Internal Parser Exception: {top}")
            self.action_function(top)