1# Copyright 2004-2005 Elemental Security, Inc. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4# Modifications:
5# Copyright 2006 Google, Inc. All Rights Reserved.
6# Licensed to PSF under a Contributor Agreement.
7
8"""Parser driver.
9
10This provides a high-level interface to parse a file into a syntax tree.
11
12"""
13
14__author__ = "Guido van Rossum <guido@python.org>"
15
16__all__ = ["Driver", "load_grammar"]
17
18# Python imports
19import io
20import logging
21import os
22import pkgutil
23import sys
24from collections.abc import Iterable, Iterator
25from contextlib import contextmanager
26from dataclasses import dataclass, field
27from logging import Logger
28from typing import IO, Any, Optional, Union, cast
29
30from blib2to3.pgen2.grammar import Grammar
31from blib2to3.pgen2.tokenize import TokenInfo
32from blib2to3.pytree import NL
33
34# Pgen imports
35from . import grammar, parse, pgen, token, tokenize
36
37Path = Union[str, "os.PathLike[str]"]
38
39
40@dataclass
41class ReleaseRange:
42 start: int
43 end: Optional[int] = None
44 tokens: list[Any] = field(default_factory=list)
45
46 def lock(self) -> None:
47 total_eaten = len(self.tokens)
48 self.end = self.start + total_eaten
49
50
51class TokenProxy:
52 def __init__(self, generator: Any) -> None:
53 self._tokens = generator
54 self._counter = 0
55 self._release_ranges: list[ReleaseRange] = []
56
57 @contextmanager
58 def release(self) -> Iterator["TokenProxy"]:
59 release_range = ReleaseRange(self._counter)
60 self._release_ranges.append(release_range)
61 try:
62 yield self
63 finally:
64 # Lock the last release range to the final position that
65 # has been eaten.
66 release_range.lock()
67
68 def eat(self, point: int) -> Any:
69 eaten_tokens = self._release_ranges[-1].tokens
70 if point < len(eaten_tokens):
71 return eaten_tokens[point]
72 else:
73 while point >= len(eaten_tokens):
74 token = next(self._tokens)
75 eaten_tokens.append(token)
76 return token
77
78 def __iter__(self) -> "TokenProxy":
79 return self
80
81 def __next__(self) -> Any:
82 # If the current position is already compromised (looked up)
83 # return the eaten token, if not just go further on the given
84 # token producer.
85 for release_range in self._release_ranges:
86 assert release_range.end is not None
87
88 start, end = release_range.start, release_range.end
89 if start <= self._counter < end:
90 token = release_range.tokens[self._counter - start]
91 break
92 else:
93 token = next(self._tokens)
94 self._counter += 1
95 return token
96
97 def can_advance(self, to: int) -> bool:
98 # Try to eat, fail if it can't. The eat operation is cached
99 # so there won't be any additional cost of eating here
100 try:
101 self.eat(to)
102 except StopIteration:
103 return False
104 else:
105 return True
106
107
108class Driver:
109 def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
110 self.grammar = grammar
111 if logger is None:
112 logger = logging.getLogger(__name__)
113 self.logger = logger
114
115 def parse_tokens(self, tokens: Iterable[TokenInfo], debug: bool = False) -> NL:
116 """Parse a series of tokens and return the syntax tree."""
117 # XXX Move the prefix computation into a wrapper around tokenize.
118 proxy = TokenProxy(tokens)
119
120 p = parse.Parser(self.grammar)
121 p.setup(proxy=proxy)
122
123 lineno = 1
124 column = 0
125 indent_columns: list[int] = []
126 type = value = start = end = line_text = None
127 prefix = ""
128
129 for quintuple in proxy:
130 type, value, start, end, line_text = quintuple
131 if start != (lineno, column):
132 assert (lineno, column) <= start, ((lineno, column), start)
133 s_lineno, s_column = start
134 if lineno < s_lineno:
135 prefix += "\n" * (s_lineno - lineno)
136 lineno = s_lineno
137 column = 0
138 if column < s_column:
139 prefix += line_text[column:s_column]
140 column = s_column
141 if type in (tokenize.COMMENT, tokenize.NL):
142 prefix += value
143 lineno, column = end
144 if value.endswith("\n"):
145 lineno += 1
146 column = 0
147 continue
148 if type == token.OP:
149 type = grammar.opmap[value]
150 if debug:
151 assert type is not None
152 self.logger.debug(
153 "%s %r (prefix=%r)", token.tok_name[type], value, prefix
154 )
155 if type == token.INDENT:
156 indent_columns.append(len(value))
157 _prefix = prefix + value
158 prefix = ""
159 value = ""
160 elif type == token.DEDENT:
161 _indent_col = indent_columns.pop()
162 prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
163 if p.addtoken(cast(int, type), value, (prefix, start)):
164 if debug:
165 self.logger.debug("Stop.")
166 break
167 prefix = ""
168 if type in {token.INDENT, token.DEDENT}:
169 prefix = _prefix
170 lineno, column = end
171 # FSTRING_MIDDLE and TSTRING_MIDDLE are the only token that can end with a
172 # newline, and `end` will point to the next line. For that case, don't
173 # increment lineno.
174 if value.endswith("\n") and type not in (
175 token.FSTRING_MIDDLE,
176 token.TSTRING_MIDDLE,
177 ):
178 lineno += 1
179 column = 0
180 else:
181 # We never broke out -- EOF is too soon (how can this happen???)
182 assert start is not None
183 raise parse.ParseError("incomplete input", type, value, (prefix, start))
184 assert p.rootnode is not None
185 return p.rootnode
186
187 def parse_file(
188 self, filename: Path, encoding: Optional[str] = None, debug: bool = False
189 ) -> NL:
190 """Parse a file and return the syntax tree."""
191 with open(filename, encoding=encoding) as stream:
192 text = stream.read()
193 return self.parse_string(text, debug)
194
195 def parse_string(self, text: str, debug: bool = False) -> NL:
196 """Parse a string and return the syntax tree."""
197 tokens = tokenize.tokenize(text, grammar=self.grammar)
198 return self.parse_tokens(tokens, debug)
199
200 def _partially_consume_prefix(self, prefix: str, column: int) -> tuple[str, str]:
201 lines: list[str] = []
202 current_line = ""
203 current_column = 0
204 wait_for_nl = False
205 for char in prefix:
206 current_line += char
207 if wait_for_nl:
208 if char == "\n":
209 if current_line.strip() and current_column < column:
210 res = "".join(lines)
211 return res, prefix[len(res) :]
212
213 lines.append(current_line)
214 current_line = ""
215 current_column = 0
216 wait_for_nl = False
217 elif char in " \t":
218 current_column += 1
219 elif char == "\n":
220 # unexpected empty line
221 current_column = 0
222 elif char == "\f":
223 current_column = 0
224 else:
225 # indent is finished
226 wait_for_nl = True
227 return "".join(lines), current_line
228
229
230def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> str:
231 head, tail = os.path.splitext(gt)
232 if tail == ".txt":
233 tail = ""
234 name = head + tail + ".".join(map(str, sys.version_info)) + ".pickle"
235 if cache_dir:
236 return os.path.join(cache_dir, os.path.basename(name))
237 else:
238 return name
239
240
241def load_grammar(
242 gt: str = "Grammar.txt",
243 gp: Optional[str] = None,
244 save: bool = True,
245 force: bool = False,
246 logger: Optional[Logger] = None,
247) -> Grammar:
248 """Load the grammar (maybe from a pickle)."""
249 if logger is None:
250 logger = logging.getLogger(__name__)
251 gp = _generate_pickle_name(gt) if gp is None else gp
252 if force or not _newer(gp, gt):
253 g: grammar.Grammar = pgen.generate_grammar(gt)
254 if save:
255 try:
256 g.dump(gp)
257 except OSError:
258 # Ignore error, caching is not vital.
259 pass
260 else:
261 g = grammar.Grammar()
262 g.load(gp)
263 return g
264
265
266def _newer(a: str, b: str) -> bool:
267 """Inquire whether file a was written since file b."""
268 if not os.path.exists(a):
269 return False
270 if not os.path.exists(b):
271 return True
272 return os.path.getmtime(a) >= os.path.getmtime(b)
273
274
275def load_packaged_grammar(
276 package: str, grammar_source: str, cache_dir: Optional[Path] = None
277) -> grammar.Grammar:
278 """Normally, loads a pickled grammar by doing
279 pkgutil.get_data(package, pickled_grammar)
280 where *pickled_grammar* is computed from *grammar_source* by adding the
281 Python version and using a ``.pickle`` extension.
282
283 However, if *grammar_source* is an extant file, load_grammar(grammar_source)
284 is called instead. This facilitates using a packaged grammar file when needed
285 but preserves load_grammar's automatic regeneration behavior when possible.
286
287 """
288 if os.path.isfile(grammar_source):
289 gp = _generate_pickle_name(grammar_source, cache_dir) if cache_dir else None
290 return load_grammar(grammar_source, gp=gp)
291 pickled_name = _generate_pickle_name(os.path.basename(grammar_source), cache_dir)
292 data = pkgutil.get_data(package, pickled_name)
293 assert data is not None
294 g = grammar.Grammar()
295 g.loads(data)
296 return g
297
298
299def main(*args: str) -> bool:
300 """Main program, when run as a script: produce grammar pickle files.
301
302 Calls load_grammar for each argument, a path to a grammar text file.
303 """
304 if not args:
305 args = tuple(sys.argv[1:])
306 logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
307 for gt in args:
308 load_grammar(gt, save=True, force=True)
309 return True
310
311
312if __name__ == "__main__":
313 sys.exit(int(not main()))