Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/diff_tree.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

443 statements  

1# diff_tree.py -- Utilities for diffing files and trees. 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Utilities for diffing files and trees.""" 

23 

# Names exported by ``from dulwich.diff_tree import *``; kept in sorted order.
__all__ = [
    "CHANGE_ADD",
    "CHANGE_COPY",
    "CHANGE_DELETE",
    "CHANGE_MODIFY",
    "CHANGE_RENAME",
    "CHANGE_UNCHANGED",
    "MAX_FILES",
    "RENAME_CHANGE_TYPES",
    "RENAME_THRESHOLD",
    "REWRITE_THRESHOLD",
    "RenameDetector",
    "TreeChange",
    "tree_changes",
    "tree_changes_for_merge",
    "walk_trees",
]

41 

42import stat 

43from collections import defaultdict 

44from collections.abc import Callable, Iterator, Mapping, Sequence 

45from collections.abc import Set as AbstractSet 

46from io import BytesIO 

47from itertools import chain 

48from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar 

49 

50from .object_store import BaseObjectStore 

51from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry 

52 

# TreeChange type constants.
CHANGE_ADD = "add"
CHANGE_MODIFY = "modify"
CHANGE_DELETE = "delete"
CHANGE_RENAME = "rename"
CHANGE_COPY = "copy"
CHANGE_UNCHANGED = "unchanged"

# The change types that are only produced by rename/copy detection.
RENAME_CHANGE_TYPES = (CHANGE_RENAME, CHANGE_COPY)

# _NULL_ENTRY removed - using None instead

# Upper bound of the similarity scale returned by _similarity_score (0-100).
_MAX_SCORE = 100
# Default for RenameDetector's ``rename_threshold`` argument.
RENAME_THRESHOLD = 60
# Default for RenameDetector's ``max_files`` argument.
MAX_FILES = 200
# Default for RenameDetector's ``rewrite_threshold`` argument; None means
# modifies are never broken into delete/add pairs.
REWRITE_THRESHOLD: int | None = None

69 

70 

class TreeChange(NamedTuple):
    """Named tuple describing a single change between two trees.

    Fields:
      type: One of the ``CHANGE_*`` constants.
      old: Entry on the source side, or None if there is none.
      new: Entry on the target side, or None if there is none.
    """

    type: str
    old: TreeEntry | None
    new: TreeEntry | None

    @classmethod
    def add(cls, new: TreeEntry) -> "TreeChange":
        """Build the TreeChange for an entry that only exists on the new side.

        Args:
          new: New tree entry

        Returns:
          TreeChange instance of type CHANGE_ADD with ``old`` set to None
        """
        return cls(type=CHANGE_ADD, old=None, new=new)

    @classmethod
    def delete(cls, old: TreeEntry) -> "TreeChange":
        """Build the TreeChange for an entry that only exists on the old side.

        Args:
          old: Old tree entry

        Returns:
          TreeChange instance of type CHANGE_DELETE with ``new`` set to None
        """
        return cls(type=CHANGE_DELETE, old=old, new=None)

101 

102 

def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]:
    """List the entries of ``tree`` in name order, re-rooted under ``path``.

    Args:
      path: A path to prepend to all tree entry names.
      tree: The Tree to list; a falsy value yields an empty list.

    Returns:
      A new list of TreeEntry objects, one per entry of the tree.
    """
    if not tree:
        return []
    return [entry.in_path(path) for entry in tree.iteritems(name_order=True)]

110 

111 

def _merge_entries(
    path: bytes, tree1: Tree, tree2: Tree
) -> list[tuple[TreeEntry | None, TreeEntry | None]]:
    """Pair up the entries of two trees by path.

    Both entry lists are name-ordered, so a single two-pointer sweep is
    enough to line them up.

    Args:
      path: A path to prepend to all tree entry names.
      tree1: The first Tree object to iterate, or None.
      tree2: The second Tree object to iterate, or None.

    Returns:
      A list of pairs of TreeEntry objects for each pair of entries in
        the trees. If an entry exists in one tree but not the other, the other
        entry will be None. If both entries exist, they are guaranteed to match.
    """
    left = _tree_entries(path, tree1)
    right = _tree_entries(path, tree2)
    left_count = len(left)
    right_count = len(right)

    merged: list[tuple[TreeEntry | None, TreeEntry | None]] = []
    li = ri = 0
    while li < left_count and ri < right_count:
        a = left[li]
        b = right[ri]
        if a.path < b.path:
            # Only present on the left side.
            merged.append((a, None))
            li += 1
        elif a.path > b.path:
            # Only present on the right side.
            merged.append((None, b))
            ri += 1
        else:
            merged.append((a, b))
            li += 1
            ri += 1

    # At most one of the two tails is non-empty at this point.
    merged.extend((a, None) for a in left[li:])
    merged.extend((None, b) for b in right[ri:])
    return merged

152 

153 

def _is_tree(entry: TreeEntry | None) -> bool:
    """Return True if ``entry`` exists and its mode marks a directory."""
    return (
        entry is not None
        and entry.mode is not None
        and stat.S_ISDIR(entry.mode)
    )

158 

159 

def walk_trees(
    store: BaseObjectStore,
    tree1_id: ObjectID | None,
    tree2_id: ObjectID | None,
    prune_identical: bool = False,
    paths: Sequence[bytes] | None = None,
) -> Iterator[tuple[TreeEntry | None, TreeEntry | None]]:
    """Recursively walk all the entries of two trees.

    Iteration is depth-first pre-order, as in e.g. os.walk.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the first Tree object to iterate, or None.
      tree2_id: The SHA of the second Tree object to iterate, or None.
      prune_identical: If True, identical subtrees will not be walked.
      paths: Optional list of paths to filter to (as bytes).

    Returns:
      Iterator over Pairs of TreeEntry objects for each pair of entries
        in the trees and their subtrees recursively. If an entry exists in one
        tree but not the other, the other entry will be None. If both entries
        exist, they are guaranteed to match.
    """
    # This could be fairly easily generalized to >2 trees if we find a use
    # case.
    root1 = TreeEntry(b"", stat.S_IFDIR, tree1_id) if tree1_id else None
    root2 = TreeEntry(b"", stat.S_IFDIR, tree2_id) if tree2_id else None

    # Explicit stack; children are pushed in reverse so they pop in name order.
    pending: list[tuple[TreeEntry | None, TreeEntry | None]] = [(root1, root2)]
    while pending:
        left, right = pending.pop()
        left_is_tree = _is_tree(left)
        right_is_tree = _is_tree(right)
        if prune_identical and left_is_tree and right_is_tree and left == right:
            continue
        either_is_tree = left_is_tree or right_is_tree

        # Load the tree objects; anything falsy or not a Tree is treated as
        # absent.
        left_tree = store[left.sha] if (left_is_tree and left) else None
        if not left_tree or not isinstance(left_tree, Tree):
            left_tree = None
        right_tree = store[right.sha] if (right_is_tree and right) else None
        if not right_tree or not isinstance(right_tree, Tree):
            right_tree = None

        # First non-empty path of the pair; the root pair has an empty path.
        path = b""
        for candidate in (left, right):
            if candidate and candidate.path:
                path = candidate.path
                break

        # With path filters, skip any non-root tree that is neither a filter
        # path, an ancestor of one, nor a descendant of one.
        if paths is not None and either_is_tree and path != b"":
            as_dir = path + b"/"
            relevant = any(
                wanted == path
                or wanted.startswith(as_dir)
                or path.startswith(wanted + b"/")
                for wanted in paths
            )
            if not relevant:
                continue

        if left_tree is not None or right_tree is not None:
            # A missing side is walked as an empty tree.
            if left_tree is None:
                left_tree = Tree()
            if right_tree is None:
                right_tree = Tree()
            pending.extend(reversed(_merge_entries(path, left_tree, right_tree)))

        # Only yield entries that match the path filters (if any).
        if paths is None:
            yield left, right
        elif any(
            path == wanted
            or path.startswith(wanted + b"/")
            or (either_is_tree and wanted.startswith(path + b"/"))
            for wanted in paths
        ):
            # Exact match, an entry below a filter directory, or a parent
            # directory of a filter path.
            yield left, right

266 

267 

def _skip_tree(entry: TreeEntry | None, include_trees: bool) -> TreeEntry | None:
    """Return ``entry``, or None if it should be treated as missing.

    An entry is treated as missing when it is None, has no mode, or is a
    directory while ``include_trees`` is False.
    """
    if entry is None or entry.mode is None:
        return None
    is_hidden_tree = stat.S_ISDIR(entry.mode) and not include_trees
    return None if is_hidden_tree else entry

274 

275 

def tree_changes(
    store: BaseObjectStore,
    tree1_id: ObjectID | None,
    tree2_id: ObjectID | None,
    want_unchanged: bool = False,
    rename_detector: "RenameDetector | None" = None,
    include_trees: bool = False,
    change_type_same: bool = False,
    paths: Sequence[bytes] | None = None,
) -> Iterator[TreeChange]:
    """Find the differences between the contents of two trees.

    Args:
      store: An ObjectStore for looking up objects.
      tree1_id: The SHA of the source tree.
      tree2_id: The SHA of the target tree.
      want_unchanged: If True, include TreeChanges for unmodified entries
        as well.
      rename_detector: RenameDetector object for detecting renames. It is
        only used when both tree ids are given; in that case
        ``change_type_same`` and ``paths`` are not forwarded to it.
      include_trees: Whether to include trees
      change_type_same: Whether to report change types in the same
        entry or as delete+add.
      paths: Optional list of paths to filter to (as bytes).

    Returns:
      Iterator over TreeChange instances for each change between the
        source and target tree.
    """
    use_detector = not (
        rename_detector is None or tree1_id is None or tree2_id is None
    )
    if use_detector:
        yield from rename_detector.changes_with_renames(
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=include_trees,
        )
        return

    for old_entry, new_entry in walk_trees(
        store, tree1_id, tree2_id, prune_identical=(not want_unchanged), paths=paths
    ):
        if not want_unchanged and old_entry == new_entry:
            continue

        # Treat entries for trees as missing.
        old_entry = _skip_tree(old_entry, include_trees)
        new_entry = _skip_tree(new_entry, include_trees)

        if old_entry is None and new_entry is None:
            # Both were None because at least one was a tree.
            continue

        if new_entry is None:
            kind = CHANGE_DELETE
        elif old_entry is None:
            kind = CHANGE_ADD
        elif (
            not change_type_same
            and old_entry.mode is not None
            and new_entry.mode is not None
            and stat.S_IFMT(old_entry.mode) != stat.S_IFMT(new_entry.mode)
        ):
            # File type changed: report as delete/add. The delete is emitted
            # here, the add by the common yield below.
            yield TreeChange.delete(old_entry)
            old_entry = None
            kind = CHANGE_ADD
        elif old_entry == new_entry:
            kind = CHANGE_UNCHANGED
        else:
            kind = CHANGE_MODIFY
        yield TreeChange(kind, old_entry, new_entry)

347 

348 

349T = TypeVar("T") 

350U = TypeVar("U") 

351 

352 

353def _all_eq(seq: Sequence[T], key: Callable[[T], U], value: U) -> bool: 

354 for e in seq: 

355 if key(e) != value: 

356 return False 

357 return True 

358 

359 

360def _all_same(seq: Sequence[Any], key: Callable[[Any], Any]) -> bool: 

361 return _all_eq(seq[1:], key, key(seq[0])) 

362 

363 

def tree_changes_for_merge(
    store: BaseObjectStore,
    parent_tree_ids: Sequence[ObjectID],
    tree_id: ObjectID,
    rename_detector: "RenameDetector | None" = None,
) -> Iterator[list[TreeChange | None]]:
    """Get the tree changes for a merge tree relative to all its parents.

    Args:
      store: An ObjectStore for looking up objects.
      parent_tree_ids: An iterable of the SHAs of the parent trees.
      tree_id: The SHA of the merge tree.
      rename_detector: RenameDetector object for detecting renames.

    Returns:
      Iterator over lists of TreeChange objects, one per conflicted path
      in the merge.

      Each list contains one element per parent, with the TreeChange for that
      path relative to that parent. An element may be None if it never
      existed in one parent and was deleted in two others.

      A path is only included in the output if it is a conflict, i.e. its SHA
      in the merge tree is not found in any of the parents, or in the case of
      deletes, if not all of the old SHAs match.
    """
    num_parents = len(parent_tree_ids)
    per_parent = [
        tree_changes(store, parent_id, tree_id, rename_detector=rename_detector)
        for parent_id in parent_tree_ids
    ]

    # Organize by path: one slot per parent, None where that parent has no
    # change for the path.
    changes_by_path: dict[bytes, list[TreeChange | None]] = defaultdict(
        lambda: [None] * num_parents
    )
    for parent_index, parent_changes in enumerate(per_parent):
        for change in parent_changes:
            # Deletes are keyed by their old path, everything else by the new.
            entry = change.old if change.type == CHANGE_DELETE else change.new
            assert entry is not None
            path = entry.path
            assert path is not None
            changes_by_path[path][parent_index] = change

    def sha_before(c: TreeChange) -> ObjectID | None:
        return None if c.old is None else c.old.sha

    def kind_of(c: TreeChange) -> str:
        return c.type

    # Yield only conflicting changes, in path order.
    for path in sorted(changes_by_path):
        changes = changes_by_path[path]
        assert len(changes) == num_parents
        present = [c for c in changes if c is not None]
        if _all_eq(present, kind_of, CHANGE_DELETE):
            conflict = not _all_same(present, sha_before)
        elif not _all_same(present, kind_of):
            conflict = True
        else:
            # If no change was found relative to one parent, that means the
            # SHA must have matched the SHA in that parent, so it is not a
            # conflict.
            conflict = len(present) == len(changes)
        if conflict:
            yield changes

431 

432 

_BLOCK_SIZE = 64


def _count_blocks(obj: ShaFile) -> dict[int, int]:
    """Count the blocks in an object.

    The raw data is cut after every newline byte, or after ``_BLOCK_SIZE``
    bytes when no newline was seen by then. Any trailing partial block is
    counted as well.

    Args:
      obj: The object to count blocks for.

    Returns:
      A dict of block hashcode -> total bytes occurring.
    """
    counts: dict[int, int] = defaultdict(int)
    pending = bytearray()
    newline = ord(b"\n")

    # Iterating the raw chunks byte by byte yields integers.
    for byte in chain.from_iterable(obj.as_raw_chunks()):
        pending.append(byte)
        if byte == newline or len(pending) == _BLOCK_SIZE:
            block = bytes(pending)
            counts[hash(block)] += len(block)
            pending.clear()

    if pending:
        tail = bytes(pending)
        counts[hash(tail)] += len(tail)
    return counts

471 

472 

473def _common_bytes(blocks1: Mapping[int, int], blocks2: Mapping[int, int]) -> int: 

474 """Count the number of common bytes in two block count dicts. 

475 

476 Args: 

477 blocks1: The first dict of block hashcode -> total bytes. 

478 blocks2: The second dict of block hashcode -> total bytes. 

479 

480 Returns: 

481 The number of bytes in common between blocks1 and blocks2. This is 

482 only approximate due to possible hash collisions. 

483 """ 

484 # Iterate over the smaller of the two dicts, since this is symmetrical. 

485 if len(blocks1) > len(blocks2): 

486 blocks1, blocks2 = blocks2, blocks1 

487 score = 0 

488 for block, count1 in blocks1.items(): 

489 count2 = blocks2.get(block) 

490 if count2: 

491 score += min(count1, count2) 

492 return score 

493 

494 

def _similarity_score(
    obj1: ShaFile,
    obj2: ShaFile,
    block_cache: dict[ObjectID, dict[int, int]] | None = None,
) -> int:
    """Compute a similarity score for two objects.

    Args:
      obj1: The first object to score.
      obj2: The second object to score.
      block_cache: An optional dict of SHA to block counts to cache
        results between calls.

    Returns:
      The similarity score between the two objects, defined as the
        number of bytes in common between the two objects divided by the
        maximum size, scaled to the range 0-100.
    """
    cache = {} if block_cache is None else block_cache
    # Fill in block counts for whichever object is not cached yet.
    for obj in (obj1, obj2):
        if obj.id not in cache:
            cache[obj.id] = _count_blocks(obj)

    shared = _common_bytes(cache[obj1.id], cache[obj2.id])
    largest = max(obj1.raw_length(), obj2.raw_length())
    if largest:
        return int(float(shared) * _MAX_SCORE / largest)
    # Two empty objects count as a perfect match.
    return _MAX_SCORE

525 

526 

def _tree_change_key(entry: TreeChange) -> tuple[bytes, bytes]:
    """Return the sort key ``(old path, new path)`` for a TreeChange.

    If only one side has a path, it is used for both elements of the key.
    """
    old_path = None if entry.old is None else entry.old.path
    new_path = None if entry.new is None else entry.new.path
    first = new_path if old_path is None else old_path
    second = first if new_path is None else new_path
    assert first is not None
    assert second is not None
    return (first, second)

538 

539 

class RenameDetector:
    """Object for handling rename detection between two trees."""

    # Working state, rebuilt on every changes_with_renames() call.
    _adds: list[TreeChange]
    _deletes: list[TreeChange]
    _changes: list[TreeChange]
    _candidates: list[tuple[int, TreeChange]]
    _include_trees: bool

    def __init__(
        self,
        store: BaseObjectStore,
        rename_threshold: int = RENAME_THRESHOLD,
        max_files: int | None = MAX_FILES,
        rewrite_threshold: int | None = REWRITE_THRESHOLD,
        find_copies_harder: bool = False,
    ) -> None:
        """Initialize the rename detector.

        Args:
          store: An ObjectStore for looking up objects.
          rename_threshold: The threshold similarity score for considering
            an add/delete pair to be a rename/copy; see _similarity_score.
          max_files: The maximum number of adds and deletes to consider,
            or None for no limit. The detector is guaranteed to compare no more
            than max_files ** 2 add/delete pairs. This limit is provided
            because rename detection can be quadratic in the project size. If
            the limit is exceeded, no content rename detection is attempted.
          rewrite_threshold: The threshold similarity score below which a
            modify should be considered a delete/add, or None to not break
            modifies; see _similarity_score.
          find_copies_harder: If True, consider unmodified files when
            detecting copies.
        """
        self._store = store
        self._rename_threshold = rename_threshold
        self._rewrite_threshold = rewrite_threshold
        self._max_files = max_files
        self._find_copies_harder = find_copies_harder
        self._want_unchanged = False
        # Initialised here so _collect_changes() never hits a missing
        # attribute; changes_with_renames() overwrites it on every call.
        self._include_trees = False

    def _reset(self) -> None:
        """Clear the add/delete/change lists collected by a previous run."""
        self._adds = []
        self._deletes = []
        self._changes = []

    def _should_split(self, change: TreeChange) -> bool:
        """Return True if a modify is dissimilar enough to break into delete/add."""
        if self._rewrite_threshold is None or change.type != CHANGE_MODIFY:
            return False
        assert change.old is not None and change.new is not None
        if change.old.sha == change.new.sha:
            return False
        assert change.old.sha is not None
        assert change.new.sha is not None
        old_obj = self._store[change.old.sha]
        new_obj = self._store[change.new.sha]
        return _similarity_score(old_obj, new_obj) < self._rewrite_threshold

    def _add_change(self, change: TreeChange) -> None:
        """File ``change`` into the adds, deletes or plain changes list."""
        if change.type == CHANGE_ADD:
            self._adds.append(change)
        elif change.type == CHANGE_DELETE:
            self._deletes.append(change)
        elif self._should_split(change):
            assert change.old is not None and change.new is not None
            self._deletes.append(TreeChange.delete(change.old))
            self._adds.append(TreeChange.add(change.new))
        elif (
            self._find_copies_harder and change.type == CHANGE_UNCHANGED
        ) or change.type == CHANGE_MODIFY:
            # Treat all modifies as potential deletes for rename detection,
            # but don't split them (to avoid spurious renames). Setting
            # find_copies_harder means we treat unchanged the same as
            # modified.
            self._deletes.append(change)
        else:
            self._changes.append(change)

    def _collect_changes(
        self, tree1_id: ObjectID | None, tree2_id: ObjectID | None
    ) -> None:
        """Run a plain tree diff and sort every change into the working lists."""
        # Unchanged entries are needed as copy sources for find_copies_harder
        # even when the caller did not ask for them.
        want_unchanged = self._find_copies_harder or self._want_unchanged
        for change in tree_changes(
            self._store,
            tree1_id,
            tree2_id,
            want_unchanged=want_unchanged,
            include_trees=self._include_trees,
        ):
            self._add_change(change)

    def _prune(
        self, add_paths: AbstractSet[bytes], delete_paths: AbstractSet[bytes]
    ) -> None:
        """Drop adds/deletes whose paths have already been matched.

        Args:
          add_paths: New paths to remove from the pending adds.
          delete_paths: Old paths to remove from the pending deletes.
        """

        def check_add(a: TreeChange) -> bool:
            assert a.new is not None
            return a.new.path not in add_paths

        def check_delete(d: TreeChange) -> bool:
            assert d.old is not None
            return d.old.path not in delete_paths

        self._adds = [a for a in self._adds if check_add(a)]
        self._deletes = [d for d in self._deletes if check_delete(d)]

    def _find_exact_renames(self) -> None:
        """Match adds and deletes with identical SHAs as renames/copies."""
        add_map = defaultdict(list)
        for add in self._adds:
            assert add.new is not None
            add_map[add.new.sha].append(add.new)
        delete_map = defaultdict(list)
        for delete in self._deletes:
            # Keep track of whether the delete was actually marked as a delete.
            # If not, it needs to be marked as a copy.
            is_delete = delete.type == CHANGE_DELETE
            assert delete.old is not None
            delete_map[delete.old.sha].append((delete.old, is_delete))

        add_paths = set()
        delete_paths = set()
        for sha, sha_deletes in delete_map.items():
            # .get() avoids inserting empty lists into the defaultdict.
            sha_adds = add_map.get(sha, [])
            for (old, is_delete), new in zip(sha_deletes, sha_adds):
                assert old.mode is not None
                assert new.mode is not None
                if stat.S_IFMT(old.mode) != stat.S_IFMT(new.mode):
                    continue
                if is_delete:
                    assert old.path is not None
                    delete_paths.add(old.path)
                assert new.path is not None
                add_paths.add(new.path)
                new_type = CHANGE_RENAME if is_delete else CHANGE_COPY
                self._changes.append(TreeChange(new_type, old, new))

            num_extra_adds = len(sha_adds) - len(sha_deletes)
            # TODO(dborowitz): Less arbitrary way of dealing with extra copies.
            old = sha_deletes[0][0]
            if num_extra_adds > 0:
                # Adds left over once every delete is paired become copies of
                # the first delete with this SHA.
                for new in sha_adds[-num_extra_adds:]:
                    assert new.path is not None
                    add_paths.add(new.path)
                    self._changes.append(TreeChange(CHANGE_COPY, old, new))
        self._prune(add_paths, delete_paths)

    def _should_find_content_renames(self) -> bool:
        """Return True if the add x delete matrix is within ``max_files ** 2``."""
        if self._max_files is None:
            return True
        return len(self._adds) * len(self._deletes) <= self._max_files**2

    def _rename_type(
        self, check_paths: bool, delete: TreeChange, add: TreeChange
    ) -> str:
        """Pick the change type for a matched delete/add pair."""
        assert delete.old is not None and add.new is not None
        if check_paths and delete.old.path == add.new.path:
            # If the paths match, this must be a split modify, so make sure it
            # comes out as a modify.
            return CHANGE_MODIFY
        elif delete.type != CHANGE_DELETE:
            # If it's in deletes but not marked as a delete, it must have been
            # added due to find_copies_harder, and needs to be marked as a
            # copy.
            return CHANGE_COPY
        return CHANGE_RENAME

    def _find_content_rename_candidates(self) -> None:
        """Score delete/add pairs and collect those above the rename threshold.

        Candidates are stored in ``self._candidates`` as ``(-score, change)``
        tuples so that a plain ascending sort puts the best scores first.
        """
        candidates = self._candidates = []
        # TODO: Optimizations:
        #  - Compare object sizes before counting blocks.
        #  - Skip if delete's S_IFMT differs from all adds.
        #  - Skip if adds or deletes is empty.
        # Match C git's behavior of not attempting to find content renames if
        # the matrix size exceeds the threshold.
        if not self._should_find_content_renames():
            return

        block_cache: dict[ObjectID, dict[int, int]] = {}
        # NOTE(review): this tests _rename_threshold, which is annotated as a
        # plain int, so check_paths looks always True - confirm the intent.
        check_paths = self._rename_threshold is not None
        for delete in self._deletes:
            assert delete.old is not None
            assert delete.old.mode is not None
            if S_ISGITLINK(delete.old.mode):
                continue  # Git links don't exist in this repo.
            assert delete.old.sha is not None
            old_sha = delete.old.sha
            old_obj = self._store[old_sha]
            block_cache[old_sha] = _count_blocks(old_obj)
            for add in self._adds:
                assert add.new is not None
                assert add.new.mode is not None
                if stat.S_IFMT(delete.old.mode) != stat.S_IFMT(add.new.mode):
                    continue
                assert add.new.sha is not None
                new_obj = self._store[add.new.sha]
                score = _similarity_score(old_obj, new_obj, block_cache=block_cache)
                if score > self._rename_threshold:
                    new_type = self._rename_type(check_paths, delete, add)
                    rename = TreeChange(new_type, delete.old, add.new)
                    candidates.append((-score, rename))

    def _choose_content_renames(self) -> None:
        """Commit the best candidate for each new path, then prune matched paths."""
        # Sort scores from highest to lowest, but keep names in ascending
        # order.
        self._candidates.sort()

        delete_paths = set()
        add_paths = set()
        for _, change in self._candidates:
            assert change.old is not None and change.new is not None
            new_path = change.new.path
            assert new_path is not None
            if new_path in add_paths:
                # A better-scoring candidate already claimed this new path.
                continue
            old_path = change.old.path
            assert old_path is not None
            orig_type = change.type
            if old_path in delete_paths:
                # The source was already consumed by another rename, so any
                # further use of it is a copy.
                change = TreeChange(CHANGE_COPY, change.old, change.new)

            # If the candidate was originally a copy, that means it came from a
            # modified or unchanged path, so we don't want to prune it.
            if orig_type != CHANGE_COPY:
                delete_paths.add(old_path)
            add_paths.add(new_path)
            self._changes.append(change)
        self._prune(add_paths, delete_paths)

    def _join_modifies(self) -> None:
        """Re-join leftover delete/add pairs on the same path into modifies.

        Does nothing when no rewrite threshold is configured.
        """
        if self._rewrite_threshold is None:
            return

        modifies = {}
        delete_map = {}
        for d in self._deletes:
            assert d.old is not None
            delete_map[d.old.path] = d
        for add in self._adds:
            assert add.new is not None
            path = add.new.path
            delete = delete_map.get(path)
            if (
                delete is not None
                and delete.old is not None
                and delete.old.mode is not None
                and add.new.mode is not None
                and stat.S_IFMT(delete.old.mode) == stat.S_IFMT(add.new.mode)
            ):
                modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new)

        def check_add_mod(a: TreeChange) -> bool:
            assert a.new is not None
            return a.new.path not in modifies

        def check_delete_mod(d: TreeChange) -> bool:
            assert d.old is not None
            return d.old.path not in modifies

        self._adds = [a for a in self._adds if check_add_mod(a)]
        self._deletes = [d for d in self._deletes if check_delete_mod(d)]
        self._changes += modifies.values()

    def _sorted_changes(self) -> list[TreeChange]:
        """Return all remaining adds, deletes and changes sorted by path."""
        result = []
        result.extend(self._adds)
        result.extend(self._deletes)
        result.extend(self._changes)
        result.sort(key=_tree_change_key)
        return result

    def _prune_unchanged(self) -> None:
        """Drop unchanged entries from the deletes unless the caller wants them."""
        if self._want_unchanged:
            return
        self._deletes = [d for d in self._deletes if d.type != CHANGE_UNCHANGED]

    def changes_with_renames(
        self,
        tree1_id: ObjectID | None,
        tree2_id: ObjectID | None,
        want_unchanged: bool = False,
        include_trees: bool = False,
    ) -> list[TreeChange]:
        """Compute TreeChanges between two tree SHAs, with rename detection.

        Args:
          tree1_id: The SHA of the source tree.
          tree2_id: The SHA of the target tree.
          want_unchanged: If True, include TreeChanges for unmodified entries
            as well.
          include_trees: Whether to include trees.

        Returns:
          A list of TreeChange instances sorted by old path, then new path.
        """
        self._reset()
        self._want_unchanged = want_unchanged
        self._include_trees = include_trees
        self._collect_changes(tree1_id, tree2_id)
        self._find_exact_renames()
        self._find_content_rename_candidates()
        self._choose_content_renames()
        self._join_modifies()
        self._prune_unchanged()
        return self._sorted_changes()

831 

832 

# Hold on to the pure-python implementations for testing.
_is_tree_py = _is_tree
_merge_entries_py = _merge_entries
_count_blocks_py = _count_blocks

# For type checking the Python implementations above are used as-is; the Rust
# extensions are only tried at runtime.
if not TYPE_CHECKING:
    try:
        from dulwich._diff_tree import (
            _count_blocks as _rust_count_blocks,
            _is_tree as _rust_is_tree,
            _merge_entries as _rust_merge_entries,
        )
    except ImportError:
        # Extension module not available: keep the pure-Python versions.
        pass
    else:
        # Override with Rust versions.
        _count_blocks = _rust_count_blocks
        _is_tree = _rust_is_tree
        _merge_entries = _rust_merge_entries