Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/lark/parsers/xearley.py: 80%

84 statements  

1"""This module implements an Earley parser with a dynamic lexer 

2 

3The core Earley algorithm used here is based on Elizabeth Scott's implementation, here: 

4 https://www.sciencedirect.com/science/article/pii/S1571066108001497 

5 

6That is probably the best reference for understanding the algorithm here. 

7 

8The Earley parser outputs an SPPF-tree as per that document. The SPPF tree format 

9is better documented here: 

10 http://www.bramvandersanden.com/post/2014/06/shared-packed-parse-forest/ 

11 

12Instead of running a lexer beforehand, or using a costy char-by-char method, this parser 

13uses regular expressions by necessity, achieving high-performance while maintaining all of 

14Earley's power in parsing any CFG. 

15""" 

from typing import TYPE_CHECKING, Callable, Optional, List, Any
from collections import defaultdict

from ..tree import Tree
from ..exceptions import UnexpectedCharacters
from ..lexer import Token
from ..grammar import Terminal
from .earley import Parser as BaseParser
from .earley_common import Item
from .earley_forest import TokenNode

if TYPE_CHECKING:
    from ..common import LexerConf, ParserConf

class Parser(BaseParser):
    def __init__(self, lexer_conf: 'LexerConf', parser_conf: 'ParserConf', term_matcher: Callable,
                 resolve_ambiguity: bool=True, complete_lex: bool=False, debug: bool=False,
                 tree_class: Optional[Callable[[str, List], Any]]=Tree, ordered_sets: bool=True):
        BaseParser.__init__(self, lexer_conf, parser_conf, term_matcher, resolve_ambiguity,
                            debug, tree_class, ordered_sets)
        self.ignore = [Terminal(t) for t in lexer_conf.ignore]
        self.complete_lex = complete_lex

    def _parse(self, stream, columns, to_scan, start_symbol=None):

        def scan(i, to_scan):
            """The core Earley Scanner.

            This is a custom implementation of the scanner that uses the
            Lark lexer to match tokens. The scan list is built by the
            Earley predictor, based on the previously completed tokens.
            This ensures that at each phase of the parse we have a custom
            lexer context, allowing for more complex ambiguities."""

            # Cache that dedupes SPPF symbol nodes created in this parse step,
            # keyed by (symbol, start, end) labels.
            node_cache = {}

            # 1) Loop over the expectations and ask the lexer to match.
            # Since regexps look ahead in the input stream, and we only want to
            # process a token at the point in the stream where it completes, we
            # push every match into a buffer (delayed_matches), to be held,
            # possibly for a later parse step, until we reach the point in the
            # input stream at which it completes.
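            #
            # For example, if an item expects /a+/ and the regexp matches "aaa"
            # starting at i=0, the (item, start, Token) triple is buffered under
            # delayed_matches[3] and consumed only when the parse reaches stream
            # position 3.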

            for item in self.Set(to_scan):
                m = match(item.expect, stream, i)
                if m:
                    t = Token(item.expect.name, m.group(0), i, text_line, text_column)
                    delayed_matches[m.end()].append( (item, i, t) )
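                    # Note: with complete_lex (Lark's lexer='dynamic_complete'), the
                    # branch below also buffers every proper prefix of the match, so
                    # shorter tokenizations of the same terminal stay available as
                    # ambiguities: a match "aaa" of /a+/ also buffers "aa" and "a".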

                    if self.complete_lex:
                        s = m.group(0)
                        for j in range(1, len(s)):
                            m = match(item.expect, s[:-j])
                            if m:
                                t = Token(item.expect.name, m.group(0), i, text_line, text_column)
                                delayed_matches[i+m.end()].append( (item, i, t) )

                    # XXX The following 3 lines were commented out for causing a bug. See issue #768
                    # # Remove any items that successfully matched in this pass from the to_scan buffer.
                    # # This ensures we don't carry over tokens that already matched, if we're ignoring below.
                    # to_scan.remove(item)

            # 3) Process any ignores. This is typically used for e.g. whitespace.
            # We carry over any unmatched items from the to_scan buffer to be matched again after
            # the ignore. This should allow us to use ignored symbols in non-terminals to implement
            # e.g. mandatory spacing.
            for x in self.ignore:
                m = match(x, stream, i)
                if m:
                    # Carry over any items still in the scan buffer, to past the end of the ignored items.
                    delayed_matches[m.end()].extend([(item, i, None) for item in to_scan])

                    # If we're ignoring up to the end of the file, carry over the start symbol if it already completed.
                    delayed_matches[m.end()].extend([(item, i, None) for item in columns[i] if item.is_complete and item.s == start_symbol])
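            # Example of the carry-over above: with %ignore " " and input "a b",
            # items still expecting a terminal at i=1 (the space) are re-queued
            # at delayed_matches[2], so matching resumes at 'b'.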

            next_to_scan = self.Set()
            next_set = self.Set()
            columns.append(next_set)
            transitives.append({})

            ## 4) Process Tokens from delayed_matches.
            # This is the core of the Earley scanner. Create an SPPF node for each Token,
            # and create the symbol node in the SPPF tree. Advance the item that completed,
            # and add the resulting new item to either the Earley set (for processing by the
            # completer/predictor) or the to_scan buffer for the next parse step.
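            # (SPPF labels are (symbol, start, end) triples; add_family merges
            # alternative derivations of the same node, which is what makes the
            # forest "packed".)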

            for item, start, token in delayed_matches[i+1]:
                if token is not None:
                    token.end_line = text_line
                    token.end_column = text_column + 1
                    token.end_pos = i + 1

                    new_item = item.advance()
                    label = (new_item.s, new_item.start, i + 1)
                    token_node = TokenNode(token, terminals[token.type])
                    new_item.node = node_cache[label] if label in node_cache else node_cache.setdefault(label, self.SymbolNode(*label))
                    new_item.node.add_family(new_item.s, item.rule, new_item.start, item.node, token_node)
                else:
                    # Handle items carried over due to ignores
                    new_item = Item(item.rule, item.ptr, item.start)
                    label = (new_item.s, new_item.start, i + 1)
                    new_item.node = node_cache[label] if label in node_cache else node_cache.setdefault(label, self.SymbolNode(*label))
                    if item.node:
                        # new_item.node and item.node both represent the same symbol, so merge their children
                        for child in item.node.children:
                            new_item.node.add_family(new_item.s, child.rule, new_item.start, child.left, child.right)

                if new_item.expect in self.TERMINALS:
                    # add (B ::= α a_{i+1} · β, h, y) to Q'
                    next_to_scan.add(new_item)
                else:
                    # add (B ::= α a_{i+1} · β, h, y) to E_{i+1}
                    next_set.add(new_item)

            del delayed_matches[i+1]    # No longer needed, so unburden memory

            if not next_set and not delayed_matches and not next_to_scan:
                considered_rules = sorted(to_scan, key=lambda key: key.rule.origin.name)
                raise UnexpectedCharacters(stream, i, text_line, text_column, {item.expect.name for item in to_scan},
                                           set(to_scan), state=frozenset(i.s for i in to_scan),
                                           considered_rules=considered_rules
                                           )

            return next_to_scan, node_cache


        delayed_matches = defaultdict(list)
        match = self.term_matcher
        terminals = self.lexer_conf.terminals_by_name

        # Cache of transitives, one dict per parse step; used by predict_and_complete in earley.py.
        transitives = [{}]

        text_line = 1
        text_column = 1

        ## The main Earley loop.
        # Run the Prediction/Completion cycle for any Items in the current Earley set.
        # Completions will be added to the SPPF tree, and predictions will be recursively
        # processed down to terminals/empty nodes to be added to the scanner for the next
        # step.
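        # Note that "stream" is consumed one character at a time below: i tracks
        # the absolute position, while text_line/text_column track the 1-based
        # position recorded on tokens and in error messages.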

        i = 0
        node_cache = {}
        for token in stream:
            self.predict_and_complete(i, to_scan, columns, transitives, node_cache)

            to_scan, node_cache = scan(i, to_scan)

            if token == '\n':
                text_line += 1
                text_column = 1
            else:
                text_column += 1
            i += 1

        self.predict_and_complete(i, to_scan, columns, transitives, node_cache)

        ## Column is now the final column in the parse.
        assert i == len(columns)-1
        return to_scan
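
A quick usage sketch (illustrative; not part of the measured module above): this parser
is normally reached through Lark's public API by passing parser='earley' together with
lexer='dynamic', the default dynamic lexer for Earley; lexer='dynamic_complete' maps to
complete_lex=True. The toy grammar below is hypothetical, chosen so that "ab" tokenizes
in two ways, something only a dynamic lexer keeps alive:

    from lark import Lark

    grammar = r"""
        start: A B | AB
        A: "a"
        B: "b"
        AB: "ab"
    """

    # ambiguity='explicit' returns an _ambig node instead of picking one parse.
    parser = Lark(grammar, parser='earley', lexer='dynamic', ambiguity='explicit')
    print(parser.parse("ab").pretty())

A conventional standalone lexer would have to commit to either AB or A-then-B before
parsing begins; the scanner above instead buffers both candidate matches in
delayed_matches and lets the Earley sets decide. Switching to lexer='dynamic_complete'
(complete_lex=True) would additionally consider every prefix of each match.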