Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/black/ranges.py: 15%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Functions related to Black's formatting by line ranges feature."""
3import difflib
4from collections.abc import Collection, Iterator, Sequence
5from dataclasses import dataclass
6from typing import Union
8from black.nodes import (
9 LN,
10 STANDALONE_COMMENT,
11 Leaf,
12 Node,
13 Visitor,
14 first_leaf,
15 furthest_ancestor_with_last_leaf,
16 last_leaf,
17 syms,
18)
19from blib2to3.pgen2.token import ASYNC, NEWLINE
22def parse_line_ranges(line_ranges: Sequence[str]) -> list[tuple[int, int]]:
23 lines: list[tuple[int, int]] = []
24 for lines_str in line_ranges:
25 parts = lines_str.split("-")
26 if len(parts) != 2:
27 raise ValueError(
28 "Incorrect --line-ranges format, expect 'START-END', found"
29 f" {lines_str!r}"
30 )
31 try:
32 start = int(parts[0])
33 end = int(parts[1])
34 except ValueError:
35 raise ValueError(
36 "Incorrect --line-ranges value, expect integer ranges, found"
37 f" {lines_str!r}"
38 ) from None
39 else:
40 lines.append((start, end))
41 return lines
44def is_valid_line_range(lines: tuple[int, int]) -> bool:
45 """Returns whether the line range is valid."""
46 return not lines or lines[0] <= lines[1]
49def sanitized_lines(
50 lines: Collection[tuple[int, int]], src_contents: str
51) -> Collection[tuple[int, int]]:
52 """Returns the valid line ranges for the given source.
54 This removes ranges that are entirely outside the valid lines.
56 Other ranges are normalized so that the start values are at least 1 and the
57 end values are at most the (1-based) index of the last source line.
58 """
59 if not src_contents:
60 return []
61 good_lines = []
62 src_line_count = src_contents.count("\n")
63 if not src_contents.endswith("\n"):
64 src_line_count += 1
65 for start, end in lines:
66 if start > src_line_count:
67 continue
68 # line-ranges are 1-based
69 start = max(start, 1)
70 if end < start:
71 continue
72 end = min(end, src_line_count)
73 good_lines.append((start, end))
74 return good_lines
77def adjusted_lines(
78 lines: Collection[tuple[int, int]],
79 original_source: str,
80 modified_source: str,
81) -> list[tuple[int, int]]:
82 """Returns the adjusted line ranges based on edits from the original code.
84 This computes the new line ranges by diffing original_source and
85 modified_source, and adjust each range based on how the range overlaps with
86 the diffs.
88 Note the diff can contain lines outside of the original line ranges. This can
89 happen when the formatting has to be done in adjacent to maintain consistent
90 local results. For example:
92 1. def my_func(arg1, arg2,
93 2. arg3,):
94 3. pass
96 If it restricts to line 2-2, it can't simply reformat line 2, it also has
97 to reformat line 1:
99 1. def my_func(
100 2. arg1,
101 3. arg2,
102 4. arg3,
103 5. ):
104 6. pass
106 In this case, we will expand the line ranges to also include the whole diff
107 block.
109 Args:
110 lines: a collection of line ranges.
111 original_source: the original source.
112 modified_source: the modified source.
113 """
114 lines_mappings = _calculate_lines_mappings(original_source, modified_source)
116 new_lines = []
117 # Keep an index of the current search. Since the lines and lines_mappings are
118 # sorted, this makes the search complexity linear.
119 current_mapping_index = 0
120 for start, end in sorted(lines):
121 start_mapping_index = _find_lines_mapping_index(
122 start,
123 lines_mappings,
124 current_mapping_index,
125 )
126 end_mapping_index = _find_lines_mapping_index(
127 end,
128 lines_mappings,
129 start_mapping_index,
130 )
131 current_mapping_index = start_mapping_index
132 if start_mapping_index >= len(lines_mappings) or end_mapping_index >= len(
133 lines_mappings
134 ):
135 # Protect against invalid inputs.
136 continue
137 start_mapping = lines_mappings[start_mapping_index]
138 end_mapping = lines_mappings[end_mapping_index]
139 if start_mapping.is_changed_block:
140 # When the line falls into a changed block, expands to the whole block.
141 new_start = start_mapping.modified_start
142 else:
143 new_start = (
144 start - start_mapping.original_start + start_mapping.modified_start
145 )
146 if end_mapping.is_changed_block:
147 # When the line falls into a changed block, expands to the whole block.
148 new_end = end_mapping.modified_end
149 else:
150 new_end = end - end_mapping.original_start + end_mapping.modified_start
151 new_range = (new_start, new_end)
152 if is_valid_line_range(new_range):
153 new_lines.append(new_range)
154 return new_lines
157def convert_unchanged_lines(src_node: Node, lines: Collection[tuple[int, int]]) -> None:
158 """Converts unchanged lines to STANDALONE_COMMENT.
160 The idea is similar to how `# fmt: on/off` is implemented. It also converts the
161 nodes between those markers as a single `STANDALONE_COMMENT` leaf node with
162 the unformatted code as its value. `STANDALONE_COMMENT` is a "fake" token
163 that will be formatted as-is with its prefix normalized.
165 Here we perform two passes:
167 1. Visit the top-level statements, and convert them to a single
168 `STANDALONE_COMMENT` when unchanged. This speeds up formatting when some
169 of the top-level statements aren't changed.
170 2. Convert unchanged "unwrapped lines" to `STANDALONE_COMMENT` nodes line by
171 line. "unwrapped lines" are divided by the `NEWLINE` token. e.g. a
172 multi-line statement is *one* "unwrapped line" that ends with `NEWLINE`,
173 even though this statement itself can span multiple lines, and the
174 tokenizer only sees the last '\n' as the `NEWLINE` token.
176 NOTE: During pass (2), comment prefixes and indentations are ALWAYS
177 normalized even when the lines aren't changed. This is fixable by moving
178 more formatting to pass (1). However, it's hard to get it correct when
179 incorrect indentations are used. So we defer this to future optimizations.
180 """
181 lines_set: set[int] = set()
182 for start, end in lines:
183 lines_set.update(range(start, end + 1))
184 visitor = _TopLevelStatementsVisitor(lines_set)
185 _ = list(visitor.visit(src_node)) # Consume all results.
186 _convert_unchanged_line_by_line(src_node, lines_set)
189def _contains_standalone_comment(node: LN) -> bool:
190 if isinstance(node, Leaf):
191 return node.type == STANDALONE_COMMENT
192 else:
193 for child in node.children:
194 if _contains_standalone_comment(child):
195 return True
196 return False
199class _TopLevelStatementsVisitor(Visitor[None]):
200 """
201 A node visitor that converts unchanged top-level statements to
202 STANDALONE_COMMENT.
204 This is used in addition to _convert_unchanged_line_by_line, to
205 speed up formatting when there are unchanged top-level
206 classes/functions/statements.
207 """
209 def __init__(self, lines_set: set[int]):
210 self._lines_set = lines_set
212 def visit_simple_stmt(self, node: Node) -> Iterator[None]:
213 # This is only called for top-level statements, since `visit_suite`
214 # won't visit its children nodes.
215 yield from []
216 newline_leaf = last_leaf(node)
217 if not newline_leaf:
218 return
219 assert (
220 newline_leaf.type == NEWLINE
221 ), f"Unexpectedly found leaf.type={newline_leaf.type}"
222 # We need to find the furthest ancestor with the NEWLINE as the last
223 # leaf, since a `suite` can simply be a `simple_stmt` when it puts
224 # its body on the same line. Example: `if cond: pass`.
225 ancestor = furthest_ancestor_with_last_leaf(newline_leaf)
226 if not _get_line_range(ancestor).intersection(self._lines_set):
227 _convert_node_to_standalone_comment(ancestor)
229 def visit_suite(self, node: Node) -> Iterator[None]:
230 yield from []
231 # If there is a STANDALONE_COMMENT node, it means parts of the node tree
232 # have fmt on/off/skip markers. Those STANDALONE_COMMENT nodes can't
233 # be simply converted by calling str(node). So we just don't convert
234 # here.
235 if _contains_standalone_comment(node):
236 return
237 # Find the semantic parent of this suite. For `async_stmt` and
238 # `async_funcdef`, the ASYNC token is defined on a separate level by the
239 # grammar.
240 semantic_parent = node.parent
241 if semantic_parent is not None:
242 if (
243 semantic_parent.prev_sibling is not None
244 and semantic_parent.prev_sibling.type == ASYNC
245 ):
246 semantic_parent = semantic_parent.parent
247 if semantic_parent is not None and not _get_line_range(
248 semantic_parent
249 ).intersection(self._lines_set):
250 _convert_node_to_standalone_comment(semantic_parent)
253def _convert_unchanged_line_by_line(node: Node, lines_set: set[int]) -> None:
254 """Converts unchanged to STANDALONE_COMMENT line by line."""
255 for leaf in node.leaves():
256 if leaf.type != NEWLINE:
257 # We only consider "unwrapped lines", which are divided by the NEWLINE
258 # token.
259 continue
260 if leaf.parent and leaf.parent.type == syms.match_stmt:
261 # The `suite` node is defined as:
262 # match_stmt: "match" subject_expr ':' NEWLINE INDENT case_block+ DEDENT
263 # Here we need to check `subject_expr`. The `case_block+` will be
264 # checked by their own NEWLINEs.
265 nodes_to_ignore: list[LN] = []
266 prev_sibling = leaf.prev_sibling
267 while prev_sibling:
268 nodes_to_ignore.insert(0, prev_sibling)
269 prev_sibling = prev_sibling.prev_sibling
270 if not _get_line_range(nodes_to_ignore).intersection(lines_set):
271 _convert_nodes_to_standalone_comment(nodes_to_ignore, newline=leaf)
272 elif leaf.parent and leaf.parent.type == syms.suite:
273 # The `suite` node is defined as:
274 # suite: simple_stmt | NEWLINE INDENT stmt+ DEDENT
275 # We will check `simple_stmt` and `stmt+` separately against the lines set
276 parent_sibling = leaf.parent.prev_sibling
277 nodes_to_ignore = []
278 while parent_sibling and not parent_sibling.type == syms.suite:
279 # NOTE: Multiple suite nodes can exist as siblings in e.g. `if_stmt`.
280 nodes_to_ignore.insert(0, parent_sibling)
281 parent_sibling = parent_sibling.prev_sibling
282 # Special case for `async_stmt` and `async_funcdef` where the ASYNC
283 # token is on the grandparent node.
284 grandparent = leaf.parent.parent
285 if (
286 grandparent is not None
287 and grandparent.prev_sibling is not None
288 and grandparent.prev_sibling.type == ASYNC
289 ):
290 nodes_to_ignore.insert(0, grandparent.prev_sibling)
291 if not _get_line_range(nodes_to_ignore).intersection(lines_set):
292 _convert_nodes_to_standalone_comment(nodes_to_ignore, newline=leaf)
293 else:
294 ancestor = furthest_ancestor_with_last_leaf(leaf)
295 # Consider multiple decorators as a whole block, as their
296 # newlines have different behaviors than the rest of the grammar.
297 if (
298 ancestor.type == syms.decorator
299 and ancestor.parent
300 and ancestor.parent.type == syms.decorators
301 ):
302 ancestor = ancestor.parent
303 if not _get_line_range(ancestor).intersection(lines_set):
304 _convert_node_to_standalone_comment(ancestor)
307def _convert_node_to_standalone_comment(node: LN) -> None:
308 """Convert node to STANDALONE_COMMENT by modifying the tree inline."""
309 parent = node.parent
310 if not parent:
311 return
312 first = first_leaf(node)
313 last = last_leaf(node)
314 if not first or not last:
315 return
316 if first is last:
317 # This can happen on the following edge cases:
318 # 1. A block of `# fmt: off/on` code except the `# fmt: on` is placed
319 # on the end of the last line instead of on a new line.
320 # 2. A single backslash on its own line followed by a comment line.
321 # Ideally we don't want to format them when not requested, but fixing
322 # isn't easy. These cases are also badly formatted code, so it isn't
323 # too bad we reformat them.
324 return
325 # The prefix contains comments and indentation whitespaces. They are
326 # reformatted accordingly to the correct indentation level.
327 # This also means the indentation will be changed on the unchanged lines, and
328 # this is actually required to not break incremental reformatting.
329 prefix = first.prefix
330 first.prefix = ""
331 index = node.remove()
332 if index is not None:
333 # Remove the '\n', as STANDALONE_COMMENT will have '\n' appended when
334 # generating the formatted code.
335 value = str(node)[:-1]
336 parent.insert_child(
337 index,
338 Leaf(
339 STANDALONE_COMMENT,
340 value,
341 prefix=prefix,
342 fmt_pass_converted_first_leaf=first,
343 ),
344 )
347def _convert_nodes_to_standalone_comment(nodes: Sequence[LN], *, newline: Leaf) -> None:
348 """Convert nodes to STANDALONE_COMMENT by modifying the tree inline."""
349 if not nodes:
350 return
351 parent = nodes[0].parent
352 first = first_leaf(nodes[0])
353 if not parent or not first:
354 return
355 prefix = first.prefix
356 first.prefix = ""
357 value = "".join(str(node) for node in nodes)
358 # The prefix comment on the NEWLINE leaf is the trailing comment of the statement.
359 if newline.prefix:
360 value += newline.prefix
361 newline.prefix = ""
362 index = nodes[0].remove()
363 for node in nodes[1:]:
364 node.remove()
365 if index is not None:
366 parent.insert_child(
367 index,
368 Leaf(
369 STANDALONE_COMMENT,
370 value,
371 prefix=prefix,
372 fmt_pass_converted_first_leaf=first,
373 ),
374 )
377def _leaf_line_end(leaf: Leaf) -> int:
378 """Returns the line number of the leaf node's last line."""
379 if leaf.type == NEWLINE:
380 return leaf.lineno
381 else:
382 # Leaf nodes like multiline strings can occupy multiple lines.
383 return leaf.lineno + str(leaf).count("\n")
386def _get_line_range(node_or_nodes: Union[LN, list[LN]]) -> set[int]:
387 """Returns the line range of this node or list of nodes."""
388 if isinstance(node_or_nodes, list):
389 nodes = node_or_nodes
390 if not nodes:
391 return set()
392 first = first_leaf(nodes[0])
393 last = last_leaf(nodes[-1])
394 if first and last:
395 line_start = first.lineno
396 line_end = _leaf_line_end(last)
397 return set(range(line_start, line_end + 1))
398 else:
399 return set()
400 else:
401 node = node_or_nodes
402 if isinstance(node, Leaf):
403 return set(range(node.lineno, _leaf_line_end(node) + 1))
404 else:
405 first = first_leaf(node)
406 last = last_leaf(node)
407 if first and last:
408 return set(range(first.lineno, _leaf_line_end(last) + 1))
409 else:
410 return set()
413@dataclass
414class _LinesMapping:
415 """1-based lines mapping from original source to modified source.
417 Lines [original_start, original_end] from original source
418 are mapped to [modified_start, modified_end].
420 The ranges are inclusive on both ends.
421 """
423 original_start: int
424 original_end: int
425 modified_start: int
426 modified_end: int
427 # Whether this range corresponds to a changed block, or an unchanged block.
428 is_changed_block: bool
431def _calculate_lines_mappings(
432 original_source: str,
433 modified_source: str,
434) -> Sequence[_LinesMapping]:
435 """Returns a sequence of _LinesMapping by diffing the sources.
437 For example, given the following diff:
438 import re
439 - def func(arg1,
440 - arg2, arg3):
441 + def func(arg1, arg2, arg3):
442 pass
443 It returns the following mappings:
444 original -> modified
445 (1, 1) -> (1, 1), is_changed_block=False (the "import re" line)
446 (2, 3) -> (2, 2), is_changed_block=True (the diff)
447 (4, 4) -> (3, 3), is_changed_block=False (the "pass" line)
449 You can think of this visually as if it brings up a side-by-side diff, and tries
450 to map the line ranges from the left side to the right side:
452 (1, 1)->(1, 1) 1. import re 1. import re
453 (2, 3)->(2, 2) 2. def func(arg1, 2. def func(arg1, arg2, arg3):
454 3. arg2, arg3):
455 (4, 4)->(3, 3) 4. pass 3. pass
457 Args:
458 original_source: the original source.
459 modified_source: the modified source.
460 """
461 matcher = difflib.SequenceMatcher(
462 None,
463 original_source.splitlines(keepends=True),
464 modified_source.splitlines(keepends=True),
465 )
466 matching_blocks = matcher.get_matching_blocks()
467 lines_mappings: list[_LinesMapping] = []
468 # matching_blocks is a sequence of "same block of code ranges", see
469 # https://docs.python.org/3/library/difflib.html#difflib.SequenceMatcher.get_matching_blocks
470 # Each block corresponds to a _LinesMapping with is_changed_block=False,
471 # and the ranges between two blocks corresponds to a _LinesMapping with
472 # is_changed_block=True,
473 # NOTE: matching_blocks is 0-based, but _LinesMapping is 1-based.
474 for i, block in enumerate(matching_blocks):
475 if i == 0:
476 if block.a != 0 or block.b != 0:
477 lines_mappings.append(
478 _LinesMapping(
479 original_start=1,
480 original_end=block.a,
481 modified_start=1,
482 modified_end=block.b,
483 is_changed_block=False,
484 )
485 )
486 else:
487 previous_block = matching_blocks[i - 1]
488 lines_mappings.append(
489 _LinesMapping(
490 original_start=previous_block.a + previous_block.size + 1,
491 original_end=block.a,
492 modified_start=previous_block.b + previous_block.size + 1,
493 modified_end=block.b,
494 is_changed_block=True,
495 )
496 )
497 if i < len(matching_blocks) - 1:
498 lines_mappings.append(
499 _LinesMapping(
500 original_start=block.a + 1,
501 original_end=block.a + block.size,
502 modified_start=block.b + 1,
503 modified_end=block.b + block.size,
504 is_changed_block=False,
505 )
506 )
507 return lines_mappings
510def _find_lines_mapping_index(
511 original_line: int,
512 lines_mappings: Sequence[_LinesMapping],
513 start_index: int,
514) -> int:
515 """Returns the original index of the lines mappings for the original line."""
516 index = start_index
517 while index < len(lines_mappings):
518 mapping = lines_mappings[index]
519 if mapping.original_start <= original_line <= mapping.original_end:
520 return index
521 index += 1
522 return index