# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

__all__ = [
    "DEFAULT_VERSION",
    "EOIE_EXTENSION",
    "EXTENDED_FLAG_INTEND_TO_ADD",
    "EXTENDED_FLAG_SKIP_WORKTREE",
    "FLAG_EXTENDED",
    "FLAG_NAMEMASK",
    "FLAG_STAGEMASK",
    "FLAG_STAGESHIFT",
    "FLAG_VALID",
    "HFS_IGNORABLE_CHARS",
    "IEOT_EXTENSION",
    "INVALID_DOTNAMES",
    "REUC_EXTENSION",
    "SDIR_EXTENSION",
    "TREE_EXTENSION",
    "UNTR_EXTENSION",
    "Index",
    "IndexEntry",
    "IndexExtension",
    "ResolveUndoExtension",
    "SerializedIndexEntry",
    "SparseDirExtension",
    "Stage",
    "TreeDict",
    "TreeExtension",
    "UnmergedEntries",
    "UnsupportedIndexFormat",
    "UntrackedExtension",
    "blob_from_path_and_mode",
    "blob_from_path_and_stat",
    "build_file_from_blob",
    "build_index_from_tree",
    "changes_from_tree",
    "cleanup_mode",
    "commit_index",
    "commit_tree",
    "detect_case_only_renames",
    "get_path_element_normalizer",
    "get_unstaged_changes",
    "index_entry_from_stat",
    "pathjoin",
    "pathsplit",
    "read_cache_entry",
    "read_cache_time",
    "read_index",
    "read_index_dict",
    "read_index_dict_with_version",
    "read_index_header",
    "read_submodule_head",
    "update_working_tree",
    "validate_path",
    "validate_path_element_default",
    "validate_path_element_hfs",
    "validate_path_element_ntfs",
    "write_cache_entry",
    "write_cache_time",
    "write_index",
    "write_index_dict",
    "write_index_extension",
]

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import (
    Callable,
    Generator,
    Iterable,
    Iterator,
    Mapping,
    Sequence,
    Set,
)
from dataclasses import dataclass
from enum import Enum
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .filters import FilterBlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    TreeEntry,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# Type alias for recursive tree structure used in commit_tree
TreeDict = dict[bytes, "TreeDict | tuple[int, ObjectID]"]

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"
SDIR_EXTENSION = b"sdir"  # Sparse directory extension

def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
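
# A small worked illustration of the varint scheme above (values from running
# the helpers): 300 is 0b10_0101100, so the low seven bits (0x2C) are emitted
# first with the continuation bit set, followed by 0x02:
#
#     _encode_varint(300)          # b"\xac\x02"
#     _decode_varint(b"\xac\x02")  # (300, 2)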

def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
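
# Worked round-trip (values from running the helpers): against the previous
# entry b"src/lib.py", the path b"src/main.py" shares the 4-byte prefix
# b"src/", so 6 bytes are stripped from the previous path and the new suffix
# is appended:
#
#     _compress_path(b"src/main.py", b"src/lib.py")
#     # b"\x06main.py\x00"  (varint(6) + suffix + NUL)
#     _decompress_path(b"\x06main.py\x00", 0, b"src/lib.py")
#     # (b"src/main.py", 9)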

def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed

class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
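
# The stage occupies bits 12-13 of the per-entry flags word, so, for example,
# a flags value of 0x2000 decodes to stage 2 (computed from the masks above):
#
#     (0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT  # 2
#     Stage(2)                                      # Stage.MERGE_CONFLICT_THIS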

@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: ObjectID
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    def is_sparse_dir(self) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and self.name.endswith(b"/")
        )

@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        elif signature == SDIR_EXTENSION:
            return SparseDirExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data

class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)


class SparseDirExtension(IndexExtension):
    """Sparse directory extension.

    This extension indicates that the index contains sparse directory entries.
    Tools that don't understand sparse index should avoid interacting with
    the index when this extension is present.

    The extension data is empty - its presence is the signal.
    """

    def __init__(self) -> None:
        """Initialize SparseDirExtension."""
        super().__init__(SDIR_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "SparseDirExtension":
        """Parse SparseDirExtension from bytes.

        Args:
          data: Raw bytes to parse (should be empty)

        Returns:
          SparseDirExtension instance
        """
        return cls()

    def to_bytes(self) -> bytes:
        """Serialize SparseDirExtension to bytes.

        Returns:
          Empty bytes (extension presence is the signal)
        """
        return b""

@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: ObjectID
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED

    def is_sparse_dir(self, name: bytes) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Args:
          name: The path name for this entry (IndexEntry doesn't store name)

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and name.endswith(b"/")
        )
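
# Illustration of the flag interplay (entry is a hypothetical IndexEntry
# instance): setting the skip-worktree bit also raises FLAG_EXTENDED so that
# readers know to expect the extra 16-bit extended-flags word on disk:
#
#     entry.set_skip_worktree(True)
#     bool(entry.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)  # True
#     bool(entry.flags & FLAG_EXTENDED)                         # True
#     entry.set_skip_worktree(False)
#     bool(entry.flags & FLAG_EXTENDED)  # False, if no other extended flags remain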

class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: IndexEntry | None
    this: IndexEntry | None
    other: IndexEntry | None

    def __init__(
        self,
        ancestor: IndexEntry | None = None,
        this: IndexEntry | None = None,
        other: IndexEntry | None = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""

def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
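
# Examples (values from running the helpers):
#
#     pathsplit(b"foo/bar/baz")      # (b"foo/bar", b"baz")
#     pathsplit(b"baz")              # (b"", b"baz")
#     pathjoin(b"foo", b"", b"baz")  # b"foo/baz" (empty components are dropped)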

def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: IO[bytes], t: int | float | tuple[int, int]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
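
# Illustration (buf is a hypothetical file-like object such as io.BytesIO()):
# a float timestamp is split into whole seconds and nanoseconds before being
# packed as two big-endian 32-bit words:
#
#     write_cache_time(buf, 1700000000.5)
#     # writes struct.pack(">LL", 1700000000, 500000000)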

def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    Returns:
      SerializedIndexEntry parsed from the stream
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )

def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    # All versions keep the real filename length in the low flag bits; for
    # version 4 this matches how C Git sets the flags even though the path
    # itself is written compressed.
    flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(_compress_path(entry.name, previous_path))
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
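
# On-disk shape of a pre-v4 entry, for reference: ctime (8 bytes) + mtime (8)
# + dev/ino/mode/uid/gid/size (4 each) + SHA-1 (20) + flags (2) = 62 bytes,
# then the path, then NUL padding so the record length becomes a multiple of
# eight with at least one terminating NUL (the `(offset + 8) & ~7` arithmetic
# above).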

class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version

def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
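
# The 12-byte header is the b"DIRC" signature followed by two big-endian
# 32-bit words; e.g. a version-2 index holding 3 entries begins with:
#
#     b"DIRC" + struct.pack(">LL", 2, 3)
#     # b"DIRC\x00\x00\x00\x02\x00\x00\x00\x03"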

def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)

def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for _ in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry

def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[dict[bytes, IndexEntry | ConflictedIndexEntry], int, list[IndexExtension]]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    previous_path = b""
    for _ in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Extension signatures are four ASCII letters; optional extensions
        # start with an uppercase letter, mandatory ones (such as the
        # lowercase b"sdir" defined above) with a lowercase letter.
        if not all(65 <= b <= 90 or 97 <= b <= 122 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions

def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, IndexEntry | ConflictedIndexEntry]:
    """Read an index file and return it as a dictionary.

    Entries are keyed by path; a path with multiple stages is collapsed
    into a single ConflictedIndexEntry holding the individual stages.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret

def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      entries: Sequence of entries to write
      version: Version number to write (defaults to DEFAULT_VERSION)
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # Extended flags require at least format version 3; bump if necessary.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    # Final safety check: no extended flags may remain in a pre-v3 index.
    if version < 3:
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)

def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, IndexEntry | ConflictedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are sorted by path, and conflicted entries are expanded in
    stage order (ancestor, this, other).
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)

def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
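
# Examples (values from running cleanup_mode): git trees only distinguish
# regular files, executables, symlinks, directories and gitlinks, so the
# permission bits collapse to 0o644 or 0o755:
#
#     cleanup_mode(0o100664)  # 0o100644 (regular file)
#     cleanup_mode(0o100775)  # 0o100755 (owner-executable file)
#     cleanup_mode(0o120777)  # 0o120000 (symlink)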

class Index:
    """A Git Index file."""

    _byname: dict[bytes, IndexEntry | ConflictedIndexEntry]

    def __init__(
        self,
        filename: bytes | str | os.PathLike[str],
        read: bool = True,
        skip_hash: bool = False,
        version: int | None = None,
        *,
        file_mode: int | None = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
          file_mode: Optional file permission mask for shared repository
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._file_mode = file_mode
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> bytes | str:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        mask = self._file_mode if self._file_mode is not None else 0o644
        f = GitFile(self._filename, "wb", mask=mask)
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except BaseException:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> IndexEntry | ConflictedIndexEntry:
        """Retrieve entry by relative path.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> ObjectID:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, ObjectID, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: IndexEntry | ConflictedIndexEntry
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(self, entries: dict[bytes, IndexEntry | ConflictedIndexEntry]) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[bytes | None, bytes | None],
            tuple[int | None, int | None],
            tuple[bytes | None, bytes | None],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> ObjectID:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

    def is_sparse(self) -> bool:
        """Check if this index contains sparse directory entries.

        Returns:
          True if any sparse directory extension is present
        """
        return any(isinstance(ext, SparseDirExtension) for ext in self._extensions)

    def ensure_full_index(self, object_store: "BaseObjectStore") -> None:
        """Expand all sparse directory entries into full file entries.

        This converts a sparse index into a full index by recursively
        expanding any sparse directory entries into their constituent files.

        Args:
          object_store: Object store to read tree objects from

        Raises:
          KeyError: If a tree object referenced by a sparse dir entry doesn't exist
        """
        if not self.is_sparse():
            return

        # Find all sparse directory entries
        sparse_dirs = []
        for path, entry in list(self._byname.items()):
            if isinstance(entry, IndexEntry) and entry.is_sparse_dir(path):
                sparse_dirs.append((path, entry))

        # Expand each sparse directory
        for path, entry in sparse_dirs:
            # Remove the sparse directory entry
            del self._byname[path]

            # Get the tree object
            tree = object_store[entry.sha]
            if not isinstance(tree, Tree):
                raise ValueError(f"Sparse directory {path!r} points to non-tree object")

            # Recursively add all entries from the tree
            self._expand_tree(path.rstrip(b"/"), tree, object_store, entry)

        # Remove the sparse directory extension
        self._extensions = [
            ext for ext in self._extensions if not isinstance(ext, SparseDirExtension)
        ]

    def _expand_tree(
        self,
        prefix: bytes,
        tree: Tree,
        object_store: "BaseObjectStore",
        template_entry: IndexEntry,
    ) -> None:
        """Recursively expand a tree into index entries.

        Args:
          prefix: Path prefix for entries (without trailing slash)
          tree: Tree object to expand
          object_store: Object store to read nested trees from
          template_entry: Template entry to copy metadata from
        """
        for name, mode, sha in tree.items():
            if prefix:
                full_path = prefix + b"/" + name
            else:
                full_path = name

            if stat.S_ISDIR(mode):
                # Recursively expand subdirectories
                subtree = object_store[sha]
                if not isinstance(subtree, Tree):
                    raise ValueError(
                        f"Directory entry {full_path!r} points to non-tree object"
                    )
                self._expand_tree(full_path, subtree, object_store, template_entry)
            else:
                # Create an index entry for this file
                # Use the template entry for metadata but with the file's sha and mode
                new_entry = IndexEntry(
                    ctime=template_entry.ctime,
                    mtime=template_entry.mtime,
                    dev=template_entry.dev,
                    ino=template_entry.ino,
                    mode=mode,
                    uid=template_entry.uid,
                    gid=template_entry.gid,
                    size=0,  # Size is unknown from tree
                    sha=sha,
                    flags=0,
                    extended_flags=0,  # Don't copy skip-worktree flag
                )
                self._byname[full_path] = new_entry

    def convert_to_sparse(
        self,
        object_store: "BaseObjectStore",
        tree_sha: ObjectID,
        sparse_dirs: Set[bytes],
    ) -> None:
        """Convert full index entries to sparse directory entries.

        This collapses directories that are entirely outside the sparse
        checkout cone into single sparse directory entries.

        Args:
          object_store: Object store to read tree objects
          tree_sha: SHA of the tree (usually HEAD) to base sparse dirs on
          sparse_dirs: Set of directory paths (with trailing /) to collapse

        Raises:
          KeyError: If tree_sha or a subdirectory doesn't exist
        """
        if not sparse_dirs:
            return

        # Get the base tree
        tree = object_store[tree_sha]
        if not isinstance(tree, Tree):
            raise ValueError(f"tree_sha {tree_sha!r} is not a tree object")

        # For each sparse directory, find its tree SHA and create sparse entry
        for dir_path in sparse_dirs:
            dir_path_stripped = dir_path.rstrip(b"/")

            # Find the tree SHA for this directory
            subtree_sha = self._find_subtree_sha(tree, dir_path_stripped, object_store)
            if subtree_sha is None:
                # Directory doesn't exist in tree, skip it
                continue

            # Remove all entries under this directory
            entries_to_remove = [
                path
                for path in self._byname
                if path.startswith(dir_path) or path == dir_path_stripped
            ]
            for path in entries_to_remove:
                del self._byname[path]

            # Create a sparse directory entry
            # Use minimal metadata since it's not a real file
            sparse_entry = IndexEntry(
                ctime=0,
                mtime=0,
                dev=0,
                ino=0,
                mode=stat.S_IFDIR,
                uid=0,
                gid=0,
                size=0,
                sha=ObjectID(subtree_sha),
                flags=0,
                extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
            )
            self._byname[dir_path] = sparse_entry

        # Add sparse directory extension if not present
        if not self.is_sparse():
            self._extensions.append(SparseDirExtension())

    def _find_subtree_sha(
        self,
        tree: Tree,
        path: bytes,
        object_store: "BaseObjectStore",
    ) -> bytes | None:
        """Find the SHA of a subtree at a given path.

        Args:
          tree: Root tree object to search in
          path: Path to the subdirectory (no trailing slash)
          object_store: Object store to read nested trees from

        Returns:
          SHA of the subtree, or None if path doesn't exist
        """
        if not path:
            return tree.id

        parts = path.split(b"/")
        current_tree = tree

        for part in parts:
            # Look for this part in the current tree
            try:
                mode, sha = current_tree[part]
            except KeyError:
                return None

            if not stat.S_ISDIR(mode):
                # Path component is a file, not a directory
                return None

            # Load the next tree
            obj = object_store[sha]
            if not isinstance(obj, Tree):
                return None
            current_tree = obj

        return current_tree.id
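
# A minimal usage sketch for the Index class (the index path is hypothetical;
# MemoryObjectStore and Blob are existing dulwich APIs):
#
#     from dulwich.object_store import MemoryObjectStore
#     from dulwich.objects import Blob
#
#     store = MemoryObjectStore()
#     blob = Blob.from_string(b"hello\n")
#     store.add_object(blob)
#     index = Index("/tmp/index", read=False)  # hypothetical location
#     index[b"hello.txt"] = IndexEntry(
#         ctime=0, mtime=0, dev=0, ino=0, mode=0o100644,
#         uid=0, gid=0, size=6, sha=blob.id,
#     )
#     tree_id = index.commit(store)  # root tree containing hello.txt
#     index.write()                  # persist to /tmp/index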

def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, ObjectID, int]]
) -> ObjectID:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: TreeDict = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> ObjectID:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
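
# Sketch of the recursive assembly above (MemoryObjectStore and Blob are
# existing dulwich APIs; the path is illustrative): a nested path produces
# one tree object per directory level, written innermost-first.
#
#     from dulwich.object_store import MemoryObjectStore
#     from dulwich.objects import Blob
#
#     store = MemoryObjectStore()
#     blob = Blob.from_string(b"data\n")
#     store.add_object(blob)
#     root = commit_tree(store, [(b"docs/readme.txt", blob.id, 0o100644)])
#     # store now also holds the b"docs" subtree and the root tree.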

def commit_index(object_store: ObjectContainer, index: Index) -> ObjectID:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())

def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: ObjectID | None,
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[bytes | None, bytes | None],
        tuple[int | None, int | None],
        tuple[bytes | None, bytes | None],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))

def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: int | None = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    # Use nanosecond precision when available to avoid precision loss
    # through float representation
    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    st_ctime_ns = getattr(stat_val, "st_ctime_ns", None)
    if st_ctime_ns is not None:
        ctime = (
            st_ctime_ns // 1_000_000_000,
            st_ctime_ns % 1_000_000_000,
        )
    else:
        ctime = stat_val.st_ctime

    st_mtime_ns = getattr(stat_val, "st_mtime_ns", None)
    if st_mtime_ns is not None:
        mtime = (
            st_mtime_ns // 1_000_000_000,
            st_mtime_ns % 1_000_000_000,
        )
    else:
        mtime = stat_val.st_mtime

    return IndexEntry(
        ctime=ctime,
        mtime=mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=ObjectID(hex_sha),
        flags=0,
        extended_flags=0,
    )
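
# Sketch: snapshot a working-tree file into an entry (the filename is
# illustrative, and the hex sha would normally come from hashing the file
# contents as a blob):
#
#     st = os.lstat("hello.txt")
#     entry = index_entry_from_stat(st, blob.id)
#     entry.mode  # e.g. 0o100644 after cleanup_mode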

if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Windows-specific error for symlink creation failures.

        This error is raised when symlink creation fails on Windows,
        typically due to lack of developer mode or administrator privileges.
        """

        def __init__(self, errno: int, msg: str, filename: str | None) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            super().__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: str | bytes,
        dst: str | bytes,
        target_is_directory: bool = False,
        *,
        dir_fd: int | None = None,
    ) -> None:
        """Create a symbolic link on Windows with better error handling.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink

def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)

1838INVALID_DOTNAMES = (b".git", b".", b"..", b"") 

1839 

1840 

1841def _normalize_path_element_default(element: bytes) -> bytes: 

1842 """Normalize path element for default case-insensitive comparison.""" 

1843 return element.lower() 

1844 

1845 

1846def _normalize_path_element_ntfs(element: bytes) -> bytes: 

1847 """Normalize path element for NTFS filesystem.""" 

1848 return element.rstrip(b". ").lower() 

1849 

1850 

1851def _normalize_path_element_hfs(element: bytes) -> bytes: 

1852 """Normalize path element for HFS+ filesystem.""" 

1853 import unicodedata 

1854 

1855 # Decode to Unicode (let UnicodeDecodeError bubble up) 

1856 element_str = element.decode("utf-8", errors="strict") 

1857 

1858 # Remove HFS+ ignorable characters 

1859 filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS) 

1860 # Normalize to NFD 

1861 normalized = unicodedata.normalize("NFD", filtered) 

1862 return normalized.lower().encode("utf-8", errors="strict") 

1863 
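# Illustrative checks of the three normalizers above; the inputs are
# hypothetical dotgit-lookalike names:
def _example_normalizers() -> None:
    assert _normalize_path_element_default(b".GIT") == b".git"
    # NTFS strips trailing dots and spaces before comparing
    assert _normalize_path_element_ntfs(b".git. ") == b".git"
    # HFS+ drops ignorable code points such as ZERO WIDTH NON-JOINER
    assert _normalize_path_element_hfs(".g\u200cit".encode()) == b".git"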

1864 

1865def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]: 

1866 """Get the appropriate path element normalization function based on config. 

1867 

1868 Args: 

1869 config: Repository configuration object 

1870 

1871 Returns: 

1872 Function that normalizes path elements for the configured filesystem 

1873 """ 

1874 import os 

1875 import sys 

1876 

1877 if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"): 

1878 return _normalize_path_element_ntfs 

1879 elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"): 

1880 return _normalize_path_element_hfs 

1881 else: 

1882 return _normalize_path_element_default 

1883 
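# A sketch of config-driven selection; ConfigDict is dulwich's
# in-memory config and the protectNTFS value is an assumption:
def _example_select_normalizer() -> None:
    from dulwich.config import ConfigDict

    config = ConfigDict()
    config.set((b"core",), b"protectNTFS", b"true")
    normalize = get_path_element_normalizer(config)
    assert normalize(b"GIT~1. ") == b"git~1"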

1884 

1885def validate_path_element_default(element: bytes) -> bool: 

1886 """Validate a path element using default rules. 

1887 

1888 Args: 

1889 element: Path element to validate 

1890 

1891 Returns: 

1892 True if path element is valid, False otherwise 

1893 """ 

1894 return _normalize_path_element_default(element) not in INVALID_DOTNAMES 

1895 

1896 

1897def validate_path_element_ntfs(element: bytes) -> bool: 

1898 """Validate a path element using NTFS filesystem rules. 

1899 

1900 Args: 

1901 element: Path element to validate 

1902 

1903 Returns: 

1904 True if path element is valid for NTFS, False otherwise 

1905 """ 

1906 normalized = _normalize_path_element_ntfs(element) 

1907 if normalized in INVALID_DOTNAMES: 

1908 return False 

1909 if normalized == b"git~1": 

1910 return False 

1911 return True 

1912 

1913 

1914# HFS+ ignorable Unicode codepoints (from Git's utf8.c) 

1915HFS_IGNORABLE_CHARS = { 

1916 0x200C, # ZERO WIDTH NON-JOINER 

1917 0x200D, # ZERO WIDTH JOINER 

1918 0x200E, # LEFT-TO-RIGHT MARK 

1919 0x200F, # RIGHT-TO-LEFT MARK 

1920 0x202A, # LEFT-TO-RIGHT EMBEDDING 

1921 0x202B, # RIGHT-TO-LEFT EMBEDDING 

1922 0x202C, # POP DIRECTIONAL FORMATTING 

1923 0x202D, # LEFT-TO-RIGHT OVERRIDE 

1924 0x202E, # RIGHT-TO-LEFT OVERRIDE 

1925 0x206A, # INHIBIT SYMMETRIC SWAPPING 

1926 0x206B, # ACTIVATE SYMMETRIC SWAPPING 

1927 0x206C, # INHIBIT ARABIC FORM SHAPING 

1928 0x206D, # ACTIVATE ARABIC FORM SHAPING 

1929 0x206E, # NATIONAL DIGIT SHAPES 

1930 0x206F, # NOMINAL DIGIT SHAPES 

1931 0xFEFF, # ZERO WIDTH NO-BREAK SPACE 

1932} 

1933 

1934 

1935def validate_path_element_hfs(element: bytes) -> bool: 

1936 """Validate path element for HFS+ filesystem. 

1937 

1938 Equivalent to Git's is_hfs_dotgit and related checks. 

1939 Uses NFD normalization and ignores HFS+ ignorable characters. 

1940 """ 

1941 try: 

1942 normalized = _normalize_path_element_hfs(element) 

1943 except UnicodeDecodeError: 

1944 # Malformed UTF-8 - be conservative and reject 

1945 return False 

1946 

1947 # Check against invalid names 

1948 if normalized in INVALID_DOTNAMES: 

1949 return False 

1950 

1951 # Also check for 8.3 short name 

1952 if normalized == b"git~1": 

1953 return False 

1954 

1955 return True 

1956 

1957 

1958def validate_path( 

1959 path: bytes, 

1960 element_validator: Callable[[bytes], bool] = validate_path_element_default, 

1961) -> bool: 

1962 """Default path validator that just checks for .git/.""" 

1963 parts = path.split(b"/") 

1964 for p in parts: 

1965 if not element_validator(p): 

1966 return False 

1967 else: 

1968 return True 

1969 
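# Usage sketch: rejecting tree paths that could escape or clobber the
# repository on checkout (the example paths are hypothetical):
def _example_validate_paths() -> None:
    assert validate_path(b"src/main.py")
    assert not validate_path(b"src/../escape")
    assert not validate_path(b".git/hooks/post-checkout")
    # Under NTFS rules the 8.3 alias of .git is refused as well:
    assert not validate_path(b"GIT~1/config", validate_path_element_ntfs)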

1970 

1971def build_index_from_tree( 

1972 root_path: str | bytes, 

1973 index_path: str | bytes, 

1974 object_store: ObjectContainer, 

1975 tree_id: ObjectID, 

1976 honor_filemode: bool = True, 

1977 validate_path_element: Callable[[bytes], bool] = validate_path_element_default, 

1978 symlink_fn: Callable[ 

1979 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None 

1980 ] 

1981 | None = None, 

1982 blob_normalizer: "FilterBlobNormalizer | None" = None, 

1983 tree_encoding: str = "utf-8", 

1984) -> None: 

1985 """Generate and materialize index from a tree. 

1986 

1987 Args: 

1988 tree_id: Tree to materialize 

1989 root_path: Target dir for materialized index files 

1990 index_path: Target path for generated index 

1991 object_store: Non-empty object store holding tree contents 

1992 honor_filemode: An optional flag to honor the core.filemode config

1993 setting; when True (the default), the executable bit is applied

1994 validate_path_element: Function to validate path elements to check 

1995 out; default just refuses .git and .. directories. 

1996 symlink_fn: Function to use for creating symlinks 

1997 blob_normalizer: An optional BlobNormalizer to use for converting line 

1998 endings when writing blobs to the working directory. 

1999 tree_encoding: Encoding used for tree paths (default: utf-8) 

2000 

2001 Note: the existing index is wiped and contents are not merged

2002 into the working directory. Suitable only for fresh clones.

2003 """ 

2004 index = Index(index_path, read=False) 

2005 if not isinstance(root_path, bytes): 

2006 root_path = os.fsencode(root_path) 

2007 

2008 for entry in iter_tree_contents(object_store, tree_id): 

2009 assert ( 

2010 entry.path is not None and entry.mode is not None and entry.sha is not None 

2011 ) 

2012 if not validate_path(entry.path, validate_path_element): 

2013 continue 

2014 full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding) 

2015 

2016 if not os.path.exists(os.path.dirname(full_path)): 

2017 os.makedirs(os.path.dirname(full_path)) 

2018 

2019 # TODO(jelmer): Merge new index into working tree 

2020 if S_ISGITLINK(entry.mode): 

2021 if not os.path.isdir(full_path): 

2022 os.mkdir(full_path) 

2023 st = os.lstat(full_path) 

2024 # TODO(jelmer): record and return submodule paths 

2025 else: 

2026 obj = object_store[entry.sha] 

2027 assert isinstance(obj, Blob) 

2028 # Apply blob normalization for checkout if normalizer is provided 

2029 if blob_normalizer is not None: 

2030 obj = blob_normalizer.checkout_normalize(obj, entry.path) 

2031 st = build_file_from_blob( 

2032 obj, 

2033 entry.mode, 

2034 full_path, 

2035 honor_filemode=honor_filemode, 

2036 tree_encoding=tree_encoding, 

2037 symlink_fn=symlink_fn, 

2038 ) 

2039 

2040 # Add file to index 

2041 if not honor_filemode or S_ISGITLINK(entry.mode): 

2042 # We cannot use tuple slicing to build a new tuple,

2043 # because on Windows that would convert the times to

2044 # plain integers, which causes errors further along.

2045 st_tuple = ( 

2046 entry.mode, 

2047 st.st_ino, 

2048 st.st_dev, 

2049 st.st_nlink, 

2050 st.st_uid, 

2051 st.st_gid, 

2052 st.st_size, 

2053 st.st_atime, 

2054 st.st_mtime, 

2055 st.st_ctime, 

2056 ) 

2057 st = st.__class__(st_tuple) 

2058 # default to a stage 0 index entry (normal) 

2059 # when reading from the filesystem 

2060 index[entry.path] = index_entry_from_stat(st, entry.sha) 

2061 

2062 index.write() 

2063 
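# A fresh-clone style sketch; the repository path is a placeholder and
# Repo.index_path()/object_store are the usual dulwich accessors:
def _example_fresh_checkout() -> None:
    from dulwich.repo import Repo

    repo = Repo("/path/to/repo")
    tree_id = repo[repo.head()].tree
    build_index_from_tree(
        repo.path, repo.index_path(), repo.object_store, tree_id
    )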

2064 

2065def blob_from_path_and_mode( 

2066 fs_path: bytes, mode: int, tree_encoding: str = "utf-8" 

2067) -> Blob: 

2068 """Create a blob from a path and a stat object. 

2069 

2070 Args: 

2071 fs_path: Full file system path to file 

2072 mode: File mode 

2073 tree_encoding: Encoding to use for tree contents 

2074 Returns: A `Blob` object 

2075 """ 

2076 assert isinstance(fs_path, bytes) 

2077 blob = Blob() 

2078 if stat.S_ISLNK(mode): 

2079 if sys.platform == "win32": 

2080 # os.readlink on Windows requires a str path.

2081 blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding) 

2082 else: 

2083 blob.data = os.readlink(fs_path) 

2084 else: 

2085 with open(fs_path, "rb") as f: 

2086 blob.data = f.read() 

2087 return blob 

2088 

2089 

2090def blob_from_path_and_stat( 

2091 fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8" 

2092) -> Blob: 

2093 """Create a blob from a path and a stat object. 

2094 

2095 Args: 

2096 fs_path: Full file system path to file 

2097 st: A stat object 

2098 tree_encoding: Encoding to use for tree contents 

2099 Returns: A `Blob` object 

2100 """ 

2101 return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding) 

2102 
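# Sketch: compute the blob id of a working-tree file without touching
# any object store (the caller supplies the path):
def _example_hash_worktree_file(fs_path: bytes) -> bytes:
    st = os.lstat(fs_path)
    blob = blob_from_path_and_stat(fs_path, st)
    return blob.id  # hex sha as bytes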

2103 

2104def read_submodule_head(path: str | bytes) -> bytes | None: 

2105 """Read the head commit of a submodule. 

2106 

2107 Args: 

2108 path: path to the submodule 

2109 Returns: HEAD sha, None if not a valid head/repository 

2110 """ 

2111 from .errors import NotGitRepository 

2112 from .repo import Repo 

2113 

2114 # Repo currently expects a "str", so decode if necessary. 

2115 # TODO(jelmer): Perhaps move this into Repo() ? 

2116 if not isinstance(path, str): 

2117 path = os.fsdecode(path) 

2118 try: 

2119 repo = Repo(path) 

2120 except NotGitRepository: 

2121 return None 

2122 try: 

2123 return repo.head() 

2124 except KeyError: 

2125 return None 

2126 

2127 

2128def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool: 

2129 """Check if a directory has changed after getting an error. 

2130 

2131 When handling an error trying to create a blob from a path, call this

2132 function. It will check if the path is a directory. If it is a directory

2133 and a submodule, check the submodule head to see if it has changed. If

2134 it is not a submodule, consider the file changed, since Git tracked a

2135 file and not a directory.

2136 

2137 Return True if the given path should be considered changed, and False

2138 otherwise or if the path is not a directory.

2139 """ 

2140 # This is actually a directory 

2141 if os.path.exists(os.path.join(tree_path, b".git")): 

2142 # Submodule 

2143 head = read_submodule_head(tree_path) 

2144 if entry.sha != head: 

2145 return True 

2146 else: 

2147 # The file was changed to a directory, so consider it removed. 

2148 return True 

2149 

2150 return False 

2151 

2152 

2153os_sep_bytes = os.sep.encode("ascii") 

2154 

2155 

2156def _ensure_parent_dir_exists(full_path: bytes) -> None: 

2157 """Ensure parent directory exists, checking no parent is a file.""" 

2158 parent_dir = os.path.dirname(full_path) 

2159 if parent_dir and not os.path.exists(parent_dir): 

2160 # Walk up the directory tree to find the first existing parent 

2161 current = parent_dir 

2162 parents_to_check: list[bytes] = [] 

2163 

2164 while current and not os.path.exists(current): 

2165 parents_to_check.insert(0, current) 

2166 new_parent = os.path.dirname(current) 

2167 if new_parent == current: 

2168 # Reached the root or can't go up further 

2169 break 

2170 current = new_parent 

2171 

2172 # Check if the existing parent (if any) is a directory 

2173 if current and os.path.exists(current) and not os.path.isdir(current): 

2174 raise OSError( 

2175 f"Cannot create directory, parent path is a file: {current!r}" 

2176 ) 

2177 

2178 # Now check each parent we need to create isn't blocked by an existing file 

2179 for parent_path in parents_to_check: 

2180 if os.path.exists(parent_path) and not os.path.isdir(parent_path): 

2181 raise OSError( 

2182 f"Cannot create directory, parent path is a file: {parent_path!r}" 

2183 ) 

2184 

2185 os.makedirs(parent_dir) 

2186 

2187 

2188def _remove_file_with_readonly_handling(path: bytes) -> None: 

2189 """Remove a file, handling read-only files on Windows. 

2190 

2191 Args: 

2192 path: Path to the file to remove 

2193 """ 

2194 try: 

2195 os.unlink(path) 

2196 except PermissionError: 

2197 # On Windows, remove read-only attribute and retry 

2198 if sys.platform == "win32": 

2199 os.chmod(path, stat.S_IWRITE | stat.S_IREAD) 

2200 os.unlink(path) 

2201 else: 

2202 raise 

2203 

2204 

2205def _remove_empty_parents(path: bytes, stop_at: bytes) -> None: 

2206 """Remove empty parent directories up to stop_at.""" 

2207 parent = os.path.dirname(path) 

2208 while parent and parent != stop_at: 

2209 try: 

2210 os.rmdir(parent) 

2211 parent = os.path.dirname(parent) 

2212 except FileNotFoundError: 

2213 # Directory doesn't exist - stop trying 

2214 break 

2215 except OSError as e: 

2216 if e.errno in (errno.ENOTEMPTY, errno.EEXIST): 

2217 # Directory not empty - stop trying 

2218 break 

2219 raise 

2220 

2221 

2222def _check_symlink_matches( 

2223 full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: ObjectID 

2224) -> bool: 

2225 """Check if symlink target matches expected target. 

2226 

2227 Returns True if symlink matches, False if it doesn't match. 

2228 """ 

2229 try: 

2230 current_target = os.readlink(full_path) 

2231 blob_obj = repo_object_store[entry_sha] 

2232 expected_target = blob_obj.as_raw_string() 

2233 if isinstance(current_target, str): 

2234 current_target = current_target.encode() 

2235 return current_target == expected_target 

2236 except FileNotFoundError: 

2237 # Symlink doesn't exist 

2238 return False 

2239 except OSError as e: 

2240 if e.errno == errno.EINVAL: 

2241 # Not a symlink 

2242 return False 

2243 raise 

2244 

2245 

2246def _check_file_matches( 

2247 repo_object_store: "BaseObjectStore", 

2248 full_path: bytes, 

2249 entry_sha: ObjectID, 

2250 entry_mode: int, 

2251 current_stat: os.stat_result, 

2252 honor_filemode: bool, 

2253 blob_normalizer: "FilterBlobNormalizer | None" = None, 

2254 tree_path: bytes | None = None, 

2255) -> bool: 

2256 """Check if a file on disk matches the expected git object. 

2257 

2258 Returns True if file matches, False if it doesn't match. 

2259 """ 

2260 # Check mode first (if honor_filemode is True) 

2261 if honor_filemode: 

2262 current_mode = stat.S_IMODE(current_stat.st_mode) 

2263 expected_mode = stat.S_IMODE(entry_mode) 

2264 

2265 # For regular files, only check the user executable bit, not group/other permissions 

2266 # This matches Git's behavior where umask differences don't count as modifications 

2267 if stat.S_ISREG(current_stat.st_mode): 

2268 # Normalize regular file modes to ignore group/other write permissions 

2269 current_mode_normalized = ( 

2270 current_mode & 0o755 

2271 ) # Keep only user rwx and all read+execute 

2272 expected_mode_normalized = expected_mode & 0o755 

2273 

2274 # For Git compatibility, regular files should be either 644 or 755 

2275 if expected_mode_normalized not in (0o644, 0o755): 

2276 expected_mode_normalized = 0o644 # Default for regular files 

2277 if current_mode_normalized not in (0o644, 0o755): 

2278 # Determine if it should be executable based on user execute bit 

2279 if current_mode & 0o100: # User execute bit is set 

2280 current_mode_normalized = 0o755 

2281 else: 

2282 current_mode_normalized = 0o644 

2283 

2284 if current_mode_normalized != expected_mode_normalized: 

2285 return False 

2286 else: 

2287 # For non-regular files (symlinks, etc.), check mode exactly 

2288 if current_mode != expected_mode: 

2289 return False 

2290 

2291 # If mode matches (or we don't care), check content via size first 

2292 blob_obj = repo_object_store[entry_sha] 

2293 if current_stat.st_size != blob_obj.raw_length(): 

2294 return False 

2295 

2296 # Size matches, check actual content 

2297 try: 

2298 with open(full_path, "rb") as f: 

2299 current_content = f.read() 

2300 expected_content = blob_obj.as_raw_string() 

2301 if blob_normalizer and tree_path is not None: 

2302 assert isinstance(blob_obj, Blob) 

2303 normalized_blob = blob_normalizer.checkout_normalize( 

2304 blob_obj, tree_path 

2305 ) 

2306 expected_content = normalized_blob.as_raw_string() 

2307 return current_content == expected_content 

2308 except (FileNotFoundError, PermissionError, IsADirectoryError): 

2309 return False 

2310 
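# Worked example of the mode normalization above; the helper mirrors the
# rules in _check_file_matches and is not a separate dulwich API:
def _example_mode_normalization() -> None:
    def normalize(mode: int) -> int:
        mode &= 0o755
        if mode in (0o644, 0o755):
            return mode
        return 0o755 if mode & 0o100 else 0o644

    # A umask artifact (0o664 on disk vs 0o644 in the index) is ignored:
    assert normalize(0o664) == normalize(0o644) == 0o644
    # Flipping the user execute bit is a real modification:
    assert normalize(0o744) != normalize(0o644)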

2311 

2312def _transition_to_submodule( 

2313 repo: "Repo", 

2314 path: bytes, 

2315 full_path: bytes, 

2316 current_stat: os.stat_result | None, 

2317 entry: IndexEntry | TreeEntry, 

2318 index: Index, 

2319) -> None: 

2320 """Transition any type to submodule.""" 

2321 from .submodule import ensure_submodule_placeholder 

2322 

2323 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

2324 # Already a directory, just ensure .git file exists 

2325 ensure_submodule_placeholder(repo, path) 

2326 else: 

2327 # Remove whatever is there and create submodule 

2328 if current_stat is not None: 

2329 _remove_file_with_readonly_handling(full_path) 

2330 ensure_submodule_placeholder(repo, path) 

2331 

2332 st = os.lstat(full_path) 

2333 assert entry.sha is not None 

2334 index[path] = index_entry_from_stat(st, entry.sha) 

2335 

2336 

2337def _transition_to_file( 

2338 object_store: "BaseObjectStore", 

2339 path: bytes, 

2340 full_path: bytes, 

2341 current_stat: os.stat_result | None, 

2342 entry: IndexEntry | TreeEntry, 

2343 index: Index, 

2344 honor_filemode: bool, 

2345 symlink_fn: Callable[ 

2346 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None 

2347 ] 

2348 | None, 

2349 blob_normalizer: "FilterBlobNormalizer | None", 

2350 tree_encoding: str = "utf-8", 

2351) -> None: 

2352 """Transition any type to regular file or symlink.""" 

2353 assert entry.sha is not None and entry.mode is not None 

2354 # Check if we need to update 

2355 if ( 

2356 current_stat is not None 

2357 and stat.S_ISREG(current_stat.st_mode) 

2358 and not stat.S_ISLNK(entry.mode) 

2359 ): 

2360 # File to file - check if update needed 

2361 file_matches = _check_file_matches( 

2362 object_store, 

2363 full_path, 

2364 entry.sha, 

2365 entry.mode, 

2366 current_stat, 

2367 honor_filemode, 

2368 blob_normalizer, 

2369 path, 

2370 ) 

2371 needs_update = not file_matches 

2372 elif ( 

2373 current_stat is not None 

2374 and stat.S_ISLNK(current_stat.st_mode) 

2375 and stat.S_ISLNK(entry.mode) 

2376 ): 

2377 # Symlink to symlink - check if update needed 

2378 symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha) 

2379 needs_update = not symlink_matches 

2380 else: 

2381 needs_update = True 

2382 

2383 if not needs_update: 

2384 # Just update index - current_stat should always be valid here since we're not updating 

2385 assert current_stat is not None 

2386 index[path] = index_entry_from_stat(current_stat, entry.sha) 

2387 return 

2388 

2389 # Remove existing entry if needed 

2390 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

2391 # Remove directory 

2392 dir_contents = set(os.listdir(full_path)) 

2393 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

2394 

2395 if git_file_name in dir_contents: 

2396 if dir_contents != {git_file_name}: 

2397 raise IsADirectoryError( 

2398 f"Cannot replace submodule with untracked files: {full_path!r}" 

2399 ) 

2400 shutil.rmtree(full_path) 

2401 else: 

2402 try: 

2403 os.rmdir(full_path) 

2404 except OSError as e: 

2405 if e.errno in (errno.ENOTEMPTY, errno.EEXIST): 

2406 raise IsADirectoryError( 

2407 f"Cannot replace non-empty directory with file: {full_path!r}" 

2408 ) 

2409 raise 

2410 elif current_stat is not None: 

2411 _remove_file_with_readonly_handling(full_path) 

2412 

2413 # Ensure parent directory exists 

2414 _ensure_parent_dir_exists(full_path) 

2415 

2416 # Write the file 

2417 blob_obj = object_store[entry.sha] 

2418 assert isinstance(blob_obj, Blob) 

2419 if blob_normalizer: 

2420 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path) 

2421 st = build_file_from_blob( 

2422 blob_obj, 

2423 entry.mode, 

2424 full_path, 

2425 honor_filemode=honor_filemode, 

2426 tree_encoding=tree_encoding, 

2427 symlink_fn=symlink_fn, 

2428 ) 

2429 index[path] = index_entry_from_stat(st, entry.sha) 

2430 

2431 

2432def _transition_to_absent( 

2433 repo: "Repo", 

2434 path: bytes, 

2435 full_path: bytes, 

2436 current_stat: os.stat_result | None, 

2437 index: Index, 

2438) -> None: 

2439 """Remove any type of entry.""" 

2440 if current_stat is None: 

2441 return 

2442 

2443 if stat.S_ISDIR(current_stat.st_mode): 

2444 # Check if it's a submodule directory 

2445 dir_contents = set(os.listdir(full_path)) 

2446 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

2447 

2448 if git_file_name in dir_contents and dir_contents == {git_file_name}: 

2449 shutil.rmtree(full_path) 

2450 else: 

2451 try: 

2452 os.rmdir(full_path) 

2453 except OSError as e: 

2454 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST): 

2455 raise 

2456 else: 

2457 _remove_file_with_readonly_handling(full_path) 

2458 

2459 try: 

2460 del index[path] 

2461 except KeyError: 

2462 pass 

2463 

2464 # Try to remove empty parent directories 

2465 _remove_empty_parents( 

2466 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

2467 ) 

2468 

2469 

2470def detect_case_only_renames( 

2471 changes: Sequence["TreeChange"], 

2472 config: "Config", 

2473) -> list["TreeChange"]: 

2474 """Detect and transform case-only renames in a list of tree changes. 

2475 

2476 This function identifies file renames that only differ in case (e.g., 

2477 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into 

2478 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization 

2479 based on the repository configuration. 

2480 

2481 Args: 

2482 changes: List of TreeChange objects representing file changes 

2483 config: Repository configuration object 

2484 

2485 Returns: 

2486 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME 

2487 """ 

2488 from .diff_tree import ( 

2489 CHANGE_ADD, 

2490 CHANGE_COPY, 

2491 CHANGE_DELETE, 

2492 CHANGE_MODIFY, 

2493 CHANGE_RENAME, 

2494 TreeChange, 

2495 ) 

2496 

2497 # Build dictionaries of old and new paths with their normalized forms 

2498 old_paths_normalized = {} 

2499 new_paths_normalized = {} 

2500 old_changes = {} # Map from old path to change object 

2501 new_changes = {} # Map from new path to change object 

2502 

2503 # Get the appropriate normalizer based on config 

2504 normalize_func = get_path_element_normalizer(config) 

2505 

2506 def normalize_path(path: bytes) -> bytes: 

2507 """Normalize entire path using element normalization.""" 

2508 return b"/".join(normalize_func(part) for part in path.split(b"/")) 

2509 

2510 # Pre-normalize all paths once to avoid repeated normalization 

2511 for change in changes: 

2512 if change.type == CHANGE_DELETE and change.old: 

2513 assert change.old.path is not None 

2514 try: 

2515 normalized = normalize_path(change.old.path) 

2516 except UnicodeDecodeError: 

2517 import logging 

2518 

2519 logging.warning( 

2520 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2521 change.old.path, 

2522 ) 

2523 else: 

2524 old_paths_normalized[normalized] = change.old.path 

2525 old_changes[change.old.path] = change 

2526 elif change.type == CHANGE_RENAME and change.old: 

2527 assert change.old.path is not None 

2528 # Treat RENAME as DELETE + ADD for case-only detection 

2529 try: 

2530 normalized = normalize_path(change.old.path) 

2531 except UnicodeDecodeError: 

2532 import logging 

2533 

2534 logging.warning( 

2535 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2536 change.old.path, 

2537 ) 

2538 else: 

2539 old_paths_normalized[normalized] = change.old.path 

2540 old_changes[change.old.path] = change 

2541 

2542 if ( 

2543 change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY) 

2544 and change.new 

2545 ): 

2546 assert change.new.path is not None 

2547 try: 

2548 normalized = normalize_path(change.new.path) 

2549 except UnicodeDecodeError: 

2550 import logging 

2551 

2552 logging.warning( 

2553 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2554 change.new.path, 

2555 ) 

2556 else: 

2557 new_paths_normalized[normalized] = change.new.path 

2558 new_changes[change.new.path] = change 

2559 

2560 # Find case-only renames and transform changes 

2561 case_only_renames = set() 

2562 new_rename_changes = [] 

2563 

2564 for norm_path, old_path in old_paths_normalized.items(): 

2565 if norm_path in new_paths_normalized: 

2566 new_path = new_paths_normalized[norm_path] 

2567 if old_path != new_path: 

2568 # Found a case-only rename 

2569 old_change = old_changes[old_path] 

2570 new_change = new_changes[new_path] 

2571 

2572 # Create a CHANGE_RENAME to replace the DELETE and ADD/MODIFY pair 

2573 # Both the DELETE + ADD and DELETE + MODIFY cases pair the old

2574 # file from the DELETE with the new file

2575 rename_change = TreeChange(

2576 CHANGE_RENAME, old_change.old, new_change.new

2577 )

2584 

2585 new_rename_changes.append(rename_change) 

2586 

2587 # Mark the old changes for removal 

2588 case_only_renames.add(old_change) 

2589 case_only_renames.add(new_change) 

2590 

2591 # Return new list with original ADD/DELETE changes replaced by renames 

2592 result = [change for change in changes if change not in case_only_renames] 

2593 result.extend(new_rename_changes) 

2594 return result 

2595 
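# A minimal sketch: a DELETE/ADD pair differing only in case collapses
# into one rename. The sha and the empty ConfigDict are placeholders:
def _example_case_only_rename() -> None:
    from dulwich.config import ConfigDict
    from dulwich.diff_tree import CHANGE_RENAME, TreeChange
    from dulwich.objects import TreeEntry

    sha = b"0" * 40
    old = TreeEntry(b"README.txt", 0o100644, sha)
    new = TreeEntry(b"readme.txt", 0o100644, sha)
    changes = [TreeChange.delete(old), TreeChange.add(new)]
    merged = detect_case_only_renames(changes, ConfigDict())
    assert [c.type for c in merged] == [CHANGE_RENAME]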

2596 

2597def update_working_tree( 

2598 repo: "Repo", 

2599 old_tree_id: bytes | None, 

2600 new_tree_id: bytes, 

2601 change_iterator: Iterator["TreeChange"], 

2602 honor_filemode: bool = True, 

2603 validate_path_element: Callable[[bytes], bool] | None = None, 

2604 symlink_fn: Callable[ 

2605 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None 

2606 ] 

2607 | None = None, 

2608 force_remove_untracked: bool = False, 

2609 blob_normalizer: "FilterBlobNormalizer | None" = None, 

2610 tree_encoding: str = "utf-8", 

2611 allow_overwrite_modified: bool = False, 

2612) -> None: 

2613 """Update the working tree and index to match a new tree. 

2614 

2615 This function handles: 

2616 - Adding new files 

2617 - Updating modified files 

2618 - Removing deleted files 

2619 - Cleaning up empty directories 

2620 

2621 Args: 

2622 repo: Repository object 

2623 old_tree_id: SHA of the tree before the update 

2624 new_tree_id: SHA of the tree to update to 

2625 change_iterator: Iterator of TreeChange objects to apply 

2626 honor_filemode: An optional flag to honor core.filemode setting 

2627 validate_path_element: Function to validate path elements to check out 

2628 symlink_fn: Function to use for creating symlinks 

2629 force_remove_untracked: If True, remove files that exist in working 

2630 directory but not in target tree, even if old_tree_id is None 

2631 blob_normalizer: An optional BlobNormalizer to use for converting line 

2632 endings when writing blobs to the working directory. 

2633 tree_encoding: Encoding used for tree paths (default: utf-8) 

2634 allow_overwrite_modified: If False, raise an error when attempting to 

2635 overwrite files that have been modified compared to old_tree_id 

2636 """ 

2637 if validate_path_element is None: 

2638 validate_path_element = validate_path_element_default 

2639 

2640 from .diff_tree import ( 

2641 CHANGE_ADD, 

2642 CHANGE_COPY, 

2643 CHANGE_DELETE, 

2644 CHANGE_MODIFY, 

2645 CHANGE_RENAME, 

2646 CHANGE_UNCHANGED, 

2647 ) 

2648 

2649 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

2650 index = repo.open_index() 

2651 

2652 # Convert iterator to list since we need multiple passes 

2653 changes = list(change_iterator) 

2654 

2655 # Transform case-only renames on case-insensitive filesystems 

2656 import platform 

2657 

2658 default_ignore_case = platform.system() in ("Windows", "Darwin") 

2659 config = repo.get_config() 

2660 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case) 

2661 

2662 if ignore_case: 

2663 # config was already loaded above for the ignorecase lookup 

2664 changes = detect_case_only_renames(changes, config) 

2665 

2666 # Check for path conflicts where files need to become directories 

2667 paths_becoming_dirs = set() 

2668 for change in changes: 

2669 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY): 

2670 assert change.new is not None 

2671 path = change.new.path 

2672 assert path is not None 

2673 if b"/" in path: # This is a file inside a directory 

2674 # Check if any parent path exists as a file in the old tree or changes 

2675 parts = path.split(b"/") 

2676 for i in range(1, len(parts)): 

2677 parent = b"/".join(parts[:i]) 

2678 # See if this parent path is being deleted (was a file, becoming a dir) 

2679 for other_change in changes: 

2680 if ( 

2681 other_change.type == CHANGE_DELETE 

2682 and other_change.old 

2683 and other_change.old.path == parent 

2684 ): 

2685 paths_becoming_dirs.add(parent) 

2686 

2687 # Check if any path that needs to become a directory has been modified 

2688 for path in paths_becoming_dirs: 

2689 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2690 try: 

2691 current_stat = os.lstat(full_path) 

2692 except FileNotFoundError: 

2693 continue # File doesn't exist, nothing to check 

2694 except OSError as e: 

2695 raise OSError( 

2696 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2697 ) from e 

2698 

2699 if stat.S_ISREG(current_stat.st_mode): 

2700 # Find the old entry for this path 

2701 old_change = None 

2702 for change in changes: 

2703 if ( 

2704 change.type == CHANGE_DELETE 

2705 and change.old 

2706 and change.old.path == path 

2707 ): 

2708 old_change = change 

2709 break 

2710 

2711 if old_change: 

2712 # Check if file has been modified 

2713 assert old_change.old is not None 

2714 assert ( 

2715 old_change.old.sha is not None and old_change.old.mode is not None 

2716 ) 

2717 file_matches = _check_file_matches( 

2718 repo.object_store, 

2719 full_path, 

2720 old_change.old.sha, 

2721 old_change.old.mode, 

2722 current_stat, 

2723 honor_filemode, 

2724 blob_normalizer, 

2725 path, 

2726 ) 

2727 if not file_matches: 

2728 raise OSError( 

2729 f"Cannot replace modified file with directory: {path!r}" 

2730 ) 

2731 

2732 # Check for uncommitted modifications before making any changes 

2733 if not allow_overwrite_modified and old_tree_id: 

2734 for change in changes: 

2735 # Only check files that are being modified or deleted 

2736 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old: 

2737 path = change.old.path 

2738 assert path is not None 

2739 if path.startswith(b".git") or not validate_path( 

2740 path, validate_path_element 

2741 ): 

2742 continue 

2743 

2744 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2745 try: 

2746 current_stat = os.lstat(full_path) 

2747 except FileNotFoundError: 

2748 continue # File doesn't exist, nothing to check 

2749 except OSError as e: 

2750 raise OSError( 

2751 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2752 ) from e 

2753 

2754 if stat.S_ISREG(current_stat.st_mode): 

2755 # Check if working tree file differs from old tree 

2756 assert change.old.sha is not None and change.old.mode is not None 

2757 file_matches = _check_file_matches( 

2758 repo.object_store, 

2759 full_path, 

2760 change.old.sha, 

2761 change.old.mode, 

2762 current_stat, 

2763 honor_filemode, 

2764 blob_normalizer, 

2765 path, 

2766 ) 

2767 if not file_matches: 

2768 from .errors import WorkingTreeModifiedError 

2769 

2770 raise WorkingTreeModifiedError( 

2771 f"Your local changes to '{path.decode('utf-8', errors='replace')}' " 

2772 f"would be overwritten by checkout. " 

2773 f"Please commit your changes or stash them before you switch branches." 

2774 ) 

2775 

2776 # Apply the changes 

2777 for change in changes: 

2778 if change.type in (CHANGE_DELETE, CHANGE_RENAME): 

2779 # Remove file/directory 

2780 assert change.old is not None and change.old.path is not None 

2781 path = change.old.path 

2782 if path.startswith(b".git") or not validate_path( 

2783 path, validate_path_element 

2784 ): 

2785 continue 

2786 

2787 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2788 try: 

2789 delete_stat: os.stat_result | None = os.lstat(full_path) 

2790 except FileNotFoundError: 

2791 delete_stat = None 

2792 except OSError as e: 

2793 raise OSError( 

2794 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2795 ) from e 

2796 

2797 _transition_to_absent(repo, path, full_path, delete_stat, index) 

2798 

2799 if change.type in ( 

2800 CHANGE_ADD, 

2801 CHANGE_MODIFY, 

2802 CHANGE_UNCHANGED, 

2803 CHANGE_COPY, 

2804 CHANGE_RENAME, 

2805 ): 

2806 # Add or modify file 

2807 assert ( 

2808 change.new is not None 

2809 and change.new.path is not None 

2810 and change.new.mode is not None 

2811 ) 

2812 path = change.new.path 

2813 if path.startswith(b".git") or not validate_path( 

2814 path, validate_path_element 

2815 ): 

2816 continue 

2817 

2818 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2819 try: 

2820 modify_stat: os.stat_result | None = os.lstat(full_path) 

2821 except FileNotFoundError: 

2822 modify_stat = None 

2823 except OSError as e: 

2824 raise OSError( 

2825 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2826 ) from e 

2827 

2828 if S_ISGITLINK(change.new.mode): 

2829 _transition_to_submodule( 

2830 repo, path, full_path, modify_stat, change.new, index 

2831 ) 

2832 else: 

2833 _transition_to_file( 

2834 repo.object_store, 

2835 path, 

2836 full_path, 

2837 modify_stat, 

2838 change.new, 

2839 index, 

2840 honor_filemode, 

2841 symlink_fn, 

2842 blob_normalizer, 

2843 tree_encoding, 

2844 ) 

2845 

2846 index.write() 

2847 
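# Sketch of a checkout-style transition between two trees; tree ids are
# caller-supplied and tree_changes is dulwich's tree diff iterator:
def _example_checkout(repo: "Repo", old_tree: bytes, new_tree: bytes) -> None:
    from dulwich.diff_tree import tree_changes

    changes = tree_changes(repo.object_store, old_tree, new_tree)
    update_working_tree(repo, old_tree, new_tree, changes)
    # Local edits are protected unless allow_overwrite_modified=True.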

2848 

2849def _stat_matches_entry(st: os.stat_result, entry: IndexEntry) -> bool: 

2850 """Check if filesystem stat matches index entry stat. 

2851 

2852 This is used to determine if a file might have changed without reading its content. 

2853 Git uses this optimization to avoid expensive filter operations on unchanged files. 

2854 

2855 Args: 

2856 st: Filesystem stat result 

2857 entry: Index entry to compare against 

2858 Returns: True if stat matches and file is likely unchanged 

2859 """ 

2860 # Get entry mtime with nanosecond precision if available 

2861 if isinstance(entry.mtime, tuple): 

2862 entry_mtime_sec = entry.mtime[0] 

2863 entry_mtime_nsec = entry.mtime[1] 

2864 else: 

2865 entry_mtime_sec = int(entry.mtime) 

2866 entry_mtime_nsec = 0 

2867 

2868 # Compare modification time with nanosecond precision if available 

2869 # This is important for fast workflows (e.g., stash) where files can be 

2870 # modified multiple times within the same second 

2871 if hasattr(st, "st_mtime_ns"): 

2872 # Use nanosecond precision when available 

2873 st_mtime_nsec = st.st_mtime_ns 

2874 entry_mtime_nsec_total = entry_mtime_sec * 1_000_000_000 + entry_mtime_nsec 

2875 if st_mtime_nsec != entry_mtime_nsec_total: 

2876 return False 

2877 else: 

2878 # Fall back to second precision 

2879 if int(st.st_mtime) != entry_mtime_sec: 

2880 return False 

2881 

2882 # Compare file size 

2883 if st.st_size != entry.size: 

2884 return False 

2885 

2886 # If both mtime and size match, file is likely unchanged 

2887 return True 

2888 
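# Worked example: an index mtime stored as (seconds, nanoseconds) is
# compared against st_mtime_ns as one nanosecond count, so two writes
# within the same second are still told apart:
#   entry.mtime == (1700000000, 250) matches
#   st.st_mtime_ns == 1700000000 * 1_000_000_000 + 250, but not + 251.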

2889 

2890def _check_entry_for_changes( 

2891 tree_path: bytes, 

2892 entry: IndexEntry | ConflictedIndexEntry, 

2893 root_path: bytes, 

2894 filter_blob_callback: Callable[[Blob, bytes], Blob] | None = None, 

2895) -> bytes | None: 

2896 """Check a single index entry for changes. 

2897 

2898 Args: 

2899 tree_path: Path in the tree 

2900 entry: Index entry to check 

2901 root_path: Root filesystem path 

2902 filter_blob_callback: Optional callback to filter blobs 

2903 Returns: tree_path if changed, None otherwise 

2904 """ 

2905 if isinstance(entry, ConflictedIndexEntry): 

2906 # Conflicted files are always unstaged 

2907 return tree_path 

2908 

2909 full_path = _tree_to_fs_path(root_path, tree_path) 

2910 try: 

2911 st = os.lstat(full_path) 

2912 if stat.S_ISDIR(st.st_mode): 

2913 if _has_directory_changed(tree_path, entry): 

2914 return tree_path 

2915 return None 

2916 

2917 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): 

2918 return None 

2919 

2920 # Optimization: If stat matches index entry (mtime and size unchanged), 

2921 # we can skip reading and filtering the file entirely. This is a significant 

2922 # performance improvement for repositories with many unchanged files. 

2923 # Even with filters (e.g., LFS), if the file hasn't been modified (stat unchanged), 

2924 # the filter output would be the same, so we can safely skip the expensive 

2925 # filter operation. This addresses performance issues with LFS repositories 

2926 # where filter operations can be very slow. 

2927 if _stat_matches_entry(st, entry): 

2928 return None 

2929 

2930 blob = blob_from_path_and_stat(full_path, st) 

2931 

2932 if filter_blob_callback is not None: 

2933 blob = filter_blob_callback(blob, tree_path) 

2934 except FileNotFoundError: 

2935 # The file was removed, so we assume that counts as 

2936 # different from whatever file used to exist. 

2937 return tree_path 

2938 else: 

2939 if blob.id != entry.sha: 

2940 return tree_path 

2941 return None 

2942 

2943 

2944def get_unstaged_changes( 

2945 index: Index, 

2946 root_path: str | bytes, 

2947 filter_blob_callback: Callable[..., Any] | None = None, 

2948 preload_index: bool = False, 

2949) -> Generator[bytes, None, None]: 

2950 """Walk through an index and check for differences against working tree. 

2951 

2952 Args: 

2953 index: index to check 

2954 root_path: path in which to find files 

2955 filter_blob_callback: Optional callback to filter blobs 

2956 preload_index: If True, use parallel threads to check files (requires threading support) 

2957 Returns: iterator over paths with unstaged changes 

2958 """ 

2959 # For each index entry, compare against the working tree and yield changed paths

2960 if not isinstance(root_path, bytes): 

2961 root_path = os.fsencode(root_path) 

2962 

2963 if preload_index: 

2964 # Use parallel processing for better performance on slow filesystems 

2965 try: 

2966 import multiprocessing 

2967 from concurrent.futures import ThreadPoolExecutor 

2968 except ImportError: 

2969 # If threading is not available, fall back to serial processing 

2970 preload_index = False 

2971 else: 

2972 # Collect all entries first 

2973 entries = list(index.iteritems()) 

2974 

2975 # Use number of CPUs but cap at 8 threads to avoid overhead 

2976 num_workers = min(multiprocessing.cpu_count(), 8) 

2977 

2978 # Process entries in parallel 

2979 with ThreadPoolExecutor(max_workers=num_workers) as executor: 

2980 # Submit all tasks 

2981 futures = [ 

2982 executor.submit( 

2983 _check_entry_for_changes, 

2984 tree_path, 

2985 entry, 

2986 root_path, 

2987 filter_blob_callback, 

2988 ) 

2989 for tree_path, entry in entries 

2990 ] 

2991 

2992 # Yield results in submission order

2993 for future in futures: 

2994 result = future.result() 

2995 if result is not None: 

2996 yield result 

2997 

2998 if not preload_index: 

2999 # Serial processing 

3000 for tree_path, entry in index.iteritems(): 

3001 result = _check_entry_for_changes( 

3002 tree_path, entry, root_path, filter_blob_callback 

3003 ) 

3004 if result is not None: 

3005 yield result 

3006 
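# Typical status-style usage (a sketch; the repository is assumed to
# have a working tree at repo.path):
def _example_list_unstaged(repo: "Repo") -> list[bytes]:
    index = repo.open_index()
    return list(get_unstaged_changes(index, repo.path))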

3007 

3008def _tree_to_fs_path( 

3009 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8" 

3010) -> bytes: 

3011 """Convert a git tree path to a file system path. 

3012 

3013 Args: 

3014 root_path: Root filesystem path 

3015 tree_path: Git tree path as bytes (encoded with tree_encoding) 

3016 tree_encoding: Encoding used for tree paths (default: utf-8) 

3017 

3018 Returns: File system path. 

3019 """ 

3020 assert isinstance(tree_path, bytes) 

3021 if os_sep_bytes != b"/": 

3022 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes) 

3023 else: 

3024 sep_corrected_path = tree_path 

3025 

3026 # On Windows, we need to handle tree path encoding properly 

3027 if sys.platform == "win32": 

3028 # Decode from tree encoding, then re-encode for filesystem 

3029 try: 

3030 tree_path_str = sep_corrected_path.decode(tree_encoding) 

3031 sep_corrected_path = os.fsencode(tree_path_str) 

3032 except UnicodeDecodeError: 

3033 # If decoding fails, use the original bytes 

3034 pass 

3035 

3036 return os.path.join(root_path, sep_corrected_path) 

3037 

3038 

3039def _fs_to_tree_path(fs_path: str | bytes, tree_encoding: str = "utf-8") -> bytes: 

3040 """Convert a file system path to a git tree path. 

3041 

3042 Args: 

3043 fs_path: File system path. 

3044 tree_encoding: Encoding to use for tree paths (default: utf-8) 

3045 

3046 Returns: Git tree path as bytes (encoded with tree_encoding) 

3047 """ 

3048 if not isinstance(fs_path, bytes): 

3049 fs_path_bytes = os.fsencode(fs_path) 

3050 else: 

3051 fs_path_bytes = fs_path 

3052 

3053 # On Windows, we need to ensure tree paths are properly encoded 

3054 if sys.platform == "win32": 

3055 try: 

3056 # Decode from filesystem encoding, then re-encode with tree encoding 

3057 fs_path_str = os.fsdecode(fs_path_bytes) 

3058 fs_path_bytes = fs_path_str.encode(tree_encoding) 

3059 except UnicodeDecodeError: 

3060 # If filesystem decoding fails, use the original bytes 

3061 pass 

3062 

3063 if os_sep_bytes != b"/": 

3064 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/") 

3065 else: 

3066 tree_path = fs_path_bytes 

3067 return tree_path 

3068 
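# Round-trip sketch: tree paths always use b"/" while filesystem paths
# use the platform separator; the b"/repo" root is illustrative:
def _example_path_round_trip() -> None:
    fs = _tree_to_fs_path(b"/repo", b"src/pkg/mod.py")
    # On POSIX this is b"/repo/src/pkg/mod.py"; on Windows the slashes
    # become backslashes. Stripping the root converts back unchanged:
    assert _fs_to_tree_path(fs[len(b"/repo") + 1 :]) == b"src/pkg/mod.py"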

3069 

3070def index_entry_from_directory(st: os.stat_result, path: bytes) -> IndexEntry | None: 

3071 """Create an index entry for a directory. 

3072 

3073 This is only used for submodules (directories containing .git). 

3074 

3075 Args: 

3076 st: Stat result for the directory 

3077 path: Path to the directory 

3078 

3079 Returns: 

3080 IndexEntry for a submodule, or None if not a submodule 

3081 """ 

3082 if os.path.exists(os.path.join(path, b".git")): 

3083 head = read_submodule_head(path) 

3084 if head is None: 

3085 return None 

3086 return index_entry_from_stat(st, head, mode=S_IFGITLINK) 

3087 return None 

3088 

3089 

3090def index_entry_from_path( 

3091 path: bytes, object_store: ObjectContainer | None = None 

3092) -> IndexEntry | None: 

3093 """Create an index from a filesystem path. 

3094 

3095 This returns an index value for files, symlinks 

3096 and tree references. for directories and 

3097 non-existent files it returns None 

3098 

3099 Args: 

3100 path: Path to create an index entry for 

3101 object_store: Optional object store to 

3102 save new blobs in 

3103 Returns: An index entry; None for directories 

3104 """ 

3105 assert isinstance(path, bytes) 

3106 st = os.lstat(path) 

3107 if stat.S_ISDIR(st.st_mode): 

3108 return index_entry_from_directory(st, path) 

3109 

3110 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): 

3111 blob = blob_from_path_and_stat(path, st) 

3112 if object_store is not None: 

3113 object_store.add_object(blob) 

3114 return index_entry_from_stat(st, blob.id) 

3115 

3116 return None 

3117 

3118 

3119def iter_fresh_entries( 

3120 paths: Iterable[bytes], 

3121 root_path: bytes, 

3122 object_store: ObjectContainer | None = None, 

3123) -> Iterator[tuple[bytes, IndexEntry | None]]: 

3124 """Iterate over current versions of index entries on disk. 

3125 

3126 Args: 

3127 paths: Paths to iterate over 

3128 root_path: Root path to access from 

3129 object_store: Optional store to save new blobs in 

3130 Returns: Iterator over path, index_entry 

3131 """ 

3132 for path in paths: 

3133 p = _tree_to_fs_path(root_path, path) 

3134 try: 

3135 entry = index_entry_from_path(p, object_store=object_store) 

3136 except (FileNotFoundError, IsADirectoryError): 

3137 entry = None 

3138 yield path, entry 

3139 

3140 

3141def iter_fresh_objects( 

3142 paths: Iterable[bytes], 

3143 root_path: bytes, 

3144 include_deleted: bool = False, 

3145 object_store: ObjectContainer | None = None, 

3146) -> Iterator[tuple[bytes, ObjectID | None, int | None]]: 

3147 """Iterate over versions of objects on disk referenced by index. 

3148 

3149 Args: 

3150 paths: Paths to check 

3151 root_path: Root path to access from 

3152 include_deleted: Include deleted entries with sha and 

3153 mode set to None 

3154 object_store: Optional object store to report new items to 

3155 Returns: Iterator over path, sha, mode 

3156 """ 

3157 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store): 

3158 if entry is None: 

3159 if include_deleted: 

3160 yield path, None, None 

3161 else: 

3162 yield path, entry.sha, cleanup_mode(entry.mode) 

3163 

3164 

3165def refresh_index(index: Index, root_path: bytes) -> None: 

3166 """Refresh the contents of an index. 

3167 

3168 This is the equivalent of running 'git commit -a'. 

3169 

3170 Args: 

3171 index: Index to update 

3172 root_path: Root filesystem path 

3173 """ 

3174 for path, entry in iter_fresh_entries(index, root_path): 

3175 if entry: 

3176 index[path] = entry 

3177 

3178 

3179class locked_index: 

3180 """Lock the index while making modifications. 

3181 

3182 Works as a context manager. 

3183 """ 

3184 

3185 _file: "_GitFile" 

3186 

3187 def __init__(self, path: bytes | str) -> None: 

3188 """Initialize locked_index.""" 

3189 self._path = path 

3190 

3191 def __enter__(self) -> Index: 

3192 """Enter context manager and lock index.""" 

3193 f = GitFile(self._path, "wb") 

3194 self._file = f 

3195 self._index = Index(self._path) 

3196 return self._index 

3197 

3198 def __exit__( 

3199 self, 

3200 exc_type: type | None, 

3201 exc_value: BaseException | None, 

3202 traceback: types.TracebackType | None, 

3203 ) -> None: 

3204 """Exit context manager and unlock index.""" 

3205 if exc_type is not None: 

3206 self._file.abort() 

3207 return 

3208 try: 

3209 f = SHA1Writer(self._file) 

3210 write_index_dict(f, self._index._byname) 

3211 except BaseException: 

3212 self._file.abort() 

3213 else: 

3214 f.close()
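# Usage sketch: the lock file keeps concurrent writers out; a clean exit
# rewrites the index, an exception aborts the lock. The paths here are
# caller-supplied assumptions:
def _example_locked_update(index_path: bytes, fs_path: bytes) -> None:
    with locked_index(index_path) as index:
        entry = index_entry_from_path(fs_path)
        if entry is not None:
            index[_fs_to_tree_path(fs_path)] = entry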