# diff_tree.py -- Utilities for diffing files and trees.
# Copyright (C) 2010 Google, Inc.
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Utilities for diffing files and trees."""

import stat
from collections import defaultdict, namedtuple
from collections.abc import Iterator
from io import BytesIO
from itertools import chain
from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar

from .object_store import BaseObjectStore
from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry

# TreeChange type constants.
CHANGE_ADD = "add"
CHANGE_MODIFY = "modify"
CHANGE_DELETE = "delete"
CHANGE_RENAME = "rename"
CHANGE_COPY = "copy"
CHANGE_UNCHANGED = "unchanged"

RENAME_CHANGE_TYPES = (CHANGE_RENAME, CHANGE_COPY)

_NULL_ENTRY = TreeEntry(None, None, None)

_MAX_SCORE = 100
RENAME_THRESHOLD = 60
MAX_FILES = 200
REWRITE_THRESHOLD: Optional[int] = None


class TreeChange(namedtuple("TreeChange", ["type", "old", "new"])):
53 """Named tuple a single change between two trees.""" 


    @classmethod
    def add(cls, new: TreeEntry) -> "TreeChange":
        """Create a TreeChange for an added entry.

        Args:
          new: New tree entry

        Returns:
          TreeChange instance
        """
        return cls(CHANGE_ADD, _NULL_ENTRY, new)

    @classmethod
    def delete(cls, old: TreeEntry) -> "TreeChange":
        """Create a TreeChange for a deleted entry.

        Args:
          old: Old tree entry

        Returns:
          TreeChange instance
        """
        return cls(CHANGE_DELETE, old, _NULL_ENTRY)

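# Illustrative sketch (not part of the dulwich API): how TreeChange values are
# built. The path and SHA below are placeholders.
def _example_tree_change() -> None:  # pragma: no cover
    entry = TreeEntry(b"docs/index.txt", 0o100644, b"0" * 40)
    added = TreeChange.add(entry)
    removed = TreeChange.delete(entry)
    # The missing side of an add/delete is always the shared null entry.
    assert added.type == CHANGE_ADD and added.old == _NULL_ENTRY
    assert removed.type == CHANGE_DELETE and removed.new == _NULL_ENTRY
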

def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]:
    result: list[TreeEntry] = []
    if not tree:
        return result
    for entry in tree.iteritems(name_order=True):
        result.append(entry.in_path(path))
    return result


def _merge_entries(
    path: bytes, tree1: Tree, tree2: Tree
) -> list[tuple[TreeEntry, TreeEntry]]:
    """Merge the entries of two trees.

    Args:
      path: A path to prepend to all tree entry names.
      tree1: The first Tree object to iterate, or None.
      tree2: The second Tree object to iterate, or None.

    Returns:
      A list of pairs of TreeEntry objects for each pair of entries in
        the trees. If an entry exists in one tree but not the other, the other
        entry will have all attributes set to None. If neither entry's path is
        None, they are guaranteed to match.
    """
    entries1 = _tree_entries(path, tree1)
    entries2 = _tree_entries(path, tree2)
    i1 = i2 = 0
    len1 = len(entries1)
    len2 = len(entries2)

    result = []
    while i1 < len1 and i2 < len2:
        entry1 = entries1[i1]
        entry2 = entries2[i2]
        if entry1.path < entry2.path:
            result.append((entry1, _NULL_ENTRY))
            i1 += 1
        elif entry1.path > entry2.path:
            result.append((_NULL_ENTRY, entry2))
            i2 += 1
        else:
            result.append((entry1, entry2))
            i1 += 1
            i2 += 1
    for i in range(i1, len1):
        result.append((entries1[i], _NULL_ENTRY))
    for i in range(i2, len2):
        result.append((_NULL_ENTRY, entries2[i]))
    return result

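# Illustrative sketch (not part of dulwich) of how _merge_entries pairs up the
# sorted entries of two trees. The SHA is a placeholder hex string; only the
# pairing behaviour matters here.
def _example_merge_entries() -> None:  # pragma: no cover
    fake_sha = b"0" * 40
    tree1 = Tree()
    tree1.add(b"a.txt", 0o100644, fake_sha)
    tree1.add(b"b.txt", 0o100644, fake_sha)
    tree2 = Tree()
    tree2.add(b"b.txt", 0o100644, fake_sha)
    tree2.add(b"c.txt", 0o100644, fake_sha)
    for old, new in _merge_entries(b"", tree1, tree2):
        # a.txt pairs with the null entry, b.txt with itself, and c.txt has
        # the null entry on its old side.
        print(old.path, new.path)
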

def _is_tree(entry: TreeEntry) -> bool:
    mode = entry.mode
    if mode is None:
        return False
    return stat.S_ISDIR(mode)


def walk_trees(
    store: BaseObjectStore,
    tree1_id: Optional[ObjectID],
    tree2_id: Optional[ObjectID],
    prune_identical: bool = False,
    paths: Optional[list[bytes]] = None,
) -> Iterator[tuple[TreeEntry, TreeEntry]]:
    """Recursively walk all the entries of two trees.

    Iteration is depth-first pre-order, as in e.g. os.walk.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the first Tree object to iterate, or None.
      tree2_id: The SHA of the second Tree object to iterate, or None.
      prune_identical: If True, identical subtrees will not be walked.
      paths: Optional list of paths to filter to (as bytes).

    Returns:
      Iterator over pairs of TreeEntry objects for each pair of entries
        in the trees and their subtrees recursively. If an entry exists in one
        tree but not the other, the other entry will have all attributes set
        to None. If neither entry's path is None, they are guaranteed to
        match.
    """
    # This could be fairly easily generalized to >2 trees if we find a use
    # case.
    mode1 = (tree1_id and stat.S_IFDIR) or None
    mode2 = (tree2_id and stat.S_IFDIR) or None
    todo = [(TreeEntry(b"", mode1, tree1_id), TreeEntry(b"", mode2, tree2_id))]
    while todo:
        entry1, entry2 = todo.pop()
        is_tree1 = _is_tree(entry1)
        is_tree2 = _is_tree(entry2)
        if prune_identical and is_tree1 and is_tree2 and entry1 == entry2:
            continue

        tree1 = (is_tree1 and store[entry1.sha]) or None
        tree2 = (is_tree2 and store[entry2.sha]) or None
        path = entry1.path or entry2.path

        # If we have path filters, check if we should process this tree
        if paths is not None and (is_tree1 or is_tree2):
            # Special case for root tree
            if path == b"":
                should_recurse = True
            else:
                # Check if any of our filter paths could be under this tree
                should_recurse = False
                for filter_path in paths:
                    if filter_path == path:
                        # Exact match - we want this directory itself
                        should_recurse = True
                        break
                    elif filter_path.startswith(path + b"/"):
                        # Filter path is under this directory
                        should_recurse = True
                        break
                    elif path.startswith(filter_path + b"/"):
                        # This directory is under a filter path
                        should_recurse = True
                        break
            if not should_recurse:
                # Skip this tree entirely
                continue

        # Ensure trees are Tree objects before merging
        if tree1 is not None and not isinstance(tree1, Tree):
            tree1 = None
        if tree2 is not None and not isinstance(tree2, Tree):
            tree2 = None

        if tree1 is not None or tree2 is not None:
            # Use empty trees for None values
            if tree1 is None:
                tree1 = Tree()
            if tree2 is None:
                tree2 = Tree()
            todo.extend(reversed(_merge_entries(path, tree1, tree2)))

        # Only yield entries that match our path filters
        if paths is None:
            yield entry1, entry2
        else:
            # Check if this entry matches any of our filters
            for filter_path in paths:
                if path == filter_path:
                    # Exact match
                    yield entry1, entry2
                    break
                elif path.startswith(filter_path + b"/"):
                    # This entry is under a filter directory
                    yield entry1, entry2
                    break
                elif filter_path.startswith(path + b"/") and (is_tree1 or is_tree2):
                    # This is a parent directory of a filter path
                    yield entry1, entry2
                    break

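# Self-contained sketch (not part of dulwich) of driving walk_trees with an
# in-memory object store; the blob content and file name are arbitrary.
def _example_walk_trees() -> None:  # pragma: no cover
    from .object_store import MemoryObjectStore
    from .objects import Blob

    store = MemoryObjectStore()
    blob = Blob.from_string(b"hello\n")
    store.add_object(blob)
    tree = Tree()
    tree.add(b"a.txt", 0o100644, blob.id)
    store.add_object(tree)

    # Walk the empty tree (None) against ``tree``: one pair is yielded for the
    # root and one for a.txt, whose left side is the null entry because the
    # path only exists on the right.
    for entry1, entry2 in walk_trees(store, None, tree.id):
        print(entry1.path, entry2.path)
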

def _skip_tree(entry: TreeEntry, include_trees: bool) -> TreeEntry:
    if entry.mode is None or (not include_trees and stat.S_ISDIR(entry.mode)):
        return _NULL_ENTRY
    return entry


def tree_changes(
    store: BaseObjectStore,
    tree1_id: Optional[ObjectID],
    tree2_id: Optional[ObjectID],
    want_unchanged: bool = False,
    rename_detector: Optional["RenameDetector"] = None,
    include_trees: bool = False,
    change_type_same: bool = False,
    paths: Optional[list[bytes]] = None,
) -> Iterator[TreeChange]:
    """Find the differences between the contents of two trees.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the source tree.
      tree2_id: The SHA of the target tree.
      want_unchanged: If True, include TreeChanges for unmodified entries
        as well.
      rename_detector: RenameDetector object for detecting renames.
      include_trees: Whether to include tree (directory) entries in the
        output.
      change_type_same: Whether to report a change of file type as a single
        modify instead of a delete plus an add.
      paths: Optional list of paths to filter to (as bytes).

    Returns:
      Iterator over TreeChange instances for each change between the
        source and target tree.
    """
    if rename_detector is not None and tree1_id is not None and tree2_id is not None:
        yield from rename_detector.changes_with_renames(
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=include_trees,
        )
        return

    entries = walk_trees(
        store, tree1_id, tree2_id, prune_identical=(not want_unchanged), paths=paths
    )
    for entry1, entry2 in entries:
        if entry1 == entry2 and not want_unchanged:
            continue

        # Treat entries for trees as missing.
        entry1 = _skip_tree(entry1, include_trees)
        entry2 = _skip_tree(entry2, include_trees)

        if entry1 != _NULL_ENTRY and entry2 != _NULL_ENTRY:
            if (
                stat.S_IFMT(entry1.mode) != stat.S_IFMT(entry2.mode)
                and not change_type_same
            ):
                # File type changed: report as delete/add.
                yield TreeChange.delete(entry1)
                entry1 = _NULL_ENTRY
                change_type = CHANGE_ADD
            elif entry1 == entry2:
                change_type = CHANGE_UNCHANGED
            else:
                change_type = CHANGE_MODIFY
        elif entry1 != _NULL_ENTRY:
            change_type = CHANGE_DELETE
        elif entry2 != _NULL_ENTRY:
            change_type = CHANGE_ADD
        else:
            # Both were None because at least one was a tree.
            continue
        yield TreeChange(change_type, entry1, entry2)

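# Minimal sketch (not part of dulwich) of consuming tree_changes. ``store``
# and the two tree SHAs are assumed to come from an existing repository,
# e.g. ``repo.object_store`` and two commits' tree ids.
def _example_tree_changes(
    store: BaseObjectStore,
    old_tree: Optional[ObjectID],
    new_tree: Optional[ObjectID],
) -> None:  # pragma: no cover
    for change in tree_changes(store, old_tree, new_tree):
        # ``change.type`` is one of the CHANGE_* constants defined above; the
        # missing side of an add or delete is the null entry.
        print(change.type, change.old.path, change.new.path)
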

T = TypeVar("T")
U = TypeVar("U")


def _all_eq(seq: list[T], key: Callable[[T], U], value: U) -> bool:
    for e in seq:
        if key(e) != value:
            return False
    return True


def _all_same(seq: list[Any], key: Callable[[Any], Any]) -> bool:
    return _all_eq(seq[1:], key, key(seq[0]))


def tree_changes_for_merge(
    store: BaseObjectStore,
    parent_tree_ids: list[ObjectID],
    tree_id: ObjectID,
    rename_detector: Optional["RenameDetector"] = None,
) -> Iterator[list[Optional[TreeChange]]]:
    """Get the tree changes for a merge tree relative to all its parents.

    Args:
      store: An ObjectStore for looking up objects.
      parent_tree_ids: An iterable of the SHAs of the parent trees.
      tree_id: The SHA of the merge tree.
      rename_detector: RenameDetector object for detecting renames.

    Returns:
      Iterator over lists of TreeChange objects, one per conflicted path
        in the merge.

      Each list contains one element per parent, with the TreeChange for that
        path relative to that parent. An element may be None if it never
        existed in one parent and was deleted in two others.

      A path is only included in the output if it is a conflict, i.e. its SHA
        in the merge tree is not found in any of the parents, or in the case of
        deletes, if not all of the old SHAs match.
    """
    all_parent_changes = [
        tree_changes(store, t, tree_id, rename_detector=rename_detector)
        for t in parent_tree_ids
    ]
    num_parents = len(parent_tree_ids)
    changes_by_path: dict[bytes, list[Optional[TreeChange]]] = defaultdict(
        lambda: [None] * num_parents
    )

    # Organize by path.
    for i, parent_changes in enumerate(all_parent_changes):
        for change in parent_changes:
            if change.type == CHANGE_DELETE:
                path = change.old.path
            else:
                path = change.new.path
            changes_by_path[path][i] = change

    def old_sha(c: TreeChange) -> Optional[ObjectID]:
        return c.old.sha

    def change_type(c: TreeChange) -> str:
        return c.type

    # Yield only conflicting changes.
    for _, changes in sorted(changes_by_path.items()):
        assert len(changes) == num_parents
        have = [c for c in changes if c is not None]
        if _all_eq(have, change_type, CHANGE_DELETE):
            if not _all_same(have, old_sha):
                yield changes
        elif not _all_same(have, change_type):
            yield changes
        elif None not in changes:
            # If no change was found relative to one parent, that means the SHA
            # must have matched the SHA in that parent, so it is not a
            # conflict.
            yield changes

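# Sketch (not part of dulwich) of inspecting conflicted paths in a merge.
# ``store``, ``parent_trees`` and ``merge_tree`` are assumed to come from an
# existing merge commit and its parents.
def _example_merge_conflicts(
    store: BaseObjectStore,
    parent_trees: list[ObjectID],
    merge_tree: ObjectID,
) -> None:  # pragma: no cover
    for per_parent in tree_changes_for_merge(store, parent_trees, merge_tree):
        # One element per parent; None means the path did not change relative
        # to that parent.
        for change in per_parent:
            if change is not None:
                print(change.type, change.new.path or change.old.path)
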

_BLOCK_SIZE = 64


def _count_blocks(obj: ShaFile) -> dict[int, int]:
    """Count the blocks in an object.

    Splits the data into blocks either on lines or <=64-byte chunks of lines.

    Args:
      obj: The object to count blocks for.

    Returns:
      A dict of block hashcode -> total bytes occurring.
    """
    block_counts: dict[int, int] = defaultdict(int)
    block = BytesIO()
    n = 0

    # Cache attrs as locals to avoid expensive lookups in the inner loop.
    block_write = block.write
    block_seek = block.seek
    block_truncate = block.truncate
    block_getvalue = block.getvalue

    for c in chain.from_iterable(obj.as_raw_chunks()):
        cb = c.to_bytes(1, "big")
        block_write(cb)
        n += 1
        if cb == b"\n" or n == _BLOCK_SIZE:
            value = block_getvalue()
            block_counts[hash(value)] += len(value)
            block_seek(0)
            block_truncate()
            n = 0
    if n > 0:
        last_block = block_getvalue()
        block_counts[hash(last_block)] += len(last_block)
    return block_counts


def _common_bytes(blocks1: dict[int, int], blocks2: dict[int, int]) -> int:
    """Count the number of common bytes in two block count dicts.

    Args:
      blocks1: The first dict of block hashcode -> total bytes.
      blocks2: The second dict of block hashcode -> total bytes.

    Returns:
      The number of bytes in common between blocks1 and blocks2. This is
        only approximate due to possible hash collisions.
    """
    # Iterate over the smaller of the two dicts, since this is symmetrical.
    if len(blocks1) > len(blocks2):
        blocks1, blocks2 = blocks2, blocks1
    score = 0
    for block, count1 in blocks1.items():
        count2 = blocks2.get(block)
        if count2:
            score += min(count1, count2)
    return score


def _similarity_score(
    obj1: ShaFile,
    obj2: ShaFile,
    block_cache: Optional[dict[ObjectID, dict[int, int]]] = None,
) -> int:
    """Compute a similarity score for two objects.

    Args:
      obj1: The first object to score.
      obj2: The second object to score.
      block_cache: An optional dict of SHA to block counts to cache
        results between calls.

    Returns:
      The similarity score between the two objects, defined as the
        number of bytes in common between the two objects divided by the
        maximum size, scaled to the range 0-100.
    """
    if block_cache is None:
        block_cache = {}
    if obj1.id not in block_cache:
        block_cache[obj1.id] = _count_blocks(obj1)
    if obj2.id not in block_cache:
        block_cache[obj2.id] = _count_blocks(obj2)

    common_bytes = _common_bytes(block_cache[obj1.id], block_cache[obj2.id])
    max_size = max(obj1.raw_length(), obj2.raw_length())
    if not max_size:
        return _MAX_SCORE
    return int(float(common_bytes) * _MAX_SCORE / max_size)

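# Worked sketch (not part of dulwich) of the similarity score on two
# in-memory blobs. The blobs share one of their two lines, so the score
# comes out just under 50.
def _example_similarity_score() -> None:  # pragma: no cover
    from .objects import Blob

    old = Blob.from_string(b"first line\nsecond line\n")
    new = Blob.from_string(b"first line\nthird line!\n")
    # 11 of 23 bytes (the shared first line) are common: int(11 * 100 / 23) == 47.
    print(_similarity_score(old, new))
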

def _tree_change_key(entry: TreeChange) -> tuple[bytes, bytes]:
    # Sort by old path then new path. If only one exists, use it for both keys.
    path1 = entry.old.path
    path2 = entry.new.path
    if path1 is None:
        path1 = path2
    if path2 is None:
        path2 = path1
    return (path1, path2)


class RenameDetector:
    """Object for handling rename detection between two trees."""

    _adds: list[TreeChange]
    _deletes: list[TreeChange]
    _changes: list[TreeChange]
    _candidates: list[tuple[int, TreeChange]]

    def __init__(
        self,
        store: BaseObjectStore,
        rename_threshold: int = RENAME_THRESHOLD,
        max_files: Optional[int] = MAX_FILES,
        rewrite_threshold: Optional[int] = REWRITE_THRESHOLD,
        find_copies_harder: bool = False,
    ) -> None:
        """Initialize the rename detector.

        Args:
          store: An ObjectStore for looking up objects.
          rename_threshold: The threshold similarity score for considering
            an add/delete pair to be a rename/copy; see _similarity_score.
          max_files: The maximum number of adds and deletes to consider,
            or None for no limit. The detector is guaranteed to compare no more
            than max_files ** 2 add/delete pairs. This limit is provided
            because rename detection can be quadratic in the project size. If
            the limit is exceeded, no content rename detection is attempted.
          rewrite_threshold: The threshold similarity score below which a
            modify should be considered a delete/add, or None to not break
            modifies; see _similarity_score.
          find_copies_harder: If True, consider unmodified files when
            detecting copies.
        """
        self._store = store
        self._rename_threshold = rename_threshold
        self._rewrite_threshold = rewrite_threshold
        self._max_files = max_files
        self._find_copies_harder = find_copies_harder
        self._want_unchanged = False

    def _reset(self) -> None:
        self._adds = []
        self._deletes = []
        self._changes = []

    def _should_split(self, change: TreeChange) -> bool:
        if (
            self._rewrite_threshold is None
            or change.type != CHANGE_MODIFY
            or change.old.sha == change.new.sha
        ):
            return False
        old_obj = self._store[change.old.sha]
        new_obj = self._store[change.new.sha]
        return _similarity_score(old_obj, new_obj) < self._rewrite_threshold

    def _add_change(self, change: TreeChange) -> None:
        if change.type == CHANGE_ADD:
            self._adds.append(change)
        elif change.type == CHANGE_DELETE:
            self._deletes.append(change)
        elif self._should_split(change):
            self._deletes.append(TreeChange.delete(change.old))
            self._adds.append(TreeChange.add(change.new))
        elif (
            self._find_copies_harder and change.type == CHANGE_UNCHANGED
        ) or change.type == CHANGE_MODIFY:
            # Treat all modifies as potential deletes for rename detection,
            # but don't split them (to avoid spurious renames). Setting
            # find_copies_harder means we treat unchanged the same as
            # modified.
            self._deletes.append(change)
        else:
            self._changes.append(change)

    def _collect_changes(
        self, tree1_id: Optional[ObjectID], tree2_id: Optional[ObjectID]
    ) -> None:
        want_unchanged = self._find_copies_harder or self._want_unchanged
        for change in tree_changes(
            self._store,
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=self._include_trees,
        ):
            self._add_change(change)

    def _prune(self, add_paths: set[bytes], delete_paths: set[bytes]) -> None:
        self._adds = [a for a in self._adds if a.new.path not in add_paths]
        self._deletes = [d for d in self._deletes if d.old.path not in delete_paths]

    def _find_exact_renames(self) -> None:
        add_map = defaultdict(list)
        for add in self._adds:
            add_map[add.new.sha].append(add.new)
        delete_map = defaultdict(list)
        for delete in self._deletes:
            # Keep track of whether the delete was actually marked as a delete.
            # If not, it needs to be marked as a copy.
            is_delete = delete.type == CHANGE_DELETE
            delete_map[delete.old.sha].append((delete.old, is_delete))

        add_paths = set()
        delete_paths = set()
        for sha, sha_deletes in delete_map.items():
            sha_adds = add_map[sha]
            for (old, is_delete), new in zip(sha_deletes, sha_adds):
                if stat.S_IFMT(old.mode) != stat.S_IFMT(new.mode):
                    continue
                if is_delete:
                    delete_paths.add(old.path)
                add_paths.add(new.path)
                new_type = (is_delete and CHANGE_RENAME) or CHANGE_COPY
                self._changes.append(TreeChange(new_type, old, new))

            num_extra_adds = len(sha_adds) - len(sha_deletes)
            # TODO(dborowitz): Less arbitrary way of dealing with extra copies.
            old = sha_deletes[0][0]
            if num_extra_adds > 0:
                for new in sha_adds[-num_extra_adds:]:
                    add_paths.add(new.path)
                    self._changes.append(TreeChange(CHANGE_COPY, old, new))
        self._prune(add_paths, delete_paths)

    def _should_find_content_renames(self) -> bool:
        if self._max_files is None:
            return True
        return len(self._adds) * len(self._deletes) <= self._max_files**2

    def _rename_type(
        self, check_paths: bool, delete: TreeChange, add: TreeChange
    ) -> str:
        if check_paths and delete.old.path == add.new.path:
            # If the paths match, this must be a split modify, so make sure it
            # comes out as a modify.
            return CHANGE_MODIFY
        elif delete.type != CHANGE_DELETE:
            # If it's in deletes but not marked as a delete, it must have been
            # added due to find_copies_harder, and needs to be marked as a
            # copy.
            return CHANGE_COPY
        return CHANGE_RENAME

    def _find_content_rename_candidates(self) -> None:
        candidates = self._candidates = []
        # TODO: Optimizations:
        # - Compare object sizes before counting blocks.
        # - Skip if delete's S_IFMT differs from all adds.
        # - Skip if adds or deletes is empty.
        # Match C git's behavior of not attempting to find content renames if
        # the matrix size exceeds the threshold.
        if not self._should_find_content_renames():
            return

        block_cache = {}
        check_paths = self._rename_threshold is not None
        for delete in self._deletes:
            if S_ISGITLINK(delete.old.mode):
                continue  # Git links don't exist in this repo.
            old_sha = delete.old.sha
            old_obj = self._store[old_sha]
            block_cache[old_sha] = _count_blocks(old_obj)
            for add in self._adds:
                if stat.S_IFMT(delete.old.mode) != stat.S_IFMT(add.new.mode):
                    continue
                new_obj = self._store[add.new.sha]
                score = _similarity_score(old_obj, new_obj, block_cache=block_cache)
                if score > self._rename_threshold:
                    new_type = self._rename_type(check_paths, delete, add)
                    rename = TreeChange(new_type, delete.old, add.new)
                    candidates.append((-score, rename))

    def _choose_content_renames(self) -> None:
        # Sort scores from highest to lowest, but keep names in ascending
        # order.
        self._candidates.sort()

        delete_paths = set()
        add_paths = set()
        for _, change in self._candidates:
            new_path = change.new.path
            if new_path in add_paths:
                continue
            old_path = change.old.path
            orig_type = change.type
            if old_path in delete_paths:
                change = TreeChange(CHANGE_COPY, change.old, change.new)

            # If the candidate was originally a copy, that means it came from a
            # modified or unchanged path, so we don't want to prune it.
            if orig_type != CHANGE_COPY:
                delete_paths.add(old_path)
            add_paths.add(new_path)
            self._changes.append(change)
        self._prune(add_paths, delete_paths)

    def _join_modifies(self) -> None:
        if self._rewrite_threshold is None:
            return

        modifies = {}
        delete_map = {d.old.path: d for d in self._deletes}
        for add in self._adds:
            path = add.new.path
            delete = delete_map.get(path)
            if delete is not None and stat.S_IFMT(delete.old.mode) == stat.S_IFMT(
                add.new.mode
            ):
                modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new)

        self._adds = [a for a in self._adds if a.new.path not in modifies]
        self._deletes = [d for d in self._deletes if d.old.path not in modifies]
        self._changes += modifies.values()

    def _sorted_changes(self) -> list[TreeChange]:
        result = []
        result.extend(self._adds)
        result.extend(self._deletes)
        result.extend(self._changes)
        result.sort(key=_tree_change_key)
        return result

    def _prune_unchanged(self) -> None:
        if self._want_unchanged:
            return
        self._deletes = [d for d in self._deletes if d.type != CHANGE_UNCHANGED]

    def changes_with_renames(
        self,
        tree1_id: Optional[ObjectID],
        tree2_id: Optional[ObjectID],
        want_unchanged: bool = False,
        include_trees: bool = False,
    ) -> list[TreeChange]:
        """Iterate TreeChanges between two tree SHAs, with rename detection."""
        self._reset()
        self._want_unchanged = want_unchanged
        self._include_trees = include_trees
        self._collect_changes(tree1_id, tree2_id)
        self._find_exact_renames()
        self._find_content_rename_candidates()
        self._choose_content_renames()
        self._join_modifies()
        self._prune_unchanged()
        return self._sorted_changes()

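# Sketch (not part of dulwich) of plugging a RenameDetector into tree_changes
# so that a similar delete/add pair is reported as a rename or copy.
# ``store`` and the tree ids are assumed to come from an existing repository.
def _example_rename_detection(
    store: BaseObjectStore, old_tree: ObjectID, new_tree: ObjectID
) -> None:  # pragma: no cover
    detector = RenameDetector(store, rename_threshold=60, find_copies_harder=False)
    for change in tree_changes(store, old_tree, new_tree, rename_detector=detector):
        if change.type in RENAME_CHANGE_TYPES:
            print(f"{change.type}: {change.old.path!r} -> {change.new.path!r}")
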

# Hold on to the pure-python implementations for testing.
_is_tree_py = _is_tree
_merge_entries_py = _merge_entries
_count_blocks_py = _count_blocks

if TYPE_CHECKING:
    # For type checking, use the Python implementations
    pass
else:
    # At runtime, try to import Rust extensions
    try:
        # Try to import Rust versions
        from dulwich._diff_tree import (
            _count_blocks as _rust_count_blocks,
        )
        from dulwich._diff_tree import (
            _is_tree as _rust_is_tree,
        )
        from dulwich._diff_tree import (
            _merge_entries as _rust_merge_entries,
        )

        # Override with Rust versions
        _count_blocks = _rust_count_blocks
        _is_tree = _rust_is_tree
        _merge_entries = _rust_merge_entries
    except ImportError:
        pass