# Copyright 2004-2005 Elemental Security, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

# Modifications:
# Copyright 2006 Google, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Parser driver.

This provides a high-level interface to parse a file into a syntax tree.

"""

__author__ = "Guido van Rossum <guido@python.org>"

__all__ = ["Driver", "load_grammar"]

# Python imports
import io
import os
import logging
import pkgutil
import sys
from typing import (
    Any,
    cast,
    IO,
    Iterable,
    List,
    Optional,
    Text,
    Iterator,
    Tuple,
    TypeVar,
    Generic,
    Union,
)
from contextlib import contextmanager
from dataclasses import dataclass, field

# Pgen imports
from . import grammar, parse, token, tokenize, pgen
from logging import Logger
from blib2to3.pytree import NL
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.tokenize import GoodTokenInfo

Path = Union[str, "os.PathLike[str]"]


@dataclass
class ReleaseRange:
    start: int
    end: Optional[int] = None
    tokens: List[Any] = field(default_factory=list)

    def lock(self) -> None:
        total_eaten = len(self.tokens)
        self.end = self.start + total_eaten
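

# Editor's illustration (not part of the original module): a ReleaseRange
# records how far speculative lookahead advanced; lock() pins `end` to
# start + the number of tokens eaten while the range was open.
def _example_release_range() -> None:
    rr = ReleaseRange(start=10)
    rr.tokens.extend(["tok_a", "tok_b"])
    rr.lock()
    assert rr.end == 12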


class TokenProxy:
    """Wrap a token generator, caching the tokens eaten during speculative
    lookahead so they can be replayed when parsing resumes."""

    def __init__(self, generator: Any) -> None:
        self._tokens = generator
        self._counter = 0
        self._release_ranges: List[ReleaseRange] = []

    @contextmanager
    def release(self) -> Iterator["TokenProxy"]:
        release_range = ReleaseRange(self._counter)
        self._release_ranges.append(release_range)
        try:
            yield self
        finally:
            # Lock the last release range to the final position that
            # has been eaten.
            release_range.lock()

    def eat(self, point: int) -> Any:
        eaten_tokens = self._release_ranges[-1].tokens
        if point < len(eaten_tokens):
            return eaten_tokens[point]
        else:
            while point >= len(eaten_tokens):
                token = next(self._tokens)
                eaten_tokens.append(token)
            return token

    def __iter__(self) -> "TokenProxy":
        return self
    def __next__(self) -> Any:
        # If the current position has already been looked ahead (eaten),
        # return the cached token; otherwise pull the next token from the
        # underlying producer.
        for release_range in self._release_ranges:
            assert release_range.end is not None

            start, end = release_range.start, release_range.end
            if start <= self._counter < end:
                token = release_range.tokens[self._counter - start]
                break
        else:
            token = next(self._tokens)
        self._counter += 1
        return token
    def can_advance(self, to: int) -> bool:
        # Try to eat; if the underlying generator is exhausted, report False.
        # The eat operation is cached, so there is no additional cost to
        # eating the same tokens again later.
        try:
            self.eat(to)
        except StopIteration:
            return False
        else:
            return True
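

# A minimal sketch (editor's illustration, not part of the original module) of
# how TokenProxy supports speculative lookahead: tokens eaten inside a
# release() block are cached, and iteration afterwards replays them before
# pulling fresh tokens from the generator.
def _example_token_proxy_lookahead() -> None:
    import tokenize as std_tokenize  # stdlib tokenizer, demo input only

    proxy = TokenProxy(std_tokenize.generate_tokens(io.StringIO("a = 1\n").readline))
    with proxy.release():
        assert proxy.can_advance(1)  # peek two tokens ahead
        first = proxy.eat(0)
    # The cached tokens are replayed in order once the release block exits.
    assert next(proxy) == first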


class Driver(object):
    def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
        self.grammar = grammar
        if logger is None:
            logger = logging.getLogger(__name__)
        self.logger = logger

    def parse_tokens(self, tokens: Iterable[GoodTokenInfo], debug: bool = False) -> NL:
        """Parse a series of tokens and return the syntax tree."""
        # XXX Move the prefix computation into a wrapper around tokenize.
        proxy = TokenProxy(tokens)

        p = parse.Parser(self.grammar)
        p.setup(proxy=proxy)

        lineno = 1
        column = 0
        indent_columns: List[int] = []
        type = value = start = end = line_text = None
        prefix = ""

        for quintuple in proxy:
            type, value, start, end, line_text = quintuple
            if start != (lineno, column):
                assert (lineno, column) <= start, ((lineno, column), start)
                s_lineno, s_column = start
                if lineno < s_lineno:
                    prefix += "\n" * (s_lineno - lineno)
                    lineno = s_lineno
                    column = 0
                if column < s_column:
                    prefix += line_text[column:s_column]
                    column = s_column
            if type in (tokenize.COMMENT, tokenize.NL):
                prefix += value
                lineno, column = end
                if value.endswith("\n"):
                    lineno += 1
                    column = 0
                continue
            if type == token.OP:
                type = grammar.opmap[value]
            if debug:
                assert type is not None
                self.logger.debug(
                    "%s %r (prefix=%r)", token.tok_name[type], value, prefix
                )
            if type == token.INDENT:
                indent_columns.append(len(value))
                _prefix = prefix + value
                prefix = ""
                value = ""
            elif type == token.DEDENT:
                _indent_col = indent_columns.pop()
                prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
            if p.addtoken(cast(int, type), value, (prefix, start)):
                if debug:
                    self.logger.debug("Stop.")
                break
            prefix = ""
            if type in {token.INDENT, token.DEDENT}:
                prefix = _prefix
            lineno, column = end
            if value.endswith("\n"):
                lineno += 1
                column = 0
        else:
            # We never broke out -- EOF is too soon (how can this happen???)
            assert start is not None
            raise parse.ParseError("incomplete input", type, value, (prefix, start))
        assert p.rootnode is not None
        return p.rootnode
    def parse_stream_raw(self, stream: IO[Text], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree."""
        tokens = tokenize.generate_tokens(stream.readline, grammar=self.grammar)
        return self.parse_tokens(tokens, debug)

    def parse_stream(self, stream: IO[Text], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree."""
        return self.parse_stream_raw(stream, debug)

    def parse_file(
        self, filename: Path, encoding: Optional[Text] = None, debug: bool = False
    ) -> NL:
        """Parse a file and return the syntax tree."""
        with io.open(filename, "r", encoding=encoding) as stream:
            return self.parse_stream(stream, debug)

    def parse_string(self, text: Text, debug: bool = False) -> NL:
        """Parse a string and return the syntax tree."""
        tokens = tokenize.generate_tokens(
            io.StringIO(text).readline, grammar=self.grammar
        )
        return self.parse_tokens(tokens, debug)
    def _partially_consume_prefix(self, prefix: Text, column: int) -> Tuple[Text, Text]:
        lines: List[str] = []
        current_line = ""
        current_column = 0
        wait_for_nl = False
        for char in prefix:
            current_line += char
            if wait_for_nl:
                if char == "\n":
                    if current_line.strip() and current_column < column:
                        res = "".join(lines)
                        return res, prefix[len(res) :]

                    lines.append(current_line)
                    current_line = ""
                    current_column = 0
                    wait_for_nl = False
            elif char in " \t":
                current_column += 1
            elif char == "\n":
                # unexpected empty line
                current_column = 0
            else:
                # indent is finished
                wait_for_nl = True
        return "".join(lines), current_line
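

# A minimal sketch (editor's illustration, not part of the original module) of
# driving a parse end to end: build the grammar with load_grammar (defined
# below), construct a Driver, and parse source text into a pytree.
def _example_parse_string() -> None:
    g = load_grammar()  # generates or unpickles the default "Grammar.txt"
    driver = Driver(g)
    tree = driver.parse_string("x = 1\n")
    print(type(tree).__name__)  # root Node/Leaf of the resulting pytree


# A second sketch (also illustrative): _partially_consume_prefix splits a
# DEDENT's prefix at the indent column being popped, so comments indented at
# or past that column travel with the DEDENT token while less-indented lines
# become the next token's prefix.
def _example_partially_consume_prefix() -> None:
    d = Driver(grammar.Grammar())
    prefix = "    # inside the block\n# back at top level\n"
    consumed, rest = d._partially_consume_prefix(prefix, 4)
    assert consumed == "    # inside the block\n"
    assert rest == "# back at top level\n"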


def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> Text:
    head, tail = os.path.splitext(gt)
    if tail == ".txt":
        tail = ""
    name = head + tail + ".".join(map(str, sys.version_info)) + ".pickle"
    if cache_dir:
        return os.path.join(cache_dir, os.path.basename(name))
    else:
        return name
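

# Editor's illustration (not part of the original module): the pickle name
# embeds the full interpreter version, e.g. "Grammar3.11.4.final.0.pickle" on
# CPython 3.11.4, so caches written by different Pythons never collide.
def _example_pickle_name() -> None:
    expected = "Grammar" + ".".join(map(str, sys.version_info)) + ".pickle"
    assert _generate_pickle_name("Grammar.txt") == expected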


def load_grammar(
    gt: Text = "Grammar.txt",
    gp: Optional[Text] = None,
    save: bool = True,
    force: bool = False,
    logger: Optional[Logger] = None,
) -> Grammar:
    """Load the grammar (maybe from a pickle)."""
    if logger is None:
        logger = logging.getLogger(__name__)
    gp = _generate_pickle_name(gt) if gp is None else gp
    if force or not _newer(gp, gt):
        g: grammar.Grammar = pgen.generate_grammar(gt)
        if save:
            try:
                g.dump(gp)
            except OSError:
                # Ignore error, caching is not vital.
                pass
    else:
        g = grammar.Grammar()
        g.load(gp)
    return g
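

# A minimal sketch (editor's illustration, not part of the original module) of
# the caching behaviour: the pickle is rebuilt only when it is missing or
# older than the grammar text; force=True always regenerates, and save=False
# skips writing the cache. `grammar_path` is a hypothetical path to a grammar
# text file.
def _example_load_grammar_cache(grammar_path: Text) -> None:
    g1 = load_grammar(grammar_path, save=True)  # builds and pickles
    g2 = load_grammar(grammar_path)  # served from the fresh pickle via _newer()
    g3 = load_grammar(grammar_path, force=True, save=False)  # rebuilds, no write
    assert g1.start == g2.start == g3.start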


def _newer(a: Text, b: Text) -> bool:
    """Inquire whether file a was written since file b."""
    if not os.path.exists(a):
        return False
    if not os.path.exists(b):
        return True
    return os.path.getmtime(a) >= os.path.getmtime(b)


def load_packaged_grammar(
    package: str, grammar_source: Text, cache_dir: Optional[Path] = None
) -> grammar.Grammar:
    """Normally, loads a pickled grammar by doing
        pkgutil.get_data(package, pickled_grammar)
    where *pickled_grammar* is computed from *grammar_source* by adding the
    Python version and using a ``.pickle`` extension.

    However, if *grammar_source* is an extant file, load_grammar(grammar_source)
    is called instead. This facilitates using a packaged grammar file when needed
    but preserves load_grammar's automatic regeneration behavior when possible.

    """
    if os.path.isfile(grammar_source):
        gp = _generate_pickle_name(grammar_source, cache_dir) if cache_dir else None
        return load_grammar(grammar_source, gp=gp)
    pickled_name = _generate_pickle_name(os.path.basename(grammar_source), cache_dir)
    data = pkgutil.get_data(package, pickled_name)
    assert data is not None
    g = grammar.Grammar()
    g.loads(data)
    return g


def main(*args: Text) -> bool:
    """Main program, when run as a script: produce grammar pickle files.

    Calls load_grammar for each argument, a path to a grammar text file.
    """
    if not args:
        args = tuple(sys.argv[1:])
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
    for gt in args:
        load_grammar(gt, save=True, force=True)
    return True


if __name__ == "__main__":
    sys.exit(int(not main()))