Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/index.py: 36%


# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

23 

24import errno 

25import os 

26import shutil 

27import stat 

28import struct 

29import sys 

30import types 

31from collections.abc import Generator, Iterable, Iterator 

32from dataclasses import dataclass 

33from enum import Enum 

34from typing import ( 

35 TYPE_CHECKING, 

36 Any, 

37 BinaryIO, 

38 Callable, 

39 Optional, 

40 Union, 

41 cast, 

42) 

43 

44if TYPE_CHECKING: 

45 from .file import _GitFile 

46 from .line_ending import BlobNormalizer 

47 from .repo import Repo 

48 

49from .file import GitFile 

50from .object_store import iter_tree_contents 

51from .objects import ( 

52 S_IFGITLINK, 

53 S_ISGITLINK, 

54 Blob, 

55 ObjectID, 

56 Tree, 

57 hex_to_sha, 

58 sha_to_hex, 

59) 

60from .pack import ObjectContainer, SHA1Reader, SHA1Writer 

61 

62# 2-bit stage (during merge) 

63FLAG_STAGEMASK = 0x3000 

64FLAG_STAGESHIFT = 12 

65FLAG_NAMEMASK = 0x0FFF 

66 

67# assume-valid 

68FLAG_VALID = 0x8000 

69 

70# extended flag (must be zero in version 2) 

71FLAG_EXTENDED = 0x4000 

72 

73# used by sparse checkout 

74EXTENDED_FLAG_SKIP_WORKTREE = 0x4000 

75 

76# used by "git add -N" 

77EXTENDED_FLAG_INTEND_TO_ADD = 0x2000 

78 

79DEFAULT_VERSION = 2 

80 

81# Index extension signatures 

82TREE_EXTENSION = b"TREE" 

83REUC_EXTENSION = b"REUC" 

84UNTR_EXTENSION = b"UNTR" 

85EOIE_EXTENSION = b"EOIE" 

86IEOT_EXTENSION = b"IEOT" 

87 

88 

def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos

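
# A minimal illustration of the varint helpers above, worked out by hand
# (documentation only, not executed at import time):
#
#     >>> _encode_varint(300)
#     b'\xac\x02'
#     >>> _decode_varint(b'\xac\x02')
#     (300, 2)
#
# 300 is 0b10_0101100: the low 7 bits (0x2C) go out first with the
# continuation bit set (0xAC), then the remaining bits (0x02).
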

def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset

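
# An illustrative round-trip of the v4 path compression above. The common
# prefix of the two paths is b"dir/file" (8 bytes), so 5 bytes are dropped
# from the previous path and the suffix b"2.txt" is appended:
#
#     >>> prev = b"dir/file1.txt"
#     >>> _compress_path(b"dir/file2.txt", prev)
#     b'\x052.txt\x00'
#     >>> _decompress_path(b'\x052.txt\x00', 0, prev)
#     (b'dir/file2.txt', 7)
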

def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3


@dataclass
class SerializedIndexEntry:
    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

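
# The merge stage lives in bits 12-13 of the flags word; for example, a
# stage-2 ("ours") entry carries 0x2000 in FLAG_STAGEMASK:
#
#     >>> Stage((0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
#     <Stage.MERGE_CONFLICT_THIS: 2>
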

@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        return cls(data)


@dataclass
class IndexEntry:
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED

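
# Sketch of the skip-worktree flag handling above; every field other than
# the flags is a placeholder value:
#
#     >>> entry = IndexEntry(0, 0, 0, 0, 0o100644, 0, 0, 0, b"0" * 40)
#     >>> entry.set_skip_worktree(True)
#     >>> entry.skip_worktree, bool(entry.flags & FLAG_EXTENDED)
#     (True, True)
#     >>> entry.set_skip_worktree(False)
#     >>> entry.skip_worktree, bool(entry.flags & FLAG_EXTENDED)
#     (False, False)
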

class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])

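
# For example:
#
#     >>> pathsplit(b"dulwich/index.py")
#     (b'dulwich', b'index.py')
#     >>> pathsplit(b"README")
#     (b'', b'README')
#     >>> pathjoin(b"dulwich", b"index.py")
#     b'dulwich/index.py'
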

def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: BinaryIO, t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))

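
# Cache times are stored as two big-endian 32-bit words (seconds,
# nanoseconds); a float timestamp is split accordingly:
#
#     >>> import io
#     >>> buf = io.BytesIO()
#     >>> write_cache_time(buf, 1234.5)
#     >>> buf.seek(0)
#     0
#     >>> read_cache_time(buf)
#     (1234, 500000000)
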

def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: BinaryIO, entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))

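
# An illustrative v2 round-trip through the two functions above (the SHA is
# a placeholder hex value):
#
#     >>> import io
#     >>> buf = io.BytesIO()
#     >>> entry = SerializedIndexEntry(
#     ...     b"foo.txt", 0, 0, 0, 0, 0o100644, 0, 0, 0, b"0" * 40, 0, 0)
#     >>> write_cache_entry(buf, entry, version=2)
#     >>> buf.seek(0)
#     0
#     >>> read_cache_entry(buf, 2).name
#     b'foo.txt'
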

class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries

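
# The header is the 4-byte signature b"DIRC" followed by two big-endian
# 32-bit words, version and entry count:
#
#     >>> import io, struct
#     >>> read_index_header(io.BytesIO(struct.pack(">4sLL", b"DIRC", 2, 0)))
#     (2, 0)
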

def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)

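
# On disk an extension is its 4-byte signature, a big-endian 32-bit payload
# length, then the payload (b"UNKN" is a made-up signature):
#
#     >>> import io
#     >>> buf = io.BytesIO()
#     >>> write_index_extension(buf, IndexExtension(b"UNKN", b"payload"))
#     >>> buf.getvalue()
#     b'UNKN\x00\x00\x00\x07payload'
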

def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    Keys are paths; conflicted entries for a path are collapsed into a
    single ConflictedIndexEntry, since a path alone is not unique across
    merge stages.

    Args:
      f: File object to read from
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: BinaryIO,
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      entries: Iterable over the entries to write
      version: Version number to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # Extended flags require index format version 3 or later; bump the
    # version if necessary.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    if version < 3:
        # Double-check no extended flags appear
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: BinaryIO,
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary,
    being careful to sort by path and then by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret

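
# For example, executable regular files collapse to 0o100755 and anything
# else regular to 0o100644, regardless of the on-disk permission bits:
#
#     >>> oct(cleanup_mode(0o100775))
#     '0o100755'
#     >>> oct(cleanup_mode(0o100600))
#     '0o100644'
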

class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    cast(BinaryIO, f),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(cast(BinaryIO, f))
                write_index_dict(
                    cast(BinaryIO, sha1_writer),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(
                cast(BinaryIO, sha1_reader)
            )
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

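
# Typical read-only use of Index (sketch; the path below is a placeholder):
#
#     >>> index = Index("/path/to/repo/.git/index")   # doctest: +SKIP
#     >>> for path, entry in index.items():           # doctest: +SKIP
#     ...     print(path, entry.sha)
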

def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, Any] = {b"": {}}

    def add_tree(path: bytes) -> dict[bytes, Any]:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: dict[bytes, Any] = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")

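
# Illustrative use of commit_tree with an in-memory object store:
#
#     >>> from dulwich.object_store import MemoryObjectStore
#     >>> store = MemoryObjectStore()
#     >>> blob = Blob.from_string(b"hello")
#     >>> store.add_object(blob)
#     >>> tree_id = commit_tree(store, [(b"dir/hello.txt", blob.id, 0o100644)])
#     >>> len(tree_id)
#     40
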

def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )

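
# Typical use after hashing a working-tree file (sketch; "foo" and the
# all-zero SHA are placeholders):
#
#     >>> st = os.lstat("foo")                        # doctest: +SKIP
#     >>> index_entry_from_stat(st, b"0" * 40)        # doctest: +SKIP
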

if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[Callable] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def validate_path_element_default(element: bytes) -> bool:
    return element.lower() not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    stripped = element.rstrip(b". ").lower()
    if stripped in INVALID_DOTNAMES:
        return False
    if stripped == b"git~1":
        return False
    return True


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    import unicodedata

    try:
        # Decode to Unicode
        element_str = element.decode("utf-8", errors="strict")
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Remove HFS+ ignorable characters (like Git's next_hfs_char)
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)

    # Normalize to NFD (HFS+ uses a variant of NFD)
    normalized = unicodedata.normalize("NFD", filtered)

    # Check against invalid names (case-insensitive)
    normalized_bytes = normalized.encode("utf-8", errors="strict")
    if normalized_bytes.lower() in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized_bytes.lower() == b"git~1":
        return False

    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/."""
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True

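
# For example, paths that traverse a .git directory are rejected, as is the
# 8.3 alias GIT~1 under the NTFS validator:
#
#     >>> validate_path(b"src/main.py")
#     True
#     >>> validate_path(b"src/.git/hooks/post-checkout")
#     False
#     >>> validate_path_element_ntfs(b"GIT~1")
#     False
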

def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[Callable] = None,
    blob_normalizer: Optional["BlobNormalizer"] = None,
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()


def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      mode: File mode
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob


def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)


def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None


def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it has changed. If
    not, consider the file as changed, since Git tracked a file and not a
    directory.

    Return True if the given path should be considered as changed and False
    otherwise, or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False


os_sep_bytes = os.sep.encode("ascii")


def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if parent_dir and not os.path.exists(parent_dir):
        # Check if any parent in the path is a file
        parts = parent_dir.split(os_sep_bytes)
        for i in range(len(parts)):
            partial_path = os_sep_bytes.join(parts[: i + 1])
            if (
                partial_path
                and os.path.exists(partial_path)
                and not os.path.isdir(partial_path)
            ):
                # Parent path is a file, this is an error
                raise OSError(
                    f"Cannot create directory, parent path is a file: {partial_path!r}"
                )
        os.makedirs(parent_dir)


def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        # On Windows, remove read-only attribute and retry
        if sys.platform == "win32":
            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            os.unlink(path)
        else:
            raise


def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Remove empty parent directories up to stop_at."""
    parent = os.path.dirname(path)
    while parent and parent != stop_at:
        try:
            os.rmdir(parent)
            parent = os.path.dirname(parent)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            break
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                # Directory not empty - stop trying
                break
            raise


def _check_symlink_matches(
    full_path: bytes, repo_object_store, entry_sha: bytes
) -> bool:
    """Check if symlink target matches expected target.

    Returns True if symlink needs to be written, False if it matches.
    """
    try:
        current_target = os.readlink(full_path)
        blob_obj = repo_object_store[entry_sha]
        expected_target = blob_obj.as_raw_string()
        if isinstance(current_target, str):
            current_target = current_target.encode()
        return current_target != expected_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return True
    except OSError as e:
        if e.errno == errno.EINVAL:
            # Not a symlink
            return True
        raise


def _check_file_matches(
    repo_object_store,
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_path: Optional[bytes] = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Returns True if file needs to be written, False if it matches.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)
        if current_mode != expected_mode:
            return True

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return True

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content != expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return True


def _transition_to_submodule(repo, path, full_path, current_stat, entry, index):
    """Transition any type to submodule."""
    from .submodule import ensure_submodule_placeholder

    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Already a directory, just ensure .git file exists
        ensure_submodule_placeholder(repo, path)
    else:
        # Remove whatever is there and create submodule
        if current_stat is not None:
            _remove_file_with_readonly_handling(full_path)
        ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_file(
    object_store,
    path,
    full_path,
    current_stat,
    entry,
    index,
    honor_filemode,
    symlink_fn,
    blob_normalizer,
):
    """Transition any type to regular file or symlink."""
    # Check if we need to update
    if (
        current_stat is not None
        and stat.S_ISREG(current_stat.st_mode)
        and not stat.S_ISLNK(entry.mode)
    ):
        # File to file - check if update needed
        needs_update = _check_file_matches(
            object_store,
            full_path,
            entry.sha,
            entry.mode,
            current_stat,
            honor_filemode,
            blob_normalizer,
            path,
        )
    elif (
        current_stat is not None
        and stat.S_ISLNK(current_stat.st_mode)
        and stat.S_ISLNK(entry.mode)
    ):
        # Symlink to symlink - check if update needed
        needs_update = _check_symlink_matches(full_path, object_store, entry.sha)
    else:
        needs_update = True

    if not needs_update:
        # Just update index - current_stat should always be valid here since we're not updating
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Remove existing entry if needed
    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Remove directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents:
            if dir_contents != {git_file_name}:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno == errno.ENOTEMPTY:
                    raise IsADirectoryError(
                        f"Cannot replace non-empty directory with file: {full_path!r}"
                    )
                raise
    elif current_stat is not None:
        _remove_file_with_readonly_handling(full_path)

    # Ensure parent directory exists
    _ensure_parent_dir_exists(full_path)

    # Write the file
    blob_obj = object_store[entry.sha]
    assert isinstance(blob_obj, Blob)
    if blob_normalizer:
        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
    st = build_file_from_blob(
        blob_obj,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_absent(repo, path, full_path, current_stat, index):
    """Remove any type of entry."""
    if current_stat is None:
        return

    if stat.S_ISDIR(current_stat.st_mode):
        # Check if it's a submodule directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents and dir_contents == {git_file_name}:
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise
    else:
        _remove_file_with_readonly_handling(full_path)

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    _remove_empty_parents(
        full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    )


1807 

def update_working_tree(
    repo: "Repo",
    old_tree_id: Optional[bytes],
    new_tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Optional[Callable[[bytes], bool]] = None,
    symlink_fn: Optional[Callable] = None,
    force_remove_untracked: bool = False,
    blob_normalizer: Optional["BlobNormalizer"] = None,
) -> None:
    """Update the working tree and index to match a new tree.

    This function handles:
    - Adding new files
    - Updating modified files
    - Removing deleted files
    - Cleaning up empty directories

    Args:
      repo: Repository object
      old_tree_id: SHA of the tree before the update
      new_tree_id: SHA of the tree to update to
      honor_filemode: Whether to honor the core.filemode setting
      validate_path_element: Function to validate path elements to check out;
        defaults to validate_path_element_default
      symlink_fn: Function to use for creating symlinks
      force_remove_untracked: If True, remove files that exist in the working
        directory but not in the target tree, even if old_tree_id is None
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory
    """
    if validate_path_element is None:
        validate_path_element = validate_path_element_default

    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    index = repo.open_index()

    # Build mappings of paths for efficient lookup
    new_paths = {}
    for entry in iter_tree_contents(repo.object_store, new_tree_id):
        if entry.path.startswith(b".git") or not validate_path(
            entry.path, validate_path_element
        ):
            continue
        new_paths[entry.path] = entry

    old_paths = {}
    if old_tree_id:
        for entry in iter_tree_contents(repo.object_store, old_tree_id):
            if not entry.path.startswith(b".git"):
                old_paths[entry.path] = entry

    # Process all paths
    all_paths = set(new_paths.keys()) | set(old_paths.keys())

    # Find old file paths that must become directories in the new tree
    paths_needing_dir = set()
    for path in new_paths:
        parts = path.split(b"/")
        for i in range(1, len(parts)):
            parent = b"/".join(parts[:i])
            if parent in old_paths and parent not in new_paths:
                paths_needing_dir.add(parent)

    # Refuse to proceed if any such path carries local modifications
    current_stat: Optional[os.stat_result]
    stat_cache: dict[bytes, Optional[os.stat_result]] = {}
    for path in paths_needing_dir:
        full_path = _tree_to_fs_path(repo_path, path)
        try:
            current_stat = os.lstat(full_path)
        except FileNotFoundError:
            # File doesn't exist, proceed
            stat_cache[full_path] = None
        except PermissionError:
            # Can't read the file, proceed
            pass
        else:
            stat_cache[full_path] = current_stat
            if stat.S_ISREG(current_stat.st_mode):
                # Check whether the file has been modified
                old_entry = old_paths[path]
                if _check_file_matches(
                    repo.object_store,
                    full_path,
                    old_entry.sha,
                    old_entry.mode,
                    current_stat,
                    honor_filemode,
                    blob_normalizer,
                    path,
                ):
                    # File has been modified; replacing it with a directory
                    # would lose local changes
                    raise OSError(
                        f"Cannot replace modified file with directory: {path!r}"
                    )

    # Process in two passes, deletions first, then additions/updates; this
    # handles case-only renames on case-insensitive filesystems correctly
    paths_to_remove = []
    paths_to_update = []

    for path in sorted(all_paths):
        if path in new_paths:
            paths_to_update.append(path)
        else:
            paths_to_remove.append(path)

    # First process removals
    for path in paths_to_remove:
        full_path = _tree_to_fs_path(repo_path, path)

        # Determine the current state, using the cache if available
        try:
            current_stat = stat_cache[full_path]
        except KeyError:
            try:
                current_stat = os.lstat(full_path)
            except FileNotFoundError:
                current_stat = None

        _transition_to_absent(repo, path, full_path, current_stat, index)

    # Then process additions/updates
    for path in paths_to_update:
        full_path = _tree_to_fs_path(repo_path, path)

        # Determine the current state, using the cache if available
        try:
            current_stat = stat_cache[full_path]
        except KeyError:
            try:
                current_stat = os.lstat(full_path)
            except FileNotFoundError:
                current_stat = None

        new_entry = new_paths[path]

        # The path exists in the target tree; create or update it
        if S_ISGITLINK(new_entry.mode):
            _transition_to_submodule(
                repo, path, full_path, current_stat, new_entry, index
            )
        else:
            _transition_to_file(
                repo.object_store,
                path,
                full_path,
                current_stat,
                new_entry,
                index,
                honor_filemode,
                symlink_fn,
                blob_normalizer,
            )

    # Handle force_remove_untracked
    if force_remove_untracked:
        for root, dirs, files in os.walk(repo_path):
            root_bytes = os.fsencode(root)
            if b".git" in root_bytes:
                continue
            for file in files:
                full_path = os.path.join(root_bytes, os.fsencode(file))
                tree_path = os.path.relpath(full_path, repo_path)
                if os.sep != "/":
                    tree_path = tree_path.replace(os.sep.encode(), b"/")

                if tree_path not in new_paths:
                    _remove_file_with_readonly_handling(full_path)
                    if tree_path in index:
                        del index[tree_path]

    # Clean up empty directories
    for root, dirs, files in os.walk(repo_path, topdown=False):
        root_bytes = os.fsencode(root)
        if (
            b".git" not in root_bytes
            and root_bytes != repo_path
            and not files
            and not dirs
        ):
            try:
                os.rmdir(root)
            except FileNotFoundError:
                # Directory was already removed
                pass
            except OSError as e:
                if e.errno != errno.ENOTEMPTY:
                    # Only ignore "directory not empty" errors
                    raise

    index.write()
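

# --- Editor's note: a minimal usage sketch for update_working_tree (not part
# of dulwich). The repository path and ref name below are illustrative, and
# both trees are assumed to exist.
def _example_update_working_tree() -> None:
    from dulwich.repo import Repo

    repo = Repo("/tmp/repo")
    old_tree_id = repo[repo.head()].tree  # tree the working copy reflects
    new_tree_id = repo[b"refs/heads/feature"].tree  # tree to check out
    update_working_tree(repo, old_tree_id, new_tree_id)
    # On return, both the working tree and the index match new_tree_id.
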

def get_unstaged_changes(
    index: Index,
    root_path: Union[str, bytes],
    filter_blob_callback: Optional[Callable] = None,
) -> Generator[bytes, None, None]:
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: Optional callback, called with (blob, tree_path),
        to filter the blob read from disk before comparison
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index, check whether the blob currently on disk
    # still matches the recorded sha
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                continue

            blob = blob_from_path_and_stat(full_path, st)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if blob.id != entry.sha:
                yield tree_path
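

# --- Editor's note: usage sketch (not part of dulwich); the repository path
# is illustrative.
def _example_list_unstaged() -> None:
    from dulwich.repo import Repo

    repo = Repo("/tmp/repo")
    for tree_path in get_unstaged_changes(repo.open_index(), repo.path):
        print(tree_path.decode("utf-8", "replace"))
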

def _tree_to_fs_path(root_path: bytes, tree_path: bytes) -> bytes:
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes

    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    if os_sep_bytes != b"/":
        sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
    else:
        sep_corrected_path = tree_path
    return os.path.join(root_path, sep_corrected_path)


def _fs_to_tree_path(fs_path: Union[str, bytes]) -> bytes:
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.

    Returns: Git tree path as bytes
    """
    if not isinstance(fs_path, bytes):
        fs_path_bytes = os.fsencode(fs_path)
    else:
        fs_path_bytes = fs_path
    if os_sep_bytes != b"/":
        tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
    else:
        tree_path = fs_path_bytes
    return tree_path
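

# --- Editor's note: a round-trip sketch for the two helpers above. On POSIX
# (os.sep == "/") they only join/strip the root; on Windows the separator is
# translated in each direction. Paths are illustrative.
def _example_path_round_trip() -> None:
    tree_path = b"docs/readme.txt"
    fs_path = _tree_to_fs_path(b"/tmp/repo", tree_path)
    # POSIX: b"/tmp/repo/docs/readme.txt"; Windows uses backslashes instead
    relative = os.path.relpath(fs_path, b"/tmp/repo")
    assert _fs_to_tree_path(relative) == tree_path
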

def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
    """Create an index entry for a directory if it is a submodule.

    Returns None for ordinary directories.
    """
    if os.path.exists(os.path.join(path, b".git")):
        head = read_submodule_head(path)
        if head is None:
            return None
        return index_entry_from_stat(st, head, mode=S_IFGITLINK)
    return None


def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index entry from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. For directories and
    non-existent files it returns None.

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    if stat.S_ISDIR(st.st_mode):
        return index_entry_from_directory(st, path)

    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id)

    return None
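

# --- Editor's note: usage sketch (not part of dulwich); assumes README.md
# exists in the working tree of an illustrative repository.
def _example_stage_one_file() -> None:
    from dulwich.repo import Repo

    repo = Repo("/tmp/repo")
    entry = index_entry_from_path(b"/tmp/repo/README.md", repo.object_store)
    if entry is not None:  # None for plain directories
        index = repo.open_index()
        index[b"README.md"] = entry
        index.write()
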

def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            entry = None
        yield path, entry


def iter_fresh_objects(
    paths: Iterable[bytes],
    root_path: bytes,
    include_deleted: bool = False,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is None:
            if include_deleted:
                yield path, None, None
        else:
            yield path, entry.sha, cleanup_mode(entry.mode)
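

# --- Editor's note: usage sketch (not part of dulwich); reports the current
# on-disk sha and mode for every path tracked by the index.
def _example_report_fresh() -> None:
    from dulwich.repo import Repo

    repo = Repo("/tmp/repo")
    index = repo.open_index()
    root = os.fsencode(repo.path)
    for path, sha, mode in iter_fresh_objects(index, root, include_deleted=True):
        print(path, sha, mode)  # sha and mode are None for deleted entries
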

def refresh_index(index: Index, root_path: bytes) -> None:
    """Refresh the contents of an index.

    This is the equivalent of the index refresh performed by 'git commit -a'.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, entry in iter_fresh_entries(index, root_path):
        if entry:
            index[path] = entry
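

# --- Editor's note: usage sketch (not part of dulwich). refresh_index only
# mutates the in-memory Index, so the result still has to be written out.
def _example_refresh() -> None:
    from dulwich.repo import Repo

    repo = Repo("/tmp/repo")
    index = repo.open_index()
    refresh_index(index, os.fsencode(repo.path))
    index.write()
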

class locked_index:
    """Lock the index while making modifications.

    Works as a context manager.
    """

    _file: "_GitFile"

    def __init__(self, path: Union[bytes, str]) -> None:
        self._path = path

    def __enter__(self) -> Index:
        self._file = GitFile(self._path, "wb")
        self._index = Index(self._path)
        return self._index

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        if exc_type is not None:
            self._file.abort()
            return
        try:
            f = SHA1Writer(cast(BinaryIO, self._file))
            write_index_dict(cast(BinaryIO, f), self._index._byname)
        except BaseException:
            self._file.abort()
        else:
            f.close()
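

# --- Editor's note: usage sketch for locked_index (not part of dulwich). The
# lock file is taken on enter and the rewritten index is committed on a clean
# exit, or aborted if the block raises. Paths are illustrative.
def _example_locked_update() -> None:
    index_path = os.path.join("/tmp/repo", ".git", "index")
    with locked_index(index_path) as index:
        del index[b"obsolete.txt"]  # mutations here are written on exit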