Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/index.py: 33%


# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import Generator, Iterable, Iterator
from dataclasses import dataclass
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Optional,
    Union,
    cast,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .line_ending import BlobNormalizer
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"

def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
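
# Illustrative example (not part of dulwich): round-tripping a value through
# the varint helpers above. 300 needs two 7-bit groups, so it encodes to two
# bytes with the continuation bit set on the first.
def _demo_varint_roundtrip() -> None:
    encoded = _encode_varint(300)  # 300 = 0b10_0101100 -> b"\xac\x02"
    assert encoded == b"\xac\x02"
    value, consumed = _decode_varint(encoded)
    assert (value, consumed) == (300, 2)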

def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
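
# Illustrative example (not part of dulwich): index v4 stores each path as
# "strip N bytes from the previous path, then append this suffix".
def _demo_path_compression_roundtrip() -> None:
    compressed = _compress_path(b"src/main_test.py", b"src/main.py")
    # Shared prefix is b"src/main" (8 bytes): strip 3 bytes, append the suffix.
    assert compressed == b"\x03_test.py\x00"
    path, offset = _decompress_path(compressed, 0, b"src/main.py")
    assert (path, offset) == (b"src/main_test.py", 10)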

def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed

class Stage(Enum):
    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3


@dataclass
class SerializedIndexEntry:
    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
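
# Illustrative example (not part of dulwich): the merge stage lives in bits
# 12-13 of the on-disk flags word, which is what stage() extracts.
def _demo_stage_bits() -> None:
    flags = 2 << FLAG_STAGESHIFT  # 0x2000: stage 2, "ours" during a merge
    stage = Stage((flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
    assert stage is Stage.MERGE_CONFLICT_THIS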

@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        return cls(data)
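
# Illustrative example (not part of dulwich): from_raw() dispatches known
# signatures to subclasses and keeps unknown extensions as opaque bytes.
# The b"XXXX" signature below is hypothetical.
def _demo_extension_dispatch() -> None:
    known = IndexExtension.from_raw(TREE_EXTENSION, b"")
    assert isinstance(known, TreeExtension)
    unknown = IndexExtension.from_raw(b"XXXX", b"opaque payload")
    assert type(unknown) is IndexExtension and unknown.data == b"opaque payload"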

@dataclass
class IndexEntry:
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED
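
# Illustrative example (not part of dulwich): setting skip-worktree also has
# to raise FLAG_EXTENDED, because extended flags only exist on disk when that
# bit is set in the 16-bit flags word.
def _demo_skip_worktree() -> None:
    entry = IndexEntry(0, 0, 0, 0, 0o100644, 0, 0, 0, b"0" * 40)
    entry.set_skip_worktree(True)
    assert entry.skip_worktree and entry.flags & FLAG_EXTENDED
    entry.set_skip_worktree(False)
    assert not entry.skip_worktree and not entry.flags & FLAG_EXTENDED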

class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
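
# Illustrative example (not part of dulwich): pathsplit/pathjoin operate on
# git's /-delimited byte paths, independent of the host os.sep.
def _demo_path_helpers() -> None:
    assert pathsplit(b"foo/bar/baz.txt") == (b"foo/bar", b"baz.txt")
    assert pathsplit(b"toplevel") == (b"", b"toplevel")
    assert pathjoin(b"foo", b"", b"bar") == b"foo/bar"  # empty parts dropped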

def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: BinaryIO, t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
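
# Illustrative example (not part of dulwich): cache times are stored as two
# big-endian 32-bit words (seconds, nanoseconds); ints and floats are
# normalized to that pair on write.
def _demo_cache_time_roundtrip() -> None:
    import io

    buf = io.BytesIO()
    write_cache_time(buf, 1700000000.25)  # float -> (secs, nsecs)
    buf.seek(0)
    assert read_cache_time(buf) == (1700000000, 250000000)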

def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )
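
# Illustrative example (not part of dulwich): in index versions < 4 each
# entry is padded with NULs to a multiple of 8 bytes; the "+ 8" in the
# rounding formula guarantees between 1 and 8 padding bytes, so the path is
# always NUL-terminated.
def _demo_entry_padding() -> None:
    for written in (62, 63, 64, 70):  # bytes emitted before padding
        real_size = (written + 8) & ~7
        assert real_size % 8 == 0 and 1 <= real_size - written <= 8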

def write_cache_entry(
    f: BinaryIO, entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))

class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
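
# Illustrative example (not part of dulwich): the 12-byte header is the
# b"DIRC" magic followed by two big-endian words, version and entry count.
def _demo_index_header() -> None:
    import io

    header = b"DIRC" + struct.pack(">LL", 2, 0)  # version 2, zero entries
    assert read_index_header(io.BytesIO(header)) == (2, 0)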

def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry

def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions

def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    Keys are paths; a path with merge conflicts maps to a
    ConflictedIndexEntry that groups its stages, since the path alone
    is not unique among the on-disk entries.

    Args:
      f: File object to read from
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret

def write_index(
    f: BinaryIO,
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # STEP 1: check if any extended_flags are set
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        # Force or bump the version to 3
        version = 3
    # The rest is unchanged, but you might insert a final check:
    if version < 3:
        # Double-check no extended flags appear
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Proceed with the existing code to write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)

def write_index_dict(
    f: BinaryIO,
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary,
    being careful to sort by path and then by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)
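
# Illustrative example (not part of dulwich): writing a one-entry index to a
# buffer and reading it back. The sha is the well-known empty-blob object id.
def _demo_index_dict_roundtrip() -> None:
    import io

    entry = IndexEntry(
        0, 0, 0, 0, 0o100644, 0, 0, 0,
        b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
    )
    buf = io.BytesIO()
    write_index_dict(buf, {b"hello.txt": entry})
    buf.seek(0)
    round_tripped = read_index_dict(buf)
    assert round_tripped[b"hello.txt"].sha == entry.sha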

def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
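
# Illustrative example (not part of dulwich): tree objects only store a small
# set of modes, so filesystem permissions collapse to 0o644 or 0o755.
def _demo_cleanup_mode() -> None:
    assert cleanup_mode(0o100600) == 0o100644  # odd permissions normalized
    assert cleanup_mode(0o100755) == 0o100755  # owner execute bit preserved
    assert cleanup_mode(0o120777) == 0o120000  # symlinks drop permissions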

class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        from typing import BinaryIO, cast

        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    cast(BinaryIO, f),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(cast(BinaryIO, f))
                write_index_dict(
                    cast(BinaryIO, sha1_writer),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(
                cast(BinaryIO, sha1_reader)
            )
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
          newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, Any] = {b"": {}}

    def add_tree(path: bytes) -> dict[bytes, Any]:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: dict[bytes, Any] = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
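
# Illustrative sketch (not part of dulwich's public examples): building a
# tree from one staged blob, using the in-memory store from
# dulwich.object_store. Intermediate trees (here b"dir") are created on the
# fly by the recursive add_tree/build_tree helpers.
def _demo_commit_tree() -> None:
    from .object_store import MemoryObjectStore

    store = MemoryObjectStore()
    blob = Blob.from_string(b"hello\n")
    store.add_object(blob)
    tree_id = commit_tree(store, [(b"dir/hello.txt", blob.id, 0o100644)])
    assert isinstance(store[tree_id], Tree)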

def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())

def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
      (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))

def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )

if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink

def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[Callable] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)

INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()


def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()


def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")


def get_path_element_normalizer(config) -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default

def validate_path_element_default(element: bytes) -> bool:
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True
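
# Illustrative example (not part of dulwich): NTFS silently strips trailing
# dots and spaces, so b".GIT. " would land on the real ".git" directory; the
# normalizer models that before the name is checked.
def _demo_ntfs_path_validation() -> None:
    assert _normalize_path_element_ntfs(b".GIT. ") == b".git"
    assert not validate_path_element_ntfs(b".GIT. ")
    assert not validate_path_element_ntfs(b"GIT~1")  # 8.3 alias of .git
    assert validate_path_element_ntfs(b"notes.txt")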

# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True

def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/."""
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True
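
# Illustrative example (not part of dulwich): every /-separated component has
# to pass the element validator for the whole path to be accepted.
def _demo_validate_path() -> None:
    assert validate_path(b"src/index.py")
    assert not validate_path(b".git/config")  # .git component rejected
    assert not validate_path(b"a/../escape")  # ".." component rejected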

def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[Callable] = None,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
      in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()

def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a file mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob


def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)

def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None

def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it has changed. If
    not, consider the file as changed, as Git tracked a file and not a
    directory.

    Return True if the given path should be considered as changed and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False

os_sep_bytes = os.sep.encode("ascii")


def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if parent_dir and not os.path.exists(parent_dir):
        # Check if any parent in the path is a file
        parts = parent_dir.split(os_sep_bytes)
        for i in range(len(parts)):
            partial_path = os_sep_bytes.join(parts[: i + 1])
            if (
                partial_path
                and os.path.exists(partial_path)
                and not os.path.isdir(partial_path)
            ):
                # Parent path is a file, this is an error
                raise OSError(
                    f"Cannot create directory, parent path is a file: {partial_path!r}"
                )
        os.makedirs(parent_dir)

def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        # On Windows, remove read-only attribute and retry
        if sys.platform == "win32":
            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            os.unlink(path)
        else:
            raise


def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Remove empty parent directories up to stop_at."""
    parent = os.path.dirname(path)
    while parent and parent != stop_at:
        try:
            os.rmdir(parent)
            parent = os.path.dirname(parent)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            break
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                # Directory not empty - stop trying
                break
            raise

def _check_symlink_matches(
    full_path: bytes, repo_object_store, entry_sha: bytes
) -> bool:
    """Check if symlink target matches expected target.

    Returns True if symlink matches, False if it doesn't match.
    """
    try:
        current_target = os.readlink(full_path)
        blob_obj = repo_object_store[entry_sha]
        expected_target = blob_obj.as_raw_string()
        if isinstance(current_target, str):
            current_target = current_target.encode()
        return current_target == expected_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return False
    except OSError as e:
        if e.errno == errno.EINVAL:
            # Not a symlink
            return False
        raise

def _check_file_matches(
    repo_object_store,
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_path: Optional[bytes] = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Returns True if file matches, False if it doesn't match.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)

        # For regular files, only check the user executable bit, not group/other permissions
        # This matches Git's behavior where umask differences don't count as modifications
        if stat.S_ISREG(current_stat.st_mode):
            # Normalize regular file modes to ignore group/other write permissions
            current_mode_normalized = (
                current_mode & 0o755
            )  # Keep only user rwx and all read+execute
            expected_mode_normalized = expected_mode & 0o755

            # For Git compatibility, regular files should be either 644 or 755
            if expected_mode_normalized not in (0o644, 0o755):
                expected_mode_normalized = 0o644  # Default for regular files
            if current_mode_normalized not in (0o644, 0o755):
                # Determine if it should be executable based on user execute bit
                if current_mode & 0o100:  # User execute bit is set
                    current_mode_normalized = 0o755
                else:
                    current_mode_normalized = 0o644

            if current_mode_normalized != expected_mode_normalized:
                return False
        else:
            # For non-regular files (symlinks, etc.), check mode exactly
            if current_mode != expected_mode:
                return False

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return False

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content == expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return False

def _transition_to_submodule(repo, path, full_path, current_stat, entry, index):
    """Transition any type to submodule."""
    from .submodule import ensure_submodule_placeholder

    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Already a directory, just ensure .git file exists
        ensure_submodule_placeholder(repo, path)
    else:
        # Remove whatever is there and create submodule
        if current_stat is not None:
            _remove_file_with_readonly_handling(full_path)
        ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    index[path] = index_entry_from_stat(st, entry.sha)

1753def _transition_to_file( 

1754 object_store, 

1755 path, 

1756 full_path, 

1757 current_stat, 

1758 entry, 

1759 index, 

1760 honor_filemode, 

1761 symlink_fn, 

1762 blob_normalizer, 

1763 tree_encoding="utf-8", 

1764): 

1765 """Transition any type to regular file or symlink.""" 

1766 # Check if we need to update 

1767 if ( 

1768 current_stat is not None 

1769 and stat.S_ISREG(current_stat.st_mode) 

1770 and not stat.S_ISLNK(entry.mode) 

1771 ): 

1772 # File to file - check if update needed 

1773 file_matches = _check_file_matches( 

1774 object_store, 

1775 full_path, 

1776 entry.sha, 

1777 entry.mode, 

1778 current_stat, 

1779 honor_filemode, 

1780 blob_normalizer, 

1781 path, 

1782 ) 

1783 needs_update = not file_matches 

1784 elif ( 

1785 current_stat is not None 

1786 and stat.S_ISLNK(current_stat.st_mode) 

1787 and stat.S_ISLNK(entry.mode) 

1788 ): 

1789 # Symlink to symlink - check if update needed 

1790 symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha) 

1791 needs_update = not symlink_matches 

1792 else: 

1793 needs_update = True 

1794 

1795 if not needs_update: 

1796 # Just update the index; current_stat is valid here because needs_update is only False when an existing file was stat'ed and matched 

1797 index[path] = index_entry_from_stat(current_stat, entry.sha) 

1798 return 

1799 

1800 # Remove existing entry if needed 

1801 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

1802 # Remove directory 

1803 dir_contents = set(os.listdir(full_path)) 

1804 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

1805 

1806 if git_file_name in dir_contents: 

1807 if dir_contents != {git_file_name}: 

1808 raise IsADirectoryError( 

1809 f"Cannot replace submodule with untracked files: {full_path!r}" 

1810 ) 

1811 shutil.rmtree(full_path) 

1812 else: 

1813 try: 

1814 os.rmdir(full_path) 

1815 except OSError as e: 

1816 if e.errno == errno.ENOTEMPTY: 

1817 raise IsADirectoryError( 

1818 f"Cannot replace non-empty directory with file: {full_path!r}" 

1819 ) 

1820 raise 

1821 elif current_stat is not None: 

1822 _remove_file_with_readonly_handling(full_path) 

1823 

1824 # Ensure parent directory exists 

1825 _ensure_parent_dir_exists(full_path) 

1826 

1827 # Write the file 

1828 blob_obj = object_store[entry.sha] 

1829 assert isinstance(blob_obj, Blob) 

1830 if blob_normalizer: 

1831 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path) 

1832 st = build_file_from_blob( 

1833 blob_obj, 

1834 entry.mode, 

1835 full_path, 

1836 honor_filemode=honor_filemode, 

1837 tree_encoding=tree_encoding, 

1838 symlink_fn=symlink_fn, 

1839 ) 

1840 index[path] = index_entry_from_stat(st, entry.sha) 

1841 

1842 

1843def _transition_to_absent(repo, path, full_path, current_stat, index): 

1844 """Remove any type of entry.""" 

1845 if current_stat is None: 

1846 return 

1847 

1848 if stat.S_ISDIR(current_stat.st_mode): 

1849 # Check if it's a submodule directory 

1850 dir_contents = set(os.listdir(full_path)) 

1851 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

1852 

1853 if git_file_name in dir_contents and dir_contents == {git_file_name}: 

1854 shutil.rmtree(full_path) 

1855 else: 

1856 try: 

1857 os.rmdir(full_path) 

1858 except OSError as e: 

1859 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST): 

1860 raise 

1861 else: 

1862 _remove_file_with_readonly_handling(full_path) 

1863 

1864 try: 

1865 del index[path] 

1866 except KeyError: 

1867 pass 

1868 

1869 # Try to remove empty parent directories 

1870 _remove_empty_parents( 

1871 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

1872 ) 

1873 

1874 

1875def detect_case_only_renames( 

1876 changes: list["TreeChange"], 

1877 config: "Config", 

1878) -> list["TreeChange"]: 

1879 """Detect and transform case-only renames in a list of tree changes. 

1880 

1881 This function identifies file renames that only differ in case (e.g., 

1882 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into 

1883 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization 

1884 based on the repository configuration. 

1885 

1886 Args: 

1887 changes: List of TreeChange objects representing file changes 

1888 config: Repository configuration object 

1889 

1890 Returns: 

1891 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME 

1892 """ 

1893 from .diff_tree import ( 

1894 CHANGE_ADD, 

1895 CHANGE_COPY, 

1896 CHANGE_DELETE, 

1897 CHANGE_MODIFY, 

1898 CHANGE_RENAME, 

1899 TreeChange, 

1900 ) 

1901 

1902 # Build dictionaries of old and new paths with their normalized forms 

1903 old_paths_normalized = {} 

1904 new_paths_normalized = {} 

1905 old_changes = {} # Map from old path to change object 

1906 new_changes = {} # Map from new path to change object 

1907 

1908 # Get the appropriate normalizer based on config 

1909 normalize_func = get_path_element_normalizer(config) 

1910 

1911 def normalize_path(path: bytes) -> bytes: 

1912 """Normalize entire path using element normalization.""" 

1913 return b"/".join(normalize_func(part) for part in path.split(b"/")) 

1914 

1915 # Pre-normalize all paths once to avoid repeated normalization 

1916 for change in changes: 

1917 # A RENAME's old path is handled exactly like a DELETE, so both 

1918 # branches collapse into one membership test instead of two 

1919 # duplicated bodies. 

1920 if change.type in (CHANGE_DELETE, CHANGE_RENAME) and change.old: 

1921 try: 

1922 normalized = normalize_path(change.old.path) 

1923 except UnicodeDecodeError: 

1924 import logging 

1925 

1926 logging.warning( 

1927 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

1928 change.old.path, 

1929 ) 

1930 else: 

1931 old_paths_normalized[normalized] = change.old.path 

1932 old_changes[change.old.path] = change 

1944 

1945 if ( 

1946 change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY) 

1947 and change.new 

1948 ): 

1949 try: 

1950 normalized = normalize_path(change.new.path) 

1951 except UnicodeDecodeError: 

1952 import logging 

1953 

1954 logging.warning( 

1955 "Skipping case-only rename detection for path with invalid UTF-8: %r", 

1956 change.new.path, 

1957 ) 

1958 else: 

1959 new_paths_normalized[normalized] = change.new.path 

1960 new_changes[change.new.path] = change 

1961 

1962 # Find case-only renames and transform changes 

1963 case_only_renames = set() 

1964 new_rename_changes = [] 

1965 

1966 for norm_path, old_path in old_paths_normalized.items(): 

1967 if norm_path in new_paths_normalized: 

1968 new_path = new_paths_normalized[norm_path] 

1969 if old_path != new_path: 

1970 # Found a case-only rename 

1971 old_change = old_changes[old_path] 

1972 new_change = new_changes[new_path] 

1973 

1974 # Create a CHANGE_RENAME to replace the DELETE and ADD/MODIFY pair 

1975 # Whether the counterpart is an ADD (simple case) or a 

1976 # MODIFY (complex case), the rename pairs the old file from 

1977 # the DELETE with the new file from the counterpart change, 

1978 # so a single TreeChange covers both. 

1979 rename_change = TreeChange( 

1980 CHANGE_RENAME, old_change.old, new_change.new 

1981 ) 

1986 

1987 new_rename_changes.append(rename_change) 

1988 

1989 # Mark the old changes for removal 

1990 case_only_renames.add(old_change) 

1991 case_only_renames.add(new_change) 

1992 

1993 # Return new list with original ADD/DELETE changes replaced by renames 

1994 result = [change for change in changes if change not in case_only_renames] 

1995 result.extend(new_rename_changes) 

1996 return result 

1997 

1998 
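# --- Editor's example (not part of index.py) -----------------------------
# Hedged usage sketch: a DELETE of b"README.txt" paired with an ADD of
# b"readme.txt" should come back as a single CHANGE_RENAME. The empty
# ConfigDict and the placeholder SHA are assumptions for illustration.

from dulwich.config import ConfigDict
from dulwich.diff_tree import CHANGE_ADD, CHANGE_DELETE, TreeChange
from dulwich.index import detect_case_only_renames
from dulwich.objects import TreeEntry

blob_sha = b"a" * 40  # placeholder hex SHA; never dereferenced here
changes = [
    TreeChange(CHANGE_DELETE, TreeEntry(b"README.txt", 0o100644, blob_sha), None),
    TreeChange(CHANGE_ADD, None, TreeEntry(b"readme.txt", 0o100644, blob_sha)),
]
result = detect_case_only_renames(changes, ConfigDict())
for change in result:
    print(change.type)  # expected: a single "rename" change
# --- end example ----------------------------------------------------------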

1999def update_working_tree( 

2000 repo: "Repo", 

2001 old_tree_id: Optional[bytes], 

2002 new_tree_id: bytes, 

2003 change_iterator: Iterator["TreeChange"], 

2004 honor_filemode: bool = True, 

2005 validate_path_element: Optional[Callable[[bytes], bool]] = None, 

2006 symlink_fn: Optional[Callable] = None, 

2007 force_remove_untracked: bool = False, 

2008 blob_normalizer: Optional["BlobNormalizer"] = None, 

2009 tree_encoding: str = "utf-8", 

2010 allow_overwrite_modified: bool = False, 

2011) -> None: 

2012 """Update the working tree and index to match a new tree. 

2013 

2014 This function handles: 

2015 - Adding new files 

2016 - Updating modified files 

2017 - Removing deleted files 

2018 - Cleaning up empty directories 

2019 

2020 Args: 

2021 repo: Repository object 

2022 old_tree_id: SHA of the tree before the update 

2023 new_tree_id: SHA of the tree to update to 

2024 change_iterator: Iterator of TreeChange objects to apply 

2025 honor_filemode: Whether to honor the core.filemode setting 

2026 validate_path_element: Function to validate path elements to check out 

2027 symlink_fn: Function to use for creating symlinks 

2028 force_remove_untracked: If True, remove files that exist in working 

2029 directory but not in target tree, even if old_tree_id is None 

2030 blob_normalizer: An optional BlobNormalizer to use for converting line 

2031 endings when writing blobs to the working directory. 

2032 tree_encoding: Encoding used for tree paths (default: utf-8) 

2033 allow_overwrite_modified: If False, raise an error when attempting to 

2034 overwrite files that have been modified compared to old_tree_id 

2035 """ 

2036 if validate_path_element is None: 

2037 validate_path_element = validate_path_element_default 

2038 

2039 from .diff_tree import ( 

2040 CHANGE_ADD, 

2041 CHANGE_COPY, 

2042 CHANGE_DELETE, 

2043 CHANGE_MODIFY, 

2044 CHANGE_RENAME, 

2045 CHANGE_UNCHANGED, 

2046 ) 

2047 

2048 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

2049 index = repo.open_index() 

2050 

2051 # Convert iterator to list since we need multiple passes 

2052 changes = list(change_iterator) 

2053 

2054 # Transform case-only renames on case-insensitive filesystems 

2055 import platform 

2056 

2057 default_ignore_case = platform.system() in ("Windows", "Darwin") 

2058 config = repo.get_config() 

2059 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case) 

2060 

2061 if ignore_case: 

2062 # reuse the config object already fetched above 

2063 changes = detect_case_only_renames(changes, config) 

2064 

2065 # Check for path conflicts where files need to become directories 

2066 paths_becoming_dirs = set() 

2067 for change in changes: 

2068 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY): 

2069 path = change.new.path 

2070 if b"/" in path: # This is a file inside a directory 

2071 # Check if any parent path exists as a file in the old tree or changes 

2072 parts = path.split(b"/") 

2073 for i in range(1, len(parts)): 

2074 parent = b"/".join(parts[:i]) 

2075 # See if this parent path is being deleted (was a file, becoming a dir) 

2076 for other_change in changes: 

2077 if ( 

2078 other_change.type == CHANGE_DELETE 

2079 and other_change.old 

2080 and other_change.old.path == parent 

2081 ): 

2082 paths_becoming_dirs.add(parent) 

2083 

2084 # Check if any path that needs to become a directory has been modified 

2085 for path in paths_becoming_dirs: 

2086 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2087 try: 

2088 current_stat = os.lstat(full_path) 

2089 except FileNotFoundError: 

2090 continue # File doesn't exist, nothing to check 

2091 except OSError as e: 

2092 raise OSError( 

2093 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2094 ) from e 

2095 

2096 if stat.S_ISREG(current_stat.st_mode): 

2097 # Find the old entry for this path 

2098 old_change = None 

2099 for change in changes: 

2100 if ( 

2101 change.type == CHANGE_DELETE 

2102 and change.old 

2103 and change.old.path == path 

2104 ): 

2105 old_change = change 

2106 break 

2107 

2108 if old_change: 

2109 # Check if file has been modified 

2110 file_matches = _check_file_matches( 

2111 repo.object_store, 

2112 full_path, 

2113 old_change.old.sha, 

2114 old_change.old.mode, 

2115 current_stat, 

2116 honor_filemode, 

2117 blob_normalizer, 

2118 path, 

2119 ) 

2120 if not file_matches: 

2121 raise OSError( 

2122 f"Cannot replace modified file with directory: {path!r}" 

2123 ) 

2124 

2125 # Check for uncommitted modifications before making any changes 

2126 if not allow_overwrite_modified and old_tree_id: 

2127 for change in changes: 

2128 # Only check files that are being modified or deleted 

2129 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old: 

2130 path = change.old.path 

2131 if path.startswith(b".git") or not validate_path( 

2132 path, validate_path_element 

2133 ): 

2134 continue 

2135 

2136 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2137 try: 

2138 current_stat = os.lstat(full_path) 

2139 except FileNotFoundError: 

2140 continue # File doesn't exist, nothing to check 

2141 except OSError as e: 

2142 raise OSError( 

2143 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2144 ) from e 

2145 

2146 if stat.S_ISREG(current_stat.st_mode): 

2147 # Check if working tree file differs from old tree 

2148 file_matches = _check_file_matches( 

2149 repo.object_store, 

2150 full_path, 

2151 change.old.sha, 

2152 change.old.mode, 

2153 current_stat, 

2154 honor_filemode, 

2155 blob_normalizer, 

2156 path, 

2157 ) 

2158 if not file_matches: 

2159 from .errors import WorkingTreeModifiedError 

2160 

2161 raise WorkingTreeModifiedError( 

2162 f"Your local changes to '{path.decode('utf-8', errors='replace')}' " 

2163 f"would be overwritten by checkout. " 

2164 f"Please commit your changes or stash them before you switch branches." 

2165 ) 

2166 

2167 # Apply the changes 

2168 for change in changes: 

2169 if change.type in (CHANGE_DELETE, CHANGE_RENAME): 

2170 # Remove file/directory 

2171 path = change.old.path 

2172 if path.startswith(b".git") or not validate_path( 

2173 path, validate_path_element 

2174 ): 

2175 continue 

2176 

2177 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2178 try: 

2179 delete_stat: Optional[os.stat_result] = os.lstat(full_path) 

2180 except FileNotFoundError: 

2181 delete_stat = None 

2182 except OSError as e: 

2183 raise OSError( 

2184 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2185 ) from e 

2186 

2187 _transition_to_absent(repo, path, full_path, delete_stat, index) 

2188 

2189 if change.type in ( 

2190 CHANGE_ADD, 

2191 CHANGE_MODIFY, 

2192 CHANGE_UNCHANGED, 

2193 CHANGE_COPY, 

2194 CHANGE_RENAME, 

2195 ): 

2196 # Add or modify file 

2197 path = change.new.path 

2198 if path.startswith(b".git") or not validate_path( 

2199 path, validate_path_element 

2200 ): 

2201 continue 

2202 

2203 full_path = _tree_to_fs_path(repo_path, path, tree_encoding) 

2204 try: 

2205 modify_stat: Optional[os.stat_result] = os.lstat(full_path) 

2206 except FileNotFoundError: 

2207 modify_stat = None 

2208 except OSError as e: 

2209 raise OSError( 

2210 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}" 

2211 ) from e 

2212 

2213 if S_ISGITLINK(change.new.mode): 

2214 _transition_to_submodule( 

2215 repo, path, full_path, modify_stat, change.new, index 

2216 ) 

2217 else: 

2218 _transition_to_file( 

2219 repo.object_store, 

2220 path, 

2221 full_path, 

2222 modify_stat, 

2223 change.new, 

2224 index, 

2225 honor_filemode, 

2226 symlink_fn, 

2227 blob_normalizer, 

2228 tree_encoding, 

2229 ) 

2230 

2231 index.write() 

2232 

2233 
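# --- Editor's example (not part of index.py) -----------------------------
# Hedged sketch of a checkout-style call: compute tree-to-tree changes
# with dulwich.diff_tree.tree_changes and let update_working_tree apply
# them. "/path/to/repo" is a placeholder; new_tree_id would normally be
# the tree of the commit being checked out.

from dulwich.diff_tree import tree_changes
from dulwich.index import update_working_tree
from dulwich.repo import Repo

repo = Repo("/path/to/repo")  # hypothetical path
old_tree_id = repo[repo.head()].tree
new_tree_id = old_tree_id  # stand-in; use the target commit's tree instead
changes = tree_changes(repo.object_store, old_tree_id, new_tree_id)
update_working_tree(repo, old_tree_id, new_tree_id, change_iterator=changes)
# --- end example ----------------------------------------------------------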

2234def get_unstaged_changes( 

2235 index: Index, 

2236 root_path: Union[str, bytes], 

2237 filter_blob_callback: Optional[Callable] = None, 

2238) -> Generator[bytes, None, None]: 

2239 """Walk through an index and check for differences against working tree. 

2240 

2241 Args: 

2242 index: index to check 

2243 root_path: path in which to find files 

 filter_blob_callback: optional callback applied to each blob before comparison 

2244 Returns: iterator over paths with unstaged changes 

2245 """ 

2246 # For each entry in the index check the sha1 & ensure not staged 

2247 if not isinstance(root_path, bytes): 

2248 root_path = os.fsencode(root_path) 

2249 

2250 for tree_path, entry in index.iteritems(): 

2251 full_path = _tree_to_fs_path(root_path, tree_path) 

2252 if isinstance(entry, ConflictedIndexEntry): 

2253 # Conflicted files are always unstaged 

2254 yield tree_path 

2255 continue 

2256 

2257 try: 

2258 st = os.lstat(full_path) 

2259 if stat.S_ISDIR(st.st_mode): 

2260 if _has_directory_changed(tree_path, entry): 

2261 yield tree_path 

2262 continue 

2263 

2264 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): 

2265 continue 

2266 

2267 blob = blob_from_path_and_stat(full_path, st) 

2268 

2269 if filter_blob_callback is not None: 

2270 blob = filter_blob_callback(blob, tree_path) 

2271 except FileNotFoundError: 

2272 # The file was removed, so we assume that counts as 

2273 # different from whatever file used to exist. 

2274 yield tree_path 

2275 else: 

2276 if blob.id != entry.sha: 

2277 yield tree_path 

2278 

2279 
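# --- Editor's example (not part of index.py) -----------------------------
# Hedged sketch: list files whose working-tree content no longer matches
# the index, the core of a "git status"-style dirty check. The repo path
# is a placeholder.

from dulwich.index import get_unstaged_changes
from dulwich.repo import Repo

repo = Repo("/path/to/repo")  # hypothetical path
for tree_path in get_unstaged_changes(repo.open_index(), repo.path):
    print(tree_path.decode("utf-8", errors="replace"))
# --- end example ----------------------------------------------------------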

2280def _tree_to_fs_path( 

2281 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8" 

2282) -> bytes: 

2283 """Convert a git tree path to a file system path. 

2284 

2285 Args: 

2286 root_path: Root filesystem path 

2287 tree_path: Git tree path as bytes (encoded with tree_encoding) 

2288 tree_encoding: Encoding used for tree paths (default: utf-8) 

2289 

2290 Returns: File system path. 

2291 """ 

2292 assert isinstance(tree_path, bytes) 

2293 if os_sep_bytes != b"/": 

2294 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes) 

2295 else: 

2296 sep_corrected_path = tree_path 

2297 

2298 # On Windows, we need to handle tree path encoding properly 

2299 if sys.platform == "win32": 

2300 # Decode from tree encoding, then re-encode for filesystem 

2301 try: 

2302 tree_path_str = sep_corrected_path.decode(tree_encoding) 

2303 sep_corrected_path = os.fsencode(tree_path_str) 

2304 except UnicodeDecodeError: 

2305 # If decoding fails, use the original bytes 

2306 pass 

2307 

2308 return os.path.join(root_path, sep_corrected_path) 

2309 

2310 

2311def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -> bytes: 

2312 """Convert a file system path to a git tree path. 

2313 

2314 Args: 

2315 fs_path: File system path. 

2316 tree_encoding: Encoding to use for tree paths (default: utf-8) 

2317 

2318 Returns: Git tree path as bytes (encoded with tree_encoding) 

2319 """ 

2320 if not isinstance(fs_path, bytes): 

2321 fs_path_bytes = os.fsencode(fs_path) 

2322 else: 

2323 fs_path_bytes = fs_path 

2324 

2325 # On Windows, we need to ensure tree paths are properly encoded 

2326 if sys.platform == "win32": 

2327 try: 

2328 # Decode from filesystem encoding, then re-encode with tree encoding 

2329 fs_path_str = os.fsdecode(fs_path_bytes) 

2330 fs_path_bytes = fs_path_str.encode(tree_encoding) 

2331 except UnicodeDecodeError: 

2332 # If filesystem decoding fails, use the original bytes 

2333 pass 

2334 

2335 if os_sep_bytes != b"/": 

2336 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/") 

2337 else: 

2338 tree_path = fs_path_bytes 

2339 return tree_path 

2340 

2341 
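# --- Editor's example (not part of index.py) -----------------------------
# The two helpers above are inverses up to separator translation: tree
# paths always use b"/" while filesystem paths use the platform
# separator. A small sketch (ASCII paths, so the Windows re-encoding
# branch is a no-op):

import os
from dulwich.index import _fs_to_tree_path, _tree_to_fs_path

fs_path = _tree_to_fs_path(b"/repo", b"docs/readme.md")
assert fs_path == os.path.join(b"/repo", b"docs", b"readme.md")
assert _fs_to_tree_path(os.path.join(b"docs", b"readme.md")) == b"docs/readme.md"
# --- end example ----------------------------------------------------------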

2342def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]: 
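 """Create an index entry for a directory, if it is a submodule. 

 Returns a gitlink entry pointing at the submodule's HEAD when the 
 directory contains a .git file or directory and its HEAD can be 
 read; otherwise returns None. 
 """ 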

2343 if os.path.exists(os.path.join(path, b".git")): 

2344 head = read_submodule_head(path) 

2345 if head is None: 

2346 return None 

2347 return index_entry_from_stat(st, head, mode=S_IFGITLINK) 

2348 return None 

2349 

2350 

2351def index_entry_from_path( 

2352 path: bytes, object_store: Optional[ObjectContainer] = None 

2353) -> Optional[IndexEntry]: 

2354 """Create an index from a filesystem path. 

2355 

2356 This returns an index entry for files, symlinks and 

2357 tree (submodule) references. For plain directories it 

2358 returns None; a missing path raises FileNotFoundError. 

2359 

2360 Args: 

2361 path: Path to create an index entry for 

2362 object_store: Optional object store to 

2363 save new blobs in 

2364 Returns: An index entry; None for plain directories 

2365 """ 

2366 assert isinstance(path, bytes) 

2367 st = os.lstat(path) 

2368 if stat.S_ISDIR(st.st_mode): 

2369 return index_entry_from_directory(st, path) 

2370 

2371 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): 

2372 blob = blob_from_path_and_stat(path, st) 

2373 if object_store is not None: 

2374 object_store.add_object(blob) 

2375 return index_entry_from_stat(st, blob.id) 

2376 

2377 return None 

2378 

2379 
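# --- Editor's example (not part of index.py) -----------------------------
# Hedged sketch: build an index entry for a file on disk and store the
# blob in the repository's object store at the same time. Paths are
# placeholders.

from dulwich.index import index_entry_from_path
from dulwich.repo import Repo

repo = Repo("/path/to/repo")  # hypothetical path
entry = index_entry_from_path(
    b"/path/to/repo/hello.txt", object_store=repo.object_store
)
if entry is not None:
    print(entry.sha, oct(entry.mode))
# --- end example ----------------------------------------------------------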

2380def iter_fresh_entries( 

2381 paths: Iterable[bytes], 

2382 root_path: bytes, 

2383 object_store: Optional[ObjectContainer] = None, 

2384) -> Iterator[tuple[bytes, Optional[IndexEntry]]]: 

2385 """Iterate over current versions of index entries on disk. 

2386 

2387 Args: 

2388 paths: Paths to iterate over 

2389 root_path: Root path to access from 

2390 object_store: Optional store to save new blobs in 

2391 Returns: Iterator over path, index_entry 

2392 """ 

2393 for path in paths: 

2394 p = _tree_to_fs_path(root_path, path) 

2395 try: 

2396 entry = index_entry_from_path(p, object_store=object_store) 

2397 except (FileNotFoundError, IsADirectoryError): 

2398 entry = None 

2399 yield path, entry 

2400 

2401 

2402def iter_fresh_objects( 

2403 paths: Iterable[bytes], 

2404 root_path: bytes, 

2405 include_deleted: bool = False, 

2406 object_store: Optional[ObjectContainer] = None, 

2407) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]: 

2408 """Iterate over versions of objects on disk referenced by index. 

2409 

2410 Args: 

2411 root_path: Root path to access from 

2412 include_deleted: Include deleted entries with sha and 

2413 mode set to None 

2414 object_store: Optional object store to report new items to 

2415 Returns: Iterator over path, sha, mode 

2416 """ 

2417 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store): 

2418 if entry is None: 

2419 if include_deleted: 

2420 yield path, None, None 

2421 else: 

2422 yield path, entry.sha, cleanup_mode(entry.mode) 

2423 

2424 
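# --- Editor's example (not part of index.py) -----------------------------
# Hedged sketch: with include_deleted=True, paths missing from disk are
# reported as (path, None, None) instead of being skipped. Paths are
# placeholders.

from dulwich.index import iter_fresh_objects

for path, sha, mode in iter_fresh_objects(
    [b"hello.txt", b"gone.txt"], b"/path/to/repo", include_deleted=True
):
    print(path, sha, mode)
# --- end example ----------------------------------------------------------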

2425def refresh_index(index: Index, root_path: bytes) -> None: 

2426 """Refresh the contents of an index. 

2427 

2428 This is the equivalent of the index refresh performed by 'git commit -a' (similar to 'git add -u'): tracked files are re-read from disk and their index entries updated. 

2429 

2430 Args: 

2431 index: Index to update 

2432 root_path: Root filesystem path 

2433 """ 

2434 for path, entry in iter_fresh_entries(index, root_path): 

2435 if entry: 

2436 index[path] = entry 

2437 

2438 
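# --- Editor's example (not part of index.py) -----------------------------
# Hedged sketch: re-stat every tracked file and fold the current
# working-tree state back into the index, then persist it. The repo
# path is a placeholder.

import os
from dulwich.index import refresh_index
from dulwich.repo import Repo

repo = Repo("/path/to/repo")  # hypothetical path
index = repo.open_index()
refresh_index(index, os.fsencode(repo.path))
index.write()
# --- end example ----------------------------------------------------------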

2439class locked_index: 

2440 """Lock the index while making modifications. 

2441 

2442 Works as a context manager. 

2443 """ 

2444 

2445 _file: "_GitFile" 

2446 

2447 def __init__(self, path: Union[bytes, str]) -> None: 

2448 self._path = path 

2449 

2450 def __enter__(self) -> Index: 

2451 self._file = GitFile(self._path, "wb") 

2452 self._index = Index(self._path) 

2453 return self._index 

2454 

2455 def __exit__( 

2456 self, 

2457 exc_type: Optional[type], 

2458 exc_value: Optional[BaseException], 

2459 traceback: Optional[types.TracebackType], 

2460 ) -> None: 

2461 if exc_type is not None: 

2462 self._file.abort() 

2463 return 

2464 try: 

2465 # BinaryIO and cast are already imported at module scope 

2466 

2467 f = SHA1Writer(cast(BinaryIO, self._file)) 

2468 write_index_dict(cast(BinaryIO, f), self._index._byname) 

2469 except BaseException: 

2470 self._file.abort() 

2471 else: 

2472 f.close()
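# --- Editor's example (not part of index.py) -----------------------------
# Hedged sketch: locked_index holds the index lock for the duration of
# the block and atomically rewrites the index on a clean exit; on an
# exception the lock file is aborted and the old index left intact.

from dulwich.index import index_entry_from_path, locked_index
from dulwich.repo import Repo

repo = Repo("/path/to/repo")  # hypothetical path
with locked_index(repo.index_path()) as index:
    entry = index_entry_from_path(
        b"/path/to/repo/hello.txt", object_store=repo.object_store
    )
    if entry is not None:
        index[b"hello.txt"] = entry
# --- end example ----------------------------------------------------------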