Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/index.py: 31%

1183 statements

# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence, Set
from dataclasses import dataclass
from enum import Enum
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Optional,
    Union,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .filters import FilterBlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    TreeEntry,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# Type alias for recursive tree structure used in commit_tree
if sys.version_info >= (3, 10):
    TreeDict = dict[bytes, Union["TreeDict", tuple[int, bytes]]]
else:
    TreeDict = dict[bytes, Any]

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"
SDIR_EXTENSION = b"sdir"  # Sparse directory extension


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
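
# Illustrative round-trip for the two varint helpers above (7 data bits per
# byte, high bit = continuation, least-significant group first):
#
#     >>> _encode_varint(300)
#     b'\xac\x02'
#     >>> _decode_varint(b'\xac\x02')
#     (300, 2)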


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"
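
# Illustrative sketch of the v4 path-compression round-trip, using
# _compress_path above and _decompress_path defined just below:
#
#     >>> _compress_path(b"dir/b.txt", b"dir/a.txt")
#     b'\x05b.txt\x00'
#     >>> _decompress_path(b'\x05b.txt\x00', 0, b"dir/a.txt")
#     (b'dir/b.txt', 7)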


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
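
# Illustrative example: the stage lives in bits 12-13 of the flags word, so a
# flags value of 0x2000 decodes to stage 2 (using the masks defined above):
#
#     >>> Stage((0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
#     <Stage.MERGE_CONFLICT_THIS: 2>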


@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    def is_sparse_dir(self) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and self.name.endswith(b"/")
        )


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        elif signature == SDIR_EXTENSION:
            return SparseDirExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)


class SparseDirExtension(IndexExtension):
    """Sparse directory extension.

    This extension indicates that the index contains sparse directory entries.
    Tools that don't understand sparse index should avoid interacting with
    the index when this extension is present.

    The extension data is empty - its presence is the signal.
    """

    def __init__(self) -> None:
        """Initialize SparseDirExtension."""
        super().__init__(SDIR_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "SparseDirExtension":
        """Parse SparseDirExtension from bytes.

        Args:
          data: Raw bytes to parse (should be empty)

        Returns:
          SparseDirExtension instance
        """
        return cls()

    def to_bytes(self) -> bytes:
        """Serialize SparseDirExtension to bytes.

        Returns:
          Empty bytes (extension presence is the signal)
        """
        return b""


@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED

    def is_sparse_dir(self, name: bytes) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Args:
          name: The path name for this entry (IndexEntry doesn't store name)

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and name.endswith(b"/")
        )


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
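
# Illustrative behaviour of the two path helpers above:
#
#     >>> pathsplit(b"foo/bar/baz.txt")
#     (b'foo/bar', b'baz.txt')
#     >>> pathsplit(b"baz.txt")
#     (b'', b'baz.txt')
#     >>> pathjoin(b"foo", b"", b"baz.txt")
#     b'foo/baz.txt'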


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: IO[bytes], t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
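
# Illustrative sketch: timestamps are stored as two big-endian 32-bit words
# (seconds, nanoseconds), so a float like 1700000000.5 is split like this:
#
#     >>> import io, struct
#     >>> buf = io.BytesIO()
#     >>> write_cache_time(buf, 1700000000.5)
#     >>> struct.unpack(">LL", buf.getvalue())
#     (1700000000, 500000000)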


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries


def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    Keys are paths; a path with entries at multiple stages maps to a
    single ConflictedIndexEntry holding the per-stage entries.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[Sequence[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # STEP 1: check if any extended_flags are set
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        # Force or bump the version to 3
        version = 3
    # The rest is unchanged, but you might insert a final check:
    if version < 3:
        # Double-check no extended flags appear
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Proceed with the existing code to write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)
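
# Illustrative round-trip through the serialization layer above, using an
# in-memory buffer (the metadata values and SHA here are arbitrary samples):
#
#     >>> import io
#     >>> entry = IndexEntry(
#     ...     ctime=0, mtime=0, dev=0, ino=0, mode=0o100644,
#     ...     uid=0, gid=0, size=6,
#     ...     sha=b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
#     ... )
#     >>> buf = io.BytesIO()
#     >>> write_index(buf, [entry.serialize(b"hello.txt", Stage.NORMAL)])
#     >>> buf.seek(0)
#     0
#     >>> [e.name for e in read_index(buf)]
#     [b'hello.txt']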


def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[Sequence[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are sorted by path and, within a path, by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
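
# Illustrative mapping: only the owner-executable bit of a regular file
# survives; everything else is normalized to a tree-storable mode:
#
#     >>> oct(cleanup_mode(0o100664))
#     '0o100644'
#     >>> oct(cleanup_mode(0o100775))
#     '0o100755'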


class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike[str]],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
          newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

    def is_sparse(self) -> bool:
        """Check if this index contains sparse directory entries.

        Returns:
          True if any sparse directory extension is present
        """
        return any(isinstance(ext, SparseDirExtension) for ext in self._extensions)

    def ensure_full_index(self, object_store: "BaseObjectStore") -> None:
        """Expand all sparse directory entries into full file entries.

        This converts a sparse index into a full index by recursively
        expanding any sparse directory entries into their constituent files.

        Args:
          object_store: Object store to read tree objects from

        Raises:
          KeyError: If a tree object referenced by a sparse dir entry doesn't exist
        """
        if not self.is_sparse():
            return

        # Find all sparse directory entries
        sparse_dirs = []
        for path, entry in list(self._byname.items()):
            if isinstance(entry, IndexEntry) and entry.is_sparse_dir(path):
                sparse_dirs.append((path, entry))

        # Expand each sparse directory
        for path, entry in sparse_dirs:
            # Remove the sparse directory entry
            del self._byname[path]

            # Get the tree object
            tree = object_store[entry.sha]
            if not isinstance(tree, Tree):
                raise ValueError(f"Sparse directory {path!r} points to non-tree object")

            # Recursively add all entries from the tree
            self._expand_tree(path.rstrip(b"/"), tree, object_store, entry)

        # Remove the sparse directory extension
        self._extensions = [
            ext for ext in self._extensions if not isinstance(ext, SparseDirExtension)
        ]

    def _expand_tree(
        self,
        prefix: bytes,
        tree: Tree,
        object_store: "BaseObjectStore",
        template_entry: IndexEntry,
    ) -> None:
        """Recursively expand a tree into index entries.

        Args:
          prefix: Path prefix for entries (without trailing slash)
          tree: Tree object to expand
          object_store: Object store to read nested trees from
          template_entry: Template entry to copy metadata from
        """
        for name, mode, sha in tree.items():
            if prefix:
                full_path = prefix + b"/" + name
            else:
                full_path = name

            if stat.S_ISDIR(mode):
                # Recursively expand subdirectories
                subtree = object_store[sha]
                if not isinstance(subtree, Tree):
                    raise ValueError(
                        f"Directory entry {full_path!r} points to non-tree object"
                    )
                self._expand_tree(full_path, subtree, object_store, template_entry)
            else:
                # Create an index entry for this file
                # Use the template entry for metadata but with the file's sha and mode
                new_entry = IndexEntry(
                    ctime=template_entry.ctime,
                    mtime=template_entry.mtime,
                    dev=template_entry.dev,
                    ino=template_entry.ino,
                    mode=mode,
                    uid=template_entry.uid,
                    gid=template_entry.gid,
                    size=0,  # Size is unknown from tree
                    sha=sha,
                    flags=0,
                    extended_flags=0,  # Don't copy skip-worktree flag
                )
                self._byname[full_path] = new_entry

    def convert_to_sparse(
        self,
        object_store: "BaseObjectStore",
        tree_sha: bytes,
        sparse_dirs: Set[bytes],
    ) -> None:
        """Convert full index entries to sparse directory entries.

        This collapses directories that are entirely outside the sparse
        checkout cone into single sparse directory entries.

        Args:
          object_store: Object store to read tree objects
          tree_sha: SHA of the tree (usually HEAD) to base sparse dirs on
          sparse_dirs: Set of directory paths (with trailing /) to collapse

        Raises:
          KeyError: If tree_sha or a subdirectory doesn't exist
        """
        if not sparse_dirs:
            return

        # Get the base tree
        tree = object_store[tree_sha]
        if not isinstance(tree, Tree):
            raise ValueError(f"tree_sha {tree_sha!r} is not a tree object")

        # For each sparse directory, find its tree SHA and create sparse entry
        for dir_path in sparse_dirs:
            dir_path_stripped = dir_path.rstrip(b"/")

            # Find the tree SHA for this directory
            subtree_sha = self._find_subtree_sha(tree, dir_path_stripped, object_store)
            if subtree_sha is None:
                # Directory doesn't exist in tree, skip it
                continue

            # Remove all entries under this directory
            entries_to_remove = [
                path
                for path in self._byname
                if path.startswith(dir_path) or path == dir_path_stripped
            ]
            for path in entries_to_remove:
                del self._byname[path]

            # Create a sparse directory entry
            # Use minimal metadata since it's not a real file
            sparse_entry = IndexEntry(
                ctime=0,
                mtime=0,
                dev=0,
                ino=0,
                mode=stat.S_IFDIR,
                uid=0,
                gid=0,
                size=0,
                sha=subtree_sha,
                flags=0,
                extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
            )
            self._byname[dir_path] = sparse_entry

        # Add sparse directory extension if not present
        if not self.is_sparse():
            self._extensions.append(SparseDirExtension())

    def _find_subtree_sha(
        self,
        tree: Tree,
        path: bytes,
        object_store: "BaseObjectStore",
    ) -> Optional[bytes]:
        """Find the SHA of a subtree at a given path.

        Args:
          tree: Root tree object to search in
          path: Path to the subdirectory (no trailing slash)
          object_store: Object store to read nested trees from

        Returns:
          SHA of the subtree, or None if path doesn't exist
        """
        if not path:
            return tree.id

        parts = path.split(b"/")
        current_tree = tree

        for part in parts:
            # Look for this part in the current tree
            try:
                mode, sha = current_tree[part]
            except KeyError:
                return None

            if not stat.S_ISDIR(mode):
                # Path component is a file, not a directory
                return None

            # Load the next tree
            obj = object_store[sha]
            if not isinstance(obj, Tree):
                return None
            current_tree = obj

        return current_tree.id


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: TreeDict = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
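
# Illustrative sketch: committing a one-file tree through an in-memory store.
# MemoryObjectStore and Blob come from dulwich itself; the path and contents
# are arbitrary sample data:
#
#     >>> from dulwich.object_store import MemoryObjectStore
#     >>> store = MemoryObjectStore()
#     >>> blob = Blob.from_string(b"hello\n")
#     >>> store.add_object(blob)
#     >>> tree_id = commit_tree(store, [(b"docs/hello.txt", blob.id, 0o100644)])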


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
      (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )
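
# Illustrative usage, assuming a file on disk whose blob SHA is already known
# (the path and SHA here are arbitrary sample values):
#
#     >>> st = os.lstat("hello.txt")
#     >>> entry = index_entry_from_stat(
#     ...     st, b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"
#     ... )
#     >>> entry.mode == cleanup_mode(st.st_mode)
#     True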


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Windows-specific error for symlink creation failures.

        This error is raised when symlink creation fails on Windows,
        typically due to lack of developer mode or administrator privileges.
        """

        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        """Create a symbolic link on Windows with better error handling.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[
        Callable[
            [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]],
            None,
        ]
    ] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")

1750 

1751 

1752def _normalize_path_element_default(element: bytes) -> bytes: 

1753 """Normalize path element for default case-insensitive comparison.""" 

1754 return element.lower() 

1755 

1756 

1757def _normalize_path_element_ntfs(element: bytes) -> bytes: 

1758 """Normalize path element for NTFS filesystem.""" 

1759 return element.rstrip(b". ").lower() 

1760 

1761 

1762def _normalize_path_element_hfs(element: bytes) -> bytes: 

1763 """Normalize path element for HFS+ filesystem.""" 

1764 import unicodedata 

1765 

1766 # Decode to Unicode (let UnicodeDecodeError bubble up) 

1767 element_str = element.decode("utf-8", errors="strict") 

1768 

1769 # Remove HFS+ ignorable characters 

1770 filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS) 

1771 # Normalize to NFD 

1772 normalized = unicodedata.normalize("NFD", filtered) 

1773 return normalized.lower().encode("utf-8", errors="strict") 

1774 

1775 

1776def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]: 

1777 """Get the appropriate path element normalization function based on config. 

1778 

1779 Args: 

1780 config: Repository configuration object 

1781 

1782 Returns: 

1783 Function that normalizes path elements for the configured filesystem 

1784 """ 

1785 import os 

1786 import sys 

1787 

1788 if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"): 

1789 return _normalize_path_element_ntfs 

1790 elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"): 

1791 return _normalize_path_element_hfs 

1792 else: 

1793 return _normalize_path_element_default 

1794 

1795 

1796def validate_path_element_default(element: bytes) -> bool: 

1797 """Validate a path element using default rules. 

1798 

1799 Args: 

1800 element: Path element to validate 

1801 

1802 Returns: 

1803 True if path element is valid, False otherwise 

1804 """ 

1805 return _normalize_path_element_default(element) not in INVALID_DOTNAMES 

1806 

1807 

1808def validate_path_element_ntfs(element: bytes) -> bool: 

1809 """Validate a path element using NTFS filesystem rules. 

1810 

1811 Args: 

1812 element: Path element to validate 

1813 

1814 Returns: 

1815 True if path element is valid for NTFS, False otherwise 

1816 """ 

1817 normalized = _normalize_path_element_ntfs(element) 

1818 if normalized in INVALID_DOTNAMES: 

1819 return False 

1820 if normalized == b"git~1": 

1821 return False 

1822 return True 

1823 

1824 

1825# HFS+ ignorable Unicode codepoints (from Git's utf8.c) 

1826HFS_IGNORABLE_CHARS = { 

1827 0x200C, # ZERO WIDTH NON-JOINER 

1828 0x200D, # ZERO WIDTH JOINER 

1829 0x200E, # LEFT-TO-RIGHT MARK 

1830 0x200F, # RIGHT-TO-LEFT MARK 

1831 0x202A, # LEFT-TO-RIGHT EMBEDDING 

1832 0x202B, # RIGHT-TO-LEFT EMBEDDING 

1833 0x202C, # POP DIRECTIONAL FORMATTING 

1834 0x202D, # LEFT-TO-RIGHT OVERRIDE 

1835 0x202E, # RIGHT-TO-LEFT OVERRIDE 

1836 0x206A, # INHIBIT SYMMETRIC SWAPPING 

1837 0x206B, # ACTIVATE SYMMETRIC SWAPPING 

1838 0x206C, # INHIBIT ARABIC FORM SHAPING 

1839 0x206D, # ACTIVATE ARABIC FORM SHAPING 

1840 0x206E, # NATIONAL DIGIT SHAPES 

1841 0x206F, # NOMINAL DIGIT SHAPES 

1842 0xFEFF, # ZERO WIDTH NO-BREAK SPACE 

1843} 

1844 

1845 

1846def validate_path_element_hfs(element: bytes) -> bool: 

1847 """Validate path element for HFS+ filesystem. 

1848 

1849 Equivalent to Git's is_hfs_dotgit and related checks. 

1850 Uses NFD normalization and ignores HFS+ ignorable characters. 

1851 """ 

1852 try: 

1853 normalized = _normalize_path_element_hfs(element) 

1854 except UnicodeDecodeError: 

1855 # Malformed UTF-8 - be conservative and reject 

1856 return False 

1857 

1858 # Check against invalid names 

1859 if normalized in INVALID_DOTNAMES: 

1860 return False 

1861 

1862 # Also check for 8.3 short name 

1863 if normalized == b"git~1": 

1864 return False 

1865 

1866 return True 

1867 

1868 

1869def validate_path( 

1870 path: bytes, 

1871 element_validator: Callable[[bytes], bool] = validate_path_element_default, 

1872) -> bool: 

1873 """Default path validator that just checks for .git/.""" 

1874 parts = path.split(b"/") 

1875 for p in parts: 

1876 if not element_validator(p): 

1877 return False 

1878 # every element passed validation

1879 return True

1880 
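A short usage sketch (illustrative paths only), showing how the element validators combine with validate_path:

assert validate_path(b"src/main.py")
# Any element that normalizes to .git, "." or ".." rejects the path.
assert not validate_path(b".git/config")
assert not validate_path(b"a/../b")
# The NTFS validator additionally rejects the 8.3 alias of .git.
assert not validate_path(b"GIT~1/config", validate_path_element_ntfs)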

1881 

1882def build_index_from_tree( 

1883 root_path: Union[str, bytes], 

1884 index_path: Union[str, bytes], 

1885 object_store: ObjectContainer, 

1886 tree_id: bytes, 

1887 honor_filemode: bool = True, 

1888 validate_path_element: Callable[[bytes], bool] = validate_path_element_default, 

1889 symlink_fn: Optional[ 

1890 Callable[ 

1891 [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]], 

1892 None, 

1893 ] 

1894 ] = None, 

1895 blob_normalizer: Optional["FilterBlobNormalizer"] = None, 

1896 tree_encoding: str = "utf-8", 

1897) -> None: 

1898 """Generate and materialize index from a tree. 

1899 

1900 Args: 

1901 tree_id: Tree to materialize 

1902 root_path: Target dir for materialized index files 

1903 index_path: Target path for generated index 

1904 object_store: Non-empty object store holding tree contents 

1905 honor_filemode: An optional flag to honor the core.filemode config

1906 setting; defaults to True, in which case the executable bit is applied

1907 validate_path_element: Function to validate path elements to check 

1908 out; default just refuses .git and .. directories. 

1909 symlink_fn: Function to use for creating symlinks 

1910 blob_normalizer: An optional BlobNormalizer to use for converting line 

1911 endings when writing blobs to the working directory. 

1912 tree_encoding: Encoding used for tree paths (default: utf-8) 

1913 

1914 Note: the existing index is wiped and contents are not merged

1915 into an existing working directory. Suitable only for fresh clones.

1916 """ 

1917 index = Index(index_path, read=False) 

1918 if not isinstance(root_path, bytes): 

1919 root_path = os.fsencode(root_path) 

1920 

1921 for entry in iter_tree_contents(object_store, tree_id): 

1922 assert ( 

1923 entry.path is not None and entry.mode is not None and entry.sha is not None 

1924 ) 

1925 if not validate_path(entry.path, validate_path_element): 

1926 continue 

1927 full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding) 

1928 

1929 if not os.path.exists(os.path.dirname(full_path)): 

1930 os.makedirs(os.path.dirname(full_path)) 

1931 

1932 # TODO(jelmer): Merge new index into working tree 

1933 if S_ISGITLINK(entry.mode): 

1934 if not os.path.isdir(full_path): 

1935 os.mkdir(full_path) 

1936 st = os.lstat(full_path) 

1937 # TODO(jelmer): record and return submodule paths 

1938 else: 

1939 obj = object_store[entry.sha] 

1940 assert isinstance(obj, Blob) 

1941 # Apply blob normalization for checkout if normalizer is provided 

1942 if blob_normalizer is not None: 

1943 obj = blob_normalizer.checkout_normalize(obj, entry.path) 

1944 st = build_file_from_blob( 

1945 obj, 

1946 entry.mode, 

1947 full_path, 

1948 honor_filemode=honor_filemode, 

1949 tree_encoding=tree_encoding, 

1950 symlink_fn=symlink_fn, 

1951 ) 

1952 

1953 # Add file to index 

1954 if not honor_filemode or S_ISGITLINK(entry.mode): 

1955 # we can not use tuple slicing to build a new tuple, 

1956 # because on windows that will convert the times to 

1957 # longs, which causes errors further along 

1958 st_tuple = ( 

1959 entry.mode, 

1960 st.st_ino, 

1961 st.st_dev, 

1962 st.st_nlink, 

1963 st.st_uid, 

1964 st.st_gid, 

1965 st.st_size, 

1966 st.st_atime, 

1967 st.st_mtime, 

1968 st.st_ctime, 

1969 ) 

1970 st = st.__class__(st_tuple) 

1971 # default to a stage 0 index entry (normal) 

1972 # when reading from the filesystem 

1973 index[entry.path] = index_entry_from_stat(st, entry.sha) 

1974 

1975 index.write() 

1976 
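A hedged usage sketch for a fresh checkout; the repository path is hypothetical and Repo comes from dulwich.repo:

from dulwich.repo import Repo

repo = Repo("/path/to/fresh/clone")  # hypothetical path
tree_id = repo[repo.head()].tree
# Materialize the tree into the working directory and write a new index.
build_index_from_tree(
    repo.path, repo.index_path(), repo.object_store, tree_id
)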

1977 

1978def blob_from_path_and_mode( 

1979 fs_path: bytes, mode: int, tree_encoding: str = "utf-8" 

1980) -> Blob: 

1981 """Create a blob from a path and a stat object. 

1982 

1983 Args: 

1984 fs_path: Full file system path to file 

1985 mode: File mode 

1986 tree_encoding: Encoding to use for tree contents 

1987 Returns: A `Blob` object 

1988 """ 

1989 assert isinstance(fs_path, bytes) 

1990 blob = Blob() 

1991 if stat.S_ISLNK(mode): 

1992 if sys.platform == "win32": 

1993 # os.readlink on Windows is safest with a str path.

1994 blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding) 

1995 else: 

1996 blob.data = os.readlink(fs_path) 

1997 else: 

1998 with open(fs_path, "rb") as f: 

1999 blob.data = f.read() 

2000 return blob 

2001 

2002 

2003def blob_from_path_and_stat( 

2004 fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8" 

2005) -> Blob: 

2006 """Create a blob from a path and a stat object. 

2007 

2008 Args: 

2009 fs_path: Full file system path to file 

2010 st: A stat object 

2011 tree_encoding: Encoding to use for tree contents 

2012 Returns: A `Blob` object 

2013 """ 

2014 return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding) 

2015 
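For example, hashing a working-tree file without touching the object store (a sketch; the path is illustrative):

import os

fs_path = b"/tmp/example.txt"  # hypothetical file
st = os.lstat(fs_path)
blob = blob_from_path_and_stat(fs_path, st)
print(blob.id)  # hex blob SHA, as `git hash-object` would report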

2016 

2017def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]: 

2018 """Read the head commit of a submodule. 

2019 

2020 Args: 

2021 path: path to the submodule 

2022 Returns: HEAD sha, or None if the path is not a valid repository or has no HEAD

2023 """ 

2024 from .errors import NotGitRepository 

2025 from .repo import Repo 

2026 

2027 # Repo currently expects a "str", so decode if necessary. 

2028 # TODO(jelmer): Perhaps move this into Repo() ? 

2029 if not isinstance(path, str): 

2030 path = os.fsdecode(path) 

2031 try: 

2032 repo = Repo(path) 

2033 except NotGitRepository: 

2034 return None 

2035 try: 

2036 return repo.head() 

2037 except KeyError: 

2038 return None 

2039 

2040 

2041def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool: 

2042 """Check if a directory has changed after getting an error. 

2043 

2044 When creating a blob from a path raises an error, call this

2045 function. It checks whether the path is a directory. If it is a

2046 directory containing a submodule, compare the submodule head with the

2047 index entry to see whether it has changed. A plain directory counts

2048 as changed, since Git tracked a file there, not a directory.

2049 

2050 Returns True if the given path should be considered changed and

2051 False otherwise.

2052 """ 

2053 # This is actually a directory 

2054 if os.path.exists(os.path.join(tree_path, b".git")): 

2055 # Submodule 

2056 head = read_submodule_head(tree_path) 

2057 if entry.sha != head: 

2058 return True 

2059 else: 

2060 # The file was changed to a directory, so consider it removed. 

2061 return True 

2062 

2063 return False 

2064 

2065 

2066os_sep_bytes = os.sep.encode("ascii") 

2067 

2068 

2069def _ensure_parent_dir_exists(full_path: bytes) -> None: 

2070 """Ensure parent directory exists, checking no parent is a file.""" 

2071 parent_dir = os.path.dirname(full_path) 

2072 if parent_dir and not os.path.exists(parent_dir): 

2073 # Walk up the directory tree to find the first existing parent 

2074 current = parent_dir 

2075 parents_to_check: list[bytes] = [] 

2076 

2077 while current and not os.path.exists(current): 

2078 parents_to_check.insert(0, current) 

2079 new_parent = os.path.dirname(current) 

2080 if new_parent == current: 

2081 # Reached the root or can't go up further 

2082 break 

2083 current = new_parent 

2084 

2085 # Check if the existing parent (if any) is a directory 

2086 if current and os.path.exists(current) and not os.path.isdir(current): 

2087 raise OSError( 

2088 f"Cannot create directory, parent path is a file: {current!r}" 

2089 ) 

2090 

2091 # Now check each parent we need to create isn't blocked by an existing file 

2092 for parent_path in parents_to_check: 

2093 if os.path.exists(parent_path) and not os.path.isdir(parent_path): 

2094 raise OSError( 

2095 f"Cannot create directory, parent path is a file: {parent_path!r}" 

2096 ) 

2097 

2098 os.makedirs(parent_dir) 

2099 

2100 

2101def _remove_file_with_readonly_handling(path: bytes) -> None: 

2102 """Remove a file, handling read-only files on Windows. 

2103 

2104 Args: 

2105 path: Path to the file to remove 

2106 """ 

2107 try: 

2108 os.unlink(path) 

2109 except PermissionError: 

2110 # On Windows, remove read-only attribute and retry 

2111 if sys.platform == "win32": 

2112 os.chmod(path, stat.S_IWRITE | stat.S_IREAD) 

2113 os.unlink(path) 

2114 else: 

2115 raise 

2116 

2117 

2118def _remove_empty_parents(path: bytes, stop_at: bytes) -> None: 

2119 """Remove empty parent directories up to stop_at.""" 

2120 parent = os.path.dirname(path) 

2121 while parent and parent != stop_at: 

2122 try: 

2123 os.rmdir(parent) 

2124 parent = os.path.dirname(parent) 

2125 except FileNotFoundError: 

2126 # Directory doesn't exist - stop trying 

2127 break 

2128 except OSError as e: 

2129 if e.errno == errno.ENOTEMPTY: 

2130 # Directory not empty - stop trying 

2131 break 

2132 raise 

2133 

2134 

2135def _check_symlink_matches( 

2136 full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes 

2137) -> bool: 

2138 """Check if symlink target matches expected target. 

2139 

2140 Returns True if symlink matches, False if it doesn't match. 

2141 """ 

2142 try: 

2143 current_target = os.readlink(full_path) 

2144 blob_obj = repo_object_store[entry_sha] 

2145 expected_target = blob_obj.as_raw_string() 

2146 if isinstance(current_target, str): 

2147 current_target = current_target.encode() 

2148 return current_target == expected_target 

2149 except FileNotFoundError: 

2150 # Symlink doesn't exist 

2151 return False 

2152 except OSError as e: 

2153 if e.errno == errno.EINVAL: 

2154 # Not a symlink 

2155 return False 

2156 raise 

2157 

2158 

2159def _check_file_matches( 

2160 repo_object_store: "BaseObjectStore", 

2161 full_path: bytes, 

2162 entry_sha: bytes, 

2163 entry_mode: int, 

2164 current_stat: os.stat_result, 

2165 honor_filemode: bool, 

2166 blob_normalizer: Optional["FilterBlobNormalizer"] = None, 

2167 tree_path: Optional[bytes] = None, 

2168) -> bool: 

2169 """Check if a file on disk matches the expected git object. 

2170 

2171 Returns True if file matches, False if it doesn't match. 

2172 """ 

2173 # Check mode first (if honor_filemode is True) 

2174 if honor_filemode: 

2175 current_mode = stat.S_IMODE(current_stat.st_mode) 

2176 expected_mode = stat.S_IMODE(entry_mode) 

2177 

2178 # For regular files, only check the user executable bit, not group/other permissions 

2179 # This matches Git's behavior where umask differences don't count as modifications 

2180 if stat.S_ISREG(current_stat.st_mode): 

2181 # Normalize regular file modes to ignore group/other write permissions 

2182 current_mode_normalized = ( 

2183 current_mode & 0o755 

2184 ) # Keep only user rwx and all read+execute 

2185 expected_mode_normalized = expected_mode & 0o755 

2186 

2187 # For Git compatibility, regular files should be either 644 or 755 

2188 if expected_mode_normalized not in (0o644, 0o755): 

2189 expected_mode_normalized = 0o644 # Default for regular files 

2190 if current_mode_normalized not in (0o644, 0o755): 

2191 # Determine if it should be executable based on user execute bit 

2192 if current_mode & 0o100: # User execute bit is set 

2193 current_mode_normalized = 0o755 

2194 else: 

2195 current_mode_normalized = 0o644 

2196 

2197 if current_mode_normalized != expected_mode_normalized: 

2198 return False 

2199 else: 

2200 # For non-regular files (symlinks, etc.), check mode exactly 

2201 if current_mode != expected_mode: 

2202 return False 

2203 

2204 # If mode matches (or we don't care), check content via size first 

2205 blob_obj = repo_object_store[entry_sha] 

2206 if current_stat.st_size != blob_obj.raw_length(): 

2207 return False 

2208 

2209 # Size matches, check actual content 

2210 try: 

2211 with open(full_path, "rb") as f: 

2212 current_content = f.read() 

2213 expected_content = blob_obj.as_raw_string() 

2214 if blob_normalizer and tree_path is not None: 

2215 assert isinstance(blob_obj, Blob) 

2216 normalized_blob = blob_normalizer.checkout_normalize( 

2217 blob_obj, tree_path 

2218 ) 

2219 expected_content = normalized_blob.as_raw_string() 

2220 return current_content == expected_content 

2221 except (FileNotFoundError, PermissionError, IsADirectoryError): 

2222 return False 

2223 
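The mode normalization applied to the on-disk mode above can be summarized with a small standalone sketch (a mirror of the logic for illustration, not part of dulwich's API):

import stat

def _normalized_reg_mode(mode: int) -> int:
    # Mirrors the regular-file normalization in _check_file_matches.
    m = stat.S_IMODE(mode) & 0o755
    if m in (0o644, 0o755):
        return m
    return 0o755 if m & 0o100 else 0o644

assert _normalized_reg_mode(0o100664) == 0o644  # umask noise ignored
assert _normalized_reg_mode(0o100775) == 0o755  # user execute bit wins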

2224 

2225def _transition_to_submodule( 

2226 repo: "Repo", 

2227 path: bytes, 

2228 full_path: bytes, 

2229 current_stat: Optional[os.stat_result], 

2230 entry: Union[IndexEntry, TreeEntry], 

2231 index: Index, 

2232) -> None: 

2233 """Transition any type to submodule.""" 

2234 from .submodule import ensure_submodule_placeholder 

2235 

2236 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

2237 # Already a directory, just ensure .git file exists 

2238 ensure_submodule_placeholder(repo, path) 

2239 else: 

2240 # Remove whatever is there and create submodule 

2241 if current_stat is not None: 

2242 _remove_file_with_readonly_handling(full_path) 

2243 ensure_submodule_placeholder(repo, path) 

2244 

2245 st = os.lstat(full_path) 

2246 assert entry.sha is not None 

2247 index[path] = index_entry_from_stat(st, entry.sha) 

2248 

2249 

2250def _transition_to_file( 

2251 object_store: "BaseObjectStore", 

2252 path: bytes, 

2253 full_path: bytes, 

2254 current_stat: Optional[os.stat_result], 

2255 entry: Union[IndexEntry, TreeEntry], 

2256 index: Index, 

2257 honor_filemode: bool, 

2258 symlink_fn: Optional[ 

2259 Callable[ 

2260 [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]], 

2261 None, 

2262 ] 

2263 ], 

2264 blob_normalizer: Optional["FilterBlobNormalizer"], 

2265 tree_encoding: str = "utf-8", 

2266) -> None: 

2267 """Transition any type to regular file or symlink.""" 

2268 assert entry.sha is not None and entry.mode is not None 

2269 # Check if we need to update 

2270 if ( 

2271 current_stat is not None 

2272 and stat.S_ISREG(current_stat.st_mode) 

2273 and not stat.S_ISLNK(entry.mode) 

2274 ): 

2275 # File to file - check if update needed 

2276 file_matches = _check_file_matches( 

2277 object_store, 

2278 full_path, 

2279 entry.sha, 

2280 entry.mode, 

2281 current_stat, 

2282 honor_filemode, 

2283 blob_normalizer, 

2284 path, 

2285 ) 

2286 needs_update = not file_matches 

2287 elif ( 

2288 current_stat is not None 

2289 and stat.S_ISLNK(current_stat.st_mode) 

2290 and stat.S_ISLNK(entry.mode) 

2291 ): 

2292 # Symlink to symlink - check if update needed 

2293 symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha) 

2294 needs_update = not symlink_matches 

2295 else: 

2296 needs_update = True 

2297 

2298 if not needs_update: 

2299 # Just update index - current_stat should always be valid here since we're not updating 

2300 assert current_stat is not None 

2301 index[path] = index_entry_from_stat(current_stat, entry.sha) 

2302 return 

2303 

2304 # Remove existing entry if needed 

2305 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

2306 # Remove directory 

2307 dir_contents = set(os.listdir(full_path)) 

2308 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

2309 

2310 if git_file_name in dir_contents: 

2311 if dir_contents != {git_file_name}: 

2312 raise IsADirectoryError( 

2313 f"Cannot replace submodule with untracked files: {full_path!r}" 

2314 ) 

2315 shutil.rmtree(full_path) 

2316 else: 

2317 try: 

2318 os.rmdir(full_path) 

2319 except OSError as e: 

2320 if e.errno == errno.ENOTEMPTY: 

2321 raise IsADirectoryError( 

2322 f"Cannot replace non-empty directory with file: {full_path!r}" 

2323 ) 

2324 raise 

2325 elif current_stat is not None: 

2326 _remove_file_with_readonly_handling(full_path) 

2327 

2328 # Ensure parent directory exists 

2329 _ensure_parent_dir_exists(full_path) 

2330 

2331 # Write the file 

2332 blob_obj = object_store[entry.sha] 

2333 assert isinstance(blob_obj, Blob) 

2334 if blob_normalizer: 

2335 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path) 

2336 st = build_file_from_blob( 

2337 blob_obj, 

2338 entry.mode, 

2339 full_path, 

2340 honor_filemode=honor_filemode, 

2341 tree_encoding=tree_encoding, 

2342 symlink_fn=symlink_fn, 

2343 ) 

2344 index[path] = index_entry_from_stat(st, entry.sha) 

2345 

2346 

2347def _transition_to_absent( 

2348 repo: "Repo", 

2349 path: bytes, 

2350 full_path: bytes, 

2351 current_stat: Optional[os.stat_result], 

2352 index: Index, 

2353) -> None: 

2354 """Remove any type of entry.""" 

2355 if current_stat is None: 

2356 return 

2357 

2358 if stat.S_ISDIR(current_stat.st_mode): 

2359 # Check if it's a submodule directory 

2360 dir_contents = set(os.listdir(full_path)) 

2361 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

2362 

2363 if git_file_name in dir_contents and dir_contents == {git_file_name}: 

2364 shutil.rmtree(full_path) 

2365 else: 

2366 try: 

2367 os.rmdir(full_path) 

2368 except OSError as e: 

2369 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST): 

2370 raise 

2371 else: 

2372 _remove_file_with_readonly_handling(full_path) 

2373 

2374 try: 

2375 del index[path] 

2376 except KeyError: 

2377 pass 

2378 

2379 # Try to remove empty parent directories 

2380 _remove_empty_parents( 

2381 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

2382 ) 

2383 

2384 

2385def detect_case_only_renames( 

2386 changes: Sequence["TreeChange"], 

2387 config: "Config", 

2388) -> list["TreeChange"]: 

2389 """Detect and transform case-only renames in a list of tree changes. 

2390 

2391 This function identifies file renames that only differ in case (e.g., 

2392 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into 

2393 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization 

2394 based on the repository configuration. 

2395 

2396 Args: 

2397 changes: List of TreeChange objects representing file changes 

2398 config: Repository configuration object 

2399 

2400 Returns: 

2401 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME 

2402 """ 

2403 from .diff_tree import ( 

2404 CHANGE_ADD, 

2405 CHANGE_COPY, 

2406 CHANGE_DELETE, 

2407 CHANGE_MODIFY, 

2408 CHANGE_RENAME, 

2409 TreeChange, 

2410 ) 

2411 

2412 # Build dictionaries of old and new paths with their normalized forms 

2413 old_paths_normalized = {} 

2414 new_paths_normalized = {} 

2415 old_changes = {} # Map from old path to change object 

2416 new_changes = {} # Map from new path to change object 

2417 

2418 # Get the appropriate normalizer based on config 

2419 normalize_func = get_path_element_normalizer(config) 

2420 

2421 def normalize_path(path: bytes) -> bytes: 

2422 """Normalize entire path using element normalization.""" 

2423 return b"/".join(normalize_func(part) for part in path.split(b"/")) 

2424 

2425 # Pre-normalize all paths once to avoid repeated normalization 

2426 for change in changes: 

2427 if change.type == CHANGE_DELETE and change.old: 

2428 assert change.old.path is not None 

2429 try: 

2430 normalized = normalize_path(change.old.path) 

2431 except UnicodeDecodeError: 

2432 import logging 

2433 

2434 logging.warning( 

2435 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2436 change.old.path, 

2437 ) 

2438 else: 

2439 old_paths_normalized[normalized] = change.old.path 

2440 old_changes[change.old.path] = change 

2441 elif change.type == CHANGE_RENAME and change.old: 

2442 assert change.old.path is not None 

2443 # Treat RENAME as DELETE + ADD for case-only detection 

2444 try: 

2445 normalized = normalize_path(change.old.path) 

2446 except UnicodeDecodeError: 

2447 import logging 

2448 

2449 logging.warning( 

2450 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2451 change.old.path, 

2452 ) 

2453 else: 

2454 old_paths_normalized[normalized] = change.old.path 

2455 old_changes[change.old.path] = change 

2456 

2457 if ( 

2458 change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY) 

2459 and change.new 

2460 ): 

2461 assert change.new.path is not None 

2462 try: 

2463 normalized = normalize_path(change.new.path) 

2464 except UnicodeDecodeError: 

2465 import logging 

2466 

2467 logging.warning( 

2468 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2469 change.new.path, 

2470 ) 

2471 else: 

2472 new_paths_normalized[normalized] = change.new.path 

2473 new_changes[change.new.path] = change 

2474 

2475 # Find case-only renames and transform changes 

2476 case_only_renames = set() 

2477 new_rename_changes = [] 

2478 

2479 for norm_path, old_path in old_paths_normalized.items(): 

2480 if norm_path in new_paths_normalized: 

2481 new_path = new_paths_normalized[norm_path] 

2482 if old_path != new_path: 

2483 # Found a case-only rename 

2484 old_change = old_changes[old_path] 

2485 new_change = new_changes[new_path] 

2486 

2487 # Create a CHANGE_RENAME to replace the DELETE + ADD/MODIFY pair.

2488 # Both cases take the old entry from the delete side and the new

2489 # entry from the add/modify side, so no branching is needed.

2490 rename_change = TreeChange(

2491 CHANGE_RENAME, old_change.old, new_change.new

2492 )

2499 

2500 new_rename_changes.append(rename_change) 

2501 

2502 # Mark the old changes for removal 

2503 case_only_renames.add(old_change) 

2504 case_only_renames.add(new_change) 

2505 

2506 # Return new list with original ADD/DELETE changes replaced by renames 

2507 result = [change for change in changes if change not in case_only_renames] 

2508 result.extend(new_rename_changes) 

2509 return result 

2510 
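A minimal sketch of the transformation, with hypothetical entries and an assumed repo object:

from dulwich.diff_tree import CHANGE_ADD, CHANGE_DELETE, CHANGE_RENAME, TreeChange
from dulwich.objects import TreeEntry

sha = b"0" * 40  # hypothetical blob sha
changes = [
    TreeChange(CHANGE_DELETE, TreeEntry(b"README.txt", 0o100644, sha), None),
    TreeChange(CHANGE_ADD, None, TreeEntry(b"readme.txt", 0o100644, sha)),
]
changes = detect_case_only_renames(changes, repo.get_config())  # repo assumed
assert changes[0].type == CHANGE_RENAME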

2511 

2512def update_working_tree( 

2513 repo: "Repo", 

2514 old_tree_id: Optional[bytes], 

2515 new_tree_id: bytes, 

2516 change_iterator: Iterator["TreeChange"], 

2517 honor_filemode: bool = True, 

2518 validate_path_element: Optional[Callable[[bytes], bool]] = None, 

2519 symlink_fn: Optional[ 

2520 Callable[ 

2521 [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]], 

2522 None, 

2523 ] 

2524 ] = None, 

2525 force_remove_untracked: bool = False, 

2526 blob_normalizer: Optional["FilterBlobNormalizer"] = None, 

2527 tree_encoding: str = "utf-8", 

2528 allow_overwrite_modified: bool = False, 

2529) -> None: 

2530 """Update the working tree and index to match a new tree. 

2531 

2532 This function handles: 

2533 - Adding new files 

2534 - Updating modified files 

2535 - Removing deleted files 

2536 - Cleaning up empty directories 

2537 

2538 Args: 

2539 repo: Repository object 

2540 old_tree_id: SHA of the tree before the update 

2541 new_tree_id: SHA of the tree to update to 

2542 change_iterator: Iterator of TreeChange objects to apply 

2543 honor_filemode: An optional flag to honor core.filemode setting 

2544 validate_path_element: Function to validate path elements to check out 

2545 symlink_fn: Function to use for creating symlinks 

2546 force_remove_untracked: If True, remove files that exist in working 

2547 directory but not in target tree, even if old_tree_id is None 

2548 blob_normalizer: An optional BlobNormalizer to use for converting line 

2549 endings when writing blobs to the working directory. 

2550 tree_encoding: Encoding used for tree paths (default: utf-8) 

2551 allow_overwrite_modified: If False, raise an error when attempting to 

2552 overwrite files that have been modified compared to old_tree_id 

2553 """ 

2554 if validate_path_element is None: 

2555 validate_path_element = validate_path_element_default 

2556 

2557 from .diff_tree import ( 

2558 CHANGE_ADD, 

2559 CHANGE_COPY, 

2560 CHANGE_DELETE, 

2561 CHANGE_MODIFY, 

2562 CHANGE_RENAME, 

2563 CHANGE_UNCHANGED, 

2564 ) 

2565 

2566 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

2567 index = repo.open_index() 

2568 

2569 # Convert iterator to list since we need multiple passes 

2570 changes = list(change_iterator) 

2571 

2572 # Transform case-only renames on case-insensitive filesystems 

2573 import platform 

2574 

2575 default_ignore_case = platform.system() in ("Windows", "Darwin") 

2576 config = repo.get_config() 

2577 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case) 

2578 

2579 if ignore_case:

2580 changes = detect_case_only_renames(changes, config)

2582 

2583 # Check for path conflicts where files need to become directories 

2584 paths_becoming_dirs = set() 

2585 for change in changes: 

2586 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY): 

2587 assert change.new is not None 

2588 path = change.new.path 

2589 assert path is not None 

2590 if b"/" in path: # This is a file inside a directory 

2591 # Check if any parent path exists as a file in the old tree or changes 

2592 parts = path.split(b"/") 

2593 for i in range(1, len(parts)): 

2594 parent = b"/".join(parts[:i]) 

2595 # See if this parent path is being deleted (was a file, becoming a dir) 

2596 for other_change in changes: 

2597 if ( 

2598 other_change.type == CHANGE_DELETE 

2599 and other_change.old 

2600 and other_change.old.path == parent 

2601 ): 

2602 paths_becoming_dirs.add(parent) 

2603 

2604 # Check if any path that needs to become a directory has been modified 

2605 for path in paths_becoming_dirs: 

2606 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2607 try: 

2608 current_stat = os.lstat(full_path) 

2609 except FileNotFoundError: 

2610 continue # File doesn't exist, nothing to check 

2611 except OSError as e: 

2612 raise OSError( 

2613 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2614 ) from e 

2615 

2616 if stat.S_ISREG(current_stat.st_mode): 

2617 # Find the old entry for this path 

2618 old_change = None 

2619 for change in changes: 

2620 if ( 

2621 change.type == CHANGE_DELETE 

2622 and change.old 

2623 and change.old.path == path 

2624 ): 

2625 old_change = change 

2626 break 

2627 

2628 if old_change: 

2629 # Check if file has been modified 

2630 assert old_change.old is not None 

2631 assert ( 

2632 old_change.old.sha is not None and old_change.old.mode is not None 

2633 ) 

2634 file_matches = _check_file_matches( 

2635 repo.object_store, 

2636 full_path, 

2637 old_change.old.sha, 

2638 old_change.old.mode, 

2639 current_stat, 

2640 honor_filemode, 

2641 blob_normalizer, 

2642 path, 

2643 ) 

2644 if not file_matches: 

2645 raise OSError( 

2646 f"Cannot replace modified file with directory: {path!r}" 

2647 ) 

2648 

2649 # Check for uncommitted modifications before making any changes 

2650 if not allow_overwrite_modified and old_tree_id: 

2651 for change in changes: 

2652 # Only check files that are being modified or deleted 

2653 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old: 

2654 path = change.old.path 

2655 assert path is not None 

2656 if path.startswith(b".git") or not validate_path( 

2657 path, validate_path_element 

2658 ): 

2659 continue 

2660 

2661 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2662 try: 

2663 current_stat = os.lstat(full_path) 

2664 except FileNotFoundError: 

2665 continue # File doesn't exist, nothing to check 

2666 except OSError as e: 

2667 raise OSError( 

2668 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2669 ) from e 

2670 

2671 if stat.S_ISREG(current_stat.st_mode): 

2672 # Check if working tree file differs from old tree 

2673 assert change.old.sha is not None and change.old.mode is not None 

2674 file_matches = _check_file_matches( 

2675 repo.object_store, 

2676 full_path, 

2677 change.old.sha, 

2678 change.old.mode, 

2679 current_stat, 

2680 honor_filemode, 

2681 blob_normalizer, 

2682 path, 

2683 ) 

2684 if not file_matches: 

2685 from .errors import WorkingTreeModifiedError 

2686 

2687 raise WorkingTreeModifiedError( 

2688 f"Your local changes to '{path.decode('utf-8', errors='replace')}' " 

2689 f"would be overwritten by checkout. " 

2690 f"Please commit your changes or stash them before you switch branches." 

2691 ) 

2692 

2693 # Apply the changes 

2694 for change in changes: 

2695 if change.type in (CHANGE_DELETE, CHANGE_RENAME): 

2696 # Remove file/directory 

2697 assert change.old is not None and change.old.path is not None 

2698 path = change.old.path 

2699 if path.startswith(b".git") or not validate_path( 

2700 path, validate_path_element 

2701 ): 

2702 continue 

2703 

2704 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2705 try: 

2706 delete_stat: Optional[os.stat_result] = os.lstat(full_path) 

2707 except FileNotFoundError: 

2708 delete_stat = None 

2709 except OSError as e: 

2710 raise OSError( 

2711 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2712 ) from e 

2713 

2714 _transition_to_absent(repo, path, full_path, delete_stat, index) 

2715 

2716 if change.type in ( 

2717 CHANGE_ADD, 

2718 CHANGE_MODIFY, 

2719 CHANGE_UNCHANGED, 

2720 CHANGE_COPY, 

2721 CHANGE_RENAME, 

2722 ): 

2723 # Add or modify file 

2724 assert ( 

2725 change.new is not None 

2726 and change.new.path is not None 

2727 and change.new.mode is not None 

2728 ) 

2729 path = change.new.path 

2730 if path.startswith(b".git") or not validate_path( 

2731 path, validate_path_element 

2732 ): 

2733 continue 

2734 

2735 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2736 try: 

2737 modify_stat: Optional[os.stat_result] = os.lstat(full_path) 

2738 except FileNotFoundError: 

2739 modify_stat = None 

2740 except OSError as e: 

2741 raise OSError( 

2742 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2743 ) from e 

2744 

2745 if S_ISGITLINK(change.new.mode): 

2746 _transition_to_submodule( 

2747 repo, path, full_path, modify_stat, change.new, index 

2748 ) 

2749 else: 

2750 _transition_to_file( 

2751 repo.object_store, 

2752 path, 

2753 full_path, 

2754 modify_stat, 

2755 change.new, 

2756 index, 

2757 honor_filemode, 

2758 symlink_fn, 

2759 blob_normalizer, 

2760 tree_encoding, 

2761 ) 

2762 

2763 index.write() 

2764 
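A hedged checkout-style usage sketch, assuming an open repo and two known tree SHAs:

from dulwich.diff_tree import tree_changes

changes = tree_changes(repo.object_store, old_tree_id, new_tree_id)
update_working_tree(repo, old_tree_id, new_tree_id, changes)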

2765 

2766def _check_entry_for_changes( 

2767 tree_path: bytes, 

2768 entry: Union[IndexEntry, ConflictedIndexEntry], 

2769 root_path: bytes, 

2770 filter_blob_callback: Optional[Callable[[bytes, bytes], bytes]] = None, 

2771) -> Optional[bytes]: 

2772 """Check a single index entry for changes. 

2773 

2774 Args: 

2775 tree_path: Path in the tree 

2776 entry: Index entry to check 

2777 root_path: Root filesystem path 

2778 filter_blob_callback: Optional callback to filter blobs 

2779 Returns: tree_path if changed, None otherwise 

2780 """ 

2781 if isinstance(entry, ConflictedIndexEntry): 

2782 # Conflicted files are always unstaged 

2783 return tree_path 

2784 

2785 full_path = _tree_to_fs_path(root_path, tree_path) 

2786 try: 

2787 st = os.lstat(full_path) 

2788 if stat.S_ISDIR(st.st_mode): 

2789 if _has_directory_changed(tree_path, entry): 

2790 return tree_path 

2791 return None 

2792 

2793 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): 

2794 return None 

2795 

2796 blob = blob_from_path_and_stat(full_path, st) 

2797 

2798 if filter_blob_callback is not None: 

2799 blob.data = filter_blob_callback(blob.data, tree_path) 

2800 except FileNotFoundError: 

2801 # The file was removed, so we assume that counts as 

2802 # different from whatever file used to exist. 

2803 return tree_path 

2804 else: 

2805 if blob.id != entry.sha: 

2806 return tree_path 

2807 return None 

2808 

2809 

2810def get_unstaged_changes( 

2811 index: Index, 

2812 root_path: Union[str, bytes], 

2813 filter_blob_callback: Optional[Callable[..., Any]] = None, 

2814 preload_index: bool = False, 

2815) -> Generator[bytes, None, None]: 

2816 """Walk through an index and check for differences against working tree. 

2817 

2818 Args: 

2819 index: index to check 

2820 root_path: path in which to find files 

2821 filter_blob_callback: Optional callback to filter blobs 

2822 preload_index: If True, use parallel threads to check files (requires threading support) 

2823 Returns: iterator over paths with unstaged changes 

2824 """ 

2825 # For each entry in the index, compare the recorded sha1 against the working tree

2826 if not isinstance(root_path, bytes): 

2827 root_path = os.fsencode(root_path) 

2828 

2829 if preload_index: 

2830 # Use parallel processing for better performance on slow filesystems 

2831 try: 

2832 import multiprocessing 

2833 from concurrent.futures import ThreadPoolExecutor 

2834 except ImportError: 

2835 # If threading is not available, fall back to serial processing 

2836 preload_index = False 

2837 else: 

2838 # Collect all entries first 

2839 entries = list(index.iteritems()) 

2840 

2841 # Use number of CPUs but cap at 8 threads to avoid overhead 

2842 num_workers = min(multiprocessing.cpu_count(), 8) 

2843 

2844 # Process entries in parallel 

2845 with ThreadPoolExecutor(max_workers=num_workers) as executor: 

2846 # Submit all tasks 

2847 futures = [ 

2848 executor.submit( 

2849 _check_entry_for_changes, 

2850 tree_path, 

2851 entry, 

2852 root_path, 

2853 filter_blob_callback, 

2854 ) 

2855 for tree_path, entry in entries 

2856 ] 

2857 

2858 # Yield results as they complete 

2859 for future in futures: 

2860 result = future.result() 

2861 if result is not None: 

2862 yield result 

2863 

2864 if not preload_index: 

2865 # Serial processing 

2866 for tree_path, entry in index.iteritems(): 

2867 result = _check_entry_for_changes( 

2868 tree_path, entry, root_path, filter_blob_callback 

2869 ) 

2870 if result is not None: 

2871 yield result 

2872 
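For example (a sketch; assumes an open repository):

index = repo.open_index()
for path in get_unstaged_changes(index, repo.path, preload_index=True):
    print(path.decode("utf-8", errors="replace"))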

2873 

2874def _tree_to_fs_path( 

2875 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8" 

2876) -> bytes: 

2877 """Convert a git tree path to a file system path. 

2878 

2879 Args: 

2880 root_path: Root filesystem path 

2881 tree_path: Git tree path as bytes (encoded with tree_encoding) 

2882 tree_encoding: Encoding used for tree paths (default: utf-8) 

2883 

2884 Returns: File system path. 

2885 """ 

2886 assert isinstance(tree_path, bytes) 

2887 if os_sep_bytes != b"/": 

2888 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes) 

2889 else: 

2890 sep_corrected_path = tree_path 

2891 

2892 # On Windows, we need to handle tree path encoding properly 

2893 if sys.platform == "win32": 

2894 # Decode from tree encoding, then re-encode for filesystem 

2895 try: 

2896 tree_path_str = sep_corrected_path.decode(tree_encoding) 

2897 sep_corrected_path = os.fsencode(tree_path_str) 

2898 except UnicodeDecodeError: 

2899 # If decoding fails, use the original bytes 

2900 pass 

2901 

2902 return os.path.join(root_path, sep_corrected_path) 

2903 

2904 

2905def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -> bytes: 

2906 """Convert a file system path to a git tree path. 

2907 

2908 Args: 

2909 fs_path: File system path. 

2910 tree_encoding: Encoding to use for tree paths (default: utf-8) 

2911 

2912 Returns: Git tree path as bytes (encoded with tree_encoding) 

2913 """ 

2914 if not isinstance(fs_path, bytes): 

2915 fs_path_bytes = os.fsencode(fs_path) 

2916 else: 

2917 fs_path_bytes = fs_path 

2918 

2919 # On Windows, we need to ensure tree paths are properly encoded 

2920 if sys.platform == "win32": 

2921 try: 

2922 # Decode from filesystem encoding, then re-encode with tree encoding 

2923 fs_path_str = os.fsdecode(fs_path_bytes) 

2924 fs_path_bytes = fs_path_str.encode(tree_encoding) 

2925 except UnicodeDecodeError: 

2926 # If filesystem decoding fails, use the original bytes 

2927 pass 

2928 

2929 if os_sep_bytes != b"/": 

2930 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/") 

2931 else: 

2932 tree_path = fs_path_bytes 

2933 return tree_path 

2934 
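The two helpers are near-inverses; a sketch with an illustrative root path:

import os

root = b"/srv/checkout"  # hypothetical
fs_path = _tree_to_fs_path(root, b"docs/readme.md")
rel = os.path.relpath(fs_path, root)
assert _fs_to_tree_path(rel) == b"docs/readme.md"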

2935 

2936def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]: 

2937 """Create an index entry for a directory. 

2938 

2939 This is only used for submodules (directories containing .git). 

2940 

2941 Args: 

2942 st: Stat result for the directory 

2943 path: Path to the directory 

2944 

2945 Returns: 

2946 IndexEntry for a submodule, or None if not a submodule 

2947 """ 

2948 if os.path.exists(os.path.join(path, b".git")): 

2949 head = read_submodule_head(path) 

2950 if head is None: 

2951 return None 

2952 return index_entry_from_stat(st, head, mode=S_IFGITLINK) 

2953 return None 

2954 

2955 

2956def index_entry_from_path( 

2957 path: bytes, object_store: Optional[ObjectContainer] = None 

2958) -> Optional[IndexEntry]: 

2959 """Create an index from a filesystem path. 

2960 

2961 This returns an index value for files, symlinks 

2962 and tree references. for directories and 

2963 non-existent files it returns None 

2964 

2965 Args: 

2966 path: Path to create an index entry for 

2967 object_store: Optional object store to 

2968 save new blobs in 

2969 Returns: An index entry; None for directories 

2970 """ 

2971 assert isinstance(path, bytes) 

2972 st = os.lstat(path) 

2973 if stat.S_ISDIR(st.st_mode): 

2974 return index_entry_from_directory(st, path) 

2975 

2976 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): 

2977 blob = blob_from_path_and_stat(path, st) 

2978 if object_store is not None: 

2979 object_store.add_object(blob) 

2980 return index_entry_from_stat(st, blob.id) 

2981 

2982 return None 

2983 
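A usage sketch (the path is illustrative; the object store is optional):

entry = index_entry_from_path(b"/tmp/example.txt")  # hypothetical path
if entry is not None:  # None for plain directories
    print(entry.sha, oct(entry.mode))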

2984 

2985def iter_fresh_entries( 

2986 paths: Iterable[bytes], 

2987 root_path: bytes, 

2988 object_store: Optional[ObjectContainer] = None, 

2989) -> Iterator[tuple[bytes, Optional[IndexEntry]]]: 

2990 """Iterate over current versions of index entries on disk. 

2991 

2992 Args: 

2993 paths: Paths to iterate over 

2994 root_path: Root path to access from 

2995 object_store: Optional store to save new blobs in 

2996 Returns: Iterator over path, index_entry 

2997 """ 

2998 for path in paths: 

2999 p = _tree_to_fs_path(root_path, path) 

3000 try: 

3001 entry = index_entry_from_path(p, object_store=object_store) 

3002 except (FileNotFoundError, IsADirectoryError): 

3003 entry = None 

3004 yield path, entry 

3005 

3006 

3007def iter_fresh_objects( 

3008 paths: Iterable[bytes], 

3009 root_path: bytes, 

3010 include_deleted: bool = False, 

3011 object_store: Optional[ObjectContainer] = None, 

3012) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]: 

3013 """Iterate over versions of objects on disk referenced by index. 

3014 

3015 Args: 

3016 paths: Paths to check 

3017 root_path: Root path to access from 

3018 include_deleted: Include deleted entries with sha and 

3019 mode set to None 

3020 object_store: Optional object store to report new items to 

3021 Returns: Iterator over path, sha, mode 

3022 """ 

3023 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store): 

3024 if entry is None: 

3025 if include_deleted: 

3026 yield path, None, None 

3027 else: 

3028 yield path, entry.sha, cleanup_mode(entry.mode) 

3029 

3030 

3031def refresh_index(index: Index, root_path: bytes) -> None: 

3032 """Refresh the contents of an index. 

3033 

3034 This is roughly the staging half of 'git commit -a': every path already in the index is re-read from the working tree.

3035 

3036 Args: 

3037 index: Index to update 

3038 root_path: Root filesystem path 

3039 """ 

3040 for path, entry in iter_fresh_entries(index, root_path): 

3041 if entry: 

3042 index[path] = entry 

3043 
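Sketch (assumes an open repository):

import os

index = repo.open_index()
refresh_index(index, os.fsencode(repo.path))
index.write()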

3044 

3045class locked_index: 

3046 """Lock the index while making modifications. 

3047 

3048 Works as a context manager. 

3049 """ 

3050 

3051 _file: "_GitFile" 

3052 

3053 def __init__(self, path: Union[bytes, str]) -> None: 

3054 """Initialize locked_index.""" 

3055 self._path = path 

3056 

3057 def __enter__(self) -> Index: 

3058 """Enter context manager and lock index.""" 

3059 f = GitFile(self._path, "wb") 

3060 self._file = f 

3061 self._index = Index(self._path) 

3062 return self._index 

3063 

3064 def __exit__( 

3065 self, 

3066 exc_type: Optional[type], 

3067 exc_value: Optional[BaseException], 

3068 traceback: Optional[types.TracebackType], 

3069 ) -> None: 

3070 """Exit context manager and unlock index.""" 

3071 if exc_type is not None: 

3072 self._file.abort() 

3073 return 

3074 try: 

3075 f = SHA1Writer(self._file) 

3076 write_index_dict(f, self._index._byname) 

3077 except BaseException: 

3078 self._file.abort() 

3079 else: 

3080 f.close()
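Finally, a usage sketch for the context manager, assuming an open repository; on a clean exit the updated index is written via SHA1Writer, while an exception aborts the lock file and leaves the old index untouched:

with locked_index(repo.index_path()) as index:
    del index[b"obsolete.txt"]  # hypothetical tracked path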