1# diff_tree.py -- Utilities for diffing files and trees.
2# Copyright (C) 2010 Google, Inc.
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
21
22"""Utilities for diffing files and trees."""
23
24import stat
25from collections import defaultdict, namedtuple
26from collections.abc import Iterator
27from io import BytesIO
28from itertools import chain
29from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar
30
31from .object_store import BaseObjectStore
32from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry
33
# TreeChange type constants.
CHANGE_ADD = "add"
CHANGE_MODIFY = "modify"
CHANGE_DELETE = "delete"
CHANGE_RENAME = "rename"
CHANGE_COPY = "copy"
CHANGE_UNCHANGED = "unchanged"

# Change types that RenameDetector emits for renamed or copied entries.
RENAME_CHANGE_TYPES = (CHANGE_RENAME, CHANGE_COPY)

# Placeholder for the missing side of an add or delete: every field is None.
_NULL_ENTRY = TreeEntry(None, None, None)

# Upper bound of the similarity scale produced by _similarity_score.
_MAX_SCORE = 100
# Default RenameDetector.rename_threshold.
RENAME_THRESHOLD = 60
# Default RenameDetector.max_files.
MAX_FILES = 200
# Default RenameDetector.rewrite_threshold; None means modifies are not broken
# into delete/add pairs.
REWRITE_THRESHOLD: Optional[int] = None
50
51
class TreeChange(namedtuple("TreeChange", ["type", "old", "new"])):
    """Named tuple describing a single change between two trees.

    Fields:
      type: One of the CHANGE_* constants.
      old: The entry on the source side, or the all-None placeholder entry
        when there is none.
      new: The entry on the target side, or the all-None placeholder entry
        when there is none.
    """

    @classmethod
    def add(cls, new: TreeEntry) -> "TreeChange":
        """Build the change recording that an entry was added.

        Args:
          new: The entry that appeared in the target tree.

        Returns:
          A TreeChange of type CHANGE_ADD whose old side is the placeholder.
        """
        return cls(type=CHANGE_ADD, old=_NULL_ENTRY, new=new)

    @classmethod
    def delete(cls, old: TreeEntry) -> "TreeChange":
        """Build the change recording that an entry was removed.

        Args:
          old: The entry that disappeared from the source tree.

        Returns:
          A TreeChange of type CHANGE_DELETE whose new side is the
          placeholder.
        """
        return cls(type=CHANGE_DELETE, old=old, new=_NULL_ENTRY)
78
79
def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]:
    """List the entries of ``tree`` in name order, placed under ``path``.

    Args:
      path: The path handed to ``TreeEntry.in_path`` for every entry.
      tree: The tree to list; a falsy value yields an empty list.

    Returns:
      A new list of TreeEntry objects.
    """
    if not tree:
        return []
    return [item.in_path(path) for item in tree.iteritems(name_order=True)]
87
88
def _merge_entries(
    path: bytes, tree1: Tree, tree2: Tree
) -> list[tuple[TreeEntry, TreeEntry]]:
    """Pair up the entries of two trees by path.

    Both entry lists are name-ordered, so this is a standard two-pointer
    merge of sorted sequences.

    Args:
      path: A path to prepend to all tree entry names.
      tree1: The first Tree object to iterate, or None.
      tree2: The second Tree object to iterate, or None.

    Returns:
      A list of (entry1, entry2) pairs. When a path exists in only one tree,
      the entry for the other tree has all attributes set to None. When both
      entries of a pair have a path, the paths are equal.
    """
    left = _tree_entries(path, tree1)
    right = _tree_entries(path, tree2)
    left_count, right_count = len(left), len(right)
    left_pos = right_pos = 0

    pairs = []
    while left_pos < left_count and right_pos < right_count:
        left_entry = left[left_pos]
        right_entry = right[right_pos]
        if left_entry.path == right_entry.path:
            # Present on both sides.
            pairs.append((left_entry, right_entry))
            left_pos += 1
            right_pos += 1
        elif left_entry.path < right_entry.path:
            # Only in the first tree.
            pairs.append((left_entry, _NULL_ENTRY))
            left_pos += 1
        else:
            # Only in the second tree.
            pairs.append((_NULL_ENTRY, right_entry))
            right_pos += 1

    # At most one of the two lists still has unconsumed entries.
    pairs.extend((entry, _NULL_ENTRY) for entry in left[left_pos:])
    pairs.extend((_NULL_ENTRY, entry) for entry in right[right_pos:])
    return pairs
130
131
def _is_tree(entry: TreeEntry) -> bool:
    """Return True if ``entry`` has a mode and that mode is a directory."""
    return entry.mode is not None and stat.S_ISDIR(entry.mode)
137
138
def _path_filter_selects(path: bytes, paths: list[bytes], is_tree: bool) -> bool:
    """Decide whether ``path`` is selected by any of the filter ``paths``.

    Args:
      path: The path of the entry being considered.
      paths: The filter paths.
      is_tree: Whether the entry is a tree on at least one side.

    Returns:
      True if ``path`` equals a filter path, lies below a filter path, or is
      a tree that is an ancestor directory of a filter path.
    """
    for filter_path in paths:
        if path == filter_path or path.startswith(filter_path + b"/"):
            return True
        if is_tree and filter_path.startswith(path + b"/"):
            return True
    return False


def walk_trees(
    store: BaseObjectStore,
    tree1_id: Optional[ObjectID],
    tree2_id: Optional[ObjectID],
    prune_identical: bool = False,
    paths: Optional[list[bytes]] = None,
) -> Iterator[tuple[TreeEntry, TreeEntry]]:
    """Recursively walk all the entries of two trees.

    Iteration is depth-first pre-order, as in e.g. os.walk.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the first Tree object to iterate, or None.
      tree2_id: The SHA of the second Tree object to iterate, or None.
      prune_identical: If True, identical subtrees will not be walked.
      paths: Optional list of paths to filter to (as bytes). An entry is
        yielded when it equals a filter path, lies below one, or is a
        directory leading to one. Subtrees that cannot contain a selected
        entry are neither yielded nor read from the store.

    Returns:
      Iterator over Pairs of TreeEntry objects for each pair of entries
        in the trees and their subtrees recursively. If an entry exists in one
        tree but not the other, the other entry will have all attributes set
        to None. If neither entry's path is None, they are guaranteed to
        match.
    """
    # This could be fairly easily generalized to >2 trees if we find a use
    # case.
    mode1 = (tree1_id and stat.S_IFDIR) or None
    mode2 = (tree2_id and stat.S_IFDIR) or None
    todo = [(TreeEntry(b"", mode1, tree1_id), TreeEntry(b"", mode2, tree2_id))]
    while todo:
        entry1, entry2 = todo.pop()
        is_tree1 = _is_tree(entry1)
        is_tree2 = _is_tree(entry2)
        if prune_identical and is_tree1 and is_tree2 and entry1 == entry2:
            continue

        path = entry1.path or entry2.path
        any_tree = is_tree1 or is_tree2

        if paths is None:
            selected = True
        else:
            selected = _path_filter_selects(path, paths, any_tree)
            # The root (empty path) is always descended into. Any other tree
            # that no filter selects is skipped before touching the store.
            if any_tree and not selected and path != b"":
                continue

        # Only now load the tree objects; anything that is not actually a
        # Tree is treated as having no children.
        tree1 = store[entry1.sha] if is_tree1 else None
        tree2 = store[entry2.sha] if is_tree2 else None
        if not isinstance(tree1, Tree):
            tree1 = None
        if not isinstance(tree2, Tree):
            tree2 = None

        if tree1 is not None or tree2 is not None:
            # Use empty trees for None values
            if tree1 is None:
                tree1 = Tree()
            if tree2 is None:
                tree2 = Tree()
            # Reversed so that popping from the end visits entries in order.
            todo.extend(reversed(_merge_entries(path, tree1, tree2)))

        if selected:
            yield entry1, entry2
237
238
def _skip_tree(entry: TreeEntry, include_trees: bool) -> TreeEntry:
    """Map entries that should be ignored to the all-None placeholder.

    Args:
      entry: The entry to inspect.
      include_trees: If False, directory entries are treated as missing.

    Returns:
      ``entry`` itself, or the placeholder when ``entry`` has no mode or is
      a directory that must not be reported.
    """
    mode = entry.mode
    if mode is None:
        return _NULL_ENTRY
    if stat.S_ISDIR(mode) and not include_trees:
        return _NULL_ENTRY
    return entry
243
244
def tree_changes(
    store: BaseObjectStore,
    tree1_id: Optional[ObjectID],
    tree2_id: Optional[ObjectID],
    want_unchanged: bool = False,
    rename_detector: Optional["RenameDetector"] = None,
    include_trees: bool = False,
    change_type_same: bool = False,
    paths: Optional[list[bytes]] = None,
) -> Iterator[TreeChange]:
    """Find the differences between the contents of two trees.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the source tree.
      tree2_id: The SHA of the target tree.
      want_unchanged: If True, include TreeChanges for unmodified entries
        as well.
      include_trees: Whether to include trees
      rename_detector: RenameDetector object for detecting renames. It is
        only consulted when both tree ids are given; in that case
        ``change_type_same`` and ``paths`` are not forwarded to it.
      change_type_same: Whether to report change types in the same
        entry or as delete+add.
      paths: Optional list of paths to filter to (as bytes).

    Returns:
      Iterator over TreeChange instances for each change between the
        source and target tree.
    """
    use_detector = (
        rename_detector is not None and tree1_id is not None and tree2_id is not None
    )
    if use_detector:
        yield from rename_detector.changes_with_renames(
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=include_trees,
        )
        return

    pairs = walk_trees(
        store, tree1_id, tree2_id, prune_identical=(not want_unchanged), paths=paths
    )
    for old_entry, new_entry in pairs:
        if not want_unchanged and old_entry == new_entry:
            continue

        # Treat entries for trees as missing.
        old_entry = _skip_tree(old_entry, include_trees)
        new_entry = _skip_tree(new_entry, include_trees)
        has_old = old_entry != _NULL_ENTRY
        has_new = new_entry != _NULL_ENTRY

        if not has_old and not has_new:
            # Both were None because at least one was a tree.
            continue
        if not has_new:
            yield TreeChange(CHANGE_DELETE, old_entry, new_entry)
            continue
        if not has_old:
            yield TreeChange(CHANGE_ADD, old_entry, new_entry)
            continue

        type_changed = stat.S_IFMT(old_entry.mode) != stat.S_IFMT(new_entry.mode)
        if type_changed and not change_type_same:
            # File type changed: report as delete/add.
            yield TreeChange.delete(old_entry)
            yield TreeChange(CHANGE_ADD, _NULL_ENTRY, new_entry)
        elif old_entry == new_entry:
            yield TreeChange(CHANGE_UNCHANGED, old_entry, new_entry)
        else:
            yield TreeChange(CHANGE_MODIFY, old_entry, new_entry)
314
315
316T = TypeVar("T")
317U = TypeVar("U")
318
319
320def _all_eq(seq: list[T], key: Callable[[T], U], value: U) -> bool:
321 for e in seq:
322 if key(e) != value:
323 return False
324 return True
325
326
327def _all_same(seq: list[Any], key: Callable[[Any], Any]) -> bool:
328 return _all_eq(seq[1:], key, key(seq[0]))
329
330
def tree_changes_for_merge(
    store: BaseObjectStore,
    parent_tree_ids: list[ObjectID],
    tree_id: ObjectID,
    rename_detector: Optional["RenameDetector"] = None,
) -> Iterator[list[Optional[TreeChange]]]:
    """Get the tree changes for a merge tree relative to all its parents.

    Args:
      store: An ObjectStore for looking up objects.
      parent_tree_ids: An iterable of the SHAs of the parent trees.
      tree_id: The SHA of the merge tree.
      rename_detector: RenameDetector object for detecting renames.

    Returns:
      Iterator over lists of TreeChange objects, one per conflicted path
      in the merge, in ascending path order.

      Each list contains one element per parent, with the TreeChange for that
      path relative to that parent. An element may be None if it never
      existed in one parent and was deleted in two others.

      A path is only included in the output if it is a conflict, i.e. its SHA
      in the merge tree is not found in any of the parents, or in the case of
      deletes, if not all of the old SHAs match.
    """
    num_parents = len(parent_tree_ids)
    changes_by_path: dict[bytes, list[Optional[TreeChange]]] = defaultdict(
        lambda: [None] * num_parents
    )

    # Organize by path: slot i of each list holds the change against parent i.
    for index, parent_id in enumerate(parent_tree_ids):
        for change in tree_changes(
            store, parent_id, tree_id, rename_detector=rename_detector
        ):
            # A delete has no new side, so it is filed under its old path.
            side = change.old if change.type == CHANGE_DELETE else change.new
            changes_by_path[side.path][index] = change

    def old_sha(c: TreeChange) -> Optional[ObjectID]:
        return c.old.sha

    def change_type(c: TreeChange) -> str:
        return c.type

    # Yield only conflicting changes.
    for path in sorted(changes_by_path):
        changes = changes_by_path[path]
        assert len(changes) == num_parents
        have = [c for c in changes if c is not None]
        if _all_eq(have, change_type, CHANGE_DELETE):
            # Deleted everywhere it changed: a conflict only if the parents
            # disagreed about what was deleted.
            conflict = not _all_same(have, old_sha)
        elif not _all_same(have, change_type):
            conflict = True
        else:
            # If no change was found relative to one parent, that means the
            # SHA must have matched the SHA in that parent, so it is not a
            # conflict.
            conflict = None not in changes
        if conflict:
            yield changes
396
# Maximum number of bytes in one block when _count_blocks splits object data.
_BLOCK_SIZE = 64
398
399
def _count_blocks(obj: ShaFile) -> dict[int, int]:
    """Count the blocks in an object.

    The raw data is cut after every newline byte, and after every
    _BLOCK_SIZE bytes when no newline occurs sooner. Blocks may span the
    boundaries of the object's raw chunks.

    Args:
      obj: The object to count blocks for.

    Returns:
      A dict of block hashcode -> total bytes occurring.
    """
    counts: dict[int, int] = defaultdict(int)
    pending = BytesIO()
    pending_len = 0

    # Bound method kept in a local: it is called once per byte.
    append = pending.write

    def flush() -> None:
        # Record the buffered block and empty the buffer for the next one.
        data = pending.getvalue()
        counts[hash(data)] += len(data)
        pending.seek(0)
        pending.truncate()

    for byte in chain.from_iterable(obj.as_raw_chunks()):
        piece = bytes((byte,))
        append(piece)
        pending_len += 1
        if piece == b"\n" or pending_len == _BLOCK_SIZE:
            flush()
            pending_len = 0

    # Trailing data that ended without a newline and short of a full block.
    if pending_len > 0:
        flush()
    return counts
435
436
437def _common_bytes(blocks1: dict[int, int], blocks2: dict[int, int]) -> int:
438 """Count the number of common bytes in two block count dicts.
439
440 Args:
441 blocks1: The first dict of block hashcode -> total bytes.
442 blocks2: The second dict of block hashcode -> total bytes.
443
444 Returns:
445 The number of bytes in common between blocks1 and blocks2. This is
446 only approximate due to possible hash collisions.
447 """
448 # Iterate over the smaller of the two dicts, since this is symmetrical.
449 if len(blocks1) > len(blocks2):
450 blocks1, blocks2 = blocks2, blocks1
451 score = 0
452 for block, count1 in blocks1.items():
453 count2 = blocks2.get(block)
454 if count2:
455 score += min(count1, count2)
456 return score
457
458
def _similarity_score(
    obj1: ShaFile,
    obj2: ShaFile,
    block_cache: Optional[dict[ObjectID, dict[int, int]]] = None,
) -> int:
    """Compute a similarity score for two objects.

    Args:
      obj1: The first object to score.
      obj2: The second object to score.
      block_cache: An optional dict of SHA to block counts to cache
        results between calls. Missing entries for either object are added.

    Returns:
      The similarity score between the two objects, defined as the
        number of bytes in common between the two objects divided by the
        maximum size, scaled to the range 0-100. Two empty objects score
        the maximum.
    """
    cache = {} if block_cache is None else block_cache
    for obj in (obj1, obj2):
        if obj.id not in cache:
            cache[obj.id] = _count_blocks(obj)

    shared = _common_bytes(cache[obj1.id], cache[obj2.id])
    largest = max(obj1.raw_length(), obj2.raw_length())
    if largest:
        return int(float(shared) * _MAX_SCORE / largest)
    # Both objects are empty; avoid dividing by zero.
    return _MAX_SCORE
489
490
def _tree_change_key(entry: TreeChange) -> tuple[bytes, bytes]:
    """Return the sort key for a change: (old path, new path).

    When only one side has a path, that path fills both positions.
    """
    old_path = entry.old.path
    new_path = entry.new.path
    first = new_path if old_path is None else old_path
    second = old_path if new_path is None else new_path
    return (first, second)
500
501
class RenameDetector:
    """Object for handling rename detection between two trees.

    A detector can be reused: each call to ``changes_with_renames`` resets
    the intermediate state before collecting changes.
    """

    # Pending adds and deletes that have not been matched yet.
    _adds: list[TreeChange]
    _deletes: list[TreeChange]
    # Changes that are final (matched renames/copies, joined modifies, ...).
    _changes: list[TreeChange]
    # (negated score, candidate rename) pairs from content comparison.
    _candidates: list[tuple[int, TreeChange]]
    _include_trees: bool

    def __init__(
        self,
        store: BaseObjectStore,
        rename_threshold: int = RENAME_THRESHOLD,
        max_files: Optional[int] = MAX_FILES,
        rewrite_threshold: Optional[int] = REWRITE_THRESHOLD,
        find_copies_harder: bool = False,
    ) -> None:
        """Initialize the rename detector.

        Args:
          store: An ObjectStore for looking up objects.
          rename_threshold: The threshold similarity score for considering
            an add/delete pair to be a rename/copy; see _similarity_score.
          max_files: The maximum number of adds and deletes to consider,
            or None for no limit. The detector is guaranteed to compare no more
            than max_files ** 2 add/delete pairs. This limit is provided
            because rename detection can be quadratic in the project size. If
            the limit is exceeded, no content rename detection is attempted.
          rewrite_threshold: The threshold similarity score below which a
            modify should be considered a delete/add, or None to not break
            modifies; see _similarity_score.
          find_copies_harder: If True, consider unmodified files when
            detecting copies.
        """
        self._store = store
        self._rename_threshold = rename_threshold
        self._rewrite_threshold = rewrite_threshold
        self._max_files = max_files
        self._find_copies_harder = find_copies_harder
        self._want_unchanged = False
        # Per-run state. changes_with_renames overwrites all of it; it is
        # initialised here so no helper can hit an unset attribute.
        self._include_trees = False
        self._candidates = []
        self._reset()

    def _reset(self) -> None:
        """Drop the adds, deletes and changes collected by a previous run."""
        self._adds = []
        self._deletes = []
        self._changes = []

    def _should_split(self, change: TreeChange) -> bool:
        """Decide whether a modify should be broken into a delete and an add.

        Only modifies whose content changed are considered, and only when a
        rewrite threshold is configured.
        """
        if (
            self._rewrite_threshold is None
            or change.type != CHANGE_MODIFY
            or change.old.sha == change.new.sha
        ):
            return False
        old_obj = self._store[change.old.sha]
        new_obj = self._store[change.new.sha]
        return _similarity_score(old_obj, new_obj) < self._rewrite_threshold

    def _add_change(self, change: TreeChange) -> None:
        """File ``change`` under adds, deletes or finished changes."""
        if change.type == CHANGE_ADD:
            self._adds.append(change)
        elif change.type == CHANGE_DELETE:
            self._deletes.append(change)
        elif self._should_split(change):
            self._deletes.append(TreeChange.delete(change.old))
            self._adds.append(TreeChange.add(change.new))
        elif (
            self._find_copies_harder and change.type == CHANGE_UNCHANGED
        ) or change.type == CHANGE_MODIFY:
            # Treat all modifies as potential deletes for rename detection,
            # but don't split them (to avoid spurious renames). Setting
            # find_copies_harder means we treat unchanged the same as
            # modified.
            self._deletes.append(change)
        else:
            self._changes.append(change)

    def _collect_changes(
        self, tree1_id: Optional[ObjectID], tree2_id: Optional[ObjectID]
    ) -> None:
        """Run a plain tree diff and sort every change into its bucket."""
        # Unchanged entries are needed as copy sources for find_copies_harder
        # even when the caller did not ask for them.
        want_unchanged = self._find_copies_harder or self._want_unchanged
        for change in tree_changes(
            self._store,
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=self._include_trees,
        ):
            self._add_change(change)

    def _prune(self, add_paths: set[bytes], delete_paths: set[bytes]) -> None:
        """Remove pending adds and deletes whose paths have been matched."""
        self._adds = [a for a in self._adds if a.new.path not in add_paths]
        self._deletes = [d for d in self._deletes if d.old.path not in delete_paths]

    def _find_exact_renames(self) -> None:
        """Match adds and deletes that share a SHA as renames or copies."""
        add_map = defaultdict(list)
        for add in self._adds:
            add_map[add.new.sha].append(add.new)
        delete_map = defaultdict(list)
        for delete in self._deletes:
            # Keep track of whether the delete was actually marked as a delete.
            # If not, it needs to be marked as a copy.
            is_delete = delete.type == CHANGE_DELETE
            delete_map[delete.old.sha].append((delete.old, is_delete))

        add_paths = set()
        delete_paths = set()
        for sha, sha_deletes in delete_map.items():
            sha_adds = add_map[sha]
            for (old, is_delete), new in zip(sha_deletes, sha_adds):
                if stat.S_IFMT(old.mode) != stat.S_IFMT(new.mode):
                    continue
                if is_delete:
                    delete_paths.add(old.path)
                add_paths.add(new.path)
                new_type = (is_delete and CHANGE_RENAME) or CHANGE_COPY
                self._changes.append(TreeChange(new_type, old, new))

            num_extra_adds = len(sha_adds) - len(sha_deletes)
            # TODO(dborowitz): Less arbitrary way of dealing with extra copies.
            old = sha_deletes[0][0]
            if num_extra_adds > 0:
                # More adds than deletes: the surplus adds are copies of the
                # first deleted entry with this SHA.
                for new in sha_adds[-num_extra_adds:]:
                    add_paths.add(new.path)
                    self._changes.append(TreeChange(CHANGE_COPY, old, new))
        self._prune(add_paths, delete_paths)

    def _should_find_content_renames(self) -> bool:
        """Return False when the add/delete matrix exceeds max_files ** 2."""
        if self._max_files is None:
            return True
        return len(self._adds) * len(self._deletes) <= self._max_files**2

    def _rename_type(
        self, check_paths: bool, delete: TreeChange, add: TreeChange
    ) -> str:
        """Pick the change type for a matched delete/add pair."""
        if check_paths and delete.old.path == add.new.path:
            # If the paths match, this must be a split modify, so make sure it
            # comes out as a modify.
            return CHANGE_MODIFY
        elif delete.type != CHANGE_DELETE:
            # If it's in deletes but not marked as a delete, it must have been
            # added due to find_copies_harder, and needs to be marked as a
            # copy.
            return CHANGE_COPY
        return CHANGE_RENAME

    def _find_content_rename_candidates(self) -> None:
        """Score every pending delete/add pair by content similarity.

        Fills ``self._candidates`` with (negated score, TreeChange) pairs for
        the pairs whose score exceeds the rename threshold.
        """
        candidates = self._candidates = []
        # TODO: Optimizations:
        #  - Compare object sizes before counting blocks.
        #  - Skip if delete's S_IFMT differs from all adds.
        # Match C git's behavior of not attempting to find content renames if
        # the matrix size exceeds the threshold.
        if not self._should_find_content_renames():
            return
        # With nothing to pair up there are no candidates, so don't load and
        # block-count objects for nothing.
        if not self._adds or not self._deletes:
            return

        block_cache = {}
        # NOTE(review): the split-modify check in _rename_type is keyed on
        # the rename threshold here, not the rewrite threshold - confirm
        # that this is intended.
        check_paths = self._rename_threshold is not None
        for delete in self._deletes:
            if S_ISGITLINK(delete.old.mode):
                continue  # Git links don't exist in this repo.
            old_sha = delete.old.sha
            old_obj = self._store[old_sha]
            block_cache[old_sha] = _count_blocks(old_obj)
            for add in self._adds:
                if stat.S_IFMT(delete.old.mode) != stat.S_IFMT(add.new.mode):
                    continue
                new_obj = self._store[add.new.sha]
                score = _similarity_score(old_obj, new_obj, block_cache=block_cache)
                if score > self._rename_threshold:
                    new_type = self._rename_type(check_paths, delete, add)
                    rename = TreeChange(new_type, delete.old, add.new)
                    # Negated so an ascending sort puts the best score first.
                    candidates.append((-score, rename))

    def _choose_content_renames(self) -> None:
        """Accept the best candidate for each added path."""
        # Sort scores from highest to lowest, but keep names in ascending
        # order.
        self._candidates.sort()

        delete_paths = set()
        add_paths = set()
        for _, change in self._candidates:
            new_path = change.new.path
            if new_path in add_paths:
                # A better-scoring source was already chosen for this add.
                continue
            old_path = change.old.path
            orig_type = change.type
            if old_path in delete_paths:
                # The source was already consumed by another match, so any
                # further use of it is a copy.
                change = TreeChange(CHANGE_COPY, change.old, change.new)

            # If the candidate was originally a copy, that means it came from a
            # modified or unchanged path, so we don't want to prune it.
            if orig_type != CHANGE_COPY:
                delete_paths.add(old_path)
            add_paths.add(new_path)
            self._changes.append(change)
        self._prune(add_paths, delete_paths)

    def _join_modifies(self) -> None:
        """Rejoin split modifies that were not matched to anything else.

        A delete and an add that are still pending for the same path and file
        type are merged back into a single modify.
        """
        if self._rewrite_threshold is None:
            return

        modifies = {}
        delete_map = {d.old.path: d for d in self._deletes}
        for add in self._adds:
            path = add.new.path
            delete = delete_map.get(path)
            if delete is not None and stat.S_IFMT(delete.old.mode) == stat.S_IFMT(
                add.new.mode
            ):
                modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new)

        self._adds = [a for a in self._adds if a.new.path not in modifies]
        # Deletes are identified by their old path: the new side of a split
        # delete is the placeholder entry, whose path is None.
        self._deletes = [d for d in self._deletes if d.old.path not in modifies]
        self._changes += modifies.values()

    def _sorted_changes(self) -> list[TreeChange]:
        """Return all remaining and finished changes, ordered by path."""
        result = []
        result.extend(self._adds)
        result.extend(self._deletes)
        result.extend(self._changes)
        result.sort(key=_tree_change_key)
        return result

    def _prune_unchanged(self) -> None:
        """Drop unchanged entries that were collected only as copy sources."""
        if self._want_unchanged:
            return
        self._deletes = [d for d in self._deletes if d.type != CHANGE_UNCHANGED]

    def changes_with_renames(
        self,
        tree1_id: Optional[ObjectID],
        tree2_id: Optional[ObjectID],
        want_unchanged: bool = False,
        include_trees: bool = False,
    ) -> list[TreeChange]:
        """Compute the TreeChanges between two tree SHAs, with rename detection.

        Args:
          tree1_id: The SHA of the source tree.
          tree2_id: The SHA of the target tree.
          want_unchanged: If True, include TreeChanges for unmodified entries
            as well.
          include_trees: Whether to include trees.

        Returns:
          A list of TreeChange instances sorted by old path, then new path.
        """
        self._reset()
        self._want_unchanged = want_unchanged
        self._include_trees = include_trees
        self._collect_changes(tree1_id, tree2_id)
        self._find_exact_renames()
        self._find_content_rename_candidates()
        self._choose_content_renames()
        self._join_modifies()
        self._prune_unchanged()
        return self._sorted_changes()
748
749
# Keep the pure-python implementations reachable under *_py names for
# testing, since the names below may be rebound.
_is_tree_py = _is_tree
_merge_entries_py = _merge_entries
_count_blocks_py = _count_blocks

# Type checkers only ever see the Python implementations above. At runtime
# the Rust extension, when it can be imported, replaces them.
if not TYPE_CHECKING:
    try:
        from dulwich._diff_tree import (
            _count_blocks as _rust_count_blocks,
        )
        from dulwich._diff_tree import (
            _is_tree as _rust_is_tree,
        )
        from dulwich._diff_tree import (
            _merge_entries as _rust_merge_entries,
        )
    except ImportError:
        # Extension not available: stay on the pure-python versions.
        pass
    else:
        # Override with Rust versions
        _count_blocks = _rust_count_blocks
        _is_tree = _rust_is_tree
        _merge_entries = _rust_merge_entries