1# diff_tree.py -- Utilities for diffing files and trees.
2# Copyright (C) 2010 Google, Inc.
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
21
22"""Utilities for diffing files and trees."""
23
24import stat
25from collections import defaultdict, namedtuple
26from collections.abc import Iterator
27from io import BytesIO
28from itertools import chain
29from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar
30
31from .object_store import BaseObjectStore
32from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry
33
# Values a TreeChange may carry in its ``type`` field.
CHANGE_ADD = "add"
CHANGE_MODIFY = "modify"
CHANGE_DELETE = "delete"
CHANGE_RENAME = "rename"
CHANGE_COPY = "copy"
CHANGE_UNCHANGED = "unchanged"

# The change types that rename detection can produce.
RENAME_CHANGE_TYPES = (CHANGE_RENAME, CHANGE_COPY)

# Placeholder for the missing side of an add or delete: every attribute is
# None.
_NULL_ENTRY = TreeEntry(None, None, None)

# Upper bound of the similarity scale used by _similarity_score.
_MAX_SCORE = 100
# Defaults for the RenameDetector constructor arguments of the same name.
RENAME_THRESHOLD = 60
MAX_FILES = 200
REWRITE_THRESHOLD = None
50
51
class TreeChange(namedtuple("TreeChange", ["type", "old", "new"])):
    """Named tuple describing a single change between two trees.

    Fields are ``type`` (one of the CHANGE_* constants), ``old`` and ``new``
    (the entries on the source and target side).
    """

    @classmethod
    def add(cls, new: TreeEntry) -> "TreeChange":
        """Build a CHANGE_ADD change whose old side is the null entry."""
        return cls(type=CHANGE_ADD, old=_NULL_ENTRY, new=new)

    @classmethod
    def delete(cls, old: TreeEntry) -> "TreeChange":
        """Build a CHANGE_DELETE change whose new side is the null entry."""
        return cls(type=CHANGE_DELETE, old=old, new=_NULL_ENTRY)
62
63
64def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]:
65 result: list[TreeEntry] = []
66 if not tree:
67 return result
68 for entry in tree.iteritems(name_order=True):
69 result.append(entry.in_path(path))
70 return result
71
72
def _merge_entries(
    path: bytes, tree1: Tree, tree2: Tree
) -> list[tuple[TreeEntry, TreeEntry]]:
    """Pair up the entries of two trees by path.

    Both entry lists are taken in name order and combined with a sorted merge.

    Args:
      path: A path to prepend to all tree entry names.
      tree1: The first Tree object to iterate, or None.
      tree2: The second Tree object to iterate, or None.

    Returns:
      A list of (entry1, entry2) pairs. When a path is present in only one
      tree, the other element of the pair is the null entry (all attributes
      None). When both elements have a path, the paths are equal.
    """
    left = _tree_entries(path, tree1)
    right = _tree_entries(path, tree2)
    merged = []
    li = ri = 0

    while li < len(left) and ri < len(right):
        a, b = left[li], right[ri]
        if a.path < b.path:
            # Only the first tree has this path.
            merged.append((a, _NULL_ENTRY))
            li += 1
            continue
        if a.path > b.path:
            # Only the second tree has this path.
            merged.append((_NULL_ENTRY, b))
            ri += 1
            continue
        merged.append((a, b))
        li += 1
        ri += 1

    # At most one of the two lists still has unconsumed entries.
    merged.extend((rest, _NULL_ENTRY) for rest in left[li:])
    merged.extend((_NULL_ENTRY, rest) for rest in right[ri:])
    return merged
114
115
116def _is_tree(entry: TreeEntry) -> bool:
117 mode = entry.mode
118 if mode is None:
119 return False
120 return stat.S_ISDIR(mode)
121
122
def walk_trees(
    store: BaseObjectStore,
    tree1_id: Optional[ObjectID],
    tree2_id: Optional[ObjectID],
    prune_identical: bool = False,
) -> Iterator[tuple[TreeEntry, TreeEntry]]:
    """Walk two trees side by side, recursing into subtrees.

    Traversal is depth-first pre-order (like os.walk), driven by an explicit
    stack rather than recursion.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the first Tree object to iterate, or None.
      tree2_id: The SHA of the second Tree object to iterate, or None.
      prune_identical: If True, identical subtrees will not be walked.

    Returns:
      Iterator over pairs of TreeEntry objects, starting with the pair of
      root entries (path ``b""``) and continuing with every pair of entries
      in the trees and their subtrees. If an entry exists in one tree but
      not the other, the other entry has all attributes set to None. If
      neither entry's path is None, the paths match.
    """
    # This could be fairly easily generalized to >2 trees if we find a use
    # case.
    root1 = TreeEntry(b"", stat.S_IFDIR if tree1_id else None, tree1_id)
    root2 = TreeEntry(b"", stat.S_IFDIR if tree2_id else None, tree2_id)
    pending = [(root1, root2)]

    while pending:
        entry1, entry2 = pending.pop()
        is_tree1 = _is_tree(entry1)
        is_tree2 = _is_tree(entry2)
        if prune_identical and is_tree1 and is_tree2 and entry1 == entry2:
            continue

        # Resolve each side to a Tree object. A side that is not a tree, a
        # falsy stored object, or a stored object that is not a Tree all
        # count as "no tree".
        resolved = []
        for entry, is_tree in ((entry1, is_tree1), (entry2, is_tree2)):
            obj = store[entry.sha] if is_tree else None
            resolved.append(obj if obj and isinstance(obj, Tree) else None)
        tree1, tree2 = resolved
        path = entry1.path or entry2.path

        if tree1 is not None or tree2 is not None:
            # Stand in an empty tree for a missing side so the merge works.
            children = _merge_entries(
                path,
                Tree() if tree1 is None else tree1,
                Tree() if tree2 is None else tree2,
            )
            # Push in reverse so the first child is popped first.
            pending.extend(reversed(children))
        yield entry1, entry2
176
177
178def _skip_tree(entry: TreeEntry, include_trees: bool) -> TreeEntry:
179 if entry.mode is None or (not include_trees and stat.S_ISDIR(entry.mode)):
180 return _NULL_ENTRY
181 return entry
182
183
def tree_changes(
    store: BaseObjectStore,
    tree1_id: Optional[ObjectID],
    tree2_id: Optional[ObjectID],
    want_unchanged: bool = False,
    rename_detector: Optional["RenameDetector"] = None,
    include_trees: bool = False,
    change_type_same: bool = False,
) -> Iterator[TreeChange]:
    """Find the differences between the contents of two trees.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the source tree.
      tree2_id: The SHA of the target tree.
      want_unchanged: If True, include TreeChanges for unmodified entries
        as well.
      rename_detector: RenameDetector object for detecting renames. It is
        only used when both tree ids are not None, and ``change_type_same``
        is not forwarded to it.
      include_trees: Whether to include trees
      change_type_same: Whether to report a file type change as a single
        entry rather than as delete+add.

    Returns:
      Iterator over TreeChange instances for each change between the
      source and target tree.
    """
    delegate = (
        rename_detector is not None and tree1_id is not None and tree2_id is not None
    )
    if delegate:
        yield from rename_detector.changes_with_renames(
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=include_trees,
        )
        return

    pairs = walk_trees(store, tree1_id, tree2_id, prune_identical=not want_unchanged)
    for old, new in pairs:
        if not want_unchanged and old == new:
            continue

        # Treat entries for trees as missing.
        old = _skip_tree(old, include_trees)
        new = _skip_tree(new, include_trees)
        has_old = old != _NULL_ENTRY
        has_new = new != _NULL_ENTRY

        if not has_old and not has_new:
            # Both were None because at least one was a tree.
            continue
        if not has_new:
            yield TreeChange(CHANGE_DELETE, old, new)
        elif not has_old:
            yield TreeChange(CHANGE_ADD, old, new)
        elif (
            stat.S_IFMT(old.mode) != stat.S_IFMT(new.mode) and not change_type_same
        ):
            # File type changed: report as delete/add.
            yield TreeChange.delete(old)
            yield TreeChange(CHANGE_ADD, _NULL_ENTRY, new)
        elif old == new:
            yield TreeChange(CHANGE_UNCHANGED, old, new)
        else:
            yield TreeChange(CHANGE_MODIFY, old, new)
251
252
253T = TypeVar("T")
254U = TypeVar("U")
255
256
257def _all_eq(seq: list[T], key: Callable[[T], U], value: U) -> bool:
258 for e in seq:
259 if key(e) != value:
260 return False
261 return True
262
263
264def _all_same(seq: list[Any], key: Callable[[Any], Any]) -> bool:
265 return _all_eq(seq[1:], key, key(seq[0]))
266
267
def tree_changes_for_merge(
    store: BaseObjectStore,
    parent_tree_ids: list[ObjectID],
    tree_id: ObjectID,
    rename_detector: Optional["RenameDetector"] = None,
) -> Iterator[list[Optional[TreeChange]]]:
    """Get the tree changes for a merge tree relative to all its parents.

    Args:
      store: An ObjectStore for looking up objects.
      parent_tree_ids: A list of the SHAs of the parent trees.
      tree_id: The SHA of the merge tree.
      rename_detector: RenameDetector object for detecting renames.

    Returns:
      Iterator over lists of TreeChange objects, one per conflicted path
      in the merge, in sorted path order.

      Each list contains one element per parent, with the TreeChange for that
      path relative to that parent. An element is None when no change was
      found for that path relative to that parent.

      A path is only included in the output if it is a conflict: for paths
      deleted relative to every parent that reports a change, when the old
      SHAs differ; otherwise when the change types differ between parents,
      or when every parent reports a change.
    """
    num_parents = len(parent_tree_ids)
    by_path: dict[bytes, list[Optional[TreeChange]]] = defaultdict(
        lambda: [None] * num_parents
    )

    # Organize by path: slot i holds the change relative to parent i.
    for index, parent_id in enumerate(parent_tree_ids):
        for change in tree_changes(
            store, parent_id, tree_id, rename_detector=rename_detector
        ):
            # A delete has no new side, so it is filed under its old path.
            side = change.old if change.type == CHANGE_DELETE else change.new
            by_path[side.path][index] = change

    def old_sha(c: TreeChange) -> Optional[ObjectID]:
        return c.old.sha

    def change_type(c: TreeChange) -> str:
        return c.type

    # Yield only conflicting changes.
    for path in sorted(by_path):
        changes = by_path[path]
        assert len(changes) == num_parents
        present = [c for c in changes if c is not None]
        if _all_eq(present, change_type, CHANGE_DELETE):
            conflict = not _all_same(present, old_sha)
        elif not _all_same(present, change_type):
            conflict = True
        else:
            # If no change was found relative to one parent, that means the
            # SHA must have matched the SHA in that parent, so it is not a
            # conflict.
            conflict = None not in changes
        if conflict:
            yield changes
332
333
334_BLOCK_SIZE = 64
335
336
337def _count_blocks(obj: ShaFile) -> dict[int, int]:
338 """Count the blocks in an object.
339
340 Splits the data into blocks either on lines or <=64-byte chunks of lines.
341
342 Args:
343 obj: The object to count blocks for.
344
345 Returns:
346 A dict of block hashcode -> total bytes occurring.
347 """
348 block_counts: dict[int, int] = defaultdict(int)
349 block = BytesIO()
350 n = 0
351
352 # Cache attrs as locals to avoid expensive lookups in the inner loop.
353 block_write = block.write
354 block_seek = block.seek
355 block_truncate = block.truncate
356 block_getvalue = block.getvalue
357
358 for c in chain.from_iterable(obj.as_raw_chunks()):
359 cb = c.to_bytes(1, "big")
360 block_write(cb)
361 n += 1
362 if cb == b"\n" or n == _BLOCK_SIZE:
363 value = block_getvalue()
364 block_counts[hash(value)] += len(value)
365 block_seek(0)
366 block_truncate()
367 n = 0
368 if n > 0:
369 last_block = block_getvalue()
370 block_counts[hash(last_block)] += len(last_block)
371 return block_counts
372
373
374def _common_bytes(blocks1: dict[int, int], blocks2: dict[int, int]) -> int:
375 """Count the number of common bytes in two block count dicts.
376
377 Args:
378 blocks1: The first dict of block hashcode -> total bytes.
379 blocks2: The second dict of block hashcode -> total bytes.
380
381 Returns:
382 The number of bytes in common between blocks1 and blocks2. This is
383 only approximate due to possible hash collisions.
384 """
385 # Iterate over the smaller of the two dicts, since this is symmetrical.
386 if len(blocks1) > len(blocks2):
387 blocks1, blocks2 = blocks2, blocks1
388 score = 0
389 for block, count1 in blocks1.items():
390 count2 = blocks2.get(block)
391 if count2:
392 score += min(count1, count2)
393 return score
394
395
def _similarity_score(
    obj1: ShaFile,
    obj2: ShaFile,
    block_cache: Optional[dict[ObjectID, dict[int, int]]] = None,
) -> int:
    """Compute a similarity score for two objects.

    Args:
      obj1: The first object to score.
      obj2: The second object to score.
      block_cache: An optional dict of SHA to block counts to cache
        results between calls. Missing entries for either object are
        filled in.

    Returns:
      The similarity score between the two objects, defined as the
      number of bytes in common between the two objects divided by the
      maximum size, scaled to the range 0-100. Two zero-length objects
      score 100.
    """
    cache = {} if block_cache is None else block_cache
    for obj in (obj1, obj2):
        if obj.id not in cache:
            cache[obj.id] = _count_blocks(obj)

    shared = _common_bytes(cache[obj1.id], cache[obj2.id])
    largest = max(obj1.raw_length(), obj2.raw_length())
    if not largest:
        return _MAX_SCORE
    return int(float(shared) * _MAX_SCORE / largest)
426
427
428def _tree_change_key(entry: TreeChange) -> tuple[bytes, bytes]:
429 # Sort by old path then new path. If only one exists, use it for both keys.
430 path1 = entry.old.path
431 path2 = entry.new.path
432 if path1 is None:
433 path1 = path2
434 if path2 is None:
435 path2 = path1
436 return (path1, path2)
437
438
class RenameDetector:
    """Object for handling rename detection between two trees.

    Each call to changes_with_renames() resets the working lists, so one
    detector can be used for several tree pairs.
    """

    # Additions not yet explained as a rename, copy or modify.
    _adds: list[TreeChange]
    # Possible rename/copy sources: real deletes, plus modified (and, with
    # find_copies_harder, unchanged) entries.
    _deletes: list[TreeChange]
    # Changes that are already final.
    _changes: list[TreeChange]
    # (negated score, change) pairs produced by content rename detection.
    _candidates: list[tuple[int, TreeChange]]
    _include_trees: bool

    def __init__(
        self,
        store: BaseObjectStore,
        rename_threshold: int = RENAME_THRESHOLD,
        max_files: Optional[int] = MAX_FILES,
        rewrite_threshold: Optional[int] = REWRITE_THRESHOLD,
        find_copies_harder: bool = False,
    ) -> None:
        """Initialize the rename detector.

        Args:
          store: An ObjectStore for looking up objects.
          rename_threshold: The threshold similarity score for considering
            an add/delete pair to be a rename/copy; see _similarity_score.
          max_files: The maximum number of adds and deletes to consider,
            or None for no limit. The detector is guaranteed to compare no more
            than max_files ** 2 add/delete pairs. This limit is provided
            because rename detection can be quadratic in the project size. If
            the limit is exceeded, no content rename detection is attempted.
          rewrite_threshold: The threshold similarity score below which a
            modify should be considered a delete/add, or None to not break
            modifies; see _similarity_score.
          find_copies_harder: If True, consider unmodified files when
            detecting copies.
        """
        self._store = store
        self._rename_threshold = rename_threshold
        self._rewrite_threshold = rewrite_threshold
        self._max_files = max_files
        self._find_copies_harder = find_copies_harder
        self._want_unchanged = False
        # Give this a default so _collect_changes() works on a fresh
        # instance; changes_with_renames() overwrites it on every call.
        self._include_trees = False

    def _reset(self) -> None:
        """Clear the working lists before a new detection run."""
        self._adds = []
        self._deletes = []
        self._changes = []

    def _should_split(self, change: TreeChange) -> bool:
        """Return whether a modify should be broken into a delete and an add.

        Only a modify with differing SHAs can be split, and only when a
        rewrite threshold is set and the similarity score is below it.
        """
        if (
            self._rewrite_threshold is None
            or change.type != CHANGE_MODIFY
            or change.old.sha == change.new.sha
        ):
            return False
        old_obj = self._store[change.old.sha]
        new_obj = self._store[change.new.sha]
        return _similarity_score(old_obj, new_obj) < self._rewrite_threshold

    def _add_change(self, change: TreeChange) -> None:
        """File a change into the adds, deletes or final changes list."""
        if change.type == CHANGE_ADD:
            self._adds.append(change)
        elif change.type == CHANGE_DELETE:
            self._deletes.append(change)
        elif self._should_split(change):
            self._deletes.append(TreeChange.delete(change.old))
            self._adds.append(TreeChange.add(change.new))
        elif (
            self._find_copies_harder and change.type == CHANGE_UNCHANGED
        ) or change.type == CHANGE_MODIFY:
            # Treat all modifies as potential deletes for rename detection,
            # but don't split them (to avoid spurious renames). Setting
            # find_copies_harder means we treat unchanged the same as
            # modified.
            self._deletes.append(change)
        else:
            self._changes.append(change)

    def _collect_changes(
        self, tree1_id: Optional[ObjectID], tree2_id: Optional[ObjectID]
    ) -> None:
        """Run a plain tree diff and file every resulting change."""
        # Unchanged entries are needed as copy sources for find_copies_harder
        # even when the caller did not ask for them.
        want_unchanged = self._find_copies_harder or self._want_unchanged
        for change in tree_changes(
            self._store,
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=self._include_trees,
        ):
            self._add_change(change)

    def _prune(self, add_paths: set[bytes], delete_paths: set[bytes]) -> None:
        """Drop adds by new path and deletes by old path."""
        self._adds = [a for a in self._adds if a.new.path not in add_paths]
        self._deletes = [d for d in self._deletes if d.old.path not in delete_paths]

    def _find_exact_renames(self) -> None:
        """Pair adds and deletes that share a SHA into renames and copies."""
        add_map = defaultdict(list)
        for add in self._adds:
            add_map[add.new.sha].append(add.new)
        delete_map = defaultdict(list)
        for delete in self._deletes:
            # Keep track of whether the delete was actually marked as a delete.
            # If not, it needs to be marked as a copy.
            is_delete = delete.type == CHANGE_DELETE
            delete_map[delete.old.sha].append((delete.old, is_delete))

        add_paths = set()
        delete_paths = set()
        for sha, sha_deletes in delete_map.items():
            sha_adds = add_map[sha]
            for (old, is_delete), new in zip(sha_deletes, sha_adds):
                # Entries of different file types are never paired.
                if stat.S_IFMT(old.mode) != stat.S_IFMT(new.mode):
                    continue
                if is_delete:
                    delete_paths.add(old.path)
                add_paths.add(new.path)
                new_type = CHANGE_RENAME if is_delete else CHANGE_COPY
                self._changes.append(TreeChange(new_type, old, new))

            num_extra_adds = len(sha_adds) - len(sha_deletes)
            # TODO(dborowitz): Less arbitrary way of dealing with extra copies.
            old = sha_deletes[0][0]
            if num_extra_adds > 0:
                # Adds left over after pairing become copies of the first
                # delete with this SHA.
                for new in sha_adds[-num_extra_adds:]:
                    add_paths.add(new.path)
                    self._changes.append(TreeChange(CHANGE_COPY, old, new))
        self._prune(add_paths, delete_paths)

    def _should_find_content_renames(self) -> bool:
        """Return whether the add/delete matrix fits within max_files ** 2."""
        if self._max_files is None:
            return True
        return len(self._adds) * len(self._deletes) <= self._max_files**2

    def _rename_type(
        self, check_paths: bool, delete: TreeChange, add: TreeChange
    ) -> str:
        """Pick the change type for a matched delete/add pair."""
        if check_paths and delete.old.path == add.new.path:
            # If the paths match, this must be a split modify, so make sure it
            # comes out as a modify.
            return CHANGE_MODIFY
        elif delete.type != CHANGE_DELETE:
            # If it's in deletes but not marked as a delete, it must have been
            # added due to find_copies_harder, and needs to be marked as a
            # copy.
            return CHANGE_COPY
        return CHANGE_RENAME

    def _find_content_rename_candidates(self) -> None:
        """Score delete/add pairs and record those above the rename threshold.

        Candidates are stored in ``self._candidates`` as (-score, change) so
        that an ascending sort puts the best score first.
        """
        candidates = self._candidates = []
        # TODO: Optimizations:
        #  - Compare object sizes before counting blocks.
        #  - Skip if delete's S_IFMT differs from all adds.
        #  - Skip if adds or deletes is empty.
        # Match C git's behavior of not attempting to find content renames if
        # the matrix size exceeds the threshold.
        if not self._should_find_content_renames():
            return

        block_cache = {}
        check_paths = self._rename_threshold is not None
        for delete in self._deletes:
            if S_ISGITLINK(delete.old.mode):
                continue  # Git links don't exist in this repo.
            old_sha = delete.old.sha
            old_obj = self._store[old_sha]
            block_cache[old_sha] = _count_blocks(old_obj)
            for add in self._adds:
                if stat.S_IFMT(delete.old.mode) != stat.S_IFMT(add.new.mode):
                    continue
                new_obj = self._store[add.new.sha]
                score = _similarity_score(old_obj, new_obj, block_cache=block_cache)
                if score > self._rename_threshold:
                    new_type = self._rename_type(check_paths, delete, add)
                    rename = TreeChange(new_type, delete.old, add.new)
                    candidates.append((-score, rename))

    def _choose_content_renames(self) -> None:
        """Accept candidates best-first, using each new path at most once."""
        # Sort scores from highest to lowest, but keep names in ascending
        # order.
        self._candidates.sort()

        delete_paths = set()
        add_paths = set()
        for _, change in self._candidates:
            new_path = change.new.path
            if new_path in add_paths:
                continue
            old_path = change.old.path
            orig_type = change.type
            if old_path in delete_paths:
                # The source was already consumed by a better match, so this
                # one is a copy of it.
                change = TreeChange(CHANGE_COPY, change.old, change.new)

            # If the candidate was originally a copy, that means it came from a
            # modified or unchanged path, so we don't want to prune it.
            if orig_type != CHANGE_COPY:
                delete_paths.add(old_path)
            add_paths.add(new_path)
            self._changes.append(change)
        self._prune(add_paths, delete_paths)

    def _join_modifies(self) -> None:
        """Rejoin leftover delete/add pairs on the same path into modifies.

        This only applies when a rewrite threshold is set. A pair is joined
        when both sides share a path and a file type.
        """
        if self._rewrite_threshold is None:
            return

        modifies = {}
        delete_map = {d.old.path: d for d in self._deletes}
        for add in self._adds:
            path = add.new.path
            delete = delete_map.get(path)
            if delete is not None and stat.S_IFMT(delete.old.mode) == stat.S_IFMT(
                add.new.mode
            ):
                modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new)

        self._adds = [a for a in self._adds if a.new.path not in modifies]
        # Filter deletes on their *old* path: a delete's new side is the null
        # entry, so filtering on new.path would keep the delete and report
        # the path as both deleted and modified.
        self._deletes = [d for d in self._deletes if d.old.path not in modifies]
        self._changes += modifies.values()

    def _sorted_changes(self) -> list[TreeChange]:
        """Return all remaining changes sorted by (old path, new path)."""
        result = []
        result.extend(self._adds)
        result.extend(self._deletes)
        result.extend(self._changes)
        result.sort(key=_tree_change_key)
        return result

    def _prune_unchanged(self) -> None:
        """Drop unchanged copy sources unless the caller asked for them."""
        if self._want_unchanged:
            return
        self._deletes = [d for d in self._deletes if d.type != CHANGE_UNCHANGED]

    def changes_with_renames(
        self,
        tree1_id: Optional[ObjectID],
        tree2_id: Optional[ObjectID],
        want_unchanged: bool = False,
        include_trees: bool = False,
    ) -> list[TreeChange]:
        """Compute TreeChanges between two tree SHAs, with rename detection.

        Args:
          tree1_id: The SHA of the source tree.
          tree2_id: The SHA of the target tree.
          want_unchanged: If True, keep unchanged entries in the result.
          include_trees: Whether to include trees.

        Returns:
          A list of TreeChange objects sorted by old path, then new path.
        """
        self._reset()
        self._want_unchanged = want_unchanged
        self._include_trees = include_trees
        self._collect_changes(tree1_id, tree2_id)
        self._find_exact_renames()
        self._find_content_rename_candidates()
        self._choose_content_renames()
        self._join_modifies()
        self._prune_unchanged()
        return self._sorted_changes()
685
686
# Hold on to the pure-python implementations for testing.
_is_tree_py = _is_tree
_merge_entries_py = _merge_entries
_count_blocks_py = _count_blocks

# Type checkers keep seeing the Python implementations above. At runtime the
# Rust extension versions are used when the extension can be imported.
if not TYPE_CHECKING:
    try:
        from dulwich._diff_tree import (
            _count_blocks as _rust_count_blocks,
            _is_tree as _rust_is_tree,
            _merge_entries as _rust_merge_entries,
        )
    except ImportError:
        # Extension not available: stay on the Python implementations.
        pass
    else:
        # Override with Rust versions
        _count_blocks = _rust_count_blocks
        _is_tree = _rust_is_tree
        _merge_entries = _rust_merge_entries