1# diff_tree.py -- Utilities for diffing files and trees.
2# Copyright (C) 2010 Google, Inc.
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
21
22"""Utilities for diffing files and trees."""
23
24import stat
25from collections import defaultdict
26from collections.abc import Iterator
27from io import BytesIO
28from itertools import chain
29from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Optional, TypeVar
30
31from .object_store import BaseObjectStore
32from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry
33
34# TreeChange type constants.
35CHANGE_ADD = "add"
36CHANGE_MODIFY = "modify"
37CHANGE_DELETE = "delete"
38CHANGE_RENAME = "rename"
39CHANGE_COPY = "copy"
40CHANGE_UNCHANGED = "unchanged"
41
42RENAME_CHANGE_TYPES = (CHANGE_RENAME, CHANGE_COPY)
43
44# _NULL_ENTRY removed - using None instead
45
46_MAX_SCORE = 100
47RENAME_THRESHOLD = 60
48MAX_FILES = 200
49REWRITE_THRESHOLD: Optional[int] = None
50
51
52class TreeChange(NamedTuple):
53 """Named tuple a single change between two trees."""
54
55 type: str
56 old: Optional[TreeEntry]
57 new: Optional[TreeEntry]
58
59 @classmethod
60 def add(cls, new: TreeEntry) -> "TreeChange":
61 """Create a TreeChange for an added entry.
62
63 Args:
64 new: New tree entry
65
66 Returns:
67 TreeChange instance
68 """
69 return cls(CHANGE_ADD, None, new)
70
71 @classmethod
72 def delete(cls, old: TreeEntry) -> "TreeChange":
73 """Create a TreeChange for a deleted entry.
74
75 Args:
76 old: Old tree entry
77
78 Returns:
79 TreeChange instance
80 """
81 return cls(CHANGE_DELETE, old, None)
82
83
84def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]:
85 result: list[TreeEntry] = []
86 if not tree:
87 return result
88 for entry in tree.iteritems(name_order=True):
89 result.append(entry.in_path(path))
90 return result
91
92
93def _merge_entries(
94 path: bytes, tree1: Tree, tree2: Tree
95) -> list[tuple[Optional[TreeEntry], Optional[TreeEntry]]]:
96 """Merge the entries of two trees.
97
98 Args:
99 path: A path to prepend to all tree entry names.
100 tree1: The first Tree object to iterate, or None.
101 tree2: The second Tree object to iterate, or None.
102
103 Returns:
104 A list of pairs of TreeEntry objects for each pair of entries in
105 the trees. If an entry exists in one tree but not the other, the other
106 entry will be None. If both entries exist, they are guaranteed to match.
107 """
108 entries1 = _tree_entries(path, tree1)
109 entries2 = _tree_entries(path, tree2)
110 i1 = i2 = 0
111 len1 = len(entries1)
112 len2 = len(entries2)
113
114 result: list[tuple[Optional[TreeEntry], Optional[TreeEntry]]] = []
115 while i1 < len1 and i2 < len2:
116 entry1 = entries1[i1]
117 entry2 = entries2[i2]
118 if entry1.path < entry2.path:
119 result.append((entry1, None))
120 i1 += 1
121 elif entry1.path > entry2.path:
122 result.append((None, entry2))
123 i2 += 1
124 else:
125 result.append((entry1, entry2))
126 i1 += 1
127 i2 += 1
128 for i in range(i1, len1):
129 result.append((entries1[i], None))
130 for i in range(i2, len2):
131 result.append((None, entries2[i]))
132 return result
133
134
135def _is_tree(entry: Optional[TreeEntry]) -> bool:
136 if entry is None:
137 return False
138 return stat.S_ISDIR(entry.mode)
139
140
141def walk_trees(
142 store: BaseObjectStore,
143 tree1_id: Optional[ObjectID],
144 tree2_id: Optional[ObjectID],
145 prune_identical: bool = False,
146 paths: Optional[list[bytes]] = None,
147) -> Iterator[tuple[Optional[TreeEntry], Optional[TreeEntry]]]:
148 """Recursively walk all the entries of two trees.
149
150 Iteration is depth-first pre-order, as in e.g. os.walk.
151
152 Args:
153 store: An ObjectStore for looking up objects.
154 tree1_id: The SHA of the first Tree object to iterate, or None.
155 tree2_id: The SHA of the second Tree object to iterate, or None.
156 prune_identical: If True, identical subtrees will not be walked.
157 paths: Optional list of paths to filter to (as bytes).
158
159 Returns:
160 Iterator over Pairs of TreeEntry objects for each pair of entries
161 in the trees and their subtrees recursively. If an entry exists in one
162 tree but not the other, the other entry will be None. If both entries
163 exist, they are guaranteed to match.
164 """
165 # This could be fairly easily generalized to >2 trees if we find a use
166 # case.
167 entry1 = TreeEntry(b"", stat.S_IFDIR, tree1_id) if tree1_id else None
168 entry2 = TreeEntry(b"", stat.S_IFDIR, tree2_id) if tree2_id else None
169 todo: list[tuple[Optional[TreeEntry], Optional[TreeEntry]]] = [(entry1, entry2)]
170 while todo:
171 entry1, entry2 = todo.pop()
172 is_tree1 = _is_tree(entry1)
173 is_tree2 = _is_tree(entry2)
174 if prune_identical and is_tree1 and is_tree2 and entry1 == entry2:
175 continue
176
177 tree1 = (is_tree1 and entry1 and store[entry1.sha]) or None
178 tree2 = (is_tree2 and entry2 and store[entry2.sha]) or None
179 path = (
180 (entry1.path if entry1 else None)
181 or (entry2.path if entry2 else None)
182 or b""
183 )
184
185 # If we have path filters, check if we should process this tree
186 if paths is not None and (is_tree1 or is_tree2):
187 # Special case for root tree
188 if path == b"":
189 should_recurse = True
190 else:
191 # Check if any of our filter paths could be under this tree
192 should_recurse = False
193 for filter_path in paths:
194 if filter_path == path:
195 # Exact match - we want this directory itself
196 should_recurse = True
197 break
198 elif filter_path.startswith(path + b"/"):
199 # Filter path is under this directory
200 should_recurse = True
201 break
202 elif path.startswith(filter_path + b"/"):
203 # This directory is under a filter path
204 should_recurse = True
205 break
206 if not should_recurse:
207 # Skip this tree entirely
208 continue
209
210 # Ensure trees are Tree objects before merging
211 if tree1 is not None and not isinstance(tree1, Tree):
212 tree1 = None
213 if tree2 is not None and not isinstance(tree2, Tree):
214 tree2 = None
215
216 if tree1 is not None or tree2 is not None:
217 # Use empty trees for None values
218 if tree1 is None:
219 tree1 = Tree()
220 if tree2 is None:
221 tree2 = Tree()
222 todo.extend(reversed(_merge_entries(path, tree1, tree2)))
223
224 # Only yield entries that match our path filters
225 if paths is None:
226 yield entry1, entry2
227 else:
228 # Check if this entry matches any of our filters
229 for filter_path in paths:
230 if path == filter_path:
231 # Exact match
232 yield entry1, entry2
233 break
234 elif path.startswith(filter_path + b"/"):
235 # This entry is under a filter directory
236 yield entry1, entry2
237 break
238 elif filter_path.startswith(path + b"/") and (is_tree1 or is_tree2):
239 # This is a parent directory of a filter path
240 yield entry1, entry2
241 break
242
243
244def _skip_tree(entry: Optional[TreeEntry], include_trees: bool) -> Optional[TreeEntry]:
245 if entry is None or (not include_trees and stat.S_ISDIR(entry.mode)):
246 return None
247 return entry
248
249
250def tree_changes(
251 store: BaseObjectStore,
252 tree1_id: Optional[ObjectID],
253 tree2_id: Optional[ObjectID],
254 want_unchanged: bool = False,
255 rename_detector: Optional["RenameDetector"] = None,
256 include_trees: bool = False,
257 change_type_same: bool = False,
258 paths: Optional[list[bytes]] = None,
259) -> Iterator[TreeChange]:
260 """Find the differences between the contents of two trees.
261
262 Args:
263 store: An ObjectStore for looking up objects.
264 tree1_id: The SHA of the source tree.
265 tree2_id: The SHA of the target tree.
266 want_unchanged: If True, include TreeChanges for unmodified entries
267 as well.
268 include_trees: Whether to include trees
269 rename_detector: RenameDetector object for detecting renames.
270 change_type_same: Whether to report change types in the same
271 entry or as delete+add.
272 paths: Optional list of paths to filter to (as bytes).
273
274 Returns:
275 Iterator over TreeChange instances for each change between the
276 source and target tree.
277 """
278 if rename_detector is not None and tree1_id is not None and tree2_id is not None:
279 yield from rename_detector.changes_with_renames(
280 tree1_id,
281 tree2_id,
282 want_unchanged=want_unchanged,
283 include_trees=include_trees,
284 )
285 return
286
287 entries = walk_trees(
288 store, tree1_id, tree2_id, prune_identical=(not want_unchanged), paths=paths
289 )
290 for entry1, entry2 in entries:
291 if entry1 == entry2 and not want_unchanged:
292 continue
293
294 # Treat entries for trees as missing.
295 entry1 = _skip_tree(entry1, include_trees)
296 entry2 = _skip_tree(entry2, include_trees)
297
298 if entry1 is not None and entry2 is not None:
299 if (
300 stat.S_IFMT(entry1.mode) != stat.S_IFMT(entry2.mode)
301 and not change_type_same
302 ):
303 # File type changed: report as delete/add.
304 yield TreeChange.delete(entry1)
305 entry1 = None
306 change_type = CHANGE_ADD
307 elif entry1 == entry2:
308 change_type = CHANGE_UNCHANGED
309 else:
310 change_type = CHANGE_MODIFY
311 elif entry1 is not None:
312 change_type = CHANGE_DELETE
313 elif entry2 is not None:
314 change_type = CHANGE_ADD
315 else:
316 # Both were None because at least one was a tree.
317 continue
318 yield TreeChange(change_type, entry1, entry2)
319
320
321T = TypeVar("T")
322U = TypeVar("U")
323
324
325def _all_eq(seq: list[T], key: Callable[[T], U], value: U) -> bool:
326 for e in seq:
327 if key(e) != value:
328 return False
329 return True
330
331
332def _all_same(seq: list[Any], key: Callable[[Any], Any]) -> bool:
333 return _all_eq(seq[1:], key, key(seq[0]))
334
335
336def tree_changes_for_merge(
337 store: BaseObjectStore,
338 parent_tree_ids: list[ObjectID],
339 tree_id: ObjectID,
340 rename_detector: Optional["RenameDetector"] = None,
341) -> Iterator[list[Optional[TreeChange]]]:
342 """Get the tree changes for a merge tree relative to all its parents.
343
344 Args:
345 store: An ObjectStore for looking up objects.
346 parent_tree_ids: An iterable of the SHAs of the parent trees.
347 tree_id: The SHA of the merge tree.
348 rename_detector: RenameDetector object for detecting renames.
349
350 Returns:
351 Iterator over lists of TreeChange objects, one per conflicted path
352 in the merge.
353
354 Each list contains one element per parent, with the TreeChange for that
355 path relative to that parent. An element may be None if it never
356 existed in one parent and was deleted in two others.
357
358 A path is only included in the output if it is a conflict, i.e. its SHA
359 in the merge tree is not found in any of the parents, or in the case of
360 deletes, if not all of the old SHAs match.
361 """
362 all_parent_changes = [
363 tree_changes(store, t, tree_id, rename_detector=rename_detector)
364 for t in parent_tree_ids
365 ]
366 num_parents = len(parent_tree_ids)
367 changes_by_path: dict[bytes, list[Optional[TreeChange]]] = defaultdict(
368 lambda: [None] * num_parents
369 )
370
371 # Organize by path.
372 for i, parent_changes in enumerate(all_parent_changes):
373 for change in parent_changes:
374 if change.type == CHANGE_DELETE:
375 assert change.old is not None
376 path = change.old.path
377 else:
378 assert change.new is not None
379 path = change.new.path
380 changes_by_path[path][i] = change
381
382 def old_sha(c: TreeChange) -> Optional[ObjectID]:
383 return c.old.sha if c.old is not None else None
384
385 def change_type(c: TreeChange) -> str:
386 return c.type
387
388 # Yield only conflicting changes.
389 for _, changes in sorted(changes_by_path.items()):
390 assert len(changes) == num_parents
391 have = [c for c in changes if c is not None]
392 if _all_eq(have, change_type, CHANGE_DELETE):
393 if not _all_same(have, old_sha):
394 yield changes
395 elif not _all_same(have, change_type):
396 yield changes
397 elif None not in changes:
398 # If no change was found relative to one parent, that means the SHA
399 # must have matched the SHA in that parent, so it is not a
400 # conflict.
401 yield changes
402
403
404_BLOCK_SIZE = 64
405
406
407def _count_blocks(obj: ShaFile) -> dict[int, int]:
408 """Count the blocks in an object.
409
410 Splits the data into blocks either on lines or <=64-byte chunks of lines.
411
412 Args:
413 obj: The object to count blocks for.
414
415 Returns:
416 A dict of block hashcode -> total bytes occurring.
417 """
418 block_counts: dict[int, int] = defaultdict(int)
419 block = BytesIO()
420 n = 0
421
422 # Cache attrs as locals to avoid expensive lookups in the inner loop.
423 block_write = block.write
424 block_seek = block.seek
425 block_truncate = block.truncate
426 block_getvalue = block.getvalue
427
428 for c in chain.from_iterable(obj.as_raw_chunks()):
429 cb = c.to_bytes(1, "big")
430 block_write(cb)
431 n += 1
432 if cb == b"\n" or n == _BLOCK_SIZE:
433 value = block_getvalue()
434 block_counts[hash(value)] += len(value)
435 block_seek(0)
436 block_truncate()
437 n = 0
438 if n > 0:
439 last_block = block_getvalue()
440 block_counts[hash(last_block)] += len(last_block)
441 return block_counts
442
443
444def _common_bytes(blocks1: dict[int, int], blocks2: dict[int, int]) -> int:
445 """Count the number of common bytes in two block count dicts.
446
447 Args:
448 blocks1: The first dict of block hashcode -> total bytes.
449 blocks2: The second dict of block hashcode -> total bytes.
450
451 Returns:
452 The number of bytes in common between blocks1 and blocks2. This is
453 only approximate due to possible hash collisions.
454 """
455 # Iterate over the smaller of the two dicts, since this is symmetrical.
456 if len(blocks1) > len(blocks2):
457 blocks1, blocks2 = blocks2, blocks1
458 score = 0
459 for block, count1 in blocks1.items():
460 count2 = blocks2.get(block)
461 if count2:
462 score += min(count1, count2)
463 return score
464
465
466def _similarity_score(
467 obj1: ShaFile,
468 obj2: ShaFile,
469 block_cache: Optional[dict[ObjectID, dict[int, int]]] = None,
470) -> int:
471 """Compute a similarity score for two objects.
472
473 Args:
474 obj1: The first object to score.
475 obj2: The second object to score.
476 block_cache: An optional dict of SHA to block counts to cache
477 results between calls.
478
479 Returns:
480 The similarity score between the two objects, defined as the
481 number of bytes in common between the two objects divided by the
482 maximum size, scaled to the range 0-100.
483 """
484 if block_cache is None:
485 block_cache = {}
486 if obj1.id not in block_cache:
487 block_cache[obj1.id] = _count_blocks(obj1)
488 if obj2.id not in block_cache:
489 block_cache[obj2.id] = _count_blocks(obj2)
490
491 common_bytes = _common_bytes(block_cache[obj1.id], block_cache[obj2.id])
492 max_size = max(obj1.raw_length(), obj2.raw_length())
493 if not max_size:
494 return _MAX_SCORE
495 return int(float(common_bytes) * _MAX_SCORE / max_size)
496
497
498def _tree_change_key(entry: TreeChange) -> tuple[bytes, bytes]:
499 # Sort by old path then new path. If only one exists, use it for both keys.
500 path1 = entry.old.path if entry.old is not None else None
501 path2 = entry.new.path if entry.new is not None else None
502 if path1 is None:
503 path1 = path2
504 if path2 is None:
505 path2 = path1
506 assert path1 is not None and path2 is not None
507 return (path1, path2)
508
509
510class RenameDetector:
511 """Object for handling rename detection between two trees."""
512
513 _adds: list[TreeChange]
514 _deletes: list[TreeChange]
515 _changes: list[TreeChange]
516 _candidates: list[tuple[int, TreeChange]]
517
518 def __init__(
519 self,
520 store: BaseObjectStore,
521 rename_threshold: int = RENAME_THRESHOLD,
522 max_files: Optional[int] = MAX_FILES,
523 rewrite_threshold: Optional[int] = REWRITE_THRESHOLD,
524 find_copies_harder: bool = False,
525 ) -> None:
526 """Initialize the rename detector.
527
528 Args:
529 store: An ObjectStore for looking up objects.
530 rename_threshold: The threshold similarity score for considering
531 an add/delete pair to be a rename/copy; see _similarity_score.
532 max_files: The maximum number of adds and deletes to consider,
533 or None for no limit. The detector is guaranteed to compare no more
534 than max_files ** 2 add/delete pairs. This limit is provided
535 because rename detection can be quadratic in the project size. If
536 the limit is exceeded, no content rename detection is attempted.
537 rewrite_threshold: The threshold similarity score below which a
538 modify should be considered a delete/add, or None to not break
539 modifies; see _similarity_score.
540 find_copies_harder: If True, consider unmodified files when
541 detecting copies.
542 """
543 self._store = store
544 self._rename_threshold = rename_threshold
545 self._rewrite_threshold = rewrite_threshold
546 self._max_files = max_files
547 self._find_copies_harder = find_copies_harder
548 self._want_unchanged = False
549
550 def _reset(self) -> None:
551 self._adds = []
552 self._deletes = []
553 self._changes = []
554
555 def _should_split(self, change: TreeChange) -> bool:
556 if self._rewrite_threshold is None or change.type != CHANGE_MODIFY:
557 return False
558 assert change.old is not None and change.new is not None
559 if change.old.sha == change.new.sha:
560 return False
561 old_obj = self._store[change.old.sha]
562 new_obj = self._store[change.new.sha]
563 return _similarity_score(old_obj, new_obj) < self._rewrite_threshold
564
565 def _add_change(self, change: TreeChange) -> None:
566 if change.type == CHANGE_ADD:
567 self._adds.append(change)
568 elif change.type == CHANGE_DELETE:
569 self._deletes.append(change)
570 elif self._should_split(change):
571 assert change.old is not None and change.new is not None
572 self._deletes.append(TreeChange.delete(change.old))
573 self._adds.append(TreeChange.add(change.new))
574 elif (
575 self._find_copies_harder and change.type == CHANGE_UNCHANGED
576 ) or change.type == CHANGE_MODIFY:
577 # Treat all modifies as potential deletes for rename detection,
578 # but don't split them (to avoid spurious renames). Setting
579 # find_copies_harder means we treat unchanged the same as
580 # modified.
581 self._deletes.append(change)
582 else:
583 self._changes.append(change)
584
585 def _collect_changes(
586 self, tree1_id: Optional[ObjectID], tree2_id: Optional[ObjectID]
587 ) -> None:
588 want_unchanged = self._find_copies_harder or self._want_unchanged
589 for change in tree_changes(
590 self._store,
591 tree1_id,
592 tree2_id,
593 want_unchanged=want_unchanged,
594 include_trees=self._include_trees,
595 ):
596 self._add_change(change)
597
598 def _prune(self, add_paths: set[bytes], delete_paths: set[bytes]) -> None:
599 def check_add(a: TreeChange) -> bool:
600 assert a.new is not None
601 return a.new.path not in add_paths
602
603 def check_delete(d: TreeChange) -> bool:
604 assert d.old is not None
605 return d.old.path not in delete_paths
606
607 self._adds = [a for a in self._adds if check_add(a)]
608 self._deletes = [d for d in self._deletes if check_delete(d)]
609
610 def _find_exact_renames(self) -> None:
611 add_map = defaultdict(list)
612 for add in self._adds:
613 assert add.new is not None
614 add_map[add.new.sha].append(add.new)
615 delete_map = defaultdict(list)
616 for delete in self._deletes:
617 # Keep track of whether the delete was actually marked as a delete.
618 # If not, it needs to be marked as a copy.
619 is_delete = delete.type == CHANGE_DELETE
620 assert delete.old is not None
621 delete_map[delete.old.sha].append((delete.old, is_delete))
622
623 add_paths = set()
624 delete_paths = set()
625 for sha, sha_deletes in delete_map.items():
626 sha_adds = add_map[sha]
627 for (old, is_delete), new in zip(sha_deletes, sha_adds):
628 if stat.S_IFMT(old.mode) != stat.S_IFMT(new.mode):
629 continue
630 if is_delete:
631 delete_paths.add(old.path)
632 add_paths.add(new.path)
633 new_type = (is_delete and CHANGE_RENAME) or CHANGE_COPY
634 self._changes.append(TreeChange(new_type, old, new))
635
636 num_extra_adds = len(sha_adds) - len(sha_deletes)
637 # TODO(dborowitz): Less arbitrary way of dealing with extra copies.
638 old = sha_deletes[0][0]
639 if num_extra_adds > 0:
640 for new in sha_adds[-num_extra_adds:]:
641 add_paths.add(new.path)
642 self._changes.append(TreeChange(CHANGE_COPY, old, new))
643 self._prune(add_paths, delete_paths)
644
645 def _should_find_content_renames(self) -> bool:
646 if self._max_files is None:
647 return True
648 return len(self._adds) * len(self._deletes) <= self._max_files**2
649
650 def _rename_type(
651 self, check_paths: bool, delete: TreeChange, add: TreeChange
652 ) -> str:
653 assert delete.old is not None and add.new is not None
654 if check_paths and delete.old.path == add.new.path:
655 # If the paths match, this must be a split modify, so make sure it
656 # comes out as a modify.
657 return CHANGE_MODIFY
658 elif delete.type != CHANGE_DELETE:
659 # If it's in deletes but not marked as a delete, it must have been
660 # added due to find_copies_harder, and needs to be marked as a
661 # copy.
662 return CHANGE_COPY
663 return CHANGE_RENAME
664
665 def _find_content_rename_candidates(self) -> None:
666 candidates = self._candidates = []
667 # TODO: Optimizations:
668 # - Compare object sizes before counting blocks.
669 # - Skip if delete's S_IFMT differs from all adds.
670 # - Skip if adds or deletes is empty.
671 # Match C git's behavior of not attempting to find content renames if
672 # the matrix size exceeds the threshold.
673 if not self._should_find_content_renames():
674 return
675
676 block_cache = {}
677 check_paths = self._rename_threshold is not None
678 for delete in self._deletes:
679 assert delete.old is not None
680 if S_ISGITLINK(delete.old.mode):
681 continue # Git links don't exist in this repo.
682 old_sha = delete.old.sha
683 old_obj = self._store[old_sha]
684 block_cache[old_sha] = _count_blocks(old_obj)
685 for add in self._adds:
686 assert add.new is not None
687 if stat.S_IFMT(delete.old.mode) != stat.S_IFMT(add.new.mode):
688 continue
689 new_obj = self._store[add.new.sha]
690 score = _similarity_score(old_obj, new_obj, block_cache=block_cache)
691 if score > self._rename_threshold:
692 new_type = self._rename_type(check_paths, delete, add)
693 rename = TreeChange(new_type, delete.old, add.new)
694 candidates.append((-score, rename))
695
696 def _choose_content_renames(self) -> None:
697 # Sort scores from highest to lowest, but keep names in ascending
698 # order.
699 self._candidates.sort()
700
701 delete_paths = set()
702 add_paths = set()
703 for _, change in self._candidates:
704 assert change.old is not None and change.new is not None
705 new_path = change.new.path
706 if new_path in add_paths:
707 continue
708 old_path = change.old.path
709 orig_type = change.type
710 if old_path in delete_paths:
711 change = TreeChange(CHANGE_COPY, change.old, change.new)
712
713 # If the candidate was originally a copy, that means it came from a
714 # modified or unchanged path, so we don't want to prune it.
715 if orig_type != CHANGE_COPY:
716 delete_paths.add(old_path)
717 add_paths.add(new_path)
718 self._changes.append(change)
719 self._prune(add_paths, delete_paths)
720
721 def _join_modifies(self) -> None:
722 if self._rewrite_threshold is None:
723 return
724
725 modifies = {}
726 delete_map = {}
727 for d in self._deletes:
728 assert d.old is not None
729 delete_map[d.old.path] = d
730 for add in self._adds:
731 assert add.new is not None
732 path = add.new.path
733 delete = delete_map.get(path)
734 if delete is not None:
735 assert delete.old is not None
736 if stat.S_IFMT(delete.old.mode) == stat.S_IFMT(add.new.mode):
737 modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new)
738
739 def check_add_mod(a: TreeChange) -> bool:
740 assert a.new is not None
741 return a.new.path not in modifies
742
743 def check_delete_mod(d: TreeChange) -> bool:
744 assert d.old is not None
745 return d.old.path not in modifies
746
747 self._adds = [a for a in self._adds if check_add_mod(a)]
748 self._deletes = [d for d in self._deletes if check_delete_mod(d)]
749 self._changes += modifies.values()
750
751 def _sorted_changes(self) -> list[TreeChange]:
752 result = []
753 result.extend(self._adds)
754 result.extend(self._deletes)
755 result.extend(self._changes)
756 result.sort(key=_tree_change_key)
757 return result
758
759 def _prune_unchanged(self) -> None:
760 if self._want_unchanged:
761 return
762 self._deletes = [d for d in self._deletes if d.type != CHANGE_UNCHANGED]
763
764 def changes_with_renames(
765 self,
766 tree1_id: Optional[ObjectID],
767 tree2_id: Optional[ObjectID],
768 want_unchanged: bool = False,
769 include_trees: bool = False,
770 ) -> list[TreeChange]:
771 """Iterate TreeChanges between two tree SHAs, with rename detection."""
772 self._reset()
773 self._want_unchanged = want_unchanged
774 self._include_trees = include_trees
775 self._collect_changes(tree1_id, tree2_id)
776 self._find_exact_renames()
777 self._find_content_rename_candidates()
778 self._choose_content_renames()
779 self._join_modifies()
780 self._prune_unchanged()
781 return self._sorted_changes()
782
783
784# Hold on to the pure-python implementations for testing.
785_is_tree_py = _is_tree
786_merge_entries_py = _merge_entries
787_count_blocks_py = _count_blocks
788
789if TYPE_CHECKING:
790 # For type checking, use the Python implementations
791 pass
792else:
793 # At runtime, try to import Rust extensions
794 try:
795 # Try to import Rust versions
796 from dulwich._diff_tree import (
797 _count_blocks as _rust_count_blocks,
798 )
799 from dulwich._diff_tree import (
800 _is_tree as _rust_is_tree,
801 )
802 from dulwich._diff_tree import (
803 _merge_entries as _rust_merge_entries,
804 )
805
806 # Override with Rust versions
807 _count_blocks = _rust_count_blocks
808 _is_tree = _rust_is_tree
809 _merge_entries = _rust_merge_entries
810 except ImportError:
811 pass