Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/index.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1292 statements  

1# index.py -- File parser/writer for the git index file 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Parser for the git index file format.""" 

23 

# Names exported by a star-import of this module.
__all__ = [
    "DEFAULT_VERSION",
    "EOIE_EXTENSION",
    "EXTENDED_FLAG_INTEND_TO_ADD",
    "EXTENDED_FLAG_SKIP_WORKTREE",
    "FLAG_EXTENDED",
    "FLAG_NAMEMASK",
    "FLAG_STAGEMASK",
    "FLAG_STAGESHIFT",
    "FLAG_VALID",
    "HFS_IGNORABLE_CHARS",
    "IEOT_EXTENSION",
    "INVALID_DOTNAMES",
    "REUC_EXTENSION",
    "SDIR_EXTENSION",
    "TREE_EXTENSION",
    "UNTR_EXTENSION",
    "Index",
    "IndexEntry",
    "IndexExtension",
    "ResolveUndoExtension",
    "SerializedIndexEntry",
    "SparseDirExtension",
    "Stage",
    "TreeDict",
    "TreeExtension",
    "UnmergedEntries",
    "UnsupportedIndexFormat",
    "UntrackedExtension",
    "blob_from_path_and_mode",
    "blob_from_path_and_stat",
    "build_file_from_blob",
    "build_index_from_tree",
    "changes_from_tree",
    "cleanup_mode",
    "commit_index",
    "commit_tree",
    "detect_case_only_renames",
    "get_path_element_normalizer",
    "get_unstaged_changes",
    "index_entry_from_stat",
    "make_path_normalizer",
    "pathjoin",
    "pathsplit",
    "read_cache_entry",
    "read_cache_time",
    "read_index",
    "read_index_dict",
    "read_index_dict_with_version",
    "read_index_header",
    "read_submodule_head",
    "update_working_tree",
    "validate_path",
    "validate_path_element_default",
    "validate_path_element_hfs",
    "validate_path_element_ntfs",
    "write_cache_entry",
    "write_cache_time",
    "write_index",
    "write_index_dict",
    "write_index_extension",
]

86 

87import errno 

88import os 

89import shutil 

90import stat 

91import struct 

92import sys 

93import types 

94from collections.abc import ( 

95 Callable, 

96 Generator, 

97 Iterable, 

98 Iterator, 

99 Mapping, 

100 Sequence, 

101 Set, 

102) 

103from dataclasses import dataclass 

104from enum import Enum 

105from typing import ( 

106 IO, 

107 TYPE_CHECKING, 

108 Any, 

109 BinaryIO, 

110) 

111 

112if TYPE_CHECKING: 

113 from .config import Config 

114 from .diff_tree import TreeChange 

115 from .file import _GitFile 

116 from .filters import FilterBlobNormalizer 

117 from .object_store import BaseObjectStore 

118 from .repo import Repo 

119 

120from .file import GitFile 

121from .object_store import iter_tree_contents 

122from .objects import ( 

123 S_IFGITLINK, 

124 S_ISGITLINK, 

125 Blob, 

126 ObjectID, 

127 Tree, 

128 TreeEntry, 

129 hex_to_sha, 

130 sha_to_hex, 

131) 

132from .pack import ObjectContainer, SHA1Reader, SHA1Writer 

133 

# Type alias for recursive tree structure used in commit_tree
TreeDict = dict[bytes, "TreeDict | tuple[int, ObjectID]"]

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
# Right-shift that turns the masked stage bits into a 0-3 stage number
FLAG_STAGESHIFT = 12
# Low 12 bits of an entry's 16-bit flags word: the path-name length field
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
# When set in ``flags``, a second 16-bit ``extended_flags`` word follows.
FLAG_EXTENDED = 0x4000

# used by sparse checkout
# NOTE: this is a bit in ``extended_flags``, not ``flags``; it only shares
# its numeric value with FLAG_EXTENDED.
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
# Also a bit in ``extended_flags``.
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

# Index format version written by write_index() when none is requested
DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"
SDIR_EXTENSION = b"sdir"  # Sparse directory extension

163 

164 

165def _encode_varint(value: int) -> bytes: 

166 """Encode an integer using variable-width encoding. 

167 

168 Same format as used for OFS_DELTA pack entries and index v4 path compression. 

169 Uses 7 bits per byte, with the high bit indicating continuation. 

170 

171 Args: 

172 value: Integer to encode 

173 Returns: 

174 Encoded bytes 

175 """ 

176 if value == 0: 

177 return b"\x00" 

178 

179 result = [] 

180 while value > 0: 

181 byte = value & 0x7F # Take lower 7 bits 

182 value >>= 7 

183 if value > 0: 

184 byte |= 0x80 # Set continuation bit 

185 result.append(byte) 

186 

187 return bytes(result) 

188 

189 

190def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]: 

191 """Decode a variable-width encoded integer. 

192 

193 Args: 

194 data: Bytes to decode from 

195 offset: Starting offset in data 

196 Returns: 

197 tuple of (decoded_value, new_offset) 

198 """ 

199 value = 0 

200 shift = 0 

201 pos = offset 

202 

203 while pos < len(data): 

204 byte = data[pos] 

205 pos += 1 

206 value |= (byte & 0x7F) << shift 

207 shift += 7 

208 if not (byte & 0x80): # No continuation bit 

209 break 

210 

211 return value, pos 

212 

213 

214def _compress_path(path: bytes, previous_path: bytes) -> bytes: 

215 """Compress a path relative to the previous path for index version 4. 

216 

217 Args: 

218 path: Path to compress 

219 previous_path: Previous path for comparison 

220 Returns: 

221 Compressed path data (varint prefix_len + suffix) 

222 """ 

223 # Find the common prefix length 

224 common_len = 0 

225 min_len = min(len(path), len(previous_path)) 

226 

227 for i in range(min_len): 

228 if path[i] == previous_path[i]: 

229 common_len += 1 

230 else: 

231 break 

232 

233 # The number of bytes to remove from the end of previous_path 

234 # to get the common prefix 

235 remove_len = len(previous_path) - common_len 

236 

237 # The suffix to append 

238 suffix = path[common_len:] 

239 

240 # Encode: varint(remove_len) + suffix + NUL 

241 return _encode_varint(remove_len) + suffix + b"\x00" 

242 

243 

244def _decompress_path( 

245 data: bytes, offset: int, previous_path: bytes 

246) -> tuple[bytes, int]: 

247 """Decompress a path from index version 4 compressed format. 

248 

249 Args: 

250 data: Raw data containing compressed path 

251 offset: Starting offset in data 

252 previous_path: Previous path for decompression 

253 Returns: 

254 tuple of (decompressed_path, new_offset) 

255 """ 

256 # Decode the number of bytes to remove from previous path 

257 remove_len, new_offset = _decode_varint(data, offset) 

258 

259 # Find the NUL terminator for the suffix 

260 suffix_start = new_offset 

261 suffix_end = suffix_start 

262 while suffix_end < len(data) and data[suffix_end] != 0: 

263 suffix_end += 1 

264 

265 if suffix_end >= len(data): 

266 raise ValueError("Unterminated path suffix in compressed entry") 

267 

268 suffix = data[suffix_start:suffix_end] 

269 new_offset = suffix_end + 1 # Skip the NUL terminator 

270 

271 # Reconstruct the path 

272 if remove_len > len(previous_path): 

273 raise ValueError( 

274 f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path" 

275 ) 

276 

277 prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path 

278 path = prefix + suffix 

279 

280 return path, new_offset 

281 

282 

283def _decompress_path_from_stream( 

284 f: BinaryIO, previous_path: bytes 

285) -> tuple[bytes, int]: 

286 """Decompress a path from index version 4 compressed format, reading from stream. 

287 

288 Args: 

289 f: File-like object to read from 

290 previous_path: Previous path for decompression 

291 Returns: 

292 tuple of (decompressed_path, bytes_consumed) 

293 """ 

294 # Decode the varint for remove_len by reading byte by byte 

295 remove_len = 0 

296 shift = 0 

297 bytes_consumed = 0 

298 

299 while True: 

300 byte_data = f.read(1) 

301 if not byte_data: 

302 raise ValueError("Unexpected end of file while reading varint") 

303 byte = byte_data[0] 

304 bytes_consumed += 1 

305 remove_len |= (byte & 0x7F) << shift 

306 shift += 7 

307 if not (byte & 0x80): # No continuation bit 

308 break 

309 

310 # Read the suffix until NUL terminator 

311 suffix = b"" 

312 while True: 

313 byte_data = f.read(1) 

314 if not byte_data: 

315 raise ValueError("Unexpected end of file while reading path suffix") 

316 byte = byte_data[0] 

317 bytes_consumed += 1 

318 if byte == 0: # NUL terminator 

319 break 

320 suffix += bytes([byte]) 

321 

322 # Reconstruct the path 

323 if remove_len > len(previous_path): 

324 raise ValueError( 

325 f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path" 

326 ) 

327 

328 prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path 

329 path = prefix + suffix 

330 

331 return path, bytes_consumed 

332 

333 

class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts.

    The numeric values are what is stored in the stage bits of an entry's
    flags word (``(flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT``).
    """

    # Regular, non-conflicted entry
    NORMAL = 0
    # Conflict: version from the common ancestor
    MERGE_CONFLICT_ANCESTOR = 1
    # Conflict: version from the current branch
    MERGE_CONFLICT_THIS = 2
    # Conflict: version from the other branch
    MERGE_CONFLICT_OTHER = 3

341 

342 

@dataclass
class SerializedIndexEntry:
    """An index entry in the shape it has on disk, including its path name.

    Instances are produced when reading an index file and consumed when
    writing one; ``IndexEntry`` is the name-less, higher-level counterpart.
    """

    name: bytes
    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: ObjectID
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Return the merge stage encoded in the stage bits of ``flags``.

        Returns:
          Stage enum value indicating merge conflict state
        """
        stage_bits = self.flags & FLAG_STAGEMASK
        return Stage(stage_bits >> FLAG_STAGESHIFT)

    def is_sparse_dir(self) -> bool:
        """Tell whether this entry is a sparse directory placeholder.

        An entry qualifies when all of the following hold:
        - its mode is a directory mode
        - the SKIP_WORKTREE extended flag is set
        - its path ends with '/'

        Returns:
          True if entry is a sparse directory entry
        """
        if not stat.S_ISDIR(self.mode):
            return False
        if not self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE:
            return False
        return self.name.endswith(b"/")

390 

391 

@dataclass
class IndexExtension:
    """Generic index extension: a 4-byte signature plus an opaque payload."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Build the most specific extension object for a signature.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          An instance of the dedicated subclass when the signature is a known
          one, otherwise ``cls(signature, data)`` holding the raw payload
        """
        # Resolved at call time because the subclasses are defined later.
        known_extensions = (
            (TREE_EXTENSION, TreeExtension),
            (REUC_EXTENSION, ResolveUndoExtension),
            (UNTR_EXTENSION, UntrackedExtension),
            (SDIR_EXTENSION, SparseDirExtension),
        )
        for known_signature, extension_cls in known_extensions:
            if signature == known_signature:
                return extension_cls.from_bytes(data)
        # Unknown extension - just store raw data
        return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Return the payload bytes to write after the signature and size."""
        payload = self.data
        return payload

424 

425 

class TreeExtension(IndexExtension):
    """Tree cache extension (signature ``TREE``).

    Parsing and serialization are not implemented yet: parsing yields an
    empty entry list and serialization yields empty bytes.
    """

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Create a tree cache extension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        super().__init__(signature=TREE_EXTENSION, data=b"")
        self.entries = entries

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Build a TreeExtension from its raw payload.

        Args:
          data: Raw bytes to parse (currently ignored)

        Returns:
          TreeExtension instance with no entries
        """
        # TODO: Implement tree cache parsing
        return cls(entries=[])

    def to_bytes(self) -> bytes:
        """Serialize the extension payload.

        Returns:
          Empty bytes until serialization is implemented
        """
        # TODO: Implement tree cache serialization
        return bytes()

459 

460 

class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension (signature ``REUC``) for recording merge conflicts.

    Parsing and serialization are not implemented yet: parsing yields an
    empty entry list and serialization yields empty bytes.
    """

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Create a resolve undo extension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        super().__init__(signature=REUC_EXTENSION, data=b"")
        self.entries = entries

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Build a ResolveUndoExtension from its raw payload.

        Args:
          data: Raw bytes to parse (currently ignored)

        Returns:
          ResolveUndoExtension instance with no entries
        """
        # TODO: Implement resolve undo parsing
        return cls(entries=[])

    def to_bytes(self) -> bytes:
        """Serialize the extension payload.

        Returns:
          Empty bytes until serialization is implemented
        """
        # TODO: Implement resolve undo serialization
        return bytes()

494 

495 

class UntrackedExtension(IndexExtension):
    """Untracked cache extension (signature ``UNTR``), kept as raw bytes."""

    def __init__(self, data: bytes) -> None:
        """Create an untracked cache extension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(signature=UNTR_EXTENSION, data=data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Wrap a raw payload without interpreting it.

        Args:
          data: Raw bytes to store

        Returns:
          UntrackedExtension instance
        """
        return cls(data=data)

518 

519 

class SparseDirExtension(IndexExtension):
    """Sparse directory extension (signature ``sdir``).

    Its presence signals that the index contains sparse directory entries;
    tools that do not understand a sparse index should leave such an index
    alone. The extension carries no payload.
    """

    def __init__(self) -> None:
        """Create the (payload-less) sparse directory extension."""
        super().__init__(signature=SDIR_EXTENSION, data=b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "SparseDirExtension":
        """Build a SparseDirExtension; the payload is ignored.

        Args:
          data: Raw bytes to parse (should be empty)

        Returns:
          SparseDirExtension instance
        """
        return cls()

    def to_bytes(self) -> bytes:
        """Serialize the extension payload.

        Returns:
          Empty bytes (extension presence is the signal)
        """
        return bytes()

553 

554 

@dataclass
class IndexEntry:
    """A single entry of the Git index, without its path name.

    The path is kept by whatever container holds the entry; methods that
    need it take it as an argument.
    """

    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: ObjectID
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Copy every field except the name out of a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        copied_fields = (
            "ctime",
            "mtime",
            "dev",
            "ino",
            "mode",
            "uid",
            "gid",
            "size",
            "sha",
            "flags",
            "extended_flags",
        )
        return cls(**{attr: getattr(serialized, attr) for attr in copied_fields})

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Produce the on-disk form of this entry for a given path and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Replace whatever stage bits are present with those of ``stage``.
        stage_bits = stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=(self.flags & ~FLAG_STAGEMASK) | stage_bits,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Return the merge stage encoded in the stage bits of ``flags``.

        Returns:
          Stage enum value
        """
        stage_bits = self.flags & FLAG_STAGEMASK
        return Stage(stage_bits >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return (self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE) != 0

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Set or clear the skip-worktree bit in extended_flags.

        FLAG_EXTENDED in ``flags`` is kept in step: it is set when the bit is
        turned on, and cleared when turning the bit off leaves no extended
        flags at all.
        """
        if skip:
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            self.flags |= FLAG_EXTENDED
            return

        self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
        if not self.extended_flags:
            self.flags &= ~FLAG_EXTENDED

    def is_sparse_dir(self, name: bytes) -> bool:
        """Tell whether this entry is a sparse directory placeholder.

        An entry qualifies when all of the following hold:
        - its mode is a directory mode
        - the SKIP_WORKTREE extended flag is set
        - the given path ends with '/'

        Args:
          name: The path name for this entry (IndexEntry doesn't store name)

        Returns:
          True if entry is a sparse directory entry
        """
        if not stat.S_ISDIR(self.mode):
            return False
        if not self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE:
            return False
        return name.endswith(b"/")

678 

679 

class ConflictedIndexEntry:
    """Holder for the up-to-three versions of a path that is in conflict."""

    ancestor: IndexEntry | None
    this: IndexEntry | None
    other: IndexEntry | None

    def __init__(
        self,
        ancestor: IndexEntry | None = None,
        this: IndexEntry | None = None,
        other: IndexEntry | None = None,
    ) -> None:
        """Store the conflicting versions; any of them may be absent.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor, self.this, self.other = ancestor, this, other

703 

704 

class UnmergedEntries(Exception):
    """Unmerged entries exist in the index.

    Carries no extra state beyond what ``Exception`` provides.
    """

707 

708 

def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path at its last slash.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename; the directory name is empty
      when the path contains no slash
    """
    dirname, separator, basename = path.rpartition(b"/")
    if not separator:
        # No slash at all: the whole path is the basename.
        return (b"", path)
    return (dirname, basename)

724 

725 

def pathjoin(*args: bytes) -> bytes:
    """Join path components with '/', skipping empty components."""
    non_empty = filter(None, args)
    return b"/".join(non_empty)

729 

730 

def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time: two big-endian unsigned 32-bit integers.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    raw = f.read(8)
    seconds, nanoseconds = struct.unpack(">LL", raw)
    return seconds, nanoseconds

740 

741 

742def write_cache_time(f: IO[bytes], t: int | float | tuple[int, int]) -> None: 

743 """Write a cache time. 

744 

745 Args: 

746 f: File-like object to write to 

747 t: Time to write (as int, float or tuple with secs and nsecs) 

748 """ 

749 if isinstance(t, int): 

750 t = (t, 0) 

751 elif isinstance(t, float): 

752 (secs, nsecs) = divmod(t, 1.0) 

753 t = (int(secs), int(nsecs * 1000000000)) 

754 elif not isinstance(t, tuple): 

755 raise TypeError(t) 

756 f.write(struct.pack(">LL", *t)) 

757 

758 

def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    Returns:
      The parsed entry; its ``flags`` have the name-length bits cleared
    Raises:
      AssertionError: If the extended flag is set in an index with version < 3
      ValueError: If the file ends inside a version 4 compressed path or
        inside a long (>= 0xFFF bytes) NUL-terminated name
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always prefix-compressed and never padded
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: the low 12 bits of flags give the name length...
        name_len = flags & FLAG_NAMEMASK
        name = f.read(name_len)
        name_end = f.tell()
        if name_len == FLAG_NAMEMASK and len(name) == name_len:
            # ...unless the field is saturated: then the name is 0xFFF bytes
            # or longer and its real end is the first NUL byte.
            tail = bytearray()
            while True:
                chunk = f.read(1)
                if not chunk:
                    raise ValueError(
                        "Unexpected end of file while reading entry name"
                    )
                if chunk == b"\x00":
                    break
                tail += chunk
            name += bytes(tail)
            name_end += len(tail)

        # Padding: entries are NUL-padded (1-8 bytes) to a multiple of 8.
        # Computed from the end of the name, since the scan above may already
        # have consumed the first padding byte.
        real_size = (name_end - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )

815 

816 

def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: SerializedIndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    Raises:
      AssertionError: If the entry needs extended flags but version < 3
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    # The name-length field is only 12 bits wide. Longer names store the
    # saturated value 0xFFF; without the clamp the length would spill into
    # the stage/extended/valid bits.
    name_len = min(len(entry.name), FLAG_NAMEMASK)
    flags = name_len | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write the prefix-compressed path, no padding
        f.write(_compress_path(entry.name, previous_path))
    else:
        # Versions < 4: write the full path, then NUL-pad the entry to a
        # multiple of 8 bytes (always at least one NUL)
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))

870 

871 

class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        # Pass the version on so ``args`` is populated instead of empty.
        super().__init__(version)
        self.index_format_version = version

    def __str__(self) -> str:
        """Return a message naming the offending version."""
        return f"Unsupported index format version: {self.index_format_version}"

882 

883 

def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read and validate the 12-byte header of an index file.

    Args:
      f: File-like object positioned at the start of the index
    Returns:
      tuple of (version, num_entries)
    Raises:
      AssertionError: If the file does not start with the ``DIRC`` magic
      UnsupportedIndexFormat: If the version is not 1, 2, 3 or 4
    """
    magic = f.read(4)
    if magic != b"DIRC":
        raise AssertionError(f"Invalid index file header: {magic!r}")

    version, num_entries = struct.unpack(b">LL", f.read(8))
    if version in (1, 2, 3, 4):
        return version, num_entries
    raise UnsupportedIndexFormat(version)

897 

898 

def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write one extension: signature, 32-bit big-endian size, then payload.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    payload = extension.to_bytes()
    emit = f.write
    emit(extension.signature)
    emit(struct.pack(">I", len(payload)))
    emit(payload)

910 

911 

def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding its entries in file order.

    Only the header and the entries are consumed; whatever follows them in
    the file is left unread.
    """
    version, entry_count = read_index_header(f)
    # Each entry's path is handed to the next read for version 4
    # prefix decompression.
    last_path = b""
    for _ in range(entry_count):
        serialized = read_cache_entry(f, version, last_path)
        last_path = serialized.name
        yield serialized

920 

921 

def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[dict[bytes, IndexEntry | ConflictedIndexEntry], int, list[IndexExtension]]:
    """Read an index file and return it as a dictionary along with the version.

    Entries are keyed by path. A path with merge-stage entries maps to one
    ConflictedIndexEntry collecting its ancestor/this/other versions.

    Args:
      f: Seekable file object to read from
    Returns:
      tuple of (entries_dict, version, extensions)
    Raises:
      AssertionError: If a path has both a normal and a conflicted entry
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    previous_path = b""
    for _ in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
            continue

        existing = ret.setdefault(entry.name, ConflictedIndexEntry())
        if isinstance(existing, IndexEntry):
            raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
        if stage == Stage.MERGE_CONFLICT_ANCESTOR:
            existing.ancestor = IndexEntry.from_serialized(entry)
        elif stage == Stage.MERGE_CONFLICT_THIS:
            existing.this = IndexEntry.from_serialized(entry)
        elif stage == Stage.MERGE_CONFLICT_OTHER:
            existing.other = IndexEntry.from_serialized(entry)

    # The last 20 bytes of the file are the SHA checksum; extensions live
    # between the entries and that trailer. The file size does not change
    # while reading, so locate the trailer once.
    entries_end = f.tell()
    f.seek(0, os.SEEK_END)
    checksum_start = f.tell() - 20
    f.seek(entries_end)

    extensions: list[IndexExtension] = []
    while f.tell() < checksum_start:
        signature = f.read(4)
        if len(signature) < 4:
            break

        # A signature is four ASCII letters. Lowercase must be accepted too:
        # SDIR_EXTENSION (b"sdir") is lowercase and would otherwise never be
        # parsed.
        if not signature.isalpha():
            # Not an extension, seek back
            f.seek(-4, os.SEEK_CUR)
            break

        size_data = f.read(4)
        if len(size_data) < 4:
            break
        (size,) = struct.unpack(">I", size_data)

        data = f.read(size)
        if len(data) < size:
            # Truncated extension payload: stop without recording it
            break

        extensions.append(IndexExtension.from_raw(signature, data))

    return ret, version, extensions

989 

990 

def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, IndexEntry | ConflictedIndexEntry]:
    """Read an index file and return it as a dictionary.

    The dictionary is keyed by path. Because a path can occur once per merge
    stage, all staged entries of a path are collected into a single
    ConflictedIndexEntry.

    Args:
      f: File object to read from.
    Raises:
      AssertionError: If a path has both a normal and a conflicted entry
    """
    # Attribute of ConflictedIndexEntry that receives each conflict stage.
    conflict_slots = {
        Stage.MERGE_CONFLICT_ANCESTOR: "ancestor",
        Stage.MERGE_CONFLICT_THIS: "this",
        Stage.MERGE_CONFLICT_OTHER: "other",
    }

    by_path: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    for serialized in read_index(f):
        stage = serialized.stage()
        if stage == Stage.NORMAL:
            by_path[serialized.name] = IndexEntry.from_serialized(serialized)
            continue

        conflict = by_path.setdefault(serialized.name, ConflictedIndexEntry())
        if isinstance(conflict, IndexEntry):
            raise AssertionError(f"Non-conflicted entry for {serialized.name!r} exists")
        setattr(conflict, conflict_slots[stage], IndexEntry.from_serialized(serialized))
    return by_path

1017 

1018 

def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file.

    Writes the header, every entry and then any extensions. No trailing
    checksum is written by this function.

    Args:
      f: File-like object to write to
      entries: Sequence of the entries to write, in the order to write them
      version: Version number to write (None means DEFAULT_VERSION)
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION

    # Extended flags cannot be stored before version 3, so bump the version
    # when any entry carries them. After this, a version below 3 implies that
    # no entry has extended flags, so no further check is needed.
    if version < 3 and any(e.extended_flags != 0 for e in entries):
        version = 3

    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))

    # Each entry's path feeds the next one's version 4 prefix compression.
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)

1058 

1059 

def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, IndexEntry | ConflictedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are emitted sorted by path; the versions of a conflicted path
    are emitted in stage order (ancestor, this, other), skipping absent ones.
    """
    conflict_stages = (
        ("ancestor", Stage.MERGE_CONFLICT_ANCESTOR),
        ("this", Stage.MERGE_CONFLICT_THIS),
        ("other", Stage.MERGE_CONFLICT_OTHER),
    )

    serialized = []
    for path in sorted(entries):
        item = entries[path]
        if not isinstance(item, ConflictedIndexEntry):
            serialized.append(item.serialize(path, Stage.NORMAL))
            continue
        for attr, stage in conflict_stages:
            side = getattr(item, attr)
            if side is not None:
                serialized.append(side.serialize(path, stage))

    write_index(f, serialized, version=version, extensions=extensions)

1090 

1091 

def cleanup_mode(mode: int) -> int:
    """Reduce a file mode to a value that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      ``stat.S_IFLNK`` for symlinks, ``stat.S_IFDIR`` for directories,
      ``S_IFGITLINK`` for gitlinks, and otherwise a regular-file mode of
      0o644, or 0o755 when the owner-execute bit is set in ``mode``.
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    if stat.S_ISDIR(mode):
        return stat.S_IFDIR
    if S_ISGITLINK(mode):
        return S_IFGITLINK
    # Only the owner-execute bit decides whether the result is executable.
    executable_bits = 0o111 if mode & 0o100 else 0
    return stat.S_IFREG | 0o644 | executable_bits

1113 

1114 

class Index:
    """A Git Index file.

    Entries are held in memory in a mapping from path to ``IndexEntry`` or
    ``ConflictedIndexEntry``; ``read()`` and ``write()`` transfer them from
    and to the file named at construction time.  When a ``path_normalizer``
    is configured, a second mapping from normalized path to stored key lets
    lookups match equivalent spellings of a path.
    """

    # Stored path -> entry; the authoritative in-memory contents.
    _byname: dict[bytes, IndexEntry | ConflictedIndexEntry]

    def __init__(
        self,
        filename: bytes | str | os.PathLike[str],
        read: bool = True,
        skip_hash: bool = False,
        version: int | None = None,
        *,
        file_mode: int | None = None,
        path_normalizer: Callable[[bytes], bytes] | None = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
          file_mode: Optional file permission mask for shared repository
          path_normalizer: Optional function mapping a filesystem path to a
            canonical form (e.g. case-folded, NFC-normalized). When provided,
            lookups (``index[path]``, ``path in index``, ``del index[path]``)
            transparently match paths that normalize to the same form as an
            existing entry.
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._file_mode = file_mode
        self._extensions: list[IndexExtension] = []
        self._path_normalizer = path_normalizer
        # Normalized path -> stored key; None disables normalization entirely.
        self._normalized: dict[bytes, bytes] | None = (
            {} if path_normalizer is not None else None
        )
        self.clear()
        if read:
            self.read()

    def canonical_path(self, name: bytes) -> bytes:
        """Resolve ``name`` to the canonical key stored in the index.

        If an entry already exists under ``name`` (or no normalizer is
        configured), ``name`` is returned unchanged. Otherwise the
        normalizer is applied and the key of any entry with the same
        normalized form is returned. Falls back to ``name`` if none.

        Normally callers do not need this because ``index[name]``,
        ``name in index``, and ``del index[name]`` already apply
        normalization transparently. Use this when the path is also
        being used outside the index (for example to look up the same
        entry in a commit tree), so that both sides agree on the key.
        """
        if self._normalized is None or name in self._byname:
            return name
        assert self._path_normalizer is not None
        return self._normalized.get(self._path_normalizer(name), name)

    @property
    def path(self) -> bytes | str:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk.

        Extensions whose ``to_bytes()`` is empty are left out.  With
        ``skip_hash`` set, 20 zero bytes are written where the SHA1 trailer
        would otherwise go; without it the data is written through a
        ``SHA1Writer``.
        """
        mask = self._file_mode if self._file_mode is not None else 0o644
        f = GitFile(self._filename, "wb", mask=mask)
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = [
                ext for ext in self._extensions if ext.to_bytes()
            ]

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except BaseException:
            # NOTE(review): if GitFile.close() finalizes the file rather than
            # discarding it, this publishes a partially written index on
            # failure -- confirm against GitFile and consider an abort-style
            # call here instead.
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk.

        Does nothing when the file does not exist.  Entries read are merged
        into the in-memory mapping via ``update()``; entries already present
        are not cleared first.
        """
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> IndexEntry | ConflictedIndexEntry:
        """Retrieve entry by relative path and stage.

        Returns: Either a IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[self.canonical_path(key)]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return self.canonical_path(key) in self._byname

    def get_sha1(self, path: bytes) -> ObjectID:
        """Return the (git object) SHA1 for the object at a path.

        Raises:
          UnmergedEntries: If the entry at ``path`` is conflicted
        """
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path.

        Raises:
          UnmergedEntries: If the entry at ``path`` is conflicted
        """
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, ObjectID, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree.

        Raises:
          UnmergedEntries: When a conflicted entry is reached
        """
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        return any(
            isinstance(value, ConflictedIndexEntry)
            for value in self._byname.values()
        )

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}
        if self._normalized is not None:
            self._normalized = {}

    def __setitem__(
        self, name: bytes, value: IndexEntry | ConflictedIndexEntry
    ) -> None:
        """Set an entry in the index.

        When ``name`` normalizes to the same form as an existing key, the
        entry is stored under that existing key.
        """
        assert isinstance(name, bytes)
        name = self.canonical_path(name)
        is_new = name not in self._byname
        self._byname[name] = value
        if is_new and self._normalized is not None:
            assert self._path_normalizer is not None
            # setdefault: the first key seen for a normalized form stays canonical.
            self._normalized.setdefault(self._path_normalizer(name), name)

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        name = self.canonical_path(name)
        del self._byname[name]
        if self._normalized is not None:
            assert self._path_normalizer is not None
            normalized_key = self._path_normalizer(name)
            # Only drop the normalized mapping if it points at the deleted key.
            if self._normalized.get(normalized_key) == name:
                del self._normalized[normalized_key]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(self, entries: dict[bytes, IndexEntry | ConflictedIndexEntry]) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[bytes | None, bytes | None],
            tuple[int | None, int | None],
            tuple[bytes | None, bytes | None],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> ObjectID:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

    def is_sparse(self) -> bool:
        """Check if this index contains sparse directory entries.

        Returns:
          True if any sparse directory extension is present
        """
        return any(isinstance(ext, SparseDirExtension) for ext in self._extensions)

    def ensure_full_index(self, object_store: "BaseObjectStore") -> None:
        """Expand all sparse directory entries into full file entries.

        This converts a sparse index into a full index by recursively
        expanding any sparse directory entries into their constituent files.

        Args:
          object_store: Object store to read tree objects from

        Raises:
          KeyError: If a tree object referenced by a sparse dir entry doesn't exist
          ValueError: If a sparse dir entry points to an object that is not a tree
        """
        if not self.is_sparse():
            return

        # Collect the sparse directory entries up front, because expanding
        # them mutates the mapping being scanned.
        sparse_dirs = [
            (path, entry)
            for path, entry in self._byname.items()
            if isinstance(entry, IndexEntry) and entry.is_sparse_dir(path)
        ]

        for path, entry in sparse_dirs:
            # Resolve and validate the tree before touching the index, so a
            # missing or non-tree object does not leave the sparse entry
            # already removed.
            tree = object_store[entry.sha]
            if not isinstance(tree, Tree):
                raise ValueError(f"Sparse directory {path!r} points to non-tree object")

            # Replace the sparse directory entry with the files it stands for.
            del self[path]
            self._expand_tree(path.rstrip(b"/"), tree, object_store, entry)

        # Remove the sparse directory extension
        self._extensions = [
            ext for ext in self._extensions if not isinstance(ext, SparseDirExtension)
        ]

    def _expand_tree(
        self,
        prefix: bytes,
        tree: Tree,
        object_store: "BaseObjectStore",
        template_entry: IndexEntry,
    ) -> None:
        """Recursively expand a tree into index entries.

        Args:
          prefix: Path prefix for entries (without trailing slash)
          tree: Tree object to expand
          object_store: Object store to read nested trees from
          template_entry: Template entry to copy metadata from

        Raises:
          ValueError: If a directory entry points to an object that is not a tree
        """
        for name, mode, sha in tree.items():
            if prefix:
                full_path = prefix + b"/" + name
            else:
                full_path = name

            if stat.S_ISDIR(mode):
                # Recursively expand subdirectories
                subtree = object_store[sha]
                if not isinstance(subtree, Tree):
                    raise ValueError(
                        f"Directory entry {full_path!r} points to non-tree object"
                    )
                self._expand_tree(full_path, subtree, object_store, template_entry)
            else:
                # Create an index entry for this file
                # Use the template entry for metadata but with the file's sha and mode
                new_entry = IndexEntry(
                    ctime=template_entry.ctime,
                    mtime=template_entry.mtime,
                    dev=template_entry.dev,
                    ino=template_entry.ino,
                    mode=mode,
                    uid=template_entry.uid,
                    gid=template_entry.gid,
                    size=0,  # Size is unknown from tree
                    sha=sha,
                    flags=0,
                    extended_flags=0,  # Don't copy skip-worktree flag
                )
                self[full_path] = new_entry

    def convert_to_sparse(
        self,
        object_store: "BaseObjectStore",
        tree_sha: ObjectID,
        sparse_dirs: Set[bytes],
    ) -> None:
        """Convert full index entries to sparse directory entries.

        This collapses directories that are entirely outside the sparse
        checkout cone into single sparse directory entries.

        Args:
          object_store: Object store to read tree objects
          tree_sha: SHA of the tree (usually HEAD) to base sparse dirs on
          sparse_dirs: Set of directory paths (with trailing /) to collapse

        Raises:
          KeyError: If tree_sha or a subdirectory doesn't exist
          ValueError: If tree_sha does not refer to a tree object
        """
        if not sparse_dirs:
            return

        # Get the base tree
        tree = object_store[tree_sha]
        if not isinstance(tree, Tree):
            raise ValueError(f"tree_sha {tree_sha!r} is not a tree object")

        # Imported once here rather than on every loop iteration.
        from dulwich.objects import ObjectID

        # For each sparse directory, find its tree SHA and create sparse entry
        for dir_path in sparse_dirs:
            dir_path_stripped = dir_path.rstrip(b"/")

            # Find the tree SHA for this directory
            subtree_sha = self._find_subtree_sha(tree, dir_path_stripped, object_store)
            if subtree_sha is None:
                # Directory doesn't exist in tree, skip it
                continue

            # Remove all entries under this directory
            entries_to_remove = [
                path
                for path in self._byname
                if path.startswith(dir_path) or path == dir_path_stripped
            ]
            for path in entries_to_remove:
                del self[path]

            # Create a sparse directory entry
            # Use minimal metadata since it's not a real file
            sparse_entry = IndexEntry(
                ctime=0,
                mtime=0,
                dev=0,
                ino=0,
                mode=stat.S_IFDIR,
                uid=0,
                gid=0,
                size=0,
                sha=ObjectID(subtree_sha),
                flags=0,
                extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
            )
            self[dir_path] = sparse_entry

        # Add sparse directory extension if not present
        if not self.is_sparse():
            self._extensions.append(SparseDirExtension())

    def _find_subtree_sha(
        self,
        tree: Tree,
        path: bytes,
        object_store: "BaseObjectStore",
    ) -> bytes | None:
        """Find the SHA of a subtree at a given path.

        Args:
          tree: Root tree object to search in
          path: Path to the subdirectory (no trailing slash)
          object_store: Object store to read nested trees from

        Returns:
          SHA of the subtree, or None if path doesn't exist
        """
        if not path:
            return tree.id

        parts = path.split(b"/")
        current_tree = tree

        for part in parts:
            # Look for this part in the current tree
            try:
                mode, sha = current_tree[part]
            except KeyError:
                return None

            if not stat.S_ISDIR(mode):
                # Path component is a file, not a directory
                return None

            # Load the next tree
            obj = object_store[sha]
            if not isinstance(obj, Tree):
                return None
            current_tree = obj

        return current_tree.id

1612 

1613 

def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, ObjectID, int]]
) -> ObjectID:
    """Build tree objects for a set of blobs and store them.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    # Directory path -> in-memory directory contents; b"" is the root.
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        # Return the directory dict for ``path``, creating it and any
        # missing ancestors on the way.
        existing = trees.get(path)
        if existing is not None:
            return existing
        parent_path, leaf = pathsplit(path)
        parent = add_tree(parent_path)
        assert isinstance(leaf, bytes)
        created: TreeDict = {}
        parent[leaf] = created
        trees[path] = created
        return created

    for blob_path, blob_sha, blob_mode in blobs:
        parent_path, leaf = pathsplit(blob_path)
        add_tree(parent_path)[leaf] = (blob_mode, blob_sha)

    def build_tree(path: bytes) -> ObjectID:
        # Children are built first; nested dicts become directory entries.
        tree = Tree()
        for leaf, child in trees[path].items():
            if isinstance(child, dict):
                tree.add(leaf, stat.S_IFDIR, build_tree(pathjoin(path, leaf)))
            else:
                child_mode, child_sha = child
                tree.add(leaf, child_mode, child_sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")

1656 

1657 

def commit_index(object_store: ObjectContainer, index: Index) -> ObjectID:
    """Write the contents of an index out as tree objects.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    tree_entries = index.iterobjects()
    return commit_tree(object_store, tree_entries)

1668 

1669 

def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: ObjectID | None,
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[bytes | None, bytes | None],
        tuple[int | None, int | None],
        tuple[bytes | None, bytes | None],
    ]
]:
    """Compare a working copy against a tree and report what differs.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy; it
        returns ``(sha, mode)`` and raises KeyError for unknown names
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    unseen = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                other_sha, other_mode = lookup_entry(name)
            except KeyError:
                # In the tree but gone from the working copy.
                yield ((name, None), (mode, None), (sha, None))
                continue
            # remove() raises KeyError if lookup_entry found an entry for a
            # name that was not in ``names``.
            unseen.remove(name)
            if want_unchanged or other_sha != sha or other_mode != mode:
                yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Whatever the tree walk did not consume exists only in the working copy.
    for name in unseen:
        try:
            other_sha, other_mode = lookup_entry(name)
        except KeyError:
            continue
        yield ((None, name), (None, other_mode), (None, other_sha))

1718 

1719 

def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: int | None = None,
) -> IndexEntry:
    """Build an index entry from a stat result and an object id.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: File mode to record; when omitted it is obtained by passing
        ``stat_val.st_mode`` through ``cleanup_mode``

    Returns:
      An ``IndexEntry`` whose ``flags`` and ``extended_flags`` are zero.
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    from dulwich.objects import ObjectID

    def timestamp(field: str) -> int | float | tuple[int, int]:
        # Prefer the integer nanosecond field, split into (seconds,
        # nanoseconds), so no precision is lost through the float
        # representation; fall back to the plain field otherwise.
        nanoseconds = getattr(stat_val, field + "_ns", None)
        if nanoseconds is None:
            return getattr(stat_val, field)
        return divmod(nanoseconds, 1_000_000_000)

    return IndexEntry(
        ctime=timestamp("st_ctime"),
        mtime=timestamp("st_mtime"),
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=ObjectID(hex_sha),
        flags=0,
        extended_flags=0,
    )

1772 

1773 

if sys.platform != "win32":
    # Outside Windows the standard library function is used as-is.
    symlink = os.symlink
else:
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks
    #
    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Permission failure while creating a symlink on Windows.

        Carries the original errno and filename, with a message that points
        at developer mode as the likely missing prerequisite.
        """

        def __init__(self, errno: int, msg: str, filename: str | None) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            hint = f"Unable to create symlink; do you have developer mode enabled? {msg}"
            super().__init__(errno, hint, filename)

    def symlink(
        src: str | bytes,
        dst: str | bytes,
        target_is_directory: bool = False,
        *,
        dir_fd: int | None = None,
    ) -> None:
        """Create a symbolic link, translating permission failures.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as exc:
            raise WindowsSymlinkPermissionError(
                exc.errno or 0, exc.strerror or "", exc.filename
            ) from exc

1824 

1825 

def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
) -> os.stat_result:
    """Materialize a Git blob on disk as a regular file or a symlink.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file.  When a regular file with identical
        contents is already present, the stat taken before the call is
        returned and the file is neither rewritten nor chmod-ed.
    """
    try:
        previous = os.lstat(target_path)
    except FileNotFoundError:
        previous = None
    data = blob.as_raw_string()

    if not stat.S_ISLNK(mode):
        # Only read the existing file back when the sizes already agree.
        if previous is not None and previous.st_size == len(data):
            with open(target_path, "rb") as existing:
                unchanged = existing.read() == data
            if unchanged:
                return previous

        with open(target_path, "wb") as out:
            # Write out file
            out.write(data)

        if honor_filemode:
            os.chmod(target_path, mode)
        return os.lstat(target_path)

    # Symlink: whatever is currently at the path is removed first.
    if previous:
        _remove_file_with_readonly_handling(target_path)
    make_link = symlink_fn or symlink
    if sys.platform == "win32":
        # os.readlink on Python3 on Windows requires a unicode string.
        make_link(data.decode(tree_encoding), target_path.decode(tree_encoding))
    else:
        make_link(data, target_path)
    return os.lstat(target_path)

1879 

1880 

# Path element names that the validate_path_element_* functions reject once
# the element has been normalized.
INVALID_DOTNAMES = (b".git", b".", b"..", b"")

1882 

1883 

def _normalize_path_element_default(element: bytes) -> bytes:
    """Lower-case a path element so it can be compared case-insensitively."""
    lowered = element.lower()
    return lowered

1887 

1888 

def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize a path element for comparison under NTFS rules.

    Trailing dots and spaces are stripped before the element is lower-cased.
    """
    trimmed = element.rstrip(b". ")
    return trimmed.lower()

1892 

1893 

def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize a path element for comparison under HFS+ rules.

    The element is decoded as strict UTF-8, code points listed in
    ``HFS_IGNORABLE_CHARS`` are dropped, and the remainder is normalized to
    NFD and lower-cased.

    Raises:
      UnicodeDecodeError: If ``element`` is not valid UTF-8
    """
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    text = element.decode("utf-8", errors="strict")

    kept = [ch for ch in text if ord(ch) not in HFS_IGNORABLE_CHARS]
    decomposed = unicodedata.normalize("NFD", "".join(kept))
    return decomposed.lower().encode("utf-8", errors="strict")

1906 

1907 

def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    ``core.protectNTFS`` is consulted first and defaults to true when
    ``os.name`` is ``"nt"``; ``core.protectHFS`` is consulted next and
    defaults to true when ``sys.platform`` is ``"darwin"``.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    # os and sys are module-level imports, so they are not re-imported here.
    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    if config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    return _normalize_path_element_default

1926 

1927 

1928def make_path_normalizer( 

1929 config: "Config", 

1930) -> Callable[[bytes], bytes] | None: 

1931 """Build a path normalizer honoring ``core.ignorecase`` and ``core.precomposeunicode``. 

1932 

1933 The returned callable maps a filesystem-form path to a canonical form 

1934 used to match equivalent paths (e.g. ``Foo.txt`` ↔ ``foo.txt`` when 

1935 ``core.ignorecase=true``, NFD ↔ NFC when ``core.precomposeunicode=true``). 

1936 Returns ``None`` when neither option is active so callers can skip the 

1937 comparison entirely. 

1938 """ 

1939 ignorecase = config.get_boolean(b"core", b"ignorecase", False) 

1940 precompose = config.get_boolean(b"core", b"precomposeunicode", False) 

1941 if not ignorecase and not precompose: 

1942 return None 

1943 

1944 def normalize(path: bytes) -> bytes: 

1945 if precompose: 

1946 import unicodedata 

1947 

1948 try: 

1949 path = unicodedata.normalize("NFC", path.decode("utf-8")).encode( 

1950 "utf-8" 

1951 ) 

1952 except UnicodeDecodeError: 

1953 pass 

1954 if ignorecase: 

1955 path = path.lower() 

1956 return path 

1957 

1958 return normalize 

1959 

1960 

def validate_path_element_default(element: bytes) -> bool:
    """Check a path element against the default rules.

    Args:
      element: Path element to validate

    Returns:
      False when the element, after default normalization, is one of
      ``INVALID_DOTNAMES``; True otherwise
    """
    canonical = _normalize_path_element_default(element)
    return canonical not in INVALID_DOTNAMES

1971 

1972 

def validate_path_element_ntfs(element: bytes) -> bool:
    """Check a path element against the NTFS rules.

    Args:
      element: Path element to validate

    Returns:
      False when the NTFS-normalized element is one of ``INVALID_DOTNAMES``
      or equals ``git~1``; True otherwise
    """
    canonical = _normalize_path_element_ntfs(element)
    return canonical not in INVALID_DOTNAMES and canonical != b"git~1"

1988 

1989 

# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
# _normalize_path_element_hfs drops any character whose code point is in
# this set before comparing names.
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}

2009 

2010 

def validate_path_element_hfs(element: bytes) -> bool:
    """Check a path element against the HFS+ rules.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.

    Returns:
      False when the element is not valid UTF-8, or when its normalized
      form is one of ``INVALID_DOTNAMES`` or equals ``git~1``; True
      otherwise
    """
    try:
        canonical = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Reject both the reserved names and the 8.3 short name.
    return canonical not in INVALID_DOTNAMES and canonical != b"git~1"

2032 

2033 

def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Validate a path by checking each ``/``-separated element.

    Args:
      path: Path to validate
      element_validator: Predicate applied to every element; checking stops
        at the first element it rejects

    Returns:
      True if every element is accepted, False otherwise
    """
    return all(element_validator(part) for part in path.split(b"/"))

2045 

2046 

def build_index_from_tree(
    root_path: str | bytes,
    index_path: str | bytes,
    object_store: ObjectContainer,
    tree_id: ObjectID,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
    blob_normalizer: "FilterBlobNormalizer | None" = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Check out a tree into a directory and write a matching index.

    Every entry of the tree is written below ``root_path`` and recorded in a
    fresh index, which is written to ``index_path`` at the end. Entries whose
    path fails validation are skipped silently.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      symlink_fn: Function to use for creating symlinks
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        assert (
            entry.path is not None and entry.mode is not None and entry.sha is not None
        )
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        containing_dir = os.path.dirname(full_path)
        if not os.path.exists(containing_dir):
            os.makedirs(containing_dir)

        is_submodule = S_ISGITLINK(entry.mode)

        # TODO(jelmer): Merge new index into working tree
        if is_submodule:
            # A submodule is represented by a (possibly empty) directory.
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            blob = object_store[entry.sha]
            assert isinstance(blob, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                blob = blob_normalizer.checkout_normalize(blob, entry.path)
            st = build_file_from_blob(
                blob,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if is_submodule or not honor_filemode:
            # Record the tree's mode instead of the on-disk one.
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            stat_fields = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = type(st)(stat_fields)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()

2139 

2140 

def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a file mode.

    For a symlink mode the blob holds the link target; otherwise it holds
    the file's contents.

    Args:
      fs_path: Full file system path to file
      mode: File mode
      tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if not stat.S_ISLNK(mode):
        with open(fs_path, "rb") as stream:
            blob.data = stream.read()
    elif sys.platform == "win32":
        # os.readlink on Python3 on Windows requires a unicode string.
        link_target = os.readlink(os.fsdecode(fs_path))
        blob.data = link_target.encode(tree_encoding)
    else:
        blob.data = os.readlink(fs_path)
    return blob

2164 

2165 

def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Only the ``st_mode`` field of the stat result is used; the work is done
    by ``blob_from_path_and_mode``.

    Args:
      fs_path: Full file system path to file
      st: A stat object
      tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    file_mode = st.st_mode
    return blob_from_path_and_mode(fs_path, file_mode, tree_encoding)

2178 

2179 

def read_submodule_head(path: str | bytes) -> bytes | None:
    """Read the head commit of a submodule.

    The repository opened to read the head is closed again before
    returning, so no handles are left open.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        # HEAD does not resolve to a commit.
        return None
    finally:
        # Release the repository's resources on every exit path.
        repo.close()

2202 

2203 

def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Decide whether a path that turned out to be a directory has changed.

    Meant to be called after creating a blob from the path has failed.
    If the directory contains a ``.git`` entry it is treated as a submodule
    and its head is compared with the sha recorded in the index entry.
    Without a ``.git`` entry, Git tracked a file here and now finds a
    directory, so the path counts as changed.

    Args:
      tree_path: Path of the directory to inspect
      entry: Index entry recorded for the path
    Returns: False only for a submodule whose head equals ``entry.sha``;
      True in every other case
    """
    if not os.path.exists(os.path.join(tree_path, b".git")):
        # The file was changed to a directory, so consider it removed.
        return True

    # Submodule: changed exactly when its head moved away from the index.
    return entry.sha != read_submodule_head(tree_path)

2227 

2228 

# The platform's path separator (os.sep) as an ASCII-encoded byte string.
os_sep_bytes = os.sep.encode("ascii")

2230 

2231 

def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file.

    Nothing happens when ``full_path`` has no directory component or when
    its parent already exists. Otherwise the nearest existing ancestor is
    located; if it is not a directory an error is raised, else all missing
    directories are created.

    Args:
      full_path: Path whose parent directory is needed
    Raises:
      OSError: If the nearest existing ancestor is not a directory
    """
    parent_dir = os.path.dirname(full_path)
    if not parent_dir or os.path.exists(parent_dir):
        return

    # Walk up the directory tree to find the first existing ancestor.
    current = parent_dir
    while current and not os.path.exists(current):
        new_parent = os.path.dirname(current)
        if new_parent == current:
            # Reached the root or can't go up further
            break
        current = new_parent

    # Everything below `current` is known not to exist, so `current` is the
    # only place where a file could be in the way.
    if current and os.path.exists(current) and not os.path.isdir(current):
        raise OSError(f"Cannot create directory, parent path is a file: {current!r}")

    os.makedirs(parent_dir)

2262 

2263 

def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Unlink a file, retrying once on Windows if permission is denied.

    On Windows a ``PermissionError`` triggers a chmod to read/write followed
    by a second unlink. On other platforms the error is re-raised.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        if sys.platform != "win32":
            raise
        # On Windows, remove read-only attribute and retry
        os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
        os.unlink(path)

2279 

2280 

def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Prune empty ancestor directories of ``path``.

    Starting at the directory containing ``path`` and moving upwards, each
    directory is removed with ``os.rmdir`` until ``stop_at`` is reached, a
    directory is missing, or a directory turns out not to be empty.

    Args:
      path: Path whose ancestors should be pruned
      stop_at: Directory at which pruning stops (never removed)
    """
    current = os.path.dirname(path)
    while current and current != stop_at:
        try:
            os.rmdir(current)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            return
        except OSError as exc:
            if exc.errno in (errno.ENOTEMPTY, errno.EEXIST):
                # Directory not empty - stop trying
                return
            raise
        current = os.path.dirname(current)

2296 

2297 

def _check_symlink_matches(
    full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: ObjectID
) -> bool:
    """Compare an on-disk symlink's target with the target stored in a blob.

    Args:
      full_path: Filesystem path of the symlink
      repo_object_store: Object store holding the expected blob
      entry_sha: Id of the blob whose raw content is the expected target
    Returns: True if the link target equals the blob content; False if it
      differs, if the path does not exist, or if it is not a symlink
    """
    try:
        actual_target = os.readlink(full_path)
        wanted_target = repo_object_store[entry_sha].as_raw_string()
        if isinstance(actual_target, str):
            actual_target = actual_target.encode()
        return actual_target == wanted_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return False
    except OSError as exc:
        if exc.errno != errno.EINVAL:
            raise
        # Not a symlink
        return False

2320 

2321 

def _check_file_matches(
    repo_object_store: "BaseObjectStore",
    full_path: bytes,
    entry_sha: ObjectID,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: "FilterBlobNormalizer | None" = None,
    tree_path: bytes | None = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    The comparison is made against what a checkout would write: when both
    ``blob_normalizer`` and ``tree_path`` are given, the blob is passed
    through ``checkout_normalize`` first, and both the size shortcut and the
    content comparison use that normalized form.

    Args:
      repo_object_store: Object store holding the expected blob
      full_path: Filesystem path of the file
      entry_sha: Id of the expected blob
      entry_mode: Expected file mode
      current_stat: Stat result for the file on disk
      honor_filemode: Whether the permission bits take part in the check
      blob_normalizer: Optional normalizer applied to the blob before
        comparing
      tree_path: Tree path handed to the normalizer
    Returns: True if file matches, False if it doesn't match (including
      when the file cannot be read).
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)

        # For regular files, only check the user executable bit, not group/other permissions
        # This matches Git's behavior where umask differences don't count as modifications
        if stat.S_ISREG(current_stat.st_mode):
            # Drop group/other write bits before comparing.
            current_mode_normalized = current_mode & 0o755
            expected_mode_normalized = expected_mode & 0o755

            # For Git compatibility, regular files should be either 644 or 755
            if expected_mode_normalized not in (0o644, 0o755):
                expected_mode_normalized = 0o644  # Default for regular files
            if current_mode_normalized not in (0o644, 0o755):
                # Classify by the user execute bit alone.
                current_mode_normalized = 0o755 if current_mode & 0o100 else 0o644

            if current_mode_normalized != expected_mode_normalized:
                return False
        elif current_mode != expected_mode:
            # For non-regular files (symlinks, etc.), check mode exactly
            return False

    blob_obj = repo_object_store[entry_sha]

    # Work out the expected size. With a normalizer the on-disk size must be
    # compared with the normalized content (e.g. after line-ending
    # conversion), not with the raw blob length, or every converted file
    # would be reported as modified.
    expected_content: bytes | None = None
    if blob_normalizer and tree_path is not None:
        assert isinstance(blob_obj, Blob)
        normalized_blob = blob_normalizer.checkout_normalize(blob_obj, tree_path)
        expected_content = normalized_blob.as_raw_string()
        expected_size = len(expected_content)
    else:
        expected_size = blob_obj.raw_length()

    # Cheap rejection before reading the file.
    if current_stat.st_size != expected_size:
        return False

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return False

    if expected_content is None:
        expected_content = blob_obj.as_raw_string()
    return current_content == expected_content

2386 

2387 

def _transition_to_submodule(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: os.stat_result | None,
    entry: IndexEntry | TreeEntry,
    index: Index,
) -> None:
    """Turn whatever is at ``path`` into a submodule placeholder.

    Anything on disk that is not a directory is removed first. An existing
    directory is kept. The index entry for ``path`` is then refreshed from
    the resulting filesystem state.

    Args:
      repo: Repository containing the submodule
      path: Tree path of the submodule
      full_path: Filesystem path of the submodule
      current_stat: Stat of what is currently on disk, or None if absent
      entry: Entry supplying the submodule's sha
      index: Index to update
    """
    from .submodule import ensure_submodule_placeholder

    occupied_by_non_dir = current_stat is not None and not stat.S_ISDIR(
        current_stat.st_mode
    )
    if occupied_by_non_dir:
        # Remove whatever is there before creating the submodule.
        _remove_file_with_readonly_handling(full_path)
    ensure_submodule_placeholder(repo, path)

    assert entry.sha is not None
    index[path] = index_entry_from_stat(os.lstat(full_path), entry.sha)

2411 

2412 

def _transition_to_file(
    object_store: "BaseObjectStore",
    path: bytes,
    full_path: bytes,
    current_stat: os.stat_result | None,
    entry: IndexEntry | TreeEntry,
    index: Index,
    honor_filemode: bool,
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None,
    blob_normalizer: "FilterBlobNormalizer | None",
    tree_encoding: str = "utf-8",
) -> None:
    """Make ``path`` a regular file or symlink matching ``entry``.

    If the object on disk already matches, only the index is refreshed.
    Otherwise whatever occupies the path is removed, the parent directory
    is created if needed, the blob is written and the index updated.

    Args:
      object_store: Object store holding the blob
      path: Tree path of the entry
      full_path: Filesystem path of the entry
      current_stat: Stat of what is currently on disk, or None if absent
      entry: Entry supplying sha and mode
      index: Index to update
      honor_filemode: Whether permission bits are honored
      symlink_fn: Function to use for creating symlinks
      blob_normalizer: Optional normalizer applied on checkout
      tree_encoding: Encoding used for tree paths
    Raises:
      IsADirectoryError: If a non-empty directory, or a submodule directory
        holding more than its ``.git`` entry, is in the way
    """
    assert entry.sha is not None and entry.mode is not None

    # Decide whether the on-disk object can be kept as it is.
    already_current = False
    if current_stat is not None:
        if stat.S_ISREG(current_stat.st_mode) and not stat.S_ISLNK(entry.mode):
            # File to file - check if update needed
            already_current = _check_file_matches(
                object_store,
                full_path,
                entry.sha,
                entry.mode,
                current_stat,
                honor_filemode,
                blob_normalizer,
                path,
            )
        elif stat.S_ISLNK(current_stat.st_mode) and stat.S_ISLNK(entry.mode):
            # Symlink to symlink - check if update needed
            already_current = _check_symlink_matches(
                full_path, object_store, entry.sha
            )

    if already_current:
        # Nothing to write; only refresh the index from the existing stat.
        assert current_stat is not None
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Clear out whatever currently occupies the path.
    if current_stat is not None:
        if not stat.S_ISDIR(current_stat.st_mode):
            _remove_file_with_readonly_handling(full_path)
        else:
            listing = set(os.listdir(full_path))
            dot_git = b".git" if isinstance(full_path, bytes) else ".git"

            if dot_git not in listing:
                # Ordinary directory: may only be replaced when empty.
                try:
                    os.rmdir(full_path)
                except OSError as e:
                    if e.errno in (errno.ENOTEMPTY, errno.EEXIST):
                        raise IsADirectoryError(
                            f"Cannot replace non-empty directory with file: {full_path!r}"
                        )
                    raise
            elif listing == {dot_git}:
                # Submodule placeholder holding nothing but .git.
                shutil.rmtree(full_path)
            else:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )

    # Ensure parent directory exists
    _ensure_parent_dir_exists(full_path)

    # Write the file
    blob = object_store[entry.sha]
    assert isinstance(blob, Blob)
    if blob_normalizer:
        blob = blob_normalizer.checkout_normalize(blob, path)
    written_stat = build_file_from_blob(
        blob,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        tree_encoding=tree_encoding,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(written_stat, entry.sha)

2506 

2507 

def _transition_to_absent(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: os.stat_result | None,
    index: Index,
) -> None:
    """Remove whatever is at ``path`` from the working tree and the index.

    Nothing happens (not even an index update) when nothing is on disk.
    A directory holding only a ``.git`` entry is removed recursively; any
    other directory is removed only if empty, and left in place otherwise.
    Afterwards empty parent directories are pruned up to the repo root.

    Args:
      repo: Repository owning the working tree
      path: Tree path to remove
      full_path: Filesystem path to remove
      current_stat: Stat of what is currently on disk, or None if absent
      index: Index to update
    """
    if current_stat is None:
        return

    if not stat.S_ISDIR(current_stat.st_mode):
        _remove_file_with_readonly_handling(full_path)
    else:
        listing = set(os.listdir(full_path))
        dot_git = b".git" if isinstance(full_path, bytes) else ".git"

        if listing == {dot_git}:
            # Submodule directory holding nothing but its .git entry.
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                # A non-empty directory is left alone; anything else is fatal.
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    repo_root = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    _remove_empty_parents(full_path, repo_root)

2544 

2545 

def detect_case_only_renames(
    changes: Sequence["TreeChange"],
    config: "Config",
) -> list["TreeChange"]:
    """Fold change pairs whose paths differ only after normalization.

    Paths are normalized element by element with the normalizer selected by
    ``get_path_element_normalizer(config)``. When an old path (from a DELETE
    or RENAME) and a new path (from an ADD, MODIFY, RENAME or COPY)
    normalize to the same value but are not byte-identical, e.g.
    README.txt -> readme.txt, the two changes are replaced by a single
    CHANGE_RENAME. Paths that cannot be normalized because of invalid UTF-8
    are logged and left out of the detection.

    Args:
      changes: List of TreeChange objects representing file changes
      config: Repository configuration object

    Returns:
      New list of TreeChange objects with case-only renames converted to
      CHANGE_RENAME; the new renames are appended at the end
    """
    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        TreeChange,
    )

    # Get the appropriate normalizer based on config
    normalize_element = get_path_element_normalizer(config)

    def normalized_or_none(path: bytes) -> bytes | None:
        """Normalize a whole path; warn and return None on invalid UTF-8."""
        try:
            return b"/".join(normalize_element(part) for part in path.split(b"/"))
        except UnicodeDecodeError:
            import logging

            logging.warning(
                "Skipping case-only rename detection for path with invalid UTF-8: %r",
                path,
            )
            return None

    old_by_normalized = {}  # normalized form -> old path
    new_by_normalized = {}  # normalized form -> new path
    change_for_old = {}  # old path -> change object
    change_for_new = {}  # new path -> change object

    # Normalize every path exactly once.
    for change in changes:
        # A RENAME is treated as DELETE + ADD, so it feeds both sides.
        if change.type in (CHANGE_DELETE, CHANGE_RENAME) and change.old:
            assert change.old.path is not None
            key = normalized_or_none(change.old.path)
            if key is not None:
                old_by_normalized[key] = change.old.path
                change_for_old[change.old.path] = change

        if (
            change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
            and change.new
        ):
            assert change.new.path is not None
            key = normalized_or_none(change.new.path)
            if key is not None:
                new_by_normalized[key] = change.new.path
                change_for_new[change.new.path] = change

    # Pair up old and new paths that collide after normalization.
    superseded = set()
    renames = []
    for key, old_path in old_by_normalized.items():
        if key not in new_by_normalized:
            continue
        new_path = new_by_normalized[key]
        if old_path == new_path:
            continue

        # Found a case-only rename: old side of one change, new side of
        # the other, whatever the type of the new-side change is.
        old_change = change_for_old[old_path]
        new_change = change_for_new[new_path]
        renames.append(TreeChange(CHANGE_RENAME, old_change.old, new_change.new))

        # Mark the old changes for removal
        superseded.add(old_change)
        superseded.add(new_change)

    # Return new list with the paired changes replaced by renames
    result = [change for change in changes if change not in superseded]
    result.extend(renames)
    return result

2671 

2672 

def update_working_tree(
    repo: "Repo",
    old_tree_id: bytes | None,
    new_tree_id: bytes,
    change_iterator: Iterator["TreeChange"],
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] | None = None,
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
    force_remove_untracked: bool = False,
    blob_normalizer: "FilterBlobNormalizer | None" = None,
    tree_encoding: str = "utf-8",
    allow_overwrite_modified: bool = False,
    *,
    config: "Config | None" = None,
) -> None:
    """Update the working tree and index to match a new tree.

    This function handles:
    - Adding new files
    - Updating modified files
    - Removing deleted files
    - Cleaning up empty directories

    All safety checks run before the first change is applied.

    Args:
      repo: Repository object
      old_tree_id: SHA of the tree before the update
      new_tree_id: SHA of the tree to update to
      change_iterator: Iterator of TreeChange objects to apply
      honor_filemode: An optional flag to honor core.filemode setting
      validate_path_element: Function to validate path elements to check out
      symlink_fn: Function to use for creating symlinks
      force_remove_untracked: If True, remove files that exist in working
        directory but not in target tree, even if old_tree_id is None
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)
      allow_overwrite_modified: If False, raise an error when attempting to
        overwrite files that have been modified compared to old_tree_id
      config: Repository configuration. If None, falls back to
        ``repo.get_config_stack()``. The same configuration is used for
        opening the index, for reading ``core.ignorecase`` and for
        case-only rename detection.

    Raises:
      WorkingTreeModifiedError: If a file that would be modified or deleted
        has local changes and ``allow_overwrite_modified`` is False
      OSError: If a path cannot be accessed, or a modified file would have
        to be replaced by a directory
    """
    if validate_path_element is None:
        validate_path_element = validate_path_element_default

    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        CHANGE_UNCHANGED,
    )

    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    if config is None:
        config = repo.get_config_stack()
    index = repo.open_index(config=config)

    # Convert iterator to list since we need multiple passes
    changes = list(change_iterator)

    # Transform case-only renames on case-insensitive filesystems
    import platform

    default_ignore_case = platform.system() in ("Windows", "Darwin")
    # Use the configuration resolved above; re-reading it from the repo
    # would silently discard a caller-supplied ``config``.
    ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)

    if ignore_case:
        changes = detect_case_only_renames(changes, config)

    # Index the DELETE changes by path once (first occurrence wins), so the
    # checks below do not have to rescan the whole change list.
    deleted_changes = {}
    for change in changes:
        if (
            change.type == CHANGE_DELETE
            and change.old
            and change.old.path is not None
        ):
            deleted_changes.setdefault(change.old.path, change)

    # Check for path conflicts where files need to become directories
    paths_becoming_dirs = set()
    for change in changes:
        if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
            assert change.new is not None
            path = change.new.path
            assert path is not None
            # Every proper prefix of the new path is a directory in the new
            # tree; if that prefix is being deleted it used to be a file.
            parts = path.split(b"/")
            for i in range(1, len(parts)):
                parent = b"/".join(parts[:i])
                if parent in deleted_changes:
                    paths_becoming_dirs.add(parent)

    # Check if any path that needs to become a directory has been modified
    for path in paths_becoming_dirs:
        full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
        try:
            current_stat = os.lstat(full_path)
        except FileNotFoundError:
            continue  # File doesn't exist, nothing to check
        except OSError as e:
            raise OSError(
                f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
            ) from e

        if not stat.S_ISREG(current_stat.st_mode):
            continue

        # Compare the file on disk with what the old tree recorded.
        old_change = deleted_changes[path]
        assert old_change.old is not None
        assert old_change.old.sha is not None and old_change.old.mode is not None
        file_matches = _check_file_matches(
            repo.object_store,
            full_path,
            old_change.old.sha,
            old_change.old.mode,
            current_stat,
            honor_filemode,
            blob_normalizer,
            path,
        )
        if not file_matches:
            raise OSError(f"Cannot replace modified file with directory: {path!r}")

    # Check for uncommitted modifications before making any changes
    if not allow_overwrite_modified and old_tree_id:
        for change in changes:
            # Only check files that are being modified or deleted
            if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
                path = change.old.path
                assert path is not None
                if not validate_path(path, validate_path_element):
                    continue

                full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
                try:
                    current_stat = os.lstat(full_path)
                except FileNotFoundError:
                    continue  # File doesn't exist, nothing to check
                except OSError as e:
                    raise OSError(
                        f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                    ) from e

                if stat.S_ISREG(current_stat.st_mode):
                    # Check if working tree file differs from old tree
                    assert change.old.sha is not None and change.old.mode is not None
                    file_matches = _check_file_matches(
                        repo.object_store,
                        full_path,
                        change.old.sha,
                        change.old.mode,
                        current_stat,
                        honor_filemode,
                        blob_normalizer,
                        path,
                    )
                    if not file_matches:
                        from .errors import WorkingTreeModifiedError

                        raise WorkingTreeModifiedError(
                            f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
                            f"would be overwritten by checkout. "
                            f"Please commit your changes or stash them before you switch branches."
                        )

    # Apply the changes
    for change in changes:
        if change.type in (CHANGE_DELETE, CHANGE_RENAME):
            # Remove file/directory
            assert change.old is not None and change.old.path is not None
            path = change.old.path
            if not validate_path(path, validate_path_element):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                delete_stat: os.stat_result | None = os.lstat(full_path)
            except FileNotFoundError:
                delete_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            _transition_to_absent(repo, path, full_path, delete_stat, index)

        if change.type in (
            CHANGE_ADD,
            CHANGE_MODIFY,
            CHANGE_UNCHANGED,
            CHANGE_COPY,
            CHANGE_RENAME,
        ):
            # Add or modify file
            assert (
                change.new is not None
                and change.new.path is not None
                and change.new.mode is not None
            )
            path = change.new.path
            if not validate_path(path, validate_path_element):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                modify_stat: os.stat_result | None = os.lstat(full_path)
            except FileNotFoundError:
                modify_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            if S_ISGITLINK(change.new.mode):
                _transition_to_submodule(
                    repo, path, full_path, modify_stat, change.new, index
                )
            else:
                _transition_to_file(
                    repo.object_store,
                    path,
                    full_path,
                    modify_stat,
                    change.new,
                    index,
                    honor_filemode,
                    symlink_fn,
                    blob_normalizer,
                    tree_encoding,
                )

    index.write()

2923 

2924 

def _stat_matches_entry(
    st: os.stat_result, entry: IndexEntry, trust_ctime: bool = True
) -> bool:
    """Check if filesystem stat matches index entry stat.

    This is used to determine if a file might have changed without reading
    its content. Timestamps are compared with nanosecond precision when the
    stat result provides it, and with whole seconds otherwise. An entry
    timestamp that is not a ``(seconds, nanoseconds)`` tuple is truncated to
    whole seconds with a nanosecond part of zero.

    Args:
      st: Filesystem stat result
      entry: Index entry to compare against
      trust_ctime: If True, also check ctime (default: True, matching Git behavior)
    Returns: True if stat matches and file is likely unchanged
    """

    def timestamp_matches(recorded, ns_field: str, sec_field: str) -> bool:
        """Compare one recorded timestamp with the matching stat field."""
        if isinstance(recorded, tuple):
            seconds = recorded[0]
            nanoseconds = recorded[1]
        else:
            seconds = int(recorded)
            nanoseconds = 0

        if hasattr(st, ns_field):
            # Use nanosecond precision when available
            return getattr(st, ns_field) == seconds * 1_000_000_000 + nanoseconds
        # Fall back to second precision
        return int(getattr(st, sec_field)) == seconds

    # Compare change time (ctime) if trust_ctime is enabled
    if trust_ctime and not timestamp_matches(entry.ctime, "st_ctime_ns", "st_ctime"):
        return False

    # Nanosecond mtime matters for fast workflows (e.g., stash) where files
    # can be modified multiple times within the same second
    if not timestamp_matches(entry.mtime, "st_mtime_ns", "st_mtime"):
        return False

    # Finally the size; if it agrees too, the file is likely unchanged
    return st.st_size == entry.size

2988 

2989 

def _check_entry_for_changes(
    tree_path: bytes,
    entry: IndexEntry | ConflictedIndexEntry,
    root_path: bytes,
    filter_blob_callback: Callable[[Blob, bytes], Blob] | None = None,
    trust_ctime: bool = True,
) -> bytes | None:
    """Check a single index entry for changes.

    Args:
      tree_path: Path in the tree
      entry: Index entry to check
      root_path: Root filesystem path
      filter_blob_callback: Optional callback applied to the working-tree
        blob before its id is compared with the index entry
      trust_ctime: If True, use ctime for change detection (default: True)
    Returns: tree_path if changed, None otherwise
    """
    if isinstance(entry, ConflictedIndexEntry):
        # Conflicted files are always unstaged
        return tree_path

    full_path = _tree_to_fs_path(root_path, tree_path)
    try:
        st = os.lstat(full_path)
        if stat.S_ISDIR(st.st_mode):
            if _has_directory_changed(tree_path, entry):
                return tree_path
            return None

        # Anything that is neither a regular file nor a symlink is not
        # compared and is reported as unchanged.
        if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
            return None

        # Optimization: when the stat data still matches the index entry the
        # file is neither read nor filtered. With slow filters (e.g. LFS)
        # this avoids the expensive filter run for untouched files.
        if _stat_matches_entry(st, entry, trust_ctime):
            return None

        blob = blob_from_path_and_stat(full_path, st)

        if filter_blob_callback is not None:
            blob = filter_blob_callback(blob, tree_path)
    except (FileNotFoundError, NotADirectoryError):
        # The file was removed, so we assume that counts as different from
        # whatever file used to exist. NotADirectoryError covers the case
        # where a parent component of the path has been replaced by a
        # non-directory: the tracked file is just as gone, and letting the
        # error escape would abort the whole status walk.
        return tree_path
    else:
        if blob.id != entry.sha:
            return tree_path
    return None

3044 

3045 

def get_unstaged_changes(
    index: Index,
    root_path: str | bytes,
    filter_blob_callback: Callable[..., Any] | None = None,
    preload_index: bool = False,
    trust_ctime: bool = True,
    max_stat: int | None = None,
) -> Generator[bytes, None, None]:
    """Compare every index entry with the working tree and report differences.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: Optional callback to filter blobs
      preload_index: If True, check files with a pool of worker threads
        (falls back to serial checking if the threading imports fail)
      trust_ctime: If True, use ctime for change detection (default: True)
      max_stat: If set, limit the number of entries that are checked.
        Entries beyond the limit are not examined and therefore never
        reported.
    Returns: iterator over paths with unstaged changes
    """
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    if preload_index:
        try:
            import multiprocessing
            from concurrent.futures import ThreadPoolExecutor
        except ImportError:
            # No threading support available: use the serial loop below.
            preload_index = False

    if preload_index:
        # Materialise the entries up front so they can be fanned out.
        candidates = list(index.iteritems())
        if max_stat is not None:
            candidates = candidates[:max_stat]

        # One worker per CPU, capped at 8 to keep thread overhead down.
        worker_count = min(multiprocessing.cpu_count(), 8)

        with ThreadPoolExecutor(max_workers=worker_count) as pool:
            pending = [
                pool.submit(
                    _check_entry_for_changes,
                    path,
                    item,
                    root_path,
                    filter_blob_callback,
                    trust_ctime,
                )
                for path, item in candidates
            ]

            # Results are read back in submission order.
            for task in pending:
                changed = task.result()
                if changed is not None:
                    yield changed
        return

    # Serial processing: `checked` is the number of entries examined so far.
    for checked, (path, item) in enumerate(index.iteritems()):
        if max_stat is not None and checked >= max_stat:
            return
        changed = _check_entry_for_changes(
            path, item, root_path, filter_blob_callback, trust_ctime
        )
        if changed is not None:
            yield changed

3123 

3124 

def _decode_utf8_with_fallback(data: bytes) -> str:
    """Decode bytes as UTF-8, substituting lossy forms for invalid bytes.

    Mirrors the behaviour of git-for-windows's ``xutftowcsn`` (in
    ``compat/mingw.c``) so that tree paths containing legacy-encoded or
    otherwise invalid UTF-8 produce the same on-disk filename as C git.

    Rules:
      * Well-formed 1-4 byte sequences (no overlongs, nothing above
        U+10FFFF) decode to their code point.
      * An invalid byte in 0xa0-0xff becomes the code point of the same
        value (U+00A0-U+00FF).
      * An invalid byte in 0x80-0x9f becomes two lowercase ASCII hex digits
        (e.g. byte 0x80 -> "80").
      * For a truncated, overlong or out-of-range sequence only the lead
        byte is consumed by the invalid-byte rules; the bytes after it are
        examined again on the following iterations.
    """

    def is_trail(idx: int) -> bool:
        # Continuation bytes have the bit pattern 10xxxxxx.
        return (data[idx] & 0xC0) == 0x80

    pieces: list[str] = []
    total = len(data)
    pos = 0
    while pos < total:
        lead = data[pos]
        avail = total - pos  # bytes left, counting the lead byte itself

        if lead < 0x80:
            # Plain ASCII.
            text, width = chr(lead), 1
        elif 0xC2 <= lead <= 0xDF and avail >= 2 and is_trail(pos + 1):
            # Two-byte sequence (0xC0/0xC1 would be overlong, so excluded).
            text = chr(((lead & 0x1F) << 6) | (data[pos + 1] & 0x3F))
            width = 2
        elif (
            0xE0 <= lead <= 0xEF
            and avail >= 3
            and (lead != 0xE0 or data[pos + 1] >= 0xA0)  # reject overlong
            and is_trail(pos + 1)
            and is_trail(pos + 2)
        ):
            # Three-byte sequence.
            text = chr(
                ((lead & 0x0F) << 12)
                | ((data[pos + 1] & 0x3F) << 6)
                | (data[pos + 2] & 0x3F)
            )
            width = 3
        elif (
            0xF0 <= lead <= 0xF4
            and avail >= 4
            and (lead != 0xF0 or data[pos + 1] >= 0x90)  # reject overlong
            and (lead != 0xF4 or data[pos + 1] < 0x90)  # reject > U+10FFFF
            and is_trail(pos + 1)
            and is_trail(pos + 2)
            and is_trail(pos + 3)
        ):
            # Four-byte sequence.
            text = chr(
                ((lead & 0x07) << 18)
                | ((data[pos + 1] & 0x3F) << 12)
                | ((data[pos + 2] & 0x3F) << 6)
                | (data[pos + 3] & 0x3F)
            )
            width = 4
        elif lead >= 0xA0:
            # Invalid byte that maps 1:1 onto U+00A0-U+00FF.
            text, width = chr(lead), 1
        else:
            # Invalid byte in 0x80-0x9f: spell it out as two hex digits.
            text, width = f"{lead:02x}", 1

        pieces.append(text)
        pos += width
    return "".join(pieces)

3188 

3189 

def _tree_to_fs_path(
    root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
) -> bytes:
    """Translate a git tree path into a path on the file system.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes (encoded with tree_encoding)
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Returns: File system path, i.e. tree_path joined onto root_path.
    """
    assert isinstance(tree_path, bytes)

    # Tree paths always use "/"; swap in the native separator when it differs.
    native = (
        tree_path
        if os_sep_bytes == b"/"
        else tree_path.replace(b"/", os_sep_bytes)
    )

    if sys.platform == "win32":
        # On Windows the bytes are decoded to str and re-encoded with the
        # filesystem codec so they can reach the wide-char Win32 APIs.
        # UTF-8 (the default) goes through the lossy decoder that matches
        # C git's xutftowcsn fallbacks; any other encoding is decoded
        # strictly, so UnicodeDecodeError propagates instead of silently
        # producing a corrupt path.
        if tree_encoding == "utf-8":
            decoded = _decode_utf8_with_fallback(native)
        else:
            decoded = native.decode(tree_encoding)
        native = os.fsencode(decoded)

    return os.path.join(root_path, native)

3221 

3222 

def _fs_to_tree_path(fs_path: str | bytes, tree_encoding: str = "utf-8") -> bytes:
    """Translate a file system path into a git tree path.

    Args:
      fs_path: File system path.
      tree_encoding: Encoding to use for tree paths (default: utf-8)

    Returns: Git tree path as bytes (encoded with tree_encoding)
    """
    raw = fs_path if isinstance(fs_path, bytes) else os.fsencode(fs_path)

    if sys.platform == "win32":
        # On Windows the on-disk name is a UTF-16 wide string that Python
        # hands over as str or as bytes in the filesystem codec. Go through
        # str and encode with the tree encoding so the tree path is plain
        # UTF-8 by default. This matches C git's xwcstoutf, which does not
        # try to undo the xutftowcsn fallbacks: a file checked out from a
        # tree path with invalid UTF-8 reads back in its lossy form (a
        # one-way mapping, as in C git).
        raw = os.fsdecode(raw).encode(tree_encoding)

    # Tree paths always use "/" regardless of the native separator.
    if os_sep_bytes == b"/":
        return raw
    return raw.replace(os_sep_bytes, b"/")

3254 

3255 

def index_entry_from_directory(st: os.stat_result, path: bytes) -> IndexEntry | None:
    """Build the index entry for a directory, if it is a submodule.

    Only directories that contain a ``.git`` entry qualify; they are
    recorded as gitlinks pointing at the submodule's head.

    Args:
      st: Stat result for the directory
      path: Path to the directory

    Returns:
      IndexEntry for a submodule, or None if the directory has no ``.git``
      entry or its head cannot be read
    """
    git_marker = os.path.join(path, b".git")
    if not os.path.exists(git_marker):
        return None

    submodule_head = read_submodule_head(path)
    if submodule_head is None:
        return None

    return index_entry_from_stat(st, submodule_head, mode=S_IFGITLINK)

3274 

3275 

def index_entry_from_path(
    path: bytes, object_store: ObjectContainer | None = None
) -> IndexEntry | None:
    """Create an index entry from a filesystem path.

    Regular files and symlinks are turned into a blob-backed entry;
    directories are delegated to index_entry_from_directory; every other
    kind of file yields None.

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry, or None when no entry applies
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    mode = st.st_mode

    if stat.S_ISDIR(mode):
        return index_entry_from_directory(st, path)

    if not (stat.S_ISREG(mode) or stat.S_ISLNK(mode)):
        return None

    blob = blob_from_path_and_stat(path, st)
    if object_store is not None:
        object_store.add_object(blob)
    return index_entry_from_stat(st, blob.id)

3303 

3304 

def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: ObjectContainer | None = None,
) -> Iterator[tuple[bytes, IndexEntry | None]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry; index_entry is None when the
      path no longer yields an entry on disk
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
            # NotADirectoryError is raised when a parent component of the
            # path has been replaced by a non-directory. The tracked file no
            # longer exists in that case, so report it like any other
            # missing path instead of aborting the whole iteration.
            entry = None
        yield path, entry

3325 

3326 

def iter_fresh_objects(
    paths: Iterable[bytes],
    root_path: bytes,
    include_deleted: bool = False,
    object_store: ObjectContainer | None = None,
) -> Iterator[tuple[bytes, ObjectID | None, int | None]]:
    """Iterate over the on-disk objects for a set of index paths.

    Args:
      paths: Paths to check
      root_path: Root path to access from
      include_deleted: Also yield paths without a fresh entry, with sha
        and mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    fresh = iter_fresh_entries(paths, root_path, object_store=object_store)
    for path, entry in fresh:
        if entry is not None:
            yield path, entry.sha, cleanup_mode(entry.mode)
        elif include_deleted:
            yield path, None, None

3349 

3350 

def refresh_index(index: Index, root_path: bytes) -> None:
    """Refresh the contents of an index from the working tree.

    Every path in the index is looked up on disk again and its entry is
    replaced by the fresh one. Paths that produce no fresh entry keep
    their existing entry.

    This is the equivalent to running 'git commit -a'.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, fresh_entry in iter_fresh_entries(index, root_path):
        if not fresh_entry:
            continue
        index[path] = fresh_entry

3363 

3364 

class locked_index:
    """Lock the index while making modifications.

    Works as a context manager: entering opens the index path through
    ``GitFile`` for writing and returns an ``Index`` for that path. Leaving
    without an exception serialises the index through that file; leaving
    with an exception aborts the file instead.
    """

    _file: "_GitFile"

    def __init__(self, path: bytes | str) -> None:
        """Initialize locked_index.

        Args:
          path: Path of the index file to lock
        """
        self._path = path

    def __enter__(self) -> Index:
        """Enter context manager and lock index.

        Returns: the Index object for the locked path
        """
        f = GitFile(self._path, "wb")
        self._file = f
        try:
            self._index = Index(self._path)
        except BaseException:
            # __exit__ is not called when __enter__ raises, so the file
            # opened above would otherwise never be released.
            f.abort()
            raise
        return self._index

    def __exit__(
        self,
        exc_type: type | None,
        exc_value: BaseException | None,
        traceback: types.TracebackType | None,
    ) -> None:
        """Exit context manager and unlock index.

        Raises: whatever writing the index raised; the file is aborted
          first so no partial index is left behind.
        """
        if exc_type is not None:
            # The body failed: discard the pending file and let the
            # original exception propagate (None is returned).
            self._file.abort()
            return
        try:
            f = SHA1Writer(self._file)
            write_index_dict(f, self._index._byname)
        except BaseException:
            self._file.abort()
            # Re-raise so a failed write (or KeyboardInterrupt/SystemExit)
            # is not silently reported as success.
            raise
        else:
            f.close()