
# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import Generator, Iterable, Iterator
from dataclasses import dataclass
from enum import Enum
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Optional,
    Union,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .line_ending import BlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"

def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)

def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
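
# Illustrative round-trip of the two varint helpers above (values checked by
# hand; the encoding emits the low 7 bits first, high bit = continuation):
#
#     _encode_varint(0)    == b"\x00"
#     _encode_varint(127)  == b"\x7f"
#     _encode_varint(300)  == b"\xac\x02"      # 300 = (0x2C) + (0x02 << 7)
#     _decode_varint(b"\xac\x02") == (300, 2)  # (decoded value, next offset)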

def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"

def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
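
# Illustrative example of index v4 path compression (hand-checked against the
# helpers above): relative to b"src/foo.py", the path b"src/bar.py" shares the
# 4-byte prefix b"src/", so 6 trailing bytes of the previous path are dropped
# and the new suffix is appended:
#
#     _compress_path(b"src/bar.py", b"src/foo.py") == b"\x06bar.py\x00"
#     _decompress_path(b"\x06bar.py\x00", 0, b"src/foo.py") == (b"src/bar.py", 8)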

def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed

class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3

@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
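
# Illustrative: the stage lives in bits 12-13 of the 16-bit flags word, so a
# flags value of 0x2000 decodes to stage 2 (the "ours" side of a conflict):
#
#     (0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT == 2
#     Stage(2) is Stage.MERGE_CONFLICT_THIS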

@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data

class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""

class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""

class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)

@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED
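
# Illustrative sketch of the skip-worktree flag dance (assumes `entry` is an
# IndexEntry constructed elsewhere; other fields elided):
#
#     entry.set_skip_worktree(True)
#     entry.flags & FLAG_EXTENDED                          # now non-zero
#     entry.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE   # now non-zero
#     entry.skip_worktree                                  # True
#
# Clearing it with set_skip_worktree(False) also drops FLAG_EXTENDED when no
# other extended flags remain.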

class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other

class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""

def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)

def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
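
# Illustrative behaviour of the path helpers (hand-checked):
#
#     pathsplit(b"foo/bar/baz") == (b"foo/bar", b"baz")
#     pathsplit(b"baz")         == (b"", b"baz")
#     pathjoin(b"foo", b"", b"baz") == b"foo/baz"   # empty components dropped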

def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))

def write_cache_time(f: IO[bytes], t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
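
# Illustrative: a float timestamp is split into whole seconds and nanoseconds
# before being packed as two big-endian 32-bit words, so
# write_cache_time(f, 1700000000.25) writes the same 8 bytes as
# write_cache_time(f, (1700000000, 250000000)).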

def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )

def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
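
# On-disk entry layout produced above, for reference (sizes in bytes, derived
# from the struct format and padding arithmetic in the code):
#
#     ctime 8 | mtime 8 | dev 4 | ino 4 | mode 4 | uid 4 | gid 4 | size 4
#     sha 20 | flags 2 | [extended_flags 2, only if FLAG_EXTENDED]
#     version < 4:  name, then NUL padding out to the next 8-byte boundary
#     version >= 4: varint(strip count) + suffix + NUL, no padding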

class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version

def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
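
# Illustrative: the 12-byte header is the b"DIRC" signature followed by two
# big-endian 32-bit words, so a version-2 index holding one entry begins with
#
#     b"DIRC\x00\x00\x00\x02\x00\x00\x00\x01"
#
# and read_index_header() on that prefix returns (2, 1).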

def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)

def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry

def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions

def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    The returned dictionary is keyed by path; entries with a non-zero
    stage are grouped into a single ConflictedIndexEntry per path.

    Args:
      f: File object to read from
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret

def write_index(
    f: IO[bytes],
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      entries: Iterable over the entries to write
      version: Version number to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # Bump the version to 3 if any entry needs extended flags
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    # Final safety check: no extended flags may remain for version < 3
    if version < 3:
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Write the header and entries
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)

def write_index_dict(
    f: IO[bytes],
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are sorted by path and, within a path, by conflict stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)

def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
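
# Illustrative examples (hand-checked): group/other permission bits are
# normalized away and only the owner execute bit survives for regular files:
#
#     cleanup_mode(0o100664) == 0o100644
#     cleanup_mode(0o100775) == 0o100755
#     cleanup_mode(0o120777) == 0o120000   # symlink: permission bits dropped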

class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())
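
# Minimal usage sketch (illustrative; "/path/to/repo" is a placeholder, and
# blob_sha stands for the hex SHA of a blob already in the object store):
#
#     index = Index("/path/to/repo/.git/index")
#     for path, entry in index.items():
#         ...  # entry is an IndexEntry or a ConflictedIndexEntry
#     index[b"new-file"] = index_entry_from_stat(os.lstat("new-file"), blob_sha)
#     index.write()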

def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, Any] = {b"": {}}

    def add_tree(path: bytes) -> dict[bytes, Any]:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: dict[bytes, Any] = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
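
# Usage sketch (illustrative, assuming an in-memory store from
# dulwich.object_store.MemoryObjectStore):
#
#     from dulwich.object_store import MemoryObjectStore
#     store = MemoryObjectStore()
#     blob = Blob.from_string(b"hello\n")
#     store.add_object(blob)
#     tree_id = commit_tree(store, [(b"docs/readme.txt", blob.id, 0o100644)])
#
# This creates a subtree for b"docs" plus the root tree, adds both to the
# store, and returns the root tree's hex SHA.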

def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())

def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))

def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )

if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Windows-specific error for symlink creation failures.

        This error is raised when symlink creation fails on Windows,
        typically due to lack of developer mode or administrator privileges.
        """

        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        """Create a symbolic link on Windows with better error handling.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink

def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[
        Callable[[Union[str, bytes, os.PathLike], Union[str, bytes, os.PathLike]], None]
    ] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor the core.filemode config
        setting; when True (the default) the executable bit is applied
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.symlink on Windows requires unicode strings.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)

INVALID_DOTNAMES = (b".git", b".", b"..", b"")

def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()

def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()

def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")

def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default

def validate_path_element_default(element: bytes) -> bool:
    """Validate a path element using default rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid, False otherwise
    """
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES

def validate_path_element_ntfs(element: bytes) -> bool:
    """Validate a path element using NTFS filesystem rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid for NTFS, False otherwise
    """
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True
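
# Illustrative: NTFS normalization strips trailing dots/spaces and lowercases,
# so disguised .git directories and the 8.3 short name are both caught:
#
#     _normalize_path_element_ntfs(b".GIT. ") == b".git"   # -> rejected
#     validate_path_element_ntfs(b"GIT~1")    == False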

# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}

def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True

def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/."""
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    return True
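
# Illustrative: every /-separated element must pass the validator, so with the
# default element validator:
#
#     validate_path(b"src/main.py")  == True
#     validate_path(b"a/.git/hooks") == False
#     validate_path(b"a/../b")       == False   # b".." is an invalid dotname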

def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[
        Callable[[Union[str, bytes, os.PathLike], Union[str, bytes, os.PathLike]], None]
    ] = None,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor the core.filemode config
        setting; when True (the default) the executable bit is applied
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      symlink_fn: Function to use for creating symlinks
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()

def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a file mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode
      tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob

def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
      tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)

def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None

def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it has changed. If
    not, consider the file as changed, as Git tracked a file and not a
    directory.

    Return True if the given path should be considered as changed and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False

os_sep_bytes = os.sep.encode("ascii")

def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if parent_dir and not os.path.exists(parent_dir):
        # Walk up the directory tree to find the first existing parent
        current = parent_dir
        parents_to_check: list[bytes] = []

        while current and not os.path.exists(current):
            parents_to_check.insert(0, current)
            new_parent = os.path.dirname(current)
            if new_parent == current:
                # Reached the root or can't go up further
                break
            current = new_parent

        # Check if the existing parent (if any) is a directory
        if current and os.path.exists(current) and not os.path.isdir(current):
            raise OSError(
                f"Cannot create directory, parent path is a file: {current!r}"
            )

        # Now check each parent we need to create isn't blocked by an existing file
        for parent_path in parents_to_check:
            if os.path.exists(parent_path) and not os.path.isdir(parent_path):
                raise OSError(
                    f"Cannot create directory, parent path is a file: {parent_path!r}"
                )

        os.makedirs(parent_dir)

def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        # On Windows, remove read-only attribute and retry
        if sys.platform == "win32":
            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            os.unlink(path)
        else:
            raise

1820def _remove_empty_parents(path: bytes, stop_at: bytes) -> None: 

1821 """Remove empty parent directories up to stop_at.""" 

1822 parent = os.path.dirname(path) 

1823 while parent and parent != stop_at: 

1824 try: 

1825 os.rmdir(parent) 

1826 parent = os.path.dirname(parent) 

1827 except FileNotFoundError: 

1828 # Directory doesn't exist - stop trying 

1829 break 

1830 except OSError as e: 

1831 if e.errno == errno.ENOTEMPTY: 

1832 # Directory not empty - stop trying 

1833 break 

1834 raise 

1835 

1836 

1837def _check_symlink_matches( 

1838 full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes 

1839) -> bool: 

1840 """Check if symlink target matches expected target. 

1841 

1842 Returns True if symlink matches, False if it doesn't match. 

1843 """ 

1844 try: 

1845 current_target = os.readlink(full_path) 

1846 blob_obj = repo_object_store[entry_sha] 

1847 expected_target = blob_obj.as_raw_string() 

1848 if isinstance(current_target, str): 

1849 current_target = current_target.encode() 

1850 return current_target == expected_target 

1851 except FileNotFoundError: 

1852 # Symlink doesn't exist 

1853 return False 

1854 except OSError as e: 

1855 if e.errno == errno.EINVAL: 

1856 # Not a symlink 

1857 return False 

1858 raise 

1859 

1860 

1861def _check_file_matches( 

1862 repo_object_store: "BaseObjectStore", 

1863 full_path: bytes, 

1864 entry_sha: bytes, 

1865 entry_mode: int, 

1866 current_stat: os.stat_result, 

1867 honor_filemode: bool, 

1868 blob_normalizer: Optional["BlobNormalizer"] = None, 

1869 tree_path: Optional[bytes] = None, 

1870) -> bool: 

1871 """Check if a file on disk matches the expected git object. 

1872 

1873 Returns True if file matches, False if it doesn't match. 

1874 """ 

1875 # Check mode first (if honor_filemode is True) 

1876 if honor_filemode: 

1877 current_mode = stat.S_IMODE(current_stat.st_mode) 

1878 expected_mode = stat.S_IMODE(entry_mode) 

1879 

1880 # For regular files, only check the user executable bit, not group/other permissions 

1881 # This matches Git's behavior where umask differences don't count as modifications 

1882 if stat.S_ISREG(current_stat.st_mode): 

1883 # Normalize regular file modes to ignore group/other write permissions 

1884 current_mode_normalized = ( 

1885 current_mode & 0o755 

1886 ) # Keep only user rwx and all read+execute 

1887 expected_mode_normalized = expected_mode & 0o755 

1888 

1889 # For Git compatibility, regular files should be either 644 or 755 

1890 if expected_mode_normalized not in (0o644, 0o755): 

1891 expected_mode_normalized = 0o644 # Default for regular files 

1892 if current_mode_normalized not in (0o644, 0o755): 

1893 # Determine if it should be executable based on user execute bit 

1894 if current_mode & 0o100: # User execute bit is set 

1895 current_mode_normalized = 0o755 

1896 else: 

1897 current_mode_normalized = 0o644 

1898 

1899 if current_mode_normalized != expected_mode_normalized: 

1900 return False 

1901 else: 

1902 # For non-regular files (symlinks, etc.), check mode exactly 

1903 if current_mode != expected_mode: 

1904 return False 

1905 

1906 # If mode matches (or we don't care), check content via size first 

1907 blob_obj = repo_object_store[entry_sha] 

1908 if current_stat.st_size != blob_obj.raw_length(): 

1909 return False 

1910 

1911 # Size matches, check actual content 

1912 try: 

1913 with open(full_path, "rb") as f: 

1914 current_content = f.read() 

1915 expected_content = blob_obj.as_raw_string() 

1916 if blob_normalizer and tree_path is not None: 

1917 assert isinstance(blob_obj, Blob) 

1918 normalized_blob = blob_normalizer.checkout_normalize( 

1919 blob_obj, tree_path 

1920 ) 

1921 expected_content = normalized_blob.as_raw_string() 

1922 return current_content == expected_content 

1923 except (FileNotFoundError, PermissionError, IsADirectoryError): 

1924 return False 

1925 

1926 
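# The normalization in _check_file_matches() reduces every regular-file mode
# to 0o644 or 0o755, keyed on the user execute bit. A small worked
# illustration of just that arithmetic (no object store involved):
def _example_mode_normalization() -> None:
    for disk_mode, expected in ((0o664, 0o644), (0o600, 0o644), (0o775, 0o755)):
        normalized = disk_mode & 0o755  # drop group/other write bits
        if normalized not in (0o644, 0o755):
            normalized = 0o755 if disk_mode & 0o100 else 0o644
        assert normalized == expected, oct(normalized)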

1927def _transition_to_submodule( 

1928 repo: "Repo", 

1929 path: bytes, 

1930 full_path: bytes, 

1931 current_stat: Optional[os.stat_result], 

1932 entry: IndexEntry, 

1933 index: Index, 

1934) -> None: 

1935 """Transition any type to submodule.""" 

1936 from .submodule import ensure_submodule_placeholder 

1937 

1938 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

1939 # Already a directory, just ensure .git file exists 

1940 ensure_submodule_placeholder(repo, path) 

1941 else: 

1942 # Remove whatever is there and create submodule 

1943 if current_stat is not None: 

1944 _remove_file_with_readonly_handling(full_path) 

1945 ensure_submodule_placeholder(repo, path) 

1946 

1947 st = os.lstat(full_path) 

1948 index[path] = index_entry_from_stat(st, entry.sha) 

1949 

1950 

1951def _transition_to_file( 

1952 object_store: "BaseObjectStore", 

1953 path: bytes, 

1954 full_path: bytes, 

1955 current_stat: Optional[os.stat_result], 

1956 entry: IndexEntry, 

1957 index: Index, 

1958 honor_filemode: bool, 

1959 symlink_fn: Optional[ 

1960 Callable[[Union[str, bytes, os.PathLike], Union[str, bytes, os.PathLike]], None] 

1961 ], 

1962 blob_normalizer: Optional["BlobNormalizer"], 

1963 tree_encoding: str = "utf-8", 

1964) -> None: 

1965 """Transition any type to regular file or symlink.""" 

1966 # Check if we need to update 

1967 if ( 

1968 current_stat is not None 

1969 and stat.S_ISREG(current_stat.st_mode) 

1970 and not stat.S_ISLNK(entry.mode) 

1971 ): 

1972 # File to file - check if update needed 

1973 file_matches = _check_file_matches( 

1974 object_store, 

1975 full_path, 

1976 entry.sha, 

1977 entry.mode, 

1978 current_stat, 

1979 honor_filemode, 

1980 blob_normalizer, 

1981 path, 

1982 ) 

1983 needs_update = not file_matches 

1984 elif ( 

1985 current_stat is not None 

1986 and stat.S_ISLNK(current_stat.st_mode) 

1987 and stat.S_ISLNK(entry.mode) 

1988 ): 

1989 # Symlink to symlink - check if update needed 

1990 symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha) 

1991 needs_update = not symlink_matches 

1992 else: 

1993 needs_update = True 

1994 

1995 if not needs_update: 

1996 # Just update index - current_stat should always be valid here since we're not updating 

1997 assert current_stat is not None 

1998 index[path] = index_entry_from_stat(current_stat, entry.sha) 

1999 return 

2000 

2001 # Remove existing entry if needed 

2002 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

2003 # Remove directory 

2004 dir_contents = set(os.listdir(full_path)) 

2005 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

2006 

2007 if git_file_name in dir_contents: 

2008 if dir_contents != {git_file_name}: 

2009 raise IsADirectoryError( 

2010 f"Cannot replace submodule with untracked files: {full_path!r}" 

2011 ) 

2012 shutil.rmtree(full_path) 

2013 else: 

2014 try: 

2015 os.rmdir(full_path) 

2016 except OSError as e: 

2017 if e.errno == errno.ENOTEMPTY: 

2018 raise IsADirectoryError( 

2019 f"Cannot replace non-empty directory with file: {full_path!r}" 

2020 ) 

2021 raise 

2022 elif current_stat is not None: 

2023 _remove_file_with_readonly_handling(full_path) 

2024 

2025 # Ensure parent directory exists 

2026 _ensure_parent_dir_exists(full_path) 

2027 

2028 # Write the file 

2029 blob_obj = object_store[entry.sha] 

2030 assert isinstance(blob_obj, Blob) 

2031 if blob_normalizer: 

2032 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path) 

2033 st = build_file_from_blob( 

2034 blob_obj, 

2035 entry.mode, 

2036 full_path, 

2037 honor_filemode=honor_filemode, 

2038 tree_encoding=tree_encoding, 

2039 symlink_fn=symlink_fn, 

2040 ) 

2041 index[path] = index_entry_from_stat(st, entry.sha) 

2042 

2043 

2044def _transition_to_absent( 

2045 repo: "Repo", 

2046 path: bytes, 

2047 full_path: bytes, 

2048 current_stat: Optional[os.stat_result], 

2049 index: Index, 

2050) -> None: 

2051 """Remove any type of entry.""" 

2052 if current_stat is None: 

2053 return 

2054 

2055 if stat.S_ISDIR(current_stat.st_mode): 

2056 # Check if it's a submodule directory 

2057 dir_contents = set(os.listdir(full_path)) 

2058 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

2059 

2060 if git_file_name in dir_contents and dir_contents == {git_file_name}: 

2061 shutil.rmtree(full_path) 

2062 else: 

2063 try: 

2064 os.rmdir(full_path) 

2065 except OSError as e: 

2066 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST): 

2067 raise 

2068 else: 

2069 _remove_file_with_readonly_handling(full_path) 

2070 

2071 try: 

2072 del index[path] 

2073 except KeyError: 

2074 pass 

2075 

2076 # Try to remove empty parent directories 

2077 _remove_empty_parents( 

2078 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

2079 ) 

2080 

2081 

2082def detect_case_only_renames( 

2083 changes: list["TreeChange"], 

2084 config: "Config", 

2085) -> list["TreeChange"]: 

2086 """Detect and transform case-only renames in a list of tree changes. 

2087 

2088 This function identifies file renames that only differ in case (e.g., 

2089 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into 

2090 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization 

2091 based on the repository configuration. 

2092 

2093 Args: 

2094 changes: List of TreeChange objects representing file changes 

2095 config: Repository configuration object 

2096 

2097 Returns: 

2098 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME 

2099 """ 

2100 from .diff_tree import ( 

2101 CHANGE_ADD, 

2102 CHANGE_COPY, 

2103 CHANGE_DELETE, 

2104 CHANGE_MODIFY, 

2105 CHANGE_RENAME, 

2106 TreeChange, 

2107 ) 

2108 

2109 # Build dictionaries of old and new paths with their normalized forms 

2110 old_paths_normalized = {} 

2111 new_paths_normalized = {} 

2112 old_changes = {} # Map from old path to change object 

2113 new_changes = {} # Map from new path to change object 

2114 

2115 # Get the appropriate normalizer based on config 

2116 normalize_func = get_path_element_normalizer(config) 

2117 

2118 def normalize_path(path: bytes) -> bytes: 

2119 """Normalize entire path using element normalization.""" 

2120 return b"/".join(normalize_func(part) for part in path.split(b"/")) 

2121 

2122 # Pre-normalize all paths once to avoid repeated normalization 

2123 for change in changes: 

2124 if change.type == CHANGE_DELETE and change.old: 

2125 try: 

2126 normalized = normalize_path(change.old.path) 

2127 except UnicodeDecodeError: 

2128 import logging 

2129 

2130 logging.warning( 

2131 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2132 change.old.path, 

2133 ) 

2134 else: 

2135 old_paths_normalized[normalized] = change.old.path 

2136 old_changes[change.old.path] = change 

2137 elif change.type == CHANGE_RENAME and change.old: 

2138 # Treat RENAME as DELETE + ADD for case-only detection 

2139 try: 

2140 normalized = normalize_path(change.old.path) 

2141 except UnicodeDecodeError: 

2142 import logging 

2143 

2144 logging.warning( 

2145 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2146 change.old.path, 

2147 ) 

2148 else: 

2149 old_paths_normalized[normalized] = change.old.path 

2150 old_changes[change.old.path] = change 

2151 

2152 if ( 

2153 change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY) 

2154 and change.new 

2155 ): 

2156 try: 

2157 normalized = normalize_path(change.new.path) 

2158 except UnicodeDecodeError: 

2159 import logging 

2160 

2161 logging.warning( 

2162 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2163 change.new.path, 

2164 ) 

2165 else: 

2166 new_paths_normalized[normalized] = change.new.path 

2167 new_changes[change.new.path] = change 

2168 

2169 # Find case-only renames and transform changes 

2170 case_only_renames = set() 

2171 new_rename_changes = [] 

2172 

2173 for norm_path, old_path in old_paths_normalized.items(): 

2174 if norm_path in new_paths_normalized: 

2175 new_path = new_paths_normalized[norm_path] 

2176 if old_path != new_path: 

2177 # Found a case-only rename 

2178 old_change = old_changes[old_path] 

2179 new_change = new_changes[new_path] 

2180 

2181 # Create a CHANGE_RENAME to replace the DELETE and ADD/MODIFY pair. 

2182 # Both cases take the old file from the DELETE/RENAME and the new file 

2183 # from the ADD/MODIFY, so a single construction covers them. 

2184 rename_change = TreeChange( 

2185 CHANGE_RENAME, old_change.old, new_change.new 

2186 ) 

2193 

2194 new_rename_changes.append(rename_change) 

2195 

2196 # Mark the old changes for removal 

2197 case_only_renames.add(old_change) 

2198 case_only_renames.add(new_change) 

2199 

2200 # Return new list with original ADD/DELETE changes replaced by renames 

2201 result = [change for change in changes if change not in case_only_renames] 

2202 result.extend(new_rename_changes) 

2203 return result 

2204 

2205 
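# A hypothetical sketch of detect_case_only_renames(); the shas are made up
# and an empty ConfigDict is assumed acceptable to the path normalizer.
# Whether the DELETE/ADD pair below collapses into one CHANGE_RENAME depends
# on the normalizer selected from config/platform (case-insensitive ones do).
def _example_detect_case_only_renames() -> None:
    from .config import ConfigDict
    from .diff_tree import TreeChange, TreeEntry

    old = TreeEntry(b"README.txt", 0o100644, b"a" * 40)
    new = TreeEntry(b"readme.txt", 0o100644, b"a" * 40)
    changes = [TreeChange.delete(old), TreeChange.add(new)]
    for change in detect_case_only_renames(changes, ConfigDict()):
        print(change.type, change.old.path, change.new.path)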

2206def update_working_tree( 

2207 repo: "Repo", 

2208 old_tree_id: Optional[bytes], 

2209 new_tree_id: bytes, 

2210 change_iterator: Iterator["TreeChange"], 

2211 honor_filemode: bool = True, 

2212 validate_path_element: Optional[Callable[[bytes], bool]] = None, 

2213 symlink_fn: Optional[ 

2214 Callable[[Union[str, bytes, os.PathLike], Union[str, bytes, os.PathLike]], None] 

2215 ] = None, 

2216 force_remove_untracked: bool = False, 

2217 blob_normalizer: Optional["BlobNormalizer"] = None, 

2218 tree_encoding: str = "utf-8", 

2219 allow_overwrite_modified: bool = False, 

2220) -> None: 

2221 """Update the working tree and index to match a new tree. 

2222 

2223 This function handles: 

2224 - Adding new files 

2225 - Updating modified files 

2226 - Removing deleted files 

2227 - Cleaning up empty directories 

2228 

2229 Args: 

2230 repo: Repository object 

2231 old_tree_id: SHA of the tree before the update 

2232 new_tree_id: SHA of the tree to update to 

2233 change_iterator: Iterator of TreeChange objects to apply 

2234 honor_filemode: An optional flag to honor core.filemode setting 

2235 validate_path_element: Function to validate path elements to check out 

2236 symlink_fn: Function to use for creating symlinks 

2237 force_remove_untracked: If True, remove files that exist in working 

2238 directory but not in target tree, even if old_tree_id is None 

2239 blob_normalizer: An optional BlobNormalizer to use for converting line 

2240 endings when writing blobs to the working directory. 

2241 tree_encoding: Encoding used for tree paths (default: utf-8) 

2242 allow_overwrite_modified: If False, raise an error when attempting to 

2243 overwrite files that have been modified compared to old_tree_id 

2244 """ 

2245 if validate_path_element is None: 

2246 validate_path_element = validate_path_element_default 

2247 

2248 from .diff_tree import ( 

2249 CHANGE_ADD, 

2250 CHANGE_COPY, 

2251 CHANGE_DELETE, 

2252 CHANGE_MODIFY, 

2253 CHANGE_RENAME, 

2254 CHANGE_UNCHANGED, 

2255 ) 

2256 

2257 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

2258 index = repo.open_index() 

2259 

2260 # Convert iterator to list since we need multiple passes 

2261 changes = list(change_iterator) 

2262 

2263 # Transform case-only renames on case-insensitive filesystems 

2264 import platform 

2265 

2266 default_ignore_case = platform.system() in ("Windows", "Darwin") 

2267 config = repo.get_config() 

2268 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case) 

2269 

2270 if ignore_case: 

2271 changes = detect_case_only_renames(changes, config) 

2273 

2274 # Check for path conflicts where files need to become directories 

2275 paths_becoming_dirs = set() 

2276 # Precompute the deleted paths so the parent scan below stays linear 

2277 deleted_paths = {c.old.path for c in changes if c.type == CHANGE_DELETE and c.old} 

2278 for change in changes: 

2279 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY): 

2280 path = change.new.path 

2281 if b"/" in path: # This is a file inside a directory 

2282 # See if any parent path is being deleted (was a file, becoming a dir) 

2283 parts = path.split(b"/") 

2284 for i in range(1, len(parts)): 

2285 parent = b"/".join(parts[:i]) 

2286 if parent in deleted_paths: 

2287 paths_becoming_dirs.add(parent) 

2292 

2293 # Check if any path that needs to become a directory has been modified 

2294 for path in paths_becoming_dirs: 

2295 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2296 try: 

2297 current_stat = os.lstat(full_path) 

2298 except FileNotFoundError: 

2299 continue # File doesn't exist, nothing to check 

2300 except OSError as e: 

2301 raise OSError( 

2302 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2303 ) from e 

2304 

2305 if stat.S_ISREG(current_stat.st_mode): 

2306 # Find the old entry for this path 

2307 old_change = None 

2308 for change in changes: 

2309 if ( 

2310 change.type == CHANGE_DELETE 

2311 and change.old 

2312 and change.old.path == path 

2313 ): 

2314 old_change = change 

2315 break 

2316 

2317 if old_change: 

2318 # Check if file has been modified 

2319 file_matches = _check_file_matches( 

2320 repo.object_store, 

2321 full_path, 

2322 old_change.old.sha, 

2323 old_change.old.mode, 

2324 current_stat, 

2325 honor_filemode, 

2326 blob_normalizer, 

2327 path, 

2328 ) 

2329 if not file_matches: 

2330 raise OSError( 

2331 f"Cannot replace modified file with directory: {path!r}" 

2332 ) 

2333 

2334 # Check for uncommitted modifications before making any changes 

2335 if not allow_overwrite_modified and old_tree_id: 

2336 for change in changes: 

2337 # Only check files that are being modified or deleted 

2338 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old: 

2339 path = change.old.path 

2340 if path.startswith(b".git") or not validate_path( 

2341 path, validate_path_element 

2342 ): 

2343 continue 

2344 

2345 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2346 try: 

2347 current_stat = os.lstat(full_path) 

2348 except FileNotFoundError: 

2349 continue # File doesn't exist, nothing to check 

2350 except OSError as e: 

2351 raise OSError( 

2352 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2353 ) from e 

2354 

2355 if stat.S_ISREG(current_stat.st_mode): 

2356 # Check if working tree file differs from old tree 

2357 file_matches = _check_file_matches( 

2358 repo.object_store, 

2359 full_path, 

2360 change.old.sha, 

2361 change.old.mode, 

2362 current_stat, 

2363 honor_filemode, 

2364 blob_normalizer, 

2365 path, 

2366 ) 

2367 if not file_matches: 

2368 from .errors import WorkingTreeModifiedError 

2369 

2370 raise WorkingTreeModifiedError( 

2371 f"Your local changes to '{path.decode('utf-8', errors='replace')}' " 

2372 f"would be overwritten by checkout. " 

2373 f"Please commit your changes or stash them before you switch branches." 

2374 ) 

2375 

2376 # Apply the changes 

2377 for change in changes: 

2378 if change.type in (CHANGE_DELETE, CHANGE_RENAME): 

2379 # Remove file/directory 

2380 path = change.old.path 

2381 if path.startswith(b".git") or not validate_path( 

2382 path, validate_path_element 

2383 ): 

2384 continue 

2385 

2386 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2387 try: 

2388 delete_stat: Optional[os.stat_result] = os.lstat(full_path) 

2389 except FileNotFoundError: 

2390 delete_stat = None 

2391 except OSError as e: 

2392 raise OSError( 

2393 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2394 ) from e 

2395 

2396 _transition_to_absent(repo, path, full_path, delete_stat, index) 

2397 

2398 if change.type in ( 

2399 CHANGE_ADD, 

2400 CHANGE_MODIFY, 

2401 CHANGE_UNCHANGED, 

2402 CHANGE_COPY, 

2403 CHANGE_RENAME, 

2404 ): 

2405 # Add or modify file 

2406 path = change.new.path 

2407 if path.startswith(b".git") or not validate_path( 

2408 path, validate_path_element 

2409 ): 

2410 continue 

2411 

2412 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2413 try: 

2414 modify_stat: Optional[os.stat_result] = os.lstat(full_path) 

2415 except FileNotFoundError: 

2416 modify_stat = None 

2417 except OSError as e: 

2418 raise OSError( 

2419 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2420 ) from e 

2421 

2422 if S_ISGITLINK(change.new.mode): 

2423 _transition_to_submodule( 

2424 repo, path, full_path, modify_stat, change.new, index 

2425 ) 

2426 else: 

2427 _transition_to_file( 

2428 repo.object_store, 

2429 path, 

2430 full_path, 

2431 modify_stat, 

2432 change.new, 

2433 index, 

2434 honor_filemode, 

2435 symlink_fn, 

2436 blob_normalizer, 

2437 tree_encoding, 

2438 ) 

2439 

2440 index.write() 

2441 

2442 
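# A hypothetical driver for update_working_tree(): compute the change stream
# with dulwich.diff_tree.tree_changes and apply it. Assumes HEAD points at a
# commit and that new_tree_id is a tree present in the object store.
def _example_update_working_tree(repo: "Repo", new_tree_id: bytes) -> None:
    from .diff_tree import tree_changes

    old_tree_id = repo[repo.head()].tree
    changes = tree_changes(repo.object_store, old_tree_id, new_tree_id)
    update_working_tree(repo, old_tree_id, new_tree_id, changes)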

2443def get_unstaged_changes( 

2444 index: Index, 

2445 root_path: Union[str, bytes], 

2446 filter_blob_callback: Optional[Callable] = None, 

2447) -> Generator[bytes, None, None]: 

2448 """Walk through an index and check for differences against working tree. 

2449 

2450 Args: 

2451 index: index to check 

2452 root_path: path in which to find files 

2453 filter_blob_callback: Optional callback to filter blobs 

2454 Returns: iterator over paths with unstaged changes 

2455 """ 

2456 # For each entry in the index, re-hash the working-tree file and compare it to the staged sha 

2457 if not isinstance(root_path, bytes): 

2458 root_path = os.fsencode(root_path) 

2459 

2460 for tree_path, entry in index.iteritems(): 

2461 full_path = _tree_to_fs_path(root_path, tree_path) 

2462 if isinstance(entry, ConflictedIndexEntry): 

2463 # Conflicted files are always unstaged 

2464 yield tree_path 

2465 continue 

2466 

2467 try: 

2468 st = os.lstat(full_path) 

2469 if stat.S_ISDIR(st.st_mode): 

2470 if _has_directory_changed(tree_path, entry): 

2471 yield tree_path 

2472 continue 

2473 

2474 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): 

2475 continue 

2476 

2477 blob = blob_from_path_and_stat(full_path, st) 

2478 

2479 if filter_blob_callback is not None: 

2480 blob = filter_blob_callback(blob, tree_path) 

2481 except FileNotFoundError: 

2482 # The file was removed, so we assume that counts as 

2483 # different from whatever file used to exist. 

2484 yield tree_path 

2485 else: 

2486 if blob.id != entry.sha: 

2487 yield tree_path 

2488 

2489 
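# A minimal usage sketch of get_unstaged_changes(); "/tmp/repo" stands in
# for a real working-tree checkout.
def _example_get_unstaged_changes() -> None:
    from .repo import Repo

    repo = Repo("/tmp/repo")  # placeholder path
    index = repo.open_index()
    for tree_path in get_unstaged_changes(index, repo.path):
        print("unstaged:", tree_path.decode("utf-8", "replace"))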

2490def _tree_to_fs_path( 

2491 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8" 

2492) -> bytes: 

2493 """Convert a git tree path to a file system path. 

2494 

2495 Args: 

2496 root_path: Root filesystem path 

2497 tree_path: Git tree path as bytes (encoded with tree_encoding) 

2498 tree_encoding: Encoding used for tree paths (default: utf-8) 

2499 

2500 Returns: File system path. 

2501 """ 

2502 assert isinstance(tree_path, bytes) 

2503 if os_sep_bytes != b"/": 

2504 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes) 

2505 else: 

2506 sep_corrected_path = tree_path 

2507 

2508 # On Windows, we need to handle tree path encoding properly 

2509 if sys.platform == "win32": 

2510 # Decode from tree encoding, then re-encode for filesystem 

2511 try: 

2512 tree_path_str = sep_corrected_path.decode(tree_encoding) 

2513 sep_corrected_path = os.fsencode(tree_path_str) 

2514 except UnicodeDecodeError: 

2515 # If decoding fails, use the original bytes 

2516 pass 

2517 

2518 return os.path.join(root_path, sep_corrected_path) 

2519 

2520 

2521def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -> bytes: 

2522 """Convert a file system path to a git tree path. 

2523 

2524 Args: 

2525 fs_path: File system path. 

2526 tree_encoding: Encoding to use for tree paths (default: utf-8) 

2527 

2528 Returns: Git tree path as bytes (encoded with tree_encoding) 

2529 """ 

2530 if not isinstance(fs_path, bytes): 

2531 fs_path_bytes = os.fsencode(fs_path) 

2532 else: 

2533 fs_path_bytes = fs_path 

2534 

2535 # On Windows, we need to ensure tree paths are properly encoded 

2536 if sys.platform == "win32": 

2537 try: 

2538 # Decode from filesystem encoding, then re-encode with tree encoding 

2539 fs_path_str = os.fsdecode(fs_path_bytes) 

2540 fs_path_bytes = fs_path_str.encode(tree_encoding) 

2541 except UnicodeDecodeError: 

2542 # If filesystem decoding fails, use the original bytes 

2543 pass 

2544 

2545 if os_sep_bytes != b"/": 

2546 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/") 

2547 else: 

2548 tree_path = fs_path_bytes 

2549 return tree_path 

2550 

2551 
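# The two helpers above are inverses for well-formed relative paths. A small
# POSIX-flavoured round trip (on Windows the separators are flipped and the
# path is re-encoded for the filesystem):
def _example_tree_fs_roundtrip() -> None:
    fs_path = _tree_to_fs_path(b"/repo", b"docs/readme.md")
    rel = os.path.relpath(fs_path, b"/repo")
    assert _fs_to_tree_path(rel) == b"docs/readme.md"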

2552def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]: 

2553 """Create an index entry for a directory. 

2554 

2555 This is only used for submodules (directories containing .git). 

2556 

2557 Args: 

2558 st: Stat result for the directory 

2559 path: Path to the directory 

2560 

2561 Returns: 

2562 IndexEntry for a submodule, or None if not a submodule 

2563 """ 

2564 if os.path.exists(os.path.join(path, b".git")): 

2565 head = read_submodule_head(path) 

2566 if head is None: 

2567 return None 

2568 return index_entry_from_stat(st, head, mode=S_IFGITLINK) 

2569 return None 

2570 

2571 

2572def index_entry_from_path( 

2573 path: bytes, object_store: Optional[ObjectContainer] = None 

2574) -> Optional[IndexEntry]: 

2575 """Create an index from a filesystem path. 

2576 

2577 This returns an index value for files, symlinks 

2578 and tree references. for directories and 

2579 non-existent files it returns None 

2580 

2581 Args: 

2582 path: Path to create an index entry for 

2583 object_store: Optional object store to 

2584 save new blobs in 

2585 Returns: An index entry; None for directories 

2586 """ 

2587 assert isinstance(path, bytes) 

2588 st = os.lstat(path) 

2589 if stat.S_ISDIR(st.st_mode): 

2590 return index_entry_from_directory(st, path) 

2591 

2592 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): 

2593 blob = blob_from_path_and_stat(path, st) 

2594 if object_store is not None: 

2595 object_store.add_object(blob) 

2596 return index_entry_from_stat(st, blob.id) 

2597 

2598 return None 

2599 

2600 
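# A short sketch of index_entry_from_path(); "setup.py" is a placeholder for
# any existing regular file, and the sha printed is the hex blob id.
def _example_index_entry_from_path() -> None:
    entry = index_entry_from_path(b"setup.py")  # hypothetical file
    if entry is None:
        print("directory (non-submodule) or unsupported file type")
    else:
        print("mode", oct(entry.mode), "sha", entry.sha.decode("ascii"))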

2601def iter_fresh_entries( 

2602 paths: Iterable[bytes], 

2603 root_path: bytes, 

2604 object_store: Optional[ObjectContainer] = None, 

2605) -> Iterator[tuple[bytes, Optional[IndexEntry]]]: 

2606 """Iterate over current versions of index entries on disk. 

2607 

2608 Args: 

2609 paths: Paths to iterate over 

2610 root_path: Root path to access from 

2611 object_store: Optional store to save new blobs in 

2612 Returns: Iterator over path, index_entry 

2613 """ 

2614 for path in paths: 

2615 p = _tree_to_fs_path(root_path, path) 

2616 try: 

2617 entry = index_entry_from_path(p, object_store=object_store) 

2618 except (FileNotFoundError, IsADirectoryError): 

2619 entry = None 

2620 yield path, entry 

2621 

2622 

2623def iter_fresh_objects( 

2624 paths: Iterable[bytes], 

2625 root_path: bytes, 

2626 include_deleted: bool = False, 

2627 object_store: Optional[ObjectContainer] = None, 

2628) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]: 

2629 """Iterate over versions of objects on disk referenced by index. 

2630 

2631 Args: 

2632 paths: Paths to check 

2633 root_path: Root path to access from 

2634 include_deleted: Include deleted entries with sha and 

2635 mode set to None 

2636 object_store: Optional object store to report new items to 

2637 Returns: Iterator over path, sha, mode 

2638 """ 

2639 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store): 

2640 if entry is None: 

2641 if include_deleted: 

2642 yield path, None, None 

2643 else: 

2644 yield path, entry.sha, cleanup_mode(entry.mode) 

2645 

2646 
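# A hedged sketch of iter_fresh_objects(); the path and root below are
# placeholders. With include_deleted=True, files missing on disk are
# reported as (path, None, None).
def _example_iter_fresh_objects() -> None:
    for path, sha, mode in iter_fresh_objects(
        [b"README.md"], b"/tmp/repo", include_deleted=True
    ):
        print(path, sha, mode)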

2647def refresh_index(index: Index, root_path: bytes) -> None: 

2648 """Refresh the contents of an index. 

2649 

2650 This is the equivalent of the implicit index refresh performed by 'git commit -a'. 

2651 

2652 Args: 

2653 index: Index to update 

2654 root_path: Root filesystem path 

2655 """ 

2656 for path, entry in iter_fresh_entries(index, root_path): 

2657 if entry: 

2658 index[path] = entry 

2659 

2660 
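# A hypothetical driver for refresh_index(): re-stat every path already in
# the index of a repository at a placeholder location, then persist it.
def _example_refresh_index() -> None:
    from .repo import Repo

    repo = Repo("/tmp/repo")  # placeholder path
    index = repo.open_index()
    refresh_index(index, os.fsencode(repo.path))
    index.write()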

2661class locked_index: 

2662 """Lock the index while making modifications. 

2663 

2664 Works as a context manager. 

2665 """ 

2666 

2667 _file: "_GitFile" 

2668 

2669 def __init__(self, path: Union[bytes, str]) -> None: 

2670 """Initialize locked_index.""" 

2671 self._path = path 

2672 

2673 def __enter__(self) -> Index: 

2674 """Enter context manager and lock index.""" 

2675 f = GitFile(self._path, "wb") 

2676 assert hasattr(f, "abort") # GitFile in write mode always returns a lock-holding _GitFile; checked via hasattr since _GitFile is only imported under TYPE_CHECKING 

2677 self._file = f 

2678 self._index = Index(self._path) 

2679 return self._index 

2680 

2681 def __exit__( 

2682 self, 

2683 exc_type: Optional[type], 

2684 exc_value: Optional[BaseException], 

2685 traceback: Optional[types.TracebackType], 

2686 ) -> None: 

2687 """Exit context manager and unlock index.""" 

2688 if exc_type is not None: 

2689 self._file.abort() 

2690 return 

2691 try: 

2692 f = SHA1Writer(self._file) 

2693 write_index_dict(f, self._index._byname) 

2694 except BaseException: 

2695 self._file.abort() 

2696 else: 

2697 f.close()
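# An illustrative use of the locked_index context manager; the index path and
# the tracked file name are placeholders. On clean exit the modified index is
# written out under the lock; on an exception the lock file is aborted and
# the on-disk index is left unchanged.
def _example_locked_index() -> None:
    with locked_index(b"/tmp/repo/.git/index") as index:
        del index[b"obsolete.txt"]  # hypothetical tracked path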