Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/diff_tree.py: 20%
# diff_tree.py -- Utilities for diffing files and trees.
# Copyright (C) 2010 Google, Inc.
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Utilities for diffing files and trees."""

__all__ = [
    "CHANGE_ADD",
    "CHANGE_COPY",
    "CHANGE_DELETE",
    "CHANGE_MODIFY",
    "CHANGE_RENAME",
    "CHANGE_UNCHANGED",
    "MAX_FILES",
    "RENAME_CHANGE_TYPES",
    "RENAME_THRESHOLD",
    "REWRITE_THRESHOLD",
    "RenameDetector",
    "TreeChange",
    "tree_changes",
    "tree_changes_for_merge",
    "walk_trees",
]

import stat
from collections import defaultdict
from collections.abc import Callable, Iterator, Mapping, Sequence
from collections.abc import Set as AbstractSet
from io import BytesIO
from itertools import chain
from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar

from .object_store import BaseObjectStore
from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry

# TreeChange type constants.
CHANGE_ADD = "add"
CHANGE_MODIFY = "modify"
CHANGE_DELETE = "delete"
CHANGE_RENAME = "rename"
CHANGE_COPY = "copy"
CHANGE_UNCHANGED = "unchanged"

RENAME_CHANGE_TYPES = (CHANGE_RENAME, CHANGE_COPY)

# _NULL_ENTRY removed - using None instead

_MAX_SCORE = 100
RENAME_THRESHOLD = 60
MAX_FILES = 200
REWRITE_THRESHOLD: int | None = None


class TreeChange(NamedTuple):
    """Named tuple representing a single change between two trees."""

    type: str
    old: TreeEntry | None
    new: TreeEntry | None

    @classmethod
    def add(cls, new: TreeEntry) -> "TreeChange":
        """Create a TreeChange for an added entry.

        Args:
          new: New tree entry

        Returns:
          TreeChange instance
        """
        return cls(CHANGE_ADD, None, new)

    @classmethod
    def delete(cls, old: TreeEntry) -> "TreeChange":
        """Create a TreeChange for a deleted entry.

        Args:
          old: Old tree entry

        Returns:
          TreeChange instance
        """
        return cls(CHANGE_DELETE, old, None)


def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]:
    result: list[TreeEntry] = []
    if not tree:
        return result
    for entry in tree.iteritems(name_order=True):
        result.append(entry.in_path(path))
    return result


def _merge_entries(
    path: bytes, tree1: Tree, tree2: Tree
) -> list[tuple[TreeEntry | None, TreeEntry | None]]:
    """Merge the entries of two trees.

    Args:
      path: A path to prepend to all tree entry names.
      tree1: The first Tree object to iterate, or None.
      tree2: The second Tree object to iterate, or None.

    Returns:
      A list of pairs of TreeEntry objects for each pair of entries in
      the trees. If an entry exists in one tree but not the other, the other
      entry will be None. If both entries exist, they are guaranteed to match.
    """
    entries1 = _tree_entries(path, tree1)
    entries2 = _tree_entries(path, tree2)
    i1 = i2 = 0
    len1 = len(entries1)
    len2 = len(entries2)

    result: list[tuple[TreeEntry | None, TreeEntry | None]] = []
    while i1 < len1 and i2 < len2:
        entry1 = entries1[i1]
        entry2 = entries2[i2]
        if entry1.path < entry2.path:
            result.append((entry1, None))
            i1 += 1
        elif entry1.path > entry2.path:
            result.append((None, entry2))
            i2 += 1
        else:
            result.append((entry1, entry2))
            i1 += 1
            i2 += 1
    for i in range(i1, len1):
        result.append((entries1[i], None))
    for i in range(i2, len2):
        result.append((None, entries2[i]))
    return result
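

# Editor's illustrative sketch (not part of dulwich's API): assumed usage of
# _merge_entries on two small in-memory trees. The 40-character hex id is a
# placeholder blob id; entries are paired by path in name order.
def _merge_entries_example() -> list[tuple[TreeEntry | None, TreeEntry | None]]:
    placeholder_blob = b"0" * 40
    tree1 = Tree()
    tree1.add(b"a.txt", 0o100644, placeholder_blob)
    tree1.add(b"b.txt", 0o100644, placeholder_blob)
    tree2 = Tree()
    tree2.add(b"b.txt", 0o100644, placeholder_blob)
    tree2.add(b"c.txt", 0o100644, placeholder_blob)
    # Expected pairing: (a.txt, None), (b.txt, b.txt), (None, c.txt).
    return _merge_entries(b"", tree1, tree2)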


def _is_tree(entry: TreeEntry | None) -> bool:
    if entry is None or entry.mode is None:
        return False
    return stat.S_ISDIR(entry.mode)


def walk_trees(
    store: BaseObjectStore,
    tree1_id: ObjectID | None,
    tree2_id: ObjectID | None,
    prune_identical: bool = False,
    paths: Sequence[bytes] | None = None,
) -> Iterator[tuple[TreeEntry | None, TreeEntry | None]]:
    """Recursively walk all the entries of two trees.

    Iteration is depth-first pre-order, as in e.g. os.walk.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the first Tree object to iterate, or None.
      tree2_id: The SHA of the second Tree object to iterate, or None.
      prune_identical: If True, identical subtrees will not be walked.
      paths: Optional list of paths to filter to (as bytes).

    Returns:
      Iterator over pairs of TreeEntry objects for each pair of entries
      in the trees and their subtrees recursively. If an entry exists in one
      tree but not the other, the other entry will be None. If both entries
      exist, they are guaranteed to match.
    """
    # This could be fairly easily generalized to >2 trees if we find a use
    # case.
    entry1 = TreeEntry(b"", stat.S_IFDIR, tree1_id) if tree1_id else None
    entry2 = TreeEntry(b"", stat.S_IFDIR, tree2_id) if tree2_id else None
    todo: list[tuple[TreeEntry | None, TreeEntry | None]] = [(entry1, entry2)]
    while todo:
        entry1, entry2 = todo.pop()
        is_tree1 = _is_tree(entry1)
        is_tree2 = _is_tree(entry2)
        if prune_identical and is_tree1 and is_tree2 and entry1 == entry2:
            continue

        tree1 = (is_tree1 and entry1 and store[entry1.sha]) or None
        tree2 = (is_tree2 and entry2 and store[entry2.sha]) or None
        path = (
            (entry1.path if entry1 else None)
            or (entry2.path if entry2 else None)
            or b""
        )

        # If we have path filters, check if we should process this tree
        if paths is not None and (is_tree1 or is_tree2) and path is not None:
            # Special case for root tree
            if path == b"":
                should_recurse = True
            else:
                # Check if any of our filter paths could be under this tree
                should_recurse = False
                for filter_path in paths:
                    if filter_path == path:
                        # Exact match - we want this directory itself
                        should_recurse = True
                        break
                    elif filter_path.startswith(path + b"/"):
                        # Filter path is under this directory
                        should_recurse = True
                        break
                    elif path.startswith(filter_path + b"/"):
                        # This directory is under a filter path
                        should_recurse = True
                        break
            if not should_recurse:
                # Skip this tree entirely
                continue

        # Ensure trees are Tree objects before merging
        if tree1 is not None and not isinstance(tree1, Tree):
            tree1 = None
        if tree2 is not None and not isinstance(tree2, Tree):
            tree2 = None

        if tree1 is not None or tree2 is not None:
            # Use empty trees for None values
            if tree1 is None:
                tree1 = Tree()
            if tree2 is None:
                tree2 = Tree()
            assert path is not None
            todo.extend(reversed(_merge_entries(path, tree1, tree2)))

        # Only yield entries that match our path filters
        if paths is None:
            yield entry1, entry2
        else:
            # Check if this entry matches any of our filters
            for filter_path in paths:
                if path == filter_path:
                    # Exact match
                    yield entry1, entry2
                    break
                elif path is not None and path.startswith(filter_path + b"/"):
                    # This entry is under a filter directory
                    yield entry1, entry2
                    break
                elif (
                    path is not None
                    and filter_path.startswith(path + b"/")
                    and (is_tree1 or is_tree2)
                ):
                    # This is a parent directory of a filter path
                    yield entry1, entry2
                    break
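

# Editor's illustrative sketch (assumed usage, not part of dulwich's API):
# walking the trees of HEAD and its first parent in a local repository.  The
# helper name and ``repo_path`` are hypothetical; ``Repo`` is dulwich.repo.Repo.
# Assumes HEAD has at least one parent.
def _walk_trees_example(repo_path: str = ".") -> None:
    from dulwich.repo import Repo

    with Repo(repo_path) as repo:
        head = repo[repo.head()]
        parent = repo[head.parents[0]]
        # Identical subtrees are pruned, so only differing entries are visited.
        for old_entry, new_entry in walk_trees(
            repo.object_store, parent.tree, head.tree, prune_identical=True
        ):
            print(old_entry, new_entry)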


def _skip_tree(entry: TreeEntry | None, include_trees: bool) -> TreeEntry | None:
    if entry is None or entry.mode is None:
        return None
    if not include_trees and stat.S_ISDIR(entry.mode):
        return None
    return entry


def tree_changes(
    store: BaseObjectStore,
    tree1_id: ObjectID | None,
    tree2_id: ObjectID | None,
    want_unchanged: bool = False,
    rename_detector: "RenameDetector | None" = None,
    include_trees: bool = False,
    change_type_same: bool = False,
    paths: Sequence[bytes] | None = None,
) -> Iterator[TreeChange]:
    """Find the differences between the contents of two trees.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the source tree.
      tree2_id: The SHA of the target tree.
      want_unchanged: If True, include TreeChanges for unmodified entries
        as well.
      rename_detector: RenameDetector object for detecting renames.
      include_trees: Whether to include tree (directory) entries in the
        output.
      change_type_same: If True, report a change of file type as a single
        modify entry instead of a delete plus an add.
      paths: Optional list of paths to filter to (as bytes).

    Returns:
      Iterator over TreeChange instances for each change between the
      source and target tree.
    """
    if rename_detector is not None and tree1_id is not None and tree2_id is not None:
        yield from rename_detector.changes_with_renames(
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=include_trees,
        )
        return

    entries = walk_trees(
        store, tree1_id, tree2_id, prune_identical=(not want_unchanged), paths=paths
    )
    for entry1, entry2 in entries:
        if entry1 == entry2 and not want_unchanged:
            continue

        # Treat entries for trees as missing.
        entry1 = _skip_tree(entry1, include_trees)
        entry2 = _skip_tree(entry2, include_trees)

        if entry1 is not None and entry2 is not None:
            if (
                entry1.mode is not None
                and entry2.mode is not None
                and stat.S_IFMT(entry1.mode) != stat.S_IFMT(entry2.mode)
                and not change_type_same
            ):
                # File type changed: report as delete/add.
                yield TreeChange.delete(entry1)
                entry1 = None
                change_type = CHANGE_ADD
            elif entry1 == entry2:
                change_type = CHANGE_UNCHANGED
            else:
                change_type = CHANGE_MODIFY
        elif entry1 is not None:
            change_type = CHANGE_DELETE
        elif entry2 is not None:
            change_type = CHANGE_ADD
        else:
            # Both were None because at least one was a tree.
            continue
        yield TreeChange(change_type, entry1, entry2)
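

# Editor's illustrative sketch (assumed usage, not part of dulwich's API): a
# ``git diff --name-status``-style listing between HEAD^ and HEAD.  The helper
# name and ``repo_path`` are hypothetical; assumes HEAD has a parent.
def _tree_changes_example(repo_path: str = ".") -> None:
    from dulwich.repo import Repo

    with Repo(repo_path) as repo:
        head = repo[repo.head()]
        parent = repo[head.parents[0]]
        for change in tree_changes(repo.object_store, parent.tree, head.tree):
            old_path = change.old.path if change.old else None
            new_path = change.new.path if change.new else None
            print(change.type, old_path, new_path)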


T = TypeVar("T")
U = TypeVar("U")


def _all_eq(seq: Sequence[T], key: Callable[[T], U], value: U) -> bool:
    for e in seq:
        if key(e) != value:
            return False
    return True


def _all_same(seq: Sequence[Any], key: Callable[[Any], Any]) -> bool:
    return _all_eq(seq[1:], key, key(seq[0]))


def tree_changes_for_merge(
    store: BaseObjectStore,
    parent_tree_ids: Sequence[ObjectID],
    tree_id: ObjectID,
    rename_detector: "RenameDetector | None" = None,
) -> Iterator[list[TreeChange | None]]:
    """Get the tree changes for a merge tree relative to all its parents.

    Args:
      store: An ObjectStore for looking up objects.
      parent_tree_ids: An iterable of the SHAs of the parent trees.
      tree_id: The SHA of the merge tree.
      rename_detector: RenameDetector object for detecting renames.

    Returns:
      Iterator over lists of TreeChange objects, one per conflicted path
      in the merge.

      Each list contains one element per parent, with the TreeChange for that
      path relative to that parent. An element may be None if it never
      existed in one parent and was deleted in two others.

      A path is only included in the output if it is a conflict, i.e. its SHA
      in the merge tree is not found in any of the parents, or in the case of
      deletes, if not all of the old SHAs match.
    """
    all_parent_changes = [
        tree_changes(store, t, tree_id, rename_detector=rename_detector)
        for t in parent_tree_ids
    ]
    num_parents = len(parent_tree_ids)
    changes_by_path: dict[bytes, list[TreeChange | None]] = defaultdict(
        lambda: [None] * num_parents
    )

    # Organize by path.
    for i, parent_changes in enumerate(all_parent_changes):
        for change in parent_changes:
            if change.type == CHANGE_DELETE:
                assert change.old is not None
                path = change.old.path
            else:
                assert change.new is not None
                path = change.new.path
            assert path is not None
            changes_by_path[path][i] = change

    def old_sha(c: TreeChange) -> ObjectID | None:
        return c.old.sha if c.old is not None else None

    def change_type(c: TreeChange) -> str:
        return c.type

    # Yield only conflicting changes.
    for _, changes in sorted(changes_by_path.items()):
        assert len(changes) == num_parents
        have = [c for c in changes if c is not None]
        if _all_eq(have, change_type, CHANGE_DELETE):
            if not _all_same(have, old_sha):
                yield changes
        elif not _all_same(have, change_type):
            yield changes
        elif None not in changes:
            # If no change was found relative to one parent, that means the SHA
            # must have matched the SHA in that parent, so it is not a
            # conflict.
            yield changes
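

# Editor's illustrative sketch (assumed usage, not part of dulwich's API):
# listing the conflicted paths of an existing merge commit.  The helper name,
# ``repo_path`` and ``merge_sha`` are placeholders supplied by the caller.
def _merge_conflicts_example(repo_path: str, merge_sha: bytes) -> None:
    from dulwich.repo import Repo

    with Repo(repo_path) as repo:
        merge = repo[merge_sha]
        parent_trees = [repo[parent].tree for parent in merge.parents]
        for per_parent in tree_changes_for_merge(
            repo.object_store, parent_trees, merge.tree
        ):
            # One TreeChange (or None) per parent, only for conflicted paths.
            print(per_parent)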


_BLOCK_SIZE = 64


def _count_blocks(obj: ShaFile) -> dict[int, int]:
    """Count the blocks in an object.

    Splits the data into blocks either on lines or <=64-byte chunks of lines.

    Args:
      obj: The object to count blocks for.

    Returns:
      A dict of block hashcode -> total bytes occurring.
    """
    block_counts: dict[int, int] = defaultdict(int)
    block = BytesIO()
    n = 0

    # Cache attrs as locals to avoid expensive lookups in the inner loop.
    block_write = block.write
    block_seek = block.seek
    block_truncate = block.truncate
    block_getvalue = block.getvalue

    for c in chain.from_iterable(obj.as_raw_chunks()):
        cb = c.to_bytes(1, "big")
        block_write(cb)
        n += 1
        if cb == b"\n" or n == _BLOCK_SIZE:
            value = block_getvalue()
            block_counts[hash(value)] += len(value)
            block_seek(0)
            block_truncate()
            n = 0
    if n > 0:
        last_block = block_getvalue()
        block_counts[hash(last_block)] += len(last_block)
    return block_counts


def _common_bytes(blocks1: Mapping[int, int], blocks2: Mapping[int, int]) -> int:
    """Count the number of common bytes in two block count dicts.

    Args:
      blocks1: The first dict of block hashcode -> total bytes.
      blocks2: The second dict of block hashcode -> total bytes.

    Returns:
      The number of bytes in common between blocks1 and blocks2. This is
      only approximate due to possible hash collisions.
    """
    # Iterate over the smaller of the two dicts, since this is symmetrical.
    if len(blocks1) > len(blocks2):
        blocks1, blocks2 = blocks2, blocks1
    score = 0
    for block, count1 in blocks1.items():
        count2 = blocks2.get(block)
        if count2:
            score += min(count1, count2)
    return score


def _similarity_score(
    obj1: ShaFile,
    obj2: ShaFile,
    block_cache: dict[ObjectID, dict[int, int]] | None = None,
) -> int:
    """Compute a similarity score for two objects.

    Args:
      obj1: The first object to score.
      obj2: The second object to score.
      block_cache: An optional dict of SHA to block counts to cache
        results between calls.

    Returns:
      The similarity score between the two objects, defined as the
      number of bytes in common between the two objects divided by the
      maximum size, scaled to the range 0-100.
    """
    if block_cache is None:
        block_cache = {}
    if obj1.id not in block_cache:
        block_cache[obj1.id] = _count_blocks(obj1)
    if obj2.id not in block_cache:
        block_cache[obj2.id] = _count_blocks(obj2)

    common_bytes = _common_bytes(block_cache[obj1.id], block_cache[obj2.id])
    max_size = max(obj1.raw_length(), obj2.raw_length())
    if not max_size:
        return _MAX_SCORE
    return int(float(common_bytes) * _MAX_SCORE / max_size)
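

# Editor's illustrative sketch (assumed usage, not part of dulwich's API):
# scoring two in-memory blobs with the block-based similarity measure above.
# Nearly identical bodies score close to 100, unrelated bodies close to 0.
def _similarity_example() -> int:
    from dulwich.objects import Blob

    old = Blob.from_string(b"line one\nline two\nline three\n")
    new = Blob.from_string(b"line one\nline 2\nline three\n")
    return _similarity_score(old, new)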


def _tree_change_key(entry: TreeChange) -> tuple[bytes, bytes]:
    # Sort by old path then new path. If only one exists, use it for both keys.
    path1 = entry.old.path if entry.old is not None else None
    path2 = entry.new.path if entry.new is not None else None
    if path1 is None:
        path1 = path2
    if path2 is None:
        path2 = path1
    assert path1 is not None
    assert path2 is not None
    return (path1, path2)


class RenameDetector:
    """Object for handling rename detection between two trees."""

    _adds: list[TreeChange]
    _deletes: list[TreeChange]
    _changes: list[TreeChange]
    _candidates: list[tuple[int, TreeChange]]

    def __init__(
        self,
        store: BaseObjectStore,
        rename_threshold: int = RENAME_THRESHOLD,
        max_files: int | None = MAX_FILES,
        rewrite_threshold: int | None = REWRITE_THRESHOLD,
        find_copies_harder: bool = False,
    ) -> None:
        """Initialize the rename detector.

        Args:
          store: An ObjectStore for looking up objects.
          rename_threshold: The threshold similarity score for considering
            an add/delete pair to be a rename/copy; see _similarity_score.
          max_files: The maximum number of adds and deletes to consider,
            or None for no limit. The detector is guaranteed to compare no more
            than max_files ** 2 add/delete pairs. This limit is provided
            because rename detection can be quadratic in the project size. If
            the limit is exceeded, no content rename detection is attempted.
          rewrite_threshold: The threshold similarity score below which a
            modify should be considered a delete/add, or None to not break
            modifies; see _similarity_score.
          find_copies_harder: If True, consider unmodified files when
            detecting copies.
        """
        self._store = store
        self._rename_threshold = rename_threshold
        self._rewrite_threshold = rewrite_threshold
        self._max_files = max_files
        self._find_copies_harder = find_copies_harder
        self._want_unchanged = False

    def _reset(self) -> None:
        self._adds = []
        self._deletes = []
        self._changes = []

    def _should_split(self, change: TreeChange) -> bool:
        if self._rewrite_threshold is None or change.type != CHANGE_MODIFY:
            return False
        assert change.old is not None and change.new is not None
        if change.old.sha == change.new.sha:
            return False
        assert change.old.sha is not None
        assert change.new.sha is not None
        old_obj = self._store[change.old.sha]
        new_obj = self._store[change.new.sha]
        return _similarity_score(old_obj, new_obj) < self._rewrite_threshold

    def _add_change(self, change: TreeChange) -> None:
        if change.type == CHANGE_ADD:
            self._adds.append(change)
        elif change.type == CHANGE_DELETE:
            self._deletes.append(change)
        elif self._should_split(change):
            assert change.old is not None and change.new is not None
            self._deletes.append(TreeChange.delete(change.old))
            self._adds.append(TreeChange.add(change.new))
        elif (
            self._find_copies_harder and change.type == CHANGE_UNCHANGED
        ) or change.type == CHANGE_MODIFY:
            # Treat all modifies as potential deletes for rename detection,
            # but don't split them (to avoid spurious renames). Setting
            # find_copies_harder means we treat unchanged the same as
            # modified.
            self._deletes.append(change)
        else:
            self._changes.append(change)

    def _collect_changes(
        self, tree1_id: ObjectID | None, tree2_id: ObjectID | None
    ) -> None:
        want_unchanged = self._find_copies_harder or self._want_unchanged
        for change in tree_changes(
            self._store,
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=self._include_trees,
        ):
            self._add_change(change)

    def _prune(
        self, add_paths: AbstractSet[bytes], delete_paths: AbstractSet[bytes]
    ) -> None:
        def check_add(a: TreeChange) -> bool:
            assert a.new is not None
            return a.new.path not in add_paths

        def check_delete(d: TreeChange) -> bool:
            assert d.old is not None
            return d.old.path not in delete_paths

        self._adds = [a for a in self._adds if check_add(a)]
        self._deletes = [d for d in self._deletes if check_delete(d)]

    def _find_exact_renames(self) -> None:
        add_map = defaultdict(list)
        for add in self._adds:
            assert add.new is not None
            add_map[add.new.sha].append(add.new)
        delete_map = defaultdict(list)
        for delete in self._deletes:
            # Keep track of whether the delete was actually marked as a delete.
            # If not, it needs to be marked as a copy.
            is_delete = delete.type == CHANGE_DELETE
            assert delete.old is not None
            delete_map[delete.old.sha].append((delete.old, is_delete))

        add_paths = set()
        delete_paths = set()
        for sha, sha_deletes in delete_map.items():
            sha_adds = add_map[sha]
            for (old, is_delete), new in zip(sha_deletes, sha_adds):
                assert old.mode is not None
                assert new.mode is not None
                if stat.S_IFMT(old.mode) != stat.S_IFMT(new.mode):
                    continue
                if is_delete:
                    assert old.path is not None
                    delete_paths.add(old.path)
                assert new.path is not None
                add_paths.add(new.path)
                new_type = (is_delete and CHANGE_RENAME) or CHANGE_COPY
                self._changes.append(TreeChange(new_type, old, new))

            num_extra_adds = len(sha_adds) - len(sha_deletes)
            # TODO(dborowitz): Less arbitrary way of dealing with extra copies.
            old = sha_deletes[0][0]
            if num_extra_adds > 0:
                for new in sha_adds[-num_extra_adds:]:
                    assert new.path is not None
                    add_paths.add(new.path)
                    self._changes.append(TreeChange(CHANGE_COPY, old, new))
        self._prune(add_paths, delete_paths)

    def _should_find_content_renames(self) -> bool:
        if self._max_files is None:
            return True
        return len(self._adds) * len(self._deletes) <= self._max_files**2

    def _rename_type(
        self, check_paths: bool, delete: TreeChange, add: TreeChange
    ) -> str:
        assert delete.old is not None and add.new is not None
        if check_paths and delete.old.path == add.new.path:
            # If the paths match, this must be a split modify, so make sure it
            # comes out as a modify.
            return CHANGE_MODIFY
        elif delete.type != CHANGE_DELETE:
            # If it's in deletes but not marked as a delete, it must have been
            # added due to find_copies_harder, and needs to be marked as a
            # copy.
            return CHANGE_COPY
        return CHANGE_RENAME

    def _find_content_rename_candidates(self) -> None:
        candidates = self._candidates = []
        # TODO: Optimizations:
        # - Compare object sizes before counting blocks.
        # - Skip if delete's S_IFMT differs from all adds.
        # - Skip if adds or deletes is empty.
        # Match C git's behavior of not attempting to find content renames if
        # the matrix size exceeds the threshold.
        if not self._should_find_content_renames():
            return

        block_cache = {}
        check_paths = self._rename_threshold is not None
        for delete in self._deletes:
            assert delete.old is not None
            assert delete.old.mode is not None
            if S_ISGITLINK(delete.old.mode):
                continue  # Git links don't exist in this repo.
            assert delete.old.sha is not None
            old_sha = delete.old.sha
            old_obj = self._store[old_sha]
            block_cache[old_sha] = _count_blocks(old_obj)
            for add in self._adds:
                assert add.new is not None
                assert add.new.mode is not None
                if stat.S_IFMT(delete.old.mode) != stat.S_IFMT(add.new.mode):
                    continue
                assert add.new.sha is not None
                new_obj = self._store[add.new.sha]
                score = _similarity_score(old_obj, new_obj, block_cache=block_cache)
                if score > self._rename_threshold:
                    new_type = self._rename_type(check_paths, delete, add)
                    rename = TreeChange(new_type, delete.old, add.new)
                    candidates.append((-score, rename))

    def _choose_content_renames(self) -> None:
        # Sort scores from highest to lowest, but keep names in ascending
        # order.
        self._candidates.sort()

        delete_paths = set()
        add_paths = set()
        for _, change in self._candidates:
            assert change.old is not None and change.new is not None
            new_path = change.new.path
            assert new_path is not None
            if new_path in add_paths:
                continue
            old_path = change.old.path
            assert old_path is not None
            orig_type = change.type
            if old_path in delete_paths:
                change = TreeChange(CHANGE_COPY, change.old, change.new)

            # If the candidate was originally a copy, that means it came from a
            # modified or unchanged path, so we don't want to prune it.
            if orig_type != CHANGE_COPY:
                delete_paths.add(old_path)
            add_paths.add(new_path)
            self._changes.append(change)
        self._prune(add_paths, delete_paths)

    def _join_modifies(self) -> None:
        if self._rewrite_threshold is None:
            return

        modifies = {}
        delete_map = {}
        for d in self._deletes:
            assert d.old is not None
            delete_map[d.old.path] = d
        for add in self._adds:
            assert add.new is not None
            path = add.new.path
            delete = delete_map.get(path)
            if (
                delete is not None
                and delete.old is not None
                and delete.old.mode is not None
                and add.new.mode is not None
                and stat.S_IFMT(delete.old.mode) == stat.S_IFMT(add.new.mode)
            ):
                modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new)

        def check_add_mod(a: TreeChange) -> bool:
            assert a.new is not None
            return a.new.path not in modifies

        def check_delete_mod(d: TreeChange) -> bool:
            assert d.old is not None
            return d.old.path not in modifies

        self._adds = [a for a in self._adds if check_add_mod(a)]
        self._deletes = [d for d in self._deletes if check_delete_mod(d)]
        self._changes += modifies.values()

    def _sorted_changes(self) -> list[TreeChange]:
        result = []
        result.extend(self._adds)
        result.extend(self._deletes)
        result.extend(self._changes)
        result.sort(key=_tree_change_key)
        return result

    def _prune_unchanged(self) -> None:
        if self._want_unchanged:
            return
        self._deletes = [d for d in self._deletes if d.type != CHANGE_UNCHANGED]

    def changes_with_renames(
        self,
        tree1_id: ObjectID | None,
        tree2_id: ObjectID | None,
        want_unchanged: bool = False,
        include_trees: bool = False,
    ) -> list[TreeChange]:
        """Return the TreeChanges between two tree SHAs, with rename detection."""
        self._reset()
        self._want_unchanged = want_unchanged
        self._include_trees = include_trees
        self._collect_changes(tree1_id, tree2_id)
        self._find_exact_renames()
        self._find_content_rename_candidates()
        self._choose_content_renames()
        self._join_modifies()
        self._prune_unchanged()
        return self._sorted_changes()
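

# Editor's illustrative sketch (assumed usage, not part of dulwich's API):
# rename detection between HEAD^ and HEAD.  The helper name and ``repo_path``
# are hypothetical; ``rename_threshold=60`` mirrors the module default.
def _rename_detection_example(repo_path: str = ".") -> None:
    from dulwich.repo import Repo

    with Repo(repo_path) as repo:
        head = repo[repo.head()]
        parent = repo[head.parents[0]]
        detector = RenameDetector(repo.object_store, rename_threshold=60)
        for change in tree_changes(
            repo.object_store, parent.tree, head.tree, rename_detector=detector
        ):
            if change.type in RENAME_CHANGE_TYPES and change.old and change.new:
                print(change.type, change.old.path, change.new.path)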


# Hold on to the pure-python implementations for testing.
_is_tree_py = _is_tree
_merge_entries_py = _merge_entries
_count_blocks_py = _count_blocks

if TYPE_CHECKING:
    # For type checking, use the Python implementations
    pass
else:
    # At runtime, try to import Rust extensions
    try:
        # Try to import Rust versions
        from dulwich._diff_tree import (
            _count_blocks as _rust_count_blocks,
        )
        from dulwich._diff_tree import (
            _is_tree as _rust_is_tree,
        )
        from dulwich._diff_tree import (
            _merge_entries as _rust_merge_entries,
        )

        # Override with Rust versions
        _count_blocks = _rust_count_blocks
        _is_tree = _rust_is_tree
        _merge_entries = _rust_merge_entries
    except ImportError:
        pass