# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence
from dataclasses import dataclass
from enum import Enum
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Optional,
    Union,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .filters import FilterBlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    TreeEntry,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# Type alias for recursive tree structure used in commit_tree
if sys.version_info >= (3, 10):
    TreeDict = dict[bytes, Union["TreeDict", tuple[int, bytes]]]
else:
    TreeDict = dict[bytes, Any]

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
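

# Example (illustrative sketch): round-trip a value through the varint
# helpers above. The helper name `_demo_varint_roundtrip` is hypothetical
# and for demonstration only; the byte values follow directly from the
# encoding rules.
def _demo_varint_roundtrip() -> None:
    encoded = _encode_varint(300)
    # 300 emits its low 7 bits first (0x2C, with the 0x80 continuation bit
    # set, giving 0xAC), then the remaining bits (0x02).
    assert encoded == b"\xac\x02"
    assert _decode_varint(encoded) == (300, 2)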


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
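

# Example (illustrative sketch, hypothetical helper): index v4 path
# compression round-trips through the two functions above. Compressing
# b"src/main.py" against b"src/lib.py" drops the 6 non-shared trailing
# bytes of the previous path and appends the new suffix.
def _demo_path_compression_roundtrip() -> None:
    compressed = _compress_path(b"src/main.py", b"src/lib.py")
    # varint(6) + suffix + NUL: drop b"lib.py" (6 bytes), append b"main.py"
    assert compressed == b"\x06main.py\x00"
    path, offset = _decompress_path(compressed, 0, b"src/lib.py")
    assert (path, offset) == (b"src/main.py", len(compressed))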


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3


@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
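

# Example (illustrative sketch, hypothetical helper): the 2-bit stage lives
# in bits 12-13 of the flags word, so an entry whose flags carry stage 2
# decodes to MERGE_CONFLICT_THIS. All other field values are placeholders.
def _demo_stage_flags() -> None:
    entry = SerializedIndexEntry(
        name=b"conflicted.txt",
        ctime=0, mtime=0, dev=0, ino=0, mode=0o100644,
        uid=0, gid=0, size=0, sha=b"0" * 40,
        flags=Stage.MERGE_CONFLICT_THIS.value << FLAG_STAGESHIFT,
        extended_flags=0,
    )
    assert (entry.flags & FLAG_STAGEMASK) == 0x2000
    assert entry.stage() == Stage.MERGE_CONFLICT_THIS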


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)


@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
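

# Example (illustrative sketch, hypothetical helper): pathsplit/pathjoin
# operate on /-delimited byte paths and are inverses of each other for
# normalized paths.
def _demo_pathsplit_pathjoin() -> None:
    assert pathsplit(b"foo/bar/baz.txt") == (b"foo/bar", b"baz.txt")
    assert pathsplit(b"toplevel.txt") == (b"", b"toplevel.txt")
    assert pathjoin(b"foo/bar", b"baz.txt") == b"foo/bar/baz.txt"
    # Empty components are dropped, so joining a root-level split is stable.
    assert pathjoin(*pathsplit(b"toplevel.txt")) == b"toplevel.txt"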


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: IO[bytes], t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
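

# Example (illustrative sketch, hypothetical helper): cache times are
# stored as big-endian (seconds, nanoseconds) pairs, and floats are split
# at the decimal point on write. Uses an in-memory buffer for the round
# trip; the timestamp value is a placeholder.
def _demo_cache_time_roundtrip() -> None:
    from io import BytesIO

    buf = BytesIO()
    write_cache_time(buf, 1700000000.25)
    buf.seek(0)
    assert read_cache_time(buf) == (1700000000, 250000000)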


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
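

# Example (illustrative sketch, hypothetical helper): the index starts with
# a 12-byte header -- the b"DIRC" signature followed by two big-endian
# 32-bit words for version and entry count.
def _demo_read_index_header() -> None:
    from io import BytesIO

    header = b"DIRC" + struct.pack(">LL", 2, 0)
    assert read_index_header(BytesIO(header)) == (2, 0)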


def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    Keys are paths; conflicted paths are collapsed into a single
    ConflictedIndexEntry holding the ancestor/this/other stages.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[Sequence[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # Extended flags require index format version 3 or later; bump if needed.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    if version < 3:
        # Double-check no extended flags appear
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)
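

# Example (illustrative sketch, hypothetical helper): write a single
# serialized entry and read it back through write_index/read_index. The
# SHA is a placeholder; read_index does not verify the trailing checksum,
# which is normally appended by SHA1Writer in Index.write.
def _demo_index_roundtrip() -> None:
    from io import BytesIO

    entry = SerializedIndexEntry(
        name=b"hello.txt",
        ctime=(0, 0), mtime=(0, 0), dev=0, ino=0, mode=0o100644,
        uid=0, gid=0, size=5, sha=b"a" * 40, flags=0, extended_flags=0,
    )
    buf = BytesIO()
    write_index(buf, [entry])
    buf.seek(0)
    (read_back,) = list(read_index(buf))
    assert (read_back.name, read_back.sha) == (b"hello.txt", b"a" * 40)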


def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[Sequence[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are sorted by path, and conflicted entries by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
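

# Example (illustrative sketch, hypothetical helper): cleanup_mode
# canonicalizes filesystem modes to the only values git stores in trees
# (0o100644, 0o100755, symlink, directory, or gitlink).
def _demo_cleanup_mode() -> None:
    assert cleanup_mode(0o100664) == 0o100644  # group-writable file -> 644
    assert cleanup_mode(0o100755) == 0o100755  # executable bit preserved
    assert cleanup_mode(0o120777) == stat.S_IFLNK  # symlink perms dropped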


class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike[str]],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
          newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: TreeDict = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
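

# Example (illustrative sketch, hypothetical helper): build a tree with one
# nested blob via commit_tree. Assumes MemoryObjectStore from
# dulwich.object_store; the path and contents are placeholders.
def _demo_commit_tree() -> None:
    from .object_store import MemoryObjectStore

    store = MemoryObjectStore()
    blob = Blob.from_string(b"hello\n")
    store.add_object(blob)
    tree_id = commit_tree(store, [(b"docs/readme.txt", blob.id, 0o100644)])
    root = store[tree_id]
    assert isinstance(root, Tree)
    # The intermediate directory becomes a "docs" subtree in the root tree.
    assert b"docs" in [entry.path for entry in root.iteritems()]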


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
      (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Windows-specific error for symlink creation failures.

        This error is raised when symlink creation fails on Windows,
        typically due to lack of developer mode or administrator privileges.
        """

        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        """Create a symbolic link on Windows with better error handling.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[
        Callable[
            [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]],
            None,
        ]
    ] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor the core.filemode config
        setting; when True (the default), the executable bit is applied
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()


def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()


def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")


def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default


def validate_path_element_default(element: bytes) -> bool:
    """Validate a path element using default rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid, False otherwise
    """
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    """Validate a path element using NTFS filesystem rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid for NTFS, False otherwise
    """
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Validate a path by checking each /-separated element.

    The default element validator just refuses .git and similar dot-names.
    """
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True
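

# Example (illustrative sketch, hypothetical helper): path validation
# refuses .git components case-insensitively, and the NTFS variant also
# catches the GIT~1 8.3 short name.
def _demo_validate_path() -> None:
    assert validate_path(b"src/app.py")
    assert not validate_path(b".git/config")
    assert not validate_path(b"nested/.GIT/hooks")
    assert not validate_path(b"nested/GIT~1/config", validate_path_element_ntfs)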

1600 

1601def build_index_from_tree( 

1602 root_path: Union[str, bytes], 

1603 index_path: Union[str, bytes], 

1604 object_store: ObjectContainer, 

1605 tree_id: bytes, 

1606 honor_filemode: bool = True, 

1607 validate_path_element: Callable[[bytes], bool] = validate_path_element_default, 

1608 symlink_fn: Optional[ 

1609 Callable[ 

1610 [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]], 

1611 None, 

1612 ] 

1613 ] = None, 

1614 blob_normalizer: Optional["FilterBlobNormalizer"] = None, 

1615 tree_encoding: str = "utf-8", 

1616) -> None: 

1617 """Generate and materialize index from a tree. 

1618 

1619 Args: 

1620 tree_id: Tree to materialize 

1621 root_path: Target dir for materialized index files 

1622 index_path: Target path for generated index 

1623 object_store: Non-empty object store holding tree contents 

1624 honor_filemode: An optional flag to honor core.filemode setting in 

1625 config file, default is core.filemode=True, change executable bit 

1626 validate_path_element: Function to validate path elements to check 

1627 out; default just refuses .git and .. directories. 

1628 symlink_fn: Function to use for creating symlinks 

1629 blob_normalizer: An optional BlobNormalizer to use for converting line 

1630 endings when writing blobs to the working directory. 

1631 tree_encoding: Encoding used for tree paths (default: utf-8) 

1632 

1633 Note: existing index is wiped and contents are not merged 

1634 in a working dir. Suitable only for fresh clones. 

1635 """ 

1636 index = Index(index_path, read=False) 

1637 if not isinstance(root_path, bytes): 

1638 root_path = os.fsencode(root_path) 

1639 

1640 for entry in iter_tree_contents(object_store, tree_id): 

1641 assert ( 

1642 entry.path is not None and entry.mode is not None and entry.sha is not None 

1643 ) 

1644 if not validate_path(entry.path, validate_path_element): 

1645 continue 

1646 full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding) 

1647 

1648 if not os.path.exists(os.path.dirname(full_path)): 

1649 os.makedirs(os.path.dirname(full_path)) 

1650 

1651 # TODO(jelmer): Merge new index into working tree 

1652 if S_ISGITLINK(entry.mode): 

1653 if not os.path.isdir(full_path): 

1654 os.mkdir(full_path) 

1655 st = os.lstat(full_path) 

1656 # TODO(jelmer): record and return submodule paths 

1657 else: 

1658 obj = object_store[entry.sha] 

1659 assert isinstance(obj, Blob) 

1660 # Apply blob normalization for checkout if normalizer is provided 

1661 if blob_normalizer is not None: 

1662 obj = blob_normalizer.checkout_normalize(obj, entry.path) 

1663 st = build_file_from_blob( 

1664 obj, 

1665 entry.mode, 

1666 full_path, 

1667 honor_filemode=honor_filemode, 

1668 tree_encoding=tree_encoding, 

1669 symlink_fn=symlink_fn, 

1670 ) 

1671 

1672 # Add file to index 

1673 if not honor_filemode or S_ISGITLINK(entry.mode): 

1674 # we can not use tuple slicing to build a new tuple, 

1675 # because on windows that will convert the times to 

1676 # longs, which causes errors further along 

1677 st_tuple = ( 

1678 entry.mode, 

1679 st.st_ino, 

1680 st.st_dev, 

1681 st.st_nlink, 

1682 st.st_uid, 

1683 st.st_gid, 

1684 st.st_size, 

1685 st.st_atime, 

1686 st.st_mtime, 

1687 st.st_ctime, 

1688 ) 

1689 st = st.__class__(st_tuple) 

1690 # default to a stage 0 index entry (normal) 

1691 # when reading from the filesystem 

1692 index[entry.path] = index_entry_from_stat(st, entry.sha) 

1693 

1694 index.write() 

1695 

1696 

1697def blob_from_path_and_mode( 

1698 fs_path: bytes, mode: int, tree_encoding: str = "utf-8" 

1699) -> Blob: 

1700 """Create a blob from a path and a stat object. 

1701 

1702 Args: 

1703 fs_path: Full file system path to file 

1704 mode: File mode 

1705 tree_encoding: Encoding to use for tree contents 

1706 Returns: A `Blob` object 

1707 """ 

1708 assert isinstance(fs_path, bytes) 

1709 blob = Blob() 

1710 if stat.S_ISLNK(mode): 

1711 if sys.platform == "win32": 

1712 # os.readlink on Python3 on Windows requires a unicode string. 

1713 blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding) 

1714 else: 

1715 blob.data = os.readlink(fs_path) 

1716 else: 

1717 with open(fs_path, "rb") as f: 

1718 blob.data = f.read() 

1719 return blob 

1720 

1721 

1722def blob_from_path_and_stat( 

1723 fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8" 

1724) -> Blob: 

1725 """Create a blob from a path and a stat object. 

1726 

1727 Args: 

1728 fs_path: Full file system path to file 

1729 st: A stat object 

1730 tree_encoding: Encoding to use for tree contents 

1731 Returns: A `Blob` object 

1732 """ 

1733 return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding) 

1734 

1735 

1736def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]: 

1737 """Read the head commit of a submodule. 

1738 

1739 Args: 

1740 path: path to the submodule 

1741 Returns: HEAD sha, None if not a valid head/repository 

1742 """ 

1743 from .errors import NotGitRepository 

1744 from .repo import Repo 

1745 

1746 # Repo currently expects a "str", so decode if necessary. 

1747 # TODO(jelmer): Perhaps move this into Repo() ? 

1748 if not isinstance(path, str): 

1749 path = os.fsdecode(path) 

1750 try: 

1751 repo = Repo(path) 

1752 except NotGitRepository: 

1753 return None 

1754 try: 

1755 return repo.head() 

1756 except KeyError: 

1757 return None 

1758 

1759 

1760def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool: 

1761 """Check if a directory has changed after getting an error. 

1762 

1763 When handling an error trying to create a blob from a path, call this 

1764 function. It will check if the path is a directory. If it's a directory 

1765 and a submodule, check the submodule head to see if it's has changed. If 

1766 not, consider the file as changed as Git tracked a file and not a 

1767 directory. 

1768 

1769 Return true if the given path should be considered as changed and False 

1770 otherwise or if the path is not a directory. 

1771 """ 

1772 # This is actually a directory 

1773 if os.path.exists(os.path.join(tree_path, b".git")): 

1774 # Submodule 

1775 head = read_submodule_head(tree_path) 

1776 if entry.sha != head: 

1777 return True 

1778 else: 

1779 # The file was changed to a directory, so consider it removed. 

1780 return True 

1781 

1782 return False 

1783 

1784 

1785os_sep_bytes = os.sep.encode("ascii") 

1786 

1787 

1788def _ensure_parent_dir_exists(full_path: bytes) -> None: 

1789 """Ensure parent directory exists, checking no parent is a file.""" 

1790 parent_dir = os.path.dirname(full_path) 

1791 if parent_dir and not os.path.exists(parent_dir): 

1792 # Walk up the directory tree to find the first existing parent 

1793 current = parent_dir 

1794 parents_to_check: list[bytes] = [] 

1795 

1796 while current and not os.path.exists(current): 

1797 parents_to_check.insert(0, current) 

1798 new_parent = os.path.dirname(current) 

1799 if new_parent == current: 

1800 # Reached the root or can't go up further 

1801 break 

1802 current = new_parent 

1803 

1804 # Check if the existing parent (if any) is a directory 

1805 if current and os.path.exists(current) and not os.path.isdir(current): 

1806 raise OSError( 

1807 f"Cannot create directory, parent path is a file: {current!r}" 

1808 ) 

1809 

1810 # Now check each parent we need to create isn't blocked by an existing file 

1811 for parent_path in parents_to_check: 

1812 if os.path.exists(parent_path) and not os.path.isdir(parent_path): 

1813 raise OSError( 

1814 f"Cannot create directory, parent path is a file: {parent_path!r}" 

1815 ) 

1816 

1817 os.makedirs(parent_dir) 

1818 
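
# A runnable sketch of the guard above: creating b"a/b/c" must fail when
# b"a" already exists as a regular file. Uses only the stdlib and a
# throwaway temporary directory; nothing here is dulwich public API.
def _example_parent_guard() -> None:
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        blocker = os.path.join(os.fsencode(tmp), b"a")
        with open(blocker, "wb"):
            pass  # b"a" is now a file, so b"a/b/..." can never exist
        try:
            _ensure_parent_dir_exists(os.path.join(blocker, b"b", b"c"))
        except OSError as e:
            print(e)  # Cannot create directory, parent path is a file: ...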

1819 

1820def _remove_file_with_readonly_handling(path: bytes) -> None: 

1821 """Remove a file, handling read-only files on Windows. 

1822 

1823 Args: 

1824 path: Path to the file to remove 

1825 """ 

1826 try: 

1827 os.unlink(path) 

1828 except PermissionError: 

1829 # On Windows, remove read-only attribute and retry 

1830 if sys.platform == "win32": 

1831 os.chmod(path, stat.S_IWRITE | stat.S_IREAD) 

1832 os.unlink(path) 

1833 else: 

1834 raise 

1835 

1836 

1837def _remove_empty_parents(path: bytes, stop_at: bytes) -> None: 

1838 """Remove empty parent directories up to stop_at.""" 

1839 parent = os.path.dirname(path) 

1840 while parent and parent != stop_at: 

1841 try: 

1842 os.rmdir(parent) 

1843 parent = os.path.dirname(parent) 

1844 except FileNotFoundError: 

1845 # Directory doesn't exist - stop trying 

1846 break 

1847 except OSError as e: 

1848 if e.errno == errno.ENOTEMPTY: 

1849 # Directory not empty - stop trying 

1850 break 

1851 raise 

1852 

1853 

1854def _check_symlink_matches( 

1855 full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes 

1856) -> bool: 

1857 """Check if symlink target matches expected target. 

1858 

1859 Returns True if symlink matches, False if it doesn't match. 

1860 """ 

1861 try: 

1862 current_target = os.readlink(full_path) 

1863 blob_obj = repo_object_store[entry_sha] 

1864 expected_target = blob_obj.as_raw_string() 

1865 if isinstance(current_target, str): 

1866 current_target = current_target.encode() 

1867 return current_target == expected_target 

1868 except FileNotFoundError: 

1869 # Symlink doesn't exist 

1870 return False 

1871 except OSError as e: 

1872 if e.errno == errno.EINVAL: 

1873 # Not a symlink 

1874 return False 

1875 raise 

1876 

1877 

1878def _check_file_matches( 

1879 repo_object_store: "BaseObjectStore", 

1880 full_path: bytes, 

1881 entry_sha: bytes, 

1882 entry_mode: int, 

1883 current_stat: os.stat_result, 

1884 honor_filemode: bool, 

1885 blob_normalizer: Optional["FilterBlobNormalizer"] = None, 

1886 tree_path: Optional[bytes] = None, 

1887) -> bool: 

1888 """Check if a file on disk matches the expected git object. 

1889 

1890 Returns True if file matches, False if it doesn't match. 

1891 """ 

1892 # Check mode first (if honor_filemode is True) 

1893 if honor_filemode: 

1894 current_mode = stat.S_IMODE(current_stat.st_mode) 

1895 expected_mode = stat.S_IMODE(entry_mode) 

1896 

1897 # For regular files, only check the user executable bit, not group/other permissions 

1898 # This matches Git's behavior where umask differences don't count as modifications 

1899 if stat.S_ISREG(current_stat.st_mode): 

1900 # Normalize regular file modes to ignore group/other write permissions 

1901 current_mode_normalized = ( 

1902 current_mode & 0o755 

1903 ) # Keep only user rwx and all read+execute 

1904 expected_mode_normalized = expected_mode & 0o755 

1905 

1906 # For Git compatibility, regular files should be either 644 or 755 

1907 if expected_mode_normalized not in (0o644, 0o755): 

1908 expected_mode_normalized = 0o644 # Default for regular files 

1909 if current_mode_normalized not in (0o644, 0o755): 

1910 # Determine if it should be executable based on user execute bit 

1911 if current_mode & 0o100: # User execute bit is set 

1912 current_mode_normalized = 0o755 

1913 else: 

1914 current_mode_normalized = 0o644 

1915 

1916 if current_mode_normalized != expected_mode_normalized: 

1917 return False 

1918 else: 

1919 # For non-regular files (symlinks, etc.), check mode exactly 

1920 if current_mode != expected_mode: 

1921 return False 

1922 

1923 # If mode matches (or we don't care), check content via size first 

1924 blob_obj = repo_object_store[entry_sha] 

1925 if current_stat.st_size != blob_obj.raw_length(): 

1926 return False 

1927 

1928 # Size matches, check actual content 

1929 try: 

1930 with open(full_path, "rb") as f: 

1931 current_content = f.read() 

1932 expected_content = blob_obj.as_raw_string() 

1933 if blob_normalizer and tree_path is not None: 

1934 assert isinstance(blob_obj, Blob) 

1935 normalized_blob = blob_normalizer.checkout_normalize( 

1936 blob_obj, tree_path 

1937 ) 

1938 expected_content = normalized_blob.as_raw_string() 

1939 return current_content == expected_content 

1940 except (FileNotFoundError, PermissionError, IsADirectoryError): 

1941 return False 

1942 
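
# Worked illustration of the normalization above: Git records only 0o644
# or 0o755 for regular files, so umask-driven modes such as 0o664 or
# 0o700 must not register as modifications. A standalone sketch of the
# same rule (not dulwich API):
def _example_normalize_mode(disk_mode: int) -> int:
    mode = stat.S_IMODE(disk_mode) & 0o755  # drop group/other write bits
    if mode in (0o644, 0o755):
        return mode
    # Otherwise fall back on the user execute bit, as the check above does
    return 0o755 if mode & 0o100 else 0o644


# _example_normalize_mode(0o664) == 0o644; _example_normalize_mode(0o700) == 0o755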

1943 

1944def _transition_to_submodule( 

1945 repo: "Repo", 

1946 path: bytes, 

1947 full_path: bytes, 

1948 current_stat: Optional[os.stat_result], 

1949 entry: Union[IndexEntry, TreeEntry], 

1950 index: Index, 

1951) -> None: 

1952 """Transition any type to submodule.""" 

1953 from .submodule import ensure_submodule_placeholder 

1954 

1955 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

1956 # Already a directory, just ensure .git file exists 

1957 ensure_submodule_placeholder(repo, path) 

1958 else: 

1959 # Remove whatever is there and create submodule 

1960 if current_stat is not None: 

1961 _remove_file_with_readonly_handling(full_path) 

1962 ensure_submodule_placeholder(repo, path) 

1963 

1964 st = os.lstat(full_path) 

1965 assert entry.sha is not None 

1966 index[path] = index_entry_from_stat(st, entry.sha) 

1967 

1968 

1969def _transition_to_file( 

1970 object_store: "BaseObjectStore", 

1971 path: bytes, 

1972 full_path: bytes, 

1973 current_stat: Optional[os.stat_result], 

1974 entry: Union[IndexEntry, TreeEntry], 

1975 index: Index, 

1976 honor_filemode: bool, 

1977 symlink_fn: Optional[ 

1978 Callable[ 

1979 [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]], 

1980 None, 

1981 ] 

1982 ], 

1983 blob_normalizer: Optional["FilterBlobNormalizer"], 

1984 tree_encoding: str = "utf-8", 

1985) -> None: 

1986 """Transition any type to regular file or symlink.""" 

1987 assert entry.sha is not None and entry.mode is not None 

1988 # Check if we need to update 

1989 if ( 

1990 current_stat is not None 

1991 and stat.S_ISREG(current_stat.st_mode) 

1992 and not stat.S_ISLNK(entry.mode) 

1993 ): 

1994 # File to file - check if update needed 

1995 file_matches = _check_file_matches( 

1996 object_store, 

1997 full_path, 

1998 entry.sha, 

1999 entry.mode, 

2000 current_stat, 

2001 honor_filemode, 

2002 blob_normalizer, 

2003 path, 

2004 ) 

2005 needs_update = not file_matches 

2006 elif ( 

2007 current_stat is not None 

2008 and stat.S_ISLNK(current_stat.st_mode) 

2009 and stat.S_ISLNK(entry.mode) 

2010 ): 

2011 # Symlink to symlink - check if update needed 

2012 symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha) 

2013 needs_update = not symlink_matches 

2014 else: 

2015 needs_update = True 

2016 

2017 if not needs_update: 

2018 # Content already matches; just refresh the index entry. current_stat is valid here since the file exists. 

2019 assert current_stat is not None 

2020 index[path] = index_entry_from_stat(current_stat, entry.sha) 

2021 return 

2022 

2023 # Remove existing entry if needed 

2024 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

2025 # Remove directory 

2026 dir_contents = set(os.listdir(full_path)) 

2027 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

2028 

2029 if git_file_name in dir_contents: 

2030 if dir_contents != {git_file_name}: 

2031 raise IsADirectoryError( 

2032 f"Cannot replace submodule with untracked files: {full_path!r}" 

2033 ) 

2034 shutil.rmtree(full_path) 

2035 else: 

2036 try: 

2037 os.rmdir(full_path) 

2038 except OSError as e: 

2039 if e.errno == errno.ENOTEMPTY: 

2040 raise IsADirectoryError( 

2041 f"Cannot replace non-empty directory with file: {full_path!r}" 

2042 ) 

2043 raise 

2044 elif current_stat is not None: 

2045 _remove_file_with_readonly_handling(full_path) 

2046 

2047 # Ensure parent directory exists 

2048 _ensure_parent_dir_exists(full_path) 

2049 

2050 # Write the file 

2051 blob_obj = object_store[entry.sha] 

2052 assert isinstance(blob_obj, Blob) 

2053 if blob_normalizer: 

2054 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path) 

2055 st = build_file_from_blob( 

2056 blob_obj, 

2057 entry.mode, 

2058 full_path, 

2059 honor_filemode=honor_filemode, 

2060 tree_encoding=tree_encoding, 

2061 symlink_fn=symlink_fn, 

2062 ) 

2063 index[path] = index_entry_from_stat(st, entry.sha) 

2064 

2065 

2066def _transition_to_absent( 

2067 repo: "Repo", 

2068 path: bytes, 

2069 full_path: bytes, 

2070 current_stat: Optional[os.stat_result], 

2071 index: Index, 

2072) -> None: 

2073 """Remove any type of entry.""" 

2074 if current_stat is None: 

2075 return 

2076 

2077 if stat.S_ISDIR(current_stat.st_mode): 

2078 # Check if it's a submodule directory 

2079 dir_contents = set(os.listdir(full_path)) 

2080 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

2081 

2082 if git_file_name in dir_contents and dir_contents == {git_file_name}: 

2083 shutil.rmtree(full_path) 

2084 else: 

2085 try: 

2086 os.rmdir(full_path) 

2087 except OSError as e: 

2088 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST): 

2089 raise 

2090 else: 

2091 _remove_file_with_readonly_handling(full_path) 

2092 

2093 try: 

2094 del index[path] 

2095 except KeyError: 

2096 pass 

2097 

2098 # Try to remove empty parent directories 

2099 _remove_empty_parents( 

2100 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

2101 ) 

2102 

2103 

2104def detect_case_only_renames( 

2105 changes: Sequence["TreeChange"], 

2106 config: "Config", 

2107) -> list["TreeChange"]: 

2108 """Detect and transform case-only renames in a list of tree changes. 

2109 

2110 This function identifies file renames that only differ in case (e.g., 

2111 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into 

2112 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization 

2113 based on the repository configuration. 

2114 

2115 Args: 

2116 changes: List of TreeChange objects representing file changes 

2117 config: Repository configuration object 

2118 

2119 Returns: 

2120 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME 

2121 """ 

2122 from .diff_tree import ( 

2123 CHANGE_ADD, 

2124 CHANGE_COPY, 

2125 CHANGE_DELETE, 

2126 CHANGE_MODIFY, 

2127 CHANGE_RENAME, 

2128 TreeChange, 

2129 ) 

2130 

2131 # Build dictionaries of old and new paths with their normalized forms 

2132 old_paths_normalized = {} 

2133 new_paths_normalized = {} 

2134 old_changes = {} # Map from old path to change object 

2135 new_changes = {} # Map from new path to change object 

2136 

2137 # Get the appropriate normalizer based on config 

2138 normalize_func = get_path_element_normalizer(config) 

2139 

2140 def normalize_path(path: bytes) -> bytes: 

2141 """Normalize entire path using element normalization.""" 

2142 return b"/".join(normalize_func(part) for part in path.split(b"/")) 

2143 

2144 # Pre-normalize all paths once to avoid repeated normalization 

2145 for change in changes: 

2146 if change.type == CHANGE_DELETE and change.old: 

2147 assert change.old.path is not None 

2148 try: 

2149 normalized = normalize_path(change.old.path) 

2150 except UnicodeDecodeError: 

2151 import logging 

2152 

2153 logging.warning( 

2154 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2155 change.old.path, 

2156 ) 

2157 else: 

2158 old_paths_normalized[normalized] = change.old.path 

2159 old_changes[change.old.path] = change 

2160 elif change.type == CHANGE_RENAME and change.old: 

2161 assert change.old.path is not None 

2162 # Treat RENAME as DELETE + ADD for case-only detection 

2163 try: 

2164 normalized = normalize_path(change.old.path) 

2165 except UnicodeDecodeError: 

2166 import logging 

2167 

2168 logging.warning( 

2169 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2170 change.old.path, 

2171 ) 

2172 else: 

2173 old_paths_normalized[normalized] = change.old.path 

2174 old_changes[change.old.path] = change 

2175 

2176 if ( 

2177 change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY) 

2178 and change.new 

2179 ): 

2180 assert change.new.path is not None 

2181 try: 

2182 normalized = normalize_path(change.new.path) 

2183 except UnicodeDecodeError: 

2184 import logging 

2185 

2186 logging.warning( 

2187 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

2188 change.new.path, 

2189 ) 

2190 else: 

2191 new_paths_normalized[normalized] = change.new.path 

2192 new_changes[change.new.path] = change 

2193 

2194 # Find case-only renames and transform changes 

2195 case_only_renames = set() 

2196 new_rename_changes = [] 

2197 

2198 for norm_path, old_path in old_paths_normalized.items(): 

2199 if norm_path in new_paths_normalized: 

2200 new_path = new_paths_normalized[norm_path] 

2201 if old_path != new_path: 

2202 # Found a case-only rename 

2203 old_change = old_changes[old_path] 

2204 new_change = new_changes[new_path] 

2205 

2206 # Create a CHANGE_RENAME to replace the DELETE + ADD/MODIFY pair; 

2207 # in both cases the old entry comes from the DELETE side and the 

2208 # new entry from the ADD/MODIFY side, so one construction suffices. 

2209 rename_change = TreeChange( 

2210 CHANGE_RENAME, old_change.old, new_change.new 

2211 ) 

2218 

2219 new_rename_changes.append(rename_change) 

2220 

2221 # Mark the old changes for removal 

2222 case_only_renames.add(old_change) 

2223 case_only_renames.add(new_change) 

2224 

2225 # Return new list with original ADD/DELETE changes replaced by renames 

2226 result = [change for change in changes if change not in case_only_renames] 

2227 result.extend(new_rename_changes) 

2228 return result 

2229 
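
# Usage sketch: on a case-insensitive configuration, a DELETE of
# b"README.txt" plus an ADD of b"readme.txt" collapses into one rename.
# The TreeEntry values (paths, mode, placeholder sha) are hypothetical.
def _example_case_only_rename(repo: "Repo") -> None:
    from .diff_tree import CHANGE_ADD, CHANGE_DELETE, TreeChange

    old = TreeEntry(b"README.txt", 0o100644, b"0" * 40)
    new = TreeEntry(b"readme.txt", 0o100644, b"0" * 40)
    changes = [
        TreeChange(CHANGE_DELETE, old, None),
        TreeChange(CHANGE_ADD, None, new),
    ]
    merged = detect_case_only_renames(changes, repo.get_config())
    print([c.type for c in merged])  # typically: ['rename']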

2230 

2231def update_working_tree( 

2232 repo: "Repo", 

2233 old_tree_id: Optional[bytes], 

2234 new_tree_id: bytes, 

2235 change_iterator: Iterator["TreeChange"], 

2236 honor_filemode: bool = True, 

2237 validate_path_element: Optional[Callable[[bytes], bool]] = None, 

2238 symlink_fn: Optional[ 

2239 Callable[ 

2240 [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]], 

2241 None, 

2242 ] 

2243 ] = None, 

2244 force_remove_untracked: bool = False, 

2245 blob_normalizer: Optional["FilterBlobNormalizer"] = None, 

2246 tree_encoding: str = "utf-8", 

2247 allow_overwrite_modified: bool = False, 

2248) -> None: 

2249 """Update the working tree and index to match a new tree. 

2250 

2251 This function handles: 

2252 - Adding new files 

2253 - Updating modified files 

2254 - Removing deleted files 

2255 - Cleaning up empty directories 

2256 

2257 Args: 

2258 repo: Repository object 

2259 old_tree_id: SHA of the tree before the update 

2260 new_tree_id: SHA of the tree to update to 

2261 change_iterator: Iterator of TreeChange objects to apply 

2262 honor_filemode: An optional flag to honor core.filemode setting 

2263 validate_path_element: Function to validate path elements to check out 

2264 symlink_fn: Function to use for creating symlinks 

2265 force_remove_untracked: If True, remove files that exist in working 

2266 directory but not in target tree, even if old_tree_id is None 

2267 blob_normalizer: An optional BlobNormalizer to use for converting line 

2268 endings when writing blobs to the working directory. 

2269 tree_encoding: Encoding used for tree paths (default: utf-8) 

2270 allow_overwrite_modified: If False, raise an error when attempting to 

2271 overwrite files that have been modified compared to old_tree_id 

2272 """ 

2273 if validate_path_element is None: 

2274 validate_path_element = validate_path_element_default 

2275 

2276 from .diff_tree import ( 

2277 CHANGE_ADD, 

2278 CHANGE_COPY, 

2279 CHANGE_DELETE, 

2280 CHANGE_MODIFY, 

2281 CHANGE_RENAME, 

2282 CHANGE_UNCHANGED, 

2283 ) 

2284 

2285 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

2286 index = repo.open_index() 

2287 

2288 # Convert iterator to list since we need multiple passes 

2289 changes = list(change_iterator) 

2290 

2291 # Transform case-only renames on case-insensitive filesystems 

2292 import platform 

2293 

2294 default_ignore_case = platform.system() in ("Windows", "Darwin") 

2295 config = repo.get_config() 

2296 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case) 

2297 

2298 if ignore_case: 

2299 # config was already fetched above; reuse it for normalization 

2300 changes = detect_case_only_renames(changes, config) 

2301 

2302 # Check for path conflicts where files need to become directories 

2303 paths_becoming_dirs = set() 

2304 for change in changes: 

2305 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY): 

2306 assert change.new is not None 

2307 path = change.new.path 

2308 assert path is not None 

2309 if b"/" in path: # This is a file inside a directory 

2310 # Check if any parent path exists as a file in the old tree or changes 

2311 parts = path.split(b"/") 

2312 for i in range(1, len(parts)): 

2313 parent = b"/".join(parts[:i]) 

2314 # See if this parent path is being deleted (was a file, becoming a dir) 

2315 for other_change in changes: 

2316 if ( 

2317 other_change.type == CHANGE_DELETE 

2318 and other_change.old 

2319 and other_change.old.path == parent 

2320 ): 

2321 paths_becoming_dirs.add(parent) 

2322 

2323 # Check if any path that needs to become a directory has been modified 

2324 for path in paths_becoming_dirs: 

2325 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2326 try: 

2327 current_stat = os.lstat(full_path) 

2328 except FileNotFoundError: 

2329 continue # File doesn't exist, nothing to check 

2330 except OSError as e: 

2331 raise OSError( 

2332 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2333 ) from e 

2334 

2335 if stat.S_ISREG(current_stat.st_mode): 

2336 # Find the old entry for this path 

2337 old_change = None 

2338 for change in changes: 

2339 if ( 

2340 change.type == CHANGE_DELETE 

2341 and change.old 

2342 and change.old.path == path 

2343 ): 

2344 old_change = change 

2345 break 

2346 

2347 if old_change: 

2348 # Check if file has been modified 

2349 assert old_change.old is not None 

2350 assert ( 

2351 old_change.old.sha is not None and old_change.old.mode is not None 

2352 ) 

2353 file_matches = _check_file_matches( 

2354 repo.object_store, 

2355 full_path, 

2356 old_change.old.sha, 

2357 old_change.old.mode, 

2358 current_stat, 

2359 honor_filemode, 

2360 blob_normalizer, 

2361 path, 

2362 ) 

2363 if not file_matches: 

2364 raise OSError( 

2365 f"Cannot replace modified file with directory: {path!r}" 

2366 ) 

2367 

2368 # Check for uncommitted modifications before making any changes 

2369 if not allow_overwrite_modified and old_tree_id: 

2370 for change in changes: 

2371 # Only check files that are being modified or deleted 

2372 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old: 

2373 path = change.old.path 

2374 assert path is not None 

2375 if path.startswith(b".git") or not validate_path( 

2376 path, validate_path_element 

2377 ): 

2378 continue 

2379 

2380 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2381 try: 

2382 current_stat = os.lstat(full_path) 

2383 except FileNotFoundError: 

2384 continue # File doesn't exist, nothing to check 

2385 except OSError as e: 

2386 raise OSError( 

2387 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2388 ) from e 

2389 

2390 if stat.S_ISREG(current_stat.st_mode): 

2391 # Check if working tree file differs from old tree 

2392 assert change.old.sha is not None and change.old.mode is not None 

2393 file_matches = _check_file_matches( 

2394 repo.object_store, 

2395 full_path, 

2396 change.old.sha, 

2397 change.old.mode, 

2398 current_stat, 

2399 honor_filemode, 

2400 blob_normalizer, 

2401 path, 

2402 ) 

2403 if not file_matches: 

2404 from .errors import WorkingTreeModifiedError 

2405 

2406 raise WorkingTreeModifiedError( 

2407 f"Your local changes to '{path.decode('utf-8', errors='replace')}' " 

2408 f"would be overwritten by checkout. " 

2409 f"Please commit your changes or stash them before you switch branches." 

2410 ) 

2411 

2412 # Apply the changes 

2413 for change in changes: 

2414 if change.type in (CHANGE_DELETE, CHANGE_RENAME): 

2415 # Remove file/directory 

2416 assert change.old is not None and change.old.path is not None 

2417 path = change.old.path 

2418 if path.startswith(b".git") or not validate_path( 

2419 path, validate_path_element 

2420 ): 

2421 continue 

2422 

2423 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2424 try: 

2425 delete_stat: Optional[os.stat_result] = os.lstat(full_path) 

2426 except FileNotFoundError: 

2427 delete_stat = None 

2428 except OSError as e: 

2429 raise OSError( 

2430 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2431 ) from e 

2432 

2433 _transition_to_absent(repo, path, full_path, delete_stat, index) 

2434 

2435 if change.type in ( 

2436 CHANGE_ADD, 

2437 CHANGE_MODIFY, 

2438 CHANGE_UNCHANGED, 

2439 CHANGE_COPY, 

2440 CHANGE_RENAME, 

2441 ): 

2442 # Add or modify file 

2443 assert ( 

2444 change.new is not None 

2445 and change.new.path is not None 

2446 and change.new.mode is not None 

2447 ) 

2448 path = change.new.path 

2449 if path.startswith(b".git") or not validate_path( 

2450 path, validate_path_element 

2451 ): 

2452 continue 

2453 

2454 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2455 try: 

2456 modify_stat: Optional[os.stat_result] = os.lstat(full_path) 

2457 except FileNotFoundError: 

2458 modify_stat = None 

2459 except OSError as e: 

2460 raise OSError( 

2461 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2462 ) from e 

2463 

2464 if S_ISGITLINK(change.new.mode): 

2465 _transition_to_submodule( 

2466 repo, path, full_path, modify_stat, change.new, index 

2467 ) 

2468 else: 

2469 _transition_to_file( 

2470 repo.object_store, 

2471 path, 

2472 full_path, 

2473 modify_stat, 

2474 change.new, 

2475 index, 

2476 honor_filemode, 

2477 symlink_fn, 

2478 blob_normalizer, 

2479 tree_encoding, 

2480 ) 

2481 

2482 index.write() 

2483 
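
# Usage sketch (hedged): check out a target tree over the working tree.
# "repo" is an open Repo and new_tree_id a tree SHA assumed to be
# reachable from it; tree_changes computes the delta to apply.
def _example_checkout_tree(repo: "Repo", new_tree_id: bytes) -> None:
    from .diff_tree import tree_changes

    old_tree_id = repo[repo.head()].tree  # tree of the current HEAD commit
    changes = tree_changes(repo.object_store, old_tree_id, new_tree_id)
    update_working_tree(repo, old_tree_id, new_tree_id, changes)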

2484 

2485def _check_entry_for_changes( 

2486 tree_path: bytes, 

2487 entry: Union[IndexEntry, ConflictedIndexEntry], 

2488 root_path: bytes, 

2489 filter_blob_callback: Optional[Callable[[bytes, bytes], bytes]] = None, 

2490) -> Optional[bytes]: 

2491 """Check a single index entry for changes. 

2492 

2493 Args: 

2494 tree_path: Path in the tree 

2495 entry: Index entry to check 

2496 root_path: Root filesystem path 

2497 filter_blob_callback: Optional callback to filter blobs 

2498 Returns: tree_path if changed, None otherwise 

2499 """ 

2500 if isinstance(entry, ConflictedIndexEntry): 

2501 # Conflicted files are always unstaged 

2502 return tree_path 

2503 

2504 full_path = _tree_to_fs_path(root_path, tree_path) 

2505 try: 

2506 st = os.lstat(full_path) 

2507 if stat.S_ISDIR(st.st_mode): 

2508 if _has_directory_changed(tree_path, entry): 

2509 return tree_path 

2510 return None 

2511 

2512 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): 

2513 return None 

2514 

2515 blob = blob_from_path_and_stat(full_path, st) 

2516 

2517 if filter_blob_callback is not None: 

2518 blob.data = filter_blob_callback(blob.data, tree_path) 

2519 except FileNotFoundError: 

2520 # The file was removed, so we assume that counts as 

2521 # different from whatever file used to exist. 

2522 return tree_path 

2523 else: 

2524 if blob.id != entry.sha: 

2525 return tree_path 

2526 return None 

2527 

2528 

2529def get_unstaged_changes( 

2530 index: Index, 

2531 root_path: Union[str, bytes], 

2532 filter_blob_callback: Optional[Callable[..., Any]] = None, 

2533 preload_index: bool = False, 

2534) -> Generator[bytes, None, None]: 

2535 """Walk through an index and check for differences against working tree. 

2536 

2537 Args: 

2538 index: index to check 

2539 root_path: path in which to find files 

2540 filter_blob_callback: Optional callback to filter blobs 

2541 preload_index: If True, use parallel threads to check files (requires threading support) 

2542 Returns: iterator over paths with unstaged changes 

2543 """ 

2544 # For each index entry, compare the working-tree file's content/sha1 against it 

2545 if not isinstance(root_path, bytes): 

2546 root_path = os.fsencode(root_path) 

2547 

2548 if preload_index: 

2549 # Use parallel processing for better performance on slow filesystems 

2550 try: 

2551 import multiprocessing 

2552 from concurrent.futures import ThreadPoolExecutor 

2553 except ImportError: 

2554 # If threading is not available, fall back to serial processing 

2555 preload_index = False 

2556 else: 

2557 # Collect all entries first 

2558 entries = list(index.iteritems()) 

2559 

2560 # Use number of CPUs but cap at 8 threads to avoid overhead 

2561 num_workers = min(multiprocessing.cpu_count(), 8) 

2562 

2563 # Process entries in parallel 

2564 with ThreadPoolExecutor(max_workers=num_workers) as executor: 

2565 # Submit all tasks 

2566 futures = [ 

2567 executor.submit( 

2568 _check_entry_for_changes, 

2569 tree_path, 

2570 entry, 

2571 root_path, 

2572 filter_blob_callback, 

2573 ) 

2574 for tree_path, entry in entries 

2575 ] 

2576 

2577 # Collect results in submission order, blocking on each future in turn 

2578 for future in futures: 

2579 result = future.result() 

2580 if result is not None: 

2581 yield result 

2582 

2583 if not preload_index: 

2584 # Serial processing 

2585 for tree_path, entry in index.iteritems(): 

2586 result = _check_entry_for_changes( 

2587 tree_path, entry, root_path, filter_blob_callback 

2588 ) 

2589 if result is not None: 

2590 yield result 

2591 
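
# Usage sketch: list files whose working-tree content differs from the
# index, serially. "repo" is an open Repo; paths are yielded as bytes.
def _example_unstaged_paths(repo: "Repo") -> None:
    index = repo.open_index()
    for tree_path in get_unstaged_changes(index, repo.path):
        print(tree_path.decode("utf-8", errors="replace"))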

2592 

2593def _tree_to_fs_path( 

2594 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8" 

2595) -> bytes: 

2596 """Convert a git tree path to a file system path. 

2597 

2598 Args: 

2599 root_path: Root filesystem path 

2600 tree_path: Git tree path as bytes (encoded with tree_encoding) 

2601 tree_encoding: Encoding used for tree paths (default: utf-8) 

2602 

2603 Returns: File system path. 

2604 """ 

2605 assert isinstance(tree_path, bytes) 

2606 if os_sep_bytes != b"/": 

2607 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes) 

2608 else: 

2609 sep_corrected_path = tree_path 

2610 

2611 # On Windows, we need to handle tree path encoding properly 

2612 if sys.platform == "win32": 

2613 # Decode from tree encoding, then re-encode for filesystem 

2614 try: 

2615 tree_path_str = sep_corrected_path.decode(tree_encoding) 

2616 sep_corrected_path = os.fsencode(tree_path_str) 

2617 except UnicodeDecodeError: 

2618 # If decoding fails, use the original bytes 

2619 pass 

2620 

2621 return os.path.join(root_path, sep_corrected_path) 

2622 

2623 

2624def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -> bytes: 

2625 """Convert a file system path to a git tree path. 

2626 

2627 Args: 

2628 fs_path: File system path. 

2629 tree_encoding: Encoding to use for tree paths (default: utf-8) 

2630 

2631 Returns: Git tree path as bytes (encoded with tree_encoding) 

2632 """ 

2633 if not isinstance(fs_path, bytes): 

2634 fs_path_bytes = os.fsencode(fs_path) 

2635 else: 

2636 fs_path_bytes = fs_path 

2637 

2638 # On Windows, we need to ensure tree paths are properly encoded 

2639 if sys.platform == "win32": 

2640 try: 

2641 # Decode from filesystem encoding, then re-encode with tree encoding 

2642 fs_path_str = os.fsdecode(fs_path_bytes) 

2643 fs_path_bytes = fs_path_str.encode(tree_encoding) 

2644 except UnicodeDecodeError: 

2645 # If filesystem decoding fails, use the original bytes 

2646 pass 

2647 

2648 if os_sep_bytes != b"/": 

2649 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/") 

2650 else: 

2651 tree_path = fs_path_bytes 

2652 return tree_path 

2653 
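
# Round-trip sketch: tree paths always use b"/" while filesystem paths
# use os.sep, so the two helpers invert each other for ASCII paths. The
# root directory here is just the current working directory.
def _example_path_round_trip() -> None:
    root = os.fsencode(os.getcwd())
    fs = _tree_to_fs_path(root, b"docs/readme.md")
    rel = os.path.relpath(fs, root)  # back to a root-relative fs path
    print(_fs_to_tree_path(rel))  # b'docs/readme.md' on any platform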

2654 

2655def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]: 

2656 """Create an index entry for a directory. 

2657 

2658 This is only used for submodules (directories containing .git). 

2659 

2660 Args: 

2661 st: Stat result for the directory 

2662 path: Path to the directory 

2663 

2664 Returns: 

2665 IndexEntry for a submodule, or None if not a submodule 

2666 """ 

2667 if os.path.exists(os.path.join(path, b".git")): 

2668 head = read_submodule_head(path) 

2669 if head is None: 

2670 return None 

2671 return index_entry_from_stat(st, head, mode=S_IFGITLINK) 

2672 return None 

2673 

2674 

2675def index_entry_from_path( 

2676 path: bytes, object_store: Optional[ObjectContainer] = None 

2677) -> Optional[IndexEntry]: 

2678 """Create an index from a filesystem path. 

2679 

2680 This returns an index value for files, symlinks 

2681 and tree references. for directories and 

2682 non-existent files it returns None 

2683 

2684 Args: 

2685 path: Path to create an index entry for 

2686 object_store: Optional object store to 

2687 save new blobs in 

2688 Returns: An index entry; None for directories 

2689 """ 

2690 assert isinstance(path, bytes) 

2691 st = os.lstat(path) 

2692 if stat.S_ISDIR(st.st_mode): 

2693 return index_entry_from_directory(st, path) 

2694 

2695 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): 

2696 blob = blob_from_path_and_stat(path, st) 

2697 if object_store is not None: 

2698 object_store.add_object(blob) 

2699 return index_entry_from_stat(st, blob.id) 

2700 

2701 return None 

2702 
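
# Usage sketch: build the entry the index would store for one path and
# persist its blob. b"example.txt" is a hypothetical file in the current
# directory; None comes back for plain directories.
def _example_entry_for_path(repo: "Repo") -> None:
    entry = index_entry_from_path(b"example.txt", repo.object_store)
    if entry is not None:
        print(entry.sha, oct(entry.mode))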

2703 

2704def iter_fresh_entries( 

2705 paths: Iterable[bytes], 

2706 root_path: bytes, 

2707 object_store: Optional[ObjectContainer] = None, 

2708) -> Iterator[tuple[bytes, Optional[IndexEntry]]]: 

2709 """Iterate over current versions of index entries on disk. 

2710 

2711 Args: 

2712 paths: Paths to iterate over 

2713 root_path: Root path to access from 

2714 object_store: Optional store to save new blobs in 

2715 Returns: Iterator over path, index_entry 

2716 """ 

2717 for path in paths: 

2718 p = _tree_to_fs_path(root_path, path) 

2719 try: 

2720 entry = index_entry_from_path(p, object_store=object_store) 

2721 except (FileNotFoundError, IsADirectoryError): 

2722 entry = None 

2723 yield path, entry 

2724 

2725 

2726def iter_fresh_objects( 

2727 paths: Iterable[bytes], 

2728 root_path: bytes, 

2729 include_deleted: bool = False, 

2730 object_store: Optional[ObjectContainer] = None, 

2731) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]: 

2732 """Iterate over versions of objects on disk referenced by index. 

2733 

2734 Args: 

2735 paths: Paths to check 

2736 root_path: Root path to access from 

2737 include_deleted: Include deleted entries with sha and 

2738 mode set to None 

2739 object_store: Optional object store to report new items to 

2740 Returns: Iterator over path, sha, mode 

2741 """ 

2742 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store): 

2743 if entry is None: 

2744 if include_deleted: 

2745 yield path, None, None 

2746 else: 

2747 yield path, entry.sha, cleanup_mode(entry.mode) 

2748 

2749 

2750def refresh_index(index: Index, root_path: bytes) -> None: 

2751 """Refresh the contents of an index. 

2752 

2753 This is the equivalent of the index refresh performed by 'git commit -a'. 

2754 

2755 Args: 

2756 index: Index to update 

2757 root_path: Root filesystem path 

2758 """ 

2759 for path, entry in iter_fresh_entries(index, root_path): 

2760 if entry: 

2761 index[path] = entry 

2762 
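
# Usage sketch: re-read every tracked file and persist the refreshed
# index. "repo" is an open Repo; repo.path is the working-tree root.
def _example_refresh(repo: "Repo") -> None:
    index = repo.open_index()
    refresh_index(index, os.fsencode(repo.path))
    index.write()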

2763 

2764class locked_index: 

2765 """Lock the index while making modifications. 

2766 

2767 Works as a context manager. 

2768 """ 

2769 

2770 _file: "_GitFile" 

2771 

2772 def __init__(self, path: Union[bytes, str]) -> None: 

2773 """Initialize locked_index.""" 

2774 self._path = path 

2775 

2776 def __enter__(self) -> Index: 

2777 """Enter context manager and lock index.""" 

2778 f = GitFile(self._path, "wb") 

2779 self._file = f 

2780 self._index = Index(self._path) 

2781 return self._index 

2782 

2783 def __exit__( 

2784 self, 

2785 exc_type: Optional[type], 

2786 exc_value: Optional[BaseException], 

2787 traceback: Optional[types.TracebackType], 

2788 ) -> None: 

2789 """Exit context manager and unlock index.""" 

2790 if exc_type is not None: 

2791 self._file.abort() 

2792 return 

2793 try: 

2794 f = SHA1Writer(self._file) 

2795 write_index_dict(f, self._index._byname) 

2796 except BaseException: 

2797 self._file.abort() 

2798 else: 

2799 f.close()
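

# Usage sketch: mutate the index under the lock. On a clean exit the new
# index (with its trailing checksum) replaces the old one; on an
# exception the lock file is aborted. b"old-file.txt" is hypothetical.
def _example_locked_removal(repo: "Repo") -> None:
    index_path = os.path.join(repo.controldir(), "index")
    with locked_index(index_path) as index:
        del index[b"old-file.txt"]  # hypothetical tracked path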