Coverage for /pythoncovmergedfiles/medio/medio/src/black/src/blib2to3/pgen2/driver.py: 78%
# Copyright 2004-2005 Elemental Security, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

# Modifications:
# Copyright 2006 Google, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Parser driver.

This provides a high-level interface to parse a file into a syntax tree.

"""

__author__ = "Guido van Rossum <guido@python.org>"

__all__ = ["Driver", "load_grammar"]

# Python imports
import io
import logging
import os
import pkgutil
import sys
from collections.abc import Iterable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass, field
from logging import Logger
from typing import IO, Any, Optional, Union, cast

from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.tokenize import TokenInfo
from blib2to3.pytree import NL

# Pgen imports
from . import grammar, parse, pgen, token, tokenize

Path = Union[str, "os.PathLike[str]"]


@dataclass
class ReleaseRange:
    start: int
    end: Optional[int] = None
    tokens: list[Any] = field(default_factory=list)

    def lock(self) -> None:
        total_eaten = len(self.tokens)
        self.end = self.start + total_eaten


class TokenProxy:
    def __init__(self, generator: Any) -> None:
        self._tokens = generator
        self._counter = 0
        self._release_ranges: list[ReleaseRange] = []

    @contextmanager
    def release(self) -> Iterator["TokenProxy"]:
        release_range = ReleaseRange(self._counter)
        self._release_ranges.append(release_range)
        try:
            yield self
        finally:
            # Lock the last release range to the final position that
            # has been eaten.
            release_range.lock()

    def eat(self, point: int) -> Any:
        eaten_tokens = self._release_ranges[-1].tokens
        if point < len(eaten_tokens):
            return eaten_tokens[point]
        else:
            while point >= len(eaten_tokens):
                token = next(self._tokens)
                eaten_tokens.append(token)
            return token

    def __iter__(self) -> "TokenProxy":
        return self

    def __next__(self) -> Any:
        # If the current position is already compromised (looked up)
        # return the eaten token, if not just go further on the given
        # token producer.
        for release_range in self._release_ranges:
            assert release_range.end is not None

            start, end = release_range.start, release_range.end
            if start <= self._counter < end:
                token = release_range.tokens[self._counter - start]
                break
        else:
            token = next(self._tokens)
        self._counter += 1
        return token

    def can_advance(self, to: int) -> bool:
        # Try to eat, fail if it can't. The eat operation is cached
        # so there won't be any additional cost of eating here
        try:
            self.eat(to)
        except StopIteration:
            return False
        else:
            return True
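

# Hypothetical usage sketch, not part of the original module: it shows how the
# parser uses TokenProxy for speculative lookahead. Inside a `release()` block,
# `eat()`/`can_advance()` pull tokens from the underlying generator and cache
# them; once the block is locked, normal iteration replays the cached tokens
# before drawing fresh ones. The token values here are plain strings purely for
# illustration (real callers feed tokenize quintuples).
def _token_proxy_sketch() -> None:
    proxy = TokenProxy(iter(["a", "b", "c"]))
    with proxy.release():
        assert proxy.can_advance(1)  # caches "a" and "b" without consuming them
        assert proxy.eat(0) == "a"  # cached lookups cost nothing on repeat access
    assert next(proxy) == "a"  # replayed from the locked release range
    assert next(proxy) == "b"
    assert next(proxy) == "c"  # past the cache: pulled from the generator again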


class Driver:
    def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
        self.grammar = grammar
        if logger is None:
            logger = logging.getLogger(__name__)
        self.logger = logger

    def parse_tokens(self, tokens: Iterable[TokenInfo], debug: bool = False) -> NL:
        """Parse a series of tokens and return the syntax tree."""
        # XXX Move the prefix computation into a wrapper around tokenize.
        proxy = TokenProxy(tokens)

        p = parse.Parser(self.grammar)
        p.setup(proxy=proxy)

        lineno = 1
        column = 0
        indent_columns: list[int] = []
        type = value = start = end = line_text = None
        prefix = ""

        for quintuple in proxy:
            type, value, start, end, line_text = quintuple
            if start != (lineno, column):
                assert (lineno, column) <= start, ((lineno, column), start)
                s_lineno, s_column = start
                if lineno < s_lineno:
                    prefix += "\n" * (s_lineno - lineno)
                    lineno = s_lineno
                    column = 0
                if column < s_column:
                    prefix += line_text[column:s_column]
                    column = s_column
            if type in (tokenize.COMMENT, tokenize.NL):
                prefix += value
                lineno, column = end
                if value.endswith("\n"):
                    lineno += 1
                    column = 0
                continue
            if type == token.OP:
                type = grammar.opmap[value]
            if debug:
                assert type is not None
                self.logger.debug(
                    "%s %r (prefix=%r)", token.tok_name[type], value, prefix
                )
            if type == token.INDENT:
                indent_columns.append(len(value))
                _prefix = prefix + value
                prefix = ""
                value = ""
            elif type == token.DEDENT:
                _indent_col = indent_columns.pop()
                prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
            if p.addtoken(cast(int, type), value, (prefix, start)):
                if debug:
                    self.logger.debug("Stop.")
                break
            prefix = ""
            if type in {token.INDENT, token.DEDENT}:
                prefix = _prefix
            lineno, column = end
            # FSTRING_MIDDLE is the only token that can end with a newline, and
            # `end` will point to the next line. For that case, don't increment lineno.
            if value.endswith("\n") and type != token.FSTRING_MIDDLE:
                lineno += 1
                column = 0
        else:
            # We never broke out -- EOF is too soon (how can this happen???)
            assert start is not None
            raise parse.ParseError("incomplete input", type, value, (prefix, start))
        assert p.rootnode is not None
        return p.rootnode

    def parse_file(
        self, filename: Path, encoding: Optional[str] = None, debug: bool = False
    ) -> NL:
        """Parse a file and return the syntax tree."""
        with open(filename, encoding=encoding) as stream:
            text = stream.read()
        return self.parse_string(text, debug)

    def parse_string(self, text: str, debug: bool = False) -> NL:
        """Parse a string and return the syntax tree."""
        tokens = tokenize.tokenize(text, grammar=self.grammar)
        return self.parse_tokens(tokens, debug)
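
    # The helper below splits ``prefix`` for DEDENT handling: the first return
    # value is the leading run of lines whose content is indented at least
    # ``column`` columns (they stay attached to the DEDENT token), and the
    # second is the remainder, starting at the first line whose content begins
    # before ``column`` (it becomes the prefix of the following token).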
    def _partially_consume_prefix(self, prefix: str, column: int) -> tuple[str, str]:
        lines: list[str] = []
        current_line = ""
        current_column = 0
        wait_for_nl = False
        for char in prefix:
            current_line += char
            if wait_for_nl:
                if char == "\n":
                    if current_line.strip() and current_column < column:
                        res = "".join(lines)
                        return res, prefix[len(res) :]

                    lines.append(current_line)
                    current_line = ""
                    current_column = 0
                    wait_for_nl = False
            elif char in " \t":
                current_column += 1
            elif char == "\n":
                # unexpected empty line
                current_column = 0
            elif char == "\f":
                current_column = 0
            else:
                # indent is finished
                wait_for_nl = True
        return "".join(lines), current_line


def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> str:
    head, tail = os.path.splitext(gt)
    if tail == ".txt":
        tail = ""
    name = head + tail + ".".join(map(str, sys.version_info)) + ".pickle"
    if cache_dir:
        return os.path.join(cache_dir, os.path.basename(name))
    else:
        return name
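
# Worked example (with a hypothetical interpreter version): on CPython 3.12.0,
# _generate_pickle_name("Grammar.txt") drops the ".txt" suffix and appends the
# dotted sys.version_info, yielding "Grammar3.12.0.final.0.pickle"; with a
# cache_dir, the same basename is placed inside that directory instead.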


def load_grammar(
    gt: str = "Grammar.txt",
    gp: Optional[str] = None,
    save: bool = True,
    force: bool = False,
    logger: Optional[Logger] = None,
) -> Grammar:
    """Load the grammar (maybe from a pickle)."""
    if logger is None:
        logger = logging.getLogger(__name__)
    gp = _generate_pickle_name(gt) if gp is None else gp
    if force or not _newer(gp, gt):
        g: grammar.Grammar = pgen.generate_grammar(gt)
        if save:
            try:
                g.dump(gp)
            except OSError:
                # Ignore error, caching is not vital.
                pass
    else:
        g = grammar.Grammar()
        g.load(gp)
    return g
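

# Hypothetical usage sketch, not part of the original module: the high-level
# flow described in the module docstring is to load (or regenerate) a grammar
# from a grammar text file and hand it to a Driver, which tokenizes source text
# and returns the root node of the syntax tree. "Grammar.txt" is simply the
# default grammar filename assumed by load_grammar.
def _driver_usage_sketch(source: str) -> NL:
    g = load_grammar("Grammar.txt")  # pickled and reused on later runs
    driver = Driver(g)
    return driver.parse_string(source)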


def _newer(a: str, b: str) -> bool:
    """Inquire whether file a was written since file b."""
    if not os.path.exists(a):
        return False
    if not os.path.exists(b):
        return True
    return os.path.getmtime(a) >= os.path.getmtime(b)


def load_packaged_grammar(
    package: str, grammar_source: str, cache_dir: Optional[Path] = None
) -> grammar.Grammar:
    """Normally, loads a pickled grammar by doing
        pkgutil.get_data(package, pickled_grammar)
    where *pickled_grammar* is computed from *grammar_source* by adding the
    Python version and using a ``.pickle`` extension.

    However, if *grammar_source* is an extant file, load_grammar(grammar_source)
    is called instead. This facilitates using a packaged grammar file when needed
    but preserves load_grammar's automatic regeneration behavior when possible.

    """
    if os.path.isfile(grammar_source):
        gp = _generate_pickle_name(grammar_source, cache_dir) if cache_dir else None
        return load_grammar(grammar_source, gp=gp)
    pickled_name = _generate_pickle_name(os.path.basename(grammar_source), cache_dir)
    data = pkgutil.get_data(package, pickled_name)
    assert data is not None
    g = grammar.Grammar()
    g.loads(data)
    return g
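
# Example call (hypothetical arguments): load_packaged_grammar("blib2to3",
# "Grammar.txt") loads the version-stamped pickle bundled in the blib2to3
# package data, unless a local "Grammar.txt" file exists on disk, in which
# case load_grammar() regenerates or reloads it as described in the docstring.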


def main(*args: str) -> bool:
    """Main program, when run as a script: produce grammar pickle files.

    Calls load_grammar for each argument, a path to a grammar text file.
    """
    if not args:
        args = tuple(sys.argv[1:])
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
    for gt in args:
        load_grammar(gt, save=True, force=True)
    return True


if __name__ == "__main__":
    sys.exit(int(not main()))