1"""This module implements an Earley parser with a dynamic lexer
2
3The core Earley algorithm used here is based on Elizabeth Scott's implementation, here:
4 https://www.sciencedirect.com/science/article/pii/S1571066108001497
5
6That is probably the best reference for understanding the algorithm here.
7
8The Earley parser outputs an SPPF-tree as per that document. The SPPF tree format
9is better documented here:
10 http://www.bramvandersanden.com/post/2014/06/shared-packed-parse-forest/
11
12Instead of running a lexer beforehand, or using a costy char-by-char method, this parser
13uses regular expressions by necessity, achieving high-performance while maintaining all of
14Earley's power in parsing any CFG.
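
This parser is normally reached through the public Lark API rather than by
instantiating ``Parser`` directly. A minimal usage sketch (the grammar below is
only an illustration):

    from lark import Lark

    parser = Lark(r'''
        start: WORD "," WORD "!"
        WORD: /\w+/
        %ignore " "
    ''', parser='earley', lexer='dynamic')

    tree = parser.parse("Hello, World!")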
15"""

from typing import TYPE_CHECKING, Callable, Optional, List, Any
from collections import defaultdict

from ..tree import Tree
from ..exceptions import UnexpectedCharacters
from ..lexer import Token
from ..grammar import Terminal
from .earley import Parser as BaseParser
from .earley_forest import TokenNode

if TYPE_CHECKING:
    from ..common import LexerConf, ParserConf

class Parser(BaseParser):
    def __init__(self, lexer_conf: 'LexerConf', parser_conf: 'ParserConf', term_matcher: Callable,
                 resolve_ambiguity: bool=True, complete_lex: bool=False, debug: bool=False,
                 tree_class: Optional[Callable[[str, List], Any]]=Tree, ordered_sets: bool=True):
        BaseParser.__init__(self, lexer_conf, parser_conf, term_matcher, resolve_ambiguity,
                            debug, tree_class, ordered_sets)
        self.ignore = [Terminal(t) for t in lexer_conf.ignore]
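        # complete_lex makes the scanner also try every proper prefix of each regexp
        # match (see scan() below), at the cost of considerably more work per input
        # position. This is what Lark's 'dynamic_complete' lexer mode is expected to set.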
        self.complete_lex = complete_lex

    def _parse(self, stream, columns, to_scan, start_symbol=None):

        def scan(i, to_scan):
            """The core Earley Scanner.

            This is a custom implementation of the scanner that uses the
            Lark lexer to match tokens. The scan list is built by the
            Earley predictor, based on the previously completed tokens.
            This ensures that at each phase of the parse we have a custom
            lexer context, allowing for more complex ambiguities."""

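            # node_cache maps (symbol, start, end) labels to the SPPF symbol nodes
            # created during this step, so that equal nodes are shared.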
            node_cache = {}

            # 1) Loop over the expected terminals and ask the lexer to match.
            # Since a regexp match may extend past the current position, and we only
            # want to process a token once we reach the point in the stream at which
            # it completes, we push every match into a buffer (delayed_matches), to be
            # picked up by the parse step that reaches that point.
            for item in self.Set(to_scan):
                m = match(item.expect, stream, i)
                if m:
                    t = Token(item.expect.name, m.group(0), i, text_line, text_column)
                    delayed_matches[m.end()].append( (item, i, t) )

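                    # In complete_lex mode, also try every proper prefix of the maximal
                    # match, so that shorter readings of the same terminal remain
                    # available as alternatives in later parse steps.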
                    if self.complete_lex:
                        s = m.group(0)
                        for j in range(1, len(s)):
                            m = match(item.expect, s[:-j])
                            if m:
                                t = Token(item.expect.name, m.group(0), i, text_line, text_column)
                                delayed_matches[i+m.end()].append( (item, i, t) )

                    # XXX The following 3 lines were commented out for causing a bug. See issue #768
                    # # Remove any items that successfully matched in this pass from the to_scan buffer.
                    # # This ensures we don't carry over tokens that already matched, if we're ignoring below.
                    # to_scan.remove(item)

            # 3) Process any ignores. This is typically used for e.g. whitespace.
            # We carry over any unmatched items from the to_scan buffer to be matched again after
            # the ignore. This should allow us to use ignored symbols in non-terminals to implement
            # e.g. mandatory spacing.
            for x in self.ignore:
                m = match(x, stream, i)
                if m:
                    # Carry over any items still in the scan buffer, to past the end of the ignored items.
                    delayed_matches[m.end()].extend([(item, i, None) for item in to_scan])

                    # If we're ignoring up to the end of the file, carry over the start symbol if it already completed.
                    delayed_matches[m.end()].extend([(item, i, None) for item in columns[i] if item.is_complete and item.s == start_symbol])

            next_to_scan = self.Set()
            next_set = self.Set()
            columns.append(next_set)
            transitives.append({})

            ## 4) Process Tokens from delayed_matches.
            # This is the core of the Earley scanner. Create a token node for each Token
            # and attach it to a shared symbol node in the SPPF tree. Advance the item
            # whose expected terminal matched, and add the resulting new item to either
            # the next Earley set (for processing by the completer/predictor) or the
            # to_scan buffer for the next parse step.
            for item, start, token in delayed_matches[i+1]:
                if token is not None:
                    token.end_line = text_line
                    token.end_column = text_column + 1
                    token.end_pos = i + 1

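                    # Advance the dot over the matched terminal and record this derivation
                    # in the SPPF: reuse the symbol node for (symbol, start, end) if one was
                    # already created this step, and add a packed family pointing at the
                    # item's node and the new token node.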
                    new_item = item.advance()
                    label = (new_item.s, new_item.start, i + 1)
                    token_node = TokenNode(token, terminals[token.type])
                    new_item.node = node_cache[label] if label in node_cache else node_cache.setdefault(label, self.SymbolNode(*label))
                    new_item.node.add_family(new_item.s, item.rule, new_item.start, item.node, token_node)
                else:
                    new_item = item

                if new_item.expect in self.TERMINALS:
                    # add (B ::= alpha a_i+1 . beta, h, y) to Q'
                    next_to_scan.add(new_item)
                else:
                    # add (B ::= alpha a_i+1 . beta, h, y) to E_i+1
                    next_set.add(new_item)

            del delayed_matches[i+1]  # No longer needed, so unburden memory

            if not next_set and not delayed_matches and not next_to_scan:
                considered_rules = list(sorted(to_scan, key=lambda key: key.rule.origin.name))
                raise UnexpectedCharacters(stream, i, text_line, text_column, {item.expect.name for item in to_scan},
                                           set(to_scan), state=frozenset(i.s for i in to_scan),
                                           considered_rules=considered_rules
                                           )

            return next_to_scan, node_cache


        delayed_matches = defaultdict(list)
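        # delayed_matches maps an absolute end position in the input stream to the list
        # of (item, start_position, token) matches that complete at that position
        # (token is None for items carried over by the ignore pass).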
        match = self.term_matcher
        terminals = self.lexer_conf.terminals_by_name

        # Transitive items, one dict per Earley set, used by the base parser
        # in predict_and_complete().
        transitives = [{}]

        text_line = 1
        text_column = 1

        ## The main Earley loop.
        # Run the Prediction/Completion cycle for any Items in the current Earley set.
        # Completions will be added to the SPPF tree, and predictions will be recursively
        # processed down to terminals/empty nodes to be added to the scanner for the next
        # step.
        i = 0
        node_cache = {}
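        # Note: the input is iterated character by character only to track the absolute
        # position and line/column numbers; actual terminal matching happens inside
        # scan() via the regexp-based term_matcher.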
        for token in stream:
            self.predict_and_complete(i, to_scan, columns, transitives, node_cache)

            to_scan, node_cache = scan(i, to_scan)

            if token == '\n':
                text_line += 1
                text_column = 1
            else:
                text_column += 1
            i += 1

        self.predict_and_complete(i, to_scan, columns, transitives, node_cache)

        ## Column is now the final column in the parse.
        assert i == len(columns)-1
        return to_scan