
# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""


import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import (
    Callable,
    Generator,
    Iterable,
    Iterator,
    Mapping,
    Sequence,
    Set,
)
from dataclasses import dataclass
from enum import Enum
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .filters import FilterBlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    TreeEntry,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# Type alias for recursive tree structure used in commit_tree
TreeDict = dict[bytes, "TreeDict | tuple[int, bytes]"]

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"
SDIR_EXTENSION = b"sdir"  # Sparse directory extension

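# Illustrative flag arithmetic (an added example, not part of the original
# module): an entry at merge stage 2 ("ours") with a 9-byte name packs both
# into the same 16-bit flags word:
#
#     flags = (2 << FLAG_STAGESHIFT) | 9
#     assert (flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT == 2
#     assert flags & FLAG_NAMEMASK == 9
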

def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos

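# Illustrative round-trip (an added example, not part of the original module):
# 300 encodes to b"\xac\x02" -- 0xAC carries the low seven bits plus a
# continuation bit, 0x02 the remaining bits -- and decoding returns the value
# together with the offset just past it:
#
#     assert _encode_varint(300) == b"\xac\x02"
#     assert _decode_varint(b"\xac\x02") == (300, 2)
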


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"

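# Illustrative example (an added example, not part of the original module):
# going from b"src/main.py" to b"src/main_test.py" shares the 8-byte prefix
# b"src/main", so 3 bytes are dropped from the previous path and b"_test.py"
# is appended:
#
#     assert _compress_path(b"src/main_test.py", b"src/main.py") == b"\x03_test.py\x00"
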


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3



@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    def is_sparse_dir(self) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and self.name.endswith(b"/")
        )



@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        elif signature == SDIR_EXTENSION:
            return SparseDirExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data



class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)


class SparseDirExtension(IndexExtension):
    """Sparse directory extension.

    This extension indicates that the index contains sparse directory entries.
    Tools that do not understand the sparse index should avoid interacting
    with the index when this extension is present.

    The extension data is empty - its presence is the signal.
    """

    def __init__(self) -> None:
        """Initialize SparseDirExtension."""
        super().__init__(SDIR_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "SparseDirExtension":
        """Parse SparseDirExtension from bytes.

        Args:
          data: Raw bytes to parse (should be empty)

        Returns:
          SparseDirExtension instance
        """
        return cls()

    def to_bytes(self) -> bytes:
        """Serialize SparseDirExtension to bytes.

        Returns:
          Empty bytes (extension presence is the signal)
        """
        return b""



@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED

    def is_sparse_dir(self, name: bytes) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Args:
          name: The path name for this entry (IndexEntry doesn't store name)

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and name.endswith(b"/")
        )



class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: IndexEntry | None
    this: IndexEntry | None
    other: IndexEntry | None

    def __init__(
        self,
        ancestor: IndexEntry | None = None,
        this: IndexEntry | None = None,
        other: IndexEntry | None = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])

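# Illustrative behavior (an added example, not part of the original module):
# the directory part is empty for top-level names, and empty components are
# dropped when joining:
#
#     assert pathsplit(b"a/b/c.txt") == (b"a/b", b"c.txt")
#     assert pathsplit(b"c.txt") == (b"", b"c.txt")
#     assert pathjoin(b"a", b"", b"b") == b"a/b"
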


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: IO[bytes], t: int | float | tuple[int, int]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))

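# Illustrative round-trip (an added sketch, not part of the original module):
# times are serialized as big-endian (seconds, nanoseconds) pairs, so a float
# timestamp is split at the decimal point:
#
#     import io
#     buf = io.BytesIO()
#     write_cache_time(buf, 1700000000.5)
#     buf.seek(0)
#     assert read_cache_time(buf) == (1700000000, 500000000)
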


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: the path is stored compressed relative to the previous
        # entry's path, so the name length bits in flags are not used here
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )



def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: SerializedIndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))



class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries


def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)



def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[dict[bytes, IndexEntry | ConflictedIndexEntry], int, list[IndexExtension]]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a plausible extension signature: four ASCII letters.
        # Optional extensions use uppercase signatures; mandatory ones (such
        # as b"sdir") use lowercase.
        if not all(65 <= b <= 90 or 97 <= b <= 122 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions



def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, IndexEntry | ConflictedIndexEntry]:
    """Read an index file and return it as a dictionary.

    The dictionary is keyed by path; all stages of a conflicted path are
    grouped into a single ConflictedIndexEntry.

    Args:
      f: File object to read from
    """
    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret



def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      entries: Sequence of entries to write
      version: Version number to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # If any entry uses extended flags, the index must be at least version 3.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        # Bump the version to 3
        version = 3
    # Final safety check: no extended flags may appear in a pre-v3 index.
    if version < 3:
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)



def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, IndexEntry | ConflictedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are sorted by path, and the stages of a conflicted path are
    written in stage order.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)



def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret

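# Illustrative behavior (an added example, not part of the original module):
# regular-file permissions collapse to 0o644 or 0o755; only the owner-execute
# bit is significant:
#
#     assert cleanup_mode(0o100600) == 0o100644
#     assert cleanup_mode(0o100764) == 0o100755
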


class Index:
    """A Git Index file."""

    _byname: dict[bytes, IndexEntry | ConflictedIndexEntry]

    def __init__(
        self,
        filename: bytes | str | os.PathLike[str],
        read: bool = True,
        skip_hash: bool = False,
        version: int | None = None,
        *,
        file_mode: int | None = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
          file_mode: Optional file permission mask for shared repository
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._file_mode = file_mode
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> bytes | str:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        mask = self._file_mode if self._file_mode is not None else 0o644
        f = GitFile(self._filename, "wb", mask=mask)
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> IndexEntry | ConflictedIndexEntry:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname


    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: IndexEntry | ConflictedIndexEntry
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(self, entries: dict[bytes, IndexEntry | ConflictedIndexEntry]) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[bytes | None, bytes | None],
            tuple[int | None, int | None],
            tuple[bytes | None, bytes | None],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                # A conflicted entry has no single sha/mode to compare against
                return b"", 0
            return entry.sha, cleanup_mode(entry.mode)

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )


    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

    def is_sparse(self) -> bool:
        """Check if this index contains sparse directory entries.

        Returns:
          True if any sparse directory extension is present
        """
        return any(isinstance(ext, SparseDirExtension) for ext in self._extensions)

    def ensure_full_index(self, object_store: "BaseObjectStore") -> None:
        """Expand all sparse directory entries into full file entries.

        This converts a sparse index into a full index by recursively
        expanding any sparse directory entries into their constituent files.

        Args:
          object_store: Object store to read tree objects from

        Raises:
          KeyError: If a tree object referenced by a sparse dir entry doesn't exist
        """
        if not self.is_sparse():
            return

        # Find all sparse directory entries
        sparse_dirs = []
        for path, entry in list(self._byname.items()):
            if isinstance(entry, IndexEntry) and entry.is_sparse_dir(path):
                sparse_dirs.append((path, entry))

        # Expand each sparse directory
        for path, entry in sparse_dirs:
            # Remove the sparse directory entry
            del self._byname[path]

            # Get the tree object
            tree = object_store[entry.sha]
            if not isinstance(tree, Tree):
                raise ValueError(f"Sparse directory {path!r} points to non-tree object")

            # Recursively add all entries from the tree
            self._expand_tree(path.rstrip(b"/"), tree, object_store, entry)

        # Remove the sparse directory extension
        self._extensions = [
            ext for ext in self._extensions if not isinstance(ext, SparseDirExtension)
        ]

    def _expand_tree(
        self,
        prefix: bytes,
        tree: Tree,
        object_store: "BaseObjectStore",
        template_entry: IndexEntry,
    ) -> None:
        """Recursively expand a tree into index entries.

        Args:
          prefix: Path prefix for entries (without trailing slash)
          tree: Tree object to expand
          object_store: Object store to read nested trees from
          template_entry: Template entry to copy metadata from
        """
        for name, mode, sha in tree.items():
            if prefix:
                full_path = prefix + b"/" + name
            else:
                full_path = name

            if stat.S_ISDIR(mode):
                # Recursively expand subdirectories
                subtree = object_store[sha]
                if not isinstance(subtree, Tree):
                    raise ValueError(
                        f"Directory entry {full_path!r} points to non-tree object"
                    )
                self._expand_tree(full_path, subtree, object_store, template_entry)
            else:
                # Create an index entry for this file
                # Use the template entry for metadata but with the file's sha and mode
                new_entry = IndexEntry(
                    ctime=template_entry.ctime,
                    mtime=template_entry.mtime,
                    dev=template_entry.dev,
                    ino=template_entry.ino,
                    mode=mode,
                    uid=template_entry.uid,
                    gid=template_entry.gid,
                    size=0,  # Size is unknown from tree
                    sha=sha,
                    flags=0,
                    extended_flags=0,  # Don't copy skip-worktree flag
                )
                self._byname[full_path] = new_entry

    def convert_to_sparse(
        self,
        object_store: "BaseObjectStore",
        tree_sha: bytes,
        sparse_dirs: Set[bytes],
    ) -> None:
        """Convert full index entries to sparse directory entries.

        This collapses directories that are entirely outside the sparse
        checkout cone into single sparse directory entries.

        Args:
          object_store: Object store to read tree objects
          tree_sha: SHA of the tree (usually HEAD) to base sparse dirs on
          sparse_dirs: Set of directory paths (with trailing /) to collapse

        Raises:
          KeyError: If tree_sha or a subdirectory doesn't exist
        """
        if not sparse_dirs:
            return

        # Get the base tree
        tree = object_store[tree_sha]
        if not isinstance(tree, Tree):
            raise ValueError(f"tree_sha {tree_sha!r} is not a tree object")

        # For each sparse directory, find its tree SHA and create sparse entry
        for dir_path in sparse_dirs:
            dir_path_stripped = dir_path.rstrip(b"/")

            # Find the tree SHA for this directory
            subtree_sha = self._find_subtree_sha(tree, dir_path_stripped, object_store)
            if subtree_sha is None:
                # Directory doesn't exist in tree, skip it
                continue

            # Remove all entries under this directory
            entries_to_remove = [
                path
                for path in self._byname
                if path.startswith(dir_path) or path == dir_path_stripped
            ]
            for path in entries_to_remove:
                del self._byname[path]

            # Create a sparse directory entry
            # Use minimal metadata since it's not a real file
            sparse_entry = IndexEntry(
                ctime=0,
                mtime=0,
                dev=0,
                ino=0,
                mode=stat.S_IFDIR,
                uid=0,
                gid=0,
                size=0,
                sha=subtree_sha,
                flags=0,
                extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
            )
            self._byname[dir_path] = sparse_entry

        # Add sparse directory extension if not present
        if not self.is_sparse():
            self._extensions.append(SparseDirExtension())

    def _find_subtree_sha(
        self,
        tree: Tree,
        path: bytes,
        object_store: "BaseObjectStore",
    ) -> bytes | None:
        """Find the SHA of a subtree at a given path.

        Args:
          tree: Root tree object to search in
          path: Path to the subdirectory (no trailing slash)
          object_store: Object store to read nested trees from

        Returns:
          SHA of the subtree, or None if path doesn't exist
        """
        if not path:
            return tree.id

        parts = path.split(b"/")
        current_tree = tree

        for part in parts:
            # Look for this part in the current tree
            try:
                mode, sha = current_tree[part]
            except KeyError:
                return None

            if not stat.S_ISDIR(mode):
                # Path component is a file, not a directory
                return None

            # Load the next tree
            obj = object_store[sha]
            if not isinstance(obj, Tree):
                return None
            current_tree = obj

        return current_tree.id

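# Illustrative usage of Index (an added sketch, not part of the original
# module; the path below is hypothetical):
#
#     index = Index(".git/index")
#     for path, entry in index.items():
#         if isinstance(entry, ConflictedIndexEntry):
#             ...  # inspect entry.ancestor / entry.this / entry.other
#         else:
#             print(path, entry.sha)
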


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: TreeDict = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")

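# Illustrative usage (an added sketch, not part of the original module),
# building a one-file tree in an in-memory store; MemoryObjectStore comes from
# dulwich.object_store and Blob is already imported above:
#
#     from dulwich.object_store import MemoryObjectStore
#
#     store = MemoryObjectStore()
#     blob = Blob.from_string(b"hello\n")
#     store.add_object(blob)
#     tree_id = commit_tree(store, [(b"docs/hello.txt", blob.id, 0o100644)])
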


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())



def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: bytes | None,
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[bytes | None, bytes | None],
        tuple[int | None, int | None],
        tuple[bytes | None, bytes | None],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))



def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: int | None = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )

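# Illustrative usage (an added sketch, not part of the original module; the
# file name and placeholder sha are hypothetical):
#
#     st = os.stat("README.md")
#     entry = index_entry_from_stat(st, b"0" * 40)
#     assert entry.mode in (0o100644, 0o100755)
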


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Windows-specific error for symlink creation failures.

        This error is raised when symlink creation fails on Windows,
        typically due to lack of developer mode or administrator privileges.
        """

        def __init__(self, errno: int, msg: str, filename: str | None) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            super().__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: str | bytes,
        dst: str | bytes,
        target_is_directory: bool = False,
        *,
        dir_fd: int | None = None,
    ) -> None:
        """Create a symbolic link on Windows with better error handling.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink



def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)



INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()


def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()


def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")


def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default


def validate_path_element_default(element: bytes) -> bool:
    """Validate a path element using default rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid, False otherwise
    """
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    """Validate a path element using NTFS filesystem rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid for NTFS, False otherwise
    """
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True


1869 

1870def validate_path( 

1871 path: bytes, 

1872 element_validator: Callable[[bytes], bool] = validate_path_element_default, 

1873) -> bool: 

1874 """Default path validator that just checks for .git/.""" 

1875 parts = path.split(b"/") 

1876 for p in parts: 

1877 if not element_validator(p): 

1878 return False 

1879 else: 

1880 return True 
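

# Illustrative sketch (not part of dulwich's API): any path containing a
# ".git" or dot-name component is rejected by the default validator.
def _example_validate_paths() -> None:
    assert validate_path(b"src/main.py")
    assert not validate_path(b".git/config")
    assert not validate_path(b"a/../b")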



def build_index_from_tree(
    root_path: str | bytes,
    index_path: str | bytes,
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
    blob_normalizer: "FilterBlobNormalizer | None" = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Generate and materialize index from a tree.

    Args:
        tree_id: Tree to materialize
        root_path: Target dir for materialized index files
        index_path: Target path for generated index
        object_store: Non-empty object store holding tree contents
        honor_filemode: An optional flag to honor the core.filemode setting
            from the config file; when True (the default), the executable
            bit is applied to checked-out files
        validate_path_element: Function to validate path elements to check
            out; default just refuses .git and .. directories.
        symlink_fn: Function to use for creating symlinks
        blob_normalizer: An optional BlobNormalizer to use for converting line
            endings when writing blobs to the working directory.
        tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: any existing index is wiped and contents are not merged with an
        existing working directory. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        assert (
            entry.path is not None and entry.mode is not None and entry.sha is not None
        )
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()
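

# Minimal checkout sketch (illustrative only), assuming ``repo`` is an open
# dulwich Repo whose HEAD resolves to a commit: materialize that commit's
# tree into ``target`` and write a fresh index for it.
def _example_fresh_checkout(repo: "Repo", target: str) -> None:
    tree_id = repo[repo.head()].tree
    build_index_from_tree(
        target,
        os.path.join(target, ".git", "index"),
        repo.object_store,
        tree_id,
    )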



def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a file mode.

    Args:
        fs_path: Full file system path to file
        mode: File mode
        tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob


def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
        fs_path: Full file system path to file
        st: A stat object
        tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
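

# Illustrative sketch (not part of dulwich's API): snapshot a working-tree
# file as a Blob without touching any object store.
def _example_snapshot_blob(fs_path: bytes) -> Blob:
    return blob_from_path_and_stat(fs_path, os.lstat(fs_path))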



def read_submodule_head(path: str | bytes) -> bytes | None:
    """Read the head commit of a submodule.

    Args:
        path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None
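

# Illustrative sketch: report a submodule's checked-out commit, assuming
# ``repo_root`` contains a submodule at ``relpath``. Returns the submodule's
# HEAD sha, or None if the directory is not a usable git repository.
def _example_submodule_commit(repo_root: bytes, relpath: bytes) -> bytes | None:
    return read_submodule_head(os.path.join(repo_root, relpath))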



def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it has changed. If
    it is not a submodule, the tracked file was replaced by a directory, so
    consider it changed.

    Return True if the given path should be considered changed and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False



os_sep_bytes = os.sep.encode("ascii")


def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if parent_dir and not os.path.exists(parent_dir):
        # Walk up the directory tree to find the first existing parent
        current = parent_dir
        parents_to_check: list[bytes] = []

        while current and not os.path.exists(current):
            parents_to_check.insert(0, current)
            new_parent = os.path.dirname(current)
            if new_parent == current:
                # Reached the root or can't go up further
                break
            current = new_parent

        # Check if the existing parent (if any) is a directory
        if current and os.path.exists(current) and not os.path.isdir(current):
            raise OSError(
                f"Cannot create directory, parent path is a file: {current!r}"
            )

        # Now check each parent we need to create isn't blocked by an existing file
        for parent_path in parents_to_check:
            if os.path.exists(parent_path) and not os.path.isdir(parent_path):
                raise OSError(
                    f"Cannot create directory, parent path is a file: {parent_path!r}"
                )

        os.makedirs(parent_dir)



def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
        path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        # On Windows, remove read-only attribute and retry
        if sys.platform == "win32":
            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            os.unlink(path)
        else:
            raise


def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Remove empty parent directories up to stop_at."""
    parent = os.path.dirname(path)
    while parent and parent != stop_at:
        try:
            os.rmdir(parent)
            parent = os.path.dirname(parent)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            break
        except OSError as e:
            if e.errno in (errno.ENOTEMPTY, errno.EEXIST):
                # Directory not empty - stop trying
                break
            raise



def _check_symlink_matches(
    full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes
) -> bool:
    """Check if a symlink's target matches the expected target.

    Returns True if the symlink matches, False if it doesn't.
    """
    try:
        current_target = os.readlink(full_path)
        blob_obj = repo_object_store[entry_sha]
        expected_target = blob_obj.as_raw_string()
        if isinstance(current_target, str):
            current_target = current_target.encode()
        return current_target == expected_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return False
    except OSError as e:
        if e.errno == errno.EINVAL:
            # Not a symlink
            return False
        raise



def _check_file_matches(
    repo_object_store: "BaseObjectStore",
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: "FilterBlobNormalizer | None" = None,
    tree_path: bytes | None = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Returns True if file matches, False if it doesn't match.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)

        # For regular files, only check the user executable bit, not group/other permissions
        # This matches Git's behavior where umask differences don't count as modifications
        if stat.S_ISREG(current_stat.st_mode):
            # Normalize regular file modes to ignore group/other write permissions
            current_mode_normalized = (
                current_mode & 0o755
            )  # Keep only user rwx and all read+execute
            expected_mode_normalized = expected_mode & 0o755

            # For Git compatibility, regular files should be either 644 or 755
            if expected_mode_normalized not in (0o644, 0o755):
                expected_mode_normalized = 0o644  # Default for regular files
            if current_mode_normalized not in (0o644, 0o755):
                # Determine if it should be executable based on user execute bit
                if current_mode & 0o100:  # User execute bit is set
                    current_mode_normalized = 0o755
                else:
                    current_mode_normalized = 0o644

            if current_mode_normalized != expected_mode_normalized:
                return False
        else:
            # For non-regular files (symlinks, etc.), check mode exactly
            if current_mode != expected_mode:
                return False

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return False

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                assert isinstance(blob_obj, Blob)
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content == expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return False



def _transition_to_submodule(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: os.stat_result | None,
    entry: IndexEntry | TreeEntry,
    index: Index,
) -> None:
    """Transition any type to submodule."""
    from .submodule import ensure_submodule_placeholder

    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Already a directory, just ensure .git file exists
        ensure_submodule_placeholder(repo, path)
    else:
        # Remove whatever is there and create submodule
        if current_stat is not None:
            _remove_file_with_readonly_handling(full_path)
        ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    assert entry.sha is not None
    index[path] = index_entry_from_stat(st, entry.sha)



def _transition_to_file(
    object_store: "BaseObjectStore",
    path: bytes,
    full_path: bytes,
    current_stat: os.stat_result | None,
    entry: IndexEntry | TreeEntry,
    index: Index,
    honor_filemode: bool,
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None,
    blob_normalizer: "FilterBlobNormalizer | None",
    tree_encoding: str = "utf-8",
) -> None:
    """Transition any type to regular file or symlink."""
    assert entry.sha is not None and entry.mode is not None
    # Check if we need to update
    if (
        current_stat is not None
        and stat.S_ISREG(current_stat.st_mode)
        and not stat.S_ISLNK(entry.mode)
    ):
        # File to file - check if update needed
        file_matches = _check_file_matches(
            object_store,
            full_path,
            entry.sha,
            entry.mode,
            current_stat,
            honor_filemode,
            blob_normalizer,
            path,
        )
        needs_update = not file_matches
    elif (
        current_stat is not None
        and stat.S_ISLNK(current_stat.st_mode)
        and stat.S_ISLNK(entry.mode)
    ):
        # Symlink to symlink - check if update needed
        symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
        needs_update = not symlink_matches
    else:
        needs_update = True

    if not needs_update:
        # Just update index - current_stat should always be valid here since we're not updating
        assert current_stat is not None
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Remove existing entry if needed
    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Remove directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents:
            if dir_contents != {git_file_name}:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno in (errno.ENOTEMPTY, errno.EEXIST):
                    raise IsADirectoryError(
                        f"Cannot replace non-empty directory with file: {full_path!r}"
                    )
                raise
    elif current_stat is not None:
        _remove_file_with_readonly_handling(full_path)

    # Ensure parent directory exists
    _ensure_parent_dir_exists(full_path)

    # Write the file
    blob_obj = object_store[entry.sha]
    assert isinstance(blob_obj, Blob)
    if blob_normalizer:
        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
    st = build_file_from_blob(
        blob_obj,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        tree_encoding=tree_encoding,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(st, entry.sha)



def _transition_to_absent(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: os.stat_result | None,
    index: Index,
) -> None:
    """Remove any type of entry."""
    if current_stat is None:
        return

    if stat.S_ISDIR(current_stat.st_mode):
        # Check if it's a submodule directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents and dir_contents == {git_file_name}:
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise
    else:
        _remove_file_with_readonly_handling(full_path)

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    _remove_empty_parents(
        full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    )



def detect_case_only_renames(
    changes: Sequence["TreeChange"],
    config: "Config",
) -> list["TreeChange"]:
    """Detect and transform case-only renames in a list of tree changes.

    This function identifies file renames that only differ in case (e.g.,
    README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
    CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
    based on the repository configuration.

    Args:
        changes: List of TreeChange objects representing file changes
        config: Repository configuration object

    Returns:
        New list of TreeChange objects with case-only renames converted to CHANGE_RENAME
    """
    import logging

    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        TreeChange,
    )

    # Build dictionaries of old and new paths with their normalized forms
    old_paths_normalized = {}
    new_paths_normalized = {}
    old_changes = {}  # Map from old path to change object
    new_changes = {}  # Map from new path to change object

    # Get the appropriate normalizer based on config
    normalize_func = get_path_element_normalizer(config)

    def normalize_path(path: bytes) -> bytes:
        """Normalize entire path using element normalization."""
        return b"/".join(normalize_func(part) for part in path.split(b"/"))

    # Pre-normalize all paths once to avoid repeated normalization
    for change in changes:
        # Treat RENAME like DELETE + ADD for case-only detection, so both
        # record their old path here.
        if change.type in (CHANGE_DELETE, CHANGE_RENAME) and change.old:
            assert change.old.path is not None
            try:
                normalized = normalize_path(change.old.path)
            except UnicodeDecodeError:
                logging.warning(
                    "Skipping case-only rename detection for path with invalid UTF-8: %r",
                    change.old.path,
                )
            else:
                old_paths_normalized[normalized] = change.old.path
                old_changes[change.old.path] = change

        if (
            change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
            and change.new
        ):
            assert change.new.path is not None
            try:
                normalized = normalize_path(change.new.path)
            except UnicodeDecodeError:
                logging.warning(
                    "Skipping case-only rename detection for path with invalid UTF-8: %r",
                    change.new.path,
                )
            else:
                new_paths_normalized[normalized] = change.new.path
                new_changes[change.new.path] = change

    # Find case-only renames and transform changes
    case_only_renames = set()
    new_rename_changes = []

    for norm_path, old_path in old_paths_normalized.items():
        if norm_path in new_paths_normalized:
            new_path = new_paths_normalized[norm_path]
            if old_path != new_path:
                # Found a case-only rename: replace the DELETE + ADD/MODIFY
                # pair with a single CHANGE_RENAME that keeps the old entry
                # from the delete and the new entry from the add/modify.
                old_change = old_changes[old_path]
                new_change = new_changes[new_path]

                rename_change = TreeChange(
                    CHANGE_RENAME, old_change.old, new_change.new
                )
                new_rename_changes.append(rename_change)

                # Mark the old changes for removal
                case_only_renames.add(old_change)
                case_only_renames.add(new_change)

    # Return new list with original ADD/DELETE changes replaced by renames
    result = [change for change in changes if change not in case_only_renames]
    result.extend(new_rename_changes)
    return result
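

# Illustrative sketch (not part of dulwich's API): diff two trees and fold
# case-only renames before applying the changes, assuming ``old_tree`` and
# ``new_tree`` are tree SHAs reachable in the repository's object store.
def _example_changes_for_checkout(
    repo: "Repo", old_tree: bytes, new_tree: bytes
) -> list["TreeChange"]:
    from .diff_tree import tree_changes

    changes = list(tree_changes(repo.object_store, old_tree, new_tree))
    return detect_case_only_renames(changes, repo.get_config())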



def update_working_tree(
    repo: "Repo",
    old_tree_id: bytes | None,
    new_tree_id: bytes,
    change_iterator: Iterator["TreeChange"],
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] | None = None,
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
    force_remove_untracked: bool = False,
    blob_normalizer: "FilterBlobNormalizer | None" = None,
    tree_encoding: str = "utf-8",
    allow_overwrite_modified: bool = False,
) -> None:
    """Update the working tree and index to match a new tree.

    This function handles:
    - Adding new files
    - Updating modified files
    - Removing deleted files
    - Cleaning up empty directories

    Args:
        repo: Repository object
        old_tree_id: SHA of the tree before the update
        new_tree_id: SHA of the tree to update to
        change_iterator: Iterator of TreeChange objects to apply
        honor_filemode: An optional flag to honor core.filemode setting
        validate_path_element: Function to validate path elements to check out
        symlink_fn: Function to use for creating symlinks
        force_remove_untracked: If True, remove files that exist in working
            directory but not in target tree, even if old_tree_id is None
        blob_normalizer: An optional BlobNormalizer to use for converting line
            endings when writing blobs to the working directory.
        tree_encoding: Encoding used for tree paths (default: utf-8)
        allow_overwrite_modified: If False, raise an error when attempting to
            overwrite files that have been modified compared to old_tree_id
    """
    if validate_path_element is None:
        validate_path_element = validate_path_element_default

    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        CHANGE_UNCHANGED,
    )

    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    index = repo.open_index()

    # Convert iterator to list since we need multiple passes
    changes = list(change_iterator)

    # Transform case-only renames on case-insensitive filesystems
    import platform

    default_ignore_case = platform.system() in ("Windows", "Darwin")
    config = repo.get_config()
    ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)

    if ignore_case:
        changes = detect_case_only_renames(changes, config)


    # Check for path conflicts where files need to become directories
    paths_becoming_dirs = set()
    for change in changes:
        if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
            assert change.new is not None
            path = change.new.path
            assert path is not None
            if b"/" in path:  # This is a file inside a directory
                # Check if any parent path exists as a file in the old tree or changes
                parts = path.split(b"/")
                for i in range(1, len(parts)):
                    parent = b"/".join(parts[:i])
                    # See if this parent path is being deleted (was a file, becoming a dir)
                    for other_change in changes:
                        if (
                            other_change.type == CHANGE_DELETE
                            and other_change.old
                            and other_change.old.path == parent
                        ):
                            paths_becoming_dirs.add(parent)

    # Check if any path that needs to become a directory has been modified
    for path in paths_becoming_dirs:
        full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
        try:
            current_stat = os.lstat(full_path)
        except FileNotFoundError:
            continue  # File doesn't exist, nothing to check
        except OSError as e:
            raise OSError(
                f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
            ) from e

        if stat.S_ISREG(current_stat.st_mode):
            # Find the old entry for this path
            old_change = None
            for change in changes:
                if (
                    change.type == CHANGE_DELETE
                    and change.old
                    and change.old.path == path
                ):
                    old_change = change
                    break

            if old_change:
                # Check if file has been modified
                assert old_change.old is not None
                assert (
                    old_change.old.sha is not None and old_change.old.mode is not None
                )
                file_matches = _check_file_matches(
                    repo.object_store,
                    full_path,
                    old_change.old.sha,
                    old_change.old.mode,
                    current_stat,
                    honor_filemode,
                    blob_normalizer,
                    path,
                )
                if not file_matches:
                    raise OSError(
                        f"Cannot replace modified file with directory: {path!r}"
                    )


    # Check for uncommitted modifications before making any changes
    if not allow_overwrite_modified and old_tree_id:
        for change in changes:
            # Only check files that are being modified or deleted
            if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
                path = change.old.path
                assert path is not None
                if path.startswith(b".git") or not validate_path(
                    path, validate_path_element
                ):
                    continue

                full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
                try:
                    current_stat = os.lstat(full_path)
                except FileNotFoundError:
                    continue  # File doesn't exist, nothing to check
                except OSError as e:
                    raise OSError(
                        f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                    ) from e

                if stat.S_ISREG(current_stat.st_mode):
                    # Check if working tree file differs from old tree
                    assert change.old.sha is not None and change.old.mode is not None
                    file_matches = _check_file_matches(
                        repo.object_store,
                        full_path,
                        change.old.sha,
                        change.old.mode,
                        current_stat,
                        honor_filemode,
                        blob_normalizer,
                        path,
                    )
                    if not file_matches:
                        from .errors import WorkingTreeModifiedError

                        raise WorkingTreeModifiedError(
                            f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
                            f"would be overwritten by checkout. "
                            f"Please commit your changes or stash them before you switch branches."
                        )


    # Apply the changes
    for change in changes:
        if change.type in (CHANGE_DELETE, CHANGE_RENAME):
            # Remove file/directory
            assert change.old is not None and change.old.path is not None
            path = change.old.path
            if path.startswith(b".git") or not validate_path(
                path, validate_path_element
            ):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                delete_stat: os.stat_result | None = os.lstat(full_path)
            except FileNotFoundError:
                delete_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            _transition_to_absent(repo, path, full_path, delete_stat, index)

        if change.type in (
            CHANGE_ADD,
            CHANGE_MODIFY,
            CHANGE_UNCHANGED,
            CHANGE_COPY,
            CHANGE_RENAME,
        ):
            # Add or modify file
            assert (
                change.new is not None
                and change.new.path is not None
                and change.new.mode is not None
            )
            path = change.new.path
            if path.startswith(b".git") or not validate_path(
                path, validate_path_element
            ):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                modify_stat: os.stat_result | None = os.lstat(full_path)
            except FileNotFoundError:
                modify_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            if S_ISGITLINK(change.new.mode):
                _transition_to_submodule(
                    repo, path, full_path, modify_stat, change.new, index
                )
            else:
                _transition_to_file(
                    repo.object_store,
                    path,
                    full_path,
                    modify_stat,
                    change.new,
                    index,
                    honor_filemode,
                    symlink_fn,
                    blob_normalizer,
                    tree_encoding,
                )

    index.write()
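

# Minimal branch-switch sketch (illustrative only): apply the diff between
# the current HEAD tree and ``target_tree`` to the working copy, refusing to
# overwrite local modifications.
def _example_switch_to_tree(repo: "Repo", target_tree: bytes) -> None:
    from .diff_tree import tree_changes

    old_tree = repo[repo.head()].tree
    update_working_tree(
        repo,
        old_tree,
        target_tree,
        tree_changes(repo.object_store, old_tree, target_tree),
    )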



def _stat_matches_entry(st: os.stat_result, entry: IndexEntry) -> bool:
    """Check if filesystem stat matches index entry stat.

    This is used to determine if a file might have changed without reading its
    content. Git uses this optimization to avoid expensive filter operations
    on unchanged files.

    Args:
        st: Filesystem stat result
        entry: Index entry to compare against
    Returns: True if stat matches and file is likely unchanged
    """
    # Get entry mtime
    if isinstance(entry.mtime, tuple):
        entry_mtime_sec = entry.mtime[0]
    else:
        entry_mtime_sec = int(entry.mtime)

    # Compare modification time (seconds only for now)
    # Note: We use int() to compare only seconds, as nanosecond precision
    # can vary across filesystems
    if int(st.st_mtime) != entry_mtime_sec:
        return False

    # Compare file size
    if st.st_size != entry.size:
        return False

    # If both mtime and size match, file is likely unchanged
    return True



def _check_entry_for_changes(
    tree_path: bytes,
    entry: IndexEntry | ConflictedIndexEntry,
    root_path: bytes,
    filter_blob_callback: Callable[[bytes, bytes], bytes] | None = None,
) -> bytes | None:
    """Check a single index entry for changes.

    Args:
        tree_path: Path in the tree
        entry: Index entry to check
        root_path: Root filesystem path
        filter_blob_callback: Optional callback to filter blobs
    Returns: tree_path if changed, None otherwise
    """
    if isinstance(entry, ConflictedIndexEntry):
        # Conflicted files are always unstaged
        return tree_path

    full_path = _tree_to_fs_path(root_path, tree_path)
    try:
        st = os.lstat(full_path)
        if stat.S_ISDIR(st.st_mode):
            if _has_directory_changed(tree_path, entry):
                return tree_path
            return None

        if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
            return None

        # Optimization: If stat matches index entry (mtime and size unchanged),
        # we can skip reading and filtering the file entirely. This is a significant
        # performance improvement for repositories with many unchanged files.
        # Even with filters (e.g., LFS), if the file hasn't been modified (stat unchanged),
        # the filter output would be the same, so we can safely skip the expensive
        # filter operation. This addresses performance issues with LFS repositories
        # where filter operations can be very slow.
        if _stat_matches_entry(st, entry):
            return None

        blob = blob_from_path_and_stat(full_path, st)

        if filter_blob_callback is not None:
            blob.data = filter_blob_callback(blob.data, tree_path)
    except FileNotFoundError:
        # The file was removed, so we assume that counts as
        # different from whatever file used to exist.
        return tree_path
    else:
        if blob.id != entry.sha:
            return tree_path
        return None



def get_unstaged_changes(
    index: Index,
    root_path: str | bytes,
    filter_blob_callback: Callable[..., Any] | None = None,
    preload_index: bool = False,
) -> Generator[bytes, None, None]:
    """Walk through an index and check for differences against working tree.

    Args:
        index: index to check
        root_path: path in which to find files
        filter_blob_callback: Optional callback to filter blobs
        preload_index: If True, use parallel threads to check files (requires threading support)
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index check the sha1 & ensure not staged
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    if preload_index:
        # Use parallel processing for better performance on slow filesystems
        try:
            import multiprocessing
            from concurrent.futures import ThreadPoolExecutor
        except ImportError:
            # If threading is not available, fall back to serial processing
            preload_index = False
        else:
            # Collect all entries first
            entries = list(index.iteritems())

            # Use number of CPUs but cap at 8 threads to avoid overhead
            num_workers = min(multiprocessing.cpu_count(), 8)

            # Process entries in parallel
            with ThreadPoolExecutor(max_workers=num_workers) as executor:
                # Submit all tasks
                futures = [
                    executor.submit(
                        _check_entry_for_changes,
                        tree_path,
                        entry,
                        root_path,
                        filter_blob_callback,
                    )
                    for tree_path, entry in entries
                ]

                # Yield results in submission order (blocking on each future)
                for future in futures:
                    result = future.result()
                    if result is not None:
                        yield result

    if not preload_index:
        # Serial processing
        for tree_path, entry in index.iteritems():
            result = _check_entry_for_changes(
                tree_path, entry, root_path, filter_blob_callback
            )
            if result is not None:
                yield result
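

# Status sketch (illustrative only): list paths whose working-tree contents
# differ from what the index records, assuming ``repo`` is an open Repo.
def _example_list_unstaged(repo: "Repo") -> list[bytes]:
    return list(get_unstaged_changes(repo.open_index(), repo.path))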



def _tree_to_fs_path(
    root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
) -> bytes:
    """Convert a git tree path to a file system path.

    Args:
        root_path: Root filesystem path
        tree_path: Git tree path as bytes (encoded with tree_encoding)
        tree_encoding: Encoding used for tree paths (default: utf-8)

    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    if os_sep_bytes != b"/":
        sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
    else:
        sep_corrected_path = tree_path

    # On Windows, we need to handle tree path encoding properly
    if sys.platform == "win32":
        # Decode from tree encoding, then re-encode for filesystem
        try:
            tree_path_str = sep_corrected_path.decode(tree_encoding)
            sep_corrected_path = os.fsencode(tree_path_str)
        except UnicodeDecodeError:
            # If decoding fails, use the original bytes
            pass

    return os.path.join(root_path, sep_corrected_path)


def _fs_to_tree_path(fs_path: str | bytes, tree_encoding: str = "utf-8") -> bytes:
    """Convert a file system path to a git tree path.

    Args:
        fs_path: File system path.
        tree_encoding: Encoding to use for tree paths (default: utf-8)

    Returns: Git tree path as bytes (encoded with tree_encoding)
    """
    if not isinstance(fs_path, bytes):
        fs_path_bytes = os.fsencode(fs_path)
    else:
        fs_path_bytes = fs_path

    # On Windows, we need to ensure tree paths are properly encoded
    if sys.platform == "win32":
        try:
            # Decode from filesystem encoding, then re-encode with tree encoding
            fs_path_str = os.fsdecode(fs_path_bytes)
            fs_path_bytes = fs_path_str.encode(tree_encoding)
        except UnicodeDecodeError:
            # If filesystem decoding fails, use the original bytes
            pass

    if os_sep_bytes != b"/":
        tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
    else:
        tree_path = fs_path_bytes
    return tree_path
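

# Round-trip sketch (illustrative only): tree paths always use "/" while
# filesystem paths use os.sep, so converting there and back is lossless for
# plain ASCII paths.
def _example_path_roundtrip(root: bytes) -> None:
    fs_path = _tree_to_fs_path(root, b"docs/readme.md")
    relative = os.path.relpath(fs_path, root)
    assert _fs_to_tree_path(relative) == b"docs/readme.md"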



def index_entry_from_directory(st: os.stat_result, path: bytes) -> IndexEntry | None:
    """Create an index entry for a directory.

    This is only used for submodules (directories containing .git).

    Args:
        st: Stat result for the directory
        path: Path to the directory

    Returns:
        IndexEntry for a submodule, or None if not a submodule
    """
    if os.path.exists(os.path.join(path, b".git")):
        head = read_submodule_head(path)
        if head is None:
            return None
        return index_entry_from_stat(st, head, mode=S_IFGITLINK)
    return None



def index_entry_from_path(
    path: bytes, object_store: ObjectContainer | None = None
) -> IndexEntry | None:
    """Create an index entry from a filesystem path.

    This returns an index entry for files, symlinks and tree
    references; for directories and non-existent files it
    returns None.

    Args:
        path: Path to create an index entry for
        object_store: Optional object store to
            save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    if stat.S_ISDIR(st.st_mode):
        return index_entry_from_directory(st, path)

    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id)

    return None
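

# Staging sketch (illustrative only): stat a working-tree file, store its
# blob, and return the entry that would go into the index. ``repo`` is an
# assumed open Repo and ``fs_path`` an absolute path as bytes.
def _example_stage_one(repo: "Repo", fs_path: bytes) -> IndexEntry | None:
    return index_entry_from_path(fs_path, object_store=repo.object_store)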



def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: ObjectContainer | None = None,
) -> Iterator[tuple[bytes, IndexEntry | None]]:
    """Iterate over current versions of index entries on disk.

    Args:
        paths: Paths to iterate over
        root_path: Root path to access from
        object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            entry = None
        yield path, entry



def iter_fresh_objects(
    paths: Iterable[bytes],
    root_path: bytes,
    include_deleted: bool = False,
    object_store: ObjectContainer | None = None,
) -> Iterator[tuple[bytes, bytes | None, int | None]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
        paths: Paths to check
        root_path: Root path to access from
        include_deleted: Include deleted entries with sha and
            mode set to None
        object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is None:
            if include_deleted:
                yield path, None, None
        else:
            yield path, entry.sha, cleanup_mode(entry.mode)
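

# Illustrative sketch: hash every listed working-tree file into the object
# store and report (path, sha, mode) triples, including deletions, assuming
# ``repo`` is an open Repo.
def _example_snapshot_objects(
    repo: "Repo", paths: Iterable[bytes]
) -> list[tuple[bytes, bytes | None, int | None]]:
    root = os.fsencode(repo.path)
    return list(
        iter_fresh_objects(
            paths, root, include_deleted=True, object_store=repo.object_store
        )
    )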



def refresh_index(index: Index, root_path: bytes) -> None:
    """Refresh the contents of an index.

    This is the equivalent of the staging step performed by
    'git commit -a': every tracked file is re-read from the working tree.

    Args:
        index: Index to update
        root_path: Root filesystem path
    """
    for path, entry in iter_fresh_entries(index, root_path):
        if entry:
            index[path] = entry
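

# Sketch (illustrative only): re-stage all tracked files from the working
# tree and persist the refreshed index, assuming ``repo`` is an open Repo.
def _example_refresh_all(repo: "Repo") -> None:
    index = repo.open_index()
    refresh_index(index, os.fsencode(repo.path))
    index.write()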



class locked_index:
    """Lock the index while making modifications.

    Works as a context manager.
    """

    _file: "_GitFile"

    def __init__(self, path: bytes | str) -> None:
        """Initialize locked_index."""
        self._path = path

    def __enter__(self) -> Index:
        """Enter context manager and lock index."""
        f = GitFile(self._path, "wb")
        self._file = f
        self._index = Index(self._path)
        return self._index

    def __exit__(
        self,
        exc_type: type | None,
        exc_value: BaseException | None,
        traceback: types.TracebackType | None,
    ) -> None:
        """Exit context manager and unlock index."""
        if exc_type is not None:
            self._file.abort()
            return
        try:
            f = SHA1Writer(self._file)
            write_index_dict(f, self._index._byname)
        except BaseException:
            self._file.abort()
        else:
            f.close()
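

# Usage sketch (illustrative only): mutate the index under the lock file;
# the new index is written atomically on success and discarded on error.
# ``repo`` is an assumed open Repo and ``entry`` an existing IndexEntry.
def _example_locked_write(repo: "Repo", path: bytes, entry: IndexEntry) -> None:
    with locked_index(repo.index_path()) as index:
        index[path] = entry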