Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/black/ranges.py: 15%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Functions related to Black's formatting by line ranges feature."""
3import difflib
4from collections.abc import Collection, Iterator, Sequence
5from dataclasses import dataclass
6from typing import Union
8from black.nodes import (
9 LN,
10 STANDALONE_COMMENT,
11 Leaf,
12 Node,
13 Visitor,
14 first_leaf,
15 furthest_ancestor_with_last_leaf,
16 last_leaf,
17 syms,
18)
19from blib2to3.pgen2.token import ASYNC, NEWLINE
22def parse_line_ranges(line_ranges: Sequence[str]) -> list[tuple[int, int]]:
23 lines: list[tuple[int, int]] = []
24 for lines_str in line_ranges:
25 parts = lines_str.split("-")
26 if len(parts) != 2:
27 raise ValueError(
28 "Incorrect --line-ranges format, expect 'START-END', found"
29 f" {lines_str!r}"
30 )
31 try:
32 start = int(parts[0])
33 end = int(parts[1])
34 except ValueError:
35 raise ValueError(
36 "Incorrect --line-ranges value, expect integer ranges, found"
37 f" {lines_str!r}"
38 ) from None
39 else:
40 lines.append((start, end))
41 return lines
44def is_valid_line_range(lines: tuple[int, int]) -> bool:
45 """Returns whether the line range is valid."""
46 return not lines or lines[0] <= lines[1]
49def sanitized_lines(
50 lines: Collection[tuple[int, int]], src_contents: str
51) -> Collection[tuple[int, int]]:
52 """Returns the valid line ranges for the given source.
54 This removes ranges that are entirely outside the valid lines.
56 Other ranges are normalized so that the start values are at least 1 and the
57 end values are at most the (1-based) index of the last source line.
58 """
59 if not src_contents:
60 return []
61 good_lines = []
62 src_line_count = src_contents.count("\n")
63 if not src_contents.endswith("\n"):
64 src_line_count += 1
65 for start, end in lines:
66 if start > src_line_count:
67 continue
68 # line-ranges are 1-based
69 start = max(start, 1)
70 if end < start:
71 continue
72 end = min(end, src_line_count)
73 good_lines.append((start, end))
74 return good_lines
77def adjusted_lines(
78 lines: Collection[tuple[int, int]],
79 original_source: str,
80 modified_source: str,
81) -> list[tuple[int, int]]:
82 """Returns the adjusted line ranges based on edits from the original code.
84 This computes the new line ranges by diffing original_source and
85 modified_source, and adjust each range based on how the range overlaps with
86 the diffs.
88 Note the diff can contain lines outside of the original line ranges. This can
89 happen when the formatting has to be done in adjacent to maintain consistent
90 local results. For example:
92 1. def my_func(arg1, arg2,
93 2. arg3,):
94 3. pass
96 If it restricts to line 2-2, it can't simply reformat line 2, it also has
97 to reformat line 1:
99 1. def my_func(
100 2. arg1,
101 3. arg2,
102 4. arg3,
103 5. ):
104 6. pass
106 In this case, we will expand the line ranges to also include the whole diff
107 block.
109 Args:
110 lines: a collection of line ranges.
111 original_source: the original source.
112 modified_source: the modified source.
113 """
114 lines_mappings = _calculate_lines_mappings(original_source, modified_source)
116 new_lines = []
117 # Keep an index of the current search. Since the lines and lines_mappings are
118 # sorted, this makes the search complexity linear.
119 current_mapping_index = 0
120 for start, end in sorted(lines):
121 start_mapping_index = _find_lines_mapping_index(
122 start,
123 lines_mappings,
124 current_mapping_index,
125 )
126 end_mapping_index = _find_lines_mapping_index(
127 end,
128 lines_mappings,
129 start_mapping_index,
130 )
131 current_mapping_index = start_mapping_index
132 if start_mapping_index >= len(lines_mappings) or end_mapping_index >= len(
133 lines_mappings
134 ):
135 # Protect against invalid inputs.
136 continue
137 start_mapping = lines_mappings[start_mapping_index]
138 end_mapping = lines_mappings[end_mapping_index]
139 if start_mapping.is_changed_block:
140 # When the line falls into a changed block, expands to the whole block.
141 new_start = start_mapping.modified_start
142 else:
143 new_start = (
144 start - start_mapping.original_start + start_mapping.modified_start
145 )
146 if end_mapping.is_changed_block:
147 # When the line falls into a changed block, expands to the whole block.
148 new_end = end_mapping.modified_end
149 else:
150 new_end = end - end_mapping.original_start + end_mapping.modified_start
151 new_range = (new_start, new_end)
152 if is_valid_line_range(new_range):
153 new_lines.append(new_range)
154 return new_lines
157def convert_unchanged_lines(src_node: Node, lines: Collection[tuple[int, int]]) -> None:
158 r"""Converts unchanged lines to STANDALONE_COMMENT.
160 The idea is similar to how `# fmt: on/off` is implemented. It also converts the
161 nodes between those markers as a single `STANDALONE_COMMENT` leaf node with
162 the unformatted code as its value. `STANDALONE_COMMENT` is a "fake" token
163 that will be formatted as-is with its prefix normalized.
165 Here we perform two passes:
167 1. Visit the top-level statements, and convert them to a single
168 `STANDALONE_COMMENT` when unchanged. This speeds up formatting when some
169 of the top-level statements aren't changed.
170 2. Convert unchanged "unwrapped lines" to `STANDALONE_COMMENT` nodes line by
171 line. "unwrapped lines" are divided by the `NEWLINE` token. e.g. a
172 multi-line statement is *one* "unwrapped line" that ends with `NEWLINE`,
173 even though this statement itself can span multiple lines, and the
174 tokenizer only sees the last '\n' as the `NEWLINE` token.
176 NOTE: During pass (2), comment prefixes and indentations are ALWAYS
177 normalized even when the lines aren't changed. This is fixable by moving
178 more formatting to pass (1). However, it's hard to get it correct when
179 incorrect indentations are used. So we defer this to future optimizations.
180 """
181 lines_set: set[int] = set()
182 for start, end in lines:
183 lines_set.update(range(start, end + 1))
184 visitor = _TopLevelStatementsVisitor(lines_set)
185 _ = list(visitor.visit(src_node)) # Consume all results.
186 _convert_unchanged_line_by_line(src_node, lines_set)
189def _contains_standalone_comment(node: LN) -> bool:
190 if isinstance(node, Leaf):
191 return node.type == STANDALONE_COMMENT
192 else:
193 for child in node.children:
194 if _contains_standalone_comment(child):
195 return True
196 return False
199class _TopLevelStatementsVisitor(Visitor[None]):
200 """
201 A node visitor that converts unchanged top-level statements to
202 STANDALONE_COMMENT.
204 This is used in addition to _convert_unchanged_line_by_line, to
205 speed up formatting when there are unchanged top-level
206 classes/functions/statements.
207 """
209 def __init__(self, lines_set: set[int]):
210 self._lines_set = lines_set
212 def visit_simple_stmt(self, node: Node) -> Iterator[None]:
213 # This is only called for top-level statements, since `visit_suite`
214 # won't visit its children nodes.
215 yield from []
216 newline_leaf = last_leaf(node)
217 if not newline_leaf:
218 return
219 assert (
220 newline_leaf.type == NEWLINE
221 ), f"Unexpectedly found leaf.type={newline_leaf.type}"
222 # We need to find the furthest ancestor with the NEWLINE as the last
223 # leaf, since a `suite` can simply be a `simple_stmt` when it puts
224 # its body on the same line. Example: `if cond: pass`.
225 ancestor = furthest_ancestor_with_last_leaf(newline_leaf)
226 if not _get_line_range(ancestor).intersection(self._lines_set):
227 _convert_node_to_standalone_comment(ancestor)
229 def visit_suite(self, node: Node) -> Iterator[None]:
230 yield from []
231 # If there is a STANDALONE_COMMENT node, it means parts of the node tree
232 # have fmt on/off/skip markers. Those STANDALONE_COMMENT nodes can't
233 # be simply converted by calling str(node). So we just don't convert
234 # here.
235 if _contains_standalone_comment(node):
236 return
237 # Find the semantic parent of this suite. For `async_stmt` and
238 # `async_funcdef`, the ASYNC token is defined on a separate level by the
239 # grammar.
240 semantic_parent = node.parent
241 if semantic_parent is not None:
242 if (
243 semantic_parent.prev_sibling is not None
244 and semantic_parent.prev_sibling.type == ASYNC
245 ):
246 semantic_parent = semantic_parent.parent
247 if semantic_parent is not None and not _get_line_range(
248 semantic_parent
249 ).intersection(self._lines_set):
250 _convert_node_to_standalone_comment(semantic_parent)
253def _convert_unchanged_line_by_line(node: Node, lines_set: set[int]) -> None:
254 """Converts unchanged to STANDALONE_COMMENT line by line."""
255 for leaf in node.leaves():
256 if leaf.type != NEWLINE:
257 # We only consider "unwrapped lines", which are divided by the NEWLINE
258 # token.
259 continue
260 if leaf.parent and leaf.parent.type == syms.match_stmt:
261 # The `suite` node is defined as:
262 # match_stmt: "match" subject_expr ':' NEWLINE INDENT case_block+ DEDENT
263 # Here we need to check `subject_expr`. The `case_block+` will be
264 # checked by their own NEWLINEs.
265 nodes_to_ignore: list[LN] = []
266 prev_sibling = leaf.prev_sibling
267 while prev_sibling:
268 nodes_to_ignore.insert(0, prev_sibling)
269 prev_sibling = prev_sibling.prev_sibling
270 if not _get_line_range(nodes_to_ignore).intersection(lines_set):
271 _convert_nodes_to_standalone_comment(nodes_to_ignore, newline=leaf)
272 elif leaf.parent and leaf.parent.type == syms.suite:
273 # The `suite` node is defined as:
274 # suite: simple_stmt | NEWLINE INDENT stmt+ DEDENT
275 # We will check `simple_stmt` and `stmt+` separately against the lines set
276 parent_sibling = leaf.parent.prev_sibling
277 nodes_to_ignore = []
278 while parent_sibling and parent_sibling.type != syms.suite:
279 # NOTE: Multiple suite nodes can exist as siblings in e.g. `if_stmt`.
280 nodes_to_ignore.insert(0, parent_sibling)
281 parent_sibling = parent_sibling.prev_sibling
282 # Special case for `async_stmt` and `async_funcdef` where the ASYNC
283 # token is on the grandparent node.
284 grandparent = leaf.parent.parent
285 if (
286 grandparent is not None
287 and grandparent.prev_sibling is not None
288 and grandparent.prev_sibling.type == ASYNC
289 ):
290 nodes_to_ignore.insert(0, grandparent.prev_sibling)
291 if not _get_line_range(nodes_to_ignore).intersection(lines_set):
292 _convert_nodes_to_standalone_comment(nodes_to_ignore, newline=leaf)
293 else:
294 ancestor = furthest_ancestor_with_last_leaf(leaf)
295 # Consider multiple decorators as a whole block, as their
296 # newlines have different behaviors than the rest of the grammar.
297 if (
298 ancestor.type == syms.decorator
299 and ancestor.parent
300 and ancestor.parent.type == syms.decorators
301 ):
302 ancestor = ancestor.parent
303 if not _get_line_range(ancestor).intersection(lines_set):
304 _convert_node_to_standalone_comment(ancestor)
307def _convert_node_to_standalone_comment(node: LN) -> None:
308 """Convert node to STANDALONE_COMMENT by modifying the tree inline."""
309 parent = node.parent
310 if not parent:
311 return
312 first = first_leaf(node)
313 last = last_leaf(node)
314 if not first or not last:
315 return
316 if first is last:
317 # This can happen on the following edge cases:
318 # 1. A block of `# fmt: off/on` code except the `# fmt: on` is placed
319 # on the end of the last line instead of on a new line.
320 # 2. A single backslash on its own line followed by a comment line.
321 # Ideally we don't want to format them when not requested, but fixing
322 # isn't easy. These cases are also badly formatted code, so it isn't
323 # too bad we reformat them.
324 return
325 # The prefix contains comments and indentation whitespaces. They are
326 # reformatted accordingly to the correct indentation level.
327 # This also means the indentation will be changed on the unchanged lines, and
328 # this is actually required to not break incremental reformatting.
329 prefix = first.prefix
330 first.prefix = ""
331 index = node.remove()
332 if index is not None:
333 # Because of the special handling of multiple decorators, if the decorated
334 # item is a single line then there will be a missing newline between the
335 # decorator and item, so add it back. This doesn't affect any other case
336 # since a decorated item with a newline would hit the earlier suite case
337 # in _convert_unchanged_line_by_line that correctly handles the newlines.
338 if node.type == syms.decorated:
339 # A leaf of type decorated wouldn't make sense, since it should always
340 # have at least the decorator + the decorated item, so if this assert
341 # hits that means there's a problem in the parser.
342 assert isinstance(node, Node)
343 # 1 will always be the correct index since before this function is
344 # called all the decorators are collapsed into a single leaf
345 node.insert_child(1, Leaf(NEWLINE, "\n"))
346 # Remove the '\n', as STANDALONE_COMMENT will have '\n' appended when
347 # generating the formatted code.
348 value = str(node)[:-1]
349 parent.insert_child(
350 index,
351 Leaf(
352 STANDALONE_COMMENT,
353 value,
354 prefix=prefix,
355 fmt_pass_converted_first_leaf=first,
356 ),
357 )
360def _convert_nodes_to_standalone_comment(nodes: Sequence[LN], *, newline: Leaf) -> None:
361 """Convert nodes to STANDALONE_COMMENT by modifying the tree inline."""
362 if not nodes:
363 return
364 parent = nodes[0].parent
365 first = first_leaf(nodes[0])
366 if not parent or not first:
367 return
368 prefix = first.prefix
369 first.prefix = ""
370 value = "".join(str(node) for node in nodes)
371 # The prefix comment on the NEWLINE leaf is the trailing comment of the statement.
372 if newline.prefix:
373 value += newline.prefix
374 newline.prefix = ""
375 index = nodes[0].remove()
376 for node in nodes[1:]:
377 node.remove()
378 if index is not None:
379 parent.insert_child(
380 index,
381 Leaf(
382 STANDALONE_COMMENT,
383 value,
384 prefix=prefix,
385 fmt_pass_converted_first_leaf=first,
386 ),
387 )
390def _leaf_line_end(leaf: Leaf) -> int:
391 """Returns the line number of the leaf node's last line."""
392 if leaf.type == NEWLINE:
393 return leaf.lineno
394 else:
395 # Leaf nodes like multiline strings can occupy multiple lines.
396 return leaf.lineno + str(leaf).count("\n")
399def _get_line_range(node_or_nodes: Union[LN, list[LN]]) -> set[int]:
400 """Returns the line range of this node or list of nodes."""
401 if isinstance(node_or_nodes, list):
402 nodes = node_or_nodes
403 if not nodes:
404 return set()
405 first = first_leaf(nodes[0])
406 last = last_leaf(nodes[-1])
407 if first and last:
408 line_start = first.lineno
409 line_end = _leaf_line_end(last)
410 return set(range(line_start, line_end + 1))
411 else:
412 return set()
413 else:
414 node = node_or_nodes
415 if isinstance(node, Leaf):
416 return set(range(node.lineno, _leaf_line_end(node) + 1))
417 else:
418 first = first_leaf(node)
419 last = last_leaf(node)
420 if first and last:
421 return set(range(first.lineno, _leaf_line_end(last) + 1))
422 else:
423 return set()
426@dataclass
427class _LinesMapping:
428 """1-based lines mapping from original source to modified source.
430 Lines [original_start, original_end] from original source
431 are mapped to [modified_start, modified_end].
433 The ranges are inclusive on both ends.
434 """
436 original_start: int
437 original_end: int
438 modified_start: int
439 modified_end: int
440 # Whether this range corresponds to a changed block, or an unchanged block.
441 is_changed_block: bool
444def _calculate_lines_mappings(
445 original_source: str,
446 modified_source: str,
447) -> Sequence[_LinesMapping]:
448 """Returns a sequence of _LinesMapping by diffing the sources.
450 For example, given the following diff:
451 import re
452 - def func(arg1,
453 - arg2, arg3):
454 + def func(arg1, arg2, arg3):
455 pass
456 It returns the following mappings:
457 original -> modified
458 (1, 1) -> (1, 1), is_changed_block=False (the "import re" line)
459 (2, 3) -> (2, 2), is_changed_block=True (the diff)
460 (4, 4) -> (3, 3), is_changed_block=False (the "pass" line)
462 You can think of this visually as if it brings up a side-by-side diff, and tries
463 to map the line ranges from the left side to the right side:
465 (1, 1)->(1, 1) 1. import re 1. import re
466 (2, 3)->(2, 2) 2. def func(arg1, 2. def func(arg1, arg2, arg3):
467 3. arg2, arg3):
468 (4, 4)->(3, 3) 4. pass 3. pass
470 Args:
471 original_source: the original source.
472 modified_source: the modified source.
473 """
474 matcher = difflib.SequenceMatcher(
475 None,
476 original_source.splitlines(keepends=True),
477 modified_source.splitlines(keepends=True),
478 )
479 matching_blocks = matcher.get_matching_blocks()
480 lines_mappings: list[_LinesMapping] = []
481 # matching_blocks is a sequence of "same block of code ranges", see
482 # https://docs.python.org/3/library/difflib.html#difflib.SequenceMatcher.get_matching_blocks
483 # Each block corresponds to a _LinesMapping with is_changed_block=False,
484 # and the ranges between two blocks corresponds to a _LinesMapping with
485 # is_changed_block=True,
486 # NOTE: matching_blocks is 0-based, but _LinesMapping is 1-based.
487 for i, block in enumerate(matching_blocks):
488 if i == 0:
489 if block.a != 0 or block.b != 0:
490 lines_mappings.append(
491 _LinesMapping(
492 original_start=1,
493 original_end=block.a,
494 modified_start=1,
495 modified_end=block.b,
496 is_changed_block=False,
497 )
498 )
499 else:
500 previous_block = matching_blocks[i - 1]
501 lines_mappings.append(
502 _LinesMapping(
503 original_start=previous_block.a + previous_block.size + 1,
504 original_end=block.a,
505 modified_start=previous_block.b + previous_block.size + 1,
506 modified_end=block.b,
507 is_changed_block=True,
508 )
509 )
510 if i < len(matching_blocks) - 1:
511 lines_mappings.append(
512 _LinesMapping(
513 original_start=block.a + 1,
514 original_end=block.a + block.size,
515 modified_start=block.b + 1,
516 modified_end=block.b + block.size,
517 is_changed_block=False,
518 )
519 )
520 return lines_mappings
523def _find_lines_mapping_index(
524 original_line: int,
525 lines_mappings: Sequence[_LinesMapping],
526 start_index: int,
527) -> int:
528 """Returns the original index of the lines mappings for the original line."""
529 index = start_index
530 while index < len(lines_mappings):
531 mapping = lines_mappings[index]
532 if mapping.original_start <= original_line <= mapping.original_end:
533 return index
534 index += 1
535 return index