1# diff_tree.py -- Utilities for diffing files and trees.
2# Copyright (C) 2010 Google, Inc.
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
21
22"""Utilities for diffing files and trees."""
23
24import stat
25from collections import defaultdict
26from collections.abc import Iterator, Mapping, Sequence
27from collections.abc import Set as AbstractSet
28from io import BytesIO
29from itertools import chain
30from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Optional, TypeVar
31
32from .object_store import BaseObjectStore
33from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry
34
35# TreeChange type constants.
36CHANGE_ADD = "add"
37CHANGE_MODIFY = "modify"
38CHANGE_DELETE = "delete"
39CHANGE_RENAME = "rename"
40CHANGE_COPY = "copy"
41CHANGE_UNCHANGED = "unchanged"
42
43RENAME_CHANGE_TYPES = (CHANGE_RENAME, CHANGE_COPY)
44
45# _NULL_ENTRY removed - using None instead
46
47_MAX_SCORE = 100
48RENAME_THRESHOLD = 60
49MAX_FILES = 200
50REWRITE_THRESHOLD: Optional[int] = None
51
52
53class TreeChange(NamedTuple):
54 """Named tuple a single change between two trees."""
55
56 type: str
57 old: Optional[TreeEntry]
58 new: Optional[TreeEntry]
59
60 @classmethod
61 def add(cls, new: TreeEntry) -> "TreeChange":
62 """Create a TreeChange for an added entry.
63
64 Args:
65 new: New tree entry
66
67 Returns:
68 TreeChange instance
69 """
70 return cls(CHANGE_ADD, None, new)
71
72 @classmethod
73 def delete(cls, old: TreeEntry) -> "TreeChange":
74 """Create a TreeChange for a deleted entry.
75
76 Args:
77 old: Old tree entry
78
79 Returns:
80 TreeChange instance
81 """
82 return cls(CHANGE_DELETE, old, None)
83
84
85def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]:
86 result: list[TreeEntry] = []
87 if not tree:
88 return result
89 for entry in tree.iteritems(name_order=True):
90 result.append(entry.in_path(path))
91 return result
92
93
94def _merge_entries(
95 path: bytes, tree1: Tree, tree2: Tree
96) -> list[tuple[Optional[TreeEntry], Optional[TreeEntry]]]:
97 """Merge the entries of two trees.
98
99 Args:
100 path: A path to prepend to all tree entry names.
101 tree1: The first Tree object to iterate, or None.
102 tree2: The second Tree object to iterate, or None.
103
104 Returns:
105 A list of pairs of TreeEntry objects for each pair of entries in
106 the trees. If an entry exists in one tree but not the other, the other
107 entry will be None. If both entries exist, they are guaranteed to match.
108 """
109 entries1 = _tree_entries(path, tree1)
110 entries2 = _tree_entries(path, tree2)
111 i1 = i2 = 0
112 len1 = len(entries1)
113 len2 = len(entries2)
114
115 result: list[tuple[Optional[TreeEntry], Optional[TreeEntry]]] = []
116 while i1 < len1 and i2 < len2:
117 entry1 = entries1[i1]
118 entry2 = entries2[i2]
119 if entry1.path < entry2.path:
120 result.append((entry1, None))
121 i1 += 1
122 elif entry1.path > entry2.path:
123 result.append((None, entry2))
124 i2 += 1
125 else:
126 result.append((entry1, entry2))
127 i1 += 1
128 i2 += 1
129 for i in range(i1, len1):
130 result.append((entries1[i], None))
131 for i in range(i2, len2):
132 result.append((None, entries2[i]))
133 return result
134
135
136def _is_tree(entry: Optional[TreeEntry]) -> bool:
137 if entry is None or entry.mode is None:
138 return False
139 return stat.S_ISDIR(entry.mode)
140
141
142def walk_trees(
143 store: BaseObjectStore,
144 tree1_id: Optional[ObjectID],
145 tree2_id: Optional[ObjectID],
146 prune_identical: bool = False,
147 paths: Optional[Sequence[bytes]] = None,
148) -> Iterator[tuple[Optional[TreeEntry], Optional[TreeEntry]]]:
149 """Recursively walk all the entries of two trees.
150
151 Iteration is depth-first pre-order, as in e.g. os.walk.
152
153 Args:
154 store: An ObjectStore for looking up objects.
155 tree1_id: The SHA of the first Tree object to iterate, or None.
156 tree2_id: The SHA of the second Tree object to iterate, or None.
157 prune_identical: If True, identical subtrees will not be walked.
158 paths: Optional list of paths to filter to (as bytes).
159
160 Returns:
161 Iterator over Pairs of TreeEntry objects for each pair of entries
162 in the trees and their subtrees recursively. If an entry exists in one
163 tree but not the other, the other entry will be None. If both entries
164 exist, they are guaranteed to match.
165 """
166 # This could be fairly easily generalized to >2 trees if we find a use
167 # case.
168 entry1 = TreeEntry(b"", stat.S_IFDIR, tree1_id) if tree1_id else None
169 entry2 = TreeEntry(b"", stat.S_IFDIR, tree2_id) if tree2_id else None
170 todo: list[tuple[Optional[TreeEntry], Optional[TreeEntry]]] = [(entry1, entry2)]
171 while todo:
172 entry1, entry2 = todo.pop()
173 is_tree1 = _is_tree(entry1)
174 is_tree2 = _is_tree(entry2)
175 if prune_identical and is_tree1 and is_tree2 and entry1 == entry2:
176 continue
177
178 tree1 = (is_tree1 and entry1 and store[entry1.sha]) or None
179 tree2 = (is_tree2 and entry2 and store[entry2.sha]) or None
180 path = (
181 (entry1.path if entry1 else None)
182 or (entry2.path if entry2 else None)
183 or b""
184 )
185
186 # If we have path filters, check if we should process this tree
187 if paths is not None and (is_tree1 or is_tree2) and path is not None:
188 # Special case for root tree
189 if path == b"":
190 should_recurse = True
191 else:
192 # Check if any of our filter paths could be under this tree
193 should_recurse = False
194 for filter_path in paths:
195 if filter_path == path:
196 # Exact match - we want this directory itself
197 should_recurse = True
198 break
199 elif filter_path.startswith(path + b"/"):
200 # Filter path is under this directory
201 should_recurse = True
202 break
203 elif path.startswith(filter_path + b"/"):
204 # This directory is under a filter path
205 should_recurse = True
206 break
207 if not should_recurse:
208 # Skip this tree entirely
209 continue
210
211 # Ensure trees are Tree objects before merging
212 if tree1 is not None and not isinstance(tree1, Tree):
213 tree1 = None
214 if tree2 is not None and not isinstance(tree2, Tree):
215 tree2 = None
216
217 if tree1 is not None or tree2 is not None:
218 # Use empty trees for None values
219 if tree1 is None:
220 tree1 = Tree()
221 if tree2 is None:
222 tree2 = Tree()
223 assert path is not None
224 todo.extend(reversed(_merge_entries(path, tree1, tree2)))
225
226 # Only yield entries that match our path filters
227 if paths is None:
228 yield entry1, entry2
229 else:
230 # Check if this entry matches any of our filters
231 for filter_path in paths:
232 if path == filter_path:
233 # Exact match
234 yield entry1, entry2
235 break
236 elif path is not None and path.startswith(filter_path + b"/"):
237 # This entry is under a filter directory
238 yield entry1, entry2
239 break
240 elif (
241 path is not None
242 and filter_path.startswith(path + b"/")
243 and (is_tree1 or is_tree2)
244 ):
245 # This is a parent directory of a filter path
246 yield entry1, entry2
247 break
248
249
250def _skip_tree(entry: Optional[TreeEntry], include_trees: bool) -> Optional[TreeEntry]:
251 if entry is None or entry.mode is None:
252 return None
253 if not include_trees and stat.S_ISDIR(entry.mode):
254 return None
255 return entry
256
257
258def tree_changes(
259 store: BaseObjectStore,
260 tree1_id: Optional[ObjectID],
261 tree2_id: Optional[ObjectID],
262 want_unchanged: bool = False,
263 rename_detector: Optional["RenameDetector"] = None,
264 include_trees: bool = False,
265 change_type_same: bool = False,
266 paths: Optional[Sequence[bytes]] = None,
267) -> Iterator[TreeChange]:
268 """Find the differences between the contents of two trees.
269
270 Args:
271 store: An ObjectStore for looking up objects.
272 tree1_id: The SHA of the source tree.
273 tree2_id: The SHA of the target tree.
274 want_unchanged: If True, include TreeChanges for unmodified entries
275 as well.
276 include_trees: Whether to include trees
277 rename_detector: RenameDetector object for detecting renames.
278 change_type_same: Whether to report change types in the same
279 entry or as delete+add.
280 paths: Optional list of paths to filter to (as bytes).
281
282 Returns:
283 Iterator over TreeChange instances for each change between the
284 source and target tree.
285 """
286 if rename_detector is not None and tree1_id is not None and tree2_id is not None:
287 yield from rename_detector.changes_with_renames(
288 tree1_id,
289 tree2_id,
290 want_unchanged=want_unchanged,
291 include_trees=include_trees,
292 )
293 return
294
295 entries = walk_trees(
296 store, tree1_id, tree2_id, prune_identical=(not want_unchanged), paths=paths
297 )
298 for entry1, entry2 in entries:
299 if entry1 == entry2 and not want_unchanged:
300 continue
301
302 # Treat entries for trees as missing.
303 entry1 = _skip_tree(entry1, include_trees)
304 entry2 = _skip_tree(entry2, include_trees)
305
306 if entry1 is not None and entry2 is not None:
307 if (
308 entry1.mode is not None
309 and entry2.mode is not None
310 and stat.S_IFMT(entry1.mode) != stat.S_IFMT(entry2.mode)
311 and not change_type_same
312 ):
313 # File type changed: report as delete/add.
314 yield TreeChange.delete(entry1)
315 entry1 = None
316 change_type = CHANGE_ADD
317 elif entry1 == entry2:
318 change_type = CHANGE_UNCHANGED
319 else:
320 change_type = CHANGE_MODIFY
321 elif entry1 is not None:
322 change_type = CHANGE_DELETE
323 elif entry2 is not None:
324 change_type = CHANGE_ADD
325 else:
326 # Both were None because at least one was a tree.
327 continue
328 yield TreeChange(change_type, entry1, entry2)
329
330
331T = TypeVar("T")
332U = TypeVar("U")
333
334
335def _all_eq(seq: Sequence[T], key: Callable[[T], U], value: U) -> bool:
336 for e in seq:
337 if key(e) != value:
338 return False
339 return True
340
341
342def _all_same(seq: Sequence[Any], key: Callable[[Any], Any]) -> bool:
343 return _all_eq(seq[1:], key, key(seq[0]))
344
345
346def tree_changes_for_merge(
347 store: BaseObjectStore,
348 parent_tree_ids: Sequence[ObjectID],
349 tree_id: ObjectID,
350 rename_detector: Optional["RenameDetector"] = None,
351) -> Iterator[list[Optional[TreeChange]]]:
352 """Get the tree changes for a merge tree relative to all its parents.
353
354 Args:
355 store: An ObjectStore for looking up objects.
356 parent_tree_ids: An iterable of the SHAs of the parent trees.
357 tree_id: The SHA of the merge tree.
358 rename_detector: RenameDetector object for detecting renames.
359
360 Returns:
361 Iterator over lists of TreeChange objects, one per conflicted path
362 in the merge.
363
364 Each list contains one element per parent, with the TreeChange for that
365 path relative to that parent. An element may be None if it never
366 existed in one parent and was deleted in two others.
367
368 A path is only included in the output if it is a conflict, i.e. its SHA
369 in the merge tree is not found in any of the parents, or in the case of
370 deletes, if not all of the old SHAs match.
371 """
372 all_parent_changes = [
373 tree_changes(store, t, tree_id, rename_detector=rename_detector)
374 for t in parent_tree_ids
375 ]
376 num_parents = len(parent_tree_ids)
377 changes_by_path: dict[bytes, list[Optional[TreeChange]]] = defaultdict(
378 lambda: [None] * num_parents
379 )
380
381 # Organize by path.
382 for i, parent_changes in enumerate(all_parent_changes):
383 for change in parent_changes:
384 if change.type == CHANGE_DELETE:
385 assert change.old is not None
386 path = change.old.path
387 else:
388 assert change.new is not None
389 path = change.new.path
390 assert path is not None
391 changes_by_path[path][i] = change
392
393 def old_sha(c: TreeChange) -> Optional[ObjectID]:
394 return c.old.sha if c.old is not None else None
395
396 def change_type(c: TreeChange) -> str:
397 return c.type
398
399 # Yield only conflicting changes.
400 for _, changes in sorted(changes_by_path.items()):
401 assert len(changes) == num_parents
402 have = [c for c in changes if c is not None]
403 if _all_eq(have, change_type, CHANGE_DELETE):
404 if not _all_same(have, old_sha):
405 yield changes
406 elif not _all_same(have, change_type):
407 yield changes
408 elif None not in changes:
409 # If no change was found relative to one parent, that means the SHA
410 # must have matched the SHA in that parent, so it is not a
411 # conflict.
412 yield changes
413
414
415_BLOCK_SIZE = 64
416
417
418def _count_blocks(obj: ShaFile) -> dict[int, int]:
419 """Count the blocks in an object.
420
421 Splits the data into blocks either on lines or <=64-byte chunks of lines.
422
423 Args:
424 obj: The object to count blocks for.
425
426 Returns:
427 A dict of block hashcode -> total bytes occurring.
428 """
429 block_counts: dict[int, int] = defaultdict(int)
430 block = BytesIO()
431 n = 0
432
433 # Cache attrs as locals to avoid expensive lookups in the inner loop.
434 block_write = block.write
435 block_seek = block.seek
436 block_truncate = block.truncate
437 block_getvalue = block.getvalue
438
439 for c in chain.from_iterable(obj.as_raw_chunks()):
440 cb = c.to_bytes(1, "big")
441 block_write(cb)
442 n += 1
443 if cb == b"\n" or n == _BLOCK_SIZE:
444 value = block_getvalue()
445 block_counts[hash(value)] += len(value)
446 block_seek(0)
447 block_truncate()
448 n = 0
449 if n > 0:
450 last_block = block_getvalue()
451 block_counts[hash(last_block)] += len(last_block)
452 return block_counts
453
454
455def _common_bytes(blocks1: Mapping[int, int], blocks2: Mapping[int, int]) -> int:
456 """Count the number of common bytes in two block count dicts.
457
458 Args:
459 blocks1: The first dict of block hashcode -> total bytes.
460 blocks2: The second dict of block hashcode -> total bytes.
461
462 Returns:
463 The number of bytes in common between blocks1 and blocks2. This is
464 only approximate due to possible hash collisions.
465 """
466 # Iterate over the smaller of the two dicts, since this is symmetrical.
467 if len(blocks1) > len(blocks2):
468 blocks1, blocks2 = blocks2, blocks1
469 score = 0
470 for block, count1 in blocks1.items():
471 count2 = blocks2.get(block)
472 if count2:
473 score += min(count1, count2)
474 return score
475
476
477def _similarity_score(
478 obj1: ShaFile,
479 obj2: ShaFile,
480 block_cache: Optional[dict[ObjectID, dict[int, int]]] = None,
481) -> int:
482 """Compute a similarity score for two objects.
483
484 Args:
485 obj1: The first object to score.
486 obj2: The second object to score.
487 block_cache: An optional dict of SHA to block counts to cache
488 results between calls.
489
490 Returns:
491 The similarity score between the two objects, defined as the
492 number of bytes in common between the two objects divided by the
493 maximum size, scaled to the range 0-100.
494 """
495 if block_cache is None:
496 block_cache = {}
497 if obj1.id not in block_cache:
498 block_cache[obj1.id] = _count_blocks(obj1)
499 if obj2.id not in block_cache:
500 block_cache[obj2.id] = _count_blocks(obj2)
501
502 common_bytes = _common_bytes(block_cache[obj1.id], block_cache[obj2.id])
503 max_size = max(obj1.raw_length(), obj2.raw_length())
504 if not max_size:
505 return _MAX_SCORE
506 return int(float(common_bytes) * _MAX_SCORE / max_size)
507
508
509def _tree_change_key(entry: TreeChange) -> tuple[bytes, bytes]:
510 # Sort by old path then new path. If only one exists, use it for both keys.
511 path1 = entry.old.path if entry.old is not None else None
512 path2 = entry.new.path if entry.new is not None else None
513 if path1 is None:
514 path1 = path2
515 if path2 is None:
516 path2 = path1
517 assert path1 is not None
518 assert path2 is not None
519 return (path1, path2)
520
521
522class RenameDetector:
523 """Object for handling rename detection between two trees."""
524
525 _adds: list[TreeChange]
526 _deletes: list[TreeChange]
527 _changes: list[TreeChange]
528 _candidates: list[tuple[int, TreeChange]]
529
530 def __init__(
531 self,
532 store: BaseObjectStore,
533 rename_threshold: int = RENAME_THRESHOLD,
534 max_files: Optional[int] = MAX_FILES,
535 rewrite_threshold: Optional[int] = REWRITE_THRESHOLD,
536 find_copies_harder: bool = False,
537 ) -> None:
538 """Initialize the rename detector.
539
540 Args:
541 store: An ObjectStore for looking up objects.
542 rename_threshold: The threshold similarity score for considering
543 an add/delete pair to be a rename/copy; see _similarity_score.
544 max_files: The maximum number of adds and deletes to consider,
545 or None for no limit. The detector is guaranteed to compare no more
546 than max_files ** 2 add/delete pairs. This limit is provided
547 because rename detection can be quadratic in the project size. If
548 the limit is exceeded, no content rename detection is attempted.
549 rewrite_threshold: The threshold similarity score below which a
550 modify should be considered a delete/add, or None to not break
551 modifies; see _similarity_score.
552 find_copies_harder: If True, consider unmodified files when
553 detecting copies.
554 """
555 self._store = store
556 self._rename_threshold = rename_threshold
557 self._rewrite_threshold = rewrite_threshold
558 self._max_files = max_files
559 self._find_copies_harder = find_copies_harder
560 self._want_unchanged = False
561
562 def _reset(self) -> None:
563 self._adds = []
564 self._deletes = []
565 self._changes = []
566
567 def _should_split(self, change: TreeChange) -> bool:
568 if self._rewrite_threshold is None or change.type != CHANGE_MODIFY:
569 return False
570 assert change.old is not None and change.new is not None
571 if change.old.sha == change.new.sha:
572 return False
573 assert change.old.sha is not None
574 assert change.new.sha is not None
575 old_obj = self._store[change.old.sha]
576 new_obj = self._store[change.new.sha]
577 return _similarity_score(old_obj, new_obj) < self._rewrite_threshold
578
579 def _add_change(self, change: TreeChange) -> None:
580 if change.type == CHANGE_ADD:
581 self._adds.append(change)
582 elif change.type == CHANGE_DELETE:
583 self._deletes.append(change)
584 elif self._should_split(change):
585 assert change.old is not None and change.new is not None
586 self._deletes.append(TreeChange.delete(change.old))
587 self._adds.append(TreeChange.add(change.new))
588 elif (
589 self._find_copies_harder and change.type == CHANGE_UNCHANGED
590 ) or change.type == CHANGE_MODIFY:
591 # Treat all modifies as potential deletes for rename detection,
592 # but don't split them (to avoid spurious renames). Setting
593 # find_copies_harder means we treat unchanged the same as
594 # modified.
595 self._deletes.append(change)
596 else:
597 self._changes.append(change)
598
599 def _collect_changes(
600 self, tree1_id: Optional[ObjectID], tree2_id: Optional[ObjectID]
601 ) -> None:
602 want_unchanged = self._find_copies_harder or self._want_unchanged
603 for change in tree_changes(
604 self._store,
605 tree1_id,
606 tree2_id,
607 want_unchanged=want_unchanged,
608 include_trees=self._include_trees,
609 ):
610 self._add_change(change)
611
612 def _prune(
613 self, add_paths: AbstractSet[bytes], delete_paths: AbstractSet[bytes]
614 ) -> None:
615 def check_add(a: TreeChange) -> bool:
616 assert a.new is not None
617 return a.new.path not in add_paths
618
619 def check_delete(d: TreeChange) -> bool:
620 assert d.old is not None
621 return d.old.path not in delete_paths
622
623 self._adds = [a for a in self._adds if check_add(a)]
624 self._deletes = [d for d in self._deletes if check_delete(d)]
625
626 def _find_exact_renames(self) -> None:
627 add_map = defaultdict(list)
628 for add in self._adds:
629 assert add.new is not None
630 add_map[add.new.sha].append(add.new)
631 delete_map = defaultdict(list)
632 for delete in self._deletes:
633 # Keep track of whether the delete was actually marked as a delete.
634 # If not, it needs to be marked as a copy.
635 is_delete = delete.type == CHANGE_DELETE
636 assert delete.old is not None
637 delete_map[delete.old.sha].append((delete.old, is_delete))
638
639 add_paths = set()
640 delete_paths = set()
641 for sha, sha_deletes in delete_map.items():
642 sha_adds = add_map[sha]
643 for (old, is_delete), new in zip(sha_deletes, sha_adds):
644 assert old.mode is not None
645 assert new.mode is not None
646 if stat.S_IFMT(old.mode) != stat.S_IFMT(new.mode):
647 continue
648 if is_delete:
649 assert old.path is not None
650 delete_paths.add(old.path)
651 assert new.path is not None
652 add_paths.add(new.path)
653 new_type = (is_delete and CHANGE_RENAME) or CHANGE_COPY
654 self._changes.append(TreeChange(new_type, old, new))
655
656 num_extra_adds = len(sha_adds) - len(sha_deletes)
657 # TODO(dborowitz): Less arbitrary way of dealing with extra copies.
658 old = sha_deletes[0][0]
659 if num_extra_adds > 0:
660 for new in sha_adds[-num_extra_adds:]:
661 assert new.path is not None
662 add_paths.add(new.path)
663 self._changes.append(TreeChange(CHANGE_COPY, old, new))
664 self._prune(add_paths, delete_paths)
665
666 def _should_find_content_renames(self) -> bool:
667 if self._max_files is None:
668 return True
669 return len(self._adds) * len(self._deletes) <= self._max_files**2
670
671 def _rename_type(
672 self, check_paths: bool, delete: TreeChange, add: TreeChange
673 ) -> str:
674 assert delete.old is not None and add.new is not None
675 if check_paths and delete.old.path == add.new.path:
676 # If the paths match, this must be a split modify, so make sure it
677 # comes out as a modify.
678 return CHANGE_MODIFY
679 elif delete.type != CHANGE_DELETE:
680 # If it's in deletes but not marked as a delete, it must have been
681 # added due to find_copies_harder, and needs to be marked as a
682 # copy.
683 return CHANGE_COPY
684 return CHANGE_RENAME
685
686 def _find_content_rename_candidates(self) -> None:
687 candidates = self._candidates = []
688 # TODO: Optimizations:
689 # - Compare object sizes before counting blocks.
690 # - Skip if delete's S_IFMT differs from all adds.
691 # - Skip if adds or deletes is empty.
692 # Match C git's behavior of not attempting to find content renames if
693 # the matrix size exceeds the threshold.
694 if not self._should_find_content_renames():
695 return
696
697 block_cache = {}
698 check_paths = self._rename_threshold is not None
699 for delete in self._deletes:
700 assert delete.old is not None
701 assert delete.old.mode is not None
702 if S_ISGITLINK(delete.old.mode):
703 continue # Git links don't exist in this repo.
704 assert delete.old.sha is not None
705 old_sha = delete.old.sha
706 old_obj = self._store[old_sha]
707 block_cache[old_sha] = _count_blocks(old_obj)
708 for add in self._adds:
709 assert add.new is not None
710 assert add.new.mode is not None
711 if stat.S_IFMT(delete.old.mode) != stat.S_IFMT(add.new.mode):
712 continue
713 assert add.new.sha is not None
714 new_obj = self._store[add.new.sha]
715 score = _similarity_score(old_obj, new_obj, block_cache=block_cache)
716 if score > self._rename_threshold:
717 new_type = self._rename_type(check_paths, delete, add)
718 rename = TreeChange(new_type, delete.old, add.new)
719 candidates.append((-score, rename))
720
721 def _choose_content_renames(self) -> None:
722 # Sort scores from highest to lowest, but keep names in ascending
723 # order.
724 self._candidates.sort()
725
726 delete_paths = set()
727 add_paths = set()
728 for _, change in self._candidates:
729 assert change.old is not None and change.new is not None
730 new_path = change.new.path
731 assert new_path is not None
732 if new_path in add_paths:
733 continue
734 old_path = change.old.path
735 assert old_path is not None
736 orig_type = change.type
737 if old_path in delete_paths:
738 change = TreeChange(CHANGE_COPY, change.old, change.new)
739
740 # If the candidate was originally a copy, that means it came from a
741 # modified or unchanged path, so we don't want to prune it.
742 if orig_type != CHANGE_COPY:
743 delete_paths.add(old_path)
744 add_paths.add(new_path)
745 self._changes.append(change)
746 self._prune(add_paths, delete_paths)
747
748 def _join_modifies(self) -> None:
749 if self._rewrite_threshold is None:
750 return
751
752 modifies = {}
753 delete_map = {}
754 for d in self._deletes:
755 assert d.old is not None
756 delete_map[d.old.path] = d
757 for add in self._adds:
758 assert add.new is not None
759 path = add.new.path
760 delete = delete_map.get(path)
761 if (
762 delete is not None
763 and delete.old is not None
764 and delete.old.mode is not None
765 and add.new.mode is not None
766 and stat.S_IFMT(delete.old.mode) == stat.S_IFMT(add.new.mode)
767 ):
768 modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new)
769
770 def check_add_mod(a: TreeChange) -> bool:
771 assert a.new is not None
772 return a.new.path not in modifies
773
774 def check_delete_mod(d: TreeChange) -> bool:
775 assert d.old is not None
776 return d.old.path not in modifies
777
778 self._adds = [a for a in self._adds if check_add_mod(a)]
779 self._deletes = [d for d in self._deletes if check_delete_mod(d)]
780 self._changes += modifies.values()
781
782 def _sorted_changes(self) -> list[TreeChange]:
783 result = []
784 result.extend(self._adds)
785 result.extend(self._deletes)
786 result.extend(self._changes)
787 result.sort(key=_tree_change_key)
788 return result
789
790 def _prune_unchanged(self) -> None:
791 if self._want_unchanged:
792 return
793 self._deletes = [d for d in self._deletes if d.type != CHANGE_UNCHANGED]
794
795 def changes_with_renames(
796 self,
797 tree1_id: Optional[ObjectID],
798 tree2_id: Optional[ObjectID],
799 want_unchanged: bool = False,
800 include_trees: bool = False,
801 ) -> list[TreeChange]:
802 """Iterate TreeChanges between two tree SHAs, with rename detection."""
803 self._reset()
804 self._want_unchanged = want_unchanged
805 self._include_trees = include_trees
806 self._collect_changes(tree1_id, tree2_id)
807 self._find_exact_renames()
808 self._find_content_rename_candidates()
809 self._choose_content_renames()
810 self._join_modifies()
811 self._prune_unchanged()
812 return self._sorted_changes()
813
814
815# Hold on to the pure-python implementations for testing.
816_is_tree_py = _is_tree
817_merge_entries_py = _merge_entries
818_count_blocks_py = _count_blocks
819
820if TYPE_CHECKING:
821 # For type checking, use the Python implementations
822 pass
823else:
824 # At runtime, try to import Rust extensions
825 try:
826 # Try to import Rust versions
827 from dulwich._diff_tree import (
828 _count_blocks as _rust_count_blocks,
829 )
830 from dulwich._diff_tree import (
831 _is_tree as _rust_is_tree,
832 )
833 from dulwich._diff_tree import (
834 _merge_entries as _rust_merge_entries,
835 )
836
837 # Override with Rust versions
838 _count_blocks = _rust_count_blocks
839 _is_tree = _rust_is_tree
840 _merge_entries = _rust_merge_entries
841 except ImportError:
842 pass