Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/diff_tree.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

442 statements  

1# diff_tree.py -- Utilities for diffing files and trees. 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Utilities for diffing files and trees.""" 

23 

24import stat 

25from collections import defaultdict 

26from collections.abc import Iterator, Mapping, Sequence 

27from collections.abc import Set as AbstractSet 

28from io import BytesIO 

29from itertools import chain 

30from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Optional, TypeVar 

31 

32from .object_store import BaseObjectStore 

33from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry 

34 

35# TreeChange type constants. 

36CHANGE_ADD = "add" 

37CHANGE_MODIFY = "modify" 

38CHANGE_DELETE = "delete" 

39CHANGE_RENAME = "rename" 

40CHANGE_COPY = "copy" 

41CHANGE_UNCHANGED = "unchanged" 

42 

43RENAME_CHANGE_TYPES = (CHANGE_RENAME, CHANGE_COPY) 

44 

45# _NULL_ENTRY removed - using None instead 

46 

47_MAX_SCORE = 100 

48RENAME_THRESHOLD = 60 

49MAX_FILES = 200 

50REWRITE_THRESHOLD: Optional[int] = None 

51 

52 

53class TreeChange(NamedTuple): 

54 """Named tuple a single change between two trees.""" 

55 

56 type: str 

57 old: Optional[TreeEntry] 

58 new: Optional[TreeEntry] 

59 

60 @classmethod 

61 def add(cls, new: TreeEntry) -> "TreeChange": 

62 """Create a TreeChange for an added entry. 

63 

64 Args: 

65 new: New tree entry 

66 

67 Returns: 

68 TreeChange instance 

69 """ 

70 return cls(CHANGE_ADD, None, new) 

71 

72 @classmethod 

73 def delete(cls, old: TreeEntry) -> "TreeChange": 

74 """Create a TreeChange for a deleted entry. 

75 

76 Args: 

77 old: Old tree entry 

78 

79 Returns: 

80 TreeChange instance 

81 """ 

82 return cls(CHANGE_DELETE, old, None) 

83 

84 

85def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]: 

86 result: list[TreeEntry] = [] 

87 if not tree: 

88 return result 

89 for entry in tree.iteritems(name_order=True): 

90 result.append(entry.in_path(path)) 

91 return result 

92 

93 

94def _merge_entries( 

95 path: bytes, tree1: Tree, tree2: Tree 

96) -> list[tuple[Optional[TreeEntry], Optional[TreeEntry]]]: 

97 """Merge the entries of two trees. 

98 

99 Args: 

100 path: A path to prepend to all tree entry names. 

101 tree1: The first Tree object to iterate, or None. 

102 tree2: The second Tree object to iterate, or None. 

103 

104 Returns: 

105 A list of pairs of TreeEntry objects for each pair of entries in 

106 the trees. If an entry exists in one tree but not the other, the other 

107 entry will be None. If both entries exist, they are guaranteed to match. 

108 """ 

109 entries1 = _tree_entries(path, tree1) 

110 entries2 = _tree_entries(path, tree2) 

111 i1 = i2 = 0 

112 len1 = len(entries1) 

113 len2 = len(entries2) 

114 

115 result: list[tuple[Optional[TreeEntry], Optional[TreeEntry]]] = [] 

116 while i1 < len1 and i2 < len2: 

117 entry1 = entries1[i1] 

118 entry2 = entries2[i2] 

119 if entry1.path < entry2.path: 

120 result.append((entry1, None)) 

121 i1 += 1 

122 elif entry1.path > entry2.path: 

123 result.append((None, entry2)) 

124 i2 += 1 

125 else: 

126 result.append((entry1, entry2)) 

127 i1 += 1 

128 i2 += 1 

129 for i in range(i1, len1): 

130 result.append((entries1[i], None)) 

131 for i in range(i2, len2): 

132 result.append((None, entries2[i])) 

133 return result 

134 

135 

136def _is_tree(entry: Optional[TreeEntry]) -> bool: 

137 if entry is None or entry.mode is None: 

138 return False 

139 return stat.S_ISDIR(entry.mode) 

140 

141 

142def walk_trees( 

143 store: BaseObjectStore, 

144 tree1_id: Optional[ObjectID], 

145 tree2_id: Optional[ObjectID], 

146 prune_identical: bool = False, 

147 paths: Optional[Sequence[bytes]] = None, 

148) -> Iterator[tuple[Optional[TreeEntry], Optional[TreeEntry]]]: 

149 """Recursively walk all the entries of two trees. 

150 

151 Iteration is depth-first pre-order, as in e.g. os.walk. 

152 

153 Args: 

154 store: An ObjectStore for looking up objects. 

155 tree1_id: The SHA of the first Tree object to iterate, or None. 

156 tree2_id: The SHA of the second Tree object to iterate, or None. 

157 prune_identical: If True, identical subtrees will not be walked. 

158 paths: Optional list of paths to filter to (as bytes). 

159 

160 Returns: 

161 Iterator over Pairs of TreeEntry objects for each pair of entries 

162 in the trees and their subtrees recursively. If an entry exists in one 

163 tree but not the other, the other entry will be None. If both entries 

164 exist, they are guaranteed to match. 

165 """ 

166 # This could be fairly easily generalized to >2 trees if we find a use 

167 # case. 

168 entry1 = TreeEntry(b"", stat.S_IFDIR, tree1_id) if tree1_id else None 

169 entry2 = TreeEntry(b"", stat.S_IFDIR, tree2_id) if tree2_id else None 

170 todo: list[tuple[Optional[TreeEntry], Optional[TreeEntry]]] = [(entry1, entry2)] 

171 while todo: 

172 entry1, entry2 = todo.pop() 

173 is_tree1 = _is_tree(entry1) 

174 is_tree2 = _is_tree(entry2) 

175 if prune_identical and is_tree1 and is_tree2 and entry1 == entry2: 

176 continue 

177 

178 tree1 = (is_tree1 and entry1 and store[entry1.sha]) or None 

179 tree2 = (is_tree2 and entry2 and store[entry2.sha]) or None 

180 path = ( 

181 (entry1.path if entry1 else None) 

182 or (entry2.path if entry2 else None) 

183 or b"" 

184 ) 

185 

186 # If we have path filters, check if we should process this tree 

187 if paths is not None and (is_tree1 or is_tree2) and path is not None: 

188 # Special case for root tree 

189 if path == b"": 

190 should_recurse = True 

191 else: 

192 # Check if any of our filter paths could be under this tree 

193 should_recurse = False 

194 for filter_path in paths: 

195 if filter_path == path: 

196 # Exact match - we want this directory itself 

197 should_recurse = True 

198 break 

199 elif filter_path.startswith(path + b"/"): 

200 # Filter path is under this directory 

201 should_recurse = True 

202 break 

203 elif path.startswith(filter_path + b"/"): 

204 # This directory is under a filter path 

205 should_recurse = True 

206 break 

207 if not should_recurse: 

208 # Skip this tree entirely 

209 continue 

210 

211 # Ensure trees are Tree objects before merging 

212 if tree1 is not None and not isinstance(tree1, Tree): 

213 tree1 = None 

214 if tree2 is not None and not isinstance(tree2, Tree): 

215 tree2 = None 

216 

217 if tree1 is not None or tree2 is not None: 

218 # Use empty trees for None values 

219 if tree1 is None: 

220 tree1 = Tree() 

221 if tree2 is None: 

222 tree2 = Tree() 

223 assert path is not None 

224 todo.extend(reversed(_merge_entries(path, tree1, tree2))) 

225 

226 # Only yield entries that match our path filters 

227 if paths is None: 

228 yield entry1, entry2 

229 else: 

230 # Check if this entry matches any of our filters 

231 for filter_path in paths: 

232 if path == filter_path: 

233 # Exact match 

234 yield entry1, entry2 

235 break 

236 elif path is not None and path.startswith(filter_path + b"/"): 

237 # This entry is under a filter directory 

238 yield entry1, entry2 

239 break 

240 elif ( 

241 path is not None 

242 and filter_path.startswith(path + b"/") 

243 and (is_tree1 or is_tree2) 

244 ): 

245 # This is a parent directory of a filter path 

246 yield entry1, entry2 

247 break 

248 

249 

250def _skip_tree(entry: Optional[TreeEntry], include_trees: bool) -> Optional[TreeEntry]: 

251 if entry is None or entry.mode is None: 

252 return None 

253 if not include_trees and stat.S_ISDIR(entry.mode): 

254 return None 

255 return entry 

256 

257 

258def tree_changes( 

259 store: BaseObjectStore, 

260 tree1_id: Optional[ObjectID], 

261 tree2_id: Optional[ObjectID], 

262 want_unchanged: bool = False, 

263 rename_detector: Optional["RenameDetector"] = None, 

264 include_trees: bool = False, 

265 change_type_same: bool = False, 

266 paths: Optional[Sequence[bytes]] = None, 

267) -> Iterator[TreeChange]: 

268 """Find the differences between the contents of two trees. 

269 

270 Args: 

271 store: An ObjectStore for looking up objects. 

272 tree1_id: The SHA of the source tree. 

273 tree2_id: The SHA of the target tree. 

274 want_unchanged: If True, include TreeChanges for unmodified entries 

275 as well. 

276 include_trees: Whether to include trees 

277 rename_detector: RenameDetector object for detecting renames. 

278 change_type_same: Whether to report change types in the same 

279 entry or as delete+add. 

280 paths: Optional list of paths to filter to (as bytes). 

281 

282 Returns: 

283 Iterator over TreeChange instances for each change between the 

284 source and target tree. 

285 """ 

286 if rename_detector is not None and tree1_id is not None and tree2_id is not None: 

287 yield from rename_detector.changes_with_renames( 

288 tree1_id, 

289 tree2_id, 

290 want_unchanged=want_unchanged, 

291 include_trees=include_trees, 

292 ) 

293 return 

294 

295 entries = walk_trees( 

296 store, tree1_id, tree2_id, prune_identical=(not want_unchanged), paths=paths 

297 ) 

298 for entry1, entry2 in entries: 

299 if entry1 == entry2 and not want_unchanged: 

300 continue 

301 

302 # Treat entries for trees as missing. 

303 entry1 = _skip_tree(entry1, include_trees) 

304 entry2 = _skip_tree(entry2, include_trees) 

305 

306 if entry1 is not None and entry2 is not None: 

307 if ( 

308 entry1.mode is not None 

309 and entry2.mode is not None 

310 and stat.S_IFMT(entry1.mode) != stat.S_IFMT(entry2.mode) 

311 and not change_type_same 

312 ): 

313 # File type changed: report as delete/add. 

314 yield TreeChange.delete(entry1) 

315 entry1 = None 

316 change_type = CHANGE_ADD 

317 elif entry1 == entry2: 

318 change_type = CHANGE_UNCHANGED 

319 else: 

320 change_type = CHANGE_MODIFY 

321 elif entry1 is not None: 

322 change_type = CHANGE_DELETE 

323 elif entry2 is not None: 

324 change_type = CHANGE_ADD 

325 else: 

326 # Both were None because at least one was a tree. 

327 continue 

328 yield TreeChange(change_type, entry1, entry2) 

329 

330 

331T = TypeVar("T") 

332U = TypeVar("U") 

333 

334 

335def _all_eq(seq: Sequence[T], key: Callable[[T], U], value: U) -> bool: 

336 for e in seq: 

337 if key(e) != value: 

338 return False 

339 return True 

340 

341 

342def _all_same(seq: Sequence[Any], key: Callable[[Any], Any]) -> bool: 

343 return _all_eq(seq[1:], key, key(seq[0])) 

344 

345 

346def tree_changes_for_merge( 

347 store: BaseObjectStore, 

348 parent_tree_ids: Sequence[ObjectID], 

349 tree_id: ObjectID, 

350 rename_detector: Optional["RenameDetector"] = None, 

351) -> Iterator[list[Optional[TreeChange]]]: 

352 """Get the tree changes for a merge tree relative to all its parents. 

353 

354 Args: 

355 store: An ObjectStore for looking up objects. 

356 parent_tree_ids: An iterable of the SHAs of the parent trees. 

357 tree_id: The SHA of the merge tree. 

358 rename_detector: RenameDetector object for detecting renames. 

359 

360 Returns: 

361 Iterator over lists of TreeChange objects, one per conflicted path 

362 in the merge. 

363 

364 Each list contains one element per parent, with the TreeChange for that 

365 path relative to that parent. An element may be None if it never 

366 existed in one parent and was deleted in two others. 

367 

368 A path is only included in the output if it is a conflict, i.e. its SHA 

369 in the merge tree is not found in any of the parents, or in the case of 

370 deletes, if not all of the old SHAs match. 

371 """ 

372 all_parent_changes = [ 

373 tree_changes(store, t, tree_id, rename_detector=rename_detector) 

374 for t in parent_tree_ids 

375 ] 

376 num_parents = len(parent_tree_ids) 

377 changes_by_path: dict[bytes, list[Optional[TreeChange]]] = defaultdict( 

378 lambda: [None] * num_parents 

379 ) 

380 

381 # Organize by path. 

382 for i, parent_changes in enumerate(all_parent_changes): 

383 for change in parent_changes: 

384 if change.type == CHANGE_DELETE: 

385 assert change.old is not None 

386 path = change.old.path 

387 else: 

388 assert change.new is not None 

389 path = change.new.path 

390 assert path is not None 

391 changes_by_path[path][i] = change 

392 

393 def old_sha(c: TreeChange) -> Optional[ObjectID]: 

394 return c.old.sha if c.old is not None else None 

395 

396 def change_type(c: TreeChange) -> str: 

397 return c.type 

398 

399 # Yield only conflicting changes. 

400 for _, changes in sorted(changes_by_path.items()): 

401 assert len(changes) == num_parents 

402 have = [c for c in changes if c is not None] 

403 if _all_eq(have, change_type, CHANGE_DELETE): 

404 if not _all_same(have, old_sha): 

405 yield changes 

406 elif not _all_same(have, change_type): 

407 yield changes 

408 elif None not in changes: 

409 # If no change was found relative to one parent, that means the SHA 

410 # must have matched the SHA in that parent, so it is not a 

411 # conflict. 

412 yield changes 

413 

414 

415_BLOCK_SIZE = 64 

416 

417 

418def _count_blocks(obj: ShaFile) -> dict[int, int]: 

419 """Count the blocks in an object. 

420 

421 Splits the data into blocks either on lines or <=64-byte chunks of lines. 

422 

423 Args: 

424 obj: The object to count blocks for. 

425 

426 Returns: 

427 A dict of block hashcode -> total bytes occurring. 

428 """ 

429 block_counts: dict[int, int] = defaultdict(int) 

430 block = BytesIO() 

431 n = 0 

432 

433 # Cache attrs as locals to avoid expensive lookups in the inner loop. 

434 block_write = block.write 

435 block_seek = block.seek 

436 block_truncate = block.truncate 

437 block_getvalue = block.getvalue 

438 

439 for c in chain.from_iterable(obj.as_raw_chunks()): 

440 cb = c.to_bytes(1, "big") 

441 block_write(cb) 

442 n += 1 

443 if cb == b"\n" or n == _BLOCK_SIZE: 

444 value = block_getvalue() 

445 block_counts[hash(value)] += len(value) 

446 block_seek(0) 

447 block_truncate() 

448 n = 0 

449 if n > 0: 

450 last_block = block_getvalue() 

451 block_counts[hash(last_block)] += len(last_block) 

452 return block_counts 

453 

454 

455def _common_bytes(blocks1: Mapping[int, int], blocks2: Mapping[int, int]) -> int: 

456 """Count the number of common bytes in two block count dicts. 

457 

458 Args: 

459 blocks1: The first dict of block hashcode -> total bytes. 

460 blocks2: The second dict of block hashcode -> total bytes. 

461 

462 Returns: 

463 The number of bytes in common between blocks1 and blocks2. This is 

464 only approximate due to possible hash collisions. 

465 """ 

466 # Iterate over the smaller of the two dicts, since this is symmetrical. 

467 if len(blocks1) > len(blocks2): 

468 blocks1, blocks2 = blocks2, blocks1 

469 score = 0 

470 for block, count1 in blocks1.items(): 

471 count2 = blocks2.get(block) 

472 if count2: 

473 score += min(count1, count2) 

474 return score 

475 

476 

477def _similarity_score( 

478 obj1: ShaFile, 

479 obj2: ShaFile, 

480 block_cache: Optional[dict[ObjectID, dict[int, int]]] = None, 

481) -> int: 

482 """Compute a similarity score for two objects. 

483 

484 Args: 

485 obj1: The first object to score. 

486 obj2: The second object to score. 

487 block_cache: An optional dict of SHA to block counts to cache 

488 results between calls. 

489 

490 Returns: 

491 The similarity score between the two objects, defined as the 

492 number of bytes in common between the two objects divided by the 

493 maximum size, scaled to the range 0-100. 

494 """ 

495 if block_cache is None: 

496 block_cache = {} 

497 if obj1.id not in block_cache: 

498 block_cache[obj1.id] = _count_blocks(obj1) 

499 if obj2.id not in block_cache: 

500 block_cache[obj2.id] = _count_blocks(obj2) 

501 

502 common_bytes = _common_bytes(block_cache[obj1.id], block_cache[obj2.id]) 

503 max_size = max(obj1.raw_length(), obj2.raw_length()) 

504 if not max_size: 

505 return _MAX_SCORE 

506 return int(float(common_bytes) * _MAX_SCORE / max_size) 

507 

508 

509def _tree_change_key(entry: TreeChange) -> tuple[bytes, bytes]: 

510 # Sort by old path then new path. If only one exists, use it for both keys. 

511 path1 = entry.old.path if entry.old is not None else None 

512 path2 = entry.new.path if entry.new is not None else None 

513 if path1 is None: 

514 path1 = path2 

515 if path2 is None: 

516 path2 = path1 

517 assert path1 is not None 

518 assert path2 is not None 

519 return (path1, path2) 

520 

521 

522class RenameDetector: 

523 """Object for handling rename detection between two trees.""" 

524 

525 _adds: list[TreeChange] 

526 _deletes: list[TreeChange] 

527 _changes: list[TreeChange] 

528 _candidates: list[tuple[int, TreeChange]] 

529 

530 def __init__( 

531 self, 

532 store: BaseObjectStore, 

533 rename_threshold: int = RENAME_THRESHOLD, 

534 max_files: Optional[int] = MAX_FILES, 

535 rewrite_threshold: Optional[int] = REWRITE_THRESHOLD, 

536 find_copies_harder: bool = False, 

537 ) -> None: 

538 """Initialize the rename detector. 

539 

540 Args: 

541 store: An ObjectStore for looking up objects. 

542 rename_threshold: The threshold similarity score for considering 

543 an add/delete pair to be a rename/copy; see _similarity_score. 

544 max_files: The maximum number of adds and deletes to consider, 

545 or None for no limit. The detector is guaranteed to compare no more 

546 than max_files ** 2 add/delete pairs. This limit is provided 

547 because rename detection can be quadratic in the project size. If 

548 the limit is exceeded, no content rename detection is attempted. 

549 rewrite_threshold: The threshold similarity score below which a 

550 modify should be considered a delete/add, or None to not break 

551 modifies; see _similarity_score. 

552 find_copies_harder: If True, consider unmodified files when 

553 detecting copies. 

554 """ 

555 self._store = store 

556 self._rename_threshold = rename_threshold 

557 self._rewrite_threshold = rewrite_threshold 

558 self._max_files = max_files 

559 self._find_copies_harder = find_copies_harder 

560 self._want_unchanged = False 

561 

562 def _reset(self) -> None: 

563 self._adds = [] 

564 self._deletes = [] 

565 self._changes = [] 

566 

567 def _should_split(self, change: TreeChange) -> bool: 

568 if self._rewrite_threshold is None or change.type != CHANGE_MODIFY: 

569 return False 

570 assert change.old is not None and change.new is not None 

571 if change.old.sha == change.new.sha: 

572 return False 

573 assert change.old.sha is not None 

574 assert change.new.sha is not None 

575 old_obj = self._store[change.old.sha] 

576 new_obj = self._store[change.new.sha] 

577 return _similarity_score(old_obj, new_obj) < self._rewrite_threshold 

578 

579 def _add_change(self, change: TreeChange) -> None: 

580 if change.type == CHANGE_ADD: 

581 self._adds.append(change) 

582 elif change.type == CHANGE_DELETE: 

583 self._deletes.append(change) 

584 elif self._should_split(change): 

585 assert change.old is not None and change.new is not None 

586 self._deletes.append(TreeChange.delete(change.old)) 

587 self._adds.append(TreeChange.add(change.new)) 

588 elif ( 

589 self._find_copies_harder and change.type == CHANGE_UNCHANGED 

590 ) or change.type == CHANGE_MODIFY: 

591 # Treat all modifies as potential deletes for rename detection, 

592 # but don't split them (to avoid spurious renames). Setting 

593 # find_copies_harder means we treat unchanged the same as 

594 # modified. 

595 self._deletes.append(change) 

596 else: 

597 self._changes.append(change) 

598 

599 def _collect_changes( 

600 self, tree1_id: Optional[ObjectID], tree2_id: Optional[ObjectID] 

601 ) -> None: 

602 want_unchanged = self._find_copies_harder or self._want_unchanged 

603 for change in tree_changes( 

604 self._store, 

605 tree1_id, 

606 tree2_id, 

607 want_unchanged=want_unchanged, 

608 include_trees=self._include_trees, 

609 ): 

610 self._add_change(change) 

611 

612 def _prune( 

613 self, add_paths: AbstractSet[bytes], delete_paths: AbstractSet[bytes] 

614 ) -> None: 

615 def check_add(a: TreeChange) -> bool: 

616 assert a.new is not None 

617 return a.new.path not in add_paths 

618 

619 def check_delete(d: TreeChange) -> bool: 

620 assert d.old is not None 

621 return d.old.path not in delete_paths 

622 

623 self._adds = [a for a in self._adds if check_add(a)] 

624 self._deletes = [d for d in self._deletes if check_delete(d)] 

625 

626 def _find_exact_renames(self) -> None: 

627 add_map = defaultdict(list) 

628 for add in self._adds: 

629 assert add.new is not None 

630 add_map[add.new.sha].append(add.new) 

631 delete_map = defaultdict(list) 

632 for delete in self._deletes: 

633 # Keep track of whether the delete was actually marked as a delete. 

634 # If not, it needs to be marked as a copy. 

635 is_delete = delete.type == CHANGE_DELETE 

636 assert delete.old is not None 

637 delete_map[delete.old.sha].append((delete.old, is_delete)) 

638 

639 add_paths = set() 

640 delete_paths = set() 

641 for sha, sha_deletes in delete_map.items(): 

642 sha_adds = add_map[sha] 

643 for (old, is_delete), new in zip(sha_deletes, sha_adds): 

644 assert old.mode is not None 

645 assert new.mode is not None 

646 if stat.S_IFMT(old.mode) != stat.S_IFMT(new.mode): 

647 continue 

648 if is_delete: 

649 assert old.path is not None 

650 delete_paths.add(old.path) 

651 assert new.path is not None 

652 add_paths.add(new.path) 

653 new_type = (is_delete and CHANGE_RENAME) or CHANGE_COPY 

654 self._changes.append(TreeChange(new_type, old, new)) 

655 

656 num_extra_adds = len(sha_adds) - len(sha_deletes) 

657 # TODO(dborowitz): Less arbitrary way of dealing with extra copies. 

658 old = sha_deletes[0][0] 

659 if num_extra_adds > 0: 

660 for new in sha_adds[-num_extra_adds:]: 

661 assert new.path is not None 

662 add_paths.add(new.path) 

663 self._changes.append(TreeChange(CHANGE_COPY, old, new)) 

664 self._prune(add_paths, delete_paths) 

665 

666 def _should_find_content_renames(self) -> bool: 

667 if self._max_files is None: 

668 return True 

669 return len(self._adds) * len(self._deletes) <= self._max_files**2 

670 

671 def _rename_type( 

672 self, check_paths: bool, delete: TreeChange, add: TreeChange 

673 ) -> str: 

674 assert delete.old is not None and add.new is not None 

675 if check_paths and delete.old.path == add.new.path: 

676 # If the paths match, this must be a split modify, so make sure it 

677 # comes out as a modify. 

678 return CHANGE_MODIFY 

679 elif delete.type != CHANGE_DELETE: 

680 # If it's in deletes but not marked as a delete, it must have been 

681 # added due to find_copies_harder, and needs to be marked as a 

682 # copy. 

683 return CHANGE_COPY 

684 return CHANGE_RENAME 

685 

686 def _find_content_rename_candidates(self) -> None: 

687 candidates = self._candidates = [] 

688 # TODO: Optimizations: 

689 # - Compare object sizes before counting blocks. 

690 # - Skip if delete's S_IFMT differs from all adds. 

691 # - Skip if adds or deletes is empty. 

692 # Match C git's behavior of not attempting to find content renames if 

693 # the matrix size exceeds the threshold. 

694 if not self._should_find_content_renames(): 

695 return 

696 

697 block_cache = {} 

698 check_paths = self._rename_threshold is not None 

699 for delete in self._deletes: 

700 assert delete.old is not None 

701 assert delete.old.mode is not None 

702 if S_ISGITLINK(delete.old.mode): 

703 continue # Git links don't exist in this repo. 

704 assert delete.old.sha is not None 

705 old_sha = delete.old.sha 

706 old_obj = self._store[old_sha] 

707 block_cache[old_sha] = _count_blocks(old_obj) 

708 for add in self._adds: 

709 assert add.new is not None 

710 assert add.new.mode is not None 

711 if stat.S_IFMT(delete.old.mode) != stat.S_IFMT(add.new.mode): 

712 continue 

713 assert add.new.sha is not None 

714 new_obj = self._store[add.new.sha] 

715 score = _similarity_score(old_obj, new_obj, block_cache=block_cache) 

716 if score > self._rename_threshold: 

717 new_type = self._rename_type(check_paths, delete, add) 

718 rename = TreeChange(new_type, delete.old, add.new) 

719 candidates.append((-score, rename)) 

720 

721 def _choose_content_renames(self) -> None: 

722 # Sort scores from highest to lowest, but keep names in ascending 

723 # order. 

724 self._candidates.sort() 

725 

726 delete_paths = set() 

727 add_paths = set() 

728 for _, change in self._candidates: 

729 assert change.old is not None and change.new is not None 

730 new_path = change.new.path 

731 assert new_path is not None 

732 if new_path in add_paths: 

733 continue 

734 old_path = change.old.path 

735 assert old_path is not None 

736 orig_type = change.type 

737 if old_path in delete_paths: 

738 change = TreeChange(CHANGE_COPY, change.old, change.new) 

739 

740 # If the candidate was originally a copy, that means it came from a 

741 # modified or unchanged path, so we don't want to prune it. 

742 if orig_type != CHANGE_COPY: 

743 delete_paths.add(old_path) 

744 add_paths.add(new_path) 

745 self._changes.append(change) 

746 self._prune(add_paths, delete_paths) 

747 

748 def _join_modifies(self) -> None: 

749 if self._rewrite_threshold is None: 

750 return 

751 

752 modifies = {} 

753 delete_map = {} 

754 for d in self._deletes: 

755 assert d.old is not None 

756 delete_map[d.old.path] = d 

757 for add in self._adds: 

758 assert add.new is not None 

759 path = add.new.path 

760 delete = delete_map.get(path) 

761 if ( 

762 delete is not None 

763 and delete.old is not None 

764 and delete.old.mode is not None 

765 and add.new.mode is not None 

766 and stat.S_IFMT(delete.old.mode) == stat.S_IFMT(add.new.mode) 

767 ): 

768 modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new) 

769 

770 def check_add_mod(a: TreeChange) -> bool: 

771 assert a.new is not None 

772 return a.new.path not in modifies 

773 

774 def check_delete_mod(d: TreeChange) -> bool: 

775 assert d.old is not None 

776 return d.old.path not in modifies 

777 

778 self._adds = [a for a in self._adds if check_add_mod(a)] 

779 self._deletes = [d for d in self._deletes if check_delete_mod(d)] 

780 self._changes += modifies.values() 

781 

782 def _sorted_changes(self) -> list[TreeChange]: 

783 result = [] 

784 result.extend(self._adds) 

785 result.extend(self._deletes) 

786 result.extend(self._changes) 

787 result.sort(key=_tree_change_key) 

788 return result 

789 

790 def _prune_unchanged(self) -> None: 

791 if self._want_unchanged: 

792 return 

793 self._deletes = [d for d in self._deletes if d.type != CHANGE_UNCHANGED] 

794 

795 def changes_with_renames( 

796 self, 

797 tree1_id: Optional[ObjectID], 

798 tree2_id: Optional[ObjectID], 

799 want_unchanged: bool = False, 

800 include_trees: bool = False, 

801 ) -> list[TreeChange]: 

802 """Iterate TreeChanges between two tree SHAs, with rename detection.""" 

803 self._reset() 

804 self._want_unchanged = want_unchanged 

805 self._include_trees = include_trees 

806 self._collect_changes(tree1_id, tree2_id) 

807 self._find_exact_renames() 

808 self._find_content_rename_candidates() 

809 self._choose_content_renames() 

810 self._join_modifies() 

811 self._prune_unchanged() 

812 return self._sorted_changes() 

813 

814 

815# Hold on to the pure-python implementations for testing. 

816_is_tree_py = _is_tree 

817_merge_entries_py = _merge_entries 

818_count_blocks_py = _count_blocks 

819 

820if TYPE_CHECKING: 

821 # For type checking, use the Python implementations 

822 pass 

823else: 

824 # At runtime, try to import Rust extensions 

825 try: 

826 # Try to import Rust versions 

827 from dulwich._diff_tree import ( 

828 _count_blocks as _rust_count_blocks, 

829 ) 

830 from dulwich._diff_tree import ( 

831 _is_tree as _rust_is_tree, 

832 ) 

833 from dulwich._diff_tree import ( 

834 _merge_entries as _rust_merge_entries, 

835 ) 

836 

837 # Override with Rust versions 

838 _count_blocks = _rust_count_blocks 

839 _is_tree = _rust_is_tree 

840 _merge_entries = _rust_merge_entries 

841 except ImportError: 

842 pass