Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/index.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1071 statements  

1# index.py -- File parser/writer for the git index file 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Parser for the git index file format.""" 

23 

24import errno 

25import os 

26import shutil 

27import stat 

28import struct 

29import sys 

30import types 

31from collections.abc import Generator, Iterable, Iterator 

32from dataclasses import dataclass 

33from enum import Enum 

34from typing import ( 

35 TYPE_CHECKING, 

36 Any, 

37 BinaryIO, 

38 Callable, 

39 Optional, 

40 Union, 

41 cast, 

42) 

43 

44if TYPE_CHECKING: 

45 from .config import Config 

46 from .diff_tree import TreeChange 

47 from .file import _GitFile 

48 from .line_ending import BlobNormalizer 

49 from .object_store import BaseObjectStore 

50 from .repo import Repo 

51 

52from .file import GitFile 

53from .object_store import iter_tree_contents 

54from .objects import ( 

55 S_IFGITLINK, 

56 S_ISGITLINK, 

57 Blob, 

58 ObjectID, 

59 Tree, 

60 hex_to_sha, 

61 sha_to_hex, 

62) 

63from .pack import ObjectContainer, SHA1Reader, SHA1Writer 

64 

65# 2-bit stage (during merge) 

66FLAG_STAGEMASK = 0x3000 

67FLAG_STAGESHIFT = 12 

68FLAG_NAMEMASK = 0x0FFF 

69 

70# assume-valid 

71FLAG_VALID = 0x8000 

72 

73# extended flag (must be zero in version 2) 

74FLAG_EXTENDED = 0x4000 

75 

76# used by sparse checkout 

77EXTENDED_FLAG_SKIP_WORKTREE = 0x4000 

78 

79# used by "git add -N" 

80EXTENDED_FLAG_INTEND_TO_ADD = 0x2000 

81 

82DEFAULT_VERSION = 2 

83 

84# Index extension signatures 

85TREE_EXTENSION = b"TREE" 

86REUC_EXTENSION = b"REUC" 

87UNTR_EXTENSION = b"UNTR" 

88EOIE_EXTENSION = b"EOIE" 

89IEOT_EXTENSION = b"IEOT" 

90 

91 

92def _encode_varint(value: int) -> bytes: 

93 """Encode an integer using variable-width encoding. 

94 

95 Same format as used for OFS_DELTA pack entries and index v4 path compression. 

96 Uses 7 bits per byte, with the high bit indicating continuation. 

97 

98 Args: 

99 value: Integer to encode 

100 Returns: 

101 Encoded bytes 

102 """ 

103 if value == 0: 

104 return b"\x00" 

105 

106 result = [] 

107 while value > 0: 

108 byte = value & 0x7F # Take lower 7 bits 

109 value >>= 7 

110 if value > 0: 

111 byte |= 0x80 # Set continuation bit 

112 result.append(byte) 

113 

114 return bytes(result) 

115 

116 

117def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]: 

118 """Decode a variable-width encoded integer. 

119 

120 Args: 

121 data: Bytes to decode from 

122 offset: Starting offset in data 

123 Returns: 

124 tuple of (decoded_value, new_offset) 

125 """ 

126 value = 0 

127 shift = 0 

128 pos = offset 

129 

130 while pos < len(data): 

131 byte = data[pos] 

132 pos += 1 

133 value |= (byte & 0x7F) << shift 

134 shift += 7 

135 if not (byte & 0x80): # No continuation bit 

136 break 

137 

138 return value, pos 

139 

140 

def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Length of the longest shared leading byte run of the two paths.
    shared = 0
    for ours, theirs in zip(path, previous_path):
        if ours != theirs:
            break
        shared += 1

    # Number of trailing bytes git must strip from previous_path to
    # recover the shared prefix.
    strip_count = len(previous_path) - shared

    # Encode: varint(strip_count) + unshared tail + NUL terminator.
    return _encode_varint(strip_count) + path[shared:] + b"\x00"

169 

170 

def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    Raises:
      ValueError: if the path suffix is unterminated or the strip count
        exceeds the length of the previous path.
    """
    # Decode the number of bytes to remove from the previous path.
    remove_len, new_offset = _decode_varint(data, offset)

    # Locate the NUL terminator of the suffix. bytes.index scans at C
    # speed, replacing the previous byte-by-byte Python loop.
    try:
        suffix_end = data.index(0, new_offset)
    except ValueError:
        raise ValueError("Unterminated path suffix in compressed entry") from None

    suffix = data[new_offset:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path from the retained prefix plus the new suffix.
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    return prefix + suffix, new_offset

208 

209 

210def _decompress_path_from_stream( 

211 f: BinaryIO, previous_path: bytes 

212) -> tuple[bytes, int]: 

213 """Decompress a path from index version 4 compressed format, reading from stream. 

214 

215 Args: 

216 f: File-like object to read from 

217 previous_path: Previous path for decompression 

218 Returns: 

219 tuple of (decompressed_path, bytes_consumed) 

220 """ 

221 # Decode the varint for remove_len by reading byte by byte 

222 remove_len = 0 

223 shift = 0 

224 bytes_consumed = 0 

225 

226 while True: 

227 byte_data = f.read(1) 

228 if not byte_data: 

229 raise ValueError("Unexpected end of file while reading varint") 

230 byte = byte_data[0] 

231 bytes_consumed += 1 

232 remove_len |= (byte & 0x7F) << shift 

233 shift += 7 

234 if not (byte & 0x80): # No continuation bit 

235 break 

236 

237 # Read the suffix until NUL terminator 

238 suffix = b"" 

239 while True: 

240 byte_data = f.read(1) 

241 if not byte_data: 

242 raise ValueError("Unexpected end of file while reading path suffix") 

243 byte = byte_data[0] 

244 bytes_consumed += 1 

245 if byte == 0: # NUL terminator 

246 break 

247 suffix += bytes([byte]) 

248 

249 # Reconstruct the path 

250 if remove_len > len(previous_path): 

251 raise ValueError( 

252 f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path" 

253 ) 

254 

255 prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path 

256 path = prefix + suffix 

257 

258 return path, bytes_consumed 

259 

260 

class Stage(Enum):
    """Merge stage of an index entry.

    Stored in the 2-bit FLAG_STAGEMASK field of the entry flags; stages
    1-3 only appear while a merge conflict is unresolved.
    """

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3

266 

267 

@dataclass
class SerializedIndexEntry:
    """On-disk form of a single index entry.

    Carries the stat data used for change detection, the blob SHA (hex),
    and the raw flag words, exactly as read from / written to the index
    file.
    """

    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Return the merge stage encoded in the 2-bit stage field of flags."""
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

285 

286 

@dataclass
class IndexExtension:
    """Base class for index extensions.

    Unknown extensions are kept as raw (signature, data) pairs so they can
    be round-tripped without interpretation.
    """

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data so it survives a rewrite
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes (payload only, no signature/size header)."""
        return self.data

317 

318 

class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        # entries: (path, tree SHA, entry count) tuples of the cached trees.
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse tree cache data (currently a stub returning no entries)."""
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize the tree cache (currently a stub emitting nothing)."""
        # TODO: Implement tree cache serialization
        return b""

334 

335 

class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        # entries: (path, [(stage, SHA), ...]) pairs of resolved conflicts.
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse resolve-undo data (currently a stub returning no entries)."""
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize resolve-undo data (currently a stub emitting nothing)."""
        # TODO: Implement resolve undo serialization
        return b""

351 

352 

class UntrackedExtension(IndexExtension):
    """Untracked cache extension.

    The payload is stored uninterpreted and round-tripped as-is.
    """

    def __init__(self, data: bytes) -> None:
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Wrap the raw untracked-cache payload without parsing it."""
        return cls(data)

362 

363 

@dataclass
class IndexEntry:
    """In-memory form of an index entry, keyed by path externally.

    Unlike SerializedIndexEntry it carries no name; the path is the key in
    the containing mapping. flags/extended_flags default to 0 for entries
    created programmatically.
    """

    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Build an IndexEntry from its serialized form (dropping the name)."""
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Return the on-disk form of this entry under *name* at *stage*."""
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Return the merge stage encoded in the 2-bit stage field of flags."""
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.
        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED

436 

437 

class ConflictedIndexEntry:
    """Index entry that represents a conflict.

    Groups up to three stage entries for one path: the common ancestor
    (stage 1), "ours" (stage 2) and "theirs" (stage 3). Any of them may be
    None, e.g. for add/add or delete/modify conflicts.
    """

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other

454 

455 

class UnmergedEntries(Exception):
    """Unmerged entries exist in the index.

    Raised by operations that require a fully merged index (e.g. building
    a tree) when a ConflictedIndexEntry is present.
    """

458 

459 

def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    # rpartition yields (b"", b"", path) when no separator is present,
    # which collapses to the same (b"", path) result as the original
    # rsplit/ValueError dance.
    dirname, _separator, basename = path.rpartition(b"/")
    return (dirname, basename)

475 

476 

def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path, skipping empty components."""
    nonempty = (component for component in args if component)
    return b"/".join(nonempty)

480 

481 

def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    raw = f.read(8)
    secs, nsecs = struct.unpack(">LL", raw)
    return (secs, nsecs)

491 

492 

def write_cache_time(f: BinaryIO, t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    # Normalize every accepted form to a (secs, nsecs) pair first.
    if isinstance(t, int):
        secs, nsecs = t, 0
    elif isinstance(t, float):
        whole, fraction = divmod(t, 1.0)
        secs, nsecs = int(whole), int(fraction * 1000000000)
    elif isinstance(t, tuple):
        secs, nsecs = t
    else:
        raise TypeError(t)
    f.write(struct.pack(">LL", secs, nsecs))

508 

509 

def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    Returns:
      The parsed SerializedIndexEntry (sha converted to hex).
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    # Fixed-size stat block: 6 32-bit fields, 20-byte SHA, 16-bit flags.
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: name length is stored in the low 12 bits of flags
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        # Entries are NUL-padded so the total size (with at least one NUL
        # after the name) is a multiple of 8 bytes; skip the padding.
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,  # strip name-length bits; name is kept separately
        extended_flags,
    )

566 

567 

def write_cache_entry(
    f: BinaryIO, entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    Raises:
      AssertionError: if extended flags are used with version < 3.
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    # All versions store the real filename length in the low 12 bits of
    # flags (this matches how C Git implements index v4 flags); the two
    # previously duplicated per-version computations are unified here.
    flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    # NOTE: `version` is typed int and never None; the old
    # `version is not None` guard was dead and has been removed.
    if flags & FLAG_EXTENDED and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: path is delta-compressed against the previous entry.
        f.write(_compress_path(entry.name, previous_path))
    else:
        # Versions < 4: literal path, NUL-padded to an 8-byte boundary
        # (with at least one trailing NUL).
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))

621 

622 

class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        # Keep the offending version so callers can report or branch on it.
        self.index_format_version = version

628 

629 

def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read and validate an index file header.

    Returns:
      tuple of (version, num_entries)
    Raises:
      AssertionError: if the magic bytes are not b"DIRC".
      UnsupportedIndexFormat: for versions outside 1-4.
    """
    magic = f.read(4)
    if magic != b"DIRC":
        raise AssertionError(f"Invalid index file header: {magic!r}")
    version, num_entries = struct.unpack(b">LL", f.read(8))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries

643 

644 

def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
    """Write an index extension record.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    payload = extension.to_bytes()
    # Record layout: 4-byte signature, big-endian 32-bit length, payload.
    header = extension.signature + struct.pack(">I", len(payload))
    f.write(header)
    f.write(payload)

656 

657 

def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries.

    Tracks the previously yielded path so v4 path compression can be
    resolved.
    """
    version, num_entries = read_index_header(f)
    prev_name = b""
    remaining = num_entries
    while remaining > 0:
        entry = read_cache_entry(f, version, prev_name)
        prev_name = entry.name
        yield entry
        remaining -= 1

666 

667 

def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Conflicted paths (stage != 0) are collected into a single
    ConflictedIndexEntry per path.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            # Conflicted path: accumulate the stage entries into one
            # ConflictedIndexEntry, refusing to mix with a normal entry.
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data; a short read means a truncated file, so stop.
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions

737 

738 

def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    Keys are paths; a path with multiple merge stages is collapsed into a
    single ConflictedIndexEntry value holding the per-stage entries.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            # Conflicted path: merge the stage entries into one record,
            # refusing to mix with an already-present normal entry.
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret

764 

765 

def write_index(
    f: BinaryIO,
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write (None = DEFAULT_VERSION; bumped to
        3 automatically if any entry uses extended flags)
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # STEP 1: check if any extended_flags are set
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        # Force or bump the version to 3
        version = 3
    # The rest is unchanged, but you might insert a final check:
    if version < 3:
        # Double-check no extended flags appear
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Proceed with the existing code to write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        # previous_path feeds v4 path compression; harmless for v2/v3.
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)

805 

806 

def write_index_dict(
    f: BinaryIO,
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary.
    being careful to sort by path and then by stage.
    """
    entries_list = []
    # Paths are emitted in sorted order; for a conflicted path the stage
    # entries follow ancestor (1), this (2), other (3) order.
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)

836 

837 

def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    if stat.S_ISDIR(mode):
        return stat.S_IFDIR
    if S_ISGITLINK(mode):
        return S_IFGITLINK
    # Regular file: git only distinguishes executable vs. not, normalizing
    # permissions to 0o755 or 0o644.
    is_executable = bool(mode & 0o100)
    return stat.S_IFREG | (0o755 if is_executable else 0o644)

859 

860 

class Index:
    """A Git Index file.

    Behaves as a mutable mapping from path (bytes) to IndexEntry or
    ConflictedIndexEntry, backed by an on-disk index file.
    """

    # Mapping from path to entry; conflicted paths map to ConflictedIndexEntry.
    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        """Path of the index file on disk."""
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        from typing import BinaryIO, cast

        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    cast(BinaryIO, f),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(cast(BinaryIO, f))
                write_index_dict(
                    cast(BinaryIO, sha1_writer),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            # On any failure, close the lock file before re-raising.
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            # A missing index file is equivalent to an empty index.
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(
                cast(BinaryIO, sha1_reader)
            )
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either a IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check whether *key* (a path) is present in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path.

        Raises UnmergedEntries: if the path is conflicted.
        """
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path.

        Raises UnmergedEntries: if the path is conflicted.
        """
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Return True if any path in the index is conflicted."""
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        """Set the entry for a path (path must be bytes)."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Remove the entry for a path."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs (legacy alias of items())."""
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs."""
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        """Set entries from a mapping, replacing any existing paths."""
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Yield all paths in the index."""
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

1079 

1080 

def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    # Nested-dict mirror of the tree: directories map to dicts, files map
    # to (mode, sha) pairs. Keyed by full directory path for fast reuse.
    trees: dict[bytes, Any] = {b"": {}}

    def ensure_dir(path: bytes) -> dict[bytes, Any]:
        """Return the dict for directory *path*, creating parents as needed."""
        existing = trees.get(path)
        if existing is not None:
            return existing
        dirname, basename = pathsplit(path)
        parent = ensure_dir(dirname)
        assert isinstance(basename, bytes)
        created: dict[bytes, Any] = {}
        parent[basename] = created
        trees[path] = created
        return created

    for path, sha, mode in blobs:
        dirname, basename = pathsplit(path)
        ensure_dir(dirname)[basename] = (mode, sha)

    def persist(path: bytes) -> bytes:
        """Write the Tree object for *path*, recursing into subdirectories."""
        tree = Tree()
        for basename, value in trees[path].items():
            if isinstance(value, dict):
                sub_sha = persist(pathjoin(path, basename))
                tree.add(basename, stat.S_IFDIR, sub_sha)
            else:
                mode, sha = value
                tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return persist(b"")

1123 

1124 

def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    # Kept only for backwards compatibility; simply delegates.
    return commit_tree(object_store, index.iterobjects())

1135 

1136 

def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy; must
        raise KeyError for names that no longer exist
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                # Name exists on both sides; drop it from the "added" set.
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            # Vanished between enumeration and lookup; silently skip.
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))

1185 

1186 

def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Git mode to record; derived from stat_val.st_mode when None
    Returns: An IndexEntry with flags and extended_flags zeroed
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )

1214 

1215 

if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        # PermissionError variant whose message hints at enabling developer mode.
        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            # Deliberately skip PermissionError.__init__ and use OSError's
            # (errno, strerror, filename) form so attributes populate normally.
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        # Thin wrapper over os.symlink that re-raises PermissionError with
        # a more actionable message.
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    # Elsewhere the stock os.symlink suffices.
    symlink = os.symlink

1248 

1249 

def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[Callable] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding used to decode symlink targets on Windows
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        # Symlinks cannot be overwritten in place; remove any existing entry.
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        # Short-circuit: if size and bytes already match, keep the existing
        # file (and its stat) untouched.
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)

1299 

1300 

# Path elements (in normalized/lowercased form) that are never allowed.
INVALID_DOTNAMES = (b".git", b".", b"..", b"")

1302 

1303 

1304def _normalize_path_element_default(element: bytes) -> bytes: 

1305 """Normalize path element for default case-insensitive comparison.""" 

1306 return element.lower() 

1307 

1308 

1309def _normalize_path_element_ntfs(element: bytes) -> bytes: 

1310 """Normalize path element for NTFS filesystem.""" 

1311 return element.rstrip(b". ").lower() 

1312 

1313 

def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem.

    Raises UnicodeDecodeError for byte sequences that are not valid UTF-8.
    """
    import unicodedata

    # Decode strictly; a UnicodeDecodeError propagates to the caller.
    text = element.decode("utf-8", errors="strict")

    # Drop the codepoints HFS+ ignores when comparing file names.
    kept = "".join(ch for ch in text if ord(ch) not in HFS_IGNORABLE_CHARS)
    # HFS+ stores names in NFD form.
    decomposed = unicodedata.normalize("NFD", kept)
    return decomposed.lower().encode("utf-8", errors="strict")

1326 

1327 

def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    # core.protectNTFS defaults on for Windows, core.protectHFS for macOS.
    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    if config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    return _normalize_path_element_default

1346 

1347 

def validate_path_element_default(element: bytes) -> bool:
    """Reject path elements that are never allowed (.git, '.', '..', '')."""
    normalized = _normalize_path_element_default(element)
    return normalized not in INVALID_DOTNAMES

1350 

1351 

def validate_path_element_ntfs(element: bytes) -> bool:
    """Validate a path element under NTFS normalization rules.

    Rejects dot-names and the 8.3 short name for ".git".
    """
    normalized = _normalize_path_element_ntfs(element)
    return normalized not in INVALID_DOTNAMES and normalized != b"git~1"

1359 

1360 

# HFS+ ignorable Unicode codepoints (from Git's utf8.c).
# These are stripped before name comparison in _normalize_path_element_hfs,
# since HFS+ ignores them when matching file names.
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}

1380 

1381 

def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Reject dot-names and the 8.3 short name for ".git".
    return normalized not in INVALID_DOTNAMES and normalized != b"git~1"

1403 

1404 

def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/.

    Every slash-separated component must pass element_validator.
    """
    return all(element_validator(part) for part in path.split(b"/"))

1416 

1417 

def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[Callable] = None,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        # Entries with unsafe path elements are silently skipped.
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            # Submodule: create an empty placeholder directory only.
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()

1503 

1504 

def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      mode: File mode
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if not stat.S_ISLNK(mode):
        # Regular file: the blob holds the raw file contents.
        with open(fs_path, "rb") as f:
            blob.data = f.read()
        return blob
    if sys.platform == "win32":
        # os.readlink on Python3 on Windows requires a unicode string.
        blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
    else:
        # Symlink: the blob holds the link target.
        blob.data = os.readlink(fs_path)
    return blob

1527 

1528 

def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    # Only st_mode matters; delegate to the mode-based variant.
    mode = st.st_mode
    return blob_from_path_and_mode(fs_path, mode, tree_encoding)

1540 

1541 

def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        # Not a git repository at all.
        return None
    try:
        return repo.head()
    except KeyError:
        # Repository exists but has no HEAD (e.g. unborn branch).
        return None

1564 

1565 

def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it's has changed. If
    not, consider the file as changed as Git tracked a file and not a
    directory.

    Return true if the given path should be considered as changed and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        # Changed iff the submodule HEAD differs from the recorded sha
        # (head may be None when the submodule is not a valid repo).
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False

1589 

1590 

# Native path separator as bytes, for splitting/joining filesystem paths.
os_sep_bytes = os.sep.encode("ascii")

1592 

1593 

1594def _ensure_parent_dir_exists(full_path: bytes) -> None: 

1595 """Ensure parent directory exists, checking no parent is a file.""" 

1596 parent_dir = os.path.dirname(full_path) 

1597 if parent_dir and not os.path.exists(parent_dir): 

1598 # Walk up the directory tree to find the first existing parent 

1599 current = parent_dir 

1600 parents_to_check: list[bytes] = [] 

1601 

1602 while current and not os.path.exists(current): 

1603 parents_to_check.insert(0, current) 

1604 new_parent = os.path.dirname(current) 

1605 if new_parent == current: 

1606 # Reached the root or can't go up further 

1607 break 

1608 current = new_parent 

1609 

1610 # Check if the existing parent (if any) is a directory 

1611 if current and os.path.exists(current) and not os.path.isdir(current): 

1612 raise OSError( 

1613 f"Cannot create directory, parent path is a file: {current!r}" 

1614 ) 

1615 

1616 # Now check each parent we need to create isn't blocked by an existing file 

1617 for parent_path in parents_to_check: 

1618 if os.path.exists(parent_path) and not os.path.isdir(parent_path): 

1619 raise OSError( 

1620 f"Cannot create directory, parent path is a file: {parent_path!r}" 

1621 ) 

1622 

1623 os.makedirs(parent_dir) 

1624 

1625 

1626def _remove_file_with_readonly_handling(path: bytes) -> None: 

1627 """Remove a file, handling read-only files on Windows. 

1628 

1629 Args: 

1630 path: Path to the file to remove 

1631 """ 

1632 try: 

1633 os.unlink(path) 

1634 except PermissionError: 

1635 # On Windows, remove read-only attribute and retry 

1636 if sys.platform == "win32": 

1637 os.chmod(path, stat.S_IWRITE | stat.S_IREAD) 

1638 os.unlink(path) 

1639 else: 

1640 raise 

1641 

1642 

1643def _remove_empty_parents(path: bytes, stop_at: bytes) -> None: 

1644 """Remove empty parent directories up to stop_at.""" 

1645 parent = os.path.dirname(path) 

1646 while parent and parent != stop_at: 

1647 try: 

1648 os.rmdir(parent) 

1649 parent = os.path.dirname(parent) 

1650 except FileNotFoundError: 

1651 # Directory doesn't exist - stop trying 

1652 break 

1653 except OSError as e: 

1654 if e.errno == errno.ENOTEMPTY: 

1655 # Directory not empty - stop trying 

1656 break 

1657 raise 

1658 

1659 

1660def _check_symlink_matches( 

1661 full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes 

1662) -> bool: 

1663 """Check if symlink target matches expected target. 

1664 

1665 Returns True if symlink matches, False if it doesn't match. 

1666 """ 

1667 try: 

1668 current_target = os.readlink(full_path) 

1669 blob_obj = repo_object_store[entry_sha] 

1670 expected_target = blob_obj.as_raw_string() 

1671 if isinstance(current_target, str): 

1672 current_target = current_target.encode() 

1673 return current_target == expected_target 

1674 except FileNotFoundError: 

1675 # Symlink doesn't exist 

1676 return False 

1677 except OSError as e: 

1678 if e.errno == errno.EINVAL: 

1679 # Not a symlink 

1680 return False 

1681 raise 

1682 

1683 

def _check_file_matches(
    repo_object_store: "BaseObjectStore",
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_path: Optional[bytes] = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Args:
      repo_object_store: Object store holding the expected blob
      full_path: Filesystem path of the file to compare
      entry_sha: SHA of the expected blob
      entry_mode: Git mode recorded for the entry
      current_stat: lstat result for the on-disk file
      honor_filemode: Whether permission bits are compared at all
      blob_normalizer: Optional normalizer applied to the blob before
        comparing content (e.g. line-ending conversion)
      tree_path: Tree-relative path; required for normalization

    Returns True if file matches, False if it doesn't match.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)

        # For regular files, only check the user executable bit, not group/other permissions
        # This matches Git's behavior where umask differences don't count as modifications
        if stat.S_ISREG(current_stat.st_mode):
            # Normalize regular file modes to ignore group/other write permissions
            current_mode_normalized = (
                current_mode & 0o755
            )  # Keep only user rwx and all read+execute
            expected_mode_normalized = expected_mode & 0o755

            # For Git compatibility, regular files should be either 644 or 755
            if expected_mode_normalized not in (0o644, 0o755):
                expected_mode_normalized = 0o644  # Default for regular files
            if current_mode_normalized not in (0o644, 0o755):
                # Determine if it should be executable based on user execute bit
                if current_mode & 0o100:  # User execute bit is set
                    current_mode_normalized = 0o755
                else:
                    current_mode_normalized = 0o644

            if current_mode_normalized != expected_mode_normalized:
                return False
        else:
            # For non-regular files (symlinks, etc.), check mode exactly
            if current_mode != expected_mode:
                return False

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return False

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                assert isinstance(blob_obj, Blob)
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content == expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        # Disappeared / unreadable / replaced by a directory: treat as changed.
        return False

1748 

1749 

def _transition_to_submodule(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: Optional[os.stat_result],
    entry: IndexEntry,
    index: Index,
) -> None:
    """Transition any type to submodule.

    Args:
      repo: Repository the submodule belongs to
      path: Tree-relative path of the submodule
      full_path: Filesystem path of the submodule
      current_stat: lstat of what currently occupies full_path, or None
      entry: Index entry recording the submodule sha
      index: Index to update with the new stat
    """
    from .submodule import ensure_submodule_placeholder

    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Already a directory, just ensure .git file exists
        ensure_submodule_placeholder(repo, path)
    else:
        # Remove whatever is there and create submodule
        if current_stat is not None:
            _remove_file_with_readonly_handling(full_path)
        ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    index[path] = index_entry_from_stat(st, entry.sha)

1772 

1773 

def _transition_to_file(
    object_store: "BaseObjectStore",
    path: bytes,
    full_path: bytes,
    current_stat: Optional[os.stat_result],
    entry: IndexEntry,
    index: Index,
    honor_filemode: bool,
    symlink_fn: Optional[Callable[[bytes, bytes], None]],
    blob_normalizer: Optional["BlobNormalizer"],
    tree_encoding: str = "utf-8",
) -> None:
    """Transition any type to regular file or symlink.

    Args:
      object_store: Store holding the target blob
      path: Tree-relative path being checked out
      full_path: Filesystem path being written
      current_stat: lstat of what currently occupies full_path, or None
      entry: Index entry describing the desired file
      index: Index to update with the resulting stat
      honor_filemode: Whether to apply/compare permission bits
      symlink_fn: Optional custom symlink creator
      blob_normalizer: Optional line-ending normalizer for checkout
      tree_encoding: Encoding for tree paths (Windows symlinks)

    Raises:
      IsADirectoryError: when full_path is a non-empty directory or a
        submodule containing untracked files.
    """
    # Check if we need to update
    if (
        current_stat is not None
        and stat.S_ISREG(current_stat.st_mode)
        and not stat.S_ISLNK(entry.mode)
    ):
        # File to file - check if update needed
        file_matches = _check_file_matches(
            object_store,
            full_path,
            entry.sha,
            entry.mode,
            current_stat,
            honor_filemode,
            blob_normalizer,
            path,
        )
        needs_update = not file_matches
    elif (
        current_stat is not None
        and stat.S_ISLNK(current_stat.st_mode)
        and stat.S_ISLNK(entry.mode)
    ):
        # Symlink to symlink - check if update needed
        symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
        needs_update = not symlink_matches
    else:
        # Type change (or nothing on disk): always rewrite.
        needs_update = True

    if not needs_update:
        # Just update index - current_stat should always be valid here since we're not updating
        assert current_stat is not None
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Remove existing entry if needed
    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Remove directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents:
            # Looks like a submodule placeholder; only remove if pristine.
            if dir_contents != {git_file_name}:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno == errno.ENOTEMPTY:
                    raise IsADirectoryError(
                        f"Cannot replace non-empty directory with file: {full_path!r}"
                    )
                raise
    elif current_stat is not None:
        _remove_file_with_readonly_handling(full_path)

    # Ensure parent directory exists
    _ensure_parent_dir_exists(full_path)

    # Write the file
    blob_obj = object_store[entry.sha]
    assert isinstance(blob_obj, Blob)
    if blob_normalizer:
        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
    st = build_file_from_blob(
        blob_obj,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        tree_encoding=tree_encoding,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(st, entry.sha)

1863 

1864 

def _transition_to_absent(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: Optional[os.stat_result],
    index: Index,
) -> None:
    """Remove any type of entry.

    Args:
      repo: Repository (used as the stop boundary for parent cleanup)
      path: Tree-relative path to delete from the index
      full_path: Filesystem path to remove
      current_stat: lstat of what currently occupies full_path, or None
      index: Index to delete the entry from
    """
    if current_stat is None:
        # Nothing on disk; still fall through to drop the index entry below.
        return

    if stat.S_ISDIR(current_stat.st_mode):
        # Check if it's a submodule directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents and dir_contents == {git_file_name}:
            # Pristine submodule placeholder: safe to delete wholesale.
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                # Leave non-empty directories in place (untracked content).
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise
    else:
        _remove_file_with_readonly_handling(full_path)

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    _remove_empty_parents(
        full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    )

1901 

1902 

def detect_case_only_renames(
    changes: list["TreeChange"],
    config: "Config",
) -> list["TreeChange"]:
    """Detect and transform case-only renames in a list of tree changes.

    This function identifies file renames that only differ in case (e.g.,
    README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
    CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
    based on the repository configuration.

    Args:
      changes: List of TreeChange objects representing file changes
      config: Repository configuration object

    Returns:
      New list of TreeChange objects with case-only renames converted to CHANGE_RENAME
    """
    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        TreeChange,
    )

    # Get the appropriate normalizer based on config
    normalize_func = get_path_element_normalizer(config)

    def _normalize_or_warn(path: bytes) -> Optional[bytes]:
        """Normalize a full path; return None (and warn) on invalid UTF-8.

        Deduplicates the normalize+warn logic that previously appeared three
        times with identical bodies.
        """
        try:
            return b"/".join(normalize_func(part) for part in path.split(b"/"))
        except UnicodeDecodeError:
            import logging

            logging.warning(
                "Skipping case-only rename detection for path with invalid UTF-8: %r",
                path,
            )
            return None

    # Maps normalized path -> original path, and original path -> change.
    old_paths_normalized = {}
    new_paths_normalized = {}
    old_changes = {}  # Map from old path to change object
    new_changes = {}  # Map from new path to change object

    # Pre-normalize all paths once to avoid repeated normalization
    for change in changes:
        # DELETE and RENAME both contribute an "old side" path.
        if change.type in (CHANGE_DELETE, CHANGE_RENAME) and change.old:
            normalized = _normalize_or_warn(change.old.path)
            if normalized is not None:
                old_paths_normalized[normalized] = change.old.path
                old_changes[change.old.path] = change

        if (
            change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
            and change.new
        ):
            normalized = _normalize_or_warn(change.new.path)
            if normalized is not None:
                new_paths_normalized[normalized] = change.new.path
                new_changes[change.new.path] = change

    # Find case-only renames and transform changes
    case_only_renames = set()
    new_rename_changes = []

    for norm_path, old_path in old_paths_normalized.items():
        new_path = new_paths_normalized.get(norm_path)
        if new_path is None or old_path == new_path:
            continue
        # Found a case-only rename: pair the old side (DELETE/RENAME) with
        # the new side (ADD/MODIFY/...). Regardless of the new change's
        # type, the rename keeps the old entry and the new file content.
        old_change = old_changes[old_path]
        new_change = new_changes[new_path]
        new_rename_changes.append(
            TreeChange(CHANGE_RENAME, old_change.old, new_change.new)
        )
        # Mark the paired changes for removal from the result.
        case_only_renames.add(old_change)
        case_only_renames.add(new_change)

    # Return new list with original ADD/DELETE changes replaced by renames
    result = [change for change in changes if change not in case_only_renames]
    result.extend(new_rename_changes)
    return result

2025 

2026 

def update_working_tree(
    repo: "Repo",
    old_tree_id: Optional[bytes],
    new_tree_id: bytes,
    change_iterator: Iterator["TreeChange"],
    honor_filemode: bool = True,
    validate_path_element: Optional[Callable[[bytes], bool]] = None,
    symlink_fn: Optional[Callable] = None,
    force_remove_untracked: bool = False,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
    allow_overwrite_modified: bool = False,
) -> None:
    """Update the working tree and index to match a new tree.

    This function handles:
    - Adding new files
    - Updating modified files
    - Removing deleted files
    - Cleaning up empty directories

    Args:
      repo: Repository object
      old_tree_id: SHA of the tree before the update
      new_tree_id: SHA of the tree to update to
      change_iterator: Iterator of TreeChange objects to apply
      honor_filemode: An optional flag to honor core.filemode setting
      validate_path_element: Function to validate path elements to check out
      symlink_fn: Function to use for creating symlinks
      force_remove_untracked: If True, remove files that exist in working
        directory but not in target tree, even if old_tree_id is None.
        NOTE(review): this parameter is accepted for API compatibility but is
        not referenced anywhere in this implementation — confirm with callers.
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)
      allow_overwrite_modified: If False, raise an error when attempting to
        overwrite files that have been modified compared to old_tree_id
    """
    if validate_path_element is None:
        validate_path_element = validate_path_element_default

    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        CHANGE_UNCHANGED,
    )

    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    index = repo.open_index()

    # Convert iterator to list since we need multiple passes
    changes = list(change_iterator)

    # Transform case-only renames on case-insensitive filesystems
    import platform

    default_ignore_case = platform.system() in ("Windows", "Darwin")
    config = repo.get_config()
    ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)

    if ignore_case:
        changes = detect_case_only_renames(changes, config)

    # Map each deleted path to its (first) DELETE change so the conflict
    # checks below run in O(1) per lookup instead of rescanning all changes.
    delete_changes: dict[bytes, "TreeChange"] = {}
    for change in changes:
        if change.type == CHANGE_DELETE and change.old:
            delete_changes.setdefault(change.old.path, change)

    # Check for path conflicts where files need to become directories
    paths_becoming_dirs = set()
    for change in changes:
        if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
            path = change.new.path
            if b"/" in path:  # This is a file inside a directory
                # Any parent path that is being deleted was a file and is
                # now becoming a directory.
                parts = path.split(b"/")
                for i in range(1, len(parts)):
                    parent = b"/".join(parts[:i])
                    if parent in delete_changes:
                        paths_becoming_dirs.add(parent)

    # Check if any path that needs to become a directory has been modified
    for path in paths_becoming_dirs:
        full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
        try:
            current_stat = os.lstat(full_path)
        except FileNotFoundError:
            continue  # File doesn't exist, nothing to check
        except OSError as e:
            raise OSError(
                f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
            ) from e

        if stat.S_ISREG(current_stat.st_mode):
            # Find the old entry for this path
            old_change = delete_changes.get(path)
            if old_change:
                # Check if file has been modified; refuse to clobber local
                # edits with a directory.
                file_matches = _check_file_matches(
                    repo.object_store,
                    full_path,
                    old_change.old.sha,
                    old_change.old.mode,
                    current_stat,
                    honor_filemode,
                    blob_normalizer,
                    path,
                )
                if not file_matches:
                    raise OSError(
                        f"Cannot replace modified file with directory: {path!r}"
                    )

    # Check for uncommitted modifications before making any changes
    if not allow_overwrite_modified and old_tree_id:
        for change in changes:
            # Only check files that are being modified or deleted
            if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
                path = change.old.path
                if path.startswith(b".git") or not validate_path(
                    path, validate_path_element
                ):
                    continue

                full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
                try:
                    current_stat = os.lstat(full_path)
                except FileNotFoundError:
                    continue  # File doesn't exist, nothing to check
                except OSError as e:
                    raise OSError(
                        f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                    ) from e

                if stat.S_ISREG(current_stat.st_mode):
                    # Check if working tree file differs from old tree
                    file_matches = _check_file_matches(
                        repo.object_store,
                        full_path,
                        change.old.sha,
                        change.old.mode,
                        current_stat,
                        honor_filemode,
                        blob_normalizer,
                        path,
                    )
                    if not file_matches:
                        from .errors import WorkingTreeModifiedError

                        raise WorkingTreeModifiedError(
                            f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
                            f"would be overwritten by checkout. "
                            f"Please commit your changes or stash them before you switch branches."
                        )

    # Apply the changes
    for change in changes:
        if change.type in (CHANGE_DELETE, CHANGE_RENAME):
            # Remove file/directory
            path = change.old.path
            if path.startswith(b".git") or not validate_path(
                path, validate_path_element
            ):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                delete_stat: Optional[os.stat_result] = os.lstat(full_path)
            except FileNotFoundError:
                delete_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            _transition_to_absent(repo, path, full_path, delete_stat, index)

        # Not elif: a RENAME both removes the old path above and writes the
        # new path here.
        if change.type in (
            CHANGE_ADD,
            CHANGE_MODIFY,
            CHANGE_UNCHANGED,
            CHANGE_COPY,
            CHANGE_RENAME,
        ):
            # Add or modify file
            path = change.new.path
            if path.startswith(b".git") or not validate_path(
                path, validate_path_element
            ):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                modify_stat: Optional[os.stat_result] = os.lstat(full_path)
            except FileNotFoundError:
                modify_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            if S_ISGITLINK(change.new.mode):
                _transition_to_submodule(
                    repo, path, full_path, modify_stat, change.new, index
                )
            else:
                _transition_to_file(
                    repo.object_store,
                    path,
                    full_path,
                    modify_stat,
                    change.new,
                    index,
                    honor_filemode,
                    symlink_fn,
                    blob_normalizer,
                    tree_encoding,
                )

    index.write()

2260 

2261 

def get_unstaged_changes(
    index: Index,
    root_path: Union[str, bytes],
    filter_blob_callback: Optional[Callable] = None,
) -> Generator[bytes, None, None]:
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: optional callable applied to each working-tree
        blob (as ``callback(blob, tree_path)``) before comparing it against
        the index entry
    Returns: iterator over paths with unstaged changes
    """
    # Compare the on-disk blob sha of every index entry; conflicted or
    # missing files always count as unstaged.
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        fs_path = _tree_to_fs_path(root_path, tree_path)

        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        try:
            fs_stat = os.lstat(fs_path)
            if stat.S_ISDIR(fs_stat.st_mode):
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            file_like = stat.S_ISREG(fs_stat.st_mode) or stat.S_ISLNK(
                fs_stat.st_mode
            )
            if not file_like:
                continue

            working_blob = blob_from_path_and_stat(fs_path, fs_stat)
            if filter_blob_callback is not None:
                working_blob = filter_blob_callback(working_blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if working_blob.id != entry.sha:
                yield tree_path

2306 

2307 

def _tree_to_fs_path(
    root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
) -> bytes:
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes (encoded with tree_encoding)
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)

    # Translate git's "/" separators to the platform separator when they
    # differ (e.g. "\\" on Windows).
    fs_relative = (
        tree_path if os_sep_bytes == b"/" else tree_path.replace(b"/", os_sep_bytes)
    )

    if sys.platform == "win32":
        # On Windows, decode with the tree encoding and re-encode using the
        # filesystem encoding; fall back to the raw bytes if decoding fails.
        try:
            fs_relative = os.fsencode(fs_relative.decode(tree_encoding))
        except UnicodeDecodeError:
            pass

    return os.path.join(root_path, fs_relative)

2337 

2338 

def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -> bytes:
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.
      tree_encoding: Encoding to use for tree paths (default: utf-8)

    Returns: Git tree path as bytes (encoded with tree_encoding)
    """
    raw = fs_path if isinstance(fs_path, bytes) else os.fsencode(fs_path)

    if sys.platform == "win32":
        # On Windows, decode via the filesystem encoding and re-encode with
        # the tree encoding; keep the original bytes if decoding fails.
        try:
            raw = os.fsdecode(raw).encode(tree_encoding)
        except UnicodeDecodeError:
            pass

    # Git always stores "/" as the separator inside tree paths.
    if os_sep_bytes == b"/":
        return raw
    return raw.replace(os_sep_bytes, b"/")

2368 

2369 

def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
    """Create an index entry for a directory, if it is a submodule.

    Args:
      st: stat result for the directory
      path: Filesystem path of the directory (bytes)
    Returns: A gitlink index entry pointing at the submodule's HEAD if the
        directory contains a .git and its HEAD can be read; None otherwise
    """
    if os.path.exists(os.path.join(path, b".git")):
        head = read_submodule_head(path)
        if head is None:
            return None
        return index_entry_from_stat(st, head, mode=S_IFGITLINK)
    return None

2377 

2378 

def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index entry from a filesystem path.

    Regular files and symlinks produce a blob-backed entry; directories are
    delegated to index_entry_from_directory (gitlink detection); any other
    file type yields None.

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    mode = st.st_mode

    if stat.S_ISDIR(mode):
        return index_entry_from_directory(st, path)

    if not (stat.S_ISREG(mode) or stat.S_ISLNK(mode)):
        return None

    blob = blob_from_path_and_stat(path, st)
    if object_store is not None:
        object_store.add_object(blob)
    return index_entry_from_stat(st, blob.id)

2406 

2407 

def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry
    """
    for tree_path in paths:
        fs_path = _tree_to_fs_path(root_path, tree_path)
        try:
            fresh = index_entry_from_path(fs_path, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            # Missing paths and bare directories have no fresh entry.
            fresh = None
        yield tree_path, fresh

2428 

2429 

def iter_fresh_objects(
    paths: Iterable[bytes],
    root_path: bytes,
    include_deleted: bool = False,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is None:
            # Path is gone from disk; only reported when requested.
            if include_deleted:
                yield path, None, None
        else:
            yield path, entry.sha, cleanup_mode(entry.mode)

2451 

2452 

def refresh_index(index: Index, root_path: bytes) -> None:
    """Refresh the contents of an index.

    This is the equivalent to running 'git commit -a'.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    # Re-stat every tracked path and store the fresh entry when one exists.
    for tree_path, fresh in iter_fresh_entries(index, root_path):
        if fresh:
            index[tree_path] = fresh

2465 

2466 

class locked_index:
    """Lock the index while making modifications.

    Works as a context manager: __enter__ takes the index lock via GitFile
    and returns the Index for mutation; on clean exit the index is written
    through a SHA1Writer and the lock file is closed, while any exception
    (or a write failure) aborts the lock file instead.
    """

    # Lock file handle, held for the lifetime of the context.
    _file: "_GitFile"

    def __init__(self, path: Union[bytes, str]) -> None:
        """Store the path of the index file to lock.

        Args:
          path: Path to the index file
        """
        self._path = path

    def __enter__(self) -> Index:
        """Acquire the lock and return the index to modify."""
        f = GitFile(self._path, "wb")
        assert isinstance(f, _GitFile)  # GitFile in write mode always returns _GitFile
        self._file = f
        self._index = Index(self._path)
        return self._index

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        """Write the index and release the lock, or abort on failure."""
        if exc_type is not None:
            # An exception occurred inside the with-block: discard changes.
            self._file.abort()
            return
        try:
            # BinaryIO and cast come from the module-level typing import.
            f = SHA1Writer(cast(BinaryIO, self._file))
            write_index_dict(cast(BinaryIO, f), self._index._byname)
        except BaseException:
            # Serialization failed: leave the on-disk index untouched.
            self._file.abort()
        else:
            f.close()