Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pasta/base/token_generator.py: 93% (327 statements)
coverage.py v7.2.7, created at 2023-06-07 06:12 +0000

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# coding=utf-8
"""Token generator for analyzing source code in logical units.

This module contains the TokenGenerator used for annotating a parsed syntax
tree with source code formatting.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ast
import collections
import contextlib
import itertools
import tokenize
from six import StringIO

from pasta.base import formatting as fmt
from pasta.base import fstring_utils

# Alias for extracting token names
TOKENS = tokenize
Token = collections.namedtuple('Token', ('type', 'src', 'start', 'end', 'line'))
FORMATTING_TOKENS = (TOKENS.INDENT, TOKENS.DEDENT, TOKENS.NL, TOKENS.NEWLINE,
                     TOKENS.COMMENT)
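
# Each Token mirrors the 5-tuples produced by tokenize.generate_tokens. As a
# rough illustration (types shown by name; the actual `type` field is an int),
# tokenizing "a = 1\n" yields approximately:
#   Token(type=NAME,      src='a',  start=(1, 0), end=(1, 1), line='a = 1\n')
#   Token(type=OP,        src='=',  start=(1, 2), end=(1, 3), line='a = 1\n')
#   Token(type=NUMBER,    src='1',  start=(1, 4), end=(1, 5), line='a = 1\n')
#   Token(type=NEWLINE,   src='\n', start=(1, 5), end=(1, 6), line='a = 1\n')
#   Token(type=ENDMARKER, src='',   start=(2, 0), end=(2, 0), line='')
# Rows are 1-indexed and columns are 0-indexed.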

class TokenGenerator(object):
  """Helper for sequentially parsing Python source code, token by token.

  Holds internal state during parsing, including:
  _tokens: List of tokens in the source code, as parsed by the `tokenize` module.
  _parens: Stack of open parentheses at the current point in parsing.
  _hints: Number of open parentheses, brackets, etc. at the current point.
  _scope_stack: Stack containing tuples of nodes where the last parenthesis that
    was opened is related to one of the nodes on the top of the stack.
  _lines: Full lines of the source code.
  _i: Index of the last token that was parsed. Initially -1.
  _loc: (lineno, column_offset) pair of the position in the source that has been
    parsed to. This should be either the start or end of the token at index _i.

  Arguments:
    ignore_error_token: If True, error tokens are ignored. Otherwise, an error
      token will cause an exception. This is useful when the source being
      parsed contains invalid syntax, e.g. if it is in an fstring context.
  """

  def __init__(self, source, ignore_error_token=False):
    self.lines = source.splitlines(True)
    self._tokens = list(_generate_tokens(source, ignore_error_token))
    self._parens = []
    self._hints = 0
    self._scope_stack = []
    self._len = len(self._tokens)
    self._i = -1
    self._loc = self.loc_begin()

  def chars_consumed(self):
    return len(self._space_between((1, 0), self._tokens[self._i].end))

  def loc_begin(self):
    """Get the start column of the current location parsed to."""
    if self._i < 0:
      return (1, 0)
    return self._tokens[self._i].start

  def loc_end(self):
    """Get the end column of the current location parsed to."""
    if self._i < 0:
      return (1, 0)
    return self._tokens[self._i].end

  def peek(self):
    """Get the next token without advancing."""
    if self._i + 1 >= self._len:
      return None
    return self._tokens[self._i + 1]

  def peek_non_whitespace(self):
    """Get the next non-whitespace token without advancing."""
    return self.peek_conditional(lambda t: t.type not in FORMATTING_TOKENS)

  def peek_conditional(self, condition):
    """Get the next token satisfying the given condition, without advancing."""
    return next((t for t in self._tokens[self._i + 1:] if condition(t)), None)

  def next(self, advance=True):
    """Consume the next token and optionally advance the current location."""
    self._i += 1
    if self._i >= self._len:
      return None
    if advance:
      self._loc = self._tokens[self._i].end
    return self._tokens[self._i]

  def rewind(self, amount=1):
    """Rewind the token iterator."""
    self._i -= amount

  def whitespace(self, max_lines=None, comment=False):
    """Parses whitespace from the current _loc to the next non-whitespace.

    Arguments:
      max_lines: (optional int) Maximum number of lines to consider as part of
        the whitespace. Valid values are None, 0 and 1.
      comment: (boolean) If True, look for a trailing comment even when not in
        a parenthesized scope.

    Pre-condition:
      `_loc' represents the point before which everything has been parsed and
      after which nothing has been parsed.
    Post-condition:
      `_loc' is exactly at the character that was parsed to.
    """
    next_token = self.peek()
    if not comment and next_token and next_token.type == TOKENS.COMMENT:
      return ''
    def predicate(token):
      return (token.type in (TOKENS.INDENT, TOKENS.DEDENT) or
              token.type == TOKENS.COMMENT and (comment or self._hints) or
              token.type == TOKENS.ERRORTOKEN and token.src == ' ' or
              max_lines is None and token.type in (TOKENS.NL, TOKENS.NEWLINE))
    whitespace = list(self.takewhile(predicate, advance=False))
    next_token = self.peek()

    result = ''
    for tok in itertools.chain(whitespace,
                               ((next_token,) if next_token else ())):
      result += self._space_between(self._loc, tok.start)
      if tok != next_token:
        result += tok.src
        self._loc = tok.end
      else:
        self._loc = tok.start

    # Eat a single newline character
    if ((max_lines is None or max_lines > 0) and
        next_token and next_token.type in (TOKENS.NL, TOKENS.NEWLINE)):
      result += self.next().src

    return result
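
  # Illustrative sketch of the contract above: for the source "x = 1  # note\n",
  # once the NUMBER token '1' has been consumed, whitespace(comment=True) would
  # return "  # note\n" (the trailing spaces, the comment and the newline),
  # while whitespace() without comment=True stops before the comment and
  # returns ''. This is an informal example, not a doctest.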

  def block_whitespace(self, indent_level):
    """Parses whitespace from the current _loc to the end of the block."""
    # Get the normal suffix lines, but don't advance the token index unless
    # there is no indentation to account for
    start_i = self._i
    full_whitespace = self.whitespace(comment=True)
    if not indent_level:
      return full_whitespace
    self._i = start_i

    # Trim the full whitespace into only lines that match the indentation level
    lines = full_whitespace.splitlines(True)
    try:
      last_line_idx = next(i for i, line in reversed(list(enumerate(lines)))
                           if line.startswith(indent_level + '#'))
    except StopIteration:
      # No comment lines at the end of this block
      self._loc = self._tokens[self._i].end
      return ''
    lines = lines[:last_line_idx + 1]

    # Advance the current location to the last token in the lines we've read
    end_line = self._tokens[self._i].end[0] + 1 + len(lines)
    list(self.takewhile(lambda tok: tok.start[0] < end_line))
    self._loc = self._tokens[self._i].end
    return ''.join(lines)

  def dots(self, num_dots):
    """Parse a number of dots.

    This is to work around an oddity in python3's tokenizer, which treats three
    `.` tokens next to each other in a FromImport's level as an ellipsis. This
    parses until the expected number of dots has been seen.
    """
    result = ''
    dots_seen = 0
    prev_loc = self._loc
    while dots_seen < num_dots:
      tok = self.next()
      assert tok.src in ('.', '...')
      result += self._space_between(prev_loc, tok.start) + tok.src
      dots_seen += tok.src.count('.')
      prev_loc = self._loc
    return result
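
  # Illustrative note: for a relative import such as "from ... import x", the
  # tokenizer emits a single '...' token rather than three '.' tokens, so a
  # caller handling an ImportFrom node with level=3 can call dots(3) and the
  # one '...' token accounts for all three expected dots.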

  def open_scope(self, node, single_paren=False):
    """Open a parenthesized scope on the given node."""
    result = ''
    parens = []
    start_i = self._i
    start_loc = prev_loc = self._loc

    # Eat whitespace or '(' tokens one at a time
    for tok in self.takewhile(
        lambda t: t.type in FORMATTING_TOKENS or t.src == '('):
      # Store all the code up to and including this token
      result += self._space_between(prev_loc, tok.start)

      if tok.src == '(' and single_paren and parens:
        self.rewind()
        self._loc = tok.start
        break

      result += tok.src
      if tok.src == '(':
        # Start a new scope
        parens.append(result)
        result = ''
        start_i = self._i
        start_loc = self._loc
      prev_loc = self._loc

    if parens:
      # Add any additional whitespace on to the last open-paren
      next_tok = self.peek()
      parens[-1] += result + self._space_between(self._loc, next_tok.start)
      self._loc = next_tok.start
      # Add each paren onto the stack
      for paren in parens:
        self._parens.append(paren)
        self._scope_stack.append(_scope_helper(node))
    else:
      # No parens were encountered; reset as if this method did nothing
      self._i = start_i
      self._loc = start_loc

  def close_scope(self, node, prefix_attr='prefix', suffix_attr='suffix',
                  trailing_comma=False, single_paren=False):
    """Close a parenthesized scope on the given node, if one is open."""
    # Ensures the prefix + suffix are not None
    if fmt.get(node, prefix_attr) is None:
      fmt.set(node, prefix_attr, '')
    if fmt.get(node, suffix_attr) is None:
      fmt.set(node, suffix_attr, '')

    if not self._parens or node not in self._scope_stack[-1]:
      return
    symbols = {')'}
    if trailing_comma:
      symbols.add(',')
    parsed_to_i = self._i
    parsed_to_loc = prev_loc = self._loc
    encountered_paren = False
    result = ''

    for tok in self.takewhile(
        lambda t: t.type in FORMATTING_TOKENS or t.src in symbols):
      # Consume all space up to this token
      result += self._space_between(prev_loc, tok.start)
      if tok.src == ')' and single_paren and encountered_paren:
        self.rewind()
        parsed_to_i = self._i
        parsed_to_loc = tok.start
        fmt.append(node, suffix_attr, result)
        break

      # Consume the token itself
      result += tok.src

      if tok.src == ')':
        # Close out the open scope
        encountered_paren = True
        self._scope_stack.pop()
        fmt.prepend(node, prefix_attr, self._parens.pop())
        fmt.append(node, suffix_attr, result)
        result = ''
        parsed_to_i = self._i
        parsed_to_loc = tok.end
        if not self._parens or node not in self._scope_stack[-1]:
          break
      prev_loc = tok.end

    # Reset back to the last place where we parsed anything
    self._i = parsed_to_i
    self._loc = parsed_to_loc

  def hint_open(self):
    """Indicates opening a group of parentheses or brackets."""
    self._hints += 1

  def hint_closed(self):
    """Indicates closing a group of parentheses or brackets."""
    self._hints -= 1
    if self._hints < 0:
      raise ValueError('Hint value negative')

  @contextlib.contextmanager
  def scope(self, node, attr=None, trailing_comma=False):
    """Context manager to handle a parenthesized scope."""
    self.open_scope(node, single_paren=(attr is not None))
    yield
    if attr:
      self.close_scope(node, prefix_attr=attr + '_prefix',
                       suffix_attr=attr + '_suffix',
                       trailing_comma=trailing_comma,
                       single_paren=True)
    else:
      self.close_scope(node, trailing_comma=trailing_comma)
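
  # Illustrative sketch of intended use: when annotating a node whose source
  # may be parenthesized, e.g. "(a + b)", a caller can wrap the visit in
  # `with token_generator.scope(node): ...` so that open_scope captures the
  # leading "(" into the node's prefix and close_scope captures the trailing
  # ")" into its suffix, letting the parentheses round-trip when the tree is
  # re-printed. (Exactly how pasta's annotator drives this lives elsewhere;
  # this only summarizes the intent of the context manager above.)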

  def is_in_scope(self):
    """Return True iff there is a scope open."""
    return self._parens or self._hints

  def str(self):
    """Parse a full string literal from the input."""
    def predicate(token):
      return (token.type in (TOKENS.STRING, TOKENS.COMMENT) or
              self.is_in_scope() and token.type in (TOKENS.NL, TOKENS.NEWLINE))
    return self.eat_tokens(predicate)

  def eat_tokens(self, predicate):
    """Parse input from tokens while a given condition is met."""
    content = ''
    prev_loc = self._loc
    tok = None
    for tok in self.takewhile(predicate, advance=False):
      content += self._space_between(prev_loc, tok.start)
      content += tok.src
      prev_loc = tok.end

    if tok:
      self._loc = tok.end
    return content

  def fstr(self):
    """Parses an fstring, including subexpressions.

    Returns:
      A generator function which, when called, reads a chunk of the fstring up
      to the next subexpression and yields that chunk, plus a new token
      generator to use to parse the subexpression. The subexpressions in the
      original fstring data are replaced by placeholders to make it possible to
      fill them in with new values, if desired.
    """
    def fstr_parser():
      # Reads the whole fstring as a string, then parses it char by char
      if self.peek_non_whitespace().type == TOKENS.STRING:
        # Normal fstrings are one or more STRING tokens, maybe mixed with
        # spaces, e.g.: f"Hello, {name}"
        str_content = self.str()
      else:
        # Format specifiers in fstrings are also JoinedStr nodes, but these are
        # arbitrary expressions, e.g. in: f"{value:{width}.{precision}}", the
        # format specifier is an fstring: "{width}.{precision}" but these are
        # not STRING tokens.
        def fstr_eater(tok):
          if tok.type == TOKENS.OP and tok.src == '}':
            if fstr_eater.level <= 0:
              return False
            fstr_eater.level -= 1
          if tok.type == TOKENS.OP and tok.src == '{':
            fstr_eater.level += 1
          return True
        fstr_eater.level = 0
        str_content = self.eat_tokens(fstr_eater)

      indexed_chars = enumerate(str_content)
      val_idx = 0
      i = -1
      result = ''
      in_fstring = False
      string_quote = None
      while i < len(str_content) - 1:
        i, c = next(indexed_chars)
        result += c

        # If we haven't started parsing string content yet, check if a string
        # (with or without an fstring prefix) has started
        if string_quote is None:
          if str_content[i:i+4] in ('f"""', "f'''"):
            string_quote = str_content[i+1:i+4]
            in_fstring = True
          elif str_content[i:i+3] in ('"""', "'''"):
            string_quote = str_content[i:i+3]
            in_fstring = False
          elif str_content[i:i+2] in ('f"', "f'"):
            string_quote = str_content[i+1]
            in_fstring = True
          elif c in ('"', "'"):
            string_quote = c
            in_fstring = False
          if string_quote:
            # Skip uneaten quote characters
            for _ in range(len(string_quote) + (1 if in_fstring else 0) - 1):
              i, c = next(indexed_chars)
              result += c
            continue

        # If we are still not parsing characters in a string, no extra
        # processing is needed
        if string_quote is None:
          continue

        # If we ARE in a string, check if the next characters are the
        # close-quote for that string
        if (str_content[i:i+len(string_quote)] == string_quote and
            str_content[i-1] != '\\'):
          # Skip uneaten quote characters
          for _ in range(len(string_quote) - 1):
            i, c = next(indexed_chars)
            result += c
          string_quote = None
          in_fstring = False
          continue

        # If we are NOT in an fstring, skip all FormattedValue processing.
        if not in_fstring:
          continue

        # When an open bracket is encountered, start parsing a subexpression
        if c == '{':
          # First check if this is part of an escape sequence
          # (f"{{" is used to escape a bracket literal)
          nexti, nextc = next(indexed_chars)
          if nextc == '{':
            result += c
            continue
          indexed_chars = itertools.chain([(nexti, nextc)], indexed_chars)

          # Add a placeholder onto the result
          result += fstring_utils.placeholder(val_idx) + '}'
          val_idx += 1

          # Yield a new token generator to parse the subexpression only
          tg = TokenGenerator(str_content[i+1:], ignore_error_token=True)
          yield (result, tg)
          result = ''

          # Skip the number of characters consumed by the subexpression
          for tg_i in range(tg.chars_consumed()):
            i, c = next(indexed_chars)

          # Eat up to and including the close bracket
          i, c = next(indexed_chars)
          while c != '}':
            i, c = next(indexed_chars)
      # Yield the rest of the fstring, when done
      yield (result, None)
    return fstr_parser
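
  # Illustrative sketch of consuming fstr(): for a source like f"a{x}b", the
  # generator function it returns could be driven roughly like this, yielding
  # (chunk, subexpression_token_generator) pairs and finally (tail, None):
  #
  #   for chunk, sub_tg in tg.fstr()():
  #     if sub_tg is not None:
  #       ...  # parse the subexpression (here `x`) using sub_tg
  #
  # Each chunk contains a placeholder (from pasta.base.fstring_utils) in place
  # of the subexpression so new values can be substituted back in later.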

  def _space_between(self, start_loc, end_loc):
    """Parse the space between a location and the next token."""
    if start_loc > end_loc:
      raise ValueError('start_loc > end_loc', start_loc, end_loc)
    if start_loc[0] > len(self.lines):
      return ''

    prev_row, prev_col = start_loc
    end_row, end_col = end_loc
    if prev_row == end_row:
      return self.lines[prev_row - 1][prev_col:end_col]

    return ''.join(itertools.chain(
        (self.lines[prev_row - 1][prev_col:],),
        self.lines[prev_row:end_row - 1],
        (self.lines[end_row - 1][:end_col],) if end_col > 0 else '',
    ))
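
  # Illustrative example: with self.lines == ['a = (1 +\n', '     2)\n'],
  # _space_between((1, 8), (2, 5)) would return '\n     ', i.e. the tail of the
  # first line from column 8 plus the start of the second line up to column 5.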

  def next_name(self):
    """Parse the next name token."""
    last_i = self._i
    def predicate(token):
      return token.type != TOKENS.NAME
    unused_tokens = list(self.takewhile(predicate, advance=False))
    result = self.next(advance=False)
    self._i = last_i
    return result

  def next_of_type(self, token_type):
    """Parse a token of the given type and return it."""
    token = self.next()
    if token.type != token_type:
      raise ValueError("Expected %r but found %r\nline %d: %s" % (
          tokenize.tok_name[token_type], token.src, token.start[0],
          self.lines[token.start[0] - 1]))
    return token

  def takewhile(self, condition, advance=True):
    """Parse tokens as long as a condition holds on the next token."""
    prev_loc = self._loc
    token = self.next(advance=advance)
    while token is not None and condition(token):
      yield token
      prev_loc = self._loc
      token = self.next(advance=advance)
    self.rewind()
    self._loc = prev_loc


def _scope_helper(node):
  """Get the closure of nodes that could begin a scope at this point.

  For instance, when encountering a `(` when parsing a BinOp node, this could
  indicate that the BinOp itself is parenthesized OR that the BinOp's left node
  could be parenthesized.

  E.g.: (a + b * c) or (a + b) * c or (a) + b * c
        ^              ^              ^

  Arguments:
    node: (ast.AST) Node encountered when opening a scope.

  Returns:
    A closure of nodes that the scope might apply to.
  """
  if isinstance(node, ast.Attribute):
    return (node,) + _scope_helper(node.value)
  if isinstance(node, ast.Subscript):
    return (node,) + _scope_helper(node.value)
  if isinstance(node, ast.Assign):
    return (node,) + _scope_helper(node.targets[0])
  if isinstance(node, ast.AugAssign):
    return (node,) + _scope_helper(node.target)
  if isinstance(node, ast.Expr):
    return (node,) + _scope_helper(node.value)
  if isinstance(node, ast.Compare):
    return (node,) + _scope_helper(node.left)
  if isinstance(node, ast.BoolOp):
    return (node,) + _scope_helper(node.values[0])
  if isinstance(node, ast.BinOp):
    return (node,) + _scope_helper(node.left)
  if isinstance(node, ast.Tuple) and node.elts:
    return (node,) + _scope_helper(node.elts[0])
  if isinstance(node, ast.Call):
    return (node,) + _scope_helper(node.func)
  if isinstance(node, ast.GeneratorExp):
    return (node,) + _scope_helper(node.elt)
  if isinstance(node, ast.IfExp):
    return (node,) + _scope_helper(node.body)
  return (node,)
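
# Illustrative example: for the module "a + b * c", the statement node is an
# ast.Expr wrapping an ast.BinOp whose left operand is the Name `a`, so
# _scope_helper on that Expr returns (Expr, BinOp, Name); an open "(" seen at
# that point could belong to any of those three nodes.
#
#   expr = ast.parse('a + b * c').body[0]
#   closure = _scope_helper(expr)  # (Expr, BinOp, Name)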


def _generate_tokens(source, ignore_error_token=False):
  token_generator = tokenize.generate_tokens(StringIO(source).readline)
  try:
    for tok in token_generator:
      yield Token(*tok)
  except tokenize.TokenError:
    if not ignore_error_token:
      raise
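

# A minimal, illustrative demo (an assumption about usage, not part of pasta's
# public API): running this file directly prints the token stream for a small
# snippet, which can help when inspecting the parsing paths above.
if __name__ == '__main__':
  for _tok in _generate_tokens('x = (1 + 2)  # demo\n'):
    print(tokenize.tok_name[_tok.type], repr(_tok.src), _tok.start, _tok.end)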