Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/diff_tree.py: 20%

425 statements  

1# diff_tree.py -- Utilities for diffing files and trees. 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Utilities for diffing files and trees.""" 

23 

24import stat 

25from collections import defaultdict 

26from collections.abc import Iterator 

27from io import BytesIO 

28from itertools import chain 

29from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Optional, TypeVar 

30 

31from .object_store import BaseObjectStore 

32from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry 

33 

34# TreeChange type constants. 

35CHANGE_ADD = "add" 

36CHANGE_MODIFY = "modify" 

37CHANGE_DELETE = "delete" 

38CHANGE_RENAME = "rename" 

39CHANGE_COPY = "copy" 

40CHANGE_UNCHANGED = "unchanged" 

41 

42RENAME_CHANGE_TYPES = (CHANGE_RENAME, CHANGE_COPY) 

43 

44# _NULL_ENTRY removed - using None instead 

45 

46_MAX_SCORE = 100 

47RENAME_THRESHOLD = 60 

48MAX_FILES = 200 

49REWRITE_THRESHOLD: Optional[int] = None 

50 

51 

52class TreeChange(NamedTuple): 

53 """Named tuple a single change between two trees.""" 

54 

55 type: str 

56 old: Optional[TreeEntry] 

57 new: Optional[TreeEntry] 

58 

59 @classmethod 

60 def add(cls, new: TreeEntry) -> "TreeChange": 

61 """Create a TreeChange for an added entry. 

62 

63 Args: 

64 new: New tree entry 

65 

66 Returns: 

67 TreeChange instance 

68 """ 

69 return cls(CHANGE_ADD, None, new) 

70 

71 @classmethod 

72 def delete(cls, old: TreeEntry) -> "TreeChange": 

73 """Create a TreeChange for a deleted entry. 

74 

75 Args: 

76 old: Old tree entry 

77 

78 Returns: 

79 TreeChange instance 

80 """ 

81 return cls(CHANGE_DELETE, old, None) 

82 

83 
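# Illustrative sketch, not part of the module: building TreeChange values with
# the add()/delete() classmethods defined above. The path, mode, and SHA are
# made-up example data, and _example_tree_change() is a hypothetical helper.
def _example_tree_change() -> None:
    from dulwich.objects import TreeEntry

    entry = TreeEntry(b"docs/readme.txt", 0o100644, b"a" * 40)
    added = TreeChange.add(entry)       # TreeChange("add", None, entry)
    removed = TreeChange.delete(entry)  # TreeChange("delete", entry, None)
    assert added.type == CHANGE_ADD and removed.old is entry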

84def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]: 

85 result: list[TreeEntry] = [] 

86 if not tree: 

87 return result 

88 for entry in tree.iteritems(name_order=True): 

89 result.append(entry.in_path(path)) 

90 return result 

91 

92 

93def _merge_entries( 

94 path: bytes, tree1: Tree, tree2: Tree 

95) -> list[tuple[Optional[TreeEntry], Optional[TreeEntry]]]: 

96 """Merge the entries of two trees. 

97 

98 Args: 

99 path: A path to prepend to all tree entry names. 

100 tree1: The first Tree object to iterate, or None. 

101 tree2: The second Tree object to iterate, or None. 

102 

103 Returns: 

104 A list of pairs of TreeEntry objects for each pair of entries in 

105 the trees. If an entry exists in one tree but not the other, the other 

106 entry will be None. If both entries exist, they are guaranteed to match. 

107 """ 

108 entries1 = _tree_entries(path, tree1) 

109 entries2 = _tree_entries(path, tree2) 

110 i1 = i2 = 0 

111 len1 = len(entries1) 

112 len2 = len(entries2) 

113 

114 result: list[tuple[Optional[TreeEntry], Optional[TreeEntry]]] = [] 

115 while i1 < len1 and i2 < len2: 

116 entry1 = entries1[i1] 

117 entry2 = entries2[i2] 

118 if entry1.path < entry2.path: 

119 result.append((entry1, None)) 

120 i1 += 1 

121 elif entry1.path > entry2.path: 

122 result.append((None, entry2)) 

123 i2 += 1 

124 else: 

125 result.append((entry1, entry2)) 

126 i1 += 1 

127 i2 += 1 

128 for i in range(i1, len1): 

129 result.append((entries1[i], None)) 

130 for i in range(i2, len2): 

131 result.append((None, entries2[i])) 

132 return result 

133 

134 
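# Illustrative sketch, not part of the module: _merge_entries pairs up the
# name-sorted entries of two trees. The blob contents and file names are
# made-up example data, and _example_merge_entries() is a hypothetical helper.
def _example_merge_entries() -> None:
    from dulwich.objects import Blob, Tree

    old_blob, new_blob = Blob.from_string(b"old\n"), Blob.from_string(b"new\n")
    tree1, tree2 = Tree(), Tree()
    tree1.add(b"a.txt", 0o100644, old_blob.id)  # present in both trees
    tree2.add(b"a.txt", 0o100644, new_blob.id)  # ...but modified in tree2
    tree2.add(b"b.txt", 0o100644, new_blob.id)  # only in tree2
    pairs = _merge_entries(b"", tree1, tree2)
    # [(a.txt in tree1, a.txt in tree2), (None, b.txt in tree2)]
    assert len(pairs) == 2 and pairs[1][0] is None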

135def _is_tree(entry: Optional[TreeEntry]) -> bool: 

136 if entry is None: 

137 return False 

138 return stat.S_ISDIR(entry.mode) 

139 

140 

141def walk_trees( 

142 store: BaseObjectStore, 

143 tree1_id: Optional[ObjectID], 

144 tree2_id: Optional[ObjectID], 

145 prune_identical: bool = False, 

146 paths: Optional[list[bytes]] = None, 

147) -> Iterator[tuple[Optional[TreeEntry], Optional[TreeEntry]]]: 

148 """Recursively walk all the entries of two trees. 

149 

150 Iteration is depth-first pre-order, as in e.g. os.walk. 

151 

152 Args: 

153 store: An ObjectStore for looking up objects. 

154 tree1_id: The SHA of the first Tree object to iterate, or None. 

155 tree2_id: The SHA of the second Tree object to iterate, or None. 

156 prune_identical: If True, identical subtrees will not be walked. 

157 paths: Optional list of paths to filter to (as bytes). 

158 

159 Returns: 

160 Iterator over pairs of TreeEntry objects for each pair of entries 

161 in the trees and their subtrees recursively. If an entry exists in one 

162 tree but not the other, the other entry will be None. If both entries 

163 exist, they are guaranteed to match. 

164 """ 

165 # This could be fairly easily generalized to >2 trees if we find a use 

166 # case. 

167 entry1 = TreeEntry(b"", stat.S_IFDIR, tree1_id) if tree1_id else None 

168 entry2 = TreeEntry(b"", stat.S_IFDIR, tree2_id) if tree2_id else None 

169 todo: list[tuple[Optional[TreeEntry], Optional[TreeEntry]]] = [(entry1, entry2)] 

170 while todo: 

171 entry1, entry2 = todo.pop() 

172 is_tree1 = _is_tree(entry1) 

173 is_tree2 = _is_tree(entry2) 

174 if prune_identical and is_tree1 and is_tree2 and entry1 == entry2: 

175 continue 

176 

177 tree1 = (is_tree1 and entry1 and store[entry1.sha]) or None 

178 tree2 = (is_tree2 and entry2 and store[entry2.sha]) or None 

179 path = ( 

180 (entry1.path if entry1 else None) 

181 or (entry2.path if entry2 else None) 

182 or b"" 

183 ) 

184 

185 # If we have path filters, check if we should process this tree 

186 if paths is not None and (is_tree1 or is_tree2): 

187 # Special case for root tree 

188 if path == b"": 

189 should_recurse = True 

190 else: 

191 # Check if any of our filter paths could be under this tree 

192 should_recurse = False 

193 for filter_path in paths: 

194 if filter_path == path: 

195 # Exact match - we want this directory itself 

196 should_recurse = True 

197 break 

198 elif filter_path.startswith(path + b"/"): 

199 # Filter path is under this directory 

200 should_recurse = True 

201 break 

202 elif path.startswith(filter_path + b"/"): 

203 # This directory is under a filter path 

204 should_recurse = True 

205 break 

206 if not should_recurse: 

207 # Skip this tree entirely 

208 continue 

209 

210 # Ensure trees are Tree objects before merging 

211 if tree1 is not None and not isinstance(tree1, Tree): 

212 tree1 = None 

213 if tree2 is not None and not isinstance(tree2, Tree): 

214 tree2 = None 

215 

216 if tree1 is not None or tree2 is not None: 

217 # Use empty trees for None values 

218 if tree1 is None: 

219 tree1 = Tree() 

220 if tree2 is None: 

221 tree2 = Tree() 

222 todo.extend(reversed(_merge_entries(path, tree1, tree2))) 

223 

224 # Only yield entries that match our path filters 

225 if paths is None: 

226 yield entry1, entry2 

227 else: 

228 # Check if this entry matches any of our filters 

229 for filter_path in paths: 

230 if path == filter_path: 

231 # Exact match 

232 yield entry1, entry2 

233 break 

234 elif path.startswith(filter_path + b"/"): 

235 # This entry is under a filter directory 

236 yield entry1, entry2 

237 break 

238 elif filter_path.startswith(path + b"/") and (is_tree1 or is_tree2): 

239 # This is a parent directory of a filter path 

240 yield entry1, entry2 

241 break 

242 

243 
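# Illustrative sketch, not part of the module: walking two trees held in an
# in-memory object store. The store contents are made-up example data, and
# _example_walk_trees() is a hypothetical helper.
def _example_walk_trees() -> None:
    from dulwich.object_store import MemoryObjectStore
    from dulwich.objects import Blob, Tree

    store = MemoryObjectStore()
    blob = Blob.from_string(b"hello\n")
    tree1, tree2 = Tree(), Tree()
    tree2.add(b"hello.txt", 0o100644, blob.id)
    for obj in (blob, tree1, tree2):
        store.add_object(obj)

    for entry1, entry2 in walk_trees(store, tree1.id, tree2.id, prune_identical=True):
        # The root pair comes first; entry1 is None for paths only in tree2.
        print(entry1, entry2)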

244def _skip_tree(entry: Optional[TreeEntry], include_trees: bool) -> Optional[TreeEntry]: 

245 if entry is None or (not include_trees and stat.S_ISDIR(entry.mode)): 

246 return None 

247 return entry 

248 

249 

250def tree_changes( 

251 store: BaseObjectStore, 

252 tree1_id: Optional[ObjectID], 

253 tree2_id: Optional[ObjectID], 

254 want_unchanged: bool = False, 

255 rename_detector: Optional["RenameDetector"] = None, 

256 include_trees: bool = False, 

257 change_type_same: bool = False, 

258 paths: Optional[list[bytes]] = None, 

259) -> Iterator[TreeChange]: 

260 """Find the differences between the contents of two trees. 

261 

262 Args: 

263 store: An ObjectStore for looking up objects. 

264 tree1_id: The SHA of the source tree. 

265 tree2_id: The SHA of the target tree. 

266 want_unchanged: If True, include TreeChanges for unmodified entries 

267 as well. 

268 include_trees: Whether to include entries for trees (directories) in the output. 

269 rename_detector: RenameDetector object for detecting renames. 

270 change_type_same: If True, report a change of file type as a single 

271 modify entry instead of as a delete plus an add. 

272 paths: Optional list of paths to filter to (as bytes). 

273 

274 Returns: 

275 Iterator over TreeChange instances for each change between the 

276 source and target tree. 

277 """ 

278 if rename_detector is not None and tree1_id is not None and tree2_id is not None: 

279 yield from rename_detector.changes_with_renames( 

280 tree1_id, 

281 tree2_id, 

282 want_unchanged=want_unchanged, 

283 include_trees=include_trees, 

284 ) 

285 return 

286 

287 entries = walk_trees( 

288 store, tree1_id, tree2_id, prune_identical=(not want_unchanged), paths=paths 

289 ) 

290 for entry1, entry2 in entries: 

291 if entry1 == entry2 and not want_unchanged: 

292 continue 

293 

294 # Treat entries for trees as missing. 

295 entry1 = _skip_tree(entry1, include_trees) 

296 entry2 = _skip_tree(entry2, include_trees) 

297 

298 if entry1 is not None and entry2 is not None: 

299 if ( 

300 stat.S_IFMT(entry1.mode) != stat.S_IFMT(entry2.mode) 

301 and not change_type_same 

302 ): 

303 # File type changed: report as delete/add. 

304 yield TreeChange.delete(entry1) 

305 entry1 = None 

306 change_type = CHANGE_ADD 

307 elif entry1 == entry2: 

308 change_type = CHANGE_UNCHANGED 

309 else: 

310 change_type = CHANGE_MODIFY 

311 elif entry1 is not None: 

312 change_type = CHANGE_DELETE 

313 elif entry2 is not None: 

314 change_type = CHANGE_ADD 

315 else: 

316 # Both were None because at least one was a tree. 

317 continue 

318 yield TreeChange(change_type, entry1, entry2) 

319 

320 
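# Illustrative sketch, not part of the module: diffing two trees with
# tree_changes(). The file contents and names are made-up example data, and
# _example_tree_changes() is a hypothetical helper.
def _example_tree_changes() -> None:
    from dulwich.object_store import MemoryObjectStore
    from dulwich.objects import Blob, Tree

    store = MemoryObjectStore()
    old_blob = Blob.from_string(b"version 1\n")
    new_blob = Blob.from_string(b"version 2\n")
    tree1, tree2 = Tree(), Tree()
    tree1.add(b"file.txt", 0o100644, old_blob.id)
    tree2.add(b"file.txt", 0o100644, new_blob.id)
    tree2.add(b"added.txt", 0o100644, new_blob.id)
    for obj in (old_blob, new_blob, tree1, tree2):
        store.add_object(obj)

    for change in tree_changes(store, tree1.id, tree2.id):
        # Expect an "add" for added.txt and a "modify" for file.txt.
        print(change.type, change.old, change.new)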

321T = TypeVar("T") 

322U = TypeVar("U") 

323 

324 

325def _all_eq(seq: list[T], key: Callable[[T], U], value: U) -> bool: 

326 for e in seq: 

327 if key(e) != value: 

328 return False 

329 return True 

330 

331 

332def _all_same(seq: list[Any], key: Callable[[Any], Any]) -> bool: 

333 return _all_eq(seq[1:], key, key(seq[0])) 

334 

335 

336def tree_changes_for_merge( 

337 store: BaseObjectStore, 

338 parent_tree_ids: list[ObjectID], 

339 tree_id: ObjectID, 

340 rename_detector: Optional["RenameDetector"] = None, 

341) -> Iterator[list[Optional[TreeChange]]]: 

342 """Get the tree changes for a merge tree relative to all its parents. 

343 

344 Args: 

345 store: An ObjectStore for looking up objects. 

346 parent_tree_ids: An iterable of the SHAs of the parent trees. 

347 tree_id: The SHA of the merge tree. 

348 rename_detector: RenameDetector object for detecting renames. 

349 

350 Returns: 

351 Iterator over lists of TreeChange objects, one per conflicted path 

352 in the merge. 

353 

354 Each list contains one element per parent, with the TreeChange for that 

355 path relative to that parent. An element may be None if it never 

356 existed in one parent and was deleted in two others. 

357 

358 A path is only included in the output if it is a conflict, i.e. its SHA 

359 in the merge tree is not found in any of the parents, or in the case of 

360 deletes, if not all of the old SHAs match. 

361 """ 

362 all_parent_changes = [ 

363 tree_changes(store, t, tree_id, rename_detector=rename_detector) 

364 for t in parent_tree_ids 

365 ] 

366 num_parents = len(parent_tree_ids) 

367 changes_by_path: dict[bytes, list[Optional[TreeChange]]] = defaultdict( 

368 lambda: [None] * num_parents 

369 ) 

370 

371 # Organize by path. 

372 for i, parent_changes in enumerate(all_parent_changes): 

373 for change in parent_changes: 

374 if change.type == CHANGE_DELETE: 

375 assert change.old is not None 

376 path = change.old.path 

377 else: 

378 assert change.new is not None 

379 path = change.new.path 

380 changes_by_path[path][i] = change 

381 

382 def old_sha(c: TreeChange) -> Optional[ObjectID]: 

383 return c.old.sha if c.old is not None else None 

384 

385 def change_type(c: TreeChange) -> str: 

386 return c.type 

387 

388 # Yield only conflicting changes. 

389 for _, changes in sorted(changes_by_path.items()): 

390 assert len(changes) == num_parents 

391 have = [c for c in changes if c is not None] 

392 if _all_eq(have, change_type, CHANGE_DELETE): 

393 if not _all_same(have, old_sha): 

394 yield changes 

395 elif not _all_same(have, change_type): 

396 yield changes 

397 elif None not in changes: 

398 # If no change was found relative to one parent, that means the SHA 

399 # must have matched the SHA in that parent, so it is not a 

400 # conflict. 

401 yield changes 

402 

403 
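# Illustrative sketch, not part of the module: finding conflicted paths in a
# merge tree relative to two parents. The tree contents are made-up example
# data, and _example_merge_changes() is a hypothetical helper.
def _example_merge_changes() -> None:
    from dulwich.object_store import MemoryObjectStore
    from dulwich.objects import Blob, Tree

    store = MemoryObjectStore()
    ours = Blob.from_string(b"ours\n")
    theirs = Blob.from_string(b"theirs\n")
    merged = Blob.from_string(b"merged\n")
    parent1, parent2, merge_tree = Tree(), Tree(), Tree()
    parent1.add(b"file.txt", 0o100644, ours.id)
    parent2.add(b"file.txt", 0o100644, theirs.id)
    merge_tree.add(b"file.txt", 0o100644, merged.id)
    for obj in (ours, theirs, merged, parent1, parent2, merge_tree):
        store.add_object(obj)

    for changes in tree_changes_for_merge(store, [parent1.id, parent2.id], merge_tree.id):
        # file.txt differs from both parents, so it is reported as a conflict,
        # with one TreeChange per parent.
        print([c.type if c else None for c in changes])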

404_BLOCK_SIZE = 64 

405 

406 

407def _count_blocks(obj: ShaFile) -> dict[int, int]: 

408 """Count the blocks in an object. 

409 

410 Splits the data into blocks either on lines or <=64-byte chunks of lines. 

411 

412 Args: 

413 obj: The object to count blocks for. 

414 

415 Returns: 

416 A dict of block hashcode -> total bytes occurring. 

417 """ 

418 block_counts: dict[int, int] = defaultdict(int) 

419 block = BytesIO() 

420 n = 0 

421 

422 # Cache attrs as locals to avoid expensive lookups in the inner loop. 

423 block_write = block.write 

424 block_seek = block.seek 

425 block_truncate = block.truncate 

426 block_getvalue = block.getvalue 

427 

428 for c in chain.from_iterable(obj.as_raw_chunks()): 

429 cb = c.to_bytes(1, "big") 

430 block_write(cb) 

431 n += 1 

432 if cb == b"\n" or n == _BLOCK_SIZE: 

433 value = block_getvalue() 

434 block_counts[hash(value)] += len(value) 

435 block_seek(0) 

436 block_truncate() 

437 n = 0 

438 if n > 0: 

439 last_block = block_getvalue() 

440 block_counts[hash(last_block)] += len(last_block) 

441 return block_counts 

442 

443 
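# Illustrative sketch, not part of the module: _count_blocks hashes newline- or
# 64-byte-bounded chunks of an object's contents. The blob text is made-up
# example data, and _example_count_blocks() is a hypothetical helper.
def _example_count_blocks() -> None:
    from dulwich.objects import Blob

    blob = Blob.from_string(b"line one\nline two\n")
    counts = _count_blocks(blob)
    # One block per line here; values are the byte lengths of the blocks.
    assert sum(counts.values()) == len(b"line one\nline two\n")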

444def _common_bytes(blocks1: dict[int, int], blocks2: dict[int, int]) -> int: 

445 """Count the number of common bytes in two block count dicts. 

446 

447 Args: 

448 blocks1: The first dict of block hashcode -> total bytes. 

449 blocks2: The second dict of block hashcode -> total bytes. 

450 

451 Returns: 

452 The number of bytes in common between blocks1 and blocks2. This is 

453 only approximate due to possible hash collisions. 

454 """ 

455 # Iterate over the smaller of the two dicts, since this is symmetrical. 

456 if len(blocks1) > len(blocks2): 

457 blocks1, blocks2 = blocks2, blocks1 

458 score = 0 

459 for block, count1 in blocks1.items(): 

460 count2 = blocks2.get(block) 

461 if count2: 

462 score += min(count1, count2) 

463 return score 

464 

465 
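# Illustrative sketch, not part of the module: _common_bytes sums the smaller of
# the two byte counts for every block hash present in both dicts. The literal
# dicts are made-up example data, and _example_common_bytes() is a hypothetical
# helper.
def _example_common_bytes() -> None:
    blocks1 = {hash(b"a\n"): 2, hash(b"b\n"): 2}
    blocks2 = {hash(b"a\n"): 2, hash(b"c\n"): 2}
    assert _common_bytes(blocks1, blocks2) == 2  # only the b"a\n" block is shared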

466def _similarity_score( 

467 obj1: ShaFile, 

468 obj2: ShaFile, 

469 block_cache: Optional[dict[ObjectID, dict[int, int]]] = None, 

470) -> int: 

471 """Compute a similarity score for two objects. 

472 

473 Args: 

474 obj1: The first object to score. 

475 obj2: The second object to score. 

476 block_cache: An optional dict of SHA to block counts to cache 

477 results between calls. 

478 

479 Returns: 

480 The similarity score between the two objects, defined as the 

481 number of bytes in common between the two objects divided by the 

482 maximum size, scaled to the range 0-100. 

483 """ 

484 if block_cache is None: 

485 block_cache = {} 

486 if obj1.id not in block_cache: 

487 block_cache[obj1.id] = _count_blocks(obj1) 

488 if obj2.id not in block_cache: 

489 block_cache[obj2.id] = _count_blocks(obj2) 

490 

491 common_bytes = _common_bytes(block_cache[obj1.id], block_cache[obj2.id]) 

492 max_size = max(obj1.raw_length(), obj2.raw_length()) 

493 if not max_size: 

494 return _MAX_SCORE 

495 return int(float(common_bytes) * _MAX_SCORE / max_size) 

496 

497 
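# Illustrative sketch, not part of the module: scoring two blobs that share one
# of their two lines. The contents are made-up example data, and
# _example_similarity() is a hypothetical helper.
def _example_similarity() -> None:
    from dulwich.objects import Blob

    blob1 = Blob.from_string(b"shared line\nonly in one\n")
    blob2 = Blob.from_string(b"shared line\nonly in two!\n")
    score = _similarity_score(blob1, blob2)
    # Roughly (common bytes / larger size) * 100; the shared first line gives
    # a score of about 48 here.
    assert 0 <= score <= _MAX_SCORE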

498def _tree_change_key(entry: TreeChange) -> tuple[bytes, bytes]: 

499 # Sort by old path then new path. If only one exists, use it for both keys. 

500 path1 = entry.old.path if entry.old is not None else None 

501 path2 = entry.new.path if entry.new is not None else None 

502 if path1 is None: 

503 path1 = path2 

504 if path2 is None: 

505 path2 = path1 

506 assert path1 is not None and path2 is not None 

507 return (path1, path2) 

508 

509 

510class RenameDetector: 

511 """Object for handling rename detection between two trees.""" 

512 

513 _adds: list[TreeChange] 

514 _deletes: list[TreeChange] 

515 _changes: list[TreeChange] 

516 _candidates: list[tuple[int, TreeChange]] 

517 

518 def __init__( 

519 self, 

520 store: BaseObjectStore, 

521 rename_threshold: int = RENAME_THRESHOLD, 

522 max_files: Optional[int] = MAX_FILES, 

523 rewrite_threshold: Optional[int] = REWRITE_THRESHOLD, 

524 find_copies_harder: bool = False, 

525 ) -> None: 

526 """Initialize the rename detector. 

527 

528 Args: 

529 store: An ObjectStore for looking up objects. 

530 rename_threshold: The threshold similarity score for considering 

531 an add/delete pair to be a rename/copy; see _similarity_score. 

532 max_files: The maximum number of adds and deletes to consider, 

533 or None for no limit. The detector is guaranteed to compare no more 

534 than max_files ** 2 add/delete pairs. This limit is provided 

535 because rename detection can be quadratic in the project size. If 

536 the limit is exceeded, no content rename detection is attempted. 

537 rewrite_threshold: The threshold similarity score below which a 

538 modify should be considered a delete/add, or None to not break 

539 modifies; see _similarity_score. 

540 find_copies_harder: If True, consider unmodified files when 

541 detecting copies. 

542 """ 

543 self._store = store 

544 self._rename_threshold = rename_threshold 

545 self._rewrite_threshold = rewrite_threshold 

546 self._max_files = max_files 

547 self._find_copies_harder = find_copies_harder 

548 self._want_unchanged = False 

549 

550 def _reset(self) -> None: 

551 self._adds = [] 

552 self._deletes = [] 

553 self._changes = [] 

554 

555 def _should_split(self, change: TreeChange) -> bool: 

556 if self._rewrite_threshold is None or change.type != CHANGE_MODIFY: 

557 return False 

558 assert change.old is not None and change.new is not None 

559 if change.old.sha == change.new.sha: 

560 return False 

561 old_obj = self._store[change.old.sha] 

562 new_obj = self._store[change.new.sha] 

563 return _similarity_score(old_obj, new_obj) < self._rewrite_threshold 

564 

565 def _add_change(self, change: TreeChange) -> None: 

566 if change.type == CHANGE_ADD: 

567 self._adds.append(change) 

568 elif change.type == CHANGE_DELETE: 

569 self._deletes.append(change) 

570 elif self._should_split(change): 

571 assert change.old is not None and change.new is not None 

572 self._deletes.append(TreeChange.delete(change.old)) 

573 self._adds.append(TreeChange.add(change.new)) 

574 elif ( 

575 self._find_copies_harder and change.type == CHANGE_UNCHANGED 

576 ) or change.type == CHANGE_MODIFY: 

577 # Treat all modifies as potential deletes for rename detection, 

578 # but don't split them (to avoid spurious renames). Setting 

579 # find_copies_harder means we treat unchanged the same as 

580 # modified. 

581 self._deletes.append(change) 

582 else: 

583 self._changes.append(change) 

584 

585 def _collect_changes( 

586 self, tree1_id: Optional[ObjectID], tree2_id: Optional[ObjectID] 

587 ) -> None: 

588 want_unchanged = self._find_copies_harder or self._want_unchanged 

589 for change in tree_changes( 

590 self._store, 

591 tree1_id, 

592 tree2_id, 

593 want_unchanged=want_unchanged, 

594 include_trees=self._include_trees, 

595 ): 

596 self._add_change(change) 

597 

598 def _prune(self, add_paths: set[bytes], delete_paths: set[bytes]) -> None: 

599 def check_add(a: TreeChange) -> bool: 

600 assert a.new is not None 

601 return a.new.path not in add_paths 

602 

603 def check_delete(d: TreeChange) -> bool: 

604 assert d.old is not None 

605 return d.old.path not in delete_paths 

606 

607 self._adds = [a for a in self._adds if check_add(a)] 

608 self._deletes = [d for d in self._deletes if check_delete(d)] 

609 

610 def _find_exact_renames(self) -> None: 

611 add_map = defaultdict(list) 

612 for add in self._adds: 

613 assert add.new is not None 

614 add_map[add.new.sha].append(add.new) 

615 delete_map = defaultdict(list) 

616 for delete in self._deletes: 

617 # Keep track of whether the delete was actually marked as a delete. 

618 # If not, it needs to be marked as a copy. 

619 is_delete = delete.type == CHANGE_DELETE 

620 assert delete.old is not None 

621 delete_map[delete.old.sha].append((delete.old, is_delete)) 

622 

623 add_paths = set() 

624 delete_paths = set() 

625 for sha, sha_deletes in delete_map.items(): 

626 sha_adds = add_map[sha] 

627 for (old, is_delete), new in zip(sha_deletes, sha_adds): 

628 if stat.S_IFMT(old.mode) != stat.S_IFMT(new.mode): 

629 continue 

630 if is_delete: 

631 delete_paths.add(old.path) 

632 add_paths.add(new.path) 

633 new_type = (is_delete and CHANGE_RENAME) or CHANGE_COPY 

634 self._changes.append(TreeChange(new_type, old, new)) 

635 

636 num_extra_adds = len(sha_adds) - len(sha_deletes) 

637 # TODO(dborowitz): Less arbitrary way of dealing with extra copies. 

638 old = sha_deletes[0][0] 

639 if num_extra_adds > 0: 

640 for new in sha_adds[-num_extra_adds:]: 

641 add_paths.add(new.path) 

642 self._changes.append(TreeChange(CHANGE_COPY, old, new)) 

643 self._prune(add_paths, delete_paths) 

644 

645 def _should_find_content_renames(self) -> bool: 

646 if self._max_files is None: 

647 return True 

648 return len(self._adds) * len(self._deletes) <= self._max_files**2 

649 

650 def _rename_type( 

651 self, check_paths: bool, delete: TreeChange, add: TreeChange 

652 ) -> str: 

653 assert delete.old is not None and add.new is not None 

654 if check_paths and delete.old.path == add.new.path: 

655 # If the paths match, this must be a split modify, so make sure it 

656 # comes out as a modify. 

657 return CHANGE_MODIFY 

658 elif delete.type != CHANGE_DELETE: 

659 # If it's in deletes but not marked as a delete, it must have been 

660 # added due to find_copies_harder, and needs to be marked as a 

661 # copy. 

662 return CHANGE_COPY 

663 return CHANGE_RENAME 

664 

665 def _find_content_rename_candidates(self) -> None: 

666 candidates = self._candidates = [] 

667 # TODO: Optimizations: 

668 # - Compare object sizes before counting blocks. 

669 # - Skip if delete's S_IFMT differs from all adds. 

670 # - Skip if adds or deletes is empty. 

671 # Match C git's behavior of not attempting to find content renames if 

672 # the matrix size exceeds the threshold. 

673 if not self._should_find_content_renames(): 

674 return 

675 

676 block_cache = {} 

677 check_paths = self._rename_threshold is not None 

678 for delete in self._deletes: 

679 assert delete.old is not None 

680 if S_ISGITLINK(delete.old.mode): 

681 continue # Git links don't exist in this repo. 

682 old_sha = delete.old.sha 

683 old_obj = self._store[old_sha] 

684 block_cache[old_sha] = _count_blocks(old_obj) 

685 for add in self._adds: 

686 assert add.new is not None 

687 if stat.S_IFMT(delete.old.mode) != stat.S_IFMT(add.new.mode): 

688 continue 

689 new_obj = self._store[add.new.sha] 

690 score = _similarity_score(old_obj, new_obj, block_cache=block_cache) 

691 if score > self._rename_threshold: 

692 new_type = self._rename_type(check_paths, delete, add) 

693 rename = TreeChange(new_type, delete.old, add.new) 

694 candidates.append((-score, rename)) 

695 

696 def _choose_content_renames(self) -> None: 

697 # Sort scores from highest to lowest, but keep names in ascending 

698 # order. 

699 self._candidates.sort() 

700 

701 delete_paths = set() 

702 add_paths = set() 

703 for _, change in self._candidates: 

704 assert change.old is not None and change.new is not None 

705 new_path = change.new.path 

706 if new_path in add_paths: 

707 continue 

708 old_path = change.old.path 

709 orig_type = change.type 

710 if old_path in delete_paths: 

711 change = TreeChange(CHANGE_COPY, change.old, change.new) 

712 

713 # If the candidate was originally a copy, that means it came from a 

714 # modified or unchanged path, so we don't want to prune it. 

715 if orig_type != CHANGE_COPY: 

716 delete_paths.add(old_path) 

717 add_paths.add(new_path) 

718 self._changes.append(change) 

719 self._prune(add_paths, delete_paths) 

720 

721 def _join_modifies(self) -> None: 

722 if self._rewrite_threshold is None: 

723 return 

724 

725 modifies = {} 

726 delete_map = {} 

727 for d in self._deletes: 

728 assert d.old is not None 

729 delete_map[d.old.path] = d 

730 for add in self._adds: 

731 assert add.new is not None 

732 path = add.new.path 

733 delete = delete_map.get(path) 

734 if delete is not None: 

735 assert delete.old is not None 

736 if stat.S_IFMT(delete.old.mode) == stat.S_IFMT(add.new.mode): 

737 modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new) 

738 

739 def check_add_mod(a: TreeChange) -> bool: 

740 assert a.new is not None 

741 return a.new.path not in modifies 

742 

743 def check_delete_mod(d: TreeChange) -> bool: 

744 assert d.old is not None 

745 return d.old.path not in modifies 

746 

747 self._adds = [a for a in self._adds if check_add_mod(a)] 

748 self._deletes = [d for d in self._deletes if check_delete_mod(d)] 

749 self._changes += modifies.values() 

750 

751 def _sorted_changes(self) -> list[TreeChange]: 

752 result = [] 

753 result.extend(self._adds) 

754 result.extend(self._deletes) 

755 result.extend(self._changes) 

756 result.sort(key=_tree_change_key) 

757 return result 

758 

759 def _prune_unchanged(self) -> None: 

760 if self._want_unchanged: 

761 return 

762 self._deletes = [d for d in self._deletes if d.type != CHANGE_UNCHANGED] 

763 

764 def changes_with_renames( 

765 self, 

766 tree1_id: Optional[ObjectID], 

767 tree2_id: Optional[ObjectID], 

768 want_unchanged: bool = False, 

769 include_trees: bool = False, 

770 ) -> list[TreeChange]: 

771 """Iterate TreeChanges between two tree SHAs, with rename detection.""" 

772 self._reset() 

773 self._want_unchanged = want_unchanged 

774 self._include_trees = include_trees 

775 self._collect_changes(tree1_id, tree2_id) 

776 self._find_exact_renames() 

777 self._find_content_rename_candidates() 

778 self._choose_content_renames() 

779 self._join_modifies() 

780 self._prune_unchanged() 

781 return self._sorted_changes() 

782 

783 
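# Illustrative sketch, not part of the module: detecting an exact rename, where
# the same blob appears under a new name in the second tree. The file names and
# content are made-up example data, and _example_rename_detection() is a
# hypothetical helper.
def _example_rename_detection() -> None:
    from dulwich.object_store import MemoryObjectStore
    from dulwich.objects import Blob, Tree

    store = MemoryObjectStore()
    blob = Blob.from_string(b"same content\n")
    tree1, tree2 = Tree(), Tree()
    tree1.add(b"old_name.txt", 0o100644, blob.id)
    tree2.add(b"new_name.txt", 0o100644, blob.id)
    for obj in (blob, tree1, tree2):
        store.add_object(obj)

    detector = RenameDetector(store)
    changes = detector.changes_with_renames(tree1.id, tree2.id)
    # The delete/add pair with an identical SHA collapses into a single rename.
    assert [c.type for c in changes] == [CHANGE_RENAME]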

784# Hold on to the pure-python implementations for testing. 

785_is_tree_py = _is_tree 

786_merge_entries_py = _merge_entries 

787_count_blocks_py = _count_blocks 

788 

789if TYPE_CHECKING: 

790 # For type checking, use the Python implementations 

791 pass 

792else: 

793 # At runtime, try to import Rust extensions 

794 try: 

795 # Try to import Rust versions 

796 from dulwich._diff_tree import ( 

797 _count_blocks as _rust_count_blocks, 

798 ) 

799 from dulwich._diff_tree import ( 

800 _is_tree as _rust_is_tree, 

801 ) 

802 from dulwich._diff_tree import ( 

803 _merge_entries as _rust_merge_entries, 

804 ) 

805 

806 # Override with Rust versions 

807 _count_blocks = _rust_count_blocks 

808 _is_tree = _rust_is_tree 

809 _merge_entries = _rust_merge_entries 

810 except ImportError: 

811 pass
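# Illustrative sketch, not part of the module: a caller can check whether the
# optional Rust helpers were importable using the same fallback pattern the
# module itself uses above. _example_rust_available() is a hypothetical helper.
def _example_rust_available() -> bool:
    try:
        import dulwich._diff_tree  # noqa: F401  (compiled extension, if built)
    except ImportError:
        return False
    return True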