Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/black/ranges.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

210 statements  

1"""Functions related to Black's formatting by line ranges feature.""" 

2 

3import difflib 

4from collections.abc import Collection, Iterator, Sequence 

5from dataclasses import dataclass 

6from typing import Union 

7 

8from black.nodes import ( 

9 LN, 

10 STANDALONE_COMMENT, 

11 Leaf, 

12 Node, 

13 Visitor, 

14 first_leaf, 

15 furthest_ancestor_with_last_leaf, 

16 last_leaf, 

17 syms, 

18) 

19from blib2to3.pgen2.token import ASYNC, NEWLINE 

20 

21 

def parse_line_ranges(line_ranges: Sequence[str]) -> list[tuple[int, int]]:
    """Parses `START-END` strings into `(start, end)` integer tuples.

    Args:
      line_ranges: the raw range strings, one `START-END` entry each.

    Returns:
      One `(start, end)` tuple per entry, in input order. No ordering check
      between start and end is performed here.

    Raises:
      ValueError: if an entry does not split into exactly two parts on "-",
        or if either part cannot be converted to an integer.
    """
    parsed: list[tuple[int, int]] = []
    for spec in line_ranges:
        pieces = spec.split("-")
        if len(pieces) != 2:
            raise ValueError(
                "Incorrect --line-ranges format, expect 'START-END', found"
                f" {spec!r}"
            )
        try:
            bounds = (int(pieces[0]), int(pieces[1]))
        except ValueError:
            # Drop the int() traceback; the message below is self-sufficient.
            raise ValueError(
                "Incorrect --line-ranges value, expect integer ranges, found"
                f" {spec!r}"
            ) from None
        parsed.append(bounds)
    return parsed

42 

43 

def is_valid_line_range(lines: tuple[int, int]) -> bool:
    """Returns whether the line range is valid.

    An empty (falsy) range counts as valid; otherwise the start must not be
    greater than the end.
    """
    if not lines:
        return True
    return lines[0] <= lines[1]

47 

48 

def sanitized_lines(
    lines: Collection[tuple[int, int]], src_contents: str
) -> Collection[tuple[int, int]]:
    """Returns the valid line ranges for the given source.

    Ranges lying entirely outside the source are dropped. The remaining ones
    are clamped so that every start is at least 1 and every end is at most the
    (1-based) number of the last source line.

    Args:
      lines: the requested line ranges.
      src_contents: the source text the ranges refer to.

    Returns:
      The surviving, clamped ranges in input order; an empty list when the
      source is empty.
    """
    if not src_contents:
        return []
    # A last line without a trailing newline still counts as a line.
    total_lines = src_contents.count("\n") + (
        0 if src_contents.endswith("\n") else 1
    )
    kept = []
    for start, end in lines:
        if start > total_lines:
            continue
        # line-ranges are 1-based
        clamped_start = max(start, 1)
        if end < clamped_start:
            continue
        kept.append((clamped_start, min(end, total_lines)))
    return kept

75 

76 

def adjusted_lines(
    lines: Collection[tuple[int, int]],
    original_source: str,
    modified_source: str,
) -> list[tuple[int, int]]:
    """Returns the adjusted line ranges based on edits from the original code.

    This computes the new line ranges by diffing original_source and
    modified_source, and adjusts each range based on how the range overlaps
    with the diffs.

    Note the diff can contain lines outside of the original line ranges. This
    can happen when adjacent lines have to be reformatted as well to keep the
    local result consistent. For example:

    1. def my_func(arg1, arg2,
    2.             arg3,):
    3.   pass

    If formatting is restricted to line 2-2, line 2 can't simply be reformatted
    on its own; line 1 has to be reformatted too:

    1. def my_func(
    2.     arg1,
    3.     arg2,
    4.     arg3,
    5. ):
    6.   pass

    In this case, the line ranges are expanded to also include the whole diff
    block.

    Ranges whose start or end line cannot be located in the original source
    are skipped. Skipping such a range does not affect the lookup of the
    ranges that follow it.

    Args:
      lines: a collection of line ranges.
      original_source: the original source.
      modified_source: the modified source.

    Returns:
      The adjusted ranges, ordered by the sorted order of `lines`. Ranges that
      end up with a start greater than their end are omitted.
    """
    lines_mappings = _calculate_lines_mappings(original_source, modified_source)
    mapping_count = len(lines_mappings)

    new_lines = []
    # Keep an index of the current search. Since the lines and lines_mappings
    # are sorted, this makes the search complexity linear.
    current_mapping_index = 0
    for start, end in sorted(lines):
        start_mapping_index = _find_lines_mapping_index(
            start,
            lines_mappings,
            current_mapping_index,
        )
        if start_mapping_index >= mapping_count:
            # Protect against invalid inputs. The search cursor is deliberately
            # NOT advanced here: a "not found" result equals the end of the
            # mappings, and storing it would make every later range
            # unfindable.
            continue
        current_mapping_index = start_mapping_index
        end_mapping_index = _find_lines_mapping_index(
            end,
            lines_mappings,
            start_mapping_index,
        )
        if end_mapping_index >= mapping_count:
            # Protect against invalid inputs.
            continue
        start_mapping = lines_mappings[start_mapping_index]
        end_mapping = lines_mappings[end_mapping_index]
        if start_mapping.is_changed_block:
            # When the line falls into a changed block, expands to the whole block.
            new_start = start_mapping.modified_start
        else:
            new_start = (
                start - start_mapping.original_start + start_mapping.modified_start
            )
        if end_mapping.is_changed_block:
            # When the line falls into a changed block, expands to the whole block.
            new_end = end_mapping.modified_end
        else:
            new_end = end - end_mapping.original_start + end_mapping.modified_start
        new_range = (new_start, new_end)
        if is_valid_line_range(new_range):
            new_lines.append(new_range)
    return new_lines

155 

156 

def convert_unchanged_lines(src_node: Node, lines: Collection[tuple[int, int]]) -> None:
    """Converts unchanged lines to STANDALONE_COMMENT.

    The idea is similar to how `# fmt: on/off` is implemented: the nodes that
    must stay untouched are replaced by a single `STANDALONE_COMMENT` leaf
    holding the unformatted code as its value. `STANDALONE_COMMENT` is a "fake"
    token that is emitted as-is, with only its prefix normalized.

    Two passes are performed over `src_node`, which is modified in place:

    1. Top-level statements are visited and each one that does not touch any
       requested line is converted to a single `STANDALONE_COMMENT`. This
       speeds up formatting when some top-level statements aren't changed.
    2. Unchanged "unwrapped lines" are converted to `STANDALONE_COMMENT` one by
       one. "Unwrapped lines" are delimited by the `NEWLINE` token, e.g. a
       statement spanning several physical lines is *one* unwrapped line,
       because the tokenizer only reports its final line break as `NEWLINE`.

    NOTE: During pass (2), comment prefixes and indentations are ALWAYS
    normalized even when the lines aren't changed. This is fixable by moving
    more formatting to pass (1). However, it's hard to get it correct when
    incorrect indentations are used, so this is deferred to future
    optimizations.

    Args:
      src_node: root of the tree to rewrite in place.
      lines: the inclusive line ranges that should still be formatted.
    """
    lines_set: set[int] = {
        lineno for start, end in lines for lineno in range(start, end + 1)
    }
    # The visitor is lazy; drain it so that every node actually gets visited.
    for _ in _TopLevelStatementsVisitor(lines_set).visit(src_node):
        pass
    _convert_unchanged_line_by_line(src_node, lines_set)

187 

188 

def _contains_standalone_comment(node: LN) -> bool:
    """Returns whether `node` is, or contains, a STANDALONE_COMMENT leaf."""
    if not isinstance(node, Leaf):
        # Depth-first search through the children; stops at the first hit.
        return any(_contains_standalone_comment(child) for child in node.children)
    return node.type == STANDALONE_COMMENT

197 

198 

class _TopLevelStatementsVisitor(Visitor[None]):
    """
    A node visitor that converts unchanged top-level statements to
    STANDALONE_COMMENT.

    This complements _convert_unchanged_line_by_line and exists to speed up
    formatting when there are unchanged top-level
    classes/functions/statements.
    """

    def __init__(self, lines_set: set[int]):
        # Line numbers that were requested for formatting.
        self._lines_set = lines_set

    def visit_simple_stmt(self, node: Node) -> Iterator[None]:
        """Converts a top-level simple statement that touches no requested line."""
        # This is only called for top-level statements, since `visit_suite`
        # won't visit its children nodes.
        # The empty `yield from` makes this method a generator that yields
        # nothing.
        yield from ()
        trailing = last_leaf(node)
        if not trailing:
            return
        assert (
            trailing.type == NEWLINE
        ), f"Unexpectedly found leaf.type={trailing.type}"
        # We need to find the furthest ancestor with the NEWLINE as the last
        # leaf, since a `suite` can simply be a `simple_stmt` when it puts
        # its body on the same line. Example: `if cond: pass`.
        target = furthest_ancestor_with_last_leaf(trailing)
        if _get_line_range(target).isdisjoint(self._lines_set):
            _convert_node_to_standalone_comment(target)

    def visit_suite(self, node: Node) -> Iterator[None]:
        """Converts the statement owning this suite if it touches no requested line."""
        yield from ()
        # If there is a STANDALONE_COMMENT node, it means parts of the node tree
        # have fmt on/off/skip markers. Those STANDALONE_COMMENT nodes can't
        # be simply converted by calling str(node). So we just don't convert
        # here.
        if _contains_standalone_comment(node):
            return
        # Find the semantic parent of this suite. For `async_stmt` and
        # `async_funcdef`, the ASYNC token is defined on a separate level by the
        # grammar, so step one level further up in that case.
        target = node.parent
        if target is None:
            return
        preceding = target.prev_sibling
        if preceding is not None and preceding.type == ASYNC:
            target = target.parent
        if target is not None and _get_line_range(target).isdisjoint(
            self._lines_set
        ):
            _convert_node_to_standalone_comment(target)

251 

252 

def _convert_unchanged_line_by_line(node: Node, lines_set: set[int]) -> None:
    """Converts unchanged to STANDALONE_COMMENT line by line.

    Every NEWLINE leaf under `node` ends one "unwrapped line". The nodes making
    up that unwrapped line are replaced by a STANDALONE_COMMENT when none of
    their lines is in `lines_set`.
    """
    for leaf in node.leaves():
        if leaf.type != NEWLINE:
            # We only consider "unwrapped lines", which are divided by the NEWLINE
            # token.
            continue
        owner = leaf.parent
        if owner and owner.type == syms.match_stmt:
            # The `match_stmt` node is defined as:
            #   match_stmt: "match" subject_expr ':' NEWLINE INDENT case_block+ DEDENT
            # Here we need to check `subject_expr`. The `case_block+` will be
            # checked by their own NEWLINEs.
            header: list[LN] = []
            cursor = leaf.prev_sibling
            while cursor:
                header.append(cursor)
                cursor = cursor.prev_sibling
            # Collected right-to-left; restore source order.
            header.reverse()
            if _get_line_range(header).isdisjoint(lines_set):
                _convert_nodes_to_standalone_comment(header, newline=leaf)
        elif owner and owner.type == syms.suite:
            # The `suite` node is defined as:
            #   suite: simple_stmt | NEWLINE INDENT stmt+ DEDENT
            # We will check `simple_stmt` and `stmt+` separately against the lines set
            header = []
            cursor = owner.prev_sibling
            # NOTE: Multiple suite nodes can exist as siblings in e.g. `if_stmt`,
            # so stop at the previous suite.
            while cursor and cursor.type != syms.suite:
                header.append(cursor)
                cursor = cursor.prev_sibling
            header.reverse()
            # Special case for `async_stmt` and `async_funcdef` where the ASYNC
            # token is on the grandparent node.
            outer = owner.parent
            if (
                outer is not None
                and outer.prev_sibling is not None
                and outer.prev_sibling.type == ASYNC
            ):
                header.insert(0, outer.prev_sibling)
            if _get_line_range(header).isdisjoint(lines_set):
                _convert_nodes_to_standalone_comment(header, newline=leaf)
        else:
            target = furthest_ancestor_with_last_leaf(leaf)
            # Consider multiple decorators as a whole block, as their
            # newlines have different behaviors than the rest of the grammar.
            if (
                target.type == syms.decorator
                and target.parent
                and target.parent.type == syms.decorators
            ):
                target = target.parent
            if _get_line_range(target).isdisjoint(lines_set):
                _convert_node_to_standalone_comment(target)

305 

306 

def _convert_node_to_standalone_comment(node: LN) -> None:
    """Convert node to STANDALONE_COMMENT by modifying the tree inline.

    Nothing happens when `node` has no parent, has no first/last leaf, or
    consists of a single leaf.
    """
    parent = node.parent
    if not parent:
        return
    first = first_leaf(node)
    last = last_leaf(node)
    # `first is last` can happen on the following edge cases:
    # 1. A block of `# fmt: off/on` code except the `# fmt: on` is placed
    #    on the end of the last line instead of on a new line.
    # 2. A single backslash on its own line followed by a comment line.
    # Ideally we don't want to format them when not requested, but fixing
    # isn't easy. These cases are also badly formatted code, so it isn't
    # too bad we reformat them.
    if not first or not last or first is last:
        return
    # The prefix contains comments and indentation whitespaces. They are
    # reformatted accordingly to the correct indentation level.
    # This also means the indentation will be changed on the unchanged lines, and
    # this is actually required to not break incremental reformatting.
    prefix = first.prefix
    first.prefix = ""
    index = node.remove()
    if index is None:
        return
    # Remove the trailing newline character, as STANDALONE_COMMENT will have one
    # appended when generating the formatted code.
    value = str(node)[:-1]
    replacement = Leaf(
        STANDALONE_COMMENT,
        value,
        prefix=prefix,
        fmt_pass_converted_first_leaf=first,
    )
    parent.insert_child(index, replacement)

345 

346 

def _convert_nodes_to_standalone_comment(nodes: Sequence[LN], *, newline: Leaf) -> None:
    """Convert nodes to STANDALONE_COMMENT by modifying the tree inline.

    Args:
      nodes: the sibling nodes to collapse into one STANDALONE_COMMENT leaf,
        which is inserted where the first of them used to be.
      newline: the NEWLINE leaf ending the statement; its prefix is moved into
        the new leaf's value and then cleared.
    """
    if not nodes:
        return
    head = nodes[0]
    parent = head.parent
    first = first_leaf(head)
    if not parent or not first:
        return
    # Detach the leading prefix first so that it is not part of the rendered
    # text; it becomes the prefix of the replacement leaf instead.
    prefix = first.prefix
    first.prefix = ""
    pieces = [str(node) for node in nodes]
    # The prefix comment on the NEWLINE leaf is the trailing comment of the statement.
    if newline.prefix:
        pieces.append(newline.prefix)
        newline.prefix = ""
    value = "".join(pieces)
    index = head.remove()
    for extra in nodes[1:]:
        extra.remove()
    if index is None:
        return
    replacement = Leaf(
        STANDALONE_COMMENT,
        value,
        prefix=prefix,
        fmt_pass_converted_first_leaf=first,
    )
    parent.insert_child(index, replacement)

375 

376 

def _leaf_line_end(leaf: Leaf) -> int:
    """Returns the line number of the leaf node's last line."""
    # A NEWLINE leaf ends on the line it starts on. Other leaves, like
    # multiline strings, can occupy multiple lines, so count their line breaks.
    extra_lines = 0 if leaf.type == NEWLINE else str(leaf).count("\n")
    return leaf.lineno + extra_lines

384 

385 

def _get_line_range(node_or_nodes: Union[LN, list[LN]]) -> set[int]:
    """Returns the line range of this node or list of nodes.

    The result is the set of all line numbers from the first leaf's line up to
    and including the last leaf's final line. It is empty for an empty list or
    when no first/last leaf can be found.
    """
    if isinstance(node_or_nodes, list):
        if not node_or_nodes:
            return set()
        first = first_leaf(node_or_nodes[0])
        last = last_leaf(node_or_nodes[-1])
    elif isinstance(node_or_nodes, Leaf):
        # A single leaf is its own first and last leaf.
        return set(range(node_or_nodes.lineno, _leaf_line_end(node_or_nodes) + 1))
    else:
        first = first_leaf(node_or_nodes)
        last = last_leaf(node_or_nodes)
    if not (first and last):
        return set()
    return set(range(first.lineno, _leaf_line_end(last) + 1))

411 

412 

@dataclass
class _LinesMapping:
    """1-based lines mapping from original source to modified source.

    Lines [original_start, original_end] from original source
    are mapped to [modified_start, modified_end].

    The ranges are inclusive on both ends. A range whose end is smaller than
    its start is empty (e.g. the original side of a pure insertion).
    """

    original_start: int
    original_end: int
    modified_start: int
    modified_end: int
    # Whether this range corresponds to a changed block, or an unchanged block.
    is_changed_block: bool


def _calculate_lines_mappings(
    original_source: str,
    modified_source: str,
) -> Sequence[_LinesMapping]:
    """Returns a sequence of _LinesMapping by diffing the sources.

    For example, given the following diff:
        import re
      - def func(arg1,
      -   arg2, arg3):
      + def func(arg1, arg2, arg3):
          pass
    It returns the following mappings:
      original -> modified
       (1, 1)  ->  (1, 1), is_changed_block=False (the "import re" line)
       (2, 3)  ->  (2, 2), is_changed_block=True (the diff)
       (4, 4)  ->  (3, 3), is_changed_block=False (the "pass" line)
    followed by one trailing changed-block mapping for whatever lies between
    the last matching block and the end of both sources (empty in this
    example).

    You can think of this visually as if it brings up a side-by-side diff, and
    tries to map the line ranges from the left side to the right side:

      (1, 1)->(1, 1)    1. import re          1. import re
      (2, 3)->(2, 2)    2. def func(arg1,     2. def func(arg1, arg2, arg3):
                        3.   arg2, arg3):
      (4, 4)->(3, 3)    4.   pass             3.   pass

    Args:
      original_source: the original source.
      modified_source: the modified source.

    Returns:
      The mappings, ordered by position in the original source.
    """
    matcher = difflib.SequenceMatcher(
        None,
        original_source.splitlines(keepends=True),
        modified_source.splitlines(keepends=True),
    )
    matching_blocks = matcher.get_matching_blocks()
    lines_mappings: list[_LinesMapping] = []
    # matching_blocks is a sequence of "same block of code ranges", see
    # https://docs.python.org/3/library/difflib.html#difflib.SequenceMatcher.get_matching_blocks
    # Each block corresponds to a _LinesMapping with is_changed_block=False,
    # and the ranges between two blocks corresponds to a _LinesMapping with
    # is_changed_block=True,
    # NOTE: matching_blocks is 0-based, but _LinesMapping is 1-based.
    for i, block in enumerate(matching_blocks):
        if i == 0:
            if block.a != 0 or block.b != 0:
                # Everything before the first matching block differs between
                # the two sources, so it is a changed block like any other
                # gap between matching blocks.
                lines_mappings.append(
                    _LinesMapping(
                        original_start=1,
                        original_end=block.a,
                        modified_start=1,
                        modified_end=block.b,
                        is_changed_block=True,
                    )
                )
        else:
            previous_block = matching_blocks[i - 1]
            lines_mappings.append(
                _LinesMapping(
                    original_start=previous_block.a + previous_block.size + 1,
                    original_end=block.a,
                    modified_start=previous_block.b + previous_block.size + 1,
                    modified_end=block.b,
                    is_changed_block=True,
                )
            )
        # The last matching block is a zero-size sentinel and maps no lines.
        if i < len(matching_blocks) - 1:
            lines_mappings.append(
                _LinesMapping(
                    original_start=block.a + 1,
                    original_end=block.a + block.size,
                    modified_start=block.b + 1,
                    modified_end=block.b + block.size,
                    is_changed_block=False,
                )
            )
    return lines_mappings

508 

509 

def _find_lines_mapping_index(
    original_line: int,
    lines_mappings: Sequence[_LinesMapping],
    start_index: int,
) -> int:
    """Returns the index of the lines mapping that contains the original line.

    The search scans forward from `start_index`. When no mapping contains the
    line, the returned index is past the end of `lines_mappings` (or
    `start_index` itself if that already was), which callers use as the
    "not found" signal.
    """
    for index in range(start_index, len(lines_mappings)):
        candidate = lines_mappings[index]
        if candidate.original_start <= original_line <= candidate.original_end:
            return index
    return max(start_index, len(lines_mappings))