Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/black/ranges.py: 14%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

212 statements  

1"""Functions related to Black's formatting by line ranges feature.""" 

2 

3import difflib 

4from collections.abc import Collection, Iterator, Sequence 

5from dataclasses import dataclass 

6 

7from black.nodes import ( 

8 LN, 

9 STANDALONE_COMMENT, 

10 Leaf, 

11 Node, 

12 Visitor, 

13 first_leaf, 

14 furthest_ancestor_with_last_leaf, 

15 last_leaf, 

16 syms, 

17) 

18from blib2to3.pgen2.token import ASYNC, NEWLINE 

19 

20 

21def parse_line_ranges(line_ranges: Sequence[str]) -> list[tuple[int, int]]: 

22 lines: list[tuple[int, int]] = [] 

23 for lines_str in line_ranges: 

24 parts = lines_str.split("-") 

25 if len(parts) != 2: 

26 raise ValueError( 

27 "Incorrect --line-ranges format, expect 'START-END', found" 

28 f" {lines_str!r}" 

29 ) 

30 try: 

31 start = int(parts[0]) 

32 end = int(parts[1]) 

33 except ValueError: 

34 raise ValueError( 

35 "Incorrect --line-ranges value, expect integer ranges, found" 

36 f" {lines_str!r}" 

37 ) from None 

38 else: 

39 lines.append((start, end)) 

40 return lines 

41 

42 

43def is_valid_line_range(lines: tuple[int, int]) -> bool: 

44 """Returns whether the line range is valid.""" 

45 return not lines or lines[0] <= lines[1] 

46 

47 

48def sanitized_lines( 

49 lines: Collection[tuple[int, int]], src_contents: str 

50) -> Collection[tuple[int, int]]: 

51 """Returns the valid line ranges for the given source. 

52 

53 This removes ranges that are entirely outside the valid lines. 

54 

55 Other ranges are normalized so that the start values are at least 1 and the 

56 end values are at most the (1-based) index of the last source line. 

57 """ 

58 if not src_contents: 

59 return [] 

60 good_lines = [] 

61 src_line_count = src_contents.count("\n") 

62 if not src_contents.endswith("\n"): 

63 src_line_count += 1 

64 for start, end in lines: 

65 if start > src_line_count: 

66 continue 

67 # line-ranges are 1-based 

68 start = max(start, 1) 

69 if end < start: 

70 continue 

71 end = min(end, src_line_count) 

72 good_lines.append((start, end)) 

73 return good_lines 

74 

75 

76def adjusted_lines( 

77 lines: Collection[tuple[int, int]], 

78 original_source: str, 

79 modified_source: str, 

80) -> list[tuple[int, int]]: 

81 """Returns the adjusted line ranges based on edits from the original code. 

82 

83 This computes the new line ranges by diffing original_source and 

84 modified_source, and adjust each range based on how the range overlaps with 

85 the diffs. 

86 

87 Note the diff can contain lines outside of the original line ranges. This can 

88 happen when the formatting has to be done in adjacent to maintain consistent 

89 local results. For example: 

90 

91 1. def my_func(arg1, arg2, 

92 2. arg3,): 

93 3. pass 

94 

95 If it restricts to line 2-2, it can't simply reformat line 2, it also has 

96 to reformat line 1: 

97 

98 1. def my_func( 

99 2. arg1, 

100 3. arg2, 

101 4. arg3, 

102 5. ): 

103 6. pass 

104 

105 In this case, we will expand the line ranges to also include the whole diff 

106 block. 

107 

108 Args: 

109 lines: a collection of line ranges. 

110 original_source: the original source. 

111 modified_source: the modified source. 

112 """ 

113 lines_mappings = _calculate_lines_mappings(original_source, modified_source) 

114 

115 new_lines = [] 

116 # Keep an index of the current search. Since the lines and lines_mappings are 

117 # sorted, this makes the search complexity linear. 

118 current_mapping_index = 0 

119 for start, end in sorted(lines): 

120 start_mapping_index = _find_lines_mapping_index( 

121 start, 

122 lines_mappings, 

123 current_mapping_index, 

124 ) 

125 end_mapping_index = _find_lines_mapping_index( 

126 end, 

127 lines_mappings, 

128 start_mapping_index, 

129 ) 

130 current_mapping_index = start_mapping_index 

131 if start_mapping_index >= len(lines_mappings) or end_mapping_index >= len( 

132 lines_mappings 

133 ): 

134 # Protect against invalid inputs. 

135 continue 

136 start_mapping = lines_mappings[start_mapping_index] 

137 end_mapping = lines_mappings[end_mapping_index] 

138 if start_mapping.is_changed_block: 

139 # When the line falls into a changed block, expands to the whole block. 

140 new_start = start_mapping.modified_start 

141 else: 

142 new_start = ( 

143 start - start_mapping.original_start + start_mapping.modified_start 

144 ) 

145 if end_mapping.is_changed_block: 

146 # When the line falls into a changed block, expands to the whole block. 

147 new_end = end_mapping.modified_end 

148 else: 

149 new_end = end - end_mapping.original_start + end_mapping.modified_start 

150 new_range = (new_start, new_end) 

151 if is_valid_line_range(new_range): 

152 new_lines.append(new_range) 

153 return new_lines 

154 

155 

156def convert_unchanged_lines(src_node: Node, lines: Collection[tuple[int, int]]) -> None: 

157 r"""Converts unchanged lines to STANDALONE_COMMENT. 

158 

159 The idea is similar to how `# fmt: on/off` is implemented. It also converts the 

160 nodes between those markers as a single `STANDALONE_COMMENT` leaf node with 

161 the unformatted code as its value. `STANDALONE_COMMENT` is a "fake" token 

162 that will be formatted as-is with its prefix normalized. 

163 

164 Here we perform two passes: 

165 

166 1. Visit the top-level statements, and convert them to a single 

167 `STANDALONE_COMMENT` when unchanged. This speeds up formatting when some 

168 of the top-level statements aren't changed. 

169 2. Convert unchanged "unwrapped lines" to `STANDALONE_COMMENT` nodes line by 

170 line. "unwrapped lines" are divided by the `NEWLINE` token. e.g. a 

171 multi-line statement is *one* "unwrapped line" that ends with `NEWLINE`, 

172 even though this statement itself can span multiple lines, and the 

173 tokenizer only sees the last '\n' as the `NEWLINE` token. 

174 

175 NOTE: During pass (2), comment prefixes and indentations are ALWAYS 

176 normalized even when the lines aren't changed. This is fixable by moving 

177 more formatting to pass (1). However, it's hard to get it correct when 

178 incorrect indentations are used. So we defer this to future optimizations. 

179 """ 

180 lines_set: set[int] = set() 

181 for start, end in lines: 

182 lines_set.update(range(start, end + 1)) 

183 visitor = _TopLevelStatementsVisitor(lines_set) 

184 _ = list(visitor.visit(src_node)) # Consume all results. 

185 _convert_unchanged_line_by_line(src_node, lines_set) 

186 

187 

188def _contains_standalone_comment(node: LN) -> bool: 

189 if isinstance(node, Leaf): 

190 return node.type == STANDALONE_COMMENT 

191 else: 

192 for child in node.children: 

193 if _contains_standalone_comment(child): 

194 return True 

195 return False 

196 

197 

198class _TopLevelStatementsVisitor(Visitor[None]): 

199 """ 

200 A node visitor that converts unchanged top-level statements to 

201 STANDALONE_COMMENT. 

202 

203 This is used in addition to _convert_unchanged_line_by_line, to 

204 speed up formatting when there are unchanged top-level 

205 classes/functions/statements. 

206 """ 

207 

208 def __init__(self, lines_set: set[int]): 

209 self._lines_set = lines_set 

210 

211 def visit_simple_stmt(self, node: Node) -> Iterator[None]: 

212 # This is only called for top-level statements, since `visit_suite` 

213 # won't visit its children nodes. 

214 yield from [] 

215 newline_leaf = last_leaf(node) 

216 if not newline_leaf: 

217 return 

218 assert ( 

219 newline_leaf.type == NEWLINE 

220 ), f"Unexpectedly found leaf.type={newline_leaf.type}" 

221 # We need to find the furthest ancestor with the NEWLINE as the last 

222 # leaf, since a `suite` can simply be a `simple_stmt` when it puts 

223 # its body on the same line. Example: `if cond: pass`. 

224 ancestor = furthest_ancestor_with_last_leaf(newline_leaf) 

225 if not _get_line_range(ancestor).intersection(self._lines_set): 

226 _convert_node_to_standalone_comment(ancestor) 

227 

228 def visit_suite(self, node: Node) -> Iterator[None]: 

229 yield from [] 

230 # If there is a STANDALONE_COMMENT node, it means parts of the node tree 

231 # have fmt on/off/skip markers. Those STANDALONE_COMMENT nodes can't 

232 # be simply converted by calling str(node). So we just don't convert 

233 # here. 

234 if _contains_standalone_comment(node): 

235 return 

236 # Find the semantic parent of this suite. For `async_stmt` and 

237 # `async_funcdef`, the ASYNC token is defined on a separate level by the 

238 # grammar. 

239 semantic_parent = node.parent 

240 if semantic_parent is not None: 

241 if ( 

242 semantic_parent.prev_sibling is not None 

243 and semantic_parent.prev_sibling.type == ASYNC 

244 ): 

245 semantic_parent = semantic_parent.parent 

246 if semantic_parent is not None and not _get_line_range( 

247 semantic_parent 

248 ).intersection(self._lines_set): 

249 _convert_node_to_standalone_comment(semantic_parent) 

250 

251 

252def _convert_unchanged_line_by_line(node: Node, lines_set: set[int]) -> None: 

253 """Converts unchanged to STANDALONE_COMMENT line by line.""" 

254 for leaf in node.leaves(): 

255 if leaf.type != NEWLINE: 

256 # We only consider "unwrapped lines", which are divided by the NEWLINE 

257 # token. 

258 continue 

259 if leaf.parent and leaf.parent.type == syms.match_stmt: 

260 # The `suite` node is defined as: 

261 # match_stmt: "match" subject_expr ':' NEWLINE INDENT case_block+ DEDENT 

262 # Here we need to check `subject_expr`. The `case_block+` will be 

263 # checked by their own NEWLINEs. 

264 nodes_to_ignore: list[LN] = [] 

265 prev_sibling = leaf.prev_sibling 

266 while prev_sibling: 

267 nodes_to_ignore.insert(0, prev_sibling) 

268 prev_sibling = prev_sibling.prev_sibling 

269 if not _get_line_range(nodes_to_ignore).intersection(lines_set): 

270 _convert_nodes_to_standalone_comment(nodes_to_ignore, newline=leaf) 

271 elif leaf.parent and leaf.parent.type == syms.suite: 

272 # The `suite` node is defined as: 

273 # suite: simple_stmt | NEWLINE INDENT stmt+ DEDENT 

274 # We will check `simple_stmt` and `stmt+` separately against the lines set 

275 parent_sibling = leaf.parent.prev_sibling 

276 nodes_to_ignore = [] 

277 while parent_sibling and parent_sibling.type != syms.suite: 

278 # NOTE: Multiple suite nodes can exist as siblings in e.g. `if_stmt`. 

279 nodes_to_ignore.insert(0, parent_sibling) 

280 parent_sibling = parent_sibling.prev_sibling 

281 # Special case for `async_stmt` and `async_funcdef` where the ASYNC 

282 # token is on the grandparent node. 

283 grandparent = leaf.parent.parent 

284 if ( 

285 grandparent is not None 

286 and grandparent.prev_sibling is not None 

287 and grandparent.prev_sibling.type == ASYNC 

288 ): 

289 nodes_to_ignore.insert(0, grandparent.prev_sibling) 

290 if not _get_line_range(nodes_to_ignore).intersection(lines_set): 

291 _convert_nodes_to_standalone_comment(nodes_to_ignore, newline=leaf) 

292 else: 

293 ancestor = furthest_ancestor_with_last_leaf(leaf) 

294 # Consider multiple decorators as a whole block, as their 

295 # newlines have different behaviors than the rest of the grammar. 

296 if ( 

297 ancestor.type == syms.decorator 

298 and ancestor.parent 

299 and ancestor.parent.type == syms.decorators 

300 ): 

301 ancestor = ancestor.parent 

302 if not _get_line_range(ancestor).intersection(lines_set): 

303 _convert_node_to_standalone_comment(ancestor) 

304 

305 

306def _convert_node_to_standalone_comment(node: LN) -> None: 

307 """Convert node to STANDALONE_COMMENT by modifying the tree inline.""" 

308 parent = node.parent 

309 if not parent: 

310 return 

311 first = first_leaf(node) 

312 last = last_leaf(node) 

313 if not first or not last: 

314 return 

315 if first is last: 

316 # This can happen on the following edge cases: 

317 # 1. A block of `# fmt: off/on` code except the `# fmt: on` is placed 

318 # on the end of the last line instead of on a new line. 

319 # 2. A single backslash on its own line followed by a comment line. 

320 # Ideally we don't want to format them when not requested, but fixing 

321 # isn't easy. These cases are also badly formatted code, so it isn't 

322 # too bad we reformat them. 

323 return 

324 # The prefix contains comments and indentation whitespaces. They are 

325 # reformatted accordingly to the correct indentation level. 

326 # This also means the indentation will be changed on the unchanged lines, and 

327 # this is actually required to not break incremental reformatting. 

328 prefix = first.prefix 

329 first.prefix = "" 

330 index = node.remove() 

331 if index is not None: 

332 # Because of the special handling of multiple decorators, if the decorated 

333 # item is a single line then there will be a missing newline between the 

334 # decorator and item, so add it back. This doesn't affect any other case 

335 # since a decorated item with a newline would hit the earlier suite case 

336 # in _convert_unchanged_line_by_line that correctly handles the newlines. 

337 if node.type == syms.decorated: 

338 # A leaf of type decorated wouldn't make sense, since it should always 

339 # have at least the decorator + the decorated item, so if this assert 

340 # hits that means there's a problem in the parser. 

341 assert isinstance(node, Node) 

342 # 1 will always be the correct index since before this function is 

343 # called all the decorators are collapsed into a single leaf 

344 node.insert_child(1, Leaf(NEWLINE, "\n")) 

345 # Remove the '\n', as STANDALONE_COMMENT will have '\n' appended when 

346 # generating the formatted code. 

347 value = str(node)[:-1] 

348 parent.insert_child( 

349 index, 

350 Leaf( 

351 STANDALONE_COMMENT, 

352 value, 

353 prefix=prefix, 

354 fmt_pass_converted_first_leaf=first, 

355 ), 

356 ) 

357 

358 

359def _convert_nodes_to_standalone_comment(nodes: Sequence[LN], *, newline: Leaf) -> None: 

360 """Convert nodes to STANDALONE_COMMENT by modifying the tree inline.""" 

361 if not nodes: 

362 return 

363 parent = nodes[0].parent 

364 first = first_leaf(nodes[0]) 

365 if not parent or not first: 

366 return 

367 prefix = first.prefix 

368 first.prefix = "" 

369 value = "".join(str(node) for node in nodes) 

370 # The prefix comment on the NEWLINE leaf is the trailing comment of the statement. 

371 if newline.prefix: 

372 value += newline.prefix 

373 newline.prefix = "" 

374 index = nodes[0].remove() 

375 for node in nodes[1:]: 

376 node.remove() 

377 if index is not None: 

378 parent.insert_child( 

379 index, 

380 Leaf( 

381 STANDALONE_COMMENT, 

382 value, 

383 prefix=prefix, 

384 fmt_pass_converted_first_leaf=first, 

385 ), 

386 ) 

387 

388 

389def _leaf_line_end(leaf: Leaf) -> int: 

390 """Returns the line number of the leaf node's last line.""" 

391 if leaf.type == NEWLINE: 

392 return leaf.lineno 

393 else: 

394 # Leaf nodes like multiline strings can occupy multiple lines. 

395 return leaf.lineno + str(leaf).count("\n") 

396 

397 

398def _get_line_range(node_or_nodes: LN | list[LN]) -> set[int]: 

399 """Returns the line range of this node or list of nodes.""" 

400 if isinstance(node_or_nodes, list): 

401 nodes = node_or_nodes 

402 if not nodes: 

403 return set() 

404 first = first_leaf(nodes[0]) 

405 last = last_leaf(nodes[-1]) 

406 if first and last: 

407 line_start = first.lineno 

408 line_end = _leaf_line_end(last) 

409 return set(range(line_start, line_end + 1)) 

410 else: 

411 return set() 

412 else: 

413 node = node_or_nodes 

414 if isinstance(node, Leaf): 

415 return set(range(node.lineno, _leaf_line_end(node) + 1)) 

416 else: 

417 first = first_leaf(node) 

418 last = last_leaf(node) 

419 if first and last: 

420 return set(range(first.lineno, _leaf_line_end(last) + 1)) 

421 else: 

422 return set() 

423 

424 

425@dataclass 

426class _LinesMapping: 

427 """1-based lines mapping from original source to modified source. 

428 

429 Lines [original_start, original_end] from original source 

430 are mapped to [modified_start, modified_end]. 

431 

432 The ranges are inclusive on both ends. 

433 """ 

434 

435 original_start: int 

436 original_end: int 

437 modified_start: int 

438 modified_end: int 

439 # Whether this range corresponds to a changed block, or an unchanged block. 

440 is_changed_block: bool 

441 

442 

443def _calculate_lines_mappings( 

444 original_source: str, 

445 modified_source: str, 

446) -> Sequence[_LinesMapping]: 

447 """Returns a sequence of _LinesMapping by diffing the sources. 

448 

449 For example, given the following diff: 

450 import re 

451 - def func(arg1, 

452 - arg2, arg3): 

453 + def func(arg1, arg2, arg3): 

454 pass 

455 It returns the following mappings: 

456 original -> modified 

457 (1, 1) -> (1, 1), is_changed_block=False (the "import re" line) 

458 (2, 3) -> (2, 2), is_changed_block=True (the diff) 

459 (4, 4) -> (3, 3), is_changed_block=False (the "pass" line) 

460 

461 You can think of this visually as if it brings up a side-by-side diff, and tries 

462 to map the line ranges from the left side to the right side: 

463 

464 (1, 1)->(1, 1) 1. import re 1. import re 

465 (2, 3)->(2, 2) 2. def func(arg1, 2. def func(arg1, arg2, arg3): 

466 3. arg2, arg3): 

467 (4, 4)->(3, 3) 4. pass 3. pass 

468 

469 Args: 

470 original_source: the original source. 

471 modified_source: the modified source. 

472 """ 

473 matcher = difflib.SequenceMatcher( 

474 None, 

475 original_source.splitlines(keepends=True), 

476 modified_source.splitlines(keepends=True), 

477 ) 

478 matching_blocks = matcher.get_matching_blocks() 

479 lines_mappings: list[_LinesMapping] = [] 

480 # matching_blocks is a sequence of "same block of code ranges", see 

481 # https://docs.python.org/3/library/difflib.html#difflib.SequenceMatcher.get_matching_blocks 

482 # Each block corresponds to a _LinesMapping with is_changed_block=False, 

483 # and the ranges between two blocks corresponds to a _LinesMapping with 

484 # is_changed_block=True, 

485 # NOTE: matching_blocks is 0-based, but _LinesMapping is 1-based. 

486 for i, block in enumerate(matching_blocks): 

487 if i == 0: 

488 if block.a != 0 or block.b != 0: 

489 lines_mappings.append( 

490 _LinesMapping( 

491 original_start=1, 

492 original_end=block.a, 

493 modified_start=1, 

494 modified_end=block.b, 

495 is_changed_block=False, 

496 ) 

497 ) 

498 else: 

499 previous_block = matching_blocks[i - 1] 

500 lines_mappings.append( 

501 _LinesMapping( 

502 original_start=previous_block.a + previous_block.size + 1, 

503 original_end=block.a, 

504 modified_start=previous_block.b + previous_block.size + 1, 

505 modified_end=block.b, 

506 is_changed_block=True, 

507 ) 

508 ) 

509 if i < len(matching_blocks) - 1: 

510 lines_mappings.append( 

511 _LinesMapping( 

512 original_start=block.a + 1, 

513 original_end=block.a + block.size, 

514 modified_start=block.b + 1, 

515 modified_end=block.b + block.size, 

516 is_changed_block=False, 

517 ) 

518 ) 

519 return lines_mappings 

520 

521 

522def _find_lines_mapping_index( 

523 original_line: int, 

524 lines_mappings: Sequence[_LinesMapping], 

525 start_index: int, 

526) -> int: 

527 """Returns the original index of the lines mappings for the original line.""" 

528 index = start_index 

529 while index < len(lines_mappings): 

530 mapping = lines_mappings[index] 

531 if mapping.original_start <= original_line <= mapping.original_end: 

532 return index 

533 index += 1 

534 return index