Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/index.py: 31%


1181 statements  

# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import (
    Callable,
    Generator,
    Iterable,
    Iterator,
    Mapping,
    Sequence,
    Set,
)
from dataclasses import dataclass
from enum import Enum
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Optional,
    Union,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .filters import FilterBlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    TreeEntry,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# Type alias for recursive tree structure used in commit_tree
TreeDict = dict[bytes, Union["TreeDict", tuple[int, bytes]]]

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

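# Example (illustrative, not part of the original module): an entry recorded
# at merge stage 2 ("this"/ours) carries 0x2000 in the stage bits of its
# flags word, so
#
#   (0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT == 2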

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"
SDIR_EXTENSION = b"sdir"  # Sparse directory extension


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos

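# Example (illustrative, not part of the original module): 300 is
# 0b1_0010_1100, so the low seven bits (0x2C) are emitted first with the
# continuation bit set, followed by the remaining bits (0x02):
#
#   _encode_varint(300) == b"\xac\x02"
#   _decode_varint(b"\xac\x02") == (300, 2)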

def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed

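# Example (illustrative, not part of the original module): compressing
# b"src/util.py" against the previous entry b"src/main.py" drops the seven
# trailing bytes b"main.py" and appends the new suffix:
#
#   _compress_path(b"src/util.py", b"src/main.py") == b"\x07util.py\x00"
#   _decompress_path(b"\x07util.py\x00", 0, b"src/main.py") == (b"src/util.py", 9)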

class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3


@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    def is_sparse_dir(self) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and self.name.endswith(b"/")
        )


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        elif signature == SDIR_EXTENSION:
            return SparseDirExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)


class SparseDirExtension(IndexExtension):
    """Sparse directory extension.

    This extension indicates that the index contains sparse directory entries.
    Tools that don't understand sparse index should avoid interacting with
    the index when this extension is present.

    The extension data is empty - its presence is the signal.
    """

    def __init__(self) -> None:
        """Initialize SparseDirExtension."""
        super().__init__(SDIR_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "SparseDirExtension":
        """Parse SparseDirExtension from bytes.

        Args:
          data: Raw bytes to parse (should be empty)

        Returns:
          SparseDirExtension instance
        """
        return cls()

    def to_bytes(self) -> bytes:
        """Serialize SparseDirExtension to bytes.

        Returns:
          Empty bytes (extension presence is the signal)
        """
        return b""


@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED

    def is_sparse_dir(self, name: bytes) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Args:
          name: The path name for this entry (IndexEntry doesn't store name)

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and name.endswith(b"/")
        )


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: IndexEntry | None
    this: IndexEntry | None
    other: IndexEntry | None

    def __init__(
        self,
        ancestor: IndexEntry | None = None,
        this: IndexEntry | None = None,
        other: IndexEntry | None = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])

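# Example (illustrative, not part of the original module):
#
#   pathsplit(b"foo/bar/baz") == (b"foo/bar", b"baz")
#   pathsplit(b"baz") == (b"", b"baz")
#   pathjoin(b"foo", b"", b"baz") == b"foo/baz"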

def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: IO[bytes], t: int | float | tuple[int, int]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))

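# Example (illustrative, not part of the original module): cache times are
# stored as two big-endian 32-bit words (seconds, nanoseconds), so a float
# mtime of 1234567890.5 is written as the pair (1234567890, 500000000).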

def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: SerializedIndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries


def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry

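# Example (illustrative, not part of the original module): iterating over the
# raw entries of an existing index file; assumes the current directory is a
# git repository:
#
#   with open(".git/index", "rb") as f:
#       for entry in read_index(f):
#           print(entry.name, entry.sha)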

def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[dict[bytes, IndexEntry | ConflictedIndexEntry], int, list[IndexExtension]]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, IndexEntry | ConflictedIndexEntry]:
    """Read an index file and return it as a dictionary.

    Entries are keyed by path; a conflicted path maps to a
    ConflictedIndexEntry holding its individual stages.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      entries: Iterable over the entries to write
      version: Version number to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # STEP 1: check if any extended_flags are set
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        # Force or bump the version to 3
        version = 3
    # The rest is unchanged, but you might insert a final check:
    if version < 3:
        # Double-check no extended flags appear
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Proceed with the existing code to write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, IndexEntry | ConflictedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are sorted by path, and conflicted entries are written as their
    individual stages in stage order.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret

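# Example (illustrative, not part of the original module): only a handful of
# modes are representable in a tree, so regular-file permissions collapse to
# 0o100644 or 0o100755:
#
#   cleanup_mode(0o100664) == 0o100644
#   cleanup_mode(0o100755) == 0o100755
#   cleanup_mode(0o120777) == stat.S_IFLNK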

class Index:
    """A Git Index file."""

    _byname: dict[bytes, IndexEntry | ConflictedIndexEntry]

    def __init__(
        self,
        filename: bytes | str | os.PathLike[str],
        read: bool = True,
        skip_hash: bool = False,
        version: int | None = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> bytes | str:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> IndexEntry | ConflictedIndexEntry:
        """Retrieve entry by relative path.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: IndexEntry | ConflictedIndexEntry
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(self, entries: dict[bytes, IndexEntry | ConflictedIndexEntry]) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[bytes | None, bytes | None],
            tuple[int | None, int | None],
            tuple[bytes | None, bytes | None],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

    def is_sparse(self) -> bool:
        """Check if this index contains sparse directory entries.

        Returns:
          True if any sparse directory extension is present
        """
        return any(isinstance(ext, SparseDirExtension) for ext in self._extensions)

    def ensure_full_index(self, object_store: "BaseObjectStore") -> None:
        """Expand all sparse directory entries into full file entries.

        This converts a sparse index into a full index by recursively
        expanding any sparse directory entries into their constituent files.

        Args:
          object_store: Object store to read tree objects from

        Raises:
          KeyError: If a tree object referenced by a sparse dir entry doesn't exist
        """
        if not self.is_sparse():
            return

        # Find all sparse directory entries
        sparse_dirs = []
        for path, entry in list(self._byname.items()):
            if isinstance(entry, IndexEntry) and entry.is_sparse_dir(path):
                sparse_dirs.append((path, entry))

        # Expand each sparse directory
        for path, entry in sparse_dirs:
            # Remove the sparse directory entry
            del self._byname[path]

            # Get the tree object
            tree = object_store[entry.sha]
            if not isinstance(tree, Tree):
                raise ValueError(f"Sparse directory {path!r} points to non-tree object")

            # Recursively add all entries from the tree
            self._expand_tree(path.rstrip(b"/"), tree, object_store, entry)

        # Remove the sparse directory extension
        self._extensions = [
            ext for ext in self._extensions if not isinstance(ext, SparseDirExtension)
        ]

    def _expand_tree(
        self,
        prefix: bytes,
        tree: Tree,
        object_store: "BaseObjectStore",
        template_entry: IndexEntry,
    ) -> None:
        """Recursively expand a tree into index entries.

        Args:
          prefix: Path prefix for entries (without trailing slash)
          tree: Tree object to expand
          object_store: Object store to read nested trees from
          template_entry: Template entry to copy metadata from
        """
        for name, mode, sha in tree.items():
            if prefix:
                full_path = prefix + b"/" + name
            else:
                full_path = name

            if stat.S_ISDIR(mode):
                # Recursively expand subdirectories
                subtree = object_store[sha]
                if not isinstance(subtree, Tree):
                    raise ValueError(
                        f"Directory entry {full_path!r} points to non-tree object"
                    )
                self._expand_tree(full_path, subtree, object_store, template_entry)
            else:
                # Create an index entry for this file
                # Use the template entry for metadata but with the file's sha and mode
                new_entry = IndexEntry(
                    ctime=template_entry.ctime,
                    mtime=template_entry.mtime,
                    dev=template_entry.dev,
                    ino=template_entry.ino,
                    mode=mode,
                    uid=template_entry.uid,
                    gid=template_entry.gid,
                    size=0,  # Size is unknown from tree
                    sha=sha,
                    flags=0,
                    extended_flags=0,  # Don't copy skip-worktree flag
                )
                self._byname[full_path] = new_entry

    def convert_to_sparse(
        self,
        object_store: "BaseObjectStore",
        tree_sha: bytes,
        sparse_dirs: Set[bytes],
    ) -> None:
        """Convert full index entries to sparse directory entries.

        This collapses directories that are entirely outside the sparse
        checkout cone into single sparse directory entries.

        Args:
          object_store: Object store to read tree objects
          tree_sha: SHA of the tree (usually HEAD) to base sparse dirs on
          sparse_dirs: Set of directory paths (with trailing /) to collapse

        Raises:
          KeyError: If tree_sha or a subdirectory doesn't exist
        """
        if not sparse_dirs:
            return

        # Get the base tree
        tree = object_store[tree_sha]
        if not isinstance(tree, Tree):
            raise ValueError(f"tree_sha {tree_sha!r} is not a tree object")

        # For each sparse directory, find its tree SHA and create sparse entry
        for dir_path in sparse_dirs:
            dir_path_stripped = dir_path.rstrip(b"/")

            # Find the tree SHA for this directory
            subtree_sha = self._find_subtree_sha(tree, dir_path_stripped, object_store)
            if subtree_sha is None:
                # Directory doesn't exist in tree, skip it
                continue

            # Remove all entries under this directory
            entries_to_remove = [
                path
                for path in self._byname
                if path.startswith(dir_path) or path == dir_path_stripped
            ]
            for path in entries_to_remove:
                del self._byname[path]

            # Create a sparse directory entry
            # Use minimal metadata since it's not a real file
            sparse_entry = IndexEntry(
                ctime=0,
                mtime=0,
                dev=0,
                ino=0,
                mode=stat.S_IFDIR,
                uid=0,
                gid=0,
                size=0,
                sha=subtree_sha,
                flags=0,
                extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
            )
            self._byname[dir_path] = sparse_entry

        # Add sparse directory extension if not present
        if not self.is_sparse():
            self._extensions.append(SparseDirExtension())

    def _find_subtree_sha(
        self,
        tree: Tree,
        path: bytes,
        object_store: "BaseObjectStore",
    ) -> bytes | None:
        """Find the SHA of a subtree at a given path.

        Args:
          tree: Root tree object to search in
          path: Path to the subdirectory (no trailing slash)
          object_store: Object store to read nested trees from

        Returns:
          SHA of the subtree, or None if path doesn't exist
        """
        if not path:
            return tree.id

        parts = path.split(b"/")
        current_tree = tree

        for part in parts:
            # Look for this part in the current tree
            try:
                mode, sha = current_tree[part]
            except KeyError:
                return None

            if not stat.S_ISDIR(mode):
                # Path component is a file, not a directory
                return None

            # Load the next tree
            obj = object_store[sha]
            if not isinstance(obj, Tree):
                return None
            current_tree = obj

        return current_tree.id

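# Example (illustrative, not part of the original module): reading an index,
# inspecting its entries and writing it back; assumes a repository checkout
# in the current directory:
#
#   index = Index(".git/index")
#   for path, entry in index.items():
#       print(path, entry.sha)
#   index.write()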

def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: TreeDict = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")

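# Example (illustrative, not part of the original module): building a
# two-level tree from a single blob entry; `store` is assumed to be an
# object store such as dulwich.object_store.MemoryObjectStore and
# `blob_sha` the hex sha of a blob already added to it:
#
#   tree_id = commit_tree(store, [(b"docs/README", blob_sha, 0o100644)])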

def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: bytes | None,
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[bytes | None, bytes | None],
        tuple[int | None, int | None],
        tuple[bytes | None, bytes | None],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: int | None = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )

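# Example (illustrative, not part of the original module): staging a file's
# current on-disk metadata; `blob_hex_sha` is assumed to be the 40-character
# hex sha (as bytes) of the file's blob content:
#
#   st = os.lstat("README")
#   entry = index_entry_from_stat(st, blob_hex_sha)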

1640if sys.platform == "win32": 

1641 # On Windows, creating symlinks either requires administrator privileges 

1642 # or developer mode. Raise a more helpful error when we're unable to 

1643 # create symlinks 

1644 

1645 # https://github.com/jelmer/dulwich/issues/1005 

1646 

1647 class WindowsSymlinkPermissionError(PermissionError): 

1648 """Windows-specific error for symlink creation failures. 

1649 

1650 This error is raised when symlink creation fails on Windows, 

1651 typically due to lack of developer mode or administrator privileges. 

1652 """ 

1653 

1654 def __init__(self, errno: int, msg: str, filename: str | None) -> None: 

1655 """Initialize WindowsSymlinkPermissionError.""" 

1656 super(PermissionError, self).__init__( 

1657 errno, 

1658 f"Unable to create symlink; do you have developer mode enabled? {msg}", 

1659 filename, 

1660 ) 

1661 

1662 def symlink( 

1663 src: str | bytes, 

1664 dst: str | bytes, 

1665 target_is_directory: bool = False, 

1666 *, 

1667 dir_fd: int | None = None, 

1668 ) -> None: 

1669 """Create a symbolic link on Windows with better error handling. 

1670 

1671 Args: 

1672 src: Source path for the symlink 

1673 dst: Destination path where symlink will be created 

1674 target_is_directory: Whether the target is a directory 

1675 dir_fd: Optional directory file descriptor 

1676 

1677 Raises: 

1678 WindowsSymlinkPermissionError: If symlink creation fails due to permissions 

1679 """ 

1680 try: 

1681 return os.symlink( 

1682 src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd 

1683 ) 

1684 except PermissionError as e: 

1685 raise WindowsSymlinkPermissionError( 

1686 e.errno or 0, e.strerror or "", e.filename 

1687 ) from e 

1688else: 

1689 symlink = os.symlink 

1690 

1691 

1692def build_file_from_blob( 

1693 blob: Blob, 

1694 mode: int, 

1695 target_path: bytes, 

1696 *, 

1697 honor_filemode: bool = True, 

1698 tree_encoding: str = "utf-8", 

1699 symlink_fn: Callable[ 

1700 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None 

1701 ] 

1702 | None = None, 

1703) -> os.stat_result: 

1704 """Build a file or symlink on disk based on a Git object. 

1705 

1706 Args: 

1707 blob: The git object 

1708 mode: File mode 

1709 target_path: Path to write to 

1710 honor_filemode: An optional flag to honor core.filemode setting in 

1711 config file, default is core.filemode=True, change executable bit 

1712 tree_encoding: Encoding to use for tree contents 

1713 symlink_fn: Function to use for creating symlinks 

1714 Returns: stat object for the file 

1715 """ 

1716 try: 

1717 oldstat = os.lstat(target_path) 

1718 except FileNotFoundError: 

1719 oldstat = None 

1720 contents = blob.as_raw_string() 

1721 if stat.S_ISLNK(mode): 

1722 if oldstat: 

1723 _remove_file_with_readonly_handling(target_path) 

1724 if sys.platform == "win32": 

1725 # os.readlink on Python3 on Windows requires a unicode string. 

1726 contents_str = contents.decode(tree_encoding) 

1727 target_path_str = target_path.decode(tree_encoding) 

1728 (symlink_fn or symlink)(contents_str, target_path_str) 

1729 else: 

1730 (symlink_fn or symlink)(contents, target_path) 

1731 else: 

1732 if oldstat is not None and oldstat.st_size == len(contents): 

1733 with open(target_path, "rb") as f: 

1734 if f.read() == contents: 

1735 return oldstat 

1736 

1737 with open(target_path, "wb") as f: 

1738 # Write out file 

1739 f.write(contents) 

1740 

1741 if honor_filemode: 

1742 os.chmod(target_path, mode) 

1743 

1744 return os.lstat(target_path) 

1745 

1746 

1747INVALID_DOTNAMES = (b".git", b".", b"..", b"") 

1748 

1749 

1750def _normalize_path_element_default(element: bytes) -> bytes: 

1751 """Normalize path element for default case-insensitive comparison.""" 

1752 return element.lower() 

1753 

1754 

1755def _normalize_path_element_ntfs(element: bytes) -> bytes: 

1756 """Normalize path element for NTFS filesystem.""" 

1757 return element.rstrip(b". ").lower() 

1758 

1759 

1760def _normalize_path_element_hfs(element: bytes) -> bytes: 

1761 """Normalize path element for HFS+ filesystem.""" 

1762 import unicodedata 

1763 

1764 # Decode to Unicode (let UnicodeDecodeError bubble up) 

1765 element_str = element.decode("utf-8", errors="strict") 

1766 

1767 # Remove HFS+ ignorable characters 

1768 filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS) 

1769 # Normalize to NFD 

1770 normalized = unicodedata.normalize("NFD", filtered) 

1771 return normalized.lower().encode("utf-8", errors="strict") 

1772 

1773 

1774def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]: 

1775 """Get the appropriate path element normalization function based on config. 

1776 

1777 Args: 

1778 config: Repository configuration object 

1779 

1780 Returns: 

1781 Function that normalizes path elements for the configured filesystem 

1782 """ 

1783 import os 

1784 import sys 

1785 

1786 if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"): 

1787 return _normalize_path_element_ntfs 

1788 elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"): 

1789 return _normalize_path_element_hfs 

1790 else: 

1791 return _normalize_path_element_default 

1792 

1793 

1794def validate_path_element_default(element: bytes) -> bool: 

1795 """Validate a path element using default rules. 

1796 

1797 Args: 

1798 element: Path element to validate 

1799 

1800 Returns: 

1801 True if path element is valid, False otherwise 

1802 """ 

1803 return _normalize_path_element_default(element) not in INVALID_DOTNAMES 

1804 

1805 

1806def validate_path_element_ntfs(element: bytes) -> bool: 

1807 """Validate a path element using NTFS filesystem rules. 

1808 

1809 Args: 

1810 element: Path element to validate 

1811 

1812 Returns: 

1813 True if path element is valid for NTFS, False otherwise 

1814 """ 

1815 normalized = _normalize_path_element_ntfs(element) 

1816 if normalized in INVALID_DOTNAMES: 

1817 return False 

1818 if normalized == b"git~1": 

1819 return False 

1820 return True 

1821 

1822 

1823# HFS+ ignorable Unicode codepoints (from Git's utf8.c) 

1824HFS_IGNORABLE_CHARS = { 

1825 0x200C, # ZERO WIDTH NON-JOINER 

1826 0x200D, # ZERO WIDTH JOINER 

1827 0x200E, # LEFT-TO-RIGHT MARK 

1828 0x200F, # RIGHT-TO-LEFT MARK 

1829 0x202A, # LEFT-TO-RIGHT EMBEDDING 

1830 0x202B, # RIGHT-TO-LEFT EMBEDDING 

1831 0x202C, # POP DIRECTIONAL FORMATTING 

1832 0x202D, # LEFT-TO-RIGHT OVERRIDE 

1833 0x202E, # RIGHT-TO-LEFT OVERRIDE 

1834 0x206A, # INHIBIT SYMMETRIC SWAPPING 

1835 0x206B, # ACTIVATE SYMMETRIC SWAPPING 

1836 0x206C, # INHIBIT ARABIC FORM SHAPING 

1837 0x206D, # ACTIVATE ARABIC FORM SHAPING 

1838 0x206E, # NATIONAL DIGIT SHAPES 

1839 0x206F, # NOMINAL DIGIT SHAPES 

1840 0xFEFF, # ZERO WIDTH NO-BREAK SPACE 

1841} 

1842 

1843 

1844def validate_path_element_hfs(element: bytes) -> bool: 

1845 """Validate path element for HFS+ filesystem. 

1846 

1847 Equivalent to Git's is_hfs_dotgit and related checks. 

1848 Uses NFD normalization and ignores HFS+ ignorable characters. 

1849 """ 

1850 try: 

1851 normalized = _normalize_path_element_hfs(element) 

1852 except UnicodeDecodeError: 

1853 # Malformed UTF-8 - be conservative and reject 

1854 return False 

1855 

1856 # Check against invalid names 

1857 if normalized in INVALID_DOTNAMES: 

1858 return False 

1859 

1860 # Also check for 8.3 short name 

1861 if normalized == b"git~1": 

1862 return False 

1863 

1864 return True 

1865 

1866 

1867def validate_path( 

1868 path: bytes, 

1869 element_validator: Callable[[bytes], bool] = validate_path_element_default, 

1870) -> bool: 

1871 """Default path validator that just checks for .git/.""" 

1872 parts = path.split(b"/") 

1873    for p in parts: 

1874        if not element_validator(p): 

1875            return False 

1876 

1877    return True 

1878 

1879 
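
# Illustrative sketch: validate_path() applies an element validator to each
# "/"-separated component; the NTFS rules additionally reject the 8.3 alias
# GIT~1.
def _example_validate() -> None:
    print(validate_path(b"src/main.py"))                               # True
    print(validate_path(b"a/.git/hooks"))                              # False
    print(validate_path(b"GIT~1/config", validate_path_element_ntfs))  # False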

1880def build_index_from_tree( 

1881 root_path: str | bytes, 

1882 index_path: str | bytes, 

1883 object_store: ObjectContainer, 

1884 tree_id: bytes, 

1885 honor_filemode: bool = True, 

1886 validate_path_element: Callable[[bytes], bool] = validate_path_element_default, 

1887 symlink_fn: Callable[ 

1888 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None 

1889 ] 

1890 | None = None, 

1891 blob_normalizer: Optional["FilterBlobNormalizer"] = None, 

1892 tree_encoding: str = "utf-8", 

1893) -> None: 

1894 """Generate and materialize index from a tree. 

1895 

1896 Args: 

1897 tree_id: Tree to materialize 

1898 root_path: Target dir for materialized index files 

1899 index_path: Target path for generated index 

1900 object_store: Non-empty object store holding tree contents 

1901      honor_filemode: An optional flag to honor the core.filemode setting 

1902        from the config file; defaults to True (set the executable bit) 

1903 validate_path_element: Function to validate path elements to check 

1904        out; the default just refuses .git, "." and ".." elements. 

1905 symlink_fn: Function to use for creating symlinks 

1906 blob_normalizer: An optional BlobNormalizer to use for converting line 

1907 endings when writing blobs to the working directory. 

1908 tree_encoding: Encoding used for tree paths (default: utf-8) 

1909 

1910    Note: the existing index is wiped and its contents are not merged with 

1911    those of the working dir. Suitable only for fresh clones. 

1912 """ 

1913 index = Index(index_path, read=False) 

1914 if not isinstance(root_path, bytes): 

1915 root_path = os.fsencode(root_path) 

1916 

1917 for entry in iter_tree_contents(object_store, tree_id): 

1918 assert ( 

1919 entry.path is not None and entry.mode is not None and entry.sha is not None 

1920 ) 

1921 if not validate_path(entry.path, validate_path_element): 

1922 continue 

1923 full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding) 

1924 

1925 if not os.path.exists(os.path.dirname(full_path)): 

1926 os.makedirs(os.path.dirname(full_path)) 

1927 

1928 # TODO(jelmer): Merge new index into working tree 

1929 if S_ISGITLINK(entry.mode): 

1930 if not os.path.isdir(full_path): 

1931 os.mkdir(full_path) 

1932 st = os.lstat(full_path) 

1933 # TODO(jelmer): record and return submodule paths 

1934 else: 

1935 obj = object_store[entry.sha] 

1936 assert isinstance(obj, Blob) 

1937 # Apply blob normalization for checkout if normalizer is provided 

1938 if blob_normalizer is not None: 

1939 obj = blob_normalizer.checkout_normalize(obj, entry.path) 

1940 st = build_file_from_blob( 

1941 obj, 

1942 entry.mode, 

1943 full_path, 

1944 honor_filemode=honor_filemode, 

1945 tree_encoding=tree_encoding, 

1946 symlink_fn=symlink_fn, 

1947 ) 

1948 

1949 # Add file to index 

1950 if not honor_filemode or S_ISGITLINK(entry.mode): 

1951            # we cannot use tuple slicing to build a new tuple, 

1952            # because on Windows that would convert the times to 

1953            # longs, which causes errors further along 

1954 st_tuple = ( 

1955 entry.mode, 

1956 st.st_ino, 

1957 st.st_dev, 

1958 st.st_nlink, 

1959 st.st_uid, 

1960 st.st_gid, 

1961 st.st_size, 

1962 st.st_atime, 

1963 st.st_mtime, 

1964 st.st_ctime, 

1965 ) 

1966 st = st.__class__(st_tuple) 

1967 # default to a stage 0 index entry (normal) 

1968 # when reading from the filesystem 

1969 index[entry.path] = index_entry_from_stat(st, entry.sha) 

1970 

1971 index.write() 

1972 

1973 
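
# Illustrative sketch (assuming a dulwich Repo object): materializing HEAD's
# tree into a fresh working directory, the way a clone would.
def _example_fresh_checkout(repo) -> None:
    tree_id = repo[repo.head()].tree
    build_index_from_tree(
        repo.path, repo.index_path(), repo.object_store, tree_id
    )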

1974def blob_from_path_and_mode( 

1975 fs_path: bytes, mode: int, tree_encoding: str = "utf-8" 

1976) -> Blob: 

1977 """Create a blob from a path and a stat object. 

1978 

1979 Args: 

1980 fs_path: Full file system path to file 

1981 mode: File mode 

1982 tree_encoding: Encoding to use for tree contents 

1983 Returns: A `Blob` object 

1984 """ 

1985 assert isinstance(fs_path, bytes) 

1986 blob = Blob() 

1987 if stat.S_ISLNK(mode): 

1988 if sys.platform == "win32": 

1989 # os.readlink on Python3 on Windows requires a unicode string. 

1990 blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding) 

1991 else: 

1992 blob.data = os.readlink(fs_path) 

1993 else: 

1994 with open(fs_path, "rb") as f: 

1995 blob.data = f.read() 

1996 return blob 

1997 

1998 

1999def blob_from_path_and_stat( 

2000 fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8" 

2001) -> Blob: 

2002 """Create a blob from a path and a stat object. 

2003 

2004 Args: 

2005 fs_path: Full file system path to file 

2006 st: A stat object 

2007 tree_encoding: Encoding to use for tree contents 

2008 Returns: A `Blob` object 

2009 """ 

2010 return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding) 

2011 

2012 
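
# Illustrative sketch: hashing a working-tree file the way the status code
# does, reusing the lstat result so symlinks are read as link targets.
def _example_hash_file(fs_path: bytes) -> bytes:
    st = os.lstat(fs_path)
    blob = blob_from_path_and_stat(fs_path, st)
    return blob.id  # hex SHA of the blob, before any filters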

2013def read_submodule_head(path: str | bytes) -> bytes | None: 

2014 """Read the head commit of a submodule. 

2015 

2016 Args: 

2017 path: path to the submodule 

2018    Returns: HEAD sha, or None if not a valid head/repository 

2019 """ 

2020 from .errors import NotGitRepository 

2021 from .repo import Repo 

2022 

2023 # Repo currently expects a "str", so decode if necessary. 

2024 # TODO(jelmer): Perhaps move this into Repo() ? 

2025 if not isinstance(path, str): 

2026 path = os.fsdecode(path) 

2027 try: 

2028 repo = Repo(path) 

2029 except NotGitRepository: 

2030 return None 

2031 try: 

2032 return repo.head() 

2033 except KeyError: 

2034 return None 

2035 

2036 
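
# Illustrative sketch (hypothetical arguments): comparing a submodule
# checkout against the SHA recorded for it in the parent tree.
def _example_submodule_in_sync(sub_path: bytes, recorded_sha: bytes) -> bool:
    head = read_submodule_head(sub_path)
    return head is not None and head == recorded_sha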

2037def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool: 

2038 """Check if a directory has changed after getting an error. 

2039 

2040    When handling an error while trying to create a blob from a path, call 

2041    this function. It checks whether the path is a directory. If it is a 

2042    directory and a submodule, it compares the submodule's head with the 

2043    recorded SHA; otherwise Git tracked a file that is now a directory, 

2044    so the entry is considered changed. 

2045 

2046    Return True if the given path should be considered changed and False 

2047    otherwise (including when the path is not a directory). 

2048 """ 

2049 # This is actually a directory 

2050 if os.path.exists(os.path.join(tree_path, b".git")): 

2051 # Submodule 

2052 head = read_submodule_head(tree_path) 

2053 if entry.sha != head: 

2054 return True 

2055 else: 

2056 # The file was changed to a directory, so consider it removed. 

2057 return True 

2058 

2059 return False 

2060 

2061 

2062os_sep_bytes = os.sep.encode("ascii") 

2063 

2064 

2065def _ensure_parent_dir_exists(full_path: bytes) -> None: 

2066 """Ensure parent directory exists, checking no parent is a file.""" 

2067 parent_dir = os.path.dirname(full_path) 

2068 if parent_dir and not os.path.exists(parent_dir): 

2069 # Walk up the directory tree to find the first existing parent 

2070 current = parent_dir 

2071 parents_to_check: list[bytes] = [] 

2072 

2073 while current and not os.path.exists(current): 

2074 parents_to_check.insert(0, current) 

2075 new_parent = os.path.dirname(current) 

2076 if new_parent == current: 

2077 # Reached the root or can't go up further 

2078 break 

2079 current = new_parent 

2080 

2081 # Check if the existing parent (if any) is a directory 

2082 if current and os.path.exists(current) and not os.path.isdir(current): 

2083 raise OSError( 

2084 f"Cannot create directory, parent path is a file: {current!r}" 

2085 ) 

2086 

2087 # Now check each parent we need to create isn't blocked by an existing file 

2088 for parent_path in parents_to_check: 

2089 if os.path.exists(parent_path) and not os.path.isdir(parent_path): 

2090 raise OSError( 

2091 f"Cannot create directory, parent path is a file: {parent_path!r}" 

2092 ) 

2093 

2094 os.makedirs(parent_dir) 

2095 

2096 

2097def _remove_file_with_readonly_handling(path: bytes) -> None: 

2098 """Remove a file, handling read-only files on Windows. 

2099 

2100 Args: 

2101 path: Path to the file to remove 

2102 """ 

2103 try: 

2104 os.unlink(path) 

2105 except PermissionError: 

2106 # On Windows, remove read-only attribute and retry 

2107 if sys.platform == "win32": 

2108 os.chmod(path, stat.S_IWRITE | stat.S_IREAD) 

2109 os.unlink(path) 

2110 else: 

2111 raise 

2112 

2113 

2114def _remove_empty_parents(path: bytes, stop_at: bytes) -> None: 

2115 """Remove empty parent directories up to stop_at.""" 

2116 parent = os.path.dirname(path) 

2117 while parent and parent != stop_at: 

2118 try: 

2119 os.rmdir(parent) 

2120 parent = os.path.dirname(parent) 

2121 except FileNotFoundError: 

2122 # Directory doesn't exist - stop trying 

2123 break 

2124 except OSError as e: 

2125 if e.errno == errno.ENOTEMPTY: 

2126 # Directory not empty - stop trying 

2127 break 

2128 raise 

2129 

2130 

2131def _check_symlink_matches( 

2132 full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes 

2133) -> bool: 

2134 """Check if symlink target matches expected target. 

2135 

2136 Returns True if symlink matches, False if it doesn't match. 

2137 """ 

2138 try: 

2139 current_target = os.readlink(full_path) 

2140 blob_obj = repo_object_store[entry_sha] 

2141 expected_target = blob_obj.as_raw_string() 

2142 if isinstance(current_target, str): 

2143 current_target = current_target.encode() 

2144 return current_target == expected_target 

2145 except FileNotFoundError: 

2146 # Symlink doesn't exist 

2147 return False 

2148 except OSError as e: 

2149 if e.errno == errno.EINVAL: 

2150 # Not a symlink 

2151 return False 

2152 raise 

2153 

2154 

2155def _check_file_matches( 

2156 repo_object_store: "BaseObjectStore", 

2157 full_path: bytes, 

2158 entry_sha: bytes, 

2159 entry_mode: int, 

2160 current_stat: os.stat_result, 

2161 honor_filemode: bool, 

2162 blob_normalizer: Optional["FilterBlobNormalizer"] = None, 

2163 tree_path: bytes | None = None, 

2164) -> bool: 

2165 """Check if a file on disk matches the expected git object. 

2166 

2167 Returns True if file matches, False if it doesn't match. 

2168 """ 

2169 # Check mode first (if honor_filemode is True) 

2170 if honor_filemode: 

2171 current_mode = stat.S_IMODE(current_stat.st_mode) 

2172 expected_mode = stat.S_IMODE(entry_mode) 

2173 

2174 # For regular files, only check the user executable bit, not group/other permissions 

2175 # This matches Git's behavior where umask differences don't count as modifications 

2176 if stat.S_ISREG(current_stat.st_mode): 

2177 # Normalize regular file modes to ignore group/other write permissions 

2178 current_mode_normalized = ( 

2179 current_mode & 0o755 

2180 ) # Keep only user rwx and all read+execute 

2181 expected_mode_normalized = expected_mode & 0o755 

2182 

2183 # For Git compatibility, regular files should be either 644 or 755 

2184 if expected_mode_normalized not in (0o644, 0o755): 

2185 expected_mode_normalized = 0o644 # Default for regular files 

2186 if current_mode_normalized not in (0o644, 0o755): 

2187 # Determine if it should be executable based on user execute bit 

2188 if current_mode & 0o100: # User execute bit is set 

2189 current_mode_normalized = 0o755 

2190 else: 

2191 current_mode_normalized = 0o644 

2192 

2193 if current_mode_normalized != expected_mode_normalized: 

2194 return False 

2195 else: 

2196 # For non-regular files (symlinks, etc.), check mode exactly 

2197 if current_mode != expected_mode: 

2198 return False 

2199 

2200 # If mode matches (or we don't care), check content via size first 

2201 blob_obj = repo_object_store[entry_sha] 

2202 if current_stat.st_size != blob_obj.raw_length(): 

2203 return False 

2204 

2205 # Size matches, check actual content 

2206 try: 

2207 with open(full_path, "rb") as f: 

2208 current_content = f.read() 

2209 expected_content = blob_obj.as_raw_string() 

2210 if blob_normalizer and tree_path is not None: 

2211 assert isinstance(blob_obj, Blob) 

2212 normalized_blob = blob_normalizer.checkout_normalize( 

2213 blob_obj, tree_path 

2214 ) 

2215 expected_content = normalized_blob.as_raw_string() 

2216 return current_content == expected_content 

2217 except (FileNotFoundError, PermissionError, IsADirectoryError): 

2218 return False 

2219 

2220 
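
# Illustrative sketch: the mode normalization above maps any regular-file
# mode to 0o644 or 0o755, keyed off the user execute bit, so umask noise
# never counts as a modification.
def _example_normalize_mode(mode: int) -> int:
    m = stat.S_IMODE(mode) & 0o755
    if m not in (0o644, 0o755):
        m = 0o755 if m & 0o100 else 0o644
    return m
# _example_normalize_mode(0o664) == 0o644; _example_normalize_mode(0o700) == 0o755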

2221def _transition_to_submodule( 

2222 repo: "Repo", 

2223 path: bytes, 

2224 full_path: bytes, 

2225 current_stat: os.stat_result | None, 

2226 entry: IndexEntry | TreeEntry, 

2227 index: Index, 

2228) -> None: 

2229 """Transition any type to submodule.""" 

2230 from .submodule import ensure_submodule_placeholder 

2231 

2232 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

2233 # Already a directory, just ensure .git file exists 

2234 ensure_submodule_placeholder(repo, path) 

2235 else: 

2236 # Remove whatever is there and create submodule 

2237 if current_stat is not None: 

2238 _remove_file_with_readonly_handling(full_path) 

2239 ensure_submodule_placeholder(repo, path) 

2240 

2241 st = os.lstat(full_path) 

2242 assert entry.sha is not None 

2243 index[path] = index_entry_from_stat(st, entry.sha) 

2244 

2245 

2246def _transition_to_file( 

2247 object_store: "BaseObjectStore", 

2248 path: bytes, 

2249 full_path: bytes, 

2250 current_stat: os.stat_result | None, 

2251 entry: IndexEntry | TreeEntry, 

2252 index: Index, 

2253 honor_filemode: bool, 

2254 symlink_fn: Callable[ 

2255 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None 

2256 ] 

2257 | None, 

2258 blob_normalizer: Optional["FilterBlobNormalizer"], 

2259 tree_encoding: str = "utf-8", 

2260) -> None: 

2261 """Transition any type to regular file or symlink.""" 

2262 assert entry.sha is not None and entry.mode is not None 

2263 # Check if we need to update 

2264 if ( 

2265 current_stat is not None 

2266 and stat.S_ISREG(current_stat.st_mode) 

2267 and not stat.S_ISLNK(entry.mode) 

2268 ): 

2269 # File to file - check if update needed 

2270 file_matches = _check_file_matches( 

2271 object_store, 

2272 full_path, 

2273 entry.sha, 

2274 entry.mode, 

2275 current_stat, 

2276 honor_filemode, 

2277 blob_normalizer, 

2278 path, 

2279 ) 

2280 needs_update = not file_matches 

2281 elif ( 

2282 current_stat is not None 

2283 and stat.S_ISLNK(current_stat.st_mode) 

2284 and stat.S_ISLNK(entry.mode) 

2285 ): 

2286 # Symlink to symlink - check if update needed 

2287 symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha) 

2288 needs_update = not symlink_matches 

2289 else: 

2290 needs_update = True 

2291 

2292 if not needs_update: 

2293 # Just update index - current_stat should always be valid here since we're not updating 

2294 assert current_stat is not None 

2295 index[path] = index_entry_from_stat(current_stat, entry.sha) 

2296 return 

2297 

2298 # Remove existing entry if needed 

2299 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

2300 # Remove directory 

2301 dir_contents = set(os.listdir(full_path)) 

2302 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

2303 

2304 if git_file_name in dir_contents: 

2305 if dir_contents != {git_file_name}: 

2306 raise IsADirectoryError( 

2307 f"Cannot replace submodule with untracked files: {full_path!r}" 

2308 ) 

2309 shutil.rmtree(full_path) 

2310 else: 

2311 try: 

2312 os.rmdir(full_path) 

2313 except OSError as e: 

2314 if e.errno == errno.ENOTEMPTY: 

2315 raise IsADirectoryError( 

2316 f"Cannot replace non-empty directory with file: {full_path!r}" 

2317 ) 

2318 raise 

2319 elif current_stat is not None: 

2320 _remove_file_with_readonly_handling(full_path) 

2321 

2322 # Ensure parent directory exists 

2323 _ensure_parent_dir_exists(full_path) 

2324 

2325 # Write the file 

2326 blob_obj = object_store[entry.sha] 

2327 assert isinstance(blob_obj, Blob) 

2328 if blob_normalizer: 

2329 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path) 

2330 st = build_file_from_blob( 

2331 blob_obj, 

2332 entry.mode, 

2333 full_path, 

2334 honor_filemode=honor_filemode, 

2335 tree_encoding=tree_encoding, 

2336 symlink_fn=symlink_fn, 

2337 ) 

2338 index[path] = index_entry_from_stat(st, entry.sha) 

2339 

2340 

2341def _transition_to_absent( 

2342 repo: "Repo", 

2343 path: bytes, 

2344 full_path: bytes, 

2345 current_stat: os.stat_result | None, 

2346 index: Index, 

2347) -> None: 

2348 """Remove any type of entry.""" 

2349 if current_stat is None: 

2350 return 

2351 

2352 if stat.S_ISDIR(current_stat.st_mode): 

2353 # Check if it's a submodule directory 

2354 dir_contents = set(os.listdir(full_path)) 

2355 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

2356 

2357 if git_file_name in dir_contents and dir_contents == {git_file_name}: 

2358 shutil.rmtree(full_path) 

2359 else: 

2360 try: 

2361 os.rmdir(full_path) 

2362 except OSError as e: 

2363 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST): 

2364 raise 

2365 else: 

2366 _remove_file_with_readonly_handling(full_path) 

2367 

2368 try: 

2369 del index[path] 

2370 except KeyError: 

2371 pass 

2372 

2373 # Try to remove empty parent directories 

2374 _remove_empty_parents( 

2375 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

2376 ) 

2377 

2378 

2379def detect_case_only_renames( 

2380 changes: Sequence["TreeChange"], 

2381 config: "Config", 

2382) -> list["TreeChange"]: 

2383 """Detect and transform case-only renames in a list of tree changes. 

2384 

2385 This function identifies file renames that only differ in case (e.g., 

2386 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into 

2387 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization 

2388 based on the repository configuration. 

2389 

2390 Args: 

2391 changes: List of TreeChange objects representing file changes 

2392 config: Repository configuration object 

2393 

2394 Returns: 

2395 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME 

2396 """ 

2397 from .diff_tree import ( 

2398 CHANGE_ADD, 

2399 CHANGE_COPY, 

2400 CHANGE_DELETE, 

2401 CHANGE_MODIFY, 

2402 CHANGE_RENAME, 

2403 TreeChange, 

2404 ) 

2405 

2406 # Build dictionaries of old and new paths with their normalized forms 

2407 old_paths_normalized = {} 

2408 new_paths_normalized = {} 

2409 old_changes = {} # Map from old path to change object 

2410 new_changes = {} # Map from new path to change object 

2411 

2412 # Get the appropriate normalizer based on config 

2413 normalize_func = get_path_element_normalizer(config) 

2414 

2415 def normalize_path(path: bytes) -> bytes: 

2416 """Normalize entire path using element normalization.""" 

2417 return b"/".join(normalize_func(part) for part in path.split(b"/")) 

2418 

2419 # Pre-normalize all paths once to avoid repeated normalization 

2420 for change in changes: 

2421 if change.type == CHANGE_DELETE and change.old: 

2422 assert change.old.path is not None 

2423 try: 

2424 normalized = normalize_path(change.old.path) 

2425 except UnicodeDecodeError: 

2426 import logging 

2427 

2428 logging.warning( 

2429 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2430 change.old.path, 

2431 ) 

2432 else: 

2433 old_paths_normalized[normalized] = change.old.path 

2434 old_changes[change.old.path] = change 

2435 elif change.type == CHANGE_RENAME and change.old: 

2436 assert change.old.path is not None 

2437 # Treat RENAME as DELETE + ADD for case-only detection 

2438 try: 

2439 normalized = normalize_path(change.old.path) 

2440 except UnicodeDecodeError: 

2441 import logging 

2442 

2443 logging.warning( 

2444 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2445 change.old.path, 

2446 ) 

2447 else: 

2448 old_paths_normalized[normalized] = change.old.path 

2449 old_changes[change.old.path] = change 

2450 

2451 if ( 

2452 change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY) 

2453 and change.new 

2454 ): 

2455 assert change.new.path is not None 

2456 try: 

2457 normalized = normalize_path(change.new.path) 

2458 except UnicodeDecodeError: 

2459 import logging 

2460 

2461 logging.warning( 

2462 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2463 change.new.path, 

2464 ) 

2465 else: 

2466 new_paths_normalized[normalized] = change.new.path 

2467 new_changes[change.new.path] = change 

2468 

2469 # Find case-only renames and transform changes 

2470 case_only_renames = set() 

2471 new_rename_changes = [] 

2472 

2473 for norm_path, old_path in old_paths_normalized.items(): 

2474 if norm_path in new_paths_normalized: 

2475 new_path = new_paths_normalized[norm_path] 

2476 if old_path != new_path: 

2477 # Found a case-only rename 

2478 old_change = old_changes[old_path] 

2479 new_change = new_changes[new_path] 

2480 

2481                # Create a CHANGE_RENAME to replace the DELETE and ADD/MODIFY pair; 

2482                # in either case the rename pairs the old entry from the 

2483                # DELETE with the new entry 

2484                rename_change = TreeChange( 

2485                    CHANGE_RENAME, old_change.old, new_change.new 

2486                ) 

2493 

2494 new_rename_changes.append(rename_change) 

2495 

2496 # Mark the old changes for removal 

2497 case_only_renames.add(old_change) 

2498 case_only_renames.add(new_change) 

2499 

2500 # Return new list with original ADD/DELETE changes replaced by renames 

2501 result = [change for change in changes if change not in case_only_renames] 

2502 result.extend(new_rename_changes) 

2503 return result 

2504 

2505 
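
# Illustrative sketch: running the detector over a change list so that a
# DELETE of b"README.txt" plus an ADD of b"readme.txt" comes back as a
# single CHANGE_RENAME on a case-insensitive filesystem.
def _example_case_renames(repo, changes):
    return detect_case_only_renames(list(changes), repo.get_config())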

2506def update_working_tree( 

2507 repo: "Repo", 

2508 old_tree_id: bytes | None, 

2509 new_tree_id: bytes, 

2510 change_iterator: Iterator["TreeChange"], 

2511 honor_filemode: bool = True, 

2512 validate_path_element: Callable[[bytes], bool] | None = None, 

2513 symlink_fn: Callable[ 

2514 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None 

2515 ] 

2516 | None = None, 

2517 force_remove_untracked: bool = False, 

2518 blob_normalizer: Optional["FilterBlobNormalizer"] = None, 

2519 tree_encoding: str = "utf-8", 

2520 allow_overwrite_modified: bool = False, 

2521) -> None: 

2522 """Update the working tree and index to match a new tree. 

2523 

2524 This function handles: 

2525 - Adding new files 

2526 - Updating modified files 

2527 - Removing deleted files 

2528 - Cleaning up empty directories 

2529 

2530 Args: 

2531 repo: Repository object 

2532 old_tree_id: SHA of the tree before the update 

2533 new_tree_id: SHA of the tree to update to 

2534 change_iterator: Iterator of TreeChange objects to apply 

2535 honor_filemode: An optional flag to honor core.filemode setting 

2536 validate_path_element: Function to validate path elements to check out 

2537 symlink_fn: Function to use for creating symlinks 

2538 force_remove_untracked: If True, remove files that exist in working 

2539 directory but not in target tree, even if old_tree_id is None 

2540 blob_normalizer: An optional BlobNormalizer to use for converting line 

2541 endings when writing blobs to the working directory. 

2542 tree_encoding: Encoding used for tree paths (default: utf-8) 

2543 allow_overwrite_modified: If False, raise an error when attempting to 

2544 overwrite files that have been modified compared to old_tree_id 

2545 """ 

2546 if validate_path_element is None: 

2547 validate_path_element = validate_path_element_default 

2548 

2549 from .diff_tree import ( 

2550 CHANGE_ADD, 

2551 CHANGE_COPY, 

2552 CHANGE_DELETE, 

2553 CHANGE_MODIFY, 

2554 CHANGE_RENAME, 

2555 CHANGE_UNCHANGED, 

2556 ) 

2557 

2558 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

2559 index = repo.open_index() 

2560 

2561 # Convert iterator to list since we need multiple passes 

2562 changes = list(change_iterator) 

2563 

2564 # Transform case-only renames on case-insensitive filesystems 

2565 import platform 

2566 

2567 default_ignore_case = platform.system() in ("Windows", "Darwin") 

2568 config = repo.get_config() 

2569 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case) 

2570 

2571    if ignore_case: 

2572        # config was already loaded above for the ignorecase lookup 

2573        changes = detect_case_only_renames(changes, config) 

2574 

2575 # Check for path conflicts where files need to become directories 

2576 paths_becoming_dirs = set() 

2577 for change in changes: 

2578 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY): 

2579 assert change.new is not None 

2580 path = change.new.path 

2581 assert path is not None 

2582 if b"/" in path: # This is a file inside a directory 

2583 # Check if any parent path exists as a file in the old tree or changes 

2584 parts = path.split(b"/") 

2585 for i in range(1, len(parts)): 

2586 parent = b"/".join(parts[:i]) 

2587 # See if this parent path is being deleted (was a file, becoming a dir) 

2588 for other_change in changes: 

2589 if ( 

2590 other_change.type == CHANGE_DELETE 

2591 and other_change.old 

2592 and other_change.old.path == parent 

2593 ): 

2594 paths_becoming_dirs.add(parent) 

2595 

2596 # Check if any path that needs to become a directory has been modified 

2597 for path in paths_becoming_dirs: 

2598 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2599 try: 

2600 current_stat = os.lstat(full_path) 

2601 except FileNotFoundError: 

2602 continue # File doesn't exist, nothing to check 

2603 except OSError as e: 

2604 raise OSError( 

2605 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2606 ) from e 

2607 

2608 if stat.S_ISREG(current_stat.st_mode): 

2609 # Find the old entry for this path 

2610 old_change = None 

2611 for change in changes: 

2612 if ( 

2613 change.type == CHANGE_DELETE 

2614 and change.old 

2615 and change.old.path == path 

2616 ): 

2617 old_change = change 

2618 break 

2619 

2620 if old_change: 

2621 # Check if file has been modified 

2622 assert old_change.old is not None 

2623 assert ( 

2624 old_change.old.sha is not None and old_change.old.mode is not None 

2625 ) 

2626 file_matches = _check_file_matches( 

2627 repo.object_store, 

2628 full_path, 

2629 old_change.old.sha, 

2630 old_change.old.mode, 

2631 current_stat, 

2632 honor_filemode, 

2633 blob_normalizer, 

2634 path, 

2635 ) 

2636 if not file_matches: 

2637 raise OSError( 

2638 f"Cannot replace modified file with directory: {path!r}" 

2639 ) 

2640 

2641 # Check for uncommitted modifications before making any changes 

2642 if not allow_overwrite_modified and old_tree_id: 

2643 for change in changes: 

2644 # Only check files that are being modified or deleted 

2645 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old: 

2646 path = change.old.path 

2647 assert path is not None 

2648 if path.startswith(b".git") or not validate_path( 

2649 path, validate_path_element 

2650 ): 

2651 continue 

2652 

2653 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2654 try: 

2655 current_stat = os.lstat(full_path) 

2656 except FileNotFoundError: 

2657 continue # File doesn't exist, nothing to check 

2658 except OSError as e: 

2659 raise OSError( 

2660 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2661 ) from e 

2662 

2663 if stat.S_ISREG(current_stat.st_mode): 

2664 # Check if working tree file differs from old tree 

2665 assert change.old.sha is not None and change.old.mode is not None 

2666 file_matches = _check_file_matches( 

2667 repo.object_store, 

2668 full_path, 

2669 change.old.sha, 

2670 change.old.mode, 

2671 current_stat, 

2672 honor_filemode, 

2673 blob_normalizer, 

2674 path, 

2675 ) 

2676 if not file_matches: 

2677 from .errors import WorkingTreeModifiedError 

2678 

2679 raise WorkingTreeModifiedError( 

2680 f"Your local changes to '{path.decode('utf-8', errors='replace')}' " 

2681 f"would be overwritten by checkout. " 

2682 f"Please commit your changes or stash them before you switch branches." 

2683 ) 

2684 

2685 # Apply the changes 

2686 for change in changes: 

2687 if change.type in (CHANGE_DELETE, CHANGE_RENAME): 

2688 # Remove file/directory 

2689 assert change.old is not None and change.old.path is not None 

2690 path = change.old.path 

2691 if path.startswith(b".git") or not validate_path( 

2692 path, validate_path_element 

2693 ): 

2694 continue 

2695 

2696 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2697 try: 

2698 delete_stat: os.stat_result | None = os.lstat(full_path) 

2699 except FileNotFoundError: 

2700 delete_stat = None 

2701 except OSError as e: 

2702 raise OSError( 

2703 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2704 ) from e 

2705 

2706 _transition_to_absent(repo, path, full_path, delete_stat, index) 

2707 

2708 if change.type in ( 

2709 CHANGE_ADD, 

2710 CHANGE_MODIFY, 

2711 CHANGE_UNCHANGED, 

2712 CHANGE_COPY, 

2713 CHANGE_RENAME, 

2714 ): 

2715 # Add or modify file 

2716 assert ( 

2717 change.new is not None 

2718 and change.new.path is not None 

2719 and change.new.mode is not None 

2720 ) 

2721 path = change.new.path 

2722 if path.startswith(b".git") or not validate_path( 

2723 path, validate_path_element 

2724 ): 

2725 continue 

2726 

2727 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2728 try: 

2729 modify_stat: os.stat_result | None = os.lstat(full_path) 

2730 except FileNotFoundError: 

2731 modify_stat = None 

2732 except OSError as e: 

2733 raise OSError( 

2734 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2735 ) from e 

2736 

2737 if S_ISGITLINK(change.new.mode): 

2738 _transition_to_submodule( 

2739 repo, path, full_path, modify_stat, change.new, index 

2740 ) 

2741 else: 

2742 _transition_to_file( 

2743 repo.object_store, 

2744 path, 

2745 full_path, 

2746 modify_stat, 

2747 change.new, 

2748 index, 

2749 honor_filemode, 

2750 symlink_fn, 

2751 blob_normalizer, 

2752 tree_encoding, 

2753 ) 

2754 

2755 index.write() 

2756 

2757 
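
# Illustrative sketch (assuming a dulwich Repo object): checking out
# `new_tree` from `old_tree`, feeding update_working_tree the change
# iterator produced by dulwich.diff_tree.tree_changes.
def _example_checkout(repo, old_tree: bytes, new_tree: bytes) -> None:
    from dulwich.diff_tree import tree_changes

    changes = tree_changes(repo.object_store, old_tree, new_tree)
    update_working_tree(repo, old_tree, new_tree, changes)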

2758def _check_entry_for_changes( 

2759 tree_path: bytes, 

2760 entry: IndexEntry | ConflictedIndexEntry, 

2761 root_path: bytes, 

2762 filter_blob_callback: Callable[[bytes, bytes], bytes] | None = None, 

2763) -> bytes | None: 

2764 """Check a single index entry for changes. 

2765 

2766 Args: 

2767 tree_path: Path in the tree 

2768 entry: Index entry to check 

2769 root_path: Root filesystem path 

2770 filter_blob_callback: Optional callback to filter blobs 

2771 Returns: tree_path if changed, None otherwise 

2772 """ 

2773 if isinstance(entry, ConflictedIndexEntry): 

2774 # Conflicted files are always unstaged 

2775 return tree_path 

2776 

2777 full_path = _tree_to_fs_path(root_path, tree_path) 

2778 try: 

2779 st = os.lstat(full_path) 

2780 if stat.S_ISDIR(st.st_mode): 

2781 if _has_directory_changed(tree_path, entry): 

2782 return tree_path 

2783 return None 

2784 

2785 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): 

2786 return None 

2787 

2788 blob = blob_from_path_and_stat(full_path, st) 

2789 

2790 if filter_blob_callback is not None: 

2791 blob.data = filter_blob_callback(blob.data, tree_path) 

2792 except FileNotFoundError: 

2793 # The file was removed, so we assume that counts as 

2794 # different from whatever file used to exist. 

2795 return tree_path 

2796 else: 

2797 if blob.id != entry.sha: 

2798 return tree_path 

2799 return None 

2800 

2801 

2802def get_unstaged_changes( 

2803 index: Index, 

2804 root_path: str | bytes, 

2805 filter_blob_callback: Callable[..., Any] | None = None, 

2806 preload_index: bool = False, 

2807) -> Generator[bytes, None, None]: 

2808 """Walk through an index and check for differences against working tree. 

2809 

2810 Args: 

2811 index: index to check 

2812 root_path: path in which to find files 

2813 filter_blob_callback: Optional callback to filter blobs 

2814 preload_index: If True, use parallel threads to check files (requires threading support) 

2815 Returns: iterator over paths with unstaged changes 

2816 """ 

2817 # For each entry in the index check the sha1 & ensure not staged 

2818 if not isinstance(root_path, bytes): 

2819 root_path = os.fsencode(root_path) 

2820 

2821 if preload_index: 

2822 # Use parallel processing for better performance on slow filesystems 

2823 try: 

2824 import multiprocessing 

2825 from concurrent.futures import ThreadPoolExecutor 

2826 except ImportError: 

2827 # If threading is not available, fall back to serial processing 

2828 preload_index = False 

2829 else: 

2830 # Collect all entries first 

2831 entries = list(index.iteritems()) 

2832 

2833 # Use number of CPUs but cap at 8 threads to avoid overhead 

2834 num_workers = min(multiprocessing.cpu_count(), 8) 

2835 

2836 # Process entries in parallel 

2837 with ThreadPoolExecutor(max_workers=num_workers) as executor: 

2838 # Submit all tasks 

2839 futures = [ 

2840 executor.submit( 

2841 _check_entry_for_changes, 

2842 tree_path, 

2843 entry, 

2844 root_path, 

2845 filter_blob_callback, 

2846 ) 

2847 for tree_path, entry in entries 

2848 ] 

2849 

2850            # Yield results in submission order as each future resolves 

2851 for future in futures: 

2852 result = future.result() 

2853 if result is not None: 

2854 yield result 

2855 

2856 if not preload_index: 

2857 # Serial processing 

2858 for tree_path, entry in index.iteritems(): 

2859 result = _check_entry_for_changes( 

2860 tree_path, entry, root_path, filter_blob_callback 

2861 ) 

2862 if result is not None: 

2863 yield result 

2864 

2865 
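
# Illustrative sketch: listing paths whose working-tree contents differ from
# the index, using the parallel path for slow filesystems.
def _example_unstaged(repo) -> list[bytes]:
    index = repo.open_index()
    return list(get_unstaged_changes(index, repo.path, preload_index=True))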

2866def _tree_to_fs_path( 

2867 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8" 

2868) -> bytes: 

2869 """Convert a git tree path to a file system path. 

2870 

2871 Args: 

2872 root_path: Root filesystem path 

2873 tree_path: Git tree path as bytes (encoded with tree_encoding) 

2874 tree_encoding: Encoding used for tree paths (default: utf-8) 

2875 

2876 Returns: File system path. 

2877 """ 

2878 assert isinstance(tree_path, bytes) 

2879 if os_sep_bytes != b"/": 

2880 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes) 

2881 else: 

2882 sep_corrected_path = tree_path 

2883 

2884 # On Windows, we need to handle tree path encoding properly 

2885 if sys.platform == "win32": 

2886 # Decode from tree encoding, then re-encode for filesystem 

2887 try: 

2888 tree_path_str = sep_corrected_path.decode(tree_encoding) 

2889 sep_corrected_path = os.fsencode(tree_path_str) 

2890 except UnicodeDecodeError: 

2891 # If decoding fails, use the original bytes 

2892 pass 

2893 

2894 return os.path.join(root_path, sep_corrected_path) 

2895 

2896 

2897def _fs_to_tree_path(fs_path: str | bytes, tree_encoding: str = "utf-8") -> bytes: 

2898 """Convert a file system path to a git tree path. 

2899 

2900 Args: 

2901 fs_path: File system path. 

2902 tree_encoding: Encoding to use for tree paths (default: utf-8) 

2903 

2904 Returns: Git tree path as bytes (encoded with tree_encoding) 

2905 """ 

2906 if not isinstance(fs_path, bytes): 

2907 fs_path_bytes = os.fsencode(fs_path) 

2908 else: 

2909 fs_path_bytes = fs_path 

2910 

2911 # On Windows, we need to ensure tree paths are properly encoded 

2912 if sys.platform == "win32": 

2913 try: 

2914 # Decode from filesystem encoding, then re-encode with tree encoding 

2915 fs_path_str = os.fsdecode(fs_path_bytes) 

2916 fs_path_bytes = fs_path_str.encode(tree_encoding) 

2917 except UnicodeDecodeError: 

2918 # If filesystem decoding fails, use the original bytes 

2919 pass 

2920 

2921 if os_sep_bytes != b"/": 

2922 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/") 

2923 else: 

2924 tree_path = fs_path_bytes 

2925 return tree_path 

2926 

2927 
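
# Illustrative sketch: tree paths always use b"/", while filesystem paths
# use os.sep; the two helpers above convert between the conventions and
# round-trip on well-formed input.
def _example_path_roundtrip(root: bytes, tree_path: bytes) -> bytes:
    fs_path = _tree_to_fs_path(root, tree_path)
    rel = os.path.relpath(fs_path, root)
    return _fs_to_tree_path(rel)  # == tree_path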

2928def index_entry_from_directory(st: os.stat_result, path: bytes) -> IndexEntry | None: 

2929 """Create an index entry for a directory. 

2930 

2931 This is only used for submodules (directories containing .git). 

2932 

2933 Args: 

2934 st: Stat result for the directory 

2935 path: Path to the directory 

2936 

2937 Returns: 

2938 IndexEntry for a submodule, or None if not a submodule 

2939 """ 

2940 if os.path.exists(os.path.join(path, b".git")): 

2941 head = read_submodule_head(path) 

2942 if head is None: 

2943 return None 

2944 return index_entry_from_stat(st, head, mode=S_IFGITLINK) 

2945 return None 

2946 

2947 

2948def index_entry_from_path( 

2949 path: bytes, object_store: ObjectContainer | None = None 

2950) -> IndexEntry | None: 

2951 """Create an index from a filesystem path. 

2952 

2953    This returns an index entry for files, symlinks 

2954    and tree references. For plain directories it 

2955    returns None 

2956 

2957 Args: 

2958 path: Path to create an index entry for 

2959 object_store: Optional object store to 

2960 save new blobs in 

2961 Returns: An index entry; None for directories 

2962 """ 

2963 assert isinstance(path, bytes) 

2964 st = os.lstat(path) 

2965 if stat.S_ISDIR(st.st_mode): 

2966 return index_entry_from_directory(st, path) 

2967 

2968 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): 

2969 blob = blob_from_path_and_stat(path, st) 

2970 if object_store is not None: 

2971 object_store.add_object(blob) 

2972 return index_entry_from_stat(st, blob.id) 

2973 

2974 return None 

2975 

2976 
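
# Illustrative sketch (assuming a dulwich Repo object): staging a single
# tracked file by hand, writing the blob into the object store and
# recording fresh stat info in the index.
def _example_stage(repo, tree_path: bytes) -> None:
    index = repo.open_index()
    full = _tree_to_fs_path(os.fsencode(repo.path), tree_path)
    entry = index_entry_from_path(full, object_store=repo.object_store)
    if entry is not None:
        index[tree_path] = entry
        index.write()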

2977def iter_fresh_entries( 

2978 paths: Iterable[bytes], 

2979 root_path: bytes, 

2980 object_store: ObjectContainer | None = None, 

2981) -> Iterator[tuple[bytes, IndexEntry | None]]: 

2982 """Iterate over current versions of index entries on disk. 

2983 

2984 Args: 

2985 paths: Paths to iterate over 

2986 root_path: Root path to access from 

2987 object_store: Optional store to save new blobs in 

2988 Returns: Iterator over path, index_entry 

2989 """ 

2990 for path in paths: 

2991 p = _tree_to_fs_path(root_path, path) 

2992 try: 

2993 entry = index_entry_from_path(p, object_store=object_store) 

2994 except (FileNotFoundError, IsADirectoryError): 

2995 entry = None 

2996 yield path, entry 

2997 

2998 

2999def iter_fresh_objects( 

3000 paths: Iterable[bytes], 

3001 root_path: bytes, 

3002 include_deleted: bool = False, 

3003 object_store: ObjectContainer | None = None, 

3004) -> Iterator[tuple[bytes, bytes | None, int | None]]: 

3005 """Iterate over versions of objects on disk referenced by index. 

3006 

3007 Args: 

3008 paths: Paths to check 

3009 root_path: Root path to access from 

3010 include_deleted: Include deleted entries with sha and 

3011 mode set to None 

3012 object_store: Optional object store to report new items to 

3013 Returns: Iterator over path, sha, mode 

3014 """ 

3015 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store): 

3016 if entry is None: 

3017 if include_deleted: 

3018 yield path, None, None 

3019 else: 

3020 yield path, entry.sha, cleanup_mode(entry.mode) 

3021 

3022 

3023def refresh_index(index: Index, root_path: bytes) -> None: 

3024 """Refresh the contents of an index. 

3025 

3026    This is the equivalent of running 'git commit -a'. 

3027 

3028 Args: 

3029 index: Index to update 

3030 root_path: Root filesystem path 

3031 """ 

3032 for path, entry in iter_fresh_entries(index, root_path): 

3033 if entry: 

3034 index[path] = entry 

3035 

3036 

3037class locked_index: 

3038 """Lock the index while making modifications. 

3039 

3040 Works as a context manager. 

3041 """ 

3042 

3043 _file: "_GitFile" 

3044 

3045 def __init__(self, path: bytes | str) -> None: 

3046 """Initialize locked_index.""" 

3047 self._path = path 

3048 

3049 def __enter__(self) -> Index: 

3050 """Enter context manager and lock index.""" 

3051 f = GitFile(self._path, "wb") 

3052 self._file = f 

3053 self._index = Index(self._path) 

3054 return self._index 

3055 

3056 def __exit__( 

3057 self, 

3058 exc_type: type | None, 

3059 exc_value: BaseException | None, 

3060 traceback: types.TracebackType | None, 

3061 ) -> None: 

3062 """Exit context manager and unlock index.""" 

3063 if exc_type is not None: 

3064 self._file.abort() 

3065 return 

3066 try: 

3067 f = SHA1Writer(self._file) 

3068 write_index_dict(f, self._index._byname) 

3069 except BaseException: 

3070 self._file.abort() 

3071 else: 

3072 f.close()
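
# Illustrative sketch (hypothetical entry name): mutating the index under
# the lock; the new index is written out on clean exit and aborted on error.
def _example_locked(index_path: bytes) -> None:
    with locked_index(index_path) as index:
        try:
            del index[b"obsolete.txt"]
        except KeyError:
            pass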