Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/diff_tree.py: 22%


# diff_tree.py -- Utilities for diffing files and trees.
# Copyright (C) 2010 Google, Inc.
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Utilities for diffing files and trees."""

import stat
from collections import defaultdict, namedtuple
from collections.abc import Iterator
from io import BytesIO
from itertools import chain
from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar

from .object_store import BaseObjectStore
from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry

# TreeChange type constants.
CHANGE_ADD = "add"
CHANGE_MODIFY = "modify"
CHANGE_DELETE = "delete"
CHANGE_RENAME = "rename"
CHANGE_COPY = "copy"
CHANGE_UNCHANGED = "unchanged"

RENAME_CHANGE_TYPES = (CHANGE_RENAME, CHANGE_COPY)

_NULL_ENTRY = TreeEntry(None, None, None)

_MAX_SCORE = 100
RENAME_THRESHOLD = 60
MAX_FILES = 200
REWRITE_THRESHOLD = None


class TreeChange(namedtuple("TreeChange", ["type", "old", "new"])):
    """Named tuple representing a single change between two trees."""

    @classmethod
    def add(cls, new: TreeEntry) -> "TreeChange":
        return cls(CHANGE_ADD, _NULL_ENTRY, new)

    @classmethod
    def delete(cls, old: TreeEntry) -> "TreeChange":
        return cls(CHANGE_DELETE, old, _NULL_ENTRY)
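
# Illustrative sketch (not part of dulwich): how TreeChange values are built
# from TreeEntry tuples. The helper name, path, mode, and SHA below are made up
# for demonstration only.
def _example_tree_change() -> None:
    entry = TreeEntry(b"docs/readme.txt", 0o100644, b"0" * 40)  # hypothetical entry
    added = TreeChange.add(entry)
    removed = TreeChange.delete(entry)
    # The missing side of an add/delete is filled with _NULL_ENTRY.
    assert added.type == CHANGE_ADD and added.old == _NULL_ENTRY
    assert removed.type == CHANGE_DELETE and removed.new == _NULL_ENTRY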


def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]:
    result: list[TreeEntry] = []
    if not tree:
        return result
    for entry in tree.iteritems(name_order=True):
        result.append(entry.in_path(path))
    return result


def _merge_entries(
    path: bytes, tree1: Tree, tree2: Tree
) -> list[tuple[TreeEntry, TreeEntry]]:
    """Merge the entries of two trees.

    Args:
      path: A path to prepend to all tree entry names.
      tree1: The first Tree object to iterate, or None.
      tree2: The second Tree object to iterate, or None.

    Returns:
      A list of pairs of TreeEntry objects for each pair of entries in the
      trees. If an entry exists in one tree but not the other, the other
      entry will have all attributes set to None. If neither entry's path is
      None, they are guaranteed to match.
    """
    entries1 = _tree_entries(path, tree1)
    entries2 = _tree_entries(path, tree2)
    i1 = i2 = 0
    len1 = len(entries1)
    len2 = len(entries2)

    result = []
    while i1 < len1 and i2 < len2:
        entry1 = entries1[i1]
        entry2 = entries2[i2]
        if entry1.path < entry2.path:
            result.append((entry1, _NULL_ENTRY))
            i1 += 1
        elif entry1.path > entry2.path:
            result.append((_NULL_ENTRY, entry2))
            i2 += 1
        else:
            result.append((entry1, entry2))
            i1 += 1
            i2 += 1
    for i in range(i1, len1):
        result.append((entries1[i], _NULL_ENTRY))
    for i in range(i2, len2):
        result.append((_NULL_ENTRY, entries2[i]))
    return result
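
# Illustrative sketch (not part of dulwich): merging the sorted entries of two
# small in-memory trees. The blob contents and file names are made up.
def _example_merge_entries() -> None:
    from dulwich.objects import Blob

    blob = Blob.from_string(b"hello\n")
    tree1 = Tree()
    tree1.add(b"a.txt", 0o100644, blob.id)
    tree2 = Tree()
    tree2.add(b"b.txt", 0o100644, blob.id)
    # Yields (a.txt, _NULL_ENTRY) and (_NULL_ENTRY, b.txt) since the names differ.
    for old, new in _merge_entries(b"", tree1, tree2):
        print(old, new)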


def _is_tree(entry: TreeEntry) -> bool:
    mode = entry.mode
    if mode is None:
        return False
    return stat.S_ISDIR(mode)


def walk_trees(
    store: BaseObjectStore,
    tree1_id: Optional[ObjectID],
    tree2_id: Optional[ObjectID],
    prune_identical: bool = False,
    paths: Optional[list[bytes]] = None,
) -> Iterator[tuple[TreeEntry, TreeEntry]]:
    """Recursively walk all the entries of two trees.

    Iteration is depth-first pre-order, as in e.g. os.walk.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the first Tree object to iterate, or None.
      tree2_id: The SHA of the second Tree object to iterate, or None.
      prune_identical: If True, identical subtrees will not be walked.
      paths: Optional list of paths to filter to (as bytes).

    Returns:
      Iterator over pairs of TreeEntry objects for each pair of entries in
      the trees and their subtrees recursively. If an entry exists in one
      tree but not the other, the other entry will have all attributes set
      to None. If neither entry's path is None, they are guaranteed to
      match.
    """
    # This could be fairly easily generalized to >2 trees if we find a use
    # case.
    mode1 = (tree1_id and stat.S_IFDIR) or None
    mode2 = (tree2_id and stat.S_IFDIR) or None
    todo = [(TreeEntry(b"", mode1, tree1_id), TreeEntry(b"", mode2, tree2_id))]
    while todo:
        entry1, entry2 = todo.pop()
        is_tree1 = _is_tree(entry1)
        is_tree2 = _is_tree(entry2)
        if prune_identical and is_tree1 and is_tree2 and entry1 == entry2:
            continue

        tree1 = (is_tree1 and store[entry1.sha]) or None
        tree2 = (is_tree2 and store[entry2.sha]) or None
        path = entry1.path or entry2.path

        # If we have path filters, check if we should process this tree.
        if paths is not None and (is_tree1 or is_tree2):
            # Special case for the root tree.
            if path == b"":
                should_recurse = True
            else:
                # Check if any of our filter paths could be under this tree.
                should_recurse = False
                for filter_path in paths:
                    if filter_path == path:
                        # Exact match - we want this directory itself.
                        should_recurse = True
                        break
                    elif filter_path.startswith(path + b"/"):
                        # Filter path is under this directory.
                        should_recurse = True
                        break
                    elif path.startswith(filter_path + b"/"):
                        # This directory is under a filter path.
                        should_recurse = True
                        break
            if not should_recurse:
                # Skip this tree entirely.
                continue

        # Ensure trees are Tree objects before merging.
        if tree1 is not None and not isinstance(tree1, Tree):
            tree1 = None
        if tree2 is not None and not isinstance(tree2, Tree):
            tree2 = None

        if tree1 is not None or tree2 is not None:
            # Use empty trees for None values.
            if tree1 is None:
                tree1 = Tree()
            if tree2 is None:
                tree2 = Tree()
            todo.extend(reversed(_merge_entries(path, tree1, tree2)))

        # Only yield entries that match our path filters.
        if paths is None:
            yield entry1, entry2
        else:
            # Check if this entry matches any of our filters.
            for filter_path in paths:
                if path == filter_path:
                    # Exact match.
                    yield entry1, entry2
                    break
                elif path.startswith(filter_path + b"/"):
                    # This entry is under a filter directory.
                    yield entry1, entry2
                    break
                elif filter_path.startswith(path + b"/") and (is_tree1 or is_tree2):
                    # This is a parent directory of a filter path.
                    yield entry1, entry2
                    break
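
# Illustrative sketch (not part of dulwich): walking two trees held in a
# MemoryObjectStore. The file layout is made up; only the Tree objects need to
# be in the store for the walk itself, but the blobs are added for completeness.
def _example_walk_trees() -> None:
    from dulwich.object_store import MemoryObjectStore
    from dulwich.objects import Blob

    store = MemoryObjectStore()
    old_blob = Blob.from_string(b"one\n")
    new_blob = Blob.from_string(b"two\n")
    tree1 = Tree()
    tree1.add(b"file.txt", 0o100644, old_blob.id)
    tree2 = Tree()
    tree2.add(b"file.txt", 0o100644, new_blob.id)
    for obj in (old_blob, new_blob, tree1, tree2):
        store.add_object(obj)
    # Pairs are yielded depth-first, starting with the two root trees.
    for entry1, entry2 in walk_trees(store, tree1.id, tree2.id):
        print(entry1.path, entry1.sha, entry2.sha)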


def _skip_tree(entry: TreeEntry, include_trees: bool) -> TreeEntry:
    if entry.mode is None or (not include_trees and stat.S_ISDIR(entry.mode)):
        return _NULL_ENTRY
    return entry


def tree_changes(
    store: BaseObjectStore,
    tree1_id: Optional[ObjectID],
    tree2_id: Optional[ObjectID],
    want_unchanged: bool = False,
    rename_detector: Optional["RenameDetector"] = None,
    include_trees: bool = False,
    change_type_same: bool = False,
    paths: Optional[list[bytes]] = None,
) -> Iterator[TreeChange]:
    """Find the differences between the contents of two trees.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the source tree.
      tree2_id: The SHA of the target tree.
      want_unchanged: If True, include TreeChanges for unmodified entries
        as well.
      rename_detector: RenameDetector object for detecting renames.
      include_trees: Whether to include entries for trees themselves.
      change_type_same: Whether to report a change of file type as a single
        modify entry rather than as a delete plus an add.
      paths: Optional list of paths to filter to (as bytes).

    Returns:
      Iterator over TreeChange instances for each change between the
      source and target tree.
    """
    if rename_detector is not None and tree1_id is not None and tree2_id is not None:
        yield from rename_detector.changes_with_renames(
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=include_trees,
        )
        return

    entries = walk_trees(
        store, tree1_id, tree2_id, prune_identical=(not want_unchanged), paths=paths
    )
    for entry1, entry2 in entries:
        if entry1 == entry2 and not want_unchanged:
            continue

        # Treat entries for trees as missing.
        entry1 = _skip_tree(entry1, include_trees)
        entry2 = _skip_tree(entry2, include_trees)

        if entry1 != _NULL_ENTRY and entry2 != _NULL_ENTRY:
            if (
                stat.S_IFMT(entry1.mode) != stat.S_IFMT(entry2.mode)
                and not change_type_same
            ):
                # File type changed: report as delete/add.
                yield TreeChange.delete(entry1)
                entry1 = _NULL_ENTRY
                change_type = CHANGE_ADD
            elif entry1 == entry2:
                change_type = CHANGE_UNCHANGED
            else:
                change_type = CHANGE_MODIFY
        elif entry1 != _NULL_ENTRY:
            change_type = CHANGE_DELETE
        elif entry2 != _NULL_ENTRY:
            change_type = CHANGE_ADD
        else:
            # Both were None because at least one was a tree.
            continue
        yield TreeChange(change_type, entry1, entry2)
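
# Illustrative sketch (not part of dulwich): listing the changes between two
# trees. Contents and paths are made up; the store and trees are assembled the
# same way as in _example_walk_trees above.
def _example_tree_changes() -> None:
    from dulwich.object_store import MemoryObjectStore
    from dulwich.objects import Blob

    store = MemoryObjectStore()
    old_blob = Blob.from_string(b"old\n")
    new_blob = Blob.from_string(b"new\n")
    tree1 = Tree()
    tree1.add(b"kept.txt", 0o100644, old_blob.id)
    tree2 = Tree()
    tree2.add(b"kept.txt", 0o100644, new_blob.id)
    tree2.add(b"added.txt", 0o100644, new_blob.id)
    for obj in (old_blob, new_blob, tree1, tree2):
        store.add_object(obj)
    # Expected output: one "add" for added.txt and one "modify" for kept.txt.
    for change in tree_changes(store, tree1.id, tree2.id):
        print(change.type, change.old.path, change.new.path)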


T = TypeVar("T")
U = TypeVar("U")


def _all_eq(seq: list[T], key: Callable[[T], U], value: U) -> bool:
    for e in seq:
        if key(e) != value:
            return False
    return True


def _all_same(seq: list[Any], key: Callable[[Any], Any]) -> bool:
    return _all_eq(seq[1:], key, key(seq[0]))


def tree_changes_for_merge(
    store: BaseObjectStore,
    parent_tree_ids: list[ObjectID],
    tree_id: ObjectID,
    rename_detector: Optional["RenameDetector"] = None,
) -> Iterator[list[Optional[TreeChange]]]:
    """Get the tree changes for a merge tree relative to all its parents.

    Args:
      store: An ObjectStore for looking up objects.
      parent_tree_ids: An iterable of the SHAs of the parent trees.
      tree_id: The SHA of the merge tree.
      rename_detector: RenameDetector object for detecting renames.

    Returns:
      Iterator over lists of TreeChange objects, one per conflicted path
      in the merge.

      Each list contains one element per parent, with the TreeChange for that
      path relative to that parent. An element may be None if it never
      existed in one parent and was deleted in two others.

      A path is only included in the output if it is a conflict, i.e. its SHA
      in the merge tree is not found in any of the parents, or in the case of
      deletes, if not all of the old SHAs match.
    """
    all_parent_changes = [
        tree_changes(store, t, tree_id, rename_detector=rename_detector)
        for t in parent_tree_ids
    ]
    num_parents = len(parent_tree_ids)
    changes_by_path: dict[bytes, list[Optional[TreeChange]]] = defaultdict(
        lambda: [None] * num_parents
    )

    # Organize by path.
    for i, parent_changes in enumerate(all_parent_changes):
        for change in parent_changes:
            if change.type == CHANGE_DELETE:
                path = change.old.path
            else:
                path = change.new.path
            changes_by_path[path][i] = change

    def old_sha(c: TreeChange) -> Optional[ObjectID]:
        return c.old.sha

    def change_type(c: TreeChange) -> str:
        return c.type

    # Yield only conflicting changes.
    for _, changes in sorted(changes_by_path.items()):
        assert len(changes) == num_parents
        have = [c for c in changes if c is not None]
        if _all_eq(have, change_type, CHANGE_DELETE):
            if not _all_same(have, old_sha):
                yield changes
        elif not _all_same(have, change_type):
            yield changes
        elif None not in changes:
            # If no change was found relative to one parent, that means the SHA
            # must have matched the SHA in that parent, so it is not a
            # conflict.
            yield changes
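
# Illustrative sketch (not part of dulwich): finding conflicted paths in a
# two-parent merge. The same path carries a different blob in each parent and a
# third blob in the merge tree, so it is reported as a conflict. All names and
# contents are made up.
def _example_tree_changes_for_merge() -> None:
    from dulwich.object_store import MemoryObjectStore
    from dulwich.objects import Blob

    store = MemoryObjectStore()
    blobs = [Blob.from_string(data) for data in (b"ours\n", b"theirs\n", b"merged\n")]
    trees = []
    for blob in blobs:
        tree = Tree()
        tree.add(b"conflict.txt", 0o100644, blob.id)
        trees.append(tree)
    for obj in blobs + trees:
        store.add_object(obj)
    parent_ids = [trees[0].id, trees[1].id]
    for per_parent in tree_changes_for_merge(store, parent_ids, trees[2].id):
        # One TreeChange per parent for the conflicted path.
        print([c.type if c else None for c in per_parent])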


_BLOCK_SIZE = 64


def _count_blocks(obj: ShaFile) -> dict[int, int]:
    """Count the blocks in an object.

    Splits the data into blocks either on lines or <=64-byte chunks of lines.

    Args:
      obj: The object to count blocks for.

    Returns:
      A dict of block hashcode -> total bytes occurring.
    """
    block_counts: dict[int, int] = defaultdict(int)
    block = BytesIO()
    n = 0

    # Cache attrs as locals to avoid expensive lookups in the inner loop.
    block_write = block.write
    block_seek = block.seek
    block_truncate = block.truncate
    block_getvalue = block.getvalue

    for c in chain.from_iterable(obj.as_raw_chunks()):
        cb = c.to_bytes(1, "big")
        block_write(cb)
        n += 1
        if cb == b"\n" or n == _BLOCK_SIZE:
            value = block_getvalue()
            block_counts[hash(value)] += len(value)
            block_seek(0)
            block_truncate()
            n = 0
    if n > 0:
        last_block = block_getvalue()
        block_counts[hash(last_block)] += len(last_block)
    return block_counts


def _common_bytes(blocks1: dict[int, int], blocks2: dict[int, int]) -> int:
    """Count the number of common bytes in two block count dicts.

    Args:
      blocks1: The first dict of block hashcode -> total bytes.
      blocks2: The second dict of block hashcode -> total bytes.

    Returns:
      The number of bytes in common between blocks1 and blocks2. This is
      only approximate due to possible hash collisions.
    """
    # Iterate over the smaller of the two dicts, since this is symmetrical.
    if len(blocks1) > len(blocks2):
        blocks1, blocks2 = blocks2, blocks1
    score = 0
    for block, count1 in blocks1.items():
        count2 = blocks2.get(block)
        if count2:
            score += min(count1, count2)
    return score


def _similarity_score(
    obj1: ShaFile,
    obj2: ShaFile,
    block_cache: Optional[dict[ObjectID, dict[int, int]]] = None,
) -> int:
    """Compute a similarity score for two objects.

    Args:
      obj1: The first object to score.
      obj2: The second object to score.
      block_cache: An optional dict of SHA to block counts to cache
        results between calls.

    Returns:
      The similarity score between the two objects, defined as the
      number of bytes in common between the two objects divided by the
      maximum size, scaled to the range 0-100.
    """
    if block_cache is None:
        block_cache = {}
    if obj1.id not in block_cache:
        block_cache[obj1.id] = _count_blocks(obj1)
    if obj2.id not in block_cache:
        block_cache[obj2.id] = _count_blocks(obj2)

    common_bytes = _common_bytes(block_cache[obj1.id], block_cache[obj2.id])
    max_size = max(obj1.raw_length(), obj2.raw_length())
    if not max_size:
        return _MAX_SCORE
    return int(float(common_bytes) * _MAX_SCORE / max_size)
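
# Illustrative sketch (not part of dulwich): scoring the similarity of two
# blobs. The strings are made up; the score is the approximate share of common
# bytes, scaled to 0-100.
def _example_similarity_score() -> None:
    from dulwich.objects import Blob

    obj1 = Blob.from_string(b"line one\nline two\nline three\n")
    obj2 = Blob.from_string(b"line one\nline two\nline four\n")
    blocks = _count_blocks(obj1)  # dict of block hash -> byte count
    print(len(blocks), _similarity_score(obj1, obj2))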


def _tree_change_key(entry: TreeChange) -> tuple[bytes, bytes]:
    # Sort by old path then new path. If only one exists, use it for both keys.
    path1 = entry.old.path
    path2 = entry.new.path
    if path1 is None:
        path1 = path2
    if path2 is None:
        path2 = path1
    return (path1, path2)


class RenameDetector:
    """Object for handling rename detection between two trees."""

    _adds: list[TreeChange]
    _deletes: list[TreeChange]
    _changes: list[TreeChange]
    _candidates: list[tuple[int, TreeChange]]

    def __init__(
        self,
        store: BaseObjectStore,
        rename_threshold: int = RENAME_THRESHOLD,
        max_files: Optional[int] = MAX_FILES,
        rewrite_threshold: Optional[int] = REWRITE_THRESHOLD,
        find_copies_harder: bool = False,
    ) -> None:
        """Initialize the rename detector.

        Args:
          store: An ObjectStore for looking up objects.
          rename_threshold: The threshold similarity score for considering
            an add/delete pair to be a rename/copy; see _similarity_score.
          max_files: The maximum number of adds and deletes to consider,
            or None for no limit. The detector is guaranteed to compare no
            more than max_files ** 2 add/delete pairs. This limit is provided
            because rename detection can be quadratic in the project size. If
            the limit is exceeded, no content rename detection is attempted.
          rewrite_threshold: The threshold similarity score below which a
            modify should be considered a delete/add, or None to not break
            modifies; see _similarity_score.
          find_copies_harder: If True, consider unmodified files when
            detecting copies.
        """
        self._store = store
        self._rename_threshold = rename_threshold
        self._rewrite_threshold = rewrite_threshold
        self._max_files = max_files
        self._find_copies_harder = find_copies_harder
        self._want_unchanged = False

    def _reset(self) -> None:
        self._adds = []
        self._deletes = []
        self._changes = []

    def _should_split(self, change: TreeChange) -> bool:
        if (
            self._rewrite_threshold is None
            or change.type != CHANGE_MODIFY
            or change.old.sha == change.new.sha
        ):
            return False
        old_obj = self._store[change.old.sha]
        new_obj = self._store[change.new.sha]
        return _similarity_score(old_obj, new_obj) < self._rewrite_threshold

    def _add_change(self, change: TreeChange) -> None:
        if change.type == CHANGE_ADD:
            self._adds.append(change)
        elif change.type == CHANGE_DELETE:
            self._deletes.append(change)
        elif self._should_split(change):
            self._deletes.append(TreeChange.delete(change.old))
            self._adds.append(TreeChange.add(change.new))
        elif (
            self._find_copies_harder and change.type == CHANGE_UNCHANGED
        ) or change.type == CHANGE_MODIFY:
            # Treat all modifies as potential deletes for rename detection,
            # but don't split them (to avoid spurious renames). Setting
            # find_copies_harder means we treat unchanged the same as
            # modified.
            self._deletes.append(change)
        else:
            self._changes.append(change)

    def _collect_changes(
        self, tree1_id: Optional[ObjectID], tree2_id: Optional[ObjectID]
    ) -> None:
        want_unchanged = self._find_copies_harder or self._want_unchanged
        for change in tree_changes(
            self._store,
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=self._include_trees,
        ):
            self._add_change(change)

    def _prune(self, add_paths: set[bytes], delete_paths: set[bytes]) -> None:
        self._adds = [a for a in self._adds if a.new.path not in add_paths]
        self._deletes = [d for d in self._deletes if d.old.path not in delete_paths]

    def _find_exact_renames(self) -> None:
        add_map = defaultdict(list)
        for add in self._adds:
            add_map[add.new.sha].append(add.new)
        delete_map = defaultdict(list)
        for delete in self._deletes:
            # Keep track of whether the delete was actually marked as a delete.
            # If not, it needs to be marked as a copy.
            is_delete = delete.type == CHANGE_DELETE
            delete_map[delete.old.sha].append((delete.old, is_delete))

        add_paths = set()
        delete_paths = set()
        for sha, sha_deletes in delete_map.items():
            sha_adds = add_map[sha]
            for (old, is_delete), new in zip(sha_deletes, sha_adds):
                if stat.S_IFMT(old.mode) != stat.S_IFMT(new.mode):
                    continue
                if is_delete:
                    delete_paths.add(old.path)
                add_paths.add(new.path)
                new_type = (is_delete and CHANGE_RENAME) or CHANGE_COPY
                self._changes.append(TreeChange(new_type, old, new))

            num_extra_adds = len(sha_adds) - len(sha_deletes)
            # TODO(dborowitz): Less arbitrary way of dealing with extra copies.
            old = sha_deletes[0][0]
            if num_extra_adds > 0:
                for new in sha_adds[-num_extra_adds:]:
                    add_paths.add(new.path)
                    self._changes.append(TreeChange(CHANGE_COPY, old, new))
        self._prune(add_paths, delete_paths)

    def _should_find_content_renames(self) -> bool:
        if self._max_files is None:
            return True
        return len(self._adds) * len(self._deletes) <= self._max_files**2

    def _rename_type(
        self, check_paths: bool, delete: TreeChange, add: TreeChange
    ) -> str:
        if check_paths and delete.old.path == add.new.path:
            # If the paths match, this must be a split modify, so make sure it
            # comes out as a modify.
            return CHANGE_MODIFY
        elif delete.type != CHANGE_DELETE:
            # If it's in deletes but not marked as a delete, it must have been
            # added due to find_copies_harder, and needs to be marked as a
            # copy.
            return CHANGE_COPY
        return CHANGE_RENAME

    def _find_content_rename_candidates(self) -> None:
        candidates = self._candidates = []
        # TODO: Optimizations:
        # - Compare object sizes before counting blocks.
        # - Skip if delete's S_IFMT differs from all adds.
        # - Skip if adds or deletes is empty.
        # Match C git's behavior of not attempting to find content renames if
        # the matrix size exceeds the threshold.
        if not self._should_find_content_renames():
            return

        block_cache = {}
        check_paths = self._rename_threshold is not None
        for delete in self._deletes:
            if S_ISGITLINK(delete.old.mode):
                continue  # Git links don't exist in this repo.
            old_sha = delete.old.sha
            old_obj = self._store[old_sha]
            block_cache[old_sha] = _count_blocks(old_obj)
            for add in self._adds:
                if stat.S_IFMT(delete.old.mode) != stat.S_IFMT(add.new.mode):
                    continue
                new_obj = self._store[add.new.sha]
                score = _similarity_score(old_obj, new_obj, block_cache=block_cache)
                if score > self._rename_threshold:
                    new_type = self._rename_type(check_paths, delete, add)
                    rename = TreeChange(new_type, delete.old, add.new)
                    candidates.append((-score, rename))

    def _choose_content_renames(self) -> None:
        # Sort scores from highest to lowest, but keep names in ascending
        # order.
        self._candidates.sort()

        delete_paths = set()
        add_paths = set()
        for _, change in self._candidates:
            new_path = change.new.path
            if new_path in add_paths:
                continue
            old_path = change.old.path
            orig_type = change.type
            if old_path in delete_paths:
                change = TreeChange(CHANGE_COPY, change.old, change.new)

            # If the candidate was originally a copy, that means it came from a
            # modified or unchanged path, so we don't want to prune it.
            if orig_type != CHANGE_COPY:
                delete_paths.add(old_path)
            add_paths.add(new_path)
            self._changes.append(change)
        self._prune(add_paths, delete_paths)

    def _join_modifies(self) -> None:
        if self._rewrite_threshold is None:
            return

        modifies = {}
        delete_map = {d.old.path: d for d in self._deletes}
        for add in self._adds:
            path = add.new.path
            delete = delete_map.get(path)
            if delete is not None and stat.S_IFMT(delete.old.mode) == stat.S_IFMT(
                add.new.mode
            ):
                modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new)

        self._adds = [a for a in self._adds if a.new.path not in modifies]
        # Deletes produced by splitting a modify have a null new entry, so key
        # the pruning on old.path.
        self._deletes = [d for d in self._deletes if d.old.path not in modifies]
        self._changes += modifies.values()

    def _sorted_changes(self) -> list[TreeChange]:
        result = []
        result.extend(self._adds)
        result.extend(self._deletes)
        result.extend(self._changes)
        result.sort(key=_tree_change_key)
        return result

    def _prune_unchanged(self) -> None:
        if self._want_unchanged:
            return
        self._deletes = [d for d in self._deletes if d.type != CHANGE_UNCHANGED]

    def changes_with_renames(
        self,
        tree1_id: Optional[ObjectID],
        tree2_id: Optional[ObjectID],
        want_unchanged: bool = False,
        include_trees: bool = False,
    ) -> list[TreeChange]:
        """Compute TreeChanges between two tree SHAs, with rename detection."""
        self._reset()
        self._want_unchanged = want_unchanged
        self._include_trees = include_trees
        self._collect_changes(tree1_id, tree2_id)
        self._find_exact_renames()
        self._find_content_rename_candidates()
        self._choose_content_renames()
        self._join_modifies()
        self._prune_unchanged()
        return self._sorted_changes()
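
# Illustrative sketch (not part of dulwich): detecting an exact rename. The
# same blob appears under a different name in the second tree, so the detector
# reports a single "rename" instead of a delete plus an add. Names and contents
# are made up.
def _example_rename_detection() -> None:
    from dulwich.object_store import MemoryObjectStore
    from dulwich.objects import Blob

    store = MemoryObjectStore()
    blob = Blob.from_string(b"unchanged contents\n")
    tree1 = Tree()
    tree1.add(b"old_name.txt", 0o100644, blob.id)
    tree2 = Tree()
    tree2.add(b"new_name.txt", 0o100644, blob.id)
    for obj in (blob, tree1, tree2):
        store.add_object(obj)
    detector = RenameDetector(store)
    # Expected output: rename old_name.txt -> new_name.txt
    for change in tree_changes(store, tree1.id, tree2.id, rename_detector=detector):
        print(change.type, change.old.path, change.new.path)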


# Hold on to the pure-Python implementations for testing.
_is_tree_py = _is_tree
_merge_entries_py = _merge_entries
_count_blocks_py = _count_blocks

if TYPE_CHECKING:
    # For type checking, use the Python implementations.
    pass
else:
    # At runtime, try to import the Rust extensions.
    try:
        from dulwich._diff_tree import (
            _count_blocks as _rust_count_blocks,
        )
        from dulwich._diff_tree import (
            _is_tree as _rust_is_tree,
        )
        from dulwich._diff_tree import (
            _merge_entries as _rust_merge_entries,
        )

        # Override with the Rust versions.
        _count_blocks = _rust_count_blocks
        _is_tree = _rust_is_tree
        _merge_entries = _rust_merge_entries
    except ImportError:
        pass