Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/black/ranges.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

213 statements  

1"""Functions related to Black's formatting by line ranges feature.""" 

2 

3import difflib 

4from collections.abc import Collection, Iterator, Sequence 

5from dataclasses import dataclass 

6from typing import Union 

7 

8from black.nodes import ( 

9 LN, 

10 STANDALONE_COMMENT, 

11 Leaf, 

12 Node, 

13 Visitor, 

14 first_leaf, 

15 furthest_ancestor_with_last_leaf, 

16 last_leaf, 

17 syms, 

18) 

19from blib2to3.pgen2.token import ASYNC, NEWLINE 

20 

21 

def parse_line_ranges(line_ranges: Sequence[str]) -> list[tuple[int, int]]:
    """Parse ``--line-ranges`` values of the form ``START-END``.

    Args:
        line_ranges: the raw range strings, one ``START-END`` entry each.

    Returns:
        One ``(start, end)`` integer tuple per input entry, in input order.
        No ordering check between start and end is made here.

    Raises:
        ValueError: if an entry does not split into exactly two parts on
            ``-``, or if either part is not an integer.
    """
    parsed: list[tuple[int, int]] = []
    for spec in line_ranges:
        pieces = spec.split("-")
        if len(pieces) != 2:
            raise ValueError(
                "Incorrect --line-ranges format, expect 'START-END', found"
                f" {spec!r}"
            )
        try:
            bounds = (int(pieces[0]), int(pieces[1]))
        except ValueError:
            # Drop the int() traceback; the message below is what users need.
            raise ValueError(
                "Incorrect --line-ranges value, expect integer ranges, found"
                f" {spec!r}"
            ) from None
        parsed.append(bounds)
    return parsed

42 

43 

def is_valid_line_range(lines: tuple[int, int]) -> bool:
    """Tell whether a line range is valid.

    An empty value counts as valid; otherwise the first element must not be
    greater than the second.
    """
    if not lines:
        return True
    return lines[0] <= lines[1]

47 

48 

def sanitized_lines(
    lines: Collection[tuple[int, int]], src_contents: str
) -> Collection[tuple[int, int]]:
    """Return the line ranges that are valid for the given source.

    Ranges lying entirely outside the source are dropped. The remaining
    ranges are clamped: start is raised to at least 1 and end is lowered to
    at most the (1-based) number of the last source line.

    Args:
        lines: candidate ``(start, end)`` ranges, 1-based.
        src_contents: the source text the ranges refer to.

    Returns:
        A list of clamped ranges; empty when ``src_contents`` is empty.
    """
    if not src_contents:
        return []

    # A final line without a trailing newline still counts as a line.
    line_total = src_contents.count("\n") + (
        0 if src_contents.endswith("\n") else 1
    )

    kept = []
    for start, end in lines:
        if start > line_total:
            continue
        # line-ranges are 1-based
        clamped_start = max(start, 1)
        if end < clamped_start:
            continue
        kept.append((clamped_start, min(end, line_total)))
    return kept

75 

76 

def adjusted_lines(
    lines: Collection[tuple[int, int]],
    original_source: str,
    modified_source: str,
) -> list[tuple[int, int]]:
    """Translate line ranges from the original source to the modified source.

    The two sources are diffed, and each range is shifted according to how
    it overlaps the diff.

    The diff may touch lines outside the requested ranges, because adjacent
    lines sometimes have to be reformatted to keep the result locally
    consistent. For example:

    1. def my_func(arg1, arg2,
    2.             arg3,):
    3.   pass

    Restricting to lines 2-2 cannot reformat line 2 alone; line 1 has to be
    reformatted as well:

    1. def my_func(
    2.     arg1,
    3.     arg2,
    4.     arg3,
    5. ):
    6.   pass

    In such a case the range is widened to cover the whole diff block.

    Args:
        lines: a collection of line ranges.
        original_source: the original source.
        modified_source: the modified source.

    Returns:
        The adjusted ranges, visited in sorted order of the input ranges.
        Ranges that cannot be located in the mappings, or that become
        invalid after adjustment, are omitted.
    """
    mappings = _calculate_lines_mappings(original_source, modified_source)
    mapping_count = len(mappings)

    adjusted = []
    # Both the ranges and the mappings are sorted, so resuming each search
    # where the previous one stopped keeps the overall scan linear.
    search_from = 0
    for start, end in sorted(lines):
        start_idx = _find_lines_mapping_index(start, mappings, search_from)
        end_idx = _find_lines_mapping_index(end, mappings, start_idx)
        search_from = start_idx
        if start_idx >= mapping_count or end_idx >= mapping_count:
            # Protect against invalid inputs.
            continue

        start_mapping = mappings[start_idx]
        end_mapping = mappings[end_idx]

        # A boundary inside a changed block snaps to that block's edge;
        # otherwise it keeps its offset within the unchanged block.
        new_start = (
            start_mapping.modified_start
            if start_mapping.is_changed_block
            else start - start_mapping.original_start + start_mapping.modified_start
        )
        new_end = (
            end_mapping.modified_end
            if end_mapping.is_changed_block
            else end - end_mapping.original_start + end_mapping.modified_start
        )

        candidate = (new_start, new_end)
        if is_valid_line_range(candidate):
            adjusted.append(candidate)
    return adjusted

155 

156 

def convert_unchanged_lines(src_node: Node, lines: Collection[tuple[int, int]]) -> None:
    """Convert the lines outside ``lines`` to STANDALONE_COMMENT.

    This works much like `# fmt: on/off`, which also collapses the nodes
    between the markers into a single `STANDALONE_COMMENT` leaf whose value
    is the unformatted code. `STANDALONE_COMMENT` is a "fake" token that is
    emitted as-is, with only its prefix normalized.

    Two passes are made over the tree:

    1. Top-level statements are visited and each unchanged one is replaced
       by a single `STANDALONE_COMMENT`. This speeds up formatting when some
       top-level statements are untouched.
    2. Unchanged "unwrapped lines" are converted one by one. An "unwrapped
       line" is delimited by the `NEWLINE` token: a multi-line statement is
       *one* unwrapped line, since the tokenizer only emits `NEWLINE` for
       the final newline character of the statement.

    NOTE: During pass (2), comment prefixes and indentations are ALWAYS
    normalized even when the lines aren't changed. Moving more work into
    pass (1) would fix that, but it is hard to get right when the source
    uses incorrect indentation, so it is left as a future optimization.

    Args:
        src_node: the root of the tree to modify in place.
        lines: inclusive ``(start, end)`` line ranges that should still be
            formatted.
    """
    selected: set[int] = {
        lineno for start, end in lines for lineno in range(start, end + 1)
    }
    # The visitor is a generator; it only does its work when drained.
    for _ in _TopLevelStatementsVisitor(selected).visit(src_node):
        pass
    _convert_unchanged_line_by_line(src_node, selected)

187 

188 

def _contains_standalone_comment(node: LN) -> bool:
    """Tell whether ``node`` is, or has a descendant that is, a
    STANDALONE_COMMENT leaf."""
    if isinstance(node, Leaf):
        return node.type == STANDALONE_COMMENT
    # Depth-first search that stops at the first hit.
    return any(_contains_standalone_comment(child) for child in node.children)

197 

198 

class _TopLevelStatementsVisitor(Visitor[None]):
    """
    Visitor that turns unchanged top-level statements into
    STANDALONE_COMMENT.

    It complements _convert_unchanged_line_by_line and exists to speed up
    formatting when top-level classes/functions/statements are unchanged.
    """

    def __init__(self, lines_set: set[int]):
        # Line numbers that must still be formatted.
        self._lines_set = lines_set

    def visit_simple_stmt(self, node: Node) -> Iterator[None]:
        # Only top-level statements reach this method, because `visit_suite`
        # does not descend into its children.
        yield from []
        trailing = last_leaf(node)
        if not trailing:
            return
        assert (
            trailing.type == NEWLINE
        ), f"Unexpectedly found leaf.type={trailing.type}"
        # A `suite` can consist of just a `simple_stmt` when the body sits on
        # the same line (e.g. `if cond: pass`), so climb to the furthest
        # ancestor that still ends with this NEWLINE.
        enclosing = furthest_ancestor_with_last_leaf(trailing)
        if _get_line_range(enclosing).isdisjoint(self._lines_set):
            _convert_node_to_standalone_comment(enclosing)

    def visit_suite(self, node: Node) -> Iterator[None]:
        yield from []
        # A STANDALONE_COMMENT inside the suite means fmt on/off/skip markers
        # are present. Such nodes cannot simply be converted through
        # str(node), so the suite is left untouched here.
        if _contains_standalone_comment(node):
            return
        semantic_parent = node.parent
        if semantic_parent is None:
            return
        # For `async_stmt` and `async_funcdef` the grammar puts the ASYNC
        # token one level up, so the semantic parent is one level up too.
        preceding = semantic_parent.prev_sibling
        if preceding is not None and preceding.type == ASYNC:
            semantic_parent = semantic_parent.parent
        if semantic_parent is None:
            return
        if _get_line_range(semantic_parent).isdisjoint(self._lines_set):
            _convert_node_to_standalone_comment(semantic_parent)

251 

252 

def _convert_unchanged_line_by_line(node: Node, lines_set: set[int]) -> None:
    """Converts unchanged to STANDALONE_COMMENT line by line.

    Walks every NEWLINE leaf under ``node``. For each one it works out which
    nodes make up the "unwrapped line" that the NEWLINE terminates and, when
    none of the lines they span is in ``lines_set``, replaces them with a
    STANDALONE_COMMENT via the conversion helpers.

    Args:
        node: the root of the tree to modify in place.
        lines_set: the line numbers that must still be formatted.
    """
    for leaf in node.leaves():
        if leaf.type != NEWLINE:
            # We only consider "unwrapped lines", which are divided by the NEWLINE
            # token.
            continue
        if leaf.parent and leaf.parent.type == syms.match_stmt:
            # The `match_stmt` node is defined as:
            #   match_stmt: "match" subject_expr ':' NEWLINE INDENT case_block+ DEDENT
            # Here we need to check `subject_expr`. The `case_block+` will be
            # checked by their own NEWLINEs.
            nodes_to_ignore: list[LN] = []
            prev_sibling = leaf.prev_sibling
            # Collect everything before the NEWLINE, keeping source order.
            while prev_sibling:
                nodes_to_ignore.insert(0, prev_sibling)
                prev_sibling = prev_sibling.prev_sibling
            if not _get_line_range(nodes_to_ignore).intersection(lines_set):
                _convert_nodes_to_standalone_comment(nodes_to_ignore, newline=leaf)
        elif leaf.parent and leaf.parent.type == syms.suite:
            # The `suite` node is defined as:
            #   suite: simple_stmt | NEWLINE INDENT stmt+ DEDENT
            # We will check `simple_stmt` and `stmt+` separately against the lines set
            parent_sibling = leaf.parent.prev_sibling
            nodes_to_ignore = []
            # Collect the header nodes preceding this suite, stopping at the
            # previous suite (if any).
            while parent_sibling and parent_sibling.type != syms.suite:
                # NOTE: Multiple suite nodes can exist as siblings in e.g. `if_stmt`.
                nodes_to_ignore.insert(0, parent_sibling)
                parent_sibling = parent_sibling.prev_sibling
            # Special case for `async_stmt` and `async_funcdef` where the ASYNC
            # token is on the grandparent node.
            grandparent = leaf.parent.parent
            if (
                grandparent is not None
                and grandparent.prev_sibling is not None
                and grandparent.prev_sibling.type == ASYNC
            ):
                nodes_to_ignore.insert(0, grandparent.prev_sibling)
            if not _get_line_range(nodes_to_ignore).intersection(lines_set):
                _convert_nodes_to_standalone_comment(nodes_to_ignore, newline=leaf)
        else:
            ancestor = furthest_ancestor_with_last_leaf(leaf)
            # Consider multiple decorators as a whole block, as their
            # newlines have different behaviors than the rest of the grammar.
            if (
                ancestor.type == syms.decorator
                and ancestor.parent
                and ancestor.parent.type == syms.decorators
            ):
                ancestor = ancestor.parent
            if not _get_line_range(ancestor).intersection(lines_set):
                _convert_node_to_standalone_comment(ancestor)

305 

306 

def _convert_node_to_standalone_comment(node: LN) -> None:
    """Convert node to STANDALONE_COMMENT by modifying the tree inline.

    The node is removed from its parent and a STANDALONE_COMMENT leaf holding
    the node's source text (minus its final character) is inserted at the
    same child index. The prefix of the node's first leaf is moved onto the
    new leaf.

    Nothing happens when the node has no parent, has no first or last leaf,
    or when its first and last leaf are the same object.

    Args:
        node: the node or leaf to replace.
    """
    parent = node.parent
    if not parent:
        return
    first = first_leaf(node)
    last = last_leaf(node)
    if not first or not last:
        return
    if first is last:
        # This can happen on the following edge cases:
        # 1. A block of `# fmt: off/on` code except the `# fmt: on` is placed
        #    on the end of the last line instead of on a new line.
        # 2. A single backslash on its own line followed by a comment line.
        # Ideally we don't want to format them when not requested, but fixing
        # isn't easy. These cases are also badly formatted code, so it isn't
        # too bad we reformat them.
        return
    # The prefix contains comments and indentation whitespaces. They are
    # reformatted accordingly to the correct indentation level.
    # This also means the indentation will be changed on the unchanged lines, and
    # this is actually required to not break incremental reformatting.
    prefix = first.prefix
    # Cleared before str(node) below so the prefix is not duplicated in value.
    first.prefix = ""
    index = node.remove()
    if index is not None:
        # Because of the special handling of multiple decorators, if the decorated
        # item is a single line then there will be a missing newline between the
        # decorator and item, so add it back. This doesn't affect any other case
        # since a decorated item with a newline would hit the earlier suite case
        # in _convert_unchanged_line_by_line that correctly handles the newlines.
        if node.type == syms.decorated:
            # A leaf of type decorated wouldn't make sense, since it should always
            # have at least the decorator + the decorated item, so if this assert
            # hits that means there's a problem in the parser.
            assert isinstance(node, Node)
            # 1 will always be the correct index since before this function is
            # called all the decorators are collapsed into a single leaf
            node.insert_child(1, Leaf(NEWLINE, "\n"))
        # Remove the '\n', as STANDALONE_COMMENT will have '\n' appended when
        # generating the formatted code.
        value = str(node)[:-1]
        parent.insert_child(
            index,
            Leaf(
                STANDALONE_COMMENT,
                value,
                prefix=prefix,
                fmt_pass_converted_first_leaf=first,
            ),
        )

358 

359 

def _convert_nodes_to_standalone_comment(nodes: Sequence[LN], *, newline: Leaf) -> None:
    """Convert nodes to STANDALONE_COMMENT by modifying the tree inline.

    All ``nodes`` are removed from the tree, and a single STANDALONE_COMMENT
    leaf carrying their concatenated source text is inserted where the first
    node used to be, under that first node's parent.

    Args:
        nodes: the nodes to collapse, in source order. Nothing happens when
            this is empty, or when the first node has no parent or no first
            leaf.
        newline: the NEWLINE leaf ending the statement. Its prefix is
            appended to the comment value and then cleared.
    """
    if not nodes:
        return
    head = nodes[0]
    parent = head.parent
    first = first_leaf(head)
    if not parent or not first:
        return

    # Move the leading prefix onto the replacement leaf; it has to be cleared
    # before the nodes are stringified so it is not repeated in the value.
    prefix = first.prefix
    first.prefix = ""
    pieces = [str(node) for node in nodes]
    # The prefix comment on the NEWLINE leaf is the trailing comment of the statement.
    if newline.prefix:
        pieces.append(newline.prefix)
        newline.prefix = ""
    value = "".join(pieces)

    index = head.remove()
    for other in nodes[1:]:
        other.remove()
    if index is None:
        return
    parent.insert_child(
        index,
        Leaf(
            STANDALONE_COMMENT,
            value,
            prefix=prefix,
            fmt_pass_converted_first_leaf=first,
        ),
    )

388 

389 

def _leaf_line_end(leaf: Leaf) -> int:
    """Return the number of the last line occupied by ``leaf``."""
    # A NEWLINE leaf ends on the line it starts on. Other leaves, such as
    # multiline strings, can occupy several lines, so count their newlines.
    extra_lines = 0 if leaf.type == NEWLINE else str(leaf).count("\n")
    return leaf.lineno + extra_lines

397 

398 

def _get_line_range(node_or_nodes: Union[LN, list[LN]]) -> set[int]:
    """Return the set of line numbers spanned by a node or a list of nodes.

    For a list, the span runs from the first leaf of the first element to
    the last leaf of the last element. An empty list, or a node whose first
    or last leaf cannot be found, gives an empty set.
    """
    if isinstance(node_or_nodes, Leaf):
        # A leaf is its own first and last leaf.
        return set(range(node_or_nodes.lineno, _leaf_line_end(node_or_nodes) + 1))

    if isinstance(node_or_nodes, list):
        if not node_or_nodes:
            return set()
        first = first_leaf(node_or_nodes[0])
        last = last_leaf(node_or_nodes[-1])
    else:
        first = first_leaf(node_or_nodes)
        last = last_leaf(node_or_nodes)

    if not (first and last):
        return set()
    return set(range(first.lineno, _leaf_line_end(last) + 1))

424 

425 

426@dataclass 

427class _LinesMapping: 

428 """1-based lines mapping from original source to modified source. 

429 

430 Lines [original_start, original_end] from original source 

431 are mapped to [modified_start, modified_end]. 

432 

433 The ranges are inclusive on both ends. 

434 """ 

435 

436 original_start: int 

437 original_end: int 

438 modified_start: int 

439 modified_end: int 

440 # Whether this range corresponds to a changed block, or an unchanged block. 

441 is_changed_block: bool 

442 

443 

444def _calculate_lines_mappings( 

445 original_source: str, 

446 modified_source: str, 

447) -> Sequence[_LinesMapping]: 

448 """Returns a sequence of _LinesMapping by diffing the sources. 

449 

450 For example, given the following diff: 

451 import re 

452 - def func(arg1, 

453 - arg2, arg3): 

454 + def func(arg1, arg2, arg3): 

455 pass 

456 It returns the following mappings: 

457 original -> modified 

458 (1, 1) -> (1, 1), is_changed_block=False (the "import re" line) 

459 (2, 3) -> (2, 2), is_changed_block=True (the diff) 

460 (4, 4) -> (3, 3), is_changed_block=False (the "pass" line) 

461 

462 You can think of this visually as if it brings up a side-by-side diff, and tries 

463 to map the line ranges from the left side to the right side: 

464 

465 (1, 1)->(1, 1) 1. import re 1. import re 

466 (2, 3)->(2, 2) 2. def func(arg1, 2. def func(arg1, arg2, arg3): 

467 3. arg2, arg3): 

468 (4, 4)->(3, 3) 4. pass 3. pass 

469 

470 Args: 

471 original_source: the original source. 

472 modified_source: the modified source. 

473 """ 

474 matcher = difflib.SequenceMatcher( 

475 None, 

476 original_source.splitlines(keepends=True), 

477 modified_source.splitlines(keepends=True), 

478 ) 

479 matching_blocks = matcher.get_matching_blocks() 

480 lines_mappings: list[_LinesMapping] = [] 

481 # matching_blocks is a sequence of "same block of code ranges", see 

482 # https://docs.python.org/3/library/difflib.html#difflib.SequenceMatcher.get_matching_blocks 

483 # Each block corresponds to a _LinesMapping with is_changed_block=False, 

484 # and the ranges between two blocks corresponds to a _LinesMapping with 

485 # is_changed_block=True, 

486 # NOTE: matching_blocks is 0-based, but _LinesMapping is 1-based. 

487 for i, block in enumerate(matching_blocks): 

488 if i == 0: 

489 if block.a != 0 or block.b != 0: 

490 lines_mappings.append( 

491 _LinesMapping( 

492 original_start=1, 

493 original_end=block.a, 

494 modified_start=1, 

495 modified_end=block.b, 

496 is_changed_block=False, 

497 ) 

498 ) 

499 else: 

500 previous_block = matching_blocks[i - 1] 

501 lines_mappings.append( 

502 _LinesMapping( 

503 original_start=previous_block.a + previous_block.size + 1, 

504 original_end=block.a, 

505 modified_start=previous_block.b + previous_block.size + 1, 

506 modified_end=block.b, 

507 is_changed_block=True, 

508 ) 

509 ) 

510 if i < len(matching_blocks) - 1: 

511 lines_mappings.append( 

512 _LinesMapping( 

513 original_start=block.a + 1, 

514 original_end=block.a + block.size, 

515 modified_start=block.b + 1, 

516 modified_end=block.b + block.size, 

517 is_changed_block=False, 

518 ) 

519 ) 

520 return lines_mappings 

521 

522 

523def _find_lines_mapping_index( 

524 original_line: int, 

525 lines_mappings: Sequence[_LinesMapping], 

526 start_index: int, 

527) -> int: 

528 """Returns the original index of the lines mappings for the original line.""" 

529 index = start_index 

530 while index < len(lines_mappings): 

531 mapping = lines_mappings[index] 

532 if mapping.original_start <= original_line <= mapping.original_end: 

533 return index 

534 index += 1 

535 return index