Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/black/ranges.py: 14%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Functions related to Black's formatting by line ranges feature."""
3import difflib
4from collections.abc import Collection, Iterator, Sequence
5from dataclasses import dataclass
7from black.nodes import (
8 LN,
9 STANDALONE_COMMENT,
10 Leaf,
11 Node,
12 Visitor,
13 first_leaf,
14 furthest_ancestor_with_last_leaf,
15 last_leaf,
16 syms,
17)
18from blib2to3.pgen2.token import ASYNC, NEWLINE
21def parse_line_ranges(line_ranges: Sequence[str]) -> list[tuple[int, int]]:
22 lines: list[tuple[int, int]] = []
23 for lines_str in line_ranges:
24 parts = lines_str.split("-")
25 if len(parts) != 2:
26 raise ValueError(
27 "Incorrect --line-ranges format, expect 'START-END', found"
28 f" {lines_str!r}"
29 )
30 try:
31 start = int(parts[0])
32 end = int(parts[1])
33 except ValueError:
34 raise ValueError(
35 "Incorrect --line-ranges value, expect integer ranges, found"
36 f" {lines_str!r}"
37 ) from None
38 else:
39 lines.append((start, end))
40 return lines
43def is_valid_line_range(lines: tuple[int, int]) -> bool:
44 """Returns whether the line range is valid."""
45 return not lines or lines[0] <= lines[1]
48def sanitized_lines(
49 lines: Collection[tuple[int, int]], src_contents: str
50) -> Collection[tuple[int, int]]:
51 """Returns the valid line ranges for the given source.
53 This removes ranges that are entirely outside the valid lines.
55 Other ranges are normalized so that the start values are at least 1 and the
56 end values are at most the (1-based) index of the last source line.
57 """
58 if not src_contents:
59 return []
60 good_lines = []
61 src_line_count = src_contents.count("\n")
62 if not src_contents.endswith("\n"):
63 src_line_count += 1
64 for start, end in lines:
65 if start > src_line_count:
66 continue
67 # line-ranges are 1-based
68 start = max(start, 1)
69 if end < start:
70 continue
71 end = min(end, src_line_count)
72 good_lines.append((start, end))
73 return good_lines
76def adjusted_lines(
77 lines: Collection[tuple[int, int]],
78 original_source: str,
79 modified_source: str,
80) -> list[tuple[int, int]]:
81 """Returns the adjusted line ranges based on edits from the original code.
83 This computes the new line ranges by diffing original_source and
84 modified_source, and adjust each range based on how the range overlaps with
85 the diffs.
87 Note the diff can contain lines outside of the original line ranges. This can
88 happen when the formatting has to be done in adjacent to maintain consistent
89 local results. For example:
91 1. def my_func(arg1, arg2,
92 2. arg3,):
93 3. pass
95 If it restricts to line 2-2, it can't simply reformat line 2, it also has
96 to reformat line 1:
98 1. def my_func(
99 2. arg1,
100 3. arg2,
101 4. arg3,
102 5. ):
103 6. pass
105 In this case, we will expand the line ranges to also include the whole diff
106 block.
108 Args:
109 lines: a collection of line ranges.
110 original_source: the original source.
111 modified_source: the modified source.
112 """
113 lines_mappings = _calculate_lines_mappings(original_source, modified_source)
115 new_lines = []
116 # Keep an index of the current search. Since the lines and lines_mappings are
117 # sorted, this makes the search complexity linear.
118 current_mapping_index = 0
119 for start, end in sorted(lines):
120 start_mapping_index = _find_lines_mapping_index(
121 start,
122 lines_mappings,
123 current_mapping_index,
124 )
125 end_mapping_index = _find_lines_mapping_index(
126 end,
127 lines_mappings,
128 start_mapping_index,
129 )
130 current_mapping_index = start_mapping_index
131 if start_mapping_index >= len(lines_mappings) or end_mapping_index >= len(
132 lines_mappings
133 ):
134 # Protect against invalid inputs.
135 continue
136 start_mapping = lines_mappings[start_mapping_index]
137 end_mapping = lines_mappings[end_mapping_index]
138 if start_mapping.is_changed_block:
139 # When the line falls into a changed block, expands to the whole block.
140 new_start = start_mapping.modified_start
141 else:
142 new_start = (
143 start - start_mapping.original_start + start_mapping.modified_start
144 )
145 if end_mapping.is_changed_block:
146 # When the line falls into a changed block, expands to the whole block.
147 new_end = end_mapping.modified_end
148 else:
149 new_end = end - end_mapping.original_start + end_mapping.modified_start
150 new_range = (new_start, new_end)
151 if is_valid_line_range(new_range):
152 new_lines.append(new_range)
153 return new_lines
156def convert_unchanged_lines(src_node: Node, lines: Collection[tuple[int, int]]) -> None:
157 r"""Converts unchanged lines to STANDALONE_COMMENT.
159 The idea is similar to how `# fmt: on/off` is implemented. It also converts the
160 nodes between those markers as a single `STANDALONE_COMMENT` leaf node with
161 the unformatted code as its value. `STANDALONE_COMMENT` is a "fake" token
162 that will be formatted as-is with its prefix normalized.
164 Here we perform two passes:
166 1. Visit the top-level statements, and convert them to a single
167 `STANDALONE_COMMENT` when unchanged. This speeds up formatting when some
168 of the top-level statements aren't changed.
169 2. Convert unchanged "unwrapped lines" to `STANDALONE_COMMENT` nodes line by
170 line. "unwrapped lines" are divided by the `NEWLINE` token. e.g. a
171 multi-line statement is *one* "unwrapped line" that ends with `NEWLINE`,
172 even though this statement itself can span multiple lines, and the
173 tokenizer only sees the last '\n' as the `NEWLINE` token.
175 NOTE: During pass (2), comment prefixes and indentations are ALWAYS
176 normalized even when the lines aren't changed. This is fixable by moving
177 more formatting to pass (1). However, it's hard to get it correct when
178 incorrect indentations are used. So we defer this to future optimizations.
179 """
180 lines_set: set[int] = set()
181 for start, end in lines:
182 lines_set.update(range(start, end + 1))
183 visitor = _TopLevelStatementsVisitor(lines_set)
184 _ = list(visitor.visit(src_node)) # Consume all results.
185 _convert_unchanged_line_by_line(src_node, lines_set)
188def _contains_standalone_comment(node: LN) -> bool:
189 if isinstance(node, Leaf):
190 return node.type == STANDALONE_COMMENT
191 else:
192 for child in node.children:
193 if _contains_standalone_comment(child):
194 return True
195 return False
198class _TopLevelStatementsVisitor(Visitor[None]):
199 """
200 A node visitor that converts unchanged top-level statements to
201 STANDALONE_COMMENT.
203 This is used in addition to _convert_unchanged_line_by_line, to
204 speed up formatting when there are unchanged top-level
205 classes/functions/statements.
206 """
208 def __init__(self, lines_set: set[int]):
209 self._lines_set = lines_set
211 def visit_simple_stmt(self, node: Node) -> Iterator[None]:
212 # This is only called for top-level statements, since `visit_suite`
213 # won't visit its children nodes.
214 yield from []
215 newline_leaf = last_leaf(node)
216 if not newline_leaf:
217 return
218 assert (
219 newline_leaf.type == NEWLINE
220 ), f"Unexpectedly found leaf.type={newline_leaf.type}"
221 # We need to find the furthest ancestor with the NEWLINE as the last
222 # leaf, since a `suite` can simply be a `simple_stmt` when it puts
223 # its body on the same line. Example: `if cond: pass`.
224 ancestor = furthest_ancestor_with_last_leaf(newline_leaf)
225 if not _get_line_range(ancestor).intersection(self._lines_set):
226 _convert_node_to_standalone_comment(ancestor)
228 def visit_suite(self, node: Node) -> Iterator[None]:
229 yield from []
230 # If there is a STANDALONE_COMMENT node, it means parts of the node tree
231 # have fmt on/off/skip markers. Those STANDALONE_COMMENT nodes can't
232 # be simply converted by calling str(node). So we just don't convert
233 # here.
234 if _contains_standalone_comment(node):
235 return
236 # Find the semantic parent of this suite. For `async_stmt` and
237 # `async_funcdef`, the ASYNC token is defined on a separate level by the
238 # grammar.
239 semantic_parent = node.parent
240 if semantic_parent is not None:
241 if (
242 semantic_parent.prev_sibling is not None
243 and semantic_parent.prev_sibling.type == ASYNC
244 ):
245 semantic_parent = semantic_parent.parent
246 if semantic_parent is not None and not _get_line_range(
247 semantic_parent
248 ).intersection(self._lines_set):
249 _convert_node_to_standalone_comment(semantic_parent)
252def _convert_unchanged_line_by_line(node: Node, lines_set: set[int]) -> None:
253 """Converts unchanged to STANDALONE_COMMENT line by line."""
254 for leaf in node.leaves():
255 if leaf.type != NEWLINE:
256 # We only consider "unwrapped lines", which are divided by the NEWLINE
257 # token.
258 continue
259 if leaf.parent and leaf.parent.type == syms.match_stmt:
260 # The `suite` node is defined as:
261 # match_stmt: "match" subject_expr ':' NEWLINE INDENT case_block+ DEDENT
262 # Here we need to check `subject_expr`. The `case_block+` will be
263 # checked by their own NEWLINEs.
264 nodes_to_ignore: list[LN] = []
265 prev_sibling = leaf.prev_sibling
266 while prev_sibling:
267 nodes_to_ignore.insert(0, prev_sibling)
268 prev_sibling = prev_sibling.prev_sibling
269 if not _get_line_range(nodes_to_ignore).intersection(lines_set):
270 _convert_nodes_to_standalone_comment(nodes_to_ignore, newline=leaf)
271 elif leaf.parent and leaf.parent.type == syms.suite:
272 # The `suite` node is defined as:
273 # suite: simple_stmt | NEWLINE INDENT stmt+ DEDENT
274 # We will check `simple_stmt` and `stmt+` separately against the lines set
275 parent_sibling = leaf.parent.prev_sibling
276 nodes_to_ignore = []
277 while parent_sibling and parent_sibling.type != syms.suite:
278 # NOTE: Multiple suite nodes can exist as siblings in e.g. `if_stmt`.
279 nodes_to_ignore.insert(0, parent_sibling)
280 parent_sibling = parent_sibling.prev_sibling
281 # Special case for `async_stmt` and `async_funcdef` where the ASYNC
282 # token is on the grandparent node.
283 grandparent = leaf.parent.parent
284 if (
285 grandparent is not None
286 and grandparent.prev_sibling is not None
287 and grandparent.prev_sibling.type == ASYNC
288 ):
289 nodes_to_ignore.insert(0, grandparent.prev_sibling)
290 if not _get_line_range(nodes_to_ignore).intersection(lines_set):
291 _convert_nodes_to_standalone_comment(nodes_to_ignore, newline=leaf)
292 else:
293 ancestor = furthest_ancestor_with_last_leaf(leaf)
294 # Consider multiple decorators as a whole block, as their
295 # newlines have different behaviors than the rest of the grammar.
296 if (
297 ancestor.type == syms.decorator
298 and ancestor.parent
299 and ancestor.parent.type == syms.decorators
300 ):
301 ancestor = ancestor.parent
302 if not _get_line_range(ancestor).intersection(lines_set):
303 _convert_node_to_standalone_comment(ancestor)
306def _convert_node_to_standalone_comment(node: LN) -> None:
307 """Convert node to STANDALONE_COMMENT by modifying the tree inline."""
308 parent = node.parent
309 if not parent:
310 return
311 first = first_leaf(node)
312 last = last_leaf(node)
313 if not first or not last:
314 return
315 if first is last:
316 # This can happen on the following edge cases:
317 # 1. A block of `# fmt: off/on` code except the `# fmt: on` is placed
318 # on the end of the last line instead of on a new line.
319 # 2. A single backslash on its own line followed by a comment line.
320 # Ideally we don't want to format them when not requested, but fixing
321 # isn't easy. These cases are also badly formatted code, so it isn't
322 # too bad we reformat them.
323 return
324 # The prefix contains comments and indentation whitespaces. They are
325 # reformatted accordingly to the correct indentation level.
326 # This also means the indentation will be changed on the unchanged lines, and
327 # this is actually required to not break incremental reformatting.
328 prefix = first.prefix
329 first.prefix = ""
330 index = node.remove()
331 if index is not None:
332 # Because of the special handling of multiple decorators, if the decorated
333 # item is a single line then there will be a missing newline between the
334 # decorator and item, so add it back. This doesn't affect any other case
335 # since a decorated item with a newline would hit the earlier suite case
336 # in _convert_unchanged_line_by_line that correctly handles the newlines.
337 if node.type == syms.decorated:
338 # A leaf of type decorated wouldn't make sense, since it should always
339 # have at least the decorator + the decorated item, so if this assert
340 # hits that means there's a problem in the parser.
341 assert isinstance(node, Node)
342 # 1 will always be the correct index since before this function is
343 # called all the decorators are collapsed into a single leaf
344 node.insert_child(1, Leaf(NEWLINE, "\n"))
345 # Remove the '\n', as STANDALONE_COMMENT will have '\n' appended when
346 # generating the formatted code.
347 value = str(node)[:-1]
348 parent.insert_child(
349 index,
350 Leaf(
351 STANDALONE_COMMENT,
352 value,
353 prefix=prefix,
354 fmt_pass_converted_first_leaf=first,
355 ),
356 )
359def _convert_nodes_to_standalone_comment(nodes: Sequence[LN], *, newline: Leaf) -> None:
360 """Convert nodes to STANDALONE_COMMENT by modifying the tree inline."""
361 if not nodes:
362 return
363 parent = nodes[0].parent
364 first = first_leaf(nodes[0])
365 if not parent or not first:
366 return
367 prefix = first.prefix
368 first.prefix = ""
369 value = "".join(str(node) for node in nodes)
370 # The prefix comment on the NEWLINE leaf is the trailing comment of the statement.
371 if newline.prefix:
372 value += newline.prefix
373 newline.prefix = ""
374 index = nodes[0].remove()
375 for node in nodes[1:]:
376 node.remove()
377 if index is not None:
378 parent.insert_child(
379 index,
380 Leaf(
381 STANDALONE_COMMENT,
382 value,
383 prefix=prefix,
384 fmt_pass_converted_first_leaf=first,
385 ),
386 )
389def _leaf_line_end(leaf: Leaf) -> int:
390 """Returns the line number of the leaf node's last line."""
391 if leaf.type == NEWLINE:
392 return leaf.lineno
393 else:
394 # Leaf nodes like multiline strings can occupy multiple lines.
395 return leaf.lineno + str(leaf).count("\n")
398def _get_line_range(node_or_nodes: LN | list[LN]) -> set[int]:
399 """Returns the line range of this node or list of nodes."""
400 if isinstance(node_or_nodes, list):
401 nodes = node_or_nodes
402 if not nodes:
403 return set()
404 first = first_leaf(nodes[0])
405 last = last_leaf(nodes[-1])
406 if first and last:
407 line_start = first.lineno
408 line_end = _leaf_line_end(last)
409 return set(range(line_start, line_end + 1))
410 else:
411 return set()
412 else:
413 node = node_or_nodes
414 if isinstance(node, Leaf):
415 return set(range(node.lineno, _leaf_line_end(node) + 1))
416 else:
417 first = first_leaf(node)
418 last = last_leaf(node)
419 if first and last:
420 return set(range(first.lineno, _leaf_line_end(last) + 1))
421 else:
422 return set()
425@dataclass
426class _LinesMapping:
427 """1-based lines mapping from original source to modified source.
429 Lines [original_start, original_end] from original source
430 are mapped to [modified_start, modified_end].
432 The ranges are inclusive on both ends.
433 """
435 original_start: int
436 original_end: int
437 modified_start: int
438 modified_end: int
439 # Whether this range corresponds to a changed block, or an unchanged block.
440 is_changed_block: bool
443def _calculate_lines_mappings(
444 original_source: str,
445 modified_source: str,
446) -> Sequence[_LinesMapping]:
447 """Returns a sequence of _LinesMapping by diffing the sources.
449 For example, given the following diff:
450 import re
451 - def func(arg1,
452 - arg2, arg3):
453 + def func(arg1, arg2, arg3):
454 pass
455 It returns the following mappings:
456 original -> modified
457 (1, 1) -> (1, 1), is_changed_block=False (the "import re" line)
458 (2, 3) -> (2, 2), is_changed_block=True (the diff)
459 (4, 4) -> (3, 3), is_changed_block=False (the "pass" line)
461 You can think of this visually as if it brings up a side-by-side diff, and tries
462 to map the line ranges from the left side to the right side:
464 (1, 1)->(1, 1) 1. import re 1. import re
465 (2, 3)->(2, 2) 2. def func(arg1, 2. def func(arg1, arg2, arg3):
466 3. arg2, arg3):
467 (4, 4)->(3, 3) 4. pass 3. pass
469 Args:
470 original_source: the original source.
471 modified_source: the modified source.
472 """
473 matcher = difflib.SequenceMatcher(
474 None,
475 original_source.splitlines(keepends=True),
476 modified_source.splitlines(keepends=True),
477 )
478 matching_blocks = matcher.get_matching_blocks()
479 lines_mappings: list[_LinesMapping] = []
480 # matching_blocks is a sequence of "same block of code ranges", see
481 # https://docs.python.org/3/library/difflib.html#difflib.SequenceMatcher.get_matching_blocks
482 # Each block corresponds to a _LinesMapping with is_changed_block=False,
483 # and the ranges between two blocks corresponds to a _LinesMapping with
484 # is_changed_block=True,
485 # NOTE: matching_blocks is 0-based, but _LinesMapping is 1-based.
486 for i, block in enumerate(matching_blocks):
487 if i == 0:
488 if block.a != 0 or block.b != 0:
489 lines_mappings.append(
490 _LinesMapping(
491 original_start=1,
492 original_end=block.a,
493 modified_start=1,
494 modified_end=block.b,
495 is_changed_block=False,
496 )
497 )
498 else:
499 previous_block = matching_blocks[i - 1]
500 lines_mappings.append(
501 _LinesMapping(
502 original_start=previous_block.a + previous_block.size + 1,
503 original_end=block.a,
504 modified_start=previous_block.b + previous_block.size + 1,
505 modified_end=block.b,
506 is_changed_block=True,
507 )
508 )
509 if i < len(matching_blocks) - 1:
510 lines_mappings.append(
511 _LinesMapping(
512 original_start=block.a + 1,
513 original_end=block.a + block.size,
514 modified_start=block.b + 1,
515 modified_end=block.b + block.size,
516 is_changed_block=False,
517 )
518 )
519 return lines_mappings
522def _find_lines_mapping_index(
523 original_line: int,
524 lines_mappings: Sequence[_LinesMapping],
525 start_index: int,
526) -> int:
527 """Returns the original index of the lines mappings for the original line."""
528 index = start_index
529 while index < len(lines_mappings):
530 mapping = lines_mappings[index]
531 if mapping.original_start <= original_line <= mapping.original_end:
532 return index
533 index += 1
534 return index